Compare commits
92 Commits
1cac45e6cf
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 553f5cb4f7 | |||
| 32b37a0834 | |||
| a2d2c7ce3a | |||
| 4741080284 | |||
| 737147b457 | |||
| 123d35068f | |||
| 258aabe10f | |||
| bc070c4162 | |||
| cbf6bff779 | |||
| 080c731fd7 | |||
| 1b6f1ff1f2 | |||
| aa5998c3e3 | |||
| c65e949731 | |||
| ab51e5506e | |||
| 3701b37f4c | |||
| 1474302d7e | |||
| eeecaaf42e | |||
| f359dee194 | |||
| 1e2655670d | |||
| cf6164c438 | |||
| 46fa15fcfe | |||
| 4b457452d4 | |||
| 0de0286992 | |||
| 391b0360bd | |||
| b0dc15a018 | |||
| a6d74265ed | |||
| 4f49a081c9 | |||
| 3a19db2cf0 | |||
| fcb1524806 | |||
| 18ffae7ac8 | |||
| d71c751bbc | |||
| 234b65c9d8 | |||
| 2d9970c198 | |||
| 5e3b33570d | |||
| 2ced756cc0 | |||
| 1c4017ae0c | |||
| 251a7a26d5 | |||
| 6462c83a21 | |||
| 0c590cfa82 | |||
| 16395981dc | |||
| 30f0612bb5 | |||
| 1aaff3b3b7 | |||
| 986541f0d3 | |||
| 151d565f36 | |||
| 7e2b657cfc | |||
| cef5e40347 | |||
| 839394343e | |||
| 84868c4afa | |||
| 03988f0988 | |||
| 9eba702a0a | |||
| f61f4ec55e | |||
| b45ff86061 | |||
| 6824d00088 | |||
| 46c66e0d5c | |||
| d4e73e6a13 | |||
| b2dd430ac9 | |||
| ce0d313187 | |||
| 7fee7e56cf | |||
| 58661641d1 | |||
| 95f30954b5 | |||
| d96a08935c | |||
| 0ce45c26b7 | |||
| 8b74c0f773 | |||
| 4a5e475f27 | |||
| 44d4f28ceb | |||
| 1d6033f057 | |||
| 5fe10a1636 | |||
| 1af3be37ce | |||
| c3dfa239fa | |||
| 42e6a1e029 | |||
| 41f5183653 | |||
| 1a1fee0984 | |||
| ddb7cab39e | |||
| 2d331288dd | |||
| d1a6fda863 | |||
| 2d10922a7c | |||
| 0ad4db95c5 | |||
| 0d9d65088b | |||
| 3b9d1ecf96 | |||
| 27f9a5396e | |||
| d3be27c70d | |||
| df2ae17721 | |||
| a0edab8e32 | |||
| ddb1704cb0 | |||
| 53284d7c0a | |||
| 7951373033 | |||
| cc3a1c6818 | |||
| 1088517cd5 | |||
| a62c3e9bf4 | |||
| fc15096918 | |||
| 37ac47698c | |||
| d6e2284db1 |
65
README.md
65
README.md
@@ -1,26 +1,12 @@
|
||||
# auditui
|
||||
|
||||
A terminal-based user interface (TUI) client for Audible, written in Python 3.
|
||||
A terminal-based user interface (TUI) client for Audible, written in Python 3 : listen to your audiobooks (even offline), browse and manage your library, and more!
|
||||
|
||||
Listen to your audiobooks or podcasts, browse your library, and more.
|
||||
|
||||
## What it does and where are we
|
||||
|
||||
`main.py` offers a TUI interface for browsing your Audible library, listing your books with progress information. You can sort by progress or title, show all books, or show only unfinished books which is the default.
|
||||
|
||||
Now, I'm working on the "play" feature, which should allow you to play/pause/unpause a book from the terminal by pressing `Space` on a book in the list.
|
||||
|
||||
Then, the next thing to add is a progress bar at the bottom of the interface, to show the progress of the book while it's playing.
|
||||
Currently, the only available theme is Catppuccin Mocha, following their [style guide](https://github.com/catppuccin/catppuccin/blob/main/docs/style-guide.md), as it's my preferred theme across most of my tools.
|
||||
|
||||
Look at the [roadmap](#roadmap) for more details.
|
||||
|
||||
It's still a work in progress, so :
|
||||
|
||||
- currently, most code resides in `main.py`, except for some experimental files that aren't part of the final structure:
|
||||
- `player.py` is the test playground for the download and play functionality
|
||||
- `stats.py` is the test playground for the stats functionality
|
||||
- expect bugs and missing features
|
||||
- the code is not yet organized as I'm currently experimenting
|
||||
It's still a work in progress, so expect bugs and missing features.
|
||||
|
||||
## How to run
|
||||
|
||||
@@ -31,28 +17,57 @@ This project uses [uv](https://github.com/astral-sh/uv) for dependency managemen
|
||||
$ uv sync
|
||||
|
||||
# run the TUI
|
||||
$ uv run main.py # or player.py or stats.py
|
||||
$ uv run python -m auditui.cli
|
||||
```
|
||||
|
||||
(`stats.py` is a playground for the stats functionality)
|
||||
|
||||
Please also note that as of now, you need to have [ffmpeg](https://ffmpeg.org/) installed to play audio files.
|
||||
|
||||
### Bindings
|
||||
|
||||
| Key | Action |
|
||||
| ------------ | -------------------------- |
|
||||
| `?` | Show help screen |
|
||||
| `n` | Sort by name |
|
||||
| `p` | Sort by progress |
|
||||
| `a` | Show all/unfinished |
|
||||
| `enter` | Play the selected book |
|
||||
| `space` | Pause/resume the playback |
|
||||
| `left` | Seek backward 30 seconds |
|
||||
| `right` | Seek forward 30 seconds |
|
||||
| `ctrl+left` | Go to the previous chapter |
|
||||
| `ctrl+right` | Go to the next chapter |
|
||||
| `d` | Download/delete from cache |
|
||||
| `q` | Quit the application |
|
||||
|
||||
## Roadmap
|
||||
|
||||
- [x] list your library
|
||||
- [x] list your unfinished books with progress information
|
||||
- [ ] play/pause a book
|
||||
- [ ] resume playback of a book from the last position, regardless of which device was used previously
|
||||
- [ ] save the current playback position when pausing or exiting the app
|
||||
- [ ] print progress at the bottom of the app while a book is playing
|
||||
- [ ] add control to go to the previous/next chapter
|
||||
- [ ] add a control to jump 30s earlier/later
|
||||
- [x] play/pause a book
|
||||
- [x] catppuccin mocha theme
|
||||
- [x] print chapter and progress in the footer of the app while a book is playing
|
||||
- [x] chapter progress bar in footer
|
||||
- [x] add a control to jump 30s earlier/later
|
||||
- [x] add control to go to the previous/next chapter
|
||||
- [x] save/resume playback of a book from the last position, regardless of which device was used previously
|
||||
- [x] download/remove a book in the cache without having to play it
|
||||
- [x] add a help screen with all the keybindings
|
||||
- [ ] increase/decrease reading speed
|
||||
- [ ] mark a book as finished or unfinished
|
||||
- [ ] get your stats in a separated pane
|
||||
- [ ] filter books on views
|
||||
- [ ] search in your book library
|
||||
|
||||
And after that:
|
||||
|
||||
- [ ] search the marketplace for books
|
||||
- [ ] add a book in your wishlist
|
||||
- [ ] get your listening stats from Audible
|
||||
|
||||
All of this, and of course:
|
||||
|
||||
- [ ] installation setup
|
||||
- [ ] build a decent TUI interface using [Textual](https://textual.textualize.io/)
|
||||
- [ ] code cleanup / organization
|
||||
|
||||
|
||||
2
auditui/__init__.py
Normal file
2
auditui/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
"""Auditui package providing the Audible TUI app components."""
|
||||
|
||||
402
auditui/app.py
Normal file
402
auditui/app.py
Normal file
@@ -0,0 +1,402 @@
|
||||
"""Textual application for the Audible TUI."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from textual import work
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.events import Key
|
||||
from textual.widgets import DataTable, Footer, Header, ProgressBar, Static
|
||||
from textual.worker import get_current_worker
|
||||
|
||||
from .constants import PROGRESS_COLUMN_INDEX, SEEK_SECONDS, TABLE_CSS, TABLE_COLUMNS
|
||||
from .downloads import DownloadManager
|
||||
from .library import LibraryClient
|
||||
from .playback import PlaybackController
|
||||
from .table_utils import (
|
||||
create_progress_sort_key,
|
||||
create_title_sort_key,
|
||||
filter_unfinished_items,
|
||||
format_item_as_row,
|
||||
)
|
||||
from .ui import HelpScreen
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from textual.widgets._data_table import ColumnKey
|
||||
|
||||
|
||||
class Auditui(App):
|
||||
"""Main application class for the Audible TUI app."""
|
||||
|
||||
theme = "textual-dark"
|
||||
SHOW_PALETTE = False
|
||||
|
||||
BINDINGS = [
|
||||
("?", "show_help", "Help"),
|
||||
("n", "sort", "Sort by name"),
|
||||
("p", "sort_by_progress", "Sort by progress"),
|
||||
("a", "show_all", "All/Unfinished"),
|
||||
("enter", "play_selected", "Play"),
|
||||
("space", "toggle_playback", "Pause/Resume"),
|
||||
("left", "seek_backward", "-30s"),
|
||||
("right", "seek_forward", "+30s"),
|
||||
("ctrl+left", "previous_chapter", "Previous chapter"),
|
||||
("ctrl+right", "next_chapter", "Next chapter"),
|
||||
("d", "toggle_download", "Download/Delete"),
|
||||
("q", "quit", "Quit"),
|
||||
]
|
||||
|
||||
CSS = TABLE_CSS
|
||||
|
||||
def __init__(self, auth=None, client=None) -> None:
|
||||
super().__init__()
|
||||
self.auth = auth
|
||||
self.client = client
|
||||
self.library_client = LibraryClient(client) if client else None
|
||||
self.download_manager = (
|
||||
DownloadManager(auth, client) if auth and client else None
|
||||
)
|
||||
self.playback = PlaybackController(
|
||||
self.update_status, self.library_client)
|
||||
|
||||
self.all_items: list[dict] = []
|
||||
self.current_items: list[dict] = []
|
||||
self.show_all_mode = False
|
||||
self.title_sort_reverse = False
|
||||
self.progress_sort_reverse = False
|
||||
self.title_column_key: ColumnKey | None = None
|
||||
self.progress_column_key: ColumnKey | None = None
|
||||
self.progress_column_index = PROGRESS_COLUMN_INDEX
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header()
|
||||
yield Static("Loading...", id="status")
|
||||
table: DataTable = DataTable()
|
||||
table.zebra_stripes = True
|
||||
table.cursor_type = "row"
|
||||
yield table
|
||||
yield Static("", id="progress_info")
|
||||
yield ProgressBar(id="progress_bar", show_eta=False, show_percentage=False, total=100)
|
||||
yield Footer(show_command_palette=False)
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Initialize the table and start fetching library data."""
|
||||
table = self.query_one(DataTable)
|
||||
table.add_columns(*TABLE_COLUMNS)
|
||||
column_keys = list(table.columns.keys())
|
||||
self.title_column_key = column_keys[0]
|
||||
self.progress_column_key = column_keys[3]
|
||||
|
||||
if self.client:
|
||||
self.update_status("Fetching library...")
|
||||
self.fetch_library()
|
||||
else:
|
||||
self.update_status(
|
||||
"Not authenticated. Please restart and authenticate.")
|
||||
|
||||
self.set_interval(1.0, self._check_playback_status)
|
||||
self.set_interval(0.5, self._update_progress)
|
||||
self.set_interval(30.0, self._save_position_periodically)
|
||||
|
||||
def on_unmount(self) -> None:
|
||||
"""Clean up on app exit."""
|
||||
self.playback.stop()
|
||||
if self.download_manager:
|
||||
self.download_manager.close()
|
||||
|
||||
def on_key(self, event: Key) -> None:
|
||||
"""Handle key presses."""
|
||||
if self.playback.is_playing:
|
||||
if event.key == "ctrl+left":
|
||||
event.prevent_default()
|
||||
self.action_previous_chapter()
|
||||
return
|
||||
elif event.key == "ctrl+right":
|
||||
event.prevent_default()
|
||||
self.action_next_chapter()
|
||||
return
|
||||
elif event.key == "left":
|
||||
event.prevent_default()
|
||||
self.action_seek_backward()
|
||||
return
|
||||
elif event.key == "right":
|
||||
event.prevent_default()
|
||||
self.action_seek_forward()
|
||||
return
|
||||
|
||||
if isinstance(self.focused, DataTable):
|
||||
if event.key == "enter":
|
||||
event.prevent_default()
|
||||
self.action_play_selected()
|
||||
elif event.key == "space":
|
||||
event.prevent_default()
|
||||
self.action_toggle_playback()
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Update the status message in the UI."""
|
||||
status = self.query_one("#status", Static)
|
||||
status.update(message)
|
||||
|
||||
def _thread_status_update(self, message: str) -> None:
|
||||
"""Safely update status from worker threads."""
|
||||
self.call_from_thread(self.update_status, message)
|
||||
|
||||
@work(exclusive=True, thread=True)
|
||||
def fetch_library(self) -> None:
|
||||
"""Fetch all library items from Audible API in background thread."""
|
||||
worker = get_current_worker()
|
||||
if worker.is_cancelled or not self.library_client:
|
||||
return
|
||||
|
||||
try:
|
||||
all_items = self.library_client.fetch_all_items(
|
||||
self._thread_status_update)
|
||||
self.call_from_thread(self.on_library_loaded, all_items)
|
||||
except (OSError, ValueError, KeyError) as exc:
|
||||
self.call_from_thread(self.on_library_error, str(exc))
|
||||
|
||||
def on_library_loaded(self, items: list[dict]) -> None:
|
||||
"""Handle successful library load."""
|
||||
self.all_items = items
|
||||
self.update_status(f"Loaded {len(items)} books")
|
||||
self.show_unfinished()
|
||||
|
||||
def on_library_error(self, error: str) -> None:
|
||||
"""Handle library fetch error."""
|
||||
self.update_status(f"Error fetching library: {error}")
|
||||
|
||||
def _populate_table(self, items: list[dict]) -> None:
|
||||
"""Populate the DataTable with library items."""
|
||||
table = self.query_one(DataTable)
|
||||
table.clear()
|
||||
|
||||
if not items or not self.library_client:
|
||||
self.update_status("No books found.")
|
||||
return
|
||||
|
||||
for item in items:
|
||||
title, author, runtime, progress, downloaded = format_item_as_row(
|
||||
item, self.library_client, self.download_manager)
|
||||
table.add_row(title, author, runtime,
|
||||
progress, downloaded, key=title)
|
||||
|
||||
self.current_items = items
|
||||
mode = "all" if self.show_all_mode else "unfinished"
|
||||
self.update_status(f"Showing {len(items)} books ({mode})")
|
||||
|
||||
def _refresh_table(self) -> None:
|
||||
"""Refresh the table with current items."""
|
||||
if self.current_items:
|
||||
self._populate_table(self.current_items)
|
||||
|
||||
def show_all(self) -> None:
|
||||
"""Display all books in the table."""
|
||||
if not self.all_items:
|
||||
return
|
||||
self.show_all_mode = True
|
||||
self._populate_table(self.all_items)
|
||||
|
||||
def show_unfinished(self) -> None:
|
||||
"""Display only unfinished books in the table."""
|
||||
if not self.all_items or not self.library_client:
|
||||
return
|
||||
|
||||
self.show_all_mode = False
|
||||
unfinished_items = filter_unfinished_items(
|
||||
self.all_items, self.library_client)
|
||||
self._populate_table(unfinished_items)
|
||||
|
||||
def action_sort(self) -> None:
|
||||
"""Sort table by title, toggling direction on each press."""
|
||||
table = self.query_one(DataTable)
|
||||
if table.row_count > 0 and self.title_column_key:
|
||||
title_key, reverse = create_title_sort_key(self.title_sort_reverse)
|
||||
table.sort(key=title_key, reverse=reverse)
|
||||
self.title_sort_reverse = not self.title_sort_reverse
|
||||
|
||||
def action_sort_by_progress(self) -> None:
|
||||
"""Sort table by progress percentage, toggling direction on each press."""
|
||||
table = self.query_one(DataTable)
|
||||
if table.row_count > 0:
|
||||
self.progress_sort_reverse = not self.progress_sort_reverse
|
||||
progress_key, reverse = create_progress_sort_key(
|
||||
self.progress_column_index, self.progress_sort_reverse)
|
||||
table.sort(key=progress_key, reverse=reverse)
|
||||
|
||||
def action_show_all(self) -> None:
|
||||
"""Toggle between showing all and unfinished books."""
|
||||
if self.show_all_mode:
|
||||
self.show_unfinished()
|
||||
else:
|
||||
self.show_all()
|
||||
|
||||
def action_show_unfinished(self) -> None:
|
||||
"""Show unfinished books."""
|
||||
self.show_unfinished()
|
||||
|
||||
def action_play_selected(self) -> None:
|
||||
"""Start playing the selected book."""
|
||||
if not self.download_manager:
|
||||
self.update_status(
|
||||
"Not authenticated. Please restart and authenticate.")
|
||||
return
|
||||
|
||||
table = self.query_one(DataTable)
|
||||
if table.row_count == 0:
|
||||
self.update_status("No books available")
|
||||
return
|
||||
|
||||
cursor_row = table.cursor_row
|
||||
if cursor_row >= len(self.current_items):
|
||||
self.update_status("Invalid selection")
|
||||
return
|
||||
|
||||
if not self.library_client:
|
||||
self.update_status("Library client not available")
|
||||
return
|
||||
|
||||
selected_item = self.current_items[cursor_row]
|
||||
asin = self.library_client.extract_asin(selected_item)
|
||||
|
||||
if not asin:
|
||||
self.update_status("Could not get ASIN for selected book")
|
||||
return
|
||||
|
||||
self._start_playback_async(asin)
|
||||
|
||||
def action_toggle_playback(self) -> None:
|
||||
"""Toggle pause/resume state."""
|
||||
if not self.playback.toggle_playback():
|
||||
self._no_playback_message()
|
||||
|
||||
def action_seek_forward(self) -> None:
|
||||
"""Seek forward 30 seconds."""
|
||||
if not self.playback.seek_forward(SEEK_SECONDS):
|
||||
self._no_playback_message()
|
||||
|
||||
def action_seek_backward(self) -> None:
|
||||
"""Seek backward 30 seconds."""
|
||||
if not self.playback.seek_backward(SEEK_SECONDS):
|
||||
self._no_playback_message()
|
||||
|
||||
def action_next_chapter(self) -> None:
|
||||
"""Seek to the next chapter."""
|
||||
if not self.playback.seek_to_next_chapter():
|
||||
self._no_playback_message()
|
||||
|
||||
def action_previous_chapter(self) -> None:
|
||||
"""Seek to the previous chapter."""
|
||||
if not self.playback.seek_to_previous_chapter():
|
||||
self._no_playback_message()
|
||||
|
||||
def _no_playback_message(self) -> None:
|
||||
"""Show message when no playback is active."""
|
||||
self.update_status("No playback active. Press Enter to play a book.")
|
||||
|
||||
def action_show_help(self) -> None:
|
||||
"""Show the help screen with all keybindings."""
|
||||
self.push_screen(HelpScreen())
|
||||
|
||||
def _check_playback_status(self) -> None:
|
||||
"""Check if playback process has finished and update state accordingly."""
|
||||
message = self.playback.check_status()
|
||||
if message:
|
||||
self.update_status(message)
|
||||
self._hide_progress()
|
||||
|
||||
def _update_progress(self) -> None:
|
||||
"""Update the progress bar and info during playback."""
|
||||
if not self.playback.is_playing:
|
||||
self._hide_progress()
|
||||
return
|
||||
|
||||
progress_data = self.playback.get_current_progress()
|
||||
if not progress_data:
|
||||
self._hide_progress()
|
||||
return
|
||||
|
||||
chapter_name, chapter_elapsed, chapter_total = progress_data
|
||||
if chapter_total <= 0:
|
||||
self._hide_progress()
|
||||
return
|
||||
|
||||
progress_info = self.query_one("#progress_info", Static)
|
||||
progress_bar = self.query_one("#progress_bar", ProgressBar)
|
||||
|
||||
progress_percent = min(100.0, max(
|
||||
0.0, (chapter_elapsed / chapter_total) * 100.0))
|
||||
progress_bar.update(progress=progress_percent)
|
||||
chapter_elapsed_str = LibraryClient.format_time(chapter_elapsed)
|
||||
chapter_total_str = LibraryClient.format_time(chapter_total)
|
||||
progress_info.update(
|
||||
f"{chapter_name} | {chapter_elapsed_str} / {chapter_total_str}")
|
||||
progress_info.display = True
|
||||
progress_bar.display = True
|
||||
|
||||
def _hide_progress(self) -> None:
|
||||
"""Hide the progress widget."""
|
||||
progress_info = self.query_one("#progress_info", Static)
|
||||
progress_bar = self.query_one("#progress_bar", ProgressBar)
|
||||
progress_info.display = False
|
||||
progress_bar.display = False
|
||||
|
||||
def _save_position_periodically(self) -> None:
|
||||
"""Periodically save playback position."""
|
||||
self.playback.update_position_if_needed()
|
||||
|
||||
def action_toggle_download(self) -> None:
|
||||
"""Toggle download/remove for the selected book."""
|
||||
if not self.download_manager:
|
||||
self.update_status(
|
||||
"Not authenticated. Please restart and authenticate.")
|
||||
return
|
||||
|
||||
table = self.query_one(DataTable)
|
||||
if table.row_count == 0:
|
||||
self.update_status("No books available")
|
||||
return
|
||||
|
||||
cursor_row = table.cursor_row
|
||||
if cursor_row >= len(self.current_items):
|
||||
self.update_status("Invalid selection")
|
||||
return
|
||||
|
||||
if not self.library_client:
|
||||
self.update_status("Library client not available")
|
||||
return
|
||||
|
||||
selected_item = self.current_items[cursor_row]
|
||||
asin = self.library_client.extract_asin(selected_item)
|
||||
|
||||
if not asin:
|
||||
self.update_status("Could not get ASIN for selected book")
|
||||
return
|
||||
|
||||
self._toggle_download_async(asin)
|
||||
|
||||
@work(exclusive=True, thread=True)
|
||||
def _toggle_download_async(self, asin: str) -> None:
|
||||
"""Toggle download/remove asynchronously."""
|
||||
if not self.download_manager:
|
||||
return
|
||||
|
||||
if self.download_manager.is_cached(asin):
|
||||
self.download_manager.remove_cached(
|
||||
asin, self._thread_status_update)
|
||||
else:
|
||||
self.download_manager.get_or_download(
|
||||
asin, self._thread_status_update)
|
||||
|
||||
self.call_from_thread(self._refresh_table)
|
||||
|
||||
@work(exclusive=True, thread=True)
|
||||
def _start_playback_async(self, asin: str) -> None:
|
||||
"""Start playback asynchronously."""
|
||||
if not self.download_manager:
|
||||
return
|
||||
self.playback.prepare_and_start(
|
||||
self.download_manager,
|
||||
asin,
|
||||
self._thread_status_update,
|
||||
)
|
||||
25
auditui/auth.py
Normal file
25
auditui/auth.py
Normal file
@@ -0,0 +1,25 @@
|
||||
"""Authentication helpers for the Auditui app."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
|
||||
import audible
|
||||
|
||||
from .constants import AUTH_PATH
|
||||
|
||||
|
||||
def authenticate(
|
||||
auth_path: Path = AUTH_PATH,
|
||||
) -> Tuple[audible.Authenticator, audible.Client]:
|
||||
"""Authenticate with Audible and return authenticator and client."""
|
||||
if not auth_path.exists():
|
||||
raise FileNotFoundError(
|
||||
"Authentication file not found. Please run 'auditui configure' to set up authentication.")
|
||||
|
||||
try:
|
||||
authenticator = audible.Authenticator.from_file(str(auth_path))
|
||||
audible_client = audible.Client(auth=authenticator)
|
||||
return authenticator, audible_client
|
||||
except (OSError, ValueError, KeyError) as exc:
|
||||
raise ValueError(
|
||||
f"Failed to load existing authentication: {exc}") from exc
|
||||
45
auditui/cli.py
Normal file
45
auditui/cli.py
Normal file
@@ -0,0 +1,45 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Auditui entrypoint."""
|
||||
|
||||
import sys
|
||||
|
||||
from auditui.app import Auditui
|
||||
from auditui.auth import authenticate
|
||||
from auditui.configure import configure
|
||||
from auditui.constants import AUTH_PATH
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Authenticate and launch the app."""
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "configure":
|
||||
try:
|
||||
configure()
|
||||
print("Configuration completed successfully.")
|
||||
except Exception as exc:
|
||||
print(f"Configuration error: {exc}")
|
||||
sys.exit(1)
|
||||
return
|
||||
|
||||
config_dir = AUTH_PATH.parent
|
||||
|
||||
if not config_dir.exists():
|
||||
print("No configuration yet, please run 'auditui configure'.")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
auth, client = authenticate()
|
||||
except Exception as exc:
|
||||
print(f"Authentication error: {exc}")
|
||||
if not AUTH_PATH.exists():
|
||||
print("No configuration yet, please run 'auditui configure'.")
|
||||
else:
|
||||
print("Please re-authenticate by running 'auditui configure'.")
|
||||
sys.exit(1)
|
||||
|
||||
app = Auditui(auth=auth, client=client)
|
||||
app.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
40
auditui/configure.py
Normal file
40
auditui/configure.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Configuration helpers for the Auditui app."""
|
||||
|
||||
from getpass import getpass
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
|
||||
import audible
|
||||
|
||||
from .constants import AUTH_PATH
|
||||
|
||||
|
||||
def configure(
|
||||
auth_path: Path = AUTH_PATH,
|
||||
) -> Tuple[audible.Authenticator, audible.Client]:
|
||||
"""Force re-authentication and save credentials."""
|
||||
if auth_path.exists():
|
||||
response = input(
|
||||
"Configuration already exists. Are you sure you want to overwrite it? (y/N): "
|
||||
).strip().lower()
|
||||
if response not in ("yes", "y"):
|
||||
print("Configuration cancelled.")
|
||||
raise SystemExit(0)
|
||||
|
||||
print("Please authenticate with your Audible account.")
|
||||
|
||||
email = input("\nEmail: ")
|
||||
password = getpass("Password: ")
|
||||
marketplace = input(
|
||||
"Marketplace locale (default: US): ").strip().upper() or "US"
|
||||
|
||||
authenticator = audible.Authenticator.from_login(
|
||||
username=email, password=password, locale=marketplace
|
||||
)
|
||||
|
||||
auth_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
authenticator.to_file(str(auth_path))
|
||||
print("Authentication successful!")
|
||||
audible_client = audible.Client(auth=authenticator)
|
||||
return authenticator, audible_client
|
||||
|
||||
236
auditui/constants.py
Normal file
236
auditui/constants.py
Normal file
@@ -0,0 +1,236 @@
|
||||
"""Shared constants for the Auditui application."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
|
||||
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
|
||||
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
|
||||
DEFAULT_CODEC = "LC_128_44100_stereo"
|
||||
MIN_FILE_SIZE = 1024 * 1024
|
||||
DEFAULT_CHUNK_SIZE = 8192
|
||||
|
||||
TABLE_COLUMNS = ("Title", "Author", "Length", "Progress", "Downloaded")
|
||||
|
||||
AUTHOR_NAME_MAX_LENGTH = 40
|
||||
AUTHOR_NAME_DISPLAY_LENGTH = 37
|
||||
PROGRESS_COLUMN_INDEX = 3
|
||||
SEEK_SECONDS = 30.0
|
||||
|
||||
TABLE_CSS = """
|
||||
Screen {
|
||||
background: #1e1e2e;
|
||||
}
|
||||
|
||||
Header {
|
||||
background: #181825;
|
||||
color: #cdd6f4;
|
||||
text-style: bold;
|
||||
}
|
||||
|
||||
Footer {
|
||||
background: #181825;
|
||||
color: #bac2de;
|
||||
height: 2;
|
||||
padding: 0 1;
|
||||
scrollbar-size: 0 0;
|
||||
overflow-x: auto;
|
||||
overflow-y: hidden;
|
||||
}
|
||||
|
||||
Footer > HorizontalGroup > KeyGroup,
|
||||
Footer > HorizontalGroup > KeyGroup.-compact {
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
background: #181825;
|
||||
}
|
||||
|
||||
FooterKey,
|
||||
FooterKey.-grouped,
|
||||
Footer.-compact FooterKey {
|
||||
background: #181825;
|
||||
padding: 0;
|
||||
margin: 0 1 0 0;
|
||||
}
|
||||
|
||||
FooterKey .footer-key--key {
|
||||
color: #f9e2af;
|
||||
background: #181825;
|
||||
text-style: bold;
|
||||
padding: 0 1 0 0;
|
||||
}
|
||||
|
||||
FooterKey .footer-key--description {
|
||||
color: #cdd6f4;
|
||||
background: #181825;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
FooterKey:hover {
|
||||
background: #313244;
|
||||
color: #cdd6f4;
|
||||
}
|
||||
|
||||
FooterKey:hover .footer-key--key,
|
||||
FooterKey:hover .footer-key--description {
|
||||
background: #313244;
|
||||
}
|
||||
|
||||
DataTable {
|
||||
height: 1fr;
|
||||
background: #1e1e2e;
|
||||
color: #cdd6f4;
|
||||
border: solid #585b70;
|
||||
}
|
||||
|
||||
DataTable:focus {
|
||||
border: solid #89b4fa;
|
||||
}
|
||||
|
||||
DataTable > .datatable--header {
|
||||
background: #45475a;
|
||||
color: #bac2de;
|
||||
text-style: bold;
|
||||
}
|
||||
|
||||
DataTable > .datatable--cursor {
|
||||
background: #313244;
|
||||
color: #cdd6f4;
|
||||
}
|
||||
|
||||
DataTable > .datatable--odd-row {
|
||||
background: #181825;
|
||||
}
|
||||
|
||||
DataTable > .datatable--even-row {
|
||||
background: #1e1e2e;
|
||||
}
|
||||
|
||||
Static {
|
||||
height: 1;
|
||||
text-align: center;
|
||||
background: #181825;
|
||||
color: #cdd6f4;
|
||||
}
|
||||
|
||||
Static#status {
|
||||
color: #bac2de;
|
||||
}
|
||||
|
||||
Static#progress_info {
|
||||
color: #89b4fa;
|
||||
text-style: bold;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar {
|
||||
height: 1;
|
||||
background: #181825;
|
||||
border: none;
|
||||
margin: 0;
|
||||
padding: 0 1;
|
||||
width: 100%;
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar > .progress-bar--track {
|
||||
background: #45475a;
|
||||
}
|
||||
|
||||
ProgressBar#progress_bar > .progress-bar--bar {
|
||||
background: #a6e3a1;
|
||||
}
|
||||
|
||||
HelpScreen {
|
||||
align: center middle;
|
||||
background: rgba(0, 0, 0, 0.7);
|
||||
}
|
||||
|
||||
#help_container {
|
||||
width: 70;
|
||||
height: auto;
|
||||
max-height: 85%;
|
||||
min-height: 20;
|
||||
background: #1e1e2e;
|
||||
border: thick #89b4fa;
|
||||
padding: 2;
|
||||
}
|
||||
|
||||
#help_title {
|
||||
text-align: center;
|
||||
text-style: bold;
|
||||
color: #89b4fa;
|
||||
margin-bottom: 2;
|
||||
padding-bottom: 1;
|
||||
border-bottom: solid #585b70;
|
||||
height: 3;
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
#help_content {
|
||||
width: 100%;
|
||||
height: 1fr;
|
||||
padding: 1 0;
|
||||
margin: 1 0;
|
||||
overflow-y: auto;
|
||||
scrollbar-size: 0 1;
|
||||
}
|
||||
|
||||
#help_content > .scrollbar--vertical {
|
||||
background: #313244;
|
||||
}
|
||||
|
||||
#help_content > .scrollbar--vertical > .scrollbar--track {
|
||||
background: #181825;
|
||||
}
|
||||
|
||||
#help_content > .scrollbar--vertical > .scrollbar--handle {
|
||||
background: #585b70;
|
||||
}
|
||||
|
||||
#help_content > .scrollbar--vertical > .scrollbar--handle:hover {
|
||||
background: #45475a;
|
||||
}
|
||||
|
||||
.help_row {
|
||||
height: 3;
|
||||
margin: 0 0 1 0;
|
||||
padding: 0 1;
|
||||
background: #181825;
|
||||
border: solid #313244;
|
||||
align: left middle;
|
||||
}
|
||||
|
||||
.help_row:hover {
|
||||
background: #313244;
|
||||
border: solid #45475a;
|
||||
}
|
||||
|
||||
.help_key {
|
||||
width: 20;
|
||||
text-align: right;
|
||||
padding: 0 2 0 0;
|
||||
color: #f9e2af;
|
||||
text-style: bold;
|
||||
align: right middle;
|
||||
}
|
||||
|
||||
.help_action {
|
||||
width: 1fr;
|
||||
text-align: left;
|
||||
padding: 0 0 0 2;
|
||||
color: #cdd6f4;
|
||||
align: left middle;
|
||||
}
|
||||
|
||||
#help_footer {
|
||||
text-align: center;
|
||||
color: #bac2de;
|
||||
margin-top: 2;
|
||||
padding-top: 1;
|
||||
border-top: solid #585b70;
|
||||
height: 3;
|
||||
align: center middle;
|
||||
}
|
||||
"""
|
||||
236
auditui/downloads.py
Normal file
236
auditui/downloads.py
Normal file
@@ -0,0 +1,236 @@
|
||||
"""Download helpers for Audible content."""
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import audible
|
||||
import httpx
|
||||
from audible.activation_bytes import get_activation_bytes
|
||||
|
||||
from .constants import CACHE_DIR, DEFAULT_CHUNK_SIZE, DEFAULT_CODEC, DOWNLOAD_URL, MIN_FILE_SIZE
|
||||
|
||||
StatusCallback = Callable[[str], None]
|
||||
|
||||
|
||||
class DownloadManager:
|
||||
"""Handle retrieval and download of Audible titles."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
auth: audible.Authenticator,
|
||||
client: audible.Client,
|
||||
cache_dir: Path = CACHE_DIR,
|
||||
chunk_size: int = DEFAULT_CHUNK_SIZE,
|
||||
) -> None:
|
||||
self.auth = auth
|
||||
self.client = client
|
||||
self.cache_dir = cache_dir
|
||||
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.chunk_size = chunk_size
|
||||
self._http_client = httpx.Client(
|
||||
auth=auth, timeout=30.0, follow_redirects=True)
|
||||
self._download_client = httpx.Client(
|
||||
timeout=httpx.Timeout(connect=30.0, read=None,
|
||||
write=30.0, pool=30.0),
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
def get_or_download(self, asin: str, notify: StatusCallback | None = None) -> Path | None:
|
||||
"""Get local path of AAX file, downloading if missing."""
|
||||
title = self._get_name_from_asin(asin) or asin
|
||||
safe_title = self._sanitize_filename(title)
|
||||
local_path = self.cache_dir / f"{safe_title}.aax"
|
||||
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
if notify:
|
||||
notify(f"Using cached file: {local_path.name}")
|
||||
return local_path
|
||||
|
||||
if notify:
|
||||
notify(f"Downloading to {local_path.name}...")
|
||||
|
||||
dl_link = self._get_download_link(asin, notify=notify)
|
||||
if not dl_link:
|
||||
if notify:
|
||||
notify("Failed to get download link")
|
||||
return None
|
||||
|
||||
if not self._validate_download_url(dl_link):
|
||||
if notify:
|
||||
notify("Invalid download URL")
|
||||
return None
|
||||
|
||||
if not self._download_file(dl_link, local_path, notify):
|
||||
if notify:
|
||||
notify("Download failed")
|
||||
return None
|
||||
|
||||
if not local_path.exists() or local_path.stat().st_size < MIN_FILE_SIZE:
|
||||
if notify:
|
||||
notify("Download failed or file too small")
|
||||
return None
|
||||
|
||||
return local_path
|
||||
|
||||
def get_activation_bytes(self) -> str | None:
|
||||
"""Get activation bytes as hex string."""
|
||||
try:
|
||||
activation_bytes = get_activation_bytes(self.auth)
|
||||
if isinstance(activation_bytes, bytes):
|
||||
return activation_bytes.hex()
|
||||
return str(activation_bytes)
|
||||
except (OSError, ValueError, KeyError, AttributeError):
|
||||
return None
|
||||
|
||||
def get_cached_path(self, asin: str) -> Path | None:
|
||||
"""Get the cached file path for a book if it exists."""
|
||||
title = self._get_name_from_asin(asin) or asin
|
||||
safe_title = self._sanitize_filename(title)
|
||||
local_path = self.cache_dir / f"{safe_title}.aax"
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
return local_path
|
||||
return None
|
||||
|
||||
def is_cached(self, asin: str) -> bool:
|
||||
"""Check if a book is already cached."""
|
||||
return self.get_cached_path(asin) is not None
|
||||
|
||||
def remove_cached(self, asin: str, notify: StatusCallback | None = None) -> bool:
|
||||
"""Remove a cached book file."""
|
||||
cached_path = self.get_cached_path(asin)
|
||||
if not cached_path:
|
||||
if notify:
|
||||
notify("Book is not cached")
|
||||
return False
|
||||
|
||||
try:
|
||||
cached_path.unlink()
|
||||
if notify:
|
||||
notify(f"Removed from cache: {cached_path.name}")
|
||||
return True
|
||||
except OSError as exc:
|
||||
if notify:
|
||||
notify(f"Failed to remove cache: {exc}")
|
||||
return False
|
||||
|
||||
def _validate_download_url(self, url: str) -> bool:
|
||||
"""Validate that the URL is a valid HTTP/HTTPS URL."""
|
||||
try:
|
||||
parsed = urlparse(url)
|
||||
return parsed.scheme in ("http", "https") and bool(parsed.netloc)
|
||||
except (ValueError, AttributeError):
|
||||
return False
|
||||
|
||||
def _sanitize_filename(self, filename: str) -> str:
|
||||
"""Remove invalid characters from filename."""
|
||||
return re.sub(r'[<>:"/\\|?*]', "_", filename)
|
||||
|
||||
def _get_name_from_asin(self, asin: str) -> str | None:
|
||||
"""Get the title/name of a book from its ASIN."""
|
||||
try:
|
||||
product_info = self.client.get(
|
||||
path=f"1.0/catalog/products/{asin}",
|
||||
response_groups="product_desc,product_attrs",
|
||||
)
|
||||
product = product_info.get("product", {})
|
||||
return product.get("title") or "Unknown Title"
|
||||
except (OSError, ValueError, KeyError):
|
||||
return None
|
||||
|
||||
def _get_download_link(
|
||||
self, asin: str, codec: str = DEFAULT_CODEC, notify: StatusCallback | None = None
|
||||
) -> str | None:
|
||||
"""Get download link for book."""
|
||||
if self.auth.adp_token is None:
|
||||
if notify:
|
||||
notify("Missing ADP token (not authenticated?)")
|
||||
return None
|
||||
|
||||
try:
|
||||
params = {
|
||||
"type": "AUDI",
|
||||
"currentTransportMethod": "WIFI",
|
||||
"key": asin,
|
||||
"codec": codec,
|
||||
}
|
||||
response = self._http_client.get(
|
||||
url=DOWNLOAD_URL,
|
||||
params=params,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
link = response.headers.get("Location")
|
||||
if not link:
|
||||
link = str(response.url)
|
||||
|
||||
tld = self.auth.locale.domain
|
||||
return link.replace("cds.audible.com", f"cds.audible.{tld}")
|
||||
|
||||
except httpx.HTTPError as exc:
|
||||
if notify:
|
||||
notify(f"Download-link request failed: {exc!s}")
|
||||
return None
|
||||
except (OSError, ValueError, KeyError, AttributeError) as exc:
|
||||
if notify:
|
||||
notify(f"Download-link error: {exc!s}")
|
||||
return None
|
||||
|
||||
def _download_file(
|
||||
self, url: str, dest_path: Path, notify: StatusCallback | None = None
|
||||
) -> Path | None:
|
||||
"""Download file from URL to destination."""
|
||||
try:
|
||||
with self._download_client.stream("GET", url) as response:
|
||||
response.raise_for_status()
|
||||
total_size = int(response.headers.get("content-length", 0))
|
||||
downloaded = 0
|
||||
|
||||
with open(dest_path, "wb") as file_handle:
|
||||
for chunk in response.iter_bytes(chunk_size=self.chunk_size):
|
||||
file_handle.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
if total_size > 0 and notify:
|
||||
percent = (downloaded / total_size) * 100
|
||||
notify(
|
||||
f"Downloading: {percent:.1f}% ({downloaded}/{total_size} bytes)"
|
||||
)
|
||||
|
||||
return dest_path
|
||||
except httpx.HTTPStatusError as exc:
|
||||
if notify:
|
||||
notify(
|
||||
f"Download HTTP error: {exc.response.status_code} {exc.response.reason_phrase}"
|
||||
)
|
||||
try:
|
||||
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
|
||||
dest_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
return None
|
||||
except httpx.HTTPError as exc:
|
||||
if notify:
|
||||
notify(f"Download network error: {exc!s}")
|
||||
try:
|
||||
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
|
||||
dest_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
return None
|
||||
except (OSError, ValueError, KeyError) as exc:
|
||||
if notify:
|
||||
notify(f"Download error: {exc!s}")
|
||||
try:
|
||||
if dest_path.exists() and dest_path.stat().st_size < MIN_FILE_SIZE:
|
||||
dest_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
return None
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the HTTP clients and release resources."""
|
||||
if hasattr(self, "_http_client"):
|
||||
self._http_client.close()
|
||||
if hasattr(self, "_download_client"):
|
||||
self._download_client.close()
|
||||
241
auditui/library.py
Normal file
241
auditui/library.py
Normal file
@@ -0,0 +1,241 @@
|
||||
"""Library helpers for fetching and formatting Audible data."""
|
||||
|
||||
from typing import Callable, List
|
||||
|
||||
import audible
|
||||
|
||||
|
||||
ProgressCallback = Callable[[str], None]
|
||||
|
||||
|
||||
class LibraryClient:
|
||||
"""Helper for interacting with the Audible library."""
|
||||
|
||||
def __init__(self, client: audible.Client) -> None:
|
||||
self.client = client
|
||||
|
||||
def fetch_all_items(self, on_progress: ProgressCallback | None = None) -> list:
|
||||
"""Fetch all library items from the API."""
|
||||
response_groups = (
|
||||
"contributors,media,product_attrs,product_desc,product_details,"
|
||||
"rating,is_finished,listening_status,percent_complete"
|
||||
)
|
||||
return self._fetch_all_pages(response_groups, on_progress)
|
||||
|
||||
def _fetch_all_pages(
|
||||
self, response_groups: str, on_progress: ProgressCallback | None = None
|
||||
) -> list:
|
||||
"""Fetch all pages of library items from the API."""
|
||||
all_items: List[dict] = []
|
||||
page = 1
|
||||
page_size = 50
|
||||
|
||||
while True:
|
||||
library = self.client.get(
|
||||
path="library",
|
||||
num_results=page_size,
|
||||
page=page,
|
||||
response_groups=response_groups,
|
||||
)
|
||||
|
||||
items = list(library.get("items", []))
|
||||
if not items:
|
||||
break
|
||||
|
||||
all_items.extend(items)
|
||||
if on_progress:
|
||||
on_progress(f"Fetched page {page} ({len(items)} items)...")
|
||||
|
||||
if len(items) < page_size:
|
||||
break
|
||||
|
||||
page += 1
|
||||
|
||||
return all_items
|
||||
|
||||
def extract_title(self, item: dict) -> str:
|
||||
"""Extract title from library item."""
|
||||
product = item.get("product", {})
|
||||
return (
|
||||
product.get("title")
|
||||
or item.get("title")
|
||||
or product.get("asin", "Unknown Title")
|
||||
)
|
||||
|
||||
def extract_authors(self, item: dict) -> str:
|
||||
"""Extract author names from library item."""
|
||||
product = item.get("product", {})
|
||||
authors = product.get("authors") or product.get("contributors") or []
|
||||
if not authors and "authors" in item:
|
||||
authors = item.get("authors", [])
|
||||
|
||||
author_names = [a.get("name", "")
|
||||
for a in authors if isinstance(a, dict)]
|
||||
return ", ".join(author_names) or "Unknown"
|
||||
|
||||
def extract_runtime_minutes(self, item: dict) -> int | None:
|
||||
"""Extract runtime in minutes from library item."""
|
||||
product = item.get("product", {})
|
||||
runtime_fields = [
|
||||
"runtime_length_min",
|
||||
"runtime_length",
|
||||
"vLength",
|
||||
"length",
|
||||
"duration",
|
||||
]
|
||||
|
||||
runtime = None
|
||||
for field in runtime_fields:
|
||||
runtime = product.get(field) or item.get(field)
|
||||
if runtime is not None:
|
||||
break
|
||||
|
||||
if runtime is None:
|
||||
return None
|
||||
|
||||
if isinstance(runtime, dict):
|
||||
return int(runtime.get("min", 0))
|
||||
if isinstance(runtime, (int, float)):
|
||||
return int(runtime)
|
||||
return None
|
||||
|
||||
def extract_progress_info(self, item: dict) -> float | None:
|
||||
"""Extract progress percentage from library item."""
|
||||
percent_complete = item.get("percent_complete")
|
||||
listening_status = item.get("listening_status", {})
|
||||
|
||||
if isinstance(listening_status, dict) and percent_complete is None:
|
||||
percent_complete = listening_status.get("percent_complete")
|
||||
|
||||
return float(percent_complete) if percent_complete is not None else None
|
||||
|
||||
def extract_asin(self, item: dict) -> str | None:
|
||||
"""Extract ASIN from library item."""
|
||||
product = item.get("product", {})
|
||||
return item.get("asin") or product.get("asin")
|
||||
|
||||
def is_finished(self, item: dict) -> bool:
|
||||
"""Check if a library item is finished."""
|
||||
is_finished_flag = item.get("is_finished")
|
||||
percent_complete = item.get("percent_complete")
|
||||
listening_status = item.get("listening_status")
|
||||
|
||||
if isinstance(listening_status, dict):
|
||||
is_finished_flag = is_finished_flag or listening_status.get(
|
||||
"is_finished", False
|
||||
)
|
||||
if percent_complete is None:
|
||||
percent_complete = listening_status.get("percent_complete", 0)
|
||||
|
||||
return bool(is_finished_flag) or (
|
||||
isinstance(percent_complete, (int, float)
|
||||
) and percent_complete >= 100
|
||||
)
|
||||
|
||||
def get_last_position(self, asin: str) -> float | None:
|
||||
"""Get the last playback position for a book in seconds."""
|
||||
try:
|
||||
response = self.client.get(
|
||||
path="1.0/annotations/lastpositions",
|
||||
asins=asin,
|
||||
)
|
||||
annotations = response.get("asin_last_position_heard_annots", [])
|
||||
|
||||
for annot in annotations:
|
||||
if annot.get("asin") != asin:
|
||||
continue
|
||||
|
||||
last_position_heard = annot.get("last_position_heard", {})
|
||||
if not isinstance(last_position_heard, dict):
|
||||
continue
|
||||
|
||||
if last_position_heard.get("status") == "DoesNotExist":
|
||||
return None
|
||||
|
||||
position_ms = last_position_heard.get("position_ms")
|
||||
if position_ms is not None:
|
||||
return float(position_ms) / 1000.0
|
||||
|
||||
return None
|
||||
except (OSError, ValueError, KeyError):
|
||||
return None
|
||||
|
||||
def _get_content_reference(self, asin: str) -> dict | None:
|
||||
"""Get content reference data including ACR and version."""
|
||||
try:
|
||||
response = self.client.get(
|
||||
path=f"1.0/content/{asin}/metadata",
|
||||
response_groups="content_reference",
|
||||
)
|
||||
content_metadata = response.get("content_metadata", {})
|
||||
content_reference = content_metadata.get("content_reference", {})
|
||||
if isinstance(content_reference, dict):
|
||||
return content_reference
|
||||
return None
|
||||
except (OSError, ValueError, KeyError):
|
||||
return None
|
||||
|
||||
def save_last_position(self, asin: str, position_seconds: float) -> bool:
|
||||
"""Save the last playback position for a book."""
|
||||
if position_seconds <= 0:
|
||||
return False
|
||||
|
||||
content_ref = self._get_content_reference(asin)
|
||||
if not content_ref:
|
||||
return False
|
||||
|
||||
acr = content_ref.get("acr")
|
||||
if not acr:
|
||||
return False
|
||||
|
||||
body = {
|
||||
"acr": acr,
|
||||
"asin": asin,
|
||||
"position_ms": int(position_seconds * 1000),
|
||||
}
|
||||
|
||||
if version := content_ref.get("version"):
|
||||
body["version"] = version
|
||||
|
||||
try:
|
||||
self.client.put(
|
||||
path=f"1.0/lastpositions/{asin}",
|
||||
body=body,
|
||||
)
|
||||
return True
|
||||
except (OSError, ValueError, KeyError):
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def format_duration(
|
||||
value: int | None, unit: str = "minutes", default_none: str | None = None
|
||||
) -> str | None:
|
||||
"""Format duration value into a human-readable string."""
|
||||
if value is None or value <= 0:
|
||||
return default_none
|
||||
|
||||
total_minutes = int(value)
|
||||
if unit == "seconds":
|
||||
total_minutes //= 60
|
||||
|
||||
hours, minutes = divmod(total_minutes, 60)
|
||||
|
||||
parts = []
|
||||
if hours:
|
||||
parts.append(f"{hours} hour{'s' if hours != 1 else ''}")
|
||||
if minutes:
|
||||
parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
|
||||
|
||||
return " ".join(parts) if parts else default_none
|
||||
|
||||
@staticmethod
|
||||
def format_time(seconds: float) -> str:
|
||||
"""Format seconds as HH:MM:SS or MM:SS."""
|
||||
total_seconds = int(seconds)
|
||||
hours = total_seconds // 3600
|
||||
minutes = (total_seconds % 3600) // 60
|
||||
secs = total_seconds % 60
|
||||
|
||||
if hours > 0:
|
||||
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
|
||||
return f"{minutes:02d}:{secs:02d}"
|
||||
42
auditui/media_info.py
Normal file
42
auditui/media_info.py
Normal file
@@ -0,0 +1,42 @@
|
||||
"""Media information loading for Audible content."""
|
||||
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def load_media_info(path: Path, activation_hex: str | None = None) -> tuple[float | None, list[dict]]:
|
||||
"""Load media information including duration and chapters using ffprobe."""
|
||||
if not shutil.which("ffprobe"):
|
||||
return None, []
|
||||
|
||||
try:
|
||||
cmd = ["ffprobe", "-v", "quiet", "-print_format",
|
||||
"json", "-show_format", "-show_chapters"]
|
||||
if activation_hex:
|
||||
cmd.extend(["-activation_bytes", activation_hex])
|
||||
cmd.append(str(path))
|
||||
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=10)
|
||||
if result.returncode != 0:
|
||||
return None, []
|
||||
|
||||
data = json.loads(result.stdout)
|
||||
format_info = data.get("format", {})
|
||||
duration_str = format_info.get("duration")
|
||||
duration = float(duration_str) if duration_str else None
|
||||
|
||||
chapters_data = data.get("chapters", [])
|
||||
chapters = [
|
||||
{
|
||||
"start_time": float(ch.get("start_time", 0)),
|
||||
"end_time": float(ch.get("end_time", 0)),
|
||||
"title": ch.get("tags", {}).get("title", f"Chapter {idx + 1}"),
|
||||
}
|
||||
for idx, ch in enumerate(chapters_data)
|
||||
]
|
||||
return duration, chapters
|
||||
except (json.JSONDecodeError, subprocess.TimeoutExpired, ValueError, KeyError):
|
||||
return None, []
|
||||
491
auditui/playback.py
Normal file
491
auditui/playback.py
Normal file
@@ -0,0 +1,491 @@
|
||||
"""Playback control for Auditui."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import signal
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
from .downloads import DownloadManager
|
||||
from .library import LibraryClient
|
||||
from .media_info import load_media_info
|
||||
|
||||
StatusCallback = Callable[[str], None]
|
||||
|
||||
|
||||
class PlaybackController:
|
||||
"""Manage playback through ffplay."""
|
||||
|
||||
def __init__(self, notify: StatusCallback, library_client: LibraryClient | None = None) -> None:
|
||||
self.notify = notify
|
||||
self.library_client = library_client
|
||||
self.playback_process: subprocess.Popen | None = None
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.current_file_path: Path | None = None
|
||||
self.current_asin: str | None = None
|
||||
self.playback_start_time: float | None = None
|
||||
self.paused_duration: float = 0.0
|
||||
self.pause_start_time: float | None = None
|
||||
self.total_duration: float | None = None
|
||||
self.chapters: list[dict] = []
|
||||
self.seek_offset: float = 0.0
|
||||
self.activation_hex: str | None = None
|
||||
self.last_save_time: float = 0.0
|
||||
self.position_save_interval: float = 30.0
|
||||
|
||||
def start(
|
||||
self,
|
||||
path: Path,
|
||||
activation_hex: str | None = None,
|
||||
status_callback: StatusCallback | None = None,
|
||||
start_position: float = 0.0,
|
||||
) -> bool:
|
||||
"""Start playing a local file using ffplay."""
|
||||
notify = status_callback or self.notify
|
||||
|
||||
if not shutil.which("ffplay"):
|
||||
notify("ffplay not found. Please install ffmpeg")
|
||||
return False
|
||||
|
||||
if self.playback_process is not None:
|
||||
self.stop()
|
||||
|
||||
self.activation_hex = activation_hex
|
||||
self.seek_offset = start_position
|
||||
|
||||
cmd = ["ffplay", "-nodisp", "-autoexit"]
|
||||
if activation_hex:
|
||||
cmd.extend(["-activation_bytes", activation_hex])
|
||||
if start_position > 0:
|
||||
cmd.extend(["-ss", str(start_position)])
|
||||
cmd.append(str(path))
|
||||
|
||||
try:
|
||||
self.playback_process = subprocess.Popen(
|
||||
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
|
||||
)
|
||||
|
||||
time.sleep(0.2)
|
||||
if self.playback_process.poll() is not None:
|
||||
return_code = self.playback_process.returncode
|
||||
if return_code == 0 and start_position > 0 and self.total_duration:
|
||||
if start_position >= self.total_duration - 5:
|
||||
notify("Reached end of file")
|
||||
self._reset_state()
|
||||
return False
|
||||
notify(
|
||||
f"Playback process exited immediately (code: {return_code})")
|
||||
self.playback_process = None
|
||||
return False
|
||||
|
||||
self.is_playing = True
|
||||
self.is_paused = False
|
||||
self.current_file_path = path
|
||||
self.playback_start_time = time.time()
|
||||
self.paused_duration = 0.0
|
||||
self.pause_start_time = None
|
||||
duration, chapters = load_media_info(path, activation_hex)
|
||||
self.total_duration = duration
|
||||
self.chapters = chapters
|
||||
notify(f"Playing: {path.name}")
|
||||
return True
|
||||
|
||||
except (OSError, ValueError, subprocess.SubprocessError) as exc:
|
||||
notify(f"Error starting playback: {exc}")
|
||||
return False
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the current playback."""
|
||||
if self.playback_process is None:
|
||||
return
|
||||
|
||||
self._save_current_position()
|
||||
|
||||
try:
|
||||
if self.playback_process.poll() is None:
|
||||
self.playback_process.terminate()
|
||||
try:
|
||||
self.playback_process.wait(timeout=2)
|
||||
except subprocess.TimeoutExpired:
|
||||
self.playback_process.kill()
|
||||
self.playback_process.wait()
|
||||
except (ProcessLookupError, ValueError):
|
||||
pass
|
||||
finally:
|
||||
self._reset_state()
|
||||
|
||||
def pause(self) -> None:
|
||||
"""Pause the current playback."""
|
||||
if not self._validate_playback_state(require_paused=False):
|
||||
return
|
||||
|
||||
self.pause_start_time = time.time()
|
||||
self._send_signal(signal.SIGSTOP, "Paused", "pause")
|
||||
|
||||
def resume(self) -> None:
|
||||
"""Resume the current playback."""
|
||||
if not self._validate_playback_state(require_paused=True):
|
||||
return
|
||||
|
||||
if self.pause_start_time is not None:
|
||||
self.paused_duration += time.time() - self.pause_start_time
|
||||
self.pause_start_time = None
|
||||
self._send_signal(signal.SIGCONT, "Playing", "resume")
|
||||
|
||||
def check_status(self) -> str | None:
|
||||
"""Check if playback process has finished and return status message."""
|
||||
if self.playback_process is None:
|
||||
return None
|
||||
|
||||
return_code = self.playback_process.poll()
|
||||
if return_code is None:
|
||||
return None
|
||||
|
||||
finished_file = self.current_file_path
|
||||
self._reset_state()
|
||||
|
||||
if finished_file:
|
||||
if return_code == 0:
|
||||
return f"Finished: {finished_file.name}"
|
||||
return f"Playback ended unexpectedly (code: {return_code}): {finished_file.name}"
|
||||
return "Playback finished"
|
||||
|
||||
def _reset_state(self) -> None:
|
||||
"""Reset all playback state."""
|
||||
self.playback_process = None
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.current_file_path = None
|
||||
self.current_asin = None
|
||||
self.playback_start_time = None
|
||||
self.paused_duration = 0.0
|
||||
self.pause_start_time = None
|
||||
self.total_duration = None
|
||||
self.chapters = []
|
||||
self.seek_offset = 0.0
|
||||
self.activation_hex = None
|
||||
self.last_save_time = 0.0
|
||||
|
||||
def _validate_playback_state(self, require_paused: bool) -> bool:
|
||||
"""Validate playback state before pause/resume operations."""
|
||||
if not (self.playback_process and self.is_playing):
|
||||
return False
|
||||
|
||||
if require_paused and not self.is_paused:
|
||||
return False
|
||||
if not require_paused and self.is_paused:
|
||||
return False
|
||||
|
||||
if not self.is_alive():
|
||||
self.stop()
|
||||
self.notify("Playback process has ended")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _send_signal(self, sig: signal.Signals, status_prefix: str, action: str) -> None:
|
||||
"""Send signal to playback process and update state."""
|
||||
if self.playback_process is None:
|
||||
return
|
||||
|
||||
try:
|
||||
os.kill(self.playback_process.pid, sig)
|
||||
self.is_paused = sig == signal.SIGSTOP
|
||||
filename = self.current_file_path.name if self.current_file_path else None
|
||||
message = f"{status_prefix}: {filename}" if filename else status_prefix
|
||||
self.notify(message)
|
||||
except ProcessLookupError:
|
||||
self.stop()
|
||||
self.notify("Process no longer exists")
|
||||
except PermissionError:
|
||||
self.notify(f"Permission denied: cannot {action} playback")
|
||||
except (OSError, ValueError) as exc:
|
||||
self.notify(f"Error {action}ing playback: {exc}")
|
||||
|
||||
def is_alive(self) -> bool:
|
||||
"""Check if playback process is still running."""
|
||||
if self.playback_process is None:
|
||||
return False
|
||||
return self.playback_process.poll() is None
|
||||
|
||||
def prepare_and_start(
|
||||
self,
|
||||
download_manager: DownloadManager,
|
||||
asin: str,
|
||||
status_callback: StatusCallback | None = None,
|
||||
) -> bool:
|
||||
"""Download file, get activation bytes, and start playback."""
|
||||
notify = status_callback or self.notify
|
||||
|
||||
if not download_manager:
|
||||
notify("Could not download file")
|
||||
return False
|
||||
|
||||
notify("Preparing playback...")
|
||||
|
||||
local_path = download_manager.get_or_download(asin, notify)
|
||||
if not local_path:
|
||||
notify("Could not download file")
|
||||
return False
|
||||
|
||||
notify("Getting activation bytes...")
|
||||
activation_hex = download_manager.get_activation_bytes()
|
||||
if not activation_hex:
|
||||
notify("Failed to get activation bytes")
|
||||
return False
|
||||
|
||||
start_position = 0.0
|
||||
if self.library_client:
|
||||
try:
|
||||
last_position = self.library_client.get_last_position(asin)
|
||||
if last_position is not None and last_position > 0:
|
||||
start_position = last_position
|
||||
notify(
|
||||
f"Resuming from {LibraryClient.format_time(start_position)}")
|
||||
except (OSError, ValueError, KeyError):
|
||||
pass
|
||||
|
||||
notify(f"Starting playback of {local_path.name}...")
|
||||
self.current_asin = asin
|
||||
self.last_save_time = time.time()
|
||||
return self.start(local_path, activation_hex, notify, start_position)
|
||||
|
||||
def toggle_playback(self) -> bool:
|
||||
"""Toggle pause/resume state. Returns True if action was taken."""
|
||||
if not self.is_playing:
|
||||
return False
|
||||
|
||||
if not self.is_alive():
|
||||
self.stop()
|
||||
self.notify("Playback has ended")
|
||||
return False
|
||||
|
||||
if self.is_paused:
|
||||
self.resume()
|
||||
else:
|
||||
self.pause()
|
||||
return True
|
||||
|
||||
def _get_current_elapsed(self) -> float:
|
||||
"""Calculate current elapsed playback time."""
|
||||
if self.playback_start_time is None:
|
||||
return 0.0
|
||||
|
||||
current_time = time.time()
|
||||
if self.is_paused and self.pause_start_time is not None:
|
||||
return (self.pause_start_time - self.playback_start_time) - self.paused_duration
|
||||
|
||||
if self.pause_start_time is not None:
|
||||
self.paused_duration += current_time - self.pause_start_time
|
||||
self.pause_start_time = None
|
||||
|
||||
return max(0.0, (current_time - self.playback_start_time) - self.paused_duration)
|
||||
|
||||
def _stop_process(self) -> None:
|
||||
"""Stop the playback process without resetting state."""
|
||||
if not self.playback_process:
|
||||
return
|
||||
|
||||
try:
|
||||
if self.playback_process.poll() is None:
|
||||
self.playback_process.terminate()
|
||||
try:
|
||||
self.playback_process.wait(timeout=2)
|
||||
except subprocess.TimeoutExpired:
|
||||
self.playback_process.kill()
|
||||
self.playback_process.wait()
|
||||
except (ProcessLookupError, ValueError):
|
||||
pass
|
||||
|
||||
self.playback_process = None
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.playback_start_time = None
|
||||
self.paused_duration = 0.0
|
||||
self.pause_start_time = None
|
||||
|
||||
def _seek(self, seconds: float, direction: str) -> bool:
|
||||
"""Seek forward or backward by specified seconds."""
|
||||
if not self.is_playing or not self.current_file_path:
|
||||
return False
|
||||
|
||||
elapsed = self._get_current_elapsed()
|
||||
current_total_position = self.seek_offset + elapsed
|
||||
|
||||
if direction == "forward":
|
||||
new_position = current_total_position + seconds
|
||||
if self.total_duration:
|
||||
if new_position >= self.total_duration - 2:
|
||||
self.notify("Already at end of file")
|
||||
return False
|
||||
new_position = min(new_position, self.total_duration - 2)
|
||||
message = f"Skipped forward {int(seconds)}s"
|
||||
else:
|
||||
new_position = max(0.0, current_total_position - seconds)
|
||||
message = f"Skipped backward {int(seconds)}s"
|
||||
|
||||
was_paused = self.is_paused
|
||||
saved_state = {
|
||||
"file_path": self.current_file_path,
|
||||
"asin": self.current_asin,
|
||||
"activation": self.activation_hex,
|
||||
"duration": self.total_duration,
|
||||
"chapters": self.chapters.copy(),
|
||||
}
|
||||
|
||||
self._stop_process()
|
||||
time.sleep(0.2)
|
||||
|
||||
if self.start(saved_state["file_path"], saved_state["activation"], self.notify, new_position):
|
||||
self.current_asin = saved_state["asin"]
|
||||
self.total_duration = saved_state["duration"]
|
||||
self.chapters = saved_state["chapters"]
|
||||
if was_paused:
|
||||
time.sleep(0.3)
|
||||
self.pause()
|
||||
self.notify(message)
|
||||
return True
|
||||
return False
|
||||
|
||||
def seek_forward(self, seconds: float = 30.0) -> bool:
|
||||
"""Seek forward by specified seconds. Returns True if action was taken."""
|
||||
return self._seek(seconds, "forward")
|
||||
|
||||
def seek_backward(self, seconds: float = 30.0) -> bool:
|
||||
"""Seek backward by specified seconds. Returns True if action was taken."""
|
||||
return self._seek(seconds, "backward")
|
||||
|
||||
def get_current_progress(self) -> tuple[str, float, float] | None:
|
||||
"""Get current playback progress."""
|
||||
if not self.is_playing or self.playback_start_time is None:
|
||||
return None
|
||||
|
||||
elapsed = self._get_current_elapsed()
|
||||
total_elapsed = self.seek_offset + elapsed
|
||||
chapter_name, chapter_elapsed, chapter_total = self._get_current_chapter(
|
||||
total_elapsed)
|
||||
return (chapter_name, chapter_elapsed, chapter_total)
|
||||
|
||||
def _get_current_chapter(self, elapsed: float) -> tuple[str, float, float]:
|
||||
"""Get current chapter info."""
|
||||
if not self.chapters:
|
||||
return ("Unknown Chapter", elapsed, self.total_duration or 0.0)
|
||||
|
||||
for chapter in self.chapters:
|
||||
if chapter["start_time"] <= elapsed < chapter["end_time"]:
|
||||
chapter_elapsed = elapsed - chapter["start_time"]
|
||||
chapter_total = chapter["end_time"] - chapter["start_time"]
|
||||
return (chapter["title"], chapter_elapsed, chapter_total)
|
||||
|
||||
last_chapter = self.chapters[-1]
|
||||
chapter_elapsed = max(0.0, elapsed - last_chapter["start_time"])
|
||||
chapter_total = last_chapter["end_time"] - last_chapter["start_time"]
|
||||
return (last_chapter["title"], chapter_elapsed, chapter_total)
|
||||
|
||||
def _get_current_chapter_index(self, elapsed: float) -> int | None:
|
||||
"""Get the index of the current chapter based on elapsed time."""
|
||||
if not self.chapters:
|
||||
return None
|
||||
|
||||
for idx, chapter in enumerate(self.chapters):
|
||||
if chapter["start_time"] <= elapsed < chapter["end_time"]:
|
||||
return idx
|
||||
|
||||
return len(self.chapters) - 1
|
||||
|
||||
def seek_to_chapter(self, direction: str) -> bool:
|
||||
"""Seek to next or previous chapter."""
|
||||
if not self.is_playing or not self.current_file_path:
|
||||
return False
|
||||
|
||||
if not self.chapters:
|
||||
self.notify("No chapter information available")
|
||||
return False
|
||||
|
||||
elapsed = self._get_current_elapsed()
|
||||
current_total_position = self.seek_offset + elapsed
|
||||
current_chapter_idx = self._get_current_chapter_index(
|
||||
current_total_position)
|
||||
|
||||
if current_chapter_idx is None:
|
||||
self.notify("Could not determine current chapter")
|
||||
return False
|
||||
|
||||
if direction == "next":
|
||||
if current_chapter_idx >= len(self.chapters) - 1:
|
||||
self.notify("Already at last chapter")
|
||||
return False
|
||||
target_chapter = self.chapters[current_chapter_idx + 1]
|
||||
new_position = target_chapter["start_time"]
|
||||
message = f"Next chapter: {target_chapter['title']}"
|
||||
else:
|
||||
if current_chapter_idx <= 0:
|
||||
self.notify("Already at first chapter")
|
||||
return False
|
||||
target_chapter = self.chapters[current_chapter_idx - 1]
|
||||
new_position = target_chapter["start_time"]
|
||||
message = f"Previous chapter: {target_chapter['title']}"
|
||||
|
||||
was_paused = self.is_paused
|
||||
saved_state = {
|
||||
"file_path": self.current_file_path,
|
||||
"asin": self.current_asin,
|
||||
"activation": self.activation_hex,
|
||||
"duration": self.total_duration,
|
||||
"chapters": self.chapters.copy(),
|
||||
}
|
||||
|
||||
self._stop_process()
|
||||
time.sleep(0.2)
|
||||
|
||||
if self.start(saved_state["file_path"], saved_state["activation"], self.notify, new_position):
|
||||
self.current_asin = saved_state["asin"]
|
||||
self.total_duration = saved_state["duration"]
|
||||
self.chapters = saved_state["chapters"]
|
||||
if was_paused:
|
||||
time.sleep(0.3)
|
||||
self.pause()
|
||||
self.notify(message)
|
||||
return True
|
||||
return False
|
||||
|
||||
def seek_to_next_chapter(self) -> bool:
|
||||
"""Seek to the next chapter. Returns True if action was taken."""
|
||||
return self.seek_to_chapter("next")
|
||||
|
||||
def seek_to_previous_chapter(self) -> bool:
|
||||
"""Seek to the previous chapter. Returns True if action was taken."""
|
||||
return self.seek_to_chapter("previous")
|
||||
|
||||
def _save_current_position(self) -> None:
|
||||
"""Save the current playback position to Audible."""
|
||||
if not (self.library_client and self.current_asin and self.is_playing):
|
||||
return
|
||||
|
||||
if self.playback_start_time is None:
|
||||
return
|
||||
|
||||
current_position = self.seek_offset + self._get_current_elapsed()
|
||||
if current_position <= 0:
|
||||
return
|
||||
|
||||
try:
|
||||
self.library_client.save_last_position(
|
||||
self.current_asin, current_position)
|
||||
except (OSError, ValueError, KeyError):
|
||||
pass
|
||||
|
||||
def update_position_if_needed(self) -> None:
|
||||
"""Periodically save position if enough time has passed."""
|
||||
if not (self.is_playing and self.library_client and self.current_asin):
|
||||
return
|
||||
|
||||
current_time = time.time()
|
||||
if current_time - self.last_save_time >= self.position_save_interval:
|
||||
self._save_current_position()
|
||||
self.last_save_time = current_time
|
||||
87
auditui/table_utils.py
Normal file
87
auditui/table_utils.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""Utils for table operations."""
|
||||
|
||||
import unicodedata
|
||||
from typing import TYPE_CHECKING, Callable
|
||||
|
||||
from .constants import (
|
||||
AUTHOR_NAME_DISPLAY_LENGTH,
|
||||
AUTHOR_NAME_MAX_LENGTH,
|
||||
PROGRESS_COLUMN_INDEX,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .downloads import DownloadManager
|
||||
|
||||
|
||||
def create_title_sort_key(reverse: bool = False) -> tuple[Callable, bool]:
|
||||
"""Create a sort key function for sorting by title."""
|
||||
def title_key(row_values):
|
||||
title_cell = row_values[0]
|
||||
if isinstance(title_cell, str):
|
||||
normalized = unicodedata.normalize('NFD', title_cell)
|
||||
return normalized.encode('ascii', 'ignore').decode('ascii').lower()
|
||||
return str(title_cell).lower()
|
||||
|
||||
return title_key, reverse
|
||||
|
||||
|
||||
def create_progress_sort_key(progress_column_index: int = PROGRESS_COLUMN_INDEX, reverse: bool = False) -> tuple[Callable, bool]:
|
||||
"""Create a sort key function for sorting by progress percentage."""
|
||||
def progress_key(row_values):
|
||||
progress_cell = row_values[progress_column_index]
|
||||
if isinstance(progress_cell, str):
|
||||
try:
|
||||
return float(progress_cell.rstrip("%"))
|
||||
except (ValueError, AttributeError):
|
||||
return 0.0
|
||||
return 0.0
|
||||
|
||||
return progress_key, reverse
|
||||
|
||||
|
||||
def truncate_author_name(author_names: str) -> str:
|
||||
"""Truncate author name if it exceeds maximum length."""
|
||||
if author_names and len(author_names) > AUTHOR_NAME_MAX_LENGTH:
|
||||
return f"{author_names[:AUTHOR_NAME_DISPLAY_LENGTH]}..."
|
||||
return author_names
|
||||
|
||||
|
||||
def format_item_as_row(item: dict, library_client, download_manager: "DownloadManager | None" = None) -> tuple[str, str, str, str, str]:
|
||||
"""Format a library item into table row data.
|
||||
|
||||
Returns:
|
||||
Tuple of (title, author, runtime, progress, downloaded) strings
|
||||
"""
|
||||
title = library_client.extract_title(item)
|
||||
|
||||
author_names = library_client.extract_authors(item)
|
||||
author_names = truncate_author_name(author_names)
|
||||
author_display = author_names or "Unknown"
|
||||
|
||||
minutes = library_client.extract_runtime_minutes(item)
|
||||
runtime_str = library_client.format_duration(
|
||||
minutes, unit="minutes", default_none="Unknown length"
|
||||
) or "Unknown"
|
||||
|
||||
percent_complete = library_client.extract_progress_info(item)
|
||||
progress_str = (
|
||||
f"{percent_complete:.1f}%"
|
||||
if percent_complete and percent_complete > 0
|
||||
else "0%"
|
||||
)
|
||||
|
||||
downloaded_str = ""
|
||||
if download_manager:
|
||||
asin = library_client.extract_asin(item)
|
||||
if asin and download_manager.is_cached(asin):
|
||||
downloaded_str = "✓"
|
||||
|
||||
return (title, author_display, runtime_str, progress_str, downloaded_str)
|
||||
|
||||
|
||||
def filter_unfinished_items(items: list[dict], library_client) -> list[dict]:
|
||||
"""Filter out finished items from the list."""
|
||||
return [
|
||||
item for item in items
|
||||
if not library_client.is_finished(item)
|
||||
]
|
||||
33
auditui/ui.py
Normal file
33
auditui/ui.py
Normal file
@@ -0,0 +1,33 @@
|
||||
"""UI components for the Auditui application."""
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Container, Horizontal, ScrollableContainer
|
||||
from textual.screen import ModalScreen
|
||||
from textual.widgets import Static
|
||||
|
||||
|
||||
class HelpScreen(ModalScreen):
|
||||
"""Help screen displaying all available keybindings."""
|
||||
|
||||
BINDINGS = [("escape", "dismiss", "Close"), ("?", "dismiss", "Close")]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
with Container(id="help_container"):
|
||||
yield Static("Key Bindings", id="help_title")
|
||||
with ScrollableContainer(id="help_content"):
|
||||
bindings = self.app.BINDINGS
|
||||
for binding in bindings:
|
||||
if isinstance(binding, tuple):
|
||||
key, action, description = binding
|
||||
else:
|
||||
key = binding.key
|
||||
description = binding.description
|
||||
key_display = key.replace(
|
||||
"ctrl+", "^").replace("left", "←").replace("right", "→").replace("space", "Space").replace("enter", "Enter")
|
||||
with Horizontal(classes="help_row"):
|
||||
yield Static(f"[bold #f9e2af]{key_display}[/]", classes="help_key")
|
||||
yield Static(description, classes="help_action")
|
||||
yield Static("Press [bold #f9e2af]?[/] or [bold #f9e2af]Escape[/] to close", id="help_footer")
|
||||
|
||||
def action_dismiss(self) -> None:
|
||||
self.dismiss()
|
||||
379
main.py
379
main.py
@@ -1,379 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""A terminal-based user interface (TUI) client for Audible"""
|
||||
|
||||
import sys
|
||||
from getpass import getpass
|
||||
from pathlib import Path
|
||||
|
||||
import audible
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.widgets import Footer, Header, DataTable, Static
|
||||
from textual.worker import get_current_worker
|
||||
from textual import work
|
||||
|
||||
|
||||
class AudituiApp(App):
|
||||
"""Main application class for the Audible TUI app."""
|
||||
BINDINGS = [
|
||||
("d", "toggle_dark", "Toggle dark mode"),
|
||||
("s", "sort", "Sort by title"),
|
||||
("r", "reverse_sort", "Reverse sort"),
|
||||
("p", "sort_by_progress", "Sort by progress"),
|
||||
("a", "show_all", "Show all books"),
|
||||
("u", "show_unfinished", "Show unfinished"),
|
||||
("q", "quit", "Quit application"),
|
||||
]
|
||||
|
||||
CSS = """
|
||||
DataTable {
|
||||
height: 1fr;
|
||||
}
|
||||
Static {
|
||||
height: 1;
|
||||
text-align: center;
|
||||
background: $primary;
|
||||
}
|
||||
"""
|
||||
|
||||
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.auth = None
|
||||
self.client = None
|
||||
self.all_items = []
|
||||
self.current_items = []
|
||||
self.show_all_mode = False
|
||||
self.progress_sort_reverse = False
|
||||
self.title_column_key = None
|
||||
self.progress_column_key = None
|
||||
self.progress_column_index = 3
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header()
|
||||
yield Static("Loading...", id="status")
|
||||
table = DataTable()
|
||||
table.zebra_stripes = True
|
||||
table.cursor_type = "row"
|
||||
yield table
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Initialize the table and start fetching library data."""
|
||||
table = self.query_one(DataTable)
|
||||
table.add_columns("Title", "Author", "Length", "Progress")
|
||||
column_keys = list(table.columns.keys())
|
||||
self.title_column_key = column_keys[0]
|
||||
self.progress_column_key = column_keys[3]
|
||||
if self.client:
|
||||
self.update_status("Fetching library...")
|
||||
self.fetch_library()
|
||||
else:
|
||||
self.update_status(
|
||||
"Not authenticated. Please restart and authenticate.")
|
||||
|
||||
@work(exclusive=True, thread=True)
|
||||
def fetch_library(self) -> None:
|
||||
"""Fetch all library items from Audible API in background thread."""
|
||||
worker = get_current_worker()
|
||||
if worker.is_cancelled:
|
||||
return
|
||||
|
||||
try:
|
||||
response_groups = (
|
||||
"contributors,media,product_attrs,product_desc,product_details,"
|
||||
"rating,is_finished,listening_status,percent_complete"
|
||||
)
|
||||
all_items = self._fetch_all_pages(response_groups)
|
||||
self.all_items = all_items
|
||||
self.call_from_thread(self.on_library_loaded, all_items)
|
||||
except (OSError, ValueError, KeyError) as e:
|
||||
self.call_from_thread(self.on_library_error, str(e))
|
||||
|
||||
def _fetch_all_pages(self, response_groups):
|
||||
"""Fetch all pages of library items from the API."""
|
||||
all_items = []
|
||||
page = 1
|
||||
page_size = 50
|
||||
|
||||
while True:
|
||||
library = self.client.get(
|
||||
path="library",
|
||||
num_results=page_size,
|
||||
page=page,
|
||||
response_groups=response_groups
|
||||
)
|
||||
|
||||
items = library.get("items", [])
|
||||
|
||||
if not items:
|
||||
break
|
||||
|
||||
all_items.extend(items)
|
||||
self.call_from_thread(
|
||||
self.update_status,
|
||||
f"Fetched page {page} ({len(items)} items)..."
|
||||
)
|
||||
|
||||
if len(items) < page_size:
|
||||
break
|
||||
|
||||
page += 1
|
||||
|
||||
return all_items
|
||||
|
||||
def on_library_loaded(self, items) -> None:
|
||||
"""Handle successful library load."""
|
||||
self.all_items = items
|
||||
self.update_status(f"Loaded {len(items)} books")
|
||||
self.show_unfinished()
|
||||
|
||||
def on_library_error(self, error: str) -> None:
|
||||
"""Handle library fetch error."""
|
||||
self.update_status(f"Error fetching library: {error}")
|
||||
|
||||
def update_status(self, message: str) -> None:
|
||||
"""Update the status message in the UI."""
|
||||
status = self.query_one("#status", Static)
|
||||
status.update(message)
|
||||
|
||||
def format_duration(self, value, unit='minutes', default_none=None):
|
||||
"""Format duration value into human-readable string."""
|
||||
if value is None or value <= 0:
|
||||
return default_none
|
||||
|
||||
if unit == 'seconds':
|
||||
total_minutes = int(value) // 60
|
||||
else:
|
||||
total_minutes = int(value)
|
||||
|
||||
if total_minutes < 60:
|
||||
return f"{total_minutes} minute{'s' if total_minutes != 1 else ''}"
|
||||
|
||||
hours = total_minutes // 60
|
||||
mins = total_minutes % 60
|
||||
if mins == 0:
|
||||
return f"{hours} hour{'s' if hours != 1 else ''}"
|
||||
return f"{hours} hour{'s' if hours != 1 else ''} {mins} minute{'s' if mins != 1 else ''}"
|
||||
|
||||
def _extract_title(self, item):
|
||||
"""Extract title from library item."""
|
||||
product = item.get("product", {})
|
||||
return (product.get("title") or
|
||||
item.get("title") or
|
||||
product.get("asin", "Unknown Title"))
|
||||
|
||||
def _extract_authors(self, item):
|
||||
"""Extract author names from library item."""
|
||||
product = item.get("product", {})
|
||||
authors = product.get("authors") or product.get("contributors") or []
|
||||
if not authors and "authors" in item:
|
||||
authors = item.get("authors", [])
|
||||
return ", ".join([a.get("name", "") for a in authors if isinstance(a, dict)])
|
||||
|
||||
def _extract_runtime_minutes(self, item):
|
||||
"""Extract runtime in minutes from library item."""
|
||||
product = item.get("product", {})
|
||||
runtime_fields = [
|
||||
"runtime_length_min",
|
||||
"runtime_length",
|
||||
"vLength",
|
||||
"length",
|
||||
"duration"
|
||||
]
|
||||
|
||||
runtime = None
|
||||
for field in runtime_fields:
|
||||
runtime = product.get(field)
|
||||
if runtime is None:
|
||||
runtime = item.get(field)
|
||||
if runtime is not None:
|
||||
break
|
||||
|
||||
if runtime is None:
|
||||
return None
|
||||
|
||||
if isinstance(runtime, dict):
|
||||
if "min" in runtime:
|
||||
return int(runtime.get("min", 0))
|
||||
elif isinstance(runtime, (int, float)):
|
||||
return int(runtime)
|
||||
|
||||
return None
|
||||
|
||||
def _extract_progress_info(self, item):
|
||||
"""Extract progress percentage from library item."""
|
||||
percent_complete = item.get("percent_complete")
|
||||
listening_status = item.get("listening_status", {})
|
||||
|
||||
if isinstance(listening_status, dict):
|
||||
if percent_complete is None:
|
||||
percent_complete = listening_status.get("percent_complete")
|
||||
else:
|
||||
percent_complete = None
|
||||
|
||||
return percent_complete
|
||||
|
||||
def _populate_table(self, items):
|
||||
"""Populate the DataTable with library items."""
|
||||
table = self.query_one(DataTable)
|
||||
table.clear()
|
||||
|
||||
if not items:
|
||||
self.update_status("No books found.")
|
||||
return
|
||||
|
||||
for item in items:
|
||||
title = self._extract_title(item)
|
||||
author_names = self._extract_authors(item)
|
||||
minutes = self._extract_runtime_minutes(item)
|
||||
runtime_str = self.format_duration(
|
||||
minutes, unit='minutes', default_none="Unknown length")
|
||||
percent_complete = self._extract_progress_info(item)
|
||||
|
||||
progress_str = "0%"
|
||||
if percent_complete is not None and percent_complete > 0:
|
||||
progress_str = f"{percent_complete:.1f}%"
|
||||
|
||||
table.add_row(
|
||||
title,
|
||||
author_names or "Unknown",
|
||||
runtime_str or "Unknown",
|
||||
progress_str,
|
||||
key=title
|
||||
)
|
||||
|
||||
self.current_items = items
|
||||
mode = "all" if self.show_all_mode else "unfinished"
|
||||
self.update_status(f"Showing {len(items)} books ({mode})")
|
||||
|
||||
def show_all(self) -> None:
|
||||
"""Display all books in the table."""
|
||||
if not self.all_items:
|
||||
return
|
||||
self.show_all_mode = True
|
||||
self._populate_table(self.all_items)
|
||||
|
||||
def show_unfinished(self) -> None:
|
||||
"""Display only unfinished books in the table."""
|
||||
if not self.all_items:
|
||||
return
|
||||
self.show_all_mode = False
|
||||
|
||||
unfinished_items = []
|
||||
for item in self.all_items:
|
||||
is_finished_flag = item.get("is_finished")
|
||||
percent_complete = item.get("percent_complete")
|
||||
listening_status = item.get("listening_status")
|
||||
|
||||
if isinstance(listening_status, dict):
|
||||
is_finished_flag = is_finished_flag or listening_status.get(
|
||||
"is_finished", False)
|
||||
if percent_complete is None:
|
||||
percent_complete = listening_status.get(
|
||||
"percent_complete", 0)
|
||||
|
||||
is_finished = False
|
||||
if is_finished_flag is True:
|
||||
is_finished = True
|
||||
elif isinstance(percent_complete, (int, float)) and percent_complete >= 100:
|
||||
is_finished = True
|
||||
|
||||
if not is_finished:
|
||||
unfinished_items.append(item)
|
||||
|
||||
self._populate_table(unfinished_items)
|
||||
|
||||
def action_toggle_dark(self) -> None:
|
||||
"""Toggle between dark and light theme."""
|
||||
self.theme = (
|
||||
"textual-dark" if self.theme == "textual-light" else "textual-light"
|
||||
)
|
||||
|
||||
def action_sort(self) -> None:
|
||||
"""Sort table by title in ascending order."""
|
||||
table = self.query_one(DataTable)
|
||||
if table.row_count > 0:
|
||||
table.sort(self.title_column_key)
|
||||
|
||||
def action_reverse_sort(self) -> None:
|
||||
"""Sort table by title in descending order."""
|
||||
table = self.query_one(DataTable)
|
||||
if table.row_count > 0:
|
||||
table.sort(self.title_column_key, reverse=True)
|
||||
|
||||
def action_sort_by_progress(self) -> None:
|
||||
"""Sort table by progress percentage, toggling direction on each press."""
|
||||
table = self.query_one(DataTable)
|
||||
if table.row_count > 0:
|
||||
self.progress_sort_reverse = not self.progress_sort_reverse
|
||||
|
||||
def progress_key(row_values):
|
||||
progress_cell = row_values[self.progress_column_index]
|
||||
if isinstance(progress_cell, str):
|
||||
try:
|
||||
return float(progress_cell.rstrip("%"))
|
||||
except (ValueError, AttributeError):
|
||||
return 0.0
|
||||
return 0.0
|
||||
|
||||
table.sort(key=progress_key, reverse=self.progress_sort_reverse)
|
||||
|
||||
def action_show_all(self) -> None:
|
||||
"""Action handler to show all books."""
|
||||
self.show_all()
|
||||
|
||||
def action_show_unfinished(self) -> None:
|
||||
"""Action handler to show unfinished books."""
|
||||
self.show_unfinished()
|
||||
|
||||
|
||||
def authenticate():
|
||||
"""Authenticate with Audible and return auth and client objects."""
|
||||
auth_path = Path.home() / ".config" / "auditui" / "auth.json"
|
||||
auth_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
authenticator = None
|
||||
if auth_path.exists():
|
||||
try:
|
||||
authenticator = audible.Authenticator.from_file(str(auth_path))
|
||||
audible_client = audible.Client(auth=authenticator)
|
||||
return authenticator, audible_client
|
||||
except (OSError, ValueError, KeyError) as e:
|
||||
print(f"Failed to load existing auth: {e}")
|
||||
print("Please re-authenticate.")
|
||||
|
||||
print("Please authenticate with your Audible account.")
|
||||
print("You will need to provide:")
|
||||
print(" - Your Audible email/username")
|
||||
print(" - Your password")
|
||||
print(" - Your marketplace locale (e.g., 'US', 'UK', 'DE', 'FR')")
|
||||
|
||||
email = input("\nEmail: ")
|
||||
password = getpass("Password: ")
|
||||
marketplace = input(
|
||||
"Marketplace locale (default: US): ").strip().upper() or "US"
|
||||
|
||||
try:
|
||||
authenticator = audible.Authenticator.from_login(
|
||||
username=email,
|
||||
password=password,
|
||||
locale=marketplace
|
||||
)
|
||||
|
||||
auth_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
authenticator.to_file(str(auth_path))
|
||||
print("Authentication successful!")
|
||||
audible_client = audible.Client(auth=authenticator)
|
||||
return authenticator, audible_client
|
||||
except (OSError, ValueError, KeyError) as e:
|
||||
print(f"Authentication failed: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
auth, client = authenticate()
|
||||
app = AudituiApp()
|
||||
app.auth = auth
|
||||
app.client = client
|
||||
app.run()
|
||||
362
player.py
362
player.py
@@ -1,362 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Player playground for Audible TUI - download and play a specific track"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
from getpass import getpass
|
||||
from pathlib import Path
|
||||
|
||||
import audible
|
||||
import httpx
|
||||
from audible.activation_bytes import get_activation_bytes
|
||||
|
||||
|
||||
MIN_FILE_SIZE = 1024 * 1024
|
||||
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
|
||||
DEFAULT_CODEC = "LC_128_44100_stereo"
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
logging.getLogger("audible").setLevel(logging.WARNING)
|
||||
logging.getLogger("httpx").setLevel(logging.WARNING)
|
||||
|
||||
|
||||
def sanitize_filename(filename: str) -> str:
|
||||
"""Remove invalid characters from filename."""
|
||||
return re.sub(r'[<>:"/\\|?*]', "_", filename)
|
||||
|
||||
|
||||
class AudiblePlayer:
|
||||
"""Class to handle Audible authentication, downloading, and playback."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the player with authentication."""
|
||||
self.auth: audible.Authenticator | None = None
|
||||
self.client: audible.Client | None = None
|
||||
self.home = Path.home()
|
||||
self.cache_dir = self.home / ".cache" / "auditui" / "books"
|
||||
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def authenticate(self) -> None:
|
||||
"""Authenticate with Audible and store auth and client."""
|
||||
auth_path = self.home / ".config" / "auditui" / "auth.json"
|
||||
auth_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if auth_path.exists():
|
||||
try:
|
||||
self.auth = audible.Authenticator.from_file(str(auth_path))
|
||||
self.client = audible.Client(auth=self.auth)
|
||||
return
|
||||
except Exception:
|
||||
logger.info(
|
||||
"Failed to load existing auth. Re-authenticating.\n")
|
||||
|
||||
email = input("Email: ")
|
||||
password = getpass("Password: ")
|
||||
marketplace = (
|
||||
input("Marketplace locale (default: US): ").strip().upper() or "US"
|
||||
)
|
||||
|
||||
self.auth = audible.Authenticator.from_login(
|
||||
username=email, password=password, locale=marketplace
|
||||
)
|
||||
self.auth.to_file(str(auth_path))
|
||||
self.client = audible.Client(auth=self.auth)
|
||||
|
||||
def get_name_from_asin(self, asin: str) -> str | None:
|
||||
"""Get the title/name of a book from its ASIN."""
|
||||
try:
|
||||
product_info = self.client.get(
|
||||
path=f"1.0/catalog/products/{asin}",
|
||||
response_groups="product_desc,product_attrs",
|
||||
)
|
||||
product = product_info.get("product", {})
|
||||
return product.get("title") or "Unknown Title"
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting name for ASIN {asin}: {e}")
|
||||
return None
|
||||
|
||||
def get_download_link(self, asin: str, codec: str = DEFAULT_CODEC) -> str | None:
|
||||
"""Get download link for book using the example method."""
|
||||
if self.auth.adp_token is None:
|
||||
raise Exception("No adp token present. Can't get download link.")
|
||||
|
||||
try:
|
||||
params = {
|
||||
"type": "AUDI",
|
||||
"currentTransportMethod": "WIFI",
|
||||
"key": asin,
|
||||
"codec": codec,
|
||||
}
|
||||
r = httpx.get(
|
||||
url=DOWNLOAD_URL, params=params, follow_redirects=False, auth=self.auth
|
||||
)
|
||||
r.raise_for_status()
|
||||
|
||||
link = r.headers.get("Location")
|
||||
if not link:
|
||||
raise ValueError("No Location header in response")
|
||||
|
||||
tld = self.auth.locale.domain
|
||||
new_link = link.replace("cds.audible.com", f"cds.audible.{tld}")
|
||||
return new_link
|
||||
except httpx.HTTPError as e:
|
||||
logger.error(f"HTTP error getting download link: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting download link: {e}")
|
||||
return None
|
||||
|
||||
def download_file(self, url: str, dest_path: Path) -> Path | None:
|
||||
"""Download file from URL to destination."""
|
||||
try:
|
||||
with httpx.stream("GET", url) as r:
|
||||
r.raise_for_status()
|
||||
total_size = int(r.headers.get("content-length", 0))
|
||||
downloaded = 0
|
||||
|
||||
with open(dest_path, "wb") as f:
|
||||
for chunk in r.iter_bytes(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
if total_size > 0:
|
||||
percent = (downloaded / total_size) * 100
|
||||
print(
|
||||
f"\rDownloading: {percent:.1f}% ({downloaded}/{total_size} bytes)",
|
||||
end="",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
print()
|
||||
return dest_path
|
||||
except httpx.HTTPError as e:
|
||||
logger.error(f"\nHTTP error downloading file: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"\nError downloading file: {e}")
|
||||
return None
|
||||
|
||||
def get_or_download(self, asin: str) -> Path | None:
|
||||
"""Get local path of AAX file, downloading if missing."""
|
||||
title = self.get_name_from_asin(asin)
|
||||
if not title:
|
||||
title = asin
|
||||
|
||||
safe_title = sanitize_filename(title)
|
||||
local_path = self.cache_dir / f"{safe_title}.aax"
|
||||
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
logger.info(f"Using cached file: {local_path.name}")
|
||||
return local_path
|
||||
|
||||
logger.info(f"\nDownloading to {local_path.name}...")
|
||||
dl_link = self.get_download_link(asin)
|
||||
if not dl_link:
|
||||
return None
|
||||
|
||||
if not self.download_file(dl_link, local_path):
|
||||
return None
|
||||
|
||||
if not local_path.exists() or local_path.stat().st_size < MIN_FILE_SIZE:
|
||||
logger.error("Download failed or file too small.")
|
||||
return None
|
||||
|
||||
return local_path
|
||||
|
||||
def fetch_library(self) -> list[dict]:
|
||||
"""Fetch all library items from Audible API."""
|
||||
response_groups = (
|
||||
"contributors,media,product_attrs,product_desc,product_details,"
|
||||
"rating,is_finished,listening_status,percent_complete"
|
||||
)
|
||||
all_items = []
|
||||
page = 1
|
||||
page_size = 50
|
||||
|
||||
while True:
|
||||
library = self.client.get(
|
||||
path="library",
|
||||
num_results=page_size,
|
||||
page=page,
|
||||
response_groups=response_groups,
|
||||
)
|
||||
items = library.get("items", [])
|
||||
if not items:
|
||||
break
|
||||
all_items.extend(items)
|
||||
logger.debug(f"Fetched page {page} ({len(items)} items)...")
|
||||
if len(items) < page_size:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return all_items
|
||||
|
||||
def extract_asin(self, item: dict) -> str | None:
|
||||
"""Extract ASIN from library item."""
|
||||
product = item.get("product", {})
|
||||
return item.get("asin") or product.get("asin")
|
||||
|
||||
def extract_title(self, item: dict) -> str:
|
||||
"""Extract title from library item."""
|
||||
product = item.get("product", {})
|
||||
return (
|
||||
product.get("title")
|
||||
or item.get("title")
|
||||
or product.get("asin", "Unknown Title")
|
||||
)
|
||||
|
||||
def extract_authors(self, item: dict) -> str:
|
||||
"""Extract author names from library item."""
|
||||
product = item.get("product", {})
|
||||
authors = product.get("authors") or product.get("contributors") or []
|
||||
if not authors and "authors" in item:
|
||||
authors = item.get("authors", [])
|
||||
author_names = ", ".join(
|
||||
[a.get("name", "") for a in authors if isinstance(a, dict)]
|
||||
)
|
||||
return author_names or "Unknown"
|
||||
|
||||
def extract_progress(self, item: dict) -> float:
|
||||
"""Extract progress percentage from library item."""
|
||||
percent_complete = item.get("percent_complete")
|
||||
listening_status = item.get("listening_status", {})
|
||||
|
||||
if isinstance(listening_status, dict):
|
||||
if percent_complete is None:
|
||||
percent_complete = listening_status.get("percent_complete", 0)
|
||||
|
||||
return float(percent_complete) if percent_complete is not None else 0.0
|
||||
|
||||
def is_finished(self, item: dict) -> bool:
|
||||
"""Check if a library item is finished."""
|
||||
is_finished_flag = item.get("is_finished")
|
||||
percent_complete = item.get("percent_complete")
|
||||
listening_status = item.get("listening_status")
|
||||
|
||||
if isinstance(listening_status, dict):
|
||||
is_finished_flag = is_finished_flag or listening_status.get(
|
||||
"is_finished", False
|
||||
)
|
||||
if percent_complete is None:
|
||||
percent_complete = listening_status.get("percent_complete", 0)
|
||||
|
||||
return bool(is_finished_flag) or (
|
||||
isinstance(percent_complete, (int, float)
|
||||
) and percent_complete >= 100
|
||||
)
|
||||
|
||||
def list_unfinished_tracks(self) -> list[dict]:
|
||||
"""List all unfinished tracks with ASIN and name."""
|
||||
logger.info("Fetching library...")
|
||||
all_items = self.fetch_library()
|
||||
|
||||
unfinished = []
|
||||
for item in all_items:
|
||||
if not self.is_finished(item):
|
||||
unfinished.append(item)
|
||||
|
||||
return unfinished
|
||||
|
||||
def get_activation_bytes(self) -> str:
|
||||
"""Get activation bytes as hex string."""
|
||||
activation_bytes = get_activation_bytes(self.auth)
|
||||
if isinstance(activation_bytes, bytes):
|
||||
return activation_bytes.hex()
|
||||
return str(activation_bytes)
|
||||
|
||||
def play(self, path: Path, activation_hex: str | None = None) -> bool:
|
||||
"""Play a local file using ffplay."""
|
||||
if not shutil.which("ffplay"):
|
||||
logger.error(
|
||||
"ffplay not found. Please install ffmpeg (which includes ffplay)"
|
||||
)
|
||||
return False
|
||||
|
||||
cmd = ["ffplay", "-nodisp", "-autoexit"]
|
||||
if activation_hex:
|
||||
cmd.extend(["-activation_bytes", activation_hex])
|
||||
cmd.append(str(path))
|
||||
|
||||
try:
|
||||
logger.info(f"Playing: {path.name}")
|
||||
logger.info("Press Ctrl+C to stop playback\n")
|
||||
subprocess.run(
|
||||
cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
|
||||
)
|
||||
return True
|
||||
except KeyboardInterrupt:
|
||||
logger.info("\nPlayback stopped by user")
|
||||
return True
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error(f"Error playing file: {e}")
|
||||
return False
|
||||
except FileNotFoundError:
|
||||
logger.error("ffplay not found")
|
||||
return False
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Main entry point."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Download and play Audible audiobooks")
|
||||
parser.add_argument(
|
||||
"asin", nargs="?", default="", help="ASIN of the audiobook to play"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v", "--verbose", action="store_true", help="Enable verbose logging"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.verbose:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
logging.getLogger("audible").setLevel(logging.DEBUG)
|
||||
logging.getLogger("httpx").setLevel(logging.DEBUG)
|
||||
|
||||
player = AudiblePlayer()
|
||||
logger.info("Authenticating...")
|
||||
player.authenticate()
|
||||
|
||||
asin = args.asin
|
||||
|
||||
if not asin:
|
||||
unfinished = player.list_unfinished_tracks()
|
||||
tracks = []
|
||||
for item in unfinished:
|
||||
tracks.append(
|
||||
{
|
||||
"title": player.extract_title(item),
|
||||
"asin": player.extract_asin(item),
|
||||
"authors": player.extract_authors(item),
|
||||
"progress": player.extract_progress(item),
|
||||
}
|
||||
)
|
||||
print(json.dumps(tracks, indent=2))
|
||||
sys.exit(0)
|
||||
|
||||
logger.info(f"\nGetting download link for ASIN: {asin}")
|
||||
local_path = player.get_or_download(asin)
|
||||
|
||||
if not local_path:
|
||||
logger.error("Could not download file.")
|
||||
sys.exit(1)
|
||||
|
||||
logger.info("\nGetting activation bytes...")
|
||||
try:
|
||||
activation_hex = player.get_activation_bytes()
|
||||
except Exception as e:
|
||||
logger.error(f"Activation bytes error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
logger.info("Starting playback...\n")
|
||||
if not player.play(local_path, activation_hex):
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -4,4 +4,7 @@ version = "0.1.0"
|
||||
description = "An Audible TUI client"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = ["audible>=0.10.0", "textual>=6.7.1"]
|
||||
dependencies = ["audible>=0.10.0", "httpx>=0.28.1", "textual>=6.7.1"]
|
||||
|
||||
[project.scripts]
|
||||
auditui = "auditui.cli:main"
|
||||
|
||||
32
stats.py
32
stats.py
@@ -37,7 +37,8 @@ class AudibleStats:
|
||||
self.client = audible.Client(auth=self.auth)
|
||||
return
|
||||
except Exception:
|
||||
logger.info("Failed to load existing auth. Re-authenticating.\n")
|
||||
logger.info(
|
||||
"Failed to load existing auth. Re-authenticating.\n")
|
||||
|
||||
email = input("Email: ")
|
||||
password = getpass("Password: ")
|
||||
@@ -85,7 +86,8 @@ class AudibleStats:
|
||||
monthly_listening_interval_start_date=f"{middle}-01",
|
||||
store="Audible",
|
||||
)
|
||||
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
|
||||
monthly_stats = stats.get(
|
||||
"aggregated_monthly_listening_stats", [])
|
||||
has_activity = bool(
|
||||
monthly_stats
|
||||
and any(stat.get("aggregated_sum", 0) > 0 for stat in monthly_stats)
|
||||
@@ -101,12 +103,38 @@ class AudibleStats:
|
||||
|
||||
return earliest_year
|
||||
|
||||
def get_current_month_listening_time(self) -> tuple[int, int, int]:
|
||||
"""Get total listening time for the current month as (hours, minutes, seconds)."""
|
||||
try:
|
||||
stats = self.client.get(
|
||||
"1.0/stats/aggregates",
|
||||
monthly_listening_interval_duration="1",
|
||||
monthly_listening_interval_start_date=date.today().strftime("%Y-%m"),
|
||||
store="Audible",
|
||||
)
|
||||
monthly_stats = stats.get("aggregated_monthly_listening_stats", [])
|
||||
if not monthly_stats:
|
||||
return (0, 0, 0)
|
||||
|
||||
total_milliseconds = sum(
|
||||
stat.get("aggregated_sum", 0) for stat in monthly_stats
|
||||
)
|
||||
total_seconds = int(total_milliseconds // 1000)
|
||||
hours, remainder = divmod(total_seconds, 3600)
|
||||
minutes, seconds = divmod(remainder, 60)
|
||||
return (hours, minutes, seconds)
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not get current month listening time: {e}")
|
||||
return (0, 0, 0)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Main entry point."""
|
||||
worker = AudibleStats()
|
||||
worker.authenticate()
|
||||
print(worker.get_signup_year())
|
||||
hours, minutes, seconds = worker.get_current_month_listening_time()
|
||||
print(f"Total listening time this month: {hours}h {minutes}m {seconds}s")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
2
uv.lock
generated
2
uv.lock
generated
@@ -37,12 +37,14 @@ version = "0.1.0"
|
||||
source = { virtual = "." }
|
||||
dependencies = [
|
||||
{ name = "audible" },
|
||||
{ name = "httpx" },
|
||||
{ name = "textual" },
|
||||
]
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "audible", specifier = ">=0.10.0" },
|
||||
{ name = "httpx", specifier = ">=0.28.1" },
|
||||
{ name = "textual", specifier = ">=6.7.1" },
|
||||
]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user