Compare commits

...

17 Commits

Author SHA1 Message Date
1e6f1544db Merge pull request 'Massive refactoring' (#1) from new-architecture into main
Reviewed-on: #1
2026-02-18 04:29:20 +01:00
26cba97cbd chore(changelog): document load-time tuning and duplicate row key fix 2026-02-18 04:28:35 +01:00
175bb7cbdc test(app): add coverage for unique table row key generation 2026-02-18 04:28:28 +01:00
bf0e70e9d9 fix(table): use unique row keys to avoid duplicate title crashes 2026-02-18 04:28:20 +01:00
cb4104e59a perf(app): remove eager search cache priming during library load 2026-02-18 04:28:13 +01:00
570639e988 perf(library): tune first-page probe order for faster medium-library loads 2026-02-18 04:28:06 +01:00
5ba0fafbc1 chore: update changelog 2026-02-18 04:20:56 +01:00
bed0ac4fea test(downloads): add regression coverage for too-small download retry behavior 2026-02-18 04:19:39 +01:00
0a909484e3 fix(downloads): retry undersized downloads and surface precise size failure messages 2026-02-18 04:19:33 +01:00
ecdd953ff4 refactor(downloads): split download streaming into focused helpers and reduce complexity 2026-02-18 04:12:54 +01:00
4ba2c43c93 clean: remove unused import 2026-02-18 04:09:04 +01:00
4b1924edd8 chore: update changelog 2026-02-18 04:08:41 +01:00
da20e84513 docs: update readme 2026-02-18 04:02:20 +01:00
dcb43f65dd chore: fix wording 2026-02-18 03:57:51 +01:00
beca8ee085 perf(library): optimize paginated fetch with bounded concurrent scheduling 2026-02-18 03:56:44 +01:00
e813267d5e refactor(library): decompose monolithic LibraryClient into fetch/extract/positions/finished/format mixins while preserving public behavior 2026-02-18 03:54:16 +01:00
eca58423dc refactor(constants): split constants into domain modules with compatibility exports 2026-02-18 03:44:26 +01:00
21 changed files with 964 additions and 685 deletions

View File

@@ -11,11 +11,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- massive code refactoring
- complete test suite revamp
- updated download cache naming to use `Author_Title` format with normalized separators.
- updated download cache naming to use `Author_Title` format with normalized separators
- optimized library pagination fetch with bounded concurrent scheduling
- adjusted library first-page probe order to prefer larger page sizes for medium libraries
- removed eager search cache priming during library load to reduce startup work
### Fixed
- reused library metadata for download filename generation to avoid `Unknown-Author_Unknown-Title` when title/author are already known in the UI.
- reused library metadata for download filename generation to avoid `Unknown-Author_Unknown-Title` when title/author are already known in the UI
- fixed Audible last-position request parameter handling after library client refactor
- added retry behavior and explicit size diagnostics when downloaded files are too small
- prevented table rendering crashes by generating unique row keys instead of using title-only keys
## [0.1.6] - 2026-02-16

View File

@@ -2,7 +2,7 @@
A terminal-based user interface (TUI) client for [Audible](https://www.audible.fr/), written in Python 3.
Currently, the only available theme is Catppuccin Mocha, following their [style guide](https://github.com/catppuccin/catppuccin/blob/main/docs/style-guide.md), as it's my preferred theme across most of my tools.
The interface currently ships with a single built-in theme.
## Requirements
@@ -36,9 +36,9 @@ auditui --version
All set, run `auditui configure` to set up authentication, and then `auditui` to start the TUI.
### Workaround for Python 3.13 linux distribution
### Workaround for Python 3.13 Linux distributions
On some Linux distributions, Python 3.13 is already the default. So you have to install Python 3.12 manually before using `pipx`.
On some Linux distributions, Python 3.13 is already the default. In that case, install Python 3.12 manually before using `pipx`.
For Arch Linux:
@@ -52,7 +52,7 @@ Once you have Python 3.12, run:
pipx install git+https://git.kharec.info/Kharec/auditui.git --python python3.12
```
As Python <3.14 is supported on `master` branch of the upstream [`audible`](https://github.com/mkb79/Audible), this should be temporary until the next version.
This workaround is temporary and depends on upstream `audible` compatibility updates.
## Upgrade
@@ -90,6 +90,8 @@ pipx upgrade auditui
Books are downloaded to `~/.cache/auditui/books`.
Downloaded files use a normalized `Author_Title.aax` naming format. For example, `Stephen King` and `11/22/63` become `Stephen-King_11-22-63.aax`.
The `d` key toggles the download state for the selected book: if the book is not cached, pressing `d` will download it; if it's already cached, pressing `d` will delete it from the cache.
To check the total size of your cache:

View File

@@ -8,7 +8,7 @@ from textual.events import Resize
from textual.widgets import DataTable, ProgressBar, Static
from .. import __version__
from ..constants import TABLE_COLUMN_DEFS, TABLE_CSS
from ..constants import TABLE_COLUMN_DEFS
class AppLayoutMixin:

View File

@@ -16,16 +16,15 @@ class AppLibraryMixin:
return
try:
all_items = self.library_client.fetch_all_items(
self._thread_status_update)
all_items = self.library_client.fetch_all_items(self._thread_status_update)
self.call_from_thread(self.on_library_loaded, all_items)
except (OSError, ValueError, KeyError) as exc:
self.call_from_thread(self.on_library_error, str(exc))
def on_library_loaded(self, items: list[LibraryItem]) -> None:
"""Store fetched items and refresh the active library view."""
self.all_items = items
self._search_text_cache.clear()
self._prime_search_cache(items)
self.update_status(f"Loaded {len(items)} books")
if self.show_all_mode:
self.show_all()

View File

@@ -15,6 +15,7 @@ from textual.widgets import DataTable, Static
class AppTableMixin:
def _populate_table(self, items: list[LibraryItem]) -> None:
"""Render library items into the table with stable unique row keys."""
table = self.query_one("#library_table", DataTable)
table.clear()
@@ -22,18 +23,41 @@ class AppTableMixin:
self.update_status("No books found.")
return
for item in items:
used_keys: set[str] = set()
for index, item in enumerate(items):
title, author, runtime, progress, downloaded = format_item_as_row(
item, self.library_client, self.download_manager
)
table.add_row(title, author, runtime,
progress, downloaded, key=title)
row_key = self._build_row_key(item, title, index, used_keys)
table.add_row(title, author, runtime, progress, downloaded, key=row_key)
self.current_items = items
status = self.query_one("#status", Static)
status.display = False
self._apply_column_widths(table)
def _build_row_key(
self,
item: LibraryItem,
title: str,
index: int,
used_keys: set[str],
) -> str:
"""Return a unique table row key derived from ASIN when available."""
asin = self.library_client.extract_asin(item) if self.library_client else None
base_key = asin or f"{title}#{index}"
if base_key not in used_keys:
used_keys.add(base_key)
return base_key
suffix = 2
candidate = f"{base_key}#{suffix}"
while candidate in used_keys:
suffix += 1
candidate = f"{base_key}#{suffix}"
used_keys.add(candidate)
return candidate
def _refresh_table(self) -> None:
if self.current_items:
self._populate_table(self.current_items)
@@ -79,11 +103,9 @@ class AppTableMixin:
items = self.all_items
if self.filter_text:
items = filter_items(items, self.filter_text,
self._get_search_text)
items = filter_items(items, self.filter_text, self._get_search_text)
self._populate_table(items)
self.update_status(
f"Filter: '{self.filter_text}' ({len(items)} books)")
self.update_status(f"Filter: '{self.filter_text}' ({len(items)} books)")
return
if not self.show_all_mode and self.library_client:
@@ -97,6 +119,7 @@ class AppTableMixin:
if cached is not None:
return cached
from ..library import build_search_text
search_text = build_search_text(item, self.library_client)
self._search_text_cache[cache_key] = search_text
return search_text

View File

@@ -1,278 +1,29 @@
"""Paths, API/config values, and CSS used across the application."""
"""Compatibility exports for constants grouped by domain modules."""
from pathlib import Path
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
CONFIG_PATH = Path.home() / ".config" / "auditui" / "config.json"
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
DEFAULT_CODEC = "LC_128_44100_stereo"
MIN_FILE_SIZE = 1024 * 1024
DEFAULT_CHUNK_SIZE = 8192
TABLE_COLUMN_DEFS = (
("Title", 4),
("Author", 3),
("Length", 1),
("Progress", 1),
("Downloaded", 1),
from .downloads import DEFAULT_CHUNK_SIZE, DEFAULT_CODEC, DOWNLOAD_URL, MIN_FILE_SIZE
from .library import (
AUTHOR_NAME_DISPLAY_LENGTH,
AUTHOR_NAME_MAX_LENGTH,
PROGRESS_COLUMN_INDEX,
)
from .paths import AUTH_PATH, CACHE_DIR, CONFIG_PATH
from .playback import SEEK_SECONDS
from .table import TABLE_COLUMN_DEFS
from .ui import TABLE_CSS
AUTHOR_NAME_MAX_LENGTH = 40
AUTHOR_NAME_DISPLAY_LENGTH = 37
PROGRESS_COLUMN_INDEX = 3
SEEK_SECONDS = 30.0
TABLE_CSS = """
Screen {
background: #141622;
}
#top_bar {
background: #10131f;
color: #d5d9f0;
text-style: bold;
height: 1;
margin: 0;
padding: 0;
}
#top_left,
#top_center,
#top_right {
width: 1fr;
padding: 0 1;
background: #10131f;
margin: 0;
}
#top_left {
text-align: left;
}
#top_center {
text-align: center;
}
#top_right {
text-align: right;
}
DataTable {
width: 100%;
height: 1fr;
background: #141622;
color: #c7cfe8;
border: solid #262a3f;
scrollbar-size-horizontal: 0;
}
DataTable:focus {
border: solid #7aa2f7;
}
DataTable > .datatable--header {
background: #1b2033;
color: #b9c3e3;
text-style: bold;
}
DataTable > .datatable--cursor {
background: #232842;
color: #e6ebff;
}
DataTable > .datatable--odd-row {
background: #121422;
}
DataTable > .datatable--even-row {
background: #15182a;
}
Static {
height: 1;
text-align: center;
background: #10131f;
color: #c7cfe8;
}
Static#status {
color: #b6bfdc;
}
Static#progress_info {
color: #7aa2f7;
text-style: bold;
margin: 0;
padding: 0;
text-align: center;
width: 100%;
}
#progress_bar_container {
align: center middle;
width: 100%;
height: 1;
}
ProgressBar#progress_bar {
height: 1;
background: #10131f;
border: none;
margin: 0;
padding: 0;
width: 50%;
}
ProgressBar#progress_bar Bar {
width: 100%;
}
ProgressBar#progress_bar > .progress-bar--track {
background: #262a3f;
}
ProgressBar#progress_bar > .progress-bar--bar {
background: #8bd5ca;
}
HelpScreen,
StatsScreen,
FilterScreen {
align: center middle;
background: rgba(0, 0, 0, 0.7);
}
HelpScreen Static,
StatsScreen Static,
FilterScreen Static {
background: transparent;
}
StatsScreen #help_container {
width: auto;
min-width: 55;
max-width: 70;
}
StatsScreen #help_content {
align: center middle;
width: 100%;
}
StatsScreen .help_list {
width: 100%;
}
StatsScreen .help_list > ListItem {
background: transparent;
height: 1;
}
StatsScreen .help_list > ListItem:hover {
background: #232842;
}
StatsScreen .help_list > ListItem > Label {
width: 100%;
text-align: left;
padding-left: 2;
}
#help_container {
width: 72%;
max-width: 90;
min-width: 44;
height: auto;
max-height: 80%;
min-height: 14;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 1;
}
#help_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 0;
border-bottom: solid #4b5165;
}
#help_content {
width: 100%;
height: auto;
padding: 0;
margin: 0 0 1 0;
align: center middle;
}
.help_list {
width: 100%;
height: auto;
background: transparent;
padding: 0;
scrollbar-size: 0 0;
}
.help_list > ListItem {
background: #1b1f33;
padding: 0 1;
height: 1;
}
.help_list > ListItem:hover {
background: #2a2f45;
}
.help_list > ListItem > Label {
width: 100%;
padding: 0;
}
#help_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 0;
border-top: solid #4b5165;
}
#filter_container {
width: 60;
height: auto;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 2;
}
#filter_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 1;
}
#filter_input {
width: 100%;
margin: 1 0;
}
#filter_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 1;
}
"""
__all__ = [
"AUTH_PATH",
"CONFIG_PATH",
"CACHE_DIR",
"DOWNLOAD_URL",
"DEFAULT_CODEC",
"MIN_FILE_SIZE",
"DEFAULT_CHUNK_SIZE",
"TABLE_COLUMN_DEFS",
"AUTHOR_NAME_MAX_LENGTH",
"AUTHOR_NAME_DISPLAY_LENGTH",
"PROGRESS_COLUMN_INDEX",
"SEEK_SECONDS",
"TABLE_CSS",
]

View File

@@ -0,0 +1,6 @@
"""Download-related constants for Audible file retrieval."""
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
DEFAULT_CODEC = "LC_128_44100_stereo"
MIN_FILE_SIZE = 1024 * 1024
DEFAULT_CHUNK_SIZE = 8192

View File

@@ -0,0 +1,5 @@
"""Library and table formatting constants."""
AUTHOR_NAME_MAX_LENGTH = 40
AUTHOR_NAME_DISPLAY_LENGTH = 37
PROGRESS_COLUMN_INDEX = 3

View File

@@ -0,0 +1,8 @@
"""Filesystem paths used by configuration and caching."""
from pathlib import Path
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
CONFIG_PATH = Path.home() / ".config" / "auditui" / "config.json"
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"

View File

@@ -0,0 +1,3 @@
"""Playback behavior constants."""
SEEK_SECONDS = 30.0

View File

@@ -0,0 +1,9 @@
"""Main library table column definitions."""
TABLE_COLUMN_DEFS = (
("Title", 4),
("Author", 3),
("Length", 1),
("Progress", 1),
("Downloaded", 1),
)

255
auditui/constants/ui.py Normal file
View File

@@ -0,0 +1,255 @@
"""Textual CSS constants for the application UI."""
TABLE_CSS = """
Screen {
background: #141622;
}
#top_bar {
background: #10131f;
color: #d5d9f0;
text-style: bold;
height: 1;
margin: 0;
padding: 0;
}
#top_left,
#top_center,
#top_right {
width: 1fr;
padding: 0 1;
background: #10131f;
margin: 0;
}
#top_left {
text-align: left;
}
#top_center {
text-align: center;
}
#top_right {
text-align: right;
}
DataTable {
width: 100%;
height: 1fr;
background: #141622;
color: #c7cfe8;
border: solid #262a3f;
scrollbar-size-horizontal: 0;
}
DataTable:focus {
border: solid #7aa2f7;
}
DataTable > .datatable--header {
background: #1b2033;
color: #b9c3e3;
text-style: bold;
}
DataTable > .datatable--cursor {
background: #232842;
color: #e6ebff;
}
DataTable > .datatable--odd-row {
background: #121422;
}
DataTable > .datatable--even-row {
background: #15182a;
}
Static {
height: 1;
text-align: center;
background: #10131f;
color: #c7cfe8;
}
Static#status {
color: #b6bfdc;
}
Static#progress_info {
color: #7aa2f7;
text-style: bold;
margin: 0;
padding: 0;
text-align: center;
width: 100%;
}
#progress_bar_container {
align: center middle;
width: 100%;
height: 1;
}
ProgressBar#progress_bar {
height: 1;
background: #10131f;
border: none;
margin: 0;
padding: 0;
width: 50%;
}
ProgressBar#progress_bar Bar {
width: 100%;
}
ProgressBar#progress_bar > .progress-bar--track {
background: #262a3f;
}
ProgressBar#progress_bar > .progress-bar--bar {
background: #8bd5ca;
}
HelpScreen,
StatsScreen,
FilterScreen {
align: center middle;
background: rgba(0, 0, 0, 0.7);
}
HelpScreen Static,
StatsScreen Static,
FilterScreen Static {
background: transparent;
}
StatsScreen #help_container {
width: auto;
min-width: 55;
max-width: 70;
}
StatsScreen #help_content {
align: center middle;
width: 100%;
}
StatsScreen .help_list {
width: 100%;
}
StatsScreen .help_list > ListItem {
background: transparent;
height: 1;
}
StatsScreen .help_list > ListItem:hover {
background: #232842;
}
StatsScreen .help_list > ListItem > Label {
width: 100%;
text-align: left;
padding-left: 2;
}
#help_container {
width: 72%;
max-width: 90;
min-width: 44;
height: auto;
max-height: 80%;
min-height: 14;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 1;
}
#help_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 0;
border-bottom: solid #4b5165;
}
#help_content {
width: 100%;
height: auto;
padding: 0;
margin: 0 0 1 0;
align: center middle;
}
.help_list {
width: 100%;
height: auto;
background: transparent;
padding: 0;
scrollbar-size: 0 0;
}
.help_list > ListItem {
background: #1b1f33;
padding: 0 1;
height: 1;
}
.help_list > ListItem:hover {
background: #2a2f45;
}
.help_list > ListItem > Label {
width: 100%;
padding: 0;
}
#help_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 0;
border-top: solid #4b5165;
}
#filter_container {
width: 60;
height: auto;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 2;
}
#filter_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 1;
}
#filter_input {
width: 100%;
margin: 1 0;
}
#filter_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 1;
}
"""

View File

@@ -3,6 +3,7 @@
import re
import unicodedata
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import audible
@@ -30,7 +31,7 @@ class DownloadManager:
chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> None:
self.auth = auth
self.client = client
self.client: Any = client
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size
@@ -63,28 +64,61 @@ class DownloadManager:
if notify:
notify(f"Downloading to {local_path.name}...")
if not self._download_to_valid_file(asin, local_path, notify):
return None
return local_path
def _download_to_valid_file(
self,
asin: str,
local_path: Path,
notify: StatusCallback | None = None,
) -> bool:
"""Download with one retry and ensure resulting file has a valid size."""
for attempt in range(1, 3):
if not self._attempt_download(asin, local_path, notify):
return False
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return True
downloaded_size = local_path.stat().st_size if local_path.exists() else 0
if notify and attempt == 1:
notify(
f"Downloaded file too small ({downloaded_size} bytes), retrying..."
)
if notify and attempt == 2:
notify(
f"Download failed: file too small ({downloaded_size} bytes, expected >= {MIN_FILE_SIZE})"
)
self._cleanup_partial_file(local_path)
return False
def _attempt_download(
self,
asin: str,
local_path: Path,
notify: StatusCallback | None = None,
) -> bool:
"""Perform one download attempt including link lookup and URL validation."""
dl_link = self._get_download_link(asin, notify=notify)
if not dl_link:
if notify:
notify("Failed to get download link")
return None
return False
if not self._validate_download_url(dl_link):
if notify:
notify("Invalid download URL")
return None
return False
if not self._download_file(dl_link, local_path, notify):
if notify:
notify("Download failed")
return None
return False
if not local_path.exists() or local_path.stat().st_size < MIN_FILE_SIZE:
if notify:
notify("Download failed or file too small")
return None
return local_path
return True
def get_activation_bytes(self) -> str | None:
"""Return activation bytes as hex string for ffplay/ffmpeg."""
@@ -244,19 +278,7 @@ class DownloadManager:
with self._download_client.stream("GET", url) as response:
response.raise_for_status()
total_size = int(response.headers.get("content-length", 0))
downloaded = 0
with open(dest_path, "wb") as file_handle:
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
file_handle.write(chunk)
downloaded += len(chunk)
if total_size > 0 and notify:
percent = (downloaded / total_size) * 100
downloaded_mb = downloaded / (1024 * 1024)
total_mb = total_size / (1024 * 1024)
notify(
f"Downloading: {percent:.1f}% ({downloaded_mb:.1f}/{total_mb:.1f} MB)"
)
self._stream_to_file(response, dest_path, total_size, notify)
return dest_path
except httpx.HTTPStatusError as exc:
@@ -264,31 +286,56 @@ class DownloadManager:
notify(
f"Download HTTP error: {exc.response.status_code} {exc.response.reason_phrase}"
)
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
self._cleanup_partial_file(dest_path)
return None
except httpx.HTTPError as exc:
if notify:
notify(f"Download network error: {exc!s}")
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
self._cleanup_partial_file(dest_path)
return None
except (OSError, ValueError, KeyError) as exc:
if notify:
notify(f"Download error: {exc!s}")
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
self._cleanup_partial_file(dest_path)
return None
def _stream_to_file(
self,
response: httpx.Response,
dest_path: Path,
total_size: int,
notify: StatusCallback | None = None,
) -> None:
"""Write streamed response bytes to disk and emit progress messages."""
downloaded = 0
with open(dest_path, "wb") as file_handle:
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
file_handle.write(chunk)
downloaded += len(chunk)
self._notify_download_progress(downloaded, total_size, notify)
def _notify_download_progress(
self,
downloaded: int,
total_size: int,
notify: StatusCallback | None = None,
) -> None:
"""Emit a formatted progress message when total size is known."""
if total_size <= 0 or not notify:
return
percent = (downloaded / total_size) * 100
downloaded_mb = downloaded / (1024 * 1024)
total_mb = total_size / (1024 * 1024)
notify(f"Downloading: {percent:.1f}% ({downloaded_mb:.1f}/{total_mb:.1f} MB)")
def _cleanup_partial_file(self, dest_path: Path) -> None:
"""Remove undersized partial download files after transfer failures."""
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
return
def close(self) -> None:
"""Close internal HTTP clients. Safe to call multiple times."""
if hasattr(self, "_http_client"):

View File

@@ -1,365 +1,25 @@
"""Client for the Audible library API."""
"""Client facade for Audible library fetch, extraction, and progress updates."""
from concurrent.futures import ThreadPoolExecutor, as_completed
from __future__ import annotations
import audible
from ..types import LibraryItem, StatusCallback
from .client_extract import LibraryClientExtractMixin
from .client_fetch import LibraryClientFetchMixin
from .client_finished import LibraryClientFinishedMixin
from .client_format import LibraryClientFormatMixin
from .client_positions import LibraryClientPositionsMixin
class LibraryClient:
"""Client for the Audible library API. Fetches items, extracts metadata, and updates positions."""
class LibraryClient(
LibraryClientFetchMixin,
LibraryClientExtractMixin,
LibraryClientPositionsMixin,
LibraryClientFinishedMixin,
LibraryClientFormatMixin,
):
"""Audible library client composed from focused behavior mixins."""
def __init__(self, client: audible.Client) -> None:
"""Store authenticated Audible client used by all operations."""
self.client = client
def fetch_all_items(self, on_progress: StatusCallback | None = None) -> list[LibraryItem]:
"""Fetch all library items from the API."""
response_groups = (
"contributors,media,product_attrs,product_desc,product_details,"
"is_finished,listening_status,percent_complete"
)
return self._fetch_all_pages(response_groups, on_progress)
def _fetch_page(
self, page: int, page_size: int, response_groups: str
) -> tuple[int, list[LibraryItem]]:
"""Fetch a single page of library items from the API."""
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = library.get("items", [])
return page, list(items)
def _fetch_all_pages(
self, response_groups: str, on_progress: StatusCallback | None = None
) -> list[LibraryItem]:
"""Fetch all pages of library items using parallel requests."""
library_response = None
page_size = 200
for attempt_size in [200, 100, 50]:
try:
library_response = self.client.get(
path="library",
num_results=attempt_size,
page=1,
response_groups=response_groups,
)
page_size = attempt_size
break
except Exception:
continue
if not library_response:
return []
first_page_items = library_response.get("items", [])
if not first_page_items:
return []
all_items: list[LibraryItem] = list(first_page_items)
if on_progress:
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
if len(first_page_items) < page_size:
return all_items
total_items_estimate = library_response.get(
"total_results") or library_response.get("total")
if total_items_estimate:
estimated_pages = (total_items_estimate +
page_size - 1) // page_size
estimated_pages = min(estimated_pages, 1000)
else:
estimated_pages = 500
max_workers = 50
page_results: dict[int, list[LibraryItem]] = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page: dict = {}
for page in range(2, estimated_pages + 1):
future = executor.submit(
self._fetch_page, page, page_size, response_groups
)
future_to_page[future] = page
completed_count = 0
total_items = len(first_page_items)
for future in as_completed(future_to_page):
page_num = future_to_page.pop(future)
try:
fetched_page, items = future.result()
if not items or len(items) < page_size:
for remaining_future in list(future_to_page.keys()):
remaining_future.cancel()
break
page_results[fetched_page] = items
total_items += len(items)
completed_count += 1
if on_progress and completed_count % 20 == 0:
on_progress(
f"Fetched {completed_count} pages ({total_items} items)..."
)
except Exception:
pass
for page_num in sorted(page_results.keys()):
all_items.extend(page_results[page_num])
return all_items
def extract_title(self, item: LibraryItem) -> str:
"""Return the book title from a library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def extract_authors(self, item: LibraryItem) -> str:
"""Return comma-separated author names from a library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [a.get("name", "")
for a in authors if isinstance(a, dict)]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
"""Return runtime in minutes if present."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
if isinstance(runtime, (int, float)):
return int(runtime)
return None
def extract_progress_info(self, item: LibraryItem) -> float | None:
"""Return progress percentage (0–100) if present."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: LibraryItem) -> str | None:
"""Return the ASIN for a library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def is_finished(self, item: LibraryItem) -> bool:
"""Return True if the item is marked or inferred as finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float))
and percent_complete >= 100
)
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annot in annotations:
if annot.get("asin") != asin:
continue
last_position_heard = annot.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Fetch content reference (ACR and version) for position updates."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def _update_position(self, asin: str, position_seconds: float) -> bool:
"""Persist playback position to the API. Returns True on success."""
if position_seconds < 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save playback position to Audible. Returns True on success."""
if position_seconds <= 0:
return False
return self._update_position(asin, position_seconds)
@staticmethod
def format_duration(
value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
"""Format a duration value as e.g. 2h30m or 45m."""
if value is None or value <= 0:
return default_none
total_minutes = int(value)
if unit == "seconds":
total_minutes //= 60
hours, minutes = divmod(total_minutes, 60)
if hours > 0:
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
return f"{minutes}m"
def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
"""Mark a book as finished on Audible. Optionally mutates item in place."""
total_ms = self._get_runtime_ms(asin, item)
if not total_ms:
return False
position_ms = total_ms
acr = self._get_acr(asin)
if not acr:
return False
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body={"asin": asin, "acr": acr, "position_ms": position_ms},
)
if item:
item["is_finished"] = True
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict):
listening_status["is_finished"] = True
return True
except Exception:
return False
def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
"""Return total runtime in ms from item or API."""
if item:
runtime_min = self.extract_runtime_minutes(item)
if runtime_min:
return runtime_min * 60 * 1000
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="chapter_info",
)
chapter_info = response.get(
"content_metadata", {}).get("chapter_info", {})
return chapter_info.get("runtime_length_ms")
except Exception:
return None
def _get_acr(self, asin: str) -> str | None:
"""Fetch ACR token required for position and finish updates."""
try:
response = self.client.post(
path=f"1.0/content/{asin}/licenserequest",
body={
"response_groups": "content_reference",
"consumption_type": "Download",
"drm_type": "Adrm",
},
)
return response.get("content_license", {}).get("acr")
except Exception:
return None
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS for display."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

View File

@@ -0,0 +1,84 @@
"""Metadata extraction helpers for library items."""
from __future__ import annotations
from ..types import LibraryItem
class LibraryClientExtractMixin:
"""Extracts display and status fields from library items."""
def extract_title(self, item: LibraryItem) -> str:
"""Return the book title from a library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def extract_authors(self, item: LibraryItem) -> str:
"""Return comma-separated author names from a library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [
author.get("name", "") for author in authors if isinstance(author, dict)
]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
"""Return runtime in minutes if present."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
if isinstance(runtime, (int, float)):
return int(runtime)
return None
def extract_progress_info(self, item: LibraryItem) -> float | None:
"""Return progress percentage (0-100) if present."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: LibraryItem) -> str | None:
"""Return the ASIN for a library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def is_finished(self, item: LibraryItem) -> bool:
"""Return True if the item is marked or inferred as finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float)) and percent_complete >= 100
)

View File

@@ -0,0 +1,165 @@
"""Library page fetching helpers for the Audible API client."""
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
from ..types import LibraryItem, StatusCallback
class LibraryClientFetchMixin:
"""Fetches all library items from paginated Audible endpoints."""
client: Any
def fetch_all_items(
self, on_progress: StatusCallback | None = None
) -> list[LibraryItem]:
"""Fetch all library items from the API."""
response_groups = "contributors,product_attrs,product_desc,is_finished,listening_status,percent_complete"
return self._fetch_all_pages(response_groups, on_progress)
def _fetch_page(
self,
page: int,
page_size: int,
response_groups: str,
) -> tuple[int, list[LibraryItem]]:
"""Fetch one library page and return its index with items."""
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = library.get("items", [])
return page, list(items)
def _fetch_all_pages(
self,
response_groups: str,
on_progress: StatusCallback | None = None,
) -> list[LibraryItem]:
"""Fetch all library pages using parallel requests after page one."""
library_response = None
page_size = 200
for attempt_size in [200, 100, 50]:
try:
library_response = self.client.get(
path="library",
num_results=attempt_size,
page=1,
response_groups=response_groups,
)
page_size = attempt_size
break
except Exception:
continue
if not library_response:
return []
first_page_items = library_response.get("items", [])
if not first_page_items:
return []
all_items: list[LibraryItem] = list(first_page_items)
if on_progress:
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
if len(first_page_items) < page_size:
return all_items
estimated_pages = self._estimate_total_pages(library_response, page_size)
page_results = self._fetch_remaining_pages(
response_groups=response_groups,
page_size=page_size,
estimated_pages=estimated_pages,
initial_total=len(first_page_items),
on_progress=on_progress,
)
for page_num in sorted(page_results.keys()):
all_items.extend(page_results[page_num])
return all_items
def _estimate_total_pages(self, library_response: dict, page_size: int) -> int:
"""Estimate total pages from API metadata with a conservative cap."""
total_items_estimate = library_response.get(
"total_results"
) or library_response.get("total")
if not total_items_estimate:
return 500
estimated_pages = (total_items_estimate + page_size - 1) // page_size
return min(estimated_pages, 1000)
def _fetch_remaining_pages(
self,
response_groups: str,
page_size: int,
estimated_pages: int,
initial_total: int,
on_progress: StatusCallback | None = None,
) -> dict[int, list[LibraryItem]]:
"""Fetch pages 2..N with bounded in-flight requests for faster startup."""
page_results: dict[int, list[LibraryItem]] = {}
max_workers = min(16, max(1, estimated_pages - 1))
next_page_to_submit = 2
stop_page = estimated_pages + 1
completed_count = 0
total_items = initial_total
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page: dict = {}
while (
next_page_to_submit <= estimated_pages
and next_page_to_submit < stop_page
and len(future_to_page) < max_workers
):
future = executor.submit(
self._fetch_page,
next_page_to_submit,
page_size,
response_groups,
)
future_to_page[future] = next_page_to_submit
next_page_to_submit += 1
while future_to_page:
future = next(as_completed(future_to_page))
page_num = future_to_page.pop(future)
try:
fetched_page, items = future.result()
except Exception:
continue
if items:
page_results[fetched_page] = items
total_items += len(items)
completed_count += 1
if on_progress and completed_count % 20 == 0:
on_progress(
f"Fetched {completed_count} pages ({total_items} items)..."
)
if len(items) < page_size:
stop_page = min(stop_page, fetched_page)
while (
next_page_to_submit <= estimated_pages
and next_page_to_submit < stop_page
and len(future_to_page) < max_workers
):
next_future = executor.submit(
self._fetch_page,
next_page_to_submit,
page_size,
response_groups,
)
future_to_page[next_future] = next_page_to_submit
next_page_to_submit += 1
return page_results

View File

@@ -0,0 +1,70 @@
"""Helpers for marking content as finished through Audible APIs."""
from __future__ import annotations
from typing import Any
from ..types import LibraryItem
class LibraryClientFinishedMixin:
"""Marks titles as finished and mutates in-memory item state."""
client: Any
def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
"""Mark a book as finished on Audible and optionally update item state."""
total_ms = self._get_runtime_ms(asin, item)
if not total_ms:
return False
acr = self._get_acr(asin)
if not acr:
return False
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body={"asin": asin, "acr": acr, "position_ms": total_ms},
)
if item:
item["is_finished"] = True
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict):
listening_status["is_finished"] = True
return True
except Exception:
return False
def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
"""Return total runtime in milliseconds from item or metadata endpoint."""
if item:
extract_runtime_minutes = getattr(self, "extract_runtime_minutes")
runtime_min = extract_runtime_minutes(item)
if runtime_min:
return runtime_min * 60 * 1000
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="chapter_info",
)
chapter_info = response.get("content_metadata", {}).get("chapter_info", {})
return chapter_info.get("runtime_length_ms")
except Exception:
return None
def _get_acr(self, asin: str) -> str | None:
"""Fetch the ACR token required by finish/update write operations."""
try:
response = self.client.post(
path=f"1.0/content/{asin}/licenserequest",
body={
"response_groups": "content_reference",
"consumption_type": "Download",
"drm_type": "Adrm",
},
)
return response.get("content_license", {}).get("acr")
except Exception:
return None

View File

@@ -0,0 +1,37 @@
"""Formatting helpers exposed by the library client."""
from __future__ import annotations
class LibraryClientFormatMixin:
"""Formats durations and timestamps for display usage."""
@staticmethod
def format_duration(
value: int | None,
unit: str = "minutes",
default_none: str | None = None,
) -> str | None:
"""Format duration values as compact hour-minute strings."""
if value is None or value <= 0:
return default_none
total_minutes = int(value)
if unit == "seconds":
total_minutes //= 60
hours, minutes = divmod(total_minutes, 60)
if hours > 0:
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
return f"{minutes}m"
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS for display."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

View File

@@ -0,0 +1,85 @@
"""Playback position read and write helpers for library content."""
from __future__ import annotations
from typing import Any
class LibraryClientPositionsMixin:
"""Handles last-position retrieval and persistence."""
client: Any
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annotation in annotations:
if annotation.get("asin") != asin:
continue
last_position_heard = annotation.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Fetch content reference payload used by position update calls."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def _update_position(self, asin: str, position_seconds: float) -> bool:
"""Persist playback position to the API and return success state."""
if position_seconds < 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save playback position to Audible and return success state."""
if position_seconds <= 0:
return False
return self._update_position(asin, position_seconds)

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from auditui.app.table import AppTableMixin
class DummyTableApp(AppTableMixin):
"""Minimal host exposing library client for row key helper tests."""
def __init__(self) -> None:
"""Initialize a fake library client with ASIN extraction."""
self.library_client = type(
"Library",
(),
{"extract_asin": lambda self, item: item.get("asin")},
)()
def test_build_row_key_prefers_asin_and_remains_unique() -> None:
"""Ensure duplicate ASINs receive deterministic unique key suffixes."""
app = DummyTableApp()
used: set[str] = set()
item = {"asin": "ASIN1"}
first = app._build_row_key(item, "Title", 0, used)
second = app._build_row_key(item, "Title", 1, used)
assert first == "ASIN1"
assert second == "ASIN1#2"
def test_build_row_key_falls_back_to_title_and_index() -> None:
"""Ensure missing ASIN values use title-index fallback keys."""
app = DummyTableApp()
used: set[str] = set()
key = app._build_row_key({"asin": None}, "Unknown Title", 3, used)
assert key == "Unknown Title#3"

View File

@@ -128,3 +128,33 @@ def test_get_or_download_uses_preferred_naming_hints(
preferred_author="Stephen King",
)
assert captured == [("11/22/63", "Stephen King")]
def test_get_or_download_retries_when_file_is_too_small(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure small downloads are retried and then reported with exact byte size."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "https://ok"
)
attempts = {"count": 0}
def write_small_file(url: str, path: Path, notify=None) -> Path:
"""Write an undersized file to trigger retry and final failure messages."""
del url, notify
attempts["count"] += 1
path.write_bytes(b"x" * 100)
return path
monkeypatch.setattr(manager, "_download_file", write_small_file)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None
assert attempts["count"] == 2
assert any("retrying" in message for message in messages)
assert any("file too small" in message for message in messages)