Compare commits


No commits in common. "8c6b51be3eb2ea949861937eddcffed74a439873" and "cae6246177581a0cc79e6aa3704298a164a154e3" have entirely different histories.

4 changed files with 67 additions and 475 deletions

View file

@@ -184,12 +184,6 @@ def _iter_managed_files(state: Dict[str, Any]) -> Iterable[Tuple[str, Dict[str,
 for mf in u.get("managed_files", []) or []:
 yield str(u_role), mf
-# apt_config
-ac = state.get("apt_config") or {}
-ac_role = ac.get("role_name") or "apt_config"
-for mf in ac.get("managed_files", []) or []:
-yield str(ac_role), mf
 # etc_custom
 ec = state.get("etc_custom") or {}
 ec_role = ec.get("role_name") or "etc_custom"

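For readability, a minimal sketch of this helper after the change follows. The `u`/`u_role` setup, the completion of the truncated return annotation, and the continuation past the hunk are assumptions; the yielded role/managed-file pairs come from the diff lines above.

from typing import Any, Dict, Iterable, Tuple

def _iter_managed_files(state: Dict[str, Any]) -> Iterable[Tuple[str, Dict[str, Any]]]:
    # users role (assumed: defined just above the visible hunk)
    u = state.get("users") or {}
    u_role = u.get("role_name") or "users"
    for mf in u.get("managed_files", []) or []:
        yield str(u_role), mf
    # etc_custom role; the apt_config branch is gone in the new version
    ec = state.get("etc_custom") or {}
    ec_role = ec.get("role_name") or "etc_custom"
    for mf in ec.get("managed_files", []) or []:  # assumed continuation of the hunk
        yield str(ec_role), mf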
View file

@@ -77,14 +77,6 @@ class UsersSnapshot:
 notes: List[str]
-@dataclass
-class AptConfigSnapshot:
-role_name: str
-managed_files: List[ManagedFile]
-excluded: List[ExcludedFile]
-notes: List[str]
 @dataclass
 class EtcCustomSnapshot:
 role_name: str
@@ -133,8 +125,7 @@ ALLOWED_UNOWNED_EXTS = {
 "", # allow extensionless (common in /etc/default and /etc/init.d)
 }
-MAX_FILES_CAP = 4000
-MAX_UNOWNED_FILES_PER_ROLE = 500
+MAX_UNOWNED_FILES_PER_ROLE = 400
 # Directories that are shared across many packages; never attribute unowned files in these trees to a single package.
 SHARED_ETC_TOPDIRS = {
@@ -333,7 +324,7 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
 ]
-def _iter_matching_files(spec: str, *, cap: int = MAX_FILES_CAP) -> List[str]:
+def _iter_matching_files(spec: str, *, cap: int = 2000) -> List[str]:
 """Expand a glob spec and also walk directories to collect files."""
 out: List[str] = []
 for p in glob.glob(spec):
@@ -408,61 +399,30 @@ def _parse_apt_signed_by(source_files: List[str]) -> Set[str]:
 return out
-def _iter_apt_capture_paths() -> List[tuple[str, str]]:
-"""Return (path, reason) pairs for APT configuration.
-This captures the full /etc/apt tree (subject to IgnorePolicy at copy time),
-plus any keyrings referenced via signed-by/Signed-By which may live outside
-/etc (e.g. /usr/share/keyrings).
-"""
-reasons: Dict[str, str] = {}
-# Capture all regular files under /etc/apt (no symlinks).
-if os.path.isdir("/etc/apt"):
-for dirpath, _, filenames in os.walk("/etc/apt"):
-for fn in filenames:
-p = os.path.join(dirpath, fn)
-if os.path.islink(p) or not os.path.isfile(p):
-continue
-reasons.setdefault(p, "apt_config")
-# Identify source files explicitly for nicer reasons and keyring discovery.
+def _iter_system_capture_paths() -> List[tuple[str, str]]:
+"""Return (path, reason) pairs for essential system config/state."""
+out: List[tuple[str, str]] = []
+# APT: capture sources and related config
 apt_sources: List[str] = []
 for g in _APT_SOURCE_GLOBS:
 apt_sources.extend(_iter_matching_files(g))
 for p in sorted(set(apt_sources)):
-reasons[p] = "apt_source"
-# Keyrings in standard locations.
-for g in (
-"/etc/apt/trusted.gpg",
-"/etc/apt/trusted.gpg.d/*",
-"/etc/apt/keyrings/*",
-):
+out.append((p, "system_apt_sources"))
+# APT: misc config files/dirs
+for g in _APT_MISC_GLOBS:
 for p in _iter_matching_files(g):
-reasons[p] = "apt_keyring"
-# Keyrings referenced by sources (may live outside /etc/apt).
+out.append((p, "system_apt_config"))
+# APT: referenced keyrings (may live outside /etc)
 signed_by = _parse_apt_signed_by(sorted(set(apt_sources)))
 for p in sorted(signed_by):
 if os.path.islink(p) or not os.path.isfile(p):
 continue
-if p.startswith("/etc/apt/"):
-reasons[p] = "apt_keyring"
-else:
-reasons[p] = "apt_signed_by_keyring"
-# De-dup with stable ordering.
-uniq: List[tuple[str, str]] = []
-for p in sorted(reasons.keys()):
-uniq.append((p, reasons[p]))
-return uniq
-def _iter_system_capture_paths() -> List[tuple[str, str]]:
-"""Return (path, reason) pairs for essential system config/state (non-APT)."""
-out: List[tuple[str, str]] = []
-# Other system config/state globs
+out.append((p, "system_apt_keyring"))
 for spec, reason in _SYSTEM_CAPTURE_GLOBS:
 for p in _iter_matching_files(spec):
 out.append((p, reason))
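Assembled from the plus-side lines of this hunk, the consolidated helper reads roughly as below. Blank lines, indentation, and the trailing return (which falls outside the hunk) are assumptions; the module-level globs and helpers (_APT_SOURCE_GLOBS, _APT_MISC_GLOBS, _SYSTEM_CAPTURE_GLOBS, _iter_matching_files, _parse_apt_signed_by) are referenced as they appear in the diff rather than redefined here.

import os
from typing import List

def _iter_system_capture_paths() -> List[tuple[str, str]]:
    """Return (path, reason) pairs for essential system config/state."""
    out: List[tuple[str, str]] = []
    # APT: capture sources and related config
    apt_sources: List[str] = []
    for g in _APT_SOURCE_GLOBS:              # module-level glob list, as in the diff
        apt_sources.extend(_iter_matching_files(g))
    for p in sorted(set(apt_sources)):
        out.append((p, "system_apt_sources"))
    # APT: misc config files/dirs
    for g in _APT_MISC_GLOBS:                # module-level glob list, as in the diff
        for p in _iter_matching_files(g):
            out.append((p, "system_apt_config"))
    # APT: referenced keyrings (may live outside /etc)
    signed_by = _parse_apt_signed_by(sorted(set(apt_sources)))
    for p in sorted(signed_by):
        if os.path.islink(p) or not os.path.isfile(p):
            continue
        out.append((p, "system_apt_keyring"))
    # Other system config/state globs
    for spec, reason in _SYSTEM_CAPTURE_GLOBS:
        for p in _iter_matching_files(spec):
            out.append((p, reason))
    return out  # assumed: the return statement is not shown in the hunk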
@@ -582,8 +542,6 @@ def harvest(
 for path in pkg_to_etc_paths.get(pkg, []):
 if not os.path.isfile(path) or os.path.islink(path):
 continue
-if path.startswith("/etc/apt/"):
-continue
 if path in conff:
 # Only capture conffiles when they differ from the package default.
 try:
@@ -824,8 +782,6 @@ def harvest(
 for path in pkg_to_etc_paths.get(pkg, []):
 if not os.path.isfile(path) or os.path.islink(path):
 continue
-if path.startswith("/etc/apt/"):
-continue
 if path in conff:
 try:
 current = file_md5(path)
@@ -988,55 +944,6 @@ def harvest(
 notes=users_notes,
 )
-# -------------------------
-# apt_config role (APT configuration and keyrings)
-# -------------------------
-apt_notes: List[str] = []
-apt_excluded: List[ExcludedFile] = []
-apt_managed: List[ManagedFile] = []
-apt_role_name = "apt_config"
-for path, reason in _iter_apt_capture_paths():
-if path_filter.is_excluded(path):
-apt_excluded.append(ExcludedFile(path=path, reason="user_excluded"))
-continue
-deny = policy.deny_reason(path)
-if deny:
-apt_excluded.append(ExcludedFile(path=path, reason=deny))
-continue
-try:
-owner, group, mode = stat_triplet(path)
-except OSError:
-apt_excluded.append(ExcludedFile(path=path, reason="unreadable"))
-continue
-src_rel = path.lstrip("/")
-try:
-_copy_into_bundle(bundle_dir, apt_role_name, path, src_rel)
-except OSError:
-apt_excluded.append(ExcludedFile(path=path, reason="unreadable"))
-continue
-apt_managed.append(
-ManagedFile(
-path=path,
-src_rel=src_rel,
-owner=owner,
-group=group,
-mode=mode,
-reason=reason,
-)
-)
-apt_config_snapshot = AptConfigSnapshot(
-role_name=apt_role_name,
-managed_files=apt_managed,
-excluded=apt_excluded,
-notes=apt_notes,
-)
 # -------------------------
 # etc_custom role (unowned /etc files not already attributed elsewhere)
 # -------------------------
@@ -1055,144 +962,44 @@ def harvest(
 already.add(mf.path)
 for mf in users_managed:
 already.add(mf.path)
-for mf in apt_managed:
-already.add(mf.path)
-# Maps for re-attributing shared snippets (cron.d/logrotate.d) to existing roles.
-svc_by_role: Dict[str, ServiceSnapshot] = {s.role_name: s for s in service_snaps}
-pkg_by_role: Dict[str, PackageSnapshot] = {p.role_name: p for p in pkg_snaps}
-def _target_role_for_shared_snippet(path: str) -> Optional[tuple[str, str]]:
-"""If `path` is a shared snippet, return (role_name, reason) to attach to."""
-base = os.path.basename(path)
-# Try full filename and stem (before first dot).
-candidates: List[str] = [base]
-if "." in base:
-candidates.append(base.split(".", 1)[0])
-seen: Set[str] = set()
-uniq: List[str] = []
-for c in candidates:
-if c and c not in seen:
-seen.add(c)
-uniq.append(c)
-if path.startswith("/etc/logrotate.d/"):
-for c in uniq:
-rn = _safe_name(c)
-if rn in svc_by_role or rn in pkg_by_role:
-return (rn, "logrotate_snippet")
-return None
-if path.startswith("/etc/cron.d/"):
-for c in uniq:
-rn = _safe_name(c)
-if rn in svc_by_role or rn in pkg_by_role:
-return (rn, "cron_snippet")
-return None
-return None
 # Capture essential system config/state (even if package-owned).
 for path, reason in _iter_system_capture_paths():
 if path in already:
 continue
-target = _target_role_for_shared_snippet(path)
 if path_filter.is_excluded(path):
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="user_excluded")
-)
-elif rn in pkg_by_role:
-pkg_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="user_excluded")
-)
-else:
 etc_excluded.append(ExcludedFile(path=path, reason="user_excluded"))
-already.add(path)
 continue
 deny = policy.deny_reason(path)
 if deny:
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason=deny)
-)
-elif rn in pkg_by_role:
-pkg_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason=deny)
-)
-else:
 etc_excluded.append(ExcludedFile(path=path, reason=deny))
-already.add(path)
 continue
 try:
 owner, group, mode = stat_triplet(path)
 except OSError:
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="unreadable")
-)
-elif rn in pkg_by_role:
-pkg_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="unreadable")
-)
-else:
 etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
-already.add(path)
 continue
 src_rel = path.lstrip("/")
-role_for_copy = etc_role_name
-reason_for_role = reason
-if target:
-role_for_copy, reason_for_role = target
 try:
-_copy_into_bundle(bundle_dir, role_for_copy, path, src_rel)
+_copy_into_bundle(bundle_dir, etc_role_name, path, src_rel)
 except OSError:
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="unreadable")
-)
-elif rn in pkg_by_role:
-pkg_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="unreadable")
-)
-else:
 etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
-already.add(path)
 continue
-mf = ManagedFile(
+etc_managed.append(
+ManagedFile(
 path=path,
 src_rel=src_rel,
 owner=owner,
 group=group,
 mode=mode,
-reason=reason_for_role,
+reason=reason,
+)
 )
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].managed_files.append(mf)
-elif rn in pkg_by_role:
-pkg_by_role[rn].managed_files.append(mf)
-else:
-etc_managed.append(mf)
 already.add(path)
 # Walk /etc for remaining unowned config-ish files
@@ -1200,8 +1007,6 @@ def harvest(
 for dirpath, _, filenames in os.walk("/etc"):
 for fn in filenames:
 path = os.path.join(dirpath, fn)
-if path.startswith("/etc/apt/"):
-continue
 if path in already:
 continue
 if path in owned_etc:
@@ -1211,106 +1016,45 @@ def harvest(
 if not _is_confish(path):
 continue
-target = _target_role_for_shared_snippet(path)
 if path_filter.is_excluded(path):
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="user_excluded")
-)
-elif rn in pkg_by_role:
-pkg_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="user_excluded")
-)
-else:
 etc_excluded.append(ExcludedFile(path=path, reason="user_excluded"))
-already.add(path)
 continue
 deny = policy.deny_reason(path)
 if deny:
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason=deny)
-)
-elif rn in pkg_by_role:
-pkg_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason=deny)
-)
-else:
 etc_excluded.append(ExcludedFile(path=path, reason=deny))
-already.add(path)
 continue
 try:
 owner, group, mode = stat_triplet(path)
 except OSError:
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="unreadable")
-)
-elif rn in pkg_by_role:
-pkg_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="unreadable")
-)
-else:
 etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
-already.add(path)
 continue
 src_rel = path.lstrip("/")
-role_for_copy = etc_role_name
-reason_for_role = "custom_unowned"
-if target:
-role_for_copy, reason_for_role = target
 try:
-_copy_into_bundle(bundle_dir, role_for_copy, path, src_rel)
+_copy_into_bundle(bundle_dir, etc_role_name, path, src_rel)
 except OSError:
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="unreadable")
-)
-elif rn in pkg_by_role:
-pkg_by_role[rn].excluded.append(
-ExcludedFile(path=path, reason="unreadable")
-)
-else:
 etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
-already.add(path)
 continue
-mf = ManagedFile(
+etc_managed.append(
+ManagedFile(
 path=path,
 src_rel=src_rel,
 owner=owner,
 group=group,
 mode=mode,
-reason=reason_for_role,
+reason="custom_unowned",
+)
 )
-if target:
-rn, _ = target
-if rn in svc_by_role:
-svc_by_role[rn].managed_files.append(mf)
-elif rn in pkg_by_role:
-pkg_by_role[rn].managed_files.append(mf)
-else:
-etc_managed.append(mf)
 scanned += 1
-if scanned >= MAX_FILES_CAP:
+if scanned >= 2000:
 etc_notes.append(
-f"Reached file cap ({MAX_FILES_CAP}) while scanning /etc for unowned files."
+"Reached file cap (2000) while scanning /etc for unowned files."
 )
 break
-if scanned >= MAX_FILES_CAP:
+if scanned >= 2000:
 break
 etc_custom_snapshot = EtcCustomSnapshot(
@@ -1402,7 +1146,7 @@ def harvest(
 _scan_usr_local_tree(
 "/usr/local/etc",
 require_executable=False,
-cap=MAX_FILES_CAP,
+cap=2000,
 reason="usr_local_etc_custom",
 )
@@ -1410,7 +1154,7 @@ def harvest(
 _scan_usr_local_tree(
 "/usr/local/bin",
 require_executable=True,
-cap=MAX_FILES_CAP,
+cap=2000,
 reason="usr_local_bin_script",
 )
@@ -1444,7 +1188,7 @@ def harvest(
 files, inc_notes = expand_includes(
 path_filter.iter_include_patterns(),
 exclude=path_filter,
-max_files=MAX_FILES_CAP,
+max_files=4000,
 )
 included_files = files
 extra_notes.extend(inc_notes)
@@ -1503,7 +1247,6 @@ def harvest(
 "manual_packages": manual_pkgs,
 "manual_packages_skipped": manual_pkgs_skipped,
 "package_roles": [asdict(p) for p in pkg_snaps],
-"apt_config": asdict(apt_config_snapshot),
 "etc_custom": asdict(etc_custom_snapshot),
 "usr_local_custom": asdict(usr_local_custom_snapshot),
 "extra_paths": asdict(extra_paths_snapshot),

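Taken together, the plus-side and context lines of the large hunks above reduce each per-file step in harvest() to one flat sequence: exclusion check, policy check, stat, copy into the bundle, then record a ManagedFile. A sketch of the new loop body for the system-capture pass follows (the /etc walk applies the same pattern with reason="custom_unowned"); the enclosing harvest() scaffolding, indentation, and the surrounding variables (bundle_dir, policy, path_filter, etc_role_name, etc_excluded, etc_managed, already) are assumed from context.

# Sketch only: all names below are taken from the diff, not redefined here.
for path, reason in _iter_system_capture_paths():
    if path in already:
        continue
    if path_filter.is_excluded(path):
        etc_excluded.append(ExcludedFile(path=path, reason="user_excluded"))
        continue
    deny = policy.deny_reason(path)
    if deny:
        etc_excluded.append(ExcludedFile(path=path, reason=deny))
        continue
    try:
        owner, group, mode = stat_triplet(path)
    except OSError:
        etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
        continue
    src_rel = path.lstrip("/")
    try:
        _copy_into_bundle(bundle_dir, etc_role_name, path, src_rel)
    except OSError:
        etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
        continue
    etc_managed.append(
        ManagedFile(
            path=path,
            src_rel=src_rel,
            owner=owner,
            group=group,
            mode=mode,
            reason=reason,
        )
    )
    already.add(path)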
View file

@@ -2,7 +2,6 @@ from __future__ import annotations
 import json
 import os
-import re
 import shutil
 import stat
 import tarfile
@@ -139,6 +138,7 @@ def _copy_artifacts(
 # If a file was successfully templatised by JinjaTurtle, do NOT
 # also materialise the raw copy in the destination files dir.
+# (This keeps the output minimal and avoids redundant "raw" files.)
 if exclude_rels and rel in exclude_rels:
 try:
 if os.path.isfile(dst):
@@ -165,7 +165,7 @@ def _write_role_scaffold(role_dir: str) -> None:
 def _write_playbook_all(path: str, roles: List[str]) -> None:
 pb_lines = [
 "---",
-"- name: Apply all roles on all hosts",
+"- name: Apply all roles on host",
 " hosts: all",
 " become: true",
 " roles:",
@@ -179,7 +179,7 @@ def _write_playbook_all(path: str, roles: List[str]) -> None:
 def _write_playbook_host(path: str, fqdn: str, roles: List[str]) -> None:
 pb_lines = [
 "---",
-f"- name: Apply all roles on {fqdn}",
+f"- name: Apply enroll roles on {fqdn}",
 f" hosts: {fqdn}",
 " become: true",
 " roles:",
@@ -390,9 +390,9 @@ def _render_generic_files_tasks(
 # Using first_found makes roles work in both modes:
 # - site-mode: inventory/host_vars/<host>/<role>/.files/...
 # - non-site: roles/<role>/files/...
-return f"""# Generated by enroll
-- name: Deploy any systemd unit files (templates)
+return f"""# Generated by enroll (data-driven tasks)
+- name: Deploy systemd unit files (templates)
 ansible.builtin.template:
 src: "{{{{ item.src_rel }}}}.j2"
 dest: "{{{{ item.dest }}}}"
@@ -406,7 +406,7 @@ def _render_generic_files_tasks(
 | list }}}}
 notify: "{{{{ item.notify | default([]) }}}}"
-- name: Deploy any systemd unit files (raw files)
+- name: Deploy systemd unit files (copies)
 vars:
 _enroll_ff:
 files:
@@ -433,7 +433,7 @@ def _render_generic_files_tasks(
 | list
 | length) > 0
-- name: Deploy any other managed files (templates)
+- name: Deploy other managed files (templates)
 ansible.builtin.template:
 src: "{{{{ item.src_rel }}}}.j2"
 dest: "{{{{ item.dest }}}}"
@@ -447,7 +447,7 @@ def _render_generic_files_tasks(
 | list }}}}
 notify: "{{{{ item.notify | default([]) }}}}"
-- name: Deploy any other managed files (raw files)
+- name: Deploy other managed files (copies)
 vars:
 _enroll_ff:
 files:
@@ -628,7 +628,6 @@ def _manifest_from_bundle_dir(
 services: List[Dict[str, Any]] = state.get("services", [])
 package_roles: List[Dict[str, Any]] = state.get("package_roles", [])
 users_snapshot: Dict[str, Any] = state.get("users", {})
-apt_config_snapshot: Dict[str, Any] = state.get("apt_config", {})
 etc_custom_snapshot: Dict[str, Any] = state.get("etc_custom", {})
 usr_local_custom_snapshot: Dict[str, Any] = state.get("usr_local_custom", {})
 extra_paths_snapshot: Dict[str, Any] = state.get("extra_paths", {})
@@ -663,13 +662,17 @@ def _manifest_from_bundle_dir(
 _ensure_ansible_cfg(os.path.join(out_dir, "ansible.cfg"))
 manifested_users_roles: List[str] = []
-manifested_apt_config_roles: List[str] = []
 manifested_etc_custom_roles: List[str] = []
 manifested_usr_local_custom_roles: List[str] = []
 manifested_extra_paths_roles: List[str] = []
 manifested_service_roles: List[str] = []
 manifested_pkg_roles: List[str] = []
+# In site_mode, raw harvested files are stored under host-specific inventory
+# to avoid cross-host clobber while still sharing a role definition.
+# -------------------------
 # -------------------------
 # Users role (non-system users)
 # -------------------------
@@ -790,7 +793,7 @@ def _manifest_from_bundle_dir(
 # tasks (data-driven)
 users_tasks = """---
-# Generated by enroll
+# Generated by enroll (data-driven tasks)
 - name: Ensure groups exist
 ansible.builtin.group:
@@ -891,155 +894,6 @@ Generated non-system user accounts and SSH public material.
 manifested_users_roles.append(role)
 # -------------------------
-# apt_config role (APT sources, pinning, and keyrings)
-# -------------------------
-if apt_config_snapshot and apt_config_snapshot.get("managed_files"):
-role = apt_config_snapshot.get("role_name", "apt_config")
-role_dir = os.path.join(roles_root, role)
-_write_role_scaffold(role_dir)
-var_prefix = role
-managed_files = apt_config_snapshot.get("managed_files", [])
-excluded = apt_config_snapshot.get("excluded", [])
-notes = apt_config_snapshot.get("notes", [])
-templated, jt_vars = _jinjify_managed_files(
-bundle_dir,
-role,
-role_dir,
-managed_files,
-jt_exe=jt_exe,
-jt_enabled=jt_enabled,
-overwrite_templates=not site_mode,
-)
-# Copy only the non-templated artifacts (templates live in the role).
-if site_mode:
-_copy_artifacts(
-bundle_dir,
-role,
-_host_role_files_dir(out_dir, fqdn or "", role),
-exclude_rels=templated,
-)
-else:
-_copy_artifacts(
-bundle_dir,
-role,
-os.path.join(role_dir, "files"),
-exclude_rels=templated,
-)
-files_var = _build_managed_files_var(
-managed_files,
-templated,
-notify_other=None,
-notify_systemd=None,
-)
-jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
-vars_map: Dict[str, Any] = {f"{var_prefix}_managed_files": files_var}
-vars_map = _merge_mappings_overwrite(vars_map, jt_map)
-if site_mode:
-_write_role_defaults(role_dir, {f"{var_prefix}_managed_files": []})
-_write_hostvars(out_dir, fqdn or "", role, vars_map)
-else:
-_write_role_defaults(role_dir, vars_map)
-tasks = """---\n""" + _render_generic_files_tasks(
-var_prefix, include_restart_notify=False
-)
-with open(
-os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
-) as f:
-f.write(tasks.rstrip() + "\n")
-with open(
-os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
-) as f:
-f.write("---\ndependencies: []\n")
-# README: summarise repos and keyrings
-source_paths: List[str] = []
-keyring_paths: List[str] = []
-repo_hosts: Set[str] = set()
-url_re = re.compile(r"(?:https?|ftp)://([^/\s]+)", re.IGNORECASE)
-for mf in managed_files:
-p = str(mf.get("path") or "")
-src_rel = str(mf.get("src_rel") or "")
-if not p or not src_rel:
-continue
-if p == "/etc/apt/sources.list" or p.startswith("/etc/apt/sources.list.d/"):
-source_paths.append(p)
-art_path = os.path.join(bundle_dir, "artifacts", role, src_rel)
-try:
-with open(art_path, "r", encoding="utf-8", errors="replace") as sf:
-for line in sf:
-line = line.strip()
-if not line or line.startswith("#"):
-continue
-for m in url_re.finditer(line):
-repo_hosts.add(m.group(1))
-except OSError:
-pass # nosec
-if (
-p.startswith("/etc/apt/trusted.gpg")
-or p.startswith("/etc/apt/keyrings/")
-or p.startswith("/usr/share/keyrings/")
-):
-keyring_paths.append(p)
-source_paths = sorted(set(source_paths))
-keyring_paths = sorted(set(keyring_paths))
-repos = sorted(repo_hosts)
-readme = (
-"""# apt_config
-APT configuration harvested from the system (sources, pinning, and keyrings).
-## Repository hosts
-"""
-+ ("\n".join([f"- {h}" for h in repos]) or "- (none)")
-+ """\n
-## Source files
-"""
-+ ("\n".join([f"- {p}" for p in source_paths]) or "- (none)")
-+ """\n
-## Keyrings
-"""
-+ ("\n".join([f"- {p}" for p in keyring_paths]) or "- (none)")
-+ """\n
-## Managed files
-"""
-+ (
-"\n".join(
-[f"- {mf.get('path')} ({mf.get('reason')})" for mf in managed_files]
-)
-or "- (none)"
-)
-+ """\n
-## Excluded
-"""
-+ (
-"\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded])
-or "- (none)"
-)
-+ """\n
-## Notes
-"""
-+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
-+ """\n"""
-)
-with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
-f.write(readme)
-manifested_apt_config_roles.append(role)
 # -------------------------
 # etc_custom role (unowned /etc not already attributed)
@@ -1358,6 +1212,8 @@ User-requested extra file harvesting.
 manifested_usr_local_custom_roles.append(role)
+# -------------------------
 # -------------------------
 # Service roles
 # -------------------------
@@ -1459,7 +1315,7 @@ User-requested extra file harvesting.
 task_parts: List[str] = []
 task_parts.append(
 f"""---
-# Generated by enroll
+# Generated by enroll (data-driven tasks)
 - name: Install packages for {role}
 ansible.builtin.apt:
@@ -1618,7 +1474,7 @@ Generated from `{unit}`.
 task_parts: List[str] = []
 task_parts.append(
 f"""---
-# Generated by enroll
+# Generated by enroll (data-driven tasks)
 - name: Install packages for {role}
 ansible.builtin.apt:
@@ -1666,8 +1522,7 @@ Generated for package `{pkg}`.
 manifested_pkg_roles.append(role)
 all_roles = (
-manifested_apt_config_roles
-+ manifested_pkg_roles
+manifested_pkg_roles
 + manifested_service_roles
 + manifested_etc_custom_roles
 + manifested_usr_local_custom_roles

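For reference, the host playbook lines renamed in this file assemble to roughly the following; the two-space YAML indentation inside the strings and the list tail (per-role entries and the closing bracket handling) are assumptions, only the strings visible in the hunk are certain.

pb_lines = [
    "---",
    f"- name: Apply enroll roles on {fqdn}",  # renamed play title from this diff
    f"  hosts: {fqdn}",                       # two-space YAML indent assumed
    "  become: true",
    "  roles:",
]
# One "    - <role>" entry per manifested role presumably follows (assumed; not shown in the hunk).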
View file

@@ -174,7 +174,7 @@ def expand_includes(
 patterns: Sequence[CompiledPathPattern],
 *,
 exclude: Optional[PathFilter] = None,
-max_files: int,
+max_files: int = 4000,
 ) -> Tuple[List[str], List[str]]:
 """Expand include patterns into concrete file paths.