From 67ca69f900b80c9693a1df67ae0947c70db19d52 Mon Sep 17 00:00:00 2001 From: Kharec Date: Mon, 16 Feb 2026 12:34:05 +0100 Subject: [PATCH] feat: return operation result stats and raise on fetch failures --- skywipe/operations.py | 105 ++++++++++++++++++++++++++++++------------ 1 file changed, 75 insertions(+), 30 deletions(-) diff --git a/skywipe/operations.py b/skywipe/operations.py index 6cc91c0..e28dab1 100644 --- a/skywipe/operations.py +++ b/skywipe/operations.py @@ -1,6 +1,7 @@ """Shared operation utilities and strategies for Skywipe""" import time +from dataclasses import dataclass from typing import Callable, Any from atproto import models @@ -9,6 +10,29 @@ from .configure import Configuration from .logger import get_logger, ProgressTracker +@dataclass(slots=True) +class OperationResult: + processed: int = 0 + failed: int = 0 + skipped: int = 0 + + +class OperationFetchError(RuntimeError): + def __init__( + self, + operation_name: str, + batch_num: int, + error: Exception, + result: OperationResult, + ): + self.operation_name = operation_name + self.batch_num = batch_num + self.result = result + super().__init__( + f"Failed to fetch items for '{operation_name}' at batch {batch_num}: {error}" + ) + + class OperationContext: def __init__(self, client=None, config_data=None): self.logger = get_logger() @@ -19,21 +43,31 @@ class OperationContext: def _initialize_client(self, client): if client is not None: - return client, client.me.did + did = self._extract_did(client) + return client, did try: auth = Auth() client = auth.login() - return client, client.me.did + did = self._extract_did(client) + return client, did except (ValueError, FileNotFoundError) as e: self.logger.error(f"Configuration error: {e}") raise except Exception as e: self.logger.error( - f"Unexpected error during initialization: {e}", exc_info=True) + f"Unexpected error during initialization: {e}", exc_info=True + ) raise ValueError( f"Failed to initialize operation context: {e}") from e + def _extract_did(self, client) -> str: + me = getattr(client, "me", None) + did = getattr(me, "did", None) + if not did: + raise ValueError("Authenticated client does not expose a DID") + return did + def _initialize_config(self, config_data): if config_data is not None: return config_data @@ -46,9 +80,9 @@ class OperationContext: raise except Exception as e: self.logger.error( - f"Unexpected error loading configuration: {e}", exc_info=True) - raise ValueError( - f"Failed to load configuration: {e}") from e + f"Unexpected error loading configuration: {e}", exc_info=True + ) + raise ValueError(f"Failed to load configuration: {e}") from e class BaseStrategy: @@ -74,21 +108,21 @@ class RecordDeletionStrategy(BaseStrategy): repo=context.did, collection=self.collection, limit=context.batch_size, - cursor=cursor + cursor=cursor, ) return context.client.com.atproto.repo.list_records(params=list_params) def extract_items(self, response: Any) -> list[Any]: return response.records - def process_item(self, record: Any, context: OperationContext) -> None: - record_uri = record.uri + def process_item(self, item: Any, context: OperationContext) -> None: + record_uri = item.uri rkey = record_uri.rsplit("/", 1)[-1] - delete_data = { - "repo": context.did, - "collection": self.collection, - "rkey": rkey - } + delete_data = models.ComAtprotoRepoDeleteRecord.Data( + repo=context.did, + collection=self.collection, + rkey=rkey, + ) context.client.com.atproto.repo.delete_record(data=delete_data) context.logger.debug(f"Deleted: {record_uri}") @@ -99,13 +133,15 @@ class FeedStrategy(BaseStrategy): return context.client.get_author_feed( actor=context.did, limit=context.batch_size, cursor=cursor ) - return context.client.get_author_feed(actor=context.did, limit=context.batch_size) + return context.client.get_author_feed( + actor=context.did, limit=context.batch_size + ) def extract_items(self, response: Any) -> list[Any]: return response.feed - def process_item(self, post: Any, context: OperationContext) -> None: - uri = post.post.uri + def process_item(self, item: Any, context: OperationContext) -> None: + uri = item.post.uri context.client.delete_post(uri) context.logger.debug(f"Deleted post: {uri}") @@ -113,16 +149,15 @@ class FeedStrategy(BaseStrategy): class BookmarkStrategy(BaseStrategy): def fetch(self, context: OperationContext, cursor: str | None = None) -> Any: get_params = models.AppBskyBookmarkGetBookmarks.Params( - limit=context.batch_size, - cursor=cursor + limit=context.batch_size, cursor=cursor ) return context.client.app.bsky.bookmark.get_bookmarks(params=get_params) def extract_items(self, response: Any) -> list[Any]: return response.bookmarks - def process_item(self, bookmark: Any, context: OperationContext) -> None: - bookmark_uri = self._extract_bookmark_uri(bookmark) + def process_item(self, item: Any, context: OperationContext) -> None: + bookmark_uri = self._extract_bookmark_uri(item) if not bookmark_uri: raise ValueError("Unable to find bookmark URI") @@ -151,7 +186,7 @@ class Operation: collection: str | None = None, filter_fn: Callable[[Any], bool] | None = None, client=None, - config_data=None + config_data=None, ): self.operation_name = operation_name self.filter_fn = filter_fn @@ -168,7 +203,7 @@ class Operation: else: self.strategy = FeedStrategy() - def run(self) -> int: + def run(self) -> OperationResult: context = OperationContext( client=self._client, config_data=self._config_data) progress = ProgressTracker(operation=self.operation_name) @@ -178,7 +213,7 @@ class Operation: ) cursor = None - total_processed = 0 + result = OperationResult() batch_num = 0 while True: @@ -187,9 +222,16 @@ class Operation: response = self.strategy.fetch(context, cursor) items = self.strategy.extract_items(response) except Exception as e: - context.logger.error( - f"Error fetching items for batch {batch_num}: {e}", exc_info=True) - break + raise OperationFetchError( + self.operation_name, + batch_num, + e, + OperationResult( + processed=result.processed, + failed=result.failed, + skipped=result.skipped, + ), + ) from e if not items: break @@ -198,13 +240,15 @@ class Operation: for item in items: if self.filter_fn and not self.filter_fn(item): + result.skipped += 1 continue try: self.strategy.process_item(item, context) - total_processed += 1 + result.processed += 1 progress.update(1) except Exception as e: + result.failed += 1 context.logger.error(f"Error processing item: {e}") cursor = self.strategy.get_cursor(response) @@ -215,5 +259,6 @@ class Operation: time.sleep(context.delay) context.logger.info( - f"{self.operation_name}: {total_processed} items processed.") - return total_processed + f"{self.operation_name}: processed={result.processed}, failed={result.failed}, skipped={result.skipped}." + ) + return result