diff --git a/CHANGELOG.md b/CHANGELOG.md index c687249..19906cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * Introduce `enroll explain` - a tool to analyze and explain what's in (or not in) a harvest and why. * Centralise the cron and logrotate stuff into their respective roles, we had a bit of duplication between roles based on harvest discovery. + * Capture other files in the user's home directory such as `.bashrc`, `.bash_aliases`, `.profile`, if these files differ from the `/etc/skel` defaults # 0.2.3 diff --git a/enroll/cli.py b/enroll/cli.py index 5624587..829a4ac 100644 --- a/enroll/cli.py +++ b/enroll/cli.py @@ -914,56 +914,6 @@ def main() -> None: fqdn=args.fqdn, jinjaturtle=_jt_mode(args), ) - elif args.cmd == "diff": - report, has_changes = compare_harvests( - args.old, args.new, sops_mode=bool(getattr(args, "sops", False)) - ) - - rendered = format_report(report, fmt=str(args.format)) - if args.out: - Path(args.out).expanduser().write_text(rendered, encoding="utf-8") - else: - print(rendered, end="") - - do_notify = bool(has_changes or getattr(args, "notify_always", False)) - - if do_notify and getattr(args, "webhook", None): - wf = str(getattr(args, "webhook_format", "json")) - body = format_report(report, fmt=wf).encode("utf-8") - headers = {"User-Agent": "enroll"} - if wf == "json": - headers["Content-Type"] = "application/json" - else: - headers["Content-Type"] = "text/plain; charset=utf-8" - for hv in getattr(args, "webhook_header", []) or []: - if ":" not in hv: - raise SystemExit( - "error: --webhook-header must be in the form 'K:V'" - ) - k, v = hv.split(":", 1) - headers[k.strip()] = v.strip() - status, _ = post_webhook(str(args.webhook), body, headers=headers) - if status and status >= 400: - raise SystemExit(f"error: webhook returned HTTP {status}") - - if do_notify and (getattr(args, "email_to", []) or []): - subject = getattr(args, "email_subject", None) or "enroll diff report" - smtp_password = None - pw_env = 
getattr(args, "smtp_password_env", None) - if pw_env: - smtp_password = os.environ.get(str(pw_env)) - send_email( - to_addrs=list(getattr(args, "email_to", []) or []), - subject=str(subject), - body=rendered, - from_addr=getattr(args, "email_from", None), - smtp=getattr(args, "smtp", None), - smtp_user=getattr(args, "smtp_user", None), - smtp_password=smtp_password, - ) - - if getattr(args, "exit_code", False) and has_changes: - raise SystemExit(2) except RemoteSudoPasswordRequired: raise SystemExit( "error: remote sudo requires a password. Re-run with --ask-become-pass." diff --git a/enroll/harvest.py b/enroll/harvest.py index 6ecf676..40fe284 100644 --- a/enroll/harvest.py +++ b/enroll/harvest.py @@ -5,6 +5,7 @@ import json import os import re import shutil +import stat import time from dataclasses import dataclass, asdict, field from typing import Dict, List, Optional, Set @@ -157,6 +158,54 @@ MAX_FILES_CAP = 4000 MAX_UNOWNED_FILES_PER_ROLE = 500 +def _files_differ(a: str, b: str, *, max_bytes: int = 2_000_000) -> bool: + """Return True if file `a` differs from file `b`. + + Best-effort and conservative: + - If `b` (baseline) does not exist or is not a regular file, treat as + "different" so we err on the side of capturing user state. + - If we can't stat/read either file, treat as "different" (capture will + later be filtered via IgnorePolicy). + - If files are large, avoid reading them fully. + """ + + try: + st_a = os.stat(a, follow_symlinks=True) + except OSError: + return True + + # Refuse to do content comparisons on non-regular files. + if not stat.S_ISREG(st_a.st_mode): + return True + + try: + st_b = os.stat(b, follow_symlinks=True) + except OSError: + return True + + if not stat.S_ISREG(st_b.st_mode): + return True + + if st_a.st_size != st_b.st_size: + return True + + # If it's unexpectedly big, treat as different to avoid expensive reads. 
+ if st_a.st_size > max_bytes: + return True + + try: + with open(a, "rb") as fa, open(b, "rb") as fb: + while True: + ca = fa.read(1024 * 64) + cb = fb.read(1024 * 64) + if ca != cb: + return True + if not ca: # EOF on both + return False + except OSError: + return True + + def _merge_parent_dirs( existing_dirs: List[ManagedDir], managed_files: List[ManagedFile], @@ -1319,6 +1368,18 @@ def harvest( users_role_name = "users" users_role_seen = seen_by_role.setdefault(users_role_name, set()) + skel_dir = "/etc/skel" + # Dotfiles to harvest for non-system users. For the common "skeleton" + # files, only capture if the user's copy differs from /etc/skel. + skel_dotfiles = [ + (".bashrc", "user_shell_rc"), + (".profile", "user_profile"), + (".bash_logout", "user_shell_logout"), + ] + extra_dotfiles = [ + (".bash_aliases", "user_shell_aliases"), + ] + for u in user_records: users_list.append( { @@ -1353,6 +1414,48 @@ def harvest( seen_global=captured_global, ) + # Capture common per-user shell dotfiles when they differ from /etc/skel. + # These still go through IgnorePolicy and user path filters. + home = (u.home or "").rstrip("/") + if home and home.startswith("/"): + for rel, reason in skel_dotfiles: + upath = os.path.join(home, rel) + if not os.path.exists(upath): + continue + skel_path = os.path.join(skel_dir, rel) + if not _files_differ(upath, skel_path, max_bytes=policy.max_file_bytes): + continue + _capture_file( + bundle_dir=bundle_dir, + role_name=users_role_name, + abs_path=upath, + reason=reason, + policy=policy, + path_filter=path_filter, + managed_out=users_managed, + excluded_out=users_excluded, + seen_role=users_role_seen, + seen_global=captured_global, + ) + + # Capture other common per-user shell files unconditionally if present. 
+ for rel, reason in extra_dotfiles: + upath = os.path.join(home, rel) + if not os.path.exists(upath): + continue + _capture_file( + bundle_dir=bundle_dir, + role_name=users_role_name, + abs_path=upath, + reason=reason, + policy=policy, + path_filter=path_filter, + managed_out=users_managed, + excluded_out=users_excluded, + seen_role=users_role_seen, + seen_global=captured_global, + ) + users_snapshot = UsersSnapshot( role_name=users_role_name, users=users_list, diff --git a/enroll/manifest.py b/enroll/manifest.py index ea38e98..b616fe6 100644 --- a/enroll/manifest.py +++ b/enroll/manifest.py @@ -819,7 +819,12 @@ def _manifest_from_bundle_dir( group = str(u.get("primary_group") or owner) break - mode = "0600" if mf.get("reason") == "authorized_keys" else "0644" + # Prefer the harvested file mode so we preserve any deliberate + # permissions (e.g. 0600 for certain dotfiles). For authorized_keys, + # enforce 0600 regardless. + mode = mf.get("mode") or "0644" + if mf.get("reason") == "authorized_keys": + mode = "0600" ssh_files.append( { "dest": dest, diff --git a/tests.sh b/tests.sh index 23fe30b..23c5ce1 100755 --- a/tests.sh +++ b/tests.sh @@ -27,9 +27,10 @@ poetry run \ enroll harvest --out "${BUNDLE_DIR}2" poetry run \ enroll diff \ - --old "${BUNDLE_DIR}" \ - --new "${BUNDLE_DIR}2" \ - --format json | jq + --old "${BUNDLE_DIR}" \ + --new "${BUNDLE_DIR}2" \ + --format json | jq +DEBIAN_FRONTEND=noninteractive apt-get remove --purge -y cowsay # Ansible test builtin cd "${ANSIBLE_DIR}" diff --git a/tests/test_cli.py b/tests/test_cli.py index 5fc9a66..e5c6966 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,9 +1,14 @@ +from __future__ import annotations import sys import pytest - import enroll.cli as cli +from pathlib import Path + +from enroll.remote import RemoteSudoPasswordRequired +from enroll.sopsutil import SopsError + def test_cli_harvest_subcommand_calls_harvest(monkeypatch, capsys, tmp_path): called = {} @@ -398,3 +403,286 @@ def
test_cli_manifest_common_args(monkeypatch, tmp_path): cli.main() assert called["fqdn"] == "example.test" assert called["jinjaturtle"] == "off" + + +def test_cli_explain_passes_args_and_writes_stdout(monkeypatch, capsys, tmp_path): + called = {} + + def fake_explain_state( + harvest: str, + *, + sops_mode: bool = False, + fmt: str = "text", + max_examples: int = 3, + ): + called["harvest"] = harvest + called["sops_mode"] = sops_mode + called["fmt"] = fmt + called["max_examples"] = max_examples + return "EXPLAINED\n" + + monkeypatch.setattr(cli, "explain_state", fake_explain_state) + + monkeypatch.setattr( + sys, + "argv", + [ + "enroll", + "explain", + "--sops", + "--format", + "json", + "--max-examples", + "7", + str(tmp_path / "bundle" / "state.json"), + ], + ) + + cli.main() + out = capsys.readouterr().out + assert out == "EXPLAINED\n" + assert called["sops_mode"] is True + assert called["fmt"] == "json" + assert called["max_examples"] == 7 + + +def test_discover_config_path_missing_config_value_returns_none(monkeypatch): + # Covers the "--config" flag present with no value. + monkeypatch.delenv("ENROLL_CONFIG", raising=False) + monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) + assert cli._discover_config_path(["--config"]) is None + + +def test_discover_config_path_defaults_to_home_config(monkeypatch, tmp_path: Path): + # Covers the Path.home() / ".config" fallback. 
+ monkeypatch.delenv("ENROLL_CONFIG", raising=False) + monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) + monkeypatch.setattr(cli.Path, "home", lambda: tmp_path) + monkeypatch.setattr(cli.Path, "cwd", lambda: tmp_path) + + cp = tmp_path / ".config" / "enroll" / "enroll.ini" + cp.parent.mkdir(parents=True) + cp.write_text("[enroll]\n", encoding="utf-8") + + assert cli._discover_config_path(["harvest"]) == cp + + +def test_cli_harvest_local_sops_encrypts_and_prints_path( + monkeypatch, tmp_path: Path, capsys +): + out_dir = tmp_path / "out" + out_dir.mkdir() + calls: dict[str, object] = {} + + def fake_harvest(bundle_dir: str, **kwargs): + calls["bundle"] = bundle_dir + # Create a minimal state.json so tooling that expects it won't break. + Path(bundle_dir).mkdir(parents=True, exist_ok=True) + (Path(bundle_dir) / "state.json").write_text("{}", encoding="utf-8") + return str(Path(bundle_dir) / "state.json") + + def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]): + calls["encrypt"] = (bundle_dir, out_file, fps) + out_file.write_text("encrypted", encoding="utf-8") + return out_file + + monkeypatch.setattr(cli, "harvest", fake_harvest) + monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt) + + monkeypatch.setattr( + sys, + "argv", + [ + "enroll", + "harvest", + "--sops", + "ABCDEF", + "--out", + str(out_dir), + ], + ) + cli.main() + + printed = capsys.readouterr().out.strip() + assert printed.endswith("harvest.tar.gz.sops") + assert Path(printed).exists() + assert calls.get("encrypt") + + +def test_cli_harvest_remote_sops_encrypts_and_prints_path( + monkeypatch, tmp_path: Path, capsys +): + out_dir = tmp_path / "out" + out_dir.mkdir() + calls: dict[str, object] = {} + + def fake_remote_harvest(**kwargs): + calls["remote"] = kwargs + # Create a minimal state.json in the temp bundle. 
+ out = Path(kwargs["local_out_dir"]) / "state.json" + out.write_text("{}", encoding="utf-8") + return out + + def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]): + calls["encrypt"] = (bundle_dir, out_file, fps) + out_file.write_text("encrypted", encoding="utf-8") + return out_file + + monkeypatch.setattr(cli, "remote_harvest", fake_remote_harvest) + monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt) + + monkeypatch.setattr( + sys, + "argv", + [ + "enroll", + "harvest", + "--remote-host", + "example.com", + "--remote-user", + "root", + "--sops", + "ABCDEF", + "--out", + str(out_dir), + ], + ) + cli.main() + + printed = capsys.readouterr().out.strip() + assert printed.endswith("harvest.tar.gz.sops") + assert Path(printed).exists() + assert calls.get("remote") + assert calls.get("encrypt") + + +def test_cli_harvest_remote_password_required_exits_cleanly(monkeypatch): + def boom(**kwargs): + raise RemoteSudoPasswordRequired("pw required") + + monkeypatch.setattr(cli, "remote_harvest", boom) + monkeypatch.setattr( + sys, + "argv", + [ + "enroll", + "harvest", + "--remote-host", + "example.com", + "--remote-user", + "root", + ], + ) + with pytest.raises(SystemExit) as e: + cli.main() + assert "--ask-become-pass" in str(e.value) + + +def test_cli_runtime_error_is_wrapped_as_user_friendly_system_exit(monkeypatch): + def boom(*args, **kwargs): + raise RuntimeError("nope") + + monkeypatch.setattr(cli, "harvest", boom) + monkeypatch.setattr(sys, "argv", ["enroll", "harvest", "--out", "/tmp/x"]) + with pytest.raises(SystemExit) as e: + cli.main() + assert str(e.value) == "error: nope" + + +def test_cli_sops_error_is_wrapped_as_user_friendly_system_exit(monkeypatch): + def boom(*args, **kwargs): + raise SopsError("sops broke") + + monkeypatch.setattr(cli, "manifest", boom) + monkeypatch.setattr( + sys, "argv", ["enroll", "manifest", "--harvest", "/tmp/x", "--out", "/tmp/y"] + ) + with pytest.raises(SystemExit) as e: + cli.main() + assert 
str(e.value) == "error: sops broke" + + +def test_cli_diff_notifies_webhook_and_email_and_respects_exit_code( + monkeypatch, capsys +): + calls: dict[str, object] = {} + + def fake_compare(old, new, sops_mode=False): + calls["compare"] = (old, new, sops_mode) + return {"dummy": True}, True + + def fake_format(report, fmt="text"): + calls.setdefault("format", []).append((report, fmt)) + return "REPORT\n" + + def fake_post(url, body, headers=None): + calls["webhook"] = (url, body, headers) + return 200, b"ok" + + def fake_email(**kwargs): + calls["email"] = kwargs + + monkeypatch.setattr(cli, "compare_harvests", fake_compare) + monkeypatch.setattr(cli, "format_report", fake_format) + monkeypatch.setattr(cli, "post_webhook", fake_post) + monkeypatch.setattr(cli, "send_email", fake_email) + monkeypatch.setenv("SMTPPW", "secret") + + monkeypatch.setattr( + sys, + "argv", + [ + "enroll", + "diff", + "--old", + "/tmp/old", + "--new", + "/tmp/new", + "--webhook", + "https://example.invalid/h", + "--webhook-header", + "X-Test: ok", + "--email-to", + "a@example.com", + "--smtp-password-env", + "SMTPPW", + "--exit-code", + ], + ) + + with pytest.raises(SystemExit) as e: + cli.main() + assert e.value.code == 2 + + assert calls.get("compare") + assert calls.get("webhook") + assert calls.get("email") + # No report printed when exiting via --exit-code? (we still render and print). 
+ _ = capsys.readouterr() + + +def test_cli_diff_webhook_http_error_raises_system_exit(monkeypatch): + def fake_compare(old, new, sops_mode=False): + return {"dummy": True}, True + + monkeypatch.setattr(cli, "compare_harvests", fake_compare) + monkeypatch.setattr(cli, "format_report", lambda report, fmt="text": "R\n") + monkeypatch.setattr(cli, "post_webhook", lambda url, body, headers=None: (500, b"")) + + monkeypatch.setattr( + sys, + "argv", + [ + "enroll", + "diff", + "--old", + "/tmp/old", + "--new", + "/tmp/new", + "--webhook", + "https://example.invalid/h", + ], + ) + with pytest.raises(SystemExit) as e: + cli.main() + assert "HTTP 500" in str(e.value) diff --git a/tests/test_explain.py b/tests/test_explain.py new file mode 100644 index 0000000..69f4a88 --- /dev/null +++ b/tests/test_explain.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import enroll.explain as ex + + +def _write_state(bundle: Path, state: dict) -> Path: + bundle.mkdir(parents=True, exist_ok=True) + (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + return bundle / "state.json" + + +def test_explain_state_text_renders_roles_inventory_and_reasons(tmp_path: Path): + bundle = tmp_path / "bundle" + state = { + "schema_version": 3, + "host": {"hostname": "h1", "os": "debian", "pkg_backend": "dpkg"}, + "enroll": {"version": "0.0.0"}, + "inventory": { + "packages": { + "foo": { + "installations": [{"version": "1.0", "arch": "amd64"}], + "observed_via": [ + {"kind": "systemd_unit", "ref": "foo.service"}, + {"kind": "package_role", "ref": "foo"}, + ], + "roles": ["foo"], + }, + "bar": { + "installations": [{"version": "2.0", "arch": "amd64"}], + "observed_via": [{"kind": "user_installed", "ref": "manual"}], + "roles": ["bar"], + }, + } + }, + "roles": { + "users": { + "role_name": "users", + "users": [{"name": "alice"}], + "managed_files": [ + { + "path": "/home/alice/.ssh/authorized_keys", + "src_rel": 
"home/alice/.ssh/authorized_keys", + "owner": "alice", + "group": "alice", + "mode": "0600", + "reason": "authorized_keys", + } + ], + "managed_dirs": [ + { + "path": "/home/alice/.ssh", + "owner": "alice", + "group": "alice", + "mode": "0700", + "reason": "parent_of_managed_file", + } + ], + "excluded": [{"path": "/etc/shadow", "reason": "sensitive_content"}], + "notes": ["n1", "n2"], + }, + "services": [ + { + "unit": "foo.service", + "role_name": "foo", + "packages": ["foo"], + "managed_files": [ + { + "path": "/etc/foo.conf", + "src_rel": "etc/foo.conf", + "owner": "root", + "group": "root", + "mode": "0644", + "reason": "modified_conffile", + }, + # Unknown reason should fall back to generic text. + { + "path": "/etc/odd.conf", + "src_rel": "etc/odd.conf", + "owner": "root", + "group": "root", + "mode": "0644", + "reason": "mystery_reason", + }, + ], + "excluded": [], + "notes": [], + } + ], + "packages": [ + { + "package": "bar", + "role_name": "bar", + "managed_files": [], + "excluded": [], + "notes": [], + } + ], + "extra_paths": { + "role_name": "extra_paths", + "include_patterns": ["/etc/a", "/etc/b"], + "exclude_patterns": ["/etc/x", "/etc/y"], + "managed_files": [], + "excluded": [], + "notes": [], + }, + "apt_config": { + "role_name": "apt_config", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "dnf_config": { + "role_name": "dnf_config", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "etc_custom": { + "role_name": "etc_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "usr_local_custom": { + "role_name": "usr_local_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + }, + } + + state_path = _write_state(bundle, state) + + out = ex.explain_state(str(state_path), fmt="text", max_examples=1) + + assert "Enroll explained:" in out + assert "Host: h1" in out + assert "Inventory" in out + # observed_via summary should include both kinds (order not strictly guaranteed) + assert 
"observed_via" in out + assert "systemd_unit" in out + assert "user_installed" in out + + # extra_paths include/exclude patterns should be rendered with max_examples truncation. + assert "include_patterns:" in out + assert "/etc/a" in out + assert "exclude_patterns:" in out + + # Reasons section should mention known and unknown reasons. + assert "modified_conffile" in out + assert "mystery_reason" in out + assert "Captured with reason 'mystery_reason'" in out + + # Excluded paths section. + assert "Why paths were excluded" in out + assert "sensitive_content" in out + + +def test_explain_state_json_contains_structured_report(tmp_path: Path): + bundle = tmp_path / "bundle" + state = { + "schema_version": 3, + "host": {"hostname": "h2", "os": "rhel", "pkg_backend": "rpm"}, + "enroll": {"version": "1.2.3"}, + "inventory": {"packages": {}}, + "roles": { + "users": { + "role_name": "users", + "users": [], + "managed_files": [], + "excluded": [], + "notes": [], + }, + "services": [], + "packages": [], + "apt_config": { + "role_name": "apt_config", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "dnf_config": { + "role_name": "dnf_config", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "etc_custom": { + "role_name": "etc_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "usr_local_custom": { + "role_name": "usr_local_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "extra_paths": { + "role_name": "extra_paths", + "include_patterns": [], + "exclude_patterns": [], + "managed_files": [], + "excluded": [], + "notes": [], + }, + }, + } + state_path = _write_state(bundle, state) + + raw = ex.explain_state(str(state_path), fmt="json", max_examples=2) + rep = json.loads(raw) + assert rep["host"]["hostname"] == "h2" + assert rep["enroll"]["version"] == "1.2.3" + assert rep["inventory"]["package_count"] == 0 + assert isinstance(rep["roles"], list) + assert "reasons" in rep diff --git 
a/tests/test_harvest_cron_logrotate.py b/tests/test_harvest_cron_logrotate.py new file mode 100644 index 0000000..d20d371 --- /dev/null +++ b/tests/test_harvest_cron_logrotate.py @@ -0,0 +1,164 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import enroll.harvest as h +from enroll.platform import PlatformInfo +from enroll.systemd import UnitInfo + + +class AllowAllPolicy: + def deny_reason(self, path: str): + return None + + +class FakeBackend: + def __init__( + self, + *, + name: str, + installed: dict[str, list[dict[str, str]]], + manual: list[str], + ): + self.name = name + self._installed = dict(installed) + self._manual = list(manual) + + def build_etc_index(self): + # No package ownership information needed for this test. + return set(), {}, {}, {} + + def installed_packages(self): + return dict(self._installed) + + def list_manual_packages(self): + return list(self._manual) + + def owner_of_path(self, path: str): + return None + + def specific_paths_for_hints(self, hints: set[str]): + return [] + + def is_pkg_config_path(self, path: str) -> bool: + return False + + def modified_paths(self, pkg: str, etc_paths: list[str]): + return {} + + +def test_harvest_unifies_cron_and_logrotate_into_dedicated_package_roles( + monkeypatch, tmp_path: Path +): + bundle = tmp_path / "bundle" + + # Fake files we want harvested. + files = { + "/etc/crontab": b"* * * * * root echo hi\n", + "/etc/cron.d/php": b"# php cron\n", + "/var/spool/cron/crontabs/alice": b"@daily echo user\n", + "/etc/logrotate.conf": b"weekly\n", + "/etc/logrotate.d/rsyslog": b"/var/log/syslog { rotate 7 }\n", + } + + monkeypatch.setattr(h.os.path, "islink", lambda p: False) + monkeypatch.setattr(h.os.path, "isfile", lambda p: p in files) + monkeypatch.setattr(h.os.path, "isdir", lambda p: False) + monkeypatch.setattr(h.os.path, "exists", lambda p: (p in files) or False) + + # Expand cron/logrotate globs deterministically. 
+ def fake_iter_matching(spec: str, cap: int = 10000): + mapping = { + "/etc/crontab": ["/etc/crontab"], + "/etc/cron.d/*": ["/etc/cron.d/php"], + "/etc/cron.hourly/*": [], + "/etc/cron.daily/*": [], + "/etc/cron.weekly/*": [], + "/etc/cron.monthly/*": [], + "/etc/cron.allow": [], + "/etc/cron.deny": [], + "/etc/anacrontab": [], + "/etc/anacron/*": [], + "/var/spool/cron/*": [], + "/var/spool/cron/crontabs/*": ["/var/spool/cron/crontabs/alice"], + "/var/spool/crontabs/*": [], + "/var/spool/anacron/*": [], + "/etc/logrotate.conf": ["/etc/logrotate.conf"], + "/etc/logrotate.d/*": ["/etc/logrotate.d/rsyslog"], + } + return list(mapping.get(spec, []))[:cap] + + monkeypatch.setattr(h, "_iter_matching_files", fake_iter_matching) + + # Avoid real system probing. + monkeypatch.setattr( + h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {}) + ) + backend = FakeBackend( + name="dpkg", + installed={ + "cron": [{"version": "1", "arch": "amd64"}], + "logrotate": [{"version": "1", "arch": "amd64"}], + }, + # Include cron/logrotate in manual packages to ensure they are skipped in the generic loop. + manual=["cron", "logrotate"], + ) + monkeypatch.setattr(h, "get_backend", lambda info=None: backend) + + # Include a service that would collide with cron role naming. + monkeypatch.setattr( + h, "list_enabled_services", lambda: ["cron.service", "foo.service"] + ) + monkeypatch.setattr(h, "list_enabled_timers", lambda: []) + monkeypatch.setattr( + h, + "get_unit_info", + lambda unit: UnitInfo( + name=unit, + fragment_path=None, + dropin_paths=[], + env_files=[], + exec_paths=[], + active_state="active", + sub_state="running", + unit_file_state="enabled", + condition_result=None, + ), + ) + monkeypatch.setattr(h, "collect_non_system_users", lambda: []) + monkeypatch.setattr( + h, + "stat_triplet", + lambda p: ("alice" if "alice" in p else "root", "root", "0644"), + ) + + # Avoid needing real source files by implementing our own bundle copier. 
+ def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str): + dst = Path(bundle_dir) / "artifacts" / role_name / src_rel + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_bytes(files.get(abs_path, b"")) + + monkeypatch.setattr(h, "_copy_into_bundle", fake_copy) + + state_path = h.harvest(str(bundle), policy=AllowAllPolicy()) + st = json.loads(Path(state_path).read_text(encoding="utf-8")) + + # cron.service must be skipped to avoid colliding with the dedicated "cron" package role. + svc_units = [s["unit"] for s in st["roles"]["services"]] + assert "cron.service" not in svc_units + assert "foo.service" in svc_units + + pkgs = st["roles"]["packages"] + cron = next(p for p in pkgs if p["role_name"] == "cron") + logrotate = next(p for p in pkgs if p["role_name"] == "logrotate") + + cron_paths = {mf["path"] for mf in cron["managed_files"]} + assert "/etc/crontab" in cron_paths + assert "/etc/cron.d/php" in cron_paths + # user crontab captured + assert "/var/spool/cron/crontabs/alice" in cron_paths + + lr_paths = {mf["path"] for mf in logrotate["managed_files"]} + assert "/etc/logrotate.conf" in lr_paths + assert "/etc/logrotate.d/rsyslog" in lr_paths diff --git a/tests/test_harvest_helpers.py b/tests/test_harvest_helpers.py new file mode 100644 index 0000000..531a62c --- /dev/null +++ b/tests/test_harvest_helpers.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import os +from pathlib import Path + +import enroll.harvest as h + + +def test_iter_matching_files_skips_symlinks_and_walks_dirs(monkeypatch, tmp_path: Path): + # Layout: + # root/real.txt (file) + # root/sub/nested.txt + # root/link -> ... 
(ignored) + root = tmp_path / "root" + (root / "sub").mkdir(parents=True) + (root / "real.txt").write_text("a", encoding="utf-8") + (root / "sub" / "nested.txt").write_text("b", encoding="utf-8") + + paths = { + str(root): "dir", + str(root / "real.txt"): "file", + str(root / "sub"): "dir", + str(root / "sub" / "nested.txt"): "file", + str(root / "link"): "link", + } + + monkeypatch.setattr(h.glob, "glob", lambda spec: [str(root), str(root / "link")]) + monkeypatch.setattr(h.os.path, "islink", lambda p: paths.get(p) == "link") + monkeypatch.setattr(h.os.path, "isfile", lambda p: paths.get(p) == "file") + monkeypatch.setattr(h.os.path, "isdir", lambda p: paths.get(p) == "dir") + monkeypatch.setattr( + h.os, + "walk", + lambda p: [ + (str(root), ["sub"], ["real.txt", "link"]), + (str(root / "sub"), [], ["nested.txt"]), + ], + ) + + out = h._iter_matching_files("/whatever/*", cap=100) + assert str(root / "real.txt") in out + assert str(root / "sub" / "nested.txt") in out + assert str(root / "link") not in out + + +def test_parse_apt_signed_by_extracts_keyrings(tmp_path: Path): + f1 = tmp_path / "a.list" + f1.write_text( + "deb [signed-by=/usr/share/keyrings/foo.gpg] https://example.invalid stable main\n", + encoding="utf-8", + ) + f2 = tmp_path / "b.sources" + f2.write_text( + "Types: deb\nSigned-By: /etc/apt/keyrings/bar.gpg, /usr/share/keyrings/baz.gpg\n", + encoding="utf-8", + ) + f3 = tmp_path / "c.sources" + f3.write_text("Signed-By: | /bin/echo nope\n", encoding="utf-8") + + out = h._parse_apt_signed_by([str(f1), str(f2), str(f3)]) + assert "/usr/share/keyrings/foo.gpg" in out + assert "/etc/apt/keyrings/bar.gpg" in out + assert "/usr/share/keyrings/baz.gpg" in out + + +def test_iter_apt_capture_paths_includes_signed_by_keyring(monkeypatch): + # Simulate: + # /etc/apt/apt.conf.d/00test + # /etc/apt/sources.list.d/test.list (signed-by outside /etc/apt) + # /usr/share/keyrings/ext.gpg + files = { + "/etc/apt/apt.conf.d/00test": "file", + 
"/etc/apt/sources.list.d/test.list": "file", + "/usr/share/keyrings/ext.gpg": "file", + } + + monkeypatch.setattr(h.os.path, "isdir", lambda p: p in {"/etc/apt"}) + monkeypatch.setattr( + h.os, + "walk", + lambda root: [ + ("/etc/apt", ["apt.conf.d", "sources.list.d"], []), + ("/etc/apt/apt.conf.d", [], ["00test"]), + ("/etc/apt/sources.list.d", [], ["test.list"]), + ], + ) + monkeypatch.setattr(h.os.path, "islink", lambda p: False) + monkeypatch.setattr(h.os.path, "isfile", lambda p: files.get(p) == "file") + + # Only treat the sources glob as having a hit. + def fake_iter_matching(spec: str, cap: int = 10000): + if spec == "/etc/apt/sources.list.d/*.list": + return ["/etc/apt/sources.list.d/test.list"] + return [] + + monkeypatch.setattr(h, "_iter_matching_files", fake_iter_matching) + + # Provide file contents for the sources file. + real_open = open + + def fake_open(path, *a, **k): + if path == "/etc/apt/sources.list.d/test.list": + return real_open(os.devnull, "r", encoding="utf-8") # placeholder + return real_open(path, *a, **k) + + # Easier: patch _parse_apt_signed_by directly to avoid filesystem reads. 
+ monkeypatch.setattr( + h, "_parse_apt_signed_by", lambda sfs: {"/usr/share/keyrings/ext.gpg"} + ) + + out = h._iter_apt_capture_paths() + paths = {p for p, _r in out} + reasons = {p: r for p, r in out} + assert "/etc/apt/apt.conf.d/00test" in paths + assert "/etc/apt/sources.list.d/test.list" in paths + assert "/usr/share/keyrings/ext.gpg" in paths + assert reasons["/usr/share/keyrings/ext.gpg"] == "apt_signed_by_keyring" + + +def test_iter_dnf_capture_paths(monkeypatch): + files = { + "/etc/dnf/dnf.conf": "file", + "/etc/yum/yum.conf": "file", + "/etc/yum.conf": "file", + "/etc/yum.repos.d/test.repo": "file", + "/etc/pki/rpm-gpg/RPM-GPG-KEY": "file", + } + + def isdir(p): + return p in {"/etc/dnf", "/etc/yum", "/etc/yum.repos.d", "/etc/pki/rpm-gpg"} + + def walk(root): + if root == "/etc/dnf": + return [("/etc/dnf", [], ["dnf.conf"])] + if root == "/etc/yum": + return [("/etc/yum", [], ["yum.conf"])] + if root == "/etc/pki/rpm-gpg": + return [("/etc/pki/rpm-gpg", [], ["RPM-GPG-KEY"])] + return [] + + monkeypatch.setattr(h.os.path, "isdir", isdir) + monkeypatch.setattr(h.os, "walk", walk) + monkeypatch.setattr(h.os.path, "islink", lambda p: False) + monkeypatch.setattr(h.os.path, "isfile", lambda p: files.get(p) == "file") + monkeypatch.setattr( + h, + "_iter_matching_files", + lambda spec, cap=10000: ( + ["/etc/yum.repos.d/test.repo"] if spec.endswith("*.repo") else [] + ), + ) + + out = h._iter_dnf_capture_paths() + paths = {p for p, _r in out} + assert "/etc/dnf/dnf.conf" in paths + assert "/etc/yum/yum.conf" in paths + assert "/etc/yum.conf" in paths + assert "/etc/yum.repos.d/test.repo" in paths + assert "/etc/pki/rpm-gpg/RPM-GPG-KEY" in paths + + +def test_iter_system_capture_paths_dedupes_first_reason(monkeypatch): + monkeypatch.setattr(h, "_SYSTEM_CAPTURE_GLOBS", [("/a", "r1"), ("/b", "r2")]) + monkeypatch.setattr( + h, + "_iter_matching_files", + lambda spec, cap=10000: ["/dup"] if spec in {"/a", "/b"} else [], + ) + out = h._iter_system_capture_paths() 
+ assert out == [("/dup", "r1")] diff --git a/tests/test_manifest.py b/tests/test_manifest.py index fec9cc3..8b34fcb 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -1,7 +1,12 @@ import json from pathlib import Path -from enroll.manifest import manifest +import os +import stat +import tarfile +import pytest + +import enroll.manifest as manifest def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): @@ -176,7 +181,7 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin" / "myscript" ).write_text("#!/bin/sh\necho hi\n", encoding="utf-8") - manifest(str(bundle), str(out)) + manifest.manifest(str(bundle), str(out)) # Service role: systemd management should be gated on foo_manage_unit and a probe. tasks = (out / "roles" / "foo" / "tasks" / "main.yml").read_text(encoding="utf-8") @@ -345,7 +350,7 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path) / "myapp.conf" ).write_text("myapp=1\n", encoding="utf-8") - manifest(str(bundle), str(out), fqdn=fqdn) + manifest.manifest(str(bundle), str(out), fqdn=fqdn) # Host playbook exists. assert (out / "playbooks" / f"{fqdn}.yml").exists() @@ -482,7 +487,7 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path): bundle.mkdir(parents=True, exist_ok=True) (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") - manifest(str(bundle), str(out)) + manifest.manifest(str(bundle), str(out)) pb = (out / "playbook.yml").read_text(encoding="utf-8") assert "- dnf_config" in pb @@ -502,3 +507,291 @@ def test_render_install_packages_tasks_contains_dnf_branch(): assert "ansible.builtin.dnf" in txt assert "ansible.builtin.package" in txt assert "pkg_mgr" in txt + + +def test_manifest_orders_cron_and_logrotate_at_playbook_tail(tmp_path: Path): + """Cron/logrotate roles should appear at the end. 
+ + The cron role may restore per-user crontabs under /var/spool, so it should + run after users have been created. + """ + + bundle = tmp_path / "bundle" + out = tmp_path / "ansible" + + state = { + "schema_version": 3, + "host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"}, + "inventory": {"packages": {}}, + "roles": { + "users": { + "role_name": "users", + "users": [{"name": "alice"}], + "managed_files": [], + "excluded": [], + "notes": [], + }, + "services": [], + "packages": [ + { + "package": "curl", + "role_name": "curl", + "managed_files": [], + "excluded": [], + "notes": [], + }, + { + "package": "cron", + "role_name": "cron", + "managed_files": [ + { + "path": "/var/spool/cron/crontabs/alice", + "src_rel": "var/spool/cron/crontabs/alice", + "owner": "alice", + "group": "root", + "mode": "0600", + "reason": "system_cron", + } + ], + "excluded": [], + "notes": [], + }, + { + "package": "logrotate", + "role_name": "logrotate", + "managed_files": [ + { + "path": "/etc/logrotate.conf", + "src_rel": "etc/logrotate.conf", + "owner": "root", + "group": "root", + "mode": "0644", + "reason": "system_logrotate", + } + ], + "excluded": [], + "notes": [], + }, + ], + "apt_config": { + "role_name": "apt_config", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "dnf_config": { + "role_name": "dnf_config", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "etc_custom": { + "role_name": "etc_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "usr_local_custom": { + "role_name": "usr_local_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "extra_paths": { + "role_name": "extra_paths", + "include_patterns": [], + "exclude_patterns": [], + "managed_files": [], + "excluded": [], + "notes": [], + }, + }, + } + + # Minimal artifacts for managed files. 
+ (bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs").mkdir( + parents=True, exist_ok=True + ) + ( + bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs" / "alice" + ).write_text("@daily echo hi\n", encoding="utf-8") + (bundle / "artifacts" / "logrotate" / "etc").mkdir(parents=True, exist_ok=True) + (bundle / "artifacts" / "logrotate" / "etc" / "logrotate.conf").write_text( + "weekly\n", encoding="utf-8" + ) + + bundle.mkdir(parents=True, exist_ok=True) + (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + + manifest.manifest(str(bundle), str(out)) + + pb = (out / "playbook.yml").read_text(encoding="utf-8").splitlines() + # Roles are emitted as indented list items under the `roles:` key. + roles = [ + ln.strip().removeprefix("- ").strip() for ln in pb if ln.startswith(" - ") + ] + + # Ensure tail ordering. + assert roles[-2:] == ["cron", "logrotate"] + assert "users" in roles + assert roles.index("users") < roles.index("cron") + + +def test_yaml_helpers_fallback_when_yaml_unavailable(monkeypatch): + monkeypatch.setattr(manifest, "_try_yaml", lambda: None) + assert manifest._yaml_load_mapping("foo: 1\n") == {} + out = manifest._yaml_dump_mapping({"b": 2, "a": 1}) + # Best-effort fallback is key: repr(value) + assert out.splitlines()[0].startswith("a: ") + assert out.endswith("\n") + + +def test_copy2_replace_makes_readonly_sources_user_writable( + monkeypatch, tmp_path: Path +): + src = tmp_path / "src.txt" + dst = tmp_path / "dst.txt" + src.write_text("hello", encoding="utf-8") + # Make source read-only; copy2 preserves mode, so tmp will be read-only too. 
+ os.chmod(src, 0o444) + + manifest._copy2_replace(str(src), str(dst)) + + st = os.stat(dst, follow_symlinks=False) + assert stat.S_IMODE(st.st_mode) & stat.S_IWUSR + + +def test_prepare_bundle_dir_sops_decrypts_and_extracts(monkeypatch, tmp_path: Path): + enc = tmp_path / "harvest.tar.gz.sops" + enc.write_text("ignored", encoding="utf-8") + + def fake_require(): + return None + + def fake_decrypt(src: str, dst: str, *, mode: int = 0o600): + # Create a minimal tar.gz with a state.json file. + with tarfile.open(dst, "w:gz") as tf: + p = tmp_path / "state.json" + p.write_text("{}", encoding="utf-8") + tf.add(p, arcname="state.json") + + monkeypatch.setattr(manifest, "require_sops_cmd", fake_require) + monkeypatch.setattr(manifest, "decrypt_file_binary_to", fake_decrypt) + + bundle_dir, td = manifest._prepare_bundle_dir(str(enc), sops_mode=True) + try: + assert (Path(bundle_dir) / "state.json").exists() + finally: + td.cleanup() + + +def test_prepare_bundle_dir_rejects_non_dir_without_sops(tmp_path: Path): + fp = tmp_path / "bundle.tar.gz" + fp.write_text("x", encoding="utf-8") + with pytest.raises(RuntimeError): + manifest._prepare_bundle_dir(str(fp), sops_mode=False) + + +def test_tar_dir_to_with_progress_writes_progress_when_tty(monkeypatch, tmp_path: Path): + src = tmp_path / "dir" + src.mkdir() + (src / "a.txt").write_text("a", encoding="utf-8") + (src / "b.txt").write_text("b", encoding="utf-8") + + out = tmp_path / "out.tar.gz" + writes: list[bytes] = [] + + monkeypatch.setattr(manifest.os, "isatty", lambda fd: True) + monkeypatch.setattr(manifest.os, "write", lambda fd, b: writes.append(b) or len(b)) + + manifest._tar_dir_to_with_progress(str(src), str(out), desc="tarring") + assert out.exists() + assert writes # progress was written + assert writes[-1].endswith(b"\n") + + +def test_encrypt_manifest_out_dir_to_sops_handles_missing_tmp_cleanup( + monkeypatch, tmp_path: Path +): + src_dir = tmp_path / "manifest" + src_dir.mkdir() + (src_dir / 
"x.txt").write_text("x", encoding="utf-8") + + out = tmp_path / "manifest.tar.gz.sops" + + monkeypatch.setattr(manifest, "require_sops_cmd", lambda: None) + + def fake_encrypt(in_fp, out_fp, *args, **kwargs): + Path(out_fp).write_text("enc", encoding="utf-8") + + monkeypatch.setattr(manifest, "encrypt_file_binary", fake_encrypt) + # Simulate race where tmp tar is already removed. + monkeypatch.setattr( + manifest.os, "unlink", lambda p: (_ for _ in ()).throw(FileNotFoundError()) + ) + + res = manifest._encrypt_manifest_out_dir_to_sops(str(src_dir), str(out), ["ABC"]) # type: ignore[arg-type] + assert str(res).endswith(".sops") + assert out.exists() + + +def test_manifest_applies_jinjaturtle_to_jinjifyable_managed_file( + monkeypatch, tmp_path: Path +): + # Create a minimal bundle with just an apt_config snapshot. + bundle = tmp_path / "bundle" + (bundle / "artifacts" / "apt_config" / "etc" / "apt").mkdir(parents=True) + (bundle / "artifacts" / "apt_config" / "etc" / "apt" / "foo.ini").write_text( + "key=VALUE\n", encoding="utf-8" + ) + + state = { + "schema_version": 1, + "inventory": {"packages": {}}, + "roles": { + "services": [], + "packages": [], + "apt_config": { + "role_name": "apt_config", + "managed_files": [ + { + "path": "/etc/apt/foo.ini", + "src_rel": "etc/apt/foo.ini", + "owner": "root", + "group": "root", + "mode": "0644", + "reason": "apt_config", + } + ], + "managed_dirs": [], + "excluded": [], + "notes": [], + }, + }, + } + (bundle / "state.json").write_text( + __import__("json").dumps(state), encoding="utf-8" + ) + + monkeypatch.setattr(manifest, "find_jinjaturtle_cmd", lambda: "jinjaturtle") + + class _Res: + template_text = "key={{ foo }}\n" + vars_text = "foo: 123\n" + + monkeypatch.setattr(manifest, "run_jinjaturtle", lambda *a, **k: _Res()) + + out_dir = tmp_path / "out" + manifest.manifest(str(bundle), str(out_dir), jinjaturtle="on") + + tmpl = out_dir / "roles" / "apt_config" / "templates" / "etc" / "apt" / "foo.ini.j2" + assert 
tmpl.exists() + assert "{{ foo }}" in tmpl.read_text(encoding="utf-8") + + defaults = out_dir / "roles" / "apt_config" / "defaults" / "main.yml" + txt = defaults.read_text(encoding="utf-8") + assert "foo: 123" in txt + # Non-templated file should not exist under files/. + assert not ( + out_dir / "roles" / "apt_config" / "files" / "etc" / "apt" / "foo.ini" + ).exists() diff --git a/tests/test_misc_coverage.py b/tests/test_misc_coverage.py index b4250fc..1ff6e98 100644 --- a/tests/test_misc_coverage.py +++ b/tests/test_misc_coverage.py @@ -1,5 +1,13 @@ +from __future__ import annotations + +import json +import os import stat +import subprocess +import sys +import types from pathlib import Path +from types import SimpleNamespace import pytest @@ -94,3 +102,315 @@ def test_sops_pgp_arg_and_encrypt_decrypt_roundtrip(tmp_path: Path, monkeypatch) # Sanity: we invoked encrypt and decrypt. assert any("--encrypt" in c for c in calls) assert any("--decrypt" in c for c in calls) + + +def test_cache_dir_defaults_to_home_cache(monkeypatch, tmp_path: Path): + # Ensure default path uses ~/.cache when XDG_CACHE_HOME is unset. 
+ from enroll.cache import enroll_cache_dir + + monkeypatch.delenv("XDG_CACHE_HOME", raising=False) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + + p = enroll_cache_dir() + assert str(p).startswith(str(tmp_path)) + assert p.name == "enroll" + + +def test_harvest_cache_state_json_property(tmp_path: Path): + from enroll.cache import HarvestCache + + hc = HarvestCache(tmp_path / "h1") + assert hc.state_json == hc.dir / "state.json" + + +def test_cache_dir_security_rejects_symlink(tmp_path: Path): + from enroll.cache import _ensure_dir_secure + + real = tmp_path / "real" + real.mkdir() + link = tmp_path / "link" + link.symlink_to(real, target_is_directory=True) + + with pytest.raises(RuntimeError, match="Refusing to use symlink"): + _ensure_dir_secure(link) + + +def test_cache_dir_chmod_failures_are_ignored(monkeypatch, tmp_path: Path): + from enroll import cache + + # Make the cache base path deterministic and writable. + monkeypatch.setattr(cache, "enroll_cache_dir", lambda: tmp_path) + + # Force os.chmod to fail to cover the "except OSError: pass" paths. + monkeypatch.setattr( + os, "chmod", lambda *a, **k: (_ for _ in ()).throw(OSError("nope")) + ) + + hc = cache.new_harvest_cache_dir() + assert hc.dir.exists() + assert hc.dir.is_dir() + + +def test_stat_triplet_falls_back_to_numeric_ids(monkeypatch, tmp_path: Path): + from enroll.fsutil import stat_triplet + import pwd + import grp + + p = tmp_path / "x" + p.write_text("x", encoding="utf-8") + + # Force username/group resolution failures. 
+ monkeypatch.setattr( + pwd, "getpwuid", lambda _uid: (_ for _ in ()).throw(KeyError("no user")) + ) + monkeypatch.setattr( + grp, "getgrgid", lambda _gid: (_ for _ in ()).throw(KeyError("no group")) + ) + + owner, group, mode = stat_triplet(str(p)) + assert owner.isdigit() + assert group.isdigit() + assert len(mode) == 4 + + +def test_ignore_policy_iter_effective_lines_removes_block_comments(): + from enroll.ignore import IgnorePolicy + + pol = IgnorePolicy() + data = b"""keep1 +/* +drop me +*/ +keep2 +""" + assert list(pol.iter_effective_lines(data)) == [b"keep1", b"keep2"] + + +def test_ignore_policy_deny_reason_dir_variants(tmp_path: Path): + from enroll.ignore import IgnorePolicy + + pol = IgnorePolicy() + + # denied by glob + assert pol.deny_reason_dir("/etc/shadow") == "denied_path" + + # symlink rejected + d = tmp_path / "d" + d.mkdir() + link = tmp_path / "l" + link.symlink_to(d, target_is_directory=True) + assert pol.deny_reason_dir(str(link)) == "symlink" + + # not a directory + f = tmp_path / "f" + f.write_text("x", encoding="utf-8") + assert pol.deny_reason_dir(str(f)) == "not_directory" + + # ok + assert pol.deny_reason_dir(str(d)) is None + + +def test_run_jinjaturtle_parses_outputs(monkeypatch, tmp_path: Path): + # Fully unit-test enroll.jinjaturtle.run_jinjaturtle by stubbing subprocess.run. + from enroll.jinjaturtle import run_jinjaturtle + + def fake_run(cmd, **kwargs): # noqa: ARG001 + # cmd includes "-d -t