diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/state_helpers.py b/tests/state_helpers.py new file mode 100644 index 0000000..9cbca20 --- /dev/null +++ b/tests/state_helpers.py @@ -0,0 +1,234 @@ +from __future__ import annotations + +import copy +import json +from pathlib import Path +from typing import Any + +_VALID_REASON_FALLBACKS = { + "dangerous_user_dotfile": "user_shell_rc", + "possible_secret": "sensitive_content", +} + +_COMMON_ROLES = { + "users", + "apt_config", + "dnf_config", + "etc_custom", + "usr_local_custom", + "extra_paths", +} + + +def _common_role(name: str) -> dict[str, Any]: + out: dict[str, Any] = { + "role_name": name, + "managed_dirs": [], + "managed_files": [], + "excluded": [], + "notes": [], + } + if name == "users": + out["users"] = [] + if name == "extra_paths": + out["include_patterns"] = [] + out["exclude_patterns"] = [] + out["managed_links"] = [] + return out + + +def _normalise_managed_file(mf: dict[str, Any]) -> None: + reason = mf.get("reason") + if reason in _VALID_REASON_FALLBACKS: + mf["reason"] = _VALID_REASON_FALLBACKS[reason] + mf.setdefault("owner", "root") + mf.setdefault("group", "root") + mf.setdefault("mode", "0644") + mf.setdefault("reason", "modified_conffile") + + +def _normalise_managed_dir(md: dict[str, Any]) -> None: + md.setdefault("owner", "root") + md.setdefault("group", "root") + md.setdefault("mode", "0755") + if md.get("reason") in {None, "parent_dir"}: + md["reason"] = "parent_of_managed_file" + + +def _normalise_managed_link(ml: dict[str, Any]) -> None: + ml.setdefault("reason", "enabled_symlink") + + +def _normalise_common_role(role: dict[str, Any], name: str) -> None: + role.setdefault("role_name", name) + role.setdefault("managed_dirs", []) + role.setdefault("managed_files", []) + role.setdefault("excluded", []) + role.setdefault("notes", []) + for mf in role.get("managed_files") or []: + if isinstance(mf, dict): + _normalise_managed_file(mf) + for md in role.get("managed_dirs") or []: + if isinstance(md, dict): + _normalise_managed_dir(md) + for ml in role.get("managed_links") or []: + if isinstance(ml, dict): + _normalise_managed_link(ml) + for ex in role.get("excluded") or []: + if isinstance(ex, dict) and ex.get("reason") in _VALID_REASON_FALLBACKS: + ex["reason"] = _VALID_REASON_FALLBACKS[ex["reason"]] + + +def make_schema_valid_state(state: dict[str, Any]) -> dict[str, Any]: + """Return a current-schema harvest state from a compact renderer fixture. + + Many renderer tests intentionally build only the fields needed by the + renderer under test. Manifest now validates strictly before rendering, so + those fixtures need current-schema boilerplate too. + """ + + st = copy.deepcopy(state) + st.pop("schema_version", None) + + enroll = st.setdefault("enroll", {}) + enroll.setdefault("version", "0.0.test") + enroll.setdefault("harvest_time", 0) + + host = st.setdefault("host", {}) + host.setdefault("hostname", "testhost") + host.setdefault("os", "unknown") + host.setdefault("pkg_backend", "dpkg") + host.setdefault("os_release", {}) + + inv = st.setdefault("inventory", {}) + inv.setdefault("packages", {}) + for pkg in (inv.get("packages") or {}).values(): + if not isinstance(pkg, dict): + continue + pkg.setdefault("version", None) + pkg.setdefault("arches", []) + installations = pkg.setdefault("installations", []) + for inst in installations: + if isinstance(inst, dict): + inst.setdefault("version", str(pkg.get("version") or "1.0")) + inst.setdefault("arch", "amd64") + observed = pkg.setdefault("observed_via", []) + for ov in observed: + if isinstance(ov, dict) and ov.get("kind") not in { + "user_installed", + "systemd_unit", + "package_role", + "firewall_runtime", + }: + ov["kind"] = "package_role" + ov.setdefault("ref", "package") + pkg.setdefault("roles", []) + + roles = st.setdefault("roles", {}) + for name in _COMMON_ROLES: + cur = roles.get(name) + if not isinstance(cur, dict): + roles[name] = _common_role(name) + else: + _normalise_common_role(cur, name) + + roles.setdefault("services", []) + roles.setdefault("packages", []) + + users = roles.get("users") or {} + users.setdefault("users", []) + for user in users.get("users") or []: + if not isinstance(user, dict): + continue + user.setdefault("uid", 0) + user.setdefault("gid", user.get("uid", 0)) + user.setdefault("gecos", "") + user.setdefault("home", f"/home/{user.get('name', 'user')}") + user.setdefault("shell", "/bin/sh") + user.setdefault("primary_group", user.get("name", "users")) + user.setdefault("supplementary_groups", []) + + extra = roles.get("extra_paths") or {} + extra.setdefault("include_patterns", []) + extra.setdefault("exclude_patterns", []) + extra.setdefault("managed_links", []) + + for svc in roles.get("services") or []: + if not isinstance(svc, dict): + continue + _normalise_common_role(svc, str(svc.get("role_name") or "service_role")) + svc.setdefault("unit", "example.service") + svc.setdefault("packages", []) + svc.setdefault("active_state", None) + svc.setdefault("sub_state", None) + svc.setdefault("unit_file_state", None) + svc.setdefault("condition_result", None) + + for pkg in roles.get("packages") or []: + if not isinstance(pkg, dict): + continue + _normalise_common_role( + pkg, str(pkg.get("role_name") or pkg.get("package") or "package_role") + ) + pkg.setdefault("package", str(pkg.get("role_name") or "package")) + + if isinstance(roles.get("sysctl"), dict): + sysctl = roles["sysctl"] + sysctl.setdefault("role_name", "sysctl") + sysctl.setdefault("managed_files", []) + sysctl.setdefault("parameters", {}) + sysctl.setdefault("notes", []) + sysctl.pop("managed_dirs", None) + sysctl.pop("managed_links", None) + for mf in sysctl.get("managed_files") or []: + if isinstance(mf, dict): + _normalise_managed_file(mf) + + if isinstance(roles.get("firewall_runtime"), dict): + fw = roles["firewall_runtime"] + fw.setdefault("role_name", "firewall_runtime") + fw.setdefault("packages", []) + fw.setdefault("ipset_save", None) + fw.setdefault("ipset_sets", []) + fw.setdefault("iptables_v4_save", None) + fw.setdefault("iptables_v6_save", None) + fw.setdefault("notes", []) + + if isinstance(roles.get("flatpak"), dict): + roles["flatpak"].setdefault("role_name", "flatpak") + if isinstance(roles.get("snap"), dict): + roles["snap"].setdefault("role_name", "snap") + if isinstance(roles.get("container_images"), dict): + ci = roles["container_images"] + ci.setdefault("role_name", "container_images") + ci.setdefault("images", []) + ci.setdefault("notes", []) + for img in ci.get("images") or []: + if not isinstance(img, dict): + continue + img.setdefault("engine", "docker") + img.setdefault("scope", "system") + img.setdefault("user", None) + img.setdefault("home", None) + img.setdefault("image_id", None) + img.setdefault("repo_tags", []) + img.setdefault("repo_digests", []) + img.setdefault("pull_ref", None) + img.setdefault("tag_aliases", []) + img.setdefault("os", None) + img.setdefault("architecture", None) + img.setdefault("variant", None) + img.setdefault("platform", None) + img.setdefault("size", None) + img.setdefault("created", None) + img.setdefault("source", "test") + img.setdefault("notes", []) + + return st + + +def write_schema_state(bundle: Path, state: dict[str, Any]) -> None: + bundle.mkdir(parents=True, exist_ok=True) + (bundle / "state.json").write_text( + json.dumps(make_schema_valid_state(state), indent=2), encoding="utf-8" + ) diff --git a/tests/test_cli_helpers.py b/tests/test_cli_helpers.py index 264ff85..dab28b4 100644 --- a/tests/test_cli_helpers.py +++ b/tests/test_cli_helpers.py @@ -8,7 +8,7 @@ from pathlib import Path def test_discover_config_path_precedence(tmp_path: Path, monkeypatch): - """_discover_config_path: --config > ENROLL_CONFIG > ./enroll.ini > XDG.""" + """_discover_config_path: --config > ENROLL_CONFIG > XDG.""" from enroll.cli import _discover_config_path cfg1 = tmp_path / "one.ini" @@ -27,14 +27,14 @@ def test_discover_config_path_precedence(tmp_path: Path, monkeypatch): monkeypatch.setenv("ENROLL_CONFIG", str(cfg2)) assert _discover_config_path([]) == cfg2 - # Local ./enroll.ini fallback. + # Local ./enroll.ini is ignored unless passed explicitly. monkeypatch.delenv("ENROLL_CONFIG", raising=False) local = tmp_path / "enroll.ini" local.write_text("[enroll]\n", encoding="utf-8") - assert _discover_config_path([]) == local + assert _discover_config_path([]) is None + assert _discover_config_path(["--config", str(local)]) == local # XDG fallback. - local.unlink() xdg = tmp_path / "xdg" cfg3 = xdg / "enroll" / "enroll.ini" cfg3.parent.mkdir(parents=True) diff --git a/tests/test_diff_ignore_versions_exclude_enforce.py b/tests/test_diff_ignore_versions_exclude_enforce.py index d08c60e..89e7d7c 100644 --- a/tests/test_diff_ignore_versions_exclude_enforce.py +++ b/tests/test_diff_ignore_versions_exclude_enforce.py @@ -244,6 +244,7 @@ def test_enforce_old_harvest_runs_ansible_with_tags_from_file_drift( # Stub manifest generation to only create playbook.yml (fast, no real roles needed). def fake_manifest(_harvest_dir: str, out_dir: str, **_kwargs): out = Path(out_dir) + out.mkdir(parents=True, exist_ok=False) (out / "playbook.yml").write_text( "---\n- hosts: all\n gather_facts: false\n roles: []\n", encoding="utf-8", @@ -363,7 +364,10 @@ def test_enforce_old_harvest_runs_puppet_target(monkeypatch, tmp_path: Path): argv = calls.get("argv") assert argv and argv[:2] == ["/usr/bin/puppet", "apply"] assert "--modulepath" in argv - assert str(Path(calls["cwd"]) / "manifests" / "site.pp") in argv + assert any( + str(Path(calls["cwd"]) / "manifest" / "manifests" / "site.pp") == str(a) + for a in argv + ) def test_enforce_old_harvest_runs_salt_target(monkeypatch, tmp_path: Path): @@ -418,7 +422,7 @@ def test_enforce_old_harvest_runs_salt_target(monkeypatch, tmp_path: Path): assert "--local" in argv assert "--file-root" in argv assert "state.apply" in argv - assert str(Path(calls["cwd"]) / "states") in argv + assert str(Path(calls["cwd"]) / "manifest" / "states") in argv def test_cli_diff_enforce_forwards_target(monkeypatch): diff --git a/tests/test_diff_notifications.py b/tests/test_diff_notifications.py index 53f6b57..9a433b4 100644 --- a/tests/test_diff_notifications.py +++ b/tests/test_diff_notifications.py @@ -81,3 +81,42 @@ def test_send_email_raises_when_no_delivery_method(monkeypatch): from_addr="a@example.com", to_addrs=["b@example.com"], ) + + +def test_send_email_refuses_smtp_auth_without_starttls(monkeypatch): + from enroll.diff import send_email + + class FakeSMTP: + def __init__(self, *_args, **_kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def ehlo(self): + pass + + def starttls(self): + raise RuntimeError("no starttls") + + def login(self, *_args): + raise AssertionError("login should not be called without TLS") + + def send_message(self, *_args): + raise AssertionError("message should not be sent without TLS") + + monkeypatch.setattr("smtplib.SMTP", FakeSMTP) + + with pytest.raises(RuntimeError, match="STARTTLS failed"): + send_email( + subject="Subj", + body="Body", + from_addr="a@example.com", + to_addrs=["b@example.com"], + smtp="smtp.example.com:587", + smtp_user="user", + smtp_password="secret", + ) diff --git a/tests/test_jinjaturtle.py b/tests/test_jinjaturtle.py index e4198c7..e363193 100644 --- a/tests/test_jinjaturtle.py +++ b/tests/test_jinjaturtle.py @@ -1,6 +1,7 @@ -import json from pathlib import Path +from tests.state_helpers import write_schema_state + import enroll.manifest as manifest_mod import enroll.jinjaturtle as jinjaturtle_mod from enroll.jinjaturtle import JinjifyResult @@ -103,7 +104,7 @@ def test_manifest_uses_jinjaturtle_templates_and_does_not_copy_raw( } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) # Pretend jinjaturtle exists. monkeypatch.setattr( diff --git a/tests/test_manifest.py b/tests/test_manifest.py index dba3d24..b8898cd 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -1,4 +1,3 @@ -import json from pathlib import Path import os @@ -7,6 +6,7 @@ import tarfile import pytest import enroll.manifest as manifest +from tests.state_helpers import write_schema_state import enroll.jinjaturtle as jinjaturtle_mod from enroll import ansible as ansible_layout from enroll import ansible as ansible_tasks @@ -84,8 +84,7 @@ def _minimal_package_state(packages): def _write_state(bundle: Path, state: dict) -> None: - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): @@ -230,7 +229,7 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) # Create artifact for etc_custom file so copy works (bundle / "artifacts" / "etc_custom" / "etc" / "default").mkdir( @@ -936,7 +935,7 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path) } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) # Artifacts for usr_local_custom file so copy works. (bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "etc").mkdir( @@ -1087,7 +1086,7 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path): } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) @@ -1224,7 +1223,7 @@ def test_manifest_orders_cron_and_logrotate_at_playbook_tail(tmp_path: Path): ) bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) @@ -1363,9 +1362,7 @@ def test_manifest_applies_jinjaturtle_to_jinjifyable_managed_file( }, }, } - (bundle / "state.json").write_text( - __import__("json").dumps(state), encoding="utf-8" - ) + write_schema_state(bundle, state) monkeypatch.setattr(jinjaturtle_mod, "find_jinjaturtle_cmd", lambda: "jinjaturtle") @@ -1463,7 +1460,7 @@ def test_manifest_writes_firewall_runtime_role(tmp_path: Path): }, }, } - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) @@ -1768,7 +1765,7 @@ def test_manifest_renders_flatpak_and_snap_details(tmp_path: Path): }, } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) @@ -1844,7 +1841,7 @@ def test_users_role_without_portable_apps_omits_community_general_tasks(tmp_path }, } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) @@ -1922,7 +1919,7 @@ def test_users_role_only_creates_ssh_dir_when_managed_ssh_files_exist(tmp_path): }, } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) @@ -1972,7 +1969,7 @@ def test_manifest_emits_flatpak_role_even_when_no_flatpaks(tmp_path): } } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) @@ -2039,7 +2036,7 @@ def test_manifest_avoids_package_role_collision_with_flatpak_singleton(tmp_path) } } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out), no_common_roles=True) @@ -2135,7 +2132,7 @@ def test_manifest_writes_sysctl_role(tmp_path: Path): }, }, } - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) @@ -2237,7 +2234,7 @@ def test_manifest_renders_container_image_role_for_ansible(tmp_path: Path): } } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) @@ -2314,7 +2311,7 @@ def test_manifest_writes_container_images_to_hostvars_in_fqdn_mode(tmp_path: Pat } } bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out), fqdn="host.example.test") @@ -2329,3 +2326,14 @@ def test_manifest_writes_container_images_to_hostvars_in_fqdn_mode(tmp_path: Pat assert "container_images: []" in defaults assert digest in hostvars assert "role: container_images" in playbook + + +def test_manifest_non_fqdn_refuses_existing_output(tmp_path: Path): + bundle = tmp_path / "bundle" + out = tmp_path / "ansible" + bundle.mkdir(parents=True) + out.mkdir() + write_schema_state(bundle, _minimal_package_state([])) + + with pytest.raises(RuntimeError, match="already exists"): + manifest.manifest(str(bundle), str(out), no_common_roles=True) diff --git a/tests/test_manifest_puppet.py b/tests/test_manifest_puppet.py index 74f0dc6..24f7b5a 100644 --- a/tests/test_manifest_puppet.py +++ b/tests/test_manifest_puppet.py @@ -5,6 +5,8 @@ from pathlib import Path import yaml +from tests.state_helpers import write_schema_state + from enroll import manifest from enroll.puppet import ( PuppetRole, @@ -15,8 +17,7 @@ from enroll.puppet import ( def _write_state(bundle: Path, state: dict) -> None: - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) def test_manifest_puppet_writes_control_repo_style_output(tmp_path: Path): diff --git a/tests/test_manifest_salt.py b/tests/test_manifest_salt.py index 9a35c2b..f629b36 100644 --- a/tests/test_manifest_salt.py +++ b/tests/test_manifest_salt.py @@ -1,11 +1,12 @@ from __future__ import annotations -import json from collections import OrderedDict from pathlib import Path import yaml +from tests.state_helpers import write_schema_state + from enroll import manifest from enroll.salt import ( SaltRole, @@ -18,8 +19,7 @@ from enroll.salt import ( def _write_state(bundle: Path, state: dict) -> None: - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + write_schema_state(bundle, state) def _sample_state() -> dict: diff --git a/tests/test_manifest_symlinks.py b/tests/test_manifest_symlinks.py index 39ef9a0..051a009 100644 --- a/tests/test_manifest_symlinks.py +++ b/tests/test_manifest_symlinks.py @@ -1,6 +1,7 @@ -import json from pathlib import Path +from tests.state_helpers import write_schema_state + import enroll.manifest as manifest @@ -92,7 +93,7 @@ def test_manifest_emits_symlink_tasks_and_vars(tmp_path: Path): bundle.mkdir(parents=True, exist_ok=True) (bundle / "artifacts").mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state), encoding="utf-8") + write_schema_state(bundle, state) manifest.manifest(str(bundle), str(out)) diff --git a/tests/test_validate.py b/tests/test_validate.py index 05ee88b..5ac33c9 100644 --- a/tests/test_validate.py +++ b/tests/test_validate.py @@ -411,3 +411,45 @@ def test_validate_harvest_no_schema_option(tmp_path: Path): result = validate_harvest(str(bundle_dir), no_schema=True) assert result.ok is False assert any("failed to parse" in e for e in result.errors) + + +def test_validate_harvest_rejects_artifact_symlink(tmp_path: Path): + bundle_dir = tmp_path / "bundle" + artifact = bundle_dir / "artifacts" / "users" / "etc" / "shadow" + artifact.parent.mkdir(parents=True) + artifact.symlink_to("/etc/shadow") + (bundle_dir / "state.json").write_text( + json.dumps( + { + "roles": { + "users": { + "managed_files": [ + {"path": "/etc/shadow", "src_rel": "etc/shadow"} + ] + } + } + } + ), + encoding="utf-8", + ) + + result = validate_harvest(str(bundle_dir), no_schema=True) + + assert result.ok is False + assert any("symlink" in e for e in result.errors) + + +def test_validate_harvest_rejects_unreferenced_artifact_symlink(tmp_path: Path): + bundle_dir = tmp_path / "bundle" + artifact = bundle_dir / "artifacts" / "users" / "etc" / "shadow" + artifact.parent.mkdir(parents=True) + artifact.symlink_to("/etc/shadow") + (bundle_dir / "state.json").write_text( + json.dumps({"roles": {"users": {"managed_files": []}}}), + encoding="utf-8", + ) + + result = validate_harvest(str(bundle_dir), no_schema=True) + + assert result.ok is False + assert any("symlink" in e for e in result.errors)