Compare commits

..

8 Commits

7 changed files with 133 additions and 65 deletions

View File

@@ -61,14 +61,14 @@ def main():
setup_logger(verbose=False, log_file=LOG_FILE) setup_logger(verbose=False, log_file=LOG_FILE)
logger = get_logger() logger = get_logger()
if registry.requires_config(args.command):
require_config(logger)
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
setup_logger(verbose=verbose, log_file=LOG_FILE)
try: try:
if registry.requires_config(args.command):
require_config(logger)
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
setup_logger(verbose=verbose, log_file=LOG_FILE)
registry.execute( registry.execute(
args.command, skip_confirmation=getattr(args, "yes", False)) args.command, skip_confirmation=getattr(args, "yes", False))
except (ValueError, Exception) as e: except (ValueError, Exception) as e:

View File

@@ -1,6 +1,6 @@
"""Command implementations for Skywipe""" """Command implementations for Skywipe"""
from typing import Callable, Any from typing import Callable, Any, TypedDict
from .configure import Configuration from .configure import Configuration
from .operations import Operation from .operations import Operation
from .post_analysis import PostAnalyzer from .post_analysis import PostAnalyzer
@@ -11,7 +11,16 @@ from .safeguard import require_confirmation
CommandHandler = Callable[..., None] CommandHandler = Callable[..., None]
COMMAND_METADATA = { class CommandMetadata(TypedDict):
confirmation: str
help_text: str
operation_name: str
strategy_type: str
collection: str | None
filter_fn: Callable[[Any], bool] | None
COMMAND_METADATA: dict[str, CommandMetadata] = {
"posts": { "posts": {
"confirmation": "delete all posts", "confirmation": "delete all posts",
"help_text": "only posts", "help_text": "only posts",

View File

@@ -56,65 +56,100 @@ class Configuration:
def exists(self) -> bool: def exists(self) -> bool:
return self.config_file.exists() return self.config_file.exists()
def create(self): def _confirm_overwrite(self, logger) -> bool:
logger = setup_logger(verbose=False) if not self.exists():
if self.exists(): return True
overwrite = input( overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower() "Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"): if overwrite in ("y", "yes"):
logger.info("Configuration creation cancelled.") return True
return logger.info("Configuration creation cancelled.")
return False
def _ensure_config_dir(self) -> None:
config_dir = self.config_file.parent config_dir = self.config_file.parent
config_dir.mkdir(parents=True, exist_ok=True) config_dir.mkdir(parents=True, exist_ok=True)
print("Skywipe Configuration") def _prompt_handle(self, logger) -> str:
print("=" * 50)
print("Note: You should use an app password from Bluesky settings.")
while True: while True:
handle = input("Bluesky handle: ").strip() handle = input("Bluesky handle: ").strip()
is_valid, error_msg = _validate_handle(handle) is_valid, error_msg = _validate_handle(handle)
if is_valid: if is_valid:
break return handle
logger.error(error_msg) logger.error(error_msg)
logger.info("Please enter a valid handle and try again.") logger.info("Please enter a valid handle and try again.")
def _prompt_password(self, logger) -> str:
while True: while True:
password = getpass( password = getpass(
"Bluesky (hopefully app) password: ").strip() "Bluesky (hopefully app) password: ").strip()
is_valid, error_msg = _validate_password(password) is_valid, error_msg = _validate_password(password)
if is_valid: if is_valid:
break return password
logger.error(error_msg) logger.error(error_msg)
logger.info("Please check your password and try again.") logger.info("Please check your password and try again.")
logger.info( logger.info(
"Generate an app password at: https://bsky.app/settings/app-passwords") "Generate an app password at: https://bsky.app/settings/app-passwords")
def _parse_batch_size(self, logger) -> int | None:
batch_size = input("Batch size (default: 10): ").strip() or "10" batch_size = input("Batch size (default: 10): ").strip() or "10"
delay = input(
"Delay between batches in seconds (default: 1): ").strip() or "1"
verbose_input = input(
"Verbose mode (y/n, default: y): ").strip().lower() or "y"
verbose = verbose_input in ("y", "yes", "true", "1")
try: try:
batch_size = int(batch_size) batch_size_int = int(batch_size)
if batch_size < 1 or batch_size > 100:
logger.error("batch_size must be between 1 and 100")
return
except ValueError: except ValueError:
logger.error("batch_size must be an integer") logger.error("batch_size must be an integer")
return return None
if batch_size_int < 1 or batch_size_int > 100:
logger.error("batch_size must be between 1 and 100")
return None
return batch_size_int
def _parse_delay(self, logger) -> int | None:
delay = input(
"Delay between batches in seconds (default: 1): ").strip() or "1"
try: try:
delay = int(delay) delay_int = int(delay)
if delay < 0 or delay > 60:
logger.error("delay must be between 0 and 60 seconds")
return
except ValueError: except ValueError:
logger.error("delay must be an integer") logger.error("delay must be an integer")
return None
if delay_int < 0 or delay_int > 60:
logger.error("delay must be between 0 and 60 seconds")
return None
return delay_int
def _parse_verbose(self) -> bool:
verbose_input = input(
"Verbose mode (y/n, default: y): ").strip().lower() or "y"
return verbose_input in ("y", "yes", "true", "1")
def _write_config(self, logger, config_data: dict) -> None:
try:
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
except (IOError, OSError) as e:
logger.error(f"Failed to save configuration: {e}")
return return
logger.info(f"Configuration saved to {self.config_file}")
def create(self):
logger = setup_logger(verbose=False)
if not self._confirm_overwrite(logger):
return
self._ensure_config_dir()
print("Skywipe Configuration")
print("=" * 50)
print("Note: You should use an app password from Bluesky settings.")
handle = self._prompt_handle(logger)
password = self._prompt_password(logger)
batch_size = self._parse_batch_size(logger)
if batch_size is None:
return
delay = self._parse_delay(logger)
if delay is None:
return
verbose = self._parse_verbose()
config_data = { config_data = {
"handle": handle, "handle": handle,
@@ -124,14 +159,7 @@ class Configuration:
"verbose": verbose "verbose": verbose
} }
try: self._write_config(logger, config_data)
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
except (IOError, OSError) as e:
logger.error(f"Failed to save configuration: {e}")
return
logger.info(f"Configuration saved to {self.config_file}")
def load(self) -> dict: def load(self) -> dict:
if not self.exists(): if not self.exists():

View File

@@ -41,7 +41,8 @@ def setup_logger(verbose: bool = False, log_file: Path | None = None) -> logging
logger.setLevel(target_level) logger.setLevel(target_level)
progress_logger = logging.getLogger("skywipe.progress") progress_logger = logging.getLogger("skywipe.progress")
progress_logger.propagate = True if not progress_logger.handlers:
progress_logger.propagate = True
info_handler = None info_handler = None
error_handler = None error_handler = None
@@ -60,8 +61,11 @@ def setup_logger(verbose: bool = False, log_file: Path | None = None) -> logging
if info_handler is None: if info_handler is None:
info_handler = logging.StreamHandler(sys.stdout) info_handler = logging.StreamHandler(sys.stdout)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
logger.addHandler(info_handler) logger.addHandler(info_handler)
for existing in list(info_handler.filters):
if isinstance(existing, LevelFilter):
info_handler.removeFilter(existing)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter) info_handler.setFormatter(formatter)
info_handler.setLevel(target_level) info_handler.setLevel(target_level)
@@ -93,11 +97,18 @@ def get_logger() -> logging.Logger:
return logging.getLogger("skywipe") return logging.getLogger("skywipe")
def _format_error_message(error: Exception) -> str:
if isinstance(error, KeyError):
return str(error.args[0]) if error.args else str(error)
return str(error)
def handle_error(error: Exception, logger: logging.Logger, exit_on_error: bool = False) -> None: def handle_error(error: Exception, logger: logging.Logger, exit_on_error: bool = False) -> None:
if isinstance(error, ValueError): if isinstance(error, (KeyError, ValueError)):
logger.error(f"{error}") logger.error(_format_error_message(error))
else: else:
logger.error(f"Unexpected error: {error}", exc_info=True) logger.error(
f"Unexpected error: {_format_error_message(error)}", exc_info=True)
if exit_on_error: if exit_on_error:
sys.exit(1) sys.exit(1)

View File

@@ -52,7 +52,16 @@ class OperationContext:
class BaseStrategy: class BaseStrategy:
def get_cursor(self, response): def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
raise NotImplementedError
def extract_items(self, response: Any) -> list[Any]:
raise NotImplementedError
def process_item(self, item: Any, context: OperationContext) -> None:
raise NotImplementedError
def get_cursor(self, response: Any) -> str | None:
return response.cursor return response.cursor
@@ -60,7 +69,7 @@ class RecordDeletionStrategy(BaseStrategy):
def __init__(self, collection: str): def __init__(self, collection: str):
self.collection = collection self.collection = collection
def fetch(self, context: OperationContext, cursor: str | None = None): def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
list_params = models.ComAtprotoRepoListRecords.Params( list_params = models.ComAtprotoRepoListRecords.Params(
repo=context.did, repo=context.did,
collection=self.collection, collection=self.collection,
@@ -69,10 +78,10 @@ class RecordDeletionStrategy(BaseStrategy):
) )
return context.client.com.atproto.repo.list_records(params=list_params) return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response): def extract_items(self, response: Any) -> list[Any]:
return response.records return response.records
def process_item(self, record, context: OperationContext): def process_item(self, record: Any, context: OperationContext) -> None:
record_uri = record.uri record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1] rkey = record_uri.rsplit("/", 1)[-1]
delete_data = { delete_data = {
@@ -85,34 +94,34 @@ class RecordDeletionStrategy(BaseStrategy):
class FeedStrategy(BaseStrategy): class FeedStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: str | None = None): def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
if cursor: if cursor:
return context.client.get_author_feed( return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor actor=context.did, limit=context.batch_size, cursor=cursor
) )
return context.client.get_author_feed(actor=context.did, limit=context.batch_size) return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
def extract_items(self, response): def extract_items(self, response: Any) -> list[Any]:
return response.feed return response.feed
def process_item(self, post, context: OperationContext): def process_item(self, post: Any, context: OperationContext) -> None:
uri = post.post.uri uri = post.post.uri
context.client.delete_post(uri) context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}") context.logger.debug(f"Deleted post: {uri}")
class BookmarkStrategy(BaseStrategy): class BookmarkStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: str | None = None): def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
get_params = models.AppBskyBookmarkGetBookmarks.Params( get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size, limit=context.batch_size,
cursor=cursor cursor=cursor
) )
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params) return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response): def extract_items(self, response: Any) -> list[Any]:
return response.bookmarks return response.bookmarks
def process_item(self, bookmark, context: OperationContext): def process_item(self, bookmark: Any, context: OperationContext) -> None:
bookmark_uri = self._extract_bookmark_uri(bookmark) bookmark_uri = self._extract_bookmark_uri(bookmark)
if not bookmark_uri: if not bookmark_uri:
raise ValueError("Unable to find bookmark URI") raise ValueError("Unable to find bookmark URI")
@@ -122,7 +131,7 @@ class BookmarkStrategy(BaseStrategy):
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data) context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
context.logger.debug(f"Deleted bookmark: {bookmark_uri}") context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
def _extract_bookmark_uri(self, bookmark): def _extract_bookmark_uri(self, bookmark: Any) -> str | None:
if hasattr(bookmark, "uri"): if hasattr(bookmark, "uri"):
return bookmark.uri return bookmark.uri
@@ -148,6 +157,7 @@ class Operation:
self.filter_fn = filter_fn self.filter_fn = filter_fn
self._client = client self._client = client
self._config_data = config_data self._config_data = config_data
self.strategy: BaseStrategy
if strategy_type == "record": if strategy_type == "record":
if not collection: if not collection:

