From 984b0fa81b5b224951816c4dc46a74734b950d07 Mon Sep 17 00:00:00 2001 From: Miguel Jacq Date: Mon, 29 Dec 2025 14:59:34 +1100 Subject: [PATCH] Add ability to enroll RH-style systems (DNF5/DNF/RPM) --- CHANGELOG.md | 1 + README.md | 18 +-- enroll/debian.py | 26 ---- enroll/fsutil.py | 40 ++++++ enroll/harvest.py | 272 ++++++++++++++++++++++++++--------------- enroll/ignore.py | 1 + enroll/manifest.py | 229 ++++++++++++++++++++++++++++++---- enroll/platform.py | 261 +++++++++++++++++++++++++++++++++++++++ enroll/rpm.py | 266 ++++++++++++++++++++++++++++++++++++++++ tests/test_debian.py | 56 --------- tests/test_fsutil.py | 25 ++++ tests/test_harvest.py | 142 +++++++++++++++------ tests/test_manifest.py | 93 ++++++++++++++ tests/test_platform.py | 93 ++++++++++++++ tests/test_rpm.py | 131 ++++++++++++++++++++ 15 files changed, 1400 insertions(+), 254 deletions(-) create mode 100644 enroll/fsutil.py create mode 100644 enroll/platform.py create mode 100644 enroll/rpm.py create mode 100644 tests/test_fsutil.py create mode 100644 tests/test_platform.py create mode 100644 tests/test_rpm.py diff --git a/CHANGELOG.md b/CHANGELOG.md index e07f57b..f92e0b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # 0.2.0 * Add version CLI arg + * Add ability to enroll RH-style systems (DNF5/DNF/RPM) # 0.1.7 diff --git a/README.md b/README.md index c6b8123..d075951 100644 --- a/README.md +++ b/README.md @@ -4,15 +4,15 @@ Enroll logo -**enroll** inspects a Linux machine (currently Debian-only) and generates Ansible roles/playbooks (and optionally inventory) for what it finds. +**enroll** inspects a Linux machine (Debian-like or RedHat-like) and generates Ansible roles/playbooks (and optionally inventory) for what it finds. - Detects packages that have been installed. -- Detects Debian package ownership of `/etc` files using dpkg’s local database. -- Captures config that has **changed from packaged defaults** (dpkg conffile hashes + package md5sums when available). 
+- Detects package ownership of `/etc` files where possible +- Captures config that has **changed from packaged defaults** where possible (e.g dpkg conffile hashes + package md5sums when available). - Also captures **service-relevant custom/unowned files** under `/etc//...` (e.g. drop-in config includes). - Defensively excludes likely secrets (path denylist + content sniff + size caps). - Captures non-system users and their SSH public keys. -- Captures miscellaneous `/etc` files it can’t attribute to a package and installs them in an `etc_custom` role. +- Captures miscellaneous `/etc` files it can't attribute to a package and installs them in an `etc_custom` role. - Ditto for /usr/local/bin (for non-binary files) and /usr/local/etc - Avoids trying to start systemd services that were detected as inactive during harvest. @@ -41,8 +41,8 @@ Use when enrolling **one server** (or generating a “golden” role set you int **Characteristics** - Roles are more self-contained. -- Raw config files live in the role’s `files/`. -- Template variables live in the role’s `defaults/main.yml`. +- Raw config files live in the role's `files/`. +- Template variables live in the role's `defaults/main.yml`. ### Multi-site mode (`--fqdn`) Use when enrolling **several existing servers** quickly, especially if they differ. @@ -68,13 +68,13 @@ Harvest state about a host and write a harvest bundle. 
- “Manual” packages - Changed-from-default config (plus related custom/unowned files under service dirs) - Non-system users + SSH public keys -- Misc `/etc` that can’t be attributed to a package (`etc_custom` role) +- Misc `/etc` that can't be attributed to a package (`etc_custom` role) - Optional user-specified extra files/dirs via `--include-path` (emitted as an `extra_paths` role at manifest time) **Common flags** - Remote harvesting: - `--remote-host`, `--remote-user`, `--remote-port` - - `--no-sudo` (if you don’t want/need sudo) + - `--no-sudo` (if you don't want/need sudo) - Sensitive-data behaviour: - default: tries to avoid likely secrets - `--dangerous`: disables secret-safety checks (see “Sensitive data” below) @@ -233,7 +233,7 @@ poetry run enroll --help ## Found a bug / have a suggestion? -My Forgejo doesn’t currently support federation, so I haven’t opened registration/login for issues. +My Forgejo doesn't currently support federation, so I haven't opened registration/login for issues. 
Instead, email me (see `pyproject.toml`) or contact me on the Fediverse: diff --git a/enroll/debian.py b/enroll/debian.py index 0ddc1f3..7e1ee2d 100644 --- a/enroll/debian.py +++ b/enroll/debian.py @@ -1,7 +1,6 @@ from __future__ import annotations import glob -import hashlib import os import subprocess # nosec from typing import Dict, List, Optional, Set, Tuple @@ -180,28 +179,3 @@ def read_pkg_md5sums(pkg: str) -> Dict[str, str]: md5, rel = line.split(None, 1) m[rel.strip()] = md5.strip() return m - - -def file_md5(path: str) -> str: - h = hashlib.md5() # nosec - with open(path, "rb") as f: - for chunk in iter(lambda: f.read(1024 * 1024), b""): - h.update(chunk) - return h.hexdigest() - - -def stat_triplet(path: str) -> Tuple[str, str, str]: - st = os.stat(path, follow_symlinks=True) - mode = oct(st.st_mode & 0o777)[2:].zfill(4) - - import pwd, grp - - try: - owner = pwd.getpwuid(st.st_uid).pw_name - except KeyError: - owner = str(st.st_uid) - try: - group = grp.getgrgid(st.st_gid).gr_name - except KeyError: - group = str(st.st_gid) - return owner, group, mode diff --git a/enroll/fsutil.py b/enroll/fsutil.py new file mode 100644 index 0000000..3d18df6 --- /dev/null +++ b/enroll/fsutil.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import hashlib +import os +from typing import Tuple + + +def file_md5(path: str) -> str: + """Return hex MD5 of a file. + + Used for Debian dpkg baseline comparisons. + """ + h = hashlib.md5() # nosec + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() + + +def stat_triplet(path: str) -> Tuple[str, str, str]: + """Return (owner, group, mode) for a path. + + owner/group are usernames/group names when resolvable, otherwise numeric ids. + mode is a zero-padded octal string (e.g. "0644"). 
+ """ + st = os.stat(path, follow_symlinks=True) + mode = oct(st.st_mode & 0o777)[2:].zfill(4) + + import grp + import pwd + + try: + owner = pwd.getpwuid(st.st_uid).pw_name + except KeyError: + owner = str(st.st_uid) + try: + group = grp.getgrgid(st.st_gid).gr_name + except KeyError: + group = str(st.st_gid) + return owner, group, mode diff --git a/enroll/harvest.py b/enroll/harvest.py index d678b89..bb706b1 100644 --- a/enroll/harvest.py +++ b/enroll/harvest.py @@ -15,18 +15,12 @@ from .systemd import ( get_timer_info, UnitQueryError, ) -from .debian import ( - build_dpkg_etc_index, - dpkg_owner, - file_md5, - list_manual_packages, - parse_status_conffiles, - read_pkg_md5sums, - stat_triplet, -) +from .fsutil import stat_triplet +from .platform import detect_platform, get_backend from .ignore import IgnorePolicy from .pathfilter import PathFilter, expand_includes from .accounts import collect_non_system_users +from .version import get_enroll_version @dataclass @@ -85,6 +79,14 @@ class AptConfigSnapshot: notes: List[str] +@dataclass +class DnfConfigSnapshot: + role_name: str + managed_files: List[ManagedFile] + excluded: List[ExcludedFile] + notes: List[str] + + @dataclass class EtcCustomSnapshot: role_name: str @@ -158,6 +160,13 @@ SHARED_ETC_TOPDIRS = { "sudoers.d", "sysctl.d", "systemd", + # RPM-family shared trees + "dnf", + "yum", + "yum.repos.d", + "sysconfig", + "pki", + "firewalld", } @@ -314,17 +323,23 @@ def _add_pkgs_from_etc_topdirs( pkgs.add(p) -def _maybe_add_specific_paths(hints: Set[str]) -> List[str]: - paths: List[str] = [] - for h in hints: - paths.extend( - [ - f"/etc/default/{h}", - f"/etc/init.d/{h}", - f"/etc/sysctl.d/{h}.conf", - ] - ) - return paths +def _maybe_add_specific_paths(hints: Set[str], backend) -> List[str]: + # Delegate to backend-specific conventions (e.g. /etc/default on Debian, + # /etc/sysconfig on Fedora/RHEL). Always include sysctl.d. 
+ try: + return backend.specific_paths_for_hints(hints) + except Exception: + # Best-effort fallback (Debian-ish). + paths: List[str] = [] + for h in hints: + paths.extend( + [ + f"/etc/default/{h}", + f"/etc/init.d/{h}", + f"/etc/sysctl.d/{h}.conf", + ] + ) + return paths def _scan_unowned_under_roots( @@ -408,6 +423,7 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [ ("/etc/anacron/*", "system_cron"), ("/var/spool/cron/crontabs/*", "system_cron"), ("/var/spool/crontabs/*", "system_cron"), + ("/var/spool/cron/*", "system_cron"), # network ("/etc/netplan/*", "system_network"), ("/etc/systemd/network/*", "system_network"), @@ -415,6 +431,9 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [ ("/etc/network/interfaces.d/*", "system_network"), ("/etc/resolvconf.conf", "system_network"), ("/etc/resolvconf/resolv.conf.d/*", "system_network"), + ("/etc/NetworkManager/system-connections/*", "system_network"), + ("/etc/sysconfig/network*", "system_network"), + ("/etc/sysconfig/network-scripts/*", "system_network"), # firewall ("/etc/nftables.conf", "system_firewall"), ("/etc/nftables.d/*", "system_firewall"), @@ -422,6 +441,10 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [ ("/etc/iptables/rules.v6", "system_firewall"), ("/etc/ufw/*", "system_firewall"), ("/etc/default/ufw", "system_firewall"), + ("/etc/firewalld/*", "system_firewall"), + ("/etc/firewalld/zones/*", "system_firewall"), + # SELinux + ("/etc/selinux/config", "system_security"), # other ("/etc/rc.local", "system_rc"), ] @@ -553,6 +576,51 @@ def _iter_apt_capture_paths() -> List[tuple[str, str]]: return uniq +def _iter_dnf_capture_paths() -> List[tuple[str, str]]: + """Return (path, reason) pairs for DNF/YUM configuration on RPM systems. 
+ + Captures: + - /etc/dnf/* (dnf.conf, vars, plugins, modules, automatic) + - /etc/yum.conf (legacy) + - /etc/yum.repos.d/*.repo + - /etc/pki/rpm-gpg/* (GPG key files) + """ + reasons: Dict[str, str] = {} + + for root, tag in ( + ("/etc/dnf", "dnf_config"), + ("/etc/yum", "yum_config"), + ): + if os.path.isdir(root): + for dirpath, _, filenames in os.walk(root): + for fn in filenames: + p = os.path.join(dirpath, fn) + if os.path.islink(p) or not os.path.isfile(p): + continue + reasons.setdefault(p, tag) + + # Legacy yum.conf. + if os.path.isfile("/etc/yum.conf") and not os.path.islink("/etc/yum.conf"): + reasons.setdefault("/etc/yum.conf", "yum_conf") + + # Repositories. + if os.path.isdir("/etc/yum.repos.d"): + for p in _iter_matching_files("/etc/yum.repos.d/*.repo"): + reasons[p] = "yum_repo" + + # RPM GPG keys. + if os.path.isdir("/etc/pki/rpm-gpg"): + for dirpath, _, filenames in os.walk("/etc/pki/rpm-gpg"): + for fn in filenames: + p = os.path.join(dirpath, fn) + if os.path.islink(p) or not os.path.isfile(p): + continue + reasons.setdefault(p, "rpm_gpg_key") + + # Stable ordering. 
+ return [(p, reasons[p]) for p in sorted(reasons.keys())] + + def _iter_system_capture_paths() -> List[tuple[str, str]]: """Return (path, reason) pairs for essential system config/state (non-APT).""" out: List[tuple[str, str]] = [] @@ -600,8 +668,12 @@ def harvest( flush=True, ) - owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths = build_dpkg_etc_index() - conffiles_by_pkg = parse_status_conffiles() + platform = detect_platform() + backend = get_backend(platform) + + owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths = ( + backend.build_etc_index() + ) # ------------------------- # Service roles @@ -645,12 +717,12 @@ def harvest( candidates: Dict[str, str] = {} if ui.fragment_path: - p = dpkg_owner(ui.fragment_path) + p = backend.owner_of_path(ui.fragment_path) if p: pkgs.add(p) for exe in ui.exec_paths: - p = dpkg_owner(exe) + p = backend.owner_of_path(exe) if p: pkgs.add(p) @@ -675,7 +747,7 @@ def harvest( # logrotate.d entries) can still be attributed back to this service. service_role_aliases[role] = set(hints) | set(pkgs) | {role} - for sp in _maybe_add_specific_paths(hints): + for sp in _maybe_add_specific_paths(hints, backend): if not os.path.exists(sp): continue if sp in etc_owner_map: @@ -684,31 +756,13 @@ def harvest( candidates.setdefault(sp, "custom_specific_path") for pkg in sorted(pkgs): - conff = conffiles_by_pkg.get(pkg, {}) - md5sums = read_pkg_md5sums(pkg) - for path in pkg_to_etc_paths.get(pkg, []): + etc_paths = pkg_to_etc_paths.get(pkg, []) + for path, reason in backend.modified_paths(pkg, etc_paths).items(): if not os.path.isfile(path) or os.path.islink(path): continue - if path.startswith("/etc/apt/"): + if backend.is_pkg_config_path(path): continue - if path in conff: - # Only capture conffiles when they differ from the package default. 
- try: - current = file_md5(path) - except OSError: - continue - if current != conff[path]: - candidates.setdefault(path, "modified_conffile") - continue - rel = path.lstrip("/") - baseline = md5sums.get(rel) - if baseline: - try: - current = file_md5(path) - except OSError: - continue - if current != baseline: - candidates.setdefault(path, "modified_packaged_file") + candidates.setdefault(path, reason) # Capture custom/unowned files living under /etc/ for this service. # @@ -847,18 +901,18 @@ def harvest( # (useful when a timer triggers a service that isn't enabled). pkgs: Set[str] = set() if ti.fragment_path: - p = dpkg_owner(ti.fragment_path) + p = backend.owner_of_path(ti.fragment_path) if p: pkgs.add(p) if ti.trigger_unit and ti.trigger_unit.endswith(".service"): try: ui = get_unit_info(ti.trigger_unit) if ui.fragment_path: - p = dpkg_owner(ui.fragment_path) + p = backend.owner_of_path(ui.fragment_path) if p: pkgs.add(p) for exe in ui.exec_paths: - p = dpkg_owner(exe) + p = backend.owner_of_path(exe) if p: pkgs.add(p) except Exception: # nosec @@ -870,7 +924,7 @@ def harvest( # ------------------------- # Manually installed package roles # ------------------------- - manual_pkgs = list_manual_packages() + manual_pkgs = backend.list_manual_packages() # Avoid duplicate roles: if a manual package is already managed by any service role, skip its pkg_ role. 
covered_by_services: Set[str] = set() for s in service_snaps: @@ -893,41 +947,26 @@ def harvest( for tpath in timer_extra_by_pkg.get(pkg, []): candidates.setdefault(tpath, "related_timer") - conff = conffiles_by_pkg.get(pkg, {}) - md5sums = read_pkg_md5sums(pkg) - - for path in pkg_to_etc_paths.get(pkg, []): + etc_paths = pkg_to_etc_paths.get(pkg, []) + for path, reason in backend.modified_paths(pkg, etc_paths).items(): if not os.path.isfile(path) or os.path.islink(path): continue - if path.startswith("/etc/apt/"): + if backend.is_pkg_config_path(path): continue - if path in conff: - try: - current = file_md5(path) - except OSError: - continue - if current != conff[path]: - candidates.setdefault(path, "modified_conffile") - continue - rel = path.lstrip("/") - baseline = md5sums.get(rel) - if baseline: - try: - current = file_md5(path) - except OSError: - continue - if current != baseline: - candidates.setdefault(path, "modified_packaged_file") + candidates.setdefault(path, reason) topdirs = _topdirs_for_package(pkg, pkg_to_etc_paths) roots: List[str] = [] + # Collect candidate directories plus backend-specific common files. for td in sorted(topdirs): if td in SHARED_ETC_TOPDIRS: continue + if backend.is_pkg_config_path(f"/etc/{td}/") or backend.is_pkg_config_path( + f"/etc/{td}" + ): + continue roots.extend([f"/etc/{td}", f"/etc/{td}.d"]) - roots.extend([f"/etc/default/{td}"]) - roots.extend([f"/etc/init.d/{td}"]) - roots.extend([f"/etc/sysctl.d/{td}.conf"]) + roots.extend(_maybe_add_specific_paths(set(topdirs), backend)) # Capture any custom/unowned files under /etc/ for this # manually-installed package. 
This may include runtime-generated @@ -1031,26 +1070,48 @@ def harvest( ) # ------------------------- - # apt_config role (APT configuration and keyrings) + # Package manager config role + # - Debian: apt_config + # - Fedora/RHEL-like: dnf_config # ------------------------- apt_notes: List[str] = [] apt_excluded: List[ExcludedFile] = [] apt_managed: List[ManagedFile] = [] - apt_role_name = "apt_config" - apt_role_seen = seen_by_role.setdefault(apt_role_name, set()) + dnf_notes: List[str] = [] + dnf_excluded: List[ExcludedFile] = [] + dnf_managed: List[ManagedFile] = [] - for path, reason in _iter_apt_capture_paths(): - _capture_file( - bundle_dir=bundle_dir, - role_name=apt_role_name, - abs_path=path, - reason=reason, - policy=policy, - path_filter=path_filter, - managed_out=apt_managed, - excluded_out=apt_excluded, - seen_role=apt_role_seen, - ) + apt_role_name = "apt_config" + dnf_role_name = "dnf_config" + + if backend.name == "dpkg": + apt_role_seen = seen_by_role.setdefault(apt_role_name, set()) + for path, reason in _iter_apt_capture_paths(): + _capture_file( + bundle_dir=bundle_dir, + role_name=apt_role_name, + abs_path=path, + reason=reason, + policy=policy, + path_filter=path_filter, + managed_out=apt_managed, + excluded_out=apt_excluded, + seen_role=apt_role_seen, + ) + elif backend.name == "rpm": + dnf_role_seen = seen_by_role.setdefault(dnf_role_name, set()) + for path, reason in _iter_dnf_capture_paths(): + _capture_file( + bundle_dir=bundle_dir, + role_name=dnf_role_name, + abs_path=path, + reason=reason, + policy=policy, + path_filter=path_filter, + managed_out=dnf_managed, + excluded_out=dnf_excluded, + seen_role=dnf_role_seen, + ) apt_config_snapshot = AptConfigSnapshot( role_name=apt_role_name, @@ -1058,6 +1119,12 @@ def harvest( excluded=apt_excluded, notes=apt_notes, ) + dnf_config_snapshot = DnfConfigSnapshot( + role_name=dnf_role_name, + managed_files=dnf_managed, + excluded=dnf_excluded, + notes=dnf_notes, + ) # ------------------------- # 
etc_custom role (unowned /etc files not already attributed elsewhere) @@ -1079,6 +1146,8 @@ def harvest( already.add(mf.path) for mf in apt_managed: already.add(mf.path) + for mf in dnf_managed: + already.add(mf.path) # Maps for re-attributing shared snippets (cron.d/logrotate.d) to existing roles. svc_by_role: Dict[str, ServiceSnapshot] = {s.role_name: s for s in service_snaps} @@ -1093,7 +1162,7 @@ def harvest( for pkg in s.packages: pkg_to_service_roles.setdefault(pkg, []).append(s.role_name) - # Alias -> role mapping used as a fallback when dpkg ownership is missing. + # Alias -> role mapping used as a fallback when package ownership is missing. # Prefer service roles over package roles when both would match. alias_ranked: Dict[str, tuple[int, str]] = {} @@ -1124,8 +1193,8 @@ def harvest( per service. Resolution order: - 1) dpkg owner -> service role (if any service references the package) - 2) dpkg owner -> package role (manual package role exists) + 1) package owner -> service role (if any service references the package) + 2) package owner -> package role (manual package role exists) 3) basename/stem alias match -> preferred role """ if path.startswith("/etc/logrotate.d/"): @@ -1147,7 +1216,7 @@ def harvest( seen.add(c) uniq.append(c) - pkg = dpkg_owner(path) + pkg = backend.owner_of_path(path) if pkg: svc_roles = sorted(set(pkg_to_service_roles.get(pkg, []))) if svc_roles: @@ -1226,7 +1295,7 @@ def harvest( for dirpath, _, filenames in os.walk("/etc"): for fn in filenames: path = os.path.join(dirpath, fn) - if path.startswith("/etc/apt/"): + if backend.is_pkg_config_path(path): continue if path in already: continue @@ -1413,13 +1482,22 @@ def harvest( ) state = { - "host": {"hostname": os.uname().nodename, "os": "debian"}, + "enroll": { + "version": get_enroll_version(), + }, + "host": { + "hostname": os.uname().nodename, + "os": platform.os_family, + "pkg_backend": backend.name, + "os_release": platform.os_release, + }, "users": asdict(users_snapshot), 
"services": [asdict(s) for s in service_snaps], "manual_packages": manual_pkgs, "manual_packages_skipped": manual_pkgs_skipped, "package_roles": [asdict(p) for p in pkg_snaps], "apt_config": asdict(apt_config_snapshot), + "dnf_config": asdict(dnf_config_snapshot), "etc_custom": asdict(etc_custom_snapshot), "usr_local_custom": asdict(usr_local_custom_snapshot), "extra_paths": asdict(extra_paths_snapshot), diff --git a/enroll/ignore.py b/enroll/ignore.py index ab2cb96..904997f 100644 --- a/enroll/ignore.py +++ b/enroll/ignore.py @@ -43,6 +43,7 @@ DEFAULT_ALLOW_BINARY_GLOBS = [ "/usr/share/keyrings/*.gpg", "/usr/share/keyrings/*.pgp", "/usr/share/keyrings/*.asc", + "/etc/pki/rpm-gpg/*", ] SENSITIVE_CONTENT_PATTERNS = [ diff --git a/enroll/manifest.py b/enroll/manifest.py index dbc2353..923040f 100644 --- a/enroll/manifest.py +++ b/enroll/manifest.py @@ -166,6 +166,7 @@ def _write_playbook_all(path: str, roles: List[str]) -> None: pb_lines = [ "---", "- name: Apply all roles on all hosts", + " gather_facts: true", " hosts: all", " become: true", " roles:", @@ -181,6 +182,7 @@ def _write_playbook_host(path: str, fqdn: str, roles: List[str]) -> None: "---", f"- name: Apply all roles on {fqdn}", f" hosts: {fqdn}", + " gather_facts: true", " become: true", " roles:", ] @@ -468,6 +470,51 @@ def _render_generic_files_tasks( """ +def _render_install_packages_tasks(role: str, var_prefix: str) -> str: + """Render cross-distro package installation tasks. + + We generate conditional tasks for apt/dnf/yum, falling back to the + generic `package` module. This keeps generated roles usable on both + Debian-like and RPM-like systems. 
+ """ + return f"""# Generated by enroll + +- name: Install packages for {role} (APT) + ansible.builtin.apt: + name: "{{{{ {var_prefix}_packages | default([]) }}}}" + state: present + update_cache: true + when: + - ({var_prefix}_packages | default([])) | length > 0 + - ansible_facts.pkg_mgr | default('') == 'apt' + +- name: Install packages for {role} (DNF5) + ansible.builtin.dnf5: + name: "{{{{ {var_prefix}_packages | default([]) }}}}" + state: present + when: + - ({var_prefix}_packages | default([])) | length > 0 + - ansible_facts.pkg_mgr | default('') == 'dnf5' + +- name: Install packages for {role} (DNF/YUM) + ansible.builtin.dnf: + name: "{{{{ {var_prefix}_packages | default([]) }}}}" + state: present + when: + - ({var_prefix}_packages | default([])) | length > 0 + - ansible_facts.pkg_mgr | default('') in ['dnf', 'yum'] + +- name: Install packages for {role} (generic fallback) + ansible.builtin.package: + name: "{{{{ {var_prefix}_packages | default([]) }}}}" + state: present + when: + - ({var_prefix}_packages | default([])) | length > 0 + - ansible_facts.pkg_mgr | default('') not in ['apt', 'dnf', 'dnf5', 'yum'] + +""" + + def _prepare_bundle_dir( bundle: str, *, @@ -629,6 +676,7 @@ def _manifest_from_bundle_dir( package_roles: List[Dict[str, Any]] = state.get("package_roles", []) users_snapshot: Dict[str, Any] = state.get("users", {}) apt_config_snapshot: Dict[str, Any] = state.get("apt_config", {}) + dnf_config_snapshot: Dict[str, Any] = state.get("dnf_config", {}) etc_custom_snapshot: Dict[str, Any] = state.get("etc_custom", {}) usr_local_custom_snapshot: Dict[str, Any] = state.get("usr_local_custom", {}) extra_paths_snapshot: Dict[str, Any] = state.get("extra_paths", {}) @@ -664,6 +712,7 @@ def _manifest_from_bundle_dir( manifested_users_roles: List[str] = [] manifested_apt_config_roles: List[str] = [] + manifested_dnf_config_roles: List[str] = [] manifested_etc_custom_roles: List[str] = [] manifested_usr_local_custom_roles: List[str] = [] 
manifested_extra_paths_roles: List[str] = [] @@ -1041,6 +1090,157 @@ APT configuration harvested from the system (sources, pinning, and keyrings). manifested_apt_config_roles.append(role) + # ------------------------- + # dnf_config role (DNF/YUM repos, config, and RPM GPG keys) + # ------------------------- + if dnf_config_snapshot and dnf_config_snapshot.get("managed_files"): + role = dnf_config_snapshot.get("role_name", "dnf_config") + role_dir = os.path.join(roles_root, role) + _write_role_scaffold(role_dir) + + var_prefix = role + + managed_files = dnf_config_snapshot.get("managed_files", []) + excluded = dnf_config_snapshot.get("excluded", []) + notes = dnf_config_snapshot.get("notes", []) + + templated, jt_vars = _jinjify_managed_files( + bundle_dir, + role, + role_dir, + managed_files, + jt_exe=jt_exe, + jt_enabled=jt_enabled, + overwrite_templates=not site_mode, + ) + + if site_mode: + _copy_artifacts( + bundle_dir, + role, + _host_role_files_dir(out_dir, fqdn or "", role), + exclude_rels=templated, + ) + else: + _copy_artifacts( + bundle_dir, + role, + os.path.join(role_dir, "files"), + exclude_rels=templated, + ) + + files_var = _build_managed_files_var( + managed_files, + templated, + notify_other=None, + notify_systemd=None, + ) + + jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {} + vars_map: Dict[str, Any] = {f"{var_prefix}_managed_files": files_var} + vars_map = _merge_mappings_overwrite(vars_map, jt_map) + + if site_mode: + _write_role_defaults(role_dir, {f"{var_prefix}_managed_files": []}) + _write_hostvars(out_dir, fqdn or "", role, vars_map) + else: + _write_role_defaults(role_dir, vars_map) + + tasks = "---\n" + _render_generic_files_tasks( + var_prefix, include_restart_notify=False + ) + with open( + os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write(tasks.rstrip() + "\n") + + with open( + os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write("---\ndependencies: 
[]\n") + + # README: summarise repos and GPG key material + repo_paths: List[str] = [] + key_paths: List[str] = [] + repo_hosts: Set[str] = set() + + url_re = re.compile(r"(?:https?|ftp)://([^/\s]+)", re.IGNORECASE) + file_url_re = re.compile(r"file://(/[^\s]+)") + + for mf in managed_files: + p = str(mf.get("path") or "") + src_rel = str(mf.get("src_rel") or "") + if not p or not src_rel: + continue + + if p.startswith("/etc/yum.repos.d/") and p.endswith(".repo"): + repo_paths.append(p) + art_path = os.path.join(bundle_dir, "artifacts", role, src_rel) + try: + with open(art_path, "r", encoding="utf-8", errors="replace") as rf: + for line in rf: + s = line.strip() + if not s or s.startswith("#") or s.startswith(";"): + continue + # Collect hostnames from URLs (baseurl, mirrorlist, metalink, gpgkey...) + for m in url_re.finditer(s): + repo_hosts.add(m.group(1)) + # Collect local gpgkey file paths referenced as file:///... + for m in file_url_re.finditer(s): + key_paths.append(m.group(1)) + except OSError: + pass # nosec + + if p.startswith("/etc/pki/rpm-gpg/"): + key_paths.append(p) + + repo_paths = sorted(set(repo_paths)) + key_paths = sorted(set(key_paths)) + repos = sorted(repo_hosts) + + readme = ( + """# dnf_config + +DNF/YUM configuration harvested from the system (repos, config files, and RPM GPG keys). 
+ +## Repository hosts +""" + + ("\n".join([f"- {h}" for h in repos]) or "- (none)") + + """\n +## Repo files +""" + + ("\n".join([f"- {p}" for p in repo_paths]) or "- (none)") + + """\n +## GPG keys +""" + + ("\n".join([f"- {p}" for p in key_paths]) or "- (none)") + + """\n +## Managed files +""" + + ( + "\n".join( + [f"- {mf.get('path')} ({mf.get('reason')})" for mf in managed_files] + ) + or "- (none)" + ) + + """\n +## Excluded +""" + + ( + "\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded]) + or "- (none)" + ) + + """\n +## Notes +""" + + ("\n".join([f"- {n}" for n in notes]) or "- (none)") + + """\n""" + ) + with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: + f.write(readme) + + manifested_dnf_config_roles.append(role) + # ------------------------- # etc_custom role (unowned /etc not already attributed) # ------------------------- @@ -1457,19 +1657,7 @@ User-requested extra file harvesting. f.write(handlers) task_parts: List[str] = [] - task_parts.append( - f"""--- -# Generated by enroll - -- name: Install packages for {role} - ansible.builtin.apt: - name: "{{{{ {var_prefix}_packages | default([]) }}}}" - state: present - update_cache: true - when: ({var_prefix}_packages | default([])) | length > 0 - -""" - ) + task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix)) task_parts.append( _render_generic_files_tasks(var_prefix, include_restart_notify=True) @@ -1616,19 +1804,7 @@ Generated from `{unit}`. 
f.write(handlers) task_parts: List[str] = [] - task_parts.append( - f"""--- -# Generated by enroll - -- name: Install packages for {role} - ansible.builtin.apt: - name: "{{{{ {var_prefix}_packages | default([]) }}}}" - state: present - update_cache: true - when: ({var_prefix}_packages | default([])) | length > 0 - -""" - ) + task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix)) task_parts.append( _render_generic_files_tasks(var_prefix, include_restart_notify=False) ) @@ -1667,6 +1843,7 @@ Generated for package `{pkg}`. manifested_pkg_roles.append(role) all_roles = ( manifested_apt_config_roles + + manifested_dnf_config_roles + manifested_pkg_roles + manifested_service_roles + manifested_etc_custom_roles diff --git a/enroll/platform.py b/enroll/platform.py new file mode 100644 index 0000000..998b83d --- /dev/null +++ b/enroll/platform.py @@ -0,0 +1,261 @@ +from __future__ import annotations + +import shutil +from dataclasses import dataclass +from typing import Dict, List, Optional, Set, Tuple + +from .fsutil import file_md5 + + +def _read_os_release(path: str = "/etc/os-release") -> Dict[str, str]: + out: Dict[str, str] = {} + try: + with open(path, "r", encoding="utf-8", errors="replace") as f: + for raw in f: + line = raw.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, v = line.split("=", 1) + k = k.strip() + v = v.strip().strip('"') + out[k] = v + except OSError: + return {} + return out + + +@dataclass +class PlatformInfo: + os_family: str # debian|redhat|unknown + pkg_backend: str # dpkg|rpm|unknown + os_release: Dict[str, str] + + +def detect_platform() -> PlatformInfo: + """Detect platform family and package backend. + + Uses /etc/os-release when available, with a conservative fallback to + checking for dpkg/rpm binaries. 
+ """ + + osr = _read_os_release() + os_id = (osr.get("ID") or "").strip().lower() + likes = (osr.get("ID_LIKE") or "").strip().lower().split() + + deb_ids = {"debian", "ubuntu", "linuxmint", "raspbian", "kali"} + rhel_ids = { + "fedora", + "rhel", + "centos", + "rocky", + "almalinux", + "ol", + "oracle", + "scientific", + } + + if os_id in deb_ids or "debian" in likes: + return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr) + if os_id in rhel_ids or any( + x in likes for x in ("rhel", "fedora", "centos", "redhat") + ): + return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr) + + # Fallback heuristics. + if shutil.which("dpkg"): + return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr) + if shutil.which("rpm"): + return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr) + return PlatformInfo(os_family="unknown", pkg_backend="unknown", os_release=osr) + + +class PackageBackend: + """Backend abstraction for package ownership, config detection, and manual package lists.""" + + name: str + pkg_config_prefixes: Tuple[str, ...] 
+ + def owner_of_path(self, path: str) -> Optional[str]: # pragma: no cover + raise NotImplementedError + + def list_manual_packages(self) -> List[str]: # pragma: no cover + raise NotImplementedError + + def build_etc_index( + self, + ) -> Tuple[ + Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]] + ]: # pragma: no cover + raise NotImplementedError + + def specific_paths_for_hints(self, hints: Set[str]) -> List[str]: + return [] + + def is_pkg_config_path(self, path: str) -> bool: + for pfx in self.pkg_config_prefixes: + if path == pfx or path.startswith(pfx): + return True + return False + + def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]: + """Return a mapping of modified file paths -> reason label.""" + return {} + + +class DpkgBackend(PackageBackend): + name = "dpkg" + pkg_config_prefixes = ("/etc/apt/",) + + def __init__(self) -> None: + from .debian import parse_status_conffiles + + self._conffiles_by_pkg = parse_status_conffiles() + + def owner_of_path(self, path: str) -> Optional[str]: + from .debian import dpkg_owner + + return dpkg_owner(path) + + def list_manual_packages(self) -> List[str]: + from .debian import list_manual_packages + + return list_manual_packages() + + def build_etc_index(self): + from .debian import build_dpkg_etc_index + + return build_dpkg_etc_index() + + def specific_paths_for_hints(self, hints: Set[str]) -> List[str]: + paths: List[str] = [] + for h in hints: + paths.extend( + [ + f"/etc/default/{h}", + f"/etc/init.d/{h}", + f"/etc/sysctl.d/{h}.conf", + ] + ) + return paths + + def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]: + from .debian import read_pkg_md5sums + + out: Dict[str, str] = {} + conff = self._conffiles_by_pkg.get(pkg, {}) + md5sums = read_pkg_md5sums(pkg) + + for path in etc_paths: + if not path.startswith("/etc/"): + continue + if self.is_pkg_config_path(path): + continue + if path in conff: + try: + current = file_md5(path) + except OSError: + 
continue + if current != conff[path]: + out[path] = "modified_conffile" + continue + + rel = path.lstrip("/") + baseline = md5sums.get(rel) + if baseline: + try: + current = file_md5(path) + except OSError: + continue + if current != baseline: + out[path] = "modified_packaged_file" + return out + + +class RpmBackend(PackageBackend): + name = "rpm" + pkg_config_prefixes = ( + "/etc/dnf/", + "/etc/yum/", + "/etc/yum.repos.d/", + "/etc/yum.conf", + ) + + def __init__(self) -> None: + self._modified_cache: Dict[str, Set[str]] = {} + self._config_cache: Dict[str, Set[str]] = {} + + def owner_of_path(self, path: str) -> Optional[str]: + from .rpm import rpm_owner + + return rpm_owner(path) + + def list_manual_packages(self) -> List[str]: + from .rpm import list_manual_packages + + return list_manual_packages() + + def build_etc_index(self): + from .rpm import build_rpm_etc_index + + return build_rpm_etc_index() + + def specific_paths_for_hints(self, hints: Set[str]) -> List[str]: + paths: List[str] = [] + for h in hints: + paths.extend( + [ + f"/etc/sysconfig/{h}", + f"/etc/sysconfig/{h}.conf", + f"/etc/sysctl.d/{h}.conf", + ] + ) + return paths + + def _config_files(self, pkg: str) -> Set[str]: + if pkg in self._config_cache: + return self._config_cache[pkg] + from .rpm import rpm_config_files + + s = rpm_config_files(pkg) + self._config_cache[pkg] = s + return s + + def _modified_files(self, pkg: str) -> Set[str]: + if pkg in self._modified_cache: + return self._modified_cache[pkg] + from .rpm import rpm_modified_files + + s = rpm_modified_files(pkg) + self._modified_cache[pkg] = s + return s + + def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]: + out: Dict[str, str] = {} + modified = self._modified_files(pkg) + if not modified: + return out + config = self._config_files(pkg) + + for path in etc_paths: + if not path.startswith("/etc/"): + continue + if self.is_pkg_config_path(path): + continue + if path not in modified: + continue + out[path] 
= ( + "modified_conffile" if path in config else "modified_packaged_file" + ) + return out + + +def get_backend(info: Optional[PlatformInfo] = None) -> PackageBackend: + info = info or detect_platform() + if info.pkg_backend == "dpkg": + return DpkgBackend() + if info.pkg_backend == "rpm": + return RpmBackend() + # Unknown: be conservative and use an rpm backend if rpm exists, otherwise dpkg. + if shutil.which("rpm"): + return RpmBackend() + return DpkgBackend() diff --git a/enroll/rpm.py b/enroll/rpm.py new file mode 100644 index 0000000..947617c --- /dev/null +++ b/enroll/rpm.py @@ -0,0 +1,266 @@ +from __future__ import annotations + +import os +import re +import shutil +import subprocess # nosec +from typing import Dict, List, Optional, Set, Tuple + + +def _run( + cmd: list[str], *, allow_fail: bool = False, merge_err: bool = False +) -> tuple[int, str]: + """Run a command and return (rc, stdout). + + If merge_err is True, stderr is merged into stdout to preserve ordering. + """ + p = subprocess.run( + cmd, + check=False, + text=True, + stdout=subprocess.PIPE, + stderr=(subprocess.STDOUT if merge_err else subprocess.PIPE), + ) # nosec + out = p.stdout or "" + if (not allow_fail) and p.returncode != 0: + err = "" if merge_err else (p.stderr or "") + raise RuntimeError(f"Command failed: {cmd}\n{err}{out}") + return p.returncode, out + + +def rpm_owner(path: str) -> Optional[str]: + """Return owning package name for a path, or None if unowned.""" + if not path: + return None + rc, out = _run( + ["rpm", "-qf", "--qf", "%{NAME}\n", path], allow_fail=True, merge_err=True + ) + if rc != 0: + return None + for line in out.splitlines(): + line = line.strip() + if not line: + continue + if "is not owned" in line: + return None + # With --qf we expect just the package name. + if re.match(r"^[A-Za-z0-9_.+:-]+$", line): + # Strip any accidental epoch/name-version-release output. 
+ return line.split(":", 1)[-1].strip() if line else None + return None + + +_ARCH_SUFFIXES = { + "noarch", + "x86_64", + "i686", + "aarch64", + "armv7hl", + "ppc64le", + "s390x", + "riscv64", +} + + +def _strip_arch(token: str) -> str: + """Strip a trailing .ARCH from a yum/dnf package token.""" + t = token.strip() + if "." not in t: + return t + head, tail = t.rsplit(".", 1) + if tail in _ARCH_SUFFIXES: + return head + return t + + +def list_manual_packages() -> List[str]: + """Return packages considered "user-installed" on RPM-based systems. + + Best-effort: + 1) dnf repoquery --userinstalled + 2) dnf history userinstalled + 3) yum history userinstalled + + If none are available, returns an empty list. + """ + + def _dedupe(pkgs: List[str]) -> List[str]: + return sorted({p for p in (pkgs or []) if p}) + + if shutil.which("dnf"): + # Prefer a machine-friendly output. + for cmd in ( + ["dnf", "-q", "repoquery", "--userinstalled", "--qf", "%{name}\n"], + ["dnf", "-q", "repoquery", "--userinstalled"], + ): + rc, out = _run(cmd, allow_fail=True, merge_err=True) + if rc == 0 and out.strip(): + pkgs = [] + for line in out.splitlines(): + line = line.strip() + if not line or line.startswith("Loaded plugins"): + continue + pkgs.append(_strip_arch(line.split()[0])) + if pkgs: + return _dedupe(pkgs) + + # Fallback: human-oriented output. 
+        rc, out = _run(
+            ["dnf", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True
+        )
+        if rc == 0 and out.strip():
+            pkgs = []
+            for line in out.splitlines():
+                line = line.strip()
+                if not line or line.startswith("Installed") or line.startswith("Last"):
+                    continue
+                # Often: "vim-enhanced.x86_64"
+                tok = line.split()[0]
+                pkgs.append(_strip_arch(tok))
+            if pkgs:
+                return _dedupe(pkgs)
+
+    if shutil.which("yum"):
+        rc, out = _run(
+            ["yum", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True
+        )
+        if rc == 0 and out.strip():
+            pkgs = []
+            for line in out.splitlines():
+                line = line.strip()
+                if (
+                    not line
+                    or line.startswith("Installed")
+                    or line.startswith("Loaded")
+                ):
+                    continue
+                tok = line.split()[0]
+                pkgs.append(_strip_arch(tok))
+            if pkgs:
+                return _dedupe(pkgs)
+
+    return []
+
+
+def _walk_etc_files() -> List[str]:
+    out: List[str] = []
+    for dirpath, _, filenames in os.walk("/etc"):
+        for fn in filenames:
+            p = os.path.join(dirpath, fn)
+            if os.path.islink(p) or not os.path.isfile(p):
+                continue
+            out.append(p)
+    return out
+
+
+def build_rpm_etc_index() -> (
+    Tuple[Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]]
+):
+    """Best-effort equivalent of build_dpkg_etc_index for RPM systems.
+
+    This builds indexes by walking the live /etc tree and querying RPM ownership
+    for each file.
+
+    Returns:
+        owned_etc_paths: set of /etc paths owned by rpm
+        etc_owner_map: /etc/path -> pkg
+        topdir_to_pkgs: "nginx" -> {"nginx", ...} based on /etc/<topdir>/...
+        pkg_to_etc_paths: pkg -> list of owned /etc paths
+    """
+
+    owned: Set[str] = set()
+    owner: Dict[str, str] = {}
+    topdir_to_pkgs: Dict[str, Set[str]] = {}
+    pkg_to_etc: Dict[str, List[str]] = {}
+
+    paths = _walk_etc_files()
+
+    # Query in chunks to avoid excessive process spawns.
+    chunk_size = 250
+
+    not_owned_re = re.compile(
+        r"^file\s+(?P<path>.+?)\s+is\s+not\s+owned\s+by\s+any\s+package", re.IGNORECASE
+    )
+
+    for i in range(0, len(paths), chunk_size):
+        chunk = paths[i : i + chunk_size]
+        rc, out = _run(
+            ["rpm", "-qf", "--qf", "%{NAME}\n", *chunk],
+            allow_fail=True,
+            merge_err=True,
+        )
+
+        lines = [ln.strip() for ln in out.splitlines() if ln.strip()]
+        # Heuristic: rpm prints one output line per input path. If that isn't
+        # true (warnings/errors), fall back to per-file queries for this chunk.
+        if len(lines) != len(chunk):
+            for p in chunk:
+                pkg = rpm_owner(p)
+                if not pkg:
+                    continue
+                owned.add(p)
+                owner.setdefault(p, pkg)
+                pkg_to_etc.setdefault(pkg, []).append(p)
+                parts = p.split("/", 3)
+                if len(parts) >= 3 and parts[2]:
+                    topdir_to_pkgs.setdefault(parts[2], set()).add(pkg)
+            continue
+
+        for pth, line in zip(chunk, lines):
+            if not line:
+                continue
+            if not_owned_re.match(line) or "is not owned" in line:
+                continue
+            pkg = line.split()[0].strip()
+            if not pkg:
+                continue
+            owned.add(pth)
+            owner.setdefault(pth, pkg)
+            pkg_to_etc.setdefault(pkg, []).append(pth)
+            parts = pth.split("/", 3)
+            if len(parts) >= 3 and parts[2]:
+                topdir_to_pkgs.setdefault(parts[2], set()).add(pkg)
+
+    for k, v in list(pkg_to_etc.items()):
+        pkg_to_etc[k] = sorted(set(v))
+
+    return owned, owner, topdir_to_pkgs, pkg_to_etc
+
+
+def rpm_config_files(pkg: str) -> Set[str]:
+    """Return config files for a package (rpm -qc)."""
+    rc, out = _run(["rpm", "-qc", pkg], allow_fail=True, merge_err=True)
+    if rc != 0:
+        return set()
+    files: Set[str] = set()
+    for line in out.splitlines():
+        line = line.strip()
+        if line.startswith("/"):
+            files.add(line)
+    return files
+
+
+def rpm_modified_files(pkg: str) -> Set[str]:
+    """Return files reported as modified by rpm verification (rpm -V).
+
+    rpm -V only prints lines for differences/missing files.
+ """ + rc, out = _run(["rpm", "-V", pkg], allow_fail=True, merge_err=True) + # rc is non-zero when there are differences; we still want the output. + files: Set[str] = set() + for raw in out.splitlines(): + line = raw.strip() + if not line: + continue + # Typical forms: + # S.5....T. c /etc/foo.conf + # missing /etc/bar + m = re.search(r"\s(/\S+)$", line) + if m: + files.add(m.group(1)) + continue + if line.startswith("missing"): + parts = line.split() + if parts and parts[-1].startswith("/"): + files.add(parts[-1]) + return files diff --git a/tests/test_debian.py b/tests/test_debian.py index 333afc1..abad361 100644 --- a/tests/test_debian.py +++ b/tests/test_debian.py @@ -1,6 +1,5 @@ from __future__ import annotations -import hashlib from pathlib import Path @@ -97,58 +96,3 @@ def test_parse_status_conffiles_handles_continuations(tmp_path: Path): assert m["nginx"]["/etc/nginx/nginx.conf"] == "abcdef" assert m["nginx"]["/etc/nginx/mime.types"] == "123456" assert "other" not in m - - -def test_read_pkg_md5sums_and_file_md5(tmp_path: Path, monkeypatch): - import enroll.debian as d - - # Patch /var/lib/dpkg/info/.md5sums lookup to a tmp file. 
- md5_file = tmp_path / "pkg.md5sums" - md5_file.write_text("0123456789abcdef etc/foo.conf\n", encoding="utf-8") - - def fake_exists(path: str) -> bool: - return path.endswith("/var/lib/dpkg/info/p1.md5sums") - - real_open = open - - def fake_open(path: str, *args, **kwargs): - if path.endswith("/var/lib/dpkg/info/p1.md5sums"): - return real_open(md5_file, *args, **kwargs) - return real_open(path, *args, **kwargs) - - monkeypatch.setattr(d.os.path, "exists", fake_exists) - monkeypatch.setattr("builtins.open", fake_open) - - m = d.read_pkg_md5sums("p1") - assert m == {"etc/foo.conf": "0123456789abcdef"} - - content = b"hello world\n" - p = tmp_path / "x" - p.write_bytes(content) - assert d.file_md5(str(p)) == hashlib.md5(content).hexdigest() - - -def test_stat_triplet_fallbacks(tmp_path: Path, monkeypatch): - import enroll.debian as d - import sys - - p = tmp_path / "f" - p.write_text("x", encoding="utf-8") - - class FakePwdMod: - @staticmethod - def getpwuid(_): # pragma: no cover - raise KeyError - - class FakeGrpMod: - @staticmethod - def getgrgid(_): # pragma: no cover - raise KeyError - - # stat_triplet imports pwd/grp inside the function, so patch sys.modules. 
- monkeypatch.setitem(sys.modules, "pwd", FakePwdMod) - monkeypatch.setitem(sys.modules, "grp", FakeGrpMod) - owner, group, mode = d.stat_triplet(str(p)) - assert owner.isdigit() - assert group.isdigit() - assert mode.isdigit() and len(mode) == 4 diff --git a/tests/test_fsutil.py b/tests/test_fsutil.py new file mode 100644 index 0000000..ebe2224 --- /dev/null +++ b/tests/test_fsutil.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import hashlib +import os +from pathlib import Path + +from enroll.fsutil import file_md5, stat_triplet + + +def test_file_md5_matches_hashlib(tmp_path: Path): + p = tmp_path / "x" + p.write_bytes(b"hello world") + expected = hashlib.md5(b"hello world").hexdigest() # nosec + assert file_md5(str(p)) == expected + + +def test_stat_triplet_reports_mode(tmp_path: Path): + p = tmp_path / "x" + p.write_text("x", encoding="utf-8") + os.chmod(p, 0o600) + + owner, group, mode = stat_triplet(str(p)) + assert mode == "0600" + assert owner # non-empty string + assert group # non-empty string diff --git a/tests/test_harvest.py b/tests/test_harvest.py index fa796f0..a0d22ec 100644 --- a/tests/test_harvest.py +++ b/tests/test_harvest.py @@ -2,6 +2,7 @@ import json from pathlib import Path import enroll.harvest as h +from enroll.platform import PlatformInfo from enroll.systemd import UnitInfo @@ -10,6 +11,64 @@ class AllowAllPolicy: return None +class FakeBackend: + """Minimal backend stub for harvest tests. + + The real backends (dpkg/rpm) enumerate the live system (dpkg status, rpm + databases, etc). These tests instead control all backend behaviour. + """ + + def __init__( + self, + *, + name: str, + owned_etc: set[str], + etc_owner_map: dict[str, str], + topdir_to_pkgs: dict[str, set[str]], + pkg_to_etc_paths: dict[str, list[str]], + manual_pkgs: list[str], + owner_fn, + modified_by_pkg: dict[str, dict[str, str]] | None = None, + pkg_config_prefixes: tuple[str, ...] 
= ("/etc/apt/",), + ): + self.name = name + self.pkg_config_prefixes = pkg_config_prefixes + self._owned_etc = owned_etc + self._etc_owner_map = etc_owner_map + self._topdir_to_pkgs = topdir_to_pkgs + self._pkg_to_etc_paths = pkg_to_etc_paths + self._manual = manual_pkgs + self._owner_fn = owner_fn + self._modified_by_pkg = modified_by_pkg or {} + + def build_etc_index(self): + return ( + self._owned_etc, + self._etc_owner_map, + self._topdir_to_pkgs, + self._pkg_to_etc_paths, + ) + + def owner_of_path(self, path: str): + return self._owner_fn(path) + + def list_manual_packages(self): + return list(self._manual) + + def specific_paths_for_hints(self, hints: set[str]): + return [] + + def is_pkg_config_path(self, path: str) -> bool: + for pfx in self.pkg_config_prefixes: + if path == pfx or path.startswith(pfx): + return True + return False + + def modified_paths(self, pkg: str, etc_paths: list[str]): + # Test-controlled; ignore etc_paths. + return dict(self._modified_by_pkg.get(pkg, {})) + + def test_harvest_dedup_manual_packages_and_builds_etc_custom( monkeypatch, tmp_path: Path ): @@ -22,7 +81,7 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( real_exists = os.path.exists real_islink = os.path.islink - # Fake filesystem: two /etc files exist, only one is dpkg-owned. + # Fake filesystem: two /etc files exist, only one is package-owned. # Also include some /usr/local files to populate usr_local_custom. files = { "/etc/openvpn/server.conf": b"server", @@ -93,6 +152,7 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( # Avoid real system access monkeypatch.setattr(h, "list_enabled_services", lambda: ["openvpn.service"]) + monkeypatch.setattr(h, "list_enabled_timers", lambda: []) monkeypatch.setattr( h, "get_unit_info", @@ -109,29 +169,30 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( ), ) - # Debian package index: openvpn owns /etc/openvpn/server.conf; keyboard is unowned. 
- def fake_build_index(): - owned_etc = {"/etc/openvpn/server.conf"} - etc_owner_map = {"/etc/openvpn/server.conf": "openvpn"} - topdir_to_pkgs = {"openvpn": {"openvpn"}} - pkg_to_etc_paths = {"openvpn": ["/etc/openvpn/server.conf"], "curl": []} - return owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths + # Package index: openvpn owns /etc/openvpn/server.conf; keyboard is unowned. + owned_etc = {"/etc/openvpn/server.conf"} + etc_owner_map = {"/etc/openvpn/server.conf": "openvpn"} + topdir_to_pkgs = {"openvpn": {"openvpn"}} + pkg_to_etc_paths = {"openvpn": ["/etc/openvpn/server.conf"], "curl": []} - monkeypatch.setattr(h, "build_dpkg_etc_index", fake_build_index) - - # openvpn conffile hash mismatch => should be captured under service role - monkeypatch.setattr( - h, - "parse_status_conffiles", - lambda: {"openvpn": {"/etc/openvpn/server.conf": "old"}}, + backend = FakeBackend( + name="dpkg", + owned_etc=owned_etc, + etc_owner_map=etc_owner_map, + topdir_to_pkgs=topdir_to_pkgs, + pkg_to_etc_paths=pkg_to_etc_paths, + manual_pkgs=["openvpn", "curl"], + owner_fn=lambda p: "openvpn" if "openvpn" in (p or "") else None, + modified_by_pkg={ + "openvpn": {"/etc/openvpn/server.conf": "modified_conffile"}, + }, ) - monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {}) - monkeypatch.setattr(h, "file_md5", lambda path: "new") monkeypatch.setattr( - h, "dpkg_owner", lambda p: "openvpn" if "openvpn" in p else None + h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {}) ) - monkeypatch.setattr(h, "list_manual_packages", lambda: ["openvpn", "curl"]) + monkeypatch.setattr(h, "get_backend", lambda info=None: backend) + monkeypatch.setattr(h, "collect_non_system_users", lambda: []) def fake_stat_triplet(p: str): @@ -207,6 +268,7 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic( monkeypatch.setattr( h, "list_enabled_services", lambda: ["apparmor.service", "ntpsec.service"] ) + monkeypatch.setattr(h, "list_enabled_timers", lambda: []) 
def fake_unit_info(unit: str) -> UnitInfo: if unit == "apparmor.service": @@ -235,31 +297,35 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic( monkeypatch.setattr(h, "get_unit_info", fake_unit_info) - # Dpkg /etc index: no owned /etc paths needed for this test. - monkeypatch.setattr( - h, - "build_dpkg_etc_index", - lambda: (set(), {}, {}, {}), - ) - monkeypatch.setattr(h, "parse_status_conffiles", lambda: {}) - monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {}) - monkeypatch.setattr(h, "file_md5", lambda path: "x") - monkeypatch.setattr(h, "list_manual_packages", lambda: []) - monkeypatch.setattr(h, "collect_non_system_users", lambda: []) - # Make apparmor *also* claim the ntpsec package (simulates overly-broad # package inference). The snippet routing should still prefer role 'ntpsec'. - def fake_dpkg_owner(p: str): + def fake_owner(p: str): if p == "/etc/cron.d/ntpsec": return "ntpsec" - if "apparmor" in p: + if "apparmor" in (p or ""): return "ntpsec" # intentionally misleading - if "ntpsec" in p or "ntpd" in p: + if "ntpsec" in (p or "") or "ntpd" in (p or ""): return "ntpsec" return None - monkeypatch.setattr(h, "dpkg_owner", fake_dpkg_owner) + backend = FakeBackend( + name="dpkg", + owned_etc=set(), + etc_owner_map={}, + topdir_to_pkgs={}, + pkg_to_etc_paths={}, + manual_pkgs=[], + owner_fn=fake_owner, + modified_by_pkg={}, + ) + + monkeypatch.setattr( + h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {}) + ) + monkeypatch.setattr(h, "get_backend", lambda info=None: backend) + monkeypatch.setattr(h, "stat_triplet", lambda p: ("root", "root", "0644")) + monkeypatch.setattr(h, "collect_non_system_users", lambda: []) def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str): dst = Path(bundle_dir) / "artifacts" / role_name / src_rel @@ -268,11 +334,7 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic( monkeypatch.setattr(h, "_copy_into_bundle", fake_copy) - class AllowAll: - 
def deny_reason(self, path: str): - return None - - state_path = h.harvest(str(bundle), policy=AllowAll()) + state_path = h.harvest(str(bundle), policy=AllowAllPolicy()) st = json.loads(Path(state_path).read_text(encoding="utf-8")) # Cron snippet should end up attached to the ntpsec role, not apparmor. diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 92c3dfc..cbfc208 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -322,3 +322,96 @@ def test_copy2_replace_overwrites_readonly_destination(tmp_path: Path): assert dst.read_text(encoding="utf-8") == "new" mode = stat.S_IMODE(dst.stat().st_mode) assert mode & stat.S_IWUSR # destination should remain mergeable + + +def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path): + bundle = tmp_path / "bundle" + out = tmp_path / "ansible" + + # Create a dnf_config artifact. + (bundle / "artifacts" / "dnf_config" / "etc" / "dnf").mkdir( + parents=True, exist_ok=True + ) + (bundle / "artifacts" / "dnf_config" / "etc" / "dnf" / "dnf.conf").write_text( + "[main]\n", encoding="utf-8" + ) + + state = { + "host": {"hostname": "test", "os": "redhat", "pkg_backend": "rpm"}, + "users": { + "role_name": "users", + "users": [], + "managed_files": [], + "excluded": [], + "notes": [], + }, + "services": [], + "package_roles": [], + "manual_packages": [], + "manual_packages_skipped": [], + "apt_config": { + "role_name": "apt_config", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "dnf_config": { + "role_name": "dnf_config", + "managed_files": [ + { + "path": "/etc/dnf/dnf.conf", + "src_rel": "etc/dnf/dnf.conf", + "owner": "root", + "group": "root", + "mode": "0644", + "reason": "dnf_config", + } + ], + "excluded": [], + "notes": [], + }, + "etc_custom": { + "role_name": "etc_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "usr_local_custom": { + "role_name": "usr_local_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "extra_paths": 
{ + "role_name": "extra_paths", + "include_patterns": [], + "exclude_patterns": [], + "managed_files": [], + "excluded": [], + "notes": [], + }, + } + + bundle.mkdir(parents=True, exist_ok=True) + (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + + manifest(str(bundle), str(out)) + + pb = (out / "playbook.yml").read_text(encoding="utf-8") + assert "- dnf_config" in pb + + tasks = (out / "roles" / "dnf_config" / "tasks" / "main.yml").read_text( + encoding="utf-8" + ) + # Ensure the role exists and contains some file deployment logic. + assert "Deploy any other managed files" in tasks + + +def test_render_install_packages_tasks_contains_dnf_branch(): + from enroll.manifest import _render_install_packages_tasks + + txt = _render_install_packages_tasks("role", "role") + assert "ansible.builtin.apt" in txt + assert "ansible.builtin.dnf" in txt + assert "ansible.builtin.package" in txt + assert "pkg_mgr" in txt diff --git a/tests/test_platform.py b/tests/test_platform.py new file mode 100644 index 0000000..7ff66c6 --- /dev/null +++ b/tests/test_platform.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from pathlib import Path + +import enroll.platform as platform + + +def test_read_os_release_parses_kv_and_strips_quotes(tmp_path: Path): + p = tmp_path / "os-release" + p.write_text( + """ +# comment +ID=fedora +ID_LIKE=\"rhel centos\" +NAME=\"Fedora Linux\" +EMPTY= +NOEQUALS +""", + encoding="utf-8", + ) + + osr = platform._read_os_release(str(p)) + assert osr["ID"] == "fedora" + assert osr["ID_LIKE"] == "rhel centos" + assert osr["NAME"] == "Fedora Linux" + assert osr["EMPTY"] == "" + assert "NOEQUALS" not in osr + + +def test_detect_platform_prefers_os_release(monkeypatch): + monkeypatch.setattr( + platform, + "_read_os_release", + lambda path="/etc/os-release": {"ID": "fedora", "ID_LIKE": "rhel"}, + ) + # If os-release is decisive we shouldn't need which() + monkeypatch.setattr(platform.shutil, "which", lambda exe: None) + + 
info = platform.detect_platform() + assert info.os_family == "redhat" + assert info.pkg_backend == "rpm" + + +def test_detect_platform_fallbacks_to_dpkg_when_unknown(monkeypatch): + monkeypatch.setattr(platform, "_read_os_release", lambda path="/etc/os-release": {}) + monkeypatch.setattr( + platform.shutil, "which", lambda exe: "/usr/bin/dpkg" if exe == "dpkg" else None + ) + + info = platform.detect_platform() + assert info.os_family == "debian" + assert info.pkg_backend == "dpkg" + + +def test_get_backend_unknown_prefers_rpm_if_present(monkeypatch): + monkeypatch.setattr( + platform.shutil, "which", lambda exe: "/usr/bin/rpm" if exe == "rpm" else None + ) + + b = platform.get_backend( + platform.PlatformInfo(os_family="unknown", pkg_backend="unknown", os_release={}) + ) + assert isinstance(b, platform.RpmBackend) + + +def test_rpm_backend_modified_paths_labels_conffiles(monkeypatch): + b = platform.RpmBackend() + + # Pretend rpm -V says both files changed, but only one is a config file. + monkeypatch.setattr(b, "_modified_files", lambda pkg: {"/etc/foo.conf", "/etc/bar"}) + monkeypatch.setattr(b, "_config_files", lambda pkg: {"/etc/foo.conf"}) + + out = b.modified_paths("mypkg", ["/etc/foo.conf", "/etc/bar", "/etc/dnf/dnf.conf"]) + assert out["/etc/foo.conf"] == "modified_conffile" + assert out["/etc/bar"] == "modified_packaged_file" + # Package-manager config paths are excluded. 
+ assert "/etc/dnf/dnf.conf" not in out + + +def test_specific_paths_for_hints_differs_between_backends(): + # We can exercise this without instantiating DpkgBackend (which reads dpkg status) + class Dummy(platform.PackageBackend): + name = "dummy" + pkg_config_prefixes = ("/etc/apt/",) + + d = Dummy() + assert d.is_pkg_config_path("/etc/apt/sources.list") + assert not d.is_pkg_config_path("/etc/ssh/sshd_config") + + r = platform.RpmBackend() + paths = set(r.specific_paths_for_hints({"nginx"})) + assert "/etc/sysconfig/nginx" in paths + assert "/etc/sysconfig/nginx.conf" in paths diff --git a/tests/test_rpm.py b/tests/test_rpm.py new file mode 100644 index 0000000..ea97c12 --- /dev/null +++ b/tests/test_rpm.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import enroll.rpm as rpm + + +def test_rpm_owner_returns_none_when_unowned(monkeypatch): + monkeypatch.setattr( + rpm, + "_run", + lambda cmd, allow_fail=False, merge_err=False: ( + 1, + "file /etc/x is not owned by any package\n", + ), + ) + assert rpm.rpm_owner("/etc/x") is None + + +def test_rpm_owner_parses_name(monkeypatch): + monkeypatch.setattr( + rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (0, "bash\n") + ) + assert rpm.rpm_owner("/bin/bash") == "bash" + + +def test_strip_arch_strips_known_arches(): + assert rpm._strip_arch("vim-enhanced.x86_64") == "vim-enhanced" + assert rpm._strip_arch("foo.noarch") == "foo" + assert rpm._strip_arch("weird.token") == "weird.token" + + +def test_list_manual_packages_prefers_dnf_repoquery(monkeypatch): + monkeypatch.setattr( + rpm.shutil, "which", lambda exe: "/usr/bin/dnf" if exe == "dnf" else None + ) + + def fake_run(cmd, allow_fail=False, merge_err=False): + # First repoquery form returns usable output. 
+ if cmd[:3] == ["dnf", "-q", "repoquery"]: + return 0, "vim-enhanced.x86_64\nhtop\nvim-enhanced.x86_64\n" + raise AssertionError(f"unexpected cmd: {cmd}") + + monkeypatch.setattr(rpm, "_run", fake_run) + + pkgs = rpm.list_manual_packages() + assert pkgs == ["htop", "vim-enhanced"] + + +def test_list_manual_packages_falls_back_to_history(monkeypatch): + monkeypatch.setattr( + rpm.shutil, "which", lambda exe: "/usr/bin/dnf" if exe == "dnf" else None + ) + + def fake_run(cmd, allow_fail=False, merge_err=False): + # repoquery fails + if cmd[:3] == ["dnf", "-q", "repoquery"]: + return 1, "" + if cmd[:3] == ["dnf", "-q", "history"]: + return ( + 0, + "Installed Packages\nvim-enhanced.x86_64\nLast metadata expiration check: 0:01:00 ago\n", + ) + raise AssertionError(f"unexpected cmd: {cmd}") + + monkeypatch.setattr(rpm, "_run", fake_run) + + pkgs = rpm.list_manual_packages() + assert pkgs == ["vim-enhanced"] + + +def test_build_rpm_etc_index_uses_fallback_when_rpm_output_mismatches(monkeypatch): + # Two files in /etc, one owned, one unowned. + monkeypatch.setattr( + rpm, "_walk_etc_files", lambda: ["/etc/owned.conf", "/etc/unowned.conf"] + ) + + # Simulate chunk query producing unexpected extra line (mismatch) -> triggers per-file fallback. 
+ monkeypatch.setattr( + rpm, + "_run", + lambda cmd, allow_fail=False, merge_err=False: (0, "ownedpkg\nEXTRA\nTHIRD\n"), + ) + monkeypatch.setattr( + rpm, "rpm_owner", lambda p: "ownedpkg" if p == "/etc/owned.conf" else None + ) + + owned, owner_map, topdir_to_pkgs, pkg_to_etc = rpm.build_rpm_etc_index() + + assert owned == {"/etc/owned.conf"} + assert owner_map["/etc/owned.conf"] == "ownedpkg" + assert "owned.conf" in topdir_to_pkgs + assert pkg_to_etc["ownedpkg"] == ["/etc/owned.conf"] + + +def test_build_rpm_etc_index_parses_chunk_output(monkeypatch): + monkeypatch.setattr( + rpm, "_walk_etc_files", lambda: ["/etc/ssh/sshd_config", "/etc/notowned"] + ) + + def fake_run(cmd, allow_fail=False, merge_err=False): + # One output line per input path. + return 0, "openssh-server\nfile /etc/notowned is not owned by any package\n" + + monkeypatch.setattr(rpm, "_run", fake_run) + + owned, owner_map, topdir_to_pkgs, pkg_to_etc = rpm.build_rpm_etc_index() + + assert "/etc/ssh/sshd_config" in owned + assert "/etc/notowned" not in owned + assert owner_map["/etc/ssh/sshd_config"] == "openssh-server" + assert "ssh" in topdir_to_pkgs + assert "openssh-server" in topdir_to_pkgs["ssh"] + assert pkg_to_etc["openssh-server"] == ["/etc/ssh/sshd_config"] + + +def test_rpm_config_files_and_modified_files_parsing(monkeypatch): + monkeypatch.setattr( + rpm, + "_run", + lambda cmd, allow_fail=False, merge_err=False: ( + 0, + "/etc/foo.conf\n/usr/bin/tool\n", + ), + ) + assert rpm.rpm_config_files("mypkg") == {"/etc/foo.conf", "/usr/bin/tool"} + + # rpm -V returns only changed/missing files + out = "S.5....T. c /etc/foo.conf\nmissing /etc/bar\n" + monkeypatch.setattr( + rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (1, out) + ) + assert rpm.rpm_modified_files("mypkg") == {"/etc/foo.conf", "/etc/bar"}