Compare commits

..

21 Commits

Author SHA1 Message Date
a818df4a6c refactor: replace hardcoded tuple with constants (consistency+readability) 2025-12-20 20:50:06 +01:00
9aec57bd56 fix: logger's already setup 2025-12-20 20:48:28 +01:00
ec9943822c fix: replace hardcoded list with a registry-derived list 2025-12-20 20:47:15 +01:00
0240ff9f8e clean: remove duplicate run_operation, dead code now 2025-12-20 20:44:42 +01:00
45e2e1eb00 clean: remove is_logged() (not used) 2025-12-20 20:44:29 +01:00
ff20228fa6 docs: update roadmap 2025-12-20 17:25:02 +01:00
9d9c09d56a refactor: use our new Operation and PostAnalyzer tools 2025-12-20 17:24:29 +01:00
64355fbeeb clean: remove unused import 2025-12-20 17:24:15 +01:00
4a337e6b20 feat: add post analysis utilities 2025-12-20 17:24:05 +01:00
f27be4d603 refactor: add OperationContext and Operations classes 2025-12-20 17:23:54 +01:00
9d254ac4b7 refactor: delete unitary "actions" py files 2025-12-20 17:23:35 +01:00
590cc03ba4 docs: refactor warning 2025-12-20 16:03:39 +01:00
c493a99860 docs: inform about log 2025-12-20 16:01:07 +01:00
3f9ef6527f docs: update roadmap 2025-12-20 16:00:03 +01:00
f53e5bb527 feat: implement our safeguard in commands 2025-12-20 15:59:54 +01:00
f2854a0df5 feat: implement safeguard and relocate log file 2025-12-20 15:59:41 +01:00
defd991006 feat: new safeguard module 2025-12-20 15:59:17 +01:00
f922e3cf9d docs: update roadmap 2025-12-20 15:46:48 +01:00
06d70edaf1 feat: use our new logger 2025-12-20 15:46:33 +01:00
58ab6cfafa feat: implement proper logging 2025-12-20 15:46:22 +01:00
dba06e642a docs: update roadmap 2025-12-19 14:39:15 +01:00
16 changed files with 430 additions and 550 deletions

View File

@@ -2,9 +2,9 @@
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK. Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
## Warning **Warning:** This tool performs _**destructive operations**_.
This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account. Only use it if you intend to permanently erase data from your Bluesky account.
## Requirements ## Requirements
@@ -34,6 +34,10 @@ uv run python -m skywipe.cli follows # unfollow all
uv run python -m skywipe.cli bookmarks # delete bookmarks uv run python -m skywipe.cli bookmarks # delete bookmarks
``` ```
Use the `--yes` flag to skip the confirmation prompt and proceed with the operation.
A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
## Configuration ## Configuration
If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` : If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` :
@@ -61,12 +65,9 @@ BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-passwor
- [x] unfollow accounts - [x] unfollow accounts
- [x] remove bookmarks - [x] remove bookmarks
- [x] make `all` run the other commands - [x] make `all` run the other commands
- [ ] add simple progress and logging - [x] add simple progress and logging
- [ ] add safeguards like confirmations and clear dry-run info - [x] add safeguards (confirmation, dry-run flag)
- [x] decent code architecture
Once it's done, we'll think:
- [ ] decent code architecture
- [ ] installation and run process - [ ] installation and run process
## License ## License

View File

@@ -23,6 +23,3 @@ class Auth:
self.client.login(handle, password) self.client.login(handle, password)
return self.client return self.client
def is_logged(self) -> bool:
return bool(getattr(self.client, "me", None))

View File

