More test coverage

This commit is contained in:
Miguel Jacq 2026-01-05 14:27:56 +11:00
parent 24cedc8c8d
commit e68ec0bffc
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
12 changed files with 1650 additions and 381 deletions

View file

@ -1,7 +1,12 @@
import json
from pathlib import Path
from enroll.manifest import manifest
import os
import stat
import tarfile
import pytest
import enroll.manifest as manifest
def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
@ -176,7 +181,7 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin" / "myscript"
).write_text("#!/bin/sh\necho hi\n", encoding="utf-8")
manifest(str(bundle), str(out))
manifest.manifest(str(bundle), str(out))
# Service role: systemd management should be gated on foo_manage_unit and a probe.
tasks = (out / "roles" / "foo" / "tasks" / "main.yml").read_text(encoding="utf-8")
@ -345,7 +350,7 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path)
/ "myapp.conf"
).write_text("myapp=1\n", encoding="utf-8")
manifest(str(bundle), str(out), fqdn=fqdn)
manifest.manifest(str(bundle), str(out), fqdn=fqdn)
# Host playbook exists.
assert (out / "playbooks" / f"{fqdn}.yml").exists()
@ -482,7 +487,7 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path):
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest(str(bundle), str(out))
manifest.manifest(str(bundle), str(out))
pb = (out / "playbook.yml").read_text(encoding="utf-8")
assert "- dnf_config" in pb
@ -502,3 +507,291 @@ def test_render_install_packages_tasks_contains_dnf_branch():
assert "ansible.builtin.dnf" in txt
assert "ansible.builtin.package" in txt
assert "pkg_mgr" in txt
def test_manifest_orders_cron_and_logrotate_at_playbook_tail(tmp_path: Path):
"""Cron/logrotate roles should appear at the end.
The cron role may restore per-user crontabs under /var/spool, so it should
run after users have been created.
"""
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
state = {
"schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {"packages": {}},
"roles": {
"users": {
"role_name": "users",
"users": [{"name": "alice"}],
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [],
"packages": [
{
"package": "curl",
"role_name": "curl",
"managed_files": [],
"excluded": [],
"notes": [],
},
{
"package": "cron",
"role_name": "cron",
"managed_files": [
{
"path": "/var/spool/cron/crontabs/alice",
"src_rel": "var/spool/cron/crontabs/alice",
"owner": "alice",
"group": "root",
"mode": "0600",
"reason": "system_cron",
}
],
"excluded": [],
"notes": [],
},
{
"package": "logrotate",
"role_name": "logrotate",
"managed_files": [
{
"path": "/etc/logrotate.conf",
"src_rel": "etc/logrotate.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "system_logrotate",
}
],
"excluded": [],
"notes": [],
},
],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
}
# Minimal artifacts for managed files.
(bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs").mkdir(
parents=True, exist_ok=True
)
(
bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs" / "alice"
).write_text("@daily echo hi\n", encoding="utf-8")
(bundle / "artifacts" / "logrotate" / "etc").mkdir(parents=True, exist_ok=True)
(bundle / "artifacts" / "logrotate" / "etc" / "logrotate.conf").write_text(
"weekly\n", encoding="utf-8"
)
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
pb = (out / "playbook.yml").read_text(encoding="utf-8").splitlines()
# Roles are emitted as indented list items under the `roles:` key.
roles = [
ln.strip().removeprefix("- ").strip() for ln in pb if ln.startswith(" - ")
]
# Ensure tail ordering.
assert roles[-2:] == ["cron", "logrotate"]
assert "users" in roles
assert roles.index("users") < roles.index("cron")
def test_yaml_helpers_fallback_when_yaml_unavailable(monkeypatch):
monkeypatch.setattr(manifest, "_try_yaml", lambda: None)
assert manifest._yaml_load_mapping("foo: 1\n") == {}
out = manifest._yaml_dump_mapping({"b": 2, "a": 1})
# Best-effort fallback is key: repr(value)
assert out.splitlines()[0].startswith("a: ")
assert out.endswith("\n")
def test_copy2_replace_makes_readonly_sources_user_writable(
monkeypatch, tmp_path: Path
):
src = tmp_path / "src.txt"
dst = tmp_path / "dst.txt"
src.write_text("hello", encoding="utf-8")
# Make source read-only; copy2 preserves mode, so tmp will be read-only too.
os.chmod(src, 0o444)
manifest._copy2_replace(str(src), str(dst))
st = os.stat(dst, follow_symlinks=False)
assert stat.S_IMODE(st.st_mode) & stat.S_IWUSR
def test_prepare_bundle_dir_sops_decrypts_and_extracts(monkeypatch, tmp_path: Path):
enc = tmp_path / "harvest.tar.gz.sops"
enc.write_text("ignored", encoding="utf-8")
def fake_require():
return None
def fake_decrypt(src: str, dst: str, *, mode: int = 0o600):
# Create a minimal tar.gz with a state.json file.
with tarfile.open(dst, "w:gz") as tf:
p = tmp_path / "state.json"
p.write_text("{}", encoding="utf-8")
tf.add(p, arcname="state.json")
monkeypatch.setattr(manifest, "require_sops_cmd", fake_require)
monkeypatch.setattr(manifest, "decrypt_file_binary_to", fake_decrypt)
bundle_dir, td = manifest._prepare_bundle_dir(str(enc), sops_mode=True)
try:
assert (Path(bundle_dir) / "state.json").exists()
finally:
td.cleanup()
def test_prepare_bundle_dir_rejects_non_dir_without_sops(tmp_path: Path):
fp = tmp_path / "bundle.tar.gz"
fp.write_text("x", encoding="utf-8")
with pytest.raises(RuntimeError):
manifest._prepare_bundle_dir(str(fp), sops_mode=False)
def test_tar_dir_to_with_progress_writes_progress_when_tty(monkeypatch, tmp_path: Path):
src = tmp_path / "dir"
src.mkdir()
(src / "a.txt").write_text("a", encoding="utf-8")
(src / "b.txt").write_text("b", encoding="utf-8")
out = tmp_path / "out.tar.gz"
writes: list[bytes] = []
monkeypatch.setattr(manifest.os, "isatty", lambda fd: True)
monkeypatch.setattr(manifest.os, "write", lambda fd, b: writes.append(b) or len(b))
manifest._tar_dir_to_with_progress(str(src), str(out), desc="tarring")
assert out.exists()
assert writes # progress was written
assert writes[-1].endswith(b"\n")
def test_encrypt_manifest_out_dir_to_sops_handles_missing_tmp_cleanup(
monkeypatch, tmp_path: Path
):
src_dir = tmp_path / "manifest"
src_dir.mkdir()
(src_dir / "x.txt").write_text("x", encoding="utf-8")
out = tmp_path / "manifest.tar.gz.sops"
monkeypatch.setattr(manifest, "require_sops_cmd", lambda: None)
def fake_encrypt(in_fp, out_fp, *args, **kwargs):
Path(out_fp).write_text("enc", encoding="utf-8")
monkeypatch.setattr(manifest, "encrypt_file_binary", fake_encrypt)
# Simulate race where tmp tar is already removed.
monkeypatch.setattr(
manifest.os, "unlink", lambda p: (_ for _ in ()).throw(FileNotFoundError())
)
res = manifest._encrypt_manifest_out_dir_to_sops(str(src_dir), str(out), ["ABC"]) # type: ignore[arg-type]
assert str(res).endswith(".sops")
assert out.exists()
def test_manifest_applies_jinjaturtle_to_jinjifyable_managed_file(
monkeypatch, tmp_path: Path
):
# Create a minimal bundle with just an apt_config snapshot.
bundle = tmp_path / "bundle"
(bundle / "artifacts" / "apt_config" / "etc" / "apt").mkdir(parents=True)
(bundle / "artifacts" / "apt_config" / "etc" / "apt" / "foo.ini").write_text(
"key=VALUE\n", encoding="utf-8"
)
state = {
"schema_version": 1,
"inventory": {"packages": {}},
"roles": {
"services": [],
"packages": [],
"apt_config": {
"role_name": "apt_config",
"managed_files": [
{
"path": "/etc/apt/foo.ini",
"src_rel": "etc/apt/foo.ini",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "apt_config",
}
],
"managed_dirs": [],
"excluded": [],
"notes": [],
},
},
}
(bundle / "state.json").write_text(
__import__("json").dumps(state), encoding="utf-8"
)
monkeypatch.setattr(manifest, "find_jinjaturtle_cmd", lambda: "jinjaturtle")
class _Res:
template_text = "key={{ foo }}\n"
vars_text = "foo: 123\n"
monkeypatch.setattr(manifest, "run_jinjaturtle", lambda *a, **k: _Res())
out_dir = tmp_path / "out"
manifest.manifest(str(bundle), str(out_dir), jinjaturtle="on")
tmpl = out_dir / "roles" / "apt_config" / "templates" / "etc" / "apt" / "foo.ini.j2"
assert tmpl.exists()
assert "{{ foo }}" in tmpl.read_text(encoding="utf-8")
defaults = out_dir / "roles" / "apt_config" / "defaults" / "main.yml"
txt = defaults.read_text(encoding="utf-8")
assert "foo: 123" in txt
# Non-templated file should not exist under files/.
assert not (
out_dir / "roles" / "apt_config" / "files" / "etc" / "apt" / "foo.ini"
).exists()