Compare commits

..

65 Commits

Author SHA1 Message Date
b336991d67 feat: add -v|--version flag 2025-12-20 23:05:42 +01:00
62c058e9ee feat: add version 2025-12-20 23:05:20 +01:00
35c8b5b8d1 refactor: use NamedTuple for validation results 2025-12-20 22:36:51 +01:00
54c3353667 refactor: simplify nested try-except blocks 2025-12-20 22:35:01 +01:00
93a124be2a clean: remove useless declarations 2025-12-20 22:33:54 +01:00
ecbee7a8ac refactor: pass logger instead of multiple get_logger() calls 2025-12-20 22:30:57 +01:00
eaf4e94d24 refactor: pass logger parameter and use error handling helper 2025-12-20 22:30:47 +01:00
97e166d5f7 refactor: make logger parameter optional 2025-12-20 22:30:33 +01:00
61e2d7f731 refactor: add handle_error() helper for centralized error handling 2025-12-20 22:30:21 +01:00
054dc01813 fix: ensure logger always writes to log file 2025-12-20 22:09:25 +01:00
799b1083ab refactor: add hacking at the bottom 2025-12-20 21:56:57 +01:00
15db235fe1 docs: update hacking section with example 2025-12-20 21:48:57 +01:00
d09dcf06cf docs: ready to fly 2025-12-20 21:47:36 +01:00
5b9589794e clean: shorter epilog 2025-12-20 21:46:16 +01:00
93b88917df refactor: consolidate all command metadata into a single structure 2025-12-20 21:26:11 +01:00
25618ab5bf refactor: simplify logger 2025-12-20 21:24:02 +01:00
81fa68ed08 fix: run commands in right order if all 2025-12-20 21:11:45 +01:00
ca6eaed146 feat: improve confirmation message handling 2025-12-20 21:10:00 +01:00
1b8b32027c feat: dependency injection to allow reusing an existing auth client 2025-12-20 21:07:50 +01:00
a6190aeb84 feat: update run_all to reuse auth and config 2025-12-20 21:07:43 +01:00
887169e7d2 feat: add some validation 2025-12-20 21:04:09 +01:00
9565e4008e feat: consistent error handling 2025-12-20 20:59:18 +01:00
c2aab71955 refactor: add basestrategy and make all class inherits to get rid of get_cursor() 2025-12-20 20:56:22 +01:00
97dd55981b refactor: use a factory pattern 2025-12-20 20:54:52 +01:00
0e91c95e9b refactor: extract common embed checking logic 2025-12-20 20:51:45 +01:00
a818df4a6c refactor: replace hardcoded tuple with constants (consistency+readability) 2025-12-20 20:50:06 +01:00
9aec57bd56 fix: logger's already setup 2025-12-20 20:48:28 +01:00
ec9943822c fix: replace hardcoded list with a registry-derived list 2025-12-20 20:47:15 +01:00
0240ff9f8e clean: remove duplicate run_operation, dead code now 2025-12-20 20:44:42 +01:00
45e2e1eb00 clean: remove is_logged() (not used) 2025-12-20 20:44:29 +01:00
ff20228fa6 docs: update roadmap 2025-12-20 17:25:02 +01:00
9d9c09d56a refactor: use our new Operation and PostAnalyzer tools 2025-12-20 17:24:29 +01:00
64355fbeeb clean: remove unused import 2025-12-20 17:24:15 +01:00
4a337e6b20 feat: add post analysis utilities 2025-12-20 17:24:05 +01:00
f27be4d603 refactor: add OperationContext and Operations classes 2025-12-20 17:23:54 +01:00
9d254ac4b7 refactor: delete unitary "actions" py files 2025-12-20 17:23:35 +01:00
590cc03ba4 docs: refactor warning 2025-12-20 16:03:39 +01:00
c493a99860 docs: inform about log 2025-12-20 16:01:07 +01:00
3f9ef6527f docs: update roadmap 2025-12-20 16:00:03 +01:00
f53e5bb527 feat: implement our safeguard in commands 2025-12-20 15:59:54 +01:00
f2854a0df5 feat: implement safeguard and relocate log file 2025-12-20 15:59:41 +01:00
defd991006 feat: new safeguard module 2025-12-20 15:59:17 +01:00
f922e3cf9d docs: update roadmap 2025-12-20 15:46:48 +01:00
06d70edaf1 feat: use our new logger 2025-12-20 15:46:33 +01:00
58ab6cfafa feat: implement proper logging 2025-12-20 15:46:22 +01:00
dba06e642a docs: update roadmap 2025-12-19 14:39:15 +01:00
5868c1649b docs: update readme 2025-12-19 14:35:25 +01:00
a14184cddc feat: run all 2025-12-19 14:35:21 +01:00
6587f8c39c feat: implement bookmark deletion module 2025-12-19 14:35:05 +01:00
ae6663572c refactor: inline has_quote_embed 2025-12-19 14:34:42 +01:00
3eb456e999 refactor: delete_posts is now delete_all_posts 2025-12-19 14:34:25 +01:00
cfa5773e62 refactor: inline has_media_embed 2025-12-19 14:34:02 +01:00
ddee2a6029 feat: implement follow undoing module 2025-12-19 14:33:43 +01:00
5e2b4f3408 docs: update readme 2025-12-19 14:10:37 +01:00
c238278df6 feat: add quote post deletion 2025-12-19 14:10:33 +01:00
a9c25c8c10 feat: implemented delete_quotes() 2025-12-19 14:10:25 +01:00
5ff25b3eb6 docs: update readme 2025-12-19 12:51:30 +01:00
c396ba8ae9 feat: create repost undoing module 2025-12-19 12:51:25 +01:00
d12f14a994 feat: implemented undo_reposts() 2025-12-19 12:51:03 +01:00
a4b622bfd3 docs: update chips 2025-12-19 10:11:50 +01:00
005c76119f docs: update readme 2025-12-18 16:29:44 +01:00
b1e2b266f4 refactor: switch to relative imports 2025-12-18 16:08:15 +01:00
6475a117e7 build: update entrypoint 2025-12-18 16:06:16 +01:00
28a193078a docs: update readme 2025-12-18 16:06:09 +01:00
07bbe88784 refactor: move the entrypoint inside package 2025-12-18 16:06:04 +01:00
15 changed files with 796 additions and 383 deletions