@@ -1,75 +0,0 @@
"""Bookmark deletion module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
def delete_bookmarks():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting bookmark deletion with batch_size={batch_size}, delay={delay}s")
cursor = None
total_deleted = 0
while True:
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=batch_size,
cursor=cursor
)
response = client.app.bsky.bookmark.get_bookmarks(params=get_params)
bookmarks = response.bookmarks
if not bookmarks:
break
for bookmark in bookmarks:
try:
bookmark_uri = None
if hasattr(bookmark, "uri"):
bookmark_uri = bookmark.uri
else:
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
bookmark_uri = nested.uri
break
if not bookmark_uri:
if verbose:
print(f"Skipping bookmark: unable to find uri")
continue
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri
)
client.app.bsky.bookmark.delete_bookmark(data=delete_data)
total_deleted += 1
if verbose:
print(f"Deleted bookmark: {bookmark_uri}")
except Exception as e:
bookmark_uri = getattr(bookmark, "uri", "unknown")
if verbose:
print(f"Error deleting bookmark {bookmark_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} bookmarks.")

View File

@@ -2,9 +2,11 @@
import sys import sys
import argparse import argparse
from pathlib import Path
from .commands import registry from .commands import registry
from .configure import Configuration from .configure import Configuration
from .logger import setup_logger, get_logger
def create_parser(): def create_parser():
@@ -15,6 +17,12 @@ def create_parser():
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account." epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account."
) )
parser.add_argument(
"--yes",
action="store_true",
help="Skip confirmation prompt and proceed with destructive operations"
)
subparsers = parser.add_subparsers( subparsers = parser.add_subparsers(
dest="command", dest="command",
help="Command to execute", help="Command to execute",
@@ -31,8 +39,9 @@ def create_parser():
def require_config(): def require_config():
config = Configuration() config = Configuration()
if not config.exists(): if not config.exists():
print("Error: Configuration file not found.") logger = setup_logger(verbose=False)
print("You must run 'skywipe configure' first.") logger.error("Configuration file not found.")
logger.error("You must run 'skywipe configure' first.")
sys.exit(1) sys.exit(1)
@@ -42,11 +51,24 @@ def main():
if registry.requires_config(args.command): if registry.requires_config(args.command):
require_config() require_config()
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
log_file = Path.home() / ".cache" / "skywipe" / "skywipe.log"
setup_logger(verbose=verbose, log_file=log_file)
else:
setup_logger(verbose=False)
try: try:
registry.execute(args.command) registry.execute(
args.command, skip_confirmation=getattr(args, "yes", False))
except ValueError as e: except ValueError as e:
print(f"Error: {e}") logger = get_logger()
logger.error(f"{e}")
sys.exit(1)
except Exception as e:
logger = get_logger()
logger.error(f"Unexpected error: {e}", exc_info=True)
sys.exit(1) sys.exit(1)

View File

@@ -2,16 +2,13 @@
from typing import Callable, Dict, Optional from typing import Callable, Dict, Optional
from .configure import Configuration from .configure import Configuration
from .posts import delete_all_posts from .operations import Operation
from .medias import delete_posts_with_medias from .post_analysis import PostAnalyzer
from .likes import undo_likes from .logger import get_logger
from .reposts import undo_reposts from .safeguard import require_confirmation
from .quotes import delete_quotes_posts
from .follows import unfollow_all
from .bookmarks import delete_bookmarks
CommandHandler = Callable[[], None] CommandHandler = Callable[..., None]
class CommandRegistry: class CommandRegistry:
@@ -43,10 +40,13 @@ class CommandRegistry:
def get_all_commands(self) -> Dict[str, str]: def get_all_commands(self) -> Dict[str, str]:
return self._help_texts.copy() return self._help_texts.copy()
def execute(self, name: str): def execute(self, name: str, skip_confirmation: bool = False):
handler = self.get_handler(name) handler = self.get_handler(name)
if handler: if handler:
handler() if name == "configure":
handler()
else:
handler(skip_confirmation)
else: else:
raise ValueError(f"Unknown command: {name}") raise ValueError(f"Unknown command: {name}")
@@ -59,43 +59,67 @@ def run_configure():
config.create() config.create()
def run_posts(): def run_posts(skip_confirmation: bool = False):
delete_all_posts() require_confirmation("delete all posts", skip_confirmation)
Operation("Deleting posts").run()
def run_medias(): def run_medias(skip_confirmation: bool = False):
delete_posts_with_medias() require_confirmation("delete all posts with media", skip_confirmation)
Operation("Deleting posts with media",
filter_fn=lambda post: PostAnalyzer.has_media(post.post)).run()
def run_likes(): def run_likes(skip_confirmation: bool = False):
undo_likes() require_confirmation("undo all likes", skip_confirmation)
Operation("Undoing likes", strategy_type="record",
collection="app.bsky.feed.like").run()
def run_reposts(): def run_reposts(skip_confirmation: bool = False):
undo_reposts() require_confirmation("undo all reposts", skip_confirmation)
Operation("Undoing reposts", strategy_type="record",
collection="app.bsky.feed.repost").run()
def run_quotes(): def run_quotes(skip_confirmation: bool = False):
delete_quotes_posts() require_confirmation("delete all quote posts", skip_confirmation)
Operation("Deleting quote posts",
filter_fn=lambda post: PostAnalyzer.has_quote(post.post)).run()
def run_follows(): def run_follows(skip_confirmation: bool = False):
unfollow_all() require_confirmation("unfollow all accounts", skip_confirmation)
Operation("Unfollowing accounts", strategy_type="record",
collection="app.bsky.graph.follow").run()
def run_bookmarks(): def run_bookmarks(skip_confirmation: bool = False):
delete_bookmarks() require_confirmation("delete all bookmarks", skip_confirmation)
Operation("Deleting bookmarks", strategy_type="bookmark").run()
def run_all(): def run_all(skip_confirmation: bool = False):
commands = ["posts", "likes", "reposts", "follows", "bookmarks"] logger = get_logger()
all_commands = registry.get_all_commands()
commands = [cmd for cmd in all_commands.keys()
if cmd not in ("configure", "all")]
commands_str = ", ".join(commands)
require_confirmation(
f"run all cleanup commands ({commands_str})", skip_confirmation)
logger.info("Running all cleanup commands...")
for cmd in commands: for cmd in commands:
try: try:
registry.execute(cmd) logger.info(f"Starting command: {cmd}")
registry.execute(cmd, skip_confirmation=True)
logger.info(f"Completed command: {cmd}")
except Exception as e: except Exception as e:
print(f"Error running '{cmd}': {e}") logger.error(f"Error running '{cmd}': {e}", exc_info=True)
continue continue
logger.info("All commands completed.")
registry.register("configure", run_configure, registry.register("configure", run_configure,

View File

@@ -3,6 +3,7 @@
import getpass import getpass
from pathlib import Path from pathlib import Path
import yaml import yaml
from .logger import setup_logger
class Configuration: class Configuration:
@@ -13,10 +14,12 @@ class Configuration:
return self.config_file.exists() return self.config_file.exists()
def create(self): def create(self):
logger = setup_logger(verbose=False)
if self.exists(): if self.exists():
overwrite = input( overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower() "Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"): if overwrite not in ("y", "yes"):
logger.info("Configuration creation cancelled.")
return return
config_dir = self.config_file.parent config_dir = self.config_file.parent
@@ -37,7 +40,7 @@ class Configuration:
batch_size = int(batch_size) batch_size = int(batch_size)
delay = int(delay) delay = int(delay)
except ValueError: except ValueError:
print("Error: batch_size and delay must be integers") logger.error("batch_size and delay must be integers")
return return
config_data = { config_data = {
@@ -51,7 +54,7 @@ class Configuration:
with open(self.config_file, "w") as f: with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False) yaml.dump(config_data, f, default_flow_style=False)
print(f"\nConfiguration saved to {self.config_file}") logger.info(f"Configuration saved to {self.config_file}")
def load(self) -> dict: def load(self) -> dict:
if not self.exists(): if not self.exists():

View File

@@ -1,69 +0,0 @@
"""Follow undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
FOLLOW_COLLECTION = "app.bsky.graph.follow"
def unfollow_all():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting unfollow operation with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_unfollowed = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=FOLLOW_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": FOLLOW_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_unfollowed += 1
if verbose:
print(f"Unfollowed: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error unfollowing {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Unfollowed {total_unfollowed} accounts.")

