Compare commits

..

23 Commits

Author SHA1 Message Date
590cc03ba4 docs: refactor warning 2025-12-20 16:03:39 +01:00
c493a99860 docs: inform about log 2025-12-20 16:01:07 +01:00
3f9ef6527f docs: update roadmap 2025-12-20 16:00:03 +01:00
f53e5bb527 feat: implement our safeguard in commands 2025-12-20 15:59:54 +01:00
f2854a0df5 feat: implement safeguard and relocate log file 2025-12-20 15:59:41 +01:00
defd991006 feat: new safeguard module 2025-12-20 15:59:17 +01:00
f922e3cf9d docs: update roadmap 2025-12-20 15:46:48 +01:00
06d70edaf1 feat: use our new logger 2025-12-20 15:46:33 +01:00
58ab6cfafa feat: implement proper logging 2025-12-20 15:46:22 +01:00
dba06e642a docs: update roadmap 2025-12-19 14:39:15 +01:00
5868c1649b docs: update readme 2025-12-19 14:35:25 +01:00
a14184cddc feat: run all 2025-12-19 14:35:21 +01:00
6587f8c39c feat: implement bookmark deletion module 2025-12-19 14:35:05 +01:00
ae6663572c refactor: inline has_quote_embed 2025-12-19 14:34:42 +01:00
3eb456e999 refactor: delete_posts is now delete_all_posts 2025-12-19 14:34:25 +01:00
cfa5773e62 refactor: inline has_media_embed 2025-12-19 14:34:02 +01:00
ddee2a6029 feat: implement follow undoing module 2025-12-19 14:33:43 +01:00
5e2b4f3408 docs: update readme 2025-12-19 14:10:37 +01:00
c238278df6 feat: add quote post deletion 2025-12-19 14:10:33 +01:00
a9c25c8c10 feat: implemented delete_quotes() 2025-12-19 14:10:25 +01:00
5ff25b3eb6 docs: update readme 2025-12-19 12:51:30 +01:00
c396ba8ae9 feat: create repost undoing module 2025-12-19 12:51:25 +01:00
d12f14a994 feat: implemented undo_reposts() 2025-12-19 12:51:03 +01:00
13 changed files with 573 additions and 131 deletions

View File

