Files
auditui/player.py
2025-12-05 15:55:03 +01:00

363 lines
12 KiB
Python

#!/usr/bin/env python3
"""Player playground for Audible TUI - download and play a specific track"""
import argparse
import json
import logging
import re
import shutil
import subprocess
import sys
from getpass import getpass
from pathlib import Path
import audible
import httpx
from audible.activation_bytes import get_activation_bytes
# Files smaller than this many bytes (1 MiB) are not accepted as a valid
# cached or freshly downloaded audiobook.
MIN_FILE_SIZE = 1024 ** 2

# Endpoint queried to obtain the download location of a book.
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"

# Codec requested when the caller does not specify one.
DEFAULT_CODEC = "LC_128_44100_stereo"

# Bare "%(message)s" format: log records are printed as plain messages.
logging.basicConfig(format="%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

# Keep the third-party libraries quiet unless verbose mode raises their level.
for _library in ("audible", "httpx"):
    logging.getLogger(_library).setLevel(logging.WARNING)
del _library
def sanitize_filename(filename: str) -> str:
    """Replace characters that are unsafe in file names with underscores.

    Each of ``< > : " / \\ | ? *`` and every ASCII control character
    (``\\x00``-``\\x1f``) is replaced by ``_``. Control characters are
    included because an embedded NUL makes ``open()`` raise ``ValueError``
    and the others are rejected by Windows file systems.

    Args:
        filename: Candidate file name (without directory components).

    Returns:
        The name with every unsafe character replaced; the length is
        unchanged and an empty input yields an empty string.
    """
    return re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", filename)
class AudiblePlayer:
    """Handle Audible authentication, downloading, and playback.

    ``__init__`` only prepares the on-disk cache directory; ``authenticate``
    must be called before any method that talks to the Audible API, because
    it is what populates ``auth`` and ``client``.
    """

    def __init__(self) -> None:
        """Set up unauthenticated state and create the book cache directory."""
        self.auth: audible.Authenticator | None = None
        self.client: audible.Client | None = None
        self.home = Path.home()
        self.cache_dir = self.home / ".cache" / "auditui" / "books"
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def authenticate(self) -> None:
        """Authenticate with Audible and store ``auth`` and ``client``.

        A session previously saved in ``~/.config/auditui/auth.json`` is
        reused when it loads without error. Otherwise the user is prompted
        for e-mail, password and marketplace locale, and the new session is
        written back to that file.
        """
        auth_path = self.home / ".config" / "auditui" / "auth.json"
        auth_path.parent.mkdir(parents=True, exist_ok=True)

        if auth_path.exists():
            try:
                self.auth = audible.Authenticator.from_file(str(auth_path))
                self.client = audible.Client(auth=self.auth)
                return
            except Exception:
                # Deliberate best effort: any failure to load the stored
                # session falls through to the interactive login below.
                logger.info(
                    "Failed to load existing auth. Re-authenticating.\n")

        email = input("Email: ")
        password = getpass("Password: ")
        marketplace = (
            input("Marketplace locale (default: US): ").strip().upper() or "US"
        )
        self.auth = audible.Authenticator.from_login(
            username=email, password=password, locale=marketplace
        )
        self.auth.to_file(str(auth_path))
        self.client = audible.Client(auth=self.auth)

    def get_name_from_asin(self, asin: str) -> str | None:
        """Look up the title of a book from its ASIN.

        Args:
            asin: Audible product identifier.

        Returns:
            The product title, ``"Unknown Title"`` when the response carries
            no title, or ``None`` when the lookup itself fails (the error is
            logged).
        """
        try:
            product_info = self.client.get(
                path=f"1.0/catalog/products/{asin}",
                response_groups="product_desc,product_attrs",
            )
            product = product_info.get("product", {})
            return product.get("title") or "Unknown Title"
        except Exception as e:
            logger.error(f"Error getting name for ASIN {asin}: {e}")
            return None

    def get_download_link(self, asin: str, codec: str = DEFAULT_CODEC) -> str | None:
        """Resolve the download URL of a book.

        Args:
            asin: Audible product identifier.
            codec: Codec name sent to the download service.

        Returns:
            The download URL with the host rewritten to the marketplace
            domain of the current session, or ``None`` when the request
            fails (the error is logged).

        Raises:
            Exception: If there is no authenticated session or it carries
                no adp token.
        """
        if self.auth is None or self.auth.adp_token is None:
            raise Exception("No adp token present. Can't get download link.")

        params = {
            "type": "AUDI",
            "currentTransportMethod": "WIFI",
            "key": asin,
            "codec": codec,
        }
        try:
            response = httpx.get(
                url=DOWNLOAD_URL,
                params=params,
                follow_redirects=False,
                auth=self.auth,
            )
            # The link is taken from the Location header of the unfollowed
            # response. httpx's raise_for_status() raises for every non-2xx
            # status, redirects included, so it is only invoked for real
            # 4xx/5xx errors; otherwise the redirect would be reported as a
            # failure before the header is read.
            if response.status_code >= 400:
                response.raise_for_status()

            link = response.headers.get("Location")
            if not link:
                raise ValueError("No Location header in response")

            tld = self.auth.locale.domain
            return link.replace("cds.audible.com", f"cds.audible.{tld}")
        except httpx.HTTPError as e:
            logger.error(f"HTTP error getting download link: {e}")
            return None
        except Exception as e:
            logger.error(f"Error getting download link: {e}")
            return None

    def download_file(self, url: str, dest_path: Path) -> Path | None:
        """Stream ``url`` to ``dest_path`` while printing progress.

        The data is first written to ``<dest_path name>.part`` next to the
        destination and renamed only after the transfer completed, so a
        failed or interrupted download never leaves a partial file under
        the final name.

        Args:
            url: Location to download from; redirects are followed.
            dest_path: Final path of the downloaded file.

        Returns:
            ``dest_path`` on success, ``None`` on failure (the error is
            logged).
        """
        tmp_path = dest_path.with_name(dest_path.name + ".part")
        try:
            with httpx.stream("GET", url, follow_redirects=True) as r:
                r.raise_for_status()
                total_size = int(r.headers.get("content-length", 0))
                downloaded = 0
                with open(tmp_path, "wb") as f:
                    for chunk in r.iter_bytes(chunk_size=8192):
                        f.write(chunk)
                        downloaded += len(chunk)
                        # Progress is only shown when the size is known.
                        if total_size > 0:
                            percent = (downloaded / total_size) * 100
                            print(
                                f"\rDownloading: {percent:.1f}% ({downloaded}/{total_size} bytes)",
                                end="",
                                flush=True,
                            )
            print()
            # Publish the complete file under its final name in one step.
            tmp_path.replace(dest_path)
            return dest_path
        except httpx.HTTPError as e:
            logger.error(f"\nHTTP error downloading file: {e}")
            return None
        except Exception as e:
            logger.error(f"\nError downloading file: {e}")
            return None
        finally:
            # Also runs on KeyboardInterrupt. After a successful rename the
            # temporary file no longer exists and this is a no-op.
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                logger.debug(f"Could not remove temporary file {tmp_path}")

    def get_or_download(self, asin: str) -> Path | None:
        """Return the local AAX file for ``asin``, downloading it if missing.

        A cached file is reused only when it is at least ``MIN_FILE_SIZE``
        bytes; the same check is applied to a fresh download.

        Args:
            asin: Audible product identifier.

        Returns:
            Path of the local file, or ``None`` when no usable file could be
            obtained.
        """
        # Fall back to the ASIN when the title lookup failed.
        title = self.get_name_from_asin(asin) or asin
        # NOTE(review): the cache file is named after the title, so books
        # sharing a title (or reported as "Unknown Title") share one cache
        # entry - consider keying the cache on the ASIN.
        local_path = self.cache_dir / f"{sanitize_filename(title)}.aax"

        if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
            logger.info(f"Using cached file: {local_path.name}")
            return local_path

        logger.info(f"\nDownloading to {local_path.name}...")
        dl_link = self.get_download_link(asin)
        if not dl_link:
            return None
        if not self.download_file(dl_link, local_path):
            return None

        if not local_path.exists() or local_path.stat().st_size < MIN_FILE_SIZE:
            logger.error("Download failed or file too small.")
            return None
        return local_path

    def fetch_library(self) -> list[dict]:
        """Fetch every library item from the Audible API.

        Pages of 50 items are requested until an empty or short page is
        returned.

        Returns:
            All library items in the order the API returned them.
        """
        response_groups = (
            "contributors,media,product_attrs,product_desc,product_details,"
            "rating,is_finished,listening_status,percent_complete"
        )
        page_size = 50
        all_items: list[dict] = []
        page = 1
        while True:
            library = self.client.get(
                path="library",
                num_results=page_size,
                page=page,
                response_groups=response_groups,
            )
            items = library.get("items", [])
            if not items:
                break
            all_items.extend(items)
            logger.debug(f"Fetched page {page} ({len(items)} items)...")
            # A short page means there is nothing left to fetch.
            if len(items) < page_size:
                break
            page += 1
        return all_items

    def extract_asin(self, item: dict) -> str | None:
        """Return the ASIN of a library item.

        The item-level ``asin`` wins over the one nested under ``product``.
        """
        product = item.get("product", {})
        return item.get("asin") or product.get("asin")

    def extract_title(self, item: dict) -> str:
        """Return a display title for a library item.

        Preference order: product title, item title, product ASIN, item
        ASIN, and finally ``"Unknown Title"``, so a string is always
        returned even when a key is present but empty or null.
        """
        product = item.get("product", {})
        return (
            product.get("title")
            or item.get("title")
            or product.get("asin")
            or item.get("asin")
            or "Unknown Title"
        )

    def extract_authors(self, item: dict) -> str:
        """Return the author names of a library item as one string.

        Authors are read from ``product["authors"]``, then
        ``product["contributors"]``, then ``item["authors"]``. Entries that
        are not dicts or have no name are skipped.

        Returns:
            Comma-separated names, or ``"Unknown"`` when none are found.
        """
        product = item.get("product", {})
        authors = product.get("authors") or product.get("contributors") or []
        if not authors and "authors" in item:
            authors = item.get("authors") or []
        # Skip empty or missing names so they neither break the join nor
        # leave dangling separators in the result.
        names = [
            str(a["name"])
            for a in authors
            if isinstance(a, dict) and a.get("name")
        ]
        return ", ".join(names) or "Unknown"

    def extract_progress(self, item: dict) -> float:
        """Return the completion percentage of a library item.

        The item-level ``percent_complete`` is preferred; when it is absent
        the value nested under ``listening_status`` is used. Missing values
        yield ``0.0``.
        """
        percent_complete = item.get("percent_complete")
        listening_status = item.get("listening_status", {})
        if percent_complete is None and isinstance(listening_status, dict):
            percent_complete = listening_status.get("percent_complete", 0)
        return float(percent_complete) if percent_complete is not None else 0.0

    def is_finished(self, item: dict) -> bool:
        """Report whether a library item counts as finished.

        An item is finished when an ``is_finished`` flag is truthy (on the
        item or under ``listening_status``) or when its numeric
        ``percent_complete`` is at least 100.
        """
        finished_flag = item.get("is_finished")
        percent_complete = item.get("percent_complete")
        listening_status = item.get("listening_status")
        if isinstance(listening_status, dict):
            finished_flag = finished_flag or listening_status.get(
                "is_finished", False
            )
            if percent_complete is None:
                percent_complete = listening_status.get("percent_complete", 0)
        return bool(finished_flag) or (
            isinstance(percent_complete, (int, float))
            and percent_complete >= 100
        )

    def list_unfinished_tracks(self) -> list[dict]:
        """Fetch the library and return the items that are not finished."""
        logger.info("Fetching library...")
        return [
            item for item in self.fetch_library() if not self.is_finished(item)
        ]

    def get_activation_bytes(self) -> str:
        """Return the activation bytes of the current session as a string.

        A ``bytes`` result from the library helper is hex-encoded; any other
        result is converted with ``str``.
        """
        # Resolves to the module-level helper imported from
        # audible.activation_bytes, not to this method.
        activation_bytes = get_activation_bytes(self.auth)
        if isinstance(activation_bytes, bytes):
            return activation_bytes.hex()
        return str(activation_bytes)

    def play(self, path: Path, activation_hex: str | None = None) -> bool:
        """Play a local file with ``ffplay`` and wait for it to finish.

        Args:
            path: Audio file to play.
            activation_hex: Activation bytes passed to ffplay via
                ``-activation_bytes``; omitted when empty or ``None``.

        Returns:
            ``True`` when playback finished or was stopped with Ctrl+C,
            ``False`` when ffplay is missing or exited with an error.
        """
        if not shutil.which("ffplay"):
            logger.error(
                "ffplay not found. Please install ffmpeg (which includes ffplay)"
            )
            return False

        cmd = ["ffplay", "-nodisp", "-autoexit"]
        if activation_hex:
            cmd.extend(["-activation_bytes", activation_hex])
        cmd.append(str(path))

        try:
            logger.info(f"Playing: {path.name}")
            logger.info("Press Ctrl+C to stop playback\n")
            subprocess.run(
                cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )
            return True
        except KeyboardInterrupt:
            # Stopping playback is a normal way to end, not a failure.
            logger.info("\nPlayback stopped by user")
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"Error playing file: {e}")
            return False
        except FileNotFoundError:
            # ffplay disappeared between the which() check and the launch.
            logger.error("ffplay not found")
            return False
