Files
company-nix/pkgs/helpers/cli.py
2026-03-18 13:28:51 +01:00

1499 lines
54 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import dataclasses
import datetime as dt
import difflib
import json
import os
import re
import shlex
import shutil
import select
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Any
# Marker comment embedded in generated files; its presence tells the helper
# that configuration.nix / disko.nix were produced by this tool and are in a
# shape it knows how to re-parse and regenerate.
SUPPORTED_CONFIG_MARKER = "Generated by nodeiwest host init."
SUPPORTED_DISKO_MARKER = "Generated by nodeiwest host init."
# NixOS system.stateVersion used for brand-new hosts when no existing host
# config supplies a value to copy.
DEFAULT_STATE_VERSION = "25.05"
# The only firmware/boot modes the templates support (also --boot-mode choices).
BOOT_MODE_CHOICES = ("uefi", "bios")
# Frame index sequence for a terminal activity animation — presumably consumed
# by stream_command's progress display; usage not visible in this chunk.
ACTIVITY_FRAMES = (0, 1, 2, 3, 2, 1)
class NodeiwestError(RuntimeError):
    """Base error for all expected helper failures.

    Raised with a user-facing message; main() catches it, prints the message
    to stderr, and exits with status 1 instead of showing a traceback.
    """
    pass
@dataclasses.dataclass
class ProbeFacts:
    """Facts gathered from a live host over SSH by probe_host()."""

    ip: str                         # target address as given on the CLI
    user: str                       # SSH user used for the probe
    boot_mode: str                  # "uefi" or "bios" (normalized)
    primary_disk: str               # whole-disk device, e.g. /dev/sda
    root_partition: str             # device backing "/", e.g. /dev/sda1
    root_source: str                # raw `findmnt -no SOURCE /` output
    disk_family: str                # coarse label: "nvme"/"vda"/"sda"/"other"
    swap_devices: list[str]         # devices listed in /proc/swaps
    disk_rows: list[dict[str, str]]  # parsed `lsblk -P` rows
    raw_outputs: dict[str, str]     # raw command outputs, for JSON/debugging

    def to_json(self) -> dict[str, Any]:
        """Return a plain-dict form suitable for json.dumps."""
        return dataclasses.asdict(self)
@dataclasses.dataclass
class ExistingConfiguration:
    """Values recovered from an existing hosts/<name>/configuration.nix."""

    host_name: str                  # networking.hostName
    timezone: str                   # time.timeZone
    boot_mode: str                  # "uefi" or "bios", inferred from loader block
    tailscale_openbao: bool         # nodeiwest.tailscale.openbao.enable
    user_ca_public_keys: list[str]  # nodeiwest.ssh.userCAPublicKeys entries
    state_version: str              # system.stateVersion
    managed: bool                   # file contains SUPPORTED_CONFIG_MARKER
@dataclasses.dataclass
class ExistingDisko:
    """Values recovered from an existing hosts/<name>/disko.nix."""

    disk_device: str  # device behind lib.mkDefault "..."
    boot_mode: str    # "uefi" (EF00 + /boot) or "bios" (EF02)
    swap_size: str    # size string from the swap partition block
    managed: bool     # file contains SUPPORTED_DISKO_MARKER
@dataclasses.dataclass
class RepoDefaults:
    """Defaults inferred from the other hosts' configs (see infer_repo_defaults)."""

    state_version: str              # most common stateVersion across hosts
    user_ca_public_keys: list[str]  # shared SSH user CA key list, or empty
def main() -> int:
    """CLI entry point: parse arguments and dispatch to the chosen handler.

    Returns the process exit status: 0/handler result on success, 1 for
    expected failures (NodeiwestError) or a missing sub-command, 130 on
    Ctrl-C.
    """
    cli = build_parser()
    namespace = cli.parse_args()
    handler = getattr(namespace, "func", None)
    if handler is None:
        # No sub-command selected; show usage instead of crashing.
        cli.print_help()
        return 1
    try:
        return int(handler(namespace) or 0)
    except KeyboardInterrupt:
        print("Interrupted.", file=sys.stderr)
        return 130
    except NodeiwestError as exc:
        print(str(exc), file=sys.stderr)
        return 1
def build_parser() -> argparse.ArgumentParser:
    """Construct the full CLI command tree.

    Commands: host (probe/init), openbao (init-host), install (plan/run),
    verify (host), colmena (plan). Each leaf parser binds its handler via
    set_defaults(func=...), which main() dispatches on.
    """
    parser = argparse.ArgumentParser(
        prog="nodeiwest",
        description="Safe VPS provisioning helpers for the NodeiWest flake.",
    )
    subparsers = parser.add_subparsers(dest="command")
    # --- host: probe a live machine and generate hosts/<name>/ files ---
    host_parser = subparsers.add_parser("host", help="Probe and initialize host files.")
    host_subparsers = host_parser.add_subparsers(dest="host_command")
    probe_parser = host_subparsers.add_parser("probe", help="Probe a live host over SSH.")
    probe_parser.add_argument("--ip", required=True, help="Target host IP or hostname.")
    probe_parser.add_argument("--user", default="root", help="SSH user. Default: root.")
    probe_parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON.")
    probe_parser.set_defaults(func=cmd_host_probe)
    init_parser = host_subparsers.add_parser("init", help="Create or update hosts/<name>/ files.")
    init_parser.add_argument("--name", required=True, help="Host name, e.g. vps2.")
    init_parser.add_argument("--ip", required=True, help="Target host IP or hostname.")
    init_parser.add_argument("--user", default="root", help="SSH user. Default: root.")
    init_parser.add_argument("--disk", help="Override the probed disk device, e.g. /dev/sda.")
    init_parser.add_argument("--boot-mode", choices=BOOT_MODE_CHOICES, help="Override the probed boot mode.")
    init_parser.add_argument("--swap-size", help="Swap partition size. Default: 4G.")
    init_parser.add_argument("--timezone", help="Time zone. Default for new hosts: UTC.")
    init_parser.add_argument(
        "--tailscale-openbao",
        choices=("on", "off"),
        help="Enable or disable OpenBao-backed Tailscale bootstrap. Default for new hosts: on.",
    )
    init_parser.add_argument("--apply", action="store_true", help="Write files after confirmation.")
    init_parser.add_argument("--yes", action="store_true", help="Skip the interactive confirmation prompt.")
    init_parser.add_argument("--force", action="store_true", help="Proceed even if target files are dirty.")
    init_parser.set_defaults(func=cmd_host_init)
    # --- openbao: create per-host policy/AppRole and bootstrap material ---
    openbao_parser = subparsers.add_parser("openbao", help="Create host OpenBao bootstrap material.")
    openbao_subparsers = openbao_parser.add_subparsers(dest="openbao_command")
    init_host_parser = openbao_subparsers.add_parser("init-host", help="Create policy, AppRole, and bootstrap files.")
    init_host_parser.add_argument("--name", required=True, help="Host name, e.g. vps2.")
    init_host_parser.add_argument("--namespace", default="it", help="OpenBao namespace. Default: it.")
    init_host_parser.add_argument("--kv-mount", default="kv", help="KV v2 mount name. Default: kv.")
    init_host_parser.add_argument("--secret-path", default="tailscale", help="Logical secret path. Default: tailscale.")
    init_host_parser.add_argument("--field", default="CLIENT_SECRET", help="Secret field. Default: CLIENT_SECRET.")
    init_host_parser.add_argument("--auth-path", default="auth/approle", help="AppRole auth mount. Default: auth/approle.")
    init_host_parser.add_argument("--policy-name", help="Policy name. Default: tailscale-<host>.")
    init_host_parser.add_argument("--role-name", help="AppRole name. Default: tailscale-<host>.")
    init_host_parser.add_argument("--out", default="bootstrap", help="Bootstrap output directory. Default: ./bootstrap.")
    init_host_parser.add_argument(
        "--kv-mount-path",
        help="Override the actual HCL policy path, e.g. kv/data/tailscale.",
    )
    init_host_parser.add_argument("--cidr", action="append", default=[], help="Optional CIDR restriction. Repeatable.")
    init_host_parser.add_argument("--apply", action="store_true", help="Execute the plan after confirmation.")
    init_host_parser.add_argument("--yes", action="store_true", help="Skip the interactive confirmation prompt.")
    init_host_parser.set_defaults(func=cmd_openbao_init_host)
    # --- install: nixos-anywhere planning and execution ---
    install_parser = subparsers.add_parser("install", help="Plan or run nixos-anywhere.")
    install_subparsers = install_parser.add_subparsers(dest="install_command")
    install_plan_parser = install_subparsers.add_parser("plan", help="Print the nixos-anywhere command.")
    add_install_arguments(install_plan_parser)
    install_plan_parser.set_defaults(func=cmd_install_plan)
    install_run_parser = install_subparsers.add_parser("run", help="Execute the nixos-anywhere command.")
    add_install_arguments(install_run_parser)
    install_run_parser.add_argument("--apply", action="store_true", help="Actually run nixos-anywhere.")
    install_run_parser.add_argument("--yes", action="store_true", help="Skip the interactive confirmation prompt.")
    install_run_parser.set_defaults(func=cmd_install_run)
    # --- verify: post-install health checks ---
    verify_parser = subparsers.add_parser("verify", help="Verify a provisioned host.")
    verify_subparsers = verify_parser.add_subparsers(dest="verify_command")
    verify_host_parser = verify_subparsers.add_parser("host", help="Check first-boot service health.")
    verify_host_parser.add_argument("--name", required=True, help="Host name.")
    verify_host_parser.add_argument("--ip", required=True, help="Target host IP or hostname.")
    verify_host_parser.add_argument("--user", default="root", help="SSH user. Default: root.")
    verify_host_parser.set_defaults(func=cmd_verify_host)
    # --- colmena: deployment inventory checks ---
    colmena_parser = subparsers.add_parser("colmena", help="Check colmena host inventory.")
    colmena_subparsers = colmena_parser.add_subparsers(dest="colmena_command")
    colmena_plan_parser = colmena_subparsers.add_parser("plan", help="Print the colmena target block or deploy command.")
    colmena_plan_parser.add_argument("--name", required=True, help="Host name.")
    colmena_plan_parser.add_argument("--ip", help="Target IP to use in a suggested snippet when missing.")
    colmena_plan_parser.set_defaults(func=cmd_colmena_plan)
    return parser
