Massive refactoring #1

Merged
Kharec merged 35 commits from new-architecture into main 2026-02-18 04:29:20 +01:00
19 changed files with 805 additions and 326 deletions
Showing only changes of commit cd99960f2f - Show all commits

View File

@@ -32,27 +32,25 @@ EXPECTED_BINDINGS: tuple[NormalizedBinding, ...] = (
)
def _normalize_binding(
binding: Binding | BindingTuple,
) -> NormalizedBinding:
"""Return key, action, description, and priority for a binding item."""
def _normalize_binding(binding: Binding | BindingTuple) -> NormalizedBinding:
"""Return key, action, description, and priority from one binding item."""
if isinstance(binding, Binding):
return (binding.key, binding.action, binding.description, binding.priority)
key, action, description = binding
return (key, action, description, False)
def _normalize_bindings() -> list[NormalizedBinding]:
"""Normalize all declared bindings to a comparable shape."""
def _all_bindings() -> list[NormalizedBinding]:
"""Normalize all app bindings into a stable comparable structure."""
return [_normalize_binding(binding) for binding in BINDINGS]
def test_bindings_match_expected_shortcuts() -> None:
"""Ensure the app ships with the expected binding set and actions."""
assert _normalize_bindings() == list(EXPECTED_BINDINGS)
"""Ensure the shipped shortcut list stays stable and explicit."""
assert _all_bindings() == list(EXPECTED_BINDINGS)
def test_binding_keys_are_unique() -> None:
"""Ensure each key is defined once to avoid ambiguous key dispatch."""
keys = [binding[0] for binding in _normalize_bindings()]
"""Ensure each key is defined only once to avoid dispatch ambiguity."""
keys = [binding[0] for binding in _all_bindings()]
assert len(keys) == len(set(keys))

View File

@@ -8,22 +8,29 @@ from auditui.library import build_search_text, filter_items
class StubLibrary:
"""Minimal library facade used by search-related app helpers."""
def extract_title(self, item: dict) -> str:
"""Return title from a synthetic item."""
return item.get("title", "")
def extract_authors(self, item: dict) -> str:
"""Return authors from a synthetic item."""
return item.get("authors", "")
@dataclass(slots=True)
class Dummy:
class DummyAuditui:
"""Narrow object compatible with Auditui search-cache helper calls."""
_search_text_cache: dict[int, str] = field(default_factory=dict)
library_client: StubLibrary = field(default_factory=StubLibrary)
def test_get_search_text_is_cached() -> None:
"""Ensure repeated text extraction for one item reuses cache entries."""
item = {"title": "Title", "authors": "Author"}
dummy = Dummy()
dummy = DummyAuditui()
first = Auditui._get_search_text(cast(Auditui, dummy), item)
second = Auditui._get_search_text(cast(Auditui, dummy), item)
assert first == "title author"
@@ -31,7 +38,8 @@ def test_get_search_text_is_cached() -> None:
assert len(dummy._search_text_cache) == 1
def test_filter_items_uses_cache() -> None:
def test_filter_items_uses_cached_callable() -> None:
"""Ensure filter_items cooperates with a memoized search text callback."""
library = StubLibrary()
cache: dict[int, str] = {}
items = [
@@ -40,6 +48,7 @@ def test_filter_items_uses_cache() -> None:
]
def cached(item: dict) -> str:
"""Build and cache normalized search text per object identity."""
cache_key = id(item)
if cache_key not in cache:
cache[cache_key] = build_search_text(item, cast(Any, library))
@@ -49,6 +58,7 @@ def test_filter_items_uses_cache() -> None:
assert result == [items[1]]
def test_build_search_text_without_library() -> None:
def test_build_search_text_without_library_client() -> None:
"""Ensure fallback search text path handles inline author dicts."""
item = {"title": "Title", "authors": [{"name": "A"}, {"name": "B"}]}
assert build_search_text(item, None) == "title a, b"

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import sys
from pathlib import Path
from types import ModuleType
from typing import Any, cast
ROOT = Path(__file__).resolve().parents[1]
@@ -15,21 +16,26 @@ try:
except ModuleNotFoundError:
audible_stub = ModuleType("audible")
class Authenticator: # minimal stub for type usage
class Authenticator:
"""Minimal audible authenticator test stub."""
pass
class Client: # minimal stub for type usage
class Client:
"""Minimal audible client test stub."""
pass
audible_stub.Authenticator = Authenticator
audible_stub.Client = Client
setattr(cast(Any, audible_stub), "Authenticator", Authenticator)
setattr(cast(Any, audible_stub), "Client", Client)
activation_bytes = ModuleType("audible.activation_bytes")
def get_activation_bytes(_auth: Authenticator | None = None) -> bytes:
"""Return deterministic empty activation bytes for tests."""
return b""
activation_bytes.get_activation_bytes = get_activation_bytes
setattr(cast(Any, activation_bytes), "get_activation_bytes", get_activation_bytes)
sys.modules["audible"] = audible_stub
sys.modules["audible.activation_bytes"] = activation_bytes

