refactor: consolidate all command metadata into a single structure
This commit is contained in:
@@ -11,16 +11,75 @@ from .safeguard import require_confirmation
|
||||
CommandHandler = Callable[..., None]
|
||||
|
||||
|
||||
CONFIRMATION_MESSAGES = {
|
||||
"posts": "delete all posts",
|
||||
"medias": "delete all posts with media",
|
||||
"likes": "undo all likes",
|
||||
"reposts": "undo all reposts",
|
||||
"quotes": "delete all quote posts",
|
||||
"follows": "unfollow all accounts",
|
||||
"bookmarks": "delete all bookmarks",
|
||||
COMMAND_METADATA = {
|
||||
"posts": {
|
||||
"confirmation": "delete all posts",
|
||||
"help_text": "only posts",
|
||||
"operation_name": "Deleting posts",
|
||||
"strategy_type": "feed",
|
||||
"collection": None,
|
||||
"filter_fn": None,
|
||||
},
|
||||
"medias": {
|
||||
"confirmation": "delete all posts with media",
|
||||
"help_text": "only posts with medias",
|
||||
"operation_name": "Deleting posts with media",
|
||||
"strategy_type": "feed",
|
||||
"collection": None,
|
||||
"filter_fn": lambda post: PostAnalyzer.has_media(post.post),
|
||||
},
|
||||
"likes": {
|
||||
"confirmation": "undo all likes",
|
||||
"help_text": "only likes",
|
||||
"operation_name": "Undoing likes",
|
||||
"strategy_type": "record",
|
||||
"collection": "app.bsky.feed.like",
|
||||
"filter_fn": None,
|
||||
},
|
||||
"reposts": {
|
||||
"confirmation": "undo all reposts",
|
||||
"help_text": "only reposts",
|
||||
"operation_name": "Undoing reposts",
|
||||
"strategy_type": "record",
|
||||
"collection": "app.bsky.feed.repost",
|
||||
"filter_fn": None,
|
||||
},
|
||||
"quotes": {
|
||||
"confirmation": "delete all quote posts",
|
||||
"help_text": "only quotes",
|
||||
"operation_name": "Deleting quote posts",
|
||||
"strategy_type": "feed",
|
||||
"collection": None,
|
||||
"filter_fn": lambda post: PostAnalyzer.has_quote(post.post),
|
||||
},
|
||||
"follows": {
|
||||
"confirmation": "unfollow all accounts",
|
||||
"help_text": "only follows",
|
||||
"operation_name": "Unfollowing accounts",
|
||||
"strategy_type": "record",
|
||||
"collection": "app.bsky.graph.follow",
|
||||
"filter_fn": None,
|
||||
},
|
||||
"bookmarks": {
|
||||
"confirmation": "delete all bookmarks",
|
||||
"help_text": "only bookmarks",
|
||||
"operation_name": "Deleting bookmarks",
|
||||
"strategy_type": "bookmark",
|
||||
"collection": None,
|
||||
"filter_fn": None,
|
||||
},
|
||||
}
|
||||
|
||||
COMMAND_EXECUTION_ORDER = [
|
||||
"quotes",
|
||||
"medias",
|
||||
"posts",
|
||||
"likes",
|
||||
"reposts",
|
||||
"follows",
|
||||
"bookmarks",
|
||||
]
|
||||
|
||||
|
||||
class CommandRegistry:
|
||||
def __init__(self):
|
||||
@@ -98,108 +157,27 @@ def run_configure():
|
||||
config.create()
|
||||
|
||||
|
||||
run_posts = _create_operation_handler(
|
||||
CONFIRMATION_MESSAGES["posts"],
|
||||
"Deleting posts"
|
||||
)
|
||||
|
||||
run_medias = _create_operation_handler(
|
||||
CONFIRMATION_MESSAGES["medias"],
|
||||
"Deleting posts with media",
|
||||
filter_fn=lambda post: PostAnalyzer.has_media(post.post)
|
||||
)
|
||||
|
||||
run_likes = _create_operation_handler(
|
||||
CONFIRMATION_MESSAGES["likes"],
|
||||
"Undoing likes",
|
||||
strategy_type="record",
|
||||
collection="app.bsky.feed.like"
|
||||
)
|
||||
|
||||
run_reposts = _create_operation_handler(
|
||||
CONFIRMATION_MESSAGES["reposts"],
|
||||
"Undoing reposts",
|
||||
strategy_type="record",
|
||||
collection="app.bsky.feed.repost"
|
||||
)
|
||||
|
||||
run_quotes = _create_operation_handler(
|
||||
CONFIRMATION_MESSAGES["quotes"],
|
||||
"Deleting quote posts",
|
||||
filter_fn=lambda post: PostAnalyzer.has_quote(post.post)
|
||||
)
|
||||
|
||||
run_follows = _create_operation_handler(
|
||||
CONFIRMATION_MESSAGES["follows"],
|
||||
"Unfollowing accounts",
|
||||
strategy_type="record",
|
||||
collection="app.bsky.graph.follow"
|
||||
)
|
||||
|
||||
run_bookmarks = _create_operation_handler(
|
||||
CONFIRMATION_MESSAGES["bookmarks"],
|
||||
"Deleting bookmarks",
|
||||
strategy_type="bookmark"
|
||||
def _create_command_handlers():
|
||||
handlers = {}
|
||||
for cmd, metadata in COMMAND_METADATA.items():
|
||||
handlers[cmd] = _create_operation_handler(
|
||||
metadata["confirmation"],
|
||||
metadata["operation_name"],
|
||||
strategy_type=metadata["strategy_type"],
|
||||
collection=metadata["collection"],
|
||||
filter_fn=metadata["filter_fn"]
|
||||
)
|
||||
return handlers
|
||||
|
||||
|
||||
def _get_operation_config(cmd: str) -> Optional[Dict[str, Any]]:
|
||||
configs = {
|
||||
"posts": {
|
||||
"operation_name": "Deleting posts",
|
||||
"strategy_type": "feed",
|
||||
"collection": None,
|
||||
"filter_fn": None
|
||||
},
|
||||
"medias": {
|
||||
"operation_name": "Deleting posts with media",
|
||||
"strategy_type": "feed",
|
||||
"collection": None,
|
||||
"filter_fn": lambda post: PostAnalyzer.has_media(post.post)
|
||||
},
|
||||
"likes": {
|
||||
"operation_name": "Undoing likes",
|
||||
"strategy_type": "record",
|
||||
"collection": "app.bsky.feed.like",
|
||||
"filter_fn": None
|
||||
},
|
||||
"reposts": {
|
||||
"operation_name": "Undoing reposts",
|
||||
"strategy_type": "record",
|
||||
"collection": "app.bsky.feed.repost",
|
||||
"filter_fn": None
|
||||
},
|
||||
"quotes": {
|
||||
"operation_name": "Deleting quote posts",
|
||||
"strategy_type": "feed",
|
||||
"collection": None,
|
||||
"filter_fn": lambda post: PostAnalyzer.has_quote(post.post)
|
||||
},
|
||||
"follows": {
|
||||
"operation_name": "Unfollowing accounts",
|
||||
"strategy_type": "record",
|
||||
"collection": "app.bsky.graph.follow",
|
||||
"filter_fn": None
|
||||
},
|
||||
"bookmarks": {
|
||||
"operation_name": "Deleting bookmarks",
|
||||
"strategy_type": "bookmark",
|
||||
"collection": None,
|
||||
"filter_fn": None
|
||||
}
|
||||
}
|
||||
return configs.get(cmd)
|
||||
|
||||
|
||||
COMMAND_EXECUTION_ORDER = [
|
||||
"quotes",
|
||||
"medias",
|
||||
"posts",
|
||||
"likes",
|
||||
"reposts",
|
||||
"follows",
|
||||
"bookmarks",
|
||||
]
|
||||
_command_handlers = _create_command_handlers()
|
||||
run_posts = _command_handlers["posts"]
|
||||
run_medias = _command_handlers["medias"]
|
||||
run_likes = _command_handlers["likes"]
|
||||
run_reposts = _command_handlers["reposts"]
|
||||
run_quotes = _command_handlers["quotes"]
|
||||
run_follows = _command_handlers["follows"]
|
||||
run_bookmarks = _command_handlers["bookmarks"]
|
||||
|
||||
|
||||
def run_all(skip_confirmation: bool = False):
|
||||
@@ -234,13 +212,13 @@ def run_all(skip_confirmation: bool = False):
|
||||
for cmd in commands:
|
||||
try:
|
||||
logger.info(f"Starting command: {cmd}")
|
||||
operation_config = _get_operation_config(cmd)
|
||||
if operation_config:
|
||||
metadata = COMMAND_METADATA.get(cmd)
|
||||
if metadata:
|
||||
Operation(
|
||||
operation_config["operation_name"],
|
||||
strategy_type=operation_config["strategy_type"],
|
||||
collection=operation_config["collection"],
|
||||
filter_fn=operation_config["filter_fn"],
|
||||
metadata["operation_name"],
|
||||
strategy_type=metadata["strategy_type"],
|
||||
collection=metadata["collection"],
|
||||
filter_fn=metadata["filter_fn"],
|
||||
client=shared_client,
|
||||
config_data=shared_config_data
|
||||
).run()
|
||||
@@ -255,11 +233,6 @@ def run_all(skip_confirmation: bool = False):
|
||||
|
||||
registry.register("configure", run_configure,
|
||||
"create configuration", requires_config=False)
|
||||
registry.register("posts", run_posts, "only posts")
|
||||
registry.register("medias", run_medias, "only posts with medias")
|
||||
registry.register("likes", run_likes, "only likes")
|
||||
registry.register("reposts", run_reposts, "only reposts")
|
||||
registry.register("quotes", run_quotes, "only quotes")
|
||||
registry.register("follows", run_follows, "only follows")
|
||||
registry.register("bookmarks", run_bookmarks, "only bookmarks")
|
||||
for cmd, metadata in COMMAND_METADATA.items():
|
||||
registry.register(cmd, _command_handlers[cmd], metadata["help_text"])
|
||||
registry.register("all", run_all, "target everything")
|
||||
|
||||
Reference in New Issue
Block a user