From 4660a0703e2471d35a3b6153d295b9195a7c4c93 Mon Sep 17 00:00:00 2001 From: Miguel Jacq Date: Thu, 18 Dec 2025 17:11:04 +1100 Subject: [PATCH] Include files from `/usr/local/bin` and `/usr/local/etc` in harvest (assuming they aren't binaries or symlinks) and store in `usr_local_custom` role, similar to `etc_custom`. --- CHANGELOG.md | 5 ++ debian/changelog | 7 ++ enroll/diff.py | 6 ++ enroll/harvest.py | 106 ++++++++++++++++++++++++++ enroll/ignore.py | 5 ++ enroll/manifest.py | 102 +++++++++++++++++++++++++ pyproject.toml | 2 +- tests/test_diff_usr_local_custom.py | 111 ++++++++++++++++++++++++++++ tests/test_harvest.py | 41 +++++++++- tests/test_manifest.py | 73 ++++++++++++++++++ tests/test_misc_coverage.py | 96 ++++++++++++++++++++++++ 11 files changed, 551 insertions(+), 3 deletions(-) create mode 100644 tests/test_diff_usr_local_custom.py create mode 100644 tests/test_misc_coverage.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 81eed41..0e80a13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 0.1.2 + + * Include files from `/usr/local/bin` and `/usr/local/etc` in harvest (assuming they aren't binaries or + symlinks) and store in `usr_local_custom` role, similar to `etc_custom`. + # 0.1.1 * Add `diff` subcommand which can compare two harvests and send email or webhook notifications in different diff --git a/debian/changelog b/debian/changelog index 0cc5861..0b16cfa 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,10 @@ +enroll (0.1.2) unstable; urgency=medium + + * Include files from `/usr/local/bin` and `/usr/local/etc` in harvest (assuming they aren't binaries or + symlinks) and store in `usr_local_custom` role, similar to `etc_custom`. + + -- Miguel Jacq Thu, 18 Dec 2025 17:07:00 +1100 + enroll (0.1.1) unstable; urgency=medium * Add `diff` subcommand which can compare two harvests and send email or webhook notifications in different diff --git a/enroll/diff.py b/enroll/diff.py index 9b396fc..e2861c9 100644 --- a/enroll/diff.py +++ b/enroll/diff.py @@ -190,6 +190,12 @@ def _iter_managed_files(state: Dict[str, Any]) -> Iterable[Tuple[str, Dict[str, for mf in ec.get("managed_files", []) or []: yield str(ec_role), mf + # usr_local_custom + ul = state.get("usr_local_custom") or {} + ul_role = ul.get("role_name") or "usr_local_custom" + for mf in ul.get("managed_files", []) or []: + yield str(ul_role), mf + def _file_index(bundle_dir: Path, state: Dict[str, Any]) -> Dict[str, FileRec]: """Return mapping of absolute path -> FileRec. diff --git a/enroll/harvest.py b/enroll/harvest.py index ef93903..659bebc 100644 --- a/enroll/harvest.py +++ b/enroll/harvest.py @@ -78,6 +78,14 @@ class EtcCustomSnapshot: notes: List[str] +@dataclass +class UsrLocalCustomSnapshot: + role_name: str + managed_files: List[ManagedFile] + excluded: List[ExcludedFile] + notes: List[str] + + ALLOWED_UNOWNED_EXTS = { ".conf", ".cfg", @@ -701,6 +709,103 @@ def harvest( notes=etc_notes, ) + # ------------------------- + # usr_local_custom role (/usr/local/etc + /usr/local/bin scripts) + # ------------------------- + ul_notes: List[str] = [] + ul_excluded: List[ExcludedFile] = [] + ul_managed: List[ManagedFile] = [] + ul_role_name = "usr_local_custom" + + # Extend the already-captured set with etc_custom. + already_all: Set[str] = set(already) + for mf in etc_managed: + already_all.add(mf.path) + + def _scan_usr_local_tree( + root: str, *, require_executable: bool, cap: int, reason: str + ) -> None: + scanned = 0 + if not os.path.isdir(root): + return + for dirpath, _, filenames in os.walk(root): + for fn in filenames: + path = os.path.join(dirpath, fn) + if path in already_all: + continue + if not os.path.isfile(path) or os.path.islink(path): + continue + if require_executable: + try: + owner, group, mode = stat_triplet(path) + except OSError: + ul_excluded.append(ExcludedFile(path=path, reason="unreadable")) + continue + try: + if (int(mode, 8) & 0o111) == 0: + continue + except ValueError: + # If mode parsing fails, be conservative and skip. + continue + else: + try: + owner, group, mode = stat_triplet(path) + except OSError: + ul_excluded.append(ExcludedFile(path=path, reason="unreadable")) + continue + + deny = policy.deny_reason(path) + if deny: + ul_excluded.append(ExcludedFile(path=path, reason=deny)) + continue + + src_rel = path.lstrip("/") + try: + _copy_into_bundle(bundle_dir, ul_role_name, path, src_rel) + except OSError: + ul_excluded.append(ExcludedFile(path=path, reason="unreadable")) + continue + + ul_managed.append( + ManagedFile( + path=path, + src_rel=src_rel, + owner=owner, + group=group, + mode=mode, + reason=reason, + ) + ) + + already_all.add(path) + scanned += 1 + if scanned >= cap: + ul_notes.append(f"Reached file cap ({cap}) while scanning {root}.") + return + + # /usr/local/etc: capture all non-binary regular files (filtered by IgnorePolicy) + _scan_usr_local_tree( + "/usr/local/etc", + require_executable=False, + cap=2000, + reason="usr_local_etc_custom", + ) + + # /usr/local/bin: capture executable scripts only (skip non-executable text) + _scan_usr_local_tree( + "/usr/local/bin", + require_executable=True, + cap=2000, + reason="usr_local_bin_script", + ) + + usr_local_custom_snapshot = UsrLocalCustomSnapshot( + role_name=ul_role_name, + managed_files=ul_managed, + excluded=ul_excluded, + notes=ul_notes, + ) + state = { "host": {"hostname": os.uname().nodename, "os": "debian"}, "users": asdict(users_snapshot), @@ -709,6 +814,7 @@ def harvest( "manual_packages_skipped": manual_pkgs_skipped, "package_roles": [asdict(p) for p in pkg_snaps], "etc_custom": asdict(etc_custom_snapshot), + "usr_local_custom": asdict(usr_local_custom_snapshot), } state_path = os.path.join(bundle_dir, "state.json") diff --git a/enroll/ignore.py b/enroll/ignore.py index d8ffce9..93ba423 100644 --- a/enroll/ignore.py +++ b/enroll/ignore.py @@ -23,6 +23,11 @@ DEFAULT_DENY_GLOBS = [ "/etc/gshadow", "/etc/*shadow", "/etc/letsencrypt/*", + "/usr/local/etc/ssl/private/*", + "/usr/local/etc/ssh/ssh_host_*", + "/usr/local/etc/*shadow", + "/usr/local/etc/*gshadow", + "/usr/local/etc/letsencrypt/*", ] SENSITIVE_CONTENT_PATTERNS = [ diff --git a/enroll/manifest.py b/enroll/manifest.py index e55418c..6909c5c 100644 --- a/enroll/manifest.py +++ b/enroll/manifest.py @@ -629,6 +629,7 @@ def _manifest_from_bundle_dir( package_roles: List[Dict[str, Any]] = state.get("package_roles", []) users_snapshot: Dict[str, Any] = state.get("users", {}) etc_custom_snapshot: Dict[str, Any] = state.get("etc_custom", {}) + usr_local_custom_snapshot: Dict[str, Any] = state.get("usr_local_custom", {}) site_mode = fqdn is not None and fqdn != "" @@ -661,6 +662,7 @@ def _manifest_from_bundle_dir( manifested_users_roles: List[str] = [] manifested_etc_custom_roles: List[str] = [] + manifested_usr_local_custom_roles: List[str] = [] manifested_service_roles: List[str] = [] manifested_pkg_roles: List[str] = [] @@ -999,6 +1001,105 @@ Unowned /etc config files not attributed to packages or services. # ------------------------- + # ------------------------- + + # ------------------------- + # usr_local_custom role (/usr/local/etc + /usr/local/bin scripts) + # ------------------------- + if usr_local_custom_snapshot and usr_local_custom_snapshot.get("managed_files"): + role = usr_local_custom_snapshot.get("role_name", "usr_local_custom") + role_dir = os.path.join(roles_root, role) + _write_role_scaffold(role_dir) + + var_prefix = role + + managed_files = usr_local_custom_snapshot.get("managed_files", []) + excluded = usr_local_custom_snapshot.get("excluded", []) + notes = usr_local_custom_snapshot.get("notes", []) + + templated, jt_vars = _jinjify_managed_files( + bundle_dir, + role, + role_dir, + managed_files, + jt_exe=jt_exe, + jt_enabled=jt_enabled, + overwrite_templates=not site_mode, + ) + + # Copy only the non-templated artifacts (templates live in the role). + if site_mode: + _copy_artifacts( + bundle_dir, + role, + _host_role_files_dir(out_dir, fqdn or "", role), + exclude_rels=templated, + ) + else: + _copy_artifacts( + bundle_dir, + role, + os.path.join(role_dir, "files"), + exclude_rels=templated, + ) + + files_var = _build_managed_files_var( + managed_files, + templated, + notify_other=None, + notify_systemd=None, + ) + + jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {} + vars_map: Dict[str, Any] = {f"{var_prefix}_managed_files": files_var} + vars_map = _merge_mappings_overwrite(vars_map, jt_map) + + if site_mode: + _write_role_defaults(role_dir, {f"{var_prefix}_managed_files": []}) + _write_hostvars(out_dir, fqdn or "", role, vars_map) + else: + _write_role_defaults(role_dir, vars_map) + + tasks = "---\n" + _render_generic_files_tasks( + var_prefix, include_restart_notify=False + ) + with open( + os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write(tasks.rstrip() + "\n") + + # No handlers needed for this role, but keep a valid YAML document. + with open( + os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write("---\n") + + with open( + os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write("---\ndependencies: []\n") + + readme = ( + """# usr_local_custom\n\n""" + "Unowned /usr/local files (scripts in /usr/local/bin and config under /usr/local/etc).\n\n" + "## Managed files\n" + + ("\n".join([f"- {mf.get('path')}" for mf in managed_files]) or "- (none)") + + "\n\n## Excluded\n" + + ( + "\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded]) + or "- (none)" + ) + + "\n\n## Notes\n" + + ("\n".join([f"- {n}" for n in notes]) or "- (none)") + + "\n" + ) + with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: + f.write(readme) + + manifested_usr_local_custom_roles.append(role) + + # ------------------------- + # ------------------------- # Service roles # ------------------------- @@ -1310,6 +1411,7 @@ Generated for package `{pkg}`. manifested_pkg_roles + manifested_service_roles + manifested_etc_custom_roles + + manifested_usr_local_custom_roles + manifested_users_roles ) diff --git a/pyproject.toml b/pyproject.toml index 5231ad9..b5a07ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "enroll" -version = "0.1.1" +version = "0.1.2" description = "Enroll a server's running state retrospectively into Ansible" authors = ["Miguel Jacq "] license = "GPL-3.0-or-later" diff --git a/tests/test_diff_usr_local_custom.py b/tests/test_diff_usr_local_custom.py new file mode 100644 index 0000000..88d594f --- /dev/null +++ b/tests/test_diff_usr_local_custom.py @@ -0,0 +1,111 @@ +import json +from pathlib import Path + +from enroll.diff import compare_harvests + + +def _write_bundle(root: Path, state: dict, artifacts: dict[str, bytes]) -> None: + root.mkdir(parents=True, exist_ok=True) + (root / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + for rel, data in artifacts.items(): + p = root / rel + p.parent.mkdir(parents=True, exist_ok=True) + p.write_bytes(data) + + +def test_diff_includes_usr_local_custom_files(tmp_path: Path): + old = tmp_path / "old" + new = tmp_path / "new" + + old_state = { + "host": {"hostname": "h1", "os": "debian"}, + "users": { + "role_name": "users", + "users": [], + "managed_files": [], + "excluded": [], + "notes": [], + }, + "services": [], + "package_roles": [], + "manual_packages": ["curl"], + "manual_packages_skipped": [], + "etc_custom": { + "role_name": "etc_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "usr_local_custom": { + "role_name": "usr_local_custom", + "managed_files": [ + { + "path": "/usr/local/etc/myapp.conf", + "src_rel": "usr/local/etc/myapp.conf", + "owner": "root", + "group": "root", + "mode": "0644", + "reason": "usr_local_etc_custom", + } + ], + "excluded": [], + "notes": [], + }, + } + new_state = { + **old_state, + "manual_packages": ["curl", "htop"], + "usr_local_custom": { + "role_name": "usr_local_custom", + "managed_files": [ + { + "path": "/usr/local/etc/myapp.conf", + "src_rel": "usr/local/etc/myapp.conf", + "owner": "root", + "group": "root", + "mode": "0644", + "reason": "usr_local_etc_custom", + }, + { + "path": "/usr/local/bin/myscript", + "src_rel": "usr/local/bin/myscript", + "owner": "root", + "group": "root", + "mode": "0755", + "reason": "usr_local_bin_script", + }, + ], + "excluded": [], + "notes": [], + }, + } + + _write_bundle( + old, + old_state, + { + "artifacts/usr_local_custom/usr/local/etc/myapp.conf": b"myapp=1\n", + }, + ) + _write_bundle( + new, + new_state, + { + "artifacts/usr_local_custom/usr/local/etc/myapp.conf": b"myapp=2\n", + "artifacts/usr_local_custom/usr/local/bin/myscript": b"#!/bin/sh\necho hi\n", + }, + ) + + report, has_changes = compare_harvests(str(old), str(new)) + assert has_changes is True + + # Packages: htop was added. + assert report["packages"]["added"] == ["htop"] + + # Files: /usr/local/etc/myapp.conf should be detected as changed (content sha differs). + changed_paths = {c["path"] for c in report["files"]["changed"]} + assert "/usr/local/etc/myapp.conf" in changed_paths + + # Files: new script was added. + added_paths = {a["path"] for a in report["files"]["added"]} + assert "/usr/local/bin/myscript" in added_paths diff --git a/tests/test_harvest.py b/tests/test_harvest.py index 8e19fb4..a832c81 100644 --- a/tests/test_harvest.py +++ b/tests/test_harvest.py @@ -23,30 +23,51 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( real_islink = os.path.islink # Fake filesystem: two /etc files exist, only one is dpkg-owned. + # Also include some /usr/local files to populate usr_local_custom. files = { "/etc/openvpn/server.conf": b"server", "/etc/default/keyboard": b"kbd", + "/usr/local/etc/myapp.conf": b"myapp=1\n", + "/usr/local/bin/myscript": b"#!/bin/sh\necho hi\n", + # non-executable text under /usr/local/bin should be skipped + "/usr/local/bin/readme.txt": b"hello\n", + } + dirs = { + "/etc", + "/etc/openvpn", + "/etc/default", + "/usr", + "/usr/local", + "/usr/local/etc", + "/usr/local/bin", } - dirs = {"/etc", "/etc/openvpn", "/etc/default"} def fake_isfile(p: str) -> bool: if p.startswith("/etc/") or p == "/etc": return p in files + if p.startswith("/usr/local/"): + return p in files return real_isfile(p) def fake_isdir(p: str) -> bool: if p.startswith("/etc"): return p in dirs + if p.startswith("/usr/local") or p in ("/usr", "/usr/local"): + return p in dirs return real_isdir(p) def fake_islink(p: str) -> bool: if p.startswith("/etc"): return False + if p.startswith("/usr/local"): + return False return real_islink(p) def fake_exists(p: str) -> bool: if p.startswith("/etc"): return p in files or p in dirs + if p.startswith("/usr/local") or p in ("/usr", "/usr/local"): + return p in files or p in dirs return real_exists(p) def fake_walk(root: str): @@ -57,6 +78,10 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( yield ("/etc/openvpn", [], ["server.conf"]) elif root == "/etc/default": yield ("/etc/default", [], ["keyboard"]) + elif root == "/usr/local/etc": + yield ("/usr/local/etc", [], ["myapp.conf"]) + elif root == "/usr/local/bin": + yield ("/usr/local/bin", [], ["myscript", "readme.txt"]) else: yield (root, [], []) @@ -109,7 +134,13 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( monkeypatch.setattr(h, "list_manual_packages", lambda: ["openvpn", "curl"]) monkeypatch.setattr(h, "collect_non_system_users", lambda: []) - monkeypatch.setattr(h, "stat_triplet", lambda p: ("root", "root", "0644")) + def fake_stat_triplet(p: str): + if p == "/usr/local/bin/myscript": + return ("root", "root", "0755") + # /usr/local/bin/readme.txt remains non-executable + return ("root", "root", "0644") + + monkeypatch.setattr(h, "stat_triplet", fake_stat_triplet) # Avoid needing source files on disk by implementing our own bundle copier def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str): @@ -139,3 +170,9 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( assert any( mf["path"] == "/etc/default/keyboard" for mf in etc_custom["managed_files"] ) + + # /usr/local content is attributed to usr_local_custom + ul = st["usr_local_custom"] + assert any(mf["path"] == "/usr/local/etc/myapp.conf" for mf in ul["managed_files"]) + assert any(mf["path"] == "/usr/local/bin/myscript" for mf in ul["managed_files"]) + assert all(mf["path"] != "/usr/local/bin/readme.txt" for mf in ul["managed_files"]) diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 99040b0..92c3dfc 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -47,6 +47,29 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): "excluded": [], "notes": [], }, + "usr_local_custom": { + "role_name": "usr_local_custom", + "managed_files": [ + { + "path": "/usr/local/etc/myapp.conf", + "src_rel": "usr/local/etc/myapp.conf", + "owner": "root", + "group": "root", + "mode": "0644", + "reason": "usr_local_etc_custom", + }, + { + "path": "/usr/local/bin/myscript", + "src_rel": "usr/local/bin/myscript", + "owner": "root", + "group": "root", + "mode": "0755", + "reason": "usr_local_bin_script", + }, + ], + "excluded": [], + "notes": [], + }, "services": [ { "unit": "foo.service", @@ -92,6 +115,26 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): "kbd", encoding="utf-8" ) + # Create artifacts for usr_local_custom files so copy works + (bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "etc").mkdir( + parents=True, exist_ok=True + ) + ( + bundle + / "artifacts" + / "usr_local_custom" + / "usr" + / "local" + / "etc" + / "myapp.conf" + ).write_text("myapp=1\n", encoding="utf-8") + (bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin").mkdir( + parents=True, exist_ok=True + ) + ( + bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin" / "myscript" + ).write_text("#!/bin/sh\necho hi\n", encoding="utf-8") + manifest(str(bundle), str(out)) # Service role: systemd management should be gated on foo_manage_unit and a probe. @@ -119,6 +162,7 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): pb = (out / "playbook.yml").read_text(encoding="utf-8") assert "- users" in pb assert "- etc_custom" in pb + assert "- usr_local_custom" in pb assert "- curl" in pb assert "- foo" in pb @@ -168,6 +212,21 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path) "excluded": [], "notes": [], }, + "usr_local_custom": { + "role_name": "usr_local_custom", + "managed_files": [ + { + "path": "/usr/local/etc/myapp.conf", + "src_rel": "usr/local/etc/myapp.conf", + "owner": "root", + "group": "root", + "mode": "0644", + "reason": "usr_local_etc_custom", + } + ], + "excluded": [], + "notes": [], + }, "services": [ { "unit": "foo.service", @@ -197,6 +256,20 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path) bundle.mkdir(parents=True, exist_ok=True) (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + # Artifacts for usr_local_custom file so copy works. + (bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "etc").mkdir( + parents=True, exist_ok=True + ) + ( + bundle + / "artifacts" + / "usr_local_custom" + / "usr" + / "local" + / "etc" + / "myapp.conf" + ).write_text("myapp=1\n", encoding="utf-8") + manifest(str(bundle), str(out), fqdn=fqdn) # Host playbook exists. diff --git a/tests/test_misc_coverage.py b/tests/test_misc_coverage.py new file mode 100644 index 0000000..b4250fc --- /dev/null +++ b/tests/test_misc_coverage.py @@ -0,0 +1,96 @@ +import stat +from pathlib import Path + +import pytest + +from enroll.cache import _safe_component, new_harvest_cache_dir +from enroll.ignore import IgnorePolicy +from enroll.sopsutil import ( + SopsError, + _pgp_arg, + decrypt_file_binary_to, + encrypt_file_binary, +) + + +def test_safe_component_sanitizes_and_bounds_length(): + assert _safe_component(" ") == "unknown" + assert _safe_component("a/b c") == "a_b_c" + assert _safe_component("x" * 200) == "x" * 64 + + +def test_new_harvest_cache_dir_uses_xdg_cache_home(tmp_path: Path, monkeypatch): + monkeypatch.setenv("XDG_CACHE_HOME", str(tmp_path / "xdg")) + hc = new_harvest_cache_dir(hint="my host/01") + assert hc.dir.exists() + assert "my_host_01" in hc.dir.name + assert str(hc.dir).startswith(str(tmp_path / "xdg")) + # best-effort: ensure directory is not world-readable on typical FS + try: + mode = stat.S_IMODE(hc.dir.stat().st_mode) + assert mode & 0o077 == 0 + except OSError: + pass + + +def test_ignore_policy_denies_binary_and_sensitive_content(tmp_path: Path): + p_bin = tmp_path / "binfile" + p_bin.write_bytes(b"abc\x00def") + assert IgnorePolicy().deny_reason(str(p_bin)) == "binary_like" + + p_secret = tmp_path / "secret.conf" + p_secret.write_text("password=foo\n", encoding="utf-8") + assert IgnorePolicy().deny_reason(str(p_secret)) == "sensitive_content" + + # dangerous mode disables heuristic scanning (but still checks file-ness/size) + assert IgnorePolicy(dangerous=True).deny_reason(str(p_secret)) is None + + +def test_ignore_policy_denies_usr_local_shadow_by_glob(): + # This should short-circuit before stat() (path doesn't need to exist). + assert IgnorePolicy().deny_reason("/usr/local/etc/shadow") == "denied_path" + + +def test_sops_pgp_arg_and_encrypt_decrypt_roundtrip(tmp_path: Path, monkeypatch): + assert _pgp_arg([" ABC ", "DEF"]) == "ABC,DEF" + with pytest.raises(SopsError): + _pgp_arg([]) + + # Stub out sops and subprocess. + import enroll.sopsutil as s + + monkeypatch.setattr(s, "require_sops_cmd", lambda: "sops") + + class R: + def __init__(self, rc: int, out: bytes, err: bytes = b""): + self.returncode = rc + self.stdout = out + self.stderr = err + + calls = [] + + def fake_run(cmd, capture_output, check): + calls.append(cmd) + # Return a deterministic payload so we can assert file writes. + if "--encrypt" in cmd: + return R(0, b"ENCRYPTED") + if "--decrypt" in cmd: + return R(0, b"PLAINTEXT") + return R(1, b"", b"bad") + + monkeypatch.setattr(s.subprocess, "run", fake_run) + + src = tmp_path / "src.bin" + src.write_bytes(b"x") + enc = tmp_path / "out.sops" + dec = tmp_path / "out.bin" + + encrypt_file_binary(src, enc, pgp_fingerprints=["ABC"], mode=0o600) + assert enc.read_bytes() == b"ENCRYPTED" + + decrypt_file_binary_to(enc, dec, mode=0o644) + assert dec.read_bytes() == b"PLAINTEXT" + + # Sanity: we invoked encrypt and decrypt. + assert any("--encrypt" in c for c in calls) + assert any("--decrypt" in c for c in calls)