Compare commits

...

56 Commits

Author SHA1 Message Date
4741080284 clean: shorter messages 2025-12-16 06:24:36 +01:00
737147b457 clean: remove unused import 2025-12-16 06:21:27 +01:00
123d35068f refactor: use ui.py and remove unused imports 2025-12-16 06:21:20 +01:00
258aabe10f refactor: future-proof ui components in ui.py 2025-12-16 06:21:09 +01:00
bc070c4162 feat: relooking of help screen 2025-12-16 06:02:33 +01:00
cbf6bff779 feat: help screen now is scrollable and looks better 2025-12-16 06:02:22 +01:00
080c731fd7 feat: add css for new help screen 2025-12-16 03:35:46 +01:00
1b6f1ff1f2 feat: add a help screen with all keybindings 2025-12-16 03:35:33 +01:00
aa5998c3e3 docs: update roadmap and main description 2025-12-16 03:35:16 +01:00
c65e949731 feat: improve margin 2025-12-16 03:25:12 +01:00
ab51e5506e feat: hide useless palette 2025-12-16 03:25:02 +01:00
3701b37f4c docs: update roadmap 2025-12-16 03:10:32 +01:00
1474302d7e feat: add downloaded status indicator to table rows 2025-12-16 03:10:13 +01:00
eeecaaf42e feat: add cache-related method to get, remove or check 2025-12-16 03:09:26 +01:00
f359dee194 feat: add a "downloaded" column in the UI 2025-12-16 03:09:06 +01:00
1e2655670d feat: add a toggle to download/remove a book from cache 2025-12-16 03:08:56 +01:00
cf6164c438 docs: update keybindings 2025-12-16 02:59:02 +01:00
46fa15fcfe clean: remove dark/light toggle 2025-12-16 02:58:57 +01:00
4b457452d4 refactor: move table-related utilities to table_utils.py 2025-12-16 02:55:15 +01:00
0de0286992 fix: docstring 2025-12-16 02:50:42 +01:00
391b0360bd docs: update definition of roadmap item 2025-12-16 02:26:31 +01:00
b0dc15a018 refactor: table_helpers to table_utils 2025-12-16 01:55:18 +01:00
a6d74265ed docs: update roadmap 2025-12-16 01:47:03 +01:00
4f49a081c9 clean: keep one-line docstring for consistency 2025-12-15 21:06:55 +01:00
3a19db2cf0 refactor: extract table sorting logic and media info loading to new modules 2025-12-15 21:03:52 +01:00
fcb1524806 feat: standardize error handling patterns 2025-12-15 21:01:09 +01:00
18ffae7ac8 refactor: use unified time formatting method 2025-12-15 20:59:36 +01:00
d71c751bbc clean: remove obsolete private method 2025-12-15 20:59:26 +01:00
234b65c9d8 refactor: consolidate time formatting 2025-12-15 20:59:15 +01:00
2d9970c198 refactor: move constants to constants.py 2025-12-15 20:57:30 +01:00
5e3b33570d feat: add last position retrieval and save functionality 2025-12-15 13:25:05 +01:00
2ced756cc0 docs: update roadmap 2025-12-15 13:24:12 +01:00
1c4017ae0c feat: resume from last position and auto-save playback position 2025-12-15 13:23:53 +01:00
251a7a26d5 format: pep8 2025-12-15 13:23:13 +01:00
6462c83a21 feat: save position 2025-12-15 13:22:43 +01:00
0c590cfa82 feat: get current month listening time using API 2025-12-15 12:25:21 +01:00
16395981dc fix: handle accented characters correctly in title sorting 2025-12-15 12:14:33 +01:00
30f0612bb5 docs: update readme with bindings 2025-12-15 07:52:41 +01:00
1aaff3b3b7 fix: correct binding according to name 2025-12-15 07:50:11 +01:00
986541f0d3 fix: binding description 2025-12-15 07:48:07 +01:00
151d565f36 feat: remove a redundant toggle, pressing p twice sort/reverse sort by progress 2025-12-15 07:46:54 +01:00
7e2b657cfc feat: remove a redundant toggle, pressing s twice sort/reverse sort 2025-12-15 07:46:03 +01:00
cef5e40347 fix: split clients, add surface error and follow redirects on downloads 2025-12-14 09:49:03 +01:00
839394343e feat: make chunk size configurable 2025-12-14 09:37:47 +01:00
84868c4afa feat: add default chunk size 2025-12-14 09:37:28 +01:00
03988f0988 fix:l close download_manager flux on app exit 2025-12-14 09:35:25 +01:00
9eba702a0a feat: reuse connections for better performance 2025-12-14 09:35:05 +01:00
f61f4ec55e build: update uv.lock 2025-12-14 09:34:53 +01:00
b45ff86061 build: add httpx dependency 2025-12-14 09:34:47 +01:00
6824d00088 feat: add url validation before trying to download 2025-12-14 09:32:29 +01:00
46c66e0d5c docs: update readme 2025-12-14 09:28:44 +01:00
d4e73e6a13 feat: implement goto previous/next chapter 2025-12-14 09:28:05 +01:00
b2dd430ac9 feat: methods to goto next/previous chapter 2025-12-14 09:27:56 +01:00
ce0d313187 clean: just handle auth as configure handles the rest of the flow 2025-12-14 09:11:55 +01:00
7fee7e56cf feat: use configuration flow if not existing/not correct 2025-12-14 09:11:12 +01:00
58661641d1 feat: add a configuration flow 2025-12-14 09:10:56 +01:00
15 changed files with 897 additions and 196 deletions

View File