View File

@@ -1,69 +0,0 @@
"""Like undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
LIKE_COLLECTION = "app.bsky.feed.like"
def undo_likes():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=LIKE_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": LIKE_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone like: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing like {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} likes.")

73
skywipe/logger.py Normal file
View File

@@ -0,0 +1,73 @@
"""Centralized logging module for Skywipe"""
import logging
import sys
from pathlib import Path
from typing import Optional
class ProgressTracker:
def __init__(self, operation: str = "Processing"):
self.current = 0
self.operation = operation
def update(self, count: int = 1):
self.current += count
def batch(self, batch_num: int, batch_size: int, total_batches: Optional[int] = None):
logger = logging.getLogger("skywipe.progress")
if total_batches:
logger.info(
f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)"
)
else:
logger.info(
f"{self.operation} - batch {batch_num} ({batch_size} items)")
class LevelFilter(logging.Filter):
def __init__(self, min_level: int, max_level: int):
super().__init__()
self.min_level = min_level
self.max_level = max_level
def filter(self, record: logging.LogRecord) -> bool:
return self.min_level <= record.levelno <= self.max_level
def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logging.Logger:
logger = logging.getLogger("skywipe")
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
if logger.handlers:
return logger
formatter = logging.Formatter(fmt="%(levelname)s: %(message)s")
info_handler = logging.StreamHandler(sys.stdout)
info_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter)
logger.addHandler(info_handler)
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger
def get_logger() -> logging.Logger:
return logging.getLogger("skywipe")

View File

@@ -1,92 +0,0 @@
"""Media post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_posts_with_medias():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
for post in posts:
post_record = post.post
embed = getattr(post_record, 'embed', None)
has_media = False
if embed:
embed_type = getattr(embed, 'py_type', None)
media_types = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in media_types:
has_media = True
if not has_media and embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in media_types:
has_media = True
if not has_media:
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
has_media = True
break
if isinstance(embed, dict) and embed.get(attr):
has_media = True
break
if not has_media:
if verbose:
print(f"Skipping post without media: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted post with media: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts with media.")