View File

@@ -1,54 +1,34 @@
# Skywipe # Skywipe
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK. Skywipe is a Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
## Warning **Warning:** This tool performs _**destructive operations**_.
This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account. Only use it if you intend to permanently erase data from your Bluesky account.
## Requirements ## Requirements
Check [pyproject.toml](pyproject.toml). Python 3.13+.
You can use `uv` to install dependencies: The rest of the dependencies are listed in [pyproject.toml](pyproject.toml).
## Installation
Use [`pipx`](https://pipx.pypa.io/latest/installation/) to install `skywipe`:
```bash ```bash
git clone https://git.kharec.info/Kharec/skywipe.git pipx install git+https://git.kharec.info/Kharec/skywipe.git
cd skywipe
uv sync
``` ```
## How to run Run the tool and see available commands with:
When installation will be worked out, you'll be able to :
```bash ```bash
skywipe all # target everything skywipe -h
skywipe configure # create configuration
skywipe posts # delete posts
skywipe medias # delete posts with medias
skywipe likes # undo likes
skywipe reposts # undo reposts
skywipe quotes # delete quotes
skywipe follows # unfollow all
skywipe bookmarks # delete bookmarks
``` ```
While it's being developed, you can use the tool using `uv` : Use the `--yes` flag to skip the confirmation prompt and proceed with the operation. A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
```bash ## Configuration
uv run main.py all # target everything
uv run main.py configure # create configuration
uv run main.py posts # delete posts
uv run main.py medias # delete posts with medias
uv run main.py likes # undo likes
uv run main.py reposts # undo reposts
uv run main.py quotes # delete quotes
uv run main.py follows # unfollow all
uv run main.py bookmarks # delete bookmarks
```
### Configuration
If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` : If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` :
@@ -62,26 +42,25 @@ verbose: true
BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-password) FOR OBVIOUS SECURITY REASONS. BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-password) FOR OBVIOUS SECURITY REASONS.
## Roadmap ## Hacking
- [x] build cli parameter management You can use `uv` to install dependencies:
- [x] handle configuration logic
- [x] sign in to at protocol
- [x] delete posts in batch
- [x] only delete posts with media
- [x] undo likes
- [ ] undo reposts
- [ ] delete quotes
- [ ] unfollow accounts
- [ ] remove bookmarks
- [ ] make `all` run the other commands
- [ ] add simple progress and logging
- [ ] add safeguards like confirmations and clear dry-run info
Once it's done, we'll think: ```bash
git clone https://git.kharec.info/Kharec/skywipe.git
cd skywipe
uv sync
```
- [ ] decent code architecture Then start coding.
- [ ] installation and run process
If you want to test your changes, you can run the tool with:
```bash
uv run python -m skywipe.cli -h
uv run python -m skywipe.cli all
# or any other command
```
## License ## License

