Compare commits
29 Commits
e88dcee155
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 1e6f1544db | |||
| 26cba97cbd | |||
| 175bb7cbdc | |||
| bf0e70e9d9 | |||
| cb4104e59a | |||
| 570639e988 | |||
| 5ba0fafbc1 | |||
| bed0ac4fea | |||
| 0a909484e3 | |||
| ecdd953ff4 | |||
| 4ba2c43c93 | |||
| 4b1924edd8 | |||
| da20e84513 | |||
| dcb43f65dd | |||
| beca8ee085 | |||
| e813267d5e | |||
| eca58423dc | |||
| 307368480a | |||
| a8add30928 | |||
| 3e6e31c2db | |||
| 6335f8bbac | |||
| 0cf2644f55 | |||
| 597e82dc20 | |||
| 25d56cf407 | |||
| 76c991600c | |||
| 95e641a527 | |||
| 8f8cdf7bfa | |||
| 9c19891443 | |||
| 01de75871a |
18
CHANGELOG.md
18
CHANGELOG.md
@@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [0.2.0] - 2026-02-18
|
||||
|
||||
### Changed
|
||||
|
||||
- massive code refactoring
|
||||
- complete test suite revamp
|
||||
- updated download cache naming to use `Author_Title` format with normalized separators
|
||||
- optimized library pagination fetch with bounded concurrent scheduling
|
||||
- adjusted library first-page probe order to prefer larger page sizes for medium libraries
|
||||
- removed eager search cache priming during library load to reduce startup work
|
||||
|
||||
### Fixed
|
||||
|
||||
- reused library metadata for download filename generation to avoid `Unknown-Author_Unknown-Title` when title/author are already known in the UI
|
||||
- fixed Audible last-position request parameter handling after library client refactor
|
||||
- added retry behavior and explicit size diagnostics when downloaded files are too small
|
||||
- prevented table rendering crashes by generating unique row keys instead of using title-only keys
|
||||
|
||||
## [0.1.6] - 2026-02-16
|
||||
|
||||
### Changed
|
||||
|
||||
10
README.md
10
README.md
@@ -2,7 +2,7 @@
|
||||
|
||||
A terminal-based user interface (TUI) client for [Audible](https://www.audible.fr/), written in Python 3.
|
||||
|
||||
Currently, the only available theme is Catppuccin Mocha, following their [style guide](https://github.com/catppuccin/catppuccin/blob/main/docs/style-guide.md), as it's my preferred theme across most of my tools.
|
||||
The interface currently ships with a single built-in theme.
|
||||
|
||||
## Requirements
|
||||
|
||||
@@ -36,9 +36,9 @@ auditui --version
|
||||
|
||||
All set, run `auditui configure` to set up authentication, and then `auditui` to start the TUI.
|
||||
|
||||
### Workaround for Python 3.13 linux distribution
|
||||
### Workaround for Python 3.13 Linux distributions
|
||||
|
||||
On some Linux distributions, Python 3.13 is already the default. So you have to install Python 3.12 manually before using `pipx`.
|
||||
On some Linux distributions, Python 3.13 is already the default. In that case, install Python 3.12 manually before using `pipx`.
|
||||
|
||||
For Arch Linux:
|
||||
|
||||
@@ -52,7 +52,7 @@ Once you have Python 3.12, run:
|
||||
pipx install git+https://git.kharec.info/Kharec/auditui.git --python python3.12
|
||||
```
|
||||
|
||||
As Python <3.14 is supported on `master` branch of the upstream [`audible`](https://github.com/mkb79/Audible), this should be temporary until the next version.
|
||||
This workaround is temporary and depends on upstream `audible` compatibility updates.
|
||||
|
||||
## Upgrade
|
||||
|
||||
@@ -90,6 +90,8 @@ pipx upgrade auditui
|
||||
|
||||
Books are downloaded to `~/.cache/auditui/books`.
|
||||
|
||||
Downloaded files use a normalized `Author_Title.aax` naming format. For example, `Stephen King` and `11/22/63` become `Stephen-King_11-22-63.aax`.
|
||||
|
||||
The `d` key toggles the download state for the selected book: if the book is not cached, pressing `d` will download it; if it's already cached, pressing `d` will delete it from the cache.
|
||||
|
||||
To check the total size of your cache:
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""Auditui: Audible TUI client. One folder per module; all code lives inside module packages."""
|
||||
"""Auditui: Audible TUI client"""
|
||||
|
||||
__version__ = "0.1.6"
|
||||
__version__ = "0.2.0"
|
||||
|
||||
@@ -10,11 +10,8 @@ from ..ui import FilterScreen, HelpScreen, StatsScreen
|
||||
|
||||
|
||||
class AppActionsMixin:
|
||||
def _get_selected_asin(self) -> str | None:
|
||||
if not self.download_manager:
|
||||
self.update_status(
|
||||
"Not authenticated. Please restart and authenticate.")
|
||||
return None
|
||||
def _get_selected_item(self) -> dict | None:
|
||||
"""Return the currently selected library item from the table."""
|
||||
table = self.query_one("#library_table", DataTable)
|
||||
if table.row_count == 0:
|
||||
self.update_status("No books available")
|
||||
@@ -23,10 +20,27 @@ class AppActionsMixin:
|
||||
if cursor_row >= len(self.current_items):
|
||||
self.update_status("Invalid selection")
|
||||
return None
|
||||
return self.current_items[cursor_row]
|
||||
|
||||
def _get_naming_hints(self, item: dict | None) -> tuple[str | None, str | None]:
|
||||
"""Return preferred title and author values used for download filenames."""
|
||||
if not item or not self.library_client:
|
||||
return (None, None)
|
||||
return (
|
||||
self.library_client.extract_title(item),
|
||||
self.library_client.extract_authors(item),
|
||||
)
|
||||
|
||||
def _get_selected_asin(self) -> str | None:
|
||||
if not self.download_manager:
|
||||
self.update_status("Not authenticated. Please restart and authenticate.")
|
||||
return None
|
||||
if not self.library_client:
|
||||
self.update_status("Library client not available")
|
||||
return None
|
||||
selected_item = self.current_items[cursor_row]
|
||||
selected_item = self._get_selected_item()
|
||||
if not selected_item:
|
||||
return None
|
||||
asin = self.library_client.extract_asin(selected_item)
|
||||
if not asin:
|
||||
self.update_status("Could not get ASIN for selected book")
|
||||
@@ -36,7 +50,7 @@ class AppActionsMixin:
|
||||
def action_play_selected(self) -> None:
|
||||
asin = self._get_selected_asin()
|
||||
if asin:
|
||||
self._start_playback_async(asin)
|
||||
self._start_playback_async(asin, self._get_selected_item())
|
||||
|
||||
def action_toggle_playback(self) -> None:
|
||||
if not self.playback.toggle_playback():
|
||||
@@ -86,8 +100,7 @@ class AppActionsMixin:
|
||||
return
|
||||
|
||||
if self.library_client.is_finished(selected_item):
|
||||
self.call_from_thread(self.update_status,
|
||||
"Already marked as finished")
|
||||
self.call_from_thread(self.update_status, "Already marked as finished")
|
||||
return
|
||||
|
||||
success = self.library_client.mark_as_finished(asin, selected_item)
|
||||
@@ -132,28 +145,36 @@ class AppActionsMixin:
|
||||
def action_toggle_download(self) -> None:
|
||||
asin = self._get_selected_asin()
|
||||
if asin:
|
||||
self._toggle_download_async(asin)
|
||||
self._toggle_download_async(asin, self._get_selected_item())
|
||||
|
||||
@work(exclusive=True, thread=True)
|
||||
def _toggle_download_async(self, asin: str) -> None:
|
||||
def _toggle_download_async(self, asin: str, item: dict | None = None) -> None:
|
||||
if not self.download_manager:
|
||||
return
|
||||
|
||||
preferred_title, preferred_author = self._get_naming_hints(item)
|
||||
|
||||
if self.download_manager.is_cached(asin):
|
||||
self.download_manager.remove_cached(
|
||||
asin, self._thread_status_update)
|
||||
self.download_manager.remove_cached(asin, self._thread_status_update)
|
||||
else:
|
||||
self.download_manager.get_or_download(
|
||||
asin, self._thread_status_update)
|
||||
asin,
|
||||
self._thread_status_update,
|
||||
preferred_title=preferred_title,
|
||||
preferred_author=preferred_author,
|
||||
)
|
||||
|
||||
self.call_from_thread(self._refresh_table)
|
||||
|
||||
@work(exclusive=True, thread=True)
|
||||
def _start_playback_async(self, asin: str) -> None:
|
||||
def _start_playback_async(self, asin: str, item: dict | None = None) -> None:
|
||||
if not self.download_manager:
|
||||
return
|
||||
preferred_title, preferred_author = self._get_naming_hints(item)
|
||||
self.playback.prepare_and_start(
|
||||
self.download_manager,
|
||||
asin,
|
||||
self._thread_status_update,
|
||||
preferred_title,
|
||||
preferred_author,
|
||||
)
|
||||
|
||||
@@ -8,7 +8,7 @@ from textual.events import Resize
|
||||
from textual.widgets import DataTable, ProgressBar, Static
|
||||
|
||||
from .. import __version__
|
||||
from ..constants import TABLE_COLUMN_DEFS, TABLE_CSS
|
||||
from ..constants import TABLE_COLUMN_DEFS
|
||||
|
||||
|
||||
class AppLayoutMixin:
|
||||
|
||||
@@ -16,16 +16,15 @@ class AppLibraryMixin:
|
||||
return
|
||||
|
||||
try:
|
||||
all_items = self.library_client.fetch_all_items(
|
||||
self._thread_status_update)
|
||||
all_items = self.library_client.fetch_all_items(self._thread_status_update)
|
||||
self.call_from_thread(self.on_library_loaded, all_items)
|
||||
except (OSError, ValueError, KeyError) as exc:
|
||||
self.call_from_thread(self.on_library_error, str(exc))
|
||||
|
||||
def on_library_loaded(self, items: list[LibraryItem]) -> None:
|
||||
"""Store fetched items and refresh the active library view."""
|
||||
self.all_items = items
|
||||
self._search_text_cache.clear()
|
||||
self._prime_search_cache(items)
|
||||
self.update_status(f"Loaded {len(items)} books")
|
||||
if self.show_all_mode:
|
||||
self.show_all()
|
||||
|
||||
@@ -15,6 +15,7 @@ from textual.widgets import DataTable, Static
|
||||
|
||||
class AppTableMixin:
|
||||
def _populate_table(self, items: list[LibraryItem]) -> None:
|
||||
"""Render library items into the table with stable unique row keys."""
|
||||
table = self.query_one("#library_table", DataTable)
|
||||
table.clear()
|
||||
|
||||
@@ -22,18 +23,41 @@ class AppTableMixin:
|
||||
self.update_status("No books found.")
|
||||
return
|
||||
|
||||
for item in items:
|
||||
used_keys: set[str] = set()
|
||||
for index, item in enumerate(items):
|
||||
title, author, runtime, progress, downloaded = format_item_as_row(
|
||||
item, self.library_client, self.download_manager
|
||||
)
|
||||
table.add_row(title, author, runtime,
|
||||
progress, downloaded, key=title)
|
||||
row_key = self._build_row_key(item, title, index, used_keys)
|
||||
table.add_row(title, author, runtime, progress, downloaded, key=row_key)
|
||||
|
||||
self.current_items = items
|
||||
status = self.query_one("#status", Static)
|
||||
status.display = False
|
||||
self._apply_column_widths(table)
|
||||
|
||||
def _build_row_key(
|
||||
self,
|
||||
item: LibraryItem,
|
||||
title: str,
|
||||
index: int,
|
||||
used_keys: set[str],
|
||||
) -> str:
|
||||
"""Return a unique table row key derived from ASIN when available."""
|
||||
asin = self.library_client.extract_asin(item) if self.library_client else None
|
||||
base_key = asin or f"{title}#{index}"
|
||||
if base_key not in used_keys:
|
||||
used_keys.add(base_key)
|
||||
return base_key
|
||||
|
||||
suffix = 2
|
||||
candidate = f"{base_key}#{suffix}"
|
||||
while candidate in used_keys:
|
||||
suffix += 1
|
||||
candidate = f"{base_key}#{suffix}"
|
||||
used_keys.add(candidate)
|
||||
return candidate
|
||||
|
||||
def _refresh_table(self) -> None:
|
||||
if self.current_items:
|
||||
self._populate_table(self.current_items)
|
||||
@@ -79,11 +103,9 @@ class AppTableMixin:
|
||||
items = self.all_items
|
||||
|
||||
if self.filter_text:
|
||||
items = filter_items(items, self.filter_text,
|
||||
self._get_search_text)
|
||||
items = filter_items(items, self.filter_text, self._get_search_text)
|
||||
self._populate_table(items)
|
||||
self.update_status(
|
||||
f"Filter: '{self.filter_text}' ({len(items)} books)")
|
||||
self.update_status(f"Filter: '{self.filter_text}' ({len(items)} books)")
|
||||
return
|
||||
|
||||
if not self.show_all_mode and self.library_client:
|
||||
@@ -97,6 +119,7 @@ class AppTableMixin:
|
||||
if cached is not None:
|
||||
return cached
|
||||
from ..library import build_search_text
|
||||
|
||||
search_text = build_search_text(item, self.library_client)
|
||||
self._search_text_cache[cache_key] = search_text
|
||||
return search_text
|
||||
|
||||
@@ -1,278 +1,29 @@
|
||||
"""Paths, API/config values, and CSS used across the application."""
|
||||
"""Compatibility exports for constants grouped by domain modules."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
|
||||
CONFIG_PATH = Path.home() / ".config" / "auditui" / "config.json"
|
||||
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
|
||||
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
|
||||
DEFAULT_CODEC = "LC_128_44100_stereo"
|
||||
MIN_FILE_SIZE = 1024 * 1024
|
||||
DEFAULT_CHUNK_SIZE = 8192
|
||||
|
||||
TABLE_COLUMN_DEFS = (
|
||||
("Title", 4),
|
||||
("Author", 3),
|
||||
("Length", 1),
|
||||
("Progress", 1),
|
||||
("Downloaded", 1),
|
||||
from .downloads import DEFAULT_CHUNK_SIZE, DEFAULT_CODEC, DOWNLOAD_URL, MIN_FILE_SIZE
|
||||
from .library import (
|
||||
AUTHOR_NAME_DISPLAY_LENGTH,
|
||||
AUTHOR_NAME_MAX_LENGTH,
|
||||
PROGRESS_COLUMN_INDEX,
|
||||
)
|
||||
from .paths import AUTH_PATH, CACHE_DIR, CONFIG_PATH
|
||||
from .playback import SEEK_SECONDS
|
||||
from .table import TABLE_COLUMN_DEFS
|
||||
from .ui import TABLE_CSS
|
||||
|
||||
AUTHOR_NAME_MAX_LENGTH = 40
|
||||
AUTHOR_NAME_DISPLAY_LENGTH = 37
|
||||
PROGRESS_COLUMN_INDEX = 3
|
||||
SEEK_SECONDS = 30.0
|
||||
|
||||
TABLE_CSS = """
|
||||
Screen {
|
||||
background: #141622;
|
||||
}
|
||||
|
||||
#top_bar {
|
||||
background: #10131f;
|
||||
color: #d5d9f0;
|
||||
text-style: bold;
|
||||
height: 1;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
#top_left,
|
||||
#top_center,
|
||||
#top_right {
|
||||
width: 1fr;
|
||||
padding: 0 1;
|
||||
background: #10131f;
|
||||
margin: 0;
|
||||
}
|
||||
|
||||
#top_left {
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
#top_center {
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
#top_right {
|
||||
text-align: right;
|
||||
}
|
||||
|
||||
DataTable {
|
||||
width: 100%;
|
||||
height: 1fr;
|
||||
background: #141622;
|
||||
color: #c7cfe8;
|
||||
border: solid #262a3f;
|
||||
scrollbar-size-horizontal: 0;
|
||||
}
|
||||
|
||||
DataTable:focus {
|
||||
border: solid #7aa2f7;
|
||||
}
|
||||
|
||||
DataTable > .datatable--header {
|
||||
background: #1b2033;
|
||||
color: #b9c3e3;
|
||||
text-style: bold;
|
||||
}
|
||||
|
||||
DataTable > .datatable--cursor {
|
||||
background: #232842;
|
||||
color: #e6ebff;
|
||||
}
|
||||
|
||||
DataTable > .datatable--odd-row {
|
||||
background: #121422;
|
||||
}
|
||||
|
||||
DataTable > .datatable--even-row {
|
||||
background: #15182a;
|
||||
}
|
||||
|
||||
Static {
|
||||
height: 1;
|
||||
text-align: center;
|
||||
background: #10131f;
|
||||
color: #c7cfe8;
|
||||
}
|
||||
|
||||
Static#status {
|
||||
color: #b6bfdc;
|
||||
}
|
||||
|
||||
Static#progress_info {
|
||||
color: #7aa2f7;
|
||||
text-style: bold;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
text-align: center;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
#progress_bar_container {
|
||||
align: center middle;
|
||||
width: 100%;
|
||||
height: 1;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar {
|
||||
height: 1;
|
||||
background: #10131f;
|
||||
border: none;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
width: 50%;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar Bar {
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar > .progress-bar--track {
|
||||
background: #262a3f;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar > .progress-bar--bar {
|
||||
background: #8bd5ca;
|
||||
}
|
||||
|
||||
HelpScreen,
|
||||
StatsScreen,
|
||||
FilterScreen {
|
||||
align: center middle;
|
||||
background: rgba(0, 0, 0, 0.7);
|
||||
}
|
||||
|
||||
HelpScreen Static,
|
||||
StatsScreen Static,
|
||||
FilterScreen Static {
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
StatsScreen #help_container {
|
||||
width: auto;
|
||||
min-width: 55;
|
||||
max-width: 70;
|
||||
}
|
||||
|
||||
StatsScreen #help_content {
|
||||
align: center middle;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
StatsScreen .help_list {
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
StatsScreen .help_list > ListItem {
|
||||
background: transparent;
|
||||
height: 1;
|
||||
}
|
||||
|
||||
StatsScreen .help_list > ListItem:hover {
|
||||
background: #232842;
|
||||
}
|
||||
|
||||
StatsScreen .help_list > ListItem > Label {
|
||||
width: 100%;
|
||||
text-align: left;
|
||||
padding-left: 2;
|
||||
}
|
||||
|
||||
#help_container {
|
||||
width: 72%;
|
||||
max-width: 90;
|
||||
min-width: 44;
|
||||
height: auto;
|
||||
max-height: 80%;
|
||||
min-height: 14;
|
||||
background: #181a2a;
|
||||
border: heavy #7aa2f7;
|
||||
padding: 1 1;
|
||||
}
|
||||
|
||||
#help_title {
|
||||
width: 100%;
|
||||
height: 2;
|
||||
text-align: center;
|
||||
text-style: bold;
|
||||
color: #7aa2f7;
|
||||
content-align: center middle;
|
||||
margin-bottom: 0;
|
||||
border-bottom: solid #4b5165;
|
||||
}
|
||||
|
||||
#help_content {
|
||||
width: 100%;
|
||||
height: auto;
|
||||
padding: 0;
|
||||
margin: 0 0 1 0;
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
.help_list {
|
||||
width: 100%;
|
||||
height: auto;
|
||||
background: transparent;
|
||||
padding: 0;
|
||||
scrollbar-size: 0 0;
|
||||
}
|
||||
|
||||
.help_list > ListItem {
|
||||
background: #1b1f33;
|
||||
padding: 0 1;
|
||||
height: 1;
|
||||
}
|
||||
|
||||
.help_list > ListItem:hover {
|
||||
background: #2a2f45;
|
||||
}
|
||||
|
||||
.help_list > ListItem > Label {
|
||||
width: 100%;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
#help_footer {
|
||||
width: 100%;
|
||||
height: 2;
|
||||
text-align: center;
|
||||
content-align: center middle;
|
||||
color: #b6bfdc;
|
||||
margin-top: 0;
|
||||
border-top: solid #4b5165;
|
||||
}
|
||||
|
||||
#filter_container {
|
||||
width: 60;
|
||||
height: auto;
|
||||
background: #181a2a;
|
||||
border: heavy #7aa2f7;
|
||||
padding: 1 2;
|
||||
}
|
||||
|
||||
#filter_title {
|
||||
width: 100%;
|
||||
height: 2;
|
||||
text-align: center;
|
||||
text-style: bold;
|
||||
color: #7aa2f7;
|
||||
content-align: center middle;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
#filter_input {
|
||||
width: 100%;
|
||||
margin: 1 0;
|
||||
}
|
||||
|
||||
#filter_footer {
|
||||
width: 100%;
|
||||
height: 2;
|
||||
text-align: center;
|
||||
content-align: center middle;
|
||||
color: #b6bfdc;
|
||||
margin-top: 1;
|
||||
}
|
||||
"""
|
||||
__all__ = [
|
||||
"AUTH_PATH",
|
||||
"CONFIG_PATH",
|
||||
"CACHE_DIR",
|
||||
"DOWNLOAD_URL",
|
||||
"DEFAULT_CODEC",
|
||||
"MIN_FILE_SIZE",
|
||||
"DEFAULT_CHUNK_SIZE",
|
||||
"TABLE_COLUMN_DEFS",
|
||||
"AUTHOR_NAME_MAX_LENGTH",
|
||||
"AUTHOR_NAME_DISPLAY_LENGTH",
|
||||
"PROGRESS_COLUMN_INDEX",
|
||||
"SEEK_SECONDS",
|
||||
"TABLE_CSS",
|
||||
]
|
||||
|
||||
6
auditui/constants/downloads.py
Normal file
6
auditui/constants/downloads.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Download-related constants for Audible file retrieval."""
|
||||
|
||||
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
|
||||
DEFAULT_CODEC = "LC_128_44100_stereo"
|
||||
MIN_FILE_SIZE = 1024 * 1024
|
||||
DEFAULT_CHUNK_SIZE = 8192
|
||||
5
auditui/constants/library.py
Normal file
5
auditui/constants/library.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Library and table formatting constants."""
|
||||
|
||||
AUTHOR_NAME_MAX_LENGTH = 40
|
||||
AUTHOR_NAME_DISPLAY_LENGTH = 37
|
||||
PROGRESS_COLUMN_INDEX = 3
|
||||
8
auditui/constants/paths.py
Normal file
8
auditui/constants/paths.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""Filesystem paths used by configuration and caching."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
|
||||
CONFIG_PATH = Path.home() / ".config" / "auditui" / "config.json"
|
||||
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
|
||||
3
auditui/constants/playback.py
Normal file
3
auditui/constants/playback.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""Playback behavior constants."""
|
||||
|
||||
SEEK_SECONDS = 30.0
|
||||
9
auditui/constants/table.py
Normal file
9
auditui/constants/table.py
Normal file
@@ -0,0 +1,9 @@
|
||||
"""Main library table column definitions."""
|
||||
|
||||
TABLE_COLUMN_DEFS = (
|
||||
("Title", 4),
|
||||
("Author", 3),
|
||||
("Length", 1),
|
||||
("Progress", 1),
|
||||
("Downloaded", 1),
|
||||
)
|
||||
255
auditui/constants/ui.py
Normal file
255
auditui/constants/ui.py
Normal file
@@ -0,0 +1,255 @@
|
||||
"""Textual CSS constants for the application UI."""
|
||||
|
||||
TABLE_CSS = """
|
||||
Screen {
|
||||
background: #141622;
|
||||
}
|
||||
|
||||
#top_bar {
|
||||
background: #10131f;
|
||||
color: #d5d9f0;
|
||||
text-style: bold;
|
||||
height: 1;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
#top_left,
|
||||
#top_center,
|
||||
#top_right {
|
||||
width: 1fr;
|
||||
padding: 0 1;
|
||||
background: #10131f;
|
||||
margin: 0;
|
||||
}
|
||||
|
||||
#top_left {
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
#top_center {
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
#top_right {
|
||||
text-align: right;
|
||||
}
|
||||
|
||||
DataTable {
|
||||
width: 100%;
|
||||
height: 1fr;
|
||||
background: #141622;
|
||||
color: #c7cfe8;
|
||||
border: solid #262a3f;
|
||||
scrollbar-size-horizontal: 0;
|
||||
}
|
||||
|
||||
DataTable:focus {
|
||||
border: solid #7aa2f7;
|
||||
}
|
||||
|
||||
DataTable > .datatable--header {
|
||||
background: #1b2033;
|
||||
color: #b9c3e3;
|
||||
text-style: bold;
|
||||
}
|
||||
|
||||
DataTable > .datatable--cursor {
|
||||
background: #232842;
|
||||
color: #e6ebff;
|
||||
}
|
||||
|
||||
DataTable > .datatable--odd-row {
|
||||
background: #121422;
|
||||
}
|
||||
|
||||
DataTable > .datatable--even-row {
|
||||
background: #15182a;
|
||||
}
|
||||
|
||||
Static {
|
||||
height: 1;
|
||||
text-align: center;
|
||||
background: #10131f;
|
||||
color: #c7cfe8;
|
||||
}
|
||||
|
||||
Static#status {
|
||||
color: #b6bfdc;
|
||||
}
|
||||
|
||||
Static#progress_info {
|
||||
color: #7aa2f7;
|
||||
text-style: bold;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
text-align: center;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
#progress_bar_container {
|
||||
align: center middle;
|
||||
width: 100%;
|
||||
height: 1;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar {
|
||||
height: 1;
|
||||
background: #10131f;
|
||||
border: none;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
width: 50%;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar Bar {
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar > .progress-bar--track {
|
||||
background: #262a3f;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar > .progress-bar--bar {
|
||||
background: #8bd5ca;
|
||||
}
|
||||
|
||||
HelpScreen,
|
||||
StatsScreen,
|
||||
FilterScreen {
|
||||
align: center middle;
|
||||
background: rgba(0, 0, 0, 0.7);
|
||||
}
|
||||
|
||||
HelpScreen Static,
|
||||
StatsScreen Static,
|
||||
FilterScreen Static {
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
StatsScreen #help_container {
|
||||
width: auto;
|
||||
min-width: 55;
|
||||
max-width: 70;
|
||||
}
|
||||
|
||||
StatsScreen #help_content {
|
||||
align: center middle;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
StatsScreen .help_list {
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
StatsScreen .help_list > ListItem {
|
||||
background: transparent;
|
||||
height: 1;
|
||||
}
|
||||
|
||||
StatsScreen .help_list > ListItem:hover {
|
||||
background: #232842;
|
||||
}
|
||||
|
||||
StatsScreen .help_list > ListItem > Label {
|
||||
width: 100%;
|
||||
text-align: left;
|
||||
padding-left: 2;
|
||||
}
|
||||
|
||||
#help_container {
|
||||
width: 72%;
|
||||
max-width: 90;
|
||||
min-width: 44;
|
||||
height: auto;
|
||||
max-height: 80%;
|
||||
min-height: 14;
|
||||
background: #181a2a;
|
||||
border: heavy #7aa2f7;
|
||||
padding: 1 1;
|
||||
}
|
||||
|
||||
#help_title {
|
||||
width: 100%;
|
||||
height: 2;
|
||||
text-align: center;
|
||||
text-style: bold;
|
||||
color: #7aa2f7;
|
||||
content-align: center middle;
|
||||
margin-bottom: 0;
|
||||
border-bottom: solid #4b5165;
|
||||
}
|
||||
|
||||
#help_content {
|
||||
width: 100%;
|
||||
height: auto;
|
||||
padding: 0;
|
||||
margin: 0 0 1 0;
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
.help_list {
|
||||
width: 100%;
|
||||
height: auto;
|
||||
background: transparent;
|
||||
padding: 0;
|
||||
scrollbar-size: 0 0;
|
||||
}
|
||||
|
||||
.help_list > ListItem {
|
||||
background: #1b1f33;
|
||||
padding: 0 1;
|
||||
height: 1;
|
||||
}
|
||||
|
||||
.help_list > ListItem:hover {
|
||||
background: #2a2f45;
|
||||
}
|
||||
|
||||
.help_list > ListItem > Label {
|
||||
width: 100%;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
#help_footer {
|
||||
width: 100%;
|
||||
height: 2;
|
||||
text-align: center;
|
||||
content-align: center middle;
|
||||
color: #b6bfdc;
|
||||
margin-top: 0;
|
||||
border-top: solid #4b5165;
|
||||
}
|
||||
|
||||
#filter_container {
|
||||
width: 60;
|
||||
height: auto;
|
||||
background: #181a2a;
|
||||
border: heavy #7aa2f7;
|
||||
padding: 1 2;
|
||||
}
|
||||
|
||||
#filter_title {
|
||||
width: 100%;
|
||||
height: 2;
|
||||
text-align: center;
|
||||
text-style: bold;
|
||||
color: #7aa2f7;
|
||||
content-align: center middle;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
#filter_input {
|
||||
width: 100%;
|
||||
margin: 1 0;
|
||||
}
|
||||
|
||||
#filter_footer {
|
||||
width: 100%;
|
||||
height: 2;
|
||||
text-align: center;
|
||||
content-align: center middle;
|
||||
color: #b6bfdc;
|
||||
margin-top: 1;
|
||||
}
|
||||
"""
|
||||
@@ -1,7 +1,9 @@
|
||||
"""Obtains AAX files from Audible (cache or download) and provides activation bytes."""
|
||||
|
||||
import re
|
||||
import unicodedata
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import audible
|
||||
@@ -29,56 +31,94 @@ class DownloadManager:
|
||||
chunk_size: int = DEFAULT_CHUNK_SIZE,
|
||||
) -> None:
|
||||
self.auth = auth
|
||||
self.client = client
|
||||
self.client: Any = client
|
||||
self.cache_dir = cache_dir
|
||||
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.chunk_size = chunk_size
|
||||
self._http_client = httpx.Client(
|
||||
auth=auth, timeout=30.0, follow_redirects=True)
|
||||
self._http_client = httpx.Client(auth=auth, timeout=30.0, follow_redirects=True)
|
||||
self._download_client = httpx.Client(
|
||||
timeout=httpx.Timeout(connect=30.0, read=None,
|
||||
write=30.0, pool=30.0),
|
||||
timeout=httpx.Timeout(connect=30.0, read=None, write=30.0, pool=30.0),
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
def get_or_download(
|
||||
self, asin: str, notify: StatusCallback | None = None
|
||||
self,
|
||||
asin: str,
|
||||
notify: StatusCallback | None = None,
|
||||
preferred_title: str | None = None,
|
||||
preferred_author: str | None = None,
|
||||
) -> Path | None:
|
||||
"""Return local path to AAX file; download and cache if not present."""
|
||||
title = self._get_name_from_asin(asin) or asin
|
||||
safe_title = self._sanitize_filename(title)
|
||||
local_path = self.cache_dir / f"{safe_title}.aax"
|
||||
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
filename_stems = self._get_filename_stems_from_asin(
|
||||
asin,
|
||||
preferred_title=preferred_title,
|
||||
preferred_author=preferred_author,
|
||||
)
|
||||
local_path = self.cache_dir / f"{filename_stems[0]}.aax"
|
||||
cached_path = self._find_cached_path(filename_stems)
|
||||
if cached_path:
|
||||
if notify:
|
||||
notify(f"Using cached file: {local_path.name}")
|
||||
return local_path
|
||||
notify(f"Using cached file: {cached_path.name}")
|
||||
return cached_path
|
||||
|
||||
if notify:
|
||||
notify(f"Downloading to {local_path.name}...")
|
||||
|
||||
if not self._download_to_valid_file(asin, local_path, notify):
|
||||
return None
|
||||
|
||||
return local_path
|
||||
|
||||
def _download_to_valid_file(
|
||||
self,
|
||||
asin: str,
|
||||
local_path: Path,
|
||||
notify: StatusCallback | None = None,
|
||||
) -> bool:
|
||||
"""Download with one retry and ensure resulting file has a valid size."""
|
||||
for attempt in range(1, 3):
|
||||
if not self._attempt_download(asin, local_path, notify):
|
||||
return False
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
return True
|
||||
|
||||
downloaded_size = local_path.stat().st_size if local_path.exists() else 0
|
||||
if notify and attempt == 1:
|
||||
notify(
|
||||
f"Downloaded file too small ({downloaded_size} bytes), retrying..."
|
||||
)
|
||||
if notify and attempt == 2:
|
||||
notify(
|
||||
f"Download failed: file too small ({downloaded_size} bytes, expected >= {MIN_FILE_SIZE})"
|
||||
)
|
||||
self._cleanup_partial_file(local_path)
|
||||
|
||||
return False
|
||||
|
||||
def _attempt_download(
|
||||
self,
|
||||
asin: str,
|
||||
local_path: Path,
|
||||
notify: StatusCallback | None = None,
|
||||
) -> bool:
|
||||
"""Perform one download attempt including link lookup and URL validation."""
|
||||
dl_link = self._get_download_link(asin, notify=notify)
|
||||
if not dl_link:
|
||||
if notify:
|
||||
notify("Failed to get download link")
|
||||
return None
|
||||
return False
|
||||
|
||||
if not self._validate_download_url(dl_link):
|
||||
if notify:
|
||||
notify("Invalid download URL")
|
||||
return None
|
||||
return False
|
||||
|
||||
if not self._download_file(dl_link, local_path, notify):
|
||||
if notify:
|
||||
notify("Download failed")
|
||||
return None
|
||||
return False
|
||||
|
||||
if not local_path.exists() or local_path.stat().st_size < MIN_FILE_SIZE:
|
||||
if notify:
|
||||
notify("Download failed or file too small")
|
||||
return None
|
||||
|
||||
return local_path
|
||||
return True
|
||||
|
||||
def get_activation_bytes(self) -> str | None:
|
||||
"""Return activation bytes as hex string for ffplay/ffmpeg."""
|
||||
@@ -92,12 +132,7 @@ class DownloadManager:
|
||||
|
||||
def get_cached_path(self, asin: str) -> Path | None:
|
||||
"""Return path to cached AAX file if it exists and is valid size."""
|
||||
title = self._get_name_from_asin(asin) or asin
|
||||
safe_title = self._sanitize_filename(title)
|
||||
local_path = self.cache_dir / f"{safe_title}.aax"
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
return local_path
|
||||
return None
|
||||
return self._find_cached_path(self._get_filename_stems_from_asin(asin))
|
||||
|
||||
def is_cached(self, asin: str) -> bool:
|
||||
"""Return True if the title is present in cache with valid size."""
|
||||
@@ -130,20 +165,68 @@ class DownloadManager:
|
||||
return False
|
||||
|
||||
def _sanitize_filename(self, filename: str) -> str:
|
||||
"""Remove invalid characters from filename."""
|
||||
return re.sub(r'[<>:"/\\|?*]', "_", filename)
|
||||
"""Normalize a filename segment with ASCII letters, digits, and dashes."""
|
||||
ascii_text = unicodedata.normalize("NFKD", filename)
|
||||
ascii_text = ascii_text.encode("ascii", "ignore").decode("ascii")
|
||||
ascii_text = re.sub(r"[’'`]+", "", ascii_text)
|
||||
ascii_text = re.sub(r"[^A-Za-z0-9]+", "-", ascii_text)
|
||||
ascii_text = re.sub(r"-+", "-", ascii_text)
|
||||
ascii_text = ascii_text.strip("-._")
|
||||
return ascii_text or "Unknown"
|
||||
|
||||
def _find_cached_path(self, filename_stems: list[str]) -> Path | None:
|
||||
"""Return the first valid cached path matching any candidate filename stem."""
|
||||
for filename_stem in filename_stems:
|
||||
local_path = self.cache_dir / f"{filename_stem}.aax"
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
return local_path
|
||||
return None
|
||||
|
||||
def _get_filename_stems_from_asin(
|
||||
self,
|
||||
asin: str,
|
||||
preferred_title: str | None = None,
|
||||
preferred_author: str | None = None,
|
||||
) -> list[str]:
|
||||
"""Build preferred and fallback cache filename stems for an ASIN."""
|
||||
if preferred_title:
|
||||
preferred_combined = (
|
||||
f"{self._sanitize_filename(preferred_author or 'Unknown Author')}_"
|
||||
f"{self._sanitize_filename(preferred_title)}"
|
||||
)
|
||||
preferred_legacy = self._sanitize_filename(preferred_title)
|
||||
fallback_asin = self._sanitize_filename(asin)
|
||||
return list(
|
||||
dict.fromkeys([preferred_combined, preferred_legacy, fallback_asin])
|
||||
)
|
||||
|
||||
def _get_name_from_asin(self, asin: str) -> str | None:
|
||||
"""Get the title/name of a book from its ASIN."""
|
||||
try:
|
||||
product_info = self.client.get(
|
||||
path=f"1.0/catalog/products/{asin}",
|
||||
response_groups="product_desc,product_attrs",
|
||||
**{"response_groups": "contributors,product_desc,product_attrs"},
|
||||
)
|
||||
product = product_info.get("product", {})
|
||||
return product.get("title") or "Unknown Title"
|
||||
except (OSError, ValueError, KeyError):
|
||||
return None
|
||||
title = product.get("title") or "Unknown Title"
|
||||
author = self._get_primary_author(product)
|
||||
combined = (
|
||||
f"{self._sanitize_filename(author)}_{self._sanitize_filename(title)}"
|
||||
)
|
||||
legacy_title = self._sanitize_filename(title)
|
||||
fallback_asin = self._sanitize_filename(asin)
|
||||
return list(dict.fromkeys([combined, legacy_title, fallback_asin]))
|
||||
except (OSError, ValueError, KeyError, AttributeError):
|
||||
return [self._sanitize_filename(asin)]
|
||||
|
||||
def _get_primary_author(self, product: dict) -> str:
|
||||
"""Extract a primary author name from product metadata."""
|
||||
contributors = product.get("authors") or product.get("contributors") or []
|
||||
for contributor in contributors:
|
||||
if not isinstance(contributor, dict):
|
||||
continue
|
||||
name = contributor.get("name")
|
||||
if isinstance(name, str) and name.strip():
|
||||
return name
|
||||
return "Unknown Author"
|
||||
|
||||
def _get_download_link(
|
||||
self,
|
||||
@@ -174,7 +257,8 @@ class DownloadManager:
|
||||
if not link:
|
||||
link = str(response.url)
|
||||
|
||||
tld = self.auth.locale.domain
|
||||
locale = getattr(self.auth, "locale", None)
|
||||
tld = getattr(locale, "domain", "com")
|
||||
return link.replace("cds.audible.com", f"cds.audible.{tld}")
|
||||
|
||||
except httpx.HTTPError as exc:
|
||||
@@ -194,19 +278,7 @@ class DownloadManager:
|
||||
with self._download_client.stream("GET", url) as response:
|
||||
response.raise_for_status()
|
||||
total_size = int(response.headers.get("content-length", 0))
|
||||
downloaded = 0
|
||||
|
||||
with open(dest_path, "wb") as file_handle:
|
||||
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
|
||||
file_handle.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
if total_size > 0 and notify:
|
||||
percent = (downloaded / total_size) * 100
|
||||
downloaded_mb = downloaded / (1024 * 1024)
|
||||
total_mb = total_size / (1024 * 1024)
|
||||
notify(
|
||||
f"Downloading: {percent:.1f}% ({downloaded_mb:.1f}/{total_mb:.1f} MB)"
|
||||
)
|
||||
self._stream_to_file(response, dest_path, total_size, notify)
|
||||
|
||||
return dest_path
|
||||
except httpx.HTTPStatusError as exc:
|
||||
@@ -214,31 +286,56 @@ class DownloadManager:
|
||||
notify(
|
||||
f"Download HTTP error: {exc.response.status_code} {exc.response.reason_phrase}"
|
||||
)
|
||||
try:
|
||||
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
|
||||
dest_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
self._cleanup_partial_file(dest_path)
|
||||
return None
|
||||
except httpx.HTTPError as exc:
|
||||
if notify:
|
||||
notify(f"Download network error: {exc!s}")
|
||||
try:
|
||||
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
|
||||
dest_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
self._cleanup_partial_file(dest_path)
|
||||
return None
|
||||
except (OSError, ValueError, KeyError) as exc:
|
||||
if notify:
|
||||
notify(f"Download error: {exc!s}")
|
||||
try:
|
||||
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
|
||||
dest_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
self._cleanup_partial_file(dest_path)
|
||||
return None
|
||||
|
||||
def _stream_to_file(
|
||||
self,
|
||||
response: httpx.Response,
|
||||
dest_path: Path,
|
||||
total_size: int,
|
||||
notify: StatusCallback | None = None,
|
||||
) -> None:
|
||||
"""Write streamed response bytes to disk and emit progress messages."""
|
||||
downloaded = 0
|
||||
with open(dest_path, "wb") as file_handle:
|
||||
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
|
||||
file_handle.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
self._notify_download_progress(downloaded, total_size, notify)
|
||||
|
||||
def _notify_download_progress(
|
||||
self,
|
||||
downloaded: int,
|
||||
total_size: int,
|
||||
notify: StatusCallback | None = None,
|
||||
) -> None:
|
||||
"""Emit a formatted progress message when total size is known."""
|
||||
if total_size <= 0 or not notify:
|
||||
return
|
||||
percent = (downloaded / total_size) * 100
|
||||
downloaded_mb = downloaded / (1024 * 1024)
|
||||
total_mb = total_size / (1024 * 1024)
|
||||
notify(f"Downloading: {percent:.1f}% ({downloaded_mb:.1f}/{total_mb:.1f} MB)")
|
||||
|
||||
def _cleanup_partial_file(self, dest_path: Path) -> None:
    """Delete an undersized partial download left by a failed transfer.

    Files at or above MIN_FILE_SIZE are kept. Cleanup is best-effort:
    filesystem races or permission errors are swallowed.
    """
    try:
        if not dest_path.exists():
            return
        if dest_path.stat().st_size < MIN_FILE_SIZE:
            dest_path.unlink()
    except OSError:
        return
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close internal HTTP clients. Safe to call multiple times."""
|
||||
if hasattr(self, "_http_client"):
|
||||
|
||||
@@ -1,365 +1,25 @@
|
||||
"""Client for the Audible library API."""
|
||||
"""Client facade for Audible library fetch, extraction, and progress updates."""
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from __future__ import annotations
|
||||
|
||||
import audible
|
||||
|
||||
from ..types import LibraryItem, StatusCallback
|
||||
from .client_extract import LibraryClientExtractMixin
|
||||
from .client_fetch import LibraryClientFetchMixin
|
||||
from .client_finished import LibraryClientFinishedMixin
|
||||
from .client_format import LibraryClientFormatMixin
|
||||
from .client_positions import LibraryClientPositionsMixin
|
||||
|
||||
|
||||
class LibraryClient:
|
||||
"""Client for the Audible library API. Fetches items, extracts metadata, and updates positions."""
|
||||
class LibraryClient(
    LibraryClientFetchMixin,
    LibraryClientExtractMixin,
    LibraryClientPositionsMixin,
    LibraryClientFinishedMixin,
    LibraryClientFormatMixin,
):
    """Audible library client composed from focused behavior mixins.

    Each mixin contributes one concern — fetching paginated library data,
    extracting item metadata, reading/writing playback positions, marking
    titles finished, and display formatting. All mixins operate through the
    single ``client`` attribute stored by this facade.
    """

    def __init__(self, client: audible.Client) -> None:
        """Store authenticated Audible client used by all operations."""
        # Shared by every mixin as ``self.client``.
        self.client = client
|
||||
|
||||
def fetch_all_items(self, on_progress: StatusCallback | None = None) -> list[LibraryItem]:
|
||||
"""Fetch all library items from the API."""
|
||||
response_groups = (
|
||||
"contributors,media,product_attrs,product_desc,product_details,"
|
||||
"is_finished,listening_status,percent_complete"
|
||||
)
|
||||
return self._fetch_all_pages(response_groups, on_progress)
|
||||
|
||||
def _fetch_page(
|
||||
self, page: int, page_size: int, response_groups: str
|
||||
) -> tuple[int, list[LibraryItem]]:
|
||||
"""Fetch a single page of library items from the API."""
|
||||
library = self.client.get(
|
||||
path="library",
|
||||
num_results=page_size,
|
||||
page=page,
|
||||
response_groups=response_groups,
|
||||
)
|
||||
items = library.get("items", [])
|
||||
return page, list(items)
|
||||
|
||||
def _fetch_all_pages(
|
||||
self, response_groups: str, on_progress: StatusCallback | None = None
|
||||
) -> list[LibraryItem]:
|
||||
"""Fetch all pages of library items using parallel requests."""
|
||||
library_response = None
|
||||
page_size = 200
|
||||
|
||||
for attempt_size in [200, 100, 50]:
|
||||
try:
|
||||
library_response = self.client.get(
|
||||
path="library",
|
||||
num_results=attempt_size,
|
||||
page=1,
|
||||
response_groups=response_groups,
|
||||
)
|
||||
page_size = attempt_size
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not library_response:
|
||||
return []
|
||||
|
||||
first_page_items = library_response.get("items", [])
|
||||
if not first_page_items:
|
||||
return []
|
||||
|
||||
all_items: list[LibraryItem] = list(first_page_items)
|
||||
if on_progress:
|
||||
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
|
||||
|
||||
if len(first_page_items) < page_size:
|
||||
return all_items
|
||||
|
||||
total_items_estimate = library_response.get(
|
||||
"total_results") or library_response.get("total")
|
||||
if total_items_estimate:
|
||||
estimated_pages = (total_items_estimate +
|
||||
page_size - 1) // page_size
|
||||
estimated_pages = min(estimated_pages, 1000)
|
||||
else:
|
||||
estimated_pages = 500
|
||||
|
||||
max_workers = 50
|
||||
page_results: dict[int, list[LibraryItem]] = {}
|
||||
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
future_to_page: dict = {}
|
||||
|
||||
for page in range(2, estimated_pages + 1):
|
||||
future = executor.submit(
|
||||
self._fetch_page, page, page_size, response_groups
|
||||
)
|
||||
future_to_page[future] = page
|
||||
|
||||
completed_count = 0
|
||||
total_items = len(first_page_items)
|
||||
|
||||
for future in as_completed(future_to_page):
|
||||
page_num = future_to_page.pop(future)
|
||||
try:
|
||||
fetched_page, items = future.result()
|
||||
if not items or len(items) < page_size:
|
||||
for remaining_future in list(future_to_page.keys()):
|
||||
remaining_future.cancel()
|
||||
break
|
||||
|
||||
page_results[fetched_page] = items
|
||||
total_items += len(items)
|
||||
completed_count += 1
|
||||
if on_progress and completed_count % 20 == 0:
|
||||
on_progress(
|
||||
f"Fetched {completed_count} pages ({total_items} items)..."
|
||||
)
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for page_num in sorted(page_results.keys()):
|
||||
all_items.extend(page_results[page_num])
|
||||
|
||||
return all_items
|
||||
|
||||
def extract_title(self, item: LibraryItem) -> str:
|
||||
"""Return the book title from a library item."""
|
||||
product = item.get("product", {})
|
||||
return (
|
||||
product.get("title")
|
||||
or item.get("title")
|
||||
or product.get("asin", "Unknown Title")
|
||||
)
|
||||
|
||||
def extract_authors(self, item: LibraryItem) -> str:
|
||||
"""Return comma-separated author names from a library item."""
|
||||
product = item.get("product", {})
|
||||
authors = product.get("authors") or product.get("contributors") or []
|
||||
if not authors and "authors" in item:
|
||||
authors = item.get("authors", [])
|
||||
|
||||
author_names = [a.get("name", "")
|
||||
for a in authors if isinstance(a, dict)]
|
||||
return ", ".join(author_names) or "Unknown"
|
||||
|
||||
def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
|
||||
"""Return runtime in minutes if present."""
|
||||
product = item.get("product", {})
|
||||
runtime_fields = [
|
||||
"runtime_length_min",
|
||||
"runtime_length",
|
||||
"vLength",
|
||||
"length",
|
||||
"duration",
|
||||
]
|
||||
|
||||
runtime = None
|
||||
for field in runtime_fields:
|
||||
runtime = product.get(field) or item.get(field)
|
||||
if runtime is not None:
|
||||
break
|
||||
|
||||
if runtime is None:
|
||||
return None
|
||||
|
||||
if isinstance(runtime, dict):
|
||||
return int(runtime.get("min", 0))
|
||||
if isinstance(runtime, (int, float)):
|
||||
return int(runtime)
|
||||
return None
|
||||
|
||||
def extract_progress_info(self, item: LibraryItem) -> float | None:
|
||||
"""Return progress percentage (0–100) if present."""
|
||||
percent_complete = item.get("percent_complete")
|
||||
listening_status = item.get("listening_status", {})
|
||||
|
||||
if isinstance(listening_status, dict) and percent_complete is None:
|
||||
percent_complete = listening_status.get("percent_complete")
|
||||
|
||||
return float(percent_complete) if percent_complete is not None else None
|
||||
|
||||
def extract_asin(self, item: LibraryItem) -> str | None:
|
||||
"""Return the ASIN for a library item."""
|
||||
product = item.get("product", {})
|
||||
return item.get("asin") or product.get("asin")
|
||||
|
||||
def is_finished(self, item: LibraryItem) -> bool:
|
||||
"""Return True if the item is marked or inferred as finished."""
|
||||
is_finished_flag = item.get("is_finished")
|
||||
percent_complete = item.get("percent_complete")
|
||||
listening_status = item.get("listening_status")
|
||||
|
||||
if isinstance(listening_status, dict):
|
||||
is_finished_flag = is_finished_flag or listening_status.get(
|
||||
"is_finished", False
|
||||
)
|
||||
if percent_complete is None:
|
||||
percent_complete = listening_status.get("percent_complete", 0)
|
||||
|
||||
return bool(is_finished_flag) or (
|
||||
isinstance(percent_complete, (int, float))
|
||||
and percent_complete >= 100
|
||||
)
|
||||
|
||||
def get_last_position(self, asin: str) -> float | None:
|
||||
"""Get the last playback position for a book in seconds."""
|
||||
try:
|
||||
response = self.client.get(
|
||||
path="1.0/annotations/lastpositions",
|
||||
asins=asin,
|
||||
)
|
||||
annotations = response.get("asin_last_position_heard_annots", [])
|
||||
|
||||
for annot in annotations:
|
||||
if annot.get("asin") != asin:
|
||||
continue
|
||||
|
||||
last_position_heard = annot.get("last_position_heard", {})
|
||||
if not isinstance(last_position_heard, dict):
|
||||
continue
|
||||
|
||||
if last_position_heard.get("status") == "DoesNotExist":
|
||||
return None
|
||||
|
||||
position_ms = last_position_heard.get("position_ms")
|
||||
if position_ms is not None:
|
||||
return float(position_ms) / 1000.0
|
||||
|
||||
return None
|
||||
except (OSError, ValueError, KeyError):
|
||||
return None
|
||||
|
||||
def _get_content_reference(self, asin: str) -> dict | None:
|
||||
"""Fetch content reference (ACR and version) for position updates."""
|
||||
try:
|
||||
response = self.client.get(
|
||||
path=f"1.0/content/{asin}/metadata",
|
||||
response_groups="content_reference",
|
||||
)
|
||||
content_metadata = response.get("content_metadata", {})
|
||||
content_reference = content_metadata.get("content_reference", {})
|
||||
if isinstance(content_reference, dict):
|
||||
return content_reference
|
||||
return None
|
||||
except (OSError, ValueError, KeyError):
|
||||
return None
|
||||
|
||||
def _update_position(self, asin: str, position_seconds: float) -> bool:
|
||||
"""Persist playback position to the API. Returns True on success."""
|
||||
if position_seconds < 0:
|
||||
return False
|
||||
|
||||
content_ref = self._get_content_reference(asin)
|
||||
if not content_ref:
|
||||
return False
|
||||
|
||||
acr = content_ref.get("acr")
|
||||
if not acr:
|
||||
return False
|
||||
|
||||
body = {
|
||||
"acr": acr,
|
||||
"asin": asin,
|
||||
"position_ms": int(position_seconds * 1000),
|
||||
}
|
||||
|
||||
if version := content_ref.get("version"):
|
||||
body["version"] = version
|
||||
|
||||
try:
|
||||
self.client.put(
|
||||
path=f"1.0/lastpositions/{asin}",
|
||||
body=body,
|
||||
)
|
||||
return True
|
||||
except (OSError, ValueError, KeyError):
|
||||
return False
|
||||
|
||||
def save_last_position(self, asin: str, position_seconds: float) -> bool:
|
||||
"""Save playback position to Audible. Returns True on success."""
|
||||
if position_seconds <= 0:
|
||||
return False
|
||||
return self._update_position(asin, position_seconds)
|
||||
|
||||
@staticmethod
|
||||
def format_duration(
|
||||
value: int | None, unit: str = "minutes", default_none: str | None = None
|
||||
) -> str | None:
|
||||
"""Format a duration value as e.g. 2h30m or 45m."""
|
||||
if value is None or value <= 0:
|
||||
return default_none
|
||||
|
||||
total_minutes = int(value)
|
||||
if unit == "seconds":
|
||||
total_minutes //= 60
|
||||
|
||||
hours, minutes = divmod(total_minutes, 60)
|
||||
|
||||
if hours > 0:
|
||||
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
|
||||
return f"{minutes}m"
|
||||
|
||||
def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
|
||||
"""Mark a book as finished on Audible. Optionally mutates item in place."""
|
||||
total_ms = self._get_runtime_ms(asin, item)
|
||||
if not total_ms:
|
||||
return False
|
||||
|
||||
position_ms = total_ms
|
||||
acr = self._get_acr(asin)
|
||||
if not acr:
|
||||
return False
|
||||
|
||||
try:
|
||||
self.client.put(
|
||||
path=f"1.0/lastpositions/{asin}",
|
||||
body={"asin": asin, "acr": acr, "position_ms": position_ms},
|
||||
)
|
||||
if item:
|
||||
item["is_finished"] = True
|
||||
listening_status = item.get("listening_status", {})
|
||||
if isinstance(listening_status, dict):
|
||||
listening_status["is_finished"] = True
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
|
||||
"""Return total runtime in ms from item or API."""
|
||||
if item:
|
||||
runtime_min = self.extract_runtime_minutes(item)
|
||||
if runtime_min:
|
||||
return runtime_min * 60 * 1000
|
||||
|
||||
try:
|
||||
response = self.client.get(
|
||||
path=f"1.0/content/{asin}/metadata",
|
||||
response_groups="chapter_info",
|
||||
)
|
||||
chapter_info = response.get(
|
||||
"content_metadata", {}).get("chapter_info", {})
|
||||
return chapter_info.get("runtime_length_ms")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _get_acr(self, asin: str) -> str | None:
|
||||
"""Fetch ACR token required for position and finish updates."""
|
||||
try:
|
||||
response = self.client.post(
|
||||
path=f"1.0/content/{asin}/licenserequest",
|
||||
body={
|
||||
"response_groups": "content_reference",
|
||||
"consumption_type": "Download",
|
||||
"drm_type": "Adrm",
|
||||
},
|
||||
)
|
||||
return response.get("content_license", {}).get("acr")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def format_time(seconds: float) -> str:
|
||||
"""Format seconds as HH:MM:SS or MM:SS for display."""
|
||||
total_seconds = int(seconds)
|
||||
hours = total_seconds // 3600
|
||||
minutes = (total_seconds % 3600) // 60
|
||||
secs = total_seconds % 60
|
||||
|
||||
if hours > 0:
|
||||
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
|
||||
return f"{minutes:02d}:{secs:02d}"
|
||||
|
||||
84
auditui/library/client_extract.py
Normal file
84
auditui/library/client_extract.py
Normal file
@@ -0,0 +1,84 @@
|
||||
"""Metadata extraction helpers for library items."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ..types import LibraryItem
|
||||
|
||||
|
||||
class LibraryClientExtractMixin:
    """Extracts display and status fields from library items."""

    def extract_title(self, item: LibraryItem) -> str:
        """Return the book title from a library item."""
        product = item.get("product", {})
        title = product.get("title") or item.get("title")
        return title or product.get("asin", "Unknown Title")

    def extract_authors(self, item: LibraryItem) -> str:
        """Return comma-separated author names from a library item."""
        product = item.get("product", {})
        credited = product.get("authors") or product.get("contributors") or []
        if not credited and "authors" in item:
            credited = item.get("authors", [])
        names = [
            entry.get("name", "") for entry in credited if isinstance(entry, dict)
        ]
        return ", ".join(names) or "Unknown"

    def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
        """Return runtime in minutes if present."""
        product = item.get("product", {})
        candidate_fields = (
            "runtime_length_min",
            "runtime_length",
            "vLength",
            "length",
            "duration",
        )
        # First non-None value wins; product-level fields take priority
        # over item-level ones for each field name.
        runtime = next(
            (
                value
                for value in (
                    product.get(field) or item.get(field)
                    for field in candidate_fields
                )
                if value is not None
            ),
            None,
        )

        if isinstance(runtime, dict):
            return int(runtime.get("min", 0))
        if isinstance(runtime, (int, float)):
            return int(runtime)
        return None

    def extract_progress_info(self, item: LibraryItem) -> float | None:
        """Return progress percentage (0-100) if present."""
        percent = item.get("percent_complete")
        status = item.get("listening_status", {})
        if percent is None and isinstance(status, dict):
            percent = status.get("percent_complete")
        if percent is None:
            return None
        return float(percent)

    def extract_asin(self, item: LibraryItem) -> str | None:
        """Return the ASIN for a library item."""
        return item.get("asin") or item.get("product", {}).get("asin")

    def is_finished(self, item: LibraryItem) -> bool:
        """Return True if the item is marked or inferred as finished."""
        finished = item.get("is_finished")
        percent = item.get("percent_complete")
        status = item.get("listening_status")

        if isinstance(status, dict):
            finished = finished or status.get("is_finished", False)
            if percent is None:
                percent = status.get("percent_complete", 0)

        if finished:
            return True
        return isinstance(percent, (int, float)) and percent >= 100
|
||||
165
auditui/library/client_fetch.py
Normal file
165
auditui/library/client_fetch.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Library page fetching helpers for the Audible API client."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from typing import Any
|
||||
|
||||
from ..types import LibraryItem, StatusCallback
|
||||
|
||||
|
||||
class LibraryClientFetchMixin:
    """Fetches all library items from paginated Audible endpoints."""

    # Authenticated Audible API client, supplied by the composed LibraryClient.
    client: Any

    def fetch_all_items(
        self, on_progress: StatusCallback | None = None
    ) -> list[LibraryItem]:
        """Fetch all library items from the API.

        Args:
            on_progress: Optional callback receiving progress status strings.

        Returns:
            All library items, ordered by page number.
        """
        response_groups = "contributors,product_attrs,product_desc,is_finished,listening_status,percent_complete"
        return self._fetch_all_pages(response_groups, on_progress)

    def _fetch_page(
        self,
        page: int,
        page_size: int,
        response_groups: str,
    ) -> tuple[int, list[LibraryItem]]:
        """Fetch one library page and return its index with items."""
        library = self.client.get(
            path="library",
            num_results=page_size,
            page=page,
            response_groups=response_groups,
        )
        items = library.get("items", [])
        return page, list(items)

    def _fetch_all_pages(
        self,
        response_groups: str,
        on_progress: StatusCallback | None = None,
    ) -> list[LibraryItem]:
        """Fetch all library pages using parallel requests after page one.

        Page one is probed with decreasing page sizes (200/100/50) until
        the API accepts one; the accepted size drives the remaining
        pagination.
        """
        library_response = None
        page_size = 200

        for attempt_size in [200, 100, 50]:
            try:
                library_response = self.client.get(
                    path="library",
                    num_results=attempt_size,
                    page=1,
                    response_groups=response_groups,
                )
                page_size = attempt_size
                break
            except Exception:
                # Some accounts/regions reject large page sizes; try smaller.
                continue

        if not library_response:
            return []

        first_page_items = library_response.get("items", [])
        if not first_page_items:
            return []

        all_items: list[LibraryItem] = list(first_page_items)
        if on_progress:
            on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")

        # A short first page means the whole library fit in one request.
        if len(first_page_items) < page_size:
            return all_items

        estimated_pages = self._estimate_total_pages(library_response, page_size)
        page_results = self._fetch_remaining_pages(
            response_groups=response_groups,
            page_size=page_size,
            estimated_pages=estimated_pages,
            initial_total=len(first_page_items),
            on_progress=on_progress,
        )

        for page_num in sorted(page_results.keys()):
            all_items.extend(page_results[page_num])

        return all_items

    def _estimate_total_pages(self, library_response: dict, page_size: int) -> int:
        """Estimate total pages from API metadata with a conservative cap."""
        total_items_estimate = library_response.get(
            "total_results"
        ) or library_response.get("total")
        if not total_items_estimate:
            # No count in the response: assume a large library and rely on
            # the empty/short-page stop signal during fetching.
            return 500
        estimated_pages = (total_items_estimate + page_size - 1) // page_size
        return min(estimated_pages, 1000)

    def _fetch_remaining_pages(
        self,
        response_groups: str,
        page_size: int,
        estimated_pages: int,
        initial_total: int,
        on_progress: StatusCallback | None = None,
    ) -> dict[int, list[LibraryItem]]:
        """Fetch pages 2..N with bounded in-flight requests for faster startup.

        Args:
            response_groups: Comma-separated response group string for the API.
            page_size: Items per page accepted by the first-page probe.
            estimated_pages: Upper bound on pages to schedule.
            initial_total: Item count from page one, used for progress text.
            on_progress: Optional callback receiving progress status strings.

        Returns:
            Mapping of page number to that page's items (unordered).
        """
        page_results: dict[int, list[LibraryItem]] = {}
        max_workers = min(16, max(1, estimated_pages - 1))
        next_page_to_submit = 2
        # First page index known to be past the end of the library.
        stop_page = estimated_pages + 1
        completed_count = 0
        total_items = initial_total

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_page: dict = {}

            def schedule_more() -> None:
                """Top up in-flight requests without exceeding the bound."""
                nonlocal next_page_to_submit
                while (
                    next_page_to_submit <= estimated_pages
                    and next_page_to_submit < stop_page
                    and len(future_to_page) < max_workers
                ):
                    future = executor.submit(
                        self._fetch_page,
                        next_page_to_submit,
                        page_size,
                        response_groups,
                    )
                    future_to_page[future] = next_page_to_submit
                    next_page_to_submit += 1

            schedule_more()

            while future_to_page:
                future = next(as_completed(future_to_page))
                future_to_page.pop(future)
                try:
                    fetched_page, items = future.result()
                except Exception:
                    # Failed pages are skipped; remaining futures still drain.
                    continue

                if not items:
                    # Empty page: the estimate overshot the end of the
                    # library. Stop scheduling this page and everything
                    # beyond it (previously only a *short* page stopped
                    # scheduling, so overshoots wasted many requests).
                    stop_page = min(stop_page, fetched_page)
                    continue

                page_results[fetched_page] = items
                total_items += len(items)
                completed_count += 1
                if on_progress and completed_count % 20 == 0:
                    on_progress(
                        f"Fetched {completed_count} pages ({total_items} items)..."
                    )
                if len(items) < page_size:
                    # A short page is the final page; don't schedule past it.
                    stop_page = min(stop_page, fetched_page)

                schedule_more()

        return page_results
|
||||
70
auditui/library/client_finished.py
Normal file
70
auditui/library/client_finished.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""Helpers for marking content as finished through Audible APIs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from ..types import LibraryItem
|
||||
|
||||
|
||||
class LibraryClientFinishedMixin:
    """Marks titles as finished and mutates in-memory item state."""

    # Authenticated Audible API client, supplied by the composed LibraryClient.
    client: Any

    def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
        """Mark a book as finished on Audible and optionally update item state.

        Writes the title's full runtime as the last-heard position, which
        Audible treats as "finished".

        Args:
            asin: Audible ASIN of the title.
            item: Optional in-memory library item mutated to reflect the change.

        Returns:
            True when the position write succeeded, False otherwise.
        """
        total_ms = self._get_runtime_ms(asin, item)
        if not total_ms:
            return False

        acr = self._get_acr(asin)
        if not acr:
            return False

        try:
            self.client.put(
                path=f"1.0/lastpositions/{asin}",
                body={"asin": asin, "acr": acr, "position_ms": total_ms},
            )
            if item:
                item["is_finished"] = True
                listening_status = item.get("listening_status", {})
                if isinstance(listening_status, dict):
                    listening_status["is_finished"] = True
            return True
        except Exception:
            # Best-effort write: network/API failures simply report False.
            return False

    def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
        """Return total runtime in milliseconds from item or metadata endpoint.

        Prefers the runtime already present on the in-memory item; falls back
        to the content metadata endpoint's chapter info.
        """
        if item:
            # Provided by LibraryClientExtractMixin on the composed client;
            # a direct call replaces the former pointless getattr lookup.
            runtime_min = self.extract_runtime_minutes(item)
            if runtime_min:
                return runtime_min * 60 * 1000

        try:
            response = self.client.get(
                path=f"1.0/content/{asin}/metadata",
                response_groups="chapter_info",
            )
            chapter_info = response.get("content_metadata", {}).get("chapter_info", {})
            return chapter_info.get("runtime_length_ms")
        except Exception:
            return None

    def _get_acr(self, asin: str) -> str | None:
        """Fetch the ACR token required by finish/update write operations.

        Returns:
            The ACR string from a download license request, or None when the
            request fails or the token is absent.
        """
        try:
            response = self.client.post(
                path=f"1.0/content/{asin}/licenserequest",
                body={
                    "response_groups": "content_reference",
                    "consumption_type": "Download",
                    "drm_type": "Adrm",
                },
            )
            return response.get("content_license", {}).get("acr")
        except Exception:
            return None
|
||||
37
auditui/library/client_format.py
Normal file
37
auditui/library/client_format.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""Formatting helpers exposed by the library client."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
class LibraryClientFormatMixin:
    """Formats durations and timestamps for display usage."""

    @staticmethod
    def format_duration(
        value: int | None,
        unit: str = "minutes",
        default_none: str | None = None,
    ) -> str | None:
        """Format duration values as compact hour-minute strings.

        Returns ``default_none`` for missing or non-positive values.
        """
        if value is None or value <= 0:
            return default_none

        minutes_total = int(value) // 60 if unit == "seconds" else int(value)
        hours, minutes = divmod(minutes_total, 60)

        if not hours:
            return f"{minutes}m"
        return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"

    @staticmethod
    def format_time(seconds: float) -> str:
        """Format seconds as HH:MM:SS or MM:SS for display."""
        whole = int(seconds)
        hours = whole // 3600
        minutes, secs = divmod(whole % 3600, 60)

        if hours:
            return f"{hours:02d}:{minutes:02d}:{secs:02d}"
        return f"{minutes:02d}:{secs:02d}"
|
||||
85
auditui/library/client_positions.py
Normal file
85
auditui/library/client_positions.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Playback position read and write helpers for library content."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
class LibraryClientPositionsMixin:
    """Handles last-position retrieval and persistence."""

    # Supplied by the composed client class; exposes .get() and .put().
    client: Any

    def get_last_position(self, asin: str) -> float | None:
        """Return the last playback position for *asin* in seconds, or None."""
        try:
            payload = self.client.get(
                path="1.0/annotations/lastpositions",
                asins=asin,
            )
            for annot in payload.get("asin_last_position_heard_annots", []):
                if annot.get("asin") != asin:
                    continue
                heard = annot.get("last_position_heard", {})
                if not isinstance(heard, dict):
                    continue
                # An explicit DoesNotExist status means no saved position.
                if heard.get("status") == "DoesNotExist":
                    return None
                millis = heard.get("position_ms")
                if millis is not None:
                    # API reports milliseconds; callers work in seconds.
                    return float(millis) / 1000.0
            return None
        except (OSError, ValueError, KeyError):
            return None

    def _get_content_reference(self, asin: str) -> dict | None:
        """Fetch the content-reference payload used by position update calls."""
        try:
            payload = self.client.get(
                path=f"1.0/content/{asin}/metadata",
                response_groups="content_reference",
            )
        except (OSError, ValueError, KeyError):
            return None
        reference = payload.get("content_metadata", {}).get("content_reference", {})
        return reference if isinstance(reference, dict) else None

    def _update_position(self, asin: str, position_seconds: float) -> bool:
        """Persist *position_seconds* for *asin*; return True on success."""
        if position_seconds < 0:
            return False

        reference = self._get_content_reference(asin)
        if not reference:
            return False
        acr = reference.get("acr")
        if not acr:
            # The ACR token is mandatory for the lastpositions endpoint.
            return False

        payload = {
            "acr": acr,
            "asin": asin,
            "position_ms": int(position_seconds * 1000),
        }
        version = reference.get("version")
        if version:
            payload["version"] = version

        try:
            self.client.put(
                path=f"1.0/lastpositions/{asin}",
                body=payload,
            )
        except (OSError, ValueError, KeyError):
            return False
        return True

    def save_last_position(self, asin: str, position_seconds: float) -> bool:
        """Save playback position to Audible and return success state.

        Non-positive positions are ignored (nothing worth persisting).
        """
        if position_seconds <= 0:
            return False
        return self._update_position(asin, position_seconds)
|
||||
@@ -45,12 +45,16 @@ class ControllerLifecycleMixin(ControllerStateMixin):
|
||||
try:
|
||||
proc, return_code = process_mod.run_ffplay(cmd)
|
||||
if proc is None:
|
||||
if return_code == 0 and start_position > 0 and self.total_duration and start_position >= self.total_duration - 5:
|
||||
if (
|
||||
return_code == 0
|
||||
and start_position > 0
|
||||
and self.total_duration
|
||||
and start_position >= self.total_duration - 5
|
||||
):
|
||||
notify("Reached end of file")
|
||||
self._reset_state()
|
||||
return False
|
||||
notify(
|
||||
f"Playback process exited immediately (code: {return_code})")
|
||||
notify(f"Playback process exited immediately (code: {return_code})")
|
||||
return False
|
||||
self.playback_process = proc
|
||||
self.is_playing = True
|
||||
@@ -114,6 +118,8 @@ class ControllerLifecycleMixin(ControllerStateMixin):
|
||||
download_manager: DownloadManager,
|
||||
asin: str,
|
||||
status_callback: StatusCallback | None = None,
|
||||
preferred_title: str | None = None,
|
||||
preferred_author: str | None = None,
|
||||
) -> bool:
|
||||
"""Download AAX if needed, get activation bytes, then start playback. Returns True on success."""
|
||||
notify = status_callback or self.notify
|
||||
@@ -121,7 +127,12 @@ class ControllerLifecycleMixin(ControllerStateMixin):
|
||||
notify("Could not download file")
|
||||
return False
|
||||
notify("Preparing playback...")
|
||||
local_path = download_manager.get_or_download(asin, notify)
|
||||
local_path = download_manager.get_or_download(
|
||||
asin,
|
||||
notify,
|
||||
preferred_title=preferred_title,
|
||||
preferred_author=preferred_author,
|
||||
)
|
||||
if not local_path:
|
||||
notify("Could not download file")
|
||||
return False
|
||||
@@ -136,14 +147,15 @@ class ControllerLifecycleMixin(ControllerStateMixin):
|
||||
last = self.library_client.get_last_position(asin)
|
||||
if last is not None and last > 0:
|
||||
start_position = last
|
||||
notify(
|
||||
f"Resuming from {LibraryClient.format_time(start_position)}")
|
||||
notify(f"Resuming from {LibraryClient.format_time(start_position)}")
|
||||
except (OSError, ValueError, KeyError):
|
||||
pass
|
||||
notify(f"Starting playback of {local_path.name}...")
|
||||
self.current_asin = asin
|
||||
self.last_save_time = time.time()
|
||||
return self.start(local_path, activation_hex, notify, start_position, self.playback_speed)
|
||||
return self.start(
|
||||
local_path, activation_hex, notify, start_position, self.playback_speed
|
||||
)
|
||||
|
||||
def toggle_playback(self) -> bool:
|
||||
"""Toggle between pause and resume. Returns True if an action was performed."""
|
||||
@@ -160,7 +172,10 @@ class ControllerLifecycleMixin(ControllerStateMixin):
|
||||
return True
|
||||
|
||||
def _restart_at_position(
|
||||
self, new_position: float, new_speed: float | None = None, message: str | None = None
|
||||
self,
|
||||
new_position: float,
|
||||
new_speed: float | None = None,
|
||||
message: str | None = None,
|
||||
) -> bool:
|
||||
"""Stop current process and start again at new_position; optionally set speed and notify."""
|
||||
if not self.is_playing or not self.current_file_path:
|
||||
@@ -170,7 +185,9 @@ class ControllerLifecycleMixin(ControllerStateMixin):
|
||||
speed = new_speed if new_speed is not None else saved["speed"]
|
||||
self._stop_process()
|
||||
time.sleep(0.2)
|
||||
if self.start(saved["file_path"], saved["activation"], self.notify, new_position, speed):
|
||||
if self.start(
|
||||
saved["file_path"], saved["activation"], self.notify, new_position, speed
|
||||
):
|
||||
self.current_asin = saved["asin"]
|
||||
self.total_duration = saved["duration"]
|
||||
self.chapters = saved["chapters"]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "auditui"
|
||||
version = "0.1.6"
|
||||
version = "0.2.0"
|
||||
description = "An Audible TUI client"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10,<3.13"
|
||||
|
||||
52
tests/app/test_app_actions_download_hints.py
Normal file
52
tests/app/test_app_actions_download_hints.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, cast
|
||||
|
||||
from auditui.app.actions import AppActionsMixin
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeTable:
|
||||
"""Minimal table shim exposing cursor and row count."""
|
||||
|
||||
row_count: int
|
||||
cursor_row: int = 0
|
||||
|
||||
|
||||
class DummyActionsApp(AppActionsMixin):
    """Minimal app host used for download naming hint tests."""

    def __init__(self) -> None:
        """Set up only the state the action helpers actually touch."""
        self.current_items: list[dict] = []
        self.download_manager = object()
        # Dynamic stub standing in for the real library client.
        stub_methods = {"extract_asin": lambda self, item: item.get("asin")}
        self.library_client = type("Library", (), stub_methods)()
        self._table = FakeTable(row_count=0, cursor_row=0)

    def update_status(self, message: str) -> None:
        """Swallow status messages; this focused test does not inspect them."""
        del message

    def query_one(self, selector: str, _type: object) -> FakeTable:
        """Hand back the fake table when the library table is queried."""
        assert selector == "#library_table"
        return self._table
||||
|
||||
|
||||
def test_action_toggle_download_passes_selected_item() -> None:
    """Verify the download toggle forwards the selected item as a naming hint."""
    app = DummyActionsApp()
    app._table = FakeTable(row_count=1, cursor_row=0)
    app.current_items = [{"asin": "ASIN", "title": "Book"}]
    calls: list[tuple[str, str | None]] = []

    def record(asin: str, item: dict | None = None) -> None:
        """Record the (asin, title) pair handed to the async toggle."""
        title = item.get("title") if item else None
        calls.append((asin, title))

    setattr(cast(Any, app), "_toggle_download_async", record)
    app.action_toggle_download()
    assert calls == [("ASIN", "Book")]
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, cast
|
||||
|
||||
from auditui.app.actions import AppActionsMixin
|
||||
|
||||
@@ -41,7 +42,13 @@ class DummyActionsApp(AppActionsMixin):
|
||||
self.current_items: list[dict] = []
|
||||
self.download_manager = object()
|
||||
self.library_client = type(
|
||||
"Library", (), {"extract_asin": lambda self, item: item.get("asin")}
|
||||
"Library",
|
||||
(),
|
||||
{
|
||||
"extract_asin": lambda self, item: item.get("asin"),
|
||||
"extract_title": lambda self, item: item.get("title"),
|
||||
"extract_authors": lambda self, item: item.get("authors"),
|
||||
},
|
||||
)()
|
||||
self.playback = FakePlayback(True)
|
||||
self.filter_text = "hello"
|
||||
@@ -61,10 +68,6 @@ class DummyActionsApp(AppActionsMixin):
|
||||
"""Record refresh invocations for filter tests."""
|
||||
self._refreshed += 1
|
||||
|
||||
def _start_playback_async(self, asin: str) -> None:
|
||||
"""Capture async playback launch argument."""
|
||||
self.messages.append(f"start:{asin}")
|
||||
|
||||
|
||||
def test_get_selected_asin_requires_non_empty_table() -> None:
|
||||
"""Ensure selection fails gracefully when table has no rows."""
|
||||
@@ -85,10 +88,18 @@ def test_get_selected_asin_returns_current_row_asin() -> None:
|
||||
def test_action_play_selected_starts_async_playback() -> None:
|
||||
"""Ensure play action calls async starter with selected ASIN."""
|
||||
app = DummyActionsApp()
|
||||
seen: list[str] = []
|
||||
|
||||
def capture_start(asin: str, item: dict | None = None) -> None:
|
||||
"""Capture playback start arguments for assertions."""
|
||||
suffix = f":{item.get('title')}" if item else ""
|
||||
seen.append(f"start:{asin}{suffix}")
|
||||
|
||||
setattr(cast(Any, app), "_start_playback_async", capture_start)
|
||||
app._table = FakeTable(row_count=1, cursor_row=0)
|
||||
app.current_items = [{"asin": "ASIN"}]
|
||||
app.current_items = [{"asin": "ASIN", "title": "Book"}]
|
||||
app.action_play_selected()
|
||||
assert app.messages[-1] == "start:ASIN"
|
||||
assert seen[-1] == "start:ASIN:Book"
|
||||
|
||||
|
||||
def test_action_toggle_playback_shows_hint_when_no_playback() -> None:
|
||||
|
||||
34
tests/app/test_app_table_row_keys.py
Normal file
34
tests/app/test_app_table_row_keys.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.app.table import AppTableMixin
|
||||
|
||||
|
||||
class DummyTableApp(AppTableMixin):
    """Minimal host exposing library client for row key helper tests."""

    def __init__(self) -> None:
        """Attach a stub library client that extracts the item's ASIN."""
        stub_methods = {"extract_asin": lambda self, item: item.get("asin")}
        self.library_client = type("Library", (), stub_methods)()
|
||||
|
||||
|
||||
def test_build_row_key_prefers_asin_and_remains_unique() -> None:
    """Verify duplicate ASINs receive deterministic '#n' de-duplication suffixes."""
    app = DummyTableApp()
    seen_keys: set[str] = set()
    duplicate = {"asin": "ASIN1"}
    keys = [
        app._build_row_key(duplicate, "Title", index, seen_keys)
        for index in (0, 1)
    ]
    assert keys == ["ASIN1", "ASIN1#2"]
|
||||
|
||||
|
||||
def test_build_row_key_falls_back_to_title_and_index() -> None:
    """Verify a missing ASIN falls back to a 'Title#index' key."""
    app = DummyTableApp()
    fallback = app._build_row_key({"asin": None}, "Unknown Title", 3, set())
    assert fallback == "Unknown Title#3"
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, cast
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -17,9 +18,11 @@ def _manager_with_cache_dir(tmp_path: Path) -> DownloadManager:
|
||||
|
||||
|
||||
def test_sanitize_filename_replaces_invalid_characters() -> None:
|
||||
"""Ensure filesystem-invalid symbols are replaced with underscores."""
|
||||
"""Ensure filename normalization uses ASCII words and dashes."""
|
||||
manager = DownloadManager.__new__(DownloadManager)
|
||||
assert manager._sanitize_filename('a<>:"/\\|?*b') == "a_________b"
|
||||
assert (
|
||||
manager._sanitize_filename("Stephen King 11/22/63") == "Stephen-King-11-22-63"
|
||||
)
|
||||
|
||||
|
||||
def test_validate_download_url_accepts_only_http_schemes() -> None:
|
||||
@@ -35,8 +38,12 @@ def test_get_cached_path_and_remove_cached(
|
||||
) -> None:
|
||||
"""Ensure cache lookup and cache deletion work for valid files."""
|
||||
manager = _manager_with_cache_dir(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book")
|
||||
cached_path = tmp_path / "My Book.aax"
|
||||
monkeypatch.setattr(
|
||||
manager,
|
||||
"_get_filename_stems_from_asin",
|
||||
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
|
||||
)
|
||||
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
|
||||
cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
|
||||
messages: list[str] = []
|
||||
assert manager.get_cached_path("ASIN123") == cached_path
|
||||
@@ -51,7 +58,34 @@ def test_get_cached_path_ignores_small_files(
|
||||
) -> None:
|
||||
"""Ensure undersized files are not treated as valid cache entries."""
|
||||
manager = _manager_with_cache_dir(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book")
|
||||
cached_path = tmp_path / "My Book.aax"
|
||||
monkeypatch.setattr(
|
||||
manager,
|
||||
"_get_filename_stems_from_asin",
|
||||
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
|
||||
)
|
||||
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
|
||||
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
|
||||
assert manager.get_cached_path("ASIN123") is None
|
||||
|
||||
|
||||
def test_get_filename_stems_include_author_title_and_legacy_title() -> None:
|
||||
"""Ensure filename candidates include new author_title and legacy title names."""
|
||||
manager = DownloadManager.__new__(DownloadManager)
|
||||
manager.client = cast(
|
||||
Any,
|
||||
type(
|
||||
"Client",
|
||||
(),
|
||||
{
|
||||
"get": lambda self, path, **kwargs: {
|
||||
"product": {
|
||||
"title": "11/22/63",
|
||||
"authors": [{"name": "Stephen King"}],
|
||||
}
|
||||
}
|
||||
},
|
||||
)(),
|
||||
)
|
||||
stems = manager._get_filename_stems_from_asin("B00TEST")
|
||||
assert stems[0] == "Stephen-King_11-22-63"
|
||||
assert "11-22-63" in stems
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, cast
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -14,9 +15,14 @@ def _bare_manager(tmp_path: Path) -> DownloadManager:
|
||||
manager = DownloadManager.__new__(DownloadManager)
|
||||
manager.cache_dir = tmp_path
|
||||
manager.chunk_size = 1024
|
||||
manager.auth = type(
|
||||
"Auth", (), {"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()}
|
||||
)()
|
||||
manager.auth = cast(
|
||||
Any,
|
||||
type(
|
||||
"Auth",
|
||||
(),
|
||||
{"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()},
|
||||
)(),
|
||||
)
|
||||
return manager
|
||||
|
||||
|
||||
@@ -48,8 +54,12 @@ def test_get_or_download_uses_cached_file_when_available(
|
||||
) -> None:
|
||||
"""Ensure cached files bypass link generation and download work."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
|
||||
cached_path = tmp_path / "Book.aax"
|
||||
monkeypatch.setattr(
|
||||
manager,
|
||||
"_get_filename_stems_from_asin",
|
||||
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
|
||||
)
|
||||
cached_path = tmp_path / "Author_Book.aax"
|
||||
cached_path.write_bytes(b"1" * MIN_FILE_SIZE)
|
||||
messages: list[str] = []
|
||||
assert manager.get_or_download("ASIN", notify=messages.append) == cached_path
|
||||
@@ -61,7 +71,11 @@ def test_get_or_download_reports_invalid_url(
|
||||
) -> None:
|
||||
"""Ensure workflow reports invalid download URLs and aborts."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
|
||||
monkeypatch.setattr(
|
||||
manager,
|
||||
"_get_filename_stems_from_asin",
|
||||
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
manager, "_get_download_link", lambda asin, notify=None: "ftp://bad"
|
||||
)
|
||||
@@ -75,7 +89,11 @@ def test_get_or_download_handles_download_failure(
|
||||
) -> None:
|
||||
"""Ensure workflow reports failures when stream download does not complete."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
|
||||
monkeypatch.setattr(
|
||||
manager,
|
||||
"_get_filename_stems_from_asin",
|
||||
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
manager, "_get_download_link", lambda asin, notify=None: "https://ok"
|
||||
)
|
||||
@@ -83,3 +101,60 @@ def test_get_or_download_handles_download_failure(
|
||||
messages: list[str] = []
|
||||
assert manager.get_or_download("ASIN", notify=messages.append) is None
|
||||
assert "Download failed" in messages
|
||||
|
||||
|
||||
def test_get_or_download_uses_preferred_naming_hints(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Ensure preferred title/author are forwarded to filename stem selection."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
captured: list[tuple[str | None, str | None]] = []
|
||||
|
||||
def stems(
|
||||
asin: str,
|
||||
preferred_title: str | None = None,
|
||||
preferred_author: str | None = None,
|
||||
) -> list[str]:
|
||||
"""Capture naming hints and return one deterministic filename stem."""
|
||||
del asin
|
||||
captured.append((preferred_title, preferred_author))
|
||||
return ["Author_Book"]
|
||||
|
||||
monkeypatch.setattr(manager, "_get_filename_stems_from_asin", stems)
|
||||
monkeypatch.setattr(manager, "_get_download_link", lambda asin, notify=None: None)
|
||||
manager.get_or_download(
|
||||
"ASIN",
|
||||
preferred_title="11/22/63",
|
||||
preferred_author="Stephen King",
|
||||
)
|
||||
assert captured == [("11/22/63", "Stephen King")]
|
||||
|
||||
|
||||
def test_get_or_download_retries_when_file_is_too_small(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Ensure small downloads are retried and then reported with exact byte size."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
monkeypatch.setattr(
|
||||
manager,
|
||||
"_get_filename_stems_from_asin",
|
||||
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
manager, "_get_download_link", lambda asin, notify=None: "https://ok"
|
||||
)
|
||||
attempts = {"count": 0}
|
||||
|
||||
def write_small_file(url: str, path: Path, notify=None) -> Path:
|
||||
"""Write an undersized file to trigger retry and final failure messages."""
|
||||
del url, notify
|
||||
attempts["count"] += 1
|
||||
path.write_bytes(b"x" * 100)
|
||||
return path
|
||||
|
||||
monkeypatch.setattr(manager, "_download_file", write_small_file)
|
||||
messages: list[str] = []
|
||||
assert manager.get_or_download("ASIN", notify=messages.append) is None
|
||||
assert attempts["count"] == 2
|
||||
assert any("retrying" in message for message in messages)
|
||||
assert any("file too small" in message for message in messages)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, cast
|
||||
|
||||
from auditui.playback import controller_lifecycle as lifecycle_mod
|
||||
from auditui.playback.controller import PlaybackController
|
||||
@@ -62,14 +63,21 @@ def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
|
||||
"""Ensure prepare flow resumes from saved position when available."""
|
||||
messages: list[str] = []
|
||||
lib = type("Lib", (), {"get_last_position": lambda self, asin: 75.0})()
|
||||
controller = PlaybackController(messages.append, lib)
|
||||
controller = PlaybackController(messages.append, cast(Any, lib))
|
||||
started: list[tuple] = []
|
||||
|
||||
class DM:
|
||||
"""Download manager shim returning path and activation token."""
|
||||
|
||||
def get_or_download(self, asin, notify):
|
||||
def get_or_download(
|
||||
self,
|
||||
asin,
|
||||
notify,
|
||||
preferred_title: str | None = None,
|
||||
preferred_author: str | None = None,
|
||||
):
|
||||
"""Return deterministic downloaded file path."""
|
||||
del asin, notify, preferred_title, preferred_author
|
||||
return Path("book.aax")
|
||||
|
||||
def get_activation_bytes(self):
|
||||
@@ -78,7 +86,7 @@ def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
|
||||
|
||||
monkeypatch.setattr(controller, "start", lambda *args: started.append(args) or True)
|
||||
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 200.0)
|
||||
assert controller.prepare_and_start(DM(), "ASIN") is True
|
||||
assert controller.prepare_and_start(cast(Any, DM()), "ASIN") is True
|
||||
assert started and started[0][3] == 75.0
|
||||
assert "Resuming from 01:15" in messages
|
||||
|
||||
@@ -87,7 +95,7 @@ def test_toggle_playback_uses_pause_and_resume_paths(monkeypatch) -> None:
|
||||
"""Ensure toggle dispatches pause or resume based on paused flag."""
|
||||
controller, _ = _controller()
|
||||
controller.is_playing = True
|
||||
controller.playback_process = Proc(None)
|
||||
controller.playback_process = cast(Any, Proc(None))
|
||||
called: list[str] = []
|
||||
monkeypatch.setattr(controller, "pause", lambda: called.append("pause"))
|
||||
monkeypatch.setattr(controller, "resume", lambda: called.append("resume"))
|
||||
|
||||
Reference in New Issue
Block a user