refactor(library): decompose monolithic LibraryClient into fetch/extract/positions/finished/format mixins while preserving public behavior

This commit is contained in:
2026-02-18 03:54:16 +01:00
parent eca58423dc
commit e813267d5e
6 changed files with 419 additions and 356 deletions

View File

@@ -1,365 +1,25 @@
"""Client for the Audible library API.""" """Client facade for Audible library fetch, extraction, and progress updates."""
from concurrent.futures import ThreadPoolExecutor, as_completed from __future__ import annotations
import audible import audible
from ..types import LibraryItem, StatusCallback from .client_extract import LibraryClientExtractMixin
from .client_fetch import LibraryClientFetchMixin
from .client_finished import LibraryClientFinishedMixin
from .client_format import LibraryClientFormatMixin
from .client_positions import LibraryClientPositionsMixin
class LibraryClient: class LibraryClient(
"""Client for the Audible library API. Fetches items, extracts metadata, and updates positions.""" LibraryClientFetchMixin,
LibraryClientExtractMixin,
LibraryClientPositionsMixin,
LibraryClientFinishedMixin,
LibraryClientFormatMixin,
):
"""Audible library client composed from focused behavior mixins."""
def __init__(self, client: audible.Client) -> None: def __init__(self, client: audible.Client) -> None:
"""Store authenticated Audible client used by all operations."""
self.client = client self.client = client
def fetch_all_items(self, on_progress: StatusCallback | None = None) -> list[LibraryItem]:
"""Fetch all library items from the API."""
response_groups = (
"contributors,media,product_attrs,product_desc,product_details,"
"is_finished,listening_status,percent_complete"
)
return self._fetch_all_pages(response_groups, on_progress)
def _fetch_page(
self, page: int, page_size: int, response_groups: str
) -> tuple[int, list[LibraryItem]]:
"""Fetch a single page of library items from the API."""
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = library.get("items", [])
return page, list(items)
def _fetch_all_pages(
self, response_groups: str, on_progress: StatusCallback | None = None
) -> list[LibraryItem]:
"""Fetch all pages of library items using parallel requests."""
library_response = None
page_size = 200
for attempt_size in [200, 100, 50]:
try:
library_response = self.client.get(
path="library",
num_results=attempt_size,
page=1,
response_groups=response_groups,
)
page_size = attempt_size
break
except Exception:
continue
if not library_response:
return []
first_page_items = library_response.get("items", [])
if not first_page_items:
return []
all_items: list[LibraryItem] = list(first_page_items)
if on_progress:
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
if len(first_page_items) < page_size:
return all_items
total_items_estimate = library_response.get(
"total_results") or library_response.get("total")
if total_items_estimate:
estimated_pages = (total_items_estimate +
page_size - 1) // page_size
estimated_pages = min(estimated_pages, 1000)
else:
estimated_pages = 500
max_workers = 50
page_results: dict[int, list[LibraryItem]] = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page: dict = {}
for page in range(2, estimated_pages + 1):
future = executor.submit(
self._fetch_page, page, page_size, response_groups
)
future_to_page[future] = page
completed_count = 0
total_items = len(first_page_items)
for future in as_completed(future_to_page):
page_num = future_to_page.pop(future)
try:
fetched_page, items = future.result()
if not items or len(items) < page_size:
for remaining_future in list(future_to_page.keys()):
remaining_future.cancel()
break
page_results[fetched_page] = items
total_items += len(items)
completed_count += 1
if on_progress and completed_count % 20 == 0:
on_progress(
f"Fetched {completed_count} pages ({total_items} items)..."
)
except Exception:
pass
for page_num in sorted(page_results.keys()):
all_items.extend(page_results[page_num])
return all_items
def extract_title(self, item: LibraryItem) -> str:
"""Return the book title from a library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def extract_authors(self, item: LibraryItem) -> str:
"""Return comma-separated author names from a library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [a.get("name", "")
for a in authors if isinstance(a, dict)]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
"""Return runtime in minutes if present."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
if isinstance(runtime, (int, float)):
return int(runtime)
return None
def extract_progress_info(self, item: LibraryItem) -> float | None:
"""Return progress percentage (0100) if present."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: LibraryItem) -> str | None:
"""Return the ASIN for a library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def is_finished(self, item: LibraryItem) -> bool:
"""Return True if the item is marked or inferred as finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float))
and percent_complete >= 100
)
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annot in annotations:
if annot.get("asin") != asin:
continue
last_position_heard = annot.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Fetch content reference (ACR and version) for position updates."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def _update_position(self, asin: str, position_seconds: float) -> bool:
"""Persist playback position to the API. Returns True on success."""
if position_seconds < 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save playback position to Audible. Returns True on success."""
if position_seconds <= 0:
return False
return self._update_position(asin, position_seconds)
@staticmethod
def format_duration(
value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
"""Format a duration value as e.g. 2h30m or 45m."""
if value is None or value <= 0:
return default_none
total_minutes = int(value)
if unit == "seconds":
total_minutes //= 60
hours, minutes = divmod(total_minutes, 60)
if hours > 0:
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
return f"{minutes}m"
def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
"""Mark a book as finished on Audible. Optionally mutates item in place."""
total_ms = self._get_runtime_ms(asin, item)
if not total_ms:
return False
position_ms = total_ms
acr = self._get_acr(asin)
if not acr:
return False
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body={"asin": asin, "acr": acr, "position_ms": position_ms},
)
if item:
item["is_finished"] = True
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict):
listening_status["is_finished"] = True
return True
except Exception:
return False
def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
"""Return total runtime in ms from item or API."""
if item:
runtime_min = self.extract_runtime_minutes(item)
if runtime_min:
return runtime_min * 60 * 1000
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="chapter_info",
)
chapter_info = response.get(
"content_metadata", {}).get("chapter_info", {})
return chapter_info.get("runtime_length_ms")
except Exception:
return None
def _get_acr(self, asin: str) -> str | None:
"""Fetch ACR token required for position and finish updates."""
try:
response = self.client.post(
path=f"1.0/content/{asin}/licenserequest",
body={
"response_groups": "content_reference",
"consumption_type": "Download",
"drm_type": "Adrm",
},
)
return response.get("content_license", {}).get("acr")
except Exception:
return None
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS for display."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

View File

@@ -0,0 +1,84 @@
"""Metadata extraction helpers for library items."""
from __future__ import annotations
from ..types import LibraryItem
class LibraryClientExtractMixin:
"""Extracts display and status fields from library items."""
def extract_title(self, item: LibraryItem) -> str:
"""Return the book title from a library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def extract_authors(self, item: LibraryItem) -> str:
"""Return comma-separated author names from a library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [
author.get("name", "") for author in authors if isinstance(author, dict)
]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
"""Return runtime in minutes if present."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
if isinstance(runtime, (int, float)):
return int(runtime)
return None
def extract_progress_info(self, item: LibraryItem) -> float | None:
"""Return progress percentage (0-100) if present."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: LibraryItem) -> str | None:
"""Return the ASIN for a library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def is_finished(self, item: LibraryItem) -> bool:
"""Return True if the item is marked or inferred as finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float)) and percent_complete >= 100
)

View File

@@ -0,0 +1,127 @@
"""Library page fetching helpers for the Audible API client."""
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
from ..types import LibraryItem, StatusCallback
class LibraryClientFetchMixin:
"""Fetches all library items from paginated Audible endpoints."""
client: Any
def fetch_all_items(
self, on_progress: StatusCallback | None = None
) -> list[LibraryItem]:
"""Fetch all library items from the API."""
response_groups = (
"contributors,media,product_attrs,product_desc,product_details,"
"is_finished,listening_status,percent_complete"
)
return self._fetch_all_pages(response_groups, on_progress)
def _fetch_page(
self,
page: int,
page_size: int,
response_groups: str,
) -> tuple[int, list[LibraryItem]]:
"""Fetch one library page and return its index with items."""
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = library.get("items", [])
return page, list(items)
def _fetch_all_pages(
self,
response_groups: str,
on_progress: StatusCallback | None = None,
) -> list[LibraryItem]:
"""Fetch all library pages using parallel requests after page one."""
library_response = None
page_size = 200
for attempt_size in [200, 100, 50]:
try:
library_response = self.client.get(
path="library",
num_results=attempt_size,
page=1,
response_groups=response_groups,
)
page_size = attempt_size
break
except Exception:
continue
if not library_response:
return []
first_page_items = library_response.get("items", [])
if not first_page_items:
return []
all_items: list[LibraryItem] = list(first_page_items)
if on_progress:
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
if len(first_page_items) < page_size:
return all_items
total_items_estimate = library_response.get(
"total_results"
) or library_response.get("total")
if total_items_estimate:
estimated_pages = (total_items_estimate + page_size - 1) // page_size
estimated_pages = min(estimated_pages, 1000)
else:
estimated_pages = 500
max_workers = 50
page_results: dict[int, list[LibraryItem]] = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page: dict = {}
for page in range(2, estimated_pages + 1):
future = executor.submit(
self._fetch_page, page, page_size, response_groups
)
future_to_page[future] = page
completed_count = 0
total_items = len(first_page_items)
for future in as_completed(future_to_page):
try:
fetched_page, items = future.result()
future_to_page.pop(future, None)
if not items or len(items) < page_size:
for remaining_future in list(future_to_page.keys()):
remaining_future.cancel()
break
page_results[fetched_page] = items
total_items += len(items)
completed_count += 1
if on_progress and completed_count % 20 == 0:
on_progress(
f"Fetched {completed_count} pages ({total_items} items)..."
)
except Exception:
future_to_page.pop(future, None)
pass
for page_num in sorted(page_results.keys()):
all_items.extend(page_results[page_num])
return all_items

View File

@@ -0,0 +1,70 @@
"""Helpers for marking content as finished through Audible APIs."""
from __future__ import annotations
from typing import Any
from ..types import LibraryItem
class LibraryClientFinishedMixin:
"""Marks titles as finished and mutates in-memory item state."""
client: Any
def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
"""Mark a book as finished on Audible and optionally update item state."""
total_ms = self._get_runtime_ms(asin, item)
if not total_ms:
return False
acr = self._get_acr(asin)
if not acr:
return False
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body={"asin": asin, "acr": acr, "position_ms": total_ms},
)
if item:
item["is_finished"] = True
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict):
listening_status["is_finished"] = True
return True
except Exception:
return False
def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
"""Return total runtime in milliseconds from item or metadata endpoint."""
if item:
extract_runtime_minutes = getattr(self, "extract_runtime_minutes")
runtime_min = extract_runtime_minutes(item)
if runtime_min:
return runtime_min * 60 * 1000
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="chapter_info",
)
chapter_info = response.get("content_metadata", {}).get("chapter_info", {})
return chapter_info.get("runtime_length_ms")
except Exception:
return None
def _get_acr(self, asin: str) -> str | None:
"""Fetch the ACR token required by finish/update write operations."""
try:
response = self.client.post(
path=f"1.0/content/{asin}/licenserequest",
body={
"response_groups": "content_reference",
"consumption_type": "Download",
"drm_type": "Adrm",
},
)
return response.get("content_license", {}).get("acr")
except Exception:
return None

View File

@@ -0,0 +1,37 @@
"""Formatting helpers exposed by the library client."""
from __future__ import annotations
class LibraryClientFormatMixin:
"""Formats durations and timestamps for display usage."""
@staticmethod
def format_duration(
value: int | None,
unit: str = "minutes",
default_none: str | None = None,
) -> str | None:
"""Format duration values as compact hour-minute strings."""
if value is None or value <= 0:
return default_none
total_minutes = int(value)
if unit == "seconds":
total_minutes //= 60
hours, minutes = divmod(total_minutes, 60)
if hours > 0:
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
return f"{minutes}m"
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS for display."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

View File

@@ -0,0 +1,85 @@
"""Playback position read and write helpers for library content."""
from __future__ import annotations
from typing import Any
class LibraryClientPositionsMixin:
"""Handles last-position retrieval and persistence."""
client: Any
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annotation in annotations:
if annotation.get("asin") != asin:
continue
last_position_heard = annotation.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Fetch content reference payload used by position update calls."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def _update_position(self, asin: str, position_seconds: float) -> bool:
"""Persist playback position to the API and return success state."""
if position_seconds < 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save playback position to Audible and return success state."""
if position_seconds <= 0:
return False
return self._update_position(asin, position_seconds)