Compare commits

..

No commits in common. "a1433d645f7b0964db05eb51d55c2de980ee19e5" and "24cedc8c8df6d8f00a4b48f3b6393077cb5b45ef" have entirely different histories.

15 changed files with 382 additions and 1760 deletions

View file

@ -2,7 +2,6 @@
* Introduce `enroll explain` - a tool to analyze and explain what's in (or not in) a harvest and why.
* Centralise the cron and logrotate stuff into their respective roles, we had a bit of duplication between roles based on harvest discovery.
* Capture other files in the user's home directory such as `.bashrc`, `.bash_aliases`, `.profile`, if these files differ from the `/etc/skel` defaults
# 0.2.3

View file

@ -914,6 +914,56 @@ def main() -> None:
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
)
elif args.cmd == "diff":
report, has_changes = compare_harvests(
args.old, args.new, sops_mode=bool(getattr(args, "sops", False))
)
rendered = format_report(report, fmt=str(args.format))
if args.out:
Path(args.out).expanduser().write_text(rendered, encoding="utf-8")
else:
print(rendered, end="")
do_notify = bool(has_changes or getattr(args, "notify_always", False))
if do_notify and getattr(args, "webhook", None):
wf = str(getattr(args, "webhook_format", "json"))
body = format_report(report, fmt=wf).encode("utf-8")
headers = {"User-Agent": "enroll"}
if wf == "json":
headers["Content-Type"] = "application/json"
else:
headers["Content-Type"] = "text/plain; charset=utf-8"
for hv in getattr(args, "webhook_header", []) or []:
if ":" not in hv:
raise SystemExit(
"error: --webhook-header must be in the form 'K:V'"
)
k, v = hv.split(":", 1)
headers[k.strip()] = v.strip()
status, _ = post_webhook(str(args.webhook), body, headers=headers)
if status and status >= 400:
raise SystemExit(f"error: webhook returned HTTP {status}")
if do_notify and (getattr(args, "email_to", []) or []):
subject = getattr(args, "email_subject", None) or "enroll diff report"
smtp_password = None
pw_env = getattr(args, "smtp_password_env", None)
if pw_env:
smtp_password = os.environ.get(str(pw_env))
send_email(
to_addrs=list(getattr(args, "email_to", []) or []),
subject=str(subject),
body=rendered,
from_addr=getattr(args, "email_from", None),
smtp=getattr(args, "smtp", None),
smtp_user=getattr(args, "smtp_user", None),
smtp_password=smtp_password,
)
if getattr(args, "exit_code", False) and has_changes:
raise SystemExit(2)
except RemoteSudoPasswordRequired:
raise SystemExit(
"error: remote sudo requires a password. Re-run with --ask-become-pass."

View file

@ -5,7 +5,6 @@ import json
import os
import re
import shutil
import stat
import time
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional, Set
@ -158,54 +157,6 @@ MAX_FILES_CAP = 4000
MAX_UNOWNED_FILES_PER_ROLE = 500
def _files_differ(a: str, b: str, *, max_bytes: int = 2_000_000) -> bool:
"""Return True if file `a` differs from file `b`.
Best-effort and conservative:
- If `b` (baseline) does not exist or is not a regular file, treat as
"different" so we err on the side of capturing user state.
- If we can't stat/read either file, treat as "different" (capture will
later be filtered via IgnorePolicy).
- If files are large, avoid reading them fully.
"""
try:
st_a = os.stat(a, follow_symlinks=True)
except OSError:
return True
# Refuse to do content comparisons on non-regular files.
if not stat.S_ISREG(st_a.st_mode):
return True
try:
st_b = os.stat(b, follow_symlinks=True)
except OSError:
return True
if not stat.S_ISREG(st_b.st_mode):
return True
if st_a.st_size != st_b.st_size:
return True
# If it's unexpectedly big, treat as different to avoid expensive reads.
if st_a.st_size > max_bytes:
return True
try:
with open(a, "rb") as fa, open(b, "rb") as fb:
while True:
ca = fa.read(1024 * 64)
cb = fb.read(1024 * 64)
if ca != cb:
return True
if not ca: # EOF on both
return False
except OSError:
return True
def _merge_parent_dirs(
existing_dirs: List[ManagedDir],
managed_files: List[ManagedFile],
@ -1368,18 +1319,6 @@ def harvest(
users_role_name = "users"
users_role_seen = seen_by_role.setdefault(users_role_name, set())
skel_dir = "/etc/skel"
# Dotfiles to harvest for non-system users. For the common "skeleton"
# files, only capture if the user's copy differs from /etc/skel.
skel_dotfiles = [
(".bashrc", "user_shell_rc"),
(".profile", "user_profile"),
(".bash_logout", "user_shell_logout"),
]
extra_dotfiles = [
(".bash_aliases", "user_shell_aliases"),
]
for u in user_records:
users_list.append(
{
@ -1414,48 +1353,6 @@ def harvest(
seen_global=captured_global,
)
# Capture common per-user shell dotfiles when they differ from /etc/skel.
# These still go through IgnorePolicy and user path filters.
home = (u.home or "").rstrip("/")
if home and home.startswith("/"):
for rel, reason in skel_dotfiles:
upath = os.path.join(home, rel)
if not os.path.exists(upath):
continue
skel_path = os.path.join(skel_dir, rel)
if not _files_differ(upath, skel_path, max_bytes=policy.max_file_bytes):
continue
_capture_file(
bundle_dir=bundle_dir,
role_name=users_role_name,
abs_path=upath,
reason=reason,
policy=policy,
path_filter=path_filter,
managed_out=users_managed,
excluded_out=users_excluded,
seen_role=users_role_seen,
seen_global=captured_global,
)
# Capture other common per-user shell files unconditionally if present.
for rel, reason in extra_dotfiles:
upath = os.path.join(home, rel)
if not os.path.exists(upath):
continue
_capture_file(
bundle_dir=bundle_dir,
role_name=users_role_name,
abs_path=upath,
reason=reason,
policy=policy,
path_filter=path_filter,
managed_out=users_managed,
excluded_out=users_excluded,
seen_role=users_role_seen,
seen_global=captured_global,
)
users_snapshot = UsersSnapshot(
role_name=users_role_name,
users=users_list,

View file

@ -819,12 +819,7 @@ def _manifest_from_bundle_dir(
group = str(u.get("primary_group") or owner)
break
# Prefer the harvested file mode so we preserve any deliberate
# permissions (e.g. 0600 for certain dotfiles). For authorized_keys,
# enforce 0600 regardless.
mode = mf.get("mode") or "0644"
if mf.get("reason") == "authorized_keys":
mode = "0600"
mode = "0600" if mf.get("reason") == "authorized_keys" else "0644"
ssh_files.append(
{
"dest": dest,

View file

@ -27,10 +27,9 @@ poetry run \
enroll harvest --out "${BUNDLE_DIR}2"
poetry run \
enroll diff \
--old "${BUNDLE_DIR}" \
--new "${BUNDLE_DIR}2" \
--format json | jq
DEBIAN_FRONTEND=noninteractive apt-get remove --purge cowsay
--old "${BUNDLE_DIR}" \
--new "${BUNDLE_DIR}2" \
--format json | jq
# Ansible test
builtin cd "${ANSIBLE_DIR}"

View file

@ -1,14 +1,9 @@
from __future__ import annotations
import sys
import pytest
import enroll.cli as cli
from pathlib import Path
from enroll.remote import RemoteSudoPasswordRequired
from enroll.sopsutil import SopsError
def test_cli_harvest_subcommand_calls_harvest(monkeypatch, capsys, tmp_path):
called = {}
@ -403,286 +398,3 @@ def test_cli_manifest_common_args(monkeypatch, tmp_path):
cli.main()
assert called["fqdn"] == "example.test"
assert called["jinjaturtle"] == "off"
def test_cli_explain_passes_args_and_writes_stdout(monkeypatch, capsys, tmp_path):
called = {}
def fake_explain_state(
harvest: str,
*,
sops_mode: bool = False,
fmt: str = "text",
max_examples: int = 3,
):
called["harvest"] = harvest
called["sops_mode"] = sops_mode
called["fmt"] = fmt
called["max_examples"] = max_examples
return "EXPLAINED\n"
monkeypatch.setattr(cli, "explain_state", fake_explain_state)
monkeypatch.setattr(
sys,
"argv",
[
"enroll",
"explain",
"--sops",
"--format",
"json",
"--max-examples",
"7",
str(tmp_path / "bundle" / "state.json"),
],
)
cli.main()
out = capsys.readouterr().out
assert out == "EXPLAINED\n"
assert called["sops_mode"] is True
assert called["fmt"] == "json"
assert called["max_examples"] == 7
def test_discover_config_path_missing_config_value_returns_none(monkeypatch):
# Covers the "--config" flag present with no value.
monkeypatch.delenv("ENROLL_CONFIG", raising=False)
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
assert cli._discover_config_path(["--config"]) is None
def test_discover_config_path_defaults_to_home_config(monkeypatch, tmp_path: Path):
# Covers the Path.home() / ".config" fallback.
monkeypatch.delenv("ENROLL_CONFIG", raising=False)
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
monkeypatch.setattr(cli.Path, "home", lambda: tmp_path)
monkeypatch.setattr(cli.Path, "cwd", lambda: tmp_path)
cp = tmp_path / ".config" / "enroll" / "enroll.ini"
cp.parent.mkdir(parents=True)
cp.write_text("[enroll]\n", encoding="utf-8")
assert cli._discover_config_path(["harvest"]) == cp
def test_cli_harvest_local_sops_encrypts_and_prints_path(
monkeypatch, tmp_path: Path, capsys
):
out_dir = tmp_path / "out"
out_dir.mkdir()
calls: dict[str, object] = {}
def fake_harvest(bundle_dir: str, **kwargs):
calls["bundle"] = bundle_dir
# Create a minimal state.json so tooling that expects it won't break.
Path(bundle_dir).mkdir(parents=True, exist_ok=True)
(Path(bundle_dir) / "state.json").write_text("{}", encoding="utf-8")
return str(Path(bundle_dir) / "state.json")
def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]):
calls["encrypt"] = (bundle_dir, out_file, fps)
out_file.write_text("encrypted", encoding="utf-8")
return out_file
monkeypatch.setattr(cli, "harvest", fake_harvest)
monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt)
monkeypatch.setattr(
sys,
"argv",
[
"enroll",
"harvest",
"--sops",
"ABCDEF",
"--out",
str(out_dir),
],
)
cli.main()
printed = capsys.readouterr().out.strip()
assert printed.endswith("harvest.tar.gz.sops")
assert Path(printed).exists()
assert calls.get("encrypt")
def test_cli_harvest_remote_sops_encrypts_and_prints_path(
monkeypatch, tmp_path: Path, capsys
):
out_dir = tmp_path / "out"
out_dir.mkdir()
calls: dict[str, object] = {}
def fake_remote_harvest(**kwargs):
calls["remote"] = kwargs
# Create a minimal state.json in the temp bundle.
out = Path(kwargs["local_out_dir"]) / "state.json"
out.write_text("{}", encoding="utf-8")
return out
def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]):
calls["encrypt"] = (bundle_dir, out_file, fps)
out_file.write_text("encrypted", encoding="utf-8")
return out_file
monkeypatch.setattr(cli, "remote_harvest", fake_remote_harvest)
monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt)
monkeypatch.setattr(
sys,
"argv",
[
"enroll",
"harvest",
"--remote-host",
"example.com",
"--remote-user",
"root",
"--sops",
"ABCDEF",
"--out",
str(out_dir),
],
)
cli.main()
printed = capsys.readouterr().out.strip()
assert printed.endswith("harvest.tar.gz.sops")
assert Path(printed).exists()
assert calls.get("remote")
assert calls.get("encrypt")
def test_cli_harvest_remote_password_required_exits_cleanly(monkeypatch):
def boom(**kwargs):
raise RemoteSudoPasswordRequired("pw required")
monkeypatch.setattr(cli, "remote_harvest", boom)
monkeypatch.setattr(
sys,
"argv",
[
"enroll",
"harvest",
"--remote-host",
"example.com",
"--remote-user",
"root",
],
)
with pytest.raises(SystemExit) as e:
cli.main()
assert "--ask-become-pass" in str(e.value)
def test_cli_runtime_error_is_wrapped_as_user_friendly_system_exit(monkeypatch):
def boom(*args, **kwargs):
raise RuntimeError("nope")
monkeypatch.setattr(cli, "harvest", boom)
monkeypatch.setattr(sys, "argv", ["enroll", "harvest", "--out", "/tmp/x"])
with pytest.raises(SystemExit) as e:
cli.main()
assert str(e.value) == "error: nope"
def test_cli_sops_error_is_wrapped_as_user_friendly_system_exit(monkeypatch):
def boom(*args, **kwargs):
raise SopsError("sops broke")
monkeypatch.setattr(cli, "manifest", boom)
monkeypatch.setattr(
sys, "argv", ["enroll", "manifest", "--harvest", "/tmp/x", "--out", "/tmp/y"]
)
with pytest.raises(SystemExit) as e:
cli.main()
assert str(e.value) == "error: sops broke"
def test_cli_diff_notifies_webhook_and_email_and_respects_exit_code(
monkeypatch, capsys
):
calls: dict[str, object] = {}
def fake_compare(old, new, sops_mode=False):
calls["compare"] = (old, new, sops_mode)
return {"dummy": True}, True
def fake_format(report, fmt="text"):
calls.setdefault("format", []).append((report, fmt))
return "REPORT\n"
def fake_post(url, body, headers=None):
calls["webhook"] = (url, body, headers)
return 200, b"ok"
def fake_email(**kwargs):
calls["email"] = kwargs
monkeypatch.setattr(cli, "compare_harvests", fake_compare)
monkeypatch.setattr(cli, "format_report", fake_format)
monkeypatch.setattr(cli, "post_webhook", fake_post)
monkeypatch.setattr(cli, "send_email", fake_email)
monkeypatch.setenv("SMTPPW", "secret")
monkeypatch.setattr(
sys,
"argv",
[
"enroll",
"diff",
"--old",
"/tmp/old",
"--new",
"/tmp/new",
"--webhook",
"https://example.invalid/h",
"--webhook-header",
"X-Test: ok",
"--email-to",
"a@example.com",
"--smtp-password-env",
"SMTPPW",
"--exit-code",
],
)
with pytest.raises(SystemExit) as e:
cli.main()
assert e.value.code == 2
assert calls.get("compare")
assert calls.get("webhook")
assert calls.get("email")
# No report printed when exiting via --exit-code? (we still render and print).
_ = capsys.readouterr()
def test_cli_diff_webhook_http_error_raises_system_exit(monkeypatch):
def fake_compare(old, new, sops_mode=False):
return {"dummy": True}, True
monkeypatch.setattr(cli, "compare_harvests", fake_compare)
monkeypatch.setattr(cli, "format_report", lambda report, fmt="text": "R\n")
monkeypatch.setattr(cli, "post_webhook", lambda url, body, headers=None: (500, b""))
monkeypatch.setattr(
sys,
"argv",
[
"enroll",
"diff",
"--old",
"/tmp/old",
"--new",
"/tmp/new",
"--webhook",
"https://example.invalid/h",
],
)
with pytest.raises(SystemExit) as e:
cli.main()
assert "HTTP 500" in str(e.value)

View file

@ -1,222 +0,0 @@
from __future__ import annotations
import json
from pathlib import Path
import enroll.explain as ex
def _write_state(bundle: Path, state: dict) -> Path:
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
return bundle / "state.json"
def test_explain_state_text_renders_roles_inventory_and_reasons(tmp_path: Path):
bundle = tmp_path / "bundle"
state = {
"schema_version": 3,
"host": {"hostname": "h1", "os": "debian", "pkg_backend": "dpkg"},
"enroll": {"version": "0.0.0"},
"inventory": {
"packages": {
"foo": {
"installations": [{"version": "1.0", "arch": "amd64"}],
"observed_via": [
{"kind": "systemd_unit", "ref": "foo.service"},
{"kind": "package_role", "ref": "foo"},
],
"roles": ["foo"],
},
"bar": {
"installations": [{"version": "2.0", "arch": "amd64"}],
"observed_via": [{"kind": "user_installed", "ref": "manual"}],
"roles": ["bar"],
},
}
},
"roles": {
"users": {
"role_name": "users",
"users": [{"name": "alice"}],
"managed_files": [
{
"path": "/home/alice/.ssh/authorized_keys",
"src_rel": "home/alice/.ssh/authorized_keys",
"owner": "alice",
"group": "alice",
"mode": "0600",
"reason": "authorized_keys",
}
],
"managed_dirs": [
{
"path": "/home/alice/.ssh",
"owner": "alice",
"group": "alice",
"mode": "0700",
"reason": "parent_of_managed_file",
}
],
"excluded": [{"path": "/etc/shadow", "reason": "sensitive_content"}],
"notes": ["n1", "n2"],
},
"services": [
{
"unit": "foo.service",
"role_name": "foo",
"packages": ["foo"],
"managed_files": [
{
"path": "/etc/foo.conf",
"src_rel": "etc/foo.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "modified_conffile",
},
# Unknown reason should fall back to generic text.
{
"path": "/etc/odd.conf",
"src_rel": "etc/odd.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "mystery_reason",
},
],
"excluded": [],
"notes": [],
}
],
"packages": [
{
"package": "bar",
"role_name": "bar",
"managed_files": [],
"excluded": [],
"notes": [],
}
],
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": ["/etc/a", "/etc/b"],
"exclude_patterns": ["/etc/x", "/etc/y"],
"managed_files": [],
"excluded": [],
"notes": [],
},
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
},
}
state_path = _write_state(bundle, state)
out = ex.explain_state(str(state_path), fmt="text", max_examples=1)
assert "Enroll explained:" in out
assert "Host: h1" in out
assert "Inventory" in out
# observed_via summary should include both kinds (order not strictly guaranteed)
assert "observed_via" in out
assert "systemd_unit" in out
assert "user_installed" in out
# extra_paths include/exclude patterns should be rendered with max_examples truncation.
assert "include_patterns:" in out
assert "/etc/a" in out
assert "exclude_patterns:" in out
# Reasons section should mention known and unknown reasons.
assert "modified_conffile" in out
assert "mystery_reason" in out
assert "Captured with reason 'mystery_reason'" in out
# Excluded paths section.
assert "Why paths were excluded" in out
assert "sensitive_content" in out
def test_explain_state_json_contains_structured_report(tmp_path: Path):
bundle = tmp_path / "bundle"
state = {
"schema_version": 3,
"host": {"hostname": "h2", "os": "rhel", "pkg_backend": "rpm"},
"enroll": {"version": "1.2.3"},
"inventory": {"packages": {}},
"roles": {
"users": {
"role_name": "users",
"users": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [],
"packages": [],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
}
state_path = _write_state(bundle, state)
raw = ex.explain_state(str(state_path), fmt="json", max_examples=2)
rep = json.loads(raw)
assert rep["host"]["hostname"] == "h2"
assert rep["enroll"]["version"] == "1.2.3"
assert rep["inventory"]["package_count"] == 0
assert isinstance(rep["roles"], list)
assert "reasons" in rep

View file

@ -1,164 +0,0 @@
from __future__ import annotations
import json
from pathlib import Path
import enroll.harvest as h
from enroll.platform import PlatformInfo
from enroll.systemd import UnitInfo
class AllowAllPolicy:
def deny_reason(self, path: str):
return None
class FakeBackend:
def __init__(
self,
*,
name: str,
installed: dict[str, list[dict[str, str]]],
manual: list[str],
):
self.name = name
self._installed = dict(installed)
self._manual = list(manual)
def build_etc_index(self):
# No package ownership information needed for this test.
return set(), {}, {}, {}
def installed_packages(self):
return dict(self._installed)
def list_manual_packages(self):
return list(self._manual)
def owner_of_path(self, path: str):
return None
def specific_paths_for_hints(self, hints: set[str]):
return []
def is_pkg_config_path(self, path: str) -> bool:
return False
def modified_paths(self, pkg: str, etc_paths: list[str]):
return {}
def test_harvest_unifies_cron_and_logrotate_into_dedicated_package_roles(
monkeypatch, tmp_path: Path
):
bundle = tmp_path / "bundle"
# Fake files we want harvested.
files = {
"/etc/crontab": b"* * * * * root echo hi\n",
"/etc/cron.d/php": b"# php cron\n",
"/var/spool/cron/crontabs/alice": b"@daily echo user\n",
"/etc/logrotate.conf": b"weekly\n",
"/etc/logrotate.d/rsyslog": b"/var/log/syslog { rotate 7 }\n",
}
monkeypatch.setattr(h.os.path, "islink", lambda p: False)
monkeypatch.setattr(h.os.path, "isfile", lambda p: p in files)
monkeypatch.setattr(h.os.path, "isdir", lambda p: False)
monkeypatch.setattr(h.os.path, "exists", lambda p: (p in files) or False)
# Expand cron/logrotate globs deterministically.
def fake_iter_matching(spec: str, cap: int = 10000):
mapping = {
"/etc/crontab": ["/etc/crontab"],
"/etc/cron.d/*": ["/etc/cron.d/php"],
"/etc/cron.hourly/*": [],
"/etc/cron.daily/*": [],
"/etc/cron.weekly/*": [],
"/etc/cron.monthly/*": [],
"/etc/cron.allow": [],
"/etc/cron.deny": [],
"/etc/anacrontab": [],
"/etc/anacron/*": [],
"/var/spool/cron/*": [],
"/var/spool/cron/crontabs/*": ["/var/spool/cron/crontabs/alice"],
"/var/spool/crontabs/*": [],
"/var/spool/anacron/*": [],
"/etc/logrotate.conf": ["/etc/logrotate.conf"],
"/etc/logrotate.d/*": ["/etc/logrotate.d/rsyslog"],
}
return list(mapping.get(spec, []))[:cap]
monkeypatch.setattr(h, "_iter_matching_files", fake_iter_matching)
# Avoid real system probing.
monkeypatch.setattr(
h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {})
)
backend = FakeBackend(
name="dpkg",
installed={
"cron": [{"version": "1", "arch": "amd64"}],
"logrotate": [{"version": "1", "arch": "amd64"}],
},
# Include cron/logrotate in manual packages to ensure they are skipped in the generic loop.
manual=["cron", "logrotate"],
)
monkeypatch.setattr(h, "get_backend", lambda info=None: backend)
# Include a service that would collide with cron role naming.
monkeypatch.setattr(
h, "list_enabled_services", lambda: ["cron.service", "foo.service"]
)
monkeypatch.setattr(h, "list_enabled_timers", lambda: [])
monkeypatch.setattr(
h,
"get_unit_info",
lambda unit: UnitInfo(
name=unit,
fragment_path=None,
dropin_paths=[],
env_files=[],
exec_paths=[],
active_state="active",
sub_state="running",
unit_file_state="enabled",
condition_result=None,
),
)
monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
monkeypatch.setattr(
h,
"stat_triplet",
lambda p: ("alice" if "alice" in p else "root", "root", "0644"),
)
# Avoid needing real source files by implementing our own bundle copier.
def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str):
dst = Path(bundle_dir) / "artifacts" / role_name / src_rel
dst.parent.mkdir(parents=True, exist_ok=True)
dst.write_bytes(files.get(abs_path, b""))
monkeypatch.setattr(h, "_copy_into_bundle", fake_copy)
state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
st = json.loads(Path(state_path).read_text(encoding="utf-8"))
# cron.service must be skipped to avoid colliding with the dedicated "cron" package role.
svc_units = [s["unit"] for s in st["roles"]["services"]]
assert "cron.service" not in svc_units
assert "foo.service" in svc_units
pkgs = st["roles"]["packages"]
cron = next(p for p in pkgs if p["role_name"] == "cron")
logrotate = next(p for p in pkgs if p["role_name"] == "logrotate")
cron_paths = {mf["path"] for mf in cron["managed_files"]}
assert "/etc/crontab" in cron_paths
assert "/etc/cron.d/php" in cron_paths
# user crontab captured
assert "/var/spool/cron/crontabs/alice" in cron_paths
lr_paths = {mf["path"] for mf in logrotate["managed_files"]}
assert "/etc/logrotate.conf" in lr_paths
assert "/etc/logrotate.d/rsyslog" in lr_paths

View file

@ -1,170 +0,0 @@
from __future__ import annotations
import os
from pathlib import Path
import enroll.harvest as h
def test_iter_matching_files_skips_symlinks_and_walks_dirs(monkeypatch, tmp_path: Path):
# Layout:
# root/real.txt (file)
# root/sub/nested.txt
# root/link -> ... (ignored)
root = tmp_path / "root"
(root / "sub").mkdir(parents=True)
(root / "real.txt").write_text("a", encoding="utf-8")
(root / "sub" / "nested.txt").write_text("b", encoding="utf-8")
paths = {
str(root): "dir",
str(root / "real.txt"): "file",
str(root / "sub"): "dir",
str(root / "sub" / "nested.txt"): "file",
str(root / "link"): "link",
}
monkeypatch.setattr(h.glob, "glob", lambda spec: [str(root), str(root / "link")])
monkeypatch.setattr(h.os.path, "islink", lambda p: paths.get(p) == "link")
monkeypatch.setattr(h.os.path, "isfile", lambda p: paths.get(p) == "file")
monkeypatch.setattr(h.os.path, "isdir", lambda p: paths.get(p) == "dir")
monkeypatch.setattr(
h.os,
"walk",
lambda p: [
(str(root), ["sub"], ["real.txt", "link"]),
(str(root / "sub"), [], ["nested.txt"]),
],
)
out = h._iter_matching_files("/whatever/*", cap=100)
assert str(root / "real.txt") in out
assert str(root / "sub" / "nested.txt") in out
assert str(root / "link") not in out
def test_parse_apt_signed_by_extracts_keyrings(tmp_path: Path):
f1 = tmp_path / "a.list"
f1.write_text(
"deb [signed-by=/usr/share/keyrings/foo.gpg] https://example.invalid stable main\n",
encoding="utf-8",
)
f2 = tmp_path / "b.sources"
f2.write_text(
"Types: deb\nSigned-By: /etc/apt/keyrings/bar.gpg, /usr/share/keyrings/baz.gpg\n",
encoding="utf-8",
)
f3 = tmp_path / "c.sources"
f3.write_text("Signed-By: | /bin/echo nope\n", encoding="utf-8")
out = h._parse_apt_signed_by([str(f1), str(f2), str(f3)])
assert "/usr/share/keyrings/foo.gpg" in out
assert "/etc/apt/keyrings/bar.gpg" in out
assert "/usr/share/keyrings/baz.gpg" in out
def test_iter_apt_capture_paths_includes_signed_by_keyring(monkeypatch):
# Simulate:
# /etc/apt/apt.conf.d/00test
# /etc/apt/sources.list.d/test.list (signed-by outside /etc/apt)
# /usr/share/keyrings/ext.gpg
files = {
"/etc/apt/apt.conf.d/00test": "file",
"/etc/apt/sources.list.d/test.list": "file",
"/usr/share/keyrings/ext.gpg": "file",
}
monkeypatch.setattr(h.os.path, "isdir", lambda p: p in {"/etc/apt"})
monkeypatch.setattr(
h.os,
"walk",
lambda root: [
("/etc/apt", ["apt.conf.d", "sources.list.d"], []),
("/etc/apt/apt.conf.d", [], ["00test"]),
("/etc/apt/sources.list.d", [], ["test.list"]),
],
)
monkeypatch.setattr(h.os.path, "islink", lambda p: False)
monkeypatch.setattr(h.os.path, "isfile", lambda p: files.get(p) == "file")
# Only treat the sources glob as having a hit.
def fake_iter_matching(spec: str, cap: int = 10000):
if spec == "/etc/apt/sources.list.d/*.list":
return ["/etc/apt/sources.list.d/test.list"]
return []
monkeypatch.setattr(h, "_iter_matching_files", fake_iter_matching)
# Provide file contents for the sources file.
real_open = open
def fake_open(path, *a, **k):
if path == "/etc/apt/sources.list.d/test.list":
return real_open(os.devnull, "r", encoding="utf-8") # placeholder
return real_open(path, *a, **k)
# Easier: patch _parse_apt_signed_by directly to avoid filesystem reads.
monkeypatch.setattr(
h, "_parse_apt_signed_by", lambda sfs: {"/usr/share/keyrings/ext.gpg"}
)
out = h._iter_apt_capture_paths()
paths = {p for p, _r in out}
reasons = {p: r for p, r in out}
assert "/etc/apt/apt.conf.d/00test" in paths
assert "/etc/apt/sources.list.d/test.list" in paths
assert "/usr/share/keyrings/ext.gpg" in paths
assert reasons["/usr/share/keyrings/ext.gpg"] == "apt_signed_by_keyring"
def test_iter_dnf_capture_paths(monkeypatch):
files = {
"/etc/dnf/dnf.conf": "file",
"/etc/yum/yum.conf": "file",
"/etc/yum.conf": "file",
"/etc/yum.repos.d/test.repo": "file",
"/etc/pki/rpm-gpg/RPM-GPG-KEY": "file",
}
def isdir(p):
return p in {"/etc/dnf", "/etc/yum", "/etc/yum.repos.d", "/etc/pki/rpm-gpg"}
def walk(root):
if root == "/etc/dnf":
return [("/etc/dnf", [], ["dnf.conf"])]
if root == "/etc/yum":
return [("/etc/yum", [], ["yum.conf"])]
if root == "/etc/pki/rpm-gpg":
return [("/etc/pki/rpm-gpg", [], ["RPM-GPG-KEY"])]
return []
monkeypatch.setattr(h.os.path, "isdir", isdir)
monkeypatch.setattr(h.os, "walk", walk)
monkeypatch.setattr(h.os.path, "islink", lambda p: False)
monkeypatch.setattr(h.os.path, "isfile", lambda p: files.get(p) == "file")
monkeypatch.setattr(
h,
"_iter_matching_files",
lambda spec, cap=10000: (
["/etc/yum.repos.d/test.repo"] if spec.endswith("*.repo") else []
),
)
out = h._iter_dnf_capture_paths()
paths = {p for p, _r in out}
assert "/etc/dnf/dnf.conf" in paths
assert "/etc/yum/yum.conf" in paths
assert "/etc/yum.conf" in paths
assert "/etc/yum.repos.d/test.repo" in paths
assert "/etc/pki/rpm-gpg/RPM-GPG-KEY" in paths
def test_iter_system_capture_paths_dedupes_first_reason(monkeypatch):
monkeypatch.setattr(h, "_SYSTEM_CAPTURE_GLOBS", [("/a", "r1"), ("/b", "r2")])
monkeypatch.setattr(
h,
"_iter_matching_files",
lambda spec, cap=10000: ["/dup"] if spec in {"/a", "/b"} else [],
)
out = h._iter_system_capture_paths()
assert out == [("/dup", "r1")]

View file

@ -1,12 +1,7 @@
import json
from pathlib import Path
import os
import stat
import tarfile
import pytest
import enroll.manifest as manifest
from enroll.manifest import manifest
def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
@ -181,7 +176,7 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin" / "myscript"
).write_text("#!/bin/sh\necho hi\n", encoding="utf-8")
manifest.manifest(str(bundle), str(out))
manifest(str(bundle), str(out))
# Service role: systemd management should be gated on foo_manage_unit and a probe.
tasks = (out / "roles" / "foo" / "tasks" / "main.yml").read_text(encoding="utf-8")
@ -350,7 +345,7 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path)
/ "myapp.conf"
).write_text("myapp=1\n", encoding="utf-8")
manifest.manifest(str(bundle), str(out), fqdn=fqdn)
manifest(str(bundle), str(out), fqdn=fqdn)
# Host playbook exists.
assert (out / "playbooks" / f"{fqdn}.yml").exists()
@ -487,7 +482,7 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path):
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
manifest(str(bundle), str(out))
pb = (out / "playbook.yml").read_text(encoding="utf-8")
assert "- dnf_config" in pb
@ -507,291 +502,3 @@ def test_render_install_packages_tasks_contains_dnf_branch():
assert "ansible.builtin.dnf" in txt
assert "ansible.builtin.package" in txt
assert "pkg_mgr" in txt
def test_manifest_orders_cron_and_logrotate_at_playbook_tail(tmp_path: Path):
"""Cron/logrotate roles should appear at the end.
The cron role may restore per-user crontabs under /var/spool, so it should
run after users have been created.
"""
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
state = {
"schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {"packages": {}},
"roles": {
"users": {
"role_name": "users",
"users": [{"name": "alice"}],
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [],
"packages": [
{
"package": "curl",
"role_name": "curl",
"managed_files": [],
"excluded": [],
"notes": [],
},
{
"package": "cron",
"role_name": "cron",
"managed_files": [
{
"path": "/var/spool/cron/crontabs/alice",
"src_rel": "var/spool/cron/crontabs/alice",
"owner": "alice",
"group": "root",
"mode": "0600",
"reason": "system_cron",
}
],
"excluded": [],
"notes": [],
},
{
"package": "logrotate",
"role_name": "logrotate",
"managed_files": [
{
"path": "/etc/logrotate.conf",
"src_rel": "etc/logrotate.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "system_logrotate",
}
],
"excluded": [],
"notes": [],
},
],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
}
# Minimal artifacts for managed files.
(bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs").mkdir(
parents=True, exist_ok=True
)
(
bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs" / "alice"
).write_text("@daily echo hi\n", encoding="utf-8")
(bundle / "artifacts" / "logrotate" / "etc").mkdir(parents=True, exist_ok=True)
(bundle / "artifacts" / "logrotate" / "etc" / "logrotate.conf").write_text(
"weekly\n", encoding="utf-8"
)
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
pb = (out / "playbook.yml").read_text(encoding="utf-8").splitlines()
# Roles are emitted as indented list items under the `roles:` key.
roles = [
ln.strip().removeprefix("- ").strip() for ln in pb if ln.startswith(" - ")
]
# Ensure tail ordering.
assert roles[-2:] == ["cron", "logrotate"]
assert "users" in roles
assert roles.index("users") < roles.index("cron")
def test_yaml_helpers_fallback_when_yaml_unavailable(monkeypatch):
monkeypatch.setattr(manifest, "_try_yaml", lambda: None)
assert manifest._yaml_load_mapping("foo: 1\n") == {}
out = manifest._yaml_dump_mapping({"b": 2, "a": 1})
# Best-effort fallback is key: repr(value)
assert out.splitlines()[0].startswith("a: ")
assert out.endswith("\n")
def test_copy2_replace_makes_readonly_sources_user_writable(
monkeypatch, tmp_path: Path
):
src = tmp_path / "src.txt"
dst = tmp_path / "dst.txt"
src.write_text("hello", encoding="utf-8")
# Make source read-only; copy2 preserves mode, so tmp will be read-only too.
os.chmod(src, 0o444)
manifest._copy2_replace(str(src), str(dst))
st = os.stat(dst, follow_symlinks=False)
assert stat.S_IMODE(st.st_mode) & stat.S_IWUSR
def test_prepare_bundle_dir_sops_decrypts_and_extracts(monkeypatch, tmp_path: Path):
enc = tmp_path / "harvest.tar.gz.sops"
enc.write_text("ignored", encoding="utf-8")
def fake_require():
return None
def fake_decrypt(src: str, dst: str, *, mode: int = 0o600):
# Create a minimal tar.gz with a state.json file.
with tarfile.open(dst, "w:gz") as tf:
p = tmp_path / "state.json"
p.write_text("{}", encoding="utf-8")
tf.add(p, arcname="state.json")
monkeypatch.setattr(manifest, "require_sops_cmd", fake_require)
monkeypatch.setattr(manifest, "decrypt_file_binary_to", fake_decrypt)
bundle_dir, td = manifest._prepare_bundle_dir(str(enc), sops_mode=True)
try:
assert (Path(bundle_dir) / "state.json").exists()
finally:
td.cleanup()
def test_prepare_bundle_dir_rejects_non_dir_without_sops(tmp_path: Path):
fp = tmp_path / "bundle.tar.gz"
fp.write_text("x", encoding="utf-8")
with pytest.raises(RuntimeError):
manifest._prepare_bundle_dir(str(fp), sops_mode=False)
def test_tar_dir_to_with_progress_writes_progress_when_tty(monkeypatch, tmp_path: Path):
src = tmp_path / "dir"
src.mkdir()
(src / "a.txt").write_text("a", encoding="utf-8")
(src / "b.txt").write_text("b", encoding="utf-8")
out = tmp_path / "out.tar.gz"
writes: list[bytes] = []
monkeypatch.setattr(manifest.os, "isatty", lambda fd: True)
monkeypatch.setattr(manifest.os, "write", lambda fd, b: writes.append(b) or len(b))
manifest._tar_dir_to_with_progress(str(src), str(out), desc="tarring")
assert out.exists()
assert writes # progress was written
assert writes[-1].endswith(b"\n")
def test_encrypt_manifest_out_dir_to_sops_handles_missing_tmp_cleanup(
monkeypatch, tmp_path: Path
):
src_dir = tmp_path / "manifest"
src_dir.mkdir()
(src_dir / "x.txt").write_text("x", encoding="utf-8")
out = tmp_path / "manifest.tar.gz.sops"
monkeypatch.setattr(manifest, "require_sops_cmd", lambda: None)
def fake_encrypt(in_fp, out_fp, *args, **kwargs):
Path(out_fp).write_text("enc", encoding="utf-8")
monkeypatch.setattr(manifest, "encrypt_file_binary", fake_encrypt)
# Simulate race where tmp tar is already removed.
monkeypatch.setattr(
manifest.os, "unlink", lambda p: (_ for _ in ()).throw(FileNotFoundError())
)
res = manifest._encrypt_manifest_out_dir_to_sops(str(src_dir), str(out), ["ABC"]) # type: ignore[arg-type]
assert str(res).endswith(".sops")
assert out.exists()
def test_manifest_applies_jinjaturtle_to_jinjifyable_managed_file(
monkeypatch, tmp_path: Path
):
# Create a minimal bundle with just an apt_config snapshot.
bundle = tmp_path / "bundle"
(bundle / "artifacts" / "apt_config" / "etc" / "apt").mkdir(parents=True)
(bundle / "artifacts" / "apt_config" / "etc" / "apt" / "foo.ini").write_text(
"key=VALUE\n", encoding="utf-8"
)
state = {
"schema_version": 1,
"inventory": {"packages": {}},
"roles": {
"services": [],
"packages": [],
"apt_config": {
"role_name": "apt_config",
"managed_files": [
{
"path": "/etc/apt/foo.ini",
"src_rel": "etc/apt/foo.ini",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "apt_config",
}
],
"managed_dirs": [],
"excluded": [],
"notes": [],
},
},
}
(bundle / "state.json").write_text(
__import__("json").dumps(state), encoding="utf-8"
)
monkeypatch.setattr(manifest, "find_jinjaturtle_cmd", lambda: "jinjaturtle")
class _Res:
template_text = "key={{ foo }}\n"
vars_text = "foo: 123\n"
monkeypatch.setattr(manifest, "run_jinjaturtle", lambda *a, **k: _Res())
out_dir = tmp_path / "out"
manifest.manifest(str(bundle), str(out_dir), jinjaturtle="on")
tmpl = out_dir / "roles" / "apt_config" / "templates" / "etc" / "apt" / "foo.ini.j2"
assert tmpl.exists()
assert "{{ foo }}" in tmpl.read_text(encoding="utf-8")
defaults = out_dir / "roles" / "apt_config" / "defaults" / "main.yml"
txt = defaults.read_text(encoding="utf-8")
assert "foo: 123" in txt
# Non-templated file should not exist under files/.
assert not (
out_dir / "roles" / "apt_config" / "files" / "etc" / "apt" / "foo.ini"
).exists()

View file

@ -1,13 +1,5 @@
from __future__ import annotations
import json
import os
import stat
import subprocess
import sys
import types
from pathlib import Path
from types import SimpleNamespace
import pytest
@ -102,315 +94,3 @@ def test_sops_pgp_arg_and_encrypt_decrypt_roundtrip(tmp_path: Path, monkeypatch)
# Sanity: we invoked encrypt and decrypt.
assert any("--encrypt" in c for c in calls)
assert any("--decrypt" in c for c in calls)
def test_cache_dir_defaults_to_home_cache(monkeypatch, tmp_path: Path):
# Ensure default path uses ~/.cache when XDG_CACHE_HOME is unset.
from enroll.cache import enroll_cache_dir
monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
p = enroll_cache_dir()
assert str(p).startswith(str(tmp_path))
assert p.name == "enroll"
def test_harvest_cache_state_json_property(tmp_path: Path):
from enroll.cache import HarvestCache
hc = HarvestCache(tmp_path / "h1")
assert hc.state_json == hc.dir / "state.json"
def test_cache_dir_security_rejects_symlink(tmp_path: Path):
from enroll.cache import _ensure_dir_secure
real = tmp_path / "real"
real.mkdir()
link = tmp_path / "link"
link.symlink_to(real, target_is_directory=True)
with pytest.raises(RuntimeError, match="Refusing to use symlink"):
_ensure_dir_secure(link)
def test_cache_dir_chmod_failures_are_ignored(monkeypatch, tmp_path: Path):
from enroll import cache
# Make the cache base path deterministic and writable.
monkeypatch.setattr(cache, "enroll_cache_dir", lambda: tmp_path)
# Force os.chmod to fail to cover the "except OSError: pass" paths.
monkeypatch.setattr(
os, "chmod", lambda *a, **k: (_ for _ in ()).throw(OSError("nope"))
)
hc = cache.new_harvest_cache_dir()
assert hc.dir.exists()
assert hc.dir.is_dir()
def test_stat_triplet_falls_back_to_numeric_ids(monkeypatch, tmp_path: Path):
from enroll.fsutil import stat_triplet
import pwd
import grp
p = tmp_path / "x"
p.write_text("x", encoding="utf-8")
# Force username/group resolution failures.
monkeypatch.setattr(
pwd, "getpwuid", lambda _uid: (_ for _ in ()).throw(KeyError("no user"))
)
monkeypatch.setattr(
grp, "getgrgid", lambda _gid: (_ for _ in ()).throw(KeyError("no group"))
)
owner, group, mode = stat_triplet(str(p))
assert owner.isdigit()
assert group.isdigit()
assert len(mode) == 4
def test_ignore_policy_iter_effective_lines_removes_block_comments():
from enroll.ignore import IgnorePolicy
pol = IgnorePolicy()
data = b"""keep1
/*
drop me
*/
keep2
"""
assert list(pol.iter_effective_lines(data)) == [b"keep1", b"keep2"]
def test_ignore_policy_deny_reason_dir_variants(tmp_path: Path):
from enroll.ignore import IgnorePolicy
pol = IgnorePolicy()
# denied by glob
assert pol.deny_reason_dir("/etc/shadow") == "denied_path"
# symlink rejected
d = tmp_path / "d"
d.mkdir()
link = tmp_path / "l"
link.symlink_to(d, target_is_directory=True)
assert pol.deny_reason_dir(str(link)) == "symlink"
# not a directory
f = tmp_path / "f"
f.write_text("x", encoding="utf-8")
assert pol.deny_reason_dir(str(f)) == "not_directory"
# ok
assert pol.deny_reason_dir(str(d)) is None
def test_run_jinjaturtle_parses_outputs(monkeypatch, tmp_path: Path):
# Fully unit-test enroll.jinjaturtle.run_jinjaturtle by stubbing subprocess.run.
from enroll.jinjaturtle import run_jinjaturtle
def fake_run(cmd, **kwargs): # noqa: ARG001
# cmd includes "-d <defaults> -t <template>"
d_idx = cmd.index("-d") + 1
t_idx = cmd.index("-t") + 1
defaults = Path(cmd[d_idx])
template = Path(cmd[t_idx])
defaults.write_text("---\nfoo: 1\n", encoding="utf-8")
template.write_text("value={{ foo }}\n", encoding="utf-8")
return SimpleNamespace(returncode=0, stdout="ok", stderr="")
monkeypatch.setattr(subprocess, "run", fake_run)
src = tmp_path / "src.ini"
src.write_text("foo=1\n", encoding="utf-8")
res = run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
assert "foo: 1" in res.vars_text
assert "value=" in res.template_text
def test_run_jinjaturtle_raises_on_failure(monkeypatch, tmp_path: Path):
from enroll.jinjaturtle import run_jinjaturtle
def fake_run(cmd, **kwargs): # noqa: ARG001
return SimpleNamespace(returncode=2, stdout="out", stderr="bad")
monkeypatch.setattr(subprocess, "run", fake_run)
src = tmp_path / "src.ini"
src.write_text("x", encoding="utf-8")
with pytest.raises(RuntimeError, match="jinjaturtle failed"):
run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
def test_require_sops_cmd_errors_when_missing(monkeypatch):
from enroll.sopsutil import require_sops_cmd, SopsError
monkeypatch.setattr("enroll.sopsutil.shutil.which", lambda _: None)
with pytest.raises(SopsError, match="not found on PATH"):
require_sops_cmd()
def test_get_enroll_version_reports_unknown_on_metadata_failure(monkeypatch):
import enroll.version as v
fake_meta = types.ModuleType("importlib.metadata")
def boom():
raise RuntimeError("boom")
fake_meta.packages_distributions = boom
fake_meta.version = lambda _dist: boom()
monkeypatch.setitem(sys.modules, "importlib.metadata", fake_meta)
assert v.get_enroll_version() == "unknown"
def test_get_enroll_version_returns_unknown_if_importlib_metadata_unavailable(
monkeypatch,
):
import builtins
import enroll.version as v
real_import = builtins.__import__
def fake_import(
name, globals=None, locals=None, fromlist=(), level=0
): # noqa: A002
if name == "importlib.metadata":
raise ImportError("no metadata")
return real_import(name, globals, locals, fromlist, level)
monkeypatch.setattr(builtins, "__import__", fake_import)
assert v.get_enroll_version() == "unknown"
def test_compare_harvests_and_format_report(tmp_path: Path):
from enroll.diff import compare_harvests, format_report
old = tmp_path / "old"
new = tmp_path / "new"
(old / "artifacts").mkdir(parents=True)
(new / "artifacts").mkdir(parents=True)
def write_state(base: Path, state: dict) -> None:
base.mkdir(parents=True, exist_ok=True)
(base / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
# Old bundle: pkg a@1.0, pkg b@1.0, one service, one user, one managed file.
old_state = {
"schema_version": 3,
"host": {"hostname": "h1"},
"inventory": {"packages": {"a": {"version": "1.0"}, "b": {"version": "1.0"}}},
"roles": {
"services": [
{
"unit": "svc.service",
"role_name": "svc",
"packages": ["a"],
"active_state": "inactive",
"sub_state": "dead",
"unit_file_state": "enabled",
"condition_result": None,
"managed_files": [
{
"path": "/etc/foo.conf",
"src_rel": "etc/foo.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "modified_conffile",
}
],
}
],
"packages": [],
"users": {
"role_name": "users",
"users": [{"name": "alice", "shell": "/bin/sh"}],
},
"apt_config": {"role_name": "apt_config", "managed_files": []},
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
"extra_paths": {"role_name": "extra_paths", "managed_files": []},
},
}
(old / "artifacts" / "svc" / "etc").mkdir(parents=True, exist_ok=True)
(old / "artifacts" / "svc" / "etc" / "foo.conf").write_text("old", encoding="utf-8")
write_state(old, old_state)
# New bundle: pkg a@2.0, pkg c@1.0, service changed, user changed, file moved role+content.
new_state = {
"schema_version": 3,
"host": {"hostname": "h2"},
"inventory": {"packages": {"a": {"version": "2.0"}, "c": {"version": "1.0"}}},
"roles": {
"services": [
{
"unit": "svc.service",
"role_name": "svc",
"packages": ["a", "c"],
"active_state": "active",
"sub_state": "running",
"unit_file_state": "enabled",
"condition_result": None,
"managed_files": [],
}
],
"packages": [],
"users": {
"role_name": "users",
"users": [{"name": "alice", "shell": "/bin/bash"}, {"name": "bob"}],
},
"apt_config": {"role_name": "apt_config", "managed_files": []},
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
"extra_paths": {
"role_name": "extra_paths",
"managed_files": [
{
"path": "/etc/foo.conf",
"src_rel": "etc/foo.conf",
"owner": "root",
"group": "root",
"mode": "0600",
"reason": "user_include",
},
{
"path": "/etc/added.conf",
"src_rel": "etc/added.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "user_include",
},
],
},
},
}
(new / "artifacts" / "extra_paths" / "etc").mkdir(parents=True, exist_ok=True)
(new / "artifacts" / "extra_paths" / "etc" / "foo.conf").write_text(
"new", encoding="utf-8"
)
(new / "artifacts" / "extra_paths" / "etc" / "added.conf").write_text(
"x", encoding="utf-8"
)
write_state(new, new_state)
report, changed = compare_harvests(str(old), str(new))
assert changed is True
txt = format_report(report, fmt="text")
assert "Packages" in txt
md = format_report(report, fmt="markdown")
assert "# enroll diff report" in md
js = format_report(report, fmt="json")
parsed = json.loads(js)
assert parsed["packages"]["added"] == ["c"]

323
tests/test_more_coverage.py Normal file
View file

@ -0,0 +1,323 @@
from __future__ import annotations
import json
import os
import subprocess
import sys
import types
from pathlib import Path
from types import SimpleNamespace
import pytest
def test_cache_dir_defaults_to_home_cache(monkeypatch, tmp_path: Path):
# Ensure default path uses ~/.cache when XDG_CACHE_HOME is unset.
from enroll.cache import enroll_cache_dir
monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
p = enroll_cache_dir()
assert str(p).startswith(str(tmp_path))
assert p.name == "enroll"
def test_harvest_cache_state_json_property(tmp_path: Path):
from enroll.cache import HarvestCache
hc = HarvestCache(tmp_path / "h1")
assert hc.state_json == hc.dir / "state.json"
def test_cache_dir_security_rejects_symlink(tmp_path: Path):
from enroll.cache import _ensure_dir_secure
real = tmp_path / "real"
real.mkdir()
link = tmp_path / "link"
link.symlink_to(real, target_is_directory=True)
with pytest.raises(RuntimeError, match="Refusing to use symlink"):
_ensure_dir_secure(link)
def test_cache_dir_chmod_failures_are_ignored(monkeypatch, tmp_path: Path):
from enroll import cache
# Make the cache base path deterministic and writable.
monkeypatch.setattr(cache, "enroll_cache_dir", lambda: tmp_path)
# Force os.chmod to fail to cover the "except OSError: pass" paths.
monkeypatch.setattr(
os, "chmod", lambda *a, **k: (_ for _ in ()).throw(OSError("nope"))
)
hc = cache.new_harvest_cache_dir()
assert hc.dir.exists()
assert hc.dir.is_dir()
def test_stat_triplet_falls_back_to_numeric_ids(monkeypatch, tmp_path: Path):
from enroll.fsutil import stat_triplet
import pwd
import grp
p = tmp_path / "x"
p.write_text("x", encoding="utf-8")
# Force username/group resolution failures.
monkeypatch.setattr(
pwd, "getpwuid", lambda _uid: (_ for _ in ()).throw(KeyError("no user"))
)
monkeypatch.setattr(
grp, "getgrgid", lambda _gid: (_ for _ in ()).throw(KeyError("no group"))
)
owner, group, mode = stat_triplet(str(p))
assert owner.isdigit()
assert group.isdigit()
assert len(mode) == 4
def test_ignore_policy_iter_effective_lines_removes_block_comments():
from enroll.ignore import IgnorePolicy
pol = IgnorePolicy()
data = b"""keep1
/*
drop me
*/
keep2
"""
assert list(pol.iter_effective_lines(data)) == [b"keep1", b"keep2"]
def test_ignore_policy_deny_reason_dir_variants(tmp_path: Path):
from enroll.ignore import IgnorePolicy
pol = IgnorePolicy()
# denied by glob
assert pol.deny_reason_dir("/etc/shadow") == "denied_path"
# symlink rejected
d = tmp_path / "d"
d.mkdir()
link = tmp_path / "l"
link.symlink_to(d, target_is_directory=True)
assert pol.deny_reason_dir(str(link)) == "symlink"
# not a directory
f = tmp_path / "f"
f.write_text("x", encoding="utf-8")
assert pol.deny_reason_dir(str(f)) == "not_directory"
# ok
assert pol.deny_reason_dir(str(d)) is None
def test_run_jinjaturtle_parses_outputs(monkeypatch, tmp_path: Path):
# Fully unit-test enroll.jinjaturtle.run_jinjaturtle by stubbing subprocess.run.
from enroll.jinjaturtle import run_jinjaturtle
def fake_run(cmd, **kwargs): # noqa: ARG001
# cmd includes "-d <defaults> -t <template>"
d_idx = cmd.index("-d") + 1
t_idx = cmd.index("-t") + 1
defaults = Path(cmd[d_idx])
template = Path(cmd[t_idx])
defaults.write_text("---\nfoo: 1\n", encoding="utf-8")
template.write_text("value={{ foo }}\n", encoding="utf-8")
return SimpleNamespace(returncode=0, stdout="ok", stderr="")
monkeypatch.setattr(subprocess, "run", fake_run)
src = tmp_path / "src.ini"
src.write_text("foo=1\n", encoding="utf-8")
res = run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
assert "foo: 1" in res.vars_text
assert "value=" in res.template_text
def test_run_jinjaturtle_raises_on_failure(monkeypatch, tmp_path: Path):
from enroll.jinjaturtle import run_jinjaturtle
def fake_run(cmd, **kwargs): # noqa: ARG001
return SimpleNamespace(returncode=2, stdout="out", stderr="bad")
monkeypatch.setattr(subprocess, "run", fake_run)
src = tmp_path / "src.ini"
src.write_text("x", encoding="utf-8")
with pytest.raises(RuntimeError, match="jinjaturtle failed"):
run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
def test_require_sops_cmd_errors_when_missing(monkeypatch):
from enroll.sopsutil import require_sops_cmd, SopsError
monkeypatch.setattr("enroll.sopsutil.shutil.which", lambda _: None)
with pytest.raises(SopsError, match="not found on PATH"):
require_sops_cmd()
def test_get_enroll_version_reports_unknown_on_metadata_failure(monkeypatch):
import enroll.version as v
fake_meta = types.ModuleType("importlib.metadata")
def boom():
raise RuntimeError("boom")
fake_meta.packages_distributions = boom
fake_meta.version = lambda _dist: boom()
monkeypatch.setitem(sys.modules, "importlib.metadata", fake_meta)
assert v.get_enroll_version() == "unknown"
def test_get_enroll_version_returns_unknown_if_importlib_metadata_unavailable(
monkeypatch,
):
import builtins
import enroll.version as v
real_import = builtins.__import__
def fake_import(
name, globals=None, locals=None, fromlist=(), level=0
): # noqa: A002
if name == "importlib.metadata":
raise ImportError("no metadata")
return real_import(name, globals, locals, fromlist, level)
monkeypatch.setattr(builtins, "__import__", fake_import)
assert v.get_enroll_version() == "unknown"
def test_compare_harvests_and_format_report(tmp_path: Path):
from enroll.diff import compare_harvests, format_report
old = tmp_path / "old"
new = tmp_path / "new"
(old / "artifacts").mkdir(parents=True)
(new / "artifacts").mkdir(parents=True)
def write_state(base: Path, state: dict) -> None:
base.mkdir(parents=True, exist_ok=True)
(base / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
# Old bundle: pkg a@1.0, pkg b@1.0, one service, one user, one managed file.
old_state = {
"schema_version": 3,
"host": {"hostname": "h1"},
"inventory": {"packages": {"a": {"version": "1.0"}, "b": {"version": "1.0"}}},
"roles": {
"services": [
{
"unit": "svc.service",
"role_name": "svc",
"packages": ["a"],
"active_state": "inactive",
"sub_state": "dead",
"unit_file_state": "enabled",
"condition_result": None,
"managed_files": [
{
"path": "/etc/foo.conf",
"src_rel": "etc/foo.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "modified_conffile",
}
],
}
],
"packages": [],
"users": {
"role_name": "users",
"users": [{"name": "alice", "shell": "/bin/sh"}],
},
"apt_config": {"role_name": "apt_config", "managed_files": []},
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
"extra_paths": {"role_name": "extra_paths", "managed_files": []},
},
}
(old / "artifacts" / "svc" / "etc").mkdir(parents=True, exist_ok=True)
(old / "artifacts" / "svc" / "etc" / "foo.conf").write_text("old", encoding="utf-8")
write_state(old, old_state)
# New bundle: pkg a@2.0, pkg c@1.0, service changed, user changed, file moved role+content.
new_state = {
"schema_version": 3,
"host": {"hostname": "h2"},
"inventory": {"packages": {"a": {"version": "2.0"}, "c": {"version": "1.0"}}},
"roles": {
"services": [
{
"unit": "svc.service",
"role_name": "svc",
"packages": ["a", "c"],
"active_state": "active",
"sub_state": "running",
"unit_file_state": "enabled",
"condition_result": None,
"managed_files": [],
}
],
"packages": [],
"users": {
"role_name": "users",
"users": [{"name": "alice", "shell": "/bin/bash"}, {"name": "bob"}],
},
"apt_config": {"role_name": "apt_config", "managed_files": []},
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
"extra_paths": {
"role_name": "extra_paths",
"managed_files": [
{
"path": "/etc/foo.conf",
"src_rel": "etc/foo.conf",
"owner": "root",
"group": "root",
"mode": "0600",
"reason": "user_include",
},
{
"path": "/etc/added.conf",
"src_rel": "etc/added.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "user_include",
},
],
},
},
}
(new / "artifacts" / "extra_paths" / "etc").mkdir(parents=True, exist_ok=True)
(new / "artifacts" / "extra_paths" / "etc" / "foo.conf").write_text(
"new", encoding="utf-8"
)
(new / "artifacts" / "extra_paths" / "etc" / "added.conf").write_text(
"x", encoding="utf-8"
)
write_state(new, new_state)
report, changed = compare_harvests(str(old), str(new))
assert changed is True
txt = format_report(report, fmt="text")
assert "Packages" in txt
md = format_report(report, fmt="markdown")
assert "# enroll diff report" in md
js = format_report(report, fmt="json")
parsed = json.loads(js)
assert parsed["packages"]["added"] == ["c"]

View file

@ -3,8 +3,6 @@ from __future__ import annotations
import os
from pathlib import Path
import enroll.pathfilter as pf
def test_compile_and_match_prefix_glob_and_regex(tmp_path: Path):
from enroll.pathfilter import PathFilter, compile_path_pattern
@ -80,107 +78,3 @@ def test_expand_includes_notes_on_no_matches(tmp_path: Path):
paths, notes = expand_includes(pats, max_files=10)
assert paths == []
assert any("matched no files" in n.lower() for n in notes)
def test_expand_includes_supports_regex_with_inferred_root(tmp_path: Path):
"""Regex includes are expanded by walking an inferred literal prefix root."""
from enroll.pathfilter import compile_path_pattern, expand_includes
root = tmp_path / "root"
(root / "home" / "alice" / ".config" / "myapp").mkdir(parents=True)
target = root / "home" / "alice" / ".config" / "myapp" / "settings.ini"
target.write_text("x=1\n", encoding="utf-8")
# This is anchored and begins with an absolute path, so expand_includes should
# infer a narrow walk root instead of scanning '/'.
rex = rf"re:^{root}/home/[^/]+/\.config/myapp/.*$"
pat = compile_path_pattern(rex)
paths, notes = expand_includes([pat], max_files=10)
assert str(target) in paths
assert notes == []
def test_compile_path_pattern_normalises_relative_prefix():
from enroll.pathfilter import compile_path_pattern
p = compile_path_pattern("etc/ssh")
assert p.kind == "prefix"
assert p.value == "/etc/ssh"
def test_norm_abs_empty_string_is_root():
assert pf._norm_abs("") == "/"
def test_posix_match_invalid_pattern_fails_closed(monkeypatch):
# Force PurePosixPath.match to raise to cover the exception handler.
real_match = pf.PurePosixPath.match
def boom(self, pat):
raise ValueError("bad pattern")
monkeypatch.setattr(pf.PurePosixPath, "match", boom)
try:
assert pf._posix_match("/etc/hosts", "[bad") is False
finally:
monkeypatch.setattr(pf.PurePosixPath, "match", real_match)
def test_regex_literal_prefix_handles_escapes():
# Prefix stops at meta chars but includes escaped literals.
assert pf._regex_literal_prefix(r"^/etc/\./foo") == "/etc/./foo"
def test_expand_includes_maybe_add_file_skips_non_files(monkeypatch, tmp_path: Path):
# Drive the _maybe_add_file branch that rejects symlinks/non-files.
pats = [pf.compile_path_pattern(str(tmp_path / "missing"))]
monkeypatch.setattr(pf.os.path, "isfile", lambda p: False)
monkeypatch.setattr(pf.os.path, "islink", lambda p: False)
monkeypatch.setattr(pf.os.path, "isdir", lambda p: False)
paths, notes = pf.expand_includes(pats, max_files=10)
assert paths == []
assert any("matched no files" in n for n in notes)
def test_expand_includes_prunes_excluded_dirs(monkeypatch):
include = [pf.compile_path_pattern("/root/**")]
exclude = pf.PathFilter(exclude=["/root/skip/**"])
# Simulate filesystem walk:
# /root has dirnames ['skip', 'keep'] but skip should be pruned.
monkeypatch.setattr(
pf.os.path,
"isdir",
lambda p: p in {"/root", "/root/keep", "/root/skip"},
)
monkeypatch.setattr(pf.os.path, "islink", lambda p: False)
monkeypatch.setattr(pf.os.path, "isfile", lambda p: True)
def walk(root, followlinks=False):
assert root == "/root"
yield ("/root", ["skip", "keep"], [])
yield ("/root/keep", [], ["a.txt"])
# If pruning works, we should never walk into /root/skip.
monkeypatch.setattr(pf.os, "walk", walk)
paths, _notes = pf.expand_includes(include, exclude=exclude, max_files=10)
assert "/root/keep/a.txt" in paths
assert not any(p.startswith("/root/skip") for p in paths)
def test_expand_includes_respects_max_files(monkeypatch):
include = [pf.compile_path_pattern("/root/**")]
monkeypatch.setattr(pf.os.path, "isdir", lambda p: p == "/root")
monkeypatch.setattr(pf.os.path, "islink", lambda p: False)
monkeypatch.setattr(pf.os.path, "isfile", lambda p: True)
monkeypatch.setattr(
pf.os,
"walk",
lambda root, followlinks=False: [("/root", [], ["a", "b", "c"])],
)
paths, notes = pf.expand_includes(include, max_files=2)
assert len(paths) == 2
assert "/root/c" not in paths

View file

@ -129,50 +129,3 @@ def test_rpm_config_files_and_modified_files_parsing(monkeypatch):
rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (1, out)
)
assert rpm.rpm_modified_files("mypkg") == {"/etc/foo.conf", "/etc/bar"}
def test_list_manual_packages_uses_yum_fallback(monkeypatch):
# No dnf, yum present.
monkeypatch.setattr(
rpm.shutil, "which", lambda exe: "/usr/bin/yum" if exe == "yum" else None
)
def fake_run(cmd, allow_fail=False, merge_err=False):
assert cmd[:3] == ["yum", "-q", "history"]
return 0, "Installed Packages\nvim-enhanced.x86_64\nhtop\n"
monkeypatch.setattr(rpm, "_run", fake_run)
assert rpm.list_manual_packages() == ["htop", "vim-enhanced"]
def test_list_installed_packages_parses_epoch_and_sorts(monkeypatch):
out = (
"bash\t0\t5.2.26\t1.el9\tx86_64\n"
"bash\t1\t5.2.26\t1.el9\taarch64\n"
"coreutils\t(none)\t9.1\t2.el9\tx86_64\n"
)
monkeypatch.setattr(
rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (0, out)
)
pkgs = rpm.list_installed_packages()
assert pkgs["bash"][0]["arch"] == "aarch64" # sorted by arch then version
assert pkgs["bash"][0]["version"].startswith("1:")
assert pkgs["coreutils"][0]["version"] == "9.1-2.el9"
def test_rpm_config_files_returns_empty_on_failure(monkeypatch):
monkeypatch.setattr(
rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (1, "")
)
assert rpm.rpm_config_files("missing") == set()
def test_rpm_owner_strips_epoch_prefix_when_present(monkeypatch):
# Defensive: rpm output might include epoch-like token.
monkeypatch.setattr(
rpm,
"_run",
lambda cmd, allow_fail=False, merge_err=False: (0, "1:bash\n"),
)
assert rpm.rpm_owner("/bin/bash") == "bash"

View file

@ -1,31 +0,0 @@
from __future__ import annotations
import types
import pytest
import enroll.rpm as rpm
def test_run_raises_on_nonzero_returncode_when_not_allow_fail(monkeypatch):
def fake_run(cmd, check, text, stdout, stderr):
return types.SimpleNamespace(returncode=1, stdout="OUT", stderr="ERR")
monkeypatch.setattr(rpm.subprocess, "run", fake_run)
with pytest.raises(RuntimeError) as e:
rpm._run(["rpm", "-q"]) # type: ignore[attr-defined]
assert "Command failed" in str(e.value)
assert "ERR" in str(e.value)
assert "OUT" in str(e.value)
def test_run_merge_err_includes_stderr_in_stdout(monkeypatch):
def fake_run(cmd, check, text, stdout, stderr):
# When merge_err is True, stderr is redirected to STDOUT, so we only
# rely on stdout in our wrapper.
return types.SimpleNamespace(returncode=0, stdout="COMBINED", stderr=None)
monkeypatch.setattr(rpm.subprocess, "run", fake_run)
rc, out = rpm._run(["rpm", "-q"], merge_err=True)
assert rc == 0
assert out == "COMBINED"