diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a4c39d..f2cb109 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.1.7 + + * Fix an attribution bug for certain files ending up in the wrong package/role. + # 0.1.6 * DRY up some code logic diff --git a/debian/changelog b/debian/changelog index a15c38a..eabdefc 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +enroll (0.1.7) unstable; urgency=medium + + * Fix an attribution bug for certain files ending up in the wrong package/role. + + -- Miguel Jacq Sun, 28 Dec 2025 18:30:00 +1100 + enroll (0.1.6) unstable; urgency=medium * DRY up some code logic diff --git a/enroll/harvest.py b/enroll/harvest.py index 56e5aed..d678b89 100644 --- a/enroll/harvest.py +++ b/enroll/harvest.py @@ -292,9 +292,26 @@ def _hint_names(unit: str, pkgs: Set[str]) -> Set[str]: def _add_pkgs_from_etc_topdirs( hints: Set[str], topdir_to_pkgs: Dict[str, Set[str]], pkgs: Set[str] ) -> None: + """Expand a service's package set using dpkg-owned /etc top-level dirs. + + This is a heuristic: many Debian packages split a service across multiple + packages (e.g. nginx + nginx-common) while sharing a single /etc/ + tree. + + We intentionally *avoid* using shared trees (e.g. /etc/cron.d, /etc/ssl, + /etc/apparmor.d) to expand package sets, because many unrelated packages + legitimately install files there. + + We also consider the common ".d" variant (e.g. hint "apparmor" -> + topdir "apparmor.d") so we can explicitly skip known shared trees. + """ + for h in hints: - for p in topdir_to_pkgs.get(h, set()): - pkgs.add(p) + for top in (h, f"{h}.d"): + if top in SHARED_ETC_TOPDIRS: + continue + for p in topdir_to_pkgs.get(top, set()): + pkgs.add(p) def _maybe_add_specific_paths(hints: Set[str]) -> List[str]: @@ -1132,10 +1149,27 @@ def harvest( pkg = dpkg_owner(path) if pkg: - svc_roles = pkg_to_service_roles.get(pkg) + svc_roles = sorted(set(pkg_to_service_roles.get(pkg, []))) if svc_roles: - # Deterministic tie-break: lowest role name. - return (sorted(set(svc_roles))[0], tag) + # If multiple service roles reference the same package, prefer + # the role that most closely matches the snippet name (basename + # or stem). This avoids surprising attributions such as an + # AppArmor loader role "claiming" a cron/logrotate snippet + # that is clearly named after another package/service. + if len(svc_roles) > 1: + # Direct role-name matches first. + for c in [pkg, *uniq]: + rn = _safe_name(c) + if rn in svc_roles: + return (rn, tag) + # Next, use the alias map if it points at one of the roles. + for c in [pkg, *uniq]: + hit = alias_ranked.get(_safe_name(c)) + if hit is not None and hit[1] in svc_roles: + return (hit[1], tag) + + # Deterministic fallback: lowest role name. + return (svc_roles[0], tag) pkg_role = pkg_name_to_role.get(pkg) if pkg_role: return (pkg_role, tag) diff --git a/pyproject.toml b/pyproject.toml index c7356bc..ca875e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "enroll" -version = "0.1.6" +version = "0.1.7" description = "Enroll a server's running state retrospectively into Ansible" authors = ["Miguel Jacq "] license = "GPL-3.0-or-later" diff --git a/rpm/enroll.spec b/rpm/enroll.spec index 637dee1..f63a12c 100644 --- a/rpm/enroll.spec +++ b/rpm/enroll.spec @@ -1,4 +1,4 @@ -%global upstream_version 0.1.6 +%global upstream_version 0.1.7 Name: enroll Version: %{upstream_version} @@ -44,6 +44,8 @@ Enroll a server's running state retrospectively into Ansible. %changelog * Sun Dec 28 2025 Miguel Jacq - %{version}-%{release} +- Fix an attribution bug for certain files ending up in the wrong package/role. +* Sun Dec 28 2025 Miguel Jacq - %{version}-%{release} - DRY up some code logic - More test coverage * Sun Dec 28 2025 Miguel Jacq - %{version}-%{release} diff --git a/tests/test_harvest.py b/tests/test_harvest.py index a832c81..fa796f0 100644 --- a/tests/test_harvest.py +++ b/tests/test_harvest.py @@ -176,3 +176,110 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( assert any(mf["path"] == "/usr/local/etc/myapp.conf" for mf in ul["managed_files"]) assert any(mf["path"] == "/usr/local/bin/myscript" for mf in ul["managed_files"]) assert all(mf["path"] != "/usr/local/bin/readme.txt" for mf in ul["managed_files"]) + + +def test_shared_cron_snippet_prefers_matching_role_over_lexicographic( + monkeypatch, tmp_path: Path +): + """Regression test for shared snippet routing. + + When multiple service roles reference the same owning package, we prefer the + role whose name matches the snippet/package (e.g. ntpsec) rather than a + lexicographic tie-break that could incorrectly pick another role. + """ + + bundle = tmp_path / "bundle" + + files = {"/etc/cron.d/ntpsec": b"# cron\n"} + dirs = {"/etc", "/etc/cron.d"} + + monkeypatch.setattr(h.os.path, "isfile", lambda p: p in files) + monkeypatch.setattr(h.os.path, "islink", lambda p: False) + monkeypatch.setattr(h.os.path, "isdir", lambda p: p in dirs) + monkeypatch.setattr(h.os.path, "exists", lambda p: p in files or p in dirs) + monkeypatch.setattr(h.os, "walk", lambda root: [("/etc/cron.d", [], ["ntpsec"])]) + + # Only include the cron snippet in the system capture set. + monkeypatch.setattr( + h, "_iter_system_capture_paths", lambda: [("/etc/cron.d/ntpsec", "system_cron")] + ) + + monkeypatch.setattr( + h, "list_enabled_services", lambda: ["apparmor.service", "ntpsec.service"] + ) + + def fake_unit_info(unit: str) -> UnitInfo: + if unit == "apparmor.service": + return UnitInfo( + name=unit, + fragment_path="/lib/systemd/system/apparmor.service", + dropin_paths=[], + env_files=[], + exec_paths=["/usr/sbin/apparmor"], + active_state="active", + sub_state="running", + unit_file_state="enabled", + condition_result=None, + ) + return UnitInfo( + name=unit, + fragment_path="/lib/systemd/system/ntpsec.service", + dropin_paths=[], + env_files=[], + exec_paths=["/usr/sbin/ntpd"], + active_state="active", + sub_state="running", + unit_file_state="enabled", + condition_result=None, + ) + + monkeypatch.setattr(h, "get_unit_info", fake_unit_info) + + # Dpkg /etc index: no owned /etc paths needed for this test. + monkeypatch.setattr( + h, + "build_dpkg_etc_index", + lambda: (set(), {}, {}, {}), + ) + monkeypatch.setattr(h, "parse_status_conffiles", lambda: {}) + monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {}) + monkeypatch.setattr(h, "file_md5", lambda path: "x") + monkeypatch.setattr(h, "list_manual_packages", lambda: []) + monkeypatch.setattr(h, "collect_non_system_users", lambda: []) + + # Make apparmor *also* claim the ntpsec package (simulates overly-broad + # package inference). The snippet routing should still prefer role 'ntpsec'. + def fake_dpkg_owner(p: str): + if p == "/etc/cron.d/ntpsec": + return "ntpsec" + if "apparmor" in p: + return "ntpsec" # intentionally misleading + if "ntpsec" in p or "ntpd" in p: + return "ntpsec" + return None + + monkeypatch.setattr(h, "dpkg_owner", fake_dpkg_owner) + monkeypatch.setattr(h, "stat_triplet", lambda p: ("root", "root", "0644")) + + def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str): + dst = Path(bundle_dir) / "artifacts" / role_name / src_rel + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_bytes(files[abs_path]) + + monkeypatch.setattr(h, "_copy_into_bundle", fake_copy) + + class AllowAll: + def deny_reason(self, path: str): + return None + + state_path = h.harvest(str(bundle), policy=AllowAll()) + st = json.loads(Path(state_path).read_text(encoding="utf-8")) + + # Cron snippet should end up attached to the ntpsec role, not apparmor. + svc_ntpsec = next(s for s in st["services"] if s["role_name"] == "ntpsec") + assert any(mf["path"] == "/etc/cron.d/ntpsec" for mf in svc_ntpsec["managed_files"]) + + svc_apparmor = next(s for s in st["services"] if s["role_name"] == "apparmor") + assert all( + mf["path"] != "/etc/cron.d/ntpsec" for mf in svc_apparmor["managed_files"] + )