refactor: extract table sorting logic and media info loading to new modules
This commit is contained in:
@@ -2,7 +2,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import unicodedata
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from textual import work
|
||||
@@ -15,6 +14,7 @@ from .constants import *
|
||||
from .downloads import DownloadManager
|
||||
from .library import LibraryClient
|
||||
from .playback import PlaybackController
|
||||
from .table_helpers import create_progress_sort_key, create_title_sort_key
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from textual.widgets._data_table import ColumnKey
|
||||
@@ -221,14 +221,8 @@ class Auditui(App):
|
||||
"""Sort table by title, toggling direction on each press."""
|
||||
table = self.query_one(DataTable)
|
||||
if table.row_count > 0 and self.title_column_key:
|
||||
def title_key(row_values):
|
||||
title_cell = row_values[0]
|
||||
if isinstance(title_cell, str):
|
||||
normalized = unicodedata.normalize('NFD', title_cell)
|
||||
return normalized.encode('ascii', 'ignore').decode('ascii').lower()
|
||||
return str(title_cell).lower()
|
||||
|
||||
table.sort(key=title_key, reverse=self.title_sort_reverse)
|
||||
title_key, reverse = create_title_sort_key(self.title_sort_reverse)
|
||||
table.sort(key=title_key, reverse=reverse)
|
||||
self.title_sort_reverse = not self.title_sort_reverse
|
||||
|
||||
def action_sort_by_progress(self) -> None:
|
||||
@@ -236,17 +230,9 @@ class Auditui(App):
|
||||
table = self.query_one(DataTable)
|
||||
if table.row_count > 0:
|
||||
self.progress_sort_reverse = not self.progress_sort_reverse
|
||||
|
||||
def progress_key(row_values):
|
||||
progress_cell = row_values[self.progress_column_index]
|
||||
if isinstance(progress_cell, str):
|
||||
try:
|
||||
return float(progress_cell.rstrip("%"))
|
||||
except (ValueError, AttributeError):
|
||||
return 0.0
|
||||
return 0.0
|
||||
|
||||
table.sort(key=progress_key, reverse=self.progress_sort_reverse)
|
||||
progress_key, reverse = create_progress_sort_key(
|
||||
self.progress_column_index, self.progress_sort_reverse)
|
||||
table.sort(key=progress_key, reverse=reverse)
|
||||
|
||||
def action_show_all(self) -> None:
|
||||
"""Toggle between showing all and unfinished books."""
|
||||
|
||||
46
auditui/media_info.py
Normal file
46
auditui/media_info.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""Media information loading for Audible content."""
|
||||
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def load_media_info(path: Path, activation_hex: str | None = None) -> tuple[float | None, list[dict]]:
|
||||
"""Load media information including duration and chapters using ffprobe.
|
||||
|
||||
Returns:
|
||||
Tuple of (duration in seconds, list of chapter dicts with start_time, end_time, title)
|
||||
"""
|
||||
if not shutil.which("ffprobe"):
|
||||
return None, []
|
||||
|
||||
try:
|
||||
cmd = ["ffprobe", "-v", "quiet", "-print_format",
|
||||
"json", "-show_format", "-show_chapters"]
|
||||
if activation_hex:
|
||||
cmd.extend(["-activation_bytes", activation_hex])
|
||||
cmd.append(str(path))
|
||||
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=10)
|
||||
if result.returncode != 0:
|
||||
return None, []
|
||||
|
||||
data = json.loads(result.stdout)
|
||||
format_info = data.get("format", {})
|
||||
duration_str = format_info.get("duration")
|
||||
duration = float(duration_str) if duration_str else None
|
||||
|
||||
chapters_data = data.get("chapters", [])
|
||||
chapters = [
|
||||
{
|
||||
"start_time": float(ch.get("start_time", 0)),
|
||||
"end_time": float(ch.get("end_time", 0)),
|
||||
"title": ch.get("tags", {}).get("title", f"Chapter {idx + 1}"),
|
||||
}
|
||||
for idx, ch in enumerate(chapters_data)
|
||||
]
|
||||
return duration, chapters
|
||||
except (json.JSONDecodeError, subprocess.TimeoutExpired, ValueError, KeyError):
|
||||
return None, []
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import signal
|
||||
@@ -13,6 +12,7 @@ from typing import Callable
|
||||
|
||||
from .downloads import DownloadManager
|
||||
from .library import LibraryClient
|
||||
from .media_info import load_media_info
|
||||
|
||||
StatusCallback = Callable[[str], None]
|
||||
|
||||
@@ -89,7 +89,9 @@ class PlaybackController:
|
||||
self.playback_start_time = time.time()
|
||||
self.paused_duration = 0.0
|
||||
self.pause_start_time = None
|
||||
self._load_media_info(path, activation_hex)
|
||||
duration, chapters = load_media_info(path, activation_hex)
|
||||
self.total_duration = duration
|
||||
self.chapters = chapters
|
||||
notify(f"Playing: {path.name}")
|
||||
return True
|
||||
|
||||
@@ -358,41 +360,6 @@ class PlaybackController:
|
||||
"""Seek backward by specified seconds. Returns True if action was taken."""
|
||||
return self._seek(seconds, "backward")
|
||||
|
||||
def _load_media_info(self, path: Path, activation_hex: str | None) -> None:
|
||||
"""Load media information including duration and chapters using ffprobe."""
|
||||
if not shutil.which("ffprobe"):
|
||||
return
|
||||
|
||||
try:
|
||||
cmd = ["ffprobe", "-v", "quiet", "-print_format",
|
||||
"json", "-show_format", "-show_chapters"]
|
||||
if activation_hex:
|
||||
cmd.extend(["-activation_bytes", activation_hex])
|
||||
cmd.append(str(path))
|
||||
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=10)
|
||||
if result.returncode != 0:
|
||||
return
|
||||
|
||||
data = json.loads(result.stdout)
|
||||
format_info = data.get("format", {})
|
||||
duration_str = format_info.get("duration")
|
||||
if duration_str:
|
||||
self.total_duration = float(duration_str)
|
||||
|
||||
chapters_data = data.get("chapters", [])
|
||||
self.chapters = [
|
||||
{
|
||||
"start_time": float(ch.get("start_time", 0)),
|
||||
"end_time": float(ch.get("end_time", 0)),
|
||||
"title": ch.get("tags", {}).get("title", f"Chapter {idx + 1}"),
|
||||
}
|
||||
for idx, ch in enumerate(chapters_data)
|
||||
]
|
||||
except (json.JSONDecodeError, subprocess.TimeoutExpired, ValueError, KeyError):
|
||||
pass
|
||||
|
||||
def get_current_progress(self) -> tuple[str, float, float] | None:
|
||||
"""Get current playback progress."""
|
||||
if not self.is_playing or self.playback_start_time is None:
|
||||
|
||||
40
auditui/table_helpers.py
Normal file
40
auditui/table_helpers.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Helper functions for table operations."""
|
||||
|
||||
import unicodedata
|
||||
from typing import Callable
|
||||
|
||||
from .constants import PROGRESS_COLUMN_INDEX
|
||||
|
||||
|
||||
def create_title_sort_key(reverse: bool = False) -> tuple[Callable, bool]:
|
||||
"""Create a sort key function for sorting by title.
|
||||
|
||||
Returns:
|
||||
Tuple of (sort_key_function, reverse_flag)
|
||||
"""
|
||||
def title_key(row_values):
|
||||
title_cell = row_values[0]
|
||||
if isinstance(title_cell, str):
|
||||
normalized = unicodedata.normalize('NFD', title_cell)
|
||||
return normalized.encode('ascii', 'ignore').decode('ascii').lower()
|
||||
return str(title_cell).lower()
|
||||
|
||||
return title_key, reverse
|
||||
|
||||
|
||||
def create_progress_sort_key(progress_column_index: int = PROGRESS_COLUMN_INDEX, reverse: bool = False) -> tuple[Callable, bool]:
|
||||
"""Create a sort key function for sorting by progress percentage.
|
||||
|
||||
Returns:
|
||||
Tuple of (sort_key_function, reverse_flag)
|
||||
"""
|
||||
def progress_key(row_values):
|
||||
progress_cell = row_values[progress_column_index]
|
||||
if isinstance(progress_cell, str):
|
||||
try:
|
||||
return float(progress_cell.rstrip("%"))
|
||||
except (ValueError, AttributeError):
|
||||
return 0.0
|
||||
return 0.0
|
||||
|
||||
return progress_key, reverse
|
||||
Reference in New Issue
Block a user