Compare commits
2 Commits
1cac45e6cf
...
37ac47698c
| Author | SHA1 | Date | |
|---|---|---|---|
| 37ac47698c | |||
| d6e2284db1 |
11
README.md
11
README.md
@@ -6,18 +6,15 @@ Listen to your audiobooks or podcasts, browse your library, and more.
|
|||||||
|
|
||||||
## What it does and where are we
|
## What it does and where are we
|
||||||
|
|
||||||
`main.py` offers a TUI interface for browsing your Audible library, listing your books with progress information. You can sort by progress or title, show all books, or show only unfinished books which is the default.
|
`main.py` offers a TUI interface for browsing your Audible library, listing your books with progress information. You can sort by progress or title, show all books, or show only unfinished books which is the default.
|
||||||
|
|
||||||
Now, I'm working on the "play" feature, which should allow you to play/pause/unpause a book from the terminal by pressing `Space` on a book in the list.
|
You can also play a book by pressing `Enter` on a book in the list. You can pause/resume the playback by pressing `Space`.
|
||||||
|
|
||||||
Then, the next thing to add is a progress bar at the bottom of the interface, to show the progress of the book while it's playing.
|
|
||||||
|
|
||||||
Look at the [roadmap](#roadmap) for more details.
|
Look at the [roadmap](#roadmap) for more details.
|
||||||
|
|
||||||
It's still a work in progress, so :
|
It's still a work in progress, so :
|
||||||
|
|
||||||
- currently, most code resides in `main.py`, except for some experimental files that aren't part of the final structure:
|
- currently, most code resides in `main.py`, except for some experimental files that aren't part of the final structure:
|
||||||
- `player.py` is the test playground for the download and play functionality
|
|
||||||
- `stats.py` is the test playground for the stats functionality
|
- `stats.py` is the test playground for the stats functionality
|
||||||
- expect bugs and missing features
|
- expect bugs and missing features
|
||||||
- the code is not yet organized as I'm currently experimenting
|
- the code is not yet organized as I'm currently experimenting
|
||||||
@@ -31,7 +28,7 @@ This project uses [uv](https://github.com/astral-sh/uv) for dependency managemen
|
|||||||
$ uv sync
|
$ uv sync
|
||||||
|
|
||||||
# run the TUI
|
# run the TUI
|
||||||
$ uv run main.py # or player.py or stats.py
|
$ uv run main.py
|
||||||
```
|
```
|
||||||
|
|
||||||
Please also note that as of now, you need to have [ffmpeg](https://ffmpeg.org/) installed to play audio files.
|
Please also note that as of now, you need to have [ffmpeg](https://ffmpeg.org/) installed to play audio files.
|
||||||
@@ -40,7 +37,7 @@ Please also note that as of now, you need to have [ffmpeg](https://ffmpeg.org/)
|
|||||||
|
|
||||||
- [x] list your library
|
- [x] list your library
|
||||||
- [x] list your unfinished books with progress information
|
- [x] list your unfinished books with progress information
|
||||||
- [ ] play/pause a book
|
- [x] play/pause a book
|
||||||
- [ ] resume playback of a book from the last position, regardless of which device was used previously
|
- [ ] resume playback of a book from the last position, regardless of which device was used previously
|
||||||
- [ ] save the current playback position when pausing or exiting the app
|
- [ ] save the current playback position when pausing or exiting the app
|
||||||
- [ ] print progress at the bottom of the app while a book is playing
|
- [ ] print progress at the bottom of the app while a book is playing
|
||||||
|
|||||||
552
main.py
552
main.py
@@ -1,19 +1,28 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""A terminal-based user interface (TUI) client for Audible"""
|
"""A terminal-based user interface (TUI) client for Audible"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import shutil
|
||||||
|
import signal
|
||||||
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from getpass import getpass
|
from getpass import getpass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import audible
|
import audible
|
||||||
|
import httpx
|
||||||
|
from audible.activation_bytes import get_activation_bytes
|
||||||
from textual.app import App, ComposeResult
|
from textual.app import App, ComposeResult
|
||||||
from textual.widgets import Footer, Header, DataTable, Static
|
from textual.events import Key
|
||||||
|
from textual.widgets import DataTable, Footer, Header, Static
|
||||||
from textual.worker import get_current_worker
|
from textual.worker import get_current_worker
|
||||||
from textual import work
|
from textual import work
|
||||||
|
|
||||||
|
|
||||||
class AudituiApp(App):
|
class AudituiApp(App):
|
||||||
"""Main application class for the Audible TUI app."""
|
"""Main application class for the Audible TUI app."""
|
||||||
|
|
||||||
BINDINGS = [
|
BINDINGS = [
|
||||||
("d", "toggle_dark", "Toggle dark mode"),
|
("d", "toggle_dark", "Toggle dark mode"),
|
||||||
("s", "sort", "Sort by title"),
|
("s", "sort", "Sort by title"),
|
||||||
@@ -21,6 +30,8 @@ class AudituiApp(App):
|
|||||||
("p", "sort_by_progress", "Sort by progress"),
|
("p", "sort_by_progress", "Sort by progress"),
|
||||||
("a", "show_all", "Show all books"),
|
("a", "show_all", "Show all books"),
|
||||||
("u", "show_unfinished", "Show unfinished"),
|
("u", "show_unfinished", "Show unfinished"),
|
||||||
|
("enter", "play_selected", "Play selected book"),
|
||||||
|
("space", "toggle_playback", "Pause/Resume"),
|
||||||
("q", "quit", "Quit application"),
|
("q", "quit", "Quit application"),
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -36,6 +47,10 @@ class AudituiApp(App):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
|
AUTH_PATH = Path.home() / ".config" / "auditui" / "auth.json"
|
||||||
|
CACHE_DIR = Path.home() / ".cache" / "auditui" / "books"
|
||||||
|
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
|
||||||
|
DEFAULT_CODEC = "LC_128_44100_stereo"
|
||||||
|
MIN_FILE_SIZE = 1024 * 1024
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@@ -49,6 +64,14 @@ class AudituiApp(App):
|
|||||||
self.progress_column_key = None
|
self.progress_column_key = None
|
||||||
self.progress_column_index = 3
|
self.progress_column_index = 3
|
||||||
|
|
||||||
|
self.playback_process = None
|
||||||
|
self.is_playing = False
|
||||||
|
self.is_paused = False
|
||||||
|
self.current_file_path = None
|
||||||
|
self.current_asin = None
|
||||||
|
|
||||||
|
self.CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
def compose(self) -> ComposeResult:
|
def compose(self) -> ComposeResult:
|
||||||
yield Header()
|
yield Header()
|
||||||
yield Static("Loading...", id="status")
|
yield Static("Loading...", id="status")
|
||||||
@@ -65,12 +88,33 @@ class AudituiApp(App):
|
|||||||
column_keys = list(table.columns.keys())
|
column_keys = list(table.columns.keys())
|
||||||
self.title_column_key = column_keys[0]
|
self.title_column_key = column_keys[0]
|
||||||
self.progress_column_key = column_keys[3]
|
self.progress_column_key = column_keys[3]
|
||||||
|
|
||||||
if self.client:
|
if self.client:
|
||||||
self.update_status("Fetching library...")
|
self.update_status("Fetching library...")
|
||||||
self.fetch_library()
|
self.fetch_library()
|
||||||
else:
|
else:
|
||||||
self.update_status(
|
self.update_status("Not authenticated. Please restart and authenticate.")
|
||||||
"Not authenticated. Please restart and authenticate.")
|
|
||||||
|
self.set_interval(1.0, self._check_playback_status)
|
||||||
|
|
||||||
|
def on_unmount(self) -> None:
|
||||||
|
"""Clean up on app exit."""
|
||||||
|
self._stop_playback()
|
||||||
|
|
||||||
|
def on_key(self, event: Key) -> None:
|
||||||
|
"""Handle key presses on DataTable."""
|
||||||
|
if isinstance(self.focused, DataTable):
|
||||||
|
if event.key == "enter":
|
||||||
|
event.prevent_default()
|
||||||
|
self.action_play_selected()
|
||||||
|
elif event.key == "space":
|
||||||
|
event.prevent_default()
|
||||||
|
self.action_toggle_playback()
|
||||||
|
|
||||||
|
def update_status(self, message: str) -> None:
|
||||||
|
"""Update the status message in the UI."""
|
||||||
|
status = self.query_one("#status", Static)
|
||||||
|
status.update(message)
|
||||||
|
|
||||||
@work(exclusive=True, thread=True)
|
@work(exclusive=True, thread=True)
|
||||||
def fetch_library(self) -> None:
|
def fetch_library(self) -> None:
|
||||||
@@ -85,12 +129,11 @@ class AudituiApp(App):
|
|||||||
"rating,is_finished,listening_status,percent_complete"
|
"rating,is_finished,listening_status,percent_complete"
|
||||||
)
|
)
|
||||||
all_items = self._fetch_all_pages(response_groups)
|
all_items = self._fetch_all_pages(response_groups)
|
||||||
self.all_items = all_items
|
|
||||||
self.call_from_thread(self.on_library_loaded, all_items)
|
self.call_from_thread(self.on_library_loaded, all_items)
|
||||||
except (OSError, ValueError, KeyError) as e:
|
except (OSError, ValueError, KeyError) as e:
|
||||||
self.call_from_thread(self.on_library_error, str(e))
|
self.call_from_thread(self.on_library_error, str(e))
|
||||||
|
|
||||||
def _fetch_all_pages(self, response_groups):
|
def _fetch_all_pages(self, response_groups: str) -> list:
|
||||||
"""Fetch all pages of library items from the API."""
|
"""Fetch all pages of library items from the API."""
|
||||||
all_items = []
|
all_items = []
|
||||||
page = 1
|
page = 1
|
||||||
@@ -105,7 +148,6 @@ class AudituiApp(App):
|
|||||||
)
|
)
|
||||||
|
|
||||||
items = library.get("items", [])
|
items = library.get("items", [])
|
||||||
|
|
||||||
if not items:
|
if not items:
|
||||||
break
|
break
|
||||||
|
|
||||||
@@ -122,7 +164,7 @@ class AudituiApp(App):
|
|||||||
|
|
||||||
return all_items
|
return all_items
|
||||||
|
|
||||||
def on_library_loaded(self, items) -> None:
|
def on_library_loaded(self, items: list) -> None:
|
||||||
"""Handle successful library load."""
|
"""Handle successful library load."""
|
||||||
self.all_items = items
|
self.all_items = items
|
||||||
self.update_status(f"Loaded {len(items)} books")
|
self.update_status(f"Loaded {len(items)} books")
|
||||||
@@ -132,12 +174,85 @@ class AudituiApp(App):
|
|||||||
"""Handle library fetch error."""
|
"""Handle library fetch error."""
|
||||||
self.update_status(f"Error fetching library: {error}")
|
self.update_status(f"Error fetching library: {error}")
|
||||||
|
|
||||||
def update_status(self, message: str) -> None:
|
def _extract_title(self, item: dict) -> str:
|
||||||
"""Update the status message in the UI."""
|
"""Extract title from library item."""
|
||||||
status = self.query_one("#status", Static)
|
product = item.get("product", {})
|
||||||
status.update(message)
|
return (
|
||||||
|
product.get("title") or
|
||||||
|
item.get("title") or
|
||||||
|
product.get("asin", "Unknown Title")
|
||||||
|
)
|
||||||
|
|
||||||
def format_duration(self, value, unit='minutes', default_none=None):
|
def _extract_authors(self, item: dict) -> str:
|
||||||
|
"""Extract author names from library item."""
|
||||||
|
product = item.get("product", {})
|
||||||
|
authors = product.get("authors") or product.get("contributors") or []
|
||||||
|
if not authors and "authors" in item:
|
||||||
|
authors = item.get("authors", [])
|
||||||
|
|
||||||
|
author_names = [
|
||||||
|
a.get("name", "") for a in authors if isinstance(a, dict)
|
||||||
|
]
|
||||||
|
return ", ".join(author_names) or "Unknown"
|
||||||
|
|
||||||
|
def _extract_runtime_minutes(self, item: dict) -> int | None:
|
||||||
|
"""Extract runtime in minutes from library item."""
|
||||||
|
product = item.get("product", {})
|
||||||
|
runtime_fields = [
|
||||||
|
"runtime_length_min",
|
||||||
|
"runtime_length",
|
||||||
|
"vLength",
|
||||||
|
"length",
|
||||||
|
"duration"
|
||||||
|
]
|
||||||
|
|
||||||
|
runtime = None
|
||||||
|
for field in runtime_fields:
|
||||||
|
runtime = product.get(field) or item.get(field)
|
||||||
|
if runtime is not None:
|
||||||
|
break
|
||||||
|
|
||||||
|
if runtime is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if isinstance(runtime, dict):
|
||||||
|
return int(runtime.get("min", 0))
|
||||||
|
elif isinstance(runtime, (int, float)):
|
||||||
|
return int(runtime)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _extract_progress_info(self, item: dict) -> float | None:
|
||||||
|
"""Extract progress percentage from library item."""
|
||||||
|
percent_complete = item.get("percent_complete")
|
||||||
|
listening_status = item.get("listening_status", {})
|
||||||
|
|
||||||
|
if isinstance(listening_status, dict) and percent_complete is None:
|
||||||
|
percent_complete = listening_status.get("percent_complete")
|
||||||
|
|
||||||
|
return float(percent_complete) if percent_complete is not None else None
|
||||||
|
|
||||||
|
def _extract_asin(self, item: dict) -> str | None:
|
||||||
|
"""Extract ASIN from library item."""
|
||||||
|
product = item.get("product", {})
|
||||||
|
return item.get("asin") or product.get("asin")
|
||||||
|
|
||||||
|
def _is_finished(self, item: dict) -> bool:
|
||||||
|
"""Check if a library item is finished."""
|
||||||
|
is_finished_flag = item.get("is_finished")
|
||||||
|
percent_complete = item.get("percent_complete")
|
||||||
|
listening_status = item.get("listening_status")
|
||||||
|
|
||||||
|
if isinstance(listening_status, dict):
|
||||||
|
is_finished_flag = is_finished_flag or listening_status.get("is_finished", False)
|
||||||
|
if percent_complete is None:
|
||||||
|
percent_complete = listening_status.get("percent_complete", 0)
|
||||||
|
|
||||||
|
return bool(is_finished_flag) or (
|
||||||
|
isinstance(percent_complete, (int, float)) and percent_complete >= 100
|
||||||
|
)
|
||||||
|
|
||||||
|
def format_duration(self, value: int | None, unit: str = 'minutes', default_none: str | None = None) -> str | None:
|
||||||
"""Format duration value into human-readable string."""
|
"""Format duration value into human-readable string."""
|
||||||
if value is None or value <= 0:
|
if value is None or value <= 0:
|
||||||
return default_none
|
return default_none
|
||||||
@@ -156,65 +271,7 @@ class AudituiApp(App):
|
|||||||
return f"{hours} hour{'s' if hours != 1 else ''}"
|
return f"{hours} hour{'s' if hours != 1 else ''}"
|
||||||
return f"{hours} hour{'s' if hours != 1 else ''} {mins} minute{'s' if mins != 1 else ''}"
|
return f"{hours} hour{'s' if hours != 1 else ''} {mins} minute{'s' if mins != 1 else ''}"
|
||||||
|
|
||||||
def _extract_title(self, item):
|
def _populate_table(self, items: list) -> None:
|
||||||
"""Extract title from library item."""
|
|
||||||
product = item.get("product", {})
|
|
||||||
return (product.get("title") or
|
|
||||||
item.get("title") or
|
|
||||||
product.get("asin", "Unknown Title"))
|
|
||||||
|
|
||||||
def _extract_authors(self, item):
|
|
||||||
"""Extract author names from library item."""
|
|
||||||
product = item.get("product", {})
|
|
||||||
authors = product.get("authors") or product.get("contributors") or []
|
|
||||||
if not authors and "authors" in item:
|
|
||||||
authors = item.get("authors", [])
|
|
||||||
return ", ".join([a.get("name", "") for a in authors if isinstance(a, dict)])
|
|
||||||
|
|
||||||
def _extract_runtime_minutes(self, item):
|
|
||||||
"""Extract runtime in minutes from library item."""
|
|
||||||
product = item.get("product", {})
|
|
||||||
runtime_fields = [
|
|
||||||
"runtime_length_min",
|
|
||||||
"runtime_length",
|
|
||||||
"vLength",
|
|
||||||
"length",
|
|
||||||
"duration"
|
|
||||||
]
|
|
||||||
|
|
||||||
runtime = None
|
|
||||||
for field in runtime_fields:
|
|
||||||
runtime = product.get(field)
|
|
||||||
if runtime is None:
|
|
||||||
runtime = item.get(field)
|
|
||||||
if runtime is not None:
|
|
||||||
break
|
|
||||||
|
|
||||||
if runtime is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
if isinstance(runtime, dict):
|
|
||||||
if "min" in runtime:
|
|
||||||
return int(runtime.get("min", 0))
|
|
||||||
elif isinstance(runtime, (int, float)):
|
|
||||||
return int(runtime)
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _extract_progress_info(self, item):
|
|
||||||
"""Extract progress percentage from library item."""
|
|
||||||
percent_complete = item.get("percent_complete")
|
|
||||||
listening_status = item.get("listening_status", {})
|
|
||||||
|
|
||||||
if isinstance(listening_status, dict):
|
|
||||||
if percent_complete is None:
|
|
||||||
percent_complete = listening_status.get("percent_complete")
|
|
||||||
else:
|
|
||||||
percent_complete = None
|
|
||||||
|
|
||||||
return percent_complete
|
|
||||||
|
|
||||||
def _populate_table(self, items):
|
|
||||||
"""Populate the DataTable with library items."""
|
"""Populate the DataTable with library items."""
|
||||||
table = self.query_one(DataTable)
|
table = self.query_one(DataTable)
|
||||||
table.clear()
|
table.clear()
|
||||||
@@ -228,7 +285,8 @@ class AudituiApp(App):
|
|||||||
author_names = self._extract_authors(item)
|
author_names = self._extract_authors(item)
|
||||||
minutes = self._extract_runtime_minutes(item)
|
minutes = self._extract_runtime_minutes(item)
|
||||||
runtime_str = self.format_duration(
|
runtime_str = self.format_duration(
|
||||||
minutes, unit='minutes', default_none="Unknown length")
|
minutes, unit='minutes', default_none="Unknown length"
|
||||||
|
)
|
||||||
percent_complete = self._extract_progress_info(item)
|
percent_complete = self._extract_progress_info(item)
|
||||||
|
|
||||||
progress_str = "0%"
|
progress_str = "0%"
|
||||||
@@ -258,30 +316,11 @@ class AudituiApp(App):
|
|||||||
"""Display only unfinished books in the table."""
|
"""Display only unfinished books in the table."""
|
||||||
if not self.all_items:
|
if not self.all_items:
|
||||||
return
|
return
|
||||||
|
|
||||||
self.show_all_mode = False
|
self.show_all_mode = False
|
||||||
|
unfinished_items = [
|
||||||
unfinished_items = []
|
item for item in self.all_items if not self._is_finished(item)
|
||||||
for item in self.all_items:
|
]
|
||||||
is_finished_flag = item.get("is_finished")
|
|
||||||
percent_complete = item.get("percent_complete")
|
|
||||||
listening_status = item.get("listening_status")
|
|
||||||
|
|
||||||
if isinstance(listening_status, dict):
|
|
||||||
is_finished_flag = is_finished_flag or listening_status.get(
|
|
||||||
"is_finished", False)
|
|
||||||
if percent_complete is None:
|
|
||||||
percent_complete = listening_status.get(
|
|
||||||
"percent_complete", 0)
|
|
||||||
|
|
||||||
is_finished = False
|
|
||||||
if is_finished_flag is True:
|
|
||||||
is_finished = True
|
|
||||||
elif isinstance(percent_complete, (int, float)) and percent_complete >= 100:
|
|
||||||
is_finished = True
|
|
||||||
|
|
||||||
if not is_finished:
|
|
||||||
unfinished_items.append(item)
|
|
||||||
|
|
||||||
self._populate_table(unfinished_items)
|
self._populate_table(unfinished_items)
|
||||||
|
|
||||||
def action_toggle_dark(self) -> None:
|
def action_toggle_dark(self) -> None:
|
||||||
@@ -327,13 +366,330 @@ class AudituiApp(App):
|
|||||||
"""Action handler to show unfinished books."""
|
"""Action handler to show unfinished books."""
|
||||||
self.show_unfinished()
|
self.show_unfinished()
|
||||||
|
|
||||||
|
def action_play_selected(self) -> None:
|
||||||
|
"""Start playing the selected book."""
|
||||||
|
table = self.query_one(DataTable)
|
||||||
|
if table.row_count == 0:
|
||||||
|
self.update_status("No books available")
|
||||||
|
return
|
||||||
|
|
||||||
|
cursor_row = table.cursor_row
|
||||||
|
if cursor_row >= len(self.current_items):
|
||||||
|
self.update_status("Invalid selection")
|
||||||
|
return
|
||||||
|
|
||||||
|
selected_item = self.current_items[cursor_row]
|
||||||
|
asin = self._extract_asin(selected_item)
|
||||||
|
|
||||||
|
if not asin:
|
||||||
|
self.update_status("Could not get ASIN for selected book")
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.is_playing:
|
||||||
|
self._stop_playback()
|
||||||
|
|
||||||
|
self.current_asin = asin
|
||||||
|
self._start_playback_async(asin)
|
||||||
|
|
||||||
|
def action_toggle_playback(self) -> None:
|
||||||
|
"""Toggle pause/resume state."""
|
||||||
|
if not self.is_playing:
|
||||||
|
self.update_status("No playback active. Press Enter to play a book.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._is_process_alive():
|
||||||
|
self._stop_playback()
|
||||||
|
self.update_status("Playback has ended")
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.is_paused:
|
||||||
|
self._resume_playback()
|
||||||
|
else:
|
||||||
|
self._pause_playback()
|
||||||
|
|
||||||
|
def _get_playback_status_message(self, prefix: str) -> str:
|
||||||
|
"""Generate status message with filename if available."""
|
||||||
|
filename = self.current_file_path.name if self.current_file_path else ""
|
||||||
|
return f"{prefix}: {filename}" if filename else prefix
|
||||||
|
|
||||||
|
def _check_playback_status(self) -> None:
|
||||||
|
"""Check if playback process has finished and update state accordingly."""
|
||||||
|
if self.playback_process is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
return_code = self.playback_process.poll()
|
||||||
|
if return_code is not None:
|
||||||
|
finished_file = self.current_file_path
|
||||||
|
self.playback_process = None
|
||||||
|
self.is_playing = False
|
||||||
|
self.is_paused = False
|
||||||
|
|
||||||
|
if finished_file:
|
||||||
|
if return_code == 0:
|
||||||
|
self.update_status(f"Finished: {finished_file.name}")
|
||||||
|
else:
|
||||||
|
self.update_status(
|
||||||
|
f"Playback ended unexpectedly (code: {return_code}): {finished_file.name}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.update_status("Playback finished")
|
||||||
|
|
||||||
|
self.current_file_path = None
|
||||||
|
self.current_asin = None
|
||||||
|
|
||||||
|
def _start_playback(self, path: Path, activation_hex: str | None = None) -> bool:
|
||||||
|
"""Start playing a local file using ffplay."""
|
||||||
|
if not shutil.which("ffplay"):
|
||||||
|
self.update_status("ffplay not found. Please install ffmpeg")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if self.playback_process is not None:
|
||||||
|
self._stop_playback()
|
||||||
|
|
||||||
|
cmd = ["ffplay", "-nodisp", "-autoexit"]
|
||||||
|
if activation_hex:
|
||||||
|
cmd.extend(["-activation_bytes", activation_hex])
|
||||||
|
cmd.append(str(path))
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.playback_process = subprocess.Popen(
|
||||||
|
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.playback_process.poll() is not None:
|
||||||
|
return_code = self.playback_process.returncode
|
||||||
|
self.update_status(
|
||||||
|
f"Playback process exited immediately (code: {return_code})"
|
||||||
|
)
|
||||||
|
self.playback_process = None
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.is_playing = True
|
||||||
|
self.is_paused = False
|
||||||
|
self.current_file_path = path
|
||||||
|
self.update_status(f"Playing: {path.name}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.update_status(f"Error starting playback: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _stop_playback(self) -> None:
|
||||||
|
"""Stop the current playback."""
|
||||||
|
if self.playback_process is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
if self.playback_process.poll() is None:
|
||||||
|
self.playback_process.terminate()
|
||||||
|
try:
|
||||||
|
self.playback_process.wait(timeout=2)
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
self.playback_process.kill()
|
||||||
|
self.playback_process.wait()
|
||||||
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self.playback_process = None
|
||||||
|
self.is_playing = False
|
||||||
|
self.is_paused = False
|
||||||
|
self.current_file_path = None
|
||||||
|
|
||||||
|
def _is_process_alive(self) -> bool:
|
||||||
|
"""Check if playback process is still running."""
|
||||||
|
if self.playback_process is None:
|
||||||
|
return False
|
||||||
|
return self.playback_process.poll() is None
|
||||||
|
|
||||||
|
def _pause_playback(self) -> None:
|
||||||
|
"""Pause the current playback."""
|
||||||
|
if not (self.playback_process and self.is_playing and not self.is_paused):
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._is_process_alive():
|
||||||
|
self._stop_playback()
|
||||||
|
self.update_status("Playback process has ended")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.kill(self.playback_process.pid, signal.SIGSTOP)
|
||||||
|
self.is_paused = True
|
||||||
|
self.update_status(self._get_playback_status_message("Paused"))
|
||||||
|
except ProcessLookupError:
|
||||||
|
self._stop_playback()
|
||||||
|
self.update_status("Process no longer exists")
|
||||||
|
except PermissionError:
|
||||||
|
self.update_status("Permission denied: cannot pause playback")
|
||||||
|
except Exception as e:
|
||||||
|
self.update_status(f"Error pausing playback: {e}")
|
||||||
|
|
||||||
|
def _resume_playback(self) -> None:
|
||||||
|
"""Resume the current playback."""
|
||||||
|
if not (self.playback_process and self.is_playing and self.is_paused):
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._is_process_alive():
|
||||||
|
self._stop_playback()
|
||||||
|
self.update_status("Playback process has ended")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.kill(self.playback_process.pid, signal.SIGCONT)
|
||||||
|
self.is_paused = False
|
||||||
|
self.update_status(self._get_playback_status_message("Playing"))
|
||||||
|
except ProcessLookupError:
|
||||||
|
self._stop_playback()
|
||||||
|
self.update_status("Process no longer exists")
|
||||||
|
except PermissionError:
|
||||||
|
self.update_status("Permission denied: cannot resume playback")
|
||||||
|
except Exception as e:
|
||||||
|
self.update_status(f"Error resuming playback: {e}")
|
||||||
|
|
||||||
|
@work(exclusive=True, thread=True)
|
||||||
|
def _start_playback_async(self, asin: str) -> None:
|
||||||
|
"""Start playback asynchronously."""
|
||||||
|
self.call_from_thread(self.update_status, "Preparing playback...")
|
||||||
|
|
||||||
|
local_path = self._get_or_download(asin)
|
||||||
|
if not local_path:
|
||||||
|
self.call_from_thread(self.update_status, "Could not download file")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.call_from_thread(self.update_status, "Getting activation bytes...")
|
||||||
|
activation_hex = self._get_activation_bytes()
|
||||||
|
if not activation_hex:
|
||||||
|
self.call_from_thread(self.update_status, "Failed to get activation bytes")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.call_from_thread(
|
||||||
|
self.update_status,
|
||||||
|
f"Starting playback of {local_path.name}..."
|
||||||
|
)
|
||||||
|
self.call_from_thread(self._start_playback, local_path, activation_hex)
|
||||||
|
|
||||||
|
def _sanitize_filename(self, filename: str) -> str:
|
||||||
|
"""Remove invalid characters from filename."""
|
||||||
|
return re.sub(r'[<>:"/\\|?*]', "_", filename)
|
||||||
|
|
||||||
|
def _get_name_from_asin(self, asin: str) -> str | None:
|
||||||
|
"""Get the title/name of a book from its ASIN."""
|
||||||
|
try:
|
||||||
|
product_info = self.client.get(
|
||||||
|
path=f"1.0/catalog/products/{asin}",
|
||||||
|
response_groups="product_desc,product_attrs",
|
||||||
|
)
|
||||||
|
product = product_info.get("product", {})
|
||||||
|
return product.get("title") or "Unknown Title"
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_download_link(self, asin: str, codec: str = DEFAULT_CODEC) -> str | None:
|
||||||
|
"""Get download link for book."""
|
||||||
|
if self.auth.adp_token is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
params = {
|
||||||
|
"type": "AUDI",
|
||||||
|
"currentTransportMethod": "WIFI",
|
||||||
|
"key": asin,
|
||||||
|
"codec": codec,
|
||||||
|
}
|
||||||
|
response = httpx.get(
|
||||||
|
url=self.DOWNLOAD_URL,
|
||||||
|
params=params,
|
||||||
|
follow_redirects=False,
|
||||||
|
auth=self.auth
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
link = response.headers.get("Location")
|
||||||
|
if not link:
|
||||||
|
return None
|
||||||
|
|
||||||
|
tld = self.auth.locale.domain
|
||||||
|
return link.replace("cds.audible.com", f"cds.audible.{tld}")
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _download_file(self, url: str, dest_path: Path) -> Path | None:
|
||||||
|
"""Download file from URL to destination."""
|
||||||
|
try:
|
||||||
|
with httpx.stream("GET", url) as response:
|
||||||
|
response.raise_for_status()
|
||||||
|
total_size = int(response.headers.get("content-length", 0))
|
||||||
|
downloaded = 0
|
||||||
|
|
||||||
|
with open(dest_path, "wb") as f:
|
||||||
|
for chunk in response.iter_bytes(chunk_size=8192):
|
||||||
|
f.write(chunk)
|
||||||
|
downloaded += len(chunk)
|
||||||
|
if total_size > 0:
|
||||||
|
percent = (downloaded / total_size) * 100
|
||||||
|
self.call_from_thread(
|
||||||
|
self.update_status,
|
||||||
|
f"Downloading: {percent:.1f}% ({downloaded}/{total_size} bytes)"
|
||||||
|
)
|
||||||
|
|
||||||
|
return dest_path
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_or_download(self, asin: str) -> Path | None:
|
||||||
|
"""Get local path of AAX file, downloading if missing."""
|
||||||
|
title = self._get_name_from_asin(asin) or asin
|
||||||
|
safe_title = self._sanitize_filename(title)
|
||||||
|
local_path = self.CACHE_DIR / f"{safe_title}.aax"
|
||||||
|
|
||||||
|
if local_path.exists() and local_path.stat().st_size >= self.MIN_FILE_SIZE:
|
||||||
|
self.call_from_thread(
|
||||||
|
self.update_status,
|
||||||
|
f"Using cached file: {local_path.name}"
|
||||||
|
)
|
||||||
|
return local_path
|
||||||
|
|
||||||
|
self.call_from_thread(
|
||||||
|
self.update_status,
|
||||||
|
f"Downloading to {local_path.name}..."
|
||||||
|
)
|
||||||
|
|
||||||
|
dl_link = self._get_download_link(asin)
|
||||||
|
if not dl_link:
|
||||||
|
self.call_from_thread(self.update_status, "Failed to get download link")
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not self._download_file(dl_link, local_path):
|
||||||
|
self.call_from_thread(self.update_status, "Download failed")
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not local_path.exists() or local_path.stat().st_size < self.MIN_FILE_SIZE:
|
||||||
|
self.call_from_thread(
|
||||||
|
self.update_status,
|
||||||
|
"Download failed or file too small"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
return local_path
|
||||||
|
|
||||||
|
def _get_activation_bytes(self) -> str | None:
|
||||||
|
"""Get activation bytes as hex string."""
|
||||||
|
try:
|
||||||
|
activation_bytes = get_activation_bytes(self.auth)
|
||||||
|
if isinstance(activation_bytes, bytes):
|
||||||
|
return activation_bytes.hex()
|
||||||
|
return str(activation_bytes)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def authenticate():
|
def authenticate():
|
||||||
"""Authenticate with Audible and return auth and client objects."""
|
"""Authenticate with Audible and return auth and client objects."""
|
||||||
auth_path = Path.home() / ".config" / "auditui" / "auth.json"
|
auth_path = Path.home() / ".config" / "auditui" / "auth.json"
|
||||||
auth_path.parent.mkdir(parents=True, exist_ok=True)
|
auth_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
authenticator = None
|
|
||||||
if auth_path.exists():
|
if auth_path.exists():
|
||||||
try:
|
try:
|
||||||
authenticator = audible.Authenticator.from_file(str(auth_path))
|
authenticator = audible.Authenticator.from_file(str(auth_path))
|
||||||
@@ -352,7 +708,8 @@ def authenticate():
|
|||||||
email = input("\nEmail: ")
|
email = input("\nEmail: ")
|
||||||
password = getpass("Password: ")
|
password = getpass("Password: ")
|
||||||
marketplace = input(
|
marketplace = input(
|
||||||
"Marketplace locale (default: US): ").strip().upper() or "US"
|
"Marketplace locale (default: US): "
|
||||||
|
).strip().upper() or "US"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
authenticator = audible.Authenticator.from_login(
|
authenticator = audible.Authenticator.from_login(
|
||||||
@@ -366,6 +723,7 @@ def authenticate():
|
|||||||
print("Authentication successful!")
|
print("Authentication successful!")
|
||||||
audible_client = audible.Client(auth=authenticator)
|
audible_client = audible.Client(auth=authenticator)
|
||||||
return authenticator, audible_client
|
return authenticator, audible_client
|
||||||
|
|
||||||
except (OSError, ValueError, KeyError) as e:
|
except (OSError, ValueError, KeyError) as e:
|
||||||
print(f"Authentication failed: {e}")
|
print(f"Authentication failed: {e}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|||||||
Reference in New Issue
Block a user