Include files from /usr/local/bin and /usr/local/etc in harvest (assuming they aren't binaries or symlinks) and store in usr_local_custom role, similar to etc_custom.
All checks were successful
CI / test (push) Successful in 5m43s
Lint / test (push) Successful in 30s
Trivy / test (push) Successful in 19s

This commit is contained in:
Miguel Jacq 2025-12-18 17:11:04 +11:00
parent b5d2b99174
commit 4660a0703e
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
11 changed files with 551 additions and 3 deletions

View file

@ -0,0 +1,111 @@
import json
from pathlib import Path
from enroll.diff import compare_harvests
def _write_bundle(root: Path, state: dict, artifacts: dict[str, bytes]) -> None:
root.mkdir(parents=True, exist_ok=True)
(root / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
for rel, data in artifacts.items():
p = root / rel
p.parent.mkdir(parents=True, exist_ok=True)
p.write_bytes(data)
def test_diff_includes_usr_local_custom_files(tmp_path: Path):
old = tmp_path / "old"
new = tmp_path / "new"
old_state = {
"host": {"hostname": "h1", "os": "debian"},
"users": {
"role_name": "users",
"users": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [],
"package_roles": [],
"manual_packages": ["curl"],
"manual_packages_skipped": [],
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [
{
"path": "/usr/local/etc/myapp.conf",
"src_rel": "usr/local/etc/myapp.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "usr_local_etc_custom",
}
],
"excluded": [],
"notes": [],
},
}
new_state = {
**old_state,
"manual_packages": ["curl", "htop"],
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [
{
"path": "/usr/local/etc/myapp.conf",
"src_rel": "usr/local/etc/myapp.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "usr_local_etc_custom",
},
{
"path": "/usr/local/bin/myscript",
"src_rel": "usr/local/bin/myscript",
"owner": "root",
"group": "root",
"mode": "0755",
"reason": "usr_local_bin_script",
},
],
"excluded": [],
"notes": [],
},
}
_write_bundle(
old,
old_state,
{
"artifacts/usr_local_custom/usr/local/etc/myapp.conf": b"myapp=1\n",
},
)
_write_bundle(
new,
new_state,
{
"artifacts/usr_local_custom/usr/local/etc/myapp.conf": b"myapp=2\n",
"artifacts/usr_local_custom/usr/local/bin/myscript": b"#!/bin/sh\necho hi\n",
},
)
report, has_changes = compare_harvests(str(old), str(new))
assert has_changes is True
# Packages: htop was added.
assert report["packages"]["added"] == ["htop"]
# Files: /usr/local/etc/myapp.conf should be detected as changed (content sha differs).
changed_paths = {c["path"] for c in report["files"]["changed"]}
assert "/usr/local/etc/myapp.conf" in changed_paths
# Files: new script was added.
added_paths = {a["path"] for a in report["files"]["added"]}
assert "/usr/local/bin/myscript" in added_paths

View file

@ -23,30 +23,51 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
real_islink = os.path.islink
# Fake filesystem: two /etc files exist, only one is dpkg-owned.
# Also include some /usr/local files to populate usr_local_custom.
files = {
"/etc/openvpn/server.conf": b"server",
"/etc/default/keyboard": b"kbd",
"/usr/local/etc/myapp.conf": b"myapp=1\n",
"/usr/local/bin/myscript": b"#!/bin/sh\necho hi\n",
# non-executable text under /usr/local/bin should be skipped
"/usr/local/bin/readme.txt": b"hello\n",
}
dirs = {
"/etc",
"/etc/openvpn",
"/etc/default",
"/usr",
"/usr/local",
"/usr/local/etc",
"/usr/local/bin",
}
dirs = {"/etc", "/etc/openvpn", "/etc/default"}
def fake_isfile(p: str) -> bool:
if p.startswith("/etc/") or p == "/etc":
return p in files
if p.startswith("/usr/local/"):
return p in files
return real_isfile(p)
def fake_isdir(p: str) -> bool:
if p.startswith("/etc"):
return p in dirs
if p.startswith("/usr/local") or p in ("/usr", "/usr/local"):
return p in dirs
return real_isdir(p)
def fake_islink(p: str) -> bool:
if p.startswith("/etc"):
return False
if p.startswith("/usr/local"):
return False
return real_islink(p)
def fake_exists(p: str) -> bool:
if p.startswith("/etc"):
return p in files or p in dirs
if p.startswith("/usr/local") or p in ("/usr", "/usr/local"):
return p in files or p in dirs
return real_exists(p)
def fake_walk(root: str):
@ -57,6 +78,10 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
yield ("/etc/openvpn", [], ["server.conf"])
elif root == "/etc/default":
yield ("/etc/default", [], ["keyboard"])
elif root == "/usr/local/etc":
yield ("/usr/local/etc", [], ["myapp.conf"])
elif root == "/usr/local/bin":
yield ("/usr/local/bin", [], ["myscript", "readme.txt"])
else:
yield (root, [], [])
@ -109,7 +134,13 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
monkeypatch.setattr(h, "list_manual_packages", lambda: ["openvpn", "curl"])
monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
monkeypatch.setattr(h, "stat_triplet", lambda p: ("root", "root", "0644"))
def fake_stat_triplet(p: str):
if p == "/usr/local/bin/myscript":
return ("root", "root", "0755")
# /usr/local/bin/readme.txt remains non-executable
return ("root", "root", "0644")
monkeypatch.setattr(h, "stat_triplet", fake_stat_triplet)
# Avoid needing source files on disk by implementing our own bundle copier
def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str):
@ -139,3 +170,9 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
assert any(
mf["path"] == "/etc/default/keyboard" for mf in etc_custom["managed_files"]
)
# /usr/local content is attributed to usr_local_custom
ul = st["usr_local_custom"]
assert any(mf["path"] == "/usr/local/etc/myapp.conf" for mf in ul["managed_files"])
assert any(mf["path"] == "/usr/local/bin/myscript" for mf in ul["managed_files"])
assert all(mf["path"] != "/usr/local/bin/readme.txt" for mf in ul["managed_files"])

