Files
gniza4linux/tui/screens/running_tasks.py
shuki 040f861096 Fix progress bar stuck: split on \r to get latest rsync progress
rsync --info=progress2 uses \r without \n, so the entire progress
stream is one long line. Split on both \r and \n to parse each
progress update independently.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 21:57:17 +02:00

216 lines
8.5 KiB
Python

import re
from datetime import datetime
from pathlib import Path
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable, RichLog, ProgressBar
from textual.containers import Vertical, Horizontal
from textual.timer import Timer
from tui.jobs import job_manager
from tui.widgets import ConfirmDialog
_PROGRESS_RE = re.compile(r"(\d+)%")
class RunningTasksScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Vertical(id="running-tasks-screen"):
yield Static("Running Tasks", id="screen-title")
yield DataTable(id="rt-table")
with Horizontal(id="rt-buttons"):
yield Button("View Log", variant="primary", id="btn-rt-view")
yield Button("Kill Job", variant="error", id="btn-rt-kill")
yield Button("Clear Finished", variant="warning", id="btn-rt-clear")
yield Button("Back", id="btn-rt-back")
yield Static("", id="rt-progress-label")
yield ProgressBar(id="rt-progress", total=100, show_eta=False)
yield RichLog(id="rt-log-viewer", wrap=True, highlight=True)
yield Footer()
def on_mount(self) -> None:
table = self.query_one("#rt-table", DataTable)
table.add_columns("Status", "Job", "Started", "Duration")
table.cursor_type = "row"
self._refresh_table()
self._timer = self.set_interval(1, self._refresh_table)
self._log_timer: Timer | None = None
self._viewing_job_id: str | None = None
self._log_file_pos: int = 0
# Hide progress bar initially
self.query_one("#rt-progress", ProgressBar).display = False
self.query_one("#rt-progress-label", Static).display = False
def _format_duration(self, job) -> str:
end = job.finished_at or datetime.now()
delta = end - job.started_at
secs = int(delta.total_seconds())
if secs < 60:
return f"{secs}s"
mins, s = divmod(secs, 60)
if mins < 60:
return f"{mins}m {s}s"
hours, m = divmod(mins, 60)
return f"{hours}h {m}m"
def _refresh_table(self) -> None:
job_manager.check_reconnected()
table = self.query_one("#rt-table", DataTable)
old_row = table.cursor_coordinate.row if table.row_count > 0 else 0
table.clear()
for job in job_manager.list_jobs():
if job.status == "running":
icon = "... "
elif job.status == "success":
icon = " ok "
elif job.status == "unknown":
icon = " ? "
else:
icon = " X "
started = job.started_at.strftime("%H:%M:%S")
table.add_row(icon, job.label, started, self._format_duration(job), key=job.id)
if table.row_count > 0:
table.move_cursor(row=min(old_row, table.row_count - 1))
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-rt-back":
self.app.pop_screen()
elif event.button.id == "btn-rt-clear":
job_manager.remove_finished()
self._refresh_table()
elif event.button.id == "btn-rt-kill":
self._kill_selected()
elif event.button.id == "btn-rt-view":
self._view_selected_log()
def _view_selected_log(self) -> None:
table = self.query_one("#rt-table", DataTable)
if table.row_count == 0:
self.notify("No jobs to view", severity="warning")
return
row_key, _ = table.coordinate_to_cell_key(table.cursor_coordinate)
job_id = str(row_key.value if hasattr(row_key, 'value') else row_key)
job = job_manager.get_job(job_id)
if not job:
self.notify("Job not found", severity="warning")
return
log_viewer = self.query_one("#rt-log-viewer", RichLog)
log_viewer.clear()
self._viewing_job_id = job_id
self._log_file_pos = 0
# Reset progress bar
progress = self.query_one("#rt-progress", ProgressBar)
label = self.query_one("#rt-progress-label", Static)
progress.update(progress=0)
# Load existing content from log file
if job._log_file and Path(job._log_file).is_file():
try:
raw = Path(job._log_file).read_bytes()
self._log_file_pos = len(raw)
content = raw.decode(errors="replace")
self._process_log_content(content, log_viewer)
except OSError:
pass
elif job.output:
for line in job.output:
log_viewer.write(line)
# Show/hide progress bar based on job status
is_running = job.status == "running"
progress.display = is_running
label.display = is_running
# Start polling for new content if job is running
if self._log_timer:
self._log_timer.stop()
if is_running:
self._log_timer = self.set_interval(0.3, self._poll_log)
def _process_log_content(self, content: str, log_viewer: RichLog) -> None:
"""Process log content, extracting rsync progress and writing log lines."""
# Split on both \n and \r to handle rsync --info=progress2 output
parts = re.split(r"[\r\n]+", content)
for part in parts:
part = part.strip()
if not part:
continue
m = _PROGRESS_RE.search(part)
if m and ("xfr#" in part or "to-chk=" in part or "MB/s" in part or "kB/s" in part or "GB/s" in part):
self._update_progress(part)
else:
log_viewer.write(part)
def _update_progress(self, text: str) -> None:
"""Parse rsync progress2 line and update progress bar."""
m = _PROGRESS_RE.search(text)
if not m:
return
pct = int(m.group(1))
try:
progress = self.query_one("#rt-progress", ProgressBar)
label = self.query_one("#rt-progress-label", Static)
progress.update(progress=pct)
# Show the raw progress info as label
label.update(f" {text.strip()}")
except Exception:
pass
def _poll_log(self) -> None:
if not self._viewing_job_id:
return
job = job_manager.get_job(self._viewing_job_id)
if not job:
if self._log_timer:
self._log_timer.stop()
return
try:
log_viewer = self.query_one("#rt-log-viewer", RichLog)
except Exception:
return
if job._log_file and Path(job._log_file).is_file():
try:
with open(job._log_file, "rb") as f:
f.seek(self._log_file_pos)
new_raw = f.read()
if new_raw:
self._log_file_pos += len(new_raw)
new_data = new_raw.decode(errors="replace")
self._process_log_content(new_data, log_viewer)
except OSError:
pass
if job.status != "running":
self.query_one("#rt-progress", ProgressBar).display = False
self.query_one("#rt-progress-label", Static).display = False
if self._log_timer:
self._log_timer.stop()
def _kill_selected(self) -> None:
table = self.query_one("#rt-table", DataTable)
if table.row_count == 0:
self.notify("No jobs to kill", severity="warning")
return
row_key, _ = table.coordinate_to_cell_key(table.cursor_coordinate)
job_id = str(row_key.value if hasattr(row_key, 'value') else row_key)
job = job_manager.get_job(job_id)
if not job:
ids = [j.id for j in job_manager.list_jobs()]
self.notify(f"Not found: key={row_key!r} ids={ids}", severity="warning")
return
if job.status != "running":
self.notify(f"Job already finished ({job.status})", severity="warning")
return
self.app.push_screen(
ConfirmDialog(f"Kill job '{job.label}'?", "Confirm Kill"),
callback=lambda ok: self._do_kill(job_id) if ok else None,
)
def _do_kill(self, job_id: str) -> None:
result = job_manager.kill_job(job_id)
self.notify(f"Kill: {result}")
def action_go_back(self) -> None:
self.app.pop_screen()