Add Python Textual TUI replacing gum-based bash TUI

New tui/ package with 14 screens (main menu, backup, restore, targets,
remotes, snapshots, verify, retention, schedule, logs, settings, wizard),
3 custom widgets (folder picker, confirm dialog, operation log), async
backend wrapper, pure-Python config parser, and TCSS theme.

bin/gniza now launches Textual TUI when available, falls back to gum.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
shuki
2026-03-05 23:39:48 +02:00
parent 611fe7da3f
commit 587149f062
28 changed files with 2092 additions and 1 deletions

2
.gitignore vendored
View File

@@ -2,3 +2,5 @@
*.swo *.swo
*~ *~
.DS_Store .DS_Store
__pycache__/
*.pyc

View File

@@ -436,8 +436,11 @@ if [[ -n "$SUBCOMMAND" ]]; then
run_cli run_cli
elif [[ "$FORCE_CLI" == "true" ]]; then elif [[ "$FORCE_CLI" == "true" ]]; then
run_cli run_cli
elif python3 -c "import textual" 2>/dev/null && [[ -t 1 ]]; then
# Python Textual TUI mode
PYTHONPATH="$GNIZA_DIR:${PYTHONPATH:-}" exec python3 -m tui "$@"
elif command -v gum &>/dev/null && [[ -t 1 ]]; then elif command -v gum &>/dev/null && [[ -t 1 ]]; then
# TUI mode # Legacy gum TUI mode
show_logo show_logo
if ! has_remotes || ! has_targets; then if ! has_remotes || ! has_targets; then
ui_first_run_wizard ui_first_run_wizard

0
tui/__init__.py Normal file
View File

10
tui/__main__.py Normal file
View File

@@ -0,0 +1,10 @@
from tui.app import GnizaApp
def main():
app = GnizaApp()
app.run()
if __name__ == "__main__":
main()

46
tui/app.py Normal file
View File

@@ -0,0 +1,46 @@
from textual.app import App
from tui.config import has_remotes, has_targets
from tui.screens.main_menu import MainMenuScreen
from tui.screens.backup import BackupScreen
from tui.screens.restore import RestoreScreen
from tui.screens.targets import TargetsScreen
from tui.screens.target_edit import TargetEditScreen
from tui.screens.remotes import RemotesScreen
from tui.screens.remote_edit import RemoteEditScreen
from tui.screens.snapshots import SnapshotsScreen
from tui.screens.verify import VerifyScreen
from tui.screens.retention import RetentionScreen
from tui.screens.schedule import ScheduleScreen
from tui.screens.logs import LogsScreen
from tui.screens.settings import SettingsScreen
from tui.screens.wizard import WizardScreen
class GnizaApp(App):
TITLE = "gniza - Linux Backup Manager"
CSS_PATH = "gniza.tcss"
SCREENS = {
"main": MainMenuScreen,
"backup": BackupScreen,
"restore": RestoreScreen,
"targets": TargetsScreen,
"target_edit": TargetEditScreen,
"remotes": RemotesScreen,
"remote_edit": RemoteEditScreen,
"snapshots": SnapshotsScreen,
"verify": VerifyScreen,
"retention": RetentionScreen,
"schedule": ScheduleScreen,
"logs": LogsScreen,
"settings": SettingsScreen,
"wizard": WizardScreen,
}
def on_mount(self) -> None:
if not has_remotes() or not has_targets():
self.push_screen("wizard")
else:
self.push_screen("main")

40
tui/backend.py Normal file
View File

@@ -0,0 +1,40 @@
import asyncio
import os
from pathlib import Path
def _gniza_bin() -> str:
gniza_dir = os.environ.get("GNIZA_DIR", "")
if gniza_dir:
return str(Path(gniza_dir) / "bin" / "gniza")
here = Path(__file__).resolve().parent.parent / "bin" / "gniza"
if here.is_file():
return str(here)
return "gniza"
async def run_cli(*args: str) -> tuple[int, str, str]:
cmd = [_gniza_bin(), "--cli"] + list(args)
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
return proc.returncode or 0, stdout.decode(), stderr.decode()
async def stream_cli(callback, *args: str) -> int:
cmd = [_gniza_bin(), "--cli"] + list(args)
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
while True:
line = await proc.stdout.readline()
if not line:
break
callback(line.decode().rstrip("\n"))
await proc.wait()
return proc.returncode or 0

92
tui/config.py Normal file
View File

@@ -0,0 +1,92 @@
import os
import re
from pathlib import Path
_KV_RE = re.compile(r'^([A-Z_][A-Z_0-9]*)=(.*)')
_QUOTED_RE = re.compile(r'^"(.*)"$|^\'(.*)\'$')
def _get_config_dir() -> Path:
if os.geteuid() == 0:
return Path("/etc/gniza")
xdg = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
return Path(xdg) / "gniza"
def _get_log_dir() -> Path:
if os.geteuid() == 0:
return Path("/var/log/gniza")
xdg = os.environ.get("XDG_STATE_HOME", os.path.expanduser("~/.local/state"))
return Path(xdg) / "gniza" / "log"
CONFIG_DIR = _get_config_dir()
LOG_DIR = _get_log_dir()
def parse_conf(filepath: Path) -> dict[str, str]:
data = {}
if not filepath.is_file():
return data
for line in filepath.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
m = _KV_RE.match(line)
if m:
key = m.group(1)
value = m.group(2)
qm = _QUOTED_RE.match(value)
if qm:
value = qm.group(1) if qm.group(1) is not None else qm.group(2)
data[key] = value
return data
def _sanitize_value(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "")
def write_conf(filepath: Path, data: dict[str, str]) -> None:
filepath.parent.mkdir(parents=True, exist_ok=True)
lines = []
for key, value in data.items():
lines.append(f'{key}="{_sanitize_value(value)}"')
filepath.write_text("\n".join(lines) + "\n")
filepath.chmod(0o600)
def update_conf_key(filepath: Path, key: str, value: str) -> None:
value = _sanitize_value(value)
filepath.parent.mkdir(parents=True, exist_ok=True)
if not filepath.is_file():
filepath.write_text(f'{key}="{value}"\n')
filepath.chmod(0o600)
return
lines = filepath.read_text().splitlines()
found = False
for i, line in enumerate(lines):
m = _KV_RE.match(line.strip())
if m and m.group(1) == key:
lines[i] = f'{key}="{value}"'
found = True
break
if not found:
lines.append(f'{key}="{value}"')
filepath.write_text("\n".join(lines) + "\n")
filepath.chmod(0o600)
def list_conf_dir(subdir: str) -> list[str]:
d = CONFIG_DIR / subdir
if not d.is_dir():
return []
return sorted(p.stem for p in d.glob("*.conf") if p.is_file())
def has_targets() -> bool:
return len(list_conf_dir("targets.d")) > 0
def has_remotes() -> bool:
return len(list_conf_dir("remotes.d")) > 0

197
tui/gniza.tcss Normal file
View File

@@ -0,0 +1,197 @@
/* gniza TUI theme */
Screen {
background: $surface;
}
#screen-title {
text-style: bold;
color: #00cc00;
padding: 1 0;
text-align: center;
}
/* Main menu */
#main-menu {
width: 50;
padding: 1 2;
}
#main-menu Button {
width: 100%;
margin: 0 0 1 0;
}
#logo {
text-align: center;
padding: 0 0 1 0;
}
/* Data tables */
DataTable {
height: 12;
margin: 1 0;
}
/* Form screens */
#target-edit,
#remote-edit,
#settings-screen,
#schedule-screen,
#backup-screen,
#restore-screen,
#verify-screen,
#retention-screen,
#snapshots-screen,
#targets-screen,
#remotes-screen,
#logs-screen {
padding: 1 2;
overflow-y: auto;
}
Input {
margin: 0 0 1 0;
}
Select {
margin: 0 0 1 0;
}
/* Button rows */
#backup-buttons,
#restore-buttons,
#verify-buttons,
#ret-buttons,
#targets-buttons,
#remotes-buttons,
#logs-buttons,
#sched-buttons,
#te-buttons,
#re-buttons,
#set-buttons {
height: auto;
margin: 1 0;
}
#backup-buttons Button,
#restore-buttons Button,
#verify-buttons Button,
#ret-buttons Button,
#targets-buttons Button,
#remotes-buttons Button,
#logs-buttons Button,
#sched-buttons Button,
#te-buttons Button,
#re-buttons Button,
#set-buttons Button {
margin: 0 1 0 0;
}
/* Dialogs */
#confirm-dialog {
width: 60;
height: auto;
padding: 2;
background: $panel;
border: thick $accent;
}
#cd-title {
text-style: bold;
color: #00cc00;
margin: 0 0 1 0;
}
#cd-message {
margin: 0 0 1 0;
}
#cd-buttons {
height: auto;
align: center middle;
}
#cd-buttons Button {
margin: 0 1 0 0;
}
/* Folder picker */
#folder-picker {
width: 70;
height: 30;
padding: 1;
background: $panel;
border: thick $accent;
}
#fp-title {
text-style: bold;
color: #00cc00;
margin: 0 0 1 0;
}
#fp-tree {
height: 1fr;
}
#fp-buttons {
height: auto;
margin: 1 0 0 0;
}
#fp-buttons Button {
margin: 0 1 0 0;
}
/* Operation log */
#op-log {
width: 80%;
height: 80%;
padding: 1;
background: $panel;
border: thick $accent;
}
#ol-title {
text-style: bold;
color: #00cc00;
margin: 0 0 1 0;
}
#ol-log {
height: 1fr;
border: round $accent;
}
#ol-close {
margin: 1 0 0 0;
}
/* Log viewer */
#log-viewer {
height: 1fr;
min-height: 8;
border: round $accent;
margin: 1 0;
}
/* Wizard */
#wizard {
width: 60;
padding: 2;
}
#wizard Button {
width: 100%;
margin: 0 0 1 0;
}
#wizard-welcome {
margin: 0 0 1 0;
}
/* Switch row */
Switch {
margin: 0 0 1 1;
}

