Fix an attribution bug for certain files ending up in the wrong package/role.
This commit is contained in:
parent
921801caa6
commit
8c19473e18
6 changed files with 160 additions and 7 deletions
|
|
@ -1,3 +1,7 @@
|
||||||
|
# 0.1.7
|
||||||
|
|
||||||
|
* Fix an attribution bug for certain files ending up in the wrong package/role.
|
||||||
|
|
||||||
# 0.1.6
|
# 0.1.6
|
||||||
|
|
||||||
* DRY up some code logic
|
* DRY up some code logic
|
||||||
|
|
|
||||||
6
debian/changelog
vendored
6
debian/changelog
vendored
|
|
@ -1,3 +1,9 @@
|
||||||
|
enroll (0.1.7) unstable; urgency=medium
|
||||||
|
|
||||||
|
* Fix an attribution bug for certain files ending up in the wrong package/role.
|
||||||
|
|
||||||
|
-- Miguel Jacq <mig@mig5.net> Sun, 28 Dec 2025 18:30:00 +1100
|
||||||
|
|
||||||
enroll (0.1.6) unstable; urgency=medium
|
enroll (0.1.6) unstable; urgency=medium
|
||||||
|
|
||||||
* DRY up some code logic
|
* DRY up some code logic
|
||||||
|
|
|
||||||
|
|
@ -292,8 +292,25 @@ def _hint_names(unit: str, pkgs: Set[str]) -> Set[str]:
|
||||||
def _add_pkgs_from_etc_topdirs(
|
def _add_pkgs_from_etc_topdirs(
|
||||||
hints: Set[str], topdir_to_pkgs: Dict[str, Set[str]], pkgs: Set[str]
|
hints: Set[str], topdir_to_pkgs: Dict[str, Set[str]], pkgs: Set[str]
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Expand a service's package set using dpkg-owned /etc top-level dirs.
|
||||||
|
|
||||||
|
This is a heuristic: many Debian packages split a service across multiple
|
||||||
|
packages (e.g. nginx + nginx-common) while sharing a single /etc/<name>
|
||||||
|
tree.
|
||||||
|
|
||||||
|
We intentionally *avoid* using shared trees (e.g. /etc/cron.d, /etc/ssl,
|
||||||
|
/etc/apparmor.d) to expand package sets, because many unrelated packages
|
||||||
|
legitimately install files there.
|
||||||
|
|
||||||
|
We also consider the common ".d" variant (e.g. hint "apparmor" ->
|
||||||
|
topdir "apparmor.d") so we can explicitly skip known shared trees.
|
||||||
|
"""
|
||||||
|
|
||||||
for h in hints:
|
for h in hints:
|
||||||
for p in topdir_to_pkgs.get(h, set()):
|
for top in (h, f"{h}.d"):
|
||||||
|
if top in SHARED_ETC_TOPDIRS:
|
||||||
|
continue
|
||||||
|
for p in topdir_to_pkgs.get(top, set()):
|
||||||
pkgs.add(p)
|
pkgs.add(p)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1132,10 +1149,27 @@ def harvest(
|
||||||
|
|
||||||
pkg = dpkg_owner(path)
|
pkg = dpkg_owner(path)
|
||||||
if pkg:
|
if pkg:
|
||||||
svc_roles = pkg_to_service_roles.get(pkg)
|
svc_roles = sorted(set(pkg_to_service_roles.get(pkg, [])))
|
||||||
if svc_roles:
|
if svc_roles:
|
||||||
# Deterministic tie-break: lowest role name.
|
# If multiple service roles reference the same package, prefer
|
||||||
return (sorted(set(svc_roles))[0], tag)
|
# the role that most closely matches the snippet name (basename
|
||||||
|
# or stem). This avoids surprising attributions such as an
|
||||||
|
# AppArmor loader role "claiming" a cron/logrotate snippet
|
||||||
|
# that is clearly named after another package/service.
|
||||||
|
if len(svc_roles) > 1:
|
||||||
|
# Direct role-name matches first.
|
||||||
|
for c in [pkg, *uniq]:
|
||||||
|
rn = _safe_name(c)
|
||||||
|
if rn in svc_roles:
|
||||||
|
return (rn, tag)
|
||||||
|
# Next, use the alias map if it points at one of the roles.
|
||||||
|
for c in [pkg, *uniq]:
|
||||||
|
hit = alias_ranked.get(_safe_name(c))
|
||||||
|
if hit is not None and hit[1] in svc_roles:
|
||||||
|
return (hit[1], tag)
|
||||||
|
|
||||||
|
# Deterministic fallback: lowest role name.
|
||||||
|
return (svc_roles[0], tag)
|
||||||
pkg_role = pkg_name_to_role.get(pkg)
|
pkg_role = pkg_name_to_role.get(pkg)
|
||||||
if pkg_role:
|
if pkg_role:
|
||||||
return (pkg_role, tag)
|
return (pkg_role, tag)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "enroll"
|
name = "enroll"
|
||||||
version = "0.1.6"
|
version = "0.1.7"
|
||||||
description = "Enroll a server's running state retrospectively into Ansible"
|
description = "Enroll a server's running state retrospectively into Ansible"
|
||||||
authors = ["Miguel Jacq <mig@mig5.net>"]
|
authors = ["Miguel Jacq <mig@mig5.net>"]
|
||||||
license = "GPL-3.0-or-later"
|
license = "GPL-3.0-or-later"
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
%global upstream_version 0.1.6
|
%global upstream_version 0.1.7
|
||||||
|
|
||||||
Name: enroll
|
Name: enroll
|
||||||
Version: %{upstream_version}
|
Version: %{upstream_version}
|
||||||
|
|
@ -44,6 +44,8 @@ Enroll a server's running state retrospectively into Ansible.
|
||||||
|
|
||||||
%changelog
|
%changelog
|
||||||
* Sun Dec 28 2025 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
|
* Sun Dec 28 2025 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
|
||||||
|
- Fix an attribution bug for certain files ending up in the wrong package/role.
|
||||||
|
* Sun Dec 28 2025 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
|
||||||
- DRY up some code logic
|
- DRY up some code logic
|
||||||
- More test coverage
|
- More test coverage
|
||||||
* Sun Dec 28 2025 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
|
* Sun Dec 28 2025 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
|
||||||
|
|
|
||||||
|
|
@ -176,3 +176,110 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
|
||||||
assert any(mf["path"] == "/usr/local/etc/myapp.conf" for mf in ul["managed_files"])
|
assert any(mf["path"] == "/usr/local/etc/myapp.conf" for mf in ul["managed_files"])
|
||||||
assert any(mf["path"] == "/usr/local/bin/myscript" for mf in ul["managed_files"])
|
assert any(mf["path"] == "/usr/local/bin/myscript" for mf in ul["managed_files"])
|
||||||
assert all(mf["path"] != "/usr/local/bin/readme.txt" for mf in ul["managed_files"])
|
assert all(mf["path"] != "/usr/local/bin/readme.txt" for mf in ul["managed_files"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_shared_cron_snippet_prefers_matching_role_over_lexicographic(
|
||||||
|
monkeypatch, tmp_path: Path
|
||||||
|
):
|
||||||
|
"""Regression test for shared snippet routing.
|
||||||
|
|
||||||
|
When multiple service roles reference the same owning package, we prefer the
|
||||||
|
role whose name matches the snippet/package (e.g. ntpsec) rather than a
|
||||||
|
lexicographic tie-break that could incorrectly pick another role.
|
||||||
|
"""
|
||||||
|
|
||||||
|
bundle = tmp_path / "bundle"
|
||||||
|
|
||||||
|
files = {"/etc/cron.d/ntpsec": b"# cron\n"}
|
||||||
|
dirs = {"/etc", "/etc/cron.d"}
|
||||||
|
|
||||||
|
monkeypatch.setattr(h.os.path, "isfile", lambda p: p in files)
|
||||||
|
monkeypatch.setattr(h.os.path, "islink", lambda p: False)
|
||||||
|
monkeypatch.setattr(h.os.path, "isdir", lambda p: p in dirs)
|
||||||
|
monkeypatch.setattr(h.os.path, "exists", lambda p: p in files or p in dirs)
|
||||||
|
monkeypatch.setattr(h.os, "walk", lambda root: [("/etc/cron.d", [], ["ntpsec"])])
|
||||||
|
|
||||||
|
# Only include the cron snippet in the system capture set.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
h, "_iter_system_capture_paths", lambda: [("/etc/cron.d/ntpsec", "system_cron")]
|
||||||
|
)
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
h, "list_enabled_services", lambda: ["apparmor.service", "ntpsec.service"]
|
||||||
|
)
|
||||||
|
|
||||||
|
def fake_unit_info(unit: str) -> UnitInfo:
|
||||||
|
if unit == "apparmor.service":
|
||||||
|
return UnitInfo(
|
||||||
|
name=unit,
|
||||||
|
fragment_path="/lib/systemd/system/apparmor.service",
|
||||||
|
dropin_paths=[],
|
||||||
|
env_files=[],
|
||||||
|
exec_paths=["/usr/sbin/apparmor"],
|
||||||
|
active_state="active",
|
||||||
|
sub_state="running",
|
||||||
|
unit_file_state="enabled",
|
||||||
|
condition_result=None,
|
||||||
|
)
|
||||||
|
return UnitInfo(
|
||||||
|
name=unit,
|
||||||
|
fragment_path="/lib/systemd/system/ntpsec.service",
|
||||||
|
dropin_paths=[],
|
||||||
|
env_files=[],
|
||||||
|
exec_paths=["/usr/sbin/ntpd"],
|
||||||
|
active_state="active",
|
||||||
|
sub_state="running",
|
||||||
|
unit_file_state="enabled",
|
||||||
|
condition_result=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
monkeypatch.setattr(h, "get_unit_info", fake_unit_info)
|
||||||
|
|
||||||
|
# Dpkg /etc index: no owned /etc paths needed for this test.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
h,
|
||||||
|
"build_dpkg_etc_index",
|
||||||
|
lambda: (set(), {}, {}, {}),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(h, "parse_status_conffiles", lambda: {})
|
||||||
|
monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {})
|
||||||
|
monkeypatch.setattr(h, "file_md5", lambda path: "x")
|
||||||
|
monkeypatch.setattr(h, "list_manual_packages", lambda: [])
|
||||||
|
monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
|
||||||
|
|
||||||
|
# Make apparmor *also* claim the ntpsec package (simulates overly-broad
|
||||||
|
# package inference). The snippet routing should still prefer role 'ntpsec'.
|
||||||
|
def fake_dpkg_owner(p: str):
|
||||||
|
if p == "/etc/cron.d/ntpsec":
|
||||||
|
return "ntpsec"
|
||||||
|
if "apparmor" in p:
|
||||||
|
return "ntpsec" # intentionally misleading
|
||||||
|
if "ntpsec" in p or "ntpd" in p:
|
||||||
|
return "ntpsec"
|
||||||
|
return None
|
||||||
|
|
||||||
|
monkeypatch.setattr(h, "dpkg_owner", fake_dpkg_owner)
|
||||||
|
monkeypatch.setattr(h, "stat_triplet", lambda p: ("root", "root", "0644"))
|
||||||
|
|
||||||
|
def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str):
|
||||||
|
dst = Path(bundle_dir) / "artifacts" / role_name / src_rel
|
||||||
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
dst.write_bytes(files[abs_path])
|
||||||
|
|
||||||
|
monkeypatch.setattr(h, "_copy_into_bundle", fake_copy)
|
||||||
|
|
||||||
|
class AllowAll:
|
||||||
|
def deny_reason(self, path: str):
|
||||||
|
return None
|
||||||
|
|
||||||
|
state_path = h.harvest(str(bundle), policy=AllowAll())
|
||||||
|
st = json.loads(Path(state_path).read_text(encoding="utf-8"))
|
||||||
|
|
||||||
|
# Cron snippet should end up attached to the ntpsec role, not apparmor.
|
||||||
|
svc_ntpsec = next(s for s in st["services"] if s["role_name"] == "ntpsec")
|
||||||
|
assert any(mf["path"] == "/etc/cron.d/ntpsec" for mf in svc_ntpsec["managed_files"])
|
||||||
|
|
||||||
|
svc_apparmor = next(s for s in st["services"] if s["role_name"] == "apparmor")
|
||||||
|
assert all(
|
||||||
|
mf["path"] != "/etc/cron.d/ntpsec" for mf in svc_apparmor["managed_files"]
|
||||||
|
)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue