Compare commits

..

51 Commits

Author SHA1 Message Date
dba06e642a docs: update roadmap 2025-12-19 14:39:15 +01:00
5868c1649b docs: update readme 2025-12-19 14:35:25 +01:00
a14184cddc feat: run all 2025-12-19 14:35:21 +01:00
6587f8c39c feat: implement bookmark deletion module 2025-12-19 14:35:05 +01:00
ae6663572c refactor: inline has_quote_embed 2025-12-19 14:34:42 +01:00
3eb456e999 refactor: delete_posts is now delete_all_posts 2025-12-19 14:34:25 +01:00
cfa5773e62 refactor: inline has_media_embed 2025-12-19 14:34:02 +01:00
ddee2a6029 feat: implement follow undoing module 2025-12-19 14:33:43 +01:00
5e2b4f3408 docs: update readme 2025-12-19 14:10:37 +01:00
c238278df6 feat: add quote post deletion 2025-12-19 14:10:33 +01:00
a9c25c8c10 feat: implemented delete_quotes() 2025-12-19 14:10:25 +01:00
5ff25b3eb6 docs: update readme 2025-12-19 12:51:30 +01:00
c396ba8ae9 feat: create repost undoing module 2025-12-19 12:51:25 +01:00
d12f14a994 feat: implemented undo_reposts() 2025-12-19 12:51:03 +01:00
a4b622bfd3 docs: update chips 2025-12-19 10:11:50 +01:00
005c76119f docs: update readme 2025-12-18 16:29:44 +01:00
b1e2b266f4 refactor: switch to relative imports 2025-12-18 16:08:15 +01:00
6475a117e7 build: update entrypoint 2025-12-18 16:06:16 +01:00
28a193078a docs: update readme 2025-12-18 16:06:09 +01:00
07bbe88784 refactor: move the entrypoint inside package 2025-12-18 16:06:04 +01:00
59554f6f17 feat: plan quotes command 2025-12-18 15:28:23 +01:00
02f609d829 docs: plan to have a quotes-only command 2025-12-18 15:28:16 +01:00
c8df3d0460 docs: update roadmap 2025-12-18 15:13:55 +01:00
5afee97259 feat: like undoing module 2025-12-18 15:13:51 +01:00
ebbcbeeaa7 feat: undo_likes() implemented 2025-12-18 15:13:36 +01:00
044ec67aa3 feat: prepare bookmark command 2025-12-18 14:36:41 +01:00
e871d19a9f docs: wording 2025-12-18 14:34:44 +01:00
0ec562e0d2 docs: add bookmarks on the roadmap 2025-12-18 14:32:59 +01:00
276d177c4d docs: forgotten word 2025-12-18 13:50:45 +01:00
ed0076a34e docs: link to file 2025-12-18 13:50:18 +01:00
e99defc533 docs: update readme 2025-12-18 13:48:26 +01:00
50288e9130 feat: run_medias() is now implemented 2025-12-18 13:48:22 +01:00
c7ef63cc05 feat: media post deletion module 2025-12-18 13:48:13 +01:00
2395f60d11 chore: clean pyproject.toml 2025-12-18 13:47:39 +01:00
edba17e9a3 refactor: unify docstrings 2025-12-18 13:14:27 +01:00
e3da6c4f12 fix: restore final message 2025-12-18 13:09:12 +01:00
debf55577d refactor: use direct registry export and migrate methods to public scope 2025-12-18 13:07:16 +01:00
2efe83650b refactor: use a single registry object 2025-12-18 13:05:31 +01:00
1c7a903131 docs: update roadmap 2025-12-18 13:02:43 +01:00
472b828f72 feat: add post deletion module 2025-12-18 13:02:18 +01:00
8090a3432c feat: add is_logged method 2025-12-14 17:34:58 +01:00
8b406f5d4e docs: update roadmap 2025-12-14 17:15:50 +01:00
053bb8696f feat: add a method to load configuration 2025-12-14 17:15:31 +01:00
a31df05bb8 feat: define authentication logic to at protocol 2025-12-14 17:15:22 +01:00
e402e844c9 feat: add entrypoint 2025-12-14 16:56:13 +01:00
75d29a2c0d feat: use our newly built registry 2025-12-14 16:53:24 +01:00
11f0bce116 feat: switch to use a command registry pattern 2025-12-14 16:53:17 +01:00
6c424c6135 revert: don't cipher password in config 2025-12-14 16:48:51 +01:00
1b8eb4abd4 docs: update readme 2025-12-14 16:48:42 +01:00
4d530a5a65 build: update uv.lock 2025-12-14 16:48:38 +01:00
ac21ad39b1 clean: remove dep 2025-12-14 16:48:32 +01:00
14 changed files with 692 additions and 105 deletions

