diff --git a/auditui/downloads/manager.py b/auditui/downloads/manager.py index 6cc3bca..d4deb8f 100644 --- a/auditui/downloads/manager.py +++ b/auditui/downloads/manager.py @@ -1,6 +1,7 @@ """Obtains AAX files from Audible (cache or download) and provides activation bytes.""" import re +import unicodedata from pathlib import Path from urllib.parse import urlparse @@ -33,26 +34,31 @@ class DownloadManager: self.cache_dir = cache_dir self.cache_dir.mkdir(parents=True, exist_ok=True) self.chunk_size = chunk_size - self._http_client = httpx.Client( - auth=auth, timeout=30.0, follow_redirects=True) + self._http_client = httpx.Client(auth=auth, timeout=30.0, follow_redirects=True) self._download_client = httpx.Client( - timeout=httpx.Timeout(connect=30.0, read=None, - write=30.0, pool=30.0), + timeout=httpx.Timeout(connect=30.0, read=None, write=30.0, pool=30.0), follow_redirects=True, ) def get_or_download( - self, asin: str, notify: StatusCallback | None = None + self, + asin: str, + notify: StatusCallback | None = None, + preferred_title: str | None = None, + preferred_author: str | None = None, ) -> Path | None: """Return local path to AAX file; download and cache if not present.""" - title = self._get_name_from_asin(asin) or asin - safe_title = self._sanitize_filename(title) - local_path = self.cache_dir / f"{safe_title}.aax" - - if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE: + filename_stems = self._get_filename_stems_from_asin( + asin, + preferred_title=preferred_title, + preferred_author=preferred_author, + ) + local_path = self.cache_dir / f"{filename_stems[0]}.aax" + cached_path = self._find_cached_path(filename_stems) + if cached_path: if notify: - notify(f"Using cached file: {local_path.name}") - return local_path + notify(f"Using cached file: {cached_path.name}") + return cached_path if notify: notify(f"Downloading to {local_path.name}...") @@ -92,12 +98,7 @@ class DownloadManager: def get_cached_path(self, asin: str) -> Path | None: """Return path to cached AAX file if it exists and is valid size.""" - title = self._get_name_from_asin(asin) or asin - safe_title = self._sanitize_filename(title) - local_path = self.cache_dir / f"{safe_title}.aax" - if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE: - return local_path - return None + return self._find_cached_path(self._get_filename_stems_from_asin(asin)) def is_cached(self, asin: str) -> bool: """Return True if the title is present in cache with valid size.""" @@ -130,20 +131,68 @@ class DownloadManager: return False def _sanitize_filename(self, filename: str) -> str: - """Remove invalid characters from filename.""" - return re.sub(r'[<>:"/\\|?*]', "_", filename) + """Normalize a filename segment with ASCII letters, digits, and dashes.""" + ascii_text = unicodedata.normalize("NFKD", filename) + ascii_text = ascii_text.encode("ascii", "ignore").decode("ascii") + ascii_text = re.sub(r"[’'`]+", "", ascii_text) + ascii_text = re.sub(r"[^A-Za-z0-9]+", "-", ascii_text) + ascii_text = re.sub(r"-+", "-", ascii_text) + ascii_text = ascii_text.strip("-._") + return ascii_text or "Unknown" + + def _find_cached_path(self, filename_stems: list[str]) -> Path | None: + """Return the first valid cached path matching any candidate filename stem.""" + for filename_stem in filename_stems: + local_path = self.cache_dir / f"{filename_stem}.aax" + if local_path.exists() and local_path.stat().st_size >= MIN_FILE_SIZE: + return local_path + return None + + def _get_filename_stems_from_asin( + self, + asin: str, + preferred_title: str | None = None, + preferred_author: str | None = None, + ) -> list[str]: + """Build preferred and fallback cache filename stems for an ASIN.""" + if preferred_title: + preferred_combined = ( + f"{self._sanitize_filename(preferred_author or 'Unknown Author')}_" + f"{self._sanitize_filename(preferred_title)}" + ) + preferred_legacy = self._sanitize_filename(preferred_title) + fallback_asin = self._sanitize_filename(asin) + return list( + dict.fromkeys([preferred_combined, preferred_legacy, fallback_asin]) + ) - def _get_name_from_asin(self, asin: str) -> str | None: - """Get the title/name of a book from its ASIN.""" try: product_info = self.client.get( path=f"1.0/catalog/products/{asin}", - response_groups="product_desc,product_attrs", + **{"response_groups": "contributors,product_desc,product_attrs"}, ) product = product_info.get("product", {}) - return product.get("title") or "Unknown Title" - except (OSError, ValueError, KeyError): - return None + title = product.get("title") or "Unknown Title" + author = self._get_primary_author(product) + combined = ( + f"{self._sanitize_filename(author)}_{self._sanitize_filename(title)}" + ) + legacy_title = self._sanitize_filename(title) + fallback_asin = self._sanitize_filename(asin) + return list(dict.fromkeys([combined, legacy_title, fallback_asin])) + except (OSError, ValueError, KeyError, AttributeError): + return [self._sanitize_filename(asin)] + + def _get_primary_author(self, product: dict) -> str: + """Extract a primary author name from product metadata.""" + contributors = product.get("authors") or product.get("contributors") or [] + for contributor in contributors: + if not isinstance(contributor, dict): + continue + name = contributor.get("name") + if isinstance(name, str) and name.strip(): + return name + return "Unknown Author" def _get_download_link( self, @@ -174,7 +223,8 @@ class DownloadManager: if not link: link = str(response.url) - tld = self.auth.locale.domain + locale = getattr(self.auth, "locale", None) + tld = getattr(locale, "domain", "com") return link.replace("cds.audible.com", f"cds.audible.{tld}") except httpx.HTTPError as exc: