1351 lines
50 KiB
Python
1351 lines
50 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import dataclasses
|
|
import datetime as dt
|
|
import difflib
|
|
import json
|
|
import os
|
|
import re
|
|
import shlex
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Marker string embedded in generated files so the helper can recognize
# configurations it previously wrote (vs. hand-maintained ones).
SUPPORTED_CONFIG_MARKER = "Generated by nodeiwest host init."
# Same marker is reused for generated disko.nix files.
SUPPORTED_DISKO_MARKER = "Generated by nodeiwest host init."
# Fallback system.stateVersion when no existing host config provides one.
DEFAULT_STATE_VERSION = "25.05"
# Accepted boot modes for --boot-mode and for probe results.
BOOT_MODE_CHOICES = ("uefi", "bios")
|
|
|
|
|
|
class NodeiwestError(RuntimeError):
    """Domain error for the helper; main() catches it, prints to stderr, exits 1."""
    pass
|
|
|
|
|
|
@dataclasses.dataclass
class ProbeFacts:
    """Facts gathered from a live host over SSH by probe_host()."""

    ip: str  # target address that was probed
    user: str  # SSH user used for the probe
    boot_mode: str  # normalized "uefi" or "bios"
    primary_disk: str  # e.g. "/dev/sda"
    root_partition: str  # block device backing "/"
    root_source: str  # raw `findmnt -no SOURCE /` output, stripped
    disk_family: str  # "nvme", "vda", "sda", or "other"
    swap_devices: list[str]  # device paths listed in /proc/swaps
    disk_rows: list[dict[str, str]]  # parsed lsblk table rows
    raw_outputs: dict[str, str]  # raw remote command outputs, for debugging

    def to_json(self) -> dict[str, Any]:
        """Return a JSON-serializable dict of all fields."""
        return dataclasses.asdict(self)
|
|
|
|
|
|
@dataclasses.dataclass
class ExistingConfiguration:
    """Fields parsed out of an existing hosts/<name>/configuration.nix."""

    host_name: str  # networking.hostName
    timezone: str  # time.timeZone
    boot_mode: str  # "uefi" or "bios", inferred from the boot loader stanza
    tailscale_openbao: bool  # nodeiwest.tailscale.openbao enable flag
    user_ca_public_keys: list[str]  # nodeiwest.ssh.userCAPublicKeys entries
    state_version: str  # system.stateVersion
    managed: bool  # True when the file carries SUPPORTED_CONFIG_MARKER
|
|
|
|
|
|
@dataclasses.dataclass
class ExistingDisko:
    """Fields parsed out of an existing hosts/<name>/disko.nix."""

    disk_device: str  # lib.mkDefault disk device string
    boot_mode: str  # "uefi" or "bios", inferred from partition types
    swap_size: str  # swap partition size, e.g. "4GiB"
    managed: bool  # True when the file carries SUPPORTED_DISKO_MARKER
|
|
|
|
|
|
@dataclasses.dataclass
class RepoDefaults:
    """Defaults inferred from the other host configs in the repo."""

    state_version: str  # most common stateVersion across hosts
    user_ca_public_keys: list[str]  # shared SSH user CA key list (may be empty)