def add_install_arguments(parser: argparse.ArgumentParser) -> None:
    """Register the flags shared by ``install plan`` and ``install run``."""
    parser.add_argument("--name", required=True, help="Host name.")
    parser.add_argument("--ip", help="Target host IP. Defaults to the colmena inventory if present.")
    parser.add_argument("--bootstrap-dir", default="bootstrap", help="Bootstrap directory. Default: ./bootstrap.")
    # Both nixos-anywhere toggles share the same on/off shape and default.
    toggles = (
        ("--copy-host-keys", "Whether to pass --copy-host-keys. Default: on."),
        ("--generate-hardware-config", "Whether to pass --generate-hardware-config. Default: on."),
    )
    for flag, help_text in toggles:
        parser.add_argument(flag, choices=("on", "off"), default="on", help=help_text)
def cmd_host_probe(args: argparse.Namespace) -> int:
    """Probe a live host over SSH and print the facts (or dump them as JSON)."""
    facts = probe_host(args.ip, args.user)
    if args.json:
        # Machine-readable mode: emit the full fact set and nothing else.
        print(json.dumps(facts.to_json(), indent=2, sort_keys=True))
        return 0
    print(f"Host: {args.user}@{args.ip}")
    print(f"Boot mode: {facts.boot_mode.upper()}")
    print(f"Primary disk: {facts.primary_disk}")
    print(f"Root source: {facts.root_source}")
    print(f"Root partition: {facts.root_partition}")
    print(f"Disk family: {facts.disk_family}")
    print(f"Swap devices: {', '.join(facts.swap_devices) if facts.swap_devices else 'none'}")
    print("")
    print("Disk inventory:")
    for row in facts.disk_rows:
        # lsblk reports MODEL as an empty string for virtual devices.
        model = row.get("MODEL") or "n/a"
        print(
            " "
            + f"{row.get('NAME', '?')} size={row.get('SIZE', '?')} type={row.get('TYPE', '?')} "
            + f"model={model} fstype={row.get('FSTYPE', '') or '-'} pttype={row.get('PTTYPE', '') or '-'}"
        )
    return 0
def cmd_host_init(args: argparse.Namespace) -> int:
    """Plan (and with --apply, write) the hosts/<name>/ file set.

    Gathers disk and boot-mode facts — probing over SSH unless both --disk
    and --boot-mode were supplied — merges them with any existing managed
    host files and repo-wide defaults, prints a diff-style plan plus any
    flake.nix snippets still missing, and only writes files after --apply
    and confirmation (or --yes).
    """
    repo_root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(repo_root)
    validate_host_name(args.name)
    host_dir = repo_root / "hosts" / args.name
    config_path = host_dir / "configuration.nix"
    disko_path = host_dir / "disko.nix"
    hardware_path = host_dir / "hardware-configuration.nix"
    if not args.force:
        # Refuse to clobber uncommitted local edits unless --force.
        ensure_git_paths_clean(repo_root, [config_path, disko_path, hardware_path])
    host_dir.mkdir(parents=True, exist_ok=True)
    existing_config = parse_existing_configuration(config_path) if config_path.exists() else None
    existing_disko = parse_existing_disko(disko_path) if disko_path.exists() else None
    repo_defaults = infer_repo_defaults(repo_root, skip_host=args.name)
    facts = None
    # Probe only when at least one of disk / boot mode must be discovered.
    if not (args.disk and args.boot_mode):
        facts = probe_host(args.ip, args.user)
    disk_device = args.disk or (facts.primary_disk if facts else None)
    boot_mode = normalize_boot_mode(args.boot_mode or (facts.boot_mode if facts else None))
    if not disk_device or not boot_mode:
        raise NodeiwestError("Unable to determine both disk and boot mode. Supply --disk and --boot-mode explicitly.")
    # Safety rails: never silently change an existing host's identity or
    # firmware mode — those mismatches require manual intervention.
    if existing_config is not None and existing_config.host_name != args.name:
        raise NodeiwestError(
            f"{config_path.relative_to(repo_root)} already declares hostName={existing_config.host_name!r}, not {args.name!r}."
        )
    if existing_config is not None and existing_config.boot_mode != boot_mode:
        raise NodeiwestError(
            f"{config_path.relative_to(repo_root)} uses {existing_config.boot_mode.upper()} boot settings but the requested boot mode is {boot_mode.upper()}."
        )
    if existing_disko is not None and existing_disko.boot_mode != boot_mode:
        raise NodeiwestError(
            f"{disko_path.relative_to(repo_root)} describes a {existing_disko.boot_mode.upper()} layout but the requested boot mode is {boot_mode.upper()}."
        )
    # A changed disk device is only warned about (stderr), not fatal; --yes
    # suppresses the warning.
    if existing_disko is not None and existing_disko.disk_device != disk_device and not args.yes:
        print(
            f"Existing disk device in {disko_path.relative_to(repo_root)} is {existing_disko.disk_device}; requested device is {disk_device}.",
            file=sys.stderr,
        )
    # CLI overrides win; otherwise reuse existing values; otherwise defaults.
    swap_size = normalize_swap_size(args.swap_size or (existing_disko.swap_size if existing_disko else "4G"))
    timezone = args.timezone or (existing_config.timezone if existing_config else "UTC")
    tailscale_openbao = parse_on_off(args.tailscale_openbao, existing_config.tailscale_openbao if existing_config else True)
    state_version = existing_config.state_version if existing_config else repo_defaults.state_version
    user_ca_public_keys = existing_config.user_ca_public_keys if existing_config else repo_defaults.user_ca_public_keys
    if not user_ca_public_keys:
        raise NodeiwestError(
            "No SSH user CA public keys could be inferred from the repo. Add them to an existing host config first or create this host manually."
        )
    configuration_text = render_configuration(
        host_name=args.name,
        timezone=timezone,
        boot_mode=boot_mode,
        disk_device=disk_device,
        tailscale_openbao=tailscale_openbao,
        state_version=state_version,
        user_ca_public_keys=user_ca_public_keys,
    )
    disko_text = render_disko(boot_mode=boot_mode, disk_device=disk_device, swap_size=swap_size)
    hardware_text = load_template("hardware-configuration.placeholder.nix")
    plans = []
    plans.extend(plan_file_update(config_path, configuration_text))
    plans.extend(plan_file_update(disko_path, disko_text))
    if hardware_path.exists():
        # Re-planning an existing hardware config with its own content yields
        # an empty plan, i.e. the file is left untouched.
        plans.extend(plan_file_update(hardware_path, hardware_path.read_text()))
    else:
        plans.extend(plan_file_update(hardware_path, hardware_text))
    if not plans:
        print(f"No changes required under hosts/{args.name}.")
    else:
        print(f"Planned updates for hosts/{args.name}:")
        for plan in plans:
            print("")
            print(plan["summary"])
            if plan["diff"]:
                print(plan["diff"])
    # Point out flake.nix wiring the user still has to add by hand.
    flake_text = (repo_root / "flake.nix").read_text()
    nixos_missing = not flake_has_nixos_configuration(flake_text, args.name)
    colmena_missing = not flake_has_colmena_host(flake_text, args.name)
    if nixos_missing or colmena_missing:
        print("")
        print("flake.nix additions required:")
        if nixos_missing:
            print(build_nixos_configuration_snippet(args.name))
        if colmena_missing:
            print(build_colmena_host_snippet(args.name, args.ip))
    if not args.apply:
        print("")
        print("Dry run only. Re-run with --apply to write these files.")
        return 0
    if plans and not args.yes:
        if not confirm("Write the planned host files? [y/N] "):
            raise NodeiwestError("Aborted before writing host files.")
    for plan in plans:
        if plan["changed"]:
            write_file_with_backup(plan["path"], plan["new_text"])
            rel_path = plan["path"].relative_to(repo_root)
            print(f"Wrote {rel_path}")
    if not plans:
        print("Nothing to write.")
    return 0
