Files
gniza4linux/tui/widgets/operation_log.py
shuki 83ccf44117 Fix live log not updating: tail log file directly in OperationLog
The poll timer was reading from job.output in memory, which depended on
run_job's async task populating it. Now OperationLog reads new content
directly from the log file using seek, making it independent of the
async task. Also store the asyncio task reference to prevent GC.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 21:40:56 +02:00

141 lines
4.9 KiB
Python

import asyncio
from pathlib import Path
from rich.spinner import Spinner as RichSpinner
from rich.text import Text
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.widgets import RichLog, Button, Static
from textual.containers import Vertical, Horizontal
from textual.timer import Timer
class SpinnerWidget(Static):
"""Animated spinner using Rich's Spinner renderable."""
def __init__(self, style: str = "dots", **kwargs):
super().__init__("", **kwargs)
self._spinner = RichSpinner(style, text=" Running...")
self._timer: Timer | None = None
def on_mount(self) -> None:
self._timer = self.set_interval(1 / 12, self._tick)
def _tick(self) -> None:
self.update(self._spinner)
def stop(self) -> None:
if self._timer:
self._timer.stop()
self.update("")
class OperationLog(ModalScreen[None]):
BINDINGS = [("escape", "close", "Close")]
def __init__(self, title: str = "Operation Output", show_spinner: bool = True, job_id: str | None = None):
super().__init__()
self._title = title
self._show_spinner = show_spinner
self._job_id = job_id
self._mounted_event = asyncio.Event()
self._buffer: list[str] = []
self._poll_timer: Timer | None = None
self._file_pos: int = 0
def compose(self) -> ComposeResult:
with Vertical(id="op-log"):
yield Static(self._title, id="ol-title")
yield RichLog(id="ol-log", wrap=True, highlight=True, markup=True)
with Horizontal(id="ol-footer"):
yield Button("Close", variant="primary", id="ol-close")
if self._show_spinner:
yield SpinnerWidget("arrow3", id="ol-spinner")
def on_mount(self) -> None:
log = self.query_one("#ol-log", RichLog)
if self._job_id:
from tui.jobs import job_manager
job = job_manager.get_job(self._job_id)
if job:
# Load existing content from log file
if job._log_file and Path(job._log_file).is_file():
try:
content = Path(job._log_file).read_text()
self._file_pos = len(content.encode())
for line in content.splitlines():
self._write_to_log(log, line)
except OSError:
pass
elif job.output:
for line in job.output:
self._write_to_log(log, line)
if job.status != "running":
self.finish()
else:
self._poll_timer = self.set_interval(0.3, self._poll_job)
# Flush any buffered writes
for text in self._buffer:
self._write_to_log(log, text)
self._buffer.clear()
self._mounted_event.set()
def on_button_pressed(self, event: Button.Pressed) -> None:
self.dismiss(None)
def action_close(self) -> None:
self.dismiss(None)
def _write_to_log(self, log: RichLog, text: str) -> None:
if "[" in text and "[/" in text:
log.write(Text.from_markup(text))
else:
log.write(text)
def finish(self) -> None:
try:
self.query_one("#ol-spinner", SpinnerWidget).stop()
except Exception:
pass
def write(self, text: str) -> None:
if not self._mounted_event.is_set():
self._buffer.append(text)
return
try:
log = self.query_one("#ol-log", RichLog)
self._write_to_log(log, text)
except Exception:
self._buffer.append(text)
def _poll_job(self) -> None:
from tui.jobs import job_manager
job = job_manager.get_job(self._job_id)
if not job:
return
try:
log = self.query_one("#ol-log", RichLog)
except Exception:
return
# Read new content directly from log file
if job._log_file and Path(job._log_file).is_file():
try:
with open(job._log_file, "r") as f:
f.seek(self._file_pos)
new_data = f.read()
if new_data:
self._file_pos += len(new_data.encode())
for line in new_data.splitlines():
self._write_to_log(log, line)
except OSError:
pass
if job.status != "running":
if job.return_code == 0:
self._write_to_log(log, "\n[green]Operation completed successfully.[/green]")
else:
self._write_to_log(log, f"\n[red]Operation failed (exit code {job.return_code}).[/red]")
self.finish()
if self._poll_timer:
self._poll_timer.stop()