Massive refactoring #1

Merged
Kharec merged 35 commits from new-architecture into main 2026-02-18 04:29:20 +01:00
51 changed files with 1970 additions and 1848 deletions
Showing only changes of commit 8e73e45e2d - Show all commits

View File

@@ -145,7 +145,7 @@ This project uses [uv](https://github.com/astral-sh/uv) for dependency managemen
$ uv sync $ uv sync
# modify the code... # modify the code...
# ...and run the TUI # ...and run the TUI
$ uv run python -m auditui.cli $ uv run auditui
``` ```
Don't forget to run the tests. Don't forget to run the tests.

View File

@@ -1,3 +1,3 @@
"""Auditui package""" """Auditui: Audible TUI client. One folder per module; all code lives inside module packages."""
__version__ = "0.1.6" __version__ = "0.1.6"

View File

@@ -1,614 +0,0 @@
"""Textual application for the Audible TUI."""
from __future__ import annotations
from typing import TYPE_CHECKING
from textual import work
from textual.app import App, ComposeResult
from textual.containers import Horizontal
from textual.events import Key, Resize
from textual.widgets import DataTable, ProgressBar, Static
from textual.worker import get_current_worker
from . import __version__
from .constants import (
PROGRESS_COLUMN_INDEX,
SEEK_SECONDS,
TABLE_COLUMN_DEFS,
TABLE_CSS,
)
from .downloads import DownloadManager
from .library import LibraryClient
from .playback import PlaybackController
from .table_utils import (
create_progress_sort_key,
create_title_sort_key,
filter_unfinished_items,
format_item_as_row,
)
from .search_utils import build_search_text, filter_items
from .ui import FilterScreen, HelpScreen, StatsScreen
if TYPE_CHECKING:
from textual.widgets.data_table import ColumnKey
class Auditui(App):
"""Main application class for the Audible TUI app."""
SHOW_PALETTE = False
BINDINGS = [
("?", "show_help", "Help"),
("s", "show_stats", "Stats"),
("/", "filter", "Filter"),
("escape", "clear_filter", "Clear filter"),
("n", "sort", "Sort by name"),
("p", "sort_by_progress", "Sort by progress"),
("a", "show_all", "All/Unfinished"),
("r", "refresh", "Refresh"),
("enter", "play_selected", "Play"),
("space", "toggle_playback", "Pause/Resume"),
("left", "seek_backward", "-30s"),
("right", "seek_forward", "+30s"),
("ctrl+left", "previous_chapter", "Previous chapter"),
("ctrl+right", "next_chapter", "Next chapter"),
("up", "increase_speed", "Increase speed"),
("down", "decrease_speed", "Decrease speed"),
("f", "toggle_finished", "Mark finished"),
("d", "toggle_download", "Download/Delete"),
("q", "quit", "Quit"),
]
CSS = TABLE_CSS
def __init__(self, auth=None, client=None) -> None:
super().__init__()
self.auth = auth
self.client = client
self.library_client = LibraryClient(client) if client else None
self.download_manager = (
DownloadManager(auth, client) if auth and client else None
)
self.playback = PlaybackController(self.update_status, self.library_client)
self.all_items: list[dict] = []
self.current_items: list[dict] = []
self._search_text_cache: dict[int, str] = {}
self.show_all_mode = False
self.filter_text = ""
self.title_sort_reverse = False
self.progress_sort_reverse = False
self.title_column_key: ColumnKey | None = None
self.progress_column_index = PROGRESS_COLUMN_INDEX
def compose(self) -> ComposeResult:
yield Horizontal(
Static("? Help", id="top_left"),
Static(f"Auditui v{__version__}", id="top_center"),
Static("q Quit", id="top_right"),
id="top_bar",
)
yield Static("Loading...", id="status")
table: DataTable = DataTable()
table.zebra_stripes = True
table.cursor_type = "row"
yield table
yield Static("", id="progress_info")
with Horizontal(id="progress_bar_container"):
yield ProgressBar(
id="progress_bar", show_eta=False, show_percentage=False, total=100
)
def on_mount(self) -> None:
"""Initialize the table and start fetching library data."""
self.theme = "textual-dark"
table = self.query_one(DataTable)
for column_name, _ratio in TABLE_COLUMN_DEFS:
table.add_column(column_name)
self.call_after_refresh(lambda: self._apply_column_widths(table))
column_keys = list(table.columns.keys())
self.title_column_key = column_keys[0]
if self.client:
self.update_status("Fetching library...")
self.fetch_library()
else:
self.update_status("Not authenticated. Please restart and authenticate.")
self.set_interval(1.0, self._check_playback_status)
self.set_interval(0.5, self._update_progress)
self.set_interval(30.0, self._save_position_periodically)
def on_unmount(self) -> None:
"""Clean up on app exit."""
self.playback.stop()
if self.download_manager:
self.download_manager.close()
def on_resize(self, event: Resize) -> None:
"""Keep table columns responsive to terminal width changes."""
del event
try:
table = self.query_one(DataTable)
except Exception:
return
self.call_after_refresh(lambda: self._apply_column_widths(table))
def on_key(self, event: Key) -> None:
"""Handle key presses."""
if self.playback.is_playing:
if event.key == "ctrl+left":
event.prevent_default()
self.action_previous_chapter()
return
elif event.key == "ctrl+right":
event.prevent_default()
self.action_next_chapter()
return
elif event.key == "left":
event.prevent_default()
self.action_seek_backward()
return
elif event.key == "right":
event.prevent_default()
self.action_seek_forward()
return
elif event.key == "up":
event.prevent_default()
self.action_increase_speed()
return
elif event.key == "down":
event.prevent_default()
self.action_decrease_speed()
return
if isinstance(self.focused, DataTable):
if event.key == "enter":
event.prevent_default()
self.action_play_selected()
elif event.key == "space":
event.prevent_default()
self.action_toggle_playback()
def update_status(self, message: str) -> None:
"""Update the status message in the UI."""
status = self.query_one("#status", Static)
status.display = True
status.update(message)
def _apply_column_widths(self, table: DataTable) -> None:
"""Assign proportional column widths based on available space.
Each column's render width = column.width + 2 * cell_padding.
We compute column.width values so columns fill the full table width.
"""
if not table.columns:
return
column_keys = list(table.columns.keys())
num_cols = len(column_keys)
ratios = [ratio for _, ratio in TABLE_COLUMN_DEFS]
total_ratio = sum(ratios) or num_cols
content_width = table.scrollable_content_region.width
if content_width <= 0:
content_width = table.size.width
if content_width <= 0:
return
padding_total = 2 * table.cell_padding * num_cols
distributable = max(num_cols, content_width - padding_total)
widths: list[int] = []
for ratio in ratios:
w = max(1, (distributable * ratio) // total_ratio)
widths.append(w)
remainder = distributable - sum(widths)
if remainder > 0:
indices = sorted(range(num_cols), key=lambda i: ratios[i], reverse=True)
for i in range(remainder):
widths[indices[i % num_cols]] += 1
for column_key, w in zip(column_keys, widths):
col = table.columns[column_key]
col.auto_width = False
col.width = w
table.refresh()
def _thread_status_update(self, message: str) -> None:
"""Safely update status from worker threads."""
self.call_from_thread(self.update_status, message)
@work(exclusive=True, thread=True)
def fetch_library(self) -> None:
"""Fetch all library items from Audible API in background thread."""
worker = get_current_worker()
if worker.is_cancelled or not self.library_client:
return
try:
all_items = self.library_client.fetch_all_items(self._thread_status_update)
self.call_from_thread(self.on_library_loaded, all_items)
except (OSError, ValueError, KeyError) as exc:
self.call_from_thread(self.on_library_error, str(exc))
def on_library_loaded(self, items: list[dict]) -> None:
"""Handle successful library load."""
self.all_items = items
self._search_text_cache.clear()
self._prime_search_cache(items)
self.update_status(f"Loaded {len(items)} books")
if self.show_all_mode:
self.show_all()
else:
self.show_unfinished()
def on_library_error(self, error: str) -> None:
"""Handle library fetch error."""
self.update_status(f"Error fetching library: {error}")
def _populate_table(self, items: list[dict]) -> None:
"""Populate the DataTable with library items."""
table = self.query_one(DataTable)
table.clear()
if not items or not self.library_client:
self.update_status("No books found.")
return
for item in items:
title, author, runtime, progress, downloaded = format_item_as_row(
item, self.library_client, self.download_manager
)
table.add_row(title, author, runtime, progress, downloaded, key=title)
self.current_items = items
status = self.query_one("#status", Static)
status.display = False
self._apply_column_widths(table)
def _refresh_table(self) -> None:
"""Refresh the table with current items."""
if self.current_items:
self._populate_table(self.current_items)
def show_all(self) -> None:
"""Display all books in the table."""
if not self.all_items:
return
self.show_all_mode = True
self._refresh_filtered_view()
def show_unfinished(self) -> None:
"""Display only unfinished books in the table."""
if not self.all_items or not self.library_client:
return
self.show_all_mode = False
self._refresh_filtered_view()
def action_sort(self) -> None:
"""Sort table by title, toggling direction on each press."""
table = self.query_one(DataTable)
if table.row_count > 0 and self.title_column_key:
title_key, reverse = create_title_sort_key(self.title_sort_reverse)
table.sort(key=title_key, reverse=reverse)
self.title_sort_reverse = not self.title_sort_reverse
def action_sort_by_progress(self) -> None:
"""Sort table by progress percentage, toggling direction on each press."""
table = self.query_one(DataTable)
if table.row_count > 0:
self.progress_sort_reverse = not self.progress_sort_reverse
progress_key, reverse = create_progress_sort_key(
self.progress_column_index, self.progress_sort_reverse
)
table.sort(key=progress_key, reverse=reverse)
def action_show_all(self) -> None:
"""Toggle between showing all and unfinished books."""
if self.show_all_mode:
self.show_unfinished()
else:
self.show_all()
def action_refresh(self) -> None:
"""Refresh the library data from the API."""
if not self.client:
self.update_status("Not authenticated. Cannot refresh.")
return
self.update_status("Refreshing library...")
self.fetch_library()
def action_play_selected(self) -> None:
"""Start playing the selected book."""
if not self.download_manager:
self.update_status("Not authenticated. Please restart and authenticate.")
return
table = self.query_one(DataTable)
if table.row_count == 0:
self.update_status("No books available")
return
cursor_row = table.cursor_row
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return
if not self.library_client:
self.update_status("Library client not available")
return
selected_item = self.current_items[cursor_row]
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
return
self._start_playback_async(asin)
def action_toggle_playback(self) -> None:
"""Toggle pause/resume state."""
if not self.playback.toggle_playback():
self._no_playback_message()
def action_seek_forward(self) -> None:
"""Seek forward 30 seconds."""
if not self.playback.seek_forward(SEEK_SECONDS):
self._no_playback_message()
def action_seek_backward(self) -> None:
"""Seek backward 30 seconds."""
if not self.playback.seek_backward(SEEK_SECONDS):
self._no_playback_message()
def action_next_chapter(self) -> None:
"""Seek to the next chapter."""
if not self.playback.seek_to_next_chapter():
self._no_playback_message()
def action_previous_chapter(self) -> None:
"""Seek to the previous chapter."""
if not self.playback.seek_to_previous_chapter():
self._no_playback_message()
def action_increase_speed(self) -> None:
"""Increase playback speed."""
if not self.playback.increase_speed():
self._no_playback_message()
def action_decrease_speed(self) -> None:
"""Decrease playback speed."""
if not self.playback.decrease_speed():
self._no_playback_message()
def action_toggle_finished(self) -> None:
"""Toggle finished/unfinished status for the selected book."""
if not self.library_client:
self.update_status("Library client not available")
return
table = self.query_one(DataTable)
if table.row_count == 0:
self.update_status("No books available")
return
cursor_row = table.cursor_row
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return
selected_item = self.current_items[cursor_row]
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
return
self._toggle_finished_async(asin)
@work(exclusive=True, thread=True)
def _toggle_finished_async(self, asin: str) -> None:
"""Toggle finished/unfinished status asynchronously."""
if not self.library_client:
return
selected_item = None
for item in self.current_items:
if self.library_client.extract_asin(item) == asin:
selected_item = item
break
if not selected_item:
return
is_currently_finished = self.library_client.is_finished(selected_item)
if is_currently_finished:
self.call_from_thread(self.update_status, "Already marked as finished")
return
success = self.library_client.mark_as_finished(asin, selected_item)
message = "Marked as finished" if success else "Failed to mark as finished"
self.call_from_thread(self.update_status, message)
if success:
if self.download_manager and self.download_manager.is_cached(asin):
self.download_manager.remove_cached(
asin, notify=self._thread_status_update
)
self.call_from_thread(self.fetch_library)
def _no_playback_message(self) -> None:
"""Show message when no playback is active."""
self.update_status("No playback active. Press Enter to play a book.")
def action_show_help(self) -> None:
"""Show the help screen with all keybindings."""
self.push_screen(HelpScreen())
def action_show_stats(self) -> None:
"""Show the stats screen with listening statistics."""
self.push_screen(StatsScreen())
def action_filter(self) -> None:
"""Show the filter screen to search the library."""
self.push_screen(
FilterScreen(
self.filter_text,
on_change=self._apply_filter,
),
self._apply_filter,
)
def action_clear_filter(self) -> None:
"""Clear the current filter if active."""
if self.filter_text:
self.filter_text = ""
self._refresh_filtered_view()
self.update_status("Filter cleared")
def _apply_filter(self, filter_text: str | None) -> None:
"""Apply the filter to the library."""
self.filter_text = filter_text or ""
self._refresh_filtered_view()
def _refresh_filtered_view(self) -> None:
"""Refresh the table with current filter and view mode."""
if not self.all_items:
return
items = self.all_items
if self.filter_text:
items = filter_items(items, self.filter_text, self._get_search_text)
self._populate_table(items)
self.update_status(f"Filter: '{self.filter_text}' ({len(items)} books)")
return
if not self.show_all_mode and self.library_client:
items = filter_unfinished_items(items, self.library_client)
self._populate_table(items)
def _get_search_text(self, item: dict) -> str:
"""Return cached search text for filtering."""
cache_key = id(item)
cached = self._search_text_cache.get(cache_key)
if cached is not None:
return cached
search_text = build_search_text(item, self.library_client)
self._search_text_cache[cache_key] = search_text
return search_text
def _prime_search_cache(self, items: list[dict]) -> None:
"""Precompute search text for a list of items."""
for item in items:
self._get_search_text(item)
def _check_playback_status(self) -> None:
"""Check if playback process has finished and update state accordingly."""
message = self.playback.check_status()
if message:
self.update_status(message)
self._hide_progress()
def _update_progress(self) -> None:
"""Update the progress bar and info during playback."""
if not self.playback.is_playing:
self._hide_progress()
return
progress_data = self.playback.get_current_progress()
if not progress_data:
self._hide_progress()
return
chapter_name, chapter_elapsed, chapter_total = progress_data
if chapter_total <= 0:
self._hide_progress()
return
progress_info = self.query_one("#progress_info", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
progress_bar_container = self.query_one("#progress_bar_container", Horizontal)
progress_percent = min(
100.0, max(0.0, (chapter_elapsed / chapter_total) * 100.0)
)
progress_bar.update(progress=progress_percent)
chapter_elapsed_str = LibraryClient.format_time(chapter_elapsed)
chapter_total_str = LibraryClient.format_time(chapter_total)
progress_info.update(
f"{chapter_name} | {chapter_elapsed_str} / {chapter_total_str}"
)
progress_info.display = True
progress_bar_container.display = True
def _hide_progress(self) -> None:
"""Hide the progress widget."""
progress_info = self.query_one("#progress_info", Static)
progress_bar_container = self.query_one("#progress_bar_container", Horizontal)
progress_info.display = False
progress_bar_container.display = False
def _save_position_periodically(self) -> None:
"""Periodically save playback position."""
self.playback.update_position_if_needed()
def action_toggle_download(self) -> None:
"""Toggle download/remove for the selected book."""
if not self.download_manager:
self.update_status("Not authenticated. Please restart and authenticate.")
return
table = self.query_one(DataTable)
if table.row_count == 0:
self.update_status("No books available")
return
cursor_row = table.cursor_row
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return
if not self.library_client:
self.update_status("Library client not available")
return
selected_item = self.current_items[cursor_row]
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
return
self._toggle_download_async(asin)
@work(exclusive=True, thread=True)
def _toggle_download_async(self, asin: str) -> None:
"""Toggle download/remove asynchronously."""
if not self.download_manager:
return
if self.download_manager.is_cached(asin):
self.download_manager.remove_cached(asin, self._thread_status_update)
else:
self.download_manager.get_or_download(asin, self._thread_status_update)
self.call_from_thread(self._refresh_table)
@work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None:
"""Start playback asynchronously."""
if not self.download_manager:
return
self.playback.prepare_and_start(
self.download_manager,
asin,
self._thread_status_update,
)

27
auditui/app/__init__.py Normal file
View File

@@ -0,0 +1,27 @@
"""Main Textual app: table, bindings, and orchestration of library, playback, and downloads."""
from __future__ import annotations
from textual.app import App
from ..constants import TABLE_CSS
from .bindings import BINDINGS
from .state import init_auditui_state
from .layout import AppLayoutMixin
from .table import AppTableMixin
from .library import AppLibraryMixin
from .actions import AppActionsMixin
from .progress import AppProgressMixin
class Auditui(App, AppProgressMixin, AppActionsMixin, AppLibraryMixin, AppTableMixin, AppLayoutMixin):
"""Orchestrates the library table, playback, downloads, filter, and modal screens."""
SHOW_PALETTE = False
BINDINGS = BINDINGS
CSS = TABLE_CSS
def __init__(self, auth=None, client=None) -> None:
super().__init__()
init_auditui_state(self, auth, client)

159
auditui/app/actions.py Normal file
View File

@@ -0,0 +1,159 @@
"""Selection, playback/download/finish actions, modals, and filter."""
from __future__ import annotations
from textual import work
from textual.widgets import DataTable
from ..constants import SEEK_SECONDS
from ..ui import FilterScreen, HelpScreen, StatsScreen
class AppActionsMixin:
def _get_selected_asin(self) -> str | None:
if not self.download_manager:
self.update_status(
"Not authenticated. Please restart and authenticate.")
return None
table = self.query_one(DataTable)
if table.row_count == 0:
self.update_status("No books available")
return None
cursor_row = table.cursor_row
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return None
if not self.library_client:
self.update_status("Library client not available")
return None
selected_item = self.current_items[cursor_row]
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
return None
return asin
def action_play_selected(self) -> None:
asin = self._get_selected_asin()
if asin:
self._start_playback_async(asin)
def action_toggle_playback(self) -> None:
if not self.playback.toggle_playback():
self._no_playback_message()
def action_seek_forward(self) -> None:
if not self.playback.seek_forward(SEEK_SECONDS):
self._no_playback_message()
def action_seek_backward(self) -> None:
if not self.playback.seek_backward(SEEK_SECONDS):
self._no_playback_message()
def action_next_chapter(self) -> None:
if not self.playback.seek_to_next_chapter():
self._no_playback_message()
def action_previous_chapter(self) -> None:
if not self.playback.seek_to_previous_chapter():
self._no_playback_message()
def action_increase_speed(self) -> None:
if not self.playback.increase_speed():
self._no_playback_message()
def action_decrease_speed(self) -> None:
if not self.playback.decrease_speed():
self._no_playback_message()
def action_toggle_finished(self) -> None:
asin = self._get_selected_asin()
if asin:
self._toggle_finished_async(asin)
@work(exclusive=True, thread=True)
def _toggle_finished_async(self, asin: str) -> None:
if not self.library_client:
return
selected_item = None
for item in self.current_items:
if self.library_client.extract_asin(item) == asin:
selected_item = item
break
if not selected_item:
return
if self.library_client.is_finished(selected_item):
self.call_from_thread(self.update_status,
"Already marked as finished")
return
success = self.library_client.mark_as_finished(asin, selected_item)
message = "Marked as finished" if success else "Failed to mark as finished"
self.call_from_thread(self.update_status, message)
if success:
if self.download_manager and self.download_manager.is_cached(asin):
self.download_manager.remove_cached(
asin, notify=self._thread_status_update
)
self.call_from_thread(self.fetch_library)
def _no_playback_message(self) -> None:
self.update_status("No playback active. Press Enter to play a book.")
def action_show_help(self) -> None:
self.push_screen(HelpScreen())
def action_show_stats(self) -> None:
self.push_screen(StatsScreen())
def action_filter(self) -> None:
self.push_screen(
FilterScreen(
self.filter_text,
on_change=self._apply_filter,
),
self._apply_filter,
)
def action_clear_filter(self) -> None:
if self.filter_text:
self.filter_text = ""
self._refresh_filtered_view()
self.update_status("Filter cleared")
def _apply_filter(self, filter_text: str | None) -> None:
self.filter_text = filter_text or ""
self._refresh_filtered_view()
def action_toggle_download(self) -> None:
asin = self._get_selected_asin()
if asin:
self._toggle_download_async(asin)
@work(exclusive=True, thread=True)
def _toggle_download_async(self, asin: str) -> None:
if not self.download_manager:
return
if self.download_manager.is_cached(asin):
self.download_manager.remove_cached(
asin, self._thread_status_update)
else:
self.download_manager.get_or_download(
asin, self._thread_status_update)
self.call_from_thread(self._refresh_table)
@work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None:
if not self.download_manager:
return
self.playback.prepare_and_start(
self.download_manager,
asin,
self._thread_status_update,
)

23
auditui/app/bindings.py Normal file
View File

@@ -0,0 +1,23 @@
"""Key bindings for the main app."""
BINDINGS = [
("?", "show_help", "Help"),
("s", "show_stats", "Stats"),
("/", "filter", "Filter"),
("escape", "clear_filter", "Clear filter"),
("n", "sort", "Sort by name"),
("p", "sort_by_progress", "Sort by progress"),
("a", "show_all", "All/Unfinished"),
("r", "refresh", "Refresh"),
("enter", "play_selected", "Play"),
("space", "toggle_playback", "Pause/Resume"),
("left", "seek_backward", "-30s"),
("right", "seek_forward", "+30s"),
("ctrl+left", "previous_chapter", "Previous chapter"),
("ctrl+right", "next_chapter", "Next chapter"),
("up", "increase_speed", "Increase speed"),
("down", "decrease_speed", "Decrease speed"),
("f", "toggle_finished", "Mark finished"),
("d", "toggle_download", "Download/Delete"),
("q", "quit", "Quit"),
]

108
auditui/app/layout.py Normal file
View File

@@ -0,0 +1,108 @@
"""Main layout: compose, mount, resize, status bar, table column widths."""
from __future__ import annotations
from textual.app import ComposeResult
from textual.containers import Horizontal
from textual.events import Resize
from textual.widgets import DataTable, ProgressBar, Static
from .. import __version__
from ..constants import TABLE_COLUMN_DEFS, TABLE_CSS
class AppLayoutMixin:
def compose(self) -> ComposeResult:
yield Horizontal(
Static("? Help", id="top_left"),
Static(f"Auditui v{__version__}", id="top_center"),
Static("q Quit", id="top_right"),
id="top_bar",
)
yield Static("Loading...", id="status")
table = DataTable()
table.zebra_stripes = True
table.cursor_type = "row"
yield table
yield Static("", id="progress_info")
with Horizontal(id="progress_bar_container"):
yield ProgressBar(
id="progress_bar", show_eta=False, show_percentage=False, total=100
)
def on_mount(self) -> None:
self.theme = "textual-dark"
table = self.query_one(DataTable)
for column_name, _ratio in TABLE_COLUMN_DEFS:
table.add_column(column_name)
self.call_after_refresh(lambda: self._apply_column_widths(table))
column_keys = list(table.columns.keys())
self.title_column_key = column_keys[0]
if self.client:
self.update_status("Fetching library...")
self.fetch_library()
else:
self.update_status(
"Not authenticated. Please restart and authenticate.")
self.set_interval(1.0, self._check_playback_status)
self.set_interval(0.5, self._update_progress)
self.set_interval(30.0, self._save_position_periodically)
def on_unmount(self) -> None:
self.playback.stop()
if self.download_manager:
self.download_manager.close()
def on_resize(self, event: Resize) -> None:
del event
try:
table = self.query_one(DataTable)
except Exception:
return
self.call_after_refresh(lambda: self._apply_column_widths(table))
def update_status(self, message: str) -> None:
status = self.query_one("#status", Static)
status.display = True
status.update(message)
def _apply_column_widths(self, table: DataTable) -> None:
if not table.columns:
return
column_keys = list(table.columns.keys())
num_cols = len(column_keys)
ratios = [ratio for _, ratio in TABLE_COLUMN_DEFS]
total_ratio = sum(ratios) or num_cols
content_width = table.scrollable_content_region.width
if content_width <= 0:
content_width = table.size.width
if content_width <= 0:
return
padding_total = 2 * table.cell_padding * num_cols
distributable = max(num_cols, content_width - padding_total)
widths = []
for ratio in ratios:
w = max(1, (distributable * ratio) // total_ratio)
widths.append(w)
remainder = distributable - sum(widths)
if remainder > 0:
indices = sorted(
range(num_cols), key=lambda i: ratios[i], reverse=True)
for i in range(remainder):
widths[indices[i % num_cols]] += 1
for column_key, w in zip(column_keys, widths):
col = table.columns[column_key]
col.auto_width = False
col.width = w
table.refresh()
def _thread_status_update(self, message: str) -> None:
self.call_from_thread(self.update_status, message)

36
auditui/app/library.py Normal file
View File

@@ -0,0 +1,36 @@
"""Library fetch and load/error handlers."""
from __future__ import annotations
from textual import work
from textual.worker import get_current_worker
from ..types import LibraryItem
class AppLibraryMixin:
@work(exclusive=True, thread=True)
def fetch_library(self) -> None:
worker = get_current_worker()
if worker.is_cancelled or not self.library_client:
return
try:
all_items = self.library_client.fetch_all_items(
self._thread_status_update)
self.call_from_thread(self.on_library_loaded, all_items)
except (OSError, ValueError, KeyError) as exc:
self.call_from_thread(self.on_library_error, str(exc))
def on_library_loaded(self, items: list[LibraryItem]) -> None:
self.all_items = items
self._search_text_cache.clear()
self._prime_search_cache(items)
self.update_status(f"Loaded {len(items)} books")
if self.show_all_mode:
self.show_all()
else:
self.show_unfinished()
def on_library_error(self, error: str) -> None:
self.update_status(f"Error fetching library: {error}")

94
auditui/app/progress.py Normal file
View File

@@ -0,0 +1,94 @@
"""Playback key handling, progress bar updates, and position save."""
from __future__ import annotations
from textual.containers import Horizontal
from textual.events import Key
from textual.widgets import DataTable, ProgressBar, Static
from ..library import LibraryClient
class AppProgressMixin:
def on_key(self, event: Key) -> None:
if self.playback.is_playing:
if event.key == "ctrl+left":
event.prevent_default()
self.action_previous_chapter()
return
elif event.key == "ctrl+right":
event.prevent_default()
self.action_next_chapter()
return
elif event.key == "left":
event.prevent_default()
self.action_seek_backward()
return
elif event.key == "right":
event.prevent_default()
self.action_seek_forward()
return
elif event.key == "up":
event.prevent_default()
self.action_increase_speed()
return
elif event.key == "down":
event.prevent_default()
self.action_decrease_speed()
return
if isinstance(self.focused, DataTable):
if event.key == "enter":
event.prevent_default()
self.action_play_selected()
elif event.key == "space":
event.prevent_default()
self.action_toggle_playback()
def _check_playback_status(self) -> None:
message = self.playback.check_status()
if message:
self.update_status(message)
self._hide_progress()
def _update_progress(self) -> None:
if not self.playback.is_playing:
self._hide_progress()
return
progress_data = self.playback.get_current_progress()
if not progress_data:
self._hide_progress()
return
chapter_name, chapter_elapsed, chapter_total = progress_data
if chapter_total <= 0:
self._hide_progress()
return
progress_info = self.query_one("#progress_info", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
progress_bar_container = self.query_one(
"#progress_bar_container", Horizontal)
progress_percent = min(
100.0, max(0.0, (chapter_elapsed / chapter_total) * 100.0)
)
progress_bar.update(progress=progress_percent)
chapter_elapsed_str = LibraryClient.format_time(chapter_elapsed)
chapter_total_str = LibraryClient.format_time(chapter_total)
progress_info.update(
f"{chapter_name} | {chapter_elapsed_str} / {chapter_total_str}"
)
progress_info.display = True
progress_bar_container.display = True
def _hide_progress(self) -> None:
progress_info = self.query_one("#progress_info", Static)
progress_bar_container = self.query_one(
"#progress_bar_container", Horizontal)
progress_info.display = False
progress_bar_container.display = False
def _save_position_periodically(self) -> None:
self.playback.update_position_if_needed()

31
auditui/app/state.py Normal file
View File

@@ -0,0 +1,31 @@
"""App state initialization."""
from __future__ import annotations
from ..constants import PROGRESS_COLUMN_INDEX
from ..downloads import DownloadManager
from ..library import LibraryClient
from ..playback import PlaybackController
def init_auditui_state(self: object, auth=None, client=None) -> None:
setattr(self, "auth", auth)
setattr(self, "client", client)
setattr(self, "library_client", LibraryClient(client) if client else None)
setattr(
self,
"download_manager",
DownloadManager(auth, client) if auth and client else None,
)
notify = getattr(self, "update_status")
lib_client = LibraryClient(client) if client else None
setattr(self, "playback", PlaybackController(notify, lib_client))
setattr(self, "all_items", [])
setattr(self, "current_items", [])
setattr(self, "_search_text_cache", {})
setattr(self, "show_all_mode", False)
setattr(self, "filter_text", "")
setattr(self, "title_sort_reverse", False)
setattr(self, "progress_sort_reverse", False)
setattr(self, "title_column_key", None)
setattr(self, "progress_column_index", PROGRESS_COLUMN_INDEX)

106
auditui/app/table.py Normal file
View File

@@ -0,0 +1,106 @@
"""Table population, sorting, filter view, and search cache."""
from __future__ import annotations
from ..library import (
filter_items,
filter_unfinished_items,
format_item_as_row,
create_progress_sort_key,
create_title_sort_key,
)
from ..types import LibraryItem
from textual.widgets import DataTable, Static
class AppTableMixin:
def _populate_table(self, items: list[LibraryItem]) -> None:
table = self.query_one(DataTable)
table.clear()
if not items or not self.library_client:
self.update_status("No books found.")
return
for item in items:
title, author, runtime, progress, downloaded = format_item_as_row(
item, self.library_client, self.download_manager
)
table.add_row(title, author, runtime,
progress, downloaded, key=title)
self.current_items = items
status = self.query_one("#status", Static)
status.display = False
self._apply_column_widths(table)
def _refresh_table(self) -> None:
if self.current_items:
self._populate_table(self.current_items)
def show_all(self) -> None:
if not self.all_items:
return
self.show_all_mode = True
self._refresh_filtered_view()
def show_unfinished(self) -> None:
if not self.all_items or not self.library_client:
return
self.show_all_mode = False
self._refresh_filtered_view()
def action_sort(self) -> None:
table = self.query_one(DataTable)
if table.row_count > 0 and self.title_column_key:
title_key, reverse = create_title_sort_key(self.title_sort_reverse)
table.sort(key=title_key, reverse=reverse)
self.title_sort_reverse = not self.title_sort_reverse
def action_sort_by_progress(self) -> None:
table = self.query_one(DataTable)
if table.row_count > 0:
self.progress_sort_reverse = not self.progress_sort_reverse
progress_key, reverse = create_progress_sort_key(
self.progress_column_index, self.progress_sort_reverse
)
table.sort(key=progress_key, reverse=reverse)
def action_show_all(self) -> None:
if self.show_all_mode:
self.show_unfinished()
else:
self.show_all()
def _refresh_filtered_view(self) -> None:
if not self.all_items:
return
items = self.all_items
if self.filter_text:
items = filter_items(items, self.filter_text,
self._get_search_text)
self._populate_table(items)
self.update_status(
f"Filter: '{self.filter_text}' ({len(items)} books)")
return
if not self.show_all_mode and self.library_client:
items = filter_unfinished_items(items, self.library_client)
self._populate_table(items)
def _get_search_text(self, item: LibraryItem) -> str:
cache_key = id(item)
cached = self._search_text_cache.get(cache_key)
if cached is not None:
return cached
from ..library import build_search_text
search_text = build_search_text(item, self.library_client)
self._search_text_cache[cache_key] = search_text
return search_text
def _prime_search_cache(self, items: list[LibraryItem]) -> None:
for item in items:
self._get_search_text(item)

View File

@@ -1,19 +1,20 @@
"""Authentication helpers for the Auditui app.""" """Load saved Audible credentials and build authenticator and API client."""
from pathlib import Path from pathlib import Path
import audible import audible
from .constants import AUTH_PATH from ..constants import AUTH_PATH
def authenticate( def authenticate(
auth_path: Path = AUTH_PATH, auth_path: Path = AUTH_PATH,
) -> tuple[audible.Authenticator, audible.Client]: ) -> tuple[audible.Authenticator, audible.Client]:
"""Authenticate with Audible and return authenticator and client.""" """Load auth from file and return (Authenticator, Client). Raises if file missing or invalid."""
if not auth_path.exists(): if not auth_path.exists():
raise FileNotFoundError( raise FileNotFoundError(
"Authentication file not found. Please run 'auditui configure' to set up authentication.") "Authentication file not found. Please run 'auditui configure' to set up authentication."
)
try: try:
authenticator = audible.Authenticator.from_file(str(auth_path)) authenticator = audible.Authenticator.from_file(str(auth_path))
@@ -21,4 +22,5 @@ def authenticate(
return authenticator, audible_client return authenticator, audible_client
except (OSError, ValueError, KeyError) as exc: except (OSError, ValueError, KeyError) as exc:
raise ValueError( raise ValueError(
f"Failed to load existing authentication: {exc}") from exc f"Failed to load existing authentication: {exc}"
) from exc

5
auditui/cli/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""CLI package; entry point is main() from .main."""
from .main import main
__all__ = ["main"]

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env python3 """CLI entrypoint: configure subcommand or authenticate and run the TUI."""
"""Auditui entrypoint."""
import argparse import argparse
import sys import sys
@@ -12,7 +11,6 @@ from auditui.constants import AUTH_PATH
def main() -> None: def main() -> None:
"""Authenticate and launch the app."""
parser = argparse.ArgumentParser(prog="auditui") parser = argparse.ArgumentParser(prog="auditui")
parser.add_argument( parser.add_argument(
"-v", "-v",
@@ -52,7 +50,3 @@ def main() -> None:
app = Auditui(auth=auth, client=client) app = Auditui(auth=auth, client=client)
app.run() app.run()
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,4 @@
"""Configuration helpers for the Auditui app.""" """Interactive setup of Audible credentials; writes auth and config files."""
import json import json
from getpass import getpass from getpass import getpass
@@ -6,13 +6,13 @@ from pathlib import Path
import audible import audible
from .constants import AUTH_PATH, CONFIG_PATH from ..constants import AUTH_PATH, CONFIG_PATH
def configure( def configure(
auth_path: Path = AUTH_PATH, auth_path: Path = AUTH_PATH,
) -> tuple[audible.Authenticator, audible.Client]: ) -> tuple[audible.Authenticator, audible.Client]:
"""Force re-authentication and save credentials.""" """Prompt for email/password/locale, authenticate, and save auth.json and config.json."""
if auth_path.exists(): if auth_path.exists():
response = input( response = input(
"Configuration already exists. Are you sure you want to overwrite it? (y/N): " "Configuration already exists. Are you sure you want to overwrite it? (y/N): "
@@ -26,7 +26,8 @@ def configure(
email = input("\nEmail: ") email = input("\nEmail: ")
password = getpass("Password: ") password = getpass("Password: ")
marketplace = input( marketplace = input(
"Marketplace locale (default: US): ").strip().upper() or "US" "Marketplace locale (default: US): "
).strip().upper() or "US"
authenticator = audible.Authenticator.from_login( authenticator = audible.Authenticator.from_login(
username=email, password=password, locale=marketplace username=email, password=password, locale=marketplace

View File

@@ -1,4 +1,4 @@
"""Shared constants for the Auditui application.""" """Paths, API/config values, and CSS used across the application."""
from pathlib import Path from pathlib import Path

View File

@@ -0,0 +1,5 @@
"""Download and cache of Audible AAX files."""
from .manager import DownloadManager
__all__ = ["DownloadManager"]

View File

@@ -1,27 +1,25 @@
"""Download helpers for Audible content.""" """Obtains AAX files from Audible (cache or download) and provides activation bytes."""
import re import re
from pathlib import Path from pathlib import Path
from typing import Callable
from urllib.parse import urlparse from urllib.parse import urlparse
import audible import audible
import httpx import httpx
from audible.activation_bytes import get_activation_bytes from audible.activation_bytes import get_activation_bytes
from .constants import ( from ..constants import (
CACHE_DIR, CACHE_DIR,
DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE,
DEFAULT_CODEC, DEFAULT_CODEC,
DOWNLOAD_URL, DOWNLOAD_URL,
MIN_FILE_SIZE, MIN_FILE_SIZE,
) )
from ..types import StatusCallback
StatusCallback = Callable[[str], None]
class DownloadManager: class DownloadManager:
"""Handle retrieval and download of Audible titles.""" """Obtains AAX files from Audible (cache or download) and provides activation bytes."""
def __init__( def __init__(
self, self,
@@ -35,16 +33,18 @@ class DownloadManager:
self.cache_dir = cache_dir self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True) self.cache_dir.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size self.chunk_size = chunk_size
self._http_client = httpx.Client(auth=auth, timeout=30.0, follow_redirects=True) self._http_client = httpx.Client(
auth=auth, timeout=30.0, follow_redirects=True)
self._download_client = httpx.Client( self._download_client = httpx.Client(
timeout=httpx.Timeout(connect=30.0, read=None, write=30.0, pool=30.0), timeout=httpx.Timeout(connect=30.0, read=None,
write=30.0, pool=30.0),
follow_redirects=True, follow_redirects=True,
) )
def get_or_download( def get_or_download(
self, asin: str, notify: StatusCallback | None = None self, asin: str, notify: StatusCallback | None = None
) -> Path | None: ) -> Path | None:
"""Get local path of AAX file, downloading if missing.""" """Return local path to AAX file; download and cache if not present."""
title = self._get_name_from_asin(asin) or asin title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title) safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax" local_path = self.cache_dir / f"{safe_title}.aax"
@@ -81,7 +81,7 @@ class DownloadManager:
return local_path return local_path
def get_activation_bytes(self) -> str | None: def get_activation_bytes(self) -> str | None:
"""Get activation bytes as hex string.""" """Return activation bytes as hex string for ffplay/ffmpeg."""
try: try:
activation_bytes = get_activation_bytes(self.auth) activation_bytes = get_activation_bytes(self.auth)
if isinstance(activation_bytes, bytes): if isinstance(activation_bytes, bytes):
@@ -91,7 +91,7 @@ class DownloadManager:
return None return None
def get_cached_path(self, asin: str) -> Path | None: def get_cached_path(self, asin: str) -> Path | None:
"""Get the cached file path for a book if it exists.""" """Return path to cached AAX file if it exists and is valid size."""
title = self._get_name_from_asin(asin) or asin title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title) safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax" local_path = self.cache_dir / f"{safe_title}.aax"
@@ -100,11 +100,11 @@ class DownloadManager:
return None return None
def is_cached(self, asin: str) -> bool: def is_cached(self, asin: str) -> bool:
"""Check if a book is already cached.""" """Return True if the title is present in cache with valid size."""
return self.get_cached_path(asin) is not None return self.get_cached_path(asin) is not None
def remove_cached(self, asin: str, notify: StatusCallback | None = None) -> bool: def remove_cached(self, asin: str, notify: StatusCallback | None = None) -> bool:
"""Remove a cached book file.""" """Delete the cached AAX file for the given ASIN. Returns True on success."""
cached_path = self.get_cached_path(asin) cached_path = self.get_cached_path(asin)
if not cached_path: if not cached_path:
if notify: if notify:
@@ -151,7 +151,7 @@ class DownloadManager:
codec: str = DEFAULT_CODEC, codec: str = DEFAULT_CODEC,
notify: StatusCallback | None = None, notify: StatusCallback | None = None,
) -> str | None: ) -> str | None:
"""Get download link for book.""" """Obtain CDN download URL for the given ASIN and codec."""
if self.auth.adp_token is None: if self.auth.adp_token is None:
if notify: if notify:
notify("Missing ADP token (not authenticated?)") notify("Missing ADP token (not authenticated?)")
@@ -189,7 +189,7 @@ class DownloadManager:
def _download_file( def _download_file(
self, url: str, dest_path: Path, notify: StatusCallback | None = None self, url: str, dest_path: Path, notify: StatusCallback | None = None
) -> Path | None: ) -> Path | None:
"""Download file from URL to destination.""" """Stream download from URL to dest_path; reports progress via notify."""
try: try:
with self._download_client.stream("GET", url) as response: with self._download_client.stream("GET", url) as response:
response.raise_for_status() response.raise_for_status()
@@ -240,7 +240,7 @@ class DownloadManager:
return None return None
def close(self) -> None: def close(self) -> None:
"""Close the HTTP clients and release resources.""" """Close internal HTTP clients. Safe to call multiple times."""
if hasattr(self, "_http_client"): if hasattr(self, "_http_client"):
self._http_client.close() self._http_client.close()
if hasattr(self, "_download_client"): if hasattr(self, "_download_client"):

View File

@@ -0,0 +1,22 @@
"""Fetching, formatting, and filtering of the user's Audible library."""
from .client import LibraryClient
from .search import build_search_text, filter_items
from .table import (
create_progress_sort_key,
create_title_sort_key,
filter_unfinished_items,
format_item_as_row,
truncate_author_name,
)
__all__ = [
"LibraryClient",
"build_search_text",
"filter_items",
"create_progress_sort_key",
"create_title_sort_key",
"filter_unfinished_items",
"format_item_as_row",
"truncate_author_name",
]

View File

@@ -1,21 +1,19 @@
"""Library helpers for fetching and formatting Audible data.""" """Client for the Audible library API."""
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
import audible import audible
from ..types import LibraryItem, StatusCallback
ProgressCallback = Callable[[str], None]
class LibraryClient: class LibraryClient:
"""Helper for interacting with the Audible library.""" """Client for the Audible library API. Fetches items, extracts metadata, and updates positions."""
def __init__(self, client: audible.Client) -> None: def __init__(self, client: audible.Client) -> None:
self.client = client self.client = client
def fetch_all_items(self, on_progress: ProgressCallback | None = None) -> list: def fetch_all_items(self, on_progress: StatusCallback | None = None) -> list[LibraryItem]:
"""Fetch all library items from the API.""" """Fetch all library items from the API."""
response_groups = ( response_groups = (
"contributors,media,product_attrs,product_desc,product_details," "contributors,media,product_attrs,product_desc,product_details,"
@@ -25,8 +23,8 @@ class LibraryClient:
def _fetch_page( def _fetch_page(
self, page: int, page_size: int, response_groups: str self, page: int, page_size: int, response_groups: str
) -> tuple[int, list[dict]]: ) -> tuple[int, list[LibraryItem]]:
"""Fetch a single page of library items.""" """Fetch a single page of library items from the API."""
library = self.client.get( library = self.client.get(
path="library", path="library",
num_results=page_size, num_results=page_size,
@@ -37,9 +35,9 @@ class LibraryClient:
return page, list(items) return page, list(items)
def _fetch_all_pages( def _fetch_all_pages(
self, response_groups: str, on_progress: ProgressCallback | None = None self, response_groups: str, on_progress: StatusCallback | None = None
) -> list: ) -> list[LibraryItem]:
"""Fetch all pages of library items from the API using maximum parallel fetching.""" """Fetch all pages of library items using parallel requests."""
library_response = None library_response = None
page_size = 200 page_size = 200
@@ -63,7 +61,7 @@ class LibraryClient:
if not first_page_items: if not first_page_items:
return [] return []
all_items: list[dict] = list(first_page_items) all_items: list[LibraryItem] = list(first_page_items)
if on_progress: if on_progress:
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...") on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
@@ -80,7 +78,7 @@ class LibraryClient:
estimated_pages = 500 estimated_pages = 500
max_workers = 50 max_workers = 50
page_results: dict[int, list[dict]] = {} page_results: dict[int, list[LibraryItem]] = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor: with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page: dict = {} future_to_page: dict = {}
@@ -119,8 +117,8 @@ class LibraryClient:
return all_items return all_items
def extract_title(self, item: dict) -> str: def extract_title(self, item: LibraryItem) -> str:
"""Extract title from library item.""" """Return the book title from a library item."""
product = item.get("product", {}) product = item.get("product", {})
return ( return (
product.get("title") product.get("title")
@@ -128,8 +126,8 @@ class LibraryClient:
or product.get("asin", "Unknown Title") or product.get("asin", "Unknown Title")
) )
def extract_authors(self, item: dict) -> str: def extract_authors(self, item: LibraryItem) -> str:
"""Extract author names from library item.""" """Return comma-separated author names from a library item."""
product = item.get("product", {}) product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or [] authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item: if not authors and "authors" in item:
@@ -139,8 +137,8 @@ class LibraryClient:
for a in authors if isinstance(a, dict)] for a in authors if isinstance(a, dict)]
return ", ".join(author_names) or "Unknown" return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: dict) -> int | None: def extract_runtime_minutes(self, item: LibraryItem) -> int | None:
"""Extract runtime in minutes from library item.""" """Return runtime in minutes if present."""
product = item.get("product", {}) product = item.get("product", {})
runtime_fields = [ runtime_fields = [
"runtime_length_min", "runtime_length_min",
@@ -165,8 +163,8 @@ class LibraryClient:
return int(runtime) return int(runtime)
return None return None
def extract_progress_info(self, item: dict) -> float | None: def extract_progress_info(self, item: LibraryItem) -> float | None:
"""Extract progress percentage from library item.""" """Return progress percentage (0100) if present."""
percent_complete = item.get("percent_complete") percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {}) listening_status = item.get("listening_status", {})
@@ -175,13 +173,13 @@ class LibraryClient:
return float(percent_complete) if percent_complete is not None else None return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: dict) -> str | None: def extract_asin(self, item: LibraryItem) -> str | None:
"""Extract ASIN from library item.""" """Return the ASIN for a library item."""
product = item.get("product", {}) product = item.get("product", {})
return item.get("asin") or product.get("asin") return item.get("asin") or product.get("asin")
def is_finished(self, item: dict) -> bool: def is_finished(self, item: LibraryItem) -> bool:
"""Check if a library item is finished.""" """Return True if the item is marked or inferred as finished."""
is_finished_flag = item.get("is_finished") is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete") percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status") listening_status = item.get("listening_status")
@@ -194,8 +192,8 @@ class LibraryClient:
percent_complete = listening_status.get("percent_complete", 0) percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or ( return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float) isinstance(percent_complete, (int, float))
) and percent_complete >= 100 and percent_complete >= 100
) )
def get_last_position(self, asin: str) -> float | None: def get_last_position(self, asin: str) -> float | None:
@@ -227,7 +225,7 @@ class LibraryClient:
return None return None
def _get_content_reference(self, asin: str) -> dict | None: def _get_content_reference(self, asin: str) -> dict | None:
"""Get content reference data including ACR and version.""" """Fetch content reference (ACR and version) for position updates."""
try: try:
response = self.client.get( response = self.client.get(
path=f"1.0/content/{asin}/metadata", path=f"1.0/content/{asin}/metadata",
@@ -242,7 +240,7 @@ class LibraryClient:
return None return None
def _update_position(self, asin: str, position_seconds: float) -> bool: def _update_position(self, asin: str, position_seconds: float) -> bool:
"""Update the playback position for a book.""" """Persist playback position to the API. Returns True on success."""
if position_seconds < 0: if position_seconds < 0:
return False return False
@@ -273,7 +271,7 @@ class LibraryClient:
return False return False
def save_last_position(self, asin: str, position_seconds: float) -> bool: def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save the last playback position for a book.""" """Save playback position to Audible. Returns True on success."""
if position_seconds <= 0: if position_seconds <= 0:
return False return False
return self._update_position(asin, position_seconds) return self._update_position(asin, position_seconds)
@@ -282,7 +280,7 @@ class LibraryClient:
def format_duration( def format_duration(
value: int | None, unit: str = "minutes", default_none: str | None = None value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None: ) -> str | None:
"""Format duration value into a compact string.""" """Format a duration value as e.g. 2h30m or 45m."""
if value is None or value <= 0: if value is None or value <= 0:
return default_none return default_none
@@ -296,8 +294,8 @@ class LibraryClient:
return f"{hours}h{minutes:02d}" if minutes else f"{hours}h" return f"{hours}h{minutes:02d}" if minutes else f"{hours}h"
return f"{minutes}m" return f"{minutes}m"
def mark_as_finished(self, asin: str, item: dict | None = None) -> bool: def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool:
"""Mark a book as finished by setting position to the end.""" """Mark a book as finished on Audible. Optionally mutates item in place."""
total_ms = self._get_runtime_ms(asin, item) total_ms = self._get_runtime_ms(asin, item)
if not total_ms: if not total_ms:
return False return False
@@ -321,8 +319,8 @@ class LibraryClient:
except Exception: except Exception:
return False return False
def _get_runtime_ms(self, asin: str, item: dict | None = None) -> int | None: def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None:
"""Get total runtime in milliseconds.""" """Return total runtime in ms from item or API."""
if item: if item:
runtime_min = self.extract_runtime_minutes(item) runtime_min = self.extract_runtime_minutes(item)
if runtime_min: if runtime_min:
@@ -340,7 +338,7 @@ class LibraryClient:
return None return None
def _get_acr(self, asin: str) -> str | None: def _get_acr(self, asin: str) -> str | None:
"""Get ACR token needed for position updates.""" """Fetch ACR token required for position and finish updates."""
try: try:
response = self.client.post( response = self.client.post(
path=f"1.0/content/{asin}/licenserequest", path=f"1.0/content/{asin}/licenserequest",
@@ -356,7 +354,7 @@ class LibraryClient:
@staticmethod @staticmethod
def format_time(seconds: float) -> str: def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS.""" """Format seconds as HH:MM:SS or MM:SS for display."""
total_seconds = int(seconds) total_seconds = int(seconds)
hours = total_seconds // 3600 hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60 minutes = (total_seconds % 3600) // 60

View File

@@ -1,14 +1,16 @@
"""Search helpers for filtering library items.""" """Text search over library items for the filter feature."""
from __future__ import annotations from __future__ import annotations
from typing import Callable from typing import Callable
from .library import LibraryClient from ..types import LibraryItem
from .client import LibraryClient
def build_search_text(item: dict, library_client: LibraryClient | None) -> str: def build_search_text(item: LibraryItem, library_client: LibraryClient | None) -> str:
"""Build a lowercase search string for an item.""" """Build a single lowercase string from title and authors for matching."""
if library_client: if library_client:
title = library_client.extract_title(item) title = library_client.extract_title(item)
authors = library_client.extract_authors(item) authors = library_client.extract_authors(item)
@@ -23,11 +25,11 @@ def build_search_text(item: dict, library_client: LibraryClient | None) -> str:
def filter_items( def filter_items(
items: list[dict], items: list[LibraryItem],
filter_text: str, filter_text: str,
get_search_text: Callable[[dict], str], get_search_text: Callable[[LibraryItem], str],
) -> list[dict]: ) -> list[LibraryItem]:
"""Filter items by a search string.""" """Return items whose search text contains filter_text (case-insensitive)."""
if not filter_text: if not filter_text:
return items return items
filter_lower = filter_text.lower() filter_lower = filter_text.lower()

View File

@@ -1,20 +1,21 @@
"""Utils for table operations.""" """Formatting and sorting of library items for the main table."""
import unicodedata import unicodedata
from typing import TYPE_CHECKING, Callable from typing import TYPE_CHECKING, Callable
from .constants import ( from ..constants import (
AUTHOR_NAME_DISPLAY_LENGTH, AUTHOR_NAME_DISPLAY_LENGTH,
AUTHOR_NAME_MAX_LENGTH, AUTHOR_NAME_MAX_LENGTH,
PROGRESS_COLUMN_INDEX, PROGRESS_COLUMN_INDEX,
) )
from ..types import LibraryItem
if TYPE_CHECKING: if TYPE_CHECKING:
from .downloads import DownloadManager from ..downloads import DownloadManager
def create_title_sort_key(reverse: bool = False) -> tuple[Callable, bool]: def create_title_sort_key(reverse: bool = False) -> tuple[Callable, bool]:
"""Create a sort key function for sorting by title.""" """Return a (key_fn, reverse) pair for DataTable sort by title column."""
def title_key(row_values): def title_key(row_values):
title_cell = row_values[0] title_cell = row_values[0]
if isinstance(title_cell, str): if isinstance(title_cell, str):
@@ -26,7 +27,7 @@ def create_title_sort_key(reverse: bool = False) -> tuple[Callable, bool]:
def create_progress_sort_key(progress_column_index: int = PROGRESS_COLUMN_INDEX, reverse: bool = False) -> tuple[Callable, bool]: def create_progress_sort_key(progress_column_index: int = PROGRESS_COLUMN_INDEX, reverse: bool = False) -> tuple[Callable, bool]:
"""Create a sort key function for sorting by progress percentage.""" """Return a (key_fn, reverse) pair for DataTable sort by progress column."""
def progress_key(row_values): def progress_key(row_values):
progress_cell = row_values[progress_column_index] progress_cell = row_values[progress_column_index]
if isinstance(progress_cell, str): if isinstance(progress_cell, str):
@@ -40,18 +41,14 @@ def create_progress_sort_key(progress_column_index: int = PROGRESS_COLUMN_INDEX,
def truncate_author_name(author_names: str) -> str: def truncate_author_name(author_names: str) -> str:
"""Truncate author name if it exceeds maximum length.""" """Truncate author string to display length with ellipsis if over max."""
if author_names and len(author_names) > AUTHOR_NAME_MAX_LENGTH: if author_names and len(author_names) > AUTHOR_NAME_MAX_LENGTH:
return f"{author_names[:AUTHOR_NAME_DISPLAY_LENGTH]}..." return f"{author_names[:AUTHOR_NAME_DISPLAY_LENGTH]}..."
return author_names return author_names
def format_item_as_row(item: dict, library_client, download_manager: "DownloadManager | None" = None) -> tuple[str, str, str, str, str]: def format_item_as_row(item: LibraryItem, library_client, download_manager: "DownloadManager | None" = None) -> tuple[str, str, str, str, str]:
"""Format a library item into table row data. """Turn a library item into (title, author, runtime, progress, downloaded) for the table."""
Returns:
Tuple of (title, author, runtime, progress, downloaded) strings
"""
title = library_client.extract_title(item) title = library_client.extract_title(item)
author_names = library_client.extract_authors(item) author_names = library_client.extract_authors(item)
@@ -79,8 +76,8 @@ def format_item_as_row(item: dict, library_client, download_manager: "DownloadMa
return (title, author_display, runtime_str, progress_str, downloaded_str) return (title, author_display, runtime_str, progress_str, downloaded_str)
def filter_unfinished_items(items: list[dict], library_client) -> list[dict]: def filter_unfinished_items(items: list[LibraryItem], library_client) -> list[LibraryItem]:
"""Filter out finished items from the list.""" """Return only items that are not marked as finished."""
return [ return [
item for item in items item for item in items
if not library_client.is_finished(item) if not library_client.is_finished(item)

View File

@@ -1,513 +0,0 @@
"""Playback control for Auditui."""
from __future__ import annotations
import os
import shutil
import signal
import subprocess
import time
from pathlib import Path
from typing import Callable
from .downloads import DownloadManager
from .library import LibraryClient
from .media_info import load_media_info
StatusCallback = Callable[[str], None]
MIN_SPEED = 0.5
MAX_SPEED = 2.0
SPEED_INCREMENT = 0.5
class PlaybackController:
"""Manage playback through ffplay."""
def __init__(self, notify: StatusCallback, library_client: LibraryClient | None = None) -> None:
self.notify = notify
self.library_client = library_client
self.playback_process: subprocess.Popen | None = None
self.is_playing = False
self.is_paused = False
self.current_file_path: Path | None = None
self.current_asin: str | None = None
self.playback_start_time: float | None = None
self.paused_duration: float = 0.0
self.pause_start_time: float | None = None
self.total_duration: float | None = None
self.chapters: list[dict] = []
self.seek_offset: float = 0.0
self.activation_hex: str | None = None
self.last_save_time: float = 0.0
self.position_save_interval: float = 30.0
self.playback_speed: float = 1.0
def start(
self,
path: Path,
activation_hex: str | None = None,
status_callback: StatusCallback | None = None,
start_position: float = 0.0,
speed: float | None = None,
) -> bool:
"""Start playing a local file using ffplay."""
notify = status_callback or self.notify
if not shutil.which("ffplay"):
notify("ffplay not found. Please install ffmpeg")
return False
if self.playback_process is not None:
self.stop()
self.activation_hex = activation_hex
self.seek_offset = start_position
if speed is not None:
self.playback_speed = speed
cmd = ["ffplay", "-nodisp", "-autoexit"]
if activation_hex:
cmd.extend(["-activation_bytes", activation_hex])
if start_position > 0:
cmd.extend(["-ss", str(start_position)])
if self.playback_speed != 1.0:
cmd.extend(["-af", f"atempo={self.playback_speed:.2f}"])
cmd.append(str(path))
try:
self.playback_process = subprocess.Popen(
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
time.sleep(0.2)
if self.playback_process.poll() is not None:
return_code = self.playback_process.returncode
if return_code == 0 and start_position > 0 and self.total_duration:
if start_position >= self.total_duration - 5:
notify("Reached end of file")
self._reset_state()
return False
notify(
f"Playback process exited immediately (code: {return_code})")
self.playback_process = None
return False
self.is_playing = True
self.is_paused = False
self.current_file_path = path
self.playback_start_time = time.time()
self.paused_duration = 0.0
self.pause_start_time = None
duration, chapters = load_media_info(path, activation_hex)
self.total_duration = duration
self.chapters = chapters
notify(f"Playing: {path.name}")
return True
except (OSError, ValueError, subprocess.SubprocessError) as exc:
notify(f"Error starting playback: {exc}")
return False
def stop(self) -> None:
"""Stop the current playback."""
if self.playback_process is None:
return
self._save_current_position()
try:
if self.playback_process.poll() is None:
self.playback_process.terminate()
try:
self.playback_process.wait(timeout=2)
except subprocess.TimeoutExpired:
self.playback_process.kill()
self.playback_process.wait()
except (ProcessLookupError, ValueError):
pass
finally:
self._reset_state()
def pause(self) -> None:
"""Pause the current playback."""
if not self._validate_playback_state(require_paused=False):
return
self.pause_start_time = time.time()
self._send_signal(signal.SIGSTOP, "Paused", "pause")
def resume(self) -> None:
"""Resume the current playback."""
if not self._validate_playback_state(require_paused=True):
return
if self.pause_start_time is not None:
self.paused_duration += time.time() - self.pause_start_time
self.pause_start_time = None
self._send_signal(signal.SIGCONT, "Playing", "resume")
def check_status(self) -> str | None:
"""Check if playback process has finished and return status message."""
if self.playback_process is None:
return None
return_code = self.playback_process.poll()
if return_code is None:
return None
finished_file = self.current_file_path
self._reset_state()
if finished_file:
if return_code == 0:
return f"Finished: {finished_file.name}"
return f"Playback ended unexpectedly (code: {return_code}): {finished_file.name}"
return "Playback finished"
def _reset_state(self) -> None:
"""Reset all playback state."""
self.playback_process = None
self.is_playing = False
self.is_paused = False
self.current_file_path = None
self.current_asin = None
self.playback_start_time = None
self.paused_duration = 0.0
self.pause_start_time = None
self.total_duration = None
self.chapters = []
self.seek_offset = 0.0
self.activation_hex = None
self.last_save_time = 0.0
self.playback_speed = 1.0
def _validate_playback_state(self, require_paused: bool) -> bool:
"""Validate playback state before pause/resume operations."""
if not (self.playback_process and self.is_playing):
return False
if require_paused and not self.is_paused:
return False
if not require_paused and self.is_paused:
return False
if not self.is_alive():
self.stop()
self.notify("Playback process has ended")
return False
return True
def _send_signal(self, sig: signal.Signals, status_prefix: str, action: str) -> None:
"""Send signal to playback process and update state."""
if self.playback_process is None:
return
try:
os.kill(self.playback_process.pid, sig)
self.is_paused = sig == signal.SIGSTOP
filename = self.current_file_path.name if self.current_file_path else None
message = f"{status_prefix}: {filename}" if filename else status_prefix
self.notify(message)
except ProcessLookupError:
self.stop()
self.notify("Process no longer exists")
except PermissionError:
self.notify(f"Permission denied: cannot {action} playback")
except (OSError, ValueError) as exc:
self.notify(f"Error {action}ing playback: {exc}")
def is_alive(self) -> bool:
"""Check if playback process is still running."""
if self.playback_process is None:
return False
return self.playback_process.poll() is None
def prepare_and_start(
self,
download_manager: DownloadManager,
asin: str,
status_callback: StatusCallback | None = None,
) -> bool:
"""Download file, get activation bytes, and start playback."""
notify = status_callback or self.notify
if not download_manager:
notify("Could not download file")
return False
notify("Preparing playback...")
local_path = download_manager.get_or_download(asin, notify)
if not local_path:
notify("Could not download file")
return False
notify("Getting activation bytes...")
activation_hex = download_manager.get_activation_bytes()
if not activation_hex:
notify("Failed to get activation bytes")
return False
start_position = 0.0
if self.library_client:
try:
last_position = self.library_client.get_last_position(asin)
if last_position is not None and last_position > 0:
start_position = last_position
notify(
f"Resuming from {LibraryClient.format_time(start_position)}")
except (OSError, ValueError, KeyError):
pass
notify(f"Starting playback of {local_path.name}...")
self.current_asin = asin
self.last_save_time = time.time()
return self.start(local_path, activation_hex, notify, start_position, self.playback_speed)
def toggle_playback(self) -> bool:
"""Toggle pause/resume state. Returns True if action was taken."""
if not self.is_playing:
return False
if not self.is_alive():
self.stop()
self.notify("Playback has ended")
return False
if self.is_paused:
self.resume()
else:
self.pause()
return True
def _get_current_elapsed(self) -> float:
"""Calculate current elapsed playback time."""
if self.playback_start_time is None:
return 0.0
current_time = time.time()
if self.is_paused and self.pause_start_time is not None:
return (self.pause_start_time - self.playback_start_time) - self.paused_duration
if self.pause_start_time is not None:
self.paused_duration += current_time - self.pause_start_time
self.pause_start_time = None
return max(0.0, (current_time - self.playback_start_time) - self.paused_duration)
def _stop_process(self) -> None:
"""Stop the playback process without resetting state."""
if not self.playback_process:
return
try:
if self.playback_process.poll() is None:
self.playback_process.terminate()
try:
self.playback_process.wait(timeout=2)
except subprocess.TimeoutExpired:
self.playback_process.kill()
self.playback_process.wait()
except (ProcessLookupError, ValueError):
pass
self.playback_process = None
self.is_playing = False
self.is_paused = False
self.playback_start_time = None
self.paused_duration = 0.0
self.pause_start_time = None
def _get_saved_state(self) -> dict:
"""Get current playback state for saving."""
return {
"file_path": self.current_file_path,
"asin": self.current_asin,
"activation": self.activation_hex,
"duration": self.total_duration,
"chapters": self.chapters.copy(),
"speed": self.playback_speed,
}
def _restart_at_position(
self, new_position: float, new_speed: float | None = None, message: str | None = None
) -> bool:
"""Restart playback at a new position, optionally with new speed."""
if not self.is_playing or not self.current_file_path:
return False
was_paused = self.is_paused
saved_state = self._get_saved_state()
speed = new_speed if new_speed is not None else saved_state["speed"]
self._stop_process()
time.sleep(0.2)
if self.start(saved_state["file_path"], saved_state["activation"], self.notify, new_position, speed):
self.current_asin = saved_state["asin"]
self.total_duration = saved_state["duration"]
self.chapters = saved_state["chapters"]
if was_paused:
time.sleep(0.3)
self.pause()
if message:
self.notify(message)
return True
return False
def _seek(self, seconds: float, direction: str) -> bool:
"""Seek forward or backward by specified seconds."""
elapsed = self._get_current_elapsed()
current_total_position = self.seek_offset + elapsed
if direction == "forward":
new_position = current_total_position + seconds
if self.total_duration:
if new_position >= self.total_duration - 2:
self.notify("Already at end of file")
return False
new_position = min(new_position, self.total_duration - 2)
message = f"Skipped forward {int(seconds)}s"
else:
new_position = max(0.0, current_total_position - seconds)
message = f"Skipped backward {int(seconds)}s"
return self._restart_at_position(new_position, message=message)
def seek_forward(self, seconds: float = 30.0) -> bool:
"""Seek forward by specified seconds. Returns True if action was taken."""
return self._seek(seconds, "forward")
def seek_backward(self, seconds: float = 30.0) -> bool:
"""Seek backward by specified seconds. Returns True if action was taken."""
return self._seek(seconds, "backward")
def get_current_progress(self) -> tuple[str, float, float] | None:
"""Get current playback progress."""
if not self.is_playing or self.playback_start_time is None:
return None
elapsed = self._get_current_elapsed()
total_elapsed = self.seek_offset + elapsed
chapter_name, chapter_elapsed, chapter_total = self._get_current_chapter(
total_elapsed)
return (chapter_name, chapter_elapsed, chapter_total)
def _get_current_chapter(self, elapsed: float) -> tuple[str, float, float]:
"""Get current chapter info."""
if not self.chapters:
return ("Unknown Chapter", elapsed, self.total_duration or 0.0)
for chapter in self.chapters:
if chapter["start_time"] <= elapsed < chapter["end_time"]:
chapter_elapsed = elapsed - chapter["start_time"]
chapter_total = chapter["end_time"] - chapter["start_time"]
return (chapter["title"], chapter_elapsed, chapter_total)
last_chapter = self.chapters[-1]
chapter_elapsed = max(0.0, elapsed - last_chapter["start_time"])
chapter_total = last_chapter["end_time"] - last_chapter["start_time"]
return (last_chapter["title"], chapter_elapsed, chapter_total)
def _get_current_chapter_index(self, elapsed: float) -> int | None:
"""Get the index of the current chapter based on elapsed time."""
if not self.chapters:
return None
for idx, chapter in enumerate(self.chapters):
if chapter["start_time"] <= elapsed < chapter["end_time"]:
return idx
return len(self.chapters) - 1
def seek_to_chapter(self, direction: str) -> bool:
"""Seek to next or previous chapter."""
if not self.is_playing or not self.current_file_path:
return False
if not self.chapters:
self.notify("No chapter information available")
return False
elapsed = self._get_current_elapsed()
current_total_position = self.seek_offset + elapsed
current_chapter_idx = self._get_current_chapter_index(
current_total_position)
if current_chapter_idx is None:
self.notify("Could not determine current chapter")
return False
if direction == "next":
if current_chapter_idx >= len(self.chapters) - 1:
self.notify("Already at last chapter")
return False
target_chapter = self.chapters[current_chapter_idx + 1]
new_position = target_chapter["start_time"]
message = f"Next chapter: {target_chapter['title']}"
else:
if current_chapter_idx <= 0:
self.notify("Already at first chapter")
return False
target_chapter = self.chapters[current_chapter_idx - 1]
new_position = target_chapter["start_time"]
message = f"Previous chapter: {target_chapter['title']}"
return self._restart_at_position(new_position, message=message)
def seek_to_next_chapter(self) -> bool:
"""Seek to the next chapter. Returns True if action was taken."""
return self.seek_to_chapter("next")
def seek_to_previous_chapter(self) -> bool:
"""Seek to the previous chapter. Returns True if action was taken."""
return self.seek_to_chapter("previous")
def _save_current_position(self) -> None:
"""Save the current playback position to Audible."""
if not (self.library_client and self.current_asin and self.is_playing):
return
if self.playback_start_time is None:
return
current_position = self.seek_offset + self._get_current_elapsed()
if current_position <= 0:
return
try:
self.library_client.save_last_position(
self.current_asin, current_position)
except (OSError, ValueError, KeyError):
pass
def update_position_if_needed(self) -> None:
"""Periodically save position if enough time has passed."""
if not (self.is_playing and self.library_client and self.current_asin):
return
current_time = time.time()
if current_time - self.last_save_time >= self.position_save_interval:
self._save_current_position()
self.last_save_time = current_time
def _change_speed(self, delta: float) -> bool:
"""Change playback speed by delta amount. Returns True if action was taken."""
new_speed = max(MIN_SPEED, min(MAX_SPEED, self.playback_speed + delta))
if new_speed == self.playback_speed:
return False
elapsed = self._get_current_elapsed()
current_total_position = self.seek_offset + elapsed
return self._restart_at_position(current_total_position, new_speed, f"Speed: {new_speed:.2f}x")
def increase_speed(self) -> bool:
"""Increase playback speed. Returns True if action was taken."""
return self._change_speed(SPEED_INCREMENT)
def decrease_speed(self) -> bool:
"""Decrease playback speed. Returns True if action was taken."""
return self._change_speed(-SPEED_INCREMENT)

View File

@@ -0,0 +1,6 @@
"""Playback control via ffplay and position sync with Audible."""
from .controller import PlaybackController
from .media_info import load_media_info
__all__ = ["PlaybackController", "load_media_info"]

View File

@@ -0,0 +1,30 @@
"""Chapter lookup by elapsed time."""
def get_current_chapter(
elapsed: float,
chapters: list[dict],
total_duration: float | None,
) -> tuple[str, float, float]:
"""Return (title, elapsed_in_chapter, chapter_duration) for the chapter at elapsed time."""
if not chapters:
return ("Unknown Chapter", elapsed, total_duration or 0.0)
for chapter in chapters:
if chapter["start_time"] <= elapsed < chapter["end_time"]:
chapter_elapsed = elapsed - chapter["start_time"]
chapter_total = chapter["end_time"] - chapter["start_time"]
return (chapter["title"], chapter_elapsed, chapter_total)
last = chapters[-1]
chapter_elapsed = max(0.0, elapsed - last["start_time"])
chapter_total = last["end_time"] - last["start_time"]
return (last["title"], chapter_elapsed, chapter_total)
def get_current_chapter_index(elapsed: float, chapters: list[dict]) -> int | None:
"""Return the index of the chapter containing the given elapsed time."""
if not chapters:
return None
for idx, chapter in enumerate(chapters):
if chapter["start_time"] <= elapsed < chapter["end_time"]:
return idx
return len(chapters) - 1

View File

@@ -0,0 +1,5 @@
"""Speed limits and increment for playback."""
MIN_SPEED = 0.5
MAX_SPEED = 2.0
SPEED_INCREMENT = 0.5

View File

@@ -0,0 +1,14 @@
"""Orchestrates ffplay process, position, chapters, seek, and speed; delegates to playback submodules."""
from __future__ import annotations
from ..library import LibraryClient
from ..types import StatusCallback
from .controller_seek_speed import ControllerSeekSpeedMixin
from .controller_lifecycle import ControllerLifecycleMixin
from .controller_state import ControllerStateMixin
class PlaybackController(ControllerSeekSpeedMixin, ControllerLifecycleMixin, ControllerStateMixin):
"""Controls ffplay: start/stop, pause/resume, seek, speed, and saving position to Audible."""

View File

@@ -0,0 +1,183 @@
"""Playback lifecycle: start, stop, pause, resume, prepare_and_start, restart at position."""
from __future__ import annotations
import signal
import subprocess
import time
from pathlib import Path
from ..downloads import DownloadManager
from ..library import LibraryClient
from ..types import StatusCallback
from . import process as process_mod
from .media_info import load_media_info
from .controller_state import ControllerStateMixin
class ControllerLifecycleMixin(ControllerStateMixin):
"""Start/stop, pause/resume, and restart-at-position logic."""
def start(
self,
path: Path,
activation_hex: str | None = None,
status_callback: StatusCallback | None = None,
start_position: float = 0.0,
speed: float | None = None,
) -> bool:
"""Start ffplay for the given AAX path. Returns True if playback started."""
notify = status_callback or self.notify
if not process_mod.is_ffplay_available():
notify("ffplay not found. Please install ffmpeg")
return False
if self.playback_process is not None:
self.stop()
self.activation_hex = activation_hex
self.seek_offset = start_position
if speed is not None:
self.playback_speed = speed
cmd = process_mod.build_ffplay_cmd(
path, activation_hex, start_position, self.playback_speed
)
try:
proc, return_code = process_mod.run_ffplay(cmd)
if proc is None:
if return_code == 0 and start_position > 0 and self.total_duration and start_position >= self.total_duration - 5:
notify("Reached end of file")
self._reset_state()
return False
notify(
f"Playback process exited immediately (code: {return_code})")
return False
self.playback_process = proc
self.is_playing = True
self.is_paused = False
self.current_file_path = path
self.playback_start_time = time.time()
self.paused_duration = 0.0
self.pause_start_time = None
duration, chs = load_media_info(path, activation_hex)
self.total_duration = duration
self.chapters = chs
notify(f"Playing: {path.name}")
return True
except (OSError, ValueError, subprocess.SubprocessError) as exc:
notify(f"Error starting playback: {exc}")
return False
def stop(self) -> None:
"""Stop ffplay, save position to Audible, and reset state."""
if self.playback_process is None:
return
self._save_current_position()
try:
process_mod.terminate_process(self.playback_process)
finally:
self._reset_state()
def pause(self) -> None:
"""Send SIGSTOP to ffplay and mark state as paused."""
if not self._validate_playback_state(require_paused=False):
return
self.pause_start_time = time.time()
self._send_signal(signal.SIGSTOP, "Paused", "pause")
def resume(self) -> None:
"""Send SIGCONT to ffplay and clear paused state."""
if not self._validate_playback_state(require_paused=True):
return
if self.pause_start_time is not None:
self.paused_duration += time.time() - self.pause_start_time
self.pause_start_time = None
self._send_signal(signal.SIGCONT, "Playing", "resume")
def check_status(self) -> str | None:
"""If the process has exited, return a status message and reset state; else None."""
if self.playback_process is None:
return None
return_code = self.playback_process.poll()
if return_code is None:
return None
finished_file = self.current_file_path
self._reset_state()
if finished_file:
if return_code == 0:
return f"Finished: {finished_file.name}"
return f"Playback ended unexpectedly (code: {return_code}): {finished_file.name}"
return "Playback finished"
def prepare_and_start(
self,
download_manager: DownloadManager,
asin: str,
status_callback: StatusCallback | None = None,
) -> bool:
"""Download AAX if needed, get activation bytes, then start playback. Returns True on success."""
notify = status_callback or self.notify
if not download_manager:
notify("Could not download file")
return False
notify("Preparing playback...")
local_path = download_manager.get_or_download(asin, notify)
if not local_path:
notify("Could not download file")
return False
notify("Getting activation bytes...")
activation_hex = download_manager.get_activation_bytes()
if not activation_hex:
notify("Failed to get activation bytes")
return False
start_position = 0.0
if self.library_client:
try:
last = self.library_client.get_last_position(asin)
if last is not None and last > 0:
start_position = last
notify(
f"Resuming from {LibraryClient.format_time(start_position)}")
except (OSError, ValueError, KeyError):
pass
notify(f"Starting playback of {local_path.name}...")
self.current_asin = asin
self.last_save_time = time.time()
return self.start(local_path, activation_hex, notify, start_position, self.playback_speed)
def toggle_playback(self) -> bool:
"""Toggle between pause and resume. Returns True if an action was performed."""
if not self.is_playing:
return False
if not self.is_alive():
self.stop()
self.notify("Playback has ended")
return False
if self.is_paused:
self.resume()
else:
self.pause()
return True
def _restart_at_position(
self, new_position: float, new_speed: float | None = None, message: str | None = None
) -> bool:
"""Stop current process and start again at new_position; optionally set speed and notify."""
if not self.is_playing or not self.current_file_path:
return False
was_paused = self.is_paused
saved = self._get_saved_state()
speed = new_speed if new_speed is not None else saved["speed"]
self._stop_process()
time.sleep(0.2)
if self.start(saved["file_path"], saved["activation"], self.notify, new_position, speed):
self.current_asin = saved["asin"]
self.total_duration = saved["duration"]
self.chapters = saved["chapters"]
if was_paused:
time.sleep(0.3)
self.pause()
if message:
self.notify(message)
return True
return False

View File

@@ -0,0 +1,127 @@
"""Seek, chapter, position save, and playback speed for the controller."""
from __future__ import annotations
import time
from . import chapters as chapters_mod
from . import seek as seek_mod
from .constants import MIN_SPEED, MAX_SPEED, SPEED_INCREMENT
from .controller_lifecycle import ControllerLifecycleMixin
class ControllerSeekSpeedMixin(ControllerLifecycleMixin):
"""Seek, chapter navigation, position persistence, and speed control."""
def _seek(self, seconds: float, direction: str) -> bool:
"""Seek forward or backward by seconds via restart at new position. Returns True if done."""
elapsed = self._get_current_elapsed()
current = self.seek_offset + elapsed
result = seek_mod.compute_seek_target(
current, self.total_duration, seconds, direction
)
if result is None:
self.notify("Already at end of file")
return False
new_position, message = result
return self._restart_at_position(new_position, message=message)
def seek_forward(self, seconds: float = 30.0) -> bool:
"""Seek forward by the given seconds. Returns True if seek was performed."""
return self._seek(seconds, "forward")
def seek_backward(self, seconds: float = 30.0) -> bool:
"""Seek backward by the given seconds. Returns True if seek was performed."""
return self._seek(seconds, "backward")
def get_current_progress(self) -> tuple[str, float, float] | None:
"""Return (chapter_title, chapter_elapsed, chapter_total) for progress display, or None."""
if not self.is_playing or self.playback_start_time is None:
return None
elapsed = self._get_current_elapsed()
total_elapsed = self.seek_offset + elapsed
return chapters_mod.get_current_chapter(
total_elapsed, self.chapters, self.total_duration
)
def seek_to_chapter(self, direction: str) -> bool:
"""Seek to the next or previous chapter. direction is 'next' or 'previous'. Returns True if done."""
if not self.is_playing or not self.current_file_path:
return False
if not self.chapters:
self.notify("No chapter information available")
return False
elapsed = self._get_current_elapsed()
current_total = self.seek_offset + elapsed
idx = chapters_mod.get_current_chapter_index(
current_total, self.chapters)
if idx is None:
self.notify("Could not determine current chapter")
return False
if direction == "next":
if idx >= len(self.chapters) - 1:
self.notify("Already at last chapter")
return False
target = self.chapters[idx + 1]
new_position = target["start_time"]
message = f"Next chapter: {target['title']}"
else:
if idx <= 0:
self.notify("Already at first chapter")
return False
target = self.chapters[idx - 1]
new_position = target["start_time"]
message = f"Previous chapter: {target['title']}"
return self._restart_at_position(new_position, message=message)
def seek_to_next_chapter(self) -> bool:
"""Seek to the next chapter. Returns True if seek was performed."""
return self.seek_to_chapter("next")
def seek_to_previous_chapter(self) -> bool:
"""Seek to the previous chapter. Returns True if seek was performed."""
return self.seek_to_chapter("previous")
def _save_current_position(self) -> None:
"""Persist current position to Audible via library_client."""
if not (self.library_client and self.current_asin and self.is_playing):
return
if self.playback_start_time is None:
return
current_position = self.seek_offset + self._get_current_elapsed()
if current_position <= 0:
return
try:
self.library_client.save_last_position(
self.current_asin, current_position)
except (OSError, ValueError, KeyError):
pass
def update_position_if_needed(self) -> None:
"""Save position to Audible if the save interval has elapsed since last save."""
if not (self.is_playing and self.library_client and self.current_asin):
return
current_time = time.time()
if current_time - self.last_save_time >= self.position_save_interval:
self._save_current_position()
self.last_save_time = current_time
def _change_speed(self, delta: float) -> bool:
"""Change speed by delta (clamped to MIN/MAX). Restarts playback. Returns True if changed."""
new_speed = max(MIN_SPEED, min(MAX_SPEED, self.playback_speed + delta))
if new_speed == self.playback_speed:
return False
elapsed = self._get_current_elapsed()
current_total = self.seek_offset + elapsed
return self._restart_at_position(
current_total, new_speed, f"Speed: {new_speed:.2f}x"
)
def increase_speed(self) -> bool:
"""Increase playback speed. Returns True if speed was changed."""
return self._change_speed(SPEED_INCREMENT)
def decrease_speed(self) -> bool:
"""Decrease playback speed. Returns True if speed was changed."""
return self._change_speed(-SPEED_INCREMENT)

View File

@@ -0,0 +1,124 @@
"""Playback state: init, reset, elapsed time, process validation and signals."""
from __future__ import annotations
import signal
import time
from pathlib import Path
from ..library import LibraryClient
from ..types import StatusCallback
from . import elapsed as elapsed_mod
from . import process as process_mod
class ControllerStateMixin:
"""State attributes and helpers for process/signal handling."""
def __init__(self, notify: StatusCallback, library_client: LibraryClient | None = None) -> None:
self.notify = notify
self.library_client = library_client
self.playback_process = None
self.is_playing = False
self.is_paused = False
self.current_file_path: Path | None = None
self.current_asin: str | None = None
self.playback_start_time: float | None = None
self.paused_duration: float = 0.0
self.pause_start_time: float | None = None
self.total_duration: float | None = None
self.chapters: list[dict] = []
self.seek_offset: float = 0.0
self.activation_hex: str | None = None
self.last_save_time: float = 0.0
self.position_save_interval: float = 30.0
self.playback_speed: float = 1.0
def _reset_state(self) -> None:
"""Clear playing/paused state and references to current file/asin."""
self.playback_process = None
self.is_playing = False
self.is_paused = False
self.current_file_path = None
self.current_asin = None
self.playback_start_time = None
self.paused_duration = 0.0
self.pause_start_time = None
self.total_duration = None
self.chapters = []
self.seek_offset = 0.0
self.activation_hex = None
self.last_save_time = 0.0
self.playback_speed = 1.0
def _get_saved_state(self) -> dict:
"""Return a snapshot of path, asin, activation, duration, chapters, speed for restart."""
return {
"file_path": self.current_file_path,
"asin": self.current_asin,
"activation": self.activation_hex,
"duration": self.total_duration,
"chapters": self.chapters.copy(),
"speed": self.playback_speed,
}
def _get_current_elapsed(self) -> float:
"""Return elapsed seconds since start, accounting for pauses."""
if self.pause_start_time is not None and not self.is_paused:
self.paused_duration += time.time() - self.pause_start_time
self.pause_start_time = None
return elapsed_mod.get_elapsed(
self.playback_start_time,
self.pause_start_time,
self.paused_duration,
self.is_paused,
)
def _stop_process(self) -> None:
"""Terminate the process and clear playing state without saving position."""
process_mod.terminate_process(self.playback_process)
self.playback_process = None
self.is_playing = False
self.is_paused = False
self.playback_start_time = None
self.paused_duration = 0.0
self.pause_start_time = None
def _validate_playback_state(self, require_paused: bool) -> bool:
"""Return True if process is running and paused state matches require_paused."""
if not (self.playback_process and self.is_playing):
return False
if require_paused and not self.is_paused:
return False
if not require_paused and self.is_paused:
return False
if not self.is_alive():
self.stop()
self.notify("Playback process has ended")
return False
return True
def _send_signal(self, sig: signal.Signals, status_prefix: str, action: str) -> None:
"""Send sig to the process, update is_paused, and notify."""
if self.playback_process is None:
return
try:
process_mod.send_signal(self.playback_process, sig)
self.is_paused = sig == signal.SIGSTOP
filename = self.current_file_path.name if self.current_file_path else None
msg = f"{status_prefix}: {filename}" if filename else status_prefix
self.notify(msg)
except ProcessLookupError:
self.stop()
self.notify("Process no longer exists")
except PermissionError:
self.notify(f"Permission denied: cannot {action} playback")
except (OSError, ValueError) as exc:
self.notify(f"Error {action}ing playback: {exc}")
def is_alive(self) -> bool:
"""Return True if the ffplay process is still running."""
if self.playback_process is None:
return False
return self.playback_process.poll() is None

View File

@@ -0,0 +1,23 @@
"""Elapsed playback time accounting for pauses."""
import time
def get_elapsed(
playback_start_time: float | None,
pause_start_time: float | None,
paused_duration: float,
is_paused: bool,
) -> float:
"""Return elapsed seconds since start, accounting for pauses."""
if playback_start_time is None:
return 0.0
current_time = time.time()
if is_paused and pause_start_time is not None:
return (pause_start_time - playback_start_time) - paused_duration
if pause_start_time is not None:
paused_duration += current_time - pause_start_time
return max(
0.0,
(current_time - playback_start_time) - paused_duration,
)

View File

@@ -1,4 +1,4 @@
"""Media information loading for Audible content.""" """Duration and chapter list for AAX files via ffprobe."""
import json import json
import shutil import shutil
@@ -7,7 +7,7 @@ from pathlib import Path
def load_media_info(path: Path, activation_hex: str | None = None) -> tuple[float | None, list[dict]]: def load_media_info(path: Path, activation_hex: str | None = None) -> tuple[float | None, list[dict]]:
"""Load media information including duration and chapters using ffprobe.""" """Return (total_duration_seconds, chapters) for the AAX file. Chapters have start_time, end_time, title."""
if not shutil.which("ffprobe"): if not shutil.which("ffprobe"):
return None, [] return None, []

View File

@@ -0,0 +1,68 @@
"""FFplay process: build command, spawn, terminate, and send signals."""
import os
import shutil
import signal
import subprocess
import time
from pathlib import Path
def build_ffplay_cmd(
path: Path,
activation_hex: str | None,
start_position: float,
speed: float,
) -> list[str]:
"""Build the ffplay command line for the given path and options."""
cmd = ["ffplay", "-nodisp", "-autoexit"]
if activation_hex:
cmd.extend(["-activation_bytes", activation_hex])
if start_position > 0:
cmd.extend(["-ss", str(start_position)])
if speed != 1.0:
cmd.extend(["-af", f"atempo={speed:.2f}"])
cmd.append(str(path))
return cmd
def is_ffplay_available() -> bool:
"""Return True if ffplay is on PATH."""
return shutil.which("ffplay") is not None
def run_ffplay(cmd: list[str]) -> tuple[subprocess.Popen | None, int | None]:
"""Spawn ffplay. Returns (proc, None) on success, (None, return_code) if process exited immediately, (None, None) if ffplay missing or spawn failed."""
if not is_ffplay_available():
return (None, None)
try:
proc = subprocess.Popen(
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
time.sleep(0.2)
if proc.poll() is not None:
return (None, proc.returncode)
return (proc, None)
except (OSError, ValueError, subprocess.SubprocessError):
return (None, None)
def terminate_process(proc: subprocess.Popen | None) -> None:
"""Terminate the process; kill if it does not exit within timeout."""
if proc is None:
return
try:
if proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout=2)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
except (ProcessLookupError, ValueError):
pass
def send_signal(proc: subprocess.Popen, sig: signal.Signals) -> None:
"""Send sig to the process. May raise ProcessLookupError, PermissionError, OSError."""
os.kill(proc.pid, sig)

19
auditui/playback/seek.py Normal file
View File

@@ -0,0 +1,19 @@
"""Seek target computation: new position and message from direction and seconds."""
def compute_seek_target(
current_position: float,
total_duration: float | None,
seconds: float,
direction: str,
) -> tuple[float, str] | None:
"""Return (new_position, message) for a seek, or None if seek is invalid (e.g. at end)."""
if direction == "forward":
new_position = current_position + seconds
if total_duration is not None:
if new_position >= total_duration - 2:
return None
new_position = min(new_position, total_duration - 2)
return (new_position, f"Skipped forward {int(seconds)}s")
new_position = max(0.0, current_position - seconds)
return (new_position, f"Skipped backward {int(seconds)}s")

View File

@@ -0,0 +1,5 @@
"""Listening and account statistics for the stats screen."""
from .aggregator import StatsAggregator
__all__ = ["StatsAggregator"]

71
auditui/stats/account.py Normal file
View File

@@ -0,0 +1,71 @@
"""Account and subscription data from the API."""
from typing import Any
def get_account_info(client: Any) -> dict:
if not client:
return {}
account_info = {}
endpoints = [
(
"1.0/account/information",
"subscription_details,plan_summary,subscription_details_payment_instrument,delinquency_status,customer_benefits,customer_segments,directed_ids",
),
(
"1.0/customer/information",
"subscription_details_premium,subscription_details_rodizio,customer_segment,subscription_details_channels,migration_details",
),
(
"1.0/customer/status",
"benefits_status,member_giving_status,prime_benefits_status,prospect_benefits_status",
),
]
for endpoint, response_groups in endpoints:
try:
response = client.get(endpoint, response_groups=response_groups)
account_info.update(response)
except Exception:
pass
return account_info
def get_subscription_details(account_info: dict) -> dict:
paths = [
["customer_details", "subscription", "subscription_details"],
["customer", "customer_details", "subscription", "subscription_details"],
["subscription_details"],
["subscription", "subscription_details"],
]
for path in paths:
data = account_info
for key in path:
if isinstance(data, dict):
data = data.get(key)
else:
break
if isinstance(data, list) and data:
return data[0]
return {}
def get_country(auth: Any) -> str:
if not auth:
return "Unknown"
try:
locale_obj = getattr(auth, "locale", None)
if not locale_obj:
return "Unknown"
if hasattr(locale_obj, "country_code"):
return locale_obj.country_code.upper()
if hasattr(locale_obj, "domain"):
return locale_obj.domain.upper()
if isinstance(locale_obj, str):
return (
locale_obj.split("_")[-1].upper()
if "_" in locale_obj
else locale_obj.upper()
)
return str(locale_obj)
except Exception:
return "Unknown"

View File

@@ -0,0 +1,85 @@
"""Aggregates listening time, account info, and subscription data for display."""
from datetime import date
from typing import Any
from ..types import LibraryItem
from . import account as account_mod
from . import email as email_mod
from . import format as format_mod
from . import listening as listening_mod
class StatsAggregator:
"""Builds a list of (label, value) stats from the API, auth, and library."""
def __init__(
self,
client: Any,
auth: Any,
library_client: Any,
all_items: list[LibraryItem],
) -> None:
self.client = client
self.auth = auth
self.library_client = library_client
self.all_items = all_items
def get_stats(self, today: date | None = None) -> list[tuple[str, str]]:
if not self.client:
return []
today = today or date.today()
signup_year = listening_mod.get_signup_year(self.client)
month_time = listening_mod.get_listening_time(
self.client, 1, today.strftime("%Y-%m")
)
year_time = listening_mod.get_listening_time(
self.client, 12, today.strftime("%Y-01")
)
finished_count = listening_mod.get_finished_books_count(
self.library_client, self.all_items or []
)
total_books = len(self.all_items) if self.all_items else 0
email = email_mod.resolve_email(
self.auth,
self.client,
get_account_info=lambda: account_mod.get_account_info(self.client),
)
country = account_mod.get_country(self.auth)
subscription_name = "Unknown"
subscription_price = "Unknown"
next_bill_date = "Unknown"
account_info = account_mod.get_account_info(self.client)
if account_info:
subscription_data = account_mod.get_subscription_details(
account_info)
if subscription_data:
if name := subscription_data.get("name"):
subscription_name = name
if bill_date := subscription_data.get("next_bill_date"):
next_bill_date = format_mod.format_date(bill_date)
if bill_amount := subscription_data.get("next_bill_amount", {}):
amount = bill_amount.get("currency_value")
currency = bill_amount.get("currency_code", "EUR")
if amount is not None:
subscription_price = f"{amount} {currency}"
stats_items = []
if email != "Unknown":
stats_items.append(("Email", email))
stats_items.append(("Country Store", country))
stats_items.append(
("Signup Year", str(signup_year) if signup_year > 0 else "Unknown")
)
if next_bill_date != "Unknown":
stats_items.append(("Next Credit", next_bill_date))
stats_items.append(("Next Bill", next_bill_date))
if subscription_name != "Unknown":
stats_items.append(("Subscription", subscription_name))
if subscription_price != "Unknown":
stats_items.append(("Price", subscription_price))
stats_items.append(("This Month", format_mod.format_time(month_time)))
stats_items.append(("This Year", format_mod.format_time(year_time)))
stats_items.append(
("Books Finished", f"{finished_count} / {total_books}"))
return stats_items

155
auditui/stats/email.py Normal file
View File

@@ -0,0 +1,155 @@
"""Email resolution from auth, config, auth file, and account API."""
import json
from pathlib import Path
from typing import Any, Callable
from ..constants import AUTH_PATH, CONFIG_PATH
def find_email_in_data(data: Any) -> str | None:
if data is None:
return None
stack = [data]
while stack:
current = stack.pop()
if isinstance(current, dict):
stack.extend(current.values())
elif isinstance(current, list):
stack.extend(current)
elif isinstance(current, str):
if "@" in current:
local, _, domain = current.partition("@")
if local and "." in domain:
return current
return None
def first_email(*values: str | None) -> str | None:
for value in values:
if value and value != "Unknown":
return value
return None
def get_email_from_auth(auth: Any) -> str | None:
if not auth:
return None
try:
email = first_email(
getattr(auth, "username", None),
getattr(auth, "login", None),
getattr(auth, "email", None),
)
if email:
return email
except Exception:
return None
try:
customer_info = getattr(auth, "customer_info", None)
if isinstance(customer_info, dict):
email = first_email(
customer_info.get("email"),
customer_info.get("email_address"),
customer_info.get("primary_email"),
)
if email:
return email
except Exception:
return None
try:
data = getattr(auth, "data", None)
if isinstance(data, dict):
return first_email(
data.get("username"),
data.get("email"),
data.get("login"),
data.get("user_email"),
)
except Exception:
return None
return None
def get_email_from_config(config_path: Path | None = None) -> str | None:
path = config_path or CONFIG_PATH
try:
if path.exists():
with open(path, "r", encoding="utf-8") as f:
config = json.load(f)
return first_email(
config.get("email"),
config.get("username"),
config.get("login"),
)
except Exception:
return None
return None
def get_email_from_auth_file(auth_path: Path | None = None) -> str | None:
path = auth_path or AUTH_PATH
try:
if path.exists():
with open(path, "r", encoding="utf-8") as f:
auth_file_data = json.load(f)
return first_email(
auth_file_data.get("username"),
auth_file_data.get("email"),
auth_file_data.get("login"),
auth_file_data.get("user_email"),
)
except Exception:
return None
return None
def get_email_from_account_info(account_info: dict) -> str | None:
email = first_email(
account_info.get("email"),
account_info.get("customer_email"),
account_info.get("username"),
)
if email:
return email
customer_info = account_info.get("customer_info", {})
if isinstance(customer_info, dict):
return first_email(
customer_info.get("email"),
customer_info.get("email_address"),
customer_info.get("primary_email"),
)
return None
def resolve_email(
auth: Any,
client: Any,
config_path: Path | None = None,
auth_path: Path | None = None,
get_account_info: Callable[[], dict] | None = None,
) -> str:
config_path = config_path or CONFIG_PATH
auth_path = auth_path or AUTH_PATH
for getter in (
lambda: get_email_from_auth(auth),
lambda: get_email_from_config(config_path),
lambda: get_email_from_auth_file(auth_path),
lambda: get_email_from_account_info(
get_account_info()) if get_account_info else None,
):
email = getter()
if email:
return email
auth_data = None
if auth:
try:
auth_data = getattr(auth, "data", None)
except Exception:
pass
account_info = get_account_info() if get_account_info else {}
for candidate in (auth_data, account_info):
email = find_email_in_data(candidate)
if email:
return email
return "Unknown"

22
auditui/stats/format.py Normal file
View File

@@ -0,0 +1,22 @@
"""Time and date formatting for stats display."""
from datetime import datetime
def format_time(milliseconds: int) -> str:
total_seconds = int(milliseconds) // 1000
hours, remainder = divmod(total_seconds, 3600)
minutes, _ = divmod(remainder, 60)
if hours > 0:
return f"{hours}h{minutes:02d}"
return f"{minutes}m"
def format_date(date_str: str | None) -> str:
if not date_str:
return "Unknown"
try:
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d")
except ValueError:
return date_str

View File

@@ -0,0 +1,75 @@
"""Listening time and signup year from stats API; finished books count."""
from datetime import date
from typing import Any
from ..types import LibraryItem
def has_activity(stats: dict) -> bool:
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
return bool(
monthly_stats and any(s.get("aggregated_sum", 0)
> 0 for s in monthly_stats)
)
def get_listening_time(client: Any, duration: int, start_date: str) -> int:
if not client:
return 0
try:
stats = client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration=str(duration),
monthly_listening_interval_start_date=start_date,
store="Audible",
)
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
return sum(s.get("aggregated_sum", 0) for s in monthly_stats)
except Exception:
return 0
def get_signup_year(client: Any) -> int:
if not client:
return 0
current_year = date.today().year
try:
stats = client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration="12",
monthly_listening_interval_start_date=f"{current_year}-01",
store="Audible",
)
if not has_activity(stats):
return 0
except Exception:
return 0
left, right = 1995, current_year
earliest_year = current_year
while left <= right:
middle = (left + right) // 2
try:
stats = client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration="12",
monthly_listening_interval_start_date=f"{middle}-01",
store="Audible",
)
has_activity_ = has_activity(stats)
except Exception:
has_activity_ = False
if has_activity_:
earliest_year = middle
right = middle - 1
else:
left = middle + 1
return earliest_year
def get_finished_books_count(
library_client: Any, all_items: list[LibraryItem]
) -> int:
if not library_client or not all_items:
return 0
return sum(1 for item in all_items if library_client.is_finished(item))

View File

@@ -0,0 +1,8 @@
"""Shared type aliases for the Audible TUI."""
from __future__ import annotations
from typing import Callable
LibraryItem = dict
StatusCallback = Callable[[str], None]

View File

@@ -1,570 +0,0 @@
"""UI components for the Auditui application."""
import json
from datetime import date, datetime
from typing import Any, Callable, Protocol, TYPE_CHECKING, cast
from textual.app import ComposeResult
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.timer import Timer
from textual.widgets import Input, Label, ListItem, ListView, Static
from .constants import AUTH_PATH, CONFIG_PATH
if TYPE_CHECKING:
from textual.binding import Binding
class _AppContext(Protocol):
BINDINGS: list[tuple[str, str, str]]
client: Any
auth: Any
library_client: Any
all_items: list[dict]
KEY_DISPLAY_MAP = {
"ctrl+": "^",
"left": "",
"right": "",
"up": "",
"down": "",
"space": "Space",
"enter": "Enter",
}
KEY_COLOR = "#f9e2af"
DESC_COLOR = "#cdd6f4"
class AppContextMixin:
"""Mixin to provide a typed app accessor."""
def _app(self) -> _AppContext:
return cast(_AppContext, cast(Any, self).app)
class HelpScreen(AppContextMixin, ModalScreen):
"""Help screen displaying all available keybindings."""
BINDINGS = [("escape", "dismiss", "Close"), ("?", "dismiss", "Close")]
@staticmethod
def _format_key_display(key: str) -> str:
"""Format a key string for display with symbols."""
result = key
for old, new in KEY_DISPLAY_MAP.items():
result = result.replace(old, new)
return result
@staticmethod
def _parse_binding(binding: "Binding | tuple[str, str, str]") -> tuple[str, str]:
"""Extract key and description from a binding."""
if isinstance(binding, tuple):
return binding[0], binding[2]
return binding.key, binding.description
def _make_item(self, binding: "Binding | tuple[str, str, str]") -> ListItem:
"""Create a ListItem for a single binding."""
key, description = self._parse_binding(binding)
key_display = self._format_key_display(key)
text = f"[bold {KEY_COLOR}]{key_display:>16}[/] [{DESC_COLOR}]{description:<25}[/]"
return ListItem(Label(text))
def compose(self) -> ComposeResult:
app = self._app()
bindings = list(app.BINDINGS)
with Container(id="help_container"):
yield Static("Keybindings", id="help_title")
with Vertical(id="help_content"):
yield ListView(
*[self._make_item(b) for b in bindings],
classes="help_list",
)
yield Static(
f"Press [bold {KEY_COLOR}]?[/] or [bold {KEY_COLOR}]Escape[/] to close",
id="help_footer",
)
async def action_dismiss(self, result: Any | None = None) -> None:
await self.dismiss(result)
class StatsScreen(AppContextMixin, ModalScreen):
"""Stats screen displaying listening statistics."""
BINDINGS = [("escape", "dismiss", "Close"), ("s", "dismiss", "Close")]
def _format_time(self, milliseconds: int) -> str:
"""Format milliseconds as hours and minutes."""
total_seconds = int(milliseconds) // 1000
hours, remainder = divmod(total_seconds, 3600)
minutes, _ = divmod(remainder, 60)
if hours > 0:
return f"{hours}h{minutes:02d}"
return f"{minutes}m"
def _format_date(self, date_str: str | None) -> str:
"""Format ISO date string for display."""
if not date_str:
return "Unknown"
try:
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d")
except ValueError:
return date_str
def _get_signup_year(self) -> int:
"""Get signup year using binary search on listening activity."""
app = self._app()
if not app.client:
return 0
current_year = date.today().year
try:
stats = app.client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration="12",
monthly_listening_interval_start_date=f"{current_year}-01",
store="Audible",
)
if not self._has_activity(stats):
return 0
except Exception:
return 0
left, right = 1995, current_year
earliest_year = current_year
while left <= right:
middle = (left + right) // 2
try:
stats = app.client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration="12",
monthly_listening_interval_start_date=f"{middle}-01",
store="Audible",
)
has_activity = self._has_activity(stats)
except Exception:
has_activity = False
if has_activity:
earliest_year = middle
right = middle - 1
else:
left = middle + 1
return earliest_year
@staticmethod
def _has_activity(stats: dict) -> bool:
"""Check if stats contain any listening activity."""
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
return bool(
monthly_stats and any(s.get("aggregated_sum", 0) > 0 for s in monthly_stats)
)
def _get_listening_time(self, duration: int, start_date: str) -> int:
"""Get listening time in milliseconds for a given period."""
app = self._app()
if not app.client:
return 0
try:
stats = app.client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration=str(duration),
monthly_listening_interval_start_date=start_date,
store="Audible",
)
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
return sum(s.get("aggregated_sum", 0) for s in monthly_stats)
except Exception:
return 0
def _get_finished_books_count(self) -> int:
"""Get count of finished books from library."""
app = self._app()
if not app.library_client or not app.all_items:
return 0
return sum(1 for item in app.all_items if app.library_client.is_finished(item))
def _get_account_info(self) -> dict:
"""Get account information including subscription details."""
app = self._app()
if not app.client:
return {}
account_info = {}
endpoints = [
(
"1.0/account/information",
"subscription_details,plan_summary,subscription_details_payment_instrument,delinquency_status,customer_benefits,customer_segments,directed_ids",
),
(
"1.0/customer/information",
"subscription_details_premium,subscription_details_rodizio,customer_segment,subscription_details_channels,migration_details",
),
(
"1.0/customer/status",
"benefits_status,member_giving_status,prime_benefits_status,prospect_benefits_status",
),
]
for endpoint, response_groups in endpoints:
try:
response = app.client.get(endpoint, response_groups=response_groups)
account_info.update(response)
except Exception:
pass
return account_info
def _get_email(self) -> str:
"""Get email from auth, config, or API."""
app = self._app()
for getter in (
self._get_email_from_auth,
self._get_email_from_config,
self._get_email_from_auth_file,
self._get_email_from_account_info,
):
email = getter(app)
if email:
return email
auth_data: dict[str, Any] | None = None
if app.auth:
try:
auth_data = getattr(app.auth, "data", None)
except Exception:
auth_data = None
account_info = self._get_account_info() if app.client else None
for candidate in (auth_data, account_info):
email = self._find_email_in_data(candidate)
if email:
return email
return "Unknown"
def _get_email_from_auth(self, app: _AppContext) -> str | None:
"""Extract email from the authenticator if available."""
if not app.auth:
return None
try:
email = self._first_email(
getattr(app.auth, "username", None),
getattr(app.auth, "login", None),
getattr(app.auth, "email", None),
)
if email:
return email
except Exception:
return None
try:
customer_info = getattr(app.auth, "customer_info", None)
if isinstance(customer_info, dict):
email = self._first_email(
customer_info.get("email"),
customer_info.get("email_address"),
customer_info.get("primary_email"),
)
if email:
return email
except Exception:
return None
try:
data = getattr(app.auth, "data", None)
if isinstance(data, dict):
return self._first_email(
data.get("username"),
data.get("email"),
data.get("login"),
data.get("user_email"),
)
except Exception:
return None
return None
def _get_email_from_config(self, app: _AppContext) -> str | None:
"""Extract email from the config file."""
try:
if CONFIG_PATH.exists():
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
config = json.load(f)
return self._first_email(
config.get("email"),
config.get("username"),
config.get("login"),
)
except Exception:
return None
return None
def _get_email_from_auth_file(self, app: _AppContext) -> str | None:
"""Extract email from the auth file."""
try:
if AUTH_PATH.exists():
with open(AUTH_PATH, "r", encoding="utf-8") as f:
auth_file_data = json.load(f)
return self._first_email(
auth_file_data.get("username"),
auth_file_data.get("email"),
auth_file_data.get("login"),
auth_file_data.get("user_email"),
)
except Exception:
return None
return None
def _get_email_from_account_info(self, app: _AppContext) -> str | None:
"""Extract email from the account info API."""
if not app.client:
return None
try:
account_info = self._get_account_info()
if account_info:
email = self._first_email(
account_info.get("email"),
account_info.get("customer_email"),
account_info.get("username"),
)
if email:
return email
customer_info = account_info.get("customer_info", {})
if isinstance(customer_info, dict):
return self._first_email(
customer_info.get("email"),
customer_info.get("email_address"),
customer_info.get("primary_email"),
)
except Exception:
return None
return None
def _first_email(self, *values: str | None) -> str | None:
"""Return the first non-empty, non-Unknown email value."""
for value in values:
if value and value != "Unknown":
return value
return None
def _find_email_in_data(self, data: Any) -> str | None:
"""Search nested data for an email-like value."""
if data is None:
return None
stack: list[Any] = [data]
while stack:
current = stack.pop()
if isinstance(current, dict):
stack.extend(current.values())
elif isinstance(current, list):
stack.extend(current)
elif isinstance(current, str):
if "@" in current:
local, _, domain = current.partition("@")
if local and "." in domain:
return current
return None
def _get_subscription_details(self, account_info: dict) -> dict:
"""Extract subscription details from nested API response."""
paths = [
["customer_details", "subscription", "subscription_details"],
["customer", "customer_details", "subscription", "subscription_details"],
["subscription_details"],
["subscription", "subscription_details"],
]
for path in paths:
data: Any = account_info
for key in path:
if isinstance(data, dict):
data = data.get(key)
else:
break
if isinstance(data, list) and data:
return data[0]
return {}
def _get_country(self) -> str:
"""Get country from authenticator locale."""
app = self._app()
if not app.auth:
return "Unknown"
try:
locale_obj = getattr(app.auth, "locale", None)
if not locale_obj:
return "Unknown"
if hasattr(locale_obj, "country_code"):
return locale_obj.country_code.upper()
if hasattr(locale_obj, "domain"):
return locale_obj.domain.upper()
if isinstance(locale_obj, str):
return (
locale_obj.split("_")[-1].upper()
if "_" in locale_obj
else locale_obj.upper()
)
return str(locale_obj)
except Exception:
return "Unknown"
def _make_stat_item(self, label: str, value: str) -> ListItem:
"""Create a ListItem for a stat."""
text = f"[bold {KEY_COLOR}]{label:>16}[/] [{DESC_COLOR}]{value:<25}[/]"
return ListItem(Label(text))
def compose(self) -> ComposeResult:
app = self._app()
if not app.client:
with Container(id="help_container"):
yield Static("Statistics", id="help_title")
yield Static(
"Not authenticated. Please restart and authenticate.",
classes="help_row",
)
yield Static(
f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close",
id="help_footer",
)
return
today = date.today()
stats_items = self._build_stats_items(today)
with Container(id="help_container"):
yield Static("Statistics", id="help_title")
with Vertical(id="help_content"):
yield ListView(
*[
self._make_stat_item(label, value)
for label, value in stats_items
],
classes="help_list",
)
yield Static(
f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close",
id="help_footer",
)
def _build_stats_items(self, today: date) -> list[tuple[str, str]]:
"""Build the list of stats items to display."""
signup_year = self._get_signup_year()
month_time = self._get_listening_time(1, today.strftime("%Y-%m"))
year_time = self._get_listening_time(12, today.strftime("%Y-01"))
finished_count = self._get_finished_books_count()
app = self._app()
total_books = len(app.all_items) if app.all_items else 0
email = self._get_email()
country = self._get_country()
subscription_name = "Unknown"
subscription_price = "Unknown"
next_bill_date = "Unknown"
account_info = self._get_account_info()
if account_info:
subscription_data = self._get_subscription_details(account_info)
if subscription_data:
if name := subscription_data.get("name"):
subscription_name = name
if bill_date := subscription_data.get("next_bill_date"):
next_bill_date = self._format_date(bill_date)
if bill_amount := subscription_data.get("next_bill_amount", {}):
amount = bill_amount.get("currency_value")
currency = bill_amount.get("currency_code", "EUR")
if amount is not None:
subscription_price = f"{amount} {currency}"
stats_items = []
if email != "Unknown":
stats_items.append(("Email", email))
stats_items.append(("Country Store", country))
stats_items.append(
("Signup Year", str(signup_year) if signup_year > 0 else "Unknown")
)
if next_bill_date != "Unknown":
stats_items.append(("Next Credit", next_bill_date))
stats_items.append(("Next Bill", next_bill_date))
if subscription_name != "Unknown":
stats_items.append(("Subscription", subscription_name))
if subscription_price != "Unknown":
stats_items.append(("Price", subscription_price))
stats_items.append(("This Month", self._format_time(month_time)))
stats_items.append(("This Year", self._format_time(year_time)))
stats_items.append(("Books Finished", f"{finished_count} / {total_books}"))
return stats_items
async def action_dismiss(self, result: Any | None = None) -> None:
await self.dismiss(result)
class FilterScreen(ModalScreen[str]):
"""Filter screen for searching the library."""
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(
self,
initial_filter: str = "",
on_change: Callable[[str], None] | None = None,
debounce_seconds: float = 0.2,
) -> None:
super().__init__()
self._initial_filter = initial_filter
self._on_change = on_change
self._debounce_seconds = debounce_seconds
self._debounce_timer: Timer | None = None
def compose(self) -> ComposeResult:
with Container(id="filter_container"):
yield Static("Filter Library", id="filter_title")
yield Input(
value=self._initial_filter,
placeholder="Type to filter by title or author...",
id="filter_input",
)
yield Static(
f"Press [bold {KEY_COLOR}]Enter[/] to apply, "
f"[bold {KEY_COLOR}]Escape[/] to clear",
id="filter_footer",
)
def on_mount(self) -> None:
self.query_one("#filter_input", Input).focus()
def on_input_submitted(self, event: Input.Submitted) -> None:
self.dismiss(event.value)
def on_input_changed(self, event: Input.Changed) -> None:
callback = self._on_change
if not callback:
return
if self._debounce_timer:
self._debounce_timer.stop()
value = event.value
self._debounce_timer = self.set_timer(
self._debounce_seconds,
lambda: callback(value),
)
def action_cancel(self) -> None:
self.dismiss("")
def on_unmount(self) -> None:
if self._debounce_timer:
self._debounce_timer.stop()

7
auditui/ui/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
"""Modal screens: help keybindings, filter input, and listening/account statistics."""
from .filter_screen import FilterScreen
from .help_screen import HelpScreen
from .stats_screen import StatsScreen
__all__ = ["FilterScreen", "HelpScreen", "StatsScreen"]

30
auditui/ui/common.py Normal file
View File

@@ -0,0 +1,30 @@
"""Shared protocol, constants, and mixin for modal screens that need app context."""
from typing import Any, Protocol, cast
class _AppContext(Protocol):
BINDINGS: list[tuple[str, str, str]]
client: Any
auth: Any
library_client: Any
all_items: list[dict]
KEY_DISPLAY_MAP = {
"ctrl+": "^",
"left": "",
"right": "",
"up": "",
"down": "",
"space": "Space",
"enter": "Enter",
}
KEY_COLOR = "#f9e2af"
DESC_COLOR = "#cdd6f4"
class AppContextMixin:
def _app(self) -> _AppContext:
return cast(_AppContext, cast(Any, self).app)

View File

@@ -0,0 +1,66 @@
"""Filter modal with input; returns filter string on dismiss."""
from typing import Callable
from textual.app import ComposeResult
from textual.containers import Container
from textual.screen import ModalScreen
from textual.timer import Timer
from textual.widgets import Input, Static
from .common import KEY_COLOR
class FilterScreen(ModalScreen[str]):
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(
self,
initial_filter: str = "",
on_change: Callable[[str], None] | None = None,
debounce_seconds: float = 0.2,
) -> None:
super().__init__()
self._initial_filter = initial_filter
self._on_change = on_change
self._debounce_seconds = debounce_seconds
self._debounce_timer: Timer | None = None
def compose(self) -> ComposeResult:
with Container(id="filter_container"):
yield Static("Filter Library", id="filter_title")
yield Input(
value=self._initial_filter,
placeholder="Type to filter by title or author...",
id="filter_input",
)
yield Static(
f"Press [bold {KEY_COLOR}]Enter[/] to apply, "
f"[bold {KEY_COLOR}]Escape[/] to clear",
id="filter_footer",
)
def on_mount(self) -> None:
self.query_one("#filter_input", Input).focus()
def on_input_submitted(self, event: Input.Submitted) -> None:
self.dismiss(event.value)
def on_input_changed(self, event: Input.Changed) -> None:
callback = self._on_change
if not callback:
return
if self._debounce_timer:
self._debounce_timer.stop()
value = event.value
self._debounce_timer = self.set_timer(
self._debounce_seconds,
lambda: callback(value),
)
def action_cancel(self) -> None:
self.dismiss("")
def on_unmount(self) -> None:
if self._debounce_timer:
self._debounce_timer.stop()

54
auditui/ui/help_screen.py Normal file
View File

@@ -0,0 +1,54 @@
"""Help modal that lists keybindings from the main app."""
from typing import TYPE_CHECKING, Any
from textual.app import ComposeResult
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Label, ListItem, ListView, Static
from .common import KEY_COLOR, KEY_DISPLAY_MAP, DESC_COLOR, AppContextMixin
if TYPE_CHECKING:
from textual.binding import Binding
class HelpScreen(AppContextMixin, ModalScreen):
BINDINGS = [("escape", "dismiss", "Close"), ("?", "dismiss", "Close")]
@staticmethod
def _format_key_display(key: str) -> str:
result = key
for old, new in KEY_DISPLAY_MAP.items():
result = result.replace(old, new)
return result
@staticmethod
def _parse_binding(binding: "Binding | tuple[str, str, str]") -> tuple[str, str]:
if isinstance(binding, tuple):
return binding[0], binding[2]
return binding.key, binding.description
def _make_item(self, binding: "Binding | tuple[str, str, str]") -> ListItem:
key, description = self._parse_binding(binding)
key_display = self._format_key_display(key)
text = f"[bold {KEY_COLOR}]{key_display:>16}[/] [{DESC_COLOR}]{description:<25}[/]"
return ListItem(Label(text))
def compose(self) -> ComposeResult:
app = self._app()
bindings = list(app.BINDINGS)
with Container(id="help_container"):
yield Static("Keybindings", id="help_title")
with Vertical(id="help_content"):
yield ListView(
*[self._make_item(b) for b in bindings],
classes="help_list",
)
yield Static(
f"Press [bold {KEY_COLOR}]?[/] or [bold {KEY_COLOR}]Escape[/] to close",
id="help_footer",
)
async def action_dismiss(self, result: Any | None = None) -> None:
await self.dismiss(result)

View File

@@ -0,0 +1,54 @@
"""Statistics modal showing listening time and account info via StatsAggregator."""
from datetime import date
from typing import Any
from textual.app import ComposeResult
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Label, ListItem, ListView, Static
from ..stats import StatsAggregator
from .common import KEY_COLOR, DESC_COLOR, AppContextMixin
class StatsScreen(AppContextMixin, ModalScreen):
BINDINGS = [("escape", "dismiss", "Close"), ("s", "dismiss", "Close")]
def _make_stat_item(self, label: str, value: str) -> ListItem:
text = f"[bold {KEY_COLOR}]{label:>16}[/] [{DESC_COLOR}]{value:<25}[/]"
return ListItem(Label(text))
def compose(self) -> ComposeResult:
app = self._app()
if not app.client:
with Container(id="help_container"):
yield Static("Statistics", id="help_title")
yield Static(
"Not authenticated. Please restart and authenticate.",
classes="help_row",
)
yield Static(
f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close",
id="help_footer",
)
return
aggregator = StatsAggregator(
app.client, app.auth, app.library_client, app.all_items or []
)
stats_items = aggregator.get_stats(date.today())
with Container(id="help_container"):
yield Static("Statistics", id="help_title")
with Vertical(id="help_content"):
yield ListView(
*[self._make_stat_item(label, value)
for label, value in stats_items],
classes="help_list",
)
yield Static(
f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close",
id="help_footer",
)
async def action_dismiss(self, result: Any | None = None) -> None:
await self.dismiss(result)

View File

@@ -4,7 +4,7 @@ from dataclasses import dataclass, field
from typing import Any, cast from typing import Any, cast
from auditui.app import Auditui from auditui.app import Auditui
from auditui.search_utils import build_search_text, filter_items from auditui.library import build_search_text, filter_items
class StubLibrary: class StubLibrary:

View File

@@ -2,24 +2,24 @@ from pathlib import Path
import pytest import pytest
from auditui import downloads from auditui.downloads import DownloadManager
from auditui.constants import MIN_FILE_SIZE from auditui.constants import MIN_FILE_SIZE
def test_sanitize_filename() -> None: def test_sanitize_filename() -> None:
dm = downloads.DownloadManager.__new__(downloads.DownloadManager) dm = DownloadManager.__new__(DownloadManager)
assert dm._sanitize_filename('a<>:"/\\|?*b') == "a_________b" assert dm._sanitize_filename('a<>:"/\\|?*b') == "a_________b"
def test_validate_download_url() -> None: def test_validate_download_url() -> None:
dm = downloads.DownloadManager.__new__(downloads.DownloadManager) dm = DownloadManager.__new__(DownloadManager)
assert dm._validate_download_url("https://example.com/file") is True assert dm._validate_download_url("https://example.com/file") is True
assert dm._validate_download_url("http://example.com/file") is True assert dm._validate_download_url("http://example.com/file") is True
assert dm._validate_download_url("ftp://example.com/file") is False assert dm._validate_download_url("ftp://example.com/file") is False
def test_cached_path_and_remove(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: def test_cached_path_and_remove(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
dm = downloads.DownloadManager.__new__(downloads.DownloadManager) dm = DownloadManager.__new__(DownloadManager)
dm.cache_dir = tmp_path dm.cache_dir = tmp_path
monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book") monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book")
@@ -37,7 +37,7 @@ def test_cached_path_and_remove(tmp_path: Path, monkeypatch: pytest.MonkeyPatch)
def test_cached_path_ignores_small_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: def test_cached_path_ignores_small_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
dm = downloads.DownloadManager.__new__(downloads.DownloadManager) dm = DownloadManager.__new__(DownloadManager)
dm.cache_dir = tmp_path dm.cache_dir = tmp_path
monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book") monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book")

View File

@@ -1,7 +1,13 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, cast from typing import Any, cast
from auditui import table_utils from auditui.constants import AUTHOR_NAME_MAX_LENGTH
from auditui.library import (
create_progress_sort_key,
create_title_sort_key,
format_item_as_row,
truncate_author_name,
)
class StubLibrary: class StubLibrary:
@@ -37,22 +43,22 @@ class StubDownloads:
def test_create_title_sort_key_normalizes_accents() -> None: def test_create_title_sort_key_normalizes_accents() -> None:
key_fn, _ = table_utils.create_title_sort_key() key_fn, _ = create_title_sort_key()
assert key_fn(["École"]) == "ecole" assert key_fn(["École"]) == "ecole"
assert key_fn(["Zoo"]) == "zoo" assert key_fn(["Zoo"]) == "zoo"
def test_create_progress_sort_key_parses_percent() -> None: def test_create_progress_sort_key_parses_percent() -> None:
key_fn, _ = table_utils.create_progress_sort_key() key_fn, _ = create_progress_sort_key()
assert key_fn(["0", "0", "0", "42.5%"]) == 42.5 assert key_fn(["0", "0", "0", "42.5%"]) == 42.5
assert key_fn(["0", "0", "0", "bad"]) == 0.0 assert key_fn(["0", "0", "0", "bad"]) == 0.0
def test_truncate_author_name() -> None: def test_truncate_author_name() -> None:
long_name = "A" * (table_utils.AUTHOR_NAME_MAX_LENGTH + 5) long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5)
truncated = table_utils.truncate_author_name(long_name) truncated = truncate_author_name(long_name)
assert truncated.endswith("...") assert truncated.endswith("...")
assert len(truncated) <= table_utils.AUTHOR_NAME_MAX_LENGTH assert len(truncated) <= AUTHOR_NAME_MAX_LENGTH
def test_format_item_as_row_with_downloaded() -> None: def test_format_item_as_row_with_downloaded() -> None:
@@ -65,7 +71,7 @@ def test_format_item_as_row_with_downloaded() -> None:
"percent": 12.34, "percent": 12.34,
"asin": "ASIN123", "asin": "ASIN123",
} }
title, author, runtime, progress, downloaded = table_utils.format_item_as_row( title, author, runtime, progress, downloaded = format_item_as_row(
item, library, cast(Any, downloads) item, library, cast(Any, downloads)
) )
assert title == "Title" assert title == "Title"
@@ -79,5 +85,5 @@ def test_format_item_as_row_zero_progress() -> None:
library = StubLibrary() library = StubLibrary()
item = {"title": "Title", "authors": "Author", item = {"title": "Title", "authors": "Author",
"minutes": 30, "percent": 0.0} "minutes": 30, "percent": 0.0}
_, _, _, progress, _ = table_utils.format_item_as_row(item, library, None) _, _, _, progress, _ = format_item_as_row(item, library, None)
assert progress == "0%" assert progress == "0%"

View File

@@ -1,63 +1,35 @@
import json import json
from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
import pytest from auditui.stats.email import (
find_email_in_data,
from auditui import ui get_email_from_auth,
get_email_from_auth_file,
get_email_from_config,
@dataclass(slots=True) )
class DummyApp:
client: object | None = None
auth: object | None = None
library_client: object | None = None
all_items: list[dict] = field(default_factory=list)
BINDINGS: list[tuple[str, str, str]] = field(default_factory=list)
@pytest.fixture
def dummy_app() -> DummyApp:
return DummyApp()
def test_find_email_in_data() -> None: def test_find_email_in_data() -> None:
screen = ui.StatsScreen()
data = {"a": {"b": ["nope", "user@example.com"]}} data = {"a": {"b": ["nope", "user@example.com"]}}
assert screen._find_email_in_data(data) == "user@example.com" assert find_email_in_data(data) == "user@example.com"
def test_get_email_from_config( def test_get_email_from_config(tmp_path: Path) -> None:
tmp_path: Path, monkeypatch: pytest.MonkeyPatch, dummy_app: DummyApp
) -> None:
screen = ui.StatsScreen()
config_path = tmp_path / "config.json" config_path = tmp_path / "config.json"
config_path.write_text(json.dumps({"email": "config@example.com"})) config_path.write_text(json.dumps({"email": "config@example.com"}))
monkeypatch.setattr(ui, "CONFIG_PATH", config_path) assert get_email_from_config(config_path) == "config@example.com"
email = screen._get_email_from_config(dummy_app)
assert email == "config@example.com"
def test_get_email_from_auth_file( def test_get_email_from_auth_file(tmp_path: Path) -> None:
tmp_path: Path, monkeypatch: pytest.MonkeyPatch, dummy_app: DummyApp
) -> None:
screen = ui.StatsScreen()
auth_path = tmp_path / "auth.json" auth_path = tmp_path / "auth.json"
auth_path.write_text(json.dumps({"email": "auth@example.com"})) auth_path.write_text(json.dumps({"email": "auth@example.com"}))
monkeypatch.setattr(ui, "AUTH_PATH", auth_path) assert get_email_from_auth_file(auth_path) == "auth@example.com"
email = screen._get_email_from_auth_file(dummy_app)
assert email == "auth@example.com"
def test_get_email_from_auth(dummy_app: DummyApp) -> None: def test_get_email_from_auth() -> None:
screen = ui.StatsScreen()
class Auth: class Auth:
username = "user@example.com" username = "user@example.com"
login = None login = None
email = None email = None
dummy_app.auth = Auth() assert get_email_from_auth(Auth()) == "user@example.com"
assert screen._get_email_from_auth(dummy_app) == "user@example.com"