Compare commits
No commits in common. "984b0fa81b5b224951816c4dc46a74734b950d07" and "8c19473e18b388b95ac3a5f77942cd081c17e889" have entirely different histories.
984b0fa81b
...
8c19473e18
17 changed files with 278 additions and 1471 deletions
|
|
@ -1,8 +1,3 @@
|
|||
# 0.2.0
|
||||
|
||||
* Add version CLI arg
|
||||
* Add ability to enroll RH-style systems (DNF5/DNF/RPM)
|
||||
|
||||
# 0.1.7
|
||||
|
||||
* Fix an attribution bug for certain files ending up in the wrong package/role.
|
||||
|
|
|
|||
18
README.md
18
README.md
|
|
@ -4,15 +4,15 @@
|
|||
<img src="https://git.mig5.net/mig5/enroll/raw/branch/main/enroll.svg" alt="Enroll logo" width="240" />
|
||||
</div>
|
||||
|
||||
**enroll** inspects a Linux machine (Debian-like or RedHat-like) and generates Ansible roles/playbooks (and optionally inventory) for what it finds.
|
||||
**enroll** inspects a Linux machine (currently Debian-only) and generates Ansible roles/playbooks (and optionally inventory) for what it finds.
|
||||
|
||||
- Detects packages that have been installed.
|
||||
- Detects package ownership of `/etc` files where possible
|
||||
- Captures config that has **changed from packaged defaults** where possible (e.g dpkg conffile hashes + package md5sums when available).
|
||||
- Detects Debian package ownership of `/etc` files using dpkg’s local database.
|
||||
- Captures config that has **changed from packaged defaults** (dpkg conffile hashes + package md5sums when available).
|
||||
- Also captures **service-relevant custom/unowned files** under `/etc/<service>/...` (e.g. drop-in config includes).
|
||||
- Defensively excludes likely secrets (path denylist + content sniff + size caps).
|
||||
- Captures non-system users and their SSH public keys.
|
||||
- Captures miscellaneous `/etc` files it can't attribute to a package and installs them in an `etc_custom` role.
|
||||
- Captures miscellaneous `/etc` files it can’t attribute to a package and installs them in an `etc_custom` role.
|
||||
- Ditto for /usr/local/bin (for non-binary files) and /usr/local/etc
|
||||
- Avoids trying to start systemd services that were detected as inactive during harvest.
|
||||
|
||||
|
|
@ -41,8 +41,8 @@ Use when enrolling **one server** (or generating a “golden” role set you int
|
|||
|
||||
**Characteristics**
|
||||
- Roles are more self-contained.
|
||||
- Raw config files live in the role's `files/`.
|
||||
- Template variables live in the role's `defaults/main.yml`.
|
||||
- Raw config files live in the role’s `files/`.
|
||||
- Template variables live in the role’s `defaults/main.yml`.
|
||||
|
||||
### Multi-site mode (`--fqdn`)
|
||||
Use when enrolling **several existing servers** quickly, especially if they differ.
|
||||
|
|
@ -68,13 +68,13 @@ Harvest state about a host and write a harvest bundle.
|
|||
- “Manual” packages
|
||||
- Changed-from-default config (plus related custom/unowned files under service dirs)
|
||||
- Non-system users + SSH public keys
|
||||
- Misc `/etc` that can't be attributed to a package (`etc_custom` role)
|
||||
- Misc `/etc` that can’t be attributed to a package (`etc_custom` role)
|
||||
- Optional user-specified extra files/dirs via `--include-path` (emitted as an `extra_paths` role at manifest time)
|
||||
|
||||
**Common flags**
|
||||
- Remote harvesting:
|
||||
- `--remote-host`, `--remote-user`, `--remote-port`
|
||||
- `--no-sudo` (if you don't want/need sudo)
|
||||
- `--no-sudo` (if you don’t want/need sudo)
|
||||
- Sensitive-data behaviour:
|
||||
- default: tries to avoid likely secrets
|
||||
- `--dangerous`: disables secret-safety checks (see “Sensitive data” below)
|
||||
|
|
@ -233,7 +233,7 @@ poetry run enroll --help
|
|||
|
||||
## Found a bug / have a suggestion?
|
||||
|
||||
My Forgejo doesn't currently support federation, so I haven't opened registration/login for issues.
|
||||
My Forgejo doesn’t currently support federation, so I haven’t opened registration/login for issues.
|
||||
|
||||
Instead, email me (see `pyproject.toml`) or contact me on the Fediverse:
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ from .harvest import harvest
|
|||
from .manifest import manifest
|
||||
from .remote import remote_harvest
|
||||
from .sopsutil import SopsError, encrypt_file_binary
|
||||
from .version import get_enroll_version
|
||||
|
||||
|
||||
def _discover_config_path(argv: list[str]) -> Optional[Path]:
|
||||
|
|
@ -319,6 +318,13 @@ def _jt_mode(args: argparse.Namespace) -> str:
|
|||
return "auto"
|
||||
|
||||
|
||||
def _add_remote_args(p: argparse.ArgumentParser) -> None:
|
||||
p.add_argument(
|
||||
"--remote-host",
|
||||
help="SSH host to run harvesting on (if set, harvest runs remotely and is pulled locally).",
|
||||
)
|
||||
|
||||
|
||||
def _add_config_args(p: argparse.ArgumentParser) -> None:
|
||||
p.add_argument(
|
||||
"-c",
|
||||
|
|
@ -333,13 +339,6 @@ def _add_config_args(p: argparse.ArgumentParser) -> None:
|
|||
action="store_true",
|
||||
help="Do not load any INI config file (even if one would be auto-discovered).",
|
||||
)
|
||||
|
||||
|
||||
def _add_remote_args(p: argparse.ArgumentParser) -> None:
|
||||
p.add_argument(
|
||||
"--remote-host",
|
||||
help="SSH host to run harvesting on (if set, harvest runs remotely and is pulled locally).",
|
||||
)
|
||||
p.add_argument(
|
||||
"--remote-port",
|
||||
type=int,
|
||||
|
|
@ -355,18 +354,11 @@ def _add_remote_args(p: argparse.ArgumentParser) -> None:
|
|||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser(prog="enroll")
|
||||
ap.add_argument(
|
||||
"-v",
|
||||
"--version",
|
||||
action="version",
|
||||
version=f"{get_enroll_version()}",
|
||||
)
|
||||
_add_config_args(ap)
|
||||
sub = ap.add_subparsers(dest="cmd", required=True)
|
||||
|
||||
h = sub.add_parser("harvest", help="Harvest service/package/config state")
|
||||
_add_config_args(h)
|
||||
_add_remote_args(h)
|
||||
h.add_argument(
|
||||
"--out",
|
||||
help=(
|
||||
|
|
@ -414,6 +406,7 @@ def main() -> None:
|
|||
action="store_true",
|
||||
help="Don't use sudo on the remote host (when using --remote options). This may result in a limited harvest due to permission restrictions.",
|
||||
)
|
||||
_add_remote_args(h)
|
||||
|
||||
m = sub.add_parser("manifest", help="Render Ansible roles from a harvest")
|
||||
_add_config_args(m)
|
||||
|
|
@ -450,7 +443,6 @@ def main() -> None:
|
|||
"single-shot", help="Harvest state, then manifest Ansible code, in one shot"
|
||||
)
|
||||
_add_config_args(s)
|
||||
_add_remote_args(s)
|
||||
s.add_argument(
|
||||
"--harvest",
|
||||
help=(
|
||||
|
|
@ -508,6 +500,7 @@ def main() -> None:
|
|||
),
|
||||
)
|
||||
_add_common_manifest_args(s)
|
||||
_add_remote_args(s)
|
||||
|
||||
d = sub.add_parser("diff", help="Compare two harvests and report differences")
|
||||
_add_config_args(d)
|
||||
|
|
@ -609,12 +602,14 @@ def main() -> None:
|
|||
)
|
||||
args = ap.parse_args(argv)
|
||||
|
||||
remote_host: Optional[str] = getattr(args, "remote_host", None)
|
||||
|
||||
try:
|
||||
if args.cmd == "harvest":
|
||||
sops_fps = getattr(args, "sops", None)
|
||||
if args.remote_host:
|
||||
if remote_host:
|
||||
if sops_fps:
|
||||
out_file = _resolve_sops_out_file(args.out, hint=args.remote_host)
|
||||
out_file = _resolve_sops_out_file(args.out, hint=remote_host)
|
||||
with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td:
|
||||
tmp_bundle = Path(td) / "bundle"
|
||||
tmp_bundle.mkdir(parents=True, exist_ok=True)
|
||||
|
|
@ -624,7 +619,7 @@ def main() -> None:
|
|||
pass
|
||||
remote_harvest(
|
||||
local_out_dir=tmp_bundle,
|
||||
remote_host=args.remote_host,
|
||||
remote_host=remote_host,
|
||||
remote_port=int(args.remote_port),
|
||||
remote_user=args.remote_user,
|
||||
dangerous=bool(args.dangerous),
|
||||
|
|
@ -640,11 +635,11 @@ def main() -> None:
|
|||
out_dir = (
|
||||
Path(args.out)
|
||||
if args.out
|
||||
else new_harvest_cache_dir(hint=args.remote_host).dir
|
||||
else new_harvest_cache_dir(hint=remote_host).dir
|
||||
)
|
||||
state = remote_harvest(
|
||||
local_out_dir=out_dir,
|
||||
remote_host=args.remote_host,
|
||||
remote_host=remote_host,
|
||||
remote_port=int(args.remote_port),
|
||||
remote_user=args.remote_user,
|
||||
dangerous=bool(args.dangerous),
|
||||
|
|
@ -674,16 +669,12 @@ def main() -> None:
|
|||
)
|
||||
print(str(out_file))
|
||||
else:
|
||||
if args.out:
|
||||
out_dir = args.out
|
||||
else:
|
||||
out_dir = (
|
||||
Path(args.out)
|
||||
if args.out
|
||||
else new_harvest_cache_dir(hint=args.remote_host).dir
|
||||
if not args.out:
|
||||
raise SystemExit(
|
||||
"error: --out is required unless --remote-host is set"
|
||||
)
|
||||
path = harvest(
|
||||
out_dir,
|
||||
args.out,
|
||||
dangerous=bool(args.dangerous),
|
||||
include_paths=list(getattr(args, "include_path", []) or []),
|
||||
exclude_paths=list(getattr(args, "exclude_path", []) or []),
|
||||
|
|
@ -756,11 +747,9 @@ def main() -> None:
|
|||
raise SystemExit(2)
|
||||
elif args.cmd == "single-shot":
|
||||
sops_fps = getattr(args, "sops", None)
|
||||
if args.remote_host:
|
||||
if remote_host:
|
||||
if sops_fps:
|
||||
out_file = _resolve_sops_out_file(
|
||||
args.harvest, hint=args.remote_host
|
||||
)
|
||||
out_file = _resolve_sops_out_file(args.harvest, hint=remote_host)
|
||||
with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td:
|
||||
tmp_bundle = Path(td) / "bundle"
|
||||
tmp_bundle.mkdir(parents=True, exist_ok=True)
|
||||
|
|
@ -770,7 +759,7 @@ def main() -> None:
|
|||
pass
|
||||
remote_harvest(
|
||||
local_out_dir=tmp_bundle,
|
||||
remote_host=args.remote_host,
|
||||
remote_host=remote_host,
|
||||
remote_port=int(args.remote_port),
|
||||
remote_user=args.remote_user,
|
||||
dangerous=bool(args.dangerous),
|
||||
|
|
@ -795,11 +784,11 @@ def main() -> None:
|
|||
harvest_dir = (
|
||||
Path(args.harvest)
|
||||
if args.harvest
|
||||
else new_harvest_cache_dir(hint=args.remote_host).dir
|
||||
else new_harvest_cache_dir(hint=remote_host).dir
|
||||
)
|
||||
remote_harvest(
|
||||
local_out_dir=harvest_dir,
|
||||
remote_host=args.remote_host,
|
||||
remote_host=remote_host,
|
||||
remote_port=int(args.remote_port),
|
||||
remote_user=args.remote_user,
|
||||
dangerous=bool(args.dangerous),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import glob
|
||||
import hashlib
|
||||
import os
|
||||
import subprocess # nosec
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
|
|
@ -179,3 +180,28 @@ def read_pkg_md5sums(pkg: str) -> Dict[str, str]:
|
|||
md5, rel = line.split(None, 1)
|
||||
m[rel.strip()] = md5.strip()
|
||||
return m
|
||||
|
||||
|
||||
def file_md5(path: str) -> str:
|
||||
h = hashlib.md5() # nosec
|
||||
with open(path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
||||
h.update(chunk)
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def stat_triplet(path: str) -> Tuple[str, str, str]:
|
||||
st = os.stat(path, follow_symlinks=True)
|
||||
mode = oct(st.st_mode & 0o777)[2:].zfill(4)
|
||||
|
||||
import pwd, grp
|
||||
|
||||
try:
|
||||
owner = pwd.getpwuid(st.st_uid).pw_name
|
||||
except KeyError:
|
||||
owner = str(st.st_uid)
|
||||
try:
|
||||
group = grp.getgrgid(st.st_gid).gr_name
|
||||
except KeyError:
|
||||
group = str(st.st_gid)
|
||||
return owner, group, mode
|
||||
|
|
|
|||
|
|
@ -1,40 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
from typing import Tuple
|
||||
|
||||
|
||||
def file_md5(path: str) -> str:
|
||||
"""Return hex MD5 of a file.
|
||||
|
||||
Used for Debian dpkg baseline comparisons.
|
||||
"""
|
||||
h = hashlib.md5() # nosec
|
||||
with open(path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
||||
h.update(chunk)
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def stat_triplet(path: str) -> Tuple[str, str, str]:
|
||||
"""Return (owner, group, mode) for a path.
|
||||
|
||||
owner/group are usernames/group names when resolvable, otherwise numeric ids.
|
||||
mode is a zero-padded octal string (e.g. "0644").
|
||||
"""
|
||||
st = os.stat(path, follow_symlinks=True)
|
||||
mode = oct(st.st_mode & 0o777)[2:].zfill(4)
|
||||
|
||||
import grp
|
||||
import pwd
|
||||
|
||||
try:
|
||||
owner = pwd.getpwuid(st.st_uid).pw_name
|
||||
except KeyError:
|
||||
owner = str(st.st_uid)
|
||||
try:
|
||||
group = grp.getgrgid(st.st_gid).gr_name
|
||||
except KeyError:
|
||||
group = str(st.st_gid)
|
||||
return owner, group, mode
|
||||
|
|
@ -15,12 +15,18 @@ from .systemd import (
|
|||
get_timer_info,
|
||||
UnitQueryError,
|
||||
)
|
||||
from .fsutil import stat_triplet
|
||||
from .platform import detect_platform, get_backend
|
||||
from .debian import (
|
||||
build_dpkg_etc_index,
|
||||
dpkg_owner,
|
||||
file_md5,
|
||||
list_manual_packages,
|
||||
parse_status_conffiles,
|
||||
read_pkg_md5sums,
|
||||
stat_triplet,
|
||||
)
|
||||
from .ignore import IgnorePolicy
|
||||
from .pathfilter import PathFilter, expand_includes
|
||||
from .accounts import collect_non_system_users
|
||||
from .version import get_enroll_version
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -79,14 +85,6 @@ class AptConfigSnapshot:
|
|||
notes: List[str]
|
||||
|
||||
|
||||
@dataclass
|
||||
class DnfConfigSnapshot:
|
||||
role_name: str
|
||||
managed_files: List[ManagedFile]
|
||||
excluded: List[ExcludedFile]
|
||||
notes: List[str]
|
||||
|
||||
|
||||
@dataclass
|
||||
class EtcCustomSnapshot:
|
||||
role_name: str
|
||||
|
|
@ -160,13 +158,6 @@ SHARED_ETC_TOPDIRS = {
|
|||
"sudoers.d",
|
||||
"sysctl.d",
|
||||
"systemd",
|
||||
# RPM-family shared trees
|
||||
"dnf",
|
||||
"yum",
|
||||
"yum.repos.d",
|
||||
"sysconfig",
|
||||
"pki",
|
||||
"firewalld",
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -323,13 +314,7 @@ def _add_pkgs_from_etc_topdirs(
|
|||
pkgs.add(p)
|
||||
|
||||
|
||||
def _maybe_add_specific_paths(hints: Set[str], backend) -> List[str]:
|
||||
# Delegate to backend-specific conventions (e.g. /etc/default on Debian,
|
||||
# /etc/sysconfig on Fedora/RHEL). Always include sysctl.d.
|
||||
try:
|
||||
return backend.specific_paths_for_hints(hints)
|
||||
except Exception:
|
||||
# Best-effort fallback (Debian-ish).
|
||||
def _maybe_add_specific_paths(hints: Set[str]) -> List[str]:
|
||||
paths: List[str] = []
|
||||
for h in hints:
|
||||
paths.extend(
|
||||
|
|
@ -423,7 +408,6 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
|
|||
("/etc/anacron/*", "system_cron"),
|
||||
("/var/spool/cron/crontabs/*", "system_cron"),
|
||||
("/var/spool/crontabs/*", "system_cron"),
|
||||
("/var/spool/cron/*", "system_cron"),
|
||||
# network
|
||||
("/etc/netplan/*", "system_network"),
|
||||
("/etc/systemd/network/*", "system_network"),
|
||||
|
|
@ -431,9 +415,6 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
|
|||
("/etc/network/interfaces.d/*", "system_network"),
|
||||
("/etc/resolvconf.conf", "system_network"),
|
||||
("/etc/resolvconf/resolv.conf.d/*", "system_network"),
|
||||
("/etc/NetworkManager/system-connections/*", "system_network"),
|
||||
("/etc/sysconfig/network*", "system_network"),
|
||||
("/etc/sysconfig/network-scripts/*", "system_network"),
|
||||
# firewall
|
||||
("/etc/nftables.conf", "system_firewall"),
|
||||
("/etc/nftables.d/*", "system_firewall"),
|
||||
|
|
@ -441,10 +422,6 @@ _SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
|
|||
("/etc/iptables/rules.v6", "system_firewall"),
|
||||
("/etc/ufw/*", "system_firewall"),
|
||||
("/etc/default/ufw", "system_firewall"),
|
||||
("/etc/firewalld/*", "system_firewall"),
|
||||
("/etc/firewalld/zones/*", "system_firewall"),
|
||||
# SELinux
|
||||
("/etc/selinux/config", "system_security"),
|
||||
# other
|
||||
("/etc/rc.local", "system_rc"),
|
||||
]
|
||||
|
|
@ -576,51 +553,6 @@ def _iter_apt_capture_paths() -> List[tuple[str, str]]:
|
|||
return uniq
|
||||
|
||||
|
||||
def _iter_dnf_capture_paths() -> List[tuple[str, str]]:
|
||||
"""Return (path, reason) pairs for DNF/YUM configuration on RPM systems.
|
||||
|
||||
Captures:
|
||||
- /etc/dnf/* (dnf.conf, vars, plugins, modules, automatic)
|
||||
- /etc/yum.conf (legacy)
|
||||
- /etc/yum.repos.d/*.repo
|
||||
- /etc/pki/rpm-gpg/* (GPG key files)
|
||||
"""
|
||||
reasons: Dict[str, str] = {}
|
||||
|
||||
for root, tag in (
|
||||
("/etc/dnf", "dnf_config"),
|
||||
("/etc/yum", "yum_config"),
|
||||
):
|
||||
if os.path.isdir(root):
|
||||
for dirpath, _, filenames in os.walk(root):
|
||||
for fn in filenames:
|
||||
p = os.path.join(dirpath, fn)
|
||||
if os.path.islink(p) or not os.path.isfile(p):
|
||||
continue
|
||||
reasons.setdefault(p, tag)
|
||||
|
||||
# Legacy yum.conf.
|
||||
if os.path.isfile("/etc/yum.conf") and not os.path.islink("/etc/yum.conf"):
|
||||
reasons.setdefault("/etc/yum.conf", "yum_conf")
|
||||
|
||||
# Repositories.
|
||||
if os.path.isdir("/etc/yum.repos.d"):
|
||||
for p in _iter_matching_files("/etc/yum.repos.d/*.repo"):
|
||||
reasons[p] = "yum_repo"
|
||||
|
||||
# RPM GPG keys.
|
||||
if os.path.isdir("/etc/pki/rpm-gpg"):
|
||||
for dirpath, _, filenames in os.walk("/etc/pki/rpm-gpg"):
|
||||
for fn in filenames:
|
||||
p = os.path.join(dirpath, fn)
|
||||
if os.path.islink(p) or not os.path.isfile(p):
|
||||
continue
|
||||
reasons.setdefault(p, "rpm_gpg_key")
|
||||
|
||||
# Stable ordering.
|
||||
return [(p, reasons[p]) for p in sorted(reasons.keys())]
|
||||
|
||||
|
||||
def _iter_system_capture_paths() -> List[tuple[str, str]]:
|
||||
"""Return (path, reason) pairs for essential system config/state (non-APT)."""
|
||||
out: List[tuple[str, str]] = []
|
||||
|
|
@ -668,12 +600,8 @@ def harvest(
|
|||
flush=True,
|
||||
)
|
||||
|
||||
platform = detect_platform()
|
||||
backend = get_backend(platform)
|
||||
|
||||
owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths = (
|
||||
backend.build_etc_index()
|
||||
)
|
||||
owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths = build_dpkg_etc_index()
|
||||
conffiles_by_pkg = parse_status_conffiles()
|
||||
|
||||
# -------------------------
|
||||
# Service roles
|
||||
|
|
@ -717,12 +645,12 @@ def harvest(
|
|||
candidates: Dict[str, str] = {}
|
||||
|
||||
if ui.fragment_path:
|
||||
p = backend.owner_of_path(ui.fragment_path)
|
||||
p = dpkg_owner(ui.fragment_path)
|
||||
if p:
|
||||
pkgs.add(p)
|
||||
|
||||
for exe in ui.exec_paths:
|
||||
p = backend.owner_of_path(exe)
|
||||
p = dpkg_owner(exe)
|
||||
if p:
|
||||
pkgs.add(p)
|
||||
|
||||
|
|
@ -747,7 +675,7 @@ def harvest(
|
|||
# logrotate.d entries) can still be attributed back to this service.
|
||||
service_role_aliases[role] = set(hints) | set(pkgs) | {role}
|
||||
|
||||
for sp in _maybe_add_specific_paths(hints, backend):
|
||||
for sp in _maybe_add_specific_paths(hints):
|
||||
if not os.path.exists(sp):
|
||||
continue
|
||||
if sp in etc_owner_map:
|
||||
|
|
@ -756,13 +684,31 @@ def harvest(
|
|||
candidates.setdefault(sp, "custom_specific_path")
|
||||
|
||||
for pkg in sorted(pkgs):
|
||||
etc_paths = pkg_to_etc_paths.get(pkg, [])
|
||||
for path, reason in backend.modified_paths(pkg, etc_paths).items():
|
||||
conff = conffiles_by_pkg.get(pkg, {})
|
||||
md5sums = read_pkg_md5sums(pkg)
|
||||
for path in pkg_to_etc_paths.get(pkg, []):
|
||||
if not os.path.isfile(path) or os.path.islink(path):
|
||||
continue
|
||||
if backend.is_pkg_config_path(path):
|
||||
if path.startswith("/etc/apt/"):
|
||||
continue
|
||||
candidates.setdefault(path, reason)
|
||||
if path in conff:
|
||||
# Only capture conffiles when they differ from the package default.
|
||||
try:
|
||||
current = file_md5(path)
|
||||
except OSError:
|
||||
continue
|
||||
if current != conff[path]:
|
||||
candidates.setdefault(path, "modified_conffile")
|
||||
continue
|
||||
rel = path.lstrip("/")
|
||||
baseline = md5sums.get(rel)
|
||||
if baseline:
|
||||
try:
|
||||
current = file_md5(path)
|
||||
except OSError:
|
||||
continue
|
||||
if current != baseline:
|
||||
candidates.setdefault(path, "modified_packaged_file")
|
||||
|
||||
# Capture custom/unowned files living under /etc/<name> for this service.
|
||||
#
|
||||
|
|
@ -901,18 +847,18 @@ def harvest(
|
|||
# (useful when a timer triggers a service that isn't enabled).
|
||||
pkgs: Set[str] = set()
|
||||
if ti.fragment_path:
|
||||
p = backend.owner_of_path(ti.fragment_path)
|
||||
p = dpkg_owner(ti.fragment_path)
|
||||
if p:
|
||||
pkgs.add(p)
|
||||
if ti.trigger_unit and ti.trigger_unit.endswith(".service"):
|
||||
try:
|
||||
ui = get_unit_info(ti.trigger_unit)
|
||||
if ui.fragment_path:
|
||||
p = backend.owner_of_path(ui.fragment_path)
|
||||
p = dpkg_owner(ui.fragment_path)
|
||||
if p:
|
||||
pkgs.add(p)
|
||||
for exe in ui.exec_paths:
|
||||
p = backend.owner_of_path(exe)
|
||||
p = dpkg_owner(exe)
|
||||
if p:
|
||||
pkgs.add(p)
|
||||
except Exception: # nosec
|
||||
|
|
@ -924,7 +870,7 @@ def harvest(
|
|||
# -------------------------
|
||||
# Manually installed package roles
|
||||
# -------------------------
|
||||
manual_pkgs = backend.list_manual_packages()
|
||||
manual_pkgs = list_manual_packages()
|
||||
# Avoid duplicate roles: if a manual package is already managed by any service role, skip its pkg_<name> role.
|
||||
covered_by_services: Set[str] = set()
|
||||
for s in service_snaps:
|
||||
|
|
@ -947,26 +893,41 @@ def harvest(
|
|||
for tpath in timer_extra_by_pkg.get(pkg, []):
|
||||
candidates.setdefault(tpath, "related_timer")
|
||||
|
||||
etc_paths = pkg_to_etc_paths.get(pkg, [])
|
||||
for path, reason in backend.modified_paths(pkg, etc_paths).items():
|
||||
conff = conffiles_by_pkg.get(pkg, {})
|
||||
md5sums = read_pkg_md5sums(pkg)
|
||||
|
||||
for path in pkg_to_etc_paths.get(pkg, []):
|
||||
if not os.path.isfile(path) or os.path.islink(path):
|
||||
continue
|
||||
if backend.is_pkg_config_path(path):
|
||||
if path.startswith("/etc/apt/"):
|
||||
continue
|
||||
candidates.setdefault(path, reason)
|
||||
if path in conff:
|
||||
try:
|
||||
current = file_md5(path)
|
||||
except OSError:
|
||||
continue
|
||||
if current != conff[path]:
|
||||
candidates.setdefault(path, "modified_conffile")
|
||||
continue
|
||||
rel = path.lstrip("/")
|
||||
baseline = md5sums.get(rel)
|
||||
if baseline:
|
||||
try:
|
||||
current = file_md5(path)
|
||||
except OSError:
|
||||
continue
|
||||
if current != baseline:
|
||||
candidates.setdefault(path, "modified_packaged_file")
|
||||
|
||||
topdirs = _topdirs_for_package(pkg, pkg_to_etc_paths)
|
||||
roots: List[str] = []
|
||||
# Collect candidate directories plus backend-specific common files.
|
||||
for td in sorted(topdirs):
|
||||
if td in SHARED_ETC_TOPDIRS:
|
||||
continue
|
||||
if backend.is_pkg_config_path(f"/etc/{td}/") or backend.is_pkg_config_path(
|
||||
f"/etc/{td}"
|
||||
):
|
||||
continue
|
||||
roots.extend([f"/etc/{td}", f"/etc/{td}.d"])
|
||||
roots.extend(_maybe_add_specific_paths(set(topdirs), backend))
|
||||
roots.extend([f"/etc/default/{td}"])
|
||||
roots.extend([f"/etc/init.d/{td}"])
|
||||
roots.extend([f"/etc/sysctl.d/{td}.conf"])
|
||||
|
||||
# Capture any custom/unowned files under /etc/<topdir> for this
|
||||
# manually-installed package. This may include runtime-generated
|
||||
|
|
@ -1070,22 +1031,14 @@ def harvest(
|
|||
)
|
||||
|
||||
# -------------------------
|
||||
# Package manager config role
|
||||
# - Debian: apt_config
|
||||
# - Fedora/RHEL-like: dnf_config
|
||||
# apt_config role (APT configuration and keyrings)
|
||||
# -------------------------
|
||||
apt_notes: List[str] = []
|
||||
apt_excluded: List[ExcludedFile] = []
|
||||
apt_managed: List[ManagedFile] = []
|
||||
dnf_notes: List[str] = []
|
||||
dnf_excluded: List[ExcludedFile] = []
|
||||
dnf_managed: List[ManagedFile] = []
|
||||
|
||||
apt_role_name = "apt_config"
|
||||
dnf_role_name = "dnf_config"
|
||||
|
||||
if backend.name == "dpkg":
|
||||
apt_role_seen = seen_by_role.setdefault(apt_role_name, set())
|
||||
|
||||
for path, reason in _iter_apt_capture_paths():
|
||||
_capture_file(
|
||||
bundle_dir=bundle_dir,
|
||||
|
|
@ -1098,20 +1051,6 @@ def harvest(
|
|||
excluded_out=apt_excluded,
|
||||
seen_role=apt_role_seen,
|
||||
)
|
||||
elif backend.name == "rpm":
|
||||
dnf_role_seen = seen_by_role.setdefault(dnf_role_name, set())
|
||||
for path, reason in _iter_dnf_capture_paths():
|
||||
_capture_file(
|
||||
bundle_dir=bundle_dir,
|
||||
role_name=dnf_role_name,
|
||||
abs_path=path,
|
||||
reason=reason,
|
||||
policy=policy,
|
||||
path_filter=path_filter,
|
||||
managed_out=dnf_managed,
|
||||
excluded_out=dnf_excluded,
|
||||
seen_role=dnf_role_seen,
|
||||
)
|
||||
|
||||
apt_config_snapshot = AptConfigSnapshot(
|
||||
role_name=apt_role_name,
|
||||
|
|
@ -1119,12 +1058,6 @@ def harvest(
|
|||
excluded=apt_excluded,
|
||||
notes=apt_notes,
|
||||
)
|
||||
dnf_config_snapshot = DnfConfigSnapshot(
|
||||
role_name=dnf_role_name,
|
||||
managed_files=dnf_managed,
|
||||
excluded=dnf_excluded,
|
||||
notes=dnf_notes,
|
||||
)
|
||||
|
||||
# -------------------------
|
||||
# etc_custom role (unowned /etc files not already attributed elsewhere)
|
||||
|
|
@ -1146,8 +1079,6 @@ def harvest(
|
|||
already.add(mf.path)
|
||||
for mf in apt_managed:
|
||||
already.add(mf.path)
|
||||
for mf in dnf_managed:
|
||||
already.add(mf.path)
|
||||
|
||||
# Maps for re-attributing shared snippets (cron.d/logrotate.d) to existing roles.
|
||||
svc_by_role: Dict[str, ServiceSnapshot] = {s.role_name: s for s in service_snaps}
|
||||
|
|
@ -1162,7 +1093,7 @@ def harvest(
|
|||
for pkg in s.packages:
|
||||
pkg_to_service_roles.setdefault(pkg, []).append(s.role_name)
|
||||
|
||||
# Alias -> role mapping used as a fallback when package ownership is missing.
|
||||
# Alias -> role mapping used as a fallback when dpkg ownership is missing.
|
||||
# Prefer service roles over package roles when both would match.
|
||||
alias_ranked: Dict[str, tuple[int, str]] = {}
|
||||
|
||||
|
|
@ -1193,8 +1124,8 @@ def harvest(
|
|||
per service.
|
||||
|
||||
Resolution order:
|
||||
1) package owner -> service role (if any service references the package)
|
||||
2) package owner -> package role (manual package role exists)
|
||||
1) dpkg owner -> service role (if any service references the package)
|
||||
2) dpkg owner -> package role (manual package role exists)
|
||||
3) basename/stem alias match -> preferred role
|
||||
"""
|
||||
if path.startswith("/etc/logrotate.d/"):
|
||||
|
|
@ -1216,7 +1147,7 @@ def harvest(
|
|||
seen.add(c)
|
||||
uniq.append(c)
|
||||
|
||||
pkg = backend.owner_of_path(path)
|
||||
pkg = dpkg_owner(path)
|
||||
if pkg:
|
||||
svc_roles = sorted(set(pkg_to_service_roles.get(pkg, [])))
|
||||
if svc_roles:
|
||||
|
|
@ -1295,7 +1226,7 @@ def harvest(
|
|||
for dirpath, _, filenames in os.walk("/etc"):
|
||||
for fn in filenames:
|
||||
path = os.path.join(dirpath, fn)
|
||||
if backend.is_pkg_config_path(path):
|
||||
if path.startswith("/etc/apt/"):
|
||||
continue
|
||||
if path in already:
|
||||
continue
|
||||
|
|
@ -1482,22 +1413,13 @@ def harvest(
|
|||
)
|
||||
|
||||
state = {
|
||||
"enroll": {
|
||||
"version": get_enroll_version(),
|
||||
},
|
||||
"host": {
|
||||
"hostname": os.uname().nodename,
|
||||
"os": platform.os_family,
|
||||
"pkg_backend": backend.name,
|
||||
"os_release": platform.os_release,
|
||||
},
|
||||
"host": {"hostname": os.uname().nodename, "os": "debian"},
|
||||
"users": asdict(users_snapshot),
|
||||
"services": [asdict(s) for s in service_snaps],
|
||||
"manual_packages": manual_pkgs,
|
||||
"manual_packages_skipped": manual_pkgs_skipped,
|
||||
"package_roles": [asdict(p) for p in pkg_snaps],
|
||||
"apt_config": asdict(apt_config_snapshot),
|
||||
"dnf_config": asdict(dnf_config_snapshot),
|
||||
"etc_custom": asdict(etc_custom_snapshot),
|
||||
"usr_local_custom": asdict(usr_local_custom_snapshot),
|
||||
"extra_paths": asdict(extra_paths_snapshot),
|
||||
|
|
|
|||
|
|
@ -43,7 +43,6 @@ DEFAULT_ALLOW_BINARY_GLOBS = [
|
|||
"/usr/share/keyrings/*.gpg",
|
||||
"/usr/share/keyrings/*.pgp",
|
||||
"/usr/share/keyrings/*.asc",
|
||||
"/etc/pki/rpm-gpg/*",
|
||||
]
|
||||
|
||||
SENSITIVE_CONTENT_PATTERNS = [
|
||||
|
|
|
|||
|
|
@ -166,7 +166,6 @@ def _write_playbook_all(path: str, roles: List[str]) -> None:
|
|||
pb_lines = [
|
||||
"---",
|
||||
"- name: Apply all roles on all hosts",
|
||||
" gather_facts: true",
|
||||
" hosts: all",
|
||||
" become: true",
|
||||
" roles:",
|
||||
|
|
@ -182,7 +181,6 @@ def _write_playbook_host(path: str, fqdn: str, roles: List[str]) -> None:
|
|||
"---",
|
||||
f"- name: Apply all roles on {fqdn}",
|
||||
f" hosts: {fqdn}",
|
||||
" gather_facts: true",
|
||||
" become: true",
|
||||
" roles:",
|
||||
]
|
||||
|
|
@ -470,51 +468,6 @@ def _render_generic_files_tasks(
|
|||
"""
|
||||
|
||||
|
||||
def _render_install_packages_tasks(role: str, var_prefix: str) -> str:
|
||||
"""Render cross-distro package installation tasks.
|
||||
|
||||
We generate conditional tasks for apt/dnf/yum, falling back to the
|
||||
generic `package` module. This keeps generated roles usable on both
|
||||
Debian-like and RPM-like systems.
|
||||
"""
|
||||
return f"""# Generated by enroll
|
||||
|
||||
- name: Install packages for {role} (APT)
|
||||
ansible.builtin.apt:
|
||||
name: "{{{{ {var_prefix}_packages | default([]) }}}}"
|
||||
state: present
|
||||
update_cache: true
|
||||
when:
|
||||
- ({var_prefix}_packages | default([])) | length > 0
|
||||
- ansible_facts.pkg_mgr | default('') == 'apt'
|
||||
|
||||
- name: Install packages for {role} (DNF5)
|
||||
ansible.builtin.dnf5:
|
||||
name: "{{{{ {var_prefix}_packages | default([]) }}}}"
|
||||
state: present
|
||||
when:
|
||||
- ({var_prefix}_packages | default([])) | length > 0
|
||||
- ansible_facts.pkg_mgr | default('') == 'dnf5'
|
||||
|
||||
- name: Install packages for {role} (DNF/YUM)
|
||||
ansible.builtin.dnf:
|
||||
name: "{{{{ {var_prefix}_packages | default([]) }}}}"
|
||||
state: present
|
||||
when:
|
||||
- ({var_prefix}_packages | default([])) | length > 0
|
||||
- ansible_facts.pkg_mgr | default('') in ['dnf', 'yum']
|
||||
|
||||
- name: Install packages for {role} (generic fallback)
|
||||
ansible.builtin.package:
|
||||
name: "{{{{ {var_prefix}_packages | default([]) }}}}"
|
||||
state: present
|
||||
when:
|
||||
- ({var_prefix}_packages | default([])) | length > 0
|
||||
- ansible_facts.pkg_mgr | default('') not in ['apt', 'dnf', 'dnf5', 'yum']
|
||||
|
||||
"""
|
||||
|
||||
|
||||
def _prepare_bundle_dir(
|
||||
bundle: str,
|
||||
*,
|
||||
|
|
@ -676,7 +629,6 @@ def _manifest_from_bundle_dir(
|
|||
package_roles: List[Dict[str, Any]] = state.get("package_roles", [])
|
||||
users_snapshot: Dict[str, Any] = state.get("users", {})
|
||||
apt_config_snapshot: Dict[str, Any] = state.get("apt_config", {})
|
||||
dnf_config_snapshot: Dict[str, Any] = state.get("dnf_config", {})
|
||||
etc_custom_snapshot: Dict[str, Any] = state.get("etc_custom", {})
|
||||
usr_local_custom_snapshot: Dict[str, Any] = state.get("usr_local_custom", {})
|
||||
extra_paths_snapshot: Dict[str, Any] = state.get("extra_paths", {})
|
||||
|
|
@ -712,7 +664,6 @@ def _manifest_from_bundle_dir(
|
|||
|
||||
manifested_users_roles: List[str] = []
|
||||
manifested_apt_config_roles: List[str] = []
|
||||
manifested_dnf_config_roles: List[str] = []
|
||||
manifested_etc_custom_roles: List[str] = []
|
||||
manifested_usr_local_custom_roles: List[str] = []
|
||||
manifested_extra_paths_roles: List[str] = []
|
||||
|
|
@ -1090,157 +1041,6 @@ APT configuration harvested from the system (sources, pinning, and keyrings).
|
|||
|
||||
manifested_apt_config_roles.append(role)
|
||||
|
||||
# -------------------------
|
||||
# dnf_config role (DNF/YUM repos, config, and RPM GPG keys)
|
||||
# -------------------------
|
||||
if dnf_config_snapshot and dnf_config_snapshot.get("managed_files"):
|
||||
role = dnf_config_snapshot.get("role_name", "dnf_config")
|
||||
role_dir = os.path.join(roles_root, role)
|
||||
_write_role_scaffold(role_dir)
|
||||
|
||||
var_prefix = role
|
||||
|
||||
managed_files = dnf_config_snapshot.get("managed_files", [])
|
||||
excluded = dnf_config_snapshot.get("excluded", [])
|
||||
notes = dnf_config_snapshot.get("notes", [])
|
||||
|
||||
templated, jt_vars = _jinjify_managed_files(
|
||||
bundle_dir,
|
||||
role,
|
||||
role_dir,
|
||||
managed_files,
|
||||
jt_exe=jt_exe,
|
||||
jt_enabled=jt_enabled,
|
||||
overwrite_templates=not site_mode,
|
||||
)
|
||||
|
||||
if site_mode:
|
||||
_copy_artifacts(
|
||||
bundle_dir,
|
||||
role,
|
||||
_host_role_files_dir(out_dir, fqdn or "", role),
|
||||
exclude_rels=templated,
|
||||
)
|
||||
else:
|
||||
_copy_artifacts(
|
||||
bundle_dir,
|
||||
role,
|
||||
os.path.join(role_dir, "files"),
|
||||
exclude_rels=templated,
|
||||
)
|
||||
|
||||
files_var = _build_managed_files_var(
|
||||
managed_files,
|
||||
templated,
|
||||
notify_other=None,
|
||||
notify_systemd=None,
|
||||
)
|
||||
|
||||
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
|
||||
vars_map: Dict[str, Any] = {f"{var_prefix}_managed_files": files_var}
|
||||
vars_map = _merge_mappings_overwrite(vars_map, jt_map)
|
||||
|
||||
if site_mode:
|
||||
_write_role_defaults(role_dir, {f"{var_prefix}_managed_files": []})
|
||||
_write_hostvars(out_dir, fqdn or "", role, vars_map)
|
||||
else:
|
||||
_write_role_defaults(role_dir, vars_map)
|
||||
|
||||
tasks = "---\n" + _render_generic_files_tasks(
|
||||
var_prefix, include_restart_notify=False
|
||||
)
|
||||
with open(
|
||||
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
|
||||
) as f:
|
||||
f.write(tasks.rstrip() + "\n")
|
||||
|
||||
with open(
|
||||
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
|
||||
) as f:
|
||||
f.write("---\ndependencies: []\n")
|
||||
|
||||
# README: summarise repos and GPG key material
|
||||
repo_paths: List[str] = []
|
||||
key_paths: List[str] = []
|
||||
repo_hosts: Set[str] = set()
|
||||
|
||||
url_re = re.compile(r"(?:https?|ftp)://([^/\s]+)", re.IGNORECASE)
|
||||
file_url_re = re.compile(r"file://(/[^\s]+)")
|
||||
|
||||
for mf in managed_files:
|
||||
p = str(mf.get("path") or "")
|
||||
src_rel = str(mf.get("src_rel") or "")
|
||||
if not p or not src_rel:
|
||||
continue
|
||||
|
||||
if p.startswith("/etc/yum.repos.d/") and p.endswith(".repo"):
|
||||
repo_paths.append(p)
|
||||
art_path = os.path.join(bundle_dir, "artifacts", role, src_rel)
|
||||
try:
|
||||
with open(art_path, "r", encoding="utf-8", errors="replace") as rf:
|
||||
for line in rf:
|
||||
s = line.strip()
|
||||
if not s or s.startswith("#") or s.startswith(";"):
|
||||
continue
|
||||
# Collect hostnames from URLs (baseurl, mirrorlist, metalink, gpgkey...)
|
||||
for m in url_re.finditer(s):
|
||||
repo_hosts.add(m.group(1))
|
||||
# Collect local gpgkey file paths referenced as file:///...
|
||||
for m in file_url_re.finditer(s):
|
||||
key_paths.append(m.group(1))
|
||||
except OSError:
|
||||
pass # nosec
|
||||
|
||||
if p.startswith("/etc/pki/rpm-gpg/"):
|
||||
key_paths.append(p)
|
||||
|
||||
repo_paths = sorted(set(repo_paths))
|
||||
key_paths = sorted(set(key_paths))
|
||||
repos = sorted(repo_hosts)
|
||||
|
||||
readme = (
|
||||
"""# dnf_config
|
||||
|
||||
DNF/YUM configuration harvested from the system (repos, config files, and RPM GPG keys).
|
||||
|
||||
## Repository hosts
|
||||
"""
|
||||
+ ("\n".join([f"- {h}" for h in repos]) or "- (none)")
|
||||
+ """\n
|
||||
## Repo files
|
||||
"""
|
||||
+ ("\n".join([f"- {p}" for p in repo_paths]) or "- (none)")
|
||||
+ """\n
|
||||
## GPG keys
|
||||
"""
|
||||
+ ("\n".join([f"- {p}" for p in key_paths]) or "- (none)")
|
||||
+ """\n
|
||||
## Managed files
|
||||
"""
|
||||
+ (
|
||||
"\n".join(
|
||||
[f"- {mf.get('path')} ({mf.get('reason')})" for mf in managed_files]
|
||||
)
|
||||
or "- (none)"
|
||||
)
|
||||
+ """\n
|
||||
## Excluded
|
||||
"""
|
||||
+ (
|
||||
"\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded])
|
||||
or "- (none)"
|
||||
)
|
||||
+ """\n
|
||||
## Notes
|
||||
"""
|
||||
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
|
||||
+ """\n"""
|
||||
)
|
||||
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
|
||||
f.write(readme)
|
||||
|
||||
manifested_dnf_config_roles.append(role)
|
||||
|
||||
# -------------------------
|
||||
# etc_custom role (unowned /etc not already attributed)
|
||||
# -------------------------
|
||||
|
|
@ -1657,7 +1457,19 @@ User-requested extra file harvesting.
|
|||
f.write(handlers)
|
||||
|
||||
task_parts: List[str] = []
|
||||
task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix))
|
||||
task_parts.append(
|
||||
f"""---
|
||||
# Generated by enroll
|
||||
|
||||
- name: Install packages for {role}
|
||||
ansible.builtin.apt:
|
||||
name: "{{{{ {var_prefix}_packages | default([]) }}}}"
|
||||
state: present
|
||||
update_cache: true
|
||||
when: ({var_prefix}_packages | default([])) | length > 0
|
||||
|
||||
"""
|
||||
)
|
||||
|
||||
task_parts.append(
|
||||
_render_generic_files_tasks(var_prefix, include_restart_notify=True)
|
||||
|
|
@ -1804,7 +1616,19 @@ Generated from `{unit}`.
|
|||
f.write(handlers)
|
||||
|
||||
task_parts: List[str] = []
|
||||
task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix))
|
||||
task_parts.append(
|
||||
f"""---
|
||||
# Generated by enroll
|
||||
|
||||
- name: Install packages for {role}
|
||||
ansible.builtin.apt:
|
||||
name: "{{{{ {var_prefix}_packages | default([]) }}}}"
|
||||
state: present
|
||||
update_cache: true
|
||||
when: ({var_prefix}_packages | default([])) | length > 0
|
||||
|
||||
"""
|
||||
)
|
||||
task_parts.append(
|
||||
_render_generic_files_tasks(var_prefix, include_restart_notify=False)
|
||||
)
|
||||
|
|
@ -1843,7 +1667,6 @@ Generated for package `{pkg}`.
|
|||
manifested_pkg_roles.append(role)
|
||||
all_roles = (
|
||||
manifested_apt_config_roles
|
||||
+ manifested_dnf_config_roles
|
||||
+ manifested_pkg_roles
|
||||
+ manifested_service_roles
|
||||
+ manifested_etc_custom_roles
|
||||
|
|
|
|||
|
|
@ -1,261 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
|
||||
from .fsutil import file_md5
|
||||
|
||||
|
||||
def _read_os_release(path: str = "/etc/os-release") -> Dict[str, str]:
|
||||
out: Dict[str, str] = {}
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
||||
for raw in f:
|
||||
line = raw.strip()
|
||||
if not line or line.startswith("#") or "=" not in line:
|
||||
continue
|
||||
k, v = line.split("=", 1)
|
||||
k = k.strip()
|
||||
v = v.strip().strip('"')
|
||||
out[k] = v
|
||||
except OSError:
|
||||
return {}
|
||||
return out
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlatformInfo:
|
||||
os_family: str # debian|redhat|unknown
|
||||
pkg_backend: str # dpkg|rpm|unknown
|
||||
os_release: Dict[str, str]
|
||||
|
||||
|
||||
def detect_platform() -> PlatformInfo:
|
||||
"""Detect platform family and package backend.
|
||||
|
||||
Uses /etc/os-release when available, with a conservative fallback to
|
||||
checking for dpkg/rpm binaries.
|
||||
"""
|
||||
|
||||
osr = _read_os_release()
|
||||
os_id = (osr.get("ID") or "").strip().lower()
|
||||
likes = (osr.get("ID_LIKE") or "").strip().lower().split()
|
||||
|
||||
deb_ids = {"debian", "ubuntu", "linuxmint", "raspbian", "kali"}
|
||||
rhel_ids = {
|
||||
"fedora",
|
||||
"rhel",
|
||||
"centos",
|
||||
"rocky",
|
||||
"almalinux",
|
||||
"ol",
|
||||
"oracle",
|
||||
"scientific",
|
||||
}
|
||||
|
||||
if os_id in deb_ids or "debian" in likes:
|
||||
return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr)
|
||||
if os_id in rhel_ids or any(
|
||||
x in likes for x in ("rhel", "fedora", "centos", "redhat")
|
||||
):
|
||||
return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr)
|
||||
|
||||
# Fallback heuristics.
|
||||
if shutil.which("dpkg"):
|
||||
return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr)
|
||||
if shutil.which("rpm"):
|
||||
return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr)
|
||||
return PlatformInfo(os_family="unknown", pkg_backend="unknown", os_release=osr)
|
||||
|
||||
|
||||
class PackageBackend:
|
||||
"""Backend abstraction for package ownership, config detection, and manual package lists."""
|
||||
|
||||
name: str
|
||||
pkg_config_prefixes: Tuple[str, ...]
|
||||
|
||||
def owner_of_path(self, path: str) -> Optional[str]: # pragma: no cover
|
||||
raise NotImplementedError
|
||||
|
||||
def list_manual_packages(self) -> List[str]: # pragma: no cover
|
||||
raise NotImplementedError
|
||||
|
||||
def build_etc_index(
|
||||
self,
|
||||
) -> Tuple[
|
||||
Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]
|
||||
]: # pragma: no cover
|
||||
raise NotImplementedError
|
||||
|
||||
def specific_paths_for_hints(self, hints: Set[str]) -> List[str]:
|
||||
return []
|
||||
|
||||
def is_pkg_config_path(self, path: str) -> bool:
|
||||
for pfx in self.pkg_config_prefixes:
|
||||
if path == pfx or path.startswith(pfx):
|
||||
return True
|
||||
return False
|
||||
|
||||
def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]:
|
||||
"""Return a mapping of modified file paths -> reason label."""
|
||||
return {}
|
||||
|
||||
|
||||
class DpkgBackend(PackageBackend):
|
||||
name = "dpkg"
|
||||
pkg_config_prefixes = ("/etc/apt/",)
|
||||
|
||||
def __init__(self) -> None:
|
||||
from .debian import parse_status_conffiles
|
||||
|
||||
self._conffiles_by_pkg = parse_status_conffiles()
|
||||
|
||||
def owner_of_path(self, path: str) -> Optional[str]:
|
||||
from .debian import dpkg_owner
|
||||
|
||||
return dpkg_owner(path)
|
||||
|
||||
def list_manual_packages(self) -> List[str]:
|
||||
from .debian import list_manual_packages
|
||||
|
||||
return list_manual_packages()
|
||||
|
||||
def build_etc_index(self):
|
||||
from .debian import build_dpkg_etc_index
|
||||
|
||||
return build_dpkg_etc_index()
|
||||
|
||||
def specific_paths_for_hints(self, hints: Set[str]) -> List[str]:
|
||||
paths: List[str] = []
|
||||
for h in hints:
|
||||
paths.extend(
|
||||
[
|
||||
f"/etc/default/{h}",
|
||||
f"/etc/init.d/{h}",
|
||||
f"/etc/sysctl.d/{h}.conf",
|
||||
]
|
||||
)
|
||||
return paths
|
||||
|
||||
def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]:
|
||||
from .debian import read_pkg_md5sums
|
||||
|
||||
out: Dict[str, str] = {}
|
||||
conff = self._conffiles_by_pkg.get(pkg, {})
|
||||
md5sums = read_pkg_md5sums(pkg)
|
||||
|
||||
for path in etc_paths:
|
||||
if not path.startswith("/etc/"):
|
||||
continue
|
||||
if self.is_pkg_config_path(path):
|
||||
continue
|
||||
if path in conff:
|
||||
try:
|
||||
current = file_md5(path)
|
||||
except OSError:
|
||||
continue
|
||||
if current != conff[path]:
|
||||
out[path] = "modified_conffile"
|
||||
continue
|
||||
|
||||
rel = path.lstrip("/")
|
||||
baseline = md5sums.get(rel)
|
||||
if baseline:
|
||||
try:
|
||||
current = file_md5(path)
|
||||
except OSError:
|
||||
continue
|
||||
if current != baseline:
|
||||
out[path] = "modified_packaged_file"
|
||||
return out
|
||||
|
||||
|
||||
class RpmBackend(PackageBackend):
|
||||
name = "rpm"
|
||||
pkg_config_prefixes = (
|
||||
"/etc/dnf/",
|
||||
"/etc/yum/",
|
||||
"/etc/yum.repos.d/",
|
||||
"/etc/yum.conf",
|
||||
)
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._modified_cache: Dict[str, Set[str]] = {}
|
||||
self._config_cache: Dict[str, Set[str]] = {}
|
||||
|
||||
def owner_of_path(self, path: str) -> Optional[str]:
|
||||
from .rpm import rpm_owner
|
||||
|
||||
return rpm_owner(path)
|
||||
|
||||
def list_manual_packages(self) -> List[str]:
|
||||
from .rpm import list_manual_packages
|
||||
|
||||
return list_manual_packages()
|
||||
|
||||
def build_etc_index(self):
|
||||
from .rpm import build_rpm_etc_index
|
||||
|
||||
return build_rpm_etc_index()
|
||||
|
||||
def specific_paths_for_hints(self, hints: Set[str]) -> List[str]:
|
||||
paths: List[str] = []
|
||||
for h in hints:
|
||||
paths.extend(
|
||||
[
|
||||
f"/etc/sysconfig/{h}",
|
||||
f"/etc/sysconfig/{h}.conf",
|
||||
f"/etc/sysctl.d/{h}.conf",
|
||||
]
|
||||
)
|
||||
return paths
|
||||
|
||||
def _config_files(self, pkg: str) -> Set[str]:
|
||||
if pkg in self._config_cache:
|
||||
return self._config_cache[pkg]
|
||||
from .rpm import rpm_config_files
|
||||
|
||||
s = rpm_config_files(pkg)
|
||||
self._config_cache[pkg] = s
|
||||
return s
|
||||
|
||||
def _modified_files(self, pkg: str) -> Set[str]:
|
||||
if pkg in self._modified_cache:
|
||||
return self._modified_cache[pkg]
|
||||
from .rpm import rpm_modified_files
|
||||
|
||||
s = rpm_modified_files(pkg)
|
||||
self._modified_cache[pkg] = s
|
||||
return s
|
||||
|
||||
def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]:
|
||||
out: Dict[str, str] = {}
|
||||
modified = self._modified_files(pkg)
|
||||
if not modified:
|
||||
return out
|
||||
config = self._config_files(pkg)
|
||||
|
||||
for path in etc_paths:
|
||||
if not path.startswith("/etc/"):
|
||||
continue
|
||||
if self.is_pkg_config_path(path):
|
||||
continue
|
||||
if path not in modified:
|
||||
continue
|
||||
out[path] = (
|
||||
"modified_conffile" if path in config else "modified_packaged_file"
|
||||
)
|
||||
return out
|
||||
|
||||
|
||||
def get_backend(info: Optional[PlatformInfo] = None) -> PackageBackend:
|
||||
info = info or detect_platform()
|
||||
if info.pkg_backend == "dpkg":
|
||||
return DpkgBackend()
|
||||
if info.pkg_backend == "rpm":
|
||||
return RpmBackend()
|
||||
# Unknown: be conservative and use an rpm backend if rpm exists, otherwise dpkg.
|
||||
if shutil.which("rpm"):
|
||||
return RpmBackend()
|
||||
return DpkgBackend()
|
||||
266
enroll/rpm.py
266
enroll/rpm.py
|
|
@ -1,266 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess # nosec
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
|
||||
|
||||
def _run(
|
||||
cmd: list[str], *, allow_fail: bool = False, merge_err: bool = False
|
||||
) -> tuple[int, str]:
|
||||
"""Run a command and return (rc, stdout).
|
||||
|
||||
If merge_err is True, stderr is merged into stdout to preserve ordering.
|
||||
"""
|
||||
p = subprocess.run(
|
||||
cmd,
|
||||
check=False,
|
||||
text=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=(subprocess.STDOUT if merge_err else subprocess.PIPE),
|
||||
) # nosec
|
||||
out = p.stdout or ""
|
||||
if (not allow_fail) and p.returncode != 0:
|
||||
err = "" if merge_err else (p.stderr or "")
|
||||
raise RuntimeError(f"Command failed: {cmd}\n{err}{out}")
|
||||
return p.returncode, out
|
||||
|
||||
|
||||
def rpm_owner(path: str) -> Optional[str]:
|
||||
"""Return owning package name for a path, or None if unowned."""
|
||||
if not path:
|
||||
return None
|
||||
rc, out = _run(
|
||||
["rpm", "-qf", "--qf", "%{NAME}\n", path], allow_fail=True, merge_err=True
|
||||
)
|
||||
if rc != 0:
|
||||
return None
|
||||
for line in out.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
if "is not owned" in line:
|
||||
return None
|
||||
# With --qf we expect just the package name.
|
||||
if re.match(r"^[A-Za-z0-9_.+:-]+$", line):
|
||||
# Strip any accidental epoch/name-version-release output.
|
||||
return line.split(":", 1)[-1].strip() if line else None
|
||||
return None
|
||||
|
||||
|
||||
_ARCH_SUFFIXES = {
|
||||
"noarch",
|
||||
"x86_64",
|
||||
"i686",
|
||||
"aarch64",
|
||||
"armv7hl",
|
||||
"ppc64le",
|
||||
"s390x",
|
||||
"riscv64",
|
||||
}
|
||||
|
||||
|
||||
def _strip_arch(token: str) -> str:
|
||||
"""Strip a trailing .ARCH from a yum/dnf package token."""
|
||||
t = token.strip()
|
||||
if "." not in t:
|
||||
return t
|
||||
head, tail = t.rsplit(".", 1)
|
||||
if tail in _ARCH_SUFFIXES:
|
||||
return head
|
||||
return t
|
||||
|
||||
|
||||
def list_manual_packages() -> List[str]:
|
||||
"""Return packages considered "user-installed" on RPM-based systems.
|
||||
|
||||
Best-effort:
|
||||
1) dnf repoquery --userinstalled
|
||||
2) dnf history userinstalled
|
||||
3) yum history userinstalled
|
||||
|
||||
If none are available, returns an empty list.
|
||||
"""
|
||||
|
||||
def _dedupe(pkgs: List[str]) -> List[str]:
|
||||
return sorted({p for p in (pkgs or []) if p})
|
||||
|
||||
if shutil.which("dnf"):
|
||||
# Prefer a machine-friendly output.
|
||||
for cmd in (
|
||||
["dnf", "-q", "repoquery", "--userinstalled", "--qf", "%{name}\n"],
|
||||
["dnf", "-q", "repoquery", "--userinstalled"],
|
||||
):
|
||||
rc, out = _run(cmd, allow_fail=True, merge_err=True)
|
||||
if rc == 0 and out.strip():
|
||||
pkgs = []
|
||||
for line in out.splitlines():
|
||||
line = line.strip()
|
||||
if not line or line.startswith("Loaded plugins"):
|
||||
continue
|
||||
pkgs.append(_strip_arch(line.split()[0]))
|
||||
if pkgs:
|
||||
return _dedupe(pkgs)
|
||||
|
||||
# Fallback: human-oriented output.
|
||||
rc, out = _run(
|
||||
["dnf", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True
|
||||
)
|
||||
if rc == 0 and out.strip():
|
||||
pkgs = []
|
||||
for line in out.splitlines():
|
||||
line = line.strip()
|
||||
if not line or line.startswith("Installed") or line.startswith("Last"):
|
||||
continue
|
||||
# Often: "vim-enhanced.x86_64"
|
||||
tok = line.split()[0]
|
||||
pkgs.append(_strip_arch(tok))
|
||||
if pkgs:
|
||||
return _dedupe(pkgs)
|
||||
|
||||
if shutil.which("yum"):
|
||||
rc, out = _run(
|
||||
["yum", "-q", "history", "userinstalled"], allow_fail=True, merge_err=True
|
||||
)
|
||||
if rc == 0 and out.strip():
|
||||
pkgs = []
|
||||
for line in out.splitlines():
|
||||
line = line.strip()
|
||||
if (
|
||||
not line
|
||||
or line.startswith("Installed")
|
||||
or line.startswith("Loaded")
|
||||
):
|
||||
continue
|
||||
tok = line.split()[0]
|
||||
pkgs.append(_strip_arch(tok))
|
||||
if pkgs:
|
||||
return _dedupe(pkgs)
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def _walk_etc_files() -> List[str]:
|
||||
out: List[str] = []
|
||||
for dirpath, _, filenames in os.walk("/etc"):
|
||||
for fn in filenames:
|
||||
p = os.path.join(dirpath, fn)
|
||||
if os.path.islink(p) or not os.path.isfile(p):
|
||||
continue
|
||||
out.append(p)
|
||||
return out
|
||||
|
||||
|
||||
def build_rpm_etc_index() -> (
|
||||
Tuple[Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]]
|
||||
):
|
||||
"""Best-effort equivalent of build_dpkg_etc_index for RPM systems.
|
||||
|
||||
This builds indexes by walking the live /etc tree and querying RPM ownership
|
||||
for each file.
|
||||
|
||||
Returns:
|
||||
owned_etc_paths: set of /etc paths owned by rpm
|
||||
etc_owner_map: /etc/path -> pkg
|
||||
topdir_to_pkgs: "nginx" -> {"nginx", ...} based on /etc/<topdir>/...
|
||||
pkg_to_etc_paths: pkg -> list of owned /etc paths
|
||||
"""
|
||||
|
||||
owned: Set[str] = set()
|
||||
owner: Dict[str, str] = {}
|
||||
topdir_to_pkgs: Dict[str, Set[str]] = {}
|
||||
pkg_to_etc: Dict[str, List[str]] = {}
|
||||
|
||||
paths = _walk_etc_files()
|
||||
|
||||
# Query in chunks to avoid excessive process spawns.
|
||||
chunk_size = 250
|
||||
|
||||
not_owned_re = re.compile(
|
||||
r"^file\s+(?P<path>.+?)\s+is\s+not\s+owned\s+by\s+any\s+package", re.IGNORECASE
|
||||
)
|
||||
|
||||
for i in range(0, len(paths), chunk_size):
|
||||
chunk = paths[i : i + chunk_size]
|
||||
rc, out = _run(
|
||||
["rpm", "-qf", "--qf", "%{NAME}\n", *chunk],
|
||||
allow_fail=True,
|
||||
merge_err=True,
|
||||
)
|
||||
|
||||
lines = [ln.strip() for ln in out.splitlines() if ln.strip()]
|
||||
# Heuristic: rpm prints one output line per input path. If that isn't
|
||||
# true (warnings/errors), fall back to per-file queries for this chunk.
|
||||
if len(lines) != len(chunk):
|
||||
for p in chunk:
|
||||
pkg = rpm_owner(p)
|
||||
if not pkg:
|
||||
continue
|
||||
owned.add(p)
|
||||
owner.setdefault(p, pkg)
|
||||
pkg_to_etc.setdefault(pkg, []).append(p)
|
||||
parts = p.split("/", 3)
|
||||
if len(parts) >= 3 and parts[2]:
|
||||
topdir_to_pkgs.setdefault(parts[2], set()).add(pkg)
|
||||
continue
|
||||
|
||||
for pth, line in zip(chunk, lines):
|
||||
if not line:
|
||||
continue
|
||||
if not_owned_re.match(line) or "is not owned" in line:
|
||||
continue
|
||||
pkg = line.split()[0].strip()
|
||||
if not pkg:
|
||||
continue
|
||||
owned.add(pth)
|
||||
owner.setdefault(pth, pkg)
|
||||
pkg_to_etc.setdefault(pkg, []).append(pth)
|
||||
parts = pth.split("/", 3)
|
||||
if len(parts) >= 3 and parts[2]:
|
||||
topdir_to_pkgs.setdefault(parts[2], set()).add(pkg)
|
||||
|
||||
for k, v in list(pkg_to_etc.items()):
|
||||
pkg_to_etc[k] = sorted(set(v))
|
||||
|
||||
return owned, owner, topdir_to_pkgs, pkg_to_etc
|
||||
|
||||
|
||||
def rpm_config_files(pkg: str) -> Set[str]:
|
||||
"""Return config files for a package (rpm -qc)."""
|
||||
rc, out = _run(["rpm", "-qc", pkg], allow_fail=True, merge_err=True)
|
||||
if rc != 0:
|
||||
return set()
|
||||
files: Set[str] = set()
|
||||
for line in out.splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith("/"):
|
||||
files.add(line)
|
||||
return files
|
||||
|
||||
|
||||
def rpm_modified_files(pkg: str) -> Set[str]:
|
||||
"""Return files reported as modified by rpm verification (rpm -V).
|
||||
|
||||
rpm -V only prints lines for differences/missing files.
|
||||
"""
|
||||
rc, out = _run(["rpm", "-V", pkg], allow_fail=True, merge_err=True)
|
||||
# rc is non-zero when there are differences; we still want the output.
|
||||
files: Set[str] = set()
|
||||
for raw in out.splitlines():
|
||||
line = raw.strip()
|
||||
if not line:
|
||||
continue
|
||||
# Typical forms:
|
||||
# S.5....T. c /etc/foo.conf
|
||||
# missing /etc/bar
|
||||
m = re.search(r"\s(/\S+)$", line)
|
||||
if m:
|
||||
files.add(m.group(1))
|
||||
continue
|
||||
if line.startswith("missing"):
|
||||
parts = line.split()
|
||||
if parts and parts[-1].startswith("/"):
|
||||
files.add(parts[-1])
|
||||
return files
|
||||
|
|
@ -1,32 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
|
||||
def get_enroll_version() -> str:
|
||||
"""
|
||||
Best-effort version lookup that works when installed via:
|
||||
- poetry/pip/wheel
|
||||
- deb/rpm system packages
|
||||
Falls back to "0+unknown" when running from an unpacked source tree.
|
||||
"""
|
||||
try:
|
||||
from importlib.metadata import (
|
||||
packages_distributions,
|
||||
version,
|
||||
)
|
||||
except Exception:
|
||||
# Very old Python or unusual environment
|
||||
return "unknown"
|
||||
|
||||
# Map import package -> dist(s)
|
||||
dist_names = []
|
||||
try:
|
||||
dist_names = (packages_distributions() or {}).get("enroll", []) or []
|
||||
except Exception:
|
||||
dist_names = []
|
||||
|
||||
# Try mapped dists first, then a reasonable default
|
||||
for dist in [*dist_names, "enroll"]:
|
||||
try:
|
||||
return version(dist)
|
||||
except Exception:
|
||||
return "unknown"
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
|
|
@ -96,3 +97,58 @@ def test_parse_status_conffiles_handles_continuations(tmp_path: Path):
|
|||
assert m["nginx"]["/etc/nginx/nginx.conf"] == "abcdef"
|
||||
assert m["nginx"]["/etc/nginx/mime.types"] == "123456"
|
||||
assert "other" not in m
|
||||
|
||||
|
||||
def test_read_pkg_md5sums_and_file_md5(tmp_path: Path, monkeypatch):
|
||||
import enroll.debian as d
|
||||
|
||||
# Patch /var/lib/dpkg/info/<pkg>.md5sums lookup to a tmp file.
|
||||
md5_file = tmp_path / "pkg.md5sums"
|
||||
md5_file.write_text("0123456789abcdef etc/foo.conf\n", encoding="utf-8")
|
||||
|
||||
def fake_exists(path: str) -> bool:
|
||||
return path.endswith("/var/lib/dpkg/info/p1.md5sums")
|
||||
|
||||
real_open = open
|
||||
|
||||
def fake_open(path: str, *args, **kwargs):
|
||||
if path.endswith("/var/lib/dpkg/info/p1.md5sums"):
|
||||
return real_open(md5_file, *args, **kwargs)
|
||||
return real_open(path, *args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(d.os.path, "exists", fake_exists)
|
||||
monkeypatch.setattr("builtins.open", fake_open)
|
||||
|
||||
m = d.read_pkg_md5sums("p1")
|
||||
assert m == {"etc/foo.conf": "0123456789abcdef"}
|
||||
|
||||
content = b"hello world\n"
|
||||
p = tmp_path / "x"
|
||||
p.write_bytes(content)
|
||||
assert d.file_md5(str(p)) == hashlib.md5(content).hexdigest()
|
||||
|
||||
|
||||
def test_stat_triplet_fallbacks(tmp_path: Path, monkeypatch):
|
||||
import enroll.debian as d
|
||||
import sys
|
||||
|
||||
p = tmp_path / "f"
|
||||
p.write_text("x", encoding="utf-8")
|
||||
|
||||
class FakePwdMod:
|
||||
@staticmethod
|
||||
def getpwuid(_): # pragma: no cover
|
||||
raise KeyError
|
||||
|
||||
class FakeGrpMod:
|
||||
@staticmethod
|
||||
def getgrgid(_): # pragma: no cover
|
||||
raise KeyError
|
||||
|
||||
# stat_triplet imports pwd/grp inside the function, so patch sys.modules.
|
||||
monkeypatch.setitem(sys.modules, "pwd", FakePwdMod)
|
||||
monkeypatch.setitem(sys.modules, "grp", FakeGrpMod)
|
||||
owner, group, mode = d.stat_triplet(str(p))
|
||||
assert owner.isdigit()
|
||||
assert group.isdigit()
|
||||
assert mode.isdigit() and len(mode) == 4
|
||||
|
|
|
|||
|
|
@ -1,25 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from enroll.fsutil import file_md5, stat_triplet
|
||||
|
||||
|
||||
def test_file_md5_matches_hashlib(tmp_path: Path):
|
||||
p = tmp_path / "x"
|
||||
p.write_bytes(b"hello world")
|
||||
expected = hashlib.md5(b"hello world").hexdigest() # nosec
|
||||
assert file_md5(str(p)) == expected
|
||||
|
||||
|
||||
def test_stat_triplet_reports_mode(tmp_path: Path):
|
||||
p = tmp_path / "x"
|
||||
p.write_text("x", encoding="utf-8")
|
||||
os.chmod(p, 0o600)
|
||||
|
||||
owner, group, mode = stat_triplet(str(p))
|
||||
assert mode == "0600"
|
||||
assert owner # non-empty string
|
||||
assert group # non-empty string
|
||||
|
|
@ -2,7 +2,6 @@ import json
|
|||
from pathlib import Path
|
||||
|
||||
import enroll.harvest as h
|
||||
from enroll.platform import PlatformInfo
|
||||
from enroll.systemd import UnitInfo
|
||||
|
||||
|
||||
|
|
@ -11,64 +10,6 @@ class AllowAllPolicy:
|
|||
return None
|
||||
|
||||
|
||||
class FakeBackend:
|
||||
"""Minimal backend stub for harvest tests.
|
||||
|
||||
The real backends (dpkg/rpm) enumerate the live system (dpkg status, rpm
|
||||
databases, etc). These tests instead control all backend behaviour.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
owned_etc: set[str],
|
||||
etc_owner_map: dict[str, str],
|
||||
topdir_to_pkgs: dict[str, set[str]],
|
||||
pkg_to_etc_paths: dict[str, list[str]],
|
||||
manual_pkgs: list[str],
|
||||
owner_fn,
|
||||
modified_by_pkg: dict[str, dict[str, str]] | None = None,
|
||||
pkg_config_prefixes: tuple[str, ...] = ("/etc/apt/",),
|
||||
):
|
||||
self.name = name
|
||||
self.pkg_config_prefixes = pkg_config_prefixes
|
||||
self._owned_etc = owned_etc
|
||||
self._etc_owner_map = etc_owner_map
|
||||
self._topdir_to_pkgs = topdir_to_pkgs
|
||||
self._pkg_to_etc_paths = pkg_to_etc_paths
|
||||
self._manual = manual_pkgs
|
||||
self._owner_fn = owner_fn
|
||||
self._modified_by_pkg = modified_by_pkg or {}
|
||||
|
||||
def build_etc_index(self):
|
||||
return (
|
||||
self._owned_etc,
|
||||
self._etc_owner_map,
|
||||
self._topdir_to_pkgs,
|
||||
self._pkg_to_etc_paths,
|
||||
)
|
||||
|
||||
def owner_of_path(self, path: str):
|
||||
return self._owner_fn(path)
|
||||
|
||||
def list_manual_packages(self):
|
||||
return list(self._manual)
|
||||
|
||||
def specific_paths_for_hints(self, hints: set[str]):
|
||||
return []
|
||||
|
||||
def is_pkg_config_path(self, path: str) -> bool:
|
||||
for pfx in self.pkg_config_prefixes:
|
||||
if path == pfx or path.startswith(pfx):
|
||||
return True
|
||||
return False
|
||||
|
||||
def modified_paths(self, pkg: str, etc_paths: list[str]):
|
||||
# Test-controlled; ignore etc_paths.
|
||||
return dict(self._modified_by_pkg.get(pkg, {}))
|
||||
|
||||
|
||||
def test_harvest_dedup_manual_packages_and_builds_etc_custom(
|
||||
monkeypatch, tmp_path: Path
|
||||
):
|
||||
|
|
@ -81,7 +22,7 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
|
|||
real_exists = os.path.exists
|
||||
real_islink = os.path.islink
|
||||
|
||||
# Fake filesystem: two /etc files exist, only one is package-owned.
|
||||
# Fake filesystem: two /etc files exist, only one is dpkg-owned.
|
||||
# Also include some /usr/local files to populate usr_local_custom.
|
||||
files = {
|
||||
"/etc/openvpn/server.conf": b"server",
|
||||
|
|
@ -152,7 +93,6 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
|
|||
|
||||
# Avoid real system access
|
||||
monkeypatch.setattr(h, "list_enabled_services", lambda: ["openvpn.service"])
|
||||
monkeypatch.setattr(h, "list_enabled_timers", lambda: [])
|
||||
monkeypatch.setattr(
|
||||
h,
|
||||
"get_unit_info",
|
||||
|
|
@ -169,30 +109,29 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
|
|||
),
|
||||
)
|
||||
|
||||
# Package index: openvpn owns /etc/openvpn/server.conf; keyboard is unowned.
|
||||
# Debian package index: openvpn owns /etc/openvpn/server.conf; keyboard is unowned.
|
||||
def fake_build_index():
|
||||
owned_etc = {"/etc/openvpn/server.conf"}
|
||||
etc_owner_map = {"/etc/openvpn/server.conf": "openvpn"}
|
||||
topdir_to_pkgs = {"openvpn": {"openvpn"}}
|
||||
pkg_to_etc_paths = {"openvpn": ["/etc/openvpn/server.conf"], "curl": []}
|
||||
return owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths
|
||||
|
||||
backend = FakeBackend(
|
||||
name="dpkg",
|
||||
owned_etc=owned_etc,
|
||||
etc_owner_map=etc_owner_map,
|
||||
topdir_to_pkgs=topdir_to_pkgs,
|
||||
pkg_to_etc_paths=pkg_to_etc_paths,
|
||||
manual_pkgs=["openvpn", "curl"],
|
||||
owner_fn=lambda p: "openvpn" if "openvpn" in (p or "") else None,
|
||||
modified_by_pkg={
|
||||
"openvpn": {"/etc/openvpn/server.conf": "modified_conffile"},
|
||||
},
|
||||
monkeypatch.setattr(h, "build_dpkg_etc_index", fake_build_index)
|
||||
|
||||
# openvpn conffile hash mismatch => should be captured under service role
|
||||
monkeypatch.setattr(
|
||||
h,
|
||||
"parse_status_conffiles",
|
||||
lambda: {"openvpn": {"/etc/openvpn/server.conf": "old"}},
|
||||
)
|
||||
monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {})
|
||||
monkeypatch.setattr(h, "file_md5", lambda path: "new")
|
||||
|
||||
monkeypatch.setattr(
|
||||
h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {})
|
||||
h, "dpkg_owner", lambda p: "openvpn" if "openvpn" in p else None
|
||||
)
|
||||
monkeypatch.setattr(h, "get_backend", lambda info=None: backend)
|
||||
|
||||
monkeypatch.setattr(h, "list_manual_packages", lambda: ["openvpn", "curl"])
|
||||
monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
|
||||
|
||||
def fake_stat_triplet(p: str):
|
||||
|
|
@ -268,7 +207,6 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic(
|
|||
monkeypatch.setattr(
|
||||
h, "list_enabled_services", lambda: ["apparmor.service", "ntpsec.service"]
|
||||
)
|
||||
monkeypatch.setattr(h, "list_enabled_timers", lambda: [])
|
||||
|
||||
def fake_unit_info(unit: str) -> UnitInfo:
|
||||
if unit == "apparmor.service":
|
||||
|
|
@ -297,35 +235,31 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic(
|
|||
|
||||
monkeypatch.setattr(h, "get_unit_info", fake_unit_info)
|
||||
|
||||
# Dpkg /etc index: no owned /etc paths needed for this test.
|
||||
monkeypatch.setattr(
|
||||
h,
|
||||
"build_dpkg_etc_index",
|
||||
lambda: (set(), {}, {}, {}),
|
||||
)
|
||||
monkeypatch.setattr(h, "parse_status_conffiles", lambda: {})
|
||||
monkeypatch.setattr(h, "read_pkg_md5sums", lambda pkg: {})
|
||||
monkeypatch.setattr(h, "file_md5", lambda path: "x")
|
||||
monkeypatch.setattr(h, "list_manual_packages", lambda: [])
|
||||
monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
|
||||
|
||||
# Make apparmor *also* claim the ntpsec package (simulates overly-broad
|
||||
# package inference). The snippet routing should still prefer role 'ntpsec'.
|
||||
def fake_owner(p: str):
|
||||
def fake_dpkg_owner(p: str):
|
||||
if p == "/etc/cron.d/ntpsec":
|
||||
return "ntpsec"
|
||||
if "apparmor" in (p or ""):
|
||||
if "apparmor" in p:
|
||||
return "ntpsec" # intentionally misleading
|
||||
if "ntpsec" in (p or "") or "ntpd" in (p or ""):
|
||||
if "ntpsec" in p or "ntpd" in p:
|
||||
return "ntpsec"
|
||||
return None
|
||||
|
||||
backend = FakeBackend(
|
||||
name="dpkg",
|
||||
owned_etc=set(),
|
||||
etc_owner_map={},
|
||||
topdir_to_pkgs={},
|
||||
pkg_to_etc_paths={},
|
||||
manual_pkgs=[],
|
||||
owner_fn=fake_owner,
|
||||
modified_by_pkg={},
|
||||
)
|
||||
|
||||
monkeypatch.setattr(
|
||||
h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {})
|
||||
)
|
||||
monkeypatch.setattr(h, "get_backend", lambda info=None: backend)
|
||||
|
||||
monkeypatch.setattr(h, "dpkg_owner", fake_dpkg_owner)
|
||||
monkeypatch.setattr(h, "stat_triplet", lambda p: ("root", "root", "0644"))
|
||||
monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
|
||||
|
||||
def fake_copy(bundle_dir: str, role_name: str, abs_path: str, src_rel: str):
|
||||
dst = Path(bundle_dir) / "artifacts" / role_name / src_rel
|
||||
|
|
@ -334,7 +268,11 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic(
|
|||
|
||||
monkeypatch.setattr(h, "_copy_into_bundle", fake_copy)
|
||||
|
||||
state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
|
||||
class AllowAll:
|
||||
def deny_reason(self, path: str):
|
||||
return None
|
||||
|
||||
state_path = h.harvest(str(bundle), policy=AllowAll())
|
||||
st = json.loads(Path(state_path).read_text(encoding="utf-8"))
|
||||
|
||||
# Cron snippet should end up attached to the ntpsec role, not apparmor.
|
||||
|
|
|
|||
|
|
@ -322,96 +322,3 @@ def test_copy2_replace_overwrites_readonly_destination(tmp_path: Path):
|
|||
assert dst.read_text(encoding="utf-8") == "new"
|
||||
mode = stat.S_IMODE(dst.stat().st_mode)
|
||||
assert mode & stat.S_IWUSR # destination should remain mergeable
|
||||
|
||||
|
||||
def test_manifest_includes_dnf_config_role_when_present(tmp_path: Path):
|
||||
bundle = tmp_path / "bundle"
|
||||
out = tmp_path / "ansible"
|
||||
|
||||
# Create a dnf_config artifact.
|
||||
(bundle / "artifacts" / "dnf_config" / "etc" / "dnf").mkdir(
|
||||
parents=True, exist_ok=True
|
||||
)
|
||||
(bundle / "artifacts" / "dnf_config" / "etc" / "dnf" / "dnf.conf").write_text(
|
||||
"[main]\n", encoding="utf-8"
|
||||
)
|
||||
|
||||
state = {
|
||||
"host": {"hostname": "test", "os": "redhat", "pkg_backend": "rpm"},
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [],
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"services": [],
|
||||
"package_roles": [],
|
||||
"manual_packages": [],
|
||||
"manual_packages_skipped": [],
|
||||
"apt_config": {
|
||||
"role_name": "apt_config",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"dnf_config": {
|
||||
"role_name": "dnf_config",
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/dnf/dnf.conf",
|
||||
"src_rel": "etc/dnf/dnf.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "dnf_config",
|
||||
}
|
||||
],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"etc_custom": {
|
||||
"role_name": "etc_custom",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"usr_local_custom": {
|
||||
"role_name": "usr_local_custom",
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
"extra_paths": {
|
||||
"role_name": "extra_paths",
|
||||
"include_patterns": [],
|
||||
"exclude_patterns": [],
|
||||
"managed_files": [],
|
||||
"excluded": [],
|
||||
"notes": [],
|
||||
},
|
||||
}
|
||||
|
||||
bundle.mkdir(parents=True, exist_ok=True)
|
||||
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
|
||||
|
||||
manifest(str(bundle), str(out))
|
||||
|
||||
pb = (out / "playbook.yml").read_text(encoding="utf-8")
|
||||
assert "- dnf_config" in pb
|
||||
|
||||
tasks = (out / "roles" / "dnf_config" / "tasks" / "main.yml").read_text(
|
||||
encoding="utf-8"
|
||||
)
|
||||
# Ensure the role exists and contains some file deployment logic.
|
||||
assert "Deploy any other managed files" in tasks
|
||||
|
||||
|
||||
def test_render_install_packages_tasks_contains_dnf_branch():
|
||||
from enroll.manifest import _render_install_packages_tasks
|
||||
|
||||
txt = _render_install_packages_tasks("role", "role")
|
||||
assert "ansible.builtin.apt" in txt
|
||||
assert "ansible.builtin.dnf" in txt
|
||||
assert "ansible.builtin.package" in txt
|
||||
assert "pkg_mgr" in txt
|
||||
|
|
|
|||
|
|
@ -1,93 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import enroll.platform as platform
|
||||
|
||||
|
||||
def test_read_os_release_parses_kv_and_strips_quotes(tmp_path: Path):
|
||||
p = tmp_path / "os-release"
|
||||
p.write_text(
|
||||
"""
|
||||
# comment
|
||||
ID=fedora
|
||||
ID_LIKE=\"rhel centos\"
|
||||
NAME=\"Fedora Linux\"
|
||||
EMPTY=
|
||||
NOEQUALS
|
||||
""",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
osr = platform._read_os_release(str(p))
|
||||
assert osr["ID"] == "fedora"
|
||||
assert osr["ID_LIKE"] == "rhel centos"
|
||||
assert osr["NAME"] == "Fedora Linux"
|
||||
assert osr["EMPTY"] == ""
|
||||
assert "NOEQUALS" not in osr
|
||||
|
||||
|
||||
def test_detect_platform_prefers_os_release(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
platform,
|
||||
"_read_os_release",
|
||||
lambda path="/etc/os-release": {"ID": "fedora", "ID_LIKE": "rhel"},
|
||||
)
|
||||
# If os-release is decisive we shouldn't need which()
|
||||
monkeypatch.setattr(platform.shutil, "which", lambda exe: None)
|
||||
|
||||
info = platform.detect_platform()
|
||||
assert info.os_family == "redhat"
|
||||
assert info.pkg_backend == "rpm"
|
||||
|
||||
|
||||
def test_detect_platform_fallbacks_to_dpkg_when_unknown(monkeypatch):
|
||||
monkeypatch.setattr(platform, "_read_os_release", lambda path="/etc/os-release": {})
|
||||
monkeypatch.setattr(
|
||||
platform.shutil, "which", lambda exe: "/usr/bin/dpkg" if exe == "dpkg" else None
|
||||
)
|
||||
|
||||
info = platform.detect_platform()
|
||||
assert info.os_family == "debian"
|
||||
assert info.pkg_backend == "dpkg"
|
||||
|
||||
|
||||
def test_get_backend_unknown_prefers_rpm_if_present(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
platform.shutil, "which", lambda exe: "/usr/bin/rpm" if exe == "rpm" else None
|
||||
)
|
||||
|
||||
b = platform.get_backend(
|
||||
platform.PlatformInfo(os_family="unknown", pkg_backend="unknown", os_release={})
|
||||
)
|
||||
assert isinstance(b, platform.RpmBackend)
|
||||
|
||||
|
||||
def test_rpm_backend_modified_paths_labels_conffiles(monkeypatch):
|
||||
b = platform.RpmBackend()
|
||||
|
||||
# Pretend rpm -V says both files changed, but only one is a config file.
|
||||
monkeypatch.setattr(b, "_modified_files", lambda pkg: {"/etc/foo.conf", "/etc/bar"})
|
||||
monkeypatch.setattr(b, "_config_files", lambda pkg: {"/etc/foo.conf"})
|
||||
|
||||
out = b.modified_paths("mypkg", ["/etc/foo.conf", "/etc/bar", "/etc/dnf/dnf.conf"])
|
||||
assert out["/etc/foo.conf"] == "modified_conffile"
|
||||
assert out["/etc/bar"] == "modified_packaged_file"
|
||||
# Package-manager config paths are excluded.
|
||||
assert "/etc/dnf/dnf.conf" not in out
|
||||
|
||||
|
||||
def test_specific_paths_for_hints_differs_between_backends():
|
||||
# We can exercise this without instantiating DpkgBackend (which reads dpkg status)
|
||||
class Dummy(platform.PackageBackend):
|
||||
name = "dummy"
|
||||
pkg_config_prefixes = ("/etc/apt/",)
|
||||
|
||||
d = Dummy()
|
||||
assert d.is_pkg_config_path("/etc/apt/sources.list")
|
||||
assert not d.is_pkg_config_path("/etc/ssh/sshd_config")
|
||||
|
||||
r = platform.RpmBackend()
|
||||
paths = set(r.specific_paths_for_hints({"nginx"}))
|
||||
assert "/etc/sysconfig/nginx" in paths
|
||||
assert "/etc/sysconfig/nginx.conf" in paths
|
||||
|
|
@ -1,131 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import enroll.rpm as rpm
|
||||
|
||||
|
||||
def test_rpm_owner_returns_none_when_unowned(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
rpm,
|
||||
"_run",
|
||||
lambda cmd, allow_fail=False, merge_err=False: (
|
||||
1,
|
||||
"file /etc/x is not owned by any package\n",
|
||||
),
|
||||
)
|
||||
assert rpm.rpm_owner("/etc/x") is None
|
||||
|
||||
|
||||
def test_rpm_owner_parses_name(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (0, "bash\n")
|
||||
)
|
||||
assert rpm.rpm_owner("/bin/bash") == "bash"
|
||||
|
||||
|
||||
def test_strip_arch_strips_known_arches():
|
||||
assert rpm._strip_arch("vim-enhanced.x86_64") == "vim-enhanced"
|
||||
assert rpm._strip_arch("foo.noarch") == "foo"
|
||||
assert rpm._strip_arch("weird.token") == "weird.token"
|
||||
|
||||
|
||||
def test_list_manual_packages_prefers_dnf_repoquery(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
rpm.shutil, "which", lambda exe: "/usr/bin/dnf" if exe == "dnf" else None
|
||||
)
|
||||
|
||||
def fake_run(cmd, allow_fail=False, merge_err=False):
|
||||
# First repoquery form returns usable output.
|
||||
if cmd[:3] == ["dnf", "-q", "repoquery"]:
|
||||
return 0, "vim-enhanced.x86_64\nhtop\nvim-enhanced.x86_64\n"
|
||||
raise AssertionError(f"unexpected cmd: {cmd}")
|
||||
|
||||
monkeypatch.setattr(rpm, "_run", fake_run)
|
||||
|
||||
pkgs = rpm.list_manual_packages()
|
||||
assert pkgs == ["htop", "vim-enhanced"]
|
||||
|
||||
|
||||
def test_list_manual_packages_falls_back_to_history(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
rpm.shutil, "which", lambda exe: "/usr/bin/dnf" if exe == "dnf" else None
|
||||
)
|
||||
|
||||
def fake_run(cmd, allow_fail=False, merge_err=False):
|
||||
# repoquery fails
|
||||
if cmd[:3] == ["dnf", "-q", "repoquery"]:
|
||||
return 1, ""
|
||||
if cmd[:3] == ["dnf", "-q", "history"]:
|
||||
return (
|
||||
0,
|
||||
"Installed Packages\nvim-enhanced.x86_64\nLast metadata expiration check: 0:01:00 ago\n",
|
||||
)
|
||||
raise AssertionError(f"unexpected cmd: {cmd}")
|
||||
|
||||
monkeypatch.setattr(rpm, "_run", fake_run)
|
||||
|
||||
pkgs = rpm.list_manual_packages()
|
||||
assert pkgs == ["vim-enhanced"]
|
||||
|
||||
|
||||
def test_build_rpm_etc_index_uses_fallback_when_rpm_output_mismatches(monkeypatch):
|
||||
# Two files in /etc, one owned, one unowned.
|
||||
monkeypatch.setattr(
|
||||
rpm, "_walk_etc_files", lambda: ["/etc/owned.conf", "/etc/unowned.conf"]
|
||||
)
|
||||
|
||||
# Simulate chunk query producing unexpected extra line (mismatch) -> triggers per-file fallback.
|
||||
monkeypatch.setattr(
|
||||
rpm,
|
||||
"_run",
|
||||
lambda cmd, allow_fail=False, merge_err=False: (0, "ownedpkg\nEXTRA\nTHIRD\n"),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
rpm, "rpm_owner", lambda p: "ownedpkg" if p == "/etc/owned.conf" else None
|
||||
)
|
||||
|
||||
owned, owner_map, topdir_to_pkgs, pkg_to_etc = rpm.build_rpm_etc_index()
|
||||
|
||||
assert owned == {"/etc/owned.conf"}
|
||||
assert owner_map["/etc/owned.conf"] == "ownedpkg"
|
||||
assert "owned.conf" in topdir_to_pkgs
|
||||
assert pkg_to_etc["ownedpkg"] == ["/etc/owned.conf"]
|
||||
|
||||
|
||||
def test_build_rpm_etc_index_parses_chunk_output(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
rpm, "_walk_etc_files", lambda: ["/etc/ssh/sshd_config", "/etc/notowned"]
|
||||
)
|
||||
|
||||
def fake_run(cmd, allow_fail=False, merge_err=False):
|
||||
# One output line per input path.
|
||||
return 0, "openssh-server\nfile /etc/notowned is not owned by any package\n"
|
||||
|
||||
monkeypatch.setattr(rpm, "_run", fake_run)
|
||||
|
||||
owned, owner_map, topdir_to_pkgs, pkg_to_etc = rpm.build_rpm_etc_index()
|
||||
|
||||
assert "/etc/ssh/sshd_config" in owned
|
||||
assert "/etc/notowned" not in owned
|
||||
assert owner_map["/etc/ssh/sshd_config"] == "openssh-server"
|
||||
assert "ssh" in topdir_to_pkgs
|
||||
assert "openssh-server" in topdir_to_pkgs["ssh"]
|
||||
assert pkg_to_etc["openssh-server"] == ["/etc/ssh/sshd_config"]
|
||||
|
||||
|
||||
def test_rpm_config_files_and_modified_files_parsing(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
rpm,
|
||||
"_run",
|
||||
lambda cmd, allow_fail=False, merge_err=False: (
|
||||
0,
|
||||
"/etc/foo.conf\n/usr/bin/tool\n",
|
||||
),
|
||||
)
|
||||
assert rpm.rpm_config_files("mypkg") == {"/etc/foo.conf", "/usr/bin/tool"}
|
||||
|
||||
# rpm -V returns only changed/missing files
|
||||
out = "S.5....T. c /etc/foo.conf\nmissing /etc/bar\n"
|
||||
monkeypatch.setattr(
|
||||
rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (1, out)
|
||||
)
|
||||
assert rpm.rpm_modified_files("mypkg") == {"/etc/foo.conf", "/etc/bar"}
|
||||
Loading…
Add table
Add a link
Reference in a new issue