@@ -2,9 +2,9 @@
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
## Warning
**Warning:** This tool performs _**destructive operations**_.
This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account.
Only use it if you intend to permanently erase data from your Bluesky account.
## Requirements
@@ -20,22 +20,6 @@ uv sync
## How to run
When installation will be worked out, you'll be able to :
```bash
skywipe all # target everything
skywipe configure # create configuration
skywipe posts # delete posts
skywipe medias # delete posts with medias
skywipe likes # undo likes
skywipe reposts # undo reposts
skywipe quotes # delete quotes
skywipe follows # unfollow all
skywipe bookmarks # delete bookmarks
```
### Running with `uv` (without installation)
While it's being developed, you can use the tool using `uv` :
```bash
@@ -50,6 +34,10 @@ uv run python -m skywipe.cli follows # unfollow all
uv run python -m skywipe.cli bookmarks # delete bookmarks
```
Use the `--yes` flag to skip the confirmation prompt and proceed with the operation.
A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
## Configuration
If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` :
@@ -72,16 +60,13 @@ BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-passwor
- [x] delete posts in batch
- [x] only delete posts with media
- [x] undo likes
- [ ] undo reposts
- [ ] delete quotes
- [ ] unfollow accounts
- [ ] remove bookmarks
- [ ] make `all` run the other commands
- [ ] add simple progress and logging
- [ ] add safeguards like confirmations and clear dry-run info
Once it's done, we'll think:
- [x] undo reposts
- [x] delete quotes
- [x] unfollow accounts
- [x] remove bookmarks
- [x] make `all` run the other commands
- [x] add simple progress and logging
- [x] add safeguards (confirmation, dry-run flag)
- [ ] decent code architecture
- [ ] installation and run process

78
skywipe/bookmarks.py Normal file
View File

@@ -0,0 +1,78 @@
"""Bookmark deletion module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_bookmarks():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting bookmark deletion with batch_size={batch_size}, delay={delay}s")
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting bookmarks")
while True:
batch_num += 1
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=batch_size,
cursor=cursor
)
response = client.app.bsky.bookmark.get_bookmarks(params=get_params)
bookmarks = response.bookmarks
if not bookmarks:
break
progress.batch(batch_num, len(bookmarks))
for bookmark in bookmarks:
try:
bookmark_uri = None
if hasattr(bookmark, "uri"):
bookmark_uri = bookmark.uri
else:
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
bookmark_uri = nested.uri
break
if not bookmark_uri:
logger.debug("Skipping bookmark: unable to find uri")
continue
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri
)
client.app.bsky.bookmark.delete_bookmark(data=delete_data)
total_deleted += 1
progress.update(1)
logger.debug(f"Deleted bookmark: {bookmark_uri}")
except Exception as e:
bookmark_uri = getattr(bookmark, "uri", "unknown")
logger.error(f"Error deleting bookmark {bookmark_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Deleted {total_deleted} bookmarks.")

View File

@@ -2,9 +2,11 @@
import sys
import argparse
from pathlib import Path
from .commands import registry
from .configure import Configuration
from .logger import setup_logger
def create_parser():
@@ -15,6 +17,12 @@ def create_parser():
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account."
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip confirmation prompt and proceed with destructive operations"
)
subparsers = parser.add_subparsers(
dest="command",
help="Command to execute",
@@ -31,8 +39,9 @@ def create_parser():
def require_config():
config = Configuration()
if not config.exists():
print("Error: Configuration file not found.")
print("You must run 'skywipe configure' first.")
logger = setup_logger(verbose=False)
logger.error("Configuration file not found.")
logger.error("You must run 'skywipe configure' first.")
sys.exit(1)
@@ -42,11 +51,24 @@ def main():
if registry.requires_config(args.command):
require_config()
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
log_file = Path.home() / ".cache" / "skywipe" / "skywipe.log"
setup_logger(verbose=verbose, log_file=log_file)
else:
setup_logger(verbose=False)
try:
registry.execute(args.command)
registry.execute(
args.command, skip_confirmation=getattr(args, "yes", False))
except ValueError as e:
print(f"Error: {e}")
logger = setup_logger(verbose=False)
logger.error(f"{e}")
sys.exit(1)
except Exception as e:
logger = setup_logger(verbose=False)
logger.error(f"Unexpected error: {e}", exc_info=True)
sys.exit(1)

View File

@@ -2,12 +2,18 @@
from typing import Callable, Dict, Optional
from .configure import Configuration
from .posts import delete_posts
from .posts import delete_all_posts
from .medias import delete_posts_with_medias
from .likes import undo_likes
from .reposts import undo_reposts
from .quotes import delete_quotes_posts
from .follows import unfollow_all
from .bookmarks import delete_bookmarks
from .logger import get_logger
from .safeguard import require_confirmation
CommandHandler = Callable[[], None]
CommandHandler = Callable[..., None]
class CommandRegistry:
@@ -39,10 +45,13 @@ class CommandRegistry:
def get_all_commands(self) -> Dict[str, str]:
return self._help_texts.copy()
def execute(self, name: str):
def execute(self, name: str, skip_confirmation: bool = False):
handler = self.get_handler(name)
if handler:
handler()
if name == "configure":
handler()
else:
handler(skip_confirmation)
else:
raise ValueError(f"Unknown command: {name}")
@@ -55,42 +64,58 @@ def run_configure():
config.create()
def run_posts():
delete_posts()
def run_posts(skip_confirmation: bool = False):
require_confirmation("delete all posts", skip_confirmation)
delete_all_posts()
def run_medias():
def run_medias(skip_confirmation: bool = False):
require_confirmation("delete all posts with media", skip_confirmation)
delete_posts_with_medias()
def run_likes():
def run_likes(skip_confirmation: bool = False):
require_confirmation("undo all likes", skip_confirmation)
undo_likes()
def run_reposts():
print("Command 'reposts' is not yet implemented.")
def run_reposts(skip_confirmation: bool = False):
require_confirmation("undo all reposts", skip_confirmation)
undo_reposts()
def run_quotes():
print("Command 'quotes' is not yet implemented.")
def run_quotes(skip_confirmation: bool = False):
require_confirmation("delete all quote posts", skip_confirmation)
delete_quotes_posts()
def run_follows():
print("Command 'follows' is not yet implemented.")
def run_follows(skip_confirmation: bool = False):
require_confirmation("unfollow all accounts", skip_confirmation)
unfollow_all()
def run_bookmarks():
print("Command 'bookmarks' is not yet implemented")
def run_bookmarks(skip_confirmation: bool = False):
require_confirmation("delete all bookmarks", skip_confirmation)
delete_bookmarks()
def run_all():
registry.execute("posts")
registry.execute("medias")
registry.execute("likes")
registry.execute("reposts")
registry.execute("quotes")
registry.execute("follows")
registry.execute("bookmarks")
def run_all(skip_confirmation: bool = False):
logger = get_logger()
require_confirmation(
"run all cleanup commands (posts, likes, reposts, follows, bookmarks)", skip_confirmation)
commands = ["posts", "likes", "reposts", "follows", "bookmarks"]
logger.info("Running all cleanup commands...")
for cmd in commands:
try:
logger.info(f"Starting command: {cmd}")
registry.execute(cmd, skip_confirmation=True)
logger.info(f"Completed command: {cmd}")
except Exception as e:
logger.error(f"Error running '{cmd}': {e}", exc_info=True)
continue
logger.info("All commands completed.")
registry.register("configure", run_configure,
@@ -99,7 +124,7 @@ registry.register("posts", run_posts, "only posts")
registry.register("medias", run_medias, "only posts with medias")
registry.register("likes", run_likes, "only likes")
registry.register("reposts", run_reposts, "only reposts")
registry.register("quotes", run_reposts, "only quotes")
registry.register("quotes", run_quotes, "only quotes")
registry.register("follows", run_follows, "only follows")
registry.register("bookmarks", run_follows, "only bookmarks")
registry.register("bookmarks", run_bookmarks, "only bookmarks")
registry.register("all", run_all, "target everything")

View File

@@ -3,6 +3,7 @@
import getpass
from pathlib import Path
import yaml
from .logger import setup_logger, get_logger
class Configuration:
@@ -13,10 +14,12 @@ class Configuration:
return self.config_file.exists()
def create(self):
logger = setup_logger(verbose=False)
if self.exists():
overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"):
logger.info("Configuration creation cancelled.")
return
config_dir = self.config_file.parent
@@ -37,7 +40,7 @@ class Configuration:
batch_size = int(batch_size)
delay = int(delay)
except ValueError:
print("Error: batch_size and delay must be integers")
logger.error("batch_size and delay must be integers")
return
config_data = {
@@ -51,7 +54,7 @@ class Configuration:
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
print(f"\nConfiguration saved to {self.config_file}")
logger.info(f"Configuration saved to {self.config_file}")
def load(self) -> dict:
if not self.exists():

73
skywipe/follows.py Normal file
View File

@@ -0,0 +1,73 @@
"""Follow undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
FOLLOW_COLLECTION = "app.bsky.graph.follow"
def unfollow_all():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting unfollow operation with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_unfollowed = 0
batch_num = 0
progress = ProgressTracker(operation="Unfollowing accounts")
while True:
batch_num += 1
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=FOLLOW_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
progress.batch(batch_num, len(records))
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": FOLLOW_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_unfollowed += 1
progress.update(1)
logger.debug(f"Unfollowed: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
logger.error(f"Error unfollowing {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Unfollowed {total_unfollowed} accounts.")

View File

@@ -4,12 +4,14 @@ import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
LIKE_COLLECTION = "app.bsky.feed.like"
def undo_likes():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
@@ -17,17 +19,18 @@ def undo_likes():
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
logger.info(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
batch_num = 0
progress = ProgressTracker(operation="Undoing likes")
while True:
batch_num += 1
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=LIKE_COLLECTION,
@@ -41,6 +44,8 @@ def undo_likes():
if not records:
break
progress.batch(batch_num, len(records))
for record in records:
try:
record_uri = record.uri
@@ -52,12 +57,11 @@ def undo_likes():
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone like: {record_uri}")
progress.update(1)
logger.debug(f"Undone like: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing like {record_uri}: {e}")
logger.error(f"Error undoing like {record_uri}: {e}")
cursor = response.cursor
if not cursor:
@@ -66,4 +70,4 @@ def undo_likes():
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} likes.")
logger.info(f"Undone {total_undone} likes.")

73
skywipe/logger.py Normal file
View File

@@ -0,0 +1,73 @@
"""Centralized logging module for Skywipe"""
import logging
import sys
from pathlib import Path
from typing import Optional
class ProgressTracker:
def __init__(self, operation: str = "Processing"):
self.current = 0
self.operation = operation
def update(self, count: int = 1):
self.current += count
def batch(self, batch_num: int, batch_size: int, total_batches: Optional[int] = None):
logger = logging.getLogger("skywipe.progress")
if total_batches:
logger.info(
f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)"
)
else:
logger.info(
f"{self.operation} - batch {batch_num} ({batch_size} items)")
class LevelFilter(logging.Filter):
def __init__(self, min_level: int, max_level: int):
super().__init__()
self.min_level = min_level
self.max_level = max_level
def filter(self, record: logging.LogRecord) -> bool:
return self.min_level <= record.levelno <= self.max_level
def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logging.Logger:
logger = logging.getLogger("skywipe")
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
if logger.handlers:
return logger
formatter = logging.Formatter(fmt="%(levelname)s: %(message)s")
info_handler = logging.StreamHandler(sys.stdout)
info_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter)
logger.addHandler(info_handler)
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger
def get_logger() -> logging.Logger:
return logging.getLogger("skywipe")

View File

@@ -3,44 +3,11 @@
import time
from .auth import Auth
from .configure import Configuration
def has_media_embed(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
media_types = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in media_types:
return True
if embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in media_types:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
from .logger import get_logger, ProgressTracker
def delete_posts_with_medias():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
@@ -48,43 +15,75 @@ def delete_posts_with_medias():
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
logger.info(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting posts with media")
while True:
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
else:
response = client.get_author_feed(actor=did, limit=batch_size)
batch_num += 1
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
progress.batch(batch_num, len(posts))
for post in posts:
post_record = post.post
if not has_media_embed(post_record):
if verbose:
print(f"Skipping post without media: {post_record.uri}")
embed = getattr(post_record, 'embed', None)
has_media = False
if embed:
embed_type = getattr(embed, 'py_type', None)
media_types = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in media_types:
has_media = True
if not has_media and embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in media_types:
has_media = True
if not has_media:
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
has_media = True
break
if isinstance(embed, dict) and embed.get(attr):
has_media = True
break
if not has_media:
logger.debug(f"Skipping post without media: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted post with media: {post_record.uri}")
progress.update(1)
logger.debug(f"Deleted post with media: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {post_record.uri}: {e}")
logger.error(f"Error deleting post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
@@ -93,4 +92,4 @@ def delete_posts_with_medias():
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts with media.")
logger.info(f"Deleted {total_deleted} posts with media.")

View File

@@ -3,9 +3,11 @@
import time
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_posts():
def delete_all_posts():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
@@ -13,17 +15,18 @@ def delete_posts():
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
logger.info(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting posts")
while True:
batch_num += 1
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
@@ -35,16 +38,16 @@ def delete_posts():
break
post_uris = [post.post.uri for post in posts]
progress.batch(batch_num, len(post_uris))
for uri in post_uris:
try:
client.delete_post(uri)
total_deleted += 1
if verbose:
print(f"Deleted post: {uri}")
progress.update(1)
logger.debug(f"Deleted post: {uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {uri}: {e}")
logger.error(f"Error deleting post {uri}: {e}")
cursor = response.cursor
if not cursor:
@@ -53,4 +56,4 @@ def delete_posts():
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts.")
logger.info(f"Deleted {total_deleted} posts.")

77
skywipe/quotes.py Normal file
View File

@@ -0,0 +1,77 @@
"""Quote post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_quotes_posts():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(f"Starting quote post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting quote posts")
while True:
batch_num += 1
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
progress.batch(batch_num, len(posts))
for post in posts:
post_record = post.post
embed = getattr(post_record, 'embed', None)
has_quote = False
if embed:
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
quote_types = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
if embed_type_base in quote_types:
has_quote = True
if not has_quote and (hasattr(embed, 'record') or (isinstance(embed, dict) and embed.get('record'))):
has_quote = True
if not has_quote:
logger.debug(f"Skipping post without quote: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
progress.update(1)
logger.debug(f"Deleted quote post: {post_record.uri}")
except Exception as e:
logger.error(f"Error deleting quote post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Deleted {total_deleted} quote posts.")

73
skywipe/reposts.py Normal file
View File

@@ -0,0 +1,73 @@
"""Repost undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
REPOST_COLLECTION = "app.bsky.feed.repost"
def undo_reposts():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting repost deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
batch_num = 0
progress = ProgressTracker(operation="Undoing reposts")
while True:
batch_num += 1
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=REPOST_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
progress.batch(batch_num, len(records))
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": REPOST_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
progress.update(1)
logger.debug(f"Undone repost: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
logger.error(f"Error undoing repost {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Undone {total_undone} reposts.")

27
skywipe/safeguard.py Normal file
View File

@@ -0,0 +1,27 @@
"""Safeguard module for Skywipe"""
import sys
from .logger import get_logger
CONFIRM_RESPONSES = {"yes", "y"}
def require_confirmation(operation: str, skip_confirmation: bool = False) -> None:
if skip_confirmation:
return
logger = get_logger()
logger.warning(f"This will {operation}")
logger.warning("This operation is DESTRUCTIVE and cannot be undone!")
try:
response = input(
"Are you sure you want to continue? (y/N): ").strip().lower()
except (EOFError, KeyboardInterrupt):
logger.info("\nOperation cancelled.")
sys.exit(0)
if response not in CONFIRM_RESPONSES:
logger.info("Operation cancelled.")
sys.exit(0)