from __future__ import annotations import json from pathlib import Path import yaml from enroll import manifest def _write_state(bundle: Path, state: dict) -> None: bundle.mkdir(parents=True, exist_ok=True) (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") def _sample_state() -> dict: return { "schema_version": 3, "host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"}, "inventory": { "packages": {"foo": {"section": "net"}, "curl": {"section": "net"}} }, "roles": { "users": { "role_name": "users", "users": [ { "name": "alice", "uid": 1000, "gid": 1000, "gecos": "Alice Example", "home": "/home/alice", "shell": "/bin/bash", "primary_group": "alice", "supplementary_groups": ["docker"], } ], "managed_dirs": [], "managed_files": [], "managed_links": [], "excluded": [], "notes": [], }, "services": [ { "unit": "foo.service", "role_name": "foo", "packages": ["foo"], "active_state": "active", "unit_file_state": "enabled", "managed_dirs": [ { "path": "/etc/foo", "owner": "root", "group": "root", "mode": "0755", } ], "managed_files": [ { "path": "/etc/foo/foo.conf", "src_rel": "etc/foo.conf", "owner": "root", "group": "root", "mode": "0644", } ], "managed_links": [ {"path": "/etc/foo/enabled.conf", "target": "/etc/foo/foo.conf"} ], "excluded": [], "notes": [], } ], "packages": [ { "package": "curl", "role_name": "curl", "section": "net", "managed_dirs": [], "managed_files": [], "managed_links": [], "excluded": [], "notes": [], } ], "apt_config": { "role_name": "apt_config", "managed_dirs": [], "managed_files": [], "managed_links": [], }, "dnf_config": { "role_name": "dnf_config", "managed_dirs": [], "managed_files": [], "managed_links": [], }, "sysctl": { "role_name": "sysctl", "managed_dirs": [], "managed_files": [ { "path": "/etc/sysctl.d/99-enroll.conf", "src_rel": "sysctl/99-enroll.conf", "owner": "root", "group": "root", "mode": "0644", } ], "managed_links": [], "parameters": {"net.ipv4.ip_forward": "1"}, "notes": [], }, "firewall_runtime": {"role_name": "firewall_runtime", "packages": []}, "etc_custom": { "role_name": "etc_custom", "managed_dirs": [], "managed_files": [], "managed_links": [], }, "usr_local_custom": { "role_name": "usr_local_custom", "managed_dirs": [], "managed_files": [], "managed_links": [], }, "extra_paths": { "role_name": "extra_paths", "managed_dirs": [], "managed_files": [], "managed_links": [], }, }, } def _write_sample_artifacts(bundle: Path) -> None: artifact = bundle / "artifacts" / "foo" / "etc" / "foo.conf" artifact.parent.mkdir(parents=True, exist_ok=True) artifact.write_text("setting = true\n", encoding="utf-8") sysctl_artifact = bundle / "artifacts" / "sysctl" / "sysctl" / "99-enroll.conf" sysctl_artifact.parent.mkdir(parents=True, exist_ok=True) sysctl_artifact.write_text("net.ipv4.ip_forward = 1\n", encoding="utf-8") def test_manifest_salt_writes_single_site_state_tree(tmp_path: Path): bundle = tmp_path / "bundle" out = tmp_path / "salt" _write_sample_artifacts(bundle) _write_state(bundle, _sample_state()) manifest.manifest(str(bundle), str(out), target="salt") top = yaml.safe_load((out / "states" / "top.sls").read_text(encoding="utf-8")) assert top["base"]["*"] == ["roles.net", "roles.users", "roles.sysctl"] net_sls = (out / "states" / "roles" / "net" / "init.sls").read_text( encoding="utf-8" ) assert "pkg.installed:" in net_sls assert '- name: "curl"' in net_sls assert '- name: "foo"' in net_sls assert '"/etc/foo/foo.conf":' in net_sls assert 'source: "salt://roles/net/files/etc/foo.conf"' in net_sls assert "file.symlink:" in net_sls assert "service.running:" in net_sls assert (out / "states" / "roles" / "net" / "files" / "etc" / "foo.conf").exists() users_sls = (out / "states" / "roles" / "users" / "init.sls").read_text( encoding="utf-8" ) assert "group.present:" in users_sls assert "user.present:" in users_sls assert "Alice Example" in users_sls assert "optional_groups" not in users_sls assert "- remove_groups: false" in users_sls sysctl_sls = (out / "states" / "roles" / "sysctl" / "init.sls").read_text( encoding="utf-8" ) assert "cmd.run:" in sysctl_sls assert "sysctl -e -p /etc/sysctl.d/99-enroll.conf || true" in sysctl_sls assert (out / "README.md").exists() assert (out / "config" / "master.d" / "enroll.conf").exists() def test_manifest_salt_fqdn_mode_uses_pillar_and_accumulates_nodes(tmp_path: Path): out = tmp_path / "salt" def write_bundle(name: str, content: str) -> Path: bundle = tmp_path / name _write_sample_artifacts(bundle) (bundle / "artifacts" / "foo" / "etc" / "foo.conf").write_text( content, encoding="utf-8" ) state = _sample_state() state["host"]["hostname"] = name _write_state(bundle, state) return bundle first = write_bundle("first", "first=true\n") second = write_bundle("second", "second=true\n") manifest.manifest(str(first), str(out), target="salt", fqdn="first.example") manifest.manifest(str(second), str(out), target="salt", fqdn="second.example") state_top = yaml.safe_load((out / "states" / "top.sls").read_text(encoding="utf-8")) assert state_top["base"]["first.example"] == [ "roles.curl", "roles.foo", "roles.users", "roles.sysctl", ] assert state_top["base"]["second.example"] == [ "roles.curl", "roles.foo", "roles.users", "roles.sysctl", ] pillar_top = yaml.safe_load( (out / "pillar" / "top.sls").read_text(encoding="utf-8") ) assert set(pillar_top["base"]) == {"first.example", "second.example"} first_pillar_sls = pillar_top["base"]["first.example"][0] first_node = out / "pillar" / Path(*first_pillar_sls.split(".")) first_data = yaml.safe_load( first_node.with_suffix(".sls").read_text(encoding="utf-8") ) assert first_data["enroll"]["classes"] == [ "roles.curl", "roles.foo", "roles.users", "roles.sysctl", ] assert first_data["enroll"]["roles"]["foo"]["packages"] == ["foo"] assert first_data["enroll"]["roles"]["foo"]["files"]["/etc/foo/foo.conf"][ "source" ] == ("salt://roles/foo/files/nodes/first.example/etc/foo.conf") foo_sls = (out / "states" / "roles" / "foo" / "init.sls").read_text( encoding="utf-8" ) assert "salt['pillar.get']('enroll:roles:foo'" in foo_sls assert "pkg.installed:" in foo_sls assert "file.managed:" in foo_sls assert ( out / "states" / "roles" / "foo" / "files" / "nodes" / "first.example" / "etc" / "foo.conf" ).exists() assert ( out / "states" / "roles" / "foo" / "files" / "nodes" / "second.example" / "etc" / "foo.conf" ).exists() def test_manifest_salt_user_gecos_and_groups_are_salt_safe(tmp_path: Path): bundle = tmp_path / "bundle" out = tmp_path / "salt" state = _sample_state() state["roles"]["users"]["users"][0]["name"] = "node" state["roles"]["users"]["users"][0]["primary_group"] = "node" state["roles"]["users"]["users"][0]["gid"] = 1000 state["roles"]["users"]["users"][0]["gecos"] = "Node,,," _write_sample_artifacts(bundle) _write_state(bundle, state) manifest.manifest(str(bundle), str(out), target="salt") users_sls = (out / "states" / "roles" / "users" / "init.sls").read_text( encoding="utf-8" ) assert '- fullname: "Node"' in users_sls assert "Node,,," not in users_sls assert "optional_groups" not in users_sls assert "- remove_groups: false" in users_sls def test_manifest_salt_fqdn_user_pillar_gecos_and_groups_are_salt_safe(tmp_path: Path): bundle = tmp_path / "bundle" out = tmp_path / "salt" state = _sample_state() state["roles"]["users"]["users"][0]["gecos"] = "Node,,," _write_sample_artifacts(bundle) _write_state(bundle, state) manifest.manifest(str(bundle), str(out), target="salt", fqdn="node.example") pillar_top = yaml.safe_load( (out / "pillar" / "top.sls").read_text(encoding="utf-8") ) node_sls = pillar_top["base"]["node.example"][0] pillar_path = out / "pillar" / Path(*node_sls.split(".")) data = yaml.safe_load(pillar_path.with_suffix(".sls").read_text(encoding="utf-8")) alice = data["enroll"]["roles"]["users"]["users"]["alice"] assert alice["fullname"] == "Node" assert "Node,,," not in pillar_path.with_suffix(".sls").read_text(encoding="utf-8") assert alice["remove_groups"] is False assert "optional_groups" not in pillar_path.with_suffix(".sls").read_text( encoding="utf-8" ) users_sls = (out / "states" / "roles" / "users" / "init.sls").read_text( encoding="utf-8" ) assert "optional_groups" not in users_sls assert "remove_groups" in users_sls def test_cli_manifest_target_salt_is_forwarded(monkeypatch, tmp_path): import sys import enroll.cli as cli called = {} def fake_manifest(harvest_dir: str, out_dir: str, **kwargs): called["harvest"] = harvest_dir called["out"] = out_dir called["target"] = kwargs.get("target") monkeypatch.setattr(cli, "manifest", fake_manifest) monkeypatch.setattr( sys, "argv", [ "enroll", "manifest", "--harvest", str(tmp_path / "bundle"), "--out", str(tmp_path / "salt"), "--target", "salt", ], ) cli.main() assert called["harvest"] == str(tmp_path / "bundle") assert called["out"] == str(tmp_path / "salt") assert called["target"] == "salt"