feat: speed up library fetching with concurrent page requests
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
"""Library helpers for fetching and formatting Audible data."""
|
"""Library helpers for fetching and formatting Audible data."""
|
||||||
|
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
import audible
|
import audible
|
||||||
@@ -18,38 +19,103 @@ class LibraryClient:
|
|||||||
"""Fetch all library items from the API."""
|
"""Fetch all library items from the API."""
|
||||||
response_groups = (
|
response_groups = (
|
||||||
"contributors,media,product_attrs,product_desc,product_details,"
|
"contributors,media,product_attrs,product_desc,product_details,"
|
||||||
"rating,is_finished,listening_status,percent_complete"
|
"is_finished,listening_status,percent_complete"
|
||||||
)
|
)
|
||||||
return self._fetch_all_pages(response_groups, on_progress)
|
return self._fetch_all_pages(response_groups, on_progress)
|
||||||
|
|
||||||
def _fetch_all_pages(
|
def _fetch_page(
|
||||||
self, response_groups: str, on_progress: ProgressCallback | None = None
|
self, page: int, page_size: int, response_groups: str
|
||||||
) -> list:
|
) -> tuple[int, list[dict]]:
|
||||||
"""Fetch all pages of library items from the API."""
|
"""Fetch a single page of library items."""
|
||||||
all_items: list[dict] = []
|
|
||||||
page = 1
|
|
||||||
page_size = 50
|
|
||||||
|
|
||||||
while True:
|
|
||||||
library = self.client.get(
|
library = self.client.get(
|
||||||
path="library",
|
path="library",
|
||||||
num_results=page_size,
|
num_results=page_size,
|
||||||
page=page,
|
page=page,
|
||||||
response_groups=response_groups,
|
response_groups=response_groups,
|
||||||
)
|
)
|
||||||
|
items = library.get("items", [])
|
||||||
|
return page, list(items)
|
||||||
|
|
||||||
items = list(library.get("items", []))
|
def _fetch_all_pages(
|
||||||
if not items:
|
self, response_groups: str, on_progress: ProgressCallback | None = None
|
||||||
|
) -> list:
|
||||||
|
"""Fetch all pages of library items from the API using maximum parallel fetching."""
|
||||||
|
library_response = None
|
||||||
|
page_size = 200
|
||||||
|
|
||||||
|
for attempt_size in [200, 100, 50]:
|
||||||
|
try:
|
||||||
|
library_response = self.client.get(
|
||||||
|
path="library",
|
||||||
|
num_results=attempt_size,
|
||||||
|
page=1,
|
||||||
|
response_groups=response_groups,
|
||||||
|
)
|
||||||
|
page_size = attempt_size
|
||||||
break
|
break
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
|
||||||
all_items.extend(items)
|
if not library_response:
|
||||||
|
return []
|
||||||
|
|
||||||
|
first_page_items = library_response.get("items", [])
|
||||||
|
if not first_page_items:
|
||||||
|
return []
|
||||||
|
|
||||||
|
all_items: list[dict] = list(first_page_items)
|
||||||
if on_progress:
|
if on_progress:
|
||||||
on_progress(f"Fetched page {page} ({len(items)} items)...")
|
on_progress(f"Fetched page 1 ({len(first_page_items)} items)...")
|
||||||
|
|
||||||
if len(items) < page_size:
|
if len(first_page_items) < page_size:
|
||||||
|
return all_items
|
||||||
|
|
||||||
|
total_items_estimate = library_response.get(
|
||||||
|
"total_results") or library_response.get("total")
|
||||||
|
if total_items_estimate:
|
||||||
|
estimated_pages = (total_items_estimate +
|
||||||
|
page_size - 1) // page_size
|
||||||
|
estimated_pages = min(estimated_pages, 1000)
|
||||||
|
else:
|
||||||
|
estimated_pages = 500
|
||||||
|
|
||||||
|
max_workers = 50
|
||||||
|
page_results: dict[int, list[dict]] = {}
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
|
future_to_page: dict = {}
|
||||||
|
|
||||||
|
for page in range(2, estimated_pages + 1):
|
||||||
|
future = executor.submit(
|
||||||
|
self._fetch_page, page, page_size, response_groups
|
||||||
|
)
|
||||||
|
future_to_page[future] = page
|
||||||
|
|
||||||
|
completed_count = 0
|
||||||
|
total_items = len(first_page_items)
|
||||||
|
|
||||||
|
for future in as_completed(future_to_page):
|
||||||
|
page_num = future_to_page.pop(future)
|
||||||
|
try:
|
||||||
|
fetched_page, items = future.result()
|
||||||
|
if not items or len(items) < page_size:
|
||||||
|
for remaining_future in list(future_to_page.keys()):
|
||||||
|
remaining_future.cancel()
|
||||||
break
|
break
|
||||||
|
|
||||||
page += 1
|
page_results[fetched_page] = items
|
||||||
|
total_items += len(items)
|
||||||
|
completed_count += 1
|
||||||
|
if on_progress and completed_count % 20 == 0:
|
||||||
|
on_progress(
|
||||||
|
f"Fetched {completed_count} pages ({total_items} items)..."
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
for page_num in sorted(page_results.keys()):
|
||||||
|
all_items.extend(page_results[page_num])
|
||||||
|
|
||||||
return all_items
|
return all_items
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user