Compare commits

..

85 Commits

Author SHA1 Message Date
2cdc4c6c42 refactor: tidy logger handler setup 2026-01-15 16:06:32 +01:00
07862f0ea2 refactor: simplify configuration create flow 2026-01-15 16:02:21 +01:00
e095c68f72 test: share error message formatting helper 2026-01-08 06:51:56 +01:00
c718e8c6f5 test: patch configure getpass correctly 2026-01-08 06:51:46 +01:00
a07cc02fb0 fix: format KeyError messages cleanly 2026-01-08 06:51:37 +01:00
ecc33054af fix: handle config load errors via handler 2026-01-08 06:51:23 +01:00
e6d68dd37d refactor: type strategy interface 2026-01-05 20:38:20 +01:00
809b7823ea refactor: type command metadata 2026-01-05 20:38:09 +01:00
6a88ab8560 refactor: import getpass directly 2025-12-31 08:22:27 +01:00
dd3b220a4a test: use shared config fixture 2025-12-30 23:09:01 +01:00
82b99da50d test: add config_with_tmp_path fixture 2025-12-30 23:08:51 +01:00
ce2a1ad594 test: expand configure load tests with fixture and error cases 2025-12-30 23:04:45 +01:00
313b6fc453 test: refactor CLI tests and expand coverage 2025-12-30 18:29:34 +01:00
e364598414 test: expect progress propagation 2025-12-30 18:16:51 +01:00
cd1cf1f170 fix: restore progress logger propagation 2025-12-30 18:16:44 +01:00
c3761d1d08 test: cover formatter reset on reuse 2025-12-30 18:13:56 +01:00
85f1ea4efb fix: reset stream formatters in setup_logger 2025-12-30 18:13:51 +01:00
df22b3dd3d test: cover logger propagation behavior 2025-12-30 18:11:48 +01:00
8c0bbceeac fix: disable skywipe log propagation 2025-12-30 18:11:38 +01:00
5e60374937 test: add tests for ProgressTracker.batch with total_batches=0 2025-12-30 17:46:29 +01:00
fd62bb5ea2 fix: use "is not None" check for total_batches in ProgressTracker.batch 2025-12-30 17:46:22 +01:00
6785ecd45a test: strengthen logger handler duplication test 2025-12-30 08:53:00 +01:00
7828989150 refactor: dedupe file handler cleanup in setup_logger 2025-12-30 08:45:39 +01:00
9eb2ed0097 test: verify FileHandler replacement when log_file path changes 2025-12-30 08:31:14 +01:00
5c8932599c fix: replace existing FileHandler when log_file changes 2025-12-30 08:30:59 +01:00
b2af41d5fb style: prefer PEP 604/585 type hints 2025-12-23 05:14:38 +01:00
6de91e2bb9 build: package = true 2025-12-23 04:55:49 +01:00
d026c53c0a chore: update uv.lock 2025-12-23 04:55:42 +01:00
d6ce77ab15 test: cover operation run + bookmark parsing 2025-12-23 04:50:21 +01:00
b6e0c55c3e test: cover operation contect error paths 2025-12-23 04:50:10 +01:00
3b84be90b7 test: cover run_all ordering and errors 2025-12-23 04:49:46 +01:00
b8f6953a17 test: cover config create flows 2025-12-23 04:49:32 +01:00
7be7922b08 test: cover config load and errors 2025-12-23 04:49:13 +01:00
c669bc9de7 test: cover strategy fetch/process behaviors 2025-12-23 04:48:50 +01:00
155cb927ba test: cover handle/password validation 2025-12-23 04:48:29 +01:00
769a1af58c test: cover cli parser and main flow 2025-12-23 04:48:07 +01:00
4e04a9d7b7 test: cover command registry basics 2025-12-23 04:47:48 +01:00
875feb204c test: cover confirmation prompt handling 2025-12-23 04:47:06 +01:00
02364e54c6 test: cover logger helpers 2025-12-23 04:46:49 +01:00
7ca3c8e969 docs: how to for unit test 2025-12-23 04:46:24 +01:00
45b43e7062 build: update uv.lock 2025-12-23 04:46:16 +01:00
e518f96e9d build: add pytest as optional 2025-12-23 04:46:07 +01:00
1c4a256641 test: add shared input/password fixture 2025-12-23 04:45:48 +01:00
66594d9f59 fix: adjust handler detection order 2025-12-23 04:44:01 +01:00
b336991d67 feat: add -v|--version flag 2025-12-20 23:05:42 +01:00
62c058e9ee feat: add version 2025-12-20 23:05:20 +01:00
35c8b5b8d1 refactor: use NamedTuple for validation results 2025-12-20 22:36:51 +01:00
54c3353667 refactor: simplify nested try-except blocks 2025-12-20 22:35:01 +01:00
93a124be2a clean: remove useless declarations 2025-12-20 22:33:54 +01:00
ecbee7a8ac refactor: pass logger instead of multiple get_logger() calls 2025-12-20 22:30:57 +01:00
eaf4e94d24 refactor: pass logger parameter and use error handling helper 2025-12-20 22:30:47 +01:00
97e166d5f7 refactor: make logger parameter optional 2025-12-20 22:30:33 +01:00
61e2d7f731 refactor: add handle_error() helper for centralized error handling 2025-12-20 22:30:21 +01:00
054dc01813 fix: ensure logger always writes to log file 2025-12-20 22:09:25 +01:00
799b1083ab refactor: add hacking at the bottom 2025-12-20 21:56:57 +01:00
15db235fe1 docs: update hacking section with example 2025-12-20 21:48:57 +01:00
d09dcf06cf docs: ready to fly 2025-12-20 21:47:36 +01:00
5b9589794e clean: shorter epilog 2025-12-20 21:46:16 +01:00
93b88917df refactor: consolidate all command metadata into a single structure 2025-12-20 21:26:11 +01:00
25618ab5bf refactor: simplify logger 2025-12-20 21:24:02 +01:00
81fa68ed08 fix: run commands in right order if all 2025-12-20 21:11:45 +01:00
ca6eaed146 feat: improve confirmation message handling 2025-12-20 21:10:00 +01:00
1b8b32027c feat: dependency injection to allow reusing an existing auth client 2025-12-20 21:07:50 +01:00
a6190aeb84 feat: update run_all to reuse auth and config 2025-12-20 21:07:43 +01:00
887169e7d2 feat: add some validation 2025-12-20 21:04:09 +01:00
9565e4008e feat: consistent error handling 2025-12-20 20:59:18 +01:00
c2aab71955 refactor: add basestrategy and make all class inherits to get rid of get_cursor() 2025-12-20 20:56:22 +01:00
97dd55981b refactor: use a factory pattern 2025-12-20 20:54:52 +01:00
0e91c95e9b refactor: extract common embed checking logic 2025-12-20 20:51:45 +01:00
a818df4a6c refactor: replace hardcoded tuple with constants (consistency+readability) 2025-12-20 20:50:06 +01:00
9aec57bd56 fix: logger's already setup 2025-12-20 20:48:28 +01:00
ec9943822c fix: replace hardcoded list with a registry-derived list 2025-12-20 20:47:15 +01:00
0240ff9f8e clean: remove duplicate run_operation, dead code now 2025-12-20 20:44:42 +01:00
45e2e1eb00 clean: remove is_logged() (not used) 2025-12-20 20:44:29 +01:00
ff20228fa6 docs: update roadmap 2025-12-20 17:25:02 +01:00
9d9c09d56a refactor: use our new Operation and PostAnalyzer tools 2025-12-20 17:24:29 +01:00
64355fbeeb clean: remove unused import 2025-12-20 17:24:15 +01:00
4a337e6b20 feat: add post analysis utilities 2025-12-20 17:24:05 +01:00
f27be4d603 refactor: add OperationContext and Operations classes 2025-12-20 17:23:54 +01:00
9d254ac4b7 refactor: delete unitary "actions" py files 2025-12-20 17:23:35 +01:00
590cc03ba4 docs: refactor warning 2025-12-20 16:03:39 +01:00
c493a99860 docs: inform about log 2025-12-20 16:01:07 +01:00
3f9ef6527f docs: update roadmap 2025-12-20 16:00:03 +01:00
f53e5bb527 feat: implement our safeguard in commands 2025-12-20 15:59:54 +01:00
f2854a0df5 feat: implement safeguard and relocate log file 2025-12-20 15:59:41 +01:00
31 changed files with 1974 additions and 682 deletions

View File