@@ -1,14 +1,6 @@
# auditui
A terminal-based user interface (TUI) client for Audible, written in Python 3.
Listen to your audiobooks or podcasts, browse your library, and more.
## What it does and where are we
The project offers a TUI interface for browsing your Audible library, listing your books with progress information. You can sort by progress or title, show all books, or show only unfinished books which is the default.
You can also play a book by pressing `Enter` on a book in the list, and pause/resume the playback by pressing `Space`. `Left` and `Right` let you move 30 seconds backward/forward.
A terminal-based user interface (TUI) client for Audible, written in Python 3 : listen to your audiobooks (even offline), browse and manage your library, and more!
Currently, the only available theme is Catppuccin Mocha, following their [style guide](https://github.com/catppuccin/catppuccin/blob/main/docs/style-guide.md), as it's my preferred theme across most of my tools.
@@ -32,6 +24,23 @@ $ uv run main.py
Please also note that as of now, you need to have [ffmpeg](https://ffmpeg.org/) installed to play audio files.
### Bindings
| Key | Action |
| ------------ | -------------------------- |
| `?` | Show help screen |
| `n` | Sort by name |
| `p` | Sort by progress |
| `a` | Show all/unfinished |
| `enter` | Play the selected book |
| `space` | Pause/resume the playback |
| `left` | Seek backward 30 seconds |
| `right` | Seek forward 30 seconds |
| `ctrl+left` | Go to the previous chapter |
| `ctrl+right` | Go to the next chapter |
| `d` | Download/delete from cache |
| `q` | Quit the application |
## Roadmap
- [x] list your library
@@ -41,16 +50,24 @@ Please also note that as of now, you need to have [ffmpeg](https://ffmpeg.org/)
- [x] print chapter and progress in the footer of the app while a book is playing
- [x] chapter progress bar in footer
- [x] add a control to jump 30s earlier/later
- [ ] add control to go to the previous/next chapter
- [ ] save/resume playback of a book from the last position, regardless of which device was used previously
- [x] add control to go to the previous/next chapter
- [x] save/resume playback of a book from the last position, regardless of which device was used previously
- [x] download/remove a book in the cache without having to play it
- [x] add a help screen with all the keybindings
- [ ] increase/decrease reading speed
- [ ] mark a book as finished or unfinished
- [ ] get your stats in a separated pane
- [ ] filter books on views
- [ ] search in your book library
And after that:
- [ ] search the marketplace for books
- [ ] add a book in your wishlist
- [ ] get your listening stats from Audible
- [ ] add multiple themes and theme selector
All of this, and of course:
- [ ] installation setup
- [ ] build a decent TUI interface using [Textual](https://textual.textualize.io/)
- [ ] code cleanup / organization

View File

@@ -10,10 +10,17 @@ from textual.events import Key
from textual.widgets import DataTable, Footer, Header, ProgressBar, Static
from textual.worker import get_current_worker
from .constants import TABLE_COLUMNS, TABLE_CSS
from .constants import PROGRESS_COLUMN_INDEX, SEEK_SECONDS, TABLE_CSS, TABLE_COLUMNS
from .downloads import DownloadManager
from .library import LibraryClient
from .playback import PlaybackController
from .table_utils import (
create_progress_sort_key,
create_title_sort_key,
filter_unfinished_items,
format_item_as_row,
)
from .ui import HelpScreen
if TYPE_CHECKING:
from textual.widgets._data_table import ColumnKey
@@ -22,18 +29,22 @@ if TYPE_CHECKING:
class Auditui(App):
"""Main application class for the Audible TUI app."""
theme = "textual-dark"
SHOW_PALETTE = False
BINDINGS = [
("d", "toggle_dark", "Toggle dark mode"),
("s", "sort", "Sort by title"),
("r", "reverse_sort", "Reverse sort"),
("?", "show_help", "Help"),
("n", "sort", "Sort by name"),
("p", "sort_by_progress", "Sort by progress"),
("a", "show_all", "Show all books"),
("u", "show_unfinished", "Show unfinished"),
("enter", "play_selected", "Play selected book"),
("a", "show_all", "All/Unfinished"),
("enter", "play_selected", "Play"),
("space", "toggle_playback", "Pause/Resume"),
("left", "seek_backward", "Seek -30s"),
("right", "seek_forward", "Seek +30s"),
("q", "quit", "Quit application"),
("left", "seek_backward", "-30s"),
("right", "seek_forward", "+30s"),
("ctrl+left", "previous_chapter", "Previous chapter"),
("ctrl+right", "next_chapter", "Next chapter"),
("d", "toggle_download", "Download/Delete"),
("q", "quit", "Quit"),
]
CSS = TABLE_CSS
@@ -46,15 +57,17 @@ class Auditui(App):
self.download_manager = (
DownloadManager(auth, client) if auth and client else None
)
self.playback = PlaybackController(self.update_status)
self.playback = PlaybackController(
self.update_status, self.library_client)
self.all_items: list[dict] = []
self.current_items: list[dict] = []
self.show_all_mode = False
self.title_sort_reverse = False
self.progress_sort_reverse = False
self.title_column_key: ColumnKey | None = None
self.progress_column_key: ColumnKey | None = None
self.progress_column_index = 3
self.progress_column_index = PROGRESS_COLUMN_INDEX
def compose(self) -> ComposeResult:
yield Header()
@@ -65,7 +78,7 @@ class Auditui(App):
yield table
yield Static("", id="progress_info")
yield ProgressBar(id="progress_bar", show_eta=False, show_percentage=False, total=100)
yield Footer()
yield Footer(show_command_palette=False)
def on_mount(self) -> None:
"""Initialize the table and start fetching library data."""
@@ -79,17 +92,39 @@ class Auditui(App):
self.update_status("Fetching library...")
self.fetch_library()
else:
self.update_status("Not authenticated. Please restart and authenticate.")
self.update_status(
"Not authenticated. Please restart and authenticate.")
self.set_interval(1.0, self._check_playback_status)
self.set_interval(0.5, self._update_progress)
self.set_interval(30.0, self._save_position_periodically)
def on_unmount(self) -> None:
"""Clean up on app exit."""
self.playback.stop()
if self.download_manager:
self.download_manager.close()
def on_key(self, event: Key) -> None:
"""Handle key presses on DataTable."""
"""Handle key presses."""
if self.playback.is_playing:
if event.key == "ctrl+left":
event.prevent_default()
self.action_previous_chapter()
return
elif event.key == "ctrl+right":
event.prevent_default()
self.action_next_chapter()
return
elif event.key == "left":
event.prevent_default()
self.action_seek_backward()
return
elif event.key == "right":
event.prevent_default()
self.action_seek_forward()
return
if isinstance(self.focused, DataTable):
if event.key == "enter":
event.prevent_default()
@@ -97,12 +132,6 @@ class Auditui(App):
elif event.key == "space":
event.prevent_default()
self.action_toggle_playback()
elif event.key == "left" and self.playback.is_playing:
event.prevent_default()
self.action_seek_backward()
elif event.key == "right" and self.playback.is_playing:
event.prevent_default()
self.action_seek_forward()
def update_status(self, message: str) -> None:
"""Update the status message in the UI."""
@@ -127,7 +156,7 @@ class Auditui(App):
except (OSError, ValueError, KeyError) as exc:
self.call_from_thread(self.on_library_error, str(exc))
def on_library_loaded(self, items: list) -> None:
def on_library_loaded(self, items: list[dict]) -> None:
"""Handle successful library load."""
self.all_items = items
self.update_status(f"Loaded {len(items)} books")
@@ -137,7 +166,7 @@ class Auditui(App):
"""Handle library fetch error."""
self.update_status(f"Error fetching library: {error}")
def _populate_table(self, items: list) -> None:
def _populate_table(self, items: list[dict]) -> None:
"""Populate the DataTable with library items."""
table = self.query_one(DataTable)
table.clear()
@@ -147,29 +176,20 @@ class Auditui(App):
return
for item in items:
title = self.library_client.extract_title(item)
author_names = self.library_client.extract_authors(item)
if author_names and len(author_names) > 40:
author_names = f"{author_names[:37]}..."
minutes = self.library_client.extract_runtime_minutes(item)
runtime_str = self.library_client.format_duration(
minutes, unit="minutes", default_none="Unknown length"
)
percent_complete = self.library_client.extract_progress_info(item)
progress_str = f"{percent_complete:.1f}%" if percent_complete and percent_complete > 0 else "0%"
table.add_row(
title,
author_names or "Unknown",
runtime_str or "Unknown",
progress_str,
key=title,
)
title, author, runtime, progress, downloaded = format_item_as_row(
item, self.library_client, self.download_manager)
table.add_row(title, author, runtime,
progress, downloaded, key=title)
self.current_items = items
mode = "all" if self.show_all_mode else "unfinished"
self.update_status(f"Showing {len(items)} books ({mode})")
def _refresh_table(self) -> None:
"""Refresh the table with current items."""
if self.current_items:
self._populate_table(self.current_items)
def show_all(self) -> None:
"""Display all books in the table."""
if not self.all_items:
@@ -183,49 +203,33 @@ class Auditui(App):
return
self.show_all_mode = False
unfinished_items = [
item for item in self.all_items if not self.library_client.is_finished(item)
]
unfinished_items = filter_unfinished_items(
self.all_items, self.library_client)
self._populate_table(unfinished_items)
def action_toggle_dark(self) -> None:
"""Toggle between dark and light theme."""
self.theme = (
"textual-dark" if self.theme == "textual-light" else "textual-light"
)
def action_sort(self) -> None:
"""Sort table by title in ascending order."""
"""Sort table by title, toggling direction on each press."""
table = self.query_one(DataTable)
if table.row_count > 0 and self.title_column_key:
table.sort(self.title_column_key)
def action_reverse_sort(self) -> None:
"""Sort table by title in descending order."""
table = self.query_one(DataTable)
if table.row_count > 0 and self.title_column_key:
table.sort(self.title_column_key, reverse=True)
title_key, reverse = create_title_sort_key(self.title_sort_reverse)
table.sort(key=title_key, reverse=reverse)
self.title_sort_reverse = not self.title_sort_reverse
def action_sort_by_progress(self) -> None:
"""Sort table by progress percentage, toggling direction on each press."""
table = self.query_one(DataTable)
if table.row_count > 0:
self.progress_sort_reverse = not self.progress_sort_reverse
def progress_key(row_values):
progress_cell = row_values[self.progress_column_index]
if isinstance(progress_cell, str):
try:
return float(progress_cell.rstrip("%"))
except (ValueError, AttributeError):
return 0.0
return 0.0
table.sort(key=progress_key, reverse=self.progress_sort_reverse)
progress_key, reverse = create_progress_sort_key(
self.progress_column_index, self.progress_sort_reverse)
table.sort(key=progress_key, reverse=reverse)
def action_show_all(self) -> None:
"""Show all books."""
self.show_all()
"""Toggle between showing all and unfinished books."""
if self.show_all_mode:
self.show_unfinished()
else:
self.show_all()
def action_show_unfinished(self) -> None:
"""Show unfinished books."""
@@ -234,7 +238,8 @@ class Auditui(App):
def action_play_selected(self) -> None:
"""Start playing the selected book."""
if not self.download_manager:
self.update_status("Not authenticated. Please restart and authenticate.")
self.update_status(
"Not authenticated. Please restart and authenticate.")
return
table = self.query_one(DataTable)
@@ -247,8 +252,12 @@ class Auditui(App):
self.update_status("Invalid selection")
return
if not self.library_client:
self.update_status("Library client not available")
return
selected_item = self.current_items[cursor_row]
asin = self.library_client.extract_asin(selected_item) if self.library_client else None
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
@@ -263,18 +272,32 @@ class Auditui(App):
def action_seek_forward(self) -> None:
"""Seek forward 30 seconds."""
if not self.playback.seek_forward(30.0):
if not self.playback.seek_forward(SEEK_SECONDS):
self._no_playback_message()
def action_seek_backward(self) -> None:
"""Seek backward 30 seconds."""
if not self.playback.seek_backward(30.0):
if not self.playback.seek_backward(SEEK_SECONDS):
self._no_playback_message()
def action_next_chapter(self) -> None:
"""Seek to the next chapter."""
if not self.playback.seek_to_next_chapter():
self._no_playback_message()
def action_previous_chapter(self) -> None:
"""Seek to the previous chapter."""
if not self.playback.seek_to_previous_chapter():
self._no_playback_message()
def _no_playback_message(self) -> None:
"""Show message when no playback is active."""
self.update_status("No playback active. Press Enter to play a book.")
def action_show_help(self) -> None:
"""Show the help screen with all keybindings."""
self.push_screen(HelpScreen())
def _check_playback_status(self) -> None:
"""Check if playback process has finished and update state accordingly."""
message = self.playback.check_status()
@@ -301,11 +324,13 @@ class Auditui(App):
progress_info = self.query_one("#progress_info", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
progress_percent = min(100.0, max(0.0, (chapter_elapsed / chapter_total) * 100.0))
progress_percent = min(100.0, max(
0.0, (chapter_elapsed / chapter_total) * 100.0))
progress_bar.update(progress=progress_percent)
chapter_elapsed_str = self._format_time(chapter_elapsed)
chapter_total_str = self._format_time(chapter_total)
progress_info.update(f"{chapter_name} | {chapter_elapsed_str} / {chapter_total_str}")
chapter_elapsed_str = LibraryClient.format_time(chapter_elapsed)
chapter_total_str = LibraryClient.format_time(chapter_total)
progress_info.update(
f"{chapter_name} | {chapter_elapsed_str} / {chapter_total_str}")
progress_info.display = True
progress_bar.display = True
@@ -316,16 +341,54 @@ class Auditui(App):
progress_info.display = False
progress_bar.display = False
def _format_time(self, seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
def _save_position_periodically(self) -> None:
"""Periodically save playback position."""
self.playback.update_position_if_needed()
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
def action_toggle_download(self) -> None:
"""Toggle download/remove for the selected book."""
if not self.download_manager:
self.update_status(
"Not authenticated. Please restart and authenticate.")
return
table = self.query_one(DataTable)
if table.row_count == 0:
self.update_status("No books available")
return
cursor_row = table.cursor_row
if cursor_row >= len(self.current_items):
self.update_status("Invalid selection")
return
if not self.library_client:
self.update_status("Library client not available")
return
selected_item = self.current_items[cursor_row]
asin = self.library_client.extract_asin(selected_item)
if not asin:
self.update_status("Could not get ASIN for selected book")
return
self._toggle_download_async(asin)
@work(exclusive=True, thread=True)
def _toggle_download_async(self, asin: str) -> None:
"""Toggle download/remove asynchronously."""
if not self.download_manager:
return
if self.download_manager.is_cached(asin):
self.download_manager.remove_cached(
asin, self._thread_status_update)
else:
self.download_manager.get_or_download(
asin, self._thread_status_update)
self.call_from_thread(self._refresh_table)
@work(exclusive=True, thread=True)
def _start_playback_async(self, asin: str) -> None:

View File

@@ -1,6 +1,5 @@
"""Authentication helpers for the Auditui app."""
from getpass import getpass
from pathlib import Path
from typing import Tuple
@@ -13,30 +12,14 @@ def authenticate(
auth_path: Path = AUTH_PATH,
) -> Tuple[audible.Authenticator, audible.Client]:
"""Authenticate with Audible and return authenticator and client."""
auth_path.parent.mkdir(parents=True, exist_ok=True)
if not auth_path.exists():
raise FileNotFoundError(
"Authentication file not found. Please run 'auditui configure' to set up authentication.")
if auth_path.exists():
try:
authenticator = audible.Authenticator.from_file(str(auth_path))
audible_client = audible.Client(auth=authenticator)
return authenticator, audible_client
except (OSError, ValueError, KeyError) as exc:
print(f"Failed to load existing auth: {exc}")
print("Please re-authenticate.")
print("Please authenticate with your Audible account.")
email = input("\nEmail: ")
password = getpass("Password: ")
marketplace = input(
"Marketplace locale (default: US): ").strip().upper() or "US"
authenticator = audible.Authenticator.from_login(
username=email, password=password, locale=marketplace
)
auth_path.parent.mkdir(parents=True, exist_ok=True)
authenticator.to_file(str(auth_path))
print("Authentication successful!")
audible_client = audible.Client(auth=authenticator)
return authenticator, audible_client
try:
authenticator = audible.Authenticator.from_file(str(auth_path))
audible_client = audible.Client(auth=authenticator)
return authenticator, audible_client
except (OSError, ValueError, KeyError) as exc:
raise ValueError(
f"Failed to load existing authentication: {exc}") from exc

40
auditui/configure.py Normal file
View File

@@ -0,0 +1,40 @@
"""Configuration helpers for the Auditui app."""
from getpass import getpass
from pathlib import Path
from typing import Tuple
import audible
from .constants import AUTH_PATH
def configure(
auth_path: Path = AUTH_PATH,
) -> Tuple[audible.Authenticator, audible.Client]:
"""Force re-authentication and save credentials."""
if auth_path.exists():
response = input(
"Configuration already exists. Are you sure you want to overwrite it? (y/N): "
).strip().lower()
if response not in ("yes", "y"):
print("Configuration cancelled.")
raise SystemExit(0)
print("Please authenticate with your Audible account.")
email = input("\nEmail: ")
password = getpass("Password: ")
marketplace = input(
"Marketplace locale (default: US): ").strip().upper() or "US"
authenticator = audible.Authenticator.from_login(
username=email, password=password, locale=marketplace
)
auth_path.parent.mkdir(parents=True, exist_ok=True)
authenticator.to_file(str(auth_path))
print("Authentication successful!")
audible_client = audible.Client(auth=authenticator)
return authenticator, audible_client

View File

@@ -7,8 +7,14 @@ CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
DEFAULT_CODEC = "LC_128_44100_stereo"
MIN_FILE_SIZE = 1024 * 1024
DEFAULT_CHUNK_SIZE = 8192
TABLE_COLUMNS = ("Title", "Author", "Length", "Progress")
TABLE_COLUMNS = ("Title", "Author", "Length", "Progress", "Downloaded")
AUTHOR_NAME_MAX_LENGTH = 40
AUTHOR_NAME_DISPLAY_LENGTH = 37
PROGRESS_COLUMN_INDEX = 3
SEEK_SECONDS = 30.0
TABLE_CSS = """
Screen {
@@ -24,10 +30,10 @@ Header {
Footer {
background: #181825;
color: #bac2de;
height: 1;
height: 2;
padding: 0 1;
scrollbar-size: 0 0;
overflow-x: hidden;
overflow-x: auto;
overflow-y: hidden;
}
@@ -43,7 +49,7 @@ FooterKey.-grouped,
Footer.-compact FooterKey {
background: #181825;
padding: 0;
margin: 0 2 0 0;
margin: 0 1 0 0;
}
FooterKey .footer-key--key {
@@ -134,6 +140,97 @@ ProgressBar#progress_bar > .progress-bar--track {
ProgressBar#progress_bar > .progress-bar--bar {
background: #a6e3a1;
width: auto;
}
HelpScreen {
align: center middle;
background: rgba(0, 0, 0, 0.7);
}
#help_container {
width: 70;
height: auto;
max-height: 85%;
min-height: 20;
background: #1e1e2e;
border: thick #89b4fa;
padding: 2;
}
#help_title {
text-align: center;
text-style: bold;
color: #89b4fa;
margin-bottom: 2;
padding-bottom: 1;
border-bottom: solid #585b70;
height: 3;
align: center middle;
}
#help_content {
width: 100%;
height: 1fr;
padding: 1 0;
margin: 1 0;
overflow-y: auto;
scrollbar-size: 0 1;
}
#help_content > .scrollbar--vertical {
background: #313244;
}
#help_content > .scrollbar--vertical > .scrollbar--track {
background: #181825;
}
#help_content > .scrollbar--vertical > .scrollbar--handle {
background: #585b70;
}
#help_content > .scrollbar--vertical > .scrollbar--handle:hover {
background: #45475a;
}
.help_row {
height: 3;
margin: 0 0 1 0;
padding: 0 1;
background: #181825;
border: solid #313244;
align: left middle;
}
.help_row:hover {
background: #313244;
border: solid #45475a;
}
.help_key {
width: 20;
text-align: right;
padding: 0 2 0 0;
color: #f9e2af;
text-style: bold;
align: right middle;
}
.help_action {
width: 1fr;
text-align: left;
padding: 0 0 0 2;
color: #cdd6f4;
align: left middle;
}
#help_footer {
text-align: center;
color: #bac2de;
margin-top: 2;
padding-top: 1;
border-top: solid #585b70;
height: 3;
align: center middle;
}
"""

View File

@@ -3,12 +3,13 @@
import re
from pathlib import Path
from typing import Callable
from urllib.parse import urlparse
import audible
import httpx
from audible.activation_bytes import get_activation_bytes
from .constants import CACHE_DIR, DEFAULT_CODEC, DOWNLOAD_URL, MIN_FILE_SIZE
from .constants import CACHE_DIR, DEFAULT_CHUNK_SIZE, DEFAULT_CODEC, DOWNLOAD_URL, MIN_FILE_SIZE
StatusCallback = Callable[[str], None]
@@ -17,12 +18,24 @@ class DownloadManager:
"""Handle retrieval and download of Audible titles."""
def __init__(
self, auth: audible.Authenticator, client: audible.Client, cache_dir: Path = CACHE_DIR
self,
auth: audible.Authenticator,
client: audible.Client,
cache_dir: Path = CACHE_DIR,
chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> None:
self.auth = auth
self.client = client
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size
self._http_client = httpx.Client(
auth=auth, timeout=30.0, follow_redirects=True)
self._download_client = httpx.Client(
timeout=httpx.Timeout(connect=30.0, read=None,
write=30.0, pool=30.0),
follow_redirects=True,
)
def get_or_download(self, asin: str, notify: StatusCallback | None = None) -> Path | None:
"""Get local path of AAX file, downloading if missing."""
@@ -38,12 +51,17 @@ class DownloadManager:
if notify:
notify(f"Downloading to {local_path.name}...")
dl_link = self._get_download_link(asin)
dl_link = self._get_download_link(asin, notify=notify)
if not dl_link:
if notify:
notify("Failed to get download link")
return None
if not self._validate_download_url(dl_link):
if notify:
notify("Invalid download URL")
return None
if not self._download_file(dl_link, local_path, notify):
if notify:
notify("Download failed")
@@ -63,9 +81,48 @@ class DownloadManager:
if isinstance(activation_bytes, bytes):
return activation_bytes.hex()
return str(activation_bytes)
except Exception:
except (OSError, ValueError, KeyError, AttributeError):
return None
def get_cached_path(self, asin: str) -> Path | None:
"""Get the cached file path for a book if it exists."""
title = self._get_name_from_asin(asin) or asin
safe_title = self._sanitize_filename(title)
local_path = self.cache_dir / f"{safe_title}.aax"
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
return local_path
return None
def is_cached(self, asin: str) -> bool:
"""Check if a book is already cached."""
return self.get_cached_path(asin) is not None
def remove_cached(self, asin: str, notify: StatusCallback | None = None) -> bool:
"""Remove a cached book file."""
cached_path = self.get_cached_path(asin)
if not cached_path:
if notify:
notify("Book is not cached")
return False
try:
cached_path.unlink()
if notify:
notify(f"Removed from cache: {cached_path.name}")
return True
except OSError as exc:
if notify:
notify(f"Failed to remove cache: {exc}")
return False
def _validate_download_url(self, url: str) -> bool:
"""Validate that the URL is a valid HTTP/HTTPS URL."""
try:
parsed = urlparse(url)
return parsed.scheme in ("http", "https") and bool(parsed.netloc)
except (ValueError, AttributeError):
return False
def _sanitize_filename(self, filename: str) -> str:
"""Remove invalid characters from filename."""
return re.sub(r'[<>:"/\\|?*]', "_", filename)
@@ -79,12 +136,16 @@ class DownloadManager:
)
product = product_info.get("product", {})
return product.get("title") or "Unknown Title"
except Exception:
except (OSError, ValueError, KeyError):
return None
def _get_download_link(self, asin: str, codec: str = DEFAULT_CODEC) -> str | None:
def _get_download_link(
self, asin: str, codec: str = DEFAULT_CODEC, notify: StatusCallback | None = None
) -> str | None:
"""Get download link for book."""
if self.auth.adp_token is None:
if notify:
notify("Missing ADP token (not authenticated?)")
return None
try:
@@ -94,22 +155,26 @@ class DownloadManager:
"key": asin,
"codec": codec,
}
response = httpx.get(
response = self._http_client.get(
url=DOWNLOAD_URL,
params=params,
follow_redirects=False,
auth=self.auth,
)
response.raise_for_status()
link = response.headers.get("Location")
if not link:
return None
link = str(response.url)
tld = self.auth.locale.domain
return link.replace("cds.audible.com", f"cds.audible.{tld}")
except Exception:
except httpx.HTTPError as exc:
if notify:
notify(f"Download-link request failed: {exc!s}")
return None
except (OSError, ValueError, KeyError, AttributeError) as exc:
if notify:
notify(f"Download-link error: {exc!s}")
return None
def _download_file(
@@ -117,13 +182,13 @@ class DownloadManager:
) -> Path | None:
"""Download file from URL to destination."""
try:
with httpx.stream("GET", url) as response:
with self._download_client.stream("GET", url) as response:
response.raise_for_status()
total_size = int(response.headers.get("content-length", 0))
downloaded = 0
with open(dest_path, "wb") as file_handle:
for chunk in response.iter_bytes(chunk_size=8192):
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
file_handle.write(chunk)
downloaded += len(chunk)
if total_size > 0 and notify:
@@ -133,6 +198,39 @@ class DownloadManager:
)
return dest_path
except Exception:
except httpx.HTTPStatusError as exc:
if notify:
notify(
f"Download HTTP error: {exc.response.status_code} {exc.response.reason_phrase}"
)
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
return None
except httpx.HTTPError as exc:
if notify:
notify(f"Download network error: {exc!s}")
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
return None
except (OSError, ValueError, KeyError) as exc:
if notify:
notify(f"Download error: {exc!s}")
try:
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
dest_path.unlink()
except OSError:
pass
return None
def close(self) -> None:
"""Close the HTTP clients and release resources."""
if hasattr(self, "_http_client"):
self._http_client.close()
if hasattr(self, "_download_client"):
self._download_client.close()

View File

@@ -69,7 +69,8 @@ class LibraryClient:
if not authors and "authors" in item:
authors = item.get("authors", [])
author_names = [a.get("name", "") for a in authors if isinstance(a, dict)]
author_names = [a.get("name", "")
for a in authors if isinstance(a, dict)]
return ", ".join(author_names) or "Unknown"
def extract_runtime_minutes(self, item: dict) -> int | None:
@@ -127,9 +128,84 @@ class LibraryClient:
percent_complete = listening_status.get("percent_complete", 0)
return bool(is_finished_flag) or (
isinstance(percent_complete, (int, float)) and percent_complete >= 100
isinstance(percent_complete, (int, float)
) and percent_complete >= 100
)
def get_last_position(self, asin: str) -> float | None:
"""Get the last playback position for a book in seconds."""
try:
response = self.client.get(
path="1.0/annotations/lastpositions",
asins=asin,
)
annotations = response.get("asin_last_position_heard_annots", [])
for annot in annotations:
if annot.get("asin") != asin:
continue
last_position_heard = annot.get("last_position_heard", {})
if not isinstance(last_position_heard, dict):
continue
if last_position_heard.get("status") == "DoesNotExist":
return None
position_ms = last_position_heard.get("position_ms")
if position_ms is not None:
return float(position_ms) / 1000.0
return None
except (OSError, ValueError, KeyError):
return None
def _get_content_reference(self, asin: str) -> dict | None:
"""Get content reference data including ACR and version."""
try:
response = self.client.get(
path=f"1.0/content/{asin}/metadata",
response_groups="content_reference",
)
content_metadata = response.get("content_metadata", {})
content_reference = content_metadata.get("content_reference", {})
if isinstance(content_reference, dict):
return content_reference
return None
except (OSError, ValueError, KeyError):
return None
def save_last_position(self, asin: str, position_seconds: float) -> bool:
"""Save the last playback position for a book."""
if position_seconds <= 0:
return False
content_ref = self._get_content_reference(asin)
if not content_ref:
return False
acr = content_ref.get("acr")
if not acr:
return False
body = {
"acr": acr,
"asin": asin,
"position_ms": int(position_seconds * 1000),
}
if version := content_ref.get("version"):
body["version"] = version
try:
self.client.put(
path=f"1.0/lastpositions/{asin}",
body=body,
)
return True
except (OSError, ValueError, KeyError):
return False
@staticmethod
def format_duration(
value: int | None, unit: str = "minutes", default_none: str | None = None
@@ -151,3 +227,15 @@ class LibraryClient:
parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
return " ".join(parts) if parts else default_none
@staticmethod
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS."""
total_seconds = int(seconds)
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"

42
auditui/media_info.py Normal file
View File

@@ -0,0 +1,42 @@
"""Media information loading for Audible content."""
import json
import shutil
import subprocess
from pathlib import Path
def load_media_info(path: Path, activation_hex: str | None = None) -> tuple[float | None, list[dict]]:
"""Load media information including duration and chapters using ffprobe."""
if not shutil.which("ffprobe"):
return None, []
try:
cmd = ["ffprobe", "-v", "quiet", "-print_format",
"json", "-show_format", "-show_chapters"]
if activation_hex:
cmd.extend(["-activation_bytes", activation_hex])
cmd.append(str(path))
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=10)
if result.returncode != 0:
return None, []
data = json.loads(result.stdout)
format_info = data.get("format", {})
duration_str = format_info.get("duration")
duration = float(duration_str) if duration_str else None
chapters_data = data.get("chapters", [])
chapters = [
{
"start_time": float(ch.get("start_time", 0)),
"end_time": float(ch.get("end_time", 0)),
"title": ch.get("tags", {}).get("title", f"Chapter {idx + 1}"),
}
for idx, ch in enumerate(chapters_data)
]
return duration, chapters
except (json.JSONDecodeError, subprocess.TimeoutExpired, ValueError, KeyError):
return None, []

View File

@@ -2,7 +2,6 @@
from __future__ import annotations
import json
import os
import shutil
import signal
@@ -12,6 +11,8 @@ from pathlib import Path
from typing import Callable
from .downloads import DownloadManager
from .library import LibraryClient
from .media_info import load_media_info
StatusCallback = Callable[[str], None]
@@ -19,8 +20,9 @@ StatusCallback = Callable[[str], None]
class PlaybackController:
"""Manage playback through ffplay."""
def __init__(self, notify: StatusCallback) -> None:
def __init__(self, notify: StatusCallback, library_client: LibraryClient | None = None) -> None:
self.notify = notify
self.library_client = library_client
self.playback_process: subprocess.Popen | None = None
self.is_playing = False
self.is_paused = False
@@ -33,6 +35,8 @@ class PlaybackController:
self.chapters: list[dict] = []
self.seek_offset: float = 0.0
self.activation_hex: str | None = None
self.last_save_time: float = 0.0
self.position_save_interval: float = 30.0
def start(
self,
@@ -74,7 +78,8 @@ class PlaybackController:
notify("Reached end of file")
self._reset_state()
return False
notify(f"Playback process exited immediately (code: {return_code})")
notify(
f"Playback process exited immediately (code: {return_code})")
self.playback_process = None
return False
@@ -84,11 +89,13 @@ class PlaybackController:
self.playback_start_time = time.time()
self.paused_duration = 0.0
self.pause_start_time = None
self._load_media_info(path, activation_hex)
duration, chapters = load_media_info(path, activation_hex)
self.total_duration = duration
self.chapters = chapters
notify(f"Playing: {path.name}")
return True
except Exception as exc:
except (OSError, ValueError, subprocess.SubprocessError) as exc:
notify(f"Error starting playback: {exc}")
return False
@@ -97,6 +104,8 @@ class PlaybackController:
if self.playback_process is None:
return
self._save_current_position()
try:
if self.playback_process.poll() is None:
self.playback_process.terminate()
@@ -160,6 +169,7 @@ class PlaybackController:
self.chapters = []
self.seek_offset = 0.0
self.activation_hex = None
self.last_save_time = 0.0
def _validate_playback_state(self, require_paused: bool) -> bool:
"""Validate playback state before pause/resume operations."""
@@ -194,7 +204,7 @@ class PlaybackController:
self.notify("Process no longer exists")
except PermissionError:
self.notify(f"Permission denied: cannot {action} playback")
except Exception as exc:
except (OSError, ValueError) as exc:
self.notify(f"Error {action}ing playback: {exc}")
def is_alive(self) -> bool:
@@ -229,9 +239,21 @@ class PlaybackController:
notify("Failed to get activation bytes")
return False
start_position = 0.0
if self.library_client:
try:
last_position = self.library_client.get_last_position(asin)
if last_position is not None and last_position > 0:
start_position = last_position
notify(
f"Resuming from {LibraryClient.format_time(start_position)}")
except (OSError, ValueError, KeyError):
pass
notify(f"Starting playback of {local_path.name}...")
self.current_asin = asin
return self.start(local_path, activation_hex, notify)
self.last_save_time = time.time()
return self.start(local_path, activation_hex, notify, start_position)
def toggle_playback(self) -> bool:
"""Toggle pause/resume state. Returns True if action was taken."""
@@ -338,41 +360,6 @@ class PlaybackController:
"""Seek backward by specified seconds. Returns True if action was taken."""
return self._seek(seconds, "backward")
def _load_media_info(self, path: Path, activation_hex: str | None) -> None:
"""Load media information including duration and chapters using ffprobe."""
if not shutil.which("ffprobe"):
return
try:
cmd = ["ffprobe", "-v", "quiet", "-print_format",
"json", "-show_format", "-show_chapters"]
if activation_hex:
cmd.extend(["-activation_bytes", activation_hex])
cmd.append(str(path))
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=10)
if result.returncode != 0:
return
data = json.loads(result.stdout)
format_info = data.get("format", {})
duration_str = format_info.get("duration")
if duration_str:
self.total_duration = float(duration_str)
chapters_data = data.get("chapters", [])
self.chapters = [
{
"start_time": float(ch.get("start_time", 0)),
"end_time": float(ch.get("end_time", 0)),
"title": ch.get("tags", {}).get("title", f"Chapter {idx + 1}"),
}
for idx, ch in enumerate(chapters_data)
]
except (json.JSONDecodeError, subprocess.TimeoutExpired, ValueError, KeyError):
pass
def get_current_progress(self) -> tuple[str, float, float] | None:
"""Get current playback progress."""
if not self.is_playing or self.playback_start_time is None:
@@ -380,7 +367,8 @@ class PlaybackController:
elapsed = self._get_current_elapsed()
total_elapsed = self.seek_offset + elapsed
chapter_name, chapter_elapsed, chapter_total = self._get_current_chapter(total_elapsed)
chapter_name, chapter_elapsed, chapter_total = self._get_current_chapter(
total_elapsed)
return (chapter_name, chapter_elapsed, chapter_total)
def _get_current_chapter(self, elapsed: float) -> tuple[str, float, float]:
@@ -398,3 +386,106 @@ class PlaybackController:
chapter_elapsed = max(0.0, elapsed - last_chapter["start_time"])
chapter_total = last_chapter["end_time"] - last_chapter["start_time"]
return (last_chapter["title"], chapter_elapsed, chapter_total)
def _get_current_chapter_index(self, elapsed: float) -> int | None:
"""Get the index of the current chapter based on elapsed time."""
if not self.chapters:
return None
for idx, chapter in enumerate(self.chapters):
if chapter["start_time"] <= elapsed < chapter["end_time"]:
return idx
return len(self.chapters) - 1
def seek_to_chapter(self, direction: str) -> bool:
"""Seek to next or previous chapter."""
if not self.is_playing or not self.current_file_path:
return False
if not self.chapters:
self.notify("No chapter information available")
return False
elapsed = self._get_current_elapsed()
current_total_position = self.seek_offset + elapsed
current_chapter_idx = self._get_current_chapter_index(
current_total_position)
if current_chapter_idx is None:
self.notify("Could not determine current chapter")
return False
if direction == "next":
if current_chapter_idx >= len(self.chapters) - 1:
self.notify("Already at last chapter")
return False
target_chapter = self.chapters[current_chapter_idx + 1]
new_position = target_chapter["start_time"]
message = f"Next chapter: {target_chapter['title']}"
else:
if current_chapter_idx <= 0:
self.notify("Already at first chapter")
return False
target_chapter = self.chapters[current_chapter_idx - 1]
new_position = target_chapter["start_time"]
message = f"Previous chapter: {target_chapter['title']}"
was_paused = self.is_paused
saved_state = {
"file_path": self.current_file_path,
"asin": self.current_asin,
"activation": self.activation_hex,
"duration": self.total_duration,
"chapters": self.chapters.copy(),
}
self._stop_process()
time.sleep(0.2)
if self.start(saved_state["file_path"], saved_state["activation"], self.notify, new_position):
self.current_asin = saved_state["asin"]
self.total_duration = saved_state["duration"]
self.chapters = saved_state["chapters"]
if was_paused:
time.sleep(0.3)
self.pause()
self.notify(message)
return True
return False
def seek_to_next_chapter(self) -> bool:
"""Seek to the next chapter. Returns True if action was taken."""
return self.seek_to_chapter("next")
def seek_to_previous_chapter(self) -> bool:
"""Seek to the previous chapter. Returns True if action was taken."""
return self.seek_to_chapter("previous")
def _save_current_position(self) -> None:
"""Save the current playback position to Audible."""
if not (self.library_client and self.current_asin and self.is_playing):
return
if self.playback_start_time is None:
return
current_position = self.seek_offset + self._get_current_elapsed()
if current_position <= 0:
return
try:
self.library_client.save_last_position(
self.current_asin, current_position)
except (OSError, ValueError, KeyError):
pass
def update_position_if_needed(self) -> None:
"""Periodically save position if enough time has passed."""
if not (self.is_playing and self.library_client and self.current_asin):
return
current_time = time.time()
if current_time - self.last_save_time >= self.position_save_interval:
self._save_current_position()
self.last_save_time = current_time

87
auditui/table_utils.py Normal file
View File

@@ -0,0 +1,87 @@
"""Utils for table operations."""
import unicodedata
from typing import TYPE_CHECKING, Callable
from .constants import (
AUTHOR_NAME_DISPLAY_LENGTH,
AUTHOR_NAME_MAX_LENGTH,
PROGRESS_COLUMN_INDEX,
)
if TYPE_CHECKING:
from .downloads import DownloadManager
def create_title_sort_key(reverse: bool = False) -> tuple[Callable, bool]:
"""Create a sort key function for sorting by title."""
def title_key(row_values):
title_cell = row_values[0]
if isinstance(title_cell, str):
normalized = unicodedata.normalize('NFD', title_cell)
return normalized.encode('ascii', 'ignore').decode('ascii').lower()
return str(title_cell).lower()
return title_key, reverse
def create_progress_sort_key(progress_column_index: int = PROGRESS_COLUMN_INDEX, reverse: bool = False) -> tuple[Callable, bool]:
"""Create a sort key function for sorting by progress percentage."""
def progress_key(row_values):
progress_cell = row_values[progress_column_index]
if isinstance(progress_cell, str):
try:
return float(progress_cell.rstrip("%"))
except (ValueError, AttributeError):
return 0.0
return 0.0
return progress_key, reverse
def truncate_author_name(author_names: str) -> str:
"""Truncate author name if it exceeds maximum length."""
if author_names and len(author_names) > AUTHOR_NAME_MAX_LENGTH:
return f"{author_names[:AUTHOR_NAME_DISPLAY_LENGTH]}..."
return author_names
def format_item_as_row(item: dict, library_client, download_manager: "DownloadManager | None" = None) -> tuple[str, str, str, str, str]:
"""Format a library item into table row data.
Returns:
Tuple of (title, author, runtime, progress, downloaded) strings
"""
title = library_client.extract_title(item)
author_names = library_client.extract_authors(item)
author_names = truncate_author_name(author_names)
author_display = author_names or "Unknown"
minutes = library_client.extract_runtime_minutes(item)
runtime_str = library_client.format_duration(
minutes, unit="minutes", default_none="Unknown length"
) or "Unknown"
percent_complete = library_client.extract_progress_info(item)
progress_str = (
f"{percent_complete:.1f}%"
if percent_complete and percent_complete > 0
else "0%"
)
downloaded_str = ""
if download_manager:
asin = library_client.extract_asin(item)
if asin and download_manager.is_cached(asin):
downloaded_str = ""
return (title, author_display, runtime_str, progress_str, downloaded_str)
def filter_unfinished_items(items: list[dict], library_client) -> list[dict]:
"""Filter out finished items from the list."""
return [
item for item in items
if not library_client.is_finished(item)
]

33
auditui/ui.py Normal file
View File

@@ -0,0 +1,33 @@
"""UI components for the Auditui application."""
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, ScrollableContainer
from textual.screen import ModalScreen
from textual.widgets import Static
class HelpScreen(ModalScreen):
"""Help screen displaying all available keybindings."""
BINDINGS = [("escape", "dismiss", "Close"), ("?", "dismiss", "Close")]
def compose(self) -> ComposeResult:
with Container(id="help_container"):
yield Static("Key Bindings", id="help_title")
with ScrollableContainer(id="help_content"):
bindings = self.app.BINDINGS
for binding in bindings:
if isinstance(binding, tuple):
key, action, description = binding
else:
key = binding.key
description = binding.description
key_display = key.replace(
"ctrl+", "^").replace("left", "").replace("right", "").replace("space", "Space").replace("enter", "Enter")
with Horizontal(classes="help_row"):
yield Static(f"[bold #f9e2af]{key_display}[/]", classes="help_key")
yield Static(description, classes="help_action")
yield Static("Press [bold #f9e2af]?[/] or [bold #f9e2af]Escape[/] to close", id="help_footer")
def action_dismiss(self) -> None:
self.dismiss()

30
main.py
View File

@@ -1,13 +1,41 @@
#!/usr/bin/env python3
"""Auditui entrypoint."""
import sys
from auditui.app import Auditui
from auditui.auth import authenticate
from auditui.configure import configure
from auditui.constants import AUTH_PATH
def main() -> None:
"""Authenticate and launch the app."""
auth, client = authenticate()
if len(sys.argv) > 1 and sys.argv[1] == "configure":
try:
configure()
print("Configuration completed successfully.")
except Exception as exc:
print(f"Configuration error: {exc}")
sys.exit(1)
return
config_dir = AUTH_PATH.parent
if not config_dir.exists():
print("No configuration yet, please run 'auditui configure'.")
sys.exit(1)
try:
auth, client = authenticate()
except Exception as exc:
print(f"Authentication error: {exc}")
if not AUTH_PATH.exists():
print("No configuration yet, please run 'auditui configure'.")
else:
print("Please re-authenticate by running 'auditui configure'.")
sys.exit(1)
app = Auditui(auth=auth, client=client)
app.run()

View File

@@ -4,4 +4,8 @@ version = "0.1.0"
description = "An Audible TUI client"
readme = "README.md"
requires-python = ">=3.13"
dependencies = ["audible>=0.10.0", "textual>=6.7.1"]
dependencies = [
"audible>=0.10.0",
"httpx>=0.28.1",
"textual>=6.7.1",
]

View File

@@ -37,7 +37,8 @@ class AudibleStats:
self.client = audible.Client(auth=self.auth)
return
except Exception:
logger.info("Failed to load existing auth. Re-authenticating.\n")
logger.info(
"Failed to load existing auth. Re-authenticating.\n")
email = input("Email: ")
password = getpass("Password: ")
@@ -85,7 +86,8 @@ class AudibleStats:
monthly_listening_interval_start_date=f"{middle}-01",
store="Audible",
)
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
monthly_stats = stats.get(
"aggregated_monthly_listening_stats", [])
has_activity = bool(
monthly_stats
and any(stat.get("aggregated_sum", 0) > 0 for stat in monthly_stats)
@@ -101,12 +103,38 @@ class AudibleStats:
return earliest_year
def get_current_month_listening_time(self) -> tuple[int, int, int]:
"""Get total listening time for the current month as (hours, minutes, seconds)."""
try:
stats = self.client.get(
"1.0/stats/aggregates",
monthly_listening_interval_duration="1",
monthly_listening_interval_start_date=date.today().strftime("%Y-%m"),
store="Audible",
)
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
if not monthly_stats:
return (0, 0, 0)
total_milliseconds = sum(
stat.get("aggregated_sum", 0) for stat in monthly_stats
)
total_seconds = int(total_milliseconds // 1000)
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return (hours, minutes, seconds)
except Exception as e:
logger.warning(f"Could not get current month listening time: {e}")
return (0, 0, 0)
def main() -> None:
"""Main entry point."""
worker = AudibleStats()
worker.authenticate()
print(worker.get_signup_year())
hours, minutes, seconds = worker.get_current_month_listening_time()
print(f"Total listening time this month: {hours}h {minutes}m {seconds}s")
if __name__ == "__main__":

2
uv.lock generated
View File

@@ -37,12 +37,14 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "audible" },
{ name = "httpx" },
{ name = "textual" },
]
[package.metadata]
requires-dist = [
{ name = "audible", specifier = ">=0.10.0" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "textual", specifier = ">=6.7.1" },
]