Compare commits

..

16 Commits

Author SHA1 Message Date
ff20228fa6 docs: update roadmap 2025-12-20 17:25:02 +01:00
9d9c09d56a refactor: use our new Operation and PostAnalyzer tools 2025-12-20 17:24:29 +01:00
64355fbeeb clean: remove unused import 2025-12-20 17:24:15 +01:00
4a337e6b20 feat: add post analysis utilities 2025-12-20 17:24:05 +01:00
f27be4d603 refactor: add OperationContext and Operations classes 2025-12-20 17:23:54 +01:00
9d254ac4b7 refactor: delete unitary "actions" py files 2025-12-20 17:23:35 +01:00
590cc03ba4 docs: refactor warning 2025-12-20 16:03:39 +01:00
c493a99860 docs: inform about log 2025-12-20 16:01:07 +01:00
3f9ef6527f docs: update roadmap 2025-12-20 16:00:03 +01:00
f53e5bb527 feat: implement our safeguard in commands 2025-12-20 15:59:54 +01:00
f2854a0df5 feat: implement safeguard and relocate log file 2025-12-20 15:59:41 +01:00
defd991006 feat: new safeguard module 2025-12-20 15:59:17 +01:00
f922e3cf9d docs: update roadmap 2025-12-20 15:46:48 +01:00
06d70edaf1 feat: use our new logger 2025-12-20 15:46:33 +01:00
58ab6cfafa feat: implement proper logging 2025-12-20 15:46:22 +01:00
dba06e642a docs: update roadmap 2025-12-19 14:39:15 +01:00
15 changed files with 469 additions and 546 deletions

View File

@@ -2,9 +2,9 @@
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
## Warning
**Warning:** This tool performs _**destructive operations**_.
This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account.
Only use it if you intend to permanently erase data from your Bluesky account.
## Requirements
@@ -34,6 +34,10 @@ uv run python -m skywipe.cli follows # unfollow all
uv run python -m skywipe.cli bookmarks # delete bookmarks
```
Use the `--yes` flag to skip the confirmation prompt and proceed with the operation.
A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
## Configuration
If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` :
@@ -61,12 +65,9 @@ BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-passwor
- [x] unfollow accounts
- [x] remove bookmarks
- [x] make `all` run the other commands
- [ ] add simple progress and logging
- [ ] add safeguards like confirmations and clear dry-run info
Once it's done, we'll think:
- [ ] decent code architecture
- [x] add simple progress and logging
- [x] add safeguards (confirmation, dry-run flag)
- [x] decent code architecture
- [ ] installation and run process
## License

View File