@@ -1,39 +1,33 @@
# Skywipe # Skywipe
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK. Skywipe is a Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
## Warning **Warning:** This tool performs _**destructive operations**_.
This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account. Only use it if you intend to permanently erase data from your Bluesky account.
## Requirements ## Requirements
Check [pyproject.toml](pyproject.toml). Python 3.13+.
You can use `uv` to install dependencies: The rest of the dependencies are listed in [pyproject.toml](pyproject.toml).
## Installation
Use [`pipx`](https://pipx.pypa.io/latest/installation/) to install `skywipe`:
```bash ```bash
git clone https://git.kharec.info/Kharec/skywipe.git pipx install git+https://git.kharec.info/Kharec/skywipe.git
cd skywipe
uv sync
``` ```
## How to run Run the tool and see available commands with:
While it's being developed, you can use the tool using `uv` :
```bash ```bash
uv run python -m skywipe.cli all # target everything skywipe -h
uv run python -m skywipe.cli configure # create configuration
uv run python -m skywipe.cli posts # delete posts
uv run python -m skywipe.cli medias # delete posts with medias
uv run python -m skywipe.cli likes # undo likes
uv run python -m skywipe.cli reposts # undo reposts
uv run python -m skywipe.cli quotes # delete quotes
uv run python -m skywipe.cli follows # unfollow all
uv run python -m skywipe.cli bookmarks # delete bookmarks
``` ```
Use the `--yes` flag to skip the confirmation prompt and proceed with the operation. A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
## Configuration ## Configuration
If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` : If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` :
@@ -48,23 +42,30 @@ verbose: true
BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-password) FOR OBVIOUS SECURITY REASONS. BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-password) FOR OBVIOUS SECURITY REASONS.
## Roadmap ## Hacking
- [x] build cli parameter management You can use `uv` to install dependencies:
- [x] handle configuration logic
- [x] sign in to at protocol ```bash
- [x] delete posts in batch git clone https://git.kharec.info/Kharec/skywipe.git
- [x] only delete posts with media cd skywipe
- [x] undo likes uv sync
- [x] undo reposts ```
- [x] delete quotes
- [x] unfollow accounts Run unit tests with:
- [x] remove bookmarks
- [x] make `all` run the other commands ```bash
- [x] add simple progress and logging uv sync --extra dev
- [ ] add safeguards (confirmation, dry-run flag) uv run pytest
- [ ] decent code architecture ```
- [ ] installation and run process
If you want to test your changes, you can run the tool with:
```bash
uv run python -m skywipe.cli -h
uv run python -m skywipe.cli all
# or any other command
```
## License ## License

View File

@@ -6,5 +6,14 @@ readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = ["atproto>=0.0.65", "pyyaml>=6.0"] dependencies = ["atproto>=0.0.65", "pyyaml>=6.0"]
[project.optional-dependencies]
dev = ["pytest>=8.0"]
[project.scripts] [project.scripts]
skywipe = "skywipe.cli:main" skywipe = "skywipe.cli:main"
[tool.uv]
package = true
[tool.pytest.ini_options]
testpaths = ["tests"]

View File

@@ -0,0 +1 @@
__version__ = "0.1.0"

View File

@@ -10,7 +10,11 @@ class Auth:
self.client = None self.client = None
def login(self) -> Client: def login(self) -> Client:
try:
config_data = self.config.load() config_data = self.config.load()
except FileNotFoundError as e:
raise ValueError(
"Configuration file not found. Run 'skywipe configure' first.") from e
handle = config_data.get("handle") handle = config_data.get("handle")
password = config_data.get("password") password = config_data.get("password")
@@ -19,10 +23,11 @@ class Auth:
raise ValueError( raise ValueError(
"handle and password must be set in configuration") "handle and password must be set in configuration")
try:
self.client = Client() self.client = Client()
self.client.login(handle, password) self.client.login(handle, password)
except Exception as e:
raise ValueError(
f"Failed to authenticate: {e}") from e
return self.client return self.client
def is_logged(self) -> bool:
return bool(getattr(self.client, "me", None))

View File

