diff --git a/tui/jobs.py b/tui/jobs.py
index 61dc009..8e44bed 100644
--- a/tui/jobs.py
+++ b/tui/jobs.py
@@ -1,9 +1,11 @@
 import asyncio
+import json
 import os
 import signal
 import uuid
 from dataclasses import dataclass, field
 from datetime import datetime
+from pathlib import Path
 
 from textual.message import Message
 
@@ -12,6 +14,25 @@ from tui.backend import start_cli_process
 MAX_OUTPUT_LINES = 10_000
 
 
+def _work_dir() -> Path:
+    """Return the directory that holds gniza's runtime state files.
+
+    Root uses the system-wide location; other users get a per-user
+    directory under ``$XDG_STATE_HOME`` (default ``~/.local/state``).
+    """
+    if os.geteuid() == 0:
+        return Path("/usr/local/gniza/workdir")
+    # An empty XDG_STATE_HOME counts as unset; otherwise Path("") would
+    # silently make the work dir relative to the current directory.
+    state_home = os.environ.get("XDG_STATE_HOME") or str(
+        Path.home() / ".local" / "state"
+    )
+    return Path(state_home) / "gniza" / "workdir"
+
+
+REGISTRY_FILE = _work_dir() / "gniza-jobs.json"
+
+
 class JobFinished(Message):
     def __init__(self, job_id: str, return_code: int) -> None:
         super().__init__()
@@ -30,12 +51,19 @@ class Job:
     return_code: int | None = None
     output: list[str] = field(default_factory=list)
     _proc: asyncio.subprocess.Process | None = field(default=None, repr=False)
+    # PID/PGID are stored apart from _proc so they remain usable when no
+    # process handle exists (jobs restored from the registry).
+    _pid: int | None = field(default=None, repr=False)
+    _pgid: int | None = field(default=None, repr=False)
+    # True for a job restored from the registry rather than started here.
+    _reconnected: bool = field(default=False, repr=False)
 
 
 class JobManager:
 
     def __init__(self) -> None:
         self._jobs: dict[str, Job] = {}
+        self._load_registry()
 
     def create_job(self, kind: str, label: str) -> Job:
         job = Job(id=uuid.uuid4().hex[:8], kind=kind, label=label)
@@ -53,6 +81,7 @@ class JobManager:
 
     def remove_finished(self) -> None:
         self._jobs = {k: v for k, v in self._jobs.items() if v.status == "running"}
+        self._save_registry()
 
     def start_job(self, app, job: Job, *cli_args: str) -> None:
         asyncio.create_task(self.run_job(app, job, *cli_args))
@@ -60,6 +89,12 @@ class JobManager:
     async def run_job(self, app, job: Job, *cli_args: str) -> int:
         proc = await start_cli_process(*cli_args)
         job._proc = proc
+        job._pid = proc.pid
+        try:
+            job._pgid = os.getpgid(proc.pid)
+        except OSError:
+            job._pgid = None
+        self._save_registry()
         try:
             while True:
                 line = await proc.stdout.readline()
@@ -78,6 +113,8 @@ class JobManager:
         finally:
             job.finished_at = datetime.now()
             job._proc = None
+            job._reconnected = False
+            self._save_registry()
             rc = job.return_code if job.return_code is not None else 1
             app.post_message(JobFinished(job.id, rc))
         return job.return_code if job.return_code is not None else 1
@@ -98,6 +135,9 @@ class JobManager:
         job = self._jobs.get(job_id)
         if not job:
             return "job not found"
+        # Reconnected jobs have no _proc handle: use the stored PID/PGID.
+        if job._reconnected and job._pid:
+            return self._kill_reconnected(job)
         if job._proc is None:
             return f"proc is None (status={job.status})"
         pid = job._proc.pid
@@ -116,6 +156,129 @@ class JobManager:
         for job in self._jobs.values():
             if job._proc is not None:
                 self._kill_process_group(job._proc)