54
main.py
View File

@@ -1,54 +0,0 @@
"""Main entry point for Skywipe"""
import sys
import argparse
from skywipe.commands import registry
from skywipe.configure import Configuration
def create_parser():
commands = registry.get_all_commands()
parser = argparse.ArgumentParser(
description="Clean your bluesky account with style.",
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account."
)
subparsers = parser.add_subparsers(
dest="command",
help="Command to execute",
metavar="COMMAND",
required=True
)
for cmd, help_text in commands.items():
subparsers.add_parser(cmd, help=help_text)
return parser
def require_config():
config = Configuration()
if not config.exists():
print("Error: Configuration file not found.")
print("You must run 'skywipe configure' first.")
sys.exit(1)
def main():
parser = create_parser()
args = parser.parse_args()
if registry.requires_config(args.command):
require_config()
try:
registry.execute(args.command)
except ValueError as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -7,4 +7,4 @@ requires-python = ">=3.13"
dependencies = ["atproto>=0.0.65", "pyyaml>=6.0"] dependencies = ["atproto>=0.0.65", "pyyaml>=6.0"]
[project.scripts] [project.scripts]
skywipe = "main:main" skywipe = "skywipe.cli:main"

View File

@@ -0,0 +1 @@
__version__ = "0.1.0"

View File

@@ -1,7 +1,7 @@
"""Authentication module for Skywipe""" """Authentication module for Skywipe"""
from atproto import Client from atproto import Client
from skywipe.configure import Configuration from .configure import Configuration
class Auth: class Auth:
@@ -10,7 +10,11 @@ class Auth:
self.client = None self.client = None
def login(self) -> Client: def login(self) -> Client:
try:
config_data = self.config.load() config_data = self.config.load()
except FileNotFoundError as e:
raise ValueError(
"Configuration file not found. Run 'skywipe configure' first.") from e
handle = config_data.get("handle") handle = config_data.get("handle")
password = config_data.get("password") password = config_data.get("password")
@@ -19,10 +23,11 @@ class Auth:
raise ValueError( raise ValueError(
"handle and password must be set in configuration") "handle and password must be set in configuration")
try:
self.client = Client() self.client = Client()
self.client.login(handle, password) self.client.login(handle, password)
except Exception as e:
raise ValueError(
f"Failed to authenticate: {e}") from e
return self.client return self.client
def is_logged(self) -> bool:
return bool(getattr(self.client, "me", None))

79
skywipe/cli.py Normal file
View File

@@ -0,0 +1,79 @@
"""Main entry point for Skywipe"""
import sys
import argparse
from pathlib import Path
from . import __version__
from .commands import registry
from .configure import Configuration
from .logger import setup_logger, get_logger, handle_error
LOG_FILE = Path.home() / ".cache" / "skywipe" / "skywipe.log"
def create_parser():
commands = registry.get_all_commands()
parser = argparse.ArgumentParser(
description="Clean your bluesky account with style.",
epilog="WARNING: This tool deletes your Bluesky data permanently."
)
parser.add_argument(
"-v", "--version",
action="version",
version=f"Skywipe {__version__}"
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip confirmation prompt and proceed with destructive operations"
)
subparsers = parser.add_subparsers(
dest="command",
help="Command to execute",
metavar="COMMAND",
required=True
)
for cmd, help_text in commands.items():
subparsers.add_parser(cmd, help=help_text)
return parser
def require_config(logger):
config = Configuration()
if not config.exists():
logger.error("Configuration file not found.")
logger.error("You must run 'skywipe configure' first.")
sys.exit(1)
def main():
parser = create_parser()
args = parser.parse_args()
setup_logger(verbose=False, log_file=LOG_FILE)
logger = get_logger()
if registry.requires_config(args.command):
require_config(logger)
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
setup_logger(verbose=verbose, log_file=LOG_FILE)
try:
registry.execute(
args.command, skip_confirmation=getattr(args, "yes", False))
except (ValueError, Exception) as e:
handle_error(e, logger, exit_on_error=True)
if __name__ == '__main__':
main()

