feat: move things around
This commit is contained in:
@@ -1,114 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import sys
|
||||
import unittest
|
||||
from unittest import mock
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[3]
|
||||
CLI_PATH = REPO_ROOT / "pkgs" / "helpers" / "cli.py"
|
||||
|
||||
spec = importlib.util.spec_from_file_location("nodeiwest_cli", CLI_PATH)
|
||||
cli = importlib.util.module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
sys.modules[spec.name] = cli
|
||||
spec.loader.exec_module(cli)
|
||||
|
||||
|
||||
class HelperCliTests(unittest.TestCase):
|
||||
def test_format_activity_frame_highlights_one_block_and_keeps_label(self) -> None:
|
||||
frame = cli.format_activity_frame("Executing install", 2)
|
||||
|
||||
self.assertIn("Executing install", frame)
|
||||
self.assertEqual(frame.count("█"), 4)
|
||||
self.assertEqual(frame.count("[38;5;220m"), 1)
|
||||
self.assertEqual(frame.count("[38;5;208m"), 3)
|
||||
|
||||
def test_supports_ansi_status_requires_tty_and_real_term(self) -> None:
|
||||
tty_stream = mock.Mock()
|
||||
tty_stream.isatty.return_value = True
|
||||
dumb_stream = mock.Mock()
|
||||
dumb_stream.isatty.return_value = True
|
||||
pipe_stream = mock.Mock()
|
||||
pipe_stream.isatty.return_value = False
|
||||
|
||||
with mock.patch.dict(cli.os.environ, {"TERM": "xterm-256color"}, clear=False):
|
||||
self.assertTrue(cli.supports_ansi_status(tty_stream))
|
||||
self.assertFalse(cli.supports_ansi_status(pipe_stream))
|
||||
|
||||
with mock.patch.dict(cli.os.environ, {"TERM": "dumb"}, clear=False):
|
||||
self.assertFalse(cli.supports_ansi_status(dumb_stream))
|
||||
|
||||
def test_disk_from_device_supports_sd_and_nvme(self) -> None:
|
||||
self.assertEqual(cli.disk_from_device("/dev/sda2"), "/dev/sda")
|
||||
self.assertEqual(cli.disk_from_device("/dev/nvme0n1p2"), "/dev/nvme0n1")
|
||||
|
||||
def test_lookup_colmena_target_host_reads_existing_inventory(self) -> None:
|
||||
flake_text = (REPO_ROOT / "flake.nix").read_text()
|
||||
self.assertEqual(cli.lookup_colmena_target_host(flake_text, "vps1"), "100.101.167.118")
|
||||
|
||||
def test_parse_existing_vps1_configuration(self) -> None:
|
||||
configuration = cli.parse_existing_configuration(REPO_ROOT / "hosts" / "vps1" / "configuration.nix")
|
||||
self.assertEqual(configuration.host_name, "vps1")
|
||||
self.assertEqual(configuration.boot_mode, "uefi")
|
||||
self.assertTrue(configuration.tailscale_openbao)
|
||||
self.assertEqual(configuration.state_version, "25.05")
|
||||
self.assertTrue(configuration.user_ca_public_keys)
|
||||
|
||||
def test_parse_existing_vps1_disko(self) -> None:
|
||||
disko = cli.parse_existing_disko(REPO_ROOT / "hosts" / "vps1" / "disko.nix")
|
||||
self.assertEqual(disko.disk_device, "/dev/sda")
|
||||
self.assertEqual(disko.boot_mode, "uefi")
|
||||
self.assertEqual(disko.swap_size, "4G")
|
||||
|
||||
def test_render_bios_disko_uses_bios_partition(self) -> None:
|
||||
rendered = cli.render_disko(boot_mode="bios", disk_device="/dev/vda", swap_size="8G")
|
||||
self.assertIn('type = "EF02";', rendered)
|
||||
self.assertIn('device = lib.mkDefault "/dev/vda";', rendered)
|
||||
self.assertIn('size = "8G";', rendered)
|
||||
|
||||
def test_parse_lsblk_output_reads_pairs_without_smearing_columns(self) -> None:
|
||||
output = (
|
||||
'NAME="sda" SIZE="11G" TYPE="disk" MODEL="QEMU HARDDISK" FSTYPE="" PTTYPE="gpt" MOUNTPOINTS=""\n'
|
||||
'NAME="sda1" SIZE="512M" TYPE="part" MODEL="" FSTYPE="vfat" PTTYPE="" MOUNTPOINTS="/boot"\n'
|
||||
)
|
||||
rows = cli.parse_lsblk_output(output)
|
||||
|
||||
self.assertEqual(rows[0]["NAME"], "sda")
|
||||
self.assertEqual(rows[0]["SIZE"], "11G")
|
||||
self.assertEqual(rows[0]["MODEL"], "QEMU HARDDISK")
|
||||
self.assertEqual(rows[1]["NAME"], "sda1")
|
||||
self.assertEqual(rows[1]["MOUNTPOINTS"], "/boot")
|
||||
|
||||
def test_normalize_swap_size_accepts_gib_suffix(self) -> None:
|
||||
self.assertEqual(cli.normalize_swap_size("4GiB"), "4G")
|
||||
self.assertEqual(cli.normalize_swap_size("512MiB"), "512M")
|
||||
self.assertEqual(cli.normalize_swap_size("8G"), "8G")
|
||||
|
||||
def test_bao_kv_get_uses_explicit_kv_mount(self) -> None:
|
||||
completed = mock.Mock()
|
||||
completed.stdout = '{"data": {"data": {"CLIENT_ID": "x"}}}'
|
||||
with mock.patch.object(cli, "run_command", return_value=completed) as run_command:
|
||||
data = cli.bao_kv_get("it", "kv", "tailscale")
|
||||
|
||||
self.assertEqual(data["data"]["data"]["CLIENT_ID"], "x")
|
||||
command = run_command.call_args.args[0]
|
||||
self.assertEqual(command, ["bao", "kv", "get", "-mount=kv", "-format=json", "tailscale"])
|
||||
self.assertEqual(run_command.call_args.kwargs["env"], {"BAO_NAMESPACE": "it"})
|
||||
|
||||
def test_derive_openbao_policy_uses_explicit_kv_mount(self) -> None:
|
||||
completed = mock.Mock()
|
||||
completed.stdout = 'path "kv/data/tailscale" { capabilities = ["read"] }\n'
|
||||
with mock.patch.object(cli, "run_command", return_value=completed) as run_command:
|
||||
policy = cli.derive_openbao_policy("it", "kv", "tailscale")
|
||||
|
||||
self.assertIn('path "kv/data/tailscale"', policy)
|
||||
command = run_command.call_args.args[0]
|
||||
self.assertEqual(command, ["bao", "kv", "get", "-mount=kv", "-output-policy", "tailscale"])
|
||||
self.assertEqual(run_command.call_args.kwargs["env"], {"BAO_NAMESPACE": "it"})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user