+            elif job._reconnected and job._pid:
+                self._kill_reconnected(job)
+
+    def _kill_reconnected(self, job: Job) -> str:
+        """SIGKILL a reconnected job using its stored PID/PGID.
+
+        The whole process group is tried first, then the single PID.
+        Returns a short description of the outcome.
+        """
+        try:
+            pgid = job._pgid or os.getpgid(job._pid)
+            os.killpg(pgid, signal.SIGKILL)
+            return f"killed pgid={pgid} (pid={job._pid})"
+        except OSError as e:
+            try:
+                os.kill(job._pid, signal.SIGKILL)
+                return f"fallback kill pid={job._pid} ({e})"
+            except OSError as e2:
+                return f"failed: {e}, {e2}"
+
+    # ── Job Registry Persistence ─────────────────────────────
+
+    def _save_registry(self) -> None:
+        """Write all running jobs with a known PID to REGISTRY_FILE.
+
+        Best effort: I/O errors are ignored.
+        """
+        entries = []
+        for job in self._jobs.values():
+            if job.status != "running":
+                continue
+            pid = job._proc.pid if job._proc is not None else job._pid
+            if pid is None:
+                continue
+            entries.append({
+                "id": job.id,
+                "kind": job.kind,
+                "label": job.label,
+                "pid": pid,
+                "pgid": job._pgid,
+                "started_at": job.started_at.isoformat(),
+            })
+        tmp_file = REGISTRY_FILE.with_name(REGISTRY_FILE.name + ".tmp")
+        try:
+            REGISTRY_FILE.parent.mkdir(parents=True, exist_ok=True)
+            # Write to a sibling file and rename it into place so an
+            # interrupted write cannot leave a truncated registry.
+            tmp_file.write_text(json.dumps(entries, indent=2))
+            os.replace(tmp_file, REGISTRY_FILE)
+        except OSError:
+            pass
+
+    def _load_registry(self) -> None:
+        """Re-adopt jobs from REGISTRY_FILE whose process is still alive.
+
+        Called from __init__, so an unreadable file or a malformed entry
+        is skipped instead of raising.
+        """
+        try:
+            entries = json.loads(REGISTRY_FILE.read_text())
+        except (ValueError, OSError):
+            return
+        if not isinstance(entries, list):
+            return
+        for entry in entries:
+            if not isinstance(entry, dict):
+                continue
+            pid = entry.get("pid")
+            # Reject 0, 1 and negative values: os.kill()/os.killpg()
+            # would address a process group or every process instead.
+            if not isinstance(pid, int) or pid <= 1:
+                continue
+            # Liveness probe only; a recycled PID cannot be told apart
+            # from the original job here.
+            try:
+                os.kill(pid, 0)
+            except (OSError, OverflowError):
+                continue
+            job_id = entry.get("id")
+            if not isinstance(job_id, str) or job_id in self._jobs:
+                continue
+            try:
+                started_at = datetime.fromisoformat(entry["started_at"])
+            except (KeyError, TypeError, ValueError):
+                continue
+            pgid = entry.get("pgid")
+            if not isinstance(pgid, int) or pgid <= 1:
+                pgid = None
+            job = Job(
+                id=job_id,
+                kind=entry.get("kind", "backup"),
+                label=entry.get("label", f"Job (PID {pid})"),
+                status="running",
+                started_at=started_at,
+            )
+            job._pid = pid
+            job._pgid = pgid
+            job._reconnected = True
+            self._jobs[job.id] = job
+
+    def check_reconnected(self) -> None:
+        """Mark reconnected jobs whose process has exited as finished.
+
+        No exit status is available here, so such a job is recorded as
+        "success" however the process actually ended.
+        """
+        changed = False
+        for job in self._jobs.values():
+            if not job._reconnected or job.status != "running":
+                continue
+            if job._pid is None:
+                continue
+            try:
+                os.kill(job._pid, 0)
+            except ProcessLookupError:
+                job.status = "success"
+                job.finished_at = datetime.now()
+                job._reconnected = False
+                changed = True
+            except PermissionError:
+                pass  # Process exists but we can't signal it
+        if changed:
+            self._save_registry()
 
 
 job_manager = JobManager()
diff --git a/tui/screens/running_tasks.py b/tui/screens/running_tasks.py
index c75cf55..b69ca89 100644
--- a/tui/screens/running_tasks.py
+++ b/tui/screens/running_tasks.py
@@ -45,6 +45,7 @@ class RunningTasksScreen(Screen):
         return f"{hours}h {m}m"
 
     def _refresh_table(self) -> None:
+        job_manager.check_reconnected()
         table = self.query_one("#rt-table", DataTable)
         # Preserve cursor position
         old_row = table.cursor_coordinate.row if table.row_count > 0 else 0