Compare commits

..

6 Commits

5 changed files with 101 additions and 52 deletions

View File

@@ -61,14 +61,14 @@ def main():
setup_logger(verbose=False, log_file=LOG_FILE) setup_logger(verbose=False, log_file=LOG_FILE)
logger = get_logger() logger = get_logger()
if registry.requires_config(args.command):
require_config(logger)
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
setup_logger(verbose=verbose, log_file=LOG_FILE)
try: try:
if registry.requires_config(args.command):
require_config(logger)
config = Configuration()
config_data = config.load()
verbose = config_data.get("verbose", False)
setup_logger(verbose=verbose, log_file=LOG_FILE)
registry.execute( registry.execute(
args.command, skip_confirmation=getattr(args, "yes", False)) args.command, skip_confirmation=getattr(args, "yes", False))
except (ValueError, Exception) as e: except (ValueError, Exception) as e:

View File

@@ -56,65 +56,100 @@ class Configuration:
def exists(self) -> bool: def exists(self) -> bool:
return self.config_file.exists() return self.config_file.exists()
def create(self): def _confirm_overwrite(self, logger) -> bool:
logger = setup_logger(verbose=False) if not self.exists():
if self.exists(): return True
overwrite = input( overwrite = input(
"Configuration already exists. Overwrite? (y/N): ").strip().lower() "Configuration already exists. Overwrite? (y/N): ").strip().lower()
if overwrite not in ("y", "yes"): if overwrite in ("y", "yes"):
logger.info("Configuration creation cancelled.") return True
return logger.info("Configuration creation cancelled.")
return False
def _ensure_config_dir(self) -> None:
config_dir = self.config_file.parent config_dir = self.config_file.parent
config_dir.mkdir(parents=True, exist_ok=True) config_dir.mkdir(parents=True, exist_ok=True)
print("Skywipe Configuration") def _prompt_handle(self, logger) -> str:
print("=" * 50)
print("Note: You should use an app password from Bluesky settings.")
while True: while True:
handle = input("Bluesky handle: ").strip() handle = input("Bluesky handle: ").strip()
is_valid, error_msg = _validate_handle(handle) is_valid, error_msg = _validate_handle(handle)
if is_valid: if is_valid:
break return handle
logger.error(error_msg) logger.error(error_msg)
logger.info("Please enter a valid handle and try again.") logger.info("Please enter a valid handle and try again.")
def _prompt_password(self, logger) -> str:
while True: while True:
password = getpass( password = getpass(
"Bluesky (hopefully app) password: ").strip() "Bluesky (hopefully app) password: ").strip()
is_valid, error_msg = _validate_password(password) is_valid, error_msg = _validate_password(password)
if is_valid: if is_valid:
break return password
logger.error(error_msg) logger.error(error_msg)
logger.info("Please check your password and try again.") logger.info("Please check your password and try again.")
logger.info( logger.info(
"Generate an app password at: https://bsky.app/settings/app-passwords") "Generate an app password at: https://bsky.app/settings/app-passwords")
def _parse_batch_size(self, logger) -> int | None:
batch_size = input("Batch size (default: 10): ").strip() or "10" batch_size = input("Batch size (default: 10): ").strip() or "10"
delay = input(
"Delay between batches in seconds (default: 1): ").strip() or "1"
verbose_input = input(
"Verbose mode (y/n, default: y): ").strip().lower() or "y"
verbose = verbose_input in ("y", "yes", "true", "1")
try: try:
batch_size = int(batch_size) batch_size_int = int(batch_size)
if batch_size < 1 or batch_size > 100:
logger.error("batch_size must be between 1 and 100")
return
except ValueError: except ValueError:
logger.error("batch_size must be an integer") logger.error("batch_size must be an integer")
return return None
if batch_size_int < 1 or batch_size_int > 100:
logger.error("batch_size must be between 1 and 100")
return None
return batch_size_int
def _parse_delay(self, logger) -> int | None:
delay = input(
"Delay between batches in seconds (default: 1): ").strip() or "1"
try: try:
delay = int(delay) delay_int = int(delay)
if delay < 0 or delay > 60:
logger.error("delay must be between 0 and 60 seconds")
return
except ValueError: except ValueError:
logger.error("delay must be an integer") logger.error("delay must be an integer")
return None
if delay_int < 0 or delay_int > 60:
logger.error("delay must be between 0 and 60 seconds")
return None
return delay_int
def _parse_verbose(self) -> bool:
verbose_input = input(
"Verbose mode (y/n, default: y): ").strip().lower() or "y"
return verbose_input in ("y", "yes", "true", "1")
def _write_config(self, logger, config_data: dict) -> None:
try:
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
except (IOError, OSError) as e:
logger.error(f"Failed to save configuration: {e}")
return return
logger.info(f"Configuration saved to {self.config_file}")
def create(self):
logger = setup_logger(verbose=False)
if not self._confirm_overwrite(logger):
return
self._ensure_config_dir()
print("Skywipe Configuration")
print("=" * 50)
print("Note: You should use an app password from Bluesky settings.")
handle = self._prompt_handle(logger)
password = self._prompt_password(logger)
batch_size = self._parse_batch_size(logger)
if batch_size is None:
return
delay = self._parse_delay(logger)
if delay is None:
return
verbose = self._parse_verbose()
config_data = { config_data = {
"handle": handle, "handle": handle,
@@ -124,14 +159,7 @@ class Configuration:
"verbose": verbose "verbose": verbose
} }
try: self._write_config(logger, config_data)
with open(self.config_file, "w") as f:
yaml.dump(config_data, f, default_flow_style=False)
except (IOError, OSError) as e:
logger.error(f"Failed to save configuration: {e}")
return
logger.info(f"Configuration saved to {self.config_file}")
def load(self) -> dict: def load(self) -> dict:
if not self.exists(): if not self.exists():