@@ -1,75 +0,0 @@
"""Bookmark deletion module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
def delete_bookmarks():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting bookmark deletion with batch_size={batch_size}, delay={delay}s")
cursor = None
total_deleted = 0
while True:
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=batch_size,
cursor=cursor
)
response = client.app.bsky.bookmark.get_bookmarks(params=get_params)
bookmarks = response.bookmarks
if not bookmarks:
break
for bookmark in bookmarks:
try:
bookmark_uri = None
if hasattr(bookmark, "uri"):
bookmark_uri = bookmark.uri
else:
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
bookmark_uri = nested.uri
break
if not bookmark_uri:
if verbose:
print(f"Skipping bookmark: unable to find uri")
continue
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri
)
client.app.bsky.bookmark.delete_bookmark(data=delete_data)
total_deleted += 1
if verbose:
print(f"Deleted bookmark: {bookmark_uri}")
except Exception as e:
bookmark_uri = getattr(bookmark, "uri", "unknown")
if verbose:
print(f"Error deleting bookmark {bookmark_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} bookmarks.")

View File

@@ -2,9 +2,11 @@
import sys
import argparse
from pathlib import Path
from .commands import registry
from .configure import Configuration
from .logger import setup_logger
def create_parser():
@@ -15,6 +17,12 @@ def create_parser():
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account."
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip confirmation prompt and proceed with destructive operations"
)
subparsers = parser.add_subparsers(
dest="command",
help="Command to execute",
@@ -31,8 +39,9 @@ def create_parser():
def require_config():
config = Configuration()
if not config.exists():
print("Error: Configuration file not found.")
print("You must run 'skywipe configure' first.")
logger = setup_logger(verbose=False)
logger.error("Configuration file not found.")
logger.error("You must run 'skywipe configure' first.")
sys.exit(1)
@@ -42,11 +51,24 @@ def main():
if registry.requires_config(args.command):
require_config()
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
log_file = Path.home() / ".cache" / "skywipe" / "skywipe.log"
setup_logger(verbose=verbose, log_file=log_file)
else:
setup_logger(verbose=False)
try:
registry.execute(args.command)
registry.execute(
args.command, skip_confirmation=getattr(args, "yes", False))
except ValueError as e:
print(f"Error: {e}")
logger = setup_logger(verbose=False)
logger.error(f"{e}")
sys.exit(1)
except Exception as e:
logger = setup_logger(verbose=False)
logger.error(f"Unexpected error: {e}", exc_info=True)
sys.exit(1)

View File

@@ -2,16 +2,13 @@
from typing import Callable, Dict, Optional
from .configure import Configuration
from .posts import delete_all_posts
from .medias import delete_posts_with_medias
from .likes import undo_likes
from .reposts import undo_reposts
from .quotes import delete_quotes_posts
from .follows import unfollow_all
from .bookmarks import delete_bookmarks
from .operations import Operation
from .post_analysis import PostAnalyzer
from .logger import get_logger
from .safeguard import require_confirmation
CommandHandler = Callable[[], None]
CommandHandler = Callable[..., None]
class CommandRegistry:
@@ -43,10 +40,13 @@ class CommandRegistry:
def get_all_commands(self) -> Dict[str, str]:
return self._help_texts.copy()
def execute(self, name: str):
def execute(self, name: str, skip_confirmation: bool = False):
handler = self.get_handler(name)
if handler:
handler()
if name == "configure":
handler()
else:
handler(skip_confirmation)
else:
raise ValueError(f"Unknown command: {name}")
@@ -59,43 +59,63 @@ def run_configure():
config.create()
def run_posts():
delete_all_posts()
def run_posts(skip_confirmation: bool = False):
require_confirmation("delete all posts", skip_confirmation)
Operation("Deleting posts").run()
def run_medias():
delete_posts_with_medias()
def run_medias(skip_confirmation: bool = False):
require_confirmation("delete all posts with media", skip_confirmation)
Operation("Deleting posts with media",
filter_fn=lambda post: PostAnalyzer.has_media(post.post)).run()
def run_likes():
undo_likes()
def run_likes(skip_confirmation: bool = False):
require_confirmation("undo all likes", skip_confirmation)
Operation("Undoing likes", strategy_type="record",
collection="app.bsky.feed.like").run()
def run_reposts():
undo_reposts()
def run_reposts(skip_confirmation: bool = False):
require_confirmation("undo all reposts", skip_confirmation)
Operation("Undoing reposts", strategy_type="record",
collection="app.bsky.feed.repost").run()
def run_quotes():
delete_quotes_posts()
def run_quotes(skip_confirmation: bool = False):
require_confirmation("delete all quote posts", skip_confirmation)
Operation("Deleting quote posts",
filter_fn=lambda post: PostAnalyzer.has_quote(post.post)).run()
def run_follows():
unfollow_all()
def run_follows(skip_confirmation: bool = False):
require_confirmation("unfollow all accounts", skip_confirmation)
Operation("Unfollowing accounts", strategy_type="record",
collection="app.bsky.graph.follow").run()
def run_bookmarks():
delete_bookmarks()
def run_bookmarks(skip_confirmation: bool = False):
require_confirmation("delete all bookmarks", skip_confirmation)
Operation("Deleting bookmarks", strategy_type="bookmark").run()
def run_all():
def run_all(skip_confirmation: bool = False):
logger = get_logger()
require_confirmation(
"run all cleanup commands (posts, likes, reposts, follows, bookmarks)", skip_confirmation)
commands = ["posts", "likes", "reposts", "follows", "bookmarks"]
logger.info("Running all cleanup commands...")
for cmd in commands:
try:
registry.execute(cmd)
logger.info(f"Starting command: {cmd}")
registry.execute(cmd, skip_confirmation=True)
logger.info(f"Completed command: {cmd}")
except Exception as e:
print(f"Error running '{cmd}': {e}")
logger.error(f"Error running '{cmd}': {e}", exc_info=True)
continue
logger.info("All commands completed.")
registry.register("configure", run_configure,

View File

@@ -3,6 +3,7 @@
import getpass
from pathlib import Path
import yaml
from .logger import setup_logger
class Configuration:
@@ -13,10 +14,12 @@ class Configuration:
return self.config_file.exists()
def create(self):
logger = setup_logger(verbose=False)
if self.exists():
overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"):
logger.info("Configuration creation cancelled.")
return
config_dir = self.config_file.parent
@@ -37,7 +40,7 @@ class Configuration:
batch_size = int(batch_size)
delay = int(delay)
except ValueError:
print("Error: batch_size and delay must be integers")
logger.error("batch_size and delay must be integers")
return
config_data = {
@@ -51,7 +54,7 @@ class Configuration:
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
print(f"\nConfiguration saved to {self.config_file}")
logger.info(f"Configuration saved to {self.config_file}")
def load(self) -> dict:
if not self.exists():

View File

@@ -1,69 +0,0 @@
"""Follow undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
FOLLOW_COLLECTION = "app.bsky.graph.follow"
def unfollow_all():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting unfollow operation with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_unfollowed = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=FOLLOW_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": FOLLOW_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_unfollowed += 1
if verbose:
print(f"Unfollowed: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error unfollowing {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Unfollowed {total_unfollowed} accounts.")

