# enroll/tests/test_cli.py
# 2026-01-05 14:27:56 +11:00
#
# 688 lines
# 19 KiB
# Python

from __future__ import annotations
import sys
import pytest
import enroll.cli as cli
from pathlib import Path
from enroll.remote import RemoteSudoPasswordRequired
from enroll.sopsutil import SopsError
def test_cli_harvest_subcommand_calls_harvest(monkeypatch, capsys, tmp_path):
    """`enroll harvest --out DIR` invokes harvest() with default options.

    The fake records what it was called with; the path it returns must show
    up on stdout.
    """
    state_path = str(tmp_path / "state.json")
    seen = {}

    def fake_harvest(
        out: str,
        dangerous: bool = False,
        include_paths=None,
        exclude_paths=None,
        **_kwargs,
    ):
        seen.update(
            out=out,
            dangerous=dangerous,
            include_paths=include_paths or [],
            exclude_paths=exclude_paths or [],
        )
        return state_path

    argv = ["enroll", "harvest", "--out", str(tmp_path)]
    monkeypatch.setattr(cli, "harvest", fake_harvest)
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    assert seen["out"] == str(tmp_path)
    assert seen["dangerous"] is False
    assert seen["include_paths"] == []
    assert seen["exclude_paths"] == []

    stdout = capsys.readouterr().out
    assert state_path in stdout
def test_cli_manifest_subcommand_calls_manifest(monkeypatch, tmp_path):
    """`enroll manifest` forwards --harvest/--out and the default common args."""
    bundle = str(tmp_path / "bundle")
    ansible = str(tmp_path / "ansible")
    seen = {}

    def fake_manifest(harvest_dir: str, out_dir: str, **kwargs):
        # Record the two directories plus the common manifest keyword
        # arguments the CLI is expected to pass through.
        seen.update(
            harvest=harvest_dir,
            out=out_dir,
            fqdn=kwargs.get("fqdn"),
            jinjaturtle=kwargs.get("jinjaturtle"),
        )

    argv = ["enroll", "manifest", "--harvest", bundle, "--out", ansible]
    monkeypatch.setattr(cli, "manifest", fake_manifest)
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    assert seen["harvest"] == bundle
    assert seen["out"] == ansible
    assert seen["fqdn"] is None
    assert seen["jinjaturtle"] == "auto"
def test_cli_enroll_subcommand_runs_harvest_then_manifest(monkeypatch, tmp_path):
    """`enroll single-shot` calls harvest first, then manifest, in that order."""
    bundle = str(tmp_path / "bundle")
    ansible = str(tmp_path / "ansible")
    events = []

    def fake_harvest(
        bundle_dir: str,
        dangerous: bool = False,
        include_paths=None,
        exclude_paths=None,
        **_kwargs,
    ):
        includes = include_paths or []
        excludes = exclude_paths or []
        events.append(("harvest", bundle_dir, dangerous, includes, excludes))
        return str(tmp_path / "bundle" / "state.json")

    def fake_manifest(bundle_dir: str, out_dir: str, **kwargs):
        fqdn = kwargs.get("fqdn")
        jinjaturtle = kwargs.get("jinjaturtle")
        events.append(("manifest", bundle_dir, out_dir, fqdn, jinjaturtle))

    for attr, fake in (("harvest", fake_harvest), ("manifest", fake_manifest)):
        monkeypatch.setattr(cli, attr, fake)
    argv = ["enroll", "single-shot", "--harvest", bundle, "--out", ansible]
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    # Order matters: the list comparison also pins harvest-before-manifest.
    expected_harvest = ("harvest", bundle, False, [], [])
    expected_manifest = ("manifest", bundle, ansible, None, "auto")
    assert events == [expected_harvest, expected_manifest]
def test_cli_harvest_dangerous_flag_is_forwarded(monkeypatch, tmp_path):
    """`enroll harvest --dangerous` must reach harvest() as ``dangerous=True``.

    Besides the flag itself, the test checks that the remaining forwarded
    arguments are unaffected: the output directory is the ``--out`` value and
    the include/exclude path lists are empty.
    """
    called = {}

    def fake_harvest(
        out: str,
        dangerous: bool = False,
        include_paths=None,
        exclude_paths=None,
        **_kwargs,
    ):
        called["out"] = out
        called["dangerous"] = dangerous
        called["include_paths"] = include_paths or []
        called["exclude_paths"] = exclude_paths or []
        return str(tmp_path / "state.json")

    monkeypatch.setattr(cli, "harvest", fake_harvest)
    monkeypatch.setattr(
        sys, "argv", ["enroll", "harvest", "--out", str(tmp_path), "--dangerous"]
    )

    cli.main()

    # The output directory was recorded by the fake but previously never
    # checked; assert it so a regression in --out forwarding is caught here.
    assert called["out"] == str(tmp_path)
    assert called["dangerous"] is True
    assert called["include_paths"] == []
    assert called["exclude_paths"] == []
def test_cli_harvest_remote_calls_remote_harvest_and_uses_cache_dir(
    monkeypatch, capsys, tmp_path
):
    """Remote harvest without --out uses a cache dir and default remote options.

    Both the cache-dir factory and remote_harvest are faked; the test checks
    the hint given to the factory, every argument handed to remote_harvest,
    and that the returned state path is printed.
    """
    from enroll.cache import HarvestCache

    cache_dir = tmp_path / "cache"
    cache_dir.mkdir()
    state_path = cache_dir / "state.json"
    seen = {}

    def fake_cache_dir(*, hint=None):
        seen["hint"] = hint
        return HarvestCache(dir=cache_dir)

    def fake_remote_harvest(
        *,
        local_out_dir,
        remote_host,
        remote_port,
        remote_user,
        dangerous,
        no_sudo,
        include_paths=None,
        exclude_paths=None,
        **_kwargs,
    ):
        seen["local_out_dir"] = local_out_dir
        seen["remote_host"] = remote_host
        seen["remote_port"] = remote_port
        seen["remote_user"] = remote_user
        seen["dangerous"] = dangerous
        seen["no_sudo"] = no_sudo
        seen["include_paths"] = include_paths or []
        seen["exclude_paths"] = exclude_paths or []
        return state_path

    argv = ["enroll", "harvest"]
    argv += ["--remote-host", "example.test"]
    argv += ["--remote-user", "alice"]

    monkeypatch.setattr(cli, "new_harvest_cache_dir", fake_cache_dir)
    monkeypatch.setattr(cli, "remote_harvest", fake_remote_harvest)
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    stdout = capsys.readouterr().out
    assert str(state_path) in stdout

    assert seen["hint"] == "example.test"
    assert seen["local_out_dir"] == cache_dir
    assert seen["remote_host"] == "example.test"
    assert seen["remote_port"] == 22
    assert seen["remote_user"] == "alice"
    assert seen["dangerous"] is False
    assert seen["no_sudo"] is False
    assert seen["include_paths"] == []
    assert seen["exclude_paths"] == []
def test_cli_single_shot_remote_without_harvest_prints_state_path(
    monkeypatch, capsys, tmp_path
):
    """Remote single-shot without --harvest prints state.json and manifests the cache."""
    from enroll.cache import HarvestCache

    cache_dir = tmp_path / "cache"
    cache_dir.mkdir()
    ansible_dir = tmp_path / "ansible"
    state_path = cache_dir / "state.json"
    events = []

    def fake_cache_dir(*, hint=None):
        return HarvestCache(dir=cache_dir)

    def fake_remote_harvest(**kwargs):
        events.append(("remote_harvest", kwargs))
        return state_path

    def fake_manifest(harvest_dir: str, out_dir: str, **kwargs):
        events.append(("manifest", harvest_dir, out_dir, kwargs.get("fqdn")))

    replacements = {
        "new_harvest_cache_dir": fake_cache_dir,
        "remote_harvest": fake_remote_harvest,
        "manifest": fake_manifest,
    }
    for attr, fake in replacements.items():
        monkeypatch.setattr(cli, attr, fake)

    argv = ["enroll", "single-shot"]
    argv += ["--remote-host", "example.test"]
    argv += ["--remote-user", "alice"]
    argv += ["--out", str(ansible_dir)]
    argv += ["--fqdn", "example.test"]
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    # With no --harvest given, the derived state.json path should be printed
    # so the user can find the bundle.
    stdout = capsys.readouterr().out
    assert str(state_path) in stdout

    # The manifest step must have been run against the cache directory.
    expected_manifest = ("manifest", str(cache_dir), str(ansible_dir), "example.test")
    assert expected_manifest in events
