Compare commits

...

1 commit
0.5.0 ... main

Author SHA1 Message Date
b25dd1e314
* Add support for capturing ipset and iptables configuration files
All checks were successful
CI / test (push) Successful in 8m23s
Lint / test (push) Successful in 33s
* Add support for generating ipset and iptables configuration files from runtime, if the former weren't present (`firewall_runtime` role)
 * Dependency updates
2026-05-14 15:16:36 +10:00
13 changed files with 856 additions and 11 deletions

View file

@ -1,3 +1,14 @@
# 0.6.0
* Add support for capturing ipset and iptables configuration files
* Add support for generating ipset and iptables configuration files from runtime, if the former weren't present (`firewall_runtime` role)
* Dependency updates
# 0.5.0
* Add support for templating `sshd_config`, if a compatible version of JinjaTurtle is also present.
* Dependency updates
# 0.4.4 # 0.4.4
* Update cryptography dependency * Update cryptography dependency

View file

@ -13,6 +13,7 @@
- Defensively excludes likely secrets (path denylist + content sniff + size caps). - Defensively excludes likely secrets (path denylist + content sniff + size caps).
- Captures non-system users and their SSH public keys and any .bashrc or .bash_aliases or .profile files that deviate from the skel defaults. - Captures non-system users and their SSH public keys and any .bashrc or .bash_aliases or .profile files that deviate from the skel defaults.
- Captures miscellaneous `/etc` files it can't attribute to a package and installs them in an `etc_custom` role. - Captures miscellaneous `/etc` files it can't attribute to a package and installs them in an `etc_custom` role.
- Captures live ipset and iptables runtime state into a fallback `firewall_runtime` role, when active ipsets/iptables rules are present *and* no corresponding persistent ipset/iptables *files* were found.
- Captures symlinks in common applications that rely on them, e.g apache2/nginx 'sites-enabled' - Captures symlinks in common applications that rely on them, e.g apache2/nginx 'sites-enabled'
- Ditto for /usr/local/bin (for non-binary files) and /usr/local/etc - Ditto for /usr/local/bin (for non-binary files) and /usr/local/etc
- Avoids trying to start systemd services that were detected as inactive during harvest. - Avoids trying to start systemd services that were detected as inactive during harvest.
@ -70,6 +71,8 @@ Harvest state about a host and write a harvest bundle.
- Changed-from-default config (plus related custom/unowned files under service dirs) - Changed-from-default config (plus related custom/unowned files under service dirs)
- Non-system users + SSH public keys - Non-system users + SSH public keys
- Misc `/etc` that can't be attributed to a package (`etc_custom` role) - Misc `/etc` that can't be attributed to a package (`etc_custom` role)
- Static firewall config files such as nftables, UFW, firewalld, `/etc/iptables/rules.v4`, `/etc/iptables/rules.v6`, and `/etc/ipset*`
- Live kernel ipset/iptables state via `ipset save`, `iptables-save`, and `ip6tables-save` as a fallback, but only when the corresponding persistent config was not found (`firewall_runtime` role at manifest time)
- Optional user-specified extra files/dirs via `--include-path` (emitted as an `extra_paths` role at manifest time) - Optional user-specified extra files/dirs via `--include-path` (emitted as an `extra_paths` role at manifest time)
**Common flags** **Common flags**
@ -531,6 +534,7 @@ Roles collected
- packages: 232 package snapshot(s), 41 file(s), 0 excluded - packages: 232 package snapshot(s), 41 file(s), 0 excluded
- apt_config: 26 file(s), 7 dir(s), 10 excluded - apt_config: 26 file(s), 7 dir(s), 10 excluded
- dnf_config: 0 file(s), 0 dir(s), 0 excluded - dnf_config: 0 file(s), 0 dir(s), 0 excluded
- firewall_runtime: 2 snapshot(s), 1 ipset(s)
- etc_custom: 70 file(s), 20 dir(s), 0 excluded - etc_custom: 70 file(s), 20 dir(s), 0 excluded
- usr_local_custom: 35 file(s), 1 dir(s), 0 excluded - usr_local_custom: 35 file(s), 1 dir(s), 0 excluded
- extra_paths: 0 file(s), 0 dir(s), 0 excluded - extra_paths: 0 file(s), 0 dir(s), 0 excluded

7
debian/changelog vendored
View file

@ -1,3 +1,10 @@
enroll (0.6.0) unstable; urgency=medium
* Add support for capturing ipset and iptables configuration files
* Add support for generating ipset and iptables configuration files from runtime, if the former weren't present ('firewall_runtime' role)
-- Miguel Jacq <mig@mig5.net> Thu, 14 May 2026 15:00 +1000
enroll (0.5.0) unstable; urgency=medium enroll (0.5.0) unstable; urgency=medium
* Add ssh config support where JinjaTurtle is used * Add ssh config support where JinjaTurtle is used

View file

