From c3c3b083f9687963d4989e36d50fec545c5665dc Mon Sep 17 00:00:00 2001 From: Kharec Date: Tue, 25 Nov 2025 22:05:08 +0100 Subject: [PATCH] feat: add first successful and consistent attempt textualize+audible --- main.py | 379 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 379 insertions(+) create mode 100644 main.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..cd258c8 --- /dev/null +++ b/main.py @@ -0,0 +1,379 @@ +#!/usr/bin/env python3 +"""A terminal-based user interface (TUI) client for Audible""" + +import sys +from getpass import getpass +from pathlib import Path + +import audible +from textual.app import App, ComposeResult +from textual.widgets import Footer, Header, DataTable, Static +from textual.worker import get_current_worker +from textual import work + + +class AudituiApp(App): + """Main application class for the Audible TUI app.""" + BINDINGS = [ + ("d", "toggle_dark", "Toggle dark mode"), + ("s", "sort", "Sort by title"), + ("r", "reverse_sort", "Reverse sort"), + ("p", "sort_by_progress", "Sort by progress"), + ("a", "show_all", "Show all books"), + ("u", "show_unfinished", "Show unfinished"), + ("q", "quit", "Quit application"), + ] + + CSS = """ + DataTable { + height: 1fr; + } + Static { + height: 1; + text-align: center; + background: $primary; + } + """ + + AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json" + + def __init__(self): + super().__init__() + self.auth = None + self.client = None + self.all_items = [] + self.current_items = [] + self.show_all_mode = False + self.progress_sort_reverse = False + self.title_column_key = None + self.progress_column_key = None + self.progress_column_index = 3 + + def compose(self) -> ComposeResult: + yield Header() + yield Static("Loading...", id="status") + table = DataTable() + table.zebra_stripes = True + table.cursor_type = "row" + yield table + yield Footer() + + def on_mount(self) -> None: + """Initialize the table and start fetching library data.""" + table = self.query_one(DataTable) + table.add_columns("Title", "Author", "Length", "Progress") + column_keys = list(table.columns.keys()) + self.title_column_key = column_keys[0] + self.progress_column_key = column_keys[3] + if self.client: + self.update_status("Fetching library...") + self.fetch_library() + else: + self.update_status( + "Not authenticated. Please restart and authenticate.") + + @work(exclusive=True, thread=True) + def fetch_library(self) -> None: + """Fetch all library items from Audible API in background thread.""" + worker = get_current_worker() + if worker.is_cancelled: + return + + try: + response_groups = ( + "contributors,media,product_attrs,product_desc,product_details," + "rating,is_finished,listening_status,percent_complete" + ) + all_items = self._fetch_all_pages(response_groups) + self.all_items = all_items + self.call_from_thread(self.on_library_loaded, all_items) + except (OSError, ValueError, KeyError) as e: + self.call_from_thread(self.on_library_error, str(e)) + + def _fetch_all_pages(self, response_groups): + """Fetch all pages of library items from the API.""" + all_items = [] + page = 1 + page_size = 50 + + while True: + library = self.client.get( + path="library", + num_results=page_size, + page=page, + response_groups=response_groups + ) + + items = library.get("items", []) + + if not items: + break + + all_items.extend(items) + self.call_from_thread( + self.update_status, + f"Fetched page {page} ({len(items)} items)..." + ) + + if len(items) < page_size: + break + + page += 1 + + return all_items + + def on_library_loaded(self, items) -> None: + """Handle successful library load.""" + self.all_items = items + self.update_status(f"Loaded {len(items)} books") + self.show_unfinished() + + def on_library_error(self, error: str) -> None: + """Handle library fetch error.""" + self.update_status(f"Error fetching library: {error}") + + def update_status(self, message: str) -> None: + """Update the status message in the UI.""" + status = self.query_one("#status", Static) + status.update(message) + + def format_duration(self, value, unit='minutes', default_none=None): + """Format duration value into human-readable string.""" + if value is None or value <= 0: + return default_none + + if unit == 'seconds': + total_minutes = int(value) // 60 + else: + total_minutes = int(value) + + if total_minutes < 60: + return f"{total_minutes} minute{'s' if total_minutes != 1 else ''}" + + hours = total_minutes // 60 + mins = total_minutes % 60 + if mins == 0: + return f"{hours} hour{'s' if hours != 1 else ''}" + return f"{hours} hour{'s' if hours != 1 else ''} {mins} minute{'s' if mins != 1 else ''}" + + def _extract_title(self, item): + """Extract title from library item.""" + product = item.get("product", {}) + return (product.get("title") or + item.get("title") or + product.get("asin", "Unknown Title")) + + def _extract_authors(self, item): + """Extract author names from library item.""" + product = item.get("product", {}) + authors = product.get("authors") or product.get("contributors") or [] + if not authors and "authors" in item: + authors = item.get("authors", []) + return ", ".join([a.get("name", "") for a in authors if isinstance(a, dict)]) + + def _extract_runtime_minutes(self, item): + """Extract runtime in minutes from library item.""" + product = item.get("product", {}) + runtime_fields = [ + "runtime_length_min", + "runtime_length", + "vLength", + "length", + "duration" + ] + + runtime = None + for field in runtime_fields: + runtime = product.get(field) + if runtime is None: + runtime = item.get(field) + if runtime is not None: + break + + if runtime is None: + return None + + if isinstance(runtime, dict): + if "min" in runtime: + return int(runtime.get("min", 0)) + elif isinstance(runtime, (int, float)): + return int(runtime) + + return None + + def _extract_progress_info(self, item): + """Extract progress percentage from library item.""" + percent_complete = item.get("percent_complete") + listening_status = item.get("listening_status", {}) + + if isinstance(listening_status, dict): + if percent_complete is None: + percent_complete = listening_status.get("percent_complete") + else: + percent_complete = None + + return percent_complete + + def _populate_table(self, items): + """Populate the DataTable with library items.""" + table = self.query_one(DataTable) + table.clear() + + if not items: + self.update_status("No books found.") + return + + for item in items: + title = self._extract_title(item) + author_names = self._extract_authors(item) + minutes = self._extract_runtime_minutes(item) + runtime_str = self.format_duration( + minutes, unit='minutes', default_none="Unknown length") + percent_complete = self._extract_progress_info(item) + + progress_str = "0%" + if percent_complete is not None and percent_complete > 0: + progress_str = f"{percent_complete:.1f}%" + + table.add_row( + title, + author_names or "Unknown", + runtime_str or "Unknown", + progress_str, + key=title + ) + + self.current_items = items + mode = "all" if self.show_all_mode else "unfinished" + self.update_status(f"Showing {len(items)} books ({mode})") + + def show_all(self) -> None: + """Display all books in the table.""" + if not self.all_items: + return + self.show_all_mode = True + self._populate_table(self.all_items) + + def show_unfinished(self) -> None: + """Display only unfinished books in the table.""" + if not self.all_items: + return + self.show_all_mode = False + + unfinished_items = [] + for item in self.all_items: + is_finished_flag = item.get("is_finished") + percent_complete = item.get("percent_complete") + listening_status = item.get("listening_status") + + if isinstance(listening_status, dict): + is_finished_flag = is_finished_flag or listening_status.get( + "is_finished", False) + if percent_complete is None: + percent_complete = listening_status.get( + "percent_complete", 0) + + is_finished = False + if is_finished_flag is True: + is_finished = True + elif isinstance(percent_complete, (int, float)) and percent_complete >= 100: + is_finished = True + + if not is_finished: + unfinished_items.append(item) + + self._populate_table(unfinished_items) + + def action_toggle_dark(self) -> None: + """Toggle between dark and light theme.""" + self.theme = ( + "textual-dark" if self.theme == "textual-light" else "textual-light" + ) + + def action_sort(self) -> None: + """Sort table by title in ascending order.""" + table = self.query_one(DataTable) + if table.row_count > 0: + table.sort(self.title_column_key) + + def action_reverse_sort(self) -> None: + """Sort table by title in descending order.""" + table = self.query_one(DataTable) + if table.row_count > 0: + table.sort(self.title_column_key, reverse=True) + + def action_sort_by_progress(self) -> None: + """Sort table by progress percentage, toggling direction on each press.""" + table = self.query_one(DataTable) + if table.row_count > 0: + self.progress_sort_reverse = not self.progress_sort_reverse + + def progress_key(row_values): + progress_cell = row_values[self.progress_column_index] + if isinstance(progress_cell, str): + try: + return float(progress_cell.rstrip("%")) + except (ValueError, AttributeError): + return 0.0 + return 0.0 + + table.sort(key=progress_key, reverse=self.progress_sort_reverse) + + def action_show_all(self) -> None: + """Action handler to show all books.""" + self.show_all() + + def action_show_unfinished(self) -> None: + """Action handler to show unfinished books.""" + self.show_unfinished() + + +def authenticate(): + """Authenticate with Audible and return auth and client objects.""" + auth_path = Path.home() / ".config" / "auditui" / "auth.json" + auth_path.parent.mkdir(parents=True, exist_ok=True) + + authenticator = None + if auth_path.exists(): + try: + authenticator = audible.Authenticator.from_file(str(auth_path)) + audible_client = audible.Client(auth=authenticator) + return authenticator, audible_client + except (OSError, ValueError, KeyError) as e: + print(f"Failed to load existing auth: {e}") + print("Please re-authenticate.") + + print("Please authenticate with your Audible account.") + print("You will need to provide:") + print(" - Your Audible email/username") + print(" - Your password") + print(" - Your marketplace locale (e.g., 'US', 'UK', 'DE', 'FR')") + + email = input("\nEmail: ") + password = getpass("Password: ") + marketplace = input( + "Marketplace locale (default: US): ").strip().upper() or "US" + + try: + authenticator = audible.Authenticator.from_login( + username=email, + password=password, + locale=marketplace + ) + + auth_path.parent.mkdir(parents=True, exist_ok=True) + authenticator.to_file(str(auth_path)) + print("Authentication successful!") + audible_client = audible.Client(auth=authenticator) + return authenticator, audible_client + except (OSError, ValueError, KeyError) as e: + print(f"Authentication failed: {e}") + sys.exit(1) + + +if __name__ == "__main__": + auth, client = authenticate() + app = AudituiApp() + app.auth = auth + app.client = client + app.run()