Compare commits

...

4 Commits

4 changed files with 297 additions and 15 deletions

View File

@@ -2,13 +2,8 @@
from typing import Callable, Dict, Optional from typing import Callable, Dict, Optional
from .configure import Configuration from .configure import Configuration
from .posts import delete_all_posts from .operations import Operation
from .medias import delete_posts_with_medias from .post_analysis import PostAnalyzer
from .likes import undo_likes
from .reposts import undo_reposts
from .quotes import delete_quotes_posts
from .follows import unfollow_all
from .bookmarks import delete_bookmarks
from .logger import get_logger from .logger import get_logger
from .safeguard import require_confirmation from .safeguard import require_confirmation
@@ -66,37 +61,42 @@ def run_configure():
def run_posts(skip_confirmation: bool = False): def run_posts(skip_confirmation: bool = False):
require_confirmation("delete all posts", skip_confirmation) require_confirmation("delete all posts", skip_confirmation)
delete_all_posts() Operation("Deleting posts").run()
def run_medias(skip_confirmation: bool = False): def run_medias(skip_confirmation: bool = False):
require_confirmation("delete all posts with media", skip_confirmation) require_confirmation("delete all posts with media", skip_confirmation)
delete_posts_with_medias() Operation("Deleting posts with media",
filter_fn=lambda post: PostAnalyzer.has_media(post.post)).run()
def run_likes(skip_confirmation: bool = False): def run_likes(skip_confirmation: bool = False):
require_confirmation("undo all likes", skip_confirmation) require_confirmation("undo all likes", skip_confirmation)
undo_likes() Operation("Undoing likes", strategy_type="record",
collection="app.bsky.feed.like").run()
def run_reposts(skip_confirmation: bool = False): def run_reposts(skip_confirmation: bool = False):
require_confirmation("undo all reposts", skip_confirmation) require_confirmation("undo all reposts", skip_confirmation)
undo_reposts() Operation("Undoing reposts", strategy_type="record",
collection="app.bsky.feed.repost").run()
def run_quotes(skip_confirmation: bool = False): def run_quotes(skip_confirmation: bool = False):
require_confirmation("delete all quote posts", skip_confirmation) require_confirmation("delete all quote posts", skip_confirmation)
delete_quotes_posts() Operation("Deleting quote posts",
filter_fn=lambda post: PostAnalyzer.has_quote(post.post)).run()
def run_follows(skip_confirmation: bool = False): def run_follows(skip_confirmation: bool = False):
require_confirmation("unfollow all accounts", skip_confirmation) require_confirmation("unfollow all accounts", skip_confirmation)
unfollow_all() Operation("Unfollowing accounts", strategy_type="record",
collection="app.bsky.graph.follow").run()
def run_bookmarks(skip_confirmation: bool = False): def run_bookmarks(skip_confirmation: bool = False):
require_confirmation("delete all bookmarks", skip_confirmation) require_confirmation("delete all bookmarks", skip_confirmation)
delete_bookmarks() Operation("Deleting bookmarks", strategy_type="bookmark").run()
def run_all(skip_confirmation: bool = False): def run_all(skip_confirmation: bool = False):

View File

@@ -3,7 +3,7 @@
import getpass import getpass
from pathlib import Path from pathlib import Path
import yaml import yaml
from .logger import setup_logger, get_logger from .logger import setup_logger
class Configuration: class Configuration:

222
skywipe/operations.py Normal file
View File