def cmd_openbao_init_host(args: argparse.Namespace) -> int:
    """Plan (and with --apply, execute) per-host OpenBao bootstrap material.

    Verifies the referenced KV secret field exists, prints the policy HCL,
    the AppRole write command, and the bootstrap file paths. With --apply
    plus confirmation it writes the policy and AppRole via `bao`, then
    stores the fetched role_id / secret_id as read-only bootstrap files.
    """
    repo_root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(repo_root)
    validate_host_name(args.name)
    ensure_command_available("bao")
    ensure_bao_authenticated()
    policy_name = args.policy_name or f"tailscale-{args.name}"
    role_name = args.role_name or f"tailscale-{args.name}"
    output_dir = resolve_path(repo_root, args.out)
    # The bootstrap tree mirrors the target host's /var/lib/nodeiwest layout.
    role_id_path = output_dir / "var" / "lib" / "nodeiwest" / "openbao-approle-role-id"
    secret_id_path = output_dir / "var" / "lib" / "nodeiwest" / "openbao-approle-secret-id"
    secret_data = bao_kv_get(args.namespace, args.kv_mount, args.secret_path)
    fields = secret_data.get("data", {})
    # KV v2 responses nest the payload one level deeper: {"data": {"data": {...}}}.
    if isinstance(fields.get("data"), dict):
        fields = fields["data"]
    if args.field not in fields:
        raise NodeiwestError(
            f"OpenBao secret {args.secret_path!r} in namespace {args.namespace!r} does not contain field {args.field!r}."
        )
    if args.kv_mount_path:
        # Explicit HCL path override wins over the derived one.
        policy_content = render_openbao_policy(args.kv_mount_path)
    else:
        policy_content = derive_openbao_policy(args.namespace, args.kv_mount, args.secret_path)
    role_command = build_approle_write_command(args.auth_path, role_name, policy_name, args.cidr)
    print(f"Namespace: {args.namespace}")
    print(f"KV mount: {args.kv_mount}")
    print(f"Policy name: {policy_name}")
    print(f"Role name: {role_name}")
    print(f"Secret path: {args.secret_path}")
    print(f"Field: {args.field}")
    print(f"Bootstrap output: {output_dir}")
    print("")
    print("Policy content:")
    print(policy_content.rstrip())
    print("")
    print("AppRole command:")
    print(shlex.join(role_command))
    print("")
    print("Bootstrap files:")
    print(f" {role_id_path}")
    print(f" {secret_id_path}")
    if not args.apply:
        print("")
        print("Dry run only. Re-run with --apply to create the policy, AppRole, and bootstrap files.")
        return 0
    if not args.yes and not confirm("Create or update the OpenBao policy, AppRole, and bootstrap files? [y/N] "):
        raise NodeiwestError("Aborted before OpenBao writes.")
    # `bao policy write` needs the policy body on disk.
    with tempfile.NamedTemporaryFile("w", delete=False) as handle:
        handle.write(policy_content.rstrip() + "\n")
        temp_policy_path = Path(handle.name)
    try:
        bao_env = {"BAO_NAMESPACE": args.namespace}
        run_command(
            ["bao", "policy", "write", policy_name, str(temp_policy_path)],
            cwd=repo_root,
            env=bao_env,
            next_fix="Check that your token can write policies in the selected namespace.",
        )
        run_command(
            role_command,
            cwd=repo_root,
            env=bao_env,
            next_fix="Check that the AppRole auth mount exists and that your token can manage roles.",
        )
        role_id = run_command(
            ["bao", "read", "-field=role_id", f"{args.auth_path}/role/{role_name}/role-id"],
            cwd=repo_root,
            env=bao_env,
            next_fix="Check that the AppRole was created successfully before fetching role_id.",
        ).stdout.strip()
        secret_id = run_command(
            ["bao", "write", "-f", "-field=secret_id", f"{args.auth_path}/role/{role_name}/secret-id"],
            cwd=repo_root,
            env=bao_env,
            next_fix="Check that the AppRole supports SecretIDs and that your token can generate them.",
        ).stdout.strip()
    finally:
        # Never leave the rendered policy lying around on disk.
        temp_policy_path.unlink(missing_ok=True)
    role_id_path.parent.mkdir(parents=True, exist_ok=True)
    write_secret_file(role_id_path, role_id + "\n")
    write_secret_file(secret_id_path, secret_id + "\n")
    print("")
    print("OpenBao bootstrap material written.")
    print(f"Role ID: {role_id_path}")
    print(f"Secret ID: {secret_id_path}")
    print("")
    print("Next step:")
    print(f" nodeiwest install plan --name {args.name} --bootstrap-dir {shlex.quote(str(output_dir))}")
    return 0
def cmd_install_plan(args: argparse.Namespace) -> int:
    """Print the planned nixos-anywhere invocation without executing it."""
    root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(root)
    context = build_install_context(root, args)
    print_install_plan(context)
    return 0
def cmd_install_run(args: argparse.Namespace) -> int:
    """Run nixos-anywhere against the target host.

    Destructive: requires --apply, checks SSH reachability, shows the plan,
    and asks for confirmation unless --yes was given.
    """
    if not args.apply:
        raise NodeiwestError("install run is destructive. Re-run with --apply to execute nixos-anywhere.")
    root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(root)
    context = build_install_context(root, args)
    # Fail early if the target is unreachable before showing the plan.
    ensure_ssh_reachable(context["ip"], "root")
    print_install_plan(context)
    confirmed = args.yes or confirm("Run nixos-anywhere now? [y/N] ")
    if not confirmed:
        raise NodeiwestError("Aborted before running nixos-anywhere.")
    print("")
    stream_command(
        context["command"],
        cwd=root,
        next_fix="Recover via provider console or public SSH, then re-check the generated host files and bootstrap material.",
        activity_label="Executing install",
    )
    print("")
    print("Install completed. Verify first boot with:")
    print(f" nodeiwest verify host --name {args.name} --ip {context['ip']}")
    return 0
def cmd_verify_host(args: argparse.Namespace) -> int:
    """Report first-boot service health and tailscale status for a host.

    Purely diagnostic: always returns 0 and prints states, likely causes,
    and log excerpts rather than failing the process.
    """
    validate_host_name(args.name)
    # First-boot units that must settle for the host to join the tailnet.
    services = [
        "vault-agent-tailscale",
        "nodeiwest-tailscale-authkey-ready",
        "tailscaled-autoconnect",
    ]
    service_results: dict[str, subprocess.CompletedProcess[str]] = {}
    for service in services:
        # check=False: a failed unit should be reported, not abort the scan.
        service_results[service] = ssh_command(
            args.user,
            args.ip,
            f"systemctl status --no-pager --lines=20 {shlex.quote(service)}",
            check=False,
            next_fix="Check public SSH reachability before retrying verification.",
        )
    tailscale_status = ssh_command(
        args.user,
        args.ip,
        "tailscale status",
        check=False,
        next_fix="Check public SSH reachability before retrying verification.",
    )
    print(f"Verification target: {args.user}@{args.ip} ({args.name})")
    print("")
    for service in services:
        state = classify_systemd_status(service_results[service])
        print(f"{service}: {state}")
    print(f"tailscale status: {'healthy' if tailscale_status.returncode == 0 else 'error'}")
    causes = infer_verify_failures(service_results, tailscale_status)
    if causes:
        print("")
        print("Likely causes:")
        for cause in causes:
            print(f" - {cause}")
    print("")
    print("Service excerpts:")
    for service in services:
        print(f"[{service}]")
        # Prefer stdout; fall back to stderr when the unit query itself failed.
        excerpt = summarize_text(service_results[service].stdout or service_results[service].stderr, 12)
        print(excerpt or "(no output)")
        print("")
    print("[tailscale status]")
    print(summarize_text(tailscale_status.stdout or tailscale_status.stderr, 12) or "(no output)")
    return 0
def cmd_colmena_plan(args: argparse.Namespace) -> int:
    """Show the colmena targetHost for a host, or a snippet to add one."""
    root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(root)
    validate_host_name(args.name)
    flake_source = (root / "flake.nix").read_text()
    target = lookup_colmena_target_host(flake_source, args.name)
    if not target:
        # Without an inventory entry we can only suggest one — and that
        # suggestion needs an address to fill in.
        if not args.ip:
            raise NodeiwestError(
                f"flake.nix does not define colmena.{args.name}.deployment.targetHost and no --ip was provided."
            )
        print("Missing colmena host block. Add this to flake.nix:")
        print(build_colmena_host_snippet(args.name, args.ip))
    else:
        print(f"colmena targetHost for {args.name}: {target}")
    print("")
    print(f"Deploy command: nix run .#colmena -- apply --on {args.name}")
    return 0
