feat: add app submodule

This commit is contained in:
2025-12-07 00:09:03 +01:00
parent 27f9a5396e
commit 3b9d1ecf96

292
auditui/app.py Normal file
View File

@@ -0,0 +1,292 @@
"""Textual application for the Audible TUI."""
from pathlib import Path
from textual import work
from textual.app import App, ComposeResult
from textual.events import Key
from textual.widgets import DataTable, Footer, Header, Static
from textual.worker import get_current_worker
from .constants import TABLE_COLUMNS, TABLE_CSS
from .downloads import DownloadManager
from .library import LibraryClient
from .playback import PlaybackController
class Auditui(App):
"""Main application class for the Audible TUI app."""
BINDINGS = [
("d", "toggle_dark", "Toggle dark mode"),
("s", "sort", "Sort by title"),
("r", "reverse_sort", "Reverse sort"),
("p", "sort_by_progress", "Sort by progress"),
("a", "show_all", "Show all books"),
("u", "show_unfinished", "Show unfinished"),
("enter", "play_selected", "Play selected book"),
("space", "toggle_playback", "Pause/Resume"),
("q", "quit", "Quit application"),
]
CSS = TABLE_CSS
def __init__(self, auth=None, client=None) -> None:
super().__init__()
self.auth = auth
self.client = client
self.library_client = LibraryClient(client) if client else None
self.download_manager = (
DownloadManager(auth, client) if auth and client else None
)
self.playback = PlaybackController(self.update_status)
self.all_items: list = []
self.current_items: list = []
self.show_all_mode = False
self.progress_sort_reverse = False
self.title_column_key = None
self.progress_column_key = None
self.progress_column_index = 3
def compose(self) -> ComposeResult:
yield Header()
yield Static("Loading...", id="status")
table = DataTable()
table.zebra_stripes = True
table.cursor_type = "row"
yield table
yield Footer()
def on_mount(self) -> None:
"""Initialize the table and start fetching library data."""
table = self.query_one(DataTable)
table.add_columns(*TABLE_COLUMNS)
column_keys = list(table.columns.keys())
self.title_column_key = column_keys[0]
self.progress_column_key = column_keys[3]
if self.client:
self.update_status("Fetching library...")
self.fetch_library()
else:
self.update_status("Not authenticated. Please restart and authenticate.")
self.set_interval(1.0, self._check_playback_status)
def on_unmount(self) -> None:
"""Clean up on app exit."""
self.playback.stop()
def on_key(self, event: Key) -> None:
"""Handle key presses on DataTable."""
if isinstance(self.focused, DataTable):
if event.key == "enter":
event.prevent_default()
self.action_play_selected()
elif event.key == "space":
event.prevent_default()
self.action_toggle_playback()
def update_status(self, message: str) -> None:
"""Update the status message in the UI."""
status = self.query_one("#status", Static)
status.update(message)
def _thread_status_update(self, message: str) -> None:
"""Safely update status from worker threads."""
self.call_from_thread(self.update_status, message)
@work(exclusive=True, thread=True)
def fetch_library(self) -> None:
"""Fetch all library items from Audible API in background thread."""
worker = get_current_worker()
if worker.is_cancelled or not self.library_client:
return
try:
all_items = self.library_client.fetch_all_items(self._thread_status_update)
self.call_from_thread(self.on_library_loaded, all_items)
except (OSError, ValueError, KeyError) as exc:
self.call_from_thread(self.on_library_error, str(exc))
def on_library_loaded(self, items: list) -> None:
"""Handle successful library load."""
self.all_items = items
self.update_status(f"Loaded {len(items)} books")
self.show_unfinished()
def on_library_error(self, error: str) -> None:
"""Handle library fetch error."""
self.update_status(f"Error fetching library: {error}")
def _populate_table(self, items: list) -> None:
"""Populate the DataTable with library items."""
table = self.query_one(DataTable)
table.clear()
if not items or not self.library_client:
self.update_status("No books found.")
return
for item in items:
title = self.library_client.extract_title(item)
author_names = self.library_client.extract_authors(item)
minutes = self.library_client.extract_runtime_minutes(item)
runtime_str = self.library_client.format_duration(
minutes, unit="minutes", default_none="Unknown length"
)
percent_complete = self.library_client.extract_progress_info(item)
progress_str = "0%"
if percent_complete is not None and percent_complete > 0:
progress_str = f"{percent_complete:.1f}%"
table.add_row(
title,
author_names or "Unknown",
runtime_str or "Unknown",
progress_str,
key=title,
)
self.current_items = items
mode = "all" if self.show_all_mode else "unfinished"
self.update_status(f"Showing {len(items)} books ({mode})")
def show_all(self) -> None:
"""Display all books in the table."""
if not self.all_items:
return
self.show_all_mode = True
self._populate_table(self.all_items)
def show_unfinished(self) -> None:
"""Display only unfinished books in the table."""
if not self.all_items or not self.library_client:
return
self.show_all_mode = False
unfinished_items = [
item for item in self.all_items if not self.library_client.is_finished(item)
]
self._populate_table(unfinished_items)
def action_toggle_dark(self) -> None:
"""Toggle between dark and light theme."""
self.theme = (
"textual-dark" if self.theme == "textual-light" else "textual-light"
)
def action_sort(self) -> None:
"""Sort table by title in ascending order."""
table = self.query_one(DataTable)
if table.row_count > 0:
table.sort(self.title_column_key)
def action_reverse_sort(self) -> None:
"""Sort table by title in descending order."""
table = self.query_one(DataTable)
if table.row_count > 0:
table.sort(self.title_column_key, reverse=True)
def action_sort_by_progress(self) -> None:
"""Sort table by progress percentage, toggling direction on each press."""
table = self.query_one(DataTable)
if table.row_count > 0:
self.progress_sort_reverse = not self.progress_sort_reverse
def progress_key(row_values):
progress_cell = row_values[self.progress_column_index]
if isinstance(progress_cell, str):
try:
return float(progress_cell.rstrip("%"))
except (ValueError, AttributeError):
return 0.0
return 0.0
table.sort(key=progress_key, reverse=self.progress_sort_reverse)
def action_show_all(self) -> None:
"""Action handler to show all books."""
self.show_all()
def action_show_unfinished(self) -> None:
"""Action handler to show unfinished books."""
self.show_unfinished()
def action_play_selected(self) -> None:
"""Start playing the selected book."""
if not self.download_manager:
self.update_status("Not authenticated. Please restart and authenticate.")
return
table = self.query_one(DataTable)
if table.row_count == 0:
self.update_status("No books available")
return
cursor_row = table.cursor_row
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return
selected_item = self.current_items[cursor_row]
asin = self.library_client.extract_asin(selected_item) if self.library_client else None
if not asin:
self.update_status("Could not get ASIN for selected book")
return
if self.playback.is_playing:
self.playback.stop()
self.playback.current_asin = asin
self._start_playback_async(asin)
def action_toggle_playback(self) -> None:
"""Toggle pause/resume state."""
if not self.playback.is_playing:
self.update_status("No playback active. Press Enter to play a book.")
return
if not self.playback.is_alive():
self.playback.stop()
self.update_status("Playback has ended")
return
if self.playback.is_paused:
self.playback.resume()
else:
self.playback.pause()
def _check_playback_status(self) -> None:
"""Check if playback process has finished and update state accordingly."""
message = self.playback.check_status()
if message:
self.update_status(message)
@work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None:
"""Start playback asynchronously."""
if not self.download_manager:
self.call_from_thread(self.update_status, "Could not download file")
return
self.call_from_thread(self.update_status, "Preparing playback...")
local_path = self.download_manager.get_or_download(asin, self._thread_status_update)
if not local_path:
self.call_from_thread(self.update_status, "Could not download file")
return
self.call_from_thread(self.update_status, "Getting activation bytes...")
activation_hex = self.download_manager.get_activation_bytes()
if not activation_hex:
self.call_from_thread(self.update_status, "Failed to get activation bytes")
return
self.call_from_thread(
self.update_status, f"Starting playback of {local_path.name}..."
)
self.call_from_thread(self.playback.start, local_path, activation_hex)