diff --git a/auditui/app.py b/auditui/app.py new file mode 100644 index 0000000..05bc82d --- /dev/null +++ b/auditui/app.py @@ -0,0 +1,292 @@ +"""Textual application for the Audible TUI.""" + +from pathlib import Path + +from textual import work +from textual.app import App, ComposeResult +from textual.events import Key +from textual.widgets import DataTable, Footer, Header, Static +from textual.worker import get_current_worker + +from .constants import TABLE_COLUMNS, TABLE_CSS +from .downloads import DownloadManager +from .library import LibraryClient +from .playback import PlaybackController + + +class Auditui(App): + """Main application class for the Audible TUI app.""" + + BINDINGS = [ + ("d", "toggle_dark", "Toggle dark mode"), + ("s", "sort", "Sort by title"), + ("r", "reverse_sort", "Reverse sort"), + ("p", "sort_by_progress", "Sort by progress"), + ("a", "show_all", "Show all books"), + ("u", "show_unfinished", "Show unfinished"), + ("enter", "play_selected", "Play selected book"), + ("space", "toggle_playback", "Pause/Resume"), + ("q", "quit", "Quit application"), + ] + + CSS = TABLE_CSS + + def __init__(self, auth=None, client=None) -> None: + super().__init__() + self.auth = auth + self.client = client + self.library_client = LibraryClient(client) if client else None + self.download_manager = ( + DownloadManager(auth, client) if auth and client else None + ) + self.playback = PlaybackController(self.update_status) + + self.all_items: list = [] + self.current_items: list = [] + self.show_all_mode = False + self.progress_sort_reverse = False + self.title_column_key = None + self.progress_column_key = None + self.progress_column_index = 3 + + def compose(self) -> ComposeResult: + yield Header() + yield Static("Loading...", id="status") + table = DataTable() + table.zebra_stripes = True + table.cursor_type = "row" + yield table + yield Footer() + + def on_mount(self) -> None: + """Initialize the table and start fetching library data.""" + table = self.query_one(DataTable) + table.add_columns(*TABLE_COLUMNS) + column_keys = list(table.columns.keys()) + self.title_column_key = column_keys[0] + self.progress_column_key = column_keys[3] + + if self.client: + self.update_status("Fetching library...") + self.fetch_library() + else: + self.update_status("Not authenticated. Please restart and authenticate.") + + self.set_interval(1.0, self._check_playback_status) + + def on_unmount(self) -> None: + """Clean up on app exit.""" + self.playback.stop() + + def on_key(self, event: Key) -> None: + """Handle key presses on DataTable.""" + if isinstance(self.focused, DataTable): + if event.key == "enter": + event.prevent_default() + self.action_play_selected() + elif event.key == "space": + event.prevent_default() + self.action_toggle_playback() + + def update_status(self, message: str) -> None: + """Update the status message in the UI.""" + status = self.query_one("#status", Static) + status.update(message) + + def _thread_status_update(self, message: str) -> None: + """Safely update status from worker threads.""" + self.call_from_thread(self.update_status, message) + + @work(exclusive=True, thread=True) + def fetch_library(self) -> None: + """Fetch all library items from Audible API in background thread.""" + worker = get_current_worker() + if worker.is_cancelled or not self.library_client: + return + + try: + all_items = self.library_client.fetch_all_items(self._thread_status_update) + self.call_from_thread(self.on_library_loaded, all_items) + except (OSError, ValueError, KeyError) as exc: + self.call_from_thread(self.on_library_error, str(exc)) + + def on_library_loaded(self, items: list) -> None: + """Handle successful library load.""" + self.all_items = items + self.update_status(f"Loaded {len(items)} books") + self.show_unfinished() + + def on_library_error(self, error: str) -> None: + """Handle library fetch error.""" + self.update_status(f"Error fetching library: {error}") + + def _populate_table(self, items: list) -> None: + """Populate the DataTable with library items.""" + table = self.query_one(DataTable) + table.clear() + + if not items or not self.library_client: + self.update_status("No books found.") + return + + for item in items: + title = self.library_client.extract_title(item) + author_names = self.library_client.extract_authors(item) + minutes = self.library_client.extract_runtime_minutes(item) + runtime_str = self.library_client.format_duration( + minutes, unit="minutes", default_none="Unknown length" + ) + percent_complete = self.library_client.extract_progress_info(item) + + progress_str = "0%" + if percent_complete is not None and percent_complete > 0: + progress_str = f"{percent_complete:.1f}%" + + table.add_row( + title, + author_names or "Unknown", + runtime_str or "Unknown", + progress_str, + key=title, + ) + + self.current_items = items + mode = "all" if self.show_all_mode else "unfinished" + self.update_status(f"Showing {len(items)} books ({mode})") + + def show_all(self) -> None: + """Display all books in the table.""" + if not self.all_items: + return + self.show_all_mode = True + self._populate_table(self.all_items) + + def show_unfinished(self) -> None: + """Display only unfinished books in the table.""" + if not self.all_items or not self.library_client: + return + + self.show_all_mode = False + unfinished_items = [ + item for item in self.all_items if not self.library_client.is_finished(item) + ] + self._populate_table(unfinished_items) + + def action_toggle_dark(self) -> None: + """Toggle between dark and light theme.""" + self.theme = ( + "textual-dark" if self.theme == "textual-light" else "textual-light" + ) + + def action_sort(self) -> None: + """Sort table by title in ascending order.""" + table = self.query_one(DataTable) + if table.row_count > 0: + table.sort(self.title_column_key) + + def action_reverse_sort(self) -> None: + """Sort table by title in descending order.""" + table = self.query_one(DataTable) + if table.row_count > 0: + table.sort(self.title_column_key, reverse=True) + + def action_sort_by_progress(self) -> None: + """Sort table by progress percentage, toggling direction on each press.""" + table = self.query_one(DataTable) + if table.row_count > 0: + self.progress_sort_reverse = not self.progress_sort_reverse + + def progress_key(row_values): + progress_cell = row_values[self.progress_column_index] + if isinstance(progress_cell, str): + try: + return float(progress_cell.rstrip("%")) + except (ValueError, AttributeError): + return 0.0 + return 0.0 + + table.sort(key=progress_key, reverse=self.progress_sort_reverse) + + def action_show_all(self) -> None: + """Action handler to show all books.""" + self.show_all() + + def action_show_unfinished(self) -> None: + """Action handler to show unfinished books.""" + self.show_unfinished() + + def action_play_selected(self) -> None: + """Start playing the selected book.""" + if not self.download_manager: + self.update_status("Not authenticated. Please restart and authenticate.") + return + + table = self.query_one(DataTable) + if table.row_count == 0: + self.update_status("No books available") + return + + cursor_row = table.cursor_row + if cursor_row >= len(self.current_items): + self.update_status("Invalid selection") + return + + selected_item = self.current_items[cursor_row] + asin = self.library_client.extract_asin(selected_item) if self.library_client else None + + if not asin: + self.update_status("Could not get ASIN for selected book") + return + + if self.playback.is_playing: + self.playback.stop() + + self.playback.current_asin = asin + self._start_playback_async(asin) + + def action_toggle_playback(self) -> None: + """Toggle pause/resume state.""" + if not self.playback.is_playing: + self.update_status("No playback active. Press Enter to play a book.") + return + + if not self.playback.is_alive(): + self.playback.stop() + self.update_status("Playback has ended") + return + + if self.playback.is_paused: + self.playback.resume() + else: + self.playback.pause() + + def _check_playback_status(self) -> None: + """Check if playback process has finished and update state accordingly.""" + message = self.playback.check_status() + if message: + self.update_status(message) + + @work(exclusive=True, thread=True) + def _start_playback_async(self, asin: str) -> None: + """Start playback asynchronously.""" + if not self.download_manager: + self.call_from_thread(self.update_status, "Could not download file") + return + + self.call_from_thread(self.update_status, "Preparing playback...") + + local_path = self.download_manager.get_or_download(asin, self._thread_status_update) + if not local_path: + self.call_from_thread(self.update_status, "Could not download file") + return + + self.call_from_thread(self.update_status, "Getting activation bytes...") + activation_hex = self.download_manager.get_activation_bytes() + if not activation_hex: + self.call_from_thread(self.update_status, "Failed to get activation bytes") + return + + self.call_from_thread( + self.update_status, f"Starting playback of {local_path.name}..." + ) + self.call_from_thread(self.playback.start, local_path, activation_hex)