Files
gniza4linux/tui/screens/running_tasks.py
shuki ba93a74f8e Persist running jobs to registry for TUI reconnection
Jobs are now saved to gniza-jobs.json in WORK_DIR when they start and
finish. On TUI restart, the registry is loaded and PIDs are checked —
still-running jobs appear in the Running Tasks screen and can be
killed. Reconnected jobs are polled every second to detect completion.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 19:27:37 +02:00

115 lines
4.4 KiB
Python

from datetime import datetime
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable
from textual.containers import Vertical, Horizontal
from tui.jobs import job_manager
from tui.widgets import ConfirmDialog, OperationLog
class RunningTasksScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Vertical(id="running-tasks-screen"):
yield Static("Running Tasks", id="screen-title")
yield DataTable(id="rt-table")
with Horizontal(id="rt-buttons"):
yield Button("View Log", variant="primary", id="btn-rt-view")
yield Button("Kill Job", variant="error", id="btn-rt-kill")
yield Button("Clear Finished", variant="warning", id="btn-rt-clear")
yield Button("Back", id="btn-rt-back")
yield Footer()
def on_mount(self) -> None:
table = self.query_one("#rt-table", DataTable)
table.add_columns("Status", "Job", "Started", "Duration")
table.cursor_type = "row"
self._refresh_table()
self._timer = self.set_interval(1, self._refresh_table)
def _format_duration(self, job) -> str:
end = job.finished_at or datetime.now()
delta = end - job.started_at
secs = int(delta.total_seconds())
if secs < 60:
return f"{secs}s"
mins, s = divmod(secs, 60)
if mins < 60:
return f"{mins}m {s}s"
hours, m = divmod(mins, 60)
return f"{hours}h {m}m"
def _refresh_table(self) -> None:
job_manager.check_reconnected()
table = self.query_one("#rt-table", DataTable)
# Preserve cursor position
old_row = table.cursor_coordinate.row if table.row_count > 0 else 0
table.clear()
for job in job_manager.list_jobs():
if job.status == "running":
icon = "... "
elif job.status == "success":
icon = " ok "
else:
icon = " X "
started = job.started_at.strftime("%H:%M:%S")
table.add_row(icon, job.label, started, self._format_duration(job), key=job.id)
if table.row_count > 0:
table.move_cursor(row=min(old_row, table.row_count - 1))
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-rt-back":
self.app.pop_screen()
elif event.button.id == "btn-rt-clear":
job_manager.remove_finished()
self._refresh_table()
elif event.button.id == "btn-rt-kill":
self._kill_selected()
elif event.button.id == "btn-rt-view":
table = self.query_one("#rt-table", DataTable)
if table.row_count == 0:
self.notify("No jobs to view", severity="warning")
return
row_key, _ = table.coordinate_to_cell_key(table.cursor_coordinate)
job_id = str(row_key)
job = job_manager.get_job(job_id)
if job:
log_screen = OperationLog(
title=job.label,
show_spinner=job.status == "running",
job_id=job.id,
)
self.app.push_screen(log_screen)
def _kill_selected(self) -> None:
table = self.query_one("#rt-table", DataTable)
if table.row_count == 0:
self.notify("No jobs to kill", severity="warning")
return
row_key, _ = table.coordinate_to_cell_key(table.cursor_coordinate)
job_id = str(row_key.value if hasattr(row_key, 'value') else row_key)
job = job_manager.get_job(job_id)
if not job:
ids = [j.id for j in job_manager.list_jobs()]
self.notify(f"Not found: key={row_key!r} ids={ids}", severity="warning")
return
if job.status != "running":
self.notify(f"Job already finished ({job.status})", severity="warning")
return
self.app.push_screen(
ConfirmDialog(f"Kill job '{job.label}'?", "Confirm Kill"),
callback=lambda ok: self._do_kill(job_id) if ok else None,
)
def _do_kill(self, job_id: str) -> None:
result = job_manager.kill_job(job_id)
self.notify(f"Kill: {result}")
def action_go_back(self) -> None:
self.app.pop_screen()