def find_repo_root(start: Path) -> Path:
    """Locate the repository root, preferring git's answer over a marker walk.

    Falls back to scanning *start* and its parents for the flake's marker
    files when git is unavailable or *start* is not inside a work tree.
    Raises NodeiwestError when no root can be found.
    """
    probe = subprocess.run(
        ["git", "rev-parse", "--show-toplevel"],
        cwd=start,
        text=True,
        capture_output=True,
    )
    if probe.returncode == 0:
        return Path(probe.stdout.strip()).resolve()
    here = start.resolve()
    for directory in (here, *here.parents):
        has_flake = (directory / "flake.nix").exists()
        has_home = (directory / "modules" / "home.nix").exists()
        if has_flake and has_home:
            return directory
    raise NodeiwestError("Not inside the nix-nodeiwest repository. Run the helper from this flake checkout.")
def ensure_expected_repo_root(repo_root: Path) -> None:
    """Raise NodeiwestError when *repo_root* lacks the entries this tool needs."""
    missing = [
        candidate
        for candidate in (
            repo_root / "flake.nix",
            repo_root / "modules" / "home.nix",
            repo_root / "hosts",
        )
        if not candidate.exists()
    ]
    if missing:
        formatted = ", ".join(str(entry.relative_to(repo_root)) for entry in missing)
        raise NodeiwestError(f"Repository root is missing expected files: {formatted}")
def validate_host_name(name: str) -> None:
    """Reject names that are not lowercase DNS-label-style identifiers.

    Allowed: lowercase letters, digits, and interior hyphens; 1-63 chars;
    must start and end with an alphanumeric character.
    """
    label_pattern = r"[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?"
    if re.fullmatch(label_pattern, name) is None:
        raise NodeiwestError(
            f"Invalid host name {name!r}. Use lowercase letters, digits, and hyphens only, without a trailing hyphen."
        )
def probe_host(ip: str, user: str) -> ProbeFacts:
    """Gather disk, boot-mode, root-device, and swap facts over SSH.

    Runs four read-only commands on the target and derives the primary disk
    from the device backing "/". Raises NodeiwestError when the facts are
    ambiguous (e.g. multiple disks and an unmappable root source).
    """
    lsblk_cmd = "lsblk -P -o NAME,SIZE,TYPE,MODEL,FSTYPE,PTTYPE,MOUNTPOINTS"
    # EFI firmware exposes /sys/firmware/efi; its absence implies legacy BIOS.
    boot_cmd = "test -d /sys/firmware/efi && echo UEFI || echo BIOS"
    root_cmd = "findmnt -no SOURCE /"
    swap_cmd = "cat /proc/swaps"
    lsblk_output = ssh_command(user, ip, lsblk_cmd, next_fix="Check SSH access and that lsblk exists on the target.").stdout
    boot_output = ssh_command(user, ip, boot_cmd, next_fix="Check SSH access and that /sys/firmware is readable.").stdout
    root_output = ssh_command(user, ip, root_cmd, next_fix="Check SSH access and that findmnt exists on the target.").stdout
    swap_output = ssh_command(user, ip, swap_cmd, next_fix="Check SSH access and that /proc/swaps is readable.").stdout
    disk_rows = parse_lsblk_output(lsblk_output)
    disk_devices = [f"/dev/{row['NAME']}" for row in disk_rows if row.get("TYPE") == "disk"]
    if not disk_devices:
        raise NodeiwestError("No disk devices were found in the remote lsblk output.")
    root_source = root_output.strip()
    if not root_source:
        raise NodeiwestError("findmnt returned an empty root source; cannot determine the primary disk.")
    root_partition = normalize_device(root_source)
    primary_disk = disk_from_device(root_partition)
    if primary_disk not in disk_devices:
        # The derived disk name did not match lsblk's inventory; only safe
        # to guess when there is exactly one disk.
        if len(disk_devices) == 1:
            primary_disk = disk_devices[0]
        else:
            raise NodeiwestError(
                "Multiple candidate disks were found and the root source did not map cleanly to one of them. Re-run with --disk."
            )
    boot_mode = normalize_boot_mode(boot_output.strip())
    swap_devices = parse_swaps(swap_output)
    return ProbeFacts(
        ip=ip,
        user=user,
        boot_mode=boot_mode,
        primary_disk=primary_disk,
        root_partition=root_partition,
        root_source=root_source,
        disk_family=classify_disk_family(primary_disk),
        swap_devices=swap_devices,
        disk_rows=disk_rows,
        raw_outputs={
            "lsblk": lsblk_output,
            "boot_mode": boot_output,
            "root_source": root_output,
            "swaps": swap_output,
        },
    )
def parse_lsblk_output(output: str) -> list[dict[str, str]]:
    """Parse `lsblk -P` KEY="value" output into one dict per device row.

    Raises NodeiwestError when the output is empty or a row is missing any
    of the requested columns.
    """
    required = ("NAME", "SIZE", "TYPE", "MODEL", "FSTYPE", "PTTYPE", "MOUNTPOINTS")
    content_lines = [candidate.strip() for candidate in output.splitlines() if candidate.strip()]
    if not content_lines:
        raise NodeiwestError("Unexpected lsblk output: not enough lines to parse.")
    rows: list[dict[str, str]] = []
    for line in content_lines:
        # shlex handles the double-quoted values, including embedded spaces.
        pairs = dict(
            token.split("=", 1) for token in shlex.split(line) if "=" in token
        )
        absent = [column for column in required if column not in pairs]
        if absent:
            raise NodeiwestError(
                f"Unexpected lsblk output: missing columns {', '.join(absent)} in line {line!r}."
            )
        rows.append(pairs)
    return rows
def normalize_boot_mode(value: str | None) -> str:
    """Trim and lower-case a boot-mode string, validating it against the choices.

    Raises NodeiwestError for missing/empty values or anything that is not
    one of BOOT_MODE_CHOICES after normalization.
    """
    if not value:
        raise NodeiwestError("Boot mode is missing.")
    candidate = value.strip().lower()
    if candidate in BOOT_MODE_CHOICES:
        return candidate
    raise NodeiwestError(f"Unsupported boot mode {value!r}. Expected one of: {', '.join(BOOT_MODE_CHOICES)}.")
def normalize_device(value: str) -> str:
    """Strip whitespace and require a plain /dev/* block device path."""
    device = value.strip()
    if device.startswith("/dev/"):
        return device
    raise NodeiwestError(
        f"Unsupported root source {value!r}. Only plain /dev/* block devices are supported by the helper."
    )
def disk_from_device(device: str) -> str:
    """Return the whole-disk device path for a partition device path.

    Examples: /dev/sda3 -> /dev/sda, /dev/vda1 -> /dev/vda,
    /dev/nvme0n1p2 -> /dev/nvme0n1, /dev/mmcblk0p1 -> /dev/mmcblk0.
    Whole-disk inputs are returned unchanged.

    Fix: the previous version stripped ALL trailing digits, so whole-disk
    nvme/mmcblk inputs (e.g. /dev/nvme0n1, /dev/mmcblk0 when "/" lives
    directly on the disk) were mangled into nonexistent devices
    (/dev/nvme0n, /dev/mmcblk). Those names are now recognized as already
    being whole disks.
    """
    name = Path(device).name
    # nvme/mmcblk partitions carry a "p<N>" suffix after the disk name.
    if re.fullmatch(r"nvme\d+n\d+p\d+", name) or re.fullmatch(r"mmcblk\d+p\d+", name):
        base_name = re.sub(r"p\d+$", "", name)
        return f"/dev/{base_name}"
    # Whole-disk nvme/mmcblk names end in digits but must not be stripped.
    if re.fullmatch(r"nvme\d+n\d+", name) or re.fullmatch(r"mmcblk\d+", name):
        return device
    # Classic sd*/vd* style: drop the trailing partition number, if any.
    if re.search(r"\d+$", name):
        base_name = re.sub(r"\d+$", "", name)
        return f"/dev/{base_name}"
    return device
def classify_disk_family(device: str) -> str:
    """Map a disk device path to a coarse family label.

    Returns "nvme", "vda", "sda", or "other" based on the device name prefix.
    """
    stem = Path(device).name
    for prefix, family in (("nvme", "nvme"), ("vd", "vda"), ("sd", "sda")):
        if stem.startswith(prefix):
            return family
    return "other"
def parse_swaps(output: str) -> list[str]:
    """Extract swap device paths from /proc/swaps content.

    The first non-empty line is the column header and is skipped; an empty
    or header-only file yields [].
    """
    entries = [line.strip() for line in output.splitlines() if line.strip()]
    if len(entries) < 2:
        return []
    return [entry.split()[0] for entry in entries[1:]]
