Files
auditui/auditui/ui.py

573 lines
20 KiB
Python

"""UI components for the Auditui application."""
import json
from datetime import date, datetime
from typing import Any, Callable, Protocol, TYPE_CHECKING, cast
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.screen import ModalScreen
from textual.timer import Timer
from textual.widgets import Input, Label, ListItem, ListView, Static
from .constants import AUTH_PATH, CONFIG_PATH
if TYPE_CHECKING:
from textual.binding import Binding
class _AppContext(Protocol):
    """Structural type for the application attributes the screens read.

    It is only used as the target of ``cast`` so that screen code gets a
    typed view of ``self.app``; nothing instantiates it.
    """

    # Key bindings declared on the app. Entries may be plain tuples or
    # ``Binding`` objects; the help screen's binding parser accepts both.
    BINDINGS: list["Binding | tuple[str, str, str]"]
    # API client used for stats/account requests; screens treat a falsy
    # value as "not authenticated".
    client: Any
    # Authenticator object; screens probe it with ``getattr`` for optional
    # fields such as ``locale`` or ``customer_info``.
    auth: Any
    # Library helper; screens call ``is_finished(item)`` on it.
    library_client: Any
    # Library items currently loaded by the app.
    all_items: list[dict]
# Maps raw key names to the text shown in the help screen. The "ctrl+"
# entry is a modifier prefix; the others replace a whole key name.
# The arrow keys must map to visible glyphs: an empty string would render
# the binding with a blank key column.
KEY_DISPLAY_MAP = {
    "ctrl+": "^",
    "left": "←",
    "right": "→",
    "up": "↑",
    "down": "↓",
    "space": "Space",
    "enter": "Enter",
}

# Hex colours interpolated into the Rich markup of help/stat rows:
# KEY_COLOR styles the key/label column, DESC_COLOR the description/value.
KEY_COLOR = "#f9e2af"
DESC_COLOR = "#cdd6f4"
class AppContextMixin(ModalScreen):
    """Give modal screens a statically typed view of the running app."""

    def _app(self) -> _AppContext:
        """Return ``self.app`` cast to the ``_AppContext`` protocol.

        The cast is for type checkers only; the object returned is the very
        same application instance.
        """
        typed_app = cast(_AppContext, self.app)
        return typed_app
