diff --git a/CHANGELOG.md b/CHANGELOG.md index 500e7c8..a6b840d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,5 @@ # 0.7.0 - * Add support for detecting flatpaks and snaps * Add --merge-simple-packages to reduce the number of roles, for packages that have no config files or services to maintain. # 0.6.0 diff --git a/enroll/accounts.py b/enroll/accounts.py index b4f774b..cf2fcd3 100644 --- a/enroll/accounts.py +++ b/enroll/accounts.py @@ -1,48 +1,8 @@ from __future__ import annotations -import configparser import os -import re -import shutil -import subprocess # nosec -from dataclasses import dataclass, field -from typing import Dict, List, Optional, Set, Tuple - - -@dataclass -class FlatpakInstall: - name: str - method: str - remote: Optional[str] = None - branch: Optional[str] = None - arch: Optional[str] = None - kind: Optional[str] = None - ref: Optional[str] = None - user: Optional[str] = None - home: Optional[str] = None - source: str = "filesystem" - - -@dataclass -class FlatpakRemote: - name: str - method: str - url: str - user: Optional[str] = None - home: Optional[str] = None - source: str = "filesystem" - - -@dataclass -class SnapInstall: - name: str - channel: Optional[str] = None - revision: Optional[int] = None - classic: bool = False - devmode: bool = False - dangerous: bool = False - notes: List[str] = field(default_factory=list) - source: str = "snap-list" +from dataclasses import dataclass +from typing import Dict, List, Set, Tuple @dataclass @@ -56,7 +16,6 @@ class UserRecord: primary_group: str supplementary_groups: List[str] ssh_files: List[str] - flatpaks: List[FlatpakInstall] = field(default_factory=list) def parse_login_defs(path: str = "/etc/login.defs") -> Dict[str, int]: @@ -156,612 +115,6 @@ def find_user_ssh_files(home: str) -> List[str]: return sorted(set(out)) -def _read_first_existing_text(paths: List[str]) -> Optional[str]: - for path in paths: - try: - with open(path, "r", encoding="utf-8", errors="replace") as f: - value = f.read().strip() - if value: - return value - except OSError: - continue - return None - - -def _parse_flatpak_ref( - ref: str, -) -> Tuple[Optional[str], str, Optional[str], Optional[str]]: - """Return (kind, name, arch, branch) for a Flatpak ref. - - refs look like app/org.example.App/x86_64/stable or - runtime/org.example.Platform/x86_64/23.08. If the value is already just an - application/runtime ID, keep it as the name and leave the other fields empty. - """ - parts = [p for p in (ref or "").strip().split("/") if p] - if len(parts) >= 4 and parts[0] in {"app", "runtime"}: - return parts[0], parts[1], parts[2], parts[3] - return None, (ref or "").strip(), None, None - - -def _parse_plain_flatpak_list_output( - output: str, - *, - method: str, - user: Optional[str] = None, - home: Optional[str] = None, -) -> List[FlatpakInstall]: - """Parse default `flatpak list` table output. - - Example: - Name Application ID Version Branch Installation - OnionShare org.onionshare.OnionShare 2.6.4 stable system - """ - out: List[FlatpakInstall] = [] - seen: Set[ - Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]] - ] = set() - id_re = re.compile(r"\b(?:[A-Za-z0-9_-]+\.)+[A-Za-z0-9_-]+\b") - for line in output.splitlines(): - line = line.rstrip() - if not line.strip(): - continue - if "Application ID" in line and "Installation" in line: - continue - match = id_re.search(line) - if not match: - continue - name = match.group(0) - tail = line[match.end() :].split() - installation = tail[-1] if tail else "" - if installation in {"system", "user"} and installation != method: - continue - branch = None - if len(tail) >= 2 and tail[-1] in {"system", "user"}: - branch = tail[-2] - elif tail: - branch = tail[-1] - - key = (name, None, branch, None, None) - if key in seen: - continue - seen.add(key) - out.append( - FlatpakInstall( - name=name, - method=method, - remote=None, - branch=branch, - arch=None, - kind=None, - ref=None, - user=user, - home=home, - source="flatpak-list", - ) - ) - return sorted(out, key=lambda f: (f.name, f.branch or "")) - - -def _parse_flatpak_list_output( - output: str, - *, - method: str, - columns: Optional[Tuple[str, ...]] = None, - user: Optional[str] = None, - home: Optional[str] = None, -) -> List[FlatpakInstall]: - """Parse Flatpak list output. - - If columns is None, parse the default table. Otherwise columns names must - match the order passed to `flatpak list --columns=...`. - """ - if columns is None: - return _parse_plain_flatpak_list_output( - output, method=method, user=user, home=home - ) - - out: List[FlatpakInstall] = [] - seen: Set[ - Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]] - ] = set() - for line in output.splitlines(): - line = line.strip() - if not line: - continue - lower = line.lower() - if lower.startswith("ref") or lower.startswith("application id"): - continue - - parts = line.split("\t") - if len(parts) < len(columns): - parts = line.split() - if not parts: - continue - - fields = { - name: parts[idx].strip() - for idx, name in enumerate(columns) - if idx < len(parts) - } - ref = fields.get("ref") or fields.get("application") or "" - kind, name, ref_arch, ref_branch = _parse_flatpak_ref(ref) - if not name: - continue - - remote = fields.get("origin") or None - branch = fields.get("branch") or ref_branch - arch = fields.get("arch") or ref_arch - if remote in {"", "-"}: - remote = None - if branch in {"", "-"}: - branch = None - if arch in {"", "-"}: - arch = None - - key = (name, remote, branch, arch, kind) - if key in seen: - continue - seen.add(key) - out.append( - FlatpakInstall( - name=name, - method=method, - remote=remote, - branch=branch, - arch=arch, - kind=kind, - ref=ref if "/" in ref else None, - user=user, - home=home, - source="flatpak-list", - ) - ) - return sorted( - out, - key=lambda f: ( - f.kind or "", - f.name, - f.remote or "", - f.branch or "", - f.arch or "", - ), - ) - - -_KNOWN_FLATPAK_LIST_COLUMNS = { - "name", - "description", - "application", - "version", - "branch", - "arch", - "origin", - "installation", - "ref", - "active", - "latest", - "size", - "options", -} - - -def _parse_flatpak_columns_help(output: str) -> Set[str]: - """Parse `flatpak list --columns=help` output into supported fields.""" - supported: Set[str] = set() - for line in output.splitlines(): - # Help output varies a bit between Flatpak versions. Treat any known - # token as a supported field, whether it appears alone or in a - # description table. - for token in re.findall(r"[A-Za-z_][A-Za-z0-9_-]*", line.lower()): - if token in _KNOWN_FLATPAK_LIST_COLUMNS: - supported.add(token) - return supported - - -def _run_flatpak_columns_help() -> Optional[Set[str]]: - if shutil.which("flatpak") is None: - return None - try: - proc = subprocess.run( # nosec - ["flatpak", "list", "--columns=help"], - shell=False, - check=False, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - timeout=10, - ) - except Exception: - return None - if proc.returncode != 0: - return None - supported = _parse_flatpak_columns_help(proc.stdout or "") - return supported or None - - -def _flatpak_list_attempts( - scope: str, supported: Optional[Set[str]] -) -> List[Tuple[List[str], Optional[Tuple[str, ...]]]]: - def supported_columns(*wanted: str) -> Optional[Tuple[str, ...]]: - if supported is not None and not set(wanted).issubset(supported): - return None - return tuple(wanted) - - column_sets: List[Tuple[str, ...]] = [] - for wanted in ( - ("application", "origin", "branch", "arch"), - ("application", "branch", "arch"), - ("application", "branch"), - ("application",), - ("ref", "origin", "branch", "arch"), - ("ref", "branch", "arch"), - ("ref", "branch"), - ("ref",), - ): - cols = supported_columns(*wanted) - if cols is not None and cols not in column_sets: - column_sets.append(cols) - - attempts: List[Tuple[List[str], Optional[Tuple[str, ...]]]] = [ - ( - ["flatpak", "list", scope, "--columns=" + ",".join(cols)], - cols, - ) - for cols in column_sets - ] - attempts.append((["flatpak", "list", scope], None)) - return attempts - - -def _run_flatpak_list(method: str) -> Optional[Tuple[str, Optional[Tuple[str, ...]]]]: - if shutil.which("flatpak") is None: - return None - - scope = "--system" if method == "system" else "--user" - supported = _run_flatpak_columns_help() - for args, columns in _flatpak_list_attempts(scope, supported): - try: - proc = subprocess.run( # nosec - args, - shell=False, - check=False, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - timeout=10, - ) - except Exception: # nosec B112 - continue - if proc.returncode == 0: - return proc.stdout or "", columns - return None - - -def _flatpak_remote_from_ref( - flatpak_root: str, app_id: str, arch: str, branch: str, remote_names: List[str] -) -> Optional[str]: - for remote_name in remote_names: - ref = os.path.join( - flatpak_root, - "repo", - "refs", - "remotes", - remote_name, - "app", - app_id, - arch, - branch, - ) - if os.path.exists(ref): - return remote_name - return None - - -def _parse_flatpak_deploy_origin(branch_dir: str) -> Optional[str]: - active_dir = os.path.join(branch_dir, "active") - candidates = [ - os.path.join(active_dir, "origin"), - os.path.join(active_dir, "metadata"), - ] - - origin = _read_first_existing_text([candidates[0]]) - if origin: - return origin - - metadata = candidates[1] - if os.path.isfile(metadata): - parser = configparser.ConfigParser(interpolation=None) - try: - parser.read(metadata, encoding="utf-8") - except Exception: - return None - for section in ("Application", "Runtime"): - if parser.has_option(section, "origin"): - value = parser.get(section, "origin", fallback="").strip() - if value: - return value - return None - - -def _find_flatpaks_in_root( - flatpak_root: str, - *, - method: str, - user: Optional[str] = None, - home: Optional[str] = None, -) -> List[FlatpakInstall]: - apps_dir = os.path.join(flatpak_root, "app") - if not os.path.isdir(apps_dir): - return [] - - remote_names = [ - r.name - for r in find_flatpak_remotes(flatpak_root, method=method, user=user, home=home) - ] - out: List[FlatpakInstall] = [] - - try: - app_ids = sorted(os.listdir(apps_dir)) - except OSError: - return [] - - seen: Set[Tuple[str, Optional[str], Optional[str], Optional[str]]] = set() - for app_id in app_ids: - app_path = os.path.join(apps_dir, app_id) - if not os.path.isdir(app_path): - continue - try: - arches = sorted(os.listdir(app_path)) - except OSError: - continue - for arch in arches: - arch_path = os.path.join(app_path, arch) - if not os.path.isdir(arch_path): - continue - try: - branches = sorted(os.listdir(arch_path)) - except OSError: - continue - for branch in branches: - branch_path = os.path.join(arch_path, branch) - if not os.path.isdir(branch_path): - continue - active_dir = os.path.join(branch_path, "active") - if not os.path.exists(active_dir): - continue - remote = _parse_flatpak_deploy_origin(branch_path) - if not remote: - remote = _flatpak_remote_from_ref( - flatpak_root, app_id, arch, branch, remote_names - ) - key = (app_id, remote, branch, arch) - if key in seen: - continue - seen.add(key) - out.append( - FlatpakInstall( - name=app_id, - method=method, - remote=remote, - branch=branch or None, - arch=arch or None, - kind="app", - ref=f"app/{app_id}/{arch}/{branch}", - user=user, - home=home, - ) - ) - - return sorted( - out, key=lambda f: (f.name, f.remote or "", f.branch or "", f.arch or "") - ) - - -def find_flatpak_remotes( - flatpak_root: str, - *, - method: str, - user: Optional[str] = None, - home: Optional[str] = None, -) -> List[FlatpakRemote]: - """Return configured Flatpak remotes for a Flatpak installation root. - - Flatpak stores remotes in the OSTree repo config. This gives us the remote - names and repository URLs. It does not reliably preserve the original - .flatpakref/.flatpakrepo URL that was used during installation. - """ - config_path = os.path.join(flatpak_root, "repo", "config") - if not os.path.isfile(config_path): - return [] - - parser = configparser.ConfigParser(interpolation=None, strict=False) - try: - parser.read(config_path, encoding="utf-8") - except Exception: - return [] - - out: List[FlatpakRemote] = [] - for section in parser.sections(): - match = re.fullmatch(r'remote\s+"(.+)"', section) - if not match: - continue - name = match.group(1).strip() - url = parser.get(section, "url", fallback="").strip() - if not name or not url: - continue - out.append( - FlatpakRemote( - name=name, - method=method, - url=url, - user=user, - home=home, - ) - ) - - return sorted(out, key=lambda r: (r.method, r.user or "", r.name)) - - -def find_user_flatpaks(home: str, user: Optional[str] = None) -> List[FlatpakInstall]: - """Return per-user Flatpak applications installed under a home directory.""" - flatpak_root = os.path.join(home, ".local", "share", "flatpak") - return _find_flatpaks_in_root(flatpak_root, method="user", user=user, home=home) - - -def find_user_flatpak_remotes( - home: str, user: Optional[str] = None -) -> List[FlatpakRemote]: - flatpak_root = os.path.join(home, ".local", "share", "flatpak") - return find_flatpak_remotes(flatpak_root, method="user", user=user, home=home) - - -def find_system_flatpaks() -> List[FlatpakInstall]: - """Return Flatpak refs installed system-wide. - - Prefer `flatpak list --system` because it is Flatpak's own view of - installed refs and includes layouts the filesystem scanner might miss. - Fall back to the on-disk app deployment tree when the command is - unavailable or produces unparsable output. - """ - listing = _run_flatpak_list("system") - if listing is not None: - output, columns = listing - parsed = _parse_flatpak_list_output(output, method="system", columns=columns) - if parsed or not output.strip(): - return parsed - return _find_flatpaks_in_root("/var/lib/flatpak", method="system") - - -def find_system_flatpak_remotes() -> List[FlatpakRemote]: - return find_flatpak_remotes("/var/lib/flatpak", method="system") - - -def _parse_snap_notes(notes: str) -> List[str]: - if not notes or notes == "-": - return [] - cleaned = notes.replace(",", " ").replace(";", " ") - return sorted( - {n.strip().lower() for n in cleaned.split() if n.strip() and n.strip() != "-"} - ) - - -def _parse_snap_list_output(output: str) -> List[SnapInstall]: - out: List[SnapInstall] = [] - for idx, line in enumerate(output.splitlines()): - line = line.strip() - if not line: - continue - if idx == 0 and line.lower().startswith("name"): - continue - parts = line.split(maxsplit=5) - if len(parts) < 5: - continue - name = parts[0] - revision: Optional[int] - try: - revision = int(parts[2]) - except ValueError: - revision = None - tracking = parts[3] - channel = None if tracking in {"-", ""} else tracking - notes = _parse_snap_notes(parts[5] if len(parts) > 5 else "") - out.append( - SnapInstall( - name=name, - channel=channel, - revision=revision, - classic="classic" in notes, - devmode="devmode" in notes, - dangerous="dangerous" in notes, - notes=notes, - source="snap-list", - ) - ) - return sorted(out, key=lambda s: s.name) - - -def _run_snap_list() -> Optional[str]: - if shutil.which("snap") is None: - return None - try: - proc = subprocess.run( # nosec - ["snap", "list"], - shell=False, - check=False, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - timeout=10, - ) - except Exception: - return None - if proc.returncode != 0: - return None - return proc.stdout or "" - - -def _find_system_snaps_from_filesystem() -> List[SnapInstall]: - snapd_snaps = "/var/lib/snapd/snaps" - if not os.path.isdir(snapd_snaps): - return [] - - current_revisions: Dict[str, int] = {} - snap_mounts = "/snap" - if os.path.isdir(snap_mounts): - try: - mount_names = os.listdir(snap_mounts) - except OSError: - mount_names = [] - for name in mount_names: - current = os.path.join(snap_mounts, name, "current") - try: - target = os.readlink(current) - except OSError: - continue - try: - current_revisions[name] = int(os.path.basename(target.rstrip("/"))) - except ValueError: - continue - - candidates: Dict[str, List[int]] = {} - try: - entries = os.listdir(snapd_snaps) - except OSError: - return [] - - for entry in entries: - if not entry.endswith(".snap") or "_" not in entry: - continue - name, rev_text = entry[:-5].rsplit("_", 1) - try: - revision = int(rev_text) - except ValueError: - continue - candidates.setdefault(name, []).append(revision) - - out: List[SnapInstall] = [] - for name, revisions in candidates.items(): - revision = current_revisions.get(name) - if revision is None: - revision = max(revisions) - out.append(SnapInstall(name=name, revision=revision, source="filesystem")) - return sorted(out, key=lambda s: s.name) - - -def find_system_snaps() -> List[SnapInstall]: - """Return system-wide snap packages. - - Prefer `snap list` because it exposes channel tracking and confinement notes. - Fall back to snapd's on-disk snap filenames when the command is unavailable. - """ - output = _run_snap_list() - if output is not None: - parsed = _parse_snap_list_output(output) - if parsed: - return parsed - return _find_system_snaps_from_filesystem() - - def collect_non_system_users() -> List[UserRecord]: defs = parse_login_defs() uid_min = defs.get("UID_MIN", 1000) @@ -786,10 +139,6 @@ def collect_non_system_users() -> List[UserRecord]: ssh_files = find_user_ssh_files(home) if home and home.startswith("/") else [] - flatpaks: List[FlatpakInstall] = [] - if home and home.startswith("/"): - flatpaks = find_user_flatpaks(home, user=name) - users.append( UserRecord( name=name, @@ -801,7 +150,6 @@ def collect_non_system_users() -> List[UserRecord]: primary_group=primary_group, supplementary_groups=supp, ssh_files=ssh_files, - flatpaks=flatpaks, ) ) diff --git a/enroll/harvest.py b/enroll/harvest.py index 01d3ff7..a9d5526 100644 --- a/enroll/harvest.py +++ b/enroll/harvest.py @@ -10,9 +10,8 @@ import stat import subprocess # nosec import time from dataclasses import dataclass, asdict, field -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Dict, List, Optional, Set, Tuple -from .role_names import avoid_reserved_role_name from .systemd import ( list_enabled_services, list_enabled_timers, @@ -102,23 +101,6 @@ class UsersSnapshot: managed_files: List[ManagedFile] = field(default_factory=list) excluded: List[ExcludedFile] = field(default_factory=list) notes: List[str] = field(default_factory=list) - user_flatpaks: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict) - user_flatpak_remotes: List[Dict[str, Any]] = field(default_factory=list) - - -@dataclass -class FlatpakSnapshot: - role_name: str - system_flatpaks: List[Dict[str, Any]] = field(default_factory=list) - remotes: List[Dict[str, Any]] = field(default_factory=list) - notes: List[str] = field(default_factory=list) - - -@dataclass -class SnapSnapshot: - role_name: str - system_snaps: List[Dict[str, Any]] = field(default_factory=list) - notes: List[str] = field(default_factory=list) @dataclass @@ -382,11 +364,11 @@ def _role_id(raw: str) -> str: def _role_name_from_unit(unit: str) -> str: base = _role_id(unit.removesuffix(".service")) - return avoid_reserved_role_name(_safe_name(base), prefix="service") + return _safe_name(base) def _role_name_from_pkg(pkg: str) -> str: - return avoid_reserved_role_name(_safe_name(pkg), prefix="package") + return _safe_name(pkg) def _copy_into_bundle( @@ -1826,30 +1808,6 @@ def harvest( user_records = [] users_notes.append(f"Failed to enumerate users: {e!r}") - # Detect system-wide Flatpaks/Snaps and configured Flatpak remotes. - from .accounts import ( - find_system_flatpak_remotes, - find_system_flatpaks, - find_system_snaps, - find_user_flatpak_remotes, - ) - - system_flatpaks = [asdict(f) for f in find_system_flatpaks()] - system_snaps = [asdict(s) for s in find_system_snaps()] - system_flatpak_remotes = [asdict(r) for r in find_system_flatpak_remotes()] - flatpak_notes: List[str] = [] - snap_notes: List[str] = [] - if system_flatpaks: - flatpak_notes.append( - "System-wide flatpaks detected: " - + ", ".join(str(f.get("name")) for f in system_flatpaks) - ) - if system_snaps: - snap_notes.append( - "System-wide snaps detected: " - + ", ".join(str(s.get("name")) for s in system_snaps) - ) - users_role_name = "users" users_role_seen = seen_by_role.setdefault(users_role_name, set()) @@ -1865,9 +1823,6 @@ def harvest( (".bash_aliases", "user_shell_aliases"), ] - user_flatpaks_map: Dict[str, List[Dict[str, Any]]] = {} - user_flatpak_remotes: List[Dict[str, Any]] = [] - for u in user_records: users_list.append( { @@ -1944,36 +1899,12 @@ def harvest( seen_global=captured_global, ) - # Collect per-user Flatpak applications and remotes. Snap packages are - # system-wide; ~/snap/* is user data, not an install source. - if u.flatpaks: - user_flatpaks_map[u.name] = [asdict(fp) for fp in u.flatpaks] - if home and home.startswith("/"): - user_flatpak_remotes.extend( - asdict(r) for r in find_user_flatpak_remotes(home, user=u.name) - ) - users_snapshot = UsersSnapshot( role_name=users_role_name, users=users_list, managed_files=users_managed, excluded=users_excluded, notes=users_notes, - user_flatpaks=user_flatpaks_map, - user_flatpak_remotes=user_flatpak_remotes, - ) - - flatpak_snapshot = FlatpakSnapshot( - role_name="flatpak", - system_flatpaks=system_flatpaks, - remotes=system_flatpak_remotes, - notes=flatpak_notes, - ) - - snap_snapshot = SnapSnapshot( - role_name="snap", - system_snaps=system_snaps, - notes=snap_notes, ) # ------------------------- @@ -2581,8 +2512,6 @@ def harvest( }, "roles": { "users": asdict(users_snapshot), - "flatpak": asdict(flatpak_snapshot), - "snap": asdict(snap_snapshot), "services": [asdict(s) for s in service_snaps], "packages": [asdict(p) for p in pkg_snaps], "apt_config": asdict(apt_config_snapshot), diff --git a/enroll/manifest.py b/enroll/manifest.py index 219ea64..97f8caf 100644 --- a/enroll/manifest.py +++ b/enroll/manifest.py @@ -10,8 +10,6 @@ import tempfile from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple -from .role_names import avoid_reserved_role_name - from .jinjaturtle import ( can_jinjify_path, find_jinjaturtle_cmd, @@ -231,72 +229,6 @@ def _ensure_ansible_cfg(cfg_path: str) -> None: return -def _ensure_requirements_yaml(req_path: str) -> None: - if not os.path.exists(req_path): - with open(req_path, "w", encoding="utf-8") as f: - f.write("---\n") - f.write("collections:\n") - f.write(" - name: community.general\n") - f.write(' version: ">=13.0.0"\n') - return - - -def _normalise_flatpak_item( - item: Any, - *, - method: str, - user: Optional[str] = None, - home: Optional[str] = None, -) -> Dict[str, Any]: - if isinstance(item, str): - out: Dict[str, Any] = {"name": item, "method": method} - elif isinstance(item, dict): - out = dict(item) - out.setdefault("method", method) - else: - out = {"name": str(item), "method": method} - if user: - out.setdefault("user", user) - if home: - out.setdefault("home", home) - return out - - -def _normalise_flatpak_remote(item: Any) -> Dict[str, Any]: - if isinstance(item, dict): - out = dict(item) - else: - out = {"name": str(item)} - out.setdefault("method", "system") - return out - - -def _normalise_snap_item(item: Any) -> Dict[str, Any]: - if isinstance(item, str): - out: Dict[str, Any] = {"name": item} - elif isinstance(item, dict): - out = dict(item) - else: - out = {"name": str(item)} - - notes = out.get("notes") or [] - if isinstance(notes, str): - notes = [notes] - notes_l = {str(n).lower() for n in notes} - out["classic"] = bool(out.get("classic") or "classic" in notes_l) - out["devmode"] = bool(out.get("devmode") or "devmode" in notes_l) - out["dangerous"] = bool(out.get("dangerous") or "dangerous" in notes_l) - - # The Ansible snap module's revision parameter pins/holds the snap. For - # ordinary store snaps that track a channel, preserve the channel instead - # of freezing every harvested host at today's revision. - if out.get("revision") is not None and not out.get("channel"): - out["install_revision"] = True - else: - out["install_revision"] = False - return out - - def _ensure_inventory_host(inv_path: str, fqdn: str) -> None: os.makedirs(os.path.dirname(inv_path), exist_ok=True) if not os.path.exists(inv_path): @@ -904,8 +836,6 @@ def _manifest_from_bundle_dir( services: List[Dict[str, Any]] = roles.get("services", []) package_roles: List[Dict[str, Any]] = roles.get("packages", []) users_snapshot: Dict[str, Any] = roles.get("users", {}) - flatpak_snapshot: Dict[str, Any] = roles.get("flatpak", {}) - snap_snapshot: Dict[str, Any] = roles.get("snap", {}) apt_config_snapshot: Dict[str, Any] = roles.get("apt_config", {}) dnf_config_snapshot: Dict[str, Any] = roles.get("dnf_config", {}) firewall_runtime_snapshot: Dict[str, Any] = roles.get("firewall_runtime", {}) @@ -941,11 +871,8 @@ def _manifest_from_bundle_dir( os.path.join(out_dir, "inventory", "hosts.ini"), fqdn or "" ) _ensure_ansible_cfg(os.path.join(out_dir, "ansible.cfg")) - _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) manifested_users_roles: List[str] = [] - manifested_flatpak_roles: List[str] = [] - manifested_snap_roles: List[str] = [] manifested_apt_config_roles: List[str] = [] manifested_dnf_config_roles: List[str] = [] manifested_firewall_runtime_roles: List[str] = [] @@ -958,7 +885,7 @@ def _manifest_from_bundle_dir( # ------------------------- # Users role (non-system users) # ------------------------- - if users_snapshot: + if users_snapshot and users_snapshot.get("users"): role = users_snapshot.get("role_name", "users") role_dir = os.path.join(roles_root, role) _write_role_scaffold(role_dir) @@ -1043,33 +970,6 @@ def _manifest_from_bundle_dir( } ) - # Build Flatpak and Snap lists. Flatpak can be installed system-wide or - # per-user. Snap packages are system-wide; per-user ~/snap/* directories - # are runtime/user data and are not treated as install sources. - users_flatpaks: List[Dict[str, Any]] = [] - user_flatpak_map = users_snapshot.get("user_flatpaks", {}) or {} - home_by_user = { - str(u.get("name")): str(u.get("home") or "") for u in users_data - } - for uname, flatpaks in user_flatpak_map.items(): - for fp in flatpaks or []: - users_flatpaks.append( - _normalise_flatpak_item( - fp, - method="user", - user=str(uname), - home=home_by_user.get(str(uname)) or None, - ) - ) - - flatpak_remotes = [ - _normalise_flatpak_remote(r) - for r in (users_snapshot.get("user_flatpak_remotes", []) or []) - ] - users_needs_community = bool(flatpak_remotes or users_flatpaks) - if users_needs_community: - _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) - # Variables are host-specific in site mode; in non-site mode they live in role defaults. if site_mode: _write_role_defaults( @@ -1078,8 +978,6 @@ def _manifest_from_bundle_dir( "users_groups": [], "users_users": [], "users_ssh_files": [], - "users_flatpaks": [], - "users_flatpak_remotes": [], }, ) _write_hostvars( @@ -1090,8 +988,6 @@ def _manifest_from_bundle_dir( "users_groups": group_names, "users_users": users_data, "users_ssh_files": ssh_files, - "users_flatpaks": users_flatpaks, - "users_flatpak_remotes": flatpak_remotes, }, ) else: @@ -1101,23 +997,13 @@ def _manifest_from_bundle_dir( "users_groups": group_names, "users_users": users_data, "users_ssh_files": ssh_files, - "users_flatpaks": users_flatpaks, - "users_flatpak_remotes": flatpak_remotes, }, ) with open( os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" ) as f: - if users_needs_community: - f.write( - "---\n" - "dependencies: []\n" - "collections:\n" - " - community.general\n" - ) - else: - f.write("---\ndependencies: []\n") + f.write("---\ndependencies: []\n") # tasks (data-driven) users_tasks = """--- @@ -1170,52 +1056,6 @@ def _manifest_from_bundle_dir( group: "{{ item.group }}" mode: "{{ item.mode }}" loop: "{{ users_ssh_files | default([]) }}" - -""" - - if flatpak_remotes or users_flatpaks: - users_tasks += """ -- name: Ensure user Flatpak remotes exist - ansible.builtin.command: - argv: - - flatpak - - remote-add - - --user - - --if-not-exists - - "{{ item.name }}" - - "{{ item.url }}" - loop: "{{ users_flatpak_remotes | default([]) | selectattr('method', 'equalto', 'user') | list }}" - when: - - item.name is defined - - item.url is defined - - item.url | length > 0 - - item.user is defined - become: true - become_user: "{{ item.user }}" - environment: - HOME: "{{ item.home | default('/home/' ~ item.user, true) }}" - XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}" - changed_when: false - -- name: Install user Flatpaks - community.general.flatpak: - name: - - "{{ item.name }}" - state: present - method: user - remote: "{{ item.remote | default(omit) }}" - from_url: "{{ item.from_url | default(omit) }}" - loop: "{{ users_flatpaks | default([]) }}" - when: - - item.name is defined - - item.name | length > 0 - - item.user is defined - become: true - become_user: "{{ item.user }}" - environment: - HOME: "{{ item.home | default('/home/' ~ item.user, true) }}" - XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}" - """ with open( @@ -1228,67 +1068,10 @@ def _manifest_from_bundle_dir( ) as f: f.write("---\n") - def _fmt_app_list(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - if not name: - continue - detail_parts = [] - for key in ("remote", "channel", "revision", "branch", "arch"): - value = item.get(key) - if value not in (None, "", []): - detail_parts.append(f"{key}={value}") - for key in ("classic", "devmode", "dangerous"): - if item.get(key): - detail_parts.append(key) - details = f" ({', '.join(detail_parts)})" if detail_parts else "" - lines.append(f"- {name}{details}") - return "\n".join(lines) or "- (none)" - - def _fmt_user_flatpaks(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - user = item.get("user") - if not name or not user: - continue - detail_parts = [] - for key in ("remote", "branch", "arch"): - value = item.get(key) - if value not in (None, "", []): - detail_parts.append(f"{key}={value}") - details = f" ({', '.join(detail_parts)})" if detail_parts else "" - lines.append(f"- {user}: {name}{details}") - return "\n".join(lines) or "- (none)" - - def _fmt_remotes(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - url = item.get("url") - method = item.get("method") or "system" - user = item.get("user") - if not name or not url: - continue - owner = f"user={user}" if user else "system" - lines.append(f"- {name} ({method}, {owner}): {url}") - return "\n".join(lines) or "- (none)" - readme = ( """# users -Generated non-system user accounts, SSH public material, and per-user Flatpak -applications/remotes. - -**Note:** User Flatpak tasks require the `community.general` Ansible collection. -Install it with: `ansible-galaxy collection install -r requirements.yml`. - -Flatpak `remote` is harvested from the installed deployment where detectable. -The original `.flatpakref` URL is generally not preserved by Flatpak after -installation, so `from_url` is only emitted if a future/hand-edited state file -contains it. - +Generated non-system user accounts and SSH public material. ## Users """ @@ -1306,14 +1089,6 @@ contains it. or "- (none)" ) + """\n -## Flatpak remotes -""" - + _fmt_remotes(flatpak_remotes) - + """\n -## User Flatpaks -""" - + _fmt_user_flatpaks(users_flatpaks) - + """\n ## Excluded """ + ( @@ -1331,274 +1106,6 @@ contains it. manifested_users_roles.append(role) - # ------------------------- - # Flatpak role (system-wide Flatpak remotes and applications) - # ------------------------- - raw_flatpak_apps = flatpak_snapshot.get("system_flatpaks", []) or [] - raw_flatpak_remotes = flatpak_snapshot.get("remotes", []) or [] - - if flatpak_snapshot: - role = flatpak_snapshot.get("role_name", "flatpak") - role_dir = os.path.join(roles_root, role) - _write_role_scaffold(role_dir) - _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) - - flatpak_system_flatpaks = [ - _normalise_flatpak_item(fp, method="system") for fp in raw_flatpak_apps - ] - flatpak_remotes = [_normalise_flatpak_remote(r) for r in raw_flatpak_remotes] - - vars_map = { - "flatpak_system_flatpaks": flatpak_system_flatpaks, - "flatpak_remotes": flatpak_remotes, - } - if site_mode: - _write_role_defaults( - role_dir, - {"flatpak_system_flatpaks": [], "flatpak_remotes": []}, - ) - _write_hostvars(out_dir, fqdn or "", role, vars_map) - else: - _write_role_defaults(role_dir, vars_map) - - with open( - os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write( - "---\n" "dependencies: []\n" "collections:\n" " - community.general\n" - ) - - tasks = """--- - -- name: Ensure system Flatpak remotes exist - ansible.builtin.command: - argv: - - flatpak - - remote-add - - --system - - --if-not-exists - - "{{ item.name }}" - - "{{ item.url }}" - loop: "{{ flatpak_remotes | default([]) }}" - when: - - item.name is defined - - item.url is defined - - item.url | length > 0 - become: true - changed_when: false - -- name: Install system-wide Flatpaks - community.general.flatpak: - name: - - "{{ item.name }}" - state: present - method: system - remote: "{{ item.remote | default(omit) }}" - from_url: "{{ item.from_url | default(omit) }}" - loop: "{{ flatpak_system_flatpaks | default([]) }}" - when: - - item.name is defined - - item.name | length > 0 - become: true -""" - with open( - os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(tasks) - - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\n") - - def _fmt_flatpak_apps(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - if not name: - continue - detail_parts = [] - for key in ("remote", "branch", "arch"): - value = item.get(key) - if value not in (None, "", []): - detail_parts.append(f"{key}={value}") - details = f" ({', '.join(detail_parts)})" if detail_parts else "" - lines.append(f"- {name}{details}") - return "\n".join(lines) or "- (none)" - - def _fmt_flatpak_remotes(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - url = item.get("url") - if not name or not url: - continue - lines.append(f"- {name}: {url}") - return "\n".join(lines) or "- (none)" - - notes = flatpak_snapshot.get("notes", []) or [] - readme = ( - """# flatpak - -Generated system-wide Flatpak remotes and applications. - -**Note:** This role requires the `community.general` Ansible collection. -Install it with: `ansible-galaxy collection install -r requirements.yml`. - -Flatpak `remote` is harvested from the installed deployment where detectable. -The original `.flatpakref` URL is generally not preserved by Flatpak after -installation, so `from_url` is only emitted if a future/hand-edited state file -contains it. - -## System Flatpak remotes -""" - + _fmt_flatpak_remotes(flatpak_remotes) - + """\n -## System-wide Flatpaks -""" - + _fmt_flatpak_apps(flatpak_system_flatpaks) - + """\n -## Notes -""" - + ("\n".join([f"- {n}" for n in notes]) or "- (none)") - + """\n""" - ) - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - manifested_flatpak_roles.append(role) - - # ------------------------- - # Snap role (system-wide snap packages) - # ------------------------- - raw_system_snaps = snap_snapshot.get("system_snaps", []) or [] - - if raw_system_snaps: - role = snap_snapshot.get("role_name", "snap") if snap_snapshot else "snap" - role_dir = os.path.join(roles_root, role) - _write_role_scaffold(role_dir) - _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) - - snap_system_snaps = [_normalise_snap_item(s) for s in raw_system_snaps] - - vars_map = {"snap_system_snaps": snap_system_snaps} - if site_mode: - _write_role_defaults(role_dir, {"snap_system_snaps": []}) - _write_hostvars(out_dir, fqdn or "", role, vars_map) - else: - _write_role_defaults(role_dir, vars_map) - - with open( - os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write( - "---\n" "dependencies: []\n" "collections:\n" " - community.general\n" - ) - - tasks = """--- - -- name: Install system-wide snaps with full detected attributes - community.general.snap: - name: - - "{{ item.name }}" - state: present - channel: "{{ item.channel | default(omit) if not (item.install_revision | default(false)) else omit }}" - revision: "{{ item.revision | default(omit) if (item.install_revision | default(false)) else omit }}" - classic: "{{ item.classic | default(false) }}" - devmode: "{{ item.devmode | default(false) }}" - dangerous: "{{ item.dangerous | default(false) }}" - loop: "{{ snap_system_snaps | default([]) }}" - when: - - item.name is defined - - item.name | length > 0 - become: true - register: _enroll_snap_full_results - ignore_errors: true - -- name: Install system-wide snaps with compatibility options - community.general.snap: - name: - - "{{ item.item.name }}" - state: present - channel: "{{ item.item.channel | default(omit) if not (item.item.install_revision | default(false)) else omit }}" - classic: "{{ item.item.classic | default(false) }}" - loop: "{{ (_enroll_snap_full_results | default({})).results | default([]) }}" - when: - - item.failed | default(false) - - item.item.name is defined - - item.item.name | length > 0 - become: true - register: _enroll_snap_compat_results - ignore_errors: true - -- name: Install system-wide snaps with minimal options - community.general.snap: - name: - - "{{ item.item.item.name }}" - state: present - loop: "{{ (_enroll_snap_compat_results | default({})).results | default([]) }}" - when: - - item.failed | default(false) - - item.item.item.name is defined - - item.item.item.name | length > 0 - become: true - ignore_errors: true -""" - with open( - os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write(tasks) - - with open( - os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" - ) as f: - f.write("---\n") - - def _fmt_snap_apps(items: List[Dict[str, Any]]) -> str: - lines = [] - for item in items: - name = item.get("name") - if not name: - continue - detail_parts = [] - for key in ("channel", "revision"): - value = item.get(key) - if value not in (None, "", []): - detail_parts.append(f"{key}={value}") - for key in ("classic", "devmode", "dangerous"): - if item.get(key): - detail_parts.append(key) - details = f" ({', '.join(detail_parts)})" if detail_parts else "" - lines.append(f"- {name}{details}") - return "\n".join(lines) or "- (none)" - - notes = snap_snapshot.get("notes", []) or [] - readme = ( - """# snap - -Generated system-wide snap packages. - -**Note:** This role requires the `community.general` Ansible collection. -Install it with: `ansible-galaxy collection install -r requirements.yml`. - -The first install task uses all harvested attributes. If the installed -`community.general.snap` module is too old for some parameters, the generated -role falls back to reduced then minimal install tasks on a best-effort basis. - -## System-wide snaps -""" - + _fmt_snap_apps(snap_system_snaps) - + """\n -## Notes -""" - + ("\n".join([f"- {n}" for n in notes]) or "- (none)") - + """\n""" - ) - with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: - f.write(readme) - - manifested_snap_roles.append(role) - # ------------------------- # apt_config role (APT sources, pinning, and keyrings) # ------------------------- @@ -2373,8 +1880,7 @@ User-requested extra file harvesting. # Service roles # ------------------------- for svc in services: - source_role = svc["role_name"] - role = avoid_reserved_role_name(source_role, prefix="service") + role = svc["role_name"] unit = svc["unit"] pkgs = svc.get("packages", []) or [] managed_files = svc.get("managed_files", []) or [] @@ -2393,7 +1899,7 @@ User-requested extra file harvesting. templated, jt_vars = _jinjify_managed_files( bundle_dir, - source_role, + role, role_dir, managed_files, jt_exe=jt_exe, @@ -2405,14 +1911,14 @@ User-requested extra file harvesting. if site_mode: _copy_artifacts( bundle_dir, - source_role, + role, _host_role_files_dir(out_dir, fqdn or "", role), exclude_rels=templated, ) else: _copy_artifacts( bundle_dir, - source_role, + role, os.path.join(role_dir, "files"), exclude_rels=templated, ) @@ -2646,8 +2152,7 @@ This role was created by merging simple packages using the `--merge-simple-packa # Process package roles (those with configuration files) for pr in package_roles: - source_role = pr["role_name"] - role = avoid_reserved_role_name(source_role, prefix="package") + role = pr["role_name"] pkg = pr.get("package") or "" managed_files = pr.get("managed_files", []) or [] managed_dirs = pr.get("managed_dirs", []) or [] @@ -2660,7 +2165,7 @@ This role was created by merging simple packages using the `--merge-simple-packa templated, jt_vars = _jinjify_managed_files( bundle_dir, - source_role, + role, role_dir, managed_files, jt_exe=jt_exe, @@ -2672,14 +2177,14 @@ This role was created by merging simple packages using the `--merge-simple-packa if site_mode: _copy_artifacts( bundle_dir, - source_role, + role, _host_role_files_dir(out_dir, fqdn or "", role), exclude_rels=templated, ) else: _copy_artifacts( bundle_dir, - source_role, + role, os.path.join(role_dir, "files"), exclude_rels=templated, ) @@ -2789,8 +2294,6 @@ Generated for package `{pkg}`. + manifested_etc_custom_roles + manifested_usr_local_custom_roles + manifested_extra_paths_roles - + manifested_flatpak_roles - + manifested_snap_roles + manifested_users_roles + tail_roles + manifested_firewall_runtime_roles diff --git a/enroll/role_names.py b/enroll/role_names.py deleted file mode 100644 index b3fa584..0000000 --- a/enroll/role_names.py +++ /dev/null @@ -1,28 +0,0 @@ -from __future__ import annotations - -RESERVED_SINGLETON_ROLE_NAMES = { - "users", - "flatpak", - "snap", - "apt_config", - "dnf_config", - "firewall_runtime", - "etc_custom", - "usr_local_custom", - "extra_paths", - "common_packages", -} - - -def avoid_reserved_role_name(role_name: str, *, prefix: str) -> str: - """Return a role name that cannot collide with singleton roles. - - Singleton roles are generated once per manifest from dedicated top-level - state sections. Package and service roles can naturally have the same names - as those singletons, e.g. the OS package named ``flatpak``. Prefix those - generated package/service roles so they cannot overwrite singleton role - directories during manifestation. - """ - if role_name in RESERVED_SINGLETON_ROLE_NAMES: - return f"{prefix}_{role_name}" - return role_name diff --git a/enroll/schema/state.schema.json b/enroll/schema/state.schema.json index 482b014..d8c136a 100644 --- a/enroll/schema/state.schema.json +++ b/enroll/schema/state.schema.json @@ -575,21 +575,6 @@ "$ref": "#/$defs/UserEntry" }, "type": "array" - }, - "user_flatpaks": { - "additionalProperties": { - "type": "array", - "items": { - "$ref": "#/$defs/FlatpakInstall" - } - }, - "type": "object" - }, - "user_flatpak_remotes": { - "type": "array", - "items": { - "$ref": "#/$defs/FlatpakRemote" - } } }, "required": [ @@ -671,224 +656,6 @@ "notes" ], "type": "object" - }, - "FlatpakInstall": { - "additionalProperties": false, - "properties": { - "name": { - "type": "string", - "minLength": 1 - }, - "method": { - "type": "string", - "enum": [ - "system", - "user" - ] - }, - "remote": { - "type": [ - "string", - "null" - ] - }, - "branch": { - "type": [ - "string", - "null" - ] - }, - "arch": { - "type": [ - "string", - "null" - ] - }, - "kind": { - "type": [ - "string", - "null" - ], - "enum": [ - "app", - "runtime", - null - ] - }, - "ref": { - "type": [ - "string", - "null" - ] - }, - "user": { - "type": [ - "string", - "null" - ] - }, - "home": { - "type": [ - "string", - "null" - ] - }, - "source": { - "type": "string" - }, - "from_url": { - "type": "string", - "minLength": 1 - } - }, - "required": [ - "name", - "method" - ], - "type": "object" - }, - "FlatpakRemote": { - "additionalProperties": false, - "properties": { - "name": { - "type": "string", - "minLength": 1 - }, - "method": { - "type": "string", - "enum": [ - "system", - "user" - ] - }, - "url": { - "type": "string", - "minLength": 1 - }, - "user": { - "type": [ - "string", - "null" - ] - }, - "home": { - "type": [ - "string", - "null" - ] - }, - "source": { - "type": "string" - } - }, - "required": [ - "name", - "method", - "url" - ], - "type": "object" - }, - "SnapInstall": { - "additionalProperties": false, - "properties": { - "name": { - "type": "string", - "minLength": 1 - }, - "channel": { - "type": [ - "string", - "null" - ] - }, - "revision": { - "type": [ - "integer", - "null" - ], - "minimum": 0 - }, - "classic": { - "type": "boolean" - }, - "devmode": { - "type": "boolean" - }, - "dangerous": { - "type": "boolean" - }, - "notes": { - "type": "array", - "items": { - "type": "string" - } - }, - "source": { - "type": "string" - }, - "install_revision": { - "type": "boolean" - } - }, - "required": [ - "name" - ], - "type": "object" - }, - "FlatpakSnapshot": { - "additionalProperties": false, - "properties": { - "role_name": { - "const": "flatpak" - }, - "system_flatpaks": { - "type": "array", - "items": { - "$ref": "#/$defs/FlatpakInstall" - } - }, - "remotes": { - "type": "array", - "items": { - "$ref": "#/$defs/FlatpakRemote" - } - }, - "notes": { - "type": "array", - "items": { - "type": "string" - } - } - }, - "required": [ - "role_name" - ], - "type": "object" - - }, - "SnapSnapshot": { - "additionalProperties": false, - "properties": { - "role_name": { - "const": "snap" - }, - "system_snaps": { - "type": "array", - "items": { - "$ref": "#/$defs/SnapInstall" - } - }, - "notes": { - "type": "array", - "items": { - "type": "string" - } - } - }, - "required": [ - "role_name" - ], - "type": "object" } }, "$id": "https://enroll.sh/schema/state.schema.json", @@ -999,12 +766,6 @@ }, "firewall_runtime": { "$ref": "#/$defs/FirewallRuntimeSnapshot" - }, - "flatpak": { - "$ref": "#/$defs/FlatpakSnapshot" - }, - "snap": { - "$ref": "#/$defs/SnapSnapshot" } }, "required": [ diff --git a/tests.sh b/tests.sh index a4fef8c..68f7007 100755 --- a/tests.sh +++ b/tests.sh @@ -44,13 +44,6 @@ poetry run \ --format json | jq DEBIAN_FRONTEND=noninteractive apt-get remove -y --purge cowsay -# Common simple packages mode (is tested later) -poetry run \ - enroll manifest \ - --harvest "${BUNDLE_DIR}2" \ - --out "${ANSIBLE_DIR}2" \ - --merge-simple-packages - # Ansible test builtin cd "${ANSIBLE_DIR}" # Lint @@ -59,8 +52,13 @@ ansible-lint "${ANSIBLE_DIR}" # Run ansible-playbook playbook.yml -i "localhost," -c local --check --diff -# Test the --merge-simple-packages mode +# Common simple packages mode +poetry run \ + enroll manifest \ + --harvest "${BUNDLE_DIR}2" \ + --out "${ANSIBLE_DIR}2" \ + --merge-simple-packages + builtin cd "${ANSIBLE_DIR}2" ls "${ANSIBLE_DIR}2/roles" - ansible-playbook playbook.yml -i "localhost," -c local --check --diff diff --git a/tests/test_accounts.py b/tests/test_accounts.py index a78c5f6..36e5af9 100644 --- a/tests/test_accounts.py +++ b/tests/test_accounts.py @@ -312,185 +312,3 @@ def test_parse_group_handles_short_lines(tmp_path: Path): assert 1000 in gid_to_name assert 1001 not in gid_to_name # skipped due to short line assert 1002 in gid_to_name - - -def test_find_flatpaks_in_root_detects_remote_branch_and_arch(tmp_path: Path): - import enroll.accounts as a - - root = tmp_path / "flatpak" - (root / "repo").mkdir(parents=True) - (root / "repo" / "config").write_text( - '[remote "acme"]\nurl=https://flatpak.example/repo/\n', - encoding="utf-8", - ) - ref = ( - root - / "repo" - / "refs" - / "remotes" - / "acme" - / "app" - / "com.example.App" - / "x86_64" - / "stable" - ) - ref.parent.mkdir(parents=True) - ref.write_text("checksum\n", encoding="utf-8") - active = root / "app" / "com.example.App" / "x86_64" / "stable" / "active" - active.mkdir(parents=True) - - remotes = a.find_flatpak_remotes(str(root), method="system") - assert [(r.name, r.url, r.method) for r in remotes] == [ - ("acme", "https://flatpak.example/repo/", "system") - ] - - apps = a._find_flatpaks_in_root(str(root), method="system") - assert len(apps) == 1 - assert apps[0].name == "com.example.App" - assert apps[0].remote == "acme" - assert apps[0].branch == "stable" - assert apps[0].arch == "x86_64" - - -def test_parse_snap_list_output_detects_channel_revision_and_modes(): - import enroll.accounts as a - - output = """Name Version Rev Tracking Publisher Notes -code abc 123 latest/stable vscode✓ classic -mydev 1.0 42 latest/edge example devmode,dangerous -bare 1.0 5 latest/stable canonical✓ base -""" - - snaps = {snap.name: snap for snap in a._parse_snap_list_output(output)} - assert snaps["code"].channel == "latest/stable" - assert snaps["code"].revision == 123 - assert snaps["code"].classic is True - assert snaps["mydev"].devmode is True - assert snaps["mydev"].dangerous is True - assert snaps["bare"].notes == ["base"] - - -def test_parse_flatpak_list_output_detects_system_refs(): - from enroll.accounts import _parse_flatpak_list_output - - output = "\n".join( - [ - "app/org.example.App/x86_64/stable\tflathub\tstable\tx86_64", - "runtime/org.freedesktop.Platform/x86_64/24.08\tflathub\t24.08\tx86_64", - ] - ) - - refs = _parse_flatpak_list_output( - output, method="system", columns=("ref", "origin", "branch", "arch") - ) - - assert [(r.kind, r.name, r.remote, r.branch, r.arch) for r in refs] == [ - ("app", "org.example.App", "flathub", "stable", "x86_64"), - ("runtime", "org.freedesktop.Platform", "flathub", "24.08", "x86_64"), - ] - assert refs[0].source == "flatpak-list" - - -def test_find_system_flatpaks_prefers_flatpak_list(monkeypatch): - import subprocess - import enroll.accounts as a - - calls = [] - - def fake_run(args, **kwargs): - calls.append(args) - if args == ["flatpak", "list", "--columns=help"]: - return subprocess.CompletedProcess( - args, - 0, - stdout="application\norigin\nbranch\narch\n", - stderr="", - ) - return subprocess.CompletedProcess( - args, - 0, - stdout="app/org.example.App/x86_64/stable\tacme\tstable\tx86_64\n", - stderr="", - ) - - monkeypatch.setattr(a.shutil, "which", lambda cmd: "/usr/bin/flatpak") - monkeypatch.setattr(a.subprocess, "run", fake_run) - monkeypatch.setattr( - a, - "_find_flatpaks_in_root", - lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("fallback used")), - ) - - refs = a.find_system_flatpaks() - - assert calls[0] == ["flatpak", "list", "--columns=help"] - assert calls[1][:3] == ["flatpak", "list", "--system"] - assert refs[0].name == "org.example.App" - assert refs[0].method == "system" - assert refs[0].remote == "acme" - - -def test_parse_flatpak_list_output_detects_application_columns(): - from enroll.accounts import _parse_flatpak_list_output - - output = "org.example.App\tflathub\tstable\tx86_64\n" - refs = _parse_flatpak_list_output( - output, method="system", columns=("application", "origin", "branch", "arch") - ) - - assert len(refs) == 1 - assert refs[0].name == "org.example.App" - assert refs[0].kind is None - assert refs[0].remote == "flathub" - assert refs[0].branch == "stable" - assert refs[0].arch == "x86_64" - - -def test_parse_plain_flatpak_list_output_like_default_table(): - from enroll.accounts import _parse_flatpak_list_output - - output = """Name Application ID Version Branch Installation -Mesa org.freedesktop.Platform.GL.default 26.0.6 25.08 system -Mesa (Extra) org.freedesktop.Platform.GL.default 26.0.6 25.08-extra system -Codecs Extra Extension org.freedesktop.Platform.codecs-extra 25.08-extra system -KDE Application Platform org.kde.Platform 6.10 system -OnionShare org.onionshare.OnionShare 2.6.4 stable system -""" - - refs = _parse_flatpak_list_output(output, method="system", columns=None) - by_name_branch = {(r.name, r.branch) for r in refs} - - assert ("org.onionshare.OnionShare", "stable") in by_name_branch - assert ("org.freedesktop.Platform.GL.default", "25.08") in by_name_branch - assert ("org.freedesktop.Platform.GL.default", "25.08-extra") in by_name_branch - assert ("org.kde.Platform", "6.10") in by_name_branch - - -def test_parse_flatpak_columns_help_handles_description_table(): - from enroll.accounts import _parse_flatpak_columns_help - - output = """ -Available columns: - application The application ID - branch The branch - installation The installation -""" - - assert _parse_flatpak_columns_help(output) >= { - "application", - "branch", - "installation", - } - - -def test_flatpak_list_attempts_respect_supported_columns(): - from enroll.accounts import _flatpak_list_attempts - - attempts = _flatpak_list_attempts( - "--system", {"application", "branch", "installation"} - ) - command_strings = [" ".join(args) for args, _columns in attempts] - - assert any("--columns=application,branch" in cmd for cmd in command_strings) - assert not any("origin" in cmd for cmd in command_strings) - assert command_strings[-1] == "flatpak list --system" diff --git a/tests/test_harvest.py b/tests/test_harvest.py index 1b50da5..93dfd90 100644 --- a/tests/test_harvest.py +++ b/tests/test_harvest.py @@ -224,19 +224,6 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( monkeypatch.setattr(harvest, "collect_non_system_users", lambda: []) - import enroll.accounts as accounts - - monkeypatch.setattr(accounts, "find_system_flatpaks", lambda: []) - monkeypatch.setattr(accounts, "find_system_flatpak_remotes", lambda: []) - monkeypatch.setattr( - accounts, "find_user_flatpak_remotes", lambda home, user=None: [] - ) - monkeypatch.setattr( - accounts, - "find_system_snaps", - lambda: [accounts.SnapInstall(name="code", channel="latest/stable")], - ) - def fake_stat_triplet(p: str): if p == "/usr/local/bin/myscript": return ("root", "root", "0755") @@ -272,9 +259,6 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( for o in openvpn_obs ) - assert st["roles"]["snap"]["role_name"] == "snap" - assert st["roles"]["snap"]["system_snaps"][0]["name"] == "code" - # Service role captured modified conffile svc = st["roles"]["services"][0] assert svc["unit"] == "openvpn.service" diff --git a/tests/test_harvest_helpers.py b/tests/test_harvest_helpers.py index 53e7d58..a0d2c91 100644 --- a/tests/test_harvest_helpers.py +++ b/tests/test_harvest_helpers.py @@ -286,20 +286,3 @@ def test_collect_firewall_runtime_snapshot_is_per_family_fallback( assert ( tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "iptables.v6" ).exists() - - -def test_package_role_names_do_not_collide_with_singleton_roles(): - from enroll.harvest import _role_name_from_pkg - - assert _role_name_from_pkg("flatpak") == "package_flatpak" - assert _role_name_from_pkg("snap") == "package_snap" - assert _role_name_from_pkg("users") == "package_users" - assert _role_name_from_pkg("nginx") == "nginx" - - -def test_service_role_names_do_not_collide_with_singleton_roles(): - from enroll.harvest import _role_name_from_unit - - assert _role_name_from_unit("flatpak.service") == "service_flatpak" - assert _role_name_from_unit("users.service") == "service_users" - assert _role_name_from_unit("nginx.service") == "nginx" diff --git a/tests/test_manifest.py b/tests/test_manifest.py index a8eaf0f..1b78bcf 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -1064,317 +1064,3 @@ def test_render_firewall_runtime_tasks_with_ipv6(): } result = manifest._render_firewall_runtime_tasks(state) assert len(result) >= 1 - - -def test_manifest_renders_flatpak_and_snap_details(tmp_path: Path): - bundle = tmp_path / "bundle" - out = tmp_path / "ansible" - state = { - "schema_version": 3, - "host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"}, - "inventory": {"packages": {}}, - "roles": { - "users": { - "role_name": "users", - "users": [ - { - "name": "alice", - "uid": 1000, - "gid": 1000, - "gecos": "Alice", - "home": "/home/alice", - "shell": "/bin/bash", - "primary_group": "alice", - "supplementary_groups": [], - } - ], - "managed_files": [], - "excluded": [], - "notes": [], - "user_flatpak_remotes": [ - { - "name": "acme-user", - "method": "user", - "url": "https://flatpak.example/user-repo/", - "user": "alice", - "home": "/home/alice", - }, - ], - "user_flatpaks": { - "alice": [ - { - "name": "org.example.UserApp", - "method": "user", - "remote": "acme-user", - "branch": "stable", - "arch": "x86_64", - } - ] - }, - }, - "flatpak": { - "role_name": "flatpak", - "remotes": [ - { - "name": "acme", - "method": "system", - "url": "https://flatpak.example/repo/", - }, - ], - "system_flatpaks": [ - { - "name": "com.example.App", - "method": "system", - "remote": "acme", - "branch": "stable", - "arch": "x86_64", - } - ], - "notes": [], - }, - "snap": { - "role_name": "snap", - "system_snaps": [ - { - "name": "code", - "channel": "latest/stable", - "revision": 123, - "classic": True, - "notes": ["classic"], - } - ], - "notes": [], - }, - "services": [], - "packages": [], - "apt_config": { - "role_name": "apt_config", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "dnf_config": { - "role_name": "dnf_config", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "etc_custom": { - "role_name": "etc_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "usr_local_custom": { - "role_name": "usr_local_custom", - "managed_files": [], - "excluded": [], - "notes": [], - }, - "extra_paths": { - "role_name": "extra_paths", - "include_patterns": [], - "exclude_patterns": [], - "managed_files": [], - "excluded": [], - "notes": [], - }, - }, - } - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") - - manifest.manifest(str(bundle), str(out)) - - users_defaults = (out / "roles" / "users" / "defaults" / "main.yml").read_text( - encoding="utf-8" - ) - users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text( - encoding="utf-8" - ) - users_readme = (out / "roles" / "users" / "README.md").read_text(encoding="utf-8") - flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text( - encoding="utf-8" - ) - flatpak_tasks = (out / "roles" / "flatpak" / "tasks" / "main.yml").read_text( - encoding="utf-8" - ) - snap_defaults = (out / "roles" / "snap" / "defaults" / "main.yml").read_text( - encoding="utf-8" - ) - snap_tasks = (out / "roles" / "snap" / "tasks" / "main.yml").read_text( - encoding="utf-8" - ) - - assert "users_flatpak_remotes:" in users_defaults - assert "remote: acme-user" in users_defaults - assert "community.general.snap" not in users_tasks - assert "Install system-wide snaps" not in users_tasks - assert "Install system-wide Flatpaks" not in users_tasks - assert "ansible-galaxy collection install -r requirements.yml" in users_readme - - assert "snap_system_snaps:" in snap_defaults - assert "channel: latest/stable" in snap_defaults - assert "classic: true" in snap_defaults - assert "community.general.snap" in snap_tasks - assert "Install system-wide snaps with full detected attributes" in snap_tasks - assert "Install system-wide snaps with compatibility options" in snap_tasks - assert "Install system-wide snaps with minimal options" in snap_tasks - assert "ignore_errors: true" in snap_tasks - - assert "flatpak_system_flatpaks:" in flatpak_defaults - assert "remote: acme" in flatpak_defaults - assert "community.general.flatpak" in flatpak_tasks - assert "Install system-wide Flatpaks" in flatpak_tasks - assert (out / "requirements.yml").exists() - - -def test_users_role_without_portable_apps_omits_community_general_tasks(tmp_path): - bundle = tmp_path / "bundle" - out = tmp_path / "out" - state = { - "roles": { - "users": { - "role_name": "users", - "users": [ - { - "name": "alice", - "uid": 1000, - "gid": 1000, - "gecos": "Alice", - "home": "/home/alice", - "shell": "/bin/bash", - "primary_group": "alice", - "supplementary_groups": [], - } - ], - "managed_files": [], - "excluded": [], - "notes": [], - }, - "services": [], - "packages": [], - }, - } - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") - - manifest.manifest(str(bundle), str(out)) - - users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text( - encoding="utf-8" - ) - users_meta = (out / "roles" / "users" / "meta" / "main.yml").read_text( - encoding="utf-8" - ) - - assert "community.general.flatpak" not in users_tasks - assert "community.general.snap" not in users_tasks - assert "collections:" not in users_meta - - -def test_manifest_emits_flatpak_role_even_when_no_flatpaks(tmp_path): - bundle = tmp_path / "bundle" - out = tmp_path / "out" - state = { - "roles": { - "users": { - "role_name": "users", - "users": [], - "managed_files": [], - "excluded": [], - "notes": [], - "user_flatpaks": {}, - "user_flatpak_remotes": [], - }, - "flatpak": { - "role_name": "flatpak", - "system_flatpaks": [], - "remotes": [], - "notes": [], - }, - "services": [], - "packages": [], - } - } - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") - - manifest.manifest(str(bundle), str(out)) - - flatpak_tasks = (out / "roles" / "flatpak" / "tasks" / "main.yml").read_text( - encoding="utf-8" - ) - flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text( - encoding="utf-8" - ) - - assert "flatpak_system_flatpaks: []" in flatpak_defaults - assert "flatpak_remotes: []" in flatpak_defaults - assert "Install system-wide Flatpaks" in flatpak_tasks - assert "Ensure system Flatpak remotes exist" in flatpak_tasks - - -def test_manifest_avoids_package_role_collision_with_flatpak_singleton(tmp_path): - bundle = tmp_path / "bundle" - out = tmp_path / "out" - state = { - "roles": { - "users": { - "role_name": "users", - "users": [], - "managed_files": [], - "excluded": [], - "notes": [], - "user_flatpaks": {}, - "user_flatpak_remotes": [], - }, - "flatpak": { - "role_name": "flatpak", - "remotes": [ - { - "name": "flathub", - "method": "system", - "url": "https://dl.flathub.org/repo/", - } - ], - "system_flatpaks": [ - { - "name": "org.onionshare.OnionShare", - "method": "system", - "remote": "flathub", - "branch": "stable", - "arch": "x86_64", - } - ], - "notes": [], - }, - "services": [], - "packages": [ - { - "package": "flatpak", - "role_name": "flatpak", - "managed_files": [], - "managed_dirs": [], - "managed_links": [], - "excluded": [], - "notes": [], - "has_config": True, - } - ], - } - } - bundle.mkdir(parents=True, exist_ok=True) - (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") - - manifest.manifest(str(bundle), str(out)) - - flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text( - encoding="utf-8" - ) - playbook = (out / "playbook.yml").read_text(encoding="utf-8") - - assert "org.onionshare.OnionShare" in flatpak_defaults - assert (out / "roles" / "package_flatpak" / "tasks" / "main.yml").exists() - assert "role: flatpak" in playbook - assert "role: package_flatpak" in playbook