|
|
|
|
|
|
def main() -> int:
    """CLI entry point. Returns the process exit code."""
    parser = build_parser()
    namespace = parser.parse_args()

    # Sub-command leaves attach their handler via set_defaults(func=...);
    # a bare invocation has no handler, so show usage instead.
    handler = getattr(namespace, "func", None)
    if handler is None:
        parser.print_help()
        return 1

    try:
        result = handler(namespace)
    except KeyboardInterrupt:
        print("Interrupted.", file=sys.stderr)
        return 130
    except NodeiwestError as exc:
        print(str(exc), file=sys.stderr)
        return 1
    return int(result or 0)
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the top-level argument parser with all nested sub-commands.

    Command tree: host probe / host init, openbao init-host, install plan /
    install run, verify host, colmena plan. Each leaf registers its handler
    via set_defaults(func=...), which main() dispatches on.
    """
    parser = argparse.ArgumentParser(
        prog="nodeiwest",
        description="Safe VPS provisioning helpers for the NodeiWest flake.",
    )
    subparsers = parser.add_subparsers(dest="command")

    # --- host: probe a live machine and generate hosts/<name>/ files ---
    host_parser = subparsers.add_parser("host", help="Probe and initialize host files.")
    host_subparsers = host_parser.add_subparsers(dest="host_command")

    probe_parser = host_subparsers.add_parser("probe", help="Probe a live host over SSH.")
    probe_parser.add_argument("--ip", required=True, help="Target host IP or hostname.")
    probe_parser.add_argument("--user", default="root", help="SSH user. Default: root.")
    probe_parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON.")
    probe_parser.set_defaults(func=cmd_host_probe)

    init_parser = host_subparsers.add_parser("init", help="Create or update hosts/<name>/ files.")
    init_parser.add_argument("--name", required=True, help="Host name, e.g. vps2.")
    init_parser.add_argument("--ip", required=True, help="Target host IP or hostname.")
    init_parser.add_argument("--user", default="root", help="SSH user. Default: root.")
    init_parser.add_argument("--disk", help="Override the probed disk device, e.g. /dev/sda.")
    init_parser.add_argument("--boot-mode", choices=BOOT_MODE_CHOICES, help="Override the probed boot mode.")
    init_parser.add_argument("--swap-size", help="Swap partition size. Default: 4GiB.")
    init_parser.add_argument("--timezone", help="Time zone. Default for new hosts: UTC.")
    init_parser.add_argument(
        "--tailscale-openbao",
        choices=("on", "off"),
        help="Enable or disable OpenBao-backed Tailscale bootstrap. Default for new hosts: on.",
    )
    init_parser.add_argument("--apply", action="store_true", help="Write files after confirmation.")
    init_parser.add_argument("--yes", action="store_true", help="Skip the interactive confirmation prompt.")
    init_parser.add_argument("--force", action="store_true", help="Proceed even if target files are dirty.")
    init_parser.set_defaults(func=cmd_host_init)

    # --- openbao: per-host policy / AppRole bootstrap material ---
    openbao_parser = subparsers.add_parser("openbao", help="Create host OpenBao bootstrap material.")
    openbao_subparsers = openbao_parser.add_subparsers(dest="openbao_command")

    init_host_parser = openbao_subparsers.add_parser("init-host", help="Create policy, AppRole, and bootstrap files.")
    init_host_parser.add_argument("--name", required=True, help="Host name, e.g. vps2.")
    init_host_parser.add_argument("--namespace", default="it", help="OpenBao namespace. Default: it.")
    init_host_parser.add_argument("--secret-path", default="tailscale", help="Logical secret path. Default: tailscale.")
    init_host_parser.add_argument("--field", default="auth_key", help="Secret field. Default: auth_key.")
    init_host_parser.add_argument("--auth-path", default="auth/approle", help="AppRole auth mount. Default: auth/approle.")
    init_host_parser.add_argument("--policy-name", help="Policy name. Default: tailscale-<host>.")
    init_host_parser.add_argument("--role-name", help="AppRole name. Default: tailscale-<host>.")
    init_host_parser.add_argument("--out", default="bootstrap", help="Bootstrap output directory. Default: ./bootstrap.")
    init_host_parser.add_argument(
        "--kv-mount-path",
        help="Override the actual HCL policy path, e.g. kv/data/tailscale.",
    )
    init_host_parser.add_argument("--cidr", action="append", default=[], help="Optional CIDR restriction. Repeatable.")
    init_host_parser.add_argument("--apply", action="store_true", help="Execute the plan after confirmation.")
    init_host_parser.add_argument("--yes", action="store_true", help="Skip the interactive confirmation prompt.")
    init_host_parser.set_defaults(func=cmd_openbao_init_host)

    # --- install: nixos-anywhere planning and execution ---
    install_parser = subparsers.add_parser("install", help="Plan or run nixos-anywhere.")
    install_subparsers = install_parser.add_subparsers(dest="install_command")

    install_plan_parser = install_subparsers.add_parser("plan", help="Print the nixos-anywhere command.")
    add_install_arguments(install_plan_parser)
    install_plan_parser.set_defaults(func=cmd_install_plan)

    install_run_parser = install_subparsers.add_parser("run", help="Execute the nixos-anywhere command.")
    add_install_arguments(install_run_parser)
    install_run_parser.add_argument("--apply", action="store_true", help="Actually run nixos-anywhere.")
    install_run_parser.add_argument("--yes", action="store_true", help="Skip the interactive confirmation prompt.")
    install_run_parser.set_defaults(func=cmd_install_run)

    # --- verify: post-install health checks ---
    verify_parser = subparsers.add_parser("verify", help="Verify a provisioned host.")
    verify_subparsers = verify_parser.add_subparsers(dest="verify_command")

    verify_host_parser = verify_subparsers.add_parser("host", help="Check first-boot service health.")
    verify_host_parser.add_argument("--name", required=True, help="Host name.")
    verify_host_parser.add_argument("--ip", required=True, help="Target host IP or hostname.")
    verify_host_parser.add_argument("--user", default="root", help="SSH user. Default: root.")
    verify_host_parser.set_defaults(func=cmd_verify_host)

    # --- colmena: deployment inventory helpers ---
    colmena_parser = subparsers.add_parser("colmena", help="Check colmena host inventory.")
    colmena_subparsers = colmena_parser.add_subparsers(dest="colmena_command")

    colmena_plan_parser = colmena_subparsers.add_parser("plan", help="Print the colmena target block or deploy command.")
    colmena_plan_parser.add_argument("--name", required=True, help="Host name.")
    colmena_plan_parser.add_argument("--ip", help="Target IP to use in a suggested snippet when missing.")
    colmena_plan_parser.set_defaults(func=cmd_colmena_plan)

    return parser
|
|
|
|
|
|
def add_install_arguments(parser: argparse.ArgumentParser) -> None:
    """Register the flags shared by `install plan` and `install run`."""
    parser.add_argument("--name", required=True, help="Host name.")
    parser.add_argument("--ip", help="Target host IP. Defaults to the colmena inventory if present.")
    parser.add_argument("--bootstrap-dir", default="bootstrap", help="Bootstrap directory. Default: ./bootstrap.")
    # Both pass-through toggles share the same on/off shape and default.
    for flag, help_text in (
        ("--copy-host-keys", "Whether to pass --copy-host-keys. Default: on."),
        ("--generate-hardware-config", "Whether to pass --generate-hardware-config. Default: on."),
    ):
        parser.add_argument(flag, choices=("on", "off"), default="on", help=help_text)
|
|
|
|
|
|
def cmd_host_probe(args: argparse.Namespace) -> int:
    """Probe a host over SSH and print its facts (human-readable or --json)."""
    facts = probe_host(args.ip, args.user)
    if args.json:
        print(json.dumps(facts.to_json(), indent=2, sort_keys=True))
        return 0

    print(f"Host: {args.user}@{args.ip}")
    print(f"Boot mode: {facts.boot_mode.upper()}")
    print(f"Primary disk: {facts.primary_disk}")
    print(f"Root source: {facts.root_source}")
    print(f"Root partition: {facts.root_partition}")
    print(f"Disk family: {facts.disk_family}")
    print(f"Swap devices: {', '.join(facts.swap_devices) if facts.swap_devices else 'none'}")
    print("")
    print("Disk inventory:")
    for row in facts.disk_rows:
        # MODEL is often empty for virtual disks; show a placeholder instead.
        model = row.get("MODEL") or "n/a"
        print(
            "  "
            + f"{row.get('NAME', '?')} size={row.get('SIZE', '?')} type={row.get('TYPE', '?')} "
            + f"model={model} fstype={row.get('FSTYPE', '') or '-'} pttype={row.get('PTTYPE', '') or '-'}"
        )
    return 0
|
|
|
|
|
|
def cmd_host_init(args: argparse.Namespace) -> int:
    """Plan (and with --apply, write) hosts/<name>/ configuration, disko, and hardware files.

    Dry-run by default: prints per-file diffs plus any flake.nix snippets the
    user still has to add by hand. The live host is SSH-probed unless both
    --disk and --boot-mode are supplied on the command line.
    """
    repo_root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(repo_root)
    validate_host_name(args.name)

    host_dir = repo_root / "hosts" / args.name
    config_path = host_dir / "configuration.nix"
    disko_path = host_dir / "disko.nix"
    hardware_path = host_dir / "hardware-configuration.nix"

    # Refuse to touch files with uncommitted changes unless --force is given.
    if not args.force:
        ensure_git_paths_clean(repo_root, [config_path, disko_path, hardware_path])

    host_dir.mkdir(parents=True, exist_ok=True)

    existing_config = parse_existing_configuration(config_path) if config_path.exists() else None
    existing_disko = parse_existing_disko(disko_path) if disko_path.exists() else None

    repo_defaults = infer_repo_defaults(repo_root, skip_host=args.name)
    facts = None
    # Only probe over SSH when the CLI did not pin both disk and boot mode.
    if not (args.disk and args.boot_mode):
        facts = probe_host(args.ip, args.user)

    disk_device = args.disk or (facts.primary_disk if facts else None)
    boot_mode = normalize_boot_mode(args.boot_mode or (facts.boot_mode if facts else None))
    if not disk_device or not boot_mode:
        raise NodeiwestError("Unable to determine both disk and boot mode. Supply --disk and --boot-mode explicitly.")

    # Guard against silently re-purposing files that belong to another host
    # or that were generated for a different firmware type.
    if existing_config is not None and existing_config.host_name != args.name:
        raise NodeiwestError(
            f"{config_path.relative_to(repo_root)} already declares hostName={existing_config.host_name!r}, not {args.name!r}."
        )
    if existing_config is not None and existing_config.boot_mode != boot_mode:
        raise NodeiwestError(
            f"{config_path.relative_to(repo_root)} uses {existing_config.boot_mode.upper()} boot settings but the requested boot mode is {boot_mode.upper()}."
        )
    if existing_disko is not None and existing_disko.boot_mode != boot_mode:
        raise NodeiwestError(
            f"{disko_path.relative_to(repo_root)} describes a {existing_disko.boot_mode.upper()} layout but the requested boot mode is {boot_mode.upper()}."
        )

    # A changed disk device is only warned about on stderr, not fatal.
    if existing_disko is not None and existing_disko.disk_device != disk_device and not args.yes:
        print(
            f"Existing disk device in {disko_path.relative_to(repo_root)} is {existing_disko.disk_device}; requested device is {disk_device}.",
            file=sys.stderr,
        )

    # Precedence: CLI flag > existing file value > repo-wide/static default.
    swap_size = args.swap_size or (existing_disko.swap_size if existing_disko else "4GiB")
    timezone = args.timezone or (existing_config.timezone if existing_config else "UTC")
    tailscale_openbao = parse_on_off(args.tailscale_openbao, existing_config.tailscale_openbao if existing_config else True)
    state_version = existing_config.state_version if existing_config else repo_defaults.state_version
    user_ca_public_keys = existing_config.user_ca_public_keys if existing_config else repo_defaults.user_ca_public_keys

    if not user_ca_public_keys:
        raise NodeiwestError(
            "No SSH user CA public keys could be inferred from the repo. Add them to an existing host config first or create this host manually."
        )

    configuration_text = render_configuration(
        host_name=args.name,
        timezone=timezone,
        boot_mode=boot_mode,
        disk_device=disk_device,
        tailscale_openbao=tailscale_openbao,
        state_version=state_version,
        user_ca_public_keys=user_ca_public_keys,
    )
    disko_text = render_disko(boot_mode=boot_mode, disk_device=disk_device, swap_size=swap_size)
    hardware_text = load_template("hardware-configuration.placeholder.nix")

    plans = []
    plans.extend(plan_file_update(config_path, configuration_text))
    plans.extend(plan_file_update(disko_path, disko_text))
    if hardware_path.exists():
        # An existing hardware config is kept as-is (planning against its own
        # content yields a no-op plan but keeps the reporting uniform).
        plans.extend(plan_file_update(hardware_path, hardware_path.read_text()))
    else:
        plans.extend(plan_file_update(hardware_path, hardware_text))

    if not plans:
        print(f"No changes required under hosts/{args.name}.")
    else:
        print(f"Planned updates for hosts/{args.name}:")
        for plan in plans:
            print("")
            print(plan["summary"])
            if plan["diff"]:
                print(plan["diff"])

    # Surface flake.nix wiring the helper does not write automatically.
    flake_text = (repo_root / "flake.nix").read_text()
    nixos_missing = not flake_has_nixos_configuration(flake_text, args.name)
    colmena_missing = not flake_has_colmena_host(flake_text, args.name)
    if nixos_missing or colmena_missing:
        print("")
        print("flake.nix additions required:")
        if nixos_missing:
            print(build_nixos_configuration_snippet(args.name))
        if colmena_missing:
            print(build_colmena_host_snippet(args.name, args.ip))

    if not args.apply:
        print("")
        print("Dry run only. Re-run with --apply to write these files.")
        return 0

    if plans and not args.yes:
        if not confirm("Write the planned host files? [y/N] "):
            raise NodeiwestError("Aborted before writing host files.")

    for plan in plans:
        if plan["changed"]:
            write_file_with_backup(plan["path"], plan["new_text"])
            rel_path = plan["path"].relative_to(repo_root)
            print(f"Wrote {rel_path}")

    if not plans:
        print("Nothing to write.")
    return 0
|
|
|
|
|
|
def cmd_openbao_init_host(args: argparse.Namespace) -> int:
    """Plan (and with --apply, execute) OpenBao policy/AppRole creation for a host.

    Verifies the target secret field exists first, prints the full plan, then
    on --apply writes the policy, creates the AppRole, and stores the fetched
    role-id / secret-id bootstrap files under --out.
    """
    repo_root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(repo_root)
    validate_host_name(args.name)
    ensure_command_available("bao")
    ensure_bao_authenticated()

    policy_name = args.policy_name or f"tailscale-{args.name}"
    role_name = args.role_name or f"tailscale-{args.name}"
    output_dir = resolve_path(repo_root, args.out)
    role_id_path = output_dir / "var" / "lib" / "nodeiwest" / "openbao-approle-role-id"
    secret_id_path = output_dir / "var" / "lib" / "nodeiwest" / "openbao-approle-secret-id"

    # Fail early if the secret the host will read is missing the field.
    secret_data = bao_kv_get(args.namespace, args.secret_path)
    fields = secret_data.get("data", {})
    if isinstance(fields.get("data"), dict):
        # KV v2 responses nest the payload one level deeper ("data.data").
        fields = fields["data"]
    if args.field not in fields:
        raise NodeiwestError(
            f"OpenBao secret {args.secret_path!r} in namespace {args.namespace!r} does not contain field {args.field!r}."
        )

    if args.kv_mount_path:
        policy_content = render_openbao_policy(args.kv_mount_path)
    else:
        policy_content = derive_openbao_policy(args.namespace, args.secret_path)

    role_command = build_approle_write_command(args.auth_path, role_name, policy_name, args.cidr)

    print(f"Namespace: {args.namespace}")
    print(f"Policy name: {policy_name}")
    print(f"Role name: {role_name}")
    print(f"Secret path: {args.secret_path}")
    print(f"Field: {args.field}")
    print(f"Bootstrap output: {output_dir}")
    print("")
    print("Policy content:")
    print(policy_content.rstrip())
    print("")
    print("AppRole command:")
    print(shlex.join(role_command))
    print("")
    print("Bootstrap files:")
    print(f"  {role_id_path}")
    print(f"  {secret_id_path}")

    if not args.apply:
        print("")
        print("Dry run only. Re-run with --apply to create the policy, AppRole, and bootstrap files.")
        return 0

    if not args.yes and not confirm("Create or update the OpenBao policy, AppRole, and bootstrap files? [y/N] "):
        raise NodeiwestError("Aborted before OpenBao writes.")

    # `bao policy write` takes a file path, so spill the policy to a temp file.
    with tempfile.NamedTemporaryFile("w", delete=False) as handle:
        handle.write(policy_content.rstrip() + "\n")
        temp_policy_path = Path(handle.name)

    try:
        bao_env = {"BAO_NAMESPACE": args.namespace}
        run_command(
            ["bao", "policy", "write", policy_name, str(temp_policy_path)],
            cwd=repo_root,
            env=bao_env,
            next_fix="Check that your token can write policies in the selected namespace.",
        )
        run_command(
            role_command,
            cwd=repo_root,
            env=bao_env,
            next_fix="Check that the AppRole auth mount exists and that your token can manage roles.",
        )
        role_id = run_command(
            ["bao", "read", "-field=role_id", f"{args.auth_path}/role/{role_name}/role-id"],
            cwd=repo_root,
            env=bao_env,
            next_fix="Check that the AppRole was created successfully before fetching role_id.",
        ).stdout.strip()
        secret_id = run_command(
            ["bao", "write", "-f", "-field=secret_id", f"{args.auth_path}/role/{role_name}/secret-id"],
            cwd=repo_root,
            env=bao_env,
            next_fix="Check that the AppRole supports SecretIDs and that your token can generate them.",
        ).stdout.strip()
    finally:
        # Never leave the rendered policy lying around in the temp directory.
        temp_policy_path.unlink(missing_ok=True)

    role_id_path.parent.mkdir(parents=True, exist_ok=True)
    write_secret_file(role_id_path, role_id + "\n")
    write_secret_file(secret_id_path, secret_id + "\n")

    print("")
    print("OpenBao bootstrap material written.")
    print(f"Role ID: {role_id_path}")
    print(f"Secret ID: {secret_id_path}")
    print("")
    print("Next step:")
    print(f"  nodeiwest install plan --name {args.name} --bootstrap-dir {shlex.quote(str(output_dir))}")
    return 0
|
|
|
|
|
|
def cmd_install_plan(args: argparse.Namespace) -> int:
    """Print the planned nixos-anywhere invocation without executing anything."""
    root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(root)
    context = build_install_context(root, args)
    print_install_plan(context)
    return 0
|
|
|
|
|
|
def cmd_install_run(args: argparse.Namespace) -> int:
    """Execute nixos-anywhere against the target host.

    Destructive: requires --apply, checks root SSH reachability, shows the
    full plan, and asks for confirmation unless --yes is given.
    """
    if not args.apply:
        raise NodeiwestError("install run is destructive. Re-run with --apply to execute nixos-anywhere.")

    repo_root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(repo_root)
    install_context = build_install_context(repo_root, args)
    # Fail fast before printing the plan if root SSH is unreachable.
    ensure_ssh_reachable(install_context["ip"], "root")
    print_install_plan(install_context)
    if not args.yes and not confirm("Run nixos-anywhere now? [y/N] "):
        raise NodeiwestError("Aborted before running nixos-anywhere.")

    print("")
    # Stream output live; the install can take a long time.
    stream_command(
        install_context["command"],
        cwd=repo_root,
        next_fix="Recover via provider console or public SSH, then re-check the generated host files and bootstrap material.",
    )

    print("")
    print("Install completed. Verify first boot with:")
    print(f"  nodeiwest verify host --name {args.name} --ip {install_context['ip']}")
    return 0
|
|
|
|
|
|
def cmd_verify_host(args: argparse.Namespace) -> int:
    """Check first-boot service health and Tailscale status on a provisioned host."""
    validate_host_name(args.name)
    # Units that must be healthy for the OpenBao-backed Tailscale bootstrap.
    services = [
        "vault-agent-tailscale",
        "nodeiwest-tailscale-authkey-ready",
        "tailscaled-autoconnect",
    ]

    service_results: dict[str, subprocess.CompletedProcess[str]] = {}
    for service in services:
        # check=False: a failing unit should be reported, not abort the run.
        service_results[service] = ssh_command(
            args.user,
            args.ip,
            f"systemctl status --no-pager --lines=20 {shlex.quote(service)}",
            check=False,
            next_fix="Check public SSH reachability before retrying verification.",
        )

    tailscale_status = ssh_command(
        args.user,
        args.ip,
        "tailscale status",
        check=False,
        next_fix="Check public SSH reachability before retrying verification.",
    )

    print(f"Verification target: {args.user}@{args.ip} ({args.name})")
    print("")
    for service in services:
        state = classify_systemd_status(service_results[service])
        print(f"{service}: {state}")
    print(f"tailscale status: {'healthy' if tailscale_status.returncode == 0 else 'error'}")

    causes = infer_verify_failures(service_results, tailscale_status)
    if causes:
        print("")
        print("Likely causes:")
        for cause in causes:
            print(f"  - {cause}")

    print("")
    print("Service excerpts:")
    for service in services:
        print(f"[{service}]")
        # systemctl may write to stderr when the unit is in a failed state.
        excerpt = summarize_text(service_results[service].stdout or service_results[service].stderr, 12)
        print(excerpt or "(no output)")
        print("")
    print("[tailscale status]")
    print(summarize_text(tailscale_status.stdout or tailscale_status.stderr, 12) or "(no output)")
    return 0
|
|
|
|
|
|
def cmd_colmena_plan(args: argparse.Namespace) -> int:
    """Show the colmena targetHost for a host, or the flake.nix snippet to add it."""
    repo_root = find_repo_root(Path.cwd())
    ensure_expected_repo_root(repo_root)
    validate_host_name(args.name)

    flake_text = (repo_root / "flake.nix").read_text()
    target_host = lookup_colmena_target_host(flake_text, args.name)
    if target_host:
        print(f"colmena targetHost for {args.name}: {target_host}")
    else:
        # Without an inventory entry we can only suggest a snippet, and that
        # needs an IP from the caller.
        if not args.ip:
            raise NodeiwestError(
                f"flake.nix does not define colmena.{args.name}.deployment.targetHost and no --ip was provided."
            )
        print("Missing colmena host block. Add this to flake.nix:")
        print(build_colmena_host_snippet(args.name, args.ip))
    print("")
    print(f"Deploy command: nix run .#colmena -- apply --on {args.name}")
    return 0
|
|
|
|
|
|
def find_repo_root(start: Path) -> Path:
    """Locate the repository root, preferring git's answer over a marker-file walk."""
    probe = subprocess.run(
        ["git", "rev-parse", "--show-toplevel"],
        cwd=start,
        text=True,
        capture_output=True,
    )
    if probe.returncode == 0:
        return Path(probe.stdout.strip()).resolve()

    # Fallback: walk upward until a directory looks like this flake checkout.
    here = start.resolve()
    for folder in (here, *here.parents):
        if (folder / "flake.nix").exists() and (folder / "modules" / "home.nix").exists():
            return folder
    raise NodeiwestError("Not inside the nix-nodeiwest repository. Run the helper from this flake checkout.")