def main() -> None:
    """Command-line entry point.

    Without an ASIN argument the unfinished library items are printed as
    JSON and the process exits with status 0. With an ASIN the book is
    fetched (or taken from the cache) and played; any failure along the way
    exits with status 1.
    """
    cli = argparse.ArgumentParser(
        description="Download and play Audible audiobooks")
    cli.add_argument(
        "asin", nargs="?", default="", help="ASIN of the audiobook to play"
    )
    cli.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose logging"
    )
    options = cli.parse_args()

    if options.verbose:
        # None selects the root logger; the two libraries are raised with it.
        for logger_name in (None, "audible", "httpx"):
            logging.getLogger(logger_name).setLevel(logging.DEBUG)

    player = AudiblePlayer()
    logger.info("Authenticating...")
    player.authenticate()

    asin = options.asin
    if not asin:
        # Listing mode: dump a summary of every unfinished item and stop.
        tracks = [
            {
                "title": player.extract_title(entry),
                "asin": player.extract_asin(entry),
                "authors": player.extract_authors(entry),
                "progress": player.extract_progress(entry),
            }
            for entry in player.list_unfinished_tracks()
        ]
        print(json.dumps(tracks, indent=2))
        sys.exit(0)

    logger.info(f"\nGetting download link for ASIN: {asin}")
    local_path = player.get_or_download(asin)
    if not local_path:
        logger.error("Could not download file.")
        sys.exit(1)

    logger.info("\nGetting activation bytes...")
    try:
        activation_hex = player.get_activation_bytes()
    except Exception as e:
        logger.error(f"Activation bytes error: {e}")
        sys.exit(1)

    logger.info("Starting playback...\n")
    played = player.play(local_path, activation_hex)
    if not played:
        sys.exit(1)


if __name__ == "__main__":
    main()