126 lines
4.9 KiB
Python
126 lines
4.9 KiB
Python
import re
|
|
|
|
from textual.app import ComposeResult
|
|
from textual.screen import Screen
|
|
from textual.widgets import Header, Footer, Static, Button, Select, DataTable
|
|
from textual.containers import Vertical, Horizontal
|
|
from textual import work
|
|
|
|
from tui.config import list_conf_dir
|
|
from tui.backend import run_cli
|
|
from tui.widgets import SnapshotBrowser
|
|
|
|
|
|
class SnapshotsScreen(Screen):
|
|
|
|
BINDINGS = [("escape", "go_back", "Back")]
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Header(show_clock=True)
|
|
targets = list_conf_dir("targets.d")
|
|
remotes = list_conf_dir("remotes.d")
|
|
with Vertical(id="snapshots-screen"):
|
|
yield Static("Snapshots Browser", id="screen-title")
|
|
if not targets or not remotes:
|
|
yield Static("Targets and remotes must be configured to browse snapshots.")
|
|
else:
|
|
yield Static("Target:")
|
|
yield Select([(t, t) for t in targets], id="snap-target", prompt="Select target")
|
|
yield Static("Remote:")
|
|
yield Select([(r, r) for r in remotes], id="snap-remote", prompt="Select remote")
|
|
yield Button("Load Snapshots", id="btn-load", variant="primary")
|
|
yield DataTable(id="snap-table")
|
|
with Horizontal(id="snapshots-buttons"):
|
|
yield Button("Browse Files", id="btn-browse")
|
|
yield Button("Back", id="btn-back")
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
try:
|
|
table = self.query_one("#snap-table", DataTable)
|
|
table.add_columns("Snapshot")
|
|
except Exception:
|
|
pass
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
if event.button.id == "btn-back":
|
|
self.app.pop_screen()
|
|
elif event.button.id == "btn-load":
|
|
self._load_snapshots()
|
|
elif event.button.id == "btn-browse":
|
|
self._browse_snapshot()
|
|
|
|
def _selected_snapshot(self) -> str | None:
|
|
try:
|
|
table = self.query_one("#snap-table", DataTable)
|
|
if table.cursor_row is not None and table.row_count > 0:
|
|
return str(table.coordinate_to_cell_key((table.cursor_row, 0)).row_key.value)
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
@work
|
|
async def _load_snapshots(self) -> None:
|
|
target_sel = self.query_one("#snap-target", Select)
|
|
remote_sel = self.query_one("#snap-remote", Select)
|
|
if not isinstance(target_sel.value, str) or not isinstance(remote_sel.value, str):
|
|
self.notify("Select target and remote first", severity="error")
|
|
return
|
|
target = str(target_sel.value)
|
|
remote = str(remote_sel.value)
|
|
rc, stdout, stderr = await run_cli("snapshots", "list", f"--target={target}", f"--remote={remote}")
|
|
table = self.query_one("#snap-table", DataTable)
|
|
table.clear()
|
|
lines = [l.strip() for l in stdout.splitlines() if l.strip() and not l.startswith("===")]
|
|
if lines:
|
|
for s in lines:
|
|
table.add_row(s, key=s)
|
|
else:
|
|
self.notify("No snapshots found", severity="warning")
|
|
if stderr:
|
|
self.notify(stderr.strip(), severity="error")
|
|
|
|
@work
|
|
async def _browse_snapshot(self) -> None:
|
|
target_sel = self.query_one("#snap-target", Select)
|
|
remote_sel = self.query_one("#snap-remote", Select)
|
|
if not isinstance(target_sel.value, str) or not isinstance(remote_sel.value, str):
|
|
self.notify("Select target and remote first", severity="error")
|
|
return
|
|
snapshot = self._selected_snapshot()
|
|
if not snapshot:
|
|
self.notify("Select a snapshot first", severity="warning")
|
|
return
|
|
target = str(target_sel.value)
|
|
remote = str(remote_sel.value)
|
|
|
|
self.notify("Loading files...")
|
|
rc, stdout, stderr = await run_cli(
|
|
"snapshots", "browse", f"--target={target}", f"--remote={remote}", f"--snapshot={snapshot}"
|
|
)
|
|
|
|
# Parse file list, strip remote prefix to get relative paths
|
|
# Paths look like: /remote/base/.../snapshots/<timestamp>/etc/foo.conf
|
|
# We want everything after the snapshot timestamp directory
|
|
files = []
|
|
pattern = re.compile(re.escape(snapshot) + r"/(.*)")
|
|
for line in (stdout or "").splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
m = pattern.search(line)
|
|
if m:
|
|
files.append(m.group(1))
|
|
else:
|
|
files.append(line)
|
|
|
|
if not files:
|
|
self.notify("No files found in snapshot", severity="warning")
|
|
return
|
|
|
|
browser = SnapshotBrowser(f"{target} / {snapshot}", files)
|
|
self.app.push_screen(browser)
|
|
|
|
def action_go_back(self) -> None:
|
|
self.app.pop_screen()
|