diff --git a/README.md b/README.md index dcb615a..9a81576 100644 --- a/README.md +++ b/README.md @@ -145,7 +145,7 @@ This project uses [uv](https://github.com/astral-sh/uv) for dependency managemen $ uv sync # modify the code... # ...and run the TUI -$ uv run python -m auditui.cli +$ uv run auditui ``` Don't forget to run the tests. diff --git a/auditui/__init__.py b/auditui/__init__.py index fceab3b..db53994 100644 --- a/auditui/__init__.py +++ b/auditui/__init__.py @@ -1,3 +1,3 @@ -"""Auditui package""" +"""Auditui: Audible TUI client. One folder per module; all code lives inside module packages.""" __version__ = "0.1.6" diff --git a/auditui/app.py b/auditui/app.py deleted file mode 100644 index b77e5fa..0000000 --- a/auditui/app.py +++ /dev/null @@ -1,614 +0,0 @@ -"""Textual application for the Audible TUI.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -from textual import work -from textual.app import App, ComposeResult -from textual.containers import Horizontal -from textual.events import Key, Resize -from textual.widgets import DataTable, ProgressBar, Static -from textual.worker import get_current_worker - -from . import __version__ -from .constants import ( - PROGRESS_COLUMN_INDEX, - SEEK_SECONDS, - TABLE_COLUMN_DEFS, - TABLE_CSS, -) -from .downloads import DownloadManager -from .library import LibraryClient -from .playback import PlaybackController -from .table_utils import ( - create_progress_sort_key, - create_title_sort_key, - filter_unfinished_items, - format_item_as_row, -) -from .search_utils import build_search_text, filter_items -from .ui import FilterScreen, HelpScreen, StatsScreen - -if TYPE_CHECKING: - from textual.widgets.data_table import ColumnKey - - -class Auditui(App): - """Main application class for the Audible TUI app.""" - - SHOW_PALETTE = False - - BINDINGS = [ - ("?", "show_help", "Help"), - ("s", "show_stats", "Stats"), - ("/", "filter", "Filter"), - ("escape", "clear_filter", "Clear filter"), - ("n", "sort", "Sort by name"), - ("p", "sort_by_progress", "Sort by progress"), - ("a", "show_all", "All/Unfinished"), - ("r", "refresh", "Refresh"), - ("enter", "play_selected", "Play"), - ("space", "toggle_playback", "Pause/Resume"), - ("left", "seek_backward", "-30s"), - ("right", "seek_forward", "+30s"), - ("ctrl+left", "previous_chapter", "Previous chapter"), - ("ctrl+right", "next_chapter", "Next chapter"), - ("up", "increase_speed", "Increase speed"), - ("down", "decrease_speed", "Decrease speed"), - ("f", "toggle_finished", "Mark finished"), - ("d", "toggle_download", "Download/Delete"), - ("q", "quit", "Quit"), - ] - - CSS = TABLE_CSS - - def __init__(self, auth=None, client=None) -> None: - super().__init__() - self.auth = auth - self.client = client - self.library_client = LibraryClient(client) if client else None - self.download_manager = ( - DownloadManager(auth, client) if auth and client else None - ) - self.playback = PlaybackController(self.update_status, self.library_client) - - self.all_items: list[dict] = [] - self.current_items: list[dict] = [] - self._search_text_cache: dict[int, str] = {} - self.show_all_mode = False - self.filter_text = "" - self.title_sort_reverse = False - self.progress_sort_reverse = False - self.title_column_key: ColumnKey | None = None - self.progress_column_index = PROGRESS_COLUMN_INDEX - - def compose(self) -> ComposeResult: - yield Horizontal( - Static("? Help", id="top_left"), - Static(f"Auditui v{__version__}", id="top_center"), - Static("q Quit", id="top_right"), - id="top_bar", - ) - yield Static("Loading...", id="status") - table: DataTable = DataTable() - table.zebra_stripes = True - table.cursor_type = "row" - yield table - yield Static("", id="progress_info") - with Horizontal(id="progress_bar_container"): - yield ProgressBar( - id="progress_bar", show_eta=False, show_percentage=False, total=100 - ) - - def on_mount(self) -> None: - """Initialize the table and start fetching library data.""" - self.theme = "textual-dark" - table = self.query_one(DataTable) - for column_name, _ratio in TABLE_COLUMN_DEFS: - table.add_column(column_name) - self.call_after_refresh(lambda: self._apply_column_widths(table)) - column_keys = list(table.columns.keys()) - self.title_column_key = column_keys[0] - - if self.client: - self.update_status("Fetching library...") - self.fetch_library() - else: - self.update_status("Not authenticated. Please restart and authenticate.") - - self.set_interval(1.0, self._check_playback_status) - self.set_interval(0.5, self._update_progress) - self.set_interval(30.0, self._save_position_periodically) - - def on_unmount(self) -> None: - """Clean up on app exit.""" - self.playback.stop() - if self.download_manager: - self.download_manager.close() - - def on_resize(self, event: Resize) -> None: - """Keep table columns responsive to terminal width changes.""" - del event - try: - table = self.query_one(DataTable) - except Exception: - return - self.call_after_refresh(lambda: self._apply_column_widths(table)) - - def on_key(self, event: Key) -> None: - """Handle key presses.""" - if self.playback.is_playing: - if event.key == "ctrl+left": - event.prevent_default() - self.action_previous_chapter() - return - elif event.key == "ctrl+right": - event.prevent_default() - self.action_next_chapter() - return - elif event.key == "left": - event.prevent_default() - self.action_seek_backward() - return - elif event.key == "right": - event.prevent_default() - self.action_seek_forward() - return - elif event.key == "up": - event.prevent_default() - self.action_increase_speed() - return - elif event.key == "down": - event.prevent_default() - self.action_decrease_speed() - return - - if isinstance(self.focused, DataTable): - if event.key == "enter": - event.prevent_default() - self.action_play_selected() - elif event.key == "space": - event.prevent_default() - self.action_toggle_playback() - - def update_status(self, message: str) -> None: - """Update the status message in the UI.""" - status = self.query_one("#status", Static) - status.display = True - status.update(message) - - def _apply_column_widths(self, table: DataTable) -> None: - """Assign proportional column widths based on available space. - - Each column's render width = column.width + 2 * cell_padding. - We compute column.width values so columns fill the full table width. - """ - if not table.columns: - return - - column_keys = list(table.columns.keys()) - num_cols = len(column_keys) - ratios = [ratio for _, ratio in TABLE_COLUMN_DEFS] - total_ratio = sum(ratios) or num_cols - - content_width = table.scrollable_content_region.width - if content_width <= 0: - content_width = table.size.width - if content_width <= 0: - return - - padding_total = 2 * table.cell_padding * num_cols - distributable = max(num_cols, content_width - padding_total) - - widths: list[int] = [] - for ratio in ratios: - w = max(1, (distributable * ratio) // total_ratio) - widths.append(w) - - remainder = distributable - sum(widths) - if remainder > 0: - indices = sorted(range(num_cols), key=lambda i: ratios[i], reverse=True) - for i in range(remainder): - widths[indices[i % num_cols]] += 1 - - for column_key, w in zip(column_keys, widths): - col = table.columns[column_key] - col.auto_width = False - col.width = w - table.refresh() - - def _thread_status_update(self, message: str) -> None: - """Safely update status from worker threads.""" - self.call_from_thread(self.update_status, message) - - @work(exclusive=True, thread=True) - def fetch_library(self) -> None: - """Fetch all library items from Audible API in background thread.""" - worker = get_current_worker() - if worker.is_cancelled or not self.library_client: - return - - try: - all_items = self.library_client.fetch_all_items(self._thread_status_update) - self.call_from_thread(self.on_library_loaded, all_items) - except (OSError, ValueError, KeyError) as exc: - self.call_from_thread(self.on_library_error, str(exc)) - - def on_library_loaded(self, items: list[dict]) -> None: - """Handle successful library load.""" - self.all_items = items - self._search_text_cache.clear() - self._prime_search_cache(items) - self.update_status(f"Loaded {len(items)} books") - if self.show_all_mode: - self.show_all() - else: - self.show_unfinished() - - def on_library_error(self, error: str) -> None: - """Handle library fetch error.""" - self.update_status(f"Error fetching library: {error}") - - def _populate_table(self, items: list[dict]) -> None: - """Populate the DataTable with library items.""" - table = self.query_one(DataTable) - table.clear() - - if not items or not self.library_client: - self.update_status("No books found.") - return - - for item in items: - title, author, runtime, progress, downloaded = format_item_as_row( - item, self.library_client, self.download_manager - ) - table.add_row(title, author, runtime, progress, downloaded, key=title) - - self.current_items = items - status = self.query_one("#status", Static) - status.display = False - self._apply_column_widths(table) - - def _refresh_table(self) -> None: - """Refresh the table with current items.""" - if self.current_items: - self._populate_table(self.current_items) - - def show_all(self) -> None: - """Display all books in the table.""" - if not self.all_items: - return - self.show_all_mode = True - self._refresh_filtered_view() - - def show_unfinished(self) -> None: - """Display only unfinished books in the table.""" - if not self.all_items or not self.library_client: - return - self.show_all_mode = False - self._refresh_filtered_view() - - def action_sort(self) -> None: - """Sort table by title, toggling direction on each press.""" - table = self.query_one(DataTable) - if table.row_count > 0 and self.title_column_key: - title_key, reverse = create_title_sort_key(self.title_sort_reverse) - table.sort(key=title_key, reverse=reverse) - self.title_sort_reverse = not self.title_sort_reverse - - def action_sort_by_progress(self) -> None: - """Sort table by progress percentage, toggling direction on each press.""" - table = self.query_one(DataTable) - if table.row_count > 0: - self.progress_sort_reverse = not self.progress_sort_reverse - progress_key, reverse = create_progress_sort_key( - self.progress_column_index, self.progress_sort_reverse - ) - table.sort(key=progress_key, reverse=reverse) - - def action_show_all(self) -> None: - """Toggle between showing all and unfinished books.""" - if self.show_all_mode: - self.show_unfinished() - else: - self.show_all() - - def action_refresh(self) -> None: - """Refresh the library data from the API.""" - if not self.client: - self.update_status("Not authenticated. Cannot refresh.") - return - self.update_status("Refreshing library...") - self.fetch_library() - - def action_play_selected(self) -> None: - """Start playing the selected book.""" - if not self.download_manager: - self.update_status("Not authenticated. Please restart and authenticate.") - return - - table = self.query_one(DataTable) - if table.row_count == 0: - self.update_status("No books available") - return - - cursor_row = table.cursor_row - if cursor_row >= len(self.current_items): - self.update_status("Invalid selection") - return - - if not self.library_client: - self.update_status("Library client not available") - return - - selected_item = self.current_items[cursor_row] - asin = self.library_client.extract_asin(selected_item) - - if not asin: - self.update_status("Could not get ASIN for selected book") - return - - self._start_playback_async(asin) - - def action_toggle_playback(self) -> None: - """Toggle pause/resume state.""" - if not self.playback.toggle_playback(): - self._no_playback_message() - - def action_seek_forward(self) -> None: - """Seek forward 30 seconds.""" - if not self.playback.seek_forward(SEEK_SECONDS): - self._no_playback_message() - - def action_seek_backward(self) -> None: - """Seek backward 30 seconds.""" - if not self.playback.seek_backward(SEEK_SECONDS): - self._no_playback_message() - - def action_next_chapter(self) -> None: - """Seek to the next chapter.""" - if not self.playback.seek_to_next_chapter(): - self._no_playback_message() - - def action_previous_chapter(self) -> None: - """Seek to the previous chapter.""" - if not self.playback.seek_to_previous_chapter(): - self._no_playback_message() - - def action_increase_speed(self) -> None: - """Increase playback speed.""" - if not self.playback.increase_speed(): - self._no_playback_message() - - def action_decrease_speed(self) -> None: - """Decrease playback speed.""" - if not self.playback.decrease_speed(): - self._no_playback_message() - - def action_toggle_finished(self) -> None: - """Toggle finished/unfinished status for the selected book.""" - if not self.library_client: - self.update_status("Library client not available") - return - - table = self.query_one(DataTable) - if table.row_count == 0: - self.update_status("No books available") - return - - cursor_row = table.cursor_row - if cursor_row >= len(self.current_items): - self.update_status("Invalid selection") - return - - selected_item = self.current_items[cursor_row] - asin = self.library_client.extract_asin(selected_item) - - if not asin: - self.update_status("Could not get ASIN for selected book") - return - - self._toggle_finished_async(asin) - - @work(exclusive=True, thread=True) - def _toggle_finished_async(self, asin: str) -> None: - """Toggle finished/unfinished status asynchronously.""" - if not self.library_client: - return - - selected_item = None - for item in self.current_items: - if self.library_client.extract_asin(item) == asin: - selected_item = item - break - - if not selected_item: - return - - is_currently_finished = self.library_client.is_finished(selected_item) - - if is_currently_finished: - self.call_from_thread(self.update_status, "Already marked as finished") - return - - success = self.library_client.mark_as_finished(asin, selected_item) - message = "Marked as finished" if success else "Failed to mark as finished" - - self.call_from_thread(self.update_status, message) - if success: - if self.download_manager and self.download_manager.is_cached(asin): - self.download_manager.remove_cached( - asin, notify=self._thread_status_update - ) - self.call_from_thread(self.fetch_library) - - def _no_playback_message(self) -> None: - """Show message when no playback is active.""" - self.update_status("No playback active. Press Enter to play a book.") - - def action_show_help(self) -> None: - """Show the help screen with all keybindings.""" - self.push_screen(HelpScreen()) - - def action_show_stats(self) -> None: - """Show the stats screen with listening statistics.""" - self.push_screen(StatsScreen()) - - def action_filter(self) -> None: - """Show the filter screen to search the library.""" - self.push_screen( - FilterScreen( - self.filter_text, - on_change=self._apply_filter, - ), - self._apply_filter, - ) - - def action_clear_filter(self) -> None: - """Clear the current filter if active.""" - if self.filter_text: - self.filter_text = "" - self._refresh_filtered_view() - self.update_status("Filter cleared") - - def _apply_filter(self, filter_text: str | None) -> None: - """Apply the filter to the library.""" - self.filter_text = filter_text or "" - self._refresh_filtered_view() - - def _refresh_filtered_view(self) -> None: - """Refresh the table with current filter and view mode.""" - if not self.all_items: - return - - items = self.all_items - - if self.filter_text: - items = filter_items(items, self.filter_text, self._get_search_text) - self._populate_table(items) - self.update_status(f"Filter: '{self.filter_text}' ({len(items)} books)") - return - - if not self.show_all_mode and self.library_client: - items = filter_unfinished_items(items, self.library_client) - - self._populate_table(items) - - def _get_search_text(self, item: dict) -> str: - """Return cached search text for filtering.""" - cache_key = id(item) - cached = self._search_text_cache.get(cache_key) - if cached is not None: - return cached - search_text = build_search_text(item, self.library_client) - self._search_text_cache[cache_key] = search_text - return search_text - - def _prime_search_cache(self, items: list[dict]) -> None: - """Precompute search text for a list of items.""" - for item in items: - self._get_search_text(item) - - def _check_playback_status(self) -> None: - """Check if playback process has finished and update state accordingly.""" - message = self.playback.check_status() - if message: - self.update_status(message) - self._hide_progress() - - def _update_progress(self) -> None: - """Update the progress bar and info during playback.""" - if not self.playback.is_playing: - self._hide_progress() - return - - progress_data = self.playback.get_current_progress() - if not progress_data: - self._hide_progress() - return - - chapter_name, chapter_elapsed, chapter_total = progress_data - if chapter_total <= 0: - self._hide_progress() - return - - progress_info = self.query_one("#progress_info", Static) - progress_bar = self.query_one("#progress_bar", ProgressBar) - progress_bar_container = self.query_one("#progress_bar_container", Horizontal) - - progress_percent = min( - 100.0, max(0.0, (chapter_elapsed / chapter_total) * 100.0) - ) - progress_bar.update(progress=progress_percent) - chapter_elapsed_str = LibraryClient.format_time(chapter_elapsed) - chapter_total_str = LibraryClient.format_time(chapter_total) - progress_info.update( - f"{chapter_name} | {chapter_elapsed_str} / {chapter_total_str}" - ) - progress_info.display = True - progress_bar_container.display = True - - def _hide_progress(self) -> None: - """Hide the progress widget.""" - progress_info = self.query_one("#progress_info", Static) - progress_bar_container = self.query_one("#progress_bar_container", Horizontal) - progress_info.display = False - progress_bar_container.display = False - - def _save_position_periodically(self) -> None: - """Periodically save playback position.""" - self.playback.update_position_if_needed() - - def action_toggle_download(self) -> None: - """Toggle download/remove for the selected book.""" - if not self.download_manager: - self.update_status("Not authenticated. Please restart and authenticate.") - return - - table = self.query_one(DataTable) - if table.row_count == 0: - self.update_status("No books available") - return - - cursor_row = table.cursor_row - if cursor_row >= len(self.current_items): - self.update_status("Invalid selection") - return - - if not self.library_client: - self.update_status("Library client not available") - return - - selected_item = self.current_items[cursor_row] - asin = self.library_client.extract_asin(selected_item) - - if not asin: - self.update_status("Could not get ASIN for selected book") - return - - self._toggle_download_async(asin) - - @work(exclusive=True, thread=True) - def _toggle_download_async(self, asin: str) -> None: - """Toggle download/remove asynchronously.""" - if not self.download_manager: - return - - if self.download_manager.is_cached(asin): - self.download_manager.remove_cached(asin, self._thread_status_update) - else: - self.download_manager.get_or_download(asin, self._thread_status_update) - - self.call_from_thread(self._refresh_table) - - @work(exclusive=True, thread=True) - def _start_playback_async(self, asin: str) -> None: - """Start playback asynchronously.""" - if not self.download_manager: - return - self.playback.prepare_and_start( - self.download_manager, - asin, - self._thread_status_update, - ) diff --git a/auditui/app/__init__.py b/auditui/app/__init__.py new file mode 100644 index 0000000..e1c10b4 --- /dev/null +++ b/auditui/app/__init__.py @@ -0,0 +1,27 @@ +"""Main Textual app: table, bindings, and orchestration of library, playback, and downloads.""" + +from __future__ import annotations + +from textual.app import App + +from ..constants import TABLE_CSS + +from .bindings import BINDINGS +from .state import init_auditui_state +from .layout import AppLayoutMixin +from .table import AppTableMixin +from .library import AppLibraryMixin +from .actions import AppActionsMixin +from .progress import AppProgressMixin + + +class Auditui(App, AppProgressMixin, AppActionsMixin, AppLibraryMixin, AppTableMixin, AppLayoutMixin): + """Orchestrates the library table, playback, downloads, filter, and modal screens.""" + + SHOW_PALETTE = False + BINDINGS = BINDINGS + CSS = TABLE_CSS + + def __init__(self, auth=None, client=None) -> None: + super().__init__() + init_auditui_state(self, auth, client) diff --git a/auditui/app/actions.py b/auditui/app/actions.py new file mode 100644 index 0000000..284c61c --- /dev/null +++ b/auditui/app/actions.py @@ -0,0 +1,159 @@ +"""Selection, playback/download/finish actions, modals, and filter.""" + +from __future__ import annotations + +from textual import work +from textual.widgets import DataTable + +from ..constants import SEEK_SECONDS +from ..ui import FilterScreen, HelpScreen, StatsScreen + + +class AppActionsMixin: + def _get_selected_asin(self) -> str | None: + if not self.download_manager: + self.update_status( + "Not authenticated. Please restart and authenticate.") + return None + table = self.query_one(DataTable) + if table.row_count == 0: + self.update_status("No books available") + return None + cursor_row = table.cursor_row + if cursor_row >= len(self.current_items): + self.update_status("Invalid selection") + return None + if not self.library_client: + self.update_status("Library client not available") + return None + selected_item = self.current_items[cursor_row] + asin = self.library_client.extract_asin(selected_item) + if not asin: + self.update_status("Could not get ASIN for selected book") + return None + return asin + + def action_play_selected(self) -> None: + asin = self._get_selected_asin() + if asin: + self._start_playback_async(asin) + + def action_toggle_playback(self) -> None: + if not self.playback.toggle_playback(): + self._no_playback_message() + + def action_seek_forward(self) -> None: + if not self.playback.seek_forward(SEEK_SECONDS): + self._no_playback_message() + + def action_seek_backward(self) -> None: + if not self.playback.seek_backward(SEEK_SECONDS): + self._no_playback_message() + + def action_next_chapter(self) -> None: + if not self.playback.seek_to_next_chapter(): + self._no_playback_message() + + def action_previous_chapter(self) -> None: + if not self.playback.seek_to_previous_chapter(): + self._no_playback_message() + + def action_increase_speed(self) -> None: + if not self.playback.increase_speed(): + self._no_playback_message() + + def action_decrease_speed(self) -> None: + if not self.playback.decrease_speed(): + self._no_playback_message() + + def action_toggle_finished(self) -> None: + asin = self._get_selected_asin() + if asin: + self._toggle_finished_async(asin) + + @work(exclusive=True, thread=True) + def _toggle_finished_async(self, asin: str) -> None: + if not self.library_client: + return + + selected_item = None + for item in self.current_items: + if self.library_client.extract_asin(item) == asin: + selected_item = item + break + + if not selected_item: + return + + if self.library_client.is_finished(selected_item): + self.call_from_thread(self.update_status, + "Already marked as finished") + return + + success = self.library_client.mark_as_finished(asin, selected_item) + message = "Marked as finished" if success else "Failed to mark as finished" + + self.call_from_thread(self.update_status, message) + if success: + if self.download_manager and self.download_manager.is_cached(asin): + self.download_manager.remove_cached( + asin, notify=self._thread_status_update + ) + self.call_from_thread(self.fetch_library) + + def _no_playback_message(self) -> None: + self.update_status("No playback active. Press Enter to play a book.") + + def action_show_help(self) -> None: + self.push_screen(HelpScreen()) + + def action_show_stats(self) -> None: + self.push_screen(StatsScreen()) + + def action_filter(self) -> None: + self.push_screen( + FilterScreen( + self.filter_text, + on_change=self._apply_filter, + ), + self._apply_filter, + ) + + def action_clear_filter(self) -> None: + if self.filter_text: + self.filter_text = "" + self._refresh_filtered_view() + self.update_status("Filter cleared") + + def _apply_filter(self, filter_text: str | None) -> None: + self.filter_text = filter_text or "" + self._refresh_filtered_view() + + def action_toggle_download(self) -> None: + asin = self._get_selected_asin() + if asin: + self._toggle_download_async(asin) + + @work(exclusive=True, thread=True) + def _toggle_download_async(self, asin: str) -> None: + if not self.download_manager: + return + + if self.download_manager.is_cached(asin): + self.download_manager.remove_cached( + asin, self._thread_status_update) + else: + self.download_manager.get_or_download( + asin, self._thread_status_update) + + self.call_from_thread(self._refresh_table) + + @work(exclusive=True, thread=True) + def _start_playback_async(self, asin: str) -> None: + if not self.download_manager: + return + self.playback.prepare_and_start( + self.download_manager, + asin, + self._thread_status_update, + ) diff --git a/auditui/app/bindings.py b/auditui/app/bindings.py new file mode 100644 index 0000000..2aa04a4 --- /dev/null +++ b/auditui/app/bindings.py @@ -0,0 +1,23 @@ +"""Key bindings for the main app.""" + +BINDINGS = [ + ("?", "show_help", "Help"), + ("s", "show_stats", "Stats"), + ("/", "filter", "Filter"), + ("escape", "clear_filter", "Clear filter"), + ("n", "sort", "Sort by name"), + ("p", "sort_by_progress", "Sort by progress"), + ("a", "show_all", "All/Unfinished"), + ("r", "refresh", "Refresh"), + ("enter", "play_selected", "Play"), + ("space", "toggle_playback", "Pause/Resume"), + ("left", "seek_backward", "-30s"), + ("right", "seek_forward", "+30s"), + ("ctrl+left", "previous_chapter", "Previous chapter"), + ("ctrl+right", "next_chapter", "Next chapter"), + ("up", "increase_speed", "Increase speed"), + ("down", "decrease_speed", "Decrease speed"), + ("f", "toggle_finished", "Mark finished"), + ("d", "toggle_download", "Download/Delete"), + ("q", "quit", "Quit"), +] diff --git a/auditui/app/layout.py b/auditui/app/layout.py new file mode 100644 index 0000000..4348a0c --- /dev/null +++ b/auditui/app/layout.py @@ -0,0 +1,108 @@ +"""Main layout: compose, mount, resize, status bar, table column widths.""" + +from __future__ import annotations + +from textual.app import ComposeResult +from textual.containers import Horizontal +from textual.events import Resize +from textual.widgets import DataTable, ProgressBar, Static + +from .. import __version__ +from ..constants import TABLE_COLUMN_DEFS, TABLE_CSS + + +class AppLayoutMixin: + def compose(self) -> ComposeResult: + yield Horizontal( + Static("? Help", id="top_left"), + Static(f"Auditui v{__version__}", id="top_center"), + Static("q Quit", id="top_right"), + id="top_bar", + ) + yield Static("Loading...", id="status") + table = DataTable() + table.zebra_stripes = True + table.cursor_type = "row" + yield table + yield Static("", id="progress_info") + with Horizontal(id="progress_bar_container"): + yield ProgressBar( + id="progress_bar", show_eta=False, show_percentage=False, total=100 + ) + + def on_mount(self) -> None: + self.theme = "textual-dark" + table = self.query_one(DataTable) + for column_name, _ratio in TABLE_COLUMN_DEFS: + table.add_column(column_name) + self.call_after_refresh(lambda: self._apply_column_widths(table)) + column_keys = list(table.columns.keys()) + self.title_column_key = column_keys[0] + + if self.client: + self.update_status("Fetching library...") + self.fetch_library() + else: + self.update_status( + "Not authenticated. Please restart and authenticate.") + + self.set_interval(1.0, self._check_playback_status) + self.set_interval(0.5, self._update_progress) + self.set_interval(30.0, self._save_position_periodically) + + def on_unmount(self) -> None: + self.playback.stop() + if self.download_manager: + self.download_manager.close() + + def on_resize(self, event: Resize) -> None: + del event + try: + table = self.query_one(DataTable) + except Exception: + return + self.call_after_refresh(lambda: self._apply_column_widths(table)) + + def update_status(self, message: str) -> None: + status = self.query_one("#status", Static) + status.display = True + status.update(message) + + def _apply_column_widths(self, table: DataTable) -> None: + if not table.columns: + return + + column_keys = list(table.columns.keys()) + num_cols = len(column_keys) + ratios = [ratio for _, ratio in TABLE_COLUMN_DEFS] + total_ratio = sum(ratios) or num_cols + + content_width = table.scrollable_content_region.width + if content_width <= 0: + content_width = table.size.width + if content_width <= 0: + return + + padding_total = 2 * table.cell_padding * num_cols + distributable = max(num_cols, content_width - padding_total) + + widths = [] + for ratio in ratios: + w = max(1, (distributable * ratio) // total_ratio) + widths.append(w) + + remainder = distributable - sum(widths) + if remainder > 0: + indices = sorted( + range(num_cols), key=lambda i: ratios[i], reverse=True) + for i in range(remainder): + widths[indices[i % num_cols]] += 1 + + for column_key, w in zip(column_keys, widths): + col = table.columns[column_key] + col.auto_width = False + col.width = w + table.refresh() + + def _thread_status_update(self, message: str) -> None: + self.call_from_thread(self.update_status, message) diff --git a/auditui/app/library.py b/auditui/app/library.py new file mode 100644 index 0000000..bd2075b --- /dev/null +++ b/auditui/app/library.py @@ -0,0 +1,36 @@ +"""Library fetch and load/error handlers.""" + +from __future__ import annotations + +from textual import work +from textual.worker import get_current_worker + +from ..types import LibraryItem + + +class AppLibraryMixin: + @work(exclusive=True, thread=True) + def fetch_library(self) -> None: + worker = get_current_worker() + if worker.is_cancelled or not self.library_client: + return + + try: + all_items = self.library_client.fetch_all_items( + self._thread_status_update) + self.call_from_thread(self.on_library_loaded, all_items) + except (OSError, ValueError, KeyError) as exc: + self.call_from_thread(self.on_library_error, str(exc)) + + def on_library_loaded(self, items: list[LibraryItem]) -> None: + self.all_items = items + self._search_text_cache.clear() + self._prime_search_cache(items) + self.update_status(f"Loaded {len(items)} books") + if self.show_all_mode: + self.show_all() + else: + self.show_unfinished() + + def on_library_error(self, error: str) -> None: + self.update_status(f"Error fetching library: {error}") diff --git a/auditui/app/progress.py b/auditui/app/progress.py new file mode 100644 index 0000000..659bb2c --- /dev/null +++ b/auditui/app/progress.py @@ -0,0 +1,94 @@ +"""Playback key handling, progress bar updates, and position save.""" + +from __future__ import annotations + +from textual.containers import Horizontal +from textual.events import Key +from textual.widgets import DataTable, ProgressBar, Static + +from ..library import LibraryClient + + +class AppProgressMixin: + def on_key(self, event: Key) -> None: + if self.playback.is_playing: + if event.key == "ctrl+left": + event.prevent_default() + self.action_previous_chapter() + return + elif event.key == "ctrl+right": + event.prevent_default() + self.action_next_chapter() + return + elif event.key == "left": + event.prevent_default() + self.action_seek_backward() + return + elif event.key == "right": + event.prevent_default() + self.action_seek_forward() + return + elif event.key == "up": + event.prevent_default() + self.action_increase_speed() + return + elif event.key == "down": + event.prevent_default() + self.action_decrease_speed() + return + + if isinstance(self.focused, DataTable): + if event.key == "enter": + event.prevent_default() + self.action_play_selected() + elif event.key == "space": + event.prevent_default() + self.action_toggle_playback() + + def _check_playback_status(self) -> None: + message = self.playback.check_status() + if message: + self.update_status(message) + self._hide_progress() + + def _update_progress(self) -> None: + if not self.playback.is_playing: + self._hide_progress() + return + + progress_data = self.playback.get_current_progress() + if not progress_data: + self._hide_progress() + return + + chapter_name, chapter_elapsed, chapter_total = progress_data + if chapter_total <= 0: + self._hide_progress() + return + + progress_info = self.query_one("#progress_info", Static) + progress_bar = self.query_one("#progress_bar", ProgressBar) + progress_bar_container = self.query_one( + "#progress_bar_container", Horizontal) + + progress_percent = min( + 100.0, max(0.0, (chapter_elapsed / chapter_total) * 100.0) + ) + progress_bar.update(progress=progress_percent) + chapter_elapsed_str = LibraryClient.format_time(chapter_elapsed) + chapter_total_str = LibraryClient.format_time(chapter_total) + progress_info.update( + f"{chapter_name} | {chapter_elapsed_str} / {chapter_total_str}" + ) + progress_info.display = True + progress_bar_container.display = True + + def _hide_progress(self) -> None: + progress_info = self.query_one("#progress_info", Static) + progress_bar_container = self.query_one( + "#progress_bar_container", Horizontal) + progress_info.display = False + progress_bar_container.display = False + + def _save_position_periodically(self) -> None: + self.playback.update_position_if_needed() diff --git a/auditui/app/state.py b/auditui/app/state.py new file mode 100644 index 0000000..90b048f --- /dev/null +++ b/auditui/app/state.py @@ -0,0 +1,31 @@ +"""App state initialization.""" + +from __future__ import annotations + +from ..constants import PROGRESS_COLUMN_INDEX +from ..downloads import DownloadManager +from ..library import LibraryClient +from ..playback import PlaybackController + + +def init_auditui_state(self: object, auth=None, client=None) -> None: + setattr(self, "auth", auth) + setattr(self, "client", client) + setattr(self, "library_client", LibraryClient(client) if client else None) + setattr( + self, + "download_manager", + DownloadManager(auth, client) if auth and client else None, + ) + notify = getattr(self, "update_status") + lib_client = LibraryClient(client) if client else None + setattr(self, "playback", PlaybackController(notify, lib_client)) + setattr(self, "all_items", []) + setattr(self, "current_items", []) + setattr(self, "_search_text_cache", {}) + setattr(self, "show_all_mode", False) + setattr(self, "filter_text", "") + setattr(self, "title_sort_reverse", False) + setattr(self, "progress_sort_reverse", False) + setattr(self, "title_column_key", None) + setattr(self, "progress_column_index", PROGRESS_COLUMN_INDEX) diff --git a/auditui/app/table.py b/auditui/app/table.py new file mode 100644 index 0000000..2faeb92 --- /dev/null +++ b/auditui/app/table.py @@ -0,0 +1,106 @@ +"""Table population, sorting, filter view, and search cache.""" + +from __future__ import annotations + +from ..library import ( + filter_items, + filter_unfinished_items, + format_item_as_row, + create_progress_sort_key, + create_title_sort_key, +) +from ..types import LibraryItem +from textual.widgets import DataTable, Static + + +class AppTableMixin: + def _populate_table(self, items: list[LibraryItem]) -> None: + table = self.query_one(DataTable) + table.clear() + + if not items or not self.library_client: + self.update_status("No books found.") + return + + for item in items: + title, author, runtime, progress, downloaded = format_item_as_row( + item, self.library_client, self.download_manager + ) + table.add_row(title, author, runtime, + progress, downloaded, key=title) + + self.current_items = items + status = self.query_one("#status", Static) + status.display = False + self._apply_column_widths(table) + + def _refresh_table(self) -> None: + if self.current_items: + self._populate_table(self.current_items) + + def show_all(self) -> None: + if not self.all_items: + return + self.show_all_mode = True + self._refresh_filtered_view() + + def show_unfinished(self) -> None: + if not self.all_items or not self.library_client: + return + self.show_all_mode = False + self._refresh_filtered_view() + + def action_sort(self) -> None: + table = self.query_one(DataTable) + if table.row_count > 0 and self.title_column_key: + title_key, reverse = create_title_sort_key(self.title_sort_reverse) + table.sort(key=title_key, reverse=reverse) + self.title_sort_reverse = not self.title_sort_reverse + + def action_sort_by_progress(self) -> None: + table = self.query_one(DataTable) + if table.row_count > 0: + self.progress_sort_reverse = not self.progress_sort_reverse + progress_key, reverse = create_progress_sort_key( + self.progress_column_index, self.progress_sort_reverse + ) + table.sort(key=progress_key, reverse=reverse) + + def action_show_all(self) -> None: + if self.show_all_mode: + self.show_unfinished() + else: + self.show_all() + + def _refresh_filtered_view(self) -> None: + if not self.all_items: + return + + items = self.all_items + + if self.filter_text: + items = filter_items(items, self.filter_text, + self._get_search_text) + self._populate_table(items) + self.update_status( + f"Filter: '{self.filter_text}' ({len(items)} books)") + return + + if not self.show_all_mode and self.library_client: + items = filter_unfinished_items(items, self.library_client) + + self._populate_table(items) + + def _get_search_text(self, item: LibraryItem) -> str: + cache_key = id(item) + cached = self._search_text_cache.get(cache_key) + if cached is not None: + return cached + from ..library import build_search_text + search_text = build_search_text(item, self.library_client) + self._search_text_cache[cache_key] = search_text + return search_text + + def _prime_search_cache(self, items: list[LibraryItem]) -> None: + for item in items: + self._get_search_text(item) diff --git a/auditui/auth.py b/auditui/auth/__init__.py similarity index 61% rename from auditui/auth.py rename to auditui/auth/__init__.py index f1ca3a9..fbe2b69 100644 --- a/auditui/auth.py +++ b/auditui/auth/__init__.py @@ -1,19 +1,20 @@ -"""Authentication helpers for the Auditui app.""" +"""Load saved Audible credentials and build authenticator and API client.""" from pathlib import Path import audible -from .constants import AUTH_PATH +from ..constants import AUTH_PATH def authenticate( auth_path: Path = AUTH_PATH, ) -> tuple[audible.Authenticator, audible.Client]: - """Authenticate with Audible and return authenticator and client.""" + """Load auth from file and return (Authenticator, Client). Raises if file missing or invalid.""" if not auth_path.exists(): raise FileNotFoundError( - "Authentication file not found. Please run 'auditui configure' to set up authentication.") + "Authentication file not found. Please run 'auditui configure' to set up authentication." + ) try: authenticator = audible.Authenticator.from_file(str(auth_path)) @@ -21,4 +22,5 @@ def authenticate( return authenticator, audible_client except (OSError, ValueError, KeyError) as exc: raise ValueError( - f"Failed to load existing authentication: {exc}") from exc + f"Failed to load existing authentication: {exc}" + ) from exc diff --git a/auditui/cli/__init__.py b/auditui/cli/__init__.py new file mode 100644 index 0000000..4f80a9c --- /dev/null +++ b/auditui/cli/__init__.py @@ -0,0 +1,5 @@ +"""CLI package; entry point is main() from .main.""" + +from .main import main + +__all__ = ["main"] diff --git a/auditui/cli.py b/auditui/cli/main.py similarity index 91% rename from auditui/cli.py rename to auditui/cli/main.py index 02bddd8..2ce2257 100644 --- a/auditui/cli.py +++ b/auditui/cli/main.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 -"""Auditui entrypoint.""" +"""CLI entrypoint: configure subcommand or authenticate and run the TUI.""" import argparse import sys @@ -12,7 +11,6 @@ from auditui.constants import AUTH_PATH def main() -> None: - """Authenticate and launch the app.""" parser = argparse.ArgumentParser(prog="auditui") parser.add_argument( "-v", @@ -52,7 +50,3 @@ def main() -> None: app = Auditui(auth=auth, client=client) app.run() - - -if __name__ == "__main__": - main() diff --git a/auditui/configure.py b/auditui/configure/__init__.py similarity index 78% rename from auditui/configure.py rename to auditui/configure/__init__.py index 74fcf4f..f8b15d2 100644 --- a/auditui/configure.py +++ b/auditui/configure/__init__.py @@ -1,4 +1,4 @@ -"""Configuration helpers for the Auditui app.""" +"""Interactive setup of Audible credentials; writes auth and config files.""" import json from getpass import getpass @@ -6,13 +6,13 @@ from pathlib import Path import audible -from .constants import AUTH_PATH, CONFIG_PATH +from ..constants import AUTH_PATH, CONFIG_PATH def configure( auth_path: Path = AUTH_PATH, ) -> tuple[audible.Authenticator, audible.Client]: - """Force re-authentication and save credentials.""" + """Prompt for email/password/locale, authenticate, and save auth.json and config.json.""" if auth_path.exists(): response = input( "Configuration already exists. Are you sure you want to overwrite it? (y/N): " @@ -26,7 +26,8 @@ def configure( email = input("\nEmail: ") password = getpass("Password: ") marketplace = input( - "Marketplace locale (default: US): ").strip().upper() or "US" + "Marketplace locale (default: US): " + ).strip().upper() or "US" authenticator = audible.Authenticator.from_login( username=email, password=password, locale=marketplace diff --git a/auditui/constants.py b/auditui/constants/__init__.py similarity index 98% rename from auditui/constants.py rename to auditui/constants/__init__.py index c3a461d..8471512 100644 --- a/auditui/constants.py +++ b/auditui/constants/__init__.py @@ -1,4 +1,4 @@ -"""Shared constants for the Auditui application.""" +"""Paths, API/config values, and CSS used across the application.""" from pathlib import Path diff --git a/auditui/downloads/__init__.py b/auditui/downloads/__init__.py new file mode 100644 index 0000000..c0ca62b --- /dev/null +++ b/auditui/downloads/__init__.py @@ -0,0 +1,5 @@ +"""Download and cache of Audible AAX files.""" + +from .manager import DownloadManager + +__all__ = ["DownloadManager"] diff --git a/auditui/downloads.py b/auditui/downloads/manager.py similarity index 88% rename from auditui/downloads.py rename to auditui/downloads/manager.py index 20d7c50..6cc3bca 100644 --- a/auditui/downloads.py +++ b/auditui/downloads/manager.py @@ -1,27 +1,25 @@ -"""Download helpers for Audible content.""" +"""Obtains AAX files from Audible (cache or download) and provides activation bytes.""" import re from pathlib import Path -from typing import Callable from urllib.parse import urlparse import audible import httpx from audible.activation_bytes import get_activation_bytes -from .constants import ( +from ..constants import ( CACHE_DIR, DEFAULT_CHUNK_SIZE, DEFAULT_CODEC, DOWNLOAD_URL, MIN_FILE_SIZE, ) - -StatusCallback = Callable[[str], None] +from ..types import StatusCallback class DownloadManager: - """Handle retrieval and download of Audible titles.""" + """Obtains AAX files from Audible (cache or download) and provides activation bytes.""" def __init__( self, @@ -35,16 +33,18 @@ class DownloadManager: self.cache_dir = cache_dir self.cache_dir.mkdir(parents=True, exist_ok=True) self.chunk_size = chunk_size - self._http_client = httpx.Client(auth=auth, timeout=30.0, follow_redirects=True) + self._http_client = httpx.Client( + auth=auth, timeout=30.0, follow_redirects=True) self._download_client = httpx.Client( - timeout=httpx.Timeout(connect=30.0, read=None, write=30.0, pool=30.0), + timeout=httpx.Timeout(connect=30.0, read=None, + write=30.0, pool=30.0), follow_redirects=True, ) def get_or_download( self, asin: str, notify: StatusCallback | None = None ) -> Path | None: - """Get local path of AAX file, downloading if missing.""" + """Return local path to AAX file; download and cache if not present.""" title = self._get_name_from_asin(asin) or asin safe_title = self._sanitize_filename(title) local_path = self.cache_dir / f"{safe_title}.aax" @@ -81,7 +81,7 @@ class DownloadManager: return local_path def get_activation_bytes(self) -> str | None: - """Get activation bytes as hex string.""" + """Return activation bytes as hex string for ffplay/ffmpeg.""" try: activation_bytes = get_activation_bytes(self.auth) if isinstance(activation_bytes, bytes): @@ -91,7 +91,7 @@ class DownloadManager: return None def get_cached_path(self, asin: str) -> Path | None: - """Get the cached file path for a book if it exists.""" + """Return path to cached AAX file if it exists and is valid size.""" title = self._get_name_from_asin(asin) or asin safe_title = self._sanitize_filename(title) local_path = self.cache_dir / f"{safe_title}.aax" @@ -100,11 +100,11 @@ class DownloadManager: return None def is_cached(self, asin: str) -> bool: - """Check if a book is already cached.""" + """Return True if the title is present in cache with valid size.""" return self.get_cached_path(asin) is not None def remove_cached(self, asin: str, notify: StatusCallback | None = None) -> bool: - """Remove a cached book file.""" + """Delete the cached AAX file for the given ASIN. Returns True on success.""" cached_path = self.get_cached_path(asin) if not cached_path: if notify: @@ -151,7 +151,7 @@ class DownloadManager: codec: str = DEFAULT_CODEC, notify: StatusCallback | None = None, ) -> str | None: - """Get download link for book.""" + """Obtain CDN download URL for the given ASIN and codec.""" if self.auth.adp_token is None: if notify: notify("Missing ADP token (not authenticated?)") @@ -189,7 +189,7 @@ class DownloadManager: def _download_file( self, url: str, dest_path: Path, notify: StatusCallback | None = None ) -> Path | None: - """Download file from URL to destination.""" + """Stream download from URL to dest_path; reports progress via notify.""" try: with self._download_client.stream("GET", url) as response: response.raise_for_status() @@ -240,7 +240,7 @@ class DownloadManager: return None def close(self) -> None: - """Close the HTTP clients and release resources.""" + """Close internal HTTP clients. Safe to call multiple times.""" if hasattr(self, "_http_client"): self._http_client.close() if hasattr(self, "_download_client"): diff --git a/auditui/library/__init__.py b/auditui/library/__init__.py new file mode 100644 index 0000000..57d90d3 --- /dev/null +++ b/auditui/library/__init__.py @@ -0,0 +1,22 @@ +"""Fetching, formatting, and filtering of the user's Audible library.""" + +from .client import LibraryClient +from .search import build_search_text, filter_items +from .table import ( + create_progress_sort_key, + create_title_sort_key, + filter_unfinished_items, + format_item_as_row, + truncate_author_name, +) + +__all__ = [ + "LibraryClient", + "build_search_text", + "filter_items", + "create_progress_sort_key", + "create_title_sort_key", + "filter_unfinished_items", + "format_item_as_row", + "truncate_author_name", +] diff --git a/auditui/library.py b/auditui/library/client.py similarity index 82% rename from auditui/library.py rename to auditui/library/client.py index b3a00fd..58ae618 100644 --- a/auditui/library.py +++ b/auditui/library/client.py @@ -1,21 +1,19 @@ -"""Library helpers for fetching and formatting Audible data.""" +"""Client for the Audible library API.""" from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Callable import audible - -ProgressCallback = Callable[[str], None] +from ..types import LibraryItem, StatusCallback class LibraryClient: - """Helper for interacting with the Audible library.""" + """Client for the Audible library API. Fetches items, extracts metadata, and updates positions.""" def __init__(self, client: audible.Client) -> None: self.client = client - def fetch_all_items(self, on_progress: ProgressCallback | None = None) -> list: + def fetch_all_items(self, on_progress: StatusCallback | None = None) -> list[LibraryItem]: """Fetch all library items from the API.""" response_groups = ( "contributors,media,product_attrs,product_desc,product_details," @@ -25,8 +23,8 @@ class LibraryClient: def _fetch_page( self, page: int, page_size: int, response_groups: str - ) -> tuple[int, list[dict]]: - """Fetch a single page of library items.""" + ) -> tuple[int, list[LibraryItem]]: + """Fetch a single page of library items from the API.""" library = self.client.get( path="library", num_results=page_size, @@ -37,9 +35,9 @@ class LibraryClient: return page, list(items) def _fetch_all_pages( - self, response_groups: str, on_progress: ProgressCallback | None = None - ) -> list: - """Fetch all pages of library items from the API using maximum parallel fetching.""" + self, response_groups: str, on_progress: StatusCallback | None = None + ) -> list[LibraryItem]: + """Fetch all pages of library items using parallel requests.""" library_response = None page_size = 200 @@ -63,7 +61,7 @@ class LibraryClient: if not first_page_items: return [] - all_items: list[dict] = list(first_page_items) + all_items: list[LibraryItem] = list(first_page_items) if on_progress: on_progress(f"Fetched page 1 ({len(first_page_items)} items)...") @@ -80,7 +78,7 @@ class LibraryClient: estimated_pages = 500 max_workers = 50 - page_results: dict[int, list[dict]] = {} + page_results: dict[int, list[LibraryItem]] = {} with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_page: dict = {} @@ -119,8 +117,8 @@ class LibraryClient: return all_items - def extract_title(self, item: dict) -> str: - """Extract title from library item.""" + def extract_title(self, item: LibraryItem) -> str: + """Return the book title from a library item.""" product = item.get("product", {}) return ( product.get("title") @@ -128,8 +126,8 @@ class LibraryClient: or product.get("asin", "Unknown Title") ) - def extract_authors(self, item: dict) -> str: - """Extract author names from library item.""" + def extract_authors(self, item: LibraryItem) -> str: + """Return comma-separated author names from a library item.""" product = item.get("product", {}) authors = product.get("authors") or product.get("contributors") or [] if not authors and "authors" in item: @@ -139,8 +137,8 @@ class LibraryClient: for a in authors if isinstance(a, dict)] return ", ".join(author_names) or "Unknown" - def extract_runtime_minutes(self, item: dict) -> int | None: - """Extract runtime in minutes from library item.""" + def extract_runtime_minutes(self, item: LibraryItem) -> int | None: + """Return runtime in minutes if present.""" product = item.get("product", {}) runtime_fields = [ "runtime_length_min", @@ -165,8 +163,8 @@ class LibraryClient: return int(runtime) return None - def extract_progress_info(self, item: dict) -> float | None: - """Extract progress percentage from library item.""" + def extract_progress_info(self, item: LibraryItem) -> float | None: + """Return progress percentage (0–100) if present.""" percent_complete = item.get("percent_complete") listening_status = item.get("listening_status", {}) @@ -175,13 +173,13 @@ class LibraryClient: return float(percent_complete) if percent_complete is not None else None - def extract_asin(self, item: dict) -> str | None: - """Extract ASIN from library item.""" + def extract_asin(self, item: LibraryItem) -> str | None: + """Return the ASIN for a library item.""" product = item.get("product", {}) return item.get("asin") or product.get("asin") - def is_finished(self, item: dict) -> bool: - """Check if a library item is finished.""" + def is_finished(self, item: LibraryItem) -> bool: + """Return True if the item is marked or inferred as finished.""" is_finished_flag = item.get("is_finished") percent_complete = item.get("percent_complete") listening_status = item.get("listening_status") @@ -194,8 +192,8 @@ class LibraryClient: percent_complete = listening_status.get("percent_complete", 0) return bool(is_finished_flag) or ( - isinstance(percent_complete, (int, float) - ) and percent_complete >= 100 + isinstance(percent_complete, (int, float)) + and percent_complete >= 100 ) def get_last_position(self, asin: str) -> float | None: @@ -227,7 +225,7 @@ class LibraryClient: return None def _get_content_reference(self, asin: str) -> dict | None: - """Get content reference data including ACR and version.""" + """Fetch content reference (ACR and version) for position updates.""" try: response = self.client.get( path=f"1.0/content/{asin}/metadata", @@ -242,7 +240,7 @@ class LibraryClient: return None def _update_position(self, asin: str, position_seconds: float) -> bool: - """Update the playback position for a book.""" + """Persist playback position to the API. Returns True on success.""" if position_seconds < 0: return False @@ -273,7 +271,7 @@ class LibraryClient: return False def save_last_position(self, asin: str, position_seconds: float) -> bool: - """Save the last playback position for a book.""" + """Save playback position to Audible. Returns True on success.""" if position_seconds <= 0: return False return self._update_position(asin, position_seconds) @@ -282,7 +280,7 @@ class LibraryClient: def format_duration( value: int | None, unit: str = "minutes", default_none: str | None = None ) -> str | None: - """Format duration value into a compact string.""" + """Format a duration value as e.g. 2h30m or 45m.""" if value is None or value <= 0: return default_none @@ -296,8 +294,8 @@ class LibraryClient: return f"{hours}h{minutes:02d}" if minutes else f"{hours}h" return f"{minutes}m" - def mark_as_finished(self, asin: str, item: dict | None = None) -> bool: - """Mark a book as finished by setting position to the end.""" + def mark_as_finished(self, asin: str, item: LibraryItem | None = None) -> bool: + """Mark a book as finished on Audible. Optionally mutates item in place.""" total_ms = self._get_runtime_ms(asin, item) if not total_ms: return False @@ -321,8 +319,8 @@ class LibraryClient: except Exception: return False - def _get_runtime_ms(self, asin: str, item: dict | None = None) -> int | None: - """Get total runtime in milliseconds.""" + def _get_runtime_ms(self, asin: str, item: LibraryItem | None = None) -> int | None: + """Return total runtime in ms from item or API.""" if item: runtime_min = self.extract_runtime_minutes(item) if runtime_min: @@ -340,7 +338,7 @@ class LibraryClient: return None def _get_acr(self, asin: str) -> str | None: - """Get ACR token needed for position updates.""" + """Fetch ACR token required for position and finish updates.""" try: response = self.client.post( path=f"1.0/content/{asin}/licenserequest", @@ -356,7 +354,7 @@ class LibraryClient: @staticmethod def format_time(seconds: float) -> str: - """Format seconds as HH:MM:SS or MM:SS.""" + """Format seconds as HH:MM:SS or MM:SS for display.""" total_seconds = int(seconds) hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 diff --git a/auditui/search_utils.py b/auditui/library/search.py similarity index 57% rename from auditui/search_utils.py rename to auditui/library/search.py index 32e1b48..bdcf659 100644 --- a/auditui/search_utils.py +++ b/auditui/library/search.py @@ -1,14 +1,16 @@ -"""Search helpers for filtering library items.""" +"""Text search over library items for the filter feature.""" from __future__ import annotations from typing import Callable -from .library import LibraryClient +from ..types import LibraryItem + +from .client import LibraryClient -def build_search_text(item: dict, library_client: LibraryClient | None) -> str: - """Build a lowercase search string for an item.""" +def build_search_text(item: LibraryItem, library_client: LibraryClient | None) -> str: + """Build a single lowercase string from title and authors for matching.""" if library_client: title = library_client.extract_title(item) authors = library_client.extract_authors(item) @@ -23,11 +25,11 @@ def build_search_text(item: dict, library_client: LibraryClient | None) -> str: def filter_items( - items: list[dict], + items: list[LibraryItem], filter_text: str, - get_search_text: Callable[[dict], str], -) -> list[dict]: - """Filter items by a search string.""" + get_search_text: Callable[[LibraryItem], str], +) -> list[LibraryItem]: + """Return items whose search text contains filter_text (case-insensitive).""" if not filter_text: return items filter_lower = filter_text.lower() diff --git a/auditui/table_utils.py b/auditui/library/table.py similarity index 73% rename from auditui/table_utils.py rename to auditui/library/table.py index a4c4299..5f06af6 100644 --- a/auditui/table_utils.py +++ b/auditui/library/table.py @@ -1,20 +1,21 @@ -"""Utils for table operations.""" +"""Formatting and sorting of library items for the main table.""" import unicodedata from typing import TYPE_CHECKING, Callable -from .constants import ( +from ..constants import ( AUTHOR_NAME_DISPLAY_LENGTH, AUTHOR_NAME_MAX_LENGTH, PROGRESS_COLUMN_INDEX, ) +from ..types import LibraryItem if TYPE_CHECKING: - from .downloads import DownloadManager + from ..downloads import DownloadManager def create_title_sort_key(reverse: bool = False) -> tuple[Callable, bool]: - """Create a sort key function for sorting by title.""" + """Return a (key_fn, reverse) pair for DataTable sort by title column.""" def title_key(row_values): title_cell = row_values[0] if isinstance(title_cell, str): @@ -26,7 +27,7 @@ def create_title_sort_key(reverse: bool = False) -> tuple[Callable, bool]: def create_progress_sort_key(progress_column_index: int = PROGRESS_COLUMN_INDEX, reverse: bool = False) -> tuple[Callable, bool]: - """Create a sort key function for sorting by progress percentage.""" + """Return a (key_fn, reverse) pair for DataTable sort by progress column.""" def progress_key(row_values): progress_cell = row_values[progress_column_index] if isinstance(progress_cell, str): @@ -40,18 +41,14 @@ def create_progress_sort_key(progress_column_index: int = PROGRESS_COLUMN_INDEX, def truncate_author_name(author_names: str) -> str: - """Truncate author name if it exceeds maximum length.""" + """Truncate author string to display length with ellipsis if over max.""" if author_names and len(author_names) > AUTHOR_NAME_MAX_LENGTH: return f"{author_names[:AUTHOR_NAME_DISPLAY_LENGTH]}..." return author_names -def format_item_as_row(item: dict, library_client, download_manager: "DownloadManager | None" = None) -> tuple[str, str, str, str, str]: - """Format a library item into table row data. - - Returns: - Tuple of (title, author, runtime, progress, downloaded) strings - """ +def format_item_as_row(item: LibraryItem, library_client, download_manager: "DownloadManager | None" = None) -> tuple[str, str, str, str, str]: + """Turn a library item into (title, author, runtime, progress, downloaded) for the table.""" title = library_client.extract_title(item) author_names = library_client.extract_authors(item) @@ -79,8 +76,8 @@ def format_item_as_row(item: dict, library_client, download_manager: "DownloadMa return (title, author_display, runtime_str, progress_str, downloaded_str) -def filter_unfinished_items(items: list[dict], library_client) -> list[dict]: - """Filter out finished items from the list.""" +def filter_unfinished_items(items: list[LibraryItem], library_client) -> list[LibraryItem]: + """Return only items that are not marked as finished.""" return [ item for item in items if not library_client.is_finished(item) diff --git a/auditui/playback.py b/auditui/playback.py deleted file mode 100644 index 0ef69f5..0000000 --- a/auditui/playback.py +++ /dev/null @@ -1,513 +0,0 @@ -"""Playback control for Auditui.""" - -from __future__ import annotations - -import os -import shutil -import signal -import subprocess -import time -from pathlib import Path -from typing import Callable - -from .downloads import DownloadManager -from .library import LibraryClient -from .media_info import load_media_info - -StatusCallback = Callable[[str], None] - -MIN_SPEED = 0.5 -MAX_SPEED = 2.0 -SPEED_INCREMENT = 0.5 - - -class PlaybackController: - """Manage playback through ffplay.""" - - def __init__(self, notify: StatusCallback, library_client: LibraryClient | None = None) -> None: - self.notify = notify - self.library_client = library_client - self.playback_process: subprocess.Popen | None = None - self.is_playing = False - self.is_paused = False - self.current_file_path: Path | None = None - self.current_asin: str | None = None - self.playback_start_time: float | None = None - self.paused_duration: float = 0.0 - self.pause_start_time: float | None = None - self.total_duration: float | None = None - self.chapters: list[dict] = [] - self.seek_offset: float = 0.0 - self.activation_hex: str | None = None - self.last_save_time: float = 0.0 - self.position_save_interval: float = 30.0 - self.playback_speed: float = 1.0 - - def start( - self, - path: Path, - activation_hex: str | None = None, - status_callback: StatusCallback | None = None, - start_position: float = 0.0, - speed: float | None = None, - ) -> bool: - """Start playing a local file using ffplay.""" - notify = status_callback or self.notify - - if not shutil.which("ffplay"): - notify("ffplay not found. Please install ffmpeg") - return False - - if self.playback_process is not None: - self.stop() - - self.activation_hex = activation_hex - self.seek_offset = start_position - if speed is not None: - self.playback_speed = speed - - cmd = ["ffplay", "-nodisp", "-autoexit"] - if activation_hex: - cmd.extend(["-activation_bytes", activation_hex]) - if start_position > 0: - cmd.extend(["-ss", str(start_position)]) - if self.playback_speed != 1.0: - cmd.extend(["-af", f"atempo={self.playback_speed:.2f}"]) - cmd.append(str(path)) - - try: - self.playback_process = subprocess.Popen( - cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL - ) - - time.sleep(0.2) - if self.playback_process.poll() is not None: - return_code = self.playback_process.returncode - if return_code == 0 and start_position > 0 and self.total_duration: - if start_position >= self.total_duration - 5: - notify("Reached end of file") - self._reset_state() - return False - notify( - f"Playback process exited immediately (code: {return_code})") - self.playback_process = None - return False - - self.is_playing = True - self.is_paused = False - self.current_file_path = path - self.playback_start_time = time.time() - self.paused_duration = 0.0 - self.pause_start_time = None - duration, chapters = load_media_info(path, activation_hex) - self.total_duration = duration - self.chapters = chapters - notify(f"Playing: {path.name}") - return True - - except (OSError, ValueError, subprocess.SubprocessError) as exc: - notify(f"Error starting playback: {exc}") - return False - - def stop(self) -> None: - """Stop the current playback.""" - if self.playback_process is None: - return - - self._save_current_position() - - try: - if self.playback_process.poll() is None: - self.playback_process.terminate() - try: - self.playback_process.wait(timeout=2) - except subprocess.TimeoutExpired: - self.playback_process.kill() - self.playback_process.wait() - except (ProcessLookupError, ValueError): - pass - finally: - self._reset_state() - - def pause(self) -> None: - """Pause the current playback.""" - if not self._validate_playback_state(require_paused=False): - return - - self.pause_start_time = time.time() - self._send_signal(signal.SIGSTOP, "Paused", "pause") - - def resume(self) -> None: - """Resume the current playback.""" - if not self._validate_playback_state(require_paused=True): - return - - if self.pause_start_time is not None: - self.paused_duration += time.time() - self.pause_start_time - self.pause_start_time = None - self._send_signal(signal.SIGCONT, "Playing", "resume") - - def check_status(self) -> str | None: - """Check if playback process has finished and return status message.""" - if self.playback_process is None: - return None - - return_code = self.playback_process.poll() - if return_code is None: - return None - - finished_file = self.current_file_path - self._reset_state() - - if finished_file: - if return_code == 0: - return f"Finished: {finished_file.name}" - return f"Playback ended unexpectedly (code: {return_code}): {finished_file.name}" - return "Playback finished" - - def _reset_state(self) -> None: - """Reset all playback state.""" - self.playback_process = None - self.is_playing = False - self.is_paused = False - self.current_file_path = None - self.current_asin = None - self.playback_start_time = None - self.paused_duration = 0.0 - self.pause_start_time = None - self.total_duration = None - self.chapters = [] - self.seek_offset = 0.0 - self.activation_hex = None - self.last_save_time = 0.0 - self.playback_speed = 1.0 - - def _validate_playback_state(self, require_paused: bool) -> bool: - """Validate playback state before pause/resume operations.""" - if not (self.playback_process and self.is_playing): - return False - - if require_paused and not self.is_paused: - return False - if not require_paused and self.is_paused: - return False - - if not self.is_alive(): - self.stop() - self.notify("Playback process has ended") - return False - - return True - - def _send_signal(self, sig: signal.Signals, status_prefix: str, action: str) -> None: - """Send signal to playback process and update state.""" - if self.playback_process is None: - return - - try: - os.kill(self.playback_process.pid, sig) - self.is_paused = sig == signal.SIGSTOP - filename = self.current_file_path.name if self.current_file_path else None - message = f"{status_prefix}: {filename}" if filename else status_prefix - self.notify(message) - except ProcessLookupError: - self.stop() - self.notify("Process no longer exists") - except PermissionError: - self.notify(f"Permission denied: cannot {action} playback") - except (OSError, ValueError) as exc: - self.notify(f"Error {action}ing playback: {exc}") - - def is_alive(self) -> bool: - """Check if playback process is still running.""" - if self.playback_process is None: - return False - return self.playback_process.poll() is None - - def prepare_and_start( - self, - download_manager: DownloadManager, - asin: str, - status_callback: StatusCallback | None = None, - ) -> bool: - """Download file, get activation bytes, and start playback.""" - notify = status_callback or self.notify - - if not download_manager: - notify("Could not download file") - return False - - notify("Preparing playback...") - - local_path = download_manager.get_or_download(asin, notify) - if not local_path: - notify("Could not download file") - return False - - notify("Getting activation bytes...") - activation_hex = download_manager.get_activation_bytes() - if not activation_hex: - notify("Failed to get activation bytes") - return False - - start_position = 0.0 - if self.library_client: - try: - last_position = self.library_client.get_last_position(asin) - if last_position is not None and last_position > 0: - start_position = last_position - notify( - f"Resuming from {LibraryClient.format_time(start_position)}") - except (OSError, ValueError, KeyError): - pass - - notify(f"Starting playback of {local_path.name}...") - self.current_asin = asin - self.last_save_time = time.time() - return self.start(local_path, activation_hex, notify, start_position, self.playback_speed) - - def toggle_playback(self) -> bool: - """Toggle pause/resume state. Returns True if action was taken.""" - if not self.is_playing: - return False - - if not self.is_alive(): - self.stop() - self.notify("Playback has ended") - return False - - if self.is_paused: - self.resume() - else: - self.pause() - return True - - def _get_current_elapsed(self) -> float: - """Calculate current elapsed playback time.""" - if self.playback_start_time is None: - return 0.0 - - current_time = time.time() - if self.is_paused and self.pause_start_time is not None: - return (self.pause_start_time - self.playback_start_time) - self.paused_duration - - if self.pause_start_time is not None: - self.paused_duration += current_time - self.pause_start_time - self.pause_start_time = None - - return max(0.0, (current_time - self.playback_start_time) - self.paused_duration) - - def _stop_process(self) -> None: - """Stop the playback process without resetting state.""" - if not self.playback_process: - return - - try: - if self.playback_process.poll() is None: - self.playback_process.terminate() - try: - self.playback_process.wait(timeout=2) - except subprocess.TimeoutExpired: - self.playback_process.kill() - self.playback_process.wait() - except (ProcessLookupError, ValueError): - pass - - self.playback_process = None - self.is_playing = False - self.is_paused = False - self.playback_start_time = None - self.paused_duration = 0.0 - self.pause_start_time = None - - def _get_saved_state(self) -> dict: - """Get current playback state for saving.""" - return { - "file_path": self.current_file_path, - "asin": self.current_asin, - "activation": self.activation_hex, - "duration": self.total_duration, - "chapters": self.chapters.copy(), - "speed": self.playback_speed, - } - - def _restart_at_position( - self, new_position: float, new_speed: float | None = None, message: str | None = None - ) -> bool: - """Restart playback at a new position, optionally with new speed.""" - if not self.is_playing or not self.current_file_path: - return False - - was_paused = self.is_paused - saved_state = self._get_saved_state() - speed = new_speed if new_speed is not None else saved_state["speed"] - - self._stop_process() - time.sleep(0.2) - - if self.start(saved_state["file_path"], saved_state["activation"], self.notify, new_position, speed): - self.current_asin = saved_state["asin"] - self.total_duration = saved_state["duration"] - self.chapters = saved_state["chapters"] - if was_paused: - time.sleep(0.3) - self.pause() - if message: - self.notify(message) - return True - return False - - def _seek(self, seconds: float, direction: str) -> bool: - """Seek forward or backward by specified seconds.""" - elapsed = self._get_current_elapsed() - current_total_position = self.seek_offset + elapsed - - if direction == "forward": - new_position = current_total_position + seconds - if self.total_duration: - if new_position >= self.total_duration - 2: - self.notify("Already at end of file") - return False - new_position = min(new_position, self.total_duration - 2) - message = f"Skipped forward {int(seconds)}s" - else: - new_position = max(0.0, current_total_position - seconds) - message = f"Skipped backward {int(seconds)}s" - - return self._restart_at_position(new_position, message=message) - - def seek_forward(self, seconds: float = 30.0) -> bool: - """Seek forward by specified seconds. Returns True if action was taken.""" - return self._seek(seconds, "forward") - - def seek_backward(self, seconds: float = 30.0) -> bool: - """Seek backward by specified seconds. Returns True if action was taken.""" - return self._seek(seconds, "backward") - - def get_current_progress(self) -> tuple[str, float, float] | None: - """Get current playback progress.""" - if not self.is_playing or self.playback_start_time is None: - return None - - elapsed = self._get_current_elapsed() - total_elapsed = self.seek_offset + elapsed - chapter_name, chapter_elapsed, chapter_total = self._get_current_chapter( - total_elapsed) - return (chapter_name, chapter_elapsed, chapter_total) - - def _get_current_chapter(self, elapsed: float) -> tuple[str, float, float]: - """Get current chapter info.""" - if not self.chapters: - return ("Unknown Chapter", elapsed, self.total_duration or 0.0) - - for chapter in self.chapters: - if chapter["start_time"] <= elapsed < chapter["end_time"]: - chapter_elapsed = elapsed - chapter["start_time"] - chapter_total = chapter["end_time"] - chapter["start_time"] - return (chapter["title"], chapter_elapsed, chapter_total) - - last_chapter = self.chapters[-1] - chapter_elapsed = max(0.0, elapsed - last_chapter["start_time"]) - chapter_total = last_chapter["end_time"] - last_chapter["start_time"] - return (last_chapter["title"], chapter_elapsed, chapter_total) - - def _get_current_chapter_index(self, elapsed: float) -> int | None: - """Get the index of the current chapter based on elapsed time.""" - if not self.chapters: - return None - - for idx, chapter in enumerate(self.chapters): - if chapter["start_time"] <= elapsed < chapter["end_time"]: - return idx - - return len(self.chapters) - 1 - - def seek_to_chapter(self, direction: str) -> bool: - """Seek to next or previous chapter.""" - if not self.is_playing or not self.current_file_path: - return False - - if not self.chapters: - self.notify("No chapter information available") - return False - - elapsed = self._get_current_elapsed() - current_total_position = self.seek_offset + elapsed - current_chapter_idx = self._get_current_chapter_index( - current_total_position) - - if current_chapter_idx is None: - self.notify("Could not determine current chapter") - return False - - if direction == "next": - if current_chapter_idx >= len(self.chapters) - 1: - self.notify("Already at last chapter") - return False - target_chapter = self.chapters[current_chapter_idx + 1] - new_position = target_chapter["start_time"] - message = f"Next chapter: {target_chapter['title']}" - else: - if current_chapter_idx <= 0: - self.notify("Already at first chapter") - return False - target_chapter = self.chapters[current_chapter_idx - 1] - new_position = target_chapter["start_time"] - message = f"Previous chapter: {target_chapter['title']}" - - return self._restart_at_position(new_position, message=message) - - def seek_to_next_chapter(self) -> bool: - """Seek to the next chapter. Returns True if action was taken.""" - return self.seek_to_chapter("next") - - def seek_to_previous_chapter(self) -> bool: - """Seek to the previous chapter. Returns True if action was taken.""" - return self.seek_to_chapter("previous") - - def _save_current_position(self) -> None: - """Save the current playback position to Audible.""" - if not (self.library_client and self.current_asin and self.is_playing): - return - - if self.playback_start_time is None: - return - - current_position = self.seek_offset + self._get_current_elapsed() - if current_position <= 0: - return - - try: - self.library_client.save_last_position( - self.current_asin, current_position) - except (OSError, ValueError, KeyError): - pass - - def update_position_if_needed(self) -> None: - """Periodically save position if enough time has passed.""" - if not (self.is_playing and self.library_client and self.current_asin): - return - - current_time = time.time() - if current_time - self.last_save_time >= self.position_save_interval: - self._save_current_position() - self.last_save_time = current_time - - def _change_speed(self, delta: float) -> bool: - """Change playback speed by delta amount. Returns True if action was taken.""" - new_speed = max(MIN_SPEED, min(MAX_SPEED, self.playback_speed + delta)) - if new_speed == self.playback_speed: - return False - - elapsed = self._get_current_elapsed() - current_total_position = self.seek_offset + elapsed - - return self._restart_at_position(current_total_position, new_speed, f"Speed: {new_speed:.2f}x") - - def increase_speed(self) -> bool: - """Increase playback speed. Returns True if action was taken.""" - return self._change_speed(SPEED_INCREMENT) - - def decrease_speed(self) -> bool: - """Decrease playback speed. Returns True if action was taken.""" - return self._change_speed(-SPEED_INCREMENT) diff --git a/auditui/playback/__init__.py b/auditui/playback/__init__.py new file mode 100644 index 0000000..e997c39 --- /dev/null +++ b/auditui/playback/__init__.py @@ -0,0 +1,6 @@ +"""Playback control via ffplay and position sync with Audible.""" + +from .controller import PlaybackController +from .media_info import load_media_info + +__all__ = ["PlaybackController", "load_media_info"] diff --git a/auditui/playback/chapters.py b/auditui/playback/chapters.py new file mode 100644 index 0000000..2f79b01 --- /dev/null +++ b/auditui/playback/chapters.py @@ -0,0 +1,30 @@ +"""Chapter lookup by elapsed time.""" + + +def get_current_chapter( + elapsed: float, + chapters: list[dict], + total_duration: float | None, +) -> tuple[str, float, float]: + """Return (title, elapsed_in_chapter, chapter_duration) for the chapter at elapsed time.""" + if not chapters: + return ("Unknown Chapter", elapsed, total_duration or 0.0) + for chapter in chapters: + if chapter["start_time"] <= elapsed < chapter["end_time"]: + chapter_elapsed = elapsed - chapter["start_time"] + chapter_total = chapter["end_time"] - chapter["start_time"] + return (chapter["title"], chapter_elapsed, chapter_total) + last = chapters[-1] + chapter_elapsed = max(0.0, elapsed - last["start_time"]) + chapter_total = last["end_time"] - last["start_time"] + return (last["title"], chapter_elapsed, chapter_total) + + +def get_current_chapter_index(elapsed: float, chapters: list[dict]) -> int | None: + """Return the index of the chapter containing the given elapsed time.""" + if not chapters: + return None + for idx, chapter in enumerate(chapters): + if chapter["start_time"] <= elapsed < chapter["end_time"]: + return idx + return len(chapters) - 1 diff --git a/auditui/playback/constants.py b/auditui/playback/constants.py new file mode 100644 index 0000000..da63e35 --- /dev/null +++ b/auditui/playback/constants.py @@ -0,0 +1,5 @@ +"""Speed limits and increment for playback.""" + +MIN_SPEED = 0.5 +MAX_SPEED = 2.0 +SPEED_INCREMENT = 0.5 diff --git a/auditui/playback/controller.py b/auditui/playback/controller.py new file mode 100644 index 0000000..6ef91d6 --- /dev/null +++ b/auditui/playback/controller.py @@ -0,0 +1,14 @@ +"""Orchestrates ffplay process, position, chapters, seek, and speed; delegates to playback submodules.""" + +from __future__ import annotations + +from ..library import LibraryClient +from ..types import StatusCallback + +from .controller_seek_speed import ControllerSeekSpeedMixin +from .controller_lifecycle import ControllerLifecycleMixin +from .controller_state import ControllerStateMixin + + +class PlaybackController(ControllerSeekSpeedMixin, ControllerLifecycleMixin, ControllerStateMixin): + """Controls ffplay: start/stop, pause/resume, seek, speed, and saving position to Audible.""" diff --git a/auditui/playback/controller_lifecycle.py b/auditui/playback/controller_lifecycle.py new file mode 100644 index 0000000..23946ee --- /dev/null +++ b/auditui/playback/controller_lifecycle.py @@ -0,0 +1,183 @@ +"""Playback lifecycle: start, stop, pause, resume, prepare_and_start, restart at position.""" + +from __future__ import annotations + +import signal +import subprocess +import time +from pathlib import Path + +from ..downloads import DownloadManager +from ..library import LibraryClient +from ..types import StatusCallback + +from . import process as process_mod +from .media_info import load_media_info + +from .controller_state import ControllerStateMixin + + +class ControllerLifecycleMixin(ControllerStateMixin): + """Start/stop, pause/resume, and restart-at-position logic.""" + + def start( + self, + path: Path, + activation_hex: str | None = None, + status_callback: StatusCallback | None = None, + start_position: float = 0.0, + speed: float | None = None, + ) -> bool: + """Start ffplay for the given AAX path. Returns True if playback started.""" + notify = status_callback or self.notify + if not process_mod.is_ffplay_available(): + notify("ffplay not found. Please install ffmpeg") + return False + if self.playback_process is not None: + self.stop() + self.activation_hex = activation_hex + self.seek_offset = start_position + if speed is not None: + self.playback_speed = speed + cmd = process_mod.build_ffplay_cmd( + path, activation_hex, start_position, self.playback_speed + ) + try: + proc, return_code = process_mod.run_ffplay(cmd) + if proc is None: + if return_code == 0 and start_position > 0 and self.total_duration and start_position >= self.total_duration - 5: + notify("Reached end of file") + self._reset_state() + return False + notify( + f"Playback process exited immediately (code: {return_code})") + return False + self.playback_process = proc + self.is_playing = True + self.is_paused = False + self.current_file_path = path + self.playback_start_time = time.time() + self.paused_duration = 0.0 + self.pause_start_time = None + duration, chs = load_media_info(path, activation_hex) + self.total_duration = duration + self.chapters = chs + notify(f"Playing: {path.name}") + return True + except (OSError, ValueError, subprocess.SubprocessError) as exc: + notify(f"Error starting playback: {exc}") + return False + + def stop(self) -> None: + """Stop ffplay, save position to Audible, and reset state.""" + if self.playback_process is None: + return + self._save_current_position() + try: + process_mod.terminate_process(self.playback_process) + finally: + self._reset_state() + + def pause(self) -> None: + """Send SIGSTOP to ffplay and mark state as paused.""" + if not self._validate_playback_state(require_paused=False): + return + self.pause_start_time = time.time() + self._send_signal(signal.SIGSTOP, "Paused", "pause") + + def resume(self) -> None: + """Send SIGCONT to ffplay and clear paused state.""" + if not self._validate_playback_state(require_paused=True): + return + if self.pause_start_time is not None: + self.paused_duration += time.time() - self.pause_start_time + self.pause_start_time = None + self._send_signal(signal.SIGCONT, "Playing", "resume") + + def check_status(self) -> str | None: + """If the process has exited, return a status message and reset state; else None.""" + if self.playback_process is None: + return None + return_code = self.playback_process.poll() + if return_code is None: + return None + finished_file = self.current_file_path + self._reset_state() + if finished_file: + if return_code == 0: + return f"Finished: {finished_file.name}" + return f"Playback ended unexpectedly (code: {return_code}): {finished_file.name}" + return "Playback finished" + + def prepare_and_start( + self, + download_manager: DownloadManager, + asin: str, + status_callback: StatusCallback | None = None, + ) -> bool: + """Download AAX if needed, get activation bytes, then start playback. Returns True on success.""" + notify = status_callback or self.notify + if not download_manager: + notify("Could not download file") + return False + notify("Preparing playback...") + local_path = download_manager.get_or_download(asin, notify) + if not local_path: + notify("Could not download file") + return False + notify("Getting activation bytes...") + activation_hex = download_manager.get_activation_bytes() + if not activation_hex: + notify("Failed to get activation bytes") + return False + start_position = 0.0 + if self.library_client: + try: + last = self.library_client.get_last_position(asin) + if last is not None and last > 0: + start_position = last + notify( + f"Resuming from {LibraryClient.format_time(start_position)}") + except (OSError, ValueError, KeyError): + pass + notify(f"Starting playback of {local_path.name}...") + self.current_asin = asin + self.last_save_time = time.time() + return self.start(local_path, activation_hex, notify, start_position, self.playback_speed) + + def toggle_playback(self) -> bool: + """Toggle between pause and resume. Returns True if an action was performed.""" + if not self.is_playing: + return False + if not self.is_alive(): + self.stop() + self.notify("Playback has ended") + return False + if self.is_paused: + self.resume() + else: + self.pause() + return True + + def _restart_at_position( + self, new_position: float, new_speed: float | None = None, message: str | None = None + ) -> bool: + """Stop current process and start again at new_position; optionally set speed and notify.""" + if not self.is_playing or not self.current_file_path: + return False + was_paused = self.is_paused + saved = self._get_saved_state() + speed = new_speed if new_speed is not None else saved["speed"] + self._stop_process() + time.sleep(0.2) + if self.start(saved["file_path"], saved["activation"], self.notify, new_position, speed): + self.current_asin = saved["asin"] + self.total_duration = saved["duration"] + self.chapters = saved["chapters"] + if was_paused: + time.sleep(0.3) + self.pause() + if message: + self.notify(message) + return True + return False diff --git a/auditui/playback/controller_seek_speed.py b/auditui/playback/controller_seek_speed.py new file mode 100644 index 0000000..956e907 --- /dev/null +++ b/auditui/playback/controller_seek_speed.py @@ -0,0 +1,127 @@ +"""Seek, chapter, position save, and playback speed for the controller.""" + +from __future__ import annotations + +import time + +from . import chapters as chapters_mod +from . import seek as seek_mod +from .constants import MIN_SPEED, MAX_SPEED, SPEED_INCREMENT + +from .controller_lifecycle import ControllerLifecycleMixin + + +class ControllerSeekSpeedMixin(ControllerLifecycleMixin): + """Seek, chapter navigation, position persistence, and speed control.""" + + def _seek(self, seconds: float, direction: str) -> bool: + """Seek forward or backward by seconds via restart at new position. Returns True if done.""" + elapsed = self._get_current_elapsed() + current = self.seek_offset + elapsed + result = seek_mod.compute_seek_target( + current, self.total_duration, seconds, direction + ) + if result is None: + self.notify("Already at end of file") + return False + new_position, message = result + return self._restart_at_position(new_position, message=message) + + def seek_forward(self, seconds: float = 30.0) -> bool: + """Seek forward by the given seconds. Returns True if seek was performed.""" + return self._seek(seconds, "forward") + + def seek_backward(self, seconds: float = 30.0) -> bool: + """Seek backward by the given seconds. Returns True if seek was performed.""" + return self._seek(seconds, "backward") + + def get_current_progress(self) -> tuple[str, float, float] | None: + """Return (chapter_title, chapter_elapsed, chapter_total) for progress display, or None.""" + if not self.is_playing or self.playback_start_time is None: + return None + elapsed = self._get_current_elapsed() + total_elapsed = self.seek_offset + elapsed + return chapters_mod.get_current_chapter( + total_elapsed, self.chapters, self.total_duration + ) + + def seek_to_chapter(self, direction: str) -> bool: + """Seek to the next or previous chapter. direction is 'next' or 'previous'. Returns True if done.""" + if not self.is_playing or not self.current_file_path: + return False + if not self.chapters: + self.notify("No chapter information available") + return False + elapsed = self._get_current_elapsed() + current_total = self.seek_offset + elapsed + idx = chapters_mod.get_current_chapter_index( + current_total, self.chapters) + if idx is None: + self.notify("Could not determine current chapter") + return False + if direction == "next": + if idx >= len(self.chapters) - 1: + self.notify("Already at last chapter") + return False + target = self.chapters[idx + 1] + new_position = target["start_time"] + message = f"Next chapter: {target['title']}" + else: + if idx <= 0: + self.notify("Already at first chapter") + return False + target = self.chapters[idx - 1] + new_position = target["start_time"] + message = f"Previous chapter: {target['title']}" + return self._restart_at_position(new_position, message=message) + + def seek_to_next_chapter(self) -> bool: + """Seek to the next chapter. Returns True if seek was performed.""" + return self.seek_to_chapter("next") + + def seek_to_previous_chapter(self) -> bool: + """Seek to the previous chapter. Returns True if seek was performed.""" + return self.seek_to_chapter("previous") + + def _save_current_position(self) -> None: + """Persist current position to Audible via library_client.""" + if not (self.library_client and self.current_asin and self.is_playing): + return + if self.playback_start_time is None: + return + current_position = self.seek_offset + self._get_current_elapsed() + if current_position <= 0: + return + try: + self.library_client.save_last_position( + self.current_asin, current_position) + except (OSError, ValueError, KeyError): + pass + + def update_position_if_needed(self) -> None: + """Save position to Audible if the save interval has elapsed since last save.""" + if not (self.is_playing and self.library_client and self.current_asin): + return + current_time = time.time() + if current_time - self.last_save_time >= self.position_save_interval: + self._save_current_position() + self.last_save_time = current_time + + def _change_speed(self, delta: float) -> bool: + """Change speed by delta (clamped to MIN/MAX). Restarts playback. Returns True if changed.""" + new_speed = max(MIN_SPEED, min(MAX_SPEED, self.playback_speed + delta)) + if new_speed == self.playback_speed: + return False + elapsed = self._get_current_elapsed() + current_total = self.seek_offset + elapsed + return self._restart_at_position( + current_total, new_speed, f"Speed: {new_speed:.2f}x" + ) + + def increase_speed(self) -> bool: + """Increase playback speed. Returns True if speed was changed.""" + return self._change_speed(SPEED_INCREMENT) + + def decrease_speed(self) -> bool: + """Decrease playback speed. Returns True if speed was changed.""" + return self._change_speed(-SPEED_INCREMENT) diff --git a/auditui/playback/controller_state.py b/auditui/playback/controller_state.py new file mode 100644 index 0000000..86c2498 --- /dev/null +++ b/auditui/playback/controller_state.py @@ -0,0 +1,124 @@ +"""Playback state: init, reset, elapsed time, process validation and signals.""" + +from __future__ import annotations + +import signal +import time +from pathlib import Path + +from ..library import LibraryClient +from ..types import StatusCallback + +from . import elapsed as elapsed_mod +from . import process as process_mod + + +class ControllerStateMixin: + """State attributes and helpers for process/signal handling.""" + + def __init__(self, notify: StatusCallback, library_client: LibraryClient | None = None) -> None: + self.notify = notify + self.library_client = library_client + self.playback_process = None + self.is_playing = False + self.is_paused = False + self.current_file_path: Path | None = None + self.current_asin: str | None = None + self.playback_start_time: float | None = None + self.paused_duration: float = 0.0 + self.pause_start_time: float | None = None + self.total_duration: float | None = None + self.chapters: list[dict] = [] + self.seek_offset: float = 0.0 + self.activation_hex: str | None = None + self.last_save_time: float = 0.0 + self.position_save_interval: float = 30.0 + self.playback_speed: float = 1.0 + + def _reset_state(self) -> None: + """Clear playing/paused state and references to current file/asin.""" + self.playback_process = None + self.is_playing = False + self.is_paused = False + self.current_file_path = None + self.current_asin = None + self.playback_start_time = None + self.paused_duration = 0.0 + self.pause_start_time = None + self.total_duration = None + self.chapters = [] + self.seek_offset = 0.0 + self.activation_hex = None + self.last_save_time = 0.0 + self.playback_speed = 1.0 + + def _get_saved_state(self) -> dict: + """Return a snapshot of path, asin, activation, duration, chapters, speed for restart.""" + return { + "file_path": self.current_file_path, + "asin": self.current_asin, + "activation": self.activation_hex, + "duration": self.total_duration, + "chapters": self.chapters.copy(), + "speed": self.playback_speed, + } + + def _get_current_elapsed(self) -> float: + """Return elapsed seconds since start, accounting for pauses.""" + if self.pause_start_time is not None and not self.is_paused: + self.paused_duration += time.time() - self.pause_start_time + self.pause_start_time = None + return elapsed_mod.get_elapsed( + self.playback_start_time, + self.pause_start_time, + self.paused_duration, + self.is_paused, + ) + + def _stop_process(self) -> None: + """Terminate the process and clear playing state without saving position.""" + process_mod.terminate_process(self.playback_process) + self.playback_process = None + self.is_playing = False + self.is_paused = False + self.playback_start_time = None + self.paused_duration = 0.0 + self.pause_start_time = None + + def _validate_playback_state(self, require_paused: bool) -> bool: + """Return True if process is running and paused state matches require_paused.""" + if not (self.playback_process and self.is_playing): + return False + if require_paused and not self.is_paused: + return False + if not require_paused and self.is_paused: + return False + if not self.is_alive(): + self.stop() + self.notify("Playback process has ended") + return False + return True + + def _send_signal(self, sig: signal.Signals, status_prefix: str, action: str) -> None: + """Send sig to the process, update is_paused, and notify.""" + if self.playback_process is None: + return + try: + process_mod.send_signal(self.playback_process, sig) + self.is_paused = sig == signal.SIGSTOP + filename = self.current_file_path.name if self.current_file_path else None + msg = f"{status_prefix}: {filename}" if filename else status_prefix + self.notify(msg) + except ProcessLookupError: + self.stop() + self.notify("Process no longer exists") + except PermissionError: + self.notify(f"Permission denied: cannot {action} playback") + except (OSError, ValueError) as exc: + self.notify(f"Error {action}ing playback: {exc}") + + def is_alive(self) -> bool: + """Return True if the ffplay process is still running.""" + if self.playback_process is None: + return False + return self.playback_process.poll() is None diff --git a/auditui/playback/elapsed.py b/auditui/playback/elapsed.py new file mode 100644 index 0000000..9d7f0f1 --- /dev/null +++ b/auditui/playback/elapsed.py @@ -0,0 +1,23 @@ +"""Elapsed playback time accounting for pauses.""" + +import time + + +def get_elapsed( + playback_start_time: float | None, + pause_start_time: float | None, + paused_duration: float, + is_paused: bool, +) -> float: + """Return elapsed seconds since start, accounting for pauses.""" + if playback_start_time is None: + return 0.0 + current_time = time.time() + if is_paused and pause_start_time is not None: + return (pause_start_time - playback_start_time) - paused_duration + if pause_start_time is not None: + paused_duration += current_time - pause_start_time + return max( + 0.0, + (current_time - playback_start_time) - paused_duration, + ) diff --git a/auditui/media_info.py b/auditui/playback/media_info.py similarity index 88% rename from auditui/media_info.py rename to auditui/playback/media_info.py index edf33c1..f8fa9e4 100644 --- a/auditui/media_info.py +++ b/auditui/playback/media_info.py @@ -1,4 +1,4 @@ -"""Media information loading for Audible content.""" +"""Duration and chapter list for AAX files via ffprobe.""" import json import shutil @@ -7,7 +7,7 @@ from pathlib import Path def load_media_info(path: Path, activation_hex: str | None = None) -> tuple[float | None, list[dict]]: - """Load media information including duration and chapters using ffprobe.""" + """Return (total_duration_seconds, chapters) for the AAX file. Chapters have start_time, end_time, title.""" if not shutil.which("ffprobe"): return None, [] diff --git a/auditui/playback/process.py b/auditui/playback/process.py new file mode 100644 index 0000000..95c89d8 --- /dev/null +++ b/auditui/playback/process.py @@ -0,0 +1,68 @@ +"""FFplay process: build command, spawn, terminate, and send signals.""" + +import os +import shutil +import signal +import subprocess +import time +from pathlib import Path + + +def build_ffplay_cmd( + path: Path, + activation_hex: str | None, + start_position: float, + speed: float, +) -> list[str]: + """Build the ffplay command line for the given path and options.""" + cmd = ["ffplay", "-nodisp", "-autoexit"] + if activation_hex: + cmd.extend(["-activation_bytes", activation_hex]) + if start_position > 0: + cmd.extend(["-ss", str(start_position)]) + if speed != 1.0: + cmd.extend(["-af", f"atempo={speed:.2f}"]) + cmd.append(str(path)) + return cmd + + +def is_ffplay_available() -> bool: + """Return True if ffplay is on PATH.""" + return shutil.which("ffplay") is not None + + +def run_ffplay(cmd: list[str]) -> tuple[subprocess.Popen | None, int | None]: + """Spawn ffplay. Returns (proc, None) on success, (None, return_code) if process exited immediately, (None, None) if ffplay missing or spawn failed.""" + if not is_ffplay_available(): + return (None, None) + try: + proc = subprocess.Popen( + cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) + time.sleep(0.2) + if proc.poll() is not None: + return (None, proc.returncode) + return (proc, None) + except (OSError, ValueError, subprocess.SubprocessError): + return (None, None) + + +def terminate_process(proc: subprocess.Popen | None) -> None: + """Terminate the process; kill if it does not exit within timeout.""" + if proc is None: + return + try: + if proc.poll() is None: + proc.terminate() + try: + proc.wait(timeout=2) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + except (ProcessLookupError, ValueError): + pass + + +def send_signal(proc: subprocess.Popen, sig: signal.Signals) -> None: + """Send sig to the process. May raise ProcessLookupError, PermissionError, OSError.""" + os.kill(proc.pid, sig) diff --git a/auditui/playback/seek.py b/auditui/playback/seek.py new file mode 100644 index 0000000..f4c7f7f --- /dev/null +++ b/auditui/playback/seek.py @@ -0,0 +1,19 @@ +"""Seek target computation: new position and message from direction and seconds.""" + + +def compute_seek_target( + current_position: float, + total_duration: float | None, + seconds: float, + direction: str, +) -> tuple[float, str] | None: + """Return (new_position, message) for a seek, or None if seek is invalid (e.g. at end).""" + if direction == "forward": + new_position = current_position + seconds + if total_duration is not None: + if new_position >= total_duration - 2: + return None + new_position = min(new_position, total_duration - 2) + return (new_position, f"Skipped forward {int(seconds)}s") + new_position = max(0.0, current_position - seconds) + return (new_position, f"Skipped backward {int(seconds)}s") diff --git a/auditui/stats/__init__.py b/auditui/stats/__init__.py new file mode 100644 index 0000000..3bcd881 --- /dev/null +++ b/auditui/stats/__init__.py @@ -0,0 +1,5 @@ +"""Listening and account statistics for the stats screen.""" + +from .aggregator import StatsAggregator + +__all__ = ["StatsAggregator"] diff --git a/auditui/stats/account.py b/auditui/stats/account.py new file mode 100644 index 0000000..77079a6 --- /dev/null +++ b/auditui/stats/account.py @@ -0,0 +1,71 @@ +"""Account and subscription data from the API.""" + +from typing import Any + + +def get_account_info(client: Any) -> dict: + if not client: + return {} + account_info = {} + endpoints = [ + ( + "1.0/account/information", + "subscription_details,plan_summary,subscription_details_payment_instrument,delinquency_status,customer_benefits,customer_segments,directed_ids", + ), + ( + "1.0/customer/information", + "subscription_details_premium,subscription_details_rodizio,customer_segment,subscription_details_channels,migration_details", + ), + ( + "1.0/customer/status", + "benefits_status,member_giving_status,prime_benefits_status,prospect_benefits_status", + ), + ] + for endpoint, response_groups in endpoints: + try: + response = client.get(endpoint, response_groups=response_groups) + account_info.update(response) + except Exception: + pass + return account_info + + +def get_subscription_details(account_info: dict) -> dict: + paths = [ + ["customer_details", "subscription", "subscription_details"], + ["customer", "customer_details", "subscription", "subscription_details"], + ["subscription_details"], + ["subscription", "subscription_details"], + ] + for path in paths: + data = account_info + for key in path: + if isinstance(data, dict): + data = data.get(key) + else: + break + if isinstance(data, list) and data: + return data[0] + return {} + + +def get_country(auth: Any) -> str: + if not auth: + return "Unknown" + try: + locale_obj = getattr(auth, "locale", None) + if not locale_obj: + return "Unknown" + if hasattr(locale_obj, "country_code"): + return locale_obj.country_code.upper() + if hasattr(locale_obj, "domain"): + return locale_obj.domain.upper() + if isinstance(locale_obj, str): + return ( + locale_obj.split("_")[-1].upper() + if "_" in locale_obj + else locale_obj.upper() + ) + return str(locale_obj) + except Exception: + return "Unknown" diff --git a/auditui/stats/aggregator.py b/auditui/stats/aggregator.py new file mode 100644 index 0000000..508da84 --- /dev/null +++ b/auditui/stats/aggregator.py @@ -0,0 +1,85 @@ +"""Aggregates listening time, account info, and subscription data for display.""" + +from datetime import date +from typing import Any + +from ..types import LibraryItem + +from . import account as account_mod +from . import email as email_mod +from . import format as format_mod +from . import listening as listening_mod + + +class StatsAggregator: + """Builds a list of (label, value) stats from the API, auth, and library.""" + + def __init__( + self, + client: Any, + auth: Any, + library_client: Any, + all_items: list[LibraryItem], + ) -> None: + self.client = client + self.auth = auth + self.library_client = library_client + self.all_items = all_items + + def get_stats(self, today: date | None = None) -> list[tuple[str, str]]: + if not self.client: + return [] + today = today or date.today() + signup_year = listening_mod.get_signup_year(self.client) + month_time = listening_mod.get_listening_time( + self.client, 1, today.strftime("%Y-%m") + ) + year_time = listening_mod.get_listening_time( + self.client, 12, today.strftime("%Y-01") + ) + finished_count = listening_mod.get_finished_books_count( + self.library_client, self.all_items or [] + ) + total_books = len(self.all_items) if self.all_items else 0 + email = email_mod.resolve_email( + self.auth, + self.client, + get_account_info=lambda: account_mod.get_account_info(self.client), + ) + country = account_mod.get_country(self.auth) + subscription_name = "Unknown" + subscription_price = "Unknown" + next_bill_date = "Unknown" + account_info = account_mod.get_account_info(self.client) + if account_info: + subscription_data = account_mod.get_subscription_details( + account_info) + if subscription_data: + if name := subscription_data.get("name"): + subscription_name = name + if bill_date := subscription_data.get("next_bill_date"): + next_bill_date = format_mod.format_date(bill_date) + if bill_amount := subscription_data.get("next_bill_amount", {}): + amount = bill_amount.get("currency_value") + currency = bill_amount.get("currency_code", "EUR") + if amount is not None: + subscription_price = f"{amount} {currency}" + stats_items = [] + if email != "Unknown": + stats_items.append(("Email", email)) + stats_items.append(("Country Store", country)) + stats_items.append( + ("Signup Year", str(signup_year) if signup_year > 0 else "Unknown") + ) + if next_bill_date != "Unknown": + stats_items.append(("Next Credit", next_bill_date)) + stats_items.append(("Next Bill", next_bill_date)) + if subscription_name != "Unknown": + stats_items.append(("Subscription", subscription_name)) + if subscription_price != "Unknown": + stats_items.append(("Price", subscription_price)) + stats_items.append(("This Month", format_mod.format_time(month_time))) + stats_items.append(("This Year", format_mod.format_time(year_time))) + stats_items.append( + ("Books Finished", f"{finished_count} / {total_books}")) + return stats_items diff --git a/auditui/stats/email.py b/auditui/stats/email.py new file mode 100644 index 0000000..e57d4ae --- /dev/null +++ b/auditui/stats/email.py @@ -0,0 +1,155 @@ +"""Email resolution from auth, config, auth file, and account API.""" + +import json +from pathlib import Path +from typing import Any, Callable + +from ..constants import AUTH_PATH, CONFIG_PATH + + +def find_email_in_data(data: Any) -> str | None: + if data is None: + return None + stack = [data] + while stack: + current = stack.pop() + if isinstance(current, dict): + stack.extend(current.values()) + elif isinstance(current, list): + stack.extend(current) + elif isinstance(current, str): + if "@" in current: + local, _, domain = current.partition("@") + if local and "." in domain: + return current + return None + + +def first_email(*values: str | None) -> str | None: + for value in values: + if value and value != "Unknown": + return value + return None + + +def get_email_from_auth(auth: Any) -> str | None: + if not auth: + return None + try: + email = first_email( + getattr(auth, "username", None), + getattr(auth, "login", None), + getattr(auth, "email", None), + ) + if email: + return email + except Exception: + return None + try: + customer_info = getattr(auth, "customer_info", None) + if isinstance(customer_info, dict): + email = first_email( + customer_info.get("email"), + customer_info.get("email_address"), + customer_info.get("primary_email"), + ) + if email: + return email + except Exception: + return None + try: + data = getattr(auth, "data", None) + if isinstance(data, dict): + return first_email( + data.get("username"), + data.get("email"), + data.get("login"), + data.get("user_email"), + ) + except Exception: + return None + return None + + +def get_email_from_config(config_path: Path | None = None) -> str | None: + path = config_path or CONFIG_PATH + try: + if path.exists(): + with open(path, "r", encoding="utf-8") as f: + config = json.load(f) + return first_email( + config.get("email"), + config.get("username"), + config.get("login"), + ) + except Exception: + return None + return None + + +def get_email_from_auth_file(auth_path: Path | None = None) -> str | None: + path = auth_path or AUTH_PATH + try: + if path.exists(): + with open(path, "r", encoding="utf-8") as f: + auth_file_data = json.load(f) + return first_email( + auth_file_data.get("username"), + auth_file_data.get("email"), + auth_file_data.get("login"), + auth_file_data.get("user_email"), + ) + except Exception: + return None + return None + + +def get_email_from_account_info(account_info: dict) -> str | None: + email = first_email( + account_info.get("email"), + account_info.get("customer_email"), + account_info.get("username"), + ) + if email: + return email + customer_info = account_info.get("customer_info", {}) + if isinstance(customer_info, dict): + return first_email( + customer_info.get("email"), + customer_info.get("email_address"), + customer_info.get("primary_email"), + ) + return None + + +def resolve_email( + auth: Any, + client: Any, + config_path: Path | None = None, + auth_path: Path | None = None, + get_account_info: Callable[[], dict] | None = None, +) -> str: + config_path = config_path or CONFIG_PATH + auth_path = auth_path or AUTH_PATH + for getter in ( + lambda: get_email_from_auth(auth), + lambda: get_email_from_config(config_path), + lambda: get_email_from_auth_file(auth_path), + lambda: get_email_from_account_info( + get_account_info()) if get_account_info else None, + ): + email = getter() + if email: + return email + auth_data = None + if auth: + try: + auth_data = getattr(auth, "data", None) + except Exception: + pass + account_info = get_account_info() if get_account_info else {} + for candidate in (auth_data, account_info): + email = find_email_in_data(candidate) + if email: + return email + return "Unknown" diff --git a/auditui/stats/format.py b/auditui/stats/format.py new file mode 100644 index 0000000..9e20fc0 --- /dev/null +++ b/auditui/stats/format.py @@ -0,0 +1,22 @@ +"""Time and date formatting for stats display.""" + +from datetime import datetime + + +def format_time(milliseconds: int) -> str: + total_seconds = int(milliseconds) // 1000 + hours, remainder = divmod(total_seconds, 3600) + minutes, _ = divmod(remainder, 60) + if hours > 0: + return f"{hours}h{minutes:02d}" + return f"{minutes}m" + + +def format_date(date_str: str | None) -> str: + if not date_str: + return "Unknown" + try: + dt = datetime.fromisoformat(date_str.replace("Z", "+00:00")) + return dt.strftime("%Y-%m-%d") + except ValueError: + return date_str diff --git a/auditui/stats/listening.py b/auditui/stats/listening.py new file mode 100644 index 0000000..e4743fe --- /dev/null +++ b/auditui/stats/listening.py @@ -0,0 +1,75 @@ +"""Listening time and signup year from stats API; finished books count.""" + +from datetime import date +from typing import Any + +from ..types import LibraryItem + + +def has_activity(stats: dict) -> bool: + monthly_stats = stats.get("aggregated_monthly_listening_stats", []) + return bool( + monthly_stats and any(s.get("aggregated_sum", 0) + > 0 for s in monthly_stats) + ) + + +def get_listening_time(client: Any, duration: int, start_date: str) -> int: + if not client: + return 0 + try: + stats = client.get( + "1.0/stats/aggregates", + monthly_listening_interval_duration=str(duration), + monthly_listening_interval_start_date=start_date, + store="Audible", + ) + monthly_stats = stats.get("aggregated_monthly_listening_stats", []) + return sum(s.get("aggregated_sum", 0) for s in monthly_stats) + except Exception: + return 0 + + +def get_signup_year(client: Any) -> int: + if not client: + return 0 + current_year = date.today().year + try: + stats = client.get( + "1.0/stats/aggregates", + monthly_listening_interval_duration="12", + monthly_listening_interval_start_date=f"{current_year}-01", + store="Audible", + ) + if not has_activity(stats): + return 0 + except Exception: + return 0 + left, right = 1995, current_year + earliest_year = current_year + while left <= right: + middle = (left + right) // 2 + try: + stats = client.get( + "1.0/stats/aggregates", + monthly_listening_interval_duration="12", + monthly_listening_interval_start_date=f"{middle}-01", + store="Audible", + ) + has_activity_ = has_activity(stats) + except Exception: + has_activity_ = False + if has_activity_: + earliest_year = middle + right = middle - 1 + else: + left = middle + 1 + return earliest_year + + +def get_finished_books_count( + library_client: Any, all_items: list[LibraryItem] +) -> int: + if not library_client or not all_items: + return 0 + return sum(1 for item in all_items if library_client.is_finished(item)) diff --git a/auditui/types/__init__.py b/auditui/types/__init__.py new file mode 100644 index 0000000..5f628ee --- /dev/null +++ b/auditui/types/__init__.py @@ -0,0 +1,8 @@ +"""Shared type aliases for the Audible TUI.""" + +from __future__ import annotations + +from typing import Callable + +LibraryItem = dict +StatusCallback = Callable[[str], None] diff --git a/auditui/ui.py b/auditui/ui.py deleted file mode 100644 index 570d4e5..0000000 --- a/auditui/ui.py +++ /dev/null @@ -1,570 +0,0 @@ -"""UI components for the Auditui application.""" - -import json -from datetime import date, datetime -from typing import Any, Callable, Protocol, TYPE_CHECKING, cast - -from textual.app import ComposeResult -from textual.containers import Container, Vertical -from textual.screen import ModalScreen -from textual.timer import Timer -from textual.widgets import Input, Label, ListItem, ListView, Static - -from .constants import AUTH_PATH, CONFIG_PATH - -if TYPE_CHECKING: - from textual.binding import Binding - - -class _AppContext(Protocol): - BINDINGS: list[tuple[str, str, str]] - client: Any - auth: Any - library_client: Any - all_items: list[dict] - - -KEY_DISPLAY_MAP = { - "ctrl+": "^", - "left": "←", - "right": "→", - "up": "↑", - "down": "↓", - "space": "Space", - "enter": "Enter", -} - -KEY_COLOR = "#f9e2af" -DESC_COLOR = "#cdd6f4" - - -class AppContextMixin: - """Mixin to provide a typed app accessor.""" - - def _app(self) -> _AppContext: - return cast(_AppContext, cast(Any, self).app) - - -class HelpScreen(AppContextMixin, ModalScreen): - """Help screen displaying all available keybindings.""" - - BINDINGS = [("escape", "dismiss", "Close"), ("?", "dismiss", "Close")] - - @staticmethod - def _format_key_display(key: str) -> str: - """Format a key string for display with symbols.""" - result = key - for old, new in KEY_DISPLAY_MAP.items(): - result = result.replace(old, new) - return result - - @staticmethod - def _parse_binding(binding: "Binding | tuple[str, str, str]") -> tuple[str, str]: - """Extract key and description from a binding.""" - if isinstance(binding, tuple): - return binding[0], binding[2] - return binding.key, binding.description - - def _make_item(self, binding: "Binding | tuple[str, str, str]") -> ListItem: - """Create a ListItem for a single binding.""" - key, description = self._parse_binding(binding) - key_display = self._format_key_display(key) - text = f"[bold {KEY_COLOR}]{key_display:>16}[/] [{DESC_COLOR}]{description:<25}[/]" - return ListItem(Label(text)) - - def compose(self) -> ComposeResult: - app = self._app() - bindings = list(app.BINDINGS) - - with Container(id="help_container"): - yield Static("Keybindings", id="help_title") - with Vertical(id="help_content"): - yield ListView( - *[self._make_item(b) for b in bindings], - classes="help_list", - ) - yield Static( - f"Press [bold {KEY_COLOR}]?[/] or [bold {KEY_COLOR}]Escape[/] to close", - id="help_footer", - ) - - async def action_dismiss(self, result: Any | None = None) -> None: - await self.dismiss(result) - - -class StatsScreen(AppContextMixin, ModalScreen): - """Stats screen displaying listening statistics.""" - - BINDINGS = [("escape", "dismiss", "Close"), ("s", "dismiss", "Close")] - - def _format_time(self, milliseconds: int) -> str: - """Format milliseconds as hours and minutes.""" - total_seconds = int(milliseconds) // 1000 - hours, remainder = divmod(total_seconds, 3600) - minutes, _ = divmod(remainder, 60) - if hours > 0: - return f"{hours}h{minutes:02d}" - return f"{minutes}m" - - def _format_date(self, date_str: str | None) -> str: - """Format ISO date string for display.""" - if not date_str: - return "Unknown" - try: - dt = datetime.fromisoformat(date_str.replace("Z", "+00:00")) - return dt.strftime("%Y-%m-%d") - except ValueError: - return date_str - - def _get_signup_year(self) -> int: - """Get signup year using binary search on listening activity.""" - app = self._app() - if not app.client: - return 0 - - current_year = date.today().year - - try: - stats = app.client.get( - "1.0/stats/aggregates", - monthly_listening_interval_duration="12", - monthly_listening_interval_start_date=f"{current_year}-01", - store="Audible", - ) - if not self._has_activity(stats): - return 0 - except Exception: - return 0 - - left, right = 1995, current_year - earliest_year = current_year - - while left <= right: - middle = (left + right) // 2 - try: - stats = app.client.get( - "1.0/stats/aggregates", - monthly_listening_interval_duration="12", - monthly_listening_interval_start_date=f"{middle}-01", - store="Audible", - ) - has_activity = self._has_activity(stats) - except Exception: - has_activity = False - - if has_activity: - earliest_year = middle - right = middle - 1 - else: - left = middle + 1 - - return earliest_year - - @staticmethod - def _has_activity(stats: dict) -> bool: - """Check if stats contain any listening activity.""" - monthly_stats = stats.get("aggregated_monthly_listening_stats", []) - return bool( - monthly_stats and any(s.get("aggregated_sum", 0) > 0 for s in monthly_stats) - ) - - def _get_listening_time(self, duration: int, start_date: str) -> int: - """Get listening time in milliseconds for a given period.""" - app = self._app() - if not app.client: - return 0 - - try: - stats = app.client.get( - "1.0/stats/aggregates", - monthly_listening_interval_duration=str(duration), - monthly_listening_interval_start_date=start_date, - store="Audible", - ) - monthly_stats = stats.get("aggregated_monthly_listening_stats", []) - return sum(s.get("aggregated_sum", 0) for s in monthly_stats) - except Exception: - return 0 - - def _get_finished_books_count(self) -> int: - """Get count of finished books from library.""" - app = self._app() - if not app.library_client or not app.all_items: - return 0 - return sum(1 for item in app.all_items if app.library_client.is_finished(item)) - - def _get_account_info(self) -> dict: - """Get account information including subscription details.""" - app = self._app() - if not app.client: - return {} - - account_info = {} - endpoints = [ - ( - "1.0/account/information", - "subscription_details,plan_summary,subscription_details_payment_instrument,delinquency_status,customer_benefits,customer_segments,directed_ids", - ), - ( - "1.0/customer/information", - "subscription_details_premium,subscription_details_rodizio,customer_segment,subscription_details_channels,migration_details", - ), - ( - "1.0/customer/status", - "benefits_status,member_giving_status,prime_benefits_status,prospect_benefits_status", - ), - ] - - for endpoint, response_groups in endpoints: - try: - response = app.client.get(endpoint, response_groups=response_groups) - account_info.update(response) - except Exception: - pass - - return account_info - - def _get_email(self) -> str: - """Get email from auth, config, or API.""" - app = self._app() - for getter in ( - self._get_email_from_auth, - self._get_email_from_config, - self._get_email_from_auth_file, - self._get_email_from_account_info, - ): - email = getter(app) - if email: - return email - - auth_data: dict[str, Any] | None = None - if app.auth: - try: - auth_data = getattr(app.auth, "data", None) - except Exception: - auth_data = None - - account_info = self._get_account_info() if app.client else None - for candidate in (auth_data, account_info): - email = self._find_email_in_data(candidate) - if email: - return email - - return "Unknown" - - def _get_email_from_auth(self, app: _AppContext) -> str | None: - """Extract email from the authenticator if available.""" - if not app.auth: - return None - try: - email = self._first_email( - getattr(app.auth, "username", None), - getattr(app.auth, "login", None), - getattr(app.auth, "email", None), - ) - if email: - return email - except Exception: - return None - - try: - customer_info = getattr(app.auth, "customer_info", None) - if isinstance(customer_info, dict): - email = self._first_email( - customer_info.get("email"), - customer_info.get("email_address"), - customer_info.get("primary_email"), - ) - if email: - return email - except Exception: - return None - - try: - data = getattr(app.auth, "data", None) - if isinstance(data, dict): - return self._first_email( - data.get("username"), - data.get("email"), - data.get("login"), - data.get("user_email"), - ) - except Exception: - return None - return None - - def _get_email_from_config(self, app: _AppContext) -> str | None: - """Extract email from the config file.""" - try: - if CONFIG_PATH.exists(): - with open(CONFIG_PATH, "r", encoding="utf-8") as f: - config = json.load(f) - return self._first_email( - config.get("email"), - config.get("username"), - config.get("login"), - ) - except Exception: - return None - return None - - def _get_email_from_auth_file(self, app: _AppContext) -> str | None: - """Extract email from the auth file.""" - try: - if AUTH_PATH.exists(): - with open(AUTH_PATH, "r", encoding="utf-8") as f: - auth_file_data = json.load(f) - return self._first_email( - auth_file_data.get("username"), - auth_file_data.get("email"), - auth_file_data.get("login"), - auth_file_data.get("user_email"), - ) - except Exception: - return None - return None - - def _get_email_from_account_info(self, app: _AppContext) -> str | None: - """Extract email from the account info API.""" - if not app.client: - return None - try: - account_info = self._get_account_info() - if account_info: - email = self._first_email( - account_info.get("email"), - account_info.get("customer_email"), - account_info.get("username"), - ) - if email: - return email - customer_info = account_info.get("customer_info", {}) - if isinstance(customer_info, dict): - return self._first_email( - customer_info.get("email"), - customer_info.get("email_address"), - customer_info.get("primary_email"), - ) - except Exception: - return None - return None - - def _first_email(self, *values: str | None) -> str | None: - """Return the first non-empty, non-Unknown email value.""" - for value in values: - if value and value != "Unknown": - return value - return None - - def _find_email_in_data(self, data: Any) -> str | None: - """Search nested data for an email-like value.""" - if data is None: - return None - - stack: list[Any] = [data] - while stack: - current = stack.pop() - if isinstance(current, dict): - stack.extend(current.values()) - elif isinstance(current, list): - stack.extend(current) - elif isinstance(current, str): - if "@" in current: - local, _, domain = current.partition("@") - if local and "." in domain: - return current - return None - - def _get_subscription_details(self, account_info: dict) -> dict: - """Extract subscription details from nested API response.""" - paths = [ - ["customer_details", "subscription", "subscription_details"], - ["customer", "customer_details", "subscription", "subscription_details"], - ["subscription_details"], - ["subscription", "subscription_details"], - ] - for path in paths: - data: Any = account_info - for key in path: - if isinstance(data, dict): - data = data.get(key) - else: - break - if isinstance(data, list) and data: - return data[0] - return {} - - def _get_country(self) -> str: - """Get country from authenticator locale.""" - app = self._app() - if not app.auth: - return "Unknown" - - try: - locale_obj = getattr(app.auth, "locale", None) - if not locale_obj: - return "Unknown" - - if hasattr(locale_obj, "country_code"): - return locale_obj.country_code.upper() - if hasattr(locale_obj, "domain"): - return locale_obj.domain.upper() - if isinstance(locale_obj, str): - return ( - locale_obj.split("_")[-1].upper() - if "_" in locale_obj - else locale_obj.upper() - ) - return str(locale_obj) - except Exception: - return "Unknown" - - def _make_stat_item(self, label: str, value: str) -> ListItem: - """Create a ListItem for a stat.""" - text = f"[bold {KEY_COLOR}]{label:>16}[/] [{DESC_COLOR}]{value:<25}[/]" - return ListItem(Label(text)) - - def compose(self) -> ComposeResult: - app = self._app() - if not app.client: - with Container(id="help_container"): - yield Static("Statistics", id="help_title") - yield Static( - "Not authenticated. Please restart and authenticate.", - classes="help_row", - ) - yield Static( - f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close", - id="help_footer", - ) - return - - today = date.today() - stats_items = self._build_stats_items(today) - - with Container(id="help_container"): - yield Static("Statistics", id="help_title") - with Vertical(id="help_content"): - yield ListView( - *[ - self._make_stat_item(label, value) - for label, value in stats_items - ], - classes="help_list", - ) - yield Static( - f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close", - id="help_footer", - ) - - def _build_stats_items(self, today: date) -> list[tuple[str, str]]: - """Build the list of stats items to display.""" - signup_year = self._get_signup_year() - month_time = self._get_listening_time(1, today.strftime("%Y-%m")) - year_time = self._get_listening_time(12, today.strftime("%Y-01")) - finished_count = self._get_finished_books_count() - app = self._app() - total_books = len(app.all_items) if app.all_items else 0 - - email = self._get_email() - country = self._get_country() - - subscription_name = "Unknown" - subscription_price = "Unknown" - next_bill_date = "Unknown" - - account_info = self._get_account_info() - if account_info: - subscription_data = self._get_subscription_details(account_info) - if subscription_data: - if name := subscription_data.get("name"): - subscription_name = name - - if bill_date := subscription_data.get("next_bill_date"): - next_bill_date = self._format_date(bill_date) - - if bill_amount := subscription_data.get("next_bill_amount", {}): - amount = bill_amount.get("currency_value") - currency = bill_amount.get("currency_code", "EUR") - if amount is not None: - subscription_price = f"{amount} {currency}" - - stats_items = [] - if email != "Unknown": - stats_items.append(("Email", email)) - stats_items.append(("Country Store", country)) - stats_items.append( - ("Signup Year", str(signup_year) if signup_year > 0 else "Unknown") - ) - if next_bill_date != "Unknown": - stats_items.append(("Next Credit", next_bill_date)) - stats_items.append(("Next Bill", next_bill_date)) - if subscription_name != "Unknown": - stats_items.append(("Subscription", subscription_name)) - if subscription_price != "Unknown": - stats_items.append(("Price", subscription_price)) - stats_items.append(("This Month", self._format_time(month_time))) - stats_items.append(("This Year", self._format_time(year_time))) - stats_items.append(("Books Finished", f"{finished_count} / {total_books}")) - - return stats_items - - async def action_dismiss(self, result: Any | None = None) -> None: - await self.dismiss(result) - - -class FilterScreen(ModalScreen[str]): - """Filter screen for searching the library.""" - - BINDINGS = [("escape", "cancel", "Cancel")] - - def __init__( - self, - initial_filter: str = "", - on_change: Callable[[str], None] | None = None, - debounce_seconds: float = 0.2, - ) -> None: - super().__init__() - self._initial_filter = initial_filter - self._on_change = on_change - self._debounce_seconds = debounce_seconds - self._debounce_timer: Timer | None = None - - def compose(self) -> ComposeResult: - with Container(id="filter_container"): - yield Static("Filter Library", id="filter_title") - yield Input( - value=self._initial_filter, - placeholder="Type to filter by title or author...", - id="filter_input", - ) - yield Static( - f"Press [bold {KEY_COLOR}]Enter[/] to apply, " - f"[bold {KEY_COLOR}]Escape[/] to clear", - id="filter_footer", - ) - - def on_mount(self) -> None: - self.query_one("#filter_input", Input).focus() - - def on_input_submitted(self, event: Input.Submitted) -> None: - self.dismiss(event.value) - - def on_input_changed(self, event: Input.Changed) -> None: - callback = self._on_change - if not callback: - return - if self._debounce_timer: - self._debounce_timer.stop() - value = event.value - self._debounce_timer = self.set_timer( - self._debounce_seconds, - lambda: callback(value), - ) - - def action_cancel(self) -> None: - self.dismiss("") - - def on_unmount(self) -> None: - if self._debounce_timer: - self._debounce_timer.stop() diff --git a/auditui/ui/__init__.py b/auditui/ui/__init__.py new file mode 100644 index 0000000..b4e3ec1 --- /dev/null +++ b/auditui/ui/__init__.py @@ -0,0 +1,7 @@ +"""Modal screens: help keybindings, filter input, and listening/account statistics.""" + +from .filter_screen import FilterScreen +from .help_screen import HelpScreen +from .stats_screen import StatsScreen + +__all__ = ["FilterScreen", "HelpScreen", "StatsScreen"] diff --git a/auditui/ui/common.py b/auditui/ui/common.py new file mode 100644 index 0000000..63c0dd3 --- /dev/null +++ b/auditui/ui/common.py @@ -0,0 +1,30 @@ +"""Shared protocol, constants, and mixin for modal screens that need app context.""" + +from typing import Any, Protocol, cast + + +class _AppContext(Protocol): + BINDINGS: list[tuple[str, str, str]] + client: Any + auth: Any + library_client: Any + all_items: list[dict] + + +KEY_DISPLAY_MAP = { + "ctrl+": "^", + "left": "←", + "right": "→", + "up": "↑", + "down": "↓", + "space": "Space", + "enter": "Enter", +} + +KEY_COLOR = "#f9e2af" +DESC_COLOR = "#cdd6f4" + + +class AppContextMixin: + def _app(self) -> _AppContext: + return cast(_AppContext, cast(Any, self).app) diff --git a/auditui/ui/filter_screen.py b/auditui/ui/filter_screen.py new file mode 100644 index 0000000..ae234a2 --- /dev/null +++ b/auditui/ui/filter_screen.py @@ -0,0 +1,66 @@ +"""Filter modal with input; returns filter string on dismiss.""" + +from typing import Callable + +from textual.app import ComposeResult +from textual.containers import Container +from textual.screen import ModalScreen +from textual.timer import Timer +from textual.widgets import Input, Static + +from .common import KEY_COLOR + + +class FilterScreen(ModalScreen[str]): + BINDINGS = [("escape", "cancel", "Cancel")] + + def __init__( + self, + initial_filter: str = "", + on_change: Callable[[str], None] | None = None, + debounce_seconds: float = 0.2, + ) -> None: + super().__init__() + self._initial_filter = initial_filter + self._on_change = on_change + self._debounce_seconds = debounce_seconds + self._debounce_timer: Timer | None = None + + def compose(self) -> ComposeResult: + with Container(id="filter_container"): + yield Static("Filter Library", id="filter_title") + yield Input( + value=self._initial_filter, + placeholder="Type to filter by title or author...", + id="filter_input", + ) + yield Static( + f"Press [bold {KEY_COLOR}]Enter[/] to apply, " + f"[bold {KEY_COLOR}]Escape[/] to clear", + id="filter_footer", + ) + + def on_mount(self) -> None: + self.query_one("#filter_input", Input).focus() + + def on_input_submitted(self, event: Input.Submitted) -> None: + self.dismiss(event.value) + + def on_input_changed(self, event: Input.Changed) -> None: + callback = self._on_change + if not callback: + return + if self._debounce_timer: + self._debounce_timer.stop() + value = event.value + self._debounce_timer = self.set_timer( + self._debounce_seconds, + lambda: callback(value), + ) + + def action_cancel(self) -> None: + self.dismiss("") + + def on_unmount(self) -> None: + if self._debounce_timer: + self._debounce_timer.stop() diff --git a/auditui/ui/help_screen.py b/auditui/ui/help_screen.py new file mode 100644 index 0000000..114a160 --- /dev/null +++ b/auditui/ui/help_screen.py @@ -0,0 +1,54 @@ +"""Help modal that lists keybindings from the main app.""" + +from typing import TYPE_CHECKING, Any + +from textual.app import ComposeResult +from textual.containers import Container, Vertical +from textual.screen import ModalScreen +from textual.widgets import Label, ListItem, ListView, Static + +from .common import KEY_COLOR, KEY_DISPLAY_MAP, DESC_COLOR, AppContextMixin + +if TYPE_CHECKING: + from textual.binding import Binding + + +class HelpScreen(AppContextMixin, ModalScreen): + BINDINGS = [("escape", "dismiss", "Close"), ("?", "dismiss", "Close")] + + @staticmethod + def _format_key_display(key: str) -> str: + result = key + for old, new in KEY_DISPLAY_MAP.items(): + result = result.replace(old, new) + return result + + @staticmethod + def _parse_binding(binding: "Binding | tuple[str, str, str]") -> tuple[str, str]: + if isinstance(binding, tuple): + return binding[0], binding[2] + return binding.key, binding.description + + def _make_item(self, binding: "Binding | tuple[str, str, str]") -> ListItem: + key, description = self._parse_binding(binding) + key_display = self._format_key_display(key) + text = f"[bold {KEY_COLOR}]{key_display:>16}[/] [{DESC_COLOR}]{description:<25}[/]" + return ListItem(Label(text)) + + def compose(self) -> ComposeResult: + app = self._app() + bindings = list(app.BINDINGS) + with Container(id="help_container"): + yield Static("Keybindings", id="help_title") + with Vertical(id="help_content"): + yield ListView( + *[self._make_item(b) for b in bindings], + classes="help_list", + ) + yield Static( + f"Press [bold {KEY_COLOR}]?[/] or [bold {KEY_COLOR}]Escape[/] to close", + id="help_footer", + ) + + async def action_dismiss(self, result: Any | None = None) -> None: + await self.dismiss(result) diff --git a/auditui/ui/stats_screen.py b/auditui/ui/stats_screen.py new file mode 100644 index 0000000..5b11756 --- /dev/null +++ b/auditui/ui/stats_screen.py @@ -0,0 +1,54 @@ +"""Statistics modal showing listening time and account info via StatsAggregator.""" + +from datetime import date +from typing import Any + +from textual.app import ComposeResult +from textual.containers import Container, Vertical +from textual.screen import ModalScreen +from textual.widgets import Label, ListItem, ListView, Static + +from ..stats import StatsAggregator +from .common import KEY_COLOR, DESC_COLOR, AppContextMixin + + +class StatsScreen(AppContextMixin, ModalScreen): + BINDINGS = [("escape", "dismiss", "Close"), ("s", "dismiss", "Close")] + + def _make_stat_item(self, label: str, value: str) -> ListItem: + text = f"[bold {KEY_COLOR}]{label:>16}[/] [{DESC_COLOR}]{value:<25}[/]" + return ListItem(Label(text)) + + def compose(self) -> ComposeResult: + app = self._app() + if not app.client: + with Container(id="help_container"): + yield Static("Statistics", id="help_title") + yield Static( + "Not authenticated. Please restart and authenticate.", + classes="help_row", + ) + yield Static( + f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close", + id="help_footer", + ) + return + aggregator = StatsAggregator( + app.client, app.auth, app.library_client, app.all_items or [] + ) + stats_items = aggregator.get_stats(date.today()) + with Container(id="help_container"): + yield Static("Statistics", id="help_title") + with Vertical(id="help_content"): + yield ListView( + *[self._make_stat_item(label, value) + for label, value in stats_items], + classes="help_list", + ) + yield Static( + f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close", + id="help_footer", + ) + + async def action_dismiss(self, result: Any | None = None) -> None: + await self.dismiss(result) diff --git a/tests/test_app_filter.py b/tests/test_app_filter.py index ff53abc..a3bf9e5 100644 --- a/tests/test_app_filter.py +++ b/tests/test_app_filter.py @@ -4,7 +4,7 @@ from dataclasses import dataclass, field from typing import Any, cast from auditui.app import Auditui -from auditui.search_utils import build_search_text, filter_items +from auditui.library import build_search_text, filter_items class StubLibrary: diff --git a/tests/test_downloads.py b/tests/test_downloads.py index 8f07f36..55e3231 100644 --- a/tests/test_downloads.py +++ b/tests/test_downloads.py @@ -2,24 +2,24 @@ from pathlib import Path import pytest -from auditui import downloads +from auditui.downloads import DownloadManager from auditui.constants import MIN_FILE_SIZE def test_sanitize_filename() -> None: - dm = downloads.DownloadManager.__new__(downloads.DownloadManager) + dm = DownloadManager.__new__(DownloadManager) assert dm._sanitize_filename('a<>:"/\\|?*b') == "a_________b" def test_validate_download_url() -> None: - dm = downloads.DownloadManager.__new__(downloads.DownloadManager) + dm = DownloadManager.__new__(DownloadManager) assert dm._validate_download_url("https://example.com/file") is True assert dm._validate_download_url("http://example.com/file") is True assert dm._validate_download_url("ftp://example.com/file") is False def test_cached_path_and_remove(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: - dm = downloads.DownloadManager.__new__(downloads.DownloadManager) + dm = DownloadManager.__new__(DownloadManager) dm.cache_dir = tmp_path monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book") @@ -37,7 +37,7 @@ def test_cached_path_and_remove(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) def test_cached_path_ignores_small_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: - dm = downloads.DownloadManager.__new__(downloads.DownloadManager) + dm = DownloadManager.__new__(DownloadManager) dm.cache_dir = tmp_path monkeypatch.setattr(dm, "_get_name_from_asin", lambda asin: "My Book") diff --git a/tests/test_table_utils.py b/tests/test_table_utils.py index 3aec19b..2399d86 100644 --- a/tests/test_table_utils.py +++ b/tests/test_table_utils.py @@ -1,7 +1,13 @@ from dataclasses import dataclass from typing import Any, cast -from auditui import table_utils +from auditui.constants import AUTHOR_NAME_MAX_LENGTH +from auditui.library import ( + create_progress_sort_key, + create_title_sort_key, + format_item_as_row, + truncate_author_name, +) class StubLibrary: @@ -37,22 +43,22 @@ class StubDownloads: def test_create_title_sort_key_normalizes_accents() -> None: - key_fn, _ = table_utils.create_title_sort_key() + key_fn, _ = create_title_sort_key() assert key_fn(["École"]) == "ecole" assert key_fn(["Zoo"]) == "zoo" def test_create_progress_sort_key_parses_percent() -> None: - key_fn, _ = table_utils.create_progress_sort_key() + key_fn, _ = create_progress_sort_key() assert key_fn(["0", "0", "0", "42.5%"]) == 42.5 assert key_fn(["0", "0", "0", "bad"]) == 0.0 def test_truncate_author_name() -> None: - long_name = "A" * (table_utils.AUTHOR_NAME_MAX_LENGTH + 5) - truncated = table_utils.truncate_author_name(long_name) + long_name = "A" * (AUTHOR_NAME_MAX_LENGTH + 5) + truncated = truncate_author_name(long_name) assert truncated.endswith("...") - assert len(truncated) <= table_utils.AUTHOR_NAME_MAX_LENGTH + assert len(truncated) <= AUTHOR_NAME_MAX_LENGTH def test_format_item_as_row_with_downloaded() -> None: @@ -65,7 +71,7 @@ def test_format_item_as_row_with_downloaded() -> None: "percent": 12.34, "asin": "ASIN123", } - title, author, runtime, progress, downloaded = table_utils.format_item_as_row( + title, author, runtime, progress, downloaded = format_item_as_row( item, library, cast(Any, downloads) ) assert title == "Title" @@ -79,5 +85,5 @@ def test_format_item_as_row_zero_progress() -> None: library = StubLibrary() item = {"title": "Title", "authors": "Author", "minutes": 30, "percent": 0.0} - _, _, _, progress, _ = table_utils.format_item_as_row(item, library, None) + _, _, _, progress, _ = format_item_as_row(item, library, None) assert progress == "0%" diff --git a/tests/test_ui_email.py b/tests/test_ui_email.py index bb9e76f..ac187fd 100644 --- a/tests/test_ui_email.py +++ b/tests/test_ui_email.py @@ -1,63 +1,35 @@ import json -from dataclasses import dataclass, field from pathlib import Path -import pytest - -from auditui import ui - - -@dataclass(slots=True) -class DummyApp: - client: object | None = None - auth: object | None = None - library_client: object | None = None - all_items: list[dict] = field(default_factory=list) - BINDINGS: list[tuple[str, str, str]] = field(default_factory=list) - - -@pytest.fixture -def dummy_app() -> DummyApp: - return DummyApp() +from auditui.stats.email import ( + find_email_in_data, + get_email_from_auth, + get_email_from_auth_file, + get_email_from_config, +) def test_find_email_in_data() -> None: - screen = ui.StatsScreen() data = {"a": {"b": ["nope", "user@example.com"]}} - assert screen._find_email_in_data(data) == "user@example.com" + assert find_email_in_data(data) == "user@example.com" -def test_get_email_from_config( - tmp_path: Path, monkeypatch: pytest.MonkeyPatch, dummy_app: DummyApp -) -> None: - screen = ui.StatsScreen() +def test_get_email_from_config(tmp_path: Path) -> None: config_path = tmp_path / "config.json" config_path.write_text(json.dumps({"email": "config@example.com"})) - monkeypatch.setattr(ui, "CONFIG_PATH", config_path) - - email = screen._get_email_from_config(dummy_app) - assert email == "config@example.com" + assert get_email_from_config(config_path) == "config@example.com" -def test_get_email_from_auth_file( - tmp_path: Path, monkeypatch: pytest.MonkeyPatch, dummy_app: DummyApp -) -> None: - screen = ui.StatsScreen() +def test_get_email_from_auth_file(tmp_path: Path) -> None: auth_path = tmp_path / "auth.json" auth_path.write_text(json.dumps({"email": "auth@example.com"})) - monkeypatch.setattr(ui, "AUTH_PATH", auth_path) - - email = screen._get_email_from_auth_file(dummy_app) - assert email == "auth@example.com" + assert get_email_from_auth_file(auth_path) == "auth@example.com" -def test_get_email_from_auth(dummy_app: DummyApp) -> None: - screen = ui.StatsScreen() - +def test_get_email_from_auth() -> None: class Auth: username = "user@example.com" login = None email = None - dummy_app.auth = Auth() - assert screen._get_email_from_auth(dummy_app) == "user@example.com" + assert get_email_from_auth(Auth()) == "user@example.com"