Compare commits

...

32 Commits

Author SHA1 Message Date
1e6f1544db Merge pull request 'Massive refactoring' (#1) from new-architecture into main
Reviewed-on: #1
2026-02-18 04:29:20 +01:00
26cba97cbd chore(changelog): document load-time tuning and duplicate row key fix 2026-02-18 04:28:35 +01:00
175bb7cbdc test(app): add coverage for unique table row key generation 2026-02-18 04:28:28 +01:00
bf0e70e9d9 fix(table): use unique row keys to avoid duplicate title crashes 2026-02-18 04:28:20 +01:00
cb4104e59a perf(app): remove eager search cache priming during library load 2026-02-18 04:28:13 +01:00
570639e988 perf(library): tune first-page probe order for faster medium-library loads 2026-02-18 04:28:06 +01:00
5ba0fafbc1 chore: update changelog 2026-02-18 04:20:56 +01:00
bed0ac4fea test(downloads): add regression coverage for too-small download retry behavior 2026-02-18 04:19:39 +01:00
0a909484e3 fix(downloads): retry undersized downloads and surface precise size failure messages 2026-02-18 04:19:33 +01:00
ecdd953ff4 refactor(downloads): split download streaming into focused helpers and reduce complexity 2026-02-18 04:12:54 +01:00
4ba2c43c93 clean: remove unused import 2026-02-18 04:09:04 +01:00
4b1924edd8 chore: update changelog 2026-02-18 04:08:41 +01:00
da20e84513 docs: update readme 2026-02-18 04:02:20 +01:00
dcb43f65dd chore: fix wording 2026-02-18 03:57:51 +01:00
beca8ee085 perf(library): optimize paginated fetch with bounded concurrent scheduling 2026-02-18 03:56:44 +01:00
e813267d5e refactor(library): decompose monolithic LibraryClient into fetch/extract/positions/finished/format mixins while preserving public behavior 2026-02-18 03:54:16 +01:00
eca58423dc refactor(constants): split constants into domain modules with compatibility exports 2026-02-18 03:44:26 +01:00
307368480a chore: update changelog 2026-02-18 03:39:53 +01:00
a8add30928 chore(changelog): document download filename metadata fallback fix 2026-02-18 03:39:18 +01:00
3e6e31c2db test(playback): verify prepare_and_start passes naming hints to downloads 2026-02-18 03:39:13 +01:00
6335f8bbac test(downloads): cover preferred naming hint propagation in get_or_download 2026-02-18 03:39:09 +01:00
0cf2644f55 test(downloads): validate author_title stem generation and cache fallbacks 2026-02-18 03:39:04 +01:00
597e82dc20 test(app): verify playback start receives selected item metadata 2026-02-18 03:38:58 +01:00
25d56cf407 test(app): cover selected item hint forwarding for downloads 2026-02-18 03:38:54 +01:00
76c991600c fix(playback): forward preferred title and author to download manager 2026-02-18 03:38:50 +01:00
95e641a527 fix(app): pass selected item title and author as download naming hints 2026-02-18 03:38:46 +01:00
8f8cdf7bfa fix(downloads): prefer library metadata for author_title filenames with fallback stems 2026-02-18 03:38:41 +01:00
9c19891443 chore: update changelog 2026-02-18 03:21:19 +01:00
01de75871a feat: versionning 2026-02-18 03:21:15 +01:00
e88dcee155 test: cover app and playback controller mixin behavior 2026-02-18 03:17:48 +01:00
4bc9b3fd3f test: add focused playback helper unit coverage 2026-02-18 03:17:42 +01:00
cd99960f2f test: reorganize core suite into explicit domain files 2026-02-18 03:17:33 +01:00
57 changed files with 3005 additions and 1064 deletions

View File

@@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.2.0] - 2026-02-18
### Changed
- massive code refactoring
- complete test suite revamp
- updated download cache naming to use `Author_Title` format with normalized separators
- optimized library pagination fetch with bounded concurrent scheduling
- adjusted library first-page probe order to prefer larger page sizes for medium libraries
- removed eager search cache priming during library load to reduce startup work
### Fixed
- reused library metadata for download filename generation to avoid `Unknown-Author_Unknown-Title` when title/author are already known in the UI
- fixed Audible last-position request parameter handling after library client refactor
- added retry behavior and explicit size diagnostics when downloaded files are too small
- prevented table rendering crashes by generating unique row keys instead of using title-only keys
## [0.1.6] - 2026-02-16
### Changed

View File