View File

@@ -1,13 +1,84 @@
"""Command implementations for Skywipe""" """Command implementations for Skywipe"""
from typing import Callable, Dict, Optional from typing import Callable, Dict, Optional, Any
from skywipe.configure import Configuration from .configure import Configuration
from skywipe.posts import delete_posts from .operations import Operation
from skywipe.medias import delete_posts_with_medias from .post_analysis import PostAnalyzer
from skywipe.likes import undo_likes from .logger import get_logger, handle_error
from .safeguard import require_confirmation
CommandHandler = Callable[[], None] CommandHandler = Callable[..., None]
COMMAND_METADATA = {
"posts": {
"confirmation": "delete all posts",
"help_text": "only posts",
"operation_name": "Deleting posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": None,
},
"medias": {
"confirmation": "delete all posts with media",
"help_text": "only posts with medias",
"operation_name": "Deleting posts with media",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_media(post.post),
},
"likes": {
"confirmation": "undo all likes",
"help_text": "only likes",
"operation_name": "Undoing likes",
"strategy_type": "record",
"collection": "app.bsky.feed.like",
"filter_fn": None,
},
"reposts": {
"confirmation": "undo all reposts",
"help_text": "only reposts",
"operation_name": "Undoing reposts",
"strategy_type": "record",
"collection": "app.bsky.feed.repost",
"filter_fn": None,
},
"quotes": {
"confirmation": "delete all quote posts",
"help_text": "only quotes",
"operation_name": "Deleting quote posts",
"strategy_type": "feed",
"collection": None,
"filter_fn": lambda post: PostAnalyzer.has_quote(post.post),
},
"follows": {
"confirmation": "unfollow all accounts",
"help_text": "only follows",
"operation_name": "Unfollowing accounts",
"strategy_type": "record",
"collection": "app.bsky.graph.follow",
"filter_fn": None,
},
"bookmarks": {
"confirmation": "delete all bookmarks",
"help_text": "only bookmarks",
"operation_name": "Deleting bookmarks",
"strategy_type": "bookmark",
"collection": None,
"filter_fn": None,
},
}
COMMAND_EXECUTION_ORDER = [
"quotes",
"medias",
"posts",
"likes",
"reposts",
"follows",
"bookmarks",
]
class CommandRegistry: class CommandRegistry:
@@ -39,10 +110,13 @@ class CommandRegistry:
def get_all_commands(self) -> Dict[str, str]: def get_all_commands(self) -> Dict[str, str]:
return self._help_texts.copy() return self._help_texts.copy()
def execute(self, name: str): def execute(self, name: str, skip_confirmation: bool = False):
handler = self.get_handler(name) handler = self.get_handler(name)
if handler: if handler:
if name == "configure":
handler() handler()
else:
handler(skip_confirmation)
else: else:
raise ValueError(f"Unknown command: {name}") raise ValueError(f"Unknown command: {name}")
@@ -50,56 +124,103 @@ class CommandRegistry:
registry = CommandRegistry() registry = CommandRegistry()
def _create_operation_handler(
confirmation_message: str,
operation_name: str,
strategy_type: str = "feed",
collection: Optional[str] = None,
filter_fn: Optional[Callable[[Any], bool]] = None
) -> CommandHandler:
logger = get_logger()
def handler(skip_confirmation: bool = False):
require_confirmation(confirmation_message, skip_confirmation, logger)
try:
Operation(
operation_name,
strategy_type=strategy_type,
collection=collection,
filter_fn=filter_fn
).run()
except (ValueError, Exception) as e:
handle_error(e, logger)
return handler
def run_configure(): def run_configure():
config = Configuration() config = Configuration()
config.create() config.create()
def run_posts(): def _create_command_handlers():
delete_posts() handlers = {}
for cmd, metadata in COMMAND_METADATA.items():
handlers[cmd] = _create_operation_handler(
metadata["confirmation"],
metadata["operation_name"],
strategy_type=metadata["strategy_type"],
collection=metadata["collection"],
filter_fn=metadata["filter_fn"]
)
return handlers
def run_medias(): _command_handlers = _create_command_handlers()
delete_posts_with_medias()
def run_likes(): def run_all(skip_confirmation: bool = False):
undo_likes() logger = get_logger()
all_commands = registry.get_all_commands()
available_commands = [cmd for cmd in all_commands.keys()
if cmd not in ("configure", "all")]
def run_reposts(): commands = [cmd for cmd in COMMAND_EXECUTION_ORDER
print("Command 'reposts' is not yet implemented.") if cmd in available_commands]
commands.extend([cmd for cmd in available_commands
if cmd not in COMMAND_EXECUTION_ORDER])
def run_quotes(): commands_str = ", ".join(commands)
print("Command 'quotes' is not yet implemented.") all_confirmation = f"run all cleanup commands ({commands_str})"
require_confirmation(all_confirmation, skip_confirmation, logger)
logger.info("Running all cleanup commands...")
def run_follows(): from .operations import OperationContext
print("Command 'follows' is not yet implemented.") try:
context = OperationContext()
shared_client = context.client
shared_config_data = context.config_data
except Exception as e:
logger.error(
f"Failed to initialize shared context: {e}", exc_info=True)
raise
for cmd in commands:
def run_bookmarks(): try:
print("Command 'bookmarks' is not yet implemented") logger.info(f"Starting command: {cmd}")
metadata = COMMAND_METADATA.get(cmd)
if metadata:
def run_all(): Operation(
registry.execute("posts") metadata["operation_name"],
registry.execute("medias") strategy_type=metadata["strategy_type"],
registry.execute("likes") collection=metadata["collection"],
registry.execute("reposts") filter_fn=metadata["filter_fn"],
registry.execute("quotes") client=shared_client,
registry.execute("follows") config_data=shared_config_data
registry.execute("bookmarks") ).run()
else:
registry.execute(cmd, skip_confirmation=True)
logger.info(f"Completed command: {cmd}")
except Exception as e:
logger.error(f"Error running '{cmd}': {e}", exc_info=True)
continue
logger.info("All commands completed.")
registry.register("configure", run_configure, registry.register("configure", run_configure,
"create configuration", requires_config=False) "create configuration", requires_config=False)
registry.register("posts", run_posts, "only posts") for cmd, metadata in COMMAND_METADATA.items():
registry.register("medias", run_medias, "only posts with medias") registry.register(cmd, _command_handlers[cmd], metadata["help_text"])
registry.register("likes", run_likes, "only likes")
registry.register("reposts", run_reposts, "only reposts")
registry.register("quotes", run_reposts, "only quotes")
registry.register("follows", run_follows, "only follows")
registry.register("bookmarks", run_follows, "only bookmarks")
registry.register("all", run_all, "target everything") registry.register("all", run_all, "target everything")

