diff --git a/CHANGELOG.md b/CHANGELOG.md index f92e0b7..f2cb109 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,3 @@ -# 0.2.0 - - * Add version CLI arg - * Add ability to enroll RH-style systems (DNF5/DNF/RPM) - # 0.1.7 * Fix an attribution bug for certain files ending up in the wrong package/role. diff --git a/README.md b/README.md index d075951..c6b8123 100644 --- a/README.md +++ b/README.md @@ -4,15 +4,15 @@ Enroll logo -**enroll** inspects a Linux machine (Debian-like or RedHat-like) and generates Ansible roles/playbooks (and optionally inventory) for what it finds. +**enroll** inspects a Linux machine (currently Debian-only) and generates Ansible roles/playbooks (and optionally inventory) for what it finds. - Detects packages that have been installed. -- Detects package ownership of `/etc` files where possible -- Captures config that has **changed from packaged defaults** where possible (e.g dpkg conffile hashes + package md5sums when available). +- Detects Debian package ownership of `/etc` files using dpkg’s local database. +- Captures config that has **changed from packaged defaults** (dpkg conffile hashes + package md5sums when available). - Also captures **service-relevant custom/unowned files** under `/etc//...` (e.g. drop-in config includes). - Defensively excludes likely secrets (path denylist + content sniff + size caps). - Captures non-system users and their SSH public keys. -- Captures miscellaneous `/etc` files it can't attribute to a package and installs them in an `etc_custom` role. +- Captures miscellaneous `/etc` files it can’t attribute to a package and installs them in an `etc_custom` role. - Ditto for /usr/local/bin (for non-binary files) and /usr/local/etc - Avoids trying to start systemd services that were detected as inactive during harvest. @@ -41,8 +41,8 @@ Use when enrolling **one server** (or generating a “golden” role set you int **Characteristics** - Roles are more self-contained. -- Raw config files live in the role's `files/`. -- Template variables live in the role's `defaults/main.yml`. +- Raw config files live in the role’s `files/`. +- Template variables live in the role’s `defaults/main.yml`. ### Multi-site mode (`--fqdn`) Use when enrolling **several existing servers** quickly, especially if they differ. @@ -68,13 +68,13 @@ Harvest state about a host and write a harvest bundle. - “Manual” packages - Changed-from-default config (plus related custom/unowned files under service dirs) - Non-system users + SSH public keys -- Misc `/etc` that can't be attributed to a package (`etc_custom` role) +- Misc `/etc` that can’t be attributed to a package (`etc_custom` role) - Optional user-specified extra files/dirs via `--include-path` (emitted as an `extra_paths` role at manifest time) **Common flags** - Remote harvesting: - `--remote-host`, `--remote-user`, `--remote-port` - - `--no-sudo` (if you don't want/need sudo) + - `--no-sudo` (if you don’t want/need sudo) - Sensitive-data behaviour: - default: tries to avoid likely secrets - `--dangerous`: disables secret-safety checks (see “Sensitive data” below) @@ -233,7 +233,7 @@ poetry run enroll --help ## Found a bug / have a suggestion? -My Forgejo doesn't currently support federation, so I haven't opened registration/login for issues. +My Forgejo doesn’t currently support federation, so I haven’t opened registration/login for issues. Instead, email me (see `pyproject.toml`) or contact me on the Fediverse: diff --git a/enroll/cli.py b/enroll/cli.py index bb4d3f1..ae9aba0 100644 --- a/enroll/cli.py +++ b/enroll/cli.py @@ -15,7 +15,6 @@ from .harvest import harvest from .manifest import manifest from .remote import remote_harvest from .sopsutil import SopsError, encrypt_file_binary -from .version import get_enroll_version def _discover_config_path(argv: list[str]) -> Optional[Path]: @@ -319,6 +318,13 @@ def _jt_mode(args: argparse.Namespace) -> str: return "auto" +def _add_remote_args(p: argparse.ArgumentParser) -> None: + p.add_argument( + "--remote-host", + help="SSH host to run harvesting on (if set, harvest runs remotely and is pulled locally).", + ) + + def _add_config_args(p: argparse.ArgumentParser) -> None: p.add_argument( "-c", @@ -333,13 +339,6 @@ def _add_config_args(p: argparse.ArgumentParser) -> None: action="store_true", help="Do not load any INI config file (even if one would be auto-discovered).", ) - - -def _add_remote_args(p: argparse.ArgumentParser) -> None: - p.add_argument( - "--remote-host", - help="SSH host to run harvesting on (if set, harvest runs remotely and is pulled locally).", - ) p.add_argument( "--remote-port", type=int, @@ -355,18 +354,11 @@ def _add_remote_args(p: argparse.ArgumentParser) -> None: def main() -> None: ap = argparse.ArgumentParser(prog="enroll") - ap.add_argument( - "-v", - "--version", - action="version", - version=f"{get_enroll_version()}", - ) _add_config_args(ap) sub = ap.add_subparsers(dest="cmd", required=True) h = sub.add_parser("harvest", help="Harvest service/package/config state") _add_config_args(h) - _add_remote_args(h) h.add_argument( "--out", help=( @@ -414,6 +406,7 @@ def main() -> None: action="store_true", help="Don't use sudo on the remote host (when using --remote options). This may result in a limited harvest due to permission restrictions.", ) + _add_remote_args(h) m = sub.add_parser("manifest", help="Render Ansible roles from a harvest") _add_config_args(m) @@ -450,7 +443,6 @@ def main() -> None: "single-shot", help="Harvest state, then manifest Ansible code, in one shot" ) _add_config_args(s) - _add_remote_args(s) s.add_argument( "--harvest", help=( @@ -508,6 +500,7 @@ def main() -> None: ), ) _add_common_manifest_args(s) + _add_remote_args(s) d = sub.add_parser("diff", help="Compare two harvests and report differences") _add_config_args(d) @@ -609,12 +602,14 @@ def main() -> None: ) args = ap.parse_args(argv) + remote_host: Optional[str] = getattr(args, "remote_host", None) + try: if args.cmd == "harvest": sops_fps = getattr(args, "sops", None) - if args.remote_host: + if remote_host: if sops_fps: - out_file = _resolve_sops_out_file(args.out, hint=args.remote_host) + out_file = _resolve_sops_out_file(args.out, hint=remote_host) with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td: tmp_bundle = Path(td) / "bundle" tmp_bundle.mkdir(parents=True, exist_ok=True) @@ -624,7 +619,7 @@ def main() -> None: pass remote_harvest( local_out_dir=tmp_bundle, - remote_host=args.remote_host, + remote_host=remote_host, remote_port=int(args.remote_port), remote_user=args.remote_user, dangerous=bool(args.dangerous), @@ -640,11 +635,11 @@ def main() -> None: out_dir = ( Path(args.out) if args.out - else new_harvest_cache_dir(hint=args.remote_host).dir + else new_harvest_cache_dir(hint=remote_host).dir ) state = remote_harvest( local_out_dir=out_dir, - remote_host=args.remote_host, + remote_host=remote_host, remote_port=int(args.remote_port), remote_user=args.remote_user, dangerous=bool(args.dangerous), @@ -674,16 +669,12 @@ def main() -> None: ) print(str(out_file)) else: - if args.out: - out_dir = args.out - else: - out_dir = ( - Path(args.out) - if args.out - else new_harvest_cache_dir(hint=args.remote_host).dir + if not args.out: + raise SystemExit( + "error: --out is required unless --remote-host is set" ) path = harvest( - out_dir, + args.out, dangerous=bool(args.dangerous), include_paths=list(getattr(args, "include_path", []) or []), exclude_paths=list(getattr(args, "exclude_path", []) or []), @@ -756,11 +747,9 @@ def main() -> None: raise SystemExit(2) elif args.cmd == "single-shot": sops_fps = getattr(args, "sops", None) - if args.remote_host: + if remote_host: if sops_fps: - out_file = _resolve_sops_out_file( - args.harvest, hint=args.remote_host - ) + out_file = _resolve_sops_out_file(args.harvest, hint=remote_host) with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td: tmp_bundle = Path(td) / "bundle" tmp_bundle.mkdir(parents=True, exist_ok=True) @@ -770,7 +759,7 @@ def main() -> None: pass remote_harvest( local_out_dir=tmp_bundle, - remote_host=args.remote_host, + remote_host=remote_host, remote_port=int(args.remote_port), remote_user=args.remote_user, dangerous=bool(args.dangerous), @@ -795,11 +784,11 @@ def main() -> None: harvest_dir = ( Path(args.harvest) if args.harvest - else new_harvest_cache_dir(hint=args.remote_host).dir + else new_harvest_cache_dir(hint=remote_host).dir ) remote_harvest( local_out_dir=harvest_dir, - remote_host=args.remote_host, + remote_host=remote_host, remote_port=int(args.remote_port), remote_user=args.remote_user, dangerous=bool(args.dangerous), diff --git a/enroll/debian.py b/enroll/debian.py index 7e1ee2d..0ddc1f3 100644 --- a/enroll/debian.py +++ b/enroll/debian.py @@ -1,6 +1,7 @@ from __future__ import annotations import glob +import hashlib import os import subprocess # nosec from typing import Dict, List, Optional, Set, Tuple @@ -179,3 +180,28 @@ def read_pkg_md5sums(pkg: str) -> Dict[str, str]: md5, rel = line.split(None, 1) m[rel.strip()] = md5.strip() return m + + +def file_md5(path: str) -> str: + h = hashlib.md5() # nosec + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() + + +def stat_triplet(path: str) -> Tuple[str, str, str]: + st = os.stat(path, follow_symlinks=True) + mode = oct(st.st_mode & 0o777)[2:].zfill(4) + + import pwd, grp + + try: + owner = pwd.getpwuid(st.st_uid).pw_name + except KeyError: + owner = str(st.st_uid) + try: + group = grp.getgrgid(st.st_gid).gr_name + except KeyError: + group = str(st.st_gid) + return owner, group, mode diff --git a/enroll/fsutil.py b/enroll/fsutil.py deleted file mode 100644 index 3d18df6..0000000 --- a/enroll/fsutil.py +++ /dev/null @@ -1,40 +0,0 @@ -from __future__ import annotations - -import hashlib -import os -from typing import Tuple - - -def file_md5(path: str) -> str: - """Return hex MD5 of a file. - - Used for Debian dpkg baseline comparisons. - """ - h = hashlib.md5() # nosec - with open(path, "rb") as f: - for chunk in iter(lambda: f.read(1024 * 1024), b""): - h.update(chunk) - return h.hexdigest() - - -def stat_triplet(path: str) -> Tuple[str, str, str]: - """Return (owner, group, mode) for a path. - - owner/group are usernames/group names when resolvable, otherwise numeric ids. - mode is a zero-padded octal string (e.g. "0644"). - """ - st = os.stat(path, follow_symlinks=True) - mode = oct(st.st_mode & 0o777)[2:].zfill(4) - - import grp - import pwd - - try: - owner = pwd.getpwuid(st.st_uid).pw_name - except KeyError: - owner = str(st.st_uid) - try: - group = grp.getgrgid(st.st_gid).gr_name - except KeyError: - group = str(st.st_gid) - return owner, group, mode diff --git a/enroll/harvest.py b/enroll/harvest.py index bb706b1..d678b89 100644 --- a/enroll/harvest.py +++ b/enroll/harvest.py @@ -15,12 +15,18 @@ from .systemd import ( get_timer_info, UnitQueryError, ) -from .fsutil import stat_triplet -from .platform import detect_platform, get_backend +from .debian import ( + build_dpkg_etc_index, + dpkg_owner, + file_md5, + list_manual_packages, + parse_status_conffiles, + read_pkg_md5sums, + stat_triplet, +) from .ignore import IgnorePolicy from .pathfilter import PathFilter, expand_includes from .accounts import collect_non_system_users -from .version import get_enroll_version @dataclass @@ -79,14 +85,6 @@ class AptConfigSnapshot: notes: List[str] -@dataclass -class DnfConfigSnapshot: - role_name: str - managed_files: List[ManagedFile] - excluded: List[ExcludedFile] - notes: List[str] - - @dataclass class EtcCustomSnapshot: role_name: str @@ -160,13 +158,6 @@ SHARED_ETC_TOPDIRS = { "sudoers.d", "sysctl.d", "systemd", - # RPM-family shared trees - "dnf", - "yum", - "yum.repos.d", - "sysconfig", - "pki", - "firewalld", } @@ -323,23 +314,17 @@ def _add_pkgs_from_etc_topdirs( pkgs.add(p) -def _maybe_add_specific_paths(hints: Set[str], backend) -> List[str]: - # Delegate to backend-specific conventions (e.g. /etc/default on Debian, - # /etc/sysconfig on Fedora/RHEL). Always include sysctl.d. - try: - return backend.specific_paths_for_hints(hints) - except Exception: - # Best-effort fallback (Debian-ish). - paths: List[str] = [] - for h in hints: - paths.extend( - [ - f"/etc/default/{h}", - f"/etc/init.d/{h}", - f"/etc/sysctl.d/{h}.conf", - ] - ) - return paths +def _maybe_add_specific_paths(hints: Set[str]) -> List[str]: + paths: List[str] = [] + for h in hints: + paths.extend( + [ + f"/etc/default/{h}", + f"/etc/init.d/{h}", + f"/etc/sysctl.d/{h}.conf", + ] + ) + return paths def _scan_unowned_under_roots( @@ -423,7 +408,6 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [ ("/etc/anacron/*", "system_cron"), ("/var/spool/cron/crontabs/*", "system_cron"), ("/var/spool/crontabs/*", "system_cron"), - ("/var/spool/cron/*", "system_cron"), # network ("/etc/netplan/*", "system_network"), ("/etc/systemd/network/*", "system_network"), @@ -431,9 +415,6 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [ ("/etc/network/interfaces.d/*", "system_network"), ("/etc/resolvconf.conf", "system_network"), ("/etc/resolvconf/resolv.conf.d/*", "system_network"), - ("/etc/NetworkManager/system-connections/*", "system_network"), - ("/etc/sysconfig/network*", "system_network"), - ("/etc/sysconfig/network-scripts/*", "system_network"), # firewall ("/etc/nftables.conf", "system_firewall"), ("/etc/nftables.d/*", "system_firewall"), @@ -441,10 +422,6 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [ ("/etc/iptables/rules.v6", "system_firewall"), ("/etc/ufw/*", "system_firewall"), ("/etc/default/ufw", "system_firewall"), - ("/etc/firewalld/*", "system_firewall"), - ("/etc/firewalld/zones/*", "system_firewall"), - # SELinux - ("/etc/selinux/config", "system_security"), # other ("/etc/rc.local", "system_rc"), ] @@ -576,51 +553,6 @@ def _iter_apt_capture_paths() -> List[tuple[str, str]]: return uniq -def _iter_dnf_capture_paths() -> List[tuple[str, str]]: - """Return (path, reason) pairs for DNF/YUM configuration on RPM systems. - - Captures: - - /etc/dnf/* (dnf.conf, vars, plugins, modules, automatic) - - /etc/yum.conf (legacy) - - /etc/yum.repos.d/*.repo - - /etc/pki/rpm-gpg/* (GPG key files) - """ - reasons: Dict[str, str] = {} - - for root, tag in ( - ("/etc/dnf", "dnf_config"), - ("/etc/yum", "yum_config"), - ): - if os.path.isdir(root): - for dirpath, _, filenames in os.walk(root): - for fn in filenames: - p = os.path.join(dirpath, fn) - if os.path.islink(p) or not os.path.isfile(p): - continue - reasons.setdefault(p, tag) - - # Legacy yum.conf. - if os.path.isfile("/etc/yum.conf") and not os.path.islink("/etc/yum.conf"): - reasons.setdefault("/etc/yum.conf", "yum_conf") - - # Repositories. - if os.path.isdir("/etc/yum.repos.d"): - for p in _iter_matching_files("/etc/yum.repos.d/*.repo"): - reasons[p] = "yum_repo" - - # RPM GPG keys. - if os.path.isdir("/etc/pki/rpm-gpg"): - for dirpath, _, filenames in os.walk("/etc/pki/rpm-gpg"): - for fn in filenames: - p = os.path.join(dirpath, fn) - if os.path.islink(p) or not os.path.isfile(p): - continue - reasons.setdefault(p, "rpm_gpg_key") - - # Stable ordering. - return [(p, reasons[p]) for p in sorted(reasons.keys())] - - def _iter_system_capture_paths() -> List[tuple[str, str]]: """Return (path, reason) pairs for essential system config/state (non-APT).""" out: List[tuple[str, str]] = [] @@ -668,12 +600,8 @@ def harvest( flush=True, ) - platform = detect_platform() - backend = get_backend(platform) - - owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths = ( - backend.build_etc_index() - ) + owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths = build_dpkg_etc_index() + conffiles_by_pkg = parse_status_conffiles() # ------------------------- # Service roles @@ -717,12 +645,12 @@ def harvest( candidates: Dict[str, str] = {} if ui.fragment_path: - p = backend.owner_of_path(ui.fragment_path) + p = dpkg_owner(ui.fragment_path) if p: pkgs.add(p) for exe in ui.exec_paths: - p = backend.owner_of_path(exe) + p = dpkg_owner(exe) if p: pkgs.add(p) @@ -747,7 +675,7 @@ def harvest( # logrotate.d entries) can still be attributed back to this service. service_role_aliases[role] = set(hints) | set(pkgs) | {role} - for sp in _maybe_add_specific_paths(hints, backend): + for sp in _maybe_add_specific_paths(hints): if not os.path.exists(sp): continue if sp in etc_owner_map: @@ -756,13 +684,31 @@ def harvest( candidates.setdefault(sp, "custom_specific_path") for pkg in sorted(pkgs): - etc_paths = pkg_to_etc_paths.get(pkg, []) - for path, reason in backend.modified_paths(pkg, etc_paths).items(): + conff = conffiles_by_pkg.get(pkg, {}) + md5sums = read_pkg_md5sums(pkg) + for path in pkg_to_etc_paths.get(pkg, []): if not os.path.isfile(path) or os.path.islink(path): continue - if backend.is_pkg_config_path(path): + if path.startswith("/etc/apt/"): continue - candidates.setdefault(path, reason) + if path in conff: + # Only capture conffiles when they differ from the package default. + try: + current = file_md5(path) + except OSError: + continue + if current != conff[path]: + candidates.setdefault(path, "modified_conffile") + continue + rel = path.lstrip("/") + baseline = md5sums.get(rel) + if baseline: + try: + current = file_md5(path) + except OSError: + continue + if current != baseline: + candidates.setdefault(path, "modified_packaged_file") # Capture custom/unowned files living under /etc/ for this service. # @@ -901,18 +847,18 @@ def harvest( # (useful when a timer triggers a service that isn't enabled). pkgs: Set[str] = set() if ti.fragment_path: - p = backend.owner_of_path(ti.fragment_path) + p = dpkg_owner(ti.fragment_path) if p: pkgs.add(p) if ti.trigger_unit and ti.trigger_unit.endswith(".service"): try: ui = get_unit_info(ti.trigger_unit) if ui.fragment_path: - p = backend.owner_of_path(ui.fragment_path) + p = dpkg_owner(ui.fragment_path) if p: pkgs.add(p) for exe in ui.exec_paths: - p = backend.owner_of_path(exe) + p = dpkg_owner(exe) if p: pkgs.add(p) except Exception: # nosec @@ -924,7 +870,7 @@ def harvest( # ------------------------- # Manually installed package roles # ------------------------- - manual_pkgs = backend.list_manual_packages() + manual_pkgs = list_manual_packages() # Avoid duplicate roles: if a manual package is already managed by any service role, skip its pkg_ role. covered_by_services: Set[str] = set() for s in service_snaps: @@ -947,26 +893,41 @@ def harvest( for tpath in timer_extra_by_pkg.get(pkg, []): candidates.setdefault(tpath, "related_timer") - etc_paths = pkg_to_etc_paths.get(pkg, []) - for path, reason in backend.modified_paths(pkg, etc_paths).items(): + conff = conffiles_by_pkg.get(pkg, {}) + md5sums = read_pkg_md5sums(pkg) + + for path in pkg_to_etc_paths.get(pkg, []): if not os.path.isfile(path) or os.path.islink(path): continue - if backend.is_pkg_config_path(path): + if path.startswith("/etc/apt/"): continue - candidates.setdefault(path, reason) + if path in conff: + try: + current = file_md5(path) + except OSError: + continue + if current != conff[path]: + candidates.setdefault(path, "modified_conffile") + continue + rel = path.lstrip("/") + baseline = md5sums.get(rel) + if baseline: + try: + current = file_md5(path) + except OSError: + continue + if current != baseline: + candidates.setdefault(path, "modified_packaged_file") topdirs = _topdirs_for_package(pkg, pkg_to_etc_paths) roots: List[str] = [] - # Collect candidate directories plus backend-specific common files. for td in sorted(topdirs): if td in SHARED_ETC_TOPDIRS: continue - if backend.is_pkg_config_path(f"/etc/{td}/") or backend.is_pkg_config_path( - f"/etc/{td}" - ): - continue roots.extend([f"/etc/{td}", f"/etc/{td}.d"]) - roots.extend(_maybe_add_specific_paths(set(topdirs), backend)) + roots.extend([f"/etc/default/{td}"]) + roots.extend([f"/etc/init.d/{td}"]) + roots.extend([f"/etc/sysctl.d/{td}.conf"]) # Capture any custom/unowned files under /etc/ for this # manually-installed package. This may include runtime-generated @@ -1070,48 +1031,26 @@ def harvest( ) # ------------------------- - # Package manager config role - # - Debian: apt_config - # - Fedora/RHEL-like: dnf_config + # apt_config role (APT configuration and keyrings) # ------------------------- apt_notes: List[str] = [] apt_excluded: List[ExcludedFile] = [] apt_managed: List[ManagedFile] = [] - dnf_notes: List[str] = [] - dnf_excluded: List[ExcludedFile] = [] - dnf_managed: List[ManagedFile] = [] - apt_role_name = "apt_config" - dnf_role_name = "dnf_config" + apt_role_seen = seen_by_role.setdefault(apt_role_name, set()) - if backend.name == "dpkg": - apt_role_seen = seen_by_role.setdefault(apt_role_name, set()) - for path, reason in _iter_apt_capture_paths(): - _capture_file( - bundle_dir=bundle_dir, - role_name=apt_role_name, - abs_path=path, - reason=reason, - policy=policy, - path_filter=path_filter, - managed_out=apt_managed, - excluded_out=apt_excluded, - seen_role=apt_role_seen, - ) - elif backend.name == "rpm": - dnf_role_seen = seen_by_role.setdefault(dnf_role_name, set()) - for path, reason in _iter_dnf_capture_paths(): - _capture_file( - bundle_dir=bundle_dir, - role_name=dnf_role_name, - abs_path=path, - reason=reason, - policy=policy, - path_filter=path_filter, - managed_out=dnf_managed, - excluded_out=dnf_excluded, - seen_role=dnf_role_seen, - ) + for path, reason in _iter_apt_capture_paths(): + _capture_file( + bundle_dir=bundle_dir, + role_name=apt_role_name, + abs_path=path, + reason=reason, + policy=policy, + path_filter=path_filter, + managed_out=apt_managed, + excluded_out=apt_excluded, + seen_role=apt_role_seen, + ) apt_config_snapshot = AptConfigSnapshot( role_name=apt_role_name, @@ -1119,12 +1058,6 @@ def harvest( excluded=apt_excluded, notes=apt_notes, ) - dnf_config_snapshot = DnfConfigSnapshot( - role_name=dnf_role_name, - managed_files=dnf_managed, - excluded=dnf_excluded, - notes=dnf_notes, - ) # ------------------------- # etc_custom role (unowned /etc files not already attributed elsewhere) @@ -1146,8 +1079,6 @@ def harvest( already.add(mf.path) for mf in apt_managed: already.add(mf.path) - for mf in dnf_managed: - already.add(mf.path) # Maps for re-attributing shared snippets (cron.d/logrotate.d) to existing roles. svc_by_role: Dict[str, ServiceSnapshot] = {s.role_name: s for s in service_snaps} @@ -1162,7 +1093,7 @@ def harvest( for pkg in s.packages: pkg_to_service_roles.setdefault(pkg, []).append(s.role_name) - # Alias -> role mapping used as a fallback when package ownership is missing. + # Alias -> role mapping used as a fallback when dpkg ownership is missing. # Prefer service roles over package roles when both would match. alias_ranked: Dict[str, tuple[int, str]] = {} @@ -1193,8 +1124,8 @@ def harvest( per service. Resolution order: - 1) package owner -> service role (if any service references the package) - 2) package owner -> package role (manual package role exists) + 1) dpkg owner -> service role (if any service references the package) + 2) dpkg owner -> package role (manual package role exists) 3) basename/stem alias match -> preferred role """ if path.startswith("/etc/logrotate.d/"): @@ -1216,7 +1147,7 @@ def harvest( seen.add(c) uniq.append(c) - pkg = backend.owner_of_path(path) + pkg = dpkg_owner(path) if pkg: svc_roles = sorted(set(pkg_to_service_roles.get(pkg, []))) if svc_roles: @@ -1295,7 +1226,7 @@ def harvest( for dirpath, _, filenames in os.walk("/etc"): for fn in filenames: path = os.path.join(dirpath, fn) - if backend.is_pkg_config_path(path): + if path.startswith("/etc/apt/"): continue if path in already: continue @@ -1482,22 +1413,13 @@ def harvest( ) state = { - "enroll": { - "version": get_enroll_version(), - }, - "host": { - "hostname": os.uname().nodename, - "os": platform.os_family, - "pkg_backend": backend.name, - "os_release": platform.os_release, - }, + "host": {"hostname": os.uname().nodename, "os": "debian"}, "users": asdict(users_snapshot), "services": [asdict(s) for s in service_snaps], "manual_packages": manual_pkgs, "manual_packages_skipped": manual_pkgs_skipped, "package_roles": [asdict(p) for p in pkg_snaps], "apt_config": asdict(apt_config_snapshot), - "dnf_config": asdict(dnf_config_snapshot), "etc_custom": asdict(etc_custom_snapshot), "usr_local_custom": asdict(usr_local_custom_snapshot), "extra_paths": asdict(extra_paths_snapshot), diff --git a/enroll/ignore.py b/enroll/ignore.py index 904997f..ab2cb96 100644 --- a/enroll/ignore.py +++ b/enroll/ignore.py @@ -43,7 +43,6 @@ DEFAULT_ALLOW_BINARY_GLOBS = [ "/usr/share/keyrings/*.gpg", "/usr/share/keyrings/*.pgp", "/usr/share/keyrings/*.asc", - "/etc/pki/rpm-gpg/*", ] SENSITIVE_CONTENT_PATTERNS = [ diff --git a/enroll/manifest.py b/enroll/manifest.py index 923040f..dbc2353 100644 --- a/enroll/manifest.py +++ b/enroll/manifest.py @@ -166,7 +166,6 @@ def _write_playbook_all(path: str, roles: List[str]) -> None: pb_lines = [ "---", "- name: Apply all roles on all hosts", - " gather_facts: true", " hosts: all", " become: true", " roles:", @@ -182,7 +181,6 @@ def _write_playbook_host(path: str, fqdn: str, roles: List[str]) -> None: "---", f"- name: Apply all roles on {fqdn}", f" hosts: {fqdn}", - " gather_facts: true", " become: true", " roles:", ] @@ -470,51 +468,6 @@ def _render_generic_files_tasks( """ -def _render_install_packages_tasks(role: str, var_prefix: str) -> str: - """Render cross-distro package installation tasks. - - We generate conditional tasks for apt/dnf/yum, falling back to the - generic `package` module. This keeps generated roles usable on both - Debian-like and RPM-like systems. - """ - return f"""# Generated by enroll - -- name: Install packages for {role} (APT) - ansible.builtin.apt: - name: "{{{{ {var_prefix}_packages | default([]) }}}}" - state: present - update_cache: true - when: - - ({var_prefix}_packages | default([])) | length > 0 - - ansible_facts.pkg_mgr | default('') == 'apt' - -- name: Install packages for {role} (DNF5) - ansible.builtin.dnf5: - name: "{{{{ {var_prefix}_packages | default([]) }}}}" - state: present - when: - - ({var_prefix}_packages | default([])) | length > 0 - - ansible_facts.pkg_mgr | default('') == 'dnf5' - -- name: Install packages for {role} (DNF/YUM) - ansible.builtin.dnf: - name: "{{{{ {var_prefix}_packages | default([]) }}}}" - state: present - when: - - ({var_prefix}_packages | default([])) | length > 0 - - ansible_facts.pkg_mgr | default('') in ['dnf', 'yum'] - -- name: Install packages for {role} (generic fallback) - ansible.builtin.package: - name: "{{{{ {var_prefix}_packages | default([]) }}}}" - state: present - when: - - ({var_prefix}_packages | default([])) | length > 0 - - ansible_facts.pkg_mgr | default('') not in ['apt', 'dnf', 'dnf5', 'yum'] - -""" - - def _prepare_bundle_dir( bundle: str, *, @@ -676,7 +629,6 @@ def _manifest_from_bundle_dir( package_roles: List[Dict[str, Any]] = state.get("package_roles", []) users_snapshot: Dict[str, Any] = state.get("users", {}) apt_config_snapshot: Dict[str, Any] = state.get("apt_config", {}) - dnf_config_snapshot: Dict[str, Any] = state.get("dnf_config", {}) etc_custom_snapshot: Dict[str, Any] = state.get("etc_custom", {}) usr_local_custom_snapshot: Dict[str, Any] = state.get("usr_local_custom", {}) extra_paths_snapshot: Dict[str, Any] = state.get("extra_paths", {}) @@ -712,7 +664,6 @@ def _manifest_from_bundle_dir( manifested_users_roles: List[str] = [] manifested_apt_config_roles: List[str] = [] - manifested_dnf_config_roles: List[str] = [] manifested_etc_custom_roles: List[str] = [] manifested_usr_local_custom_roles: List[str] = [] manifested_extra_paths_roles: List[str] = [] @@ -1090,157 +1041,6 @@ APT configuration harvested from the system (sources, pinning, and keyrings). manifested_apt_config_roles.append(role) - # ------------------------- - # dnf_config role (DNF/YUM repos, config, and RPM GPG keys) - # ------------------------- - if dnf_config_snapshot and dnf_config_snapshot.get("managed_files"): - role = dnf_config_snapshot.get("role_name", "dnf_config") - role_dir = os.path.join(roles_root, role) - _write_role_scaffold(role_dir) - - var_prefix = role - - managed_files = dnf_config_snapshot.get("managed_files", []) - excluded = dnf_config_snapshot.get("excluded", []) - notes = dnf_config_snapshot.get("notes", []) - - templated, jt_vars = _jinjify_managed_files( - bundle_dir, - role, - role_dir, - managed_files, - jt_exe=jt_exe, - jt_enabled=jt_enabled, - overwrite_templates=not site_mode, - ) - - if site_mode: - _copy_artifacts( - bundle_dir, - role, - _host_role_files_dir(out_dir, fqdn or "", role), - exclude_rels=templated, - ) - else: - _copy_artifacts( - bundle_dir, - role, - os.path.join(role_dir, "files"), - exclude_rels=templated, - ) - - files_var = _build_managed_files_var( - managed_files, - templated, - notify_other=None, - notify_systemd=None, - ) - - jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {} - vars_map: Dict[str, Any] = {f"{var_prefix}_managed_files": files_var} - vars_map = _merge_mappings_overwrite(vars_map, jt_map) - - if site_mode: - _write_role_defaults(role_dir, {f"{var_prefix}_managed_files": []}) - _write_hostvars(out_dir, fqdn or "", role, vars_map) - else: - _write_role_defaults(role_dir, vars_map) - - tasks = "---\n" + _render_generic_files_tasks( - var_prefix, include_restart_notify=False - ) - with open( - os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(tasks.rstrip() + "\n") - - with open( - os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\ndependencies: []\n") - - # README: summarise repos and GPG key material - repo_paths: List[str] = [] - key_paths: List[str] = [] - repo_hosts: Set[str] = set() - - url_re = re.compile(r"(?:https?|ftp)://([^/\s]+)", re.IGNORECASE) - file_url_re = re.compile(r"file://(/[^\s]+)") - - for mf in managed_files: - p = str(mf.get("path") or "") - src_rel = str(mf.get("src_rel") or "") - if not p or not src_rel: - continue - - if p.startswith("/etc/yum.repos.d/") and p.endswith(".repo"): - repo_paths.append(p) - art_path = os.path.join(bundle_dir, "artifacts", role, src_rel) - try: - with open(art_path, "r", encoding="utf-8", errors="replace") as rf: - for line in rf: - s = line.strip() - if not s or s.startswith("#") or s.startswith(";"): - continue - # Collect hostnames from URLs (baseurl, mirrorlist, metalink, gpgkey...) - for m in url_re.finditer(s): - repo_hosts.add(m.group(1)) - # Collect local gpgkey file paths referenced as file:///... - for m in file_url_re.finditer(s): - key_paths.append(m.group(1)) - except OSError: - pass # nosec - - if p.startswith("/etc/pki/rpm-gpg/"): - key_paths.append(p) - - repo_paths = sorted(set(repo_paths)) - key_paths = sorted(set(key_paths)) - repos = sorted(repo_hosts) - - readme = ( - """# dnf_config - -DNF/YUM configuration harvested from the system (repos, config files, and RPM GPG keys). - -## Repository hosts -""" - + ("\n".join([f"- {h}" for h in repos]) or "- (none)") - + """\n -## Repo files -""" - + ("\n".join([f"- {p}" for p in repo_paths]) or "- (none)") - + """\n -## GPG keys -""" - + ("\n".join([f"- {p}" for p in key_paths]) or "- (none)") - + """\n -## Managed files -""" - + ( - "\n".join( - [f"- {mf.get('path')} ({mf.get('reason')})" for mf in managed_files] - ) - or "- (none)" - ) - + """\n -## Excluded -""" - + ( - "\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded]) - or "- (none)" - ) - + """\n -## Notes -""" - + ("\n".join([f"- {n}" for n in notes]) or "- (none)") - + """\n""" - ) - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - manifested_dnf_config_roles.append(role) - # ------------------------- # etc_custom role (unowned /etc not already attributed) # ------------------------- @@ -1657,7 +1457,19 @@ User-requested extra file harvesting. f.write(handlers) task_parts: List[str] = [] - task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix)) + task_parts.append( + f"""--- +# Generated by enroll + +- name: Install packages for {role} + ansible.builtin.apt: + name: "{{{{ {var_prefix}_packages | default([]) }}}}" + state: present + update_cache: true + when: ({var_prefix}_packages | default([])) | length > 0 + +""" + ) task_parts.append( _render_generic_files_tasks(var_prefix, include_restart_notify=True) @@ -1804,7 +1616,19 @@ Generated from `{unit}`. f.write(handlers) task_parts: List[str] = [] - task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix)) + task_parts.append( + f"""--- +# Generated by enroll + +- name: Install packages for {role} + ansible.builtin.apt: + name: "{{{{ {var_prefix}_packages | default([]) }}}}" + state: present + update_cache: true + when: ({var_prefix}_packages | default([])) | length > 0 + +""" + ) task_parts.append( _render_generic_files_tasks(var_prefix, include_restart_notify=False) ) @@ -1843,7 +1667,6 @@ Generated for package `{pkg}`. manifested_pkg_roles.append(role) all_roles = ( manifested_apt_config_roles - + manifested_dnf_config_roles + manifested_pkg_roles + manifested_service_roles + manifested_etc_custom_roles diff --git a/enroll/platform.py b/enroll/platform.py deleted file mode 100644 index 998b83d..0000000 --- a/enroll/platform.py +++ /dev/null @@ -1,261 +0,0 @@ -from __future__ import annotations - -import shutil -from dataclasses import dataclass -from typing import Dict, List, Optional, Set, Tuple - -from .fsutil import file_md5 - - -def _read_os_release(path: str = "/etc/os-release") -> Dict[str, str]: - out: Dict[str, str] = {} - try: - with open(path, "r", encoding="utf-8", errors="replace") as f: - for raw in f: - line = raw.strip() - if not line or line.startswith("#") or "=" not in line: - continue - k, v = line.split("=", 1) - k = k.strip() - v = v.strip().strip('"') - out[k] = v - except OSError: - return {} - return out - - -@dataclass -class PlatformInfo: - os_family: str # debian|redhat|unknown - pkg_backend: str # dpkg|rpm|unknown - os_release: Dict[str, str] - - -def detect_platform() -> PlatformInfo: - """Detect platform family and package backend. - - Uses /etc/os-release when available, with a conservative fallback to - checking for dpkg/rpm binaries. - """ - - osr = _read_os_release() - os_id = (osr.get("ID") or "").strip().lower() - likes = (osr.get("ID_LIKE") or "").strip().lower().split() - - deb_ids = {"debian", "ubuntu", "linuxmint", "raspbian", "kali"} - rhel_ids = { - "fedora", - "rhel", - "centos", - "rocky", - "almalinux", - "ol", - "oracle", - "scientific", - } - - if os_id in deb_ids or "debian" in likes: - return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr) - if os_id in rhel_ids or any( - x in likes for x in ("rhel", "fedora", "centos", "redhat") - ): - return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr) - - # Fallback heuristics. - if shutil.which("dpkg"): - return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr) - if shutil.which("rpm"): - return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr) - return PlatformInfo(os_family="unknown", pkg_backend="unknown", os_release=osr) - - -class PackageBackend: - """Backend abstraction for package ownership, config detection, and manual package lists.""" - - name: str - pkg_config_prefixes: Tuple[str, ...] - - def owner_of_path(self, path: str) -> Optional[str]: # pragma: no cover - raise NotImplementedError - - def list_manual_packages(self) -> List[str]: # pragma: no cover - raise NotImplementedError - - def build_etc_index( - self, - ) -> Tuple[ - Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]] - ]: # pragma: no cover - raise NotImplementedError - - def specific_paths_for_hints(self, hints: Set[str]) -> List[str]: - return [] - - def is_pkg_config_path(self, path: str) -> bool: - for pfx in self.pkg_config_prefixes: - if path == pfx or path.startswith(pfx): - return True - return False - - def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]: - """Return a mapping of modified file paths -> reason label.""" - return {} - - -class DpkgBackend(PackageBackend): - name = "dpkg" - pkg_config_prefixes = ("/etc/apt/",) - - def __init__(self) -> None: - from .debian import parse_status_conffiles - - self._conffiles_by_pkg = parse_status_conffiles() - - def owner_of_path(self, path: str) -> Optional[str]: - from .debian import dpkg_owner - - return dpkg_owner(path) - - def list_manual_packages(self) -> List[str]: - from .debian import list_manual_packages - - return list_manual_packages() - - def build_etc_index(self): - from .debian import build_dpkg_etc_index - - return build_dpkg_etc_index() - - def specific_paths_for_hints(self, hints: Set[str]) -> List[str]: - paths: List[str] = [] - for h in hints: - paths.extend( - [ - f"/etc/default/{h}", - f"/etc/init.d/{h}", - f"/etc/sysctl.d/{h}.conf", - ] - ) - return paths - - def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]: - from .debian import read_pkg_md5sums - - out: Dict[str, str] = {} - conff = self._conffiles_by_pkg.get(pkg, {}) - md5sums = read_pkg_md5sums(pkg) - - for path in etc_paths: - if not path.startswith("/etc/"): - continue - if self.is_pkg_config_path(path): - continue - if path in conff: - try: - current = file_md5(path) - except OSError: - continue - if current != conff[path]: - out[path] = "modified_conffile" - continue - - rel = path.lstrip("/") - baseline = md5sums.get(rel) - if baseline: - try: - current = file_md5(path) - except OSError: - continue - if current != baseline: - out[path] = "modified_packaged_file" - return out - - -class RpmBackend(PackageBackend): - name = "rpm" - pkg_config_prefixes = ( - "/etc/dnf/", - "/etc/yum/", - "/etc/yum.repos.d/", - "/etc/yum.conf", - ) - - def __init__(self) -> None: - self._modified_cache: Dict[str, Set[str]] = {} - self._config_cache: Dict[str, Set[str]] = {} - - def owner_of_path(self, path: str) -> Optional[str]: - from .rpm import rpm_owner - - return rpm_owner(path) - - def list_manual_packages(self) -> List[str]: - from .rpm import list_manual_packages - - return list_manual_packages() - - def build_etc_index(self): - from .rpm import build_rpm_etc_index - - return build_rpm_etc_index() - - def specific_paths_for_hints(self, hints: Set[str]) -> List[str]: - paths: List[str] = [] - for h in hints: - paths.extend( - [ - f"/etc/sysconfig/{h}", - f"/etc/sysconfig/{h}.conf", - f"/etc/sysctl.d/{h}.conf", - ] - ) - return paths - - def _config_files(self, pkg: str) -> Set[str]: - if pkg in self._config_cache: - return self._config_cache[pkg] - from .rpm import rpm_config_files - - s = rpm_config_files(pkg) - self._config_cache[pkg] = s - return s - - def _modified_files(self, pkg: str) -> Set[str]: - if pkg in self._modified_cache: - return self._modified_cache[pkg] - from .rpm import rpm_modified_files - - s = rpm_modified_files(pkg) - self._modified_cache[pkg] = s - return s - - def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]: - out: Dict[str, str] = {} - modified = self._modified_files(pkg) - if not modified: - return out - config = self._config_files(pkg) - - for path in etc_paths: - if not path.startswith("/etc/"): - continue - if self.is_pkg_config_path(path): - continue - if path not in modified: - continue - out[path] = ( - "modified_conffile" if path in config else "modified_packaged_file" - ) - return out - - -def get_backend(info: Optional[PlatformInfo] = None) -> PackageBackend: - info = info or detect_platform() - if info.pkg_backend == "dpkg": - return DpkgBackend() - if info.pkg_backend == "rpm": - return RpmBackend() - # Unknown: be conservative and use an rpm backend if rpm exists, otherwise dpkg. - if shutil.which("rpm"): - return RpmBackend() - return DpkgBackend() diff --git a/enroll/rpm.py b/enroll/rpm.py deleted file mode 100644 index 947617c..0000000 --- a/enroll/rpm.py +++ /dev/null @@ -1,266 +0,0 @@ -from __future__ import annotations - -import os -import re -import shutil -import subprocess # nosec -from typing import Dict, List, Optional, Set, Tuple - - -def _run( - cmd: list[str], *, allow_fail: bool = False, merge_err: bool = False -) -> tuple[int, str]: - """Run a command and return (rc, stdout). - - If merge_err is True, stderr is merged into stdout to preserve ordering. - """ - p = subprocess.run( - cmd, - check=False, - text=True, - stdout=subprocess.PIPE, - stderr=(subprocess.STDOUT if merge_err else subprocess.PIPE), - ) # nosec - out = p.stdout or "" - if (not allow_fail) and p.returncode != 0: - err = "" if merge_err else (p.stderr or "") - raise RuntimeError(f"Command failed: {cmd}\n{err}{out}") - return p.returncode, out - - -def rpm_owner(path: str) -> Optional[str]: - """Return owning package name for a path, or None if unowned.""" - if not path: - return None - rc, out = _run( - ["rpm", "-qf", "--qf", "%{NAME}\n", path], allow_fail=True, merge_err=True - ) - if rc != 0: - return None - for line in out.splitlines(): - line = line.strip() - if not line: - continue - if "is not owned" in line: - return None - # With --qf we expect just the package name. - if re.match(r"^[A-Za-z0-9_.+:-]+$", line): - # Strip any accidental epoch/name-version-release output. - return line.split(":", 1)[-1].strip() if line else None - return None - - -_ARCH_SUFFIXES = { - "noarch", - "x86_64", - "i686", - "aarch64", - "armv7hl", - "ppc64le", - "s390x", - "riscv64", -} - - -def _strip_arch(token: str) -> str: - """Strip a trailing .ARCH from a yum/dnf package token.""" - t = token.strip() - if "." not in t: - return t - head, tail = t.rsplit(".", 1) - if tail in _ARCH_SUFFIXES: - return head - return t - - -def list_manual_packages() -> List[str]: - """Return packages considered "user-installed" on RPM-based systems. - - Best-effort: - 1) dnf repoquery --userinstalled - 2) dnf history userinstalled - 3) yum history userinstalled - - If none are available, returns an empty list. - """ - - def _dedupe(pkgs: List[str]) -> List[str]: - return sorted({p for p in (pkgs or []) if p}) - - if shutil.which("dnf"): - # Prefer a machine-friendly output. - for cmd in ( - ["dnf", "-q", "repoquery", "--userinstalled", "--qf", "%{name}\n"], - ["dnf", "-q", "repoquery", "--userinstalled"], - ): - rc, out = _run(cmd, allow_fail=True, merge_err=True) - if rc == 0 and out.strip(): - pkgs = [] - for line in out.splitlines(): - line = line.strip() - if not line or line.startswith("Loaded plugins"): - continue - pkgs.append(_strip_arch(line.split()[0])) - if pkgs: - return _dedupe(pkgs) - - # Fallback: human-oriented output. - rc, out = _run( - ["dnf", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True - ) - if rc == 0 and out.strip(): - pkgs = [] - for line in out.splitlines(): - line = line.strip() - if not line or line.startswith("Installed") or line.startswith("Last"): - continue - # Often: "vim-enhanced.x86_64" - tok = line.split()[0] - pkgs.append(_strip_arch(tok)) - if pkgs: - return _dedupe(pkgs) - - if shutil.which("yum"): - rc, out = _run( - ["yum", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True - ) - if rc == 0 and out.strip(): - pkgs = [] - for line in out.splitlines(): - line = line.strip() - if ( - not line - or line.startswith("Installed") - or line.startswith("Loaded") - ): - continue - tok = line.split()[0] - pkgs.append(_strip_arch(tok)) - if pkgs: - return _dedupe(pkgs) - - return [] - - -def _walk_etc_files() -> List[str]: - out: List[str] = [] - for dirpath, _, filenames in os.walk("/etc"): - for fn in filenames: - p = os.path.join(dirpath, fn) - if os.path.islink(p) or not os.path.isfile(p): - continue - out.append(p) - return out - - -def build_rpm_etc_index() -> ( - Tuple[Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]] -): - """Best-effort equivalent of build_dpkg_etc_index for RPM systems. - - This builds indexes by walking the live /etc tree and querying RPM ownership - for each file. - - Returns: - owned_etc_paths: set of /etc paths owned by rpm - etc_owner_map: /etc/path -> pkg - topdir_to_pkgs: "nginx" -> {"nginx", ...} based on /etc//... - pkg_to_etc_paths: pkg -> list of owned /etc paths - """ - - owned: Set[str] = set() - owner: Dict[str, str] = {} - topdir_to_pkgs: Dict[str, Set[str]] = {} - pkg_to_etc: Dict[str, List[str]] = {} - - paths = _walk_etc_files() - - # Query in chunks to avoid excessive process spawns. - chunk_size = 250 - - not_owned_re = re.compile( - r"^file\s+(?P.+?)\s+is\s+not\s+owned\s+by\s+any\s+package", re.IGNORECASE - ) - - for i in range(0, len(paths), chunk_size): - chunk = paths[i : i + chunk_size] - rc, out = _run( - ["rpm", "-qf", "--qf", "%{NAME}\n", *chunk], - allow_fail=True, - merge_err=True, - ) - - lines = [ln.strip() for ln in out.splitlines() if ln.strip()] - # Heuristic: rpm prints one output line per input path. If that isn't - # true (warnings/errors), fall back to per-file queries for this chunk. - if len(lines) != len(chunk): - for p in chunk: - pkg = rpm_owner(p) - if not pkg: - continue - owned.add(p) - owner.setdefault(p, pkg) - pkg_to_etc.setdefault(pkg, []).append(p) - parts = p.split("/", 3) - if len(parts) >= 3 and parts[2]: - topdir_to_pkgs.setdefault(parts[2], set()).add(pkg) - continue - - for pth, line in zip(chunk, lines): - if not line: - continue - if not_owned_re.match(line) or "is not owned" in line: - continue - pkg = line.split()[0].strip() - if not pkg: - continue - owned.add(pth) - owner.setdefault(pth, pkg) - pkg_to_etc.setdefault(pkg, []).append(pth) - parts = pth.split("/", 3) - if len(parts) >= 3 and parts[2]: - topdir_to_pkgs.setdefault(parts[2], set()).add(pkg) - - for k, v in list(pkg_to_etc.items()): - pkg_to_etc[k] = sorted(set(v)) - - return owned, owner, topdir_to_pkgs, pkg_to_etc - - -def rpm_config_files(pkg: str) -> Set[str]: - """Return config files for a package (rpm -qc).""" - rc, out = _run(["rpm", "-qc", pkg], allow_fail=True, merge_err=True) - if rc != 0: - return set() - files: Set[str] = set() - for line in out.splitlines(): - line = line.strip() - if line.startswith("/"): - files.add(line) - return files - - -def rpm_modified_files(pkg: str) -> Set[str]: - """Return files reported as modified by rpm verification (rpm -V). - - rpm -V only prints lines for differences/missing files. - """ - rc, out = _run(["rpm", "-V", pkg], allow_fail=True, merge_err=True) - # rc is non-zero when there are differences; we still want the output. - files: Set[str] = set() - for raw in out.splitlines(): - line = raw.strip() - if not line: - continue - # Typical forms: - # S.5....T. c /etc/foo.conf - # missing /etc/bar - m = re.search(r"\s(/\S+)$", line) - if m: - files.add(m.group(1)) - continue - if line.startswith("missing"): - parts = line.split() - if parts and parts[-1].startswith("/"): - files.add(parts[-1]) - return files diff --git a/enroll/version.py b/enroll/version.py deleted file mode 100644 index bbe78b6..0000000 --- a/enroll/version.py +++ /dev/null @@ -1,32 +0,0 @@ -from __future__ import annotations - - -def get_enroll_version() -> str: - """ - Best-effort version lookup that works when installed via: - - poetry/pip/wheel - - deb/rpm system packages - Falls back to "0+unknown" when running from an unpacked source tree. - """ - try: - from importlib.metadata import ( - packages_distributions, - version, - ) - except Exception: - # Very old Python or unusual environment - return "unknown" - - # Map import package -> dist(s) - dist_names = [] - try: - dist_names = (packages_distributions() or {}).get("enroll", []) or [] - except Exception: - dist_names = [] - - # Try mapped dists first, then a reasonable default - for dist in [*dist_names, "enroll"]: - try: - return version(dist) - except Exception: - return "unknown" diff --git a/tests/test_debian.py b/tests/test_debian.py index abad361..333afc1 100644 --- a/tests/test_debian.py +++ b/tests/test_debian.py @@ -1,5 +1,6 @@ from __future__ import annotations +import hashlib from pathlib import Path @@ -96,3 +97,58 @@ def test_parse_status_conffiles_handles_continuations(tmp_path: Path): assert m["nginx"]["/etc/nginx/nginx.conf"] == "abcdef" assert m["nginx"]["/etc/nginx/mime.types"] == "123456" assert "other" not in m + + +def test_read_pkg_md5sums_and_file_md5(tmp_path: Path, monkeypatch): + import enroll.debian as d + + # Patch /var/lib/dpkg/info/.md5sums lookup to a tmp file. + md5_file = tmp_path / "pkg.md5sums" + md5_file.write_text("0123456789abcdef etc/foo.conf\n", encoding="utf-8") + + def fake_exists(path: str) -> bool: + return path.endswith("/var/lib/dpkg/info/p1.md5sums") + + real_open = open + + def fake_open(path: str, *args, **kwargs): + if path.endswith("/var/lib/dpkg/info/p1.md5sums"): + return real_open(md5_file, *args, **kwargs) + return real_open(path, *args, **kwargs) + + monkeypatch.setattr(d.os.path, "exists", fake_exists) + monkeypatch.setattr("builtins.open", fake_open) + + m = d.read_pkg_md5sums("p1") + assert m == {"etc/foo.conf": "0123456789abcdef"} + + content = b"hello world\n" + p = tmp_path / "x" + p.write_bytes(content) + assert d.file_md5(str(p)) == hashlib.md5(content).hexdigest() + + +def test_stat_triplet_fallbacks(tmp_path: Path, monkeypatch): + import enroll.debian as d + import sys + + p = tmp_path / "f" + p.write_text("x", encoding="utf-8") + + class FakePwdMod: + @staticmethod + def getpwuid(_): # pragma: no cover + raise KeyError + + class FakeGrpMod: + @staticmethod + def getgrgid(_): # pragma: no cover + raise KeyError + + # stat_triplet imports pwd/grp inside the function, so patch sys.modules. + monkeypatch.setitem(sys.modules, "pwd", FakePwdMod) + monkeypatch.setitem(sys.modules, "grp", FakeGrpMod) + owner, group, mode = d.stat_triplet(str(p)) + assert owner.isdigit() + assert group.isdigit() + assert mode.isdigit() and len(mode) == 4 diff --git a/tests/test_fsutil.py b/tests/test_fsutil.py deleted file mode 100644 index ebe2224..0000000 --- a/tests/test_fsutil.py +++ /dev/null @@ -1,25 +0,0 @@ -from __future__ import annotations - -import hashlib -import os -from pathlib import Path - -from enroll.fsutil import file_md5, stat_triplet - - -def test_file_md5_matches_hashlib(tmp_path: Path): - p = tmp_path / "x" - p.write_bytes(b"hello world") - expected = hashlib.md5(b"hello world").hexdigest() # nosec - assert file_md5(str(p)) == expected - - -def test_stat_triplet_reports_mode(tmp_path: Path): - p = tmp_path / "x" - p.write_text("x", encoding="utf-8") - os.chmod(p, 0o600) - - owner, group, mode = stat_triplet(str(p)) - assert mode == "0600" - assert owner # non-empty string - assert group # non-empty string diff --git a/tests/test_harvest.py b/tests/test_harvest.py index a0d22ec..fa796f0 100644 --- a/tests/test_harvest.py +++ b/tests/test_harvest.py @@ -2,7 +2,6 @@ import json from pathlib import Path import enroll.harvest as h -from enroll.platform import PlatformInfo from enroll.systemd import UnitInfo @@ -11,64 +10,6 @@ class AllowAllPolicy: return None -class FakeBackend: - """Minimal backend stub for harvest tests. - - The real backends (dpkg/rpm) enumerate the live system (dpkg status, rpm - databases, etc). These tests instead control all backend behaviour. - """ - - def __init__( - self, - *, - name: str, - owned_etc: set[str], - etc_owner_map: dict[str, str], - topdir_to_pkgs: dict[str, set[str]], - pkg_to_etc_paths: dict[str, list[str]], - manual_pkgs: list[str], - owner_fn, - modified_by_pkg: dict[str, dict[str, str]] | None = None, - pkg_config_prefixes: tuple[str, ...] = ("/etc/apt/",), - ): - self.name = name - self.pkg_config_prefixes = pkg_config_prefixes - self._owned_etc = owned_etc - self._etc_owner_map = etc_owner_map - self._topdir_to_pkgs = topdir_to_pkgs - self._pkg_to_etc_paths = pkg_to_etc_paths - self._manual = manual_pkgs - self._owner_fn = owner_fn - self._modified_by_pkg = modified_by_pkg or {} - - def build_etc_index(self): - return ( - self._owned_etc, - self._etc_owner_map, - self._topdir_to_pkgs, - self._pkg_to_etc_paths, - ) - - def owner_of_path(self, path: str): - return self._owner_fn(path) - - def list_manual_packages(self): - return list(self._manual) - - def specific_paths_for_hints(self, hints: set[str]): - return [] - - def is_pkg_config_path(self, path: str) -> bool: - for pfx in self.pkg_config_prefixes: - if path == pfx or path.startswith(pfx): - return True - return False - - def modified_paths(self, pkg: str, etc_paths: list[str]): - # Test-controlled; ignore etc_paths. - return dict(self._modified_by_pkg.get(pkg, {})) - - def test_harvest_dedup_manual_packages_and_builds_etc_custom( monkeypatch, tmp_path: Path ): @@ -81,7 +22,7 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( real_exists = os.path.exists real_islink = os.path.islink - # Fake filesystem: two /etc files exist, only one is package-owned. + # Fake filesystem: two /etc files exist, only one is dpkg-owned. # Also include some /usr/local files to populate usr_local_custom. files = { "/etc/openvpn/server.conf": b"server", @@ -152,7 +93,6 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( # Avoid real system access monkeypatch.setattr(h, "list_enabled_services", lambda: ["openvpn.service"]) - monkeypatch.setattr(h, "list_enabled_timers", lambda: []) monkeypatch.setattr( h, "get_unit_info", @@ -169,30 +109,29 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( ), ) - # Package index: openvpn owns /etc/openvpn/server.conf; keyboard is unowned. - owned_etc = {"/etc/openvpn/server.conf"} - etc_owner_map = {"/etc/openvpn/server.conf": "openvpn"} - topdir_to_pkgs = {"openvpn": {"openvpn"}} - pkg_to_etc_paths = {"openvpn": ["/etc/openvpn/server.conf"], "curl": []} + # Debian package index: openvpn owns /etc/openvpn/server.conf; keyboard is unowned. + def fake_build_index(): + owned_etc = {"/etc/openvpn/server.conf"} + etc_owner_map = {"/etc/openvpn/server.conf": "openvpn"} + topdir_to_pkgs = {"openvpn": {"openvpn"}} + pkg_to_etc_paths = {"openvpn": ["/etc/openvpn/server.conf"], "curl": []} + return owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths - backend = FakeBackend( - name="dpkg", - owned_etc=owned_etc, - etc_owner_map=etc_owner_map, - topdir_to_pkgs=topdir_to_pkgs, - pkg_to_etc_paths=pkg_to_etc_paths, - manual_pkgs=["openvpn", "curl"], - owner_fn=lambda p: "openvpn" if "openvpn" in (p or "") else None, - modified_by_pkg={ - "openvpn": {"/etc/openvpn/server.conf": "modified_conffile"}, - }, + monkeypatch.setattr(h, "build_dpkg_etc_index", fake_build_index) + + # openvpn conffile hash mismatch => should be captured under service role + monkeypatch.setattr( + h, + "parse_status_conffiles", + lambda: {"openvpn": {"/etc/openvpn/server.conf": "old"}}, ) + monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {}) + monkeypatch.setattr(h, "file_md5", lambda path: "new") monkeypatch.setattr( - h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {}) + h, "dpkg_owner", lambda p: "openvpn" if "openvpn" in p else None ) - monkeypatch.setattr(h, "get_backend", lambda info=None: backend) - + monkeypatch.setattr(h, "list_manual_packages", lambda: ["openvpn", "curl"]) monkeypatch.setattr(h, "collect_non_system_users", lambda: []) def fake_stat_triplet(p: str): @@ -268,7 +207,6 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic( monkeypatch.setattr( h, "list_enabled_services", lambda: ["apparmor.service", "ntpsec.service"] ) - monkeypatch.setattr(h, "list_enabled_timers", lambda: []) def fake_unit_info(unit: str) -> UnitInfo: if unit == "apparmor.service": @@ -297,35 +235,31 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic( monkeypatch.setattr(h, "get_unit_info", fake_unit_info) + # Dpkg /etc index: no owned /etc paths needed for this test. + monkeypatch.setattr( + h, + "build_dpkg_etc_index", + lambda: (set(), {}, {}, {}), + ) + monkeypatch.setattr(h, "parse_status_conffiles", lambda: {}) + monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {}) + monkeypatch.setattr(h, "file_md5", lambda path: "x") + monkeypatch.setattr(h, "list_manual_packages", lambda: []) + monkeypatch.setattr(h, "collect_non_system_users", lambda: []) + # Make apparmor *also* claim the ntpsec package (simulates overly-broad # package inference). The snippet routing should still prefer role 'ntpsec'. - def fake_owner(p: str): + def fake_dpkg_owner(p: str): if p == "/etc/cron.d/ntpsec": return "ntpsec" - if "apparmor" in (p or ""): + if "apparmor" in p: return "ntpsec" # intentionally misleading - if "ntpsec" in (p or "") or "ntpd" in (p or ""): + if "ntpsec" in p or "ntpd" in p: return "ntpsec" return None - backend = FakeBackend( - name="dpkg", - owned_etc=set(), - etc_owner_map={}, - topdir_to_pkgs={}, - pkg_to_etc_paths={}, - manual_pkgs=[], - owner_fn=fake_owner, - modified_by_pkg={}, - ) - - monkeypatch.setattr( - h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {}) - ) - monkeypatch.setattr(h, "get_backend", lambda info=None: backend) - + monkeypatch.setattr(h, "dpkg_owner", fake_dpkg_owner) monkeypatch.setattr(h, "stat_triplet", lambda p: ("root", "root", "0644")) - monkeypatch.setattr(h, "collect_non_system_users", lambda: []) def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str): dst = Path(bundle_dir) / "artifacts" / role_name / src_rel @@ -334,7 +268,11 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic( monkeypatch.setattr(h, "_copy_into_bundle", fake_copy) - state_path = h.harvest(str(bundle), policy=AllowAllPolicy()) + class AllowAll: + def deny_reason(self, path: str): + return None + + state_path = h.harvest(str(bundle), policy=AllowAll()) st = json.loads(Path(state_path).read_text(encoding="utf-8")) # Cron snippet should end up attached to the ntpsec role, not apparmor. diff --git a/tests/test_manifest.py b/tests/test_manifest.py index cbfc208..92c3dfc 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -322,96 +322,3 @@ def test_copy2_replace_overwrites_readonly_destination(tmp_path: Path): assert dst.read_text(encoding="utf-8") == "new" mode = stat.S_IMODE(dst.stat().st_mode) assert mode & stat.S_IWUSR # destination should remain mergeable - - -def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path): - bundle = tmp_path / "bundle" - out = tmp_path / "ansible" - - # Create a dnf_config artifact. - (bundle / "artifacts" / "dnf_config" / "etc" / "dnf").mkdir( - parents=True, exist_ok=True - ) - (bundle / "artifacts" / "dnf_config" / "etc" / "dnf" / "dnf.conf").write_text( - "[main]\n", encoding="utf-8" - ) - - state = { - "host": {"hostname": "test", "os": "redhat", "pkg_backend": "rpm"}, - "users": { - "role_name": "users", - "users": [], - "managed_files": [], - "excluded": [], - "notes": [], - }, - "services": [], - "package_roles": [], - "manual_packages": [], - "manual_packages_skipped": [], - "apt_config": { - "role_name": "apt_config", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "dnf_config": { - "role_name": "dnf_config", - "managed_files": [ - { - "path": "/etc/dnf/dnf.conf", - "src_rel": "etc/dnf/dnf.conf", - "owner": "root", - "group": "root", - "mode": "0644", - "reason": "dnf_config", - } - ], - "excluded": [], - "notes": [], - }, - "etc_custom": { - "role_name": "etc_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "usr_local_custom": { - "role_name": "usr_local_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "extra_paths": { - "role_name": "extra_paths", - "include_patterns": [], - "exclude_patterns": [], - "managed_files": [], - "excluded": [], - "notes": [], - }, - } - - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") - - manifest(str(bundle), str(out)) - - pb = (out / "playbook.yml").read_text(encoding="utf-8") - assert "- dnf_config" in pb - - tasks = (out / "roles" / "dnf_config" / "tasks" / "main.yml").read_text( - encoding="utf-8" - ) - # Ensure the role exists and contains some file deployment logic. - assert "Deploy any other managed files" in tasks - - -def test_render_install_packages_tasks_contains_dnf_branch(): - from enroll.manifest import _render_install_packages_tasks - - txt = _render_install_packages_tasks("role", "role") - assert "ansible.builtin.apt" in txt - assert "ansible.builtin.dnf" in txt - assert "ansible.builtin.package" in txt - assert "pkg_mgr" in txt diff --git a/tests/test_platform.py b/tests/test_platform.py deleted file mode 100644 index 7ff66c6..0000000 --- a/tests/test_platform.py +++ /dev/null @@ -1,93 +0,0 @@ -from __future__ import annotations - -from pathlib import Path - -import enroll.platform as platform - - -def test_read_os_release_parses_kv_and_strips_quotes(tmp_path: Path): - p = tmp_path / "os-release" - p.write_text( - """ -# comment -ID=fedora -ID_LIKE=\"rhel centos\" -NAME=\"Fedora Linux\" -EMPTY= -NOEQUALS -""", - encoding="utf-8", - ) - - osr = platform._read_os_release(str(p)) - assert osr["ID"] == "fedora" - assert osr["ID_LIKE"] == "rhel centos" - assert osr["NAME"] == "Fedora Linux" - assert osr["EMPTY"] == "" - assert "NOEQUALS" not in osr - - -def test_detect_platform_prefers_os_release(monkeypatch): - monkeypatch.setattr( - platform, - "_read_os_release", - lambda path="/etc/os-release": {"ID": "fedora", "ID_LIKE": "rhel"}, - ) - # If os-release is decisive we shouldn't need which() - monkeypatch.setattr(platform.shutil, "which", lambda exe: None) - - info = platform.detect_platform() - assert info.os_family == "redhat" - assert info.pkg_backend == "rpm" - - -def test_detect_platform_fallbacks_to_dpkg_when_unknown(monkeypatch): - monkeypatch.setattr(platform, "_read_os_release", lambda path="/etc/os-release": {}) - monkeypatch.setattr( - platform.shutil, "which", lambda exe: "/usr/bin/dpkg" if exe == "dpkg" else None - ) - - info = platform.detect_platform() - assert info.os_family == "debian" - assert info.pkg_backend == "dpkg" - - -def test_get_backend_unknown_prefers_rpm_if_present(monkeypatch): - monkeypatch.setattr( - platform.shutil, "which", lambda exe: "/usr/bin/rpm" if exe == "rpm" else None - ) - - b = platform.get_backend( - platform.PlatformInfo(os_family="unknown", pkg_backend="unknown", os_release={}) - ) - assert isinstance(b, platform.RpmBackend) - - -def test_rpm_backend_modified_paths_labels_conffiles(monkeypatch): - b = platform.RpmBackend() - - # Pretend rpm -V says both files changed, but only one is a config file. - monkeypatch.setattr(b, "_modified_files", lambda pkg: {"/etc/foo.conf", "/etc/bar"}) - monkeypatch.setattr(b, "_config_files", lambda pkg: {"/etc/foo.conf"}) - - out = b.modified_paths("mypkg", ["/etc/foo.conf", "/etc/bar", "/etc/dnf/dnf.conf"]) - assert out["/etc/foo.conf"] == "modified_conffile" - assert out["/etc/bar"] == "modified_packaged_file" - # Package-manager config paths are excluded. - assert "/etc/dnf/dnf.conf" not in out - - -def test_specific_paths_for_hints_differs_between_backends(): - # We can exercise this without instantiating DpkgBackend (which reads dpkg status) - class Dummy(platform.PackageBackend): - name = "dummy" - pkg_config_prefixes = ("/etc/apt/",) - - d = Dummy() - assert d.is_pkg_config_path("/etc/apt/sources.list") - assert not d.is_pkg_config_path("/etc/ssh/sshd_config") - - r = platform.RpmBackend() - paths = set(r.specific_paths_for_hints({"nginx"})) - assert "/etc/sysconfig/nginx" in paths - assert "/etc/sysconfig/nginx.conf" in paths diff --git a/tests/test_rpm.py b/tests/test_rpm.py deleted file mode 100644 index ea97c12..0000000 --- a/tests/test_rpm.py +++ /dev/null @@ -1,131 +0,0 @@ -from __future__ import annotations - -import enroll.rpm as rpm - - -def test_rpm_owner_returns_none_when_unowned(monkeypatch): - monkeypatch.setattr( - rpm, - "_run", - lambda cmd, allow_fail=False, merge_err=False: ( - 1, - "file /etc/x is not owned by any package\n", - ), - ) - assert rpm.rpm_owner("/etc/x") is None - - -def test_rpm_owner_parses_name(monkeypatch): - monkeypatch.setattr( - rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (0, "bash\n") - ) - assert rpm.rpm_owner("/bin/bash") == "bash" - - -def test_strip_arch_strips_known_arches(): - assert rpm._strip_arch("vim-enhanced.x86_64") == "vim-enhanced" - assert rpm._strip_arch("foo.noarch") == "foo" - assert rpm._strip_arch("weird.token") == "weird.token" - - -def test_list_manual_packages_prefers_dnf_repoquery(monkeypatch): - monkeypatch.setattr( - rpm.shutil, "which", lambda exe: "/usr/bin/dnf" if exe == "dnf" else None - ) - - def fake_run(cmd, allow_fail=False, merge_err=False): - # First repoquery form returns usable output. - if cmd[:3] == ["dnf", "-q", "repoquery"]: - return 0, "vim-enhanced.x86_64\nhtop\nvim-enhanced.x86_64\n" - raise AssertionError(f"unexpected cmd: {cmd}") - - monkeypatch.setattr(rpm, "_run", fake_run) - - pkgs = rpm.list_manual_packages() - assert pkgs == ["htop", "vim-enhanced"] - - -def test_list_manual_packages_falls_back_to_history(monkeypatch): - monkeypatch.setattr( - rpm.shutil, "which", lambda exe: "/usr/bin/dnf" if exe == "dnf" else None - ) - - def fake_run(cmd, allow_fail=False, merge_err=False): - # repoquery fails - if cmd[:3] == ["dnf", "-q", "repoquery"]: - return 1, "" - if cmd[:3] == ["dnf", "-q", "history"]: - return ( - 0, - "Installed Packages\nvim-enhanced.x86_64\nLast metadata expiration check: 0:01:00 ago\n", - ) - raise AssertionError(f"unexpected cmd: {cmd}") - - monkeypatch.setattr(rpm, "_run", fake_run) - - pkgs = rpm.list_manual_packages() - assert pkgs == ["vim-enhanced"] - - -def test_build_rpm_etc_index_uses_fallback_when_rpm_output_mismatches(monkeypatch): - # Two files in /etc, one owned, one unowned. - monkeypatch.setattr( - rpm, "_walk_etc_files", lambda: ["/etc/owned.conf", "/etc/unowned.conf"] - ) - - # Simulate chunk query producing unexpected extra line (mismatch) -> triggers per-file fallback. - monkeypatch.setattr( - rpm, - "_run", - lambda cmd, allow_fail=False, merge_err=False: (0, "ownedpkg\nEXTRA\nTHIRD\n"), - ) - monkeypatch.setattr( - rpm, "rpm_owner", lambda p: "ownedpkg" if p == "/etc/owned.conf" else None - ) - - owned, owner_map, topdir_to_pkgs, pkg_to_etc = rpm.build_rpm_etc_index() - - assert owned == {"/etc/owned.conf"} - assert owner_map["/etc/owned.conf"] == "ownedpkg" - assert "owned.conf" in topdir_to_pkgs - assert pkg_to_etc["ownedpkg"] == ["/etc/owned.conf"] - - -def test_build_rpm_etc_index_parses_chunk_output(monkeypatch): - monkeypatch.setattr( - rpm, "_walk_etc_files", lambda: ["/etc/ssh/sshd_config", "/etc/notowned"] - ) - - def fake_run(cmd, allow_fail=False, merge_err=False): - # One output line per input path. - return 0, "openssh-server\nfile /etc/notowned is not owned by any package\n" - - monkeypatch.setattr(rpm, "_run", fake_run) - - owned, owner_map, topdir_to_pkgs, pkg_to_etc = rpm.build_rpm_etc_index() - - assert "/etc/ssh/sshd_config" in owned - assert "/etc/notowned" not in owned - assert owner_map["/etc/ssh/sshd_config"] == "openssh-server" - assert "ssh" in topdir_to_pkgs - assert "openssh-server" in topdir_to_pkgs["ssh"] - assert pkg_to_etc["openssh-server"] == ["/etc/ssh/sshd_config"] - - -def test_rpm_config_files_and_modified_files_parsing(monkeypatch): - monkeypatch.setattr( - rpm, - "_run", - lambda cmd, allow_fail=False, merge_err=False: ( - 0, - "/etc/foo.conf\n/usr/bin/tool\n", - ), - ) - assert rpm.rpm_config_files("mypkg") == {"/etc/foo.conf", "/usr/bin/tool"} - - # rpm -V returns only changed/missing files - out = "S.5....T. c /etc/foo.conf\nmissing /etc/bar\n" - monkeypatch.setattr( - rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (1, out) - ) - assert rpm.rpm_modified_files("mypkg") == {"/etc/foo.conf", "/etc/bar"}