Compare commits

...

79 Commits

Author SHA1 Message Date
2cdc4c6c42 refactor: tidy logger handler setup 2026-01-15 16:06:32 +01:00
07862f0ea2 refactor: simplify configuration create flow 2026-01-15 16:02:21 +01:00
e095c68f72 test: share error message formatting helper 2026-01-08 06:51:56 +01:00
c718e8c6f5 test: patch configure getpass correctly 2026-01-08 06:51:46 +01:00
a07cc02fb0 fix: format KeyError messages cleanly 2026-01-08 06:51:37 +01:00
ecc33054af fix: handle config load errors via handler 2026-01-08 06:51:23 +01:00
e6d68dd37d refactor: type strategy interface 2026-01-05 20:38:20 +01:00
809b7823ea refactor: type command metadata 2026-01-05 20:38:09 +01:00
6a88ab8560 refactor: import getpass directly 2025-12-31 08:22:27 +01:00
dd3b220a4a test: use shared config fixture 2025-12-30 23:09:01 +01:00
82b99da50d test: add config_with_tmp_path fixture 2025-12-30 23:08:51 +01:00
ce2a1ad594 test: expand configure load tests with fixture and error cases 2025-12-30 23:04:45 +01:00
313b6fc453 test: refactor CLI tests and expand coverage 2025-12-30 18:29:34 +01:00
e364598414 test: expect progress propagation 2025-12-30 18:16:51 +01:00
cd1cf1f170 fix: restore progress logger propagation 2025-12-30 18:16:44 +01:00
c3761d1d08 test: cover formatter reset on reuse 2025-12-30 18:13:56 +01:00
85f1ea4efb fix: reset stream formatters in setup_logger 2025-12-30 18:13:51 +01:00
df22b3dd3d test: cover logger propagation behavior 2025-12-30 18:11:48 +01:00
8c0bbceeac fix: disable skywipe log propagation 2025-12-30 18:11:38 +01:00
5e60374937 test: add tests for ProgressTracker.batch with total_batches=0 2025-12-30 17:46:29 +01:00
fd62bb5ea2 fix: use "is not None" check for total_batches in ProgressTracker.batch 2025-12-30 17:46:22 +01:00
6785ecd45a test: strengthen logger handler duplication test 2025-12-30 08:53:00 +01:00
7828989150 refactor: dedupe file handler cleanup in setup_logger 2025-12-30 08:45:39 +01:00
9eb2ed0097 test: verify FileHandler replacement when log_file path changes 2025-12-30 08:31:14 +01:00
5c8932599c fix: replace existing FileHandler when log_file changes 2025-12-30 08:30:59 +01:00
b2af41d5fb style: prefer PEP 604/585 type hints 2025-12-23 05:14:38 +01:00
6de91e2bb9 build: package = true 2025-12-23 04:55:49 +01:00
d026c53c0a chore: update uv.lock 2025-12-23 04:55:42 +01:00
d6ce77ab15 test: cover operation run + bookmark parsing 2025-12-23 04:50:21 +01:00
b6e0c55c3e test: cover operation contect error paths 2025-12-23 04:50:10 +01:00
3b84be90b7 test: cover run_all ordering and errors 2025-12-23 04:49:46 +01:00
b8f6953a17 test: cover config create flows 2025-12-23 04:49:32 +01:00
7be7922b08 test: cover config load and errors 2025-12-23 04:49:13 +01:00
c669bc9de7 test: cover strategy fetch/process behaviors 2025-12-23 04:48:50 +01:00
155cb927ba test: cover handle/password validation 2025-12-23 04:48:29 +01:00
769a1af58c test: cover cli parser and main flow 2025-12-23 04:48:07 +01:00
4e04a9d7b7 test: cover command registry basics 2025-12-23 04:47:48 +01:00
875feb204c test: cover confirmation prompt handling 2025-12-23 04:47:06 +01:00
02364e54c6 test: cover logger helpers 2025-12-23 04:46:49 +01:00
7ca3c8e969 docs: how to for unit test 2025-12-23 04:46:24 +01:00
45b43e7062 build: update uv.lock 2025-12-23 04:46:16 +01:00
e518f96e9d build: add pytest as optional 2025-12-23 04:46:07 +01:00
1c4a256641 test: add shared input/password fixture 2025-12-23 04:45:48 +01:00
66594d9f59 fix: adjust handler detection order 2025-12-23 04:44:01 +01:00
b336991d67 feat: add -v|--version flag 2025-12-20 23:05:42 +01:00
62c058e9ee feat: add version 2025-12-20 23:05:20 +01:00
35c8b5b8d1 refactor: use NamedTuple for validation results 2025-12-20 22:36:51 +01:00
54c3353667 refactor: simplify nested try-except blocks 2025-12-20 22:35:01 +01:00
93a124be2a clean: remove useless declarations 2025-12-20 22:33:54 +01:00
ecbee7a8ac refactor: pass logger instead of multiple get_logger() calls 2025-12-20 22:30:57 +01:00
eaf4e94d24 refactor: pass logger parameter and use error handling helper 2025-12-20 22:30:47 +01:00
97e166d5f7 refactor: make logger parameter optional 2025-12-20 22:30:33 +01:00
61e2d7f731 refactor: add handle_error() helper for centralized error handling 2025-12-20 22:30:21 +01:00
054dc01813 fix: ensure logger always writes to log file 2025-12-20 22:09:25 +01:00
799b1083ab refactor: add hacking at the bottom 2025-12-20 21:56:57 +01:00
15db235fe1 docs: update hacking section with example 2025-12-20 21:48:57 +01:00
d09dcf06cf docs: ready to fly 2025-12-20 21:47:36 +01:00
5b9589794e clean: shorter epilog 2025-12-20 21:46:16 +01:00
93b88917df refactor: consolidate all command metadata into a single structure 2025-12-20 21:26:11 +01:00
25618ab5bf refactor: simplify logger 2025-12-20 21:24:02 +01:00
81fa68ed08 fix: run commands in right order if all 2025-12-20 21:11:45 +01:00
ca6eaed146 feat: improve confirmation message handling 2025-12-20 21:10:00 +01:00
1b8b32027c feat: dependency injection to allow reusing an existing auth client 2025-12-20 21:07:50 +01:00
a6190aeb84 feat: update run_all to reuse auth and config 2025-12-20 21:07:43 +01:00
887169e7d2 feat: add some validation 2025-12-20 21:04:09 +01:00
9565e4008e feat: consistent error handling 2025-12-20 20:59:18 +01:00
c2aab71955 refactor: add basestrategy and make all class inherits to get rid of get_cursor() 2025-12-20 20:56:22 +01:00
97dd55981b refactor: use a factory pattern 2025-12-20 20:54:52 +01:00
0e91c95e9b refactor: extract common embed checking logic 2025-12-20 20:51:45 +01:00
a818df4a6c refactor: replace hardcoded tuple with constants (consistency+readability) 2025-12-20 20:50:06 +01:00
9aec57bd56 fix: logger's already setup 2025-12-20 20:48:28 +01:00
ec9943822c fix: replace hardcoded list with a registry-derived list 2025-12-20 20:47:15 +01:00
0240ff9f8e clean: remove duplicate run_operation, dead code now 2025-12-20 20:44:42 +01:00
45e2e1eb00 clean: remove is_logged() (not used) 2025-12-20 20:44:29 +01:00
ff20228fa6 docs: update roadmap 2025-12-20 17:25:02 +01:00
9d9c09d56a refactor: use our new Operation and PostAnalyzer tools 2025-12-20 17:24:29 +01:00
64355fbeeb clean: remove unused import 2025-12-20 17:24:15 +01:00
4a337e6b20 feat: add post analysis utilities 2025-12-20 17:24:05 +01:00
f27be4d603 refactor: add OperationContext and Operations classes 2025-12-20 17:23:54 +01:00
24 changed files with 1954 additions and 159 deletions

View File

