Compare commits

...

53 Commits

Author SHA1 Message Date
b336991d67 feat: add -v|--version flag 2025-12-20 23:05:42 +01:00
62c058e9ee feat: add version 2025-12-20 23:05:20 +01:00
35c8b5b8d1 refactor: use NamedTuple for validation results 2025-12-20 22:36:51 +01:00
54c3353667 refactor: simplify nested try-except blocks 2025-12-20 22:35:01 +01:00
93a124be2a clean: remove useless declarations 2025-12-20 22:33:54 +01:00
ecbee7a8ac refactor: pass logger instead of multiple get_logger() calls 2025-12-20 22:30:57 +01:00
eaf4e94d24 refactor: pass logger parameter and use error handling helper 2025-12-20 22:30:47 +01:00
97e166d5f7 refactor: make logger parameter optional 2025-12-20 22:30:33 +01:00
61e2d7f731 refactor: add handle_error() helper for centralized error handling 2025-12-20 22:30:21 +01:00
054dc01813 fix: ensure logger always writes to log file 2025-12-20 22:09:25 +01:00
799b1083ab refactor: add hacking at the bottom 2025-12-20 21:56:57 +01:00
15db235fe1 docs: update hacking section with example 2025-12-20 21:48:57 +01:00
d09dcf06cf docs: ready to fly 2025-12-20 21:47:36 +01:00
5b9589794e clean: shorter epilog 2025-12-20 21:46:16 +01:00
93b88917df refactor: consolidate all command metadata into a single structure 2025-12-20 21:26:11 +01:00
25618ab5bf refactor: simplify logger 2025-12-20 21:24:02 +01:00
81fa68ed08 fix: run commands in right order if all 2025-12-20 21:11:45 +01:00
ca6eaed146 feat: improve confirmation message handling 2025-12-20 21:10:00 +01:00
1b8b32027c feat: dependency injection to allow reusing an existing auth client 2025-12-20 21:07:50 +01:00
a6190aeb84 feat: update run_all to reuse auth and config 2025-12-20 21:07:43 +01:00
887169e7d2 feat: add some validation 2025-12-20 21:04:09 +01:00
9565e4008e feat: consistent error handling 2025-12-20 20:59:18 +01:00
c2aab71955 refactor: add basestrategy and make all class inherits to get rid of get_cursor() 2025-12-20 20:56:22 +01:00
97dd55981b refactor: use a factory pattern 2025-12-20 20:54:52 +01:00
0e91c95e9b refactor: extract common embed checking logic 2025-12-20 20:51:45 +01:00
a818df4a6c refactor: replace hardcoded tuple with constants (consistency+readability) 2025-12-20 20:50:06 +01:00
9aec57bd56 fix: logger's already setup 2025-12-20 20:48:28 +01:00
ec9943822c fix: replace hardcoded list with a registry-derived list 2025-12-20 20:47:15 +01:00
0240ff9f8e clean: remove duplicate run_operation, dead code now 2025-12-20 20:44:42 +01:00
45e2e1eb00 clean: remove is_logged() (not used) 2025-12-20 20:44:29 +01:00
ff20228fa6 docs: update roadmap 2025-12-20 17:25:02 +01:00
9d9c09d56a refactor: use our new Operation and PostAnalyzer tools 2025-12-20 17:24:29 +01:00
64355fbeeb clean: remove unused import 2025-12-20 17:24:15 +01:00
4a337e6b20 feat: add post analysis utilities 2025-12-20 17:24:05 +01:00
f27be4d603 refactor: add OperationContext and Operations classes 2025-12-20 17:23:54 +01:00
9d254ac4b7 refactor: delete unitary "actions" py files 2025-12-20 17:23:35 +01:00
590cc03ba4 docs: refactor warning 2025-12-20 16:03:39 +01:00
c493a99860 docs: inform about log 2025-12-20 16:01:07 +01:00
3f9ef6527f docs: update roadmap 2025-12-20 16:00:03 +01:00
f53e5bb527 feat: implement our safeguard in commands 2025-12-20 15:59:54 +01:00
f2854a0df5 feat: implement safeguard and relocate log file 2025-12-20 15:59:41 +01:00
defd991006 feat: new safeguard module 2025-12-20 15:59:17 +01:00
f922e3cf9d docs: update roadmap 2025-12-20 15:46:48 +01:00
06d70edaf1 feat: use our new logger 2025-12-20 15:46:33 +01:00
58ab6cfafa feat: implement proper logging 2025-12-20 15:46:22 +01:00
dba06e642a docs: update roadmap 2025-12-19 14:39:15 +01:00
5868c1649b docs: update readme 2025-12-19 14:35:25 +01:00
a14184cddc feat: run all 2025-12-19 14:35:21 +01:00
6587f8c39c feat: implement bookmark deletion module 2025-12-19 14:35:05 +01:00
ae6663572c refactor: inline has_quote_embed 2025-12-19 14:34:42 +01:00
3eb456e999 refactor: delete_posts is now delete_all_posts 2025-12-19 14:34:25 +01:00
cfa5773e62 refactor: inline has_media_embed 2025-12-19 14:34:02 +01:00
ddee2a6029 feat: implement follow undoing module 2025-12-19 14:33:43 +01:00
15 changed files with 748 additions and 472 deletions