def test_cli_harvest_remote_ask_become_pass_prompts_and_passes_password(
    monkeypatch, tmp_path
):
    """With --ask-become-pass the getpass() answer arrives as ``sudo_password``.

    getpass is stubbed on the enroll.remote module and the private
    ``_remote_harvest`` is replaced, so the public remote_harvest wrapper
    still runs.
    """
    from enroll.cache import HarvestCache
    import enroll.remote as remote_mod

    cache_dir = tmp_path / "cache"
    cache_dir.mkdir()
    seen = {}

    def fake_cache_dir(*, hint=None):
        return HarvestCache(dir=cache_dir)

    def fake__remote_harvest(*, sudo_password=None, **kwargs):
        seen["sudo_password"] = sudo_password
        return cache_dir / "state.json"

    def fake_getpass(_prompt=""):
        return "pw123"

    monkeypatch.setattr(cli, "new_harvest_cache_dir", fake_cache_dir)
    monkeypatch.setattr(remote_mod, "_remote_harvest", fake__remote_harvest)
    monkeypatch.setattr(remote_mod.getpass, "getpass", fake_getpass)

    argv = ["enroll", "harvest", "--remote-host", "example.test", "--ask-become-pass"]
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    assert seen["sudo_password"] == "pw123"
def test_cli_harvest_remote_password_required_fallback_prompts_and_retries(
    monkeypatch, tmp_path
):
    """On a TTY, a sudo-password-required failure triggers a prompt and a retry.

    The fake ``_remote_harvest`` raises when no password is supplied and
    succeeds otherwise; it must be called first with None, then with the
    value returned by the stubbed getpass().
    """
    from enroll.cache import HarvestCache
    import enroll.remote as remote_mod

    cache_dir = tmp_path / "cache"
    cache_dir.mkdir()
    attempts = []

    class _TTYStdin:
        """Minimal stdin stand-in that reports itself as interactive."""

        def isatty(self):
            return True

    def fake_cache_dir(*, hint=None):
        return HarvestCache(dir=cache_dir)

    def fake__remote_harvest(*, sudo_password=None, **kwargs):
        attempts.append(sudo_password)
        if sudo_password is not None:
            return cache_dir / "state.json"
        raise remote_mod.RemoteSudoPasswordRequired("pw required")

    def fake_getpass(_prompt=""):
        return "pw456"

    monkeypatch.setattr(cli, "new_harvest_cache_dir", fake_cache_dir)
    monkeypatch.setattr(remote_mod, "_remote_harvest", fake__remote_harvest)
    monkeypatch.setattr(remote_mod.getpass, "getpass", fake_getpass)
    monkeypatch.setattr(sys, "stdin", _TTYStdin())

    argv = ["enroll", "harvest", "--remote-host", "example.test"]
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    assert attempts == [None, "pw456"]
def test_cli_harvest_remote_password_required_noninteractive_errors(
    monkeypatch, tmp_path
):
    """Without a TTY, a sudo-password-required failure exits naming --ask-become-pass."""
    from enroll.cache import HarvestCache
    import enroll.remote as remote_mod

    cache_dir = tmp_path / "cache"
    cache_dir.mkdir()

    class _NoTTYStdin:
        """Minimal stdin stand-in that reports itself as non-interactive."""

        def isatty(self):
            return False

    def fake_cache_dir(*, hint=None):
        return HarvestCache(dir=cache_dir)

    def fake__remote_harvest(*, sudo_password=None, **kwargs):
        raise remote_mod.RemoteSudoPasswordRequired("pw required")

    monkeypatch.setattr(cli, "new_harvest_cache_dir", fake_cache_dir)
    monkeypatch.setattr(remote_mod, "_remote_harvest", fake__remote_harvest)
    monkeypatch.setattr(sys, "stdin", _NoTTYStdin())

    argv = ["enroll", "harvest", "--remote-host", "example.test"]
    monkeypatch.setattr(sys, "argv", argv)

    with pytest.raises(SystemExit) as excinfo:
        cli.main()

    # Checked after the context manager exits so the assertion actually runs.
    message = str(excinfo.value)
    assert "--ask-become-pass" in message