@@ -1,78 +0,0 @@
"""Bookmark deletion module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_bookmarks():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting bookmark deletion with batch_size={batch_size}, delay={delay}s")
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting bookmarks")
while True:
batch_num += 1
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=batch_size,
cursor=cursor
)
response = client.app.bsky.bookmark.get_bookmarks(params=get_params)
bookmarks = response.bookmarks
if not bookmarks:
break
progress.batch(batch_num, len(bookmarks))
for bookmark in bookmarks:
try:
bookmark_uri = None
if hasattr(bookmark, "uri"):
bookmark_uri = bookmark.uri
else:
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
bookmark_uri = nested.uri
break
if not bookmark_uri:
logger.debug("Skipping bookmark: unable to find uri")
continue
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri
)
client.app.bsky.bookmark.delete_bookmark(data=delete_data)
total_deleted += 1
progress.update(1)
logger.debug(f"Deleted bookmark: {bookmark_uri}")
except Exception as e:
bookmark_uri = getattr(bookmark, "uri", "unknown")
logger.error(f"Error deleting bookmark {bookmark_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Deleted {total_deleted} bookmarks.")

View File

@@ -4,9 +4,13 @@ import sys
import argparse import argparse
from pathlib import Path from pathlib import Path
from . import __version__
from .commands import registry from .commands import registry
from .configure import Configuration from .configure import Configuration
from .logger import setup_logger from .logger import setup_logger, get_logger, handle_error
LOG_FILE = Path.home() / ".cache" / "skywipe" / "skywipe.log"
def create_parser(): def create_parser():
@@ -14,7 +18,19 @@ def create_parser():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Clean your bluesky account with style.", description="Clean your bluesky account with style.",
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account." epilog="WARNING: This tool deletes your Bluesky data permanently."
)
parser.add_argument(
"-v", "--version",
action="version",
version=f"Skywipe {__version__}"
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip confirmation prompt and proceed with destructive operations"
) )
subparsers = parser.add_subparsers( subparsers = parser.add_subparsers(
@@ -30,10 +46,9 @@ def create_parser():
return parser return parser
def require_config(): def require_config(logger):
config = Configuration() config = Configuration()
if not config.exists(): if not config.exists():
logger = setup_logger(verbose=False)
logger.error("Configuration file not found.") logger.error("Configuration file not found.")
logger.error("You must run 'skywipe configure' first.") logger.error("You must run 'skywipe configure' first.")
sys.exit(1) sys.exit(1)
@@ -43,26 +58,21 @@ def main():
parser = create_parser() parser = create_parser()
args = parser.parse_args() args = parser.parse_args()
setup_logger(verbose=False, log_file=LOG_FILE)
logger = get_logger()
try:
if registry.requires_config(args.command): if registry.requires_config(args.command):
require_config() require_config(logger)
config = Configuration() config = Configuration()
config_data = config.load() config_data = config.load()
verbose = config_data.get("verbose", False) verbose = config_data.get("verbose", False)
log_file = Path.home() / ".config" / "skywipe" / "skywipe.log" setup_logger(verbose=verbose, log_file=LOG_FILE)
setup_logger(verbose=verbose, log_file=log_file)
else:
setup_logger(verbose=False)
try: registry.execute(
registry.execute(args.command) args.command, skip_confirmation=getattr(args, "yes", False))
except ValueError as e: except (ValueError, Exception) as e:
logger = setup_logger(verbose=False) handle_error(e, logger, exit_on_error=True)
logger.error(f"{e}")
sys.exit(1)
except Exception as e:
logger = setup_logger(verbose=False)
logger.error(f"Unexpected error: {e}", exc_info=True)
sys.exit(1)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,18 +1,93 @@
"""Command implementations for Skywipe""" """Command implementations for Skywipe"""
from typing import Callable, Dict, Optional from typing import Callable, Any, TypedDict
from .configure import Configuration from .configure import Configuration
from .posts import delete_all_posts from .operations import Operation
from .medias import delete_posts_with_medias from .post_analysis import PostAnalyzer
from .likes import undo_likes from .logger import get_logger, handle_error
from .reposts import undo_reposts from .safeguard import require_confirmation
from .quotes import delete_quotes_posts
from .follows import unfollow_all
from .bookmarks import delete_bookmarks
from .logger import get_logger
CommandHandler = Callable[[], None] CommandHandler = Callable[..., None]
class CommandMetadata(TypedDict):
confirmation: str
help_text: str
operation_name: str
strategy_type: str
collection: str | None
filter_fn: Callable[[Any], bool] | None
COMMAND_METADATA: dict[str, CommandMetadata] = {
"posts": {
"confirmation": "delete all posts",
"help_text": "only posts",
"operation_name": "Deleting posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
"medias": {
"confirmation": "delete all posts with media",
"help_text": "only posts with medias",
"operation_name": "Deleting posts with media",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_media(post.post),
},
"likes": {
"confirmation": "undo all likes",
"help_text": "only likes",
"operation_name": "Undoing likes",
"strategy_type": "record",
"collection": "app.bsky.feed.like",
"filter_fn": None,
},
"reposts": {
"confirmation": "undo all reposts",
"help_text": "only reposts",
"operation_name": "Undoing reposts",
"strategy_type": "record",
"collection": "app.bsky.feed.repost",
"filter_fn": None,
},
"quotes": {
"confirmation": "delete all quote posts",
"help_text": "only quotes",
"operation_name": "Deleting quote posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_quote(post.post),
},
"follows": {
"confirmation": "unfollow all accounts",
"help_text": "only follows",
"operation_name": "Unfollowing accounts",
"strategy_type": "record",
"collection": "app.bsky.graph.follow",
"filter_fn": None,
},
"bookmarks": {
"confirmation": "delete all bookmarks",
"help_text": "only bookmarks",
"operation_name": "Deleting bookmarks",
"strategy_type": "bookmark",
"collection": None,
"filter_fn": None,
},
}
COMMAND_EXECUTION_ORDER = [
"quotes",
"medias",
"posts",
"likes",
"reposts",
"follows",
"bookmarks",
]
class CommandRegistry: class CommandRegistry:
@@ -32,22 +107,25 @@ class CommandRegistry:
self._help_texts[name] = help_text self._help_texts[name] = help_text
self._requires_config[name] = requires_config self._requires_config[name] = requires_config
def get_handler(self, name: str) -> Optional[CommandHandler]: def get_handler(self, name: str) -> CommandHandler | None:
return self._commands.get(name) return self._commands.get(name)
def get_help_text(self, name: str) -> Optional[str]: def get_help_text(self, name: str) -> str | None:
return self._help_texts.get(name) return self._help_texts.get(name)
def requires_config(self, name: str) -> bool: def requires_config(self, name: str) -> bool:
return self._requires_config.get(name, True) return self._requires_config.get(name, True)
def get_all_commands(self) -> Dict[str, str]: def get_all_commands(self) -> dict[str, str]:
return self._help_texts.copy() return self._help_texts.copy()
def execute(self, name: str): def execute(self, name: str, skip_confirmation: bool = False):
handler = self.get_handler(name) handler = self.get_handler(name)
if handler: if handler:
if name == "configure":
handler() handler()
else:
handler(skip_confirmation)
else: else:
raise ValueError(f"Unknown command: {name}") raise ValueError(f"Unknown command: {name}")
@@ -55,48 +133,94 @@ class CommandRegistry:
registry = CommandRegistry() registry = CommandRegistry()
def _create_operation_handler(
confirmation_message: str,
operation_name: str,
strategy_type: str = "feed",
collection: str | None = None,
filter_fn: Callable[[Any], bool] | None = None
) -> CommandHandler:
logger = get_logger()
def handler(skip_confirmation: bool = False):
require_confirmation(confirmation_message, skip_confirmation, logger)
try:
Operation(
operation_name,
strategy_type=strategy_type,
collection=collection,
filter_fn=filter_fn
).run()
except (ValueError, Exception) as e:
handle_error(e, logger)
return handler
def run_configure(): def run_configure():
config = Configuration() config = Configuration()
config.create() config.create()
def run_posts(): def _create_command_handlers():
delete_all_posts() handlers = {}
for cmd, metadata in COMMAND_METADATA.items():
handlers[cmd] = _create_operation_handler(
metadata["confirmation"],
metadata["operation_name"],
strategy_type=metadata["strategy_type"],
collection=metadata["collection"],
filter_fn=metadata["filter_fn"]
)
return handlers
def run_medias(): _command_handlers = _create_command_handlers()
delete_posts_with_medias()
def run_likes(): def run_all(skip_confirmation: bool = False):
undo_likes()
def run_reposts():
undo_reposts()
def run_quotes():
delete_quotes_posts()
def run_follows():
unfollow_all()
def run_bookmarks():
delete_bookmarks()
def run_all():
logger = get_logger() logger = get_logger()
commands = ["posts", "likes", "reposts", "follows", "bookmarks"]
all_commands = registry.get_all_commands()
available_commands = [cmd for cmd in all_commands.keys()
if cmd not in ("configure", "all")]
commands = [cmd for cmd in COMMAND_EXECUTION_ORDER
if cmd in available_commands]
commands.extend([cmd for cmd in available_commands
if cmd not in COMMAND_EXECUTION_ORDER])
commands_str = ", ".join(commands)
all_confirmation = f"run all cleanup commands ({commands_str})"
require_confirmation(all_confirmation, skip_confirmation, logger)
logger.info("Running all cleanup commands...") logger.info("Running all cleanup commands...")
from .operations import OperationContext
try:
context = OperationContext()
shared_client = context.client
shared_config_data = context.config_data
except Exception as e:
logger.error(
f"Failed to initialize shared context: {e}", exc_info=True)
raise
for cmd in commands: for cmd in commands:
try: try:
logger.info(f"Starting command: {cmd}") logger.info(f"Starting command: {cmd}")
registry.execute(cmd) metadata = COMMAND_METADATA.get(cmd)
if metadata:
Operation(
metadata["operation_name"],
strategy_type=metadata["strategy_type"],
collection=metadata["collection"],
filter_fn=metadata["filter_fn"],
client=shared_client,
config_data=shared_config_data
).run()
else:
registry.execute(cmd, skip_confirmation=True)
logger.info(f"Completed command: {cmd}") logger.info(f"Completed command: {cmd}")
except Exception as e: except Exception as e:
logger.error(f"Error running '{cmd}': {e}", exc_info=True) logger.error(f"Error running '{cmd}': {e}", exc_info=True)
@@ -106,11 +230,6 @@ def run_all():
registry.register("configure", run_configure, registry.register("configure", run_configure,
"create configuration", requires_config=False) "create configuration", requires_config=False)
registry.register("posts", run_posts, "only posts") for cmd, metadata in COMMAND_METADATA.items():
registry.register("medias", run_medias, "only posts with medias") registry.register(cmd, _command_handlers[cmd], metadata["help_text"])
registry.register("likes", run_likes, "only likes")
registry.register("reposts", run_reposts, "only reposts")
registry.register("quotes", run_quotes, "only quotes")
registry.register("follows", run_follows, "only follows")
registry.register("bookmarks", run_bookmarks, "only bookmarks")
registry.register("all", run_all, "target everything") registry.register("all", run_all, "target everything")

View File

@@ -1,9 +1,52 @@
"""Core configuration module for Skywipe""" """Core configuration module for Skywipe"""
import getpass from getpass import getpass
import re
from pathlib import Path from pathlib import Path
from typing import NamedTuple
import yaml import yaml
from .logger import setup_logger, get_logger from .logger import setup_logger
class ValidationResult(NamedTuple):
is_valid: bool
error_message: str
def _validate_handle(handle: str) -> ValidationResult:
if not handle:
return ValidationResult(False, "Handle cannot be empty")
if len(handle) > 253:
return ValidationResult(False, "Handle is too long (max 253 characters)")
if " " in handle:
return ValidationResult(False, "Handle cannot contain spaces")
handle_pattern = re.compile(
r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$|"
r"^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$|"
r"^did:[a-z]+:[a-zA-Z0-9._-]+$"
)
if not handle_pattern.match(handle):
return ValidationResult(False, (
"Invalid handle format. "
"Use a username (e.g., 'alice'), full handle (e.g., 'alice.bsky.social'), "
"or DID (e.g., 'did:plc:...')"
))
return ValidationResult(True, "")
def _validate_password(password: str) -> ValidationResult:
if not password:
return ValidationResult(False, "Password cannot be empty")
if len(password) < 8:
return ValidationResult(False, "Password is too short (minimum 8 characters)")
return ValidationResult(True, "")
class Configuration: class Configuration:
@@ -13,35 +56,100 @@ class Configuration:
def exists(self) -> bool: def exists(self) -> bool:
return self.config_file.exists() return self.config_file.exists()
def create(self): def _confirm_overwrite(self, logger) -> bool:
logger = setup_logger(verbose=False) if not self.exists():
if self.exists(): return True
overwrite = input( overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower() "Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"): if overwrite in ("y", "yes"):
return True
logger.info("Configuration creation cancelled.") logger.info("Configuration creation cancelled.")
return return False
def _ensure_config_dir(self) -> None:
config_dir = self.config_file.parent config_dir = self.config_file.parent
config_dir.mkdir(parents=True, exist_ok=True) config_dir.mkdir(parents=True, exist_ok=True)
print("Skywipe Configuration") def _prompt_handle(self, logger) -> str:
print("=" * 50) while True:
handle = input("Bluesky handle: ").strip() handle = input("Bluesky handle: ").strip()
password = getpass.getpass("Bluesky app password: ").strip() is_valid, error_msg = _validate_handle(handle)
if is_valid:
return handle
logger.error(error_msg)
logger.info("Please enter a valid handle and try again.")
def _prompt_password(self, logger) -> str:
while True:
password = getpass(
"Bluesky (hopefully app) password: ").strip()
is_valid, error_msg = _validate_password(password)
if is_valid:
return password
logger.error(error_msg)
logger.info("Please check your password and try again.")
logger.info(
"Generate an app password at: https://bsky.app/settings/app-passwords")
def _parse_batch_size(self, logger) -> int | None:
batch_size = input("Batch size (default: 10): ").strip() or "10" batch_size = input("Batch size (default: 10): ").strip() or "10"
try:
batch_size_int = int(batch_size)
except ValueError:
logger.error("batch_size must be an integer")
return None
if batch_size_int < 1 or batch_size_int > 100:
logger.error("batch_size must be between 1 and 100")
return None
return batch_size_int
def _parse_delay(self, logger) -> int | None:
delay = input( delay = input(
"Delay between batches in seconds (default: 1): ").strip() or "1" "Delay between batches in seconds (default: 1): ").strip() or "1"
try:
delay_int = int(delay)
except ValueError:
logger.error("delay must be an integer")
return None
if delay_int < 0 or delay_int > 60:
logger.error("delay must be between 0 and 60 seconds")
return None
return delay_int
def _parse_verbose(self) -> bool:
verbose_input = input( verbose_input = input(
"Verbose mode (y/n, default: y): ").strip().lower() or "y" "Verbose mode (y/n, default: y): ").strip().lower() or "y"
verbose = verbose_input in ("y", "yes", "true", "1") return verbose_input in ("y", "yes", "true", "1")
def _write_config(self, logger, config_data: dict) -> None:
try: try:
batch_size = int(batch_size) with open(self.config_file, "w") as f:
delay = int(delay) yaml.dump(config_data, f, default_flow_style=False)
except ValueError: except (IOError, OSError) as e:
logger.error("batch_size and delay must be integers") logger.error(f"Failed to save configuration: {e}")
return return
logger.info(f"Configuration saved to {self.config_file}")
def create(self):
logger = setup_logger(verbose=False)
if not self._confirm_overwrite(logger):
return
self._ensure_config_dir()
print("Skywipe Configuration")
print("=" * 50)
print("Note: You should use an app password from Bluesky settings.")
handle = self._prompt_handle(logger)
password = self._prompt_password(logger)
batch_size = self._parse_batch_size(logger)
if batch_size is None:
return
delay = self._parse_delay(logger)
if delay is None:
return
verbose = self._parse_verbose()
config_data = { config_data = {
"handle": handle, "handle": handle,
@@ -51,14 +159,21 @@ class Configuration:
"verbose": verbose "verbose": verbose
} }
with open(self.config_file, "w") as f: self._write_config(logger, config_data)
yaml.dump(config_data, f, default_flow_style=False)
logger.info(f"Configuration saved to {self.config_file}")
def load(self) -> dict: def load(self) -> dict:
if not self.exists(): if not self.exists():
raise FileNotFoundError( raise FileNotFoundError(
f"Configuration file not found: {self.config_file}") f"Configuration file not found: {self.config_file}")
try:
with open(self.config_file, "r") as f: with open(self.config_file, "r") as f:
return yaml.safe_load(f) config_data = yaml.safe_load(f)
if config_data is None:
raise ValueError("Configuration file is empty or invalid")
return config_data
except (IOError, OSError) as e:
raise ValueError(
f"Failed to read configuration file: {e}") from e
except yaml.YAMLError as e:
raise ValueError(
f"Invalid YAML in configuration file: {e}") from e

View File

@@ -1,73 +0,0 @@
"""Follow undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
FOLLOW_COLLECTION = "app.bsky.graph.follow"
def unfollow_all():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting unfollow operation with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_unfollowed = 0
batch_num = 0
progress = ProgressTracker(operation="Unfollowing accounts")
while True:
batch_num += 1
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=FOLLOW_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
progress.batch(batch_num, len(records))
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": FOLLOW_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_unfollowed += 1
progress.update(1)
logger.debug(f"Unfollowed: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
logger.error(f"Error unfollowing {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Unfollowed {total_unfollowed} accounts.")

View File

@@ -1,73 +0,0 @@
"""Like undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
LIKE_COLLECTION = "app.bsky.feed.like"
def undo_likes():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
batch_num = 0
progress = ProgressTracker(operation="Undoing likes")
while True:
batch_num += 1
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=LIKE_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
progress.batch(batch_num, len(records))
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": LIKE_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
progress.update(1)
logger.debug(f"Undone like: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
logger.error(f"Error undoing like {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Undone {total_undone} likes.")

View File

@@ -3,7 +3,6 @@
import logging import logging
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Optional
class ProgressTracker: class ProgressTracker:
@@ -14,9 +13,9 @@ class ProgressTracker:
def update(self, count: int = 1): def update(self, count: int = 1):
self.current += count self.current += count
def batch(self, batch_num: int, batch_size: int, total_batches: Optional[int] = None): def batch(self, batch_num: int, batch_size: int, total_batches: int | None = None):
logger = logging.getLogger("skywipe.progress") logger = logging.getLogger("skywipe.progress")
if total_batches: if total_batches is not None:
logger.info( logger.info(
f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)" f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)"
) )
@@ -35,25 +34,50 @@ class LevelFilter(logging.Filter):
return self.min_level <= record.levelno <= self.max_level return self.min_level <= record.levelno <= self.max_level
def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logging.Logger: def setup_logger(verbose: bool = False, log_file: Path | None = None) -> logging.Logger:
logger = logging.getLogger("skywipe") logger = logging.getLogger("skywipe")
logger.setLevel(logging.DEBUG if verbose else logging.INFO) logger.propagate = False
target_level = logging.DEBUG if verbose else logging.INFO
logger.setLevel(target_level)
if logger.handlers: progress_logger = logging.getLogger("skywipe.progress")
return logger if not progress_logger.handlers:
progress_logger.propagate = True
info_handler = None
error_handler = None
file_handlers = []
for handler in logger.handlers:
if isinstance(handler, logging.FileHandler):
file_handlers.append(handler)
elif isinstance(handler, logging.StreamHandler):
if handler.stream == sys.stdout:
info_handler = handler
elif handler.stream == sys.stderr:
error_handler = handler
formatter = logging.Formatter(fmt="%(levelname)s: %(message)s") formatter = logging.Formatter(fmt="%(levelname)s: %(message)s")
if info_handler is None:
info_handler = logging.StreamHandler(sys.stdout) info_handler = logging.StreamHandler(sys.stdout)
info_handler.setLevel(logging.DEBUG if verbose else logging.INFO) logger.addHandler(info_handler)
for existing in list(info_handler.filters):
if isinstance(existing, LevelFilter):
info_handler.removeFilter(existing)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO)) info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter) info_handler.setFormatter(formatter)
logger.addHandler(info_handler) info_handler.setLevel(target_level)
if error_handler is None:
error_handler = logging.StreamHandler(sys.stderr) error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.WARNING) error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler) logger.addHandler(error_handler)
error_handler.setFormatter(formatter)
for handler in file_handlers:
handler.close()
logger.removeHandler(handler)
if log_file: if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True) log_file.parent.mkdir(parents=True, exist_ok=True)
@@ -71,3 +95,22 @@ def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logg
def get_logger() -> logging.Logger: def get_logger() -> logging.Logger:
return logging.getLogger("skywipe") return logging.getLogger("skywipe")
def _format_error_message(error: Exception) -> str:
if isinstance(error, KeyError):
return str(error.args[0]) if error.args else str(error)
return str(error)
def handle_error(error: Exception, logger: logging.Logger, exit_on_error: bool = False) -> None:
if isinstance(error, (KeyError, ValueError)):
logger.error(_format_error_message(error))
else:
logger.error(
f"Unexpected error: {_format_error_message(error)}", exc_info=True)
if exit_on_error:
sys.exit(1)
else:
raise error

View File

@@ -1,95 +0,0 @@
"""Media post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_posts_with_medias():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting posts with media")
while True:
batch_num += 1
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
progress.batch(batch_num, len(posts))
for post in posts:
post_record = post.post
embed = getattr(post_record, 'embed', None)
has_media = False
if embed:
embed_type = getattr(embed, 'py_type', None)
media_types = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in media_types:
has_media = True
if not has_media and embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in media_types:
has_media = True
if not has_media:
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
has_media = True
break
if isinstance(embed, dict) and embed.get(attr):
has_media = True
break
if not has_media:
logger.debug(f"Skipping post without media: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
progress.update(1)
logger.debug(f"Deleted post with media: {post_record.uri}")
except Exception as e:
logger.error(f"Error deleting post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Deleted {total_deleted} posts with media.")

219
skywipe/operations.py Normal file
View File

@@ -0,0 +1,219 @@
"""Shared operation utilities and strategies for Skywipe"""
import time
from typing import Callable, Any
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
class OperationContext:
def __init__(self, client=None, config_data=None):
self.logger = get_logger()
self.client, self.did = self._initialize_client(client)
self.config_data = self._initialize_config(config_data)
self.batch_size = self.config_data.get("batch_size", 10)
self.delay = self.config_data.get("delay", 1)
def _initialize_client(self, client):
if client is not None:
return client, client.me.did
try:
auth = Auth()
client = auth.login()
return client, client.me.did
except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error during initialization: {e}", exc_info=True)
raise ValueError(
f"Failed to initialize operation context: {e}") from e
def _initialize_config(self, config_data):
if config_data is not None:
return config_data
try:
config = Configuration()
return config.load()
except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error loading configuration: {e}", exc_info=True)
raise ValueError(
f"Failed to load configuration: {e}") from e
class BaseStrategy:
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
raise NotImplementedError
def extract_items(self, response: Any) -> list[Any]:
raise NotImplementedError
def process_item(self, item: Any, context: OperationContext) -> None:
raise NotImplementedError
def get_cursor(self, response: Any) -> str | None:
return response.cursor
class RecordDeletionStrategy(BaseStrategy):
def __init__(self, collection: str):
self.collection = collection
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=context.did,
collection=self.collection,
limit=context.batch_size,
cursor=cursor
)
return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response: Any) -> list[Any]:
return response.records
def process_item(self, record: Any, context: OperationContext) -> None:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": context.did,
"collection": self.collection,
"rkey": rkey
}
context.client.com.atproto.repo.delete_record(data=delete_data)
context.logger.debug(f"Deleted: {record_uri}")
class FeedStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
if cursor:
return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor
)
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
def extract_items(self, response: Any) -> list[Any]:
return response.feed
def process_item(self, post: Any, context: OperationContext) -> None:
uri = post.post.uri
context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}")
class BookmarkStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size,
cursor=cursor
)
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response: Any) -> list[Any]:
return response.bookmarks
def process_item(self, bookmark: Any, context: OperationContext) -> None:
bookmark_uri = self._extract_bookmark_uri(bookmark)
if not bookmark_uri:
raise ValueError("Unable to find bookmark URI")
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri)
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
def _extract_bookmark_uri(self, bookmark: Any) -> str | None:
if hasattr(bookmark, "uri"):
return bookmark.uri
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
return nested.uri
return None
class Operation:
def __init__(
self,
operation_name: str,
strategy_type: str = "feed",
collection: str | None = None,
filter_fn: Callable[[Any], bool] | None = None,
client=None,
config_data=None
):
self.operation_name = operation_name
self.filter_fn = filter_fn
self._client = client
self._config_data = config_data
self.strategy: BaseStrategy
if strategy_type == "record":
if not collection:
raise ValueError("Collection is required for record strategy")
self.strategy = RecordDeletionStrategy(collection)
elif strategy_type == "bookmark":
self.strategy = BookmarkStrategy()
else:
self.strategy = FeedStrategy()
def run(self) -> int:
context = OperationContext(
client=self._client, config_data=self._config_data)
progress = ProgressTracker(operation=self.operation_name)
context.logger.info(
f"Starting {self.operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
)
cursor = None
total_processed = 0
batch_num = 0
while True:
batch_num += 1
try:
response = self.strategy.fetch(context, cursor)
items = self.strategy.extract_items(response)
except Exception as e:
context.logger.error(
f"Error fetching items for batch {batch_num}: {e}", exc_info=True)
break
if not items:
break
progress.batch(batch_num, len(items))
for item in items:
if self.filter_fn and not self.filter_fn(item):
continue
try:
self.strategy.process_item(item, context)
total_processed += 1
progress.update(1)
except Exception as e:
context.logger.error(f"Error processing item: {e}")
cursor = self.strategy.get_cursor(response)
if not cursor:
break
if context.delay > 0:
time.sleep(context.delay)
context.logger.info(
f"{self.operation_name}: {total_processed} items processed.")
return total_processed

