From 587149f062c56305697ed788cb65c5aef6d66708 Mon Sep 17 00:00:00 2001 From: shuki Date: Thu, 5 Mar 2026 23:39:48 +0200 Subject: [PATCH] Add Python Textual TUI replacing gum-based bash TUI New tui/ package with 14 screens (main menu, backup, restore, targets, remotes, snapshots, verify, retention, schedule, logs, settings, wizard), 3 custom widgets (folder picker, confirm dialog, operation log), async backend wrapper, pure-Python config parser, and TCSS theme. bin/gniza now launches Textual TUI when available, falls back to gum. Co-Authored-By: Claude Opus 4.6 --- .gitignore | 2 + bin/gniza | 5 +- tui/__init__.py | 0 tui/__main__.py | 10 ++ tui/app.py | 46 ++++++++ tui/backend.py | 40 +++++++ tui/config.py | 92 +++++++++++++++ tui/gniza.tcss | 197 +++++++++++++++++++++++++++++++ tui/models.py | 215 ++++++++++++++++++++++++++++++++++ tui/screens/__init__.py | 14 +++ tui/screens/backup.py | 92 +++++++++++++++ tui/screens/logs.py | 111 ++++++++++++++++++ tui/screens/main_menu.py | 58 +++++++++ tui/screens/remote_edit.py | 161 +++++++++++++++++++++++++ tui/screens/remotes.py | 106 +++++++++++++++++ tui/screens/restore.py | 110 +++++++++++++++++ tui/screens/retention.py | 90 ++++++++++++++ tui/screens/schedule.py | 164 ++++++++++++++++++++++++++ tui/screens/settings.py | 100 ++++++++++++++++ tui/screens/snapshots.py | 68 +++++++++++ tui/screens/target_edit.py | 116 ++++++++++++++++++ tui/screens/targets.py | 80 +++++++++++++ tui/screens/verify.py | 69 +++++++++++ tui/screens/wizard.py | 46 ++++++++ tui/widgets/__init__.py | 3 + tui/widgets/confirm_dialog.py | 28 +++++ tui/widgets/folder_picker.py | 42 +++++++ tui/widgets/operation_log.py | 28 +++++ 28 files changed, 2092 insertions(+), 1 deletion(-) create mode 100644 tui/__init__.py create mode 100644 tui/__main__.py create mode 100644 tui/app.py create mode 100644 tui/backend.py create mode 100644 tui/config.py create mode 100644 tui/gniza.tcss create mode 100644 tui/models.py create mode 100644 tui/screens/__init__.py create mode 100644 tui/screens/backup.py create mode 100644 tui/screens/logs.py create mode 100644 tui/screens/main_menu.py create mode 100644 tui/screens/remote_edit.py create mode 100644 tui/screens/remotes.py create mode 100644 tui/screens/restore.py create mode 100644 tui/screens/retention.py create mode 100644 tui/screens/schedule.py create mode 100644 tui/screens/settings.py create mode 100644 tui/screens/snapshots.py create mode 100644 tui/screens/target_edit.py create mode 100644 tui/screens/targets.py create mode 100644 tui/screens/verify.py create mode 100644 tui/screens/wizard.py create mode 100644 tui/widgets/__init__.py create mode 100644 tui/widgets/confirm_dialog.py create mode 100644 tui/widgets/folder_picker.py create mode 100644 tui/widgets/operation_log.py diff --git a/.gitignore b/.gitignore index 44fa520..fd40d90 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ *.swo *~ .DS_Store +__pycache__/ +*.pyc diff --git a/bin/gniza b/bin/gniza index 853a75b..9c905ca 100755 --- a/bin/gniza +++ b/bin/gniza @@ -436,8 +436,11 @@ if [[ -n "$SUBCOMMAND" ]]; then run_cli elif [[ "$FORCE_CLI" == "true" ]]; then run_cli +elif python3 -c "import textual" 2>/dev/null && [[ -t 1 ]]; then + # Python Textual TUI mode + PYTHONPATH="$GNIZA_DIR:${PYTHONPATH:-}" exec python3 -m tui "$@" elif command -v gum &>/dev/null && [[ -t 1 ]]; then - # TUI mode + # Legacy gum TUI mode show_logo if ! has_remotes || ! has_targets; then ui_first_run_wizard diff --git a/tui/__init__.py b/tui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tui/__main__.py b/tui/__main__.py new file mode 100644 index 0000000..25c13d5 --- /dev/null +++ b/tui/__main__.py @@ -0,0 +1,10 @@ +from tui.app import GnizaApp + + +def main(): + app = GnizaApp() + app.run() + + +if __name__ == "__main__": + main() diff --git a/tui/app.py b/tui/app.py new file mode 100644 index 0000000..8a05b72 --- /dev/null +++ b/tui/app.py @@ -0,0 +1,46 @@ +from textual.app import App + +from tui.config import has_remotes, has_targets +from tui.screens.main_menu import MainMenuScreen +from tui.screens.backup import BackupScreen +from tui.screens.restore import RestoreScreen +from tui.screens.targets import TargetsScreen +from tui.screens.target_edit import TargetEditScreen +from tui.screens.remotes import RemotesScreen +from tui.screens.remote_edit import RemoteEditScreen +from tui.screens.snapshots import SnapshotsScreen +from tui.screens.verify import VerifyScreen +from tui.screens.retention import RetentionScreen +from tui.screens.schedule import ScheduleScreen +from tui.screens.logs import LogsScreen +from tui.screens.settings import SettingsScreen +from tui.screens.wizard import WizardScreen + + +class GnizaApp(App): + + TITLE = "gniza - Linux Backup Manager" + CSS_PATH = "gniza.tcss" + + SCREENS = { + "main": MainMenuScreen, + "backup": BackupScreen, + "restore": RestoreScreen, + "targets": TargetsScreen, + "target_edit": TargetEditScreen, + "remotes": RemotesScreen, + "remote_edit": RemoteEditScreen, + "snapshots": SnapshotsScreen, + "verify": VerifyScreen, + "retention": RetentionScreen, + "schedule": ScheduleScreen, + "logs": LogsScreen, + "settings": SettingsScreen, + "wizard": WizardScreen, + } + + def on_mount(self) -> None: + if not has_remotes() or not has_targets(): + self.push_screen("wizard") + else: + self.push_screen("main") diff --git a/tui/backend.py b/tui/backend.py new file mode 100644 index 0000000..16d9999 --- /dev/null +++ b/tui/backend.py @@ -0,0 +1,40 @@ +import asyncio +import os +from pathlib import Path + + +def _gniza_bin() -> str: + gniza_dir = os.environ.get("GNIZA_DIR", "") + if gniza_dir: + return str(Path(gniza_dir) / "bin" / "gniza") + here = Path(__file__).resolve().parent.parent / "bin" / "gniza" + if here.is_file(): + return str(here) + return "gniza" + + +async def run_cli(*args: str) -> tuple[int, str, str]: + cmd = [_gniza_bin(), "--cli"] + list(args) + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + return proc.returncode or 0, stdout.decode(), stderr.decode() + + +async def stream_cli(callback, *args: str) -> int: + cmd = [_gniza_bin(), "--cli"] + list(args) + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + while True: + line = await proc.stdout.readline() + if not line: + break + callback(line.decode().rstrip("\n")) + await proc.wait() + return proc.returncode or 0 diff --git a/tui/config.py b/tui/config.py new file mode 100644 index 0000000..0356504 --- /dev/null +++ b/tui/config.py @@ -0,0 +1,92 @@ +import os +import re +from pathlib import Path + +_KV_RE = re.compile(r'^([A-Z_][A-Z_0-9]*)=(.*)') +_QUOTED_RE = re.compile(r'^"(.*)"$|^\'(.*)\'$') + + +def _get_config_dir() -> Path: + if os.geteuid() == 0: + return Path("/etc/gniza") + xdg = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")) + return Path(xdg) / "gniza" + + +def _get_log_dir() -> Path: + if os.geteuid() == 0: + return Path("/var/log/gniza") + xdg = os.environ.get("XDG_STATE_HOME", os.path.expanduser("~/.local/state")) + return Path(xdg) / "gniza" / "log" + + +CONFIG_DIR = _get_config_dir() +LOG_DIR = _get_log_dir() + + +def parse_conf(filepath: Path) -> dict[str, str]: + data = {} + if not filepath.is_file(): + return data + for line in filepath.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + m = _KV_RE.match(line) + if m: + key = m.group(1) + value = m.group(2) + qm = _QUOTED_RE.match(value) + if qm: + value = qm.group(1) if qm.group(1) is not None else qm.group(2) + data[key] = value + return data + + +def _sanitize_value(value: str) -> str: + return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "") + + +def write_conf(filepath: Path, data: dict[str, str]) -> None: + filepath.parent.mkdir(parents=True, exist_ok=True) + lines = [] + for key, value in data.items(): + lines.append(f'{key}="{_sanitize_value(value)}"') + filepath.write_text("\n".join(lines) + "\n") + filepath.chmod(0o600) + + +def update_conf_key(filepath: Path, key: str, value: str) -> None: + value = _sanitize_value(value) + filepath.parent.mkdir(parents=True, exist_ok=True) + if not filepath.is_file(): + filepath.write_text(f'{key}="{value}"\n') + filepath.chmod(0o600) + return + lines = filepath.read_text().splitlines() + found = False + for i, line in enumerate(lines): + m = _KV_RE.match(line.strip()) + if m and m.group(1) == key: + lines[i] = f'{key}="{value}"' + found = True + break + if not found: + lines.append(f'{key}="{value}"') + filepath.write_text("\n".join(lines) + "\n") + filepath.chmod(0o600) + + +def list_conf_dir(subdir: str) -> list[str]: + d = CONFIG_DIR / subdir + if not d.is_dir(): + return [] + return sorted(p.stem for p in d.glob("*.conf") if p.is_file()) + + +def has_targets() -> bool: + return len(list_conf_dir("targets.d")) > 0 + + +def has_remotes() -> bool: + return len(list_conf_dir("remotes.d")) > 0 diff --git a/tui/gniza.tcss b/tui/gniza.tcss new file mode 100644 index 0000000..92a7409 --- /dev/null +++ b/tui/gniza.tcss @@ -0,0 +1,197 @@ +/* gniza TUI theme */ + +Screen { + background: $surface; +} + +#screen-title { + text-style: bold; + color: #00cc00; + padding: 1 0; + text-align: center; +} + +/* Main menu */ +#main-menu { + width: 50; + padding: 1 2; +} + +#main-menu Button { + width: 100%; + margin: 0 0 1 0; +} + +#logo { + text-align: center; + padding: 0 0 1 0; +} + +/* Data tables */ +DataTable { + height: 12; + margin: 1 0; +} + +/* Form screens */ +#target-edit, +#remote-edit, +#settings-screen, +#schedule-screen, +#backup-screen, +#restore-screen, +#verify-screen, +#retention-screen, +#snapshots-screen, +#targets-screen, +#remotes-screen, +#logs-screen { + padding: 1 2; + overflow-y: auto; +} + +Input { + margin: 0 0 1 0; +} + +Select { + margin: 0 0 1 0; +} + +/* Button rows */ +#backup-buttons, +#restore-buttons, +#verify-buttons, +#ret-buttons, +#targets-buttons, +#remotes-buttons, +#logs-buttons, +#sched-buttons, +#te-buttons, +#re-buttons, +#set-buttons { + height: auto; + margin: 1 0; +} + +#backup-buttons Button, +#restore-buttons Button, +#verify-buttons Button, +#ret-buttons Button, +#targets-buttons Button, +#remotes-buttons Button, +#logs-buttons Button, +#sched-buttons Button, +#te-buttons Button, +#re-buttons Button, +#set-buttons Button { + margin: 0 1 0 0; +} + +/* Dialogs */ +#confirm-dialog { + width: 60; + height: auto; + padding: 2; + background: $panel; + border: thick $accent; +} + +#cd-title { + text-style: bold; + color: #00cc00; + margin: 0 0 1 0; +} + +#cd-message { + margin: 0 0 1 0; +} + +#cd-buttons { + height: auto; + align: center middle; +} + +#cd-buttons Button { + margin: 0 1 0 0; +} + +/* Folder picker */ +#folder-picker { + width: 70; + height: 30; + padding: 1; + background: $panel; + border: thick $accent; +} + +#fp-title { + text-style: bold; + color: #00cc00; + margin: 0 0 1 0; +} + +#fp-tree { + height: 1fr; +} + +#fp-buttons { + height: auto; + margin: 1 0 0 0; +} + +#fp-buttons Button { + margin: 0 1 0 0; +} + +/* Operation log */ +#op-log { + width: 80%; + height: 80%; + padding: 1; + background: $panel; + border: thick $accent; +} + +#ol-title { + text-style: bold; + color: #00cc00; + margin: 0 0 1 0; +} + +#ol-log { + height: 1fr; + border: round $accent; +} + +#ol-close { + margin: 1 0 0 0; +} + +/* Log viewer */ +#log-viewer { + height: 1fr; + min-height: 8; + border: round $accent; + margin: 1 0; +} + +/* Wizard */ +#wizard { + width: 60; + padding: 2; +} + +#wizard Button { + width: 100%; + margin: 0 0 1 0; +} + +#wizard-welcome { + margin: 0 0 1 0; +} + +/* Switch row */ +Switch { + margin: 0 0 1 1; +} diff --git a/tui/models.py b/tui/models.py new file mode 100644 index 0000000..8d37baf --- /dev/null +++ b/tui/models.py @@ -0,0 +1,215 @@ +from dataclasses import dataclass, field + + +@dataclass +class Target: + name: str = "" + folders: str = "" + exclude: str = "" + remote: str = "" + retention: str = "" + pre_hook: str = "" + post_hook: str = "" + enabled: str = "yes" + + def to_conf(self) -> dict[str, str]: + return { + "TARGET_NAME": self.name, + "TARGET_FOLDERS": self.folders, + "TARGET_EXCLUDE": self.exclude, + "TARGET_REMOTE": self.remote, + "TARGET_RETENTION": self.retention, + "TARGET_PRE_HOOK": self.pre_hook, + "TARGET_POST_HOOK": self.post_hook, + "TARGET_ENABLED": self.enabled, + } + + @classmethod + def from_conf(cls, name: str, data: dict[str, str]) -> "Target": + return cls( + name=data.get("TARGET_NAME", name), + folders=data.get("TARGET_FOLDERS", ""), + exclude=data.get("TARGET_EXCLUDE", ""), + remote=data.get("TARGET_REMOTE", ""), + retention=data.get("TARGET_RETENTION", ""), + pre_hook=data.get("TARGET_PRE_HOOK", ""), + post_hook=data.get("TARGET_POST_HOOK", ""), + enabled=data.get("TARGET_ENABLED", "yes"), + ) + + +@dataclass +class Remote: + name: str = "" + type: str = "ssh" + host: str = "" + port: str = "22" + user: str = "root" + auth_method: str = "key" + key: str = "" + password: str = "" + base: str = "/backups" + bwlimit: str = "0" + retention_count: str = "30" + s3_bucket: str = "" + s3_region: str = "us-east-1" + s3_endpoint: str = "" + s3_access_key_id: str = "" + s3_secret_access_key: str = "" + gdrive_sa_file: str = "" + gdrive_root_folder_id: str = "" + + def to_conf(self) -> dict[str, str]: + data: dict[str, str] = {"REMOTE_TYPE": self.type} + if self.type == "ssh": + data.update({ + "REMOTE_HOST": self.host, + "REMOTE_PORT": self.port, + "REMOTE_USER": self.user, + "REMOTE_AUTH_METHOD": self.auth_method, + "REMOTE_KEY": self.key, + "REMOTE_PASSWORD": self.password, + "REMOTE_BASE": self.base, + "BWLIMIT": self.bwlimit, + "RETENTION_COUNT": self.retention_count, + }) + elif self.type == "local": + data.update({ + "REMOTE_BASE": self.base, + "RETENTION_COUNT": self.retention_count, + }) + elif self.type == "s3": + data.update({ + "S3_BUCKET": self.s3_bucket, + "S3_REGION": self.s3_region, + "S3_ENDPOINT": self.s3_endpoint, + "S3_ACCESS_KEY_ID": self.s3_access_key_id, + "S3_SECRET_ACCESS_KEY": self.s3_secret_access_key, + "REMOTE_BASE": self.base, + "RETENTION_COUNT": self.retention_count, + }) + elif self.type == "gdrive": + data.update({ + "GDRIVE_SERVICE_ACCOUNT_FILE": self.gdrive_sa_file, + "GDRIVE_ROOT_FOLDER_ID": self.gdrive_root_folder_id, + "REMOTE_BASE": self.base, + "RETENTION_COUNT": self.retention_count, + }) + return data + + @classmethod + def from_conf(cls, name: str, data: dict[str, str]) -> "Remote": + return cls( + name=name, + type=data.get("REMOTE_TYPE", "ssh"), + host=data.get("REMOTE_HOST", ""), + port=data.get("REMOTE_PORT", "22"), + user=data.get("REMOTE_USER", "root"), + auth_method=data.get("REMOTE_AUTH_METHOD", "key"), + key=data.get("REMOTE_KEY", ""), + password=data.get("REMOTE_PASSWORD", ""), + base=data.get("REMOTE_BASE", "/backups"), + bwlimit=data.get("BWLIMIT", "0"), + retention_count=data.get("RETENTION_COUNT", "30"), + s3_bucket=data.get("S3_BUCKET", ""), + s3_region=data.get("S3_REGION", "us-east-1"), + s3_endpoint=data.get("S3_ENDPOINT", ""), + s3_access_key_id=data.get("S3_ACCESS_KEY_ID", ""), + s3_secret_access_key=data.get("S3_SECRET_ACCESS_KEY", ""), + gdrive_sa_file=data.get("GDRIVE_SERVICE_ACCOUNT_FILE", ""), + gdrive_root_folder_id=data.get("GDRIVE_ROOT_FOLDER_ID", ""), + ) + + +@dataclass +class Schedule: + name: str = "" + schedule: str = "daily" + time: str = "02:00" + day: str = "" + cron: str = "" + targets: str = "" + remotes: str = "" + + def to_conf(self) -> dict[str, str]: + return { + "SCHEDULE": self.schedule, + "SCHEDULE_TIME": self.time, + "SCHEDULE_DAY": self.day, + "SCHEDULE_CRON": self.cron, + "TARGETS": self.targets, + "REMOTES": self.remotes, + } + + @classmethod + def from_conf(cls, name: str, data: dict[str, str]) -> "Schedule": + return cls( + name=name, + schedule=data.get("SCHEDULE", "daily"), + time=data.get("SCHEDULE_TIME", "02:00"), + day=data.get("SCHEDULE_DAY", ""), + cron=data.get("SCHEDULE_CRON", ""), + targets=data.get("TARGETS", ""), + remotes=data.get("REMOTES", ""), + ) + + +@dataclass +class AppSettings: + backup_mode: str = "incremental" + bwlimit: str = "0" + retention_count: str = "7" + log_level: str = "INFO" + log_retain: str = "30" + notify_email: str = "" + notify_on: str = "failure" + smtp_host: str = "" + smtp_port: str = "587" + smtp_user: str = "" + smtp_password: str = "" + smtp_from: str = "" + smtp_security: str = "tls" + ssh_timeout: str = "30" + ssh_retries: str = "3" + rsync_extra_opts: str = "" + + @classmethod + def from_conf(cls, data: dict[str, str]) -> "AppSettings": + return cls( + backup_mode=data.get("BACKUP_MODE", "incremental"), + bwlimit=data.get("BWLIMIT", "0"), + retention_count=data.get("RETENTION_COUNT", "7"), + log_level=data.get("LOG_LEVEL", "INFO"), + log_retain=data.get("LOG_RETAIN", "30"), + notify_email=data.get("NOTIFY_EMAIL", ""), + notify_on=data.get("NOTIFY_ON", "failure"), + smtp_host=data.get("SMTP_HOST", ""), + smtp_port=data.get("SMTP_PORT", "587"), + smtp_user=data.get("SMTP_USER", ""), + smtp_password=data.get("SMTP_PASSWORD", ""), + smtp_from=data.get("SMTP_FROM", ""), + smtp_security=data.get("SMTP_SECURITY", "tls"), + ssh_timeout=data.get("SSH_TIMEOUT", "30"), + ssh_retries=data.get("SSH_RETRIES", "3"), + rsync_extra_opts=data.get("RSYNC_EXTRA_OPTS", ""), + ) + + def to_conf(self) -> dict[str, str]: + return { + "BACKUP_MODE": self.backup_mode, + "BWLIMIT": self.bwlimit, + "RETENTION_COUNT": self.retention_count, + "LOG_LEVEL": self.log_level, + "LOG_RETAIN": self.log_retain, + "NOTIFY_EMAIL": self.notify_email, + "NOTIFY_ON": self.notify_on, + "SMTP_HOST": self.smtp_host, + "SMTP_PORT": self.smtp_port, + "SMTP_USER": self.smtp_user, + "SMTP_PASSWORD": self.smtp_password, + "SMTP_FROM": self.smtp_from, + "SMTP_SECURITY": self.smtp_security, + "SSH_TIMEOUT": self.ssh_timeout, + "SSH_RETRIES": self.ssh_retries, + "RSYNC_EXTRA_OPTS": self.rsync_extra_opts, + } diff --git a/tui/screens/__init__.py b/tui/screens/__init__.py new file mode 100644 index 0000000..a543d0d --- /dev/null +++ b/tui/screens/__init__.py @@ -0,0 +1,14 @@ +from tui.screens.main_menu import MainMenuScreen +from tui.screens.backup import BackupScreen +from tui.screens.restore import RestoreScreen +from tui.screens.targets import TargetsScreen +from tui.screens.target_edit import TargetEditScreen +from tui.screens.remotes import RemotesScreen +from tui.screens.remote_edit import RemoteEditScreen +from tui.screens.snapshots import SnapshotsScreen +from tui.screens.verify import VerifyScreen +from tui.screens.retention import RetentionScreen +from tui.screens.schedule import ScheduleScreen +from tui.screens.logs import LogsScreen +from tui.screens.settings import SettingsScreen +from tui.screens.wizard import WizardScreen diff --git a/tui/screens/backup.py b/tui/screens/backup.py new file mode 100644 index 0000000..119df12 --- /dev/null +++ b/tui/screens/backup.py @@ -0,0 +1,92 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, Select +from textual.containers import Vertical, Horizontal +from textual import work + +from tui.config import list_conf_dir, has_targets, has_remotes +from tui.backend import stream_cli +from tui.widgets import ConfirmDialog, OperationLog + + +class BackupScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + targets = list_conf_dir("targets.d") + remotes = list_conf_dir("remotes.d") + with Vertical(id="backup-screen"): + yield Static("Backup", id="screen-title") + if not targets: + yield Static("No targets configured. Add a target first.") + else: + yield Static("Target:") + yield Select( + [(t, t) for t in targets], + id="backup-target", + prompt="Select target", + ) + yield Static("Remote (optional):") + yield Select( + [("Default (all)", "")] + [(r, r) for r in remotes], + id="backup-remote", + prompt="Select remote", + value="", + ) + with Horizontal(id="backup-buttons"): + yield Button("Run Backup", variant="primary", id="btn-backup") + yield Button("Backup All", variant="warning", id="btn-backup-all") + yield Button("Back", id="btn-back") + yield Footer() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-backup": + target_sel = self.query_one("#backup-target", Select) + if target_sel.value is Select.BLANK: + self.notify("Please select a target", severity="error") + return + target = str(target_sel.value) + remote_sel = self.query_one("#backup-remote", Select) + remote = str(remote_sel.value) if remote_sel.value != Select.BLANK else "" + msg = f"Run backup for target '{target}'?" + if remote: + msg += f"\nRemote: {remote}" + self.app.push_screen( + ConfirmDialog(msg, "Confirm Backup"), + callback=lambda ok: self._do_backup(target, remote) if ok else None, + ) + elif event.button.id == "btn-backup-all": + self.app.push_screen( + ConfirmDialog("Backup ALL targets now?", "Confirm Backup"), + callback=lambda ok: self._do_backup_all() if ok else None, + ) + + @work + async def _do_backup(self, target: str, remote: str) -> None: + log_screen = OperationLog(f"Backup: {target}") + self.app.push_screen(log_screen) + args = ["backup", f"--target={target}"] + if remote: + args.append(f"--remote={remote}") + rc = await stream_cli(log_screen.write, *args) + if rc == 0: + log_screen.write("\n[green]Backup completed successfully.[/green]") + else: + log_screen.write(f"\n[red]Backup failed (exit code {rc}).[/red]") + + @work + async def _do_backup_all(self) -> None: + log_screen = OperationLog("Backup All Targets") + self.app.push_screen(log_screen) + rc = await stream_cli(log_screen.write, "backup", "--all") + if rc == 0: + log_screen.write("\n[green]All backups completed.[/green]") + else: + log_screen.write(f"\n[red]Backup failed (exit code {rc}).[/red]") + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/logs.py b/tui/screens/logs.py new file mode 100644 index 0000000..45e8d3a --- /dev/null +++ b/tui/screens/logs.py @@ -0,0 +1,111 @@ +from pathlib import Path +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, DataTable, RichLog +from textual.containers import Vertical, Horizontal + +from tui.config import LOG_DIR + + +class LogsScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + with Vertical(id="logs-screen"): + yield Static("Logs", id="screen-title") + yield DataTable(id="logs-table") + with Horizontal(id="logs-buttons"): + yield Button("View", variant="primary", id="btn-view") + yield Button("Status", id="btn-status") + yield Button("Back", id="btn-back") + yield RichLog(id="log-viewer", wrap=True, highlight=True) + yield Footer() + + def on_mount(self) -> None: + self._refresh_table() + + def _refresh_table(self) -> None: + table = self.query_one("#logs-table", DataTable) + table.clear(columns=True) + table.add_columns("File", "Size") + log_dir = Path(LOG_DIR) + if not log_dir.is_dir(): + return + logs = sorted(log_dir.glob("gniza-*.log"), key=lambda p: p.stat().st_mtime, reverse=True)[:20] + for f in logs: + size = f.stat().st_size + if size >= 1048576: + size_str = f"{size / 1048576:.1f} MB" + elif size >= 1024: + size_str = f"{size / 1024:.1f} KB" + else: + size_str = f"{size} B" + table.add_row(f.name, size_str, key=f.name) + + def _selected_log(self) -> str | None: + table = self.query_one("#logs-table", DataTable) + if table.cursor_row is not None and table.row_count > 0: + return str(table.coordinate_to_cell_key((table.cursor_row, 0)).row_key.value) + return None + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-view": + name = self._selected_log() + if name: + self._view_log(name) + else: + self.notify("Select a log file first", severity="warning") + elif event.button.id == "btn-status": + self._show_status() + + def _view_log(self, name: str) -> None: + filepath = (Path(LOG_DIR) / name).resolve() + if not filepath.is_relative_to(Path(LOG_DIR).resolve()): + self.notify("Invalid log path", severity="error") + return + viewer = self.query_one("#log-viewer", RichLog) + viewer.clear() + if filepath.is_file(): + content = filepath.read_text() + viewer.write(content) + else: + viewer.write(f"File not found: {filepath}") + + def _show_status(self) -> None: + viewer = self.query_one("#log-viewer", RichLog) + viewer.clear() + log_dir = Path(LOG_DIR) + viewer.write("Backup Status Overview") + viewer.write("=" * 40) + if not log_dir.is_dir(): + viewer.write("Log directory does not exist.") + return + logs = sorted(log_dir.glob("gniza-*.log"), key=lambda p: p.stat().st_mtime, reverse=True) + if logs: + latest = logs[0] + from datetime import datetime + mtime = datetime.fromtimestamp(latest.stat().st_mtime) + viewer.write(f"Last log: {mtime.strftime('%Y-%m-%d %H:%M:%S')}") + last_line = "" + with open(latest) as f: + for line in f: + last_line = line.rstrip() + if last_line: + viewer.write(f"Last entry: {last_line}") + else: + viewer.write("No backup logs found.") + viewer.write(f"Log files: {len(logs)}") + total = sum(f.stat().st_size for f in logs) + if total >= 1048576: + viewer.write(f"Total size: {total / 1048576:.1f} MB") + elif total >= 1024: + viewer.write(f"Total size: {total / 1024:.1f} KB") + else: + viewer.write(f"Total size: {total} B") + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/main_menu.py b/tui/screens/main_menu.py new file mode 100644 index 0000000..b19cdd6 --- /dev/null +++ b/tui/screens/main_menu.py @@ -0,0 +1,58 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button +from textual.containers import Vertical, Center + +LOGO = """\ + [green] + \u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593 + \u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593 \u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593 + \u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593 + \u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2593 + [/green] + gniza - Linux Backup Manager +""" + + +class MainMenuScreen(Screen): + + BINDINGS = [("q", "quit_app", "Quit")] + + def compose(self) -> ComposeResult: + yield Header() + with Center(): + with Vertical(id="main-menu"): + yield Static(LOGO, id="logo", markup=True) + yield Button("Backup", id="menu-backup", variant="primary") + yield Button("Restore", id="menu-restore") + yield Button("Targets", id="menu-targets") + yield Button("Remotes", id="menu-remotes") + yield Button("Snapshots", id="menu-snapshots") + yield Button("Verify", id="menu-verify") + yield Button("Retention", id="menu-retention") + yield Button("Schedules", id="menu-schedule") + yield Button("Logs", id="menu-logs") + yield Button("Settings", id="menu-settings") + yield Button("Quit", id="menu-quit", variant="error") + yield Footer() + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_map = { + "menu-backup": "backup", + "menu-restore": "restore", + "menu-targets": "targets", + "menu-remotes": "remotes", + "menu-snapshots": "snapshots", + "menu-verify": "verify", + "menu-retention": "retention", + "menu-schedule": "schedule", + "menu-logs": "logs", + "menu-settings": "settings", + } + if event.button.id == "menu-quit": + self.app.exit() + elif event.button.id in button_map: + self.app.push_screen(button_map[event.button.id]) + + def action_quit_app(self) -> None: + self.app.exit() diff --git a/tui/screens/remote_edit.py b/tui/screens/remote_edit.py new file mode 100644 index 0000000..713bc62 --- /dev/null +++ b/tui/screens/remote_edit.py @@ -0,0 +1,161 @@ +import re +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, Input, Select, RadioSet, RadioButton +from textual.containers import Vertical, Horizontal + +from tui.config import parse_conf, write_conf, CONFIG_DIR +from tui.models import Remote + +_NAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_-]{0,31}$') + +REMOTE_TYPES = [("SSH", "ssh"), ("Local directory", "local"), ("Amazon S3", "s3"), ("Google Drive", "gdrive")] + + +class RemoteEditScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def __init__(self, name: str = ""): + super().__init__() + self._edit_name = name + self._is_new = not name + + def compose(self) -> ComposeResult: + yield Header() + title = "Add Remote" if self._is_new else f"Edit Remote: {self._edit_name}" + remote = Remote() + if not self._is_new: + data = parse_conf(CONFIG_DIR / "remotes.d" / f"{self._edit_name}.conf") + remote = Remote.from_conf(self._edit_name, data) + + with Vertical(id="remote-edit"): + yield Static(title, id="screen-title") + if self._is_new: + yield Static("Name:") + yield Input(value="", placeholder="Remote name", id="re-name") + yield Static("Type:") + yield Select( + REMOTE_TYPES, + id="re-type", + value=remote.type, + ) + # SSH fields + yield Static("Host:", id="lbl-host", classes="ssh-field") + yield Input(value=remote.host, placeholder="hostname or IP", id="re-host", classes="ssh-field") + yield Static("Port:", id="lbl-port", classes="ssh-field") + yield Input(value=remote.port, placeholder="22", id="re-port", classes="ssh-field") + yield Static("User:", id="lbl-user", classes="ssh-field") + yield Input(value=remote.user, placeholder="root", id="re-user", classes="ssh-field") + yield Static("Auth method:", id="lbl-auth", classes="ssh-field") + yield Select( + [("SSH Key", "key"), ("Password", "password")], + id="re-auth", + value=remote.auth_method, + classes="ssh-field", + ) + yield Static("SSH Key path:", id="lbl-key", classes="ssh-field") + yield Input(value=remote.key, placeholder="~/.ssh/id_rsa", id="re-key", classes="ssh-field") + yield Static("Password:", id="lbl-password", classes="ssh-field") + yield Input(value=remote.password, placeholder="SSH password", password=True, id="re-password", classes="ssh-field") + # Common fields + yield Static("Base path:") + yield Input(value=remote.base, placeholder="/backups", id="re-base") + yield Static("Bandwidth limit (KB/s, 0=unlimited):") + yield Input(value=remote.bwlimit, placeholder="0", id="re-bwlimit") + yield Static("Retention count:") + yield Input(value=remote.retention_count, placeholder="30", id="re-retention") + # S3 fields + yield Static("S3 Bucket:", id="lbl-s3bucket", classes="s3-field") + yield Input(value=remote.s3_bucket, placeholder="bucket-name", id="re-s3bucket", classes="s3-field") + yield Static("S3 Region:", id="lbl-s3region", classes="s3-field") + yield Input(value=remote.s3_region, placeholder="us-east-1", id="re-s3region", classes="s3-field") + yield Static("S3 Endpoint:", id="lbl-s3endpoint", classes="s3-field") + yield Input(value=remote.s3_endpoint, placeholder="Leave empty for AWS", id="re-s3endpoint", classes="s3-field") + yield Static("Access Key ID:", id="lbl-s3key", classes="s3-field") + yield Input(value=remote.s3_access_key_id, id="re-s3key", classes="s3-field") + yield Static("Secret Access Key:", id="lbl-s3secret", classes="s3-field") + yield Input(value=remote.s3_secret_access_key, password=True, id="re-s3secret", classes="s3-field") + # GDrive fields + yield Static("Service Account JSON:", id="lbl-gdsa", classes="gdrive-field") + yield Input(value=remote.gdrive_sa_file, placeholder="/path/to/sa.json", id="re-gdsa", classes="gdrive-field") + yield Static("Root Folder ID:", id="lbl-gdfolder", classes="gdrive-field") + yield Input(value=remote.gdrive_root_folder_id, id="re-gdfolder", classes="gdrive-field") + with Horizontal(id="re-buttons"): + yield Button("Save", variant="primary", id="btn-save") + yield Button("Cancel", id="btn-cancel") + yield Footer() + + def on_mount(self) -> None: + self._update_field_visibility() + + def on_select_changed(self, event: Select.Changed) -> None: + if event.select.id == "re-type": + self._update_field_visibility() + + def _update_field_visibility(self) -> None: + type_sel = self.query_one("#re-type", Select) + rtype = str(type_sel.value) if type_sel.value is not Select.BLANK else "ssh" + for w in self.query(".ssh-field"): + w.display = rtype == "ssh" + for w in self.query(".s3-field"): + w.display = rtype == "s3" + for w in self.query(".gdrive-field"): + w.display = rtype == "gdrive" + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-cancel": + self.dismiss(None) + elif event.button.id == "btn-save": + self._save() + + def _save(self) -> None: + if self._is_new: + name = self.query_one("#re-name", Input).value.strip() + if not name: + self.notify("Name is required", severity="error") + return + if not _NAME_RE.match(name): + self.notify("Invalid name.", severity="error") + return + if (CONFIG_DIR / "remotes.d" / f"{name}.conf").exists(): + self.notify(f"Remote '{name}' already exists.", severity="error") + return + else: + name = self._edit_name + + type_sel = self.query_one("#re-type", Select) + rtype = str(type_sel.value) if type_sel.value is not Select.BLANK else "ssh" + + remote = Remote( + name=name, + type=rtype, + host=self.query_one("#re-host", Input).value.strip(), + port=self.query_one("#re-port", Input).value.strip() or "22", + user=self.query_one("#re-user", Input).value.strip() or "root", + auth_method=str(self.query_one("#re-auth", Select).value) if self.query_one("#re-auth", Select).value is not Select.BLANK else "key", + key=self.query_one("#re-key", Input).value.strip(), + password=self.query_one("#re-password", Input).value, + base=self.query_one("#re-base", Input).value.strip() or "/backups", + bwlimit=self.query_one("#re-bwlimit", Input).value.strip() or "0", + retention_count=self.query_one("#re-retention", Input).value.strip() or "30", + s3_bucket=self.query_one("#re-s3bucket", Input).value.strip(), + s3_region=self.query_one("#re-s3region", Input).value.strip() or "us-east-1", + s3_endpoint=self.query_one("#re-s3endpoint", Input).value.strip(), + s3_access_key_id=self.query_one("#re-s3key", Input).value.strip(), + s3_secret_access_key=self.query_one("#re-s3secret", Input).value, + gdrive_sa_file=self.query_one("#re-gdsa", Input).value.strip(), + gdrive_root_folder_id=self.query_one("#re-gdfolder", Input).value.strip(), + ) + + if rtype == "ssh" and not remote.host: + self.notify("Host is required for SSH remotes", severity="error") + return + + conf = CONFIG_DIR / "remotes.d" / f"{name}.conf" + write_conf(conf, remote.to_conf()) + self.notify(f"Remote '{name}' saved.") + self.dismiss(name) + + def action_go_back(self) -> None: + self.dismiss(None) diff --git a/tui/screens/remotes.py b/tui/screens/remotes.py new file mode 100644 index 0000000..e742b4e --- /dev/null +++ b/tui/screens/remotes.py @@ -0,0 +1,106 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, DataTable +from textual.containers import Vertical, Horizontal +from textual import work + +from tui.config import list_conf_dir, parse_conf, CONFIG_DIR +from tui.backend import run_cli +from tui.widgets import ConfirmDialog, OperationLog + + +class RemotesScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + with Vertical(id="remotes-screen"): + yield Static("Remotes", id="screen-title") + yield DataTable(id="remotes-table") + with Horizontal(id="remotes-buttons"): + yield Button("Add", variant="primary", id="btn-add") + yield Button("Edit", id="btn-edit") + yield Button("Test", variant="warning", id="btn-test") + yield Button("Delete", variant="error", id="btn-delete") + yield Button("Back", id="btn-back") + yield Footer() + + def on_mount(self) -> None: + self._refresh_table() + + def _refresh_table(self) -> None: + table = self.query_one("#remotes-table", DataTable) + table.clear(columns=True) + table.add_columns("Name", "Type", "Host/Path") + remotes = list_conf_dir("remotes.d") + for name in remotes: + data = parse_conf(CONFIG_DIR / "remotes.d" / f"{name}.conf") + rtype = data.get("REMOTE_TYPE", "ssh") + if rtype == "ssh": + loc = f"{data.get('REMOTE_USER', 'root')}@{data.get('REMOTE_HOST', '')}:{data.get('REMOTE_BASE', '')}" + elif rtype == "local": + loc = data.get("REMOTE_BASE", "") + elif rtype == "s3": + loc = f"s3://{data.get('S3_BUCKET', '')}{data.get('REMOTE_BASE', '')}" + else: + loc = data.get("REMOTE_BASE", "") + table.add_row(name, rtype, loc, key=name) + + def _selected_remote(self) -> str | None: + table = self.query_one("#remotes-table", DataTable) + if table.cursor_row is not None and table.row_count > 0: + return str(table.coordinate_to_cell_key((table.cursor_row, 0)).row_key.value) + return None + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-add": + self.app.push_screen("remote_edit", callback=lambda _: self._refresh_table()) + elif event.button.id == "btn-edit": + name = self._selected_remote() + if name: + from tui.screens.remote_edit import RemoteEditScreen + self.app.push_screen(RemoteEditScreen(name), callback=lambda _: self._refresh_table()) + else: + self.notify("Select a remote first", severity="warning") + elif event.button.id == "btn-test": + name = self._selected_remote() + if name: + self._test_remote(name) + else: + self.notify("Select a remote first", severity="warning") + elif event.button.id == "btn-delete": + name = self._selected_remote() + if name: + self.app.push_screen( + ConfirmDialog(f"Delete remote '{name}'? This cannot be undone.", "Delete Remote"), + callback=lambda ok: self._delete_remote(name) if ok else None, + ) + else: + self.notify("Select a remote first", severity="warning") + + @work + async def _test_remote(self, name: str) -> None: + log_screen = OperationLog(f"Testing Remote: {name}") + self.app.push_screen(log_screen) + rc, stdout, stderr = await run_cli("remotes", "test", f"--name={name}") + if stdout: + log_screen.write(stdout) + if stderr: + log_screen.write(stderr) + if rc == 0: + log_screen.write("\n[green]Connection test passed.[/green]") + else: + log_screen.write(f"\n[red]Connection test failed (exit code {rc}).[/red]") + + def _delete_remote(self, name: str) -> None: + conf = CONFIG_DIR / "remotes.d" / f"{name}.conf" + if conf.is_file(): + conf.unlink() + self.notify(f"Remote '{name}' deleted.") + self._refresh_table() + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/restore.py b/tui/screens/restore.py new file mode 100644 index 0000000..e55f2a1 --- /dev/null +++ b/tui/screens/restore.py @@ -0,0 +1,110 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, Select, Input, RadioSet, RadioButton +from textual.containers import Vertical, Horizontal +from textual import work + +from tui.config import list_conf_dir, parse_conf, CONFIG_DIR +from tui.backend import run_cli, stream_cli +from tui.widgets import ConfirmDialog, OperationLog + + +class RestoreScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + targets = list_conf_dir("targets.d") + remotes = list_conf_dir("remotes.d") + with Vertical(id="restore-screen"): + yield Static("Restore", id="screen-title") + if not targets or not remotes: + yield Static("Both targets and remotes must be configured for restore.") + else: + yield Static("Target:") + yield Select([(t, t) for t in targets], id="restore-target", prompt="Select target") + yield Static("Remote:") + yield Select([(r, r) for r in remotes], id="restore-remote", prompt="Select remote") + yield Static("Snapshot:") + yield Select([], id="restore-snapshot", prompt="Load snapshots first") + yield Button("Load Snapshots", id="btn-load-snaps") + yield Static("Restore location:") + with RadioSet(id="restore-location"): + yield RadioButton("In-place (original)", value=True) + yield RadioButton("Custom directory") + yield Input(placeholder="Destination directory (e.g. /tmp/restore)", id="restore-dest") + with Horizontal(id="restore-buttons"): + yield Button("Restore", variant="primary", id="btn-restore") + yield Button("Back", id="btn-back") + yield Footer() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-load-snaps": + self._load_snapshots() + elif event.button.id == "btn-restore": + self._start_restore() + + @work + async def _load_snapshots(self) -> None: + target_sel = self.query_one("#restore-target", Select) + remote_sel = self.query_one("#restore-remote", Select) + if target_sel.value is Select.BLANK or remote_sel.value is Select.BLANK: + self.notify("Select target and remote first", severity="error") + return + target = str(target_sel.value) + remote = str(remote_sel.value) + rc, stdout, stderr = await run_cli("snapshots", "list", f"--target={target}", f"--remote={remote}") + snap_sel = self.query_one("#restore-snapshot", Select) + lines = [l.strip() for l in stdout.splitlines() if l.strip() and not l.startswith("===")] + if lines: + snap_sel.set_options([(s, s) for s in lines]) + else: + self.notify("No snapshots found", severity="warning") + + def _start_restore(self) -> None: + target_sel = self.query_one("#restore-target", Select) + remote_sel = self.query_one("#restore-remote", Select) + snap_sel = self.query_one("#restore-snapshot", Select) + if target_sel.value is Select.BLANK: + self.notify("Select a target", severity="error") + return + if remote_sel.value is Select.BLANK: + self.notify("Select a remote", severity="error") + return + if snap_sel.value is Select.BLANK: + self.notify("Select a snapshot", severity="error") + return + target = str(target_sel.value) + remote = str(remote_sel.value) + snapshot = str(snap_sel.value) + radio = self.query_one("#restore-location", RadioSet) + dest_input = self.query_one("#restore-dest", Input) + dest = "" if radio.pressed_index == 0 else dest_input.value + msg = f"Restore snapshot?\n\nTarget: {target}\nRemote: {remote}\nSnapshot: {snapshot}" + if dest: + msg += f"\nDestination: {dest}" + else: + msg += "\nLocation: In-place" + self.app.push_screen( + ConfirmDialog(msg, "Confirm Restore"), + callback=lambda ok: self._do_restore(target, remote, snapshot, dest) if ok else None, + ) + + @work + async def _do_restore(self, target: str, remote: str, snapshot: str, dest: str) -> None: + log_screen = OperationLog(f"Restore: {target}") + self.app.push_screen(log_screen) + args = ["restore", f"--target={target}", f"--remote={remote}", f"--snapshot={snapshot}"] + if dest: + args.append(f"--dest={dest}") + rc = await stream_cli(log_screen.write, *args) + if rc == 0: + log_screen.write("\n[green]Restore completed successfully.[/green]") + else: + log_screen.write(f"\n[red]Restore failed (exit code {rc}).[/red]") + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/retention.py b/tui/screens/retention.py new file mode 100644 index 0000000..839345d --- /dev/null +++ b/tui/screens/retention.py @@ -0,0 +1,90 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, Select, Input +from textual.containers import Vertical, Horizontal +from textual import work + +from tui.config import list_conf_dir, parse_conf, update_conf_key, CONFIG_DIR +from tui.backend import stream_cli +from tui.widgets import ConfirmDialog, OperationLog + + +class RetentionScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + targets = list_conf_dir("targets.d") + conf = parse_conf(CONFIG_DIR / "gniza.conf") + current_count = conf.get("RETENTION_COUNT", "30") + with Vertical(id="retention-screen"): + yield Static("Retention Cleanup", id="screen-title") + if not targets: + yield Static("No targets configured.") + else: + yield Static("Target:") + yield Select( + [(t, t) for t in targets], + id="ret-target", + prompt="Select target", + ) + with Horizontal(id="ret-buttons"): + yield Button("Run Cleanup", variant="primary", id="btn-cleanup") + yield Button("Cleanup All", variant="warning", id="btn-cleanup-all") + yield Static("") + yield Static("Default retention count:") + with Horizontal(): + yield Input(value=current_count, id="ret-count", placeholder="30") + yield Button("Save", id="btn-save-count") + yield Button("Back", id="btn-back") + yield Footer() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-cleanup": + target_sel = self.query_one("#ret-target", Select) + if target_sel.value is Select.BLANK: + self.notify("Select a target first", severity="error") + return + target = str(target_sel.value) + self.app.push_screen( + ConfirmDialog(f"Run retention cleanup for '{target}'?", "Confirm"), + callback=lambda ok: self._do_cleanup(target) if ok else None, + ) + elif event.button.id == "btn-cleanup-all": + self.app.push_screen( + ConfirmDialog("Run retention cleanup for ALL targets?", "Confirm"), + callback=lambda ok: self._do_cleanup_all() if ok else None, + ) + elif event.button.id == "btn-save-count": + val = self.query_one("#ret-count", Input).value.strip() + if not val.isdigit() or int(val) < 1: + self.notify("Retention count must be a positive integer.", severity="error") + return + update_conf_key(CONFIG_DIR / "gniza.conf", "RETENTION_COUNT", val) + self.notify(f"Retention count set to {val}.") + + @work + async def _do_cleanup(self, target: str) -> None: + log_screen = OperationLog(f"Retention: {target}") + self.app.push_screen(log_screen) + rc = await stream_cli(log_screen.write, "retention", f"--target={target}") + if rc == 0: + log_screen.write("\n[green]Cleanup completed.[/green]") + else: + log_screen.write(f"\n[red]Cleanup failed (exit code {rc}).[/red]") + + @work + async def _do_cleanup_all(self) -> None: + log_screen = OperationLog("Retention: All Targets") + self.app.push_screen(log_screen) + rc = await stream_cli(log_screen.write, "retention", "--all") + if rc == 0: + log_screen.write("\n[green]All cleanups completed.[/green]") + else: + log_screen.write(f"\n[red]Cleanup failed (exit code {rc}).[/red]") + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/schedule.py b/tui/screens/schedule.py new file mode 100644 index 0000000..30ea76a --- /dev/null +++ b/tui/screens/schedule.py @@ -0,0 +1,164 @@ +import re +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, DataTable, Input, Select +from textual.containers import Vertical, Horizontal +from textual import work + +from tui.config import list_conf_dir, parse_conf, write_conf, CONFIG_DIR +from tui.models import Schedule +from tui.backend import run_cli +from tui.widgets import ConfirmDialog, OperationLog + +_NAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_-]{0,31}$') + +SCHEDULE_TYPES = [ + ("Hourly", "hourly"), + ("Daily", "daily"), + ("Weekly", "weekly"), + ("Monthly", "monthly"), + ("Custom cron", "custom"), +] + + +class ScheduleScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + with Vertical(id="schedule-screen"): + yield Static("Schedules", id="screen-title") + yield DataTable(id="sched-table") + with Horizontal(id="sched-buttons"): + yield Button("Add", variant="primary", id="btn-add") + yield Button("Delete", variant="error", id="btn-delete") + yield Button("Install to crontab", id="btn-install") + yield Button("Remove from crontab", id="btn-remove") + yield Button("Show crontab", id="btn-show") + yield Button("Back", id="btn-back") + yield Static("", id="sched-divider") + yield Static("Add Schedule", id="sched-add-title") + yield Static("Name:") + yield Input(id="sched-name", placeholder="Schedule name") + yield Static("Type:") + yield Select(SCHEDULE_TYPES, id="sched-type", value="daily") + yield Static("Time (HH:MM):") + yield Input(id="sched-time", value="02:00", placeholder="02:00") + yield Static("Day (0=Sun for weekly, 1-28 for monthly):") + yield Input(id="sched-day", placeholder="Leave empty if not needed") + yield Static("Custom cron (5 fields):") + yield Input(id="sched-cron", placeholder="0 2 * * *") + yield Static("Targets (comma-separated, empty=all):") + yield Input(id="sched-targets", placeholder="") + yield Static("Remotes (comma-separated, empty=all):") + yield Input(id="sched-remotes", placeholder="") + yield Footer() + + def on_mount(self) -> None: + self._refresh_table() + + def _refresh_table(self) -> None: + table = self.query_one("#sched-table", DataTable) + table.clear(columns=True) + table.add_columns("Name", "Type", "Time", "Targets", "Remotes") + schedules = list_conf_dir("schedules.d") + for name in schedules: + data = parse_conf(CONFIG_DIR / "schedules.d" / f"{name}.conf") + s = Schedule.from_conf(name, data) + table.add_row(name, s.schedule, s.time, s.targets or "all", s.remotes or "all", key=name) + + def _selected_schedule(self) -> str | None: + table = self.query_one("#sched-table", DataTable) + if table.cursor_row is not None and table.row_count > 0: + return str(table.coordinate_to_cell_key((table.cursor_row, 0)).row_key.value) + return None + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-add": + self._add_schedule() + elif event.button.id == "btn-delete": + name = self._selected_schedule() + if name: + self.app.push_screen( + ConfirmDialog(f"Delete schedule '{name}'?", "Delete Schedule"), + callback=lambda ok: self._delete_schedule(name) if ok else None, + ) + else: + self.notify("Select a schedule first", severity="warning") + elif event.button.id == "btn-install": + self._install_schedules() + elif event.button.id == "btn-remove": + self._remove_schedules() + elif event.button.id == "btn-show": + self._show_crontab() + + def _add_schedule(self) -> None: + name = self.query_one("#sched-name", Input).value.strip() + if not name: + self.notify("Name is required", severity="error") + return + if not _NAME_RE.match(name): + self.notify("Invalid name.", severity="error") + return + conf = CONFIG_DIR / "schedules.d" / f"{name}.conf" + if conf.exists(): + self.notify(f"Schedule '{name}' already exists.", severity="error") + return + type_sel = self.query_one("#sched-type", Select) + stype = str(type_sel.value) if type_sel.value is not Select.BLANK else "daily" + sched = Schedule( + name=name, + schedule=stype, + time=self.query_one("#sched-time", Input).value.strip() or "02:00", + day=self.query_one("#sched-day", Input).value.strip(), + cron=self.query_one("#sched-cron", Input).value.strip(), + targets=self.query_one("#sched-targets", Input).value.strip(), + remotes=self.query_one("#sched-remotes", Input).value.strip(), + ) + write_conf(conf, sched.to_conf()) + self.notify(f"Schedule '{name}' created.") + self._refresh_table() + self.query_one("#sched-name", Input).value = "" + + def _delete_schedule(self, name: str) -> None: + conf = CONFIG_DIR / "schedules.d" / f"{name}.conf" + if conf.is_file(): + conf.unlink() + self.notify(f"Schedule '{name}' deleted.") + self._refresh_table() + + @work + async def _install_schedules(self) -> None: + log_screen = OperationLog("Install Schedules") + self.app.push_screen(log_screen) + rc, stdout, stderr = await run_cli("schedule", "install") + if stdout: + log_screen.write(stdout) + if stderr: + log_screen.write(stderr) + + @work + async def _remove_schedules(self) -> None: + log_screen = OperationLog("Remove Schedules") + self.app.push_screen(log_screen) + rc, stdout, stderr = await run_cli("schedule", "remove") + if stdout: + log_screen.write(stdout) + if stderr: + log_screen.write(stderr) + + @work + async def _show_crontab(self) -> None: + log_screen = OperationLog("Current Crontab") + self.app.push_screen(log_screen) + rc, stdout, stderr = await run_cli("schedule", "show") + if stdout: + log_screen.write(stdout) + if stderr: + log_screen.write(stderr) + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/settings.py b/tui/screens/settings.py new file mode 100644 index 0000000..e215c66 --- /dev/null +++ b/tui/screens/settings.py @@ -0,0 +1,100 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, Input, Select +from textual.containers import Vertical, Horizontal + +from tui.config import parse_conf, write_conf, CONFIG_DIR +from tui.models import AppSettings + + +class SettingsScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + conf = parse_conf(CONFIG_DIR / "gniza.conf") + settings = AppSettings.from_conf(conf) + with Vertical(id="settings-screen"): + yield Static("Settings", id="screen-title") + yield Static("Log Level:") + yield Select( + [("Debug", "debug"), ("Info", "info"), ("Warning", "warn"), ("Error", "error")], + id="set-loglevel", + value=settings.log_level.lower(), + ) + yield Static("Log Retention (days):") + yield Input(value=settings.log_retain, id="set-logretain") + yield Static("Default Retention Count:") + yield Input(value=settings.retention_count, id="set-retention") + yield Static("Default Bandwidth Limit (KB/s, 0=unlimited):") + yield Input(value=settings.bwlimit, id="set-bwlimit") + yield Static("Notification Email:") + yield Input(value=settings.notify_email, id="set-email") + yield Static("Notify On:") + yield Select( + [("Always", "always"), ("Failure only", "failure"), ("Never", "never")], + id="set-notifyon", + value=settings.notify_on, + ) + yield Static("SMTP Host:") + yield Input(value=settings.smtp_host, id="set-smtphost") + yield Static("SMTP Port:") + yield Input(value=settings.smtp_port, id="set-smtpport") + yield Static("SMTP User:") + yield Input(value=settings.smtp_user, id="set-smtpuser") + yield Static("SMTP Password:") + yield Input(value=settings.smtp_password, password=True, id="set-smtppass") + yield Static("SMTP From:") + yield Input(value=settings.smtp_from, id="set-smtpfrom") + yield Static("SMTP Security:") + yield Select( + [("TLS", "tls"), ("SSL", "ssl"), ("None", "none")], + id="set-smtpsec", + value=settings.smtp_security, + ) + yield Static("SSH Timeout:") + yield Input(value=settings.ssh_timeout, id="set-sshtimeout") + yield Static("SSH Retries:") + yield Input(value=settings.ssh_retries, id="set-sshretries") + yield Static("Extra rsync options:") + yield Input(value=settings.rsync_extra_opts, id="set-rsyncopts") + with Horizontal(id="set-buttons"): + yield Button("Save", variant="primary", id="btn-save") + yield Button("Back", id="btn-back") + yield Footer() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-save": + self._save() + + def _get_select_val(self, sel_id: str, default: str) -> str: + sel = self.query_one(sel_id, Select) + return str(sel.value) if sel.value is not Select.BLANK else default + + def _save(self) -> None: + settings = AppSettings( + log_level=self._get_select_val("#set-loglevel", "info"), + log_retain=self.query_one("#set-logretain", Input).value.strip() or "30", + retention_count=self.query_one("#set-retention", Input).value.strip() or "7", + bwlimit=self.query_one("#set-bwlimit", Input).value.strip() or "0", + notify_email=self.query_one("#set-email", Input).value.strip(), + notify_on=self._get_select_val("#set-notifyon", "failure"), + smtp_host=self.query_one("#set-smtphost", Input).value.strip(), + smtp_port=self.query_one("#set-smtpport", Input).value.strip() or "587", + smtp_user=self.query_one("#set-smtpuser", Input).value.strip(), + smtp_password=self.query_one("#set-smtppass", Input).value, + smtp_from=self.query_one("#set-smtpfrom", Input).value.strip(), + smtp_security=self._get_select_val("#set-smtpsec", "tls"), + ssh_timeout=self.query_one("#set-sshtimeout", Input).value.strip() or "30", + ssh_retries=self.query_one("#set-sshretries", Input).value.strip() or "3", + rsync_extra_opts=self.query_one("#set-rsyncopts", Input).value.strip(), + ) + conf_path = CONFIG_DIR / "gniza.conf" + write_conf(conf_path, settings.to_conf()) + self.notify("Settings saved.") + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/snapshots.py b/tui/screens/snapshots.py new file mode 100644 index 0000000..627d65f --- /dev/null +++ b/tui/screens/snapshots.py @@ -0,0 +1,68 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, Select, DataTable +from textual.containers import Vertical, Horizontal +from textual import work + +from tui.config import list_conf_dir +from tui.backend import run_cli + + +class SnapshotsScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + targets = list_conf_dir("targets.d") + remotes = list_conf_dir("remotes.d") + with Vertical(id="snapshots-screen"): + yield Static("Snapshots", id="screen-title") + if not targets or not remotes: + yield Static("Targets and remotes must be configured to browse snapshots.") + else: + yield Static("Target:") + yield Select([(t, t) for t in targets], id="snap-target", prompt="Select target") + yield Static("Remote:") + yield Select([(r, r) for r in remotes], id="snap-remote", prompt="Select remote") + yield Button("Load Snapshots", id="btn-load", variant="primary") + yield DataTable(id="snap-table") + yield Button("Back", id="btn-back") + yield Footer() + + def on_mount(self) -> None: + try: + table = self.query_one("#snap-table", DataTable) + table.add_columns("Snapshot") + except Exception: + pass + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-load": + self._load_snapshots() + + @work + async def _load_snapshots(self) -> None: + target_sel = self.query_one("#snap-target", Select) + remote_sel = self.query_one("#snap-remote", Select) + if target_sel.value is Select.BLANK or remote_sel.value is Select.BLANK: + self.notify("Select target and remote first", severity="error") + return + target = str(target_sel.value) + remote = str(remote_sel.value) + rc, stdout, stderr = await run_cli("snapshots", "list", f"--target={target}", f"--remote={remote}") + table = self.query_one("#snap-table", DataTable) + table.clear() + lines = [l.strip() for l in stdout.splitlines() if l.strip() and not l.startswith("===")] + if lines: + for s in lines: + table.add_row(s) + else: + self.notify("No snapshots found", severity="warning") + if stderr: + self.notify(stderr.strip(), severity="error") + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/target_edit.py b/tui/screens/target_edit.py new file mode 100644 index 0000000..6e503f9 --- /dev/null +++ b/tui/screens/target_edit.py @@ -0,0 +1,116 @@ +import re +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, Input, Switch +from textual.containers import Vertical, Horizontal + +from tui.config import parse_conf, write_conf, CONFIG_DIR, list_conf_dir +from tui.models import Target +from tui.widgets import FolderPicker + +_NAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_-]{0,31}$') + + +class TargetEditScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def __init__(self, name: str = ""): + super().__init__() + self._edit_name = name + self._is_new = not name + + def compose(self) -> ComposeResult: + yield Header() + title = "Add Target" if self._is_new else f"Edit Target: {self._edit_name}" + target = Target() + if not self._is_new: + data = parse_conf(CONFIG_DIR / "targets.d" / f"{self._edit_name}.conf") + target = Target.from_conf(self._edit_name, data) + + with Vertical(id="target-edit"): + yield Static(title, id="screen-title") + if self._is_new: + yield Static("Name:") + yield Input(value="", placeholder="Target name", id="te-name") + yield Static("Folders (comma-separated):") + yield Input(value=target.folders, placeholder="/path1,/path2", id="te-folders") + yield Button("Browse...", id="btn-browse") + yield Static("Exclude patterns:") + yield Input(value=target.exclude, placeholder="*.tmp,*.log", id="te-exclude") + yield Static("Remote override:") + yield Input(value=target.remote, placeholder="Leave empty for default", id="te-remote") + yield Static("Retention override:") + yield Input(value=target.retention, placeholder="Leave empty for default", id="te-retention") + yield Static("Pre-backup hook:") + yield Input(value=target.pre_hook, placeholder="Command to run before backup", id="te-prehook") + yield Static("Post-backup hook:") + yield Input(value=target.post_hook, placeholder="Command to run after backup", id="te-posthook") + with Horizontal(): + yield Static("Enabled: ") + yield Switch(value=target.enabled == "yes", id="te-enabled") + with Horizontal(id="te-buttons"): + yield Button("Save", variant="primary", id="btn-save") + yield Button("Cancel", id="btn-cancel") + yield Footer() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-cancel": + self.dismiss(None) + elif event.button.id == "btn-browse": + self.app.push_screen( + FolderPicker("Select folder to back up"), + callback=self._folder_selected, + ) + elif event.button.id == "btn-save": + self._save() + + def _folder_selected(self, path: str | None) -> None: + if path: + folders_input = self.query_one("#te-folders", Input) + current = folders_input.value.strip() + if current: + existing = [f.strip() for f in current.split(",")] + if path not in existing: + folders_input.value = current + "," + path + else: + folders_input.value = path + + def _save(self) -> None: + if self._is_new: + name = self.query_one("#te-name", Input).value.strip() + if not name: + self.notify("Name is required", severity="error") + return + if not _NAME_RE.match(name): + self.notify("Invalid name. Use letters, digits, _ - (max 32 chars, start with letter).", severity="error") + return + conf = CONFIG_DIR / "targets.d" / f"{name}.conf" + if conf.exists(): + self.notify(f"Target '{name}' already exists.", severity="error") + return + else: + name = self._edit_name + + folders = self.query_one("#te-folders", Input).value.strip() + if not folders: + self.notify("At least one folder is required", severity="error") + return + + target = Target( + name=name, + folders=folders, + exclude=self.query_one("#te-exclude", Input).value.strip(), + remote=self.query_one("#te-remote", Input).value.strip(), + retention=self.query_one("#te-retention", Input).value.strip(), + pre_hook=self.query_one("#te-prehook", Input).value.strip(), + post_hook=self.query_one("#te-posthook", Input).value.strip(), + enabled="yes" if self.query_one("#te-enabled", Switch).value else "no", + ) + conf = CONFIG_DIR / "targets.d" / f"{name}.conf" + write_conf(conf, target.to_conf()) + self.notify(f"Target '{name}' saved.") + self.dismiss(name) + + def action_go_back(self) -> None: + self.dismiss(None) diff --git a/tui/screens/targets.py b/tui/screens/targets.py new file mode 100644 index 0000000..8ccca8a --- /dev/null +++ b/tui/screens/targets.py @@ -0,0 +1,80 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, DataTable +from textual.containers import Vertical, Horizontal + +from tui.config import list_conf_dir, parse_conf, CONFIG_DIR +from tui.widgets import ConfirmDialog + + +class TargetsScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + with Vertical(id="targets-screen"): + yield Static("Targets", id="screen-title") + yield DataTable(id="targets-table") + with Horizontal(id="targets-buttons"): + yield Button("Add", variant="primary", id="btn-add") + yield Button("Edit", id="btn-edit") + yield Button("Delete", variant="error", id="btn-delete") + yield Button("Back", id="btn-back") + yield Footer() + + def on_mount(self) -> None: + self._refresh_table() + + def _refresh_table(self) -> None: + table = self.query_one("#targets-table", DataTable) + table.clear(columns=True) + table.add_columns("Name", "Folders", "Enabled") + targets = list_conf_dir("targets.d") + for name in targets: + data = parse_conf(CONFIG_DIR / "targets.d" / f"{name}.conf") + table.add_row( + name, + data.get("TARGET_FOLDERS", ""), + data.get("TARGET_ENABLED", "yes"), + key=name, + ) + + def _selected_target(self) -> str | None: + table = self.query_one("#targets-table", DataTable) + if table.cursor_row is not None and table.row_count > 0: + row_key = table.get_row_at(table.cursor_row) + return str(table.coordinate_to_cell_key((table.cursor_row, 0)).row_key.value) + return None + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-add": + self.app.push_screen("target_edit", callback=lambda _: self._refresh_table()) + elif event.button.id == "btn-edit": + name = self._selected_target() + if name: + from tui.screens.target_edit import TargetEditScreen + self.app.push_screen(TargetEditScreen(name), callback=lambda _: self._refresh_table()) + else: + self.notify("Select a target first", severity="warning") + elif event.button.id == "btn-delete": + name = self._selected_target() + if name: + self.app.push_screen( + ConfirmDialog(f"Delete target '{name}'? This cannot be undone.", "Delete Target"), + callback=lambda ok: self._delete_target(name) if ok else None, + ) + else: + self.notify("Select a target first", severity="warning") + + def _delete_target(self, name: str) -> None: + conf = CONFIG_DIR / "targets.d" / f"{name}.conf" + if conf.is_file(): + conf.unlink() + self.notify(f"Target '{name}' deleted.") + self._refresh_table() + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/verify.py b/tui/screens/verify.py new file mode 100644 index 0000000..84d623f --- /dev/null +++ b/tui/screens/verify.py @@ -0,0 +1,69 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button, Select +from textual.containers import Vertical, Horizontal +from textual import work + +from tui.config import list_conf_dir +from tui.backend import stream_cli +from tui.widgets import ConfirmDialog, OperationLog + + +class VerifyScreen(Screen): + + BINDINGS = [("escape", "go_back", "Back")] + + def compose(self) -> ComposeResult: + yield Header() + targets = list_conf_dir("targets.d") + with Vertical(id="verify-screen"): + yield Static("Verify Backups", id="screen-title") + if not targets: + yield Static("No targets configured.") + else: + yield Static("Target:") + yield Select( + [(t, t) for t in targets], + id="verify-target", + prompt="Select target", + ) + with Horizontal(id="verify-buttons"): + yield Button("Verify Selected", variant="primary", id="btn-verify") + yield Button("Verify All", variant="warning", id="btn-verify-all") + yield Button("Back", id="btn-back") + yield Footer() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn-back": + self.app.pop_screen() + elif event.button.id == "btn-verify": + target_sel = self.query_one("#verify-target", Select) + if target_sel.value is Select.BLANK: + self.notify("Select a target first", severity="error") + return + self._do_verify(str(target_sel.value)) + elif event.button.id == "btn-verify-all": + self._do_verify_all() + + @work + async def _do_verify(self, target: str) -> None: + log_screen = OperationLog(f"Verify: {target}") + self.app.push_screen(log_screen) + rc = await stream_cli(log_screen.write, "verify", f"--target={target}") + if rc == 0: + log_screen.write("\n[green]Verification completed successfully.[/green]") + else: + log_screen.write(f"\n[red]Verification failed (exit code {rc}).[/red]") + + @work + async def _do_verify_all(self) -> None: + log_screen = OperationLog("Verify All Targets") + self.app.push_screen(log_screen) + rc = await stream_cli(log_screen.write, "verify", "--all") + if rc == 0: + log_screen.write("\n[green]All verifications completed.[/green]") + else: + log_screen.write(f"\n[red]Verification failed (exit code {rc}).[/red]") + + def action_go_back(self) -> None: + self.app.pop_screen() diff --git a/tui/screens/wizard.py b/tui/screens/wizard.py new file mode 100644 index 0000000..5ed38b2 --- /dev/null +++ b/tui/screens/wizard.py @@ -0,0 +1,46 @@ +from textual.app import ComposeResult +from textual.screen import Screen +from textual.widgets import Header, Footer, Static, Button +from textual.containers import Vertical, Center + +from tui.config import has_remotes, has_targets + + +class WizardScreen(Screen): + + def compose(self) -> ComposeResult: + yield Header() + with Center(): + with Vertical(id="wizard"): + yield Static( + "[bold]Welcome to gniza Backup Manager![/bold]\n\n" + "This wizard will help you set up your first backup:\n\n" + " 1. Configure a backup destination (remote)\n" + " 2. Define what to back up (target)\n" + " 3. Optionally run your first backup\n", + id="wizard-welcome", + markup=True, + ) + if not has_remotes(): + yield Button("Step 1: Add Remote", variant="primary", id="wiz-remote") + else: + yield Static("[green]Remote configured.[/green]", markup=True) + if not has_targets(): + yield Button("Step 2: Add Target", variant="primary", id="wiz-target") + else: + yield Static("[green]Target configured.[/green]", markup=True) + yield Button("Continue to Main Menu", id="wiz-continue") + yield Button("Skip Setup", id="wiz-skip") + yield Footer() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "wiz-remote": + self.app.push_screen("remote_edit", callback=self._check_progress) + elif event.button.id == "wiz-target": + self.app.push_screen("target_edit", callback=self._check_progress) + elif event.button.id in ("wiz-continue", "wiz-skip"): + self.app.switch_screen("main") + + def _check_progress(self, result) -> None: + if has_remotes() and has_targets(): + self.app.switch_screen("main") diff --git a/tui/widgets/__init__.py b/tui/widgets/__init__.py new file mode 100644 index 0000000..7212319 --- /dev/null +++ b/tui/widgets/__init__.py @@ -0,0 +1,3 @@ +from tui.widgets.folder_picker import FolderPicker +from tui.widgets.confirm_dialog import ConfirmDialog +from tui.widgets.operation_log import OperationLog diff --git a/tui/widgets/confirm_dialog.py b/tui/widgets/confirm_dialog.py new file mode 100644 index 0000000..c08142c --- /dev/null +++ b/tui/widgets/confirm_dialog.py @@ -0,0 +1,28 @@ +from textual.app import ComposeResult +from textual.screen import ModalScreen +from textual.widgets import Static, Button +from textual.containers import Horizontal, Vertical + + +class ConfirmDialog(ModalScreen[bool]): + + BINDINGS = [("escape", "cancel", "Cancel")] + + def __init__(self, message: str, title: str = "Confirm"): + super().__init__() + self._message = message + self._title = title + + def compose(self) -> ComposeResult: + with Vertical(id="confirm-dialog"): + yield Static(self._title, id="cd-title") + yield Static(self._message, id="cd-message") + with Horizontal(id="cd-buttons"): + yield Button("Yes", variant="primary", id="cd-yes") + yield Button("No", variant="default", id="cd-no") + + def on_button_pressed(self, event: Button.Pressed) -> None: + self.dismiss(event.button.id == "cd-yes") + + def action_cancel(self) -> None: + self.dismiss(False) diff --git a/tui/widgets/folder_picker.py b/tui/widgets/folder_picker.py new file mode 100644 index 0000000..69b5d32 --- /dev/null +++ b/tui/widgets/folder_picker.py @@ -0,0 +1,42 @@ +from textual.app import ComposeResult +from textual.screen import ModalScreen +from textual.widgets import DirectoryTree, Header, Footer, Static, Button +from textual.containers import Horizontal, Vertical +from pathlib import Path + + +class _DirOnly(DirectoryTree): + def filter_paths(self, paths): + return [p for p in paths if p.is_dir()] + + +class FolderPicker(ModalScreen[str | None]): + + BINDINGS = [("escape", "cancel", "Cancel")] + + def __init__(self, title: str = "Select folder", start: str = "/"): + super().__init__() + self._title = title + self._start = start + + def compose(self) -> ComposeResult: + with Vertical(id="folder-picker"): + yield Static(self._title, id="fp-title") + yield _DirOnly(self._start, id="fp-tree") + with Horizontal(id="fp-buttons"): + yield Button("Select", variant="primary", id="fp-select") + yield Button("Cancel", variant="default", id="fp-cancel") + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "fp-select": + tree = self.query_one("#fp-tree", DirectoryTree) + node = tree.cursor_node + if node and node.data and node.data.path: + self.dismiss(str(node.data.path)) + else: + self.dismiss(None) + else: + self.dismiss(None) + + def action_cancel(self) -> None: + self.dismiss(None) diff --git a/tui/widgets/operation_log.py b/tui/widgets/operation_log.py new file mode 100644 index 0000000..972645e --- /dev/null +++ b/tui/widgets/operation_log.py @@ -0,0 +1,28 @@ +from textual.app import ComposeResult +from textual.screen import ModalScreen +from textual.widgets import RichLog, Button, Static +from textual.containers import Vertical + + +class OperationLog(ModalScreen[None]): + + BINDINGS = [("escape", "close", "Close")] + + def __init__(self, title: str = "Operation Output"): + super().__init__() + self._title = title + + def compose(self) -> ComposeResult: + with Vertical(id="op-log"): + yield Static(self._title, id="ol-title") + yield RichLog(id="ol-log", wrap=True, highlight=True) + yield Button("Close", variant="primary", id="ol-close") + + def on_button_pressed(self, event: Button.Pressed) -> None: + self.dismiss(None) + + def action_close(self) -> None: + self.dismiss(None) + + def write(self, text: str) -> None: + self.query_one("#ol-log", RichLog).write(text)