|
|
|
|
|
|
def ensure_expected_repo_root(repo_root: Path) -> None:
    """Sanity-check that repo_root really is this flake checkout; raise if not."""
    expected = (
        repo_root / "flake.nix",
        repo_root / "modules" / "home.nix",
        repo_root / "hosts",
    )
    absent = [entry for entry in expected if not entry.exists()]
    if absent:
        listing = ", ".join(str(entry.relative_to(repo_root)) for entry in absent)
        raise NodeiwestError(f"Repository root is missing expected files: {listing}")
|
|
|
|
|
|
def validate_host_name(name: str) -> None:
    """Reject names that are not valid lowercase DNS-label-style host names."""
    # Label shape: starts/ends alphanumeric, hyphens allowed inside, max 63 chars.
    label_pattern = r"[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?"
    if re.fullmatch(label_pattern, name) is None:
        raise NodeiwestError(
            f"Invalid host name {name!r}. Use lowercase letters, digits, and hyphens only, without a trailing hyphen."
        )
|
|
|
|
|
|
def probe_host(ip: str, user: str) -> ProbeFacts:
    """Gather disk, boot-mode, root-filesystem, and swap facts from a host over SSH.

    Raises NodeiwestError when the remote outputs cannot be mapped to a single
    primary disk or a supported boot mode.
    """
    lsblk_cmd = "lsblk -o NAME,SIZE,TYPE,MODEL,FSTYPE,PTTYPE,MOUNTPOINTS"
    # /sys/firmware/efi only exists when the kernel booted via UEFI.
    boot_cmd = "test -d /sys/firmware/efi && echo UEFI || echo BIOS"
    root_cmd = "findmnt -no SOURCE /"
    swap_cmd = "cat /proc/swaps"

    lsblk_output = ssh_command(user, ip, lsblk_cmd, next_fix="Check SSH access and that lsblk exists on the target.").stdout
    boot_output = ssh_command(user, ip, boot_cmd, next_fix="Check SSH access and that /sys/firmware is readable.").stdout
    root_output = ssh_command(user, ip, root_cmd, next_fix="Check SSH access and that findmnt exists on the target.").stdout
    swap_output = ssh_command(user, ip, swap_cmd, next_fix="Check SSH access and that /proc/swaps is readable.").stdout

    disk_rows = parse_lsblk_output(lsblk_output)
    disk_devices = [f"/dev/{row['NAME']}" for row in disk_rows if row.get("TYPE") == "disk"]
    if not disk_devices:
        raise NodeiwestError("No disk devices were found in the remote lsblk output.")

    root_source = root_output.strip()
    if not root_source:
        raise NodeiwestError("findmnt returned an empty root source; cannot determine the primary disk.")
    root_partition = normalize_device(root_source)
    primary_disk = disk_from_device(root_partition)
    # If the derived parent disk is not in the inventory, fall back to the
    # sole disk when there is exactly one; otherwise require an explicit --disk.
    if primary_disk not in disk_devices:
        if len(disk_devices) == 1:
            primary_disk = disk_devices[0]
        else:
            raise NodeiwestError(
                "Multiple candidate disks were found and the root source did not map cleanly to one of them. Re-run with --disk."
            )

    boot_mode = normalize_boot_mode(boot_output.strip())
    swap_devices = parse_swaps(swap_output)

    return ProbeFacts(
        ip=ip,
        user=user,
        boot_mode=boot_mode,
        primary_disk=primary_disk,
        root_partition=root_partition,
        root_source=root_source,
        disk_family=classify_disk_family(primary_disk),
        swap_devices=swap_devices,
        disk_rows=disk_rows,
        # Keep raw outputs so `host probe --json` users can debug parsing.
        raw_outputs={
            "lsblk": lsblk_output,
            "boot_mode": boot_output,
            "root_source": root_output,
            "swaps": swap_output,
        },
    )