View File

@@ -1,8 +1,52 @@
"""Core configuration module for Skywipe""" """Core configuration module for Skywipe"""
import getpass import getpass
import re
from pathlib import Path from pathlib import Path
from typing import NamedTuple
import yaml import yaml
from .logger import setup_logger
class ValidationResult(NamedTuple):
is_valid: bool
error_message: str
def _validate_handle(handle: str) -> ValidationResult:
if not handle:
return ValidationResult(False, "Handle cannot be empty")
if len(handle) > 253:
return ValidationResult(False, "Handle is too long (max 253 characters)")
if " " in handle:
return ValidationResult(False, "Handle cannot contain spaces")
handle_pattern = re.compile(
r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$|"
r"^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$|"
r"^did:[a-z]+:[a-zA-Z0-9._-]+$"
)
if not handle_pattern.match(handle):
return ValidationResult(False, (
"Invalid handle format. "
"Use a username (e.g., 'alice'), full handle (e.g., 'alice.bsky.social'), "
"or DID (e.g., 'did:plc:...')"
))
return ValidationResult(True, "")
def _validate_password(password: str) -> ValidationResult:
if not password:
return ValidationResult(False, "Password cannot be empty")
if len(password) < 8:
return ValidationResult(False, "Password is too short (minimum 8 characters)")
return ValidationResult(True, "")
class Configuration: class Configuration:
@@ -13,10 +57,12 @@ class Configuration:
return self.config_file.exists() return self.config_file.exists()
def create(self): def create(self):
logger = setup_logger(verbose=False)
if self.exists(): if self.exists():
overwrite = input( overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower() "Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"): if overwrite not in ("y", "yes"):
logger.info("Configuration creation cancelled.")
return return
config_dir = self.config_file.parent config_dir = self.config_file.parent
@@ -24,8 +70,27 @@ class Configuration:
print("Skywipe Configuration") print("Skywipe Configuration")
print("=" * 50) print("=" * 50)
print("Note: You should use an app password from Bluesky settings.")
while True:
handle = input("Bluesky handle: ").strip() handle = input("Bluesky handle: ").strip()
password = getpass.getpass("Bluesky app password: ").strip() is_valid, error_msg = _validate_handle(handle)
if is_valid:
break
logger.error(error_msg)
logger.info("Please enter a valid handle and try again.")
while True:
password = getpass.getpass(
"Bluesky (hopefully app) password: ").strip()
is_valid, error_msg = _validate_password(password)
if is_valid:
break
logger.error(error_msg)
logger.info("Please check your password and try again.")
logger.info(
"Generate an app password at: https://bsky.app/settings/app-passwords")
batch_size = input("Batch size (default: 10): ").strip() or "10" batch_size = input("Batch size (default: 10): ").strip() or "10"
delay = input( delay = input(
"Delay between batches in seconds (default: 1): ").strip() or "1" "Delay between batches in seconds (default: 1): ").strip() or "1"
@@ -35,9 +100,20 @@ class Configuration:
try: try:
batch_size = int(batch_size) batch_size = int(batch_size)
delay = int(delay) if batch_size < 1 or batch_size > 100:
logger.error("batch_size must be between 1 and 100")
return
except ValueError: except ValueError:
print("Error: batch_size and delay must be integers") logger.error("batch_size must be an integer")
return
try:
delay = int(delay)
if delay < 0 or delay > 60:
logger.error("delay must be between 0 and 60 seconds")
return
except ValueError:
logger.error("delay must be an integer")
return return
config_data = { config_data = {
@@ -48,14 +124,28 @@ class Configuration:
"verbose": verbose "verbose": verbose
} }
try:
with open(self.config_file, "w") as f: with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False) yaml.dump(config_data, f, default_flow_style=False)
except (IOError, OSError) as e:
logger.error(f"Failed to save configuration: {e}")
return
print(f"\nConfiguration saved to {self.config_file}") logger.info(f"Configuration saved to {self.config_file}")
def load(self) -> dict: def load(self) -> dict:
if not self.exists(): if not self.exists():
raise FileNotFoundError( raise FileNotFoundError(
f"Configuration file not found: {self.config_file}") f"Configuration file not found: {self.config_file}")
try:
with open(self.config_file, "r") as f: with open(self.config_file, "r") as f:
return yaml.safe_load(f) config_data = yaml.safe_load(f)
if config_data is None:
raise ValueError("Configuration file is empty or invalid")
return config_data
except (IOError, OSError) as e:
raise ValueError(
f"Failed to read configuration file: {e}") from e
except yaml.YAMLError as e:
raise ValueError(
f"Invalid YAML in configuration file: {e}") from e

