Compare commits

...

10 Commits

9 changed files with 318 additions and 75 deletions

View File

@@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- massive code refactoring
- complete test suite revamp
- updated download cache naming to use `Author_Title` format with normalized separators.
### Fixed
- reused library metadata for download filename generation to avoid `Unknown-Author_Unknown-Title` when title/author are already known in the UI.
## [0.1.6] - 2026-02-16

View File

@@ -10,11 +10,8 @@ from ..ui import FilterScreen, HelpScreen, StatsScreen
class AppActionsMixin:
def _get_selected_asin(self) -> str | None:
if not self.download_manager:
self.update_status(
"Not authenticated. Please restart and authenticate.")
return None
def _get_selected_item(self) -> dict | None:
"""Return the currently selected library item from the table."""
table = self.query_one("#library_table", DataTable)
if table.row_count == 0:
self.update_status("No books available")
@@ -23,10 +20,27 @@ class AppActionsMixin:
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return None
return self.current_items[cursor_row]
def _get_naming_hints(self, item: dict | None) -> tuple[str | None, str | None]:
"""Return preferred title and author values used for download filenames."""
if not item or not self.library_client:
return (None, None)
return (
self.library_client.extract_title(item),
self.library_client.extract_authors(item),
)
def _get_selected_asin(self) -> str | None:
if not self.download_manager:
self.update_status("Not authenticated. Please restart and authenticate.")
return None
if not self.library_client:
self.update_status("Library client not available")
return None
selected_item = self.current_items[cursor_row]
selected_item = self._get_selected_item()
if not selected_item:
return None
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
@@ -36,7 +50,7 @@ class AppActionsMixin:
def action_play_selected(self) -> None:
asin = self._get_selected_asin()
if asin:
self._start_playback_async(asin)
self._start_playback_async(asin, self._get_selected_item())
def action_toggle_playback(self) -> None:
if not self.playback.toggle_playback():
@@ -86,8 +100,7 @@ class AppActionsMixin:
return
if self.library_client.is_finished(selected_item):
self.call_from_thread(self.update_status,
"Already marked as finished")
self.call_from_thread(self.update_status, "Already marked as finished")
return
success = self.library_client.mark_as_finished(asin, selected_item)
@@ -132,28 +145,36 @@ class AppActionsMixin:
def action_toggle_download(self) -> None:
asin = self._get_selected_asin()
if asin:
self._toggle_download_async(asin)
self._toggle_download_async(asin, self._get_selected_item())
@work(exclusive=True, thread=True)
def _toggle_download_async(self, asin: str) -> None:
def _toggle_download_async(self, asin: str, item: dict | None = None) -> None:
if not self.download_manager:
return
preferred_title, preferred_author = self._get_naming_hints(item)
if self.download_manager.is_cached(asin):
self.download_manager.remove_cached(
asin, self._thread_status_update)
self.download_manager.remove_cached(asin, self._thread_status_update)
else:
self.download_manager.get_or_download(
asin, self._thread_status_update)
asin,
self._thread_status_update,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
self.call_from_thread(self._refresh_table)
@work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None:
def _start_playback_async(self, asin: str, item: dict | None = None) -> None:
if not self.download_manager:
return
preferred_title, preferred_author = self._get_naming_hints(item)
self.playback.prepare_and_start(
self.download_manager,
asin,
self._thread_status_update,
preferred_title,
preferred_author,
)

View File