|
|
|
|
|
|
def parse_lsblk_output(output: str) -> list[dict[str, str]]:
    """Parse fixed-width `lsblk` table output into one dict per device row.

    Column boundaries are taken from the header line; raises NodeiwestError
    when the header is missing or lacks an expected column.
    """
    table = [raw.rstrip("\n") for raw in output.splitlines() if raw.strip()]
    if len(table) < 2:
        raise NodeiwestError("Unexpected lsblk output: not enough lines to parse.")

    header, *body = table
    columns = ["NAME", "SIZE", "TYPE", "MODEL", "FSTYPE", "PTTYPE", "MOUNTPOINTS"]
    offsets: list[int] = []
    for column in columns:
        position = header.find(column)
        if position < 0:
            raise NodeiwestError(f"Unexpected lsblk output: missing {column} column.")
        offsets.append(position)

    parsed: list[dict[str, str]] = []
    for line in body:
        record: dict[str, str] = {}
        for idx, column in enumerate(columns):
            begin = offsets[idx]
            finish = offsets[idx + 1] if idx + 1 < len(columns) else len(line)
            record[column] = line[begin:finish].strip()
        # Drop lsblk tree-drawing prefixes (e.g. "|-", "`-") from the name.
        record["NAME"] = re.sub(r"^[^0-9A-Za-z]+", "", record["NAME"])
        parsed.append(record)
    return parsed
|
|
|
|
|
|
def normalize_boot_mode(value: str | None) -> str:
    """Normalize a boot-mode string to lowercase and validate it.

    Accepts any casing/whitespace of "uefi"/"bios"; raises NodeiwestError for
    a missing or unsupported value.
    """
    if not value:
        raise NodeiwestError("Boot mode is missing.")
    candidate = value.strip().lower()
    if candidate in BOOT_MODE_CHOICES:
        return candidate
    raise NodeiwestError(f"Unsupported boot mode {value!r}. Expected one of: {', '.join(BOOT_MODE_CHOICES)}.")
|
|
|
|
|
|
def normalize_device(value: str) -> str:
    """Strip whitespace and require a plain /dev/* block-device path."""
    candidate = value.strip()
    if candidate.startswith("/dev/"):
        return candidate
    # LVM/LUKS/UUID= sources are deliberately unsupported by the helper.
    raise NodeiwestError(
        f"Unsupported root source {value!r}. Only plain /dev/* block devices are supported by the helper."
    )