@@ -2,7 +2,7 @@
A terminal-based user interface (TUI) client for [Audible](https://www.audible.fr/), written in Python 3.
Currently, the only available theme is Catppuccin Mocha, following their [style guide](https://github.com/catppuccin/catppuccin/blob/main/docs/style-guide.md), as it's my preferred theme across most of my tools.
The interface currently ships with a single built-in theme.
## Requirements
@@ -36,9 +36,9 @@ auditui --version
All set, run `auditui configure` to set up authentication, and then `auditui` to start the TUI.
### Workaround for Python 3.13 linux distribution
### Workaround for Python 3.13 Linux distributions
On some Linux distributions, Python 3.13 is already the default. So you have to install Python 3.12 manually before using `pipx`.
On some Linux distributions, Python 3.13 is already the default. In that case, install Python 3.12 manually before using `pipx`.
For Arch Linux:
@@ -52,7 +52,7 @@ Once you have Python 3.12, run:
pipx install git+https://git.kharec.info/Kharec/auditui.git --python python3.12
```
As Python <3.14 is supported on `master` branch of the upstream [`audible`](https://github.com/mkb79/Audible), this should be temporary until the next version.
This workaround is temporary and depends on upstream `audible` compatibility updates.
## Upgrade
@@ -90,6 +90,8 @@ pipx upgrade auditui
Books are downloaded to `~/.cache/auditui/books`.
Downloaded files use a normalized `Author_Title.aax` naming format. For example, `Stephen King` and `11/22/63` become `Stephen-King_11-22-63.aax`.
The `d` key toggles the download state for the selected book: if the book is not cached, pressing `d` will download it; if it's already cached, pressing `d` will delete it from the cache.
To check the total size of your cache:

View File

@@ -1,3 +1,3 @@
"""Auditui: Audible TUI client. One folder per module; all code lives inside module packages."""
"""Auditui: Audible TUI client"""
__version__ = "0.1.6"
__version__ = "0.2.0"

View File

@@ -10,11 +10,8 @@ from ..ui import FilterScreen, HelpScreen, StatsScreen
class AppActionsMixin:
def _get_selected_asin(self) -> str | None:
if not self.download_manager:
self.update_status(
"Not authenticated. Please restart and authenticate.")
return None
def _get_selected_item(self) -> dict | None:
"""Return the currently selected library item from the table."""
table = self.query_one("#library_table", DataTable)
if table.row_count == 0:
self.update_status("No books available")
@@ -23,10 +20,27 @@ class AppActionsMixin:
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return None
return self.current_items[cursor_row]
def _get_naming_hints(self, item: dict | None) -> tuple[str | None, str | None]:
"""Return preferred title and author values used for download filenames."""
if not item or not self.library_client:
return (None, None)
return (
self.library_client.extract_title(item),
self.library_client.extract_authors(item),
)
def _get_selected_asin(self) -> str | None:
if not self.download_manager:
self.update_status("Not authenticated. Please restart and authenticate.")
return None
if not self.library_client:
self.update_status("Library client not available")
return None
selected_item = self.current_items[cursor_row]
selected_item = self._get_selected_item()
if not selected_item:
return None
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
@@ -36,7 +50,7 @@ class AppActionsMixin:
def action_play_selected(self) -> None:
asin = self._get_selected_asin()
if asin:
self._start_playback_async(asin)
self._start_playback_async(asin, self._get_selected_item())
def action_toggle_playback(self) -> None:
if not self.playback.toggle_playback():
@@ -86,8 +100,7 @@ class AppActionsMixin:
return
if self.library_client.is_finished(selected_item):
self.call_from_thread(self.update_status,
"Already marked as finished")
self.call_from_thread(self.update_status, "Already marked as finished")
return
success = self.library_client.mark_as_finished(asin, selected_item)
@@ -132,28 +145,36 @@ class AppActionsMixin:
def action_toggle_download(self) -> None:
asin = self._get_selected_asin()
if asin:
self._toggle_download_async(asin)
self._toggle_download_async(asin, self._get_selected_item())
@work(exclusive=True, thread=True)
def _toggle_download_async(self, asin: str) -> None:
def _toggle_download_async(self, asin: str, item: dict | None = None) -> None:
if not self.download_manager:
return
preferred_title, preferred_author = self._get_naming_hints(item)
if self.download_manager.is_cached(asin):
self.download_manager.remove_cached(
asin, self._thread_status_update)
self.download_manager.remove_cached(asin, self._thread_status_update)
else:
self.download_manager.get_or_download(
asin, self._thread_status_update)
asin,
self._thread_status_update,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
self.call_from_thread(self._refresh_table)
@work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None:
def _start_playback_async(self, asin: str, item: dict | None = None) -> None:
if not self.download_manager:
return
preferred_title, preferred_author = self._get_naming_hints(item)
self.playback.prepare_and_start(
self.download_manager,
asin,
self._thread_status_update,
preferred_title,
preferred_author,
)

View File

@@ -8,7 +8,7 @@ from textual.events import Resize
from textual.widgets import DataTable, ProgressBar, Static
from .. import __version__
from ..constants import TABLE_COLUMN_DEFS, TABLE_CSS
from ..constants import TABLE_COLUMN_DEFS
class AppLayoutMixin:

View File

@@ -16,16 +16,15 @@ class AppLibraryMixin:
return
try:
all_items = self.library_client.fetch_all_items(
self._thread_status_update)
all_items = self.library_client.fetch_all_items(self._thread_status_update)
self.call_from_thread(self.on_library_loaded, all_items)
except (OSError, ValueError, KeyError) as exc:
self.call_from_thread(self.on_library_error, str(exc))
def on_library_loaded(self, items: list[LibraryItem]) -> None:
"""Store fetched items and refresh the active library view."""
self.all_items = items
self._search_text_cache.clear()
self._prime_search_cache(items)
self.update_status(f"Loaded {len(items)} books")
if self.show_all_mode:
self.show_all()

View File

@@ -15,6 +15,7 @@ from textual.widgets import DataTable, Static
class AppTableMixin:
def _populate_table(self, items: list[LibraryItem]) -> None:
"""Render library items into the table with stable unique row keys."""
table = self.query_one("#library_table", DataTable)
table.clear()
@@ -22,18 +23,41 @@ class AppTableMixin:
self.update_status("No books found.")
return
for item in items:
used_keys: set[str] = set()
for index, item in enumerate(items):
title, author, runtime, progress, downloaded = format_item_as_row(
item, self.library_client, self.download_manager
)
table.add_row(title, author, runtime,
progress, downloaded, key=title)
row_key = self._build_row_key(item, title, index, used_keys)
table.add_row(title, author, runtime, progress, downloaded, key=row_key)
self.current_items = items
status = self.query_one("#status", Static)
status.display = False
self._apply_column_widths(table)
def _build_row_key(
self,
item: LibraryItem,
title: str,
index: int,
used_keys: set[str],
) -> str:
"""Return a unique table row key derived from ASIN when available."""
asin = self.library_client.extract_asin(item) if self.library_client else None
base_key = asin or f"{title}#{index}"
if base_key not in used_keys:
used_keys.add(base_key)
return base_key
suffix = 2
candidate = f"{base_key}#{suffix}"
while candidate in used_keys:
suffix += 1
candidate = f"{base_key}#{suffix}"
used_keys.add(candidate)
return candidate
def _refresh_table(self) -> None:
if self.current_items:
self._populate_table(self.current_items)
@@ -79,11 +103,9 @@ class AppTableMixin:
items = self.all_items
if self.filter_text:
items = filter_items(items, self.filter_text,
self._get_search_text)
items = filter_items(items, self.filter_text, self._get_search_text)
self._populate_table(items)
self.update_status(
f"Filter: '{self.filter_text}' ({len(items)} books)")
self.update_status(f"Filter: '{self.filter_text}' ({len(items)} books)")
return
if not self.show_all_mode and self.library_client:
@@ -97,6 +119,7 @@ class AppTableMixin:
if cached is not None:
return cached
from ..library import build_search_text
search_text = build_search_text(item, self.library_client)
self._search_text_cache[cache_key] = search_text
return search_text

View File

@@ -1,278 +1,29 @@
"""Paths, API/config values, and CSS used across the application."""
"""Compatibility exports for constants grouped by domain modules."""
from pathlib import Path
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
CONFIG_PATH = Path.home() / ".config" / "auditui" / "config.json"
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
DEFAULT_CODEC = "LC_128_44100_stereo"
MIN_FILE_SIZE = 1024 * 1024
DEFAULT_CHUNK_SIZE = 8192
TABLE_COLUMN_DEFS = (
("Title", 4),
("Author", 3),
("Length", 1),
("Progress", 1),
("Downloaded", 1),
from .downloads import DEFAULT_CHUNK_SIZE, DEFAULT_CODEC, DOWNLOAD_URL, MIN_FILE_SIZE
from .library import (
AUTHOR_NAME_DISPLAY_LENGTH,
AUTHOR_NAME_MAX_LENGTH,
PROGRESS_COLUMN_INDEX,
)
from .paths import AUTH_PATH, CACHE_DIR, CONFIG_PATH
from .playback import SEEK_SECONDS
from .table import TABLE_COLUMN_DEFS
from .ui import TABLE_CSS
AUTHOR_NAME_MAX_LENGTH = 40
AUTHOR_NAME_DISPLAY_LENGTH = 37
PROGRESS_COLUMN_INDEX = 3
SEEK_SECONDS = 30.0
TABLE_CSS = """
Screen {
background: #141622;
}
#top_bar {
background: #10131f;
color: #d5d9f0;
text-style: bold;
height: 1;
margin: 0;
padding: 0;
}
#top_left,
#top_center,
#top_right {
width: 1fr;
padding: 0 1;
background: #10131f;
margin: 0;
}
#top_left {
text-align: left;
}
#top_center {
text-align: center;
}
#top_right {
text-align: right;
}
DataTable {
width: 100%;
height: 1fr;
background: #141622;
color: #c7cfe8;
border: solid #262a3f;
scrollbar-size-horizontal: 0;
}
DataTable:focus {
border: solid #7aa2f7;
}
DataTable > .datatable--header {
background: #1b2033;
color: #b9c3e3;
text-style: bold;
}
DataTable > .datatable--cursor {
background: #232842;
color: #e6ebff;
}
DataTable > .datatable--odd-row {
background: #121422;
}
DataTable > .datatable--even-row {
background: #15182a;
}
Static {
height: 1;
text-align: center;
background: #10131f;
color: #c7cfe8;
}
Static#status {
color: #b6bfdc;
}
Static#progress_info {
color: #7aa2f7;
text-style: bold;
margin: 0;
padding: 0;
text-align: center;
width: 100%;
}
#progress_bar_container {
align: center middle;
width: 100%;
height: 1;
}
ProgressBar#progress_bar {
height: 1;
background: #10131f;
border: none;
margin: 0;
padding: 0;
width: 50%;
}
ProgressBar#progress_bar Bar {
width: 100%;
}
ProgressBar#progress_bar > .progress-bar--track {
background: #262a3f;
}
ProgressBar#progress_bar > .progress-bar--bar {
background: #8bd5ca;
}
HelpScreen,
StatsScreen,
FilterScreen {
align: center middle;
background: rgba(0, 0, 0, 0.7);
}
HelpScreen Static,
StatsScreen Static,
FilterScreen Static {
background: transparent;
}
StatsScreen #help_container {
width: auto;
min-width: 55;
max-width: 70;
}
StatsScreen #help_content {
align: center middle;
width: 100%;
}
StatsScreen .help_list {
width: 100%;
}
StatsScreen .help_list > ListItem {
background: transparent;
height: 1;
}
StatsScreen .help_list > ListItem:hover {
background: #232842;
}
StatsScreen .help_list > ListItem > Label {
width: 100%;
text-align: left;
padding-left: 2;
}
#help_container {
width: 72%;
max-width: 90;
min-width: 44;
height: auto;
max-height: 80%;
min-height: 14;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 1;
}
#help_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 0;
border-bottom: solid #4b5165;
}
#help_content {
width: 100%;
height: auto;
padding: 0;
margin: 0 0 1 0;
align: center middle;
}
.help_list {
width: 100%;
height: auto;
background: transparent;
padding: 0;
scrollbar-size: 0 0;
}
.help_list > ListItem {
background: #1b1f33;
padding: 0 1;
height: 1;
}
.help_list > ListItem:hover {
background: #2a2f45;
}
.help_list > ListItem > Label {
width: 100%;
padding: 0;
}
#help_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 0;
border-top: solid #4b5165;
}
#filter_container {
width: 60;
height: auto;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 2;
}
#filter_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 1;
}
#filter_input {
width: 100%;
margin: 1 0;
}
#filter_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 1;
}
"""
__all__ = [
"AUTH_PATH",
"CONFIG_PATH",
"CACHE_DIR",
"DOWNLOAD_URL",
"DEFAULT_CODEC",
"MIN_FILE_SIZE",
"DEFAULT_CHUNK_SIZE",
"TABLE_COLUMN_DEFS",
"AUTHOR_NAME_MAX_LENGTH",
"AUTHOR_NAME_DISPLAY_LENGTH",
"PROGRESS_COLUMN_INDEX",
"SEEK_SECONDS",
"TABLE_CSS",
]

View File

@@ -0,0 +1,6 @@
"""Download-related constants for Audible file retrieval."""
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
DEFAULT_CODEC = "LC_128_44100_stereo"
MIN_FILE_SIZE = 1024 * 1024
DEFAULT_CHUNK_SIZE = 8192

View File

@@ -0,0 +1,5 @@
"""Library and table formatting constants."""
AUTHOR_NAME_MAX_LENGTH = 40
AUTHOR_NAME_DISPLAY_LENGTH = 37
PROGRESS_COLUMN_INDEX = 3

View File

@@ -0,0 +1,8 @@
"""Filesystem paths used by configuration and caching."""
from pathlib import Path
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
CONFIG_PATH = Path.home() / ".config" / "auditui" / "config.json"
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"

View File

@@ -0,0 +1,3 @@
"""Playback behavior constants."""
SEEK_SECONDS = 30.0

View File

@@ -0,0 +1,9 @@
"""Main library table column definitions."""
TABLE_COLUMN_DEFS = (
("Title", 4),
("Author", 3),
("Length", 1),
("Progress", 1),
("Downloaded", 1),
)

255
auditui/constants/ui.py Normal file
View File

@@ -0,0 +1,255 @@
"""Textual CSS constants for the application UI."""
TABLE_CSS = """
Screen {
background: #141622;
}
#top_bar {
background: #10131f;
color: #d5d9f0;
text-style: bold;
height: 1;
margin: 0;
padding: 0;
}
#top_left,
#top_center,
#top_right {
width: 1fr;
padding: 0 1;
background: #10131f;
margin: 0;
}
#top_left {
text-align: left;
}
#top_center {
text-align: center;
}
#top_right {
text-align: right;
}
DataTable {
width: 100%;
height: 1fr;
background: #141622;
color: #c7cfe8;
border: solid #262a3f;
scrollbar-size-horizontal: 0;
}
DataTable:focus {
border: solid #7aa2f7;
}
DataTable > .datatable--header {
background: #1b2033;
color: #b9c3e3;
text-style: bold;
}
DataTable > .datatable--cursor {
background: #232842;
color: #e6ebff;
}
DataTable > .datatable--odd-row {
background: #121422;
}
DataTable > .datatable--even-row {
background: #15182a;
}
Static {
height: 1;
text-align: center;
background: #10131f;
color: #c7cfe8;
}
Static#status {
color: #b6bfdc;
}
Static#progress_info {
color: #7aa2f7;
text-style: bold;
margin: 0;
padding: 0;
text-align: center;
width: 100%;
}
#progress_bar_container {
align: center middle;
width: 100%;
height: 1;
}
ProgressBar#progress_bar {
height: 1;
background: #10131f;
border: none;
margin: 0;
padding: 0;
width: 50%;
}
ProgressBar#progress_bar Bar {
width: 100%;
}
ProgressBar#progress_bar > .progress-bar--track {
background: #262a3f;
}
ProgressBar#progress_bar > .progress-bar--bar {
background: #8bd5ca;
}
HelpScreen,
StatsScreen,
FilterScreen {
align: center middle;
background: rgba(0, 0, 0, 0.7);
}
HelpScreen Static,
StatsScreen Static,
FilterScreen Static {
background: transparent;
}
StatsScreen #help_container {
width: auto;
min-width: 55;
max-width: 70;
}
StatsScreen #help_content {
align: center middle;
width: 100%;
}
StatsScreen .help_list {
width: 100%;
}
StatsScreen .help_list > ListItem {
background: transparent;
height: 1;
}
StatsScreen .help_list > ListItem:hover {
background: #232842;
}
StatsScreen .help_list > ListItem > Label {
width: 100%;
text-align: left;
padding-left: 2;
}
#help_container {
width: 72%;
max-width: 90;
min-width: 44;
height: auto;
max-height: 80%;
min-height: 14;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 1;
}
#help_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 0;
border-bottom: solid #4b5165;
}
#help_content {
width: 100%;
height: auto;
padding: 0;
margin: 0 0 1 0;
align: center middle;
}
.help_list {
width: 100%;
height: auto;
background: transparent;
padding: 0;
scrollbar-size: 0 0;
}
.help_list > ListItem {
background: #1b1f33;
padding: 0 1;
height: 1;
}
.help_list > ListItem:hover {
background: #2a2f45;
}
.help_list > ListItem > Label {
width: 100%;
padding: 0;
}
#help_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 0;
border-top: solid #4b5165;
}
#filter_container {
width: 60;
height: auto;
background: #181a2a;
border: heavy #7aa2f7;
padding: 1 2;
}
#filter_title {
width: 100%;
height: 2;
text-align: center;
text-style: bold;
color: #7aa2f7;
content-align: center middle;
margin-bottom: 1;
}
#filter_input {
width: 100%;
margin: 1 0;
}
#filter_footer {
width: 100%;
height: 2;
text-align: center;
content-align: center middle;
color: #b6bfdc;
margin-top: 1;
}
"""

View File

@@ -1,7 +1,9 @@
"""Obtains AAX files from Audible (cache or download) and provides activation bytes."""
import re
import unicodedata
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import audible
@@ -29,56 +31,94 @@ class DownloadManager:
chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> None:
self.auth = auth
self.client = client
self.client: Any = client
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size
self._http_client = httpx.Client(
auth=auth, timeout=30.0, follow_redirects=True)
self._http_client = httpx.Client(auth=auth, timeout=30.0, follow_redirects=True)
self._download_client = httpx.Client(
timeout=httpx.Timeout(connect=30.0, read=None,
write=30.0, pool=30.0),
timeout=httpx.Timeout(connect=30.0, read=None, write=30.0, pool=30.0),
follow_redirects=True,
)
def get_or_download(
self, asin: str, notify: StatusCallback | None = None
self,
asin: str,
notify: StatusCallback | None = None,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> Path | None:
"""Return local path to AAX file; download and cache if not present."""
title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
filename_stems = self._get_filename_stems_from_asin(
asin,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
local_path = self.cache_dir / f"{filename_stems[0]}.aax"
cached_path = self._find_cached_path(filename_stems)
if cached_path:
if notify:
notify(f"Using cached file: {local_path.name}")
return local_path
notify(f"Using cached file: {cached_path.name}")
return cached_path
if notify:
notify(f"Downloading to {local_path.name}...")
if not self._download_to_valid_file(asin, local_path, notify):
return None
return local_path
def _download_to_valid_file(
self,
asin: str,
local_path: Path,
notify: StatusCallback | None = None,
) -> bool:
"""Download with one retry and ensure resulting file has a valid size."""
for attempt in range(1, 3):
if not self._attempt_download(asin, local_path, notify):
return False
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return True
downloaded_size = local_path.stat().st_size if local_path.exists() else 0
if notify and attempt == 1:
notify(
f"Downloaded file too small ({downloaded_size} bytes), retrying..."
)
if notify and attempt == 2:
notify(
f"Download failed: file too small ({downloaded_size} bytes, expected >= {MIN_FILE_SIZE})"
)
self._cleanup_partial_file(local_path)
return False
def _attempt_download(
self,
asin: str,
local_path: Path,
notify: StatusCallback | None = None,
) -> bool:
"""Perform one download attempt including link lookup and URL validation."""
dl_link = self._get_download_link(asin, notify=notify)
if not dl_link:
if notify:
notify("Failed to get download link")
return None
return False
if not self._validate_download_url(dl_link):
if notify:
notify("Invalid download URL")
return None
return False
if not self._download_file(dl_link, local_path, notify):
if notify:
notify("Download failed")
return None
return False
if not local_path.exists() or local_path.stat().st_size < MIN_FILE_SIZE:
if notify:
notify("Download failed or file too small")
return None
return local_path
return True
def get_activation_bytes(self) -> str | None:
"""Return activation bytes as hex string for ffplay/ffmpeg."""
@@ -92,12 +132,7 @@ class DownloadManager:
def get_cached_path(self, asin: str) -> Path | None:
"""Return path to cached AAX file if it exists and is valid size."""
title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
return self._find_cached_path(self._get_filename_stems_from_asin(asin))
def is_cached(self, asin: str) -> bool:
"""Return True if the title is present in cache with valid size."""
@@ -130,20 +165,68 @@ class DownloadManager:
return False
def _sanitize_filename(self, filename: str) -> str:
"""Remove invalid characters from filename."""
return re.sub(r'[<>:"/\\|?*]', "_", filename)
"""Normalize a filename segment with ASCII letters, digits, and dashes."""
ascii_text = unicodedata.normalize("NFKD", filename)
ascii_text = ascii_text.encode("ascii", "ignore").decode("ascii")
ascii_text = re.sub(r"[’'`]+", "", ascii_text)
ascii_text = re.sub(r"[^A-Za-z0-9]+", "-", ascii_text)
ascii_text = re.sub(r"-+", "-", ascii_text)
ascii_text = ascii_text.strip("-._")
return ascii_text or "Unknown"
def _find_cached_path(self, filename_stems: list[str]) -> Path | None:
"""Return the first valid cached path matching any candidate filename stem."""
for filename_stem in filename_stems:
local_path = self.cache_dir / f"{filename_stem}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
def _get_filename_stems_from_asin(
self,
asin: str,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> list[str]:
"""Build preferred and fallback cache filename stems for an ASIN."""
if preferred_title:
preferred_combined = (
f"{self._sanitize_filename(preferred_author or 'Unknown Author')}_"
f"{self._sanitize_filename(preferred_title)}"
)
preferred_legacy = self._sanitize_filename(preferred_title)
fallback_asin = self._sanitize_filename(asin)
return list(
dict.fromkeys([preferred_combined, preferred_legacy, fallback_asin])
)
def _get_name_from_asin(self, asin: str) -> str | None:
"""Get the title/name of a book from its ASIN."""
try:
product_info = self.client.get(
path=f"1.0/catalog/products/{asin}",
response_groups="product_desc,product_attrs",
**{"response_groups": "contributors,product_desc,product_attrs"},
)
product = product_info.get("product", {})
return product.get("title") or "Unknown Title"
except (OSError, ValueError, KeyError):
return None
title = product.get("title") or "Unknown Title"
author = self._get_primary_author(product)
combined = (
f"{self._sanitize_filename(author)}_{self._sanitize_filename(title)}"
)
legacy_title = self._sanitize_filename(title)
fallback_asin = self._sanitize_filename(asin)
return list(dict.fromkeys([combined, legacy_title, fallback_asin]))
except (OSError, ValueError, KeyError, AttributeError):
return [self._sanitize_filename(asin)]
def _get_primary_author(self, product: dict) -> str:
"""Extract a primary author name from product metadata."""
contributors = product.get("authors") or product.get("contributors") or []
for contributor in contributors:
if not isinstance(contributor, dict):
continue
name = contributor.get("name")
if isinstance(name, str) and name.strip():
return name
return "Unknown Author"
def _get_download_link(
self,
@@ -174,7 +257,8 @@ class DownloadManager:
if not link:
link = str(response.url)
tld = self.auth.locale.domain
locale = getattr(self.auth, "locale", None)
tld = getattr(locale, "domain", "com")
return link.replace("cds.audible.com", f"cds.audible.{tld}")
except httpx.HTTPError as exc:
@@ -194,19 +278,7 @@ class DownloadManager:
with self._download_client.stream("GET", url) as response:
response.raise_for_status()
total_size = int(response.headers.get("content-length", 0))
downloaded = 0
with open(dest_path, "wb") as file_handle:
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
file_handle.write(chunk)
downloaded += len(chunk)
if total_size > 0 and notify:
percent = (downloaded / total_size) * 100
downloaded_mb = downloaded / (1024 * 1024)
total_mb = total_size / (1024 * 1024)
notify(
f"Downloading: {percent:.1f}% ({downloaded_mb:.1f}/{total_mb:.1f} MB)"
)
self._stream_to_file(response, dest_path, total_size, notify)
return dest_path
except httpx.HTTPStatusError as exc:
@@ -214,31 +286,56 @@ class DownloadManager:
notify(
f"Download HTTP error: {exc.response.status_code} {exc.response.reason_phrase}"
)
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
self._cleanup_partial_file(dest_path)
return None
except httpx.HTTPError as exc:
if notify:
notify(f"Download network error: {exc!s}")
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
self._cleanup_partial_file(dest_path)
return None
except (OSError, ValueError, KeyError) as exc:
if notify:
notify(f"Download error: {exc!s}")
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
self._cleanup_partial_file(dest_path)
return None
def _stream_to_file(
self,
response: httpx.Response,
dest_path: Path,
total_size: int,
notify: StatusCallback | None = None,
) -> None:
"""Write streamed response bytes to disk and emit progress messages."""
downloaded = 0
with open(dest_path, "wb") as file_handle:
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
file_handle.write(chunk)
downloaded += len(chunk)
self._notify_download_progress(downloaded, total_size, notify)
def _notify_download_progress(
self,
downloaded: int,
total_size: int,
notify: StatusCallback | None = None,
) -> None:
"""Emit a formatted progress message when total size is known."""
if total_size <= 0 or not notify:
return
percent = (downloaded / total_size) * 100
downloaded_mb = downloaded / (1024 * 1024)
total_mb = total_size / (1024 * 1024)
notify(f"Downloading: {percent:.1f}% ({downloaded_mb:.1f}/{total_mb:.1f} MB)")
def _cleanup_partial_file(self, dest_path: Path) -> None:
"""Remove undersized partial download files after transfer failures."""
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
return
def close(self) -> None:
"""Close internal HTTP clients. Safe to call multiple times."""
if hasattr(self, "_http_client"):

View File

@@ -1,365 +1,25 @@
"""Client for the Audible library API."""
"""Client facade for Audible library fetch, extraction, and progress updates."""
from concurrent.futures import ThreadPoolExecutor, as_completed
from __future__ import annotations
import audible
from ..types import LibraryItem, StatusCallback
from .client_extract import LibraryClientExtractMixin
from .client_fetch import LibraryClientFetchMixin
from .client_finished import LibraryClientFinishedMixin
from .client_format import LibraryClientFormatMixin
from .client_positions import LibraryClientPositionsMixin
class LibraryClient:
"""Client for the Audible library API. Fetches items, extracts metadata, and updates positions."""
class LibraryClient(
LibraryClientFetchMixin,
LibraryClientExtractMixin,
LibraryClientPositionsMixin,
LibraryClientFinishedMixin,
LibraryClientFormatMixin,
):
"""Audible library client composed from focused behavior mixins."""
def __init__(self, client: audible.Client) -> None:
"""Store authenticated Audible client used by all operations."""
self.client = client
def fetch_all_items(self, on_progress: StatusCallback | None = None) -> list[LibraryItem]:
"""Fetch all library items from the API."""
response_groups = (
"contributors,media,product_attrs,product_desc,product_details,"
"is_finished,listening_status,percent_complete"
)
return self._fetch_all_pages(response_groups, on_progress)
def _fetch_page(
self, page: int, page_size: int, response_groups: str
) -> tuple[int, list[LibraryItem]]:
"""Fetch a single page of library items from the API."""
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = library.get("items", [])
return page, list(items)
def _fetch_all_pages(
self, response_groups: str, on_progress: StatusCallback | None = None
) -> list[LibraryItem]:
"""Fetch all pages of library items using parallel requests."""
library_response = None
page_size = 200
for attempt_size in [200, 100, 50]:
try:
library_response = self.client.get(
path="library",
num_results=attempt_size,
page=1,
response_groups=response_groups,
)
page_size = attempt_size
break
except Exception:
continue
if not library_response:
return []
first_page_items = library_response.get("items", [])
if not first_page_items:
return []
all_items: list[LibraryItem] = list(first_page_items)
if on_progress:
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
if len(first_page_items) < page_size:
return all_items
total_items_estimate = library_response.get(
"total_results") or library_response.get("total")
if total_items_estimate:
estimated_pages = (total_items_estimate +
page_size - 1) // page_size
estimated_pages = min(estimated_pages, 1000)
else:
estimated_pages = 500
max_workers = 50
page_results: dict[int, list[LibraryItem]] = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page: dict = {}
for page in range(2, estimated_pages + 1):
future = executor.submit(
self._fetch_page, page, page_size, response_groups
)
future_to_page[future] = page
completed_count = 0
total_items = len(first_page_items)
for future in as_completed(future_to_page):
page_num = future_to_page.pop(future)
try:
fetched_page, items = future.result()
if not items or len(items) < page_size:
for remaining_future in list(future_to_page.keys()):
remaining_future.cancel()
break
page_results[fetched_page] = items
total_items += len(items)
completed_count += 1
if on_progress and completed_count % 20 == 0:
on_progress(
f"Fetched {completed_count} pages ({total_items} items)..."
)
except Exception:
pass
for page_num in sorted(page_results.keys()):
all_items.extend(page_results[page_num])
return all_items
def extract_title(self, item: LibraryItem) -> str:
"""Return the book title from a library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def extract_authors(self, item: LibraryItem) -> str:
"""Return comma-separated author names from a library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [a.get("name", "")
for a in authors if isinstance(a, dict)]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
"""Return runtime in minutes if present."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
if isinstance(runtime, (int, float)):
return int(runtime)
return None
def extract_progress_info(self, item: LibraryItem) -> float | None:
"""Return progress percentage (0–100) if present."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: LibraryItem) -> str | None:
"""Return the ASIN for a library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def is_finished(self, item: LibraryItem) -> bool:
"""Return True if the item is marked or inferred as finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float))
and percent_complete >= 100
)
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annot in annotations:
if annot.get("asin") != asin:
continue
last_position_heard = annot.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Fetch content reference (ACR and version) for position updates."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def _update_position(self, asin: str, position_seconds: float) -> bool:
"""Persist playback position to the API. Returns True on success."""
if position_seconds < 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save playback position to Audible. Returns True on success."""
if position_seconds <= 0:
return False
return self._update_position(asin, position_seconds)
@staticmethod
def format_duration(
value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
"""Format a duration value as e.g. 2h30m or 45m."""
if value is None or value <= 0:
return default_none
total_minutes = int(value)
if unit == "seconds":
total_minutes //= 60
hours, minutes = divmod(total_minutes, 60)
if hours > 0:
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
return f"{minutes}m"
def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
"""Mark a book as finished on Audible. Optionally mutates item in place."""
total_ms = self._get_runtime_ms(asin, item)
if not total_ms:
return False
position_ms = total_ms
acr = self._get_acr(asin)
if not acr:
return False
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body={"asin": asin, "acr": acr, "position_ms": position_ms},
)
if item:
item["is_finished"] = True
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict):
listening_status["is_finished"] = True
return True
except Exception:
return False
def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
"""Return total runtime in ms from item or API."""
if item:
runtime_min = self.extract_runtime_minutes(item)
if runtime_min:
return runtime_min * 60 * 1000
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="chapter_info",
)
chapter_info = response.get(
"content_metadata", {}).get("chapter_info", {})
return chapter_info.get("runtime_length_ms")
except Exception:
return None
def _get_acr(self, asin: str) -> str | None:
"""Fetch ACR token required for position and finish updates."""
try:
response = self.client.post(
path=f"1.0/content/{asin}/licenserequest",
body={
"response_groups": "content_reference",
"consumption_type": "Download",
"drm_type": "Adrm",
},
)
return response.get("content_license", {}).get("acr")
except Exception:
return None
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS for display."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

View File

@@ -0,0 +1,84 @@
"""Metadata extraction helpers for library items."""
from __future__ import annotations
from ..types import LibraryItem
class LibraryClientExtractMixin:
"""Extracts display and status fields from library items."""
def extract_title(self, item: LibraryItem) -> str:
"""Return the book title from a library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def extract_authors(self, item: LibraryItem) -> str:
"""Return comma-separated author names from a library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [
author.get("name", "") for author in authors if isinstance(author, dict)
]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
"""Return runtime in minutes if present."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
if isinstance(runtime, (int, float)):
return int(runtime)
return None
def extract_progress_info(self, item: LibraryItem) -> float | None:
"""Return progress percentage (0-100) if present."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: LibraryItem) -> str | None:
"""Return the ASIN for a library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def is_finished(self, item: LibraryItem) -> bool:
"""Return True if the item is marked or inferred as finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float)) and percent_complete >= 100
)

View File

@@ -0,0 +1,165 @@
"""Library page fetching helpers for the Audible API client."""
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
from ..types import LibraryItem, StatusCallback
class LibraryClientFetchMixin:
"""Fetches all library items from paginated Audible endpoints."""
client: Any
def fetch_all_items(
self, on_progress: StatusCallback | None = None
) -> list[LibraryItem]:
"""Fetch all library items from the API."""
response_groups = "contributors,product_attrs,product_desc,is_finished,listening_status,percent_complete"
return self._fetch_all_pages(response_groups, on_progress)
def _fetch_page(
self,
page: int,
page_size: int,
response_groups: str,
) -> tuple[int, list[LibraryItem]]:
"""Fetch one library page and return its index with items."""
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = library.get("items", [])
return page, list(items)
def _fetch_all_pages(
self,
response_groups: str,
on_progress: StatusCallback | None = None,
) -> list[LibraryItem]:
"""Fetch all library pages using parallel requests after page one."""
library_response = None
page_size = 200
for attempt_size in [200, 100, 50]:
try:
library_response = self.client.get(
path="library",
num_results=attempt_size,
page=1,
response_groups=response_groups,
)
page_size = attempt_size
break
except Exception:
continue
if not library_response:
return []
first_page_items = library_response.get("items", [])
if not first_page_items:
return []
all_items: list[LibraryItem] = list(first_page_items)
if on_progress:
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
if len(first_page_items) < page_size:
return all_items
estimated_pages = self._estimate_total_pages(library_response, page_size)
page_results = self._fetch_remaining_pages(
response_groups=response_groups,
page_size=page_size,
estimated_pages=estimated_pages,
initial_total=len(first_page_items),
on_progress=on_progress,
)
for page_num in sorted(page_results.keys()):
all_items.extend(page_results[page_num])
return all_items
def _estimate_total_pages(self, library_response: dict, page_size: int) -> int:
"""Estimate total pages from API metadata with a conservative cap."""
total_items_estimate = library_response.get(
"total_results"
) or library_response.get("total")
if not total_items_estimate:
return 500
estimated_pages = (total_items_estimate + page_size - 1) // page_size
return min(estimated_pages, 1000)
def _fetch_remaining_pages(
self,
response_groups: str,
page_size: int,
estimated_pages: int,
initial_total: int,
on_progress: StatusCallback | None = None,
) -> dict[int, list[LibraryItem]]:
"""Fetch pages 2..N with bounded in-flight requests for faster startup."""
page_results: dict[int, list[LibraryItem]] = {}
max_workers = min(16, max(1, estimated_pages - 1))
next_page_to_submit = 2
stop_page = estimated_pages + 1
completed_count = 0
total_items = initial_total
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page: dict = {}
while (
next_page_to_submit <= estimated_pages
and next_page_to_submit < stop_page
and len(future_to_page) < max_workers
):
future = executor.submit(
self._fetch_page,
next_page_to_submit,
page_size,
response_groups,
)
future_to_page[future] = next_page_to_submit
next_page_to_submit += 1
while future_to_page:
future = next(as_completed(future_to_page))
page_num = future_to_page.pop(future)
try:
fetched_page, items = future.result()
except Exception:
continue
if items:
page_results[fetched_page] = items
total_items += len(items)
completed_count += 1
if on_progress and completed_count % 20 == 0:
on_progress(
f"Fetched {completed_count} pages ({total_items} items)..."
)
if len(items) < page_size:
stop_page = min(stop_page, fetched_page)
while (
next_page_to_submit <= estimated_pages
and next_page_to_submit < stop_page
and len(future_to_page) < max_workers
):
next_future = executor.submit(
self._fetch_page,
next_page_to_submit,
page_size,
response_groups,
)
future_to_page[next_future] = next_page_to_submit
next_page_to_submit += 1
return page_results

View File

@@ -0,0 +1,70 @@
"""Helpers for marking content as finished through Audible APIs."""
from __future__ import annotations
from typing import Any
from ..types import LibraryItem
class LibraryClientFinishedMixin:
"""Marks titles as finished and mutates in-memory item state."""
client: Any
def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
"""Mark a book as finished on Audible and optionally update item state."""
total_ms = self._get_runtime_ms(asin, item)
if not total_ms:
return False
acr = self._get_acr(asin)
if not acr:
return False
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body={"asin": asin, "acr": acr, "position_ms": total_ms},
)
if item:
item["is_finished"] = True
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict):
listening_status["is_finished"] = True
return True
except Exception:
return False
def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
"""Return total runtime in milliseconds from item or metadata endpoint."""
if item:
extract_runtime_minutes = getattr(self, "extract_runtime_minutes")
runtime_min = extract_runtime_minutes(item)
if runtime_min:
return runtime_min * 60 * 1000
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="chapter_info",
)
chapter_info = response.get("content_metadata", {}).get("chapter_info", {})
return chapter_info.get("runtime_length_ms")
except Exception:
return None
def _get_acr(self, asin: str) -> str | None:
"""Fetch the ACR token required by finish/update write operations."""
try:
response = self.client.post(
path=f"1.0/content/{asin}/licenserequest",
body={
"response_groups": "content_reference",
"consumption_type": "Download",
"drm_type": "Adrm",
},
)
return response.get("content_license", {}).get("acr")
except Exception:
return None

View File

@@ -0,0 +1,37 @@
"""Formatting helpers exposed by the library client."""
from __future__ import annotations
class LibraryClientFormatMixin:
"""Formats durations and timestamps for display usage."""
@staticmethod
def format_duration(
value: int | None,
unit: str = "minutes",
default_none: str | None = None,
) -> str | None:
"""Format duration values as compact hour-minute strings."""
if value is None or value <= 0:
return default_none
total_minutes = int(value)
if unit == "seconds":
total_minutes //= 60
hours, minutes = divmod(total_minutes, 60)
if hours > 0:
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
return f"{minutes}m"
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS for display."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

View File

@@ -0,0 +1,85 @@
"""Playback position read and write helpers for library content."""
from __future__ import annotations
from typing import Any
class LibraryClientPositionsMixin:
"""Handles last-position retrieval and persistence."""
client: Any
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annotation in annotations:
if annotation.get("asin") != asin:
continue
last_position_heard = annotation.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Fetch content reference payload used by position update calls."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def _update_position(self, asin: str, position_seconds: float) -> bool:
"""Persist playback position to the API and return success state."""
if position_seconds < 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save playback position to Audible and return success state."""
if position_seconds <= 0:
return False
return self._update_position(asin, position_seconds)

