Compare commits

..

5 Commits

5 changed files with 196 additions and 54 deletions

View File

@@ -2,7 +2,7 @@
from typing import Callable, Any, TypedDict
from .configure import Configuration
from .operations import Operation
from .operations import Operation, OperationFetchError
from .post_analysis import PostAnalyzer
from .logger import get_logger, handle_error
from .safeguard import require_confirmation
@@ -35,7 +35,7 @@ COMMAND_METADATA: dict[str, CommandMetadata] = {
"operation_name": "Deleting posts with media",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_media(post.post),
"filter_fn": lambda post: bool(PostAnalyzer.has_media(post.post)),
},
"likes": {
"confirmation": "undo all likes",
@@ -59,7 +59,7 @@ COMMAND_METADATA: dict[str, CommandMetadata] = {
"operation_name": "Deleting quote posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_quote(post.post),
"filter_fn": lambda post: bool(PostAnalyzer.has_quote(post.post)),
},
"follows": {
"confirmation": "unfollow all accounts",
@@ -101,7 +101,7 @@ class CommandRegistry:
name: str,
handler: CommandHandler,
help_text: str,
requires_config: bool = True
requires_config: bool = True,
):
self._commands[name] = handler
self._help_texts[name] = help_text
@@ -138,7 +138,7 @@ def _create_operation_handler(
operation_name: str,
strategy_type: str = "feed",
collection: str | None = None,
filter_fn: Callable[[Any], bool] | None = None
filter_fn: Callable[[Any], bool] | None = None,
) -> CommandHandler:
logger = get_logger()
@@ -149,10 +149,11 @@ def _create_operation_handler(
operation_name,
strategy_type=strategy_type,
collection=collection,
filter_fn=filter_fn
filter_fn=filter_fn,
).run()
except (ValueError, Exception) as e:
except Exception as e:
handle_error(e, logger)
return handler
@@ -169,7 +170,7 @@ def _create_command_handlers():
metadata["operation_name"],
strategy_type=metadata["strategy_type"],
collection=metadata["collection"],
filter_fn=metadata["filter_fn"]
filter_fn=metadata["filter_fn"],
)
return handlers
@@ -179,16 +180,19 @@ _command_handlers = _create_command_handlers()
def run_all(skip_confirmation: bool = False):
logger = get_logger()
fetch_failures: list[str] = []
all_commands = registry.get_all_commands()
available_commands = [cmd for cmd in all_commands.keys()
if cmd not in ("configure", "all")]
available_commands = [
cmd for cmd in all_commands.keys() if cmd not in ("configure", "all")
]
commands = [cmd for cmd in COMMAND_EXECUTION_ORDER
if cmd in available_commands]
commands = [
cmd for cmd in COMMAND_EXECUTION_ORDER if cmd in available_commands]
commands.extend([cmd for cmd in available_commands
if cmd not in COMMAND_EXECUTION_ORDER])
commands.extend(
[cmd for cmd in available_commands if cmd not in COMMAND_EXECUTION_ORDER]
)
commands_str = ", ".join(commands)
all_confirmation = f"run all cleanup commands ({commands_str})"
@@ -197,6 +201,7 @@ def run_all(skip_confirmation: bool = False):
logger.info("Running all cleanup commands...")
from .operations import OperationContext
try:
context = OperationContext()
shared_client = context.client
@@ -217,19 +222,32 @@ def run_all(skip_confirmation: bool = False):
collection=metadata["collection"],
filter_fn=metadata["filter_fn"],
client=shared_client,
config_data=shared_config_data
config_data=shared_config_data,
).run()
else:
registry.execute(cmd, skip_confirmation=True)
logger.info(f"Completed command: {cmd}")
except OperationFetchError as e:
fetch_failures.append(cmd)
logger.error(
f"Fetch error while running '{cmd}': {e}", exc_info=True)
continue
except Exception as e:
logger.error(f"Error running '{cmd}': {e}", exc_info=True)
continue
if fetch_failures:
failed_commands = ", ".join(fetch_failures)
raise RuntimeError(
f"One or more commands failed while fetching items: {failed_commands}"
)
logger.info("All commands completed.")
registry.register("configure", run_configure,
"create configuration", requires_config=False)
registry.register(
"configure", run_configure, "create configuration", requires_config=False
)
for cmd, metadata in COMMAND_METADATA.items():
registry.register(cmd, _command_handlers[cmd], metadata["help_text"])
registry.register("all", run_all, "target everything")

View File