View File

@@ -1,69 +0,0 @@
"""Like undoing module for Skywipe"""
import time
from atproto import models
from skywipe.auth import Auth
from skywipe.configure import Configuration
LIKE_COLLECTION = "app.bsky.feed.like"
def undo_likes():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
while True:
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=LIKE_COLLECTION,
limit=batch_size,
cursor=cursor
)
response = client.com.atproto.repo.list_records(params=list_params)
records = response.records
if not records:
break
for record in records:
try:
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": did,
"collection": LIKE_COLLECTION,
"rkey": rkey
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone like: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing like {record_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} likes.")

103
skywipe/logger.py Normal file
View File

@@ -0,0 +1,103 @@
"""Centralized logging module for Skywipe"""
import logging
import sys
from pathlib import Path
from typing import Optional
class ProgressTracker:
def __init__(self, operation: str = "Processing"):
self.current = 0
self.operation = operation
def update(self, count: int = 1):
self.current += count
def batch(self, batch_num: int, batch_size: int, total_batches: Optional[int] = None):
logger = logging.getLogger("skywipe.progress")
if total_batches:
logger.info(
f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)"
)
else:
logger.info(
f"{self.operation} - batch {batch_num} ({batch_size} items)")
class LevelFilter(logging.Filter):
def __init__(self, min_level: int, max_level: int):
super().__init__()
self.min_level = min_level
self.max_level = max_level
def filter(self, record: logging.LogRecord) -> bool:
return self.min_level <= record.levelno <= self.max_level
def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logging.Logger:
logger = logging.getLogger("skywipe")
target_level = logging.DEBUG if verbose else logging.INFO
logger.setLevel(target_level)
info_handler = None
error_handler = None
file_handlers = []
for handler in logger.handlers:
if isinstance(handler, logging.StreamHandler):
if handler.stream == sys.stdout:
info_handler = handler
elif handler.stream == sys.stderr:
error_handler = handler
elif isinstance(handler, logging.FileHandler):
file_handlers.append(handler)
formatter = logging.Formatter(fmt="%(levelname)s: %(message)s")
if info_handler is None:
info_handler = logging.StreamHandler(sys.stdout)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter)
logger.addHandler(info_handler)
info_handler.setLevel(target_level)
if error_handler is None:
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
if log_file:
if not file_handlers:
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
else:
for handler in file_handlers:
handler.close()
logger.removeHandler(handler)
return logger
def get_logger() -> logging.Logger:
return logging.getLogger("skywipe")
def handle_error(error: Exception, logger: logging.Logger, exit_on_error: bool = False) -> None:
if isinstance(error, ValueError):
logger.error(f"{error}")
else:
logger.error(f"Unexpected error: {error}", exc_info=True)
if exit_on_error:
sys.exit(1)
else:
raise error

