feat: return operation result stats and raise on fetch failures

This commit is contained in:
2026-02-16 12:34:05 +01:00
parent 2cdc4c6c42
commit 67ca69f900

View File

@@ -1,6 +1,7 @@
"""Shared operation utilities and strategies for Skywipe""" """Shared operation utilities and strategies for Skywipe"""
import time import time
from dataclasses import dataclass
from typing import Callable, Any from typing import Callable, Any
from atproto import models from atproto import models
@@ -9,6 +10,29 @@ from .configure import Configuration
from .logger import get_logger, ProgressTracker from .logger import get_logger, ProgressTracker
@dataclass(slots=True)
class OperationResult:
processed: int = 0
failed: int = 0
skipped: int = 0
class OperationFetchError(RuntimeError):
def __init__(
self,
operation_name: str,
batch_num: int,
error: Exception,
result: OperationResult,
):
self.operation_name = operation_name
self.batch_num = batch_num
self.result = result
super().__init__(
f"Failed to fetch items for '{operation_name}' at batch {batch_num}: {error}"
)
class OperationContext: class OperationContext:
def __init__(self, client=None, config_data=None): def __init__(self, client=None, config_data=None):
self.logger = get_logger() self.logger = get_logger()
@@ -19,21 +43,31 @@ class OperationContext:
def _initialize_client(self, client): def _initialize_client(self, client):
if client is not None: if client is not None:
return client, client.me.did did = self._extract_did(client)
return client, did
try: try:
auth = Auth() auth = Auth()
client = auth.login() client = auth.login()
return client, client.me.did did = self._extract_did(client)
return client, did
except (ValueError, FileNotFoundError) as e: except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}") self.logger.error(f"Configuration error: {e}")
raise raise
except Exception as e: except Exception as e:
self.logger.error( self.logger.error(
f"Unexpected error during initialization: {e}", exc_info=True) f"Unexpected error during initialization: {e}", exc_info=True
)
raise ValueError( raise ValueError(
f"Failed to initialize operation context: {e}") from e f"Failed to initialize operation context: {e}") from e
def _extract_did(self, client) -> str:
me = getattr(client, "me", None)
did = getattr(me, "did", None)
if not did:
raise ValueError("Authenticated client does not expose a DID")
return did
def _initialize_config(self, config_data): def _initialize_config(self, config_data):
if config_data is not None: if config_data is not None:
return config_data return config_data
@@ -46,9 +80,9 @@ class OperationContext:
raise raise
except Exception as e: except Exception as e:
self.logger.error( self.logger.error(
f"Unexpected error loading configuration: {e}", exc_info=True) f"Unexpected error loading configuration: {e}", exc_info=True
raise ValueError( )
f"Failed to load configuration: {e}") from e raise ValueError(f"Failed to load configuration: {e}") from e
class BaseStrategy: class BaseStrategy:
@@ -74,21 +108,21 @@ class RecordDeletionStrategy(BaseStrategy):
repo=context.did, repo=context.did,
collection=self.collection, collection=self.collection,
limit=context.batch_size, limit=context.batch_size,
cursor=cursor cursor=cursor,
) )
return context.client.com.atproto.repo.list_records(params=list_params) return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response: Any) -> list[Any]: def extract_items(self, response: Any) -> list[Any]:
return response.records return response.records
def process_item(self, record: Any, context: OperationContext) -> None: def process_item(self, item: Any, context: OperationContext) -> None:
record_uri = record.uri record_uri = item.uri
rkey = record_uri.rsplit("/", 1)[-1] rkey = record_uri.rsplit("/", 1)[-1]
delete_data = { delete_data = models.ComAtprotoRepoDeleteRecord.Data(
"repo": context.did, repo=context.did,
"collection": self.collection, collection=self.collection,
"rkey": rkey rkey=rkey,
} )
context.client.com.atproto.repo.delete_record(data=delete_data) context.client.com.atproto.repo.delete_record(data=delete_data)
context.logger.debug(f"Deleted: {record_uri}") context.logger.debug(f"Deleted: {record_uri}")
@@ -99,13 +133,15 @@ class FeedStrategy(BaseStrategy):
return context.client.get_author_feed( return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor actor=context.did, limit=context.batch_size, cursor=cursor
) )
return context.client.get_author_feed(actor=context.did, limit=context.batch_size) return context.client.get_author_feed(
actor=context.did, limit=context.batch_size
)
def extract_items(self, response: Any) -> list[Any]: def extract_items(self, response: Any) -> list[Any]:
return response.feed return response.feed
def process_item(self, post: Any, context: OperationContext) -> None: def process_item(self, item: Any, context: OperationContext) -> None:
uri = post.post.uri uri = item.post.uri
context.client.delete_post(uri) context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}") context.logger.debug(f"Deleted post: {uri}")
@@ -113,16 +149,15 @@ class FeedStrategy(BaseStrategy):
class BookmarkStrategy(BaseStrategy): class BookmarkStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any: def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
get_params = models.AppBskyBookmarkGetBookmarks.Params( get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size, limit=context.batch_size, cursor=cursor
cursor=cursor
) )
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params) return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response: Any) -> list[Any]: def extract_items(self, response: Any) -> list[Any]:
return response.bookmarks return response.bookmarks
def process_item(self, bookmark: Any, context: OperationContext) -> None: def process_item(self, item: Any, context: OperationContext) -> None:
bookmark_uri = self._extract_bookmark_uri(bookmark) bookmark_uri = self._extract_bookmark_uri(item)
if not bookmark_uri: if not bookmark_uri:
raise ValueError("Unable to find bookmark URI") raise ValueError("Unable to find bookmark URI")
@@ -151,7 +186,7 @@ class Operation:
collection: str | None = None, collection: str | None = None,
filter_fn: Callable[[Any], bool] | None = None, filter_fn: Callable[[Any], bool] | None = None,
client=None, client=None,
config_data=None config_data=None,
): ):
self.operation_name = operation_name self.operation_name = operation_name
self.filter_fn = filter_fn self.filter_fn = filter_fn
@@ -168,7 +203,7 @@ class Operation:
else: else:
self.strategy = FeedStrategy() self.strategy = FeedStrategy()
def run(self) -> int: def run(self) -> OperationResult:
context = OperationContext( context = OperationContext(
client=self._client, config_data=self._config_data) client=self._client, config_data=self._config_data)
progress = ProgressTracker(operation=self.operation_name) progress = ProgressTracker(operation=self.operation_name)
@@ -178,7 +213,7 @@ class Operation:
) )
cursor = None cursor = None
total_processed = 0 result = OperationResult()
batch_num = 0 batch_num = 0
while True: while True:
@@ -187,9 +222,16 @@ class Operation:
response = self.strategy.fetch(context, cursor) response = self.strategy.fetch(context, cursor)
items = self.strategy.extract_items(response) items = self.strategy.extract_items(response)
except Exception as e: except Exception as e:
context.logger.error( raise OperationFetchError(
f"Error fetching items for batch {batch_num}: {e}", exc_info=True) self.operation_name,
break batch_num,
e,
OperationResult(
processed=result.processed,
failed=result.failed,
skipped=result.skipped,
),
) from e
if not items: if not items:
break break
@@ -198,13 +240,15 @@ class Operation:
for item in items: for item in items:
if self.filter_fn and not self.filter_fn(item): if self.filter_fn and not self.filter_fn(item):
result.skipped += 1
continue continue
try: try:
self.strategy.process_item(item, context) self.strategy.process_item(item, context)
total_processed += 1 result.processed += 1
progress.update(1) progress.update(1)
except Exception as e: except Exception as e:
result.failed += 1
context.logger.error(f"Error processing item: {e}") context.logger.error(f"Error processing item: {e}")
cursor = self.strategy.get_cursor(response) cursor = self.strategy.get_cursor(response)
@@ -215,5 +259,6 @@ class Operation:
time.sleep(context.delay) time.sleep(context.delay)
context.logger.info( context.logger.info(
f"{self.operation_name}: {total_processed} items processed.") f"{self.operation_name}: processed={result.processed}, failed={result.failed}, skipped={result.skipped}."
return total_processed )
return result