@@ -1,6 +1,6 @@
# Skywipe
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
Skywipe is a Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
**Warning:** This tool performs _**destructive operations**_.
@@ -8,35 +8,25 @@ Only use it if you intend to permanently erase data from your Bluesky account.
## Requirements
Check [pyproject.toml](pyproject.toml).
Python 3.13+.
You can use `uv` to install dependencies:
The rest of the dependencies are listed in [pyproject.toml](pyproject.toml).
## Installation
Use [`pipx`](https://pipx.pypa.io/latest/installation/) to install `skywipe`:
```bash
git clone https://git.kharec.info/Kharec/skywipe.git
cd skywipe
uv sync
pipx install git+https://git.kharec.info/Kharec/skywipe.git
```
## How to run
While it's being developed, you can use the tool using `uv` :
Run the tool and see available commands with:
```bash
uv run python -m skywipe.cli all # target everything
uv run python -m skywipe.cli configure # create configuration
uv run python -m skywipe.cli posts # delete posts
uv run python -m skywipe.cli medias # delete posts with medias
uv run python -m skywipe.cli likes # undo likes
uv run python -m skywipe.cli reposts # undo reposts
uv run python -m skywipe.cli quotes # delete quotes
uv run python -m skywipe.cli follows # unfollow all
uv run python -m skywipe.cli bookmarks # delete bookmarks
skywipe -h
```
Use the `--yes` flag to skip the confirmation prompt and proceed with the operation.
A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
Use the `--yes` flag to skip the confirmation prompt and proceed with the operation. A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
## Configuration
@@ -52,23 +42,30 @@ verbose: true
BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-password) FOR OBVIOUS SECURITY REASONS.
## Roadmap
## Hacking
- [x] build cli parameter management
- [x] handle configuration logic
- [x] sign in to at protocol
- [x] delete posts in batch
- [x] only delete posts with media
- [x] undo likes
- [x] undo reposts
- [x] delete quotes
- [x] unfollow accounts
- [x] remove bookmarks
- [x] make `all` run the other commands
- [x] add simple progress and logging
- [x] add safeguards (confirmation, dry-run flag)
- [ ] decent code architecture
- [ ] installation and run process
You can use `uv` to install dependencies:
```bash
git clone https://git.kharec.info/Kharec/skywipe.git
cd skywipe
uv sync
```
Run unit tests with:
```bash
uv sync --extra dev
uv run pytest
```
If you want to test your changes, you can run the tool with:
```bash
uv run python -m skywipe.cli -h
uv run python -m skywipe.cli all
# or any other command
```
## License

View File

@@ -6,5 +6,14 @@ readme = "README.md"
requires-python = ">=3.13"
dependencies = ["atproto>=0.0.65", "pyyaml>=6.0"]
[project.optional-dependencies]
dev = ["pytest>=8.0"]
[project.scripts]
skywipe = "skywipe.cli:main"
[tool.uv]
package = true
[tool.pytest.ini_options]
testpaths = ["tests"]

View File

@@ -0,0 +1 @@
__version__ = "0.1.0"

View File

@@ -10,7 +10,11 @@ class Auth:
self.client = None
def login(self) -> Client:
try:
config_data = self.config.load()
except FileNotFoundError as e:
raise ValueError(
"Configuration file not found. Run 'skywipe configure' first.") from e
handle = config_data.get("handle")
password = config_data.get("password")
@@ -19,10 +23,11 @@ class Auth:
raise ValueError(
"handle and password must be set in configuration")
try:
self.client = Client()
self.client.login(handle, password)
except Exception as e:
raise ValueError(
f"Failed to authenticate: {e}") from e
return self.client
def is_logged(self) -> bool:
return bool(getattr(self.client, "me", None))

View File

