Many tweaks

Miguel Jacq 2025-12-15 11:04:54 +11:00
parent 5398ad123c
commit 227be6dd51
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
20 changed files with 1350 additions and 174 deletions

View file

@ -2,7 +2,7 @@ from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple
from typing import Dict, List, Set, Tuple
@dataclass
@ -27,7 +27,12 @@ def parse_login_defs(path: str = "/etc/login.defs") -> Dict[str, int]:
if not line or line.startswith("#"):
continue
parts = line.split()
if len(parts) >= 2 and parts[0] in {"UID_MIN", "UID_MAX", "SYS_UID_MIN", "SYS_UID_MAX"}:
if len(parts) >= 2 and parts[0] in {
"UID_MIN",
"UID_MAX",
"SYS_UID_MIN",
"SYS_UID_MAX",
}:
try:
vals[parts[0]] = int(parts[1])
except ValueError:
@ -37,7 +42,9 @@ def parse_login_defs(path: str = "/etc/login.defs") -> Dict[str, int]:
return vals
def parse_passwd(path: str = "/etc/passwd") -> List[Tuple[str, int, int, str, str, str]]:
def parse_passwd(
path: str = "/etc/passwd",
) -> List[Tuple[str, int, int, str, str, str]]:
rows: List[Tuple[str, int, int, str, str, str]] = []
with open(path, "r", encoding="utf-8", errors="replace") as f:
for line in f:
@ -60,7 +67,9 @@ def parse_passwd(path: str = "/etc/passwd") -> List[Tuple[str, int, int, str, st
return rows
def parse_group(path: str = "/etc/group") -> Tuple[Dict[int, str], Dict[str, int], Dict[str, Set[str]]]:
def parse_group(
path: str = "/etc/group",
) -> Tuple[Dict[int, str], Dict[str, int], Dict[str, Set[str]]]:
gid_to_name: Dict[int, str] = {}
name_to_gid: Dict[str, int] = {}
members: Dict[str, Set[str]] = {}
@ -130,16 +139,18 @@ def collect_non_system_users() -> List[UserRecord]:
ssh_files = find_user_ssh_files(home) if home and home.startswith("/") else []
users.append(UserRecord(
name=name,
uid=uid,
gid=gid,
gecos=gecos,
home=home,
shell=shell,
primary_group=primary_group,
supplementary_groups=supp,
ssh_files=ssh_files,
))
users.append(
UserRecord(
name=name,
uid=uid,
gid=gid,
gecos=gecos,
home=home,
shell=shell, # nosec
primary_group=primary_group,
supplementary_groups=supp,
ssh_files=ssh_files,
)
)
return users
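
A minimal sketch of how these helpers compose: read UID_MIN from login.defs, then keep only passwd entries at or above it. collect_non_system_users() above is the real implementation; the fallback value and the shell filter below are illustrative assumptions, not code from this module.

# Illustrative only: tuple order follows /etc/passwd (name, uid, gid, gecos, home, shell).
defs = parse_login_defs()
uid_min = defs.get("UID_MIN", 1000)  # assumed fallback when login.defs omits it
for name, uid, gid, gecos, home, shell in parse_passwd():
    if uid >= uid_min and shell not in ("/usr/sbin/nologin", "/bin/false"):
        print(name, uid, home)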

View file

@ -9,16 +9,32 @@ def main() -> None:
ap = argparse.ArgumentParser(prog="enroll")
sub = ap.add_subparsers(dest="cmd", required=True)
h = sub.add_parser("harvest", help="Harvest service/package/config state into a bundle")
h.add_argument("--out", required=True, help="Bundle output directory")
h = sub.add_parser("harvest", help="Harvest service/package/config state")
h.add_argument("--out", required=True, help="Harvest output directory")
r = sub.add_parser("manifest", help="Render Ansible roles from a harvested bundle")
r.add_argument("--bundle", required=True, help="Path to the bundle directory created by the harvest command")
r.add_argument("--out", required=True, help="Output directory for generated roles/playbook Ansible manifest")
r = sub.add_parser("manifest", help="Render Ansible roles from a harvest")
r.add_argument(
"--harvest",
required=True,
help="Path to the directory created by the harvest command",
)
r.add_argument(
"--out",
required=True,
help="Output directory for generated roles/playbook Ansible manifest",
)
e = sub.add_parser("export", help="Harvest then manifest in one shot")
e.add_argument("--bundle", required=True, help="Path to the directory to place the bundle in")
e.add_argument("--out", required=True, help="Output directory for generated roles/playbook Ansible manifest")
e = sub.add_parser(
"enroll", help="Harvest state, then manifest Ansible code, in one shot"
)
e.add_argument(
"--harvest", required=True, help="Path to the directory to place the harvest in"
)
e.add_argument(
"--out",
required=True,
help="Output directory for generated roles/playbook Ansible manifest",
)
args = ap.parse_args()
@ -26,7 +42,7 @@ def main() -> None:
path = harvest(args.out)
print(path)
elif args.cmd == "manifest":
manifest(args.bundle, args.out)
elif args.cmd == "export":
harvest(args.bundle)
manifest(args.bundle, args.out)
manifest(args.harvest, args.out)
elif args.cmd == "enroll":
harvest(args.harvest)
manifest(args.harvest, args.out)
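
The dispatch above makes the new enroll subcommand equivalent to running harvest and then manifest against the same directory; a rough programmatic version (the import path and directory names are placeholders, not confirmed by this diff):

from enroll import harvest, manifest  # assumed package layout; cli.py's own imports are not shown here

harvest_dir = "/tmp/myhost-harvest"   # placeholder paths
out_dir = "/tmp/myhost-ansible"
harvest(harvest_dir)                  # collect state.json + artifacts into the harvest dir
manifest(harvest_dir, out_dir)        # render roles/ and playbook.yml from that harvest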

View file

@ -3,19 +3,19 @@ from __future__ import annotations
import glob
import hashlib
import os
import subprocess
import subprocess # nosec
from typing import Dict, List, Optional, Set, Tuple
def _run(cmd: list[str]) -> str:
p = subprocess.run(cmd, check=False, text=True, capture_output=True)
p = subprocess.run(cmd, check=False, text=True, capture_output=True) # nosec
if p.returncode != 0:
raise RuntimeError(f"Command failed: {cmd}\n{p.stderr}")
return p.stdout
def dpkg_owner(path: str) -> Optional[str]:
p = subprocess.run(["dpkg", "-S", path], text=True, capture_output=True)
p = subprocess.run(["dpkg", "-S", path], text=True, capture_output=True) #nosec
if p.returncode != 0:
return None
left = p.stdout.split(":", 1)[0].strip()
@ -23,10 +23,9 @@ def dpkg_owner(path: str) -> Optional[str]:
return pkg or None
def list_manual_packages() -> List[str]:
"""Return packages marked as manually installed (apt-mark showmanual)."""
p = subprocess.run(["apt-mark", "showmanual"], text=True, capture_output=True)
p = subprocess.run(["apt-mark", "showmanual"], text=True, capture_output=True) #nosec
if p.returncode != 0:
return []
pkgs: List[str] = []
@ -37,6 +36,7 @@ def list_manual_packages() -> List[str]:
pkgs.append(line)
return sorted(set(pkgs))
def build_dpkg_etc_index(
info_dir: str = "/var/lib/dpkg/info",
) -> Tuple[Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]]:
@ -83,7 +83,9 @@ def build_dpkg_etc_index(
return owned, owner, topdir_to_pkgs, pkg_to_etc
def parse_status_conffiles(status_path: str = "/var/lib/dpkg/status") -> Dict[str, Dict[str, str]]:
def parse_status_conffiles(
status_path: str = "/var/lib/dpkg/status",
) -> Dict[str, Dict[str, str]]:
"""
pkg -> { "/etc/foo": md5hex, ... } based on dpkg status "Conffiles" field.
This md5 is the packaged baseline for the conffile.
@ -152,7 +154,7 @@ def read_pkg_md5sums(pkg: str) -> Dict[str, str]:
def file_md5(path: str) -> str:
h = hashlib.md5()
h = hashlib.md5() # nosec
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
@ -164,6 +166,7 @@ def stat_triplet(path: str) -> Tuple[str, str, str]:
mode = oct(st.st_mode & 0o777)[2:].zfill(4)
import pwd, grp
try:
owner = pwd.getpwuid(st.st_uid).pw_name
except KeyError:

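A sketch of the conffile-drift check these helpers support, mirroring how harvest uses them further down: compare the live file's md5 against the packaged baseline from the dpkg status Conffiles field, and only treat the file as modified when the two differ (md5 here is change detection, not cryptography, hence the # nosec markers). The package and path below are illustrative.

conffiles_by_pkg = parse_status_conffiles()
baseline = conffiles_by_pkg.get("openssh-server", {})  # illustrative package
path = "/etc/ssh/sshd_config"                          # illustrative conffile
if path in baseline and file_md5(path) != baseline[path]:
    print(f"{path} differs from its packaged default")  # candidate for capture
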
View file

@ -18,8 +18,7 @@ from .debian import (
stat_triplet,
)
from .secrets import SecretPolicy
from .accounts import collect_non_system_users, UserRecord
from .accounts import collect_non_system_users
@dataclass
@ -43,6 +42,10 @@ class ServiceSnapshot:
unit: str
role_name: str
packages: List[str]
active_state: Optional[str]
sub_state: Optional[str]
unit_file_state: Optional[str]
condition_result: Optional[str]
managed_files: List[ManagedFile]
excluded: List[ExcludedFile]
notes: List[str]
@ -66,15 +69,59 @@ class UsersSnapshot:
notes: List[str]
@dataclass
class EtcCustomSnapshot:
role_name: str
managed_files: List[ManagedFile]
excluded: List[ExcludedFile]
notes: List[str]
ALLOWED_UNOWNED_EXTS = {
".conf", ".cfg", ".ini", ".cnf", ".yaml", ".yml", ".json", ".toml",
".rules", ".service", ".socket", ".timer", ".target", ".path", ".mount",
".network", ".netdev", ".link",
".conf",
".cfg",
".ini",
".cnf",
".yaml",
".yml",
".json",
".toml",
".rules",
".service",
".socket",
".timer",
".target",
".path",
".mount",
".network",
".netdev",
".link",
"", # allow extensionless (common in /etc/default and /etc/init.d)
}
MAX_UNOWNED_FILES_PER_ROLE = 400
# Directories that are shared across many packages; never attribute unowned files in these trees to a single package.
SHARED_ETC_TOPDIRS = {
"default",
"apparmor.d",
"network",
"init.d",
"systemd",
"pam.d",
"ssh",
"ssl",
"sudoers.d",
"cron.d",
"cron.daily",
"cron.weekly",
"cron.monthly",
"cron.hourly",
"logrotate.d",
"sysctl.d",
"modprobe.d",
}
def _safe_name(s: str) -> str:
out: List[str] = []
@ -89,10 +136,12 @@ def _role_name_from_unit(unit: str) -> str:
def _role_name_from_pkg(pkg: str) -> str:
return "pkg_" + _safe_name(pkg)
return _safe_name(pkg)
def _copy_into_bundle(bundle_dir: str, role_name: str, abs_path: str, src_rel: str) -> None:
def _copy_into_bundle(
bundle_dir: str, role_name: str, abs_path: str, src_rel: str
) -> None:
dst = os.path.join(bundle_dir, "artifacts", role_name, src_rel)
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copy2(abs_path, dst)
@ -114,7 +163,9 @@ def _hint_names(unit: str, pkgs: Set[str]) -> Set[str]:
return {h for h in hints if h}
def _add_pkgs_from_etc_topdirs(hints: Set[str], topdir_to_pkgs: Dict[str, Set[str]], pkgs: Set[str]) -> None:
def _add_pkgs_from_etc_topdirs(
hints: Set[str], topdir_to_pkgs: Dict[str, Set[str]], pkgs: Set[str]
) -> None:
for h in hints:
for p in topdir_to_pkgs.get(h, set()):
pkgs.add(p)
@ -123,16 +174,20 @@ def _add_pkgs_from_etc_topdirs(hints: Set[str], topdir_to_pkgs: Dict[str, Set[st
def _maybe_add_specific_paths(hints: Set[str]) -> List[str]:
paths: List[str] = []
for h in hints:
paths.extend([
f"/etc/default/{h}",
f"/etc/init.d/{h}",
f"/etc/sysctl.d/{h}.conf",
f"/etc/logrotate.d/{h}",
])
paths.extend(
[
f"/etc/default/{h}",
f"/etc/init.d/{h}",
f"/etc/sysctl.d/{h}.conf",
f"/etc/logrotate.d/{h}",
]
)
return paths
def _scan_unowned_under_roots(roots: List[str], owned_etc: Set[str], limit: int = MAX_UNOWNED_FILES_PER_ROLE) -> List[str]:
def _scan_unowned_under_roots(
roots: List[str], owned_etc: Set[str], limit: int = MAX_UNOWNED_FILES_PER_ROLE
) -> List[str]:
found: List[str] = []
for root in roots:
if not os.path.isdir(root):
@ -170,7 +225,10 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
os.makedirs(bundle_dir, exist_ok=True)
if hasattr(os, "geteuid") and os.geteuid() != 0:
print("Warning: not running as root; harvest may miss files or metadata.", flush=True)
print(
"Warning: not running as root; harvest may miss files or metadata.",
flush=True,
)
owned_etc, etc_owner_map, topdir_to_pkgs, pkg_to_etc_paths = build_dpkg_etc_index()
conffiles_by_pkg = parse_status_conffiles()
@ -185,14 +243,20 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
try:
ui = get_unit_info(unit)
except UnitQueryError as e:
service_snaps.append(ServiceSnapshot(
unit=unit,
role_name=role,
packages=[],
managed_files=[],
excluded=[],
notes=[str(e)],
))
service_snaps.append(
ServiceSnapshot(
unit=unit,
role_name=role,
packages=[],
active_state=None,
sub_state=None,
unit_file_state=None,
condition_result=None,
managed_files=[],
excluded=[],
notes=[str(e)],
)
)
continue
pkgs: Set[str] = set()
@ -243,6 +307,7 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
if not os.path.isfile(path) or os.path.islink(path):
continue
if path in conff:
# Only capture conffiles when they differ from the package default.
try:
current = file_md5(path)
except OSError:
@ -267,7 +332,9 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
candidates.setdefault(pth, "custom_unowned")
if not pkgs and not candidates:
notes.append("No packages or /etc candidates detected (unexpected for enabled service).")
notes.append(
"No packages or /etc candidates detected (unexpected for enabled service)."
)
for path, reason in sorted(candidates.items()):
deny = policy.deny_reason(path)
@ -285,31 +352,49 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
except OSError:
excluded.append(ExcludedFile(path=path, reason="unreadable"))
continue
managed.append(ManagedFile(
path=path,
src_rel=src_rel,
owner=owner,
group=group,
mode=mode,
reason=reason,
))
managed.append(
ManagedFile(
path=path,
src_rel=src_rel,
owner=owner,
group=group,
mode=mode,
reason=reason,
)
)
service_snaps.append(ServiceSnapshot(
unit=unit,
role_name=role,
packages=sorted(pkgs),
managed_files=managed,
excluded=excluded,
notes=notes,
))
service_snaps.append(
ServiceSnapshot(
unit=unit,
role_name=role,
packages=sorted(pkgs),
active_state=ui.active_state,
sub_state=ui.sub_state,
unit_file_state=ui.unit_file_state,
condition_result=ui.condition_result,
managed_files=managed,
excluded=excluded,
notes=notes,
)
)
# -------------------------
# Manual package roles
# -------------------------
manual_pkgs = list_manual_packages()
# Avoid duplicate roles: if a manual package is already managed by any service role, skip its pkg_<name> role.
covered_by_services: Set[str] = set()
for s in service_snaps:
for p in s.packages:
covered_by_services.add(p)
manual_pkgs_skipped: List[str] = []
pkg_snaps: List[PackageSnapshot] = []
for pkg in manual_pkgs:
if pkg in covered_by_services:
manual_pkgs_skipped.append(pkg)
continue
role = _role_name_from_pkg(pkg)
notes: List[str] = []
excluded: List[ExcludedFile] = []
@ -343,13 +428,17 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
topdirs = _topdirs_for_package(pkg, pkg_to_etc_paths)
roots: List[str] = []
for td in sorted(topdirs):
if td in SHARED_ETC_TOPDIRS:
continue
roots.extend([f"/etc/{td}", f"/etc/{td}.d"])
roots.extend([f"/etc/default/{td}"])
roots.extend([f"/etc/init.d/{td}"])
roots.extend([f"/etc/logrotate.d/{td}"])
roots.extend([f"/etc/sysctl.d/{td}.conf"])
for pth in _scan_unowned_under_roots([r for r in roots if os.path.isdir(r)], owned_etc):
for pth in _scan_unowned_under_roots(
[r for r in roots if os.path.isdir(r)], owned_etc
):
candidates.setdefault(pth, "custom_unowned")
for r in roots:
@ -373,25 +462,31 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
except OSError:
excluded.append(ExcludedFile(path=path, reason="unreadable"))
continue
managed.append(ManagedFile(
path=path,
src_rel=src_rel,
owner=owner,
group=group,
mode=mode,
reason=reason,
))
managed.append(
ManagedFile(
path=path,
src_rel=src_rel,
owner=owner,
group=group,
mode=mode,
reason=reason,
)
)
if not pkg_to_etc_paths.get(pkg, []) and not managed:
notes.append("No /etc files detected for this package (may be a meta package).")
notes.append(
"No /etc files detected for this package (may be a meta package)."
)
pkg_snaps.append(PackageSnapshot(
package=pkg,
role_name=role,
managed_files=managed,
excluded=excluded,
notes=notes,
))
pkg_snaps.append(
PackageSnapshot(
package=pkg,
role_name=role,
managed_files=managed,
excluded=excluded,
notes=notes,
)
)
# -------------------------
# Users role (non-system users)
@ -402,7 +497,7 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
users_list: List[dict] = []
try:
user_records = collect_non_system_users()
except Exception as e:
user_records = []
users_notes.append(f"Failed to enumerate users: {e!r}")
@ -410,47 +505,51 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
users_role_name = "users"
for u in user_records:
users_list.append({
"name": u.name,
"uid": u.uid,
"gid": u.gid,
"gecos": u.gecos,
"home": u.home,
"shell": u.shell,
"primary_group": u.primary_group,
"supplementary_groups": u.supplementary_groups,
})
users_list.append(
{
"name": u.name,
"uid": u.uid,
"gid": u.gid,
"gecos": u.gecos,
"home": u.home,
"shell": u.shell,
"primary_group": u.primary_group,
"supplementary_groups": u.supplementary_groups,
}
)
# Copy authorized_keys
# Copy only safe SSH public material: authorized_keys + *.pub
for sf in u.ssh_files:
deny = policy.deny_reason(sf)
if deny:
users_excluded.append(ExcludedFile(path=sf, reason=deny))
continue
# Force safe modes; still record current owner/group for reference.
try:
owner, group, mode = stat_triplet(sf)
except OSError:
users_excluded.append(ExcludedFile(path=sf, reason="unreadable"))
continue
src_rel = sf.lstrip("/")
try:
_copy_into_bundle(bundle_dir, users_role_name, sf, src_rel)
except OSError:
users_excluded.append(ExcludedFile(path=sf, reason="unreadable"))
continue
reason = "authorized_keys" if sf.endswith("/authorized_keys") else "ssh_public_key"
users_managed.append(ManagedFile(
path=sf,
src_rel=src_rel,
owner=owner,
group=group,
mode=mode,
reason=reason,
))
reason = (
"authorized_keys"
if sf.endswith("/authorized_keys")
else "ssh_public_key"
)
users_managed.append(
ManagedFile(
path=sf,
src_rel=src_rel,
owner=owner,
group=group,
mode=mode,
reason=reason,
)
)
users_snapshot = UsersSnapshot(
role_name=users_role_name,
@ -460,12 +559,91 @@ def harvest(bundle_dir: str, policy: Optional[SecretPolicy] = None) -> str:
notes=users_notes,
)
# -------------------------
# etc_custom role (unowned /etc files not already attributed elsewhere)
# -------------------------
etc_notes: List[str] = []
etc_excluded: List[ExcludedFile] = []
etc_managed: List[ManagedFile] = []
etc_role_name = "etc_custom"
# Build a set of files already captured by other roles.
already: Set[str] = set()
for s in service_snaps:
for mf in s.managed_files:
already.add(mf.path)
for p in pkg_snaps:
for mf in p.managed_files:
already.add(mf.path)
for mf in users_managed:
already.add(mf.path)
# Walk /etc for unowned config-ish files
scanned = 0
for dirpath, _, filenames in os.walk("/etc"):
for fn in filenames:
path = os.path.join(dirpath, fn)
if path in already:
continue
if path in owned_etc:
continue
if not os.path.isfile(path) or os.path.islink(path):
continue
if not _is_confish(path):
continue
deny = policy.deny_reason(path)
if deny:
etc_excluded.append(ExcludedFile(path=path, reason=deny))
continue
try:
owner, group, mode = stat_triplet(path)
except OSError:
etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
continue
src_rel = path.lstrip("/")
try:
_copy_into_bundle(bundle_dir, etc_role_name, path, src_rel)
except OSError:
etc_excluded.append(ExcludedFile(path=path, reason="unreadable"))
continue
etc_managed.append(
ManagedFile(
path=path,
src_rel=src_rel,
owner=owner,
group=group,
mode=mode,
reason="custom_unowned",
)
)
scanned += 1
if scanned >= 2000:
etc_notes.append(
"Reached file cap (2000) while scanning /etc for unowned files."
)
break
if scanned >= 2000:
break
etc_custom_snapshot = EtcCustomSnapshot(
role_name=etc_role_name,
managed_files=etc_managed,
excluded=etc_excluded,
notes=etc_notes,
)
state = {
"host": {"hostname": os.uname().nodename, "os": "debian"},
"users": asdict(users_snapshot),
"services": [asdict(s) for s in service_snaps],
"manual_packages": manual_pkgs,
"manual_packages_skipped": manual_pkgs_skipped,
"package_roles": [asdict(p) for p in pkg_snaps],
"etc_custom": asdict(etc_custom_snapshot),
}
state_path = os.path.join(bundle_dir, "state.json")
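
The state dict above is what the manifest step consumes; a quick way to inspect a finished harvest (the key names come straight from that dict, the path is a placeholder):

import json

with open("/tmp/myhost-harvest/state.json", encoding="utf-8") as f:  # placeholder path
    state = json.load(f)

print(state["host"]["hostname"])
print([s["role_name"] for s in state["services"]])
print(state["manual_packages_skipped"])           # packages already covered by service roles
print(len(state["etc_custom"]["managed_files"]))  # unowned /etc files captured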

View file

@ -50,12 +50,14 @@ def manifest(bundle_dir: str, out_dir: str) -> None:
services: List[Dict[str, Any]] = state.get("services", [])
package_roles: List[Dict[str, Any]] = state.get("package_roles", [])
users_snapshot: Dict[str, Any] = state.get("users", {})
etc_custom_snapshot: Dict[str, Any] = state.get("etc_custom", {})
os.makedirs(out_dir, exist_ok=True)
roles_root = os.path.join(out_dir, "roles")
os.makedirs(roles_root, exist_ok=True)
manifested_users_roles: List[str] = []
manifested_etc_custom_roles: List[str] = []
manifested_service_roles: List[str] = []
manifested_pkg_roles: List[str] = []
@ -86,11 +88,17 @@ def manifest(bundle_dir: str, out_dir: str) -> None:
# defaults: store users list (handy for later), but tasks are explicit for readability
defaults = """---
users_accounts:
""" + ("\n".join([f" - name: {u.get('name')}" for u in users]) + "\n")
with open(os.path.join(role_dir, "defaults", "main.yml"), "w", encoding="utf-8") as f:
""" + (
"\n".join([f" - name: {u.get('name')}" for u in users]) + "\n"
)
with open(
os.path.join(role_dir, "defaults", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(defaults)
with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\ndependencies: []\n")
# tasks
@ -112,7 +120,7 @@ users_accounts:
lines.append(f" group: {u.get('primary_group')}")
supp = u.get("supplementary_groups") or []
if supp:
lines.append(" groups: " + ",".join(supp))
lines.append(" groups: " + ",".join(sorted(supp)))
lines.append(" append: true")
lines.append(f" home: {u.get('home')}")
lines.append(" create_home: true")
@ -120,9 +128,8 @@ users_accounts:
lines.append(f" shell: {u.get('shell')}")
if u.get("gecos"):
# quote to avoid YAML surprises
gec = u.get("gecos").replace('"', '\"')
gec = u.get("gecos").replace('"', '"')
lines.append(f' comment: "{gec}"')
lines.append(" password_lock: true")
lines.append(" state: present")
# Ensure ~/.ssh
@ -163,30 +170,122 @@ users_accounts:
lines.append(f" mode: '{mode}'")
tasks = "\n".join(lines).rstrip() + "\n"
with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
# handlers (none needed)
with open(os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
readme = """# users
readme = (
"""# users
Generated non-system user accounts and SSH public material.
## Users
""" + ("\n".join([f"- {u.get('name')} (uid {u.get('uid')})" for u in users]) or "- (none)") + """\n
"""
+ (
"\n".join([f"- {u.get('name')} (uid {u.get('uid')})" for u in users])
or "- (none)"
)
+ """\n
## Included SSH files
""" + ("\n".join([f"- {mf.get('path')} ({mf.get('reason')})" for mf in managed_files]) or "- (none)") + """\n
"""
+ (
"\n".join(
[f"- {mf.get('path')} ({mf.get('reason')})" for mf in managed_files]
)
or "- (none)"
)
+ """\n
## Excluded
""" + ("\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded]) or "- (none)") + """\n
"""
+ (
"\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded])
or "- (none)"
)
+ """\n
## Notes
""" + ("\n".join([f"- {n}" for n in notes]) or "- (none)") + """\n"""
"""
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ """\n"""
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifested_users_roles.append(role)
# -------------------------
# etc_custom role (unowned /etc not already attributed)
# -------------------------
if etc_custom_snapshot and etc_custom_snapshot.get("managed_files"):
role = etc_custom_snapshot.get("role_name", "etc_custom")
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
_copy_artifacts(bundle_dir, role, role_dir)
managed_files = etc_custom_snapshot.get("managed_files", [])
excluded = etc_custom_snapshot.get("excluded", [])
notes = etc_custom_snapshot.get("notes", [])
# tasks: just deploy files (no restarts)
lines: List[str] = ["---"]
for mf in managed_files:
dest = mf["path"]
src = mf["src_rel"]
lines.append(f"- name: Deploy {dest}")
lines.append(" ansible.builtin.copy:")
lines.append(f" src: {src}")
lines.append(f" dest: {dest}")
lines.append(f" owner: {mf.get('owner')}")
lines.append(f" group: {mf.get('group')}")
lines.append(f" mode: '{mf.get('mode')}'")
tasks = "\n".join(lines).rstrip() + "\n"
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\ndependencies: []\n")
readme = (
"""# etc_custom
Unowned /etc config files not attributed to packages or services.
## Managed files
"""
+ ("\n".join([f"- {mf.get('path')}" for mf in managed_files]) or "- (none)")
+ """\n
## Excluded
"""
+ (
"\n".join([f"- {e.get('path')} ({e.get('reason')})" for e in excluded])
or "- (none)"
)
+ """\n
## Notes
"""
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ """\n"""
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifested_etc_custom_roles.append(role)
# -------------------------
# Service roles
# -------------------------
@ -202,11 +301,16 @@ Generated non-system user accounts and SSH public material.
var_prefix = role
was_active = svc.get("active_state") == "active"
defaults = f"""---
{var_prefix}_packages:
{_yaml_list(pkgs, indent=2)}
{var_prefix}_active_state_at_harvest: "{svc.get("active_state")}"
{var_prefix}_start: {"true" if was_active else "false"}
"""
with open(os.path.join(role_dir, "defaults", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "defaults", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(defaults)
handlers = """---
@ -219,10 +323,14 @@ Generated non-system user accounts and SSH public material.
name: "{{ unit_name }}"
state: restarted
"""
with open(os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(handlers)
systemd_files = [mf for mf in managed_files if mf["path"].startswith("/etc/systemd/system/")]
systemd_files = [
mf for mf in managed_files if mf["path"].startswith("/etc/systemd/system/")
]
other_files = [mf for mf in managed_files if mf not in systemd_files]
def copy_task(mf: Dict[str, Any], notify: str | None) -> str:
@ -237,7 +345,8 @@ Generated non-system user accounts and SSH public material.
{notify_line}"""
task_parts: List[str] = []
task_parts.append(f"""---
task_parts.append(
f"""---
- name: Set unit name
ansible.builtin.set_fact:
unit_name: "{unit}"
@ -248,30 +357,44 @@ Generated non-system user accounts and SSH public material.
state: present
update_cache: true
when: {var_prefix}_packages | length > 0
""")
"""
)
if systemd_files:
for mf in systemd_files:
task_parts.append(copy_task(mf, "[systemd daemon-reload]"))
task_parts.append("""- name: Reload systemd to pick up unit changes
task_parts.append(
"""- name: Reload systemd to pick up unit changes
ansible.builtin.meta: flush_handlers
""")
"""
)
for mf in other_files:
task_parts.append(copy_task(mf, "[Restart service]"))
task_parts.append(f"""- name: Ensure {unit} is enabled and running
task_parts.append(
f"""- name: Ensure {unit} is enabled (preserve running state)
ansible.builtin.service:
name: "{{{{ unit_name }}}}"
enabled: true
- name: Start {unit} if it was active at harvest time
ansible.builtin.service:
name: "{{{{ unit_name }}}}"
state: started
""")
when: {var_prefix}_start | bool
"""
)
tasks = "\n".join(task_parts).rstrip() + "\n"
with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\ndependencies: []\n")
excluded = svc.get("excluded", [])
@ -315,7 +438,9 @@ Generated from `{unit}`.
{var_prefix}_packages:
- {pkg}
"""
with open(os.path.join(role_dir, "defaults", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "defaults", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(defaults)
handlers = """---
@ -323,10 +448,14 @@ Generated from `{unit}`.
ansible.builtin.systemd:
daemon_reload: true
"""
with open(os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(handlers)
systemd_files = [mf for mf in managed_files if mf["path"].startswith("/etc/systemd/system/")]
systemd_files = [
mf for mf in managed_files if mf["path"].startswith("/etc/systemd/system/")
]
other_files = [mf for mf in managed_files if mf not in systemd_files]
def copy_task(mf: Dict[str, Any], notify: str | None) -> str:
@ -341,29 +470,37 @@ Generated from `{unit}`.
{notify_line}"""
task_parts: List[str] = []
task_parts.append(f"""---
task_parts.append(
f"""---
- name: Install manual package {pkg}
ansible.builtin.apt:
name: "{{{{ {var_prefix}_packages }}}}"
state: present
update_cache: true
""")
"""
)
if systemd_files:
for mf in systemd_files:
task_parts.append(copy_task(mf, "[systemd daemon-reload]"))
task_parts.append("""- name: Reload systemd to pick up unit changes
task_parts.append(
"""- name: Reload systemd to pick up unit changes
ansible.builtin.meta: flush_handlers
""")
"""
)
for mf in other_files:
task_parts.append(copy_task(mf, None))
tasks = "\n".join(task_parts).rstrip() + "\n"
with open(os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8") as f:
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\ndependencies: []\n")
excluded = pr.get("excluded", [])
@ -389,4 +526,7 @@ Generated for manual package `{pkg}`.
manifested_pkg_roles.append(role)
# Playbooks
_write_playbook(os.path.join(out_dir, "playbook.yml"), manifested_users_roles + manifested_pkg_roles + manifested_service_roles)
_write_playbook(
os.path.join(out_dir, "playbook.yml"),
manifested_users_roles + manifested_etc_custom_roles + manifested_pkg_roles + manifested_service_roles,
)
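
One note on the users role above: the gecos value ends up inside a double-quoted YAML scalar, so embedded quotes and backslashes need escaping. A small helper that makes that intent explicit (a sketch of what the "quote to avoid YAML surprises" comment is after, not code from this commit):

def yaml_dq(value: str) -> str:
    # Escape backslashes first, then double quotes, for a double-quoted YAML scalar.
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'

# e.g. yaml_dq('Jo "Admin" Bloggs') -> "Jo \"Admin\" Bloggs"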

View file

@ -8,6 +8,15 @@ from typing import Optional
DEFAULT_DENY_GLOBS = [
# Common backup copies created by passwd tools (can contain sensitive data)
"/etc/passwd-",
"/etc/group-",
"/etc/shadow-",
"/etc/gshadow-",
"/etc/subuid-",
"/etc/subgid-",
"/etc/*shadow-",
"/etc/*gshadow-",
"/etc/ssl/private/*",
"/etc/ssh/ssh_host_*",
"/etc/shadow",
@ -17,9 +26,9 @@ DEFAULT_DENY_GLOBS = [
]
SENSITIVE_CONTENT_PATTERNS = [
re.compile(br"-----BEGIN (RSA |EC |OPENSSH |)PRIVATE KEY-----"),
re.compile(br"(?i)\bpassword\s*="),
re.compile(br"(?i)\b(pass|passwd|token|secret|api[_-]?key)\b"),
re.compile(rb"-----BEGIN (RSA |EC |OPENSSH |)PRIVATE KEY-----"),
re.compile(rb"(?i)\bpassword\s*="),
re.compile(rb"(?i)\b(pass|passwd|token|secret|api[_-]?key)\b"),
]
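
SecretPolicy.deny_reason(path) is called throughout harvest before anything is copied; its body is not in this diff, but a policy of this shape typically combines the glob deny-list with a bounded content scan, roughly as follows (an assumed sketch, not the actual implementation):

import fnmatch
from typing import Optional

def deny_reason_sketch(path: str) -> Optional[str]:
    for pattern in DEFAULT_DENY_GLOBS:
        if fnmatch.fnmatch(path, pattern):
            return f"matches deny glob {pattern}"
    try:
        with open(path, "rb") as f:
            head = f.read(64 * 1024)  # assumed cap: scan only the first 64 KiB
    except OSError:
        return None  # let the caller record the file as unreadable
    for rx in SENSITIVE_CONTENT_PATTERNS:
        if rx.search(head):
            return "sensitive content pattern"
    return None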

View file

@ -1,7 +1,7 @@
from __future__ import annotations
import re
import subprocess
import subprocess # nosec
from dataclasses import dataclass
from typing import List, Optional
@ -12,7 +12,11 @@ class UnitInfo:
fragment_path: Optional[str]
dropin_paths: List[str]
env_files: List[str]
exec_paths: List[str] # binaries from ExecStart "path=" parts
exec_paths: List[str]
active_state: Optional[str]
sub_state: Optional[str]
unit_file_state: Optional[str]
condition_result: Optional[str]
class UnitQueryError(RuntimeError):
@ -23,14 +27,22 @@ class UnitQueryError(RuntimeError):
def _run(cmd: list[str]) -> str:
p = subprocess.run(cmd, check=False, text=True, capture_output=True)
p = subprocess.run(cmd, check=False, text=True, capture_output=True) # nosec
if p.returncode != 0:
raise RuntimeError(f"Command failed: {cmd}\n{p.stderr}")
return p.stdout
def list_enabled_services() -> List[str]:
out = _run(["systemctl", "list-unit-files", "--type=service", "--state=enabled", "--no-legend"])
out = _run(
[
"systemctl",
"list-unit-files",
"--type=service",
"--state=enabled",
"--no-legend",
]
)
units: List[str] = []
for line in out.splitlines():
parts = line.split()
@ -39,7 +51,7 @@ def list_enabled_services() -> List[str]:
unit = parts[0].strip()
if not unit.endswith(".service"):
continue
# Skip template units like "getty@.service" which are enabled but not valid for systemctl show
# Skip template units like "getty@.service"
if unit.endswith("@.service") or "@.service" in unit:
continue
units.append(unit)
@ -49,13 +61,27 @@ def list_enabled_services() -> List[str]:
def get_unit_info(unit: str) -> UnitInfo:
p = subprocess.run(
[
"systemctl", "show", unit,
"-p", "FragmentPath",
"-p", "DropInPaths",
"-p", "EnvironmentFiles",
"-p", "ExecStart",
"systemctl",
"show",
unit,
"-p",
"FragmentPath",
"-p",
"DropInPaths",
"-p",
"EnvironmentFiles",
"-p",
"ExecStart",
"-p",
"ActiveState",
"-p",
"SubState",
"-p",
"UnitFileState",
"-p",
"ConditionResult",
"--no-page",
],
], # nosec
check=False,
text=True,
capture_output=True,
@ -70,7 +96,6 @@ def get_unit_info(unit: str) -> UnitInfo:
kv[k] = v.strip()
fragment = kv.get("FragmentPath") or None
dropins = [pp for pp in (kv.get("DropInPaths", "") or "").split() if pp]
env_files: List[str] = []
@ -87,4 +112,8 @@ def get_unit_info(unit: str) -> UnitInfo:
dropin_paths=sorted(set(dropins)),
env_files=sorted(set(env_files)),
exec_paths=sorted(set(exec_paths)),
active_state=kv.get("ActiveState") or None,
sub_state=kv.get("SubState") or None,
unit_file_state=kv.get("UnitFileState") or None,
condition_result=kv.get("ConditionResult") or None,
)
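
The kv dict used above is built from systemctl show's key=value output; a minimal sketch of that parsing and of how the new state fields fall out of it (the property names match the -p flags above, the sample values are illustrative):

sample = "ActiveState=active\nSubState=running\nUnitFileState=enabled\nConditionResult=yes"

kv: dict[str, str] = {}
for line in sample.splitlines():
    if "=" in line:
        k, v = line.split("=", 1)  # split on the first '=' only
        kv[k] = v.strip()

active_state = kv.get("ActiveState") or None      # -> "active"
unit_file_state = kv.get("UnitFileState") or None  # -> "enabled"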