@@ -0,0 +1,222 @@
"""Shared operation utilities and strategies for Skywipe"""
import time
from typing import Callable, Optional, Any
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
class OperationContext:
def __init__(self):
self.logger = get_logger()
self.auth = Auth()
self.client = self.auth.login()
self.config = Configuration()
self.config_data = self.config.load()
self.batch_size = self.config_data.get("batch_size", 10)
self.delay = self.config_data.get("delay", 1)
self.did = self.client.me.did
class RecordDeletionStrategy:
def __init__(self, collection: str):
self.collection = collection
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
list_params = models.ComAtprotoRepoListRecords.Params(
repo=context.did,
collection=self.collection,
limit=context.batch_size,
cursor=cursor
)
return context.client.com.atproto.repo.list_records(params=list_params)
def extract_items(self, response):
return response.records
def get_cursor(self, response):
return response.cursor
def process_item(self, record, context: OperationContext):
record_uri = record.uri
rkey = record_uri.rsplit("/", 1)[-1]
delete_data = {
"repo": context.did,
"collection": self.collection,
"rkey": rkey
}
context.client.com.atproto.repo.delete_record(data=delete_data)
context.logger.debug(f"Deleted: {record_uri}")
class FeedStrategy:
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
if cursor:
return context.client.get_author_feed(
actor=context.did, limit=context.batch_size, cursor=cursor
)
return context.client.get_author_feed(actor=context.did, limit=context.batch_size)
def extract_items(self, response):
return response.feed
def get_cursor(self, response):
return response.cursor
def process_item(self, post, context: OperationContext):
uri = post.post.uri
context.client.delete_post(uri)
context.logger.debug(f"Deleted post: {uri}")
class BookmarkStrategy:
def fetch(self, context: OperationContext, cursor: Optional[str] = None):
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=context.batch_size,
cursor=cursor
)
return context.client.app.bsky.bookmark.get_bookmarks(params=get_params)
def extract_items(self, response):
return response.bookmarks
def get_cursor(self, response):
return response.cursor
def process_item(self, bookmark, context: OperationContext):
bookmark_uri = self._extract_bookmark_uri(bookmark)
if not bookmark_uri:
raise ValueError("Unable to find bookmark URI")
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri)
context.client.app.bsky.bookmark.delete_bookmark(data=delete_data)
context.logger.debug(f"Deleted bookmark: {bookmark_uri}")
def _extract_bookmark_uri(self, bookmark):
if hasattr(bookmark, "uri"):
return bookmark.uri
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
return nested.uri
return None
class Operation:
def __init__(
self,
operation_name: str,
strategy_type: str = "feed",
collection: Optional[str] = None,
filter_fn: Optional[Callable[[Any], bool]] = None
):
self.operation_name = operation_name
self.filter_fn = filter_fn
if strategy_type == "record":
if not collection:
raise ValueError("Collection is required for record strategy")
self.strategy = RecordDeletionStrategy(collection)
elif strategy_type == "bookmark":
self.strategy = BookmarkStrategy()
else:
self.strategy = FeedStrategy()
def run(self) -> int:
context = OperationContext()
progress = ProgressTracker(operation=self.operation_name)
context.logger.info(
f"Starting {self.operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
)
cursor = None
total_processed = 0
batch_num = 0
while True:
batch_num += 1
response = self.strategy.fetch(context, cursor)
items = self.strategy.extract_items(response)
if not items:
break
progress.batch(batch_num, len(items))
for item in items:
if self.filter_fn and not self.filter_fn(item):
continue
try:
self.strategy.process_item(item, context)
total_processed += 1
progress.update(1)
except Exception as e:
context.logger.error(f"Error processing item: {e}")
cursor = self.strategy.get_cursor(response)
if not cursor:
break
if context.delay > 0:
time.sleep(context.delay)
context.logger.info(
f"{self.operation_name}: {total_processed} items processed.")
return total_processed
def run_operation(
strategy,
operation_name: str,
filter_fn: Optional[Callable[[Any], bool]] = None
) -> int:
context = OperationContext()
progress = ProgressTracker(operation=operation_name)
context.logger.info(
f"Starting {operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
)
cursor = None
total_processed = 0
batch_num = 0
while True:
batch_num += 1
response = strategy.fetch(context, cursor)
items = strategy.extract_items(response)
if not items:
break
progress.batch(batch_num, len(items))
for item in items:
if filter_fn and not filter_fn(item):
continue
try:
strategy.process_item(item, context)
total_processed += 1
progress.update(1)
except Exception as e:
context.logger.error(f"Error processing item: {e}")
cursor = strategy.get_cursor(response)
if not cursor:
break
if context.delay > 0:
time.sleep(context.delay)
context.logger.info(
f"{operation_name}: {total_processed} items processed.")
return total_processed

60
skywipe/post_analysis.py Normal file
View File

@@ -0,0 +1,60 @@
"""Post analysis utilities for Skywipe"""
class PostAnalyzer:
MEDIA_TYPES = {
'app.bsky.embed.images',
'app.bsky.embed.video',
'app.bsky.embed.external'
}
QUOTE_TYPES = {
'app.bsky.embed.record',
'app.bsky.embed.recordWithMedia',
'app.bsky.embed.record_with_media'
}
@staticmethod
def has_media(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in PostAnalyzer.MEDIA_TYPES:
return True
if embed_type_base in ('app.bsky.embed.recordWithMedia', 'app.bsky.embed.record_with_media'):
media = getattr(embed, 'media', None)
if media:
media_type = getattr(media, 'py_type', None)
if media_type:
media_type_base = media_type.split('#')[0]
if media_type_base in PostAnalyzer.MEDIA_TYPES:
return True
for attr in ('images', 'video', 'external'):
if hasattr(embed, attr):
return True
if isinstance(embed, dict) and embed.get(attr):
return True
return False
@staticmethod
def has_quote(post_record):
embed = getattr(post_record, 'embed', None)
if not embed:
return False
embed_type = getattr(embed, 'py_type', None)
if embed_type:
embed_type_base = embed_type.split('#')[0]
if embed_type_base in PostAnalyzer.QUOTE_TYPES:
return True
return (hasattr(embed, 'record') or
(isinstance(embed, dict) and embed.get('record')))