173
skywipe/operations.py Normal file
View File

@@ -0,0 +1,173 @@
"""Shared operation utilities and strategies for Skywipe"""
import time
from typing import Callable, Optional, Any
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
class OperationContext:
def __init__(self):
self.logger = get_logger()
self.auth = Auth()
self.client = self.auth.login()
self.config = Configuration()
self.config_data = self.config.load()
self.batch_size = self.config_data.get("batch_size", 10)
self.delay = self.config_data.get("delay", 1)
self.did = self.client.me.did
class RecordDeletionStrategy:
def __init__(self, collection: str):
self.collection = collection
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
list_params = models.ComAtprotoRepoListRecords.Params(
repo=context.did,
collection=self.collection,
limit=context.batch_size,
cursor=cursor
)
return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response):
return response.records
def get_cursor(self, response):
return response.cursor
def process_item(self, record, context: OperationContext):
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": context.did,
"collection": self.collection,
"rkey": rkey
}
context.client.com.atproto.repo.delete_record(data=delete_data)
context.logger.debug(f"Deleted: {record_uri}")
class FeedStrategy:
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
if cursor:
return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor
)
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
def extract_items(self, response):
return response.feed
def get_cursor(self, response):
return response.cursor
def process_item(self, post, context: OperationContext):
uri = post.post.uri
context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}")
class BookmarkStrategy:
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size,
cursor=cursor
)
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response):
return response.bookmarks
def get_cursor(self, response):
return response.cursor
def process_item(self, bookmark, context: OperationContext):
bookmark_uri = self._extract_bookmark_uri(bookmark)
if not bookmark_uri:
raise ValueError("Unable to find bookmark URI")
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri)
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
def _extract_bookmark_uri(self, bookmark):
if hasattr(bookmark, "uri"):
return bookmark.uri
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
return nested.uri
return None
class Operation:
def __init__(
self,
operation_name: str,
strategy_type: str = "feed",
collection: Optional[str] = None,
filter_fn: Optional[Callable[[Any], bool]] = None
):
self.operation_name = operation_name
self.filter_fn = filter_fn
if strategy_type == "record":
if not collection:
raise ValueError("Collection is required for record strategy")
self.strategy = RecordDeletionStrategy(collection)
elif strategy_type == "bookmark":
self.strategy = BookmarkStrategy()
else:
self.strategy = FeedStrategy()
def run(self) -> int:
context = OperationContext()
progress = ProgressTracker(operation=self.operation_name)
context.logger.info(
f"Starting {self.operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
)
cursor = None
total_processed = 0
batch_num = 0
while True:
batch_num += 1
response = self.strategy.fetch(context, cursor)
items = self.strategy.extract_items(response)
if not items:
break
progress.batch(batch_num, len(items))
for item in items:
if self.filter_fn and not self.filter_fn(item):
continue
try:
self.strategy.process_item(item, context)
total_processed += 1
progress.update(1)
except Exception as e:
context.logger.error(f"Error processing item: {e}")
cursor = self.strategy.get_cursor(response)
if not cursor:
break
if context.delay > 0:
time.sleep(context.delay)
context.logger.info(
f"{self.operation_name}: {total_processed} items processed.")
return total_processed