|
|
|
|
|
|
def disk_from_device(device: str) -> str:
    """Map a partition device to its parent disk (e.g. /dev/sda1 -> /dev/sda).

    Handles nvme/mmcblk "pN" partition suffixes as well as plain trailing
    digits (sda1, vda2). Whole-disk names are returned unchanged — including
    nvme/mmcblk disks, whose names legitimately end in a digit.
    """
    name = Path(device).name
    # Bug fix: whole-disk nvme/mmcblk names end in digits but are not
    # partitions. The previous trailing-digit strip turned /dev/nvme0n1
    # into the bogus /dev/nvme0n (and /dev/mmcblk0 into /dev/mmcblk).
    if re.fullmatch(r"nvme\d+n\d+", name) or re.fullmatch(r"mmcblk\d+", name):
        return device
    if re.fullmatch(r"nvme\d+n\d+p\d+", name) or re.fullmatch(r"mmcblk\d+p\d+", name):
        base_name = re.sub(r"p\d+$", "", name)
        return f"/dev/{base_name}"
    if re.search(r"\d+$", name):
        base_name = re.sub(r"\d+$", "", name)
        return f"/dev/{base_name}"
    return device
|
|
|
|
|
|
def classify_disk_family(device: str) -> str:
    """Bucket a disk device into a coarse family name by its kernel name prefix."""
    prefix_families = (
        ("nvme", "nvme"),
        ("vd", "vda"),
        ("sd", "sda"),
    )
    name = Path(device).name
    for prefix, family in prefix_families:
        if name.startswith(prefix):
            return family
    return "other"
|
|
|
|
|
|
def parse_swaps(output: str) -> list[str]:
    """Extract swap device paths from /proc/swaps content (header line skipped)."""
    entries = [entry.strip() for entry in output.splitlines() if entry.strip()]
    # entries[0] is the column header; the first whitespace-separated field
    # of each remaining line is the device path.
    return [entry.split()[0] for entry in entries[1:]]
|
|
|
|
|
|
def parse_existing_configuration(path: Path) -> ExistingConfiguration:
    """Parse a helper-shaped hosts/<name>/configuration.nix into structured fields.

    Raises NodeiwestError when the file deviates from the supported template
    shape, so the helper never silently rewrites hand-maintained configs.
    """
    text = path.read_text()
    if "./disko.nix" not in text or "./hardware-configuration.nix" not in text:
        raise NodeiwestError(
            f"{path} does not match the supported configuration shape. Manual intervention is required."
        )

    host_name = extract_single_match(text, r'networking\.hostName\s*=\s*"([^"]+)";', path, "hostName")
    timezone = extract_single_match(text, r'time\.timeZone\s*=\s*"([^"]+)";', path, "time.timeZone")
    state_version = extract_single_match(text, r'system\.stateVersion\s*=\s*"([^"]+)";', path, "system.stateVersion")
    user_ca_public_keys = extract_nix_string_list(text, r"nodeiwest\.ssh\.userCAPublicKeys\s*=\s*\[(.*?)\];", path)
    # Accepts both the attribute-path spelling (...openbao.enable = true;) and
    # the attrset spelling (...openbao = { ... enable = true; ... };).
    tailscale_enable_text = extract_optional_match(
        text,
        r"nodeiwest\.tailscale\.openbao(?:\.enable\s*=\s*|\s*=\s*\{[^}]*enable\s*=\s*)(true|false);",
    )
    if tailscale_enable_text is None:
        raise NodeiwestError(
            f"{path} does not contain a supported nodeiwest.tailscale.openbao.enable declaration."
        )
    # Boot mode is inferred from the loader stanza the templates emit
    # (see render_boot_loader_block).
    if 'boot.loader.efi.canTouchEfiVariables = true;' in text and 'device = "nodev";' in text:
        boot_mode = "uefi"
    elif re.search(r'boot\.loader\.grub\s*=\s*\{[^}]*device\s*=\s*"/dev/', text, re.S) or 'efiSupport = false;' in text:
        boot_mode = "bios"
    else:
        raise NodeiwestError(
            f"{path} has a boot loader configuration outside the helper's supported template shape."
        )

    return ExistingConfiguration(
        host_name=host_name,
        timezone=timezone,
        boot_mode=boot_mode,
        tailscale_openbao=(tailscale_enable_text == "true"),
        user_ca_public_keys=user_ca_public_keys,
        state_version=state_version,
        managed=SUPPORTED_CONFIG_MARKER in text,
    )
|
|
|
|
|
|
def parse_existing_disko(path: Path) -> ExistingDisko:
    """Parse a helper-shaped disko.nix into structured fields.

    Only the helper's single-disk GPT ext4+swap layout is supported; anything
    else raises NodeiwestError so hand-written layouts are never rewritten.
    """
    text = path.read_text()
    if 'type = "gpt";' not in text or 'format = "ext4";' not in text or 'type = "swap";' not in text:
        raise NodeiwestError(
            f"{path} does not match the supported single-disk ext4+swap disko shape. Manual intervention is required."
        )
    disk_device = extract_single_match(text, r'device\s*=\s*lib\.mkDefault\s*"([^"]+)";', path, "disk device")
    swap_size = extract_single_match(text, r'swap\s*=\s*\{.*?size\s*=\s*"([^"]+)";', path, "swap size", flags=re.S)
    # Partition type codes identify the template: EF00 = EFI system partition
    # (UEFI template), EF02 = BIOS boot partition (BIOS template).
    if 'type = "EF00";' in text and 'mountpoint = "/boot";' in text:
        boot_mode = "uefi"
    elif 'type = "EF02";' in text:
        boot_mode = "bios"
    else:
        raise NodeiwestError(
            f"{path} does not match the helper's supported UEFI or BIOS templates."
        )

    return ExistingDisko(
        disk_device=disk_device,
        boot_mode=boot_mode,
        swap_size=swap_size,
        managed=SUPPORTED_DISKO_MARKER in text,
    )
|
|
|
|
|
|
def infer_repo_defaults(repo_root: Path, skip_host: str | None = None) -> RepoDefaults:
    """Derive defaults (stateVersion, SSH CA keys) from the other host configs.

    Unparseable (hand-maintained) configs are skipped silently. skip_host
    excludes the host currently being (re)initialized so it cannot vote for
    its own old values. Raises NodeiwestError when hosts disagree on the SSH
    user CA key list, since guessing would be unsafe.
    """
    hosts_dir = repo_root / "hosts"
    state_versions: list[str] = []
    ca_key_sets: set[tuple[str, ...]] = set()

    for config_path in sorted(hosts_dir.glob("*/configuration.nix")):
        if skip_host and config_path.parent.name == skip_host:
            continue
        try:
            existing = parse_existing_configuration(config_path)
        except NodeiwestError:
            # Non-template configs simply don't contribute to the defaults.
            continue
        state_versions.append(existing.state_version)
        if existing.user_ca_public_keys:
            ca_key_sets.add(tuple(existing.user_ca_public_keys))

    state_version = most_common_value(state_versions) or DEFAULT_STATE_VERSION
    if len(ca_key_sets) > 1:
        raise NodeiwestError(
            "Existing host configs define multiple different SSH user CA key lists. The helper will not guess which set to reuse."
        )
    user_ca_public_keys = list(next(iter(ca_key_sets))) if ca_key_sets else []
    return RepoDefaults(state_version=state_version, user_ca_public_keys=user_ca_public_keys)