View File

@@ -0,0 +1,57 @@
from __future__ import annotations
from pathlib import Path
import pytest
from auditui.constants import MIN_FILE_SIZE
from auditui.downloads import DownloadManager
def _manager_with_cache_dir(tmp_path: Path) -> DownloadManager:
"""Build a lightweight DownloadManager instance without real HTTP clients."""
manager = DownloadManager.__new__(DownloadManager)
manager.cache_dir = tmp_path
manager.chunk_size = 1024
return manager
def test_sanitize_filename_replaces_invalid_characters() -> None:
"""Ensure filesystem-invalid symbols are replaced with underscores."""
manager = DownloadManager.__new__(DownloadManager)
assert manager._sanitize_filename('a<>:"/\\|?*b') == "a_________b"
def test_validate_download_url_accepts_only_http_schemes() -> None:
"""Ensure download URL validation only accepts HTTP and HTTPS links."""
manager = DownloadManager.__new__(DownloadManager)
assert manager._validate_download_url("https://example.com/file") is True
assert manager._validate_download_url("http://example.com/file") is True
assert manager._validate_download_url("ftp://example.com/file") is False
def test_get_cached_path_and_remove_cached(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure cache lookup and cache deletion work for valid files."""
manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book")
cached_path = tmp_path / "My Book.aax"
cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
messages: list[str] = []
assert manager.get_cached_path("ASIN123") == cached_path
assert manager.is_cached("ASIN123") is True
assert manager.remove_cached("ASIN123", notify=messages.append) is True
assert not cached_path.exists()
assert "Removed from cache" in messages[-1]
def test_get_cached_path_ignores_small_files(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure undersized files are not treated as valid cache entries."""
manager = _manager_with_cache_dir(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "My Book")
cached_path = tmp_path / "My Book.aax"
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
assert manager.get_cached_path("ASIN123") is None

View File

@@ -0,0 +1,85 @@
from __future__ import annotations
from pathlib import Path
import pytest
from auditui.constants import MIN_FILE_SIZE
from auditui.downloads import DownloadManager
from auditui.downloads import manager as manager_mod
def _bare_manager(tmp_path: Path) -> DownloadManager:
"""Create manager without invoking constructor side effects."""
manager = DownloadManager.__new__(DownloadManager)
manager.cache_dir = tmp_path
manager.chunk_size = 1024
manager.auth = type(
"Auth", (), {"adp_token": "x", "locale": type("Loc", (), {"domain": "fr"})()}
)()
return manager
def test_get_activation_bytes_returns_hex(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
"""Ensure activation bytes are converted to lowercase hex string."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager_mod, "get_activation_bytes", lambda _auth: b"\xde\xad")
assert manager.get_activation_bytes() == "dead"
def test_get_activation_bytes_handles_errors(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
"""Ensure activation retrieval failures are handled gracefully."""
manager = _bare_manager(tmp_path)
def _boom(_auth: object) -> bytes:
"""Raise a deterministic failure for exception-path coverage."""
raise OSError("no auth")
monkeypatch.setattr(manager_mod, "get_activation_bytes", _boom)
assert manager.get_activation_bytes() is None
def test_get_or_download_uses_cached_file_when_available(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure cached files bypass link generation and download work."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
cached_path = tmp_path / "Book.aax"
cached_path.write_bytes(b"1" * MIN_FILE_SIZE)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) == cached_path
assert "Using cached file" in messages[0]
def test_get_or_download_reports_invalid_url(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure workflow reports invalid download URLs and aborts."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "ftp://bad"
)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None
assert "Invalid download URL" in messages
def test_get_or_download_handles_download_failure(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Ensure workflow reports failures when stream download does not complete."""
manager = _bare_manager(tmp_path)
monkeypatch.setattr(manager, "_get_name_from_asin", lambda asin: "Book")
monkeypatch.setattr(
manager, "_get_download_link", lambda asin, notify=None: "https://ok"
)
monkeypatch.setattr(manager, "_download_file", lambda url, path, notify=None: None)
messages: list[str] = []
assert manager.get_or_download("ASIN", notify=messages.append) is None
assert "Download failed" in messages

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
from dataclasses import dataclass, field
from auditui.library import LibraryClient
@dataclass(slots=True)
class MockClient:
"""Client double that records writes and serves configurable responses."""
put_calls: list[tuple[str, dict]] = field(default_factory=list)
post_calls: list[tuple[str, dict]] = field(default_factory=list)
_post_response: dict = field(default_factory=dict)
raise_on_put: bool = False
def put(self, path: str, body: dict) -> dict:
"""Record put payload or raise when configured."""
if self.raise_on_put:
raise RuntimeError("put failed")
self.put_calls.append((path, body))
return {}
def post(self, path: str, body: dict) -> dict:
"""Record post payload and return configured response."""
self.post_calls.append((path, body))
return self._post_response
def get(self, path: str, **kwargs: dict) -> dict:
"""Return empty data for extractor-focused tests."""
del path, kwargs
return {}
def build_item(
*,
title: str | None = None,
product_title: str | None = None,
authors: list[dict] | None = None,
runtime_min: int | None = None,
listening_status: dict | None = None,
percent_complete: int | float | None = None,
asin: str | None = None,
) -> dict:
"""Construct synthetic library items for extractor and finish tests."""
item: dict = {}
if title is not None:
item["title"] = title
if percent_complete is not None:
item["percent_complete"] = percent_complete
if listening_status is not None:
item["listening_status"] = listening_status
if asin is not None:
item["asin"] = asin
product: dict = {}
if product_title is not None:
product["title"] = product_title
if runtime_min is not None:
product["runtime_length"] = {"min": runtime_min}
if authors is not None:
product["authors"] = authors
if asin is not None:
product["asin"] = asin
if product:
item["product"] = product
if runtime_min is not None:
item["runtime_length_min"] = runtime_min
return item
def test_extract_title_prefers_product_title() -> None:
"""Ensure product title has precedence over outer item title."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
assert (
library.extract_title(build_item(title="Outer", product_title="Inner"))
== "Inner"
)
def test_extract_title_falls_back_to_asin() -> None:
"""Ensure title fallback uses product ASIN when no title exists."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
assert library.extract_title({"product": {"asin": "A1"}}) == "A1"
def test_extract_authors_joins_names() -> None:
"""Ensure author dictionaries are converted to a readable list."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
item = build_item(authors=[{"name": "A"}, {"name": "B"}])
assert library.extract_authors(item) == "A, B"
def test_extract_runtime_minutes_handles_dict_and_number() -> None:
"""Ensure runtime extraction supports dict and numeric payloads."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
assert library.extract_runtime_minutes(build_item(runtime_min=12)) == 12
assert library.extract_runtime_minutes({"runtime_length": 42}) == 42
def test_extract_progress_info_prefers_listening_status_when_needed() -> None:
"""Ensure progress can be sourced from listening_status when top-level is absent."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
item = build_item(listening_status={"percent_complete": 25.0})
assert library.extract_progress_info(item) == 25.0
def test_extract_asin_prefers_item_then_product() -> None:
"""Ensure ASIN extraction works from both item and product fields."""
library = LibraryClient(MockClient()) # type: ignore[arg-type]
assert library.extract_asin(build_item(asin="ASIN1")) == "ASIN1"
assert library.extract_asin({"product": {"asin": "ASIN2"}}) == "ASIN2"

View File

@@ -0,0 +1,103 @@
from __future__ import annotations
from dataclasses import dataclass, field
from auditui.library import LibraryClient
@dataclass(slots=True)
class ProgressClient:
"""Client double for position and finished-state API methods."""
get_responses: dict[str, dict] = field(default_factory=dict)
put_calls: list[tuple[str, dict]] = field(default_factory=list)
post_response: dict = field(default_factory=dict)
fail_put: bool = False
def get(self, path: str, **kwargs: object) -> dict:
"""Return preconfigured payloads by API path."""
del kwargs
return self.get_responses.get(path, {})
def put(self, path: str, body: dict) -> dict:
"""Record payloads or raise to exercise error handling."""
if self.fail_put:
raise OSError("write failed")
self.put_calls.append((path, body))
return {}
def post(self, path: str, body: dict) -> dict:
"""Return licenserequest response for ACR extraction."""
del path, body
return self.post_response
def test_is_finished_true_from_percent_complete() -> None:
"""Ensure 100 percent completion is treated as finished."""
library = LibraryClient(ProgressClient()) # type: ignore[arg-type]
assert library.is_finished({"percent_complete": 100}) is True
def test_get_last_position_reads_matching_annotation() -> None:
"""Ensure last position is read in seconds from matching annotation."""
client = ProgressClient(
get_responses={
"1.0/annotations/lastpositions": {
"asin_last_position_heard_annots": [
{"asin": "X", "last_position_heard": {"position_ms": 9000}}
]
}
}
)
library = LibraryClient(client) # type: ignore[arg-type]
assert library.get_last_position("X") == 9.0
def test_get_last_position_returns_none_for_missing_state() -> None:
"""Ensure DoesNotExist status is surfaced as no saved position."""
client = ProgressClient(
get_responses={
"1.0/annotations/lastpositions": {
"asin_last_position_heard_annots": [
{"asin": "X", "last_position_heard": {"status": "DoesNotExist"}}
]
}
}
)
library = LibraryClient(client) # type: ignore[arg-type]
assert library.get_last_position("X") is None
def test_save_last_position_validates_non_positive_values() -> None:
"""Ensure save_last_position short-circuits on non-positive input."""
library = LibraryClient(ProgressClient()) # type: ignore[arg-type]
assert library.save_last_position("A", 0) is False
def test_update_position_writes_version_when_available() -> None:
"""Ensure version is included in payload when metadata provides it."""
client = ProgressClient(
get_responses={
"1.0/content/A/metadata": {
"content_metadata": {
"content_reference": {"acr": "token", "version": "2"}
}
}
}
)
library = LibraryClient(client) # type: ignore[arg-type]
assert library._update_position("A", 5.5) is True
path, body = client.put_calls[0]
assert path == "1.0/lastpositions/A"
assert body["position_ms"] == 5500
assert body["version"] == "2"
def test_mark_as_finished_updates_item_in_place() -> None:
"""Ensure successful finish update mutates local item flags."""
client = ProgressClient(post_response={"content_license": {"acr": "token"}})
library = LibraryClient(client) # type: ignore[arg-type]
item = {"runtime_length_min": 1, "listening_status": {}}
assert library.mark_as_finished("ASIN", item) is True
assert item["is_finished"] is True
assert item["listening_status"]["is_finished"] is True

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from auditui.library import build_search_text, filter_items
class SearchLibrary:
"""Simple search extraction adapter for build_search_text tests."""
def extract_title(self, item: dict) -> str:
"""Return a title value from a synthetic item."""
return item.get("t", "")
def extract_authors(self, item: dict) -> str:
"""Return an author value from a synthetic item."""
return item.get("a", "")
def test_build_search_text_uses_library_client_when_present() -> None:
"""Ensure search text delegates to library extractor methods."""
item = {"t": "The Book", "a": "The Author"}
assert build_search_text(item, SearchLibrary()) == "the book the author"
def test_filter_items_returns_input_when_filter_empty() -> None:
"""Ensure empty filter bypasses per-item search callback evaluation."""
items = [{"k": 1}, {"k": 2}]
assert filter_items(items, "", lambda _item: "ignored") == items
def test_filter_items_matches_case_insensitively() -> None:
"""Ensure search matching is case-insensitive across computed text."""
items = [{"name": "Alpha"}, {"name": "Beta"}]
result = filter_items(items, "BETA", lambda item: item["name"].lower())
assert result == [items[1]]

View File

@@ -0,0 +1,99 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, cast
from auditui.constants import AUTHOR_NAME_MAX_LENGTH
from auditui.library import (
create_progress_sort_key,
create_title_sort_key,
filter_unfinished_items,
format_item_as_row,
truncate_author_name,
)
class StubLibrary:
"""Library facade exposing only helpers needed by table formatting code."""
def extract_title(self, item: dict) -> str:
"""Return synthetic title value."""
return item.get("title", "")
def extract_authors(self, item: dict) -> str:
"""Return synthetic authors value."""
return item.get("authors", "")
def extract_runtime_minutes(self, item: dict) -> int | None:
"""Return synthetic minute duration."""
return item.get("minutes")
def format_duration(
self, value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
"""Render runtime in compact minute format for tests."""
del unit
return default_none if value is None else f"{value}m"
def extract_progress_info(self, item: dict) -> float | None:
"""Return synthetic progress percentage value."""
return item.get("percent")
def extract_asin(self, item: dict) -> str | None:
"""Return synthetic ASIN value."""
return item.get("asin")
def is_finished(self, item: dict) -> bool:
"""Return synthetic finished flag from the item."""
return bool(item.get("finished"))
@dataclass(slots=True)
class StubDownloads:
"""Download cache adapter exposing just is_cached."""
cached: set[str]
def is_cached(self, asin: str) -> bool:
"""Return whether an ASIN is cached."""
return asin in self.cached
def test_create_title_sort_key_normalizes_accents() -> None:
"""Ensure title sorting removes accents before case-fold compare."""
key_fn, _ = create_title_sort_key()
assert key_fn(["Ecole"]) == key_fn(["École"])
def test_create_progress_sort_key_parses_percent_strings() -> None:
"""Ensure progress sorting converts percentages and handles invalid values."""
key_fn, _ = create_progress_sort_key()
assert key_fn(["0", "0", "0", "42.5%", ""]) == 42.5
assert key_fn(["0", "0", "0", "bad", ""]) == 0.0
def test_truncate_author_name_clamps_long_values() -> None:
"""Ensure very long author strings are shortened with ellipsis."""
long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5)
out = truncate_author_name(long_name)
assert out.endswith("...")
assert len(out) <= AUTHOR_NAME_MAX_LENGTH
def test_format_item_as_row_marks_downloaded_titles() -> None:
"""Ensure downloaded ASINs are shown with a checkmark in table rows."""
item = {
"title": "Title",
"authors": "Author",
"minutes": 90,
"percent": 12.34,
"asin": "A1",
}
row = format_item_as_row(item, StubLibrary(), cast(Any, StubDownloads({"A1"})))
assert row == ("Title", "Author", "90m", "12.3%", "")
def test_filter_unfinished_items_keeps_only_incomplete() -> None:
"""Ensure unfinished filter excludes items marked as finished."""
items = [{"id": 1, "finished": False}, {"id": 2, "finished": True}]
assert filter_unfinished_items(items, StubLibrary()) == [items[0]]

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from auditui.stats.account import (
get_account_info,
get_country,
get_subscription_details,
)
class AccountClient:
"""Minimal API client returning endpoint-specific account responses."""
def __init__(self, responses: dict[str, dict]) -> None:
"""Store endpoint response map for deterministic tests."""
self._responses = responses
def get(self, path: str, **kwargs: object) -> dict:
"""Return configured response and ignore query parameters."""
del kwargs
return self._responses.get(path, {})
def test_get_account_info_merges_multiple_endpoints() -> None:
"""Ensure account info aggregator combines endpoint payload dictionaries."""
client = AccountClient(
{
"1.0/account/information": {"a": 1},
"1.0/customer/information": {"b": 2},
"1.0/customer/status": {"c": 3},
}
)
assert get_account_info(client) == {"a": 1, "b": 2, "c": 3}
def test_get_subscription_details_uses_known_nested_paths() -> None:
"""Ensure first valid subscription_details list entry is returned."""
info = {
"customer_details": {
"subscription": {"subscription_details": [{"name": "Plan"}]}
}
}
assert get_subscription_details(info) == {"name": "Plan"}
def test_get_country_supports_locale_variants() -> None:
"""Ensure country extraction supports object, domain, and locale string forms."""
auth_country_code = type(
"Auth", (), {"locale": type("Loc", (), {"country_code": "us"})()}
)()
auth_domain = type("Auth", (), {"locale": type("Loc", (), {"domain": "fr"})()})()
auth_string = type("Auth", (), {"locale": "en_gb"})()
assert get_country(auth_country_code) == "US"
assert get_country(auth_domain) == "FR"
assert get_country(auth_string) == "GB"

View File

@@ -0,0 +1,67 @@
from __future__ import annotations
from datetime import date
from auditui.stats.aggregator import StatsAggregator
from auditui.stats import aggregator as aggregator_mod
def test_get_stats_returns_empty_without_client() -> None:
"""Ensure stats aggregation short-circuits when API client is absent."""
aggregator = StatsAggregator(
client=None, auth=None, library_client=None, all_items=[]
)
assert aggregator.get_stats() == []
def test_get_stats_builds_expected_rows(monkeypatch) -> None:
"""Ensure aggregator assembles rows from listening, account, and email sources."""
monkeypatch.setattr(
aggregator_mod.listening_mod, "get_signup_year", lambda _client: 2015
)
monkeypatch.setattr(
aggregator_mod.listening_mod,
"get_listening_time",
lambda _client, duration, start_date: 120_000 if duration == 1 else 3_600_000,
)
monkeypatch.setattr(
aggregator_mod.listening_mod, "get_finished_books_count", lambda _lc, _items: 7
)
monkeypatch.setattr(
aggregator_mod.email_mod,
"resolve_email",
lambda *args, **kwargs: "user@example.com",
)
monkeypatch.setattr(aggregator_mod.account_mod, "get_country", lambda _auth: "US")
monkeypatch.setattr(
aggregator_mod.account_mod,
"get_account_info",
lambda _client: {
"subscription_details": [
{
"name": "Premium",
"next_bill_date": "2026-02-01T00:00:00Z",
"next_bill_amount": {
"currency_value": "14.95",
"currency_code": "USD",
},
}
]
},
)
aggregator = StatsAggregator(
client=object(),
auth=object(),
library_client=object(),
all_items=[{}, {}, {}],
)
stats = dict(aggregator.get_stats(today=date(2026, 2, 1)))
assert stats["Email"] == "user@example.com"
assert stats["Country Store"] == "US"
assert stats["Signup Year"] == "2015"
assert stats["Subscription"] == "Premium"
assert stats["Price"] == "14.95 USD"
assert stats["This Month"] == "2m"
assert stats["This Year"] == "1h00"
assert stats["Books Finished"] == "7 / 3"

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
import json
from pathlib import Path
from auditui.stats.email import (
find_email_in_data,
first_email,
get_email_from_account_info,
get_email_from_auth,
get_email_from_auth_file,
get_email_from_config,
resolve_email,
)
def test_find_email_in_nested_data() -> None:
"""Ensure nested structures are scanned until a plausible email is found."""
data = {"a": {"b": ["nope", "user@example.com"]}}
assert find_email_in_data(data) == "user@example.com"
def test_first_email_skips_unknown_and_none() -> None:
"""Ensure first_email ignores empty and Unknown sentinel values."""
assert first_email(None, "Unknown", "ok@example.com") == "ok@example.com"
def test_get_email_from_config_and_auth_file(tmp_path: Path) -> None:
"""Ensure config and auth-file readers extract valid email fields."""
config_path = tmp_path / "config.json"
auth_path = tmp_path / "auth.json"
config_path.write_text(
json.dumps({"email": "config@example.com"}), encoding="utf-8"
)
auth_path.write_text(json.dumps({"email": "auth@example.com"}), encoding="utf-8")
assert get_email_from_config(config_path) == "config@example.com"
assert get_email_from_auth_file(auth_path) == "auth@example.com"
def test_get_email_from_auth_prefers_username() -> None:
"""Ensure auth object attributes are checked in expected precedence order."""
auth = type(
"Auth", (), {"username": "user@example.com", "login": None, "email": None}
)()
assert get_email_from_auth(auth) == "user@example.com"
def test_get_email_from_account_info_supports_nested_customer_info() -> None:
"""Ensure account email can be discovered in nested customer_info payload."""
info = {"customer_info": {"primary_email": "nested@example.com"}}
assert get_email_from_account_info(info) == "nested@example.com"
def test_resolve_email_falls_back_to_account_getter(tmp_path: Path) -> None:
"""Ensure resolve_email checks account-info callback when local sources miss."""
auth = object()
value = resolve_email(
auth,
client=object(),
config_path=tmp_path / "missing-config.json",
auth_path=tmp_path / "missing-auth.json",
get_account_info=lambda: {"customer_email": "account@example.com"},
)
assert value == "account@example.com"

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
from auditui.stats.format import format_date, format_time
def test_format_time_handles_minutes_and_hours() -> None:
"""Ensure format_time outputs minute-only and hour-minute formats."""
assert format_time(90_000) == "1m"
assert format_time(3_660_000) == "1h01"
def test_format_date_handles_iso_and_invalid_values() -> None:
"""Ensure format_date normalizes ISO timestamps and preserves invalid input."""
assert format_date("2026-01-15T10:20:30Z") == "2026-01-15"
assert format_date("not-a-date") == "not-a-date"
assert format_date(None) == "Unknown"

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
from auditui.stats.listening import (
get_finished_books_count,
get_listening_time,
get_signup_year,
has_activity,
)
class StatsClient:
"""Client double for monthly aggregate lookups keyed by start date."""
def __init__(self, sums_by_start_date: dict[str, list[int]]) -> None:
"""Store aggregate sums grouped by monthly_listening_interval_start_date."""
self._sums = sums_by_start_date
def get(self, path: str, **kwargs: str) -> dict:
"""Return aggregate payload based on requested interval start date."""
del path
start_date = kwargs["monthly_listening_interval_start_date"]
sums = self._sums.get(start_date, [0])
return {
"aggregated_monthly_listening_stats": [{"aggregated_sum": s} for s in sums]
}
def test_has_activity_detects_non_zero_months() -> None:
"""Ensure activity helper returns true when any month has positive sum."""
assert (
has_activity(
{
"aggregated_monthly_listening_stats": [
{"aggregated_sum": 0},
{"aggregated_sum": 1},
]
}
)
is True
)
def test_get_listening_time_sums_aggregated_months() -> None:
"""Ensure monthly aggregate sums are added into one listening total."""
client = StatsClient({"2026-01": [1000, 2000, 3000]})
assert get_listening_time(client, duration=1, start_date="2026-01") == 6000
def test_get_signup_year_returns_earliest_year_with_activity() -> None:
"""Ensure signup year search finds first active year via binary search."""
client = StatsClient(
{"2026-01": [1], "2010-01": [1], "2002-01": [1], "2001-01": [0]}
)
year = get_signup_year(client)
assert year <= 2010
def test_get_finished_books_count_uses_library_is_finished() -> None:
"""Ensure finished books count delegates to library client predicate."""
library_client = type(
"Library", (), {"is_finished": lambda self, item: item.get("done", False)}
)()
items = [{"done": True}, {"done": False}, {"done": True}]
assert get_finished_books_count(library_client, items) == 2

View File

@@ -1,48 +0,0 @@
from pathlib import Path
import pytest
from auditui.downloads import DownloadManager
from auditui.constants import MIN_FILE_SIZE
def test_sanitize_filename() -> None:
dm = DownloadManager.__new__(DownloadManager)
assert dm._sanitize_filename('a<>:"/\\|?*b') == "a_________b"
def test_validate_download_url() -> None:
dm = DownloadManager.__new__(DownloadManager)
assert dm._validate_download_url("https://example.com/file") is True
assert dm._validate_download_url("http://example.com/file") is True
assert dm._validate_download_url("ftp://example.com/file") is False
def test_cached_path_and_remove(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
dm = DownloadManager.__new__(DownloadManager)
dm.cache_dir = tmp_path
monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book")
safe_name = dm._sanitize_filename("My Book")
cached_path = tmp_path / f"{safe_name}.aax"
cached_path.write_bytes(b"0" * MIN_FILE_SIZE)
assert dm.get_cached_path("ASIN123") == cached_path
assert dm.is_cached("ASIN123") is True
messages: list[str] = []
assert dm.remove_cached("ASIN123", notify=messages.append) is True
assert not cached_path.exists()
assert messages and "Removed from cache" in messages[-1]
def test_cached_path_ignores_small_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
dm = DownloadManager.__new__(DownloadManager)
dm.cache_dir = tmp_path
monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book")
safe_name = dm._sanitize_filename("My Book")
cached_path = tmp_path / f"{safe_name}.aax"
cached_path.write_bytes(b"0" * (MIN_FILE_SIZE - 1))
assert dm.get_cached_path("ASIN123") is None

View File

@@ -1,131 +0,0 @@
from dataclasses import dataclass, field
from auditui.library import LibraryClient
@dataclass(slots=True)
class MockClient:
put_calls: list[tuple[str, dict]] = field(default_factory=list)
post_calls: list[tuple[str, dict]] = field(default_factory=list)
_post_response: dict = field(default_factory=dict)
raise_on_put: bool = False
def put(self, path: str, body: dict) -> dict:
if self.raise_on_put:
raise RuntimeError("put failed")
self.put_calls.append((path, body))
return {}
def post(self, path: str, body: dict) -> dict:
self.post_calls.append((path, body))
return self._post_response
def get(self, path: str, **kwargs: dict) -> dict:
return {}
def test_extract_title_prefers_product() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(title="Outer", product_title="Inner")
assert library.extract_title(item) == "Inner"
def test_extract_authors_joins_names() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(authors=[{"name": "A"}, {"name": "B"}])
assert library.extract_authors(item) == "A, B"
def test_extract_runtime_minutes_from_dict() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(runtime_min=12)
assert library.extract_runtime_minutes(item) == 12
def test_extract_progress_info_from_listening_status() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(listening_status={"percent_complete": 25.0})
assert library.extract_progress_info(item) == 25.0
def test_is_finished_with_percent_complete() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(percent_complete=100)
assert library.is_finished(item)
def test_format_duration_and_time() -> None:
client = MockClient()
library = LibraryClient(client) # type: ignore[arg-type]
assert library.format_duration(61) == "1h01"
assert library.format_time(3661) == "01:01:01"
def test_mark_as_finished_success_updates_item() -> None:
client = MockClient()
client._post_response = {"content_license": {"acr": "token"}}
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(runtime_min=1, listening_status={})
ok = library.mark_as_finished("ASIN", item)
assert ok
assert client.put_calls
path, body = client.put_calls[0]
assert path == "1.0/lastpositions/ASIN"
assert body["acr"] == "token"
assert body["position_ms"] == 60_000
assert item["is_finished"] is True
assert item["listening_status"]["is_finished"] is True
def test_mark_as_finished_fails_without_acr() -> None:
client = MockClient()
client._post_response = {}
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(runtime_min=1)
ok = library.mark_as_finished("ASIN", item)
assert ok is False
def test_mark_as_finished_handles_put_error() -> None:
client = MockClient()
client._post_response = {"content_license": {"acr": "token"}}
client.raise_on_put = True
library = LibraryClient(client) # type: ignore[arg-type]
item = build_item(runtime_min=1)
ok = library.mark_as_finished("ASIN", item)
assert ok is False
def build_item(
*,
title: str | None = None,
product_title: str | None = None,
authors: list[dict] | None = None,
runtime_min: int | None = None,
listening_status: dict | None = None,
percent_complete: int | float | None = None,
) -> dict:
item: dict = {}
if title is not None:
item["title"] = title
if percent_complete is not None:
item["percent_complete"] = percent_complete
if listening_status is not None:
item["listening_status"] = listening_status
product: dict = {}
if product_title is not None:
product["title"] = product_title
if runtime_min is not None:
product["runtime_length"] = {"min": runtime_min}
if authors is not None:
product["authors"] = authors
if product:
item["product"] = product
if runtime_min is not None and "runtime_length_min" not in item:
item["runtime_length_min"] = runtime_min
return item

View File

@@ -1,89 +0,0 @@
from dataclasses import dataclass
from typing import Any, cast
from auditui.constants import AUTHOR_NAME_MAX_LENGTH
from auditui.library import (
create_progress_sort_key,
create_title_sort_key,
format_item_as_row,
truncate_author_name,
)
class StubLibrary:
def extract_title(self, item: dict) -> str:
return item.get("title", "")
def extract_authors(self, item: dict) -> str:
return item.get("authors", "")
def extract_runtime_minutes(self, item: dict) -> int | None:
return item.get("minutes")
def format_duration(
self, value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
if value is None:
return default_none
return f"{value}m"
def extract_progress_info(self, item: dict) -> float | None:
return item.get("percent")
def extract_asin(self, item: dict) -> str | None:
return item.get("asin")
@dataclass(slots=True)
class StubDownloads:
_cached: set[str]
def is_cached(self, asin: str) -> bool:
return asin in self._cached
def test_create_title_sort_key_normalizes_accents() -> None:
key_fn, _ = create_title_sort_key()
assert key_fn(["École"]) == "ecole"
assert key_fn(["Zoo"]) == "zoo"
def test_create_progress_sort_key_parses_percent() -> None:
key_fn, _ = create_progress_sort_key()
assert key_fn(["0", "0", "0", "42.5%"]) == 42.5
assert key_fn(["0", "0", "0", "bad"]) == 0.0
def test_truncate_author_name() -> None:
long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5)
truncated = truncate_author_name(long_name)
assert truncated.endswith("...")
assert len(truncated) <= AUTHOR_NAME_MAX_LENGTH
def test_format_item_as_row_with_downloaded() -> None:
library = StubLibrary()
downloads = StubDownloads({"ASIN123"})
item = {
"title": "Title",
"authors": "Author One",
"minutes": 90,
"percent": 12.34,
"asin": "ASIN123",
}
title, author, runtime, progress, downloaded = format_item_as_row(
item, library, cast(Any, downloads)
)
assert title == "Title"
assert author == "Author One"
assert runtime == "90m"
assert progress == "12.3%"
assert downloaded == ""
def test_format_item_as_row_zero_progress() -> None:
library = StubLibrary()
item = {"title": "Title", "authors": "Author",
"minutes": 30, "percent": 0.0}
_, _, _, progress, _ = format_item_as_row(item, library, None)
assert progress == "0%"

View File

@@ -1,35 +0,0 @@
import json
from pathlib import Path
from auditui.stats.email import (
find_email_in_data,
get_email_from_auth,
get_email_from_auth_file,
get_email_from_config,
)
def test_find_email_in_data() -> None:
data = {"a": {"b": ["nope", "user@example.com"]}}
assert find_email_in_data(data) == "user@example.com"
def test_get_email_from_config(tmp_path: Path) -> None:
config_path = tmp_path / "config.json"
config_path.write_text(json.dumps({"email": "config@example.com"}))
assert get_email_from_config(config_path) == "config@example.com"
def test_get_email_from_auth_file(tmp_path: Path) -> None:
auth_path = tmp_path / "auth.json"
auth_path.write_text(json.dumps({"email": "auth@example.com"}))
assert get_email_from_auth_file(auth_path) == "auth@example.com"
def test_get_email_from_auth() -> None:
class Auth:
username = "user@example.com"
login = None
email = None
assert get_email_from_auth(Auth()) == "user@example.com"

View File

@@ -9,40 +9,54 @@ from textual.widgets import Input
@dataclass(slots=True)
class DummyEvent:
"""Minimal event object carrying an input value for tests."""
value: str
@dataclass(slots=True)
class FakeTimer:
"""Timer substitute recording whether stop() was called."""
callback: Callable[[], None]
stopped: bool = False
def stop(self) -> None:
"""Mark timer as stopped."""
self.stopped = True
def test_filter_debounce_uses_latest_value(monkeypatch) -> None:
"""Ensure debounce cancels previous timer and emits latest input value."""
seen: list[str] = []
timers: list[FakeTimer] = []
def on_change(value: str) -> None:
"""Capture emitted filter values."""
seen.append(value)
screen = FilterScreen(on_change=on_change, debounce_seconds=0.2)
def fake_set_timer(_delay: float, callback):
def fake_set_timer(_delay: float, callback: Callable[[], None]) -> FakeTimer:
"""Record timer callbacks instead of scheduling real timers."""
timer = FakeTimer(callback)
timers.append(timer)
return timer
monkeypatch.setattr(screen, "set_timer", fake_set_timer)
screen.on_input_changed(cast(Input.Changed, DummyEvent("a")))
screen.on_input_changed(cast(Input.Changed, DummyEvent("ab")))
assert len(timers) == 2
assert timers[0].stopped is True
assert timers[1].stopped is False
timers[1].callback()
assert seen == ["ab"]
def test_on_unmount_stops_pending_timer() -> None:
"""Ensure screen unmount stops pending debounce timer when present."""
screen = FilterScreen(on_change=lambda _value: None)
timer = FakeTimer(lambda: None)
screen._debounce_timer = timer
screen.on_unmount()
assert timer.stopped is True