Compare commits

...

85 Commits

Author SHA1 Message Date
553f5cb4f7 build: add script entrypoint 2025-12-20 22:53:00 +01:00
32b37a0834 docs: update readme 2025-12-20 22:52:30 +01:00
a2d2c7ce3a refactor: move main cli into package 2025-12-20 22:52:27 +01:00
4741080284 clean: shorter messages 2025-12-16 06:24:36 +01:00
737147b457 clean: remove unused import 2025-12-16 06:21:27 +01:00
123d35068f refactor: use ui.py and remove unused imports 2025-12-16 06:21:20 +01:00
258aabe10f refactor: future-proof ui components in ui.py 2025-12-16 06:21:09 +01:00
bc070c4162 feat: relooking of help screen 2025-12-16 06:02:33 +01:00
cbf6bff779 feat: help screen now is scrollable and looks better 2025-12-16 06:02:22 +01:00
080c731fd7 feat: add css for new help screen 2025-12-16 03:35:46 +01:00
1b6f1ff1f2 feat: add a help screen with all keybindings 2025-12-16 03:35:33 +01:00
aa5998c3e3 docs: update roadmap and main description 2025-12-16 03:35:16 +01:00
c65e949731 feat: improve margin 2025-12-16 03:25:12 +01:00
ab51e5506e feat: hide useless palette 2025-12-16 03:25:02 +01:00
3701b37f4c docs: update roadmap 2025-12-16 03:10:32 +01:00
1474302d7e feat: add downloaded status indicator to table rows 2025-12-16 03:10:13 +01:00
eeecaaf42e feat: add cache-related method to get, remove or check 2025-12-16 03:09:26 +01:00
f359dee194 feat: add a "downloaded" column in the UI 2025-12-16 03:09:06 +01:00
1e2655670d feat: add a toggle to download/remove a book from cache 2025-12-16 03:08:56 +01:00
cf6164c438 docs: update keybindings 2025-12-16 02:59:02 +01:00
46fa15fcfe clean: remove dark/light toggle 2025-12-16 02:58:57 +01:00
4b457452d4 refactor: move table-related utilities to table_utils.py 2025-12-16 02:55:15 +01:00
0de0286992 fix: docstring 2025-12-16 02:50:42 +01:00
391b0360bd docs: update definition of roadmap item 2025-12-16 02:26:31 +01:00
b0dc15a018 refactor: table_helpers to table_utils 2025-12-16 01:55:18 +01:00
a6d74265ed docs: update roadmap 2025-12-16 01:47:03 +01:00
4f49a081c9 clean: keep one-line docstring for consistency 2025-12-15 21:06:55 +01:00
3a19db2cf0 refactor: extract table sorting logic and media info loading to new modules 2025-12-15 21:03:52 +01:00
fcb1524806 feat: standardize error handling patterns 2025-12-15 21:01:09 +01:00
18ffae7ac8 refactor: use unified time formatting method 2025-12-15 20:59:36 +01:00
d71c751bbc clean: remove obsolete private method 2025-12-15 20:59:26 +01:00
234b65c9d8 refactor: consolidate time formatting 2025-12-15 20:59:15 +01:00
2d9970c198 refactor: move constants to constants.py 2025-12-15 20:57:30 +01:00
5e3b33570d feat: add last position retrieval and save functionality 2025-12-15 13:25:05 +01:00
2ced756cc0 docs: update roadmap 2025-12-15 13:24:12 +01:00
1c4017ae0c feat: resume from last position and auto-save playback position 2025-12-15 13:23:53 +01:00
251a7a26d5 format: pep8 2025-12-15 13:23:13 +01:00
6462c83a21 feat: save position 2025-12-15 13:22:43 +01:00
0c590cfa82 feat: get current month listening time using API 2025-12-15 12:25:21 +01:00
16395981dc fix: handle accented characters correctly in title sorting 2025-12-15 12:14:33 +01:00
30f0612bb5 docs: update readme with bindings 2025-12-15 07:52:41 +01:00
1aaff3b3b7 fix: correct binding according to name 2025-12-15 07:50:11 +01:00
986541f0d3 fix: binding description 2025-12-15 07:48:07 +01:00
151d565f36 feat: remove a redundant toggle, pressing p twice sort/reverse sort by progress 2025-12-15 07:46:54 +01:00
7e2b657cfc feat: remove a redundant toggle, pressing s twice sort/reverse sort 2025-12-15 07:46:03 +01:00
cef5e40347 fix: split clients, add surface error and follow redirects on downloads 2025-12-14 09:49:03 +01:00
839394343e feat: make chunk size configurable 2025-12-14 09:37:47 +01:00
84868c4afa feat: add default chunk size 2025-12-14 09:37:28 +01:00
03988f0988 fix:l close download_manager flux on app exit 2025-12-14 09:35:25 +01:00
9eba702a0a feat: reuse connections for better performance 2025-12-14 09:35:05 +01:00
f61f4ec55e build: update uv.lock 2025-12-14 09:34:53 +01:00
b45ff86061 build: add httpx dependency 2025-12-14 09:34:47 +01:00
6824d00088 feat: add url validation before trying to download 2025-12-14 09:32:29 +01:00
46c66e0d5c docs: update readme 2025-12-14 09:28:44 +01:00
d4e73e6a13 feat: implement goto previous/next chapter 2025-12-14 09:28:05 +01:00
b2dd430ac9 feat: methods to goto next/previous chapter 2025-12-14 09:27:56 +01:00
ce0d313187 clean: just handle auth as configure handles the rest of the flow 2025-12-14 09:11:55 +01:00
7fee7e56cf feat: use configuration flow if not existing/not correct 2025-12-14 09:11:12 +01:00
58661641d1 feat: add a configuration flow 2025-12-14 09:10:56 +01:00
95f30954b5 docs: update readme 2025-12-10 10:16:38 +01:00
d96a08935c docs: two more done ! \o/ 2025-12-09 19:51:31 +01:00
0ce45c26b7 feat: add the possibility to move forward/backward 30s with left/right 2025-12-09 19:50:43 +01:00
8b74c0f773 feat: progress bar + move (for|back)ward 30s 2025-12-09 19:50:24 +01:00
4a5e475f27 feat: add a progress bar 2025-12-09 19:50:08 +01:00
44d4f28ceb docs: one more done 2025-12-09 10:48:00 +01:00
1d6033f057 docs: update readme 2025-12-09 10:47:48 +01:00
5fe10a1636 feat: print chapter and progress in the footer of the app while a book is playing 2025-12-09 10:47:38 +01:00
1af3be37ce fix: unused import 2025-12-08 07:35:43 +01:00
c3dfa239fa fix: solve some mypy errors 2025-12-08 07:35:09 +01:00
42e6a1e029 fix: truncate author names if too long to not break UI 2025-12-07 21:37:43 +01:00
41f5183653 feat: optimize format_duration 2025-12-07 21:34:16 +01:00
1a1fee0984 refactor: extract playback orchestration and optimize code structure 2025-12-07 20:31:37 +01:00
ddb7cab39e refactor: delegate playback orchestration to PlaybackController 2025-12-07 20:30:59 +01:00
2d331288dd feat: make authentication less verbose 2025-12-07 14:01:37 +01:00
d1a6fda863 docs: update readme 2025-12-07 11:45:12 +01:00
2d10922a7c feat: create a catppuccin mocha theme 2025-12-07 11:45:03 +01:00
0ad4db95c5 docs: update readme after massive refactor 2025-12-07 00:12:17 +01:00
0d9d65088b feat: add __init__ 2025-12-07 00:09:16 +01:00
3b9d1ecf96 feat: add app submodule 2025-12-07 00:09:07 +01:00
27f9a5396e feat: add auth submodule 2025-12-07 00:08:52 +01:00
d3be27c70d feat: add constants 2025-12-07 00:08:46 +01:00
df2ae17721 feat: download module 2025-12-07 00:08:41 +01:00
a0edab8e32 feat: add library module 2025-12-07 00:08:38 +01:00
ddb1704cb0 feat: add playback module 2025-12-07 00:08:33 +01:00
53284d7c0a refactor: do a bit a code architecture 2025-12-07 00:08:28 +01:00
17 changed files with 1955 additions and 751 deletions

View File