View File

@@ -1,39 +1,33 @@
# Skywipe
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
Skywipe is a Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
## Warning
**Warning:** This tool performs _**destructive operations**_.
This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account.
Only use it if you intend to permanently erase data from your Bluesky account.
## Requirements
Check [pyproject.toml](pyproject.toml).
Python 3.13+.
You can use `uv` to install dependencies:
The rest of the dependencies are listed in [pyproject.toml](pyproject.toml).
## Installation
Use [`pipx`](https://pipx.pypa.io/latest/installation/) to install `skywipe`:
```bash
git clone https://git.kharec.info/Kharec/skywipe.git
cd skywipe
uv sync
pipx install git+https://git.kharec.info/Kharec/skywipe.git
```
## How to run
While it's being developed, you can use the tool using `uv` :
Run the tool and see available commands with:
```bash
uv run python -m skywipe.cli all # target everything
uv run python -m skywipe.cli configure # create configuration
uv run python -m skywipe.cli posts # delete posts
uv run python -m skywipe.cli medias # delete posts with medias
uv run python -m skywipe.cli likes # undo likes
uv run python -m skywipe.cli reposts # undo reposts
uv run python -m skywipe.cli quotes # delete quotes
uv run python -m skywipe.cli follows # unfollow all
uv run python -m skywipe.cli bookmarks # delete bookmarks
skywipe -h
```
Use the `--yes` flag to skip the confirmation prompt and proceed with the operation. A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
## Configuration
If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` :
@@ -48,26 +42,25 @@ verbose: true
BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-password) FOR OBVIOUS SECURITY REASONS.
## Roadmap
## Hacking
- [x] build cli parameter management
- [x] handle configuration logic
- [x] sign in to at protocol
- [x] delete posts in batch
- [x] only delete posts with media
- [x] undo likes
- [x] undo reposts
- [x] delete quotes
- [ ] unfollow accounts
- [ ] remove bookmarks
- [ ] make `all` run the other commands
- [ ] add simple progress and logging
- [ ] add safeguards like confirmations and clear dry-run info
You can use `uv` to install dependencies:
Once it's done, we'll think:
```bash
git clone https://git.kharec.info/Kharec/skywipe.git
cd skywipe
uv sync
```
- [ ] decent code architecture
- [ ] installation and run process
Then start coding.
If you want to test your changes, you can run the tool with:
```bash
uv run python -m skywipe.cli -h
uv run python -m skywipe.cli all
# or any other command
```
## License

View File

@@ -0,0 +1 @@
__version__ = "0.1.0"

View File

@@ -10,7 +10,11 @@ class Auth:
self.client = None
def login(self) -> Client:
config_data = self.config.load()
try:
config_data = self.config.load()
except FileNotFoundError as e:
raise ValueError(
"Configuration file not found. Run 'skywipe configure' first.") from e
handle = config_data.get("handle")
password = config_data.get("password")
@@ -19,10 +23,11 @@ class Auth:
raise ValueError(
"handle and password must be set in configuration")
self.client = Client()
self.client.login(handle, password)
try:
self.client = Client()
self.client.login(handle, password)
except Exception as e:
raise ValueError(
f"Failed to authenticate: {e}") from e
return self.client
def is_logged(self) -> bool:
return bool(getattr(self.client, "me", None))

View File

@@ -2,9 +2,15 @@
import sys
import argparse
from pathlib import Path
from . import __version__
from .commands import registry
from .configure import Configuration
from .logger import setup_logger, get_logger, handle_error
LOG_FILE = Path.home() / ".cache" / "skywipe" / "skywipe.log"
def create_parser():
@@ -12,7 +18,19 @@ def create_parser():
parser = argparse.ArgumentParser(
description="Clean your bluesky account with style.",
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account."
epilog="WARNING: This tool deletes your Bluesky data permanently."
)
parser.add_argument(
"-v", "--version",
action="version",
version=f"Skywipe {__version__}"
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip confirmation prompt and proceed with destructive operations"
)
subparsers = parser.add_subparsers(
@@ -28,11 +46,11 @@ def create_parser():
return parser
def require_config():
def require_config(logger):
config = Configuration()
if not config.exists():
print("Error: Configuration file not found.")
print("You must run 'skywipe configure' first.")
logger.error("Configuration file not found.")
logger.error("You must run 'skywipe configure' first.")
sys.exit(1)
@@ -40,14 +58,21 @@ def main():
parser = create_parser()
args = parser.parse_args()
setup_logger(verbose=False, log_file=LOG_FILE)
logger = get_logger()
if registry.requires_config(args.command):
require_config()
require_config(logger)
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
setup_logger(verbose=verbose, log_file=LOG_FILE)
try:
registry.execute(args.command)
except ValueError as e:
print(f"Error: {e}")
sys.exit(1)
registry.execute(
args.command, skip_confirmation=getattr(args, "yes", False))
except (ValueError, Exception) as e:
handle_error(e, logger, exit_on_error=True)
if __name__ == '__main__':

View File

@@ -1,15 +1,84 @@
"""Command implementations for Skywipe"""
from typing import Callable, Dict, Optional
from typing import Callable, Dict, Optional, Any
from .configure import Configuration
from .posts import delete_posts
from .medias import delete_posts_with_medias
from .likes import undo_likes
from .reposts import undo_reposts
from .quotes import delete_quotes
from .operations import Operation
from .post_analysis import PostAnalyzer
from .logger import get_logger, handle_error
from .safeguard import require_confirmation
CommandHandler = Callable[[], None]
CommandHandler = Callable[..., None]
COMMAND_METADATA = {
"posts": {
"confirmation": "delete all posts",
"help_text": "only posts",
"operation_name": "Deleting posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
"medias": {
"confirmation": "delete all posts with media",
"help_text": "only posts with medias",
"operation_name": "Deleting posts with media",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_media(post.post),
},
"likes": {
"confirmation": "undo all likes",
"help_text": "only likes",
"operation_name": "Undoing likes",
"strategy_type": "record",
"collection": "app.bsky.feed.like",
"filter_fn": None,
},
"reposts": {
"confirmation": "undo all reposts",
"help_text": "only reposts",
"operation_name": "Undoing reposts",
"strategy_type": "record",
"collection": "app.bsky.feed.repost",
"filter_fn": None,
},
"quotes": {
"confirmation": "delete all quote posts",
"help_text": "only quotes",
"operation_name": "Deleting quote posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_quote(post.post),
},
"follows": {
"confirmation": "unfollow all accounts",
"help_text": "only follows",
"operation_name": "Unfollowing accounts",
"strategy_type": "record",
"collection": "app.bsky.graph.follow",
"filter_fn": None,
},
"bookmarks": {
"confirmation": "delete all bookmarks",
"help_text": "only bookmarks",
"operation_name": "Deleting bookmarks",
"strategy_type": "bookmark",
"collection": None,
"filter_fn": None,
},
}
COMMAND_EXECUTION_ORDER = [
"quotes",
"medias",
"posts",
"likes",
"reposts",
"follows",
"bookmarks",
]
class CommandRegistry:
@@ -41,10 +110,13 @@ class CommandRegistry:
def get_all_commands(self) -> Dict[str, str]:
return self._help_texts.copy()
def execute(self, name: str):
def execute(self, name: str, skip_confirmation: bool = False):
handler = self.get_handler(name)
if handler:
handler()
if name == "configure":
handler()
else:
handler(skip_confirmation)
else:
raise ValueError(f"Unknown command: {name}")
@@ -52,56 +124,103 @@ class CommandRegistry:
registry = CommandRegistry()
def _create_operation_handler(
confirmation_message: str,
operation_name: str,
strategy_type: str = "feed",
collection: Optional[str] = None,
filter_fn: Optional[Callable[[Any], bool]] = None
) -> CommandHandler:
logger = get_logger()
def handler(skip_confirmation: bool = False):
require_confirmation(confirmation_message, skip_confirmation, logger)
try:
Operation(
operation_name,
strategy_type=strategy_type,
collection=collection,
filter_fn=filter_fn
).run()
except (ValueError, Exception) as e:
handle_error(e, logger)
return handler
def run_configure():
config = Configuration()
config.create()
def run_posts():
delete_posts()
def _create_command_handlers():
handlers = {}
for cmd, metadata in COMMAND_METADATA.items():
handlers[cmd] = _create_operation_handler(
metadata["confirmation"],
metadata["operation_name"],
strategy_type=metadata["strategy_type"],
collection=metadata["collection"],
filter_fn=metadata["filter_fn"]
)
return handlers
def run_medias():
delete_posts_with_medias()
_command_handlers = _create_command_handlers()
def run_likes():
undo_likes()
def run_all(skip_confirmation: bool = False):
logger = get_logger()
all_commands = registry.get_all_commands()
available_commands = [cmd for cmd in all_commands.keys()
if cmd not in ("configure", "all")]
def run_reposts():
undo_reposts()
commands = [cmd for cmd in COMMAND_EXECUTION_ORDER
if cmd in available_commands]
commands.extend([cmd for cmd in available_commands
if cmd not in COMMAND_EXECUTION_ORDER])
def run_quotes():
delete_quotes()
commands_str = ", ".join(commands)
all_confirmation = f"run all cleanup commands ({commands_str})"
require_confirmation(all_confirmation, skip_confirmation, logger)
logger.info("Running all cleanup commands...")
def run_follows():
print("Command 'follows' is not yet implemented.")
from .operations import OperationContext
try:
context = OperationContext()
shared_client = context.client
shared_config_data = context.config_data
except Exception as e:
logger.error(
f"Failed to initialize shared context: {e}", exc_info=True)
raise
def run_bookmarks():
print("Command 'bookmarks' is not yet implemented")
def run_all():
registry.execute("posts")
registry.execute("medias")
registry.execute("likes")
registry.execute("reposts")
registry.execute("quotes")
registry.execute("follows")
registry.execute("bookmarks")
for cmd in commands:
try:
logger.info(f"Starting command: {cmd}")
metadata = COMMAND_METADATA.get(cmd)
if metadata:
Operation(
metadata["operation_name"],
strategy_type=metadata["strategy_type"],
collection=metadata["collection"],
filter_fn=metadata["filter_fn"],
client=shared_client,
config_data=shared_config_data
).run()
else:
registry.execute(cmd, skip_confirmation=True)
logger.info(f"Completed command: {cmd}")
except Exception as e:
logger.error(f"Error running '{cmd}': {e}", exc_info=True)
continue
logger.info("All commands completed.")
registry.register("configure", run_configure,
"create configuration", requires_config=False)
registry.register("posts", run_posts, "only posts")
registry.register("medias", run_medias, "only posts with medias")
registry.register("likes", run_likes, "only likes")
registry.register("reposts", run_reposts, "only reposts")
registry.register("quotes", run_quotes, "only quotes")
registry.register("follows", run_follows, "only follows")
registry.register("bookmarks", run_follows, "only bookmarks")
for cmd, metadata in COMMAND_METADATA.items():
registry.register(cmd, _command_handlers[cmd], metadata["help_text"])
registry.register("all", run_all, "target everything")

View File

@@ -1,8 +1,52 @@
"""Core configuration module for Skywipe"""
import getpass
import re
from pathlib import Path
from typing import NamedTuple
import yaml
from .logger import setup_logger
class ValidationResult(NamedTuple):
is_valid: bool
error_message: str
def _validate_handle(handle: str) -> ValidationResult:
if not handle:
return ValidationResult(False, "Handle cannot be empty")
if len(handle) > 253:
return ValidationResult(False, "Handle is too long (max 253 characters)")
if " " in handle:
return ValidationResult(False, "Handle cannot contain spaces")
handle_pattern = re.compile(
r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$|"
r"^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$|"
r"^did:[a-z]+:[a-zA-Z0-9._-]+$"
)
if not handle_pattern.match(handle):
return ValidationResult(False, (
"Invalid handle format. "
"Use a username (e.g., 'alice'), full handle (e.g., 'alice.bsky.social'), "
"or DID (e.g., 'did:plc:...')"
))
return ValidationResult(True, "")
def _validate_password(password: str) -> ValidationResult:
if not password:
return ValidationResult(False, "Password cannot be empty")
if len(password) < 8:
return ValidationResult(False, "Password is too short (minimum 8 characters)")
return ValidationResult(True, "")
class Configuration:
@@ -13,10 +57,12 @@ class Configuration:
return self.config_file.exists()
def create(self):
logger = setup_logger(verbose=False)
if self.exists():
overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"):
logger.info("Configuration creation cancelled.")
return
config_dir = self.config_file.parent
@@ -24,8 +70,27 @@ class Configuration:
print("Skywipe Configuration")
print("=" * 50)
handle = input("Bluesky handle: ").strip()
password = getpass.getpass("Bluesky app password: ").strip()
print("Note: You should use an app password from Bluesky settings.")
while True:
handle = input("Bluesky handle: ").strip()
is_valid, error_msg = _validate_handle(handle)
if is_valid:
break
logger.error(error_msg)
logger.info("Please enter a valid handle and try again.")
while True:
password = getpass.getpass(
"Bluesky (hopefully app) password: ").strip()
is_valid, error_msg = _validate_password(password)
if is_valid:
break
logger.error(error_msg)
logger.info("Please check your password and try again.")
logger.info(
"Generate an app password at: https://bsky.app/settings/app-passwords")
batch_size = input("Batch size (default: 10): ").strip() or "10"
delay = input(
"Delay between batches in seconds (default: 1): ").strip() or "1"
@@ -35,9 +100,20 @@ class Configuration:
try:
batch_size = int(batch_size)
delay = int(delay)
if batch_size < 1 or batch_size > 100:
logger.error("batch_size must be between 1 and 100")
return
except ValueError:
print("Error: batch_size and delay must be integers")
logger.error("batch_size must be an integer")
return
try:
delay = int(delay)
if delay < 0 or delay > 60:
logger.error("delay must be between 0 and 60 seconds")
return
except ValueError:
logger.error("delay must be an integer")
return
config_data = {
@@ -48,14 +124,28 @@ class Configuration:
"verbose": verbose
}
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
try:
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
except (IOError, OSError) as e:
logger.error(f"Failed to save configuration: {e}")
return
print(f"\nConfiguration saved to {self.config_file}")
logger.info(f"Configuration saved to {self.config_file}")
def load(self) -> dict:
if not self.exists():
raise FileNotFoundError(
f"Configuration file not found: {self.config_file}")
with open(self.config_file, "r") as f:
return yaml.safe_load(f)
try:
with open(self.config_file, "r") as f:
config_data = yaml.safe_load(f)
if config_data is None:
raise ValueError("Configuration file is empty or invalid")
return config_data
except (IOError, OSError) as e:
raise ValueError(
f"Failed to read configuration file: {e}") from e
except yaml.YAMLError as e:
raise ValueError(
f"Invalid YAML in configuration file: {e}") from e

View File

@@ -1,69 +0,0 @@
"""Like undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
LIKE_COLLECTION = "app.bsky.feed.like"
def undo_likes():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=LIKE_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": LIKE_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone like: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing like {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} likes.")

103
skywipe/logger.py Normal file
View File

@@ -0,0 +1,103 @@
"""Centralized logging module for Skywipe"""
import logging
import sys
from pathlib import Path
from typing import Optional
class ProgressTracker:
def __init__(self, operation: str = "Processing"):
self.current = 0
self.operation = operation
def update(self, count: int = 1):
self.current += count
def batch(self, batch_num: int, batch_size: int, total_batches: Optional[int] = None):
logger = logging.getLogger("skywipe.progress")
if total_batches:
logger.info(
f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)"
)
else:
logger.info(
f"{self.operation} - batch {batch_num} ({batch_size} items)")
class LevelFilter(logging.Filter):
def __init__(self, min_level: int, max_level: int):
super().__init__()
self.min_level = min_level
self.max_level = max_level
def filter(self, record: logging.LogRecord) -> bool:
return self.min_level <= record.levelno <= self.max_level
def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logging.Logger:
logger = logging.getLogger("skywipe")
target_level = logging.DEBUG if verbose else logging.INFO
logger.setLevel(target_level)
info_handler = None
error_handler = None
file_handlers = []
for handler in logger.handlers:
if isinstance(handler, logging.StreamHandler):
if handler.stream == sys.stdout:
info_handler = handler
elif handler.stream == sys.stderr:
error_handler = handler
elif isinstance(handler, logging.FileHandler):
file_handlers.append(handler)
formatter = logging.Formatter(fmt="%(levelname)s: %(message)s")
if info_handler is None:
info_handler = logging.StreamHandler(sys.stdout)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter)
logger.addHandler(info_handler)
info_handler.setLevel(target_level)
if error_handler is None:
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
if log_file:
if not file_handlers:
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
else:
for handler in file_handlers:
handler.close()
logger.removeHandler(handler)
return logger
def get_logger() -> logging.Logger:
return logging.getLogger("skywipe")
def handle_error(error: Exception, logger: logging.Logger, exit_on_error: bool = False) -> None:
if isinstance(error, ValueError):
logger.error(f"{error}")
else:
logger.error(f"Unexpected error: {error}", exc_info=True)
if exit_on_error:
sys.exit(1)
else:
raise error

View File

@@ -1,96 +0,0 @@
"""Media post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def has_media_embed(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
media_types = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in media_types:
return True
if embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in media_types:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
def delete_posts_with_medias():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
else:
response = client.get_author_feed(actor=did, limit=batch_size)
posts = response.feed
if not posts:
break
for post in posts:
post_record = post.post
if not has_media_embed(post_record):
if verbose:
print(f"Skipping post without media: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted post with media: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts with media.")

209
skywipe/operations.py Normal file
View File

@@ -0,0 +1,209 @@
"""Shared operation utilities and strategies for Skywipe"""
import time
from typing import Callable, Optional, Any
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
class OperationContext:
def __init__(self, client=None, config_data=None):
self.logger = get_logger()
self.client, self.did = self._initialize_client(client)
self.config_data = self._initialize_config(config_data)
self.batch_size = self.config_data.get("batch_size", 10)
self.delay = self.config_data.get("delay", 1)
def _initialize_client(self, client):
if client is not None:
return client, client.me.did
try:
auth = Auth()
client = auth.login()
return client, client.me.did
except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error during initialization: {e}", exc_info=True)
raise ValueError(
f"Failed to initialize operation context: {e}") from e
def _initialize_config(self, config_data):
if config_data is not None:
return config_data
try:
config = Configuration()
return config.load()
except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error loading configuration: {e}", exc_info=True)
raise ValueError(
f"Failed to load configuration: {e}") from e
class BaseStrategy:
def get_cursor(self, response):
return response.cursor
class RecordDeletionStrategy(BaseStrategy):
def __init__(self, collection: str):
self.collection = collection
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
list_params = models.ComAtprotoRepoListRecords.Params(
repo=context.did,
collection=self.collection,
limit=context.batch_size,
cursor=cursor
)
return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response):
return response.records
def process_item(self, record, context: OperationContext):
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": context.did,
"collection": self.collection,
"rkey": rkey
}
context.client.com.atproto.repo.delete_record(data=delete_data)
context.logger.debug(f"Deleted: {record_uri}")
class FeedStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
if cursor:
return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor
)
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
def extract_items(self, response):
return response.feed
def process_item(self, post, context: OperationContext):
uri = post.post.uri
context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}")
class BookmarkStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size,
cursor=cursor
)
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response):
return response.bookmarks
def process_item(self, bookmark, context: OperationContext):
bookmark_uri = self._extract_bookmark_uri(bookmark)
if not bookmark_uri:
raise ValueError("Unable to find bookmark URI")
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri)
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
def _extract_bookmark_uri(self, bookmark):
if hasattr(bookmark, "uri"):
return bookmark.uri
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
return nested.uri
return None
class Operation:
def __init__(
self,
operation_name: str,
strategy_type: str = "feed",
collection: Optional[str] = None,
filter_fn: Optional[Callable[[Any], bool]] = None,
client=None,
config_data=None
):
self.operation_name = operation_name
self.filter_fn = filter_fn
self._client = client
self._config_data = config_data
if strategy_type == "record":
if not collection:
raise ValueError("Collection is required for record strategy")
self.strategy = RecordDeletionStrategy(collection)
elif strategy_type == "bookmark":
self.strategy = BookmarkStrategy()
else:
self.strategy = FeedStrategy()
def run(self) -> int:
context = OperationContext(
client=self._client, config_data=self._config_data)
progress = ProgressTracker(operation=self.operation_name)
context.logger.info(
f"Starting {self.operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
)
cursor = None
total_processed = 0
batch_num = 0
while True:
batch_num += 1
try:
response = self.strategy.fetch(context, cursor)
items = self.strategy.extract_items(response)
except Exception as e:
context.logger.error(
f"Error fetching items for batch {batch_num}: {e}", exc_info=True)
break
if not items:
break
progress.batch(batch_num, len(items))
for item in items:
if self.filter_fn and not self.filter_fn(item):
continue
try:
self.strategy.process_item(item, context)
total_processed += 1
progress.update(1)
except Exception as e:
context.logger.error(f"Error processing item: {e}")
cursor = self.strategy.get_cursor(response)
if not cursor:
break
if context.delay > 0:
time.sleep(context.delay)
context.logger.info(
f"{self.operation_name}: {total_processed} items processed.")
return total_processed

70
skywipe/post_analysis.py Normal file
View File

@@ -0,0 +1,70 @@
"""Post analysis utilities for Skywipe"""
class PostAnalyzer:
MEDIA_TYPES = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
QUOTE_TYPES = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
QUOTE_WITH_MEDIA_TYPES = {
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
@staticmethod
def _get_embed(post_record):
return getattr(post_record, 'embed', None)
@staticmethod
def _get_embed_type_base(embed):
embed_type = getattr(embed, 'py_type', None)
if embed_type:
return embed_type.split('#')[0]
return None
@staticmethod
def has_media(post_record):
embed = PostAnalyzer._get_embed(post_record)
if not embed:
return False
embed_type_base = PostAnalyzer._get_embed_type_base(embed)
if embed_type_base:
if embed_type_base in PostAnalyzer.MEDIA_TYPES:
return True
if embed_type_base in PostAnalyzer.QUOTE_WITH_MEDIA_TYPES:
media = getattr(embed, 'media', None)
if media:
media_type_base = PostAnalyzer._get_embed_type_base(media)
if media_type_base and media_type_base in PostAnalyzer.MEDIA_TYPES:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
@staticmethod
def has_quote(post_record):
embed = PostAnalyzer._get_embed(post_record)
if not embed:
return False
embed_type_base = PostAnalyzer._get_embed_type_base(embed)
if embed_type_base and embed_type_base in PostAnalyzer.QUOTE_TYPES:
return True
return (hasattr(embed, 'record') or
(isinstance(embed, dict) and embed.get('record')))

View File

@@ -1,56 +0,0 @@
"""Post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_posts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
else:
response = client.get_author_feed(actor=did, limit=batch_size)
posts = response.feed
if not posts:
break
post_uris = [post.post.uri for post in posts]
for uri in post_uris:
try:
client.delete_post(uri)
total_deleted += 1
if verbose:
print(f"Deleted post: {uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts.")

View File

@@ -1,80 +0,0 @@
"""Quote post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def has_quote_embed(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
quote_types = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
if embed_type_base in quote_types:
return True
if hasattr(embed, 'record') or (isinstance(embed, dict) and embed.get('record')):
return True
return False
def delete_quotes():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting quote post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
for post in posts:
post_record = post.post
if not has_quote_embed(post_record):
if verbose:
print(f"Skipping post without quote: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted quote post: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting quote post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} quote posts.")

View File

@@ -1,69 +0,0 @@
"""Repost undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
REPOST_COLLECTION = "app.bsky.feed.repost"
def undo_reposts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting repost deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=REPOST_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": REPOST_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone repost: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing repost {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} reposts.")

31
skywipe/safeguard.py Normal file
View File

@@ -0,0 +1,31 @@
"""Safeguard module for Skywipe"""
import sys
import logging
from typing import Optional
from .logger import get_logger
CONFIRM_RESPONSES = {"yes", "y"}
def require_confirmation(operation: str, skip_confirmation: bool = False, logger: Optional[logging.Logger] = None) -> None:
if skip_confirmation:
return
if logger is None:
logger = get_logger()
logger.warning(f"This will {operation}")
logger.warning("This operation is DESTRUCTIVE and cannot be undone!")
try:
response = input(
"Are you sure you want to continue? (y/N): ").strip().lower()
except (EOFError, KeyboardInterrupt):
logger.info("\nOperation cancelled.")
sys.exit(0)
if response not in CONFIRM_RESPONSES:
logger.info("Operation cancelled.")
sys.exit(0)