feat: output possible track in json and quit
This commit is contained in:
119
player.py
119
player.py
@@ -2,6 +2,7 @@
|
||||
"""Player playground for Audible TUI - download and play a specific track"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
@@ -19,10 +20,7 @@ MIN_FILE_SIZE = 1024 * 1024
|
||||
DOWNLOAD_URL = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/FSDownloadContent"
|
||||
DEFAULT_CODEC = "LC_128_44100_stereo"
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(message)s"
|
||||
)
|
||||
logging.basicConfig(level=logging.INFO, format="%(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
logging.getLogger("audible").setLevel(logging.WARNING)
|
||||
@@ -56,11 +54,14 @@ class AudiblePlayer:
|
||||
self.client = audible.Client(auth=self.auth)
|
||||
return
|
||||
except Exception:
|
||||
logger.info("Failed to load existing auth. Re-authenticating.\n")
|
||||
logger.info(
|
||||
"Failed to load existing auth. Re-authenticating.\n")
|
||||
|
||||
email = input("Email: ")
|
||||
password = getpass("Password: ")
|
||||
marketplace = input("Marketplace locale (default: US): ").strip().upper() or "US"
|
||||
marketplace = (
|
||||
input("Marketplace locale (default: US): ").strip().upper() or "US"
|
||||
)
|
||||
|
||||
self.auth = audible.Authenticator.from_login(
|
||||
username=email, password=password, locale=marketplace
|
||||
@@ -126,7 +127,11 @@ class AudiblePlayer:
|
||||
downloaded += len(chunk)
|
||||
if total_size > 0:
|
||||
percent = (downloaded / total_size) * 100
|
||||
print(f"\rDownloading: {percent:.1f}% ({downloaded}/{total_size} bytes)", end="", flush=True)
|
||||
print(
|
||||
f"\rDownloading: {percent:.1f}% ({downloaded}/{total_size} bytes)",
|
||||
end="",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
print()
|
||||
return dest_path
|
||||
@@ -179,7 +184,7 @@ class AudiblePlayer:
|
||||
path="library",
|
||||
num_results=page_size,
|
||||
page=page,
|
||||
response_groups=response_groups
|
||||
response_groups=response_groups,
|
||||
)
|
||||
items = library.get("items", [])
|
||||
if not items:
|
||||
@@ -200,9 +205,11 @@ class AudiblePlayer:
|
||||
def extract_title(self, item: dict) -> str:
|
||||
"""Extract title from library item."""
|
||||
product = item.get("product", {})
|
||||
return (product.get("title") or
|
||||
item.get("title") or
|
||||
product.get("asin", "Unknown Title"))
|
||||
return (
|
||||
product.get("title")
|
||||
or item.get("title")
|
||||
or product.get("asin", "Unknown Title")
|
||||
)
|
||||
|
||||
def extract_authors(self, item: dict) -> str:
|
||||
"""Extract author names from library item."""
|
||||
@@ -210,7 +217,9 @@ class AudiblePlayer:
|
||||
authors = product.get("authors") or product.get("contributors") or []
|
||||
if not authors and "authors" in item:
|
||||
authors = item.get("authors", [])
|
||||
author_names = ", ".join([a.get("name", "") for a in authors if isinstance(a, dict)])
|
||||
author_names = ", ".join(
|
||||
[a.get("name", "") for a in authors if isinstance(a, dict)]
|
||||
)
|
||||
return author_names or "Unknown"
|
||||
|
||||
def extract_progress(self, item: dict) -> float:
|
||||
@@ -231,22 +240,27 @@ class AudiblePlayer:
|
||||
listening_status = item.get("listening_status")
|
||||
|
||||
if isinstance(listening_status, dict):
|
||||
is_finished_flag = is_finished_flag or listening_status.get("is_finished", False)
|
||||
is_finished_flag = is_finished_flag or listening_status.get(
|
||||
"is_finished", False
|
||||
)
|
||||
if percent_complete is None:
|
||||
percent_complete = listening_status.get("percent_complete", 0)
|
||||
|
||||
return bool(is_finished_flag) or (isinstance(percent_complete, (int, float)) and percent_complete >= 100)
|
||||
return bool(is_finished_flag) or (
|
||||
isinstance(percent_complete, (int, float)
|
||||
) and percent_complete >= 100
|
||||
)
|
||||
|
||||
def list_unfinished_tracks(self) -> list[dict]:
|
||||
"""List all unfinished tracks with ASIN and name."""
|
||||
logger.info("Fetching library...")
|
||||
all_items = self.fetch_library()
|
||||
|
||||
|
||||
unfinished = []
|
||||
for item in all_items:
|
||||
if not self.is_finished(item):
|
||||
unfinished.append(item)
|
||||
|
||||
|
||||
return unfinished
|
||||
|
||||
def get_activation_bytes(self) -> str:
|
||||
@@ -259,7 +273,9 @@ class AudiblePlayer:
|
||||
def play(self, path: Path, activation_hex: str | None = None) -> bool:
|
||||
"""Play a local file using ffplay."""
|
||||
if not shutil.which("ffplay"):
|
||||
logger.error("ffplay not found. Please install ffmpeg (which includes ffplay)")
|
||||
logger.error(
|
||||
"ffplay not found. Please install ffmpeg (which includes ffplay)"
|
||||
)
|
||||
return False
|
||||
|
||||
cmd = ["ffplay", "-nodisp", "-autoexit"]
|
||||
@@ -270,7 +286,9 @@ class AudiblePlayer:
|
||||
try:
|
||||
logger.info(f"Playing: {path.name}")
|
||||
logger.info("Press Ctrl+C to stop playback\n")
|
||||
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
subprocess.run(
|
||||
cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
|
||||
)
|
||||
return True
|
||||
except KeyboardInterrupt:
|
||||
logger.info("\nPlayback stopped by user")
|
||||
@@ -285,17 +303,13 @@ class AudiblePlayer:
|
||||
|
||||
def main() -> None:
|
||||
"""Main entry point."""
|
||||
parser = argparse.ArgumentParser(description="Download and play Audible audiobooks")
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Download and play Audible audiobooks")
|
||||
parser.add_argument(
|
||||
"asin",
|
||||
nargs="?",
|
||||
default="",
|
||||
help="ASIN of the audiobook to play"
|
||||
"asin", nargs="?", default="", help="ASIN of the audiobook to play"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v", "--verbose",
|
||||
action="store_true",
|
||||
help="Enable verbose logging"
|
||||
"-v", "--verbose", action="store_true", help="Enable verbose logging"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -312,35 +326,30 @@ def main() -> None:
|
||||
|
||||
if not asin:
|
||||
unfinished = player.list_unfinished_tracks()
|
||||
if not unfinished:
|
||||
logger.info("No unfinished tracks found.")
|
||||
sys.exit(0)
|
||||
|
||||
logger.info(f"\nFound {len(unfinished)} unfinished tracks:\n")
|
||||
for idx, item in enumerate(unfinished, 1):
|
||||
title = player.extract_title(item)
|
||||
asin_val = player.extract_asin(item)
|
||||
authors = player.extract_authors(item)
|
||||
progress = player.extract_progress(item)
|
||||
logger.info(f"{idx}. {title}")
|
||||
logger.info(f" Author: {authors}")
|
||||
logger.info(f" Progress: {progress:.1f}%")
|
||||
logger.info(f" ASIN: {asin_val}\n")
|
||||
|
||||
choice = input("Enter track number or ASIN (blank to cancel): ").strip()
|
||||
if not choice:
|
||||
logger.info("Cancelled.")
|
||||
sys.exit(0)
|
||||
|
||||
if choice.isdigit():
|
||||
idx = int(choice) - 1
|
||||
if 0 <= idx < len(unfinished):
|
||||
asin = player.extract_asin(unfinished[idx])
|
||||
else:
|
||||
logger.error("Invalid track number.")
|
||||
sys.exit(1)
|
||||
else:
|
||||
asin = choice
|
||||
tracks = []
|
||||
for item in unfinished:
|
||||
tracks.append(
|
||||
{
|
||||
"title": player.extract_title(item),
|
||||
"asin": player.extract_asin(item),
|
||||
"authors": player.extract_authors(item),
|
||||
"progress": player.extract_progress(item),
|
||||
}
|
||||
)
|
||||
if not asin:
|
||||
unfinished = player.list_unfinished_tracks()
|
||||
tracks = []
|
||||
for item in unfinished:
|
||||
tracks.append(
|
||||
{
|
||||
"title": player.extract_title(item),
|
||||
"asin": player.extract_asin(item),
|
||||
"authors": player.extract_authors(item),
|
||||
"progress": player.extract_progress(item),
|
||||
}
|
||||
)
|
||||
print(json.dumps(tracks, indent=2))
|
||||
sys.exit(0)
|
||||
|
||||
logger.info(f"\nGetting download link for ASIN: {asin}")
|
||||
local_path = player.get_or_download(asin)
|
||||
|
||||
Reference in New Issue
Block a user