test: cover app and playback controller mixin behavior
This commit is contained in:
124
tests/app/test_app_actions_selection_and_controls.py
Normal file
124
tests/app/test_app_actions_selection_and_controls.py
Normal file
@@ -0,0 +1,124 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from auditui.app.actions import AppActionsMixin
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeTable:
|
||||
"""Minimal table shim exposing cursor and row count."""
|
||||
|
||||
row_count: int
|
||||
cursor_row: int = 0
|
||||
|
||||
|
||||
class FakePlayback:
|
||||
"""Playback stub with togglable boolean return values."""
|
||||
|
||||
def __init__(self, result: bool) -> None:
|
||||
"""Store deterministic toggle result for tests."""
|
||||
self._result = result
|
||||
self.calls: list[str] = []
|
||||
|
||||
def toggle_playback(self) -> bool:
|
||||
"""Return configured result and record call."""
|
||||
self.calls.append("toggle")
|
||||
return self._result
|
||||
|
||||
def seek_forward(self, _seconds: float) -> bool:
|
||||
"""Return configured result and record call."""
|
||||
self.calls.append("seek_forward")
|
||||
return self._result
|
||||
|
||||
|
||||
class DummyActionsApp(AppActionsMixin):
|
||||
"""Mixin host with just enough state for action method tests."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize fake app state used by action helpers."""
|
||||
self.messages: list[str] = []
|
||||
self.current_items: list[dict] = []
|
||||
self.download_manager = object()
|
||||
self.library_client = type(
|
||||
"Library", (), {"extract_asin": lambda self, item: item.get("asin")}
|
||||
)()
|
||||
self.playback = FakePlayback(True)
|
||||
self.filter_text = "hello"
|
||||
self._refreshed = 0
|
||||
self._table = FakeTable(row_count=0, cursor_row=0)
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Collect status messages for assertions."""
|
||||
self.messages.append(message)
|
||||
|
||||
def query_one(self, selector: str, _type: object) -> FakeTable:
|
||||
"""Return the fake table used in selection tests."""
|
||||
assert selector == "#library_table"
|
||||
return self._table
|
||||
|
||||
def _refresh_filtered_view(self) -> None:
|
||||
"""Record refresh invocations for filter tests."""
|
||||
self._refreshed += 1
|
||||
|
||||
def _start_playback_async(self, asin: str) -> None:
|
||||
"""Capture async playback launch argument."""
|
||||
self.messages.append(f"start:{asin}")
|
||||
|
||||
|
||||
def test_get_selected_asin_requires_non_empty_table() -> None:
|
||||
"""Ensure selection fails gracefully when table has no rows."""
|
||||
app = DummyActionsApp()
|
||||
app._table = FakeTable(row_count=0)
|
||||
assert app._get_selected_asin() is None
|
||||
assert app.messages[-1] == "No books available"
|
||||
|
||||
|
||||
def test_get_selected_asin_returns_current_row_asin() -> None:
|
||||
"""Ensure selected row index maps to current_items ASIN."""
|
||||
app = DummyActionsApp()
|
||||
app._table = FakeTable(row_count=2, cursor_row=1)
|
||||
app.current_items = [{"asin": "A1"}, {"asin": "A2"}]
|
||||
assert app._get_selected_asin() == "A2"
|
||||
|
||||
|
||||
def test_action_play_selected_starts_async_playback() -> None:
|
||||
"""Ensure play action calls async starter with selected ASIN."""
|
||||
app = DummyActionsApp()
|
||||
app._table = FakeTable(row_count=1, cursor_row=0)
|
||||
app.current_items = [{"asin": "ASIN"}]
|
||||
app.action_play_selected()
|
||||
assert app.messages[-1] == "start:ASIN"
|
||||
|
||||
|
||||
def test_action_toggle_playback_shows_hint_when_no_playback() -> None:
|
||||
"""Ensure toggle action displays no-playback hint on false return."""
|
||||
app = DummyActionsApp()
|
||||
app.playback = FakePlayback(False)
|
||||
app.action_toggle_playback()
|
||||
assert app.messages[-1] == "No playback active. Press Enter to play a book."
|
||||
|
||||
|
||||
def test_action_seek_forward_shows_hint_when_seek_fails() -> None:
|
||||
"""Ensure failed seek action reuses no-playback helper status."""
|
||||
app = DummyActionsApp()
|
||||
app.playback = FakePlayback(False)
|
||||
app.action_seek_forward()
|
||||
assert app.messages[-1] == "No playback active. Press Enter to play a book."
|
||||
|
||||
|
||||
def test_action_clear_filter_resets_filter_and_refreshes() -> None:
|
||||
"""Ensure clearing filter resets text and refreshes filtered view."""
|
||||
app = DummyActionsApp()
|
||||
app.action_clear_filter()
|
||||
assert app.filter_text == ""
|
||||
assert app._refreshed == 1
|
||||
assert app.messages[-1] == "Filter cleared"
|
||||
|
||||
|
||||
def test_apply_filter_coerces_none_to_empty_string() -> None:
|
||||
"""Ensure apply_filter normalizes None and refreshes list view."""
|
||||
app = DummyActionsApp()
|
||||
app._apply_filter(None)
|
||||
assert app.filter_text == ""
|
||||
assert app._refreshed == 1
|
||||
114
tests/app/test_app_library_mixin_behavior.py
Normal file
114
tests/app/test_app_library_mixin_behavior.py
Normal file
@@ -0,0 +1,114 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.app.library import AppLibraryMixin
|
||||
from auditui.app import library as library_mod
|
||||
|
||||
|
||||
class DummyLibraryApp(AppLibraryMixin):
|
||||
"""Mixin host exposing only members used by AppLibraryMixin."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize in-memory app state and call tracking."""
|
||||
self.all_items: list[dict] = []
|
||||
self.show_all_mode = False
|
||||
self._search_text_cache: dict[int, str] = {1: "x"}
|
||||
self.messages: list[str] = []
|
||||
self.call_log: list[tuple[str, tuple]] = []
|
||||
self.library_client = None
|
||||
|
||||
def _prime_search_cache(self, items: list[dict]) -> None:
|
||||
"""Store a marker so callers can assert this method was reached."""
|
||||
self.call_log.append(("prime", (items,)))
|
||||
|
||||
def show_all(self) -> None:
|
||||
"""Record show_all invocation for assertion."""
|
||||
self.call_log.append(("show_all", ()))
|
||||
|
||||
def show_unfinished(self) -> None:
|
||||
"""Record show_unfinished invocation for assertion."""
|
||||
self.call_log.append(("show_unfinished", ()))
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Capture status messages."""
|
||||
self.messages.append(message)
|
||||
|
||||
def call_from_thread(self, func, *args) -> None:
|
||||
"""Execute callback immediately to simplify tests."""
|
||||
func(*args)
|
||||
|
||||
def _thread_status_update(self, message: str) -> None:
|
||||
"""Capture worker-thread status update messages."""
|
||||
self.messages.append(message)
|
||||
|
||||
|
||||
def test_on_library_loaded_refreshes_cache_and_shows_unfinished() -> None:
|
||||
"""Ensure loaded items reset cache and default to unfinished view."""
|
||||
app = DummyLibraryApp()
|
||||
items = [{"asin": "a"}, {"asin": "b"}]
|
||||
app.on_library_loaded(items)
|
||||
assert app.all_items == items
|
||||
assert app._search_text_cache == {}
|
||||
assert app.messages[-1] == "Loaded 2 books"
|
||||
assert app.call_log[-1][0] == "show_unfinished"
|
||||
|
||||
|
||||
def test_on_library_loaded_uses_show_all_mode() -> None:
|
||||
"""Ensure loaded items respect show_all mode when enabled."""
|
||||
app = DummyLibraryApp()
|
||||
app.show_all_mode = True
|
||||
app.on_library_loaded([{"asin": "a"}])
|
||||
assert app.call_log[-1][0] == "show_all"
|
||||
|
||||
|
||||
def test_on_library_error_formats_message() -> None:
|
||||
"""Ensure library errors are surfaced through status updates."""
|
||||
app = DummyLibraryApp()
|
||||
app.on_library_error("boom")
|
||||
assert app.messages == ["Error fetching library: boom"]
|
||||
|
||||
|
||||
def test_fetch_library_calls_on_loaded(monkeypatch) -> None:
|
||||
"""Ensure fetch_library forwards fetched items through call_from_thread."""
|
||||
app = DummyLibraryApp()
|
||||
|
||||
class Worker:
|
||||
"""Simple worker shim exposing cancellation state."""
|
||||
|
||||
is_cancelled = False
|
||||
|
||||
class LibraryClient:
|
||||
"""Fake client returning a deterministic item list."""
|
||||
|
||||
def fetch_all_items(self, callback):
|
||||
"""Invoke callback and return one item."""
|
||||
callback("progress")
|
||||
return [{"asin": "x"}]
|
||||
|
||||
app.library_client = LibraryClient()
|
||||
monkeypatch.setattr(library_mod, "get_current_worker", lambda: Worker())
|
||||
AppLibraryMixin.fetch_library.__wrapped__(app)
|
||||
assert app.all_items == [{"asin": "x"}]
|
||||
assert "Loaded 1 books" in app.messages
|
||||
|
||||
|
||||
def test_fetch_library_handles_expected_exception(monkeypatch) -> None:
|
||||
"""Ensure fetch exceptions call on_library_error with error text."""
|
||||
app = DummyLibraryApp()
|
||||
|
||||
class Worker:
|
||||
"""Simple worker shim exposing cancellation state."""
|
||||
|
||||
is_cancelled = False
|
||||
|
||||
class BrokenClient:
|
||||
"""Fake client raising an expected fetch exception."""
|
||||
|
||||
def fetch_all_items(self, callback):
|
||||
"""Raise the same exception family handled by mixin."""
|
||||
del callback
|
||||
raise ValueError("bad fetch")
|
||||
|
||||
app.library_client = BrokenClient()
|
||||
monkeypatch.setattr(library_mod, "get_current_worker", lambda: Worker())
|
||||
AppLibraryMixin.fetch_library.__wrapped__(app)
|
||||
assert app.messages[-1] == "Error fetching library: bad fetch"
|
||||
148
tests/app/test_app_progress_mixin_behavior.py
Normal file
148
tests/app/test_app_progress_mixin_behavior.py
Normal file
@@ -0,0 +1,148 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import cast
|
||||
|
||||
from auditui.app.progress import AppProgressMixin
|
||||
from textual.events import Key
|
||||
from textual.widgets import DataTable
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeKeyEvent:
|
||||
"""Minimal key event carrying key value and prevent_default state."""
|
||||
|
||||
key: str
|
||||
prevented: bool = False
|
||||
|
||||
def prevent_default(self) -> None:
|
||||
"""Mark event as prevented."""
|
||||
self.prevented = True
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeStatic:
|
||||
"""Minimal static widget with text and visibility fields."""
|
||||
|
||||
display: bool = False
|
||||
text: str = ""
|
||||
|
||||
def update(self, value: str) -> None:
|
||||
"""Store rendered text value."""
|
||||
self.text = value
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeProgressBar:
|
||||
"""Minimal progress bar widget storing latest progress value."""
|
||||
|
||||
progress: float = 0.0
|
||||
|
||||
def update(self, progress: float) -> None:
|
||||
"""Store progress value for assertions."""
|
||||
self.progress = progress
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FakeContainer:
|
||||
"""Minimal container exposing display property."""
|
||||
|
||||
display: bool = False
|
||||
|
||||
|
||||
class DummyPlayback:
|
||||
"""Playback shim exposing only members used by AppProgressMixin."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize playback state and update counters."""
|
||||
self.is_playing = False
|
||||
self._status: str | None = None
|
||||
self._progress: tuple[str, float, float] | None = None
|
||||
self.saved_calls = 0
|
||||
|
||||
def check_status(self):
|
||||
"""Return configurable status check message."""
|
||||
return self._status
|
||||
|
||||
def get_current_progress(self):
|
||||
"""Return configurable progress tuple."""
|
||||
return self._progress
|
||||
|
||||
def update_position_if_needed(self) -> None:
|
||||
"""Record periodic save invocations."""
|
||||
self.saved_calls += 1
|
||||
|
||||
|
||||
class DummyProgressApp(AppProgressMixin):
|
||||
"""Mixin host that records action dispatch and widget updates."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize fake widgets and playback state."""
|
||||
self.playback = DummyPlayback()
|
||||
self.focused = object()
|
||||
self.actions: list[str] = []
|
||||
self.messages: list[str] = []
|
||||
self.progress_info = FakeStatic()
|
||||
self.progress_bar = FakeProgressBar()
|
||||
self.progress_container = FakeContainer()
|
||||
|
||||
def action_seek_backward(self) -> None:
|
||||
"""Record backward seek action dispatch."""
|
||||
self.actions.append("seek_backward")
|
||||
|
||||
def action_toggle_playback(self) -> None:
|
||||
"""Record toggle playback action dispatch."""
|
||||
self.actions.append("toggle")
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Capture status messages for assertions."""
|
||||
self.messages.append(message)
|
||||
|
||||
def query_one(self, selector: str, _type: object):
|
||||
"""Return fake widgets by selector used by progress mixin."""
|
||||
return {
|
||||
"#progress_info": self.progress_info,
|
||||
"#progress_bar": self.progress_bar,
|
||||
"#progress_bar_container": self.progress_container,
|
||||
}[selector]
|
||||
|
||||
|
||||
def test_on_key_dispatches_seek_when_playing() -> None:
|
||||
"""Ensure left key is intercepted and dispatched to seek action."""
|
||||
app = DummyProgressApp()
|
||||
app.playback.is_playing = True
|
||||
event = FakeKeyEvent("left")
|
||||
app.on_key(cast(Key, event))
|
||||
assert event.prevented is True
|
||||
assert app.actions == ["seek_backward"]
|
||||
|
||||
|
||||
def test_on_key_dispatches_space_when_table_focused() -> None:
|
||||
"""Ensure space is intercepted and dispatched when table is focused."""
|
||||
app = DummyProgressApp()
|
||||
app.focused = DataTable()
|
||||
event = FakeKeyEvent("space")
|
||||
app.on_key(cast(Key, event))
|
||||
assert event.prevented is True
|
||||
assert app.actions == ["toggle"]
|
||||
|
||||
|
||||
def test_check_playback_status_hides_progress_after_message() -> None:
|
||||
"""Ensure playback status message triggers hide-progress behavior."""
|
||||
app = DummyProgressApp()
|
||||
app.playback._status = "Finished"
|
||||
app._check_playback_status()
|
||||
assert app.messages[-1] == "Finished"
|
||||
assert app.progress_info.display is False
|
||||
assert app.progress_container.display is False
|
||||
|
||||
|
||||
def test_update_progress_renders_visible_progress_row() -> None:
|
||||
"""Ensure valid progress data updates widgets and makes them visible."""
|
||||
app = DummyProgressApp()
|
||||
app.playback.is_playing = True
|
||||
app.playback._progress = ("Chapter", 30.0, 60.0)
|
||||
app._update_progress()
|
||||
assert app.progress_bar.progress == 50.0
|
||||
assert app.progress_info.display is True
|
||||
assert app.progress_container.display is True
|
||||
30
tests/app/test_app_progress_periodic_save.py
Normal file
30
tests/app/test_app_progress_periodic_save.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.app.progress import AppProgressMixin
|
||||
|
||||
|
||||
class DummyPlayback:
|
||||
"""Playback stub exposing periodic update method."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize call counter."""
|
||||
self.saved_calls = 0
|
||||
|
||||
def update_position_if_needed(self) -> None:
|
||||
"""Increment call counter for assertions."""
|
||||
self.saved_calls += 1
|
||||
|
||||
|
||||
class DummyProgressApp(AppProgressMixin):
|
||||
"""Minimal app host containing playback dependency only."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize playback stub."""
|
||||
self.playback = DummyPlayback()
|
||||
|
||||
|
||||
def test_save_position_periodically_delegates_to_playback() -> None:
|
||||
"""Ensure periodic save method delegates to playback updater."""
|
||||
app = DummyProgressApp()
|
||||
app._save_position_periodically()
|
||||
assert app.playback.saved_calls == 1
|
||||
78
tests/app/test_app_state_initialization.py
Normal file
78
tests/app/test_app_state_initialization.py
Normal file
@@ -0,0 +1,78 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.app import state as state_mod
|
||||
|
||||
|
||||
class DummyApp:
|
||||
"""Lightweight app object for state initialization tests."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Expose update_status to satisfy init dependencies."""
|
||||
self.messages: list[str] = []
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Collect status updates for assertions."""
|
||||
self.messages.append(message)
|
||||
|
||||
|
||||
def test_init_state_without_auth_or_client(monkeypatch) -> None:
|
||||
"""Ensure baseline state is initialized when no auth/client is provided."""
|
||||
app = DummyApp()
|
||||
playback_args: list[tuple[object, object]] = []
|
||||
|
||||
class FakePlayback:
|
||||
"""Playback constructor recorder for init tests."""
|
||||
|
||||
def __init__(self, notify, library_client) -> None:
|
||||
"""Capture arguments passed by init_auditui_state."""
|
||||
playback_args.append((notify, library_client))
|
||||
|
||||
monkeypatch.setattr(state_mod, "PlaybackController", FakePlayback)
|
||||
state_mod.init_auditui_state(app)
|
||||
assert app.library_client is None
|
||||
assert app.download_manager is None
|
||||
assert app.all_items == []
|
||||
assert app.current_items == []
|
||||
assert app.filter_text == ""
|
||||
assert app.show_all_mode is False
|
||||
assert playback_args and playback_args[0][1] is None
|
||||
|
||||
|
||||
def test_init_state_with_auth_and_client_builds_dependencies(monkeypatch) -> None:
|
||||
"""Ensure init constructs library, downloads, and playback dependencies."""
|
||||
app = DummyApp()
|
||||
auth = object()
|
||||
client = object()
|
||||
|
||||
class FakeLibraryClient:
|
||||
"""Fake library client constructor for dependency wiring checks."""
|
||||
|
||||
def __init__(self, value) -> None:
|
||||
"""Store constructor argument for assertions."""
|
||||
self.value = value
|
||||
|
||||
class FakeDownloadManager:
|
||||
"""Fake download manager constructor for dependency wiring checks."""
|
||||
|
||||
def __init__(self, auth_value, client_value) -> None:
|
||||
"""Store constructor arguments for assertions."""
|
||||
self.args = (auth_value, client_value)
|
||||
|
||||
class FakePlayback:
|
||||
"""Fake playback constructor for dependency wiring checks."""
|
||||
|
||||
def __init__(self, notify, library_client) -> None:
|
||||
"""Store constructor arguments for assertions."""
|
||||
self.notify = notify
|
||||
self.library_client = library_client
|
||||
|
||||
monkeypatch.setattr(state_mod, "LibraryClient", FakeLibraryClient)
|
||||
monkeypatch.setattr(state_mod, "DownloadManager", FakeDownloadManager)
|
||||
monkeypatch.setattr(state_mod, "PlaybackController", FakePlayback)
|
||||
state_mod.init_auditui_state(app, auth=auth, client=client)
|
||||
assert isinstance(app.library_client, FakeLibraryClient)
|
||||
assert isinstance(app.download_manager, FakeDownloadManager)
|
||||
assert isinstance(app.playback, FakePlayback)
|
||||
assert app.library_client.value is client
|
||||
assert app.download_manager.args == (auth, client)
|
||||
assert app.playback.library_client.value is client
|
||||
121
tests/playback/test_playback_controller_lifecycle_mixin.py
Normal file
121
tests/playback/test_playback_controller_lifecycle_mixin.py
Normal file
@@ -0,0 +1,121 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from auditui.playback import controller_lifecycle as lifecycle_mod
|
||||
from auditui.playback.controller import PlaybackController
|
||||
|
||||
|
||||
class Proc:
|
||||
"""Process shim used for lifecycle tests."""
|
||||
|
||||
def __init__(self, poll_value=None) -> None:
|
||||
"""Set initial poll result."""
|
||||
self._poll_value = poll_value
|
||||
|
||||
def poll(self):
|
||||
"""Return process running status."""
|
||||
return self._poll_value
|
||||
|
||||
|
||||
def _controller() -> tuple[PlaybackController, list[str]]:
|
||||
"""Build controller and message capture list."""
|
||||
messages: list[str] = []
|
||||
return PlaybackController(messages.append, None), messages
|
||||
|
||||
|
||||
def test_start_reports_missing_ffplay(monkeypatch) -> None:
|
||||
"""Ensure start fails fast when ffplay is unavailable."""
|
||||
controller, messages = _controller()
|
||||
monkeypatch.setattr(lifecycle_mod.process_mod, "is_ffplay_available", lambda: False)
|
||||
assert controller.start(Path("book.aax")) is False
|
||||
assert messages[-1] == "ffplay not found. Please install ffmpeg"
|
||||
|
||||
|
||||
def test_start_sets_state_on_success(monkeypatch) -> None:
|
||||
"""Ensure successful start initializes playback state and metadata."""
|
||||
controller, messages = _controller()
|
||||
monkeypatch.setattr(lifecycle_mod.process_mod, "is_ffplay_available", lambda: True)
|
||||
monkeypatch.setattr(
|
||||
lifecycle_mod.process_mod, "build_ffplay_cmd", lambda *args: ["ffplay"]
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
lifecycle_mod.process_mod, "run_ffplay", lambda cmd: (Proc(None), None)
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
lifecycle_mod,
|
||||
"load_media_info",
|
||||
lambda path, activation: (600.0, [{"title": "ch"}]),
|
||||
)
|
||||
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 100.0)
|
||||
ok = controller.start(
|
||||
Path("book.aax"), activation_hex="abcd", start_position=10.0, speed=1.2
|
||||
)
|
||||
assert ok is True
|
||||
assert controller.is_playing is True
|
||||
assert controller.current_file_path == Path("book.aax")
|
||||
assert controller.total_duration == 600.0
|
||||
assert messages[-1] == "Playing: book.aax"
|
||||
|
||||
|
||||
def test_prepare_and_start_uses_last_position(monkeypatch) -> None:
|
||||
"""Ensure prepare flow resumes from saved position when available."""
|
||||
messages: list[str] = []
|
||||
lib = type("Lib", (), {"get_last_position": lambda self, asin: 75.0})()
|
||||
controller = PlaybackController(messages.append, lib)
|
||||
started: list[tuple] = []
|
||||
|
||||
class DM:
|
||||
"""Download manager shim returning path and activation token."""
|
||||
|
||||
def get_or_download(self, asin, notify):
|
||||
"""Return deterministic downloaded file path."""
|
||||
return Path("book.aax")
|
||||
|
||||
def get_activation_bytes(self):
|
||||
"""Return deterministic activation token."""
|
||||
return "abcd"
|
||||
|
||||
monkeypatch.setattr(controller, "start", lambda *args: started.append(args) or True)
|
||||
monkeypatch.setattr(lifecycle_mod.time, "time", lambda: 200.0)
|
||||
assert controller.prepare_and_start(DM(), "ASIN") is True
|
||||
assert started and started[0][3] == 75.0
|
||||
assert "Resuming from 01:15" in messages
|
||||
|
||||
|
||||
def test_toggle_playback_uses_pause_and_resume_paths(monkeypatch) -> None:
|
||||
"""Ensure toggle dispatches pause or resume based on paused flag."""
|
||||
controller, _ = _controller()
|
||||
controller.is_playing = True
|
||||
controller.playback_process = Proc(None)
|
||||
called: list[str] = []
|
||||
monkeypatch.setattr(controller, "pause", lambda: called.append("pause"))
|
||||
monkeypatch.setattr(controller, "resume", lambda: called.append("resume"))
|
||||
controller.is_paused = False
|
||||
assert controller.toggle_playback() is True
|
||||
controller.is_paused = True
|
||||
assert controller.toggle_playback() is True
|
||||
assert called == ["pause", "resume"]
|
||||
|
||||
|
||||
def test_restart_at_position_restores_state_and_notifies(monkeypatch) -> None:
|
||||
"""Ensure restart logic preserves metadata and emits custom message."""
|
||||
controller, messages = _controller()
|
||||
controller.is_playing = True
|
||||
controller.is_paused = True
|
||||
controller.current_file_path = Path("book.aax")
|
||||
controller.current_asin = "ASIN"
|
||||
controller.activation_hex = "abcd"
|
||||
controller.total_duration = 400.0
|
||||
controller.chapters = [{"title": "One"}]
|
||||
controller.playback_speed = 1.0
|
||||
monkeypatch.setattr(controller, "_stop_process", lambda: None)
|
||||
monkeypatch.setattr(lifecycle_mod.time, "sleep", lambda _s: None)
|
||||
monkeypatch.setattr(controller, "start", lambda *args: True)
|
||||
paused: list[str] = []
|
||||
monkeypatch.setattr(controller, "pause", lambda: paused.append("pause"))
|
||||
assert controller._restart_at_position(120.0, message="Jumped") is True
|
||||
assert controller.current_asin == "ASIN"
|
||||
assert controller.chapters == [{"title": "One"}]
|
||||
assert paused == ["pause"]
|
||||
assert messages[-1] == "Jumped"
|
||||
100
tests/playback/test_playback_controller_seek_speed_mixin.py
Normal file
100
tests/playback/test_playback_controller_seek_speed_mixin.py
Normal file
@@ -0,0 +1,100 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from auditui.playback import controller_seek_speed as seek_speed_mod
|
||||
from auditui.playback.controller import PlaybackController
|
||||
|
||||
|
||||
def _controller() -> tuple[PlaybackController, list[str]]:
|
||||
"""Build controller and in-memory notification sink."""
|
||||
messages: list[str] = []
|
||||
return PlaybackController(messages.append, None), messages
|
||||
|
||||
|
||||
def test_seek_notifies_when_target_invalid(monkeypatch) -> None:
|
||||
"""Ensure seek reports end-of-file condition when target is invalid."""
|
||||
controller, messages = _controller()
|
||||
monkeypatch.setattr(controller, "_get_current_elapsed", lambda: 20.0)
|
||||
controller.seek_offset = 100.0
|
||||
controller.total_duration = 120.0
|
||||
monkeypatch.setattr(
|
||||
seek_speed_mod.seek_mod, "compute_seek_target", lambda *args: None
|
||||
)
|
||||
assert controller._seek(30.0, "forward") is False
|
||||
assert messages[-1] == "Already at end of file"
|
||||
|
||||
|
||||
def test_seek_to_chapter_reports_bounds(monkeypatch) -> None:
|
||||
"""Ensure chapter seek reports first and last chapter boundaries."""
|
||||
controller, messages = _controller()
|
||||
controller.is_playing = True
|
||||
controller.current_file_path = object()
|
||||
controller.chapters = [{"title": "One", "start_time": 0.0, "end_time": 10.0}]
|
||||
monkeypatch.setattr(controller, "_get_current_elapsed", lambda: 1.0)
|
||||
monkeypatch.setattr(
|
||||
seek_speed_mod.chapters_mod,
|
||||
"get_current_chapter_index",
|
||||
lambda elapsed, chapters: 0,
|
||||
)
|
||||
assert controller.seek_to_chapter("next") is False
|
||||
assert messages[-1] == "Already at last chapter"
|
||||
assert controller.seek_to_chapter("previous") is False
|
||||
assert messages[-1] == "Already at first chapter"
|
||||
|
||||
|
||||
def test_save_current_position_writes_positive_values() -> None:
|
||||
"""Ensure save_current_position persists elapsed time via library client."""
|
||||
calls: list[tuple[str, float]] = []
|
||||
library = type(
|
||||
"Library",
|
||||
(),
|
||||
{"save_last_position": lambda self, asin, pos: calls.append((asin, pos))},
|
||||
)()
|
||||
controller = PlaybackController(lambda _msg: None, library)
|
||||
controller.current_asin = "ASIN"
|
||||
controller.is_playing = True
|
||||
controller.playback_start_time = 1.0
|
||||
controller.seek_offset = 10.0
|
||||
controller._get_current_elapsed = lambda: 15.0
|
||||
controller._save_current_position()
|
||||
assert calls == [("ASIN", 25.0)]
|
||||
|
||||
|
||||
def test_update_position_if_needed_honors_interval(monkeypatch) -> None:
|
||||
"""Ensure periodic save runs only when interval has elapsed."""
|
||||
controller, _ = _controller()
|
||||
controller.is_playing = True
|
||||
controller.current_asin = "ASIN"
|
||||
controller.library_client = object()
|
||||
controller.last_save_time = 10.0
|
||||
controller.position_save_interval = 30.0
|
||||
saves: list[str] = []
|
||||
monkeypatch.setattr(
|
||||
controller, "_save_current_position", lambda: saves.append("save")
|
||||
)
|
||||
monkeypatch.setattr(seek_speed_mod.time, "time", lambda: 20.0)
|
||||
controller.update_position_if_needed()
|
||||
monkeypatch.setattr(seek_speed_mod.time, "time", lambda: 45.0)
|
||||
controller.update_position_if_needed()
|
||||
assert saves == ["save"]
|
||||
|
||||
|
||||
def test_change_speed_restarts_with_new_rate(monkeypatch) -> None:
|
||||
"""Ensure speed changes restart playback at current position."""
|
||||
controller, _ = _controller()
|
||||
controller.playback_speed = 1.0
|
||||
controller.seek_offset = 5.0
|
||||
monkeypatch.setattr(controller, "_get_current_elapsed", lambda: 10.0)
|
||||
seen: list[tuple[float, float, str]] = []
|
||||
|
||||
def fake_restart(
|
||||
position: float, speed: float | None = None, message: str | None = None
|
||||
) -> bool:
|
||||
"""Capture restart call parameters."""
|
||||
seen.append((position, speed or 0.0, message or ""))
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(controller, "_restart_at_position", fake_restart)
|
||||
assert controller.increase_speed() is True
|
||||
assert seen and seen[0][0] == 15.0
|
||||
assert seen[0][1] > 1.0
|
||||
assert seen[0][2].startswith("Speed: ")
|
||||
76
tests/playback/test_playback_controller_state_mixin.py
Normal file
76
tests/playback/test_playback_controller_state_mixin.py
Normal file
@@ -0,0 +1,76 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, cast
|
||||
|
||||
from auditui.playback import controller_state as state_mod
|
||||
from auditui.playback.controller import PlaybackController
|
||||
|
||||
|
||||
class Proc:
|
||||
"""Simple process shim exposing poll and pid for state tests."""
|
||||
|
||||
def __init__(self, poll_value=None) -> None:
|
||||
"""Store poll return value and fake pid."""
|
||||
self._poll_value = poll_value
|
||||
self.pid = 123
|
||||
|
||||
def poll(self):
|
||||
"""Return configured process status code or None."""
|
||||
return self._poll_value
|
||||
|
||||
|
||||
def _controller() -> tuple[PlaybackController, list[str]]:
|
||||
"""Build playback controller and collected notifications list."""
|
||||
messages: list[str] = []
|
||||
return PlaybackController(messages.append, None), messages
|
||||
|
||||
|
||||
def test_get_current_elapsed_rolls_pause_into_duration(monkeypatch) -> None:
|
||||
"""Ensure elapsed helper absorbs stale pause_start_time when resumed."""
|
||||
controller, _ = _controller()
|
||||
controller.pause_start_time = 100.0
|
||||
controller.is_paused = False
|
||||
monkeypatch.setattr(state_mod.time, "time", lambda: 120.0)
|
||||
monkeypatch.setattr(state_mod.elapsed_mod, "get_elapsed", lambda *args: 50.0)
|
||||
assert controller._get_current_elapsed() == 50.0
|
||||
assert controller.paused_duration == 20.0
|
||||
assert controller.pause_start_time is None
|
||||
|
||||
|
||||
def test_validate_playback_state_stops_when_process_ended() -> None:
|
||||
"""Ensure state validation stops and reports when process is gone."""
|
||||
controller, messages = _controller()
|
||||
controller.playback_process = cast(Any, Proc(poll_value=1))
|
||||
controller.is_playing = True
|
||||
controller.current_file_path = Path("book.aax")
|
||||
ok = controller._validate_playback_state(require_paused=False)
|
||||
assert ok is False
|
||||
assert messages[-1] == "Playback process has ended"
|
||||
|
||||
|
||||
def test_send_signal_sets_paused_state_and_notifies(monkeypatch) -> None:
|
||||
"""Ensure SIGSTOP updates paused state and includes filename in status."""
|
||||
controller, messages = _controller()
|
||||
controller.playback_process = cast(Any, Proc())
|
||||
controller.current_file_path = Path("book.aax")
|
||||
monkeypatch.setattr(state_mod.process_mod, "send_signal", lambda proc, sig: None)
|
||||
controller._send_signal(state_mod.signal.SIGSTOP, "Paused", "pause")
|
||||
assert controller.is_paused is True
|
||||
assert messages[-1] == "Paused: book.aax"
|
||||
|
||||
|
||||
def test_send_signal_handles_process_lookup(monkeypatch) -> None:
|
||||
"""Ensure missing process lookup errors are handled with user-facing message."""
|
||||
controller, messages = _controller()
|
||||
controller.playback_process = cast(Any, Proc())
|
||||
|
||||
def raise_lookup(proc, sig):
|
||||
"""Raise process lookup error to exercise exception path."""
|
||||
del proc, sig
|
||||
raise ProcessLookupError("gone")
|
||||
|
||||
monkeypatch.setattr(state_mod.process_mod, "send_signal", raise_lookup)
|
||||
monkeypatch.setattr(state_mod.process_mod, "terminate_process", lambda proc: None)
|
||||
controller._send_signal(state_mod.signal.SIGCONT, "Playing", "resume")
|
||||
assert messages[-1] == "Process no longer exists"
|
||||
Reference in New Issue
Block a user