class HelpScreen(AppContextMixin, ModalScreen):
    """Help screen displaying all available keybindings."""

    BINDINGS = [("escape", "dismiss", "Close"), ("?", "dismiss", "Close")]

    @staticmethod
    def _format_key_display(key: str) -> str:
        """Format a key string for display with symbols.

        The key is split on "," (alternative keys) and "+" (modifier
        combinations) and every whole token is looked up in
        ``KEY_DISPLAY_MAP``. Matching whole tokens instead of substrings
        keeps names such as "backspace" or "pageup" intact.

        Args:
            key: Raw key specification, e.g. ``"ctrl+left"`` or ``"j,down"``.

        Returns:
            The key with known tokens replaced by their display form.
        """
        # Map entries whose name ends in "+" (e.g. "ctrl+") describe modifier
        # prefixes: the replacement swallows the "+" separator as well.
        modifiers = {
            name[:-1]: symbol
            for name, symbol in KEY_DISPLAY_MAP.items()
            if name.endswith("+")
        }

        def format_combo(combo: str) -> str:
            *prefixes, last = combo.split("+")
            shown = [
                modifiers[part]
                if part in modifiers
                else f"{KEY_DISPLAY_MAP.get(part, part)}+"
                for part in prefixes
            ]
            shown.append(KEY_DISPLAY_MAP.get(last, last))
            return "".join(shown)

        return ",".join(format_combo(combo) for combo in key.split(","))

    @staticmethod
    def _parse_binding(binding: "Binding | tuple[str, str, str]") -> tuple[str, str]:
        """Extract key and description from a binding.

        Args:
            binding: Either a ``Binding`` object or a tuple whose first item
                is the key and whose third item, when present, is the
                description.

        Returns:
            A ``(key, description)`` pair; the description is empty for
            tuples that do not carry one.
        """
        if isinstance(binding, tuple):
            # Two-element (key, action) tuples have no description slot.
            description = binding[2] if len(binding) > 2 else ""
            return binding[0], description
        return binding.key, binding.description

    def _make_item(self, binding: "Binding | tuple[str, str, str]") -> ListItem:
        """Create a ListItem for a single binding."""
        key, description = self._parse_binding(binding)
        key_display = self._format_key_display(key)
        # Key is right-aligned in a 12-character column ahead of the description.
        text = f"[bold {KEY_COLOR}]{key_display:>12}[/] [{DESC_COLOR}]{description}[/]"
        return ListItem(Label(text))

    def compose(self) -> ComposeResult:
        """Lay out the app's bindings in two side-by-side lists."""
        app = self._app()
        bindings = list(app.BINDINGS)
        # Round up so the left column takes the extra item on odd counts.
        mid = (len(bindings) + 1) // 2
        with Container(id="help_container"):
            yield Static("Key Bindings", id="help_title")
            with Horizontal(id="help_content"):
                yield ListView(
                    *[self._make_item(b) for b in bindings[:mid]],
                    classes="help_list",
                )
                yield ListView(
                    *[self._make_item(b) for b in bindings[mid:]],
                    classes="help_list",
                )
            yield Static(
                f"Press [bold {KEY_COLOR}]?[/] or [bold {KEY_COLOR}]Escape[/] to close",
                id="help_footer",
            )

    async def action_dismiss(self, result: Any | None = None) -> None:
        """Close the screen, forwarding *result* to ``dismiss``."""
        await self.dismiss(result)
