Add --enforce mode to enroll diff and add --ignore-package-versions
Some checks failed
CI / test (push) Failing after 1m48s
Lint / test (push) Successful in 32s
Trivy / test (push) Successful in 22s

If there is diff detected between the two harvests, and it can
enforce restoring the state from the older harvest, it will
manifest the state and apply it with ansible. Only the specific
roles that had diffed will be applied (via the new tags capability).

`--ignore-package-versions` will skip reporting when packages are
upgraded/downgraded in the diff.
This commit is contained in:
Miguel Jacq 2026-01-10 10:51:41 +11:00
parent 9a249cc973
commit ebd30247d1
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
9 changed files with 309 additions and 59 deletions

View file

@ -567,6 +567,14 @@ def main() -> None:
"This affects file drift reporting only (added/removed/changed files), not package/service/user diffs."
),
)
d.add_argument(
"--ignore-package-versions",
action="store_true",
help=(
"Ignore package version changes in the diff report and exit status. "
"Package additions/removals are still reported. Useful when routine upgrades would otherwise create noisy drift."
),
)
d.add_argument(
"--enforce",
action="store_true",
@ -854,6 +862,9 @@ def main() -> None:
args.new,
sops_mode=bool(getattr(args, "sops", False)),
exclude_paths=list(getattr(args, "exclude_path", []) or []),
ignore_package_versions=bool(
getattr(args, "ignore_package_versions", False)
),
)
# Optional enforcement: if drift is detected, attempt to restore the
@ -865,7 +876,7 @@ def main() -> None:
"requested": True,
"status": "skipped",
"reason": (
"no enforceable drift detected (only package additions and/or version changes); "
"no enforceable drift detected (only additions and/or package version changes); "
"enroll does not attempt to downgrade packages"
),
}
@ -874,6 +885,7 @@ def main() -> None:
info = enforce_old_harvest(
args.old,
sops_mode=bool(getattr(args, "sops", False)),
report=report,
)
except Exception as e:
raise SystemExit(

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import hashlib
import json
import os
import re
import shutil
import subprocess # nosec
import tarfile
@ -291,6 +292,7 @@ def compare_harvests(
*,
sops_mode: bool = False,
exclude_paths: Optional[List[str]] = None,
ignore_package_versions: bool = False,
) -> Tuple[Dict[str, Any], bool]:
"""Compare two harvests.
@ -317,17 +319,21 @@ def compare_harvests(
pkgs_removed = sorted(old_pkgs - new_pkgs)
pkgs_version_changed: List[Dict[str, Any]] = []
pkgs_version_changed_ignored_count = 0
for pkg in sorted(old_pkgs & new_pkgs):
a = old_inv.get(pkg) or {}
b = new_inv.get(pkg) or {}
if _pkg_version_key(a) != _pkg_version_key(b):
pkgs_version_changed.append(
{
"package": pkg,
"old": _pkg_version_display(a),
"new": _pkg_version_display(b),
}
)
if ignore_package_versions:
pkgs_version_changed_ignored_count += 1
else:
pkgs_version_changed.append(
{
"package": pkg,
"old": _pkg_version_display(a),
"new": _pkg_version_display(b),
}
)
old_units = _service_units(old_state)
new_units = _service_units(new_state)
@ -477,6 +483,7 @@ def compare_harvests(
"generated_at": _utc_now_iso(),
"filters": {
"exclude_paths": list(exclude_paths or []),
"ignore_package_versions": bool(ignore_package_versions),
},
"old": {
"input": old_path,
@ -494,6 +501,9 @@ def compare_harvests(
"added": pkgs_added,
"removed": pkgs_removed,
"version_changed": pkgs_version_changed,
"version_changed_ignored_count": int(
pkgs_version_changed_ignored_count
),
},
"services": {
"enabled_added": units_added,
@ -529,13 +539,6 @@ def compare_harvests(
return report, has_changes
def _tail_text(s: str, *, max_chars: int = 4000) -> str:
s = s or ""
if len(s) <= max_chars:
return s
return "" + s[-max_chars:]
def has_enforceable_drift(report: Dict[str, Any]) -> bool:
"""Return True if the diff report contains drift that is safe/meaningful to enforce.
@ -554,7 +557,9 @@ def has_enforceable_drift(report: Dict[str, Any]) -> bool:
return True
sv = report.get("services", {}) or {}
if (sv.get("enabled_added") or []) or (sv.get("enabled_removed") or []):
# We do not try to disable newly-enabled services; we only restore units
# that were enabled in the baseline but are now missing.
if sv.get("enabled_removed") or []:
return True
for ch in sv.get("changed", []) or []:
@ -567,28 +572,136 @@ def has_enforceable_drift(report: Dict[str, Any]) -> bool:
return True
us = report.get("users", {}) or {}
if (
(us.get("added") or [])
or (us.get("removed") or [])
or (us.get("changed") or [])
):
# We restore baseline users (missing/changed). We do not remove newly-added users.
if (us.get("removed") or []) or (us.get("changed") or []):
return True
fl = report.get("files", {}) or {}
if (
(fl.get("added") or [])
or (fl.get("removed") or [])
or (fl.get("changed") or [])
):
# We restore baseline files (missing/changed). We do not delete newly-managed files.
if (fl.get("removed") or []) or (fl.get("changed") or []):
return True
return False
def _role_tag(role: str) -> str:
"""Return the Ansible tag name for a role (must match manifest generation)."""
r = str(role or "").strip()
safe = re.sub(r"[^A-Za-z0-9_-]+", "_", r).strip("_")
if not safe:
safe = "other"
return f"role_{safe}"
def _enforcement_plan(
report: Dict[str, Any],
old_state: Dict[str, Any],
old_bundle_dir: Path,
) -> Dict[str, Any]:
"""Return a best-effort enforcement plan (roles/tags) for this diff report.
We only plan for drift that the baseline manifest can safely restore:
- packages that were removed (reinstall, no downgrades)
- baseline users that were removed/changed
- baseline files that were removed/changed
- baseline systemd units that were disabled/changed
We do NOT plan to remove newly-added packages/users/files/services.
"""
roles: set[str] = set()
# --- Packages (only removals)
pk = report.get("packages", {}) or {}
removed_pkgs = set(pk.get("removed") or [])
if removed_pkgs:
pkg_to_roles: Dict[str, set[str]] = {}
for svc in _roles(old_state).get("services") or []:
r = str(svc.get("role_name") or "").strip()
for p in svc.get("packages", []) or []:
if p:
pkg_to_roles.setdefault(str(p), set()).add(r)
for pr in _roles(old_state).get("packages") or []:
r = str(pr.get("role_name") or "").strip()
p = pr.get("package")
if p:
pkg_to_roles.setdefault(str(p), set()).add(r)
for p in removed_pkgs:
for r in pkg_to_roles.get(str(p), set()):
if r:
roles.add(r)
# --- Users (removed/changed)
us = report.get("users", {}) or {}
if (us.get("removed") or []) or (us.get("changed") or []):
u = _roles(old_state).get("users") or {}
u_role = str(u.get("role_name") or "users")
if u_role:
roles.add(u_role)
# --- Files (removed/changed)
fl = report.get("files", {}) or {}
file_paths: List[str] = []
for e in fl.get("removed", []) or []:
if isinstance(e, dict):
p = e.get("path")
else:
p = e
if p:
file_paths.append(str(p))
for e in fl.get("changed", []) or []:
if isinstance(e, dict):
p = e.get("path")
else:
p = e
if p:
file_paths.append(str(p))
if file_paths:
idx = _file_index(old_bundle_dir, old_state)
for p in file_paths:
rec = idx.get(p)
if rec and rec.role:
roles.add(str(rec.role))
# --- Services (enabled_removed + meaningful changes)
sv = report.get("services", {}) or {}
units: List[str] = []
for u in sv.get("enabled_removed", []) or []:
if u:
units.append(str(u))
for ch in sv.get("changed", []) or []:
if not isinstance(ch, dict):
continue
unit = ch.get("unit")
changes = ch.get("changes") or {}
if unit and any(k != "packages" for k in changes.keys()):
units.append(str(unit))
if units:
old_units = _service_units(old_state)
for u in units:
snap = old_units.get(u)
if snap and snap.get("role_name"):
roles.add(str(snap.get("role_name")))
# Drop empty/unknown roles.
roles = {r for r in roles if r and str(r).strip() and str(r).strip() != "unknown"}
tags = sorted({_role_tag(r) for r in roles})
return {
"roles": sorted(roles),
"tags": tags,
}
def enforce_old_harvest(
old_path: str,
*,
sops_mode: bool = False,
report: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Enforce the *old* (baseline) harvest state on the current machine.
@ -616,6 +729,17 @@ def enforce_old_harvest(
if old_b.tempdir:
stack.callback(old_b.tempdir.cleanup)
old_state = _load_state(old_b.dir)
plan: Optional[Dict[str, Any]] = None
tags: Optional[List[str]] = None
roles: List[str] = []
if report is not None:
plan = _enforcement_plan(report, old_state, old_b.dir)
roles = list(plan.get("roles") or [])
t = list(plan.get("tags") or [])
tags = t if t else None
with tempfile.TemporaryDirectory(prefix="enroll-enforce-") as td:
td_path = Path(td)
try:
@ -646,6 +770,8 @@ def enforce_old_harvest(
"local",
str(playbook),
]
if tags:
cmd.extend(["--tags", ",".join(tags)])
p = subprocess.run(
cmd,
cwd=str(td_path),
@ -666,16 +792,14 @@ def enforce_old_harvest(
"returncode": int(p.returncode),
}
# Include a small tail for observability in webhooks/emails.
if p.stdout:
info["stdout_tail"] = _tail_text(p.stdout)
if p.stderr:
info["stderr_tail"] = _tail_text(p.stderr)
# Record tag selection (if we could attribute drift to specific roles).
info["roles"] = roles
info["tags"] = list(tags or [])
if not tags:
info["scope"] = "full_playbook"
if p.returncode != 0:
err = (p.stderr or p.stdout or "").strip()
if err:
err = _tail_text(err)
raise RuntimeError(
"ansible-playbook failed"
+ (f" (rc={p.returncode})" if p.returncode is not None else "")
@ -709,13 +833,30 @@ def _report_text(report: Dict[str, Any]) -> str:
if ex_paths:
lines.append(f"file exclude patterns: {', '.join(str(p) for p in ex_paths)}")
if filt.get("ignore_package_versions"):
ignored = int(
(report.get("packages", {}) or {}).get("version_changed_ignored_count") or 0
)
msg = "package version drift: ignored (--ignore-package-versions)"
if ignored:
msg += f" (ignored {ignored} change{'s' if ignored != 1 else ''})"
lines.append(msg)
enf = report.get("enforcement") or {}
if enf:
lines.append("\nEnforcement")
status = str(enf.get("status") or "").strip().lower()
if status == "applied":
extra = ""
tags = enf.get("tags") or []
scope = enf.get("scope")
if tags:
extra = f" (tags={','.join(str(t) for t in tags)})"
elif scope:
extra = f" ({scope})"
lines.append(
f" applied old harvest via ansible-playbook (rc={enf.get('returncode')})"
+ extra
+ (
f" (finished {enf.get('finished_at')})"
if enf.get("finished_at")
@ -737,7 +878,10 @@ def _report_text(report: Dict[str, Any]) -> str:
lines.append("\nPackages")
lines.append(f" added: {len(pk.get('added', []) or [])}")
lines.append(f" removed: {len(pk.get('removed', []) or [])}")
lines.append(f" version_changed: {len(pk.get('version_changed', []) or [])}")
ignored_v = int(pk.get("version_changed_ignored_count") or 0)
vc = len(pk.get("version_changed", []) or [])
suffix = f" (ignored {ignored_v})" if ignored_v else ""
lines.append(f" version_changed: {vc}{suffix}")
for p in pk.get("added", []) or []:
lines.append(f" + {p}")
for p in pk.get("removed", []) or []:
@ -848,13 +992,30 @@ def _report_markdown(report: Dict[str, Any]) -> str:
+ "\n"
)
if filt.get("ignore_package_versions"):
ignored = int(
(report.get("packages", {}) or {}).get("version_changed_ignored_count") or 0
)
msg = "- **Package version drift**: ignored (`--ignore-package-versions`)"
if ignored:
msg += f" (ignored {ignored} change{'s' if ignored != 1 else ''})"
out.append(msg + "\n")
enf = report.get("enforcement") or {}
if enf:
out.append("\n## Enforcement\n")
status = str(enf.get("status") or "").strip().lower()
if status == "applied":
extra = ""
tags = enf.get("tags") or []
scope = enf.get("scope")
if tags:
extra = " (tags=" + ",".join(str(t) for t in tags) + ")"
elif scope:
extra = f" ({scope})"
out.append(
"- ✅ Applied old harvest via ansible-playbook"
+ extra
+ (
f" (rc={enf.get('returncode')})"
if enf.get("returncode") is not None
@ -892,7 +1053,10 @@ def _report_markdown(report: Dict[str, Any]) -> str:
for p in pk.get("removed", []) or []:
out.append(f" - `- {p}`\n")
out.append(f"- Version changed: {len(pk.get('version_changed', []) or [])}\n")
ignored_v = int(pk.get("version_changed_ignored_count") or 0)
vc = len(pk.get("version_changed", []) or [])
suffix = f" (ignored {ignored_v})" if ignored_v else ""
out.append(f"- Version changed: {vc}{suffix}\n")
for ch in pk.get("version_changed", []) or []:
out.append(
f" - `~ {ch.get('package')}`: `{ch.get('old')}` → `{ch.get('new')}`\n"

View file

@ -163,6 +163,19 @@ def _write_role_scaffold(role_dir: str) -> None:
os.makedirs(os.path.join(role_dir, "templates"), exist_ok=True)
def _role_tag(role: str) -> str:
"""Return a stable Ansible tag name for a role.
Used by `enroll diff --enforce` to run only the roles needed to repair drift.
"""
r = str(role or "").strip()
# Ansible tag charset is fairly permissive, but keep it portable and consistent.
safe = re.sub(r"[^A-Za-z0-9_-]+", "_", r).strip("_")
if not safe:
safe = "other"
return f"role_{safe}"
def _write_playbook_all(path: str, roles: List[str]) -> None:
pb_lines = [
"---",
@ -173,7 +186,8 @@ def _write_playbook_all(path: str, roles: List[str]) -> None:
" roles:",
]
for r in roles:
pb_lines.append(f" - {r}")
pb_lines.append(f" - role: {r}")
pb_lines.append(f" tags: [{_role_tag(r)}]")
with open(path, "w", encoding="utf-8") as f:
f.write("\n".join(pb_lines) + "\n")
@ -188,7 +202,8 @@ def _write_playbook_host(path: str, fqdn: str, roles: List[str]) -> None:
" roles:",
]
for r in roles:
pb_lines.append(f" - {r}")
pb_lines.append(f" - role: {r}")
pb_lines.append(f" tags: [{_role_tag(r)}]")
with open(path, "w", encoding="utf-8") as f:
f.write("\n".join(pb_lines) + "\n")