215
tui/models.py Normal file
View File

@@ -0,0 +1,215 @@
from dataclasses import dataclass, field
@dataclass
class Target:
name: str = ""
folders: str = ""
exclude: str = ""
remote: str = ""
retention: str = ""
pre_hook: str = ""
post_hook: str = ""
enabled: str = "yes"
def to_conf(self) -> dict[str, str]:
return {
"TARGET_NAME": self.name,
"TARGET_FOLDERS": self.folders,
"TARGET_EXCLUDE": self.exclude,
"TARGET_REMOTE": self.remote,
"TARGET_RETENTION": self.retention,
"TARGET_PRE_HOOK": self.pre_hook,
"TARGET_POST_HOOK": self.post_hook,
"TARGET_ENABLED": self.enabled,
}
@classmethod
def from_conf(cls, name: str, data: dict[str, str]) -> "Target":
return cls(
name=data.get("TARGET_NAME", name),
folders=data.get("TARGET_FOLDERS", ""),
exclude=data.get("TARGET_EXCLUDE", ""),
remote=data.get("TARGET_REMOTE", ""),
retention=data.get("TARGET_RETENTION", ""),
pre_hook=data.get("TARGET_PRE_HOOK", ""),
post_hook=data.get("TARGET_POST_HOOK", ""),
enabled=data.get("TARGET_ENABLED", "yes"),
)
@dataclass
class Remote:
name: str = ""
type: str = "ssh"
host: str = ""
port: str = "22"
user: str = "root"
auth_method: str = "key"
key: str = ""
password: str = ""
base: str = "/backups"
bwlimit: str = "0"
retention_count: str = "30"
s3_bucket: str = ""
s3_region: str = "us-east-1"
s3_endpoint: str = ""
s3_access_key_id: str = ""
s3_secret_access_key: str = ""
gdrive_sa_file: str = ""
gdrive_root_folder_id: str = ""
def to_conf(self) -> dict[str, str]:
data: dict[str, str] = {"REMOTE_TYPE": self.type}
if self.type == "ssh":
data.update({
"REMOTE_HOST": self.host,
"REMOTE_PORT": self.port,
"REMOTE_USER": self.user,
"REMOTE_AUTH_METHOD": self.auth_method,
"REMOTE_KEY": self.key,
"REMOTE_PASSWORD": self.password,
"REMOTE_BASE": self.base,
"BWLIMIT": self.bwlimit,
"RETENTION_COUNT": self.retention_count,
})
elif self.type == "local":
data.update({
"REMOTE_BASE": self.base,
"RETENTION_COUNT": self.retention_count,
})
elif self.type == "s3":
data.update({
"S3_BUCKET": self.s3_bucket,
"S3_REGION": self.s3_region,
"S3_ENDPOINT": self.s3_endpoint,
"S3_ACCESS_KEY_ID": self.s3_access_key_id,
"S3_SECRET_ACCESS_KEY": self.s3_secret_access_key,
"REMOTE_BASE": self.base,
"RETENTION_COUNT": self.retention_count,
})
elif self.type == "gdrive":
data.update({
"GDRIVE_SERVICE_ACCOUNT_FILE": self.gdrive_sa_file,
"GDRIVE_ROOT_FOLDER_ID": self.gdrive_root_folder_id,
"REMOTE_BASE": self.base,
"RETENTION_COUNT": self.retention_count,
})
return data
@classmethod
def from_conf(cls, name: str, data: dict[str, str]) -> "Remote":
return cls(
name=name,
type=data.get("REMOTE_TYPE", "ssh"),
host=data.get("REMOTE_HOST", ""),
port=data.get("REMOTE_PORT", "22"),
user=data.get("REMOTE_USER", "root"),
auth_method=data.get("REMOTE_AUTH_METHOD", "key"),
key=data.get("REMOTE_KEY", ""),
password=data.get("REMOTE_PASSWORD", ""),
base=data.get("REMOTE_BASE", "/backups"),
bwlimit=data.get("BWLIMIT", "0"),
retention_count=data.get("RETENTION_COUNT", "30"),
s3_bucket=data.get("S3_BUCKET", ""),
s3_region=data.get("S3_REGION", "us-east-1"),
s3_endpoint=data.get("S3_ENDPOINT", ""),
s3_access_key_id=data.get("S3_ACCESS_KEY_ID", ""),
s3_secret_access_key=data.get("S3_SECRET_ACCESS_KEY", ""),
gdrive_sa_file=data.get("GDRIVE_SERVICE_ACCOUNT_FILE", ""),
gdrive_root_folder_id=data.get("GDRIVE_ROOT_FOLDER_ID", ""),
)
@dataclass
class Schedule:
name: str = ""
schedule: str = "daily"
time: str = "02:00"
day: str = ""
cron: str = ""
targets: str = ""
remotes: str = ""
def to_conf(self) -> dict[str, str]:
return {
"SCHEDULE": self.schedule,
"SCHEDULE_TIME": self.time,
"SCHEDULE_DAY": self.day,
"SCHEDULE_CRON": self.cron,
"TARGETS": self.targets,
"REMOTES": self.remotes,
}
@classmethod
def from_conf(cls, name: str, data: dict[str, str]) -> "Schedule":
return cls(
name=name,
schedule=data.get("SCHEDULE", "daily"),
time=data.get("SCHEDULE_TIME", "02:00"),
day=data.get("SCHEDULE_DAY", ""),
cron=data.get("SCHEDULE_CRON", ""),
targets=data.get("TARGETS", ""),
remotes=data.get("REMOTES", ""),
)
@dataclass
class AppSettings:
backup_mode: str = "incremental"
bwlimit: str = "0"
retention_count: str = "7"
log_level: str = "INFO"
log_retain: str = "30"
notify_email: str = ""
notify_on: str = "failure"
smtp_host: str = ""
smtp_port: str = "587"
smtp_user: str = ""
smtp_password: str = ""
smtp_from: str = ""
smtp_security: str = "tls"
ssh_timeout: str = "30"
ssh_retries: str = "3"
rsync_extra_opts: str = ""
@classmethod
def from_conf(cls, data: dict[str, str]) -> "AppSettings":
return cls(
backup_mode=data.get("BACKUP_MODE", "incremental"),
bwlimit=data.get("BWLIMIT", "0"),
retention_count=data.get("RETENTION_COUNT", "7"),
log_level=data.get("LOG_LEVEL", "INFO"),
log_retain=data.get("LOG_RETAIN", "30"),
notify_email=data.get("NOTIFY_EMAIL", ""),
notify_on=data.get("NOTIFY_ON", "failure"),
smtp_host=data.get("SMTP_HOST", ""),
smtp_port=data.get("SMTP_PORT", "587"),
smtp_user=data.get("SMTP_USER", ""),
smtp_password=data.get("SMTP_PASSWORD", ""),
smtp_from=data.get("SMTP_FROM", ""),
smtp_security=data.get("SMTP_SECURITY", "tls"),
ssh_timeout=data.get("SSH_TIMEOUT", "30"),
ssh_retries=data.get("SSH_RETRIES", "3"),
rsync_extra_opts=data.get("RSYNC_EXTRA_OPTS", ""),
)
def to_conf(self) -> dict[str, str]:
return {
"BACKUP_MODE": self.backup_mode,
"BWLIMIT": self.bwlimit,
"RETENTION_COUNT": self.retention_count,
"LOG_LEVEL": self.log_level,
"LOG_RETAIN": self.log_retain,
"NOTIFY_EMAIL": self.notify_email,
"NOTIFY_ON": self.notify_on,
"SMTP_HOST": self.smtp_host,
"SMTP_PORT": self.smtp_port,
"SMTP_USER": self.smtp_user,
"SMTP_PASSWORD": self.smtp_password,
"SMTP_FROM": self.smtp_from,
"SMTP_SECURITY": self.smtp_security,
"SSH_TIMEOUT": self.ssh_timeout,
"SSH_RETRIES": self.ssh_retries,
"RSYNC_EXTRA_OPTS": self.rsync_extra_opts,
}