View File

@@ -13,7 +13,7 @@ def user_input(monkeypatch) -> Callable[[Iterable[str], Iterable[str]], None]:
password_iter = iter(passwords) password_iter = iter(passwords)
monkeypatch.setattr("builtins.input", lambda _prompt: next(input_iter)) monkeypatch.setattr("builtins.input", lambda _prompt: next(input_iter))
monkeypatch.setattr("getpass.getpass", monkeypatch.setattr("skywipe.configure.getpass",
lambda _prompt: next(password_iter)) lambda _prompt: next(password_iter))
return _set return _set

View File

@@ -50,9 +50,14 @@ def _setup_error_mocks(monkeypatch, calls, error_factory):
monkeypatch.setattr(cli, "get_logger", monkeypatch.setattr(cli, "get_logger",
lambda: logging.getLogger(TEST_LOGGER_NAME)) lambda: logging.getLogger(TEST_LOGGER_NAME))
def _format_error_message(error):
if isinstance(error, KeyError):
return error.args[0] if error.args else str(error)
return str(error)
def mock_handle_error(error, logger, exit_on_error=False): def mock_handle_error(error, logger, exit_on_error=False):
calls["handle_error"] = (type(error).__name__, calls["handle_error"] = (type(error).__name__,
str(error), exit_on_error) _format_error_message(error), exit_on_error)
monkeypatch.setattr(cli, "handle_error", mock_handle_error) monkeypatch.setattr(cli, "handle_error", mock_handle_error)
@@ -186,9 +191,14 @@ def test_main_handles_config_load_error(monkeypatch):
def raise_config_error(self): def raise_config_error(self):
raise RuntimeError("config error") raise RuntimeError("config error")
def _format_error_message(error):
if isinstance(error, KeyError):
return error.args[0] if error.args else str(error)
return str(error)
def mock_handle_error(error, logger, exit_on_error=False): def mock_handle_error(error, logger, exit_on_error=False):
calls["handle_error"] = (type(error).__name__, calls["handle_error"] = (type(error).__name__,
str(error), exit_on_error) _format_error_message(error), exit_on_error)
_setup_parser_mocks(monkeypatch) _setup_parser_mocks(monkeypatch)
monkeypatch.setattr(cli.registry, "requires_config", lambda name: True) monkeypatch.setattr(cli.registry, "requires_config", lambda name: True)