Simplify the over-engineered ansible rendering. Simplify docker image mgmt on Puppet so it doesn't use that awful puppetlabs-docker module
All checks were successful
CI / test (push) Successful in 20m26s
Lint / test (push) Successful in 47s

This commit is contained in:
Miguel Jacq 2026-06-19 16:32:25 +10:00
parent 05b2875c17
commit b8926f9a5f
Signed by: mig5
GPG key ID: 03906B4110AAD3B8
27 changed files with 3369 additions and 3585 deletions

View file

@ -7,8 +7,8 @@
* Support manifesting Puppet code, as well as Ansible!
* Support manifesting Salt code, as well as Ansible and Puppet!
* A lot of under-the-bonnet refactoring to make it easier to extend to cover other config managers (that don't suck) in future.
* Support for detecting Docker images. You will need to install puppetlabs-docker module if you're using the Puppet manifester.
* Add support for detecting flatpaks and snaps (manifests Ansible code only, not Puppet or Salt at this time)
* Support for detecting Docker images.
* Add support for detecting Flatpaks and Snaps (manifests for Ansible code only, not Puppet or Salt at this time)
# 0.6.0

File diff suppressed because it is too large Load diff

View file

@ -1 +0,0 @@
"""Ansible manifest renderer implementation."""

View file

@ -1,56 +0,0 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Optional, Tuple
from ..jinjaturtle import find_jinjaturtle_cmd
@dataclass
class AnsibleManifestContext:
bundle_dir: str
out_dir: str
roles_root: str
fqdn: Optional[str]
site_mode: bool
jt_exe: Optional[str]
jt_enabled: bool
def _resolve_jinjaturtle_mode(jinjaturtle: str) -> Tuple[Optional[str], bool]:
jt_exe = find_jinjaturtle_cmd()
if jinjaturtle not in ("auto", "on", "off"):
raise ValueError("jinjaturtle must be one of: auto, on, off")
if jinjaturtle == "on":
if not jt_exe:
raise RuntimeError("jinjaturtle requested but not found on PATH")
return jt_exe, True
if jinjaturtle == "auto":
return jt_exe, jt_exe is not None
return jt_exe, False
def _prepare_ansible_context(
bundle_dir: str,
out_dir: str,
*,
fqdn: Optional[str],
jinjaturtle: str,
) -> AnsibleManifestContext:
site_mode = fqdn is not None and fqdn != ""
jt_exe, jt_enabled = _resolve_jinjaturtle_mode(jinjaturtle)
os.makedirs(out_dir, exist_ok=True)
roles_root = os.path.join(out_dir, "roles")
os.makedirs(roles_root, exist_ok=True)
return AnsibleManifestContext(
bundle_dir=bundle_dir,
out_dir=out_dir,
roles_root=roles_root,
fqdn=fqdn,
site_mode=site_mode,
jt_exe=jt_exe,
jt_enabled=jt_enabled,
)

View file

@ -1,69 +0,0 @@
from __future__ import annotations
import os
from typing import Any, Dict, List, Optional, Set, Tuple
from ..jinjaturtle import can_jinjify_path, infer_other_formats, run_jinjaturtle
from .yamlutil import _merge_mappings_overwrite, _yaml_dump_mapping, _yaml_load_mapping
def _jinjify_managed_files(
bundle_dir: str,
role: str,
role_dir: str,
managed_files: List[Dict[str, Any]],
*,
jt_exe: Optional[str],
jt_enabled: bool,
overwrite_templates: bool,
) -> Tuple[Set[str], str]:
"""
Return (templated_src_rels, combined_vars_text).
combined_vars_text is a YAML mapping fragment (no leading ---).
"""
templated: Set[str] = set()
vars_map: Dict[str, Any] = {}
if not (jt_enabled and jt_exe):
return templated, ""
for mf in managed_files:
dest_path = mf.get("path", "")
src_rel = mf.get("src_rel", "")
if not dest_path or not src_rel:
continue
if not can_jinjify_path(dest_path):
continue
artifact_path = os.path.join(bundle_dir, "artifacts", role, src_rel)
if not os.path.isfile(artifact_path):
continue
try:
force_fmt = infer_other_formats(dest_path)
res = run_jinjaturtle(
jt_exe, artifact_path, role_name=role, force_format=force_fmt
)
except Exception:
# If jinjaturtle cannot process a file for any reason, skip silently.
# (Enroll's core promise is to be optimistic and non-interactive.)
continue # nosec
tmpl_rel = src_rel + ".j2"
tmpl_dst = os.path.join(role_dir, "templates", tmpl_rel)
if overwrite_templates or not os.path.exists(tmpl_dst):
os.makedirs(os.path.dirname(tmpl_dst), exist_ok=True)
with open(tmpl_dst, "w", encoding="utf-8") as f:
f.write(res.template_text)
templated.add(src_rel)
if res.vars_text.strip():
# merge YAML mappings; last wins (avoids duplicate keys)
chunk = _yaml_load_mapping(res.vars_text)
if chunk:
vars_map = _merge_mappings_overwrite(vars_map, chunk)
if vars_map:
combined = _yaml_dump_mapping(vars_map, sort_keys=True)
return templated, combined
return templated, ""

View file

@ -1,304 +0,0 @@
from __future__ import annotations
import os
import re
import shutil
import stat
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from .context import AnsibleManifestContext
from .yamlutil import _merge_mappings_overwrite, _yaml_dump_mapping, _yaml_load_mapping
def _copy2_replace(src: str, dst: str) -> None:
dst_dir = os.path.dirname(dst)
os.makedirs(dst_dir, exist_ok=True)
# Copy to a temp file in the same directory, then atomically replace.
fd, tmp = tempfile.mkstemp(prefix=".enroll-tmp-", dir=dst_dir)
os.close(fd)
try:
shutil.copy2(src, tmp)
# Ensure the working tree stays mergeable: make the file user-writable.
st = os.stat(tmp, follow_symlinks=False)
mode = stat.S_IMODE(st.st_mode)
if not (mode & stat.S_IWUSR):
os.chmod(tmp, mode | stat.S_IWUSR)
os.replace(tmp, dst)
finally:
try:
os.unlink(tmp)
except FileNotFoundError:
pass
def _copy_artifacts(
bundle_dir: str,
role: str,
dst_files_dir: str,
*,
preserve_existing: bool = False,
exclude_rels: Optional[Set[str]] = None,
) -> None:
"""Copy harvested artifacts for a role into a destination *files* directory.
In non --fqdn mode, this is usually <role_dir>/files.
In --fqdn site mode, this is usually:
inventory/host_vars/<fqdn>/<role>/.files
"""
artifacts_dir = os.path.join(bundle_dir, "artifacts", role)
if not os.path.isdir(artifacts_dir):
return
for root, _, files in os.walk(artifacts_dir):
for fn in files:
src = os.path.join(root, fn)
rel = os.path.relpath(src, artifacts_dir)
dst = os.path.join(dst_files_dir, rel)
# If a file was successfully templatised by JinjaTurtle, do NOT
# also materialise the raw copy in the destination files dir.
if exclude_rels and rel in exclude_rels:
try:
if os.path.isfile(dst):
os.remove(dst)
except Exception:
pass # nosec
continue
if preserve_existing and os.path.exists(dst):
continue
os.makedirs(os.path.dirname(dst), exist_ok=True)
_copy2_replace(src, dst)
def _write_role_scaffold(role_dir: str) -> None:
os.makedirs(os.path.join(role_dir, "tasks"), exist_ok=True)
os.makedirs(os.path.join(role_dir, "handlers"), exist_ok=True)
os.makedirs(os.path.join(role_dir, "defaults"), exist_ok=True)
os.makedirs(os.path.join(role_dir, "meta"), exist_ok=True)
os.makedirs(os.path.join(role_dir, "files"), exist_ok=True)
os.makedirs(os.path.join(role_dir, "templates"), exist_ok=True)
def _role_tag(role: str) -> str:
"""Return a stable Ansible tag name for a role.
Used by `enroll diff --enforce` to run only the roles needed to repair drift.
"""
r = str(role or "").strip()
# Ansible tag charset is fairly permissive, but keep it portable and consistent.
safe = re.sub(r"[^A-Za-z0-9_-]+", "_", r).strip("_")
if not safe:
safe = "other"
return f"role_{safe}"
def _write_playbook_all(path: str, roles: List[str]) -> None:
pb_lines = [
"---",
"- name: Apply all roles on all hosts",
" gather_facts: true",
" hosts: all",
" become: true",
" roles:",
]
for r in roles:
pb_lines.append(f" - role: {r}")
pb_lines.append(f" tags: [{_role_tag(r)}]")
with open(path, "w", encoding="utf-8") as f:
f.write("\n".join(pb_lines) + "\n")
def _write_playbook_host(path: str, fqdn: str, roles: List[str]) -> None:
pb_lines = [
"---",
f"- name: Apply all roles on {fqdn}",
f" hosts: {fqdn}",
" gather_facts: true",
" become: true",
" roles:",
]
for r in roles:
pb_lines.append(f" - role: {r}")
pb_lines.append(f" tags: [{_role_tag(r)}]")
with open(path, "w", encoding="utf-8") as f:
f.write("\n".join(pb_lines) + "\n")
def _ensure_ansible_cfg(cfg_path: str) -> None:
if not os.path.exists(cfg_path):
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[defaults]\n")
f.write("roles_path = roles\n")
f.write("interpreter_python=/usr/bin/python3\n")
f.write("inventory = inventory\n")
f.write("stdout_callback = unixy\n")
f.write("force_color = 1\n")
f.write("vars_plugins_enabled = host_group_vars\n")
f.write("fact_caching = jsonfile\n")
f.write("fact_caching_connection = .enroll_cached_facts\n")
f.write("forks = 30\n")
f.write("remote_tmp = /tmp/ansible-${USER}\n")
f.write("timeout = 12\n")
f.write("[ssh_connection]\n")
f.write("pipelining = True\n")
f.write("scp_if_ssh = True\n")
return
def _ensure_requirements_yaml(
req_path: str,
collections: Optional[List[Dict[str, str]]] = None,
) -> None:
requested = collections or [{"name": "community.general", "version": ">=13.0.0"}]
existing: Dict[str, Any] = {}
if os.path.exists(req_path):
try:
existing = _yaml_load_mapping(Path(req_path).read_text(encoding="utf-8"))
except Exception:
existing = {}
current_items = existing.get("collections")
if not isinstance(current_items, list):
current_items = []
by_name: Dict[str, Dict[str, str]] = {}
ordered_names: List[str] = []
for item in current_items:
if isinstance(item, str):
name = item.strip()
if not name:
continue
entry: Dict[str, str] = {"name": name}
elif isinstance(item, dict):
name = str(item.get("name") or "").strip()
if not name:
continue
entry = {str(k): str(v) for k, v in item.items() if v is not None}
entry["name"] = name
else:
continue
if name not in by_name:
ordered_names.append(name)
by_name[name] = entry
for item in requested:
name = str(item.get("name") or "").strip()
if not name:
continue
entry = dict(item)
entry["name"] = name
if name not in by_name:
ordered_names.append(name)
by_name[name] = entry
else:
by_name[name].update(
{k: v for k, v in entry.items() if v not in (None, "")}
)
out = {"collections": [by_name[name] for name in ordered_names]}
Path(req_path).parent.mkdir(parents=True, exist_ok=True)
Path(req_path).write_text(
"---\n" + _yaml_dump_mapping(out, sort_keys=False), encoding="utf-8"
)
def _ensure_inventory_host(inv_path: str, fqdn: str) -> None:
os.makedirs(os.path.dirname(inv_path), exist_ok=True)
if not os.path.exists(inv_path):
with open(inv_path, "w", encoding="utf-8") as f:
f.write("[all]\n")
f.write(fqdn + "\n")
return
with open(inv_path, "r", encoding="utf-8") as f:
lines = [ln.rstrip("\n") for ln in f.readlines()]
# ensure there is an [all] group; if not, create it at top
if not any(ln.strip() == "[all]" for ln in lines):
lines = ["[all]"] + lines
# check if fqdn already present (exact match, ignoring whitespace)
if any(ln.strip() == fqdn for ln in lines):
return
# append at end
lines.append(fqdn)
with open(inv_path, "w", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
def _hostvars_path(site_root: str, fqdn: str, role: str) -> str:
return os.path.join(site_root, "inventory", "host_vars", fqdn, f"{role}.yml")
def _host_role_files_dir(site_root: str, fqdn: str, role: str) -> str:
"""Host-specific files dir for a given role.
Layout:
inventory/host_vars/<fqdn>/<role>/.files/
"""
return os.path.join(site_root, "inventory", "host_vars", fqdn, role, ".files")
def _write_hostvars(site_root: str, fqdn: str, role: str, data: Dict[str, Any]) -> None:
"""Write host_vars YAML for a role for a specific host.
This is host-specific state and should track the current harvest output.
Existing keys not mentioned in `data` are preserved, but keys in `data`
are overwritten (including list values).
"""
path = _hostvars_path(site_root, fqdn, role)
os.makedirs(os.path.dirname(path), exist_ok=True)
existing_map: Dict[str, Any] = {}
if os.path.exists(path):
try:
existing_text = Path(path).read_text(encoding="utf-8")
existing_map = _yaml_load_mapping(existing_text)
except Exception:
existing_map = {}
merged = _merge_mappings_overwrite(existing_map, data)
out = "---\n" + _yaml_dump_mapping(merged, sort_keys=True)
with open(path, "w", encoding="utf-8") as f:
f.write(out)
def _write_role_defaults(role_dir: str, mapping: Dict[str, Any]) -> None:
"""Overwrite role defaults/main.yml with the provided mapping."""
defaults_path = os.path.join(role_dir, "defaults", "main.yml")
os.makedirs(os.path.dirname(defaults_path), exist_ok=True)
out = "---\n" + _yaml_dump_mapping(mapping, sort_keys=True)
with open(defaults_path, "w", encoding="utf-8") as f:
f.write(out)
def _write_site_scaffold(ctx: AnsibleManifestContext) -> None:
if not ctx.site_mode:
return
os.makedirs(os.path.join(ctx.out_dir, "inventory"), exist_ok=True)
os.makedirs(os.path.join(ctx.out_dir, "inventory", "host_vars"), exist_ok=True)
os.makedirs(os.path.join(ctx.out_dir, "playbooks"), exist_ok=True)
_ensure_inventory_host(
os.path.join(ctx.out_dir, "inventory", "hosts.ini"), ctx.fqdn or ""
)
_ensure_ansible_cfg(os.path.join(ctx.out_dir, "ansible.cfg"))
_ensure_requirements_yaml(os.path.join(ctx.out_dir, "requirements.yml"))
def _write_manifest_playbook(ctx: AnsibleManifestContext, roles: List[str]) -> None:
if ctx.site_mode:
_write_playbook_host(
os.path.join(ctx.out_dir, "playbooks", f"{ctx.fqdn}.yml"),
ctx.fqdn or "",
roles,
)
else:
_write_playbook_all(os.path.join(ctx.out_dir, "playbook.yml"), roles)

View file

@ -1,227 +0,0 @@
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set
from ..cm import CMModule, package_section_label, section_label_for_packages
from ..role_names import avoid_reserved_role_name
@dataclass
class AnsibleRoleCollection:
services: List[Dict[str, Any]]
packages: List[Dict[str, Any]]
common_role_groups: Dict[str, List[Dict[str, Any]]]
class AnsibleRole(CMModule):
"""Ansible-specific view of a renderer-neutral CMModule."""
def __init__(
self,
role_name: str,
*,
var_prefix: Optional[str] = None,
section_label: Optional[str] = None,
grouped: bool = False,
) -> None:
super().__init__(role_name=role_name, module_name=role_name)
self.var_prefix = var_prefix or role_name
self.section_label = section_label
self.grouped = grouped
self.entries: List[Dict[str, Any]] = []
self.excluded: List[Dict[str, Any]] = []
self.origin_lines: List[str] = []
def add_package_snapshot(self, snap: Dict[str, Any]) -> None:
pkg = str(snap.get("package") or "").strip()
source_role = str(snap.get("role_name") or pkg or self.role_name)
self.entries.append({"kind": "package", "snapshot": snap})
if pkg:
self.packages.add(pkg)
self.origin_lines.append(f"package `{pkg}` from role `{source_role}`")
self.add_managed_content(snap)
def add_service_snapshot(self, snap: Dict[str, Any]) -> None:
unit = str(snap.get("unit") or "").strip()
source_role = str(snap.get("role_name") or unit or self.role_name)
self.entries.append({"kind": "service", "snapshot": snap})
for pkg in snap.get("packages", []) or []:
pkg_s = str(pkg or "").strip()
if pkg_s:
self.packages.add(pkg_s)
if unit:
unit_file_state = str(snap.get("unit_file_state") or "")
self.services.setdefault(
unit,
{
"name": unit,
"manage": True,
"enabled": unit_file_state in ("enabled", "enabled-runtime"),
"state": (
"started" if snap.get("active_state") == "active" else "stopped"
),
},
)
self.origin_lines.append(f"service `{unit}` from role `{source_role}`")
self.add_managed_content(snap)
def add_managed_content(self, snap: Dict[str, Any]) -> None:
for d in self.managed_dirs_from_snapshot(snap):
path = str(d.get("path") or "").strip()
self.add_managed_dir(
path,
dest=path,
owner=d.get("owner") or "root",
group=d.get("group") or "root",
mode=d.get("mode") or "0755",
)
for mf in self.managed_files_from_snapshot(snap):
path = str(mf.get("path") or "").strip()
src_rel = str(mf.get("src_rel") or "").strip()
if not path or not src_rel:
continue
self.add_managed_file(
path,
dest=path,
src_rel=src_rel,
owner=mf.get("owner") or "root",
group=mf.get("group") or "root",
mode=mf.get("mode") or "0644",
reason=mf.get("reason") or "managed_file",
)
for ml in self.managed_links_from_snapshot(snap):
path = str(ml.get("path") or "").strip()
target = str(ml.get("target") or "").strip()
if not path or not target:
continue
self.add_managed_link(path, dest=path, src=target)
self.excluded.extend(snap.get("excluded", []) or [])
self.add_snapshot_notes(snap)
@property
def sorted_packages(self) -> List[str]:
return sorted(self.packages)
@property
def systemd_units_var(self) -> List[Dict[str, Any]]:
return [self.services[k] for k in sorted(self.services)]
class AnsibleManifestPlan:
"""Track generated Ansible roles without scattering category lists."""
_ORDER = (
"apt_config",
"dnf_config",
"package",
"service",
"etc_custom",
"usr_local_custom",
"extra_paths",
"flatpak",
"snap",
"container_images",
"users",
"tail_package",
"sysctl",
"firewall_runtime",
)
def __init__(self) -> None:
self._roles: Dict[str, List[str]] = {category: [] for category in self._ORDER}
self._tail_packages: List[str] = []
def add(self, category: str, role: str) -> None:
if category not in self._roles:
raise ValueError(f"unknown Ansible role category: {category}")
if role and role not in self._roles[category]:
self._roles[category].append(role)
def roles(self, category: str) -> List[str]:
return list(self._roles.get(category, []))
def has(self, category: str, role: str) -> bool:
return role in self._roles.get(category, [])
def mark_tail_package(self, role: str) -> None:
if self.has("package", role) and role not in self._tail_packages:
self._tail_packages.append(role)
def ordered_roles(self) -> List[str]:
tail = set(self._tail_packages)
package_roles = [r for r in self._roles["package"] if r not in tail]
out: List[str] = []
for category in self._ORDER:
if category == "package":
out.extend(package_roles)
elif category == "tail_package":
out.extend(self._tail_packages)
else:
out.extend(self._roles[category])
return out
def _role_id(raw: str) -> str:
"""Return an Ansible-safe role identifier from an arbitrary label."""
s = re.sub(r"[^A-Za-z0-9]+", "_", raw or "misc")
s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s)
s = s.lower()
s = re.sub(r"_+", "_", s).strip("_")
if not s:
s = "misc"
if not re.match(r"^[a-z_]", s):
s = "r_" + s
return s
def _section_role_name(label: str, occupied_roles: Set[str]) -> str:
"""Create a stable section role name, avoiding generated-role collisions."""
base = avoid_reserved_role_name(_role_id(label), prefix="section")
role = base if base not in occupied_roles else f"section_{base}"
n = 2
while role in occupied_roles:
role = f"section_{base}_{n}"
n += 1
occupied_roles.add(role)
return role
def _collect_ansible_roles(
roles: Dict[str, Any],
inventory_packages: Dict[str, Any],
*,
use_common_roles: bool,
) -> AnsibleRoleCollection:
services = roles.get("services", []) or []
packages = roles.get("packages", []) or []
common_role_groups: Dict[str, List[Dict[str, Any]]] = {}
if use_common_roles:
for svc in services:
label = section_label_for_packages(
svc.get("packages", []) or [], inventory_packages
)
common_role_groups.setdefault(label, []).append(
{"kind": "service", "snapshot": svc}
)
for pr in packages:
label = package_section_label(pr, inventory_packages)
common_role_groups.setdefault(label, []).append(
{"kind": "package", "snapshot": pr}
)
return AnsibleRoleCollection(
services=[], packages=[], common_role_groups=common_role_groups
)
return AnsibleRoleCollection(
services=services,
packages=packages,
common_role_groups=common_role_groups,
)

View file

@ -1,226 +0,0 @@
from __future__ import annotations
import os
import re
from typing import Any, Callable, Dict, List, Set
def _markdown_list(items: List[str]) -> str:
values = [str(item) for item in items if str(item)]
return "\n".join(f"- {item}" for item in values) or "- (none)"
def _managed_file_lines(
managed_files: List[Dict[str, Any]], *, include_reason: bool
) -> List[str]:
out: List[str] = []
for mf in managed_files:
path = str(mf.get("path") or "")
if not path:
continue
if include_reason:
out.append(f"{path} ({mf.get('reason')})")
else:
out.append(path)
return out
def _excluded_lines(excluded: List[Dict[str, Any]]) -> List[str]:
return [f"{e.get('path')} ({e.get('reason')})" for e in excluded if e.get("path")]
def _read_artifact_lines(bundle_dir: str, role: str, src_rel: str) -> List[str]:
art_path = os.path.join(bundle_dir, "artifacts", role, src_rel)
try:
with open(art_path, "r", encoding="utf-8", errors="replace") as f:
return [line.rstrip("\n") for line in f]
except OSError:
return []
def _apt_config_readme(
*,
bundle_dir: str,
role: str,
snapshot: Dict[str, Any],
managed_files: List[Dict[str, Any]],
managed_dirs: List[Dict[str, Any]],
excluded: List[Dict[str, Any]],
notes: List[Any],
) -> str:
source_paths: List[str] = []
keyring_paths: List[str] = []
repo_hosts: Set[str] = set()
url_re = re.compile(r"(?:https?|ftp)://([^/\s]+)", re.IGNORECASE)
for mf in managed_files:
path = str(mf.get("path") or "")
src_rel = str(mf.get("src_rel") or "")
if not path or not src_rel:
continue
if path == "/etc/apt/sources.list" or path.startswith(
"/etc/apt/sources.list.d/"
):
source_paths.append(path)
for line in _read_artifact_lines(bundle_dir, role, src_rel):
s = line.strip()
if not s or s.startswith("#"):
continue
for match in url_re.finditer(s):
repo_hosts.add(match.group(1))
if (
path.startswith("/etc/apt/trusted.gpg")
or path.startswith("/etc/apt/keyrings/")
or path.startswith("/usr/share/keyrings/")
):
keyring_paths.append(path)
return f"""# apt_config
APT configuration harvested from the system (sources, pinning, and keyrings).
## Repository hosts
{_markdown_list(sorted(repo_hosts))}
## Source files
{_markdown_list(sorted(set(source_paths)))}
## Keyrings
{_markdown_list(sorted(set(keyring_paths)))}
## Managed files
{_markdown_list(_managed_file_lines(managed_files, include_reason=True))}
## Excluded
{_markdown_list(_excluded_lines(excluded))}
## Notes
{_markdown_list([str(n) for n in notes])}
"""
def _dnf_config_readme(
*,
bundle_dir: str,
role: str,
snapshot: Dict[str, Any],
managed_files: List[Dict[str, Any]],
managed_dirs: List[Dict[str, Any]],
excluded: List[Dict[str, Any]],
notes: List[Any],
) -> str:
repo_paths: List[str] = []
key_paths: List[str] = []
repo_hosts: Set[str] = set()
url_re = re.compile(r"(?:https?|ftp)://([^/\s]+)", re.IGNORECASE)
file_url_re = re.compile(r"file://(/[^\s]+)")
for mf in managed_files:
path = str(mf.get("path") or "")
src_rel = str(mf.get("src_rel") or "")
if not path or not src_rel:
continue
if path.startswith("/etc/yum.repos.d/") and path.endswith(".repo"):
repo_paths.append(path)
for line in _read_artifact_lines(bundle_dir, role, src_rel):
s = line.strip()
if not s or s.startswith("#") or s.startswith(";"):
continue
for match in url_re.finditer(s):
repo_hosts.add(match.group(1))
for match in file_url_re.finditer(s):
key_paths.append(match.group(1))
if path.startswith("/etc/pki/rpm-gpg/"):
key_paths.append(path)
return f"""# dnf_config
DNF/YUM configuration harvested from the system (repos, config files, and RPM GPG keys).
## Repository hosts
{_markdown_list(sorted(repo_hosts))}
## Repo files
{_markdown_list(sorted(set(repo_paths)))}
## GPG keys
{_markdown_list(sorted(set(key_paths)))}
## Managed files
{_markdown_list(_managed_file_lines(managed_files, include_reason=True))}
## Excluded
{_markdown_list(_excluded_lines(excluded))}
## Notes
{_markdown_list([str(n) for n in notes])}
"""
def _simple_managed_files_readme(
title: str,
description: str,
*,
include_reason: bool,
) -> Callable[..., str]:
def _builder(
*,
bundle_dir: str,
role: str,
snapshot: Dict[str, Any],
managed_files: List[Dict[str, Any]],
managed_dirs: List[Dict[str, Any]],
excluded: List[Dict[str, Any]],
notes: List[Any],
) -> str:
return f"""# {title}
{description}
## Managed files
{_markdown_list(_managed_file_lines(managed_files, include_reason=include_reason))}
## Excluded
{_markdown_list(_excluded_lines(excluded))}
## Notes
{_markdown_list([str(n) for n in notes])}
"""
return _builder
def _extra_paths_readme(
*,
bundle_dir: str,
role: str,
snapshot: Dict[str, Any],
managed_files: List[Dict[str, Any]],
managed_dirs: List[Dict[str, Any]],
excluded: List[Dict[str, Any]],
notes: List[Any],
) -> str:
include_pats = snapshot.get("include_patterns", []) or []
exclude_pats = snapshot.get("exclude_patterns", []) or []
return f"""# {role}
User-requested extra file harvesting.
## Include patterns
{_markdown_list([str(p) for p in include_pats])}
## Exclude patterns
{_markdown_list([str(p) for p in exclude_pats])}
## Managed directories
{_markdown_list([str(d.get('path') or '') for d in managed_dirs])}
## Managed files
{_markdown_list(_managed_file_lines(managed_files, include_reason=False))}
## Excluded
{_markdown_list(_excluded_lines(excluded))}
## Notes
{_markdown_list([str(n) for n in notes])}
"""

View file

@ -1 +0,0 @@
"""Role writers for the Ansible renderer."""

View file

@ -1,192 +0,0 @@
from __future__ import annotations
import os
from typing import Any, Dict, List
from ..context import AnsibleManifestContext
from ..layout import (
_ensure_requirements_yaml,
_write_hostvars,
_write_role_defaults,
_write_role_scaffold,
)
from ..model import AnsibleManifestPlan
from ..vars import _normalise_container_image_item
_CONTAINER_COLLECTIONS = [
{"name": "community.docker", "version": ">=4.0.0"},
{"name": "containers.podman", "version": ">=1.0.0"},
]
def _render_container_images_role(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
container_images_snapshot: Dict[str, Any],
) -> None:
raw_images = container_images_snapshot.get("images", []) or []
if not container_images_snapshot and not raw_images:
return
images = [_normalise_container_image_item(img) for img in raw_images]
if not images and not (container_images_snapshot.get("notes") or []):
return
role = container_images_snapshot.get("role_name", "container_images")
role_dir = os.path.join(ctx.roles_root, role)
_write_role_scaffold(role_dir)
_ensure_requirements_yaml(
os.path.join(ctx.out_dir, "requirements.yml"), _CONTAINER_COLLECTIONS
)
vars_map = {"container_images": images}
if ctx.site_mode:
_write_role_defaults(role_dir, {"container_images": []})
_write_hostvars(ctx.out_dir, ctx.fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f:
f.write(
"---\n"
"dependencies: []\n"
"collections:\n"
" - community.docker\n"
" - containers.podman\n"
)
tasks = """---
- name: Pull Docker images by immutable registry digest
community.docker.docker_image_pull:
name: "{{ item.pull_ref }}"
pull: not_present
platform: "{{ item.platform | default(omit, true) }}"
loop: "{{ container_images | default([]) | selectattr('engine', 'equalto', 'docker') | selectattr('pull_ref', 'defined') | list }}"
when:
- item.pull_ref | default('') | length > 0
become: true
- name: Tag Docker images with harvested tag aliases
community.docker.docker_image_tag:
name: "{{ item.0.pull_ref }}"
repository:
- "{{ item.1.ref }}"
loop: "{{ query('subelements', container_images | default([]) | selectattr('engine', 'equalto', 'docker') | selectattr('pull_ref', 'defined') | list, 'tag_aliases', {'skip_missing': True}) }}"
when:
- item.0.pull_ref | default('') | length > 0
- item.1.repository | default('') | length > 0
- item.1.tag | default('') | length > 0
become: true
- name: Pull system Podman images by immutable registry digest
containers.podman.podman_image:
name: "{{ item.pull_ref }}"
state: present
force: false
platform: "{{ item.platform | default(omit, true) }}"
loop: "{{ container_images | default([]) | selectattr('engine', 'equalto', 'podman') | rejectattr('scope', 'equalto', 'user') | selectattr('pull_ref', 'defined') | list }}"
when:
- item.pull_ref | default('') | length > 0
become: true
- name: Tag system Podman images with harvested tag aliases
containers.podman.podman_tag:
image: "{{ item.0.pull_ref }}"
target_names:
- "{{ item.1.ref }}"
loop: "{{ query('subelements', container_images | default([]) | selectattr('engine', 'equalto', 'podman') | rejectattr('scope', 'equalto', 'user') | selectattr('pull_ref', 'defined') | list, 'tag_aliases', {'skip_missing': True}) }}"
when:
- item.0.pull_ref | default('') | length > 0
- item.1.ref | default('') | length > 0
become: true
- name: Pull user Podman images by immutable registry digest
containers.podman.podman_image:
name: "{{ item.pull_ref }}"
state: present
force: false
platform: "{{ item.platform | default(omit, true) }}"
loop: "{{ container_images | default([]) | selectattr('engine', 'equalto', 'podman') | selectattr('scope', 'equalto', 'user') | selectattr('pull_ref', 'defined') | list }}"
when:
- item.pull_ref | default('') | length > 0
- item.user | default('') | length > 0
become: true
become_user: "{{ item.user }}"
- name: Tag user Podman images with harvested tag aliases
containers.podman.podman_tag:
image: "{{ item.0.pull_ref }}"
target_names:
- "{{ item.1.ref }}"
loop: "{{ query('subelements', container_images | default([]) | selectattr('engine', 'equalto', 'podman') | selectattr('scope', 'equalto', 'user') | selectattr('pull_ref', 'defined') | list, 'tag_aliases', {'skip_missing': True}) }}"
when:
- item.0.pull_ref | default('') | length > 0
- item.0.user | default('') | length > 0
- item.1.ref | default('') | length > 0
become: true
become_user: "{{ item.0.user }}"
"""
with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f:
f.write(tasks)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
def _fmt_image(img: Dict[str, Any]) -> str:
pull_ref = (
img.get("pull_ref") or "(no registry digest; not rendered as an exact pull)"
)
tags = img.get("repo_tags") or []
tag_part = f" tags={', '.join(tags)}" if tags else ""
platform = img.get("platform")
platform_part = f" platform={platform}" if platform else ""
return f"- {img.get('engine', 'unknown')}: {pull_ref}{tag_part}{platform_part}"
notes = list(container_images_snapshot.get("notes", []) or [])
unpinned_notes: List[str] = []
for img in images:
if img.get("pull_ref"):
continue
label = (
", ".join(img.get("repo_tags") or [])
or img.get("image_id")
or "unknown image"
)
unpinned_notes.append(
f"{label}: no RepoDigest was available, so no exact pull task is emitted."
)
readme = (
"""# container_images
Generated Docker and Podman image-cache restoration role.
Images are pulled by immutable registry digest, such as
`registry.example.net/app@sha256:...`, when the harvest found a usable
`RepoDigest`. Local image IDs are recorded in `state.json` for evidence but are
not registry pull references.
**Note:** This role requires the `community.docker` and `containers.podman`
Ansible collections. Install them with:
`ansible-galaxy collection install -r requirements.yml`.
Registry credentials are not harvested. Private-registry authentication must be
managed separately before this role runs.
## Container images
"""
+ "\n".join(_fmt_image(img) for img in images)
+ """
## Notes
"""
+ ("\n".join([f"- {n}" for n in notes + unpinned_notes]) or "- (none)")
+ "\n"
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifest_plan.add("container_images", role)

View file

@ -1,308 +0,0 @@
from __future__ import annotations
import os
from typing import Any, Dict, List
from ..context import AnsibleManifestContext
from ..layout import (
_ensure_requirements_yaml,
_write_hostvars,
_write_role_defaults,
_write_role_scaffold,
)
from ..model import AnsibleManifestPlan
from ..vars import (
_normalise_flatpak_item,
_normalise_flatpak_remote,
_normalise_snap_item,
)
def _render_flatpak_role(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
flatpak_snapshot: Dict[str, Any],
) -> None:
out_dir = ctx.out_dir
roles_root = ctx.roles_root
fqdn = ctx.fqdn
site_mode = ctx.site_mode
# -------------------------
# Flatpak role (system-wide Flatpak remotes and applications)
# -------------------------
raw_flatpak_apps = flatpak_snapshot.get("system_flatpaks", []) or []
raw_flatpak_remotes = flatpak_snapshot.get("remotes", []) or []
if flatpak_snapshot:
role = flatpak_snapshot.get("role_name", "flatpak")
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
flatpak_system_flatpaks = [
_normalise_flatpak_item(fp, method="system") for fp in raw_flatpak_apps
]
flatpak_remotes = [_normalise_flatpak_remote(r) for r in raw_flatpak_remotes]
vars_map = {
"flatpak_system_flatpaks": flatpak_system_flatpaks,
"flatpak_remotes": flatpak_remotes,
}
if site_mode:
_write_role_defaults(
role_dir,
{"flatpak_system_flatpaks": [], "flatpak_remotes": []},
)
_write_hostvars(out_dir, fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(
"---\n" "dependencies: []\n" "collections:\n" " - community.general\n"
)
tasks = """---
- name: Ensure system Flatpak remotes exist
ansible.builtin.command:
argv:
- flatpak
- remote-add
- --system
- --if-not-exists
- "{{ item.name }}"
- "{{ item.url }}"
loop: "{{ flatpak_remotes | default([]) }}"
when:
- item.name is defined
- item.url is defined
- item.url | length > 0
become: true
changed_when: false
- name: Install system-wide Flatpaks
community.general.flatpak:
name:
- "{{ item.name }}"
state: present
method: system
remote: "{{ item.remote | default(omit) }}"
from_url: "{{ item.from_url | default(omit) }}"
loop: "{{ flatpak_system_flatpaks | default([]) }}"
when:
- item.name is defined
- item.name | length > 0
become: true
"""
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
def _fmt_flatpak_apps(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
if not name:
continue
detail_parts = []
for key in ("remote", "branch", "arch"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {name}{details}")
return "\n".join(lines) or "- (none)"
def _fmt_flatpak_remotes(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
url = item.get("url")
if not name or not url:
continue
lines.append(f"- {name}: {url}")
return "\n".join(lines) or "- (none)"
notes = flatpak_snapshot.get("notes", []) or []
readme = (
"""# flatpak
Generated system-wide Flatpak remotes and applications.
**Note:** This role requires the `community.general` Ansible collection.
Install it with: `ansible-galaxy collection install -r requirements.yml`.
Flatpak `remote` is harvested from the installed deployment where detectable.
The original `.flatpakref` URL is generally not preserved by Flatpak after
installation, so `from_url` is only emitted if a future/hand-edited state file
contains it.
## System Flatpak remotes
"""
+ _fmt_flatpak_remotes(flatpak_remotes)
+ """\n
## System-wide Flatpaks
"""
+ _fmt_flatpak_apps(flatpak_system_flatpaks)
+ """\n
## Notes
"""
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ """\n"""
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifest_plan.add("flatpak", role)
def _render_snap_role(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
snap_snapshot: Dict[str, Any],
) -> None:
out_dir = ctx.out_dir
roles_root = ctx.roles_root
fqdn = ctx.fqdn
site_mode = ctx.site_mode
# -------------------------
# Snap role (system-wide snap packages)
# -------------------------
raw_system_snaps = snap_snapshot.get("system_snaps", []) or []
if raw_system_snaps:
role = snap_snapshot.get("role_name", "snap") if snap_snapshot else "snap"
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
snap_system_snaps = [_normalise_snap_item(s) for s in raw_system_snaps]
vars_map = {"snap_system_snaps": snap_system_snaps}
if site_mode:
_write_role_defaults(role_dir, {"snap_system_snaps": []})
_write_hostvars(out_dir, fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(
"---\n" "dependencies: []\n" "collections:\n" " - community.general\n"
)
tasks = """---
- name: Install system-wide snaps with full detected attributes
community.general.snap:
name:
- "{{ item.name }}"
state: present
channel: "{{ item.channel | default(omit) if not (item.install_revision | default(false)) else omit }}"
revision: "{{ item.revision | default(omit) if (item.install_revision | default(false)) else omit }}"
classic: "{{ item.classic | default(false) }}"
devmode: "{{ item.devmode | default(false) }}"
dangerous: "{{ item.dangerous | default(false) }}"
loop: "{{ snap_system_snaps | default([]) }}"
when:
- item.name is defined
- item.name | length > 0
become: true
register: _enroll_snap_full_results
ignore_errors: true
- name: Install system-wide snaps with compatibility options
community.general.snap:
name:
- "{{ item.item.name }}"
state: present
channel: "{{ item.item.channel | default(omit) if not (item.item.install_revision | default(false)) else omit }}"
classic: "{{ item.item.classic | default(false) }}"
loop: "{{ (_enroll_snap_full_results | default({})).results | default([]) }}"
when:
- item.failed | default(false)
- item.item.name is defined
- item.item.name | length > 0
become: true
register: _enroll_snap_compat_results
ignore_errors: true
- name: Install system-wide snaps with minimal options
community.general.snap:
name:
- "{{ item.item.item.name }}"
state: present
loop: "{{ (_enroll_snap_compat_results | default({})).results | default([]) }}"
when:
- item.failed | default(false)
- item.item.item.name is defined
- item.item.item.name | length > 0
become: true
ignore_errors: true
"""
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
def _fmt_snap_apps(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
if not name:
continue
detail_parts = []
for key in ("channel", "revision"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
for key in ("classic", "devmode", "dangerous"):
if item.get(key):
detail_parts.append(key)
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {name}{details}")
return "\n".join(lines) or "- (none)"
notes = snap_snapshot.get("notes", []) or []
readme = (
"""# snap
Generated system-wide snap packages.
**Note:** This role requires the `community.general` Ansible collection.
Install it with: `ansible-galaxy collection install -r requirements.yml`.
The first install task uses all harvested attributes. If the installed
`community.general.snap` module is too old for some parameters, the generated
role falls back to reduced then minimal install tasks on a best-effort basis.
## System-wide snaps
"""
+ _fmt_snap_apps(snap_system_snaps)
+ """\n
## Notes
"""
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ """\n"""
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifest_plan.add("snap", role)

View file

@ -1,257 +0,0 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Tuple
from ..context import AnsibleManifestContext
from ..jinjaturtle import _jinjify_managed_files
from ..layout import (
_copy_artifacts,
_host_role_files_dir,
_write_hostvars,
_write_role_defaults,
_write_role_scaffold,
)
from ..model import AnsibleManifestPlan
from ..readme import (
_apt_config_readme,
_dnf_config_readme,
_extra_paths_readme,
_simple_managed_files_readme,
)
from ..tasks import _render_generic_files_tasks
from ..vars import _build_managed_dirs_var, _build_managed_files_var
from ..yamlutil import _merge_mappings_overwrite, _yaml_load_mapping
@dataclass(frozen=True)
class AnsibleManagedFileRoleSpec:
"""Declarative managed-file singleton role rendering spec.
Puppet and Salt collect these singleton snapshots in a simple loop and feed
each one through the same managed-content renderer. Ansible has more
layout concerns (defaults vs host_vars, optional JinjaTurtle templates,
handlers), but the resource intent is the same, so keep the per-role
differences in data rather than spelling out one branch per role.
"""
key: str
default_role: str
category: str
readme_builder: Callable[..., str]
notify_systemd: Optional[str] = None
handlers: str = "---\n"
include_dirs_when_empty: bool = False
_SYSTEMD_DAEMON_RELOAD_HANDLER = """---
- name: Run systemd daemon-reload
ansible.builtin.systemd:
daemon_reload: true
no_log: "{{ enroll_hide_systemd_status | default(true) | bool }}"
"""
MANAGED_FILE_ROLE_SPECS: Tuple[AnsibleManagedFileRoleSpec, ...] = (
AnsibleManagedFileRoleSpec(
key="apt_config",
default_role="apt_config",
category="apt_config",
readme_builder=_apt_config_readme,
),
AnsibleManagedFileRoleSpec(
key="dnf_config",
default_role="dnf_config",
category="dnf_config",
readme_builder=_dnf_config_readme,
),
AnsibleManagedFileRoleSpec(
key="etc_custom",
default_role="etc_custom",
category="etc_custom",
notify_systemd="Run systemd daemon-reload",
handlers=_SYSTEMD_DAEMON_RELOAD_HANDLER,
readme_builder=_simple_managed_files_readme(
"etc_custom",
"Unowned /etc config files not attributed to packages or services.",
include_reason=False,
),
),
AnsibleManagedFileRoleSpec(
key="usr_local_custom",
default_role="usr_local_custom",
category="usr_local_custom",
readme_builder=_simple_managed_files_readme(
"usr_local_custom",
"Unowned /usr/local files (scripts in /usr/local/bin and config under /usr/local/etc).",
include_reason=False,
),
),
AnsibleManagedFileRoleSpec(
key="extra_paths",
default_role="extra_paths",
category="extra_paths",
readme_builder=_extra_paths_readme,
include_dirs_when_empty=True,
),
)
def _managed_file_role_has_resources(
snapshot: Dict[str, Any], spec: AnsibleManagedFileRoleSpec
) -> bool:
if not snapshot:
return False
if snapshot.get("managed_files"):
return True
return bool(spec.include_dirs_when_empty and snapshot.get("managed_dirs"))
def _write_managed_files_role_from_spec(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
snapshot: Dict[str, Any],
spec: AnsibleManagedFileRoleSpec,
) -> None:
role = _write_managed_files_role(
snapshot=snapshot,
default_role=spec.default_role,
bundle_dir=ctx.bundle_dir,
roles_root=ctx.roles_root,
out_dir=ctx.out_dir,
fqdn=ctx.fqdn,
site_mode=ctx.site_mode,
jt_exe=ctx.jt_exe,
jt_enabled=ctx.jt_enabled,
notify_systemd=spec.notify_systemd,
handlers=spec.handlers,
readme_builder=spec.readme_builder,
)
manifest_plan.add(spec.category, role)
def _write_managed_files_role(
*,
snapshot: Dict[str, Any],
default_role: str,
bundle_dir: str,
roles_root: str,
out_dir: str,
fqdn: Optional[str],
site_mode: bool,
jt_exe: Optional[str],
jt_enabled: bool,
notify_systemd: Optional[str],
handlers: str,
readme_builder: Callable[..., str],
) -> str:
"""Render an Ansible role whose main purpose is managed files/dirs.
This covers apt_config, dnf_config, etc_custom, usr_local_custom, and
extra_paths. Their harvested state shape is the same; only their README
and optional handler differ.
"""
role = snapshot.get("role_name", default_role)
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
var_prefix = role
managed_files = snapshot.get("managed_files", []) or []
managed_dirs = snapshot.get("managed_dirs", []) or []
excluded = snapshot.get("excluded", []) or []
notes = snapshot.get("notes", []) or []
templated, jt_vars = _jinjify_managed_files(
bundle_dir,
role,
role_dir,
managed_files,
jt_exe=jt_exe,
jt_enabled=jt_enabled,
overwrite_templates=not site_mode,
)
if site_mode:
_copy_artifacts(
bundle_dir,
role,
_host_role_files_dir(out_dir, fqdn or "", role),
exclude_rels=templated,
)
else:
_copy_artifacts(
bundle_dir,
role,
os.path.join(role_dir, "files"),
exclude_rels=templated,
)
files_var = _build_managed_files_var(
managed_files,
templated,
notify_other=None,
notify_systemd=notify_systemd,
)
dirs_var = _build_managed_dirs_var(managed_dirs)
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
vars_map: Dict[str, Any] = {
f"{var_prefix}_managed_files": files_var,
f"{var_prefix}_managed_dirs": dirs_var,
}
vars_map = _merge_mappings_overwrite(vars_map, jt_map)
if site_mode:
_write_role_defaults(
role_dir,
{f"{var_prefix}_managed_files": [], f"{var_prefix}_managed_dirs": []},
)
_write_hostvars(out_dir, fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
tasks = "---\n" + _render_generic_files_tasks(
var_prefix, include_restart_notify=False
)
with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f:
f.write(tasks.rstrip() + "\n")
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(handlers.rstrip() + "\n")
with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f:
f.write("---\ndependencies: []\n")
readme = readme_builder(
bundle_dir=bundle_dir,
role=role,
snapshot=snapshot,
managed_files=managed_files,
managed_dirs=managed_dirs,
excluded=excluded,
notes=notes,
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
return role
def _render_managed_file_roles(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
roles: Dict[str, Any],
) -> None:
"""Render file-centric singleton roles in the same loop style as Puppet/Salt."""
for spec in MANAGED_FILE_ROLE_SPECS:
snapshot = roles.get(spec.key, {})
if not isinstance(snapshot, dict):
continue
if not _managed_file_role_has_resources(snapshot, spec):
continue
_write_managed_files_role_from_spec(ctx, manifest_plan, snapshot, spec)

View file

@ -1,601 +0,0 @@
from __future__ import annotations
import os
from typing import Any, Dict, List, Set
from ..context import AnsibleManifestContext
from ..jinjaturtle import _jinjify_managed_files
from ..layout import (
_copy_artifacts,
_host_role_files_dir,
_write_hostvars,
_write_role_defaults,
_write_role_scaffold,
)
from ..model import AnsibleManifestPlan, AnsibleRole, _section_role_name
from ..tasks import (
_render_generic_files_tasks,
_render_grouped_systemd_tasks,
_render_install_packages_tasks,
)
from ..vars import (
_build_managed_dirs_var,
_build_managed_files_var,
_build_managed_links_var,
)
from ..yamlutil import _merge_mappings_overwrite, _yaml_load_mapping
from ...role_names import avoid_reserved_role_name
def _render_service_roles(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
services_to_manifest: List[Dict[str, Any]],
) -> None:
bundle_dir = ctx.bundle_dir
out_dir = ctx.out_dir
roles_root = ctx.roles_root
fqdn = ctx.fqdn
site_mode = ctx.site_mode
jt_exe = ctx.jt_exe
jt_enabled = ctx.jt_enabled
# -------------------------
# Service roles
# -------------------------
for svc in services_to_manifest:
source_role = svc["role_name"]
role = avoid_reserved_role_name(source_role, prefix="service")
unit = svc["unit"]
pkgs = svc.get("packages", []) or []
managed_files = svc.get("managed_files", []) or []
managed_dirs = svc.get("managed_dirs", []) or []
managed_links = svc.get("managed_links", []) or []
ansible_role = AnsibleRole(role)
ansible_role.add_service_snapshot(svc)
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
var_prefix = role
unit_state = ansible_role.services.get(unit, {})
enabled_at_harvest = bool(unit_state.get("enabled"))
desired_state = str(unit_state.get("state") or "stopped")
templated, jt_vars = _jinjify_managed_files(
bundle_dir,
source_role,
role_dir,
managed_files,
jt_exe=jt_exe,
jt_enabled=jt_enabled,
overwrite_templates=not site_mode,
)
# Copy only the non-templated artifacts.
if site_mode:
_copy_artifacts(
bundle_dir,
source_role,
_host_role_files_dir(out_dir, fqdn or "", role),
exclude_rels=templated,
)
else:
_copy_artifacts(
bundle_dir,
source_role,
os.path.join(role_dir, "files"),
exclude_rels=templated,
)
files_var = _build_managed_files_var(
managed_files,
templated,
notify_other="Restart service",
notify_systemd="Run systemd daemon-reload",
)
links_var = _build_managed_links_var(managed_links)
dirs_var = _build_managed_dirs_var(managed_dirs)
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
base_vars: Dict[str, Any] = {
f"{var_prefix}_unit_name": unit,
f"{var_prefix}_packages": pkgs,
f"{var_prefix}_managed_files": files_var,
f"{var_prefix}_managed_dirs": dirs_var,
f"{var_prefix}_managed_links": links_var,
f"{var_prefix}_manage_unit": True,
f"{var_prefix}_systemd_enabled": bool(enabled_at_harvest),
f"{var_prefix}_systemd_state": desired_state,
}
base_vars = _merge_mappings_overwrite(base_vars, jt_map)
if site_mode:
# Role defaults are host-agnostic/safe; all harvested state is in host_vars.
_write_role_defaults(
role_dir,
{
f"{var_prefix}_unit_name": unit,
f"{var_prefix}_packages": [],
f"{var_prefix}_managed_files": [],
f"{var_prefix}_managed_dirs": [],
f"{var_prefix}_managed_links": [],
f"{var_prefix}_manage_unit": False,
f"{var_prefix}_systemd_enabled": False,
f"{var_prefix}_systemd_state": "stopped",
},
)
_write_hostvars(out_dir, fqdn or "", role, base_vars)
else:
_write_role_defaults(role_dir, base_vars)
handlers = f"""---
- name: Run systemd daemon-reload
ansible.builtin.systemd:
daemon_reload: true
no_log: "{{ enroll_hide_systemd_status | default(true) | bool }}"
- name: Restart service
ansible.builtin.service:
name: "{{{{ {var_prefix}_unit_name }}}}"
state: restarted
when:
- {var_prefix}_manage_unit | default(false)
- ({var_prefix}_systemd_state | default('stopped')) == 'started'
"""
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(handlers)
task_parts: List[str] = []
task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix))
task_parts.append(
_render_generic_files_tasks(var_prefix, include_restart_notify=True)
)
task_parts.append(
f"""- name: Probe whether systemd unit exists and is manageable
ansible.builtin.systemd:
name: "{{{{ {var_prefix}_unit_name }}}}"
no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}"
check_mode: true
register: _unit_probe
failed_when: false
changed_when: false
when: {var_prefix}_manage_unit | default(false)
- name: Ensure unit enablement matches harvest
ansible.builtin.systemd:
name: "{{{{ {var_prefix}_unit_name }}}}"
enabled: "{{{{ {var_prefix}_systemd_enabled | bool }}}}"
no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}"
when:
- {var_prefix}_manage_unit | default(false)
- _unit_probe is succeeded
- name: Ensure unit running state matches harvest
ansible.builtin.systemd:
name: "{{{{ {var_prefix}_unit_name }}}}"
state: "{{{{ {var_prefix}_systemd_state }}}}"
no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}"
when:
- {var_prefix}_manage_unit | default(false)
- _unit_probe is succeeded
"""
)
tasks = "\n".join(task_parts).rstrip() + "\n"
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\ndependencies: []\n")
excluded = svc.get("excluded", [])
notes = svc.get("notes", [])
readme = f"""# {role}
Generated from `{unit}`.
## Packages
{os.linesep.join("- " + p for p in pkgs) or "- (none detected)"}
## Managed files
{os.linesep.join("- " + mf["path"] + " (" + mf["reason"] + ")" for mf in managed_files) or "- (none)"}
## Managed symlinks
{os.linesep.join("- " + ml["path"] + " -> " + ml["target"] + " (" + ml.get("reason", "") + ")" for ml in managed_links) or "- (none)"}
## Excluded (possible secrets / unsafe)
{os.linesep.join("- " + e["path"] + " (" + e["reason"] + ")" for e in excluded) or "- (none)"}
## Notes
{os.linesep.join("- " + n for n in notes) or "- (none)"}
"""
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifest_plan.add("service", role)
def _render_common_ansible_roles(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
common_role_groups: Dict[str, List[Dict[str, Any]]],
package_roles: List[Dict[str, Any]],
) -> List[str]:
bundle_dir = ctx.bundle_dir
roles_root = ctx.roles_root
jt_exe = ctx.jt_exe
jt_enabled = ctx.jt_enabled
common_tail_roles: List[str] = []
# -------------------------
# Common package section/group roles
#
# Outside --fqdn/site mode, package and systemd-unit roles are grouped by
# Debian Section or RPM Group by default. Managed config and unit state can
# live in those section roles too; --no-common-roles preserves the historic
# one-role-per-package/unit output, and --fqdn implies that mode because
# grouped role contents would be unsafe across multiple harvested hosts.
# -------------------------
# -------------------------
# Manually installed package roles
# -------------------------
occupied_roles: Set[str] = set(
manifest_plan.roles("apt_config")
+ manifest_plan.roles("dnf_config")
+ manifest_plan.roles("users")
+ manifest_plan.roles("flatpak")
+ manifest_plan.roles("snap")
+ manifest_plan.roles("service")
+ manifest_plan.roles("firewall_runtime")
+ manifest_plan.roles("sysctl")
+ manifest_plan.roles("etc_custom")
+ manifest_plan.roles("usr_local_custom")
+ manifest_plan.roles("extra_paths")
)
for pr in package_roles:
occupied_roles.add(
avoid_reserved_role_name(str(pr.get("role_name") or ""), prefix="package")
)
for section_label, entries in sorted(common_role_groups.items()):
role = _section_role_name(section_label, occupied_roles)
ansible_role = AnsibleRole(
role,
var_prefix=role,
section_label=section_label,
grouped=True,
)
for entry in entries:
kind = entry.get("kind") or "package"
snap = entry.get("snapshot") or {}
if kind == "service":
ansible_role.add_service_snapshot(snap)
else:
ansible_role.add_package_snapshot(snap)
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
var_prefix = ansible_role.var_prefix
files_var: List[Dict[str, Any]] = []
dirs_var: List[Dict[str, Any]] = []
links_var: List[Dict[str, Any]] = []
jt_combined: Dict[str, Any] = {}
seen_files: Set[tuple] = set()
seen_dirs: Set[tuple] = set()
seen_links: Set[tuple] = set()
for entry in ansible_role.entries:
kind = entry.get("kind") or "package"
snap = entry.get("snapshot") or {}
source_role = str(snap.get("role_name") or "")
managed_files = snap.get("managed_files", []) or []
managed_dirs = snap.get("managed_dirs", []) or []
managed_links = snap.get("managed_links", []) or []
templated: Set[str] = set()
jt_vars = ""
if managed_files and source_role:
templated, jt_vars = _jinjify_managed_files(
bundle_dir,
source_role,
role_dir,
managed_files,
jt_exe=jt_exe,
jt_enabled=jt_enabled,
overwrite_templates=True,
)
_copy_artifacts(
bundle_dir,
source_role,
os.path.join(role_dir, "files"),
exclude_rels=templated,
)
notify_other = "Restart managed services" if kind == "service" else None
for item in _build_managed_files_var(
managed_files,
templated,
notify_other=notify_other,
notify_systemd="Run systemd daemon-reload",
):
key = (item.get("dest"), item.get("src_rel"), item.get("kind"))
if key not in seen_files:
seen_files.add(key)
files_var.append(item)
for item in _build_managed_dirs_var(managed_dirs):
key = (
item.get("dest"),
item.get("owner"),
item.get("group"),
item.get("mode"),
)
if key not in seen_dirs:
seen_dirs.add(key)
dirs_var.append(item)
for item in _build_managed_links_var(managed_links):
key = (item.get("dest"), item.get("src"))
if key not in seen_links:
seen_links.add(key)
links_var.append(item)
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
jt_combined = _merge_mappings_overwrite(jt_combined, jt_map)
packages = ansible_role.sorted_packages
files_var = sorted(files_var, key=lambda x: str(x.get("dest") or ""))
dirs_var = sorted(dirs_var, key=lambda x: str(x.get("dest") or ""))
links_var = sorted(links_var, key=lambda x: str(x.get("dest") or ""))
systemd_units = ansible_role.systemd_units_var
base_vars: Dict[str, Any] = {
f"{var_prefix}_packages": packages,
f"{var_prefix}_managed_files": files_var,
f"{var_prefix}_managed_dirs": dirs_var,
f"{var_prefix}_managed_links": links_var,
f"{var_prefix}_systemd_units": systemd_units,
}
base_vars = _merge_mappings_overwrite(base_vars, jt_combined)
_write_role_defaults(role_dir, base_vars)
if {"cron", "logrotate"}.intersection(ansible_role.packages):
common_tail_roles.append(role)
handlers = (
"""---
- name: Run systemd daemon-reload
ansible.builtin.systemd:
daemon_reload: true
no_log: "{{ enroll_hide_systemd_status | default(true) | bool }}"
- name: Restart managed services
ansible.builtin.service:
name: "{{ item.name }}"
state: restarted
loop: "{{ """
+ f"{var_prefix}_systemd_units"
+ """ | default([]) }}"
when:
- item.manage | default(false)
- (item.state | default('stopped')) == 'started'
"""
)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(handlers)
task_parts: List[str] = []
task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix))
task_parts.append(
_render_generic_files_tasks(var_prefix, include_restart_notify=True)
)
task_parts.append(_render_grouped_systemd_tasks(var_prefix))
tasks = "\n".join(task_parts).rstrip() + "\n"
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\ndependencies: []\n")
readme = f"""# {role}
Common role for package section/group `{section_label}`.
## Origin roles
{os.linesep.join("- " + line for line in sorted(ansible_role.origin_lines)) or "- (none)"}
## Packages
{os.linesep.join("- " + p for p in packages) or "- (none)"}
## Managed files
{os.linesep.join("- " + mf["dest"] for mf in files_var) or "- (none)"}
## Managed symlinks
{os.linesep.join("- " + ml["dest"] + " -> " + ml["src"] for ml in links_var) or "- (none)"}
## Systemd units
{os.linesep.join("- " + u["name"] + " (enabled=" + str(u["enabled"]).lower() + ", state=" + u["state"] + ")" for u in systemd_units) or "- (none)"}
## Excluded (possible secrets / unsafe)
{os.linesep.join("- " + e.get("path", "") + " (" + e.get("reason", "") + ")" for e in ansible_role.excluded) or "- (none)"}
## Notes
{os.linesep.join("- " + n for n in ansible_role.notes) or "- (none)"}
"""
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifest_plan.add("package", role)
return common_tail_roles
def _render_package_roles(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
package_roles: List[Dict[str, Any]],
) -> None:
bundle_dir = ctx.bundle_dir
out_dir = ctx.out_dir
roles_root = ctx.roles_root
fqdn = ctx.fqdn
site_mode = ctx.site_mode
jt_exe = ctx.jt_exe
jt_enabled = ctx.jt_enabled
# Process package roles (those with configuration files)
for pr in package_roles:
source_role = pr["role_name"]
role = avoid_reserved_role_name(source_role, prefix="package")
pkg = pr.get("package") or ""
managed_files = pr.get("managed_files", []) or []
managed_dirs = pr.get("managed_dirs", []) or []
managed_links = pr.get("managed_links", []) or []
ansible_role = AnsibleRole(role)
ansible_role.add_package_snapshot(pr)
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
var_prefix = role
templated, jt_vars = _jinjify_managed_files(
bundle_dir,
source_role,
role_dir,
managed_files,
jt_exe=jt_exe,
jt_enabled=jt_enabled,
overwrite_templates=not site_mode,
)
# Copy only the non-templated artifacts.
if site_mode:
_copy_artifacts(
bundle_dir,
source_role,
_host_role_files_dir(out_dir, fqdn or "", role),
exclude_rels=templated,
)
else:
_copy_artifacts(
bundle_dir,
source_role,
os.path.join(role_dir, "files"),
exclude_rels=templated,
)
pkgs = ansible_role.sorted_packages
files_var = _build_managed_files_var(
managed_files,
templated,
notify_other=None,
notify_systemd="Run systemd daemon-reload",
)
links_var = _build_managed_links_var(managed_links)
dirs_var = _build_managed_dirs_var(managed_dirs)
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
base_vars: Dict[str, Any] = {
f"{var_prefix}_packages": pkgs,
f"{var_prefix}_managed_files": files_var,
f"{var_prefix}_managed_dirs": dirs_var,
f"{var_prefix}_managed_links": links_var,
}
base_vars = _merge_mappings_overwrite(base_vars, jt_map)
if site_mode:
_write_role_defaults(
role_dir,
{
f"{var_prefix}_packages": [],
f"{var_prefix}_managed_files": [],
f"{var_prefix}_managed_dirs": [],
f"{var_prefix}_managed_links": [],
},
)
_write_hostvars(out_dir, fqdn or "", role, base_vars)
else:
_write_role_defaults(role_dir, base_vars)
handlers = """---
- name: Run systemd daemon-reload
ansible.builtin.systemd:
daemon_reload: true
no_log: "{{ enroll_hide_systemd_status | default(true) | bool }}"
"""
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(handlers)
task_parts: List[str] = []
task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix))
task_parts.append(
_render_generic_files_tasks(var_prefix, include_restart_notify=False)
)
tasks = "\n".join(task_parts).rstrip() + "\n"
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\ndependencies: []\n")
excluded = pr.get("excluded", [])
notes = pr.get("notes", [])
readme = f"""# {role}
Generated for package `{pkg}`.
## Managed files
{os.linesep.join("- " + mf["path"] + " (" + mf["reason"] + ")" for mf in managed_files) or "- (none)"}
## Managed symlinks
{os.linesep.join("- " + ml["path"] + " -> " + ml["target"] + " (" + ml.get("reason", "") + ")" for ml in managed_links) or "- (none)"}
## Excluded (possible secrets / unsafe)
{os.linesep.join("- " + e["path"] + " (" + e["reason"] + ")" for e in excluded) or "- (none)"}
## Notes
{os.linesep.join("- " + n for n in notes) or "- (none)"}
> Note: package roles (those not discovered via a systemd service) do not attempt to restart or enable services automatically.
"""
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifest_plan.add("package", role)

View file

@ -1,219 +0,0 @@
from __future__ import annotations
import os
from typing import Any, Dict
from ..context import AnsibleManifestContext
from ..layout import (
_copy_artifacts,
_host_role_files_dir,
_write_hostvars,
_write_role_defaults,
_write_role_scaffold,
)
from ..model import AnsibleManifestPlan
from ..tasks import (
_render_firewall_runtime_tasks,
_render_install_packages_tasks,
_render_sysctl_handlers,
_render_sysctl_tasks,
)
def _render_sysctl_role(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
sysctl_snapshot: Dict[str, Any],
) -> None:
if not (sysctl_snapshot and (sysctl_snapshot.get("managed_files") or [])):
return
role = sysctl_snapshot.get("role_name", "sysctl")
role_dir = os.path.join(ctx.roles_root, role)
_write_role_scaffold(role_dir)
var_prefix = role
managed_files = sysctl_snapshot.get("managed_files", []) or []
conf_src_rel = ""
for mf in managed_files:
if mf.get("path") == "/etc/sysctl.d/99-enroll.conf":
conf_src_rel = mf.get("src_rel") or ""
break
if not conf_src_rel and managed_files:
conf_src_rel = managed_files[0].get("src_rel") or ""
parameters = sysctl_snapshot.get("parameters", {}) or {}
notes = sysctl_snapshot.get("notes", []) or []
if ctx.site_mode:
_copy_artifacts(
ctx.bundle_dir,
role,
_host_role_files_dir(ctx.out_dir, ctx.fqdn or "", role),
)
else:
_copy_artifacts(ctx.bundle_dir, role, os.path.join(role_dir, "files"))
vars_map: Dict[str, Any] = {
f"{var_prefix}_conf_src_rel": conf_src_rel,
f"{var_prefix}_apply": True,
f"{var_prefix}_ignore_apply_errors": True,
}
if ctx.site_mode:
_write_role_defaults(
role_dir,
{
f"{var_prefix}_conf_src_rel": "",
f"{var_prefix}_apply": True,
f"{var_prefix}_ignore_apply_errors": True,
},
)
_write_hostvars(ctx.out_dir, ctx.fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
tasks = "---\n" + _render_sysctl_tasks(var_prefix)
with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f:
f.write(tasks.rstrip() + "\n")
handlers_dir = os.path.join(role_dir, "handlers")
os.makedirs(handlers_dir, exist_ok=True)
with open(os.path.join(handlers_dir, "main.yml"), "w", encoding="utf-8") as f:
f.write(_render_sysctl_handlers(var_prefix))
with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f:
f.write("---\ndependencies: []\n")
param_count = len(parameters) if isinstance(parameters, dict) else 0
sample_params = []
if isinstance(parameters, dict):
sample_params = sorted(parameters.keys())[:25]
readme = f"""# {role}
Generated from live writable sysctl state captured during harvest.
This role deploys the captured values to `/etc/sysctl.d/99-enroll.conf`. The generated file intentionally contains writable, single-line sysctl values only; read-only, multiline, action-like, and host-identity keys are skipped to avoid creating a noisy or brittle boot-time configuration.
## Captured parameters
Captured parameter count: {param_count}
{os.linesep.join("- " + x for x in sample_params) or "- (none)"}
{"- ..." if param_count > len(sample_params) else ""}
## Notes
{os.linesep.join("- " + n for n in notes) or "- (none)"}
## Safety notes
- `sysctl_apply` defaults to `true` and applies `/etc/sysctl.d/99-enroll.conf` when the file changes.
- `sysctl_ignore_apply_errors` defaults to `true`, because sysctl availability and writability can vary across kernels, containers, and hardware.
- Review this role before applying it broadly across unlike hosts.
"""
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifest_plan.add("sysctl", role)
def _render_firewall_runtime_role(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
firewall_runtime_snapshot: Dict[str, Any],
) -> None:
if not (
firewall_runtime_snapshot
and (
firewall_runtime_snapshot.get("ipset_save")
or firewall_runtime_snapshot.get("iptables_v4_save")
or firewall_runtime_snapshot.get("iptables_v6_save")
)
):
return
role = firewall_runtime_snapshot.get("role_name", "firewall_runtime")
role_dir = os.path.join(ctx.roles_root, role)
_write_role_scaffold(role_dir)
var_prefix = role
packages = firewall_runtime_snapshot.get("packages", []) or []
ipset_save = firewall_runtime_snapshot.get("ipset_save") or ""
ipset_sets = firewall_runtime_snapshot.get("ipset_sets", []) or []
iptables_v4_save = firewall_runtime_snapshot.get("iptables_v4_save") or ""
iptables_v6_save = firewall_runtime_snapshot.get("iptables_v6_save") or ""
notes = firewall_runtime_snapshot.get("notes", []) or []
if ctx.site_mode:
_copy_artifacts(
ctx.bundle_dir,
role,
_host_role_files_dir(ctx.out_dir, ctx.fqdn or "", role),
)
else:
_copy_artifacts(ctx.bundle_dir, role, os.path.join(role_dir, "files"))
vars_map: Dict[str, Any] = {
f"{var_prefix}_packages": packages,
f"{var_prefix}_ipset_save": ipset_save,
f"{var_prefix}_ipset_sets": ipset_sets,
f"{var_prefix}_iptables_v4_save": iptables_v4_save,
f"{var_prefix}_iptables_v6_save": iptables_v6_save,
f"{var_prefix}_sync_ipsets_exact": True,
f"{var_prefix}_restore_iptables": True,
}
if ctx.site_mode:
_write_role_defaults(
role_dir,
{
f"{var_prefix}_packages": [],
f"{var_prefix}_ipset_save": "",
f"{var_prefix}_ipset_sets": [],
f"{var_prefix}_iptables_v4_save": "",
f"{var_prefix}_iptables_v6_save": "",
f"{var_prefix}_sync_ipsets_exact": True,
f"{var_prefix}_restore_iptables": True,
},
)
_write_hostvars(ctx.out_dir, ctx.fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
tasks = (
"---\n"
+ _render_install_packages_tasks(role, var_prefix)
+ _render_firewall_runtime_tasks(var_prefix)
)
with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f:
f.write(tasks.rstrip() + "\n")
with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f:
f.write("---\ndependencies: []\n")
readme = f"""# {role}
Generated from live firewall runtime state captured during harvest.
This role restores live ipset and iptables state only for firewall families where Enroll did not find corresponding persistent configuration on the source host. Static firewall configuration files, such as `/etc/iptables/rules.v4`, `/etc/iptables/rules.v6`, UFW, nftables, firewalld, or `/etc/ipset*`, are harvested separately as managed files and treated as authoritative for their respective family.
## Captured snapshots
- ipset: {ipset_save or "(none)"}
- iptables IPv4: {iptables_v4_save or "(none)"}
- iptables IPv6: {iptables_v6_save or "(none)"}
## Captured ipsets
{os.linesep.join("- " + x for x in ipset_sets) or "- (none)"}
## Notes
{os.linesep.join("- " + n for n in notes) or "- (none)"}
## Safety notes
- `firewall_runtime_sync_ipsets_exact` defaults to `true`; it flushes captured set members before replaying the saved members so stale entries are removed. This applies only when no persistent ipset config was found.
- `firewall_runtime_restore_iptables` defaults to `true`; `iptables-restore`/`ip6tables-restore` replace only captured families. A family is captured only when no corresponding persistent iptables config was found.
"""
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifest_plan.add("firewall_runtime", role)

View file

@ -1,434 +0,0 @@
from __future__ import annotations
import os
from typing import Any, Dict, List
from ..context import AnsibleManifestContext
from ..layout import (
_copy_artifacts,
_ensure_requirements_yaml,
_host_role_files_dir,
_write_hostvars,
_write_role_defaults,
_write_role_scaffold,
)
from ..model import AnsibleManifestPlan
from ..vars import _normalise_flatpak_item, _normalise_flatpak_remote
def _render_users_role(
ctx: AnsibleManifestContext,
manifest_plan: AnsibleManifestPlan,
users_snapshot: Dict[str, Any],
) -> None:
bundle_dir = ctx.bundle_dir
out_dir = ctx.out_dir
roles_root = ctx.roles_root
fqdn = ctx.fqdn
site_mode = ctx.site_mode
# -------------------------
# Users role (non-system users)
# -------------------------
if users_snapshot:
role = users_snapshot.get("role_name", "users")
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
# Users role includes harvested SSH-related files; in site mode keep them
# host-specific to avoid cross-host clobber.
if site_mode:
_copy_artifacts(
bundle_dir, role, _host_role_files_dir(out_dir, fqdn or "", role)
)
else:
_copy_artifacts(bundle_dir, role, os.path.join(role_dir, "files"))
users = users_snapshot.get("users", [])
managed_files = users_snapshot.get("managed_files", [])
excluded = users_snapshot.get("excluded", [])
notes = users_snapshot.get("notes", [])
# Build groups list and a simplified user dict list suitable for loops
group_names: List[str] = []
group_set = set()
users_data: List[Dict[str, Any]] = []
for u in users:
name = u.get("name")
if not name:
continue
pg = u.get("primary_group") or name
home = u.get("home") or f"/home/{name}"
sshdir = home.rstrip("/") + "/.ssh"
supp = u.get("supplementary_groups") or []
if pg:
group_set.add(pg)
for g in supp:
if g:
group_set.add(g)
users_data.append(
{
"name": name,
"uid": u.get("uid"),
"primary_group": pg,
"home": home,
"ssh_dir": sshdir,
"shell": u.get("shell"),
"gecos": u.get("gecos"),
"supplementary_groups": sorted(set(supp)),
}
)
group_names = sorted(group_set)
# User-managed files (authorized_keys plus dangerous-mode shell dotfiles).
# Keep the variable name for compatibility with existing generated data.
ssh_files: List[Dict[str, Any]] = []
for mf in managed_files:
dest = mf.get("path") or ""
src_rel = mf.get("src_rel") or ""
if not dest or not src_rel:
continue
owner = "root"
group = "root"
for u in users_data:
home_prefix = (u.get("home") or "").rstrip("/") + "/"
if home_prefix and dest.startswith(home_prefix):
owner = str(u.get("name") or "root")
group = str(u.get("primary_group") or owner)
break
# Prefer the harvested file mode so we preserve any deliberate
# permissions (e.g. 0600 for certain dotfiles). For authorized_keys,
# enforce 0600 regardless.
mode = mf.get("mode") or "0644"
if mf.get("reason") == "authorized_keys":
mode = "0600"
ssh_files.append(
{
"dest": dest,
"src_rel": src_rel,
"owner": owner,
"group": group,
"mode": mode,
}
)
# Only create .ssh directories for users that actually have harvested
# files under .ssh. This mirrors Puppet's behaviour and avoids creating
# empty SSH directories merely because a user account exists.
ssh_dirs_by_dest: Dict[str, Dict[str, Any]] = {}
for item in ssh_files:
dest = str(item.get("dest") or "")
if not dest:
continue
for user in users_data:
ssh_dir = str(user.get("ssh_dir") or "").rstrip("/")
if not ssh_dir or not dest.startswith(ssh_dir + "/"):
continue
ssh_dirs_by_dest.setdefault(
ssh_dir,
{
"dest": ssh_dir,
"owner": str(user.get("name") or item.get("owner") or "root"),
"group": str(
user.get("primary_group") or item.get("group") or "root"
),
"mode": "0700",
},
)
break
ssh_dirs = sorted(
ssh_dirs_by_dest.values(), key=lambda item: str(item.get("dest") or "")
)
# Build Flatpak and Snap lists. Flatpak can be installed system-wide or
# per-user. Snap packages are system-wide; per-user ~/snap/* directories
# are runtime/user data and are not treated as install sources.
users_flatpaks: List[Dict[str, Any]] = []
user_flatpak_map = users_snapshot.get("user_flatpaks", {}) or {}
home_by_user = {
str(u.get("name")): str(u.get("home") or "") for u in users_data
}
for uname, flatpaks in user_flatpak_map.items():
for fp in flatpaks or []:
users_flatpaks.append(
_normalise_flatpak_item(
fp,
method="user",
user=str(uname),
home=home_by_user.get(str(uname)) or None,
)
)
flatpak_remotes = [
_normalise_flatpak_remote(r)
for r in (users_snapshot.get("user_flatpak_remotes", []) or [])
]
users_needs_community = bool(flatpak_remotes or users_flatpaks)
if users_needs_community:
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
# Variables are host-specific in site mode; in non-site mode they live in role defaults.
if site_mode:
_write_role_defaults(
role_dir,
{
"users_groups": [],
"users_users": [],
"users_ssh_dirs": [],
"users_ssh_files": [],
"users_flatpaks": [],
"users_flatpak_remotes": [],
},
)
_write_hostvars(
out_dir,
fqdn or "",
role,
{
"users_groups": group_names,
"users_users": users_data,
"users_ssh_dirs": ssh_dirs,
"users_ssh_files": ssh_files,
"users_flatpaks": users_flatpaks,
"users_flatpak_remotes": flatpak_remotes,
},
)
else:
_write_role_defaults(
role_dir,
{
"users_groups": group_names,
"users_users": users_data,
"users_ssh_dirs": ssh_dirs,
"users_ssh_files": ssh_files,
"users_flatpaks": users_flatpaks,
"users_flatpak_remotes": flatpak_remotes,
},
)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
if users_needs_community:
f.write(
"---\n"
"dependencies: []\n"
"collections:\n"
" - community.general\n"
)
else:
f.write("---\ndependencies: []\n")
# tasks (data-driven)
users_tasks = """---
- name: Ensure groups exist
ansible.builtin.group:
name: "{{ item }}"
state: present
loop: "{{ users_groups | default([]) }}"
- name: Ensure users exist
ansible.builtin.user:
name: "{{ item.name }}"
uid: "{{ item.uid | default(omit) }}"
group: "{{ item.primary_group }}"
home: "{{ item.home }}"
create_home: true
shell: "{{ item.shell | default(omit) }}"
comment: "{{ item.gecos | default(omit) }}"
state: present
loop: "{{ users_users | default([]) }}"
- name: Ensure users supplementary groups
ansible.builtin.user:
name: "{{ item.name }}"
groups: "{{ item.supplementary_groups | default([]) | join(',') }}"
append: true
loop: "{{ users_users | default([]) }}"
when: (item.supplementary_groups | default([])) | length > 0
- name: Ensure .ssh directories exist for managed SSH files
ansible.builtin.file:
path: "{{ item.dest }}"
state: directory
owner: "{{ item.owner }}"
group: "{{ item.group }}"
mode: "{{ item.mode }}"
loop: "{{ users_ssh_dirs | default([]) }}"
- name: Deploy user-managed files
vars:
_enroll_ff:
files:
- "{{ inventory_dir }}/host_vars/{{ inventory_hostname }}/{{ role_name }}/.files/{{ item.src_rel }}"
- "{{ role_path }}/files/{{ item.src_rel }}"
ansible.builtin.copy:
src: "{{ lookup('ansible.builtin.first_found', _enroll_ff) }}"
dest: "{{ item.dest }}"
owner: "{{ item.owner }}"
group: "{{ item.group }}"
mode: "{{ item.mode }}"
loop: "{{ users_ssh_files | default([]) }}"
"""
if flatpak_remotes or users_flatpaks:
users_tasks += """
- name: Ensure user Flatpak remotes exist
ansible.builtin.command:
argv:
- flatpak
- remote-add
- --user
- --if-not-exists
- "{{ item.name }}"
- "{{ item.url }}"
loop: "{{ users_flatpak_remotes | default([]) | selectattr('method', 'equalto', 'user') | list }}"
when:
- item.name is defined
- item.url is defined
- item.url | length > 0
- item.user is defined
become: true
become_user: "{{ item.user }}"
environment:
HOME: "{{ item.home | default('/home/' ~ item.user, true) }}"
XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}"
changed_when: false
- name: Install user Flatpaks
community.general.flatpak:
name:
- "{{ item.name }}"
state: present
method: user
remote: "{{ item.remote | default(omit) }}"
from_url: "{{ item.from_url | default(omit) }}"
loop: "{{ users_flatpaks | default([]) }}"
when:
- item.name is defined
- item.name | length > 0
- item.user is defined
become: true
become_user: "{{ item.user }}"
environment:
HOME: "{{ item.home | default('/home/' ~ item.user, true) }}"
XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}"
"""
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(users_tasks)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
def _fmt_app_list(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
if not name:
continue
detail_parts = []
for key in ("remote", "channel", "revision", "branch", "arch"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
for key in ("classic", "devmode", "dangerous"):
if item.get(key):
detail_parts.append(key)
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {name}{details}")
return "\n".join(lines) or "- (none)"
def _fmt_user_flatpaks(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
user = item.get("user")
if not name or not user:
continue
detail_parts = []
for key in ("remote", "branch", "arch"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {user}: {name}{details}")
return "\n".join(lines) or "- (none)"
def _fmt_remotes(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
url = item.get("url")
method = item.get("method") or "system"
user = item.get("user")
if not name or not url:
continue
owner = f"user={user}" if user else "system"
lines.append(f"- {name} ({method}, {owner}): {url}")
return "\n".join(lines) or "- (none)"
readme = (
"""# users
Generated non-system user accounts, SSH public material, and per-user Flatpak
applications/remotes.
**Note:** User Flatpak tasks require the `community.general` Ansible collection.
Install it with: `ansible-galaxy collection install -r requirements.yml`.
Flatpak `remote` is harvested from the installed deployment where detectable.
The original `.flatpakref` URL is generally not preserved by Flatpak after
installation, so `from_url` is only emitted if a future/hand-edited state file
contains it.
## Users
"""
+ (
"\n".join([f"- {u.get('name')} (uid {u.get('uid')})" for u in users])
or "- (none)"
)
+ """\n
## Included SSH files
"""
+ (
"\n".join(
[f"- {mf.get('path')} ({mf.get('reason')})" for mf in managed_files]
)
or "- (none)"
)
+ """\n
## Flatpak remotes
"""
+ _fmt_remotes(flatpak_remotes)
+ """\n
## User Flatpaks
"""
+ _fmt_user_flatpaks(users_flatpaks)
+ """\n
## Excluded
"""
+ (
"\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded])
or "- (none)"
)
+ """\n
## Notes
"""
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ """\n"""
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifest_plan.add("users", role)

View file

@ -1,290 +0,0 @@
from __future__ import annotations
def _render_generic_files_tasks(
var_prefix: str, *, include_restart_notify: bool
) -> str:
"""Render generic tasks to deploy <var_prefix>_managed_files safely."""
# Using first_found makes roles work in both modes:
# - site-mode: inventory/host_vars/<host>/<role>/.files/...
# - non-site: roles/<role>/files/...
return f"""- name: Ensure managed directories exist (preserve owner/group/mode)
ansible.builtin.file:
path: "{{{{ item.dest }}}}"
state: directory
owner: "{{{{ item.owner }}}}"
group: "{{{{ item.group }}}}"
mode: "{{{{ item.mode }}}}"
loop: "{{{{ {var_prefix}_managed_dirs | default([]) }}}}"
- name: Deploy any systemd unit files (templates)
ansible.builtin.template:
src: "{{{{ item.src_rel }}}}.j2"
dest: "{{{{ item.dest }}}}"
owner: "{{{{ item.owner }}}}"
group: "{{{{ item.group }}}}"
mode: "{{{{ item.mode }}}}"
loop: >-
{{{{ {var_prefix}_managed_files | default([])
| selectattr('is_systemd_unit', 'equalto', true)
| selectattr('kind', 'equalto', 'template')
| list }}}}
notify: "{{{{ item.notify | default([]) }}}}"
- name: Deploy any systemd unit files (raw files)
vars:
_enroll_ff:
files:
- "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ item.src_rel }}}}"
- "{{{{ role_path }}}}/files/{{{{ item.src_rel }}}}"
ansible.builtin.copy:
src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}"
dest: "{{{{ item.dest }}}}"
owner: "{{{{ item.owner }}}}"
group: "{{{{ item.group }}}}"
mode: "{{{{ item.mode }}}}"
loop: >-
{{{{ {var_prefix}_managed_files | default([])
| selectattr('is_systemd_unit', 'equalto', true)
| selectattr('kind', 'equalto', 'copy')
| list }}}}
notify: "{{{{ item.notify | default([]) }}}}"
- name: Reload systemd to pick up unit changes
ansible.builtin.meta: flush_handlers
when: >-
({var_prefix}_managed_files | default([])
| selectattr('is_systemd_unit', 'equalto', true)
| list
| length) > 0
- name: Deploy any other managed files (templates)
ansible.builtin.template:
src: "{{{{ item.src_rel }}}}.j2"
dest: "{{{{ item.dest }}}}"
owner: "{{{{ item.owner }}}}"
group: "{{{{ item.group }}}}"
mode: "{{{{ item.mode }}}}"
loop: >-
{{{{ {var_prefix}_managed_files | default([])
| selectattr('is_systemd_unit', 'equalto', false)
| selectattr('kind', 'equalto', 'template')
| list }}}}
notify: "{{{{ item.notify | default([]) }}}}"
- name: Deploy any other managed files (raw files)
vars:
_enroll_ff:
files:
- "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ item.src_rel }}}}"
- "{{{{ role_path }}}}/files/{{{{ item.src_rel }}}}"
ansible.builtin.copy:
src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}"
dest: "{{{{ item.dest }}}}"
owner: "{{{{ item.owner }}}}"
group: "{{{{ item.group }}}}"
mode: "{{{{ item.mode }}}}"
loop: >-
{{{{ {var_prefix}_managed_files | default([])
| selectattr('is_systemd_unit', 'equalto', false)
| selectattr('kind', 'equalto', 'copy')
| list }}}}
notify: "{{{{ item.notify | default([]) }}}}"
- name: Ensure managed symlinks exist
ansible.builtin.file:
src: "{{{{ item.src }}}}"
dest: "{{{{ item.dest }}}}"
state: link
force: true
loop: "{{{{ {var_prefix}_managed_links | default([]) }}}}"
"""
def _render_install_packages_tasks(role: str, var_prefix: str) -> str:
"""Render package installation through Ansible's generic package provider.
Puppet and Salt use provider-backed package resources instead of selecting
apt/dnf/yum in the generated manifest. Ansible's package module is the
equivalent abstraction: it proxies to the target host's detected package
manager and keeps generated roles provider-neutral.
"""
return f"""- name: Install packages for {role}
ansible.builtin.package:
name: "{{{{ {var_prefix}_packages | default([]) }}}}"
state: present
when: ({var_prefix}_packages | default([])) | length > 0
"""
def _render_grouped_systemd_tasks(var_prefix: str) -> str:
"""Render tasks to manage multiple systemd units in a common role."""
return f"""- name: Probe whether grouped systemd units exist and are manageable
ansible.builtin.systemd:
name: "{{{{ item.name }}}}"
no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}"
check_mode: true
loop: "{{{{ {var_prefix}_systemd_units | default([]) }}}}"
register: _enroll_unit_probes
failed_when: false
changed_when: false
when: item.manage | default(false)
- name: Ensure grouped unit enablement matches harvest
ansible.builtin.systemd:
name: "{{{{ item.item.name }}}}"
enabled: "{{{{ item.item.enabled | bool }}}}"
no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}"
loop: "{{{{ _enroll_unit_probes.results | default([]) }}}}"
when:
- item.item.manage | default(false)
- not (item.failed | default(false))
- name: Ensure grouped unit running state matches harvest
ansible.builtin.systemd:
name: "{{{{ item.item.name }}}}"
state: "{{{{ item.item.state }}}}"
no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}"
loop: "{{{{ _enroll_unit_probes.results | default([]) }}}}"
when:
- item.item.manage | default(false)
- not (item.failed | default(false))
"""
def _render_sysctl_tasks(var_prefix: str) -> str:
return f"""- name: Ensure sysctl.d exists
ansible.builtin.file:
path: /etc/sysctl.d
state: directory
owner: root
group: root
mode: "0755"
- name: Deploy captured sysctl configuration
vars:
_enroll_ff:
files:
- "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_conf_src_rel }}}}"
- "{{{{ role_path }}}}/files/{{{{ {var_prefix}_conf_src_rel }}}}"
ansible.builtin.copy:
src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}"
dest: /etc/sysctl.d/99-enroll.conf
owner: root
group: root
mode: "0644"
when: ({var_prefix}_conf_src_rel | default('') | length) > 0
notify: Apply captured sysctl configuration
"""
def _render_sysctl_handlers(var_prefix: str) -> str:
return f"""---
- name: Apply captured sysctl configuration
ansible.builtin.command:
argv:
- sysctl
- -e
- -p
- /etc/sysctl.d/99-enroll.conf
register: _enroll_sysctl_apply
changed_when: false
failed_when:
- not ({var_prefix}_ignore_apply_errors | default(true) | bool)
- _enroll_sysctl_apply.rc != 0
when: {var_prefix}_apply | default(true) | bool
"""
def _render_firewall_runtime_tasks(var_prefix: str) -> str:
"""Render tasks for live ipset/iptables snapshots."""
return f"""- name: Ensure firewall runtime snapshot directory exists
ansible.builtin.file:
path: /etc/enroll/firewall
state: directory
owner: root
group: root
mode: "0750"
- name: Deploy captured ipset snapshot
vars:
_enroll_ff:
files:
- "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_ipset_save }}}}"
- "{{{{ role_path }}}}/files/{{{{ {var_prefix}_ipset_save }}}}"
ansible.builtin.copy:
src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}"
dest: /etc/enroll/firewall/ipset.save
owner: root
group: root
mode: "0600"
when: ({var_prefix}_ipset_save | default('') | length) > 0
- name: Flush captured ipsets before restoring members
ansible.builtin.command:
cmd: "ipset flush {{{{ item }}}}"
loop: "{{{{ {var_prefix}_ipset_sets | default([]) }}}}"
register: _enroll_ipset_flush
failed_when: false
changed_when: false
when:
- ({var_prefix}_ipset_save | default('') | length) > 0
- {var_prefix}_sync_ipsets_exact | default(true) | bool
- name: Restore captured ipsets
ansible.builtin.shell: "ipset restore -exist < /etc/enroll/firewall/ipset.save"
args:
executable: /bin/sh
register: _enroll_ipset_restore
changed_when: _enroll_ipset_restore.rc == 0
when: ({var_prefix}_ipset_save | default('') | length) > 0
- name: Deploy captured IPv4 iptables snapshot
vars:
_enroll_ff:
files:
- "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_iptables_v4_save }}}}"
- "{{{{ role_path }}}}/files/{{{{ {var_prefix}_iptables_v4_save }}}}"
ansible.builtin.copy:
src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}"
dest: /etc/enroll/firewall/iptables.v4
owner: root
group: root
mode: "0600"
when: ({var_prefix}_iptables_v4_save | default('') | length) > 0
- name: Restore captured IPv4 iptables rules
ansible.builtin.command:
cmd: iptables-restore /etc/enroll/firewall/iptables.v4
register: _enroll_iptables_v4_restore
changed_when: _enroll_iptables_v4_restore.rc == 0
when:
- ({var_prefix}_iptables_v4_save | default('') | length) > 0
- {var_prefix}_restore_iptables | default(true) | bool
- name: Deploy captured IPv6 iptables snapshot
vars:
_enroll_ff:
files:
- "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_iptables_v6_save }}}}"
- "{{{{ role_path }}}}/files/{{{{ {var_prefix}_iptables_v6_save }}}}"
ansible.builtin.copy:
src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}"
dest: /etc/enroll/firewall/iptables.v6
owner: root
group: root
mode: "0600"
when: ({var_prefix}_iptables_v6_save | default('') | length) > 0
- name: Restore captured IPv6 iptables rules
ansible.builtin.command:
cmd: ip6tables-restore /etc/enroll/firewall/iptables.v6
register: _enroll_iptables_v6_restore
changed_when: _enroll_iptables_v6_restore.rc == 0
when:
- ({var_prefix}_iptables_v6_save | default('') | length) > 0
- {var_prefix}_restore_iptables | default(true) | bool
"""

View file

@ -1,151 +0,0 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional, Set
def _normalise_flatpak_item(
item: Any,
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> Dict[str, Any]:
if isinstance(item, str):
out: Dict[str, Any] = {"name": item, "method": method}
elif isinstance(item, dict):
out = dict(item)
out.setdefault("method", method)
else:
out = {"name": str(item), "method": method}
if user:
out.setdefault("user", user)
if home:
out.setdefault("home", home)
return out
def _normalise_flatpak_remote(item: Any) -> Dict[str, Any]:
if isinstance(item, dict):
out = dict(item)
else:
out = {"name": str(item)}
out.setdefault("method", "system")
return out
def _normalise_snap_item(item: Any) -> Dict[str, Any]:
if isinstance(item, str):
out: Dict[str, Any] = {"name": item}
elif isinstance(item, dict):
out = dict(item)
else:
out = {"name": str(item)}
notes = out.get("notes") or []
if isinstance(notes, str):
notes = [notes]
notes_l = {str(n).lower() for n in notes}
out["classic"] = bool(out.get("classic") or "classic" in notes_l)
out["devmode"] = bool(out.get("devmode") or "devmode" in notes_l)
out["dangerous"] = bool(out.get("dangerous") or "dangerous" in notes_l)
# The Ansible snap module's revision parameter pins/holds the snap. For
# ordinary store snaps that track a channel, preserve the channel instead
# of freezing every harvested host at today's revision.
if out.get("revision") is not None and not out.get("channel"):
out["install_revision"] = True
else:
out["install_revision"] = False
return out
def _build_managed_dirs_var(
managed_dirs: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Convert enroll managed_dirs into an Ansible-friendly list of dicts.
Each dict drives a role task loop and is safe across hosts.
"""
out: List[Dict[str, Any]] = []
for d in managed_dirs:
dest = d.get("path") or ""
if not dest:
continue
out.append(
{
"dest": dest,
"owner": d.get("owner") or "root",
"group": d.get("group") or "root",
"mode": d.get("mode") or "0755",
}
)
return out
def _build_managed_files_var(
managed_files: List[Dict[str, Any]],
templated_src_rels: Set[str],
*,
notify_other: Optional[str] = None,
notify_systemd: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Convert enroll managed_files into an Ansible-friendly list of dicts.
Each dict drives a role task loop and is safe across hosts.
"""
out: List[Dict[str, Any]] = []
for mf in managed_files:
dest = mf.get("path") or ""
src_rel = mf.get("src_rel") or ""
if not dest or not src_rel:
continue
is_unit = str(dest).startswith("/etc/systemd/system/")
kind = "template" if src_rel in templated_src_rels else "copy"
notify: List[str] = []
if is_unit and notify_systemd:
notify.append(notify_systemd)
if (not is_unit) and notify_other:
notify.append(notify_other)
out.append(
{
"dest": dest,
"src_rel": src_rel,
"owner": mf.get("owner") or "root",
"group": mf.get("group") or "root",
"mode": mf.get("mode") or "0644",
"kind": kind,
"is_systemd_unit": bool(is_unit),
"notify": notify,
}
)
return out
def _build_managed_links_var(
managed_links: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Convert enroll managed_links into an Ansible-friendly list of dicts."""
out: List[Dict[str, Any]] = []
for ml in managed_links or []:
dest = ml.get("path") or ""
src = ml.get("target") or ""
if not dest or not src:
continue
out.append({"dest": dest, "src": src})
return out
def _normalise_container_image_item(item: Any) -> Dict[str, Any]:
if isinstance(item, dict):
out = dict(item)
else:
out = {"pull_ref": str(item)}
out.setdefault("engine", "docker")
out.setdefault("scope", "system")
out.setdefault("user", None)
out.setdefault("home", None)
out.setdefault("repo_tags", [])
out.setdefault("repo_digests", [])
out.setdefault("tag_aliases", [])
out.setdefault("notes", [])
return out

View file

@ -1,69 +0,0 @@
from __future__ import annotations
from typing import Any, Dict, List
def _try_yaml():
try:
import yaml # type: ignore
except Exception:
return None
return yaml
def _yaml_load_mapping(text: str) -> Dict[str, Any]:
yaml = _try_yaml()
if yaml is None:
return {}
try:
obj = yaml.safe_load(text)
except Exception:
return {}
if obj is None:
return {}
if isinstance(obj, dict):
return obj
return {}
def _yaml_dump_mapping(obj: Dict[str, Any], *, sort_keys: bool = True) -> str:
yaml = _try_yaml()
if yaml is None:
# fall back to a naive key: value dump (best-effort)
lines: List[str] = []
for k, v in sorted(obj.items()) if sort_keys else obj.items():
lines.append(f"{k}: {v!r}")
return "\n".join(lines).rstrip() + "\n"
# ansible-lint/yamllint's indentation rules are stricter than YAML itself.
# In particular, they expect sequences nested under a mapping key to be
# indented (e.g. `foo:\n - a`), whereas PyYAML's default is often
# `foo:\n- a`.
class _IndentDumper(yaml.SafeDumper): # type: ignore
def increase_indent(self, flow: bool = False, indentless: bool = False):
return super().increase_indent(flow, False)
return (
yaml.dump(
obj,
Dumper=_IndentDumper,
default_flow_style=False,
sort_keys=sort_keys,
indent=2,
allow_unicode=True,
).rstrip()
+ "\n"
)
def _merge_mappings_overwrite(
existing: Dict[str, Any], incoming: Dict[str, Any]
) -> Dict[str, Any]:
"""Merge incoming into existing with overwrite.
NOTE: Unlike role defaults merging, host_vars should reflect the current
harvest for a host. Therefore lists are replaced rather than unioned.
"""
merged = dict(existing)
merged.update(incoming)
return merged

View file

@ -5,7 +5,9 @@ import subprocess # nosec
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from typing import Any, Dict, List, Optional, Set, Tuple
from .yamlutil import yaml_dump_mapping, yaml_load_mapping
SYSTEMD_SUFFIXES = {
@ -36,6 +38,143 @@ SUPPORTED_SUFFIXES = {
} | SYSTEMD_SUFFIXES
def resolve_jinjaturtle_mode(jinjaturtle: str) -> Tuple[Optional[str], bool]:
"""Resolve Enroll's common JinjaTurtle mode flag.
Renderers accept the same values:
- ``auto``: use JinjaTurtle when present on PATH
- ``on``: require it and fail if it is absent
- ``off``: never use it
"""
jt_exe = find_jinjaturtle_cmd()
if jinjaturtle not in {"auto", "on", "off"}:
raise ValueError("jinjaturtle must be one of: auto, on, off")
if jinjaturtle == "on":
if not jt_exe:
raise RuntimeError("jinjaturtle requested but not found on PATH")
return jt_exe, True
if jinjaturtle == "auto":
return jt_exe, jt_exe is not None
return jt_exe, False
def _merge_mappings_overwrite(
existing: Dict[str, Any], incoming: Dict[str, Any]
) -> Dict[str, Any]:
merged = dict(existing)
merged.update(incoming)
return merged
@dataclass(frozen=True)
class JinjifiedArtifact:
template_rel: str
template_text: str
vars_text: str
context: Dict[str, Any]
def jinjify_artifact(
bundle_dir: str | Path,
artifact_role: str,
src_rel: str,
dest_path: str,
template_root: str | Path,
*,
jt_exe: Optional[str],
jt_enabled: bool,
overwrite_templates: bool = True,
role_name: Optional[str] = None,
) -> Optional[JinjifiedArtifact]:
"""Best-effort conversion of one harvested artifact into a Jinja2 template.
Puppet does not use JinjaTurtle, but Salt and Ansible both have the same
philosophical operation: take ``artifacts/<role>/<src_rel>``, ask
JinjaTurtle for a template and variable mapping, and write that template
under the renderer's template directory. Keeping that here prevents Salt
and Ansible from reimplementing the same probing/format/error handling.
"""
if not (jt_enabled and jt_exe and can_jinjify_path(dest_path)):
return None
artifact_path = Path(bundle_dir) / "artifacts" / artifact_role / src_rel
if not artifact_path.is_file():
return None
try:
result = run_jinjaturtle(
jt_exe,
str(artifact_path),
role_name=role_name or artifact_role,
force_format=infer_other_formats(dest_path),
)
except Exception:
return None # nosec - best-effort template generation
template_rel = Path(src_rel).as_posix() + ".j2"
template_dst = Path(template_root) / template_rel
if overwrite_templates or not template_dst.exists():
template_dst.parent.mkdir(parents=True, exist_ok=True)
template_dst.write_text(result.template_text, encoding="utf-8")
return JinjifiedArtifact(
template_rel=template_rel,
template_text=result.template_text,
vars_text=result.vars_text,
context=yaml_load_mapping(result.vars_text),
)
def jinjify_managed_files(
bundle_dir: str | Path,
artifact_role: str,
template_root: str | Path,
managed_files: List[Dict[str, Any]],
*,
jt_exe: Optional[str],
jt_enabled: bool,
overwrite_templates: bool,
role_name: Optional[str] = None,
) -> Tuple[Set[str], str]:
"""Jinjify a list of managed files and return Ansible-style vars text.
The return shape intentionally matches the historical Ansible helper:
``(templated_src_rels, combined_vars_text)``. Salt uses
:func:`jinjify_artifact` directly because it stores variables as a context
map per managed file.
"""
templated: Set[str] = set()
vars_map: Dict[str, Any] = {}
for mf in managed_files:
dest_path = str(mf.get("path") or "")
src_rel = str(mf.get("src_rel") or "")
if not dest_path or not src_rel:
continue
converted = jinjify_artifact(
bundle_dir,
artifact_role,
src_rel,
dest_path,
template_root,
jt_exe=jt_exe,
jt_enabled=jt_enabled,
overwrite_templates=overwrite_templates,
role_name=role_name or artifact_role,
)
if converted is None:
continue
templated.add(src_rel)
if converted.context:
vars_map = _merge_mappings_overwrite(vars_map, converted.context)
if vars_map:
return templated, yaml_dump_mapping(vars_map, sort_keys=True)
return templated, ""
def infer_other_formats(dest_path: str) -> Optional[str]:
p = Path(dest_path)
name = p.name.lower()

View file

@ -594,12 +594,29 @@ def _render_role_class(prole: PuppetRole) -> str:
if not engine or not pull_ref:
continue
if engine == "docker":
attrs: List[Tuple[str, str]] = [("ensure", _pp_quote("present"))]
if image.get("image"):
attrs.append(("image", _pp_quote(image["image"])))
if image.get("image_digest"):
attrs.append(("image_digest", _pp_quote(image["image_digest"])))
_resource(lines, "docker::image", pull_ref, attrs)
pull_title = _state_title("docker-pull", pull_ref)
_resource(
lines,
"exec",
pull_title,
[
(
"command",
_pp_quote(
image.get("pull_cmd")
or _container_pull_cmd(engine, pull_ref)
),
),
(
"unless",
_pp_quote(
image.get("pull_unless")
or _container_exists_cmd(engine, pull_ref)
),
),
("path", "['/usr/bin', '/bin']"),
],
)
for alias in image.get("tag_aliases") or []:
tag_ref = str(alias.get("ref") or "").strip()
if not tag_ref:
@ -624,7 +641,7 @@ def _render_role_class(prole: PuppetRole) -> str:
),
),
("path", "['/usr/bin', '/bin']"),
("require", f"Docker::Image[{_pp_quote(pull_ref)}]"),
("require", f"Exec[{_pp_quote(pull_title)}]"),
],
)
elif engine == "podman":
@ -870,17 +887,17 @@ def _render_hiera_role_class(prole: PuppetRole) -> str:
"",
" $container_images.each |Integer $idx, Hash $image| {",
" if $image['engine'] == 'docker' and $image['pull_ref'] {",
" docker::image { $image['pull_ref']:",
" ensure => 'present',",
" image => $image['image'],",
" image_digest => $image['image_digest'],",
' exec { "enroll-docker-pull-${idx}":',
" command => $image['pull_cmd'],",
" unless => $image['pull_unless'],",
" path => ['/usr/bin', '/bin'],",
" }",
" $image['tag_aliases'].each |Integer $tag_idx, Hash $alias| {",
' exec { "enroll-docker-tag-${idx}-${tag_idx}":',
" command => $alias['tag_cmd'],",
" unless => $alias['tag_unless'],",
" path => ['/usr/bin', '/bin'],",
" require => Docker::Image[$image['pull_ref']],",
' require => Exec["enroll-docker-pull-${idx}"],',
" }",
" }",
" } elsif $image['engine'] == 'podman' and $image['pull_ref'] {",
@ -1014,13 +1031,6 @@ def _hiera_node_names(out: Path) -> List[str]:
def _write_metadata(module_dir: Path, module_name: str, prole: PuppetRole) -> None:
dependencies: List[Dict[str, str]] = []
if any(img.get("engine") == "docker" for img in prole.container_images):
dependencies.append(
{
"name": "puppetlabs-docker",
"version_requirement": ">= 8.0.0 < 15.0.0",
}
)
(module_dir / "metadata.json").write_text(
json.dumps(
@ -1130,13 +1140,12 @@ This Puppet target reuses the existing harvest state without changing harvesting
- Managed directories, files, and symlinks from harvested roles.
- Basic service enablement/running-state resources.
- `/etc/sysctl.d/99-enroll.conf` plus a refresh-only sysctl apply exec when present.
- Docker images by digest using the `puppetlabs-docker` module's `docker::image` defined type (you must pre-install it).
- Docker and Podman images by digest using guarded `exec` resources (`pull`/`tag` commands with `unless` checks).
- Podman images by digest using guarded `podman pull` / `podman tag` exec resources.
## Current limitations
- Flatpak, Snap, and live firewall runtime snapshots are listed as notes when present rather than rendered as Puppet resources.
- Docker image resources require the `puppetlabs-docker` module to be installed in the Puppet environment.
- JinjaTurtle templating is currently Ansible-oriented and is not applied to Puppet output.
- Review generated resources before applying them broadly across unlike hosts.

View file

@ -17,13 +17,9 @@ from .cm import (
role_order_key,
section_label_for_packages,
)
from .jinjaturtle import (
can_jinjify_path,
find_jinjaturtle_cmd,
infer_other_formats,
run_jinjaturtle,
)
from .jinjaturtle import jinjify_artifact, resolve_jinjaturtle_mode
from .state import inventory_packages_from_state, roles_from_state
from .yamlutil import yaml_dump_mapping, yaml_load_mapping_file
class SaltRole(CMModule):
@ -363,27 +359,6 @@ def _template_source_uri(module_name: str, tmpl_rel: str) -> str:
return f"salt://roles/{module_name}/templates/{tmpl_rel}"
def _yaml_load_mapping(text: str) -> Dict[str, Any]:
try:
obj = yaml.safe_load(text)
except Exception:
return {}
return obj if isinstance(obj, dict) else {}
def _resolve_jinjaturtle_mode(jinjaturtle: str) -> Tuple[Optional[str], bool]:
jt_exe = find_jinjaturtle_cmd()
if jinjaturtle not in {"auto", "on", "off"}:
raise ValueError("jinjaturtle must be one of: auto, on, off")
if jinjaturtle == "on":
if not jt_exe:
raise RuntimeError("jinjaturtle requested but not found on PATH")
return jt_exe, True
if jinjaturtle == "auto":
return jt_exe, jt_exe is not None
return jt_exe, False
def _jinjify_managed_file(
bundle_dir: str,
artifact_role: str,
@ -395,31 +370,19 @@ def _jinjify_managed_file(
jt_enabled: bool,
overwrite_templates: bool,
) -> Optional[Tuple[str, Dict[str, Any]]]:
if not (jt_enabled and jt_exe and can_jinjify_path(dest_path)):
return None
artifact_path = Path(bundle_dir) / "artifacts" / artifact_role / src_rel
if not artifact_path.is_file():
return None
try:
result = run_jinjaturtle(
jt_exe,
str(artifact_path),
role_name=artifact_role,
force_format=infer_other_formats(dest_path),
converted = jinjify_artifact(
bundle_dir,
artifact_role,
src_rel,
dest_path,
role_dir / "templates",
jt_exe=jt_exe,
jt_enabled=jt_enabled,
overwrite_templates=overwrite_templates,
)
except Exception:
return None # nosec - best-effort template generation
context = _yaml_load_mapping(result.vars_text)
tmpl_rel = Path(src_rel).as_posix() + ".j2"
tmpl_dst = role_dir / "templates" / tmpl_rel
if overwrite_templates or not tmpl_dst.exists():
tmpl_dst.parent.mkdir(parents=True, exist_ok=True)
tmpl_dst.write_text(result.template_text, encoding="utf-8")
return tmpl_rel, context
if converted is None:
return None
return converted.template_rel, converted.context
def _node_file_prefix(fqdn: str) -> str:
@ -1039,19 +1002,13 @@ def _render_pillar_role(srole: SaltRole) -> str:
def _write_yaml(path: Path, data: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
yaml.safe_dump(data, sort_keys=True, explicit_start=True),
yaml_dump_mapping(data, sort_keys=True, explicit_start=True),
encoding="utf-8",
)
def _load_yaml_mapping(path: Path) -> Dict[str, Any]:
if not path.exists():
return {}
try:
obj = yaml.safe_load(path.read_text(encoding="utf-8"))
except Exception:
return {}
return obj if isinstance(obj, dict) else {}
return yaml_load_mapping_file(path)
def _write_top(path: Path, mapping: Dict[str, List[str]]) -> None:
@ -1236,7 +1193,7 @@ class SaltManifestRenderer:
self.out_dir = out_dir
self.fqdn = fqdn
self.no_common_roles = no_common_roles
self.jt_exe, self.jt_enabled = _resolve_jinjaturtle_mode(jinjaturtle)
self.jt_exe, self.jt_enabled = resolve_jinjaturtle_mode(jinjaturtle)
def render(self) -> None:
state = SaltRole.load_state(self.bundle_dir)

56
enroll/yamlutil.py Normal file
View file

@ -0,0 +1,56 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, Mapping
import yaml
class IndentedSafeDumper(yaml.SafeDumper): # type: ignore[misc]
"""PyYAML dumper that indents sequences under mapping keys."""
def increase_indent(self, flow: bool = False, indentless: bool = False):
# PyYAML calls this method with an ``indentless`` keyword, so the
# parameter name must stay intact even though Enroll deliberately
# ignores its value to force indented block sequences.
return super().increase_indent(flow, False)
def yaml_load_mapping(text: str) -> Dict[str, Any]:
"""Load YAML text and return a mapping, or an empty mapping on failure."""
try:
obj = yaml.safe_load(text)
except Exception:
return {}
return obj if isinstance(obj, dict) else {}
def yaml_load_mapping_file(path: Path) -> Dict[str, Any]:
"""Load a YAML mapping from *path*, returning an empty mapping if absent."""
if not path.exists():
return {}
return yaml_load_mapping(path.read_text(encoding="utf-8"))
def yaml_dump_mapping(
obj: Mapping[str, Any],
*,
sort_keys: bool = True,
explicit_start: bool = False,
) -> str:
"""Dump a YAML mapping using Enroll's renderer-friendly formatting."""
return (
yaml.dump(
dict(obj),
Dumper=IndentedSafeDumper,
default_flow_style=False,
sort_keys=sort_keys,
indent=2,
allow_unicode=True,
explicit_start=explicit_start,
).rstrip()
+ "\n"
)

View file

@ -2,8 +2,7 @@ import json
from pathlib import Path
import enroll.manifest as manifest_mod
from enroll.ansible_renderer import context as ansible_context
from enroll.ansible_renderer import jinjaturtle as ansible_jt
import enroll.jinjaturtle as jinjaturtle_mod
from enroll.jinjaturtle import JinjifyResult
@ -108,7 +107,7 @@ def test_manifest_uses_jinjaturtle_templates_and_does_not_copy_raw(
# Pretend jinjaturtle exists.
monkeypatch.setattr(
ansible_context, "find_jinjaturtle_cmd", lambda: "/usr/bin/jinjaturtle"
jinjaturtle_mod, "find_jinjaturtle_cmd", lambda: "/usr/bin/jinjaturtle"
)
# Stub jinjaturtle output.
@ -121,7 +120,7 @@ def test_manifest_uses_jinjaturtle_templates_and_does_not_copy_raw(
vars_text="foo_key: 1\n",
)
monkeypatch.setattr(ansible_jt, "run_jinjaturtle", fake_run_jinjaturtle)
monkeypatch.setattr(jinjaturtle_mod, "run_jinjaturtle", fake_run_jinjaturtle)
manifest_mod.manifest(str(bundle), str(out), jinjaturtle="on")

View file

@ -7,11 +7,11 @@ import tarfile
import pytest
import enroll.manifest as manifest
from enroll.ansible_renderer import context as ansible_context
from enroll.ansible_renderer import jinjaturtle as ansible_jt
from enroll.ansible_renderer import layout as ansible_layout
from enroll.ansible_renderer import tasks as ansible_tasks
from enroll.ansible_renderer import yamlutil as ansible_yaml
import enroll.jinjaturtle as jinjaturtle_mod
from enroll import ansible as ansible_layout
from enroll import ansible as ansible_tasks
from enroll import ansible as ansible_yaml
from enroll import yamlutil as yaml_helpers
def _minimal_package_state(packages):
@ -829,7 +829,7 @@ def test_copy2_replace_overwrites_readonly_destination(tmp_path: Path):
import os
import stat
from enroll.ansible_renderer.layout import _copy2_replace
from enroll.ansible import _copy2_replace
src = tmp_path / "src"
dst = tmp_path / "dst"
@ -940,7 +940,7 @@ def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path):
def test_render_install_packages_tasks_uses_generic_package_provider():
from enroll.ansible_renderer.tasks import _render_install_packages_tasks
from enroll.ansible import _render_install_packages_tasks
txt = _render_install_packages_tasks("role", "role")
assert "ansible.builtin.package" in txt
@ -1078,15 +1078,6 @@ def test_manifest_orders_cron_and_logrotate_at_playbook_tail(tmp_path: Path):
assert "role: users" in roles
def test_yaml_helpers_fallback_when_yaml_unavailable(monkeypatch):
monkeypatch.setattr(ansible_yaml, "_try_yaml", lambda: None)
assert ansible_yaml._yaml_load_mapping("foo: 1\n") == {}
out = ansible_yaml._yaml_dump_mapping({"b": 2, "a": 1})
# Best-effort fallback is key: repr(value)
assert out.splitlines()[0].startswith("a: ")
assert out.endswith("\n")
def test_copy2_replace_makes_readonly_sources_user_writable(
monkeypatch, tmp_path: Path
):
@ -1214,13 +1205,13 @@ def test_manifest_applies_jinjaturtle_to_jinjifyable_managed_file(
__import__("json").dumps(state), encoding="utf-8"
)
monkeypatch.setattr(ansible_context, "find_jinjaturtle_cmd", lambda: "jinjaturtle")
monkeypatch.setattr(jinjaturtle_mod, "find_jinjaturtle_cmd", lambda: "jinjaturtle")
class _Res:
template_text = "key={{ foo }}\n"
vars_text = "foo: 123\n"
monkeypatch.setattr(ansible_jt, "run_jinjaturtle", lambda *a, **k: _Res())
monkeypatch.setattr(jinjaturtle_mod, "run_jinjaturtle", lambda *a, **k: _Res())
out_dir = tmp_path / "out"
manifest.manifest(str(bundle), str(out_dir), jinjaturtle="on")
@ -1335,13 +1326,11 @@ def test_manifest_writes_firewall_runtime_role(tmp_path: Path):
).exists()
def test_try_yaml_with_yaml_installed():
result = ansible_yaml._try_yaml()
# PyYAML should be installed for tests
if result is None:
pytest.skip("PyYAML not installed")
assert hasattr(result, "safe_load")
assert hasattr(result, "dump")
def test_yamlutil_uses_pyyaml():
import yaml
assert hasattr(yaml, "safe_load")
assert hasattr(yaml, "dump")
def test_yaml_load_mapping_with_yaml(tmp_path: Path):
@ -1353,48 +1342,48 @@ list:
- item1
- item2
"""
result = ansible_yaml._yaml_load_mapping(text)
result = yaml_helpers.yaml_load_mapping(text)
assert result["key1"] == "value1"
assert result["key2"]["nested"] == "value"
assert result["list"] == ["item1", "item2"]
def test_yaml_load_mapping_empty():
result = ansible_yaml._yaml_load_mapping("")
result = yaml_helpers.yaml_load_mapping("")
assert result == {}
def test_yaml_load_mapping_invalid():
result = ansible_yaml._yaml_load_mapping("invalid: yaml: :")
result = yaml_helpers.yaml_load_mapping("invalid: yaml: :")
assert result == {}
def test_yaml_load_mapping_not_dict():
result = ansible_yaml._yaml_load_mapping("- item1\n- item2")
result = yaml_helpers.yaml_load_mapping("- item1\n- item2")
assert result == {}
def test_yaml_load_mapping_none():
result = ansible_yaml._yaml_load_mapping("~")
result = yaml_helpers.yaml_load_mapping("~")
assert result == {}
def test_yaml_dump_mapping_with_yaml(tmp_path: Path):
obj = {"key1": "value1", "key2": 123}
result = ansible_yaml._yaml_dump_mapping(obj)
result = yaml_helpers.yaml_dump_mapping(obj)
assert "key1: value1" in result
assert "key2:" in result
def test_yaml_dump_mapping_empty():
result = ansible_yaml._yaml_dump_mapping({})
result = yaml_helpers.yaml_dump_mapping({})
# Empty dict produces '{}'
assert result.strip() == "{}"
def test_yaml_dump_mapping_with_nested(tmp_path: Path):
obj = {"key1": {"nested": "value"}}
result = ansible_yaml._yaml_dump_mapping(obj)
result = yaml_helpers.yaml_dump_mapping(obj)
assert "nested:" in result
@ -1758,7 +1747,7 @@ def test_users_role_only_creates_ssh_dir_when_managed_ssh_files_exist(tmp_path):
users_defaults_text = (out / "roles" / "users" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
users_defaults = ansible_yaml._yaml_load_mapping(users_defaults_text)
users_defaults = yaml_helpers.yaml_load_mapping(users_defaults_text)
users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)

View file

@ -1,7 +1,7 @@
from __future__ import annotations
from enroll.cm import CMModule
from enroll.ansible_renderer.model import AnsibleRole
from enroll.ansible import AnsibleRole
def test_ansible_role_extends_cm_module_and_normalises_service_snapshot():

View file

@ -675,7 +675,6 @@ def test_manifest_puppet_renders_container_images_static_and_hiera(tmp_path: Pat
bundle = tmp_path / "bundle"
out = tmp_path / "puppet"
_write_state(bundle, state)
manifest.manifest(str(bundle), str(out), target="puppet")
site_pp = (out / "manifests" / "site.pp").read_text(encoding="utf-8")
@ -683,8 +682,10 @@ def test_manifest_puppet_renders_container_images_static_and_hiera(tmp_path: Pat
pp = (out / "modules" / "container_images" / "manifests" / "init.pp").read_text(
encoding="utf-8"
)
assert "docker::image" in pp
assert "image_digest => 'sha256:" + "a" * 64 + "'" in pp
assert "docker::image" not in pp
assert "docker pull" in pp
assert "Docker::Image" not in pp
assert digest in pp
assert "docker tag" in pp
assert "podman pull" in pp
metadata = json.loads(
@ -692,9 +693,7 @@ def test_manifest_puppet_renders_container_images_static_and_hiera(tmp_path: Pat
encoding="utf-8"
)
)
assert metadata["dependencies"] == [
{"name": "puppetlabs-docker", "version_requirement": ">= 8.0.0 < 15.0.0"}
]
assert metadata["dependencies"] == []
fqdn_out = tmp_path / "puppet-fqdn"
manifest.manifest(str(bundle), str(fqdn_out), target="puppet", fqdn="node.example")
@ -706,7 +705,8 @@ def test_manifest_puppet_renders_container_images_static_and_hiera(tmp_path: Pat
fqdn_out / "modules" / "container_images" / "manifests" / "init.pp"
).read_text(encoding="utf-8")
assert "Array[Hash] $container_images = []" in fqdn_pp
assert "docker::image" in fqdn_pp
assert "docker::image" not in fqdn_pp
assert "enroll-docker-pull-${idx}" in fqdn_pp
assert "enroll-podman-pull-${idx}" in fqdn_pp
assert "$image['pull_cmd']" in fqdn_pp
assert "podman pull" in (

View file

@ -469,7 +469,7 @@ def test_manifest_salt_renders_container_images_in_single_and_fqdn_modes(
def test_manifest_salt_uses_jinjaturtle_templates(monkeypatch, tmp_path: Path):
from enroll import salt as salt_mod
import enroll.jinjaturtle as jinjaturtle_mod
from enroll.jinjaturtle import JinjifyResult
bundle = tmp_path / "bundle"
@ -479,9 +479,9 @@ def test_manifest_salt_uses_jinjaturtle_templates(monkeypatch, tmp_path: Path):
_write_state(bundle, state)
monkeypatch.setattr(
salt_mod, "find_jinjaturtle_cmd", lambda: "/usr/bin/jinjaturtle"
jinjaturtle_mod, "find_jinjaturtle_cmd", lambda: "/usr/bin/jinjaturtle"
)
monkeypatch.setattr(salt_mod, "can_jinjify_path", lambda _path: True)
monkeypatch.setattr(jinjaturtle_mod, "can_jinjify_path", lambda _path: True)
def fake_run_jinjaturtle(
jt_exe: str, src_path: str, *, role_name: str, force_format=None
@ -494,7 +494,7 @@ def test_manifest_salt_uses_jinjaturtle_templates(monkeypatch, tmp_path: Path):
vars_text="foo_setting: true\n",
)
monkeypatch.setattr(salt_mod, "run_jinjaturtle", fake_run_jinjaturtle)
monkeypatch.setattr(jinjaturtle_mod, "run_jinjaturtle", fake_run_jinjaturtle)
manifest.manifest(str(bundle), str(out), target="salt", jinjaturtle="on")