Capture more singletons in /etc and avoid apt duplication
Some checks failed
Lint / test (push) Waiting to run
Trivy / test (push) Waiting to run
CI / test (push) Has been cancelled

This commit is contained in:
Miguel Jacq 2025-12-27 19:02:22 +11:00
parent 4d2250f974
commit 054a6192d1
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
6 changed files with 481 additions and 22 deletions

View file

@ -1,3 +1,8 @@
# 0.1.4
* Attempt to capture more configuration from /etc that might not be attributable to a specific package. This includes common singleton files (such as fstab, sysctl, cron, network and firewall configuration) and systemd timers
* Avoid duplicate apt data in package-specific roles.
# 0.1.3
* Allow the user to add extra paths to harvest, or paths to ignore, using `--exclude-path` and `--include-path`

7
debian/changelog vendored
View file

@ -1,3 +1,10 @@
enroll (0.1.4) unstable; urgency=medium
* Attempt to capture more configuration from /etc that might not be attributable to a specific package. This includes common singleton files (such as fstab, sysctl, cron, network and firewall configuration) and systemd timers
* Avoid duplicate apt data in package-specific roles.
-- Miguel Jacq <mig@mig5.net> Sat, 27 Dec 2025 19:00:00 +1100
enroll (0.1.3) unstable; urgency=medium
* Allow the user to add extra paths to harvest, or paths to ignore, using `--exclude-path` and `--include-path`

View file

