diff --git a/CHANGELOG.md b/CHANGELOG.md
index f2cb109..f92e0b7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+# 0.2.0
+
+ * Add version CLI arg
+ * Add ability to enroll RH-style systems (DNF5/DNF/RPM)
+
# 0.1.7
* Fix an attribution bug for certain files ending up in the wrong package/role.
diff --git a/README.md b/README.md
index c6b8123..d075951 100644
--- a/README.md
+++ b/README.md
@@ -4,15 +4,15 @@
-**enroll** inspects a Linux machine (currently Debian-only) and generates Ansible roles/playbooks (and optionally inventory) for what it finds.
+**enroll** inspects a Linux machine (Debian-like or RedHat-like) and generates Ansible roles/playbooks (and optionally inventory) for what it finds.
- Detects packages that have been installed.
-- Detects Debian package ownership of `/etc` files using dpkg’s local database.
-- Captures config that has **changed from packaged defaults** (dpkg conffile hashes + package md5sums when available).
+- Detects package ownership of `/etc` files where possible
+- Captures config that has **changed from packaged defaults** where possible (e.g dpkg conffile hashes + package md5sums when available).
- Also captures **service-relevant custom/unowned files** under `/etc//...` (e.g. drop-in config includes).
- Defensively excludes likely secrets (path denylist + content sniff + size caps).
- Captures non-system users and their SSH public keys.
-- Captures miscellaneous `/etc` files it can’t attribute to a package and installs them in an `etc_custom` role.
+- Captures miscellaneous `/etc` files it can't attribute to a package and installs them in an `etc_custom` role.
- Ditto for /usr/local/bin (for non-binary files) and /usr/local/etc
- Avoids trying to start systemd services that were detected as inactive during harvest.
@@ -41,8 +41,8 @@ Use when enrolling **one server** (or generating a “golden” role set you int
**Characteristics**
- Roles are more self-contained.
-- Raw config files live in the role’s `files/`.
-- Template variables live in the role’s `defaults/main.yml`.
+- Raw config files live in the role's `files/`.
+- Template variables live in the role's `defaults/main.yml`.
### Multi-site mode (`--fqdn`)
Use when enrolling **several existing servers** quickly, especially if they differ.
@@ -68,13 +68,13 @@ Harvest state about a host and write a harvest bundle.
- “Manual” packages
- Changed-from-default config (plus related custom/unowned files under service dirs)
- Non-system users + SSH public keys
-- Misc `/etc` that can’t be attributed to a package (`etc_custom` role)
+- Misc `/etc` that can't be attributed to a package (`etc_custom` role)
- Optional user-specified extra files/dirs via `--include-path` (emitted as an `extra_paths` role at manifest time)
**Common flags**
- Remote harvesting:
- `--remote-host`, `--remote-user`, `--remote-port`
- - `--no-sudo` (if you don’t want/need sudo)
+ - `--no-sudo` (if you don't want/need sudo)
- Sensitive-data behaviour:
- default: tries to avoid likely secrets
- `--dangerous`: disables secret-safety checks (see “Sensitive data” below)
@@ -233,7 +233,7 @@ poetry run enroll --help
## Found a bug / have a suggestion?
-My Forgejo doesn’t currently support federation, so I haven’t opened registration/login for issues.
+My Forgejo doesn't currently support federation, so I haven't opened registration/login for issues.
Instead, email me (see `pyproject.toml`) or contact me on the Fediverse:
diff --git a/enroll/cli.py b/enroll/cli.py
index ae9aba0..bb4d3f1 100644
--- a/enroll/cli.py
+++ b/enroll/cli.py
@@ -15,6 +15,7 @@ from .harvest import harvest
from .manifest import manifest
from .remote import remote_harvest
from .sopsutil import SopsError, encrypt_file_binary
+from .version import get_enroll_version
def _discover_config_path(argv: list[str]) -> Optional[Path]:
@@ -318,13 +319,6 @@ def _jt_mode(args: argparse.Namespace) -> str:
return "auto"
-def _add_remote_args(p: argparse.ArgumentParser) -> None:
- p.add_argument(
- "--remote-host",
- help="SSH host to run harvesting on (if set, harvest runs remotely and is pulled locally).",
- )
-
-
def _add_config_args(p: argparse.ArgumentParser) -> None:
p.add_argument(
"-c",
@@ -339,6 +333,13 @@ def _add_config_args(p: argparse.ArgumentParser) -> None:
action="store_true",
help="Do not load any INI config file (even if one would be auto-discovered).",
)
+
+
+def _add_remote_args(p: argparse.ArgumentParser) -> None:
+ p.add_argument(
+ "--remote-host",
+ help="SSH host to run harvesting on (if set, harvest runs remotely and is pulled locally).",
+ )
p.add_argument(
"--remote-port",
type=int,
@@ -354,11 +355,18 @@ def _add_config_args(p: argparse.ArgumentParser) -> None:
def main() -> None:
ap = argparse.ArgumentParser(prog="enroll")
+ ap.add_argument(
+ "-v",
+ "--version",
+ action="version",
+ version=f"{get_enroll_version()}",
+ )
_add_config_args(ap)
sub = ap.add_subparsers(dest="cmd", required=True)
h = sub.add_parser("harvest", help="Harvest service/package/config state")
_add_config_args(h)
+ _add_remote_args(h)
h.add_argument(
"--out",
help=(
@@ -406,7 +414,6 @@ def main() -> None:
action="store_true",
help="Don't use sudo on the remote host (when using --remote options). This may result in a limited harvest due to permission restrictions.",
)
- _add_remote_args(h)
m = sub.add_parser("manifest", help="Render Ansible roles from a harvest")
_add_config_args(m)
@@ -443,6 +450,7 @@ def main() -> None:
"single-shot", help="Harvest state, then manifest Ansible code, in one shot"
)
_add_config_args(s)
+ _add_remote_args(s)
s.add_argument(
"--harvest",
help=(
@@ -500,7 +508,6 @@ def main() -> None:
),
)
_add_common_manifest_args(s)
- _add_remote_args(s)
d = sub.add_parser("diff", help="Compare two harvests and report differences")
_add_config_args(d)
@@ -602,14 +609,12 @@ def main() -> None:
)
args = ap.parse_args(argv)
- remote_host: Optional[str] = getattr(args, "remote_host", None)
-
try:
if args.cmd == "harvest":
sops_fps = getattr(args, "sops", None)
- if remote_host:
+ if args.remote_host:
if sops_fps:
- out_file = _resolve_sops_out_file(args.out, hint=remote_host)
+ out_file = _resolve_sops_out_file(args.out, hint=args.remote_host)
with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td:
tmp_bundle = Path(td) / "bundle"
tmp_bundle.mkdir(parents=True, exist_ok=True)
@@ -619,7 +624,7 @@ def main() -> None:
pass
remote_harvest(
local_out_dir=tmp_bundle,
- remote_host=remote_host,
+ remote_host=args.remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
@@ -635,11 +640,11 @@ def main() -> None:
out_dir = (
Path(args.out)
if args.out
- else new_harvest_cache_dir(hint=remote_host).dir
+ else new_harvest_cache_dir(hint=args.remote_host).dir
)
state = remote_harvest(
local_out_dir=out_dir,
- remote_host=remote_host,
+ remote_host=args.remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
@@ -669,12 +674,16 @@ def main() -> None:
)
print(str(out_file))
else:
- if not args.out:
- raise SystemExit(
- "error: --out is required unless --remote-host is set"
+ if args.out:
+ out_dir = args.out
+ else:
+ out_dir = (
+ Path(args.out)
+ if args.out
+ else new_harvest_cache_dir(hint=args.remote_host).dir
)
path = harvest(
- args.out,
+ out_dir,
dangerous=bool(args.dangerous),
include_paths=list(getattr(args, "include_path", []) or []),
exclude_paths=list(getattr(args, "exclude_path", []) or []),
@@ -747,9 +756,11 @@ def main() -> None:
raise SystemExit(2)
elif args.cmd == "single-shot":
sops_fps = getattr(args, "sops", None)
- if remote_host:
+ if args.remote_host:
if sops_fps:
- out_file = _resolve_sops_out_file(args.harvest, hint=remote_host)
+ out_file = _resolve_sops_out_file(
+ args.harvest, hint=args.remote_host
+ )
with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td:
tmp_bundle = Path(td) / "bundle"
tmp_bundle.mkdir(parents=True, exist_ok=True)
@@ -759,7 +770,7 @@ def main() -> None:
pass
remote_harvest(
local_out_dir=tmp_bundle,
- remote_host=remote_host,
+ remote_host=args.remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
@@ -784,11 +795,11 @@ def main() -> None:
harvest_dir = (
Path(args.harvest)
if args.harvest
- else new_harvest_cache_dir(hint=remote_host).dir
+ else new_harvest_cache_dir(hint=args.remote_host).dir
)
remote_harvest(
local_out_dir=harvest_dir,
- remote_host=remote_host,
+ remote_host=args.remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
diff --git a/enroll/debian.py b/enroll/debian.py
index 0ddc1f3..7e1ee2d 100644
--- a/enroll/debian.py
+++ b/enroll/debian.py
@@ -1,7 +1,6 @@
from __future__ import annotations
import glob
-import hashlib
import os
import subprocess # nosec
from typing import Dict, List, Optional, Set, Tuple
@@ -180,28 +179,3 @@ def read_pkg_md5sums(pkg: str) -> Dict[str, str]:
md5, rel = line.split(None, 1)
m[rel.strip()] = md5.strip()
return m
-
-
-def file_md5(path: str) -> str:
- h = hashlib.md5() # nosec
- with open(path, "rb") as f:
- for chunk in iter(lambda: f.read(1024 * 1024), b""):
- h.update(chunk)
- return h.hexdigest()
-
-
-def stat_triplet(path: str) -> Tuple[str, str, str]:
- st = os.stat(path, follow_symlinks=True)
- mode = oct(st.st_mode & 0o777)[2:].zfill(4)
-
- import pwd, grp
-
- try:
- owner = pwd.getpwuid(st.st_uid).pw_name
- except KeyError:
- owner = str(st.st_uid)
- try:
- group = grp.getgrgid(st.st_gid).gr_name
- except KeyError:
- group = str(st.st_gid)
- return owner, group, mode
diff --git a/enroll/fsutil.py b/enroll/fsutil.py
new file mode 100644
index 0000000..3d18df6
--- /dev/null
+++ b/enroll/fsutil.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+import hashlib
+import os
+from typing import Tuple
+
+
+def file_md5(path: str) -> str:
+ """Return hex MD5 of a file.
+
+ Used for Debian dpkg baseline comparisons.
+ """
+ h = hashlib.md5() # nosec
+ with open(path, "rb") as f:
+ for chunk in iter(lambda: f.read(1024 * 1024), b""):
+ h.update(chunk)
+ return h.hexdigest()
+
+
+def stat_triplet(path: str) -> Tuple[str, str, str]:
+ """Return (owner, group, mode) for a path.
+
+ owner/group are usernames/group names when resolvable, otherwise numeric ids.
+ mode is a zero-padded octal string (e.g. "0644").
+ """
+ st = os.stat(path, follow_symlinks=True)
+ mode = oct(st.st_mode & 0o777)[2:].zfill(4)
+
+ import grp
+ import pwd
+
+ try:
+ owner = pwd.getpwuid(st.st_uid).pw_name
+ except KeyError:
+ owner = str(st.st_uid)
+ try:
+ group = grp.getgrgid(st.st_gid).gr_name
+ except KeyError:
+ group = str(st.st_gid)
+ return owner, group, mode
diff --git a/enroll/harvest.py b/enroll/harvest.py
index d678b89..bb706b1 100644
--- a/enroll/harvest.py
+++ b/enroll/harvest.py
@@ -15,18 +15,12 @@ from .systemd import (
get_timer_info,
UnitQueryError,
)
-from .debian import (
- build_dpkg_etc_index,
- dpkg_owner,
- file_md5,
- list_manual_packages,
- parse_status_conffiles,
- read_pkg_md5sums,
- stat_triplet,
-)
+from .fsutil import stat_triplet
+from .platform import detect_platform, get_backend
from .ignore import IgnorePolicy
from .pathfilter import PathFilter, expand_includes
from .accounts import collect_non_system_users
+from .version import get_enroll_version
@dataclass
@@ -85,6 +79,14 @@ class AptConfigSnapshot:
notes: List[str]
+@dataclass
+class DnfConfigSnapshot:
+ role_name: str
+ managed_files: List[ManagedFile]
+ excluded: List[ExcludedFile]
+ notes: List[str]
+
+
@dataclass
class EtcCustomSnapshot:
role_name: str
@@ -158,6 +160,13 @@ SHARED_ETC_TOPDIRS = {
"sudoers.d",
"sysctl.d",
"systemd",
+ # RPM-family shared trees
+ "dnf",
+ "yum",
+ "yum.repos.d",
+ "sysconfig",
+ "pki",
+ "firewalld",
}
@@ -314,17 +323,23 @@ def _add_pkgs_from_etc_topdirs(
pkgs.add(p)
-def _maybe_add_specific_paths(hints: Set[str]) -> List[str]:
- paths: List[str] = []
- for h in hints:
- paths.extend(
- [
- f"/etc/default/{h}",
- f"/etc/init.d/{h}",
- f"/etc/sysctl.d/{h}.conf",
- ]
- )
- return paths
+def _maybe_add_specific_paths(hints: Set[str], backend) -> List[str]:
+ # Delegate to backend-specific conventions (e.g. /etc/default on Debian,
+ # /etc/sysconfig on Fedora/RHEL). Always include sysctl.d.
+ try:
+ return backend.specific_paths_for_hints(hints)
+ except Exception:
+ # Best-effort fallback (Debian-ish).
+ paths: List[str] = []
+ for h in hints:
+ paths.extend(
+ [
+ f"/etc/default/{h}",
+ f"/etc/init.d/{h}",
+ f"/etc/sysctl.d/{h}.conf",
+ ]
+ )
+ return paths
def _scan_unowned_under_roots(
@@ -408,6 +423,7 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
("/etc/anacron/*", "system_cron"),
("/var/spool/cron/crontabs/*", "system_cron"),
("/var/spool/crontabs/*", "system_cron"),
+ ("/var/spool/cron/*", "system_cron"),
# network
("/etc/netplan/*", "system_network"),
("/etc/systemd/network/*", "system_network"),
@@ -415,6 +431,9 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
("/etc/network/interfaces.d/*", "system_network"),
("/etc/resolvconf.conf", "system_network"),
("/etc/resolvconf/resolv.conf.d/*", "system_network"),
+ ("/etc/NetworkManager/system-connections/*", "system_network"),
+ ("/etc/sysconfig/network*", "system_network"),
+ ("/etc/sysconfig/network-scripts/*", "system_network"),
# firewall
("/etc/nftables.conf", "system_firewall"),
("/etc/nftables.d/*", "system_firewall"),
@@ -422,6 +441,10 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
("/etc/iptables/rules.v6", "system_firewall"),
("/etc/ufw/*", "system_firewall"),
("/etc/default/ufw", "system_firewall"),
+ ("/etc/firewalld/*", "system_firewall"),
+ ("/etc/firewalld/zones/*", "system_firewall"),
+ # SELinux
+ ("/etc/selinux/config", "system_security"),
# other
("/etc/rc.local", "system_rc"),
]
@@ -553,6 +576,51 @@ def _iter_apt_capture_paths() -> List[tuple[str, str]]:
return uniq
+def _iter_dnf_capture_paths() -> List[tuple[str, str]]:
+ """Return (path, reason) pairs for DNF/YUM configuration on RPM systems.
+
+ Captures:
+ - /etc/dnf/* (dnf.conf, vars, plugins, modules, automatic)
+ - /etc/yum.conf (legacy)
+ - /etc/yum.repos.d/*.repo
+ - /etc/pki/rpm-gpg/* (GPG key files)
+ """
+ reasons: Dict[str, str] = {}
+
+ for root, tag in (
+ ("/etc/dnf", "dnf_config"),
+ ("/etc/yum", "yum_config"),
+ ):
+ if os.path.isdir(root):
+ for dirpath, _, filenames in os.walk(root):
+ for fn in filenames:
+ p = os.path.join(dirpath, fn)
+ if os.path.islink(p) or not os.path.isfile(p):
+ continue
+ reasons.setdefault(p, tag)
+
+ # Legacy yum.conf.
+ if os.path.isfile("/etc/yum.conf") and not os.path.islink("/etc/yum.conf"):
+ reasons.setdefault("/etc/yum.conf", "yum_conf")
+
+ # Repositories.
+ if os.path.isdir("/etc/yum.repos.d"):
+ for p in _iter_matching_files("/etc/yum.repos.d/*.repo"):
+ reasons[p] = "yum_repo"
+
+ # RPM GPG keys.
+ if os.path.isdir("/etc/pki/rpm-gpg"):
+ for dirpath, _, filenames in os.walk("/etc/pki/rpm-gpg"):
+ for fn in filenames:
+ p = os.path.join(dirpath, fn)
+ if os.path.islink(p) or not os.path.isfile(p):
+ continue
+ reasons.setdefault(p, "rpm_gpg_key")
+
+ # Stable ordering.
+ return [(p, reasons[p]) for p in sorted(reasons.keys())]
+
+
def _iter_system_capture_paths() -> List[tuple[str, str]]:
"""Return (path, reason) pairs for essential system config/state (non-APT)."""
out: List[tuple[str, str]] = []
@@ -600,8 +668,12 @@ def harvest(
flush=True,
)
- owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths = build_dpkg_etc_index()
- conffiles_by_pkg = parse_status_conffiles()
+ platform = detect_platform()
+ backend = get_backend(platform)
+
+ owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths = (
+ backend.build_etc_index()
+ )
# -------------------------
# Service roles
@@ -645,12 +717,12 @@ def harvest(
candidates: Dict[str, str] = {}
if ui.fragment_path:
- p = dpkg_owner(ui.fragment_path)
+ p = backend.owner_of_path(ui.fragment_path)
if p:
pkgs.add(p)
for exe in ui.exec_paths:
- p = dpkg_owner(exe)
+ p = backend.owner_of_path(exe)
if p:
pkgs.add(p)
@@ -675,7 +747,7 @@ def harvest(
# logrotate.d entries) can still be attributed back to this service.
service_role_aliases[role] = set(hints) | set(pkgs) | {role}
- for sp in _maybe_add_specific_paths(hints):
+ for sp in _maybe_add_specific_paths(hints, backend):
if not os.path.exists(sp):
continue
if sp in etc_owner_map:
@@ -684,31 +756,13 @@ def harvest(
candidates.setdefault(sp, "custom_specific_path")
for pkg in sorted(pkgs):
- conff = conffiles_by_pkg.get(pkg, {})
- md5sums = read_pkg_md5sums(pkg)
- for path in pkg_to_etc_paths.get(pkg, []):
+ etc_paths = pkg_to_etc_paths.get(pkg, [])
+ for path, reason in backend.modified_paths(pkg, etc_paths).items():
if not os.path.isfile(path) or os.path.islink(path):
continue
- if path.startswith("/etc/apt/"):
+ if backend.is_pkg_config_path(path):
continue
- if path in conff:
- # Only capture conffiles when they differ from the package default.
- try:
- current = file_md5(path)
- except OSError:
- continue
- if current != conff[path]:
- candidates.setdefault(path, "modified_conffile")
- continue
- rel = path.lstrip("/")
- baseline = md5sums.get(rel)
- if baseline:
- try:
- current = file_md5(path)
- except OSError:
- continue
- if current != baseline:
- candidates.setdefault(path, "modified_packaged_file")
+ candidates.setdefault(path, reason)
# Capture custom/unowned files living under /etc/ for this service.
#
@@ -847,18 +901,18 @@ def harvest(
# (useful when a timer triggers a service that isn't enabled).
pkgs: Set[str] = set()
if ti.fragment_path:
- p = dpkg_owner(ti.fragment_path)
+ p = backend.owner_of_path(ti.fragment_path)
if p:
pkgs.add(p)
if ti.trigger_unit and ti.trigger_unit.endswith(".service"):
try:
ui = get_unit_info(ti.trigger_unit)
if ui.fragment_path:
- p = dpkg_owner(ui.fragment_path)
+ p = backend.owner_of_path(ui.fragment_path)
if p:
pkgs.add(p)
for exe in ui.exec_paths:
- p = dpkg_owner(exe)
+ p = backend.owner_of_path(exe)
if p:
pkgs.add(p)
except Exception: # nosec
@@ -870,7 +924,7 @@ def harvest(
# -------------------------
# Manually installed package roles
# -------------------------
- manual_pkgs = list_manual_packages()
+ manual_pkgs = backend.list_manual_packages()
# Avoid duplicate roles: if a manual package is already managed by any service role, skip its pkg_ role.
covered_by_services: Set[str] = set()
for s in service_snaps:
@@ -893,41 +947,26 @@ def harvest(
for tpath in timer_extra_by_pkg.get(pkg, []):
candidates.setdefault(tpath, "related_timer")
- conff = conffiles_by_pkg.get(pkg, {})
- md5sums = read_pkg_md5sums(pkg)
-
- for path in pkg_to_etc_paths.get(pkg, []):
+ etc_paths = pkg_to_etc_paths.get(pkg, [])
+ for path, reason in backend.modified_paths(pkg, etc_paths).items():
if not os.path.isfile(path) or os.path.islink(path):
continue
- if path.startswith("/etc/apt/"):
+ if backend.is_pkg_config_path(path):
continue
- if path in conff:
- try:
- current = file_md5(path)
- except OSError:
- continue
- if current != conff[path]:
- candidates.setdefault(path, "modified_conffile")
- continue
- rel = path.lstrip("/")
- baseline = md5sums.get(rel)
- if baseline:
- try:
- current = file_md5(path)
- except OSError:
- continue
- if current != baseline:
- candidates.setdefault(path, "modified_packaged_file")
+ candidates.setdefault(path, reason)
topdirs = _topdirs_for_package(pkg, pkg_to_etc_paths)
roots: List[str] = []
+ # Collect candidate directories plus backend-specific common files.
for td in sorted(topdirs):
if td in SHARED_ETC_TOPDIRS:
continue
+ if backend.is_pkg_config_path(f"/etc/{td}/") or backend.is_pkg_config_path(
+ f"/etc/{td}"
+ ):
+ continue
roots.extend([f"/etc/{td}", f"/etc/{td}.d"])
- roots.extend([f"/etc/default/{td}"])
- roots.extend([f"/etc/init.d/{td}"])
- roots.extend([f"/etc/sysctl.d/{td}.conf"])
+ roots.extend(_maybe_add_specific_paths(set(topdirs), backend))
# Capture any custom/unowned files under /etc/ for this
# manually-installed package. This may include runtime-generated
@@ -1031,26 +1070,48 @@ def harvest(
)
# -------------------------
- # apt_config role (APT configuration and keyrings)
+ # Package manager config role
+ # - Debian: apt_config
+ # - Fedora/RHEL-like: dnf_config
# -------------------------
apt_notes: List[str] = []
apt_excluded: List[ExcludedFile] = []
apt_managed: List[ManagedFile] = []
- apt_role_name = "apt_config"
- apt_role_seen = seen_by_role.setdefault(apt_role_name, set())
+ dnf_notes: List[str] = []
+ dnf_excluded: List[ExcludedFile] = []
+ dnf_managed: List[ManagedFile] = []
- for path, reason in _iter_apt_capture_paths():
- _capture_file(
- bundle_dir=bundle_dir,
- role_name=apt_role_name,
- abs_path=path,
- reason=reason,
- policy=policy,
- path_filter=path_filter,
- managed_out=apt_managed,
- excluded_out=apt_excluded,
- seen_role=apt_role_seen,
- )
+ apt_role_name = "apt_config"
+ dnf_role_name = "dnf_config"
+
+ if backend.name == "dpkg":
+ apt_role_seen = seen_by_role.setdefault(apt_role_name, set())
+ for path, reason in _iter_apt_capture_paths():
+ _capture_file(
+ bundle_dir=bundle_dir,
+ role_name=apt_role_name,
+ abs_path=path,
+ reason=reason,
+ policy=policy,
+ path_filter=path_filter,
+ managed_out=apt_managed,
+ excluded_out=apt_excluded,
+ seen_role=apt_role_seen,
+ )
+ elif backend.name == "rpm":
+ dnf_role_seen = seen_by_role.setdefault(dnf_role_name, set())
+ for path, reason in _iter_dnf_capture_paths():
+ _capture_file(
+ bundle_dir=bundle_dir,
+ role_name=dnf_role_name,
+ abs_path=path,
+ reason=reason,
+ policy=policy,
+ path_filter=path_filter,
+ managed_out=dnf_managed,
+ excluded_out=dnf_excluded,
+ seen_role=dnf_role_seen,
+ )
apt_config_snapshot = AptConfigSnapshot(
role_name=apt_role_name,
@@ -1058,6 +1119,12 @@ def harvest(
excluded=apt_excluded,
notes=apt_notes,
)
+ dnf_config_snapshot = DnfConfigSnapshot(
+ role_name=dnf_role_name,
+ managed_files=dnf_managed,
+ excluded=dnf_excluded,
+ notes=dnf_notes,
+ )
# -------------------------
# etc_custom role (unowned /etc files not already attributed elsewhere)
@@ -1079,6 +1146,8 @@ def harvest(
already.add(mf.path)
for mf in apt_managed:
already.add(mf.path)
+ for mf in dnf_managed:
+ already.add(mf.path)
# Maps for re-attributing shared snippets (cron.d/logrotate.d) to existing roles.
svc_by_role: Dict[str, ServiceSnapshot] = {s.role_name: s for s in service_snaps}
@@ -1093,7 +1162,7 @@ def harvest(
for pkg in s.packages:
pkg_to_service_roles.setdefault(pkg, []).append(s.role_name)
- # Alias -> role mapping used as a fallback when dpkg ownership is missing.
+ # Alias -> role mapping used as a fallback when package ownership is missing.
# Prefer service roles over package roles when both would match.
alias_ranked: Dict[str, tuple[int, str]] = {}
@@ -1124,8 +1193,8 @@ def harvest(
per service.
Resolution order:
- 1) dpkg owner -> service role (if any service references the package)
- 2) dpkg owner -> package role (manual package role exists)
+ 1) package owner -> service role (if any service references the package)
+ 2) package owner -> package role (manual package role exists)
3) basename/stem alias match -> preferred role
"""
if path.startswith("/etc/logrotate.d/"):
@@ -1147,7 +1216,7 @@ def harvest(
seen.add(c)
uniq.append(c)
- pkg = dpkg_owner(path)
+ pkg = backend.owner_of_path(path)
if pkg:
svc_roles = sorted(set(pkg_to_service_roles.get(pkg, [])))
if svc_roles:
@@ -1226,7 +1295,7 @@ def harvest(
for dirpath, _, filenames in os.walk("/etc"):
for fn in filenames:
path = os.path.join(dirpath, fn)
- if path.startswith("/etc/apt/"):
+ if backend.is_pkg_config_path(path):
continue
if path in already:
continue
@@ -1413,13 +1482,22 @@ def harvest(
)
state = {
- "host": {"hostname": os.uname().nodename, "os": "debian"},
+ "enroll": {
+ "version": get_enroll_version(),
+ },
+ "host": {
+ "hostname": os.uname().nodename,
+ "os": platform.os_family,
+ "pkg_backend": backend.name,
+ "os_release": platform.os_release,
+ },
"users": asdict(users_snapshot),
"services": [asdict(s) for s in service_snaps],
"manual_packages": manual_pkgs,
"manual_packages_skipped": manual_pkgs_skipped,
"package_roles": [asdict(p) for p in pkg_snaps],
"apt_config": asdict(apt_config_snapshot),
+ "dnf_config": asdict(dnf_config_snapshot),
"etc_custom": asdict(etc_custom_snapshot),
"usr_local_custom": asdict(usr_local_custom_snapshot),
"extra_paths": asdict(extra_paths_snapshot),
diff --git a/enroll/ignore.py b/enroll/ignore.py
index ab2cb96..904997f 100644
--- a/enroll/ignore.py
+++ b/enroll/ignore.py
@@ -43,6 +43,7 @@ DEFAULT_ALLOW_BINARY_GLOBS = [
"/usr/share/keyrings/*.gpg",
"/usr/share/keyrings/*.pgp",
"/usr/share/keyrings/*.asc",
+ "/etc/pki/rpm-gpg/*",
]
SENSITIVE_CONTENT_PATTERNS = [
diff --git a/enroll/manifest.py b/enroll/manifest.py
index dbc2353..923040f 100644
--- a/enroll/manifest.py
+++ b/enroll/manifest.py
@@ -166,6 +166,7 @@ def _write_playbook_all(path: str, roles: List[str]) -> None:
pb_lines = [
"---",
"- name: Apply all roles on all hosts",
+ " gather_facts: true",
" hosts: all",
" become: true",
" roles:",
@@ -181,6 +182,7 @@ def _write_playbook_host(path: str, fqdn: str, roles: List[str]) -> None:
"---",
f"- name: Apply all roles on {fqdn}",
f" hosts: {fqdn}",
+ " gather_facts: true",
" become: true",
" roles:",
]
@@ -468,6 +470,51 @@ def _render_generic_files_tasks(
"""
+def _render_install_packages_tasks(role: str, var_prefix: str) -> str:
+ """Render cross-distro package installation tasks.
+
+ We generate conditional tasks for apt/dnf/yum, falling back to the
+ generic `package` module. This keeps generated roles usable on both
+ Debian-like and RPM-like systems.
+ """
+ return f"""# Generated by enroll
+
+- name: Install packages for {role} (APT)
+ ansible.builtin.apt:
+ name: "{{{{ {var_prefix}_packages | default([]) }}}}"
+ state: present
+ update_cache: true
+ when:
+ - ({var_prefix}_packages | default([])) | length > 0
+ - ansible_facts.pkg_mgr | default('') == 'apt'
+
+- name: Install packages for {role} (DNF5)
+ ansible.builtin.dnf5:
+ name: "{{{{ {var_prefix}_packages | default([]) }}}}"
+ state: present
+ when:
+ - ({var_prefix}_packages | default([])) | length > 0
+ - ansible_facts.pkg_mgr | default('') == 'dnf5'
+
+- name: Install packages for {role} (DNF/YUM)
+ ansible.builtin.dnf:
+ name: "{{{{ {var_prefix}_packages | default([]) }}}}"
+ state: present
+ when:
+ - ({var_prefix}_packages | default([])) | length > 0
+ - ansible_facts.pkg_mgr | default('') in ['dnf', 'yum']
+
+- name: Install packages for {role} (generic fallback)
+ ansible.builtin.package:
+ name: "{{{{ {var_prefix}_packages | default([]) }}}}"
+ state: present
+ when:
+ - ({var_prefix}_packages | default([])) | length > 0
+ - ansible_facts.pkg_mgr | default('') not in ['apt', 'dnf', 'dnf5', 'yum']
+
+"""
+
+
def _prepare_bundle_dir(
bundle: str,
*,
@@ -629,6 +676,7 @@ def _manifest_from_bundle_dir(
package_roles: List[Dict[str, Any]] = state.get("package_roles", [])
users_snapshot: Dict[str, Any] = state.get("users", {})
apt_config_snapshot: Dict[str, Any] = state.get("apt_config", {})
+ dnf_config_snapshot: Dict[str, Any] = state.get("dnf_config", {})
etc_custom_snapshot: Dict[str, Any] = state.get("etc_custom", {})
usr_local_custom_snapshot: Dict[str, Any] = state.get("usr_local_custom", {})
extra_paths_snapshot: Dict[str, Any] = state.get("extra_paths", {})
@@ -664,6 +712,7 @@ def _manifest_from_bundle_dir(
manifested_users_roles: List[str] = []
manifested_apt_config_roles: List[str] = []
+ manifested_dnf_config_roles: List[str] = []
manifested_etc_custom_roles: List[str] = []
manifested_usr_local_custom_roles: List[str] = []
manifested_extra_paths_roles: List[str] = []
@@ -1041,6 +1090,157 @@ APT configuration harvested from the system (sources, pinning, and keyrings).
manifested_apt_config_roles.append(role)
+ # -------------------------
+ # dnf_config role (DNF/YUM repos, config, and RPM GPG keys)
+ # -------------------------
+ if dnf_config_snapshot and dnf_config_snapshot.get("managed_files"):
+ role = dnf_config_snapshot.get("role_name", "dnf_config")
+ role_dir = os.path.join(roles_root, role)
+ _write_role_scaffold(role_dir)
+
+ var_prefix = role
+
+ managed_files = dnf_config_snapshot.get("managed_files", [])
+ excluded = dnf_config_snapshot.get("excluded", [])
+ notes = dnf_config_snapshot.get("notes", [])
+
+ templated, jt_vars = _jinjify_managed_files(
+ bundle_dir,
+ role,
+ role_dir,
+ managed_files,
+ jt_exe=jt_exe,
+ jt_enabled=jt_enabled,
+ overwrite_templates=not site_mode,
+ )
+
+ if site_mode:
+ _copy_artifacts(
+ bundle_dir,
+ role,
+ _host_role_files_dir(out_dir, fqdn or "", role),
+ exclude_rels=templated,
+ )
+ else:
+ _copy_artifacts(
+ bundle_dir,
+ role,
+ os.path.join(role_dir, "files"),
+ exclude_rels=templated,
+ )
+
+ files_var = _build_managed_files_var(
+ managed_files,
+ templated,
+ notify_other=None,
+ notify_systemd=None,
+ )
+
+ jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
+ vars_map: Dict[str, Any] = {f"{var_prefix}_managed_files": files_var}
+ vars_map = _merge_mappings_overwrite(vars_map, jt_map)
+
+ if site_mode:
+ _write_role_defaults(role_dir, {f"{var_prefix}_managed_files": []})
+ _write_hostvars(out_dir, fqdn or "", role, vars_map)
+ else:
+ _write_role_defaults(role_dir, vars_map)
+
+ tasks = "---\n" + _render_generic_files_tasks(
+ var_prefix, include_restart_notify=False
+ )
+ with open(
+ os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
+ ) as f:
+ f.write(tasks.rstrip() + "\n")
+
+ with open(
+ os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
+ ) as f:
+ f.write("---\ndependencies: []\n")
+
+ # README: summarise repos and GPG key material
+ repo_paths: List[str] = []
+ key_paths: List[str] = []
+ repo_hosts: Set[str] = set()
+
+ url_re = re.compile(r"(?:https?|ftp)://([^/\s]+)", re.IGNORECASE)
+ file_url_re = re.compile(r"file://(/[^\s]+)")
+
+ for mf in managed_files:
+ p = str(mf.get("path") or "")
+ src_rel = str(mf.get("src_rel") or "")
+ if not p or not src_rel:
+ continue
+
+ if p.startswith("/etc/yum.repos.d/") and p.endswith(".repo"):
+ repo_paths.append(p)
+ art_path = os.path.join(bundle_dir, "artifacts", role, src_rel)
+ try:
+ with open(art_path, "r", encoding="utf-8", errors="replace") as rf:
+ for line in rf:
+ s = line.strip()
+ if not s or s.startswith("#") or s.startswith(";"):
+ continue
+ # Collect hostnames from URLs (baseurl, mirrorlist, metalink, gpgkey...)
+ for m in url_re.finditer(s):
+ repo_hosts.add(m.group(1))
+ # Collect local gpgkey file paths referenced as file:///...
+ for m in file_url_re.finditer(s):
+ key_paths.append(m.group(1))
+ except OSError:
+ pass # nosec
+
+ if p.startswith("/etc/pki/rpm-gpg/"):
+ key_paths.append(p)
+
+ repo_paths = sorted(set(repo_paths))
+ key_paths = sorted(set(key_paths))
+ repos = sorted(repo_hosts)
+
+ readme = (
+ """# dnf_config
+
+DNF/YUM configuration harvested from the system (repos, config files, and RPM GPG keys).
+
+## Repository hosts
+"""
+ + ("\n".join([f"- {h}" for h in repos]) or "- (none)")
+ + """\n
+## Repo files
+"""
+ + ("\n".join([f"- {p}" for p in repo_paths]) or "- (none)")
+ + """\n
+## GPG keys
+"""
+ + ("\n".join([f"- {p}" for p in key_paths]) or "- (none)")
+ + """\n
+## Managed files
+"""
+ + (
+ "\n".join(
+ [f"- {mf.get('path')} ({mf.get('reason')})" for mf in managed_files]
+ )
+ or "- (none)"
+ )
+ + """\n
+## Excluded
+"""
+ + (
+ "\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded])
+ or "- (none)"
+ )
+ + """\n
+## Notes
+"""
+ + ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ + """\n"""
+ )
+ with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
+ f.write(readme)
+
+ manifested_dnf_config_roles.append(role)
+
# -------------------------
# etc_custom role (unowned /etc not already attributed)
# -------------------------
@@ -1457,19 +1657,7 @@ User-requested extra file harvesting.
f.write(handlers)
task_parts: List[str] = []
- task_parts.append(
- f"""---
-# Generated by enroll
-
-- name: Install packages for {role}
- ansible.builtin.apt:
- name: "{{{{ {var_prefix}_packages | default([]) }}}}"
- state: present
- update_cache: true
- when: ({var_prefix}_packages | default([])) | length > 0
-
-"""
- )
+ task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix))
task_parts.append(
_render_generic_files_tasks(var_prefix, include_restart_notify=True)
@@ -1616,19 +1804,7 @@ Generated from `{unit}`.
f.write(handlers)
task_parts: List[str] = []
- task_parts.append(
- f"""---
-# Generated by enroll
-
-- name: Install packages for {role}
- ansible.builtin.apt:
- name: "{{{{ {var_prefix}_packages | default([]) }}}}"
- state: present
- update_cache: true
- when: ({var_prefix}_packages | default([])) | length > 0
-
-"""
- )
+ task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix))
task_parts.append(
_render_generic_files_tasks(var_prefix, include_restart_notify=False)
)
@@ -1667,6 +1843,7 @@ Generated for package `{pkg}`.
manifested_pkg_roles.append(role)
all_roles = (
manifested_apt_config_roles
+ + manifested_dnf_config_roles
+ manifested_pkg_roles
+ manifested_service_roles
+ manifested_etc_custom_roles
diff --git a/enroll/platform.py b/enroll/platform.py
new file mode 100644
index 0000000..998b83d
--- /dev/null
+++ b/enroll/platform.py
@@ -0,0 +1,261 @@
+from __future__ import annotations
+
+import shutil
+from dataclasses import dataclass
+from typing import Dict, List, Optional, Set, Tuple
+
+from .fsutil import file_md5
+
+
+def _read_os_release(path: str = "/etc/os-release") -> Dict[str, str]:
+ out: Dict[str, str] = {}
+ try:
+ with open(path, "r", encoding="utf-8", errors="replace") as f:
+ for raw in f:
+ line = raw.strip()
+ if not line or line.startswith("#") or "=" not in line:
+ continue
+ k, v = line.split("=", 1)
+ k = k.strip()
+ v = v.strip().strip('"')
+ out[k] = v
+ except OSError:
+ return {}
+ return out
+
+
+@dataclass
+class PlatformInfo:
+ os_family: str # debian|redhat|unknown
+ pkg_backend: str # dpkg|rpm|unknown
+ os_release: Dict[str, str]
+
+
+def detect_platform() -> PlatformInfo:
+ """Detect platform family and package backend.
+
+ Uses /etc/os-release when available, with a conservative fallback to
+ checking for dpkg/rpm binaries.
+ """
+
+ osr = _read_os_release()
+ os_id = (osr.get("ID") or "").strip().lower()
+ likes = (osr.get("ID_LIKE") or "").strip().lower().split()
+
+ deb_ids = {"debian", "ubuntu", "linuxmint", "raspbian", "kali"}
+ rhel_ids = {
+ "fedora",
+ "rhel",
+ "centos",
+ "rocky",
+ "almalinux",
+ "ol",
+ "oracle",
+ "scientific",
+ }
+
+ if os_id in deb_ids or "debian" in likes:
+ return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr)
+ if os_id in rhel_ids or any(
+ x in likes for x in ("rhel", "fedora", "centos", "redhat")
+ ):
+ return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr)
+
+ # Fallback heuristics.
+ if shutil.which("dpkg"):
+ return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr)
+ if shutil.which("rpm"):
+ return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr)
+ return PlatformInfo(os_family="unknown", pkg_backend="unknown", os_release=osr)
+
+
+class PackageBackend:
+ """Backend abstraction for package ownership, config detection, and manual package lists."""
+
+ name: str
+ pkg_config_prefixes: Tuple[str, ...]
+
+ def owner_of_path(self, path: str) -> Optional[str]: # pragma: no cover
+ raise NotImplementedError
+
+ def list_manual_packages(self) -> List[str]: # pragma: no cover
+ raise NotImplementedError
+
+ def build_etc_index(
+ self,
+ ) -> Tuple[
+ Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]
+ ]: # pragma: no cover
+ raise NotImplementedError
+
+ def specific_paths_for_hints(self, hints: Set[str]) -> List[str]:
+ return []
+
+ def is_pkg_config_path(self, path: str) -> bool:
+ for pfx in self.pkg_config_prefixes:
+ if path == pfx or path.startswith(pfx):
+ return True
+ return False
+
+ def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]:
+ """Return a mapping of modified file paths -> reason label."""
+ return {}
+
+
+class DpkgBackend(PackageBackend):
+ name = "dpkg"
+ pkg_config_prefixes = ("/etc/apt/",)
+
+ def __init__(self) -> None:
+ from .debian import parse_status_conffiles
+
+ self._conffiles_by_pkg = parse_status_conffiles()
+
+ def owner_of_path(self, path: str) -> Optional[str]:
+ from .debian import dpkg_owner
+
+ return dpkg_owner(path)
+
+ def list_manual_packages(self) -> List[str]:
+ from .debian import list_manual_packages
+
+ return list_manual_packages()
+
+ def build_etc_index(self):
+ from .debian import build_dpkg_etc_index
+
+ return build_dpkg_etc_index()
+
+ def specific_paths_for_hints(self, hints: Set[str]) -> List[str]:
+ paths: List[str] = []
+ for h in hints:
+ paths.extend(
+ [
+ f"/etc/default/{h}",
+ f"/etc/init.d/{h}",
+ f"/etc/sysctl.d/{h}.conf",
+ ]
+ )
+ return paths
+
+ def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]:
+ from .debian import read_pkg_md5sums
+
+ out: Dict[str, str] = {}
+ conff = self._conffiles_by_pkg.get(pkg, {})
+ md5sums = read_pkg_md5sums(pkg)
+
+ for path in etc_paths:
+ if not path.startswith("/etc/"):
+ continue
+ if self.is_pkg_config_path(path):
+ continue
+ if path in conff:
+ try:
+ current = file_md5(path)
+ except OSError:
+ continue
+ if current != conff[path]:
+ out[path] = "modified_conffile"
+ continue
+
+ rel = path.lstrip("/")
+ baseline = md5sums.get(rel)
+ if baseline:
+ try:
+ current = file_md5(path)
+ except OSError:
+ continue
+ if current != baseline:
+ out[path] = "modified_packaged_file"
+ return out
+
+
+class RpmBackend(PackageBackend):
+ name = "rpm"
+ pkg_config_prefixes = (
+ "/etc/dnf/",
+ "/etc/yum/",
+ "/etc/yum.repos.d/",
+ "/etc/yum.conf",
+ )
+
+ def __init__(self) -> None:
+ self._modified_cache: Dict[str, Set[str]] = {}
+ self._config_cache: Dict[str, Set[str]] = {}
+
+ def owner_of_path(self, path: str) -> Optional[str]:
+ from .rpm import rpm_owner
+
+ return rpm_owner(path)
+
+ def list_manual_packages(self) -> List[str]:
+ from .rpm import list_manual_packages
+
+ return list_manual_packages()
+
+ def build_etc_index(self):
+ from .rpm import build_rpm_etc_index
+
+ return build_rpm_etc_index()
+
+ def specific_paths_for_hints(self, hints: Set[str]) -> List[str]:
+ paths: List[str] = []
+ for h in hints:
+ paths.extend(
+ [
+ f"/etc/sysconfig/{h}",
+ f"/etc/sysconfig/{h}.conf",
+ f"/etc/sysctl.d/{h}.conf",
+ ]
+ )
+ return paths
+
+ def _config_files(self, pkg: str) -> Set[str]:
+ if pkg in self._config_cache:
+ return self._config_cache[pkg]
+ from .rpm import rpm_config_files
+
+ s = rpm_config_files(pkg)
+ self._config_cache[pkg] = s
+ return s
+
+ def _modified_files(self, pkg: str) -> Set[str]:
+ if pkg in self._modified_cache:
+ return self._modified_cache[pkg]
+ from .rpm import rpm_modified_files
+
+ s = rpm_modified_files(pkg)
+ self._modified_cache[pkg] = s
+ return s
+
+ def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]:
+ out: Dict[str, str] = {}
+ modified = self._modified_files(pkg)
+ if not modified:
+ return out
+ config = self._config_files(pkg)
+
+ for path in etc_paths:
+ if not path.startswith("/etc/"):
+ continue
+ if self.is_pkg_config_path(path):
+ continue
+ if path not in modified:
+ continue
+ out[path] = (
+ "modified_conffile" if path in config else "modified_packaged_file"
+ )
+ return out
+
+
+def get_backend(info: Optional[PlatformInfo] = None) -> PackageBackend:
+ info = info or detect_platform()
+ if info.pkg_backend == "dpkg":
+ return DpkgBackend()
+ if info.pkg_backend == "rpm":
+ return RpmBackend()
+ # Unknown: be conservative and use an rpm backend if rpm exists, otherwise dpkg.
+ if shutil.which("rpm"):
+ return RpmBackend()
+ return DpkgBackend()
diff --git a/enroll/rpm.py b/enroll/rpm.py
new file mode 100644
index 0000000..947617c
--- /dev/null
+++ b/enroll/rpm.py
@@ -0,0 +1,266 @@
+from __future__ import annotations
+
+import os
+import re
+import shutil
+import subprocess # nosec
+from typing import Dict, List, Optional, Set, Tuple
+
+
+def _run(
+ cmd: list[str], *, allow_fail: bool = False, merge_err: bool = False
+) -> tuple[int, str]:
+ """Run a command and return (rc, stdout).
+
+ If merge_err is True, stderr is merged into stdout to preserve ordering.
+ """
+ p = subprocess.run(
+ cmd,
+ check=False,
+ text=True,
+ stdout=subprocess.PIPE,
+ stderr=(subprocess.STDOUT if merge_err else subprocess.PIPE),
+ ) # nosec
+ out = p.stdout or ""
+ if (not allow_fail) and p.returncode != 0:
+ err = "" if merge_err else (p.stderr or "")
+ raise RuntimeError(f"Command failed: {cmd}\n{err}{out}")
+ return p.returncode, out
+
+
+def rpm_owner(path: str) -> Optional[str]:
+ """Return owning package name for a path, or None if unowned."""
+ if not path:
+ return None
+ rc, out = _run(
+ ["rpm", "-qf", "--qf", "%{NAME}\n", path], allow_fail=True, merge_err=True
+ )
+ if rc != 0:
+ return None
+ for line in out.splitlines():
+ line = line.strip()
+ if not line:
+ continue
+ if "is not owned" in line:
+ return None
+ # With --qf we expect just the package name.
+ if re.match(r"^[A-Za-z0-9_.+:-]+$", line):
+ # Strip any accidental epoch/name-version-release output.
+ return line.split(":", 1)[-1].strip() if line else None
+ return None
+
+
+_ARCH_SUFFIXES = {
+ "noarch",
+ "x86_64",
+ "i686",
+ "aarch64",
+ "armv7hl",
+ "ppc64le",
+ "s390x",
+ "riscv64",
+}
+
+
+def _strip_arch(token: str) -> str:
+ """Strip a trailing .ARCH from a yum/dnf package token."""
+ t = token.strip()
+ if "." not in t:
+ return t
+ head, tail = t.rsplit(".", 1)
+ if tail in _ARCH_SUFFIXES:
+ return head
+ return t
+
+
+def list_manual_packages() -> List[str]:
+ """Return packages considered "user-installed" on RPM-based systems.
+
+ Best-effort:
+ 1) dnf repoquery --userinstalled
+ 2) dnf history userinstalled
+ 3) yum history userinstalled
+
+ If none are available, returns an empty list.
+ """
+
+ def _dedupe(pkgs: List[str]) -> List[str]:
+ return sorted({p for p in (pkgs or []) if p})
+
+ if shutil.which("dnf"):
+ # Prefer a machine-friendly output.
+ for cmd in (
+ ["dnf", "-q", "repoquery", "--userinstalled", "--qf", "%{name}\n"],
+ ["dnf", "-q", "repoquery", "--userinstalled"],
+ ):
+ rc, out = _run(cmd, allow_fail=True, merge_err=True)
+ if rc == 0 and out.strip():
+ pkgs = []
+ for line in out.splitlines():
+ line = line.strip()
+ if not line or line.startswith("Loaded plugins"):
+ continue
+ pkgs.append(_strip_arch(line.split()[0]))
+ if pkgs:
+ return _dedupe(pkgs)
+
+ # Fallback: human-oriented output.
+ rc, out = _run(
+ ["dnf", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True
+ )
+ if rc == 0 and out.strip():
+ pkgs = []
+ for line in out.splitlines():
+ line = line.strip()
+ if not line or line.startswith("Installed") or line.startswith("Last"):
+ continue
+ # Often: "vim-enhanced.x86_64"
+ tok = line.split()[0]
+ pkgs.append(_strip_arch(tok))
+ if pkgs:
+ return _dedupe(pkgs)
+
+ if shutil.which("yum"):
+ rc, out = _run(
+ ["yum", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True
+ )
+ if rc == 0 and out.strip():
+ pkgs = []
+ for line in out.splitlines():
+ line = line.strip()
+ if (
+ not line
+ or line.startswith("Installed")
+ or line.startswith("Loaded")
+ ):
+ continue
+ tok = line.split()[0]
+ pkgs.append(_strip_arch(tok))
+ if pkgs:
+ return _dedupe(pkgs)
+
+ return []
+
+
+def _walk_etc_files() -> List[str]:
+ out: List[str] = []
+ for dirpath, _, filenames in os.walk("/etc"):
+ for fn in filenames:
+ p = os.path.join(dirpath, fn)
+ if os.path.islink(p) or not os.path.isfile(p):
+ continue
+ out.append(p)
+ return out
+
+
+def build_rpm_etc_index() -> (
+ Tuple[Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]]
+):
+ """Best-effort equivalent of build_dpkg_etc_index for RPM systems.
+
+ This builds indexes by walking the live /etc tree and querying RPM ownership
+ for each file.
+
+ Returns:
+ owned_etc_paths: set of /etc paths owned by rpm
+ etc_owner_map: /etc/path -> pkg
+ topdir_to_pkgs: "nginx" -> {"nginx", ...} based on /etc//...
+ pkg_to_etc_paths: pkg -> list of owned /etc paths
+ """
+
+ owned: Set[str] = set()
+ owner: Dict[str, str] = {}
+ topdir_to_pkgs: Dict[str, Set[str]] = {}
+ pkg_to_etc: Dict[str, List[str]] = {}
+
+ paths = _walk_etc_files()
+
+ # Query in chunks to avoid excessive process spawns.
+ chunk_size = 250
+
+ not_owned_re = re.compile(
+ r"^file\s+(?P.+?)\s+is\s+not\s+owned\s+by\s+any\s+package", re.IGNORECASE
+ )
+
+ for i in range(0, len(paths), chunk_size):
+ chunk = paths[i : i + chunk_size]
+ rc, out = _run(
+ ["rpm", "-qf", "--qf", "%{NAME}\n", *chunk],
+ allow_fail=True,
+ merge_err=True,
+ )
+
+ lines = [ln.strip() for ln in out.splitlines() if ln.strip()]
+ # Heuristic: rpm prints one output line per input path. If that isn't
+ # true (warnings/errors), fall back to per-file queries for this chunk.
+ if len(lines) != len(chunk):
+ for p in chunk:
+ pkg = rpm_owner(p)
+ if not pkg:
+ continue
+ owned.add(p)
+ owner.setdefault(p, pkg)
+ pkg_to_etc.setdefault(pkg, []).append(p)
+ parts = p.split("/", 3)
+ if len(parts) >= 3 and parts[2]:
+ topdir_to_pkgs.setdefault(parts[2], set()).add(pkg)
+ continue
+
+ for pth, line in zip(chunk, lines):
+ if not line:
+ continue
+ if not_owned_re.match(line) or "is not owned" in line:
+ continue
+ pkg = line.split()[0].strip()
+ if not pkg:
+ continue
+ owned.add(pth)
+ owner.setdefault(pth, pkg)
+ pkg_to_etc.setdefault(pkg, []).append(pth)
+ parts = pth.split("/", 3)
+ if len(parts) >= 3 and parts[2]:
+ topdir_to_pkgs.setdefault(parts[2], set()).add(pkg)
+
+ for k, v in list(pkg_to_etc.items()):
+ pkg_to_etc[k] = sorted(set(v))
+
+ return owned, owner, topdir_to_pkgs, pkg_to_etc
+
+
+def rpm_config_files(pkg: str) -> Set[str]:
+ """Return config files for a package (rpm -qc)."""
+ rc, out = _run(["rpm", "-qc", pkg], allow_fail=True, merge_err=True)
+ if rc != 0:
+ return set()
+ files: Set[str] = set()
+ for line in out.splitlines():
+ line = line.strip()
+ if line.startswith("/"):
+ files.add(line)
+ return files
+
+
+def rpm_modified_files(pkg: str) -> Set[str]:
+ """Return files reported as modified by rpm verification (rpm -V).
+
+ rpm -V only prints lines for differences/missing files.
+ """
+ rc, out = _run(["rpm", "-V", pkg], allow_fail=True, merge_err=True)
+ # rc is non-zero when there are differences; we still want the output.
+ files: Set[str] = set()
+ for raw in out.splitlines():
+ line = raw.strip()
+ if not line:
+ continue
+ # Typical forms:
+ # S.5....T. c /etc/foo.conf
+ # missing /etc/bar
+ m = re.search(r"\s(/\S+)$", line)
+ if m:
+ files.add(m.group(1))
+ continue
+ if line.startswith("missing"):
+ parts = line.split()
+ if parts and parts[-1].startswith("/"):
+ files.add(parts[-1])
+ return files
diff --git a/enroll/version.py b/enroll/version.py
new file mode 100644
index 0000000..bbe78b6
--- /dev/null
+++ b/enroll/version.py
@@ -0,0 +1,32 @@
+from __future__ import annotations
+
+
+def get_enroll_version() -> str:
+ """
+ Best-effort version lookup that works when installed via:
+ - poetry/pip/wheel
+ - deb/rpm system packages
+ Falls back to "0+unknown" when running from an unpacked source tree.
+ """
+ try:
+ from importlib.metadata import (
+ packages_distributions,
+ version,
+ )
+ except Exception:
+ # Very old Python or unusual environment
+ return "unknown"
+
+ # Map import package -> dist(s)
+ dist_names = []
+ try:
+ dist_names = (packages_distributions() or {}).get("enroll", []) or []
+ except Exception:
+ dist_names = []
+
+ # Try mapped dists first, then a reasonable default
+ for dist in [*dist_names, "enroll"]:
+ try:
+ return version(dist)
+ except Exception:
+ return "unknown"
diff --git a/tests/test_debian.py b/tests/test_debian.py
index 333afc1..abad361 100644
--- a/tests/test_debian.py
+++ b/tests/test_debian.py
@@ -1,6 +1,5 @@
from __future__ import annotations
-import hashlib
from pathlib import Path
@@ -97,58 +96,3 @@ def test_parse_status_conffiles_handles_continuations(tmp_path: Path):
assert m["nginx"]["/etc/nginx/nginx.conf"] == "abcdef"
assert m["nginx"]["/etc/nginx/mime.types"] == "123456"
assert "other" not in m
-
-
-def test_read_pkg_md5sums_and_file_md5(tmp_path: Path, monkeypatch):
- import enroll.debian as d
-
- # Patch /var/lib/dpkg/info/.md5sums lookup to a tmp file.
- md5_file = tmp_path / "pkg.md5sums"
- md5_file.write_text("0123456789abcdef etc/foo.conf\n", encoding="utf-8")
-
- def fake_exists(path: str) -> bool:
- return path.endswith("/var/lib/dpkg/info/p1.md5sums")
-
- real_open = open
-
- def fake_open(path: str, *args, **kwargs):
- if path.endswith("/var/lib/dpkg/info/p1.md5sums"):
- return real_open(md5_file, *args, **kwargs)
- return real_open(path, *args, **kwargs)
-
- monkeypatch.setattr(d.os.path, "exists", fake_exists)
- monkeypatch.setattr("builtins.open", fake_open)
-
- m = d.read_pkg_md5sums("p1")
- assert m == {"etc/foo.conf": "0123456789abcdef"}
-
- content = b"hello world\n"
- p = tmp_path / "x"
- p.write_bytes(content)
- assert d.file_md5(str(p)) == hashlib.md5(content).hexdigest()
-
-
-def test_stat_triplet_fallbacks(tmp_path: Path, monkeypatch):
- import enroll.debian as d
- import sys
-
- p = tmp_path / "f"
- p.write_text("x", encoding="utf-8")
-
- class FakePwdMod:
- @staticmethod
- def getpwuid(_): # pragma: no cover
- raise KeyError
-
- class FakeGrpMod:
- @staticmethod
- def getgrgid(_): # pragma: no cover
- raise KeyError
-
- # stat_triplet imports pwd/grp inside the function, so patch sys.modules.
- monkeypatch.setitem(sys.modules, "pwd", FakePwdMod)
- monkeypatch.setitem(sys.modules, "grp", FakeGrpMod)
- owner, group, mode = d.stat_triplet(str(p))
- assert owner.isdigit()
- assert group.isdigit()
- assert mode.isdigit() and len(mode) == 4
diff --git a/tests/test_fsutil.py b/tests/test_fsutil.py
new file mode 100644
index 0000000..ebe2224
--- /dev/null
+++ b/tests/test_fsutil.py
@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+import hashlib
+import os
+from pathlib import Path
+
+from enroll.fsutil import file_md5, stat_triplet
+
+
+def test_file_md5_matches_hashlib(tmp_path: Path):
+ p = tmp_path / "x"
+ p.write_bytes(b"hello world")
+ expected = hashlib.md5(b"hello world").hexdigest() # nosec
+ assert file_md5(str(p)) == expected
+
+
+def test_stat_triplet_reports_mode(tmp_path: Path):
+ p = tmp_path / "x"
+ p.write_text("x", encoding="utf-8")
+ os.chmod(p, 0o600)
+
+ owner, group, mode = stat_triplet(str(p))
+ assert mode == "0600"
+ assert owner # non-empty string
+ assert group # non-empty string
diff --git a/tests/test_harvest.py b/tests/test_harvest.py
index fa796f0..a0d22ec 100644
--- a/tests/test_harvest.py
+++ b/tests/test_harvest.py
@@ -2,6 +2,7 @@ import json
from pathlib import Path
import enroll.harvest as h
+from enroll.platform import PlatformInfo
from enroll.systemd import UnitInfo
@@ -10,6 +11,64 @@ class AllowAllPolicy:
return None
+class FakeBackend:
+ """Minimal backend stub for harvest tests.
+
+ The real backends (dpkg/rpm) enumerate the live system (dpkg status, rpm
+ databases, etc). These tests instead control all backend behaviour.
+ """
+
+ def __init__(
+ self,
+ *,
+ name: str,
+ owned_etc: set[str],
+ etc_owner_map: dict[str, str],
+ topdir_to_pkgs: dict[str, set[str]],
+ pkg_to_etc_paths: dict[str, list[str]],
+ manual_pkgs: list[str],
+ owner_fn,
+ modified_by_pkg: dict[str, dict[str, str]] | None = None,
+ pkg_config_prefixes: tuple[str, ...] = ("/etc/apt/",),
+ ):
+ self.name = name
+ self.pkg_config_prefixes = pkg_config_prefixes
+ self._owned_etc = owned_etc
+ self._etc_owner_map = etc_owner_map
+ self._topdir_to_pkgs = topdir_to_pkgs
+ self._pkg_to_etc_paths = pkg_to_etc_paths
+ self._manual = manual_pkgs
+ self._owner_fn = owner_fn
+ self._modified_by_pkg = modified_by_pkg or {}
+
+ def build_etc_index(self):
+ return (
+ self._owned_etc,
+ self._etc_owner_map,
+ self._topdir_to_pkgs,
+ self._pkg_to_etc_paths,
+ )
+
+ def owner_of_path(self, path: str):
+ return self._owner_fn(path)
+
+ def list_manual_packages(self):
+ return list(self._manual)
+
+ def specific_paths_for_hints(self, hints: set[str]):
+ return []
+
+ def is_pkg_config_path(self, path: str) -> bool:
+ for pfx in self.pkg_config_prefixes:
+ if path == pfx or path.startswith(pfx):
+ return True
+ return False
+
+ def modified_paths(self, pkg: str, etc_paths: list[str]):
+ # Test-controlled; ignore etc_paths.
+ return dict(self._modified_by_pkg.get(pkg, {}))
+
+
def test_harvest_dedup_manual_packages_and_builds_etc_custom(
monkeypatch, tmp_path: Path
):
@@ -22,7 +81,7 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
real_exists = os.path.exists
real_islink = os.path.islink
- # Fake filesystem: two /etc files exist, only one is dpkg-owned.
+ # Fake filesystem: two /etc files exist, only one is package-owned.
# Also include some /usr/local files to populate usr_local_custom.
files = {
"/etc/openvpn/server.conf": b"server",
@@ -93,6 +152,7 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
# Avoid real system access
monkeypatch.setattr(h, "list_enabled_services", lambda: ["openvpn.service"])
+ monkeypatch.setattr(h, "list_enabled_timers", lambda: [])
monkeypatch.setattr(
h,
"get_unit_info",
@@ -109,29 +169,30 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
),
)
- # Debian package index: openvpn owns /etc/openvpn/server.conf; keyboard is unowned.
- def fake_build_index():
- owned_etc = {"/etc/openvpn/server.conf"}
- etc_owner_map = {"/etc/openvpn/server.conf": "openvpn"}
- topdir_to_pkgs = {"openvpn": {"openvpn"}}
- pkg_to_etc_paths = {"openvpn": ["/etc/openvpn/server.conf"], "curl": []}
- return owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths
+ # Package index: openvpn owns /etc/openvpn/server.conf; keyboard is unowned.
+ owned_etc = {"/etc/openvpn/server.conf"}
+ etc_owner_map = {"/etc/openvpn/server.conf": "openvpn"}
+ topdir_to_pkgs = {"openvpn": {"openvpn"}}
+ pkg_to_etc_paths = {"openvpn": ["/etc/openvpn/server.conf"], "curl": []}
- monkeypatch.setattr(h, "build_dpkg_etc_index", fake_build_index)
-
- # openvpn conffile hash mismatch => should be captured under service role
- monkeypatch.setattr(
- h,
- "parse_status_conffiles",
- lambda: {"openvpn": {"/etc/openvpn/server.conf": "old"}},
+ backend = FakeBackend(
+ name="dpkg",
+ owned_etc=owned_etc,
+ etc_owner_map=etc_owner_map,
+ topdir_to_pkgs=topdir_to_pkgs,
+ pkg_to_etc_paths=pkg_to_etc_paths,
+ manual_pkgs=["openvpn", "curl"],
+ owner_fn=lambda p: "openvpn" if "openvpn" in (p or "") else None,
+ modified_by_pkg={
+ "openvpn": {"/etc/openvpn/server.conf": "modified_conffile"},
+ },
)
- monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {})
- monkeypatch.setattr(h, "file_md5", lambda path: "new")
monkeypatch.setattr(
- h, "dpkg_owner", lambda p: "openvpn" if "openvpn" in p else None
+ h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {})
)
- monkeypatch.setattr(h, "list_manual_packages", lambda: ["openvpn", "curl"])
+ monkeypatch.setattr(h, "get_backend", lambda info=None: backend)
+
monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
def fake_stat_triplet(p: str):
@@ -207,6 +268,7 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic(
monkeypatch.setattr(
h, "list_enabled_services", lambda: ["apparmor.service", "ntpsec.service"]
)
+ monkeypatch.setattr(h, "list_enabled_timers", lambda: [])
def fake_unit_info(unit: str) -> UnitInfo:
if unit == "apparmor.service":
@@ -235,31 +297,35 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic(
monkeypatch.setattr(h, "get_unit_info", fake_unit_info)
- # Dpkg /etc index: no owned /etc paths needed for this test.
- monkeypatch.setattr(
- h,
- "build_dpkg_etc_index",
- lambda: (set(), {}, {}, {}),
- )
- monkeypatch.setattr(h, "parse_status_conffiles", lambda: {})
- monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {})
- monkeypatch.setattr(h, "file_md5", lambda path: "x")
- monkeypatch.setattr(h, "list_manual_packages", lambda: [])
- monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
-
# Make apparmor *also* claim the ntpsec package (simulates overly-broad
# package inference). The snippet routing should still prefer role 'ntpsec'.
- def fake_dpkg_owner(p: str):
+ def fake_owner(p: str):
if p == "/etc/cron.d/ntpsec":
return "ntpsec"
- if "apparmor" in p:
+ if "apparmor" in (p or ""):
return "ntpsec" # intentionally misleading
- if "ntpsec" in p or "ntpd" in p:
+ if "ntpsec" in (p or "") or "ntpd" in (p or ""):
return "ntpsec"
return None
- monkeypatch.setattr(h, "dpkg_owner", fake_dpkg_owner)
+ backend = FakeBackend(
+ name="dpkg",
+ owned_etc=set(),
+ etc_owner_map={},
+ topdir_to_pkgs={},
+ pkg_to_etc_paths={},
+ manual_pkgs=[],
+ owner_fn=fake_owner,
+ modified_by_pkg={},
+ )
+
+ monkeypatch.setattr(
+ h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {})
+ )
+ monkeypatch.setattr(h, "get_backend", lambda info=None: backend)
+
monkeypatch.setattr(h, "stat_triplet", lambda p: ("root", "root", "0644"))
+ monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str):
dst = Path(bundle_dir) / "artifacts" / role_name / src_rel
@@ -268,11 +334,7 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic(
monkeypatch.setattr(h, "_copy_into_bundle", fake_copy)
- class AllowAll:
- def deny_reason(self, path: str):
- return None
-
- state_path = h.harvest(str(bundle), policy=AllowAll())
+ state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
st = json.loads(Path(state_path).read_text(encoding="utf-8"))
# Cron snippet should end up attached to the ntpsec role, not apparmor.
diff --git a/tests/test_manifest.py b/tests/test_manifest.py
index 92c3dfc..cbfc208 100644
--- a/tests/test_manifest.py
+++ b/tests/test_manifest.py
@@ -322,3 +322,96 @@ def test_copy2_replace_overwrites_readonly_destination(tmp_path: Path):
assert dst.read_text(encoding="utf-8") == "new"
mode = stat.S_IMODE(dst.stat().st_mode)
assert mode & stat.S_IWUSR # destination should remain mergeable
+
+
+def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path):
+ bundle = tmp_path / "bundle"
+ out = tmp_path / "ansible"
+
+ # Create a dnf_config artifact.
+ (bundle / "artifacts" / "dnf_config" / "etc" / "dnf").mkdir(
+ parents=True, exist_ok=True
+ )
+ (bundle / "artifacts" / "dnf_config" / "etc" / "dnf" / "dnf.conf").write_text(
+ "[main]\n", encoding="utf-8"
+ )
+
+ state = {
+ "host": {"hostname": "test", "os": "redhat", "pkg_backend": "rpm"},
+ "users": {
+ "role_name": "users",
+ "users": [],
+ "managed_files": [],
+ "excluded": [],
+ "notes": [],
+ },
+ "services": [],
+ "package_roles": [],
+ "manual_packages": [],
+ "manual_packages_skipped": [],
+ "apt_config": {
+ "role_name": "apt_config",
+ "managed_files": [],
+ "excluded": [],
+ "notes": [],
+ },
+ "dnf_config": {
+ "role_name": "dnf_config",
+ "managed_files": [
+ {
+ "path": "/etc/dnf/dnf.conf",
+ "src_rel": "etc/dnf/dnf.conf",
+ "owner": "root",
+ "group": "root",
+ "mode": "0644",
+ "reason": "dnf_config",
+ }
+ ],
+ "excluded": [],
+ "notes": [],
+ },
+ "etc_custom": {
+ "role_name": "etc_custom",
+ "managed_files": [],
+ "excluded": [],
+ "notes": [],
+ },
+ "usr_local_custom": {
+ "role_name": "usr_local_custom",
+ "managed_files": [],
+ "excluded": [],
+ "notes": [],
+ },
+ "extra_paths": {
+ "role_name": "extra_paths",
+ "include_patterns": [],
+ "exclude_patterns": [],
+ "managed_files": [],
+ "excluded": [],
+ "notes": [],
+ },
+ }
+
+ bundle.mkdir(parents=True, exist_ok=True)
+ (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
+
+ manifest(str(bundle), str(out))
+
+ pb = (out / "playbook.yml").read_text(encoding="utf-8")
+ assert "- dnf_config" in pb
+
+ tasks = (out / "roles" / "dnf_config" / "tasks" / "main.yml").read_text(
+ encoding="utf-8"
+ )
+ # Ensure the role exists and contains some file deployment logic.
+ assert "Deploy any other managed files" in tasks
+
+
+def test_render_install_packages_tasks_contains_dnf_branch():
+ from enroll.manifest import _render_install_packages_tasks
+
+ txt = _render_install_packages_tasks("role", "role")
+ assert "ansible.builtin.apt" in txt
+ assert "ansible.builtin.dnf" in txt
+ assert "ansible.builtin.package" in txt
+ assert "pkg_mgr" in txt
diff --git a/tests/test_platform.py b/tests/test_platform.py
new file mode 100644
index 0000000..7ff66c6
--- /dev/null
+++ b/tests/test_platform.py
@@ -0,0 +1,93 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+import enroll.platform as platform
+
+
+def test_read_os_release_parses_kv_and_strips_quotes(tmp_path: Path):
+ p = tmp_path / "os-release"
+ p.write_text(
+ """
+# comment
+ID=fedora
+ID_LIKE=\"rhel centos\"
+NAME=\"Fedora Linux\"
+EMPTY=
+NOEQUALS
+""",
+ encoding="utf-8",
+ )
+
+ osr = platform._read_os_release(str(p))
+ assert osr["ID"] == "fedora"
+ assert osr["ID_LIKE"] == "rhel centos"
+ assert osr["NAME"] == "Fedora Linux"
+ assert osr["EMPTY"] == ""
+ assert "NOEQUALS" not in osr
+
+
+def test_detect_platform_prefers_os_release(monkeypatch):
+ monkeypatch.setattr(
+ platform,
+ "_read_os_release",
+ lambda path="/etc/os-release": {"ID": "fedora", "ID_LIKE": "rhel"},
+ )
+ # If os-release is decisive we shouldn't need which()
+ monkeypatch.setattr(platform.shutil, "which", lambda exe: None)
+
+ info = platform.detect_platform()
+ assert info.os_family == "redhat"
+ assert info.pkg_backend == "rpm"
+
+
+def test_detect_platform_fallbacks_to_dpkg_when_unknown(monkeypatch):
+ monkeypatch.setattr(platform, "_read_os_release", lambda path="/etc/os-release": {})
+ monkeypatch.setattr(
+ platform.shutil, "which", lambda exe: "/usr/bin/dpkg" if exe == "dpkg" else None
+ )
+
+ info = platform.detect_platform()
+ assert info.os_family == "debian"
+ assert info.pkg_backend == "dpkg"
+
+
+def test_get_backend_unknown_prefers_rpm_if_present(monkeypatch):
+ monkeypatch.setattr(
+ platform.shutil, "which", lambda exe: "/usr/bin/rpm" if exe == "rpm" else None
+ )
+
+ b = platform.get_backend(
+ platform.PlatformInfo(os_family="unknown", pkg_backend="unknown", os_release={})
+ )
+ assert isinstance(b, platform.RpmBackend)
+
+
+def test_rpm_backend_modified_paths_labels_conffiles(monkeypatch):
+ b = platform.RpmBackend()
+
+ # Pretend rpm -V says both files changed, but only one is a config file.
+ monkeypatch.setattr(b, "_modified_files", lambda pkg: {"/etc/foo.conf", "/etc/bar"})
+ monkeypatch.setattr(b, "_config_files", lambda pkg: {"/etc/foo.conf"})
+
+ out = b.modified_paths("mypkg", ["/etc/foo.conf", "/etc/bar", "/etc/dnf/dnf.conf"])
+ assert out["/etc/foo.conf"] == "modified_conffile"
+ assert out["/etc/bar"] == "modified_packaged_file"
+ # Package-manager config paths are excluded.
+ assert "/etc/dnf/dnf.conf" not in out
+
+
+def test_specific_paths_for_hints_differs_between_backends():
+ # We can exercise this without instantiating DpkgBackend (which reads dpkg status)
+ class Dummy(platform.PackageBackend):
+ name = "dummy"
+ pkg_config_prefixes = ("/etc/apt/",)
+
+ d = Dummy()
+ assert d.is_pkg_config_path("/etc/apt/sources.list")
+ assert not d.is_pkg_config_path("/etc/ssh/sshd_config")
+
+ r = platform.RpmBackend()
+ paths = set(r.specific_paths_for_hints({"nginx"}))
+ assert "/etc/sysconfig/nginx" in paths
+ assert "/etc/sysconfig/nginx.conf" in paths
diff --git a/tests/test_rpm.py b/tests/test_rpm.py
new file mode 100644
index 0000000..ea97c12
--- /dev/null
+++ b/tests/test_rpm.py
@@ -0,0 +1,131 @@
+from __future__ import annotations
+
+import enroll.rpm as rpm
+
+
+def test_rpm_owner_returns_none_when_unowned(monkeypatch):
+ monkeypatch.setattr(
+ rpm,
+ "_run",
+ lambda cmd, allow_fail=False, merge_err=False: (
+ 1,
+ "file /etc/x is not owned by any package\n",
+ ),
+ )
+ assert rpm.rpm_owner("/etc/x") is None
+
+
+def test_rpm_owner_parses_name(monkeypatch):
+ monkeypatch.setattr(
+ rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (0, "bash\n")
+ )
+ assert rpm.rpm_owner("/bin/bash") == "bash"
+
+
+def test_strip_arch_strips_known_arches():
+ assert rpm._strip_arch("vim-enhanced.x86_64") == "vim-enhanced"
+ assert rpm._strip_arch("foo.noarch") == "foo"
+ assert rpm._strip_arch("weird.token") == "weird.token"
+
+
+def test_list_manual_packages_prefers_dnf_repoquery(monkeypatch):
+ monkeypatch.setattr(
+ rpm.shutil, "which", lambda exe: "/usr/bin/dnf" if exe == "dnf" else None
+ )
+
+ def fake_run(cmd, allow_fail=False, merge_err=False):
+ # First repoquery form returns usable output.
+ if cmd[:3] == ["dnf", "-q", "repoquery"]:
+ return 0, "vim-enhanced.x86_64\nhtop\nvim-enhanced.x86_64\n"
+ raise AssertionError(f"unexpected cmd: {cmd}")
+
+ monkeypatch.setattr(rpm, "_run", fake_run)
+
+ pkgs = rpm.list_manual_packages()
+ assert pkgs == ["htop", "vim-enhanced"]
+
+
+def test_list_manual_packages_falls_back_to_history(monkeypatch):
+ monkeypatch.setattr(
+ rpm.shutil, "which", lambda exe: "/usr/bin/dnf" if exe == "dnf" else None
+ )
+
+ def fake_run(cmd, allow_fail=False, merge_err=False):
+ # repoquery fails
+ if cmd[:3] == ["dnf", "-q", "repoquery"]:
+ return 1, ""
+ if cmd[:3] == ["dnf", "-q", "history"]:
+ return (
+ 0,
+ "Installed Packages\nvim-enhanced.x86_64\nLast metadata expiration check: 0:01:00 ago\n",
+ )
+ raise AssertionError(f"unexpected cmd: {cmd}")
+
+ monkeypatch.setattr(rpm, "_run", fake_run)
+
+ pkgs = rpm.list_manual_packages()
+ assert pkgs == ["vim-enhanced"]
+
+
+def test_build_rpm_etc_index_uses_fallback_when_rpm_output_mismatches(monkeypatch):
+ # Two files in /etc, one owned, one unowned.
+ monkeypatch.setattr(
+ rpm, "_walk_etc_files", lambda: ["/etc/owned.conf", "/etc/unowned.conf"]
+ )
+
+ # Simulate chunk query producing unexpected extra line (mismatch) -> triggers per-file fallback.
+ monkeypatch.setattr(
+ rpm,
+ "_run",
+ lambda cmd, allow_fail=False, merge_err=False: (0, "ownedpkg\nEXTRA\nTHIRD\n"),
+ )
+ monkeypatch.setattr(
+ rpm, "rpm_owner", lambda p: "ownedpkg" if p == "/etc/owned.conf" else None
+ )
+
+ owned, owner_map, topdir_to_pkgs, pkg_to_etc = rpm.build_rpm_etc_index()
+
+ assert owned == {"/etc/owned.conf"}
+ assert owner_map["/etc/owned.conf"] == "ownedpkg"
+ assert "owned.conf" in topdir_to_pkgs
+ assert pkg_to_etc["ownedpkg"] == ["/etc/owned.conf"]
+
+
+def test_build_rpm_etc_index_parses_chunk_output(monkeypatch):
+ monkeypatch.setattr(
+ rpm, "_walk_etc_files", lambda: ["/etc/ssh/sshd_config", "/etc/notowned"]
+ )
+
+ def fake_run(cmd, allow_fail=False, merge_err=False):
+ # One output line per input path.
+ return 0, "openssh-server\nfile /etc/notowned is not owned by any package\n"
+
+ monkeypatch.setattr(rpm, "_run", fake_run)
+
+ owned, owner_map, topdir_to_pkgs, pkg_to_etc = rpm.build_rpm_etc_index()
+
+ assert "/etc/ssh/sshd_config" in owned
+ assert "/etc/notowned" not in owned
+ assert owner_map["/etc/ssh/sshd_config"] == "openssh-server"
+ assert "ssh" in topdir_to_pkgs
+ assert "openssh-server" in topdir_to_pkgs["ssh"]
+ assert pkg_to_etc["openssh-server"] == ["/etc/ssh/sshd_config"]
+
+
+def test_rpm_config_files_and_modified_files_parsing(monkeypatch):
+ monkeypatch.setattr(
+ rpm,
+ "_run",
+ lambda cmd, allow_fail=False, merge_err=False: (
+ 0,
+ "/etc/foo.conf\n/usr/bin/tool\n",
+ ),
+ )
+ assert rpm.rpm_config_files("mypkg") == {"/etc/foo.conf", "/usr/bin/tool"}
+
+ # rpm -V returns only changed/missing files
+ out = "S.5....T. c /etc/foo.conf\nmissing /etc/bar\n"
+ monkeypatch.setattr(
+ rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (1, out)
+ )
+ assert rpm.rpm_modified_files("mypkg") == {"/etc/foo.conf", "/etc/bar"}