65
skywipe/post_analysis.py Normal file
View File

@@ -0,0 +1,65 @@
"""Post analysis utilities for Skywipe"""
class PostAnalyzer:
MEDIA_TYPES = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
QUOTE_TYPES = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
QUOTE_WITH_MEDIA_TYPES = {
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
@staticmethod
def has_media(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in PostAnalyzer.MEDIA_TYPES:
return True
if embed_type_base in PostAnalyzer.QUOTE_WITH_MEDIA_TYPES:
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in PostAnalyzer.MEDIA_TYPES:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
@staticmethod
def has_quote(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in PostAnalyzer.QUOTE_TYPES:
return True
return (hasattr(embed, 'record') or
(isinstance(embed, dict) and embed.get('record')))

View File

@@ -1,56 +0,0 @@
"""Post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_all_posts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
else:
response = client.get_author_feed(actor=did, limit=batch_size)
posts = response.feed
if not posts:
break
post_uris = [post.post.uri for post in posts]
for uri in post_uris:
try:
client.delete_post(uri)
total_deleted += 1
if verbose:
print(f"Deleted post: {uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts.")

View File

@@ -1,75 +0,0 @@
"""Quote post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_quotes_posts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting quote post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
for post in posts:
post_record = post.post
embed = getattr(post_record, 'embed', None)
has_quote = False
if embed:
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
quote_types = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
if embed_type_base in quote_types:
has_quote = True
if not has_quote and (hasattr(embed, 'record') or (isinstance(embed, dict) and embed.get('record'))):
has_quote = True
if not has_quote:
if verbose:
print(f"Skipping post without quote: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted quote post: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting quote post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} quote posts.")

View File

@@ -1,69 +0,0 @@
"""Repost undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
REPOST_COLLECTION = "app.bsky.feed.repost"
def undo_reposts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting repost deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=REPOST_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": REPOST_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone repost: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing repost {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} reposts.")

27
skywipe/safeguard.py Normal file
View File

@@ -0,0 +1,27 @@
"""Safeguard module for Skywipe"""
import sys
from .logger import get_logger
CONFIRM_RESPONSES = {"yes", "y"}
def require_confirmation(operation: str, skip_confirmation: bool = False) -> None:
if skip_confirmation:
return
logger = get_logger()
logger.warning(f"This will {operation}")
logger.warning("This operation is DESTRUCTIVE and cannot be undone!")
try:
response = input(
"Are you sure you want to continue? (y/N): ").strip().lower()
except (EOFError, KeyboardInterrupt):
logger.info("\nOperation cancelled.")
sys.exit(0)
if response not in CONFIRM_RESPONSES:
logger.info("Operation cancelled.")
sys.exit(0)