14
tui/screens/__init__.py Normal file
View File

@@ -0,0 +1,14 @@
from tui.screens.main_menu import MainMenuScreen
from tui.screens.backup import BackupScreen
from tui.screens.restore import RestoreScreen
from tui.screens.targets import TargetsScreen
from tui.screens.target_edit import TargetEditScreen
from tui.screens.remotes import RemotesScreen
from tui.screens.remote_edit import RemoteEditScreen
from tui.screens.snapshots import SnapshotsScreen
from tui.screens.verify import VerifyScreen
from tui.screens.retention import RetentionScreen
from tui.screens.schedule import ScheduleScreen
from tui.screens.logs import LogsScreen
from tui.screens.settings import SettingsScreen
from tui.screens.wizard import WizardScreen

92
tui/screens/backup.py Normal file
View File

@@ -0,0 +1,92 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Select
from textual.containers import Vertical, Horizontal
from textual import work
from tui.config import list_conf_dir, has_targets, has_remotes
from tui.backend import stream_cli
from tui.widgets import ConfirmDialog, OperationLog
class BackupScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
targets = list_conf_dir("targets.d")
remotes = list_conf_dir("remotes.d")
with Vertical(id="backup-screen"):
yield Static("Backup", id="screen-title")
if not targets:
yield Static("No targets configured. Add a target first.")
else:
yield Static("Target:")
yield Select(
[(t, t) for t in targets],
id="backup-target",
prompt="Select target",
)
yield Static("Remote (optional):")
yield Select(
[("Default (all)", "")] + [(r, r) for r in remotes],
id="backup-remote",
prompt="Select remote",
value="",
)
with Horizontal(id="backup-buttons"):
yield Button("Run Backup", variant="primary", id="btn-backup")
yield Button("Backup All", variant="warning", id="btn-backup-all")
yield Button("Back", id="btn-back")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-backup":
target_sel = self.query_one("#backup-target", Select)
if target_sel.value is Select.BLANK:
self.notify("Please select a target", severity="error")
return
target = str(target_sel.value)
remote_sel = self.query_one("#backup-remote", Select)
remote = str(remote_sel.value) if remote_sel.value != Select.BLANK else ""
msg = f"Run backup for target '{target}'?"
if remote:
msg += f"\nRemote: {remote}"
self.app.push_screen(
ConfirmDialog(msg, "Confirm Backup"),
callback=lambda ok: self._do_backup(target, remote) if ok else None,
)
elif event.button.id == "btn-backup-all":
self.app.push_screen(
ConfirmDialog("Backup ALL targets now?", "Confirm Backup"),
callback=lambda ok: self._do_backup_all() if ok else None,
)
@work
async def _do_backup(self, target: str, remote: str) -> None:
log_screen = OperationLog(f"Backup: {target}")
self.app.push_screen(log_screen)
args = ["backup", f"--target={target}"]
if remote:
args.append(f"--remote={remote}")
rc = await stream_cli(log_screen.write, *args)
if rc == 0:
log_screen.write("\n[green]Backup completed successfully.[/green]")
else:
log_screen.write(f"\n[red]Backup failed (exit code {rc}).[/red]")
@work
async def _do_backup_all(self) -> None:
log_screen = OperationLog("Backup All Targets")
self.app.push_screen(log_screen)
rc = await stream_cli(log_screen.write, "backup", "--all")
if rc == 0:
log_screen.write("\n[green]All backups completed.[/green]")
else:
log_screen.write(f"\n[red]Backup failed (exit code {rc}).[/red]")
def action_go_back(self) -> None:
self.app.pop_screen()

111
tui/screens/logs.py Normal file
View File

@@ -0,0 +1,111 @@
from pathlib import Path
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable, RichLog
from textual.containers import Vertical, Horizontal
from tui.config import LOG_DIR
class LogsScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
with Vertical(id="logs-screen"):
yield Static("Logs", id="screen-title")
yield DataTable(id="logs-table")
with Horizontal(id="logs-buttons"):
yield Button("View", variant="primary", id="btn-view")
yield Button("Status", id="btn-status")
yield Button("Back", id="btn-back")
yield RichLog(id="log-viewer", wrap=True, highlight=True)
yield Footer()
def on_mount(self) -> None:
self._refresh_table()
def _refresh_table(self) -> None:
table = self.query_one("#logs-table", DataTable)
table.clear(columns=True)
table.add_columns("File", "Size")
log_dir = Path(LOG_DIR)
if not log_dir.is_dir():
return
logs = sorted(log_dir.glob("gniza-*.log"), key=lambda p: p.stat().st_mtime, reverse=True)[:20]
for f in logs:
size = f.stat().st_size
if size >= 1048576:
size_str = f"{size / 1048576:.1f} MB"
elif size >= 1024:
size_str = f"{size / 1024:.1f} KB"
else:
size_str = f"{size} B"
table.add_row(f.name, size_str, key=f.name)
def _selected_log(self) -> str | None:
table = self.query_one("#logs-table", DataTable)
if table.cursor_row is not None and table.row_count > 0:
return str(table.coordinate_to_cell_key((table.cursor_row, 0)).row_key.value)
return None
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-view":
name = self._selected_log()
if name:
self._view_log(name)
else:
self.notify("Select a log file first", severity="warning")
elif event.button.id == "btn-status":
self._show_status()
def _view_log(self, name: str) -> None:
filepath = (Path(LOG_DIR) / name).resolve()
if not filepath.is_relative_to(Path(LOG_DIR).resolve()):
self.notify("Invalid log path", severity="error")
return
viewer = self.query_one("#log-viewer", RichLog)
viewer.clear()
if filepath.is_file():
content = filepath.read_text()
viewer.write(content)
else:
viewer.write(f"File not found: {filepath}")
def _show_status(self) -> None:
viewer = self.query_one("#log-viewer", RichLog)
viewer.clear()
log_dir = Path(LOG_DIR)
viewer.write("Backup Status Overview")
viewer.write("=" * 40)
if not log_dir.is_dir():
viewer.write("Log directory does not exist.")
return
logs = sorted(log_dir.glob("gniza-*.log"), key=lambda p: p.stat().st_mtime, reverse=True)
if logs:
latest = logs[0]
from datetime import datetime
mtime = datetime.fromtimestamp(latest.stat().st_mtime)
viewer.write(f"Last log: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")
last_line = ""
with open(latest) as f:
for line in f:
last_line = line.rstrip()
if last_line:
viewer.write(f"Last entry: {last_line}")
else:
viewer.write("No backup logs found.")
viewer.write(f"Log files: {len(logs)}")
total = sum(f.stat().st_size for f in logs)
if total >= 1048576:
viewer.write(f"Total size: {total / 1048576:.1f} MB")
elif total >= 1024:
viewer.write(f"Total size: {total / 1024:.1f} KB")
else:
viewer.write(f"Total size: {total} B")
def action_go_back(self) -> None:
self.app.pop_screen()

58
tui/screens/main_menu.py Normal file
View File