View File

@@ -41,7 +41,8 @@ def setup_logger(verbose: bool = False, log_file: Path | None = None) -> logging
logger.setLevel(target_level) logger.setLevel(target_level)
progress_logger = logging.getLogger("skywipe.progress") progress_logger = logging.getLogger("skywipe.progress")
progress_logger.propagate = True if not progress_logger.handlers:
progress_logger.propagate = True
info_handler = None info_handler = None
error_handler = None error_handler = None
@@ -60,8 +61,11 @@ def setup_logger(verbose: bool = False, log_file: Path | None = None) -> logging
if info_handler is None: if info_handler is None:
info_handler = logging.StreamHandler(sys.stdout) info_handler = logging.StreamHandler(sys.stdout)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
logger.addHandler(info_handler) logger.addHandler(info_handler)
for existing in list(info_handler.filters):
if isinstance(existing, LevelFilter):
info_handler.removeFilter(existing)
info_handler.addFilter(LevelFilter(logging.DEBUG, logging.INFO))
info_handler.setFormatter(formatter) info_handler.setFormatter(formatter)
info_handler.setLevel(target_level) info_handler.setLevel(target_level)
@@ -93,11 +97,18 @@ def get_logger() -> logging.Logger:
return logging.getLogger("skywipe") return logging.getLogger("skywipe")
def _format_error_message(error: Exception) -> str:
if isinstance(error, KeyError):
return str(error.args[0]) if error.args else str(error)
return str(error)
def handle_error(error: Exception, logger: logging.Logger, exit_on_error: bool = False) -> None: def handle_error(error: Exception, logger: logging.Logger, exit_on_error: bool = False) -> None:
if isinstance(error, ValueError): if isinstance(error, (KeyError, ValueError)):
logger.error(f"{error}") logger.error(_format_error_message(error))
else: else:
logger.error(f"Unexpected error: {error}", exc_info=True) logger.error(
f"Unexpected error: {_format_error_message(error)}", exc_info=True)
if exit_on_error: if exit_on_error:
sys.exit(1) sys.exit(1)

View File

@@ -13,7 +13,7 @@ def user_input(monkeypatch) -> Callable[[Iterable[str], Iterable[str]], None]:
password_iter = iter(passwords) password_iter = iter(passwords)
monkeypatch.setattr("builtins.input", lambda _prompt: next(input_iter)) monkeypatch.setattr("builtins.input", lambda _prompt: next(input_iter))
monkeypatch.setattr("getpass.getpass", monkeypatch.setattr("skywipe.configure.getpass",
lambda _prompt: next(password_iter)) lambda _prompt: next(password_iter))
return _set return _set

View File

@@ -50,9 +50,14 @@ def _setup_error_mocks(monkeypatch, calls, error_factory):
monkeypatch.setattr(cli, "get_logger", monkeypatch.setattr(cli, "get_logger",
lambda: logging.getLogger(TEST_LOGGER_NAME)) lambda: logging.getLogger(TEST_LOGGER_NAME))
def _format_error_message(error):
if isinstance(error, KeyError):
return error.args[0] if error.args else str(error)
return str(error)
def mock_handle_error(error, logger, exit_on_error=False): def mock_handle_error(error, logger, exit_on_error=False):
calls["handle_error"] = (type(error).__name__, calls["handle_error"] = (type(error).__name__,
str(error), exit_on_error) _format_error_message(error), exit_on_error)
monkeypatch.setattr(cli, "handle_error", mock_handle_error) monkeypatch.setattr(cli, "handle_error", mock_handle_error)
@@ -186,9 +191,14 @@ def test_main_handles_config_load_error(monkeypatch):
def raise_config_error(self): def raise_config_error(self):
raise RuntimeError("config error") raise RuntimeError("config error")
def _format_error_message(error):
if isinstance(error, KeyError):
return error.args[0] if error.args else str(error)
return str(error)
def mock_handle_error(error, logger, exit_on_error=False): def mock_handle_error(error, logger, exit_on_error=False):
calls["handle_error"] = (type(error).__name__, calls["handle_error"] = (type(error).__name__,
str(error), exit_on_error) _format_error_message(error), exit_on_error)
_setup_parser_mocks(monkeypatch) _setup_parser_mocks(monkeypatch)
monkeypatch.setattr(cli.registry, "requires_config", lambda name: True) monkeypatch.setattr(cli.registry, "requires_config", lambda name: True)