Massive refactoring #1
@@ -3,7 +3,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from ..types import LibraryItem, StatusCallback
|
from ..types import LibraryItem, StatusCallback
|
||||||
@@ -76,39 +75,73 @@ class LibraryClientFetchMixin:
|
|||||||
if len(first_page_items) < page_size:
|
if len(first_page_items) < page_size:
|
||||||
return all_items
|
return all_items
|
||||||
|
|
||||||
|
estimated_pages = self._estimate_total_pages(
|
||||||
|
library_response, page_size)
|
||||||
|
page_results = self._fetch_remaining_pages(
|
||||||
|
response_groups=response_groups,
|
||||||
|
page_size=page_size,
|
||||||
|
estimated_pages=estimated_pages,
|
||||||
|
initial_total=len(first_page_items),
|
||||||
|
on_progress=on_progress,
|
||||||
|
)
|
||||||
|
|
||||||
|
for page_num in sorted(page_results.keys()):
|
||||||
|
all_items.extend(page_results[page_num])
|
||||||
|
|
||||||
|
return all_items
|
||||||
|
|
||||||
|
def _estimate_total_pages(self, library_response: dict, page_size: int) -> int:
|
||||||
|
"""Estimate total pages from API metadata with a conservative cap."""
|
||||||
total_items_estimate = library_response.get(
|
total_items_estimate = library_response.get(
|
||||||
"total_results"
|
"total_results"
|
||||||
) or library_response.get("total")
|
) or library_response.get("total")
|
||||||
if total_items_estimate:
|
if not total_items_estimate:
|
||||||
|
return 500
|
||||||
estimated_pages = (total_items_estimate + page_size - 1) // page_size
|
estimated_pages = (total_items_estimate + page_size - 1) // page_size
|
||||||
estimated_pages = min(estimated_pages, 1000)
|
return min(estimated_pages, 1000)
|
||||||
else:
|
|
||||||
estimated_pages = 500
|
|
||||||
|
|
||||||
max_workers = 50
|
def _fetch_remaining_pages(
|
||||||
|
self,
|
||||||
|
response_groups: str,
|
||||||
|
page_size: int,
|
||||||
|
estimated_pages: int,
|
||||||
|
initial_total: int,
|
||||||
|
on_progress: StatusCallback | None = None,
|
||||||
|
) -> dict[int, list[LibraryItem]]:
|
||||||
|
"""Fetch pages 2..N with bounded in-flight requests for faster startup."""
|
||||||
page_results: dict[int, list[LibraryItem]] = {}
|
page_results: dict[int, list[LibraryItem]] = {}
|
||||||
|
max_workers = min(16, max(1, estimated_pages - 1))
|
||||||
|
next_page_to_submit = 2
|
||||||
|
stop_page = estimated_pages + 1
|
||||||
|
completed_count = 0
|
||||||
|
total_items = initial_total
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
future_to_page: dict = {}
|
future_to_page: dict = {}
|
||||||
|
|
||||||
for page in range(2, estimated_pages + 1):
|
while (
|
||||||
|
next_page_to_submit <= estimated_pages
|
||||||
|
and next_page_to_submit < stop_page
|
||||||
|
and len(future_to_page) < max_workers
|
||||||
|
):
|
||||||
future = executor.submit(
|
future = executor.submit(
|
||||||
self._fetch_page, page, page_size, response_groups
|
self._fetch_page,
|
||||||
|
next_page_to_submit,
|
||||||
|
page_size,
|
||||||
|
response_groups,
|
||||||
)
|
)
|
||||||
future_to_page[future] = page
|
future_to_page[future] = next_page_to_submit
|
||||||
|
next_page_to_submit += 1
|
||||||
|
|
||||||
completed_count = 0
|
while future_to_page:
|
||||||
total_items = len(first_page_items)
|
future = next(as_completed(future_to_page))
|
||||||
|
page_num = future_to_page.pop(future)
|
||||||
for future in as_completed(future_to_page):
|
|
||||||
try:
|
try:
|
||||||
fetched_page, items = future.result()
|
fetched_page, items = future.result()
|
||||||
future_to_page.pop(future, None)
|
except Exception:
|
||||||
if not items or len(items) < page_size:
|
continue
|
||||||
for remaining_future in list(future_to_page.keys()):
|
|
||||||
remaining_future.cancel()
|
|
||||||
break
|
|
||||||
|
|
||||||
|
if items:
|
||||||
page_results[fetched_page] = items
|
page_results[fetched_page] = items
|
||||||
total_items += len(items)
|
total_items += len(items)
|
||||||
completed_count += 1
|
completed_count += 1
|
||||||
@@ -116,12 +149,21 @@ class LibraryClientFetchMixin:
|
|||||||
on_progress(
|
on_progress(
|
||||||
f"Fetched {completed_count} pages ({total_items} items)..."
|
f"Fetched {completed_count} pages ({total_items} items)..."
|
||||||
)
|
)
|
||||||
|
if len(items) < page_size:
|
||||||
|
stop_page = min(stop_page, fetched_page)
|
||||||
|
|
||||||
except Exception:
|
while (
|
||||||
future_to_page.pop(future, None)
|
next_page_to_submit <= estimated_pages
|
||||||
pass
|
and next_page_to_submit < stop_page
|
||||||
|
and len(future_to_page) < max_workers
|
||||||
|
):
|
||||||
|
next_future = executor.submit(
|
||||||
|
self._fetch_page,
|
||||||
|
next_page_to_submit,
|
||||||
|
page_size,
|
||||||
|
response_groups,
|
||||||
|
)
|
||||||
|
future_to_page[next_future] = next_page_to_submit
|
||||||
|
next_page_to_submit += 1
|
||||||
|
|
||||||
for page_num in sorted(page_results.keys()):
|
return page_results
|
||||||
all_items.extend(page_results[page_num])
|
|
||||||
|
|
||||||
return all_items
|
|
||||||
|
|||||||
Reference in New Issue
Block a user