@@ -1,6 +1,7 @@
"""Obtains AAX files from Audible (cache or download) and provides activation bytes."""
import re
import unicodedata
from pathlib import Path
from urllib.parse import urlparse
@@ -33,26 +34,31 @@ class DownloadManager:
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size
self._http_client = httpx.Client(
auth=auth, timeout=30.0, follow_redirects=True)
self._http_client = httpx.Client(auth=auth, timeout=30.0, follow_redirects=True)
self._download_client = httpx.Client(
timeout=httpx.Timeout(connect=30.0, read=None,
write=30.0, pool=30.0),
timeout=httpx.Timeout(connect=30.0, read=None, write=30.0, pool=30.0),
follow_redirects=True,
)
def get_or_download(
self, asin: str, notify: StatusCallback | None = None
self,
asin: str,
notify: StatusCallback | None = None,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> Path | None:
"""Return local path to AAX file; download and cache if not present."""
title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
filename_stems = self._get_filename_stems_from_asin(
asin,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
local_path = self.cache_dir / f"{filename_stems[0]}.aax"
cached_path = self._find_cached_path(filename_stems)
if cached_path:
if notify:
notify(f"Using cached file: {local_path.name}")
return local_path
notify(f"Using cached file: {cached_path.name}")
return cached_path
if notify:
notify(f"Downloading to {local_path.name}...")
@@ -92,12 +98,7 @@ class DownloadManager:
def get_cached_path(self, asin: str) -> Path | None:
"""Return path to cached AAX file if it exists and is valid size."""
title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
return self._find_cached_path(self._get_filename_stems_from_asin(asin))
def is_cached(self, asin: str) -> bool:
"""Return True if the title is present in cache with valid size."""
@@ -130,20 +131,68 @@ class DownloadManager:
return False
def _sanitize_filename(self, filename: str) -> str:
"""Remove invalid characters from filename."""
return re.sub(r'[<>:"/\\|?*]', "_", filename)
"""Normalize a filename segment with ASCII letters, digits, and dashes."""
ascii_text = unicodedata.normalize("NFKD", filename)
ascii_text = ascii_text.encode("ascii", "ignore").decode("ascii")
ascii_text = re.sub(r"[’'`]+", "", ascii_text)
ascii_text = re.sub(r"[^A-Za-z0-9]+", "-", ascii_text)
ascii_text = re.sub(r"-+", "-", ascii_text)
ascii_text = ascii_text.strip("-._")
return ascii_text or "Unknown"
def _find_cached_path(self, filename_stems: list[str]) -> Path | None:
"""Return the first valid cached path matching any candidate filename stem."""
for filename_stem in filename_stems:
local_path = self.cache_dir / f"{filename_stem}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
def _get_filename_stems_from_asin(
self,
asin: str,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> list[str]:
"""Build preferred and fallback cache filename stems for an ASIN."""
if preferred_title:
preferred_combined = (
f"{self._sanitize_filename(preferred_author or 'Unknown Author')}_"
f"{self._sanitize_filename(preferred_title)}"
)
preferred_legacy = self._sanitize_filename(preferred_title)
fallback_asin = self._sanitize_filename(asin)
return list(
dict.fromkeys([preferred_combined, preferred_legacy, fallback_asin])
)
def _get_name_from_asin(self, asin: str) -> str | None:
"""Get the title/name of a book from its ASIN."""
try:
product_info = self.client.get(
path=f"1.0/catalog/products/{asin}",
response_groups="product_desc,product_attrs",
**{"response_groups": "contributors,product_desc,product_attrs"},
)
product = product_info.get("product", {})
return product.get("title") or "Unknown Title"
except (OSError, ValueError, KeyError):
return None
title = product.get("title") or "Unknown Title"
author = self._get_primary_author(product)
combined = (
f"{self._sanitize_filename(author)}_{self._sanitize_filename(title)}"
)
legacy_title = self._sanitize_filename(title)
fallback_asin = self._sanitize_filename(asin)
return list(dict.fromkeys([combined, legacy_title, fallback_asin]))
except (OSError, ValueError, KeyError, AttributeError):
return [self._sanitize_filename(asin)]
def _get_primary_author(self, product: dict) -> str:
"""Extract a primary author name from product metadata."""
contributors = product.get("authors") or product.get("contributors") or []
for contributor in contributors:
if not isinstance(contributor, dict):
continue
name = contributor.get("name")
if isinstance(name, str) and name.strip():
return name
return "Unknown Author"
def _get_download_link(
self,
@@ -174,7 +223,8 @@ class DownloadManager:
if not link:
link = str(response.url)
tld = self.auth.locale.domain
locale = getattr(self.auth, "locale", None)
tld = getattr(locale, "domain", "com")
return link.replace("cds.audible.com", f"cds.audible.{tld}")
except httpx.HTTPError as exc:

View File

@@ -45,12 +45,16 @@ class ControllerLifecycleMixin(ControllerStateMixin):
try:
proc, return_code = process_mod.run_ffplay(cmd)
if proc is None:
if return_code == 0 and start_position > 0 and self.total_duration and start_position >= self.total_duration - 5:
if (
return_code == 0
and start_position > 0
and self.total_duration
and start_position >= self.total_duration - 5
):
notify("Reached end of file")
self._reset_state()
return False
notify(
f"Playback process exited immediately (code: {return_code})")
notify(f"Playback process exited immediately (code: {return_code})")
return False
self.playback_process = proc
self.is_playing = True
@@ -114,6 +118,8 @@ class ControllerLifecycleMixin(ControllerStateMixin):
download_manager: DownloadManager,
asin: str,
status_callback: StatusCallback | None = None,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> bool:
"""Download AAX if needed, get activation bytes, then start playback. Returns True on success."""
notify = status_callback or self.notify
@@ -121,7 +127,12 @@ class ControllerLifecycleMixin(ControllerStateMixin):
notify("Could not download file")
return False
notify("Preparing playback...")
local_path = download_manager.get_or_download(asin, notify)
local_path = download_manager.get_or_download(
asin,
notify,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
if not local_path:
notify("Could not download file")
return False
@@ -136,14 +147,15 @@ class ControllerLifecycleMixin(ControllerStateMixin):
last = self.library_client.get_last_position(asin)
if last is not None and last > 0:
start_position = last
notify(
f"Resuming from {LibraryClient.format_time(start_position)}")
notify(f"Resuming from {LibraryClient.format_time(start_position)}")
except (OSError, ValueError, KeyError):
pass
notify(f"Starting playback of {local_path.name}...")
self.current_asin = asin
self.last_save_time = time.time()
return self.start(local_path, activation_hex, notify, start_position, self.playback_speed)
return self.start(
local_path, activation_hex, notify, start_position, self.playback_speed
)
def toggle_playback(self) -> bool:
"""Toggle between pause and resume. Returns True if an action was performed."""
@@ -160,7 +172,10 @@ class ControllerLifecycleMixin(ControllerStateMixin):
return True
def _restart_at_position(
self, new_position: float, new_speed: float | None = None, message: str | None = None
self,
new_position: float,
new_speed: float | None = None,
message: str | None = None,
) -> bool:
"""Stop current process and start again at new_position; optionally set speed and notify."""
if not self.is_playing or not self.current_file_path:
@@ -170,7 +185,9 @@ class ControllerLifecycleMixin(ControllerStateMixin):
speed = new_speed if new_speed is not None else saved["speed"]
self._stop_process()
time.sleep(0.2)
if self.start(saved["file_path"], saved["activation"], self.notify, new_position, speed):
if self.start(
saved["file_path"], saved["activation"], self.notify, new_position, speed
):
self.current_asin = saved["asin"]
self.total_duration = saved["duration"]
self.chapters = saved["chapters"]

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, cast
from auditui.app.actions import AppActionsMixin
@dataclass(slots=True)
class FakeTable:
"""Minimal table shim exposing cursor and row count."""
row_count: int
cursor_row: int = 0
class DummyActionsApp(AppActionsMixin):
"""Minimal app host used for download naming hint tests."""
def __init__(self) -> None:
"""Initialize state required by action helpers."""
self.current_items: list[dict] = []
self.download_manager = object()
self.library_client = type(
"Library", (), {"extract_asin": lambda self, item: item.get("asin")}
)()
self._table = FakeTable(row_count=0, cursor_row=0)
def update_status(self, message: str) -> None:
"""Ignore status in this focused behavior test."""
del message
def query_one(self, selector: str, _type: object) -> FakeTable:
"""Return the fake table used in selection tests."""
assert selector == "#library_table"
return self._table
def test_action_toggle_download_passes_selected_item() -> None:
"""Ensure download toggle forwards selected item for naming hints."""
app = DummyActionsApp()
seen: list[tuple[str, str | None]] = []
def capture_toggle(asin: str, item: dict | None = None) -> None:
"""Capture download toggle arguments for assertions."""
seen.append((asin, item.get("title") if item else None))
setattr(cast(Any, app), "_toggle_download_async", capture_toggle)
app._table = FakeTable(row_count=1, cursor_row=0)
app.current_items = [{"asin": "ASIN", "title": "Book"}]
app.action_toggle_download()
assert seen == [("ASIN", "Book")]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, cast
from auditui.app.actions import AppActionsMixin
@@ -41,7 +42,13 @@ class DummyActionsApp(AppActionsMixin):
self.current_items: list[dict] = []
self.download_manager = object()
self.library_client = type(
"Library", (), {"extract_asin": lambda self, item: item.get("asin")}
"Library",
(),
{
"extract_asin": lambda self, item: item.get("asin"),
"extract_title": lambda self, item: item.get("title"),
"extract_authors": lambda self, item: item.get("authors"),
},
)()
self.playback = FakePlayback(True)
self.filter_text = "hello"
@@ -61,10 +68,6 @@ class DummyActionsApp(AppActionsMixin):
"""Record refresh invocations for filter tests."""
self._refreshed += 1
def _start_playback_async(self, asin: str) -> None:
"""Capture async playback launch argument."""
self.messages.append(f"start:{asin}")
def test_get_selected_asin_requires_non_empty_table() -> None:
"""Ensure selection fails gracefully when table has no rows."""
@@ -85,10 +88,18 @@ def test_get_selected_asin_returns_current_row_asin() -> None:
def test_action_play_selected_starts_async_playback() -> None:
"""Ensure play action calls async starter with selected ASIN."""
app = DummyActionsApp()
seen: list[str] = []
def capture_start(asin: str, item: dict | None = None) -> None:
"""Capture playback start arguments for assertions."""
suffix = f":{item.get('title')}" if item else ""
seen.append(f"start:{asin}{suffix}")
setattr(cast(Any, app), "_start_playback_async", capture_start)
app._table = FakeTable(row_count=1, cursor_row=0)
app.current_items = [{"asin": "ASIN"}]
app.current_items = [{"asin": "ASIN", "title": "Book"}]
app.action_play_selected()
assert app.messages[-1] == "start:ASIN"
assert seen[-1] == "start:ASIN:Book"
def test_action_toggle_playback_shows_hint_when_no_playback() -> None:

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, cast
import pytest
@@ -17,9 +18,11 @@ def _manager_with_cache_dir(tmp_path: Path) -> DownloadManager:
def test_sanitize_filename_replaces_invalid_characters() -> None:
"""Ensure filesystem-invalid symbols are replaced with underscores."""
"""Ensure filename normalization uses ASCII words and dashes."""
manager = DownloadManager.__new__(DownloadManager)
assert manager._sanitize_filename('a<>:"/\\|?*b') == "a_________b"
assert (
manager._sanitize_filename("Stephen King 11/22/63") == "Stephen-King-11-22-63"
)
def test_validate_download_url_accepts_only_http_schemes() -> None:
@@ -35,8 +38,12 @@ def test_get_cached_path_and_remove_cached(
) -> None:
"""Ensure cache lookup and cache deletion work for valid files."""
manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book")
cached_path = tmp_path / "My Book.aax"
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
)
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
messages: list[str] = []
assert manager.get_cached_path("ASIN123") == cached_path
@@ -51,7 +58,34 @@ def test_get_cached_path_ignores_small_files(
) -> None:
"""Ensure undersized files are not treated as valid cache entries."""
manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book")
cached_path = tmp_path / "My Book.aax"
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
)
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
assert manager.get_cached_path("ASIN123") is None
def test_get_filename_stems_include_author_title_and_legacy_title() -> None:
"""Ensure filename candidates include new author_title and legacy title names."""
manager = DownloadManager.__new__(DownloadManager)
manager.client = cast(
Any,
type(
"Client",
(),
{
"get": lambda self, path, **kwargs: {
"product": {
"title": "11/22/63",
"authors": [{"name": "Stephen King"}],
}
}
},
)(),
)
stems = manager._get_filename_stems_from_asin("B00TEST")
assert stems[0] == "Stephen-King_11-22-63"
assert "11-22-63" in stems

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, cast
import pytest
@@ -14,9 +15,14 @@ def _bare_manager(tmp_path: Path) -> DownloadManager:
manager = DownloadManager.__new__(DownloadManager)
manager.cache_dir = tmp_path
manager.chunk_size = 1024
manager.auth = type(
"Auth", (), {"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()}
)()
manager.auth = cast(
Any,
type(
"Auth",
(),
{"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()},
)(),
)
return manager
@@ -48,8 +54,12 @@ def test_get_or_download_uses_cached_file_when_available(
) -> None:
"""Ensure cached files bypass link generation and download work."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
cached_path = tmp_path / "Book.aax"
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
cached_path = tmp_path / "Author_Book.aax"
cached_path.write_bytes(b"1" * MIN_FILE_SIZE)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) == cached_path
@@ -61,7 +71,11 @@ def test_get_or_download_reports_invalid_url(
) -> None:
"""Ensure workflow reports invalid download URLs and aborts."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "ftp://bad"
)
@@ -75,7 +89,11 @@ def test_get_or_download_handles_download_failure(
) -> None:
"""Ensure workflow reports failures when stream download does not complete."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "https://ok"
)
@@ -83,3 +101,30 @@ def test_get_or_download_handles_download_failure(
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None
assert "Download failed" in messages
def test_get_or_download_uses_preferred_naming_hints(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure preferred title/author are forwarded to filename stem selection."""
manager = _bare_manager(tmp_path)
captured: list[tuple[str | None, str | None]] = []
def stems(
asin: str,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> list[str]:
"""Capture naming hints and return one deterministic filename stem."""
del asin
captured.append((preferred_title, preferred_author))
return ["Author_Book"]
monkeypatch.setattr(manager, "_get_filename_stems_from_asin", stems)
monkeypatch.setattr(manager, "_get_download_link", lambda asin, notify=None: None)
manager.get_or_download(
"ASIN",
preferred_title="11/22/63",
preferred_author="Stephen King",
)
assert captured == [("11/22/63", "Stephen King")]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, cast
from auditui.playback import controller_lifecycle as lifecycle_mod
from auditui.playback.controller import PlaybackController
@@ -62,14 +63,21 @@ def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
"""Ensure prepare flow resumes from saved position when available."""
messages: list[str] = []
lib = type("Lib", (), {"get_last_position": lambda self, asin: 75.0})()
controller = PlaybackController(messages.append, lib)
controller = PlaybackController(messages.append, cast(Any, lib))
started: list[tuple] = []
class DM:
"""Download manager shim returning path and activation token."""
def get_or_download(self, asin, notify):
def get_or_download(
self,
asin,
notify,
preferred_title: str | None = None,
preferred_author: str | None = None,
):
"""Return deterministic downloaded file path."""
del asin, notify, preferred_title, preferred_author
return Path("book.aax")
def get_activation_bytes(self):
@@ -78,7 +86,7 @@ def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
monkeypatch.setattr(controller, "start", lambda *args: started.append(args) or True)
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 200.0)
assert controller.prepare_and_start(DM(), "ASIN") is True
assert controller.prepare_and_start(cast(Any, DM()), "ASIN") is True
assert started and started[0][3] == 75.0
assert "Resuming from 01:15" in messages
@@ -87,7 +95,7 @@ def test_toggle_playback_uses_pause_and_resume_paths(monkeypatch) -> None:
"""Ensure toggle dispatches pause or resume based on paused flag."""
controller, _ = _controller()
controller.is_playing = True
controller.playback_process = Proc(None)
controller.playback_process = cast(Any, Proc(None))
called: list[str] = []
monkeypatch.setattr(controller, "pause", lambda: called.append("pause"))
monkeypatch.setattr(controller, "resume", lambda: called.append("resume"))