@@ -1,6 +1,7 @@
"""Shared operation utilities and strategies for Skywipe"""
import time
from dataclasses import dataclass
from typing import Callable, Any
from atproto import models
@@ -9,6 +10,29 @@ from .configure import Configuration
from .logger import get_logger, ProgressTracker
@dataclass(slots=True)
class OperationResult:
processed: int = 0
failed: int = 0
skipped: int = 0
class OperationFetchError(RuntimeError):
def __init__(
self,
operation_name: str,
batch_num: int,
error: Exception,
result: OperationResult,
):
self.operation_name = operation_name
self.batch_num = batch_num
self.result = result
super().__init__(
f"Failed to fetch items for '{operation_name}' at batch {batch_num}: {error}"
)
class OperationContext:
def __init__(self, client=None, config_data=None):
self.logger = get_logger()
@@ -19,21 +43,31 @@ class OperationContext:
def _initialize_client(self, client):
if client is not None:
return client, client.me.did
did = self._extract_did(client)
return client, did
try:
auth = Auth()
client = auth.login()
return client, client.me.did
did = self._extract_did(client)
return client, did
except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error during initialization: {e}", exc_info=True)
f"Unexpected error during initialization: {e}", exc_info=True
)
raise ValueError(
f"Failed to initialize operation context: {e}") from e
def _extract_did(self, client) -> str:
me = getattr(client, "me", None)
did = getattr(me, "did", None)
if not did:
raise ValueError("Authenticated client does not expose a DID")
return did
def _initialize_config(self, config_data):
if config_data is not None:
return config_data
@@ -46,9 +80,9 @@ class OperationContext:
raise
except Exception as e:
self.logger.error(
f"Unexpected error loading configuration: {e}", exc_info=True)
raise ValueError(
f"Failed to load configuration: {e}") from e
f"Unexpected error loading configuration: {e}", exc_info=True
)
raise ValueError(f"Failed to load configuration: {e}") from e
class BaseStrategy:
@@ -74,21 +108,21 @@ class RecordDeletionStrategy(BaseStrategy):
repo=context.did,
collection=self.collection,
limit=context.batch_size,
cursor=cursor
cursor=cursor,
)
return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response: Any) -> list[Any]:
return response.records
def process_item(self, record: Any, context: OperationContext) -> None:
record_uri = record.uri
def process_item(self, item: Any, context: OperationContext) -> None:
record_uri = item.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": context.did,
"collection": self.collection,
"rkey": rkey
}
delete_data = models.ComAtprotoRepoDeleteRecord.Data(
repo=context.did,
collection=self.collection,
rkey=rkey,
)
context.client.com.atproto.repo.delete_record(data=delete_data)
context.logger.debug(f"Deleted: {record_uri}")
@@ -99,13 +133,15 @@ class FeedStrategy(BaseStrategy):
return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor
)
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
return context.client.get_author_feed(
actor=context.did, limit=context.batch_size
)
def extract_items(self, response: Any) -> list[Any]:
return response.feed
def process_item(self, post: Any, context: OperationContext) -> None:
uri = post.post.uri
def process_item(self, item: Any, context: OperationContext) -> None:
uri = item.post.uri
context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}")
@@ -113,16 +149,15 @@ class FeedStrategy(BaseStrategy):
class BookmarkStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size,
cursor=cursor
limit=context.batch_size, cursor=cursor
)
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response: Any) -> list[Any]:
return response.bookmarks
def process_item(self, bookmark: Any, context: OperationContext) -> None:
bookmark_uri = self._extract_bookmark_uri(bookmark)
def process_item(self, item: Any, context: OperationContext) -> None:
bookmark_uri = self._extract_bookmark_uri(item)
if not bookmark_uri:
raise ValueError("Unable to find bookmark URI")
@@ -151,7 +186,7 @@ class Operation:
collection: str | None = None,
filter_fn: Callable[[Any], bool] | None = None,
client=None,
config_data=None
config_data=None,
):
self.operation_name = operation_name
self.filter_fn = filter_fn
@@ -168,7 +203,7 @@ class Operation:
else:
self.strategy = FeedStrategy()
def run(self) -> int:
def run(self) -> OperationResult:
context = OperationContext(
client=self._client, config_data=self._config_data)
progress = ProgressTracker(operation=self.operation_name)
@@ -178,7 +213,7 @@ class Operation:
)
cursor = None
total_processed = 0
result = OperationResult()
batch_num = 0
while True:
@@ -187,9 +222,16 @@ class Operation:
response = self.strategy.fetch(context, cursor)
items = self.strategy.extract_items(response)
except Exception as e:
context.logger.error(
f"Error fetching items for batch {batch_num}: {e}", exc_info=True)
break
raise OperationFetchError(
self.operation_name,
batch_num,
e,
OperationResult(
processed=result.processed,
failed=result.failed,
skipped=result.skipped,
),
) from e
if not items:
break
@@ -198,13 +240,15 @@ class Operation:
for item in items:
if self.filter_fn and not self.filter_fn(item):
result.skipped += 1
continue
try:
self.strategy.process_item(item, context)
total_processed += 1
result.processed += 1
progress.update(1)
except Exception as e:
result.failed += 1
context.logger.error(f"Error processing item: {e}")
cursor = self.strategy.get_cursor(response)
@@ -215,5 +259,6 @@ class Operation:
time.sleep(context.delay)
context.logger.info(
f"{self.operation_name}: {total_processed} items processed.")
return total_processed
f"{self.operation_name}: processed={result.processed}, failed={result.failed}, skipped={result.skipped}."
)
return result