@ -72,7 +72,7 @@ _MANAGED_FILE_REASONS: Dict[str, ReasonInfo] = {
), ),
"system_firewall": ReasonInfo( "system_firewall": ReasonInfo(
"Firewall configuration", "Firewall configuration",
"Firewall rules/configuration (ufw, nftables, iptables, etc.).", "Firewall rules/configuration (ufw, nftables, iptables, ipset, etc.).",
), ),
"system_sysctl": ReasonInfo( "system_sysctl": ReasonInfo(
"sysctl configuration", "sysctl configuration",
@ -211,6 +211,10 @@ _OBSERVED_VIA: Dict[str, ReasonInfo] = {
"Referenced by package role", "Referenced by package role",
"Package was referenced by an enroll packages snapshot/role.", "Package was referenced by an enroll packages snapshot/role.",
), ),
"firewall_runtime": ReasonInfo(
"Referenced by firewall runtime role",
"Package was referenced by captured live ipset/iptables runtime state.",
),
} }
@ -359,6 +363,22 @@ def explain_state(
} }
) )
# Runtime firewall snapshot
firewall_obj = roles.get("firewall_runtime") or {}
if isinstance(firewall_obj, dict) and firewall_obj:
captures = [
key
for key in ("ipset_save", "iptables_v4_save", "iptables_v6_save")
if firewall_obj.get(key)
]
role_summaries.append(
{
"role": "firewall_runtime",
"summary": f"{len(captures)} snapshot(s), {len(firewall_obj.get('ipset_sets') or [])} ipset(s)",
"notes": firewall_obj.get("notes") or [],
}
)
# Single snapshots # Single snapshots
for rname in [ for rname in [
"apt_config", "apt_config",

View file

@ -5,10 +5,12 @@ import json
import os import os
import re import re
import shutil import shutil
import shlex
import stat import stat
import subprocess # nosec
import time import time
from dataclasses import dataclass, asdict, field from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional, Set from typing import Dict, List, Optional, Set, Tuple
from .systemd import ( from .systemd import (
list_enabled_services, list_enabled_services,
@ -148,6 +150,17 @@ class ExtraPathsSnapshot:
notes: List[str] = field(default_factory=list) notes: List[str] = field(default_factory=list)
@dataclass
class FirewallRuntimeSnapshot:
role_name: str
packages: List[str] = field(default_factory=list)
ipset_save: Optional[str] = None
ipset_sets: List[str] = field(default_factory=list)
iptables_v4_save: Optional[str] = None
iptables_v6_save: Optional[str] = None
notes: List[str] = field(default_factory=list)
ALLOWED_UNOWNED_EXTS = { ALLOWED_UNOWNED_EXTS = {
".cfg", ".cfg",
".cnf", ".cnf",
@ -653,6 +666,13 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
("/etc/nftables.d/*", "system_firewall"), ("/etc/nftables.d/*", "system_firewall"),
("/etc/iptables/rules.v4", "system_firewall"), ("/etc/iptables/rules.v4", "system_firewall"),
("/etc/iptables/rules.v6", "system_firewall"), ("/etc/iptables/rules.v6", "system_firewall"),
("/etc/sysconfig/iptables", "system_firewall"),
("/etc/sysconfig/ip6tables", "system_firewall"),
("/etc/ipset.conf", "system_firewall"),
("/etc/ipset/*", "system_firewall"),
("/etc/ipset.d/*", "system_firewall"),
("/etc/sysconfig/ipset", "system_firewall"),
("/etc/default/ipset", "system_firewall"),
("/etc/ufw/*", "system_firewall"), ("/etc/ufw/*", "system_firewall"),
("/etc/default/ufw", "system_firewall"), ("/etc/default/ufw", "system_firewall"),
("/etc/firewalld/*", "system_firewall"), ("/etc/firewalld/*", "system_firewall"),
@ -664,6 +684,46 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
] ]
# Persistent firewall files that are treated as authoritative for their
# respective runtime state. If any matching file exists, the runtime capture
# for that family is retained only as static managed-file harvest output and
# not duplicated through the generated firewall_runtime role.
_PERSISTENT_IPTABLES_V4_GLOBS = [
"/etc/iptables/rules.v4",
"/etc/sysconfig/iptables",
]
_PERSISTENT_IPTABLES_V6_GLOBS = [
"/etc/iptables/rules.v6",
"/etc/sysconfig/ip6tables",
]
_PERSISTENT_IPSET_GLOBS = [
"/etc/ipset.conf",
"/etc/ipset/*",
"/etc/ipset.d/*",
"/etc/sysconfig/ipset",
]
def _persistent_firewall_files(globs: List[str]) -> List[str]:
"""Return persistent firewall files matching ``globs``.
This intentionally uses the same file walking helper as the static system
capture path so the runtime fallback decision matches what Enroll can
harvest as managed files.
"""
seen: Set[str] = set()
out: List[str] = []
for spec in globs:
for path in _iter_matching_files(spec):
if path in seen:
continue
seen.add(path)
out.append(path)
return sorted(out)
def _iter_matching_files(spec: str, *, cap: int = MAX_FILES_CAP) -> List[str]: def _iter_matching_files(spec: str, *, cap: int = MAX_FILES_CAP) -> List[str]:
"""Expand a glob spec and also walk directories to collect files.""" """Expand a glob spec and also walk directories to collect files."""
out: List[str] = [] out: List[str] = []
@ -854,6 +914,200 @@ def _iter_system_capture_paths() -> List[tuple[str, str]]:
return uniq return uniq
_FIREWALL_CAPTURE_COMMANDS: Dict[str, Tuple[str, ...]] = {
"ipset_save": ("ipset", "save"),
"iptables_v4_save": ("iptables-save",),
"iptables_v6_save": ("ip6tables-save",),
}
def _run_capture_command(
command_key: str, *, timeout: int = 10
) -> tuple[Optional[str], Optional[str]]:
"""Return (stdout, error_note) for an allowlisted local state command.
The command key is resolved through ``_FIREWALL_CAPTURE_COMMANDS`` so this
helper never executes caller-supplied argv. Commands are run with
``shell=False`` explicitly to avoid shell interpretation.
"""
argv = _FIREWALL_CAPTURE_COMMANDS.get(command_key)
if argv is None:
return None, f"Unknown capture command: {command_key}"
exe = argv[0]
if shutil.which(exe) is None:
return None, f"{exe} not found on PATH."
try:
proc = subprocess.run( # nosec
argv,
shell=False,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=timeout,
)
except Exception as e: # noqa: BLE001
return None, f"{' '.join(argv)} failed: {e!r}"
if proc.returncode != 0:
stderr = (proc.stderr or "").strip()
if len(stderr) > 300:
stderr = stderr[:297] + "..."
return (
None,
f"{' '.join(argv)} exited {proc.returncode}: {stderr or '(no stderr)'}",
)
return proc.stdout or "", None
def _write_generated_artifact(
bundle_dir: str, role_name: str, src_rel: str, content: str
) -> None:
"""Write a generated harvest artifact that did not exist as a file on disk."""
dst = os.path.join(bundle_dir, "artifacts", role_name, src_rel)
os.makedirs(os.path.dirname(dst), exist_ok=True)
with open(dst, "w", encoding="utf-8") as f:
f.write(content)
def _ipset_save_has_state(text: str) -> bool:
for raw in (text or "").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
if line.startswith(("create ", "add ")):
return True
return False
def _parse_ipset_set_names(text: str) -> List[str]:
names: List[str] = []
seen: Set[str] = set()
for raw in (text or "").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
try:
toks = shlex.split(line)
except ValueError:
toks = line.split()
if len(toks) >= 2 and toks[0] == "create" and toks[1] not in seen:
seen.add(toks[1])
names.append(toks[1])
return names
def _iptables_save_has_state(text: str) -> bool:
"""Return True when iptables-save output contains non-default state."""
for raw in (text or "").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
if line.startswith("*") or line == "COMMIT":
continue
if line.startswith(":"):
parts = line.split()
chain_name = parts[0][1:] if parts else ""
policy = parts[1] if len(parts) >= 2 else ""
# Built-in empty chains usually look like ':INPUT ACCEPT [0:0]'.
# A changed policy, or any custom chain, is meaningful state.
if policy not in ("ACCEPT", "-"):
return True
if policy == "-" and chain_name:
return True
continue
if line.startswith(("-A ", "-I ", "-N ", "-P ", "-R ")):
return True
return False
def _collect_firewall_runtime_snapshot(
bundle_dir: str,
*,
persistent_ipset_files: Optional[List[str]] = None,
persistent_iptables_v4_files: Optional[List[str]] = None,
persistent_iptables_v6_files: Optional[List[str]] = None,
) -> FirewallRuntimeSnapshot:
"""Capture live kernel firewall state only when no persistent config exists.
Enroll also harvests persistent firewall files such as
/etc/iptables/rules.v4, /etc/iptables/rules.v6, and /etc/ipset.conf as
managed files. The generated runtime restore role is therefore a fallback:
it captures each firewall family only when that family has no persistent
file to avoid generating two roles that try to manage the same state.
"""
role_name = "firewall_runtime"
packages: Set[str] = set()
notes: List[str] = []
ipset_save_rel: Optional[str] = None
ipset_sets: List[str] = []
iptables_v4_rel: Optional[str] = None
iptables_v6_rel: Optional[str] = None
persistent_ipset_files = persistent_ipset_files or []
persistent_iptables_v4_files = persistent_iptables_v4_files or []
persistent_iptables_v6_files = persistent_iptables_v6_files or []
if persistent_ipset_files:
notes.append(
"Live ipset runtime capture skipped because persistent ipset "
f"configuration was found: {', '.join(persistent_ipset_files)}"
)
else:
ipset_out, ipset_err = _run_capture_command("ipset_save")
if ipset_err:
notes.append(ipset_err)
elif ipset_out is not None and _ipset_save_has_state(ipset_out):
ipset_save_rel = "firewall/ipset.save"
_write_generated_artifact(bundle_dir, role_name, ipset_save_rel, ipset_out)
ipset_sets = _parse_ipset_set_names(ipset_out)
packages.add("ipset")
if persistent_iptables_v4_files:
notes.append(
"Live IPv4 iptables runtime capture skipped because persistent "
f"IPv4 iptables configuration was found: {', '.join(persistent_iptables_v4_files)}"
)
else:
ipt4_out, ipt4_err = _run_capture_command("iptables_v4_save")
if ipt4_err:
notes.append(ipt4_err)
elif ipt4_out is not None and _iptables_save_has_state(ipt4_out):
iptables_v4_rel = "firewall/iptables.v4"
_write_generated_artifact(bundle_dir, role_name, iptables_v4_rel, ipt4_out)
packages.add("iptables")
if persistent_iptables_v6_files:
notes.append(
"Live IPv6 iptables runtime capture skipped because persistent "
f"IPv6 iptables configuration was found: {', '.join(persistent_iptables_v6_files)}"
)
else:
ipt6_out, ipt6_err = _run_capture_command("iptables_v6_save")
if ipt6_err:
notes.append(ipt6_err)
elif ipt6_out is not None and _iptables_save_has_state(ipt6_out):
iptables_v6_rel = "firewall/iptables.v6"
_write_generated_artifact(bundle_dir, role_name, iptables_v6_rel, ipt6_out)
packages.add("iptables")
# Package names are intentionally added only when matching live state was
# captured. Merely having iptables/ipset installed should not create a role.
return FirewallRuntimeSnapshot(
role_name=role_name,
packages=sorted(packages),
ipset_save=ipset_save_rel,
ipset_sets=ipset_sets,
iptables_v4_save=iptables_v4_rel,
iptables_v6_save=iptables_v6_rel,
notes=notes,
)
def harvest( def harvest(
bundle_dir: str, bundle_dir: str,
policy: Optional[IgnorePolicy] = None, policy: Optional[IgnorePolicy] = None,
@ -907,6 +1161,29 @@ def harvest(
installed_pkgs = backend.installed_packages() or {} installed_pkgs = backend.installed_packages() or {}
installed_names: Set[str] = set(installed_pkgs.keys()) installed_names: Set[str] = set(installed_pkgs.keys())
persistent_ipset_files = _persistent_firewall_files(_PERSISTENT_IPSET_GLOBS)
persistent_iptables_v4_files = _persistent_firewall_files(
_PERSISTENT_IPTABLES_V4_GLOBS
)
persistent_iptables_v6_files = _persistent_firewall_files(
_PERSISTENT_IPTABLES_V6_GLOBS
)
if hasattr(os, "geteuid") and os.geteuid() != 0:
firewall_runtime_snapshot = FirewallRuntimeSnapshot(
role_name="firewall_runtime",
notes=[
"Live ipset/iptables runtime capture skipped because harvest is not running as root."
],
)
else:
firewall_runtime_snapshot = _collect_firewall_runtime_snapshot(
bundle_dir,
persistent_ipset_files=persistent_ipset_files,
persistent_iptables_v4_files=persistent_iptables_v4_files,
persistent_iptables_v6_files=persistent_iptables_v6_files,
)
def _pick_installed(cands: List[str]) -> Optional[str]: def _pick_installed(cands: List[str]) -> Optional[str]:
for c in cands: for c in cands:
if c in installed_names: if c in installed_names:
@ -2121,6 +2398,7 @@ def harvest(
pkg_names |= manual_set pkg_names |= manual_set
pkg_names |= set(pkg_units.keys()) pkg_names |= set(pkg_units.keys())
pkg_names |= {ps.package for ps in pkg_snaps} pkg_names |= {ps.package for ps in pkg_snaps}
pkg_names |= set(firewall_runtime_snapshot.packages or [])
packages_inventory: Dict[str, Dict[str, object]] = {} packages_inventory: Dict[str, Dict[str, object]] = {}
for pkg in sorted(pkg_names): for pkg in sorted(pkg_names):
@ -2136,6 +2414,13 @@ def harvest(
observed.append({"kind": "systemd_unit", "ref": unit}) observed.append({"kind": "systemd_unit", "ref": unit})
for rn in sorted(set(pkg_role_names.get(pkg, []))): for rn in sorted(set(pkg_role_names.get(pkg, []))):
observed.append({"kind": "package_role", "ref": rn}) observed.append({"kind": "package_role", "ref": rn})
if pkg in set(firewall_runtime_snapshot.packages or []):
observed.append(
{"kind": "firewall_runtime", "ref": firewall_runtime_snapshot.role_name}
)
pkg_roles_map.setdefault(pkg, set()).add(
firewall_runtime_snapshot.role_name
)
roles = sorted(pkg_roles_map.get(pkg, set())) roles = sorted(pkg_roles_map.get(pkg, set()))
@ -2219,6 +2504,7 @@ def harvest(
"packages": [asdict(p) for p in pkg_snaps], "packages": [asdict(p) for p in pkg_snaps],
"apt_config": asdict(apt_config_snapshot), "apt_config": asdict(apt_config_snapshot),
"dnf_config": asdict(dnf_config_snapshot), "dnf_config": asdict(dnf_config_snapshot),
"firewall_runtime": asdict(firewall_runtime_snapshot),
"etc_custom": asdict(etc_custom_snapshot), "etc_custom": asdict(etc_custom_snapshot),
"usr_local_custom": asdict(usr_local_custom_snapshot), "usr_local_custom": asdict(usr_local_custom_snapshot),
"extra_paths": asdict(extra_paths_snapshot), "extra_paths": asdict(extra_paths_snapshot),

View file

@ -582,6 +582,97 @@ def _render_install_packages_tasks(role: str, var_prefix: str) -> str:
""" """
def _render_firewall_runtime_tasks(var_prefix: str) -> str:
"""Render tasks for live ipset/iptables snapshots."""
return f"""- name: Ensure firewall runtime snapshot directory exists
ansible.builtin.file:
path: /etc/enroll/firewall
state: directory
owner: root
group: root
mode: "0750"
- name: Deploy captured ipset snapshot
vars:
_enroll_ff:
files:
- "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_ipset_save }}}}"
- "{{{{ role_path }}}}/files/{{{{ {var_prefix}_ipset_save }}}}"
ansible.builtin.copy:
src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}"
dest: /etc/enroll/firewall/ipset.save
owner: root
group: root
mode: "0600"
when: ({var_prefix}_ipset_save | default('') | length) > 0
- name: Flush captured ipsets before restoring members
ansible.builtin.command:
cmd: "ipset flush {{{{ item }}}}"
loop: "{{{{ {var_prefix}_ipset_sets | default([]) }}}}"
register: _enroll_ipset_flush
failed_when: false
changed_when: false
when:
- ({var_prefix}_ipset_save | default('') | length) > 0
- {var_prefix}_sync_ipsets_exact | default(true) | bool
- name: Restore captured ipsets
ansible.builtin.shell: "ipset restore -exist < /etc/enroll/firewall/ipset.save"
args:
executable: /bin/sh
register: _enroll_ipset_restore
changed_when: _enroll_ipset_restore.rc == 0
when: ({var_prefix}_ipset_save | default('') | length) > 0
- name: Deploy captured IPv4 iptables snapshot
vars:
_enroll_ff:
files:
- "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_iptables_v4_save }}}}"
- "{{{{ role_path }}}}/files/{{{{ {var_prefix}_iptables_v4_save }}}}"
ansible.builtin.copy:
src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}"
dest: /etc/enroll/firewall/iptables.v4
owner: root
group: root
mode: "0600"
when: ({var_prefix}_iptables_v4_save | default('') | length) > 0
- name: Restore captured IPv4 iptables rules
ansible.builtin.command:
cmd: iptables-restore /etc/enroll/firewall/iptables.v4
register: _enroll_iptables_v4_restore
changed_when: _enroll_iptables_v4_restore.rc == 0
when:
- ({var_prefix}_iptables_v4_save | default('') | length) > 0
- {var_prefix}_restore_iptables | default(true) | bool
- name: Deploy captured IPv6 iptables snapshot
vars:
_enroll_ff:
files:
- "{{{{ inventory_dir }}}}/host_vars/{{{{ inventory_hostname }}}}/{{{{ role_name }}}}/.files/{{{{ {var_prefix}_iptables_v6_save }}}}"
- "{{{{ role_path }}}}/files/{{{{ {var_prefix}_iptables_v6_save }}}}"
ansible.builtin.copy:
src: "{{{{ lookup('ansible.builtin.first_found', _enroll_ff) }}}}"
dest: /etc/enroll/firewall/iptables.v6
owner: root
group: root
mode: "0600"
when: ({var_prefix}_iptables_v6_save | default('') | length) > 0
- name: Restore captured IPv6 iptables rules
ansible.builtin.command:
cmd: ip6tables-restore /etc/enroll/firewall/iptables.v6
register: _enroll_iptables_v6_restore
changed_when: _enroll_iptables_v6_restore.rc == 0
when:
- ({var_prefix}_iptables_v6_save | default('') | length) > 0
- {var_prefix}_restore_iptables | default(true) | bool
"""
def _prepare_bundle_dir( def _prepare_bundle_dir(
bundle: str, bundle: str,
*, *,
@ -746,6 +837,7 @@ def _manifest_from_bundle_dir(
users_snapshot: Dict[str, Any] = roles.get("users", {}) users_snapshot: Dict[str, Any] = roles.get("users", {})
apt_config_snapshot: Dict[str, Any] = roles.get("apt_config", {}) apt_config_snapshot: Dict[str, Any] = roles.get("apt_config", {})
dnf_config_snapshot: Dict[str, Any] = roles.get("dnf_config", {}) dnf_config_snapshot: Dict[str, Any] = roles.get("dnf_config", {})
firewall_runtime_snapshot: Dict[str, Any] = roles.get("firewall_runtime", {})
etc_custom_snapshot: Dict[str, Any] = roles.get("etc_custom", {}) etc_custom_snapshot: Dict[str, Any] = roles.get("etc_custom", {})
usr_local_custom_snapshot: Dict[str, Any] = roles.get("usr_local_custom", {}) usr_local_custom_snapshot: Dict[str, Any] = roles.get("usr_local_custom", {})
extra_paths_snapshot: Dict[str, Any] = roles.get("extra_paths", {}) extra_paths_snapshot: Dict[str, Any] = roles.get("extra_paths", {})
@ -782,6 +874,7 @@ def _manifest_from_bundle_dir(
manifested_users_roles: List[str] = [] manifested_users_roles: List[str] = []
manifested_apt_config_roles: List[str] = [] manifested_apt_config_roles: List[str] = []
manifested_dnf_config_roles: List[str] = [] manifested_dnf_config_roles: List[str] = []
manifested_firewall_runtime_roles: List[str] = []
manifested_etc_custom_roles: List[str] = [] manifested_etc_custom_roles: List[str] = []
manifested_usr_local_custom_roles: List[str] = [] manifested_usr_local_custom_roles: List[str] = []
manifested_extra_paths_roles: List[str] = [] manifested_extra_paths_roles: List[str] = []
@ -1332,6 +1425,104 @@ DNF/YUM configuration harvested from the system (repos, config files, and RPM GP
manifested_dnf_config_roles.append(role) manifested_dnf_config_roles.append(role)
# -------------------------
# firewall_runtime role (live ipset/iptables kernel state)
# -------------------------
if firewall_runtime_snapshot and (
firewall_runtime_snapshot.get("ipset_save")
or firewall_runtime_snapshot.get("iptables_v4_save")
or firewall_runtime_snapshot.get("iptables_v6_save")
):
role = firewall_runtime_snapshot.get("role_name", "firewall_runtime")
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
var_prefix = role
packages = firewall_runtime_snapshot.get("packages", []) or []
ipset_save = firewall_runtime_snapshot.get("ipset_save") or ""
ipset_sets = firewall_runtime_snapshot.get("ipset_sets", []) or []
iptables_v4_save = firewall_runtime_snapshot.get("iptables_v4_save") or ""
iptables_v6_save = firewall_runtime_snapshot.get("iptables_v6_save") or ""
notes = firewall_runtime_snapshot.get("notes", []) or []
# Generated firewall snapshots are host-specific in site mode.
if site_mode:
_copy_artifacts(
bundle_dir,
role,
_host_role_files_dir(out_dir, fqdn or "", role),
)
else:
_copy_artifacts(bundle_dir, role, os.path.join(role_dir, "files"))
vars_map: Dict[str, Any] = {
f"{var_prefix}_packages": packages,
f"{var_prefix}_ipset_save": ipset_save,
f"{var_prefix}_ipset_sets": ipset_sets,
f"{var_prefix}_iptables_v4_save": iptables_v4_save,
f"{var_prefix}_iptables_v6_save": iptables_v6_save,
f"{var_prefix}_sync_ipsets_exact": True,
f"{var_prefix}_restore_iptables": True,
}
if site_mode:
_write_role_defaults(
role_dir,
{
f"{var_prefix}_packages": [],
f"{var_prefix}_ipset_save": "",
f"{var_prefix}_ipset_sets": [],
f"{var_prefix}_iptables_v4_save": "",
f"{var_prefix}_iptables_v6_save": "",
f"{var_prefix}_sync_ipsets_exact": True,
f"{var_prefix}_restore_iptables": True,
},
)
_write_hostvars(out_dir, fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
tasks = (
"---\n"
+ _render_install_packages_tasks(role, var_prefix)
+ _render_firewall_runtime_tasks(var_prefix)
)
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks.rstrip() + "\n")
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\ndependencies: []\n")
readme = f"""# {role}
Generated from live firewall runtime state captured during harvest.
This role restores live ipset and iptables state only for firewall families where Enroll did not find corresponding persistent configuration on the source host. Static firewall configuration files, such as `/etc/iptables/rules.v4`, `/etc/iptables/rules.v6`, UFW, nftables, firewalld, or `/etc/ipset*`, are harvested separately as managed files and treated as authoritative for their respective family.
## Captured snapshots
- ipset: {ipset_save or "(none)"}
- iptables IPv4: {iptables_v4_save or "(none)"}
- iptables IPv6: {iptables_v6_save or "(none)"}
## Captured ipsets
{os.linesep.join("- " + x for x in ipset_sets) or "- (none)"}
## Notes
{os.linesep.join("- " + n for n in notes) or "- (none)"}
## Safety notes
- `firewall_runtime_sync_ipsets_exact` defaults to `true`; it flushes captured set members before replaying the saved members so stale entries are removed. This applies only when no persistent ipset config was found.
- `firewall_runtime_restore_iptables` defaults to `true`; `iptables-restore`/`ip6tables-restore` replace only captured families. A family is captured only when no corresponding persistent iptables config was found.
"""
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifested_firewall_runtime_roles.append(role)
# ------------------------- # -------------------------
# etc_custom role (unowned /etc not already attributed) # etc_custom role (unowned /etc not already attributed)
# ------------------------- # -------------------------
@ -2012,6 +2203,7 @@ Generated for package `{pkg}`.
+ manifested_extra_paths_roles + manifested_extra_paths_roles
+ manifested_users_roles + manifested_users_roles
+ tail_roles + tail_roles
+ manifested_firewall_runtime_roles
) )
if site_mode: if site_mode:

View file

@ -315,6 +315,23 @@
"ref" "ref"
], ],
"type": "object" "type": "object"
},
{
"additionalProperties": false,
"properties": {
"kind": {
"const": "firewall_runtime"
},
"ref": {
"minLength": 1,
"type": "string"
}
},
"required": [
"kind",
"ref"
],
"type": "object"
} }
] ]
}, },
@ -579,6 +596,62 @@
} }
], ],
"unevaluatedProperties": false "unevaluatedProperties": false
},
"FirewallRuntimeSnapshot": {
"additionalProperties": false,
"properties": {
"role_name": {
"const": "firewall_runtime"
},
"packages": {
"items": {
"minLength": 1,
"type": "string"
},
"type": "array"
},
"ipset_save": {
"type": [
"string",
"null"
]
},
"ipset_sets": {
"items": {
"minLength": 1,
"type": "string"
},
"type": "array"
},
"iptables_v4_save": {
"type": [
"string",
"null"
]
},
"iptables_v6_save": {
"type": [
"string",
"null"
]
},
"notes": {
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"role_name",
"packages",
"ipset_save",
"ipset_sets",
"iptables_v4_save",
"iptables_v6_save",
"notes"
],
"type": "object"
} }
}, },
"$id": "https://enroll.sh/schema/state.schema.json", "$id": "https://enroll.sh/schema/state.schema.json",
@ -686,6 +759,9 @@
}, },
"usr_local_custom": { "usr_local_custom": {
"$ref": "#/$defs/UsrLocalCustomSnapshot" "$ref": "#/$defs/UsrLocalCustomSnapshot"
},
"firewall_runtime": {
"$ref": "#/$defs/FirewallRuntimeSnapshot"
} }
}, },
"required": [ "required": [

View file

@ -197,6 +197,37 @@ def validate_harvest(
f"artifact is not a file for role {role_name}: artifacts/{role_name}/{src_rel}" f"artifact is not a file for role {role_name}: artifacts/{role_name}/{src_rel}"
) )
# Runtime firewall snapshots are generated artifacts rather than managed files.
fw = (state.get("roles") or {}).get("firewall_runtime") or {}
if isinstance(fw, dict):
for key in ("ipset_save", "iptables_v4_save", "iptables_v6_save"):
src_rel = str(fw.get(key) or "")
if not src_rel:
continue
if src_rel.startswith("/") or ".." in src_rel.split("/"):
errors.append(
f"firewall_runtime {key} has suspicious src_rel: {src_rel!r}"
)
continue
referenced.add(
(str(fw.get("role_name") or "firewall_runtime"), src_rel)
)
p = (
artifacts_dir
/ str(fw.get("role_name") or "firewall_runtime")
/ src_rel
)
if not p.exists():
errors.append(
"missing firewall runtime artifact: "
f"artifacts/{fw.get('role_name') or 'firewall_runtime'}/{src_rel}"
)
elif not p.is_file():
errors.append(
"firewall runtime artifact is not a file: "
f"artifacts/{fw.get('role_name') or 'firewall_runtime'}/{src_rel}"
)
# Warn if there are extra files in artifacts not referenced. # Warn if there are extra files in artifacts not referenced.
if artifacts_dir.exists() and artifacts_dir.is_dir(): if artifacts_dir.exists() and artifacts_dir.is_dir():
for fp in artifacts_dir.rglob("*"): for fp in artifacts_dir.rglob("*"):

12
poetry.lock generated
View file

@ -562,13 +562,13 @@ test = ["pytest (>=6)"]
[[package]] [[package]]
name = "idna" name = "idna"
version = "3.14" version = "3.15"
description = "Internationalized Domain Names in Applications (IDNA)" description = "Internationalized Domain Names in Applications (IDNA)"
optional = false optional = false
python-versions = ">=3.8" python-versions = ">=3.8"
files = [ files = [
{file = "idna-3.14-py3-none-any.whl", hash = "sha256:e677eaf072e290f7b725f9acf0b3a2bd55f9fd6f7c70abe5f0e34823d0accf69"}, {file = "idna-3.15-py3-none-any.whl", hash = "sha256:048adeaf8c2d788c40fee287673ccaa74c24ffd8dcf09ffa555a2fbb59f10ac8"},
{file = "idna-3.14.tar.gz", hash = "sha256:466d810d7a2cc1022bea9b037c39728d51ae7dad40d480fc9b7d7ecf98ba8ee3"}, {file = "idna-3.15.tar.gz", hash = "sha256:ca962446ea538f7092a95e057da437618e886f4d349216d2b1e294abfdb65fdc"},
] ]
[package.extras] [package.extras]
@ -897,13 +897,13 @@ typing-extensions = {version = ">=4.4.0", markers = "python_version < \"3.13\""}
[[package]] [[package]]
name = "requests" name = "requests"
version = "2.34.0" version = "2.34.1"
description = "Python HTTP for Humans." description = "Python HTTP for Humans."
optional = false optional = false
python-versions = ">=3.10" python-versions = ">=3.10"
files = [ files = [
{file = "requests-2.34.0-py3-none-any.whl", hash = "sha256:917520a21b767485ce7c588f4ebb917c436b24a31231b44228715eaeb5a52c60"}, {file = "requests-2.34.1-py3-none-any.whl", hash = "sha256:bf38a3ff993960d3dd819c08862c40b3c703306eb7c744fcd9f4ddbb95b548f0"},
{file = "requests-2.34.0.tar.gz", hash = "sha256:7d62fe92f50eb82c529b0916bb445afa1531a566fc8f35ffdc64446e771b856a"}, {file = "requests-2.34.1.tar.gz", hash = "sha256:0fc5669f2b69704449fe1552360bd2a73a54512dfd03e65529157f1513322beb"},
] ]
[package.dependencies] [package.dependencies]

View file

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "enroll" name = "enroll"
version = "0.5.0" version = "0.6.0"
description = "Enroll a server's running state retrospectively into Ansible" description = "Enroll a server's running state retrospectively into Ansible"
authors = ["Miguel Jacq <mig@mig5.net>"] authors = ["Miguel Jacq <mig@mig5.net>"]
license = "GPL-3.0-or-later" license = "GPL-3.0-or-later"

View file

@ -1,4 +1,4 @@
%global upstream_version 0.5.0 %global upstream_version 0.6.0
Name: enroll Name: enroll
Version: %{upstream_version} Version: %{upstream_version}
@ -43,6 +43,9 @@ Enroll a server's running state retrospectively into Ansible.
%{_bindir}/enroll %{_bindir}/enroll
%changelog %changelog
* Thu May 14 2026 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
- Add support for capturing ipset and iptables configuration files
- Add support for generating ipset and iptables configuration files from runtime, if the former weren't present ('firewall_runtime' role)
* Tue May 12 2026 Miguel Jacq <mig@mig5.net> - %{version}-%{release} * Tue May 12 2026 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
- Add ssh config support where JinjaTurtle is used - Add ssh config support where JinjaTurtle is used
* Tue Feb 16 2026 Miguel Jacq <mig@mig5.net> - %{version}-%{release} * Tue Feb 16 2026 Miguel Jacq <mig@mig5.net> - %{version}-%{release}

View file

@ -168,3 +168,121 @@ def test_iter_system_capture_paths_dedupes_first_reason(monkeypatch):
) )
out = h._iter_system_capture_paths() out = h._iter_system_capture_paths()
assert out == [("/dup", "r1")] assert out == [("/dup", "r1")]
def test_ipset_and_iptables_state_helpers(tmp_path: Path):
ipset_save = """create blocklist hash:ip family inet hashsize 1024 maxelem 65536
add blocklist 203.0.113.10
create nets hash:net family inet
"""
assert h._ipset_save_has_state(ipset_save)
assert h._parse_ipset_set_names(ipset_save) == ["blocklist", "nets"]
assert not h._ipset_save_has_state("# empty\n")
empty_iptables = """*filter
:INPUT ACCEPT [0:0]
:FORWARD ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
COMMIT
"""
assert not h._iptables_save_has_state(empty_iptables)
native_rule = empty_iptables.replace(
"COMMIT", "-A INPUT -p tcp --dport 22 -j ACCEPT\nCOMMIT"
)
assert h._iptables_save_has_state(native_rule)
changed_policy = empty_iptables.replace(":INPUT ACCEPT", ":INPUT DROP")
assert h._iptables_save_has_state(changed_policy)
def test_collect_firewall_runtime_snapshot_writes_generated_artifacts(
monkeypatch, tmp_path: Path
):
outputs = {
"ipset_save": (
"create blocklist hash:ip family inet\nadd blocklist 203.0.113.10\n",
None,
),
"iptables_v4_save": (
"*filter\n:INPUT DROP [0:0]\n-A INPUT -m set --match-set blocklist src -j DROP\nCOMMIT\n",
None,
),
"iptables_v6_save": ("*filter\n:INPUT ACCEPT [0:0]\nCOMMIT\n", None),
}
def fake_run(command_key, *, timeout=10):
return outputs[command_key]
monkeypatch.setattr(h, "_run_capture_command", fake_run)
snap = h._collect_firewall_runtime_snapshot(str(tmp_path))
assert snap.role_name == "firewall_runtime"
assert snap.packages == ["ipset", "iptables"]
assert snap.ipset_save == "firewall/ipset.save"
assert snap.ipset_sets == ["blocklist"]
assert snap.iptables_v4_save == "firewall/iptables.v4"
assert snap.iptables_v6_save is None
assert (
(tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "ipset.save")
.read_text(encoding="utf-8")
.startswith("create blocklist")
)
assert (
(tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "iptables.v4")
.read_text(encoding="utf-8")
.startswith("*filter")
)
def test_collect_firewall_runtime_snapshot_is_per_family_fallback(
monkeypatch, tmp_path: Path
):
calls = []
outputs = {
"ipset_save": (
"create blocklist hash:ip family inet\nadd blocklist 203.0.113.10\n",
None,
),
"iptables_v4_save": (
"*filter\n:INPUT DROP [0:0]\n-A INPUT -p tcp --dport 22 -j ACCEPT\nCOMMIT\n",
None,
),
"iptables_v6_save": (
"*filter\n:INPUT DROP [0:0]\n-A INPUT -p tcp --dport 22 -j ACCEPT\nCOMMIT\n",
None,
),
}
def fake_run(command_key, *, timeout=10):
calls.append(command_key)
return outputs[command_key]
monkeypatch.setattr(h, "_run_capture_command", fake_run)
snap = h._collect_firewall_runtime_snapshot(
str(tmp_path),
persistent_ipset_files=["/etc/ipset.conf"],
persistent_iptables_v4_files=["/etc/iptables/rules.v4"],
persistent_iptables_v6_files=[],
)
assert "ipset_save" not in calls
assert "iptables_v4_save" not in calls
assert "iptables_v6_save" in calls
assert snap.ipset_save is None
assert snap.iptables_v4_save is None
assert snap.iptables_v6_save == "firewall/iptables.v6"
assert snap.packages == ["iptables"]
assert any("persistent ipset configuration" in note for note in snap.notes)
assert any("persistent IPv4 iptables configuration" in note for note in snap.notes)
assert not (
tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "ipset.save"
).exists()
assert not (
tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "iptables.v4"
).exists()
assert (
tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "iptables.v6"
).exists()