def normalize_swap_size(value: str) -> str:
    """Canonicalize IEC size suffixes ("4GiB" -> "4G"); others pass through."""
    size = value.strip()
    for unit in ("K", "M", "G", "T", "P"):
        long_suffix = unit + "iB"
        if size.endswith(long_suffix):
            return size[: -len(long_suffix)] + unit
    return size
def parse_existing_configuration(path: Path) -> ExistingConfiguration:
    """Recover the helper-relevant settings from an existing configuration.nix.

    Only files matching the helper's template shape are accepted; anything
    else raises NodeiwestError so a hand-written config is never rewritten.
    """
    text = path.read_text()
    # Template files always import both sibling files; their absence means
    # the config was not produced by this helper.
    if "./disko.nix" not in text or "./hardware-configuration.nix" not in text:
        raise NodeiwestError(
            f"{path} does not match the supported configuration shape. Manual intervention is required."
        )
    host_name = extract_single_match(text, r'networking\.hostName\s*=\s*"([^"]+)";', path, "hostName")
    timezone = extract_single_match(text, r'time\.timeZone\s*=\s*"([^"]+)";', path, "time.timeZone")
    state_version = extract_single_match(text, r'system\.stateVersion\s*=\s*"([^"]+)";', path, "system.stateVersion")
    user_ca_public_keys = extract_nix_string_list(text, r"nodeiwest\.ssh\.userCAPublicKeys\s*=\s*\[(.*?)\];", path)
    # Accepts both ...openbao.enable = true; and ...openbao = { enable = true; ... }.
    tailscale_enable_text = extract_optional_match(
        text,
        r"nodeiwest\.tailscale\.openbao(?:\.enable\s*=\s*|\s*=\s*\{[^}]*enable\s*=\s*)(true|false);",
    )
    if tailscale_enable_text is None:
        raise NodeiwestError(
            f"{path} does not contain a supported nodeiwest.tailscale.openbao.enable declaration."
        )
    # Boot mode is inferred from the loader block the templates emit.
    if 'boot.loader.efi.canTouchEfiVariables = true;' in text and 'device = "nodev";' in text:
        boot_mode = "uefi"
    elif re.search(r'boot\.loader\.grub\s*=\s*\{[^}]*device\s*=\s*"/dev/', text, re.S) or 'efiSupport = false;' in text:
        boot_mode = "bios"
    else:
        raise NodeiwestError(
            f"{path} has a boot loader configuration outside the helper's supported template shape."
        )
    return ExistingConfiguration(
        host_name=host_name,
        timezone=timezone,
        boot_mode=boot_mode,
        tailscale_openbao=(tailscale_enable_text == "true"),
        user_ca_public_keys=user_ca_public_keys,
        state_version=state_version,
        managed=SUPPORTED_CONFIG_MARKER in text,
    )
def parse_existing_disko(path: Path) -> ExistingDisko:
    """Recover device, boot mode, and swap size from an existing disko.nix.

    Only the helper's single-disk GPT + ext4 + swap templates are accepted;
    any other layout raises NodeiwestError.
    """
    text = path.read_text()
    if 'type = "gpt";' not in text or 'format = "ext4";' not in text or 'type = "swap";' not in text:
        raise NodeiwestError(
            f"{path} does not match the supported single-disk ext4+swap disko shape. Manual intervention is required."
        )
    disk_device = extract_single_match(text, r'device\s*=\s*lib\.mkDefault\s*"([^"]+)";', path, "disk device")
    swap_size = extract_single_match(text, r'swap\s*=\s*\{.*?size\s*=\s*"([^"]+)";', path, "swap size", flags=re.S)
    # GPT type codes distinguish the templates: EF00 = EFI system partition
    # (mounted at /boot), EF02 = BIOS boot partition.
    if 'type = "EF00";' in text and 'mountpoint = "/boot";' in text:
        boot_mode = "uefi"
    elif 'type = "EF02";' in text:
        boot_mode = "bios"
    else:
        raise NodeiwestError(
            f"{path} does not match the helper's supported UEFI or BIOS templates."
        )
    return ExistingDisko(
        disk_device=disk_device,
        boot_mode=boot_mode,
        swap_size=swap_size,
        managed=SUPPORTED_DISKO_MARKER in text,
    )
def infer_repo_defaults(repo_root: Path, skip_host: str | None = None) -> RepoDefaults:
    """Derive defaults for a new host from the other hosts' configurations.

    Scans hosts/*/configuration.nix (skipping *skip_host*), takes the most
    common stateVersion (falling back to DEFAULT_STATE_VERSION), and reuses
    the SSH user CA key list only when all parseable hosts agree on one set.
    """
    hosts_dir = repo_root / "hosts"
    state_versions: list[str] = []
    ca_key_sets: set[tuple[str, ...]] = set()
    for config_path in sorted(hosts_dir.glob("*/configuration.nix")):
        if skip_host and config_path.parent.name == skip_host:
            continue
        try:
            existing = parse_existing_configuration(config_path)
        except NodeiwestError:
            # Hand-written / unmanaged hosts do not contribute defaults.
            continue
        state_versions.append(existing.state_version)
        if existing.user_ca_public_keys:
            ca_key_sets.add(tuple(existing.user_ca_public_keys))
    state_version = most_common_value(state_versions) or DEFAULT_STATE_VERSION
    if len(ca_key_sets) > 1:
        raise NodeiwestError(
            "Existing host configs define multiple different SSH user CA key lists. The helper will not guess which set to reuse."
        )
    user_ca_public_keys = list(next(iter(ca_key_sets))) if ca_key_sets else []
    return RepoDefaults(state_version=state_version, user_ca_public_keys=user_ca_public_keys)
def most_common_value(values: list[str]) -> str | None:
if not values:
return None
counts: dict[str, int] = {}
for value in values:
counts[value] = counts.get(value, 0) + 1
return sorted(counts.items(), key=lambda item: (-item[1], item[0]))[0][0]
def render_configuration(
    *,
    host_name: str,
    timezone: str,
    boot_mode: str,
    disk_device: str,
    tailscale_openbao: bool,
    state_version: str,
    user_ca_public_keys: list[str],
) -> str:
    """Render configuration.nix from its template by placeholder substitution."""
    boot_block = render_boot_loader_block(boot_mode, disk_device)
    substitutions = {
        "@@HOST_NAME@@": host_name,
        "@@TIMEZONE@@": timezone,
        "@@BOOT_LOADER_BLOCK@@": indent(boot_block.rstrip(), " "),
        "@@SSH_CA_KEYS@@": render_nix_string_list(user_ca_public_keys, indent_level=2),
        "@@TAILSCALE_OPENBAO_ENABLE@@": render_nix_bool(tailscale_openbao),
        "@@STATE_VERSION@@": state_version,
    }
    rendered = load_template("configuration.nix.tmpl")
    for placeholder, replacement in substitutions.items():
        rendered = rendered.replace(placeholder, replacement)
    return ensure_trailing_newline(rendered)
def render_boot_loader_block(boot_mode: str, disk_device: str) -> str:
    # Returns the boot.loader Nix snippet for configuration.nix. UEFI uses
    # GRUB as the EFI boot manager ("nodev" = no MBR install); BIOS installs
    # GRUB to the target disk's MBR. The caller re-indents the whole block.
    # NOTE(review): the interior indentation of these literals was lost in
    # the source paste and reconstructed conventionally — confirm against
    # the original file / generated output.
    if boot_mode == "uefi":
        return """
boot.loader.efi.canTouchEfiVariables = true;
boot.loader.grub = {
  enable = true;
  efiSupport = true;
  device = "nodev";
};
""".strip("\n")
    return f"""
boot.loader.grub = {{
  enable = true;
  efiSupport = false;
  device = "{escape_nix_string(disk_device)}";
}};
""".strip("\n")
def render_disko(*, boot_mode: str, disk_device: str, swap_size: str) -> str:
    """Render disko.nix from the UEFI or BIOS template with device and swap size."""
    if boot_mode == "uefi":
        template_name = "disko-uefi-ext4.nix"
    else:
        template_name = "disko-bios-ext4.nix"
    body = load_template(template_name)
    body = body.replace("@@DISK_DEVICE@@", escape_nix_string(disk_device))
    body = body.replace("@@SWAP_SIZE@@", escape_nix_string(swap_size))
    return ensure_trailing_newline(body)
def render_openbao_policy(policy_path: str) -> str:
    """Render the OpenBao policy HCL with the concrete secret path filled in."""
    template = load_template("openbao-policy.hcl.tmpl")
    return ensure_trailing_newline(template.replace("@@POLICY_PATH@@", policy_path))
def load_template(name: str) -> str:
    """Read a helper template file and return its text.

    The directory defaults to ./templates next to this script and can be
    overridden via the NODEIWEST_HELPER_TEMPLATES environment variable.
    """
    default_dir = Path(__file__).resolve().parent / "templates"
    templates_dir = Path(os.environ.get("NODEIWEST_HELPER_TEMPLATES", default_dir))
    candidate = templates_dir / name
    if candidate.exists():
        return candidate.read_text()
    raise NodeiwestError(f"Missing helper template: {candidate}")
