Files
auditui/main.py

380 lines
12 KiB
Python

#!/usr/bin/env python3
"""A terminal-based user interface (TUI) client for Audible"""
import sys
from getpass import getpass
from pathlib import Path
import audible
from textual.app import App, ComposeResult
from textual.widgets import Footer, Header, DataTable, Static
from textual.worker import get_current_worker
from textual import work
class AudituiApp(App):
    """Textual application that lists an Audible library in a sortable table.

    The ``auth`` and ``client`` attributes start as ``None`` and are expected
    to be assigned by the caller before ``run()``; ``on_mount`` only starts
    the library fetch when ``client`` is set.
    """

    BINDINGS = [
        ("d", "toggle_dark", "Toggle dark mode"),
        ("s", "sort", "Sort by title"),
        ("r", "reverse_sort", "Reverse sort"),
        ("p", "sort_by_progress", "Sort by progress"),
        ("a", "show_all", "Show all books"),
        ("u", "show_unfinished", "Show unfinished"),
        ("q", "quit", "Quit application"),
    ]

    CSS = """
    DataTable {
        height: 1fr;
    }
    Static {
        height: 1;
        text-align: center;
        background: $primary;
    }
    """

    AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"

    def __init__(self):
        """Create the app with no session attached and empty library state."""
        super().__init__()
        self.auth = None
        self.client = None
        # Every item returned by the API, and the subset currently displayed.
        self.all_items = []
        self.current_items = []
        self.show_all_mode = False
        # Flipped on every progress sort so repeated presses alternate order.
        self.progress_sort_reverse = False
        self.title_column_key = None
        self.progress_column_key = None
        # Position of the "Progress" column in the order used by on_mount.
        self.progress_column_index = 3

    def compose(self) -> ComposeResult:
        """Yield the header, status line, library table and footer."""
        yield Header()
        yield Static("Loading...", id="status")
        table = DataTable()
        table.zebra_stripes = True
        table.cursor_type = "row"
        yield table
        yield Footer()

    def on_mount(self) -> None:
        """Initialize the table and start fetching library data."""
        table = self.query_one(DataTable)
        table.add_columns("Title", "Author", "Length", "Progress")
        column_keys = list(table.columns.keys())
        self.title_column_key = column_keys[0]
        self.progress_column_key = column_keys[3]
        if self.client:
            self.update_status("Fetching library...")
            self.fetch_library()
        else:
            self.update_status(
                "Not authenticated. Please restart and authenticate.")

    @work(exclusive=True, thread=True)
    def fetch_library(self) -> None:
        """Fetch all library items from the Audible API in a background thread.

        On success ``on_library_loaded`` is invoked on the UI thread with the
        items; on any failure ``on_library_error`` is invoked with the error
        text. Nothing is reported if the worker was cancelled meanwhile.
        """
        worker = get_current_worker()
        if worker.is_cancelled:
            return
        response_groups = (
            "contributors,media,product_attrs,product_desc,product_details,"
            "rating,is_finished,listening_status,percent_complete"
        )
        try:
            all_items = self._fetch_all_pages(response_groups, worker=worker)
            if worker.is_cancelled:
                return
            # State is only mutated on the UI thread, inside on_library_loaded.
            self.call_from_thread(self.on_library_loaded, all_items)
        except Exception as e:  # worker boundary: report instead of crashing
            # The API client and its HTTP layer raise their own exception
            # types, so a narrow tuple would let them escape the worker.
            self.call_from_thread(self.on_library_error, str(e))

    def _fetch_all_pages(self, response_groups, worker=None):
        """Fetch all pages of library items from the API.

        Args:
            response_groups: Comma-separated response groups sent with every
                page request.
            worker: Optional worker whose ``is_cancelled`` flag is checked
                before each page; when it is set, paging stops early and the
                items gathered so far are returned.

        Returns:
            A list with the items of every fetched page, in page order.
        """
        all_items = []
        page = 1
        page_size = 50
        while True:
            if worker is not None and worker.is_cancelled:
                break
            library = self.client.get(
                path="library",
                num_results=page_size,
                page=page,
                response_groups=response_groups,
            )
            items = library.get("items") or []
            if not items:
                break
            all_items.extend(items)
            self.call_from_thread(
                self.update_status,
                f"Fetched page {page} ({len(items)} items)..."
            )
            # A short page means there is nothing left to request.
            if len(items) < page_size:
                break
            page += 1
        return all_items

    def on_library_loaded(self, items) -> None:
        """Handle successful library load."""
        self.all_items = items
        self.update_status(f"Loaded {len(items)} books")
        self.show_unfinished()

    def on_library_error(self, error: str) -> None:
        """Handle library fetch error."""
        self.update_status(f"Error fetching library: {error}")

    def update_status(self, message: str) -> None:
        """Update the status message in the UI."""
        status = self.query_one("#status", Static)
        status.update(message)

    def format_duration(self, value, unit='minutes', default_none=None):
        """Format a duration as a human-readable string.

        Args:
            value: Numeric duration, or ``None``.
            unit: ``'seconds'`` to treat ``value`` as seconds; any other
                value treats it as minutes.
            default_none: Returned when ``value`` is ``None`` or not positive.

        Returns:
            Text such as ``"45 minutes"``, ``"1 hour"`` or
            ``"2 hours 5 minutes"``, or ``default_none``.
        """
        if value is None or value <= 0:
            return default_none
        if unit == 'seconds':
            total_minutes = int(value) // 60
        else:
            total_minutes = int(value)
        if total_minutes < 60:
            return f"{total_minutes} minute{'s' if total_minutes != 1 else ''}"
        hours, mins = divmod(total_minutes, 60)
        hours_text = f"{hours} hour{'s' if hours != 1 else ''}"
        if mins == 0:
            return hours_text
        return f"{hours_text} {mins} minute{'s' if mins != 1 else ''}"

    def _extract_title(self, item):
        """Return a display title for a library item.

        Falls back from the product title to the item title, then to an ASIN,
        and finally to ``"Unknown Title"``, so the result is never ``None``.
        """
        # ``or {}`` also covers a "product" key that is present but null.
        product = item.get("product") or {}
        return (product.get("title")
                or item.get("title")
                or product.get("asin")
                or item.get("asin")
                or "Unknown Title")

    def _extract_authors(self, item):
        """Return the item's author names joined with ``", "``.

        Entries that are not dicts or that have no usable name are skipped;
        an empty string is returned when no name is found.
        """
        product = item.get("product") or {}
        authors = (product.get("authors")
                   or product.get("contributors")
                   or item.get("authors")
                   or [])
        if not isinstance(authors, (list, tuple)):
            return ""
        names = [
            str(entry["name"])
            for entry in authors
            if isinstance(entry, dict) and entry.get("name")
        ]
        return ", ".join(names)

    def _extract_runtime_minutes(self, item):
        """Return the item's runtime as an int, or ``None`` if unavailable.

        The first non-``None`` value among the known runtime fields is used,
        looking at the product before the item for each field. A dict value
        is read through its ``"min"`` key.
        """
        product = item.get("product") or {}
        runtime_fields = (
            "runtime_length_min",
            "runtime_length",
            "vLength",
            "length",
            "duration",
        )
        runtime = None
        for field in runtime_fields:
            runtime = product.get(field)
            if runtime is None:
                runtime = item.get(field)
            if runtime is not None:
                break
        if isinstance(runtime, dict):
            runtime = runtime.get("min")
            if runtime is None:
                return None
        elif not isinstance(runtime, (int, float)):
            return None
        try:
            return int(runtime)
        except (TypeError, ValueError, OverflowError):
            # Unparseable text, NaN or infinity: treat as unknown length.
            return None

    def _extract_progress_info(self, item):
        """Return the item's completion percentage, or ``None`` if absent.

        The top-level ``percent_complete`` wins; ``listening_status`` is only
        consulted when the top-level value is missing.
        """
        percent_complete = item.get("percent_complete")
        if percent_complete is None:
            listening_status = item.get("listening_status")
            if isinstance(listening_status, dict):
                percent_complete = listening_status.get("percent_complete")
        return percent_complete

    def _is_finished(self, item):
        """Return True if the item is flagged finished or is at least 100% done."""
        finished_flag = item.get("is_finished")
        listening_status = item.get("listening_status")
        if isinstance(listening_status, dict):
            finished_flag = finished_flag or listening_status.get(
                "is_finished", False)
        if finished_flag is True:
            return True
        percent_complete = self._extract_progress_info(item)
        return (isinstance(percent_complete, (int, float))
                and percent_complete >= 100)

    def _populate_table(self, items):
        """Replace the table contents with ``items`` and update the status."""
        table = self.query_one(DataTable)
        table.clear()
        self.current_items = items
        if not items:
            self.update_status("No books found.")
            return
        used_keys = set()
        for item in items:
            title = self._extract_title(item)
            author_names = self._extract_authors(item)
            minutes = self._extract_runtime_minutes(item)
            runtime_str = self.format_duration(
                minutes, unit='minutes', default_none="Unknown length")
            percent_complete = self._extract_progress_info(item)
            progress_str = "0%"
            if isinstance(percent_complete, (int, float)) and percent_complete > 0:
                progress_str = f"{percent_complete:.1f}%"
            # Row keys must be unique; a repeated title gets an automatic key
            # instead of making add_row raise on the duplicate.
            row_key = title if title not in used_keys else None
            used_keys.add(title)
            table.add_row(
                title,
                author_names or "Unknown",
                runtime_str or "Unknown",
                progress_str,
                key=row_key,
            )
        mode = "all" if self.show_all_mode else "unfinished"
        self.update_status(f"Showing {len(items)} books ({mode})")

    def show_all(self) -> None:
        """Display all books in the table."""
        if not self.all_items:
            return
        self.show_all_mode = True
        self._populate_table(self.all_items)

    def show_unfinished(self) -> None:
        """Display only unfinished books in the table."""
        if not self.all_items:
            return
        self.show_all_mode = False
        unfinished_items = [
            item for item in self.all_items if not self._is_finished(item)
        ]
        self._populate_table(unfinished_items)

    def action_toggle_dark(self) -> None:
        """Toggle between dark and light theme."""
        self.theme = (
            "textual-dark" if self.theme == "textual-light" else "textual-light"
        )

    def action_sort(self) -> None:
        """Sort table by title in ascending order."""
        table = self.query_one(DataTable)
        if table.row_count > 0:
            table.sort(self.title_column_key)

    def action_reverse_sort(self) -> None:
        """Sort table by title in descending order."""
        table = self.query_one(DataTable)
        if table.row_count > 0:
            table.sort(self.title_column_key, reverse=True)

    def action_sort_by_progress(self) -> None:
        """Sort table by progress percentage, toggling direction on each press."""
        table = self.query_one(DataTable)
        if table.row_count == 0:
            return
        self.progress_sort_reverse = not self.progress_sort_reverse

        def progress_key(row_values):
            # Cells hold text such as "12.5%"; anything else sorts as 0.
            progress_cell = row_values[self.progress_column_index]
            if isinstance(progress_cell, str):
                try:
                    return float(progress_cell.rstrip("%"))
                except (ValueError, AttributeError):
                    return 0.0
            return 0.0

        table.sort(key=progress_key, reverse=self.progress_sort_reverse)

    def action_show_all(self) -> None:
        """Action handler to show all books."""
        self.show_all()

    def action_show_unfinished(self) -> None:
        """Action handler to show unfinished books."""
        self.show_unfinished()