View File

@@ -45,12 +45,16 @@ class ControllerLifecycleMixin(ControllerStateMixin):
try:
proc, return_code = process_mod.run_ffplay(cmd)
if proc is None:
if return_code == 0 and start_position > 0 and self.total_duration and start_position >= self.total_duration - 5:
if (
return_code == 0
and start_position > 0
and self.total_duration
and start_position >= self.total_duration - 5
):
notify("Reached end of file")
self._reset_state()
return False
notify(
f"Playback process exited immediately (code: {return_code})")
notify(f"Playback process exited immediately (code: {return_code})")
return False
self.playback_process = proc
self.is_playing = True
@@ -114,6 +118,8 @@ class ControllerLifecycleMixin(ControllerStateMixin):
download_manager: DownloadManager,
asin: str,
status_callback: StatusCallback | None = None,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> bool:
"""Download AAX if needed, get activation bytes, then start playback. Returns True on success."""
notify = status_callback or self.notify
@@ -121,7 +127,12 @@ class ControllerLifecycleMixin(ControllerStateMixin):
notify("Could not download file")
return False
notify("Preparing playback...")
local_path = download_manager.get_or_download(asin, notify)
local_path = download_manager.get_or_download(
asin,
notify,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
if not local_path:
notify("Could not download file")
return False
@@ -136,14 +147,15 @@ class ControllerLifecycleMixin(ControllerStateMixin):
last = self.library_client.get_last_position(asin)
if last is not None and last > 0:
start_position = last
notify(
f"Resuming from {LibraryClient.format_time(start_position)}")
notify(f"Resuming from {LibraryClient.format_time(start_position)}")
except (OSError, ValueError, KeyError):
pass
notify(f"Starting playback of {local_path.name}...")
self.current_asin = asin
self.last_save_time = time.time()
return self.start(local_path, activation_hex, notify, start_position, self.playback_speed)
return self.start(
local_path, activation_hex, notify, start_position, self.playback_speed
)
def toggle_playback(self) -> bool:
"""Toggle between pause and resume. Returns True if an action was performed."""
@@ -160,7 +172,10 @@ class ControllerLifecycleMixin(ControllerStateMixin):
return True
def _restart_at_position(
self, new_position: float, new_speed: float | None = None, message: str | None = None
self,
new_position: float,
new_speed: float | None = None,
message: str | None = None,
) -> bool:
"""Stop current process and start again at new_position; optionally set speed and notify."""
if not self.is_playing or not self.current_file_path:
@@ -170,7 +185,9 @@ class ControllerLifecycleMixin(ControllerStateMixin):
speed = new_speed if new_speed is not None else saved["speed"]
self._stop_process()
time.sleep(0.2)
if self.start(saved["file_path"], saved["activation"], self.notify, new_position, speed):
if self.start(
saved["file_path"], saved["activation"], self.notify, new_position, speed
):
self.current_asin = saved["asin"]
self.total_duration = saved["duration"]
self.chapters = saved["chapters"]

View File

@@ -1,6 +1,6 @@
[project]
name = "auditui"
version = "0.1.6"
version = "0.2.0"
description = "An Audible TUI client"
readme = "README.md"
requires-python = ">=3.10,<3.13"

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, cast
from auditui.app.actions import AppActionsMixin
@dataclass(slots=True)
class FakeTable:
"""Minimal table shim exposing cursor and row count."""
row_count: int
cursor_row: int = 0
class DummyActionsApp(AppActionsMixin):
"""Minimal app host used for download naming hint tests."""
def __init__(self) -> None:
"""Initialize state required by action helpers."""
self.current_items: list[dict] = []
self.download_manager = object()
self.library_client = type(
"Library", (), {"extract_asin": lambda self, item: item.get("asin")}
)()
self._table = FakeTable(row_count=0, cursor_row=0)
def update_status(self, message: str) -> None:
"""Ignore status in this focused behavior test."""
del message
def query_one(self, selector: str, _type: object) -> FakeTable:
"""Return the fake table used in selection tests."""
assert selector == "#library_table"
return self._table
def test_action_toggle_download_passes_selected_item() -> None:
"""Ensure download toggle forwards selected item for naming hints."""
app = DummyActionsApp()
seen: list[tuple[str, str | None]] = []
def capture_toggle(asin: str, item: dict | None = None) -> None:
"""Capture download toggle arguments for assertions."""
seen.append((asin, item.get("title") if item else None))
setattr(cast(Any, app), "_toggle_download_async", capture_toggle)
app._table = FakeTable(row_count=1, cursor_row=0)
app.current_items = [{"asin": "ASIN", "title": "Book"}]
app.action_toggle_download()
assert seen == [("ASIN", "Book")]

View File

@@ -0,0 +1,135 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, cast
from auditui.app.actions import AppActionsMixin
@dataclass(slots=True)
class FakeTable:
"""Minimal table shim exposing cursor and row count."""
row_count: int
cursor_row: int = 0
class FakePlayback:
"""Playback stub with togglable boolean return values."""
def __init__(self, result: bool) -> None:
"""Store deterministic toggle result for tests."""
self._result = result
self.calls: list[str] = []
def toggle_playback(self) -> bool:
"""Return configured result and record call."""
self.calls.append("toggle")
return self._result
def seek_forward(self, _seconds: float) -> bool:
"""Return configured result and record call."""
self.calls.append("seek_forward")
return self._result
class DummyActionsApp(AppActionsMixin):
"""Mixin host with just enough state for action method tests."""
def __init__(self) -> None:
"""Initialize fake app state used by action helpers."""
self.messages: list[str] = []
self.current_items: list[dict] = []
self.download_manager = object()
self.library_client = type(
"Library",
(),
{
"extract_asin": lambda self, item: item.get("asin"),
"extract_title": lambda self, item: item.get("title"),
"extract_authors": lambda self, item: item.get("authors"),
},
)()
self.playback = FakePlayback(True)
self.filter_text = "hello"
self._refreshed = 0
self._table = FakeTable(row_count=0, cursor_row=0)
def update_status(self, message: str) -> None:
"""Collect status messages for assertions."""
self.messages.append(message)
def query_one(self, selector: str, _type: object) -> FakeTable:
"""Return the fake table used in selection tests."""
assert selector == "#library_table"
return self._table
def _refresh_filtered_view(self) -> None:
"""Record refresh invocations for filter tests."""
self._refreshed += 1
def test_get_selected_asin_requires_non_empty_table() -> None:
"""Ensure selection fails gracefully when table has no rows."""
app = DummyActionsApp()
app._table = FakeTable(row_count=0)
assert app._get_selected_asin() is None
assert app.messages[-1] == "No books available"
def test_get_selected_asin_returns_current_row_asin() -> None:
"""Ensure selected row index maps to current_items ASIN."""
app = DummyActionsApp()
app._table = FakeTable(row_count=2, cursor_row=1)
app.current_items = [{"asin": "A1"}, {"asin": "A2"}]
assert app._get_selected_asin() == "A2"
def test_action_play_selected_starts_async_playback() -> None:
"""Ensure play action calls async starter with selected ASIN."""
app = DummyActionsApp()
seen: list[str] = []
def capture_start(asin: str, item: dict | None = None) -> None:
"""Capture playback start arguments for assertions."""
suffix = f":{item.get('title')}" if item else ""
seen.append(f"start:{asin}{suffix}")
setattr(cast(Any, app), "_start_playback_async", capture_start)
app._table = FakeTable(row_count=1, cursor_row=0)
app.current_items = [{"asin": "ASIN", "title": "Book"}]
app.action_play_selected()
assert seen[-1] == "start:ASIN:Book"
def test_action_toggle_playback_shows_hint_when_no_playback() -> None:
"""Ensure toggle action displays no-playback hint on false return."""
app = DummyActionsApp()
app.playback = FakePlayback(False)
app.action_toggle_playback()
assert app.messages[-1] == "No playback active. Press Enter to play a book."
def test_action_seek_forward_shows_hint_when_seek_fails() -> None:
"""Ensure failed seek action reuses no-playback helper status."""
app = DummyActionsApp()
app.playback = FakePlayback(False)
app.action_seek_forward()
assert app.messages[-1] == "No playback active. Press Enter to play a book."
def test_action_clear_filter_resets_filter_and_refreshes() -> None:
"""Ensure clearing filter resets text and refreshes filtered view."""
app = DummyActionsApp()
app.action_clear_filter()
assert app.filter_text == ""
assert app._refreshed == 1
assert app.messages[-1] == "Filter cleared"
def test_apply_filter_coerces_none_to_empty_string() -> None:
"""Ensure apply_filter normalizes None and refreshes list view."""
app = DummyActionsApp()
app._apply_filter(None)
assert app.filter_text == ""
assert app._refreshed == 1

View File

@@ -32,27 +32,25 @@ EXPECTED_BINDINGS: tuple[NormalizedBinding, ...] = (
)
def _normalize_binding(
binding: Binding | BindingTuple,
) -> NormalizedBinding:
"""Return key, action, description, and priority for a binding item."""
def _normalize_binding(binding: Binding | BindingTuple) -> NormalizedBinding:
"""Return key, action, description, and priority from one binding item."""
if isinstance(binding, Binding):
return (binding.key, binding.action, binding.description, binding.priority)
key, action, description = binding
return (key, action, description, False)
def _normalize_bindings() -> list[NormalizedBinding]:
"""Normalize all declared bindings to a comparable shape."""
def _all_bindings() -> list[NormalizedBinding]:
"""Normalize all app bindings into a stable comparable structure."""
return [_normalize_binding(binding) for binding in BINDINGS]
def test_bindings_match_expected_shortcuts() -> None:
"""Ensure the app ships with the expected binding set and actions."""
assert _normalize_bindings() == list(EXPECTED_BINDINGS)
"""Ensure the shipped shortcut list stays stable and explicit."""
assert _all_bindings() == list(EXPECTED_BINDINGS)
def test_binding_keys_are_unique() -> None:
"""Ensure each key is defined once to avoid ambiguous key dispatch."""
keys = [binding[0] for binding in _normalize_bindings()]
"""Ensure each key is defined only once to avoid dispatch ambiguity."""
keys = [binding[0] for binding in _all_bindings()]
assert len(keys) == len(set(keys))

View File

@@ -0,0 +1,114 @@
from __future__ import annotations
from auditui.app.library import AppLibraryMixin
from auditui.app import library as library_mod
class DummyLibraryApp(AppLibraryMixin):
"""Mixin host exposing only members used by AppLibraryMixin."""
def __init__(self) -> None:
"""Initialize in-memory app state and call tracking."""
self.all_items: list[dict] = []
self.show_all_mode = False
self._search_text_cache: dict[int, str] = {1: "x"}
self.messages: list[str] = []
self.call_log: list[tuple[str, tuple]] = []
self.library_client = None
def _prime_search_cache(self, items: list[dict]) -> None:
"""Store a marker so callers can assert this method was reached."""
self.call_log.append(("prime", (items,)))
def show_all(self) -> None:
"""Record show_all invocation for assertion."""
self.call_log.append(("show_all", ()))
def show_unfinished(self) -> None:
"""Record show_unfinished invocation for assertion."""
self.call_log.append(("show_unfinished", ()))
def update_status(self, message: str) -> None:
"""Capture status messages."""
self.messages.append(message)
def call_from_thread(self, func, *args) -> None:
"""Execute callback immediately to simplify tests."""
func(*args)
def _thread_status_update(self, message: str) -> None:
"""Capture worker-thread status update messages."""
self.messages.append(message)
def test_on_library_loaded_refreshes_cache_and_shows_unfinished() -> None:
"""Ensure loaded items reset cache and default to unfinished view."""
app = DummyLibraryApp()
items = [{"asin": "a"}, {"asin": "b"}]
app.on_library_loaded(items)
assert app.all_items == items
assert app._search_text_cache == {}
assert app.messages[-1] == "Loaded 2 books"
assert app.call_log[-1][0] == "show_unfinished"
def test_on_library_loaded_uses_show_all_mode() -> None:
"""Ensure loaded items respect show_all mode when enabled."""
app = DummyLibraryApp()
app.show_all_mode = True
app.on_library_loaded([{"asin": "a"}])
assert app.call_log[-1][0] == "show_all"
def test_on_library_error_formats_message() -> None:
"""Ensure library errors are surfaced through status updates."""
app = DummyLibraryApp()
app.on_library_error("boom")
assert app.messages == ["Error fetching library: boom"]
def test_fetch_library_calls_on_loaded(monkeypatch) -> None:
"""Ensure fetch_library forwards fetched items through call_from_thread."""
app = DummyLibraryApp()
class Worker:
"""Simple worker shim exposing cancellation state."""
is_cancelled = False
class LibraryClient:
"""Fake client returning a deterministic item list."""
def fetch_all_items(self, callback):
"""Invoke callback and return one item."""
callback("progress")
return [{"asin": "x"}]
app.library_client = LibraryClient()
monkeypatch.setattr(library_mod, "get_current_worker", lambda: Worker())
AppLibraryMixin.fetch_library.__wrapped__(app)
assert app.all_items == [{"asin": "x"}]
assert "Loaded 1 books" in app.messages
def test_fetch_library_handles_expected_exception(monkeypatch) -> None:
"""Ensure fetch exceptions call on_library_error with error text."""
app = DummyLibraryApp()
class Worker:
"""Simple worker shim exposing cancellation state."""
is_cancelled = False
class BrokenClient:
"""Fake client raising an expected fetch exception."""
def fetch_all_items(self, callback):
"""Raise the same exception family handled by mixin."""
del callback
raise ValueError("bad fetch")
app.library_client = BrokenClient()
monkeypatch.setattr(library_mod, "get_current_worker", lambda: Worker())
AppLibraryMixin.fetch_library.__wrapped__(app)
assert app.messages[-1] == "Error fetching library: bad fetch"

View File

@@ -0,0 +1,148 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import cast
from auditui.app.progress import AppProgressMixin
from textual.events import Key
from textual.widgets import DataTable
@dataclass(slots=True)
class FakeKeyEvent:
"""Minimal key event carrying key value and prevent_default state."""
key: str
prevented: bool = False
def prevent_default(self) -> None:
"""Mark event as prevented."""
self.prevented = True
@dataclass(slots=True)
class FakeStatic:
"""Minimal static widget with text and visibility fields."""
display: bool = False
text: str = ""
def update(self, value: str) -> None:
"""Store rendered text value."""
self.text = value
@dataclass(slots=True)
class FakeProgressBar:
"""Minimal progress bar widget storing latest progress value."""
progress: float = 0.0
def update(self, progress: float) -> None:
"""Store progress value for assertions."""
self.progress = progress
@dataclass(slots=True)
class FakeContainer:
"""Minimal container exposing display property."""
display: bool = False
class DummyPlayback:
"""Playback shim exposing only members used by AppProgressMixin."""
def __init__(self) -> None:
"""Initialize playback state and update counters."""
self.is_playing = False
self._status: str | None = None
self._progress: tuple[str, float, float] | None = None
self.saved_calls = 0
def check_status(self):
"""Return configurable status check message."""
return self._status
def get_current_progress(self):
"""Return configurable progress tuple."""
return self._progress
def update_position_if_needed(self) -> None:
"""Record periodic save invocations."""
self.saved_calls += 1
class DummyProgressApp(AppProgressMixin):
"""Mixin host that records action dispatch and widget updates."""
def __init__(self) -> None:
"""Initialize fake widgets and playback state."""
self.playback = DummyPlayback()
self.focused = object()
self.actions: list[str] = []
self.messages: list[str] = []
self.progress_info = FakeStatic()
self.progress_bar = FakeProgressBar()
self.progress_container = FakeContainer()
def action_seek_backward(self) -> None:
"""Record backward seek action dispatch."""
self.actions.append("seek_backward")
def action_toggle_playback(self) -> None:
"""Record toggle playback action dispatch."""
self.actions.append("toggle")
def update_status(self, message: str) -> None:
"""Capture status messages for assertions."""
self.messages.append(message)
def query_one(self, selector: str, _type: object):
"""Return fake widgets by selector used by progress mixin."""
return {
"#progress_info": self.progress_info,
"#progress_bar": self.progress_bar,
"#progress_bar_container": self.progress_container,
}[selector]
def test_on_key_dispatches_seek_when_playing() -> None:
"""Ensure left key is intercepted and dispatched to seek action."""
app = DummyProgressApp()
app.playback.is_playing = True
event = FakeKeyEvent("left")
app.on_key(cast(Key, event))
assert event.prevented is True
assert app.actions == ["seek_backward"]
def test_on_key_dispatches_space_when_table_focused() -> None:
"""Ensure space is intercepted and dispatched when table is focused."""
app = DummyProgressApp()
app.focused = DataTable()
event = FakeKeyEvent("space")
app.on_key(cast(Key, event))
assert event.prevented is True
assert app.actions == ["toggle"]
def test_check_playback_status_hides_progress_after_message() -> None:
"""Ensure playback status message triggers hide-progress behavior."""
app = DummyProgressApp()
app.playback._status = "Finished"
app._check_playback_status()
assert app.messages[-1] == "Finished"
assert app.progress_info.display is False
assert app.progress_container.display is False
def test_update_progress_renders_visible_progress_row() -> None:
"""Ensure valid progress data updates widgets and makes them visible."""
app = DummyProgressApp()
app.playback.is_playing = True
app.playback._progress = ("Chapter", 30.0, 60.0)
app._update_progress()
assert app.progress_bar.progress == 50.0
assert app.progress_info.display is True
assert app.progress_container.display is True

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from auditui.app.progress import AppProgressMixin
class DummyPlayback:
"""Playback stub exposing periodic update method."""
def __init__(self) -> None:
"""Initialize call counter."""
self.saved_calls = 0
def update_position_if_needed(self) -> None:
"""Increment call counter for assertions."""
self.saved_calls += 1
class DummyProgressApp(AppProgressMixin):
"""Minimal app host containing playback dependency only."""
def __init__(self) -> None:
"""Initialize playback stub."""
self.playback = DummyPlayback()
def test_save_position_periodically_delegates_to_playback() -> None:
"""Ensure periodic save method delegates to playback updater."""
app = DummyProgressApp()
app._save_position_periodically()
assert app.playback.saved_calls == 1

View File

@@ -8,22 +8,29 @@ from auditui.library import build_search_text, filter_items
class StubLibrary:
"""Minimal library facade used by search-related app helpers."""
def extract_title(self, item: dict) -> str:
"""Return title from a synthetic item."""
return item.get("title", "")
def extract_authors(self, item: dict) -> str:
"""Return authors from a synthetic item."""
return item.get("authors", "")
@dataclass(slots=True)
class Dummy:
class DummyAuditui:
"""Narrow object compatible with Auditui search-cache helper calls."""
_search_text_cache: dict[int, str] = field(default_factory=dict)
library_client: StubLibrary = field(default_factory=StubLibrary)
def test_get_search_text_is_cached() -> None:
"""Ensure repeated text extraction for one item reuses cache entries."""
item = {"title": "Title", "authors": "Author"}
dummy = Dummy()
dummy = DummyAuditui()
first = Auditui._get_search_text(cast(Auditui, dummy), item)
second = Auditui._get_search_text(cast(Auditui, dummy), item)
assert first == "title author"
@@ -31,7 +38,8 @@ def test_get_search_text_is_cached() -> None:
assert len(dummy._search_text_cache) == 1
def test_filter_items_uses_cache() -> None:
def test_filter_items_uses_cached_callable() -> None:
"""Ensure filter_items cooperates with a memoized search text callback."""
library = StubLibrary()
cache: dict[int, str] = {}
items = [
@@ -40,6 +48,7 @@ def test_filter_items_uses_cache() -> None:
]
def cached(item: dict) -> str:
"""Build and cache normalized search text per object identity."""
cache_key = id(item)
if cache_key not in cache:
cache[cache_key] = build_search_text(item, cast(Any, library))
@@ -49,6 +58,7 @@ def test_filter_items_uses_cache() -> None:
assert result == [items[1]]
def test_build_search_text_without_library() -> None:
def test_build_search_text_without_library_client() -> None:
"""Ensure fallback search text path handles inline author dicts."""
item = {"title": "Title", "authors": [{"name": "A"}, {"name": "B"}]}
assert build_search_text(item, None) == "title a, b"

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
from auditui.app import state as state_mod
class DummyApp:
"""Lightweight app object for state initialization tests."""
def __init__(self) -> None:
"""Expose update_status to satisfy init dependencies."""
self.messages: list[str] = []
def update_status(self, message: str) -> None:
"""Collect status updates for assertions."""
self.messages.append(message)
def test_init_state_without_auth_or_client(monkeypatch) -> None:
"""Ensure baseline state is initialized when no auth/client is provided."""
app = DummyApp()
playback_args: list[tuple[object, object]] = []
class FakePlayback:
"""Playback constructor recorder for init tests."""
def __init__(self, notify, library_client) -> None:
"""Capture arguments passed by init_auditui_state."""
playback_args.append((notify, library_client))
monkeypatch.setattr(state_mod, "PlaybackController", FakePlayback)
state_mod.init_auditui_state(app)
assert app.library_client is None
assert app.download_manager is None
assert app.all_items == []
assert app.current_items == []
assert app.filter_text == ""
assert app.show_all_mode is False
assert playback_args and playback_args[0][1] is None
def test_init_state_with_auth_and_client_builds_dependencies(monkeypatch) -> None:
"""Ensure init constructs library, downloads, and playback dependencies."""
app = DummyApp()
auth = object()
client = object()
class FakeLibraryClient:
"""Fake library client constructor for dependency wiring checks."""
def __init__(self, value) -> None:
"""Store constructor argument for assertions."""
self.value = value
class FakeDownloadManager:
"""Fake download manager constructor for dependency wiring checks."""
def __init__(self, auth_value, client_value) -> None:
"""Store constructor arguments for assertions."""
self.args = (auth_value, client_value)
class FakePlayback:
"""Fake playback constructor for dependency wiring checks."""
def __init__(self, notify, library_client) -> None:
"""Store constructor arguments for assertions."""
self.notify = notify
self.library_client = library_client
monkeypatch.setattr(state_mod, "LibraryClient", FakeLibraryClient)
monkeypatch.setattr(state_mod, "DownloadManager", FakeDownloadManager)
monkeypatch.setattr(state_mod, "PlaybackController", FakePlayback)
state_mod.init_auditui_state(app, auth=auth, client=client)
assert isinstance(app.library_client, FakeLibraryClient)
assert isinstance(app.download_manager, FakeDownloadManager)
assert isinstance(app.playback, FakePlayback)
assert app.library_client.value is client
assert app.download_manager.args == (auth, client)
assert app.playback.library_client.value is client

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from auditui.app.table import AppTableMixin
class DummyTableApp(AppTableMixin):
"""Minimal host exposing library client for row key helper tests."""
def __init__(self) -> None:
"""Initialize a fake library client with ASIN extraction."""
self.library_client = type(
"Library",
(),
{"extract_asin": lambda self, item: item.get("asin")},
)()
def test_build_row_key_prefers_asin_and_remains_unique() -> None:
"""Ensure duplicate ASINs receive deterministic unique key suffixes."""
app = DummyTableApp()
used: set[str] = set()
item = {"asin": "ASIN1"}
first = app._build_row_key(item, "Title", 0, used)
second = app._build_row_key(item, "Title", 1, used)
assert first == "ASIN1"
assert second == "ASIN1#2"
def test_build_row_key_falls_back_to_title_and_index() -> None:
"""Ensure missing ASIN values use title-index fallback keys."""
app = DummyTableApp()
used: set[str] = set()
key = app._build_row_key({"asin": None}, "Unknown Title", 3, used)
assert key == "Unknown Title#3"

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import sys
from pathlib import Path
from types import ModuleType
from typing import Any, cast
ROOT = Path(__file__).resolve().parents[1]
@@ -15,21 +16,26 @@ try:
except ModuleNotFoundError:
audible_stub = ModuleType("audible")
class Authenticator: # minimal stub for type usage
class Authenticator:
"""Minimal audible authenticator test stub."""
pass
class Client: # minimal stub for type usage
class Client:
"""Minimal audible client test stub."""
pass
audible_stub.Authenticator = Authenticator
audible_stub.Client = Client
setattr(cast(Any, audible_stub), "Authenticator", Authenticator)
setattr(cast(Any, audible_stub), "Client", Client)
activation_bytes = ModuleType("audible.activation_bytes")
def get_activation_bytes(_auth: Authenticator | None = None) -> bytes:
"""Return deterministic empty activation bytes for tests."""
return b""
activation_bytes.get_activation_bytes = get_activation_bytes
setattr(cast(Any, activation_bytes), "get_activation_bytes", get_activation_bytes)
sys.modules["audible"] = audible_stub
sys.modules["audible.activation_bytes"] = activation_bytes

View File

@@ -0,0 +1,91 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, cast
import pytest
from auditui.constants import MIN_FILE_SIZE
from auditui.downloads import DownloadManager
def _manager_with_cache_dir(tmp_path: Path) -> DownloadManager:
"""Build a lightweight DownloadManager instance without real HTTP clients."""
manager = DownloadManager.__new__(DownloadManager)
manager.cache_dir = tmp_path
manager.chunk_size = 1024
return manager
def test_sanitize_filename_replaces_invalid_characters() -> None:
"""Ensure filename normalization uses ASCII words and dashes."""
manager = DownloadManager.__new__(DownloadManager)
assert (
manager._sanitize_filename("Stephen King 11/22/63") == "Stephen-King-11-22-63"
)
def test_validate_download_url_accepts_only_http_schemes() -> None:
"""Ensure download URL validation only accepts HTTP and HTTPS links."""
manager = DownloadManager.__new__(DownloadManager)
assert manager._validate_download_url("https://example.com/file") is True
assert manager._validate_download_url("http://example.com/file") is True
assert manager._validate_download_url("ftp://example.com/file") is False
def test_get_cached_path_and_remove_cached(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure cache lookup and cache deletion work for valid files."""
manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
)
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
messages: list[str] = []
assert manager.get_cached_path("ASIN123") == cached_path
assert manager.is_cached("ASIN123") is True
assert manager.remove_cached("ASIN123", notify=messages.append) is True
assert not cached_path.exists()
assert "Removed from cache" in messages[-1]
def test_get_cached_path_ignores_small_files(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure undersized files are not treated as valid cache entries."""
manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
)
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
assert manager.get_cached_path("ASIN123") is None
def test_get_filename_stems_include_author_title_and_legacy_title() -> None:
"""Ensure filename candidates include new author_title and legacy title names."""
manager = DownloadManager.__new__(DownloadManager)
manager.client = cast(
Any,
type(
"Client",
(),
{
"get": lambda self, path, **kwargs: {
"product": {
"title": "11/22/63",
"authors": [{"name": "Stephen King"}],
}
}
},
)(),
)
stems = manager._get_filename_stems_from_asin("B00TEST")
assert stems[0] == "Stephen-King_11-22-63"
assert "11-22-63" in stems

View File

@@ -0,0 +1,160 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, cast
import pytest
from auditui.constants import MIN_FILE_SIZE
from auditui.downloads import DownloadManager
from auditui.downloads import manager as manager_mod
def _bare_manager(tmp_path: Path) -> DownloadManager:
"""Create manager without invoking constructor side effects."""
manager = DownloadManager.__new__(DownloadManager)
manager.cache_dir = tmp_path
manager.chunk_size = 1024
manager.auth = cast(
Any,
type(
"Auth",
(),
{"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()},
)(),
)
return manager
def test_get_activation_bytes_returns_hex(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
"""Ensure activation bytes are converted to lowercase hex string."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager_mod, "get_activation_bytes", lambda _auth: b"\xde\xad")
assert manager.get_activation_bytes() == "dead"
def test_get_activation_bytes_handles_errors(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
"""Ensure activation retrieval failures are handled gracefully."""
manager = _bare_manager(tmp_path)
def _boom(_auth: object) -> bytes:
"""Raise a deterministic failure for exception-path coverage."""
raise OSError("no auth")
monkeypatch.setattr(manager_mod, "get_activation_bytes", _boom)
assert manager.get_activation_bytes() is None
def test_get_or_download_uses_cached_file_when_available(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure cached files bypass link generation and download work."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
cached_path = tmp_path / "Author_Book.aax"
cached_path.write_bytes(b"1" * MIN_FILE_SIZE)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) == cached_path
assert "Using cached file" in messages[0]
def test_get_or_download_reports_invalid_url(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure workflow reports invalid download URLs and aborts."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "ftp://bad"
)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None
assert "Invalid download URL" in messages
def test_get_or_download_handles_download_failure(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure workflow reports failures when stream download does not complete."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "https://ok"
)
monkeypatch.setattr(manager, "_download_file", lambda url, path, notify=None: None)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None
assert "Download failed" in messages
def test_get_or_download_uses_preferred_naming_hints(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure preferred title/author are forwarded to filename stem selection."""
manager = _bare_manager(tmp_path)
captured: list[tuple[str | None, str | None]] = []
def stems(
asin: str,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> list[str]:
"""Capture naming hints and return one deterministic filename stem."""
del asin
captured.append((preferred_title, preferred_author))
return ["Author_Book"]
monkeypatch.setattr(manager, "_get_filename_stems_from_asin", stems)
monkeypatch.setattr(manager, "_get_download_link", lambda asin, notify=None: None)
manager.get_or_download(
"ASIN",
preferred_title="11/22/63",
preferred_author="Stephen King",
)
assert captured == [("11/22/63", "Stephen King")]
def test_get_or_download_retries_when_file_is_too_small(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure small downloads are retried and then reported with exact byte size."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "https://ok"
)
attempts = {"count": 0}
def write_small_file(url: str, path: Path, notify=None) -> Path:
"""Write an undersized file to trigger retry and final failure messages."""
del url, notify
attempts["count"] += 1
path.write_bytes(b"x" * 100)
return path
monkeypatch.setattr(manager, "_download_file", write_small_file)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None
assert attempts["count"] == 2
assert any("retrying" in message for message in messages)
assert any("file too small" in message for message in messages)

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
from dataclasses import dataclass, field
from auditui.library import LibraryClient
@dataclass(slots=True)
class MockClient:
"""Client double that records writes and serves configurable responses."""
put_calls: list[tuple[str, dict]] = field(default_factory=list)
post_calls: list[tuple[str, dict]] = field(default_factory=list)
_post_response: dict = field(default_factory=dict)
raise_on_put: bool = False
def put(self, path: str, body: dict) -> dict:
"""Record put payload or raise when configured."""
if self.raise_on_put:
raise RuntimeError("put failed")
self.put_calls.append((path, body))
return {}
def post(self, path: str, body: dict) -> dict:
"""Record post payload and return configured response."""
self.post_calls.append((path, body))
return self._post_response
def get(self, path: str, **kwargs: dict) -> dict:
"""Return empty data for extractor-focused tests."""
del path, kwargs
return {}
def build_item(
*,
title: str | None = None,
product_title: str | None = None,
authors: list[dict] | None = None,
runtime_min: int | None = None,
listening_status: dict | None = None,
percent_complete: int | float | None = None,
asin: str | None = None,
) -> dict:
"""Construct synthetic library items for extractor and finish tests."""
item: dict = {}
if title is not None:
item["title"] = title
if percent_complete is not None:
item["percent_complete"] = percent_complete
if listening_status is not None:
item["listening_status"] = listening_status
if asin is not None:
item["asin"] = asin
product: dict = {}
if product_title is not None:
product["title"] = product_title
if runtime_min is not None:
product["runtime_length"] = {"min": runtime_min}
if authors is not None:
product["authors"] = authors
if asin is not None:
product["asin"] = asin
if product:
item["product"] = product
if runtime_min is not None:
item["runtime_length_min"] = runtime_min
return item
def test_extract_title_prefers_product_title() -> None:
"""Ensure product title has precedence over outer item title."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
assert (
library.extract_title(build_item(title="Outer", product_title="Inner"))
== "Inner"
)
def test_extract_title_falls_back_to_asin() -> None:
"""Ensure title fallback uses product ASIN when no title exists."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
assert library.extract_title({"product": {"asin": "A1"}}) == "A1"
def test_extract_authors_joins_names() -> None:
"""Ensure author dictionaries are converted to a readable list."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
item = build_item(authors=[{"name": "A"}, {"name": "B"}])
assert library.extract_authors(item) == "A, B"
def test_extract_runtime_minutes_handles_dict_and_number() -> None:
"""Ensure runtime extraction supports dict and numeric payloads."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
assert library.extract_runtime_minutes(build_item(runtime_min=12)) == 12
assert library.extract_runtime_minutes({"runtime_length": 42}) == 42
def test_extract_progress_info_prefers_listening_status_when_needed() -> None:
"""Ensure progress can be sourced from listening_status when top-level is absent."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
item = build_item(listening_status={"percent_complete": 25.0})
assert library.extract_progress_info(item) == 25.0
def test_extract_asin_prefers_item_then_product() -> None:
"""Ensure ASIN extraction works from both item and product fields."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
assert library.extract_asin(build_item(asin="ASIN1")) == "ASIN1"
assert library.extract_asin({"product": {"asin": "ASIN2"}}) == "ASIN2"

View File

@@ -0,0 +1,103 @@
from __future__ import annotations
from dataclasses import dataclass, field
from auditui.library import LibraryClient
@dataclass(slots=True)
class ProgressClient:
"""Client double for position and finished-state API methods."""
get_responses: dict[str, dict] = field(default_factory=dict)
put_calls: list[tuple[str, dict]] = field(default_factory=list)
post_response: dict = field(default_factory=dict)
fail_put: bool = False
def get(self, path: str, **kwargs: object) -> dict:
"""Return preconfigured payloads by API path."""
del kwargs
return self.get_responses.get(path, {})
def put(self, path: str, body: dict) -> dict:
"""Record payloads or raise to exercise error handling."""
if self.fail_put:
raise OSError("write failed")
self.put_calls.append((path, body))
return {}
def post(self, path: str, body: dict) -> dict:
"""Return licenserequest response for ACR extraction."""
del path, body
return self.post_response
def test_is_finished_true_from_percent_complete() -> None:
"""Ensure 100 percent completion is treated as finished."""
library = LibraryClient(ProgressClient()) # type: ignore[arg-type]
assert library.is_finished({"percent_complete": 100}) is True
def test_get_last_position_reads_matching_annotation() -> None:
"""Ensure last position is read in seconds from matching annotation."""
client = ProgressClient(
get_responses={
"1.0/annotations/lastpositions": {
"asin_last_position_heard_annots": [
{"asin": "X", "last_position_heard": {"position_ms": 9000}}
]
}
}
)
library = LibraryClient(client) # type: ignore[arg-type]
assert library.get_last_position("X") == 9.0
def test_get_last_position_returns_none_for_missing_state() -> None:
"""Ensure DoesNotExist status is surfaced as no saved position."""
client = ProgressClient(
get_responses={
"1.0/annotations/lastpositions": {
"asin_last_position_heard_annots": [
{"asin": "X", "last_position_heard": {"status": "DoesNotExist"}}
]
}
}
)
library = LibraryClient(client) # type: ignore[arg-type]
assert library.get_last_position("X") is None
def test_save_last_position_validates_non_positive_values() -> None:
"""Ensure save_last_position short-circuits on non-positive input."""
library = LibraryClient(ProgressClient()) # type: ignore[arg-type]
assert library.save_last_position("A", 0) is False
def test_update_position_writes_version_when_available() -> None:
"""Ensure version is included in payload when metadata provides it."""
client = ProgressClient(
get_responses={
"1.0/content/A/metadata": {
"content_metadata": {
"content_reference": {"acr": "token", "version": "2"}
}
}
}
)
library = LibraryClient(client) # type: ignore[arg-type]
assert library._update_position("A", 5.5) is True
path, body = client.put_calls[0]
assert path == "1.0/lastpositions/A"
assert body["position_ms"] == 5500
assert body["version"] == "2"
def test_mark_as_finished_updates_item_in_place() -> None:
"""Ensure successful finish update mutates local item flags."""
client = ProgressClient(post_response={"content_license": {"acr": "token"}})
library = LibraryClient(client) # type: ignore[arg-type]
item = {"runtime_length_min": 1, "listening_status": {}}
assert library.mark_as_finished("ASIN", item) is True
assert item["is_finished"] is True
assert item["listening_status"]["is_finished"] is True

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from auditui.library import build_search_text, filter_items
class SearchLibrary:
"""Simple search extraction adapter for build_search_text tests."""
def extract_title(self, item: dict) -> str:
"""Return a title value from a synthetic item."""
return item.get("t", "")
def extract_authors(self, item: dict) -> str:
"""Return an author value from a synthetic item."""
return item.get("a", "")
def test_build_search_text_uses_library_client_when_present() -> None:
"""Ensure search text delegates to library extractor methods."""
item = {"t": "The Book", "a": "The Author"}
assert build_search_text(item, SearchLibrary()) == "the book the author"
def test_filter_items_returns_input_when_filter_empty() -> None:
"""Ensure empty filter bypasses per-item search callback evaluation."""
items = [{"k": 1}, {"k": 2}]
assert filter_items(items, "", lambda _item: "ignored") == items
def test_filter_items_matches_case_insensitively() -> None:
"""Ensure search matching is case-insensitive across computed text."""
items = [{"name": "Alpha"}, {"name": "Beta"}]
result = filter_items(items, "BETA", lambda item: item["name"].lower())
assert result == [items[1]]

View File

@@ -0,0 +1,99 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, cast
from auditui.constants import AUTHOR_NAME_MAX_LENGTH
from auditui.library import (
create_progress_sort_key,
create_title_sort_key,
filter_unfinished_items,
format_item_as_row,
truncate_author_name,
)
class StubLibrary:
"""Library facade exposing only helpers needed by table formatting code."""
def extract_title(self, item: dict) -> str:
"""Return synthetic title value."""
return item.get("title", "")
def extract_authors(self, item: dict) -> str:
"""Return synthetic authors value."""
return item.get("authors", "")
def extract_runtime_minutes(self, item: dict) -> int | None:
"""Return synthetic minute duration."""
return item.get("minutes")
def format_duration(
self, value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
"""Render runtime in compact minute format for tests."""
del unit
return default_none if value is None else f"{value}m"
def extract_progress_info(self, item: dict) -> float | None:
"""Return synthetic progress percentage value."""
return item.get("percent")
def extract_asin(self, item: dict) -> str | None:
"""Return synthetic ASIN value."""
return item.get("asin")
def is_finished(self, item: dict) -> bool:
"""Return synthetic finished flag from the item."""
return bool(item.get("finished"))
@dataclass(slots=True)
class StubDownloads:
"""Download cache adapter exposing just is_cached."""
cached: set[str]
def is_cached(self, asin: str) -> bool:
"""Return whether an ASIN is cached."""
return asin in self.cached
def test_create_title_sort_key_normalizes_accents() -> None:
"""Ensure title sorting removes accents before case-fold compare."""
key_fn, _ = create_title_sort_key()
assert key_fn(["Ecole"]) == key_fn(["École"])
def test_create_progress_sort_key_parses_percent_strings() -> None:
"""Ensure progress sorting converts percentages and handles invalid values."""
key_fn, _ = create_progress_sort_key()
assert key_fn(["0", "0", "0", "42.5%", ""]) == 42.5
assert key_fn(["0", "0", "0", "bad", ""]) == 0.0
def test_truncate_author_name_clamps_long_values() -> None:
"""Ensure very long author strings are shortened with ellipsis."""
long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5)
out = truncate_author_name(long_name)
assert out.endswith("...")
assert len(out) <= AUTHOR_NAME_MAX_LENGTH
def test_format_item_as_row_marks_downloaded_titles() -> None:
"""Ensure downloaded ASINs are shown with a checkmark in table rows."""
item = {
"title": "Title",
"authors": "Author",
"minutes": 90,
"percent": 12.34,
"asin": "A1",
}
row = format_item_as_row(item, StubLibrary(), cast(Any, StubDownloads({"A1"})))
assert row == ("Title", "Author", "90m", "12.3%", "âś“")
def test_filter_unfinished_items_keeps_only_incomplete() -> None:
"""Ensure unfinished filter excludes items marked as finished."""
items = [{"id": 1, "finished": False}, {"id": 2, "finished": True}]
assert filter_unfinished_items(items, StubLibrary()) == [items[0]]

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from auditui.playback.chapters import get_current_chapter, get_current_chapter_index
CHAPTERS = [
{"title": "One", "start_time": 0.0, "end_time": 60.0},
{"title": "Two", "start_time": 60.0, "end_time": 120.0},
]
def test_get_current_chapter_handles_empty_chapter_list() -> None:
"""Ensure empty chapter metadata still returns a sensible fallback row."""
assert get_current_chapter(12.0, [], 300.0) == ("Unknown Chapter", 12.0, 300.0)
def test_get_current_chapter_returns_matching_chapter_window() -> None:
"""Ensure chapter selection returns title and chapter-relative timing."""
assert get_current_chapter(75.0, CHAPTERS, 120.0) == ("Two", 15.0, 60.0)
def test_get_current_chapter_falls_back_to_last_chapter() -> None:
"""Ensure elapsed values past known ranges map to last chapter."""
assert get_current_chapter(150.0, CHAPTERS, 200.0) == ("Two", 90.0, 60.0)
def test_get_current_chapter_index_returns_none_without_chapters() -> None:
"""Ensure chapter index lookup returns None when no chapters exist."""
assert get_current_chapter_index(10.0, []) is None
def test_get_current_chapter_index_returns_last_when_past_end() -> None:
"""Ensure chapter index lookup falls back to the final chapter index."""
assert get_current_chapter_index(200.0, CHAPTERS) == 1

View File

@@ -0,0 +1,129 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, cast
from auditui.playback import controller_lifecycle as lifecycle_mod
from auditui.playback.controller import PlaybackController
class Proc:
"""Process shim used for lifecycle tests."""
def __init__(self, poll_value=None) -> None:
"""Set initial poll result."""
self._poll_value = poll_value
def poll(self):
"""Return process running status."""
return self._poll_value
def _controller() -> tuple[PlaybackController, list[str]]:
"""Build controller and message capture list."""
messages: list[str] = []
return PlaybackController(messages.append, None), messages
def test_start_reports_missing_ffplay(monkeypatch) -> None:
"""Ensure start fails fast when ffplay is unavailable."""
controller, messages = _controller()
monkeypatch.setattr(lifecycle_mod.process_mod, "is_ffplay_available", lambda: False)
assert controller.start(Path("book.aax")) is False
assert messages[-1] == "ffplay not found. Please install ffmpeg"
def test_start_sets_state_on_success(monkeypatch) -> None:
"""Ensure successful start initializes playback state and metadata."""
controller, messages = _controller()
monkeypatch.setattr(lifecycle_mod.process_mod, "is_ffplay_available", lambda: True)
monkeypatch.setattr(
lifecycle_mod.process_mod, "build_ffplay_cmd", lambda *args: ["ffplay"]
)
monkeypatch.setattr(
lifecycle_mod.process_mod, "run_ffplay", lambda cmd: (Proc(None), None)
)
monkeypatch.setattr(
lifecycle_mod,
"load_media_info",
lambda path, activation: (600.0, [{"title": "ch"}]),
)
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 100.0)
ok = controller.start(
Path("book.aax"), activation_hex="abcd", start_position=10.0, speed=1.2
)
assert ok is True
assert controller.is_playing is True
assert controller.current_file_path == Path("book.aax")
assert controller.total_duration == 600.0
assert messages[-1] == "Playing: book.aax"
def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
"""Ensure prepare flow resumes from saved position when available."""
messages: list[str] = []
lib = type("Lib", (), {"get_last_position": lambda self, asin: 75.0})()
controller = PlaybackController(messages.append, cast(Any, lib))
started: list[tuple] = []
class DM:
"""Download manager shim returning path and activation token."""
def get_or_download(
self,
asin,
notify,
preferred_title: str | None = None,
preferred_author: str | None = None,
):
"""Return deterministic downloaded file path."""
del asin, notify, preferred_title, preferred_author
return Path("book.aax")
def get_activation_bytes(self):
"""Return deterministic activation token."""
return "abcd"
monkeypatch.setattr(controller, "start", lambda *args: started.append(args) or True)
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 200.0)
assert controller.prepare_and_start(cast(Any, DM()), "ASIN") is True
assert started and started[0][3] == 75.0
assert "Resuming from 01:15" in messages
def test_toggle_playback_uses_pause_and_resume_paths(monkeypatch) -> None:
"""Ensure toggle dispatches pause or resume based on paused flag."""
controller, _ = _controller()
controller.is_playing = True
controller.playback_process = cast(Any, Proc(None))
called: list[str] = []
monkeypatch.setattr(controller, "pause", lambda: called.append("pause"))
monkeypatch.setattr(controller, "resume", lambda: called.append("resume"))
controller.is_paused = False
assert controller.toggle_playback() is True
controller.is_paused = True
assert controller.toggle_playback() is True
assert called == ["pause", "resume"]
def test_restart_at_position_restores_state_and_notifies(monkeypatch) -> None:
"""Ensure restart logic preserves metadata and emits custom message."""
controller, messages = _controller()
controller.is_playing = True
controller.is_paused = True
controller.current_file_path = Path("book.aax")
controller.current_asin = "ASIN"
controller.activation_hex = "abcd"
controller.total_duration = 400.0
controller.chapters = [{"title": "One"}]
controller.playback_speed = 1.0
monkeypatch.setattr(controller, "_stop_process", lambda: None)
monkeypatch.setattr(lifecycle_mod.time, "sleep", lambda _s: None)
monkeypatch.setattr(controller, "start", lambda *args: True)
paused: list[str] = []
monkeypatch.setattr(controller, "pause", lambda: paused.append("pause"))
assert controller._restart_at_position(120.0, message="Jumped") is True
assert controller.current_asin == "ASIN"
assert controller.chapters == [{"title": "One"}]
assert paused == ["pause"]
assert messages[-1] == "Jumped"

View File

@@ -0,0 +1,100 @@
from __future__ import annotations
from auditui.playback import controller_seek_speed as seek_speed_mod
from auditui.playback.controller import PlaybackController
def _controller() -> tuple[PlaybackController, list[str]]:
"""Build controller and in-memory notification sink."""
messages: list[str] = []
return PlaybackController(messages.append, None), messages
def test_seek_notifies_when_target_invalid(monkeypatch) -> None:
"""Ensure seek reports end-of-file condition when target is invalid."""
controller, messages = _controller()
monkeypatch.setattr(controller, "_get_current_elapsed", lambda: 20.0)
controller.seek_offset = 100.0
controller.total_duration = 120.0
monkeypatch.setattr(
seek_speed_mod.seek_mod, "compute_seek_target", lambda *args: None
)
assert controller._seek(30.0, "forward") is False
assert messages[-1] == "Already at end of file"
def test_seek_to_chapter_reports_bounds(monkeypatch) -> None:
"""Ensure chapter seek reports first and last chapter boundaries."""
controller, messages = _controller()
controller.is_playing = True
controller.current_file_path = object()
controller.chapters = [{"title": "One", "start_time": 0.0, "end_time": 10.0}]
monkeypatch.setattr(controller, "_get_current_elapsed", lambda: 1.0)
monkeypatch.setattr(
seek_speed_mod.chapters_mod,
"get_current_chapter_index",
lambda elapsed, chapters: 0,
)
assert controller.seek_to_chapter("next") is False
assert messages[-1] == "Already at last chapter"
assert controller.seek_to_chapter("previous") is False
assert messages[-1] == "Already at first chapter"
def test_save_current_position_writes_positive_values() -> None:
"""Ensure save_current_position persists elapsed time via library client."""
calls: list[tuple[str, float]] = []
library = type(
"Library",
(),
{"save_last_position": lambda self, asin, pos: calls.append((asin, pos))},
)()
controller = PlaybackController(lambda _msg: None, library)
controller.current_asin = "ASIN"
controller.is_playing = True
controller.playback_start_time = 1.0
controller.seek_offset = 10.0
controller._get_current_elapsed = lambda: 15.0
controller._save_current_position()
assert calls == [("ASIN", 25.0)]
def test_update_position_if_needed_honors_interval(monkeypatch) -> None:
"""Ensure periodic save runs only when interval has elapsed."""
controller, _ = _controller()
controller.is_playing = True
controller.current_asin = "ASIN"
controller.library_client = object()
controller.last_save_time = 10.0
controller.position_save_interval = 30.0
saves: list[str] = []
monkeypatch.setattr(
controller, "_save_current_position", lambda: saves.append("save")
)
monkeypatch.setattr(seek_speed_mod.time, "time", lambda: 20.0)
controller.update_position_if_needed()
monkeypatch.setattr(seek_speed_mod.time, "time", lambda: 45.0)
controller.update_position_if_needed()
assert saves == ["save"]
def test_change_speed_restarts_with_new_rate(monkeypatch) -> None:
"""Ensure speed changes restart playback at current position."""
controller, _ = _controller()
controller.playback_speed = 1.0
controller.seek_offset = 5.0
monkeypatch.setattr(controller, "_get_current_elapsed", lambda: 10.0)
seen: list[tuple[float, float, str]] = []
def fake_restart(
position: float, speed: float | None = None, message: str | None = None
) -> bool:
"""Capture restart call parameters."""
seen.append((position, speed or 0.0, message or ""))
return True
monkeypatch.setattr(controller, "_restart_at_position", fake_restart)
assert controller.increase_speed() is True
assert seen and seen[0][0] == 15.0
assert seen[0][1] > 1.0
assert seen[0][2].startswith("Speed: ")

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, cast
from auditui.playback import controller_state as state_mod
from auditui.playback.controller import PlaybackController
class Proc:
"""Simple process shim exposing poll and pid for state tests."""
def __init__(self, poll_value=None) -> None:
"""Store poll return value and fake pid."""
self._poll_value = poll_value
self.pid = 123
def poll(self):
"""Return configured process status code or None."""
return self._poll_value
def _controller() -> tuple[PlaybackController, list[str]]:
"""Build playback controller and collected notifications list."""
messages: list[str] = []
return PlaybackController(messages.append, None), messages
def test_get_current_elapsed_rolls_pause_into_duration(monkeypatch) -> None:
"""Ensure elapsed helper absorbs stale pause_start_time when resumed."""
controller, _ = _controller()
controller.pause_start_time = 100.0
controller.is_paused = False
monkeypatch.setattr(state_mod.time, "time", lambda: 120.0)
monkeypatch.setattr(state_mod.elapsed_mod, "get_elapsed", lambda *args: 50.0)
assert controller._get_current_elapsed() == 50.0
assert controller.paused_duration == 20.0
assert controller.pause_start_time is None
def test_validate_playback_state_stops_when_process_ended() -> None:
"""Ensure state validation stops and reports when process is gone."""
controller, messages = _controller()
controller.playback_process = cast(Any, Proc(poll_value=1))
controller.is_playing = True
controller.current_file_path = Path("book.aax")
ok = controller._validate_playback_state(require_paused=False)
assert ok is False
assert messages[-1] == "Playback process has ended"
def test_send_signal_sets_paused_state_and_notifies(monkeypatch) -> None:
"""Ensure SIGSTOP updates paused state and includes filename in status."""
controller, messages = _controller()
controller.playback_process = cast(Any, Proc())
controller.current_file_path = Path("book.aax")
monkeypatch.setattr(state_mod.process_mod, "send_signal", lambda proc, sig: None)
controller._send_signal(state_mod.signal.SIGSTOP, "Paused", "pause")
assert controller.is_paused is True
assert messages[-1] == "Paused: book.aax"
def test_send_signal_handles_process_lookup(monkeypatch) -> None:
"""Ensure missing process lookup errors are handled with user-facing message."""
controller, messages = _controller()
controller.playback_process = cast(Any, Proc())
def raise_lookup(proc, sig):
"""Raise process lookup error to exercise exception path."""
del proc, sig
raise ProcessLookupError("gone")
monkeypatch.setattr(state_mod.process_mod, "send_signal", raise_lookup)
monkeypatch.setattr(state_mod.process_mod, "terminate_process", lambda proc: None)
controller._send_signal(state_mod.signal.SIGCONT, "Playing", "resume")
assert messages[-1] == "Process no longer exists"

View File

@@ -0,0 +1,21 @@
from __future__ import annotations
from auditui.playback.elapsed import get_elapsed
from auditui.playback import elapsed as elapsed_mod
def test_get_elapsed_returns_zero_without_start_time() -> None:
"""Ensure elapsed computation returns zero when playback has not started."""
assert get_elapsed(None, None, 0.0, False) == 0.0
def test_get_elapsed_while_paused_uses_pause_start(monkeypatch) -> None:
"""Ensure paused elapsed is fixed at pause_start minus previous pauses."""
monkeypatch.setattr(elapsed_mod.time, "time", lambda: 500.0)
assert get_elapsed(100.0, 250.0, 20.0, True) == 130.0
def test_get_elapsed_subtracts_pause_duration_when_resumed(monkeypatch) -> None:
"""Ensure resumed elapsed removes newly accumulated paused duration."""
monkeypatch.setattr(elapsed_mod.time, "time", lambda: 400.0)
assert get_elapsed(100.0, 300.0, 10.0, False) == 190.0

View File

@@ -0,0 +1,67 @@
from __future__ import annotations
import subprocess
from pathlib import Path
from auditui.playback import process as process_mod
class DummyProc:
"""Minimal subprocess-like object for terminate_process tests."""
def __init__(self, alive: bool = True) -> None:
"""Initialize process state and bookkeeping flags."""
self._alive = alive
self.terminated = False
self.killed = False
self.pid = 123
def poll(self) -> int | None:
"""Return None while process is alive and 0 when stopped."""
return None if self._alive else 0
def terminate(self) -> None:
"""Mark process as terminated and no longer alive."""
self.terminated = True
self._alive = False
def wait(self, timeout: float | None = None) -> int:
"""Return immediately to emulate a cooperative shutdown."""
del timeout
return 0
def kill(self) -> None:
"""Mark process as killed and no longer alive."""
self.killed = True
self._alive = False
def test_build_ffplay_cmd_includes_activation_seek_and_speed() -> None:
"""Ensure ffplay command includes optional playback arguments when set."""
cmd = process_mod.build_ffplay_cmd(Path("book.aax"), "abcd", 12.5, 1.2)
assert "-activation_bytes" in cmd
assert "-ss" in cmd
assert "atempo=1.20" in " ".join(cmd)
def test_terminate_process_handles_alive_process() -> None:
"""Ensure terminate_process gracefully shuts down a running process."""
proc = DummyProc(alive=True)
process_mod.terminate_process(proc) # type: ignore[arg-type]
assert proc.terminated is True
def test_run_ffplay_returns_none_when_unavailable(monkeypatch) -> None:
"""Ensure ffplay launch exits early when binary is not on PATH."""
monkeypatch.setattr(process_mod, "is_ffplay_available", lambda: False)
assert process_mod.run_ffplay(["ffplay", "book.aax"]) == (None, None)
def test_send_signal_delegates_to_os_kill(monkeypatch) -> None:
"""Ensure send_signal forwards process PID and signal to os.kill."""
seen: list[tuple[int, object]] = []
monkeypatch.setattr(
process_mod.os, "kill", lambda pid, sig: seen.append((pid, sig))
)
process_mod.send_signal(DummyProc(), process_mod.signal.SIGSTOP) # type: ignore[arg-type]
assert seen and seen[0][0] == 123

View File

@@ -0,0 +1,20 @@
from __future__ import annotations
from auditui.playback.seek import compute_seek_target
def test_forward_seek_returns_new_position_and_message() -> None:
"""Ensure forward seek computes expected position and status message."""
target = compute_seek_target(10.0, 100.0, 30.0, "forward")
assert target == (40.0, "Skipped forward 30s")
def test_forward_seek_returns_none_near_end() -> None:
"""Ensure seeking too close to end returns an invalid seek result."""
assert compute_seek_target(95.0, 100.0, 10.0, "forward") is None
def test_backward_seek_clamps_to_zero() -> None:
"""Ensure backward seek cannot go below zero."""
target = compute_seek_target(5.0, None, 30.0, "backward")
assert target == (0.0, "Skipped backward 30s")

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from auditui.stats.account import (
get_account_info,
get_country,
get_subscription_details,
)
class AccountClient:
"""Minimal API client returning endpoint-specific account responses."""
def __init__(self, responses: dict[str, dict]) -> None:
"""Store endpoint response map for deterministic tests."""
self._responses = responses
def get(self, path: str, **kwargs: object) -> dict:
"""Return configured response and ignore query parameters."""
del kwargs
return self._responses.get(path, {})
def test_get_account_info_merges_multiple_endpoints() -> None:
"""Ensure account info aggregator combines endpoint payload dictionaries."""
client = AccountClient(
{
"1.0/account/information": {"a": 1},
"1.0/customer/information": {"b": 2},
"1.0/customer/status": {"c": 3},
}
)
assert get_account_info(client) == {"a": 1, "b": 2, "c": 3}
def test_get_subscription_details_uses_known_nested_paths() -> None:
"""Ensure first valid subscription_details list entry is returned."""
info = {
"customer_details": {
"subscription": {"subscription_details": [{"name": "Plan"}]}
}
}
assert get_subscription_details(info) == {"name": "Plan"}
def test_get_country_supports_locale_variants() -> None:
"""Ensure country extraction supports object, domain, and locale string forms."""
auth_country_code = type(
"Auth", (), {"locale": type("Loc", (), {"country_code": "us"})()}
)()
auth_domain = type("Auth", (), {"locale": type("Loc", (), {"domain": "fr"})()})()
auth_string = type("Auth", (), {"locale": "en_gb"})()
assert get_country(auth_country_code) == "US"
assert get_country(auth_domain) == "FR"
assert get_country(auth_string) == "GB"

View File

@@ -0,0 +1,67 @@
from __future__ import annotations
from datetime import date
from auditui.stats.aggregator import StatsAggregator
from auditui.stats import aggregator as aggregator_mod
def test_get_stats_returns_empty_without_client() -> None:
"""Ensure stats aggregation short-circuits when API client is absent."""
aggregator = StatsAggregator(
client=None, auth=None, library_client=None, all_items=[]
)
assert aggregator.get_stats() == []
def test_get_stats_builds_expected_rows(monkeypatch) -> None:
"""Ensure aggregator assembles rows from listening, account, and email sources."""
monkeypatch.setattr(
aggregator_mod.listening_mod, "get_signup_year", lambda _client: 2015
)
monkeypatch.setattr(
aggregator_mod.listening_mod,
"get_listening_time",
lambda _client, duration, start_date: 120_000 if duration == 1 else 3_600_000,
)
monkeypatch.setattr(
aggregator_mod.listening_mod, "get_finished_books_count", lambda _lc, _items: 7
)
monkeypatch.setattr(
aggregator_mod.email_mod,
"resolve_email",
lambda *args, **kwargs: "user@example.com",
)
monkeypatch.setattr(aggregator_mod.account_mod, "get_country", lambda _auth: "US")
monkeypatch.setattr(
aggregator_mod.account_mod,
"get_account_info",
lambda _client: {
"subscription_details": [
{
"name": "Premium",
"next_bill_date": "2026-02-01T00:00:00Z",
"next_bill_amount": {
"currency_value": "14.95",
"currency_code": "USD",
},
}
]
},
)
aggregator = StatsAggregator(
client=object(),
auth=object(),
library_client=object(),
all_items=[{}, {}, {}],
)
stats = dict(aggregator.get_stats(today=date(2026, 2, 1)))
assert stats["Email"] == "user@example.com"
assert stats["Country Store"] == "US"
assert stats["Signup Year"] == "2015"
assert stats["Subscription"] == "Premium"
assert stats["Price"] == "14.95 USD"
assert stats["This Month"] == "2m"
assert stats["This Year"] == "1h00"
assert stats["Books Finished"] == "7 / 3"

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
import json
from pathlib import Path
from auditui.stats.email import (
find_email_in_data,
first_email,
get_email_from_account_info,
get_email_from_auth,
get_email_from_auth_file,
get_email_from_config,
resolve_email,
)
def test_find_email_in_nested_data() -> None:
"""Ensure nested structures are scanned until a plausible email is found."""
data = {"a": {"b": ["nope", "user@example.com"]}}
assert find_email_in_data(data) == "user@example.com"
def test_first_email_skips_unknown_and_none() -> None:
"""Ensure first_email ignores empty and Unknown sentinel values."""
assert first_email(None, "Unknown", "ok@example.com") == "ok@example.com"
def test_get_email_from_config_and_auth_file(tmp_path: Path) -> None:
"""Ensure config and auth-file readers extract valid email fields."""
config_path = tmp_path / "config.json"
auth_path = tmp_path / "auth.json"
config_path.write_text(
json.dumps({"email": "config@example.com"}), encoding="utf-8"
)
auth_path.write_text(json.dumps({"email": "auth@example.com"}), encoding="utf-8")
assert get_email_from_config(config_path) == "config@example.com"
assert get_email_from_auth_file(auth_path) == "auth@example.com"
def test_get_email_from_auth_prefers_username() -> None:
"""Ensure auth object attributes are checked in expected precedence order."""
auth = type(
"Auth", (), {"username": "user@example.com", "login": None, "email": None}
)()
assert get_email_from_auth(auth) == "user@example.com"
def test_get_email_from_account_info_supports_nested_customer_info() -> None:
"""Ensure account email can be discovered in nested customer_info payload."""
info = {"customer_info": {"primary_email": "nested@example.com"}}
assert get_email_from_account_info(info) == "nested@example.com"
def test_resolve_email_falls_back_to_account_getter(tmp_path: Path) -> None:
"""Ensure resolve_email checks account-info callback when local sources miss."""
auth = object()
value = resolve_email(
auth,
client=object(),
config_path=tmp_path / "missing-config.json",
auth_path=tmp_path / "missing-auth.json",
get_account_info=lambda: {"customer_email": "account@example.com"},
)
assert value == "account@example.com"

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
from auditui.stats.format import format_date, format_time
def test_format_time_handles_minutes_and_hours() -> None:
"""Ensure format_time outputs minute-only and hour-minute formats."""
assert format_time(90_000) == "1m"
assert format_time(3_660_000) == "1h01"
def test_format_date_handles_iso_and_invalid_values() -> None:
"""Ensure format_date normalizes ISO timestamps and preserves invalid input."""
assert format_date("2026-01-15T10:20:30Z") == "2026-01-15"
assert format_date("not-a-date") == "not-a-date"
assert format_date(None) == "Unknown"

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
from auditui.stats.listening import (
get_finished_books_count,
get_listening_time,
get_signup_year,
has_activity,
)
class StatsClient:
"""Client double for monthly aggregate lookups keyed by start date."""
def __init__(self, sums_by_start_date: dict[str, list[int]]) -> None:
"""Store aggregate sums grouped by monthly_listening_interval_start_date."""
self._sums = sums_by_start_date
def get(self, path: str, **kwargs: str) -> dict:
"""Return aggregate payload based on requested interval start date."""
del path
start_date = kwargs["monthly_listening_interval_start_date"]
sums = self._sums.get(start_date, [0])
return {
"aggregated_monthly_listening_stats": [{"aggregated_sum": s} for s in sums]
}
def test_has_activity_detects_non_zero_months() -> None:
"""Ensure activity helper returns true when any month has positive sum."""
assert (
has_activity(
{
"aggregated_monthly_listening_stats": [
{"aggregated_sum": 0},
{"aggregated_sum": 1},
]
}
)
is True
)
def test_get_listening_time_sums_aggregated_months() -> None:
"""Ensure monthly aggregate sums are added into one listening total."""
client = StatsClient({"2026-01": [1000, 2000, 3000]})
assert get_listening_time(client, duration=1, start_date="2026-01") == 6000
def test_get_signup_year_returns_earliest_year_with_activity() -> None:
"""Ensure signup year search finds first active year via binary search."""
client = StatsClient(
{"2026-01": [1], "2010-01": [1], "2002-01": [1], "2001-01": [0]}
)
year = get_signup_year(client)
assert year <= 2010
def test_get_finished_books_count_uses_library_is_finished() -> None:
"""Ensure finished books count delegates to library client predicate."""
library_client = type(
"Library", (), {"is_finished": lambda self, item: item.get("done", False)}
)()
items = [{"done": True}, {"done": False}, {"done": True}]
assert get_finished_books_count(library_client, items) == 2

View File

@@ -1,48 +0,0 @@
from pathlib import Path
import pytest
from auditui.downloads import DownloadManager
from auditui.constants import MIN_FILE_SIZE
def test_sanitize_filename() -> None:
dm = DownloadManager.__new__(DownloadManager)
assert dm._sanitize_filename('a<>:"/\\|?*b') == "a_________b"
def test_validate_download_url() -> None:
dm = DownloadManager.__new__(DownloadManager)
assert dm._validate_download_url("https://example.com/file") is True
assert dm._validate_download_url("http://example.com/file") is True
assert dm._validate_download_url("ftp://example.com/file") is False
def test_cached_path_and_remove(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
dm = DownloadManager.__new__(DownloadManager)
dm.cache_dir = tmp_path
monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book")
safe_name = dm._sanitize_filename("My Book")
cached_path = tmp_path / f"{safe_name}.aax"
cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
assert dm.get_cached_path("ASIN123") == cached_path
assert dm.is_cached("ASIN123") is True
messages: list[str] = []
assert dm.remove_cached("ASIN123", notify=messages.append) is True
assert not cached_path.exists()
assert messages and "Removed from cache" in messages[-1]
def test_cached_path_ignores_small_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
dm = DownloadManager.__new__(DownloadManager)
dm.cache_dir = tmp_path
monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book")
safe_name = dm._sanitize_filename("My Book")
cached_path = tmp_path / f"{safe_name}.aax"
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
assert dm.get_cached_path("ASIN123") is None

View File

@@ -1,131 +0,0 @@
from dataclasses import dataclass, field
from auditui.library import LibraryClient
@dataclass(slots=True)
class MockClient:
put_calls: list[tuple[str, dict]] = field(default_factory=list)
post_calls: list[tuple[str, dict]] = field(default_factory=list)
_post_response: dict = field(default_factory=dict)
raise_on_put: bool = False
def put(self, path: str, body: dict) -> dict:
if self.raise_on_put:
raise RuntimeError("put failed")
self.put_calls.append((path, body))
return {}
def post(self, path: str, body: dict) -> dict:
self.post_calls.append((path, body))
return self._post_response
def get(self, path: str, **kwargs: dict) -> dict:
return {}
def test_extract_title_prefers_product() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(title="Outer", product_title="Inner")
assert library.extract_title(item) == "Inner"
def test_extract_authors_joins_names() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(authors=[{"name": "A"}, {"name": "B"}])
assert library.extract_authors(item) == "A, B"
def test_extract_runtime_minutes_from_dict() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(runtime_min=12)
assert library.extract_runtime_minutes(item) == 12
def test_extract_progress_info_from_listening_status() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(listening_status={"percent_complete": 25.0})
assert library.extract_progress_info(item) == 25.0
def test_is_finished_with_percent_complete() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(percent_complete=100)
assert library.is_finished(item)
def test_format_duration_and_time() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
assert library.format_duration(61) == "1h01"
assert library.format_time(3661) == "01:01:01"
def test_mark_as_finished_success_updates_item() -> None:
client = MockClient()
client._post_response = {"content_license": {"acr": "token"}}
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(runtime_min=1, listening_status={})
ok = library.mark_as_finished("ASIN", item)
assert ok
assert client.put_calls
path, body = client.put_calls[0]
assert path == "1.0/lastpositions/ASIN"
assert body["acr"] == "token"
assert body["position_ms"] == 60_000
assert item["is_finished"] is True
assert item["listening_status"]["is_finished"] is True
def test_mark_as_finished_fails_without_acr() -> None:
client = MockClient()
client._post_response = {}
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(runtime_min=1)
ok = library.mark_as_finished("ASIN", item)
assert ok is False
def test_mark_as_finished_handles_put_error() -> None:
client = MockClient()
client._post_response = {"content_license": {"acr": "token"}}
client.raise_on_put = True
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(runtime_min=1)
ok = library.mark_as_finished("ASIN", item)
assert ok is False
def build_item(
*,
title: str | None = None,
product_title: str | None = None,
authors: list[dict] | None = None,
runtime_min: int | None = None,
listening_status: dict | None = None,
percent_complete: int | float | None = None,
) -> dict:
item: dict = {}
if title is not None:
item["title"] = title
if percent_complete is not None:
item["percent_complete"] = percent_complete
if listening_status is not None:
item["listening_status"] = listening_status
product: dict = {}
if product_title is not None:
product["title"] = product_title
if runtime_min is not None:
product["runtime_length"] = {"min": runtime_min}
if authors is not None:
product["authors"] = authors
if product:
item["product"] = product
if runtime_min is not None and "runtime_length_min" not in item:
item["runtime_length_min"] = runtime_min
return item

View File

@@ -1,89 +0,0 @@
from dataclasses import dataclass
from typing import Any, cast
from auditui.constants import AUTHOR_NAME_MAX_LENGTH
from auditui.library import (
create_progress_sort_key,
create_title_sort_key,
format_item_as_row,
truncate_author_name,
)
class StubLibrary:
def extract_title(self, item: dict) -> str:
return item.get("title", "")
def extract_authors(self, item: dict) -> str:
return item.get("authors", "")
def extract_runtime_minutes(self, item: dict) -> int | None:
return item.get("minutes")
def format_duration(
self, value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
if value is None:
return default_none
return f"{value}m"
def extract_progress_info(self, item: dict) -> float | None:
return item.get("percent")
def extract_asin(self, item: dict) -> str | None:
return item.get("asin")
@dataclass(slots=True)
class StubDownloads:
_cached: set[str]
def is_cached(self, asin: str) -> bool:
return asin in self._cached
def test_create_title_sort_key_normalizes_accents() -> None:
key_fn, _ = create_title_sort_key()
assert key_fn(["École"]) == "ecole"
assert key_fn(["Zoo"]) == "zoo"
def test_create_progress_sort_key_parses_percent() -> None:
key_fn, _ = create_progress_sort_key()
assert key_fn(["0", "0", "0", "42.5%"]) == 42.5
assert key_fn(["0", "0", "0", "bad"]) == 0.0
def test_truncate_author_name() -> None:
long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5)
truncated = truncate_author_name(long_name)
assert truncated.endswith("...")
assert len(truncated) <= AUTHOR_NAME_MAX_LENGTH
def test_format_item_as_row_with_downloaded() -> None:
library = StubLibrary()
downloads = StubDownloads({"ASIN123"})
item = {
"title": "Title",
"authors": "Author One",
"minutes": 90,
"percent": 12.34,
"asin": "ASIN123",
}
title, author, runtime, progress, downloaded = format_item_as_row(
item, library, cast(Any, downloads)
)
assert title == "Title"
assert author == "Author One"
assert runtime == "90m"
assert progress == "12.3%"
assert downloaded == "âś“"
def test_format_item_as_row_zero_progress() -> None:
library = StubLibrary()
item = {"title": "Title", "authors": "Author",
"minutes": 30, "percent": 0.0}
_, _, _, progress, _ = format_item_as_row(item, library, None)
assert progress == "0%"

View File

@@ -1,35 +0,0 @@
import json
from pathlib import Path
from auditui.stats.email import (
find_email_in_data,
get_email_from_auth,
get_email_from_auth_file,
get_email_from_config,
)
def test_find_email_in_data() -> None:
data = {"a": {"b": ["nope", "user@example.com"]}}
assert find_email_in_data(data) == "user@example.com"
def test_get_email_from_config(tmp_path: Path) -> None:
config_path = tmp_path / "config.json"
config_path.write_text(json.dumps({"email": "config@example.com"}))
assert get_email_from_config(config_path) == "config@example.com"
def test_get_email_from_auth_file(tmp_path: Path) -> None:
auth_path = tmp_path / "auth.json"
auth_path.write_text(json.dumps({"email": "auth@example.com"}))
assert get_email_from_auth_file(auth_path) == "auth@example.com"
def test_get_email_from_auth() -> None:
class Auth:
username = "user@example.com"
login = None
email = None
assert get_email_from_auth(Auth()) == "user@example.com"

View File

@@ -9,40 +9,54 @@ from textual.widgets import Input
@dataclass(slots=True)
class DummyEvent:
"""Minimal event object carrying an input value for tests."""
value: str
@dataclass(slots=True)
class FakeTimer:
"""Timer substitute recording whether stop() was called."""
callback: Callable[[], None]
stopped: bool = False
def stop(self) -> None:
"""Mark timer as stopped."""
self.stopped = True
def test_filter_debounce_uses_latest_value(monkeypatch) -> None:
"""Ensure debounce cancels previous timer and emits latest input value."""
seen: list[str] = []
timers: list[FakeTimer] = []
def on_change(value: str) -> None:
"""Capture emitted filter values."""
seen.append(value)
screen = FilterScreen(on_change=on_change, debounce_seconds=0.2)
def fake_set_timer(_delay: float, callback):
def fake_set_timer(_delay: float, callback: Callable[[], None]) -> FakeTimer:
"""Record timer callbacks instead of scheduling real timers."""
timer = FakeTimer(callback)
timers.append(timer)
return timer
monkeypatch.setattr(screen, "set_timer", fake_set_timer)
screen.on_input_changed(cast(Input.Changed, DummyEvent("a")))
screen.on_input_changed(cast(Input.Changed, DummyEvent("ab")))
assert len(timers) == 2
assert timers[0].stopped is True
assert timers[1].stopped is False
timers[1].callback()
assert seen == ["ab"]
def test_on_unmount_stops_pending_timer() -> None:
"""Ensure screen unmount stops pending debounce timer when present."""
screen = FilterScreen(on_change=lambda _value: None)
timer = FakeTimer(lambda: None)
screen._debounce_timer = timer
screen.on_unmount()
assert timer.stopped is True

2
uv.lock generated
View File

@@ -35,7 +35,7 @@ wheels = [
[[package]]
name = "auditui"
version = "0.1.6"
version = "0.2.0"
source = { editable = "." }
dependencies = [
{ name = "audible" },