|
|
|
|
|
|
def most_common_value(values: list[str]) -> str | None:
|
|
if not values:
|
|
return None
|
|
counts: dict[str, int] = {}
|
|
for value in values:
|
|
counts[value] = counts.get(value, 0) + 1
|
|
return sorted(counts.items(), key=lambda item: (-item[1], item[0]))[0][0]
|
|
|
|
|
|
def render_configuration(
    *,
    host_name: str,
    timezone: str,
    boot_mode: str,
    disk_device: str,
    tailscale_openbao: bool,
    state_version: str,
    user_ca_public_keys: list[str],
) -> str:
    """Render hosts/<name>/configuration.nix by filling the template placeholders."""
    rendered = load_template("configuration.nix.tmpl")
    substitutions = {
        "@@HOST_NAME@@": host_name,
        "@@TIMEZONE@@": timezone,
        "@@BOOT_LOADER_BLOCK@@": indent(render_boot_loader_block(boot_mode, disk_device).rstrip(), "  "),
        "@@SSH_CA_KEYS@@": render_nix_string_list(user_ca_public_keys, indent_level=2),
        "@@TAILSCALE_OPENBAO_ENABLE@@": render_nix_bool(tailscale_openbao),
        "@@STATE_VERSION@@": state_version,
    }
    for placeholder, replacement in substitutions.items():
        rendered = rendered.replace(placeholder, replacement)
    return ensure_trailing_newline(rendered)
|
|
|
|
|
|
def render_boot_loader_block(boot_mode: str, disk_device: str) -> str:
    """Return the GRUB bootloader Nix snippet for the given boot mode.

    "uefi" yields GRUB with EFI support on device "nodev"; anything else yields
    a BIOS GRUB install directly onto *disk_device* (escaped for Nix string
    syntax). The caller re-indents the block when splicing it into the config.
    """
    if boot_mode == "uefi":
        return """
boot.loader.efi.canTouchEfiVariables = true;
boot.loader.grub = {
enable = true;
efiSupport = true;
device = "nodev";
};
""".strip("\n")
    return f"""
boot.loader.grub = {{
enable = true;
efiSupport = false;
device = "{escape_nix_string(disk_device)}";
}};
""".strip("\n")
|
|
|
|
|
|
def render_disko(*, boot_mode: str, disk_device: str, swap_size: str) -> str:
    """Render disko.nix from the UEFI or BIOS ext4 template."""
    if boot_mode == "uefi":
        template_name = "disko-uefi-ext4.nix"
    else:
        template_name = "disko-bios-ext4.nix"
    text = load_template(template_name)
    text = text.replace("@@DISK_DEVICE@@", escape_nix_string(disk_device))
    text = text.replace("@@SWAP_SIZE@@", escape_nix_string(swap_size))
    return ensure_trailing_newline(text)
|
|
|
|
|
|
def render_openbao_policy(policy_path: str) -> str:
    """Render the OpenBao policy HCL for *policy_path* from the helper template."""
    template = load_template("openbao-policy.hcl.tmpl")
    return ensure_trailing_newline(template.replace("@@POLICY_PATH@@", policy_path))
|
|
|
|
|
|
def load_template(name: str) -> str:
    """Read helper template *name* and return its text.

    The template directory defaults to ./templates next to this script and can
    be overridden via the NODEIWEST_HELPER_TEMPLATES environment variable.
    Raises NodeiwestError when the template file does not exist.
    """
    default_dir = Path(__file__).resolve().parent / "templates"
    template_path = Path(os.environ.get("NODEIWEST_HELPER_TEMPLATES", default_dir)) / name
    if not template_path.exists():
        raise NodeiwestError(f"Missing helper template: {template_path}")
    return template_path.read_text()
|
|
|
|
|
|
def render_nix_string_list(values: list[str], indent_level: int = 0) -> str:
    """Render a Nix list-of-strings literal; an empty input renders as "[ ]"."""
    if not values:
        return "[ ]"
    indent_text = " " * indent_level
    body = "\n".join(f'{indent_text} "{escape_nix_string(value)}"' for value in values)
    return "[\n" + body + f"\n{indent_text}]"
|
|
|
|
|
|
def render_nix_bool(value: bool) -> str:
    """Render a Python truth value as a Nix boolean literal ("true"/"false")."""
    return str(bool(value)).lower()
|
|
|
|
|
|
def escape_nix_string(value: str) -> str:
    """Escape backslashes and double quotes for embedding in a Nix "..." string."""
    escaped_chars = []
    for ch in value:
        # Backslash-prefix the two characters Nix double-quoted strings treat specially here.
        if ch in '\\"':
            escaped_chars.append("\\")
        escaped_chars.append(ch)
    return "".join(escaped_chars)
|
|
|
|
|
|
def ensure_trailing_newline(text: str) -> str:
    """Append a final newline unless *text* already ends with one."""
    if text.endswith("\n"):
        return text
    return text + "\n"
|
|
|
|
|
|
def indent(text: str, prefix: str) -> str:
    """Prefix every non-empty line of *text*; blank lines stay blank.

    The result carries no trailing newline (splitlines drops it).
    """
    out_lines = []
    for line in text.splitlines():
        out_lines.append(prefix + line if line else line)
    return "\n".join(out_lines)
|
|
|
|
|
|
def plan_file_update(path: Path, new_text: str) -> list[dict[str, Any]]:
    """Plan a create/update of *path*; returns [] when the file already matches.

    A non-empty plan is a single entry carrying the path, the new text, a
    Create/Update summary, and a unified diff against the current (or empty)
    contents. A missing file always produces a Create entry.
    """
    exists = path.exists()
    old_text = path.read_text() if exists else ""
    if exists and old_text == new_text:
        return []
    action = "Update" if exists else "Create"
    return [
        {
            "path": path,
            "changed": True,
            "new_text": new_text,
            "summary": f"{action} {path.name}",
            "diff": unified_diff(path, old_text, new_text),
        }
    ]
|
|
|
|
|
|
def unified_diff(path: Path, old_text: str, new_text: str) -> str:
    """Return a unified diff of old vs new text, labeled with *path* on both sides."""
    delta = difflib.unified_diff(
        old_text.splitlines(),
        new_text.splitlines(),
        fromfile=str(path),
        tofile=str(path),
        lineterm="",
    )
    return "\n".join(delta)
|
|
|
|
|
|
def write_file_with_backup(path: Path, text: str) -> None:
    """Write *text* to *path*, backing up any existing file first."""
    if path.exists():
        print(f"Backed up {path.name} to {backup_file(path).name}")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text)
|
|
|
|
|
|
def write_secret_file(path: Path, text: str) -> None:
    """Write a secret to *path* with 0o400 permissions, backing up any existing file.

    Bug fix: a previous run leaves the file read-only (0o400), so opening it
    for writing would fail with PermissionError for non-root users. After the
    backup, restore owner write permission before rewriting.
    """
    if path.exists():
        backup_path = backup_file(path)
        print(f"Backed up {path.name} to {backup_path.name}")
        # Make the read-only secret writable again so the rewrite succeeds.
        path.chmod(0o600)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text)
    # Lock the secret down to owner-read-only.
    path.chmod(0o400)
|
|
|
|
|
|
def backup_file(path: Path) -> Path:
    """Copy *path* to a timestamped sibling (<name>.bak.<UTC stamp>) and return it."""
    stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d%H%M%S")
    destination = path.with_name(f"{path.name}.bak.{stamp}")
    # copy2 preserves metadata (mtime, permissions) alongside the contents.
    shutil.copy2(path, destination)
    return destination
|
|
|
|
|
|
def ensure_git_paths_clean(repo_root: Path, paths: list[Path]) -> None:
    """Refuse to proceed when any existing target path has uncommitted git changes."""
    relative_paths = [
        str(path.relative_to(repo_root)) for path in paths if path.exists()
    ]
    # Nothing on disk yet means nothing to clobber.
    if not relative_paths:
        return
    result = run_command(
        ["git", "status", "--porcelain", "--", *relative_paths],
        cwd=repo_root,
        next_fix="Commit or stash local edits to the target host files, or re-run with --force if you intentionally want to overwrite them.",
    )
    if result.stdout.strip():
        raise NodeiwestError(
            "Refusing to modify host files with local git changes:\n"
            + summarize_text(result.stdout, 20)
            + "\nRe-run with --force to override this guard."
        )
|
|
|
|
|
|
def flake_has_nixos_configuration(flake_text: str, name: str) -> bool:
    """True when flake.nix already defines nixosConfigurations.<name> via mkHost."""
    return bool(
        re.search(
            rf'^\s*{re.escape(name)}\s*=\s*mkHost\s+"{re.escape(name)}";',
            flake_text,
            re.M,
        )
    )
|
|
|
|
|
|
def flake_has_colmena_host(flake_text: str, name: str) -> bool:
    """True when the flake's colmena block defines a targetHost for *name*."""
    return lookup_colmena_target_host(flake_text, name) is not None
