Compare commits

..

2 commits

Author SHA1 Message Date
081739fd19
Fix tests
All checks were successful
CI / test (push) Successful in 5m7s
Lint / test (push) Successful in 29s
Trivy / test (push) Successful in 18s
2025-12-29 16:35:21 +11:00
043802e800
Refactor state structure and capture versions of packages 2025-12-29 16:10:27 +11:00
10 changed files with 749 additions and 318 deletions

View file

@ -63,6 +63,50 @@ def list_manual_packages() -> List[str]:
return sorted(set(pkgs)) return sorted(set(pkgs))
def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
"""Return mapping of installed package name -> installed instances.
Uses dpkg-query and is expected to work on Debian/Ubuntu-like systems.
Output format:
{"pkg": [{"version": "...", "arch": "..."}, ...], ...}
"""
try:
p = subprocess.run(
[
"dpkg-query",
"-W",
"-f=${Package}\t${Version}\t${Architecture}\n",
],
text=True,
capture_output=True,
check=False,
) # nosec
except Exception:
return {}
out: Dict[str, List[Dict[str, str]]] = {}
for raw in (p.stdout or "").splitlines():
line = raw.strip("\n")
if not line:
continue
parts = line.split("\t")
if len(parts) < 3:
continue
name, ver, arch = parts[0].strip(), parts[1].strip(), parts[2].strip()
if not name:
continue
out.setdefault(name, []).append({"version": ver, "arch": arch})
# Stable ordering for deterministic JSON dumps.
for k in list(out.keys()):
out[k] = sorted(
out[k], key=lambda x: (x.get("arch") or "", x.get("version") or "")
)
return out
def build_dpkg_etc_index( def build_dpkg_etc_index(
info_dir: str = "/var/lib/dpkg/info", info_dir: str = "/var/lib/dpkg/info",
) -> Tuple[Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]]: ) -> Tuple[Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]]:

View file

