From cd99960f2f34a1660cd8fbf904160e92300f3578 Mon Sep 17 00:00:00 2001 From: Kharec Date: Wed, 18 Feb 2026 03:17:33 +0100 Subject: [PATCH] test: reorganize core suite into explicit domain files --- .../test_app_bindings_contract.py} | 18 ++- .../test_app_search_cache_logic.py} | 18 ++- tests/conftest.py | 16 ++- ...t_download_manager_cache_and_validation.py | 57 ++++++++ .../test_download_manager_workflow.py | 85 ++++++++++++ .../library/test_library_client_extractors.py | 111 +++++++++++++++ .../test_library_client_progress_updates.py | 103 ++++++++++++++ tests/library/test_library_search_filters.py | 34 +++++ .../library/test_library_table_formatting.py | 99 +++++++++++++ tests/stats/test_stats_account_data.py | 54 ++++++++ tests/stats/test_stats_aggregator_output.py | 67 +++++++++ tests/stats/test_stats_email_resolution.py | 64 +++++++++ tests/stats/test_stats_formatting.py | 16 +++ tests/stats/test_stats_listening_metrics.py | 64 +++++++++ tests/test_downloads.py | 48 ------- tests/test_library.py | 131 ------------------ tests/test_table_utils.py | 89 ------------ tests/test_ui_email.py | 35 ----- .../test_ui_filter_screen_behavior.py} | 22 ++- 19 files changed, 805 insertions(+), 326 deletions(-) rename tests/{test_app_bindings.py => app/test_app_bindings_contract.py} (75%) rename tests/{test_app_filter.py => app/test_app_search_cache_logic.py} (67%) create mode 100644 tests/downloads/test_download_manager_cache_and_validation.py create mode 100644 tests/downloads/test_download_manager_workflow.py create mode 100644 tests/library/test_library_client_extractors.py create mode 100644 tests/library/test_library_client_progress_updates.py create mode 100644 tests/library/test_library_search_filters.py create mode 100644 tests/library/test_library_table_formatting.py create mode 100644 tests/stats/test_stats_account_data.py create mode 100644 tests/stats/test_stats_aggregator_output.py create mode 100644 tests/stats/test_stats_email_resolution.py create mode 100644 tests/stats/test_stats_formatting.py create mode 100644 tests/stats/test_stats_listening_metrics.py delete mode 100644 tests/test_downloads.py delete mode 100644 tests/test_library.py delete mode 100644 tests/test_table_utils.py delete mode 100644 tests/test_ui_email.py rename tests/{test_ui_filter.py => ui/test_ui_filter_screen_behavior.py} (59%) diff --git a/tests/test_app_bindings.py b/tests/app/test_app_bindings_contract.py similarity index 75% rename from tests/test_app_bindings.py rename to tests/app/test_app_bindings_contract.py index 28090e7..1d28c51 100644 --- a/tests/test_app_bindings.py +++ b/tests/app/test_app_bindings_contract.py @@ -32,27 +32,25 @@ EXPECTED_BINDINGS: tuple[NormalizedBinding, ...] = ( ) -def _normalize_binding( - binding: Binding | BindingTuple, -) -> NormalizedBinding: - """Return key, action, description, and priority for a binding item.""" +def _normalize_binding(binding: Binding | BindingTuple) -> NormalizedBinding: + """Return key, action, description, and priority from one binding item.""" if isinstance(binding, Binding): return (binding.key, binding.action, binding.description, binding.priority) key, action, description = binding return (key, action, description, False) -def _normalize_bindings() -> list[NormalizedBinding]: - """Normalize all declared bindings to a comparable shape.""" +def _all_bindings() -> list[NormalizedBinding]: + """Normalize all app bindings into a stable comparable structure.""" return [_normalize_binding(binding) for binding in BINDINGS] def test_bindings_match_expected_shortcuts() -> None: - """Ensure the app ships with the expected binding set and actions.""" - assert _normalize_bindings() == list(EXPECTED_BINDINGS) + """Ensure the shipped shortcut list stays stable and explicit.""" + assert _all_bindings() == list(EXPECTED_BINDINGS) def test_binding_keys_are_unique() -> None: - """Ensure each key is defined once to avoid ambiguous key dispatch.""" - keys = [binding[0] for binding in _normalize_bindings()] + """Ensure each key is defined only once to avoid dispatch ambiguity.""" + keys = [binding[0] for binding in _all_bindings()] assert len(keys) == len(set(keys)) diff --git a/tests/test_app_filter.py b/tests/app/test_app_search_cache_logic.py similarity index 67% rename from tests/test_app_filter.py rename to tests/app/test_app_search_cache_logic.py index a3bf9e5..1db45e4 100644 --- a/tests/test_app_filter.py +++ b/tests/app/test_app_search_cache_logic.py @@ -8,22 +8,29 @@ from auditui.library import build_search_text, filter_items class StubLibrary: + """Minimal library facade used by search-related app helpers.""" + def extract_title(self, item: dict) -> str: + """Return title from a synthetic item.""" return item.get("title", "") def extract_authors(self, item: dict) -> str: + """Return authors from a synthetic item.""" return item.get("authors", "") @dataclass(slots=True) -class Dummy: +class DummyAuditui: + """Narrow object compatible with Auditui search-cache helper calls.""" + _search_text_cache: dict[int, str] = field(default_factory=dict) library_client: StubLibrary = field(default_factory=StubLibrary) def test_get_search_text_is_cached() -> None: + """Ensure repeated text extraction for one item reuses cache entries.""" item = {"title": "Title", "authors": "Author"} - dummy = Dummy() + dummy = DummyAuditui() first = Auditui._get_search_text(cast(Auditui, dummy), item) second = Auditui._get_search_text(cast(Auditui, dummy), item) assert first == "title author" @@ -31,7 +38,8 @@ def test_get_search_text_is_cached() -> None: assert len(dummy._search_text_cache) == 1 -def test_filter_items_uses_cache() -> None: +def test_filter_items_uses_cached_callable() -> None: + """Ensure filter_items cooperates with a memoized search text callback.""" library = StubLibrary() cache: dict[int, str] = {} items = [ @@ -40,6 +48,7 @@ def test_filter_items_uses_cache() -> None: ] def cached(item: dict) -> str: + """Build and cache normalized search text per object identity.""" cache_key = id(item) if cache_key not in cache: cache[cache_key] = build_search_text(item, cast(Any, library)) @@ -49,6 +58,7 @@ def test_filter_items_uses_cache() -> None: assert result == [items[1]] -def test_build_search_text_without_library() -> None: +def test_build_search_text_without_library_client() -> None: + """Ensure fallback search text path handles inline author dicts.""" item = {"title": "Title", "authors": [{"name": "A"}, {"name": "B"}]} assert build_search_text(item, None) == "title a, b" diff --git a/tests/conftest.py b/tests/conftest.py index 58ba452..51d62e8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,7 @@ from __future__ import annotations import sys from pathlib import Path from types import ModuleType +from typing import Any, cast ROOT = Path(__file__).resolve().parents[1] @@ -15,21 +16,26 @@ try: except ModuleNotFoundError: audible_stub = ModuleType("audible") - class Authenticator: # minimal stub for type usage + class Authenticator: + """Minimal audible authenticator test stub.""" + pass - class Client: # minimal stub for type usage + class Client: + """Minimal audible client test stub.""" + pass - audible_stub.Authenticator = Authenticator - audible_stub.Client = Client + setattr(cast(Any, audible_stub), "Authenticator", Authenticator) + setattr(cast(Any, audible_stub), "Client", Client) activation_bytes = ModuleType("audible.activation_bytes") def get_activation_bytes(_auth: Authenticator | None = None) -> bytes: + """Return deterministic empty activation bytes for tests.""" return b"" - activation_bytes.get_activation_bytes = get_activation_bytes + setattr(cast(Any, activation_bytes), "get_activation_bytes", get_activation_bytes) sys.modules["audible"] = audible_stub sys.modules["audible.activation_bytes"] = activation_bytes diff --git a/tests/downloads/test_download_manager_cache_and_validation.py b/tests/downloads/test_download_manager_cache_and_validation.py new file mode 100644 index 0000000..6c7a6ae --- /dev/null +++ b/tests/downloads/test_download_manager_cache_and_validation.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from pathlib import Path + +import pytest + +from auditui.constants import MIN_FILE_SIZE +from auditui.downloads import DownloadManager + + +def _manager_with_cache_dir(tmp_path: Path) -> DownloadManager: + """Build a lightweight DownloadManager instance without real HTTP clients.""" + manager = DownloadManager.__new__(DownloadManager) + manager.cache_dir = tmp_path + manager.chunk_size = 1024 + return manager + + +def test_sanitize_filename_replaces_invalid_characters() -> None: + """Ensure filesystem-invalid symbols are replaced with underscores.""" + manager = DownloadManager.__new__(DownloadManager) + assert manager._sanitize_filename('a<>:"/\\|?*b') == "a_________b" + + +def test_validate_download_url_accepts_only_http_schemes() -> None: + """Ensure download URL validation only accepts HTTP and HTTPS links.""" + manager = DownloadManager.__new__(DownloadManager) + assert manager._validate_download_url("https://example.com/file") is True + assert manager._validate_download_url("http://example.com/file") is True + assert manager._validate_download_url("ftp://example.com/file") is False + + +def test_get_cached_path_and_remove_cached( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Ensure cache lookup and cache deletion work for valid files.""" + manager = _manager_with_cache_dir(tmp_path) + monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book") + cached_path = tmp_path / "My Book.aax" + cached_path.write_bytes(b"0" * MIN_FILE_SIZE) + messages: list[str] = [] + assert manager.get_cached_path("ASIN123") == cached_path + assert manager.is_cached("ASIN123") is True + assert manager.remove_cached("ASIN123", notify=messages.append) is True + assert not cached_path.exists() + assert "Removed from cache" in messages[-1] + + +def test_get_cached_path_ignores_small_files( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Ensure undersized files are not treated as valid cache entries.""" + manager = _manager_with_cache_dir(tmp_path) + monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book") + cached_path = tmp_path / "My Book.aax" + cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1)) + assert manager.get_cached_path("ASIN123") is None diff --git a/tests/downloads/test_download_manager_workflow.py b/tests/downloads/test_download_manager_workflow.py new file mode 100644 index 0000000..cb146c9 --- /dev/null +++ b/tests/downloads/test_download_manager_workflow.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from pathlib import Path + +import pytest + +from auditui.constants import MIN_FILE_SIZE +from auditui.downloads import DownloadManager +from auditui.downloads import manager as manager_mod + + +def _bare_manager(tmp_path: Path) -> DownloadManager: + """Create manager without invoking constructor side effects.""" + manager = DownloadManager.__new__(DownloadManager) + manager.cache_dir = tmp_path + manager.chunk_size = 1024 + manager.auth = type( + "Auth", (), {"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()} + )() + return manager + + +def test_get_activation_bytes_returns_hex( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """Ensure activation bytes are converted to lowercase hex string.""" + manager = _bare_manager(tmp_path) + monkeypatch.setattr(manager_mod, "get_activation_bytes", lambda _auth: b"\xde\xad") + assert manager.get_activation_bytes() == "dead" + + +def test_get_activation_bytes_handles_errors( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + """Ensure activation retrieval failures are handled gracefully.""" + manager = _bare_manager(tmp_path) + + def _boom(_auth: object) -> bytes: + """Raise a deterministic failure for exception-path coverage.""" + raise OSError("no auth") + + monkeypatch.setattr(manager_mod, "get_activation_bytes", _boom) + assert manager.get_activation_bytes() is None + + +def test_get_or_download_uses_cached_file_when_available( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Ensure cached files bypass link generation and download work.""" + manager = _bare_manager(tmp_path) + monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book") + cached_path = tmp_path / "Book.aax" + cached_path.write_bytes(b"1" * MIN_FILE_SIZE) + messages: list[str] = [] + assert manager.get_or_download("ASIN", notify=messages.append) == cached_path + assert "Using cached file" in messages[0] + + +def test_get_or_download_reports_invalid_url( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Ensure workflow reports invalid download URLs and aborts.""" + manager = _bare_manager(tmp_path) + monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book") + monkeypatch.setattr( + manager, "_get_download_link", lambda asin, notify=None: "ftp://bad" + ) + messages: list[str] = [] + assert manager.get_or_download("ASIN", notify=messages.append) is None + assert "Invalid download URL" in messages + + +def test_get_or_download_handles_download_failure( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Ensure workflow reports failures when stream download does not complete.""" + manager = _bare_manager(tmp_path) + monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book") + monkeypatch.setattr( + manager, "_get_download_link", lambda asin, notify=None: "https://ok" + ) + monkeypatch.setattr(manager, "_download_file", lambda url, path, notify=None: None) + messages: list[str] = [] + assert manager.get_or_download("ASIN", notify=messages.append) is None + assert "Download failed" in messages diff --git a/tests/library/test_library_client_extractors.py b/tests/library/test_library_client_extractors.py new file mode 100644 index 0000000..c83f952 --- /dev/null +++ b/tests/library/test_library_client_extractors.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +from auditui.library import LibraryClient + + +@dataclass(slots=True) +class MockClient: + """Client double that records writes and serves configurable responses.""" + + put_calls: list[tuple[str, dict]] = field(default_factory=list) + post_calls: list[tuple[str, dict]] = field(default_factory=list) + _post_response: dict = field(default_factory=dict) + raise_on_put: bool = False + + def put(self, path: str, body: dict) -> dict: + """Record put payload or raise when configured.""" + if self.raise_on_put: + raise RuntimeError("put failed") + self.put_calls.append((path, body)) + return {} + + def post(self, path: str, body: dict) -> dict: + """Record post payload and return configured response.""" + self.post_calls.append((path, body)) + return self._post_response + + def get(self, path: str, **kwargs: dict) -> dict: + """Return empty data for extractor-focused tests.""" + del path, kwargs + return {} + + +def build_item( + *, + title: str | None = None, + product_title: str | None = None, + authors: list[dict] | None = None, + runtime_min: int | None = None, + listening_status: dict | None = None, + percent_complete: int | float | None = None, + asin: str | None = None, +) -> dict: + """Construct synthetic library items for extractor and finish tests.""" + item: dict = {} + if title is not None: + item["title"] = title + if percent_complete is not None: + item["percent_complete"] = percent_complete + if listening_status is not None: + item["listening_status"] = listening_status + if asin is not None: + item["asin"] = asin + product: dict = {} + if product_title is not None: + product["title"] = product_title + if runtime_min is not None: + product["runtime_length"] = {"min": runtime_min} + if authors is not None: + product["authors"] = authors + if asin is not None: + product["asin"] = asin + if product: + item["product"] = product + if runtime_min is not None: + item["runtime_length_min"] = runtime_min + return item + + +def test_extract_title_prefers_product_title() -> None: + """Ensure product title has precedence over outer item title.""" + library = LibraryClient(MockClient()) # type: ignore[arg-type] + assert ( + library.extract_title(build_item(title="Outer", product_title="Inner")) + == "Inner" + ) + + +def test_extract_title_falls_back_to_asin() -> None: + """Ensure title fallback uses product ASIN when no title exists.""" + library = LibraryClient(MockClient()) # type: ignore[arg-type] + assert library.extract_title({"product": {"asin": "A1"}}) == "A1" + + +def test_extract_authors_joins_names() -> None: + """Ensure author dictionaries are converted to a readable list.""" + library = LibraryClient(MockClient()) # type: ignore[arg-type] + item = build_item(authors=[{"name": "A"}, {"name": "B"}]) + assert library.extract_authors(item) == "A, B" + + +def test_extract_runtime_minutes_handles_dict_and_number() -> None: + """Ensure runtime extraction supports dict and numeric payloads.""" + library = LibraryClient(MockClient()) # type: ignore[arg-type] + assert library.extract_runtime_minutes(build_item(runtime_min=12)) == 12 + assert library.extract_runtime_minutes({"runtime_length": 42}) == 42 + + +def test_extract_progress_info_prefers_listening_status_when_needed() -> None: + """Ensure progress can be sourced from listening_status when top-level is absent.""" + library = LibraryClient(MockClient()) # type: ignore[arg-type] + item = build_item(listening_status={"percent_complete": 25.0}) + assert library.extract_progress_info(item) == 25.0 + + +def test_extract_asin_prefers_item_then_product() -> None: + """Ensure ASIN extraction works from both item and product fields.""" + library = LibraryClient(MockClient()) # type: ignore[arg-type] + assert library.extract_asin(build_item(asin="ASIN1")) == "ASIN1" + assert library.extract_asin({"product": {"asin": "ASIN2"}}) == "ASIN2" diff --git a/tests/library/test_library_client_progress_updates.py b/tests/library/test_library_client_progress_updates.py new file mode 100644 index 0000000..68f32e2 --- /dev/null +++ b/tests/library/test_library_client_progress_updates.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +from auditui.library import LibraryClient + + +@dataclass(slots=True) +class ProgressClient: + """Client double for position and finished-state API methods.""" + + get_responses: dict[str, dict] = field(default_factory=dict) + put_calls: list[tuple[str, dict]] = field(default_factory=list) + post_response: dict = field(default_factory=dict) + fail_put: bool = False + + def get(self, path: str, **kwargs: object) -> dict: + """Return preconfigured payloads by API path.""" + del kwargs + return self.get_responses.get(path, {}) + + def put(self, path: str, body: dict) -> dict: + """Record payloads or raise to exercise error handling.""" + if self.fail_put: + raise OSError("write failed") + self.put_calls.append((path, body)) + return {} + + def post(self, path: str, body: dict) -> dict: + """Return licenserequest response for ACR extraction.""" + del path, body + return self.post_response + + +def test_is_finished_true_from_percent_complete() -> None: + """Ensure 100 percent completion is treated as finished.""" + library = LibraryClient(ProgressClient()) # type: ignore[arg-type] + assert library.is_finished({"percent_complete": 100}) is True + + +def test_get_last_position_reads_matching_annotation() -> None: + """Ensure last position is read in seconds from matching annotation.""" + client = ProgressClient( + get_responses={ + "1.0/annotations/lastpositions": { + "asin_last_position_heard_annots": [ + {"asin": "X", "last_position_heard": {"position_ms": 9000}} + ] + } + } + ) + library = LibraryClient(client) # type: ignore[arg-type] + assert library.get_last_position("X") == 9.0 + + +def test_get_last_position_returns_none_for_missing_state() -> None: + """Ensure DoesNotExist status is surfaced as no saved position.""" + client = ProgressClient( + get_responses={ + "1.0/annotations/lastpositions": { + "asin_last_position_heard_annots": [ + {"asin": "X", "last_position_heard": {"status": "DoesNotExist"}} + ] + } + } + ) + library = LibraryClient(client) # type: ignore[arg-type] + assert library.get_last_position("X") is None + + +def test_save_last_position_validates_non_positive_values() -> None: + """Ensure save_last_position short-circuits on non-positive input.""" + library = LibraryClient(ProgressClient()) # type: ignore[arg-type] + assert library.save_last_position("A", 0) is False + + +def test_update_position_writes_version_when_available() -> None: + """Ensure version is included in payload when metadata provides it.""" + client = ProgressClient( + get_responses={ + "1.0/content/A/metadata": { + "content_metadata": { + "content_reference": {"acr": "token", "version": "2"} + } + } + } + ) + library = LibraryClient(client) # type: ignore[arg-type] + assert library._update_position("A", 5.5) is True + path, body = client.put_calls[0] + assert path == "1.0/lastpositions/A" + assert body["position_ms"] == 5500 + assert body["version"] == "2" + + +def test_mark_as_finished_updates_item_in_place() -> None: + """Ensure successful finish update mutates local item flags.""" + client = ProgressClient(post_response={"content_license": {"acr": "token"}}) + library = LibraryClient(client) # type: ignore[arg-type] + item = {"runtime_length_min": 1, "listening_status": {}} + assert library.mark_as_finished("ASIN", item) is True + assert item["is_finished"] is True + assert item["listening_status"]["is_finished"] is True diff --git a/tests/library/test_library_search_filters.py b/tests/library/test_library_search_filters.py new file mode 100644 index 0000000..c393869 --- /dev/null +++ b/tests/library/test_library_search_filters.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from auditui.library import build_search_text, filter_items + + +class SearchLibrary: + """Simple search extraction adapter for build_search_text tests.""" + + def extract_title(self, item: dict) -> str: + """Return a title value from a synthetic item.""" + return item.get("t", "") + + def extract_authors(self, item: dict) -> str: + """Return an author value from a synthetic item.""" + return item.get("a", "") + + +def test_build_search_text_uses_library_client_when_present() -> None: + """Ensure search text delegates to library extractor methods.""" + item = {"t": "The Book", "a": "The Author"} + assert build_search_text(item, SearchLibrary()) == "the book the author" + + +def test_filter_items_returns_input_when_filter_empty() -> None: + """Ensure empty filter bypasses per-item search callback evaluation.""" + items = [{"k": 1}, {"k": 2}] + assert filter_items(items, "", lambda _item: "ignored") == items + + +def test_filter_items_matches_case_insensitively() -> None: + """Ensure search matching is case-insensitive across computed text.""" + items = [{"name": "Alpha"}, {"name": "Beta"}] + result = filter_items(items, "BETA", lambda item: item["name"].lower()) + assert result == [items[1]] diff --git a/tests/library/test_library_table_formatting.py b/tests/library/test_library_table_formatting.py new file mode 100644 index 0000000..a94ddde --- /dev/null +++ b/tests/library/test_library_table_formatting.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, cast + +from auditui.constants import AUTHOR_NAME_MAX_LENGTH +from auditui.library import ( + create_progress_sort_key, + create_title_sort_key, + filter_unfinished_items, + format_item_as_row, + truncate_author_name, +) + + +class StubLibrary: + """Library facade exposing only helpers needed by table formatting code.""" + + def extract_title(self, item: dict) -> str: + """Return synthetic title value.""" + return item.get("title", "") + + def extract_authors(self, item: dict) -> str: + """Return synthetic authors value.""" + return item.get("authors", "") + + def extract_runtime_minutes(self, item: dict) -> int | None: + """Return synthetic minute duration.""" + return item.get("minutes") + + def format_duration( + self, value: int | None, unit: str = "minutes", default_none: str | None = None + ) -> str | None: + """Render runtime in compact minute format for tests.""" + del unit + return default_none if value is None else f"{value}m" + + def extract_progress_info(self, item: dict) -> float | None: + """Return synthetic progress percentage value.""" + return item.get("percent") + + def extract_asin(self, item: dict) -> str | None: + """Return synthetic ASIN value.""" + return item.get("asin") + + def is_finished(self, item: dict) -> bool: + """Return synthetic finished flag from the item.""" + return bool(item.get("finished")) + + +@dataclass(slots=True) +class StubDownloads: + """Download cache adapter exposing just is_cached.""" + + cached: set[str] + + def is_cached(self, asin: str) -> bool: + """Return whether an ASIN is cached.""" + return asin in self.cached + + +def test_create_title_sort_key_normalizes_accents() -> None: + """Ensure title sorting removes accents before case-fold compare.""" + key_fn, _ = create_title_sort_key() + assert key_fn(["Ecole"]) == key_fn(["École"]) + + +def test_create_progress_sort_key_parses_percent_strings() -> None: + """Ensure progress sorting converts percentages and handles invalid values.""" + key_fn, _ = create_progress_sort_key() + assert key_fn(["0", "0", "0", "42.5%", ""]) == 42.5 + assert key_fn(["0", "0", "0", "bad", ""]) == 0.0 + + +def test_truncate_author_name_clamps_long_values() -> None: + """Ensure very long author strings are shortened with ellipsis.""" + long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5) + out = truncate_author_name(long_name) + assert out.endswith("...") + assert len(out) <= AUTHOR_NAME_MAX_LENGTH + + +def test_format_item_as_row_marks_downloaded_titles() -> None: + """Ensure downloaded ASINs are shown with a checkmark in table rows.""" + item = { + "title": "Title", + "authors": "Author", + "minutes": 90, + "percent": 12.34, + "asin": "A1", + } + row = format_item_as_row(item, StubLibrary(), cast(Any, StubDownloads({"A1"}))) + assert row == ("Title", "Author", "90m", "12.3%", "✓") + + +def test_filter_unfinished_items_keeps_only_incomplete() -> None: + """Ensure unfinished filter excludes items marked as finished.""" + items = [{"id": 1, "finished": False}, {"id": 2, "finished": True}] + assert filter_unfinished_items(items, StubLibrary()) == [items[0]] diff --git a/tests/stats/test_stats_account_data.py b/tests/stats/test_stats_account_data.py new file mode 100644 index 0000000..e20bfbf --- /dev/null +++ b/tests/stats/test_stats_account_data.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from auditui.stats.account import ( + get_account_info, + get_country, + get_subscription_details, +) + + +class AccountClient: + """Minimal API client returning endpoint-specific account responses.""" + + def __init__(self, responses: dict[str, dict]) -> None: + """Store endpoint response map for deterministic tests.""" + self._responses = responses + + def get(self, path: str, **kwargs: object) -> dict: + """Return configured response and ignore query parameters.""" + del kwargs + return self._responses.get(path, {}) + + +def test_get_account_info_merges_multiple_endpoints() -> None: + """Ensure account info aggregator combines endpoint payload dictionaries.""" + client = AccountClient( + { + "1.0/account/information": {"a": 1}, + "1.0/customer/information": {"b": 2}, + "1.0/customer/status": {"c": 3}, + } + ) + assert get_account_info(client) == {"a": 1, "b": 2, "c": 3} + + +def test_get_subscription_details_uses_known_nested_paths() -> None: + """Ensure first valid subscription_details list entry is returned.""" + info = { + "customer_details": { + "subscription": {"subscription_details": [{"name": "Plan"}]} + } + } + assert get_subscription_details(info) == {"name": "Plan"} + + +def test_get_country_supports_locale_variants() -> None: + """Ensure country extraction supports object, domain, and locale string forms.""" + auth_country_code = type( + "Auth", (), {"locale": type("Loc", (), {"country_code": "us"})()} + )() + auth_domain = type("Auth", (), {"locale": type("Loc", (), {"domain": "fr"})()})() + auth_string = type("Auth", (), {"locale": "en_gb"})() + assert get_country(auth_country_code) == "US" + assert get_country(auth_domain) == "FR" + assert get_country(auth_string) == "GB" diff --git a/tests/stats/test_stats_aggregator_output.py b/tests/stats/test_stats_aggregator_output.py new file mode 100644 index 0000000..839a6b1 --- /dev/null +++ b/tests/stats/test_stats_aggregator_output.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +from datetime import date + +from auditui.stats.aggregator import StatsAggregator +from auditui.stats import aggregator as aggregator_mod + + +def test_get_stats_returns_empty_without_client() -> None: + """Ensure stats aggregation short-circuits when API client is absent.""" + aggregator = StatsAggregator( + client=None, auth=None, library_client=None, all_items=[] + ) + assert aggregator.get_stats() == [] + + +def test_get_stats_builds_expected_rows(monkeypatch) -> None: + """Ensure aggregator assembles rows from listening, account, and email sources.""" + monkeypatch.setattr( + aggregator_mod.listening_mod, "get_signup_year", lambda _client: 2015 + ) + monkeypatch.setattr( + aggregator_mod.listening_mod, + "get_listening_time", + lambda _client, duration, start_date: 120_000 if duration == 1 else 3_600_000, + ) + monkeypatch.setattr( + aggregator_mod.listening_mod, "get_finished_books_count", lambda _lc, _items: 7 + ) + monkeypatch.setattr( + aggregator_mod.email_mod, + "resolve_email", + lambda *args, **kwargs: "user@example.com", + ) + monkeypatch.setattr(aggregator_mod.account_mod, "get_country", lambda _auth: "US") + monkeypatch.setattr( + aggregator_mod.account_mod, + "get_account_info", + lambda _client: { + "subscription_details": [ + { + "name": "Premium", + "next_bill_date": "2026-02-01T00:00:00Z", + "next_bill_amount": { + "currency_value": "14.95", + "currency_code": "USD", + }, + } + ] + }, + ) + + aggregator = StatsAggregator( + client=object(), + auth=object(), + library_client=object(), + all_items=[{}, {}, {}], + ) + stats = dict(aggregator.get_stats(today=date(2026, 2, 1))) + assert stats["Email"] == "user@example.com" + assert stats["Country Store"] == "US" + assert stats["Signup Year"] == "2015" + assert stats["Subscription"] == "Premium" + assert stats["Price"] == "14.95 USD" + assert stats["This Month"] == "2m" + assert stats["This Year"] == "1h00" + assert stats["Books Finished"] == "7 / 3" diff --git a/tests/stats/test_stats_email_resolution.py b/tests/stats/test_stats_email_resolution.py new file mode 100644 index 0000000..93eb8c4 --- /dev/null +++ b/tests/stats/test_stats_email_resolution.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import json +from pathlib import Path + +from auditui.stats.email import ( + find_email_in_data, + first_email, + get_email_from_account_info, + get_email_from_auth, + get_email_from_auth_file, + get_email_from_config, + resolve_email, +) + + +def test_find_email_in_nested_data() -> None: + """Ensure nested structures are scanned until a plausible email is found.""" + data = {"a": {"b": ["nope", "user@example.com"]}} + assert find_email_in_data(data) == "user@example.com" + + +def test_first_email_skips_unknown_and_none() -> None: + """Ensure first_email ignores empty and Unknown sentinel values.""" + assert first_email(None, "Unknown", "ok@example.com") == "ok@example.com" + + +def test_get_email_from_config_and_auth_file(tmp_path: Path) -> None: + """Ensure config and auth-file readers extract valid email fields.""" + config_path = tmp_path / "config.json" + auth_path = tmp_path / "auth.json" + config_path.write_text( + json.dumps({"email": "config@example.com"}), encoding="utf-8" + ) + auth_path.write_text(json.dumps({"email": "auth@example.com"}), encoding="utf-8") + assert get_email_from_config(config_path) == "config@example.com" + assert get_email_from_auth_file(auth_path) == "auth@example.com" + + +def test_get_email_from_auth_prefers_username() -> None: + """Ensure auth object attributes are checked in expected precedence order.""" + auth = type( + "Auth", (), {"username": "user@example.com", "login": None, "email": None} + )() + assert get_email_from_auth(auth) == "user@example.com" + + +def test_get_email_from_account_info_supports_nested_customer_info() -> None: + """Ensure account email can be discovered in nested customer_info payload.""" + info = {"customer_info": {"primary_email": "nested@example.com"}} + assert get_email_from_account_info(info) == "nested@example.com" + + +def test_resolve_email_falls_back_to_account_getter(tmp_path: Path) -> None: + """Ensure resolve_email checks account-info callback when local sources miss.""" + auth = object() + value = resolve_email( + auth, + client=object(), + config_path=tmp_path / "missing-config.json", + auth_path=tmp_path / "missing-auth.json", + get_account_info=lambda: {"customer_email": "account@example.com"}, + ) + assert value == "account@example.com" diff --git a/tests/stats/test_stats_formatting.py b/tests/stats/test_stats_formatting.py new file mode 100644 index 0000000..e587619 --- /dev/null +++ b/tests/stats/test_stats_formatting.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from auditui.stats.format import format_date, format_time + + +def test_format_time_handles_minutes_and_hours() -> None: + """Ensure format_time outputs minute-only and hour-minute formats.""" + assert format_time(90_000) == "1m" + assert format_time(3_660_000) == "1h01" + + +def test_format_date_handles_iso_and_invalid_values() -> None: + """Ensure format_date normalizes ISO timestamps and preserves invalid input.""" + assert format_date("2026-01-15T10:20:30Z") == "2026-01-15" + assert format_date("not-a-date") == "not-a-date" + assert format_date(None) == "Unknown" diff --git a/tests/stats/test_stats_listening_metrics.py b/tests/stats/test_stats_listening_metrics.py new file mode 100644 index 0000000..3dba383 --- /dev/null +++ b/tests/stats/test_stats_listening_metrics.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from auditui.stats.listening import ( + get_finished_books_count, + get_listening_time, + get_signup_year, + has_activity, +) + + +class StatsClient: + """Client double for monthly aggregate lookups keyed by start date.""" + + def __init__(self, sums_by_start_date: dict[str, list[int]]) -> None: + """Store aggregate sums grouped by monthly_listening_interval_start_date.""" + self._sums = sums_by_start_date + + def get(self, path: str, **kwargs: str) -> dict: + """Return aggregate payload based on requested interval start date.""" + del path + start_date = kwargs["monthly_listening_interval_start_date"] + sums = self._sums.get(start_date, [0]) + return { + "aggregated_monthly_listening_stats": [{"aggregated_sum": s} for s in sums] + } + + +def test_has_activity_detects_non_zero_months() -> None: + """Ensure activity helper returns true when any month has positive sum.""" + assert ( + has_activity( + { + "aggregated_monthly_listening_stats": [ + {"aggregated_sum": 0}, + {"aggregated_sum": 1}, + ] + } + ) + is True + ) + + +def test_get_listening_time_sums_aggregated_months() -> None: + """Ensure monthly aggregate sums are added into one listening total.""" + client = StatsClient({"2026-01": [1000, 2000, 3000]}) + assert get_listening_time(client, duration=1, start_date="2026-01") == 6000 + + +def test_get_signup_year_returns_earliest_year_with_activity() -> None: + """Ensure signup year search finds first active year via binary search.""" + client = StatsClient( + {"2026-01": [1], "2010-01": [1], "2002-01": [1], "2001-01": [0]} + ) + year = get_signup_year(client) + assert year <= 2010 + + +def test_get_finished_books_count_uses_library_is_finished() -> None: + """Ensure finished books count delegates to library client predicate.""" + library_client = type( + "Library", (), {"is_finished": lambda self, item: item.get("done", False)} + )() + items = [{"done": True}, {"done": False}, {"done": True}] + assert get_finished_books_count(library_client, items) == 2 diff --git a/tests/test_downloads.py b/tests/test_downloads.py deleted file mode 100644 index 55e3231..0000000 --- a/tests/test_downloads.py +++ /dev/null @@ -1,48 +0,0 @@ -from pathlib import Path - -import pytest - -from auditui.downloads import DownloadManager -from auditui.constants import MIN_FILE_SIZE - - -def test_sanitize_filename() -> None: - dm = DownloadManager.__new__(DownloadManager) - assert dm._sanitize_filename('a<>:"/\\|?*b') == "a_________b" - - -def test_validate_download_url() -> None: - dm = DownloadManager.__new__(DownloadManager) - assert dm._validate_download_url("https://example.com/file") is True - assert dm._validate_download_url("http://example.com/file") is True - assert dm._validate_download_url("ftp://example.com/file") is False - - -def test_cached_path_and_remove(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: - dm = DownloadManager.__new__(DownloadManager) - dm.cache_dir = tmp_path - - monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book") - safe_name = dm._sanitize_filename("My Book") - cached_path = tmp_path / f"{safe_name}.aax" - cached_path.write_bytes(b"0" * MIN_FILE_SIZE) - - assert dm.get_cached_path("ASIN123") == cached_path - assert dm.is_cached("ASIN123") is True - - messages: list[str] = [] - assert dm.remove_cached("ASIN123", notify=messages.append) is True - assert not cached_path.exists() - assert messages and "Removed from cache" in messages[-1] - - -def test_cached_path_ignores_small_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: - dm = DownloadManager.__new__(DownloadManager) - dm.cache_dir = tmp_path - - monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book") - safe_name = dm._sanitize_filename("My Book") - cached_path = tmp_path / f"{safe_name}.aax" - cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1)) - - assert dm.get_cached_path("ASIN123") is None diff --git a/tests/test_library.py b/tests/test_library.py deleted file mode 100644 index 4c84d16..0000000 --- a/tests/test_library.py +++ /dev/null @@ -1,131 +0,0 @@ -from dataclasses import dataclass, field - -from auditui.library import LibraryClient - - -@dataclass(slots=True) -class MockClient: - put_calls: list[tuple[str, dict]] = field(default_factory=list) - post_calls: list[tuple[str, dict]] = field(default_factory=list) - _post_response: dict = field(default_factory=dict) - raise_on_put: bool = False - - def put(self, path: str, body: dict) -> dict: - if self.raise_on_put: - raise RuntimeError("put failed") - self.put_calls.append((path, body)) - return {} - - def post(self, path: str, body: dict) -> dict: - self.post_calls.append((path, body)) - return self._post_response - - def get(self, path: str, **kwargs: dict) -> dict: - return {} - - -def test_extract_title_prefers_product() -> None: - client = MockClient() - library = LibraryClient(client) # type: ignore[arg-type] - item = build_item(title="Outer", product_title="Inner") - assert library.extract_title(item) == "Inner" - - -def test_extract_authors_joins_names() -> None: - client = MockClient() - library = LibraryClient(client) # type: ignore[arg-type] - item = build_item(authors=[{"name": "A"}, {"name": "B"}]) - assert library.extract_authors(item) == "A, B" - - -def test_extract_runtime_minutes_from_dict() -> None: - client = MockClient() - library = LibraryClient(client) # type: ignore[arg-type] - item = build_item(runtime_min=12) - assert library.extract_runtime_minutes(item) == 12 - - -def test_extract_progress_info_from_listening_status() -> None: - client = MockClient() - library = LibraryClient(client) # type: ignore[arg-type] - item = build_item(listening_status={"percent_complete": 25.0}) - assert library.extract_progress_info(item) == 25.0 - - -def test_is_finished_with_percent_complete() -> None: - client = MockClient() - library = LibraryClient(client) # type: ignore[arg-type] - item = build_item(percent_complete=100) - assert library.is_finished(item) - - -def test_format_duration_and_time() -> None: - client = MockClient() - library = LibraryClient(client) # type: ignore[arg-type] - assert library.format_duration(61) == "1h01" - assert library.format_time(3661) == "01:01:01" - - -def test_mark_as_finished_success_updates_item() -> None: - client = MockClient() - client._post_response = {"content_license": {"acr": "token"}} - library = LibraryClient(client) # type: ignore[arg-type] - item = build_item(runtime_min=1, listening_status={}) - ok = library.mark_as_finished("ASIN", item) - assert ok - assert client.put_calls - path, body = client.put_calls[0] - assert path == "1.0/lastpositions/ASIN" - assert body["acr"] == "token" - assert body["position_ms"] == 60_000 - assert item["is_finished"] is True - assert item["listening_status"]["is_finished"] is True - - -def test_mark_as_finished_fails_without_acr() -> None: - client = MockClient() - client._post_response = {} - library = LibraryClient(client) # type: ignore[arg-type] - item = build_item(runtime_min=1) - ok = library.mark_as_finished("ASIN", item) - assert ok is False - - -def test_mark_as_finished_handles_put_error() -> None: - client = MockClient() - client._post_response = {"content_license": {"acr": "token"}} - client.raise_on_put = True - library = LibraryClient(client) # type: ignore[arg-type] - item = build_item(runtime_min=1) - ok = library.mark_as_finished("ASIN", item) - assert ok is False - - -def build_item( - *, - title: str | None = None, - product_title: str | None = None, - authors: list[dict] | None = None, - runtime_min: int | None = None, - listening_status: dict | None = None, - percent_complete: int | float | None = None, -) -> dict: - item: dict = {} - if title is not None: - item["title"] = title - if percent_complete is not None: - item["percent_complete"] = percent_complete - if listening_status is not None: - item["listening_status"] = listening_status - product: dict = {} - if product_title is not None: - product["title"] = product_title - if runtime_min is not None: - product["runtime_length"] = {"min": runtime_min} - if authors is not None: - product["authors"] = authors - if product: - item["product"] = product - if runtime_min is not None and "runtime_length_min" not in item: - item["runtime_length_min"] = runtime_min - return item diff --git a/tests/test_table_utils.py b/tests/test_table_utils.py deleted file mode 100644 index 2399d86..0000000 --- a/tests/test_table_utils.py +++ /dev/null @@ -1,89 +0,0 @@ -from dataclasses import dataclass -from typing import Any, cast - -from auditui.constants import AUTHOR_NAME_MAX_LENGTH -from auditui.library import ( - create_progress_sort_key, - create_title_sort_key, - format_item_as_row, - truncate_author_name, -) - - -class StubLibrary: - def extract_title(self, item: dict) -> str: - return item.get("title", "") - - def extract_authors(self, item: dict) -> str: - return item.get("authors", "") - - def extract_runtime_minutes(self, item: dict) -> int | None: - return item.get("minutes") - - def format_duration( - self, value: int | None, unit: str = "minutes", default_none: str | None = None - ) -> str | None: - if value is None: - return default_none - return f"{value}m" - - def extract_progress_info(self, item: dict) -> float | None: - return item.get("percent") - - def extract_asin(self, item: dict) -> str | None: - return item.get("asin") - - -@dataclass(slots=True) -class StubDownloads: - _cached: set[str] - - def is_cached(self, asin: str) -> bool: - return asin in self._cached - - -def test_create_title_sort_key_normalizes_accents() -> None: - key_fn, _ = create_title_sort_key() - assert key_fn(["École"]) == "ecole" - assert key_fn(["Zoo"]) == "zoo" - - -def test_create_progress_sort_key_parses_percent() -> None: - key_fn, _ = create_progress_sort_key() - assert key_fn(["0", "0", "0", "42.5%"]) == 42.5 - assert key_fn(["0", "0", "0", "bad"]) == 0.0 - - -def test_truncate_author_name() -> None: - long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5) - truncated = truncate_author_name(long_name) - assert truncated.endswith("...") - assert len(truncated) <= AUTHOR_NAME_MAX_LENGTH - - -def test_format_item_as_row_with_downloaded() -> None: - library = StubLibrary() - downloads = StubDownloads({"ASIN123"}) - item = { - "title": "Title", - "authors": "Author One", - "minutes": 90, - "percent": 12.34, - "asin": "ASIN123", - } - title, author, runtime, progress, downloaded = format_item_as_row( - item, library, cast(Any, downloads) - ) - assert title == "Title" - assert author == "Author One" - assert runtime == "90m" - assert progress == "12.3%" - assert downloaded == "✓" - - -def test_format_item_as_row_zero_progress() -> None: - library = StubLibrary() - item = {"title": "Title", "authors": "Author", - "minutes": 30, "percent": 0.0} - _, _, _, progress, _ = format_item_as_row(item, library, None) - assert progress == "0%" diff --git a/tests/test_ui_email.py b/tests/test_ui_email.py deleted file mode 100644 index ac187fd..0000000 --- a/tests/test_ui_email.py +++ /dev/null @@ -1,35 +0,0 @@ -import json -from pathlib import Path - -from auditui.stats.email import ( - find_email_in_data, - get_email_from_auth, - get_email_from_auth_file, - get_email_from_config, -) - - -def test_find_email_in_data() -> None: - data = {"a": {"b": ["nope", "user@example.com"]}} - assert find_email_in_data(data) == "user@example.com" - - -def test_get_email_from_config(tmp_path: Path) -> None: - config_path = tmp_path / "config.json" - config_path.write_text(json.dumps({"email": "config@example.com"})) - assert get_email_from_config(config_path) == "config@example.com" - - -def test_get_email_from_auth_file(tmp_path: Path) -> None: - auth_path = tmp_path / "auth.json" - auth_path.write_text(json.dumps({"email": "auth@example.com"})) - assert get_email_from_auth_file(auth_path) == "auth@example.com" - - -def test_get_email_from_auth() -> None: - class Auth: - username = "user@example.com" - login = None - email = None - - assert get_email_from_auth(Auth()) == "user@example.com" diff --git a/tests/test_ui_filter.py b/tests/ui/test_ui_filter_screen_behavior.py similarity index 59% rename from tests/test_ui_filter.py rename to tests/ui/test_ui_filter_screen_behavior.py index ca27f56..74a60b1 100644 --- a/tests/test_ui_filter.py +++ b/tests/ui/test_ui_filter_screen_behavior.py @@ -9,40 +9,54 @@ from textual.widgets import Input @dataclass(slots=True) class DummyEvent: + """Minimal event object carrying an input value for tests.""" + value: str @dataclass(slots=True) class FakeTimer: + """Timer substitute recording whether stop() was called.""" + callback: Callable[[], None] stopped: bool = False def stop(self) -> None: + """Mark timer as stopped.""" self.stopped = True def test_filter_debounce_uses_latest_value(monkeypatch) -> None: + """Ensure debounce cancels previous timer and emits latest input value.""" seen: list[str] = [] timers: list[FakeTimer] = [] def on_change(value: str) -> None: + """Capture emitted filter values.""" seen.append(value) screen = FilterScreen(on_change=on_change, debounce_seconds=0.2) - def fake_set_timer(_delay: float, callback): + def fake_set_timer(_delay: float, callback: Callable[[], None]) -> FakeTimer: + """Record timer callbacks instead of scheduling real timers.""" timer = FakeTimer(callback) timers.append(timer) return timer monkeypatch.setattr(screen, "set_timer", fake_set_timer) - screen.on_input_changed(cast(Input.Changed, DummyEvent("a"))) screen.on_input_changed(cast(Input.Changed, DummyEvent("ab"))) - assert len(timers) == 2 assert timers[0].stopped is True assert timers[1].stopped is False - timers[1].callback() assert seen == ["ab"] + + +def test_on_unmount_stops_pending_timer() -> None: + """Ensure screen unmount stops pending debounce timer when present.""" + screen = FilterScreen(on_change=lambda _value: None) + timer = FakeTimer(lambda: None) + screen._debounce_timer = timer + screen.on_unmount() + assert timer.stopped is True