View file

@ -47,6 +47,29 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [
{
"path": "/usr/local/etc/myapp.conf",
"src_rel": "usr/local/etc/myapp.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "usr_local_etc_custom",
},
{
"path": "/usr/local/bin/myscript",
"src_rel": "usr/local/bin/myscript",
"owner": "root",
"group": "root",
"mode": "0755",
"reason": "usr_local_bin_script",
},
],
"excluded": [],
"notes": [],
},
"services": [
{
"unit": "foo.service",
@ -92,6 +115,26 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
"kbd", encoding="utf-8"
)
# Create artifacts for usr_local_custom files so copy works
(bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "etc").mkdir(
parents=True, exist_ok=True
)
(
bundle
/ "artifacts"
/ "usr_local_custom"
/ "usr"
/ "local"
/ "etc"
/ "myapp.conf"
).write_text("myapp=1\n", encoding="utf-8")
(bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin").mkdir(
parents=True, exist_ok=True
)
(
bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin" / "myscript"
).write_text("#!/bin/sh\necho hi\n", encoding="utf-8")
manifest(str(bundle), str(out))
# Service role: systemd management should be gated on foo_manage_unit and a probe.
@ -119,6 +162,7 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
pb = (out / "playbook.yml").read_text(encoding="utf-8")
assert "- users" in pb
assert "- etc_custom" in pb
assert "- usr_local_custom" in pb
assert "- curl" in pb
assert "- foo" in pb
@ -168,6 +212,21 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path)
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [
{
"path": "/usr/local/etc/myapp.conf",
"src_rel": "usr/local/etc/myapp.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "usr_local_etc_custom",
}
],
"excluded": [],
"notes": [],
},
"services": [
{
"unit": "foo.service",
@ -197,6 +256,20 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path)
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
# Artifacts for usr_local_custom file so copy works.
(bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "etc").mkdir(
parents=True, exist_ok=True
)
(
bundle
/ "artifacts"
/ "usr_local_custom"
/ "usr"
/ "local"
/ "etc"
/ "myapp.conf"
).write_text("myapp=1\n", encoding="utf-8")
manifest(str(bundle), str(out), fqdn=fqdn)
# Host playbook exists.

View file

@ -0,0 +1,96 @@
import stat
from pathlib import Path
import pytest
from enroll.cache import _safe_component, new_harvest_cache_dir
from enroll.ignore import IgnorePolicy
from enroll.sopsutil import (
SopsError,
_pgp_arg,
decrypt_file_binary_to,
encrypt_file_binary,
)
def test_safe_component_sanitizes_and_bounds_length():
assert _safe_component(" ") == "unknown"
assert _safe_component("a/b c") == "a_b_c"
assert _safe_component("x" * 200) == "x" * 64
def test_new_harvest_cache_dir_uses_xdg_cache_home(tmp_path: Path, monkeypatch):
monkeypatch.setenv("XDG_CACHE_HOME", str(tmp_path / "xdg"))
hc = new_harvest_cache_dir(hint="my host/01")
assert hc.dir.exists()
assert "my_host_01" in hc.dir.name
assert str(hc.dir).startswith(str(tmp_path / "xdg"))
# best-effort: ensure directory is not world-readable on typical FS
try:
mode = stat.S_IMODE(hc.dir.stat().st_mode)
assert mode & 0o077 == 0
except OSError:
pass
def test_ignore_policy_denies_binary_and_sensitive_content(tmp_path: Path):
p_bin = tmp_path / "binfile"
p_bin.write_bytes(b"abc\x00def")
assert IgnorePolicy().deny_reason(str(p_bin)) == "binary_like"
p_secret = tmp_path / "secret.conf"
p_secret.write_text("password=foo\n", encoding="utf-8")
assert IgnorePolicy().deny_reason(str(p_secret)) == "sensitive_content"
# dangerous mode disables heuristic scanning (but still checks file-ness/size)
assert IgnorePolicy(dangerous=True).deny_reason(str(p_secret)) is None
def test_ignore_policy_denies_usr_local_shadow_by_glob():
# This should short-circuit before stat() (path doesn't need to exist).
assert IgnorePolicy().deny_reason("/usr/local/etc/shadow") == "denied_path"
def test_sops_pgp_arg_and_encrypt_decrypt_roundtrip(tmp_path: Path, monkeypatch):
assert _pgp_arg([" ABC ", "DEF"]) == "ABC,DEF"
with pytest.raises(SopsError):
_pgp_arg([])
# Stub out sops and subprocess.
import enroll.sopsutil as s
monkeypatch.setattr(s, "require_sops_cmd", lambda: "sops")
class R:
def __init__(self, rc: int, out: bytes, err: bytes = b""):
self.returncode = rc
self.stdout = out
self.stderr = err
calls = []
def fake_run(cmd, capture_output, check):
calls.append(cmd)
# Return a deterministic payload so we can assert file writes.
if "--encrypt" in cmd:
return R(0, b"ENCRYPTED")
if "--decrypt" in cmd:
return R(0, b"PLAINTEXT")
return R(1, b"", b"bad")
monkeypatch.setattr(s.subprocess, "run", fake_run)
src = tmp_path / "src.bin"
src.write_bytes(b"x")
enc = tmp_path / "out.sops"
dec = tmp_path / "out.bin"
encrypt_file_binary(src, enc, pgp_fingerprints=["ABC"], mode=0o600)
assert enc.read_bytes() == b"ENCRYPTED"
decrypt_file_binary_to(enc, dec, mode=0o644)
assert dec.read_bytes() == b"PLAINTEXT"
# Sanity: we invoked encrypt and decrypt.
assert any("--encrypt" in c for c in calls)
assert any("--decrypt" in c for c in calls)