|
|
|
|
|
|
def lookup_colmena_target_host(flake_text: str, name: str) -> str | None:
|
|
pattern = re.compile(
|
|
rf'colmena\s*=\s*\{{.*?^\s*{re.escape(name)}\s*=\s*\{{.*?targetHost\s*=\s*"([^"]+)";',
|
|
re.S | re.M,
|
|
)
|
|
match = pattern.search(flake_text)
|
|
return match.group(1) if match else None
|
|
|
|
|
|
def build_nixos_configuration_snippet(name: str) -> str:
    """Return the flake.nix line registering *name* under nixosConfigurations via mkHost."""
    return f' {name} = mkHost "{name}";'
|
|
|
|
|
|
def build_colmena_host_snippet(name: str, ip: str) -> str:
    """Return a flake.nix colmena host block for *name* deploying to root@*ip*.

    The snippet wires the deployment target/user, the "company" tag, and the
    host's configuration import; callers paste it into the colmena attrset.
    """
    return (
        f" {name} = {{\n"
        f" deployment = {{\n"
        f' targetHost = "{ip}";\n'
        f' targetUser = "root";\n'
        f" tags = [\n"
        f' "company"\n'
        f" ];\n"
        f" }};\n\n"
        f" imports = [ ./hosts/{name}/configuration.nix ];\n"
        f" }};"
    )
|
|
|
|
|
|
def ensure_command_available(name: str) -> None:
    """Raise NodeiwestError unless *name* resolves to an executable in PATH."""
    if shutil.which(name) is not None:
        return
    raise NodeiwestError(f"Required command {name!r} is not available in PATH.")
|
|
|
|
|
|
def ensure_bao_authenticated() -> None:
    """Fail fast when the local bao CLI has no valid token."""
    lookup_argv = ["bao", "token", "lookup"]
    run_command(
        lookup_argv,
        next_fix="Run a bao login flow first and verify that `bao token lookup` succeeds.",
    )
|
|
|
|
|
|
def bao_kv_get(namespace: str, secret_path: str) -> dict[str, Any]:
    """Read a KV secret as parsed JSON via `bao kv get -format=json`.

    Raises NodeiwestError when the CLI fails or returns unparseable JSON.
    """
    hint = (
        "Check BAO_ADDR, BAO_NAMESPACE, and the logical secret path. "
        "If the path or mount is ambiguous, re-run with --kv-mount-path."
    )
    result = run_command(
        ["bao", "kv", "get", "-format=json", secret_path],
        env={"BAO_NAMESPACE": namespace},
        next_fix=hint,
    )
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        raise NodeiwestError(f"Failed to parse `bao kv get` JSON output: {exc}") from exc
|
|
|
|
|
|
def derive_openbao_policy(namespace: str, secret_path: str) -> str:
    """Derive a minimal read policy for *secret_path* via `bao kv get -output-policy`.

    Raises NodeiwestError when the CLI fails or emits an empty policy.
    """
    hint = (
        "Check BAO_ADDR, BAO_NAMESPACE, and the logical secret path. "
        "If policy derivation still does not match your mount layout, re-run with --kv-mount-path."
    )
    result = run_command(
        ["bao", "kv", "get", "-output-policy", secret_path],
        env={"BAO_NAMESPACE": namespace},
        next_fix=hint,
    )
    policy = result.stdout.strip()
    if not policy:
        raise NodeiwestError("`bao kv get -output-policy` returned an empty policy.")
    return ensure_trailing_newline(policy)
|
|
|
|
|
|
def build_approle_write_command(auth_path: str, role_name: str, policy_name: str, cidrs: list[str]) -> list[str]:
    """Build the `bao write` argv that creates/updates the AppRole role.

    When *cidrs* is non-empty, both token issuance and secret-id usage are
    bound to the given CIDR list.
    """
    command = [
        "bao",
        "write",
        f"{auth_path}/role/{role_name}",
        f"token_policies={policy_name}",
        "token_ttl=1h",
        "token_max_ttl=24h",
        "token_num_uses=0",
        "secret_id_num_uses=0",
    ]
    if not cidrs:
        return command
    joined = ",".join(cidrs)
    return command + [f"token_bound_cidrs={joined}", f"secret_id_bound_cidrs={joined}"]
|
|
|
|
|
|
def build_install_context(repo_root: Path, args: argparse.Namespace) -> dict[str, Any]:
    """Validate install preconditions and assemble the nixos-anywhere command.

    Checks that the flake defines the host, resolves a target IP (--ip flag or
    the colmena targetHost), verifies the required host/bootstrap files exist,
    and returns the full command plus the paths the install plan reports.

    Raises NodeiwestError on any unmet precondition.
    """
    validate_host_name(args.name)
    flake_text = (repo_root / "flake.nix").read_text()
    if not flake_has_nixos_configuration(flake_text, args.name):
        raise NodeiwestError(
            f"flake.nix does not define nixosConfigurations.{args.name}.\nAdd this block:\n{build_nixos_configuration_snippet(args.name)}"
        )

    # Prefer an explicit --ip; otherwise fall back to the colmena targetHost.
    ip = args.ip or lookup_colmena_target_host(flake_text, args.name)
    if not ip:
        raise NodeiwestError(
            f"Could not determine an IP for {args.name}. Pass --ip or add a colmena targetHost.\n"
            + build_colmena_host_snippet(args.name, "<ip>")
        )

    host_dir = repo_root / "hosts" / args.name
    configuration_path = host_dir / "configuration.nix"
    disko_path = host_dir / "disko.nix"
    hardware_path = host_dir / "hardware-configuration.nix"
    bootstrap_dir = resolve_path(repo_root, args.bootstrap_dir)
    # AppRole credentials get copied onto the target via --extra-files.
    role_id_path = bootstrap_dir / "var" / "lib" / "nodeiwest" / "openbao-approle-role-id"
    secret_id_path = bootstrap_dir / "var" / "lib" / "nodeiwest" / "openbao-approle-secret-id"

    required_paths = [configuration_path, disko_path, role_id_path, secret_id_path]
    missing = [path for path in required_paths if not path.exists()]
    if missing:
        formatted = "\n".join(f" - {path}" for path in missing)
        raise NodeiwestError(f"Install prerequisites are missing:\n{formatted}")

    # hardware-configuration.nix is only mandatory when generation is disabled.
    if args.generate_hardware_config == "off" and not hardware_path.exists():
        raise NodeiwestError(
            f"{hardware_path.relative_to(repo_root)} is missing and --generate-hardware-config=off was requested."
        )

    command = [
        "nix",
        "run",
        "github:nix-community/nixos-anywhere",
        "--",
        "--extra-files",
        str(bootstrap_dir),
    ]
    if args.copy_host_keys == "on":
        command.append("--copy-host-keys")
    if args.generate_hardware_config == "on":
        command.extend([
            "--generate-hardware-config",
            "nixos-generate-config",
            str(hardware_path),
        ])
    command.extend([
        "--flake",
        f".#{args.name}",
        f"root@{ip}",
    ])

    return {
        "ip": ip,
        "command": command,
        "configuration_path": configuration_path,
        "disko_path": disko_path,
        "hardware_path": hardware_path,
        "role_id_path": role_id_path,
        "secret_id_path": secret_id_path,
        # A missing colmena block is reported with a paste-ready snippet, not fatal.
        "colmena_missing": not flake_has_colmena_host(flake_text, args.name),
    }
|
|
|
|
|
|
def print_install_plan(context: dict[str, Any]) -> None:
    """Print the planned nixos-anywhere command, preflight checklist, and validated files.

    *context* is the dict produced by build_install_context.
    """
    print("Install command:")
    print(shlex.join(context["command"]))
    print("")
    print("Preflight checklist:")
    print(" - provider snapshot taken")
    print(" - application/data backup taken")
    print(" - public SSH reachable")
    print(" - host keys may change after install")
    print("")
    print("Validated files:")
    print(f" - {context['configuration_path']}")
    print(f" - {context['disko_path']}")
    # hardware-configuration.nix is optional; it may be generated during install.
    if context["hardware_path"].exists():
        print(f" - {context['hardware_path']}")
    print(f" - {context['role_id_path']}")
    print(f" - {context['secret_id_path']}")
    if context["colmena_missing"]:
        print("")
        print("colmena host block is missing. Add this before the first deploy:")
        print(build_colmena_host_snippet(Path(context["configuration_path"]).parent.name, context["ip"]))
