diff --git a/CHANGELOG.md b/CHANGELOG.md index 19906cc..c687249 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,6 @@ * Introduce `enroll explain` - a tool to analyze and explain what's in (or not in) a harvest and why. * Centralise the cron and logrotate stuff into their respective roles, we had a bit of duplication between roles based on harvest discovery. - * Capture other files in the user's home directory such as `.bashrc`, `.bash_aliases`, `.profile`, if these files differ from the `/etc/skel` defaults # 0.2.3 diff --git a/enroll/cli.py b/enroll/cli.py index 829a4ac..5624587 100644 --- a/enroll/cli.py +++ b/enroll/cli.py @@ -914,6 +914,56 @@ def main() -> None: fqdn=args.fqdn, jinjaturtle=_jt_mode(args), ) + elif args.cmd == "diff": + report, has_changes = compare_harvests( + args.old, args.new, sops_mode=bool(getattr(args, "sops", False)) + ) + + rendered = format_report(report, fmt=str(args.format)) + if args.out: + Path(args.out).expanduser().write_text(rendered, encoding="utf-8") + else: + print(rendered, end="") + + do_notify = bool(has_changes or getattr(args, "notify_always", False)) + + if do_notify and getattr(args, "webhook", None): + wf = str(getattr(args, "webhook_format", "json")) + body = format_report(report, fmt=wf).encode("utf-8") + headers = {"User-Agent": "enroll"} + if wf == "json": + headers["Content-Type"] = "application/json" + else: + headers["Content-Type"] = "text/plain; charset=utf-8" + for hv in getattr(args, "webhook_header", []) or []: + if ":" not in hv: + raise SystemExit( + "error: --webhook-header must be in the form 'K:V'" + ) + k, v = hv.split(":", 1) + headers[k.strip()] = v.strip() + status, _ = post_webhook(str(args.webhook), body, headers=headers) + if status and status >= 400: + raise SystemExit(f"error: webhook returned HTTP {status}") + + if do_notify and (getattr(args, "email_to", []) or []): + subject = getattr(args, "email_subject", None) or "enroll diff report" + smtp_password = None + pw_env = getattr(args, "smtp_password_env", None) + if pw_env: + smtp_password = os.environ.get(str(pw_env)) + send_email( + to_addrs=list(getattr(args, "email_to", []) or []), + subject=str(subject), + body=rendered, + from_addr=getattr(args, "email_from", None), + smtp=getattr(args, "smtp", None), + smtp_user=getattr(args, "smtp_user", None), + smtp_password=smtp_password, + ) + + if getattr(args, "exit_code", False) and has_changes: + raise SystemExit(2) except RemoteSudoPasswordRequired: raise SystemExit( "error: remote sudo requires a password. Re-run with --ask-become-pass." diff --git a/enroll/harvest.py b/enroll/harvest.py index 40fe284..6ecf676 100644 --- a/enroll/harvest.py +++ b/enroll/harvest.py @@ -5,7 +5,6 @@ import json import os import re import shutil -import stat import time from dataclasses import dataclass, asdict, field from typing import Dict, List, Optional, Set @@ -158,54 +157,6 @@ MAX_FILES_CAP = 4000 MAX_UNOWNED_FILES_PER_ROLE = 500 -def _files_differ(a: str, b: str, *, max_bytes: int = 2_000_000) -> bool: - """Return True if file `a` differs from file `b`. - - Best-effort and conservative: - - If `b` (baseline) does not exist or is not a regular file, treat as - "different" so we err on the side of capturing user state. - - If we can't stat/read either file, treat as "different" (capture will - later be filtered via IgnorePolicy). - - If files are large, avoid reading them fully. - """ - - try: - st_a = os.stat(a, follow_symlinks=True) - except OSError: - return True - - # Refuse to do content comparisons on non-regular files. - if not stat.S_ISREG(st_a.st_mode): - return True - - try: - st_b = os.stat(b, follow_symlinks=True) - except OSError: - return True - - if not stat.S_ISREG(st_b.st_mode): - return True - - if st_a.st_size != st_b.st_size: - return True - - # If it's unexpectedly big, treat as different to avoid expensive reads. - if st_a.st_size > max_bytes: - return True - - try: - with open(a, "rb") as fa, open(b, "rb") as fb: - while True: - ca = fa.read(1024 * 64) - cb = fb.read(1024 * 64) - if ca != cb: - return True - if not ca: # EOF on both - return False - except OSError: - return True - - def _merge_parent_dirs( existing_dirs: List[ManagedDir], managed_files: List[ManagedFile], @@ -1368,18 +1319,6 @@ def harvest( users_role_name = "users" users_role_seen = seen_by_role.setdefault(users_role_name, set()) - skel_dir = "/etc/skel" - # Dotfiles to harvest for non-system users. For the common "skeleton" - # files, only capture if the user's copy differs from /etc/skel. - skel_dotfiles = [ - (".bashrc", "user_shell_rc"), - (".profile", "user_profile"), - (".bash_logout", "user_shell_logout"), - ] - extra_dotfiles = [ - (".bash_aliases", "user_shell_aliases"), - ] - for u in user_records: users_list.append( { @@ -1414,48 +1353,6 @@ def harvest( seen_global=captured_global, ) - # Capture common per-user shell dotfiles when they differ from /etc/skel. - # These still go through IgnorePolicy and user path filters. - home = (u.home or "").rstrip("/") - if home and home.startswith("/"): - for rel, reason in skel_dotfiles: - upath = os.path.join(home, rel) - if not os.path.exists(upath): - continue - skel_path = os.path.join(skel_dir, rel) - if not _files_differ(upath, skel_path, max_bytes=policy.max_file_bytes): - continue - _capture_file( - bundle_dir=bundle_dir, - role_name=users_role_name, - abs_path=upath, - reason=reason, - policy=policy, - path_filter=path_filter, - managed_out=users_managed, - excluded_out=users_excluded, - seen_role=users_role_seen, - seen_global=captured_global, - ) - - # Capture other common per-user shell files unconditionally if present. - for rel, reason in extra_dotfiles: - upath = os.path.join(home, rel) - if not os.path.exists(upath): - continue - _capture_file( - bundle_dir=bundle_dir, - role_name=users_role_name, - abs_path=upath, - reason=reason, - policy=policy, - path_filter=path_filter, - managed_out=users_managed, - excluded_out=users_excluded, - seen_role=users_role_seen, - seen_global=captured_global, - ) - users_snapshot = UsersSnapshot( role_name=users_role_name, users=users_list, diff --git a/enroll/manifest.py b/enroll/manifest.py index b616fe6..ea38e98 100644 --- a/enroll/manifest.py +++ b/enroll/manifest.py @@ -819,12 +819,7 @@ def _manifest_from_bundle_dir( group = str(u.get("primary_group") or owner) break - # Prefer the harvested file mode so we preserve any deliberate - # permissions (e.g. 0600 for certain dotfiles). For authorized_keys, - # enforce 0600 regardless. - mode = mf.get("mode") or "0644" - if mf.get("reason") == "authorized_keys": - mode = "0600" + mode = "0600" if mf.get("reason") == "authorized_keys" else "0644" ssh_files.append( { "dest": dest, diff --git a/tests.sh b/tests.sh index 23c5ce1..23fe30b 100755 --- a/tests.sh +++ b/tests.sh @@ -27,10 +27,9 @@ poetry run \ enroll harvest --out "${BUNDLE_DIR}2" poetry run \ enroll diff \ - --old "${BUNDLE_DIR}" \ - --new "${BUNDLE_DIR}2" \ - --format json | jq -DEBIAN_FRONTEND=noninteractive apt-get remove --purge cowsay + --old "${BUNDLE_DIR}" \ + --new "${BUNDLE_DIR}2" \ + --format json | jq # Ansible test builtin cd "${ANSIBLE_DIR}" diff --git a/tests/test_cli.py b/tests/test_cli.py index e5c6966..5fc9a66 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,14 +1,9 @@ -from __future__ import annotations import sys import pytest + import enroll.cli as cli -from pathlib import Path - -from enroll.remote import RemoteSudoPasswordRequired -from enroll.sopsutil import SopsError - def test_cli_harvest_subcommand_calls_harvest(monkeypatch, capsys, tmp_path): called = {} @@ -403,286 +398,3 @@ def test_cli_manifest_common_args(monkeypatch, tmp_path): cli.main() assert called["fqdn"] == "example.test" assert called["jinjaturtle"] == "off" - - -def test_cli_explain_passes_args_and_writes_stdout(monkeypatch, capsys, tmp_path): - called = {} - - def fake_explain_state( - harvest: str, - *, - sops_mode: bool = False, - fmt: str = "text", - max_examples: int = 3, - ): - called["harvest"] = harvest - called["sops_mode"] = sops_mode - called["fmt"] = fmt - called["max_examples"] = max_examples - return "EXPLAINED\n" - - monkeypatch.setattr(cli, "explain_state", fake_explain_state) - - monkeypatch.setattr( - sys, - "argv", - [ - "enroll", - "explain", - "--sops", - "--format", - "json", - "--max-examples", - "7", - str(tmp_path / "bundle" / "state.json"), - ], - ) - - cli.main() - out = capsys.readouterr().out - assert out == "EXPLAINED\n" - assert called["sops_mode"] is True - assert called["fmt"] == "json" - assert called["max_examples"] == 7 - - -def test_discover_config_path_missing_config_value_returns_none(monkeypatch): - # Covers the "--config" flag present with no value. - monkeypatch.delenv("ENROLL_CONFIG", raising=False) - monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) - assert cli._discover_config_path(["--config"]) is None - - -def test_discover_config_path_defaults_to_home_config(monkeypatch, tmp_path: Path): - # Covers the Path.home() / ".config" fallback. - monkeypatch.delenv("ENROLL_CONFIG", raising=False) - monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) - monkeypatch.setattr(cli.Path, "home", lambda: tmp_path) - monkeypatch.setattr(cli.Path, "cwd", lambda: tmp_path) - - cp = tmp_path / ".config" / "enroll" / "enroll.ini" - cp.parent.mkdir(parents=True) - cp.write_text("[enroll]\n", encoding="utf-8") - - assert cli._discover_config_path(["harvest"]) == cp - - -def test_cli_harvest_local_sops_encrypts_and_prints_path( - monkeypatch, tmp_path: Path, capsys -): - out_dir = tmp_path / "out" - out_dir.mkdir() - calls: dict[str, object] = {} - - def fake_harvest(bundle_dir: str, **kwargs): - calls["bundle"] = bundle_dir - # Create a minimal state.json so tooling that expects it won't break. - Path(bundle_dir).mkdir(parents=True, exist_ok=True) - (Path(bundle_dir) / "state.json").write_text("{}", encoding="utf-8") - return str(Path(bundle_dir) / "state.json") - - def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]): - calls["encrypt"] = (bundle_dir, out_file, fps) - out_file.write_text("encrypted", encoding="utf-8") - return out_file - - monkeypatch.setattr(cli, "harvest", fake_harvest) - monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt) - - monkeypatch.setattr( - sys, - "argv", - [ - "enroll", - "harvest", - "--sops", - "ABCDEF", - "--out", - str(out_dir), - ], - ) - cli.main() - - printed = capsys.readouterr().out.strip() - assert printed.endswith("harvest.tar.gz.sops") - assert Path(printed).exists() - assert calls.get("encrypt") - - -def test_cli_harvest_remote_sops_encrypts_and_prints_path( - monkeypatch, tmp_path: Path, capsys -): - out_dir = tmp_path / "out" - out_dir.mkdir() - calls: dict[str, object] = {} - - def fake_remote_harvest(**kwargs): - calls["remote"] = kwargs - # Create a minimal state.json in the temp bundle. - out = Path(kwargs["local_out_dir"]) / "state.json" - out.write_text("{}", encoding="utf-8") - return out - - def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]): - calls["encrypt"] = (bundle_dir, out_file, fps) - out_file.write_text("encrypted", encoding="utf-8") - return out_file - - monkeypatch.setattr(cli, "remote_harvest", fake_remote_harvest) - monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt) - - monkeypatch.setattr( - sys, - "argv", - [ - "enroll", - "harvest", - "--remote-host", - "example.com", - "--remote-user", - "root", - "--sops", - "ABCDEF", - "--out", - str(out_dir), - ], - ) - cli.main() - - printed = capsys.readouterr().out.strip() - assert printed.endswith("harvest.tar.gz.sops") - assert Path(printed).exists() - assert calls.get("remote") - assert calls.get("encrypt") - - -def test_cli_harvest_remote_password_required_exits_cleanly(monkeypatch): - def boom(**kwargs): - raise RemoteSudoPasswordRequired("pw required") - - monkeypatch.setattr(cli, "remote_harvest", boom) - monkeypatch.setattr( - sys, - "argv", - [ - "enroll", - "harvest", - "--remote-host", - "example.com", - "--remote-user", - "root", - ], - ) - with pytest.raises(SystemExit) as e: - cli.main() - assert "--ask-become-pass" in str(e.value) - - -def test_cli_runtime_error_is_wrapped_as_user_friendly_system_exit(monkeypatch): - def boom(*args, **kwargs): - raise RuntimeError("nope") - - monkeypatch.setattr(cli, "harvest", boom) - monkeypatch.setattr(sys, "argv", ["enroll", "harvest", "--out", "/tmp/x"]) - with pytest.raises(SystemExit) as e: - cli.main() - assert str(e.value) == "error: nope" - - -def test_cli_sops_error_is_wrapped_as_user_friendly_system_exit(monkeypatch): - def boom(*args, **kwargs): - raise SopsError("sops broke") - - monkeypatch.setattr(cli, "manifest", boom) - monkeypatch.setattr( - sys, "argv", ["enroll", "manifest", "--harvest", "/tmp/x", "--out", "/tmp/y"] - ) - with pytest.raises(SystemExit) as e: - cli.main() - assert str(e.value) == "error: sops broke" - - -def test_cli_diff_notifies_webhook_and_email_and_respects_exit_code( - monkeypatch, capsys -): - calls: dict[str, object] = {} - - def fake_compare(old, new, sops_mode=False): - calls["compare"] = (old, new, sops_mode) - return {"dummy": True}, True - - def fake_format(report, fmt="text"): - calls.setdefault("format", []).append((report, fmt)) - return "REPORT\n" - - def fake_post(url, body, headers=None): - calls["webhook"] = (url, body, headers) - return 200, b"ok" - - def fake_email(**kwargs): - calls["email"] = kwargs - - monkeypatch.setattr(cli, "compare_harvests", fake_compare) - monkeypatch.setattr(cli, "format_report", fake_format) - monkeypatch.setattr(cli, "post_webhook", fake_post) - monkeypatch.setattr(cli, "send_email", fake_email) - monkeypatch.setenv("SMTPPW", "secret") - - monkeypatch.setattr( - sys, - "argv", - [ - "enroll", - "diff", - "--old", - "/tmp/old", - "--new", - "/tmp/new", - "--webhook", - "https://example.invalid/h", - "--webhook-header", - "X-Test: ok", - "--email-to", - "a@example.com", - "--smtp-password-env", - "SMTPPW", - "--exit-code", - ], - ) - - with pytest.raises(SystemExit) as e: - cli.main() - assert e.value.code == 2 - - assert calls.get("compare") - assert calls.get("webhook") - assert calls.get("email") - # No report printed when exiting via --exit-code? (we still render and print). - _ = capsys.readouterr() - - -def test_cli_diff_webhook_http_error_raises_system_exit(monkeypatch): - def fake_compare(old, new, sops_mode=False): - return {"dummy": True}, True - - monkeypatch.setattr(cli, "compare_harvests", fake_compare) - monkeypatch.setattr(cli, "format_report", lambda report, fmt="text": "R\n") - monkeypatch.setattr(cli, "post_webhook", lambda url, body, headers=None: (500, b"")) - - monkeypatch.setattr( - sys, - "argv", - [ - "enroll", - "diff", - "--old", - "/tmp/old", - "--new", - "/tmp/new", - "--webhook", - "https://example.invalid/h", - ], - ) - with pytest.raises(SystemExit) as e: - cli.main() - assert "HTTP 500" in str(e.value) diff --git a/tests/test_explain.py b/tests/test_explain.py deleted file mode 100644 index 69f4a88..0000000 --- a/tests/test_explain.py +++ /dev/null @@ -1,222 +0,0 @@ -from __future__ import annotations - -import json -from pathlib import Path - -import enroll.explain as ex - - -def _write_state(bundle: Path, state: dict) -> Path: - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") - return bundle / "state.json" - - -def test_explain_state_text_renders_roles_inventory_and_reasons(tmp_path: Path): - bundle = tmp_path / "bundle" - state = { - "schema_version": 3, - "host": {"hostname": "h1", "os": "debian", "pkg_backend": "dpkg"}, - "enroll": {"version": "0.0.0"}, - "inventory": { - "packages": { - "foo": { - "installations": [{"version": "1.0", "arch": "amd64"}], - "observed_via": [ - {"kind": "systemd_unit", "ref": "foo.service"}, - {"kind": "package_role", "ref": "foo"}, - ], - "roles": ["foo"], - }, - "bar": { - "installations": [{"version": "2.0", "arch": "amd64"}], - "observed_via": [{"kind": "user_installed", "ref": "manual"}], - "roles": ["bar"], - }, - } - }, - "roles": { - "users": { - "role_name": "users", - "users": [{"name": "alice"}], - "managed_files": [ - { - "path": "/home/alice/.ssh/authorized_keys", - "src_rel": "home/alice/.ssh/authorized_keys", - "owner": "alice", - "group": "alice", - "mode": "0600", - "reason": "authorized_keys", - } - ], - "managed_dirs": [ - { - "path": "/home/alice/.ssh", - "owner": "alice", - "group": "alice", - "mode": "0700", - "reason": "parent_of_managed_file", - } - ], - "excluded": [{"path": "/etc/shadow", "reason": "sensitive_content"}], - "notes": ["n1", "n2"], - }, - "services": [ - { - "unit": "foo.service", - "role_name": "foo", - "packages": ["foo"], - "managed_files": [ - { - "path": "/etc/foo.conf", - "src_rel": "etc/foo.conf", - "owner": "root", - "group": "root", - "mode": "0644", - "reason": "modified_conffile", - }, - # Unknown reason should fall back to generic text. - { - "path": "/etc/odd.conf", - "src_rel": "etc/odd.conf", - "owner": "root", - "group": "root", - "mode": "0644", - "reason": "mystery_reason", - }, - ], - "excluded": [], - "notes": [], - } - ], - "packages": [ - { - "package": "bar", - "role_name": "bar", - "managed_files": [], - "excluded": [], - "notes": [], - } - ], - "extra_paths": { - "role_name": "extra_paths", - "include_patterns": ["/etc/a", "/etc/b"], - "exclude_patterns": ["/etc/x", "/etc/y"], - "managed_files": [], - "excluded": [], - "notes": [], - }, - "apt_config": { - "role_name": "apt_config", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "dnf_config": { - "role_name": "dnf_config", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "etc_custom": { - "role_name": "etc_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "usr_local_custom": { - "role_name": "usr_local_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - }, - } - - state_path = _write_state(bundle, state) - - out = ex.explain_state(str(state_path), fmt="text", max_examples=1) - - assert "Enroll explained:" in out - assert "Host: h1" in out - assert "Inventory" in out - # observed_via summary should include both kinds (order not strictly guaranteed) - assert "observed_via" in out - assert "systemd_unit" in out - assert "user_installed" in out - - # extra_paths include/exclude patterns should be rendered with max_examples truncation. - assert "include_patterns:" in out - assert "/etc/a" in out - assert "exclude_patterns:" in out - - # Reasons section should mention known and unknown reasons. - assert "modified_conffile" in out - assert "mystery_reason" in out - assert "Captured with reason 'mystery_reason'" in out - - # Excluded paths section. - assert "Why paths were excluded" in out - assert "sensitive_content" in out - - -def test_explain_state_json_contains_structured_report(tmp_path: Path): - bundle = tmp_path / "bundle" - state = { - "schema_version": 3, - "host": {"hostname": "h2", "os": "rhel", "pkg_backend": "rpm"}, - "enroll": {"version": "1.2.3"}, - "inventory": {"packages": {}}, - "roles": { - "users": { - "role_name": "users", - "users": [], - "managed_files": [], - "excluded": [], - "notes": [], - }, - "services": [], - "packages": [], - "apt_config": { - "role_name": "apt_config", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "dnf_config": { - "role_name": "dnf_config", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "etc_custom": { - "role_name": "etc_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "usr_local_custom": { - "role_name": "usr_local_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "extra_paths": { - "role_name": "extra_paths", - "include_patterns": [], - "exclude_patterns": [], - "managed_files": [], - "excluded": [], - "notes": [], - }, - }, - } - state_path = _write_state(bundle, state) - - raw = ex.explain_state(str(state_path), fmt="json", max_examples=2) - rep = json.loads(raw) - assert rep["host"]["hostname"] == "h2" - assert rep["enroll"]["version"] == "1.2.3" - assert rep["inventory"]["package_count"] == 0 - assert isinstance(rep["roles"], list) - assert "reasons" in rep diff --git a/tests/test_harvest_cron_logrotate.py b/tests/test_harvest_cron_logrotate.py deleted file mode 100644 index d20d371..0000000 --- a/tests/test_harvest_cron_logrotate.py +++ /dev/null @@ -1,164 +0,0 @@ -from __future__ import annotations - -import json -from pathlib import Path - -import enroll.harvest as h -from enroll.platform import PlatformInfo -from enroll.systemd import UnitInfo - - -class AllowAllPolicy: - def deny_reason(self, path: str): - return None - - -class FakeBackend: - def __init__( - self, - *, - name: str, - installed: dict[str, list[dict[str, str]]], - manual: list[str], - ): - self.name = name - self._installed = dict(installed) - self._manual = list(manual) - - def build_etc_index(self): - # No package ownership information needed for this test. - return set(), {}, {}, {} - - def installed_packages(self): - return dict(self._installed) - - def list_manual_packages(self): - return list(self._manual) - - def owner_of_path(self, path: str): - return None - - def specific_paths_for_hints(self, hints: set[str]): - return [] - - def is_pkg_config_path(self, path: str) -> bool: - return False - - def modified_paths(self, pkg: str, etc_paths: list[str]): - return {} - - -def test_harvest_unifies_cron_and_logrotate_into_dedicated_package_roles( - monkeypatch, tmp_path: Path -): - bundle = tmp_path / "bundle" - - # Fake files we want harvested. - files = { - "/etc/crontab": b"* * * * * root echo hi\n", - "/etc/cron.d/php": b"# php cron\n", - "/var/spool/cron/crontabs/alice": b"@daily echo user\n", - "/etc/logrotate.conf": b"weekly\n", - "/etc/logrotate.d/rsyslog": b"/var/log/syslog { rotate 7 }\n", - } - - monkeypatch.setattr(h.os.path, "islink", lambda p: False) - monkeypatch.setattr(h.os.path, "isfile", lambda p: p in files) - monkeypatch.setattr(h.os.path, "isdir", lambda p: False) - monkeypatch.setattr(h.os.path, "exists", lambda p: (p in files) or False) - - # Expand cron/logrotate globs deterministically. - def fake_iter_matching(spec: str, cap: int = 10000): - mapping = { - "/etc/crontab": ["/etc/crontab"], - "/etc/cron.d/*": ["/etc/cron.d/php"], - "/etc/cron.hourly/*": [], - "/etc/cron.daily/*": [], - "/etc/cron.weekly/*": [], - "/etc/cron.monthly/*": [], - "/etc/cron.allow": [], - "/etc/cron.deny": [], - "/etc/anacrontab": [], - "/etc/anacron/*": [], - "/var/spool/cron/*": [], - "/var/spool/cron/crontabs/*": ["/var/spool/cron/crontabs/alice"], - "/var/spool/crontabs/*": [], - "/var/spool/anacron/*": [], - "/etc/logrotate.conf": ["/etc/logrotate.conf"], - "/etc/logrotate.d/*": ["/etc/logrotate.d/rsyslog"], - } - return list(mapping.get(spec, []))[:cap] - - monkeypatch.setattr(h, "_iter_matching_files", fake_iter_matching) - - # Avoid real system probing. - monkeypatch.setattr( - h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {}) - ) - backend = FakeBackend( - name="dpkg", - installed={ - "cron": [{"version": "1", "arch": "amd64"}], - "logrotate": [{"version": "1", "arch": "amd64"}], - }, - # Include cron/logrotate in manual packages to ensure they are skipped in the generic loop. - manual=["cron", "logrotate"], - ) - monkeypatch.setattr(h, "get_backend", lambda info=None: backend) - - # Include a service that would collide with cron role naming. - monkeypatch.setattr( - h, "list_enabled_services", lambda: ["cron.service", "foo.service"] - ) - monkeypatch.setattr(h, "list_enabled_timers", lambda: []) - monkeypatch.setattr( - h, - "get_unit_info", - lambda unit: UnitInfo( - name=unit, - fragment_path=None, - dropin_paths=[], - env_files=[], - exec_paths=[], - active_state="active", - sub_state="running", - unit_file_state="enabled", - condition_result=None, - ), - ) - monkeypatch.setattr(h, "collect_non_system_users", lambda: []) - monkeypatch.setattr( - h, - "stat_triplet", - lambda p: ("alice" if "alice" in p else "root", "root", "0644"), - ) - - # Avoid needing real source files by implementing our own bundle copier. - def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str): - dst = Path(bundle_dir) / "artifacts" / role_name / src_rel - dst.parent.mkdir(parents=True, exist_ok=True) - dst.write_bytes(files.get(abs_path, b"")) - - monkeypatch.setattr(h, "_copy_into_bundle", fake_copy) - - state_path = h.harvest(str(bundle), policy=AllowAllPolicy()) - st = json.loads(Path(state_path).read_text(encoding="utf-8")) - - # cron.service must be skipped to avoid colliding with the dedicated "cron" package role. - svc_units = [s["unit"] for s in st["roles"]["services"]] - assert "cron.service" not in svc_units - assert "foo.service" in svc_units - - pkgs = st["roles"]["packages"] - cron = next(p for p in pkgs if p["role_name"] == "cron") - logrotate = next(p for p in pkgs if p["role_name"] == "logrotate") - - cron_paths = {mf["path"] for mf in cron["managed_files"]} - assert "/etc/crontab" in cron_paths - assert "/etc/cron.d/php" in cron_paths - # user crontab captured - assert "/var/spool/cron/crontabs/alice" in cron_paths - - lr_paths = {mf["path"] for mf in logrotate["managed_files"]} - assert "/etc/logrotate.conf" in lr_paths - assert "/etc/logrotate.d/rsyslog" in lr_paths diff --git a/tests/test_harvest_helpers.py b/tests/test_harvest_helpers.py deleted file mode 100644 index 531a62c..0000000 --- a/tests/test_harvest_helpers.py +++ /dev/null @@ -1,170 +0,0 @@ -from __future__ import annotations - -import os -from pathlib import Path - -import enroll.harvest as h - - -def test_iter_matching_files_skips_symlinks_and_walks_dirs(monkeypatch, tmp_path: Path): - # Layout: - # root/real.txt (file) - # root/sub/nested.txt - # root/link -> ... (ignored) - root = tmp_path / "root" - (root / "sub").mkdir(parents=True) - (root / "real.txt").write_text("a", encoding="utf-8") - (root / "sub" / "nested.txt").write_text("b", encoding="utf-8") - - paths = { - str(root): "dir", - str(root / "real.txt"): "file", - str(root / "sub"): "dir", - str(root / "sub" / "nested.txt"): "file", - str(root / "link"): "link", - } - - monkeypatch.setattr(h.glob, "glob", lambda spec: [str(root), str(root / "link")]) - monkeypatch.setattr(h.os.path, "islink", lambda p: paths.get(p) == "link") - monkeypatch.setattr(h.os.path, "isfile", lambda p: paths.get(p) == "file") - monkeypatch.setattr(h.os.path, "isdir", lambda p: paths.get(p) == "dir") - monkeypatch.setattr( - h.os, - "walk", - lambda p: [ - (str(root), ["sub"], ["real.txt", "link"]), - (str(root / "sub"), [], ["nested.txt"]), - ], - ) - - out = h._iter_matching_files("/whatever/*", cap=100) - assert str(root / "real.txt") in out - assert str(root / "sub" / "nested.txt") in out - assert str(root / "link") not in out - - -def test_parse_apt_signed_by_extracts_keyrings(tmp_path: Path): - f1 = tmp_path / "a.list" - f1.write_text( - "deb [signed-by=/usr/share/keyrings/foo.gpg] https://example.invalid stable main\n", - encoding="utf-8", - ) - f2 = tmp_path / "b.sources" - f2.write_text( - "Types: deb\nSigned-By: /etc/apt/keyrings/bar.gpg, /usr/share/keyrings/baz.gpg\n", - encoding="utf-8", - ) - f3 = tmp_path / "c.sources" - f3.write_text("Signed-By: | /bin/echo nope\n", encoding="utf-8") - - out = h._parse_apt_signed_by([str(f1), str(f2), str(f3)]) - assert "/usr/share/keyrings/foo.gpg" in out - assert "/etc/apt/keyrings/bar.gpg" in out - assert "/usr/share/keyrings/baz.gpg" in out - - -def test_iter_apt_capture_paths_includes_signed_by_keyring(monkeypatch): - # Simulate: - # /etc/apt/apt.conf.d/00test - # /etc/apt/sources.list.d/test.list (signed-by outside /etc/apt) - # /usr/share/keyrings/ext.gpg - files = { - "/etc/apt/apt.conf.d/00test": "file", - "/etc/apt/sources.list.d/test.list": "file", - "/usr/share/keyrings/ext.gpg": "file", - } - - monkeypatch.setattr(h.os.path, "isdir", lambda p: p in {"/etc/apt"}) - monkeypatch.setattr( - h.os, - "walk", - lambda root: [ - ("/etc/apt", ["apt.conf.d", "sources.list.d"], []), - ("/etc/apt/apt.conf.d", [], ["00test"]), - ("/etc/apt/sources.list.d", [], ["test.list"]), - ], - ) - monkeypatch.setattr(h.os.path, "islink", lambda p: False) - monkeypatch.setattr(h.os.path, "isfile", lambda p: files.get(p) == "file") - - # Only treat the sources glob as having a hit. - def fake_iter_matching(spec: str, cap: int = 10000): - if spec == "/etc/apt/sources.list.d/*.list": - return ["/etc/apt/sources.list.d/test.list"] - return [] - - monkeypatch.setattr(h, "_iter_matching_files", fake_iter_matching) - - # Provide file contents for the sources file. - real_open = open - - def fake_open(path, *a, **k): - if path == "/etc/apt/sources.list.d/test.list": - return real_open(os.devnull, "r", encoding="utf-8") # placeholder - return real_open(path, *a, **k) - - # Easier: patch _parse_apt_signed_by directly to avoid filesystem reads. - monkeypatch.setattr( - h, "_parse_apt_signed_by", lambda sfs: {"/usr/share/keyrings/ext.gpg"} - ) - - out = h._iter_apt_capture_paths() - paths = {p for p, _r in out} - reasons = {p: r for p, r in out} - assert "/etc/apt/apt.conf.d/00test" in paths - assert "/etc/apt/sources.list.d/test.list" in paths - assert "/usr/share/keyrings/ext.gpg" in paths - assert reasons["/usr/share/keyrings/ext.gpg"] == "apt_signed_by_keyring" - - -def test_iter_dnf_capture_paths(monkeypatch): - files = { - "/etc/dnf/dnf.conf": "file", - "/etc/yum/yum.conf": "file", - "/etc/yum.conf": "file", - "/etc/yum.repos.d/test.repo": "file", - "/etc/pki/rpm-gpg/RPM-GPG-KEY": "file", - } - - def isdir(p): - return p in {"/etc/dnf", "/etc/yum", "/etc/yum.repos.d", "/etc/pki/rpm-gpg"} - - def walk(root): - if root == "/etc/dnf": - return [("/etc/dnf", [], ["dnf.conf"])] - if root == "/etc/yum": - return [("/etc/yum", [], ["yum.conf"])] - if root == "/etc/pki/rpm-gpg": - return [("/etc/pki/rpm-gpg", [], ["RPM-GPG-KEY"])] - return [] - - monkeypatch.setattr(h.os.path, "isdir", isdir) - monkeypatch.setattr(h.os, "walk", walk) - monkeypatch.setattr(h.os.path, "islink", lambda p: False) - monkeypatch.setattr(h.os.path, "isfile", lambda p: files.get(p) == "file") - monkeypatch.setattr( - h, - "_iter_matching_files", - lambda spec, cap=10000: ( - ["/etc/yum.repos.d/test.repo"] if spec.endswith("*.repo") else [] - ), - ) - - out = h._iter_dnf_capture_paths() - paths = {p for p, _r in out} - assert "/etc/dnf/dnf.conf" in paths - assert "/etc/yum/yum.conf" in paths - assert "/etc/yum.conf" in paths - assert "/etc/yum.repos.d/test.repo" in paths - assert "/etc/pki/rpm-gpg/RPM-GPG-KEY" in paths - - -def test_iter_system_capture_paths_dedupes_first_reason(monkeypatch): - monkeypatch.setattr(h, "_SYSTEM_CAPTURE_GLOBS", [("/a", "r1"), ("/b", "r2")]) - monkeypatch.setattr( - h, - "_iter_matching_files", - lambda spec, cap=10000: ["/dup"] if spec in {"/a", "/b"} else [], - ) - out = h._iter_system_capture_paths() - assert out == [("/dup", "r1")] diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 8b34fcb..fec9cc3 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -1,12 +1,7 @@ import json from pathlib import Path -import os -import stat -import tarfile -import pytest - -import enroll.manifest as manifest +from enroll.manifest import manifest def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): @@ -181,7 +176,7 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin" / "myscript" ).write_text("#!/bin/sh\necho hi\n", encoding="utf-8") - manifest.manifest(str(bundle), str(out)) + manifest(str(bundle), str(out)) # Service role: systemd management should be gated on foo_manage_unit and a probe. tasks = (out / "roles" / "foo" / "tasks" / "main.yml").read_text(encoding="utf-8") @@ -350,7 +345,7 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path) / "myapp.conf" ).write_text("myapp=1\n", encoding="utf-8") - manifest.manifest(str(bundle), str(out), fqdn=fqdn) + manifest(str(bundle), str(out), fqdn=fqdn) # Host playbook exists. assert (out / "playbooks" / f"{fqdn}.yml").exists() @@ -487,7 +482,7 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path): bundle.mkdir(parents=True, exist_ok=True) (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") - manifest.manifest(str(bundle), str(out)) + manifest(str(bundle), str(out)) pb = (out / "playbook.yml").read_text(encoding="utf-8") assert "- dnf_config" in pb @@ -507,291 +502,3 @@ def test_render_install_packages_tasks_contains_dnf_branch(): assert "ansible.builtin.dnf" in txt assert "ansible.builtin.package" in txt assert "pkg_mgr" in txt - - -def test_manifest_orders_cron_and_logrotate_at_playbook_tail(tmp_path: Path): - """Cron/logrotate roles should appear at the end. - - The cron role may restore per-user crontabs under /var/spool, so it should - run after users have been created. - """ - - bundle = tmp_path / "bundle" - out = tmp_path / "ansible" - - state = { - "schema_version": 3, - "host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"}, - "inventory": {"packages": {}}, - "roles": { - "users": { - "role_name": "users", - "users": [{"name": "alice"}], - "managed_files": [], - "excluded": [], - "notes": [], - }, - "services": [], - "packages": [ - { - "package": "curl", - "role_name": "curl", - "managed_files": [], - "excluded": [], - "notes": [], - }, - { - "package": "cron", - "role_name": "cron", - "managed_files": [ - { - "path": "/var/spool/cron/crontabs/alice", - "src_rel": "var/spool/cron/crontabs/alice", - "owner": "alice", - "group": "root", - "mode": "0600", - "reason": "system_cron", - } - ], - "excluded": [], - "notes": [], - }, - { - "package": "logrotate", - "role_name": "logrotate", - "managed_files": [ - { - "path": "/etc/logrotate.conf", - "src_rel": "etc/logrotate.conf", - "owner": "root", - "group": "root", - "mode": "0644", - "reason": "system_logrotate", - } - ], - "excluded": [], - "notes": [], - }, - ], - "apt_config": { - "role_name": "apt_config", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "dnf_config": { - "role_name": "dnf_config", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "etc_custom": { - "role_name": "etc_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "usr_local_custom": { - "role_name": "usr_local_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "extra_paths": { - "role_name": "extra_paths", - "include_patterns": [], - "exclude_patterns": [], - "managed_files": [], - "excluded": [], - "notes": [], - }, - }, - } - - # Minimal artifacts for managed files. - (bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs").mkdir( - parents=True, exist_ok=True - ) - ( - bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs" / "alice" - ).write_text("@daily echo hi\n", encoding="utf-8") - (bundle / "artifacts" / "logrotate" / "etc").mkdir(parents=True, exist_ok=True) - (bundle / "artifacts" / "logrotate" / "etc" / "logrotate.conf").write_text( - "weekly\n", encoding="utf-8" - ) - - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") - - manifest.manifest(str(bundle), str(out)) - - pb = (out / "playbook.yml").read_text(encoding="utf-8").splitlines() - # Roles are emitted as indented list items under the `roles:` key. - roles = [ - ln.strip().removeprefix("- ").strip() for ln in pb if ln.startswith(" - ") - ] - - # Ensure tail ordering. - assert roles[-2:] == ["cron", "logrotate"] - assert "users" in roles - assert roles.index("users") < roles.index("cron") - - -def test_yaml_helpers_fallback_when_yaml_unavailable(monkeypatch): - monkeypatch.setattr(manifest, "_try_yaml", lambda: None) - assert manifest._yaml_load_mapping("foo: 1\n") == {} - out = manifest._yaml_dump_mapping({"b": 2, "a": 1}) - # Best-effort fallback is key: repr(value) - assert out.splitlines()[0].startswith("a: ") - assert out.endswith("\n") - - -def test_copy2_replace_makes_readonly_sources_user_writable( - monkeypatch, tmp_path: Path -): - src = tmp_path / "src.txt" - dst = tmp_path / "dst.txt" - src.write_text("hello", encoding="utf-8") - # Make source read-only; copy2 preserves mode, so tmp will be read-only too. - os.chmod(src, 0o444) - - manifest._copy2_replace(str(src), str(dst)) - - st = os.stat(dst, follow_symlinks=False) - assert stat.S_IMODE(st.st_mode) & stat.S_IWUSR - - -def test_prepare_bundle_dir_sops_decrypts_and_extracts(monkeypatch, tmp_path: Path): - enc = tmp_path / "harvest.tar.gz.sops" - enc.write_text("ignored", encoding="utf-8") - - def fake_require(): - return None - - def fake_decrypt(src: str, dst: str, *, mode: int = 0o600): - # Create a minimal tar.gz with a state.json file. - with tarfile.open(dst, "w:gz") as tf: - p = tmp_path / "state.json" - p.write_text("{}", encoding="utf-8") - tf.add(p, arcname="state.json") - - monkeypatch.setattr(manifest, "require_sops_cmd", fake_require) - monkeypatch.setattr(manifest, "decrypt_file_binary_to", fake_decrypt) - - bundle_dir, td = manifest._prepare_bundle_dir(str(enc), sops_mode=True) - try: - assert (Path(bundle_dir) / "state.json").exists() - finally: - td.cleanup() - - -def test_prepare_bundle_dir_rejects_non_dir_without_sops(tmp_path: Path): - fp = tmp_path / "bundle.tar.gz" - fp.write_text("x", encoding="utf-8") - with pytest.raises(RuntimeError): - manifest._prepare_bundle_dir(str(fp), sops_mode=False) - - -def test_tar_dir_to_with_progress_writes_progress_when_tty(monkeypatch, tmp_path: Path): - src = tmp_path / "dir" - src.mkdir() - (src / "a.txt").write_text("a", encoding="utf-8") - (src / "b.txt").write_text("b", encoding="utf-8") - - out = tmp_path / "out.tar.gz" - writes: list[bytes] = [] - - monkeypatch.setattr(manifest.os, "isatty", lambda fd: True) - monkeypatch.setattr(manifest.os, "write", lambda fd, b: writes.append(b) or len(b)) - - manifest._tar_dir_to_with_progress(str(src), str(out), desc="tarring") - assert out.exists() - assert writes # progress was written - assert writes[-1].endswith(b"\n") - - -def test_encrypt_manifest_out_dir_to_sops_handles_missing_tmp_cleanup( - monkeypatch, tmp_path: Path -): - src_dir = tmp_path / "manifest" - src_dir.mkdir() - (src_dir / "x.txt").write_text("x", encoding="utf-8") - - out = tmp_path / "manifest.tar.gz.sops" - - monkeypatch.setattr(manifest, "require_sops_cmd", lambda: None) - - def fake_encrypt(in_fp, out_fp, *args, **kwargs): - Path(out_fp).write_text("enc", encoding="utf-8") - - monkeypatch.setattr(manifest, "encrypt_file_binary", fake_encrypt) - # Simulate race where tmp tar is already removed. - monkeypatch.setattr( - manifest.os, "unlink", lambda p: (_ for _ in ()).throw(FileNotFoundError()) - ) - - res = manifest._encrypt_manifest_out_dir_to_sops(str(src_dir), str(out), ["ABC"]) # type: ignore[arg-type] - assert str(res).endswith(".sops") - assert out.exists() - - -def test_manifest_applies_jinjaturtle_to_jinjifyable_managed_file( - monkeypatch, tmp_path: Path -): - # Create a minimal bundle with just an apt_config snapshot. - bundle = tmp_path / "bundle" - (bundle / "artifacts" / "apt_config" / "etc" / "apt").mkdir(parents=True) - (bundle / "artifacts" / "apt_config" / "etc" / "apt" / "foo.ini").write_text( - "key=VALUE\n", encoding="utf-8" - ) - - state = { - "schema_version": 1, - "inventory": {"packages": {}}, - "roles": { - "services": [], - "packages": [], - "apt_config": { - "role_name": "apt_config", - "managed_files": [ - { - "path": "/etc/apt/foo.ini", - "src_rel": "etc/apt/foo.ini", - "owner": "root", - "group": "root", - "mode": "0644", - "reason": "apt_config", - } - ], - "managed_dirs": [], - "excluded": [], - "notes": [], - }, - }, - } - (bundle / "state.json").write_text( - __import__("json").dumps(state), encoding="utf-8" - ) - - monkeypatch.setattr(manifest, "find_jinjaturtle_cmd", lambda: "jinjaturtle") - - class _Res: - template_text = "key={{ foo }}\n" - vars_text = "foo: 123\n" - - monkeypatch.setattr(manifest, "run_jinjaturtle", lambda *a, **k: _Res()) - - out_dir = tmp_path / "out" - manifest.manifest(str(bundle), str(out_dir), jinjaturtle="on") - - tmpl = out_dir / "roles" / "apt_config" / "templates" / "etc" / "apt" / "foo.ini.j2" - assert tmpl.exists() - assert "{{ foo }}" in tmpl.read_text(encoding="utf-8") - - defaults = out_dir / "roles" / "apt_config" / "defaults" / "main.yml" - txt = defaults.read_text(encoding="utf-8") - assert "foo: 123" in txt - # Non-templated file should not exist under files/. - assert not ( - out_dir / "roles" / "apt_config" / "files" / "etc" / "apt" / "foo.ini" - ).exists() diff --git a/tests/test_misc_coverage.py b/tests/test_misc_coverage.py index 1ff6e98..b4250fc 100644 --- a/tests/test_misc_coverage.py +++ b/tests/test_misc_coverage.py @@ -1,13 +1,5 @@ -from __future__ import annotations - -import json -import os import stat -import subprocess -import sys -import types from pathlib import Path -from types import SimpleNamespace import pytest @@ -102,315 +94,3 @@ def test_sops_pgp_arg_and_encrypt_decrypt_roundtrip(tmp_path: Path, monkeypatch) # Sanity: we invoked encrypt and decrypt. assert any("--encrypt" in c for c in calls) assert any("--decrypt" in c for c in calls) - - -def test_cache_dir_defaults_to_home_cache(monkeypatch, tmp_path: Path): - # Ensure default path uses ~/.cache when XDG_CACHE_HOME is unset. - from enroll.cache import enroll_cache_dir - - monkeypatch.delenv("XDG_CACHE_HOME", raising=False) - monkeypatch.setattr(Path, "home", lambda: tmp_path) - - p = enroll_cache_dir() - assert str(p).startswith(str(tmp_path)) - assert p.name == "enroll" - - -def test_harvest_cache_state_json_property(tmp_path: Path): - from enroll.cache import HarvestCache - - hc = HarvestCache(tmp_path / "h1") - assert hc.state_json == hc.dir / "state.json" - - -def test_cache_dir_security_rejects_symlink(tmp_path: Path): - from enroll.cache import _ensure_dir_secure - - real = tmp_path / "real" - real.mkdir() - link = tmp_path / "link" - link.symlink_to(real, target_is_directory=True) - - with pytest.raises(RuntimeError, match="Refusing to use symlink"): - _ensure_dir_secure(link) - - -def test_cache_dir_chmod_failures_are_ignored(monkeypatch, tmp_path: Path): - from enroll import cache - - # Make the cache base path deterministic and writable. - monkeypatch.setattr(cache, "enroll_cache_dir", lambda: tmp_path) - - # Force os.chmod to fail to cover the "except OSError: pass" paths. - monkeypatch.setattr( - os, "chmod", lambda *a, **k: (_ for _ in ()).throw(OSError("nope")) - ) - - hc = cache.new_harvest_cache_dir() - assert hc.dir.exists() - assert hc.dir.is_dir() - - -def test_stat_triplet_falls_back_to_numeric_ids(monkeypatch, tmp_path: Path): - from enroll.fsutil import stat_triplet - import pwd - import grp - - p = tmp_path / "x" - p.write_text("x", encoding="utf-8") - - # Force username/group resolution failures. - monkeypatch.setattr( - pwd, "getpwuid", lambda _uid: (_ for _ in ()).throw(KeyError("no user")) - ) - monkeypatch.setattr( - grp, "getgrgid", lambda _gid: (_ for _ in ()).throw(KeyError("no group")) - ) - - owner, group, mode = stat_triplet(str(p)) - assert owner.isdigit() - assert group.isdigit() - assert len(mode) == 4 - - -def test_ignore_policy_iter_effective_lines_removes_block_comments(): - from enroll.ignore import IgnorePolicy - - pol = IgnorePolicy() - data = b"""keep1 -/* -drop me -*/ -keep2 -""" - assert list(pol.iter_effective_lines(data)) == [b"keep1", b"keep2"] - - -def test_ignore_policy_deny_reason_dir_variants(tmp_path: Path): - from enroll.ignore import IgnorePolicy - - pol = IgnorePolicy() - - # denied by glob - assert pol.deny_reason_dir("/etc/shadow") == "denied_path" - - # symlink rejected - d = tmp_path / "d" - d.mkdir() - link = tmp_path / "l" - link.symlink_to(d, target_is_directory=True) - assert pol.deny_reason_dir(str(link)) == "symlink" - - # not a directory - f = tmp_path / "f" - f.write_text("x", encoding="utf-8") - assert pol.deny_reason_dir(str(f)) == "not_directory" - - # ok - assert pol.deny_reason_dir(str(d)) is None - - -def test_run_jinjaturtle_parses_outputs(monkeypatch, tmp_path: Path): - # Fully unit-test enroll.jinjaturtle.run_jinjaturtle by stubbing subprocess.run. - from enroll.jinjaturtle import run_jinjaturtle - - def fake_run(cmd, **kwargs): # noqa: ARG001 - # cmd includes "-d -t