@@ -0,0 +1,58 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button
from textual.containers import Vertical, Center
LOGO = """\
[green]
\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593
\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593 \u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593
\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593
\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593
[/green]
gniza - Linux Backup Manager
"""
class MainMenuScreen(Screen):
BINDINGS = [("q", "quit_app", "Quit")]
def compose(self) -> ComposeResult:
yield Header()
with Center():
with Vertical(id="main-menu"):
yield Static(LOGO, id="logo", markup=True)
yield Button("Backup", id="menu-backup", variant="primary")
yield Button("Restore", id="menu-restore")
yield Button("Targets", id="menu-targets")
yield Button("Remotes", id="menu-remotes")
yield Button("Snapshots", id="menu-snapshots")
yield Button("Verify", id="menu-verify")
yield Button("Retention", id="menu-retention")
yield Button("Schedules", id="menu-schedule")
yield Button("Logs", id="menu-logs")
yield Button("Settings", id="menu-settings")
yield Button("Quit", id="menu-quit", variant="error")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
button_map = {
"menu-backup": "backup",
"menu-restore": "restore",
"menu-targets": "targets",
"menu-remotes": "remotes",
"menu-snapshots": "snapshots",
"menu-verify": "verify",
"menu-retention": "retention",
"menu-schedule": "schedule",
"menu-logs": "logs",
"menu-settings": "settings",
}
if event.button.id == "menu-quit":
self.app.exit()
elif event.button.id in button_map:
self.app.push_screen(button_map[event.button.id])
def action_quit_app(self) -> None:
self.app.exit()

161
tui/screens/remote_edit.py Normal file
View File

@@ -0,0 +1,161 @@
import re
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Input, Select, RadioSet, RadioButton
from textual.containers import Vertical, Horizontal
from tui.config import parse_conf, write_conf, CONFIG_DIR
from tui.models import Remote
_NAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_-]{0,31}$')
REMOTE_TYPES = [("SSH", "ssh"), ("Local directory", "local"), ("Amazon S3", "s3"), ("Google Drive", "gdrive")]
class RemoteEditScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def __init__(self, name: str = ""):
super().__init__()
self._edit_name = name
self._is_new = not name
def compose(self) -> ComposeResult:
yield Header()
title = "Add Remote" if self._is_new else f"Edit Remote: {self._edit_name}"
remote = Remote()
if not self._is_new:
data = parse_conf(CONFIG_DIR / "remotes.d" / f"{self._edit_name}.conf")
remote = Remote.from_conf(self._edit_name, data)
with Vertical(id="remote-edit"):
yield Static(title, id="screen-title")
if self._is_new:
yield Static("Name:")
yield Input(value="", placeholder="Remote name", id="re-name")
yield Static("Type:")
yield Select(
REMOTE_TYPES,
id="re-type",
value=remote.type,
)
# SSH fields
yield Static("Host:", id="lbl-host", classes="ssh-field")
yield Input(value=remote.host, placeholder="hostname or IP", id="re-host", classes="ssh-field")
yield Static("Port:", id="lbl-port", classes="ssh-field")
yield Input(value=remote.port, placeholder="22", id="re-port", classes="ssh-field")
yield Static("User:", id="lbl-user", classes="ssh-field")
yield Input(value=remote.user, placeholder="root", id="re-user", classes="ssh-field")
yield Static("Auth method:", id="lbl-auth", classes="ssh-field")
yield Select(
[("SSH Key", "key"), ("Password", "password")],
id="re-auth",
value=remote.auth_method,
classes="ssh-field",
)
yield Static("SSH Key path:", id="lbl-key", classes="ssh-field")
yield Input(value=remote.key, placeholder="~/.ssh/id_rsa", id="re-key", classes="ssh-field")
yield Static("Password:", id="lbl-password", classes="ssh-field")
yield Input(value=remote.password, placeholder="SSH password", password=True, id="re-password", classes="ssh-field")
# Common fields
yield Static("Base path:")
yield Input(value=remote.base, placeholder="/backups", id="re-base")
yield Static("Bandwidth limit (KB/s, 0=unlimited):")
yield Input(value=remote.bwlimit, placeholder="0", id="re-bwlimit")
yield Static("Retention count:")
yield Input(value=remote.retention_count, placeholder="30", id="re-retention")
# S3 fields
yield Static("S3 Bucket:", id="lbl-s3bucket", classes="s3-field")
yield Input(value=remote.s3_bucket, placeholder="bucket-name", id="re-s3bucket", classes="s3-field")
yield Static("S3 Region:", id="lbl-s3region", classes="s3-field")
yield Input(value=remote.s3_region, placeholder="us-east-1", id="re-s3region", classes="s3-field")
yield Static("S3 Endpoint:", id="lbl-s3endpoint", classes="s3-field")
yield Input(value=remote.s3_endpoint, placeholder="Leave empty for AWS", id="re-s3endpoint", classes="s3-field")
yield Static("Access Key ID:", id="lbl-s3key", classes="s3-field")
yield Input(value=remote.s3_access_key_id, id="re-s3key", classes="s3-field")
yield Static("Secret Access Key:", id="lbl-s3secret", classes="s3-field")
yield Input(value=remote.s3_secret_access_key, password=True, id="re-s3secret", classes="s3-field")
# GDrive fields
yield Static("Service Account JSON:", id="lbl-gdsa", classes="gdrive-field")
yield Input(value=remote.gdrive_sa_file, placeholder="/path/to/sa.json", id="re-gdsa", classes="gdrive-field")
yield Static("Root Folder ID:", id="lbl-gdfolder", classes="gdrive-field")
yield Input(value=remote.gdrive_root_folder_id, id="re-gdfolder", classes="gdrive-field")
with Horizontal(id="re-buttons"):
yield Button("Save", variant="primary", id="btn-save")
yield Button("Cancel", id="btn-cancel")
yield Footer()
def on_mount(self) -> None:
self._update_field_visibility()
def on_select_changed(self, event: Select.Changed) -> None:
if event.select.id == "re-type":
self._update_field_visibility()
def _update_field_visibility(self) -> None:
type_sel = self.query_one("#re-type", Select)
rtype = str(type_sel.value) if type_sel.value is not Select.BLANK else "ssh"
for w in self.query(".ssh-field"):
w.display = rtype == "ssh"
for w in self.query(".s3-field"):
w.display = rtype == "s3"
for w in self.query(".gdrive-field"):
w.display = rtype == "gdrive"
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-save":
self._save()
def _save(self) -> None:
if self._is_new:
name = self.query_one("#re-name", Input).value.strip()
if not name:
self.notify("Name is required", severity="error")
return
if not _NAME_RE.match(name):
self.notify("Invalid name.", severity="error")
return
if (CONFIG_DIR / "remotes.d" / f"{name}.conf").exists():
self.notify(f"Remote '{name}' already exists.", severity="error")
return
else:
name = self._edit_name
type_sel = self.query_one("#re-type", Select)
rtype = str(type_sel.value) if type_sel.value is not Select.BLANK else "ssh"
remote = Remote(
name=name,
type=rtype,
host=self.query_one("#re-host", Input).value.strip(),
port=self.query_one("#re-port", Input).value.strip() or "22",
user=self.query_one("#re-user", Input).value.strip() or "root",
auth_method=str(self.query_one("#re-auth", Select).value) if self.query_one("#re-auth", Select).value is not Select.BLANK else "key",
key=self.query_one("#re-key", Input).value.strip(),
password=self.query_one("#re-password", Input).value,
base=self.query_one("#re-base", Input).value.strip() or "/backups",
bwlimit=self.query_one("#re-bwlimit", Input).value.strip() or "0",
retention_count=self.query_one("#re-retention", Input).value.strip() or "30",
s3_bucket=self.query_one("#re-s3bucket", Input).value.strip(),
s3_region=self.query_one("#re-s3region", Input).value.strip() or "us-east-1",
s3_endpoint=self.query_one("#re-s3endpoint", Input).value.strip(),
s3_access_key_id=self.query_one("#re-s3key", Input).value.strip(),
s3_secret_access_key=self.query_one("#re-s3secret", Input).value,
gdrive_sa_file=self.query_one("#re-gdsa", Input).value.strip(),
gdrive_root_folder_id=self.query_one("#re-gdfolder", Input).value.strip(),
)
if rtype == "ssh" and not remote.host:
self.notify("Host is required for SSH remotes", severity="error")
return
conf = CONFIG_DIR / "remotes.d" / f"{name}.conf"
write_conf(conf, remote.to_conf())
self.notify(f"Remote '{name}' saved.")
self.dismiss(name)
def action_go_back(self) -> None:
self.dismiss(None)

106
tui/screens/remotes.py Normal file
View File