@ -8,7 +8,13 @@ import shutil
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Set
from .systemd import list_enabled_services, get_unit_info, UnitQueryError
from .systemd import (
list_enabled_services,
list_enabled_timers,
get_unit_info,
get_timer_info,
UnitQueryError,
)
from .debian import (
build_dpkg_etc_index,
dpkg_owner,
@ -98,24 +104,24 @@ class ExtraPathsSnapshot:
# File extensions accepted for unowned files (NOTE(review): the consumer of
# this set is outside this view -- confirm how it is applied).
# Each member is listed exactly once, in alphabetical order.
ALLOWED_UNOWNED_EXTS = {
    ".cfg",
    ".cnf",
    ".conf",
    ".ini",
    ".json",
    ".link",
    ".mount",
    ".netdev",
    ".network",
    ".path",
    ".rules",
    ".service",
    ".socket",
    ".target",
    ".timer",
    ".toml",
    ".yaml",
    ".yml",
    "",  # allow extensionless (common in /etc/default and /etc/init.d)
}
@ -123,23 +129,24 @@ MAX_UNOWNED_FILES_PER_ROLE = 400
# Directories that are shared across many packages; never attribute unowned
# files in these trees to a single package.
# Names are top-level directory names under /etc, each listed exactly once in
# alphabetical order.
SHARED_ETC_TOPDIRS = {
    "apparmor.d",
    "apt",
    "cron.d",
    "cron.daily",
    "cron.hourly",
    "cron.monthly",
    "cron.weekly",
    "default",
    "init.d",
    "logrotate.d",
    "modprobe.d",
    "network",
    "pam.d",
    "ssh",
    "ssl",
    "sudoers.d",
    "sysctl.d",
    "systemd",
}
@ -256,6 +263,181 @@ def _topdirs_for_package(pkg: str, pkg_to_etc_paths: Dict[str, List[str]]) -> Se
return topdirs
# -------------------------
# System capture helpers
# -------------------------

# Glob patterns for APT source definitions. Files matching these are captured
# with the reason "system_apt_sources" and are also scanned for signed-by
# keyring references (see _iter_system_capture_paths).
_APT_SOURCE_GLOBS = [
    "/etc/apt/sources.list",
    "/etc/apt/sources.list.d/*.list",
    "/etc/apt/sources.list.d/*.sources",
]

# Glob patterns for the remaining APT configuration (apt.conf, preferences,
# auth config and keyrings under /etc/apt). Matches are captured with the
# reason "system_apt_config".
_APT_MISC_GLOBS = [
    "/etc/apt/apt.conf",
    "/etc/apt/apt.conf.d/*",
    "/etc/apt/preferences",
    "/etc/apt/preferences.d/*",
    "/etc/apt/auth.conf",
    "/etc/apt/auth.conf.d/*",
    "/etc/apt/trusted.gpg",
    "/etc/apt/trusted.gpg.d/*",
    "/etc/apt/keyrings/*",
]
# (glob pattern, reason) pairs for system-wide configuration that is captured
# explicitly. Each pattern is expanded by _iter_matching_files, and the reason
# string is attached to every file it yields. When a path is produced by more
# than one pattern, the reason of the first occurrence is kept.
_SYSTEM_CAPTURE_GLOBS: List[tuple[str, str]] = [
    # mounts
    ("/etc/fstab", "system_mounts"),
    ("/etc/crypttab", "system_mounts"),
    # logrotate
    ("/etc/logrotate.conf", "system_logrotate"),
    ("/etc/logrotate.d/*", "system_logrotate"),
    # sysctl / modules
    ("/etc/sysctl.conf", "system_sysctl"),
    ("/etc/sysctl.d/*", "system_sysctl"),
    ("/etc/modprobe.d/*", "system_modprobe"),
    ("/etc/modules", "system_modprobe"),
    ("/etc/modules-load.d/*", "system_modprobe"),
    # cron
    ("/etc/crontab", "system_cron"),
    ("/etc/cron.d/*", "system_cron"),
    ("/etc/anacrontab", "system_cron"),
    ("/etc/anacron/*", "system_cron"),
    ("/var/spool/cron/crontabs/*", "system_cron"),
    ("/var/spool/crontabs/*", "system_cron"),
    # network
    ("/etc/netplan/*", "system_network"),
    ("/etc/systemd/network/*", "system_network"),
    ("/etc/network/interfaces", "system_network"),
    ("/etc/network/interfaces.d/*", "system_network"),
    ("/etc/resolvconf.conf", "system_network"),
    ("/etc/resolvconf/resolv.conf.d/*", "system_network"),
    # firewall
    ("/etc/nftables.conf", "system_firewall"),
    ("/etc/nftables.d/*", "system_firewall"),
    ("/etc/iptables/rules.v4", "system_firewall"),
    ("/etc/iptables/rules.v6", "system_firewall"),
    ("/etc/ufw/*", "system_firewall"),
    ("/etc/default/ufw", "system_firewall"),
    # other
    ("/etc/rc.local", "system_rc"),
]
def _iter_matching_files(spec: str, *, cap: int = 2000) -> List[str]:
    """Expand a glob spec into regular files, descending into matched directories.

    Symlinks are skipped, both as direct glob matches and as files found while
    walking a matched directory. Results are produced in sorted order so that
    repeated runs on the same filesystem yield the same list, and so that the
    subset kept when ``cap`` is reached is reproducible.

    Args:
        spec: Glob pattern to expand.
        cap: Maximum number of paths to return.

    Returns:
        Up to ``cap`` paths of regular files.
    """
    out: List[str] = []
    # glob.glob() returns matches in arbitrary order; sort for determinism.
    for p in sorted(glob.glob(spec)):
        if len(out) >= cap:
            break
        if os.path.islink(p):
            continue
        if os.path.isfile(p):
            out.append(p)
            continue
        if not os.path.isdir(p):
            continue
        for dirpath, dirnames, filenames in os.walk(p):
            # Sorting in place makes os.walk() descend in a stable order.
            dirnames.sort()
            for fn in sorted(filenames):
                if len(out) >= cap:
                    break
                fp = os.path.join(dirpath, fn)
                if os.path.islink(fp) or not os.path.isfile(fp):
                    continue
                out.append(fp)
            if len(out) >= cap:
                break
    return out
def _parse_apt_signed_by(source_files: List[str]) -> Set[str]:
    """Collect absolute keyring paths named by signed-by settings in APT sources.

    Two spellings are recognised:

    * one-line style options, e.g. ``deb [signed-by=/usr/share/keyrings/foo.gpg] ...``
    * deb822 style fields, e.g. ``Signed-By: /usr/share/keyrings/foo.gpg``

    Blank lines and lines starting with ``#`` are ignored. Values may list
    several entries separated by whitespace or commas; only entries starting
    with ``/`` are kept. Files that cannot be read are skipped.
    """
    option_re = re.compile(r"signed-by\s*=\s*([^\]\s]+)", re.IGNORECASE)
    field_re = re.compile(r"^\s*Signed-By\s*:\s*(.+)$", re.IGNORECASE)
    keyrings: Set[str] = set()

    def _add_absolute(value: str) -> None:
        # Split a (possibly multi-valued) setting and keep absolute paths only.
        keyrings.update(
            tok for tok in re.split(r"[\s,]+", value) if tok.startswith("/")
        )

    for source in source_files:
        try:
            with open(source, "r", encoding="utf-8", errors="replace") as handle:
                for raw in handle:
                    text = raw.strip()
                    if text == "" or text.startswith("#"):
                        continue

                    field = field_re.match(text)
                    if field is not None:
                        value = field.group(1).strip()
                        # Values beginning with "|" are never treated as paths.
                        if not value.startswith("|"):
                            _add_absolute(value)
                        continue

                    # Prefer the bracketed option list (common in .list files);
                    # otherwise search the whole line.
                    if "[" in text and "]" in text:
                        haystack = text.split("[", 1)[1].split("]", 1)[0]
                    else:
                        haystack = text
                    for hit in option_re.finditer(haystack):
                        _add_absolute(hit.group(1).strip().strip("\"'"))
        except OSError:
            continue
    return keyrings
def _iter_system_capture_paths() -> List[tuple[str, str]]:
    """Build the list of (path, reason) pairs for essential system config/state.

    Paths are gathered in this order: APT source files, other APT
    configuration, keyrings referenced by the APT sources via signed-by, then
    everything matched by ``_SYSTEM_CAPTURE_GLOBS``. A path that turns up more
    than once is reported only once, with the reason from its first occurrence.
    """
    collected: List[tuple[str, str]] = []

    # APT: source definitions.
    apt_source_files = sorted(
        {found for pattern in _APT_SOURCE_GLOBS for found in _iter_matching_files(pattern)}
    )
    collected.extend((found, "system_apt_sources") for found in apt_source_files)

    # APT: miscellaneous config files/dirs.
    for pattern in _APT_MISC_GLOBS:
        collected.extend(
            (found, "system_apt_config") for found in _iter_matching_files(pattern)
        )

    # APT: keyrings referenced from the sources (may live outside /etc).
    for keyring in sorted(_parse_apt_signed_by(apt_source_files)):
        if not os.path.islink(keyring) and os.path.isfile(keyring):
            collected.append((keyring, "system_apt_keyring"))

    # Remaining system config/state globs.
    for pattern, why in _SYSTEM_CAPTURE_GLOBS:
        collected.extend((found, why) for found in _iter_matching_files(pattern))

    # De-duplicate; dicts keep insertion order and setdefault keeps the first reason.
    first_reason: Dict[str, str] = {}
    for path, why in collected:
        first_reason.setdefault(path, why)
    return list(first_reason.items())
def harvest(
bundle_dir: str,
policy: Optional[IgnorePolicy] = None,
@ -467,6 +649,107 @@ def harvest(
)
)
# -------------------------
# Enabled systemd timers
#
# Timers are typically related to a service/package, so we try to attribute
# timer unit overrides to their associated role rather than creating a
# standalone timer role. If we can't attribute a timer, it will fall back
# to etc_custom (if it's a custom /etc unit).
# -------------------------
timer_extra_by_pkg: Dict[str, List[str]] = {}
try:
enabled_timers = list_enabled_timers()
except Exception:
enabled_timers = []
service_snap_by_unit: Dict[str, ServiceSnapshot] = {
s.unit: s for s in service_snaps
}
for t in enabled_timers:
try:
ti = get_timer_info(t)
except Exception: # nosec
continue
timer_paths: List[str] = []
for pth in [ti.fragment_path, *ti.dropin_paths, *ti.env_files]:
if not pth:
continue
if not pth.startswith("/etc/"):
# Prefer capturing only custom/overridden units.
continue
if os.path.islink(pth) or not os.path.isfile(pth):
continue
timer_paths.append(pth)
if not timer_paths:
continue
# Primary attribution: timer -> trigger service role
snap = None
if ti.trigger_unit:
snap = service_snap_by_unit.get(ti.trigger_unit)
if snap is not None:
for path in timer_paths:
if path_filter.is_excluded(path):
snap.excluded.append(
ExcludedFile(path=path, reason="user_excluded")
)
continue
deny = policy.deny_reason(path)
if deny:
snap.excluded.append(ExcludedFile(path=path, reason=deny))
continue
try:
owner, group, mode = stat_triplet(path)
except OSError:
snap.excluded.append(ExcludedFile(path=path, reason="unreadable"))
continue
src_rel = path.lstrip("/")
try:
_copy_into_bundle(bundle_dir, snap.role_name, path, src_rel)
except OSError:
snap.excluded.append(ExcludedFile(path=path, reason="unreadable"))
continue
snap.managed_files.append(
ManagedFile(
path=path,
src_rel=src_rel,
owner=owner,
group=group,
mode=mode,
reason="related_timer",
)
)
continue
# Secondary attribution: associate timer overrides with a package role
# (useful when a timer triggers a service that isn't enabled).
pkgs: Set[str] = set()
if ti.fragment_path:
p = dpkg_owner(ti.fragment_path)
if p:
pkgs.add(p)
if ti.trigger_unit and ti.trigger_unit.endswith(".service"):
try:
ui = get_unit_info(ti.trigger_unit)
if ui.fragment_path:
p = dpkg_owner(ui.fragment_path)
if p:
pkgs.add(p)
for exe in ui.exec_paths:
p = dpkg_owner(exe)
if p:
pkgs.add(p)
except Exception: # nosec
pass
for pkg in pkgs:
timer_extra_by_pkg.setdefault(pkg, []).extend(timer_paths)
# -------------------------
# Manually installed package roles
# -------------------------
@ -490,6 +773,9 @@ def harvest(
managed: List[ManagedFile] = []
candidates: Dict[str, str] = {}
for tpath in timer_extra_by_pkg.get(pkg, []):
candidates.setdefault(tpath, "related_timer")
conff = conffiles_by_pkg.get(pkg, {})
md5sums = read_pkg_md5sums(pkg)
@ -677,7 +963,46 @@ def harvest(
for mf in users_managed:
already.add(mf.path)
# Walk /etc for unowned config-ish files
# Capture essential system config/state (even if package-owned).
for path, reason in _iter_system_capture_paths():
if path in already:
continue
if path_filter.is_excluded(path):
etc_excluded.append(ExcludedFile(path=path, reason="user_excluded"))
continue
deny = policy.deny_reason(path)
if deny:
etc_excluded.append(ExcludedFile(path=path, reason=deny))
continue
try:
owner, group, mode = stat_triplet(path)
except OSError:
etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
continue
src_rel = path.lstrip("/")
try:
_copy_into_bundle(bundle_dir, etc_role_name, path, src_rel)
except OSError:
etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
continue
etc_managed.append(
ManagedFile(
path=path,
src_rel=src_rel,
owner=owner,
group=group,
mode=mode,
reason=reason,
)
)
already.add(path)
# Walk /etc for remaining unowned config-ish files
scanned = 0
for dirpath, _, filenames in os.walk("/etc"):
for fn in filenames:

View file

@ -30,6 +30,21 @@ DEFAULT_DENY_GLOBS = [
"/usr/local/etc/letsencrypt/*",
]
# Allow a small set of binary config artifacts that are commonly required to
# reproduce system configuration (notably APT keyrings). These are still subject
# to size and readability limits, but are exempt from the "binary_like" denial.
# Patterns are matched against the full path with fnmatch; this list is the
# default for IgnorePolicy.allow_binary_globs when none is supplied.
DEFAULT_ALLOW_BINARY_GLOBS = [
    "/etc/apt/trusted.gpg",
    "/etc/apt/trusted.gpg.d/*.gpg",
    "/etc/apt/keyrings/*.gpg",
    "/etc/apt/keyrings/*.pgp",
    "/etc/apt/keyrings/*.asc",
    "/usr/share/keyrings/*.gpg",
    "/usr/share/keyrings/*.pgp",
    "/usr/share/keyrings/*.asc",
]
SENSITIVE_CONTENT_PATTERNS = [
re.compile(rb"-----BEGIN (RSA |EC |OPENSSH |)PRIVATE KEY-----"),
re.compile(rb"(?i)\bpassword\s*="),
@ -44,6 +59,7 @@ BLOCK_END = b"*/"
@dataclass
class IgnorePolicy:
deny_globs: Optional[list[str]] = None
allow_binary_globs: Optional[list[str]] = None
max_file_bytes: int = 256_000
sample_bytes: int = 64_000
# If True, be much less conservative about collecting potentially
@ -54,6 +70,8 @@ class IgnorePolicy:
def __post_init__(self) -> None:
if self.deny_globs is None:
self.deny_globs = list(DEFAULT_DENY_GLOBS)
if self.allow_binary_globs is None:
self.allow_binary_globs = list(DEFAULT_ALLOW_BINARY_GLOBS)
def iter_effective_lines(self, content: bytes):
in_block = False
@ -105,6 +123,10 @@ class IgnorePolicy:
return "unreadable"
if b"\x00" in data:
for g in self.allow_binary_globs or []:
if fnmatch.fnmatch(path, g):
# Binary is acceptable for explicitly-allowed paths.
return None
return "binary_like"
if not self.dangerous:

View file

@ -33,6 +33,19 @@ def _run(cmd: list[str]) -> str:
return p.stdout
@dataclass
class TimerInfo:
    """Properties of a systemd timer unit, as returned by ``get_timer_info``."""

    # Unit name the information was queried for.
    name: str
    # Value of the FragmentPath property, or None when empty/absent.
    fragment_path: Optional[str]
    # Whitespace-separated entries of the DropInPaths property.
    dropin_paths: List[str]
    # Entries of the EnvironmentFiles property with any leading "-" removed.
    env_files: List[str]
    # Value of the Unit property (the unit this timer triggers), or None.
    trigger_unit: Optional[str]
    # Values of ActiveState / SubState / UnitFileState / ConditionResult,
    # each None when empty/absent.
    active_state: Optional[str]
    sub_state: Optional[str]
    unit_file_state: Optional[str]
    condition_result: Optional[str]
def list_enabled_services() -> List[str]:
out = _run(
[
@ -58,6 +71,31 @@ def list_enabled_services() -> List[str]:
return sorted(set(units))
def list_enabled_timers() -> List[str]:
    """Return the sorted, de-duplicated names of enabled systemd timer units.

    The names come from the first column of
    ``systemctl list-unit-files --type=timer --state=enabled --no-legend``.
    Template units such as ``foo@.timer`` are left out.
    """
    listing = _run(
        [
            "systemctl",
            "list-unit-files",
            "--type=timer",
            "--state=enabled",
            "--no-legend",
        ]
    )
    found: Set[str] = set()
    for row in listing.splitlines():
        columns = row.split()
        if not columns:
            continue
        name = columns[0].strip()
        # Keep concrete timers only; "foo@.timer" is a template unit.
        if name.endswith(".timer") and not name.endswith("@.timer"):
            found.add(name)
    return sorted(found)
def get_unit_info(unit: str) -> UnitInfo:
p = subprocess.run(
[
@ -117,3 +155,62 @@ def get_unit_info(unit: str) -> UnitInfo:
unit_file_state=kv.get("UnitFileState") or None,
condition_result=kv.get("ConditionResult") or None,
)
def get_timer_info(unit: str) -> TimerInfo:
    """Query systemd for the properties of a timer unit.

    Runs ``systemctl show`` for ``unit`` restricted to the properties needed
    for a ``TimerInfo`` and parses its ``KEY=VALUE`` output.

    Args:
        unit: Name of the timer unit to inspect.

    Returns:
        A ``TimerInfo``. Scalar properties that are empty or missing are
        reported as ``None``; path lists are empty when nothing is reported.

    Raises:
        RuntimeError: If ``systemctl show`` exits with a non-zero status.
    """
    p = subprocess.run(
        [
            "systemctl",
            "show",
            unit,
            "-p",
            "FragmentPath",
            "-p",
            "DropInPaths",
            "-p",
            "EnvironmentFiles",
            "-p",
            "Unit",
            "-p",
            "ActiveState",
            "-p",
            "SubState",
            "-p",
            "UnitFileState",
            "-p",
            "ConditionResult",
        ],
        text=True,
        capture_output=True,
    )  # nosec
    if p.returncode != 0:
        raise RuntimeError(f"systemctl show failed for {unit}: {p.stderr}")

    kv: dict[str, str] = {}
    # EnvironmentFiles may be printed once per configured file, so collect
    # every occurrence instead of letting later lines overwrite earlier ones.
    env_values: List[str] = []
    for line in (p.stdout or "").splitlines():
        key, sep, value = line.partition("=")
        if not sep:
            continue
        value = value.strip()
        if key == "EnvironmentFiles":
            env_values.append(value)
        kv[key] = value

    fragment = kv.get("FragmentPath") or None
    dropins = [pp for pp in (kv.get("DropInPaths", "") or "").split() if pp]

    env_files: List[str] = []
    for value in env_values:
        for token in value.split():
            # A leading "-" marks an optional file. Environment files are
            # absolute paths, so anything else on the line (for example an
            # "(ignore_errors=yes)" annotation) is not a file and is dropped.
            token = token.lstrip("-")
            if token.startswith("/"):
                env_files.append(token)

    trigger = kv.get("Unit") or None

    return TimerInfo(
        name=unit,
        fragment_path=fragment,
        dropin_paths=dropins,
        env_files=env_files,
        trigger_unit=trigger,
        active_state=kv.get("ActiveState") or None,
        sub_state=kv.get("SubState") or None,
        unit_file_state=kv.get("UnitFileState") or None,
        condition_result=kv.get("ConditionResult") or None,
    )

View file

@ -1,4 +1,4 @@
%global upstream_version 0.1.3
%global upstream_version 0.1.4
Name: enroll
Version: %{upstream_version}
@ -44,4 +44,7 @@ Enroll a server's running state retrospectively into Ansible.
%changelog
* Sat Dec 27 2025 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
- Attempt to capture more configuration from /etc that might not be attributable to a specific package. This includes common singleton files (such as fstab, sysctl, cron, network and firewall configuration) and systemd timers
- Avoid duplicate apt data in package-specific roles.
* Sat Dec 27 2025 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
- Initial RPM packaging for Fedora 42