Compare commits
16 Commits
5868c1649b
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| ff20228fa6 | |||
| 9d9c09d56a | |||
| 64355fbeeb | |||
| 4a337e6b20 | |||
| f27be4d603 | |||
| 9d254ac4b7 | |||
| 590cc03ba4 | |||
| c493a99860 | |||
| 3f9ef6527f | |||
| f53e5bb527 | |||
| f2854a0df5 | |||
| defd991006 | |||
| f922e3cf9d | |||
| 06d70edaf1 | |||
| 58ab6cfafa | |||
| dba06e642a |
17
README.md
17
README.md
@@ -2,9 +2,9 @@
|
||||
|
||||
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
|
||||
|
||||
## Warning
|
||||
**Warning:** This tool performs _**destructive operations**_.
|
||||
|
||||
This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account.
|
||||
Only use it if you intend to permanently erase data from your Bluesky account.
|
||||
|
||||
## Requirements
|
||||
|
||||
@@ -34,6 +34,10 @@ uv run python -m skywipe.cli follows # unfollow all
|
||||
uv run python -m skywipe.cli bookmarks # delete bookmarks
|
||||
```
|
||||
|
||||
Use the `--yes` flag to skip the confirmation prompt and proceed with the operation.
|
||||
|
||||
A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
|
||||
|
||||
## Configuration
|
||||
|
||||
If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` :
|
||||
@@ -61,12 +65,9 @@ BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-passwor
|
||||
- [x] unfollow accounts
|
||||
- [x] remove bookmarks
|
||||
- [x] make `all` run the other commands
|
||||
- [ ] add simple progress and logging
|
||||
- [ ] add safeguards like confirmations and clear dry-run info
|
||||
|
||||
Once it's done, we'll think:
|
||||
|
||||
- [ ] decent code architecture
|
||||
- [x] add simple progress and logging
|
||||
- [x] add safeguards (confirmation, dry-run flag)
|
||||
- [x] decent code architecture
|
||||
- [ ] installation and run process
|
||||
|
||||
## License
|
||||
|
||||
@@ -1,75 +0,0 @@
|
||||
"""Bookmark deletion module for Skywipe"""
|
||||
|
||||
import time
|
||||
from atproto import models
|
||||
from .auth import Auth
|
||||
from .configure import Configuration
|
||||
|
||||
|
||||
def delete_bookmarks():
|
||||
auth = Auth()
|
||||
client = auth.login()
|
||||
config = Configuration()
|
||||
config_data = config.load()
|
||||
|
||||
batch_size = config_data.get("batch_size", 10)
|
||||
delay = config_data.get("delay", 1)
|
||||
verbose = config_data.get("verbose", False)
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
f"Starting bookmark deletion with batch_size={batch_size}, delay={delay}s")
|
||||
|
||||
cursor = None
|
||||
total_deleted = 0
|
||||
|
||||
while True:
|
||||
get_params = models.AppBskyBookmarkGetBookmarks.Params(
|
||||
limit=batch_size,
|
||||
cursor=cursor
|
||||
)
|
||||
|
||||
response = client.app.bsky.bookmark.get_bookmarks(params=get_params)
|
||||
|
||||
bookmarks = response.bookmarks
|
||||
if not bookmarks:
|
||||
break
|
||||
|
||||
for bookmark in bookmarks:
|
||||
try:
|
||||
bookmark_uri = None
|
||||
if hasattr(bookmark, "uri"):
|
||||
bookmark_uri = bookmark.uri
|
||||
else:
|
||||
for attr_name in ("subject", "record", "post", "item"):
|
||||
if hasattr(bookmark, attr_name):
|
||||
nested = getattr(bookmark, attr_name)
|
||||
if hasattr(nested, "uri"):
|
||||
bookmark_uri = nested.uri
|
||||
break
|
||||
|
||||
if not bookmark_uri:
|
||||
if verbose:
|
||||
print(f"Skipping bookmark: unable to find uri")
|
||||
continue
|
||||
|
||||
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
|
||||
uri=bookmark_uri
|
||||
)
|
||||
client.app.bsky.bookmark.delete_bookmark(data=delete_data)
|
||||
total_deleted += 1
|
||||
if verbose:
|
||||
print(f"Deleted bookmark: {bookmark_uri}")
|
||||
except Exception as e:
|
||||
bookmark_uri = getattr(bookmark, "uri", "unknown")
|
||||
if verbose:
|
||||
print(f"Error deleting bookmark {bookmark_uri}: {e}")
|
||||
|
||||
cursor = response.cursor
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
|
||||
print(f"Deleted {total_deleted} bookmarks.")
|
||||
@@ -2,9 +2,11 @@
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from .commands import registry
|
||||
from .configure import Configuration
|
||||
from .logger import setup_logger
|
||||
|
||||
|
||||
def create_parser():
|
||||
@@ -15,6 +17,12 @@ def create_parser():
|
||||
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account."
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--yes",
|
||||
action="store_true",
|
||||
help="Skip confirmation prompt and proceed with destructive operations"
|
||||
)
|
||||
|
||||
subparsers = parser.add_subparsers(
|
||||
dest="command",
|
||||
help="Command to execute",
|
||||
@@ -31,8 +39,9 @@ def create_parser():
|
||||
def require_config():
|
||||
config = Configuration()
|
||||
if not config.exists():
|
||||
print("Error: Configuration file not found.")
|
||||
print("You must run 'skywipe configure' first.")
|
||||
logger = setup_logger(verbose=False)
|
||||
logger.error("Configuration file not found.")
|
||||
logger.error("You must run 'skywipe configure' first.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@@ -42,11 +51,24 @@ def main():
|
||||
|
||||
if registry.requires_config(args.command):
|
||||
require_config()
|
||||
config = Configuration()
|
||||
config_data = config.load()
|
||||
verbose = config_data.get("verbose", False)
|
||||
log_file = Path.home() / ".cache" / "skywipe" / "skywipe.log"
|
||||
setup_logger(verbose=verbose, log_file=log_file)
|
||||
else:
|
||||
setup_logger(verbose=False)
|
||||
|
||||
try:
|
||||
registry.execute(args.command)
|
||||
registry.execute(
|
||||
args.command, skip_confirmation=getattr(args, "yes", False))
|
||||
except ValueError as e:
|
||||
print(f"Error: {e}")
|
||||
logger = setup_logger(verbose=False)
|
||||
logger.error(f"{e}")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
logger = setup_logger(verbose=False)
|
||||
logger.error(f"Unexpected error: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
|
||||
@@ -2,16 +2,13 @@
|
||||
|
||||
from typing import Callable, Dict, Optional
|
||||
from .configure import Configuration
|
||||
from .posts import delete_all_posts
|
||||
from .medias import delete_posts_with_medias
|
||||
from .likes import undo_likes
|
||||
from .reposts import undo_reposts
|
||||
from .quotes import delete_quotes_posts
|
||||
from .follows import unfollow_all
|
||||
from .bookmarks import delete_bookmarks
|
||||
from .operations import Operation
|
||||
from .post_analysis import PostAnalyzer
|
||||
from .logger import get_logger
|
||||
from .safeguard import require_confirmation
|
||||
|
||||
|
||||
CommandHandler = Callable[[], None]
|
||||
CommandHandler = Callable[..., None]
|
||||
|
||||
|
||||
class CommandRegistry:
|
||||
@@ -43,10 +40,13 @@ class CommandRegistry:
|
||||
def get_all_commands(self) -> Dict[str, str]:
|
||||
return self._help_texts.copy()
|
||||
|
||||
def execute(self, name: str):
|
||||
def execute(self, name: str, skip_confirmation: bool = False):
|
||||
handler = self.get_handler(name)
|
||||
if handler:
|
||||
if name == "configure":
|
||||
handler()
|
||||
else:
|
||||
handler(skip_confirmation)
|
||||
else:
|
||||
raise ValueError(f"Unknown command: {name}")
|
||||
|
||||
@@ -59,43 +59,63 @@ def run_configure():
|
||||
config.create()
|
||||
|
||||
|
||||
def run_posts():
|
||||
delete_all_posts()
|
||||
def run_posts(skip_confirmation: bool = False):
|
||||
require_confirmation("delete all posts", skip_confirmation)
|
||||
Operation("Deleting posts").run()
|
||||
|
||||
|
||||
def run_medias():
|
||||
delete_posts_with_medias()
|
||||
def run_medias(skip_confirmation: bool = False):
|
||||
require_confirmation("delete all posts with media", skip_confirmation)
|
||||
Operation("Deleting posts with media",
|
||||
filter_fn=lambda post: PostAnalyzer.has_media(post.post)).run()
|
||||
|
||||
|
||||
def run_likes():
|
||||
undo_likes()
|
||||
def run_likes(skip_confirmation: bool = False):
|
||||
require_confirmation("undo all likes", skip_confirmation)
|
||||
Operation("Undoing likes", strategy_type="record",
|
||||
collection="app.bsky.feed.like").run()
|
||||
|
||||
|
||||
def run_reposts():
|
||||
undo_reposts()
|
||||
def run_reposts(skip_confirmation: bool = False):
|
||||
require_confirmation("undo all reposts", skip_confirmation)
|
||||
Operation("Undoing reposts", strategy_type="record",
|
||||
collection="app.bsky.feed.repost").run()
|
||||
|
||||
|
||||
def run_quotes():
|
||||
delete_quotes_posts()
|
||||
def run_quotes(skip_confirmation: bool = False):
|
||||
require_confirmation("delete all quote posts", skip_confirmation)
|
||||
Operation("Deleting quote posts",
|
||||
filter_fn=lambda post: PostAnalyzer.has_quote(post.post)).run()
|
||||
|
||||
|
||||
def run_follows():
|
||||
unfollow_all()
|
||||
def run_follows(skip_confirmation: bool = False):
|
||||
require_confirmation("unfollow all accounts", skip_confirmation)
|
||||
Operation("Unfollowing accounts", strategy_type="record",
|
||||
collection="app.bsky.graph.follow").run()
|
||||
|
||||
|
||||
def run_bookmarks():
|
||||
delete_bookmarks()
|
||||
def run_bookmarks(skip_confirmation: bool = False):
|
||||
require_confirmation("delete all bookmarks", skip_confirmation)
|
||||
Operation("Deleting bookmarks", strategy_type="bookmark").run()
|
||||
|
||||
|
||||
def run_all():
|
||||
def run_all(skip_confirmation: bool = False):
|
||||
logger = get_logger()
|
||||
require_confirmation(
|
||||
"run all cleanup commands (posts, likes, reposts, follows, bookmarks)", skip_confirmation)
|
||||
|
||||
commands = ["posts", "likes", "reposts", "follows", "bookmarks"]
|
||||
|
||||
logger.info("Running all cleanup commands...")
|
||||
for cmd in commands:
|
||||
try:
|
||||
registry.execute(cmd)
|
||||
logger.info(f"Starting command: {cmd}")
|
||||
registry.execute(cmd, skip_confirmation=True)
|
||||
logger.info(f"Completed command: {cmd}")
|
||||
except Exception as e:
|
||||
print(f"Error running '{cmd}': {e}")
|
||||
logger.error(f"Error running '{cmd}': {e}", exc_info=True)
|
||||
continue
|
||||
logger.info("All commands completed.")
|
||||
|
||||
|
||||
registry.register("configure", run_configure,
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
import getpass
|
||||
from pathlib import Path
|
||||
import yaml
|
||||
from .logger import setup_logger
|
||||
|
||||
|
||||
class Configuration:
|
||||
@@ -13,10 +14,12 @@ class Configuration:
|
||||
return self.config_file.exists()
|
||||
|
||||
def create(self):
|
||||
logger = setup_logger(verbose=False)
|
||||
if self.exists():
|
||||
overwrite = input(
|
||||
"Configuration already exists. Overwrite? (y/N): ").strip().lower()
|
||||
if overwrite not in ("y", "yes"):
|
||||
logger.info("Configuration creation cancelled.")
|
||||
return
|
||||
|
||||
config_dir = self.config_file.parent
|
||||
@@ -37,7 +40,7 @@ class Configuration:
|
||||
batch_size = int(batch_size)
|
||||
delay = int(delay)
|
||||
except ValueError:
|
||||
print("Error: batch_size and delay must be integers")
|
||||
logger.error("batch_size and delay must be integers")
|
||||
return
|
||||
|
||||
config_data = {
|
||||
@@ -51,7 +54,7 @@ class Configuration:
|
||||
with open(self.config_file, "w") as f:
|
||||
yaml.dump(config_data, f, default_flow_style=False)
|
||||
|
||||
print(f"\nConfiguration saved to {self.config_file}")
|
||||
logger.info(f"Configuration saved to {self.config_file}")
|
||||
|
||||
def load(self) -> dict:
|
||||
if not self.exists():
|
||||
|
||||
@@ -1,69 +0,0 @@
|
||||
"""Follow undoing module for Skywipe"""
|
||||
|
||||
import time
|
||||
from atproto import models
|
||||
from .auth import Auth
|
||||
from .configure import Configuration
|
||||
|
||||
|
||||
FOLLOW_COLLECTION = "app.bsky.graph.follow"
|
||||
|
||||
|
||||
def unfollow_all():
|
||||
auth = Auth()
|
||||
client = auth.login()
|
||||
config = Configuration()
|
||||
config_data = config.load()
|
||||
|
||||
batch_size = config_data.get("batch_size", 10)
|
||||
delay = config_data.get("delay", 1)
|
||||
verbose = config_data.get("verbose", False)
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
f"Starting unfollow operation with batch_size={batch_size}, delay={delay}s")
|
||||
|
||||
did = client.me.did
|
||||
cursor = None
|
||||
total_unfollowed = 0
|
||||
|
||||
while True:
|
||||
list_params = models.ComAtprotoRepoListRecords.Params(
|
||||
repo=did,
|
||||
collection=FOLLOW_COLLECTION,
|
||||
limit=batch_size,
|
||||
cursor=cursor
|
||||
)
|
||||
|
||||
response = client.com.atproto.repo.list_records(params=list_params)
|
||||
|
||||
records = response.records
|
||||
if not records:
|
||||
break
|
||||
|
||||
for record in records:
|
||||
try:
|
||||
record_uri = record.uri
|
||||
rkey = record_uri.rsplit("/", 1)[-1]
|
||||
delete_data = {
|
||||
"repo": did,
|
||||
"collection": FOLLOW_COLLECTION,
|
||||
"rkey": rkey
|
||||
}
|
||||
client.com.atproto.repo.delete_record(data=delete_data)
|
||||
total_unfollowed += 1
|
||||
if verbose:
|
||||
print(f"Unfollowed: {record_uri}")
|
||||
except Exception as e:
|
||||
record_uri = getattr(record, "uri", "unknown")
|
||||
if verbose:
|
||||
print(f"Error unfollowing {record_uri}: {e}")
|
||||
|
||||
cursor = response.cursor
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
|
||||
print(f"Unfollowed {total_unfollowed} accounts.")
|
||||
@@ -1,69 +0,0 @@
|
||||
"""Like undoing module for Skywipe"""
|
||||
|
||||
import time
|
||||
from atproto import models
|
||||
from .auth import Auth
|
||||
from .configure import Configuration
|
||||
|
||||
|
||||
LIKE_COLLECTION = "app.bsky.feed.like"
|
||||
|
||||
|
||||
def undo_likes():
|
||||
auth = Auth()
|
||||
client = auth.login()
|
||||
config = Configuration()
|
||||
config_data = config.load()
|
||||
|
||||
batch_size = config_data.get("batch_size", 10)
|
||||
delay = config_data.get("delay", 1)
|
||||
verbose = config_data.get("verbose", False)
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
|
||||
|
||||
did = client.me.did
|
||||
cursor = None
|
||||
total_undone = 0
|
||||
|
||||
while True:
|
||||
list_params = models.ComAtprotoRepoListRecords.Params(
|
||||
repo=did,
|
||||
collection=LIKE_COLLECTION,
|
||||
limit=batch_size,
|
||||
cursor=cursor
|
||||
)
|
||||
|
||||
response = client.com.atproto.repo.list_records(params=list_params)
|
||||
|
||||
records = response.records
|
||||
if not records:
|
||||
break
|
||||
|
||||
for record in records:
|
||||
try:
|
||||
record_uri = record.uri
|
||||
rkey = record_uri.rsplit("/", 1)[-1]
|
||||
delete_data = {
|
||||
"repo": did,
|
||||
"collection": LIKE_COLLECTION,
|
||||
"rkey": rkey
|
||||
}
|
||||
client.com.atproto.repo.delete_record(data=delete_data)
|
||||
total_undone += 1
|
||||
if verbose:
|
||||
print(f"Undone like: {record_uri}")
|
||||
except Exception as e:
|
||||
record_uri = getattr(record, "uri", "unknown")
|
||||
if verbose:
|
||||
print(f"Error undoing like {record_uri}: {e}")
|
||||
|
||||
cursor = response.cursor
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
|
||||
print(f"Undone {total_undone} likes.")
|
||||
73
skywipe/logger.py
Normal file
73
skywipe/logger.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Centralized logging module for Skywipe"""
|
||||
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class ProgressTracker:
|
||||
def __init__(self, operation: str = "Processing"):
|
||||
self.current = 0
|
||||
self.operation = operation
|
||||
|
||||
def update(self, count: int = 1):
|
||||
self.current += count
|
||||
|
||||
def batch(self, batch_num: int, batch_size: int, total_batches: Optional[int] = None):
|
||||
logger = logging.getLogger("skywipe.progress")
|
||||
if total_batches:
|
||||
logger.info(
|
||||
f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)"
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"{self.operation} - batch {batch_num} ({batch_size} items)")
|
||||
|
||||
|
||||
class LevelFilter(logging.Filter):
|
||||
def __init__(self, min_level: int, max_level: int):
|
||||
super().__init__()
|
||||
self.min_level = min_level
|
||||
self.max_level = max_level
|
||||
|
||||
def filter(self, record: logging.LogRecord) -> bool:
|
||||
return self.min_level <= record.levelno <= self.max_level
|
||||
|
||||
|
||||
def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logging.Logger:
|
||||
logger = logging.getLogger("skywipe")
|
||||
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
|
||||
|
||||
if logger.handlers:
|
||||
return logger
|
||||
|
||||
formatter = logging.Formatter(fmt="%(levelname)s: %(message)s")
|
||||
|
||||
info_handler = logging.StreamHandler(sys.stdout)
|
||||
info_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
|
||||
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
|
||||
info_handler.setFormatter(formatter)
|
||||
logger.addHandler(info_handler)
|
||||
|
||||
error_handler = logging.StreamHandler(sys.stderr)
|
||||
error_handler.setLevel(logging.WARNING)
|
||||
error_handler.setFormatter(formatter)
|
||||
logger.addHandler(error_handler)
|
||||
|
||||
if log_file:
|
||||
log_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
file_handler = logging.FileHandler(log_file, encoding="utf-8")
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
file_formatter = logging.Formatter(
|
||||
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
file_handler.setFormatter(file_formatter)
|
||||
logger.addHandler(file_handler)
|
||||
|
||||
return logger
|
||||
|
||||
|
||||
def get_logger() -> logging.Logger:
|
||||
return logging.getLogger("skywipe")
|
||||
@@ -1,92 +0,0 @@
|
||||
"""Media post deletion module for Skywipe"""
|
||||
|
||||
import time
|
||||
from .auth import Auth
|
||||
from .configure import Configuration
|
||||
|
||||
|
||||
def delete_posts_with_medias():
|
||||
auth = Auth()
|
||||
client = auth.login()
|
||||
config = Configuration()
|
||||
config_data = config.load()
|
||||
|
||||
batch_size = config_data.get("batch_size", 10)
|
||||
delay = config_data.get("delay", 1)
|
||||
verbose = config_data.get("verbose", False)
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
|
||||
|
||||
did = client.me.did
|
||||
cursor = None
|
||||
total_deleted = 0
|
||||
|
||||
while True:
|
||||
response = client.get_author_feed(
|
||||
actor=did, limit=batch_size, cursor=cursor)
|
||||
|
||||
posts = response.feed
|
||||
if not posts:
|
||||
break
|
||||
|
||||
for post in posts:
|
||||
post_record = post.post
|
||||
|
||||
embed = getattr(post_record, 'embed', None)
|
||||
has_media = False
|
||||
if embed:
|
||||
embed_type = getattr(embed, 'py_type', None)
|
||||
media_types = {
|
||||
'app.bsky.embed.images',
|
||||
'app.bsky.embed.video',
|
||||
'app.bsky.embed.external'
|
||||
}
|
||||
|
||||
if embed_type:
|
||||
embed_type_base = embed_type.split('#')[0]
|
||||
|
||||
if embed_type_base in media_types:
|
||||
has_media = True
|
||||
|
||||
if not has_media and embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
|
||||
media = getattr(embed, 'media', None)
|
||||
if media:
|
||||
media_type = getattr(media, 'py_type', None)
|
||||
if media_type:
|
||||
media_type_base = media_type.split('#')[0]
|
||||
if media_type_base in media_types:
|
||||
has_media = True
|
||||
|
||||
if not has_media:
|
||||
for attr in ('images', 'video', 'external'):
|
||||
if hasattr(embed, attr):
|
||||
has_media = True
|
||||
break
|
||||
if isinstance(embed, dict) and embed.get(attr):
|
||||
has_media = True
|
||||
break
|
||||
|
||||
if not has_media:
|
||||
if verbose:
|
||||
print(f"Skipping post without media: {post_record.uri}")
|
||||
continue
|
||||
|
||||
try:
|
||||
client.delete_post(post_record.uri)
|
||||
total_deleted += 1
|
||||
if verbose:
|
||||
print(f"Deleted post with media: {post_record.uri}")
|
||||
except Exception as e:
|
||||
if verbose:
|
||||
print(f"Error deleting post {post_record.uri}: {e}")
|
||||
|
||||
cursor = response.cursor
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
|
||||
print(f"Deleted {total_deleted} posts with media.")
|
||||
222
skywipe/operations.py
Normal file
222
skywipe/operations.py
Normal file
@@ -0,0 +1,222 @@
|
||||
"""Shared operation utilities and strategies for Skywipe"""
|
||||
|
||||
import time
|
||||
from typing import Callable, Optional, Any
|
||||
from atproto import models
|
||||
|
||||
from .auth import Auth
|
||||
from .configure import Configuration
|
||||
from .logger import get_logger, ProgressTracker
|
||||
|
||||
|
||||
class OperationContext:
|
||||
def __init__(self):
|
||||
self.logger = get_logger()
|
||||
self.auth = Auth()
|
||||
self.client = self.auth.login()
|
||||
self.config = Configuration()
|
||||
self.config_data = self.config.load()
|
||||
self.batch_size = self.config_data.get("batch_size", 10)
|
||||
self.delay = self.config_data.get("delay", 1)
|
||||
self.did = self.client.me.did
|
||||
|
||||
|
||||
class RecordDeletionStrategy:
|
||||
def __init__(self, collection: str):
|
||||
self.collection = collection
|
||||
|
||||
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
|
||||
list_params = models.ComAtprotoRepoListRecords.Params(
|
||||
repo=context.did,
|
||||
collection=self.collection,
|
||||
limit=context.batch_size,
|
||||
cursor=cursor
|
||||
)
|
||||
return context.client.com.atproto.repo.list_records(params=list_params)
|
||||
|
||||
def extract_items(self, response):
|
||||
return response.records
|
||||
|
||||
def get_cursor(self, response):
|
||||
return response.cursor
|
||||
|
||||
def process_item(self, record, context: OperationContext):
|
||||
record_uri = record.uri
|
||||
rkey = record_uri.rsplit("/", 1)[-1]
|
||||
delete_data = {
|
||||
"repo": context.did,
|
||||
"collection": self.collection,
|
||||
"rkey": rkey
|
||||
}
|
||||
context.client.com.atproto.repo.delete_record(data=delete_data)
|
||||
context.logger.debug(f"Deleted: {record_uri}")
|
||||
|
||||
|
||||
class FeedStrategy:
|
||||
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
|
||||
if cursor:
|
||||
return context.client.get_author_feed(
|
||||
actor=context.did, limit=context.batch_size, cursor=cursor
|
||||
)
|
||||
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
|
||||
|
||||
def extract_items(self, response):
|
||||
return response.feed
|
||||
|
||||
def get_cursor(self, response):
|
||||
return response.cursor
|
||||
|
||||
def process_item(self, post, context: OperationContext):
|
||||
uri = post.post.uri
|
||||
context.client.delete_post(uri)
|
||||
context.logger.debug(f"Deleted post: {uri}")
|
||||
|
||||
|
||||
class BookmarkStrategy:
|
||||
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
|
||||
get_params = models.AppBskyBookmarkGetBookmarks.Params(
|
||||
limit=context.batch_size,
|
||||
cursor=cursor
|
||||
)
|
||||
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
|
||||
|
||||
def extract_items(self, response):
|
||||
return response.bookmarks
|
||||
|
||||
def get_cursor(self, response):
|
||||
return response.cursor
|
||||
|
||||
def process_item(self, bookmark, context: OperationContext):
|
||||
bookmark_uri = self._extract_bookmark_uri(bookmark)
|
||||
if not bookmark_uri:
|
||||
raise ValueError("Unable to find bookmark URI")
|
||||
|
||||
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
|
||||
uri=bookmark_uri)
|
||||
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
|
||||
context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
|
||||
|
||||
def _extract_bookmark_uri(self, bookmark):
|
||||
if hasattr(bookmark, "uri"):
|
||||
return bookmark.uri
|
||||
|
||||
for attr_name in ("subject", "record", "post", "item"):
|
||||
if hasattr(bookmark, attr_name):
|
||||
nested = getattr(bookmark, attr_name)
|
||||
if hasattr(nested, "uri"):
|
||||
return nested.uri
|
||||
return None
|
||||
|
||||
|
||||
class Operation:
|
||||
def __init__(
|
||||
self,
|
||||
operation_name: str,
|
||||
strategy_type: str = "feed",
|
||||
collection: Optional[str] = None,
|
||||
filter_fn: Optional[Callable[[Any], bool]] = None
|
||||
):
|
||||
self.operation_name = operation_name
|
||||
self.filter_fn = filter_fn
|
||||
|
||||
if strategy_type == "record":
|
||||
if not collection:
|
||||
raise ValueError("Collection is required for record strategy")
|
||||
self.strategy = RecordDeletionStrategy(collection)
|
||||
elif strategy_type == "bookmark":
|
||||
self.strategy = BookmarkStrategy()
|
||||
else:
|
||||
self.strategy = FeedStrategy()
|
||||
|
||||
def run(self) -> int:
|
||||
context = OperationContext()
|
||||
progress = ProgressTracker(operation=self.operation_name)
|
||||
|
||||
context.logger.info(
|
||||
f"Starting {self.operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
|
||||
)
|
||||
|
||||
cursor = None
|
||||
total_processed = 0
|
||||
batch_num = 0
|
||||
|
||||
while True:
|
||||
batch_num += 1
|
||||
response = self.strategy.fetch(context, cursor)
|
||||
items = self.strategy.extract_items(response)
|
||||
|
||||
if not items:
|
||||
break
|
||||
|
||||
progress.batch(batch_num, len(items))
|
||||
|
||||
for item in items:
|
||||
if self.filter_fn and not self.filter_fn(item):
|
||||
continue
|
||||
|
||||
try:
|
||||
self.strategy.process_item(item, context)
|
||||
total_processed += 1
|
||||
progress.update(1)
|
||||
except Exception as e:
|
||||
context.logger.error(f"Error processing item: {e}")
|
||||
|
||||
cursor = self.strategy.get_cursor(response)
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
if context.delay > 0:
|
||||
time.sleep(context.delay)
|
||||
|
||||
context.logger.info(
|
||||
f"{self.operation_name}: {total_processed} items processed.")
|
||||
return total_processed
|
||||
|
||||
|
||||
def run_operation(
|
||||
strategy,
|
||||
operation_name: str,
|
||||
filter_fn: Optional[Callable[[Any], bool]] = None
|
||||
) -> int:
|
||||
context = OperationContext()
|
||||
progress = ProgressTracker(operation=operation_name)
|
||||
|
||||
context.logger.info(
|
||||
f"Starting {operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
|
||||
)
|
||||
|
||||
cursor = None
|
||||
total_processed = 0
|
||||
batch_num = 0
|
||||
|
||||
while True:
|
||||
batch_num += 1
|
||||
response = strategy.fetch(context, cursor)
|
||||
items = strategy.extract_items(response)
|
||||
|
||||
if not items:
|
||||
break
|
||||
|
||||
progress.batch(batch_num, len(items))
|
||||
|
||||
for item in items:
|
||||
if filter_fn and not filter_fn(item):
|
||||
continue
|
||||
|
||||
try:
|
||||
strategy.process_item(item, context)
|
||||
total_processed += 1
|
||||
progress.update(1)
|
||||
except Exception as e:
|
||||
context.logger.error(f"Error processing item: {e}")
|
||||
|
||||
cursor = strategy.get_cursor(response)
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
if context.delay > 0:
|
||||
time.sleep(context.delay)
|
||||
|
||||
context.logger.info(
|
||||
f"{operation_name}: {total_processed} items processed.")
|
||||
return total_processed
|
||||
60
skywipe/post_analysis.py
Normal file
60
skywipe/post_analysis.py
Normal file
@@ -0,0 +1,60 @@
|
||||
"""Post analysis utilities for Skywipe"""
|
||||
|
||||
|
||||
class PostAnalyzer:
|
||||
MEDIA_TYPES = {
|
||||
'app.bsky.embed.images',
|
||||
'app.bsky.embed.video',
|
||||
'app.bsky.embed.external'
|
||||
}
|
||||
|
||||
QUOTE_TYPES = {
|
||||
'app.bsky.embed.record',
|
||||
'app.bsky.embed.recordWithMedia',
|
||||
'app.bsky.embed.record_with_media'
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def has_media(post_record):
|
||||
embed = getattr(post_record, 'embed', None)
|
||||
if not embed:
|
||||
return False
|
||||
|
||||
embed_type = getattr(embed, 'py_type', None)
|
||||
if embed_type:
|
||||
embed_type_base = embed_type.split('#')[0]
|
||||
|
||||
if embed_type_base in PostAnalyzer.MEDIA_TYPES:
|
||||
return True
|
||||
|
||||
if embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
|
||||
media = getattr(embed, 'media', None)
|
||||
if media:
|
||||
media_type = getattr(media, 'py_type', None)
|
||||
if media_type:
|
||||
media_type_base = media_type.split('#')[0]
|
||||
if media_type_base in PostAnalyzer.MEDIA_TYPES:
|
||||
return True
|
||||
|
||||
for attr in ('images', 'video', 'external'):
|
||||
if hasattr(embed, attr):
|
||||
return True
|
||||
if isinstance(embed, dict) and embed.get(attr):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def has_quote(post_record):
|
||||
embed = getattr(post_record, 'embed', None)
|
||||
if not embed:
|
||||
return False
|
||||
|
||||
embed_type = getattr(embed, 'py_type', None)
|
||||
if embed_type:
|
||||
embed_type_base = embed_type.split('#')[0]
|
||||
if embed_type_base in PostAnalyzer.QUOTE_TYPES:
|
||||
return True
|
||||
|
||||
return (hasattr(embed, 'record') or
|
||||
(isinstance(embed, dict) and embed.get('record')))
|
||||
@@ -1,56 +0,0 @@
|
||||
"""Post deletion module for Skywipe"""
|
||||
|
||||
import time
|
||||
from .auth import Auth
|
||||
from .configure import Configuration
|
||||
|
||||
|
||||
def delete_all_posts():
|
||||
auth = Auth()
|
||||
client = auth.login()
|
||||
config = Configuration()
|
||||
config_data = config.load()
|
||||
|
||||
batch_size = config_data.get("batch_size", 10)
|
||||
delay = config_data.get("delay", 1)
|
||||
verbose = config_data.get("verbose", False)
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
|
||||
|
||||
did = client.me.did
|
||||
cursor = None
|
||||
total_deleted = 0
|
||||
|
||||
while True:
|
||||
if cursor:
|
||||
response = client.get_author_feed(
|
||||
actor=did, limit=batch_size, cursor=cursor)
|
||||
else:
|
||||
response = client.get_author_feed(actor=did, limit=batch_size)
|
||||
|
||||
posts = response.feed
|
||||
if not posts:
|
||||
break
|
||||
|
||||
post_uris = [post.post.uri for post in posts]
|
||||
|
||||
for uri in post_uris:
|
||||
try:
|
||||
client.delete_post(uri)
|
||||
total_deleted += 1
|
||||
if verbose:
|
||||
print(f"Deleted post: {uri}")
|
||||
except Exception as e:
|
||||
if verbose:
|
||||
print(f"Error deleting post {uri}: {e}")
|
||||
|
||||
cursor = response.cursor
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
|
||||
print(f"Deleted {total_deleted} posts.")
|
||||
@@ -1,75 +0,0 @@
|
||||
"""Quote post deletion module for Skywipe"""
|
||||
|
||||
import time
|
||||
from .auth import Auth
|
||||
from .configure import Configuration
|
||||
|
||||
|
||||
def delete_quotes_posts():
|
||||
auth = Auth()
|
||||
client = auth.login()
|
||||
config = Configuration()
|
||||
config_data = config.load()
|
||||
|
||||
batch_size = config_data.get("batch_size", 10)
|
||||
delay = config_data.get("delay", 1)
|
||||
verbose = config_data.get("verbose", False)
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
f"Starting quote post deletion with batch_size={batch_size}, delay={delay}s")
|
||||
|
||||
did = client.me.did
|
||||
cursor = None
|
||||
total_deleted = 0
|
||||
|
||||
while True:
|
||||
response = client.get_author_feed(
|
||||
actor=did, limit=batch_size, cursor=cursor)
|
||||
|
||||
posts = response.feed
|
||||
if not posts:
|
||||
break
|
||||
|
||||
for post in posts:
|
||||
post_record = post.post
|
||||
|
||||
embed = getattr(post_record, 'embed', None)
|
||||
has_quote = False
|
||||
if embed:
|
||||
embed_type = getattr(embed, 'py_type', None)
|
||||
if embed_type:
|
||||
embed_type_base = embed_type.split('#')[0]
|
||||
quote_types = {
|
||||
'app.bsky.embed.record',
|
||||
'app.bsky.embed.recordWithMedia',
|
||||
'app.bsky.embed.record_with_media'
|
||||
}
|
||||
if embed_type_base in quote_types:
|
||||
has_quote = True
|
||||
|
||||
if not has_quote and (hasattr(embed, 'record') or (isinstance(embed, dict) and embed.get('record'))):
|
||||
has_quote = True
|
||||
|
||||
if not has_quote:
|
||||
if verbose:
|
||||
print(f"Skipping post without quote: {post_record.uri}")
|
||||
continue
|
||||
|
||||
try:
|
||||
client.delete_post(post_record.uri)
|
||||
total_deleted += 1
|
||||
if verbose:
|
||||
print(f"Deleted quote post: {post_record.uri}")
|
||||
except Exception as e:
|
||||
if verbose:
|
||||
print(f"Error deleting quote post {post_record.uri}: {e}")
|
||||
|
||||
cursor = response.cursor
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
|
||||
print(f"Deleted {total_deleted} quote posts.")
|
||||
@@ -1,69 +0,0 @@
|
||||
"""Repost undoing module for Skywipe"""
|
||||
|
||||
import time
|
||||
from atproto import models
|
||||
from .auth import Auth
|
||||
from .configure import Configuration
|
||||
|
||||
|
||||
REPOST_COLLECTION = "app.bsky.feed.repost"
|
||||
|
||||
|
||||
def undo_reposts():
|
||||
auth = Auth()
|
||||
client = auth.login()
|
||||
config = Configuration()
|
||||
config_data = config.load()
|
||||
|
||||
batch_size = config_data.get("batch_size", 10)
|
||||
delay = config_data.get("delay", 1)
|
||||
verbose = config_data.get("verbose", False)
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
f"Starting repost deletion with batch_size={batch_size}, delay={delay}s")
|
||||
|
||||
did = client.me.did
|
||||
cursor = None
|
||||
total_undone = 0
|
||||
|
||||
while True:
|
||||
list_params = models.ComAtprotoRepoListRecords.Params(
|
||||
repo=did,
|
||||
collection=REPOST_COLLECTION,
|
||||
limit=batch_size,
|
||||
cursor=cursor
|
||||
)
|
||||
|
||||
response = client.com.atproto.repo.list_records(params=list_params)
|
||||
|
||||
records = response.records
|
||||
if not records:
|
||||
break
|
||||
|
||||
for record in records:
|
||||
try:
|
||||
record_uri = record.uri
|
||||
rkey = record_uri.rsplit("/", 1)[-1]
|
||||
delete_data = {
|
||||
"repo": did,
|
||||
"collection": REPOST_COLLECTION,
|
||||
"rkey": rkey
|
||||
}
|
||||
client.com.atproto.repo.delete_record(data=delete_data)
|
||||
total_undone += 1
|
||||
if verbose:
|
||||
print(f"Undone repost: {record_uri}")
|
||||
except Exception as e:
|
||||
record_uri = getattr(record, "uri", "unknown")
|
||||
if verbose:
|
||||
print(f"Error undoing repost {record_uri}: {e}")
|
||||
|
||||
cursor = response.cursor
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
|
||||
print(f"Undone {total_undone} reposts.")
|
||||
27
skywipe/safeguard.py
Normal file
27
skywipe/safeguard.py
Normal file
@@ -0,0 +1,27 @@
|
||||
"""Safeguard module for Skywipe"""
|
||||
|
||||
import sys
|
||||
from .logger import get_logger
|
||||
|
||||
|
||||
CONFIRM_RESPONSES = {"yes", "y"}
|
||||
|
||||
|
||||
def require_confirmation(operation: str, skip_confirmation: bool = False) -> None:
|
||||
if skip_confirmation:
|
||||
return
|
||||
|
||||
logger = get_logger()
|
||||
logger.warning(f"This will {operation}")
|
||||
logger.warning("This operation is DESTRUCTIVE and cannot be undone!")
|
||||
|
||||
try:
|
||||
response = input(
|
||||
"Are you sure you want to continue? (y/N): ").strip().lower()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
logger.info("\nOperation cancelled.")
|
||||
sys.exit(0)
|
||||
|
||||
if response not in CONFIRM_RESPONSES:
|
||||
logger.info("Operation cancelled.")
|
||||
sys.exit(0)
|
||||
Reference in New Issue
Block a user