Compare commits
8 Commits
6a88ab8560
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 2cdc4c6c42 | |||
| 07862f0ea2 | |||
| e095c68f72 | |||
| c718e8c6f5 | |||
| a07cc02fb0 | |||
| ecc33054af | |||
| e6d68dd37d | |||
| 809b7823ea |
@@ -61,6 +61,7 @@ def main():
|
||||
setup_logger(verbose=False, log_file=LOG_FILE)
|
||||
logger = get_logger()
|
||||
|
||||
try:
|
||||
if registry.requires_config(args.command):
|
||||
require_config(logger)
|
||||
config = Configuration()
|
||||
@@ -68,7 +69,6 @@ def main():
|
||||
verbose = config_data.get("verbose", False)
|
||||
setup_logger(verbose=verbose, log_file=LOG_FILE)
|
||||
|
||||
try:
|
||||
registry.execute(
|
||||
args.command, skip_confirmation=getattr(args, "yes", False))
|
||||
except (ValueError, Exception) as e:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Command implementations for Skywipe"""
|
||||
|
||||
from typing import Callable, Any
|
||||
from typing import Callable, Any, TypedDict
|
||||
from .configure import Configuration
|
||||
from .operations import Operation
|
||||
from .post_analysis import PostAnalyzer
|
||||
@@ -11,7 +11,16 @@ from .safeguard import require_confirmation
|
||||
CommandHandler = Callable[..., None]
|
||||
|
||||
|
||||
COMMAND_METADATA = {
|
||||
class CommandMetadata(TypedDict):
|
||||
confirmation: str
|
||||
help_text: str
|
||||
operation_name: str
|
||||
strategy_type: str
|
||||
collection: str | None
|
||||
filter_fn: Callable[[Any], bool] | None
|
||||
|
||||
|
||||
COMMAND_METADATA: dict[str, CommandMetadata] = {
|
||||
"posts": {
|
||||
"confirmation": "delete all posts",
|
||||
"help_text": "only posts",
|
||||
|
||||
@@ -56,65 +56,100 @@ class Configuration:
|
||||
def exists(self) -> bool:
|
||||
return self.config_file.exists()
|
||||
|
||||
def create(self):
|
||||
logger = setup_logger(verbose=False)
|
||||
if self.exists():
|
||||
def _confirm_overwrite(self, logger) -> bool:
|
||||
if not self.exists():
|
||||
return True
|
||||
overwrite = input(
|
||||
"Configuration already exists. Overwrite? (y/N): ").strip().lower()
|
||||
if overwrite not in ("y", "yes"):
|
||||
if overwrite in ("y", "yes"):
|
||||
return True
|
||||
logger.info("Configuration creation cancelled.")
|
||||
return
|
||||
return False
|
||||
|
||||
def _ensure_config_dir(self) -> None:
|
||||
config_dir = self.config_file.parent
|
||||
config_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
print("Skywipe Configuration")
|
||||
print("=" * 50)
|
||||
print("Note: You should use an app password from Bluesky settings.")
|
||||
|
||||
def _prompt_handle(self, logger) -> str:
|
||||
while True:
|
||||
handle = input("Bluesky handle: ").strip()
|
||||
is_valid, error_msg = _validate_handle(handle)
|
||||
if is_valid:
|
||||
break
|
||||
return handle
|
||||
logger.error(error_msg)
|
||||
logger.info("Please enter a valid handle and try again.")
|
||||
|
||||
def _prompt_password(self, logger) -> str:
|
||||
while True:
|
||||
password = getpass(
|
||||
"Bluesky (hopefully app) password: ").strip()
|
||||
is_valid, error_msg = _validate_password(password)
|
||||
if is_valid:
|
||||
break
|
||||
return password
|
||||
logger.error(error_msg)
|
||||
logger.info("Please check your password and try again.")
|
||||
logger.info(
|
||||
"Generate an app password at: https://bsky.app/settings/app-passwords")
|
||||
|
||||
def _parse_batch_size(self, logger) -> int | None:
|
||||
batch_size = input("Batch size (default: 10): ").strip() or "10"
|
||||
delay = input(
|
||||
"Delay between batches in seconds (default: 1): ").strip() or "1"
|
||||
verbose_input = input(
|
||||
"Verbose mode (y/n, default: y): ").strip().lower() or "y"
|
||||
verbose = verbose_input in ("y", "yes", "true", "1")
|
||||
|
||||
try:
|
||||
batch_size = int(batch_size)
|
||||
if batch_size < 1 or batch_size > 100:
|
||||
logger.error("batch_size must be between 1 and 100")
|
||||
return
|
||||
batch_size_int = int(batch_size)
|
||||
except ValueError:
|
||||
logger.error("batch_size must be an integer")
|
||||
return
|
||||
return None
|
||||
if batch_size_int < 1 or batch_size_int > 100:
|
||||
logger.error("batch_size must be between 1 and 100")
|
||||
return None
|
||||
return batch_size_int
|
||||
|
||||
def _parse_delay(self, logger) -> int | None:
|
||||
delay = input(
|
||||
"Delay between batches in seconds (default: 1): ").strip() or "1"
|
||||
try:
|
||||
delay = int(delay)
|
||||
if delay < 0 or delay > 60:
|
||||
logger.error("delay must be between 0 and 60 seconds")
|
||||
return
|
||||
delay_int = int(delay)
|
||||
except ValueError:
|
||||
logger.error("delay must be an integer")
|
||||
return None
|
||||
if delay_int < 0 or delay_int > 60:
|
||||
logger.error("delay must be between 0 and 60 seconds")
|
||||
return None
|
||||
return delay_int
|
||||
|
||||
def _parse_verbose(self) -> bool:
|
||||
verbose_input = input(
|
||||
"Verbose mode (y/n, default: y): ").strip().lower() or "y"
|
||||
return verbose_input in ("y", "yes", "true", "1")
|
||||
|
||||
def _write_config(self, logger, config_data: dict) -> None:
|
||||
try:
|
||||
with open(self.config_file, "w") as f:
|
||||
yaml.dump(config_data, f, default_flow_style=False)
|
||||
except (IOError, OSError) as e:
|
||||
logger.error(f"Failed to save configuration: {e}")
|
||||
return
|
||||
logger.info(f"Configuration saved to {self.config_file}")
|
||||
|
||||
def create(self):
|
||||
logger = setup_logger(verbose=False)
|
||||
if not self._confirm_overwrite(logger):
|
||||
return
|
||||
|
||||
self._ensure_config_dir()
|
||||
|
||||
print("Skywipe Configuration")
|
||||
print("=" * 50)
|
||||
print("Note: You should use an app password from Bluesky settings.")
|
||||
|
||||
handle = self._prompt_handle(logger)
|
||||
password = self._prompt_password(logger)
|
||||
batch_size = self._parse_batch_size(logger)
|
||||
if batch_size is None:
|
||||
return
|
||||
delay = self._parse_delay(logger)
|
||||
if delay is None:
|
||||
return
|
||||
verbose = self._parse_verbose()
|
||||
|
||||
config_data = {
|
||||
"handle": handle,
|
||||
@@ -124,14 +159,7 @@ class Configuration:
|
||||
"verbose": verbose
|
||||
}
|
||||
|
||||
try:
|
||||
with open(self.config_file, "w") as f:
|
||||
yaml.dump(config_data, f, default_flow_style=False)
|
||||
except (IOError, OSError) as e:
|
||||
logger.error(f"Failed to save configuration: {e}")
|
||||
return
|
||||
|
||||
logger.info(f"Configuration saved to {self.config_file}")
|
||||
self._write_config(logger, config_data)
|
||||
|
||||
def load(self) -> dict:
|
||||
if not self.exists():
|
||||
|
||||
@@ -41,6 +41,7 @@ def setup_logger(verbose: bool = False, log_file: Path | None = None) -> logging
|
||||
logger.setLevel(target_level)
|
||||
|
||||
progress_logger = logging.getLogger("skywipe.progress")
|
||||
if not progress_logger.handlers:
|
||||
progress_logger.propagate = True
|
||||
|
||||
info_handler = None
|
||||
@@ -60,8 +61,11 @@ def setup_logger(verbose: bool = False, log_file: Path | None = None) -> logging
|
||||
|
||||
if info_handler is None:
|
||||
info_handler = logging.StreamHandler(sys.stdout)
|
||||
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
|
||||
logger.addHandler(info_handler)
|
||||
for existing in list(info_handler.filters):
|
||||
if isinstance(existing, LevelFilter):
|
||||
info_handler.removeFilter(existing)
|
||||
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
|
||||
info_handler.setFormatter(formatter)
|
||||
info_handler.setLevel(target_level)
|
||||
|
||||
@@ -93,11 +97,18 @@ def get_logger() -> logging.Logger:
|
||||
return logging.getLogger("skywipe")
|
||||
|
||||
|
||||
def _format_error_message(error: Exception) -> str:
|
||||
if isinstance(error, KeyError):
|
||||
return str(error.args[0]) if error.args else str(error)
|
||||
return str(error)
|
||||
|
||||
|
||||
def handle_error(error: Exception, logger: logging.Logger, exit_on_error: bool = False) -> None:
|
||||
if isinstance(error, ValueError):
|
||||
logger.error(f"{error}")
|
||||
if isinstance(error, (KeyError, ValueError)):
|
||||
logger.error(_format_error_message(error))
|
||||
else:
|
||||
logger.error(f"Unexpected error: {error}", exc_info=True)
|
||||
logger.error(
|
||||
f"Unexpected error: {_format_error_message(error)}", exc_info=True)
|
||||
|
||||
if exit_on_error:
|
||||
sys.exit(1)
|
||||
|
||||
@@ -52,7 +52,16 @@ class OperationContext:
|
||||
|
||||
|
||||
class BaseStrategy:
|
||||
def get_cursor(self, response):
|
||||
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
|
||||
raise NotImplementedError
|
||||
|
||||
def extract_items(self, response: Any) -> list[Any]:
|
||||
raise NotImplementedError
|
||||
|
||||
def process_item(self, item: Any, context: OperationContext) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_cursor(self, response: Any) -> str | None:
|
||||
return response.cursor
|
||||
|
||||
|
||||
@@ -60,7 +69,7 @@ class RecordDeletionStrategy(BaseStrategy):
|
||||
def __init__(self, collection: str):
|
||||
self.collection = collection
|
||||
|
||||
def fetch(self, context: OperationContext, cursor: str | None = None):
|
||||
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
|
||||
list_params = models.ComAtprotoRepoListRecords.Params(
|
||||
repo=context.did,
|
||||
collection=self.collection,
|
||||
@@ -69,10 +78,10 @@ class RecordDeletionStrategy(BaseStrategy):
|
||||
)
|
||||
return context.client.com.atproto.repo.list_records(params=list_params)
|
||||
|
||||
def extract_items(self, response):
|
||||
def extract_items(self, response: Any) -> list[Any]:
|
||||
return response.records
|
||||
|
||||
def process_item(self, record, context: OperationContext):
|
||||
def process_item(self, record: Any, context: OperationContext) -> None:
|
||||
record_uri = record.uri
|
||||
rkey = record_uri.rsplit("/", 1)[-1]
|
||||
delete_data = {
|
||||
@@ -85,34 +94,34 @@ class RecordDeletionStrategy(BaseStrategy):
|
||||
|
||||
|
||||
class FeedStrategy(BaseStrategy):
|
||||
def fetch(self, context: OperationContext, cursor: str | None = None):
|
||||
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
|
||||
if cursor:
|
||||
return context.client.get_author_feed(
|
||||
actor=context.did, limit=context.batch_size, cursor=cursor
|
||||
)
|
||||
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
|
||||
|
||||
def extract_items(self, response):
|
||||
def extract_items(self, response: Any) -> list[Any]:
|
||||
return response.feed
|
||||
|
||||
def process_item(self, post, context: OperationContext):
|
||||
def process_item(self, post: Any, context: OperationContext) -> None:
|
||||
uri = post.post.uri
|
||||
context.client.delete_post(uri)
|
||||
context.logger.debug(f"Deleted post: {uri}")
|
||||
|
||||
|
||||
class BookmarkStrategy(BaseStrategy):
|
||||
def fetch(self, context: OperationContext, cursor: str | None = None):
|
||||
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
|
||||
get_params = models.AppBskyBookmarkGetBookmarks.Params(
|
||||
limit=context.batch_size,
|
||||
cursor=cursor
|
||||
)
|
||||
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
|
||||
|
||||
def extract_items(self, response):
|
||||
def extract_items(self, response: Any) -> list[Any]:
|
||||
return response.bookmarks
|
||||
|
||||
def process_item(self, bookmark, context: OperationContext):
|
||||
def process_item(self, bookmark: Any, context: OperationContext) -> None:
|
||||
bookmark_uri = self._extract_bookmark_uri(bookmark)
|
||||
if not bookmark_uri:
|
||||
raise ValueError("Unable to find bookmark URI")
|
||||
@@ -122,7 +131,7 @@ class BookmarkStrategy(BaseStrategy):
|
||||
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
|
||||
context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
|
||||
|
||||
def _extract_bookmark_uri(self, bookmark):
|
||||
def _extract_bookmark_uri(self, bookmark: Any) -> str | None:
|
||||
if hasattr(bookmark, "uri"):
|
||||
return bookmark.uri
|
||||
|
||||
@@ -148,6 +157,7 @@ class Operation:
|
||||
self.filter_fn = filter_fn
|
||||
self._client = client
|
||||
self._config_data = config_data
|
||||
self.strategy: BaseStrategy
|
||||
|
||||
if strategy_type == "record":
|
||||
if not collection:
|
||||
|
||||
@@ -13,7 +13,7 @@ def user_input(monkeypatch) -> Callable[[Iterable[str], Iterable[str]], None]:
|
||||
password_iter = iter(passwords)
|
||||
|
||||
monkeypatch.setattr("builtins.input", lambda _prompt: next(input_iter))
|
||||
monkeypatch.setattr("getpass.getpass",
|
||||
monkeypatch.setattr("skywipe.configure.getpass",
|
||||
lambda _prompt: next(password_iter))
|
||||
|
||||
return _set
|
||||
|
||||
@@ -50,9 +50,14 @@ def _setup_error_mocks(monkeypatch, calls, error_factory):
|
||||
monkeypatch.setattr(cli, "get_logger",
|
||||
lambda: logging.getLogger(TEST_LOGGER_NAME))
|
||||
|
||||
def _format_error_message(error):
|
||||
if isinstance(error, KeyError):
|
||||
return error.args[0] if error.args else str(error)
|
||||
return str(error)
|
||||
|
||||
def mock_handle_error(error, logger, exit_on_error=False):
|
||||
calls["handle_error"] = (type(error).__name__,
|
||||
str(error), exit_on_error)
|
||||
_format_error_message(error), exit_on_error)
|
||||
|
||||
monkeypatch.setattr(cli, "handle_error", mock_handle_error)
|
||||
|
||||
@@ -186,9 +191,14 @@ def test_main_handles_config_load_error(monkeypatch):
|
||||
def raise_config_error(self):
|
||||
raise RuntimeError("config error")
|
||||
|
||||
def _format_error_message(error):
|
||||
if isinstance(error, KeyError):
|
||||
return error.args[0] if error.args else str(error)
|
||||
return str(error)
|
||||
|
||||
def mock_handle_error(error, logger, exit_on_error=False):
|
||||
calls["handle_error"] = (type(error).__name__,
|
||||
str(error), exit_on_error)
|
||||
_format_error_message(error), exit_on_error)
|
||||
|
||||
_setup_parser_mocks(monkeypatch)
|
||||
monkeypatch.setattr(cli.registry, "requires_config", lambda name: True)
|
||||
|
||||
Reference in New Issue
Block a user