View File

@@ -1,6 +1,6 @@
# Skywipe # Skywipe
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol. Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
## Warning ## Warning
@@ -8,9 +8,9 @@ This tool performs destructive operations. Only use it if you intend to erase da
## Requirements ## Requirements
We're using [`uv`](https://github.com/astral-sh/uv) for dependency and virtual environment management. Check [pyproject.toml](pyproject.toml).
You can setup the project (aka create a virtual environment and install dependencies) with : You can use `uv` to install dependencies:
```bash ```bash
git clone https://git.kharec.info/Kharec/skywipe.git git clone https://git.kharec.info/Kharec/skywipe.git
@@ -20,37 +20,27 @@ uv sync
## How to run ## How to run
When installation will be worked out, you'll be able to :
```bash
skywipe all # target everything
skywipe configure # create configuration
skywipe posts # only posts
skywipe medias # only posts with medias
skywipe likes # only likes
skywipe reposts # only reposts
skywipe follows # only follows
```
While it's being developed, you can use the tool using `uv` : While it's being developed, you can use the tool using `uv` :
```bash ```bash
uv run main.py all # target everything uv run python -m skywipe.cli all # target everything
uv run main.py configure # create configuration uv run python -m skywipe.cli configure # create configuration
uv run main.py posts # only posts uv run python -m skywipe.cli posts # delete posts
uv run main.py medias # only posts with medias uv run python -m skywipe.cli medias # delete posts with medias
uv run main.py likes # only likes uv run python -m skywipe.cli likes # undo likes
uv run main.py reposts # only reposts uv run python -m skywipe.cli reposts # undo reposts
uv run main.py follows # only follows uv run python -m skywipe.cli quotes # delete quotes
uv run python -m skywipe.cli follows # unfollow all
uv run python -m skywipe.cli bookmarks # delete bookmarks
``` ```
### Configuration ## Configuration
If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` : If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` :
```yaml ```yaml
handle: your_handle handle: your_handle
password: your_password_encrypted password: your_password
batch_size: 10 batch_size: 10
delay: 1 delay: 1
verbose: true verbose: true
@@ -60,17 +50,20 @@ BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-passwor
## Roadmap ## Roadmap
- [ ] build cli parameter management - [x] build cli parameter management
- [ ] handle configuration logic - [x] handle configuration logic
- [ ] sign in to at protocol - [x] sign in to at protocol
- [ ] delete posts in groups - [x] delete posts in batch
- [ ] only delete posts with media - [x] only delete posts with media
- [ ] remove likes - [x] undo likes
- [ ] remove reposts - [x] undo reposts
- [ ] unfollow accounts - [x] delete quotes
- [ ] make `all` run the other commands - [x] unfollow accounts
- [x] remove bookmarks
- [x] make `all` run the other commands
- [ ] add simple progress and logging - [ ] add simple progress and logging
- [ ] add safeguards like confirmations and clear dry-run info - [ ] add safeguards (confirmation, dry-run flag)
- [ ] decent code architecture
- [ ] installation and run process - [ ] installation and run process
## License ## License

View File

