diff --git a/player.py b/player.py deleted file mode 100644 index a534130..0000000 --- a/player.py +++ /dev/null @@ -1,362 +0,0 @@ -#!/usr/bin/env python3 -"""Player playground for Audible TUI - download and play a specific track""" - -import argparse -import json -import logging -import re -import shutil -import subprocess -import sys -from getpass import getpass -from pathlib import Path - -import audible -import httpx -from audible.activation_bytes import get_activation_bytes - - -MIN_FILE_SIZE = 1024 * 1024 -DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent" -DEFAULT_CODEC = "LC_128_44100_stereo" - -logging.basicConfig(level=logging.INFO, format="%(message)s") -logger = logging.getLogger(__name__) - -logging.getLogger("audible").setLevel(logging.WARNING) -logging.getLogger("httpx").setLevel(logging.WARNING) - - -def sanitize_filename(filename: str) -> str: - """Remove invalid characters from filename.""" - return re.sub(r'[<>:"/\\|?*]', "_", filename) - - -class AudiblePlayer: - """Class to handle Audible authentication, downloading, and playback.""" - - def __init__(self) -> None: - """Initialize the player with authentication.""" - self.auth: audible.Authenticator | None = None - self.client: audible.Client | None = None - self.home = Path.home() - self.cache_dir = self.home / ".cache" / "auditui" / "books" - self.cache_dir.mkdir(parents=True, exist_ok=True) - - def authenticate(self) -> None: - """Authenticate with Audible and store auth and client.""" - auth_path = self.home / ".config" / "auditui" / "auth.json" - auth_path.parent.mkdir(parents=True, exist_ok=True) - - if auth_path.exists(): - try: - self.auth = audible.Authenticator.from_file(str(auth_path)) - self.client = audible.Client(auth=self.auth) - return - except Exception: - logger.info( - "Failed to load existing auth. Re-authenticating.\n") - - email = input("Email: ") - password = getpass("Password: ") - marketplace = ( - input("Marketplace locale (default: US): ").strip().upper() or "US" - ) - - self.auth = audible.Authenticator.from_login( - username=email, password=password, locale=marketplace - ) - self.auth.to_file(str(auth_path)) - self.client = audible.Client(auth=self.auth) - - def get_name_from_asin(self, asin: str) -> str | None: - """Get the title/name of a book from its ASIN.""" - try: - product_info = self.client.get( - path=f"1.0/catalog/products/{asin}", - response_groups="product_desc,product_attrs", - ) - product = product_info.get("product", {}) - return product.get("title") or "Unknown Title" - except Exception as e: - logger.error(f"Error getting name for ASIN {asin}: {e}") - return None - - def get_download_link(self, asin: str, codec: str = DEFAULT_CODEC) -> str | None: - """Get download link for book using the example method.""" - if self.auth.adp_token is None: - raise Exception("No adp token present. Can't get download link.") - - try: - params = { - "type": "AUDI", - "currentTransportMethod": "WIFI", - "key": asin, - "codec": codec, - } - r = httpx.get( - url=DOWNLOAD_URL, params=params, follow_redirects=False, auth=self.auth - ) - r.raise_for_status() - - link = r.headers.get("Location") - if not link: - raise ValueError("No Location header in response") - - tld = self.auth.locale.domain - new_link = link.replace("cds.audible.com", f"cds.audible.{tld}") - return new_link - except httpx.HTTPError as e: - logger.error(f"HTTP error getting download link: {e}") - return None - except Exception as e: - logger.error(f"Error getting download link: {e}") - return None - - def download_file(self, url: str, dest_path: Path) -> Path | None: - """Download file from URL to destination.""" - try: - with httpx.stream("GET", url) as r: - r.raise_for_status() - total_size = int(r.headers.get("content-length", 0)) - downloaded = 0 - - with open(dest_path, "wb") as f: - for chunk in r.iter_bytes(chunk_size=8192): - f.write(chunk) - downloaded += len(chunk) - if total_size > 0: - percent = (downloaded / total_size) * 100 - print( - f"\rDownloading: {percent:.1f}% ({downloaded}/{total_size} bytes)", - end="", - flush=True, - ) - - print() - return dest_path - except httpx.HTTPError as e: - logger.error(f"\nHTTP error downloading file: {e}") - return None - except Exception as e: - logger.error(f"\nError downloading file: {e}") - return None - - def get_or_download(self, asin: str) -> Path | None: - """Get local path of AAX file, downloading if missing.""" - title = self.get_name_from_asin(asin) - if not title: - title = asin - - safe_title = sanitize_filename(title) - local_path = self.cache_dir / f"{safe_title}.aax" - - if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE: - logger.info(f"Using cached file: {local_path.name}") - return local_path - - logger.info(f"\nDownloading to {local_path.name}...") - dl_link = self.get_download_link(asin) - if not dl_link: - return None - - if not self.download_file(dl_link, local_path): - return None - - if not local_path.exists() or local_path.stat().st_size < MIN_FILE_SIZE: - logger.error("Download failed or file too small.") - return None - - return local_path - - def fetch_library(self) -> list[dict]: - """Fetch all library items from Audible API.""" - response_groups = ( - "contributors,media,product_attrs,product_desc,product_details," - "rating,is_finished,listening_status,percent_complete" - ) - all_items = [] - page = 1 - page_size = 50 - - while True: - library = self.client.get( - path="library", - num_results=page_size, - page=page, - response_groups=response_groups, - ) - items = library.get("items", []) - if not items: - break - all_items.extend(items) - logger.debug(f"Fetched page {page} ({len(items)} items)...") - if len(items) < page_size: - break - page += 1 - - return all_items - - def extract_asin(self, item: dict) -> str | None: - """Extract ASIN from library item.""" - product = item.get("product", {}) - return item.get("asin") or product.get("asin") - - def extract_title(self, item: dict) -> str: - """Extract title from library item.""" - product = item.get("product", {}) - return ( - product.get("title") - or item.get("title") - or product.get("asin", "Unknown Title") - ) - - def extract_authors(self, item: dict) -> str: - """Extract author names from library item.""" - product = item.get("product", {}) - authors = product.get("authors") or product.get("contributors") or [] - if not authors and "authors" in item: - authors = item.get("authors", []) - author_names = ", ".join( - [a.get("name", "") for a in authors if isinstance(a, dict)] - ) - return author_names or "Unknown" - - def extract_progress(self, item: dict) -> float: - """Extract progress percentage from library item.""" - percent_complete = item.get("percent_complete") - listening_status = item.get("listening_status", {}) - - if isinstance(listening_status, dict): - if percent_complete is None: - percent_complete = listening_status.get("percent_complete", 0) - - return float(percent_complete) if percent_complete is not None else 0.0 - - def is_finished(self, item: dict) -> bool: - """Check if a library item is finished.""" - is_finished_flag = item.get("is_finished") - percent_complete = item.get("percent_complete") - listening_status = item.get("listening_status") - - if isinstance(listening_status, dict): - is_finished_flag = is_finished_flag or listening_status.get( - "is_finished", False - ) - if percent_complete is None: - percent_complete = listening_status.get("percent_complete", 0) - - return bool(is_finished_flag) or ( - isinstance(percent_complete, (int, float) - ) and percent_complete >= 100 - ) - - def list_unfinished_tracks(self) -> list[dict]: - """List all unfinished tracks with ASIN and name.""" - logger.info("Fetching library...") - all_items = self.fetch_library() - - unfinished = [] - for item in all_items: - if not self.is_finished(item): - unfinished.append(item) - - return unfinished - - def get_activation_bytes(self) -> str: - """Get activation bytes as hex string.""" - activation_bytes = get_activation_bytes(self.auth) - if isinstance(activation_bytes, bytes): - return activation_bytes.hex() - return str(activation_bytes) - - def play(self, path: Path, activation_hex: str | None = None) -> bool: - """Play a local file using ffplay.""" - if not shutil.which("ffplay"): - logger.error( - "ffplay not found. Please install ffmpeg (which includes ffplay)" - ) - return False - - cmd = ["ffplay", "-nodisp", "-autoexit"] - if activation_hex: - cmd.extend(["-activation_bytes", activation_hex]) - cmd.append(str(path)) - - try: - logger.info(f"Playing: {path.name}") - logger.info("Press Ctrl+C to stop playback\n") - subprocess.run( - cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL - ) - return True - except KeyboardInterrupt: - logger.info("\nPlayback stopped by user") - return True - except subprocess.CalledProcessError as e: - logger.error(f"Error playing file: {e}") - return False - except FileNotFoundError: - logger.error("ffplay not found") - return False - - -def main() -> None: - """Main entry point.""" - parser = argparse.ArgumentParser( - description="Download and play Audible audiobooks") - parser.add_argument( - "asin", nargs="?", default="", help="ASIN of the audiobook to play" - ) - parser.add_argument( - "-v", "--verbose", action="store_true", help="Enable verbose logging" - ) - args = parser.parse_args() - - if args.verbose: - logging.getLogger().setLevel(logging.DEBUG) - logging.getLogger("audible").setLevel(logging.DEBUG) - logging.getLogger("httpx").setLevel(logging.DEBUG) - - player = AudiblePlayer() - logger.info("Authenticating...") - player.authenticate() - - asin = args.asin - - if not asin: - unfinished = player.list_unfinished_tracks() - tracks = [] - for item in unfinished: - tracks.append( - { - "title": player.extract_title(item), - "asin": player.extract_asin(item), - "authors": player.extract_authors(item), - "progress": player.extract_progress(item), - } - ) - print(json.dumps(tracks, indent=2)) - sys.exit(0) - - logger.info(f"\nGetting download link for ASIN: {asin}") - local_path = player.get_or_download(asin) - - if not local_path: - logger.error("Could not download file.") - sys.exit(1) - - logger.info("\nGetting activation bytes...") - try: - activation_hex = player.get_activation_bytes() - except Exception as e: - logger.error(f"Activation bytes error: {e}") - sys.exit(1) - - logger.info("Starting playback...\n") - if not player.play(local_path, activation_hex): - sys.exit(1) - - -if __name__ == "__main__": - main()