Support '--enforce' mode in 'enroll diff' with '--target' to use a specific config manager to run to enforce
All checks were successful
CI / test (push) Successful in 27m26s
Lint / test (push) Successful in 45s

This commit is contained in:
Miguel Jacq 2026-06-21 12:38:10 +10:00
parent 5b0e945c99
commit a0ac28f213
Signed by: mig5
GPG key ID: 03906B4110AAD3B8
4 changed files with 334 additions and 54 deletions

View file

@ -634,10 +634,19 @@ def main() -> None:
action="store_true",
help=(
"If differences are detected, attempt to enforce the old harvest state locally by generating a manifest and "
"running ansible-playbook. Requires ansible-playbook on PATH. "
"running the selected local apply tool. "
"Enroll does not attempt to downgrade packages; if the only drift is package version upgrades (or newly installed packages), enforcement is skipped."
),
)
d.add_argument(
"--target",
choices=["ansible", "puppet", "salt"],
default="ansible",
help=(
"Configuration-management target to use with --enforce (default: ansible). "
"Requires ansible-playbook, puppet, or salt-call on PATH as appropriate."
),
)
d.add_argument(
"--out",
help="Write the report to this file instead of stdout.",
@ -945,7 +954,7 @@ def main() -> None:
)
# Optional enforcement: if drift is detected, attempt to restore the
# system to the *old* (baseline) state using ansible-playbook.
# system to the *old* (baseline) state using the selected target.
if bool(getattr(args, "enforce", False)):
if has_changes:
if not has_enforceable_drift(report):
@ -963,6 +972,7 @@ def main() -> None:
args.old,
sops_mode=bool(getattr(args, "sops", False)),
report=report,
target=getattr(args, "target", "ansible"),
)
except Exception as e:
raise SystemExit(

View file

@ -658,6 +658,113 @@ def _role_tag(role: str) -> str:
return f"role_{safe}"
def _normalise_enforcement_target(target: str) -> str:
t = str(target or "ansible").strip().lower()
if t not in {"ansible", "puppet", "salt"}:
raise ValueError(f"unsupported enforcement target: {target!r}")
return t
def _enforcement_tool(target: str) -> Tuple[str, str]:
"""Return (binary-name, human-label) for a local enforcement target."""
if target == "puppet":
return "puppet", "puppet apply"
if target == "salt":
return "salt-call", "salt-call"
return "ansible-playbook", "ansible-playbook"
def _require_enforcement_tool(target: str) -> Tuple[str, str]:
binary, label = _enforcement_tool(target)
exe = shutil.which(binary)
if not exe:
install_hint = {
"ansible": "Ansible",
"puppet": "Puppet",
"salt": "Salt",
}.get(target, target)
raise RuntimeError(
f"{binary} not found on PATH "
f"(cannot enforce with target {target}; install {install_hint})"
)
return exe, label
def _enforcement_command(
target: str,
exe: str,
manifest_dir: Path,
*,
tags: Optional[List[str]] = None,
) -> Tuple[List[str], Dict[str, str]]:
"""Return the local apply command and environment for a rendered manifest."""
env = dict(os.environ)
if target == "ansible":
playbook = manifest_dir / "playbook.yml"
if not playbook.exists():
raise RuntimeError(
f"manifest did not produce expected playbook.yml at {playbook}"
)
cfg = manifest_dir / "ansible.cfg"
if cfg.exists():
env["ANSIBLE_CONFIG"] = str(cfg)
cmd = [
exe,
"-i",
"localhost,",
"-c",
"local",
str(playbook),
]
if tags:
cmd.extend(["--tags", ",".join(tags)])
return cmd, env
if target == "puppet":
site_pp = manifest_dir / "manifests" / "site.pp"
if not site_pp.exists():
raise RuntimeError(
f"manifest did not produce expected Puppet site.pp at {site_pp}"
)
cmd = [
exe,
"apply",
"--modulepath",
str(manifest_dir / "modules"),
]
hiera_config = manifest_dir / "hiera.yaml"
if hiera_config.exists():
cmd.extend(["--hiera_config", str(hiera_config)])
cmd.append(str(site_pp))
return cmd, env
if target == "salt":
states_dir = manifest_dir / "states"
top_sls = states_dir / "top.sls"
if not top_sls.exists():
raise RuntimeError(
f"manifest did not produce expected Salt top.sls at {top_sls}"
)
cmd = [
exe,
"--local",
"--file-root",
str(states_dir),
]
pillar_dir = manifest_dir / "pillar"
if pillar_dir.exists():
cmd.extend(["--pillar-root", str(pillar_dir)])
cmd.extend(["state.apply"])
return cmd, env
raise ValueError(f"unsupported enforcement target: {target!r}")
def _enforcement_plan(
report: Dict[str, Any],
old_state: Dict[str, Any],
@ -767,22 +874,22 @@ def enforce_old_harvest(
*,
sops_mode: bool = False,
report: Optional[Dict[str, Any]] = None,
target: str = "ansible",
) -> Dict[str, Any]:
"""Enforce the *old* (baseline) harvest state on the current machine.
When Ansible is available, this:
1) renders a temporary manifest from the old harvest, and
2) runs ansible-playbook locally to apply it.
This renders a temporary manifest from the old harvest using the requested
target, then runs the target's local apply command:
- ansible: ansible-playbook -i localhost, -c local playbook.yml
- puppet: puppet apply --modulepath ./modules manifests/site.pp
- salt: salt-call --local --file-root ./states state.apply
Returns a dict suitable for attaching to the diff report under
report['enforcement'].
"""
ansible_playbook = shutil.which("ansible-playbook")
if not ansible_playbook:
raise RuntimeError(
"ansible-playbook not found on PATH (cannot enforce; install Ansible)"
)
target = _normalise_enforcement_target(target)
tool_exe, tool_label = _require_enforcement_tool(target)
# Import lazily to avoid heavy import cost and potential CLI cycles.
from .manifest import manifest
@ -802,8 +909,12 @@ def enforce_old_harvest(
if report is not None:
plan = _enforcement_plan(report, old_state, old_b.dir)
roles = list(plan.get("roles") or [])
t = list(plan.get("tags") or [])
tags = t if t else None
# Only Ansible has generated per-role tags that can safely narrow
# the apply scope. Puppet and Salt enforcement deliberately run the
# full generated local manifest/catalog for now.
if target == "ansible":
t = list(plan.get("tags") or [])
tags = t if t else None
with tempfile.TemporaryDirectory(prefix="enroll-enforce-") as td:
td_path = Path(td)
@ -813,30 +924,15 @@ def enforce_old_harvest(
pass
# 1) Generate a manifest in a temp directory.
manifest(str(old_b.dir), str(td_path))
playbook = td_path / "playbook.yml"
if not playbook.exists():
raise RuntimeError(
f"manifest did not produce expected playbook.yml at {playbook}"
)
manifest(str(old_b.dir), str(td_path), target=target)
# 2) Apply it locally.
env = dict(os.environ)
cfg = td_path / "ansible.cfg"
if cfg.exists():
env["ANSIBLE_CONFIG"] = str(cfg)
cmd = [
ansible_playbook,
"-i",
"localhost,",
"-c",
"local",
str(playbook),
]
if tags:
cmd.extend(["--tags", ",".join(tags)])
cmd, env = _enforcement_command(
target,
tool_exe,
td_path,
tags=tags,
)
spinner: Optional[_Spinner] = None
p: Optional[subprocess.CompletedProcess[str]] = None
@ -844,12 +940,12 @@ def enforce_old_harvest(
if _progress_enabled():
if tags:
sys.stderr.write(
f"Enforce: running ansible-playbook (tags: {','.join(tags)})\n",
f"Enforce: running {tool_label} (tags: {','.join(tags)})\n",
)
else:
sys.stderr.write("Enforce: running ansible-playbook\n")
sys.stderr.write(f"Enforce: running {tool_label}\n")
sys.stderr.flush()
spinner = _Spinner(" ansible-playbook")
spinner = _Spinner(f" {tool_label}")
spinner.start()
try:
@ -867,8 +963,8 @@ def enforce_old_harvest(
rc = p.returncode if p is not None else None
spinner.stop(
final_line=(
f"Enforce: ansible-playbook finished in {elapsed:0.1f}s"
+ (f" (rc={rc})" if rc is not None else ""),
f"Enforce: {tool_label} finished in {elapsed:0.1f}s"
+ (f" (rc={rc})" if rc is not None else "")
),
)
@ -876,23 +972,32 @@ def enforce_old_harvest(
info: Dict[str, Any] = {
"status": "applied" if p.returncode == 0 else "failed",
"target": target,
"tool": tool_label,
"executable": tool_exe,
"started_at": started_at,
"finished_at": finished_at,
"ansible_playbook": ansible_playbook,
"command": cmd,
"returncode": int(p.returncode),
}
# Keep the original Ansible-specific field for compatibility with
# existing consumers of the JSON report.
if target == "ansible":
info["ansible_playbook"] = tool_exe
elif target == "puppet":
info["puppet"] = tool_exe
elif target == "salt":
info["salt_call"] = tool_exe
# Record tag selection (if we could attribute drift to specific roles).
info["roles"] = roles
info["tags"] = list(tags or [])
if not tags:
info["scope"] = "full_playbook"
info["scope"] = "full_manifest"
if p.returncode != 0:
err = (p.stderr or p.stdout or "").strip()
raise RuntimeError(
"ansible-playbook failed"
f"{tool_label} failed"
+ (f" (rc={p.returncode})" if p.returncode is not None else "")
+ (f": {err}" if err else "")
)
@ -937,6 +1042,9 @@ def _report_text(report: Dict[str, Any]) -> str:
if enf:
lines.append("\nEnforcement")
status = str(enf.get("status") or "").strip().lower()
tool = str(enf.get("tool") or "ansible-playbook")
target = str(enf.get("target") or "ansible")
via = f"{tool} ({target})" if target and target not in tool else tool
if status == "applied":
extra = ""
tags = enf.get("tags") or []
@ -946,7 +1054,7 @@ def _report_text(report: Dict[str, Any]) -> str:
elif scope:
extra = f" ({scope})"
lines.append(
f" applied old harvest via ansible-playbook (rc={enf.get('returncode')})"
f" applied old harvest via {via} (rc={enf.get('returncode')})"
+ extra
+ (
f" (finished {enf.get('finished_at')})"
@ -956,7 +1064,7 @@ def _report_text(report: Dict[str, Any]) -> str:
)
elif status == "failed":
lines.append(
f" attempted enforcement but ansible-playbook failed (rc={enf.get('returncode')})"
f" attempted enforcement but {via} failed (rc={enf.get('returncode')})"
)
elif status == "skipped":
r = enf.get("reason")
@ -1096,6 +1204,9 @@ def _report_markdown(report: Dict[str, Any]) -> str:
if enf:
out.append("\n## Enforcement\n")
status = str(enf.get("status") or "").strip().lower()
tool = str(enf.get("tool") or "ansible-playbook")
target = str(enf.get("target") or "ansible")
via = f"{tool} ({target})" if target and target not in tool else tool
if status == "applied":
extra = ""
tags = enf.get("tags") or []
@ -1105,7 +1216,7 @@ def _report_markdown(report: Dict[str, Any]) -> str:
elif scope:
extra = f" ({scope})"
out.append(
"- ✅ Applied old harvest via ansible-playbook"
f"- ✅ Applied old harvest via {via}"
+ extra
+ (
f" (rc={enf.get('returncode')})"
@ -1121,7 +1232,7 @@ def _report_markdown(report: Dict[str, Any]) -> str:
)
elif status == "failed":
out.append(
"- ⚠️ Attempted enforcement but ansible-playbook failed"
f"- ⚠️ Attempted enforcement but {via} failed"
+ (
f" (rc={enf.get('returncode')})"
if enf.get("returncode") is not None