class StatsScreen(AppContextMixin, ModalScreen):
    """Stats screen displaying listening statistics."""

    BINDINGS = [("escape", "dismiss", "Close"), ("s", "dismiss", "Close")]

    # Account payload fetched during the current stats build. It lets the
    # email lookup and the subscription lookup share one round of API
    # requests; ``_build_stats_items`` resets it before every build.
    _account_info_cache: dict | None = None

    def _format_time(self, milliseconds: int) -> str:
        """Format milliseconds as hours and minutes.

        Returns ``"<h>h<mm>"`` when at least one full hour is present and
        ``"<m>m"`` otherwise; seconds are dropped.
        """
        total_seconds = int(milliseconds) // 1000
        hours, remainder = divmod(total_seconds, 3600)
        minutes = remainder // 60
        if hours > 0:
            return f"{hours}h{minutes:02d}"
        return f"{minutes}m"

    def _format_date(self, date_str: str | None) -> str:
        """Format ISO date string for display.

        Returns ``"Unknown"`` for empty input, ``YYYY-MM-DD`` for a parsable
        ISO timestamp, and the input rendered as text when it cannot be
        parsed.
        """
        if not date_str:
            return "Unknown"
        try:
            # A trailing "Z" is rewritten as an explicit UTC offset before parsing.
            dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            # Not an ISO string (or not a string at all): show it verbatim.
            return str(date_str)
        return dt.strftime("%Y-%m-%d")

    def _year_has_activity(self, year: int) -> bool:
        """Return True when the stats API reports listening during *year*.

        Exceptions raised by the client propagate; callers decide how a
        failed request should be interpreted.
        """
        stats = self._app().client.get(
            "1.0/stats/aggregates",
            monthly_listening_interval_duration="12",
            monthly_listening_interval_start_date=f"{year}-01",
            store="Audible",
        )
        return self._has_activity(stats)

    def _get_signup_year(self) -> int:
        """Get signup year using binary search on listening activity.

        Returns 0 when there is no client, when the current year shows no
        activity, or when the initial request fails. Otherwise returns the
        earliest year found by bisecting the range 1995..current year.
        """
        app = self._app()
        if not app.client:
            return 0
        current_year = date.today().year
        try:
            if not self._year_has_activity(current_year):
                return 0
        except Exception:
            return 0
        left, right = 1995, current_year
        earliest_year = current_year
        while left <= right:
            middle = (left + right) // 2
            try:
                has_activity = self._year_has_activity(middle)
            except Exception:
                # A failed probe is treated like a year without activity.
                has_activity = False
            if has_activity:
                earliest_year = middle
                right = middle - 1
            else:
                left = middle + 1
        return earliest_year

    @staticmethod
    def _has_activity(stats: dict) -> bool:
        """Check if stats contain any listening activity.

        A month counts as active when its ``aggregated_sum`` is greater than
        zero; missing or null sums count as zero.
        """
        monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
        return any(
            (s.get("aggregated_sum") or 0) > 0 for s in monthly_stats or []
        )

    def _get_listening_time(self, duration: int, start_date: str) -> int:
        """Get listening time in milliseconds for a given period.

        Args:
            duration: Interval length passed to the API as
                ``monthly_listening_interval_duration``.
            start_date: Interval start passed to the API as
                ``monthly_listening_interval_start_date``.

        Returns:
            The sum of the monthly ``aggregated_sum`` values, or 0 when there
            is no client or the request/parsing fails.
        """
        app = self._app()
        if not app.client:
            return 0
        try:
            stats = app.client.get(
                "1.0/stats/aggregates",
                monthly_listening_interval_duration=str(duration),
                monthly_listening_interval_start_date=start_date,
                store="Audible",
            )
            monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
            # A null sum for one month must not discard the other months.
            return sum((s.get("aggregated_sum") or 0) for s in monthly_stats)
        except Exception:
            return 0

    def _get_finished_books_count(self) -> int:
        """Get count of finished books from library."""
        app = self._app()
        if not app.library_client or not app.all_items:
            return 0
        return sum(
            1 for item in app.all_items if app.library_client.is_finished(item)
        )

    def _get_account_info(self) -> dict:
        """Get account information including subscription details.

        Queries three account endpoints and merges their dict responses into
        one dict. Endpoints that fail are skipped. The merged result is
        remembered on the instance so that repeated calls during one stats
        build do not repeat the requests.

        Returns:
            The merged payload, or an empty dict when there is no client.
        """
        app = self._app()
        if not app.client:
            return {}
        if self._account_info_cache is not None:
            return self._account_info_cache
        account_info: dict = {}
        endpoints = [
            (
                "1.0/account/information",
                "subscription_details,plan_summary,subscription_details_payment_instrument,delinquency_status,customer_benefits,customer_segments,directed_ids",
            ),
            (
                "1.0/customer/information",
                "subscription_details_premium,subscription_details_rodizio,customer_segment,subscription_details_channels,migration_details",
            ),
            (
                "1.0/customer/status",
                "benefits_status,member_giving_status,prime_benefits_status,prospect_benefits_status",
            ),
        ]
        for endpoint, response_groups in endpoints:
            try:
                response = app.client.get(
                    endpoint, response_groups=response_groups)
            except Exception:
                # Best effort: a failing endpoint simply contributes nothing.
                continue
            if isinstance(response, dict):
                account_info.update(response)
        self._account_info_cache = account_info
        return account_info

    def _get_email(self) -> str:
        """Get email from auth, config, or API.

        Tries the dedicated getters in order, then falls back to scanning
        the authenticator data and the account payload for any email-like
        string. Returns ``"Unknown"`` when nothing is found.
        """
        app = self._app()
        for getter in (
            self._get_email_from_auth,
            self._get_email_from_config,
            self._get_email_from_auth_file,
            self._get_email_from_account_info,
        ):
            email = getter(app)
            if email:
                return email
        auth_data: dict[str, Any] | None = None
        if app.auth:
            try:
                auth_data = getattr(app.auth, "data", None)
            except Exception:
                auth_data = None
        account_info = self._get_account_info() if app.client else None
        for candidate in (auth_data, account_info):
            email = self._find_email_in_data(candidate)
            if email:
                return email
        return "Unknown"

    def _get_email_from_auth(self, app: _AppContext) -> str | None:
        """Extract email from the authenticator if available.

        Looks at direct attributes first, then at a ``customer_info`` dict,
        then at a ``data`` dict. Any exception ends the lookup with None.
        """
        if not app.auth:
            return None
        try:
            email = self._first_email(
                getattr(app.auth, "username", None),
                getattr(app.auth, "login", None),
                getattr(app.auth, "email", None),
            )
            if email:
                return email
        except Exception:
            return None
        try:
            customer_info = getattr(app.auth, "customer_info", None)
            if isinstance(customer_info, dict):
                email = self._first_email(
                    customer_info.get("email"),
                    customer_info.get("email_address"),
                    customer_info.get("primary_email"),
                )
                if email:
                    return email
        except Exception:
            return None
        try:
            data = getattr(app.auth, "data", None)
            if isinstance(data, dict):
                return self._first_email(
                    data.get("username"),
                    data.get("email"),
                    data.get("login"),
                    data.get("user_email"),
                )
        except Exception:
            return None
        return None

    def _get_email_from_config(self, app: _AppContext) -> str | None:
        """Extract email from the config file.

        Returns None when the file is missing, unreadable, or holds no
        usable value. *app* is unused; it keeps the getter signatures uniform.
        """
        try:
            if CONFIG_PATH.exists():
                with open(CONFIG_PATH, "r", encoding="utf-8") as f:
                    config = json.load(f)
                return self._first_email(
                    config.get("email"),
                    config.get("username"),
                    config.get("login"),
                )
        except Exception:
            return None
        return None

    def _get_email_from_auth_file(self, app: _AppContext) -> str | None:
        """Extract email from the auth file.

        Returns None when the file is missing, unreadable, or holds no
        usable value. *app* is unused; it keeps the getter signatures uniform.
        """
        try:
            if AUTH_PATH.exists():
                with open(AUTH_PATH, "r", encoding="utf-8") as f:
                    auth_file_data = json.load(f)
                return self._first_email(
                    auth_file_data.get("username"),
                    auth_file_data.get("email"),
                    auth_file_data.get("login"),
                    auth_file_data.get("user_email"),
                )
        except Exception:
            return None
        return None

    def _get_email_from_account_info(self, app: _AppContext) -> str | None:
        """Extract email from the account info API.

        Checks top-level keys of the account payload first, then a nested
        ``customer_info`` dict.
        """
        if not app.client:
            return None
        try:
            account_info = self._get_account_info()
            if account_info:
                email = self._first_email(
                    account_info.get("email"),
                    account_info.get("customer_email"),
                    account_info.get("username"),
                )
                if email:
                    return email
                customer_info = account_info.get("customer_info", {})
                if isinstance(customer_info, dict):
                    return self._first_email(
                        customer_info.get("email"),
                        customer_info.get("email_address"),
                        customer_info.get("primary_email"),
                    )
        except Exception:
            return None
        return None

    def _first_email(self, *values: str | None) -> str | None:
        """Return the first non-empty, non-Unknown email value.

        Non-string candidates are ignored so that the result can always be
        rendered with a string format spec.
        """
        for value in values:
            if isinstance(value, str) and value and value != "Unknown":
                return value
        return None

    def _find_email_in_data(self, data: Any) -> str | None:
        """Search nested data for an email-like value.

        Walks dict values and list items iteratively and returns the first
        string that has a non-empty part before "@" and a "." after it.
        """
        if data is None:
            return None
        stack: list[Any] = [data]
        while stack:
            current = stack.pop()
            if isinstance(current, dict):
                stack.extend(current.values())
            elif isinstance(current, list):
                stack.extend(current)
            elif isinstance(current, str):
                if "@" in current:
                    local, _, domain = current.partition("@")
                    if local and "." in domain:
                        return current
        return None

    def _get_subscription_details(self, account_info: dict) -> dict:
        """Extract subscription details from nested API response.

        Tries several known key paths. A path matches only when every key
        could be followed and the value at its end is a non-empty list whose
        first element is a dict; that element is returned.

        Returns:
            The first subscription entry found, or an empty dict.
        """
        paths = [
            ["customer_details", "subscription", "subscription_details"],
            ["customer", "customer_details", "subscription", "subscription_details"],
            ["subscription_details"],
            ["subscription", "subscription_details"],
        ]
        for path in paths:
            data: Any = account_info
            for key in path:
                if not isinstance(data, dict):
                    break
                data = data.get(key)
            else:
                # Only reached when the whole path was walked, so a list met
                # part-way down a path is never mistaken for the details list.
                if isinstance(data, list) and data and isinstance(data[0], dict):
                    return data[0]
        return {}

    def _get_country(self) -> str:
        """Get country from authenticator locale.

        Prefers ``locale.country_code``, then ``locale.domain``; a plain
        string locale yields the part after its last underscore. All results
        from those branches are upper-cased. Returns ``"Unknown"`` when no
        locale is available or reading it fails.
        """
        app = self._app()
        if not app.auth:
            return "Unknown"
        try:
            locale_obj = getattr(app.auth, "locale", None)
            if not locale_obj:
                return "Unknown"
            if hasattr(locale_obj, "country_code"):
                return locale_obj.country_code.upper()
            if hasattr(locale_obj, "domain"):
                return locale_obj.domain.upper()
            if isinstance(locale_obj, str):
                # Without an underscore the split yields the whole string.
                return locale_obj.split("_")[-1].upper()
            return str(locale_obj)
        except Exception:
            return "Unknown"

    def _make_stat_item(self, label: str, value: str) -> ListItem:
        """Create a ListItem for a stat."""
        # Label right-aligned to 16 columns, value left-aligned to 25.
        text = f"[bold {KEY_COLOR}]{label:>16}[/] [{DESC_COLOR}]{value:<25}[/]"
        return ListItem(Label(text))

    def compose(self) -> ComposeResult:
        """Render the statistics list, or a notice when not authenticated."""
        app = self._app()
        if not app.client:
            with Container(id="help_container"):
                yield Static("Statistics", id="help_title")
                yield Static(
                    "Not authenticated. Please restart and authenticate.",
                    classes="help_row",
                )
                yield Static(
                    f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close",
                    id="help_footer",
                )
            return
        today = date.today()
        stats_items = self._build_stats_items(today)
        with Container(id="help_container"):
            yield Static("Statistics", id="help_title")
            with Vertical(id="help_content"):
                yield ListView(
                    *[self._make_stat_item(label, value)
                      for label, value in stats_items],
                    classes="help_list",
                )
            yield Static(
                f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close",
                id="help_footer",
            )

    def _build_stats_items(self, today: date) -> list[tuple[str, str]]:
        """Build the list of stats items to display.

        Args:
            today: Reference date used for the "this month" and "this year"
                listening totals.

        Returns:
            ``(label, value)`` pairs in display order. Rows whose value could
            not be determined are omitted, except for the country and signup
            year, which show ``"Unknown"``.
        """
        # Start every build from fresh account data; within the build the
        # payload is fetched at most once.
        self._account_info_cache = None
        signup_year = self._get_signup_year()
        month_time = self._get_listening_time(1, today.strftime("%Y-%m"))
        year_time = self._get_listening_time(12, today.strftime("%Y-01"))
        finished_count = self._get_finished_books_count()
        app = self._app()
        total_books = len(app.all_items) if app.all_items else 0
        email = self._get_email()
        country = self._get_country()
        subscription_name = "Unknown"
        subscription_price = "Unknown"
        next_bill_date = "Unknown"
        account_info = self._get_account_info()
        if account_info:
            subscription_data = self._get_subscription_details(account_info)
            if subscription_data:
                if name := subscription_data.get("name"):
                    subscription_name = str(name)
                if bill_date := subscription_data.get("next_bill_date"):
                    next_bill_date = self._format_date(bill_date)
                bill_amount = subscription_data.get("next_bill_amount")
                if isinstance(bill_amount, dict):
                    amount = bill_amount.get("currency_value")
                    currency = bill_amount.get("currency_code", "EUR")
                    if amount is not None:
                        subscription_price = f"{amount} {currency}"
        stats_items: list[tuple[str, str]] = []
        if email != "Unknown":
            stats_items.append(("Email", email))
        stats_items.append(("Country Store", country))
        stats_items.append(("Signup Year", str(signup_year)
                            if signup_year > 0 else "Unknown"))
        if next_bill_date != "Unknown":
            # Both rows are driven by the same next-bill date.
            stats_items.append(("Next Credit", next_bill_date))
            stats_items.append(("Next Bill", next_bill_date))
        if subscription_name != "Unknown":
            stats_items.append(("Subscription", subscription_name))
        if subscription_price != "Unknown":
            stats_items.append(("Price", subscription_price))
        stats_items.append(("This Month", self._format_time(month_time)))
        stats_items.append(("This Year", self._format_time(year_time)))
        stats_items.append(
            ("Books Finished", f"{finished_count} / {total_books}"))
        return stats_items

    async def action_dismiss(self, result: Any | None = None) -> None:
        """Close the screen, forwarding *result* to ``dismiss``."""
        await self.dismiss(result)
