- Save finished jobs to registry (24h TTL) so they survive TUI restart - Fix PermissionError in PID check incorrectly marking alive processes as dead - Handle CancelledError explicitly to preserve running status on TUI exit - Tail log files for reconnected running jobs instead of showing stale output - Detect actual return code from log content; show "?" for unknown status Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
436 lines
16 KiB
Python
436 lines
16 KiB
Python
import asyncio
|
|
import json
|
|
import os
|
|
import signal
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from textual.message import Message
|
|
|
|
from tui.backend import start_cli_process
|
|
|
|
MAX_OUTPUT_LINES = 10_000
|
|
FINISHED_JOB_TTL_HOURS = 24
|
|
|
|
|
|
def _work_dir() -> Path:
|
|
if os.geteuid() == 0:
|
|
return Path("/usr/local/gniza/workdir")
|
|
state_home = os.environ.get("XDG_STATE_HOME", str(Path.home() / ".local" / "state"))
|
|
return Path(state_home) / "gniza" / "workdir"
|
|
|
|
|
|
REGISTRY_FILE = _work_dir() / "gniza-jobs.json"
|
|
|
|
|
|
class JobFinished(Message):
|
|
def __init__(self, job_id: str, return_code: int) -> None:
|
|
super().__init__()
|
|
self.job_id = job_id
|
|
self.return_code = return_code
|
|
|
|
|
|
@dataclass
|
|
class Job:
|
|
id: str
|
|
kind: str
|
|
label: str
|
|
status: str = "running"
|
|
started_at: datetime = field(default_factory=datetime.now)
|
|
finished_at: datetime | None = None
|
|
return_code: int | None = None
|
|
output: list[str] = field(default_factory=list)
|
|
_proc: asyncio.subprocess.Process | None = field(default=None, repr=False)
|
|
_pid: int | None = field(default=None, repr=False)
|
|
_pgid: int | None = field(default=None, repr=False)
|
|
_reconnected: bool = field(default=False, repr=False)
|
|
_log_file: str | None = field(default=None, repr=False)
|
|
_tail_task: asyncio.Task | None = field(default=None, repr=False)
|
|
|
|
|
|
class JobManager:
|
|
|
|
def __init__(self) -> None:
|
|
self._jobs: dict[str, Job] = {}
|
|
self._load_registry()
|
|
|
|
def create_job(self, kind: str, label: str) -> Job:
|
|
job = Job(id=uuid.uuid4().hex[:8], kind=kind, label=label)
|
|
self._jobs[job.id] = job
|
|
return job
|
|
|
|
def get_job(self, job_id: str) -> Job | None:
|
|
return self._jobs.get(job_id)
|
|
|
|
def list_jobs(self) -> list[Job]:
|
|
return list(self._jobs.values())
|
|
|
|
def running_count(self) -> int:
|
|
return sum(1 for j in self._jobs.values() if j.status == "running")
|
|
|
|
def remove_finished(self) -> None:
|
|
self._jobs = {k: v for k, v in self._jobs.items() if v.status == "running"}
|
|
self._save_registry()
|
|
|
|
def start_job(self, app, job: Job, *cli_args: str) -> None:
|
|
asyncio.create_task(self.run_job(app, job, *cli_args))
|
|
|
|
async def run_job(self, app, job: Job, *cli_args: str) -> int:
|
|
log_path = _work_dir() / f"gniza-job-{job.id}.log"
|
|
job._log_file = str(log_path)
|
|
proc = await start_cli_process(*cli_args, log_file=str(log_path))
|
|
job._proc = proc
|
|
job._pid = proc.pid
|
|
try:
|
|
job._pgid = os.getpgid(proc.pid)
|
|
except OSError:
|
|
job._pgid = None
|
|
self._save_registry()
|
|
try:
|
|
# Wait for process and tail log file concurrently
|
|
wait_task = asyncio.create_task(proc.wait())
|
|
with open(log_path, "r") as f:
|
|
while not wait_task.done():
|
|
line = f.readline()
|
|
if line:
|
|
text = line.rstrip("\n")
|
|
if len(job.output) < MAX_OUTPUT_LINES:
|
|
job.output.append(text)
|
|
else:
|
|
await asyncio.sleep(0.2)
|
|
# Read remaining lines after process exit
|
|
for line in f:
|
|
text = line.rstrip("\n")
|
|
if len(job.output) < MAX_OUTPUT_LINES:
|
|
job.output.append(text)
|
|
rc = proc.returncode if proc.returncode is not None else 1
|
|
job.return_code = rc
|
|
job.status = "success" if rc == 0 else "failed"
|
|
except (asyncio.CancelledError, KeyboardInterrupt):
|
|
# TUI is shutting down — keep status as "running" so the job
|
|
# stays in the registry for reconnection on next launch.
|
|
self._save_registry()
|
|
raise
|
|
except Exception:
|
|
job.status = "failed"
|
|
job.return_code = job.return_code if job.return_code is not None else 1
|
|
finally:
|
|
if job.status != "running":
|
|
job.finished_at = datetime.now()
|
|
job._proc = None
|
|
job._reconnected = False
|
|
self._save_registry()
|
|
rc = job.return_code if job.return_code is not None else 1
|
|
try:
|
|
app.post_message(JobFinished(job.id, rc))
|
|
except Exception:
|
|
pass
|
|
return job.return_code if job.return_code is not None else 1
|
|
|
|
@staticmethod
|
|
def _kill_process_group(proc: asyncio.subprocess.Process) -> None:
|
|
try:
|
|
pgid = os.getpgid(proc.pid)
|
|
os.killpg(pgid, signal.SIGKILL)
|
|
except (ProcessLookupError, PermissionError, OSError):
|
|
try:
|
|
proc.kill()
|
|
except (ProcessLookupError, OSError):
|
|
pass
|
|
|
|
def kill_job(self, job_id: str) -> str:
|
|
"""Kill a job. Returns a status message for debugging."""
|
|
job = self._jobs.get(job_id)
|
|
if not job:
|
|
return "job not found"
|
|
# Reconnected jobs: use stored PID/PGID
|
|
if job._reconnected and job._pid:
|
|
try:
|
|
pgid = job._pgid or os.getpgid(job._pid)
|
|
os.killpg(pgid, signal.SIGKILL)
|
|
return f"killed pgid={pgid} (pid={job._pid})"
|
|
except (ProcessLookupError, PermissionError, OSError) as e:
|
|
try:
|
|
os.kill(job._pid, signal.SIGKILL)
|
|
return f"fallback kill pid={job._pid} ({e})"
|
|
except (ProcessLookupError, OSError) as e2:
|
|
return f"failed: {e}, {e2}"
|
|
if job._proc is None:
|
|
return f"proc is None (status={job.status})"
|
|
pid = job._proc.pid
|
|
try:
|
|
pgid = os.getpgid(pid)
|
|
os.killpg(pgid, signal.SIGKILL)
|
|
return f"killed pgid={pgid} (pid={pid})"
|
|
except (ProcessLookupError, PermissionError, OSError) as e:
|
|
try:
|
|
job._proc.kill()
|
|
return f"fallback kill pid={pid} ({e})"
|
|
except (ProcessLookupError, OSError) as e2:
|
|
return f"failed: {e}, {e2}"
|
|
|
|
def kill_running(self) -> None:
|
|
for job in self._jobs.values():
|
|
if job._proc is not None:
|
|
self._kill_process_group(job._proc)
|
|
elif job._reconnected and job._pid:
|
|
try:
|
|
pgid = job._pgid or os.getpgid(job._pid)
|
|
os.killpg(pgid, signal.SIGKILL)
|
|
except (ProcessLookupError, PermissionError, OSError):
|
|
try:
|
|
os.kill(job._pid, signal.SIGKILL)
|
|
except (ProcessLookupError, OSError):
|
|
pass
|
|
|
|
# ── Job Registry Persistence ─────────────────────────────
|
|
|
|
def _save_registry(self) -> None:
|
|
entries = []
|
|
now = datetime.now()
|
|
for job in self._jobs.values():
|
|
# Skip finished jobs older than TTL
|
|
if job.status != "running" and job.finished_at:
|
|
age_hours = (now - job.finished_at).total_seconds() / 3600
|
|
if age_hours > FINISHED_JOB_TTL_HOURS:
|
|
continue
|
|
pid = job._pid
|
|
if job._proc is not None:
|
|
pid = job._proc.pid
|
|
entry = {
|
|
"id": job.id,
|
|
"kind": job.kind,
|
|
"label": job.label,
|
|
"status": job.status,
|
|
"return_code": job.return_code,
|
|
"started_at": job.started_at.isoformat(),
|
|
"finished_at": job.finished_at.isoformat() if job.finished_at else None,
|
|
"log_file": job._log_file,
|
|
}
|
|
if job.status == "running" and pid is not None:
|
|
entry["pid"] = pid
|
|
entry["pgid"] = job._pgid
|
|
entries.append(entry)
|
|
try:
|
|
REGISTRY_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
REGISTRY_FILE.write_text(json.dumps(entries, indent=2))
|
|
except OSError:
|
|
pass
|
|
|
|
def _load_registry(self) -> None:
|
|
if not REGISTRY_FILE.is_file():
|
|
return
|
|
try:
|
|
entries = json.loads(REGISTRY_FILE.read_text())
|
|
except (json.JSONDecodeError, OSError):
|
|
return
|
|
now = datetime.now()
|
|
for entry in entries:
|
|
job_id = entry["id"]
|
|
if job_id in self._jobs:
|
|
continue
|
|
saved_status = entry.get("status", "running")
|
|
pid = entry.get("pid")
|
|
|
|
# Already-finished job from a previous session
|
|
if saved_status != "running":
|
|
finished_at_str = entry.get("finished_at")
|
|
finished_at = datetime.fromisoformat(finished_at_str) if finished_at_str else now
|
|
age_hours = (now - finished_at).total_seconds() / 3600
|
|
if age_hours > FINISHED_JOB_TTL_HOURS:
|
|
continue
|
|
job = Job(
|
|
id=job_id,
|
|
kind=entry.get("kind", "backup"),
|
|
label=entry.get("label", "Job"),
|
|
status=saved_status,
|
|
started_at=datetime.fromisoformat(entry["started_at"]),
|
|
finished_at=finished_at,
|
|
return_code=entry.get("return_code"),
|
|
)
|
|
job._log_file = entry.get("log_file")
|
|
if job._log_file and Path(job._log_file).is_file():
|
|
try:
|
|
lines = Path(job._log_file).read_text().splitlines()
|
|
job.output = lines[:MAX_OUTPUT_LINES]
|
|
except OSError:
|
|
pass
|
|
self._jobs[job.id] = job
|
|
continue
|
|
|
|
# Running job — check if process is still alive
|
|
if pid is None:
|
|
continue
|
|
alive = False
|
|
try:
|
|
os.kill(pid, 0)
|
|
alive = True
|
|
except ProcessLookupError:
|
|
pass
|
|
except PermissionError:
|
|
# Process exists but we can't signal it
|
|
alive = True
|
|
if alive:
|
|
job = Job(
|
|
id=job_id,
|
|
kind=entry.get("kind", "backup"),
|
|
label=entry.get("label", f"Job (PID {pid})"),
|
|
status="running",
|
|
started_at=datetime.fromisoformat(entry["started_at"]),
|
|
)
|
|
job._pid = pid
|
|
job._pgid = entry.get("pgid")
|
|
job._reconnected = True
|
|
else:
|
|
# Process finished while TUI was closed — check log for exit info
|
|
rc = self._detect_return_code(entry.get("log_file"))
|
|
if rc is None:
|
|
status = "unknown"
|
|
else:
|
|
status = "success" if rc == 0 else "failed"
|
|
job = Job(
|
|
id=job_id,
|
|
kind=entry.get("kind", "backup"),
|
|
label=entry.get("label", f"Job (PID {pid})"),
|
|
status=status,
|
|
started_at=datetime.fromisoformat(entry["started_at"]),
|
|
finished_at=now,
|
|
return_code=rc,
|
|
)
|
|
job._log_file = entry.get("log_file")
|
|
# Load output from log file
|
|
if job._log_file and Path(job._log_file).is_file():
|
|
try:
|
|
lines = Path(job._log_file).read_text().splitlines()
|
|
job.output = lines[:MAX_OUTPUT_LINES]
|
|
except OSError:
|
|
pass
|
|
self._jobs[job.id] = job
|
|
self._save_registry()
|
|
|
|
@staticmethod
|
|
def _detect_return_code(log_file: str | None) -> int | None:
|
|
"""Try to determine exit code from log file content.
|
|
|
|
Returns 0 for success, 1 for detected failure, None if unknown.
|
|
"""
|
|
if not log_file or not Path(log_file).is_file():
|
|
return None
|
|
try:
|
|
text = Path(log_file).read_text()
|
|
if not text.strip():
|
|
return None
|
|
for marker in ("FATAL:", "ERROR:", "failed", "Failed"):
|
|
if marker in text:
|
|
return 1
|
|
# Look for success indicators
|
|
if "completed" in text.lower() or "Backup Summary" in text:
|
|
return 0
|
|
except OSError:
|
|
return None
|
|
return None
|
|
|
|
def start_tailing_reconnected(self, app) -> None:
|
|
"""Start log file tailing tasks for all reconnected running jobs."""
|
|
for job in self._jobs.values():
|
|
if job._reconnected and job.status == "running" and job._tail_task is None:
|
|
job._tail_task = asyncio.create_task(
|
|
self._tail_reconnected(app, job)
|
|
)
|
|
|
|
async def _tail_reconnected(self, app, job: Job) -> None:
|
|
"""Tail the log file and monitor PID for a reconnected job."""
|
|
try:
|
|
log_path = job._log_file
|
|
if not log_path or not Path(log_path).is_file():
|
|
# No log file — just poll PID
|
|
while job.status == "running":
|
|
if not job._pid:
|
|
break
|
|
try:
|
|
os.kill(job._pid, 0)
|
|
except ProcessLookupError:
|
|
break
|
|
except PermissionError:
|
|
pass
|
|
await asyncio.sleep(1)
|
|
else:
|
|
with open(log_path, "r") as f:
|
|
# Seek to end of already-loaded content
|
|
f.seek(0, 2)
|
|
while job.status == "running":
|
|
line = f.readline()
|
|
if line:
|
|
text = line.rstrip("\n")
|
|
if len(job.output) < MAX_OUTPUT_LINES:
|
|
job.output.append(text)
|
|
else:
|
|
# Check if process is still alive
|
|
if job._pid:
|
|
try:
|
|
os.kill(job._pid, 0)
|
|
except ProcessLookupError:
|
|
break
|
|
except PermissionError:
|
|
pass
|
|
await asyncio.sleep(0.3)
|
|
# Read remaining lines after process exit
|
|
for line in f:
|
|
text = line.rstrip("\n")
|
|
if len(job.output) < MAX_OUTPUT_LINES:
|
|
job.output.append(text)
|
|
# Process finished
|
|
if job.status == "running":
|
|
rc = self._detect_return_code(job._log_file)
|
|
job.return_code = rc
|
|
if rc is None:
|
|
job.status = "unknown"
|
|
else:
|
|
job.status = "success" if rc == 0 else "failed"
|
|
job.finished_at = datetime.now()
|
|
job._reconnected = False
|
|
self._save_registry()
|
|
try:
|
|
app.post_message(JobFinished(job.id, rc or 0))
|
|
except Exception:
|
|
pass
|
|
except (asyncio.CancelledError, KeyboardInterrupt):
|
|
# TUI shutting down — keep job as running for next reconnect
|
|
raise
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
job._tail_task = None
|
|
|
|
def check_reconnected(self) -> None:
|
|
changed = False
|
|
for job in list(self._jobs.values()):
|
|
if not job._reconnected or job.status != "running":
|
|
continue
|
|
# Skip jobs that have an active tail task
|
|
if job._tail_task is not None:
|
|
continue
|
|
if job._pid is None:
|
|
continue
|
|
try:
|
|
os.kill(job._pid, 0)
|
|
except ProcessLookupError:
|
|
rc = self._detect_return_code(job._log_file)
|
|
job.return_code = rc
|
|
if rc is None:
|
|
job.status = "unknown"
|
|
else:
|
|
job.status = "success" if rc == 0 else "failed"
|
|
job.finished_at = datetime.now()
|
|
job._reconnected = False
|
|
changed = True
|
|
except PermissionError:
|
|
pass # Process exists but we can't signal it
|
|
if changed:
|
|
self._save_registry()
|
|
|
|
|
|
job_manager = JobManager()
|