def render_nix_string_list(values: list[str], indent_level: int = 0) -> str:
    """Format values as a multi-line Nix list literal; "[ ]" when empty."""
    if not values:
        return "[ ]"
    pad = " " * indent_level
    body = [f'{pad} "{escape_nix_string(item)}"' for item in values]
    return "\n".join(["[", *body, pad + "]"])
def render_nix_bool(value: bool) -> str:
    """Spell a Python bool as a Nix boolean literal."""
    if value:
        return "true"
    return "false"
def escape_nix_string(value: str) -> str:
    """Backslash-escape the characters that are special inside Nix double quotes."""
    escaped = []
    for ch in value:
        if ch in ('\\', '"'):
            escaped.append("\\")
        escaped.append(ch)
    return "".join(escaped)
def ensure_trailing_newline(text: str) -> str:
    """Append a final newline unless one is already present."""
    if text.endswith("\n"):
        return text
    return text + "\n"
def indent(text: str, prefix: str) -> str:
    """Prefix every non-empty line of *text*; blank lines stay blank."""
    shifted = []
    for line in text.splitlines():
        shifted.append(prefix + line if line else line)
    return "\n".join(shifted)
def plan_file_update(path: Path, new_text: str) -> list[dict[str, Any]]:
    """Describe the write needed to bring *path* to *new_text*.

    Returns an empty plan when the existing file already matches;
    otherwise a single-entry plan carrying the diff and a
    create-or-update summary. A missing file always yields a plan entry,
    even when *new_text* is empty.
    """
    if path.exists():
        old_text = path.read_text()
        if old_text == new_text:
            return []
        action = "Update"
    else:
        old_text = ""
        action = "Create"
    return [{
        "path": path,
        "changed": True,
        "new_text": new_text,
        "summary": f"{action} {path.name}",
        "diff": unified_diff(path, old_text, new_text),
    }]
def unified_diff(path: Path, old_text: str, new_text: str) -> str:
    """Return a unified diff between two text versions of *path*.

    Both diff headers use the same path, since the file changes in place.
    """
    label = str(path)
    delta = difflib.unified_diff(
        old_text.splitlines(),
        new_text.splitlines(),
        fromfile=label,
        tofile=label,
        lineterm="",
    )
    return "\n".join(delta)
def write_file_with_backup(path: Path, text: str) -> None:
    """Write *text* to *path*, backing up any existing file first."""
    if path.exists():
        copy = backup_file(path)
        print(f"Backed up {path.name} to {copy.name}")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text)
def write_secret_file(path: Path, text: str) -> None:
    """Write secret material to *path* and lock it down to mode 0o400.

    Any existing file is backed up first and made writable again (previous
    runs leave it at 0o400, which would make the rewrite fail with EACCES).
    The file is created with owner-only permissions from the start instead
    of relying on the process umask, so the secret is never readable by
    other users, even briefly.
    """
    if path.exists():
        backup_path = backup_file(path)
        print(f"Backed up {path.name} to {backup_path.name}")
        # Restore write access so the rewrite below does not fail.
        path.chmod(0o600)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Create/truncate with mode 0o600 atomically at open time: no window
    # where a umask-default (potentially world-readable) file holds the secret.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(text)
    path.chmod(0o400)
def backup_file(path: Path) -> Path:
    """Copy *path* to a UTC-timestamped `.bak` sibling and return the copy."""
    stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d%H%M%S")
    destination = path.with_name(f"{path.name}.bak.{stamp}")
    # copy2 preserves metadata (mtime, permissions) alongside the contents.
    shutil.copy2(path, destination)
    return destination
def ensure_git_paths_clean(repo_root: Path, paths: list[Path]) -> None:
    """Fail when any existing target file has uncommitted git changes.

    Missing paths are skipped; only files that exist can carry local edits.

    Raises:
        NodeiwestError: when `git status --porcelain` reports changes.
    """
    present = [candidate for candidate in paths if candidate.exists()]
    if not present:
        return
    rel = [str(candidate.relative_to(repo_root)) for candidate in present]
    status = run_command(
        ["git", "status", "--porcelain", "--", *rel],
        cwd=repo_root,
        next_fix="Commit or stash local edits to the target host files, or re-run with --force if you intentionally want to overwrite them.",
    )
    if not status.stdout.strip():
        return
    raise NodeiwestError(
        "Refusing to modify host files with local git changes:\n"
        + summarize_text(status.stdout, 20)
        + "\nRe-run with --force to override this guard."
    )
def flake_has_nixos_configuration(flake_text: str, name: str) -> bool:
    """Check whether flake.nix declares `name = mkHost "name";`."""
    escaped = re.escape(name)
    pattern = rf'^\s*{escaped}\s*=\s*mkHost\s+"{escaped}";'
    return bool(re.search(pattern, flake_text, re.M))
def flake_has_colmena_host(flake_text: str, name: str) -> bool:
    """Check whether the colmena section defines a targetHost for *name*."""
    return lookup_colmena_target_host(flake_text, name) is not None
def lookup_colmena_target_host(flake_text: str, name: str) -> str | None:
pattern = re.compile(
rf'colmena\s*=\s*\{{.*?^\s*{re.escape(name)}\s*=\s*\{{.*?targetHost\s*=\s*"([^"]+)";',
re.S | re.M,
)
match = pattern.search(flake_text)
return match.group(1) if match else None
def build_nixos_configuration_snippet(name: str) -> str:
    """Render the flake.nix line that registers *name* via mkHost."""
    return f'    {name} = mkHost "{name}";'
def build_colmena_host_snippet(name: str, ip: str) -> str:
    """Render a ready-to-paste colmena host block for *name* at *ip*."""
    lines = [
        f"    {name} = {{",
        "      deployment = {",
        f'        targetHost = "{ip}";',
        '        targetUser = "root";',
        "        tags = [",
        '          "company"',
        "        ];",
        "      };",
        "",
        f"      imports = [ ./hosts/{name}/configuration.nix ];",
        "    };",
    ]
    return "\n".join(lines)
def ensure_command_available(name: str) -> None:
    """Raise NodeiwestError unless *name* resolves to an executable in PATH."""
    located = shutil.which(name)
    if located is None:
        raise NodeiwestError(f"Required command {name!r} is not available in PATH.")
def ensure_bao_authenticated() -> None:
    """Verify the current OpenBao token is valid via `bao token lookup`."""
    hint = "Run a bao login flow first and verify that `bao token lookup` succeeds."
    run_command(["bao", "token", "lookup"], next_fix=hint)
def bao_kv_get(namespace: str, kv_mount: str, secret_path: str) -> dict[str, Any]:
    """Fetch a KV secret from OpenBao and return the parsed JSON payload.

    Raises:
        NodeiwestError: when the bao call fails or its output is not JSON.
    """
    hint = (
        "Check BAO_ADDR, BAO_NAMESPACE, the KV mount, and the logical secret path. "
        "If the KV mount is not the default, re-run with --kv-mount."
    )
    output = run_command(
        ["bao", "kv", "get", f"-mount={kv_mount}", "-format=json", secret_path],
        env={"BAO_NAMESPACE": namespace},
        next_fix=hint,
    )
    try:
        return json.loads(output.stdout)
    except json.JSONDecodeError as exc:
        raise NodeiwestError(f"Failed to parse `bao kv get` JSON output: {exc}") from exc
def derive_openbao_policy(namespace: str, kv_mount: str, secret_path: str) -> str:
    """Derive the minimal OpenBao policy needed to read the secret.

    Uses `bao kv get -output-policy`, which prints the required policy
    instead of reading the secret itself.

    Raises:
        NodeiwestError: when bao fails or emits an empty policy.
    """
    hint = (
        "Check BAO_ADDR, BAO_NAMESPACE, the KV mount, and the logical secret path. "
        "If the KV mount is not the default, re-run with --kv-mount. "
        "If policy derivation still does not match your mount layout, re-run with --kv-mount-path."
    )
    output = run_command(
        ["bao", "kv", "get", f"-mount={kv_mount}", "-output-policy", secret_path],
        env={"BAO_NAMESPACE": namespace},
        next_fix=hint,
    )
    policy = output.stdout.strip()
    if not policy:
        raise NodeiwestError("`bao kv get -output-policy` returned an empty policy.")
    return ensure_trailing_newline(policy)
def build_approle_write_command(auth_path: str, role_name: str, policy_name: str, cidrs: list[str]) -> list[str]:
    """Build the `bao write` argv that provisions an AppRole role.

    When *cidrs* is non-empty, both token and secret-id issuance are bound
    to those networks.
    """
    command = [
        "bao",
        "write",
        f"{auth_path}/role/{role_name}",
        f"token_policies={policy_name}",
        "token_ttl=1h",
        "token_max_ttl=24h",
        "token_num_uses=0",
        "secret_id_num_uses=0",
    ]
    if not cidrs:
        return command
    joined = ",".join(cidrs)
    return command + [
        f"token_bound_cidrs={joined}",
        f"secret_id_bound_cidrs={joined}",
    ]
