Compare commits
No commits in common. "a1433d645f7b0964db05eb51d55c2de980ee19e5" and "24cedc8c8df6d8f00a4b48f3b6393077cb5b45ef" have entirely different histories.
a1433d645f
...
24cedc8c8d
15 changed files with 382 additions and 1760 deletions
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
* Introduce `enroll explain` - a tool to analyze and explain what's in (or not in) a harvest and why.
|
||||
* Centralise the cron and logrotate stuff into their respective roles, we had a bit of duplication between roles based on harvest discovery.
|
||||
* Capture other files in the user's home directory such as `.bashrc`, `.bash_aliases`, `.profile`, if these files differ from the `/etc/skel` defaults
|
||||
|
||||
# 0.2.3
|
||||
|
||||
|
|
|
|||
|
|
@ -914,6 +914,56 @@ def main() -> None:
|
|||
fqdn=args.fqdn,
|
||||
jinjaturtle=_jt_mode(args),
|
||||
)
|
||||
elif args.cmd == "diff":
|
||||
report, has_changes = compare_harvests(
|
||||
args.old, args.new, sops_mode=bool(getattr(args, "sops", False))
|
||||
)
|
||||
|
||||
rendered = format_report(report, fmt=str(args.format))
|
||||
if args.out:
|
||||
Path(args.out).expanduser().write_text(rendered, encoding="utf-8")
|
||||
else:
|
||||
print(rendered, end="")
|
||||
|
||||
do_notify = bool(has_changes or getattr(args, "notify_always", False))
|
||||
|
||||
if do_notify and getattr(args, "webhook", None):
|
||||
wf = str(getattr(args, "webhook_format", "json"))
|
||||
body = format_report(report, fmt=wf).encode("utf-8")
|
||||
headers = {"User-Agent": "enroll"}
|
||||
if wf == "json":
|
||||
headers["Content-Type"] = "application/json"
|
||||
else:
|
||||
headers["Content-Type"] = "text/plain; charset=utf-8"
|
||||
for hv in getattr(args, "webhook_header", []) or []:
|
||||
if ":" not in hv:
|
||||
raise SystemExit(
|
||||
"error: --webhook-header must be in the form 'K:V'"
|
||||
)
|
||||
k, v = hv.split(":", 1)
|
||||
headers[k.strip()] = v.strip()
|
||||
status, _ = post_webhook(str(args.webhook), body, headers=headers)
|
||||
if status and status >= 400:
|
||||
raise SystemExit(f"error: webhook returned HTTP {status}")
|
||||
|
||||
if do_notify and (getattr(args, "email_to", []) or []):
|
||||
subject = getattr(args, "email_subject", None) or "enroll diff report"
|
||||
smtp_password = None
|
||||
pw_env = getattr(args, "smtp_password_env", None)
|
||||
if pw_env:
|
||||
smtp_password = os.environ.get(str(pw_env))
|
||||
send_email(
|
||||
to_addrs=list(getattr(args, "email_to", []) or []),
|
||||
subject=str(subject),
|
||||
body=rendered,
|
||||
from_addr=getattr(args, "email_from", None),
|
||||
smtp=getattr(args, "smtp", None),
|
||||
smtp_user=getattr(args, "smtp_user", None),
|
||||
smtp_password=smtp_password,
|
||||
)
|
||||
|
||||
if getattr(args, "exit_code", False) and has_changes:
|
||||
raise SystemExit(2)
|
||||
except RemoteSudoPasswordRequired:
|
||||
raise SystemExit(
|
||||
"error: remote sudo requires a password. Re-run with --ask-become-pass."
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import json
|
|||
import os
|
||||
import re
|
||||
import shutil
|
||||
import stat
|
||||
import time
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from typing import Dict, List, Optional, Set
|
||||
|
|
@ -158,54 +157,6 @@ MAX_FILES_CAP = 4000
|
|||
MAX_UNOWNED_FILES_PER_ROLE = 500
|
||||
|
||||
|
||||
def _files_differ(a: str, b: str, *, max_bytes: int = 2_000_000) -> bool:
|
||||
"""Return True if file `a` differs from file `b`.
|
||||
|
||||
Best-effort and conservative:
|
||||
- If `b` (baseline) does not exist or is not a regular file, treat as
|
||||
"different" so we err on the side of capturing user state.
|
||||
- If we can't stat/read either file, treat as "different" (capture will
|
||||
later be filtered via IgnorePolicy).
|
||||
- If files are large, avoid reading them fully.
|
||||
"""
|
||||
|
||||
try:
|
||||
st_a = os.stat(a, follow_symlinks=True)
|
||||
except OSError:
|
||||
return True
|
||||
|
||||
# Refuse to do content comparisons on non-regular files.
|
||||
if not stat.S_ISREG(st_a.st_mode):
|
||||
return True
|
||||
|
||||
try:
|
||||
st_b = os.stat(b, follow_symlinks=True)
|
||||
except OSError:
|
||||
return True
|
||||
|
||||
if not stat.S_ISREG(st_b.st_mode):
|
||||
return True
|
||||
|
||||
if st_a.st_size != st_b.st_size:
|
||||
return True
|
||||
|
||||
# If it's unexpectedly big, treat as different to avoid expensive reads.
|
||||
if st_a.st_size > max_bytes:
|
||||
return True
|
||||
|
||||
try:
|
||||
with open(a, "rb") as fa, open(b, "rb") as fb:
|
||||
while True:
|
||||
ca = fa.read(1024 * 64)
|
||||
cb = fb.read(1024 * 64)
|
||||
if ca != cb:
|
||||
return True
|
||||
if not ca: # EOF on both
|
||||
return False
|
||||
except OSError:
|
||||
return True
|
||||
|
||||
|
||||
def _merge_parent_dirs(
|
||||
existing_dirs: List[ManagedDir],
|
||||
managed_files: List[ManagedFile],
|
||||
|
|
@ -1368,18 +1319,6 @@ def harvest(
|
|||
users_role_name = "users"
|
||||
users_role_seen = seen_by_role.setdefault(users_role_name, set())
|
||||
|
||||
skel_dir = "/etc/skel"
|
||||
# Dotfiles to harvest for non-system users. For the common "skeleton"
|
||||
# files, only capture if the user's copy differs from /etc/skel.
|
||||
skel_dotfiles = [
|
||||
(".bashrc", "user_shell_rc"),
|
||||
(".profile", "user_profile"),
|
||||
(".bash_logout", "user_shell_logout"),
|
||||
]
|
||||
extra_dotfiles = [
|
||||
(".bash_aliases", "user_shell_aliases"),
|
||||
]
|
||||
|
||||
for u in user_records:
|
||||
users_list.append(
|
||||
{
|
||||
|
|
@ -1414,48 +1353,6 @@ def harvest(
|
|||
seen_global=captured_global,
|
||||
)
|
||||
|
||||
# Capture common per-user shell dotfiles when they differ from /etc/skel.
|
||||
# These still go through IgnorePolicy and user path filters.
|
||||
home = (u.home or "").rstrip("/")
|
||||
if home and home.startswith("/"):
|
||||
for rel, reason in skel_dotfiles:
|
||||
upath = os.path.join(home, rel)
|
||||
if not os.path.exists(upath):
|
||||
continue
|
||||
skel_path = os.path.join(skel_dir, rel)
|
||||
if not _files_differ(upath, skel_path, max_bytes=policy.max_file_bytes):
|
||||
continue
|
||||
_capture_file(
|
||||
bundle_dir=bundle_dir,
|
||||
role_name=users_role_name,
|
||||
abs_path=upath,
|
||||
reason=reason,
|
||||
policy=policy,
|
||||
path_filter=path_filter,
|
||||
managed_out=users_managed,
|
||||
excluded_out=users_excluded,
|
||||
seen_role=users_role_seen,
|
||||
seen_global=captured_global,
|
||||
)
|
||||
|
||||
# Capture other common per-user shell files unconditionally if present.
|
||||
for rel, reason in extra_dotfiles:
|
||||
upath = os.path.join(home, rel)
|
||||
if not os.path.exists(upath):
|
||||
continue
|
||||
_capture_file(
|
||||
bundle_dir=bundle_dir,
|
||||
role_name=users_role_name,
|
||||
abs_path=upath,
|
||||
reason=reason,
|
||||
policy=policy,
|
||||
path_filter=path_filter,
|
||||
managed_out=users_managed,
|
||||
excluded_out=users_excluded,
|
||||
seen_role=users_role_seen,
|
||||
seen_global=captured_global,
|
||||
)
|
||||
|
||||
users_snapshot = UsersSnapshot(
|
||||
role_name=users_role_name,
|
||||
users=users_list,
|
||||
|
|
|
|||
|
|
@ -819,12 +819,7 @@ def _manifest_from_bundle_dir(
|
|||
group = str(u.get("primary_group") or owner)
|
||||
break
|
||||
|
||||
# Prefer the harvested file mode so we preserve any deliberate
|
||||
# permissions (e.g. 0600 for certain dotfiles). For authorized_keys,
|
||||
# enforce 0600 regardless.
|
||||
mode = mf.get("mode") or "0644"
|
||||
if mf.get("reason") == "authorized_keys":
|
||||
mode = "0600"
|
||||
mode = "0600" if mf.get("reason") == "authorized_keys" else "0644"
|
||||
ssh_files.append(
|
||||
{
|
||||
"dest": dest,
|
||||
|
|
|
|||
7
tests.sh
7
tests.sh
|
|
@ -27,10 +27,9 @@ poetry run \
|
|||
enroll harvest --out "${BUNDLE_DIR}2"
|
||||
poetry run \
|
||||
enroll diff \
|
||||
--old "${BUNDLE_DIR}" \
|
||||
--new "${BUNDLE_DIR}2" \
|
||||
--format json | jq
|
||||
DEBIAN_FRONTEND=noninteractive apt-get remove --purge cowsay
|
||||
--old "${BUNDLE_DIR}" \
|
||||
--new "${BUNDLE_DIR}2" \
|
||||
--format json | jq
|
||||
|
||||
# Ansible test
|
||||
builtin cd "${ANSIBLE_DIR}"
|
||||
|
|
|
|||
|
|
@ -1,14 +1,9 @@
|
|||
from __future__ import annotations
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
import enroll.cli as cli
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from enroll.remote import RemoteSudoPasswordRequired
|
||||
from enroll.sopsutil import SopsError
|
||||
|
||||
|
||||
def test_cli_harvest_subcommand_calls_harvest(monkeypatch, capsys, tmp_path):
|
||||
called = {}
|
||||
|
|
@ -403,286 +398,3 @@ def test_cli_manifest_common_args(monkeypatch, tmp_path):
|
|||
cli.main()
|
||||
assert called["fqdn"] == "example.test"
|
||||
assert called["jinjaturtle"] == "off"
|
||||
|
||||
|
||||
def test_cli_explain_passes_args_and_writes_stdout(monkeypatch, capsys, tmp_path):
|
||||
called = {}
|
||||
|
||||
def fake_explain_state(
|
||||
harvest: str,
|
||||
*,
|
||||
sops_mode: bool = False,
|
||||
fmt: str = "text",
|
||||
max_examples: int = 3,
|
||||
):
|
||||
called["harvest"] = harvest
|
||||
called["sops_mode"] = sops_mode
|
||||
called["fmt"] = fmt
|
||||
called["max_examples"] = max_examples
|
||||
return "EXPLAINED\n"
|
||||
|
||||
monkeypatch.setattr(cli, "explain_state", fake_explain_state)
|
||||
|
||||
monkeypatch.setattr(
|
||||
sys,
|
||||
"argv",
|
||||
[
|
||||
"enroll",
|
||||
"explain",
|
||||
"--sops",
|
||||
"--format",
|
||||
"json",
|
||||
"--max-examples",
|
||||
"7",
|
||||
str(tmp_path / "bundle" / "state.json"),
|
||||
],
|
||||
)
|
||||
|
||||
cli.main()
|
||||
out = capsys.readouterr().out
|
||||
assert out == "EXPLAINED\n"
|
||||
assert called["sops_mode"] is True
|
||||
assert called["fmt"] == "json"
|
||||
assert called["max_examples"] == 7
|
||||
|
||||
|
||||
def test_discover_config_path_missing_config_value_returns_none(monkeypatch):
|
||||
# Covers the "--config" flag present with no value.
|
||||
monkeypatch.delenv("ENROLL_CONFIG", raising=False)
|
||||
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
|
||||
assert cli._discover_config_path(["--config"]) is None
|
||||
|
||||
|
||||
def test_discover_config_path_defaults_to_home_config(monkeypatch, tmp_path: Path):
|
||||
# Covers the Path.home() / ".config" fallback.
|
||||
monkeypatch.delenv("ENROLL_CONFIG", raising=False)
|
||||
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
|
||||
monkeypatch.setattr(cli.Path, "home", lambda: tmp_path)
|
||||
monkeypatch.setattr(cli.Path, "cwd", lambda: tmp_path)
|
||||
|
||||
cp = tmp_path / ".config" / "enroll" / "enroll.ini"
|
||||
cp.parent.mkdir(parents=True)
|
||||
cp.write_text("[enroll]\n", encoding="utf-8")
|
||||
|
||||
assert cli._discover_config_path(["harvest"]) == cp
|
||||
|
||||
|
||||
def test_cli_harvest_local_sops_encrypts_and_prints_path(
|
||||
monkeypatch, tmp_path: Path, capsys
|
||||
):
|
||||
out_dir = tmp_path / "out"
|
||||
out_dir.mkdir()
|
||||
calls: dict[str, object] = {}
|
||||
|
||||
def fake_harvest(bundle_dir: str, **kwargs):
|
||||
calls["bundle"] = bundle_dir
|
||||
# Create a minimal state.json so tooling that expects it won't break.
|
||||
Path(bundle_dir).mkdir(parents=True, exist_ok=True)
|
||||
(Path(bundle_dir) / "state.json").write_text("{}", encoding="utf-8")
|
||||
return str(Path(bundle_dir) / "state.json")
|
||||
|
||||
def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]):
|
||||
calls["encrypt"] = (bundle_dir, out_file, fps)
|
||||
out_file.write_text("encrypted", encoding="utf-8")
|
||||
return out_file
|
||||
|
||||
monkeypatch.setattr(cli, "harvest", fake_harvest)
|
||||
monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt)
|
||||
|
||||
monkeypatch.setattr(
|
||||
sys,
|
||||
"argv",
|
||||
[
|
||||
"enroll",
|
||||
"harvest",
|
||||
"--sops",
|
||||
"ABCDEF",
|
||||
"--out",
|
||||
str(out_dir),
|
||||
],
|
||||
)
|
||||
cli.main()
|
||||
|
||||
printed = capsys.readouterr().out.strip()
|
||||
assert printed.endswith("harvest.tar.gz.sops")
|
||||
assert Path(printed).exists()
|
||||
assert calls.get("encrypt")
|
||||
|
||||
|
||||
def test_cli_harvest_remote_sops_encrypts_and_prints_path(
|
||||
monkeypatch, tmp_path: Path, capsys
|
||||
):
|
||||
out_dir = tmp_path / "out"
|
||||
out_dir.mkdir()
|
||||
calls: dict[str, object] = {}
|
||||
|
||||
def fake_remote_harvest(**kwargs):
|
||||
calls["remote"] = kwargs
|
||||
# Create a minimal state.json in the temp bundle.
|
||||
out = Path(kwargs["local_out_dir"]) / "state.json"
|
||||
out.write_text("{}", encoding="utf-8")
|
||||
return out
|
||||
|
||||
def fake_encrypt(bundle_dir: Path, out_file: Path, fps: list[str]):
|
||||
calls["encrypt"] = (bundle_dir, out_file, fps)
|
||||
out_file.write_text("encrypted", encoding="utf-8")
|
||||
return out_file
|
||||
|
||||
monkeypatch.setattr(cli, "remote_harvest", fake_remote_harvest)
|
||||
monkeypatch.setattr(cli, "_encrypt_harvest_dir_to_sops", fake_encrypt)
|
||||
|
||||
monkeypatch.setattr(
|
||||
sys,
|
||||
"argv",
|
||||
[
|
||||
"enroll",
|
||||
"harvest",
|
||||
"--remote-host",
|
||||
"example.com",
|
||||
"--remote-user",
|
||||
"root",
|
||||
"--sops",
|
||||
"ABCDEF",
|
||||
"--out",
|
||||
str(out_dir),
|
||||
],
|
||||
)
|
||||
cli.main()
|
||||
|
||||
printed = capsys.readouterr().out.strip()
|
||||
assert printed.endswith("harvest.tar.gz.sops")
|
||||
assert Path(printed).exists()
|
||||
assert calls.get("remote")
|
||||
assert calls.get("encrypt")
|
||||
|
||||
|
||||
def test_cli_harvest_remote_password_required_exits_cleanly(monkeypatch):
|
||||
def boom(**kwargs):
|
||||
raise RemoteSudoPasswordRequired("pw required")
|
||||
|
||||
monkeypatch.setattr(cli, "remote_harvest", boom)
|
||||
monkeypatch.setattr(
|
||||
sys,
|
||||
"argv",
|
||||
[
|
||||
"enroll",
|
||||
"harvest",
|
||||
"--remote-host",
|
||||
"example.com",
|
||||
"--remote-user",
|
||||
"root",
|
||||
],
|
||||
)
|
||||
with pytest.raises(SystemExit) as e:
|
||||
cli.main()
|
||||
assert "--ask-become-pass" in str(e.value)
|
||||
|
||||
|
||||
def test_cli_runtime_error_is_wrapped_as_user_friendly_system_exit(monkeypatch):
|
||||
def boom(*args, **kwargs):
|
||||
raise RuntimeError("nope")
|
||||
|
||||
monkeypatch.setattr(cli, "harvest", boom)
|
||||
monkeypatch.setattr(sys, "argv", ["enroll", "harvest", "--out", "/tmp/x"])
|
||||
with pytest.raises(SystemExit) as e:
|
||||
cli.main()
|
||||
assert str(e.value) == "error: nope"
|
||||
|
||||
|
||||
def test_cli_sops_error_is_wrapped_as_user_friendly_system_exit(monkeypatch):
|
||||
def boom(*args, **kwargs):
|
||||
raise SopsError("sops broke")
|
||||
|
||||
monkeypatch.setattr(cli, "manifest", boom)
|
||||
monkeypatch.setattr(
|
||||
sys, "argv", ["enroll", "manifest", "--harvest", "/tmp/x", "--out", "/tmp/y"]
|
||||
)
|
||||
with pytest.raises(SystemExit) as e:
|
||||
cli.main()
|
||||
assert str(e.value) == "error: sops broke"
|
||||
|
||||
|
||||
def test_cli_diff_notifies_webhook_and_email_and_respects_exit_code(
|
||||
monkeypatch, capsys
|
||||
):
|
||||
calls: dict[str, object] = {}
|
||||
|
||||
def fake_compare(old, new, sops_mode=False):
|
||||
calls["compare"] = (old, new, sops_mode)
|
||||
return {"dummy": True}, True
|
||||
|
||||
def fake_format(report, fmt="text"):
|
||||
calls.setdefault("format", []).append((report, fmt))
|
||||
return "REPORT\n"
|
||||
|
||||
def fake_post(url, body, headers=None):
|
||||
calls["webhook"] = (url, body, headers)
|
||||
return 200, b"ok"
|
||||
|
||||
def fake_email(**kwargs):
|
||||
calls["email"] = kwargs
|
||||
|
||||
monkeypatch.setattr(cli, "compare_harvests", fake_compare)
|
||||
monkeypatch.setattr(cli, "format_report", fake_format)
|
||||
monkeypatch.setattr(cli, "post_webhook", fake_post)
|
||||
monkeypatch.setattr(cli, "send_email", fake_email)
|
||||
monkeypatch.setenv("SMTPPW", "secret")
|
||||
|
||||
monkeypatch.setattr(
|
||||
sys,
|
||||
"argv",
|
||||
[
|
||||
"enroll",
|
||||
"diff",
|
||||
"--old",
|
||||
"/tmp/old",
|
||||
"--new",
|
||||
"/tmp/new",
|
||||
"--webhook",
|
||||
"https://example.invalid/h",
|
||||
"--webhook-header",
|
||||
"X-Test: ok",
|
||||
"--email-to",
|
||||
"a@example.com",
|
||||
"--smtp-password-env",
|
||||
"SMTPPW",
|
||||
"--exit-code",
|
||||
],
|
||||
)
|
||||
|
||||
with pytest.raises(SystemExit) as e:
|
||||
cli.main()
|
||||
assert e.value.code == 2
|
||||
|
||||
assert calls.get("compare")
|
||||
assert calls.get("webhook")
|
||||
assert calls.get("email")
|
||||
# No report printed when exiting via --exit-code? (we still render and print).
|
||||
_ = capsys.readouterr()
|
||||
|
||||
|
||||
def test_cli_diff_webhook_http_error_raises_system_exit(monkeypatch):
|
||||
def fake_compare(old, new, sops_mode=False):
|
||||
return {"dummy": True}, True
|
||||
|
||||
monkeypatch.setattr(cli, "compare_harvests", fake_compare)
|
||||
monkeypatch.setattr(cli, "format_report", lambda report, fmt="text": "R\n")
|
||||
monkeypatch.setattr(cli, "post_webhook", lambda url, body, headers=None: (500, b""))
|
||||
|
||||
monkeypatch.setattr(
|
||||
sys,
|
||||
"argv",
|
||||
[
|
||||
"enroll",
|
||||
"diff",
|
||||
"--old",
|
||||
"/tmp/old",
|
||||
"--new",
|
||||
"/tmp/new",
|
||||
"--webhook",
|
||||
"https://example.invalid/h",
|
||||
],
|
||||
)
|
||||
with pytest.raises(SystemExit) as e:
|
||||
cli.main()
|
||||
assert "HTTP 500" in str(e.value)
|
||||
|
|
|
|||
|
|
@ -1,222 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import enroll.explain as ex
|
||||
|
||||
|
||||
def _write_state(bundle: Path, state: dict) -> Path:
|
||||
bundle.mkdir(parents=True, exist_ok=True)
|
||||
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
|
||||
return bundle / "state.json"
|
||||
|
||||
|
||||
def test_explain_state_text_renders_roles_inventory_and_reasons(tmp_path: Path):
|
||||
bundle = tmp_path / "bundle"
|
||||
state = {
|
||||
"schema_version": 3,
|
||||
"host": {"hostname": "h1", "os": "debian", "pkg_backend": "dpkg"},
|
||||
"enroll": {"version": "0.0.0"},
|
||||
"inventory": {
|
||||
"packages": {
|
||||
"foo": {
|
||||
"installations": [{"version": "1.0", "arch": "amd64"}],
|
||||
"observed_via": [
|
||||
{"kind": "systemd_unit", "ref": "foo.service"},
|
||||
{"kind": "package_role", "ref": "foo"},
|
||||
],
|
||||
"roles": ["foo"],
|
||||
},
|
||||
"bar": {
|
||||
"installations": [{"version": "2.0", "arch": "amd64"}],
|
||||
"observed_via": [{"kind": "user_installed", "ref": "manual"}],
|
||||
"roles": ["bar"],
|
||||
},
|
||||
}
|
||||
},
|
||||
"roles": {
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [{"name": "alice"}],
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/home/alice/.ssh/authorized_keys",
|
||||
"src_rel": "home/alice/.ssh/authorized_keys",
|
||||
"owner": "alice",
|
||||
"group": "alice",
|
||||
"mode": "0600",
|
||||
"reason": "authorized_keys",
|
||||
}
|
||||
],
|
||||
"managed_dirs": [
|
||||
{
|
||||
"path": "/home/alice/.ssh",
|
||||
"owner": "alice",
|
||||
"group": "alice",
|
||||
"mode": "0700",
|
||||
"reason": "parent_of_managed_file",
|
||||
}
|
||||
],
|
||||
"excluded": [{"path": "/etc/shadow", "reason": "sensitive_content"}],
|
||||
"notes": ["n1", "n2"],
|
||||
},
|
||||
"services": [
|
||||
{
|
||||
"unit": "foo.service",
|
||||
"role_name": "foo",
|
||||
"packages": ["foo"],
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/foo.conf",
|
||||
"src_rel": "etc/foo.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "modified_conffile",
|
||||
},
|
||||
# Unknown reason should fall back to generic text.
|
||||
{
|
||||
"path": "/etc/odd.conf",
|
||||
"src_rel": "etc/odd.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "mystery_reason",
|
||||
},
|
||||
],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
}
|
||||
],
|
||||
"packages": [
|
||||
{
|
||||
"package": "bar",
|
||||
"role_name": "bar",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
}
|
||||
],
|
||||
"extra_paths": {
|
||||
"role_name": "extra_paths",
|
||||
"include_patterns": ["/etc/a", "/etc/b"],
|
||||
"exclude_patterns": ["/etc/x", "/etc/y"],
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"apt_config": {
|
||||
"role_name": "apt_config",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"dnf_config": {
|
||||
"role_name": "dnf_config",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"etc_custom": {
|
||||
"role_name": "etc_custom",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"usr_local_custom": {
|
||||
"role_name": "usr_local_custom",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
state_path = _write_state(bundle, state)
|
||||
|
||||
out = ex.explain_state(str(state_path), fmt="text", max_examples=1)
|
||||
|
||||
assert "Enroll explained:" in out
|
||||
assert "Host: h1" in out
|
||||
assert "Inventory" in out
|
||||
# observed_via summary should include both kinds (order not strictly guaranteed)
|
||||
assert "observed_via" in out
|
||||
assert "systemd_unit" in out
|
||||
assert "user_installed" in out
|
||||
|
||||
# extra_paths include/exclude patterns should be rendered with max_examples truncation.
|
||||
assert "include_patterns:" in out
|
||||
assert "/etc/a" in out
|
||||
assert "exclude_patterns:" in out
|
||||
|
||||
# Reasons section should mention known and unknown reasons.
|
||||
assert "modified_conffile" in out
|
||||
assert "mystery_reason" in out
|
||||
assert "Captured with reason 'mystery_reason'" in out
|
||||
|
||||
# Excluded paths section.
|
||||
assert "Why paths were excluded" in out
|
||||
assert "sensitive_content" in out
|
||||
|
||||
|
||||
def test_explain_state_json_contains_structured_report(tmp_path: Path):
|
||||
bundle = tmp_path / "bundle"
|
||||
state = {
|
||||
"schema_version": 3,
|
||||
"host": {"hostname": "h2", "os": "rhel", "pkg_backend": "rpm"},
|
||||
"enroll": {"version": "1.2.3"},
|
||||
"inventory": {"packages": {}},
|
||||
"roles": {
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [],
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"services": [],
|
||||
"packages": [],
|
||||
"apt_config": {
|
||||
"role_name": "apt_config",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"dnf_config": {
|
||||
"role_name": "dnf_config",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"etc_custom": {
|
||||
"role_name": "etc_custom",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"usr_local_custom": {
|
||||
"role_name": "usr_local_custom",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"extra_paths": {
|
||||
"role_name": "extra_paths",
|
||||
"include_patterns": [],
|
||||
"exclude_patterns": [],
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
},
|
||||
}
|
||||
state_path = _write_state(bundle, state)
|
||||
|
||||
raw = ex.explain_state(str(state_path), fmt="json", max_examples=2)
|
||||
rep = json.loads(raw)
|
||||
assert rep["host"]["hostname"] == "h2"
|
||||
assert rep["enroll"]["version"] == "1.2.3"
|
||||
assert rep["inventory"]["package_count"] == 0
|
||||
assert isinstance(rep["roles"], list)
|
||||
assert "reasons" in rep
|
||||
|
|
@ -1,164 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import enroll.harvest as h
|
||||
from enroll.platform import PlatformInfo
|
||||
from enroll.systemd import UnitInfo
|
||||
|
||||
|
||||
class AllowAllPolicy:
|
||||
def deny_reason(self, path: str):
|
||||
return None
|
||||
|
||||
|
||||
class FakeBackend:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
installed: dict[str, list[dict[str, str]]],
|
||||
manual: list[str],
|
||||
):
|
||||
self.name = name
|
||||
self._installed = dict(installed)
|
||||
self._manual = list(manual)
|
||||
|
||||
def build_etc_index(self):
|
||||
# No package ownership information needed for this test.
|
||||
return set(), {}, {}, {}
|
||||
|
||||
def installed_packages(self):
|
||||
return dict(self._installed)
|
||||
|
||||
def list_manual_packages(self):
|
||||
return list(self._manual)
|
||||
|
||||
def owner_of_path(self, path: str):
|
||||
return None
|
||||
|
||||
def specific_paths_for_hints(self, hints: set[str]):
|
||||
return []
|
||||
|
||||
def is_pkg_config_path(self, path: str) -> bool:
|
||||
return False
|
||||
|
||||
def modified_paths(self, pkg: str, etc_paths: list[str]):
|
||||
return {}
|
||||
|
||||
|
||||
def test_harvest_unifies_cron_and_logrotate_into_dedicated_package_roles(
|
||||
monkeypatch, tmp_path: Path
|
||||
):
|
||||
bundle = tmp_path / "bundle"
|
||||
|
||||
# Fake files we want harvested.
|
||||
files = {
|
||||
"/etc/crontab": b"* * * * * root echo hi\n",
|
||||
"/etc/cron.d/php": b"# php cron\n",
|
||||
"/var/spool/cron/crontabs/alice": b"@daily echo user\n",
|
||||
"/etc/logrotate.conf": b"weekly\n",
|
||||
"/etc/logrotate.d/rsyslog": b"/var/log/syslog { rotate 7 }\n",
|
||||
}
|
||||
|
||||
monkeypatch.setattr(h.os.path, "islink", lambda p: False)
|
||||
monkeypatch.setattr(h.os.path, "isfile", lambda p: p in files)
|
||||
monkeypatch.setattr(h.os.path, "isdir", lambda p: False)
|
||||
monkeypatch.setattr(h.os.path, "exists", lambda p: (p in files) or False)
|
||||
|
||||
# Expand cron/logrotate globs deterministically.
|
||||
def fake_iter_matching(spec: str, cap: int = 10000):
|
||||
mapping = {
|
||||
"/etc/crontab": ["/etc/crontab"],
|
||||
"/etc/cron.d/*": ["/etc/cron.d/php"],
|
||||
"/etc/cron.hourly/*": [],
|
||||
"/etc/cron.daily/*": [],
|
||||
"/etc/cron.weekly/*": [],
|
||||
"/etc/cron.monthly/*": [],
|
||||
"/etc/cron.allow": [],
|
||||
"/etc/cron.deny": [],
|
||||
"/etc/anacrontab": [],
|
||||
"/etc/anacron/*": [],
|
||||
"/var/spool/cron/*": [],
|
||||
"/var/spool/cron/crontabs/*": ["/var/spool/cron/crontabs/alice"],
|
||||
"/var/spool/crontabs/*": [],
|
||||
"/var/spool/anacron/*": [],
|
||||
"/etc/logrotate.conf": ["/etc/logrotate.conf"],
|
||||
"/etc/logrotate.d/*": ["/etc/logrotate.d/rsyslog"],
|
||||
}
|
||||
return list(mapping.get(spec, []))[:cap]
|
||||
|
||||
monkeypatch.setattr(h, "_iter_matching_files", fake_iter_matching)
|
||||
|
||||
# Avoid real system probing.
|
||||
monkeypatch.setattr(
|
||||
h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {})
|
||||
)
|
||||
backend = FakeBackend(
|
||||
name="dpkg",
|
||||
installed={
|
||||
"cron": [{"version": "1", "arch": "amd64"}],
|
||||
"logrotate": [{"version": "1", "arch": "amd64"}],
|
||||
},
|
||||
# Include cron/logrotate in manual packages to ensure they are skipped in the generic loop.
|
||||
manual=["cron", "logrotate"],
|
||||
)
|
||||
monkeypatch.setattr(h, "get_backend", lambda info=None: backend)
|
||||
|
||||
# Include a service that would collide with cron role naming.
|
||||
monkeypatch.setattr(
|
||||
h, "list_enabled_services", lambda: ["cron.service", "foo.service"]
|
||||
)
|
||||
monkeypatch.setattr(h, "list_enabled_timers", lambda: [])
|
||||
monkeypatch.setattr(
|
||||
h,
|
||||
"get_unit_info",
|
||||
lambda unit: UnitInfo(
|
||||
name=unit,
|
||||
fragment_path=None,
|
||||
dropin_paths=[],
|
||||
env_files=[],
|
||||
exec_paths=[],
|
||||
active_state="active",
|
||||
sub_state="running",
|
||||
unit_file_state="enabled",
|
||||
condition_result=None,
|
||||
),
|
||||
)
|
||||
monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
|
||||
monkeypatch.setattr(
|
||||
h,
|
||||
"stat_triplet",
|
||||
lambda p: ("alice" if "alice" in p else "root", "root", "0644"),
|
||||
)
|
||||
|
||||
# Avoid needing real source files by implementing our own bundle copier.
|
||||
def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str):
|
||||
dst = Path(bundle_dir) / "artifacts" / role_name / src_rel
|
||||
dst.parent.mkdir(parents=True, exist_ok=True)
|
||||
dst.write_bytes(files.get(abs_path, b""))
|
||||
|
||||
monkeypatch.setattr(h, "_copy_into_bundle", fake_copy)
|
||||
|
||||
state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
|
||||
st = json.loads(Path(state_path).read_text(encoding="utf-8"))
|
||||
|
||||
# cron.service must be skipped to avoid colliding with the dedicated "cron" package role.
|
||||
svc_units = [s["unit"] for s in st["roles"]["services"]]
|
||||
assert "cron.service" not in svc_units
|
||||
assert "foo.service" in svc_units
|
||||
|
||||
pkgs = st["roles"]["packages"]
|
||||
cron = next(p for p in pkgs if p["role_name"] == "cron")
|
||||
logrotate = next(p for p in pkgs if p["role_name"] == "logrotate")
|
||||
|
||||
cron_paths = {mf["path"] for mf in cron["managed_files"]}
|
||||
assert "/etc/crontab" in cron_paths
|
||||
assert "/etc/cron.d/php" in cron_paths
|
||||
# user crontab captured
|
||||
assert "/var/spool/cron/crontabs/alice" in cron_paths
|
||||
|
||||
lr_paths = {mf["path"] for mf in logrotate["managed_files"]}
|
||||
assert "/etc/logrotate.conf" in lr_paths
|
||||
assert "/etc/logrotate.d/rsyslog" in lr_paths
|
||||
|
|
@ -1,170 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import enroll.harvest as h
|
||||
|
||||
|
||||
def test_iter_matching_files_skips_symlinks_and_walks_dirs(monkeypatch, tmp_path: Path):
|
||||
# Layout:
|
||||
# root/real.txt (file)
|
||||
# root/sub/nested.txt
|
||||
# root/link -> ... (ignored)
|
||||
root = tmp_path / "root"
|
||||
(root / "sub").mkdir(parents=True)
|
||||
(root / "real.txt").write_text("a", encoding="utf-8")
|
||||
(root / "sub" / "nested.txt").write_text("b", encoding="utf-8")
|
||||
|
||||
paths = {
|
||||
str(root): "dir",
|
||||
str(root / "real.txt"): "file",
|
||||
str(root / "sub"): "dir",
|
||||
str(root / "sub" / "nested.txt"): "file",
|
||||
str(root / "link"): "link",
|
||||
}
|
||||
|
||||
monkeypatch.setattr(h.glob, "glob", lambda spec: [str(root), str(root / "link")])
|
||||
monkeypatch.setattr(h.os.path, "islink", lambda p: paths.get(p) == "link")
|
||||
monkeypatch.setattr(h.os.path, "isfile", lambda p: paths.get(p) == "file")
|
||||
monkeypatch.setattr(h.os.path, "isdir", lambda p: paths.get(p) == "dir")
|
||||
monkeypatch.setattr(
|
||||
h.os,
|
||||
"walk",
|
||||
lambda p: [
|
||||
(str(root), ["sub"], ["real.txt", "link"]),
|
||||
(str(root / "sub"), [], ["nested.txt"]),
|
||||
],
|
||||
)
|
||||
|
||||
out = h._iter_matching_files("/whatever/*", cap=100)
|
||||
assert str(root / "real.txt") in out
|
||||
assert str(root / "sub" / "nested.txt") in out
|
||||
assert str(root / "link") not in out
|
||||
|
||||
|
||||
def test_parse_apt_signed_by_extracts_keyrings(tmp_path: Path):
|
||||
f1 = tmp_path / "a.list"
|
||||
f1.write_text(
|
||||
"deb [signed-by=/usr/share/keyrings/foo.gpg] https://example.invalid stable main\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
f2 = tmp_path / "b.sources"
|
||||
f2.write_text(
|
||||
"Types: deb\nSigned-By: /etc/apt/keyrings/bar.gpg, /usr/share/keyrings/baz.gpg\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
f3 = tmp_path / "c.sources"
|
||||
f3.write_text("Signed-By: | /bin/echo nope\n", encoding="utf-8")
|
||||
|
||||
out = h._parse_apt_signed_by([str(f1), str(f2), str(f3)])
|
||||
assert "/usr/share/keyrings/foo.gpg" in out
|
||||
assert "/etc/apt/keyrings/bar.gpg" in out
|
||||
assert "/usr/share/keyrings/baz.gpg" in out
|
||||
|
||||
|
||||
def test_iter_apt_capture_paths_includes_signed_by_keyring(monkeypatch):
|
||||
# Simulate:
|
||||
# /etc/apt/apt.conf.d/00test
|
||||
# /etc/apt/sources.list.d/test.list (signed-by outside /etc/apt)
|
||||
# /usr/share/keyrings/ext.gpg
|
||||
files = {
|
||||
"/etc/apt/apt.conf.d/00test": "file",
|
||||
"/etc/apt/sources.list.d/test.list": "file",
|
||||
"/usr/share/keyrings/ext.gpg": "file",
|
||||
}
|
||||
|
||||
monkeypatch.setattr(h.os.path, "isdir", lambda p: p in {"/etc/apt"})
|
||||
monkeypatch.setattr(
|
||||
h.os,
|
||||
"walk",
|
||||
lambda root: [
|
||||
("/etc/apt", ["apt.conf.d", "sources.list.d"], []),
|
||||
("/etc/apt/apt.conf.d", [], ["00test"]),
|
||||
("/etc/apt/sources.list.d", [], ["test.list"]),
|
||||
],
|
||||
)
|
||||
monkeypatch.setattr(h.os.path, "islink", lambda p: False)
|
||||
monkeypatch.setattr(h.os.path, "isfile", lambda p: files.get(p) == "file")
|
||||
|
||||
# Only treat the sources glob as having a hit.
|
||||
def fake_iter_matching(spec: str, cap: int = 10000):
|
||||
if spec == "/etc/apt/sources.list.d/*.list":
|
||||
return ["/etc/apt/sources.list.d/test.list"]
|
||||
return []
|
||||
|
||||
monkeypatch.setattr(h, "_iter_matching_files", fake_iter_matching)
|
||||
|
||||
# Provide file contents for the sources file.
|
||||
real_open = open
|
||||
|
||||
def fake_open(path, *a, **k):
|
||||
if path == "/etc/apt/sources.list.d/test.list":
|
||||
return real_open(os.devnull, "r", encoding="utf-8") # placeholder
|
||||
return real_open(path, *a, **k)
|
||||
|
||||
# Easier: patch _parse_apt_signed_by directly to avoid filesystem reads.
|
||||
monkeypatch.setattr(
|
||||
h, "_parse_apt_signed_by", lambda sfs: {"/usr/share/keyrings/ext.gpg"}
|
||||
)
|
||||
|
||||
out = h._iter_apt_capture_paths()
|
||||
paths = {p for p, _r in out}
|
||||
reasons = {p: r for p, r in out}
|
||||
assert "/etc/apt/apt.conf.d/00test" in paths
|
||||
assert "/etc/apt/sources.list.d/test.list" in paths
|
||||
assert "/usr/share/keyrings/ext.gpg" in paths
|
||||
assert reasons["/usr/share/keyrings/ext.gpg"] == "apt_signed_by_keyring"
|
||||
|
||||
|
||||
def test_iter_dnf_capture_paths(monkeypatch):
|
||||
files = {
|
||||
"/etc/dnf/dnf.conf": "file",
|
||||
"/etc/yum/yum.conf": "file",
|
||||
"/etc/yum.conf": "file",
|
||||
"/etc/yum.repos.d/test.repo": "file",
|
||||
"/etc/pki/rpm-gpg/RPM-GPG-KEY": "file",
|
||||
}
|
||||
|
||||
def isdir(p):
|
||||
return p in {"/etc/dnf", "/etc/yum", "/etc/yum.repos.d", "/etc/pki/rpm-gpg"}
|
||||
|
||||
def walk(root):
|
||||
if root == "/etc/dnf":
|
||||
return [("/etc/dnf", [], ["dnf.conf"])]
|
||||
if root == "/etc/yum":
|
||||
return [("/etc/yum", [], ["yum.conf"])]
|
||||
if root == "/etc/pki/rpm-gpg":
|
||||
return [("/etc/pki/rpm-gpg", [], ["RPM-GPG-KEY"])]
|
||||
return []
|
||||
|
||||
monkeypatch.setattr(h.os.path, "isdir", isdir)
|
||||
monkeypatch.setattr(h.os, "walk", walk)
|
||||
monkeypatch.setattr(h.os.path, "islink", lambda p: False)
|
||||
monkeypatch.setattr(h.os.path, "isfile", lambda p: files.get(p) == "file")
|
||||
monkeypatch.setattr(
|
||||
h,
|
||||
"_iter_matching_files",
|
||||
lambda spec, cap=10000: (
|
||||
["/etc/yum.repos.d/test.repo"] if spec.endswith("*.repo") else []
|
||||
),
|
||||
)
|
||||
|
||||
out = h._iter_dnf_capture_paths()
|
||||
paths = {p for p, _r in out}
|
||||
assert "/etc/dnf/dnf.conf" in paths
|
||||
assert "/etc/yum/yum.conf" in paths
|
||||
assert "/etc/yum.conf" in paths
|
||||
assert "/etc/yum.repos.d/test.repo" in paths
|
||||
assert "/etc/pki/rpm-gpg/RPM-GPG-KEY" in paths
|
||||
|
||||
|
||||
def test_iter_system_capture_paths_dedupes_first_reason(monkeypatch):
|
||||
monkeypatch.setattr(h, "_SYSTEM_CAPTURE_GLOBS", [("/a", "r1"), ("/b", "r2")])
|
||||
monkeypatch.setattr(
|
||||
h,
|
||||
"_iter_matching_files",
|
||||
lambda spec, cap=10000: ["/dup"] if spec in {"/a", "/b"} else [],
|
||||
)
|
||||
out = h._iter_system_capture_paths()
|
||||
assert out == [("/dup", "r1")]
|
||||
|
|
@ -1,12 +1,7 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import os
|
||||
import stat
|
||||
import tarfile
|
||||
import pytest
|
||||
|
||||
import enroll.manifest as manifest
|
||||
from enroll.manifest import manifest
|
||||
|
||||
|
||||
def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
|
||||
|
|
@ -181,7 +176,7 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
|
|||
bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin" / "myscript"
|
||||
).write_text("#!/bin/sh\necho hi\n", encoding="utf-8")
|
||||
|
||||
manifest.manifest(str(bundle), str(out))
|
||||
manifest(str(bundle), str(out))
|
||||
|
||||
# Service role: systemd management should be gated on foo_manage_unit and a probe.
|
||||
tasks = (out / "roles" / "foo" / "tasks" / "main.yml").read_text(encoding="utf-8")
|
||||
|
|
@ -350,7 +345,7 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path)
|
|||
/ "myapp.conf"
|
||||
).write_text("myapp=1\n", encoding="utf-8")
|
||||
|
||||
manifest.manifest(str(bundle), str(out), fqdn=fqdn)
|
||||
manifest(str(bundle), str(out), fqdn=fqdn)
|
||||
|
||||
# Host playbook exists.
|
||||
assert (out / "playbooks" / f"{fqdn}.yml").exists()
|
||||
|
|
@ -487,7 +482,7 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path):
|
|||
bundle.mkdir(parents=True, exist_ok=True)
|
||||
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
|
||||
|
||||
manifest.manifest(str(bundle), str(out))
|
||||
manifest(str(bundle), str(out))
|
||||
|
||||
pb = (out / "playbook.yml").read_text(encoding="utf-8")
|
||||
assert "- dnf_config" in pb
|
||||
|
|
@ -507,291 +502,3 @@ def test_render_install_packages_tasks_contains_dnf_branch():
|
|||
assert "ansible.builtin.dnf" in txt
|
||||
assert "ansible.builtin.package" in txt
|
||||
assert "pkg_mgr" in txt
|
||||
|
||||
|
||||
def test_manifest_orders_cron_and_logrotate_at_playbook_tail(tmp_path: Path):
|
||||
"""Cron/logrotate roles should appear at the end.
|
||||
|
||||
The cron role may restore per-user crontabs under /var/spool, so it should
|
||||
run after users have been created.
|
||||
"""
|
||||
|
||||
bundle = tmp_path / "bundle"
|
||||
out = tmp_path / "ansible"
|
||||
|
||||
state = {
|
||||
"schema_version": 3,
|
||||
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
|
||||
"inventory": {"packages": {}},
|
||||
"roles": {
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [{"name": "alice"}],
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"services": [],
|
||||
"packages": [
|
||||
{
|
||||
"package": "curl",
|
||||
"role_name": "curl",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
{
|
||||
"package": "cron",
|
||||
"role_name": "cron",
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/var/spool/cron/crontabs/alice",
|
||||
"src_rel": "var/spool/cron/crontabs/alice",
|
||||
"owner": "alice",
|
||||
"group": "root",
|
||||
"mode": "0600",
|
||||
"reason": "system_cron",
|
||||
}
|
||||
],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
{
|
||||
"package": "logrotate",
|
||||
"role_name": "logrotate",
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/logrotate.conf",
|
||||
"src_rel": "etc/logrotate.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "system_logrotate",
|
||||
}
|
||||
],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
],
|
||||
"apt_config": {
|
||||
"role_name": "apt_config",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"dnf_config": {
|
||||
"role_name": "dnf_config",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"etc_custom": {
|
||||
"role_name": "etc_custom",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"usr_local_custom": {
|
||||
"role_name": "usr_local_custom",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"extra_paths": {
|
||||
"role_name": "extra_paths",
|
||||
"include_patterns": [],
|
||||
"exclude_patterns": [],
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
# Minimal artifacts for managed files.
|
||||
(bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs").mkdir(
|
||||
parents=True, exist_ok=True
|
||||
)
|
||||
(
|
||||
bundle / "artifacts" / "cron" / "var" / "spool" / "cron" / "crontabs" / "alice"
|
||||
).write_text("@daily echo hi\n", encoding="utf-8")
|
||||
(bundle / "artifacts" / "logrotate" / "etc").mkdir(parents=True, exist_ok=True)
|
||||
(bundle / "artifacts" / "logrotate" / "etc" / "logrotate.conf").write_text(
|
||||
"weekly\n", encoding="utf-8"
|
||||
)
|
||||
|
||||
bundle.mkdir(parents=True, exist_ok=True)
|
||||
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
|
||||
|
||||
manifest.manifest(str(bundle), str(out))
|
||||
|
||||
pb = (out / "playbook.yml").read_text(encoding="utf-8").splitlines()
|
||||
# Roles are emitted as indented list items under the `roles:` key.
|
||||
roles = [
|
||||
ln.strip().removeprefix("- ").strip() for ln in pb if ln.startswith(" - ")
|
||||
]
|
||||
|
||||
# Ensure tail ordering.
|
||||
assert roles[-2:] == ["cron", "logrotate"]
|
||||
assert "users" in roles
|
||||
assert roles.index("users") < roles.index("cron")
|
||||
|
||||
|
||||
def test_yaml_helpers_fallback_when_yaml_unavailable(monkeypatch):
|
||||
monkeypatch.setattr(manifest, "_try_yaml", lambda: None)
|
||||
assert manifest._yaml_load_mapping("foo: 1\n") == {}
|
||||
out = manifest._yaml_dump_mapping({"b": 2, "a": 1})
|
||||
# Best-effort fallback is key: repr(value)
|
||||
assert out.splitlines()[0].startswith("a: ")
|
||||
assert out.endswith("\n")
|
||||
|
||||
|
||||
def test_copy2_replace_makes_readonly_sources_user_writable(
|
||||
monkeypatch, tmp_path: Path
|
||||
):
|
||||
src = tmp_path / "src.txt"
|
||||
dst = tmp_path / "dst.txt"
|
||||
src.write_text("hello", encoding="utf-8")
|
||||
# Make source read-only; copy2 preserves mode, so tmp will be read-only too.
|
||||
os.chmod(src, 0o444)
|
||||
|
||||
manifest._copy2_replace(str(src), str(dst))
|
||||
|
||||
st = os.stat(dst, follow_symlinks=False)
|
||||
assert stat.S_IMODE(st.st_mode) & stat.S_IWUSR
|
||||
|
||||
|
||||
def test_prepare_bundle_dir_sops_decrypts_and_extracts(monkeypatch, tmp_path: Path):
|
||||
enc = tmp_path / "harvest.tar.gz.sops"
|
||||
enc.write_text("ignored", encoding="utf-8")
|
||||
|
||||
def fake_require():
|
||||
return None
|
||||
|
||||
def fake_decrypt(src: str, dst: str, *, mode: int = 0o600):
|
||||
# Create a minimal tar.gz with a state.json file.
|
||||
with tarfile.open(dst, "w:gz") as tf:
|
||||
p = tmp_path / "state.json"
|
||||
p.write_text("{}", encoding="utf-8")
|
||||
tf.add(p, arcname="state.json")
|
||||
|
||||
monkeypatch.setattr(manifest, "require_sops_cmd", fake_require)
|
||||
monkeypatch.setattr(manifest, "decrypt_file_binary_to", fake_decrypt)
|
||||
|
||||
bundle_dir, td = manifest._prepare_bundle_dir(str(enc), sops_mode=True)
|
||||
try:
|
||||
assert (Path(bundle_dir) / "state.json").exists()
|
||||
finally:
|
||||
td.cleanup()
|
||||
|
||||
|
||||
def test_prepare_bundle_dir_rejects_non_dir_without_sops(tmp_path: Path):
|
||||
fp = tmp_path / "bundle.tar.gz"
|
||||
fp.write_text("x", encoding="utf-8")
|
||||
with pytest.raises(RuntimeError):
|
||||
manifest._prepare_bundle_dir(str(fp), sops_mode=False)
|
||||
|
||||
|
||||
def test_tar_dir_to_with_progress_writes_progress_when_tty(monkeypatch, tmp_path: Path):
|
||||
src = tmp_path / "dir"
|
||||
src.mkdir()
|
||||
(src / "a.txt").write_text("a", encoding="utf-8")
|
||||
(src / "b.txt").write_text("b", encoding="utf-8")
|
||||
|
||||
out = tmp_path / "out.tar.gz"
|
||||
writes: list[bytes] = []
|
||||
|
||||
monkeypatch.setattr(manifest.os, "isatty", lambda fd: True)
|
||||
monkeypatch.setattr(manifest.os, "write", lambda fd, b: writes.append(b) or len(b))
|
||||
|
||||
manifest._tar_dir_to_with_progress(str(src), str(out), desc="tarring")
|
||||
assert out.exists()
|
||||
assert writes # progress was written
|
||||
assert writes[-1].endswith(b"\n")
|
||||
|
||||
|
||||
def test_encrypt_manifest_out_dir_to_sops_handles_missing_tmp_cleanup(
|
||||
monkeypatch, tmp_path: Path
|
||||
):
|
||||
src_dir = tmp_path / "manifest"
|
||||
src_dir.mkdir()
|
||||
(src_dir / "x.txt").write_text("x", encoding="utf-8")
|
||||
|
||||
out = tmp_path / "manifest.tar.gz.sops"
|
||||
|
||||
monkeypatch.setattr(manifest, "require_sops_cmd", lambda: None)
|
||||
|
||||
def fake_encrypt(in_fp, out_fp, *args, **kwargs):
|
||||
Path(out_fp).write_text("enc", encoding="utf-8")
|
||||
|
||||
monkeypatch.setattr(manifest, "encrypt_file_binary", fake_encrypt)
|
||||
# Simulate race where tmp tar is already removed.
|
||||
monkeypatch.setattr(
|
||||
manifest.os, "unlink", lambda p: (_ for _ in ()).throw(FileNotFoundError())
|
||||
)
|
||||
|
||||
res = manifest._encrypt_manifest_out_dir_to_sops(str(src_dir), str(out), ["ABC"]) # type: ignore[arg-type]
|
||||
assert str(res).endswith(".sops")
|
||||
assert out.exists()
|
||||
|
||||
|
||||
def test_manifest_applies_jinjaturtle_to_jinjifyable_managed_file(
|
||||
monkeypatch, tmp_path: Path
|
||||
):
|
||||
# Create a minimal bundle with just an apt_config snapshot.
|
||||
bundle = tmp_path / "bundle"
|
||||
(bundle / "artifacts" / "apt_config" / "etc" / "apt").mkdir(parents=True)
|
||||
(bundle / "artifacts" / "apt_config" / "etc" / "apt" / "foo.ini").write_text(
|
||||
"key=VALUE\n", encoding="utf-8"
|
||||
)
|
||||
|
||||
state = {
|
||||
"schema_version": 1,
|
||||
"inventory": {"packages": {}},
|
||||
"roles": {
|
||||
"services": [],
|
||||
"packages": [],
|
||||
"apt_config": {
|
||||
"role_name": "apt_config",
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/apt/foo.ini",
|
||||
"src_rel": "etc/apt/foo.ini",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "apt_config",
|
||||
}
|
||||
],
|
||||
"managed_dirs": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
},
|
||||
}
|
||||
(bundle / "state.json").write_text(
|
||||
__import__("json").dumps(state), encoding="utf-8"
|
||||
)
|
||||
|
||||
monkeypatch.setattr(manifest, "find_jinjaturtle_cmd", lambda: "jinjaturtle")
|
||||
|
||||
class _Res:
|
||||
template_text = "key={{ foo }}\n"
|
||||
vars_text = "foo: 123\n"
|
||||
|
||||
monkeypatch.setattr(manifest, "run_jinjaturtle", lambda *a, **k: _Res())
|
||||
|
||||
out_dir = tmp_path / "out"
|
||||
manifest.manifest(str(bundle), str(out_dir), jinjaturtle="on")
|
||||
|
||||
tmpl = out_dir / "roles" / "apt_config" / "templates" / "etc" / "apt" / "foo.ini.j2"
|
||||
assert tmpl.exists()
|
||||
assert "{{ foo }}" in tmpl.read_text(encoding="utf-8")
|
||||
|
||||
defaults = out_dir / "roles" / "apt_config" / "defaults" / "main.yml"
|
||||
txt = defaults.read_text(encoding="utf-8")
|
||||
assert "foo: 123" in txt
|
||||
# Non-templated file should not exist under files/.
|
||||
assert not (
|
||||
out_dir / "roles" / "apt_config" / "files" / "etc" / "apt" / "foo.ini"
|
||||
).exists()
|
||||
|
|
|
|||
|
|
@ -1,13 +1,5 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
@ -102,315 +94,3 @@ def test_sops_pgp_arg_and_encrypt_decrypt_roundtrip(tmp_path: Path, monkeypatch)
|
|||
# Sanity: we invoked encrypt and decrypt.
|
||||
assert any("--encrypt" in c for c in calls)
|
||||
assert any("--decrypt" in c for c in calls)
|
||||
|
||||
|
||||
def test_cache_dir_defaults_to_home_cache(monkeypatch, tmp_path: Path):
|
||||
# Ensure default path uses ~/.cache when XDG_CACHE_HOME is unset.
|
||||
from enroll.cache import enroll_cache_dir
|
||||
|
||||
monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
|
||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||
|
||||
p = enroll_cache_dir()
|
||||
assert str(p).startswith(str(tmp_path))
|
||||
assert p.name == "enroll"
|
||||
|
||||
|
||||
def test_harvest_cache_state_json_property(tmp_path: Path):
|
||||
from enroll.cache import HarvestCache
|
||||
|
||||
hc = HarvestCache(tmp_path / "h1")
|
||||
assert hc.state_json == hc.dir / "state.json"
|
||||
|
||||
|
||||
def test_cache_dir_security_rejects_symlink(tmp_path: Path):
|
||||
from enroll.cache import _ensure_dir_secure
|
||||
|
||||
real = tmp_path / "real"
|
||||
real.mkdir()
|
||||
link = tmp_path / "link"
|
||||
link.symlink_to(real, target_is_directory=True)
|
||||
|
||||
with pytest.raises(RuntimeError, match="Refusing to use symlink"):
|
||||
_ensure_dir_secure(link)
|
||||
|
||||
|
||||
def test_cache_dir_chmod_failures_are_ignored(monkeypatch, tmp_path: Path):
|
||||
from enroll import cache
|
||||
|
||||
# Make the cache base path deterministic and writable.
|
||||
monkeypatch.setattr(cache, "enroll_cache_dir", lambda: tmp_path)
|
||||
|
||||
# Force os.chmod to fail to cover the "except OSError: pass" paths.
|
||||
monkeypatch.setattr(
|
||||
os, "chmod", lambda *a, **k: (_ for _ in ()).throw(OSError("nope"))
|
||||
)
|
||||
|
||||
hc = cache.new_harvest_cache_dir()
|
||||
assert hc.dir.exists()
|
||||
assert hc.dir.is_dir()
|
||||
|
||||
|
||||
def test_stat_triplet_falls_back_to_numeric_ids(monkeypatch, tmp_path: Path):
|
||||
from enroll.fsutil import stat_triplet
|
||||
import pwd
|
||||
import grp
|
||||
|
||||
p = tmp_path / "x"
|
||||
p.write_text("x", encoding="utf-8")
|
||||
|
||||
# Force username/group resolution failures.
|
||||
monkeypatch.setattr(
|
||||
pwd, "getpwuid", lambda _uid: (_ for _ in ()).throw(KeyError("no user"))
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
grp, "getgrgid", lambda _gid: (_ for _ in ()).throw(KeyError("no group"))
|
||||
)
|
||||
|
||||
owner, group, mode = stat_triplet(str(p))
|
||||
assert owner.isdigit()
|
||||
assert group.isdigit()
|
||||
assert len(mode) == 4
|
||||
|
||||
|
||||
def test_ignore_policy_iter_effective_lines_removes_block_comments():
|
||||
from enroll.ignore import IgnorePolicy
|
||||
|
||||
pol = IgnorePolicy()
|
||||
data = b"""keep1
|
||||
/*
|
||||
drop me
|
||||
*/
|
||||
keep2
|
||||
"""
|
||||
assert list(pol.iter_effective_lines(data)) == [b"keep1", b"keep2"]
|
||||
|
||||
|
||||
def test_ignore_policy_deny_reason_dir_variants(tmp_path: Path):
|
||||
from enroll.ignore import IgnorePolicy
|
||||
|
||||
pol = IgnorePolicy()
|
||||
|
||||
# denied by glob
|
||||
assert pol.deny_reason_dir("/etc/shadow") == "denied_path"
|
||||
|
||||
# symlink rejected
|
||||
d = tmp_path / "d"
|
||||
d.mkdir()
|
||||
link = tmp_path / "l"
|
||||
link.symlink_to(d, target_is_directory=True)
|
||||
assert pol.deny_reason_dir(str(link)) == "symlink"
|
||||
|
||||
# not a directory
|
||||
f = tmp_path / "f"
|
||||
f.write_text("x", encoding="utf-8")
|
||||
assert pol.deny_reason_dir(str(f)) == "not_directory"
|
||||
|
||||
# ok
|
||||
assert pol.deny_reason_dir(str(d)) is None
|
||||
|
||||
|
||||
def test_run_jinjaturtle_parses_outputs(monkeypatch, tmp_path: Path):
|
||||
# Fully unit-test enroll.jinjaturtle.run_jinjaturtle by stubbing subprocess.run.
|
||||
from enroll.jinjaturtle import run_jinjaturtle
|
||||
|
||||
def fake_run(cmd, **kwargs): # noqa: ARG001
|
||||
# cmd includes "-d <defaults> -t <template>"
|
||||
d_idx = cmd.index("-d") + 1
|
||||
t_idx = cmd.index("-t") + 1
|
||||
defaults = Path(cmd[d_idx])
|
||||
template = Path(cmd[t_idx])
|
||||
defaults.write_text("---\nfoo: 1\n", encoding="utf-8")
|
||||
template.write_text("value={{ foo }}\n", encoding="utf-8")
|
||||
return SimpleNamespace(returncode=0, stdout="ok", stderr="")
|
||||
|
||||
monkeypatch.setattr(subprocess, "run", fake_run)
|
||||
|
||||
src = tmp_path / "src.ini"
|
||||
src.write_text("foo=1\n", encoding="utf-8")
|
||||
|
||||
res = run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
|
||||
assert "foo: 1" in res.vars_text
|
||||
assert "value=" in res.template_text
|
||||
|
||||
|
||||
def test_run_jinjaturtle_raises_on_failure(monkeypatch, tmp_path: Path):
|
||||
from enroll.jinjaturtle import run_jinjaturtle
|
||||
|
||||
def fake_run(cmd, **kwargs): # noqa: ARG001
|
||||
return SimpleNamespace(returncode=2, stdout="out", stderr="bad")
|
||||
|
||||
monkeypatch.setattr(subprocess, "run", fake_run)
|
||||
|
||||
src = tmp_path / "src.ini"
|
||||
src.write_text("x", encoding="utf-8")
|
||||
with pytest.raises(RuntimeError, match="jinjaturtle failed"):
|
||||
run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
|
||||
|
||||
|
||||
def test_require_sops_cmd_errors_when_missing(monkeypatch):
|
||||
from enroll.sopsutil import require_sops_cmd, SopsError
|
||||
|
||||
monkeypatch.setattr("enroll.sopsutil.shutil.which", lambda _: None)
|
||||
with pytest.raises(SopsError, match="not found on PATH"):
|
||||
require_sops_cmd()
|
||||
|
||||
|
||||
def test_get_enroll_version_reports_unknown_on_metadata_failure(monkeypatch):
|
||||
import enroll.version as v
|
||||
|
||||
fake_meta = types.ModuleType("importlib.metadata")
|
||||
|
||||
def boom():
|
||||
raise RuntimeError("boom")
|
||||
|
||||
fake_meta.packages_distributions = boom
|
||||
fake_meta.version = lambda _dist: boom()
|
||||
|
||||
monkeypatch.setitem(sys.modules, "importlib.metadata", fake_meta)
|
||||
assert v.get_enroll_version() == "unknown"
|
||||
|
||||
|
||||
def test_get_enroll_version_returns_unknown_if_importlib_metadata_unavailable(
|
||||
monkeypatch,
|
||||
):
|
||||
import builtins
|
||||
import enroll.version as v
|
||||
|
||||
real_import = builtins.__import__
|
||||
|
||||
def fake_import(
|
||||
name, globals=None, locals=None, fromlist=(), level=0
|
||||
): # noqa: A002
|
||||
if name == "importlib.metadata":
|
||||
raise ImportError("no metadata")
|
||||
return real_import(name, globals, locals, fromlist, level)
|
||||
|
||||
monkeypatch.setattr(builtins, "__import__", fake_import)
|
||||
assert v.get_enroll_version() == "unknown"
|
||||
|
||||
|
||||
def test_compare_harvests_and_format_report(tmp_path: Path):
|
||||
from enroll.diff import compare_harvests, format_report
|
||||
|
||||
old = tmp_path / "old"
|
||||
new = tmp_path / "new"
|
||||
(old / "artifacts").mkdir(parents=True)
|
||||
(new / "artifacts").mkdir(parents=True)
|
||||
|
||||
def write_state(base: Path, state: dict) -> None:
|
||||
base.mkdir(parents=True, exist_ok=True)
|
||||
(base / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
|
||||
|
||||
# Old bundle: pkg a@1.0, pkg b@1.0, one service, one user, one managed file.
|
||||
old_state = {
|
||||
"schema_version": 3,
|
||||
"host": {"hostname": "h1"},
|
||||
"inventory": {"packages": {"a": {"version": "1.0"}, "b": {"version": "1.0"}}},
|
||||
"roles": {
|
||||
"services": [
|
||||
{
|
||||
"unit": "svc.service",
|
||||
"role_name": "svc",
|
||||
"packages": ["a"],
|
||||
"active_state": "inactive",
|
||||
"sub_state": "dead",
|
||||
"unit_file_state": "enabled",
|
||||
"condition_result": None,
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/foo.conf",
|
||||
"src_rel": "etc/foo.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "modified_conffile",
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
"packages": [],
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [{"name": "alice", "shell": "/bin/sh"}],
|
||||
},
|
||||
"apt_config": {"role_name": "apt_config", "managed_files": []},
|
||||
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
|
||||
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
|
||||
"extra_paths": {"role_name": "extra_paths", "managed_files": []},
|
||||
},
|
||||
}
|
||||
(old / "artifacts" / "svc" / "etc").mkdir(parents=True, exist_ok=True)
|
||||
(old / "artifacts" / "svc" / "etc" / "foo.conf").write_text("old", encoding="utf-8")
|
||||
write_state(old, old_state)
|
||||
|
||||
# New bundle: pkg a@2.0, pkg c@1.0, service changed, user changed, file moved role+content.
|
||||
new_state = {
|
||||
"schema_version": 3,
|
||||
"host": {"hostname": "h2"},
|
||||
"inventory": {"packages": {"a": {"version": "2.0"}, "c": {"version": "1.0"}}},
|
||||
"roles": {
|
||||
"services": [
|
||||
{
|
||||
"unit": "svc.service",
|
||||
"role_name": "svc",
|
||||
"packages": ["a", "c"],
|
||||
"active_state": "active",
|
||||
"sub_state": "running",
|
||||
"unit_file_state": "enabled",
|
||||
"condition_result": None,
|
||||
"managed_files": [],
|
||||
}
|
||||
],
|
||||
"packages": [],
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [{"name": "alice", "shell": "/bin/bash"}, {"name": "bob"}],
|
||||
},
|
||||
"apt_config": {"role_name": "apt_config", "managed_files": []},
|
||||
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
|
||||
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
|
||||
"extra_paths": {
|
||||
"role_name": "extra_paths",
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/foo.conf",
|
||||
"src_rel": "etc/foo.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0600",
|
||||
"reason": "user_include",
|
||||
},
|
||||
{
|
||||
"path": "/etc/added.conf",
|
||||
"src_rel": "etc/added.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "user_include",
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
(new / "artifacts" / "extra_paths" / "etc").mkdir(parents=True, exist_ok=True)
|
||||
(new / "artifacts" / "extra_paths" / "etc" / "foo.conf").write_text(
|
||||
"new", encoding="utf-8"
|
||||
)
|
||||
(new / "artifacts" / "extra_paths" / "etc" / "added.conf").write_text(
|
||||
"x", encoding="utf-8"
|
||||
)
|
||||
write_state(new, new_state)
|
||||
|
||||
report, changed = compare_harvests(str(old), str(new))
|
||||
assert changed is True
|
||||
|
||||
txt = format_report(report, fmt="text")
|
||||
assert "Packages" in txt
|
||||
|
||||
md = format_report(report, fmt="markdown")
|
||||
assert "# enroll diff report" in md
|
||||
|
||||
js = format_report(report, fmt="json")
|
||||
parsed = json.loads(js)
|
||||
assert parsed["packages"]["added"] == ["c"]
|
||||
|
|
|
|||
323
tests/test_more_coverage.py
Normal file
323
tests/test_more_coverage.py
Normal file
|
|
@ -0,0 +1,323 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_cache_dir_defaults_to_home_cache(monkeypatch, tmp_path: Path):
|
||||
# Ensure default path uses ~/.cache when XDG_CACHE_HOME is unset.
|
||||
from enroll.cache import enroll_cache_dir
|
||||
|
||||
monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
|
||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||
|
||||
p = enroll_cache_dir()
|
||||
assert str(p).startswith(str(tmp_path))
|
||||
assert p.name == "enroll"
|
||||
|
||||
|
||||
def test_harvest_cache_state_json_property(tmp_path: Path):
|
||||
from enroll.cache import HarvestCache
|
||||
|
||||
hc = HarvestCache(tmp_path / "h1")
|
||||
assert hc.state_json == hc.dir / "state.json"
|
||||
|
||||
|
||||
def test_cache_dir_security_rejects_symlink(tmp_path: Path):
|
||||
from enroll.cache import _ensure_dir_secure
|
||||
|
||||
real = tmp_path / "real"
|
||||
real.mkdir()
|
||||
link = tmp_path / "link"
|
||||
link.symlink_to(real, target_is_directory=True)
|
||||
|
||||
with pytest.raises(RuntimeError, match="Refusing to use symlink"):
|
||||
_ensure_dir_secure(link)
|
||||
|
||||
|
||||
def test_cache_dir_chmod_failures_are_ignored(monkeypatch, tmp_path: Path):
|
||||
from enroll import cache
|
||||
|
||||
# Make the cache base path deterministic and writable.
|
||||
monkeypatch.setattr(cache, "enroll_cache_dir", lambda: tmp_path)
|
||||
|
||||
# Force os.chmod to fail to cover the "except OSError: pass" paths.
|
||||
monkeypatch.setattr(
|
||||
os, "chmod", lambda *a, **k: (_ for _ in ()).throw(OSError("nope"))
|
||||
)
|
||||
|
||||
hc = cache.new_harvest_cache_dir()
|
||||
assert hc.dir.exists()
|
||||
assert hc.dir.is_dir()
|
||||
|
||||
|
||||
def test_stat_triplet_falls_back_to_numeric_ids(monkeypatch, tmp_path: Path):
|
||||
from enroll.fsutil import stat_triplet
|
||||
import pwd
|
||||
import grp
|
||||
|
||||
p = tmp_path / "x"
|
||||
p.write_text("x", encoding="utf-8")
|
||||
|
||||
# Force username/group resolution failures.
|
||||
monkeypatch.setattr(
|
||||
pwd, "getpwuid", lambda _uid: (_ for _ in ()).throw(KeyError("no user"))
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
grp, "getgrgid", lambda _gid: (_ for _ in ()).throw(KeyError("no group"))
|
||||
)
|
||||
|
||||
owner, group, mode = stat_triplet(str(p))
|
||||
assert owner.isdigit()
|
||||
assert group.isdigit()
|
||||
assert len(mode) == 4
|
||||
|
||||
|
||||
def test_ignore_policy_iter_effective_lines_removes_block_comments():
|
||||
from enroll.ignore import IgnorePolicy
|
||||
|
||||
pol = IgnorePolicy()
|
||||
data = b"""keep1
|
||||
/*
|
||||
drop me
|
||||
*/
|
||||
keep2
|
||||
"""
|
||||
assert list(pol.iter_effective_lines(data)) == [b"keep1", b"keep2"]
|
||||
|
||||
|
||||
def test_ignore_policy_deny_reason_dir_variants(tmp_path: Path):
|
||||
from enroll.ignore import IgnorePolicy
|
||||
|
||||
pol = IgnorePolicy()
|
||||
|
||||
# denied by glob
|
||||
assert pol.deny_reason_dir("/etc/shadow") == "denied_path"
|
||||
|
||||
# symlink rejected
|
||||
d = tmp_path / "d"
|
||||
d.mkdir()
|
||||
link = tmp_path / "l"
|
||||
link.symlink_to(d, target_is_directory=True)
|
||||
assert pol.deny_reason_dir(str(link)) == "symlink"
|
||||
|
||||
# not a directory
|
||||
f = tmp_path / "f"
|
||||
f.write_text("x", encoding="utf-8")
|
||||
assert pol.deny_reason_dir(str(f)) == "not_directory"
|
||||
|
||||
# ok
|
||||
assert pol.deny_reason_dir(str(d)) is None
|
||||
|
||||
|
||||
def test_run_jinjaturtle_parses_outputs(monkeypatch, tmp_path: Path):
|
||||
# Fully unit-test enroll.jinjaturtle.run_jinjaturtle by stubbing subprocess.run.
|
||||
from enroll.jinjaturtle import run_jinjaturtle
|
||||
|
||||
def fake_run(cmd, **kwargs): # noqa: ARG001
|
||||
# cmd includes "-d <defaults> -t <template>"
|
||||
d_idx = cmd.index("-d") + 1
|
||||
t_idx = cmd.index("-t") + 1
|
||||
defaults = Path(cmd[d_idx])
|
||||
template = Path(cmd[t_idx])
|
||||
defaults.write_text("---\nfoo: 1\n", encoding="utf-8")
|
||||
template.write_text("value={{ foo }}\n", encoding="utf-8")
|
||||
return SimpleNamespace(returncode=0, stdout="ok", stderr="")
|
||||
|
||||
monkeypatch.setattr(subprocess, "run", fake_run)
|
||||
|
||||
src = tmp_path / "src.ini"
|
||||
src.write_text("foo=1\n", encoding="utf-8")
|
||||
|
||||
res = run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
|
||||
assert "foo: 1" in res.vars_text
|
||||
assert "value=" in res.template_text
|
||||
|
||||
|
||||
def test_run_jinjaturtle_raises_on_failure(monkeypatch, tmp_path: Path):
|
||||
from enroll.jinjaturtle import run_jinjaturtle
|
||||
|
||||
def fake_run(cmd, **kwargs): # noqa: ARG001
|
||||
return SimpleNamespace(returncode=2, stdout="out", stderr="bad")
|
||||
|
||||
monkeypatch.setattr(subprocess, "run", fake_run)
|
||||
|
||||
src = tmp_path / "src.ini"
|
||||
src.write_text("x", encoding="utf-8")
|
||||
with pytest.raises(RuntimeError, match="jinjaturtle failed"):
|
||||
run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
|
||||
|
||||
|
||||
def test_require_sops_cmd_errors_when_missing(monkeypatch):
|
||||
from enroll.sopsutil import require_sops_cmd, SopsError
|
||||
|
||||
monkeypatch.setattr("enroll.sopsutil.shutil.which", lambda _: None)
|
||||
with pytest.raises(SopsError, match="not found on PATH"):
|
||||
require_sops_cmd()
|
||||
|
||||
|
||||
def test_get_enroll_version_reports_unknown_on_metadata_failure(monkeypatch):
|
||||
import enroll.version as v
|
||||
|
||||
fake_meta = types.ModuleType("importlib.metadata")
|
||||
|
||||
def boom():
|
||||
raise RuntimeError("boom")
|
||||
|
||||
fake_meta.packages_distributions = boom
|
||||
fake_meta.version = lambda _dist: boom()
|
||||
|
||||
monkeypatch.setitem(sys.modules, "importlib.metadata", fake_meta)
|
||||
assert v.get_enroll_version() == "unknown"
|
||||
|
||||
|
||||
def test_get_enroll_version_returns_unknown_if_importlib_metadata_unavailable(
|
||||
monkeypatch,
|
||||
):
|
||||
import builtins
|
||||
import enroll.version as v
|
||||
|
||||
real_import = builtins.__import__
|
||||
|
||||
def fake_import(
|
||||
name, globals=None, locals=None, fromlist=(), level=0
|
||||
): # noqa: A002
|
||||
if name == "importlib.metadata":
|
||||
raise ImportError("no metadata")
|
||||
return real_import(name, globals, locals, fromlist, level)
|
||||
|
||||
monkeypatch.setattr(builtins, "__import__", fake_import)
|
||||
assert v.get_enroll_version() == "unknown"
|
||||
|
||||
|
||||
def test_compare_harvests_and_format_report(tmp_path: Path):
|
||||
from enroll.diff import compare_harvests, format_report
|
||||
|
||||
old = tmp_path / "old"
|
||||
new = tmp_path / "new"
|
||||
(old / "artifacts").mkdir(parents=True)
|
||||
(new / "artifacts").mkdir(parents=True)
|
||||
|
||||
def write_state(base: Path, state: dict) -> None:
|
||||
base.mkdir(parents=True, exist_ok=True)
|
||||
(base / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
|
||||
|
||||
# Old bundle: pkg a@1.0, pkg b@1.0, one service, one user, one managed file.
|
||||
old_state = {
|
||||
"schema_version": 3,
|
||||
"host": {"hostname": "h1"},
|
||||
"inventory": {"packages": {"a": {"version": "1.0"}, "b": {"version": "1.0"}}},
|
||||
"roles": {
|
||||
"services": [
|
||||
{
|
||||
"unit": "svc.service",
|
||||
"role_name": "svc",
|
||||
"packages": ["a"],
|
||||
"active_state": "inactive",
|
||||
"sub_state": "dead",
|
||||
"unit_file_state": "enabled",
|
||||
"condition_result": None,
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/foo.conf",
|
||||
"src_rel": "etc/foo.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "modified_conffile",
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
"packages": [],
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [{"name": "alice", "shell": "/bin/sh"}],
|
||||
},
|
||||
"apt_config": {"role_name": "apt_config", "managed_files": []},
|
||||
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
|
||||
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
|
||||
"extra_paths": {"role_name": "extra_paths", "managed_files": []},
|
||||
},
|
||||
}
|
||||
(old / "artifacts" / "svc" / "etc").mkdir(parents=True, exist_ok=True)
|
||||
(old / "artifacts" / "svc" / "etc" / "foo.conf").write_text("old", encoding="utf-8")
|
||||
write_state(old, old_state)
|
||||
|
||||
# New bundle: pkg a@2.0, pkg c@1.0, service changed, user changed, file moved role+content.
|
||||
new_state = {
|
||||
"schema_version": 3,
|
||||
"host": {"hostname": "h2"},
|
||||
"inventory": {"packages": {"a": {"version": "2.0"}, "c": {"version": "1.0"}}},
|
||||
"roles": {
|
||||
"services": [
|
||||
{
|
||||
"unit": "svc.service",
|
||||
"role_name": "svc",
|
||||
"packages": ["a", "c"],
|
||||
"active_state": "active",
|
||||
"sub_state": "running",
|
||||
"unit_file_state": "enabled",
|
||||
"condition_result": None,
|
||||
"managed_files": [],
|
||||
}
|
||||
],
|
||||
"packages": [],
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [{"name": "alice", "shell": "/bin/bash"}, {"name": "bob"}],
|
||||
},
|
||||
"apt_config": {"role_name": "apt_config", "managed_files": []},
|
||||
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
|
||||
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
|
||||
"extra_paths": {
|
||||
"role_name": "extra_paths",
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/foo.conf",
|
||||
"src_rel": "etc/foo.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0600",
|
||||
"reason": "user_include",
|
||||
},
|
||||
{
|
||||
"path": "/etc/added.conf",
|
||||
"src_rel": "etc/added.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "user_include",
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
(new / "artifacts" / "extra_paths" / "etc").mkdir(parents=True, exist_ok=True)
|
||||
(new / "artifacts" / "extra_paths" / "etc" / "foo.conf").write_text(
|
||||
"new", encoding="utf-8"
|
||||
)
|
||||
(new / "artifacts" / "extra_paths" / "etc" / "added.conf").write_text(
|
||||
"x", encoding="utf-8"
|
||||
)
|
||||
write_state(new, new_state)
|
||||
|
||||
report, changed = compare_harvests(str(old), str(new))
|
||||
assert changed is True
|
||||
|
||||
txt = format_report(report, fmt="text")
|
||||
assert "Packages" in txt
|
||||
|
||||
md = format_report(report, fmt="markdown")
|
||||
assert "# enroll diff report" in md
|
||||
|
||||
js = format_report(report, fmt="json")
|
||||
parsed = json.loads(js)
|
||||
assert parsed["packages"]["added"] == ["c"]
|
||||
|
|
@ -3,8 +3,6 @@ from __future__ import annotations
|
|||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import enroll.pathfilter as pf
|
||||
|
||||
|
||||
def test_compile_and_match_prefix_glob_and_regex(tmp_path: Path):
|
||||
from enroll.pathfilter import PathFilter, compile_path_pattern
|
||||
|
|
@ -80,107 +78,3 @@ def test_expand_includes_notes_on_no_matches(tmp_path: Path):
|
|||
paths, notes = expand_includes(pats, max_files=10)
|
||||
assert paths == []
|
||||
assert any("matched no files" in n.lower() for n in notes)
|
||||
|
||||
|
||||
def test_expand_includes_supports_regex_with_inferred_root(tmp_path: Path):
|
||||
"""Regex includes are expanded by walking an inferred literal prefix root."""
|
||||
from enroll.pathfilter import compile_path_pattern, expand_includes
|
||||
|
||||
root = tmp_path / "root"
|
||||
(root / "home" / "alice" / ".config" / "myapp").mkdir(parents=True)
|
||||
target = root / "home" / "alice" / ".config" / "myapp" / "settings.ini"
|
||||
target.write_text("x=1\n", encoding="utf-8")
|
||||
|
||||
# This is anchored and begins with an absolute path, so expand_includes should
|
||||
# infer a narrow walk root instead of scanning '/'.
|
||||
rex = rf"re:^{root}/home/[^/]+/\.config/myapp/.*$"
|
||||
pat = compile_path_pattern(rex)
|
||||
paths, notes = expand_includes([pat], max_files=10)
|
||||
assert str(target) in paths
|
||||
assert notes == []
|
||||
|
||||
|
||||
def test_compile_path_pattern_normalises_relative_prefix():
|
||||
from enroll.pathfilter import compile_path_pattern
|
||||
|
||||
p = compile_path_pattern("etc/ssh")
|
||||
assert p.kind == "prefix"
|
||||
assert p.value == "/etc/ssh"
|
||||
|
||||
|
||||
def test_norm_abs_empty_string_is_root():
|
||||
assert pf._norm_abs("") == "/"
|
||||
|
||||
|
||||
def test_posix_match_invalid_pattern_fails_closed(monkeypatch):
|
||||
# Force PurePosixPath.match to raise to cover the exception handler.
|
||||
real_match = pf.PurePosixPath.match
|
||||
|
||||
def boom(self, pat):
|
||||
raise ValueError("bad pattern")
|
||||
|
||||
monkeypatch.setattr(pf.PurePosixPath, "match", boom)
|
||||
try:
|
||||
assert pf._posix_match("/etc/hosts", "[bad") is False
|
||||
finally:
|
||||
monkeypatch.setattr(pf.PurePosixPath, "match", real_match)
|
||||
|
||||
|
||||
def test_regex_literal_prefix_handles_escapes():
|
||||
# Prefix stops at meta chars but includes escaped literals.
|
||||
assert pf._regex_literal_prefix(r"^/etc/\./foo") == "/etc/./foo"
|
||||
|
||||
|
||||
def test_expand_includes_maybe_add_file_skips_non_files(monkeypatch, tmp_path: Path):
|
||||
# Drive the _maybe_add_file branch that rejects symlinks/non-files.
|
||||
pats = [pf.compile_path_pattern(str(tmp_path / "missing"))]
|
||||
|
||||
monkeypatch.setattr(pf.os.path, "isfile", lambda p: False)
|
||||
monkeypatch.setattr(pf.os.path, "islink", lambda p: False)
|
||||
monkeypatch.setattr(pf.os.path, "isdir", lambda p: False)
|
||||
|
||||
paths, notes = pf.expand_includes(pats, max_files=10)
|
||||
assert paths == []
|
||||
assert any("matched no files" in n for n in notes)
|
||||
|
||||
|
||||
def test_expand_includes_prunes_excluded_dirs(monkeypatch):
|
||||
include = [pf.compile_path_pattern("/root/**")]
|
||||
exclude = pf.PathFilter(exclude=["/root/skip/**"])
|
||||
|
||||
# Simulate filesystem walk:
|
||||
# /root has dirnames ['skip', 'keep'] but skip should be pruned.
|
||||
monkeypatch.setattr(
|
||||
pf.os.path,
|
||||
"isdir",
|
||||
lambda p: p in {"/root", "/root/keep", "/root/skip"},
|
||||
)
|
||||
monkeypatch.setattr(pf.os.path, "islink", lambda p: False)
|
||||
monkeypatch.setattr(pf.os.path, "isfile", lambda p: True)
|
||||
|
||||
def walk(root, followlinks=False):
|
||||
assert root == "/root"
|
||||
yield ("/root", ["skip", "keep"], [])
|
||||
yield ("/root/keep", [], ["a.txt"])
|
||||
# If pruning works, we should never walk into /root/skip.
|
||||
|
||||
monkeypatch.setattr(pf.os, "walk", walk)
|
||||
|
||||
paths, _notes = pf.expand_includes(include, exclude=exclude, max_files=10)
|
||||
assert "/root/keep/a.txt" in paths
|
||||
assert not any(p.startswith("/root/skip") for p in paths)
|
||||
|
||||
|
||||
def test_expand_includes_respects_max_files(monkeypatch):
|
||||
include = [pf.compile_path_pattern("/root/**")]
|
||||
monkeypatch.setattr(pf.os.path, "isdir", lambda p: p == "/root")
|
||||
monkeypatch.setattr(pf.os.path, "islink", lambda p: False)
|
||||
monkeypatch.setattr(pf.os.path, "isfile", lambda p: True)
|
||||
monkeypatch.setattr(
|
||||
pf.os,
|
||||
"walk",
|
||||
lambda root, followlinks=False: [("/root", [], ["a", "b", "c"])],
|
||||
)
|
||||
paths, notes = pf.expand_includes(include, max_files=2)
|
||||
assert len(paths) == 2
|
||||
assert "/root/c" not in paths
|
||||
|
|
|
|||
|
|
@ -129,50 +129,3 @@ def test_rpm_config_files_and_modified_files_parsing(monkeypatch):
|
|||
rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (1, out)
|
||||
)
|
||||
assert rpm.rpm_modified_files("mypkg") == {"/etc/foo.conf", "/etc/bar"}
|
||||
|
||||
|
||||
def test_list_manual_packages_uses_yum_fallback(monkeypatch):
|
||||
# No dnf, yum present.
|
||||
monkeypatch.setattr(
|
||||
rpm.shutil, "which", lambda exe: "/usr/bin/yum" if exe == "yum" else None
|
||||
)
|
||||
|
||||
def fake_run(cmd, allow_fail=False, merge_err=False):
|
||||
assert cmd[:3] == ["yum", "-q", "history"]
|
||||
return 0, "Installed Packages\nvim-enhanced.x86_64\nhtop\n"
|
||||
|
||||
monkeypatch.setattr(rpm, "_run", fake_run)
|
||||
|
||||
assert rpm.list_manual_packages() == ["htop", "vim-enhanced"]
|
||||
|
||||
|
||||
def test_list_installed_packages_parses_epoch_and_sorts(monkeypatch):
|
||||
out = (
|
||||
"bash\t0\t5.2.26\t1.el9\tx86_64\n"
|
||||
"bash\t1\t5.2.26\t1.el9\taarch64\n"
|
||||
"coreutils\t(none)\t9.1\t2.el9\tx86_64\n"
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (0, out)
|
||||
)
|
||||
pkgs = rpm.list_installed_packages()
|
||||
assert pkgs["bash"][0]["arch"] == "aarch64" # sorted by arch then version
|
||||
assert pkgs["bash"][0]["version"].startswith("1:")
|
||||
assert pkgs["coreutils"][0]["version"] == "9.1-2.el9"
|
||||
|
||||
|
||||
def test_rpm_config_files_returns_empty_on_failure(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (1, "")
|
||||
)
|
||||
assert rpm.rpm_config_files("missing") == set()
|
||||
|
||||
|
||||
def test_rpm_owner_strips_epoch_prefix_when_present(monkeypatch):
|
||||
# Defensive: rpm output might include epoch-like token.
|
||||
monkeypatch.setattr(
|
||||
rpm,
|
||||
"_run",
|
||||
lambda cmd, allow_fail=False, merge_err=False: (0, "1:bash\n"),
|
||||
)
|
||||
assert rpm.rpm_owner("/bin/bash") == "bash"
|
||||
|
|
|
|||
|
|
@ -1,31 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import types
|
||||
|
||||
import pytest
|
||||
|
||||
import enroll.rpm as rpm
|
||||
|
||||
|
||||
def test_run_raises_on_nonzero_returncode_when_not_allow_fail(monkeypatch):
|
||||
def fake_run(cmd, check, text, stdout, stderr):
|
||||
return types.SimpleNamespace(returncode=1, stdout="OUT", stderr="ERR")
|
||||
|
||||
monkeypatch.setattr(rpm.subprocess, "run", fake_run)
|
||||
with pytest.raises(RuntimeError) as e:
|
||||
rpm._run(["rpm", "-q"]) # type: ignore[attr-defined]
|
||||
assert "Command failed" in str(e.value)
|
||||
assert "ERR" in str(e.value)
|
||||
assert "OUT" in str(e.value)
|
||||
|
||||
|
||||
def test_run_merge_err_includes_stderr_in_stdout(monkeypatch):
|
||||
def fake_run(cmd, check, text, stdout, stderr):
|
||||
# When merge_err is True, stderr is redirected to STDOUT, so we only
|
||||
# rely on stdout in our wrapper.
|
||||
return types.SimpleNamespace(returncode=0, stdout="COMBINED", stderr=None)
|
||||
|
||||
monkeypatch.setattr(rpm.subprocess, "run", fake_run)
|
||||
rc, out = rpm._run(["rpm", "-q"], merge_err=True)
|
||||
assert rc == 0
|
||||
assert out == "COMBINED"
|
||||
Loading…
Add table
Add a link
Reference in a new issue