class FilterScreen(ModalScreen[str]):
    """Modal prompt that collects a library filter string.

    The screen is dismissed with the submitted text, or with an empty string
    when cancelled. While the user types, an optional callback receives the
    current text after a debounce delay.
    """

    BINDINGS = [("escape", "cancel", "Cancel")]

    def __init__(
        self,
        initial_filter: str = "",
        on_change: Callable[[str], None] | None = None,
        debounce_seconds: float = 0.2,
    ) -> None:
        """Store the starting text, the live-change callback and its delay."""
        super().__init__()
        self._debounce_timer: Timer | None = None
        self._debounce_seconds = debounce_seconds
        self._on_change = on_change
        self._initial_filter = initial_filter

    def compose(self) -> ComposeResult:
        """Build the title, the text input and the hint footer."""
        footer_text = (
            f"Press [bold {KEY_COLOR}]Enter[/] to apply, "
            f"[bold {KEY_COLOR}]Escape[/] to clear"
        )
        with Container(id="filter_container"):
            yield Static("Filter Library", id="filter_title")
            yield Input(
                value=self._initial_filter,
                placeholder="Type to filter by title or author...",
                id="filter_input",
            )
            yield Static(footer_text, id="filter_footer")

    def on_mount(self) -> None:
        """Put keyboard focus on the filter input."""
        filter_input = self.query_one("#filter_input", Input)
        filter_input.focus()

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Dismiss the screen with the submitted text."""
        self.dismiss(event.value)

    def on_input_changed(self, event: Input.Changed) -> None:
        """Schedule the change callback, replacing any pending call."""
        if self._on_change:
            pending = self._debounce_timer
            if pending:
                pending.stop()
            typed_text = event.value

            def notify() -> None:
                # Runs once the debounce delay has elapsed.
                self._on_change(typed_text)

            self._debounce_timer = self.set_timer(
                self._debounce_seconds, notify)

    def action_cancel(self) -> None:
        """Dismiss the screen with an empty filter."""
        self.dismiss("")

    def on_unmount(self) -> None:
        """Stop a still-pending debounce timer when the screen goes away."""
        pending = self._debounce_timer
        if pending:
            pending.stop()