# Listing metadata: 347 lines, 12 KiB, Python
import json
|
|
from pathlib import Path
|
|
|
|
import enroll.harvest as h
|
|
from enroll.platform import PlatformInfo
|
|
from enroll.systemd import UnitInfo
|
|
|
|
|
|
class AllowAllPolicy:
    """Permissive policy stub: ``deny_reason`` never reports a reason."""

    def deny_reason(self, path: str) -> None:
        """Return ``None`` for every *path*; the argument is not inspected."""
        return None
|
|
|
|
|
|
class FakeBackend:
|
|
"""Minimal backend stub for harvest tests.
|
|
|
|
The real backends (dpkg/rpm) enumerate the live system (dpkg status, rpm
|
|
databases, etc). These tests instead control all backend behaviour.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
name: str,
|
|
owned_etc: set[str],
|
|
etc_owner_map: dict[str, str],
|
|
topdir_to_pkgs: dict[str, set[str]],
|
|
pkg_to_etc_paths: dict[str, list[str]],
|
|
manual_pkgs: list[str],
|
|
owner_fn,
|
|
modified_by_pkg: dict[str, dict[str, str]] | None = None,
|
|
pkg_config_prefixes: tuple[str, ...] = ("/etc/apt/",),
|
|
):
|
|
self.name = name
|
|
self.pkg_config_prefixes = pkg_config_prefixes
|
|
self._owned_etc = owned_etc
|
|
self._etc_owner_map = etc_owner_map
|
|
self._topdir_to_pkgs = topdir_to_pkgs
|
|
self._pkg_to_etc_paths = pkg_to_etc_paths
|
|
self._manual = manual_pkgs
|
|
self._owner_fn = owner_fn
|
|
self._modified_by_pkg = modified_by_pkg or {}
|
|
|
|
def build_etc_index(self):
|
|
return (
|
|
self._owned_etc,
|
|
self._etc_owner_map,
|
|
self._topdir_to_pkgs,
|
|
self._pkg_to_etc_paths,
|
|
)
|
|
|
|
def owner_of_path(self, path: str):
|
|
return self._owner_fn(path)
|
|
|
|
def list_manual_packages(self):
|
|
return list(self._manual)
|
|
|
|
def specific_paths_for_hints(self, hints: set[str]):
|
|
return []
|
|
|
|
def is_pkg_config_path(self, path: str) -> bool:
|
|
for pfx in self.pkg_config_prefixes:
|
|
if path == pfx or path.startswith(pfx):
|
|
return True
|
|
return False
|
|
|
|
def modified_paths(self, pkg: str, etc_paths: list[str]):
|
|
# Test-controlled; ignore etc_paths.
|
|
return dict(self._modified_by_pkg.get(pkg, {}))
|
|
|
|
|
|
def test_harvest_dedup_manual_packages_and_builds_etc_custom(
    monkeypatch, tmp_path: Path
):
    """Harvest a fully faked system and check how files and packages are routed.

    The filesystem predicates, ``os.walk``, the systemd helpers, the platform
    and backend lookups, ``stat_triplet`` and the bundle copier are all
    replaced on the ``h`` module. The assertions then inspect the JSON state
    file whose path ``h.harvest`` returns.
    """
    import os

    bundle = tmp_path / "bundle"

    # Grab the genuine predicates *before* patching so that paths outside the
    # faked trees (e.g. the pytest tmp dir) still hit the real filesystem.
    genuine_isfile = os.path.isfile
    genuine_isdir = os.path.isdir
    genuine_exists = os.path.exists
    genuine_islink = os.path.islink

    # Fake filesystem: two /etc files exist, only one of them package-owned,
    # plus a few /usr/local files that should populate usr_local_custom.
    file_contents = {
        "/etc/openvpn/server.conf": b"server",
        "/etc/default/keyboard": b"kbd",
        "/usr/local/etc/myapp.conf": b"myapp=1\n",
        "/usr/local/bin/myscript": b"#!/bin/sh\necho hi\n",
        # non-executable text under /usr/local/bin should be skipped
        "/usr/local/bin/readme.txt": b"hello\n",
    }
    known_dirs = {
        "/etc",
        "/etc/openvpn",
        "/etc/default",
        "/usr",
        "/usr/local",
        "/usr/local/etc",
        "/usr/local/bin",
    }

    faked_prefixes = ("/etc", "/usr/local")

    def in_fake_tree(p: str) -> bool:
        # "/usr" itself is faked for isdir/exists, in addition to the prefixes.
        return p == "/usr" or p.startswith(faked_prefixes)

    def fake_isfile(p: str) -> bool:
        if p == "/etc" or p.startswith(("/etc/", "/usr/local/")):
            return p in file_contents
        return genuine_isfile(p)

    def fake_isdir(p: str) -> bool:
        return p in known_dirs if in_fake_tree(p) else genuine_isdir(p)

    def fake_islink(p: str) -> bool:
        # Nothing in the faked trees is a symlink; "/usr" falls through.
        return False if p.startswith(faked_prefixes) else genuine_islink(p)

    def fake_exists(p: str) -> bool:
        if in_fake_tree(p):
            return p in file_contents or p in known_dirs
        return genuine_exists(p)

    def fake_walk(root: str):
        # Rebuilt on every call so callers always receive fresh lists.
        listing = {
            "/etc": [
                ("/etc/openvpn", [], ["server.conf"]),
                ("/etc/default", [], ["keyboard"]),
            ],
            "/etc/openvpn": [("/etc/openvpn", [], ["server.conf"])],
            "/etc/default": [("/etc/default", [], ["keyboard"])],
            "/usr/local/etc": [("/usr/local/etc", [], ["myapp.conf"])],
            "/usr/local/bin": [("/usr/local/bin", [], ["myscript", "readme.txt"])],
        }
        yield from listing.get(root, [(root, [], [])])

    for attr, replacement in (
        ("isfile", fake_isfile),
        ("isdir", fake_isdir),
        ("islink", fake_islink),
        ("exists", fake_exists),
    ):
        monkeypatch.setattr(h.os.path, attr, replacement)
    monkeypatch.setattr(h.os, "walk", fake_walk)

    # Avoid real system access: one enabled service, no timers.
    def fake_unit_info(unit):
        return UnitInfo(
            name=unit,
            fragment_path="/lib/systemd/system/openvpn.service",
            dropin_paths=[],
            env_files=[],
            exec_paths=["/usr/sbin/openvpn"],
            active_state="inactive",
            sub_state="dead",
            unit_file_state="enabled",
            condition_result=None,
        )

    monkeypatch.setattr(h, "list_enabled_services", lambda: ["openvpn.service"])
    monkeypatch.setattr(h, "list_enabled_timers", lambda: [])
    monkeypatch.setattr(h, "get_unit_info", fake_unit_info)

    def openvpn_owner(p):
        # Any path mentioning "openvpn" is owned by openvpn; None is tolerated.
        if "openvpn" in (p or ""):
            return "openvpn"
        return None

    # Package index: openvpn owns /etc/openvpn/server.conf; keyboard is unowned.
    backend = FakeBackend(
        name="dpkg",
        owned_etc={"/etc/openvpn/server.conf"},
        etc_owner_map={"/etc/openvpn/server.conf": "openvpn"},
        topdir_to_pkgs={"openvpn": {"openvpn"}},
        pkg_to_etc_paths={"openvpn": ["/etc/openvpn/server.conf"], "curl": []},
        manual_pkgs=["openvpn", "curl"],
        owner_fn=openvpn_owner,
        modified_by_pkg={
            "openvpn": {"/etc/openvpn/server.conf": "modified_conffile"},
        },
    )

    monkeypatch.setattr(
        h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {})
    )
    monkeypatch.setattr(h, "get_backend", lambda info=None: backend)
    monkeypatch.setattr(h, "collect_non_system_users", lambda: [])

    def fake_stat_triplet(p: str):
        # Only myscript is executable; readme.txt (and the rest) stay 0644.
        mode = "0755" if p == "/usr/local/bin/myscript" else "0644"
        return ("root", "root", mode)

    monkeypatch.setattr(h, "stat_triplet", fake_stat_triplet)

    # Bundle copier that needs no source files on disk: it writes the faked
    # bytes (or nothing for unknown paths) into the artifacts tree.
    def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str):
        target = Path(bundle_dir, "artifacts", role_name, src_rel)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(file_contents.get(abs_path, b""))

    monkeypatch.setattr(h, "_copy_into_bundle", fake_copy)

    state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
    st = json.loads(Path(state_path).read_text(encoding="utf-8"))

    def manages(role, path: str) -> bool:
        return any(mf["path"] == path for mf in role["managed_files"])

    def has_package_role(pkg: str) -> bool:
        return any(pr["package"] == pkg for pr in st["package_roles"])

    # Both manual packages are listed, but openvpn is skipped as a package
    # role while curl keeps one.
    for pkg in ("openvpn", "curl"):
        assert pkg in st["manual_packages"]
    assert "openvpn" in st["manual_packages_skipped"]
    assert not has_package_role("openvpn")
    assert has_package_role("curl")

    # Service role captured modified conffile
    svc = st["services"][0]
    assert svc["unit"] == "openvpn.service"
    assert "openvpn" in svc["packages"]
    assert manages(svc, "/etc/openvpn/server.conf")

    # Unowned /etc/default/keyboard is attributed to etc_custom only
    assert manages(st["etc_custom"], "/etc/default/keyboard")

    # /usr/local content is attributed to usr_local_custom
    ul = st["usr_local_custom"]
    assert manages(ul, "/usr/local/etc/myapp.conf")
    assert manages(ul, "/usr/local/bin/myscript")
    assert not manages(ul, "/usr/local/bin/readme.txt")
|
|
|
|
|
|
def test_shared_cron_snippet_prefers_matching_role_over_lexicographic(
    monkeypatch, tmp_path: Path
):
    """Regression test for routing of a shared cron snippet.

    Two service roles (apparmor and ntpsec) both resolve to the same owning
    package. The snippet must be attached to the role whose name matches the
    snippet/package (ntpsec) rather than to whichever role a lexicographic
    tie-break would pick.
    """
    bundle = tmp_path / "bundle"

    snippet = "/etc/cron.d/ntpsec"
    files = {snippet: b"# cron\n"}
    dirs = {"/etc", "/etc/cron.d"}

    # The fake filesystem holds exactly one file and its parent directories.
    monkeypatch.setattr(h.os.path, "isfile", lambda p: p in files)
    monkeypatch.setattr(h.os.path, "islink", lambda p: False)
    monkeypatch.setattr(h.os.path, "isdir", lambda p: p in dirs)
    monkeypatch.setattr(h.os.path, "exists", lambda p: p in files or p in dirs)
    monkeypatch.setattr(h.os, "walk", lambda root: [("/etc/cron.d", [], ["ntpsec"])])

    # Only include the cron snippet in the system capture set.
    monkeypatch.setattr(
        h, "_iter_system_capture_paths", lambda: [("/etc/cron.d/ntpsec", "system_cron")]
    )

    monkeypatch.setattr(
        h, "list_enabled_services", lambda: ["apparmor.service", "ntpsec.service"]
    )
    monkeypatch.setattr(h, "list_enabled_timers", lambda: [])

    def fake_unit_info(unit: str) -> UnitInfo:
        # The two units differ only in fragment and exec paths; any unit that
        # is not apparmor.service is described as ntpsec.
        if unit == "apparmor.service":
            fragment = "/lib/systemd/system/apparmor.service"
            executables = ["/usr/sbin/apparmor"]
        else:
            fragment = "/lib/systemd/system/ntpsec.service"
            executables = ["/usr/sbin/ntpd"]
        return UnitInfo(
            name=unit,
            fragment_path=fragment,
            dropin_paths=[],
            env_files=[],
            exec_paths=executables,
            active_state="active",
            sub_state="running",
            unit_file_state="enabled",
            condition_result=None,
        )

    monkeypatch.setattr(h, "get_unit_info", fake_unit_info)

    # Make apparmor *also* claim the ntpsec package (simulates overly-broad
    # package inference). The snippet routing should still prefer role 'ntpsec'.
    def fake_owner(p: str):
        text = p or ""
        # "apparmor" mapping to ntpsec is intentionally misleading; the cron
        # snippet itself is matched through the "ntpsec" token.
        for token in ("apparmor", "ntpsec", "ntpd"):
            if token in text:
                return "ntpsec"
        return None

    backend = FakeBackend(
        name="dpkg",
        owned_etc=set(),
        etc_owner_map={},
        topdir_to_pkgs={},
        pkg_to_etc_paths={},
        manual_pkgs=[],
        owner_fn=fake_owner,
        modified_by_pkg={},
    )

    monkeypatch.setattr(
        h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {})
    )
    monkeypatch.setattr(h, "get_backend", lambda info=None: backend)

    monkeypatch.setattr(h, "stat_triplet", lambda p: ("root", "root", "0644"))
    monkeypatch.setattr(h, "collect_non_system_users", lambda: [])

    # Bundle copier backed by the fake file table; unknown paths raise KeyError.
    def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str):
        target = Path(bundle_dir, "artifacts", role_name, src_rel)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(files[abs_path])

    monkeypatch.setattr(h, "_copy_into_bundle", fake_copy)

    state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
    st = json.loads(Path(state_path).read_text(encoding="utf-8"))

    def service_role(role_name: str):
        # First service entry carrying the requested role name.
        return next(s for s in st["services"] if s["role_name"] == role_name)

    def manages_snippet(role) -> bool:
        return any(mf["path"] == "/etc/cron.d/ntpsec" for mf in role["managed_files"])

    # Cron snippet should end up attached to the ntpsec role, not apparmor.
    svc_ntpsec = service_role("ntpsec")
    assert manages_snippet(svc_ntpsec)

    svc_apparmor = service_role("apparmor")
    assert not manages_snippet(svc_apparmor)
|