View File

@@ -1,69 +0,0 @@
"""Like undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
LIKE_COLLECTION = "app.bsky.feed.like"
def undo_likes():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=LIKE_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": LIKE_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone like: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing like {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} likes.")

73
skywipe/logger.py Normal file
View File

@@ -0,0 +1,73 @@
"""Centralized logging module for Skywipe"""
import logging
import sys
from pathlib import Path
from typing import Optional
class ProgressTracker:
def __init__(self, operation: str = "Processing"):
self.current = 0
self.operation = operation
def update(self, count: int = 1):
self.current += count
def batch(self, batch_num: int, batch_size: int, total_batches: Optional[int] = None):
logger = logging.getLogger("skywipe.progress")
if total_batches:
logger.info(
f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)"
)
else:
logger.info(
f"{self.operation} - batch {batch_num} ({batch_size} items)")
class LevelFilter(logging.Filter):
def __init__(self, min_level: int, max_level: int):
super().__init__()
self.min_level = min_level
self.max_level = max_level
def filter(self, record: logging.LogRecord) -> bool:
return self.min_level <= record.levelno <= self.max_level
def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logging.Logger:
logger = logging.getLogger("skywipe")
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
if logger.handlers:
return logger
formatter = logging.Formatter(fmt="%(levelname)s: %(message)s")
info_handler = logging.StreamHandler(sys.stdout)
info_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter)
logger.addHandler(info_handler)
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger
def get_logger() -> logging.Logger:
return logging.getLogger("skywipe")

View File

@@ -1,92 +0,0 @@
"""Media post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_posts_with_medias():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
for post in posts:
post_record = post.post
embed = getattr(post_record, 'embed', None)
has_media = False
if embed:
embed_type = getattr(embed, 'py_type', None)
media_types = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in media_types:
has_media = True
if not has_media and embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in media_types:
has_media = True
if not has_media:
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
has_media = True
break
if isinstance(embed, dict) and embed.get(attr):
has_media = True
break
if not has_media:
if verbose:
print(f"Skipping post without media: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted post with media: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts with media.")

222
skywipe/operations.py Normal file
View File