@@ -0,0 +1,106 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable
from textual.containers import Vertical, Horizontal
from textual import work
from tui.config import list_conf_dir, parse_conf, CONFIG_DIR
from tui.backend import run_cli
from tui.widgets import ConfirmDialog, OperationLog
class RemotesScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
with Vertical(id="remotes-screen"):
yield Static("Remotes", id="screen-title")
yield DataTable(id="remotes-table")
with Horizontal(id="remotes-buttons"):
yield Button("Add", variant="primary", id="btn-add")
yield Button("Edit", id="btn-edit")
yield Button("Test", variant="warning", id="btn-test")
yield Button("Delete", variant="error", id="btn-delete")
yield Button("Back", id="btn-back")
yield Footer()
def on_mount(self) -> None:
self._refresh_table()
def _refresh_table(self) -> None:
table = self.query_one("#remotes-table", DataTable)
table.clear(columns=True)
table.add_columns("Name", "Type", "Host/Path")
remotes = list_conf_dir("remotes.d")
for name in remotes:
data = parse_conf(CONFIG_DIR / "remotes.d" / f"{name}.conf")
rtype = data.get("REMOTE_TYPE", "ssh")
if rtype == "ssh":
loc = f"{data.get('REMOTE_USER', 'root')}@{data.get('REMOTE_HOST', '')}:{data.get('REMOTE_BASE', '')}"
elif rtype == "local":
loc = data.get("REMOTE_BASE", "")
elif rtype == "s3":
loc = f"s3://{data.get('S3_BUCKET', '')}{data.get('REMOTE_BASE', '')}"
else:
loc = data.get("REMOTE_BASE", "")
table.add_row(name, rtype, loc, key=name)
def _selected_remote(self) -> str | None:
table = self.query_one("#remotes-table", DataTable)
if table.cursor_row is not None and table.row_count > 0:
return str(table.coordinate_to_cell_key((table.cursor_row, 0)).row_key.value)
return None
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-add":
self.app.push_screen("remote_edit", callback=lambda _: self._refresh_table())
elif event.button.id == "btn-edit":
name = self._selected_remote()
if name:
from tui.screens.remote_edit import RemoteEditScreen
self.app.push_screen(RemoteEditScreen(name), callback=lambda _: self._refresh_table())
else:
self.notify("Select a remote first", severity="warning")
elif event.button.id == "btn-test":
name = self._selected_remote()
if name:
self._test_remote(name)
else:
self.notify("Select a remote first", severity="warning")
elif event.button.id == "btn-delete":
name = self._selected_remote()
if name:
self.app.push_screen(
ConfirmDialog(f"Delete remote '{name}'? This cannot be undone.", "Delete Remote"),
callback=lambda ok: self._delete_remote(name) if ok else None,
)
else:
self.notify("Select a remote first", severity="warning")
@work
async def _test_remote(self, name: str) -> None:
log_screen = OperationLog(f"Testing Remote: {name}")
self.app.push_screen(log_screen)
rc, stdout, stderr = await run_cli("remotes", "test", f"--name={name}")
if stdout:
log_screen.write(stdout)
if stderr:
log_screen.write(stderr)
if rc == 0:
log_screen.write("\n[green]Connection test passed.[/green]")
else:
log_screen.write(f"\n[red]Connection test failed (exit code {rc}).[/red]")
def _delete_remote(self, name: str) -> None:
conf = CONFIG_DIR / "remotes.d" / f"{name}.conf"
if conf.is_file():
conf.unlink()
self.notify(f"Remote '{name}' deleted.")
self._refresh_table()
def action_go_back(self) -> None:
self.app.pop_screen()

110
tui/screens/restore.py Normal file
View File

@@ -0,0 +1,110 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Select, Input, RadioSet, RadioButton
from textual.containers import Vertical, Horizontal
from textual import work
from tui.config import list_conf_dir, parse_conf, CONFIG_DIR
from tui.backend import run_cli, stream_cli
from tui.widgets import ConfirmDialog, OperationLog
class RestoreScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
targets = list_conf_dir("targets.d")
remotes = list_conf_dir("remotes.d")
with Vertical(id="restore-screen"):
yield Static("Restore", id="screen-title")
if not targets or not remotes:
yield Static("Both targets and remotes must be configured for restore.")
else:
yield Static("Target:")
yield Select([(t, t) for t in targets], id="restore-target", prompt="Select target")
yield Static("Remote:")
yield Select([(r, r) for r in remotes], id="restore-remote", prompt="Select remote")
yield Static("Snapshot:")
yield Select([], id="restore-snapshot", prompt="Load snapshots first")
yield Button("Load Snapshots", id="btn-load-snaps")
yield Static("Restore location:")
with RadioSet(id="restore-location"):
yield RadioButton("In-place (original)", value=True)
yield RadioButton("Custom directory")
yield Input(placeholder="Destination directory (e.g. /tmp/restore)", id="restore-dest")
with Horizontal(id="restore-buttons"):
yield Button("Restore", variant="primary", id="btn-restore")
yield Button("Back", id="btn-back")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-load-snaps":
self._load_snapshots()
elif event.button.id == "btn-restore":
self._start_restore()
@work
async def _load_snapshots(self) -> None:
target_sel = self.query_one("#restore-target", Select)
remote_sel = self.query_one("#restore-remote", Select)
if target_sel.value is Select.BLANK or remote_sel.value is Select.BLANK:
self.notify("Select target and remote first", severity="error")
return
target = str(target_sel.value)
remote = str(remote_sel.value)
rc, stdout, stderr = await run_cli("snapshots", "list", f"--target={target}", f"--remote={remote}")
snap_sel = self.query_one("#restore-snapshot", Select)
lines = [l.strip() for l in stdout.splitlines() if l.strip() and not l.startswith("===")]
if lines:
snap_sel.set_options([(s, s) for s in lines])
else:
self.notify("No snapshots found", severity="warning")
def _start_restore(self) -> None:
target_sel = self.query_one("#restore-target", Select)
remote_sel = self.query_one("#restore-remote", Select)
snap_sel = self.query_one("#restore-snapshot", Select)
if target_sel.value is Select.BLANK:
self.notify("Select a target", severity="error")
return
if remote_sel.value is Select.BLANK:
self.notify("Select a remote", severity="error")
return
if snap_sel.value is Select.BLANK:
self.notify("Select a snapshot", severity="error")
return
target = str(target_sel.value)
remote = str(remote_sel.value)
snapshot = str(snap_sel.value)
radio = self.query_one("#restore-location", RadioSet)
dest_input = self.query_one("#restore-dest", Input)
dest = "" if radio.pressed_index == 0 else dest_input.value
msg = f"Restore snapshot?\n\nTarget: {target}\nRemote: {remote}\nSnapshot: {snapshot}"
if dest:
msg += f"\nDestination: {dest}"
else:
msg += "\nLocation: In-place"
self.app.push_screen(
ConfirmDialog(msg, "Confirm Restore"),
callback=lambda ok: self._do_restore(target, remote, snapshot, dest) if ok else None,
)
@work
async def _do_restore(self, target: str, remote: str, snapshot: str, dest: str) -> None:
log_screen = OperationLog(f"Restore: {target}")
self.app.push_screen(log_screen)
args = ["restore", f"--target={target}", f"--remote={remote}", f"--snapshot={snapshot}"]
if dest:
args.append(f"--dest={dest}")
rc = await stream_cli(log_screen.write, *args)
if rc == 0:
log_screen.write("\n[green]Restore completed successfully.[/green]")
else:
log_screen.write(f"\n[red]Restore failed (exit code {rc}).[/red]")
def action_go_back(self) -> None:
self.app.pop_screen()

90
tui/screens/retention.py Normal file
View File

