Manage certain symlinks e.g for apache2/nginx sites-enabled and so on
Some checks failed
Lint / test (push) Waiting to run
Trivy / test (push) Waiting to run
CI / test (push) Has been cancelled

This commit is contained in:
Miguel Jacq 2026-01-05 16:29:21 +11:00
parent bcf3dd7422
commit d3fdfc9ef7
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
4 changed files with 252 additions and 11 deletions

View file

@ -11,8 +11,9 @@
- Captures config that has **changed from packaged defaults** where possible (e.g. dpkg conffile hashes + package md5sums when available).
- Also captures **service-relevant custom/unowned files** under `/etc/<service>/...` (e.g. drop-in config includes).
- Defensively excludes likely secrets (path denylist + content sniff + size caps).
- Captures non-system users and their SSH public keys.
- Captures non-system users, their SSH public keys, and any `.bashrc`, `.bash_aliases` or `.profile` files that deviate from the skel defaults.
- Captures miscellaneous `/etc` files it can't attribute to a package and installs them in an `etc_custom` role.
- Captures symlinks in common applications that rely on them, e.g. the apache2/nginx `sites-enabled` directories.
- Similarly captures custom files under `/usr/local/bin` (non-binary files only) and `/usr/local/etc`, as it does for miscellaneous `/etc` files.
- Avoids trying to start systemd services that were detected as inactive during harvest.

View file

@ -35,6 +35,19 @@ class ManagedFile:
reason: str
@dataclass
class ManagedLink:
    """Record of a symlink that should be recreated on the target host.

    Some configuration layouts (for example ``sites-enabled``) express
    enablement purely through symlinks, so the link itself is state worth
    keeping even when the file it points at is captured elsewhere.
    """

    # Path of the symlink itself.
    path: str
    # Where the link points (stored as text, not resolved by this class).
    target: str
    # Short tag describing why the link was captured.
    reason: str
@dataclass
class ManagedDir:
path: str
@ -61,6 +74,7 @@ class ServiceSnapshot:
condition_result: Optional[str]
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
managed_links: List[ManagedLink] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@ -71,6 +85,7 @@ class PackageSnapshot:
role_name: str
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
managed_links: List[ManagedLink] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@ -124,12 +139,13 @@ class UsrLocalCustomSnapshot:
@dataclass
class ExtraPathsSnapshot:
    """Snapshot data for the extra-paths role.

    Only ``role_name`` is required. Every list field defaults to a fresh
    empty list (via ``default_factory``), so instances never share mutable
    state and callers may omit any of them.
    """

    role_name: str
    include_patterns: List[str] = field(default_factory=list)
    exclude_patterns: List[str] = field(default_factory=list)
    managed_dirs: List[ManagedDir] = field(default_factory=list)
    managed_files: List[ManagedFile] = field(default_factory=list)
    excluded: List[ExcludedFile] = field(default_factory=list)
    notes: List[str] = field(default_factory=list)
    # Declared after the older fields so that positional construction keeps
    # the same meaning for every pre-existing argument.
    managed_links: List[ManagedLink] = field(default_factory=list)