@@ -4,9 +4,13 @@ import sys
import argparse
from pathlib import Path
from . import __version__
from .commands import registry
from .configure import Configuration
from .logger import setup_logger
from .logger import setup_logger, get_logger, handle_error
LOG_FILE = Path.home() / ".cache" / "skywipe" / "skywipe.log"
def create_parser():
@@ -14,7 +18,13 @@ def create_parser():
parser = argparse.ArgumentParser(
description="Clean your bluesky account with style.",
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account."
epilog="WARNING: This tool deletes your Bluesky data permanently."
)
parser.add_argument(
"-v", "--version",
action="version",
version=f"Skywipe {__version__}"
)
parser.add_argument(
@@ -36,10 +46,9 @@ def create_parser():
return parser
def require_config():
def require_config(logger):
config = Configuration()
if not config.exists():
logger = setup_logger(verbose=False)
logger.error("Configuration file not found.")
logger.error("You must run 'skywipe configure' first.")
sys.exit(1)
@@ -49,27 +58,21 @@ def main():
parser = create_parser()
args = parser.parse_args()
setup_logger(verbose=False, log_file=LOG_FILE)
logger = get_logger()
try:
if registry.requires_config(args.command):
require_config()
require_config(logger)
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
log_file = Path.home() / ".cache" / "skywipe" / "skywipe.log"
setup_logger(verbose=verbose, log_file=log_file)
else:
setup_logger(verbose=False)
setup_logger(verbose=verbose, log_file=LOG_FILE)
try:
registry.execute(
args.command, skip_confirmation=getattr(args, "yes", False))
except ValueError as e:
logger = setup_logger(verbose=False)
logger.error(f"{e}")
sys.exit(1)
except Exception as e:
logger = setup_logger(verbose=False)
logger.error(f"Unexpected error: {e}", exc_info=True)
sys.exit(1)
except (ValueError, Exception) as e:
handle_error(e, logger, exit_on_error=True)
if __name__ == '__main__':

View File

@@ -1,21 +1,95 @@
"""Command implementations for Skywipe"""
from typing import Callable, Dict, Optional
from typing import Callable, Any, TypedDict
from .configure import Configuration
from .posts import delete_all_posts
from .medias import delete_posts_with_medias
from .likes import undo_likes
from .reposts import undo_reposts
from .quotes import delete_quotes_posts
from .follows import unfollow_all
from .bookmarks import delete_bookmarks
from .logger import get_logger
from .operations import Operation
from .post_analysis import PostAnalyzer
from .logger import get_logger, handle_error
from .safeguard import require_confirmation
CommandHandler = Callable[..., None]
class CommandMetadata(TypedDict):
confirmation: str
help_text: str
operation_name: str
strategy_type: str
collection: str | None
filter_fn: Callable[[Any], bool] | None
COMMAND_METADATA: dict[str, CommandMetadata] = {
"posts": {
"confirmation": "delete all posts",
"help_text": "only posts",
"operation_name": "Deleting posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
"medias": {
"confirmation": "delete all posts with media",
"help_text": "only posts with medias",
"operation_name": "Deleting posts with media",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_media(post.post),
},
"likes": {
"confirmation": "undo all likes",
"help_text": "only likes",
"operation_name": "Undoing likes",
"strategy_type": "record",
"collection": "app.bsky.feed.like",
"filter_fn": None,
},
"reposts": {
"confirmation": "undo all reposts",
"help_text": "only reposts",
"operation_name": "Undoing reposts",
"strategy_type": "record",
"collection": "app.bsky.feed.repost",
"filter_fn": None,
},
"quotes": {
"confirmation": "delete all quote posts",
"help_text": "only quotes",
"operation_name": "Deleting quote posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_quote(post.post),
},
"follows": {
"confirmation": "unfollow all accounts",
"help_text": "only follows",
"operation_name": "Unfollowing accounts",
"strategy_type": "record",
"collection": "app.bsky.graph.follow",
"filter_fn": None,
},
"bookmarks": {
"confirmation": "delete all bookmarks",
"help_text": "only bookmarks",
"operation_name": "Deleting bookmarks",
"strategy_type": "bookmark",
"collection": None,
"filter_fn": None,
},
}
COMMAND_EXECUTION_ORDER = [
"quotes",
"medias",
"posts",
"likes",
"reposts",
"follows",
"bookmarks",
]
class CommandRegistry:
def __init__(self):
self._commands = {}
@@ -33,16 +107,16 @@ class CommandRegistry:
self._help_texts[name] = help_text
self._requires_config[name] = requires_config
def get_handler(self, name: str) -> Optional[CommandHandler]:
def get_handler(self, name: str) -> CommandHandler | None:
return self._commands.get(name)
def get_help_text(self, name: str) -> Optional[str]:
def get_help_text(self, name: str) -> str | None:
return self._help_texts.get(name)
def requires_config(self, name: str) -> bool:
return self._requires_config.get(name, True)
def get_all_commands(self) -> Dict[str, str]:
def get_all_commands(self) -> dict[str, str]:
return self._help_texts.copy()
def execute(self, name: str, skip_confirmation: bool = False):
@@ -59,57 +133,93 @@ class CommandRegistry:
registry = CommandRegistry()
def _create_operation_handler(
confirmation_message: str,
operation_name: str,
strategy_type: str = "feed",
collection: str | None = None,
filter_fn: Callable[[Any], bool] | None = None
) -> CommandHandler:
logger = get_logger()
def handler(skip_confirmation: bool = False):
require_confirmation(confirmation_message, skip_confirmation, logger)
try:
Operation(
operation_name,
strategy_type=strategy_type,
collection=collection,
filter_fn=filter_fn
).run()
except (ValueError, Exception) as e:
handle_error(e, logger)
return handler
def run_configure():
config = Configuration()
config.create()
def run_posts(skip_confirmation: bool = False):
require_confirmation("delete all posts", skip_confirmation)
delete_all_posts()
def _create_command_handlers():
handlers = {}
for cmd, metadata in COMMAND_METADATA.items():
handlers[cmd] = _create_operation_handler(
metadata["confirmation"],
metadata["operation_name"],
strategy_type=metadata["strategy_type"],
collection=metadata["collection"],
filter_fn=metadata["filter_fn"]
)
return handlers
def run_medias(skip_confirmation: bool = False):
require_confirmation("delete all posts with media", skip_confirmation)
delete_posts_with_medias()
def run_likes(skip_confirmation: bool = False):
require_confirmation("undo all likes", skip_confirmation)
undo_likes()
def run_reposts(skip_confirmation: bool = False):
require_confirmation("undo all reposts", skip_confirmation)
undo_reposts()
def run_quotes(skip_confirmation: bool = False):
require_confirmation("delete all quote posts", skip_confirmation)
delete_quotes_posts()
def run_follows(skip_confirmation: bool = False):
require_confirmation("unfollow all accounts", skip_confirmation)
unfollow_all()
def run_bookmarks(skip_confirmation: bool = False):
require_confirmation("delete all bookmarks", skip_confirmation)
delete_bookmarks()
_command_handlers = _create_command_handlers()
def run_all(skip_confirmation: bool = False):
logger = get_logger()
require_confirmation(
"run all cleanup commands (posts, likes, reposts, follows, bookmarks)", skip_confirmation)
commands = ["posts", "likes", "reposts", "follows", "bookmarks"]
all_commands = registry.get_all_commands()
available_commands = [cmd for cmd in all_commands.keys()
if cmd not in ("configure", "all")]
commands = [cmd for cmd in COMMAND_EXECUTION_ORDER
if cmd in available_commands]
commands.extend([cmd for cmd in available_commands
if cmd not in COMMAND_EXECUTION_ORDER])
commands_str = ", ".join(commands)
all_confirmation = f"run all cleanup commands ({commands_str})"
require_confirmation(all_confirmation, skip_confirmation, logger)
logger.info("Running all cleanup commands...")
from .operations import OperationContext
try:
context = OperationContext()
shared_client = context.client
shared_config_data = context.config_data
except Exception as e:
logger.error(
f"Failed to initialize shared context: {e}", exc_info=True)
raise
for cmd in commands:
try:
logger.info(f"Starting command: {cmd}")
metadata = COMMAND_METADATA.get(cmd)
if metadata:
Operation(
metadata["operation_name"],
strategy_type=metadata["strategy_type"],
collection=metadata["collection"],
filter_fn=metadata["filter_fn"],
client=shared_client,
config_data=shared_config_data
).run()
else:
registry.execute(cmd, skip_confirmation=True)
logger.info(f"Completed command: {cmd}")
except Exception as e:
@@ -120,11 +230,6 @@ def run_all(skip_confirmation: bool = False):
registry.register("configure", run_configure,
"create configuration", requires_config=False)
registry.register("posts", run_posts, "only posts")
registry.register("medias", run_medias, "only posts with medias")
registry.register("likes", run_likes, "only likes")
registry.register("reposts", run_reposts, "only reposts")
registry.register("quotes", run_quotes, "only quotes")
registry.register("follows", run_follows, "only follows")
registry.register("bookmarks", run_bookmarks, "only bookmarks")
for cmd, metadata in COMMAND_METADATA.items():
registry.register(cmd, _command_handlers[cmd], metadata["help_text"])
registry.register("all", run_all, "target everything")

View File

@@ -1,9 +1,52 @@
"""Core configuration module for Skywipe"""
import getpass
from getpass import getpass
import re
from pathlib import Path
from typing import NamedTuple
import yaml
from .logger import setup_logger, get_logger
from .logger import setup_logger
class ValidationResult(NamedTuple):
is_valid: bool
error_message: str
def _validate_handle(handle: str) -> ValidationResult:
if not handle:
return ValidationResult(False, "Handle cannot be empty")
if len(handle) > 253:
return ValidationResult(False, "Handle is too long (max 253 characters)")
if " " in handle:
return ValidationResult(False, "Handle cannot contain spaces")
handle_pattern = re.compile(
r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$|"
r"^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$|"
r"^did:[a-z]+:[a-zA-Z0-9._-]+$"
)
if not handle_pattern.match(handle):
return ValidationResult(False, (
"Invalid handle format. "
"Use a username (e.g., 'alice'), full handle (e.g., 'alice.bsky.social'), "
"or DID (e.g., 'did:plc:...')"
))
return ValidationResult(True, "")
def _validate_password(password: str) -> ValidationResult:
if not password:
return ValidationResult(False, "Password cannot be empty")
if len(password) < 8:
return ValidationResult(False, "Password is too short (minimum 8 characters)")
return ValidationResult(True, "")
class Configuration:
@@ -13,35 +56,100 @@ class Configuration:
def exists(self) -> bool:
return self.config_file.exists()
def create(self):
logger = setup_logger(verbose=False)
if self.exists():
def _confirm_overwrite(self, logger) -> bool:
if not self.exists():
return True
overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"):
if overwrite in ("y", "yes"):
return True
logger.info("Configuration creation cancelled.")
return
return False
def _ensure_config_dir(self) -> None:
config_dir = self.config_file.parent
config_dir.mkdir(parents=True, exist_ok=True)
print("Skywipe Configuration")
print("=" * 50)
def _prompt_handle(self, logger) -> str:
while True:
handle = input("Bluesky handle: ").strip()
password = getpass.getpass("Bluesky app password: ").strip()
is_valid, error_msg = _validate_handle(handle)
if is_valid:
return handle
logger.error(error_msg)
logger.info("Please enter a valid handle and try again.")
def _prompt_password(self, logger) -> str:
while True:
password = getpass(
"Bluesky (hopefully app) password: ").strip()
is_valid, error_msg = _validate_password(password)
if is_valid:
return password
logger.error(error_msg)
logger.info("Please check your password and try again.")
logger.info(
"Generate an app password at: https://bsky.app/settings/app-passwords")
def _parse_batch_size(self, logger) -> int | None:
batch_size = input("Batch size (default: 10): ").strip() or "10"
try:
batch_size_int = int(batch_size)
except ValueError:
logger.error("batch_size must be an integer")
return None
if batch_size_int < 1 or batch_size_int > 100:
logger.error("batch_size must be between 1 and 100")
return None
return batch_size_int
def _parse_delay(self, logger) -> int | None:
delay = input(
"Delay between batches in seconds (default: 1): ").strip() or "1"
try:
delay_int = int(delay)
except ValueError:
logger.error("delay must be an integer")
return None
if delay_int < 0 or delay_int > 60:
logger.error("delay must be between 0 and 60 seconds")
return None
return delay_int
def _parse_verbose(self) -> bool:
verbose_input = input(
"Verbose mode (y/n, default: y): ").strip().lower() or "y"
verbose = verbose_input in ("y", "yes", "true", "1")
return verbose_input in ("y", "yes", "true", "1")
def _write_config(self, logger, config_data: dict) -> None:
try:
batch_size = int(batch_size)
delay = int(delay)
except ValueError:
logger.error("batch_size and delay must be integers")
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
except (IOError, OSError) as e:
logger.error(f"Failed to save configuration: {e}")
return
logger.info(f"Configuration saved to {self.config_file}")
def create(self):
logger = setup_logger(verbose=False)
if not self._confirm_overwrite(logger):
return
self._ensure_config_dir()
print("Skywipe Configuration")
print("=" * 50)
print("Note: You should use an app password from Bluesky settings.")
handle = self._prompt_handle(logger)
password = self._prompt_password(logger)
batch_size = self._parse_batch_size(logger)
if batch_size is None:
return
delay = self._parse_delay(logger)
if delay is None:
return
verbose = self._parse_verbose()
config_data = {
"handle": handle,
@@ -51,14 +159,21 @@ class Configuration:
"verbose": verbose
}
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
logger.info(f"Configuration saved to {self.config_file}")
self._write_config(logger, config_data)
def load(self) -> dict:
if not self.exists():
raise FileNotFoundError(
f"Configuration file not found: {self.config_file}")
try:
with open(self.config_file, "r") as f:
return yaml.safe_load(f)
config_data = yaml.safe_load(f)
if config_data is None:
raise ValueError("Configuration file is empty or invalid")
return config_data
except (IOError, OSError) as e:
raise ValueError(
f"Failed to read configuration file: {e}") from e
except yaml.YAMLError as e:
raise ValueError(
f"Invalid YAML in configuration file: {e}") from e

View File

@@ -3,7 +3,6 @@
import logging
import sys
from pathlib import Path
from typing import Optional
class ProgressTracker:
@@ -14,9 +13,9 @@ class ProgressTracker:
def update(self, count: int = 1):
self.current += count
def batch(self, batch_num: int, batch_size: int, total_batches: Optional[int] = None):
def batch(self, batch_num: int, batch_size: int, total_batches: int | None = None):
logger = logging.getLogger("skywipe.progress")
if total_batches:
if total_batches is not None:
logger.info(
f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)"
)
@@ -35,25 +34,50 @@ class LevelFilter(logging.Filter):
return self.min_level <= record.levelno <= self.max_level
def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logging.Logger:
def setup_logger(verbose: bool = False, log_file: Path | None = None) -> logging.Logger:
logger = logging.getLogger("skywipe")
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
logger.propagate = False
target_level = logging.DEBUG if verbose else logging.INFO
logger.setLevel(target_level)
if logger.handlers:
return logger
progress_logger = logging.getLogger("skywipe.progress")
if not progress_logger.handlers:
progress_logger.propagate = True
info_handler = None
error_handler = None
file_handlers = []
for handler in logger.handlers:
if isinstance(handler, logging.FileHandler):
file_handlers.append(handler)
elif isinstance(handler, logging.StreamHandler):
if handler.stream == sys.stdout:
info_handler = handler
elif handler.stream == sys.stderr:
error_handler = handler
formatter = logging.Formatter(fmt="%(levelname)s: %(message)s")
if info_handler is None:
info_handler = logging.StreamHandler(sys.stdout)
info_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
logger.addHandler(info_handler)
for existing in list(info_handler.filters):
if isinstance(existing, LevelFilter):
info_handler.removeFilter(existing)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter)
logger.addHandler(info_handler)
info_handler.setLevel(target_level)
if error_handler is None:
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
error_handler.setFormatter(formatter)
for handler in file_handlers:
handler.close()
logger.removeHandler(handler)
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
@@ -71,3 +95,22 @@ def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logg
def get_logger() -> logging.Logger:
return logging.getLogger("skywipe")
def _format_error_message(error: Exception) -> str:
if isinstance(error, KeyError):
return str(error.args[0]) if error.args else str(error)
return str(error)
def handle_error(error: Exception, logger: logging.Logger, exit_on_error: bool = False) -> None:
if isinstance(error, (KeyError, ValueError)):
logger.error(_format_error_message(error))
else:
logger.error(
f"Unexpected error: {_format_error_message(error)}", exc_info=True)
if exit_on_error:
sys.exit(1)
else:
raise error

