Compare commits

..

10 Commits

9 changed files with 318 additions and 75 deletions

View File

@@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- massive code refactoring - massive code refactoring
- complete test suite revamp - complete test suite revamp
- updated download cache naming to use `Author_Title` format with normalized separators.
### Fixed
- reused library metadata for download filename generation to avoid `Unknown-Author_Unknown-Title` when title/author are already known in the UI.
## [0.1.6] - 2026-02-16 ## [0.1.6] - 2026-02-16

View File

@@ -10,11 +10,8 @@ from ..ui import FilterScreen, HelpScreen, StatsScreen
class AppActionsMixin: class AppActionsMixin:
def _get_selected_asin(self) -> str | None: def _get_selected_item(self) -> dict | None:
if not self.download_manager: """Return the currently selected library item from the table."""
self.update_status(
"Not authenticated. Please restart and authenticate.")
return None
table = self.query_one("#library_table", DataTable) table = self.query_one("#library_table", DataTable)
if table.row_count == 0: if table.row_count == 0:
self.update_status("No books available") self.update_status("No books available")
@@ -23,10 +20,27 @@ class AppActionsMixin:
if cursor_row >= len(self.current_items): if cursor_row >= len(self.current_items):
self.update_status("Invalid selection") self.update_status("Invalid selection")
return None return None
return self.current_items[cursor_row]
def _get_naming_hints(self, item: dict | None) -> tuple[str | None, str | None]:
"""Return preferred title and author values used for download filenames."""
if not item or not self.library_client:
return (None, None)
return (
self.library_client.extract_title(item),
self.library_client.extract_authors(item),
)
def _get_selected_asin(self) -> str | None:
if not self.download_manager:
self.update_status("Not authenticated. Please restart and authenticate.")
return None
if not self.library_client: if not self.library_client:
self.update_status("Library client not available") self.update_status("Library client not available")
return None return None
selected_item = self.current_items[cursor_row] selected_item = self._get_selected_item()
if not selected_item:
return None
asin = self.library_client.extract_asin(selected_item) asin = self.library_client.extract_asin(selected_item)
if not asin: if not asin:
self.update_status("Could not get ASIN for selected book") self.update_status("Could not get ASIN for selected book")
@@ -36,7 +50,7 @@ class AppActionsMixin:
def action_play_selected(self) -> None: def action_play_selected(self) -> None:
asin = self._get_selected_asin() asin = self._get_selected_asin()
if asin: if asin:
self._start_playback_async(asin) self._start_playback_async(asin, self._get_selected_item())
def action_toggle_playback(self) -> None: def action_toggle_playback(self) -> None:
if not self.playback.toggle_playback(): if not self.playback.toggle_playback():
@@ -86,8 +100,7 @@ class AppActionsMixin:
return return
if self.library_client.is_finished(selected_item): if self.library_client.is_finished(selected_item):
self.call_from_thread(self.update_status, self.call_from_thread(self.update_status, "Already marked as finished")
"Already marked as finished")
return return
success = self.library_client.mark_as_finished(asin, selected_item) success = self.library_client.mark_as_finished(asin, selected_item)
@@ -132,28 +145,36 @@ class AppActionsMixin:
def action_toggle_download(self) -> None: def action_toggle_download(self) -> None:
asin = self._get_selected_asin() asin = self._get_selected_asin()
if asin: if asin:
self._toggle_download_async(asin) self._toggle_download_async(asin, self._get_selected_item())
@work(exclusive=True, thread=True) @work(exclusive=True, thread=True)
def _toggle_download_async(self, asin: str) -> None: def _toggle_download_async(self, asin: str, item: dict | None = None) -> None:
if not self.download_manager: if not self.download_manager:
return return
preferred_title, preferred_author = self._get_naming_hints(item)
if self.download_manager.is_cached(asin): if self.download_manager.is_cached(asin):
self.download_manager.remove_cached( self.download_manager.remove_cached(asin, self._thread_status_update)
asin, self._thread_status_update)
else: else:
self.download_manager.get_or_download( self.download_manager.get_or_download(
asin, self._thread_status_update) asin,
self._thread_status_update,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
self.call_from_thread(self._refresh_table) self.call_from_thread(self._refresh_table)
@work(exclusive=True, thread=True) @work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None: def _start_playback_async(self, asin: str, item: dict | None = None) -> None:
if not self.download_manager: if not self.download_manager:
return return
preferred_title, preferred_author = self._get_naming_hints(item)
self.playback.prepare_and_start( self.playback.prepare_and_start(
self.download_manager, self.download_manager,
asin, asin,
self._thread_status_update, self._thread_status_update,
preferred_title,
preferred_author,
) )

View File

@@ -1,6 +1,7 @@
"""Obtains AAX files from Audible (cache or download) and provides activation bytes.""" """Obtains AAX files from Audible (cache or download) and provides activation bytes."""
import re import re
import unicodedata
from pathlib import Path from pathlib import Path
from urllib.parse import urlparse from urllib.parse import urlparse
@@ -33,26 +34,31 @@ class DownloadManager:
self.cache_dir = cache_dir self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True) self.cache_dir.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size self.chunk_size = chunk_size
self._http_client = httpx.Client( self._http_client = httpx.Client(auth=auth, timeout=30.0, follow_redirects=True)
auth=auth, timeout=30.0, follow_redirects=True)
self._download_client = httpx.Client( self._download_client = httpx.Client(
timeout=httpx.Timeout(connect=30.0, read=None, timeout=httpx.Timeout(connect=30.0, read=None, write=30.0, pool=30.0),
write=30.0, pool=30.0),
follow_redirects=True, follow_redirects=True,
) )
def get_or_download( def get_or_download(
self, asin: str, notify: StatusCallback | None = None self,
asin: str,
notify: StatusCallback | None = None,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> Path | None: ) -> Path | None:
"""Return local path to AAX file; download and cache if not present.""" """Return local path to AAX file; download and cache if not present."""
title = self._get_name_from_asin(asin) or asin filename_stems = self._get_filename_stems_from_asin(
safe_title = self._sanitize_filename(title) asin,
local_path = self.cache_dir / f"{safe_title}.aax" preferred_title=preferred_title,
preferred_author=preferred_author,
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE: )
local_path = self.cache_dir / f"{filename_stems[0]}.aax"
cached_path = self._find_cached_path(filename_stems)
if cached_path:
if notify: if notify:
notify(f"Using cached file: {local_path.name}") notify(f"Using cached file: {cached_path.name}")
return local_path return cached_path
if notify: if notify:
notify(f"Downloading to {local_path.name}...") notify(f"Downloading to {local_path.name}...")
@@ -92,12 +98,7 @@ class DownloadManager:
def get_cached_path(self, asin: str) -> Path | None: def get_cached_path(self, asin: str) -> Path | None:
"""Return path to cached AAX file if it exists and is valid size.""" """Return path to cached AAX file if it exists and is valid size."""
title = self._get_name_from_asin(asin) or asin return self._find_cached_path(self._get_filename_stems_from_asin(asin))
safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
def is_cached(self, asin: str) -> bool: def is_cached(self, asin: str) -> bool:
"""Return True if the title is present in cache with valid size.""" """Return True if the title is present in cache with valid size."""
@@ -130,20 +131,68 @@ class DownloadManager:
return False return False
def _sanitize_filename(self, filename: str) -> str: def _sanitize_filename(self, filename: str) -> str:
"""Remove invalid characters from filename.""" """Normalize a filename segment with ASCII letters, digits, and dashes."""
return re.sub(r'[<>:"/\\|?*]', "_", filename) ascii_text = unicodedata.normalize("NFKD", filename)
ascii_text = ascii_text.encode("ascii", "ignore").decode("ascii")
ascii_text = re.sub(r"[’'`]+", "", ascii_text)
ascii_text = re.sub(r"[^A-Za-z0-9]+", "-", ascii_text)
ascii_text = re.sub(r"-+", "-", ascii_text)
ascii_text = ascii_text.strip("-._")
return ascii_text or "Unknown"
def _find_cached_path(self, filename_stems: list[str]) -> Path | None:
"""Return the first valid cached path matching any candidate filename stem."""
for filename_stem in filename_stems:
local_path = self.cache_dir / f"{filename_stem}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
def _get_filename_stems_from_asin(
self,
asin: str,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> list[str]:
"""Build preferred and fallback cache filename stems for an ASIN."""
if preferred_title:
preferred_combined = (
f"{self._sanitize_filename(preferred_author or 'Unknown Author')}_"
f"{self._sanitize_filename(preferred_title)}"
)
preferred_legacy = self._sanitize_filename(preferred_title)
fallback_asin = self._sanitize_filename(asin)
return list(
dict.fromkeys([preferred_combined, preferred_legacy, fallback_asin])
)
def _get_name_from_asin(self, asin: str) -> str | None:
"""Get the title/name of a book from its ASIN."""
try: try:
product_info = self.client.get( product_info = self.client.get(
path=f"1.0/catalog/products/{asin}", path=f"1.0/catalog/products/{asin}",
response_groups="product_desc,product_attrs", **{"response_groups": "contributors,product_desc,product_attrs"},
) )
product = product_info.get("product", {}) product = product_info.get("product", {})
return product.get("title") or "Unknown Title" title = product.get("title") or "Unknown Title"
except (OSError, ValueError, KeyError): author = self._get_primary_author(product)
return None combined = (
f"{self._sanitize_filename(author)}_{self._sanitize_filename(title)}"
)
legacy_title = self._sanitize_filename(title)
fallback_asin = self._sanitize_filename(asin)
return list(dict.fromkeys([combined, legacy_title, fallback_asin]))
except (OSError, ValueError, KeyError, AttributeError):
return [self._sanitize_filename(asin)]
def _get_primary_author(self, product: dict) -> str:
"""Extract a primary author name from product metadata."""
contributors = product.get("authors") or product.get("contributors") or []
for contributor in contributors:
if not isinstance(contributor, dict):
continue
name = contributor.get("name")
if isinstance(name, str) and name.strip():
return name
return "Unknown Author"
def _get_download_link( def _get_download_link(
self, self,
@@ -174,7 +223,8 @@ class DownloadManager:
if not link: if not link:
link = str(response.url) link = str(response.url)
tld = self.auth.locale.domain locale = getattr(self.auth, "locale", None)
tld = getattr(locale, "domain", "com")
return link.replace("cds.audible.com", f"cds.audible.{tld}") return link.replace("cds.audible.com", f"cds.audible.{tld}")
except httpx.HTTPError as exc: except httpx.HTTPError as exc:

View File

@@ -45,12 +45,16 @@ class ControllerLifecycleMixin(ControllerStateMixin):
try: try:
proc, return_code = process_mod.run_ffplay(cmd) proc, return_code = process_mod.run_ffplay(cmd)
if proc is None: if proc is None:
if return_code == 0 and start_position > 0 and self.total_duration and start_position >= self.total_duration - 5: if (
return_code == 0
and start_position > 0
and self.total_duration
and start_position >= self.total_duration - 5
):
notify("Reached end of file") notify("Reached end of file")
self._reset_state() self._reset_state()
return False return False
notify( notify(f"Playback process exited immediately (code: {return_code})")
f"Playback process exited immediately (code: {return_code})")
return False return False
self.playback_process = proc self.playback_process = proc
self.is_playing = True self.is_playing = True
@@ -114,6 +118,8 @@ class ControllerLifecycleMixin(ControllerStateMixin):
download_manager: DownloadManager, download_manager: DownloadManager,
asin: str, asin: str,
status_callback: StatusCallback | None = None, status_callback: StatusCallback | None = None,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> bool: ) -> bool:
"""Download AAX if needed, get activation bytes, then start playback. Returns True on success.""" """Download AAX if needed, get activation bytes, then start playback. Returns True on success."""
notify = status_callback or self.notify notify = status_callback or self.notify
@@ -121,7 +127,12 @@ class ControllerLifecycleMixin(ControllerStateMixin):
notify("Could not download file") notify("Could not download file")
return False return False
notify("Preparing playback...") notify("Preparing playback...")
local_path = download_manager.get_or_download(asin, notify) local_path = download_manager.get_or_download(
asin,
notify,
preferred_title=preferred_title,
preferred_author=preferred_author,
)
if not local_path: if not local_path:
notify("Could not download file") notify("Could not download file")
return False return False
@@ -136,14 +147,15 @@ class ControllerLifecycleMixin(ControllerStateMixin):
last = self.library_client.get_last_position(asin) last = self.library_client.get_last_position(asin)
if last is not None and last > 0: if last is not None and last > 0:
start_position = last start_position = last
notify( notify(f"Resuming from {LibraryClient.format_time(start_position)}")
f"Resuming from {LibraryClient.format_time(start_position)}")
except (OSError, ValueError, KeyError): except (OSError, ValueError, KeyError):
pass pass
notify(f"Starting playback of {local_path.name}...") notify(f"Starting playback of {local_path.name}...")
self.current_asin = asin self.current_asin = asin
self.last_save_time = time.time() self.last_save_time = time.time()
return self.start(local_path, activation_hex, notify, start_position, self.playback_speed) return self.start(
local_path, activation_hex, notify, start_position, self.playback_speed
)
def toggle_playback(self) -> bool: def toggle_playback(self) -> bool:
"""Toggle between pause and resume. Returns True if an action was performed.""" """Toggle between pause and resume. Returns True if an action was performed."""
@@ -160,7 +172,10 @@ class ControllerLifecycleMixin(ControllerStateMixin):
return True return True
def _restart_at_position( def _restart_at_position(
self, new_position: float, new_speed: float | None = None, message: str | None = None self,
new_position: float,
new_speed: float | None = None,
message: str | None = None,
) -> bool: ) -> bool:
"""Stop current process and start again at new_position; optionally set speed and notify.""" """Stop current process and start again at new_position; optionally set speed and notify."""
if not self.is_playing or not self.current_file_path: if not self.is_playing or not self.current_file_path:
@@ -170,7 +185,9 @@ class ControllerLifecycleMixin(ControllerStateMixin):
speed = new_speed if new_speed is not None else saved["speed"] speed = new_speed if new_speed is not None else saved["speed"]
self._stop_process() self._stop_process()
time.sleep(0.2) time.sleep(0.2)
if self.start(saved["file_path"], saved["activation"], self.notify, new_position, speed): if self.start(
saved["file_path"], saved["activation"], self.notify, new_position, speed
):
self.current_asin = saved["asin"] self.current_asin = saved["asin"]
self.total_duration = saved["duration"] self.total_duration = saved["duration"]
self.chapters = saved["chapters"] self.chapters = saved["chapters"]

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, cast
from auditui.app.actions import AppActionsMixin
@dataclass(slots=True)
class FakeTable:
"""Minimal table shim exposing cursor and row count."""
row_count: int
cursor_row: int = 0
class DummyActionsApp(AppActionsMixin):
"""Minimal app host used for download naming hint tests."""
def __init__(self) -> None:
"""Initialize state required by action helpers."""
self.current_items: list[dict] = []
self.download_manager = object()
self.library_client = type(
"Library", (), {"extract_asin": lambda self, item: item.get("asin")}
)()
self._table = FakeTable(row_count=0, cursor_row=0)
def update_status(self, message: str) -> None:
"""Ignore status in this focused behavior test."""
del message
def query_one(self, selector: str, _type: object) -> FakeTable:
"""Return the fake table used in selection tests."""
assert selector == "#library_table"
return self._table
def test_action_toggle_download_passes_selected_item() -> None:
"""Ensure download toggle forwards selected item for naming hints."""
app = DummyActionsApp()
seen: list[tuple[str, str | None]] = []
def capture_toggle(asin: str, item: dict | None = None) -> None:
"""Capture download toggle arguments for assertions."""
seen.append((asin, item.get("title") if item else None))
setattr(cast(Any, app), "_toggle_download_async", capture_toggle)
app._table = FakeTable(row_count=1, cursor_row=0)
app.current_items = [{"asin": "ASIN", "title": "Book"}]
app.action_toggle_download()
assert seen == [("ASIN", "Book")]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, cast
from auditui.app.actions import AppActionsMixin from auditui.app.actions import AppActionsMixin
@@ -41,7 +42,13 @@ class DummyActionsApp(AppActionsMixin):
self.current_items: list[dict] = [] self.current_items: list[dict] = []
self.download_manager = object() self.download_manager = object()
self.library_client = type( self.library_client = type(
"Library", (), {"extract_asin": lambda self, item: item.get("asin")} "Library",
(),
{
"extract_asin": lambda self, item: item.get("asin"),
"extract_title": lambda self, item: item.get("title"),
"extract_authors": lambda self, item: item.get("authors"),
},
)() )()
self.playback = FakePlayback(True) self.playback = FakePlayback(True)
self.filter_text = "hello" self.filter_text = "hello"
@@ -61,10 +68,6 @@ class DummyActionsApp(AppActionsMixin):
"""Record refresh invocations for filter tests.""" """Record refresh invocations for filter tests."""
self._refreshed += 1 self._refreshed += 1
def _start_playback_async(self, asin: str) -> None:
"""Capture async playback launch argument."""
self.messages.append(f"start:{asin}")
def test_get_selected_asin_requires_non_empty_table() -> None: def test_get_selected_asin_requires_non_empty_table() -> None:
"""Ensure selection fails gracefully when table has no rows.""" """Ensure selection fails gracefully when table has no rows."""
@@ -85,10 +88,18 @@ def test_get_selected_asin_returns_current_row_asin() -> None:
def test_action_play_selected_starts_async_playback() -> None: def test_action_play_selected_starts_async_playback() -> None:
"""Ensure play action calls async starter with selected ASIN.""" """Ensure play action calls async starter with selected ASIN."""
app = DummyActionsApp() app = DummyActionsApp()
seen: list[str] = []
def capture_start(asin: str, item: dict | None = None) -> None:
"""Capture playback start arguments for assertions."""
suffix = f":{item.get('title')}" if item else ""
seen.append(f"start:{asin}{suffix}")
setattr(cast(Any, app), "_start_playback_async", capture_start)
app._table = FakeTable(row_count=1, cursor_row=0) app._table = FakeTable(row_count=1, cursor_row=0)
app.current_items = [{"asin": "ASIN"}] app.current_items = [{"asin": "ASIN", "title": "Book"}]
app.action_play_selected() app.action_play_selected()
assert app.messages[-1] == "start:ASIN" assert seen[-1] == "start:ASIN:Book"
def test_action_toggle_playback_shows_hint_when_no_playback() -> None: def test_action_toggle_playback_shows_hint_when_no_playback() -> None:

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import Any, cast
import pytest import pytest
@@ -17,9 +18,11 @@ def _manager_with_cache_dir(tmp_path: Path) -> DownloadManager:
def test_sanitize_filename_replaces_invalid_characters() -> None: def test_sanitize_filename_replaces_invalid_characters() -> None:
"""Ensure filesystem-invalid symbols are replaced with underscores.""" """Ensure filename normalization uses ASCII words and dashes."""
manager = DownloadManager.__new__(DownloadManager) manager = DownloadManager.__new__(DownloadManager)
assert manager._sanitize_filename('a<>:"/\\|?*b') == "a_________b" assert (
manager._sanitize_filename("Stephen King 11/22/63") == "Stephen-King-11-22-63"
)
def test_validate_download_url_accepts_only_http_schemes() -> None: def test_validate_download_url_accepts_only_http_schemes() -> None:
@@ -35,8 +38,12 @@ def test_get_cached_path_and_remove_cached(
) -> None: ) -> None:
"""Ensure cache lookup and cache deletion work for valid files.""" """Ensure cache lookup and cache deletion work for valid files."""
manager = _manager_with_cache_dir(tmp_path) manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book") monkeypatch.setattr(
cached_path = tmp_path / "My Book.aax" manager,
"_get_filename_stems_from_asin",
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
)
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
cached_path.write_bytes(b"0" * MIN_FILE_SIZE) cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
messages: list[str] = [] messages: list[str] = []
assert manager.get_cached_path("ASIN123") == cached_path assert manager.get_cached_path("ASIN123") == cached_path
@@ -51,7 +58,34 @@ def test_get_cached_path_ignores_small_files(
) -> None: ) -> None:
"""Ensure undersized files are not treated as valid cache entries.""" """Ensure undersized files are not treated as valid cache entries."""
manager = _manager_with_cache_dir(tmp_path) manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book") monkeypatch.setattr(
cached_path = tmp_path / "My Book.aax" manager,
"_get_filename_stems_from_asin",
lambda asin: ["Stephen-King_11-22-63", "11-22-63"],
)
cached_path = tmp_path / "Stephen-King_11-22-63.aax"
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1)) cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
assert manager.get_cached_path("ASIN123") is None assert manager.get_cached_path("ASIN123") is None
def test_get_filename_stems_include_author_title_and_legacy_title() -> None:
"""Ensure filename candidates include new author_title and legacy title names."""
manager = DownloadManager.__new__(DownloadManager)
manager.client = cast(
Any,
type(
"Client",
(),
{
"get": lambda self, path, **kwargs: {
"product": {
"title": "11/22/63",
"authors": [{"name": "Stephen King"}],
}
}
},
)(),
)
stems = manager._get_filename_stems_from_asin("B00TEST")
assert stems[0] == "Stephen-King_11-22-63"
assert "11-22-63" in stems

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import Any, cast
import pytest import pytest
@@ -14,9 +15,14 @@ def _bare_manager(tmp_path: Path) -> DownloadManager:
manager = DownloadManager.__new__(DownloadManager) manager = DownloadManager.__new__(DownloadManager)
manager.cache_dir = tmp_path manager.cache_dir = tmp_path
manager.chunk_size = 1024 manager.chunk_size = 1024
manager.auth = type( manager.auth = cast(
"Auth", (), {"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()} Any,
)() type(
"Auth",
(),
{"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()},
)(),
)
return manager return manager
@@ -48,8 +54,12 @@ def test_get_or_download_uses_cached_file_when_available(
) -> None: ) -> None:
"""Ensure cached files bypass link generation and download work.""" """Ensure cached files bypass link generation and download work."""
manager = _bare_manager(tmp_path) manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book") monkeypatch.setattr(
cached_path = tmp_path / "Book.aax" manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
cached_path = tmp_path / "Author_Book.aax"
cached_path.write_bytes(b"1" * MIN_FILE_SIZE) cached_path.write_bytes(b"1" * MIN_FILE_SIZE)
messages: list[str] = [] messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) == cached_path assert manager.get_or_download("ASIN", notify=messages.append) == cached_path
@@ -61,7 +71,11 @@ def test_get_or_download_reports_invalid_url(
) -> None: ) -> None:
"""Ensure workflow reports invalid download URLs and aborts.""" """Ensure workflow reports invalid download URLs and aborts."""
manager = _bare_manager(tmp_path) manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book") monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr( monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "ftp://bad" manager, "_get_download_link", lambda asin, notify=None: "ftp://bad"
) )
@@ -75,7 +89,11 @@ def test_get_or_download_handles_download_failure(
) -> None: ) -> None:
"""Ensure workflow reports failures when stream download does not complete.""" """Ensure workflow reports failures when stream download does not complete."""
manager = _bare_manager(tmp_path) manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book") monkeypatch.setattr(
manager,
"_get_filename_stems_from_asin",
lambda asin, preferred_title=None, preferred_author=None: ["Author_Book"],
)
monkeypatch.setattr( monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "https://ok" manager, "_get_download_link", lambda asin, notify=None: "https://ok"
) )
@@ -83,3 +101,30 @@ def test_get_or_download_handles_download_failure(
messages: list[str] = [] messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None assert manager.get_or_download("ASIN", notify=messages.append) is None
assert "Download failed" in messages assert "Download failed" in messages
def test_get_or_download_uses_preferred_naming_hints(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure preferred title/author are forwarded to filename stem selection."""
manager = _bare_manager(tmp_path)
captured: list[tuple[str | None, str | None]] = []
def stems(
asin: str,
preferred_title: str | None = None,
preferred_author: str | None = None,
) -> list[str]:
"""Capture naming hints and return one deterministic filename stem."""
del asin
captured.append((preferred_title, preferred_author))
return ["Author_Book"]
monkeypatch.setattr(manager, "_get_filename_stems_from_asin", stems)
monkeypatch.setattr(manager, "_get_download_link", lambda asin, notify=None: None)
manager.get_or_download(
"ASIN",
preferred_title="11/22/63",
preferred_author="Stephen King",
)
assert captured == [("11/22/63", "Stephen King")]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import Any, cast
from auditui.playback import controller_lifecycle as lifecycle_mod from auditui.playback import controller_lifecycle as lifecycle_mod
from auditui.playback.controller import PlaybackController from auditui.playback.controller import PlaybackController
@@ -62,14 +63,21 @@ def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
"""Ensure prepare flow resumes from saved position when available.""" """Ensure prepare flow resumes from saved position when available."""
messages: list[str] = [] messages: list[str] = []
lib = type("Lib", (), {"get_last_position": lambda self, asin: 75.0})() lib = type("Lib", (), {"get_last_position": lambda self, asin: 75.0})()
controller = PlaybackController(messages.append, lib) controller = PlaybackController(messages.append, cast(Any, lib))
started: list[tuple] = [] started: list[tuple] = []
class DM: class DM:
"""Download manager shim returning path and activation token.""" """Download manager shim returning path and activation token."""
def get_or_download(self, asin, notify): def get_or_download(
self,
asin,
notify,
preferred_title: str | None = None,
preferred_author: str | None = None,
):
"""Return deterministic downloaded file path.""" """Return deterministic downloaded file path."""
del asin, notify, preferred_title, preferred_author
return Path("book.aax") return Path("book.aax")
def get_activation_bytes(self): def get_activation_bytes(self):
@@ -78,7 +86,7 @@ def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
monkeypatch.setattr(controller, "start", lambda *args: started.append(args) or True) monkeypatch.setattr(controller, "start", lambda *args: started.append(args) or True)
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 200.0) monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 200.0)
assert controller.prepare_and_start(DM(), "ASIN") is True assert controller.prepare_and_start(cast(Any, DM()), "ASIN") is True
assert started and started[0][3] == 75.0 assert started and started[0][3] == 75.0
assert "Resuming from 01:15" in messages assert "Resuming from 01:15" in messages
@@ -87,7 +95,7 @@ def test_toggle_playback_uses_pause_and_resume_paths(monkeypatch) -> None:
"""Ensure toggle dispatches pause or resume based on paused flag.""" """Ensure toggle dispatches pause or resume based on paused flag."""
controller, _ = _controller() controller, _ = _controller()
controller.is_playing = True controller.is_playing = True
controller.playback_process = Proc(None) controller.playback_process = cast(Any, Proc(None))
called: list[str] = [] called: list[str] = []
monkeypatch.setattr(controller, "pause", lambda: called.append("pause")) monkeypatch.setattr(controller, "pause", lambda: called.append("pause"))
monkeypatch.setattr(controller, "resume", lambda: called.append("resume")) monkeypatch.setattr(controller, "resume", lambda: called.append("resume"))