|
|
|
|
|
|
def ensure_ssh_reachable(ip: str, user: str) -> None:
    """Verify that `ssh user@ip true` succeeds before any destructive step."""
    hint = "Check public SSH reachability, host keys, and the target user before running nixos-anywhere."
    ssh_command(user, ip, "true", next_fix=hint)
|
|
|
|
|
|
def ssh_command(
    user: str,
    ip: str,
    remote_command: str,
    *,
    check: bool = True,
    next_fix: str | None = None,
) -> subprocess.CompletedProcess[str]:
    """Run *remote_command* over non-interactive SSH and return the completed process.

    BatchMode avoids password prompts; a 10-second connect timeout keeps
    unreachable hosts from hanging the helper.
    """
    argv = [
        "ssh",
        "-o",
        "BatchMode=yes",
        "-o",
        "ConnectTimeout=10",
        f"{user}@{ip}",
        remote_command,
    ]
    fallback_hint = "Check SSH reachability and authentication before retrying."
    return run_command(argv, check=check, next_fix=next_fix or fallback_hint)
|
|
|
|
|
|
def classify_systemd_status(result: subprocess.CompletedProcess[str]) -> str:
    """Map systemd status output to "active", "failed", "inactive", or "unknown".

    Checks are ordered: an explicit running/exited state wins, then any mention
    of failure, then inactivity; anything else is "unknown".
    """
    haystack = f"{result.stdout}\n{result.stderr}".lower()
    ordered_checks = (
        (("active (running)", "active (exited)"), "active"),
        (("failed",), "failed"),
        (("inactive",), "inactive"),
    )
    for needles, verdict in ordered_checks:
        if any(needle in haystack for needle in needles):
            return verdict
    return "unknown"
|
|
|
|
|
|
def infer_verify_failures(
    service_results: dict[str, subprocess.CompletedProcess[str]],
    tailscale_status: subprocess.CompletedProcess[str],
) -> list[str]:
    """Translate raw verification output into de-duplicated, actionable hints.

    Scans the combined (lowercased) stdout/stderr of all service checks plus
    the tailscale status for known failure signatures.
    """
    all_results = [*service_results.values(), tailscale_status]
    combined = "\n".join(
        (result.stdout or "") + "\n" + (result.stderr or "") for result in all_results
    ).lower()

    def seen(fragments: list[str]) -> bool:
        # True when any known failure fragment appears in the combined output.
        return any(fragment in combined for fragment in fragments)

    messages: list[str] = []
    if seen(["openbao-approle-role-id", "openbao-approle-secret-id", "no such file"]):
        messages.append("Missing AppRole files on the host. Check /var/lib/nodeiwest/openbao-approle-role-id and ...secret-id.")
    if seen(["invalid secret id", "permission denied", "approle", "failed to authenticate"]):
        messages.append("OpenBao AppRole authentication failed. Re-check the role, secret_id, namespace, and auth mount.")
    if seen(["auth_key", "timed out waiting for rendered tailscale auth key", "no data", "secret path"]):
        messages.append("OpenBao rendered no Tailscale auth key. Check the secret path, KV mount path, and auth_key field.")
    if tailscale_status.returncode != 0 or "logged out" in (tailscale_status.stdout or "").lower():
        messages.append("Tailscale autoconnect is blocked. Check tailscaled-autoconnect, the rendered auth key, and outbound access to Tailscale.")

    # Preserve first-seen order while dropping duplicates.
    return list(dict.fromkeys(messages))
|
|
|
|
|
|
def summarize_text(text: str, lines: int) -> str:
    """Return up to *lines* non-blank lines of *text*, each right-stripped."""
    kept = [raw.rstrip() for raw in text.splitlines() if raw.strip()]
    del kept[lines:]
    return "\n".join(kept)
|
|
|
|
|
|
def resolve_path(repo_root: Path, value: str) -> Path:
    """Interpret *value* relative to *repo_root* unless it is already absolute."""
    candidate = Path(value)
    if candidate.is_absolute():
        return candidate
    return repo_root / candidate
|
|
|
|
|
|
def parse_on_off(value: str | None, default: bool) -> bool:
|
|
if value is None:
|
|
return default
|
|
return value == "on"
|
|
|
|
|
|
def confirm(prompt: str) -> bool:
    """Ask a yes/no question; only "y"/"yes" (any case, surrounding spaces ok) confirms."""
    reply = input(prompt)
    return reply.strip().lower() in ("y", "yes")
|
|
|
|
|
|
def extract_single_match(
    text: str,
    pattern: str,
    path: Path,
    label: str,
    *,
    flags: int = 0,
) -> str:
    """Return group 1 of *pattern* in *text*, or raise NodeiwestError naming *label*."""
    found = re.search(pattern, text, flags)
    if found is None:
        raise NodeiwestError(f"Could not parse {label} from {path}; manual intervention is required.")
    return found.group(1)
|
|
|
|
|
|
def extract_optional_match(text: str, pattern: str, *, flags: int = re.S) -> str | None:
|
|
match = re.search(pattern, text, flags)
|
|
return match.group(1) if match else None
|
|
|
|
|
|
def extract_nix_string_list(text: str, pattern: str, path: Path) -> list[str]:
    """Extract the string-list body matched by *pattern* and unescape each entry.

    Raises NodeiwestError when *pattern* finds no list in *text*.
    """
    match = re.search(pattern, text, re.S)
    if not match:
        raise NodeiwestError(f"Could not parse nodeiwest.ssh.userCAPublicKeys from {path}.")
    # Undo the Nix string escaping applied when the list was rendered.
    return [
        raw.replace('\\"', '"').replace("\\\\", "\\")
        for raw in re.findall(r'"((?:[^"\\]|\\.)*)"', match.group(1))
    ]
|
|
|
|
|
|
def run_command(
|
|
command: list[str],
|
|
*,
|
|
cwd: Path | None = None,
|
|
env: dict[str, str] | None = None,
|
|
check: bool = True,
|
|
next_fix: str | None = None,
|
|
) -> subprocess.CompletedProcess[str]:
|
|
merged_env = os.environ.copy()
|
|
if env:
|
|
merged_env.update(env)
|
|
result = subprocess.run(
|
|
command,
|
|
cwd=str(cwd) if cwd else None,
|
|
env=merged_env,
|
|
text=True,
|
|
capture_output=True,
|
|
)
|
|
if check and result.returncode != 0:
|
|
raise NodeiwestError(format_command_failure(command, result, next_fix))
|
|
return result
|
|
|
|
|
|
def stream_command(
|
|
command: list[str],
|
|
*,
|
|
cwd: Path | None = None,
|
|
env: dict[str, str] | None = None,
|
|
next_fix: str | None = None,
|
|
) -> None:
|
|
merged_env = os.environ.copy()
|
|
if env:
|
|
merged_env.update(env)
|
|
process = subprocess.Popen(
|
|
command,
|
|
cwd=str(cwd) if cwd else None,
|
|
env=merged_env,
|
|
)
|
|
return_code = process.wait()
|
|
if return_code != 0:
|
|
raise NodeiwestError(
|
|
f"Command failed: {shlex.join(command)}\nExit code: {return_code}\n"
|
|
+ (f"Next likely fix: {next_fix}" if next_fix else "")
|
|
)
|
|
|
|
|
|
def format_command_failure(
    command: list[str],
    result: subprocess.CompletedProcess[str],
    next_fix: str | None,
) -> str:
    """Build a multi-line failure report: command, exit code, trimmed output, hint."""
    pieces = [
        f"Command failed: {shlex.join(command)}",
        f"Exit code: {result.returncode}",
    ]
    # Include each stream only when it has non-blank content, trimmed to 20 lines.
    for label, text in (("stdout", result.stdout), ("stderr", result.stderr)):
        trimmed = summarize_text(text or "", 20)
        if trimmed:
            pieces.append(f"{label}:\n{trimmed}")
    if next_fix:
        pieces.append(f"Next likely fix: {next_fix}")
    return "\n".join(pieces)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: exit with main()'s integer return code.
    raise SystemExit(main())
|