@@ -0,0 +1,90 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Select, Input
from textual.containers import Vertical, Horizontal
from textual import work
from tui.config import list_conf_dir, parse_conf, update_conf_key, CONFIG_DIR
from tui.backend import stream_cli
from tui.widgets import ConfirmDialog, OperationLog
class RetentionScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
targets = list_conf_dir("targets.d")
conf = parse_conf(CONFIG_DIR / "gniza.conf")
current_count = conf.get("RETENTION_COUNT", "30")
with Vertical(id="retention-screen"):
yield Static("Retention Cleanup", id="screen-title")
if not targets:
yield Static("No targets configured.")
else:
yield Static("Target:")
yield Select(
[(t, t) for t in targets],
id="ret-target",
prompt="Select target",
)
with Horizontal(id="ret-buttons"):
yield Button("Run Cleanup", variant="primary", id="btn-cleanup")
yield Button("Cleanup All", variant="warning", id="btn-cleanup-all")
yield Static("")
yield Static("Default retention count:")
with Horizontal():
yield Input(value=current_count, id="ret-count", placeholder="30")
yield Button("Save", id="btn-save-count")
yield Button("Back", id="btn-back")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-cleanup":
target_sel = self.query_one("#ret-target", Select)
if target_sel.value is Select.BLANK:
self.notify("Select a target first", severity="error")
return
target = str(target_sel.value)
self.app.push_screen(
ConfirmDialog(f"Run retention cleanup for '{target}'?", "Confirm"),
callback=lambda ok: self._do_cleanup(target) if ok else None,
)
elif event.button.id == "btn-cleanup-all":
self.app.push_screen(
ConfirmDialog("Run retention cleanup for ALL targets?", "Confirm"),
callback=lambda ok: self._do_cleanup_all() if ok else None,
)
elif event.button.id == "btn-save-count":
val = self.query_one("#ret-count", Input).value.strip()
if not val.isdigit() or int(val) < 1:
self.notify("Retention count must be a positive integer.", severity="error")
return
update_conf_key(CONFIG_DIR / "gniza.conf", "RETENTION_COUNT", val)
self.notify(f"Retention count set to {val}.")
@work
async def _do_cleanup(self, target: str) -> None:
log_screen = OperationLog(f"Retention: {target}")
self.app.push_screen(log_screen)
rc = await stream_cli(log_screen.write, "retention", f"--target={target}")
if rc == 0:
log_screen.write("\n[green]Cleanup completed.[/green]")
else:
log_screen.write(f"\n[red]Cleanup failed (exit code {rc}).[/red]")
@work
async def _do_cleanup_all(self) -> None:
log_screen = OperationLog("Retention: All Targets")
self.app.push_screen(log_screen)
rc = await stream_cli(log_screen.write, "retention", "--all")
if rc == 0:
log_screen.write("\n[green]All cleanups completed.[/green]")
else:
log_screen.write(f"\n[red]Cleanup failed (exit code {rc}).[/red]")
def action_go_back(self) -> None:
self.app.pop_screen()

164
tui/screens/schedule.py Normal file
View File

@@ -0,0 +1,164 @@
import re
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable, Input, Select
from textual.containers import Vertical, Horizontal
from textual import work
from tui.config import list_conf_dir, parse_conf, write_conf, CONFIG_DIR
from tui.models import Schedule
from tui.backend import run_cli
from tui.widgets import ConfirmDialog, OperationLog
_NAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_-]{0,31}$')
SCHEDULE_TYPES = [
("Hourly", "hourly"),
("Daily", "daily"),
("Weekly", "weekly"),
("Monthly", "monthly"),
("Custom cron", "custom"),
]
class ScheduleScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
with Vertical(id="schedule-screen"):
yield Static("Schedules", id="screen-title")
yield DataTable(id="sched-table")
with Horizontal(id="sched-buttons"):
yield Button("Add", variant="primary", id="btn-add")
yield Button("Delete", variant="error", id="btn-delete")
yield Button("Install to crontab", id="btn-install")
yield Button("Remove from crontab", id="btn-remove")
yield Button("Show crontab", id="btn-show")
yield Button("Back", id="btn-back")
yield Static("", id="sched-divider")
yield Static("Add Schedule", id="sched-add-title")
yield Static("Name:")
yield Input(id="sched-name", placeholder="Schedule name")
yield Static("Type:")
yield Select(SCHEDULE_TYPES, id="sched-type", value="daily")
yield Static("Time (HH:MM):")
yield Input(id="sched-time", value="02:00", placeholder="02:00")
yield Static("Day (0=Sun for weekly, 1-28 for monthly):")
yield Input(id="sched-day", placeholder="Leave empty if not needed")
yield Static("Custom cron (5 fields):")
yield Input(id="sched-cron", placeholder="0 2 * * *")
yield Static("Targets (comma-separated, empty=all):")
yield Input(id="sched-targets", placeholder="")
yield Static("Remotes (comma-separated, empty=all):")
yield Input(id="sched-remotes", placeholder="")
yield Footer()
def on_mount(self) -> None:
self._refresh_table()
def _refresh_table(self) -> None:
table = self.query_one("#sched-table", DataTable)
table.clear(columns=True)
table.add_columns("Name", "Type", "Time", "Targets", "Remotes")
schedules = list_conf_dir("schedules.d")
for name in schedules:
data = parse_conf(CONFIG_DIR / "schedules.d" / f"{name}.conf")
s = Schedule.from_conf(name, data)
table.add_row(name, s.schedule, s.time, s.targets or "all", s.remotes or "all", key=name)
def _selected_schedule(self) -> str | None:
table = self.query_one("#sched-table", DataTable)
if table.cursor_row is not None and table.row_count > 0:
return str(table.coordinate_to_cell_key((table.cursor_row, 0)).row_key.value)
return None
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-add":
self._add_schedule()
elif event.button.id == "btn-delete":
name = self._selected_schedule()
if name:
self.app.push_screen(
ConfirmDialog(f"Delete schedule '{name}'?", "Delete Schedule"),
callback=lambda ok: self._delete_schedule(name) if ok else None,
)
else:
self.notify("Select a schedule first", severity="warning")
elif event.button.id == "btn-install":
self._install_schedules()
elif event.button.id == "btn-remove":
self._remove_schedules()
elif event.button.id == "btn-show":
self._show_crontab()
def _add_schedule(self) -> None:
name = self.query_one("#sched-name", Input).value.strip()
if not name:
self.notify("Name is required", severity="error")
return
if not _NAME_RE.match(name):
self.notify("Invalid name.", severity="error")
return
conf = CONFIG_DIR / "schedules.d" / f"{name}.conf"
if conf.exists():
self.notify(f"Schedule '{name}' already exists.", severity="error")
return
type_sel = self.query_one("#sched-type", Select)
stype = str(type_sel.value) if type_sel.value is not Select.BLANK else "daily"
sched = Schedule(
name=name,
schedule=stype,
time=self.query_one("#sched-time", Input).value.strip() or "02:00",
day=self.query_one("#sched-day", Input).value.strip(),
cron=self.query_one("#sched-cron", Input).value.strip(),
targets=self.query_one("#sched-targets", Input).value.strip(),
remotes=self.query_one("#sched-remotes", Input).value.strip(),
)
write_conf(conf, sched.to_conf())
self.notify(f"Schedule '{name}' created.")
self._refresh_table()
self.query_one("#sched-name", Input).value = ""
def _delete_schedule(self, name: str) -> None:
conf = CONFIG_DIR / "schedules.d" / f"{name}.conf"
if conf.is_file():
conf.unlink()
self.notify(f"Schedule '{name}' deleted.")
self._refresh_table()
@work
async def _install_schedules(self) -> None:
log_screen = OperationLog("Install Schedules")
self.app.push_screen(log_screen)
rc, stdout, stderr = await run_cli("schedule", "install")
if stdout:
log_screen.write(stdout)
if stderr:
log_screen.write(stderr)
@work
async def _remove_schedules(self) -> None:
log_screen = OperationLog("Remove Schedules")
self.app.push_screen(log_screen)
rc, stdout, stderr = await run_cli("schedule", "remove")
if stdout:
log_screen.write(stdout)
if stderr:
log_screen.write(stderr)
@work
async def _show_crontab(self) -> None:
log_screen = OperationLog("Current Crontab")
self.app.push_screen(log_screen)
rc, stdout, stderr = await run_cli("schedule", "show")
if stdout:
log_screen.write(stdout)
if stderr:
log_screen.write(stderr)
def action_go_back(self) -> None:
self.app.pop_screen()

100
tui/screens/settings.py Normal file
View File