View File

@@ -1,4 +1,5 @@
import logging
from types import SimpleNamespace
import pytest
@@ -21,6 +22,7 @@ def test_create_operation_handler_calls_confirmation_and_run(monkeypatch):
def run(self):
calls["run"] += 1
return SimpleNamespace(processed=1, failed=0, skipped=0)
monkeypatch.setattr(commands, "require_confirmation", fake_confirm)
monkeypatch.setattr(commands, "Operation", FakeOperation)
@@ -77,6 +79,7 @@ def test_run_all_runs_in_order(monkeypatch):
def run(self):
ran.append(self.operation_name)
return SimpleNamespace(processed=1, failed=0, skipped=0)
class FakeOperationContext:
def __init__(self):
@@ -115,6 +118,7 @@ def test_run_all_continues_on_error(monkeypatch):
ran.append(self.operation_name)
if self.operation_name == "Deleting posts":
raise RuntimeError("fail")
return SimpleNamespace(processed=1, failed=0, skipped=0)
class FakeOperationContext:
def __init__(self):
@@ -130,3 +134,48 @@ def test_run_all_continues_on_error(monkeypatch):
assert "Deleting posts" in ran
assert len(ran) >= 2
def test_run_all_raises_on_fetch_failure(monkeypatch):
class FakeOperation:
def __init__(self, operation_name, **kwargs):
self.operation_name = operation_name
def run(self):
if self.operation_name == "Deleting posts":
raise operations.OperationFetchError(
"Deleting posts",
1,
RuntimeError("api unavailable"),
operations.OperationResult(),
)
return SimpleNamespace(processed=1, failed=0, skipped=0)
class FakeOperationContext:
def __init__(self):
self.client = object()
self.config_data = {"batch_size": 1, "delay": 0}
monkeypatch.setattr(commands, "require_confirmation",
lambda *args, **kwargs: None)
monkeypatch.setattr(commands, "Operation", FakeOperation)
monkeypatch.setattr(operations, "OperationContext", FakeOperationContext)
monkeypatch.setattr(
commands,
"COMMAND_METADATA",
{
"posts": {
"operation_name": "Deleting posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
}
},
)
monkeypatch.setattr(
commands.registry, "get_all_commands", lambda: {"posts": "only posts"}
)
monkeypatch.setattr(commands, "COMMAND_EXECUTION_ORDER", ["posts"])
with pytest.raises(RuntimeError, match="failed while fetching items"):
commands.run_all(skip_confirmation=True)

View File

@@ -2,7 +2,7 @@ import time
import pytest
from skywipe.operations import Operation, BookmarkStrategy
from skywipe.operations import Operation, BookmarkStrategy, OperationFetchError
class FakeClient:
@@ -60,10 +60,42 @@ def test_operation_run_batches_filters_and_sleeps(monkeypatch):
total = operation.run()
assert total == 2
assert total.processed == 2
assert total.failed == 1
assert total.skipped == 1
assert slept == [1]
def test_operation_run_raises_on_fetch_error(monkeypatch):
class FetchErrorStrategy:
def fetch(self, context, cursor=None):
raise RuntimeError("api down")
def extract_items(self, response):
return []
def process_item(self, item, context):
return None
def get_cursor(self, response):
return None
operation = Operation(
"Testing",
strategy_type="feed",
client=FakeClient(),
config_data={"batch_size": 2, "delay": 0},
)
operation.strategy = FetchErrorStrategy()
with pytest.raises(OperationFetchError, match="Failed to fetch items") as exc:
operation.run()
assert exc.value.result.processed == 0
assert exc.value.result.failed == 0
assert exc.value.result.skipped == 0
def test_bookmark_strategy_extracts_uri_from_shapes():
strategy = BookmarkStrategy()

View File

@@ -55,11 +55,9 @@ def test_record_deletion_strategy_fetch_and_process(monkeypatch):
record = SimpleNamespace(uri="at://did:plc:fake/app.bsky.feed.like/abc123")
strategy.process_item(record, context)
assert captured["delete"] == {
"repo": "did:plc:fake",
"collection": "app.bsky.feed.like",
"rkey": "abc123",
}
assert captured["delete"].repo == "did:plc:fake"
assert captured["delete"].collection == "app.bsky.feed.like"
assert captured["delete"].rkey == "abc123"
def test_feed_strategy_fetch_and_process():