def test_cli_manifest_common_args(monkeypatch, tmp_path):
    """Ensure --fqdn and jinjaturtle mode flags are forwarded correctly.

    Also verifies that supplying those flags does not disturb the
    ``--harvest`` / ``--out`` directories handed to manifest().
    """
    bundle_dir = str(tmp_path / "bundle")
    ansible_dir = str(tmp_path / "ansible")
    called = {}

    def fake_manifest(harvest_dir: str, out_dir: str, **kwargs):
        called["harvest"] = harvest_dir
        called["out"] = out_dir
        called["fqdn"] = kwargs.get("fqdn")
        called["jinjaturtle"] = kwargs.get("jinjaturtle")

    monkeypatch.setattr(cli, "manifest", fake_manifest)
    monkeypatch.setattr(
        sys,
        "argv",
        [
            "enroll",
            "manifest",
            "--harvest",
            bundle_dir,
            "--out",
            ansible_dir,
            "--fqdn",
            "example.test",
            "--no-jinjaturtle",
        ],
    )

    cli.main()

    # The directories were recorded by the fake but previously never checked.
    assert called["harvest"] == bundle_dir
    assert called["out"] == ansible_dir
    assert called["fqdn"] == "example.test"
    assert called["jinjaturtle"] == "off"
def test_cli_explain_passes_args_and_writes_stdout(monkeypatch, capsys, tmp_path):
    """`enroll explain` forwards --sops/--format/--max-examples and prints the result."""
    state_file = str(tmp_path / "bundle" / "state.json")
    seen = {}

    def fake_explain_state(
        harvest: str,
        *,
        sops_mode: bool = False,
        fmt: str = "text",
        max_examples: int = 3,
    ):
        seen.update(
            harvest=harvest,
            sops_mode=sops_mode,
            fmt=fmt,
            max_examples=max_examples,
        )
        return "EXPLAINED\n"

    argv = ["enroll", "explain", "--sops"]
    argv += ["--format", "json"]
    argv += ["--max-examples", "7"]
    argv.append(state_file)

    monkeypatch.setattr(cli, "explain_state", fake_explain_state)
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    # stdout must be exactly what explain_state returned, nothing more.
    stdout = capsys.readouterr().out
    assert stdout == "EXPLAINED\n"

    assert seen["sops_mode"] is True
    assert seen["fmt"] == "json"
    assert seen["max_examples"] == 7
def test_discover_config_path_missing_config_value_returns_none(monkeypatch):
    """A bare ``--config`` flag with no following value yields None."""
    # Unset both variables so the ambient environment cannot leak into the test.
    for env_var in ("ENROLL_CONFIG", "XDG_CONFIG_HOME"):
        monkeypatch.delenv(env_var, raising=False)

    discovered = cli._discover_config_path(["--config"])

    assert discovered is None
def test_discover_config_path_defaults_to_home_config(monkeypatch, tmp_path: Path):
    """Covers the ``Path.home() / ".config"`` fallback of config discovery."""
    for env_var in ("ENROLL_CONFIG", "XDG_CONFIG_HOME"):
        monkeypatch.delenv(env_var, raising=False)

    # Point both the home directory and the working directory at tmp_path.
    monkeypatch.setattr(cli.Path, "home", lambda: tmp_path)
    monkeypatch.setattr(cli.Path, "cwd", lambda: tmp_path)

    config_dir = tmp_path / ".config" / "enroll"
    config_dir.mkdir(parents=True)
    config_file = config_dir / "enroll.ini"
    config_file.write_text("[enroll]\n", encoding="utf-8")

    discovered = cli._discover_config_path(["harvest"])

    assert discovered == config_file
