diff --git a/tui/app.py b/tui/app.py index d28c246..3dd0c14 100644 --- a/tui/app.py +++ b/tui/app.py @@ -47,6 +47,9 @@ class GnizaApp(App): self.push_screen("wizard") else: self.push_screen("main") + # Start tailing log files for any jobs that were running + # when the TUI was last closed + job_manager.start_tailing_reconnected(self) def on_job_finished(self, message: JobFinished) -> None: job = job_manager.get_job(message.job_id) diff --git a/tui/jobs.py b/tui/jobs.py index fa5e5cc..c1dd63f 100644 --- a/tui/jobs.py +++ b/tui/jobs.py @@ -12,6 +12,7 @@ from textual.message import Message from tui.backend import start_cli_process MAX_OUTPUT_LINES = 10_000 +FINISHED_JOB_TTL_HOURS = 24 def _work_dir() -> Path: @@ -46,6 +47,7 @@ class Job: _pgid: int | None = field(default=None, repr=False) _reconnected: bool = field(default=False, repr=False) _log_file: str | None = field(default=None, repr=False) + _tail_task: asyncio.Task | None = field(default=None, repr=False) class JobManager: @@ -106,16 +108,25 @@ class JobManager: rc = proc.returncode if proc.returncode is not None else 1 job.return_code = rc job.status = "success" if rc == 0 else "failed" + except (asyncio.CancelledError, KeyboardInterrupt): + # TUI is shutting down — keep status as "running" so the job + # stays in the registry for reconnection on next launch. + self._save_registry() + raise except Exception: job.status = "failed" job.return_code = job.return_code if job.return_code is not None else 1 finally: - job.finished_at = datetime.now() + if job.status != "running": + job.finished_at = datetime.now() job._proc = None job._reconnected = False self._save_registry() rc = job.return_code if job.return_code is not None else 1 - app.post_message(JobFinished(job.id, rc)) + try: + app.post_message(JobFinished(job.id, rc)) + except Exception: + pass return job.return_code if job.return_code is not None else 1 @staticmethod @@ -178,23 +189,30 @@ class JobManager: def _save_registry(self) -> None: entries = [] + now = datetime.now() for job in self._jobs.values(): - if job.status != "running": - continue + # Skip finished jobs older than TTL + if job.status != "running" and job.finished_at: + age_hours = (now - job.finished_at).total_seconds() / 3600 + if age_hours > FINISHED_JOB_TTL_HOURS: + continue pid = job._pid if job._proc is not None: pid = job._proc.pid - if pid is None: - continue - entries.append({ + entry = { "id": job.id, "kind": job.kind, "label": job.label, - "pid": pid, - "pgid": job._pgid, + "status": job.status, + "return_code": job.return_code, "started_at": job.started_at.isoformat(), + "finished_at": job.finished_at.isoformat() if job.finished_at else None, "log_file": job._log_file, - }) + } + if job.status == "running" and pid is not None: + entry["pid"] = pid + entry["pgid"] = job._pgid + entries.append(entry) try: REGISTRY_FILE.parent.mkdir(parents=True, exist_ok=True) REGISTRY_FILE.write_text(json.dumps(entries, indent=2)) @@ -208,31 +226,79 @@ class JobManager: entries = json.loads(REGISTRY_FILE.read_text()) except (json.JSONDecodeError, OSError): return + now = datetime.now() for entry in entries: - pid = entry.get("pid") - if pid is None: - continue job_id = entry["id"] if job_id in self._jobs: continue - # Check if process is still alive + saved_status = entry.get("status", "running") + pid = entry.get("pid") + + # Already-finished job from a previous session + if saved_status != "running": + finished_at_str = entry.get("finished_at") + finished_at = datetime.fromisoformat(finished_at_str) if finished_at_str else now + age_hours = (now - finished_at).total_seconds() / 3600 + if age_hours > FINISHED_JOB_TTL_HOURS: + continue + job = Job( + id=job_id, + kind=entry.get("kind", "backup"), + label=entry.get("label", "Job"), + status=saved_status, + started_at=datetime.fromisoformat(entry["started_at"]), + finished_at=finished_at, + return_code=entry.get("return_code"), + ) + job._log_file = entry.get("log_file") + if job._log_file and Path(job._log_file).is_file(): + try: + lines = Path(job._log_file).read_text().splitlines() + job.output = lines[:MAX_OUTPUT_LINES] + except OSError: + pass + self._jobs[job.id] = job + continue + + # Running job — check if process is still alive + if pid is None: + continue alive = False try: os.kill(pid, 0) alive = True - except (ProcessLookupError, PermissionError): + except ProcessLookupError: pass - job = Job( - id=job_id, - kind=entry.get("kind", "backup"), - label=entry.get("label", f"Job (PID {pid})"), - status="running" if alive else "success", - started_at=datetime.fromisoformat(entry["started_at"]), - finished_at=None if alive else datetime.now(), - ) - job._pid = pid - job._pgid = entry.get("pgid") - job._reconnected = alive + except PermissionError: + # Process exists but we can't signal it + alive = True + if alive: + job = Job( + id=job_id, + kind=entry.get("kind", "backup"), + label=entry.get("label", f"Job (PID {pid})"), + status="running", + started_at=datetime.fromisoformat(entry["started_at"]), + ) + job._pid = pid + job._pgid = entry.get("pgid") + job._reconnected = True + else: + # Process finished while TUI was closed — check log for exit info + rc = self._detect_return_code(entry.get("log_file")) + if rc is None: + status = "unknown" + else: + status = "success" if rc == 0 else "failed" + job = Job( + id=job_id, + kind=entry.get("kind", "backup"), + label=entry.get("label", f"Job (PID {pid})"), + status=status, + started_at=datetime.fromisoformat(entry["started_at"]), + finished_at=now, + return_code=rc, + ) job._log_file = entry.get("log_file") # Load output from log file if job._log_file and Path(job._log_file).is_file(): @@ -242,20 +308,121 @@ class JobManager: except OSError: pass self._jobs[job.id] = job - # Clean up registry: only keep still-running entries self._save_registry() + @staticmethod + def _detect_return_code(log_file: str | None) -> int | None: + """Try to determine exit code from log file content. + + Returns 0 for success, 1 for detected failure, None if unknown. + """ + if not log_file or not Path(log_file).is_file(): + return None + try: + text = Path(log_file).read_text() + if not text.strip(): + return None + for marker in ("FATAL:", "ERROR:", "failed", "Failed"): + if marker in text: + return 1 + # Look for success indicators + if "completed" in text.lower() or "Backup Summary" in text: + return 0 + except OSError: + return None + return None + + def start_tailing_reconnected(self, app) -> None: + """Start log file tailing tasks for all reconnected running jobs.""" + for job in self._jobs.values(): + if job._reconnected and job.status == "running" and job._tail_task is None: + job._tail_task = asyncio.create_task( + self._tail_reconnected(app, job) + ) + + async def _tail_reconnected(self, app, job: Job) -> None: + """Tail the log file and monitor PID for a reconnected job.""" + try: + log_path = job._log_file + if not log_path or not Path(log_path).is_file(): + # No log file — just poll PID + while job.status == "running": + if not job._pid: + break + try: + os.kill(job._pid, 0) + except ProcessLookupError: + break + except PermissionError: + pass + await asyncio.sleep(1) + else: + with open(log_path, "r") as f: + # Seek to end of already-loaded content + f.seek(0, 2) + while job.status == "running": + line = f.readline() + if line: + text = line.rstrip("\n") + if len(job.output) < MAX_OUTPUT_LINES: + job.output.append(text) + else: + # Check if process is still alive + if job._pid: + try: + os.kill(job._pid, 0) + except ProcessLookupError: + break + except PermissionError: + pass + await asyncio.sleep(0.3) + # Read remaining lines after process exit + for line in f: + text = line.rstrip("\n") + if len(job.output) < MAX_OUTPUT_LINES: + job.output.append(text) + # Process finished + if job.status == "running": + rc = self._detect_return_code(job._log_file) + job.return_code = rc + if rc is None: + job.status = "unknown" + else: + job.status = "success" if rc == 0 else "failed" + job.finished_at = datetime.now() + job._reconnected = False + self._save_registry() + try: + app.post_message(JobFinished(job.id, rc or 0)) + except Exception: + pass + except (asyncio.CancelledError, KeyboardInterrupt): + # TUI shutting down — keep job as running for next reconnect + raise + except Exception: + pass + finally: + job._tail_task = None + def check_reconnected(self) -> None: changed = False for job in list(self._jobs.values()): if not job._reconnected or job.status != "running": continue + # Skip jobs that have an active tail task + if job._tail_task is not None: + continue if job._pid is None: continue try: os.kill(job._pid, 0) except ProcessLookupError: - job.status = "success" + rc = self._detect_return_code(job._log_file) + job.return_code = rc + if rc is None: + job.status = "unknown" + else: + job.status = "success" if rc == 0 else "failed" job.finished_at = datetime.now() job._reconnected = False changed = True diff --git a/tui/screens/running_tasks.py b/tui/screens/running_tasks.py index b69ca89..859a7fa 100644 --- a/tui/screens/running_tasks.py +++ b/tui/screens/running_tasks.py @@ -55,6 +55,8 @@ class RunningTasksScreen(Screen): icon = "... " elif job.status == "success": icon = " ok " + elif job.status == "unknown": + icon = " ? " else: icon = " X " started = job.started_at.strftime("%H:%M:%S")