Compare commits
3 Commits
bd2bd43e7f
...
e88dcee155
| Author | SHA1 | Date | |
|---|---|---|---|
| e88dcee155 | |||
| 4bc9b3fd3f | |||
| cd99960f2f |
124
tests/app/test_app_actions_selection_and_controls.py
Normal file
124
tests/app/test_app_actions_selection_and_controls.py
Normal file
@@ -0,0 +1,124 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from auditui.app.actions import AppActionsMixin
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeTable:
|
||||
"""Minimal table shim exposing cursor and row count."""
|
||||
|
||||
row_count: int
|
||||
cursor_row: int = 0
|
||||
|
||||
|
||||
class FakePlayback:
|
||||
"""Playback stub with togglable boolean return values."""
|
||||
|
||||
def __init__(self, result: bool) -> None:
|
||||
"""Store deterministic toggle result for tests."""
|
||||
self._result = result
|
||||
self.calls: list[str] = []
|
||||
|
||||
def toggle_playback(self) -> bool:
|
||||
"""Return configured result and record call."""
|
||||
self.calls.append("toggle")
|
||||
return self._result
|
||||
|
||||
def seek_forward(self, _seconds: float) -> bool:
|
||||
"""Return configured result and record call."""
|
||||
self.calls.append("seek_forward")
|
||||
return self._result
|
||||
|
||||
|
||||
class DummyActionsApp(AppActionsMixin):
|
||||
"""Mixin host with just enough state for action method tests."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize fake app state used by action helpers."""
|
||||
self.messages: list[str] = []
|
||||
self.current_items: list[dict] = []
|
||||
self.download_manager = object()
|
||||
self.library_client = type(
|
||||
"Library", (), {"extract_asin": lambda self, item: item.get("asin")}
|
||||
)()
|
||||
self.playback = FakePlayback(True)
|
||||
self.filter_text = "hello"
|
||||
self._refreshed = 0
|
||||
self._table = FakeTable(row_count=0, cursor_row=0)
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Collect status messages for assertions."""
|
||||
self.messages.append(message)
|
||||
|
||||
def query_one(self, selector: str, _type: object) -> FakeTable:
|
||||
"""Return the fake table used in selection tests."""
|
||||
assert selector == "#library_table"
|
||||
return self._table
|
||||
|
||||
def _refresh_filtered_view(self) -> None:
|
||||
"""Record refresh invocations for filter tests."""
|
||||
self._refreshed += 1
|
||||
|
||||
def _start_playback_async(self, asin: str) -> None:
|
||||
"""Capture async playback launch argument."""
|
||||
self.messages.append(f"start:{asin}")
|
||||
|
||||
|
||||
def test_get_selected_asin_requires_non_empty_table() -> None:
|
||||
"""Ensure selection fails gracefully when table has no rows."""
|
||||
app = DummyActionsApp()
|
||||
app._table = FakeTable(row_count=0)
|
||||
assert app._get_selected_asin() is None
|
||||
assert app.messages[-1] == "No books available"
|
||||
|
||||
|
||||
def test_get_selected_asin_returns_current_row_asin() -> None:
|
||||
"""Ensure selected row index maps to current_items ASIN."""
|
||||
app = DummyActionsApp()
|
||||
app._table = FakeTable(row_count=2, cursor_row=1)
|
||||
app.current_items = [{"asin": "A1"}, {"asin": "A2"}]
|
||||
assert app._get_selected_asin() == "A2"
|
||||
|
||||
|
||||
def test_action_play_selected_starts_async_playback() -> None:
|
||||
"""Ensure play action calls async starter with selected ASIN."""
|
||||
app = DummyActionsApp()
|
||||
app._table = FakeTable(row_count=1, cursor_row=0)
|
||||
app.current_items = [{"asin": "ASIN"}]
|
||||
app.action_play_selected()
|
||||
assert app.messages[-1] == "start:ASIN"
|
||||
|
||||
|
||||
def test_action_toggle_playback_shows_hint_when_no_playback() -> None:
|
||||
"""Ensure toggle action displays no-playback hint on false return."""
|
||||
app = DummyActionsApp()
|
||||
app.playback = FakePlayback(False)
|
||||
app.action_toggle_playback()
|
||||
assert app.messages[-1] == "No playback active. Press Enter to play a book."
|
||||
|
||||
|
||||
def test_action_seek_forward_shows_hint_when_seek_fails() -> None:
|
||||
"""Ensure failed seek action reuses no-playback helper status."""
|
||||
app = DummyActionsApp()
|
||||
app.playback = FakePlayback(False)
|
||||
app.action_seek_forward()
|
||||
assert app.messages[-1] == "No playback active. Press Enter to play a book."
|
||||
|
||||
|
||||
def test_action_clear_filter_resets_filter_and_refreshes() -> None:
|
||||
"""Ensure clearing filter resets text and refreshes filtered view."""
|
||||
app = DummyActionsApp()
|
||||
app.action_clear_filter()
|
||||
assert app.filter_text == ""
|
||||
assert app._refreshed == 1
|
||||
assert app.messages[-1] == "Filter cleared"
|
||||
|
||||
|
||||
def test_apply_filter_coerces_none_to_empty_string() -> None:
|
||||
"""Ensure apply_filter normalizes None and refreshes list view."""
|
||||
app = DummyActionsApp()
|
||||
app._apply_filter(None)
|
||||
assert app.filter_text == ""
|
||||
assert app._refreshed == 1
|
||||
@@ -32,27 +32,25 @@ EXPECTED_BINDINGS: tuple[NormalizedBinding, ...] = (
|
||||
)
|
||||
|
||||
|
||||
def _normalize_binding(
|
||||
binding: Binding | BindingTuple,
|
||||
) -> NormalizedBinding:
|
||||
"""Return key, action, description, and priority for a binding item."""
|
||||
def _normalize_binding(binding: Binding | BindingTuple) -> NormalizedBinding:
|
||||
"""Return key, action, description, and priority from one binding item."""
|
||||
if isinstance(binding, Binding):
|
||||
return (binding.key, binding.action, binding.description, binding.priority)
|
||||
key, action, description = binding
|
||||
return (key, action, description, False)
|
||||
|
||||
|
||||
def _normalize_bindings() -> list[NormalizedBinding]:
|
||||
"""Normalize all declared bindings to a comparable shape."""
|
||||
def _all_bindings() -> list[NormalizedBinding]:
|
||||
"""Normalize all app bindings into a stable comparable structure."""
|
||||
return [_normalize_binding(binding) for binding in BINDINGS]
|
||||
|
||||
|
||||
def test_bindings_match_expected_shortcuts() -> None:
|
||||
"""Ensure the app ships with the expected binding set and actions."""
|
||||
assert _normalize_bindings() == list(EXPECTED_BINDINGS)
|
||||
"""Ensure the shipped shortcut list stays stable and explicit."""
|
||||
assert _all_bindings() == list(EXPECTED_BINDINGS)
|
||||
|
||||
|
||||
def test_binding_keys_are_unique() -> None:
|
||||
"""Ensure each key is defined once to avoid ambiguous key dispatch."""
|
||||
keys = [binding[0] for binding in _normalize_bindings()]
|
||||
"""Ensure each key is defined only once to avoid dispatch ambiguity."""
|
||||
keys = [binding[0] for binding in _all_bindings()]
|
||||
assert len(keys) == len(set(keys))
|
||||
114
tests/app/test_app_library_mixin_behavior.py
Normal file
114
tests/app/test_app_library_mixin_behavior.py
Normal file
@@ -0,0 +1,114 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.app.library import AppLibraryMixin
|
||||
from auditui.app import library as library_mod
|
||||
|
||||
|
||||
class DummyLibraryApp(AppLibraryMixin):
|
||||
"""Mixin host exposing only members used by AppLibraryMixin."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize in-memory app state and call tracking."""
|
||||
self.all_items: list[dict] = []
|
||||
self.show_all_mode = False
|
||||
self._search_text_cache: dict[int, str] = {1: "x"}
|
||||
self.messages: list[str] = []
|
||||
self.call_log: list[tuple[str, tuple]] = []
|
||||
self.library_client = None
|
||||
|
||||
def _prime_search_cache(self, items: list[dict]) -> None:
|
||||
"""Store a marker so callers can assert this method was reached."""
|
||||
self.call_log.append(("prime", (items,)))
|
||||
|
||||
def show_all(self) -> None:
|
||||
"""Record show_all invocation for assertion."""
|
||||
self.call_log.append(("show_all", ()))
|
||||
|
||||
def show_unfinished(self) -> None:
|
||||
"""Record show_unfinished invocation for assertion."""
|
||||
self.call_log.append(("show_unfinished", ()))
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Capture status messages."""
|
||||
self.messages.append(message)
|
||||
|
||||
def call_from_thread(self, func, *args) -> None:
|
||||
"""Execute callback immediately to simplify tests."""
|
||||
func(*args)
|
||||
|
||||
def _thread_status_update(self, message: str) -> None:
|
||||
"""Capture worker-thread status update messages."""
|
||||
self.messages.append(message)
|
||||
|
||||
|
||||
def test_on_library_loaded_refreshes_cache_and_shows_unfinished() -> None:
|
||||
"""Ensure loaded items reset cache and default to unfinished view."""
|
||||
app = DummyLibraryApp()
|
||||
items = [{"asin": "a"}, {"asin": "b"}]
|
||||
app.on_library_loaded(items)
|
||||
assert app.all_items == items
|
||||
assert app._search_text_cache == {}
|
||||
assert app.messages[-1] == "Loaded 2 books"
|
||||
assert app.call_log[-1][0] == "show_unfinished"
|
||||
|
||||
|
||||
def test_on_library_loaded_uses_show_all_mode() -> None:
|
||||
"""Ensure loaded items respect show_all mode when enabled."""
|
||||
app = DummyLibraryApp()
|
||||
app.show_all_mode = True
|
||||
app.on_library_loaded([{"asin": "a"}])
|
||||
assert app.call_log[-1][0] == "show_all"
|
||||
|
||||
|
||||
def test_on_library_error_formats_message() -> None:
|
||||
"""Ensure library errors are surfaced through status updates."""
|
||||
app = DummyLibraryApp()
|
||||
app.on_library_error("boom")
|
||||
assert app.messages == ["Error fetching library: boom"]
|
||||
|
||||
|
||||
def test_fetch_library_calls_on_loaded(monkeypatch) -> None:
|
||||
"""Ensure fetch_library forwards fetched items through call_from_thread."""
|
||||
app = DummyLibraryApp()
|
||||
|
||||
class Worker:
|
||||
"""Simple worker shim exposing cancellation state."""
|
||||
|
||||
is_cancelled = False
|
||||
|
||||
class LibraryClient:
|
||||
"""Fake client returning a deterministic item list."""
|
||||
|
||||
def fetch_all_items(self, callback):
|
||||
"""Invoke callback and return one item."""
|
||||
callback("progress")
|
||||
return [{"asin": "x"}]
|
||||
|
||||
app.library_client = LibraryClient()
|
||||
monkeypatch.setattr(library_mod, "get_current_worker", lambda: Worker())
|
||||
AppLibraryMixin.fetch_library.__wrapped__(app)
|
||||
assert app.all_items == [{"asin": "x"}]
|
||||
assert "Loaded 1 books" in app.messages
|
||||
|
||||
|
||||
def test_fetch_library_handles_expected_exception(monkeypatch) -> None:
|
||||
"""Ensure fetch exceptions call on_library_error with error text."""
|
||||
app = DummyLibraryApp()
|
||||
|
||||
class Worker:
|
||||
"""Simple worker shim exposing cancellation state."""
|
||||
|
||||
is_cancelled = False
|
||||
|
||||
class BrokenClient:
|
||||
"""Fake client raising an expected fetch exception."""
|
||||
|
||||
def fetch_all_items(self, callback):
|
||||
"""Raise the same exception family handled by mixin."""
|
||||
del callback
|
||||
raise ValueError("bad fetch")
|
||||
|
||||
app.library_client = BrokenClient()
|
||||
monkeypatch.setattr(library_mod, "get_current_worker", lambda: Worker())
|
||||
AppLibraryMixin.fetch_library.__wrapped__(app)
|
||||
assert app.messages[-1] == "Error fetching library: bad fetch"
|
||||
148
tests/app/test_app_progress_mixin_behavior.py
Normal file
148
tests/app/test_app_progress_mixin_behavior.py
Normal file
@@ -0,0 +1,148 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import cast
|
||||
|
||||
from auditui.app.progress import AppProgressMixin
|
||||
from textual.events import Key
|
||||
from textual.widgets import DataTable
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeKeyEvent:
|
||||
"""Minimal key event carrying key value and prevent_default state."""
|
||||
|
||||
key: str
|
||||
prevented: bool = False
|
||||
|
||||
def prevent_default(self) -> None:
|
||||
"""Mark event as prevented."""
|
||||
self.prevented = True
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeStatic:
|
||||
"""Minimal static widget with text and visibility fields."""
|
||||
|
||||
display: bool = False
|
||||
text: str = ""
|
||||
|
||||
def update(self, value: str) -> None:
|
||||
"""Store rendered text value."""
|
||||
self.text = value
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeProgressBar:
|
||||
"""Minimal progress bar widget storing latest progress value."""
|
||||
|
||||
progress: float = 0.0
|
||||
|
||||
def update(self, progress: float) -> None:
|
||||
"""Store progress value for assertions."""
|
||||
self.progress = progress
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeContainer:
|
||||
"""Minimal container exposing display property."""
|
||||
|
||||
display: bool = False
|
||||
|
||||
|
||||
class DummyPlayback:
|
||||
"""Playback shim exposing only members used by AppProgressMixin."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize playback state and update counters."""
|
||||
self.is_playing = False
|
||||
self._status: str | None = None
|
||||
self._progress: tuple[str, float, float] | None = None
|
||||
self.saved_calls = 0
|
||||
|
||||
def check_status(self):
|
||||
"""Return configurable status check message."""
|
||||
return self._status
|
||||
|
||||
def get_current_progress(self):
|
||||
"""Return configurable progress tuple."""
|
||||
return self._progress
|
||||
|
||||
def update_position_if_needed(self) -> None:
|
||||
"""Record periodic save invocations."""
|
||||
self.saved_calls += 1
|
||||
|
||||
|
||||
class DummyProgressApp(AppProgressMixin):
|
||||
"""Mixin host that records action dispatch and widget updates."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize fake widgets and playback state."""
|
||||
self.playback = DummyPlayback()
|
||||
self.focused = object()
|
||||
self.actions: list[str] = []
|
||||
self.messages: list[str] = []
|
||||
self.progress_info = FakeStatic()
|
||||
self.progress_bar = FakeProgressBar()
|
||||
self.progress_container = FakeContainer()
|
||||
|
||||
def action_seek_backward(self) -> None:
|
||||
"""Record backward seek action dispatch."""
|
||||
self.actions.append("seek_backward")
|
||||
|
||||
def action_toggle_playback(self) -> None:
|
||||
"""Record toggle playback action dispatch."""
|
||||
self.actions.append("toggle")
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Capture status messages for assertions."""
|
||||
self.messages.append(message)
|
||||
|
||||
def query_one(self, selector: str, _type: object):
|
||||
"""Return fake widgets by selector used by progress mixin."""
|
||||
return {
|
||||
"#progress_info": self.progress_info,
|
||||
"#progress_bar": self.progress_bar,
|
||||
"#progress_bar_container": self.progress_container,
|
||||
}[selector]
|
||||
|
||||
|
||||
def test_on_key_dispatches_seek_when_playing() -> None:
|
||||
"""Ensure left key is intercepted and dispatched to seek action."""
|
||||
app = DummyProgressApp()
|
||||
app.playback.is_playing = True
|
||||
event = FakeKeyEvent("left")
|
||||
app.on_key(cast(Key, event))
|
||||
assert event.prevented is True
|
||||
assert app.actions == ["seek_backward"]
|
||||
|
||||
|
||||
def test_on_key_dispatches_space_when_table_focused() -> None:
|
||||
"""Ensure space is intercepted and dispatched when table is focused."""
|
||||
app = DummyProgressApp()
|
||||
app.focused = DataTable()
|
||||
event = FakeKeyEvent("space")
|
||||
app.on_key(cast(Key, event))
|
||||
assert event.prevented is True
|
||||
assert app.actions == ["toggle"]
|
||||
|
||||
|
||||
def test_check_playback_status_hides_progress_after_message() -> None:
|
||||
"""Ensure playback status message triggers hide-progress behavior."""
|
||||
app = DummyProgressApp()
|
||||
app.playback._status = "Finished"
|
||||
app._check_playback_status()
|
||||
assert app.messages[-1] == "Finished"
|
||||
assert app.progress_info.display is False
|
||||
assert app.progress_container.display is False
|
||||
|
||||
|
||||
def test_update_progress_renders_visible_progress_row() -> None:
|
||||
"""Ensure valid progress data updates widgets and makes them visible."""
|
||||
app = DummyProgressApp()
|
||||
app.playback.is_playing = True
|
||||
app.playback._progress = ("Chapter", 30.0, 60.0)
|
||||
app._update_progress()
|
||||
assert app.progress_bar.progress == 50.0
|
||||
assert app.progress_info.display is True
|
||||
assert app.progress_container.display is True
|
||||
30
tests/app/test_app_progress_periodic_save.py
Normal file
30
tests/app/test_app_progress_periodic_save.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.app.progress import AppProgressMixin
|
||||
|
||||
|
||||
class DummyPlayback:
|
||||
"""Playback stub exposing periodic update method."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize call counter."""
|
||||
self.saved_calls = 0
|
||||
|
||||
def update_position_if_needed(self) -> None:
|
||||
"""Increment call counter for assertions."""
|
||||
self.saved_calls += 1
|
||||
|
||||
|
||||
class DummyProgressApp(AppProgressMixin):
|
||||
"""Minimal app host containing playback dependency only."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize playback stub."""
|
||||
self.playback = DummyPlayback()
|
||||
|
||||
|
||||
def test_save_position_periodically_delegates_to_playback() -> None:
|
||||
"""Ensure periodic save method delegates to playback updater."""
|
||||
app = DummyProgressApp()
|
||||
app._save_position_periodically()
|
||||
assert app.playback.saved_calls == 1
|
||||
@@ -8,22 +8,29 @@ from auditui.library import build_search_text, filter_items
|
||||
|
||||
|
||||
class StubLibrary:
|
||||
"""Minimal library facade used by search-related app helpers."""
|
||||
|
||||
def extract_title(self, item: dict) -> str:
|
||||
"""Return title from a synthetic item."""
|
||||
return item.get("title", "")
|
||||
|
||||
def extract_authors(self, item: dict) -> str:
|
||||
"""Return authors from a synthetic item."""
|
||||
return item.get("authors", "")
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Dummy:
|
||||
class DummyAuditui:
|
||||
"""Narrow object compatible with Auditui search-cache helper calls."""
|
||||
|
||||
_search_text_cache: dict[int, str] = field(default_factory=dict)
|
||||
library_client: StubLibrary = field(default_factory=StubLibrary)
|
||||
|
||||
|
||||
def test_get_search_text_is_cached() -> None:
|
||||
"""Ensure repeated text extraction for one item reuses cache entries."""
|
||||
item = {"title": "Title", "authors": "Author"}
|
||||
dummy = Dummy()
|
||||
dummy = DummyAuditui()
|
||||
first = Auditui._get_search_text(cast(Auditui, dummy), item)
|
||||
second = Auditui._get_search_text(cast(Auditui, dummy), item)
|
||||
assert first == "title author"
|
||||
@@ -31,7 +38,8 @@ def test_get_search_text_is_cached() -> None:
|
||||
assert len(dummy._search_text_cache) == 1
|
||||
|
||||
|
||||
def test_filter_items_uses_cache() -> None:
|
||||
def test_filter_items_uses_cached_callable() -> None:
|
||||
"""Ensure filter_items cooperates with a memoized search text callback."""
|
||||
library = StubLibrary()
|
||||
cache: dict[int, str] = {}
|
||||
items = [
|
||||
@@ -40,6 +48,7 @@ def test_filter_items_uses_cache() -> None:
|
||||
]
|
||||
|
||||
def cached(item: dict) -> str:
|
||||
"""Build and cache normalized search text per object identity."""
|
||||
cache_key = id(item)
|
||||
if cache_key not in cache:
|
||||
cache[cache_key] = build_search_text(item, cast(Any, library))
|
||||
@@ -49,6 +58,7 @@ def test_filter_items_uses_cache() -> None:
|
||||
assert result == [items[1]]
|
||||
|
||||
|
||||
def test_build_search_text_without_library() -> None:
|
||||
def test_build_search_text_without_library_client() -> None:
|
||||
"""Ensure fallback search text path handles inline author dicts."""
|
||||
item = {"title": "Title", "authors": [{"name": "A"}, {"name": "B"}]}
|
||||
assert build_search_text(item, None) == "title a, b"
|
||||
78
tests/app/test_app_state_initialization.py
Normal file
78
tests/app/test_app_state_initialization.py
Normal file
@@ -0,0 +1,78 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.app import state as state_mod
|
||||
|
||||
|
||||
class DummyApp:
|
||||
"""Lightweight app object for state initialization tests."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Expose update_status to satisfy init dependencies."""
|
||||
self.messages: list[str] = []
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Collect status updates for assertions."""
|
||||
self.messages.append(message)
|
||||
|
||||
|
||||
def test_init_state_without_auth_or_client(monkeypatch) -> None:
|
||||
"""Ensure baseline state is initialized when no auth/client is provided."""
|
||||
app = DummyApp()
|
||||
playback_args: list[tuple[object, object]] = []
|
||||
|
||||
class FakePlayback:
|
||||
"""Playback constructor recorder for init tests."""
|
||||
|
||||
def __init__(self, notify, library_client) -> None:
|
||||
"""Capture arguments passed by init_auditui_state."""
|
||||
playback_args.append((notify, library_client))
|
||||
|
||||
monkeypatch.setattr(state_mod, "PlaybackController", FakePlayback)
|
||||
state_mod.init_auditui_state(app)
|
||||
assert app.library_client is None
|
||||
assert app.download_manager is None
|
||||
assert app.all_items == []
|
||||
assert app.current_items == []
|
||||
assert app.filter_text == ""
|
||||
assert app.show_all_mode is False
|
||||
assert playback_args and playback_args[0][1] is None
|
||||
|
||||
|
||||
def test_init_state_with_auth_and_client_builds_dependencies(monkeypatch) -> None:
|
||||
"""Ensure init constructs library, downloads, and playback dependencies."""
|
||||
app = DummyApp()
|
||||
auth = object()
|
||||
client = object()
|
||||
|
||||
class FakeLibraryClient:
|
||||
"""Fake library client constructor for dependency wiring checks."""
|
||||
|
||||
def __init__(self, value) -> None:
|
||||
"""Store constructor argument for assertions."""
|
||||
self.value = value
|
||||
|
||||
class FakeDownloadManager:
|
||||
"""Fake download manager constructor for dependency wiring checks."""
|
||||
|
||||
def __init__(self, auth_value, client_value) -> None:
|
||||
"""Store constructor arguments for assertions."""
|
||||
self.args = (auth_value, client_value)
|
||||
|
||||
class FakePlayback:
|
||||
"""Fake playback constructor for dependency wiring checks."""
|
||||
|
||||
def __init__(self, notify, library_client) -> None:
|
||||
"""Store constructor arguments for assertions."""
|
||||
self.notify = notify
|
||||
self.library_client = library_client
|
||||
|
||||
monkeypatch.setattr(state_mod, "LibraryClient", FakeLibraryClient)
|
||||
monkeypatch.setattr(state_mod, "DownloadManager", FakeDownloadManager)
|
||||
monkeypatch.setattr(state_mod, "PlaybackController", FakePlayback)
|
||||
state_mod.init_auditui_state(app, auth=auth, client=client)
|
||||
assert isinstance(app.library_client, FakeLibraryClient)
|
||||
assert isinstance(app.download_manager, FakeDownloadManager)
|
||||
assert isinstance(app.playback, FakePlayback)
|
||||
assert app.library_client.value is client
|
||||
assert app.download_manager.args == (auth, client)
|
||||
assert app.playback.library_client.value is client
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from types import ModuleType
|
||||
from typing import Any, cast
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
@@ -15,21 +16,26 @@ try:
|
||||
except ModuleNotFoundError:
|
||||
audible_stub = ModuleType("audible")
|
||||
|
||||
class Authenticator: # minimal stub for type usage
|
||||
class Authenticator:
|
||||
"""Minimal audible authenticator test stub."""
|
||||
|
||||
pass
|
||||
|
||||
class Client: # minimal stub for type usage
|
||||
class Client:
|
||||
"""Minimal audible client test stub."""
|
||||
|
||||
pass
|
||||
|
||||
audible_stub.Authenticator = Authenticator
|
||||
audible_stub.Client = Client
|
||||
setattr(cast(Any, audible_stub), "Authenticator", Authenticator)
|
||||
setattr(cast(Any, audible_stub), "Client", Client)
|
||||
|
||||
activation_bytes = ModuleType("audible.activation_bytes")
|
||||
|
||||
def get_activation_bytes(_auth: Authenticator | None = None) -> bytes:
|
||||
"""Return deterministic empty activation bytes for tests."""
|
||||
return b""
|
||||
|
||||
activation_bytes.get_activation_bytes = get_activation_bytes
|
||||
setattr(cast(Any, activation_bytes), "get_activation_bytes", get_activation_bytes)
|
||||
|
||||
sys.modules["audible"] = audible_stub
|
||||
sys.modules["audible.activation_bytes"] = activation_bytes
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from auditui.constants import MIN_FILE_SIZE
|
||||
from auditui.downloads import DownloadManager
|
||||
|
||||
|
||||
def _manager_with_cache_dir(tmp_path: Path) -> DownloadManager:
|
||||
"""Build a lightweight DownloadManager instance without real HTTP clients."""
|
||||
manager = DownloadManager.__new__(DownloadManager)
|
||||
manager.cache_dir = tmp_path
|
||||
manager.chunk_size = 1024
|
||||
return manager
|
||||
|
||||
|
||||
def test_sanitize_filename_replaces_invalid_characters() -> None:
|
||||
"""Ensure filesystem-invalid symbols are replaced with underscores."""
|
||||
manager = DownloadManager.__new__(DownloadManager)
|
||||
assert manager._sanitize_filename('a<>:"/\\|?*b') == "a_________b"
|
||||
|
||||
|
||||
def test_validate_download_url_accepts_only_http_schemes() -> None:
|
||||
"""Ensure download URL validation only accepts HTTP and HTTPS links."""
|
||||
manager = DownloadManager.__new__(DownloadManager)
|
||||
assert manager._validate_download_url("https://example.com/file") is True
|
||||
assert manager._validate_download_url("http://example.com/file") is True
|
||||
assert manager._validate_download_url("ftp://example.com/file") is False
|
||||
|
||||
|
||||
def test_get_cached_path_and_remove_cached(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Ensure cache lookup and cache deletion work for valid files."""
|
||||
manager = _manager_with_cache_dir(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book")
|
||||
cached_path = tmp_path / "My Book.aax"
|
||||
cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
|
||||
messages: list[str] = []
|
||||
assert manager.get_cached_path("ASIN123") == cached_path
|
||||
assert manager.is_cached("ASIN123") is True
|
||||
assert manager.remove_cached("ASIN123", notify=messages.append) is True
|
||||
assert not cached_path.exists()
|
||||
assert "Removed from cache" in messages[-1]
|
||||
|
||||
|
||||
def test_get_cached_path_ignores_small_files(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Ensure undersized files are not treated as valid cache entries."""
|
||||
manager = _manager_with_cache_dir(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book")
|
||||
cached_path = tmp_path / "My Book.aax"
|
||||
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
|
||||
assert manager.get_cached_path("ASIN123") is None
|
||||
85
tests/downloads/test_download_manager_workflow.py
Normal file
85
tests/downloads/test_download_manager_workflow.py
Normal file
@@ -0,0 +1,85 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from auditui.constants import MIN_FILE_SIZE
|
||||
from auditui.downloads import DownloadManager
|
||||
from auditui.downloads import manager as manager_mod
|
||||
|
||||
|
||||
def _bare_manager(tmp_path: Path) -> DownloadManager:
|
||||
"""Create manager without invoking constructor side effects."""
|
||||
manager = DownloadManager.__new__(DownloadManager)
|
||||
manager.cache_dir = tmp_path
|
||||
manager.chunk_size = 1024
|
||||
manager.auth = type(
|
||||
"Auth", (), {"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()}
|
||||
)()
|
||||
return manager
|
||||
|
||||
|
||||
def test_get_activation_bytes_returns_hex(
|
||||
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
|
||||
) -> None:
|
||||
"""Ensure activation bytes are converted to lowercase hex string."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
monkeypatch.setattr(manager_mod, "get_activation_bytes", lambda _auth: b"\xde\xad")
|
||||
assert manager.get_activation_bytes() == "dead"
|
||||
|
||||
|
||||
def test_get_activation_bytes_handles_errors(
|
||||
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
|
||||
) -> None:
|
||||
"""Ensure activation retrieval failures are handled gracefully."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
|
||||
def _boom(_auth: object) -> bytes:
|
||||
"""Raise a deterministic failure for exception-path coverage."""
|
||||
raise OSError("no auth")
|
||||
|
||||
monkeypatch.setattr(manager_mod, "get_activation_bytes", _boom)
|
||||
assert manager.get_activation_bytes() is None
|
||||
|
||||
|
||||
def test_get_or_download_uses_cached_file_when_available(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Ensure cached files bypass link generation and download work."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
|
||||
cached_path = tmp_path / "Book.aax"
|
||||
cached_path.write_bytes(b"1" * MIN_FILE_SIZE)
|
||||
messages: list[str] = []
|
||||
assert manager.get_or_download("ASIN", notify=messages.append) == cached_path
|
||||
assert "Using cached file" in messages[0]
|
||||
|
||||
|
||||
def test_get_or_download_reports_invalid_url(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Ensure workflow reports invalid download URLs and aborts."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
|
||||
monkeypatch.setattr(
|
||||
manager, "_get_download_link", lambda asin, notify=None: "ftp://bad"
|
||||
)
|
||||
messages: list[str] = []
|
||||
assert manager.get_or_download("ASIN", notify=messages.append) is None
|
||||
assert "Invalid download URL" in messages
|
||||
|
||||
|
||||
def test_get_or_download_handles_download_failure(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Ensure workflow reports failures when stream download does not complete."""
|
||||
manager = _bare_manager(tmp_path)
|
||||
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
|
||||
monkeypatch.setattr(
|
||||
manager, "_get_download_link", lambda asin, notify=None: "https://ok"
|
||||
)
|
||||
monkeypatch.setattr(manager, "_download_file", lambda url, path, notify=None: None)
|
||||
messages: list[str] = []
|
||||
assert manager.get_or_download("ASIN", notify=messages.append) is None
|
||||
assert "Download failed" in messages
|
||||
111
tests/library/test_library_client_extractors.py
Normal file
111
tests/library/test_library_client_extractors.py
Normal file
@@ -0,0 +1,111 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from auditui.library import LibraryClient
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MockClient:
|
||||
"""Client double that records writes and serves configurable responses."""
|
||||
|
||||
put_calls: list[tuple[str, dict]] = field(default_factory=list)
|
||||
post_calls: list[tuple[str, dict]] = field(default_factory=list)
|
||||
_post_response: dict = field(default_factory=dict)
|
||||
raise_on_put: bool = False
|
||||
|
||||
def put(self, path: str, body: dict) -> dict:
|
||||
"""Record put payload or raise when configured."""
|
||||
if self.raise_on_put:
|
||||
raise RuntimeError("put failed")
|
||||
self.put_calls.append((path, body))
|
||||
return {}
|
||||
|
||||
def post(self, path: str, body: dict) -> dict:
|
||||
"""Record post payload and return configured response."""
|
||||
self.post_calls.append((path, body))
|
||||
return self._post_response
|
||||
|
||||
def get(self, path: str, **kwargs: dict) -> dict:
|
||||
"""Return empty data for extractor-focused tests."""
|
||||
del path, kwargs
|
||||
return {}
|
||||
|
||||
|
||||
def build_item(
|
||||
*,
|
||||
title: str | None = None,
|
||||
product_title: str | None = None,
|
||||
authors: list[dict] | None = None,
|
||||
runtime_min: int | None = None,
|
||||
listening_status: dict | None = None,
|
||||
percent_complete: int | float | None = None,
|
||||
asin: str | None = None,
|
||||
) -> dict:
|
||||
"""Construct synthetic library items for extractor and finish tests."""
|
||||
item: dict = {}
|
||||
if title is not None:
|
||||
item["title"] = title
|
||||
if percent_complete is not None:
|
||||
item["percent_complete"] = percent_complete
|
||||
if listening_status is not None:
|
||||
item["listening_status"] = listening_status
|
||||
if asin is not None:
|
||||
item["asin"] = asin
|
||||
product: dict = {}
|
||||
if product_title is not None:
|
||||
product["title"] = product_title
|
||||
if runtime_min is not None:
|
||||
product["runtime_length"] = {"min": runtime_min}
|
||||
if authors is not None:
|
||||
product["authors"] = authors
|
||||
if asin is not None:
|
||||
product["asin"] = asin
|
||||
if product:
|
||||
item["product"] = product
|
||||
if runtime_min is not None:
|
||||
item["runtime_length_min"] = runtime_min
|
||||
return item
|
||||
|
||||
|
||||
def test_extract_title_prefers_product_title() -> None:
|
||||
"""Ensure product title has precedence over outer item title."""
|
||||
library = LibraryClient(MockClient()) # type: ignore[arg-type]
|
||||
assert (
|
||||
library.extract_title(build_item(title="Outer", product_title="Inner"))
|
||||
== "Inner"
|
||||
)
|
||||
|
||||
|
||||
def test_extract_title_falls_back_to_asin() -> None:
|
||||
"""Ensure title fallback uses product ASIN when no title exists."""
|
||||
library = LibraryClient(MockClient()) # type: ignore[arg-type]
|
||||
assert library.extract_title({"product": {"asin": "A1"}}) == "A1"
|
||||
|
||||
|
||||
def test_extract_authors_joins_names() -> None:
|
||||
"""Ensure author dictionaries are converted to a readable list."""
|
||||
library = LibraryClient(MockClient()) # type: ignore[arg-type]
|
||||
item = build_item(authors=[{"name": "A"}, {"name": "B"}])
|
||||
assert library.extract_authors(item) == "A, B"
|
||||
|
||||
|
||||
def test_extract_runtime_minutes_handles_dict_and_number() -> None:
|
||||
"""Ensure runtime extraction supports dict and numeric payloads."""
|
||||
library = LibraryClient(MockClient()) # type: ignore[arg-type]
|
||||
assert library.extract_runtime_minutes(build_item(runtime_min=12)) == 12
|
||||
assert library.extract_runtime_minutes({"runtime_length": 42}) == 42
|
||||
|
||||
|
||||
def test_extract_progress_info_prefers_listening_status_when_needed() -> None:
|
||||
"""Ensure progress can be sourced from listening_status when top-level is absent."""
|
||||
library = LibraryClient(MockClient()) # type: ignore[arg-type]
|
||||
item = build_item(listening_status={"percent_complete": 25.0})
|
||||
assert library.extract_progress_info(item) == 25.0
|
||||
|
||||
|
||||
def test_extract_asin_prefers_item_then_product() -> None:
|
||||
"""Ensure ASIN extraction works from both item and product fields."""
|
||||
library = LibraryClient(MockClient()) # type: ignore[arg-type]
|
||||
assert library.extract_asin(build_item(asin="ASIN1")) == "ASIN1"
|
||||
assert library.extract_asin({"product": {"asin": "ASIN2"}}) == "ASIN2"
|
||||
103
tests/library/test_library_client_progress_updates.py
Normal file
103
tests/library/test_library_client_progress_updates.py
Normal file
@@ -0,0 +1,103 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from auditui.library import LibraryClient
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ProgressClient:
|
||||
"""Client double for position and finished-state API methods."""
|
||||
|
||||
get_responses: dict[str, dict] = field(default_factory=dict)
|
||||
put_calls: list[tuple[str, dict]] = field(default_factory=list)
|
||||
post_response: dict = field(default_factory=dict)
|
||||
fail_put: bool = False
|
||||
|
||||
def get(self, path: str, **kwargs: object) -> dict:
|
||||
"""Return preconfigured payloads by API path."""
|
||||
del kwargs
|
||||
return self.get_responses.get(path, {})
|
||||
|
||||
def put(self, path: str, body: dict) -> dict:
|
||||
"""Record payloads or raise to exercise error handling."""
|
||||
if self.fail_put:
|
||||
raise OSError("write failed")
|
||||
self.put_calls.append((path, body))
|
||||
return {}
|
||||
|
||||
def post(self, path: str, body: dict) -> dict:
|
||||
"""Return licenserequest response for ACR extraction."""
|
||||
del path, body
|
||||
return self.post_response
|
||||
|
||||
|
||||
def test_is_finished_true_from_percent_complete() -> None:
|
||||
"""Ensure 100 percent completion is treated as finished."""
|
||||
library = LibraryClient(ProgressClient()) # type: ignore[arg-type]
|
||||
assert library.is_finished({"percent_complete": 100}) is True
|
||||
|
||||
|
||||
def test_get_last_position_reads_matching_annotation() -> None:
|
||||
"""Ensure last position is read in seconds from matching annotation."""
|
||||
client = ProgressClient(
|
||||
get_responses={
|
||||
"1.0/annotations/lastpositions": {
|
||||
"asin_last_position_heard_annots": [
|
||||
{"asin": "X", "last_position_heard": {"position_ms": 9000}}
|
||||
]
|
||||
}
|
||||
}
|
||||
)
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
assert library.get_last_position("X") == 9.0
|
||||
|
||||
|
||||
def test_get_last_position_returns_none_for_missing_state() -> None:
|
||||
"""Ensure DoesNotExist status is surfaced as no saved position."""
|
||||
client = ProgressClient(
|
||||
get_responses={
|
||||
"1.0/annotations/lastpositions": {
|
||||
"asin_last_position_heard_annots": [
|
||||
{"asin": "X", "last_position_heard": {"status": "DoesNotExist"}}
|
||||
]
|
||||
}
|
||||
}
|
||||
)
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
assert library.get_last_position("X") is None
|
||||
|
||||
|
||||
def test_save_last_position_validates_non_positive_values() -> None:
|
||||
"""Ensure save_last_position short-circuits on non-positive input."""
|
||||
library = LibraryClient(ProgressClient()) # type: ignore[arg-type]
|
||||
assert library.save_last_position("A", 0) is False
|
||||
|
||||
|
||||
def test_update_position_writes_version_when_available() -> None:
|
||||
"""Ensure version is included in payload when metadata provides it."""
|
||||
client = ProgressClient(
|
||||
get_responses={
|
||||
"1.0/content/A/metadata": {
|
||||
"content_metadata": {
|
||||
"content_reference": {"acr": "token", "version": "2"}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
assert library._update_position("A", 5.5) is True
|
||||
path, body = client.put_calls[0]
|
||||
assert path == "1.0/lastpositions/A"
|
||||
assert body["position_ms"] == 5500
|
||||
assert body["version"] == "2"
|
||||
|
||||
|
||||
def test_mark_as_finished_updates_item_in_place() -> None:
|
||||
"""Ensure successful finish update mutates local item flags."""
|
||||
client = ProgressClient(post_response={"content_license": {"acr": "token"}})
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
item = {"runtime_length_min": 1, "listening_status": {}}
|
||||
assert library.mark_as_finished("ASIN", item) is True
|
||||
assert item["is_finished"] is True
|
||||
assert item["listening_status"]["is_finished"] is True
|
||||
34
tests/library/test_library_search_filters.py
Normal file
34
tests/library/test_library_search_filters.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.library import build_search_text, filter_items
|
||||
|
||||
|
||||
class SearchLibrary:
|
||||
"""Simple search extraction adapter for build_search_text tests."""
|
||||
|
||||
def extract_title(self, item: dict) -> str:
|
||||
"""Return a title value from a synthetic item."""
|
||||
return item.get("t", "")
|
||||
|
||||
def extract_authors(self, item: dict) -> str:
|
||||
"""Return an author value from a synthetic item."""
|
||||
return item.get("a", "")
|
||||
|
||||
|
||||
def test_build_search_text_uses_library_client_when_present() -> None:
|
||||
"""Ensure search text delegates to library extractor methods."""
|
||||
item = {"t": "The Book", "a": "The Author"}
|
||||
assert build_search_text(item, SearchLibrary()) == "the book the author"
|
||||
|
||||
|
||||
def test_filter_items_returns_input_when_filter_empty() -> None:
|
||||
"""Ensure empty filter bypasses per-item search callback evaluation."""
|
||||
items = [{"k": 1}, {"k": 2}]
|
||||
assert filter_items(items, "", lambda _item: "ignored") == items
|
||||
|
||||
|
||||
def test_filter_items_matches_case_insensitively() -> None:
|
||||
"""Ensure search matching is case-insensitive across computed text."""
|
||||
items = [{"name": "Alpha"}, {"name": "Beta"}]
|
||||
result = filter_items(items, "BETA", lambda item: item["name"].lower())
|
||||
assert result == [items[1]]
|
||||
99
tests/library/test_library_table_formatting.py
Normal file
99
tests/library/test_library_table_formatting.py
Normal file
@@ -0,0 +1,99 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, cast
|
||||
|
||||
from auditui.constants import AUTHOR_NAME_MAX_LENGTH
|
||||
from auditui.library import (
|
||||
create_progress_sort_key,
|
||||
create_title_sort_key,
|
||||
filter_unfinished_items,
|
||||
format_item_as_row,
|
||||
truncate_author_name,
|
||||
)
|
||||
|
||||
|
||||
class StubLibrary:
|
||||
"""Library facade exposing only helpers needed by table formatting code."""
|
||||
|
||||
def extract_title(self, item: dict) -> str:
|
||||
"""Return synthetic title value."""
|
||||
return item.get("title", "")
|
||||
|
||||
def extract_authors(self, item: dict) -> str:
|
||||
"""Return synthetic authors value."""
|
||||
return item.get("authors", "")
|
||||
|
||||
def extract_runtime_minutes(self, item: dict) -> int | None:
|
||||
"""Return synthetic minute duration."""
|
||||
return item.get("minutes")
|
||||
|
||||
def format_duration(
|
||||
self, value: int | None, unit: str = "minutes", default_none: str | None = None
|
||||
) -> str | None:
|
||||
"""Render runtime in compact minute format for tests."""
|
||||
del unit
|
||||
return default_none if value is None else f"{value}m"
|
||||
|
||||
def extract_progress_info(self, item: dict) -> float | None:
|
||||
"""Return synthetic progress percentage value."""
|
||||
return item.get("percent")
|
||||
|
||||
def extract_asin(self, item: dict) -> str | None:
|
||||
"""Return synthetic ASIN value."""
|
||||
return item.get("asin")
|
||||
|
||||
def is_finished(self, item: dict) -> bool:
|
||||
"""Return synthetic finished flag from the item."""
|
||||
return bool(item.get("finished"))
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class StubDownloads:
|
||||
"""Download cache adapter exposing just is_cached."""
|
||||
|
||||
cached: set[str]
|
||||
|
||||
def is_cached(self, asin: str) -> bool:
|
||||
"""Return whether an ASIN is cached."""
|
||||
return asin in self.cached
|
||||
|
||||
|
||||
def test_create_title_sort_key_normalizes_accents() -> None:
|
||||
"""Ensure title sorting removes accents before case-fold compare."""
|
||||
key_fn, _ = create_title_sort_key()
|
||||
assert key_fn(["Ecole"]) == key_fn(["École"])
|
||||
|
||||
|
||||
def test_create_progress_sort_key_parses_percent_strings() -> None:
|
||||
"""Ensure progress sorting converts percentages and handles invalid values."""
|
||||
key_fn, _ = create_progress_sort_key()
|
||||
assert key_fn(["0", "0", "0", "42.5%", ""]) == 42.5
|
||||
assert key_fn(["0", "0", "0", "bad", ""]) == 0.0
|
||||
|
||||
|
||||
def test_truncate_author_name_clamps_long_values() -> None:
|
||||
"""Ensure very long author strings are shortened with ellipsis."""
|
||||
long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5)
|
||||
out = truncate_author_name(long_name)
|
||||
assert out.endswith("...")
|
||||
assert len(out) <= AUTHOR_NAME_MAX_LENGTH
|
||||
|
||||
|
||||
def test_format_item_as_row_marks_downloaded_titles() -> None:
|
||||
"""Ensure downloaded ASINs are shown with a checkmark in table rows."""
|
||||
item = {
|
||||
"title": "Title",
|
||||
"authors": "Author",
|
||||
"minutes": 90,
|
||||
"percent": 12.34,
|
||||
"asin": "A1",
|
||||
}
|
||||
row = format_item_as_row(item, StubLibrary(), cast(Any, StubDownloads({"A1"})))
|
||||
assert row == ("Title", "Author", "90m", "12.3%", "✓")
|
||||
|
||||
|
||||
def test_filter_unfinished_items_keeps_only_incomplete() -> None:
|
||||
"""Ensure unfinished filter excludes items marked as finished."""
|
||||
items = [{"id": 1, "finished": False}, {"id": 2, "finished": True}]
|
||||
assert filter_unfinished_items(items, StubLibrary()) == [items[0]]
|
||||
34
tests/playback/test_playback_chapter_selection.py
Normal file
34
tests/playback/test_playback_chapter_selection.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.playback.chapters import get_current_chapter, get_current_chapter_index
|
||||
|
||||
|
||||
CHAPTERS = [
|
||||
{"title": "One", "start_time": 0.0, "end_time": 60.0},
|
||||
{"title": "Two", "start_time": 60.0, "end_time": 120.0},
|
||||
]
|
||||
|
||||
|
||||
def test_get_current_chapter_handles_empty_chapter_list() -> None:
|
||||
"""Ensure empty chapter metadata still returns a sensible fallback row."""
|
||||
assert get_current_chapter(12.0, [], 300.0) == ("Unknown Chapter", 12.0, 300.0)
|
||||
|
||||
|
||||
def test_get_current_chapter_returns_matching_chapter_window() -> None:
|
||||
"""Ensure chapter selection returns title and chapter-relative timing."""
|
||||
assert get_current_chapter(75.0, CHAPTERS, 120.0) == ("Two", 15.0, 60.0)
|
||||
|
||||
|
||||
def test_get_current_chapter_falls_back_to_last_chapter() -> None:
|
||||
"""Ensure elapsed values past known ranges map to last chapter."""
|
||||
assert get_current_chapter(150.0, CHAPTERS, 200.0) == ("Two", 90.0, 60.0)
|
||||
|
||||
|
||||
def test_get_current_chapter_index_returns_none_without_chapters() -> None:
|
||||
"""Ensure chapter index lookup returns None when no chapters exist."""
|
||||
assert get_current_chapter_index(10.0, []) is None
|
||||
|
||||
|
||||
def test_get_current_chapter_index_returns_last_when_past_end() -> None:
|
||||
"""Ensure chapter index lookup falls back to the final chapter index."""
|
||||
assert get_current_chapter_index(200.0, CHAPTERS) == 1
|
||||
121
tests/playback/test_playback_controller_lifecycle_mixin.py
Normal file
121
tests/playback/test_playback_controller_lifecycle_mixin.py
Normal file
@@ -0,0 +1,121 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from auditui.playback import controller_lifecycle as lifecycle_mod
|
||||
from auditui.playback.controller import PlaybackController
|
||||
|
||||
|
||||
class Proc:
|
||||
"""Process shim used for lifecycle tests."""
|
||||
|
||||
def __init__(self, poll_value=None) -> None:
|
||||
"""Set initial poll result."""
|
||||
self._poll_value = poll_value
|
||||
|
||||
def poll(self):
|
||||
"""Return process running status."""
|
||||
return self._poll_value
|
||||
|
||||
|
||||
def _controller() -> tuple[PlaybackController, list[str]]:
|
||||
"""Build controller and message capture list."""
|
||||
messages: list[str] = []
|
||||
return PlaybackController(messages.append, None), messages
|
||||
|
||||
|
||||
def test_start_reports_missing_ffplay(monkeypatch) -> None:
|
||||
"""Ensure start fails fast when ffplay is unavailable."""
|
||||
controller, messages = _controller()
|
||||
monkeypatch.setattr(lifecycle_mod.process_mod, "is_ffplay_available", lambda: False)
|
||||
assert controller.start(Path("book.aax")) is False
|
||||
assert messages[-1] == "ffplay not found. Please install ffmpeg"
|
||||
|
||||
|
||||
def test_start_sets_state_on_success(monkeypatch) -> None:
|
||||
"""Ensure successful start initializes playback state and metadata."""
|
||||
controller, messages = _controller()
|
||||
monkeypatch.setattr(lifecycle_mod.process_mod, "is_ffplay_available", lambda: True)
|
||||
monkeypatch.setattr(
|
||||
lifecycle_mod.process_mod, "build_ffplay_cmd", lambda *args: ["ffplay"]
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
lifecycle_mod.process_mod, "run_ffplay", lambda cmd: (Proc(None), None)
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
lifecycle_mod,
|
||||
"load_media_info",
|
||||
lambda path, activation: (600.0, [{"title": "ch"}]),
|
||||
)
|
||||
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 100.0)
|
||||
ok = controller.start(
|
||||
Path("book.aax"), activation_hex="abcd", start_position=10.0, speed=1.2
|
||||
)
|
||||
assert ok is True
|
||||
assert controller.is_playing is True
|
||||
assert controller.current_file_path == Path("book.aax")
|
||||
assert controller.total_duration == 600.0
|
||||
assert messages[-1] == "Playing: book.aax"
|
||||
|
||||
|
||||
def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
|
||||
"""Ensure prepare flow resumes from saved position when available."""
|
||||
messages: list[str] = []
|
||||
lib = type("Lib", (), {"get_last_position": lambda self, asin: 75.0})()
|
||||
controller = PlaybackController(messages.append, lib)
|
||||
started: list[tuple] = []
|
||||
|
||||
class DM:
|
||||
"""Download manager shim returning path and activation token."""
|
||||
|
||||
def get_or_download(self, asin, notify):
|
||||
"""Return deterministic downloaded file path."""
|
||||
return Path("book.aax")
|
||||
|
||||
def get_activation_bytes(self):
|
||||
"""Return deterministic activation token."""
|
||||
return "abcd"
|
||||
|
||||
monkeypatch.setattr(controller, "start", lambda *args: started.append(args) or True)
|
||||
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 200.0)
|
||||
assert controller.prepare_and_start(DM(), "ASIN") is True
|
||||
assert started and started[0][3] == 75.0
|
||||
assert "Resuming from 01:15" in messages
|
||||
|
||||
|
||||
def test_toggle_playback_uses_pause_and_resume_paths(monkeypatch) -> None:
|
||||
"""Ensure toggle dispatches pause or resume based on paused flag."""
|
||||
controller, _ = _controller()
|
||||
controller.is_playing = True
|
||||
controller.playback_process = Proc(None)
|
||||
called: list[str] = []
|
||||
monkeypatch.setattr(controller, "pause", lambda: called.append("pause"))
|
||||
monkeypatch.setattr(controller, "resume", lambda: called.append("resume"))
|
||||
controller.is_paused = False
|
||||
assert controller.toggle_playback() is True
|
||||
controller.is_paused = True
|
||||
assert controller.toggle_playback() is True
|
||||
assert called == ["pause", "resume"]
|
||||
|
||||
|
||||
def test_restart_at_position_restores_state_and_notifies(monkeypatch) -> None:
|
||||
"""Ensure restart logic preserves metadata and emits custom message."""
|
||||
controller, messages = _controller()
|
||||
controller.is_playing = True
|
||||
controller.is_paused = True
|
||||
controller.current_file_path = Path("book.aax")
|
||||
controller.current_asin = "ASIN"
|
||||
controller.activation_hex = "abcd"
|
||||
controller.total_duration = 400.0
|
||||
controller.chapters = [{"title": "One"}]
|
||||
controller.playback_speed = 1.0
|
||||
monkeypatch.setattr(controller, "_stop_process", lambda: None)
|
||||
monkeypatch.setattr(lifecycle_mod.time, "sleep", lambda _s: None)
|
||||
monkeypatch.setattr(controller, "start", lambda *args: True)
|
||||
paused: list[str] = []
|
||||
monkeypatch.setattr(controller, "pause", lambda: paused.append("pause"))
|
||||
assert controller._restart_at_position(120.0, message="Jumped") is True
|
||||
assert controller.current_asin == "ASIN"
|
||||
assert controller.chapters == [{"title": "One"}]
|
||||
assert paused == ["pause"]
|
||||
assert messages[-1] == "Jumped"
|
||||
100
tests/playback/test_playback_controller_seek_speed_mixin.py
Normal file
100
tests/playback/test_playback_controller_seek_speed_mixin.py
Normal file
@@ -0,0 +1,100 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.playback import controller_seek_speed as seek_speed_mod
|
||||
from auditui.playback.controller import PlaybackController
|
||||
|
||||
|
||||
def _controller() -> tuple[PlaybackController, list[str]]:
|
||||
"""Build controller and in-memory notification sink."""
|
||||
messages: list[str] = []
|
||||
return PlaybackController(messages.append, None), messages
|
||||
|
||||
|
||||
def test_seek_notifies_when_target_invalid(monkeypatch) -> None:
|
||||
"""Ensure seek reports end-of-file condition when target is invalid."""
|
||||
controller, messages = _controller()
|
||||
monkeypatch.setattr(controller, "_get_current_elapsed", lambda: 20.0)
|
||||
controller.seek_offset = 100.0
|
||||
controller.total_duration = 120.0
|
||||
monkeypatch.setattr(
|
||||
seek_speed_mod.seek_mod, "compute_seek_target", lambda *args: None
|
||||
)
|
||||
assert controller._seek(30.0, "forward") is False
|
||||
assert messages[-1] == "Already at end of file"
|
||||
|
||||
|
||||
def test_seek_to_chapter_reports_bounds(monkeypatch) -> None:
|
||||
"""Ensure chapter seek reports first and last chapter boundaries."""
|
||||
controller, messages = _controller()
|
||||
controller.is_playing = True
|
||||
controller.current_file_path = object()
|
||||
controller.chapters = [{"title": "One", "start_time": 0.0, "end_time": 10.0}]
|
||||
monkeypatch.setattr(controller, "_get_current_elapsed", lambda: 1.0)
|
||||
monkeypatch.setattr(
|
||||
seek_speed_mod.chapters_mod,
|
||||
"get_current_chapter_index",
|
||||
lambda elapsed, chapters: 0,
|
||||
)
|
||||
assert controller.seek_to_chapter("next") is False
|
||||
assert messages[-1] == "Already at last chapter"
|
||||
assert controller.seek_to_chapter("previous") is False
|
||||
assert messages[-1] == "Already at first chapter"
|
||||
|
||||
|
||||
def test_save_current_position_writes_positive_values() -> None:
|
||||
"""Ensure save_current_position persists elapsed time via library client."""
|
||||
calls: list[tuple[str, float]] = []
|
||||
library = type(
|
||||
"Library",
|
||||
(),
|
||||
{"save_last_position": lambda self, asin, pos: calls.append((asin, pos))},
|
||||
)()
|
||||
controller = PlaybackController(lambda _msg: None, library)
|
||||
controller.current_asin = "ASIN"
|
||||
controller.is_playing = True
|
||||
controller.playback_start_time = 1.0
|
||||
controller.seek_offset = 10.0
|
||||
controller._get_current_elapsed = lambda: 15.0
|
||||
controller._save_current_position()
|
||||
assert calls == [("ASIN", 25.0)]
|
||||
|
||||
|
||||
def test_update_position_if_needed_honors_interval(monkeypatch) -> None:
|
||||
"""Ensure periodic save runs only when interval has elapsed."""
|
||||
controller, _ = _controller()
|
||||
controller.is_playing = True
|
||||
controller.current_asin = "ASIN"
|
||||
controller.library_client = object()
|
||||
controller.last_save_time = 10.0
|
||||
controller.position_save_interval = 30.0
|
||||
saves: list[str] = []
|
||||
monkeypatch.setattr(
|
||||
controller, "_save_current_position", lambda: saves.append("save")
|
||||
)
|
||||
monkeypatch.setattr(seek_speed_mod.time, "time", lambda: 20.0)
|
||||
controller.update_position_if_needed()
|
||||
monkeypatch.setattr(seek_speed_mod.time, "time", lambda: 45.0)
|
||||
controller.update_position_if_needed()
|
||||
assert saves == ["save"]
|
||||
|
||||
|
||||
def test_change_speed_restarts_with_new_rate(monkeypatch) -> None:
|
||||
"""Ensure speed changes restart playback at current position."""
|
||||
controller, _ = _controller()
|
||||
controller.playback_speed = 1.0
|
||||
controller.seek_offset = 5.0
|
||||
monkeypatch.setattr(controller, "_get_current_elapsed", lambda: 10.0)
|
||||
seen: list[tuple[float, float, str]] = []
|
||||
|
||||
def fake_restart(
|
||||
position: float, speed: float | None = None, message: str | None = None
|
||||
) -> bool:
|
||||
"""Capture restart call parameters."""
|
||||
seen.append((position, speed or 0.0, message or ""))
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(controller, "_restart_at_position", fake_restart)
|
||||
assert controller.increase_speed() is True
|
||||
assert seen and seen[0][0] == 15.0
|
||||
assert seen[0][1] > 1.0
|
||||
assert seen[0][2].startswith("Speed: ")
|
||||
76
tests/playback/test_playback_controller_state_mixin.py
Normal file
76
tests/playback/test_playback_controller_state_mixin.py
Normal file
@@ -0,0 +1,76 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, cast
|
||||
|
||||
from auditui.playback import controller_state as state_mod
|
||||
from auditui.playback.controller import PlaybackController
|
||||
|
||||
|
||||
class Proc:
|
||||
"""Simple process shim exposing poll and pid for state tests."""
|
||||
|
||||
def __init__(self, poll_value=None) -> None:
|
||||
"""Store poll return value and fake pid."""
|
||||
self._poll_value = poll_value
|
||||
self.pid = 123
|
||||
|
||||
def poll(self):
|
||||
"""Return configured process status code or None."""
|
||||
return self._poll_value
|
||||
|
||||
|
||||
def _controller() -> tuple[PlaybackController, list[str]]:
|
||||
"""Build playback controller and collected notifications list."""
|
||||
messages: list[str] = []
|
||||
return PlaybackController(messages.append, None), messages
|
||||
|
||||
|
||||
def test_get_current_elapsed_rolls_pause_into_duration(monkeypatch) -> None:
|
||||
"""Ensure elapsed helper absorbs stale pause_start_time when resumed."""
|
||||
controller, _ = _controller()
|
||||
controller.pause_start_time = 100.0
|
||||
controller.is_paused = False
|
||||
monkeypatch.setattr(state_mod.time, "time", lambda: 120.0)
|
||||
monkeypatch.setattr(state_mod.elapsed_mod, "get_elapsed", lambda *args: 50.0)
|
||||
assert controller._get_current_elapsed() == 50.0
|
||||
assert controller.paused_duration == 20.0
|
||||
assert controller.pause_start_time is None
|
||||
|
||||
|
||||
def test_validate_playback_state_stops_when_process_ended() -> None:
|
||||
"""Ensure state validation stops and reports when process is gone."""
|
||||
controller, messages = _controller()
|
||||
controller.playback_process = cast(Any, Proc(poll_value=1))
|
||||
controller.is_playing = True
|
||||
controller.current_file_path = Path("book.aax")
|
||||
ok = controller._validate_playback_state(require_paused=False)
|
||||
assert ok is False
|
||||
assert messages[-1] == "Playback process has ended"
|
||||
|
||||
|
||||
def test_send_signal_sets_paused_state_and_notifies(monkeypatch) -> None:
|
||||
"""Ensure SIGSTOP updates paused state and includes filename in status."""
|
||||
controller, messages = _controller()
|
||||
controller.playback_process = cast(Any, Proc())
|
||||
controller.current_file_path = Path("book.aax")
|
||||
monkeypatch.setattr(state_mod.process_mod, "send_signal", lambda proc, sig: None)
|
||||
controller._send_signal(state_mod.signal.SIGSTOP, "Paused", "pause")
|
||||
assert controller.is_paused is True
|
||||
assert messages[-1] == "Paused: book.aax"
|
||||
|
||||
|
||||
def test_send_signal_handles_process_lookup(monkeypatch) -> None:
|
||||
"""Ensure missing process lookup errors are handled with user-facing message."""
|
||||
controller, messages = _controller()
|
||||
controller.playback_process = cast(Any, Proc())
|
||||
|
||||
def raise_lookup(proc, sig):
|
||||
"""Raise process lookup error to exercise exception path."""
|
||||
del proc, sig
|
||||
raise ProcessLookupError("gone")
|
||||
|
||||
monkeypatch.setattr(state_mod.process_mod, "send_signal", raise_lookup)
|
||||
monkeypatch.setattr(state_mod.process_mod, "terminate_process", lambda proc: None)
|
||||
controller._send_signal(state_mod.signal.SIGCONT, "Playing", "resume")
|
||||
assert messages[-1] == "Process no longer exists"
|
||||
21
tests/playback/test_playback_elapsed_math.py
Normal file
21
tests/playback/test_playback_elapsed_math.py
Normal file
@@ -0,0 +1,21 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.playback.elapsed import get_elapsed
|
||||
from auditui.playback import elapsed as elapsed_mod
|
||||
|
||||
|
||||
def test_get_elapsed_returns_zero_without_start_time() -> None:
|
||||
"""Ensure elapsed computation returns zero when playback has not started."""
|
||||
assert get_elapsed(None, None, 0.0, False) == 0.0
|
||||
|
||||
|
||||
def test_get_elapsed_while_paused_uses_pause_start(monkeypatch) -> None:
|
||||
"""Ensure paused elapsed is fixed at pause_start minus previous pauses."""
|
||||
monkeypatch.setattr(elapsed_mod.time, "time", lambda: 500.0)
|
||||
assert get_elapsed(100.0, 250.0, 20.0, True) == 130.0
|
||||
|
||||
|
||||
def test_get_elapsed_subtracts_pause_duration_when_resumed(monkeypatch) -> None:
|
||||
"""Ensure resumed elapsed removes newly accumulated paused duration."""
|
||||
monkeypatch.setattr(elapsed_mod.time, "time", lambda: 400.0)
|
||||
assert get_elapsed(100.0, 300.0, 10.0, False) == 190.0
|
||||
67
tests/playback/test_playback_process_helpers.py
Normal file
67
tests/playback/test_playback_process_helpers.py
Normal file
@@ -0,0 +1,67 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from auditui.playback import process as process_mod
|
||||
|
||||
|
||||
class DummyProc:
|
||||
"""Minimal subprocess-like object for terminate_process tests."""
|
||||
|
||||
def __init__(self, alive: bool = True) -> None:
|
||||
"""Initialize process state and bookkeeping flags."""
|
||||
self._alive = alive
|
||||
self.terminated = False
|
||||
self.killed = False
|
||||
self.pid = 123
|
||||
|
||||
def poll(self) -> int | None:
|
||||
"""Return None while process is alive and 0 when stopped."""
|
||||
return None if self._alive else 0
|
||||
|
||||
def terminate(self) -> None:
|
||||
"""Mark process as terminated and no longer alive."""
|
||||
self.terminated = True
|
||||
self._alive = False
|
||||
|
||||
def wait(self, timeout: float | None = None) -> int:
|
||||
"""Return immediately to emulate a cooperative shutdown."""
|
||||
del timeout
|
||||
return 0
|
||||
|
||||
def kill(self) -> None:
|
||||
"""Mark process as killed and no longer alive."""
|
||||
self.killed = True
|
||||
self._alive = False
|
||||
|
||||
|
||||
def test_build_ffplay_cmd_includes_activation_seek_and_speed() -> None:
|
||||
"""Ensure ffplay command includes optional playback arguments when set."""
|
||||
cmd = process_mod.build_ffplay_cmd(Path("book.aax"), "abcd", 12.5, 1.2)
|
||||
assert "-activation_bytes" in cmd
|
||||
assert "-ss" in cmd
|
||||
assert "atempo=1.20" in " ".join(cmd)
|
||||
|
||||
|
||||
def test_terminate_process_handles_alive_process() -> None:
|
||||
"""Ensure terminate_process gracefully shuts down a running process."""
|
||||
proc = DummyProc(alive=True)
|
||||
process_mod.terminate_process(proc) # type: ignore[arg-type]
|
||||
assert proc.terminated is True
|
||||
|
||||
|
||||
def test_run_ffplay_returns_none_when_unavailable(monkeypatch) -> None:
|
||||
"""Ensure ffplay launch exits early when binary is not on PATH."""
|
||||
monkeypatch.setattr(process_mod, "is_ffplay_available", lambda: False)
|
||||
assert process_mod.run_ffplay(["ffplay", "book.aax"]) == (None, None)
|
||||
|
||||
|
||||
def test_send_signal_delegates_to_os_kill(monkeypatch) -> None:
|
||||
"""Ensure send_signal forwards process PID and signal to os.kill."""
|
||||
seen: list[tuple[int, object]] = []
|
||||
monkeypatch.setattr(
|
||||
process_mod.os, "kill", lambda pid, sig: seen.append((pid, sig))
|
||||
)
|
||||
process_mod.send_signal(DummyProc(), process_mod.signal.SIGSTOP) # type: ignore[arg-type]
|
||||
assert seen and seen[0][0] == 123
|
||||
20
tests/playback/test_playback_seek_targets.py
Normal file
20
tests/playback/test_playback_seek_targets.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.playback.seek import compute_seek_target
|
||||
|
||||
|
||||
def test_forward_seek_returns_new_position_and_message() -> None:
|
||||
"""Ensure forward seek computes expected position and status message."""
|
||||
target = compute_seek_target(10.0, 100.0, 30.0, "forward")
|
||||
assert target == (40.0, "Skipped forward 30s")
|
||||
|
||||
|
||||
def test_forward_seek_returns_none_near_end() -> None:
|
||||
"""Ensure seeking too close to end returns an invalid seek result."""
|
||||
assert compute_seek_target(95.0, 100.0, 10.0, "forward") is None
|
||||
|
||||
|
||||
def test_backward_seek_clamps_to_zero() -> None:
|
||||
"""Ensure backward seek cannot go below zero."""
|
||||
target = compute_seek_target(5.0, None, 30.0, "backward")
|
||||
assert target == (0.0, "Skipped backward 30s")
|
||||
54
tests/stats/test_stats_account_data.py
Normal file
54
tests/stats/test_stats_account_data.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.stats.account import (
|
||||
get_account_info,
|
||||
get_country,
|
||||
get_subscription_details,
|
||||
)
|
||||
|
||||
|
||||
class AccountClient:
|
||||
"""Minimal API client returning endpoint-specific account responses."""
|
||||
|
||||
def __init__(self, responses: dict[str, dict]) -> None:
|
||||
"""Store endpoint response map for deterministic tests."""
|
||||
self._responses = responses
|
||||
|
||||
def get(self, path: str, **kwargs: object) -> dict:
|
||||
"""Return configured response and ignore query parameters."""
|
||||
del kwargs
|
||||
return self._responses.get(path, {})
|
||||
|
||||
|
||||
def test_get_account_info_merges_multiple_endpoints() -> None:
|
||||
"""Ensure account info aggregator combines endpoint payload dictionaries."""
|
||||
client = AccountClient(
|
||||
{
|
||||
"1.0/account/information": {"a": 1},
|
||||
"1.0/customer/information": {"b": 2},
|
||||
"1.0/customer/status": {"c": 3},
|
||||
}
|
||||
)
|
||||
assert get_account_info(client) == {"a": 1, "b": 2, "c": 3}
|
||||
|
||||
|
||||
def test_get_subscription_details_uses_known_nested_paths() -> None:
|
||||
"""Ensure first valid subscription_details list entry is returned."""
|
||||
info = {
|
||||
"customer_details": {
|
||||
"subscription": {"subscription_details": [{"name": "Plan"}]}
|
||||
}
|
||||
}
|
||||
assert get_subscription_details(info) == {"name": "Plan"}
|
||||
|
||||
|
||||
def test_get_country_supports_locale_variants() -> None:
|
||||
"""Ensure country extraction supports object, domain, and locale string forms."""
|
||||
auth_country_code = type(
|
||||
"Auth", (), {"locale": type("Loc", (), {"country_code": "us"})()}
|
||||
)()
|
||||
auth_domain = type("Auth", (), {"locale": type("Loc", (), {"domain": "fr"})()})()
|
||||
auth_string = type("Auth", (), {"locale": "en_gb"})()
|
||||
assert get_country(auth_country_code) == "US"
|
||||
assert get_country(auth_domain) == "FR"
|
||||
assert get_country(auth_string) == "GB"
|
||||
67
tests/stats/test_stats_aggregator_output.py
Normal file
67
tests/stats/test_stats_aggregator_output.py
Normal file
@@ -0,0 +1,67 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
|
||||
from auditui.stats.aggregator import StatsAggregator
|
||||
from auditui.stats import aggregator as aggregator_mod
|
||||
|
||||
|
||||
def test_get_stats_returns_empty_without_client() -> None:
|
||||
"""Ensure stats aggregation short-circuits when API client is absent."""
|
||||
aggregator = StatsAggregator(
|
||||
client=None, auth=None, library_client=None, all_items=[]
|
||||
)
|
||||
assert aggregator.get_stats() == []
|
||||
|
||||
|
||||
def test_get_stats_builds_expected_rows(monkeypatch) -> None:
|
||||
"""Ensure aggregator assembles rows from listening, account, and email sources."""
|
||||
monkeypatch.setattr(
|
||||
aggregator_mod.listening_mod, "get_signup_year", lambda _client: 2015
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
aggregator_mod.listening_mod,
|
||||
"get_listening_time",
|
||||
lambda _client, duration, start_date: 120_000 if duration == 1 else 3_600_000,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
aggregator_mod.listening_mod, "get_finished_books_count", lambda _lc, _items: 7
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
aggregator_mod.email_mod,
|
||||
"resolve_email",
|
||||
lambda *args, **kwargs: "user@example.com",
|
||||
)
|
||||
monkeypatch.setattr(aggregator_mod.account_mod, "get_country", lambda _auth: "US")
|
||||
monkeypatch.setattr(
|
||||
aggregator_mod.account_mod,
|
||||
"get_account_info",
|
||||
lambda _client: {
|
||||
"subscription_details": [
|
||||
{
|
||||
"name": "Premium",
|
||||
"next_bill_date": "2026-02-01T00:00:00Z",
|
||||
"next_bill_amount": {
|
||||
"currency_value": "14.95",
|
||||
"currency_code": "USD",
|
||||
},
|
||||
}
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
aggregator = StatsAggregator(
|
||||
client=object(),
|
||||
auth=object(),
|
||||
library_client=object(),
|
||||
all_items=[{}, {}, {}],
|
||||
)
|
||||
stats = dict(aggregator.get_stats(today=date(2026, 2, 1)))
|
||||
assert stats["Email"] == "user@example.com"
|
||||
assert stats["Country Store"] == "US"
|
||||
assert stats["Signup Year"] == "2015"
|
||||
assert stats["Subscription"] == "Premium"
|
||||
assert stats["Price"] == "14.95 USD"
|
||||
assert stats["This Month"] == "2m"
|
||||
assert stats["This Year"] == "1h00"
|
||||
assert stats["Books Finished"] == "7 / 3"
|
||||
64
tests/stats/test_stats_email_resolution.py
Normal file
64
tests/stats/test_stats_email_resolution.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from auditui.stats.email import (
|
||||
find_email_in_data,
|
||||
first_email,
|
||||
get_email_from_account_info,
|
||||
get_email_from_auth,
|
||||
get_email_from_auth_file,
|
||||
get_email_from_config,
|
||||
resolve_email,
|
||||
)
|
||||
|
||||
|
||||
def test_find_email_in_nested_data() -> None:
|
||||
"""Ensure nested structures are scanned until a plausible email is found."""
|
||||
data = {"a": {"b": ["nope", "user@example.com"]}}
|
||||
assert find_email_in_data(data) == "user@example.com"
|
||||
|
||||
|
||||
def test_first_email_skips_unknown_and_none() -> None:
|
||||
"""Ensure first_email ignores empty and Unknown sentinel values."""
|
||||
assert first_email(None, "Unknown", "ok@example.com") == "ok@example.com"
|
||||
|
||||
|
||||
def test_get_email_from_config_and_auth_file(tmp_path: Path) -> None:
|
||||
"""Ensure config and auth-file readers extract valid email fields."""
|
||||
config_path = tmp_path / "config.json"
|
||||
auth_path = tmp_path / "auth.json"
|
||||
config_path.write_text(
|
||||
json.dumps({"email": "config@example.com"}), encoding="utf-8"
|
||||
)
|
||||
auth_path.write_text(json.dumps({"email": "auth@example.com"}), encoding="utf-8")
|
||||
assert get_email_from_config(config_path) == "config@example.com"
|
||||
assert get_email_from_auth_file(auth_path) == "auth@example.com"
|
||||
|
||||
|
||||
def test_get_email_from_auth_prefers_username() -> None:
|
||||
"""Ensure auth object attributes are checked in expected precedence order."""
|
||||
auth = type(
|
||||
"Auth", (), {"username": "user@example.com", "login": None, "email": None}
|
||||
)()
|
||||
assert get_email_from_auth(auth) == "user@example.com"
|
||||
|
||||
|
||||
def test_get_email_from_account_info_supports_nested_customer_info() -> None:
|
||||
"""Ensure account email can be discovered in nested customer_info payload."""
|
||||
info = {"customer_info": {"primary_email": "nested@example.com"}}
|
||||
assert get_email_from_account_info(info) == "nested@example.com"
|
||||
|
||||
|
||||
def test_resolve_email_falls_back_to_account_getter(tmp_path: Path) -> None:
|
||||
"""Ensure resolve_email checks account-info callback when local sources miss."""
|
||||
auth = object()
|
||||
value = resolve_email(
|
||||
auth,
|
||||
client=object(),
|
||||
config_path=tmp_path / "missing-config.json",
|
||||
auth_path=tmp_path / "missing-auth.json",
|
||||
get_account_info=lambda: {"customer_email": "account@example.com"},
|
||||
)
|
||||
assert value == "account@example.com"
|
||||
16
tests/stats/test_stats_formatting.py
Normal file
16
tests/stats/test_stats_formatting.py
Normal file
@@ -0,0 +1,16 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.stats.format import format_date, format_time
|
||||
|
||||
|
||||
def test_format_time_handles_minutes_and_hours() -> None:
|
||||
"""Ensure format_time outputs minute-only and hour-minute formats."""
|
||||
assert format_time(90_000) == "1m"
|
||||
assert format_time(3_660_000) == "1h01"
|
||||
|
||||
|
||||
def test_format_date_handles_iso_and_invalid_values() -> None:
|
||||
"""Ensure format_date normalizes ISO timestamps and preserves invalid input."""
|
||||
assert format_date("2026-01-15T10:20:30Z") == "2026-01-15"
|
||||
assert format_date("not-a-date") == "not-a-date"
|
||||
assert format_date(None) == "Unknown"
|
||||
64
tests/stats/test_stats_listening_metrics.py
Normal file
64
tests/stats/test_stats_listening_metrics.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.stats.listening import (
|
||||
get_finished_books_count,
|
||||
get_listening_time,
|
||||
get_signup_year,
|
||||
has_activity,
|
||||
)
|
||||
|
||||
|
||||
class StatsClient:
|
||||
"""Client double for monthly aggregate lookups keyed by start date."""
|
||||
|
||||
def __init__(self, sums_by_start_date: dict[str, list[int]]) -> None:
|
||||
"""Store aggregate sums grouped by monthly_listening_interval_start_date."""
|
||||
self._sums = sums_by_start_date
|
||||
|
||||
def get(self, path: str, **kwargs: str) -> dict:
|
||||
"""Return aggregate payload based on requested interval start date."""
|
||||
del path
|
||||
start_date = kwargs["monthly_listening_interval_start_date"]
|
||||
sums = self._sums.get(start_date, [0])
|
||||
return {
|
||||
"aggregated_monthly_listening_stats": [{"aggregated_sum": s} for s in sums]
|
||||
}
|
||||
|
||||
|
||||
def test_has_activity_detects_non_zero_months() -> None:
|
||||
"""Ensure activity helper returns true when any month has positive sum."""
|
||||
assert (
|
||||
has_activity(
|
||||
{
|
||||
"aggregated_monthly_listening_stats": [
|
||||
{"aggregated_sum": 0},
|
||||
{"aggregated_sum": 1},
|
||||
]
|
||||
}
|
||||
)
|
||||
is True
|
||||
)
|
||||
|
||||
|
||||
def test_get_listening_time_sums_aggregated_months() -> None:
|
||||
"""Ensure monthly aggregate sums are added into one listening total."""
|
||||
client = StatsClient({"2026-01": [1000, 2000, 3000]})
|
||||
assert get_listening_time(client, duration=1, start_date="2026-01") == 6000
|
||||
|
||||
|
||||
def test_get_signup_year_returns_earliest_year_with_activity() -> None:
|
||||
"""Ensure signup year search finds first active year via binary search."""
|
||||
client = StatsClient(
|
||||
{"2026-01": [1], "2010-01": [1], "2002-01": [1], "2001-01": [0]}
|
||||
)
|
||||
year = get_signup_year(client)
|
||||
assert year <= 2010
|
||||
|
||||
|
||||
def test_get_finished_books_count_uses_library_is_finished() -> None:
|
||||
"""Ensure finished books count delegates to library client predicate."""
|
||||
library_client = type(
|
||||
"Library", (), {"is_finished": lambda self, item: item.get("done", False)}
|
||||
)()
|
||||
items = [{"done": True}, {"done": False}, {"done": True}]
|
||||
assert get_finished_books_count(library_client, items) == 2
|
||||
@@ -1,48 +0,0 @@
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from auditui.downloads import DownloadManager
|
||||
from auditui.constants import MIN_FILE_SIZE
|
||||
|
||||
|
||||
def test_sanitize_filename() -> None:
|
||||
dm = DownloadManager.__new__(DownloadManager)
|
||||
assert dm._sanitize_filename('a<>:"/\\|?*b') == "a_________b"
|
||||
|
||||
|
||||
def test_validate_download_url() -> None:
|
||||
dm = DownloadManager.__new__(DownloadManager)
|
||||
assert dm._validate_download_url("https://example.com/file") is True
|
||||
assert dm._validate_download_url("http://example.com/file") is True
|
||||
assert dm._validate_download_url("ftp://example.com/file") is False
|
||||
|
||||
|
||||
def test_cached_path_and_remove(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
dm = DownloadManager.__new__(DownloadManager)
|
||||
dm.cache_dir = tmp_path
|
||||
|
||||
monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book")
|
||||
safe_name = dm._sanitize_filename("My Book")
|
||||
cached_path = tmp_path / f"{safe_name}.aax"
|
||||
cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
|
||||
|
||||
assert dm.get_cached_path("ASIN123") == cached_path
|
||||
assert dm.is_cached("ASIN123") is True
|
||||
|
||||
messages: list[str] = []
|
||||
assert dm.remove_cached("ASIN123", notify=messages.append) is True
|
||||
assert not cached_path.exists()
|
||||
assert messages and "Removed from cache" in messages[-1]
|
||||
|
||||
|
||||
def test_cached_path_ignores_small_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
dm = DownloadManager.__new__(DownloadManager)
|
||||
dm.cache_dir = tmp_path
|
||||
|
||||
monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book")
|
||||
safe_name = dm._sanitize_filename("My Book")
|
||||
cached_path = tmp_path / f"{safe_name}.aax"
|
||||
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
|
||||
|
||||
assert dm.get_cached_path("ASIN123") is None
|
||||
@@ -1,131 +0,0 @@
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from auditui.library import LibraryClient
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MockClient:
|
||||
put_calls: list[tuple[str, dict]] = field(default_factory=list)
|
||||
post_calls: list[tuple[str, dict]] = field(default_factory=list)
|
||||
_post_response: dict = field(default_factory=dict)
|
||||
raise_on_put: bool = False
|
||||
|
||||
def put(self, path: str, body: dict) -> dict:
|
||||
if self.raise_on_put:
|
||||
raise RuntimeError("put failed")
|
||||
self.put_calls.append((path, body))
|
||||
return {}
|
||||
|
||||
def post(self, path: str, body: dict) -> dict:
|
||||
self.post_calls.append((path, body))
|
||||
return self._post_response
|
||||
|
||||
def get(self, path: str, **kwargs: dict) -> dict:
|
||||
return {}
|
||||
|
||||
|
||||
def test_extract_title_prefers_product() -> None:
|
||||
client = MockClient()
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
item = build_item(title="Outer", product_title="Inner")
|
||||
assert library.extract_title(item) == "Inner"
|
||||
|
||||
|
||||
def test_extract_authors_joins_names() -> None:
|
||||
client = MockClient()
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
item = build_item(authors=[{"name": "A"}, {"name": "B"}])
|
||||
assert library.extract_authors(item) == "A, B"
|
||||
|
||||
|
||||
def test_extract_runtime_minutes_from_dict() -> None:
|
||||
client = MockClient()
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
item = build_item(runtime_min=12)
|
||||
assert library.extract_runtime_minutes(item) == 12
|
||||
|
||||
|
||||
def test_extract_progress_info_from_listening_status() -> None:
|
||||
client = MockClient()
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
item = build_item(listening_status={"percent_complete": 25.0})
|
||||
assert library.extract_progress_info(item) == 25.0
|
||||
|
||||
|
||||
def test_is_finished_with_percent_complete() -> None:
|
||||
client = MockClient()
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
item = build_item(percent_complete=100)
|
||||
assert library.is_finished(item)
|
||||
|
||||
|
||||
def test_format_duration_and_time() -> None:
|
||||
client = MockClient()
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
assert library.format_duration(61) == "1h01"
|
||||
assert library.format_time(3661) == "01:01:01"
|
||||
|
||||
|
||||
def test_mark_as_finished_success_updates_item() -> None:
|
||||
client = MockClient()
|
||||
client._post_response = {"content_license": {"acr": "token"}}
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
item = build_item(runtime_min=1, listening_status={})
|
||||
ok = library.mark_as_finished("ASIN", item)
|
||||
assert ok
|
||||
assert client.put_calls
|
||||
path, body = client.put_calls[0]
|
||||
assert path == "1.0/lastpositions/ASIN"
|
||||
assert body["acr"] == "token"
|
||||
assert body["position_ms"] == 60_000
|
||||
assert item["is_finished"] is True
|
||||
assert item["listening_status"]["is_finished"] is True
|
||||
|
||||
|
||||
def test_mark_as_finished_fails_without_acr() -> None:
|
||||
client = MockClient()
|
||||
client._post_response = {}
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
item = build_item(runtime_min=1)
|
||||
ok = library.mark_as_finished("ASIN", item)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_mark_as_finished_handles_put_error() -> None:
|
||||
client = MockClient()
|
||||
client._post_response = {"content_license": {"acr": "token"}}
|
||||
client.raise_on_put = True
|
||||
library = LibraryClient(client) # type: ignore[arg-type]
|
||||
item = build_item(runtime_min=1)
|
||||
ok = library.mark_as_finished("ASIN", item)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def build_item(
|
||||
*,
|
||||
title: str | None = None,
|
||||
product_title: str | None = None,
|
||||
authors: list[dict] | None = None,
|
||||
runtime_min: int | None = None,
|
||||
listening_status: dict | None = None,
|
||||
percent_complete: int | float | None = None,
|
||||
) -> dict:
|
||||
item: dict = {}
|
||||
if title is not None:
|
||||
item["title"] = title
|
||||
if percent_complete is not None:
|
||||
item["percent_complete"] = percent_complete
|
||||
if listening_status is not None:
|
||||
item["listening_status"] = listening_status
|
||||
product: dict = {}
|
||||
if product_title is not None:
|
||||
product["title"] = product_title
|
||||
if runtime_min is not None:
|
||||
product["runtime_length"] = {"min": runtime_min}
|
||||
if authors is not None:
|
||||
product["authors"] = authors
|
||||
if product:
|
||||
item["product"] = product
|
||||
if runtime_min is not None and "runtime_length_min" not in item:
|
||||
item["runtime_length_min"] = runtime_min
|
||||
return item
|
||||
@@ -1,89 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, cast
|
||||
|
||||
from auditui.constants import AUTHOR_NAME_MAX_LENGTH
|
||||
from auditui.library import (
|
||||
create_progress_sort_key,
|
||||
create_title_sort_key,
|
||||
format_item_as_row,
|
||||
truncate_author_name,
|
||||
)
|
||||
|
||||
|
||||
class StubLibrary:
|
||||
def extract_title(self, item: dict) -> str:
|
||||
return item.get("title", "")
|
||||
|
||||
def extract_authors(self, item: dict) -> str:
|
||||
return item.get("authors", "")
|
||||
|
||||
def extract_runtime_minutes(self, item: dict) -> int | None:
|
||||
return item.get("minutes")
|
||||
|
||||
def format_duration(
|
||||
self, value: int | None, unit: str = "minutes", default_none: str | None = None
|
||||
) -> str | None:
|
||||
if value is None:
|
||||
return default_none
|
||||
return f"{value}m"
|
||||
|
||||
def extract_progress_info(self, item: dict) -> float | None:
|
||||
return item.get("percent")
|
||||
|
||||
def extract_asin(self, item: dict) -> str | None:
|
||||
return item.get("asin")
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class StubDownloads:
|
||||
_cached: set[str]
|
||||
|
||||
def is_cached(self, asin: str) -> bool:
|
||||
return asin in self._cached
|
||||
|
||||
|
||||
def test_create_title_sort_key_normalizes_accents() -> None:
|
||||
key_fn, _ = create_title_sort_key()
|
||||
assert key_fn(["École"]) == "ecole"
|
||||
assert key_fn(["Zoo"]) == "zoo"
|
||||
|
||||
|
||||
def test_create_progress_sort_key_parses_percent() -> None:
|
||||
key_fn, _ = create_progress_sort_key()
|
||||
assert key_fn(["0", "0", "0", "42.5%"]) == 42.5
|
||||
assert key_fn(["0", "0", "0", "bad"]) == 0.0
|
||||
|
||||
|
||||
def test_truncate_author_name() -> None:
|
||||
long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5)
|
||||
truncated = truncate_author_name(long_name)
|
||||
assert truncated.endswith("...")
|
||||
assert len(truncated) <= AUTHOR_NAME_MAX_LENGTH
|
||||
|
||||
|
||||
def test_format_item_as_row_with_downloaded() -> None:
|
||||
library = StubLibrary()
|
||||
downloads = StubDownloads({"ASIN123"})
|
||||
item = {
|
||||
"title": "Title",
|
||||
"authors": "Author One",
|
||||
"minutes": 90,
|
||||
"percent": 12.34,
|
||||
"asin": "ASIN123",
|
||||
}
|
||||
title, author, runtime, progress, downloaded = format_item_as_row(
|
||||
item, library, cast(Any, downloads)
|
||||
)
|
||||
assert title == "Title"
|
||||
assert author == "Author One"
|
||||
assert runtime == "90m"
|
||||
assert progress == "12.3%"
|
||||
assert downloaded == "✓"
|
||||
|
||||
|
||||
def test_format_item_as_row_zero_progress() -> None:
|
||||
library = StubLibrary()
|
||||
item = {"title": "Title", "authors": "Author",
|
||||
"minutes": 30, "percent": 0.0}
|
||||
_, _, _, progress, _ = format_item_as_row(item, library, None)
|
||||
assert progress == "0%"
|
||||
@@ -1,35 +0,0 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from auditui.stats.email import (
|
||||
find_email_in_data,
|
||||
get_email_from_auth,
|
||||
get_email_from_auth_file,
|
||||
get_email_from_config,
|
||||
)
|
||||
|
||||
|
||||
def test_find_email_in_data() -> None:
|
||||
data = {"a": {"b": ["nope", "user@example.com"]}}
|
||||
assert find_email_in_data(data) == "user@example.com"
|
||||
|
||||
|
||||
def test_get_email_from_config(tmp_path: Path) -> None:
|
||||
config_path = tmp_path / "config.json"
|
||||
config_path.write_text(json.dumps({"email": "config@example.com"}))
|
||||
assert get_email_from_config(config_path) == "config@example.com"
|
||||
|
||||
|
||||
def test_get_email_from_auth_file(tmp_path: Path) -> None:
|
||||
auth_path = tmp_path / "auth.json"
|
||||
auth_path.write_text(json.dumps({"email": "auth@example.com"}))
|
||||
assert get_email_from_auth_file(auth_path) == "auth@example.com"
|
||||
|
||||
|
||||
def test_get_email_from_auth() -> None:
|
||||
class Auth:
|
||||
username = "user@example.com"
|
||||
login = None
|
||||
email = None
|
||||
|
||||
assert get_email_from_auth(Auth()) == "user@example.com"
|
||||
@@ -9,40 +9,54 @@ from textual.widgets import Input
|
||||
|
||||
@dataclass(slots=True)
|
||||
class DummyEvent:
|
||||
"""Minimal event object carrying an input value for tests."""
|
||||
|
||||
value: str
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeTimer:
|
||||
"""Timer substitute recording whether stop() was called."""
|
||||
|
||||
callback: Callable[[], None]
|
||||
stopped: bool = False
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Mark timer as stopped."""
|
||||
self.stopped = True
|
||||
|
||||
|
||||
def test_filter_debounce_uses_latest_value(monkeypatch) -> None:
|
||||
"""Ensure debounce cancels previous timer and emits latest input value."""
|
||||
seen: list[str] = []
|
||||
timers: list[FakeTimer] = []
|
||||
|
||||
def on_change(value: str) -> None:
|
||||
"""Capture emitted filter values."""
|
||||
seen.append(value)
|
||||
|
||||
screen = FilterScreen(on_change=on_change, debounce_seconds=0.2)
|
||||
|
||||
def fake_set_timer(_delay: float, callback):
|
||||
def fake_set_timer(_delay: float, callback: Callable[[], None]) -> FakeTimer:
|
||||
"""Record timer callbacks instead of scheduling real timers."""
|
||||
timer = FakeTimer(callback)
|
||||
timers.append(timer)
|
||||
return timer
|
||||
|
||||
monkeypatch.setattr(screen, "set_timer", fake_set_timer)
|
||||
|
||||
screen.on_input_changed(cast(Input.Changed, DummyEvent("a")))
|
||||
screen.on_input_changed(cast(Input.Changed, DummyEvent("ab")))
|
||||
|
||||
assert len(timers) == 2
|
||||
assert timers[0].stopped is True
|
||||
assert timers[1].stopped is False
|
||||
|
||||
timers[1].callback()
|
||||
assert seen == ["ab"]
|
||||
|
||||
|
||||
def test_on_unmount_stops_pending_timer() -> None:
|
||||
"""Ensure screen unmount stops pending debounce timer when present."""
|
||||
screen = FilterScreen(on_change=lambda _value: None)
|
||||
timer = FakeTimer(lambda: None)
|
||||
screen._debounce_timer = timer
|
||||
screen.on_unmount()
|
||||
assert timer.stopped is True
|
||||
Reference in New Issue
Block a user