Standardise more into CMModule parent class for the 3 child renderers
Some checks failed
Lint / test (push) Waiting to run
CI / test (push) Has been cancelled

This commit is contained in:
Miguel Jacq 2026-06-20 12:19:04 +10:00
parent 7379587a28
commit 899724097e
Signed by: mig5
GPG key ID: 03906B4110AAD3B8
5 changed files with 1487 additions and 2251 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,8 +1,19 @@
from __future__ import annotations
import shlex
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Set
from typing import (
Any,
Callable,
ClassVar,
Dict,
Iterable,
Iterator,
List,
Mapping,
Set,
)
from .state import load_state, state_path, write_state
@ -25,9 +36,18 @@ class CMModule:
files: Dict[str, Dict[str, Any]] = field(default_factory=dict)
links: Dict[str, Dict[str, Any]] = field(default_factory=dict)
services: Dict[str, Dict[str, Any]] = field(default_factory=dict)
firewall_runtime: Dict[str, Any] = field(default_factory=dict)
notes: List[str] = field(default_factory=list)
def has_resources(self) -> bool:
managed_owner_attr: ClassVar[str] = "owner"
firewall_runtime_dir: ClassVar[str] = "/etc/enroll/firewall"
firewall_runtime_artifacts: ClassVar[tuple[tuple[str, str, str], ...]] = (
("ipset_save", "ipset.save", "0600"),
("iptables_v4_save", "iptables.v4", "0600"),
("iptables_v6_save", "iptables.v6", "0600"),
)
def has_core_resources(self) -> bool:
return bool(
self.packages
or self.groups
@ -36,9 +56,20 @@ class CMModule:
or self.files
or self.links
or self.services
or self.firewall_runtime
or self.notes
)
def has_resources(self) -> bool:
return self.has_core_resources()
def has_resources_or_attrs(self, *attrs: str) -> bool:
"""Return true if core resources or named renderer extras are present."""
return self.has_core_resources() or any(
bool(getattr(self, attr, None)) for attr in attrs
)
@staticmethod
def state_path(bundle_dir: str | Path) -> Path:
"""Return the canonical state.json path for a harvest bundle."""
@ -142,6 +173,412 @@ class CMModule:
def add_snapshot_notes(self, snap: Dict[str, Any]) -> None:
self.notes.extend(str(n) for n in (snap.get("notes", []) or []))
@staticmethod
def package_name_from_snapshot(snap: Dict[str, Any]) -> str:
return str(snap.get("package") or "").strip()
@staticmethod
def package_names_from_snapshot(snap: Dict[str, Any]) -> Iterator[str]:
for pkg in snap.get("packages", []) or []:
pkg_s = str(pkg or "").strip()
if pkg_s:
yield pkg_s
def add_package_snapshot(self, snap: Dict[str, Any]) -> None:
pkg = self.package_name_from_snapshot(snap)
if pkg:
self.packages.add(pkg)
def add_service_packages_from_snapshot(self, snap: Dict[str, Any]) -> None:
self.packages.update(self.package_names_from_snapshot(snap))
def service_unit_from_snapshot(self, snap: Dict[str, Any]) -> str:
return str(snap.get("unit") or "").strip()
def service_enabled_from_snapshot(self, snap: Dict[str, Any]) -> bool:
unit_file_state = str(snap.get("unit_file_state") or "")
return unit_file_state in ("enabled", "enabled-runtime")
def service_state_from_snapshot(
self,
snap: Dict[str, Any],
*,
running: str,
stopped: str,
) -> str:
return running if snap.get("active_state") == "active" else stopped
def add_service_snapshot_state(
self,
snap: Dict[str, Any],
*,
state_key: str,
running: str,
stopped: str,
include_manage: bool = False,
) -> None:
"""Add the common systemd service parts, parameterised per renderer."""
self.add_service_packages_from_snapshot(snap)
unit = self.service_unit_from_snapshot(snap)
if not unit:
return
data: Dict[str, Any] = {
"name": unit,
state_key: self.service_state_from_snapshot(
snap, running=running, stopped=stopped
),
"enable": self.service_enabled_from_snapshot(snap),
}
if include_manage:
data["manage"] = True
self.services[unit] = data
@staticmethod
def normalise_flatpak_item(
item: Any,
*,
method: str,
user: str | None = None,
home: str | None = None,
) -> Dict[str, Any]:
if isinstance(item, dict):
out = dict(item)
elif isinstance(item, str):
out = {"name": item}
else:
out = {"name": str(item)}
out["method"] = str(out.get("method") or method or "system").strip() or "system"
if user and not out.get("user"):
out["user"] = user
if home and not out.get("home"):
out["home"] = home
ref = str(out.get("ref") or "").strip()
if ref and not out.get("name"):
out["name"] = ref.rsplit("/", 1)[-1]
name = str(out.get("name") or out.get("app_id") or "").strip()
if name:
out["name"] = name
remote = str(out.get("remote") or "").strip()
if remote:
out["remote"] = remote
branch = str(out.get("branch") or out.get("origin") or "").strip()
if branch:
out["branch"] = branch
if ref:
out["ref"] = ref
return out
@staticmethod
def normalise_flatpak_remote(item: Any) -> Dict[str, Any]:
if isinstance(item, dict):
out = dict(item)
else:
out = {"name": str(item)}
name = str(out.get("name") or out.get("remote") or "").strip()
url = str(out.get("url") or out.get("from_url") or "").strip()
method = (
str(out.get("method") or out.get("scope") or "system").strip() or "system"
)
if name:
out["name"] = name
if url:
out["url"] = url
out["method"] = "user" if method == "user" else "system"
return out
@staticmethod
def normalise_snap_item(item: Any) -> Dict[str, Any]:
if isinstance(item, dict):
out = dict(item)
elif isinstance(item, str):
out = {"name": item}
else:
out = {"name": str(item)}
name = str(out.get("name") or "").strip()
if name:
out["name"] = name
channel = str(out.get("tracking") or out.get("channel") or "").strip()
if channel:
out["channel"] = channel
raw_notes = out.get("notes") or []
if isinstance(raw_notes, str):
raw_notes = [raw_notes]
notes = [str(note).lower() for note in raw_notes]
confinement = str(out.get("confinement") or "").strip().lower()
out["classic"] = bool(
out.get("classic")
or confinement == "classic"
or any("classic" in note for note in notes)
)
out["devmode"] = bool(
out.get("devmode")
or any("devmode" in note or "dev mode" in note for note in notes)
)
out["dangerous"] = bool(
out.get("dangerous") or any("dangerous" in note for note in notes)
)
revision = str(out.get("revision") or "").strip()
if revision and not channel:
out["revision"] = revision
return out
def prepare_flatpak_remote(self, item: Dict[str, Any]) -> Dict[str, Any]:
raise NotImplementedError
def prepare_flatpak_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
raise NotImplementedError
def prepare_snap_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
raise NotImplementedError
@staticmethod
def user_records_from_snapshot(snap: Dict[str, Any]) -> List[Dict[str, Any]]:
records: List[Dict[str, Any]] = []
for raw in snap.get("users", []) or []:
if not isinstance(raw, dict):
continue
name = str(raw.get("name") or "").strip()
if not name:
continue
primary_group = str(raw.get("primary_group") or name).strip()
supplementary = sorted(
{
str(group).strip()
for group in (raw.get("supplementary_groups") or [])
if str(group).strip()
}
)
records.append(
{
"name": name,
"uid": raw.get("uid"),
"gid": raw.get("gid"),
"primary_group": primary_group,
"home": raw.get("home") or f"/home/{name}",
"shell": raw.get("shell"),
"gecos": raw.get("gecos"),
"supplementary_groups": supplementary,
}
)
return records
@staticmethod
def user_group_names_from_records(records: Iterable[Mapping[str, Any]]) -> Set[str]:
groups: Set[str] = set()
for record in records:
primary_group = str(record.get("primary_group") or "").strip()
if primary_group:
groups.add(primary_group)
groups.update(
str(group).strip()
for group in (record.get("supplementary_groups") or [])
if str(group).strip()
)
return groups
@staticmethod
def package_service_entries(
roles: Mapping[str, Any],
inventory_packages: Mapping[str, Any],
*,
use_common_roles: bool,
) -> Iterator[Dict[str, Any]]:
for svc in roles.get("services", []) or []:
if not isinstance(svc, dict):
continue
own_label = str(svc.get("role_name") or svc.get("unit") or "service")
role_label = (
section_label_for_packages(
svc.get("packages", []) or [], inventory_packages
)
if use_common_roles
else own_label
)
yield {"kind": "service", "snapshot": svc, "role_label": role_label}
for pkg in roles.get("packages", []) or []:
if not isinstance(pkg, dict):
continue
own_label = str(pkg.get("role_name") or pkg.get("package") or "package")
role_label = (
package_section_label(pkg, inventory_packages)
if use_common_roles
else own_label
)
yield {"kind": "package", "snapshot": pkg, "role_label": role_label}
def add_user_flatpaks_snapshot(self, snap: Dict[str, Any]) -> None:
home_by_user = {
str(u.get("name")): str(u.get("home") or "")
for u in (snap.get("users", []) or [])
if isinstance(u, dict) and u.get("name")
}
for remote in snap.get("user_flatpak_remotes", []) or []:
item = self.normalise_flatpak_remote(remote)
user = str(item.get("user") or "").strip()
if user and not item.get("home"):
item["home"] = home_by_user.get(user) or f"/home/{user}"
if item.get("method") == "user" and item.get("name") and item.get("url"):
self.flatpak_remotes.append( # type: ignore[attr-defined]
self.prepare_flatpak_remote(item)
)
for uname, flatpaks in (snap.get("user_flatpaks", {}) or {}).items():
user = str(uname)
for fp in flatpaks or []:
item = self.normalise_flatpak_item(
fp, method="user", user=user, home=home_by_user.get(user) or None
)
if item.get("name"):
self.flatpaks.append( # type: ignore[attr-defined]
self.prepare_flatpak_item(item)
)
def add_flatpak_snapshot(self, snap: Dict[str, Any]) -> None:
for remote in snap.get("remotes", []) or []:
item = self.normalise_flatpak_remote(remote)
if item.get("name") and item.get("url"):
self.flatpak_remotes.append( # type: ignore[attr-defined]
self.prepare_flatpak_remote(item)
)
for fp in snap.get("system_flatpaks", []) or []:
item = self.normalise_flatpak_item(fp, method="system")
if item.get("name"):
self.flatpaks.append( # type: ignore[attr-defined]
self.prepare_flatpak_item(item)
)
self.add_snapshot_notes(snap)
def add_snap_snapshot(self, snap: Dict[str, Any]) -> None:
for raw in snap.get("system_snaps", []) or []:
item = self.normalise_snap_item(raw)
if item.get("name"):
self.snaps.append( # type: ignore[attr-defined]
self.prepare_snap_item(item)
)
self.add_snapshot_notes(snap)
def firewall_runtime_snapshot_has_artifacts(self, snap: Mapping[str, Any]) -> bool:
return any(
str(snap.get(key) or "").strip()
for key, _dest, _mode in self.firewall_runtime_artifacts
)
def firewall_runtime_source_refs(self, snap: Mapping[str, Any]) -> Dict[str, str]:
return {
key: str(snap.get(key) or "").strip()
for key, _dest, _mode in self.firewall_runtime_artifacts
if str(snap.get(key) or "").strip()
}
def firewall_runtime_dest_path(self, dest_name: str) -> str:
return f"{self.firewall_runtime_dir}/{dest_name}"
def firewall_runtime_ipset_sets(self, snap: Mapping[str, Any]) -> List[str]:
return [
str(x).strip() for x in (snap.get("ipset_sets") or []) if str(x).strip()
]
@staticmethod
def shell_quote(value: Any) -> str:
return shlex.quote(str(value or ""))
def firewall_ipset_restore_cmd(self, path: str, sets: List[str]) -> str:
flush_parts = [f"ipset flush {self.shell_quote(name)} || true" for name in sets]
flush = "; ".join(flush_parts)
restore = f"ipset restore -exist < {self.shell_quote(path)}"
if flush:
return f"/bin/sh -c {self.shell_quote(flush + '; ' + restore)}"
return f"/bin/sh -c {self.shell_quote(restore)}"
def firewall_runtime_commands(self, runtime: Mapping[str, Any]) -> Dict[str, Any]:
out: Dict[str, Any] = {}
ipset_path = str(runtime.get("ipset_save") or "")
if ipset_path:
sets = [str(x) for x in (runtime.get("ipset_sets") or []) if str(x)]
out["ipset_restore_cmd"] = self.firewall_ipset_restore_cmd(ipset_path, sets)
ipt4_path = str(runtime.get("iptables_v4_save") or "")
if ipt4_path:
out["iptables_v4_restore_cmd"] = (
f"iptables-restore {self.shell_quote(ipt4_path)}"
)
ipt6_path = str(runtime.get("iptables_v6_save") or "")
if ipt6_path:
out["iptables_v6_restore_cmd"] = (
f"ip6tables-restore {self.shell_quote(ipt6_path)}"
)
return out
def _managed_owner_attrs(self, owner: Any) -> Dict[str, Any]:
return {self.managed_owner_attr: owner or "root"}
def add_firewall_runtime_snapshot(
self,
snap: Dict[str, Any],
*,
bundle_dir: str,
artifact_role: str,
files_dir: Path,
copy_artifact: Callable[..., str | None],
source_uri: Callable[[str, str], str],
file_prefix: str | None = None,
dir_attrs: Mapping[str, Any] | None = None,
file_attrs: Mapping[str, Any] | None = None,
) -> None:
"""Add captured live firewall state using renderer-supplied file hooks."""
self.add_service_packages_from_snapshot(snap)
attrs: Dict[str, Any] = {
**self._managed_owner_attrs("root"),
"group": "root",
"mode": "0750",
"reason": "firewall_runtime",
}
if dir_attrs:
attrs.update(dir_attrs)
self.add_managed_dir(self.firewall_runtime_dir, **attrs)
runtime: Dict[str, Any] = {}
for key, dest_name, mode in self.firewall_runtime_artifacts:
src_rel = str(snap.get(key) or "").strip()
if not src_rel:
continue
role_rel = copy_artifact(
bundle_dir,
artifact_role,
src_rel,
files_dir,
dst_prefix=file_prefix,
)
if not role_rel:
self.notes.append(
f"Firewall runtime artifact {src_rel!r} was referenced but not found."
)
continue
file_data: Dict[str, Any] = {
**self._managed_owner_attrs("root"),
"group": "root",
"mode": mode,
"source": source_uri(self.module_name, role_rel),
"reason": "firewall_runtime",
}
if file_attrs:
file_data.update(file_attrs)
dest = self.firewall_runtime_dest_path(dest_name)
self.add_managed_file(dest, **file_data)
runtime[key] = dest
ipset_sets = self.firewall_runtime_ipset_sets(snap)
if ipset_sets:
runtime["ipset_sets"] = ipset_sets
if runtime:
runtime.update(self.firewall_runtime_commands(runtime))
self.firewall_runtime.update(runtime)
self.add_snapshot_notes(snap)
def remove_directory_resource_conflicts(self) -> None:
for path in set(self.files) | set(self.links):
self.dirs.pop(path, None)
@ -204,6 +641,55 @@ def role_order_key(role: str) -> tuple[int, str]:
return (priority.get(role, 50), role)
def markdown_list(items: Iterable[Any], *, empty: str = "None.") -> str:
values = [str(item) for item in items if str(item)]
return "\n".join(f"- {item}" for item in values) or f"- {empty}"
def path_reason_lines(
items: Iterable[Mapping[str, Any]], *, source_key: str = "path"
) -> List[str]:
lines: List[str] = []
for item in items or []:
path = str(item.get(source_key) or "")
if not path:
continue
reason = str(item.get("reason") or "")
lines.append(f"{path} ({reason})" if reason else path)
return lines
def iter_role_snapshots(roles: Mapping[str, Any]) -> Iterator[Mapping[str, Any]]:
for value in roles.values():
if isinstance(value, list):
for item in value:
if isinstance(item, Mapping):
yield item
elif isinstance(value, Mapping):
yield value
def snapshot_note_lines(roles: Mapping[str, Any]) -> List[str]:
notes: List[str] = []
for snap in iter_role_snapshots(roles):
source = str(
snap.get("role_name") or snap.get("unit") or snap.get("package") or "role"
)
notes.extend(f"`{source}`: {note}" for note in snap.get("notes", []) or [])
return notes
def snapshot_excluded_lines(roles: Mapping[str, Any]) -> List[str]:
excluded: List[str] = []
for snap in iter_role_snapshots(roles):
source = str(
snap.get("role_name") or snap.get("unit") or snap.get("package") or "role"
)
for line in path_reason_lines(snap.get("excluded", []) or []):
excluded.append(f"`{source}`: {line}")
return excluded
def _drop_duplicate_set_items(
module: CMModule,
values: Set[str],

View file

@ -12,10 +12,9 @@ import yaml
from .cm import (
CMModule,
package_section_label,
resolve_catalog_conflicts,
role_order_key,
section_label_for_packages,
markdown_list,
)
from .state import inventory_packages_from_state, roles_from_state
@ -32,108 +31,43 @@ class PuppetRole(CMModule):
self.flatpak_remotes: List[Dict[str, Any]] = []
self.flatpaks: List[Dict[str, Any]] = []
self.snaps: List[Dict[str, Any]] = []
self.firewall_runtime: Dict[str, Any] = {}
def has_resources(self) -> bool:
return (
super().has_resources()
or bool(self.container_images)
or bool(self.flatpak_remotes)
or bool(self.flatpaks)
or bool(self.snaps)
or bool(self.firewall_runtime)
return self.has_resources_or_attrs(
"container_images", "flatpak_remotes", "flatpaks", "snaps"
)
def add_package_snapshot(self, snap: Dict[str, Any]) -> None:
pkg = str(snap.get("package") or "").strip()
if pkg:
self.packages.add(pkg)
def add_service_snapshot(self, snap: Dict[str, Any]) -> None:
for pkg in snap.get("packages", []) or []:
pkg_s = str(pkg or "").strip()
if pkg_s:
self.packages.add(pkg_s)
unit = str(snap.get("unit") or "").strip()
if unit:
unit_file_state = str(snap.get("unit_file_state") or "")
self.services[unit] = {
"name": unit,
"ensure": (
"running" if snap.get("active_state") == "active" else "stopped"
),
"enable": unit_file_state in ("enabled", "enabled-runtime"),
}
self.add_service_snapshot_state(
snap, state_key="ensure", running="running", stopped="stopped"
)
def add_users_snapshot(self, snap: Dict[str, Any]) -> None:
for u in snap.get("users", []) or []:
if not isinstance(u, dict):
continue
name = str(u.get("name") or "").strip()
if not name:
continue
primary_group = str(u.get("primary_group") or name).strip()
if primary_group:
self.groups.add(primary_group)
supplementary = sorted(
{
str(g).strip()
for g in (u.get("supplementary_groups") or [])
if str(g).strip()
}
)
self.groups.update(supplementary)
records = self.user_records_from_snapshot(snap)
self.groups.update(self.user_group_names_from_records(records))
for record in records:
name = str(record.get("name") or "")
self.users[name] = {
"name": name,
"uid": u.get("uid"),
"gid": u.get("gid"),
"primary_group": primary_group or None,
"home": u.get("home") or f"/home/{name}",
"shell": u.get("shell"),
"gecos": u.get("gecos"),
"supplementary_groups": supplementary,
"uid": record.get("uid"),
"gid": record.get("gid"),
"primary_group": record.get("primary_group") or None,
"home": record.get("home"),
"shell": record.get("shell"),
"gecos": record.get("gecos"),
"supplementary_groups": record.get("supplementary_groups") or [],
}
home_by_user = {
str(u.get("name")): str(u.get("home") or "")
for u in (snap.get("users", []) or [])
if isinstance(u, dict) and u.get("name")
}
for remote in snap.get("user_flatpak_remotes", []) or []:
item = _normalise_flatpak_remote(remote)
user = str(item.get("user") or "").strip()
if user and not item.get("home"):
item["home"] = home_by_user.get(user) or f"/home/{user}"
if item.get("method") == "user" and item.get("name") and item.get("url"):
self.flatpak_remotes.append(_prepare_flatpak_remote(item))
for uname, flatpaks in (snap.get("user_flatpaks", {}) or {}).items():
user = str(uname)
for fp in flatpaks or []:
item = _normalise_flatpak_item(
fp, method="user", user=user, home=home_by_user.get(user) or None
)
if item.get("name"):
self.flatpaks.append(_prepare_flatpak_item(item))
self.add_user_flatpaks_snapshot(snap)
def add_flatpak_snapshot(self, snap: Dict[str, Any]) -> None:
for remote in snap.get("remotes", []) or []:
item = _normalise_flatpak_remote(remote)
if item.get("name") and item.get("url"):
self.flatpak_remotes.append(_prepare_flatpak_remote(item))
for fp in snap.get("system_flatpaks", []) or []:
item = _normalise_flatpak_item(fp, method="system")
if item.get("name"):
self.flatpaks.append(_prepare_flatpak_item(item))
for note in snap.get("notes", []) or []:
self.notes.append(str(note))
def prepare_flatpak_remote(self, item: Dict[str, Any]) -> Dict[str, Any]:
return _prepare_flatpak_remote(item)
def add_snap_snapshot(self, snap: Dict[str, Any]) -> None:
for raw in snap.get("system_snaps", []) or []:
item = _normalise_snap_item(raw)
if item.get("name"):
self.snaps.append(_prepare_snap_item(item))
for note in snap.get("notes", []) or []:
self.notes.append(str(note))
def prepare_flatpak_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
return _prepare_flatpak_item(item)
def prepare_snap_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
return _prepare_snap_item(item)
def add_firewall_runtime_snapshot(
self,
@ -144,58 +78,16 @@ class PuppetRole(CMModule):
module_files_dir: Path,
file_prefix: Optional[str] = None,
) -> None:
self.packages.update(
str(p).strip() for p in (snap.get("packages") or []) if str(p).strip()
super().add_firewall_runtime_snapshot(
snap,
bundle_dir=bundle_dir,
artifact_role=artifact_role,
files_dir=module_files_dir,
copy_artifact=_copy_artifact,
source_uri=_source_uri,
file_prefix=file_prefix,
dir_attrs={"require": "File['/etc/enroll']"},
)
self.add_managed_dir(
"/etc/enroll/firewall",
owner="root",
group="root",
mode="0750",
require="File['/etc/enroll']",
reason="firewall_runtime",
)
runtime: Dict[str, Any] = {}
for key, dest_name, mode in (
("ipset_save", "ipset.save", "0600"),
("iptables_v4_save", "iptables.v4", "0600"),
("iptables_v6_save", "iptables.v6", "0600"),
):
src_rel = str(snap.get(key) or "").strip()
if not src_rel:
continue
role_rel = _copy_artifact(
bundle_dir,
artifact_role,
src_rel,
module_files_dir,
dst_prefix=file_prefix,
)
if not role_rel:
self.notes.append(
f"Firewall runtime artifact {src_rel!r} was referenced but not found."
)
continue
dest = f"/etc/enroll/firewall/{dest_name}"
self.add_managed_file(
dest,
owner="root",
group="root",
mode=mode,
source=_source_uri(self.module_name, role_rel),
reason="firewall_runtime",
)
runtime[key] = dest
ipset_sets = [
str(x).strip() for x in (snap.get("ipset_sets") or []) if str(x).strip()
]
if ipset_sets:
runtime["ipset_sets"] = ipset_sets
if runtime:
runtime.update(_firewall_runtime_commands(runtime))
self.firewall_runtime.update(runtime)
for note in snap.get("notes", []) or []:
self.notes.append(str(note))
def add_container_images_snapshot(self, snap: Dict[str, Any]) -> None:
for raw in snap.get("images", []) or []:
@ -374,70 +266,6 @@ def _container_tag_cmd(engine: str, pull_ref: str, tag_ref: str) -> str:
return f"{engine} tag {_shell_quote(pull_ref)} {_shell_quote(tag_ref)}"
def _normalise_flatpak_item(
item: Dict[str, Any],
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> Dict[str, Any]:
out = dict(item)
out["method"] = str(out.get("method") or method or "system").strip() or "system"
if user and not out.get("user"):
out["user"] = user
if home and not out.get("home"):
out["home"] = home
ref = str(out.get("ref") or "").strip()
if ref and not out.get("name"):
out["name"] = ref.rsplit("/", 1)[-1]
name = str(out.get("name") or out.get("app_id") or "").strip()
if name:
out["name"] = name
remote = str(out.get("remote") or "").strip()
if remote:
out["remote"] = remote
branch = str(out.get("branch") or out.get("origin") or "").strip()
if branch:
out["branch"] = branch
if ref:
out["ref"] = ref
return out
def _normalise_flatpak_remote(item: Dict[str, Any]) -> Dict[str, Any]:
out = dict(item)
name = str(out.get("name") or out.get("remote") or "").strip()
url = str(out.get("url") or out.get("from_url") or "").strip()
method = str(out.get("method") or out.get("scope") or "system").strip() or "system"
if name:
out["name"] = name
if url:
out["url"] = url
out["method"] = "user" if method == "user" else "system"
return out
def _normalise_snap_item(item: Dict[str, Any]) -> Dict[str, Any]:
out = dict(item)
name = str(out.get("name") or "").strip()
if name:
out["name"] = name
channel = str(out.get("tracking") or out.get("channel") or "").strip()
if channel:
out["channel"] = channel
notes = [str(note).lower() for note in (out.get("notes") or [])]
confinement = str(out.get("confinement") or "").strip().lower()
out["classic"] = confinement == "classic" or any(
"classic" in note for note in notes
)
out["devmode"] = any("devmode" in note or "dev mode" in note for note in notes)
out["dangerous"] = any("dangerous" in note for note in notes)
revision = str(out.get("revision") or "").strip()
if revision and not channel:
out["revision"] = revision
return out
def _flatpak_scope(item: Dict[str, Any]) -> str:
return "--user" if str(item.get("method") or "system") == "user" else "--system"
@ -596,30 +424,6 @@ def _state_title(prefix: str, value: Any) -> str:
return f"enroll-{prefix}-{safe}"
def _firewall_ipset_restore_cmd(path: str, sets: List[str]) -> str:
flush_parts = [f"ipset flush {_shell_quote(name)} || true" for name in sets]
flush = "; ".join(flush_parts)
restore = f"ipset restore -exist < {_shell_quote(path)}"
if flush:
return f"/bin/sh -c {_shell_quote(flush + '; ' + restore)}"
return f"/bin/sh -c {_shell_quote(restore)}"
def _firewall_runtime_commands(runtime: Dict[str, Any]) -> Dict[str, Any]:
out: Dict[str, Any] = {}
ipset_path = str(runtime.get("ipset_save") or "")
if ipset_path:
sets = [str(x) for x in (runtime.get("ipset_sets") or []) if str(x)]
out["ipset_restore_cmd"] = _firewall_ipset_restore_cmd(ipset_path, sets)
ipt4_path = str(runtime.get("iptables_v4_save") or "")
if ipt4_path:
out["iptables_v4_restore_cmd"] = f"iptables-restore {_shell_quote(ipt4_path)}"
ipt6_path = str(runtime.get("iptables_v6_save") or "")
if ipt6_path:
out["iptables_v6_restore_cmd"] = f"ip6tables-restore {_shell_quote(ipt6_path)}"
return out
def _render_firewall_runtime_execs(
lines: List[str], runtime: Dict[str, Any], *, indent: str = " "
) -> None:
@ -757,57 +561,29 @@ def _collect_puppet_roles(
file_prefix=node_file_prefix,
)
for svc in roles.get("services", []) or []:
if not isinstance(svc, dict):
continue
original_role_name = _puppet_name(
str(svc.get("role_name") or svc.get("unit") or "service"),
fallback="service",
for entry in CMModule.package_service_entries(
roles, inventory_packages, use_common_roles=use_common_modules
):
snap = entry.get("snapshot") or {}
kind = str(entry.get("kind") or "package")
fallback = "service" if kind == "service" else "package"
source_label = str(
snap.get("role_name") or snap.get("unit") or snap.get("package") or fallback
)
original_role_name = _puppet_name(source_label, fallback=fallback)
role_name = _puppet_name(
str(entry.get("role_label") or source_label),
fallback="package_group" if use_common_modules else fallback,
)
if use_common_modules:
role_name = _puppet_name(
section_label_for_packages(
[
str(p).strip()
for p in (svc.get("packages") or [])
if str(p).strip()
],
inventory_packages,
),
fallback="package_group",
)
else:
role_name = original_role_name
prole = ensure_role(role_name)
prole.add_service_snapshot(svc)
prole.add_managed_content(
svc,
bundle_dir=bundle_dir,
artifact_role=str(svc.get("role_name") or original_role_name),
module_files_dir=modules_dir / prole.module_name / "files",
file_prefix=node_file_prefix,
)
for pkg in roles.get("packages", []) or []:
if not isinstance(pkg, dict):
continue
original_role_name = _puppet_name(
str(pkg.get("role_name") or pkg.get("package") or "package"),
fallback="package",
)
if use_common_modules:
role_name = _puppet_name(
package_section_label(pkg, inventory_packages),
fallback="package_group",
)
if kind == "service":
prole.add_service_snapshot(snap)
else:
role_name = original_role_name
prole = ensure_role(role_name)
prole.add_package_snapshot(pkg)
prole.add_package_snapshot(snap)
prole.add_managed_content(
pkg,
snap,
bundle_dir=bundle_dir,
artifact_role=str(pkg.get("role_name") or original_role_name),
artifact_role=str(snap.get("role_name") or original_role_name),
module_files_dir=modules_dir / prole.module_name / "files",
file_prefix=node_file_prefix,
)
@ -1577,19 +1353,13 @@ def _render_readme(
host = state.get("host", {}) if isinstance(state.get("host"), dict) else {}
hostname = host.get("hostname") or "unknown"
hiera_mode = bool(fqdn)
role_lines = (
"\n".join(
f"- `{r.module_name}` from Enroll role `{r.role_name}`"
for r in puppet_roles
)
or "- None."
role_lines = markdown_list(
f"`{r.module_name}` from Enroll role `{r.role_name}`" for r in puppet_roles
)
node_lines = markdown_list(f"`{n}`" for n in (node_names or []))
notes_text = markdown_list(
f"`{r.module_name}`: {note}" for r in puppet_roles for note in r.notes
)
node_lines = "\n".join(f"- `{n}`" for n in (node_names or [])) or "- None."
notes: List[str] = []
for r in puppet_roles:
for note in r.notes:
notes.append(f"`{r.module_name}`: {note}")
notes_text = "\n".join(f"- {n}" for n in notes) or "- None."
if hiera_mode:
layout = f"""- `manifests/site.pp` declares node blocks and includes classes listed in Hiera key `enroll::classes`.
- `hiera.yaml` configures per-node lookup from `data/nodes/%{{trusted.certname}}.yaml` with a fallback to `data/common.yaml`.
@ -1599,11 +1369,10 @@ def _render_readme(
apply = f"""Run from this generated output directory, passing the node certname so Hiera selects the right node data:
```bash
sudo puppet apply --modulepath ./modules --hiera_config ./hiera.yaml --certname {fqdn} manifests/site.pp --noop
sudo puppet apply --modulepath ./modules --hiera_config ./hiera.yaml --certname {fqdn} manifests/site.pp --noop --test
```
If you depend on other pre-installed Puppet modules (such as for supporting Docker image version enforcement, which Enroll may
have harvested information on), you may need to pass in other modulepaths as well, e.g:
If you depend on other pre-installed Puppet modules, you may need to pass in other modulepaths as well, e.g:
```bash
sudo puppet apply --modulepath ./modules:/etc/puppet/code/modules --hiera_config ./hiera.yaml --certname {fqdn} manifests/site.pp --noop
@ -1618,11 +1387,10 @@ For Puppet agent/control-repo use, place this output where `hiera.yaml`, `data/`
apply = """Run from this generated output directory so Puppet can find `./modules`, or pass an absolute module path:
```bash
sudo puppet apply --modulepath ./modules manifests/site.pp --noop
sudo puppet apply --modulepath ./modules manifests/site.pp --noop --test
```
If you depend on other pre-installed Puppet modules (such as for supporting Docker image version enforcement, which Enroll may
have harvested information on), you may need to pass in other modulepaths as well, e.g:
If you depend on other pre-installed Puppet modules, you may need to pass in other modulepaths as well, e.g:
```bash
sudo puppet apply --modulepath ./modules:/etc/puppet/code/modules manifests/site.pp --noop
@ -1661,7 +1429,7 @@ This Puppet target reuses the existing harvest state without changing harvesting
## Current limitations
- JinjaTurtle templating is currently Ansible-oriented and is not applied to Puppet output.
- JinjaTurtle templating is currently Ansible/Salt-oriented and is not applied to Puppet output - there are no erb templates, just raw files.
- Review generated resources before applying them broadly across unlike hosts.
## Notes

View file

@ -12,10 +12,9 @@ import yaml
from .cm import (
CMModule,
package_section_label,
resolve_catalog_conflicts,
role_order_key,
section_label_for_packages,
markdown_list,
)
from .jinjaturtle import jinjify_artifact, resolve_jinjaturtle_mode
from .state import inventory_packages_from_state, roles_from_state
@ -25,6 +24,8 @@ from .yamlutil import yaml_dump_mapping, yaml_load_mapping_file
class SaltRole(CMModule):
"""Salt-specific view of a renderer-neutral CMModule."""
managed_owner_attr = "user"
def __init__(self, role_name: str) -> None:
super().__init__(
role_name=role_name,
@ -34,110 +35,47 @@ class SaltRole(CMModule):
self.flatpak_remotes: List[Dict[str, Any]] = []
self.flatpaks: List[Dict[str, Any]] = []
self.snaps: List[Dict[str, Any]] = []
self.firewall_runtime: Dict[str, Any] = {}
def has_resources(self) -> bool:
return (
super().has_resources()
or bool(self.container_images)
or bool(self.flatpak_remotes)
or bool(self.flatpaks)
or bool(self.snaps)
or bool(self.firewall_runtime)
return self.has_resources_or_attrs(
"container_images", "flatpak_remotes", "flatpaks", "snaps"
)
@property
def sls_name(self) -> str:
return f"roles.{self.module_name}"
def add_package_snapshot(self, snap: Dict[str, Any]) -> None:
pkg = str(snap.get("package") or "").strip()
if pkg:
self.packages.add(pkg)
def add_service_snapshot(self, snap: Dict[str, Any]) -> None:
for pkg in snap.get("packages", []) or []:
pkg_s = str(pkg or "").strip()
if pkg_s:
self.packages.add(pkg_s)
unit = str(snap.get("unit") or "").strip()
if unit:
unit_file_state = str(snap.get("unit_file_state") or "")
self.services[unit] = {
"name": unit,
"state": "running" if snap.get("active_state") == "active" else "dead",
"enable": unit_file_state in ("enabled", "enabled-runtime"),
}
self.add_service_snapshot_state(
snap, state_key="state", running="running", stopped="dead"
)
def add_users_snapshot(self, snap: Dict[str, Any]) -> None:
for u in snap.get("users", []) or []:
if not isinstance(u, dict):
continue
name = str(u.get("name") or "").strip()
if not name:
continue
primary_group = str(u.get("primary_group") or name).strip()
if primary_group:
self.groups.add(primary_group)
supplementary = sorted(
{
str(g).strip()
for g in (u.get("supplementary_groups") or [])
if str(g).strip()
}
)
self.groups.update(supplementary)
records = self.user_records_from_snapshot(snap)
self.groups.update(self.user_group_names_from_records(records))
for record in records:
name = str(record.get("name") or "")
user_data: Dict[str, Any] = {
"name": name,
"uid": u.get("uid"),
"gid": primary_group or u.get("gid"),
"home": u.get("home") or f"/home/{name}",
"shell": u.get("shell"),
"groups": supplementary,
"uid": record.get("uid"),
"gid": record.get("primary_group") or record.get("gid"),
"home": record.get("home"),
"shell": record.get("shell"),
"groups": record.get("supplementary_groups") or [],
}
user_data.update(_gecos_attrs(u.get("gecos")))
user_data.update(_gecos_attrs(record.get("gecos")))
self.users[name] = user_data
home_by_user = {
str(u.get("name")): str(u.get("home") or "")
for u in (snap.get("users", []) or [])
if isinstance(u, dict) and u.get("name")
}
for remote in snap.get("user_flatpak_remotes", []) or []:
item = _normalise_flatpak_remote(remote)
user = str(item.get("user") or "").strip()
if user and not item.get("home"):
item["home"] = home_by_user.get(user) or f"/home/{user}"
if item.get("method") == "user" and item.get("name") and item.get("url"):
self.flatpak_remotes.append(_prepare_flatpak_remote(item))
for uname, flatpaks in (snap.get("user_flatpaks", {}) or {}).items():
user = str(uname)
for fp in flatpaks or []:
item = _normalise_flatpak_item(
fp, method="user", user=user, home=home_by_user.get(user) or None
)
if item.get("name"):
self.flatpaks.append(_prepare_flatpak_item(item))
self.add_user_flatpaks_snapshot(snap)
def add_flatpak_snapshot(self, snap: Dict[str, Any]) -> None:
for remote in snap.get("remotes", []) or []:
item = _normalise_flatpak_remote(remote)
if item.get("name") and item.get("url"):
self.flatpak_remotes.append(_prepare_flatpak_remote(item))
for fp in snap.get("system_flatpaks", []) or []:
item = _normalise_flatpak_item(fp, method="system")
if item.get("name"):
self.flatpaks.append(_prepare_flatpak_item(item))
for note in snap.get("notes", []) or []:
self.notes.append(str(note))
def prepare_flatpak_remote(self, item: Dict[str, Any]) -> Dict[str, Any]:
return _prepare_flatpak_remote(item)
def add_snap_snapshot(self, snap: Dict[str, Any]) -> None:
for raw in snap.get("system_snaps", []) or []:
item = _normalise_snap_item(raw)
if item.get("name"):
self.snaps.append(_prepare_snap_item(item))
for note in snap.get("notes", []) or []:
self.notes.append(str(note))
def prepare_flatpak_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
return _prepare_flatpak_item(item)
def prepare_snap_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
return _prepare_snap_item(item)
def add_firewall_runtime_snapshot(
self,
@ -148,58 +86,16 @@ class SaltRole(CMModule):
role_files_dir: Path,
file_prefix: Optional[str] = None,
) -> None:
self.packages.update(
str(p).strip() for p in (snap.get("packages") or []) if str(p).strip()
super().add_firewall_runtime_snapshot(
snap,
bundle_dir=bundle_dir,
artifact_role=artifact_role,
files_dir=role_files_dir,
copy_artifact=_copy_artifact,
source_uri=_source_uri,
file_prefix=file_prefix,
dir_attrs={"require": [{"file": "/etc/enroll"}]},
)
self.add_managed_dir(
"/etc/enroll/firewall",
user="root",
group="root",
mode="0750",
require=[{"file": "/etc/enroll"}],
reason="firewall_runtime",
)
runtime: Dict[str, Any] = {}
for key, dest_name, mode in (
("ipset_save", "ipset.save", "0600"),
("iptables_v4_save", "iptables.v4", "0600"),
("iptables_v6_save", "iptables.v6", "0600"),
):
src_rel = str(snap.get(key) or "").strip()
if not src_rel:
continue
role_rel = _copy_artifact(
bundle_dir,
artifact_role,
src_rel,
role_files_dir,
dst_prefix=file_prefix,
)
if not role_rel:
self.notes.append(
f"Firewall runtime artifact {src_rel!r} was referenced but not found."
)
continue
dest = f"/etc/enroll/firewall/{dest_name}"
self.add_managed_file(
dest,
user="root",
group="root",
mode=mode,
source=_source_uri(self.module_name, role_rel),
reason="firewall_runtime",
)
runtime[key] = dest
ipset_sets = [
str(x).strip() for x in (snap.get("ipset_sets") or []) if str(x).strip()
]
if ipset_sets:
runtime["ipset_sets"] = ipset_sets
if runtime:
runtime.update(_firewall_runtime_commands(runtime))
self.firewall_runtime.update(runtime)
for note in snap.get("notes", []) or []:
self.notes.append(str(note))
def add_container_images_snapshot(self, snap: Dict[str, Any]) -> None:
for raw in snap.get("images", []) or []:
@ -413,70 +309,6 @@ def _container_tag_cmd(engine: str, pull_ref: str, tag_ref: str) -> str:
return f"{engine} tag {_shell_quote(pull_ref)} {_shell_quote(tag_ref)}"
def _normalise_flatpak_item(
item: Dict[str, Any],
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> Dict[str, Any]:
out = dict(item)
out["method"] = str(out.get("method") or method or "system").strip() or "system"
if user and not out.get("user"):
out["user"] = user
if home and not out.get("home"):
out["home"] = home
ref = str(out.get("ref") or "").strip()
if ref and not out.get("name"):
out["name"] = ref.rsplit("/", 1)[-1]
name = str(out.get("name") or out.get("app_id") or "").strip()
if name:
out["name"] = name
remote = str(out.get("remote") or "").strip()
if remote:
out["remote"] = remote
branch = str(out.get("branch") or out.get("origin") or "").strip()
if branch:
out["branch"] = branch
if ref:
out["ref"] = ref
return out
def _normalise_flatpak_remote(item: Dict[str, Any]) -> Dict[str, Any]:
out = dict(item)
name = str(out.get("name") or out.get("remote") or "").strip()
url = str(out.get("url") or out.get("from_url") or "").strip()
method = str(out.get("method") or out.get("scope") or "system").strip() or "system"
if name:
out["name"] = name
if url:
out["url"] = url
out["method"] = "user" if method == "user" else "system"
return out
def _normalise_snap_item(item: Dict[str, Any]) -> Dict[str, Any]:
out = dict(item)
name = str(out.get("name") or "").strip()
if name:
out["name"] = name
channel = str(out.get("tracking") or out.get("channel") or "").strip()
if channel:
out["channel"] = channel
notes = [str(note).lower() for note in (out.get("notes") or [])]
confinement = str(out.get("confinement") or "").strip().lower()
out["classic"] = confinement == "classic" or any(
"classic" in note for note in notes
)
out["devmode"] = any("devmode" in note or "dev mode" in note for note in notes)
out["dangerous"] = any("dangerous" in note for note in notes)
revision = str(out.get("revision") or "").strip()
if revision and not channel:
out["revision"] = revision
return out
def _flatpak_scope(item: Dict[str, Any]) -> str:
return "--user" if str(item.get("method") or "system") == "user" else "--system"
@ -583,30 +415,6 @@ def _prepare_snap_item(item: Dict[str, Any]) -> Dict[str, Any]:
return out
def _firewall_ipset_restore_cmd(path: str, sets: List[str]) -> str:
flush_parts = [f"ipset flush {_shell_quote(name)} || true" for name in sets]
flush = "; ".join(flush_parts)
restore = f"ipset restore -exist < {_shell_quote(path)}"
if flush:
return f"/bin/sh -c {_shell_quote(flush + '; ' + restore)}"
return f"/bin/sh -c {_shell_quote(restore)}"
def _firewall_runtime_commands(runtime: Dict[str, Any]) -> Dict[str, Any]:
out: Dict[str, Any] = {}
ipset_path = str(runtime.get("ipset_save") or "")
if ipset_path:
sets = [str(x) for x in (runtime.get("ipset_sets") or []) if str(x)]
out["ipset_restore_cmd"] = _firewall_ipset_restore_cmd(ipset_path, sets)
ipt4_path = str(runtime.get("iptables_v4_save") or "")
if ipt4_path:
out["iptables_v4_restore_cmd"] = f"iptables-restore {_shell_quote(ipt4_path)}"
ipt6_path = str(runtime.get("iptables_v6_save") or "")
if ipt6_path:
out["iptables_v6_restore_cmd"] = f"ip6tables-restore {_shell_quote(ipt6_path)}"
return out
def _append_firewall_runtime_states(lines: List[str], runtime: Dict[str, Any]) -> None:
specs = [
(
@ -807,60 +615,29 @@ def _collect_salt_roles(
overwrite_templates=not bool(fqdn),
)
for svc in roles.get("services", []) or []:
if not isinstance(svc, dict):
continue
original_role_name = _salt_name(
str(svc.get("role_name") or svc.get("unit") or "service"),
fallback="service",
for entry in CMModule.package_service_entries(
roles, inventory_packages, use_common_roles=use_common_roles
):
snap = entry.get("snapshot") or {}
kind = str(entry.get("kind") or "package")
fallback = "service" if kind == "service" else "package"
source_label = str(
snap.get("role_name") or snap.get("unit") or snap.get("package") or fallback
)
original_role_name = _salt_name(source_label, fallback=fallback)
role_name = _salt_name(
str(entry.get("role_label") or source_label),
fallback="package_group" if use_common_roles else fallback,
)
if use_common_roles:
role_name = _salt_name(
section_label_for_packages(
[
str(p).strip()
for p in (svc.get("packages") or [])
if str(p).strip()
],
inventory_packages,
),
fallback="package_group",
)
else:
role_name = original_role_name
srole = ensure_role(role_name)
srole.add_service_snapshot(svc)
srole.add_managed_content(
svc,
bundle_dir=bundle_dir,
artifact_role=str(svc.get("role_name") or original_role_name),
role_files_dir=states_dir / "roles" / srole.module_name / "files",
file_prefix=node_file_prefix,
jt_exe=jt_exe,
jt_enabled=jt_enabled,
overwrite_templates=not bool(fqdn),
)
for pkg in roles.get("packages", []) or []:
if not isinstance(pkg, dict):
continue
original_role_name = _salt_name(
str(pkg.get("role_name") or pkg.get("package") or "package"),
fallback="package",
)
if use_common_roles:
role_name = _salt_name(
package_section_label(pkg, inventory_packages),
fallback="package_group",
)
if kind == "service":
srole.add_service_snapshot(snap)
else:
role_name = original_role_name
srole = ensure_role(role_name)
srole.add_package_snapshot(pkg)
srole.add_package_snapshot(snap)
srole.add_managed_content(
pkg,
snap,
bundle_dir=bundle_dir,
artifact_role=str(pkg.get("role_name") or original_role_name),
artifact_role=str(snap.get("role_name") or original_role_name),
role_files_dir=states_dir / "roles" / srole.module_name / "files",
file_prefix=node_file_prefix,
jt_exe=jt_exe,
@ -1621,17 +1398,12 @@ def _render_readme(
) -> str:
host = state.get("host", {}) if isinstance(state.get("host"), dict) else {}
hostname = host.get("hostname") or "unknown"
role_lines = (
"\n".join(
f"- `{r.sls_name}` from Enroll role `{r.role_name}`" for r in salt_roles
)
or "- None."
role_lines = markdown_list(
f"`{r.sls_name}` from Enroll role `{r.role_name}`" for r in salt_roles
)
notes_text = markdown_list(
f"`{r.sls_name}`: {note}" for r in salt_roles for note in r.notes
)
notes: List[str] = []
for r in salt_roles:
for note in r.notes:
notes.append(f"`{r.sls_name}`: {note}")
notes_text = "\n".join(f"- {n}" for n in notes) or "- None."
if fqdn:
node_display = (