Compare commits

..

27 Commits

Author SHA1 Message Date
1e6f1544db Merge pull request 'Massive refactoring' (#1) from new-architecture into main
Reviewed-on: #1
2026-02-18 04:29:20 +01:00
26cba97cbd chore(changelog): document load-time tuning and duplicate row key fix 2026-02-18 04:28:35 +01:00
175bb7cbdc test(app): add coverage for unique table row key generation 2026-02-18 04:28:28 +01:00
bf0e70e9d9 fix(table): use unique row keys to avoid duplicate title crashes 2026-02-18 04:28:20 +01:00
cb4104e59a perf(app): remove eager search cache priming during library load 2026-02-18 04:28:13 +01:00
570639e988 perf(library): tune first-page probe order for faster medium-library loads 2026-02-18 04:28:06 +01:00
5ba0fafbc1 chore: update changelog 2026-02-18 04:20:56 +01:00
bed0ac4fea test(downloads): add regression coverage for too-small download retry behavior 2026-02-18 04:19:39 +01:00
0a909484e3 fix(downloads): retry undersized downloads and surface precise size failure messages 2026-02-18 04:19:33 +01:00
ecdd953ff4 refactor(downloads): split download streaming into focused helpers and reduce complexity 2026-02-18 04:12:54 +01:00
4ba2c43c93 clean: remove unused import 2026-02-18 04:09:04 +01:00
4b1924edd8 chore: update changelog 2026-02-18 04:08:41 +01:00
da20e84513 docs: update readme 2026-02-18 04:02:20 +01:00
dcb43f65dd chore: fix wording 2026-02-18 03:57:51 +01:00
beca8ee085 perf(library): optimize paginated fetch with bounded concurrent scheduling 2026-02-18 03:56:44 +01:00
e813267d5e refactor(library): decompose monolithic LibraryClient into fetch/extract/positions/finished/format mixins while preserving public behavior 2026-02-18 03:54:16 +01:00
eca58423dc refactor(constants): split constants into domain modules with compatibility exports 2026-02-18 03:44:26 +01:00
307368480a chore: update changelog 2026-02-18 03:39:53 +01:00
a8add30928 chore(changelog): document download filename metadata fallback fix 2026-02-18 03:39:18 +01:00
3e6e31c2db test(playback): verify prepare_and_start passes naming hints to downloads 2026-02-18 03:39:13 +01:00
6335f8bbac test(downloads): cover preferred naming hint propagation in get_or_download 2026-02-18 03:39:09 +01:00
0cf2644f55 test(downloads): validate author_title stem generation and cache fallbacks 2026-02-18 03:39:04 +01:00
597e82dc20 test(app): verify playback start receives selected item metadata 2026-02-18 03:38:58 +01:00
25d56cf407 test(app): cover selected item hint forwarding for downloads 2026-02-18 03:38:54 +01:00
76c991600c fix(playback): forward preferred title and author to download manager 2026-02-18 03:38:50 +01:00
95e641a527 fix(app): pass selected item title and author as download naming hints 2026-02-18 03:38:46 +01:00
8f8cdf7bfa fix(downloads): prefer library metadata for author_title filenames with fallback stems 2026-02-18 03:38:41 +01:00
27 changed files with 1280 additions and 758 deletions

View File

@@ -11,6 +11,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- massive code refactoring - massive code refactoring
- complete test suite revamp - complete test suite revamp
- updated download cache naming to use `Author_Title` format with normalized separators
- optimized library pagination fetch with bounded concurrent scheduling
- adjusted library first-page probe order to prefer larger page sizes for medium libraries
- removed eager search cache priming during library load to reduce startup work
### Fixed
- reused library metadata for download filename generation to avoid `Unknown-Author_Unknown-Title` when title/author are already known in the UI
- fixed Audible last-position request parameter handling after library client refactor
- added retry behavior and explicit size diagnostics when downloaded files are too small
- prevented table rendering crashes by generating unique row keys instead of using title-only keys
## [0.1.6] - 2026-02-16 ## [0.1.6] - 2026-02-16

View File

@@ -2,7 +2,7 @@
A terminal-based user interface (TUI) client for [Audible](https://www.audible.fr/), written in Python 3. A terminal-based user interface (TUI) client for [Audible](https://www.audible.fr/), written in Python 3.
Currently, the only available theme is Catppuccin Mocha, following their [style guide](https://github.com/catppuccin/catppuccin/blob/main/docs/style-guide.md), as it's my preferred theme across most of my tools. The interface currently ships with a single built-in theme.
## Requirements ## Requirements
@@ -36,9 +36,9 @@ auditui --version
All set, run `auditui configure` to set up authentication, and then `auditui` to start the TUI. All set, run `auditui configure` to set up authentication, and then `auditui` to start the TUI.
### Workaround for Python 3.13 linux distribution ### Workaround for Python 3.13 Linux distributions
On some Linux distributions, Python 3.13 is already the default. So you have to install Python 3.12 manually before using `pipx`. On some Linux distributions, Python 3.13 is already the default. In that case, install Python 3.12 manually before using `pipx`.
For Arch Linux: For Arch Linux:
@@ -52,7 +52,7 @@ Once you have Python 3.12, run:
pipx install git+https://git.kharec.info/Kharec/auditui.git --python python3.12 pipx install git+https://git.kharec.info/Kharec/auditui.git --python python3.12
``` ```
As Python <3.14 is supported on `master` branch of the upstream [`audible`](https://github.com/mkb79/Audible), this should be temporary until the next version. This workaround is temporary and depends on upstream `audible` compatibility updates.
## Upgrade ## Upgrade
@@ -90,6 +90,8 @@ pipx upgrade auditui
Books are downloaded to `~/.cache/auditui/books`. Books are downloaded to `~/.cache/auditui/books`.
Downloaded files use a normalized `Author_Title.aax` naming format. For example, `Stephen King` and `11/22/63` become `Stephen-King_11-22-63.aax`.
The `d` key toggles the download state for the selected book: if the book is not cached, pressing `d` will download it; if it's already cached, pressing `d` will delete it from the cache. The `d` key toggles the download state for the selected book: if the book is not cached, pressing `d` will download it; if it's already cached, pressing `d` will delete it from the cache.
To check the total size of your cache: To check the total size of your cache:

View File

@@ -10,11 +10,8 @@ from ..ui import FilterScreen, HelpScreen, StatsScreen
class AppActionsMixin: class AppActionsMixin:
def _get_selected_asin(self) -> str | None: def _get_selected_item(self) -> dict | None:
if not self.download_manager: """Return the currently selected library item from the table."""
self.update_status(
"Not authenticated. Please restart and authenticate.")
return None
table = self.query_one("#library_table", DataTable) table = self.query_one("#library_table", DataTable)
if table.row_count == 0: if table.row_count == 0:
self.update_status("No books available") self.update_status("No books available")
@@ -23,10 +20,27 @@ class AppActionsMixin:
if cursor_row >= len(self.current_items): if cursor_row >= len(self.current_items):
self.update_status("Invalid selection") self.update_status("Invalid selection")
return None return None
return self.current_items[cursor_row]
def _get_naming_hints(self, item: dict | None) -> tuple[str | None, str | None]:
"""Return preferred title and author values used for download filenames."""
if not item or not self.library_client:
return (None, None)
return (
self.library_client.extract_title(item),
self.library_client.extract_authors(item),
)
def _get_selected_asin(self) -> str | None:
if not self.download_manager:
self.update_status("Not authenticated. Please restart and authenticate.")
return None
if not self.library_client: if not self.library_client:
self.update_status("Library client not available") self.update_status("Library client not available")
return None return None
selected_item = self.current_items[cursor_row] selected_item = self._get_selected_item()
if not selected_item:
return None
asin = self.library_client.extract_asin(selected_item) asin = self.library_client.extract_asin(selected_item)
if not asin: if not asin:
self.update_status("Could not get ASIN for selected book") self.update_status("Could not get ASIN for selected book")
@@ -36,7 +50,7 @@ class AppActionsMixin:
def action_play_selected(self) -> None: def action_play_selected(self) -> None:
asin = self._get_selected_asin() asin = self._get_selected_asin()
if asin: if asin:
self._start_playback_async(asin) self._start_playback_async(asin, self._get_selected_item())
def action_toggle_playback(self) -> None: def action_toggle_playback(self) -> None:
if not self.playback.toggle_playback(): if not self.playback.toggle_playback():
@@ -86,8 +100,7 @@ class AppActionsMixin:
return return
if self.library_client.is_finished(selected_item): if self.library_client.is_finished(selected_item):
self.call_from_thread(self.update_status, self.call_from_thread(self.update_status, "Already marked as finished")
"Already marked as finished")
return return
success = self.library_client.mark_as_finished(asin, selected_item) success = self.library_client.mark_as_finished(asin, selected_item)
@@ -132,28 +145,36 @@ class AppActionsMixin:
def action_toggle_download(self) -> None: def action_toggle_download(self) -> None:
asin = self._get_selected_asin() asin = self._get_selected_asin()
if asin: if asin:
self._toggle_download_async(asin) self._toggle_download_async(asin, self._get_selected_item())
@work(exclusive=True, thread=True) @work(exclusive=True, thread=True)
def _toggle_download_async(self, asin: str) -> None: def _toggle_download_async(self, asin: str, item: dict | None = None) -> None:
if not self.download_manager: if not self.download_manager:
return return
preferred_title, preferred_author = self._get_naming_hints(item)
if self.download_manager.is_cached(asin): if self.download_manager.is_cached(asin):
self.download_manager.remove_cached( self.download_manager.remove_cached(asin, self._thread_status_update)
asin, self._thread_status_update)
else: else:
self.download_manager.get_or_download( self.download_manager.get_or_download(
asin, self._thread_status_update) asin,
self._thread_status_update,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
self.call_from_thread(self._refresh_table) self.call_from_thread(self._refresh_table)
@work(exclusive=True, thread=True) @work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None: def _start_playback_async(self, asin: str, item: dict | None = None) -> None:
if not self.download_manager: if not self.download_manager:
return return
preferred_title, preferred_author = self._get_naming_hints(item)
self.playback.prepare_and_start( self.playback.prepare_and_start(
self.download_manager, self.download_manager,
asin, asin,
self._thread_status_update, self._thread_status_update,
preferred_title,
preferred_author,
) )

View File

@@ -8,7 +8,7 @@ from textual.events import Resize
from textual.widgets import DataTable, ProgressBar, Static from textual.widgets import DataTable, ProgressBar, Static
from .. import __version__ from .. import __version__
from ..constants import TABLE_COLUMN_DEFS, TABLE_CSS from ..constants import TABLE_COLUMN_DEFS
class AppLayoutMixin: class AppLayoutMixin:

View File

@@ -16,16 +16,15 @@ class AppLibraryMixin:
return return
try: try:
all_items = self.library_client.fetch_all_items( all_items = self.library_client.fetch_all_items(self._thread_status_update)
self._thread_status_update)
self.call_from_thread(self.on_library_loaded, all_items) self.call_from_thread(self.on_library_loaded, all_items)
except (OSError, ValueError, KeyError) as exc: except (OSError, ValueError, KeyError) as exc:
self.call_from_thread(self.on_library_error, str(exc)) self.call_from_thread(self.on_library_error, str(exc))
def on_library_loaded(self, items: list[LibraryItem]) -> None: def on_library_loaded(self, items: list[LibraryItem]) -> None:
"""Store fetched items and refresh the active library view."""
self.all_items = items self.all_items = items
self._search_text_cache.clear() self._search_text_cache.clear()
self._prime_search_cache(items)
self.update_status(f"Loaded {len(items)} books") self.update_status(f"Loaded {len(items)} books")
if self.show_all_mode: if self.show_all_mode:
self.show_all() self.show_all()

View File

@@ -15,6 +15,7 @@ from textual.widgets import DataTable, Static
class AppTableMixin: class AppTableMixin:
def _populate_table(self, items: list[LibraryItem]) -> None: def _populate_table(self, items: list[LibraryItem]) -> None:
"""Render library items into the table with stable unique row keys."""
table = self.query_one("#library_table", DataTable) table = self.query_one("#library_table", DataTable)
table.clear() table.clear()
@@ -22,18 +23,41 @@ class AppTableMixin:
self.update_status("No books found.") self.update_status("No books found.")
return return
for item in items: used_keys: set[str] = set()
for index, item in enumerate(items):
title, author, runtime, progress, downloaded = format_item_as_row( title, author, runtime, progress, downloaded = format_item_as_row(
item, self.library_client, self.download_manager item, self.library_client, self.download_manager
) )
table.add_row(title, author, runtime, row_key = self._build_row_key(item, title, index, used_keys)
progress, downloaded, key=title) table.add_row(title, author, runtime, progress, downloaded, key=row_key)
self.current_items = items self.current_items = items
status = self.query_one("#status", Static) status = self.query_one("#status", Static)
status.display = False status.display = False
self._apply_column_widths(table) self._apply_column_widths(table)
def _build_row_key(
self,
item: LibraryItem,
title: str,
index: int,
used_keys: set[str],
) -> str:
"""Return a unique table row key derived from ASIN when available."""
asin = self.library_client.extract_asin(item) if self.library_client else None
base_key = asin or f"{title}#{index}"
if base_key not in used_keys:
used_keys.add(base_key)
return base_key
suffix = 2
candidate = f"{base_key}#{suffix}"
while candidate in used_keys:
suffix += 1
candidate = f"{base_key}#{suffix}"
used_keys.add(candidate)
return candidate
def _refresh_table(self) -> None: def _refresh_table(self) -> None:
if self.current_items: if self.current_items:
self._populate_table(self.current_items) self._populate_table(self.current_items)
@@ -79,11 +103,9 @@ class AppTableMixin:
items = self.all_items items = self.all_items
if self.filter_text: if self.filter_text:
items = filter_items(items, self.filter_text, items = filter_items(items, self.filter_text, self._get_search_text)
self._get_search_text)
self._populate_table(items) self._populate_table(items)
self.update_status( self.update_status(f"Filter: '{self.filter_text}' ({len(items)} books)")
f"Filter: '{self.filter_text}' ({len(items)} books)")
return return
if not self.show_all_mode and self.library_client: if not self.show_all_mode and self.library_client:
@@ -97,6 +119,7 @@ class AppTableMixin:
if cached is not None: if cached is not None:
return cached return cached
from ..library import build_search_text from ..library import build_search_text
search_text = build_search_text(item, self.library_client) search_text = build_search_text(item, self.library_client)
self._search_text_cache[cache_key] = search_text self._search_text_cache[cache_key] = search_text
return search_text return search_text

View File

@@ -1,278 +1,29 @@
"""Paths, API/config values, and CSS used across the application.""" """Compatibility exports for constants grouped by domain modules."""
from pathlib import Path from .downloads import DEFAULT_CHUNK_SIZE, DEFAULT_CODEC, DOWNLOAD_URL, MIN_FILE_SIZE
from .library import (
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json" AUTHOR_NAME_DISPLAY_LENGTH,
CONFIG_PATH = Path.home() / ".config" / "auditui" / "config.json" AUTHOR_NAME_MAX_LENGTH,
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books" PROGRESS_COLUMN_INDEX,
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
DEFAULT_CODEC = "LC_128_44100_stereo"
MIN_FILE_SIZE = 1024 * 1024
DEFAULT_CHUNK_SIZE = 8192
TABLE_COLUMN_DEFS = (
("Title", 4),
("Author", 3),
("Length", 1),
("Progress", 1),
("Downloaded", 1),
) )
from .paths import AUTH_PATH, CACHE_DIR, CONFIG_PATH
from .playback import SEEK_SECONDS
from .table import TABLE_COLUMN_DEFS
from .ui import TABLE_CSS
AUTHOR_NAME_MAX_LENGTH = 40
AUTHOR_NAME_DISPLAY_LENGTH = 37
PROGRESS_COLUMN_INDEX = 3
SEEK_SECONDS = 30.0
TABLE_CSS = """ __all__ = [
Screen { "AUTH_PATH",
background: #141622; "CONFIG_PATH",
} "CACHE_DIR",
"DOWNLOAD_URL",
#top_bar { "DEFAULT_CODEC",
background: #10131f; "MIN_FILE_SIZE",
color: #d5d9f0; "DEFAULT_CHUNK_SIZE",
text-style: bold; "TABLE_COLUMN_DEFS",
height: 1; "AUTHOR_NAME_MAX_LENGTH",
margin: 0; "AUTHOR_NAME_DISPLAY_LENGTH",
padding: 0; "PROGRESS_COLUMN_INDEX",
} "SEEK_SECONDS",
"TABLE_CSS",
#top_left, ]
#top_center,
#top_right {
width: 1fr;
padding: 0 1;
background: #10131f;
margin: 0;
}
#top_left {
text-align: left;
}
#top_center {
text-align: center;
}
#top_right {
text-align: right;
}
DataTable {
width: 100%;
height: 1fr;
background: #141622;
color: #c7cfe8;
border: solid #262a3f;
scrollbar-size-horizontal: 0;
}
DataTable:focus {
border: solid #7aa2f7;
}
DataTable > .datatable--header {
background: #1b2033;
color: #b9c3e3;
text-style: bold;
}
DataTable > .datatable--cursor {
background: #232842;
color: #e6ebff;
}
DataTable > .datatable--odd-row {
background: #121422;
}
DataTable > .datatable--even-row {
background: #15182a;
}
Static {
height: 1;
text-align: center;
background: #10131f;
color: #c7cfe8;
}
Static#status {
color: #b6bfdc;
}
Static#progress_info {
color: #7aa2f7;
text-style: bold;
margin: 0;
padding: 0;
text-align: center;
width: 100%;
}
#progress_bar_container {
align: center middle;
width: 100%;
height: 1;
}
ProgressBar#progress_bar {
height: 1;
background: #10131f;
border: none;
margin: 0;
padding: 0;
width: 50%;
}
ProgressBar#progress_bar Bar {
width: 100%;
}
ProgressBar#progress_bar > .progress-bar--track {
background: #262a3f;
}
ProgressBar#progress_bar > .progress-bar--bar {
background: #8bd5ca;
}
HelpScreen,
StatsScreen,
FilterScreen {
align: center middle;
background: rgba(0, 0, 0, 0.7);
}
HelpScreen Static,
StatsScreen Static,
FilterScreen Static {
background: transparent;
}
StatsScreen #help_container {
width: auto;
min-width: 55;
max-width: 70;
}
StatsScreen #help_content {
align: center middle;
width: 100%;
}
StatsScreen .help_list {
width: 100%;
}
StatsScreen .help_list > ListItem {
background: transparent;
height: 1;
}
StatsScreen .help_list > ListItem:hover {
background: #232842;
}
StatsScreen .help_list > ListItem > Label {
width: 100%;
text-align: left;
padding-left: 2;
}
#help_container {
width: 72%;
max-width: 90;
min-width: 44;
height: auto;
max-height: 80%;
min-height: 14;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 1;
}
#help_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 0;
border-bottom: solid #4b5165;
}
#help_content {
width: 100%;
height: auto;
padding: 0;
margin: 0 0 1 0;
align: center middle;
}
.help_list {
width: 100%;
height: auto;
background: transparent;
padding: 0;
scrollbar-size: 0 0;
}
.help_list > ListItem {
background: #1b1f33;
padding: 0 1;
height: 1;
}
.help_list > ListItem:hover {
background: #2a2f45;
}
.help_list > ListItem > Label {
width: 100%;
padding: 0;
}
#help_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 0;
border-top: solid #4b5165;
}
#filter_container {
width: 60;
height: auto;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 2;
}
#filter_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 1;
}
#filter_input {
width: 100%;
margin: 1 0;
}
#filter_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 1;
}
"""

View File

@@ -0,0 +1,6 @@
"""Download-related constants for Audible file retrieval."""
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
DEFAULT_CODEC = "LC_128_44100_stereo"
MIN_FILE_SIZE = 1024 * 1024
DEFAULT_CHUNK_SIZE = 8192

View File

@@ -0,0 +1,5 @@
"""Library and table formatting constants."""
AUTHOR_NAME_MAX_LENGTH = 40
AUTHOR_NAME_DISPLAY_LENGTH = 37
PROGRESS_COLUMN_INDEX = 3

View File

@@ -0,0 +1,8 @@
"""Filesystem paths used by configuration and caching."""
from pathlib import Path
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
CONFIG_PATH = Path.home() / ".config" / "auditui" / "config.json"
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"

View File

@@ -0,0 +1,3 @@
"""Playback behavior constants."""
SEEK_SECONDS = 30.0

View File

@@ -0,0 +1,9 @@
"""Main library table column definitions."""
TABLE_COLUMN_DEFS = (
("Title", 4),
("Author", 3),
("Length", 1),
("Progress", 1),
("Downloaded", 1),
)

255
auditui/constants/ui.py Normal file
View File

@@ -0,0 +1,255 @@
"""Textual CSS constants for the application UI."""
TABLE_CSS = """
Screen {
background: #141622;
}
#top_bar {
background: #10131f;
color: #d5d9f0;
text-style: bold;
height: 1;
margin: 0;
padding: 0;
}
#top_left,
#top_center,
#top_right {
width: 1fr;
padding: 0 1;
background: #10131f;
margin: 0;
}
#top_left {
text-align: left;
}
#top_center {
text-align: center;
}
#top_right {
text-align: right;
}
DataTable {
width: 100%;
height: 1fr;
background: #141622;
color: #c7cfe8;
border: solid #262a3f;
scrollbar-size-horizontal: 0;
}
DataTable:focus {
border: solid #7aa2f7;
}
DataTable > .datatable--header {
background: #1b2033;
color: #b9c3e3;
text-style: bold;
}
DataTable > .datatable--cursor {
background: #232842;
color: #e6ebff;
}
DataTable > .datatable--odd-row {
background: #121422;
}
DataTable > .datatable--even-row {
background: #15182a;
}
Static {
height: 1;
text-align: center;
background: #10131f;
color: #c7cfe8;
}
Static#status {
color: #b6bfdc;
}
Static#progress_info {
color: #7aa2f7;
text-style: bold;
margin: 0;
padding: 0;
text-align: center;
width: 100%;
}
#progress_bar_container {
align: center middle;
width: 100%;
height: 1;
}
ProgressBar#progress_bar {
height: 1;
background: #10131f;
border: none;
margin: 0;
padding: 0;
width: 50%;
}
ProgressBar#progress_bar Bar {
width: 100%;
}
ProgressBar#progress_bar > .progress-bar--track {
background: #262a3f;
}
ProgressBar#progress_bar > .progress-bar--bar {
background: #8bd5ca;
}
HelpScreen,
StatsScreen,
FilterScreen {
align: center middle;
background: rgba(0, 0, 0, 0.7);
}
HelpScreen Static,
StatsScreen Static,
FilterScreen Static {
background: transparent;
}
StatsScreen #help_container {
width: auto;
min-width: 55;
max-width: 70;
}
StatsScreen #help_content {
align: center middle;
width: 100%;
}
StatsScreen .help_list {
width: 100%;
}
StatsScreen .help_list > ListItem {
background: transparent;
height: 1;
}
StatsScreen .help_list > ListItem:hover {
background: #232842;
}
StatsScreen .help_list > ListItem > Label {
width: 100%;
text-align: left;
padding-left: 2;
}
#help_container {
width: 72%;
max-width: 90;
min-width: 44;
height: auto;
max-height: 80%;
min-height: 14;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 1;
}
#help_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 0;
border-bottom: solid #4b5165;
}
#help_content {
width: 100%;
height: auto;
padding: 0;
margin: 0 0 1 0;
align: center middle;
}
.help_list {
width: 100%;
height: auto;
background: transparent;
padding: 0;
scrollbar-size: 0 0;
}
.help_list > ListItem {
background: #1b1f33;
padding: 0 1;
height: 1;
}
.help_list > ListItem:hover {
background: #2a2f45;
}
.help_list > ListItem > Label {
width: 100%;
padding: 0;
}
#help_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 0;
border-top: solid #4b5165;
}
#filter_container {
width: 60;
height: auto;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 2;
}
#filter_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 1;
}
#filter_input {
width: 100%;
margin: 1 0;
}
#filter_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 1;
}
"""

View File

@@ -1,7 +1,9 @@
"""Obtains AAX files from Audible (cache or download) and provides activation bytes.""" """Obtains AAX files from Audible (cache or download) and provides activation bytes."""
import re import re
import unicodedata
from pathlib import Path from pathlib import Path
from typing import Any
from urllib.parse import urlparse from urllib.parse import urlparse
import audible import audible
@@ -29,56 +31,94 @@ class DownloadManager:
chunk_size: int = DEFAULT_CHUNK_SIZE, chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> None: ) -> None:
self.auth = auth self.auth = auth
self.client = client self.client: Any = client
self.cache_dir = cache_dir self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True) self.cache_dir.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size self.chunk_size = chunk_size
self._http_client = httpx.Client( self._http_client = httpx.Client(auth=auth, timeout=30.0, follow_redirects=True)
auth=auth, timeout=30.0, follow_redirects=True)
self._download_client = httpx.Client( self._download_client = httpx.Client(
timeout=httpx.Timeout(connect=30.0, read=None, timeout=httpx.Timeout(connect=30.0, read=None, write=30.0, pool=30.0),
write=30.0, pool=30.0),
follow_redirects=True, follow_redirects=True,
) )
def get_or_download( def get_or_download(
self, asin: str, notify: StatusCallback | None = None self,
asin: str,
notify: StatusCallback | None = None,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> Path | None: ) -> Path | None:
"""Return local path to AAX file; download and cache if not present.""" """Return local path to AAX file; download and cache if not present."""
title = self._get_name_from_asin(asin) or asin filename_stems = self._get_filename_stems_from_asin(
safe_title = self._sanitize_filename(title) asin,
local_path = self.cache_dir / f"{safe_title}.aax" preferred_title=preferred_title,
preferred_author=preferred_author,
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE: )
local_path = self.cache_dir / f"{filename_stems[0]}.aax"
cached_path = self._find_cached_path(filename_stems)
if cached_path:
if notify: if notify:
notify(f"Using cached file: {local_path.name}") notify(f"Using cached file: {cached_path.name}")
return local_path return cached_path
if notify: if notify:
notify(f"Downloading to {local_path.name}...") notify(f"Downloading to {local_path.name}...")
if not self._download_to_valid_file(asin, local_path, notify):
return None
return local_path
def _download_to_valid_file(
self,
asin: str,
local_path: Path,
notify: StatusCallback | None = None,
) -> bool:
"""Download with one retry and ensure resulting file has a valid size."""
for attempt in range(1, 3):
if not self._attempt_download(asin, local_path, notify):
return False
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return True
downloaded_size = local_path.stat().st_size if local_path.exists() else 0
if notify and attempt == 1:
notify(
f"Downloaded file too small ({downloaded_size} bytes), retrying..."
)
if notify and attempt == 2:
notify(
f"Download failed: file too small ({downloaded_size} bytes, expected >= {MIN_FILE_SIZE})"
)
self._cleanup_partial_file(local_path)
return False
def _attempt_download(
self,
asin: str,
local_path: Path,
notify: StatusCallback | None = None,
) -> bool:
"""Perform one download attempt including link lookup and URL validation."""
dl_link = self._get_download_link(asin, notify=notify) dl_link = self._get_download_link(asin, notify=notify)
if not dl_link: if not dl_link:
if notify: if notify:
notify("Failed to get download link") notify("Failed to get download link")
return None return False
if not self._validate_download_url(dl_link): if not self._validate_download_url(dl_link):
if notify: if notify:
notify("Invalid download URL") notify("Invalid download URL")
return None return False
if not self._download_file(dl_link, local_path, notify): if not self._download_file(dl_link, local_path, notify):
if notify: if notify:
notify("Download failed") notify("Download failed")
return None return False
if not local_path.exists() or local_path.stat().st_size < MIN_FILE_SIZE: return True
if notify:
notify("Download failed or file too small")
return None
return local_path
def get_activation_bytes(self) -> str | None: def get_activation_bytes(self) -> str | None:
"""Return activation bytes as hex string for ffplay/ffmpeg.""" """Return activation bytes as hex string for ffplay/ffmpeg."""
@@ -92,12 +132,7 @@ class DownloadManager:
def get_cached_path(self, asin: str) -> Path | None: def get_cached_path(self, asin: str) -> Path | None:
"""Return path to cached AAX file if it exists and is valid size.""" """Return path to cached AAX file if it exists and is valid size."""
title = self._get_name_from_asin(asin) or asin return self._find_cached_path(self._get_filename_stems_from_asin(asin))
safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
def is_cached(self, asin: str) -> bool: def is_cached(self, asin: str) -> bool:
"""Return True if the title is present in cache with valid size.""" """Return True if the title is present in cache with valid size."""
@@ -130,20 +165,68 @@ class DownloadManager:
return False return False
def _sanitize_filename(self, filename: str) -> str: def _sanitize_filename(self, filename: str) -> str:
"""Remove invalid characters from filename.""" """Normalize a filename segment with ASCII letters, digits, and dashes."""
return re.sub(r'[<>:"/\\|?*]', "_", filename) ascii_text = unicodedata.normalize("NFKD", filename)
ascii_text = ascii_text.encode("ascii", "ignore").decode("ascii")
ascii_text = re.sub(r"[’'`]+", "", ascii_text)
ascii_text = re.sub(r"[^A-Za-z0-9]+", "-", ascii_text)
ascii_text = re.sub(r"-+", "-", ascii_text)
ascii_text = ascii_text.strip("-._")
return ascii_text or "Unknown"
def _find_cached_path(self, filename_stems: list[str]) -> Path | None:
"""Return the first valid cached path matching any candidate filename stem."""
for filename_stem in filename_stems:
local_path = self.cache_dir / f"{filename_stem}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
def _get_filename_stems_from_asin(
self,
asin: str,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> list[str]:
"""Build preferred and fallback cache filename stems for an ASIN."""
if preferred_title:
preferred_combined = (
f"{self._sanitize_filename(preferred_author or 'Unknown Author')}_"
f"{self._sanitize_filename(preferred_title)}"
)
preferred_legacy = self._sanitize_filename(preferred_title)
fallback_asin = self._sanitize_filename(asin)
return list(
dict.fromkeys([preferred_combined, preferred_legacy, fallback_asin])
)
def _get_name_from_asin(self, asin: str) -> str | None:
"""Get the title/name of a book from its ASIN."""
try: try:
product_info = self.client.get( product_info = self.client.get(
path=f"1.0/catalog/products/{asin}", path=f"1.0/catalog/products/{asin}",
response_groups="product_desc,product_attrs", **{"response_groups": "contributors,product_desc,product_attrs"},
) )
product = product_info.get("product", {}) product = product_info.get("product", {})
return product.get("title") or "Unknown Title" title = product.get("title") or "Unknown Title"
except (OSError, ValueError, KeyError): author = self._get_primary_author(product)
return None combined = (
f"{self._sanitize_filename(author)}_{self._sanitize_filename(title)}"
)
legacy_title = self._sanitize_filename(title)
fallback_asin = self._sanitize_filename(asin)
return list(dict.fromkeys([combined, legacy_title, fallback_asin]))
except (OSError, ValueError, KeyError, AttributeError):
return [self._sanitize_filename(asin)]
def _get_primary_author(self, product: dict) -> str:
"""Extract a primary author name from product metadata."""
contributors = product.get("authors") or product.get("contributors") or []
for contributor in contributors:
if not isinstance(contributor, dict):
continue
name = contributor.get("name")
if isinstance(name, str) and name.strip():
return name
return "Unknown Author"
def _get_download_link( def _get_download_link(
self, self,
@@ -174,7 +257,8 @@ class DownloadManager:
if not link: if not link:
link = str(response.url) link = str(response.url)
tld = self.auth.locale.domain locale = getattr(self.auth, "locale", None)
tld = getattr(locale, "domain", "com")
return link.replace("cds.audible.com", f"cds.audible.{tld}") return link.replace("cds.audible.com", f"cds.audible.{tld}")
except httpx.HTTPError as exc: except httpx.HTTPError as exc:
@@ -194,19 +278,7 @@ class DownloadManager:
with self._download_client.stream("GET", url) as response: with self._download_client.stream("GET", url) as response:
response.raise_for_status() response.raise_for_status()
total_size = int(response.headers.get("content-length", 0)) total_size = int(response.headers.get("content-length", 0))
downloaded = 0 self._stream_to_file(response, dest_path, total_size, notify)
with open(dest_path, "wb") as file_handle:
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
file_handle.write(chunk)
downloaded += len(chunk)
if total_size > 0 and notify:
percent = (downloaded / total_size) * 100
downloaded_mb = downloaded / (1024 * 1024)
total_mb = total_size / (1024 * 1024)
notify(
f"Downloading: {percent:.1f}% ({downloaded_mb:.1f}/{total_mb:.1f} MB)"
)
return dest_path return dest_path
except httpx.HTTPStatusError as exc: except httpx.HTTPStatusError as exc:
@@ -214,30 +286,55 @@ class DownloadManager:
notify( notify(
f"Download HTTP error: {exc.response.status_code} {exc.response.reason_phrase}" f"Download HTTP error: {exc.response.status_code} {exc.response.reason_phrase}"
) )
try: self._cleanup_partial_file(dest_path)
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
return None return None
except httpx.HTTPError as exc: except httpx.HTTPError as exc:
if notify: if notify:
notify(f"Download network error: {exc!s}") notify(f"Download network error: {exc!s}")
try: self._cleanup_partial_file(dest_path)
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
return None return None
except (OSError, ValueError, KeyError) as exc: except (OSError, ValueError, KeyError) as exc:
if notify: if notify:
notify(f"Download error: {exc!s}") notify(f"Download error: {exc!s}")
self._cleanup_partial_file(dest_path)
return None
def _stream_to_file(
self,
response: httpx.Response,
dest_path: Path,
total_size: int,
notify: StatusCallback | None = None,
) -> None:
"""Write streamed response bytes to disk and emit progress messages."""
downloaded = 0
with open(dest_path, "wb") as file_handle:
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
file_handle.write(chunk)
downloaded += len(chunk)
self._notify_download_progress(downloaded, total_size, notify)
def _notify_download_progress(
self,
downloaded: int,
total_size: int,
notify: StatusCallback | None = None,
) -> None:
"""Emit a formatted progress message when total size is known."""
if total_size <= 0 or not notify:
return
percent = (downloaded / total_size) * 100
downloaded_mb = downloaded / (1024 * 1024)
total_mb = total_size / (1024 * 1024)
notify(f"Downloading: {percent:.1f}% ({downloaded_mb:.1f}/{total_mb:.1f} MB)")
def _cleanup_partial_file(self, dest_path: Path) -> None:
"""Remove undersized partial download files after transfer failures."""
try: try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE: if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink() dest_path.unlink()
except OSError: except OSError:
pass return
return None
def close(self) -> None: def close(self) -> None:
"""Close internal HTTP clients. Safe to call multiple times.""" """Close internal HTTP clients. Safe to call multiple times."""

View File

@@ -1,365 +1,25 @@
"""Client for the Audible library API.""" """Client facade for Audible library fetch, extraction, and progress updates."""
from concurrent.futures import ThreadPoolExecutor, as_completed from __future__ import annotations
import audible import audible
from ..types import LibraryItem, StatusCallback from .client_extract import LibraryClientExtractMixin
from .client_fetch import LibraryClientFetchMixin
from .client_finished import LibraryClientFinishedMixin
from .client_format import LibraryClientFormatMixin
from .client_positions import LibraryClientPositionsMixin
class LibraryClient: class LibraryClient(
"""Client for the Audible library API. Fetches items, extracts metadata, and updates positions.""" LibraryClientFetchMixin,
LibraryClientExtractMixin,
LibraryClientPositionsMixin,
LibraryClientFinishedMixin,
LibraryClientFormatMixin,
):
"""Audible library client composed from focused behavior mixins."""
def __init__(self, client: audible.Client) -> None: def __init__(self, client: audible.Client) -> None:
"""Store authenticated Audible client used by all operations."""
self.client = client self.client = client
def fetch_all_items(self, on_progress: StatusCallback | None = None) -> list[LibraryItem]:
"""Fetch all library items from the API."""
response_groups = (
"contributors,media,product_attrs,product_desc,product_details,"
"is_finished,listening_status,percent_complete"
)
return self._fetch_all_pages(response_groups, on_progress)
def _fetch_page(
self, page: int, page_size: int, response_groups: str
) -> tuple[int, list[LibraryItem]]:
"""Fetch a single page of library items from the API."""
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = library.get("items", [])
return page, list(items)
def _fetch_all_pages(
self, response_groups: str, on_progress: StatusCallback | None = None
) -> list[LibraryItem]:
"""Fetch all pages of library items using parallel requests."""
library_response = None
page_size = 200
for attempt_size in [200, 100, 50]:
try:
library_response = self.client.get(
path="library",
num_results=attempt_size,
page=1,
response_groups=response_groups,
)
page_size = attempt_size
break
except Exception:
continue
if not library_response:
return []
first_page_items = library_response.get("items", [])
if not first_page_items:
return []
all_items: list[LibraryItem] = list(first_page_items)
if on_progress:
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
if len(first_page_items) < page_size:
return all_items
total_items_estimate = library_response.get(
"total_results") or library_response.get("total")
if total_items_estimate:
estimated_pages = (total_items_estimate +
page_size - 1) // page_size
estimated_pages = min(estimated_pages, 1000)
else:
estimated_pages = 500
max_workers = 50
page_results: dict[int, list[LibraryItem]] = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page: dict = {}
for page in range(2, estimated_pages + 1):
future = executor.submit(
self._fetch_page, page, page_size, response_groups
)
future_to_page[future] = page
completed_count = 0
total_items = len(first_page_items)
for future in as_completed(future_to_page):
page_num = future_to_page.pop(future)
try:
fetched_page, items = future.result()
if not items or len(items) < page_size:
for remaining_future in list(future_to_page.keys()):
remaining_future.cancel()
break
page_results[fetched_page] = items
total_items += len(items)
completed_count += 1
if on_progress and completed_count % 20 == 0:
on_progress(
f"Fetched {completed_count} pages ({total_items} items)..."
)
except Exception:
pass
for page_num in sorted(page_results.keys()):
all_items.extend(page_results[page_num])
return all_items
def extract_title(self, item: LibraryItem) -> str:
"""Return the book title from a library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def extract_authors(self, item: LibraryItem) -> str:
"""Return comma-separated author names from a library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [a.get("name", "")
for a in authors if isinstance(a, dict)]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
"""Return runtime in minutes if present."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
if isinstance(runtime, (int, float)):
return int(runtime)
return None
def extract_progress_info(self, item: LibraryItem) -> float | None:
"""Return progress percentage (0–100) if present."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: LibraryItem) -> str | None:
"""Return the ASIN for a library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def is_finished(self, item: LibraryItem) -> bool:
"""Return True if the item is marked or inferred as finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float))
and percent_complete >= 100
)
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annot in annotations:
if annot.get("asin") != asin:
continue
last_position_heard = annot.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Fetch content reference (ACR and version) for position updates."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def _update_position(self, asin: str, position_seconds: float) -> bool:
"""Persist playback position to the API. Returns True on success."""
if position_seconds < 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save playback position to Audible. Returns True on success."""
if position_seconds <= 0:
return False
return self._update_position(asin, position_seconds)
@staticmethod
def format_duration(
value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
"""Format a duration value as e.g. 2h30m or 45m."""
if value is None or value <= 0:
return default_none
total_minutes = int(value)
if unit == "seconds":
total_minutes //= 60
hours, minutes = divmod(total_minutes, 60)
if hours > 0:
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
return f"{minutes}m"
def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
"""Mark a book as finished on Audible. Optionally mutates item in place."""
total_ms = self._get_runtime_ms(asin, item)
if not total_ms:
return False
position_ms = total_ms
acr = self._get_acr(asin)
if not acr:
return False
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body={"asin": asin, "acr": acr, "position_ms": position_ms},
)
if item:
item["is_finished"] = True
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict):
listening_status["is_finished"] = True
return True
except Exception:
return False
def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
"""Return total runtime in ms from item or API."""
if item:
runtime_min = self.extract_runtime_minutes(item)
if runtime_min:
return runtime_min * 60 * 1000
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="chapter_info",
)
chapter_info = response.get(
"content_metadata", {}).get("chapter_info", {})
return chapter_info.get("runtime_length_ms")
except Exception:
return None
def _get_acr(self, asin: str) -> str | None:
"""Fetch ACR token required for position and finish updates."""
try:
response = self.client.post(
path=f"1.0/content/{asin}/licenserequest",
body={
"response_groups": "content_reference",
"consumption_type": "Download",
"drm_type": "Adrm",
},
)
return response.get("content_license", {}).get("acr")
except Exception:
return None
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS for display."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

View File

@@ -0,0 +1,84 @@
"""Metadata extraction helpers for library items."""
from __future__ import annotations
from ..types import LibraryItem
class LibraryClientExtractMixin:
"""Extracts display and status fields from library items."""
def extract_title(self, item: LibraryItem) -> str:
"""Return the book title from a library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def extract_authors(self, item: LibraryItem) -> str:
"""Return comma-separated author names from a library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [
author.get("name", "") for author in authors if isinstance(author, dict)
]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
"""Return runtime in minutes if present."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
if isinstance(runtime, (int, float)):
return int(runtime)
return None
def extract_progress_info(self, item: LibraryItem) -> float | None:
"""Return progress percentage (0-100) if present."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: LibraryItem) -> str | None:
"""Return the ASIN for a library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def is_finished(self, item: LibraryItem) -> bool:
"""Return True if the item is marked or inferred as finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float)) and percent_complete >= 100
)

View File

@@ -0,0 +1,165 @@
"""Library page fetching helpers for the Audible API client."""
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
from ..types import LibraryItem, StatusCallback
class LibraryClientFetchMixin:
"""Fetches all library items from paginated Audible endpoints."""
client: Any
def fetch_all_items(
self, on_progress: StatusCallback | None = None
) -> list[LibraryItem]:
"""Fetch all library items from the API."""
response_groups = "contributors,product_attrs,product_desc,is_finished,listening_status,percent_complete"
return self._fetch_all_pages(response_groups, on_progress)
def _fetch_page(
self,
page: int,
page_size: int,
response_groups: str,
) -> tuple[int, list[LibraryItem]]:
"""Fetch one library page and return its index with items."""
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = library.get("items", [])
return page, list(items)
def _fetch_all_pages(
self,
response_groups: str,
on_progress: StatusCallback | None = None,
) -> list[LibraryItem]:
"""Fetch all library pages using parallel requests after page one."""
library_response = None
page_size = 200
for attempt_size in [200, 100, 50]:
try:
library_response = self.client.get(
path="library",
num_results=attempt_size,
page=1,
response_groups=response_groups,
)
page_size = attempt_size
break
except Exception:
continue
if not library_response:
return []
first_page_items = library_response.get("items", [])
if not first_page_items:
return []
all_items: list[LibraryItem] = list(first_page_items)
if on_progress:
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
if len(first_page_items) < page_size:
return all_items
estimated_pages = self._estimate_total_pages(library_response, page_size)
page_results = self._fetch_remaining_pages(
response_groups=response_groups,
page_size=page_size,
estimated_pages=estimated_pages,
initial_total=len(first_page_items),
on_progress=on_progress,
)
for page_num in sorted(page_results.keys()):
all_items.extend(page_results[page_num])
return all_items
def _estimate_total_pages(self, library_response: dict, page_size: int) -> int:
"""Estimate total pages from API metadata with a conservative cap."""
total_items_estimate = library_response.get(
"total_results"
) or library_response.get("total")
if not total_items_estimate:
return 500
estimated_pages = (total_items_estimate + page_size - 1) // page_size
return min(estimated_pages, 1000)
def _fetch_remaining_pages(
self,
response_groups: str,
page_size: int,
estimated_pages: int,
initial_total: int,
on_progress: StatusCallback | None = None,
) -> dict[int, list[LibraryItem]]:
"""Fetch pages 2..N with bounded in-flight requests for faster startup."""
page_results: dict[int, list[LibraryItem]] = {}
max_workers = min(16, max(1, estimated_pages - 1))
next_page_to_submit = 2
stop_page = estimated_pages + 1
completed_count = 0
total_items = initial_total
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page: dict = {}
while (
next_page_to_submit <= estimated_pages
and next_page_to_submit < stop_page
and len(future_to_page) < max_workers
):
future = executor.submit(
self._fetch_page,
next_page_to_submit,
page_size,
response_groups,
)
future_to_page[future] = next_page_to_submit
next_page_to_submit += 1
while future_to_page:
future = next(as_completed(future_to_page))
page_num = future_to_page.pop(future)
try:
fetched_page, items = future.result()
except Exception:
continue
if items:
page_results[fetched_page] = items
total_items += len(items)
completed_count += 1
if on_progress and completed_count % 20 == 0:
on_progress(
f"Fetched {completed_count} pages ({total_items} items)..."
)
if len(items) < page_size:
stop_page = min(stop_page, fetched_page)
while (
next_page_to_submit <= estimated_pages
and next_page_to_submit < stop_page
and len(future_to_page) < max_workers
):
next_future = executor.submit(
self._fetch_page,
next_page_to_submit,
page_size,
response_groups,
)
future_to_page[next_future] = next_page_to_submit
next_page_to_submit += 1
return page_results

View File

@@ -0,0 +1,70 @@
"""Helpers for marking content as finished through Audible APIs."""
from __future__ import annotations
from typing import Any
from ..types import LibraryItem
class LibraryClientFinishedMixin:
"""Marks titles as finished and mutates in-memory item state."""
client: Any
def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
"""Mark a book as finished on Audible and optionally update item state."""
total_ms = self._get_runtime_ms(asin, item)
if not total_ms:
return False
acr = self._get_acr(asin)
if not acr:
return False
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body={"asin": asin, "acr": acr, "position_ms": total_ms},
)
if item:
item["is_finished"] = True
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict):
listening_status["is_finished"] = True
return True
except Exception:
return False
def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
"""Return total runtime in milliseconds from item or metadata endpoint."""
if item:
extract_runtime_minutes = getattr(self, "extract_runtime_minutes")
runtime_min = extract_runtime_minutes(item)
if runtime_min:
return runtime_min * 60 * 1000
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="chapter_info",
)
chapter_info = response.get("content_metadata", {}).get("chapter_info", {})
return chapter_info.get("runtime_length_ms")
except Exception:
return None
def _get_acr(self, asin: str) -> str | None:
"""Fetch the ACR token required by finish/update write operations."""
try:
response = self.client.post(
path=f"1.0/content/{asin}/licenserequest",
body={
"response_groups": "content_reference",
"consumption_type": "Download",
"drm_type": "Adrm",
},
)
return response.get("content_license", {}).get("acr")
except Exception:
return None

View File

@@ -0,0 +1,37 @@
"""Formatting helpers exposed by the library client."""
from __future__ import annotations
class LibraryClientFormatMixin:
"""Formats durations and timestamps for display usage."""
@staticmethod
def format_duration(
value: int | None,
unit: str = "minutes",
default_none: str | None = None,
) -> str | None:
"""Format duration values as compact hour-minute strings."""
if value is None or value <= 0:
return default_none
total_minutes = int(value)
if unit == "seconds":
total_minutes //= 60
hours, minutes = divmod(total_minutes, 60)
if hours > 0:
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
return f"{minutes}m"
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS for display."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

View File

@@ -0,0 +1,85 @@
"""Playback position read and write helpers for library content."""
from __future__ import annotations
from typing import Any
class LibraryClientPositionsMixin:
"""Handles last-position retrieval and persistence."""
client: Any
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annotation in annotations:
if annotation.get("asin") != asin:
continue
last_position_heard = annotation.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Fetch content reference payload used by position update calls."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def _update_position(self, asin: str, position_seconds: float) -> bool:
"""Persist playback position to the API and return success state."""
if position_seconds < 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save playback position to Audible and return success state."""
if position_seconds <= 0:
return False
return self._update_position(asin, position_seconds)

View File

@@ -45,12 +45,16 @@ class ControllerLifecycleMixin(ControllerStateMixin):
try: try:
proc, return_code = process_mod.run_ffplay(cmd) proc, return_code = process_mod.run_ffplay(cmd)
if proc is None: if proc is None:
if return_code == 0 and start_position > 0 and self.total_duration and start_position >= self.total_duration - 5: if (
return_code == 0
and start_position > 0
and self.total_duration
and start_position >= self.total_duration - 5
):
notify("Reached end of file") notify("Reached end of file")
self._reset_state() self._reset_state()
return False return False
notify( notify(f"Playback process exited immediately (code: {return_code})")
f"Playback process exited immediately (code: {return_code})")
return False return False
self.playback_process = proc self.playback_process = proc
self.is_playing = True self.is_playing = True
@@ -114,6 +118,8 @@ class ControllerLifecycleMixin(ControllerStateMixin):
download_manager: DownloadManager, download_manager: DownloadManager,
asin: str, asin: str,
status_callback: StatusCallback | None = None, status_callback: StatusCallback | None = None,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> bool: ) -> bool:
"""Download AAX if needed, get activation bytes, then start playback. Returns True on success.""" """Download AAX if needed, get activation bytes, then start playback. Returns True on success."""
notify = status_callback or self.notify notify = status_callback or self.notify
@@ -121,7 +127,12 @@ class ControllerLifecycleMixin(ControllerStateMixin):
notify("Could not download file") notify("Could not download file")
return False return False
notify("Preparing playback...") notify("Preparing playback...")
local_path = download_manager.get_or_download(asin, notify) local_path = download_manager.get_or_download(
asin,
notify,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
if not local_path: if not local_path:
notify("Could not download file") notify("Could not download file")
return False return False
@@ -136,14 +147,15 @@ class ControllerLifecycleMixin(ControllerStateMixin):
last = self.library_client.get_last_position(asin) last = self.library_client.get_last_position(asin)
if last is not None and last > 0: if last is not None and last > 0:
start_position = last start_position = last
notify( notify(f"Resuming from {LibraryClient.format_time(start_position)}")
f"Resuming from {LibraryClient.format_time(start_position)}")
except (OSError, ValueError, KeyError): except (OSError, ValueError, KeyError):
pass pass
notify(f"Starting playback of {local_path.name}...") notify(f"Starting playback of {local_path.name}...")
self.current_asin = asin self.current_asin = asin
self.last_save_time = time.time() self.last_save_time = time.time()
return self.start(local_path, activation_hex, notify, start_position, self.playback_speed) return self.start(
local_path, activation_hex, notify, start_position, self.playback_speed
)
def toggle_playback(self) -> bool: def toggle_playback(self) -> bool:
"""Toggle between pause and resume. Returns True if an action was performed.""" """Toggle between pause and resume. Returns True if an action was performed."""
@@ -160,7 +172,10 @@ class ControllerLifecycleMixin(ControllerStateMixin):
return True return True
def _restart_at_position( def _restart_at_position(
self, new_position: float, new_speed: float | None = None, message: str | None = None self,
new_position: float,
new_speed: float | None = None,
message: str | None = None,
) -> bool: ) -> bool:
"""Stop current process and start again at new_position; optionally set speed and notify.""" """Stop current process and start again at new_position; optionally set speed and notify."""
if not self.is_playing or not self.current_file_path: if not self.is_playing or not self.current_file_path:
@@ -170,7 +185,9 @@ class ControllerLifecycleMixin(ControllerStateMixin):
speed = new_speed if new_speed is not None else saved["speed"] speed = new_speed if new_speed is not None else saved["speed"]
self._stop_process() self._stop_process()
time.sleep(0.2) time.sleep(0.2)
if self.start(saved["file_path"], saved["activation"], self.notify, new_position, speed): if self.start(
saved["file_path"], saved["activation"], self.notify, new_position, speed
):
self.current_asin = saved["asin"] self.current_asin = saved["asin"]
self.total_duration = saved["duration"] self.total_duration = saved["duration"]
self.chapters = saved["chapters"] self.chapters = saved["chapters"]

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, cast
from auditui.app.actions import AppActionsMixin
@dataclass(slots=True)
class FakeTable:
"""Minimal table shim exposing cursor and row count."""
row_count: int
cursor_row: int = 0
class DummyActionsApp(AppActionsMixin):
"""Minimal app host used for download naming hint tests."""
def __init__(self) -> None:
"""Initialize state required by action helpers."""
self.current_items: list[dict] = []
self.download_manager = object()
self.library_client = type(
"Library", (), {"extract_asin": lambda self, item: item.get("asin")}
)()
self._table = FakeTable(row_count=0, cursor_row=0)
def update_status(self, message: str) -> None:
"""Ignore status in this focused behavior test."""
del message
def query_one(self, selector: str, _type: object) -> FakeTable:
"""Return the fake table used in selection tests."""
assert selector == "#library_table"
return self._table
def test_action_toggle_download_passes_selected_item() -> None:
"""Ensure download toggle forwards selected item for naming hints."""
app = DummyActionsApp()
seen: list[tuple[str, str | None]] = []
def capture_toggle(asin: str, item: dict | None = None) -> None:
"""Capture download toggle arguments for assertions."""
seen.append((asin, item.get("title") if item else None))
setattr(cast(Any, app), "_toggle_download_async", capture_toggle)
app._table = FakeTable(row_count=1, cursor_row=0)
app.current_items = [{"asin": "ASIN", "title": "Book"}]
app.action_toggle_download()
assert seen == [("ASIN", "Book")]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, cast
from auditui.app.actions import AppActionsMixin from auditui.app.actions import AppActionsMixin
@@ -41,7 +42,13 @@ class DummyActionsApp(AppActionsMixin):
self.current_items: list[dict] = [] self.current_items: list[dict] = []
self.download_manager = object() self.download_manager = object()
self.library_client = type( self.library_client = type(
"Library", (), {"extract_asin": lambda self, item: item.get("asin")} "Library",
(),
{
"extract_asin": lambda self, item: item.get("asin"),
"extract_title": lambda self, item: item.get("title"),
"extract_authors": lambda self, item: item.get("authors"),
},
)() )()
self.playback = FakePlayback(True) self.playback = FakePlayback(True)
self.filter_text = "hello" self.filter_text = "hello"
@@ -61,10 +68,6 @@ class DummyActionsApp(AppActionsMixin):
"""Record refresh invocations for filter tests.""" """Record refresh invocations for filter tests."""
self._refreshed += 1 self._refreshed += 1
def _start_playback_async(self, asin: str) -> None:
"""Capture async playback launch argument."""
self.messages.append(f"start:{asin}")
def test_get_selected_asin_requires_non_empty_table() -> None: def test_get_selected_asin_requires_non_empty_table() -> None:
"""Ensure selection fails gracefully when table has no rows.""" """Ensure selection fails gracefully when table has no rows."""
@@ -85,10 +88,18 @@ def test_get_selected_asin_returns_current_row_asin() -> None:
def test_action_play_selected_starts_async_playback() -> None: def test_action_play_selected_starts_async_playback() -> None:
"""Ensure play action calls async starter with selected ASIN.""" """Ensure play action calls async starter with selected ASIN."""
app = DummyActionsApp() app = DummyActionsApp()
seen: list[str] = []
def capture_start(asin: str, item: dict | None = None) -> None:
"""Capture playback start arguments for assertions."""
suffix = f":{item.get('title')}" if item else ""
seen.append(f"start:{asin}{suffix}")
setattr(cast(Any, app), "_start_playback_async", capture_start)
app._table = FakeTable(row_count=1, cursor_row=0) app._table = FakeTable(row_count=1, cursor_row=0)
app.current_items = [{"asin": "ASIN"}] app.current_items = [{"asin": "ASIN", "title": "Book"}]
app.action_play_selected() app.action_play_selected()
assert app.messages[-1] == "start:ASIN" assert seen[-1] == "start:ASIN:Book"
def test_action_toggle_playback_shows_hint_when_no_playback() -> None: def test_action_toggle_playback_shows_hint_when_no_playback() -> None:

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from auditui.app.table import AppTableMixin
class DummyTableApp(AppTableMixin):
"""Minimal host exposing library client for row key helper tests."""
def __init__(self) -> None:
"""Initialize a fake library client with ASIN extraction."""
self.library_client = type(
"Library",
(),
{"extract_asin": lambda self, item: item.get("asin")},
)()
def test_build_row_key_prefers_asin_and_remains_unique() -> None:
"""Ensure duplicate ASINs receive deterministic unique key suffixes."""
app = DummyTableApp()
used: set[str] = set()
item = {"asin": "ASIN1"}
first = app._build_row_key(item, "Title", 0, used)
second = app._build_row_key(item, "Title", 1, used)
assert first == "ASIN1"
assert second == "ASIN1#2"
def test_build_row_key_falls_back_to_title_and_index() -> None:
"""Ensure missing ASIN values use title-index fallback keys."""
app = DummyTableApp()
used: set[str] = set()
key = app._build_row_key({"asin": None}, "Unknown Title", 3, used)
assert key == "Unknown Title#3"

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import Any, cast
import pytest import pytest
@@ -17,9 +18,11 @@ def _manager_with_cache_dir(tmp_path: Path) -> DownloadManager:
def test_sanitize_filename_replaces_invalid_characters() -> None: def test_sanitize_filename_replaces_invalid_characters() -> None:
"""Ensure filesystem-invalid symbols are replaced with underscores.""" """Ensure filename normalization uses ASCII words and dashes."""
manager = DownloadManager.__new__(DownloadManager) manager = DownloadManager.__new__(DownloadManager)
assert manager._sanitize_filename('a<>:"/\\|?*b') == "a_________b" assert (
manager._sanitize_filename("Stephen King 11/22/63") == "Stephen-King-11-22-63"
)
def test_validate_download_url_accepts_only_http_schemes() -> None: def test_validate_download_url_accepts_only_http_schemes() -> None:
@@ -35,8 +38,12 @@ def test_get_cached_path_and_remove_cached(
) -> None: ) -> None:
"""Ensure cache lookup and cache deletion work for valid files.""" """Ensure cache lookup and cache deletion work for valid files."""
manager = _manager_with_cache_dir(tmp_path) manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book") monkeypatch.setattr(
cached_path = tmp_path / "My Book.aax" manager,
"_get_filename_stems_from_asin",
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
)
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
cached_path.write_bytes(b"0" * MIN_FILE_SIZE) cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
messages: list[str] = [] messages: list[str] = []
assert manager.get_cached_path("ASIN123") == cached_path assert manager.get_cached_path("ASIN123") == cached_path
@@ -51,7 +58,34 @@ def test_get_cached_path_ignores_small_files(
) -> None: ) -> None:
"""Ensure undersized files are not treated as valid cache entries.""" """Ensure undersized files are not treated as valid cache entries."""
manager = _manager_with_cache_dir(tmp_path) manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book") monkeypatch.setattr(
cached_path = tmp_path / "My Book.aax" manager,
"_get_filename_stems_from_asin",
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
)
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1)) cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
assert manager.get_cached_path("ASIN123") is None assert manager.get_cached_path("ASIN123") is None
def test_get_filename_stems_include_author_title_and_legacy_title() -> None:
"""Ensure filename candidates include new author_title and legacy title names."""
manager = DownloadManager.__new__(DownloadManager)
manager.client = cast(
Any,
type(
"Client",
(),
{
"get": lambda self, path, **kwargs: {
"product": {
"title": "11/22/63",
"authors": [{"name": "Stephen King"}],
}
}
},
)(),
)
stems = manager._get_filename_stems_from_asin("B00TEST")
assert stems[0] == "Stephen-King_11-22-63"
assert "11-22-63" in stems

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import Any, cast
import pytest import pytest
@@ -14,9 +15,14 @@ def _bare_manager(tmp_path: Path) -> DownloadManager:
manager = DownloadManager.__new__(DownloadManager) manager = DownloadManager.__new__(DownloadManager)
manager.cache_dir = tmp_path manager.cache_dir = tmp_path
manager.chunk_size = 1024 manager.chunk_size = 1024
manager.auth = type( manager.auth = cast(
"Auth", (), {"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()} Any,
)() type(
"Auth",
(),
{"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()},
)(),
)
return manager return manager
@@ -48,8 +54,12 @@ def test_get_or_download_uses_cached_file_when_available(
) -> None: ) -> None:
"""Ensure cached files bypass link generation and download work.""" """Ensure cached files bypass link generation and download work."""
manager = _bare_manager(tmp_path) manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book") monkeypatch.setattr(
cached_path = tmp_path / "Book.aax" manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
cached_path = tmp_path / "Author_Book.aax"
cached_path.write_bytes(b"1" * MIN_FILE_SIZE) cached_path.write_bytes(b"1" * MIN_FILE_SIZE)
messages: list[str] = [] messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) == cached_path assert manager.get_or_download("ASIN", notify=messages.append) == cached_path
@@ -61,7 +71,11 @@ def test_get_or_download_reports_invalid_url(
) -> None: ) -> None:
"""Ensure workflow reports invalid download URLs and aborts.""" """Ensure workflow reports invalid download URLs and aborts."""
manager = _bare_manager(tmp_path) manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book") monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr( monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "ftp://bad" manager, "_get_download_link", lambda asin, notify=None: "ftp://bad"
) )
@@ -75,7 +89,11 @@ def test_get_or_download_handles_download_failure(
) -> None: ) -> None:
"""Ensure workflow reports failures when stream download does not complete.""" """Ensure workflow reports failures when stream download does not complete."""
manager = _bare_manager(tmp_path) manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book") monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr( monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "https://ok" manager, "_get_download_link", lambda asin, notify=None: "https://ok"
) )
@@ -83,3 +101,60 @@ def test_get_or_download_handles_download_failure(
messages: list[str] = [] messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None assert manager.get_or_download("ASIN", notify=messages.append) is None
assert "Download failed" in messages assert "Download failed" in messages
def test_get_or_download_uses_preferred_naming_hints(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure preferred title/author are forwarded to filename stem selection."""
manager = _bare_manager(tmp_path)
captured: list[tuple[str | None, str | None]] = []
def stems(
asin: str,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> list[str]:
"""Capture naming hints and return one deterministic filename stem."""
del asin
captured.append((preferred_title, preferred_author))
return ["Author_Book"]
monkeypatch.setattr(manager, "_get_filename_stems_from_asin", stems)
monkeypatch.setattr(manager, "_get_download_link", lambda asin, notify=None: None)
manager.get_or_download(
"ASIN",
preferred_title="11/22/63",
preferred_author="Stephen King",
)
assert captured == [("11/22/63", "Stephen King")]
def test_get_or_download_retries_when_file_is_too_small(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure small downloads are retried and then reported with exact byte size."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "https://ok"
)
attempts = {"count": 0}
def write_small_file(url: str, path: Path, notify=None) -> Path:
"""Write an undersized file to trigger retry and final failure messages."""
del url, notify
attempts["count"] += 1
path.write_bytes(b"x" * 100)
return path
monkeypatch.setattr(manager, "_download_file", write_small_file)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None
assert attempts["count"] == 2
assert any("retrying" in message for message in messages)
assert any("file too small" in message for message in messages)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import Any, cast
from auditui.playback import controller_lifecycle as lifecycle_mod from auditui.playback import controller_lifecycle as lifecycle_mod
from auditui.playback.controller import PlaybackController from auditui.playback.controller import PlaybackController
@@ -62,14 +63,21 @@ def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
"""Ensure prepare flow resumes from saved position when available.""" """Ensure prepare flow resumes from saved position when available."""
messages: list[str] = [] messages: list[str] = []
lib = type("Lib", (), {"get_last_position": lambda self, asin: 75.0})() lib = type("Lib", (), {"get_last_position": lambda self, asin: 75.0})()
controller = PlaybackController(messages.append, lib) controller = PlaybackController(messages.append, cast(Any, lib))
started: list[tuple] = [] started: list[tuple] = []
class DM: class DM:
"""Download manager shim returning path and activation token.""" """Download manager shim returning path and activation token."""
def get_or_download(self, asin, notify): def get_or_download(
self,
asin,
notify,
preferred_title: str | None = None,
preferred_author: str | None = None,
):
"""Return deterministic downloaded file path.""" """Return deterministic downloaded file path."""
del asin, notify, preferred_title, preferred_author
return Path("book.aax") return Path("book.aax")
def get_activation_bytes(self): def get_activation_bytes(self):
@@ -78,7 +86,7 @@ def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
monkeypatch.setattr(controller, "start", lambda *args: started.append(args) or True) monkeypatch.setattr(controller, "start", lambda *args: started.append(args) or True)
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 200.0) monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 200.0)
assert controller.prepare_and_start(DM(), "ASIN") is True assert controller.prepare_and_start(cast(Any, DM()), "ASIN") is True
assert started and started[0][3] == 75.0 assert started and started[0][3] == 75.0
assert "Resuming from 01:15" in messages assert "Resuming from 01:15" in messages
@@ -87,7 +95,7 @@ def test_toggle_playback_uses_pause_and_resume_paths(monkeypatch) -> None:
"""Ensure toggle dispatches pause or resume based on paused flag.""" """Ensure toggle dispatches pause or resume based on paused flag."""
controller, _ = _controller() controller, _ = _controller()
controller.is_playing = True controller.is_playing = True
controller.playback_process = Proc(None) controller.playback_process = cast(Any, Proc(None))
called: list[str] = [] called: list[str] = []
monkeypatch.setattr(controller, "pause", lambda: called.append("pause")) monkeypatch.setattr(controller, "pause", lambda: called.append("pause"))
monkeypatch.setattr(controller, "resume", lambda: called.append("resume")) monkeypatch.setattr(controller, "resume", lambda: called.append("resume"))