From 1a1fee0984a8ccc486df565d1e3133e46f044018 Mon Sep 17 00:00:00 2001 From: Kharec Date: Sun, 7 Dec 2025 20:31:37 +0100 Subject: [PATCH] refactor: extract playback orchestration and optimize code structure --- auditui/playback.py | 167 +++++++++++++++++++++++++++++--------------- 1 file changed, 110 insertions(+), 57 deletions(-) diff --git a/auditui/playback.py b/auditui/playback.py index aed8ab3..5ff90fd 100644 --- a/auditui/playback.py +++ b/auditui/playback.py @@ -1,5 +1,7 @@ """Playback control for Auditui.""" +from __future__ import annotations + import os import shutil import signal @@ -7,6 +9,7 @@ import subprocess from pathlib import Path from typing import Callable +from .downloads import DownloadManager StatusCallback = Callable[[str], None] @@ -22,10 +25,14 @@ class PlaybackController: self.current_file_path: Path | None = None self.current_asin: str | None = None - def start(self, path: Path, activation_hex: str | None = None) -> bool: + def start( + self, path: Path, activation_hex: str | None = None, status_callback: StatusCallback | None = None + ) -> bool: """Start playing a local file using ffplay.""" + notify = status_callback or self.notify + if not shutil.which("ffplay"): - self.notify("ffplay not found. Please install ffmpeg") + notify("ffplay not found. Please install ffmpeg") return False if self.playback_process is not None: @@ -43,18 +50,20 @@ class PlaybackController: if self.playback_process.poll() is not None: return_code = self.playback_process.returncode - self.notify(f"Playback process exited immediately (code: {return_code})") + notify( + f"Playback process exited immediately (code: {return_code})" + ) self.playback_process = None return False self.is_playing = True self.is_paused = False self.current_file_path = path - self.notify(f"Playing: {path.name}") + notify(f"Playing: {path.name}") return True except Exception as exc: - self.notify(f"Error starting playback: {exc}") + notify(f"Error starting playback: {exc}") return False def stop(self) -> None: @@ -70,60 +79,24 @@ class PlaybackController: except subprocess.TimeoutExpired: self.playback_process.kill() self.playback_process.wait() - except ProcessLookupError: - pass - except Exception: + except (ProcessLookupError, ValueError): pass finally: - self.playback_process = None - self.is_playing = False - self.is_paused = False - self.current_file_path = None - self.current_asin = None + self._reset_state() def pause(self) -> None: """Pause the current playback.""" - if not (self.playback_process and self.is_playing and not self.is_paused): + if not self._validate_playback_state(require_paused=False): return - if not self.is_alive(): - self.stop() - self.notify("Playback process has ended") - return - - try: - os.kill(self.playback_process.pid, signal.SIGSTOP) - self.is_paused = True - self.notify(self._status_message("Paused")) - except ProcessLookupError: - self.stop() - self.notify("Process no longer exists") - except PermissionError: - self.notify("Permission denied: cannot pause playback") - except Exception as exc: - self.notify(f"Error pausing playback: {exc}") + self._send_signal(signal.SIGSTOP, "Paused", "pause") def resume(self) -> None: """Resume the current playback.""" - if not (self.playback_process and self.is_playing and self.is_paused): + if not self._validate_playback_state(require_paused=True): return - if not self.is_alive(): - self.stop() - self.notify("Playback process has ended") - return - - try: - os.kill(self.playback_process.pid, signal.SIGCONT) - self.is_paused = False - self.notify(self._status_message("Playing")) - except ProcessLookupError: - self.stop() - self.notify("Process no longer exists") - except PermissionError: - self.notify("Permission denied: cannot resume playback") - except Exception as exc: - self.notify(f"Error resuming playback: {exc}") + self._send_signal(signal.SIGCONT, "Playing", "resume") def check_status(self) -> str | None: """Check if playback process has finished and return status message.""" @@ -135,12 +108,7 @@ class PlaybackController: return None finished_file = self.current_file_path - self.playback_process = None - self.is_playing = False - self.is_paused = False - - self.current_file_path = None - self.current_asin = None + self._reset_state() if finished_file: if return_code == 0: @@ -148,13 +116,98 @@ class PlaybackController: return f"Playback ended unexpectedly (code: {return_code}): {finished_file.name}" return "Playback finished" - def _status_message(self, prefix: str) -> str: - """Generate status message with filename if available.""" - filename = self.current_file_path.name if self.current_file_path else "" - return f"{prefix}: {filename}" if filename else prefix + def _reset_state(self) -> None: + """Reset all playback state.""" + self.playback_process = None + self.is_playing = False + self.is_paused = False + self.current_file_path = None + self.current_asin = None + + def _validate_playback_state(self, require_paused: bool) -> bool: + """Validate playback state before pause/resume operations.""" + if not (self.playback_process and self.is_playing): + return False + + if require_paused and not self.is_paused: + return False + if not require_paused and self.is_paused: + return False + + if not self.is_alive(): + self.stop() + self.notify("Playback process has ended") + return False + + return True + + def _send_signal(self, sig: signal.Signals, status_prefix: str, action: str) -> None: + """Send signal to playback process and update state.""" + if self.playback_process is None: + return + + try: + os.kill(self.playback_process.pid, sig) + self.is_paused = sig == signal.SIGSTOP + filename = self.current_file_path.name if self.current_file_path else "" + message = f"{status_prefix}: {filename}" if filename else status_prefix + self.notify(message) + except ProcessLookupError: + self.stop() + self.notify("Process no longer exists") + except PermissionError: + self.notify(f"Permission denied: cannot {action} playback") + except Exception as exc: + self.notify(f"Error {action}ing playback: {exc}") def is_alive(self) -> bool: """Check if playback process is still running.""" if self.playback_process is None: return False return self.playback_process.poll() is None + + def prepare_and_start( + self, + download_manager: DownloadManager, + asin: str, + status_callback: StatusCallback | None = None, + ) -> bool: + """Download file, get activation bytes, and start playback.""" + notify = status_callback or self.notify + + if not download_manager: + notify("Could not download file") + return False + + notify("Preparing playback...") + + local_path = download_manager.get_or_download(asin, notify) + if not local_path: + notify("Could not download file") + return False + + notify("Getting activation bytes...") + activation_hex = download_manager.get_activation_bytes() + if not activation_hex: + notify("Failed to get activation bytes") + return False + + notify(f"Starting playback of {local_path.name}...") + self.current_asin = asin + return self.start(local_path, activation_hex, notify) + + def toggle_playback(self) -> bool: + """Toggle pause/resume state. Returns True if action was taken.""" + if not self.is_playing: + return False + + if not self.is_alive(): + self.stop() + self.notify("Playback has ended") + return False + + if self.is_paused: + self.resume() + else: + self.pause() + return True