@@ -0,0 +1,100 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Input, Select
from textual.containers import Vertical, Horizontal
from tui.config import parse_conf, write_conf, CONFIG_DIR
from tui.models import AppSettings
class SettingsScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
conf = parse_conf(CONFIG_DIR / "gniza.conf")
settings = AppSettings.from_conf(conf)
with Vertical(id="settings-screen"):
yield Static("Settings", id="screen-title")
yield Static("Log Level:")
yield Select(
[("Debug", "debug"), ("Info", "info"), ("Warning", "warn"), ("Error", "error")],
id="set-loglevel",
value=settings.log_level.lower(),
)
yield Static("Log Retention (days):")
yield Input(value=settings.log_retain, id="set-logretain")
yield Static("Default Retention Count:")
yield Input(value=settings.retention_count, id="set-retention")
yield Static("Default Bandwidth Limit (KB/s, 0=unlimited):")
yield Input(value=settings.bwlimit, id="set-bwlimit")
yield Static("Notification Email:")
yield Input(value=settings.notify_email, id="set-email")
yield Static("Notify On:")
yield Select(
[("Always", "always"), ("Failure only", "failure"), ("Never", "never")],
id="set-notifyon",
value=settings.notify_on,
)
yield Static("SMTP Host:")
yield Input(value=settings.smtp_host, id="set-smtphost")
yield Static("SMTP Port:")
yield Input(value=settings.smtp_port, id="set-smtpport")
yield Static("SMTP User:")
yield Input(value=settings.smtp_user, id="set-smtpuser")
yield Static("SMTP Password:")
yield Input(value=settings.smtp_password, password=True, id="set-smtppass")
yield Static("SMTP From:")
yield Input(value=settings.smtp_from, id="set-smtpfrom")
yield Static("SMTP Security:")
yield Select(
[("TLS", "tls"), ("SSL", "ssl"), ("None", "none")],
id="set-smtpsec",
value=settings.smtp_security,
)
yield Static("SSH Timeout:")
yield Input(value=settings.ssh_timeout, id="set-sshtimeout")
yield Static("SSH Retries:")
yield Input(value=settings.ssh_retries, id="set-sshretries")
yield Static("Extra rsync options:")
yield Input(value=settings.rsync_extra_opts, id="set-rsyncopts")
with Horizontal(id="set-buttons"):
yield Button("Save", variant="primary", id="btn-save")
yield Button("Back", id="btn-back")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-save":
self._save()
def _get_select_val(self, sel_id: str, default: str) -> str:
sel = self.query_one(sel_id, Select)
return str(sel.value) if sel.value is not Select.BLANK else default
def _save(self) -> None:
settings = AppSettings(
log_level=self._get_select_val("#set-loglevel", "info"),
log_retain=self.query_one("#set-logretain", Input).value.strip() or "30",
retention_count=self.query_one("#set-retention", Input).value.strip() or "7",
bwlimit=self.query_one("#set-bwlimit", Input).value.strip() or "0",
notify_email=self.query_one("#set-email", Input).value.strip(),
notify_on=self._get_select_val("#set-notifyon", "failure"),
smtp_host=self.query_one("#set-smtphost", Input).value.strip(),
smtp_port=self.query_one("#set-smtpport", Input).value.strip() or "587",
smtp_user=self.query_one("#set-smtpuser", Input).value.strip(),
smtp_password=self.query_one("#set-smtppass", Input).value,
smtp_from=self.query_one("#set-smtpfrom", Input).value.strip(),
smtp_security=self._get_select_val("#set-smtpsec", "tls"),
ssh_timeout=self.query_one("#set-sshtimeout", Input).value.strip() or "30",
ssh_retries=self.query_one("#set-sshretries", Input).value.strip() or "3",
rsync_extra_opts=self.query_one("#set-rsyncopts", Input).value.strip(),
)
conf_path = CONFIG_DIR / "gniza.conf"
write_conf(conf_path, settings.to_conf())
self.notify("Settings saved.")
def action_go_back(self) -> None:
self.app.pop_screen()

68
tui/screens/snapshots.py Normal file
View File

@@ -0,0 +1,68 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Select, DataTable
from textual.containers import Vertical, Horizontal
from textual import work
from tui.config import list_conf_dir
from tui.backend import run_cli
class SnapshotsScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
targets = list_conf_dir("targets.d")
remotes = list_conf_dir("remotes.d")
with Vertical(id="snapshots-screen"):
yield Static("Snapshots", id="screen-title")
if not targets or not remotes:
yield Static("Targets and remotes must be configured to browse snapshots.")
else:
yield Static("Target:")
yield Select([(t, t) for t in targets], id="snap-target", prompt="Select target")
yield Static("Remote:")
yield Select([(r, r) for r in remotes], id="snap-remote", prompt="Select remote")
yield Button("Load Snapshots", id="btn-load", variant="primary")
yield DataTable(id="snap-table")
yield Button("Back", id="btn-back")
yield Footer()
def on_mount(self) -> None:
try:
table = self.query_one("#snap-table", DataTable)
table.add_columns("Snapshot")
except Exception:
pass
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-load":
self._load_snapshots()
@work
async def _load_snapshots(self) -> None:
target_sel = self.query_one("#snap-target", Select)
remote_sel = self.query_one("#snap-remote", Select)
if target_sel.value is Select.BLANK or remote_sel.value is Select.BLANK:
self.notify("Select target and remote first", severity="error")
return
target = str(target_sel.value)
remote = str(remote_sel.value)
rc, stdout, stderr = await run_cli("snapshots", "list", f"--target={target}", f"--remote={remote}")
table = self.query_one("#snap-table", DataTable)
table.clear()
lines = [l.strip() for l in stdout.splitlines() if l.strip() and not l.startswith("===")]
if lines:
for s in lines:
table.add_row(s)
else:
self.notify("No snapshots found", severity="warning")
if stderr:
self.notify(stderr.strip(), severity="error")
def action_go_back(self) -> None:
self.app.pop_screen()

116
tui/screens/target_edit.py Normal file
View File

@@ -0,0 +1,116 @@
import re
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Input, Switch
from textual.containers import Vertical, Horizontal
from tui.config import parse_conf, write_conf, CONFIG_DIR, list_conf_dir
from tui.models import Target
from tui.widgets import FolderPicker
_NAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_-]{0,31}$')
class TargetEditScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def __init__(self, name: str = ""):
super().__init__()
self._edit_name = name
self._is_new = not name
def compose(self) -> ComposeResult:
yield Header()
title = "Add Target" if self._is_new else f"Edit Target: {self._edit_name}"
target = Target()
if not self._is_new:
data = parse_conf(CONFIG_DIR / "targets.d" / f"{self._edit_name}.conf")
target = Target.from_conf(self._edit_name, data)
with Vertical(id="target-edit"):
yield Static(title, id="screen-title")
if self._is_new:
yield Static("Name:")
yield Input(value="", placeholder="Target name", id="te-name")
yield Static("Folders (comma-separated):")
yield Input(value=target.folders, placeholder="/path1,/path2", id="te-folders")
yield Button("Browse...", id="btn-browse")
yield Static("Exclude patterns:")
yield Input(value=target.exclude, placeholder="*.tmp,*.log", id="te-exclude")
yield Static("Remote override:")
yield Input(value=target.remote, placeholder="Leave empty for default", id="te-remote")
yield Static("Retention override:")
yield Input(value=target.retention, placeholder="Leave empty for default", id="te-retention")
yield Static("Pre-backup hook:")
yield Input(value=target.pre_hook, placeholder="Command to run before backup", id="te-prehook")
yield Static("Post-backup hook:")
yield Input(value=target.post_hook, placeholder="Command to run after backup", id="te-posthook")
with Horizontal():
yield Static("Enabled: ")
yield Switch(value=target.enabled == "yes", id="te-enabled")
with Horizontal(id="te-buttons"):
yield Button("Save", variant="primary", id="btn-save")
yield Button("Cancel", id="btn-cancel")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-browse":
self.app.push_screen(
FolderPicker("Select folder to back up"),
callback=self._folder_selected,
)
elif event.button.id == "btn-save":
self._save()
def _folder_selected(self, path: str | None) -> None:
if path:
folders_input = self.query_one("#te-folders", Input)
current = folders_input.value.strip()
if current:
existing = [f.strip() for f in current.split(",")]
if path not in existing:
folders_input.value = current + "," + path
else:
folders_input.value = path
def _save(self) -> None:
if self._is_new:
name = self.query_one("#te-name", Input).value.strip()
if not name:
self.notify("Name is required", severity="error")
return
if not _NAME_RE.match(name):
self.notify("Invalid name. Use letters, digits, _ - (max 32 chars, start with letter).", severity="error")
return
conf = CONFIG_DIR / "targets.d" / f"{name}.conf"
if conf.exists():
self.notify(f"Target '{name}' already exists.", severity="error")
return
else:
name = self._edit_name
folders = self.query_one("#te-folders", Input).value.strip()
if not folders:
self.notify("At least one folder is required", severity="error")
return
target = Target(
name=name,
folders=folders,
exclude=self.query_one("#te-exclude", Input).value.strip(),
remote=self.query_one("#te-remote", Input).value.strip(),
retention=self.query_one("#te-retention", Input).value.strip(),
pre_hook=self.query_one("#te-prehook", Input).value.strip(),
post_hook=self.query_one("#te-posthook", Input).value.strip(),
enabled="yes" if self.query_one("#te-enabled", Switch).value else "no",
)
conf = CONFIG_DIR / "targets.d" / f"{name}.conf"
write_conf(conf, target.to_conf())
self.notify(f"Target '{name}' saved.")
self.dismiss(name)
def action_go_back(self) -> None:
self.dismiss(None)

80
tui/screens/targets.py Normal file
View File