View File

@@ -1,96 +0,0 @@
"""Media post deletion module for Skywipe"""
import time
from skywipe.auth import Auth
from skywipe.configure import Configuration
def has_media_embed(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
media_types = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in media_types:
return True
if embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in media_types:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
def delete_posts_with_medias():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
else:
response = client.get_author_feed(actor=did, limit=batch_size)
posts = response.feed
if not posts:
break
for post in posts:
post_record = post.post
if not has_media_embed(post_record):
if verbose:
print(f"Skipping post without media: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted post with media: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts with media.")

209
skywipe/operations.py Normal file
View File

@@ -0,0 +1,209 @@
"""Shared operation utilities and strategies for Skywipe"""
import time
from typing import Callable, Optional, Any
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
class OperationContext:
def __init__(self, client=None, config_data=None):
self.logger = get_logger()
self.client, self.did = self._initialize_client(client)
self.config_data = self._initialize_config(config_data)
self.batch_size = self.config_data.get("batch_size", 10)
self.delay = self.config_data.get("delay", 1)
def _initialize_client(self, client):
if client is not None:
return client, client.me.did
try:
auth = Auth()
client = auth.login()
return client, client.me.did
except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error during initialization: {e}", exc_info=True)
raise ValueError(
f"Failed to initialize operation context: {e}") from e
def _initialize_config(self, config_data):
if config_data is not None:
return config_data
try:
config = Configuration()
return config.load()
except (ValueError, FileNotFoundError) as e:
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error loading configuration: {e}", exc_info=True)
raise ValueError(
f"Failed to load configuration: {e}") from e
class BaseStrategy:
def get_cursor(self, response):
return response.cursor
class RecordDeletionStrategy(BaseStrategy):
def __init__(self, collection: str):
self.collection = collection
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
list_params = models.ComAtprotoRepoListRecords.Params(
repo=context.did,
collection=self.collection,
limit=context.batch_size,
cursor=cursor
)
return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response):
return response.records
def process_item(self, record, context: OperationContext):
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": context.did,
"collection": self.collection,
"rkey": rkey
}
context.client.com.atproto.repo.delete_record(data=delete_data)
context.logger.debug(f"Deleted: {record_uri}")
class FeedStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
if cursor:
return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor
)
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
def extract_items(self, response):
return response.feed
def process_item(self, post, context: OperationContext):
uri = post.post.uri
context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}")
class BookmarkStrategy(BaseStrategy):
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size,
cursor=cursor
)
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response):
return response.bookmarks
def process_item(self, bookmark, context: OperationContext):
bookmark_uri = self._extract_bookmark_uri(bookmark)
if not bookmark_uri:
raise ValueError("Unable to find bookmark URI")
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri)
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
def _extract_bookmark_uri(self, bookmark):
if hasattr(bookmark, "uri"):
return bookmark.uri
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
return nested.uri
return None
class Operation:
def __init__(
self,
operation_name: str,
strategy_type: str = "feed",
collection: Optional[str] = None,
filter_fn: Optional[Callable[[Any], bool]] = None,
client=None,
config_data=None
):
self.operation_name = operation_name
self.filter_fn = filter_fn
self._client = client
self._config_data = config_data
if strategy_type == "record":
if not collection:
raise ValueError("Collection is required for record strategy")
self.strategy = RecordDeletionStrategy(collection)
elif strategy_type == "bookmark":
self.strategy = BookmarkStrategy()
else:
self.strategy = FeedStrategy()
def run(self) -> int:
context = OperationContext(
client=self._client, config_data=self._config_data)
progress = ProgressTracker(operation=self.operation_name)
context.logger.info(
f"Starting {self.operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
)
cursor = None
total_processed = 0
batch_num = 0
while True:
batch_num += 1
try:
response = self.strategy.fetch(context, cursor)
items = self.strategy.extract_items(response)
except Exception as e:
context.logger.error(
f"Error fetching items for batch {batch_num}: {e}", exc_info=True)
break
if not items:
break
progress.batch(batch_num, len(items))
for item in items:
if self.filter_fn and not self.filter_fn(item):
continue
try:
self.strategy.process_item(item, context)
total_processed += 1
progress.update(1)
except Exception as e:
context.logger.error(f"Error processing item: {e}")
cursor = self.strategy.get_cursor(response)
if not cursor:
break
if context.delay > 0:
time.sleep(context.delay)
context.logger.info(
f"{self.operation_name}: {total_processed} items processed.")
return total_processed