def build_install_context(repo_root: Path, args: argparse.Namespace) -> dict[str, Any]:
    """Validate install prerequisites and assemble the nixos-anywhere argv.

    Returns a context dict consumed by the install/plan commands: target IP,
    the full command line, the validated host file paths, and whether the
    colmena host block still needs to be added before the first deploy.

    Raises:
        NodeiwestError: on an invalid host name, missing flake entry,
            unresolvable IP, or missing prerequisite files.
    """
    validate_host_name(args.name)
    flake_text = (repo_root / "flake.nix").read_text()
    # The flake must already register the host before we can install it.
    if not flake_has_nixos_configuration(flake_text, args.name):
        raise NodeiwestError(
            f"flake.nix does not define nixosConfigurations.{args.name}.\nAdd this block:\n{build_nixos_configuration_snippet(args.name)}"
        )
    # Explicit --ip wins; otherwise reuse the colmena deployment targetHost.
    ip = args.ip or lookup_colmena_target_host(flake_text, args.name)
    if not ip:
        raise NodeiwestError(
            f"Could not determine an IP for {args.name}. Pass --ip or add a colmena targetHost.\n"
            + build_colmena_host_snippet(args.name, "<ip>")
        )
    host_dir = repo_root / "hosts" / args.name
    configuration_path = host_dir / "configuration.nix"
    disko_path = host_dir / "disko.nix"
    hardware_path = host_dir / "hardware-configuration.nix"
    # The bootstrap dir mirrors the target filesystem layout; nixos-anywhere
    # copies it onto the new system via --extra-files.
    bootstrap_dir = resolve_path(repo_root, args.bootstrap_dir)
    role_id_path = bootstrap_dir / "var" / "lib" / "nodeiwest" / "openbao-approle-role-id"
    secret_id_path = bootstrap_dir / "var" / "lib" / "nodeiwest" / "openbao-approle-secret-id"
    required_paths = [configuration_path, disko_path, role_id_path, secret_id_path]
    missing = [path for path in required_paths if not path.exists()]
    if missing:
        formatted = "\n".join(f" - {path}" for path in missing)
        raise NodeiwestError(f"Install prerequisites are missing:\n{formatted}")
    # Hardware config must pre-exist when we are told not to generate it.
    if args.generate_hardware_config == "off" and not hardware_path.exists():
        raise NodeiwestError(
            f"{hardware_path.relative_to(repo_root)} is missing and --generate-hardware-config=off was requested."
        )
    command = [
        "nix",
        "run",
        "github:nix-community/nixos-anywhere",
        "--",
        "--extra-files",
        str(bootstrap_dir),
    ]
    if args.copy_host_keys == "on":
        command.append("--copy-host-keys")
    if args.generate_hardware_config == "on":
        # Ask nixos-anywhere to run nixos-generate-config on the target and
        # write the result back into the repo's hardware file.
        command.extend([
            "--generate-hardware-config",
            "nixos-generate-config",
            str(hardware_path),
        ])
    command.extend([
        "--flake",
        f".#{args.name}",
        f"root@{ip}",
    ])
    return {
        "ip": ip,
        "command": command,
        "configuration_path": configuration_path,
        "disko_path": disko_path,
        "hardware_path": hardware_path,
        "role_id_path": role_id_path,
        "secret_id_path": secret_id_path,
        # Surfaced so callers can warn before the first colmena deploy.
        "colmena_missing": not flake_has_colmena_host(flake_text, args.name),
    }
def print_install_plan(context: dict[str, Any]) -> None:
    """Print the nixos-anywhere command, preflight checklist, and file list."""
    header = [
        "Install command:",
        shlex.join(context["command"]),
        "",
        "Preflight checklist:",
        " - provider snapshot taken",
        " - application/data backup taken",
        " - public SSH reachable",
        " - host keys may change after install",
        "",
        "Validated files:",
        f" - {context['configuration_path']}",
        f" - {context['disko_path']}",
    ]
    for line in header:
        print(line)
    if context["hardware_path"].exists():
        print(f" - {context['hardware_path']}")
    print(f" - {context['role_id_path']}")
    print(f" - {context['secret_id_path']}")
    if context["colmena_missing"]:
        print("")
        print("colmena host block is missing. Add this before the first deploy:")
        host_name = Path(context["configuration_path"]).parent.name
        print(build_colmena_host_snippet(host_name, context["ip"]))
def ensure_ssh_reachable(ip: str, user: str) -> None:
    """Run a no-op command over SSH to prove the host is reachable."""
    hint = "Check public SSH reachability, host keys, and the target user before running nixos-anywhere."
    ssh_command(user, ip, "true", next_fix=hint)
def ssh_command(
    user: str,
    ip: str,
    remote_command: str,
    *,
    check: bool = True,
    next_fix: str | None = None,
) -> subprocess.CompletedProcess[str]:
    """Run *remote_command* on user@ip over non-interactive SSH.

    BatchMode prevents password prompts and ConnectTimeout bounds the wait,
    so a dead host fails fast instead of hanging.
    """
    default_fix = "Check SSH reachability and authentication before retrying."
    argv = [
        "ssh",
        "-o",
        "BatchMode=yes",
        "-o",
        "ConnectTimeout=10",
        f"{user}@{ip}",
        remote_command,
    ]
    return run_command(argv, check=check, next_fix=next_fix or default_fix)
def classify_systemd_status(result: subprocess.CompletedProcess[str]) -> str:
    """Map systemctl status output to "active"/"failed"/"inactive"/"unknown".

    "failed" is checked before "inactive", mirroring the precedence the
    verify flow relies on when both words appear in the output.
    """
    haystack = f"{result.stdout}\n{result.stderr}".lower()
    if "active (running)" in haystack or "active (exited)" in haystack:
        return "active"
    for state in ("failed", "inactive"):
        if state in haystack:
            return state
    return "unknown"
def infer_verify_failures(
    service_results: dict[str, subprocess.CompletedProcess[str]],
    tailscale_status: subprocess.CompletedProcess[str],
) -> list[str]:
    """Derive likely root-cause hints from service and Tailscale output.

    Scans the combined, lowercased stdout/stderr of all verification
    commands for known failure fingerprints and returns de-duplicated,
    actionable messages in detection order.
    """
    messages: list[str] = []
    combined = "\n".join(
        (result.stdout or "") + "\n" + (result.stderr or "")
        for result in [*service_results.values(), tailscale_status]
    ).lower()
    if any(path in combined for path in ["openbao-approle-role-id", "openbao-approle-secret-id", "no such file"]):
        messages.append("Missing AppRole files on the host. Check /var/lib/nodeiwest/openbao-approle-role-id and ...secret-id.")
    if any(fragment in combined for fragment in ["invalid secret id", "permission denied", "approle", "failed to authenticate"]):
        messages.append("OpenBao AppRole authentication failed. Re-check the role, secret_id, namespace, and auth mount.")
    # BUGFIX: `combined` is lowercased above, so the fragment must be
    # lowercase too; the previous "CLIENT_SECRET" spelling could never match.
    if any(fragment in combined for fragment in ["client_secret", "timed out waiting for rendered tailscale auth key", "no data", "secret path"]):
        messages.append("OpenBao rendered no Tailscale auth key. Check the secret path, KV mount path, and CLIENT_SECRET field.")
    if tailscale_status.returncode != 0 or "logged out" in (tailscale_status.stdout or "").lower():
        messages.append("Tailscale autoconnect is blocked. Check tailscaled-autoconnect, the rendered auth key, and outbound access to Tailscale.")
    # Preserve first-seen order while dropping duplicates.
    deduped: list[str] = []
    for message in messages:
        if message not in deduped:
            deduped.append(message)
    return deduped
def summarize_text(text: str, lines: int) -> str:
    """Return up to *lines* non-blank lines of *text*, right-stripped."""
    kept: list[str] = []
    for line in text.splitlines():
        if line.strip():
            kept.append(line.rstrip())
    return "\n".join(kept[:lines])
def resolve_path(repo_root: Path, value: str) -> Path:
    """Interpret *value* as absolute, or otherwise relative to the repo root."""
    candidate = Path(value)
    if candidate.is_absolute():
        return candidate
    return repo_root / candidate
def parse_on_off(value: str | None, default: bool) -> bool:
if value is None:
return default
return value == "on"
def confirm(prompt: str) -> bool:
    """Ask a yes/no question; only "y" or "yes" (any case) confirm."""
    reply = input(prompt)
    return reply.strip().lower() in {"y", "yes"}
