Better attribution of config files to parent service/role (not systemd helpers)
All checks were successful
CI / test (push) Successful in 4m51s
Lint / test (push) Successful in 27s
Trivy / test (push) Successful in 15s

This commit is contained in:
Miguel Jacq 2025-12-29 17:19:59 +11:00
parent 081739fd19
commit f01603dac4
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9

View file

@ -676,6 +676,10 @@ def harvest(
backend.build_etc_index()
)
# Global de-duplication across roles: each absolute path is captured at most once.
# This avoids multiple Ansible roles managing the same destination file.
captured_global: Set[str] = set()
# -------------------------
# Service roles
# -------------------------
@ -685,8 +689,45 @@ def harvest(
service_role_aliases: Dict[str, Set[str]] = {}
# De-dupe per-role captures (avoids duplicate tasks in manifest generation).
seen_by_role: Dict[str, Set[str]] = {}
for unit in list_enabled_services():
# Managed/excluded lists keyed by role so helper services can attribute shared
# configuration to their parent service role.
managed_by_role: Dict[str, List[ManagedFile]] = {}
excluded_by_role: Dict[str, List[ExcludedFile]] = {}
enabled_services = list_enabled_services()
enabled_set = set(enabled_services)
def _service_sort_key(unit: str) -> tuple[int, str, str]:
# Prefer "parent" services over helpers (e.g. NetworkManager.service before
# NetworkManager-dispatcher.service) so shared config lands in the main role.
base = unit.removesuffix(".service")
base = base.split("@", 1)[0]
return (base.count("-"), base.lower(), unit.lower())
def _parent_service_unit(unit: str) -> Optional[str]:
# If unit name contains '-' segments, treat dashed prefixes as potential parents.
# Example: NetworkManager-dispatcher.service -> NetworkManager.service (if enabled).
if not unit.endswith(".service"):
return None
base = unit.removesuffix(".service")
base = base.split("@", 1)[0]
parts = base.split("-")
for i in range(len(parts) - 1, 0, -1):
cand = "-".join(parts[:i]) + ".service"
if cand in enabled_set:
return cand
return None
parent_unit_for: Dict[str, str] = {}
for u in enabled_services:
pu = _parent_service_unit(u)
if pu:
parent_unit_for[u] = pu
for unit in sorted(enabled_services, key=_service_sort_key):
role = _role_name_from_unit(unit)
parent_unit = parent_unit_for.get(unit)
parent_role = _role_name_from_unit(parent_unit) if parent_unit else None
try:
ui = get_unit_info(unit)
@ -695,6 +736,8 @@ def harvest(
# shared snippets can still be attributed to this role by name.
service_role_aliases.setdefault(role, _hint_names(unit, set()) | {role})
seen_by_role.setdefault(role, set())
managed = managed_by_role.setdefault(role, [])
excluded = excluded_by_role.setdefault(role, [])
service_snaps.append(
ServiceSnapshot(
unit=unit,
@ -704,8 +747,8 @@ def harvest(
sub_state=None,
unit_file_state=None,
condition_result=None,
managed_files=[],
excluded=[],
managed_files=managed,
excluded=excluded,
notes=[str(e)],
)
)
@ -713,8 +756,8 @@ def harvest(
pkgs: Set[str] = set()
notes: List[str] = []
excluded: List[ExcludedFile] = []
managed: List[ManagedFile] = []
excluded = excluded_by_role.setdefault(role, [])
managed = managed_by_role.setdefault(role, [])
candidates: Dict[str, str] = {}
if ui.fragment_path:
@ -810,18 +853,31 @@ def harvest(
# De-dupe within this role while capturing. This also avoids emitting
# duplicate Ansible tasks for the same destination path.
role_seen = seen_by_role.setdefault(role, set())
# Attribute shared /etc config to the parent service role when this unit looks
# like a helper (e.g. NetworkManager-dispatcher.service -> NetworkManager.service).
for path, reason in sorted(candidates.items()):
dest_role = role
if (
parent_role
and path.startswith("/etc/")
and reason not in ("systemd_dropin", "systemd_envfile")
):
dest_role = parent_role
dest_managed = managed_by_role.setdefault(dest_role, [])
dest_excluded = excluded_by_role.setdefault(dest_role, [])
dest_seen = seen_by_role.setdefault(dest_role, set())
_capture_file(
bundle_dir=bundle_dir,
role_name=role,
role_name=dest_role,
abs_path=path,
reason=reason,
policy=policy,
path_filter=path_filter,
managed_out=managed,
excluded_out=excluded,
seen_role=role_seen,
managed_out=dest_managed,
excluded_out=dest_excluded,
seen_role=dest_seen,
seen_global=captured_global,
)
service_snaps.append(
@ -857,7 +913,7 @@ def harvest(
s.unit: s for s in service_snaps
}
for t in enabled_timers:
for t in sorted(enabled_timers):
try:
ti = get_timer_info(t)
except Exception: # nosec
@ -895,6 +951,7 @@ def harvest(
managed_out=snap.managed_files,
excluded_out=snap.excluded,
seen_role=role_seen,
seen_global=captured_global,
)
continue
@ -935,7 +992,7 @@ def harvest(
manual_pkgs_skipped: List[str] = []
pkg_snaps: List[PackageSnapshot] = []
for pkg in manual_pkgs:
for pkg in sorted(manual_pkgs):
if pkg in covered_by_services:
manual_pkgs_skipped.append(pkg)
continue
@ -997,6 +1054,7 @@ def harvest(
managed_out=managed,
excluded_out=excluded,
seen_role=role_seen,
seen_global=captured_global,
)
if not pkg_to_etc_paths.get(pkg, []) and not managed:
@ -1060,6 +1118,7 @@ def harvest(
managed_out=users_managed,
excluded_out=users_excluded,
seen_role=users_role_seen,
seen_global=captured_global,
)
users_snapshot = UsersSnapshot(
@ -1098,6 +1157,7 @@ def harvest(
managed_out=apt_managed,
excluded_out=apt_excluded,
seen_role=apt_role_seen,
seen_global=captured_global,
)
elif backend.name == "rpm":
dnf_role_seen = seen_by_role.setdefault(dnf_role_name, set())
@ -1112,6 +1172,7 @@ def harvest(
managed_out=dnf_managed,
excluded_out=dnf_excluded,
seen_role=dnf_role_seen,
seen_global=captured_global,
)
apt_config_snapshot = AptConfigSnapshot(
@ -1135,20 +1196,9 @@ def harvest(
etc_managed: List[ManagedFile] = []
etc_role_name = "etc_custom"
# Build a set of files already captured by other roles.
already: Set[str] = set()
for s in service_snaps:
for mf in s.managed_files:
already.add(mf.path)
for p in pkg_snaps:
for mf in p.managed_files:
already.add(mf.path)
for mf in users_managed:
already.add(mf.path)
for mf in apt_managed:
already.add(mf.path)
for mf in dnf_managed:
already.add(mf.path)
# Files already captured by earlier roles. Use the global set so we never
# end up with the same destination path managed by multiple roles.
already: Set[str] = captured_global
# Maps for re-attributing shared snippets (cron.d/logrotate.d) to existing roles.
svc_by_role: Dict[str, ServiceSnapshot] = {s.role_name: s for s in service_snaps}
@ -1288,7 +1338,7 @@ def harvest(
managed_out=managed_out,
excluded_out=excluded_out,
seen_role=role_seen,
seen_global=already,
seen_global=captured_global,
)
# Walk /etc for remaining unowned config-ish files
@ -1327,7 +1377,7 @@ def harvest(
managed_out=managed_out,
excluded_out=excluded_out,
seen_role=role_seen,
seen_global=already,
seen_global=captured_global,
):
scanned += 1
if scanned >= MAX_FILES_CAP:
@ -1396,6 +1446,7 @@ def harvest(
managed_out=ul_managed,
excluded_out=ul_excluded,
seen_role=role_seen,
seen_global=captured_global,
metadata=(owner, group, mode),
):
already_all.add(path)
@ -1470,6 +1521,7 @@ def harvest(
managed_out=extra_managed,
excluded_out=extra_excluded,
seen_role=extra_role_seen,
seen_global=captured_global,
):
already_all.add(path)