@@ -1,11 +1,10 @@
[project] [project]
name = "skywipe" name = "skywipe"
version = "0.1.0" version = "0.1.0"
description = "Clean your bluesky account with style" description = "Clean your bluesky account"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = ["atproto>=0.0.65", "pyyaml>=6.0"]
"atproto>=0.0.65",
"pyyaml>=6.0", [project.scripts]
"cryptography>=42.0.0", skywipe = "skywipe.cli:main"
]

28
skywipe/auth.py Normal file
View File

@@ -0,0 +1,28 @@
"""Authentication module for Skywipe"""
from atproto import Client
from .configure import Configuration
class Auth:
def __init__(self):
self.config = Configuration()
self.client = None
def login(self) -> Client:
config_data = self.config.load()
handle = config_data.get("handle")
password = config_data.get("password")
if not handle or not password:
raise ValueError(
"handle and password must be set in configuration")
self.client = Client()
self.client.login(handle, password)
return self.client
def is_logged(self) -> bool:
return bool(getattr(self.client, "me", None))

75
skywipe/bookmarks.py Normal file
View File

@@ -0,0 +1,75 @@
"""Bookmark deletion module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
def delete_bookmarks():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting bookmark deletion with batch_size={batch_size}, delay={delay}s")
cursor = None
total_deleted = 0
while True:
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=batch_size,
cursor=cursor
)
response = client.app.bsky.bookmark.get_bookmarks(params=get_params)
bookmarks = response.bookmarks
if not bookmarks:
break
for bookmark in bookmarks:
try:
bookmark_uri = None
if hasattr(bookmark, "uri"):
bookmark_uri = bookmark.uri
else:
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
bookmark_uri = nested.uri
break
if not bookmark_uri:
if verbose:
print(f"Skipping bookmark: unable to find uri")
continue
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri
)
client.app.bsky.bookmark.delete_bookmark(data=delete_data)
total_deleted += 1
if verbose:
print(f"Deleted bookmark: {bookmark_uri}")
except Exception as e:
bookmark_uri = getattr(bookmark, "uri", "unknown")
if verbose:
print(f"Error deleting bookmark {bookmark_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} bookmarks.")

View File

@@ -3,22 +3,13 @@
import sys import sys
import argparse import argparse
from skywipe.commands import run_configure from .commands import registry
from skywipe.configure import Configuration from .configure import Configuration
COMMANDS = { def create_parser():
"all": "target everything", commands = registry.get_all_commands()
"configure": "create configuration",
"posts": "only posts",
"medias": "only posts with medias",
"likes": "only likes",
"reposts": "only reposts",
"follows": "only follows",
}
def _create_parser():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Clean your bluesky account with style.", description="Clean your bluesky account with style.",
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account." epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account."
@@ -31,17 +22,13 @@ def _create_parser():
required=True required=True
) )
for cmd, help_text in COMMANDS.items(): for cmd, help_text in commands.items():
subparsers.add_parser(cmd, help=help_text) subparsers.add_parser(cmd, help=help_text)
return parser return parser
def _check_config_required(command: str) -> bool: def require_config():
return command != "configure"
def _require_config():
config = Configuration() config = Configuration()
if not config.exists(): if not config.exists():
print("Error: Configuration file not found.") print("Error: Configuration file not found.")
@@ -50,16 +37,17 @@ def _require_config():
def main(): def main():
parser = _create_parser() parser = create_parser()
args = parser.parse_args() args = parser.parse_args()
if _check_config_required(args.command): if registry.requires_config(args.command):
_require_config() require_config()
if args.command == "configure": try:
run_configure() registry.execute(args.command)
else: except ValueError as e:
print(f"Command '{args.command}' is not yet implemented.") print(f"Error: {e}")
sys.exit(1)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,8 +1,110 @@
"""Command implementations for Skywipe CLI.""" """Command implementations for Skywipe"""
from skywipe.configure import Configuration from typing import Callable, Dict, Optional
from .configure import Configuration
from .posts import delete_all_posts
from .medias import delete_posts_with_medias
from .likes import undo_likes
from .reposts import undo_reposts
from .quotes import delete_quotes_posts
from .follows import unfollow_all
from .bookmarks import delete_bookmarks
CommandHandler = Callable[[], None]
class CommandRegistry:
def __init__(self):
self._commands = {}
self._help_texts = {}
self._requires_config = {}
def register(
self,
name: str,
handler: CommandHandler,
help_text: str,
requires_config: bool = True
):
self._commands[name] = handler
self._help_texts[name] = help_text
self._requires_config[name] = requires_config
def get_handler(self, name: str) -> Optional[CommandHandler]:
return self._commands.get(name)
def get_help_text(self, name: str) -> Optional[str]:
return self._help_texts.get(name)
def requires_config(self, name: str) -> bool:
return self._requires_config.get(name, True)
def get_all_commands(self) -> Dict[str, str]:
return self._help_texts.copy()
def execute(self, name: str):
handler = self.get_handler(name)
if handler:
handler()
else:
raise ValueError(f"Unknown command: {name}")
registry = CommandRegistry()
def run_configure(): def run_configure():
config = Configuration() config = Configuration()
config.create() config.create()
def run_posts():
delete_all_posts()
def run_medias():
delete_posts_with_medias()
def run_likes():
undo_likes()
def run_reposts():
undo_reposts()
def run_quotes():
delete_quotes_posts()
def run_follows():
unfollow_all()
def run_bookmarks():
delete_bookmarks()
def run_all():
commands = ["posts", "likes", "reposts", "follows", "bookmarks"]
for cmd in commands:
try:
registry.execute(cmd)
except Exception as e:
print(f"Error running '{cmd}': {e}")
continue
registry.register("configure", run_configure,
"create configuration", requires_config=False)
registry.register("posts", run_posts, "only posts")
registry.register("medias", run_medias, "only posts with medias")
registry.register("likes", run_likes, "only likes")
registry.register("reposts", run_reposts, "only reposts")
registry.register("quotes", run_quotes, "only quotes")
registry.register("follows", run_follows, "only follows")
registry.register("bookmarks", run_bookmarks, "only bookmarks")
registry.register("all", run_all, "target everything")

View File

@@ -1,48 +1,17 @@
"""Core configuration handling class and related logic.""" """Core configuration module for Skywipe"""
import os
import getpass import getpass
from pathlib import Path from pathlib import Path
import yaml import yaml
from cryptography.fernet import Fernet
class Configuration: class Configuration:
def __init__(self): def __init__(self):
self.config_file = Path.home() / ".config" / "skywipe" / "config.yml" self.config_file = Path.home() / ".config" / "skywipe" / "config.yml"
self.key_file = Path.home() / ".config" / "skywipe" / ".key"
self._fernet = None
def exists(self) -> bool: def exists(self) -> bool:
return self.config_file.exists() return self.config_file.exists()
def _get_or_create_key(self) -> bytes:
if self.key_file.exists():
return self.key_file.read_bytes()
key = Fernet.generate_key()
config_dir = self.key_file.parent
config_dir.mkdir(parents=True, exist_ok=True)
self.key_file.write_bytes(key)
os.chmod(self.key_file, 0o600)
return key
def _get_fernet(self) -> Fernet:
if self._fernet is None:
key = self._get_or_create_key()
self._fernet = Fernet(key)
return self._fernet
def encrypt_password(self, password: str) -> str:
fernet = self._get_fernet()
encrypted = fernet.encrypt(password.encode())
return encrypted.decode()
def decrypt_password(self, encrypted_password: str) -> str:
fernet = self._get_fernet()
decrypted = fernet.decrypt(encrypted_password.encode())
return decrypted.decode()
def create(self): def create(self):
if self.exists(): if self.exists():
overwrite = input( overwrite = input(
@@ -71,11 +40,9 @@ class Configuration:
print("Error: batch_size and delay must be integers") print("Error: batch_size and delay must be integers")
return return
encrypted_password = self.encrypt_password(password)
config_data = { config_data = {
"handle": handle, "handle": handle,
"password": encrypted_password, "password": password,
"batch_size": batch_size, "batch_size": batch_size,
"delay": delay, "delay": delay,
"verbose": verbose "verbose": verbose
@@ -85,3 +52,10 @@ class Configuration:
yaml.dump(config_data, f, default_flow_style=False) yaml.dump(config_data, f, default_flow_style=False)
print(f"\nConfiguration saved to {self.config_file}") print(f"\nConfiguration saved to {self.config_file}")
def load(self) -> dict:
if not self.exists():
raise FileNotFoundError(
f"Configuration file not found: {self.config_file}")
with open(self.config_file, "r") as f:
return yaml.safe_load(f)

69
skywipe/follows.py Normal file
View File

@@ -0,0 +1,69 @@
"""Follow undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
FOLLOW_COLLECTION = "app.bsky.graph.follow"
def unfollow_all():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting unfollow operation with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_unfollowed = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=FOLLOW_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": FOLLOW_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_unfollowed += 1
if verbose:
print(f"Unfollowed: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error unfollowing {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Unfollowed {total_unfollowed} accounts.")

