diff --git a/auditui/library/client.py b/auditui/library/client.py index 58ae618..74da5cc 100644 --- a/auditui/library/client.py +++ b/auditui/library/client.py @@ -1,365 +1,25 @@ -"""Client for the Audible library API.""" +"""Client facade for Audible library fetch, extraction, and progress updates.""" -from concurrent.futures import ThreadPoolExecutor, as_completed +from __future__ import annotations import audible -from ..types import LibraryItem, StatusCallback +from .client_extract import LibraryClientExtractMixin +from .client_fetch import LibraryClientFetchMixin +from .client_finished import LibraryClientFinishedMixin +from .client_format import LibraryClientFormatMixin +from .client_positions import LibraryClientPositionsMixin -class LibraryClient: - """Client for the Audible library API. Fetches items, extracts metadata, and updates positions.""" +class LibraryClient( + LibraryClientFetchMixin, + LibraryClientExtractMixin, + LibraryClientPositionsMixin, + LibraryClientFinishedMixin, + LibraryClientFormatMixin, +): + """Audible library client composed from focused behavior mixins.""" def __init__(self, client: audible.Client) -> None: + """Store authenticated Audible client used by all operations.""" self.client = client - - def fetch_all_items(self, on_progress: StatusCallback | None = None) -> list[LibraryItem]: - """Fetch all library items from the API.""" - response_groups = ( - "contributors,media,product_attrs,product_desc,product_details," - "is_finished,listening_status,percent_complete" - ) - return self._fetch_all_pages(response_groups, on_progress) - - def _fetch_page( - self, page: int, page_size: int, response_groups: str - ) -> tuple[int, list[LibraryItem]]: - """Fetch a single page of library items from the API.""" - library = self.client.get( - path="library", - num_results=page_size, - page=page, - response_groups=response_groups, - ) - items = library.get("items", []) - return page, list(items) - - def _fetch_all_pages( - self, response_groups: str, on_progress: StatusCallback | None = None - ) -> list[LibraryItem]: - """Fetch all pages of library items using parallel requests.""" - library_response = None - page_size = 200 - - for attempt_size in [200, 100, 50]: - try: - library_response = self.client.get( - path="library", - num_results=attempt_size, - page=1, - response_groups=response_groups, - ) - page_size = attempt_size - break - except Exception: - continue - - if not library_response: - return [] - - first_page_items = library_response.get("items", []) - if not first_page_items: - return [] - - all_items: list[LibraryItem] = list(first_page_items) - if on_progress: - on_progress(f"Fetched page 1 ({len(first_page_items)} items)...") - - if len(first_page_items) < page_size: - return all_items - - total_items_estimate = library_response.get( - "total_results") or library_response.get("total") - if total_items_estimate: - estimated_pages = (total_items_estimate + - page_size - 1) // page_size - estimated_pages = min(estimated_pages, 1000) - else: - estimated_pages = 500 - - max_workers = 50 - page_results: dict[int, list[LibraryItem]] = {} - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_page: dict = {} - - for page in range(2, estimated_pages + 1): - future = executor.submit( - self._fetch_page, page, page_size, response_groups - ) - future_to_page[future] = page - - completed_count = 0 - total_items = len(first_page_items) - - for future in as_completed(future_to_page): - page_num = future_to_page.pop(future) - try: - fetched_page, items = future.result() - if not items or len(items) < page_size: - for remaining_future in list(future_to_page.keys()): - remaining_future.cancel() - break - - page_results[fetched_page] = items - total_items += len(items) - completed_count += 1 - if on_progress and completed_count % 20 == 0: - on_progress( - f"Fetched {completed_count} pages ({total_items} items)..." - ) - - except Exception: - pass - - for page_num in sorted(page_results.keys()): - all_items.extend(page_results[page_num]) - - return all_items - - def extract_title(self, item: LibraryItem) -> str: - """Return the book title from a library item.""" - product = item.get("product", {}) - return ( - product.get("title") - or item.get("title") - or product.get("asin", "Unknown Title") - ) - - def extract_authors(self, item: LibraryItem) -> str: - """Return comma-separated author names from a library item.""" - product = item.get("product", {}) - authors = product.get("authors") or product.get("contributors") or [] - if not authors and "authors" in item: - authors = item.get("authors", []) - - author_names = [a.get("name", "") - for a in authors if isinstance(a, dict)] - return ", ".join(author_names) or "Unknown" - - def extract_runtime_minutes(self, item: LibraryItem) -> int | None: - """Return runtime in minutes if present.""" - product = item.get("product", {}) - runtime_fields = [ - "runtime_length_min", - "runtime_length", - "vLength", - "length", - "duration", - ] - - runtime = None - for field in runtime_fields: - runtime = product.get(field) or item.get(field) - if runtime is not None: - break - - if runtime is None: - return None - - if isinstance(runtime, dict): - return int(runtime.get("min", 0)) - if isinstance(runtime, (int, float)): - return int(runtime) - return None - - def extract_progress_info(self, item: LibraryItem) -> float | None: - """Return progress percentage (0–100) if present.""" - percent_complete = item.get("percent_complete") - listening_status = item.get("listening_status", {}) - - if isinstance(listening_status, dict) and percent_complete is None: - percent_complete = listening_status.get("percent_complete") - - return float(percent_complete) if percent_complete is not None else None - - def extract_asin(self, item: LibraryItem) -> str | None: - """Return the ASIN for a library item.""" - product = item.get("product", {}) - return item.get("asin") or product.get("asin") - - def is_finished(self, item: LibraryItem) -> bool: - """Return True if the item is marked or inferred as finished.""" - is_finished_flag = item.get("is_finished") - percent_complete = item.get("percent_complete") - listening_status = item.get("listening_status") - - if isinstance(listening_status, dict): - is_finished_flag = is_finished_flag or listening_status.get( - "is_finished", False - ) - if percent_complete is None: - percent_complete = listening_status.get("percent_complete", 0) - - return bool(is_finished_flag) or ( - isinstance(percent_complete, (int, float)) - and percent_complete >= 100 - ) - - def get_last_position(self, asin: str) -> float | None: - """Get the last playback position for a book in seconds.""" - try: - response = self.client.get( - path="1.0/annotations/lastpositions", - asins=asin, - ) - annotations = response.get("asin_last_position_heard_annots", []) - - for annot in annotations: - if annot.get("asin") != asin: - continue - - last_position_heard = annot.get("last_position_heard", {}) - if not isinstance(last_position_heard, dict): - continue - - if last_position_heard.get("status") == "DoesNotExist": - return None - - position_ms = last_position_heard.get("position_ms") - if position_ms is not None: - return float(position_ms) / 1000.0 - - return None - except (OSError, ValueError, KeyError): - return None - - def _get_content_reference(self, asin: str) -> dict | None: - """Fetch content reference (ACR and version) for position updates.""" - try: - response = self.client.get( - path=f"1.0/content/{asin}/metadata", - response_groups="content_reference", - ) - content_metadata = response.get("content_metadata", {}) - content_reference = content_metadata.get("content_reference", {}) - if isinstance(content_reference, dict): - return content_reference - return None - except (OSError, ValueError, KeyError): - return None - - def _update_position(self, asin: str, position_seconds: float) -> bool: - """Persist playback position to the API. Returns True on success.""" - if position_seconds < 0: - return False - - content_ref = self._get_content_reference(asin) - if not content_ref: - return False - - acr = content_ref.get("acr") - if not acr: - return False - - body = { - "acr": acr, - "asin": asin, - "position_ms": int(position_seconds * 1000), - } - - if version := content_ref.get("version"): - body["version"] = version - - try: - self.client.put( - path=f"1.0/lastpositions/{asin}", - body=body, - ) - return True - except (OSError, ValueError, KeyError): - return False - - def save_last_position(self, asin: str, position_seconds: float) -> bool: - """Save playback position to Audible. Returns True on success.""" - if position_seconds <= 0: - return False - return self._update_position(asin, position_seconds) - - @staticmethod - def format_duration( - value: int | None, unit: str = "minutes", default_none: str | None = None - ) -> str | None: - """Format a duration value as e.g. 2h30m or 45m.""" - if value is None or value <= 0: - return default_none - - total_minutes = int(value) - if unit == "seconds": - total_minutes //= 60 - - hours, minutes = divmod(total_minutes, 60) - - if hours > 0: - return f"{hours}h{minutes:02d}" if minutes else f"{hours}h" - return f"{minutes}m" - - def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool: - """Mark a book as finished on Audible. Optionally mutates item in place.""" - total_ms = self._get_runtime_ms(asin, item) - if not total_ms: - return False - - position_ms = total_ms - acr = self._get_acr(asin) - if not acr: - return False - - try: - self.client.put( - path=f"1.0/lastpositions/{asin}", - body={"asin": asin, "acr": acr, "position_ms": position_ms}, - ) - if item: - item["is_finished"] = True - listening_status = item.get("listening_status", {}) - if isinstance(listening_status, dict): - listening_status["is_finished"] = True - return True - except Exception: - return False - - def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None: - """Return total runtime in ms from item or API.""" - if item: - runtime_min = self.extract_runtime_minutes(item) - if runtime_min: - return runtime_min * 60 * 1000 - - try: - response = self.client.get( - path=f"1.0/content/{asin}/metadata", - response_groups="chapter_info", - ) - chapter_info = response.get( - "content_metadata", {}).get("chapter_info", {}) - return chapter_info.get("runtime_length_ms") - except Exception: - return None - - def _get_acr(self, asin: str) -> str | None: - """Fetch ACR token required for position and finish updates.""" - try: - response = self.client.post( - path=f"1.0/content/{asin}/licenserequest", - body={ - "response_groups": "content_reference", - "consumption_type": "Download", - "drm_type": "Adrm", - }, - ) - return response.get("content_license", {}).get("acr") - except Exception: - return None - - @staticmethod - def format_time(seconds: float) -> str: - """Format seconds as HH:MM:SS or MM:SS for display.""" - total_seconds = int(seconds) - hours = total_seconds // 3600 - minutes = (total_seconds % 3600) // 60 - secs = total_seconds % 60 - - if hours > 0: - return f"{hours:02d}:{minutes:02d}:{secs:02d}" - return f"{minutes:02d}:{secs:02d}" diff --git a/auditui/library/client_extract.py b/auditui/library/client_extract.py new file mode 100644 index 0000000..8c0d76e --- /dev/null +++ b/auditui/library/client_extract.py @@ -0,0 +1,84 @@ +"""Metadata extraction helpers for library items.""" + +from __future__ import annotations + +from ..types import LibraryItem + + +class LibraryClientExtractMixin: + """Extracts display and status fields from library items.""" + + def extract_title(self, item: LibraryItem) -> str: + """Return the book title from a library item.""" + product = item.get("product", {}) + return ( + product.get("title") + or item.get("title") + or product.get("asin", "Unknown Title") + ) + + def extract_authors(self, item: LibraryItem) -> str: + """Return comma-separated author names from a library item.""" + product = item.get("product", {}) + authors = product.get("authors") or product.get("contributors") or [] + if not authors and "authors" in item: + authors = item.get("authors", []) + author_names = [ + author.get("name", "") for author in authors if isinstance(author, dict) + ] + return ", ".join(author_names) or "Unknown" + + def extract_runtime_minutes(self, item: LibraryItem) -> int | None: + """Return runtime in minutes if present.""" + product = item.get("product", {}) + runtime_fields = [ + "runtime_length_min", + "runtime_length", + "vLength", + "length", + "duration", + ] + + runtime = None + for field in runtime_fields: + runtime = product.get(field) or item.get(field) + if runtime is not None: + break + + if runtime is None: + return None + if isinstance(runtime, dict): + return int(runtime.get("min", 0)) + if isinstance(runtime, (int, float)): + return int(runtime) + return None + + def extract_progress_info(self, item: LibraryItem) -> float | None: + """Return progress percentage (0-100) if present.""" + percent_complete = item.get("percent_complete") + listening_status = item.get("listening_status", {}) + if isinstance(listening_status, dict) and percent_complete is None: + percent_complete = listening_status.get("percent_complete") + return float(percent_complete) if percent_complete is not None else None + + def extract_asin(self, item: LibraryItem) -> str | None: + """Return the ASIN for a library item.""" + product = item.get("product", {}) + return item.get("asin") or product.get("asin") + + def is_finished(self, item: LibraryItem) -> bool: + """Return True if the item is marked or inferred as finished.""" + is_finished_flag = item.get("is_finished") + percent_complete = item.get("percent_complete") + listening_status = item.get("listening_status") + + if isinstance(listening_status, dict): + is_finished_flag = is_finished_flag or listening_status.get( + "is_finished", False + ) + if percent_complete is None: + percent_complete = listening_status.get("percent_complete", 0) + + return bool(is_finished_flag) or ( + isinstance(percent_complete, (int, float)) and percent_complete >= 100 + ) diff --git a/auditui/library/client_fetch.py b/auditui/library/client_fetch.py new file mode 100644 index 0000000..4294aa5 --- /dev/null +++ b/auditui/library/client_fetch.py @@ -0,0 +1,127 @@ +"""Library page fetching helpers for the Audible API client.""" + +from __future__ import annotations + +from concurrent.futures import ThreadPoolExecutor, as_completed + +from typing import Any + +from ..types import LibraryItem, StatusCallback + + +class LibraryClientFetchMixin: + """Fetches all library items from paginated Audible endpoints.""" + + client: Any + + def fetch_all_items( + self, on_progress: StatusCallback | None = None + ) -> list[LibraryItem]: + """Fetch all library items from the API.""" + response_groups = ( + "contributors,media,product_attrs,product_desc,product_details," + "is_finished,listening_status,percent_complete" + ) + return self._fetch_all_pages(response_groups, on_progress) + + def _fetch_page( + self, + page: int, + page_size: int, + response_groups: str, + ) -> tuple[int, list[LibraryItem]]: + """Fetch one library page and return its index with items.""" + library = self.client.get( + path="library", + num_results=page_size, + page=page, + response_groups=response_groups, + ) + items = library.get("items", []) + return page, list(items) + + def _fetch_all_pages( + self, + response_groups: str, + on_progress: StatusCallback | None = None, + ) -> list[LibraryItem]: + """Fetch all library pages using parallel requests after page one.""" + library_response = None + page_size = 200 + + for attempt_size in [200, 100, 50]: + try: + library_response = self.client.get( + path="library", + num_results=attempt_size, + page=1, + response_groups=response_groups, + ) + page_size = attempt_size + break + except Exception: + continue + + if not library_response: + return [] + + first_page_items = library_response.get("items", []) + if not first_page_items: + return [] + + all_items: list[LibraryItem] = list(first_page_items) + if on_progress: + on_progress(f"Fetched page 1 ({len(first_page_items)} items)...") + + if len(first_page_items) < page_size: + return all_items + + total_items_estimate = library_response.get( + "total_results" + ) or library_response.get("total") + if total_items_estimate: + estimated_pages = (total_items_estimate + page_size - 1) // page_size + estimated_pages = min(estimated_pages, 1000) + else: + estimated_pages = 500 + + max_workers = 50 + page_results: dict[int, list[LibraryItem]] = {} + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_page: dict = {} + + for page in range(2, estimated_pages + 1): + future = executor.submit( + self._fetch_page, page, page_size, response_groups + ) + future_to_page[future] = page + + completed_count = 0 + total_items = len(first_page_items) + + for future in as_completed(future_to_page): + try: + fetched_page, items = future.result() + future_to_page.pop(future, None) + if not items or len(items) < page_size: + for remaining_future in list(future_to_page.keys()): + remaining_future.cancel() + break + + page_results[fetched_page] = items + total_items += len(items) + completed_count += 1 + if on_progress and completed_count % 20 == 0: + on_progress( + f"Fetched {completed_count} pages ({total_items} items)..." + ) + + except Exception: + future_to_page.pop(future, None) + pass + + for page_num in sorted(page_results.keys()): + all_items.extend(page_results[page_num]) + + return all_items diff --git a/auditui/library/client_finished.py b/auditui/library/client_finished.py new file mode 100644 index 0000000..82848c3 --- /dev/null +++ b/auditui/library/client_finished.py @@ -0,0 +1,70 @@ +"""Helpers for marking content as finished through Audible APIs.""" + +from __future__ import annotations + +from typing import Any + +from ..types import LibraryItem + + +class LibraryClientFinishedMixin: + """Marks titles as finished and mutates in-memory item state.""" + + client: Any + + def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool: + """Mark a book as finished on Audible and optionally update item state.""" + total_ms = self._get_runtime_ms(asin, item) + if not total_ms: + return False + + acr = self._get_acr(asin) + if not acr: + return False + + try: + self.client.put( + path=f"1.0/lastpositions/{asin}", + body={"asin": asin, "acr": acr, "position_ms": total_ms}, + ) + if item: + item["is_finished"] = True + listening_status = item.get("listening_status", {}) + if isinstance(listening_status, dict): + listening_status["is_finished"] = True + return True + except Exception: + return False + + def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None: + """Return total runtime in milliseconds from item or metadata endpoint.""" + if item: + extract_runtime_minutes = getattr(self, "extract_runtime_minutes") + runtime_min = extract_runtime_minutes(item) + if runtime_min: + return runtime_min * 60 * 1000 + + try: + response = self.client.get( + path=f"1.0/content/{asin}/metadata", + response_groups="chapter_info", + ) + chapter_info = response.get("content_metadata", {}).get("chapter_info", {}) + return chapter_info.get("runtime_length_ms") + except Exception: + return None + + def _get_acr(self, asin: str) -> str | None: + """Fetch the ACR token required by finish/update write operations.""" + try: + response = self.client.post( + path=f"1.0/content/{asin}/licenserequest", + body={ + "response_groups": "content_reference", + "consumption_type": "Download", + "drm_type": "Adrm", + }, + ) + return response.get("content_license", {}).get("acr") + except Exception: + return None diff --git a/auditui/library/client_format.py b/auditui/library/client_format.py new file mode 100644 index 0000000..c4dc422 --- /dev/null +++ b/auditui/library/client_format.py @@ -0,0 +1,37 @@ +"""Formatting helpers exposed by the library client.""" + +from __future__ import annotations + + +class LibraryClientFormatMixin: + """Formats durations and timestamps for display usage.""" + + @staticmethod + def format_duration( + value: int | None, + unit: str = "minutes", + default_none: str | None = None, + ) -> str | None: + """Format duration values as compact hour-minute strings.""" + if value is None or value <= 0: + return default_none + + total_minutes = int(value) + if unit == "seconds": + total_minutes //= 60 + + hours, minutes = divmod(total_minutes, 60) + if hours > 0: + return f"{hours}h{minutes:02d}" if minutes else f"{hours}h" + return f"{minutes}m" + + @staticmethod + def format_time(seconds: float) -> str: + """Format seconds as HH:MM:SS or MM:SS for display.""" + total_seconds = int(seconds) + hours = total_seconds // 3600 + minutes = (total_seconds % 3600) // 60 + secs = total_seconds % 60 + if hours > 0: + return f"{hours:02d}:{minutes:02d}:{secs:02d}" + return f"{minutes:02d}:{secs:02d}" diff --git a/auditui/library/client_positions.py b/auditui/library/client_positions.py new file mode 100644 index 0000000..0c3b7c4 --- /dev/null +++ b/auditui/library/client_positions.py @@ -0,0 +1,85 @@ +"""Playback position read and write helpers for library content.""" + +from __future__ import annotations + +from typing import Any + + +class LibraryClientPositionsMixin: + """Handles last-position retrieval and persistence.""" + + client: Any + + def get_last_position(self, asin: str) -> float | None: + """Get the last playback position for a book in seconds.""" + try: + response = self.client.get( + path="1.0/annotations/lastpositions", + asins=asin, + ) + annotations = response.get("asin_last_position_heard_annots", []) + for annotation in annotations: + if annotation.get("asin") != asin: + continue + last_position_heard = annotation.get("last_position_heard", {}) + if not isinstance(last_position_heard, dict): + continue + if last_position_heard.get("status") == "DoesNotExist": + return None + position_ms = last_position_heard.get("position_ms") + if position_ms is not None: + return float(position_ms) / 1000.0 + return None + except (OSError, ValueError, KeyError): + return None + + def _get_content_reference(self, asin: str) -> dict | None: + """Fetch content reference payload used by position update calls.""" + try: + response = self.client.get( + path=f"1.0/content/{asin}/metadata", + response_groups="content_reference", + ) + content_metadata = response.get("content_metadata", {}) + content_reference = content_metadata.get("content_reference", {}) + if isinstance(content_reference, dict): + return content_reference + return None + except (OSError, ValueError, KeyError): + return None + + def _update_position(self, asin: str, position_seconds: float) -> bool: + """Persist playback position to the API and return success state.""" + if position_seconds < 0: + return False + + content_ref = self._get_content_reference(asin) + if not content_ref: + return False + + acr = content_ref.get("acr") + if not acr: + return False + + body = { + "acr": acr, + "asin": asin, + "position_ms": int(position_seconds * 1000), + } + if version := content_ref.get("version"): + body["version"] = version + + try: + self.client.put( + path=f"1.0/lastpositions/{asin}", + body=body, + ) + return True + except (OSError, ValueError, KeyError): + return False + + def save_last_position(self, asin: str, position_seconds: float) -> bool: + """Save playback position to Audible and return success state.""" + if position_seconds <= 0: + return False + return self._update_position(asin, position_seconds)