380 lines
12 KiB
Python
380 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""A terminal-based user interface (TUI) client for Audible"""
|
|
|
|
import sys
|
|
from getpass import getpass
|
|
from pathlib import Path
|
|
|
|
import audible
|
|
from textual.app import App, ComposeResult
|
|
from textual.widgets import Footer, Header, DataTable, Static
|
|
from textual.worker import get_current_worker
|
|
from textual import work
|
|
|
|
|
|
class AudituiApp(App):
|
|
"""Main application class for the Audible TUI app."""
|
|
BINDINGS = [
|
|
("d", "toggle_dark", "Toggle dark mode"),
|
|
("s", "sort", "Sort by title"),
|
|
("r", "reverse_sort", "Reverse sort"),
|
|
("p", "sort_by_progress", "Sort by progress"),
|
|
("a", "show_all", "Show all books"),
|
|
("u", "show_unfinished", "Show unfinished"),
|
|
("q", "quit", "Quit application"),
|
|
]
|
|
|
|
CSS = """
|
|
DataTable {
|
|
height: 1fr;
|
|
}
|
|
Static {
|
|
height: 1;
|
|
text-align: center;
|
|
background: $primary;
|
|
}
|
|
"""
|
|
|
|
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.auth = None
|
|
self.client = None
|
|
self.all_items = []
|
|
self.current_items = []
|
|
self.show_all_mode = False
|
|
self.progress_sort_reverse = False
|
|
self.title_column_key = None
|
|
self.progress_column_key = None
|
|
self.progress_column_index = 3
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Static("Loading...", id="status")
|
|
table = DataTable()
|
|
table.zebra_stripes = True
|
|
table.cursor_type = "row"
|
|
yield table
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
"""Initialize the table and start fetching library data."""
|
|
table = self.query_one(DataTable)
|
|
table.add_columns("Title", "Author", "Length", "Progress")
|
|
column_keys = list(table.columns.keys())
|
|
self.title_column_key = column_keys[0]
|
|
self.progress_column_key = column_keys[3]
|
|
if self.client:
|
|
self.update_status("Fetching library...")
|
|
self.fetch_library()
|
|
else:
|
|
self.update_status(
|
|
"Not authenticated. Please restart and authenticate.")
|
|
|
|
@work(exclusive=True, thread=True)
|
|
def fetch_library(self) -> None:
|
|
"""Fetch all library items from Audible API in background thread."""
|
|
worker = get_current_worker()
|
|
if worker.is_cancelled:
|
|
return
|
|
|
|
try:
|
|
response_groups = (
|
|
"contributors,media,product_attrs,product_desc,product_details,"
|
|
"rating,is_finished,listening_status,percent_complete"
|
|
)
|
|
all_items = self._fetch_all_pages(response_groups)
|
|
self.all_items = all_items
|
|
self.call_from_thread(self.on_library_loaded, all_items)
|
|
except (OSError, ValueError, KeyError) as e:
|
|
self.call_from_thread(self.on_library_error, str(e))
|
|
|
|
def _fetch_all_pages(self, response_groups):
|
|
"""Fetch all pages of library items from the API."""
|
|
all_items = []
|
|
page = 1
|
|
page_size = 50
|
|
|
|
while True:
|
|
library = self.client.get(
|
|
path="library",
|
|
num_results=page_size,
|
|
page=page,
|
|
response_groups=response_groups
|
|
)
|
|
|
|
items = library.get("items", [])
|
|
|
|
if not items:
|
|
break
|
|
|
|
all_items.extend(items)
|
|
self.call_from_thread(
|
|
self.update_status,
|
|
f"Fetched page {page} ({len(items)} items)..."
|
|
)
|
|
|
|
if len(items) < page_size:
|
|
break
|
|
|
|
page += 1
|
|
|
|
return all_items
|
|
|
|
def on_library_loaded(self, items) -> None:
|
|
"""Handle successful library load."""
|
|
self.all_items = items
|
|
self.update_status(f"Loaded {len(items)} books")
|
|
self.show_unfinished()
|
|
|
|
def on_library_error(self, error: str) -> None:
|
|
"""Handle library fetch error."""
|
|
self.update_status(f"Error fetching library: {error}")
|
|
|
|
def update_status(self, message: str) -> None:
|
|
"""Update the status message in the UI."""
|
|
status = self.query_one("#status", Static)
|
|
status.update(message)
|
|
|
|
def format_duration(self, value, unit='minutes', default_none=None):
|
|
"""Format duration value into human-readable string."""
|
|
if value is None or value <= 0:
|
|
return default_none
|
|
|
|
if unit == 'seconds':
|
|
total_minutes = int(value) // 60
|
|
else:
|
|
total_minutes = int(value)
|
|
|
|
if total_minutes < 60:
|
|
return f"{total_minutes} minute{'s' if total_minutes != 1 else ''}"
|
|
|
|
hours = total_minutes // 60
|
|
mins = total_minutes % 60
|
|
if mins == 0:
|
|
return f"{hours} hour{'s' if hours != 1 else ''}"
|
|
return f"{hours} hour{'s' if hours != 1 else ''} {mins} minute{'s' if mins != 1 else ''}"
|
|
|
|
def _extract_title(self, item):
|
|
"""Extract title from library item."""
|
|
product = item.get("product", {})
|
|
return (product.get("title") or
|
|
item.get("title") or
|
|
product.get("asin", "Unknown Title"))
|
|
|
|
def _extract_authors(self, item):
|
|
"""Extract author names from library item."""
|
|
product = item.get("product", {})
|
|
authors = product.get("authors") or product.get("contributors") or []
|
|
if not authors and "authors" in item:
|
|
authors = item.get("authors", [])
|
|
return ", ".join([a.get("name", "") for a in authors if isinstance(a, dict)])
|
|
|
|
def _extract_runtime_minutes(self, item):
|
|
"""Extract runtime in minutes from library item."""
|
|
product = item.get("product", {})
|
|
runtime_fields = [
|
|
"runtime_length_min",
|
|
"runtime_length",
|
|
"vLength",
|
|
"length",
|
|
"duration"
|
|
]
|
|
|
|
runtime = None
|
|
for field in runtime_fields:
|
|
runtime = product.get(field)
|
|
if runtime is None:
|
|
runtime = item.get(field)
|
|
if runtime is not None:
|
|
break
|
|
|
|
if runtime is None:
|
|
return None
|
|
|
|
if isinstance(runtime, dict):
|
|
if "min" in runtime:
|
|
return int(runtime.get("min", 0))
|
|
elif isinstance(runtime, (int, float)):
|
|
return int(runtime)
|
|
|
|
return None
|
|
|
|
def _extract_progress_info(self, item):
|
|
"""Extract progress percentage from library item."""
|
|
percent_complete = item.get("percent_complete")
|
|
listening_status = item.get("listening_status", {})
|
|
|
|
if isinstance(listening_status, dict):
|
|
if percent_complete is None:
|
|
percent_complete = listening_status.get("percent_complete")
|
|
else:
|
|
percent_complete = None
|
|
|
|
return percent_complete
|
|
|
|
def _populate_table(self, items):
|
|
"""Populate the DataTable with library items."""
|
|
table = self.query_one(DataTable)
|
|
table.clear()
|
|
|
|
if not items:
|
|
self.update_status("No books found.")
|
|
return
|
|
|
|
for item in items:
|
|
title = self._extract_title(item)
|
|
author_names = self._extract_authors(item)
|
|
minutes = self._extract_runtime_minutes(item)
|
|
runtime_str = self.format_duration(
|
|
minutes, unit='minutes', default_none="Unknown length")
|
|
percent_complete = self._extract_progress_info(item)
|
|
|
|
progress_str = "0%"
|
|
if percent_complete is not None and percent_complete > 0:
|
|
progress_str = f"{percent_complete:.1f}%"
|
|
|
|
table.add_row(
|
|
title,
|
|
author_names or "Unknown",
|
|
runtime_str or "Unknown",
|
|
progress_str,
|
|
key=title
|
|
)
|
|
|
|
self.current_items = items
|
|
mode = "all" if self.show_all_mode else "unfinished"
|
|
self.update_status(f"Showing {len(items)} books ({mode})")
|
|
|
|
def show_all(self) -> None:
|
|
"""Display all books in the table."""
|
|
if not self.all_items:
|
|
return
|
|
self.show_all_mode = True
|
|
self._populate_table(self.all_items)
|
|
|
|
def show_unfinished(self) -> None:
|
|
"""Display only unfinished books in the table."""
|
|
if not self.all_items:
|
|
return
|
|
self.show_all_mode = False
|
|
|
|
unfinished_items = []
|
|
for item in self.all_items:
|
|
is_finished_flag = item.get("is_finished")
|
|
percent_complete = item.get("percent_complete")
|
|
listening_status = item.get("listening_status")
|
|
|
|
if isinstance(listening_status, dict):
|
|
is_finished_flag = is_finished_flag or listening_status.get(
|
|
"is_finished", False)
|
|
if percent_complete is None:
|
|
percent_complete = listening_status.get(
|
|
"percent_complete", 0)
|
|
|
|
is_finished = False
|
|
if is_finished_flag is True:
|
|
is_finished = True
|
|
elif isinstance(percent_complete, (int, float)) and percent_complete >= 100:
|
|
is_finished = True
|
|
|
|
if not is_finished:
|
|
unfinished_items.append(item)
|
|
|
|
self._populate_table(unfinished_items)
|
|
|
|
def action_toggle_dark(self) -> None:
|
|
"""Toggle between dark and light theme."""
|
|
self.theme = (
|
|
"textual-dark" if self.theme == "textual-light" else "textual-light"
|
|
)
|
|
|
|
def action_sort(self) -> None:
|
|
"""Sort table by title in ascending order."""
|
|
table = self.query_one(DataTable)
|
|
if table.row_count > 0:
|
|
table.sort(self.title_column_key)
|
|
|
|
def action_reverse_sort(self) -> None:
|
|
"""Sort table by title in descending order."""
|
|
table = self.query_one(DataTable)
|
|
if table.row_count > 0:
|
|
table.sort(self.title_column_key, reverse=True)
|
|
|
|
def action_sort_by_progress(self) -> None:
|
|
"""Sort table by progress percentage, toggling direction on each press."""
|
|
table = self.query_one(DataTable)
|
|
if table.row_count > 0:
|
|
self.progress_sort_reverse = not self.progress_sort_reverse
|
|
|
|
def progress_key(row_values):
|
|
progress_cell = row_values[self.progress_column_index]
|
|
if isinstance(progress_cell, str):
|
|
try:
|
|
return float(progress_cell.rstrip("%"))
|
|
except (ValueError, AttributeError):
|
|
return 0.0
|
|
return 0.0
|
|
|
|
table.sort(key=progress_key, reverse=self.progress_sort_reverse)
|
|
|
|
def action_show_all(self) -> None:
|
|
"""Action handler to show all books."""
|
|
self.show_all()
|
|
|
|
def action_show_unfinished(self) -> None:
|
|
"""Action handler to show unfinished books."""
|
|
self.show_unfinished()
|
|
|
|
|
|
def authenticate():
|
|
"""Authenticate with Audible and return auth and client objects."""
|
|
auth_path = Path.home() / ".config" / "auditui" / "auth.json"
|
|
auth_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
authenticator = None
|
|
if auth_path.exists():
|
|
try:
|
|
authenticator = audible.Authenticator.from_file(str(auth_path))
|
|
audible_client = audible.Client(auth=authenticator)
|
|
return authenticator, audible_client
|
|
except (OSError, ValueError, KeyError) as e:
|
|
print(f"Failed to load existing auth: {e}")
|
|
print("Please re-authenticate.")
|
|
|
|
print("Please authenticate with your Audible account.")
|
|
print("You will need to provide:")
|
|
print(" - Your Audible email/username")
|
|
print(" - Your password")
|
|
print(" - Your marketplace locale (e.g., 'US', 'UK', 'DE', 'FR')")
|
|
|
|
email = input("\nEmail: ")
|
|
password = getpass("Password: ")
|
|
marketplace = input(
|
|
"Marketplace locale (default: US): ").strip().upper() or "US"
|
|
|
|
try:
|
|
authenticator = audible.Authenticator.from_login(
|
|
username=email,
|
|
password=password,
|
|
locale=marketplace
|
|
)
|
|
|
|
auth_path.parent.mkdir(parents=True, exist_ok=True)
|
|
authenticator.to_file(str(auth_path))
|
|
print("Authentication successful!")
|
|
audible_client = audible.Client(auth=authenticator)
|
|
return authenticator, audible_client
|
|
except (OSError, ValueError, KeyError) as e:
|
|
print(f"Authentication failed: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
auth, client = authenticate()
|
|
app = AudituiApp()
|
|
app.auth = auth
|
|
app.client = client
|
|
app.run()
|