69
skywipe/likes.py Normal file
View File

@@ -0,0 +1,69 @@
"""Like undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
LIKE_COLLECTION = "app.bsky.feed.like"
def undo_likes():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=LIKE_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": LIKE_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone like: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing like {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} likes.")

92
skywipe/medias.py Normal file
View File

@@ -0,0 +1,92 @@
"""Media post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_posts_with_medias():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
for post in posts:
post_record = post.post
embed = getattr(post_record, 'embed', None)
has_media = False
if embed:
embed_type = getattr(embed, 'py_type', None)
media_types = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in media_types:
has_media = True
if not has_media and embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in media_types:
has_media = True
if not has_media:
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
has_media = True
break
if isinstance(embed, dict) and embed.get(attr):
has_media = True
break
if not has_media:
if verbose:
print(f"Skipping post without media: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted post with media: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts with media.")

56
skywipe/posts.py Normal file
View File

@@ -0,0 +1,56 @@
"""Post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_all_posts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
else:
response = client.get_author_feed(actor=did, limit=batch_size)
posts = response.feed
if not posts:
break
post_uris = [post.post.uri for post in posts]
for uri in post_uris:
try:
client.delete_post(uri)
total_deleted += 1
if verbose:
print(f"Deleted post: {uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts.")

75
skywipe/quotes.py Normal file
View File

@@ -0,0 +1,75 @@
"""Quote post deletion module for Skywipe"""
import time
from .auth import Auth
from .configure import Configuration
def delete_quotes_posts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting quote post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
posts = response.feed
if not posts:
break
for post in posts:
post_record = post.post
embed = getattr(post_record, 'embed', None)
has_quote = False
if embed:
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
quote_types = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
if embed_type_base in quote_types:
has_quote = True
if not has_quote and (hasattr(embed, 'record') or (isinstance(embed, dict) and embed.get('record'))):
has_quote = True
if not has_quote:
if verbose:
print(f"Skipping post without quote: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted quote post: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting quote post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} quote posts.")

69
skywipe/reposts.py Normal file
View File

@@ -0,0 +1,69 @@
"""Repost undoing module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
REPOST_COLLECTION = "app.bsky.feed.repost"
def undo_reposts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting repost deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=REPOST_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": REPOST_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone repost: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing repost {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} reposts.")

2
uv.lock generated
View File

@@ -381,14 +381,12 @@ version = "0.1.0"
source = { virtual = "." } source = { virtual = "." }
dependencies = [ dependencies = [
{ name = "atproto" }, { name = "atproto" },
{ name = "cryptography" },
{ name = "pyyaml" }, { name = "pyyaml" },
] ]
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "atproto", specifier = ">=0.0.65" }, { name = "atproto", specifier = ">=0.0.65" },
{ name = "cryptography", specifier = ">=42.0.0" },
{ name = "pyyaml", specifier = ">=6.0" }, { name = "pyyaml", specifier = ">=6.0" },
] ]