View file

@ -795,3 +795,100 @@ def test_manifest_applies_jinjaturtle_to_jinjifyable_managed_file(
assert not ( assert not (
out_dir / "roles" / "apt_config" / "files" / "etc" / "apt" / "foo.ini" out_dir / "roles" / "apt_config" / "files" / "etc" / "apt" / "foo.ini"
).exists() ).exists()
def test_manifest_writes_firewall_runtime_role(tmp_path: Path):
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
(bundle / "artifacts" / "firewall_runtime" / "firewall").mkdir(
parents=True, exist_ok=True
)
(bundle / "artifacts" / "firewall_runtime" / "firewall" / "ipset.save").write_text(
"create blocklist hash:ip family inet\nadd blocklist 203.0.113.10\n",
encoding="utf-8",
)
(bundle / "artifacts" / "firewall_runtime" / "firewall" / "iptables.v4").write_text(
"*filter\n:INPUT DROP [0:0]\n-A INPUT -m set --match-set blocklist src -j DROP\nCOMMIT\n",
encoding="utf-8",
)
state = {
"schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {"packages": {}},
"roles": {
"users": {
"role_name": "users",
"users": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [],
"packages": [],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"firewall_runtime": {
"role_name": "firewall_runtime",
"packages": ["ipset", "iptables"],
"ipset_save": "firewall/ipset.save",
"ipset_sets": ["blocklist"],
"iptables_v4_save": "firewall/iptables.v4",
"iptables_v6_save": None,
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
}
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
tasks = (out / "roles" / "firewall_runtime" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
assert "ipset restore -exist" in tasks
assert "iptables-restore /etc/enroll/firewall/iptables.v4" in tasks
assert "ipset flush {{ item }}" in tasks
defaults = (out / "roles" / "firewall_runtime" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
assert "firewall_runtime_ipset_sets:" in defaults
assert "- blocklist" in defaults
assert "firewall_runtime_restore_iptables: true" in defaults
pb = (out / "playbook.yml").read_text(encoding="utf-8")
assert "role: firewall_runtime" in pb
assert (
out / "roles" / "firewall_runtime" / "files" / "firewall" / "ipset.save"
).exists()