def authenticate():
    """Authenticate with Audible and return auth and client objects.

    A saved session at ``~/.config/auditui/auth.json`` is reused when it can
    be loaded. Otherwise the user is prompted on the console for e-mail,
    password and marketplace locale, and the new session is saved to that
    file.

    Returns:
        A ``(authenticator, client)`` tuple.

    Raises:
        SystemExit: With status 1 when the prompts are aborted or the login
            fails.
    """
    auth_path = Path.home() / ".config" / "auditui" / "auth.json"
    auth_path.parent.mkdir(parents=True, exist_ok=True)

    if auth_path.exists():
        try:
            authenticator = audible.Authenticator.from_file(str(auth_path))
            audible_client = audible.Client(auth=authenticator)
            return authenticator, audible_client
        except Exception as e:  # any load failure falls back to a new login
            print(f"Failed to load existing auth: {e}")
            print("Please re-authenticate.")

    print("Please authenticate with your Audible account.")
    print("You will need to provide:")
    print(" - Your Audible email/username")
    print(" - Your password")
    print(" - Your marketplace locale (e.g., 'US', 'UK', 'DE', 'FR')")
    try:
        email = input("\nEmail: ").strip()
        password = getpass("Password: ")
        marketplace = input(
            "Marketplace locale (default: US): ").strip().upper() or "US"
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or Ctrl-C at a prompt: exit without a traceback.
        print("\nAuthentication cancelled.")
        sys.exit(1)

    try:
        authenticator = audible.Authenticator.from_login(
            username=email,
            password=password,
            locale=marketplace
        )
        # The file holds account credentials, so restrict it to the owner
        # before anything is written. Best effort: some platforms and
        # filesystems do not support POSIX permission bits.
        try:
            auth_path.touch(mode=0o600, exist_ok=True)
            auth_path.chmod(0o600)
        except OSError:
            pass
        authenticator.to_file(str(auth_path))
        print("Authentication successful!")
        audible_client = audible.Client(auth=authenticator)
        return authenticator, audible_client
    except Exception as e:  # top-level boundary: report and exit cleanly
        # The login can fail with library- or HTTP-specific exception types
        # that a narrow tuple would let through as a raw traceback.
        print(f"Authentication failed: {e}")
        sys.exit(1)
def main():
    """Authenticate on the console, then run the TUI with that session."""
    auth, client = authenticate()
    app = AudituiApp()
    # The app reads these attributes in on_mount to decide whether to fetch.
    app.auth = auth
    app.client = client
    app.run()


if __name__ == "__main__":
    main()