def extract_single_match(
    text: str,
    pattern: str,
    path: Path,
    label: str,
    *,
    flags: int = 0,
) -> str:
    """Return group 1 of *pattern* in *text*, failing loudly when absent.

    Raises:
        NodeiwestError: when the pattern does not match; *path* and *label*
            only shape the error message.
    """
    found = re.search(pattern, text, flags)
    if found is None:
        raise NodeiwestError(f"Could not parse {label} from {path}; manual intervention is required.")
    return found.group(1)
def extract_optional_match(text: str, pattern: str, *, flags: int = re.S) -> str | None:
match = re.search(pattern, text, flags)
return match.group(1) if match else None
def extract_nix_string_list(text: str, pattern: str, path: Path) -> list[str]:
    """Parse a Nix list of double-quoted strings captured by *pattern*.

    Each element is unescaped (\\" -> " and \\\\ -> \\).

    Raises:
        NodeiwestError: when *pattern* does not match *text*.
    """
    found = re.search(pattern, text, re.S)
    if found is None:
        raise NodeiwestError(f"Could not parse nodeiwest.ssh.userCAPublicKeys from {path}.")
    raw_values = re.findall(r'"((?:[^"\\]|\\.)*)"', found.group(1))
    return [raw.replace('\\"', '"').replace("\\\\", "\\") for raw in raw_values]
def run_command(
command: list[str],
*,
cwd: Path | None = None,
env: dict[str, str] | None = None,
check: bool = True,
next_fix: str | None = None,
) -> subprocess.CompletedProcess[str]:
merged_env = os.environ.copy()
if env:
merged_env.update(env)
result = subprocess.run(
command,
cwd=str(cwd) if cwd else None,
env=merged_env,
text=True,
capture_output=True,
)
if check and result.returncode != 0:
raise NodeiwestError(format_command_failure(command, result, next_fix))
return result
def stream_command(
    command: list[str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
    next_fix: str | None = None,
    activity_label: str | None = None,
) -> None:
    """Run *command*, streaming its combined stdout/stderr to us live.

    When *activity_label* is set, a bottom-of-terminal activity indicator
    animates while the command runs (only on ANSI-capable TTYs).

    Raises:
        NodeiwestError: when the output pipe cannot be opened or the
            command exits non-zero.
    """
    merged_env = os.environ.copy()
    if env:
        merged_env.update(env)
    indicator = BottomActivityIndicator(activity_label) if activity_label else None
    # stderr is folded into stdout so the caller sees one interleaved stream.
    process = subprocess.Popen(
        command,
        cwd=str(cwd) if cwd else None,
        env=merged_env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    if process.stdout is None:
        raise NodeiwestError(f"Failed to open output stream for command: {shlex.join(command)}")
    if indicator is not None:
        indicator.start()
    stdout_fd = process.stdout.fileno()
    # Non-blocking reads let us animate the indicator between output bursts.
    os.set_blocking(stdout_fd, False)
    try:
        while True:
            if indicator is not None:
                indicator.render()
            # Wait briefly for output; the 0.1s timeout paces the animation.
            ready, _, _ = select.select([stdout_fd], [], [], 0.1)
            if ready:
                chunk = read_process_chunk(stdout_fd)
                if chunk:
                    write_output_chunk(chunk)
                    if indicator is not None:
                        indicator.render(force=True)
                    continue
                # Readable but zero bytes means EOF: the pipe is closed.
                break
            if process.poll() is not None:
                # Process exited; drain anything still buffered in the pipe.
                chunk = read_process_chunk(stdout_fd)
                if chunk:
                    write_output_chunk(chunk)
                    continue
                break
    finally:
        process.stdout.close()
        if indicator is not None:
            indicator.stop()
    return_code = process.wait()
    if return_code != 0:
        raise NodeiwestError(
            f"Command failed: {shlex.join(command)}\nExit code: {return_code}\n"
            + (f"Next likely fix: {next_fix}" if next_fix else "")
        )
class BottomActivityIndicator:
    """Animated activity bar pinned to the terminal's bottom row.

    Uses an ANSI scroll region so command output scrolls in rows 1..N-1
    while the last row shows the animation. Rendering is disabled
    automatically on non-ANSI streams or terminals shorter than two rows.
    """

    def __init__(self, label: str, stream: Any | None = None) -> None:
        self.label = label
        # Prefer a TTY stream (stderr, then stdout); None disables rendering.
        self.stream = stream or choose_activity_stream()
        self.enabled = bool(self.stream and supports_ansi_status(self.stream))
        self.rows = 0  # cached terminal height
        self.frame_index = 0  # current position within ACTIVITY_FRAMES
        self.last_render_at = 0.0  # monotonic timestamp of the last repaint

    def start(self) -> None:
        """Hide the cursor and reserve the bottom row via a scroll region."""
        if not self.enabled:
            return
        self.rows = shutil.get_terminal_size(fallback=(80, 24)).lines
        if self.rows < 2:
            # Not enough space for both output and a status row.
            self.enabled = False
            return
        self.stream.write("\033[?25l")  # hide cursor
        self.stream.write(f"\033[1;{self.rows - 1}r")  # scroll region rows 1..N-1
        self.stream.flush()
        self.render(force=True)

    def render(self, *, force: bool = False) -> None:
        """Repaint the status row; rate-limited unless *force* is set."""
        if not self.enabled:
            return
        now = time.monotonic()
        if not force and (now - self.last_render_at) < 0.12:
            return  # throttle repaints to roughly 8 per second
        rows = shutil.get_terminal_size(fallback=(80, 24)).lines
        if rows != self.rows and rows >= 2:
            # The terminal was resized: move the scroll region accordingly.
            self.rows = rows
            self.stream.write(f"\033[1;{self.rows - 1}r")
        frame = format_activity_frame(self.label, ACTIVITY_FRAMES[self.frame_index])
        self.frame_index = (self.frame_index + 1) % len(ACTIVITY_FRAMES)
        self.stream.write("\0337")  # save cursor position
        self.stream.write(f"\033[{self.rows};1H\033[2K{frame}")  # paint bottom row
        self.stream.write("\0338")  # restore cursor position
        self.stream.flush()
        self.last_render_at = now

    def stop(self) -> None:
        """Clear the status row, reset the scroll region, restore the cursor."""
        if not self.enabled:
            return
        self.stream.write("\0337")
        self.stream.write(f"\033[{self.rows};1H\033[2K")  # erase the status row
        self.stream.write("\0338")
        self.stream.write("\033[r")  # reset scroll region to the full screen
        self.stream.write("\033[?25h")  # show the cursor again
        self.stream.flush()
def choose_activity_stream() -> Any | None:
if getattr(sys.stderr, "isatty", lambda: False)():
return sys.stderr
if getattr(sys.stdout, "isatty", lambda: False)():
return sys.stdout
return None
def supports_ansi_status(stream: Any) -> bool:
    """True when *stream* is a TTY and TERM names an ANSI-capable terminal."""
    is_tty = getattr(stream, "isatty", lambda: False)()
    term = os.environ.get("TERM", "")
    return bool(is_tty and term not in {"", "dumb"})
def format_activity_frame(label: str, active_index: int) -> str:
    """Render four colored blocks (the active one highlighted) plus *label*."""
    active = "\033[38;5;220m█\033[0m"  # bright yellow for the active cell
    idle = "\033[38;5;208m█\033[0m"  # orange for the remaining cells
    cells = [active if position == active_index else idle for position in range(4)]
    return f"{''.join(cells)} \033[1;37m{label}\033[0m"
def read_process_chunk(fd: int) -> bytes:
    """Read up to 4 KiB from *fd*; empty bytes when no data is pending."""
    try:
        data = os.read(fd, 4096)
    except BlockingIOError:
        # Non-blocking fd with nothing buffered yet.
        return b""
    return data
def write_output_chunk(chunk: bytes) -> None:
    """Forward raw bytes to stdout, preferring the binary buffer layer."""
    buffer = getattr(sys.stdout, "buffer", None)
    if buffer is not None:
        buffer.write(chunk)
        buffer.flush()
        return
    # Text-only stdout (e.g. a replaced stream): decode leniently instead.
    sys.stdout.write(chunk.decode(errors="replace"))
    sys.stdout.flush()
def format_command_failure(
    command: list[str],
    result: subprocess.CompletedProcess[str],
    next_fix: str | None,
) -> str:
    """Build a multi-line failure report for a completed command.

    Includes the shell-quoted command, the exit code, truncated stdout and
    stderr (when non-empty), and an optional remediation hint.
    """
    report = [
        f"Command failed: {shlex.join(command)}",
        f"Exit code: {result.returncode}",
    ]
    for label, text in (("stdout", result.stdout), ("stderr", result.stderr)):
        trimmed = summarize_text(text or "", 20)
        if trimmed:
            report.append(f"{label}:\n{trimmed}")
    if next_fix:
        report.append(f"Next likely fix: {next_fix}")
    return "\n".join(report)
# Script entry point: delegate to main() and propagate its exit code.
if __name__ == "__main__":
    raise SystemExit(main())