def test_cli_harvest_local_sops_encrypts_and_prints_path(
    monkeypatch, tmp_path: Path, capsys
):
    """Local `harvest --sops` runs the encrypt step and prints the .sops path."""
    out_dir = tmp_path / "out"
    out_dir.mkdir()
    calls: dict[str, object] = {}

    def fake_harvest(bundle_dir: str, **kwargs):
        calls["bundle"] = bundle_dir
        bundle = Path(bundle_dir)
        bundle.mkdir(parents=True, exist_ok=True)
        # Write a minimal state.json so tooling that expects it won't break.
        state_file = bundle / "state.json"
        state_file.write_text("{}", encoding="utf-8")
        return str(state_file)

    def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]):
        calls["encrypt"] = (bundle_dir, out_file, fps)
        out_file.write_text("encrypted", encoding="utf-8")
        return out_file

    argv = ["enroll", "harvest", "--sops", "ABCDEF", "--out", str(out_dir)]

    monkeypatch.setattr(cli, "harvest", fake_harvest)
    monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt)
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    printed_path = capsys.readouterr().out.strip()
    assert printed_path.endswith("harvest.tar.gz.sops")
    assert Path(printed_path).exists()
    assert calls.get("encrypt")
def test_cli_harvest_remote_sops_encrypts_and_prints_path(
    monkeypatch, tmp_path: Path, capsys
):
    """Remote `harvest --sops` harvests, encrypts, and prints the .sops path."""
    out_dir = tmp_path / "out"
    out_dir.mkdir()
    calls: dict[str, object] = {}

    def fake_remote_harvest(**kwargs):
        calls["remote"] = kwargs
        # Drop a minimal state.json into the bundle directory we were given.
        state_file = Path(kwargs["local_out_dir"]) / "state.json"
        state_file.write_text("{}", encoding="utf-8")
        return state_file

    def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]):
        calls["encrypt"] = (bundle_dir, out_file, fps)
        out_file.write_text("encrypted", encoding="utf-8")
        return out_file

    argv = ["enroll", "harvest"]
    argv += ["--remote-host", "example.com"]
    argv += ["--remote-user", "root"]
    argv += ["--sops", "ABCDEF"]
    argv += ["--out", str(out_dir)]

    monkeypatch.setattr(cli, "remote_harvest", fake_remote_harvest)
    monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt)
    monkeypatch.setattr(sys, "argv", argv)

    cli.main()

    printed_path = capsys.readouterr().out.strip()
    assert printed_path.endswith("harvest.tar.gz.sops")
    assert Path(printed_path).exists()
    for step in ("remote", "encrypt"):
        assert calls.get(step)
def test_cli_harvest_remote_password_required_exits_cleanly(monkeypatch):
    """RemoteSudoPasswordRequired from remote_harvest becomes a SystemExit hint."""

    def raise_password_required(**kwargs):
        raise RemoteSudoPasswordRequired("pw required")

    argv = ["enroll", "harvest"]
    argv += ["--remote-host", "example.com"]
    argv += ["--remote-user", "root"]

    monkeypatch.setattr(cli, "remote_harvest", raise_password_required)
    monkeypatch.setattr(sys, "argv", argv)

    with pytest.raises(SystemExit) as excinfo:
        cli.main()

    # The exit message should point the user at the flag that fixes this.
    message = str(excinfo.value)
    assert "--ask-become-pass" in message
