diff --git a/enroll/ansible.py b/enroll/ansible.py index 933052f..1778fd9 100644 --- a/enroll/ansible.py +++ b/enroll/ansible.py @@ -7,9 +7,9 @@ import stat import tempfile from dataclasses import dataclass from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple -from .cm import CMModule, package_section_label, section_label_for_packages +from .cm import CMModule, markdown_list, snapshot_excluded_lines, snapshot_note_lines from .jinjaturtle import ( jinjify_managed_files as _jinjify_managed_files, resolve_jinjaturtle_mode, @@ -48,34 +48,48 @@ class AnsibleRole(CMModule): self.entries: List[Dict[str, Any]] = [] self.excluded: List[Dict[str, Any]] = [] self.origin_lines: List[str] = [] + self.container_images: List[Dict[str, Any]] = [] + self.flatpak_remotes: List[Dict[str, Any]] = [] + self.flatpaks: List[Dict[str, Any]] = [] + self.snaps: List[Dict[str, Any]] = [] + self.users_groups: List[str] = [] + self.users_data: List[Dict[str, Any]] = [] + self.users_ssh_dirs: List[Dict[str, Any]] = [] + self.users_ssh_files: List[Dict[str, Any]] = [] + + def has_resources(self) -> bool: + return self.has_resources_or_attrs( + "container_images", + "flatpak_remotes", + "flatpaks", + "snaps", + "users_data", + "users_ssh_files", + ) def add_package_snapshot(self, snap: Dict[str, Any]) -> None: - pkg = str(snap.get("package") or "").strip() + pkg = self.package_name_from_snapshot(snap) source_role = str(snap.get("role_name") or pkg or self.role_name) self.entries.append({"kind": "package", "snapshot": snap}) + super().add_package_snapshot(snap) if pkg: - self.packages.add(pkg) self.origin_lines.append(f"package `{pkg}` from role `{source_role}`") self.add_managed_content(snap) def add_service_snapshot(self, snap: Dict[str, Any]) -> None: - unit = str(snap.get("unit") or "").strip() + unit = self.service_unit_from_snapshot(snap) source_role = str(snap.get("role_name") or unit or self.role_name) self.entries.append({"kind": "service", "snapshot": snap}) - for pkg in snap.get("packages", []) or []: - pkg_s = str(pkg or "").strip() - if pkg_s: - self.packages.add(pkg_s) + self.add_service_packages_from_snapshot(snap) if unit: - unit_file_state = str(snap.get("unit_file_state") or "") self.services.setdefault( unit, { "name": unit, "manage": True, - "enabled": unit_file_state in ("enabled", "enabled-runtime"), - "state": ( - "started" if snap.get("active_state") == "active" else "stopped" + "enabled": self.service_enabled_from_snapshot(snap), + "state": self.service_state_from_snapshot( + snap, running="started", stopped="stopped" ), }, ) @@ -126,6 +140,189 @@ class AnsibleRole(CMModule): def systemd_units_var(self) -> List[Dict[str, Any]]: return [self.services[k] for k in sorted(self.services)] + def add_firewall_runtime_snapshot(self, snap: Dict[str, Any]) -> None: + self.add_service_packages_from_snapshot(snap) + self.firewall_runtime.update(self.firewall_runtime_source_refs(snap)) + ipset_sets = self.firewall_runtime_ipset_sets(snap) + if ipset_sets: + self.firewall_runtime["ipset_sets"] = ipset_sets + self.add_snapshot_notes(snap) + + def prepare_flatpak_remote(self, item: Dict[str, Any]) -> Dict[str, Any]: + return dict(item) + + def prepare_flatpak_item(self, item: Dict[str, Any]) -> Dict[str, Any]: + return dict(item) + + def prepare_snap_item(self, item: Dict[str, Any]) -> Dict[str, Any]: + return _normalise_snap_item(item) + + def add_container_images_snapshot(self, snap: Dict[str, Any]) -> None: + self.container_images = [ + _normalise_container_image_item(img) + for img in (snap.get("images", []) or []) + ] + self.add_snapshot_notes(snap) + + def add_users_snapshot(self, snap: Dict[str, Any]) -> None: + users_data = self.user_records_from_snapshot(snap) + for user in users_data: + user["ssh_dir"] = str(user.get("home") or "").rstrip("/") + "/.ssh" + + ssh_files: List[Dict[str, Any]] = [] + for mf in snap.get("managed_files", []) or []: + dest = mf.get("path") or "" + src_rel = mf.get("src_rel") or "" + if not dest or not src_rel: + continue + + owner = "root" + group = "root" + for u in users_data: + home_prefix = (u.get("home") or "").rstrip("/") + "/" + if home_prefix and dest.startswith(home_prefix): + owner = str(u.get("name") or "root") + group = str(u.get("primary_group") or owner) + break + + mode = mf.get("mode") or "0644" + if mf.get("reason") == "authorized_keys": + mode = "0600" + ssh_files.append( + { + "dest": dest, + "src_rel": src_rel, + "owner": owner, + "group": group, + "mode": mode, + } + ) + + ssh_dirs_by_dest: Dict[str, Dict[str, Any]] = {} + for item in ssh_files: + dest = str(item.get("dest") or "") + if not dest: + continue + for user in users_data: + ssh_dir = str(user.get("ssh_dir") or "").rstrip("/") + if not ssh_dir or not dest.startswith(ssh_dir + "/"): + continue + ssh_dirs_by_dest.setdefault( + ssh_dir, + { + "dest": ssh_dir, + "owner": str(user.get("name") or item.get("owner") or "root"), + "group": str( + user.get("primary_group") or item.get("group") or "root" + ), + "mode": "0700", + }, + ) + break + + self.users_groups = sorted(self.user_group_names_from_records(users_data)) + self.users_data = users_data + self.users_ssh_files = ssh_files + self.users_ssh_dirs = sorted( + ssh_dirs_by_dest.values(), key=lambda item: str(item.get("dest") or "") + ) + self.add_user_flatpaks_snapshot(snap) + self.excluded.extend(snap.get("excluded", []) or []) + self.add_snapshot_notes(snap) + + def render_firewall_runtime_tasks(self) -> str: + var_prefix = self.role_name + return f"""- name: Ensure firewall runtime snapshot directory exists + ansible.builtin.file: + path: {self.firewall_runtime_dir} + state: directory + owner: root + group: root + mode: "0750" + +- name: Deploy captured ipset snapshot + vars: + _enroll_ff: + files: + - "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_ipset_save }}}}" + - "{{{{ role_path }}}}/files/{{{{ {var_prefix}_ipset_save }}}}" + ansible.builtin.copy: + src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}" + dest: {self.firewall_runtime_dest_path('ipset.save')} + owner: root + group: root + mode: "0600" + notify: Restore captured ipsets + when: ({var_prefix}_ipset_save | default('') | length) > 0 + +- name: Deploy captured IPv4 iptables snapshot + vars: + _enroll_ff: + files: + - "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_iptables_v4_save }}}}" + - "{{{{ role_path }}}}/files/{{{{ {var_prefix}_iptables_v4_save }}}}" + ansible.builtin.copy: + src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}" + dest: {self.firewall_runtime_dest_path('iptables.v4')} + owner: root + group: root + mode: "0600" + notify: Restore captured IPv4 iptables rules + when: ({var_prefix}_iptables_v4_save | default('') | length) > 0 + +- name: Deploy captured IPv6 iptables snapshot + vars: + _enroll_ff: + files: + - "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_iptables_v6_save }}}}" + - "{{{{ role_path }}}}/files/{{{{ {var_prefix}_iptables_v6_save }}}}" + ansible.builtin.copy: + src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}" + dest: {self.firewall_runtime_dest_path('iptables.v6')} + owner: root + group: root + mode: "0600" + notify: Restore captured IPv6 iptables rules + when: ({var_prefix}_iptables_v6_save | default('') | length) > 0 +""" + + def render_firewall_runtime_handlers(self) -> str: + var_prefix = self.role_name + return f"""--- +- name: Flush captured ipsets before restoring members + ansible.builtin.command: + cmd: "ipset flush {{{{ item }}}}" + loop: "{{{{ {var_prefix}_ipset_sets | default([]) }}}}" + listen: Restore captured ipsets + register: _enroll_ipset_flush + failed_when: false + changed_when: false + when: {var_prefix}_sync_ipsets_exact | default(true) | bool + +- name: Restore captured ipsets + ansible.builtin.shell: "ipset restore -exist < {self.firewall_runtime_dest_path('ipset.save')}" + args: + executable: /bin/sh + listen: Restore captured ipsets + when: ({var_prefix}_ipset_save | default('') | length) > 0 + +- name: Restore captured IPv4 iptables rules + ansible.builtin.command: + cmd: iptables-restore {self.firewall_runtime_dest_path('iptables.v4')} + listen: Restore captured IPv4 iptables rules + when: + - ({var_prefix}_iptables_v4_save | default('') | length) > 0 + - {var_prefix}_restore_iptables | default(true) | bool + +- name: Restore captured IPv6 iptables rules + ansible.builtin.command: + cmd: ip6tables-restore {self.firewall_runtime_dest_path('iptables.v6')} + listen: Restore captured IPv6 iptables rules + when: + - ({var_prefix}_iptables_v6_save | default('') | length) > 0 + - {var_prefix}_restore_iptables | default(true) | bool +""" + def _role_id(raw: str) -> str: """Return an Ansible-safe role identifier from an arbitrary label.""" @@ -160,26 +357,24 @@ def _collect_ansible_roles( *, use_common_roles: bool, ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], Dict[str, List[Dict[str, Any]]]]: - """Collect the raw Ansible role inputs from harvest state.""" - services = roles.get("services", []) or [] - packages = roles.get("packages", []) or [] + """Collect package/service role inputs from shared CMModule grouping logic.""" + + services: List[Dict[str, Any]] = [] + packages: List[Dict[str, Any]] = [] common_role_groups: Dict[str, List[Dict[str, Any]]] = {} - - if use_common_roles: - for svc in services: - label = section_label_for_packages( - svc.get("packages", []) or [], inventory_packages - ) - common_role_groups.setdefault(label, []).append( - {"kind": "service", "snapshot": svc} - ) - for pr in packages: - label = package_section_label(pr, inventory_packages) - common_role_groups.setdefault(label, []).append( - {"kind": "package", "snapshot": pr} - ) - return [], [], common_role_groups - + for entry in CMModule.package_service_entries( + roles, inventory_packages, use_common_roles=use_common_roles + ): + kind = str(entry.get("kind") or "package") + snap = entry.get("snapshot") or {} + if use_common_roles: + common_role_groups.setdefault( + str(entry.get("role_label") or "misc"), [] + ).append({"kind": kind, "snapshot": snap}) + elif kind == "service": + services.append(snap) + else: + packages.append(snap) return services, packages, common_role_groups @@ -510,6 +705,156 @@ def _write_role_defaults(role_dir: str, mapping: Dict[str, Any]) -> None: f.write(out) +def _write_role_meta(role_dir: str, collections: Optional[List[str]] = None) -> None: + meta_path = os.path.join(role_dir, "meta", "main.yml") + with open(meta_path, "w", encoding="utf-8") as f: + f.write("---\n") + f.write("dependencies: []\n") + if collections: + f.write("collections:\n") + for collection in collections: + f.write(f" - {collection}\n") + + +def _write_ansible_role_vars( + ctx: AnsibleManifestContext, + role_dir: str, + role: str, + vars_map: Dict[str, Any], + *, + site_defaults: Optional[Dict[str, Any]] = None, +) -> None: + """Write role variables using the same mode split as Puppet Hiera/Salt Pillar.""" + + if ctx.site_mode: + _write_role_defaults(role_dir, site_defaults or {}) + _write_hostvars(ctx.out_dir, ctx.fqdn or "", role, vars_map) + else: + _write_role_defaults(role_dir, vars_map) + + +def _write_ansible_role( + ctx: AnsibleManifestContext, + role: str, + *, + vars_map: Optional[Dict[str, Any]] = None, + site_defaults: Optional[Dict[str, Any]] = None, + tasks: str = "---\n", + handlers: str = "---\n", + collections: Optional[List[str]] = None, +) -> str: + """Write an Ansible role through one common logistical path. + + The CM-specific rendering remains target-specific, but role directory layout, + defaults/host_vars splitting and metadata writing are no longer repeated + in every feature renderer. + """ + + role_dir = os.path.join(ctx.roles_root, role) + _write_role_scaffold(role_dir) + _write_ansible_role_vars( + ctx, role_dir, role, vars_map or {}, site_defaults=site_defaults + ) + + with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f: + f.write(tasks.rstrip() + "\n") + + with open( + os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write(handlers.rstrip() + "\n") + + _write_role_meta(role_dir, collections) + + return role + + +def _copy_role_artifacts( + ctx: AnsibleManifestContext, + role: str, + artifact_role: str, + *, + exclude_rels: Optional[Set[str]] = None, + preserve_existing: bool = False, +) -> None: + role_dir = os.path.join(ctx.roles_root, role) + dst_files_dir = ( + _host_role_files_dir(ctx.out_dir, ctx.fqdn or "", role) + if ctx.site_mode + else os.path.join(role_dir, "files") + ) + _copy_artifacts( + ctx.bundle_dir, + artifact_role, + dst_files_dir, + preserve_existing=preserve_existing, + exclude_rels=exclude_rels, + ) + + +def _render_readme( + state: Dict[str, Any], + rendered_roles: List[str], + *, + fqdn: Optional[str] = None, +) -> str: + host = state.get("host", {}) if isinstance(state.get("host"), dict) else {} + hostname = host.get("hostname") or "unknown" + roles = state.get("roles", {}) if isinstance(state.get("roles"), dict) else {} + excluded = snapshot_excluded_lines(roles) + notes = snapshot_note_lines(roles) + role_lines = "\n".join(f"- `{role}`" for role in rendered_roles) or "- None." + excluded_text = markdown_list(excluded) + notes_text = markdown_list(notes) + + if fqdn: + layout = f"""- `playbooks/{fqdn}.yml` applies the generated roles to `{fqdn}`. +- `inventory/hosts.ini` defines the target host. +- `inventory/host_vars/{fqdn}//main.yml` contains host-specific role variables. +- `inventory/host_vars/{fqdn}//.files/...` contains host-specific harvested file artifacts. +- `roles//tasks/main.yml` contains reusable Ansible tasks. +- `roles//files/...` and `roles//templates/...` contain reusable role artifacts where applicable.""" + apply = f"""```bash +ansible-galaxy collection install -r requirements.yml +ansible-playbook -i inventory/hosts.ini playbooks/{fqdn}.yml --check +```""" + else: + layout = """- `playbook.yml` applies the generated roles to the current inventory. +- `roles//tasks/main.yml` contains reusable Ansible tasks. +- `roles//defaults/main.yml` contains harvested/default role variables. +- `roles//files/...` and `roles//templates/...` contain harvested role artifacts where applicable. +- `roles//handlers/main.yml` contains any restore/restart/apply handlers.""" + apply = """```bash +ansible-galaxy collection install -r requirements.yml +ansible-playbook -i localhost, -c local playbook.yml --check --diff +```""" + + return f"""# Enroll Ansible manifest + +Generated from harvested state for `{hostname}`. + +## Layout + +{layout} + +## Roles + +{role_lines} + +## Excluded captured paths + +{excluded_text} + +## Notes + +{notes_text} + +## Apply / dry-run + +{apply} +""" + + def _write_site_scaffold(ctx: AnsibleManifestContext) -> None: if not ctx.site_mode: return @@ -725,108 +1070,123 @@ def _render_sysctl_handlers(var_prefix: str) -> str: """ -def _render_firewall_runtime_tasks(var_prefix: str) -> str: - """Render tasks for live ipset/iptables snapshots.""" - return f"""- name: Ensure firewall runtime snapshot directory exists - ansible.builtin.file: - path: /etc/enroll/firewall - state: directory - owner: root - group: root - mode: "0750" - -- name: Deploy captured ipset snapshot - vars: - _enroll_ff: - files: - - "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_ipset_save }}}}" - - "{{{{ role_path }}}}/files/{{{{ {var_prefix}_ipset_save }}}}" - ansible.builtin.copy: - src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}" - dest: /etc/enroll/firewall/ipset.save - owner: root - group: root - mode: "0600" - notify: Restore captured ipsets - when: ({var_prefix}_ipset_save | default('') | length) > 0 - -- name: Deploy captured IPv4 iptables snapshot - vars: - _enroll_ff: - files: - - "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_iptables_v4_save }}}}" - - "{{{{ role_path }}}}/files/{{{{ {var_prefix}_iptables_v4_save }}}}" - ansible.builtin.copy: - src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}" - dest: /etc/enroll/firewall/iptables.v4 - owner: root - group: root - mode: "0600" - notify: Restore captured IPv4 iptables rules - when: ({var_prefix}_iptables_v4_save | default('') | length) > 0 - -- name: Deploy captured IPv6 iptables snapshot - vars: - _enroll_ff: - files: - - "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_iptables_v6_save }}}}" - - "{{{{ role_path }}}}/files/{{{{ {var_prefix}_iptables_v6_save }}}}" - ansible.builtin.copy: - src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}" - dest: /etc/enroll/firewall/iptables.v6 - owner: root - group: root - mode: "0600" - notify: Restore captured IPv6 iptables rules - when: ({var_prefix}_iptables_v6_save | default('') | length) > 0 -""" +def _task_body(tasks: str) -> str: + return (tasks or "").strip().removeprefix("---").lstrip("\n") -def _render_firewall_runtime_handlers(var_prefix: str) -> str: - """Render handlers for live ipset/iptables snapshots. - - Runtime firewall snapshots are intentionally applied only when Enroll's - staged snapshot file changes. Live iptables/ipset state is volatile, and - daemons such as Docker may mutate counters/chains between configuration - management runs. Treating restore as a file-change handler keeps repeated - Ansible runs idempotent while still applying a new harvested snapshot. - """ - return f"""--- -- name: Flush captured ipsets before restoring members - ansible.builtin.command: - cmd: "ipset flush {{{{ item }}}}" - loop: "{{{{ {var_prefix}_ipset_sets | default([]) }}}}" - listen: Restore captured ipsets - register: _enroll_ipset_flush +def _render_single_systemd_tasks(var_prefix: str) -> str: + return f"""- name: Probe whether systemd unit exists and is manageable + ansible.builtin.systemd: + name: "{{{{ {var_prefix}_unit_name }}}}" + no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}" + check_mode: true + register: _unit_probe failed_when: false changed_when: false - when: {var_prefix}_sync_ipsets_exact | default(true) | bool + when: {var_prefix}_manage_unit | default(false) -- name: Restore captured ipsets - ansible.builtin.shell: "ipset restore -exist < /etc/enroll/firewall/ipset.save" - args: - executable: /bin/sh - listen: Restore captured ipsets - when: ({var_prefix}_ipset_save | default('') | length) > 0 - -- name: Restore captured IPv4 iptables rules - ansible.builtin.command: - cmd: iptables-restore /etc/enroll/firewall/iptables.v4 - listen: Restore captured IPv4 iptables rules +- name: Ensure unit enablement matches harvest + ansible.builtin.systemd: + name: "{{{{ {var_prefix}_unit_name }}}}" + enabled: "{{{{ {var_prefix}_systemd_enabled | bool }}}}" + no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}" when: - - ({var_prefix}_iptables_v4_save | default('') | length) > 0 - - {var_prefix}_restore_iptables | default(true) | bool + - {var_prefix}_manage_unit | default(false) + - _unit_probe is succeeded -- name: Restore captured IPv6 iptables rules - ansible.builtin.command: - cmd: ip6tables-restore /etc/enroll/firewall/iptables.v6 - listen: Restore captured IPv6 iptables rules +- name: Ensure unit running state matches harvest + ansible.builtin.systemd: + name: "{{{{ {var_prefix}_unit_name }}}}" + state: "{{{{ {var_prefix}_systemd_state }}}}" + no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}" when: - - ({var_prefix}_iptables_v6_save | default('') | length) > 0 - - {var_prefix}_restore_iptables | default(true) | bool + - {var_prefix}_manage_unit | default(false) + - _unit_probe is succeeded """ +def _render_role_tasks( + role: AnsibleRole, + *, + packages: bool = False, + managed_content: bool = False, + single_service: bool = False, + grouped_services: bool = False, + sysctl: bool = False, + firewall_runtime: bool = False, + extra_tasks: str = "", +) -> str: + parts: List[str] = [] + if packages: + parts.append(_render_install_packages_tasks(role.role_name, role.var_prefix)) + if managed_content: + parts.append(_render_generic_files_tasks(role.var_prefix)) + if single_service: + parts.append(_render_single_systemd_tasks(role.var_prefix)) + if grouped_services: + parts.append(_render_grouped_systemd_tasks(role.var_prefix)) + if sysctl: + parts.append(_render_sysctl_tasks(role.var_prefix)) + if firewall_runtime: + parts.append(role.render_firewall_runtime_tasks()) + if extra_tasks: + parts.append(extra_tasks) + + body = "\n\n".join(_task_body(part) for part in parts if _task_body(part)) + return "---\n" + (body.rstrip() + "\n" if body else "") + + +def _single_service_restart_handler_body(var_prefix: str) -> str: + return f"""- name: Restart service + ansible.builtin.service: + name: "{{{{ {var_prefix}_unit_name }}}}" + state: restarted + when: + - {var_prefix}_manage_unit | default(false) + - ({var_prefix}_systemd_state | default('stopped')) == 'started' +""" + + +def _grouped_service_restart_handler_body(var_prefix: str) -> str: + return f"""- name: Restart managed services + ansible.builtin.service: + name: "{{{{ item.name }}}}" + state: restarted + loop: "{{{{ {var_prefix}_systemd_units | default([]) }}}}" + when: + - item.manage | default(false) + - (item.state | default('stopped')) == 'started' +""" + + +def _render_role_handlers( + role: AnsibleRole, + *, + systemd_reload: bool = False, + single_service: bool = False, + grouped_services: bool = False, + sysctl: bool = False, + firewall_runtime: bool = False, + extra_handlers: str = "", +) -> str: + parts: List[str] = [] + if systemd_reload or single_service or grouped_services: + parts.append(_SYSTEMD_DAEMON_RELOAD_HANDLER) + if single_service: + parts.append(_single_service_restart_handler_body(role.var_prefix)) + if grouped_services: + parts.append(_grouped_service_restart_handler_body(role.var_prefix)) + if sysctl: + parts.append(_render_sysctl_handlers(role.var_prefix)) + if firewall_runtime: + parts.append(role.render_firewall_runtime_handlers()) + if extra_handlers: + parts.append(extra_handlers) + + body = "\n\n".join(_task_body(part) for part in parts if _task_body(part)) + return "---\n" + (body.rstrip() + "\n" if body else "") + + # --- Ansible variable builders --- def _normalise_flatpak_item( item: Any, @@ -835,44 +1195,15 @@ def _normalise_flatpak_item( user: Optional[str] = None, home: Optional[str] = None, ) -> Dict[str, Any]: - if isinstance(item, str): - out: Dict[str, Any] = {"name": item, "method": method} - elif isinstance(item, dict): - out = dict(item) - out.setdefault("method", method) - else: - out = {"name": str(item), "method": method} - if user: - out.setdefault("user", user) - if home: - out.setdefault("home", home) - return out + return CMModule.normalise_flatpak_item(item, method=method, user=user, home=home) def _normalise_flatpak_remote(item: Any) -> Dict[str, Any]: - if isinstance(item, dict): - out = dict(item) - else: - out = {"name": str(item)} - out.setdefault("method", "system") - return out + return CMModule.normalise_flatpak_remote(item) def _normalise_snap_item(item: Any) -> Dict[str, Any]: - if isinstance(item, str): - out: Dict[str, Any] = {"name": item} - elif isinstance(item, dict): - out = dict(item) - else: - out = {"name": str(item)} - - notes = out.get("notes") or [] - if isinstance(notes, str): - notes = [notes] - notes_l = {str(n).lower() for n in notes} - out["classic"] = bool(out.get("classic") or "classic" in notes_l) - out["devmode"] = bool(out.get("devmode") or "devmode" in notes_l) - out["dangerous"] = bool(out.get("dangerous") or "dangerous" in notes_l) + out = CMModule.normalise_snap_item(item) # The Ansible snap module's revision parameter pins/holds the snap. For # ordinary store snaps that track a channel, preserve the channel instead @@ -976,228 +1307,6 @@ def _normalise_container_image_item(item: Any) -> Dict[str, Any]: return out -# --- Ansible README builders --- -def _markdown_list(items: List[str]) -> str: - values = [str(item) for item in items if str(item)] - return "\n".join(f"- {item}" for item in values) or "- (none)" - - -def _managed_file_lines( - managed_files: List[Dict[str, Any]], *, include_reason: bool -) -> List[str]: - out: List[str] = [] - for mf in managed_files: - path = str(mf.get("path") or "") - if not path: - continue - if include_reason: - out.append(f"{path} ({mf.get('reason')})") - else: - out.append(path) - return out - - -def _excluded_lines(excluded: List[Dict[str, Any]]) -> List[str]: - return [f"{e.get('path')} ({e.get('reason')})" for e in excluded if e.get("path")] - - -def _read_artifact_lines(bundle_dir: str, role: str, src_rel: str) -> List[str]: - art_path = os.path.join(bundle_dir, "artifacts", role, src_rel) - try: - with open(art_path, "r", encoding="utf-8", errors="replace") as f: - return [line.rstrip("\n") for line in f] - except OSError: - return [] - - -def _apt_config_readme( - *, - bundle_dir: str, - role: str, - snapshot: Dict[str, Any], - managed_files: List[Dict[str, Any]], - managed_dirs: List[Dict[str, Any]], - excluded: List[Dict[str, Any]], - notes: List[Any], -) -> str: - source_paths: List[str] = [] - keyring_paths: List[str] = [] - repo_hosts: Set[str] = set() - url_re = re.compile(r"(?:https?|ftp)://([^/\s]+)", re.IGNORECASE) - - for mf in managed_files: - path = str(mf.get("path") or "") - src_rel = str(mf.get("src_rel") or "") - if not path or not src_rel: - continue - if path == "/etc/apt/sources.list" or path.startswith( - "/etc/apt/sources.list.d/" - ): - source_paths.append(path) - for line in _read_artifact_lines(bundle_dir, role, src_rel): - s = line.strip() - if not s or s.startswith("#"): - continue - for match in url_re.finditer(s): - repo_hosts.add(match.group(1)) - if ( - path.startswith("/etc/apt/trusted.gpg") - or path.startswith("/etc/apt/keyrings/") - or path.startswith("/usr/share/keyrings/") - ): - keyring_paths.append(path) - - return f"""# apt_config - -APT configuration harvested from the system (sources, pinning, and keyrings). - -## Repository hosts -{_markdown_list(sorted(repo_hosts))} - -## Source files -{_markdown_list(sorted(set(source_paths)))} - -## Keyrings -{_markdown_list(sorted(set(keyring_paths)))} - -## Managed files -{_markdown_list(_managed_file_lines(managed_files, include_reason=True))} - -## Excluded -{_markdown_list(_excluded_lines(excluded))} - -## Notes -{_markdown_list([str(n) for n in notes])} -""" - - -def _dnf_config_readme( - *, - bundle_dir: str, - role: str, - snapshot: Dict[str, Any], - managed_files: List[Dict[str, Any]], - managed_dirs: List[Dict[str, Any]], - excluded: List[Dict[str, Any]], - notes: List[Any], -) -> str: - repo_paths: List[str] = [] - key_paths: List[str] = [] - repo_hosts: Set[str] = set() - url_re = re.compile(r"(?:https?|ftp)://([^/\s]+)", re.IGNORECASE) - file_url_re = re.compile(r"file://(/[^\s]+)") - - for mf in managed_files: - path = str(mf.get("path") or "") - src_rel = str(mf.get("src_rel") or "") - if not path or not src_rel: - continue - if path.startswith("/etc/yum.repos.d/") and path.endswith(".repo"): - repo_paths.append(path) - for line in _read_artifact_lines(bundle_dir, role, src_rel): - s = line.strip() - if not s or s.startswith("#") or s.startswith(";"): - continue - for match in url_re.finditer(s): - repo_hosts.add(match.group(1)) - for match in file_url_re.finditer(s): - key_paths.append(match.group(1)) - if path.startswith("/etc/pki/rpm-gpg/"): - key_paths.append(path) - - return f"""# dnf_config - -DNF/YUM configuration harvested from the system (repos, config files, and RPM GPG keys). - -## Repository hosts -{_markdown_list(sorted(repo_hosts))} - -## Repo files -{_markdown_list(sorted(set(repo_paths)))} - -## GPG keys -{_markdown_list(sorted(set(key_paths)))} - -## Managed files -{_markdown_list(_managed_file_lines(managed_files, include_reason=True))} - -## Excluded -{_markdown_list(_excluded_lines(excluded))} - -## Notes -{_markdown_list([str(n) for n in notes])} -""" - - -def _simple_managed_files_readme( - title: str, - description: str, - *, - include_reason: bool, -) -> Callable[..., str]: - def _builder( - *, - bundle_dir: str, - role: str, - snapshot: Dict[str, Any], - managed_files: List[Dict[str, Any]], - managed_dirs: List[Dict[str, Any]], - excluded: List[Dict[str, Any]], - notes: List[Any], - ) -> str: - return f"""# {title} - -{description} - -## Managed files -{_markdown_list(_managed_file_lines(managed_files, include_reason=include_reason))} - -## Excluded -{_markdown_list(_excluded_lines(excluded))} - -## Notes -{_markdown_list([str(n) for n in notes])} -""" - - return _builder - - -def _extra_paths_readme( - *, - bundle_dir: str, - role: str, - snapshot: Dict[str, Any], - managed_files: List[Dict[str, Any]], - managed_dirs: List[Dict[str, Any]], - excluded: List[Dict[str, Any]], - notes: List[Any], -) -> str: - include_pats = snapshot.get("include_patterns", []) or [] - exclude_pats = snapshot.get("exclude_patterns", []) or [] - return f"""# {role} - -User-requested extra file harvesting. - -## Include patterns -{_markdown_list([str(p) for p in include_pats])} - -## Exclude patterns -{_markdown_list([str(p) for p in exclude_pats])} - -## Managed directories -{_markdown_list([str(d.get('path') or '') for d in managed_dirs])} - -## Managed files -{_markdown_list(_managed_file_lines(managed_files, include_reason=False))} - -## Excluded -{_markdown_list(_excluded_lines(excluded))} - -## Notes -{_markdown_list([str(n) for n in notes])} -""" - - # --- Container image role renderer --- _CONTAINER_COLLECTIONS = [ {"name": "community.docker", "version": ">=4.0.0"}, @@ -1213,33 +1322,16 @@ def _render_container_images_role( if not container_images_snapshot and not raw_images: return - images = [_normalise_container_image_item(img) for img in raw_images] - if not images and not (container_images_snapshot.get("notes") or []): + arole = AnsibleRole(container_images_snapshot.get("role_name", "container_images")) + arole.add_container_images_snapshot(container_images_snapshot) + if not arole.container_images and not arole.notes: return - role = container_images_snapshot.get("role_name", "container_images") - role_dir = os.path.join(ctx.roles_root, role) - _write_role_scaffold(role_dir) _ensure_requirements_yaml( os.path.join(ctx.out_dir, "requirements.yml"), _CONTAINER_COLLECTIONS ) - vars_map = {"container_images": images} - if ctx.site_mode: - _write_role_defaults(role_dir, {"container_images": []}) - _write_hostvars(ctx.out_dir, ctx.fqdn or "", role, vars_map) - else: - _write_role_defaults(role_dir, vars_map) - - with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f: - f.write( - "---\n" - "dependencies: []\n" - "collections:\n" - " - community.docker\n" - " - containers.podman\n" - ) - + vars_map = {"container_images": arole.container_images} tasks = """--- - name: Pull Docker images by immutable registry digest @@ -1312,69 +1404,14 @@ def _render_container_images_role( become: true become_user: "{{ item.0.user }}" """ - with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f: - f.write(tasks) - - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\n") - - def _fmt_image(img: Dict[str, Any]) -> str: - pull_ref = ( - img.get("pull_ref") or "(no registry digest; not rendered as an exact pull)" - ) - tags = img.get("repo_tags") or [] - tag_part = f" tags={', '.join(tags)}" if tags else "" - platform = img.get("platform") - platform_part = f" platform={platform}" if platform else "" - return f"- {img.get('engine', 'unknown')}: {pull_ref}{tag_part}{platform_part}" - - notes = list(container_images_snapshot.get("notes", []) or []) - unpinned_notes: List[str] = [] - for img in images: - if img.get("pull_ref"): - continue - label = ( - ", ".join(img.get("repo_tags") or []) - or img.get("image_id") - or "unknown image" - ) - unpinned_notes.append( - f"{label}: no RepoDigest was available, so no exact pull task is emitted." - ) - - readme = ( - """# container_images - -Generated Docker and Podman image-cache restoration role. - -Images are pulled by immutable registry digest, such as -`registry.example.net/app@sha256:...`, when the harvest found a usable -`RepoDigest`. Local image IDs are recorded in `state.json` for evidence but are -not registry pull references. - -**Note:** This role requires the `community.docker` and `containers.podman` -Ansible collections. Install them with: -`ansible-galaxy collection install -r requirements.yml`. - -Registry credentials are not harvested. Private-registry authentication must be -managed separately before this role runs. - -## Container images -""" - + "\n".join(_fmt_image(img) for img in images) - + """ - -## Notes -""" - + ("\n".join([f"- {n}" for n in notes + unpinned_notes]) or "- (none)") - + "\n" + return _write_ansible_role( + ctx, + arole.role_name, + vars_map=vars_map, + site_defaults={"container_images": []}, + tasks=_render_role_tasks(arole, extra_tasks=tasks), + collections=["community.docker", "containers.podman"], ) - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - return role # --- Desktop package role renderers --- @@ -1382,49 +1419,18 @@ def _render_flatpak_role( ctx: AnsibleManifestContext, flatpak_snapshot: Dict[str, Any], ) -> Optional[str]: - out_dir = ctx.out_dir - roles_root = ctx.roles_root - fqdn = ctx.fqdn - site_mode = ctx.site_mode + if not flatpak_snapshot: + return - # ------------------------- - # Flatpak role (system-wide Flatpak remotes and applications) - # ------------------------- - raw_flatpak_apps = flatpak_snapshot.get("system_flatpaks", []) or [] - raw_flatpak_remotes = flatpak_snapshot.get("remotes", []) or [] + arole = AnsibleRole(flatpak_snapshot.get("role_name", "flatpak")) + arole.add_flatpak_snapshot(flatpak_snapshot) - if flatpak_snapshot: - role = flatpak_snapshot.get("role_name", "flatpak") - role_dir = os.path.join(roles_root, role) - _write_role_scaffold(role_dir) - _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) - - flatpak_system_flatpaks = [ - _normalise_flatpak_item(fp, method="system") for fp in raw_flatpak_apps - ] - flatpak_remotes = [_normalise_flatpak_remote(r) for r in raw_flatpak_remotes] - - vars_map = { - "flatpak_system_flatpaks": flatpak_system_flatpaks, - "flatpak_remotes": flatpak_remotes, - } - if site_mode: - _write_role_defaults( - role_dir, - {"flatpak_system_flatpaks": [], "flatpak_remotes": []}, - ) - _write_hostvars(out_dir, fqdn or "", role, vars_map) - else: - _write_role_defaults(role_dir, vars_map) - - with open( - os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write( - "---\n" "dependencies: []\n" "collections:\n" " - community.general\n" - ) - - tasks = """--- + _ensure_requirements_yaml(os.path.join(ctx.out_dir, "requirements.yml")) + vars_map = { + "flatpak_system_flatpaks": arole.flatpaks, + "flatpak_remotes": arole.flatpak_remotes, + } + tasks = """--- - name: Ensure system Flatpak remotes exist ansible.builtin.command: @@ -1457,111 +1463,34 @@ def _render_flatpak_role( - item.name | length > 0 become: true """ - with open( - os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(tasks) - - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\n") - - def _fmt_flatpak_apps(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - if not name: - continue - detail_parts = [] - for key in ("remote", "branch", "arch"): - value = item.get(key) - if value not in (None, "", []): - detail_parts.append(f"{key}={value}") - details = f" ({', '.join(detail_parts)})" if detail_parts else "" - lines.append(f"- {name}{details}") - return "\n".join(lines) or "- (none)" - - def _fmt_flatpak_remotes(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - url = item.get("url") - if not name or not url: - continue - lines.append(f"- {name}: {url}") - return "\n".join(lines) or "- (none)" - - notes = flatpak_snapshot.get("notes", []) or [] - readme = ( - """# flatpak - -Generated system-wide Flatpak remotes and applications. - -**Note:** This role requires the `community.general` Ansible collection. -Install it with: `ansible-galaxy collection install -r requirements.yml`. - -Flatpak `remote` is harvested from the installed deployment where detectable. -The original `.flatpakref` URL is generally not preserved by Flatpak after -installation, so `from_url` is only emitted if a future/hand-edited state file -contains it. - -## System Flatpak remotes -""" - + _fmt_flatpak_remotes(flatpak_remotes) - + """\n -## System-wide Flatpaks -""" - + _fmt_flatpak_apps(flatpak_system_flatpaks) - + """\n -## Notes -""" - + ("\n".join([f"- {n}" for n in notes]) or "- (none)") - + """\n""" - ) - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - return role + return _write_ansible_role( + ctx, + arole.role_name, + vars_map=vars_map, + site_defaults={"flatpak_system_flatpaks": [], "flatpak_remotes": []}, + tasks=_render_role_tasks(arole, extra_tasks=tasks), + collections=["community.general"], + ) def _render_snap_role( ctx: AnsibleManifestContext, snap_snapshot: Dict[str, Any], ) -> Optional[str]: - out_dir = ctx.out_dir - roles_root = ctx.roles_root - fqdn = ctx.fqdn - site_mode = ctx.site_mode + if not ( + snap_snapshot + and (snap_snapshot.get("system_snaps") or snap_snapshot.get("notes")) + ): + return - # ------------------------- - # Snap role (system-wide snap packages) - # ------------------------- - raw_system_snaps = snap_snapshot.get("system_snaps", []) or [] + arole = AnsibleRole(snap_snapshot.get("role_name", "snap")) + arole.add_snap_snapshot(snap_snapshot) + if not (arole.snaps or arole.notes): + return - if raw_system_snaps: - role = snap_snapshot.get("role_name", "snap") if snap_snapshot else "snap" - role_dir = os.path.join(roles_root, role) - _write_role_scaffold(role_dir) - _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) - - snap_system_snaps = [_normalise_snap_item(s) for s in raw_system_snaps] - - vars_map = {"snap_system_snaps": snap_system_snaps} - if site_mode: - _write_role_defaults(role_dir, {"snap_system_snaps": []}) - _write_hostvars(out_dir, fqdn or "", role, vars_map) - else: - _write_role_defaults(role_dir, vars_map) - - with open( - os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write( - "---\n" "dependencies: []\n" "collections:\n" " - community.general\n" - ) - - tasks = """--- + _ensure_requirements_yaml(os.path.join(ctx.out_dir, "requirements.yml")) + vars_map = {"snap_system_snaps": arole.snaps} + tasks = """--- - name: Install system-wide snaps with full detected attributes community.general.snap: @@ -1610,60 +1539,14 @@ def _render_snap_role( become: true ignore_errors: true """ - with open( - os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(tasks) - - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\n") - - def _fmt_snap_apps(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - if not name: - continue - detail_parts = [] - for key in ("channel", "revision"): - value = item.get(key) - if value not in (None, "", []): - detail_parts.append(f"{key}={value}") - for key in ("classic", "devmode", "dangerous"): - if item.get(key): - detail_parts.append(key) - details = f" ({', '.join(detail_parts)})" if detail_parts else "" - lines.append(f"- {name}{details}") - return "\n".join(lines) or "- (none)" - - notes = snap_snapshot.get("notes", []) or [] - readme = ( - """# snap - -Generated system-wide snap packages. - -**Note:** This role requires the `community.general` Ansible collection. -Install it with: `ansible-galaxy collection install -r requirements.yml`. - -The first install task uses all harvested attributes. If the installed -`community.general.snap` module is too old for some parameters, the generated -role falls back to reduced then minimal install tasks on a best-effort basis. - -## System-wide snaps -""" - + _fmt_snap_apps(snap_system_snaps) - + """\n -## Notes -""" - + ("\n".join([f"- {n}" for n in notes]) or "- (none)") - + """\n""" - ) - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - return role + return _write_ansible_role( + ctx, + arole.role_name, + vars_map=vars_map, + site_defaults={"snap_system_snaps": []}, + tasks=_render_role_tasks(arole, extra_tasks=tasks), + collections=["community.general"], + ) # --- Managed-file role renderers --- @@ -1674,7 +1557,6 @@ class AnsibleManagedFileRoleSpec: key: str default_role: str category: str - readme_builder: Callable[..., str] notify_systemd: Optional[str] = None handlers: str = "---\n" include_dirs_when_empty: bool = False @@ -1693,13 +1575,11 @@ MANAGED_FILE_ROLE_SPECS: Tuple[AnsibleManagedFileRoleSpec, ...] = ( key="apt_config", default_role="apt_config", category="apt_config", - readme_builder=_apt_config_readme, ), AnsibleManagedFileRoleSpec( key="dnf_config", default_role="dnf_config", category="dnf_config", - readme_builder=_dnf_config_readme, ), AnsibleManagedFileRoleSpec( key="etc_custom", @@ -1707,27 +1587,16 @@ MANAGED_FILE_ROLE_SPECS: Tuple[AnsibleManagedFileRoleSpec, ...] = ( category="etc_custom", notify_systemd="Run systemd daemon-reload", handlers=_SYSTEMD_DAEMON_RELOAD_HANDLER, - readme_builder=_simple_managed_files_readme( - "etc_custom", - "Unowned /etc config files not attributed to packages or services.", - include_reason=False, - ), ), AnsibleManagedFileRoleSpec( key="usr_local_custom", default_role="usr_local_custom", category="usr_local_custom", - readme_builder=_simple_managed_files_readme( - "usr_local_custom", - "Unowned /usr/local files (scripts in /usr/local/bin and config under /usr/local/etc).", - include_reason=False, - ), ), AnsibleManagedFileRoleSpec( key="extra_paths", default_role="extra_paths", category="extra_paths", - readme_builder=_extra_paths_readme, include_dirs_when_empty=True, ), ) @@ -1760,7 +1629,6 @@ def _write_managed_files_role_from_spec( jt_enabled=ctx.jt_enabled, notify_systemd=spec.notify_systemd, handlers=spec.handlers, - readme_builder=spec.readme_builder, ) return role @@ -1778,13 +1646,12 @@ def _write_managed_files_role( jt_enabled: bool, notify_systemd: Optional[str], handlers: str, - readme_builder: Callable[..., str], ) -> str: """Render an Ansible role whose main purpose is managed files/dirs. This covers apt_config, dnf_config, etc_custom, usr_local_custom, and - extra_paths. Their harvested state shape is the same; only their README - and optional handler differ. + extra_paths. Their harvested state shape is the same; only optional + handlers differ. """ role = snapshot.get("role_name", default_role) @@ -1794,9 +1661,6 @@ def _write_managed_files_role( var_prefix = role managed_files = snapshot.get("managed_files", []) or [] managed_dirs = snapshot.get("managed_dirs", []) or [] - excluded = snapshot.get("excluded", []) or [] - notes = snapshot.get("notes", []) or [] - templated, jt_vars = _jinjify_managed_files( bundle_dir, role, @@ -1846,7 +1710,7 @@ def _write_managed_files_role( else: _write_role_defaults(role_dir, vars_map) - tasks = "---\n" + _render_generic_files_tasks(var_prefix) + tasks = _render_role_tasks(AnsibleRole(role), managed_content=True) with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f: f.write(tasks.rstrip() + "\n") @@ -1858,18 +1722,6 @@ def _write_managed_files_role( with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f: f.write("---\ndependencies: []\n") - readme = readme_builder( - bundle_dir=bundle_dir, - role=role, - snapshot=snapshot, - managed_files=managed_files, - managed_dirs=managed_dirs, - excluded=excluded, - notes=notes, - ) - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - return role @@ -1893,203 +1745,220 @@ def _render_managed_file_roles( # --- Package and service role renderers --- +def _role_managed_content_vars( + ctx: AnsibleManifestContext, + role: str, + entries: List[Dict[str, Any]], + *, + notify_by_kind: Optional[Dict[str, Optional[str]]] = None, + overwrite_templates: bool, +) -> Tuple[ + List[Dict[str, Any]], + List[Dict[str, Any]], + List[Dict[str, Any]], + Dict[str, Any], +]: + role_dir = os.path.join(ctx.roles_root, role) + files_var: List[Dict[str, Any]] = [] + dirs_var: List[Dict[str, Any]] = [] + links_var: List[Dict[str, Any]] = [] + jt_combined: Dict[str, Any] = {} + seen_files: Set[Tuple[Any, Any, Any]] = set() + seen_dirs: Set[Tuple[Any, Any, Any, Any]] = set() + seen_links: Set[Tuple[Any, Any]] = set() + + for entry in entries: + kind = str(entry.get("kind") or "package") + snap = entry.get("snapshot") or {} + source_role = str(snap.get("role_name") or "") + managed_files = snap.get("managed_files", []) or [] + managed_dirs = snap.get("managed_dirs", []) or [] + managed_links = snap.get("managed_links", []) or [] + + templated: Set[str] = set() + jt_vars = "" + if managed_files and source_role: + templated, jt_vars = _jinjify_managed_files( + ctx.bundle_dir, + source_role, + os.path.join(role_dir, "templates"), + managed_files, + jt_exe=ctx.jt_exe, + jt_enabled=ctx.jt_enabled, + overwrite_templates=overwrite_templates, + ) + _copy_role_artifacts(ctx, role, source_role, exclude_rels=templated) + + for item in _build_managed_files_var( + managed_files, + templated, + notify_other=(notify_by_kind or {}).get(kind), + notify_systemd="Run systemd daemon-reload", + ): + key = (item.get("dest"), item.get("src_rel"), item.get("kind")) + if key not in seen_files: + seen_files.add(key) + files_var.append(item) + + for item in _build_managed_dirs_var(managed_dirs): + key = ( + item.get("dest"), + item.get("owner"), + item.get("group"), + item.get("mode"), + ) + if key not in seen_dirs: + seen_dirs.add(key) + dirs_var.append(item) + + for item in _build_managed_links_var(managed_links): + key = (item.get("dest"), item.get("src")) + if key not in seen_links: + seen_links.add(key) + links_var.append(item) + + jt_map = yaml_load_mapping(jt_vars) if jt_vars.strip() else {} + jt_combined = _merge_mappings_overwrite(jt_combined, jt_map) + + return ( + sorted(files_var, key=lambda item: str(item.get("dest") or "")), + sorted(dirs_var, key=lambda item: str(item.get("dest") or "")), + sorted(links_var, key=lambda item: str(item.get("dest") or "")), + jt_combined, + ) + + +def _resource_role_vars( + role: AnsibleRole, + *, + files_var: List[Dict[str, Any]], + dirs_var: List[Dict[str, Any]], + links_var: List[Dict[str, Any]], + extra_vars: Optional[Dict[str, Any]] = None, + jt_vars: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + var_prefix = role.var_prefix + vars_map: Dict[str, Any] = { + f"{var_prefix}_packages": role.sorted_packages, + f"{var_prefix}_managed_files": files_var, + f"{var_prefix}_managed_dirs": dirs_var, + f"{var_prefix}_managed_links": links_var, + } + if extra_vars: + vars_map.update(extra_vars) + return _merge_mappings_overwrite(vars_map, jt_vars or {}) + + +def _resource_role_site_defaults( + var_prefix: str, + *, + extra_defaults: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + defaults: Dict[str, Any] = { + f"{var_prefix}_packages": [], + f"{var_prefix}_managed_files": [], + f"{var_prefix}_managed_dirs": [], + f"{var_prefix}_managed_links": [], + } + if extra_defaults: + defaults.update(extra_defaults) + return defaults + + +def _single_service_extra_vars(role: AnsibleRole) -> Dict[str, Any]: + var_prefix = role.var_prefix + unit = next(iter(role.services), "") + unit_state = role.services.get(unit, {}) + return { + f"{var_prefix}_unit_name": unit, + f"{var_prefix}_manage_unit": bool(unit), + f"{var_prefix}_systemd_enabled": bool(unit_state.get("enabled")), + f"{var_prefix}_systemd_state": str(unit_state.get("state") or "stopped"), + } + + +def _single_service_site_defaults(var_prefix: str, unit: str) -> Dict[str, Any]: + return { + f"{var_prefix}_unit_name": unit, + f"{var_prefix}_manage_unit": False, + f"{var_prefix}_systemd_enabled": False, + f"{var_prefix}_systemd_state": "stopped", + } + + +def _write_resource_ansible_role( + ctx: AnsibleManifestContext, + role: AnsibleRole, + *, + notify_by_kind: Dict[str, Optional[str]], + overwrite_templates: bool, + extra_vars: Optional[Dict[str, Any]] = None, + site_defaults: Optional[Dict[str, Any]] = None, + single_service: bool = False, + grouped_services: bool = False, + systemd_reload: bool = False, +) -> str: + files_var, dirs_var, links_var, jt_vars = _role_managed_content_vars( + ctx, + role.role_name, + role.entries, + notify_by_kind=notify_by_kind, + overwrite_templates=overwrite_templates, + ) + vars_map = _resource_role_vars( + role, + files_var=files_var, + dirs_var=dirs_var, + links_var=links_var, + extra_vars=extra_vars, + jt_vars=jt_vars, + ) + return _write_ansible_role( + ctx, + role.role_name, + vars_map=vars_map, + site_defaults=site_defaults, + tasks=_render_role_tasks( + role, + packages=True, + managed_content=True, + single_service=single_service, + grouped_services=grouped_services, + ), + handlers=_render_role_handlers( + role, + systemd_reload=systemd_reload, + single_service=single_service, + grouped_services=grouped_services, + ), + ) + + def _render_service_roles( ctx: AnsibleManifestContext, services_to_manifest: List[Dict[str, Any]], ) -> List[str]: - bundle_dir = ctx.bundle_dir - out_dir = ctx.out_dir - roles_root = ctx.roles_root - fqdn = ctx.fqdn - site_mode = ctx.site_mode - jt_exe = ctx.jt_exe - jt_enabled = ctx.jt_enabled rendered_roles: List[str] = [] - - # ------------------------- - # Service roles - # ------------------------- for svc in services_to_manifest: - source_role = svc["role_name"] - role = avoid_reserved_role_name(source_role, prefix="service") - unit = svc["unit"] - pkgs = svc.get("packages", []) or [] - managed_files = svc.get("managed_files", []) or [] - managed_dirs = svc.get("managed_dirs", []) or [] - managed_links = svc.get("managed_links", []) or [] - - ansible_role = AnsibleRole(role) - ansible_role.add_service_snapshot(svc) - - role_dir = os.path.join(roles_root, role) - _write_role_scaffold(role_dir) - - var_prefix = role - - unit_state = ansible_role.services.get(unit, {}) - enabled_at_harvest = bool(unit_state.get("enabled")) - desired_state = str(unit_state.get("state") or "stopped") - - templated, jt_vars = _jinjify_managed_files( - bundle_dir, - source_role, - os.path.join(role_dir, "templates"), - managed_files, - jt_exe=jt_exe, - jt_enabled=jt_enabled, - overwrite_templates=not site_mode, + source_role = str(svc.get("role_name") or svc.get("unit") or "service") + role_name = avoid_reserved_role_name(source_role, prefix="service") + role = AnsibleRole(role_name) + role.add_service_snapshot(svc) + var_prefix = role.var_prefix + unit = next(iter(role.services), str(svc.get("unit") or "")) + _write_resource_ansible_role( + ctx, + role, + notify_by_kind={"service": "Restart service"}, + overwrite_templates=not ctx.site_mode, + extra_vars=_single_service_extra_vars(role), + site_defaults=_resource_role_site_defaults( + var_prefix, + extra_defaults=_single_service_site_defaults(var_prefix, unit), + ), + single_service=True, ) - - # Copy only the non-templated artifacts. - if site_mode: - _copy_artifacts( - bundle_dir, - source_role, - _host_role_files_dir(out_dir, fqdn or "", role), - exclude_rels=templated, - ) - else: - _copy_artifacts( - bundle_dir, - source_role, - os.path.join(role_dir, "files"), - exclude_rels=templated, - ) - - files_var = _build_managed_files_var( - managed_files, - templated, - notify_other="Restart service", - notify_systemd="Run systemd daemon-reload", - ) - - links_var = _build_managed_links_var(managed_links) - - dirs_var = _build_managed_dirs_var(managed_dirs) - - jt_map = yaml_load_mapping(jt_vars) if jt_vars.strip() else {} - base_vars: Dict[str, Any] = { - f"{var_prefix}_unit_name": unit, - f"{var_prefix}_packages": pkgs, - f"{var_prefix}_managed_files": files_var, - f"{var_prefix}_managed_dirs": dirs_var, - f"{var_prefix}_managed_links": links_var, - f"{var_prefix}_manage_unit": True, - f"{var_prefix}_systemd_enabled": bool(enabled_at_harvest), - f"{var_prefix}_systemd_state": desired_state, - } - base_vars = _merge_mappings_overwrite(base_vars, jt_map) - - if site_mode: - # Role defaults are host-agnostic/safe; all harvested state is in host_vars. - _write_role_defaults( - role_dir, - { - f"{var_prefix}_unit_name": unit, - f"{var_prefix}_packages": [], - f"{var_prefix}_managed_files": [], - f"{var_prefix}_managed_dirs": [], - f"{var_prefix}_managed_links": [], - f"{var_prefix}_manage_unit": False, - f"{var_prefix}_systemd_enabled": False, - f"{var_prefix}_systemd_state": "stopped", - }, - ) - _write_hostvars(out_dir, fqdn or "", role, base_vars) - else: - _write_role_defaults(role_dir, base_vars) - - handlers = f"""--- -- name: Run systemd daemon-reload - ansible.builtin.systemd: - daemon_reload: true - no_log: "{{ enroll_hide_systemd_status | default(true) | bool }}" - -- name: Restart service - ansible.builtin.service: - name: "{{{{ {var_prefix}_unit_name }}}}" - state: restarted - when: - - {var_prefix}_manage_unit | default(false) - - ({var_prefix}_systemd_state | default('stopped')) == 'started' -""" - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(handlers) - - task_parts: List[str] = [] - task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix)) - - task_parts.append(_render_generic_files_tasks(var_prefix)) - - task_parts.append( - f"""- name: Probe whether systemd unit exists and is manageable - ansible.builtin.systemd: - name: "{{{{ {var_prefix}_unit_name }}}}" - no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}" - check_mode: true - register: _unit_probe - failed_when: false - changed_when: false - when: {var_prefix}_manage_unit | default(false) - -- name: Ensure unit enablement matches harvest - ansible.builtin.systemd: - name: "{{{{ {var_prefix}_unit_name }}}}" - enabled: "{{{{ {var_prefix}_systemd_enabled | bool }}}}" - no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}" - when: - - {var_prefix}_manage_unit | default(false) - - _unit_probe is succeeded - -- name: Ensure unit running state matches harvest - ansible.builtin.systemd: - name: "{{{{ {var_prefix}_unit_name }}}}" - state: "{{{{ {var_prefix}_systemd_state }}}}" - no_log: "{{{{ enroll_hide_systemd_status | default(true) | bool }}}}" - when: - - {var_prefix}_manage_unit | default(false) - - _unit_probe is succeeded -""" - ) - - tasks = "\n".join(task_parts).rstrip() + "\n" - with open( - os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(tasks) - - with open( - os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\ndependencies: []\n") - - excluded = svc.get("excluded", []) - notes = svc.get("notes", []) - readme = f"""# {role} - -Generated from `{unit}`. - -## Packages -{os.linesep.join("- " + p for p in pkgs) or "- (none detected)"} - -## Managed files -{os.linesep.join("- " + mf["path"] + " (" + mf["reason"] + ")" for mf in managed_files) or "- (none)"} - -## Managed symlinks -{os.linesep.join("- " + ml["path"] + " -> " + ml["target"] + " (" + ml.get("reason", "") + ")" for ml in managed_links) or "- (none)"} - -## Excluded (possible secrets / unsafe) -{os.linesep.join("- " + e["path"] + " (" + e["reason"] + ")" for e in excluded) or "- (none)"} - -## Notes -{os.linesep.join("- " + n for n in notes) or "- (none)"} -""" - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - _add_role(rendered_roles, role) + _add_role(rendered_roles, role.role_name) return rendered_roles @@ -2099,26 +1968,8 @@ def _render_common_ansible_roles( package_roles: List[Dict[str, Any]], occupied_role_names: Set[str], ) -> Tuple[List[str], List[str]]: - bundle_dir = ctx.bundle_dir - roles_root = ctx.roles_root - jt_exe = ctx.jt_exe - jt_enabled = ctx.jt_enabled - rendered_roles: List[str] = [] common_tail_roles: List[str] = [] - - # ------------------------- - # Common package section/group roles - # - # Outside --fqdn/site mode, package and systemd-unit roles are grouped by - # Debian Section or RPM Group by default. Managed config and unit state can - # live in those section roles too; --no-common-roles preserves the historic - # one-role-per-package/unit output, and --fqdn implies that mode because - # grouped role contents would be unsafe across multiple harvested hosts. - # ------------------------- - # ------------------------- - # Manually installed package roles - # ------------------------- occupied_roles: Set[str] = set(occupied_role_names) for pr in package_roles: occupied_roles.add( @@ -2126,183 +1977,33 @@ def _render_common_ansible_roles( ) for section_label, entries in sorted(common_role_groups.items()): - role = _section_role_name(section_label, occupied_roles) - ansible_role = AnsibleRole( - role, - var_prefix=role, + role_name = _section_role_name(section_label, occupied_roles) + role = AnsibleRole( + role_name, + var_prefix=role_name, section_label=section_label, grouped=True, ) for entry in entries: - kind = entry.get("kind") or "package" snap = entry.get("snapshot") or {} - if kind == "service": - ansible_role.add_service_snapshot(snap) + if (entry.get("kind") or "package") == "service": + role.add_service_snapshot(snap) else: - ansible_role.add_package_snapshot(snap) + role.add_package_snapshot(snap) - role_dir = os.path.join(roles_root, role) - _write_role_scaffold(role_dir) + systemd_units = role.systemd_units_var + if {"cron", "logrotate"}.intersection(role.packages): + common_tail_roles.append(role.role_name) - var_prefix = ansible_role.var_prefix - files_var: List[Dict[str, Any]] = [] - dirs_var: List[Dict[str, Any]] = [] - links_var: List[Dict[str, Any]] = [] - jt_combined: Dict[str, Any] = {} - - seen_files: Set[tuple] = set() - seen_dirs: Set[tuple] = set() - seen_links: Set[tuple] = set() - - for entry in ansible_role.entries: - kind = entry.get("kind") or "package" - snap = entry.get("snapshot") or {} - source_role = str(snap.get("role_name") or "") - managed_files = snap.get("managed_files", []) or [] - managed_dirs = snap.get("managed_dirs", []) or [] - managed_links = snap.get("managed_links", []) or [] - - templated: Set[str] = set() - jt_vars = "" - if managed_files and source_role: - templated, jt_vars = _jinjify_managed_files( - bundle_dir, - source_role, - os.path.join(role_dir, "templates"), - managed_files, - jt_exe=jt_exe, - jt_enabled=jt_enabled, - overwrite_templates=True, - ) - - _copy_artifacts( - bundle_dir, - source_role, - os.path.join(role_dir, "files"), - exclude_rels=templated, - ) - - notify_other = "Restart managed services" if kind == "service" else None - for item in _build_managed_files_var( - managed_files, - templated, - notify_other=notify_other, - notify_systemd="Run systemd daemon-reload", - ): - key = (item.get("dest"), item.get("src_rel"), item.get("kind")) - if key not in seen_files: - seen_files.add(key) - files_var.append(item) - - for item in _build_managed_dirs_var(managed_dirs): - key = ( - item.get("dest"), - item.get("owner"), - item.get("group"), - item.get("mode"), - ) - if key not in seen_dirs: - seen_dirs.add(key) - dirs_var.append(item) - - for item in _build_managed_links_var(managed_links): - key = (item.get("dest"), item.get("src")) - if key not in seen_links: - seen_links.add(key) - links_var.append(item) - - jt_map = yaml_load_mapping(jt_vars) if jt_vars.strip() else {} - jt_combined = _merge_mappings_overwrite(jt_combined, jt_map) - - packages = ansible_role.sorted_packages - files_var = sorted(files_var, key=lambda x: str(x.get("dest") or "")) - dirs_var = sorted(dirs_var, key=lambda x: str(x.get("dest") or "")) - links_var = sorted(links_var, key=lambda x: str(x.get("dest") or "")) - systemd_units = ansible_role.systemd_units_var - - base_vars: Dict[str, Any] = { - f"{var_prefix}_packages": packages, - f"{var_prefix}_managed_files": files_var, - f"{var_prefix}_managed_dirs": dirs_var, - f"{var_prefix}_managed_links": links_var, - f"{var_prefix}_systemd_units": systemd_units, - } - base_vars = _merge_mappings_overwrite(base_vars, jt_combined) - - _write_role_defaults(role_dir, base_vars) - - if {"cron", "logrotate"}.intersection(ansible_role.packages): - common_tail_roles.append(role) - - handlers = ( - """--- -- name: Run systemd daemon-reload - ansible.builtin.systemd: - daemon_reload: true - no_log: "{{ enroll_hide_systemd_status | default(true) | bool }}" - -- name: Restart managed services - ansible.builtin.service: - name: "{{ item.name }}" - state: restarted - loop: "{{ """ - + f"{var_prefix}_systemd_units" - + """ | default([]) }}" - when: - - item.manage | default(false) - - (item.state | default('stopped')) == 'started' -""" + _write_resource_ansible_role( + ctx, + role, + notify_by_kind={"service": "Restart managed services"}, + overwrite_templates=True, + extra_vars={f"{role.var_prefix}_systemd_units": systemd_units}, + grouped_services=True, ) - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(handlers) - - task_parts: List[str] = [] - task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix)) - task_parts.append(_render_generic_files_tasks(var_prefix)) - task_parts.append(_render_grouped_systemd_tasks(var_prefix)) - - tasks = "\n".join(task_parts).rstrip() + "\n" - with open( - os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(tasks) - - with open( - os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\ndependencies: []\n") - - readme = f"""# {role} - -Common role for package section/group `{section_label}`. - -## Origin roles -{os.linesep.join("- " + line for line in sorted(ansible_role.origin_lines)) or "- (none)"} - -## Packages -{os.linesep.join("- " + p for p in packages) or "- (none)"} - -## Managed files -{os.linesep.join("- " + mf["dest"] for mf in files_var) or "- (none)"} - -## Managed symlinks -{os.linesep.join("- " + ml["dest"] + " -> " + ml["src"] for ml in links_var) or "- (none)"} - -## Systemd units -{os.linesep.join("- " + u["name"] + " (enabled=" + str(u["enabled"]).lower() + ", state=" + u["state"] + ")" for u in systemd_units) or "- (none)"} - -## Excluded (possible secrets / unsafe) -{os.linesep.join("- " + e.get("path", "") + " (" + e.get("reason", "") + ")" for e in ansible_role.excluded) or "- (none)"} - -## Notes -{os.linesep.join("- " + n for n in ansible_role.notes) or "- (none)"} -""" - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - _add_role(rendered_roles, role) + _add_role(rendered_roles, role.role_name) return rendered_roles, common_tail_roles @@ -2311,144 +2012,21 @@ def _render_package_roles( ctx: AnsibleManifestContext, package_roles: List[Dict[str, Any]], ) -> List[str]: - bundle_dir = ctx.bundle_dir - out_dir = ctx.out_dir - roles_root = ctx.roles_root - fqdn = ctx.fqdn - site_mode = ctx.site_mode - jt_exe = ctx.jt_exe - jt_enabled = ctx.jt_enabled rendered_roles: List[str] = [] - - # Process package roles (those with configuration files) for pr in package_roles: - source_role = pr["role_name"] - role = avoid_reserved_role_name(source_role, prefix="package") - pkg = pr.get("package") or "" - managed_files = pr.get("managed_files", []) or [] - managed_dirs = pr.get("managed_dirs", []) or [] - managed_links = pr.get("managed_links", []) or [] - - ansible_role = AnsibleRole(role) - ansible_role.add_package_snapshot(pr) - - role_dir = os.path.join(roles_root, role) - _write_role_scaffold(role_dir) - - var_prefix = role - - templated, jt_vars = _jinjify_managed_files( - bundle_dir, - source_role, - os.path.join(role_dir, "templates"), - managed_files, - jt_exe=jt_exe, - jt_enabled=jt_enabled, - overwrite_templates=not site_mode, + source_role = str(pr.get("role_name") or pr.get("package") or "package") + role_name = avoid_reserved_role_name(source_role, prefix="package") + role = AnsibleRole(role_name) + role.add_package_snapshot(pr) + _write_resource_ansible_role( + ctx, + role, + notify_by_kind={"package": None}, + overwrite_templates=not ctx.site_mode, + site_defaults=_resource_role_site_defaults(role.var_prefix), + systemd_reload=True, ) - - # Copy only the non-templated artifacts. - if site_mode: - _copy_artifacts( - bundle_dir, - source_role, - _host_role_files_dir(out_dir, fqdn or "", role), - exclude_rels=templated, - ) - else: - _copy_artifacts( - bundle_dir, - source_role, - os.path.join(role_dir, "files"), - exclude_rels=templated, - ) - - pkgs = ansible_role.sorted_packages - - files_var = _build_managed_files_var( - managed_files, - templated, - notify_other=None, - notify_systemd="Run systemd daemon-reload", - ) - - links_var = _build_managed_links_var(managed_links) - - dirs_var = _build_managed_dirs_var(managed_dirs) - - jt_map = yaml_load_mapping(jt_vars) if jt_vars.strip() else {} - base_vars: Dict[str, Any] = { - f"{var_prefix}_packages": pkgs, - f"{var_prefix}_managed_files": files_var, - f"{var_prefix}_managed_dirs": dirs_var, - f"{var_prefix}_managed_links": links_var, - } - base_vars = _merge_mappings_overwrite(base_vars, jt_map) - - if site_mode: - _write_role_defaults( - role_dir, - { - f"{var_prefix}_packages": [], - f"{var_prefix}_managed_files": [], - f"{var_prefix}_managed_dirs": [], - f"{var_prefix}_managed_links": [], - }, - ) - _write_hostvars(out_dir, fqdn or "", role, base_vars) - else: - _write_role_defaults(role_dir, base_vars) - - handlers = """--- -- name: Run systemd daemon-reload - ansible.builtin.systemd: - daemon_reload: true - no_log: "{{ enroll_hide_systemd_status | default(true) | bool }}" -""" - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(handlers) - - task_parts: List[str] = [] - task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix)) - task_parts.append(_render_generic_files_tasks(var_prefix)) - - tasks = "\n".join(task_parts).rstrip() + "\n" - with open( - os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(tasks) - - with open( - os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\ndependencies: []\n") - - excluded = pr.get("excluded", []) - notes = pr.get("notes", []) - readme = f"""# {role} - -Generated for package `{pkg}`. - -## Managed files -{os.linesep.join("- " + mf["path"] + " (" + mf["reason"] + ")" for mf in managed_files) or "- (none)"} - -## Managed symlinks -{os.linesep.join("- " + ml["path"] + " -> " + ml["target"] + " (" + ml.get("reason", "") + ")" for ml in managed_links) or "- (none)"} - -## Excluded (possible secrets / unsafe) -{os.linesep.join("- " + e["path"] + " (" + e["reason"] + ")" for e in excluded) or "- (none)"} - -## Notes -{os.linesep.join("- " + n for n in notes) or "- (none)"} - -> Note: package roles (those not discovered via a systemd service) do not attempt to restart or enable services automatically. -""" - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - _add_role(rendered_roles, role) + _add_role(rendered_roles, role.role_name) return rendered_roles @@ -2461,10 +2039,6 @@ def _render_sysctl_role( return role = sysctl_snapshot.get("role_name", "sysctl") - role_dir = os.path.join(ctx.roles_root, role) - _write_role_scaffold(role_dir) - - var_prefix = role managed_files = sysctl_snapshot.get("managed_files", []) or [] conf_src_rel = "" for mf in managed_files: @@ -2474,86 +2048,30 @@ def _render_sysctl_role( if not conf_src_rel and managed_files: conf_src_rel = managed_files[0].get("src_rel") or "" - parameters = sysctl_snapshot.get("parameters", {}) or {} - notes = sysctl_snapshot.get("notes", []) or [] - - if ctx.site_mode: - _copy_artifacts( - ctx.bundle_dir, - role, - _host_role_files_dir(ctx.out_dir, ctx.fqdn or "", role), - ) - else: - _copy_artifacts(ctx.bundle_dir, role, os.path.join(role_dir, "files")) + _write_role_scaffold(os.path.join(ctx.roles_root, role)) + _copy_role_artifacts(ctx, role, role) + var_prefix = role vars_map: Dict[str, Any] = { f"{var_prefix}_conf_src_rel": conf_src_rel, f"{var_prefix}_apply": True, f"{var_prefix}_ignore_apply_errors": True, } - - if ctx.site_mode: - _write_role_defaults( - role_dir, - { - f"{var_prefix}_conf_src_rel": "", - f"{var_prefix}_apply": True, - f"{var_prefix}_ignore_apply_errors": True, - }, - ) - _write_hostvars(ctx.out_dir, ctx.fqdn or "", role, vars_map) - else: - _write_role_defaults(role_dir, vars_map) - - tasks = "---\n" + _render_sysctl_tasks(var_prefix) - with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f: - f.write(tasks.rstrip() + "\n") - - handlers_dir = os.path.join(role_dir, "handlers") - os.makedirs(handlers_dir, exist_ok=True) - with open(os.path.join(handlers_dir, "main.yml"), "w", encoding="utf-8") as f: - f.write(_render_sysctl_handlers(var_prefix)) - - with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f: - f.write("---\ndependencies: []\n") - - param_count = len(parameters) if isinstance(parameters, dict) else 0 - sample_params = [] - if isinstance(parameters, dict): - sample_params = sorted(parameters.keys())[:25] - - readme = f"""# {role} - -Generated from live writable sysctl state captured during harvest. - -This role deploys the captured values to `/etc/sysctl.d/99-enroll.conf`. The generated file intentionally contains writable, single-line sysctl values only; read-only, multiline, action-like, and host-identity keys are skipped to avoid creating a noisy or brittle boot-time configuration. - -## Captured parameters - -Captured parameter count: {param_count} - -{os.linesep.join("- " + x for x in sample_params) or "- (none)"} - -{"- ..." if param_count > len(sample_params) else ""} - -## Notes -{os.linesep.join("- " + n for n in notes) or "- (none)"} - -## Safety notes -- `sysctl_apply` defaults to `true` and applies `/etc/sysctl.d/99-enroll.conf` when the file changes. -- `sysctl_ignore_apply_errors` defaults to `true`, because sysctl availability and writability can vary across kernels, containers, and hardware. -- Review this role before applying it broadly across unlike hosts. -""" - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - return role + return _write_ansible_role( + ctx, + role, + vars_map=vars_map, + site_defaults={ + f"{var_prefix}_conf_src_rel": "", + f"{var_prefix}_apply": True, + f"{var_prefix}_ignore_apply_errors": True, + }, + tasks=_render_role_tasks(AnsibleRole(role), sysctl=True), + handlers=_render_role_handlers(AnsibleRole(role), sysctl=True), + ) def _render_enroll_runtime_role(ctx: AnsibleManifestContext) -> str: - role = "enroll_runtime" - role_dir = os.path.join(ctx.roles_root, role) - _write_role_scaffold(role_dir) tasks = """--- - name: Ensure Enroll runtime directory exists ansible.builtin.file: @@ -2563,53 +2081,32 @@ def _render_enroll_runtime_role(ctx: AnsibleManifestContext) -> str: group: root mode: "0750" """ - with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f: - f.write(tasks.rstrip() + "\n") - with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f: - f.write("---\ndependencies: []\n") - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write( - "# enroll_runtime\n\n" - "Generated by Enroll to hold common runtime scaffolding used by other generated roles.\n" - ) - return role + return _write_ansible_role( + ctx, + "enroll_runtime", + tasks=_render_role_tasks(AnsibleRole("enroll_runtime"), extra_tasks=tasks), + ) def _render_firewall_runtime_role( ctx: AnsibleManifestContext, firewall_runtime_snapshot: Dict[str, Any], ) -> Optional[str]: - if not ( - firewall_runtime_snapshot - and ( - firewall_runtime_snapshot.get("ipset_save") - or firewall_runtime_snapshot.get("iptables_v4_save") - or firewall_runtime_snapshot.get("iptables_v6_save") - ) - ): - return - role = firewall_runtime_snapshot.get("role_name", "firewall_runtime") - role_dir = os.path.join(ctx.roles_root, role) - _write_role_scaffold(role_dir) + arole = AnsibleRole(role) + if not arole.firewall_runtime_snapshot_has_artifacts(firewall_runtime_snapshot): + return + arole.add_firewall_runtime_snapshot(firewall_runtime_snapshot) + _write_role_scaffold(os.path.join(ctx.roles_root, role)) + _copy_role_artifacts(ctx, role, role) var_prefix = role - packages = firewall_runtime_snapshot.get("packages", []) or [] - ipset_save = firewall_runtime_snapshot.get("ipset_save") or "" - ipset_sets = firewall_runtime_snapshot.get("ipset_sets", []) or [] - iptables_v4_save = firewall_runtime_snapshot.get("iptables_v4_save") or "" - iptables_v6_save = firewall_runtime_snapshot.get("iptables_v6_save") or "" - notes = firewall_runtime_snapshot.get("notes", []) or [] - - if ctx.site_mode: - _copy_artifacts( - ctx.bundle_dir, - role, - _host_role_files_dir(ctx.out_dir, ctx.fqdn or "", role), - ) - else: - _copy_artifacts(ctx.bundle_dir, role, os.path.join(role_dir, "files")) - + packages = list(arole.package_names_from_snapshot(firewall_runtime_snapshot)) + refs = arole.firewall_runtime + ipset_save = refs.get("ipset_save", "") + ipset_sets = refs.get("ipset_sets", []) or [] + iptables_v4_save = refs.get("iptables_v4_save", "") + iptables_v6_save = refs.get("iptables_v6_save", "") vars_map: Dict[str, Any] = { f"{var_prefix}_packages": packages, f"{var_prefix}_ipset_save": ipset_save, @@ -2619,65 +2116,22 @@ def _render_firewall_runtime_role( f"{var_prefix}_sync_ipsets_exact": True, f"{var_prefix}_restore_iptables": True, } - - if ctx.site_mode: - _write_role_defaults( - role_dir, - { - f"{var_prefix}_packages": [], - f"{var_prefix}_ipset_save": "", - f"{var_prefix}_ipset_sets": [], - f"{var_prefix}_iptables_v4_save": "", - f"{var_prefix}_iptables_v6_save": "", - f"{var_prefix}_sync_ipsets_exact": True, - f"{var_prefix}_restore_iptables": True, - }, - ) - _write_hostvars(ctx.out_dir, ctx.fqdn or "", role, vars_map) - else: - _write_role_defaults(role_dir, vars_map) - - tasks = ( - "---\n" - + _render_install_packages_tasks(role, var_prefix) - + _render_firewall_runtime_tasks(var_prefix) + return _write_ansible_role( + ctx, + role, + vars_map=vars_map, + site_defaults={ + f"{var_prefix}_packages": [], + f"{var_prefix}_ipset_save": "", + f"{var_prefix}_ipset_sets": [], + f"{var_prefix}_iptables_v4_save": "", + f"{var_prefix}_iptables_v6_save": "", + f"{var_prefix}_sync_ipsets_exact": True, + f"{var_prefix}_restore_iptables": True, + }, + tasks=_render_role_tasks(arole, packages=True, firewall_runtime=True), + handlers=_render_role_handlers(arole, firewall_runtime=True), ) - with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f: - f.write(tasks.rstrip() + "\n") - - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(_render_firewall_runtime_handlers(var_prefix).rstrip() + "\n") - - with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f: - f.write("---\ndependencies: []\n") - - readme = f"""# {role} - -Generated from live firewall runtime state captured during harvest. - -This role restores live ipset and iptables state only for firewall families where Enroll did not find corresponding persistent configuration on the source host. Static firewall configuration files, such as `/etc/iptables/rules.v4`, `/etc/iptables/rules.v6`, UFW, nftables, firewalld, or `/etc/ipset*`, are harvested separately as managed files and treated as authoritative for their respective family. - -## Captured snapshots -- ipset: {ipset_save or "(none)"} -- iptables IPv4: {iptables_v4_save or "(none)"} -- iptables IPv6: {iptables_v6_save or "(none)"} - -## Captured ipsets -{os.linesep.join("- " + x for x in ipset_sets) or "- (none)"} - -## Notes -{os.linesep.join("- " + n for n in notes) or "- (none)"} - -## Safety notes -- `firewall_runtime_sync_ipsets_exact` defaults to `true`; it flushes captured set members before replaying the saved members so stale entries are removed. This applies only when no persistent ipset config was found. -- `firewall_runtime_restore_iptables` defaults to `true`; `iptables-restore`/`ip6tables-restore` replace only captured families. A family is captured only when no corresponding persistent iptables config was found. -""" - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - return role # --- User role renderer --- @@ -2685,209 +2139,30 @@ def _render_users_role( ctx: AnsibleManifestContext, users_snapshot: Dict[str, Any], ) -> Optional[str]: - bundle_dir = ctx.bundle_dir - out_dir = ctx.out_dir - roles_root = ctx.roles_root - fqdn = ctx.fqdn - site_mode = ctx.site_mode + if not users_snapshot: + return - # ------------------------- - # Users role (non-system users) - # ------------------------- - if users_snapshot: - role = users_snapshot.get("role_name", "users") - role_dir = os.path.join(roles_root, role) - _write_role_scaffold(role_dir) + role = users_snapshot.get("role_name", "users") + arole = AnsibleRole(role) + arole.add_users_snapshot(users_snapshot) - # Users role includes harvested SSH-related files; in site mode keep them - # host-specific to avoid cross-host clobber. - if site_mode: - _copy_artifacts( - bundle_dir, role, _host_role_files_dir(out_dir, fqdn or "", role) - ) - else: - _copy_artifacts(bundle_dir, role, os.path.join(role_dir, "files")) + _write_role_scaffold(os.path.join(ctx.roles_root, role)) + _copy_role_artifacts(ctx, role, role) - users = users_snapshot.get("users", []) - managed_files = users_snapshot.get("managed_files", []) - excluded = users_snapshot.get("excluded", []) - notes = users_snapshot.get("notes", []) + users_needs_community = bool(arole.flatpak_remotes or arole.flatpaks) + if users_needs_community: + _ensure_requirements_yaml(os.path.join(ctx.out_dir, "requirements.yml")) - # Build groups list and a simplified user dict list suitable for loops - group_names: List[str] = [] - group_set = set() - users_data: List[Dict[str, Any]] = [] - for u in users: - name = u.get("name") - if not name: - continue - pg = u.get("primary_group") or name - home = u.get("home") or f"/home/{name}" - sshdir = home.rstrip("/") + "/.ssh" - supp = u.get("supplementary_groups") or [] - if pg: - group_set.add(pg) - for g in supp: - if g: - group_set.add(g) + vars_map = { + "users_groups": arole.users_groups, + "users_users": arole.users_data, + "users_ssh_dirs": arole.users_ssh_dirs, + "users_ssh_files": arole.users_ssh_files, + "users_flatpaks": arole.flatpaks, + "users_flatpak_remotes": arole.flatpak_remotes, + } - users_data.append( - { - "name": name, - "uid": u.get("uid"), - "primary_group": pg, - "home": home, - "ssh_dir": sshdir, - "shell": u.get("shell"), - "gecos": u.get("gecos"), - "supplementary_groups": sorted(set(supp)), - } - ) - - group_names = sorted(group_set) - - # User-managed files (authorized_keys plus dangerous-mode shell dotfiles). - # Keep the variable name for compatibility with existing generated data. - ssh_files: List[Dict[str, Any]] = [] - for mf in managed_files: - dest = mf.get("path") or "" - src_rel = mf.get("src_rel") or "" - if not dest or not src_rel: - continue - - owner = "root" - group = "root" - for u in users_data: - home_prefix = (u.get("home") or "").rstrip("/") + "/" - if home_prefix and dest.startswith(home_prefix): - owner = str(u.get("name") or "root") - group = str(u.get("primary_group") or owner) - break - - # Prefer the harvested file mode so we preserve any deliberate - # permissions (e.g. 0600 for certain dotfiles). For authorized_keys, - # enforce 0600 regardless. - mode = mf.get("mode") or "0644" - if mf.get("reason") == "authorized_keys": - mode = "0600" - ssh_files.append( - { - "dest": dest, - "src_rel": src_rel, - "owner": owner, - "group": group, - "mode": mode, - } - ) - - # Only create .ssh directories for users that actually have harvested - # files under .ssh. - ssh_dirs_by_dest: Dict[str, Dict[str, Any]] = {} - for item in ssh_files: - dest = str(item.get("dest") or "") - if not dest: - continue - for user in users_data: - ssh_dir = str(user.get("ssh_dir") or "").rstrip("/") - if not ssh_dir or not dest.startswith(ssh_dir + "/"): - continue - ssh_dirs_by_dest.setdefault( - ssh_dir, - { - "dest": ssh_dir, - "owner": str(user.get("name") or item.get("owner") or "root"), - "group": str( - user.get("primary_group") or item.get("group") or "root" - ), - "mode": "0700", - }, - ) - break - ssh_dirs = sorted( - ssh_dirs_by_dest.values(), key=lambda item: str(item.get("dest") or "") - ) - - # Build Flatpak and Snap lists. Flatpak can be installed system-wide or - # per-user. Snap packages are system-wide; per-user ~/snap/* directories - # are runtime/user data and are not treated as install sources. - users_flatpaks: List[Dict[str, Any]] = [] - user_flatpak_map = users_snapshot.get("user_flatpaks", {}) or {} - home_by_user = { - str(u.get("name")): str(u.get("home") or "") for u in users_data - } - for uname, flatpaks in user_flatpak_map.items(): - for fp in flatpaks or []: - users_flatpaks.append( - _normalise_flatpak_item( - fp, - method="user", - user=str(uname), - home=home_by_user.get(str(uname)) or None, - ) - ) - - flatpak_remotes = [ - _normalise_flatpak_remote(r) - for r in (users_snapshot.get("user_flatpak_remotes", []) or []) - ] - users_needs_community = bool(flatpak_remotes or users_flatpaks) - if users_needs_community: - _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) - - # Variables are host-specific in site mode; in non-site mode they live in role defaults. - if site_mode: - _write_role_defaults( - role_dir, - { - "users_groups": [], - "users_users": [], - "users_ssh_dirs": [], - "users_ssh_files": [], - "users_flatpaks": [], - "users_flatpak_remotes": [], - }, - ) - _write_hostvars( - out_dir, - fqdn or "", - role, - { - "users_groups": group_names, - "users_users": users_data, - "users_ssh_dirs": ssh_dirs, - "users_ssh_files": ssh_files, - "users_flatpaks": users_flatpaks, - "users_flatpak_remotes": flatpak_remotes, - }, - ) - else: - _write_role_defaults( - role_dir, - { - "users_groups": group_names, - "users_users": users_data, - "users_ssh_dirs": ssh_dirs, - "users_ssh_files": ssh_files, - "users_flatpaks": users_flatpaks, - "users_flatpak_remotes": flatpak_remotes, - }, - ) - - with open( - os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" - ) as f: - if users_needs_community: - f.write( - "---\n" - "dependencies: []\n" - "collections:\n" - " - community.general\n" - ) - else: - f.write("---\ndependencies: []\n") - - # tasks (data-driven) - users_tasks = """--- + users_tasks = """--- - name: Ensure groups exist ansible.builtin.group: @@ -2939,8 +2214,8 @@ def _render_users_role( loop: "{{ users_ssh_files | default([]) }}" """ - if flatpak_remotes or users_flatpaks: - users_tasks += """ + if users_needs_community: + users_tasks += """ - name: Ensure user Flatpak remotes exist ansible.builtin.command: argv: @@ -2983,100 +2258,21 @@ def _render_users_role( XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}" """ - with open( - os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(users_tasks) - - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\n") - - def _fmt_user_flatpaks(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - user = item.get("user") - if not name or not user: - continue - detail_parts = [] - for key in ("remote", "branch", "arch"): - value = item.get(key) - if value not in (None, "", []): - detail_parts.append(f"{key}={value}") - details = f" ({', '.join(detail_parts)})" if detail_parts else "" - lines.append(f"- {user}: {name}{details}") - return "\n".join(lines) or "- (none)" - - def _fmt_remotes(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - url = item.get("url") - method = item.get("method") or "system" - user = item.get("user") - if not name or not url: - continue - owner = f"user={user}" if user else "system" - lines.append(f"- {name} ({method}, {owner}): {url}") - return "\n".join(lines) or "- (none)" - - readme = ( - """# users - -Generated non-system user accounts, SSH public material, and per-user Flatpak -applications/remotes. - -**Note:** User Flatpak tasks require the `community.general` Ansible collection. -Install it with: `ansible-galaxy collection install -r requirements.yml`. - -Flatpak `remote` is harvested from the installed deployment where detectable. -The original `.flatpakref` URL is generally not preserved by Flatpak after -installation, so `from_url` is only emitted if a future/hand-edited state file -contains it. - - -## Users -""" - + ( - "\n".join([f"- {u.get('name')} (uid {u.get('uid')})" for u in users]) - or "- (none)" - ) - + """\n -## Included SSH files -""" - + ( - "\n".join( - [f"- {mf.get('path')} ({mf.get('reason')})" for mf in managed_files] - ) - or "- (none)" - ) - + """\n -## Flatpak remotes -""" - + _fmt_remotes(flatpak_remotes) - + """\n -## User Flatpaks -""" - + _fmt_user_flatpaks(users_flatpaks) - + """\n -## Excluded -""" - + ( - "\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded]) - or "- (none)" - ) - + """\n -## Notes -""" - + ("\n".join([f"- {n}" for n in notes]) or "- (none)") - + """\n""" - ) - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - return role + return _write_ansible_role( + ctx, + role, + vars_map=vars_map, + site_defaults={ + "users_groups": [], + "users_users": [], + "users_ssh_dirs": [], + "users_ssh_files": [], + "users_flatpaks": [], + "users_flatpak_remotes": [], + }, + tasks=_render_role_tasks(arole, extra_tasks=users_tasks), + collections=["community.general"] if users_needs_community else None, + ) class AnsibleManifestRenderer: @@ -3176,6 +2372,10 @@ class AnsibleManifestRenderer: _add_role(ordered_roles, role) _write_manifest_playbook(ctx, ordered_roles) + Path(ctx.out_dir, "README.md").write_text( + _render_readme(state, ordered_roles, fqdn=ctx.fqdn), + encoding="utf-8", + ) def manifest_from_bundle_dir( diff --git a/enroll/cm.py b/enroll/cm.py index 39306b1..4718b90 100644 --- a/enroll/cm.py +++ b/enroll/cm.py @@ -1,8 +1,19 @@ from __future__ import annotations +import shlex from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Dict, Iterable, Iterator, List, Mapping, Set +from typing import ( + Any, + Callable, + ClassVar, + Dict, + Iterable, + Iterator, + List, + Mapping, + Set, +) from .state import load_state, state_path, write_state @@ -25,9 +36,18 @@ class CMModule: files: Dict[str, Dict[str, Any]] = field(default_factory=dict) links: Dict[str, Dict[str, Any]] = field(default_factory=dict) services: Dict[str, Dict[str, Any]] = field(default_factory=dict) + firewall_runtime: Dict[str, Any] = field(default_factory=dict) notes: List[str] = field(default_factory=list) - def has_resources(self) -> bool: + managed_owner_attr: ClassVar[str] = "owner" + firewall_runtime_dir: ClassVar[str] = "/etc/enroll/firewall" + firewall_runtime_artifacts: ClassVar[tuple[tuple[str, str, str], ...]] = ( + ("ipset_save", "ipset.save", "0600"), + ("iptables_v4_save", "iptables.v4", "0600"), + ("iptables_v6_save", "iptables.v6", "0600"), + ) + + def has_core_resources(self) -> bool: return bool( self.packages or self.groups @@ -36,9 +56,20 @@ class CMModule: or self.files or self.links or self.services + or self.firewall_runtime or self.notes ) + def has_resources(self) -> bool: + return self.has_core_resources() + + def has_resources_or_attrs(self, *attrs: str) -> bool: + """Return true if core resources or named renderer extras are present.""" + + return self.has_core_resources() or any( + bool(getattr(self, attr, None)) for attr in attrs + ) + @staticmethod def state_path(bundle_dir: str | Path) -> Path: """Return the canonical state.json path for a harvest bundle.""" @@ -142,6 +173,412 @@ class CMModule: def add_snapshot_notes(self, snap: Dict[str, Any]) -> None: self.notes.extend(str(n) for n in (snap.get("notes", []) or [])) + @staticmethod + def package_name_from_snapshot(snap: Dict[str, Any]) -> str: + return str(snap.get("package") or "").strip() + + @staticmethod + def package_names_from_snapshot(snap: Dict[str, Any]) -> Iterator[str]: + for pkg in snap.get("packages", []) or []: + pkg_s = str(pkg or "").strip() + if pkg_s: + yield pkg_s + + def add_package_snapshot(self, snap: Dict[str, Any]) -> None: + pkg = self.package_name_from_snapshot(snap) + if pkg: + self.packages.add(pkg) + + def add_service_packages_from_snapshot(self, snap: Dict[str, Any]) -> None: + self.packages.update(self.package_names_from_snapshot(snap)) + + def service_unit_from_snapshot(self, snap: Dict[str, Any]) -> str: + return str(snap.get("unit") or "").strip() + + def service_enabled_from_snapshot(self, snap: Dict[str, Any]) -> bool: + unit_file_state = str(snap.get("unit_file_state") or "") + return unit_file_state in ("enabled", "enabled-runtime") + + def service_state_from_snapshot( + self, + snap: Dict[str, Any], + *, + running: str, + stopped: str, + ) -> str: + return running if snap.get("active_state") == "active" else stopped + + def add_service_snapshot_state( + self, + snap: Dict[str, Any], + *, + state_key: str, + running: str, + stopped: str, + include_manage: bool = False, + ) -> None: + """Add the common systemd service parts, parameterised per renderer.""" + + self.add_service_packages_from_snapshot(snap) + unit = self.service_unit_from_snapshot(snap) + if not unit: + return + + data: Dict[str, Any] = { + "name": unit, + state_key: self.service_state_from_snapshot( + snap, running=running, stopped=stopped + ), + "enable": self.service_enabled_from_snapshot(snap), + } + if include_manage: + data["manage"] = True + self.services[unit] = data + + @staticmethod + def normalise_flatpak_item( + item: Any, + *, + method: str, + user: str | None = None, + home: str | None = None, + ) -> Dict[str, Any]: + if isinstance(item, dict): + out = dict(item) + elif isinstance(item, str): + out = {"name": item} + else: + out = {"name": str(item)} + + out["method"] = str(out.get("method") or method or "system").strip() or "system" + if user and not out.get("user"): + out["user"] = user + if home and not out.get("home"): + out["home"] = home + ref = str(out.get("ref") or "").strip() + if ref and not out.get("name"): + out["name"] = ref.rsplit("/", 1)[-1] + name = str(out.get("name") or out.get("app_id") or "").strip() + if name: + out["name"] = name + remote = str(out.get("remote") or "").strip() + if remote: + out["remote"] = remote + branch = str(out.get("branch") or out.get("origin") or "").strip() + if branch: + out["branch"] = branch + if ref: + out["ref"] = ref + return out + + @staticmethod + def normalise_flatpak_remote(item: Any) -> Dict[str, Any]: + if isinstance(item, dict): + out = dict(item) + else: + out = {"name": str(item)} + name = str(out.get("name") or out.get("remote") or "").strip() + url = str(out.get("url") or out.get("from_url") or "").strip() + method = ( + str(out.get("method") or out.get("scope") or "system").strip() or "system" + ) + if name: + out["name"] = name + if url: + out["url"] = url + out["method"] = "user" if method == "user" else "system" + return out + + @staticmethod + def normalise_snap_item(item: Any) -> Dict[str, Any]: + if isinstance(item, dict): + out = dict(item) + elif isinstance(item, str): + out = {"name": item} + else: + out = {"name": str(item)} + + name = str(out.get("name") or "").strip() + if name: + out["name"] = name + channel = str(out.get("tracking") or out.get("channel") or "").strip() + if channel: + out["channel"] = channel + raw_notes = out.get("notes") or [] + if isinstance(raw_notes, str): + raw_notes = [raw_notes] + notes = [str(note).lower() for note in raw_notes] + confinement = str(out.get("confinement") or "").strip().lower() + out["classic"] = bool( + out.get("classic") + or confinement == "classic" + or any("classic" in note for note in notes) + ) + out["devmode"] = bool( + out.get("devmode") + or any("devmode" in note or "dev mode" in note for note in notes) + ) + out["dangerous"] = bool( + out.get("dangerous") or any("dangerous" in note for note in notes) + ) + revision = str(out.get("revision") or "").strip() + if revision and not channel: + out["revision"] = revision + return out + + def prepare_flatpak_remote(self, item: Dict[str, Any]) -> Dict[str, Any]: + raise NotImplementedError + + def prepare_flatpak_item(self, item: Dict[str, Any]) -> Dict[str, Any]: + raise NotImplementedError + + def prepare_snap_item(self, item: Dict[str, Any]) -> Dict[str, Any]: + raise NotImplementedError + + @staticmethod + def user_records_from_snapshot(snap: Dict[str, Any]) -> List[Dict[str, Any]]: + records: List[Dict[str, Any]] = [] + for raw in snap.get("users", []) or []: + if not isinstance(raw, dict): + continue + name = str(raw.get("name") or "").strip() + if not name: + continue + primary_group = str(raw.get("primary_group") or name).strip() + supplementary = sorted( + { + str(group).strip() + for group in (raw.get("supplementary_groups") or []) + if str(group).strip() + } + ) + records.append( + { + "name": name, + "uid": raw.get("uid"), + "gid": raw.get("gid"), + "primary_group": primary_group, + "home": raw.get("home") or f"/home/{name}", + "shell": raw.get("shell"), + "gecos": raw.get("gecos"), + "supplementary_groups": supplementary, + } + ) + return records + + @staticmethod + def user_group_names_from_records(records: Iterable[Mapping[str, Any]]) -> Set[str]: + groups: Set[str] = set() + for record in records: + primary_group = str(record.get("primary_group") or "").strip() + if primary_group: + groups.add(primary_group) + groups.update( + str(group).strip() + for group in (record.get("supplementary_groups") or []) + if str(group).strip() + ) + return groups + + @staticmethod + def package_service_entries( + roles: Mapping[str, Any], + inventory_packages: Mapping[str, Any], + *, + use_common_roles: bool, + ) -> Iterator[Dict[str, Any]]: + for svc in roles.get("services", []) or []: + if not isinstance(svc, dict): + continue + own_label = str(svc.get("role_name") or svc.get("unit") or "service") + role_label = ( + section_label_for_packages( + svc.get("packages", []) or [], inventory_packages + ) + if use_common_roles + else own_label + ) + yield {"kind": "service", "snapshot": svc, "role_label": role_label} + + for pkg in roles.get("packages", []) or []: + if not isinstance(pkg, dict): + continue + own_label = str(pkg.get("role_name") or pkg.get("package") or "package") + role_label = ( + package_section_label(pkg, inventory_packages) + if use_common_roles + else own_label + ) + yield {"kind": "package", "snapshot": pkg, "role_label": role_label} + + def add_user_flatpaks_snapshot(self, snap: Dict[str, Any]) -> None: + home_by_user = { + str(u.get("name")): str(u.get("home") or "") + for u in (snap.get("users", []) or []) + if isinstance(u, dict) and u.get("name") + } + for remote in snap.get("user_flatpak_remotes", []) or []: + item = self.normalise_flatpak_remote(remote) + user = str(item.get("user") or "").strip() + if user and not item.get("home"): + item["home"] = home_by_user.get(user) or f"/home/{user}" + if item.get("method") == "user" and item.get("name") and item.get("url"): + self.flatpak_remotes.append( # type: ignore[attr-defined] + self.prepare_flatpak_remote(item) + ) + for uname, flatpaks in (snap.get("user_flatpaks", {}) or {}).items(): + user = str(uname) + for fp in flatpaks or []: + item = self.normalise_flatpak_item( + fp, method="user", user=user, home=home_by_user.get(user) or None + ) + if item.get("name"): + self.flatpaks.append( # type: ignore[attr-defined] + self.prepare_flatpak_item(item) + ) + + def add_flatpak_snapshot(self, snap: Dict[str, Any]) -> None: + for remote in snap.get("remotes", []) or []: + item = self.normalise_flatpak_remote(remote) + if item.get("name") and item.get("url"): + self.flatpak_remotes.append( # type: ignore[attr-defined] + self.prepare_flatpak_remote(item) + ) + for fp in snap.get("system_flatpaks", []) or []: + item = self.normalise_flatpak_item(fp, method="system") + if item.get("name"): + self.flatpaks.append( # type: ignore[attr-defined] + self.prepare_flatpak_item(item) + ) + self.add_snapshot_notes(snap) + + def add_snap_snapshot(self, snap: Dict[str, Any]) -> None: + for raw in snap.get("system_snaps", []) or []: + item = self.normalise_snap_item(raw) + if item.get("name"): + self.snaps.append( # type: ignore[attr-defined] + self.prepare_snap_item(item) + ) + self.add_snapshot_notes(snap) + + def firewall_runtime_snapshot_has_artifacts(self, snap: Mapping[str, Any]) -> bool: + return any( + str(snap.get(key) or "").strip() + for key, _dest, _mode in self.firewall_runtime_artifacts + ) + + def firewall_runtime_source_refs(self, snap: Mapping[str, Any]) -> Dict[str, str]: + return { + key: str(snap.get(key) or "").strip() + for key, _dest, _mode in self.firewall_runtime_artifacts + if str(snap.get(key) or "").strip() + } + + def firewall_runtime_dest_path(self, dest_name: str) -> str: + return f"{self.firewall_runtime_dir}/{dest_name}" + + def firewall_runtime_ipset_sets(self, snap: Mapping[str, Any]) -> List[str]: + return [ + str(x).strip() for x in (snap.get("ipset_sets") or []) if str(x).strip() + ] + + @staticmethod + def shell_quote(value: Any) -> str: + return shlex.quote(str(value or "")) + + def firewall_ipset_restore_cmd(self, path: str, sets: List[str]) -> str: + flush_parts = [f"ipset flush {self.shell_quote(name)} || true" for name in sets] + flush = "; ".join(flush_parts) + restore = f"ipset restore -exist < {self.shell_quote(path)}" + if flush: + return f"/bin/sh -c {self.shell_quote(flush + '; ' + restore)}" + return f"/bin/sh -c {self.shell_quote(restore)}" + + def firewall_runtime_commands(self, runtime: Mapping[str, Any]) -> Dict[str, Any]: + out: Dict[str, Any] = {} + ipset_path = str(runtime.get("ipset_save") or "") + if ipset_path: + sets = [str(x) for x in (runtime.get("ipset_sets") or []) if str(x)] + out["ipset_restore_cmd"] = self.firewall_ipset_restore_cmd(ipset_path, sets) + ipt4_path = str(runtime.get("iptables_v4_save") or "") + if ipt4_path: + out["iptables_v4_restore_cmd"] = ( + f"iptables-restore {self.shell_quote(ipt4_path)}" + ) + ipt6_path = str(runtime.get("iptables_v6_save") or "") + if ipt6_path: + out["iptables_v6_restore_cmd"] = ( + f"ip6tables-restore {self.shell_quote(ipt6_path)}" + ) + return out + + def _managed_owner_attrs(self, owner: Any) -> Dict[str, Any]: + return {self.managed_owner_attr: owner or "root"} + + def add_firewall_runtime_snapshot( + self, + snap: Dict[str, Any], + *, + bundle_dir: str, + artifact_role: str, + files_dir: Path, + copy_artifact: Callable[..., str | None], + source_uri: Callable[[str, str], str], + file_prefix: str | None = None, + dir_attrs: Mapping[str, Any] | None = None, + file_attrs: Mapping[str, Any] | None = None, + ) -> None: + """Add captured live firewall state using renderer-supplied file hooks.""" + + self.add_service_packages_from_snapshot(snap) + attrs: Dict[str, Any] = { + **self._managed_owner_attrs("root"), + "group": "root", + "mode": "0750", + "reason": "firewall_runtime", + } + if dir_attrs: + attrs.update(dir_attrs) + self.add_managed_dir(self.firewall_runtime_dir, **attrs) + + runtime: Dict[str, Any] = {} + for key, dest_name, mode in self.firewall_runtime_artifacts: + src_rel = str(snap.get(key) or "").strip() + if not src_rel: + continue + role_rel = copy_artifact( + bundle_dir, + artifact_role, + src_rel, + files_dir, + dst_prefix=file_prefix, + ) + if not role_rel: + self.notes.append( + f"Firewall runtime artifact {src_rel!r} was referenced but not found." + ) + continue + file_data: Dict[str, Any] = { + **self._managed_owner_attrs("root"), + "group": "root", + "mode": mode, + "source": source_uri(self.module_name, role_rel), + "reason": "firewall_runtime", + } + if file_attrs: + file_data.update(file_attrs) + dest = self.firewall_runtime_dest_path(dest_name) + self.add_managed_file(dest, **file_data) + runtime[key] = dest + + ipset_sets = self.firewall_runtime_ipset_sets(snap) + if ipset_sets: + runtime["ipset_sets"] = ipset_sets + if runtime: + runtime.update(self.firewall_runtime_commands(runtime)) + self.firewall_runtime.update(runtime) + self.add_snapshot_notes(snap) + def remove_directory_resource_conflicts(self) -> None: for path in set(self.files) | set(self.links): self.dirs.pop(path, None) @@ -204,6 +641,55 @@ def role_order_key(role: str) -> tuple[int, str]: return (priority.get(role, 50), role) +def markdown_list(items: Iterable[Any], *, empty: str = "None.") -> str: + values = [str(item) for item in items if str(item)] + return "\n".join(f"- {item}" for item in values) or f"- {empty}" + + +def path_reason_lines( + items: Iterable[Mapping[str, Any]], *, source_key: str = "path" +) -> List[str]: + lines: List[str] = [] + for item in items or []: + path = str(item.get(source_key) or "") + if not path: + continue + reason = str(item.get("reason") or "") + lines.append(f"{path} ({reason})" if reason else path) + return lines + + +def iter_role_snapshots(roles: Mapping[str, Any]) -> Iterator[Mapping[str, Any]]: + for value in roles.values(): + if isinstance(value, list): + for item in value: + if isinstance(item, Mapping): + yield item + elif isinstance(value, Mapping): + yield value + + +def snapshot_note_lines(roles: Mapping[str, Any]) -> List[str]: + notes: List[str] = [] + for snap in iter_role_snapshots(roles): + source = str( + snap.get("role_name") or snap.get("unit") or snap.get("package") or "role" + ) + notes.extend(f"`{source}`: {note}" for note in snap.get("notes", []) or []) + return notes + + +def snapshot_excluded_lines(roles: Mapping[str, Any]) -> List[str]: + excluded: List[str] = [] + for snap in iter_role_snapshots(roles): + source = str( + snap.get("role_name") or snap.get("unit") or snap.get("package") or "role" + ) + for line in path_reason_lines(snap.get("excluded", []) or []): + excluded.append(f"`{source}`: {line}") + return excluded + + def _drop_duplicate_set_items( module: CMModule, values: Set[str], diff --git a/enroll/puppet.py b/enroll/puppet.py index 0e8a08a..286ce47 100644 --- a/enroll/puppet.py +++ b/enroll/puppet.py @@ -12,10 +12,9 @@ import yaml from .cm import ( CMModule, - package_section_label, resolve_catalog_conflicts, role_order_key, - section_label_for_packages, + markdown_list, ) from .state import inventory_packages_from_state, roles_from_state @@ -32,108 +31,43 @@ class PuppetRole(CMModule): self.flatpak_remotes: List[Dict[str, Any]] = [] self.flatpaks: List[Dict[str, Any]] = [] self.snaps: List[Dict[str, Any]] = [] - self.firewall_runtime: Dict[str, Any] = {} def has_resources(self) -> bool: - return ( - super().has_resources() - or bool(self.container_images) - or bool(self.flatpak_remotes) - or bool(self.flatpaks) - or bool(self.snaps) - or bool(self.firewall_runtime) + return self.has_resources_or_attrs( + "container_images", "flatpak_remotes", "flatpaks", "snaps" ) - def add_package_snapshot(self, snap: Dict[str, Any]) -> None: - pkg = str(snap.get("package") or "").strip() - if pkg: - self.packages.add(pkg) - def add_service_snapshot(self, snap: Dict[str, Any]) -> None: - for pkg in snap.get("packages", []) or []: - pkg_s = str(pkg or "").strip() - if pkg_s: - self.packages.add(pkg_s) - unit = str(snap.get("unit") or "").strip() - if unit: - unit_file_state = str(snap.get("unit_file_state") or "") - self.services[unit] = { - "name": unit, - "ensure": ( - "running" if snap.get("active_state") == "active" else "stopped" - ), - "enable": unit_file_state in ("enabled", "enabled-runtime"), - } + self.add_service_snapshot_state( + snap, state_key="ensure", running="running", stopped="stopped" + ) def add_users_snapshot(self, snap: Dict[str, Any]) -> None: - for u in snap.get("users", []) or []: - if not isinstance(u, dict): - continue - name = str(u.get("name") or "").strip() - if not name: - continue - primary_group = str(u.get("primary_group") or name).strip() - if primary_group: - self.groups.add(primary_group) - supplementary = sorted( - { - str(g).strip() - for g in (u.get("supplementary_groups") or []) - if str(g).strip() - } - ) - self.groups.update(supplementary) + records = self.user_records_from_snapshot(snap) + self.groups.update(self.user_group_names_from_records(records)) + for record in records: + name = str(record.get("name") or "") self.users[name] = { "name": name, - "uid": u.get("uid"), - "gid": u.get("gid"), - "primary_group": primary_group or None, - "home": u.get("home") or f"/home/{name}", - "shell": u.get("shell"), - "gecos": u.get("gecos"), - "supplementary_groups": supplementary, + "uid": record.get("uid"), + "gid": record.get("gid"), + "primary_group": record.get("primary_group") or None, + "home": record.get("home"), + "shell": record.get("shell"), + "gecos": record.get("gecos"), + "supplementary_groups": record.get("supplementary_groups") or [], } - home_by_user = { - str(u.get("name")): str(u.get("home") or "") - for u in (snap.get("users", []) or []) - if isinstance(u, dict) and u.get("name") - } - for remote in snap.get("user_flatpak_remotes", []) or []: - item = _normalise_flatpak_remote(remote) - user = str(item.get("user") or "").strip() - if user and not item.get("home"): - item["home"] = home_by_user.get(user) or f"/home/{user}" - if item.get("method") == "user" and item.get("name") and item.get("url"): - self.flatpak_remotes.append(_prepare_flatpak_remote(item)) - for uname, flatpaks in (snap.get("user_flatpaks", {}) or {}).items(): - user = str(uname) - for fp in flatpaks or []: - item = _normalise_flatpak_item( - fp, method="user", user=user, home=home_by_user.get(user) or None - ) - if item.get("name"): - self.flatpaks.append(_prepare_flatpak_item(item)) + self.add_user_flatpaks_snapshot(snap) - def add_flatpak_snapshot(self, snap: Dict[str, Any]) -> None: - for remote in snap.get("remotes", []) or []: - item = _normalise_flatpak_remote(remote) - if item.get("name") and item.get("url"): - self.flatpak_remotes.append(_prepare_flatpak_remote(item)) - for fp in snap.get("system_flatpaks", []) or []: - item = _normalise_flatpak_item(fp, method="system") - if item.get("name"): - self.flatpaks.append(_prepare_flatpak_item(item)) - for note in snap.get("notes", []) or []: - self.notes.append(str(note)) + def prepare_flatpak_remote(self, item: Dict[str, Any]) -> Dict[str, Any]: + return _prepare_flatpak_remote(item) - def add_snap_snapshot(self, snap: Dict[str, Any]) -> None: - for raw in snap.get("system_snaps", []) or []: - item = _normalise_snap_item(raw) - if item.get("name"): - self.snaps.append(_prepare_snap_item(item)) - for note in snap.get("notes", []) or []: - self.notes.append(str(note)) + def prepare_flatpak_item(self, item: Dict[str, Any]) -> Dict[str, Any]: + return _prepare_flatpak_item(item) + + def prepare_snap_item(self, item: Dict[str, Any]) -> Dict[str, Any]: + return _prepare_snap_item(item) def add_firewall_runtime_snapshot( self, @@ -144,58 +78,16 @@ class PuppetRole(CMModule): module_files_dir: Path, file_prefix: Optional[str] = None, ) -> None: - self.packages.update( - str(p).strip() for p in (snap.get("packages") or []) if str(p).strip() + super().add_firewall_runtime_snapshot( + snap, + bundle_dir=bundle_dir, + artifact_role=artifact_role, + files_dir=module_files_dir, + copy_artifact=_copy_artifact, + source_uri=_source_uri, + file_prefix=file_prefix, + dir_attrs={"require": "File['/etc/enroll']"}, ) - self.add_managed_dir( - "/etc/enroll/firewall", - owner="root", - group="root", - mode="0750", - require="File['/etc/enroll']", - reason="firewall_runtime", - ) - runtime: Dict[str, Any] = {} - for key, dest_name, mode in ( - ("ipset_save", "ipset.save", "0600"), - ("iptables_v4_save", "iptables.v4", "0600"), - ("iptables_v6_save", "iptables.v6", "0600"), - ): - src_rel = str(snap.get(key) or "").strip() - if not src_rel: - continue - role_rel = _copy_artifact( - bundle_dir, - artifact_role, - src_rel, - module_files_dir, - dst_prefix=file_prefix, - ) - if not role_rel: - self.notes.append( - f"Firewall runtime artifact {src_rel!r} was referenced but not found." - ) - continue - dest = f"/etc/enroll/firewall/{dest_name}" - self.add_managed_file( - dest, - owner="root", - group="root", - mode=mode, - source=_source_uri(self.module_name, role_rel), - reason="firewall_runtime", - ) - runtime[key] = dest - ipset_sets = [ - str(x).strip() for x in (snap.get("ipset_sets") or []) if str(x).strip() - ] - if ipset_sets: - runtime["ipset_sets"] = ipset_sets - if runtime: - runtime.update(_firewall_runtime_commands(runtime)) - self.firewall_runtime.update(runtime) - for note in snap.get("notes", []) or []: - self.notes.append(str(note)) def add_container_images_snapshot(self, snap: Dict[str, Any]) -> None: for raw in snap.get("images", []) or []: @@ -374,70 +266,6 @@ def _container_tag_cmd(engine: str, pull_ref: str, tag_ref: str) -> str: return f"{engine} tag {_shell_quote(pull_ref)} {_shell_quote(tag_ref)}" -def _normalise_flatpak_item( - item: Dict[str, Any], - *, - method: str, - user: Optional[str] = None, - home: Optional[str] = None, -) -> Dict[str, Any]: - out = dict(item) - out["method"] = str(out.get("method") or method or "system").strip() or "system" - if user and not out.get("user"): - out["user"] = user - if home and not out.get("home"): - out["home"] = home - ref = str(out.get("ref") or "").strip() - if ref and not out.get("name"): - out["name"] = ref.rsplit("/", 1)[-1] - name = str(out.get("name") or out.get("app_id") or "").strip() - if name: - out["name"] = name - remote = str(out.get("remote") or "").strip() - if remote: - out["remote"] = remote - branch = str(out.get("branch") or out.get("origin") or "").strip() - if branch: - out["branch"] = branch - if ref: - out["ref"] = ref - return out - - -def _normalise_flatpak_remote(item: Dict[str, Any]) -> Dict[str, Any]: - out = dict(item) - name = str(out.get("name") or out.get("remote") or "").strip() - url = str(out.get("url") or out.get("from_url") or "").strip() - method = str(out.get("method") or out.get("scope") or "system").strip() or "system" - if name: - out["name"] = name - if url: - out["url"] = url - out["method"] = "user" if method == "user" else "system" - return out - - -def _normalise_snap_item(item: Dict[str, Any]) -> Dict[str, Any]: - out = dict(item) - name = str(out.get("name") or "").strip() - if name: - out["name"] = name - channel = str(out.get("tracking") or out.get("channel") or "").strip() - if channel: - out["channel"] = channel - notes = [str(note).lower() for note in (out.get("notes") or [])] - confinement = str(out.get("confinement") or "").strip().lower() - out["classic"] = confinement == "classic" or any( - "classic" in note for note in notes - ) - out["devmode"] = any("devmode" in note or "dev mode" in note for note in notes) - out["dangerous"] = any("dangerous" in note for note in notes) - revision = str(out.get("revision") or "").strip() - if revision and not channel: - out["revision"] = revision - return out - - def _flatpak_scope(item: Dict[str, Any]) -> str: return "--user" if str(item.get("method") or "system") == "user" else "--system" @@ -596,30 +424,6 @@ def _state_title(prefix: str, value: Any) -> str: return f"enroll-{prefix}-{safe}" -def _firewall_ipset_restore_cmd(path: str, sets: List[str]) -> str: - flush_parts = [f"ipset flush {_shell_quote(name)} || true" for name in sets] - flush = "; ".join(flush_parts) - restore = f"ipset restore -exist < {_shell_quote(path)}" - if flush: - return f"/bin/sh -c {_shell_quote(flush + '; ' + restore)}" - return f"/bin/sh -c {_shell_quote(restore)}" - - -def _firewall_runtime_commands(runtime: Dict[str, Any]) -> Dict[str, Any]: - out: Dict[str, Any] = {} - ipset_path = str(runtime.get("ipset_save") or "") - if ipset_path: - sets = [str(x) for x in (runtime.get("ipset_sets") or []) if str(x)] - out["ipset_restore_cmd"] = _firewall_ipset_restore_cmd(ipset_path, sets) - ipt4_path = str(runtime.get("iptables_v4_save") or "") - if ipt4_path: - out["iptables_v4_restore_cmd"] = f"iptables-restore {_shell_quote(ipt4_path)}" - ipt6_path = str(runtime.get("iptables_v6_save") or "") - if ipt6_path: - out["iptables_v6_restore_cmd"] = f"ip6tables-restore {_shell_quote(ipt6_path)}" - return out - - def _render_firewall_runtime_execs( lines: List[str], runtime: Dict[str, Any], *, indent: str = " " ) -> None: @@ -757,57 +561,29 @@ def _collect_puppet_roles( file_prefix=node_file_prefix, ) - for svc in roles.get("services", []) or []: - if not isinstance(svc, dict): - continue - original_role_name = _puppet_name( - str(svc.get("role_name") or svc.get("unit") or "service"), - fallback="service", + for entry in CMModule.package_service_entries( + roles, inventory_packages, use_common_roles=use_common_modules + ): + snap = entry.get("snapshot") or {} + kind = str(entry.get("kind") or "package") + fallback = "service" if kind == "service" else "package" + source_label = str( + snap.get("role_name") or snap.get("unit") or snap.get("package") or fallback + ) + original_role_name = _puppet_name(source_label, fallback=fallback) + role_name = _puppet_name( + str(entry.get("role_label") or source_label), + fallback="package_group" if use_common_modules else fallback, ) - if use_common_modules: - role_name = _puppet_name( - section_label_for_packages( - [ - str(p).strip() - for p in (svc.get("packages") or []) - if str(p).strip() - ], - inventory_packages, - ), - fallback="package_group", - ) - else: - role_name = original_role_name prole = ensure_role(role_name) - prole.add_service_snapshot(svc) - prole.add_managed_content( - svc, - bundle_dir=bundle_dir, - artifact_role=str(svc.get("role_name") or original_role_name), - module_files_dir=modules_dir / prole.module_name / "files", - file_prefix=node_file_prefix, - ) - - for pkg in roles.get("packages", []) or []: - if not isinstance(pkg, dict): - continue - original_role_name = _puppet_name( - str(pkg.get("role_name") or pkg.get("package") or "package"), - fallback="package", - ) - if use_common_modules: - role_name = _puppet_name( - package_section_label(pkg, inventory_packages), - fallback="package_group", - ) + if kind == "service": + prole.add_service_snapshot(snap) else: - role_name = original_role_name - prole = ensure_role(role_name) - prole.add_package_snapshot(pkg) + prole.add_package_snapshot(snap) prole.add_managed_content( - pkg, + snap, bundle_dir=bundle_dir, - artifact_role=str(pkg.get("role_name") or original_role_name), + artifact_role=str(snap.get("role_name") or original_role_name), module_files_dir=modules_dir / prole.module_name / "files", file_prefix=node_file_prefix, ) @@ -1577,19 +1353,13 @@ def _render_readme( host = state.get("host", {}) if isinstance(state.get("host"), dict) else {} hostname = host.get("hostname") or "unknown" hiera_mode = bool(fqdn) - role_lines = ( - "\n".join( - f"- `{r.module_name}` from Enroll role `{r.role_name}`" - for r in puppet_roles - ) - or "- None." + role_lines = markdown_list( + f"`{r.module_name}` from Enroll role `{r.role_name}`" for r in puppet_roles + ) + node_lines = markdown_list(f"`{n}`" for n in (node_names or [])) + notes_text = markdown_list( + f"`{r.module_name}`: {note}" for r in puppet_roles for note in r.notes ) - node_lines = "\n".join(f"- `{n}`" for n in (node_names or [])) or "- None." - notes: List[str] = [] - for r in puppet_roles: - for note in r.notes: - notes.append(f"`{r.module_name}`: {note}") - notes_text = "\n".join(f"- {n}" for n in notes) or "- None." if hiera_mode: layout = f"""- `manifests/site.pp` declares node blocks and includes classes listed in Hiera key `enroll::classes`. - `hiera.yaml` configures per-node lookup from `data/nodes/%{{trusted.certname}}.yaml` with a fallback to `data/common.yaml`. @@ -1599,11 +1369,10 @@ def _render_readme( apply = f"""Run from this generated output directory, passing the node certname so Hiera selects the right node data: ```bash -sudo puppet apply --modulepath ./modules --hiera_config ./hiera.yaml --certname {fqdn} manifests/site.pp --noop +sudo puppet apply --modulepath ./modules --hiera_config ./hiera.yaml --certname {fqdn} manifests/site.pp --noop --test ``` -If you depend on other pre-installed Puppet modules (such as for supporting Docker image version enforcement, which Enroll may -have harvested information on), you may need to pass in other modulepaths as well, e.g: +If you depend on other pre-installed Puppet modules, you may need to pass in other modulepaths as well, e.g: ```bash sudo puppet apply --modulepath ./modules:/etc/puppet/code/modules --hiera_config ./hiera.yaml --certname {fqdn} manifests/site.pp --noop @@ -1618,11 +1387,10 @@ For Puppet agent/control-repo use, place this output where `hiera.yaml`, `data/` apply = """Run from this generated output directory so Puppet can find `./modules`, or pass an absolute module path: ```bash -sudo puppet apply --modulepath ./modules manifests/site.pp --noop +sudo puppet apply --modulepath ./modules manifests/site.pp --noop --test ``` -If you depend on other pre-installed Puppet modules (such as for supporting Docker image version enforcement, which Enroll may -have harvested information on), you may need to pass in other modulepaths as well, e.g: +If you depend on other pre-installed Puppet modules, you may need to pass in other modulepaths as well, e.g: ```bash sudo puppet apply --modulepath ./modules:/etc/puppet/code/modules manifests/site.pp --noop @@ -1661,7 +1429,7 @@ This Puppet target reuses the existing harvest state without changing harvesting ## Current limitations -- JinjaTurtle templating is currently Ansible-oriented and is not applied to Puppet output. +- JinjaTurtle templating is currently Ansible/Salt-oriented and is not applied to Puppet output - there are no erb templates, just raw files. - Review generated resources before applying them broadly across unlike hosts. ## Notes diff --git a/enroll/salt.py b/enroll/salt.py index 3e860b8..611e69b 100644 --- a/enroll/salt.py +++ b/enroll/salt.py @@ -12,10 +12,9 @@ import yaml from .cm import ( CMModule, - package_section_label, resolve_catalog_conflicts, role_order_key, - section_label_for_packages, + markdown_list, ) from .jinjaturtle import jinjify_artifact, resolve_jinjaturtle_mode from .state import inventory_packages_from_state, roles_from_state @@ -25,6 +24,8 @@ from .yamlutil import yaml_dump_mapping, yaml_load_mapping_file class SaltRole(CMModule): """Salt-specific view of a renderer-neutral CMModule.""" + managed_owner_attr = "user" + def __init__(self, role_name: str) -> None: super().__init__( role_name=role_name, @@ -34,110 +35,47 @@ class SaltRole(CMModule): self.flatpak_remotes: List[Dict[str, Any]] = [] self.flatpaks: List[Dict[str, Any]] = [] self.snaps: List[Dict[str, Any]] = [] - self.firewall_runtime: Dict[str, Any] = {} def has_resources(self) -> bool: - return ( - super().has_resources() - or bool(self.container_images) - or bool(self.flatpak_remotes) - or bool(self.flatpaks) - or bool(self.snaps) - or bool(self.firewall_runtime) + return self.has_resources_or_attrs( + "container_images", "flatpak_remotes", "flatpaks", "snaps" ) @property def sls_name(self) -> str: return f"roles.{self.module_name}" - def add_package_snapshot(self, snap: Dict[str, Any]) -> None: - pkg = str(snap.get("package") or "").strip() - if pkg: - self.packages.add(pkg) - def add_service_snapshot(self, snap: Dict[str, Any]) -> None: - for pkg in snap.get("packages", []) or []: - pkg_s = str(pkg or "").strip() - if pkg_s: - self.packages.add(pkg_s) - unit = str(snap.get("unit") or "").strip() - if unit: - unit_file_state = str(snap.get("unit_file_state") or "") - self.services[unit] = { - "name": unit, - "state": "running" if snap.get("active_state") == "active" else "dead", - "enable": unit_file_state in ("enabled", "enabled-runtime"), - } + self.add_service_snapshot_state( + snap, state_key="state", running="running", stopped="dead" + ) def add_users_snapshot(self, snap: Dict[str, Any]) -> None: - for u in snap.get("users", []) or []: - if not isinstance(u, dict): - continue - name = str(u.get("name") or "").strip() - if not name: - continue - primary_group = str(u.get("primary_group") or name).strip() - if primary_group: - self.groups.add(primary_group) - supplementary = sorted( - { - str(g).strip() - for g in (u.get("supplementary_groups") or []) - if str(g).strip() - } - ) - self.groups.update(supplementary) + records = self.user_records_from_snapshot(snap) + self.groups.update(self.user_group_names_from_records(records)) + for record in records: + name = str(record.get("name") or "") user_data: Dict[str, Any] = { "name": name, - "uid": u.get("uid"), - "gid": primary_group or u.get("gid"), - "home": u.get("home") or f"/home/{name}", - "shell": u.get("shell"), - "groups": supplementary, + "uid": record.get("uid"), + "gid": record.get("primary_group") or record.get("gid"), + "home": record.get("home"), + "shell": record.get("shell"), + "groups": record.get("supplementary_groups") or [], } - user_data.update(_gecos_attrs(u.get("gecos"))) + user_data.update(_gecos_attrs(record.get("gecos"))) self.users[name] = user_data - home_by_user = { - str(u.get("name")): str(u.get("home") or "") - for u in (snap.get("users", []) or []) - if isinstance(u, dict) and u.get("name") - } - for remote in snap.get("user_flatpak_remotes", []) or []: - item = _normalise_flatpak_remote(remote) - user = str(item.get("user") or "").strip() - if user and not item.get("home"): - item["home"] = home_by_user.get(user) or f"/home/{user}" - if item.get("method") == "user" and item.get("name") and item.get("url"): - self.flatpak_remotes.append(_prepare_flatpak_remote(item)) - for uname, flatpaks in (snap.get("user_flatpaks", {}) or {}).items(): - user = str(uname) - for fp in flatpaks or []: - item = _normalise_flatpak_item( - fp, method="user", user=user, home=home_by_user.get(user) or None - ) - if item.get("name"): - self.flatpaks.append(_prepare_flatpak_item(item)) + self.add_user_flatpaks_snapshot(snap) - def add_flatpak_snapshot(self, snap: Dict[str, Any]) -> None: - for remote in snap.get("remotes", []) or []: - item = _normalise_flatpak_remote(remote) - if item.get("name") and item.get("url"): - self.flatpak_remotes.append(_prepare_flatpak_remote(item)) - for fp in snap.get("system_flatpaks", []) or []: - item = _normalise_flatpak_item(fp, method="system") - if item.get("name"): - self.flatpaks.append(_prepare_flatpak_item(item)) - for note in snap.get("notes", []) or []: - self.notes.append(str(note)) + def prepare_flatpak_remote(self, item: Dict[str, Any]) -> Dict[str, Any]: + return _prepare_flatpak_remote(item) - def add_snap_snapshot(self, snap: Dict[str, Any]) -> None: - for raw in snap.get("system_snaps", []) or []: - item = _normalise_snap_item(raw) - if item.get("name"): - self.snaps.append(_prepare_snap_item(item)) - for note in snap.get("notes", []) or []: - self.notes.append(str(note)) + def prepare_flatpak_item(self, item: Dict[str, Any]) -> Dict[str, Any]: + return _prepare_flatpak_item(item) + + def prepare_snap_item(self, item: Dict[str, Any]) -> Dict[str, Any]: + return _prepare_snap_item(item) def add_firewall_runtime_snapshot( self, @@ -148,58 +86,16 @@ class SaltRole(CMModule): role_files_dir: Path, file_prefix: Optional[str] = None, ) -> None: - self.packages.update( - str(p).strip() for p in (snap.get("packages") or []) if str(p).strip() + super().add_firewall_runtime_snapshot( + snap, + bundle_dir=bundle_dir, + artifact_role=artifact_role, + files_dir=role_files_dir, + copy_artifact=_copy_artifact, + source_uri=_source_uri, + file_prefix=file_prefix, + dir_attrs={"require": [{"file": "/etc/enroll"}]}, ) - self.add_managed_dir( - "/etc/enroll/firewall", - user="root", - group="root", - mode="0750", - require=[{"file": "/etc/enroll"}], - reason="firewall_runtime", - ) - runtime: Dict[str, Any] = {} - for key, dest_name, mode in ( - ("ipset_save", "ipset.save", "0600"), - ("iptables_v4_save", "iptables.v4", "0600"), - ("iptables_v6_save", "iptables.v6", "0600"), - ): - src_rel = str(snap.get(key) or "").strip() - if not src_rel: - continue - role_rel = _copy_artifact( - bundle_dir, - artifact_role, - src_rel, - role_files_dir, - dst_prefix=file_prefix, - ) - if not role_rel: - self.notes.append( - f"Firewall runtime artifact {src_rel!r} was referenced but not found." - ) - continue - dest = f"/etc/enroll/firewall/{dest_name}" - self.add_managed_file( - dest, - user="root", - group="root", - mode=mode, - source=_source_uri(self.module_name, role_rel), - reason="firewall_runtime", - ) - runtime[key] = dest - ipset_sets = [ - str(x).strip() for x in (snap.get("ipset_sets") or []) if str(x).strip() - ] - if ipset_sets: - runtime["ipset_sets"] = ipset_sets - if runtime: - runtime.update(_firewall_runtime_commands(runtime)) - self.firewall_runtime.update(runtime) - for note in snap.get("notes", []) or []: - self.notes.append(str(note)) def add_container_images_snapshot(self, snap: Dict[str, Any]) -> None: for raw in snap.get("images", []) or []: @@ -413,70 +309,6 @@ def _container_tag_cmd(engine: str, pull_ref: str, tag_ref: str) -> str: return f"{engine} tag {_shell_quote(pull_ref)} {_shell_quote(tag_ref)}" -def _normalise_flatpak_item( - item: Dict[str, Any], - *, - method: str, - user: Optional[str] = None, - home: Optional[str] = None, -) -> Dict[str, Any]: - out = dict(item) - out["method"] = str(out.get("method") or method or "system").strip() or "system" - if user and not out.get("user"): - out["user"] = user - if home and not out.get("home"): - out["home"] = home - ref = str(out.get("ref") or "").strip() - if ref and not out.get("name"): - out["name"] = ref.rsplit("/", 1)[-1] - name = str(out.get("name") or out.get("app_id") or "").strip() - if name: - out["name"] = name - remote = str(out.get("remote") or "").strip() - if remote: - out["remote"] = remote - branch = str(out.get("branch") or out.get("origin") or "").strip() - if branch: - out["branch"] = branch - if ref: - out["ref"] = ref - return out - - -def _normalise_flatpak_remote(item: Dict[str, Any]) -> Dict[str, Any]: - out = dict(item) - name = str(out.get("name") or out.get("remote") or "").strip() - url = str(out.get("url") or out.get("from_url") or "").strip() - method = str(out.get("method") or out.get("scope") or "system").strip() or "system" - if name: - out["name"] = name - if url: - out["url"] = url - out["method"] = "user" if method == "user" else "system" - return out - - -def _normalise_snap_item(item: Dict[str, Any]) -> Dict[str, Any]: - out = dict(item) - name = str(out.get("name") or "").strip() - if name: - out["name"] = name - channel = str(out.get("tracking") or out.get("channel") or "").strip() - if channel: - out["channel"] = channel - notes = [str(note).lower() for note in (out.get("notes") or [])] - confinement = str(out.get("confinement") or "").strip().lower() - out["classic"] = confinement == "classic" or any( - "classic" in note for note in notes - ) - out["devmode"] = any("devmode" in note or "dev mode" in note for note in notes) - out["dangerous"] = any("dangerous" in note for note in notes) - revision = str(out.get("revision") or "").strip() - if revision and not channel: - out["revision"] = revision - return out - - def _flatpak_scope(item: Dict[str, Any]) -> str: return "--user" if str(item.get("method") or "system") == "user" else "--system" @@ -583,30 +415,6 @@ def _prepare_snap_item(item: Dict[str, Any]) -> Dict[str, Any]: return out -def _firewall_ipset_restore_cmd(path: str, sets: List[str]) -> str: - flush_parts = [f"ipset flush {_shell_quote(name)} || true" for name in sets] - flush = "; ".join(flush_parts) - restore = f"ipset restore -exist < {_shell_quote(path)}" - if flush: - return f"/bin/sh -c {_shell_quote(flush + '; ' + restore)}" - return f"/bin/sh -c {_shell_quote(restore)}" - - -def _firewall_runtime_commands(runtime: Dict[str, Any]) -> Dict[str, Any]: - out: Dict[str, Any] = {} - ipset_path = str(runtime.get("ipset_save") or "") - if ipset_path: - sets = [str(x) for x in (runtime.get("ipset_sets") or []) if str(x)] - out["ipset_restore_cmd"] = _firewall_ipset_restore_cmd(ipset_path, sets) - ipt4_path = str(runtime.get("iptables_v4_save") or "") - if ipt4_path: - out["iptables_v4_restore_cmd"] = f"iptables-restore {_shell_quote(ipt4_path)}" - ipt6_path = str(runtime.get("iptables_v6_save") or "") - if ipt6_path: - out["iptables_v6_restore_cmd"] = f"ip6tables-restore {_shell_quote(ipt6_path)}" - return out - - def _append_firewall_runtime_states(lines: List[str], runtime: Dict[str, Any]) -> None: specs = [ ( @@ -807,60 +615,29 @@ def _collect_salt_roles( overwrite_templates=not bool(fqdn), ) - for svc in roles.get("services", []) or []: - if not isinstance(svc, dict): - continue - original_role_name = _salt_name( - str(svc.get("role_name") or svc.get("unit") or "service"), - fallback="service", + for entry in CMModule.package_service_entries( + roles, inventory_packages, use_common_roles=use_common_roles + ): + snap = entry.get("snapshot") or {} + kind = str(entry.get("kind") or "package") + fallback = "service" if kind == "service" else "package" + source_label = str( + snap.get("role_name") or snap.get("unit") or snap.get("package") or fallback + ) + original_role_name = _salt_name(source_label, fallback=fallback) + role_name = _salt_name( + str(entry.get("role_label") or source_label), + fallback="package_group" if use_common_roles else fallback, ) - if use_common_roles: - role_name = _salt_name( - section_label_for_packages( - [ - str(p).strip() - for p in (svc.get("packages") or []) - if str(p).strip() - ], - inventory_packages, - ), - fallback="package_group", - ) - else: - role_name = original_role_name srole = ensure_role(role_name) - srole.add_service_snapshot(svc) - srole.add_managed_content( - svc, - bundle_dir=bundle_dir, - artifact_role=str(svc.get("role_name") or original_role_name), - role_files_dir=states_dir / "roles" / srole.module_name / "files", - file_prefix=node_file_prefix, - jt_exe=jt_exe, - jt_enabled=jt_enabled, - overwrite_templates=not bool(fqdn), - ) - - for pkg in roles.get("packages", []) or []: - if not isinstance(pkg, dict): - continue - original_role_name = _salt_name( - str(pkg.get("role_name") or pkg.get("package") or "package"), - fallback="package", - ) - if use_common_roles: - role_name = _salt_name( - package_section_label(pkg, inventory_packages), - fallback="package_group", - ) + if kind == "service": + srole.add_service_snapshot(snap) else: - role_name = original_role_name - srole = ensure_role(role_name) - srole.add_package_snapshot(pkg) + srole.add_package_snapshot(snap) srole.add_managed_content( - pkg, + snap, bundle_dir=bundle_dir, - artifact_role=str(pkg.get("role_name") or original_role_name), + artifact_role=str(snap.get("role_name") or original_role_name), role_files_dir=states_dir / "roles" / srole.module_name / "files", file_prefix=node_file_prefix, jt_exe=jt_exe, @@ -1621,17 +1398,12 @@ def _render_readme( ) -> str: host = state.get("host", {}) if isinstance(state.get("host"), dict) else {} hostname = host.get("hostname") or "unknown" - role_lines = ( - "\n".join( - f"- `{r.sls_name}` from Enroll role `{r.role_name}`" for r in salt_roles - ) - or "- None." + role_lines = markdown_list( + f"`{r.sls_name}` from Enroll role `{r.role_name}`" for r in salt_roles + ) + notes_text = markdown_list( + f"`{r.sls_name}`: {note}" for r in salt_roles for note in r.notes ) - notes: List[str] = [] - for r in salt_roles: - for note in r.notes: - notes.append(f"`{r.sls_name}`: {note}") - notes_text = "\n".join(f"- {n}" for n in notes) or "- None." if fqdn: node_display = ( diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 9c10df7..4c00c1d 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -433,7 +433,8 @@ def test_manifest_groups_excluded_package_paths_into_common_roles(tmp_path: Path assert (out / "roles" / "net").exists() assert not (out / "roles" / "secret_agent").exists() - readme = (out / "roles" / "net" / "README.md").read_text(encoding="utf-8") + assert not (out / "roles" / "net" / "README.md").exists() + readme = (out / "README.md").read_text(encoding="utf-8") assert "/etc/secret-agent/key" in readme @@ -1461,24 +1462,32 @@ def test_copy2_replace_atomic(tmp_path: Path): def test_render_firewall_runtime_tasks_empty(): - result = ansible_tasks._render_firewall_runtime_tasks("firewall_runtime") + result = ansible_tasks._render_role_tasks( + ansible_tasks.AnsibleRole("firewall_runtime"), firewall_runtime=True + ) # Function always returns at least a basic playbook structure assert isinstance(result, str) assert len(result) > 0 def test_render_firewall_runtime_tasks_with_iptables(): - result = ansible_tasks._render_firewall_runtime_tasks("firewall_runtime") + result = ansible_tasks._render_role_tasks( + ansible_tasks.AnsibleRole("firewall_runtime"), firewall_runtime=True + ) assert len(result) >= 1 def test_render_firewall_runtime_tasks_with_ipset(): - result = ansible_tasks._render_firewall_runtime_tasks("firewall_runtime") + result = ansible_tasks._render_role_tasks( + ansible_tasks.AnsibleRole("firewall_runtime"), firewall_runtime=True + ) assert len(result) >= 1 def test_render_firewall_runtime_tasks_with_ipv6(): - result = ansible_tasks._render_firewall_runtime_tasks("firewall_runtime") + result = ansible_tasks._render_role_tasks( + ansible_tasks.AnsibleRole("firewall_runtime"), firewall_runtime=True + ) assert len(result) >= 1 @@ -1608,7 +1617,8 @@ def test_manifest_renders_flatpak_and_snap_details(tmp_path: Path): users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text( encoding="utf-8" ) - users_readme = (out / "roles" / "users" / "README.md").read_text(encoding="utf-8") + assert not (out / "roles" / "users" / "README.md").exists() + users_readme = (out / "README.md").read_text(encoding="utf-8") flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text( encoding="utf-8" )