@@ -0,0 +1,80 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable
from textual.containers import Vertical, Horizontal
from tui.config import list_conf_dir, parse_conf, CONFIG_DIR
from tui.widgets import ConfirmDialog
class TargetsScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
with Vertical(id="targets-screen"):
yield Static("Targets", id="screen-title")
yield DataTable(id="targets-table")
with Horizontal(id="targets-buttons"):
yield Button("Add", variant="primary", id="btn-add")
yield Button("Edit", id="btn-edit")
yield Button("Delete", variant="error", id="btn-delete")
yield Button("Back", id="btn-back")
yield Footer()
def on_mount(self) -> None:
self._refresh_table()
def _refresh_table(self) -> None:
table = self.query_one("#targets-table", DataTable)
table.clear(columns=True)
table.add_columns("Name", "Folders", "Enabled")
targets = list_conf_dir("targets.d")
for name in targets:
data = parse_conf(CONFIG_DIR / "targets.d" / f"{name}.conf")
table.add_row(
name,
data.get("TARGET_FOLDERS", ""),
data.get("TARGET_ENABLED", "yes"),
key=name,
)
def _selected_target(self) -> str | None:
table = self.query_one("#targets-table", DataTable)
if table.cursor_row is not None and table.row_count > 0:
row_key = table.get_row_at(table.cursor_row)
return str(table.coordinate_to_cell_key((table.cursor_row, 0)).row_key.value)
return None
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-add":
self.app.push_screen("target_edit", callback=lambda _: self._refresh_table())
elif event.button.id == "btn-edit":
name = self._selected_target()
if name:
from tui.screens.target_edit import TargetEditScreen
self.app.push_screen(TargetEditScreen(name), callback=lambda _: self._refresh_table())
else:
self.notify("Select a target first", severity="warning")
elif event.button.id == "btn-delete":
name = self._selected_target()
if name:
self.app.push_screen(
ConfirmDialog(f"Delete target '{name}'? This cannot be undone.", "Delete Target"),
callback=lambda ok: self._delete_target(name) if ok else None,
)
else:
self.notify("Select a target first", severity="warning")
def _delete_target(self, name: str) -> None:
conf = CONFIG_DIR / "targets.d" / f"{name}.conf"
if conf.is_file():
conf.unlink()
self.notify(f"Target '{name}' deleted.")
self._refresh_table()
def action_go_back(self) -> None:
self.app.pop_screen()

69
tui/screens/verify.py Normal file
View File

@@ -0,0 +1,69 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Select
from textual.containers import Vertical, Horizontal
from textual import work
from tui.config import list_conf_dir
from tui.backend import stream_cli
from tui.widgets import ConfirmDialog, OperationLog
class VerifyScreen(Screen):
BINDINGS = [("escape", "go_back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
targets = list_conf_dir("targets.d")
with Vertical(id="verify-screen"):
yield Static("Verify Backups", id="screen-title")
if not targets:
yield Static("No targets configured.")
else:
yield Static("Target:")
yield Select(
[(t, t) for t in targets],
id="verify-target",
prompt="Select target",
)
with Horizontal(id="verify-buttons"):
yield Button("Verify Selected", variant="primary", id="btn-verify")
yield Button("Verify All", variant="warning", id="btn-verify-all")
yield Button("Back", id="btn-back")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-back":
self.app.pop_screen()
elif event.button.id == "btn-verify":
target_sel = self.query_one("#verify-target", Select)
if target_sel.value is Select.BLANK:
self.notify("Select a target first", severity="error")
return
self._do_verify(str(target_sel.value))
elif event.button.id == "btn-verify-all":
self._do_verify_all()
@work
async def _do_verify(self, target: str) -> None:
log_screen = OperationLog(f"Verify: {target}")
self.app.push_screen(log_screen)
rc = await stream_cli(log_screen.write, "verify", f"--target={target}")
if rc == 0:
log_screen.write("\n[green]Verification completed successfully.[/green]")
else:
log_screen.write(f"\n[red]Verification failed (exit code {rc}).[/red]")
@work
async def _do_verify_all(self) -> None:
log_screen = OperationLog("Verify All Targets")
self.app.push_screen(log_screen)
rc = await stream_cli(log_screen.write, "verify", "--all")
if rc == 0:
log_screen.write("\n[green]All verifications completed.[/green]")
else:
log_screen.write(f"\n[red]Verification failed (exit code {rc}).[/red]")
def action_go_back(self) -> None:
self.app.pop_screen()

46
tui/screens/wizard.py Normal file
View File

@@ -0,0 +1,46 @@
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button
from textual.containers import Vertical, Center
from tui.config import has_remotes, has_targets
class WizardScreen(Screen):
def compose(self) -> ComposeResult:
yield Header()
with Center():
with Vertical(id="wizard"):
yield Static(
"[bold]Welcome to gniza Backup Manager![/bold]\n\n"
"This wizard will help you set up your first backup:\n\n"
" 1. Configure a backup destination (remote)\n"
" 2. Define what to back up (target)\n"
" 3. Optionally run your first backup\n",
id="wizard-welcome",
markup=True,
)
if not has_remotes():
yield Button("Step 1: Add Remote", variant="primary", id="wiz-remote")
else:
yield Static("[green]Remote configured.[/green]", markup=True)
if not has_targets():
yield Button("Step 2: Add Target", variant="primary", id="wiz-target")
else:
yield Static("[green]Target configured.[/green]", markup=True)
yield Button("Continue to Main Menu", id="wiz-continue")
yield Button("Skip Setup", id="wiz-skip")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "wiz-remote":
self.app.push_screen("remote_edit", callback=self._check_progress)
elif event.button.id == "wiz-target":
self.app.push_screen("target_edit", callback=self._check_progress)
elif event.button.id in ("wiz-continue", "wiz-skip"):
self.app.switch_screen("main")
def _check_progress(self, result) -> None:
if has_remotes() and has_targets():
self.app.switch_screen("main")

3
tui/widgets/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from tui.widgets.folder_picker import FolderPicker
from tui.widgets.confirm_dialog import ConfirmDialog
from tui.widgets.operation_log import OperationLog

View File

@@ -0,0 +1,28 @@
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.widgets import Static, Button
from textual.containers import Horizontal, Vertical
class ConfirmDialog(ModalScreen[bool]):
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(self, message: str, title: str = "Confirm"):
super().__init__()
self._message = message
self._title = title
def compose(self) -> ComposeResult:
with Vertical(id="confirm-dialog"):
yield Static(self._title, id="cd-title")
yield Static(self._message, id="cd-message")
with Horizontal(id="cd-buttons"):
yield Button("Yes", variant="primary", id="cd-yes")
yield Button("No", variant="default", id="cd-no")
def on_button_pressed(self, event: Button.Pressed) -> None:
self.dismiss(event.button.id == "cd-yes")
def action_cancel(self) -> None:
self.dismiss(False)

View File

@@ -0,0 +1,42 @@
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.widgets import DirectoryTree, Header, Footer, Static, Button
from textual.containers import Horizontal, Vertical
from pathlib import Path
class _DirOnly(DirectoryTree):
def filter_paths(self, paths):
return [p for p in paths if p.is_dir()]
class FolderPicker(ModalScreen[str | None]):
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(self, title: str = "Select folder", start: str = "/"):
super().__init__()
self._title = title
self._start = start
def compose(self) -> ComposeResult:
with Vertical(id="folder-picker"):
yield Static(self._title, id="fp-title")
yield _DirOnly(self._start, id="fp-tree")
with Horizontal(id="fp-buttons"):
yield Button("Select", variant="primary", id="fp-select")
yield Button("Cancel", variant="default", id="fp-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "fp-select":
tree = self.query_one("#fp-tree", DirectoryTree)
node = tree.cursor_node
if node and node.data and node.data.path:
self.dismiss(str(node.data.path))
else:
self.dismiss(None)
else:
self.dismiss(None)
def action_cancel(self) -> None:
self.dismiss(None)

View File

@@ -0,0 +1,28 @@
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.widgets import RichLog, Button, Static
from textual.containers import Vertical
class OperationLog(ModalScreen[None]):
BINDINGS = [("escape", "close", "Close")]
def __init__(self, title: str = "Operation Output"):
super().__init__()
self._title = title
def compose(self) -> ComposeResult:
with Vertical(id="op-log"):
yield Static(self._title, id="ol-title")
yield RichLog(id="ol-log", wrap=True, highlight=True)
yield Button("Close", variant="primary", id="ol-close")
def on_button_pressed(self, event: Button.Pressed) -> None:
self.dismiss(None)
def action_close(self) -> None:
self.dismiss(None)
def write(self, text: str) -> None:
self.query_one("#ol-log", RichLog).write(text)