def test_cli_runtime_error_is_wrapped_as_user_friendly_system_exit(monkeypatch):
    """A RuntimeError raised by harvest() surfaces as ``SystemExit("error: ...")``."""

    def raise_runtime_error(*args, **kwargs):
        raise RuntimeError("nope")

    argv = ["enroll", "harvest", "--out", "/tmp/x"]
    monkeypatch.setattr(cli, "harvest", raise_runtime_error)
    monkeypatch.setattr(sys, "argv", argv)

    with pytest.raises(SystemExit) as excinfo:
        cli.main()

    message = str(excinfo.value)
    assert message == "error: nope"
def test_cli_sops_error_is_wrapped_as_user_friendly_system_exit(monkeypatch):
    """A SopsError raised by manifest() surfaces as ``SystemExit("error: ...")``."""

    def raise_sops_error(*args, **kwargs):
        raise SopsError("sops broke")

    argv = ["enroll", "manifest", "--harvest", "/tmp/x", "--out", "/tmp/y"]
    monkeypatch.setattr(cli, "manifest", raise_sops_error)
    monkeypatch.setattr(sys, "argv", argv)

    with pytest.raises(SystemExit) as excinfo:
        cli.main()

    message = str(excinfo.value)
    assert message == "error: sops broke"
def test_cli_diff_notifies_webhook_and_email_and_respects_exit_code(
    monkeypatch, capsys
):
    """`enroll diff` with webhook, email and --exit-code notifies both and exits 2.

    compare_harvests, format_report, post_webhook and send_email are all
    replaced with recording fakes; the fake comparison returns True as its
    second element.
    """
    calls: dict[str, object] = {}

    def fake_compare(old, new, sops_mode=False):
        calls["compare"] = (old, new, sops_mode)
        return {"dummy": True}, True

    def fake_format(report, fmt="text"):
        rendered = calls.setdefault("format", [])
        rendered.append((report, fmt))
        return "REPORT\n"

    def fake_post(url, body, headers=None):
        calls["webhook"] = (url, body, headers)
        return 200, b"ok"

    def fake_email(**kwargs):
        calls["email"] = kwargs

    replacements = {
        "compare_harvests": fake_compare,
        "format_report": fake_format,
        "post_webhook": fake_post,
        "send_email": fake_email,
    }
    for attr, fake in replacements.items():
        monkeypatch.setattr(cli, attr, fake)
    monkeypatch.setenv("SMTPPW", "secret")

    argv = ["enroll", "diff", "--old", "/tmp/old", "--new", "/tmp/new"]
    argv += ["--webhook", "https://example.invalid/h"]
    argv += ["--webhook-header", "X-Test: ok"]
    argv += ["--email-to", "a@example.com"]
    argv += ["--smtp-password-env", "SMTPPW"]
    argv.append("--exit-code")
    monkeypatch.setattr(sys, "argv", argv)

    with pytest.raises(SystemExit) as excinfo:
        cli.main()

    assert excinfo.value.code == 2
    for step in ("compare", "webhook", "email"):
        assert calls.get(step)

    # Drain captured output without asserting on it. Per the original note,
    # the report is still rendered and printed when exiting via --exit-code.
    _ = capsys.readouterr()
def test_cli_diff_webhook_http_error_raises_system_exit(monkeypatch):
    """A 500 status from the webhook makes `enroll diff` exit mentioning HTTP 500."""

    def fake_compare(old, new, sops_mode=False):
        return {"dummy": True}, True

    def fake_format(report, fmt="text"):
        return "R\n"

    def fake_post(url, body, headers=None):
        # Simulate a server-side failure with an empty response body.
        return (500, b"")

    monkeypatch.setattr(cli, "compare_harvests", fake_compare)
    monkeypatch.setattr(cli, "format_report", fake_format)
    monkeypatch.setattr(cli, "post_webhook", fake_post)

    argv = ["enroll", "diff", "--old", "/tmp/old", "--new", "/tmp/new"]
    argv += ["--webhook", "https://example.invalid/h"]
    monkeypatch.setattr(sys, "argv", argv)

    with pytest.raises(SystemExit) as excinfo:
        cli.main()

    message = str(excinfo.value)
    assert "HTTP 500" in message