219
skywipe/operations.py Normal file
View File

@@ -0,0 +1,219 @@
"""Shared operation utilities and strategies for Skywipe"""
import time
from typing import Callable, Any
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
class OperationContext:
def __init__(self, client=None, config_data=None):
self.logger = get_logger()
self.client, self.did = self._initialize_client(client)
self.config_data = self._initialize_config(config_data)
self.batch_size = self.config_data.get("batch_size", 10)
self.delay = self.config_data.get("delay", 1)
def _initialize_client(self, client):
if client is not None:
return client, client.me.did
try:
auth = Auth()
client = auth.login()
return client, client.me.did
except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error during initialization: {e}", exc_info=True)
raise ValueError(
f"Failed to initialize operation context: {e}") from e
def _initialize_config(self, config_data):
if config_data is not None:
return config_data
try:
config = Configuration()
return config.load()
except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error loading configuration: {e}", exc_info=True)
raise ValueError(
f"Failed to load configuration: {e}") from e
class BaseStrategy:
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
raise NotImplementedError
def extract_items(self, response: Any) -> list[Any]:
raise NotImplementedError
def process_item(self, item: Any, context: OperationContext) -> None:
raise NotImplementedError
def get_cursor(self, response: Any) -> str | None:
return response.cursor
class RecordDeletionStrategy(BaseStrategy):
def __init__(self, collection: str):
self.collection = collection
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=context.did,
collection=self.collection,
limit=context.batch_size,
cursor=cursor
)
return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response: Any) -> list[Any]:
return response.records
def process_item(self, record: Any, context: OperationContext) -> None:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": context.did,
"collection": self.collection,
"rkey": rkey
}
context.client.com.atproto.repo.delete_record(data=delete_data)
context.logger.debug(f"Deleted: {record_uri}")
class FeedStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
if cursor:
return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor
)
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
def extract_items(self, response: Any) -> list[Any]:
return response.feed
def process_item(self, post: Any, context: OperationContext) -> None:
uri = post.post.uri
context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}")
class BookmarkStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: str | None = None) -> Any:
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size,
cursor=cursor
)
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response: Any) -> list[Any]:
return response.bookmarks
def process_item(self, bookmark: Any, context: OperationContext) -> None:
bookmark_uri = self._extract_bookmark_uri(bookmark)
if not bookmark_uri:
raise ValueError("Unable to find bookmark URI")
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri)
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
def _extract_bookmark_uri(self, bookmark: Any) -> str | None:
if hasattr(bookmark, "uri"):
return bookmark.uri
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
return nested.uri
return None
class Operation:
def __init__(
self,
operation_name: str,
strategy_type: str = "feed",
collection: str | None = None,
filter_fn: Callable[[Any], bool] | None = None,
client=None,
config_data=None
):
self.operation_name = operation_name
self.filter_fn = filter_fn
self._client = client
self._config_data = config_data
self.strategy: BaseStrategy
if strategy_type == "record":
if not collection:
raise ValueError("Collection is required for record strategy")
self.strategy = RecordDeletionStrategy(collection)
elif strategy_type == "bookmark":
self.strategy = BookmarkStrategy()
else:
self.strategy = FeedStrategy()
def run(self) -> int:
context = OperationContext(
client=self._client, config_data=self._config_data)
progress = ProgressTracker(operation=self.operation_name)
context.logger.info(
f"Starting {self.operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
)
cursor = None
total_processed = 0
batch_num = 0
while True:
batch_num += 1
try:
response = self.strategy.fetch(context, cursor)
items = self.strategy.extract_items(response)
except Exception as e:
context.logger.error(
f"Error fetching items for batch {batch_num}: {e}", exc_info=True)
break
if not items:
break
progress.batch(batch_num, len(items))
for item in items:
if self.filter_fn and not self.filter_fn(item):
continue
try:
self.strategy.process_item(item, context)
total_processed += 1
progress.update(1)
except Exception as e:
context.logger.error(f"Error processing item: {e}")
cursor = self.strategy.get_cursor(response)
if not cursor:
break
if context.delay > 0:
time.sleep(context.delay)
context.logger.info(
f"{self.operation_name}: {total_processed} items processed.")
return total_processed

70
skywipe/post_analysis.py Normal file
View File

