Massive refactoring #1
@@ -1,6 +1,7 @@
|
||||
"""Obtains AAX files from Audible (cache or download) and provides activation bytes."""
|
||||
|
||||
import re
|
||||
import unicodedata
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
@@ -33,26 +34,31 @@ class DownloadManager:
|
||||
self.cache_dir = cache_dir
|
||||
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.chunk_size = chunk_size
|
||||
self._http_client = httpx.Client(
|
||||
auth=auth, timeout=30.0, follow_redirects=True)
|
||||
self._http_client = httpx.Client(auth=auth, timeout=30.0, follow_redirects=True)
|
||||
self._download_client = httpx.Client(
|
||||
timeout=httpx.Timeout(connect=30.0, read=None,
|
||||
write=30.0, pool=30.0),
|
||||
timeout=httpx.Timeout(connect=30.0, read=None, write=30.0, pool=30.0),
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
def get_or_download(
|
||||
self, asin: str, notify: StatusCallback | None = None
|
||||
self,
|
||||
asin: str,
|
||||
notify: StatusCallback | None = None,
|
||||
preferred_title: str | None = None,
|
||||
preferred_author: str | None = None,
|
||||
) -> Path | None:
|
||||
"""Return local path to AAX file; download and cache if not present."""
|
||||
title = self._get_name_from_asin(asin) or asin
|
||||
safe_title = self._sanitize_filename(title)
|
||||
local_path = self.cache_dir / f"{safe_title}.aax"
|
||||
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
filename_stems = self._get_filename_stems_from_asin(
|
||||
asin,
|
||||
preferred_title=preferred_title,
|
||||
preferred_author=preferred_author,
|
||||
)
|
||||
local_path = self.cache_dir / f"{filename_stems[0]}.aax"
|
||||
cached_path = self._find_cached_path(filename_stems)
|
||||
if cached_path:
|
||||
if notify:
|
||||
notify(f"Using cached file: {local_path.name}")
|
||||
return local_path
|
||||
notify(f"Using cached file: {cached_path.name}")
|
||||
return cached_path
|
||||
|
||||
if notify:
|
||||
notify(f"Downloading to {local_path.name}...")
|
||||
@@ -92,12 +98,7 @@ class DownloadManager:
|
||||
|
||||
def get_cached_path(self, asin: str) -> Path | None:
|
||||
"""Return path to cached AAX file if it exists and is valid size."""
|
||||
title = self._get_name_from_asin(asin) or asin
|
||||
safe_title = self._sanitize_filename(title)
|
||||
local_path = self.cache_dir / f"{safe_title}.aax"
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
return local_path
|
||||
return None
|
||||
return self._find_cached_path(self._get_filename_stems_from_asin(asin))
|
||||
|
||||
def is_cached(self, asin: str) -> bool:
|
||||
"""Return True if the title is present in cache with valid size."""
|
||||
@@ -130,20 +131,68 @@ class DownloadManager:
|
||||
return False
|
||||
|
||||
def _sanitize_filename(self, filename: str) -> str:
|
||||
"""Remove invalid characters from filename."""
|
||||
return re.sub(r'[<>:"/\\|?*]', "_", filename)
|
||||
"""Normalize a filename segment with ASCII letters, digits, and dashes."""
|
||||
ascii_text = unicodedata.normalize("NFKD", filename)
|
||||
ascii_text = ascii_text.encode("ascii", "ignore").decode("ascii")
|
||||
ascii_text = re.sub(r"[’'`]+", "", ascii_text)
|
||||
ascii_text = re.sub(r"[^A-Za-z0-9]+", "-", ascii_text)
|
||||
ascii_text = re.sub(r"-+", "-", ascii_text)
|
||||
ascii_text = ascii_text.strip("-._")
|
||||
return ascii_text or "Unknown"
|
||||
|
||||
def _find_cached_path(self, filename_stems: list[str]) -> Path | None:
|
||||
"""Return the first valid cached path matching any candidate filename stem."""
|
||||
for filename_stem in filename_stems:
|
||||
local_path = self.cache_dir / f"{filename_stem}.aax"
|
||||
if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE:
|
||||
return local_path
|
||||
return None
|
||||
|
||||
def _get_filename_stems_from_asin(
|
||||
self,
|
||||
asin: str,
|
||||
preferred_title: str | None = None,
|
||||
preferred_author: str | None = None,
|
||||
) -> list[str]:
|
||||
"""Build preferred and fallback cache filename stems for an ASIN."""
|
||||
if preferred_title:
|
||||
preferred_combined = (
|
||||
f"{self._sanitize_filename(preferred_author or 'Unknown Author')}_"
|
||||
f"{self._sanitize_filename(preferred_title)}"
|
||||
)
|
||||
preferred_legacy = self._sanitize_filename(preferred_title)
|
||||
fallback_asin = self._sanitize_filename(asin)
|
||||
return list(
|
||||
dict.fromkeys([preferred_combined, preferred_legacy, fallback_asin])
|
||||
)
|
||||
|
||||
def _get_name_from_asin(self, asin: str) -> str | None:
|
||||
"""Get the title/name of a book from its ASIN."""
|
||||
try:
|
||||
product_info = self.client.get(
|
||||
path=f"1.0/catalog/products/{asin}",
|
||||
response_groups="product_desc,product_attrs",
|
||||
**{"response_groups": "contributors,product_desc,product_attrs"},
|
||||
)
|
||||
product = product_info.get("product", {})
|
||||
return product.get("title") or "Unknown Title"
|
||||
except (OSError, ValueError, KeyError):
|
||||
return None
|
||||
title = product.get("title") or "Unknown Title"
|
||||
author = self._get_primary_author(product)
|
||||
combined = (
|
||||
f"{self._sanitize_filename(author)}_{self._sanitize_filename(title)}"
|
||||
)
|
||||
legacy_title = self._sanitize_filename(title)
|
||||
fallback_asin = self._sanitize_filename(asin)
|
||||
return list(dict.fromkeys([combined, legacy_title, fallback_asin]))
|
||||
except (OSError, ValueError, KeyError, AttributeError):
|
||||
return [self._sanitize_filename(asin)]
|
||||
|
||||
def _get_primary_author(self, product: dict) -> str:
|
||||
"""Extract a primary author name from product metadata."""
|
||||
contributors = product.get("authors") or product.get("contributors") or []
|
||||
for contributor in contributors:
|
||||
if not isinstance(contributor, dict):
|
||||
continue
|
||||
name = contributor.get("name")
|
||||
if isinstance(name, str) and name.strip():
|
||||
return name
|
||||
return "Unknown Author"
|
||||
|
||||
def _get_download_link(
|
||||
self,
|
||||
@@ -174,7 +223,8 @@ class DownloadManager:
|
||||
if not link:
|
||||
link = str(response.url)
|
||||
|
||||
tld = self.auth.locale.domain
|
||||
locale = getattr(self.auth, "locale", None)
|
||||
tld = getattr(locale, "domain", "com")
|
||||
return link.replace("cds.audible.com", f"cds.audible.{tld}")
|
||||
|
||||
except httpx.HTTPError as exc:
|
||||
|
||||
Reference in New Issue
Block a user