Compare commits

...

5 Commits

5 changed files with 337 additions and 13 deletions

View File

@@ -42,6 +42,7 @@ Please also note that as of now, you need to have [ffmpeg](https://ffmpeg.org/)
| `down` | Decrease playback speed |
| `f` | Mark as finished/unfinished |
| `d` | Download/delete from cache |
| `s` | Show stats screen |
| `q` | Quit the application |
## Roadmap
@@ -60,7 +61,7 @@ Please also note that as of now, you need to have [ffmpeg](https://ffmpeg.org/)
- [x] increase/decrease reading speed
- [x] mark a book as finished or unfinished
- [x] make ui responsive
- [ ] get your stats in a separated pane
- [x] get your stats in a separated pane
- [ ] filter books on views
- [ ] search in your book library
- [ ] search the marketplace for books

View File

@@ -27,7 +27,7 @@ from .table_utils import (
filter_unfinished_items,
format_item_as_row,
)
from .ui import HelpScreen
from .ui import HelpScreen, StatsScreen
if TYPE_CHECKING:
from textual.widgets._data_table import ColumnKey
@@ -41,6 +41,7 @@ class Auditui(App):
BINDINGS = [
("?", "show_help", "Help"),
("s", "show_stats", "Stats"),
("n", "sort", "Sort by name"),
("p", "sort_by_progress", "Sort by progress"),
("a", "show_all", "All/Unfinished"),
@@ -417,6 +418,10 @@ class Auditui(App):
"""Show the help screen with all keybindings."""
self.push_screen(HelpScreen())
def action_show_stats(self) -> None:
"""Show the stats screen with listening statistics."""
self.push_screen(StatsScreen())
def _check_playback_status(self) -> None:
"""Check if playback process has finished and update state accordingly."""
message = self.playback.check_status()

View File

@@ -1,11 +1,12 @@
"""Configuration helpers for the Auditui app."""
import json
from getpass import getpass
from pathlib import Path
import audible
from .constants import AUTH_PATH
from .constants import AUTH_PATH, CONFIG_PATH
def configure(
@@ -33,6 +34,11 @@ def configure(
auth_path.parent.mkdir(parents=True, exist_ok=True)
authenticator.to_file(str(auth_path))
config = {"email": email}
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(config, f)
print("Authentication successful!")
audible_client = audible.Client(auth=authenticator)
return authenticator, audible_client

View File

@@ -3,6 +3,7 @@
from pathlib import Path
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
CONFIG_PATH = Path.home() / ".config" / "auditui" / "config.json"
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
DEFAULT_CODEC = "LC_128_44100_stereo"
@@ -132,6 +133,45 @@ HelpScreen Static {
background: transparent;
}
StatsScreen {
align: center middle;
background: rgba(0, 0, 0, 0.7);
}
StatsScreen Static {
background: transparent;
}
StatsScreen #help_container {
width: auto;
min-width: 55;
max-width: 70;
}
StatsScreen #help_content {
align: center middle;
width: 100%;
}
StatsScreen .help_list {
width: 100%;
}
StatsScreen .help_list > ListItem {
background: transparent;
height: 1;
}
StatsScreen .help_list > ListItem:hover {
background: #232842;
}
StatsScreen .help_list > ListItem > Label {
width: 100%;
text-align: left;
padding-left: 2;
}
#help_container {
width: 88%;
max-width: 120;

View File

@@ -1,10 +1,15 @@
"""UI components for the Auditui application."""
import json
from datetime import date, datetime
from textual.app import ComposeResult
from textual.containers import Container, Horizontal
from textual.containers import Container, Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Label, ListItem, ListView, Static
from .constants import CONFIG_PATH
KEY_DISPLAY_MAP = {
"ctrl+": "^",
@@ -37,11 +42,8 @@ class HelpScreen(ModalScreen):
def _parse_binding(binding: tuple | object) -> tuple[str, str]:
"""Extract key and description from a binding."""
if isinstance(binding, tuple):
key, _, description = binding
else:
key = binding.key
description = binding.description
return key, description
return binding[0], binding[2]
return binding.key, binding.description
def _make_item(self, binding: tuple | object) -> ListItem:
"""Create a ListItem for a single binding."""
@@ -53,18 +55,16 @@ class HelpScreen(ModalScreen):
def compose(self) -> ComposeResult:
bindings = list(self.app.BINDINGS)
mid = (len(bindings) + 1) // 2
left_bindings = bindings[:mid]
right_bindings = bindings[mid:]
with Container(id="help_container"):
yield Static("Key Bindings", id="help_title")
with Horizontal(id="help_content"):
yield ListView(
*[self._make_item(b) for b in left_bindings],
*[self._make_item(b) for b in bindings[:mid]],
classes="help_list",
)
yield ListView(
*[self._make_item(b) for b in right_bindings],
*[self._make_item(b) for b in bindings[mid:]],
classes="help_list",
)
yield Static(
@@ -74,3 +74,275 @@ class HelpScreen(ModalScreen):
def action_dismiss(self) -> None:
self.dismiss()
class StatsScreen(ModalScreen):
"""Stats screen displaying listening statistics."""
BINDINGS = [("escape", "dismiss", "Close"), ("s", "dismiss", "Close")]
def _format_time(self, milliseconds: int) -> str:
"""Format milliseconds as hours and minutes."""
total_seconds = int(milliseconds) // 1000
hours, remainder = divmod(total_seconds, 3600)
minutes, _ = divmod(remainder, 60)
if hours > 0:
return f"{hours}h{minutes:02d}"
return f"{minutes}m"
def _format_date(self, date_str: str | None) -> str:
"""Format ISO date string for display."""
if not date_str:
return "Unknown"
try:
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d")
except ValueError:
return date_str
def _get_signup_year(self) -> int:
"""Get signup year using binary search on listening activity."""
if not self.app.client:
return 0
current_year = date.today().year
try:
stats = self.app.client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration="12",
monthly_listening_interval_start_date=f"{current_year}-01",
store="Audible",
)
if not self._has_activity(stats):
return 0
except Exception:
return 0
left, right = 1995, current_year
earliest_year = current_year
while left <= right:
middle = (left + right) // 2
try:
stats = self.app.client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration="12",
monthly_listening_interval_start_date=f"{middle}-01",
store="Audible",
)
has_activity = self._has_activity(stats)
except Exception:
has_activity = False
if has_activity:
earliest_year = middle
right = middle - 1
else:
left = middle + 1
return earliest_year
@staticmethod
def _has_activity(stats: dict) -> bool:
"""Check if stats contain any listening activity."""
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
return bool(
monthly_stats and any(s.get("aggregated_sum", 0)
> 0 for s in monthly_stats)
)
def _get_listening_time(self, duration: int, start_date: str) -> int:
"""Get listening time in milliseconds for a given period."""
if not self.app.client:
return 0
try:
stats = self.app.client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration=str(duration),
monthly_listening_interval_start_date=start_date,
store="Audible",
)
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
return sum(s.get("aggregated_sum", 0) for s in monthly_stats)
except Exception:
return 0
def _get_finished_books_count(self) -> int:
"""Get count of finished books from library."""
if not self.app.library_client or not self.app.all_items:
return 0
return sum(
1 for item in self.app.all_items if self.app.library_client.is_finished(item)
)
def _get_account_info(self) -> dict:
"""Get account information including subscription details."""
if not self.app.client:
return {}
account_info = {}
endpoints = [
(
"1.0/account/information",
"subscription_details,plan_summary,subscription_details_payment_instrument,delinquency_status,customer_benefits,customer_segments,directed_ids",
),
(
"1.0/customer/information",
"subscription_details_premium,subscription_details_rodizio,customer_segment,subscription_details_channels,migration_details",
),
(
"1.0/customer/status",
"benefits_status,member_giving_status,prime_benefits_status,prospect_benefits_status",
),
]
for endpoint, response_groups in endpoints:
try:
response = self.app.client.get(
endpoint, response_groups=response_groups)
account_info.update(response)
except Exception:
pass
return account_info
def _get_email(self) -> str:
"""Get email from config file."""
try:
if CONFIG_PATH.exists():
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
config = json.load(f)
return config.get("email", "Unknown")
except Exception:
pass
return "Unknown"
def _get_subscription_details(self, account_info: dict) -> dict:
"""Extract subscription details from nested API response."""
paths = [
["customer_details", "subscription", "subscription_details"],
["customer", "customer_details", "subscription", "subscription_details"],
["subscription_details"],
["subscription", "subscription_details"],
]
for path in paths:
data = account_info
for key in path:
if isinstance(data, dict):
data = data.get(key)
else:
break
if isinstance(data, list) and data:
return data[0]
return {}
def _get_country(self) -> str:
"""Get country from authenticator locale."""
if not self.app.auth:
return "Unknown"
try:
locale_obj = getattr(self.app.auth, "locale", None)
if not locale_obj:
return "Unknown"
if hasattr(locale_obj, "country_code"):
return locale_obj.country_code.upper()
if hasattr(locale_obj, "domain"):
return locale_obj.domain.upper()
if isinstance(locale_obj, str):
return locale_obj.split("_")[-1].upper() if "_" in locale_obj else locale_obj.upper()
return str(locale_obj)
except Exception:
return "Unknown"
def _make_stat_item(self, label: str, value: str) -> ListItem:
"""Create a ListItem for a stat."""
text = f"[bold {KEY_COLOR}]{label:>16}[/] [{DESC_COLOR}]{value:<25}[/]"
return ListItem(Label(text))
def compose(self) -> ComposeResult:
if not self.app.client:
with Container(id="help_container"):
yield Static("Statistics", id="help_title")
yield Static(
"Not authenticated. Please restart and authenticate.",
classes="help_row",
)
yield Static(
f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close",
id="help_footer",
)
return
today = date.today()
stats_items = self._build_stats_items(today)
with Container(id="help_container"):
yield Static("Statistics", id="help_title")
with Vertical(id="help_content"):
yield ListView(
*[self._make_stat_item(label, value)
for label, value in stats_items],
classes="help_list",
)
yield Static(
f"Press [bold {KEY_COLOR}]s[/] or [bold {KEY_COLOR}]Escape[/] to close",
id="help_footer",
)
def _build_stats_items(self, today: date) -> list[tuple[str, str]]:
"""Build the list of stats items to display."""
signup_year = self._get_signup_year()
month_time = self._get_listening_time(1, today.strftime("%Y-%m"))
year_time = self._get_listening_time(12, today.strftime("%Y-01"))
finished_count = self._get_finished_books_count()
total_books = len(self.app.all_items) if self.app.all_items else 0
email = self._get_email()
country = self._get_country()
subscription_name = "Unknown"
subscription_price = "Unknown"
next_bill_date = "Unknown"
account_info = self._get_account_info()
if account_info:
subscription_data = self._get_subscription_details(account_info)
if subscription_data:
if name := subscription_data.get("name"):
subscription_name = name
if bill_date := subscription_data.get("next_bill_date"):
next_bill_date = self._format_date(bill_date)
if bill_amount := subscription_data.get("next_bill_amount", {}):
amount = bill_amount.get("currency_value")
currency = bill_amount.get("currency_code", "EUR")
if amount is not None:
subscription_price = f"{amount} {currency}"
stats_items = []
if email != "Unknown":
stats_items.append(("Email", email))
stats_items.append(("Country Store", country))
stats_items.append(("Signup Year", str(signup_year)
if signup_year > 0 else "Unknown"))
if next_bill_date != "Unknown":
stats_items.append(("Next Credit", next_bill_date))
stats_items.append(("Next Bill", next_bill_date))
if subscription_name != "Unknown":
stats_items.append(("Subscription", subscription_name))
if subscription_price != "Unknown":
stats_items.append(("Price", subscription_price))
stats_items.append(("This Month", self._format_time(month_time)))
stats_items.append(("This Year", self._format_time(year_time)))
stats_items.append(
("Books Finished", f"{finished_count} / {total_books}"))
return stats_items
def action_dismiss(self) -> None:
self.dismiss()