@ -126,18 +126,62 @@ def _load_state(bundle_dir: Path) -> Dict[str, Any]:
return json.load(f) return json.load(f)
def _packages_inventory(state: Dict[str, Any]) -> Dict[str, Any]:
return (state.get("inventory") or {}).get("packages") or {}
def _all_packages(state: Dict[str, Any]) -> List[str]: def _all_packages(state: Dict[str, Any]) -> List[str]:
pkgs = set(state.get("manual_packages", []) or []) return sorted(_packages_inventory(state).keys())
pkgs |= set(state.get("manual_packages_skipped", []) or [])
for s in state.get("services", []) or []:
for p in s.get("packages", []) or []: def _roles(state: Dict[str, Any]) -> Dict[str, Any]:
pkgs.add(p) return state.get("roles") or {}
return sorted(pkgs)
def _pkg_version_key(entry: Dict[str, Any]) -> Optional[str]:
"""Return a stable string used for version comparison."""
installs = entry.get("installations") or []
if isinstance(installs, list) and installs:
parts: List[str] = []
for inst in installs:
if not isinstance(inst, dict):
continue
arch = str(inst.get("arch") or "")
ver = str(inst.get("version") or "")
if not ver:
continue
parts.append(f"{arch}:{ver}" if arch else ver)
if parts:
return "|".join(sorted(parts))
v = entry.get("version")
if v:
return str(v)
return None
def _pkg_version_display(entry: Dict[str, Any]) -> Optional[str]:
v = entry.get("version")
if v:
return str(v)
installs = entry.get("installations") or []
if isinstance(installs, list) and installs:
parts: List[str] = []
for inst in installs:
if not isinstance(inst, dict):
continue
arch = str(inst.get("arch") or "")
ver = str(inst.get("version") or "")
if not ver:
continue
parts.append(f"{ver} ({arch})" if arch else ver)
if parts:
return ", ".join(sorted(parts))
return None
def _service_units(state: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: def _service_units(state: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
out: Dict[str, Dict[str, Any]] = {} out: Dict[str, Dict[str, Any]] = {}
for s in state.get("services", []) or []: for s in _roles(state).get("services") or []:
unit = s.get("unit") unit = s.get("unit")
if unit: if unit:
out[str(unit)] = s out[str(unit)] = s
@ -145,7 +189,7 @@ def _service_units(state: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
def _users_by_name(state: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: def _users_by_name(state: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
users = (state.get("users") or {}).get("users") or [] users = (_roles(state).get("users") or {}).get("users") or []
out: Dict[str, Dict[str, Any]] = {} out: Dict[str, Dict[str, Any]] = {}
for u in users: for u in users:
name = u.get("name") name = u.get("name")
@ -167,43 +211,43 @@ class FileRec:
def _iter_managed_files(state: Dict[str, Any]) -> Iterable[Tuple[str, Dict[str, Any]]]: def _iter_managed_files(state: Dict[str, Any]) -> Iterable[Tuple[str, Dict[str, Any]]]:
# Services # Services
for s in state.get("services", []) or []: for s in _roles(state).get("services") or []:
role = s.get("role_name") or "unknown" role = s.get("role_name") or "unknown"
for mf in s.get("managed_files", []) or []: for mf in s.get("managed_files", []) or []:
yield str(role), mf yield str(role), mf
# Package roles # Package roles
for p in state.get("package_roles", []) or []: for p in _roles(state).get("packages") or []:
role = p.get("role_name") or "unknown" role = p.get("role_name") or "unknown"
for mf in p.get("managed_files", []) or []: for mf in p.get("managed_files", []) or []:
yield str(role), mf yield str(role), mf
# Users # Users
u = state.get("users") or {} u = _roles(state).get("users") or {}
u_role = u.get("role_name") or "users" u_role = u.get("role_name") or "users"
for mf in u.get("managed_files", []) or []: for mf in u.get("managed_files", []) or []:
yield str(u_role), mf yield str(u_role), mf
# apt_config # apt_config
ac = state.get("apt_config") or {} ac = _roles(state).get("apt_config") or {}
ac_role = ac.get("role_name") or "apt_config" ac_role = ac.get("role_name") or "apt_config"
for mf in ac.get("managed_files", []) or []: for mf in ac.get("managed_files", []) or []:
yield str(ac_role), mf yield str(ac_role), mf
# etc_custom # etc_custom
ec = state.get("etc_custom") or {} ec = _roles(state).get("etc_custom") or {}
ec_role = ec.get("role_name") or "etc_custom" ec_role = ec.get("role_name") or "etc_custom"
for mf in ec.get("managed_files", []) or []: for mf in ec.get("managed_files", []) or []:
yield str(ec_role), mf yield str(ec_role), mf
# usr_local_custom # usr_local_custom
ul = state.get("usr_local_custom") or {} ul = _roles(state).get("usr_local_custom") or {}
ul_role = ul.get("role_name") or "usr_local_custom" ul_role = ul.get("role_name") or "usr_local_custom"
for mf in ul.get("managed_files", []) or []: for mf in ul.get("managed_files", []) or []:
yield str(ul_role), mf yield str(ul_role), mf
# extra_paths # extra_paths
xp = state.get("extra_paths") or {} xp = _roles(state).get("extra_paths") or {}
xp_role = xp.get("role_name") or "extra_paths" xp_role = xp.get("role_name") or "extra_paths"
for mf in xp.get("managed_files", []) or []: for mf in xp.get("managed_files", []) or []:
yield str(xp_role), mf yield str(xp_role), mf
@ -261,12 +305,28 @@ def compare_harvests(
old_state = _load_state(old_b.dir) old_state = _load_state(old_b.dir)
new_state = _load_state(new_b.dir) new_state = _load_state(new_b.dir)
old_pkgs = set(_all_packages(old_state)) old_inv = _packages_inventory(old_state)
new_pkgs = set(_all_packages(new_state)) new_inv = _packages_inventory(new_state)
old_pkgs = set(old_inv.keys())
new_pkgs = set(new_inv.keys())
pkgs_added = sorted(new_pkgs - old_pkgs) pkgs_added = sorted(new_pkgs - old_pkgs)
pkgs_removed = sorted(old_pkgs - new_pkgs) pkgs_removed = sorted(old_pkgs - new_pkgs)
pkgs_version_changed: List[Dict[str, Any]] = []
for pkg in sorted(old_pkgs & new_pkgs):
a = old_inv.get(pkg) or {}
b = new_inv.get(pkg) or {}
if _pkg_version_key(a) != _pkg_version_key(b):
pkgs_version_changed.append(
{
"package": pkg,
"old": _pkg_version_display(a),
"new": _pkg_version_display(b),
}
)
old_units = _service_units(old_state) old_units = _service_units(old_state)
new_units = _service_units(new_state) new_units = _service_units(new_state)
units_added = sorted(set(new_units) - set(old_units)) units_added = sorted(set(new_units) - set(old_units))
@ -380,6 +440,7 @@ def compare_harvests(
[ [
pkgs_added, pkgs_added,
pkgs_removed, pkgs_removed,
pkgs_version_changed,
units_added, units_added,
units_removed, units_removed,
units_changed, units_changed,
@ -413,7 +474,11 @@ def compare_harvests(
"state_mtime": _mtime_iso(new_b.state_path), "state_mtime": _mtime_iso(new_b.state_path),
"host": (new_state.get("host") or {}).get("hostname"), "host": (new_state.get("host") or {}).get("hostname"),
}, },
"packages": {"added": pkgs_added, "removed": pkgs_removed}, "packages": {
"added": pkgs_added,
"removed": pkgs_removed,
"version_changed": pkgs_version_changed,
},
"services": { "services": {
"enabled_added": units_added, "enabled_added": units_added,
"enabled_removed": units_removed, "enabled_removed": units_removed,
@ -471,10 +536,13 @@ def _report_text(report: Dict[str, Any]) -> str:
lines.append("\nPackages") lines.append("\nPackages")
lines.append(f" added: {len(pk.get('added', []) or [])}") lines.append(f" added: {len(pk.get('added', []) or [])}")
lines.append(f" removed: {len(pk.get('removed', []) or [])}") lines.append(f" removed: {len(pk.get('removed', []) or [])}")
lines.append(f" version_changed: {len(pk.get('version_changed', []) or [])}")
for p in pk.get("added", []) or []: for p in pk.get("added", []) or []:
lines.append(f" + {p}") lines.append(f" + {p}")
for p in pk.get("removed", []) or []: for p in pk.get("removed", []) or []:
lines.append(f" - {p}") lines.append(f" - {p}")
for ch in pk.get("version_changed", []) or []:
lines.append(f" ~ {ch.get('package')}: {ch.get('old')} -> {ch.get('new')}")
sv = report.get("services", {}) sv = report.get("services", {})
lines.append("\nServices (enabled systemd units)") lines.append("\nServices (enabled systemd units)")
@ -542,6 +610,7 @@ def _report_text(report: Dict[str, Any]) -> str:
[ [
(pk.get("added") or []), (pk.get("added") or []),
(pk.get("removed") or []), (pk.get("removed") or []),
(pk.get("version_changed") or []),
(sv.get("enabled_added") or []), (sv.get("enabled_added") or []),
(sv.get("enabled_removed") or []), (sv.get("enabled_removed") or []),
(sv.get("changed") or []), (sv.get("changed") or []),
@ -578,6 +647,12 @@ def _report_markdown(report: Dict[str, Any]) -> str:
for p in pk.get("removed", []) or []: for p in pk.get("removed", []) or []:
out.append(f" - `- {p}`\n") out.append(f" - `- {p}`\n")
out.append(f"- Version changed: {len(pk.get('version_changed', []) or [])}\n")
for ch in pk.get("version_changed", []) or []:
out.append(
f" - `~ {ch.get('package')}`: `{ch.get('old')}` → `{ch.get('new')}`\n"
)
sv = report.get("services", {}) sv = report.get("services", {})
out.append("## Services (enabled systemd units)\n") out.append("## Services (enabled systemd units)\n")
if sv.get("enabled_added"): if sv.get("enabled_added"):
@ -672,6 +747,7 @@ def _report_markdown(report: Dict[str, Any]) -> str:
[ [
(pk.get("added") or []), (pk.get("added") or []),
(pk.get("removed") or []), (pk.get("removed") or []),
(pk.get("version_changed") or []),
(sv.get("enabled_added") or []), (sv.get("enabled_added") or []),
(sv.get("enabled_removed") or []), (sv.get("enabled_removed") or []),
(sv.get("changed") or []), (sv.get("changed") or []),

View file

@ -5,6 +5,7 @@ import json
import os import os
import re import re
import shutil import shutil
import time
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Set from typing import Dict, List, Optional, Set
@ -1481,9 +1482,60 @@ def harvest(
notes=extra_notes, notes=extra_notes,
) )
# -------------------------
# Inventory: packages (SBOM-ish)
# -------------------------
installed = backend.installed_packages() or {}
manual_set: Set[str] = set(manual_pkgs or [])
pkg_units: Dict[str, Set[str]] = {}
pkg_roles_map: Dict[str, Set[str]] = {}
for svc in service_snaps:
for p in svc.packages:
pkg_units.setdefault(p, set()).add(svc.unit)
pkg_roles_map.setdefault(p, set()).add(svc.role_name)
pkg_role_names: Dict[str, List[str]] = {}
for ps in pkg_snaps:
pkg_roles_map.setdefault(ps.package, set()).add(ps.role_name)
pkg_role_names.setdefault(ps.package, []).append(ps.role_name)
pkg_names: Set[str] = set()
pkg_names |= manual_set
pkg_names |= set(pkg_units.keys())
pkg_names |= {ps.package for ps in pkg_snaps}
packages_inventory: Dict[str, Dict[str, object]] = {}
for pkg in sorted(pkg_names):
installs = installed.get(pkg, []) or []
arches = sorted({i.get("arch") for i in installs if i.get("arch")})
vers = sorted({i.get("version") for i in installs if i.get("version")})
version: Optional[str] = vers[0] if len(vers) == 1 else None
observed: List[Dict[str, str]] = []
if pkg in manual_set:
observed.append({"kind": "user_installed"})
for unit in sorted(pkg_units.get(pkg, set())):
observed.append({"kind": "systemd_unit", "ref": unit})
for rn in sorted(set(pkg_role_names.get(pkg, []))):
observed.append({"kind": "package_role", "ref": rn})
roles = sorted(pkg_roles_map.get(pkg, set()))
packages_inventory[pkg] = {
"version": version,
"arches": arches,
"installations": installs,
"observed_via": observed,
"roles": roles,
}
state = { state = {
"enroll": { "enroll": {
"version": get_enroll_version(), "version": get_enroll_version(),
"harvest_time": time.time_ns(),
}, },
"host": { "host": {
"hostname": os.uname().nodename, "hostname": os.uname().nodename,
@ -1491,16 +1543,19 @@ def harvest(
"pkg_backend": backend.name, "pkg_backend": backend.name,
"os_release": platform.os_release, "os_release": platform.os_release,
}, },
"inventory": {
"packages": packages_inventory,
},
"roles": {
"users": asdict(users_snapshot), "users": asdict(users_snapshot),
"services": [asdict(s) for s in service_snaps], "services": [asdict(s) for s in service_snaps],
"manual_packages": manual_pkgs, "packages": [asdict(p) for p in pkg_snaps],
"manual_packages_skipped": manual_pkgs_skipped,
"package_roles": [asdict(p) for p in pkg_snaps],
"apt_config": asdict(apt_config_snapshot), "apt_config": asdict(apt_config_snapshot),
"dnf_config": asdict(dnf_config_snapshot), "dnf_config": asdict(dnf_config_snapshot),
"etc_custom": asdict(etc_custom_snapshot), "etc_custom": asdict(etc_custom_snapshot),
"usr_local_custom": asdict(usr_local_custom_snapshot), "usr_local_custom": asdict(usr_local_custom_snapshot),
"extra_paths": asdict(extra_paths_snapshot), "extra_paths": asdict(extra_paths_snapshot),
},
} }
state_path = os.path.join(bundle_dir, "state.json") state_path = os.path.join(bundle_dir, "state.json")

View file

@ -271,9 +271,7 @@ def _write_hostvars(site_root: str, fqdn: str, role: str, data: Dict[str, Any])
merged = _merge_mappings_overwrite(existing_map, data) merged = _merge_mappings_overwrite(existing_map, data)
out = "# Generated by enroll (host-specific vars)\n---\n" + _yaml_dump_mapping( out = "---\n" + _yaml_dump_mapping(merged, sort_keys=True)
merged, sort_keys=True
)
with open(path, "w", encoding="utf-8") as f: with open(path, "w", encoding="utf-8") as f:
f.write(out) f.write(out)
@ -392,9 +390,7 @@ def _render_generic_files_tasks(
# Using first_found makes roles work in both modes: # Using first_found makes roles work in both modes:
# - site-mode: inventory/host_vars/<host>/<role>/.files/... # - site-mode: inventory/host_vars/<host>/<role>/.files/...
# - non-site: roles/<role>/files/... # - non-site: roles/<role>/files/...
return f"""# Generated by enroll return f"""- name: Deploy any systemd unit files (templates)
- name: Deploy any systemd unit files (templates)
ansible.builtin.template: ansible.builtin.template:
src: "{{{{ item.src_rel }}}}.j2" src: "{{{{ item.src_rel }}}}.j2"
dest: "{{{{ item.dest }}}}" dest: "{{{{ item.dest }}}}"
@ -477,9 +473,7 @@ def _render_install_packages_tasks(role: str, var_prefix: str) -> str:
generic `package` module. This keeps generated roles usable on both generic `package` module. This keeps generated roles usable on both
Debian-like and RPM-like systems. Debian-like and RPM-like systems.
""" """
return f"""# Generated by enroll return f"""- name: Install packages for {role} (APT)
- name: Install packages for {role} (APT)
ansible.builtin.apt: ansible.builtin.apt:
name: "{{{{ {var_prefix}_packages | default([]) }}}}" name: "{{{{ {var_prefix}_packages | default([]) }}}}"
state: present state: present
@ -672,14 +666,16 @@ def _manifest_from_bundle_dir(
with open(state_path, "r", encoding="utf-8") as f: with open(state_path, "r", encoding="utf-8") as f:
state = json.load(f) state = json.load(f)
services: List[Dict[str, Any]] = state.get("services", []) roles: Dict[str, Any] = state.get("roles") or {}
package_roles: List[Dict[str, Any]] = state.get("package_roles", [])
users_snapshot: Dict[str, Any] = state.get("users", {}) services: List[Dict[str, Any]] = roles.get("services", [])
apt_config_snapshot: Dict[str, Any] = state.get("apt_config", {}) package_roles: List[Dict[str, Any]] = roles.get("packages", [])
dnf_config_snapshot: Dict[str, Any] = state.get("dnf_config", {}) users_snapshot: Dict[str, Any] = roles.get("users", {})
etc_custom_snapshot: Dict[str, Any] = state.get("etc_custom", {}) apt_config_snapshot: Dict[str, Any] = roles.get("apt_config", {})
usr_local_custom_snapshot: Dict[str, Any] = state.get("usr_local_custom", {}) dnf_config_snapshot: Dict[str, Any] = roles.get("dnf_config", {})
extra_paths_snapshot: Dict[str, Any] = state.get("extra_paths", {}) etc_custom_snapshot: Dict[str, Any] = roles.get("etc_custom", {})
usr_local_custom_snapshot: Dict[str, Any] = roles.get("usr_local_custom", {})
extra_paths_snapshot: Dict[str, Any] = roles.get("extra_paths", {})
site_mode = fqdn is not None and fqdn != "" site_mode = fqdn is not None and fqdn != ""
@ -839,7 +835,6 @@ def _manifest_from_bundle_dir(
# tasks (data-driven) # tasks (data-driven)
users_tasks = """--- users_tasks = """---
# Generated by enroll
- name: Ensure groups exist - name: Ensure groups exist
ansible.builtin.group: ansible.builtin.group:
@ -996,7 +991,7 @@ Generated non-system user accounts and SSH public material.
else: else:
_write_role_defaults(role_dir, vars_map) _write_role_defaults(role_dir, vars_map)
tasks = """---\n""" + _render_generic_files_tasks( tasks = "---\n" + _render_generic_files_tasks(
var_prefix, include_restart_notify=False var_prefix, include_restart_notify=False
) )
with open( with open(
@ -1298,7 +1293,7 @@ DNF/YUM configuration harvested from the system (repos, config files, and RPM GP
else: else:
_write_role_defaults(role_dir, vars_map) _write_role_defaults(role_dir, vars_map)
tasks = """---\n""" + _render_generic_files_tasks( tasks = "---\n" + _render_generic_files_tasks(
var_prefix, include_restart_notify=False var_prefix, include_restart_notify=False
) )
with open( with open(
@ -1664,8 +1659,7 @@ User-requested extra file harvesting.
) )
task_parts.append( task_parts.append(
f""" f"""- name: Probe whether systemd unit exists and is manageable
- name: Probe whether systemd unit exists and is manageable
ansible.builtin.systemd: ansible.builtin.systemd:
name: "{{{{ {var_prefix}_unit_name }}}}" name: "{{{{ {var_prefix}_unit_name }}}}"
check_mode: true check_mode: true

View file

@ -81,6 +81,17 @@ class PackageBackend:
def list_manual_packages(self) -> List[str]: # pragma: no cover def list_manual_packages(self) -> List[str]: # pragma: no cover
raise NotImplementedError raise NotImplementedError
def installed_packages(self) -> Dict[str, List[Dict[str, str]]]: # pragma: no cover
"""Return mapping of package name -> installed instances.
Each instance is a dict with at least:
- version: package version string
- arch: architecture string
Backends should be best-effort and return an empty mapping on failure.
"""
raise NotImplementedError
def build_etc_index( def build_etc_index(
self, self,
) -> Tuple[ ) -> Tuple[
@ -121,6 +132,11 @@ class DpkgBackend(PackageBackend):
return list_manual_packages() return list_manual_packages()
def installed_packages(self) -> Dict[str, List[Dict[str, str]]]:
from .debian import list_installed_packages
return list_installed_packages()
def build_etc_index(self): def build_etc_index(self):
from .debian import build_dpkg_etc_index from .debian import build_dpkg_etc_index
@ -194,6 +210,11 @@ class RpmBackend(PackageBackend):
return list_manual_packages() return list_manual_packages()
def installed_packages(self) -> Dict[str, List[Dict[str, str]]]:
from .rpm import list_installed_packages
return list_installed_packages()
def build_etc_index(self): def build_etc_index(self):
from .rpm import build_rpm_etc_index from .rpm import build_rpm_etc_index

View file

@ -104,7 +104,7 @@ def list_manual_packages() -> List[str]:
if pkgs: if pkgs:
return _dedupe(pkgs) return _dedupe(pkgs)
# Fallback: human-oriented output. # Fallback
rc, out = _run( rc, out = _run(
["dnf", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True ["dnf", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True
) )
@ -142,6 +142,63 @@ def list_manual_packages() -> List[str]:
return [] return []
def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
"""Return mapping of installed package name -> installed instances.
Uses `rpm -qa` and is expected to work on RHEL/Fedora-like systems.
Output format:
{"pkg": [{"version": "...", "arch": "..."}, ...], ...}
The version string is formatted as:
- "<version>-<release>" for typical packages
- "<epoch>:<version>-<release>" if a non-zero epoch is present
"""
try:
_, out = _run(
[
"rpm",
"-qa",
"--qf",
"%{NAME}\t%{EPOCHNUM}\t%{VERSION}\t%{RELEASE}\t%{ARCH}\n",
],
allow_fail=False,
merge_err=True,
)
except Exception:
return {}
pkgs: Dict[str, List[Dict[str, str]]] = {}
for raw in (out or "").splitlines():
line = raw.strip("\n")
if not line:
continue
parts = line.split("\t")
if len(parts) < 5:
continue
name, epoch, ver, rel, arch = [p.strip() for p in parts[:5]]
if not name or not ver:
continue
# Normalise epoch.
epoch = epoch.strip()
if epoch.lower() in ("(none)", "none", ""):
epoch = "0"
v = f"{ver}-{rel}" if rel else ver
if epoch and epoch.isdigit() and epoch != "0":
v = f"{epoch}:{v}"
pkgs.setdefault(name, []).append({"version": v, "arch": arch})
for k in list(pkgs.keys()):
pkgs[k] = sorted(
pkgs[k], key=lambda x: (x.get("arch") or "", x.get("version") or "")
)
return pkgs
def _walk_etc_files() -> List[str]: def _walk_etc_files() -> List[str]:
out: List[str] = [] out: List[str] = []
for dirpath, _, filenames in os.walk("/etc"): for dirpath, _, filenames in os.walk("/etc"):

View file

@ -18,7 +18,20 @@ def test_diff_includes_usr_local_custom_files(tmp_path: Path):
new = tmp_path / "new" new = tmp_path / "new"
old_state = { old_state = {
"host": {"hostname": "h1", "os": "debian"}, "schema_version": 3,
"host": {"hostname": "h1", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {
"packages": {
"curl": {
"version": "1.0",
"arches": [],
"installations": [{"version": "1.0", "arch": "amd64"}],
"observed_via": [{"kind": "user_installed"}],
"roles": [],
}
}
},
"roles": {
"users": { "users": {
"role_name": "users", "role_name": "users",
"users": [], "users": [],
@ -27,9 +40,13 @@ def test_diff_includes_usr_local_custom_files(tmp_path: Path):
"notes": [], "notes": [],
}, },
"services": [], "services": [],
"package_roles": [], "packages": [],
"manual_packages": ["curl"], "apt_config": {
"manual_packages_skipped": [], "role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": { "etc_custom": {
"role_name": "etc_custom", "role_name": "etc_custom",
"managed_files": [], "managed_files": [],
@ -51,10 +68,33 @@ def test_diff_includes_usr_local_custom_files(tmp_path: Path):
"excluded": [], "excluded": [],
"notes": [], "notes": [],
}, },
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
} }
new_state = { new_state = {
**old_state, **old_state,
"manual_packages": ["curl", "htop"], "inventory": {
"packages": {
**old_state["inventory"]["packages"],
"htop": {
"version": "3.0",
"arches": [],
"installations": [{"version": "3.0", "arch": "amd64"}],
"observed_via": [{"kind": "user_installed"}],
"roles": [],
},
}
},
"roles": {
**old_state["roles"],
"usr_local_custom": { "usr_local_custom": {
"role_name": "usr_local_custom", "role_name": "usr_local_custom",
"managed_files": [ "managed_files": [
@ -78,6 +118,7 @@ def test_diff_includes_usr_local_custom_files(tmp_path: Path):
"excluded": [], "excluded": [],
"notes": [], "notes": [],
}, },
},
} }
_write_bundle( _write_bundle(

View file

@ -30,6 +30,7 @@ class FakeBackend:
owner_fn, owner_fn,
modified_by_pkg: dict[str, dict[str, str]] | None = None, modified_by_pkg: dict[str, dict[str, str]] | None = None,
pkg_config_prefixes: tuple[str, ...] = ("/etc/apt/",), pkg_config_prefixes: tuple[str, ...] = ("/etc/apt/",),
installed: dict[str, list[dict[str, str]]] | None = None,
): ):
self.name = name self.name = name
self.pkg_config_prefixes = pkg_config_prefixes self.pkg_config_prefixes = pkg_config_prefixes
@ -40,6 +41,7 @@ class FakeBackend:
self._manual = manual_pkgs self._manual = manual_pkgs
self._owner_fn = owner_fn self._owner_fn = owner_fn
self._modified_by_pkg = modified_by_pkg or {} self._modified_by_pkg = modified_by_pkg or {}
self._installed = installed or {}
def build_etc_index(self): def build_etc_index(self):
return ( return (
@ -55,6 +57,14 @@ class FakeBackend:
def list_manual_packages(self): def list_manual_packages(self):
return list(self._manual) return list(self._manual)
def installed_packages(self):
"""Return mapping package -> installations.
The real backends return:
{"pkg": [{"version": "...", "arch": "..."}, ...]}
"""
return dict(self._installed)
def specific_paths_for_hints(self, hints: set[str]): def specific_paths_for_hints(self, hints: set[str]):
return [] return []
@ -214,26 +224,36 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
state_path = h.harvest(str(bundle), policy=AllowAllPolicy()) state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
st = json.loads(Path(state_path).read_text(encoding="utf-8")) st = json.loads(Path(state_path).read_text(encoding="utf-8"))
assert "openvpn" in st["manual_packages"] inv = st["inventory"]["packages"]
assert "curl" in st["manual_packages"] assert "openvpn" in inv
assert "openvpn" in st["manual_packages_skipped"] assert "curl" in inv
assert all(pr["package"] != "openvpn" for pr in st["package_roles"])
assert any(pr["package"] == "curl" for pr in st["package_roles"]) # openvpn is managed by the service role, so it should NOT appear as a package role.
pkg_roles = st["roles"]["packages"]
assert all(pr["package"] != "openvpn" for pr in pkg_roles)
assert any(pr["package"] == "curl" for pr in pkg_roles)
# Inventory provenance: openvpn should be observed via systemd unit.
openvpn_obs = inv["openvpn"]["observed_via"]
assert any(
o.get("kind") == "systemd_unit" and o.get("ref") == "openvpn.service"
for o in openvpn_obs
)
# Service role captured modified conffile # Service role captured modified conffile
svc = st["services"][0] svc = st["roles"]["services"][0]
assert svc["unit"] == "openvpn.service" assert svc["unit"] == "openvpn.service"
assert "openvpn" in svc["packages"] assert "openvpn" in svc["packages"]
assert any(mf["path"] == "/etc/openvpn/server.conf" for mf in svc["managed_files"]) assert any(mf["path"] == "/etc/openvpn/server.conf" for mf in svc["managed_files"])
# Unowned /etc/default/keyboard is attributed to etc_custom only # Unowned /etc/default/keyboard is attributed to etc_custom only
etc_custom = st["etc_custom"] etc_custom = st["roles"]["etc_custom"]
assert any( assert any(
mf["path"] == "/etc/default/keyboard" for mf in etc_custom["managed_files"] mf["path"] == "/etc/default/keyboard" for mf in etc_custom["managed_files"]
) )
# /usr/local content is attributed to usr_local_custom # /usr/local content is attributed to usr_local_custom
ul = st["usr_local_custom"] ul = st["roles"]["usr_local_custom"]
assert any(mf["path"] == "/usr/local/etc/myapp.conf" for mf in ul["managed_files"]) assert any(mf["path"] == "/usr/local/etc/myapp.conf" for mf in ul["managed_files"])
assert any(mf["path"] == "/usr/local/bin/myscript" for mf in ul["managed_files"]) assert any(mf["path"] == "/usr/local/bin/myscript" for mf in ul["managed_files"])
assert all(mf["path"] != "/usr/local/bin/readme.txt" for mf in ul["managed_files"]) assert all(mf["path"] != "/usr/local/bin/readme.txt" for mf in ul["managed_files"])
@ -338,10 +358,12 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic(
st = json.loads(Path(state_path).read_text(encoding="utf-8")) st = json.loads(Path(state_path).read_text(encoding="utf-8"))
# Cron snippet should end up attached to the ntpsec role, not apparmor. # Cron snippet should end up attached to the ntpsec role, not apparmor.
svc_ntpsec = next(s for s in st["services"] if s["role_name"] == "ntpsec") svc_ntpsec = next(s for s in st["roles"]["services"] if s["role_name"] == "ntpsec")
assert any(mf["path"] == "/etc/cron.d/ntpsec" for mf in svc_ntpsec["managed_files"]) assert any(mf["path"] == "/etc/cron.d/ntpsec" for mf in svc_ntpsec["managed_files"])
svc_apparmor = next(s for s in st["services"] if s["role_name"] == "apparmor") svc_apparmor = next(
s for s in st["roles"]["services"] if s["role_name"] == "apparmor"
)
assert all( assert all(
mf["path"] != "/etc/cron.d/ntpsec" for mf in svc_apparmor["managed_files"] mf["path"] != "/etc/cron.d/ntpsec" for mf in svc_apparmor["managed_files"]
) )

View file

@ -24,7 +24,20 @@ def test_manifest_uses_jinjaturtle_templates_and_does_not_copy_raw(
) )
state = { state = {
"host": {"hostname": "test", "os": "debian"}, "schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {
"packages": {
"foo": {
"version": "1.0",
"arches": [],
"installations": [{"version": "1.0", "arch": "amd64"}],
"observed_via": [{"kind": "systemd_unit", "ref": "foo.service"}],
"roles": ["foo"],
}
}
},
"roles": {
"users": { "users": {
"role_name": "users", "role_name": "users",
"users": [], "users": [],
@ -32,12 +45,6 @@ def test_manifest_uses_jinjaturtle_templates_and_does_not_copy_raw(
"excluded": [], "excluded": [],
"notes": [], "notes": [],
}, },
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [ "services": [
{ {
"unit": "foo.service", "unit": "foo.service",
@ -61,7 +68,34 @@ def test_manifest_uses_jinjaturtle_templates_and_does_not_copy_raw(
"notes": [], "notes": [],
} }
], ],
"package_roles": [], "packages": [],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
} }
bundle.mkdir(parents=True, exist_ok=True) bundle.mkdir(parents=True, exist_ok=True)

View file

@ -13,7 +13,27 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
) )
state = { state = {
"host": {"hostname": "test", "os": "debian"}, "schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {
"packages": {
"foo": {
"version": "1.0",
"arches": [],
"installations": [{"version": "1.0", "arch": "amd64"}],
"observed_via": [{"kind": "systemd_unit", "ref": "foo.service"}],
"roles": ["foo"],
},
"curl": {
"version": "8.0",
"arches": [],
"installations": [{"version": "8.0", "arch": "amd64"}],
"observed_via": [{"kind": "package_role", "ref": "curl"}],
"roles": ["curl"],
},
}
},
"roles": {
"users": { "users": {
"role_name": "users", "role_name": "users",
"users": [ "users": [
@ -32,6 +52,50 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
"excluded": [], "excluded": [],
"notes": [], "notes": [],
}, },
"services": [
{
"unit": "foo.service",
"role_name": "foo",
"packages": ["foo"],
"active_state": "inactive",
"sub_state": "dead",
"unit_file_state": "enabled",
"condition_result": "no",
"managed_files": [
{
"path": "/etc/foo.conf",
"src_rel": "etc/foo.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "modified_conffile",
}
],
"excluded": [],
"notes": [],
}
],
"packages": [
{
"package": "curl",
"role_name": "curl",
"managed_files": [],
"excluded": [],
"notes": [],
}
],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": { "etc_custom": {
"role_name": "etc_custom", "role_name": "etc_custom",
"managed_files": [ "managed_files": [
@ -70,38 +134,15 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
"excluded": [], "excluded": [],
"notes": [], "notes": [],
}, },
"services": [ "extra_paths": {
{ "role_name": "extra_paths",
"unit": "foo.service", "include_patterns": [],
"role_name": "foo", "exclude_patterns": [],
"packages": ["foo"],
"active_state": "inactive",
"sub_state": "dead",
"unit_file_state": "enabled",
"condition_result": "no",
"managed_files": [
{
"path": "/etc/foo.conf",
"src_rel": "etc/foo.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "modified_conffile",
}
],
"excluded": [],
"notes": [],
}
],
"package_roles": [
{
"package": "curl",
"role_name": "curl",
"managed_files": [], "managed_files": [],
"excluded": [], "excluded": [],
"notes": [], "notes": [],
} },
], },
} }
bundle.mkdir(parents=True, exist_ok=True) bundle.mkdir(parents=True, exist_ok=True)
@ -189,7 +230,20 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path)
) )
state = { state = {
"host": {"hostname": "test", "os": "debian"}, "schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {
"packages": {
"foo": {
"version": "1.0",
"arches": [],
"installations": [{"version": "1.0", "arch": "amd64"}],
"observed_via": [{"kind": "systemd_unit", "ref": "foo.service"}],
"roles": ["foo"],
}
}
},
"roles": {
"users": { "users": {
"role_name": "users", "role_name": "users",
"users": [], "users": [],
@ -197,6 +251,42 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path)
"excluded": [], "excluded": [],
"notes": [], "notes": [],
}, },
"services": [
{
"unit": "foo.service",
"role_name": "foo",
"packages": ["foo"],
"active_state": "active",
"sub_state": "running",
"unit_file_state": "enabled",
"condition_result": "yes",
"managed_files": [
{
"path": "/etc/foo.conf",
"src_rel": "etc/foo.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "modified_conffile",
}
],
"excluded": [],
"notes": [],
}
],
"packages": [],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": { "etc_custom": {
"role_name": "etc_custom", "role_name": "etc_custom",
"managed_files": [ "managed_files": [
@ -227,30 +317,15 @@ def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path)
"excluded": [], "excluded": [],
"notes": [], "notes": [],
}, },
"services": [ "extra_paths": {
{ "role_name": "extra_paths",
"unit": "foo.service", "include_patterns": [],
"role_name": "foo", "exclude_patterns": [],
"packages": ["foo"], "managed_files": [],
"active_state": "active",
"sub_state": "running",
"unit_file_state": "enabled",
"condition_result": "yes",
"managed_files": [
{
"path": "/etc/foo.conf",
"src_rel": "etc/foo.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "modified_conffile",
}
],
"excluded": [], "excluded": [],
"notes": [], "notes": [],
} },
], },
"package_roles": [],
} }
bundle.mkdir(parents=True, exist_ok=True) bundle.mkdir(parents=True, exist_ok=True)
@ -337,7 +412,20 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path):
) )
state = { state = {
"schema_version": 3,
"host": {"hostname": "test", "os": "redhat", "pkg_backend": "rpm"}, "host": {"hostname": "test", "os": "redhat", "pkg_backend": "rpm"},
"inventory": {
"packages": {
"dnf": {
"version": "4.0",
"arches": [],
"installations": [{"version": "4.0", "arch": "x86_64"}],
"observed_via": [{"kind": "dnf_config"}],
"roles": [],
}
}
},
"roles": {
"users": { "users": {
"role_name": "users", "role_name": "users",
"users": [], "users": [],
@ -346,9 +434,7 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path):
"notes": [], "notes": [],
}, },
"services": [], "services": [],
"package_roles": [], "packages": [],
"manual_packages": [],
"manual_packages_skipped": [],
"apt_config": { "apt_config": {
"role_name": "apt_config", "role_name": "apt_config",
"managed_files": [], "managed_files": [],
@ -390,6 +476,7 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path):
"excluded": [], "excluded": [],
"notes": [], "notes": [],
}, },
},
} }
bundle.mkdir(parents=True, exist_ok=True) bundle.mkdir(parents=True, exist_ok=True)