@@ -1,23 +1,12 @@
# auditui
A terminal-based user interface (TUI) client for Audible, written in Python 3.
A terminal-based user interface (TUI) client for Audible, written in Python 3 : listen to your audiobooks (even offline), browse and manage your library, and more!
Listen to your audiobooks or podcasts, browse your library, and more.
## What it does and where are we
`main.py` offers a TUI interface for browsing your Audible library, listing your books with progress information. You can sort by progress or title, show all books, or show only unfinished books which is the default.
You can also play a book by pressing `Enter` on a book in the list. You can pause/resume the playback by pressing `Space`.
Currently, the only available theme is Catppuccin Mocha, following their [style guide](https://github.com/catppuccin/catppuccin/blob/main/docs/style-guide.md), as it's my preferred theme across most of my tools.
Look at the [roadmap](#roadmap) for more details.
It's still a work in progress, so :
- currently, most code resides in `main.py`, except for some experimental files that aren't part of the final structure:
- `stats.py` is the test playground for the stats functionality
- expect bugs and missing features
- the code is not yet organized as I'm currently experimenting
It's still a work in progress, so expect bugs and missing features.
## How to run
@@ -28,28 +17,57 @@ This project uses [uv](https://github.com/astral-sh/uv) for dependency managemen
$ uv sync
# run the TUI
$ uv run main.py
$ uv run python -m auditui.cli
```
(`stats.py` is a playground for the stats functionality)
Please also note that as of now, you need to have [ffmpeg](https://ffmpeg.org/) installed to play audio files.
### Bindings
| Key | Action |
| ------------ | -------------------------- |
| `?` | Show help screen |
| `n` | Sort by name |
| `p` | Sort by progress |
| `a` | Show all/unfinished |
| `enter` | Play the selected book |
| `space` | Pause/resume the playback |
| `left` | Seek backward 30 seconds |
| `right` | Seek forward 30 seconds |
| `ctrl+left` | Go to the previous chapter |
| `ctrl+right` | Go to the next chapter |
| `d` | Download/delete from cache |
| `q` | Quit the application |
## Roadmap
- [x] list your library
- [x] list your unfinished books with progress information
- [x] play/pause a book
- [ ] resume playback of a book from the last position, regardless of which device was used previously
- [ ] save the current playback position when pausing or exiting the app
- [ ] print progress at the bottom of the app while a book is playing
- [ ] add control to go to the previous/next chapter
- [ ] add a control to jump 30s earlier/later
- [x] catppuccin mocha theme
- [x] print chapter and progress in the footer of the app while a book is playing
- [x] chapter progress bar in footer
- [x] add a control to jump 30s earlier/later
- [x] add control to go to the previous/next chapter
- [x] save/resume playback of a book from the last position, regardless of which device was used previously
- [x] download/remove a book in the cache without having to play it
- [x] add a help screen with all the keybindings
- [ ] increase/decrease reading speed
- [ ] mark a book as finished or unfinished
- [ ] get your stats in a separated pane
- [ ] filter books on views
- [ ] search in your book library
And after that:
- [ ] search the marketplace for books
- [ ] add a book in your wishlist
- [ ] get your listening stats from Audible
All of this, and of course:
- [ ] installation setup
- [ ] build a decent TUI interface using [Textual](https://textual.textualize.io/)
- [ ] code cleanup / organization

2
auditui/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""Auditui package providing the Audible TUI app components."""

402
auditui/app.py Normal file
View File

@@ -0,0 +1,402 @@
"""Textual application for the Audible TUI."""
from __future__ import annotations
from typing import TYPE_CHECKING
from textual import work
from textual.app import App, ComposeResult
from textual.events import Key
from textual.widgets import DataTable, Footer, Header, ProgressBar, Static
from textual.worker import get_current_worker
from .constants import PROGRESS_COLUMN_INDEX, SEEK_SECONDS, TABLE_CSS, TABLE_COLUMNS
from .downloads import DownloadManager
from .library import LibraryClient
from .playback import PlaybackController
from .table_utils import (
create_progress_sort_key,
create_title_sort_key,
filter_unfinished_items,
format_item_as_row,
)
from .ui import HelpScreen
if TYPE_CHECKING:
from textual.widgets._data_table import ColumnKey
class Auditui(App):
"""Main application class for the Audible TUI app."""
theme = "textual-dark"
SHOW_PALETTE = False
BINDINGS = [
("?", "show_help", "Help"),
("n", "sort", "Sort by name"),
("p", "sort_by_progress", "Sort by progress"),
("a", "show_all", "All/Unfinished"),
("enter", "play_selected", "Play"),
("space", "toggle_playback", "Pause/Resume"),
("left", "seek_backward", "-30s"),
("right", "seek_forward", "+30s"),
("ctrl+left", "previous_chapter", "Previous chapter"),
("ctrl+right", "next_chapter", "Next chapter"),
("d", "toggle_download", "Download/Delete"),
("q", "quit", "Quit"),
]
CSS = TABLE_CSS
def __init__(self, auth=None, client=None) -> None:
super().__init__()
self.auth = auth
self.client = client
self.library_client = LibraryClient(client) if client else None
self.download_manager = (
DownloadManager(auth, client) if auth and client else None
)
self.playback = PlaybackController(
self.update_status, self.library_client)
self.all_items: list[dict] = []
self.current_items: list[dict] = []
self.show_all_mode = False
self.title_sort_reverse = False
self.progress_sort_reverse = False
self.title_column_key: ColumnKey | None = None
self.progress_column_key: ColumnKey | None = None
self.progress_column_index = PROGRESS_COLUMN_INDEX
def compose(self) -> ComposeResult:
yield Header()
yield Static("Loading...", id="status")
table: DataTable = DataTable()
table.zebra_stripes = True
table.cursor_type = "row"
yield table
yield Static("", id="progress_info")
yield ProgressBar(id="progress_bar", show_eta=False, show_percentage=False, total=100)
yield Footer(show_command_palette=False)
def on_mount(self) -> None:
"""Initialize the table and start fetching library data."""
table = self.query_one(DataTable)
table.add_columns(*TABLE_COLUMNS)
column_keys = list(table.columns.keys())
self.title_column_key = column_keys[0]
self.progress_column_key = column_keys[3]
if self.client:
self.update_status("Fetching library...")
self.fetch_library()
else:
self.update_status(
"Not authenticated. Please restart and authenticate.")
self.set_interval(1.0, self._check_playback_status)
self.set_interval(0.5, self._update_progress)
self.set_interval(30.0, self._save_position_periodically)
def on_unmount(self) -> None:
"""Clean up on app exit."""
self.playback.stop()
if self.download_manager:
self.download_manager.close()
def on_key(self, event: Key) -> None:
"""Handle key presses."""
if self.playback.is_playing:
if event.key == "ctrl+left":
event.prevent_default()
self.action_previous_chapter()
return
elif event.key == "ctrl+right":
event.prevent_default()
self.action_next_chapter()
return
elif event.key == "left":
event.prevent_default()
self.action_seek_backward()
return
elif event.key == "right":
event.prevent_default()
self.action_seek_forward()
return
if isinstance(self.focused, DataTable):
if event.key == "enter":
event.prevent_default()
self.action_play_selected()
elif event.key == "space":
event.prevent_default()
self.action_toggle_playback()
def update_status(self, message: str) -> None:
"""Update the status message in the UI."""
status = self.query_one("#status", Static)
status.update(message)
def _thread_status_update(self, message: str) -> None:
"""Safely update status from worker threads."""
self.call_from_thread(self.update_status, message)
@work(exclusive=True, thread=True)
def fetch_library(self) -> None:
"""Fetch all library items from Audible API in background thread."""
worker = get_current_worker()
if worker.is_cancelled or not self.library_client:
return
try:
all_items = self.library_client.fetch_all_items(
self._thread_status_update)
self.call_from_thread(self.on_library_loaded, all_items)
except (OSError, ValueError, KeyError) as exc:
self.call_from_thread(self.on_library_error, str(exc))
def on_library_loaded(self, items: list[dict]) -> None:
"""Handle successful library load."""
self.all_items = items
self.update_status(f"Loaded {len(items)} books")
self.show_unfinished()
def on_library_error(self, error: str) -> None:
"""Handle library fetch error."""
self.update_status(f"Error fetching library: {error}")
def _populate_table(self, items: list[dict]) -> None:
"""Populate the DataTable with library items."""
table = self.query_one(DataTable)
table.clear()
if not items or not self.library_client:
self.update_status("No books found.")
return
for item in items:
title, author, runtime, progress, downloaded = format_item_as_row(
item, self.library_client, self.download_manager)
table.add_row(title, author, runtime,
progress, downloaded, key=title)
self.current_items = items
mode = "all" if self.show_all_mode else "unfinished"
self.update_status(f"Showing {len(items)} books ({mode})")
def _refresh_table(self) -> None:
"""Refresh the table with current items."""
if self.current_items:
self._populate_table(self.current_items)
def show_all(self) -> None:
"""Display all books in the table."""
if not self.all_items:
return
self.show_all_mode = True
self._populate_table(self.all_items)
def show_unfinished(self) -> None:
"""Display only unfinished books in the table."""
if not self.all_items or not self.library_client:
return
self.show_all_mode = False
unfinished_items = filter_unfinished_items(
self.all_items, self.library_client)
self._populate_table(unfinished_items)
def action_sort(self) -> None:
"""Sort table by title, toggling direction on each press."""
table = self.query_one(DataTable)
if table.row_count > 0 and self.title_column_key:
title_key, reverse = create_title_sort_key(self.title_sort_reverse)
table.sort(key=title_key, reverse=reverse)
self.title_sort_reverse = not self.title_sort_reverse
def action_sort_by_progress(self) -> None:
"""Sort table by progress percentage, toggling direction on each press."""
table = self.query_one(DataTable)
if table.row_count > 0:
self.progress_sort_reverse = not self.progress_sort_reverse
progress_key, reverse = create_progress_sort_key(
self.progress_column_index, self.progress_sort_reverse)
table.sort(key=progress_key, reverse=reverse)
def action_show_all(self) -> None:
"""Toggle between showing all and unfinished books."""
if self.show_all_mode:
self.show_unfinished()
else:
self.show_all()
def action_show_unfinished(self) -> None:
"""Show unfinished books."""
self.show_unfinished()
def action_play_selected(self) -> None:
"""Start playing the selected book."""
if not self.download_manager:
self.update_status(
"Not authenticated. Please restart and authenticate.")
return
table = self.query_one(DataTable)
if table.row_count == 0:
self.update_status("No books available")
return
cursor_row = table.cursor_row
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return
if not self.library_client:
self.update_status("Library client not available")
return
selected_item = self.current_items[cursor_row]
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
return
self._start_playback_async(asin)
def action_toggle_playback(self) -> None:
"""Toggle pause/resume state."""
if not self.playback.toggle_playback():
self._no_playback_message()
def action_seek_forward(self) -> None:
"""Seek forward 30 seconds."""
if not self.playback.seek_forward(SEEK_SECONDS):
self._no_playback_message()
def action_seek_backward(self) -> None:
"""Seek backward 30 seconds."""
if not self.playback.seek_backward(SEEK_SECONDS):
self._no_playback_message()
def action_next_chapter(self) -> None:
"""Seek to the next chapter."""
if not self.playback.seek_to_next_chapter():
self._no_playback_message()
def action_previous_chapter(self) -> None:
"""Seek to the previous chapter."""
if not self.playback.seek_to_previous_chapter():
self._no_playback_message()
def _no_playback_message(self) -> None:
"""Show message when no playback is active."""
self.update_status("No playback active. Press Enter to play a book.")
def action_show_help(self) -> None:
"""Show the help screen with all keybindings."""
self.push_screen(HelpScreen())
def _check_playback_status(self) -> None:
"""Check if playback process has finished and update state accordingly."""
message = self.playback.check_status()
if message:
self.update_status(message)
self._hide_progress()
def _update_progress(self) -> None:
"""Update the progress bar and info during playback."""
if not self.playback.is_playing:
self._hide_progress()
return
progress_data = self.playback.get_current_progress()
if not progress_data:
self._hide_progress()
return
chapter_name, chapter_elapsed, chapter_total = progress_data
if chapter_total <= 0:
self._hide_progress()
return
progress_info = self.query_one("#progress_info", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
progress_percent = min(100.0, max(
0.0, (chapter_elapsed / chapter_total) * 100.0))
progress_bar.update(progress=progress_percent)
chapter_elapsed_str = LibraryClient.format_time(chapter_elapsed)
chapter_total_str = LibraryClient.format_time(chapter_total)
progress_info.update(
f"{chapter_name} | {chapter_elapsed_str} / {chapter_total_str}")
progress_info.display = True
progress_bar.display = True
def _hide_progress(self) -> None:
"""Hide the progress widget."""
progress_info = self.query_one("#progress_info", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
progress_info.display = False
progress_bar.display = False
def _save_position_periodically(self) -> None:
"""Periodically save playback position."""
self.playback.update_position_if_needed()
def action_toggle_download(self) -> None:
"""Toggle download/remove for the selected book."""
if not self.download_manager:
self.update_status(
"Not authenticated. Please restart and authenticate.")
return
table = self.query_one(DataTable)
if table.row_count == 0:
self.update_status("No books available")
return
cursor_row = table.cursor_row
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return
if not self.library_client:
self.update_status("Library client not available")
return
selected_item = self.current_items[cursor_row]
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
return
self._toggle_download_async(asin)
@work(exclusive=True, thread=True)
def _toggle_download_async(self, asin: str) -> None:
"""Toggle download/remove asynchronously."""
if not self.download_manager:
return
if self.download_manager.is_cached(asin):
self.download_manager.remove_cached(
asin, self._thread_status_update)
else:
self.download_manager.get_or_download(
asin, self._thread_status_update)
self.call_from_thread(self._refresh_table)
@work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None:
"""Start playback asynchronously."""
if not self.download_manager:
return
self.playback.prepare_and_start(
self.download_manager,
asin,
self._thread_status_update,
)

25
auditui/auth.py Normal file
View File

@@ -0,0 +1,25 @@
"""Authentication helpers for the Auditui app."""
from pathlib import Path
from typing import Tuple
import audible
from .constants import AUTH_PATH
def authenticate(
auth_path: Path = AUTH_PATH,
) -> Tuple[audible.Authenticator, audible.Client]:
"""Authenticate with Audible and return authenticator and client."""
if not auth_path.exists():
raise FileNotFoundError(
"Authentication file not found. Please run 'auditui configure' to set up authentication.")
try:
authenticator = audible.Authenticator.from_file(str(auth_path))
audible_client = audible.Client(auth=authenticator)
return authenticator, audible_client
except (OSError, ValueError, KeyError) as exc:
raise ValueError(
f"Failed to load existing authentication: {exc}") from exc

45
auditui/cli.py Normal file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
"""Auditui entrypoint."""
import sys
from auditui.app import Auditui
from auditui.auth import authenticate
from auditui.configure import configure
from auditui.constants import AUTH_PATH
def main() -> None:
"""Authenticate and launch the app."""
if len(sys.argv) > 1 and sys.argv[1] == "configure":
try:
configure()
print("Configuration completed successfully.")
except Exception as exc:
print(f"Configuration error: {exc}")
sys.exit(1)
return
config_dir = AUTH_PATH.parent
if not config_dir.exists():
print("No configuration yet, please run 'auditui configure'.")
sys.exit(1)
try:
auth, client = authenticate()
except Exception as exc:
print(f"Authentication error: {exc}")
if not AUTH_PATH.exists():
print("No configuration yet, please run 'auditui configure'.")
else:
print("Please re-authenticate by running 'auditui configure'.")
sys.exit(1)
app = Auditui(auth=auth, client=client)
app.run()
if __name__ == "__main__":
main()

40
auditui/configure.py Normal file
View File

@@ -0,0 +1,40 @@
"""Configuration helpers for the Auditui app."""
from getpass import getpass
from pathlib import Path
from typing import Tuple
import audible
from .constants import AUTH_PATH
def configure(
auth_path: Path = AUTH_PATH,
) -> Tuple[audible.Authenticator, audible.Client]:
"""Force re-authentication and save credentials."""
if auth_path.exists():
response = input(
"Configuration already exists. Are you sure you want to overwrite it? (y/N): "
).strip().lower()
if response not in ("yes", "y"):
print("Configuration cancelled.")
raise SystemExit(0)
print("Please authenticate with your Audible account.")
email = input("\nEmail: ")
password = getpass("Password: ")
marketplace = input(
"Marketplace locale (default: US): ").strip().upper() or "US"
authenticator = audible.Authenticator.from_login(
username=email, password=password, locale=marketplace
)
auth_path.parent.mkdir(parents=True, exist_ok=True)
authenticator.to_file(str(auth_path))
print("Authentication successful!")
audible_client = audible.Client(auth=authenticator)
return authenticator, audible_client

236
auditui/constants.py Normal file
View File

@@ -0,0 +1,236 @@
"""Shared constants for the Auditui application."""
from pathlib import Path
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
DEFAULT_CODEC = "LC_128_44100_stereo"
MIN_FILE_SIZE = 1024 * 1024
DEFAULT_CHUNK_SIZE = 8192
TABLE_COLUMNS = ("Title", "Author", "Length", "Progress", "Downloaded")
AUTHOR_NAME_MAX_LENGTH = 40
AUTHOR_NAME_DISPLAY_LENGTH = 37
PROGRESS_COLUMN_INDEX = 3
SEEK_SECONDS = 30.0
TABLE_CSS = """
Screen {
background: #1e1e2e;
}
Header {
background: #181825;
color: #cdd6f4;
text-style: bold;
}
Footer {
background: #181825;
color: #bac2de;
height: 2;
padding: 0 1;
scrollbar-size: 0 0;
overflow-x: auto;
overflow-y: hidden;
}
Footer > HorizontalGroup > KeyGroup,
Footer > HorizontalGroup > KeyGroup.-compact {
margin: 0;
padding: 0;
background: #181825;
}
FooterKey,
FooterKey.-grouped,
Footer.-compact FooterKey {
background: #181825;
padding: 0;
margin: 0 1 0 0;
}
FooterKey .footer-key--key {
color: #f9e2af;
background: #181825;
text-style: bold;
padding: 0 1 0 0;
}
FooterKey .footer-key--description {
color: #cdd6f4;
background: #181825;
padding: 0;
}
FooterKey:hover {
background: #313244;
color: #cdd6f4;
}
FooterKey:hover .footer-key--key,
FooterKey:hover .footer-key--description {
background: #313244;
}
DataTable {
height: 1fr;
background: #1e1e2e;
color: #cdd6f4;
border: solid #585b70;
}
DataTable:focus {
border: solid #89b4fa;
}
DataTable > .datatable--header {
background: #45475a;
color: #bac2de;
text-style: bold;
}
DataTable > .datatable--cursor {
background: #313244;
color: #cdd6f4;
}
DataTable > .datatable--odd-row {
background: #181825;
}
DataTable > .datatable--even-row {
background: #1e1e2e;
}
Static {
height: 1;
text-align: center;
background: #181825;
color: #cdd6f4;
}
Static#status {
color: #bac2de;
}
Static#progress_info {
color: #89b4fa;
text-style: bold;
margin: 0;
padding: 0;
width: 100%;
}
ProgressBar#progress_bar {
height: 1;
background: #181825;
border: none;
margin: 0;
padding: 0 1;
width: 100%;
align: center middle;
}
ProgressBar#progress_bar > .progress-bar--track {
background: #45475a;
}
ProgressBar#progress_bar > .progress-bar--bar {
background: #a6e3a1;
}
HelpScreen {
align: center middle;
background: rgba(0, 0, 0, 0.7);
}
#help_container {
width: 70;
height: auto;
max-height: 85%;
min-height: 20;
background: #1e1e2e;
border: thick #89b4fa;
padding: 2;
}
#help_title {
text-align: center;
text-style: bold;
color: #89b4fa;
margin-bottom: 2;
padding-bottom: 1;
border-bottom: solid #585b70;
height: 3;
align: center middle;
}
#help_content {
width: 100%;
height: 1fr;
padding: 1 0;
margin: 1 0;
overflow-y: auto;
scrollbar-size: 0 1;
}
#help_content > .scrollbar--vertical {
background: #313244;
}
#help_content > .scrollbar--vertical > .scrollbar--track {
background: #181825;
}
#help_content > .scrollbar--vertical > .scrollbar--handle {
background: #585b70;
}
#help_content > .scrollbar--vertical > .scrollbar--handle:hover {
background: #45475a;
}
.help_row {
height: 3;
margin: 0 0 1 0;
padding: 0 1;
background: #181825;
border: solid #313244;
align: left middle;
}
.help_row:hover {
background: #313244;
border: solid #45475a;
}
.help_key {
width: 20;
text-align: right;
padding: 0 2 0 0;
color: #f9e2af;
text-style: bold;
align: right middle;
}
.help_action {
width: 1fr;
text-align: left;
padding: 0 0 0 2;
color: #cdd6f4;
align: left middle;
}
#help_footer {
text-align: center;
color: #bac2de;
margin-top: 2;
padding-top: 1;
border-top: solid #585b70;
height: 3;
align: center middle;
}
"""

236
auditui/downloads.py Normal file
View File

@@ -0,0 +1,236 @@
"""Download helpers for Audible content."""
import re
from pathlib import Path
from typing import Callable
from urllib.parse import urlparse
import audible
import httpx
from audible.activation_bytes import get_activation_bytes
from .constants import CACHE_DIR, DEFAULT_CHUNK_SIZE, DEFAULT_CODEC, DOWNLOAD_URL, MIN_FILE_SIZE
StatusCallback = Callable[[str], None]
class DownloadManager:
"""Handle retrieval and download of Audible titles."""
def __init__(
self,
auth: audible.Authenticator,
client: audible.Client,
cache_dir: Path = CACHE_DIR,
chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> None:
self.auth = auth
self.client = client
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size
self._http_client = httpx.Client(
auth=auth, timeout=30.0, follow_redirects=True)
self._download_client = httpx.Client(
timeout=httpx.Timeout(connect=30.0, read=None,
write=30.0, pool=30.0),
follow_redirects=True,
)
def get_or_download(self, asin: str, notify: StatusCallback | None = None) -> Path | None:
"""Get local path of AAX file, downloading if missing."""
title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
if notify:
notify(f"Using cached file: {local_path.name}")
return local_path
if notify:
notify(f"Downloading to {local_path.name}...")
dl_link = self._get_download_link(asin, notify=notify)
if not dl_link:
if notify:
notify("Failed to get download link")
return None
if not self._validate_download_url(dl_link):
if notify:
notify("Invalid download URL")
return None
if not self._download_file(dl_link, local_path, notify):
if notify:
notify("Download failed")
return None
if not local_path.exists() or local_path.stat().st_size < MIN_FILE_SIZE:
if notify:
notify("Download failed or file too small")
return None
return local_path
def get_activation_bytes(self) -> str | None:
"""Get activation bytes as hex string."""
try:
activation_bytes = get_activation_bytes(self.auth)
if isinstance(activation_bytes, bytes):
return activation_bytes.hex()
return str(activation_bytes)
except (OSError, ValueError, KeyError, AttributeError):
return None
def get_cached_path(self, asin: str) -> Path | None:
"""Get the cached file path for a book if it exists."""
title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
def is_cached(self, asin: str) -> bool:
"""Check if a book is already cached."""
return self.get_cached_path(asin) is not None
def remove_cached(self, asin: str, notify: StatusCallback | None = None) -> bool:
"""Remove a cached book file."""
cached_path = self.get_cached_path(asin)
if not cached_path:
if notify:
notify("Book is not cached")
return False
try:
cached_path.unlink()
if notify:
notify(f"Removed from cache: {cached_path.name}")
return True
except OSError as exc:
if notify:
notify(f"Failed to remove cache: {exc}")
return False
def _validate_download_url(self, url: str) -> bool:
"""Validate that the URL is a valid HTTP/HTTPS URL."""
try:
parsed = urlparse(url)
return parsed.scheme in ("http", "https") and bool(parsed.netloc)
except (ValueError, AttributeError):
return False
def _sanitize_filename(self, filename: str) -> str:
"""Remove invalid characters from filename."""
return re.sub(r'[<>:"/\\|?*]', "_", filename)
def _get_name_from_asin(self, asin: str) -> str | None:
"""Get the title/name of a book from its ASIN."""
try:
product_info = self.client.get(
path=f"1.0/catalog/products/{asin}",
response_groups="product_desc,product_attrs",
)
product = product_info.get("product", {})
return product.get("title") or "Unknown Title"
except (OSError, ValueError, KeyError):
return None
def _get_download_link(
self, asin: str, codec: str = DEFAULT_CODEC, notify: StatusCallback | None = None
) -> str | None:
"""Get download link for book."""
if self.auth.adp_token is None:
if notify:
notify("Missing ADP token (not authenticated?)")
return None
try:
params = {
"type": "AUDI",
"currentTransportMethod": "WIFI",
"key": asin,
"codec": codec,
}
response = self._http_client.get(
url=DOWNLOAD_URL,
params=params,
)
response.raise_for_status()
link = response.headers.get("Location")
if not link:
link = str(response.url)
tld = self.auth.locale.domain
return link.replace("cds.audible.com", f"cds.audible.{tld}")
except httpx.HTTPError as exc:
if notify:
notify(f"Download-link request failed: {exc!s}")
return None
except (OSError, ValueError, KeyError, AttributeError) as exc:
if notify:
notify(f"Download-link error: {exc!s}")
return None
def _download_file(
self, url: str, dest_path: Path, notify: StatusCallback | None = None
) -> Path | None:
"""Download file from URL to destination."""
try:
with self._download_client.stream("GET", url) as response:
response.raise_for_status()
total_size = int(response.headers.get("content-length", 0))
downloaded = 0
with open(dest_path, "wb") as file_handle:
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
file_handle.write(chunk)
downloaded += len(chunk)
if total_size > 0 and notify:
percent = (downloaded / total_size) * 100
notify(
f"Downloading: {percent:.1f}% ({downloaded}/{total_size} bytes)"
)
return dest_path
except httpx.HTTPStatusError as exc:
if notify:
notify(
f"Download HTTP error: {exc.response.status_code} {exc.response.reason_phrase}"
)
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
return None
except httpx.HTTPError as exc:
if notify:
notify(f"Download network error: {exc!s}")
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
return None
except (OSError, ValueError, KeyError) as exc:
if notify:
notify(f"Download error: {exc!s}")
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
return None
def close(self) -> None:
"""Close the HTTP clients and release resources."""
if hasattr(self, "_http_client"):
self._http_client.close()
if hasattr(self, "_download_client"):
self._download_client.close()

241
auditui/library.py Normal file
View File

@@ -0,0 +1,241 @@
"""Library helpers for fetching and formatting Audible data."""
from typing import Callable, List
import audible
ProgressCallback = Callable[[str], None]
class LibraryClient:
"""Helper for interacting with the Audible library."""
def __init__(self, client: audible.Client) -> None:
self.client = client
def fetch_all_items(self, on_progress: ProgressCallback | None = None) -> list:
"""Fetch all library items from the API."""
response_groups = (
"contributors,media,product_attrs,product_desc,product_details,"
"rating,is_finished,listening_status,percent_complete"
)
return self._fetch_all_pages(response_groups, on_progress)
def _fetch_all_pages(
self, response_groups: str, on_progress: ProgressCallback | None = None
) -> list:
"""Fetch all pages of library items from the API."""
all_items: List[dict] = []
page = 1
page_size = 50
while True:
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = list(library.get("items", []))
if not items:
break
all_items.extend(items)
if on_progress:
on_progress(f"Fetched page {page} ({len(items)} items)...")
if len(items) < page_size:
break
page += 1
return all_items
def extract_title(self, item: dict) -> str:
"""Extract title from library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def extract_authors(self, item: dict) -> str:
"""Extract author names from library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [a.get("name", "")
for a in authors if isinstance(a, dict)]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: dict) -> int | None:
"""Extract runtime in minutes from library item."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
if isinstance(runtime, (int, float)):
return int(runtime)
return None
def extract_progress_info(self, item: dict) -> float | None:
"""Extract progress percentage from library item."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def extract_asin(self, item: dict) -> str | None:
"""Extract ASIN from library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def is_finished(self, item: dict) -> bool:
"""Check if a library item is finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float)
) and percent_complete >= 100
)
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annot in annotations:
if annot.get("asin") != asin:
continue
last_position_heard = annot.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Get content reference data including ACR and version."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save the last playback position for a book."""
if position_seconds <= 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
@staticmethod
def format_duration(
value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
"""Format duration value into a human-readable string."""
if value is None or value <= 0:
return default_none
total_minutes = int(value)
if unit == "seconds":
total_minutes //= 60
hours, minutes = divmod(total_minutes, 60)
parts = []
if hours:
parts.append(f"{hours} hour{'s' if hours != 1 else ''}")
if minutes:
parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
return " ".join(parts) if parts else default_none
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

42
auditui/media_info.py Normal file
View File

@@ -0,0 +1,42 @@
"""Media information loading for Audible content."""
import json
import shutil
import subprocess
from pathlib import Path
def load_media_info(path: Path, activation_hex: str | None = None) -> tuple[float | None, list[dict]]:
"""Load media information including duration and chapters using ffprobe."""
if not shutil.which("ffprobe"):
return None, []
try:
cmd = ["ffprobe", "-v", "quiet", "-print_format",
"json", "-show_format", "-show_chapters"]
if activation_hex:
cmd.extend(["-activation_bytes", activation_hex])
cmd.append(str(path))
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=10)
if result.returncode != 0:
return None, []
data = json.loads(result.stdout)
format_info = data.get("format", {})
duration_str = format_info.get("duration")
duration = float(duration_str) if duration_str else None
chapters_data = data.get("chapters", [])
chapters = [
{
"start_time": float(ch.get("start_time", 0)),
"end_time": float(ch.get("end_time", 0)),
"title": ch.get("tags", {}).get("title", f"Chapter {idx + 1}"),
}
for idx, ch in enumerate(chapters_data)
]
return duration, chapters
except (json.JSONDecodeError, subprocess.TimeoutExpired, ValueError, KeyError):
return None, []

491
auditui/playback.py Normal file
View File

@@ -0,0 +1,491 @@
"""Playback control for Auditui."""
from __future__ import annotations
import os
import shutil
import signal
import subprocess
import time
from pathlib import Path
from typing import Callable
from .downloads import DownloadManager
from .library import LibraryClient
from .media_info import load_media_info
StatusCallback = Callable[[str], None]
class PlaybackController:
"""Manage playback through ffplay."""
def __init__(self, notify: StatusCallback, library_client: LibraryClient | None = None) -> None:
self.notify = notify
self.library_client = library_client
self.playback_process: subprocess.Popen | None = None
self.is_playing = False
self.is_paused = False
self.current_file_path: Path | None = None
self.current_asin: str | None = None
self.playback_start_time: float | None = None
self.paused_duration: float = 0.0
self.pause_start_time: float | None = None
self.total_duration: float | None = None
self.chapters: list[dict] = []
self.seek_offset: float = 0.0
self.activation_hex: str | None = None
self.last_save_time: float = 0.0
self.position_save_interval: float = 30.0
def start(
self,
path: Path,
activation_hex: str | None = None,
status_callback: StatusCallback | None = None,
start_position: float = 0.0,
) -> bool:
"""Start playing a local file using ffplay."""
notify = status_callback or self.notify
if not shutil.which("ffplay"):
notify("ffplay not found. Please install ffmpeg")
return False
if self.playback_process is not None:
self.stop()
self.activation_hex = activation_hex
self.seek_offset = start_position
cmd = ["ffplay", "-nodisp", "-autoexit"]
if activation_hex:
cmd.extend(["-activation_bytes", activation_hex])
if start_position > 0:
cmd.extend(["-ss", str(start_position)])
cmd.append(str(path))
try:
self.playback_process = subprocess.Popen(
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
time.sleep(0.2)
if self.playback_process.poll() is not None:
return_code = self.playback_process.returncode
if return_code == 0 and start_position > 0 and self.total_duration:
if start_position >= self.total_duration - 5:
notify("Reached end of file")
self._reset_state()
return False
notify(
f"Playback process exited immediately (code: {return_code})")
self.playback_process = None
return False
self.is_playing = True
self.is_paused = False
self.current_file_path = path
self.playback_start_time = time.time()
self.paused_duration = 0.0
self.pause_start_time = None
duration, chapters = load_media_info(path, activation_hex)
self.total_duration = duration
self.chapters = chapters
notify(f"Playing: {path.name}")
return True
except (OSError, ValueError, subprocess.SubprocessError) as exc:
notify(f"Error starting playback: {exc}")
return False
def stop(self) -> None:
"""Stop the current playback."""
if self.playback_process is None:
return
self._save_current_position()
try:
if self.playback_process.poll() is None:
self.playback_process.terminate()
try:
self.playback_process.wait(timeout=2)
except subprocess.TimeoutExpired:
self.playback_process.kill()
self.playback_process.wait()
except (ProcessLookupError, ValueError):
pass
finally:
self._reset_state()
def pause(self) -> None:
"""Pause the current playback."""
if not self._validate_playback_state(require_paused=False):
return
self.pause_start_time = time.time()
self._send_signal(signal.SIGSTOP, "Paused", "pause")
def resume(self) -> None:
"""Resume the current playback."""
if not self._validate_playback_state(require_paused=True):
return
if self.pause_start_time is not None:
self.paused_duration += time.time() - self.pause_start_time
self.pause_start_time = None
self._send_signal(signal.SIGCONT, "Playing", "resume")
def check_status(self) -> str | None:
"""Check if playback process has finished and return status message."""
if self.playback_process is None:
return None
return_code = self.playback_process.poll()
if return_code is None:
return None
finished_file = self.current_file_path
self._reset_state()
if finished_file:
if return_code == 0:
return f"Finished: {finished_file.name}"
return f"Playback ended unexpectedly (code: {return_code}): {finished_file.name}"
return "Playback finished"
def _reset_state(self) -> None:
"""Reset all playback state."""
self.playback_process = None
self.is_playing = False
self.is_paused = False
self.current_file_path = None
self.current_asin = None
self.playback_start_time = None
self.paused_duration = 0.0
self.pause_start_time = None
self.total_duration = None
self.chapters = []
self.seek_offset = 0.0
self.activation_hex = None
self.last_save_time = 0.0
def _validate_playback_state(self, require_paused: bool) -> bool:
"""Validate playback state before pause/resume operations."""
if not (self.playback_process and self.is_playing):
return False
if require_paused and not self.is_paused:
return False
if not require_paused and self.is_paused:
return False
if not self.is_alive():
self.stop()
self.notify("Playback process has ended")
return False
return True
def _send_signal(self, sig: signal.Signals, status_prefix: str, action: str) -> None:
"""Send signal to playback process and update state."""
if self.playback_process is None:
return
try:
os.kill(self.playback_process.pid, sig)
self.is_paused = sig == signal.SIGSTOP
filename = self.current_file_path.name if self.current_file_path else None
message = f"{status_prefix}: {filename}" if filename else status_prefix
self.notify(message)
except ProcessLookupError:
self.stop()
self.notify("Process no longer exists")
except PermissionError:
self.notify(f"Permission denied: cannot {action} playback")
except (OSError, ValueError) as exc:
self.notify(f"Error {action}ing playback: {exc}")
def is_alive(self) -> bool:
"""Check if playback process is still running."""
if self.playback_process is None:
return False
return self.playback_process.poll() is None
def prepare_and_start(
self,
download_manager: DownloadManager,
asin: str,
status_callback: StatusCallback | None = None,
) -> bool:
"""Download file, get activation bytes, and start playback."""
notify = status_callback or self.notify
if not download_manager:
notify("Could not download file")
return False
notify("Preparing playback...")
local_path = download_manager.get_or_download(asin, notify)
if not local_path:
notify("Could not download file")
return False
notify("Getting activation bytes...")
activation_hex = download_manager.get_activation_bytes()
if not activation_hex:
notify("Failed to get activation bytes")
return False
start_position = 0.0
if self.library_client:
try:
last_position = self.library_client.get_last_position(asin)
if last_position is not None and last_position > 0:
start_position = last_position
notify(
f"Resuming from {LibraryClient.format_time(start_position)}")
except (OSError, ValueError, KeyError):
pass
notify(f"Starting playback of {local_path.name}...")
self.current_asin = asin
self.last_save_time = time.time()
return self.start(local_path, activation_hex, notify, start_position)
def toggle_playback(self) -> bool:
"""Toggle pause/resume state. Returns True if action was taken."""
if not self.is_playing:
return False
if not self.is_alive():
self.stop()
self.notify("Playback has ended")
return False
if self.is_paused:
self.resume()
else:
self.pause()
return True
def _get_current_elapsed(self) -> float:
"""Calculate current elapsed playback time."""
if self.playback_start_time is None:
return 0.0
current_time = time.time()
if self.is_paused and self.pause_start_time is not None:
return (self.pause_start_time - self.playback_start_time) - self.paused_duration
if self.pause_start_time is not None:
self.paused_duration += current_time - self.pause_start_time
self.pause_start_time = None
return max(0.0, (current_time - self.playback_start_time) - self.paused_duration)
def _stop_process(self) -> None:
"""Stop the playback process without resetting state."""
if not self.playback_process:
return
try:
if self.playback_process.poll() is None:
self.playback_process.terminate()
try:
self.playback_process.wait(timeout=2)
except subprocess.TimeoutExpired:
self.playback_process.kill()
self.playback_process.wait()
except (ProcessLookupError, ValueError):
pass
self.playback_process = None
self.is_playing = False
self.is_paused = False
self.playback_start_time = None
self.paused_duration = 0.0
self.pause_start_time = None
def _seek(self, seconds: float, direction: str) -> bool:
"""Seek forward or backward by specified seconds."""
if not self.is_playing or not self.current_file_path:
return False
elapsed = self._get_current_elapsed()
current_total_position = self.seek_offset + elapsed
if direction == "forward":
new_position = current_total_position + seconds
if self.total_duration:
if new_position >= self.total_duration - 2:
self.notify("Already at end of file")
return False
new_position = min(new_position, self.total_duration - 2)
message = f"Skipped forward {int(seconds)}s"
else:
new_position = max(0.0, current_total_position - seconds)
message = f"Skipped backward {int(seconds)}s"
was_paused = self.is_paused
saved_state = {
"file_path": self.current_file_path,
"asin": self.current_asin,
"activation": self.activation_hex,
"duration": self.total_duration,
"chapters": self.chapters.copy(),
}
self._stop_process()
time.sleep(0.2)
if self.start(saved_state["file_path"], saved_state["activation"], self.notify, new_position):
self.current_asin = saved_state["asin"]
self.total_duration = saved_state["duration"]
self.chapters = saved_state["chapters"]
if was_paused:
time.sleep(0.3)
self.pause()
self.notify(message)
return True
return False
def seek_forward(self, seconds: float = 30.0) -> bool:
"""Seek forward by specified seconds. Returns True if action was taken."""
return self._seek(seconds, "forward")
def seek_backward(self, seconds: float = 30.0) -> bool:
"""Seek backward by specified seconds. Returns True if action was taken."""
return self._seek(seconds, "backward")
def get_current_progress(self) -> tuple[str, float, float] | None:
"""Get current playback progress."""
if not self.is_playing or self.playback_start_time is None:
return None
elapsed = self._get_current_elapsed()
total_elapsed = self.seek_offset + elapsed
chapter_name, chapter_elapsed, chapter_total = self._get_current_chapter(
total_elapsed)
return (chapter_name, chapter_elapsed, chapter_total)
def _get_current_chapter(self, elapsed: float) -> tuple[str, float, float]:
"""Get current chapter info."""
if not self.chapters:
return ("Unknown Chapter", elapsed, self.total_duration or 0.0)
for chapter in self.chapters:
if chapter["start_time"] <= elapsed < chapter["end_time"]:
chapter_elapsed = elapsed - chapter["start_time"]
chapter_total = chapter["end_time"] - chapter["start_time"]
return (chapter["title"], chapter_elapsed, chapter_total)
last_chapter = self.chapters[-1]
chapter_elapsed = max(0.0, elapsed - last_chapter["start_time"])
chapter_total = last_chapter["end_time"] - last_chapter["start_time"]
return (last_chapter["title"], chapter_elapsed, chapter_total)
def _get_current_chapter_index(self, elapsed: float) -> int | None:
"""Get the index of the current chapter based on elapsed time."""
if not self.chapters:
return None
for idx, chapter in enumerate(self.chapters):
if chapter["start_time"] <= elapsed < chapter["end_time"]:
return idx
return len(self.chapters) - 1
def seek_to_chapter(self, direction: str) -> bool:
"""Seek to next or previous chapter."""
if not self.is_playing or not self.current_file_path:
return False
if not self.chapters:
self.notify("No chapter information available")
return False
elapsed = self._get_current_elapsed()
current_total_position = self.seek_offset + elapsed
current_chapter_idx = self._get_current_chapter_index(
current_total_position)
if current_chapter_idx is None:
self.notify("Could not determine current chapter")
return False
if direction == "next":
if current_chapter_idx >= len(self.chapters) - 1:
self.notify("Already at last chapter")
return False
target_chapter = self.chapters[current_chapter_idx + 1]
new_position = target_chapter["start_time"]
message = f"Next chapter: {target_chapter['title']}"
else:
if current_chapter_idx <= 0:
self.notify("Already at first chapter")
return False
target_chapter = self.chapters[current_chapter_idx - 1]
new_position = target_chapter["start_time"]
message = f"Previous chapter: {target_chapter['title']}"
was_paused = self.is_paused
saved_state = {
"file_path": self.current_file_path,
"asin": self.current_asin,
"activation": self.activation_hex,
"duration": self.total_duration,
"chapters": self.chapters.copy(),
}
self._stop_process()
time.sleep(0.2)
if self.start(saved_state["file_path"], saved_state["activation"], self.notify, new_position):
self.current_asin = saved_state["asin"]
self.total_duration = saved_state["duration"]
self.chapters = saved_state["chapters"]
if was_paused:
time.sleep(0.3)
self.pause()
self.notify(message)
return True
return False
def seek_to_next_chapter(self) -> bool:
"""Seek to the next chapter. Returns True if action was taken."""
return self.seek_to_chapter("next")
def seek_to_previous_chapter(self) -> bool:
"""Seek to the previous chapter. Returns True if action was taken."""
return self.seek_to_chapter("previous")
def _save_current_position(self) -> None:
"""Save the current playback position to Audible."""
if not (self.library_client and self.current_asin and self.is_playing):
return
if self.playback_start_time is None:
return
current_position = self.seek_offset + self._get_current_elapsed()
if current_position <= 0:
return
try:
self.library_client.save_last_position(
self.current_asin, current_position)
except (OSError, ValueError, KeyError):
pass
def update_position_if_needed(self) -> None:
"""Periodically save position if enough time has passed."""
if not (self.is_playing and self.library_client and self.current_asin):
return
current_time = time.time()
if current_time - self.last_save_time >= self.position_save_interval:
self._save_current_position()
self.last_save_time = current_time

87
auditui/table_utils.py Normal file
View File

@@ -0,0 +1,87 @@
"""Utils for table operations."""
import unicodedata
from typing import TYPE_CHECKING, Callable
from .constants import (
AUTHOR_NAME_DISPLAY_LENGTH,
AUTHOR_NAME_MAX_LENGTH,
PROGRESS_COLUMN_INDEX,
)
if TYPE_CHECKING:
from .downloads import DownloadManager
def create_title_sort_key(reverse: bool = False) -> tuple[Callable, bool]:
"""Create a sort key function for sorting by title."""
def title_key(row_values):
title_cell = row_values[0]
if isinstance(title_cell, str):
normalized = unicodedata.normalize('NFD', title_cell)
return normalized.encode('ascii', 'ignore').decode('ascii').lower()
return str(title_cell).lower()
return title_key, reverse
def create_progress_sort_key(progress_column_index: int = PROGRESS_COLUMN_INDEX, reverse: bool = False) -> tuple[Callable, bool]:
"""Create a sort key function for sorting by progress percentage."""
def progress_key(row_values):
progress_cell = row_values[progress_column_index]
if isinstance(progress_cell, str):
try:
return float(progress_cell.rstrip("%"))
except (ValueError, AttributeError):
return 0.0
return 0.0
return progress_key, reverse
def truncate_author_name(author_names: str) -> str:
"""Truncate author name if it exceeds maximum length."""
if author_names and len(author_names) > AUTHOR_NAME_MAX_LENGTH:
return f"{author_names[:AUTHOR_NAME_DISPLAY_LENGTH]}..."
return author_names
def format_item_as_row(item: dict, library_client, download_manager: "DownloadManager | None" = None) -> tuple[str, str, str, str, str]:
"""Format a library item into table row data.
Returns:
Tuple of (title, author, runtime, progress, downloaded) strings
"""
title = library_client.extract_title(item)
author_names = library_client.extract_authors(item)
author_names = truncate_author_name(author_names)
author_display = author_names or "Unknown"
minutes = library_client.extract_runtime_minutes(item)
runtime_str = library_client.format_duration(
minutes, unit="minutes", default_none="Unknown length"
) or "Unknown"
percent_complete = library_client.extract_progress_info(item)
progress_str = (
f"{percent_complete:.1f}%"
if percent_complete and percent_complete > 0
else "0%"
)
downloaded_str = ""
if download_manager:
asin = library_client.extract_asin(item)
if asin and download_manager.is_cached(asin):
downloaded_str = ""
return (title, author_display, runtime_str, progress_str, downloaded_str)
def filter_unfinished_items(items: list[dict], library_client) -> list[dict]:
"""Filter out finished items from the list."""
return [
item for item in items
if not library_client.is_finished(item)
]

33
auditui/ui.py Normal file
View File

@@ -0,0 +1,33 @@
"""UI components for the Auditui application."""
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, ScrollableContainer
from textual.screen import ModalScreen
from textual.widgets import Static
class HelpScreen(ModalScreen):
"""Help screen displaying all available keybindings."""
BINDINGS = [("escape", "dismiss", "Close"), ("?", "dismiss", "Close")]
def compose(self) -> ComposeResult:
with Container(id="help_container"):
yield Static("Key Bindings", id="help_title")
with ScrollableContainer(id="help_content"):
bindings = self.app.BINDINGS
for binding in bindings:
if isinstance(binding, tuple):
key, action, description = binding
else:
key = binding.key
description = binding.description
key_display = key.replace(
"ctrl+", "^").replace("left", "").replace("right", "").replace("space", "Space").replace("enter", "Enter")
with Horizontal(classes="help_row"):
yield Static(f"[bold #f9e2af]{key_display}[/]", classes="help_key")
yield Static(description, classes="help_action")
yield Static("Press [bold #f9e2af]?[/] or [bold #f9e2af]Escape[/] to close", id="help_footer")
def action_dismiss(self) -> None:
self.dismiss()

727
main.py
View File

@@ -1,727 +0,0 @@
#!/usr/bin/env python3
"""A terminal-based user interface (TUI) client for Audible"""
import os
import re
import shutil
import signal
import subprocess
from getpass import getpass
from pathlib import Path
import audible
import httpx
from audible.activation_bytes import get_activation_bytes
from textual.app import App, ComposeResult
from textual.events import Key
from textual.widgets import DataTable, Footer, Header, Static
from textual.worker import get_current_worker
from textual import work
class Auditui(App):
"""Main application class for the Audible TUI app."""
BINDINGS = [
("d", "toggle_dark", "Toggle dark mode"),
("s", "sort", "Sort by title"),
("r", "reverse_sort", "Reverse sort"),
("p", "sort_by_progress", "Sort by progress"),
("a", "show_all", "Show all books"),
("u", "show_unfinished", "Show unfinished"),
("enter", "play_selected", "Play selected book"),
("space", "toggle_playback", "Pause/Resume"),
("q", "quit", "Quit application"),
]
CSS = """
DataTable {
height: 1fr;
}
Static {
height: 1;
text-align: center;
background: $primary;
}
"""
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
DOWNLOAD_URL = (
"https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
)
DEFAULT_CODEC = "LC_128_44100_stereo"
MIN_FILE_SIZE = 1024 * 1024
def __init__(self):
super().__init__()
self.auth = None
self.client = None
self.all_items = []
self.current_items = []
self.show_all_mode = False
self.progress_sort_reverse = False
self.title_column_key = None
self.progress_column_key = None
self.progress_column_index = 3
self.playback_process = None
self.is_playing = False
self.is_paused = False
self.current_file_path = None
self.current_asin = None
self.CACHE_DIR.mkdir(parents=True, exist_ok=True)
def compose(self) -> ComposeResult:
yield Header()
yield Static("Loading...", id="status")
table = DataTable()
table.zebra_stripes = True
table.cursor_type = "row"
yield table
yield Footer()
def on_mount(self) -> None:
"""Initialize the table and start fetching library data."""
table = self.query_one(DataTable)
table.add_columns("Title", "Author", "Length", "Progress")
column_keys = list(table.columns.keys())
self.title_column_key = column_keys[0]
self.progress_column_key = column_keys[3]
if self.client:
self.update_status("Fetching library...")
self.fetch_library()
else:
self.update_status("Not authenticated. Please restart and authenticate.")
self.set_interval(1.0, self._check_playback_status)
def on_unmount(self) -> None:
"""Clean up on app exit."""
self._stop_playback()
def on_key(self, event: Key) -> None:
"""Handle key presses on DataTable."""
if isinstance(self.focused, DataTable):
if event.key == "enter":
event.prevent_default()
self.action_play_selected()
elif event.key == "space":
event.prevent_default()
self.action_toggle_playback()
def update_status(self, message: str) -> None:
"""Update the status message in the UI."""
status = self.query_one("#status", Static)
status.update(message)
@work(exclusive=True, thread=True)
def fetch_library(self) -> None:
"""Fetch all library items from Audible API in background thread."""
worker = get_current_worker()
if worker.is_cancelled:
return
try:
response_groups = (
"contributors,media,product_attrs,product_desc,product_details,"
"rating,is_finished,listening_status,percent_complete"
)
all_items = self._fetch_all_pages(response_groups)
self.call_from_thread(self.on_library_loaded, all_items)
except (OSError, ValueError, KeyError) as e:
self.call_from_thread(self.on_library_error, str(e))
def _fetch_all_pages(self, response_groups: str) -> list:
"""Fetch all pages of library items from the API."""
all_items = []
page = 1
page_size = 50
while True:
library = self.client.get(
path="library",
num_results=page_size,
page=page,
response_groups=response_groups,
)
items = library.get("items", [])
if not items:
break
all_items.extend(items)
self.call_from_thread(
self.update_status, f"Fetched page {page} ({len(items)} items)..."
)
if len(items) < page_size:
break
page += 1
return all_items
def on_library_loaded(self, items: list) -> None:
"""Handle successful library load."""
self.all_items = items
self.update_status(f"Loaded {len(items)} books")
self.show_unfinished()
def on_library_error(self, error: str) -> None:
"""Handle library fetch error."""
self.update_status(f"Error fetching library: {error}")
def _extract_title(self, item: dict) -> str:
"""Extract title from library item."""
product = item.get("product", {})
return (
product.get("title")
or item.get("title")
or product.get("asin", "Unknown Title")
)
def _extract_authors(self, item: dict) -> str:
"""Extract author names from library item."""
product = item.get("product", {})
authors = product.get("authors") or product.get("contributors") or []
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [a.get("name", "") for a in authors if isinstance(a, dict)]
return ", ".join(author_names) or "Unknown"
def _extract_runtime_minutes(self, item: dict) -> int | None:
"""Extract runtime in minutes from library item."""
product = item.get("product", {})
runtime_fields = [
"runtime_length_min",
"runtime_length",
"vLength",
"length",
"duration",
]
runtime = None
for field in runtime_fields:
runtime = product.get(field) or item.get(field)
if runtime is not None:
break
if runtime is None:
return None
if isinstance(runtime, dict):
return int(runtime.get("min", 0))
elif isinstance(runtime, (int, float)):
return int(runtime)
return None
def _extract_progress_info(self, item: dict) -> float | None:
"""Extract progress percentage from library item."""
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status", {})
if isinstance(listening_status, dict) and percent_complete is None:
percent_complete = listening_status.get("percent_complete")
return float(percent_complete) if percent_complete is not None else None
def _extract_asin(self, item: dict) -> str | None:
"""Extract ASIN from library item."""
product = item.get("product", {})
return item.get("asin") or product.get("asin")
def _is_finished(self, item: dict) -> bool:
"""Check if a library item is finished."""
is_finished_flag = item.get("is_finished")
percent_complete = item.get("percent_complete")
listening_status = item.get("listening_status")
if isinstance(listening_status, dict):
is_finished_flag = is_finished_flag or listening_status.get(
"is_finished", False
)
if percent_complete is None:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float)) and percent_complete >= 100
)
def format_duration(
self, value: int | None, unit: str = "minutes", default_none: str | None = None
) -> str | None:
"""Format duration value into human-readable string."""
if value is None or value <= 0:
return default_none
if unit == "seconds":
total_minutes = int(value) // 60
else:
total_minutes = int(value)
if total_minutes < 60:
return f"{total_minutes} minute{'s' if total_minutes != 1 else ''}"
hours = total_minutes // 60
mins = total_minutes % 60
if mins == 0:
return f"{hours} hour{'s' if hours != 1 else ''}"
return f"{hours} hour{'s' if hours != 1 else ''} {mins} minute{'s' if mins != 1 else ''}"
def _populate_table(self, items: list) -> None:
"""Populate the DataTable with library items."""
table = self.query_one(DataTable)
table.clear()
if not items:
self.update_status("No books found.")
return
for item in items:
title = self._extract_title(item)
author_names = self._extract_authors(item)
minutes = self._extract_runtime_minutes(item)
runtime_str = self.format_duration(
minutes, unit="minutes", default_none="Unknown length"
)
percent_complete = self._extract_progress_info(item)
progress_str = "0%"
if percent_complete is not None and percent_complete > 0:
progress_str = f"{percent_complete:.1f}%"
table.add_row(
title,
author_names or "Unknown",
runtime_str or "Unknown",
progress_str,
key=title,
)
self.current_items = items
mode = "all" if self.show_all_mode else "unfinished"
self.update_status(f"Showing {len(items)} books ({mode})")
def show_all(self) -> None:
"""Display all books in the table."""
if not self.all_items:
return
self.show_all_mode = True
self._populate_table(self.all_items)
def show_unfinished(self) -> None:
"""Display only unfinished books in the table."""
if not self.all_items:
return
self.show_all_mode = False
unfinished_items = [
item for item in self.all_items if not self._is_finished(item)
]
self._populate_table(unfinished_items)
def action_toggle_dark(self) -> None:
"""Toggle between dark and light theme."""
self.theme = (
"textual-dark" if self.theme == "textual-light" else "textual-light"
)
def action_sort(self) -> None:
"""Sort table by title in ascending order."""
table = self.query_one(DataTable)
if table.row_count > 0:
table.sort(self.title_column_key)
def action_reverse_sort(self) -> None:
"""Sort table by title in descending order."""
table = self.query_one(DataTable)
if table.row_count > 0:
table.sort(self.title_column_key, reverse=True)
def action_sort_by_progress(self) -> None:
"""Sort table by progress percentage, toggling direction on each press."""
table = self.query_one(DataTable)
if table.row_count > 0:
self.progress_sort_reverse = not self.progress_sort_reverse
def progress_key(row_values):
progress_cell = row_values[self.progress_column_index]
if isinstance(progress_cell, str):
try:
return float(progress_cell.rstrip("%"))
except (ValueError, AttributeError):
return 0.0
return 0.0
table.sort(key=progress_key, reverse=self.progress_sort_reverse)
def action_show_all(self) -> None:
"""Action handler to show all books."""
self.show_all()
def action_show_unfinished(self) -> None:
"""Action handler to show unfinished books."""
self.show_unfinished()
def action_play_selected(self) -> None:
"""Start playing the selected book."""
table = self.query_one(DataTable)
if table.row_count == 0:
self.update_status("No books available")
return
cursor_row = table.cursor_row
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return
selected_item = self.current_items[cursor_row]
asin = self._extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
return
if self.is_playing:
self._stop_playback()
self.current_asin = asin
self._start_playback_async(asin)
def action_toggle_playback(self) -> None:
"""Toggle pause/resume state."""
if not self.is_playing:
self.update_status("No playback active. Press Enter to play a book.")
return
if not self._is_process_alive():
self._stop_playback()
self.update_status("Playback has ended")
return
if self.is_paused:
self._resume_playback()
else:
self._pause_playback()
def _get_playback_status_message(self, prefix: str) -> str:
"""Generate status message with filename if available."""
filename = self.current_file_path.name if self.current_file_path else ""
return f"{prefix}: {filename}" if filename else prefix
def _check_playback_status(self) -> None:
"""Check if playback process has finished and update state accordingly."""
if self.playback_process is None:
return
return_code = self.playback_process.poll()
if return_code is not None:
finished_file = self.current_file_path
self.playback_process = None
self.is_playing = False
self.is_paused = False
if finished_file:
if return_code == 0:
self.update_status(f"Finished: {finished_file.name}")
else:
self.update_status(
f"Playback ended unexpectedly (code: {return_code}): {finished_file.name}"
)
else:
self.update_status("Playback finished")
self.current_file_path = None
self.current_asin = None
def _start_playback(self, path: Path, activation_hex: str | None = None) -> bool:
"""Start playing a local file using ffplay."""
if not shutil.which("ffplay"):
self.update_status("ffplay not found. Please install ffmpeg")
return False
if self.playback_process is not None:
self._stop_playback()
cmd = ["ffplay", "-nodisp", "-autoexit"]
if activation_hex:
cmd.extend(["-activation_bytes", activation_hex])
cmd.append(str(path))
try:
self.playback_process = subprocess.Popen(
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
if self.playback_process.poll() is not None:
return_code = self.playback_process.returncode
self.update_status(
f"Playback process exited immediately (code: {return_code})"
)
self.playback_process = None
return False
self.is_playing = True
self.is_paused = False
self.current_file_path = path
self.update_status(f"Playing: {path.name}")
return True
except Exception as e:
self.update_status(f"Error starting playback: {e}")
return False
def _stop_playback(self) -> None:
"""Stop the current playback."""
if self.playback_process is None:
return
try:
if self.playback_process.poll() is None:
self.playback_process.terminate()
try:
self.playback_process.wait(timeout=2)
except subprocess.TimeoutExpired:
self.playback_process.kill()
self.playback_process.wait()
except ProcessLookupError:
pass
except Exception:
pass
finally:
self.playback_process = None
self.is_playing = False
self.is_paused = False
self.current_file_path = None
def _is_process_alive(self) -> bool:
"""Check if playback process is still running."""
if self.playback_process is None:
return False
return self.playback_process.poll() is None
def _pause_playback(self) -> None:
"""Pause the current playback."""
if not (self.playback_process and self.is_playing and not self.is_paused):
return
if not self._is_process_alive():
self._stop_playback()
self.update_status("Playback process has ended")
return
try:
os.kill(self.playback_process.pid, signal.SIGSTOP)
self.is_paused = True
self.update_status(self._get_playback_status_message("Paused"))
except ProcessLookupError:
self._stop_playback()
self.update_status("Process no longer exists")
except PermissionError:
self.update_status("Permission denied: cannot pause playback")
except Exception as e:
self.update_status(f"Error pausing playback: {e}")
def _resume_playback(self) -> None:
"""Resume the current playback."""
if not (self.playback_process and self.is_playing and self.is_paused):
return
if not self._is_process_alive():
self._stop_playback()
self.update_status("Playback process has ended")
return
try:
os.kill(self.playback_process.pid, signal.SIGCONT)
self.is_paused = False
self.update_status(self._get_playback_status_message("Playing"))
except ProcessLookupError:
self._stop_playback()
self.update_status("Process no longer exists")
except PermissionError:
self.update_status("Permission denied: cannot resume playback")
except Exception as e:
self.update_status(f"Error resuming playback: {e}")
@work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None:
"""Start playback asynchronously."""
self.call_from_thread(self.update_status, "Preparing playback...")
local_path = self._get_or_download(asin)
if not local_path:
self.call_from_thread(self.update_status, "Could not download file")
return
self.call_from_thread(self.update_status, "Getting activation bytes...")
activation_hex = self._get_activation_bytes()
if not activation_hex:
self.call_from_thread(self.update_status, "Failed to get activation bytes")
return
self.call_from_thread(
self.update_status, f"Starting playback of {local_path.name}..."
)
self.call_from_thread(self._start_playback, local_path, activation_hex)
def _sanitize_filename(self, filename: str) -> str:
"""Remove invalid characters from filename."""
return re.sub(r'[<>:"/\\|?*]', "_", filename)
def _get_name_from_asin(self, asin: str) -> str | None:
"""Get the title/name of a book from its ASIN."""
try:
product_info = self.client.get(
path=f"1.0/catalog/products/{asin}",
response_groups="product_desc,product_attrs",
)
product = product_info.get("product", {})
return product.get("title") or "Unknown Title"
except Exception:
return None
def _get_download_link(self, asin: str, codec: str = DEFAULT_CODEC) -> str | None:
"""Get download link for book."""
if self.auth.adp_token is None:
return None
try:
params = {
"type": "AUDI",
"currentTransportMethod": "WIFI",
"key": asin,
"codec": codec,
}
response = httpx.get(
url=self.DOWNLOAD_URL,
params=params,
follow_redirects=False,
auth=self.auth,
)
response.raise_for_status()
link = response.headers.get("Location")
if not link:
return None
tld = self.auth.locale.domain
return link.replace("cds.audible.com", f"cds.audible.{tld}")
except Exception:
return None
def _download_file(self, url: str, dest_path: Path) -> Path | None:
"""Download file from URL to destination."""
try:
with httpx.stream("GET", url) as response:
response.raise_for_status()
total_size = int(response.headers.get("content-length", 0))
downloaded = 0
with open(dest_path, "wb") as f:
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = (downloaded / total_size) * 100
self.call_from_thread(
self.update_status,
f"Downloading: {percent:.1f}% ({downloaded}/{total_size} bytes)",
)
return dest_path
except Exception:
return None
def _get_or_download(self, asin: str) -> Path | None:
"""Get local path of AAX file, downloading if missing."""
title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title)
local_path = self.CACHE_DIR / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= self.MIN_FILE_SIZE:
self.call_from_thread(
self.update_status, f"Using cached file: {local_path.name}"
)
return local_path
self.call_from_thread(
self.update_status, f"Downloading to {local_path.name}..."
)
dl_link = self._get_download_link(asin)
if not dl_link:
self.call_from_thread(self.update_status, "Failed to get download link")
return None
if not self._download_file(dl_link, local_path):
self.call_from_thread(self.update_status, "Download failed")
return None
if not local_path.exists() or local_path.stat().st_size < self.MIN_FILE_SIZE:
self.call_from_thread(
self.update_status, "Download failed or file too small"
)
return None
return local_path
def _get_activation_bytes(self) -> str | None:
"""Get activation bytes as hex string."""
try:
activation_bytes = get_activation_bytes(self.auth)
if isinstance(activation_bytes, bytes):
return activation_bytes.hex()
return str(activation_bytes)
except Exception:
return None
def authenticate(self) -> None:
"""Authenticate with Audible and set auth and client objects."""
self.AUTH_PATH.parent.mkdir(parents=True, exist_ok=True)
if self.AUTH_PATH.exists():
try:
authenticator = audible.Authenticator.from_file(str(self.AUTH_PATH))
audible_client = audible.Client(auth=authenticator)
self.auth = authenticator
self.client = audible_client
return
except (OSError, ValueError, KeyError) as e:
print(f"Failed to load existing auth: {e}")
print("Please re-authenticate.")
print("Please authenticate with your Audible account.")
print("You will need to provide:")
print(" - Your Audible email/username")
print(" - Your password")
print(" - Your marketplace locale (e.g., 'US', 'UK', 'DE', 'FR')")
email = input("\nEmail: ")
password = getpass("Password: ")
marketplace = (
input("Marketplace locale (default: US): ").strip().upper() or "US"
)
authenticator = audible.Authenticator.from_login(
username=email, password=password, locale=marketplace
)
self.AUTH_PATH.parent.mkdir(parents=True, exist_ok=True)
authenticator.to_file(str(self.AUTH_PATH))
print("Authentication successful!")
audible_client = audible.Client(auth=authenticator)
self.auth = authenticator
self.client = audible_client
if __name__ == "__main__":
app = Auditui()
app.authenticate()
app.run()

View File

@@ -4,4 +4,7 @@ version = "0.1.0"
description = "An Audible TUI client"
readme = "README.md"
requires-python = ">=3.13"
dependencies = ["audible>=0.10.0", "textual>=6.7.1"]
dependencies = ["audible>=0.10.0", "httpx>=0.28.1", "textual>=6.7.1"]
[project.scripts]
auditui = "auditui.cli:main"

View File

@@ -37,7 +37,8 @@ class AudibleStats:
self.client = audible.Client(auth=self.auth)
return
except Exception:
logger.info("Failed to load existing auth. Re-authenticating.\n")
logger.info(
"Failed to load existing auth. Re-authenticating.\n")
email = input("Email: ")
password = getpass("Password: ")
@@ -85,7 +86,8 @@ class AudibleStats:
monthly_listening_interval_start_date=f"{middle}-01",
store="Audible",
)
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
monthly_stats = stats.get(
"aggregated_monthly_listening_stats", [])
has_activity = bool(
monthly_stats
and any(stat.get("aggregated_sum", 0) > 0 for stat in monthly_stats)
@@ -101,12 +103,38 @@ class AudibleStats:
return earliest_year
def get_current_month_listening_time(self) -> tuple[int, int, int]:
"""Get total listening time for the current month as (hours, minutes, seconds)."""
try:
stats = self.client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration="1",
monthly_listening_interval_start_date=date.today().strftime("%Y-%m"),
store="Audible",
)
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
if not monthly_stats:
return (0, 0, 0)
total_milliseconds = sum(
stat.get("aggregated_sum", 0) for stat in monthly_stats
)
total_seconds = int(total_milliseconds // 1000)
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return (hours, minutes, seconds)
except Exception as e:
logger.warning(f"Could not get current month listening time: {e}")
return (0, 0, 0)
def main() -> None:
"""Main entry point."""
worker = AudibleStats()
worker.authenticate()
print(worker.get_signup_year())
hours, minutes, seconds = worker.get_current_month_listening_time()
print(f"Total listening time this month: {hours}h {minutes}m {seconds}s")
if __name__ == "__main__":

2
uv.lock generated
View File

@@ -37,12 +37,14 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "audible" },
{ name = "httpx" },
{ name = "textual" },
]
[package.metadata]
requires-dist = [
{ name = "audible", specifier = ">=0.10.0" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "textual", specifier = ">=6.7.1" },
]