70
skywipe/post_analysis.py Normal file
View File

@@ -0,0 +1,70 @@
"""Post analysis utilities for Skywipe"""
class PostAnalyzer:
MEDIA_TYPES = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
QUOTE_TYPES = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
QUOTE_WITH_MEDIA_TYPES = {
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
@staticmethod
def _get_embed(post_record):
return getattr(post_record, 'embed', None)
@staticmethod
def _get_embed_type_base(embed):
embed_type = getattr(embed, 'py_type', None)
if embed_type:
return embed_type.split('#')[0]
return None
@staticmethod
def has_media(post_record):
embed = PostAnalyzer._get_embed(post_record)
if not embed:
return False
embed_type_base = PostAnalyzer._get_embed_type_base(embed)
if embed_type_base:
if embed_type_base in PostAnalyzer.MEDIA_TYPES:
return True
if embed_type_base in PostAnalyzer.QUOTE_WITH_MEDIA_TYPES:
media = getattr(embed, 'media', None)
if media:
media_type_base = PostAnalyzer._get_embed_type_base(media)
if media_type_base and media_type_base in PostAnalyzer.MEDIA_TYPES:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
@staticmethod
def has_quote(post_record):
embed = PostAnalyzer._get_embed(post_record)
if not embed:
return False
embed_type_base = PostAnalyzer._get_embed_type_base(embed)
if embed_type_base and embed_type_base in PostAnalyzer.QUOTE_TYPES:
return True
return (hasattr(embed, 'record') or
(isinstance(embed, dict) and embed.get('record')))

View File

@@ -1,59 +0,0 @@
"""Post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_all_posts():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting posts")
while True:
batch_num += 1
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
else:
response = client.get_author_feed(actor=did, limit=batch_size)
posts = response.feed
if not posts:
break
post_uris = [post.post.uri for post in posts]
progress.batch(batch_num, len(post_uris))
for uri in post_uris:
try:
client.delete_post(uri)
total_deleted += 1
progress.update(1)
logger.debug(f"Deleted post: {uri}")
except Exception as e:
logger.error(f"Error deleting post {uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Deleted {total_deleted} posts.")

View File

@@ -1,77 +0,0 @@
"""Quote post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_quotes_posts():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(f"Starting quote post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting quote posts")
while True:
batch_num += 1
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
progress.batch(batch_num, len(posts))
for post in posts:
post_record = post.post
embed = getattr(post_record, 'embed', None)
has_quote = False
if embed:
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
quote_types = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
if embed_type_base in quote_types:
has_quote = True
if not has_quote and (hasattr(embed, 'record') or (isinstance(embed, dict) and embed.get('record'))):
has_quote = True
if not has_quote:
logger.debug(f"Skipping post without quote: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
progress.update(1)
logger.debug(f"Deleted quote post: {post_record.uri}")
except Exception as e:
logger.error(f"Error deleting quote post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Deleted {total_deleted} quote posts.")

View File

@@ -1,73 +0,0 @@
"""Repost undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
REPOST_COLLECTION = "app.bsky.feed.repost"
def undo_reposts():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting repost deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
batch_num = 0
progress = ProgressTracker(operation="Undoing reposts")
while True:
batch_num += 1
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=REPOST_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
progress.batch(batch_num, len(records))
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": REPOST_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
progress.update(1)
logger.debug(f"Undone repost: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
logger.error(f"Error undoing repost {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Undone {total_undone} reposts.")

View File

@@ -1,17 +1,24 @@
"""Safeguard module for Skywipe""" """Safeguard module for Skywipe"""
import sys import sys
import logging
from .logger import get_logger from .logger import get_logger
CONFIRM_RESPONSES = {"yes", "y"} CONFIRM_RESPONSES = {"yes", "y"}
def require_confirmation(operation: str, skip_confirmation: bool = False) -> None: def require_confirmation(
operation: str,
skip_confirmation: bool = False,
logger: logging.Logger | None = None
) -> None:
if skip_confirmation: if skip_confirmation:
return return
if logger is None:
logger = get_logger() logger = get_logger()
logger.warning(f"This will {operation}") logger.warning(f"This will {operation}")
logger.warning("This operation is DESTRUCTIVE and cannot be undone!") logger.warning("This operation is DESTRUCTIVE and cannot be undone!")

25
tests/conftest.py Normal file
View File

@@ -0,0 +1,25 @@
from pathlib import Path
from typing import Iterable, Callable
import pytest
from skywipe.configure import Configuration
@pytest.fixture
def user_input(monkeypatch) -> Callable[[Iterable[str], Iterable[str]], None]:
def _set(inputs: Iterable[str], passwords: Iterable[str]) -> None:
input_iter = iter(inputs)
password_iter = iter(passwords)
monkeypatch.setattr("builtins.input", lambda _prompt: next(input_iter))
monkeypatch.setattr("skywipe.configure.getpass",
lambda _prompt: next(password_iter))
return _set
@pytest.fixture
def config_with_tmp_path(monkeypatch, tmp_path):
monkeypatch.setattr(Path, "home", lambda: tmp_path)
return Configuration()

239
tests/test_cli.py Normal file
View File

@@ -0,0 +1,239 @@
import logging
import sys
import pytest
import skywipe.cli as cli
TEST_COMMAND = "posts"
TEST_ERROR_MESSAGE = "boom"
TEST_LOGGER_NAME = "test.cli"
def _setup_parser_mocks(monkeypatch, commands=None):
if commands is None:
commands = {TEST_COMMAND: "only posts"}
monkeypatch.setattr(cli.registry, "get_all_commands", lambda: commands)
def _setup_main_mocks(monkeypatch, calls, requires_config=False, config_data=None):
_setup_parser_mocks(monkeypatch)
monkeypatch.setattr(cli.registry, "requires_config",
lambda name: requires_config)
def mock_execute(name, skip_confirmation=False):
calls["execute"] = (name, skip_confirmation)
def mock_setup_logger(verbose, log_file):
calls["setup"].append((verbose, log_file))
def mock_require_config(logger):
calls["require_config"].append(logger)
monkeypatch.setattr(cli.registry, "execute", mock_execute)
monkeypatch.setattr(cli, "setup_logger", mock_setup_logger)
monkeypatch.setattr(cli, "require_config", mock_require_config)
monkeypatch.setattr(cli, "get_logger",
lambda: logging.getLogger(TEST_LOGGER_NAME))
if config_data is not None:
monkeypatch.setattr(cli.Configuration, "load",
lambda self: config_data)
def _setup_error_mocks(monkeypatch, calls, error_factory):
_setup_parser_mocks(monkeypatch)
monkeypatch.setattr(cli.registry, "requires_config", lambda name: False)
monkeypatch.setattr(cli.registry, "execute", error_factory)
monkeypatch.setattr(cli, "setup_logger", lambda verbose, log_file: None)
monkeypatch.setattr(cli, "get_logger",
lambda: logging.getLogger(TEST_LOGGER_NAME))
def _format_error_message(error):
if isinstance(error, KeyError):
return error.args[0] if error.args else str(error)
return str(error)
def mock_handle_error(error, logger, exit_on_error=False):
calls["handle_error"] = (type(error).__name__,
_format_error_message(error), exit_on_error)
monkeypatch.setattr(cli, "handle_error", mock_handle_error)
def test_create_parser_includes_commands(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
args = parser.parse_args([TEST_COMMAND])
assert args.command == TEST_COMMAND
def test_create_parser_handles_multiple_commands(monkeypatch):
commands = {
"posts": "only posts",
"likes": "only likes",
"reposts": "only reposts"
}
_setup_parser_mocks(monkeypatch, commands)
parser = cli.create_parser()
args1 = parser.parse_args(["posts"])
args2 = parser.parse_args(["likes"])
args3 = parser.parse_args(["reposts"])
assert args1.command == "posts"
assert args2.command == "likes"
assert args3.command == "reposts"
def test_create_parser_parses_yes_flag(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
args = parser.parse_args(["--yes", TEST_COMMAND])
assert args.command == TEST_COMMAND
assert args.yes is True
def test_create_parser_parses_without_yes_flag(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
args = parser.parse_args([TEST_COMMAND])
assert args.command == TEST_COMMAND
assert getattr(args, "yes", False) is False
def test_create_parser_version_flag_exits(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
with pytest.raises(SystemExit) as excinfo:
parser.parse_args(["--version"])
assert excinfo.value.code == 0
def test_create_parser_requires_command(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
with pytest.raises(SystemExit):
parser.parse_args([])
def test_create_parser_rejects_invalid_command(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
with pytest.raises(SystemExit):
parser.parse_args(["invalid_command"])
def test_require_config_exits_when_missing(monkeypatch):
monkeypatch.setattr(cli.Configuration, "exists", lambda self: False)
logger = logging.getLogger(TEST_LOGGER_NAME)
with pytest.raises(SystemExit) as excinfo:
cli.require_config(logger)
assert excinfo.value.code == 1
def test_require_config_does_not_exit_when_exists(monkeypatch):
monkeypatch.setattr(cli.Configuration, "exists", lambda self: True)
logger = logging.getLogger(TEST_LOGGER_NAME)
cli.require_config(logger)
def test_main_executes_without_config(monkeypatch):
calls = {"execute": None, "setup": [], "require_config": []}
_setup_main_mocks(monkeypatch, calls, requires_config=False)
monkeypatch.setattr(sys, "argv", ["skywipe", "--yes", TEST_COMMAND])
cli.main()
assert len(calls["require_config"]) == 0
assert calls["setup"] == [(False, cli.LOG_FILE)]
assert calls["execute"] == (TEST_COMMAND, True)
def test_main_loads_config_and_sets_verbose(monkeypatch):
calls = {"setup": [], "execute": None, "require_config": []}
_setup_main_mocks(monkeypatch, calls, requires_config=True,
config_data={"verbose": True})
monkeypatch.setattr(sys, "argv", ["skywipe", TEST_COMMAND])
cli.main()
assert len(calls["require_config"]) == 1
assert calls["setup"] == [(False, cli.LOG_FILE), (True, cli.LOG_FILE)]
assert calls["execute"] == (TEST_COMMAND, False)
@pytest.mark.parametrize("config_data,expected_verbose", [
({}, False),
({"verbose": False}, False),
])
def test_main_config_verbose_defaults(monkeypatch, config_data, expected_verbose):
calls = {"setup": [], "execute": None, "require_config": []}
_setup_main_mocks(monkeypatch, calls, requires_config=True,
config_data=config_data)
monkeypatch.setattr(sys, "argv", ["skywipe", TEST_COMMAND])
cli.main()
assert len(calls["require_config"]) == 1
assert calls["setup"] == [(False, cli.LOG_FILE),
(expected_verbose, cli.LOG_FILE)]
assert calls["execute"] == (TEST_COMMAND, False)
def test_main_handles_config_load_error(monkeypatch):
calls = {"handle_error": None, "require_config": []}
def mock_require_config(logger):
calls["require_config"].append(logger)
def raise_config_error(self):
raise RuntimeError("config error")
def _format_error_message(error):
if isinstance(error, KeyError):
return error.args[0] if error.args else str(error)
return str(error)
def mock_handle_error(error, logger, exit_on_error=False):
calls["handle_error"] = (type(error).__name__,
_format_error_message(error), exit_on_error)
_setup_parser_mocks(monkeypatch)
monkeypatch.setattr(cli.registry, "requires_config", lambda name: True)
monkeypatch.setattr(cli, "require_config", mock_require_config)
monkeypatch.setattr(cli.Configuration, "load", raise_config_error)
monkeypatch.setattr(cli, "setup_logger", lambda verbose, log_file: None)
monkeypatch.setattr(cli, "get_logger",
lambda: logging.getLogger(TEST_LOGGER_NAME))
monkeypatch.setattr(cli, "handle_error", mock_handle_error)
monkeypatch.setattr(sys, "argv", ["skywipe", TEST_COMMAND])
cli.main()
assert len(calls["require_config"]) == 1
assert calls["handle_error"] is not None
assert calls["handle_error"][0] == "RuntimeError"
assert calls["handle_error"][2] is True
@pytest.mark.parametrize("error_class,error_message", [
(ValueError, TEST_ERROR_MESSAGE),
(RuntimeError, "runtime error"),
(KeyError, "missing key"),
])
def test_main_handles_execute_errors(monkeypatch, error_class, error_message):
calls = {"handle_error": None}
def raise_error(*_args, **_kwargs):
raise error_class(error_message)
_setup_error_mocks(monkeypatch, calls, raise_error)
monkeypatch.setattr(sys, "argv", ["skywipe", TEST_COMMAND])
cli.main()
assert calls["handle_error"] is not None
assert calls["handle_error"][0] == error_class.__name__
assert calls["handle_error"][1] == error_message
assert calls["handle_error"][2] is True

32
tests/test_commands.py Normal file
View File

@@ -0,0 +1,32 @@
import pytest
from skywipe.commands import CommandRegistry
def test_command_registry_register_and_execute():
registry = CommandRegistry()
calls = {"configure": 0, "run": 0, "skip": None}
def configure_handler():
calls["configure"] += 1
def run_handler(skip_confirmation=False):
calls["run"] += 1
calls["skip"] = skip_confirmation
registry.register("configure", configure_handler,
"config", requires_config=False)
registry.register("run", run_handler, "run")
registry.execute("configure")
registry.execute("run", skip_confirmation=True)
assert calls["configure"] == 1
assert calls["run"] == 1
assert calls["skip"] is True
def test_command_registry_unknown_command():
registry = CommandRegistry()
with pytest.raises(ValueError, match="Unknown command"):
registry.execute("nope")

View File

@@ -0,0 +1,132 @@
import logging
import pytest
import skywipe.commands as commands
import skywipe.operations as operations
def test_create_operation_handler_calls_confirmation_and_run(monkeypatch):
calls = {"confirmed": False, "run": 0}
def fake_confirm(message, skip_confirmation, logger):
calls["confirmed"] = True
assert message == "do the thing"
assert skip_confirmation is True
assert isinstance(logger, logging.Logger)
class FakeOperation:
def __init__(self, *args, **kwargs):
pass
def run(self):
calls["run"] += 1
monkeypatch.setattr(commands, "require_confirmation", fake_confirm)
monkeypatch.setattr(commands, "Operation", FakeOperation)
handler = commands._create_operation_handler(
"do the thing", "Test", strategy_type="feed"
)
handler(skip_confirmation=True)
assert calls["confirmed"] is True
assert calls["run"] == 1
def test_run_all_runs_in_order(monkeypatch):
ran = []
fake_commands = {
"posts": "only posts",
"likes": "only likes",
"medias": "only medias",
"extra": "extra",
}
fake_metadata = {
"posts": {
"operation_name": "Deleting posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
"likes": {
"operation_name": "Undoing likes",
"strategy_type": "record",
"collection": "app.bsky.feed.like",
"filter_fn": None,
},
"medias": {
"operation_name": "Deleting posts with media",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
"extra": {
"operation_name": "Extra op",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
}
fake_order = ["medias", "posts", "likes"]
class FakeOperation:
def __init__(self, operation_name, **kwargs):
self.operation_name = operation_name
def run(self):
ran.append(self.operation_name)
class FakeOperationContext:
def __init__(self):
self.client = object()
self.config_data = {"batch_size": 1, "delay": 0}
monkeypatch.setattr(commands, "require_confirmation",
lambda *args, **kwargs: None)
monkeypatch.setattr(commands, "Operation", FakeOperation)
monkeypatch.setattr(operations, "OperationContext", FakeOperationContext)
monkeypatch.setattr(commands.registry,
"get_all_commands", lambda: fake_commands)
monkeypatch.setattr(commands, "COMMAND_METADATA", fake_metadata)
monkeypatch.setattr(commands, "COMMAND_EXECUTION_ORDER", fake_order)
commands.run_all(skip_confirmation=True)
expected = [
"Deleting posts with media",
"Deleting posts",
"Undoing likes",
"Extra op",
]
assert ran == expected
def test_run_all_continues_on_error(monkeypatch):
ran = []
class FakeOperation:
def __init__(self, operation_name, **kwargs):
self.operation_name = operation_name
def run(self):
ran.append(self.operation_name)
if self.operation_name == "Deleting posts":
raise RuntimeError("fail")
class FakeOperationContext:
def __init__(self):
self.client = object()
self.config_data = {"batch_size": 1, "delay": 0}
monkeypatch.setattr(commands, "require_confirmation",
lambda *args, **kwargs: None)
monkeypatch.setattr(commands, "Operation", FakeOperation)
monkeypatch.setattr(operations, "OperationContext", FakeOperationContext)
commands.run_all(skip_confirmation=True)
assert "Deleting posts" in ran
assert len(ran) >= 2

33
tests/test_configure.py Normal file
View File

@@ -0,0 +1,33 @@
import pytest
from skywipe.configure import _validate_handle, _validate_password
@pytest.mark.parametrize(
"handle,expected_valid",
[
("", False),
("a" * 254, False),
("has space", False),
("invalid_handle!", False),
("alice", True),
("alice.bsky.social", True),
("did:plc:abcd1234", True),
],
)
def test_validate_handle(handle, expected_valid):
result = _validate_handle(handle)
assert result.is_valid is expected_valid
@pytest.mark.parametrize(
"password,expected_valid",
[
("", False),
("short", False),
("longenough", True),
],
)
def test_validate_password(password, expected_valid):
result = _validate_password(password)
assert result.is_valid is expected_valid

View File

@@ -0,0 +1,97 @@
from pathlib import Path
import yaml
def test_configuration_create_reprompts_and_writes_file(config_with_tmp_path, user_input):
inputs = iter([
"bad handle",
"alice.bsky.social",
"5",
"0",
"n",
])
passwords = iter([
"short",
"longenough",
])
user_input(inputs, passwords)
config = config_with_tmp_path
config.create()
assert config.config_file.exists() is True
data = yaml.safe_load(config.config_file.read_text())
assert data["handle"] == "alice.bsky.social"
assert data["password"] == "longenough"
assert data["batch_size"] == 5
assert data["delay"] == 0
assert data["verbose"] is False
def test_configuration_create_invalid_batch_size(config_with_tmp_path, user_input):
inputs = iter([
"alice.bsky.social",
"0",
"1",
"y",
])
passwords = iter(["longenough"])
user_input(inputs, passwords)
config = config_with_tmp_path
config.create()
assert config.config_file.exists() is False
def test_configuration_create_invalid_delay(config_with_tmp_path, user_input):
inputs = iter([
"alice.bsky.social",
"10",
"61",
"y",
])
passwords = iter(["longenough"])
user_input(inputs, passwords)
config = config_with_tmp_path
config.create()
assert config.config_file.exists() is False
def test_configuration_create_overwrite_cancel(config_with_tmp_path, user_input):
config = config_with_tmp_path
config.config_file.parent.mkdir(parents=True, exist_ok=True)
config.config_file.write_text("existing")
user_input(["n"], [])
config.create()
assert config.config_file.read_text() == "existing"
def test_configuration_create_write_failure(config_with_tmp_path, user_input, monkeypatch):
user_input(
["alice.bsky.social", "5", "0", "y"],
["longenough"],
)
config = config_with_tmp_path
original_open = open
def fake_open(path, mode="r", *args, **kwargs):
if Path(path) == config.config_file and "w" in mode:
raise OSError("disk full")
return original_open(path, mode, *args, **kwargs)
monkeypatch.setattr("builtins.open", fake_open)
config.create()
assert config.config_file.exists() is False

View File

@@ -0,0 +1,59 @@
from unittest.mock import patch
import pytest
import yaml
def test_configuration_load_missing_file(config_with_tmp_path):
with pytest.raises(FileNotFoundError, match="Configuration file not found"):
config_with_tmp_path.load()
def test_configuration_load_empty_file(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_with_tmp_path.config_file.write_text("")
with pytest.raises(ValueError, match="empty or invalid"):
config_with_tmp_path.load()
def test_configuration_load_invalid_yaml(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_with_tmp_path.config_file.write_text(": bad")
with pytest.raises(ValueError, match="Invalid YAML"):
config_with_tmp_path.load()
def test_configuration_load_valid_file(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_data = {
"handle": "alice.bsky.social",
"password": "password123",
"batch_size": 10,
"delay": 1,
"verbose": True
}
with open(config_with_tmp_path.config_file, "w") as f:
yaml.dump(config_data, f)
result = config_with_tmp_path.load()
assert result == config_data
def test_configuration_load_file_read_error(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_with_tmp_path.config_file.write_text("handle: alice")
with patch("builtins.open", side_effect=IOError("Permission denied")):
with pytest.raises(ValueError, match="Failed to read configuration file"):
config_with_tmp_path.load()
def test_configuration_load_file_os_error(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_with_tmp_path.config_file.write_text("handle: alice")
with patch("builtins.open", side_effect=OSError("File is locked")):
with pytest.raises(ValueError, match="Failed to read configuration file"):
config_with_tmp_path.load()

252
tests/test_logger.py Normal file
View File

@@ -0,0 +1,252 @@
import logging
import sys
from skywipe.logger import LevelFilter, ProgressTracker, setup_logger
def test_level_filter_accepts_in_range():
logger = logging.getLogger("test.level_filter")
record = logger.makeRecord(
name="test.level_filter",
level=logging.INFO,
fn="test",
lno=1,
msg="message",
args=(),
exc_info=None,
)
level_filter = LevelFilter(logging.DEBUG, logging.INFO)
assert level_filter.filter(record) is True
def test_level_filter_rejects_out_of_range():
logger = logging.getLogger("test.level_filter")
record = logger.makeRecord(
name="test.level_filter",
level=logging.ERROR,
fn="test",
lno=1,
msg="message",
args=(),
exc_info=None,
)
level_filter = LevelFilter(logging.DEBUG, logging.INFO)
assert level_filter.filter(record) is False
def test_progress_tracker_updates_counts():
tracker = ProgressTracker(operation="Testing")
tracker.update()
tracker.update(2)
assert tracker.current == 3
def test_progress_tracker_batch_with_total():
tracker = ProgressTracker(operation="Testing")
logger = logging.getLogger("skywipe.progress")
messages = []
class MessageHandler(logging.Handler):
def emit(self, record):
messages.append(self.format(record))
handler = MessageHandler()
handler.setLevel(logging.INFO)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
try:
tracker.batch(1, 10, total_batches=5)
assert messages[-1] == "Testing - batch 1/5 (10 items)"
tracker.batch(0, 5, total_batches=0)
assert messages[-1] == "Testing - batch 0/0 (5 items)"
finally:
handler.close()
logger.removeHandler(handler)
def test_progress_tracker_batch_without_total():
tracker = ProgressTracker(operation="Testing")
logger = logging.getLogger("skywipe.progress")
messages = []
class MessageHandler(logging.Handler):
def emit(self, record):
messages.append(self.format(record))
handler = MessageHandler()
handler.setLevel(logging.INFO)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
try:
tracker.batch(1, 10, total_batches=None)
assert messages[-1] == "Testing - batch 1 (10 items)"
finally:
handler.close()
logger.removeHandler(handler)
def test_setup_logger_does_not_duplicate_handlers():
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
try:
setup_logger(verbose=False)
first_handlers = list(logger.handlers)
stream_handlers = [
handler for handler in first_handlers
if isinstance(handler, logging.StreamHandler)
]
assert len(stream_handlers) == 2
assert {handler.stream for handler in stream_handlers} == {
sys.stdout,
sys.stderr,
}
assert not any(
isinstance(handler, logging.FileHandler)
for handler in first_handlers
)
first_count = len(first_handlers)
setup_logger(verbose=False)
second_count = len(logger.handlers)
finally:
for handler in list(logger.handlers):
handler.close()
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)
assert first_count == second_count
def test_setup_logger_resets_stream_formatters():
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
try:
setup_logger(verbose=False)
alt_formatter = logging.Formatter(fmt="%(message)s")
for handler in logger.handlers:
if isinstance(handler, logging.StreamHandler):
handler.setFormatter(alt_formatter)
setup_logger(verbose=False)
for handler in logger.handlers:
if isinstance(handler, logging.StreamHandler):
assert handler.formatter is not None
assert handler.formatter._fmt == "%(levelname)s: %(message)s"
finally:
for handler in list(logger.handlers):
handler.close()
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)
def test_setup_logger_disables_propagation():
root_logger = logging.getLogger()
root_messages = []
original_root_level = root_logger.level
class RootHandler(logging.Handler):
def emit(self, record):
root_messages.append(self.format(record))
root_handler = RootHandler()
root_handler.setLevel(logging.INFO)
root_logger.addHandler(root_handler)
root_logger.setLevel(logging.INFO)
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
try:
setup_logger(verbose=False)
assert logger.propagate is False
progress_logger = logging.getLogger("skywipe.progress")
assert progress_logger.propagate is True
logger.info("Test message")
progress_logger.info("Progress message")
assert len(root_messages) == 0
finally:
root_handler.close()
root_logger.removeHandler(root_handler)
root_logger.setLevel(original_root_level)
for handler in list(logger.handlers):
handler.close()
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)
def test_setup_logger_file_handler_lifecycle(tmp_path):
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
log_file = tmp_path / "skywipe.log"
try:
setup_logger(verbose=False, log_file=log_file)
file_handlers = [
handler for handler in logger.handlers
if isinstance(handler, logging.FileHandler)
]
assert len(file_handlers) == 1
assert file_handlers[0].baseFilename == str(log_file)
setup_logger(verbose=False, log_file=None)
file_handlers = [
handler for handler in logger.handlers
if isinstance(handler, logging.FileHandler)
]
assert file_handlers == []
finally:
for handler in list(logger.handlers):
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)
def test_setup_logger_replaces_file_handler_when_path_changes(tmp_path):
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
log_file1 = tmp_path / "skywipe1.log"
log_file2 = tmp_path / "skywipe2.log"
try:
setup_logger(verbose=False, log_file=log_file1)
file_handlers = [
handler for handler in logger.handlers
if isinstance(handler, logging.FileHandler)
]
assert len(file_handlers) == 1
assert file_handlers[0].baseFilename == str(log_file1)
setup_logger(verbose=False, log_file=log_file2)
file_handlers = [
handler for handler in logger.handlers
if isinstance(handler, logging.FileHandler)
]
assert len(file_handlers) == 1
assert file_handlers[0].baseFilename == str(log_file2)
finally:
for handler in list(logger.handlers):
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)

View File

@@ -0,0 +1,30 @@
import pytest
import skywipe.operations as operations
def test_operation_context_raises_on_auth_error(monkeypatch):
class FakeAuth:
def login(self):
raise ValueError("bad auth")
monkeypatch.setattr(operations, "Auth", FakeAuth)
with pytest.raises(ValueError, match="bad auth"):
operations.OperationContext()
def test_operation_context_raises_on_config_error(monkeypatch):
class FakeClient:
class Me:
did = "did:plc:fake"
me = Me()
def fake_load(self):
raise ValueError("bad config")
monkeypatch.setattr(operations.Configuration, "load", fake_load)
with pytest.raises(ValueError, match="bad config"):
operations.OperationContext(client=FakeClient())

98
tests/test_operations.py Normal file
View File

@@ -0,0 +1,98 @@
import time
import pytest
from skywipe.operations import Operation, BookmarkStrategy
class FakeClient:
class Me:
did = "did:plc:fake"
me = Me()
class FakeResponse:
def __init__(self, items, cursor=None):
self.items = items
self.cursor = cursor
class FakeStrategy:
def __init__(self, responses, fail_on=None):
self._responses = list(responses)
self._fail_on = fail_on
def fetch(self, context, cursor=None):
return self._responses.pop(0)
def extract_items(self, response):
return response.items
def process_item(self, item, context):
if self._fail_on is not None and item == self._fail_on:
raise ValueError("boom")
def get_cursor(self, response):
return response.cursor
def test_operation_run_batches_filters_and_sleeps(monkeypatch):
responses = [
FakeResponse(items=[1, 2, 3], cursor="next"),
FakeResponse(items=[4], cursor=None),
]
operation = Operation(
"Testing",
strategy_type="feed",
client=FakeClient(),
config_data={"batch_size": 2, "delay": 1},
filter_fn=lambda item: item != 2,
)
operation.strategy = FakeStrategy(responses, fail_on=3)
slept = []
def fake_sleep(seconds):
slept.append(seconds)
monkeypatch.setattr(time, "sleep", fake_sleep)
total = operation.run()
assert total == 2
assert slept == [1]
def test_bookmark_strategy_extracts_uri_from_shapes():
strategy = BookmarkStrategy()
class Obj:
pass
direct = Obj()
direct.uri = "direct"
assert strategy._extract_bookmark_uri(direct) == "direct"
subject = Obj()
subject.subject = Obj()
subject.subject.uri = "subject"
assert strategy._extract_bookmark_uri(subject) == "subject"
record = Obj()
record.record = Obj()
record.record.uri = "record"
assert strategy._extract_bookmark_uri(record) == "record"
post = Obj()
post.post = Obj()
post.post.uri = "post"
assert strategy._extract_bookmark_uri(post) == "post"
item = Obj()
item.item = Obj()
item.item.uri = "item"
assert strategy._extract_bookmark_uri(item) == "item"
missing = Obj()
assert strategy._extract_bookmark_uri(missing) is None

36
tests/test_safeguard.py Normal file
View File

@@ -0,0 +1,36 @@
import logging
import pytest
from skywipe.safeguard import require_confirmation
def test_require_confirmation_skips_prompt():
logger = logging.getLogger("test.safeguard")
require_confirmation("do nothing", skip_confirmation=True, logger=logger)
def test_require_confirmation_accepts_yes(monkeypatch):
logger = logging.getLogger("test.safeguard")
monkeypatch.setattr("builtins.input", lambda _: "y")
require_confirmation("do nothing", logger=logger)
def test_require_confirmation_rejects_no(monkeypatch):
logger = logging.getLogger("test.safeguard")
monkeypatch.setattr("builtins.input", lambda _: "n")
with pytest.raises(SystemExit) as excinfo:
require_confirmation("do nothing", logger=logger)
assert excinfo.value.code == 0
def test_require_confirmation_handles_eof(monkeypatch):
logger = logging.getLogger("test.safeguard")
def raise_eof(_prompt):
raise EOFError
monkeypatch.setattr("builtins.input", raise_eof)
with pytest.raises(SystemExit) as excinfo:
require_confirmation("do nothing", logger=logger)
assert excinfo.value.code == 0

129
tests/test_strategies.py Normal file
View File

@@ -0,0 +1,129 @@
from types import SimpleNamespace
from skywipe.operations import (
RecordDeletionStrategy,
FeedStrategy,
BookmarkStrategy,
)
def test_record_deletion_strategy_fetch_and_process(monkeypatch):
captured = {}
class FakeParams:
def __init__(self, **kwargs):
self.kwargs = kwargs
def fake_list_records(params):
captured["params"] = params
return "response"
def fake_delete_record(data):
captured["delete"] = data
client = SimpleNamespace(
com=SimpleNamespace(
atproto=SimpleNamespace(
repo=SimpleNamespace(
list_records=fake_list_records,
delete_record=fake_delete_record,
)
)
)
)
context = SimpleNamespace(
did="did:plc:fake",
batch_size=2,
client=client,
logger=SimpleNamespace(debug=lambda *_args, **_kwargs: None),
)
monkeypatch.setattr(
"skywipe.operations.models.ComAtprotoRepoListRecords.Params",
FakeParams,
)
strategy = RecordDeletionStrategy("app.bsky.feed.like")
response = strategy.fetch(context, cursor="next")
assert response == "response"
assert captured["params"].kwargs == {
"repo": "did:plc:fake",
"collection": "app.bsky.feed.like",
"limit": 2,
"cursor": "next",
}
record = SimpleNamespace(uri="at://did:plc:fake/app.bsky.feed.like/abc123")
strategy.process_item(record, context)
assert captured["delete"] == {
"repo": "did:plc:fake",
"collection": "app.bsky.feed.like",
"rkey": "abc123",
}
def test_feed_strategy_fetch_and_process():
captured = {}
def fake_get_author_feed(**kwargs):
captured["feed"] = kwargs
return "feed"
def fake_delete_post(uri):
captured["delete"] = uri
client = SimpleNamespace(
get_author_feed=fake_get_author_feed,
delete_post=fake_delete_post,
)
context = SimpleNamespace(
did="did:plc:fake",
batch_size=3,
client=client,
logger=SimpleNamespace(debug=lambda *_args, **_kwargs: None),
)
strategy = FeedStrategy()
response = strategy.fetch(context, cursor=None)
assert response == "feed"
assert captured["feed"] == {"actor": "did:plc:fake", "limit": 3}
response = strategy.fetch(context, cursor="next")
assert response == "feed"
assert captured["feed"] == {
"actor": "did:plc:fake", "limit": 3, "cursor": "next"}
post = SimpleNamespace(post=SimpleNamespace(uri="at://post"))
strategy.process_item(post, context)
assert captured["delete"] == "at://post"
def test_bookmark_strategy_fetch(monkeypatch):
captured = {}
class FakeParams:
def __init__(self, **kwargs):
self.kwargs = kwargs
def fake_get_bookmarks(params):
captured["params"] = params
return "bookmarks"
client = SimpleNamespace(
app=SimpleNamespace(
bsky=SimpleNamespace(
bookmark=SimpleNamespace(get_bookmarks=fake_get_bookmarks)
)
)
)
context = SimpleNamespace(batch_size=5, client=client)
monkeypatch.setattr(
"skywipe.operations.models.AppBskyBookmarkGetBookmarks.Params",
FakeParams,
)
strategy = BookmarkStrategy()
response = strategy.fetch(context, cursor="cursor")
assert response == "bookmarks"
assert captured["params"].kwargs == {"limit": 5, "cursor": "cursor"}

61
uv.lock generated
View File

@@ -228,6 +228,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" }, { url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" },
] ]
[[package]]
name = "iniconfig"
version = "2.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" },
]
[[package]] [[package]]
name = "libipld" name = "libipld"
version = "3.3.2" version = "3.3.2"
@@ -262,6 +271,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/22/19/bb42dc53bb8855c1f40b4a431ed3cb2df257bd5a6af61842626712c83073/libipld-3.3.2-cp314-cp314-win_arm64.whl", hash = "sha256:08261503b7307c6d9acbd3b2a221da9294b457204dcefce446f627893abb077e", size = 149324, upload-time = "2025-12-05T12:59:18.815Z" }, { url = "https://files.pythonhosted.org/packages/22/19/bb42dc53bb8855c1f40b4a431ed3cb2df257bd5a6af61842626712c83073/libipld-3.3.2-cp314-cp314-win_arm64.whl", hash = "sha256:08261503b7307c6d9acbd3b2a221da9294b457204dcefce446f627893abb077e", size = 149324, upload-time = "2025-12-05T12:59:18.815Z" },
] ]
[[package]]
name = "packaging"
version = "25.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = "2025-04-19T11:48:59.673Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]] [[package]]
name = "pycparser" name = "pycparser"
version = "2.23" version = "2.23"
@@ -339,6 +366,31 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/9f/ed/068e41660b832bb0b1aa5b58011dea2a3fe0ba7861ff38c4d4904c1c1a99/pydantic_core-2.41.5-cp314-cp314t-win_arm64.whl", hash = "sha256:35b44f37a3199f771c3eaa53051bc8a70cd7b54f333531c59e29fd4db5d15008", size = 1974769, upload-time = "2025-11-04T13:42:01.186Z" }, { url = "https://files.pythonhosted.org/packages/9f/ed/068e41660b832bb0b1aa5b58011dea2a3fe0ba7861ff38c4d4904c1c1a99/pydantic_core-2.41.5-cp314-cp314t-win_arm64.whl", hash = "sha256:35b44f37a3199f771c3eaa53051bc8a70cd7b54f333531c59e29fd4db5d15008", size = 1974769, upload-time = "2025-11-04T13:42:01.186Z" },
] ]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pytest"
version = "9.0.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" },
]
[[package]] [[package]]
name = "pyyaml" name = "pyyaml"
version = "6.0.3" version = "6.0.3"
@@ -378,17 +430,24 @@ wheels = [
[[package]] [[package]]
name = "skywipe" name = "skywipe"
version = "0.1.0" version = "0.1.0"
source = { virtual = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "atproto" }, { name = "atproto" },
{ name = "pyyaml" }, { name = "pyyaml" },
] ]
[package.optional-dependencies]
dev = [
{ name = "pytest" },
]
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "atproto", specifier = ">=0.0.65" }, { name = "atproto", specifier = ">=0.0.65" },
{ name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" },
{ name = "pyyaml", specifier = ">=6.0" }, { name = "pyyaml", specifier = ">=6.0" },
] ]
provides-extras = ["dev"]
[[package]] [[package]]
name = "typing-extensions" name = "typing-extensions"