70
skywipe/post_analysis.py Normal file
View File

@@ -0,0 +1,70 @@
"""Post analysis utilities for Skywipe"""
class PostAnalyzer:
MEDIA_TYPES = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
QUOTE_TYPES = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
QUOTE_WITH_MEDIA_TYPES = {
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
@staticmethod
def _get_embed(post_record):
return getattr(post_record, 'embed', None)
@staticmethod
def _get_embed_type_base(embed):
embed_type = getattr(embed, 'py_type', None)
if embed_type:
return embed_type.split('#')[0]
return None
@staticmethod
def has_media(post_record):
embed = PostAnalyzer._get_embed(post_record)
if not embed:
return False
embed_type_base = PostAnalyzer._get_embed_type_base(embed)
if embed_type_base:
if embed_type_base in PostAnalyzer.MEDIA_TYPES:
return True
if embed_type_base in PostAnalyzer.QUOTE_WITH_MEDIA_TYPES:
media = getattr(embed, 'media', None)
if media:
media_type_base = PostAnalyzer._get_embed_type_base(media)
if media_type_base and media_type_base in PostAnalyzer.MEDIA_TYPES:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
@staticmethod
def has_quote(post_record):
embed = PostAnalyzer._get_embed(post_record)
if not embed:
return False
embed_type_base = PostAnalyzer._get_embed_type_base(embed)
if embed_type_base and embed_type_base in PostAnalyzer.QUOTE_TYPES:
return True
return (hasattr(embed, 'record') or
(isinstance(embed, dict) and embed.get('record')))

View File

@@ -1,56 +0,0 @@
"""Post deletion module for Skywipe"""
import time
from skywipe.auth import Auth
from skywipe.configure import Configuration
def delete_posts():
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
while True:
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
else:
response = client.get_author_feed(actor=did, limit=batch_size)
posts = response.feed
if not posts:
break
post_uris = [post.post.uri for post in posts]
for uri in post_uris:
try:
client.delete_post(uri)
total_deleted += 1
if verbose:
print(f"Deleted post: {uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts.")

31
skywipe/safeguard.py Normal file
View File

@@ -0,0 +1,31 @@
"""Safeguard module for Skywipe"""
import sys
import logging
from typing import Optional
from .logger import get_logger
CONFIRM_RESPONSES = {"yes", "y"}
def require_confirmation(operation: str, skip_confirmation: bool = False, logger: Optional[logging.Logger] = None) -> None:
if skip_confirmation:
return
if logger is None:
logger = get_logger()
logger.warning(f"This will {operation}")
logger.warning("This operation is DESTRUCTIVE and cannot be undone!")
try:
response = input(
"Are you sure you want to continue? (y/N): ").strip().lower()
except (EOFError, KeyboardInterrupt):
logger.info("\nOperation cancelled.")
sys.exit(0)
if response not in CONFIRM_RESPONSES:
logger.info("Operation cancelled.")
sys.exit(0)