ALLOWED_UNOWNED_EXTS = {
@ -211,6 +227,7 @@ def _merge_parent_dirs(
managed_files: List[ManagedFile],
*,
policy: IgnorePolicy,
extra_paths: Optional[List[str]] = None,
) -> List[ManagedDir]:
"""Ensure parent directories for managed_files are present in managed_dirs.
@ -226,8 +243,18 @@ def _merge_parent_dirs(
d.path: d for d in (existing_dirs or []) if d.path
}
for mf in managed_files or []:
p = str(mf.path or "").rstrip("/")
def _iter_paths() -> List[str]:
paths: List[str] = []
for mf in managed_files or []:
if mf and mf.path:
paths.append(str(mf.path))
for p in extra_paths or []:
if p:
paths.append(str(p))
return paths
for p0 in _iter_paths():
p = str(p0 or "").rstrip("/")
if not p:
continue
dpath = os.path.dirname(p)
@ -414,6 +441,72 @@ def _capture_file(
return True
def _capture_link(
    *,
    role_name: str,
    abs_path: str,
    reason: str,
    policy: IgnorePolicy,
    path_filter: PathFilter,
    managed_out: List[ManagedLink],
    excluded_out: List[ExcludedFile],
    seen_role: Optional[Set[str]] = None,
    seen_global: Optional[Set[str]] = None,
) -> bool:
    """Record the symlink at ``abs_path`` in the manifest if policy allows.

    NOTE: the link is never copied into artifacts. Only its target is
    stored, and it is materialised later via ansible.builtin.file
    state=link.

    Args:
        role_name: Role the link belongs to (not read by this function).
        abs_path: Path of the candidate symlink.
        reason: Tag stored on the resulting ManagedLink.
        policy: Ignore policy; its ``deny_reason_link`` is used when
            present, otherwise ``deny_reason`` with the "not a regular
            file" style verdicts ignored.
        path_filter: User include/exclude filter.
        managed_out: List that receives the ManagedLink on success.
        excluded_out: List that receives an ExcludedFile on rejection.
        seen_role: Optional per-role set of already-processed paths.
        seen_global: Optional run-wide set of already-processed paths.

    Returns:
        True when a ManagedLink was appended to ``managed_out``; False when
        the path had already been seen or was excluded.
    """
    trackers = [seen for seen in (seen_role, seen_global) if seen is not None]
    if any(abs_path in seen for seen in trackers):
        return False

    def _remember() -> None:
        # Mark the path as handled in every tracking set we were given.
        for seen in trackers:
            seen.add(abs_path)

    def _reject(why: str) -> bool:
        # Record the exclusion, mark the path as handled, report failure.
        excluded_out.append(ExcludedFile(path=abs_path, reason=why))
        _remember()
        return False

    if path_filter.is_excluded(abs_path):
        return _reject("user_excluded")

    link_check = getattr(policy, "deny_reason_link", None)
    if callable(link_check):
        denied = link_check(abs_path)
    else:
        # Fallback: deny_reason() is file-oriented, so its "not a regular
        # file" verdicts are acceptable for symlinks.
        denied = policy.deny_reason(abs_path)
        if denied in ("not_regular_file", "not_file", "not_regular"):
            denied = None
    if denied:
        return _reject(denied)

    if not os.path.islink(abs_path):
        return _reject("not_symlink")

    try:
        link_target = os.readlink(abs_path)
    except OSError:
        return _reject("unreadable")

    managed_out.append(
        ManagedLink(path=abs_path, target=link_target, reason=reason)
    )
    _remember()
    return True
def _is_confish(path: str) -> bool:
base = os.path.basename(path)
_, ext = os.path.splitext(base)
@ -1346,11 +1439,72 @@ def harvest(
package=pkg,
role_name=role,
managed_files=managed,
managed_links=[],
excluded=excluded,
notes=notes,
)
)
# -------------------------
# Web server enablement symlinks (nginx/apache2)
#
# Debian-style nginx/apache2 configurations often use *-enabled directories
# populated with symlinks pointing back into *-available. The symlinks
# represent the enablement state and are important to reproduce.
#
# We only harvest these when the relevant service/package has already been
# detected in this run (i.e. we have a role that will manage nginx/apache2).
# -------------------------
def _find_role_snapshot(role_name: str):
    """Return the first snapshot whose role_name matches, or None.

    Service snapshots are searched before package snapshots.
    """
    for candidates in (service_snaps, pkg_snaps):
        found = next(
            (snap for snap in candidates if snap.role_name == role_name),
            None,
        )
        if found is not None:
            return found
    return None
def _capture_enabled_symlinks(role_name: str, dirs: List[str]) -> None:
    """Capture every symlink directly inside ``dirs`` for ``role_name``.

    Does nothing when no snapshot exists for the role. Directories that do
    not exist are skipped, and non-symlink entries are ignored.
    """
    snap = _find_role_snapshot(role_name)
    if snap is None:
        return

    role_seen = seen_by_role.setdefault(role_name, set())
    for enabled_dir in filter(os.path.isdir, dirs):
        entries = sorted(glob.glob(os.path.join(enabled_dir, "*")))
        # filter() is lazy, so each entry is checked right before capture.
        for link_path in filter(os.path.islink, entries):
            _capture_link(
                role_name=role_name,
                abs_path=link_path,
                reason="enabled_symlink",
                policy=policy,
                path_filter=path_filter,
                managed_out=snap.managed_links,
                excluded_out=snap.excluded,
                seen_role=role_seen,
                seen_global=captured_global,
            )
_capture_enabled_symlinks(
"nginx",
[
"/etc/nginx/modules-enabled",
"/etc/nginx/sites-enabled",
],
)
_capture_enabled_symlinks(
"apache2",
[
"/etc/apache2/conf-enabled",
"/etc/apache2/mods-enabled",
"/etc/apache2/sites-enabled",
],
)
# -------------------------
# Users role (non-system users)
# -------------------------
@ -2001,11 +2155,17 @@ def harvest(
)
for s in service_snaps:
s.managed_dirs = _merge_parent_dirs(
s.managed_dirs, s.managed_files, policy=policy
s.managed_dirs,
s.managed_files,
policy=policy,
extra_paths=[ml.path for ml in (s.managed_links or [])],
)
for p in pkg_snaps:
p.managed_dirs = _merge_parent_dirs(
p.managed_dirs, p.managed_files, policy=policy
p.managed_dirs,
p.managed_files,
policy=policy,
extra_paths=[ml.path for ml in (p.managed_links or [])],
)
if apt_config_snapshot:

View file

@ -173,3 +173,45 @@ class IgnorePolicy:
return "not_directory"
return None
def deny_reason_link(self, path: str) -> Optional[str]:
    """Return why the symlink at ``path`` must be skipped, or None.

    Symlinks are meaningful configuration state (e.g. Debian-style
    *-enabled directories), but deny_reason() is file-oriented and rejects
    them as "not_regular_file". This variant instead:

    - applies the quick filename ignores (logs and backup files)
    - applies deny_globs, unless the policy is marked dangerous
    - requires that the path is a symlink whose target can be read

    No size checks or content scanning are performed for symlinks.
    """
    # Same cheap filename-based ignores as deny_reason().
    if path.endswith(".log"):
        return "log_file"
    etc_dash_backup = path.startswith("/etc/") and path.endswith("-")
    if path.endswith("~") or etc_dash_backup:
        return "backup_file"

    if not self.dangerous and any(
        fnmatch.fnmatch(path, pattern) for pattern in self.deny_globs or []
    ):
        return "denied_path"

    try:
        os.lstat(path)
        if not os.path.islink(path):
            return "not_symlink"
        os.readlink(path)
    except OSError:
        # Either the lstat() or the readlink() failed.
        return "unreadable"
    return None

View file

@ -406,6 +406,20 @@ def _build_managed_files_var(
return out
def _build_managed_links_var(
managed_links: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Convert enroll managed_links into an Ansible-friendly list of dicts."""
out: List[Dict[str, Any]] = []
for ml in managed_links or []:
dest = ml.get("path") or ""
src = ml.get("target") or ""
if not dest or not src:
continue
out.append({"dest": dest, "src": src})
return out
def _render_generic_files_tasks(
var_prefix: str, *, include_restart_notify: bool
) -> str:
@ -495,6 +509,14 @@ def _render_generic_files_tasks(
| selectattr('kind', 'equalto', 'copy')
| list }}}}
notify: "{{{{ item.notify | default([]) }}}}"
- name: Ensure managed symlinks exist
ansible.builtin.file:
src: "{{{{ item.src }}}}"
dest: "{{{{ item.dest }}}}"
state: link
force: true
loop: "{{{{ {var_prefix}_managed_links | default([]) }}}}"
"""
@ -1652,6 +1674,7 @@ User-requested extra file harvesting.
pkgs = svc.get("packages", []) or []
managed_files = svc.get("managed_files", []) or []
managed_dirs = svc.get("managed_dirs", []) or []
managed_links = svc.get("managed_links", []) or []
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
@ -1696,6 +1719,8 @@ User-requested extra file harvesting.
notify_systemd="Run systemd daemon-reload",
)
links_var = _build_managed_links_var(managed_links)
dirs_var = _build_managed_dirs_var(managed_dirs)
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
@ -1704,6 +1729,7 @@ User-requested extra file harvesting.
f"{var_prefix}_packages": pkgs,
f"{var_prefix}_managed_files": files_var,
f"{var_prefix}_managed_dirs": dirs_var,
f"{var_prefix}_managed_links": links_var,
f"{var_prefix}_manage_unit": True,
f"{var_prefix}_systemd_enabled": bool(enabled_at_harvest),
f"{var_prefix}_systemd_state": desired_state,
@ -1719,6 +1745,7 @@ User-requested extra file harvesting.
f"{var_prefix}_packages": [],
f"{var_prefix}_managed_files": [],
f"{var_prefix}_managed_dirs": [],
f"{var_prefix}_managed_links": [],
f"{var_prefix}_manage_unit": False,
f"{var_prefix}_systemd_enabled": False,
f"{var_prefix}_systemd_state": "stopped",
@ -1804,6 +1831,9 @@ Generated from `{unit}`.
## Managed files
{os.linesep.join("- " + mf["path"] + " (" + mf["reason"] + ")" for mf in managed_files) or "- (none)"}
## Managed symlinks
{os.linesep.join("- " + ml["path"] + " -> " + ml["target"] + " (" + ml.get("reason", "") + ")" for ml in managed_links) or "- (none)"}
## Excluded (possible secrets / unsafe)
{os.linesep.join("- " + e["path"] + " (" + e["reason"] + ")" for e in excluded) or "- (none)"}
@ -1823,6 +1853,7 @@ Generated from `{unit}`.
pkg = pr.get("package") or ""
managed_files = pr.get("managed_files", []) or []
managed_dirs = pr.get("managed_dirs", []) or []
managed_links = pr.get("managed_links", []) or []
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
@ -1864,6 +1895,8 @@ Generated from `{unit}`.
notify_systemd="Run systemd daemon-reload",
)
links_var = _build_managed_links_var(managed_links)
dirs_var = _build_managed_dirs_var(managed_dirs)
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
@ -1871,6 +1904,7 @@ Generated from `{unit}`.
f"{var_prefix}_packages": pkgs,
f"{var_prefix}_managed_files": files_var,
f"{var_prefix}_managed_dirs": dirs_var,
f"{var_prefix}_managed_links": links_var,
}
base_vars = _merge_mappings_overwrite(base_vars, jt_map)
@ -1881,6 +1915,7 @@ Generated from `{unit}`.
f"{var_prefix}_packages": [],
f"{var_prefix}_managed_files": [],
f"{var_prefix}_managed_dirs": [],
f"{var_prefix}_managed_links": [],
},
)
_write_hostvars(out_dir, fqdn or "", role, base_vars)
@ -1923,6 +1958,9 @@ Generated for package `{pkg}`.
## Managed files
{os.linesep.join("- " + mf["path"] + " (" + mf["reason"] + ")" for mf in managed_files) or "- (none)"}
## Managed symlinks
{os.linesep.join("- " + ml["path"] + " -> " + ml["target"] + " (" + ml.get("reason", "") + ")" for ml in managed_links) or "- (none)"}
## Excluded (possible secrets / unsafe)
{os.linesep.join("- " + e["path"] + " (" + e["reason"] + ")" for e in excluded) or "- (none)"}