Compare commits

..

13 Commits

Author SHA1 Message Date
590cc03ba4 docs: refactor warning 2025-12-20 16:03:39 +01:00
c493a99860 docs: inform about log 2025-12-20 16:01:07 +01:00
3f9ef6527f docs: update roadmap 2025-12-20 16:00:03 +01:00
f53e5bb527 feat: implement our safeguard in commands 2025-12-20 15:59:54 +01:00
f2854a0df5 feat: implement safeguard and relocate log file 2025-12-20 15:59:41 +01:00
defd991006 feat: new safeguard module 2025-12-20 15:59:17 +01:00
f922e3cf9d docs: update roadmap 2025-12-20 15:46:48 +01:00
06d70edaf1 feat: use our new logger 2025-12-20 15:46:33 +01:00
58ab6cfafa feat: implement proper logging 2025-12-20 15:46:22 +01:00
dba06e642a docs: update roadmap 2025-12-19 14:39:15 +01:00
5868c1649b docs: update readme 2025-12-19 14:35:25 +01:00
a14184cddc feat: run all 2025-12-19 14:35:21 +01:00
6587f8c39c feat: implement bookmark deletion module 2025-12-19 14:35:05 +01:00
13 changed files with 346 additions and 99 deletions

View File

@@ -2,9 +2,9 @@
Skywipe is a work-in-progress Python 3.13+ CLI that helps you wipe data from your Bluesky account using the AT Protocol SDK.
## Warning
**Warning:** This tool performs _**destructive operations**_.
This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account.
Only use it if you intend to permanently erase data from your Bluesky account.
## Requirements
@@ -34,6 +34,10 @@ uv run python -m skywipe.cli follows # unfollow all
uv run python -m skywipe.cli bookmarks # delete bookmarks
```
Use the `--yes` flag to skip the confirmation prompt and proceed with the operation.
A log of the operations will be saved in `~/.cache/skywipe/skywipe.log`.
## Configuration
If you run the tool for the first time, it will prompt you to use `skywipe configure` to create the configuration file, which is located in `~/.config/skywipe/config.yml` :
@@ -58,14 +62,11 @@ BE SURE TO USE A [BLUESKY APP PASSWORD](https://blueskyfeeds.com/faq-app-passwor
- [x] undo likes
- [x] undo reposts
- [x] delete quotes
- [ ] unfollow accounts
- [ ] remove bookmarks
- [ ] make `all` run the other commands
- [ ] add simple progress and logging
- [ ] add safeguards like confirmations and clear dry-run info
Once it's done, we'll think:
- [x] unfollow accounts
- [x] remove bookmarks
- [x] make `all` run the other commands
- [x] add simple progress and logging
- [x] add safeguards (confirmation, dry-run flag)
- [ ] decent code architecture
- [ ] installation and run process

78
skywipe/bookmarks.py Normal file
View File

@@ -0,0 +1,78 @@
"""Bookmark deletion module for Skywipe"""
import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_bookmarks():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
config_data = config.load()
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
logger.info(
f"Starting bookmark deletion with batch_size={batch_size}, delay={delay}s")
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting bookmarks")
while True:
batch_num += 1
get_params = models.AppBskyBookmarkGetBookmarks.Params(
limit=batch_size,
cursor=cursor
)
response = client.app.bsky.bookmark.get_bookmarks(params=get_params)
bookmarks = response.bookmarks
if not bookmarks:
break
progress.batch(batch_num, len(bookmarks))
for bookmark in bookmarks:
try:
bookmark_uri = None
if hasattr(bookmark, "uri"):
bookmark_uri = bookmark.uri
else:
for attr_name in ("subject", "record", "post", "item"):
if hasattr(bookmark, attr_name):
nested = getattr(bookmark, attr_name)
if hasattr(nested, "uri"):
bookmark_uri = nested.uri
break
if not bookmark_uri:
logger.debug("Skipping bookmark: unable to find uri")
continue
delete_data = models.AppBskyBookmarkDeleteBookmark.Data(
uri=bookmark_uri
)
client.app.bsky.bookmark.delete_bookmark(data=delete_data)
total_deleted += 1
progress.update(1)
logger.debug(f"Deleted bookmark: {bookmark_uri}")
except Exception as e:
bookmark_uri = getattr(bookmark, "uri", "unknown")
logger.error(f"Error deleting bookmark {bookmark_uri}: {e}")
cursor = response.cursor
if not cursor:
break
if delay > 0:
time.sleep(delay)
logger.info(f"Deleted {total_deleted} bookmarks.")

View File

@@ -2,9 +2,11 @@
import sys
import argparse
from pathlib import Path
from .commands import registry
from .configure import Configuration
from .logger import setup_logger
def create_parser():
@@ -15,6 +17,12 @@ def create_parser():
epilog="WARNING: This tool performs destructive operations. Only use it if you intend to erase data from your Bluesky account."
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip confirmation prompt and proceed with destructive operations"
)
subparsers = parser.add_subparsers(
dest="command",
help="Command to execute",
@@ -31,8 +39,9 @@ def create_parser():
def require_config():
config = Configuration()
if not config.exists():
print("Error: Configuration file not found.")
print("You must run 'skywipe configure' first.")
logger = setup_logger(verbose=False)
logger.error("Configuration file not found.")
logger.error("You must run 'skywipe configure' first.")
sys.exit(1)
@@ -42,11 +51,24 @@ def main():
if registry.requires_config(args.command):
require_config()
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
log_file = Path.home() / ".cache" / "skywipe" / "skywipe.log"
setup_logger(verbose=verbose, log_file=log_file)
else:
setup_logger(verbose=False)
try:
registry.execute(args.command)
registry.execute(
args.command, skip_confirmation=getattr(args, "yes", False))
except ValueError as e:
print(f"Error: {e}")
logger = setup_logger(verbose=False)
logger.error(f"{e}")
sys.exit(1)
except Exception as e:
logger = setup_logger(verbose=False)
logger.error(f"Unexpected error: {e}", exc_info=True)
sys.exit(1)

View File

@@ -2,14 +2,18 @@
from typing import Callable, Dict, Optional
from .configure import Configuration
from .posts import delete_posts
from .posts import delete_all_posts
from .medias import delete_posts_with_medias
from .likes import undo_likes
from .reposts import undo_reposts
from .quotes import delete_quotes
from .quotes import delete_quotes_posts
from .follows import unfollow_all
from .bookmarks import delete_bookmarks
from .logger import get_logger
from .safeguard import require_confirmation
CommandHandler = Callable[[], None]
CommandHandler = Callable[..., None]
class CommandRegistry:
@@ -41,10 +45,13 @@ class CommandRegistry:
def get_all_commands(self) -> Dict[str, str]:
return self._help_texts.copy()
def execute(self, name: str):
def execute(self, name: str, skip_confirmation: bool = False):
handler = self.get_handler(name)
if handler:
handler()
if name == "configure":
handler()
else:
handler(skip_confirmation)
else:
raise ValueError(f"Unknown command: {name}")
@@ -57,42 +64,58 @@ def run_configure():
config.create()
def run_posts():
delete_posts()
def run_posts(skip_confirmation: bool = False):
require_confirmation("delete all posts", skip_confirmation)
delete_all_posts()
def run_medias():
def run_medias(skip_confirmation: bool = False):
require_confirmation("delete all posts with media", skip_confirmation)
delete_posts_with_medias()
def run_likes():
def run_likes(skip_confirmation: bool = False):
require_confirmation("undo all likes", skip_confirmation)
undo_likes()
def run_reposts():
def run_reposts(skip_confirmation: bool = False):
require_confirmation("undo all reposts", skip_confirmation)
undo_reposts()
def run_quotes():
delete_quotes()
def run_quotes(skip_confirmation: bool = False):
require_confirmation("delete all quote posts", skip_confirmation)
delete_quotes_posts()
def run_follows():
print("Command 'follows' is not yet implemented.")
def run_follows(skip_confirmation: bool = False):
require_confirmation("unfollow all accounts", skip_confirmation)
unfollow_all()
def run_bookmarks():
print("Command 'bookmarks' is not yet implemented")
def run_bookmarks(skip_confirmation: bool = False):
require_confirmation("delete all bookmarks", skip_confirmation)
delete_bookmarks()
def run_all():
registry.execute("posts")
registry.execute("medias")
registry.execute("likes")
registry.execute("reposts")
registry.execute("quotes")
registry.execute("follows")
registry.execute("bookmarks")
def run_all(skip_confirmation: bool = False):
logger = get_logger()
require_confirmation(
"run all cleanup commands (posts, likes, reposts, follows, bookmarks)", skip_confirmation)
commands = ["posts", "likes", "reposts", "follows", "bookmarks"]
logger.info("Running all cleanup commands...")
for cmd in commands:
try:
logger.info(f"Starting command: {cmd}")
registry.execute(cmd, skip_confirmation=True)
logger.info(f"Completed command: {cmd}")
except Exception as e:
logger.error(f"Error running '{cmd}': {e}", exc_info=True)
continue
logger.info("All commands completed.")
registry.register("configure", run_configure,
@@ -103,5 +126,5 @@ registry.register("likes", run_likes, "only likes")
registry.register("reposts", run_reposts, "only reposts")
registry.register("quotes", run_quotes, "only quotes")
registry.register("follows", run_follows, "only follows")
registry.register("bookmarks", run_follows, "only bookmarks")
registry.register("bookmarks", run_bookmarks, "only bookmarks")
registry.register("all", run_all, "target everything")

View File

@@ -3,6 +3,7 @@
import getpass
from pathlib import Path
import yaml
from .logger import setup_logger, get_logger
class Configuration:
@@ -13,10 +14,12 @@ class Configuration:
return self.config_file.exists()
def create(self):
logger = setup_logger(verbose=False)
if self.exists():
overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"):
logger.info("Configuration creation cancelled.")
return
config_dir = self.config_file.parent
@@ -37,7 +40,7 @@ class Configuration:
batch_size = int(batch_size)
delay = int(delay)
except ValueError:
print("Error: batch_size and delay must be integers")
logger.error("batch_size and delay must be integers")
return
config_data = {
@@ -51,7 +54,7 @@ class Configuration:
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
print(f"\nConfiguration saved to {self.config_file}")
logger.info(f"Configuration saved to {self.config_file}")
def load(self) -> dict:
if not self.exists():

View File

@@ -4,12 +4,14 @@ import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
FOLLOW_COLLECTION = "app.bsky.graph.follow"
def unfollow_all():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
@@ -17,17 +19,18 @@ def unfollow_all():
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting unfollow operation with batch_size={batch_size}, delay={delay}s")
logger.info(
f"Starting unfollow operation with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_unfollowed = 0
batch_num = 0
progress = ProgressTracker(operation="Unfollowing accounts")
while True:
batch_num += 1
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=FOLLOW_COLLECTION,
@@ -41,6 +44,8 @@ def unfollow_all():
if not records:
break
progress.batch(batch_num, len(records))
for record in records:
try:
record_uri = record.uri
@@ -52,12 +57,11 @@ def unfollow_all():
}
client.com.atproto.repo.delete_record(data=delete_data)
total_unfollowed += 1
if verbose:
print(f"Unfollowed: {record_uri}")
progress.update(1)
logger.debug(f"Unfollowed: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error unfollowing {record_uri}: {e}")
logger.error(f"Error unfollowing {record_uri}: {e}")
cursor = response.cursor
if not cursor:
@@ -66,4 +70,4 @@ def unfollow_all():
if delay > 0:
time.sleep(delay)
print(f"Unfollowed {total_unfollowed} accounts.")
logger.info(f"Unfollowed {total_unfollowed} accounts.")

View File

@@ -4,12 +4,14 @@ import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
LIKE_COLLECTION = "app.bsky.feed.like"
def undo_likes():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
@@ -17,17 +19,18 @@ def undo_likes():
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
logger.info(
f"Starting like deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
batch_num = 0
progress = ProgressTracker(operation="Undoing likes")
while True:
batch_num += 1
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=LIKE_COLLECTION,
@@ -41,6 +44,8 @@ def undo_likes():
if not records:
break
progress.batch(batch_num, len(records))
for record in records:
try:
record_uri = record.uri
@@ -52,12 +57,11 @@ def undo_likes():
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone like: {record_uri}")
progress.update(1)
logger.debug(f"Undone like: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing like {record_uri}: {e}")
logger.error(f"Error undoing like {record_uri}: {e}")
cursor = response.cursor
if not cursor:
@@ -66,4 +70,4 @@ def undo_likes():
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} likes.")
logger.info(f"Undone {total_undone} likes.")

73
skywipe/logger.py Normal file
View File

@@ -0,0 +1,73 @@
"""Centralized logging module for Skywipe"""
import logging
import sys
from pathlib import Path
from typing import Optional
class ProgressTracker:
def __init__(self, operation: str = "Processing"):
self.current = 0
self.operation = operation
def update(self, count: int = 1):
self.current += count
def batch(self, batch_num: int, batch_size: int, total_batches: Optional[int] = None):
logger = logging.getLogger("skywipe.progress")
if total_batches:
logger.info(
f"{self.operation} - batch {batch_num}/{total_batches} ({batch_size} items)"
)
else:
logger.info(
f"{self.operation} - batch {batch_num} ({batch_size} items)")
class LevelFilter(logging.Filter):
def __init__(self, min_level: int, max_level: int):
super().__init__()
self.min_level = min_level
self.max_level = max_level
def filter(self, record: logging.LogRecord) -> bool:
return self.min_level <= record.levelno <= self.max_level
def setup_logger(verbose: bool = False, log_file: Optional[Path] = None) -> logging.Logger:
logger = logging.getLogger("skywipe")
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
if logger.handlers:
return logger
formatter = logging.Formatter(fmt="%(levelname)s: %(message)s")
info_handler = logging.StreamHandler(sys.stdout)
info_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter)
logger.addHandler(info_handler)
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger
def get_logger() -> logging.Logger:
return logging.getLogger("skywipe")

View File

@@ -3,9 +3,11 @@
import time
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_posts_with_medias():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
@@ -13,17 +15,18 @@ def delete_posts_with_medias():
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
logger.info(
f"Starting media post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting posts with media")
while True:
batch_num += 1
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
@@ -31,6 +34,8 @@ def delete_posts_with_medias():
if not posts:
break
progress.batch(batch_num, len(posts))
for post in posts:
post_record = post.post
@@ -69,18 +74,16 @@ def delete_posts_with_medias():
break
if not has_media:
if verbose:
print(f"Skipping post without media: {post_record.uri}")
logger.debug(f"Skipping post without media: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted post with media: {post_record.uri}")
progress.update(1)
logger.debug(f"Deleted post with media: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {post_record.uri}: {e}")
logger.error(f"Error deleting post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
@@ -89,4 +92,4 @@ def delete_posts_with_medias():
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts with media.")
logger.info(f"Deleted {total_deleted} posts with media.")

View File

@@ -3,9 +3,11 @@
import time
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_all_posts():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
@@ -13,17 +15,18 @@ def delete_all_posts():
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
logger.info(
f"Starting post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting posts")
while True:
batch_num += 1
if cursor:
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
@@ -35,16 +38,16 @@ def delete_all_posts():
break
post_uris = [post.post.uri for post in posts]
progress.batch(batch_num, len(post_uris))
for uri in post_uris:
try:
client.delete_post(uri)
total_deleted += 1
if verbose:
print(f"Deleted post: {uri}")
progress.update(1)
logger.debug(f"Deleted post: {uri}")
except Exception as e:
if verbose:
print(f"Error deleting post {uri}: {e}")
logger.error(f"Error deleting post {uri}: {e}")
cursor = response.cursor
if not cursor:
@@ -53,4 +56,4 @@ def delete_all_posts():
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} posts.")
logger.info(f"Deleted {total_deleted} posts.")

View File

@@ -3,9 +3,11 @@
import time
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
def delete_quotes_posts():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
@@ -13,17 +15,17 @@ def delete_quotes_posts():
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting quote post deletion with batch_size={batch_size}, delay={delay}s")
logger.info(f"Starting quote post deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_deleted = 0
batch_num = 0
progress = ProgressTracker(operation="Deleting quote posts")
while True:
batch_num += 1
response = client.get_author_feed(
actor=did, limit=batch_size, cursor=cursor)
@@ -31,6 +33,8 @@ def delete_quotes_posts():
if not posts:
break
progress.batch(batch_num, len(posts))
for post in posts:
post_record = post.post
@@ -52,18 +56,16 @@ def delete_quotes_posts():
has_quote = True
if not has_quote:
if verbose:
print(f"Skipping post without quote: {post_record.uri}")
logger.debug(f"Skipping post without quote: {post_record.uri}")
continue
try:
client.delete_post(post_record.uri)
total_deleted += 1
if verbose:
print(f"Deleted quote post: {post_record.uri}")
progress.update(1)
logger.debug(f"Deleted quote post: {post_record.uri}")
except Exception as e:
if verbose:
print(f"Error deleting quote post {post_record.uri}: {e}")
logger.error(f"Error deleting quote post {post_record.uri}: {e}")
cursor = response.cursor
if not cursor:
@@ -72,4 +74,4 @@ def delete_quotes_posts():
if delay > 0:
time.sleep(delay)
print(f"Deleted {total_deleted} quote posts.")
logger.info(f"Deleted {total_deleted} quote posts.")

View File

@@ -4,12 +4,14 @@ import time
from atproto import models
from .auth import Auth
from .configure import Configuration
from .logger import get_logger, ProgressTracker
REPOST_COLLECTION = "app.bsky.feed.repost"
def undo_reposts():
logger = get_logger()
auth = Auth()
client = auth.login()
config = Configuration()
@@ -17,17 +19,18 @@ def undo_reposts():
batch_size = config_data.get("batch_size", 10)
delay = config_data.get("delay", 1)
verbose = config_data.get("verbose", False)
if verbose:
print(
f"Starting repost deletion with batch_size={batch_size}, delay={delay}s")
logger.info(
f"Starting repost deletion with batch_size={batch_size}, delay={delay}s")
did = client.me.did
cursor = None
total_undone = 0
batch_num = 0
progress = ProgressTracker(operation="Undoing reposts")
while True:
batch_num += 1
list_params = models.ComAtprotoRepoListRecords.Params(
repo=did,
collection=REPOST_COLLECTION,
@@ -41,6 +44,8 @@ def undo_reposts():
if not records:
break
progress.batch(batch_num, len(records))
for record in records:
try:
record_uri = record.uri
@@ -52,12 +57,11 @@ def undo_reposts():
}
client.com.atproto.repo.delete_record(data=delete_data)
total_undone += 1
if verbose:
print(f"Undone repost: {record_uri}")
progress.update(1)
logger.debug(f"Undone repost: {record_uri}")
except Exception as e:
record_uri = getattr(record, "uri", "unknown")
if verbose:
print(f"Error undoing repost {record_uri}: {e}")
logger.error(f"Error undoing repost {record_uri}: {e}")
cursor = response.cursor
if not cursor:
@@ -66,4 +70,4 @@ def undo_reposts():
if delay > 0:
time.sleep(delay)
print(f"Undone {total_undone} reposts.")
logger.info(f"Undone {total_undone} reposts.")

27
skywipe/safeguard.py Normal file
View File

@@ -0,0 +1,27 @@
"""Safeguard module for Skywipe"""
import sys
from .logger import get_logger
CONFIRM_RESPONSES = {"yes", "y"}
def require_confirmation(operation: str, skip_confirmation: bool = False) -> None:
if skip_confirmation:
return
logger = get_logger()
logger.warning(f"This will {operation}")
logger.warning("This operation is DESTRUCTIVE and cannot be undone!")
try:
response = input(
"Are you sure you want to continue? (y/N): ").strip().lower()
except (EOFError, KeyboardInterrupt):
logger.info("\nOperation cancelled.")
sys.exit(0)
if response not in CONFIRM_RESPONSES:
logger.info("Operation cancelled.")
sys.exit(0)