@@ -0,0 +1,222 @@
"""Shared operation utilities and strategies for Skywipe"""
import time
from typing import Callable, Optional, Any
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
class OperationContext:
def __init__(self):
self.logger = get_logger()
self.auth = Auth()
self.client = self.auth.login()
self.config = Configuration()
self.config_data = self.config.load()
self.batch_size = self.config_data.get("batch_size", 10)
self.delay = self.config_data.get("delay", 1)
self.did = self.client.me.did
class RecordDeletionStrategy:
def __init__(self, collection: str):
self.collection = collection
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
list_params = models.ComAtprotoRepoListRecords.Params(
repo=context.did,
collection=self.collection,
limit=context.batch_size,
cursor=cursor
)
return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response):
return response.records
def get_cursor(self, response):
return response.cursor
def process_item(self, record, context: OperationContext):
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": context.did,
"collection": self.collection,
"rkey": rkey
}
context.client.com.atproto.repo.delete_record(data=delete_data)
context.logger.debug(f"Deleted: {record_uri}")
class FeedStrategy:
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
if cursor:
return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor
)
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
def extract_items(self, response):
return response.feed
def get_cursor(self, response):
return response.cursor
def process_item(self, post, context: OperationContext):
uri = post.post.uri
context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}")
class BookmarkStrategy:
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size,
cursor=cursor
)
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response):
return response.bookmarks
def get_cursor(self, response):
return response.cursor
def process_item(self, bookmark, context: OperationContext):
bookmark_uri = self._extract_bookmark_uri(bookmark)
if not bookmark_uri:
raise ValueError("Unable to find bookmark URI")
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri)
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
def _extract_bookmark_uri(self, bookmark):
if hasattr(bookmark, "uri"):
return bookmark.uri
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
return nested.uri
return None
class Operation:
def __init__(
self,
operation_name: str,
strategy_type: str = "feed",
collection: Optional[str] = None,
filter_fn: Optional[Callable[[Any], bool]] = None
):
self.operation_name = operation_name
self.filter_fn = filter_fn
if strategy_type == "record":
if not collection:
raise ValueError("Collection is required for record strategy")
self.strategy = RecordDeletionStrategy(collection)
elif strategy_type == "bookmark":
self.strategy = BookmarkStrategy()
else:
self.strategy = FeedStrategy()
def run(self) -> int:
context = OperationContext()
progress = ProgressTracker(operation=self.operation_name)
context.logger.info(
f"Starting {self.operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
)
cursor = None
total_processed = 0
batch_num = 0
while True:
batch_num += 1
response = self.strategy.fetch(context, cursor)
items = self.strategy.extract_items(response)
if not items:
break
progress.batch(batch_num, len(items))
for item in items:
if self.filter_fn and not self.filter_fn(item):
continue
try:
self.strategy.process_item(item, context)
total_processed += 1
progress.update(1)
except Exception as e:
context.logger.error(f"Error processing item: {e}")
cursor = self.strategy.get_cursor(response)
if not cursor:
break
if context.delay > 0:
time.sleep(context.delay)
context.logger.info(
f"{self.operation_name}: {total_processed} items processed.")
return total_processed
def run_operation(
strategy,
operation_name: str,
filter_fn: Optional[Callable[[Any], bool]] = None
) -> int:
context = OperationContext()
progress = ProgressTracker(operation=operation_name)
context.logger.info(
f"Starting {operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
)
cursor = None
total_processed = 0
batch_num = 0
while True:
batch_num += 1
response = strategy.fetch(context, cursor)
items = strategy.extract_items(response)
if not items:
break
progress.batch(batch_num, len(items))
for item in items:
if filter_fn and not filter_fn(item):
continue
try:
strategy.process_item(item, context)
total_processed += 1
progress.update(1)
except Exception as e:
context.logger.error(f"Error processing item: {e}")
cursor = strategy.get_cursor(response)
if not cursor:
break
if context.delay > 0:
time.sleep(context.delay)
context.logger.info(
f"{operation_name}: {total_processed} items processed.")
return total_processed

60
skywipe/post_analysis.py Normal file
View File

@@ -0,0 +1,60 @@
"""Post analysis utilities for Skywipe"""
class PostAnalyzer:
MEDIA_TYPES = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
QUOTE_TYPES = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
@staticmethod
def has_media(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in PostAnalyzer.MEDIA_TYPES:
return True
if embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in PostAnalyzer.MEDIA_TYPES:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
@staticmethod
def has_quote(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in PostAnalyzer.QUOTE_TYPES:
return True
return (hasattr(embed, 'record') or
(isinstance(embed, dict) and embed.get('record')))

View File

@@ -1,56 +0,0 @@
"""Post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_all_posts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
else:
response = client.get_author_feed(actor=did, limit=batch_size)
posts = response.feed
if not posts:
break
post_uris = [post.post.uri for post in posts]
for uri in post_uris:
try:
client.delete_post(uri)
total_deleted += 1
if verbose:
print(f"Deleted post: {uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts.")

View File

@@ -1,75 +0,0 @@
"""Quote post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_quotes_posts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting quote post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
for post in posts:
post_record = post.post
embed = getattr(post_record, 'embed', None)
has_quote = False
if embed:
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
quote_types = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
if embed_type_base in quote_types:
has_quote = True
if not has_quote and (hasattr(embed, 'record') or (isinstance(embed, dict) and embed.get('record'))):
has_quote = True
if not has_quote:
if verbose:
print(f"Skipping post without quote: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted quote post: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting quote post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} quote posts.")

View File

@@ -1,69 +0,0 @@
"""Repost undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
REPOST_COLLECTION = "app.bsky.feed.repost"
def undo_reposts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting repost deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=REPOST_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": REPOST_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone repost: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing repost {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} reposts.")

27
skywipe/safeguard.py Normal file
View File

@@ -0,0 +1,27 @@
"""Safeguard module for Skywipe"""
import sys
from .logger import get_logger
CONFIRM_RESPONSES = {"yes", "y"}
def require_confirmation(operation: str, skip_confirmation: bool = False) -> None:
if skip_confirmation:
return
logger = get_logger()
logger.warning(f"This will {operation}")
logger.warning("This operation is DESTRUCTIVE and cannot be undone!")
try:
response = input(
"Are you sure you want to continue? (y/N): ").strip().lower()
except (EOFError, KeyboardInterrupt):
logger.info("\nOperation cancelled.")
sys.exit(0)
if response not in CONFIRM_RESPONSES:
logger.info("Operation cancelled.")
sys.exit(0)