@@ -0,0 +1,70 @@
"""Post analysis utilities for Skywipe"""
class PostAnalyzer:
MEDIA_TYPES = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
QUOTE_TYPES = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
QUOTE_WITH_MEDIA_TYPES = {
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
@staticmethod
def _get_embed(post_record):
return getattr(post_record, 'embed', None)
@staticmethod
def _get_embed_type_base(embed):
embed_type = getattr(embed, 'py_type', None)
if embed_type:
return embed_type.split('#')[0]
return None
@staticmethod
def has_media(post_record):
embed = PostAnalyzer._get_embed(post_record)
if not embed:
return False
embed_type_base = PostAnalyzer._get_embed_type_base(embed)
if embed_type_base:
if embed_type_base in PostAnalyzer.MEDIA_TYPES:
return True
if embed_type_base in PostAnalyzer.QUOTE_WITH_MEDIA_TYPES:
media = getattr(embed, 'media', None)
if media:
media_type_base = PostAnalyzer._get_embed_type_base(media)
if media_type_base and media_type_base in PostAnalyzer.MEDIA_TYPES:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
@staticmethod
def has_quote(post_record):
embed = PostAnalyzer._get_embed(post_record)
if not embed:
return False
embed_type_base = PostAnalyzer._get_embed_type_base(embed)
if embed_type_base and embed_type_base in PostAnalyzer.QUOTE_TYPES:
return True
return (hasattr(embed, 'record') or
(isinstance(embed, dict) and embed.get('record')))

View File

@@ -1,17 +1,24 @@
"""Safeguard module for Skywipe"""
import sys
import logging
from .logger import get_logger
CONFIRM_RESPONSES = {"yes", "y"}
def require_confirmation(operation: str, skip_confirmation: bool = False) -> None:
def require_confirmation(
operation: str,
skip_confirmation: bool = False,
logger: logging.Logger | None = None
) -> None:
if skip_confirmation:
return
if logger is None:
logger = get_logger()
logger.warning(f"This will {operation}")
logger.warning("This operation is DESTRUCTIVE and cannot be undone!")

25
tests/conftest.py Normal file
View File

@@ -0,0 +1,25 @@
from pathlib import Path
from typing import Iterable, Callable
import pytest
from skywipe.configure import Configuration
@pytest.fixture
def user_input(monkeypatch) -> Callable[[Iterable[str], Iterable[str]], None]:
def _set(inputs: Iterable[str], passwords: Iterable[str]) -> None:
input_iter = iter(inputs)
password_iter = iter(passwords)
monkeypatch.setattr("builtins.input", lambda _prompt: next(input_iter))
monkeypatch.setattr("skywipe.configure.getpass",
lambda _prompt: next(password_iter))
return _set
@pytest.fixture
def config_with_tmp_path(monkeypatch, tmp_path):
monkeypatch.setattr(Path, "home", lambda: tmp_path)
return Configuration()

239
tests/test_cli.py Normal file
View File

@@ -0,0 +1,239 @@
import logging
import sys
import pytest
import skywipe.cli as cli
TEST_COMMAND = "posts"
TEST_ERROR_MESSAGE = "boom"
TEST_LOGGER_NAME = "test.cli"
def _setup_parser_mocks(monkeypatch, commands=None):
if commands is None:
commands = {TEST_COMMAND: "only posts"}
monkeypatch.setattr(cli.registry, "get_all_commands", lambda: commands)
def _setup_main_mocks(monkeypatch, calls, requires_config=False, config_data=None):
_setup_parser_mocks(monkeypatch)
monkeypatch.setattr(cli.registry, "requires_config",
lambda name: requires_config)
def mock_execute(name, skip_confirmation=False):
calls["execute"] = (name, skip_confirmation)
def mock_setup_logger(verbose, log_file):
calls["setup"].append((verbose, log_file))
def mock_require_config(logger):
calls["require_config"].append(logger)
monkeypatch.setattr(cli.registry, "execute", mock_execute)
monkeypatch.setattr(cli, "setup_logger", mock_setup_logger)
monkeypatch.setattr(cli, "require_config", mock_require_config)
monkeypatch.setattr(cli, "get_logger",
lambda: logging.getLogger(TEST_LOGGER_NAME))
if config_data is not None:
monkeypatch.setattr(cli.Configuration, "load",
lambda self: config_data)
def _setup_error_mocks(monkeypatch, calls, error_factory):
_setup_parser_mocks(monkeypatch)
monkeypatch.setattr(cli.registry, "requires_config", lambda name: False)
monkeypatch.setattr(cli.registry, "execute", error_factory)
monkeypatch.setattr(cli, "setup_logger", lambda verbose, log_file: None)
monkeypatch.setattr(cli, "get_logger",
lambda: logging.getLogger(TEST_LOGGER_NAME))
def _format_error_message(error):
if isinstance(error, KeyError):
return error.args[0] if error.args else str(error)
return str(error)
def mock_handle_error(error, logger, exit_on_error=False):
calls["handle_error"] = (type(error).__name__,
_format_error_message(error), exit_on_error)
monkeypatch.setattr(cli, "handle_error", mock_handle_error)
def test_create_parser_includes_commands(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
args = parser.parse_args([TEST_COMMAND])
assert args.command == TEST_COMMAND
def test_create_parser_handles_multiple_commands(monkeypatch):
commands = {
"posts": "only posts",
"likes": "only likes",
"reposts": "only reposts"
}
_setup_parser_mocks(monkeypatch, commands)
parser = cli.create_parser()
args1 = parser.parse_args(["posts"])
args2 = parser.parse_args(["likes"])
args3 = parser.parse_args(["reposts"])
assert args1.command == "posts"
assert args2.command == "likes"
assert args3.command == "reposts"
def test_create_parser_parses_yes_flag(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
args = parser.parse_args(["--yes", TEST_COMMAND])
assert args.command == TEST_COMMAND
assert args.yes is True
def test_create_parser_parses_without_yes_flag(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
args = parser.parse_args([TEST_COMMAND])
assert args.command == TEST_COMMAND
assert getattr(args, "yes", False) is False
def test_create_parser_version_flag_exits(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
with pytest.raises(SystemExit) as excinfo:
parser.parse_args(["--version"])
assert excinfo.value.code == 0
def test_create_parser_requires_command(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
with pytest.raises(SystemExit):
parser.parse_args([])
def test_create_parser_rejects_invalid_command(monkeypatch):
_setup_parser_mocks(monkeypatch)
parser = cli.create_parser()
with pytest.raises(SystemExit):
parser.parse_args(["invalid_command"])
def test_require_config_exits_when_missing(monkeypatch):
monkeypatch.setattr(cli.Configuration, "exists", lambda self: False)
logger = logging.getLogger(TEST_LOGGER_NAME)
with pytest.raises(SystemExit) as excinfo:
cli.require_config(logger)
assert excinfo.value.code == 1
def test_require_config_does_not_exit_when_exists(monkeypatch):
monkeypatch.setattr(cli.Configuration, "exists", lambda self: True)
logger = logging.getLogger(TEST_LOGGER_NAME)
cli.require_config(logger)
def test_main_executes_without_config(monkeypatch):
calls = {"execute": None, "setup": [], "require_config": []}
_setup_main_mocks(monkeypatch, calls, requires_config=False)
monkeypatch.setattr(sys, "argv", ["skywipe", "--yes", TEST_COMMAND])
cli.main()
assert len(calls["require_config"]) == 0
assert calls["setup"] == [(False, cli.LOG_FILE)]
assert calls["execute"] == (TEST_COMMAND, True)
def test_main_loads_config_and_sets_verbose(monkeypatch):
calls = {"setup": [], "execute": None, "require_config": []}
_setup_main_mocks(monkeypatch, calls, requires_config=True,
config_data={"verbose": True})
monkeypatch.setattr(sys, "argv", ["skywipe", TEST_COMMAND])
cli.main()
assert len(calls["require_config"]) == 1
assert calls["setup"] == [(False, cli.LOG_FILE), (True, cli.LOG_FILE)]
assert calls["execute"] == (TEST_COMMAND, False)
@pytest.mark.parametrize("config_data,expected_verbose", [
({}, False),
({"verbose": False}, False),
])
def test_main_config_verbose_defaults(monkeypatch, config_data, expected_verbose):
calls = {"setup": [], "execute": None, "require_config": []}
_setup_main_mocks(monkeypatch, calls, requires_config=True,
config_data=config_data)
monkeypatch.setattr(sys, "argv", ["skywipe", TEST_COMMAND])
cli.main()
assert len(calls["require_config"]) == 1
assert calls["setup"] == [(False, cli.LOG_FILE),
(expected_verbose, cli.LOG_FILE)]
assert calls["execute"] == (TEST_COMMAND, False)
def test_main_handles_config_load_error(monkeypatch):
calls = {"handle_error": None, "require_config": []}
def mock_require_config(logger):
calls["require_config"].append(logger)
def raise_config_error(self):
raise RuntimeError("config error")
def _format_error_message(error):
if isinstance(error, KeyError):
return error.args[0] if error.args else str(error)
return str(error)
def mock_handle_error(error, logger, exit_on_error=False):
calls["handle_error"] = (type(error).__name__,
_format_error_message(error), exit_on_error)
_setup_parser_mocks(monkeypatch)
monkeypatch.setattr(cli.registry, "requires_config", lambda name: True)
monkeypatch.setattr(cli, "require_config", mock_require_config)
monkeypatch.setattr(cli.Configuration, "load", raise_config_error)
monkeypatch.setattr(cli, "setup_logger", lambda verbose, log_file: None)
monkeypatch.setattr(cli, "get_logger",
lambda: logging.getLogger(TEST_LOGGER_NAME))
monkeypatch.setattr(cli, "handle_error", mock_handle_error)
monkeypatch.setattr(sys, "argv", ["skywipe", TEST_COMMAND])
cli.main()
assert len(calls["require_config"]) == 1
assert calls["handle_error"] is not None
assert calls["handle_error"][0] == "RuntimeError"
assert calls["handle_error"][2] is True
@pytest.mark.parametrize("error_class,error_message", [
(ValueError, TEST_ERROR_MESSAGE),
(RuntimeError, "runtime error"),
(KeyError, "missing key"),
])
def test_main_handles_execute_errors(monkeypatch, error_class, error_message):
calls = {"handle_error": None}
def raise_error(*_args, **_kwargs):
raise error_class(error_message)
_setup_error_mocks(monkeypatch, calls, raise_error)
monkeypatch.setattr(sys, "argv", ["skywipe", TEST_COMMAND])
cli.main()
assert calls["handle_error"] is not None
assert calls["handle_error"][0] == error_class.__name__
assert calls["handle_error"][1] == error_message
assert calls["handle_error"][2] is True

32
tests/test_commands.py Normal file
View File

@@ -0,0 +1,32 @@
import pytest
from skywipe.commands import CommandRegistry
def test_command_registry_register_and_execute():
registry = CommandRegistry()
calls = {"configure": 0, "run": 0, "skip": None}
def configure_handler():
calls["configure"] += 1
def run_handler(skip_confirmation=False):
calls["run"] += 1
calls["skip"] = skip_confirmation
registry.register("configure", configure_handler,
"config", requires_config=False)
registry.register("run", run_handler, "run")
registry.execute("configure")
registry.execute("run", skip_confirmation=True)
assert calls["configure"] == 1
assert calls["run"] == 1
assert calls["skip"] is True
def test_command_registry_unknown_command():
registry = CommandRegistry()
with pytest.raises(ValueError, match="Unknown command"):
registry.execute("nope")

View File

@@ -0,0 +1,132 @@
import logging
import pytest
import skywipe.commands as commands
import skywipe.operations as operations
def test_create_operation_handler_calls_confirmation_and_run(monkeypatch):
calls = {"confirmed": False, "run": 0}
def fake_confirm(message, skip_confirmation, logger):
calls["confirmed"] = True
assert message == "do the thing"
assert skip_confirmation is True
assert isinstance(logger, logging.Logger)
class FakeOperation:
def __init__(self, *args, **kwargs):
pass
def run(self):
calls["run"] += 1
monkeypatch.setattr(commands, "require_confirmation", fake_confirm)
monkeypatch.setattr(commands, "Operation", FakeOperation)
handler = commands._create_operation_handler(
"do the thing", "Test", strategy_type="feed"
)
handler(skip_confirmation=True)
assert calls["confirmed"] is True
assert calls["run"] == 1
def test_run_all_runs_in_order(monkeypatch):
ran = []
fake_commands = {
"posts": "only posts",
"likes": "only likes",
"medias": "only medias",
"extra": "extra",
}
fake_metadata = {
"posts": {
"operation_name": "Deleting posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
"likes": {
"operation_name": "Undoing likes",
"strategy_type": "record",
"collection": "app.bsky.feed.like",
"filter_fn": None,
},
"medias": {
"operation_name": "Deleting posts with media",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
"extra": {
"operation_name": "Extra op",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
}
fake_order = ["medias", "posts", "likes"]
class FakeOperation:
def __init__(self, operation_name, **kwargs):
self.operation_name = operation_name
def run(self):
ran.append(self.operation_name)
class FakeOperationContext:
def __init__(self):
self.client = object()
self.config_data = {"batch_size": 1, "delay": 0}
monkeypatch.setattr(commands, "require_confirmation",
lambda *args, **kwargs: None)
monkeypatch.setattr(commands, "Operation", FakeOperation)
monkeypatch.setattr(operations, "OperationContext", FakeOperationContext)
monkeypatch.setattr(commands.registry,
"get_all_commands", lambda: fake_commands)
monkeypatch.setattr(commands, "COMMAND_METADATA", fake_metadata)
monkeypatch.setattr(commands, "COMMAND_EXECUTION_ORDER", fake_order)
commands.run_all(skip_confirmation=True)
expected = [
"Deleting posts with media",
"Deleting posts",
"Undoing likes",
"Extra op",
]
assert ran == expected
def test_run_all_continues_on_error(monkeypatch):
ran = []
class FakeOperation:
def __init__(self, operation_name, **kwargs):
self.operation_name = operation_name
def run(self):
ran.append(self.operation_name)
if self.operation_name == "Deleting posts":
raise RuntimeError("fail")
class FakeOperationContext:
def __init__(self):
self.client = object()
self.config_data = {"batch_size": 1, "delay": 0}
monkeypatch.setattr(commands, "require_confirmation",
lambda *args, **kwargs: None)
monkeypatch.setattr(commands, "Operation", FakeOperation)
monkeypatch.setattr(operations, "OperationContext", FakeOperationContext)
commands.run_all(skip_confirmation=True)
assert "Deleting posts" in ran
assert len(ran) >= 2

33
tests/test_configure.py Normal file
View File

@@ -0,0 +1,33 @@
import pytest
from skywipe.configure import _validate_handle, _validate_password
@pytest.mark.parametrize(
"handle,expected_valid",
[
("", False),
("a" * 254, False),
("has space", False),
("invalid_handle!", False),
("alice", True),
("alice.bsky.social", True),
("did:plc:abcd1234", True),
],
)
def test_validate_handle(handle, expected_valid):
result = _validate_handle(handle)
assert result.is_valid is expected_valid
@pytest.mark.parametrize(
"password,expected_valid",
[
("", False),
("short", False),
("longenough", True),
],
)
def test_validate_password(password, expected_valid):
result = _validate_password(password)
assert result.is_valid is expected_valid

View File

@@ -0,0 +1,97 @@
from pathlib import Path
import yaml
def test_configuration_create_reprompts_and_writes_file(config_with_tmp_path, user_input):
inputs = iter([
"bad handle",
"alice.bsky.social",
"5",
"0",
"n",
])
passwords = iter([
"short",
"longenough",
])
user_input(inputs, passwords)
config = config_with_tmp_path
config.create()
assert config.config_file.exists() is True
data = yaml.safe_load(config.config_file.read_text())
assert data["handle"] == "alice.bsky.social"
assert data["password"] == "longenough"
assert data["batch_size"] == 5
assert data["delay"] == 0
assert data["verbose"] is False
def test_configuration_create_invalid_batch_size(config_with_tmp_path, user_input):
inputs = iter([
"alice.bsky.social",
"0",
"1",
"y",
])
passwords = iter(["longenough"])
user_input(inputs, passwords)
config = config_with_tmp_path
config.create()
assert config.config_file.exists() is False
def test_configuration_create_invalid_delay(config_with_tmp_path, user_input):
inputs = iter([
"alice.bsky.social",
"10",
"61",
"y",
])
passwords = iter(["longenough"])
user_input(inputs, passwords)
config = config_with_tmp_path
config.create()
assert config.config_file.exists() is False
def test_configuration_create_overwrite_cancel(config_with_tmp_path, user_input):
config = config_with_tmp_path
config.config_file.parent.mkdir(parents=True, exist_ok=True)
config.config_file.write_text("existing")
user_input(["n"], [])
config.create()
assert config.config_file.read_text() == "existing"
def test_configuration_create_write_failure(config_with_tmp_path, user_input, monkeypatch):
user_input(
["alice.bsky.social", "5", "0", "y"],
["longenough"],
)
config = config_with_tmp_path
original_open = open
def fake_open(path, mode="r", *args, **kwargs):
if Path(path) == config.config_file and "w" in mode:
raise OSError("disk full")
return original_open(path, mode, *args, **kwargs)
monkeypatch.setattr("builtins.open", fake_open)
config.create()
assert config.config_file.exists() is False

View File

@@ -0,0 +1,59 @@
from unittest.mock import patch
import pytest
import yaml
def test_configuration_load_missing_file(config_with_tmp_path):
with pytest.raises(FileNotFoundError, match="Configuration file not found"):
config_with_tmp_path.load()
def test_configuration_load_empty_file(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_with_tmp_path.config_file.write_text("")
with pytest.raises(ValueError, match="empty or invalid"):
config_with_tmp_path.load()
def test_configuration_load_invalid_yaml(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_with_tmp_path.config_file.write_text(": bad")
with pytest.raises(ValueError, match="Invalid YAML"):
config_with_tmp_path.load()
def test_configuration_load_valid_file(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_data = {
"handle": "alice.bsky.social",
"password": "password123",
"batch_size": 10,
"delay": 1,
"verbose": True
}
with open(config_with_tmp_path.config_file, "w") as f:
yaml.dump(config_data, f)
result = config_with_tmp_path.load()
assert result == config_data
def test_configuration_load_file_read_error(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_with_tmp_path.config_file.write_text("handle: alice")
with patch("builtins.open", side_effect=IOError("Permission denied")):
with pytest.raises(ValueError, match="Failed to read configuration file"):
config_with_tmp_path.load()
def test_configuration_load_file_os_error(config_with_tmp_path):
config_with_tmp_path.config_file.parent.mkdir(parents=True, exist_ok=True)
config_with_tmp_path.config_file.write_text("handle: alice")
with patch("builtins.open", side_effect=OSError("File is locked")):
with pytest.raises(ValueError, match="Failed to read configuration file"):
config_with_tmp_path.load()

252
tests/test_logger.py Normal file
View File

@@ -0,0 +1,252 @@
import logging
import sys
from skywipe.logger import LevelFilter, ProgressTracker, setup_logger
def test_level_filter_accepts_in_range():
logger = logging.getLogger("test.level_filter")
record = logger.makeRecord(
name="test.level_filter",
level=logging.INFO,
fn="test",
lno=1,
msg="message",
args=(),
exc_info=None,
)
level_filter = LevelFilter(logging.DEBUG, logging.INFO)
assert level_filter.filter(record) is True
def test_level_filter_rejects_out_of_range():
logger = logging.getLogger("test.level_filter")
record = logger.makeRecord(
name="test.level_filter",
level=logging.ERROR,
fn="test",
lno=1,
msg="message",
args=(),
exc_info=None,
)
level_filter = LevelFilter(logging.DEBUG, logging.INFO)
assert level_filter.filter(record) is False
def test_progress_tracker_updates_counts():
tracker = ProgressTracker(operation="Testing")
tracker.update()
tracker.update(2)
assert tracker.current == 3
def test_progress_tracker_batch_with_total():
tracker = ProgressTracker(operation="Testing")
logger = logging.getLogger("skywipe.progress")
messages = []
class MessageHandler(logging.Handler):
def emit(self, record):
messages.append(self.format(record))
handler = MessageHandler()
handler.setLevel(logging.INFO)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
try:
tracker.batch(1, 10, total_batches=5)
assert messages[-1] == "Testing - batch 1/5 (10 items)"
tracker.batch(0, 5, total_batches=0)
assert messages[-1] == "Testing - batch 0/0 (5 items)"
finally:
handler.close()
logger.removeHandler(handler)
def test_progress_tracker_batch_without_total():
tracker = ProgressTracker(operation="Testing")
logger = logging.getLogger("skywipe.progress")
messages = []
class MessageHandler(logging.Handler):
def emit(self, record):
messages.append(self.format(record))
handler = MessageHandler()
handler.setLevel(logging.INFO)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
try:
tracker.batch(1, 10, total_batches=None)
assert messages[-1] == "Testing - batch 1 (10 items)"
finally:
handler.close()
logger.removeHandler(handler)
def test_setup_logger_does_not_duplicate_handlers():
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
try:
setup_logger(verbose=False)
first_handlers = list(logger.handlers)
stream_handlers = [
handler for handler in first_handlers
if isinstance(handler, logging.StreamHandler)
]
assert len(stream_handlers) == 2
assert {handler.stream for handler in stream_handlers} == {
sys.stdout,
sys.stderr,
}
assert not any(
isinstance(handler, logging.FileHandler)
for handler in first_handlers
)
first_count = len(first_handlers)
setup_logger(verbose=False)
second_count = len(logger.handlers)
finally:
for handler in list(logger.handlers):
handler.close()
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)
assert first_count == second_count
def test_setup_logger_resets_stream_formatters():
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
try:
setup_logger(verbose=False)
alt_formatter = logging.Formatter(fmt="%(message)s")
for handler in logger.handlers:
if isinstance(handler, logging.StreamHandler):
handler.setFormatter(alt_formatter)
setup_logger(verbose=False)
for handler in logger.handlers:
if isinstance(handler, logging.StreamHandler):
assert handler.formatter is not None
assert handler.formatter._fmt == "%(levelname)s: %(message)s"
finally:
for handler in list(logger.handlers):
handler.close()
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)
def test_setup_logger_disables_propagation():
root_logger = logging.getLogger()
root_messages = []
original_root_level = root_logger.level
class RootHandler(logging.Handler):
def emit(self, record):
root_messages.append(self.format(record))
root_handler = RootHandler()
root_handler.setLevel(logging.INFO)
root_logger.addHandler(root_handler)
root_logger.setLevel(logging.INFO)
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
try:
setup_logger(verbose=False)
assert logger.propagate is False
progress_logger = logging.getLogger("skywipe.progress")
assert progress_logger.propagate is True
logger.info("Test message")
progress_logger.info("Progress message")
assert len(root_messages) == 0
finally:
root_handler.close()
root_logger.removeHandler(root_handler)
root_logger.setLevel(original_root_level)
for handler in list(logger.handlers):
handler.close()
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)
def test_setup_logger_file_handler_lifecycle(tmp_path):
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
log_file = tmp_path / "skywipe.log"
try:
setup_logger(verbose=False, log_file=log_file)
file_handlers = [
handler for handler in logger.handlers
if isinstance(handler, logging.FileHandler)
]
assert len(file_handlers) == 1
assert file_handlers[0].baseFilename == str(log_file)
setup_logger(verbose=False, log_file=None)
file_handlers = [
handler for handler in logger.handlers
if isinstance(handler, logging.FileHandler)
]
assert file_handlers == []
finally:
for handler in list(logger.handlers):
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)
def test_setup_logger_replaces_file_handler_when_path_changes(tmp_path):
logger = logging.getLogger("skywipe")
original_handlers = list(logger.handlers)
for handler in original_handlers:
logger.removeHandler(handler)
log_file1 = tmp_path / "skywipe1.log"
log_file2 = tmp_path / "skywipe2.log"
try:
setup_logger(verbose=False, log_file=log_file1)
file_handlers = [
handler for handler in logger.handlers
if isinstance(handler, logging.FileHandler)
]
assert len(file_handlers) == 1
assert file_handlers[0].baseFilename == str(log_file1)
setup_logger(verbose=False, log_file=log_file2)
file_handlers = [
handler for handler in logger.handlers
if isinstance(handler, logging.FileHandler)
]
assert len(file_handlers) == 1
assert file_handlers[0].baseFilename == str(log_file2)
finally:
for handler in list(logger.handlers):
logger.removeHandler(handler)
for handler in original_handlers:
logger.addHandler(handler)

View File

@@ -0,0 +1,30 @@
import pytest
import skywipe.operations as operations
def test_operation_context_raises_on_auth_error(monkeypatch):
class FakeAuth:
def login(self):
raise ValueError("bad auth")
monkeypatch.setattr(operations, "Auth", FakeAuth)
with pytest.raises(ValueError, match="bad auth"):
operations.OperationContext()
def test_operation_context_raises_on_config_error(monkeypatch):
class FakeClient:
class Me:
did = "did:plc:fake"
me = Me()
def fake_load(self):
raise ValueError("bad config")
monkeypatch.setattr(operations.Configuration, "load", fake_load)
with pytest.raises(ValueError, match="bad config"):
operations.OperationContext(client=FakeClient())

98
tests/test_operations.py Normal file
View File

@@ -0,0 +1,98 @@
import time
import pytest
from skywipe.operations import Operation, BookmarkStrategy
class FakeClient:
class Me:
did = "did:plc:fake"
me = Me()
class FakeResponse:
def __init__(self, items, cursor=None):
self.items = items
self.cursor = cursor
class FakeStrategy:
def __init__(self, responses, fail_on=None):
self._responses = list(responses)
self._fail_on = fail_on
def fetch(self, context, cursor=None):
return self._responses.pop(0)
def extract_items(self, response):
return response.items
def process_item(self, item, context):
if self._fail_on is not None and item == self._fail_on:
raise ValueError("boom")
def get_cursor(self, response):
return response.cursor
def test_operation_run_batches_filters_and_sleeps(monkeypatch):
responses = [
FakeResponse(items=[1, 2, 3], cursor="next"),
FakeResponse(items=[4], cursor=None),
]
operation = Operation(
"Testing",
strategy_type="feed",
client=FakeClient(),
config_data={"batch_size": 2, "delay": 1},
filter_fn=lambda item: item != 2,
)
operation.strategy = FakeStrategy(responses, fail_on=3)
slept = []
def fake_sleep(seconds):
slept.append(seconds)
monkeypatch.setattr(time, "sleep", fake_sleep)
total = operation.run()
assert total == 2
assert slept == [1]
def test_bookmark_strategy_extracts_uri_from_shapes():
strategy = BookmarkStrategy()
class Obj:
pass
direct = Obj()
direct.uri = "direct"
assert strategy._extract_bookmark_uri(direct) == "direct"
subject = Obj()
subject.subject = Obj()
subject.subject.uri = "subject"
assert strategy._extract_bookmark_uri(subject) == "subject"
record = Obj()
record.record = Obj()
record.record.uri = "record"
assert strategy._extract_bookmark_uri(record) == "record"
post = Obj()
post.post = Obj()
post.post.uri = "post"
assert strategy._extract_bookmark_uri(post) == "post"
item = Obj()
item.item = Obj()
item.item.uri = "item"
assert strategy._extract_bookmark_uri(item) == "item"
missing = Obj()
assert strategy._extract_bookmark_uri(missing) is None

36
tests/test_safeguard.py Normal file
View File

@@ -0,0 +1,36 @@
import logging
import pytest
from skywipe.safeguard import require_confirmation
def test_require_confirmation_skips_prompt():
logger = logging.getLogger("test.safeguard")
require_confirmation("do nothing", skip_confirmation=True, logger=logger)
def test_require_confirmation_accepts_yes(monkeypatch):
logger = logging.getLogger("test.safeguard")
monkeypatch.setattr("builtins.input", lambda _: "y")
require_confirmation("do nothing", logger=logger)
def test_require_confirmation_rejects_no(monkeypatch):
logger = logging.getLogger("test.safeguard")
monkeypatch.setattr("builtins.input", lambda _: "n")
with pytest.raises(SystemExit) as excinfo:
require_confirmation("do nothing", logger=logger)
assert excinfo.value.code == 0
def test_require_confirmation_handles_eof(monkeypatch):
logger = logging.getLogger("test.safeguard")
def raise_eof(_prompt):
raise EOFError
monkeypatch.setattr("builtins.input", raise_eof)
with pytest.raises(SystemExit) as excinfo:
require_confirmation("do nothing", logger=logger)
assert excinfo.value.code == 0

129
tests/test_strategies.py Normal file
View File

@@ -0,0 +1,129 @@
from types import SimpleNamespace
from skywipe.operations import (
RecordDeletionStrategy,
FeedStrategy,
BookmarkStrategy,
)
def test_record_deletion_strategy_fetch_and_process(monkeypatch):
captured = {}
class FakeParams:
def __init__(self, **kwargs):
self.kwargs = kwargs
def fake_list_records(params):
captured["params"] = params
return "response"
def fake_delete_record(data):
captured["delete"] = data
client = SimpleNamespace(
com=SimpleNamespace(
atproto=SimpleNamespace(
repo=SimpleNamespace(
list_records=fake_list_records,
delete_record=fake_delete_record,
)
)
)
)
context = SimpleNamespace(
did="did:plc:fake",
batch_size=2,
client=client,
logger=SimpleNamespace(debug=lambda *_args, **_kwargs: None),
)
monkeypatch.setattr(
"skywipe.operations.models.ComAtprotoRepoListRecords.Params",
FakeParams,
)
strategy = RecordDeletionStrategy("app.bsky.feed.like")
response = strategy.fetch(context, cursor="next")
assert response == "response"
assert captured["params"].kwargs == {
"repo": "did:plc:fake",
"collection": "app.bsky.feed.like",
"limit": 2,
"cursor": "next",
}
record = SimpleNamespace(uri="at://did:plc:fake/app.bsky.feed.like/abc123")
strategy.process_item(record, context)
assert captured["delete"] == {
"repo": "did:plc:fake",
"collection": "app.bsky.feed.like",
"rkey": "abc123",
}
def test_feed_strategy_fetch_and_process():
captured = {}
def fake_get_author_feed(**kwargs):
captured["feed"] = kwargs
return "feed"
def fake_delete_post(uri):
captured["delete"] = uri
client = SimpleNamespace(
get_author_feed=fake_get_author_feed,
delete_post=fake_delete_post,
)
context = SimpleNamespace(
did="did:plc:fake",
batch_size=3,
client=client,
logger=SimpleNamespace(debug=lambda *_args, **_kwargs: None),
)
strategy = FeedStrategy()
response = strategy.fetch(context, cursor=None)
assert response == "feed"
assert captured["feed"] == {"actor": "did:plc:fake", "limit": 3}
response = strategy.fetch(context, cursor="next")
assert response == "feed"
assert captured["feed"] == {
"actor": "did:plc:fake", "limit": 3, "cursor": "next"}
post = SimpleNamespace(post=SimpleNamespace(uri="at://post"))
strategy.process_item(post, context)
assert captured["delete"] == "at://post"
def test_bookmark_strategy_fetch(monkeypatch):
captured = {}
class FakeParams:
def __init__(self, **kwargs):
self.kwargs = kwargs
def fake_get_bookmarks(params):
captured["params"] = params
return "bookmarks"
client = SimpleNamespace(
app=SimpleNamespace(
bsky=SimpleNamespace(
bookmark=SimpleNamespace(get_bookmarks=fake_get_bookmarks)
)
)
)
context = SimpleNamespace(batch_size=5, client=client)
monkeypatch.setattr(
"skywipe.operations.models.AppBskyBookmarkGetBookmarks.Params",
FakeParams,
)
strategy = BookmarkStrategy()
response = strategy.fetch(context, cursor="cursor")
assert response == "bookmarks"
assert captured["params"].kwargs == {"limit": 5, "cursor": "cursor"}

61
uv.lock generated
View File

@@ -228,6 +228,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" },
]
[[package]]
name = "iniconfig"
version = "2.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" },
]
[[package]]
name = "libipld"
version = "3.3.2"
@@ -262,6 +271,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/22/19/bb42dc53bb8855c1f40b4a431ed3cb2df257bd5a6af61842626712c83073/libipld-3.3.2-cp314-cp314-win_arm64.whl", hash = "sha256:08261503b7307c6d9acbd3b2a221da9294b457204dcefce446f627893abb077e", size = 149324, upload-time = "2025-12-05T12:59:18.815Z" },
]
[[package]]
name = "packaging"
version = "25.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = "2025-04-19T11:48:59.673Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "pycparser"
version = "2.23"
@@ -339,6 +366,31 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/9f/ed/068e41660b832bb0b1aa5b58011dea2a3fe0ba7861ff38c4d4904c1c1a99/pydantic_core-2.41.5-cp314-cp314t-win_arm64.whl", hash = "sha256:35b44f37a3199f771c3eaa53051bc8a70cd7b54f333531c59e29fd4db5d15008", size = 1974769, upload-time = "2025-11-04T13:42:01.186Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pytest"
version = "9.0.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" },
]
[[package]]
name = "pyyaml"
version = "6.0.3"
@@ -378,17 +430,24 @@ wheels = [
[[package]]
name = "skywipe"
version = "0.1.0"
source = { virtual = "." }
source = { editable = "." }
dependencies = [
{ name = "atproto" },
{ name = "pyyaml" },
]
[package.optional-dependencies]
dev = [
{ name = "pytest" },
]
[package.metadata]
requires-dist = [
{ name = "atproto", specifier = ">=0.0.65" },
{ name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" },
{ name = "pyyaml", specifier = ">=6.0" },
]
provides-extras = ["dev"]
[[package]]
name = "typing-extensions"