diff --git a/CHANGELOG.md b/CHANGELOG.md index a6b840d..500e7c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # 0.7.0 + * Add support for detecting flatpaks and snaps * Add --merge-simple-packages to reduce the number of roles, for packages that have no config files or services to maintain. # 0.6.0 diff --git a/enroll/accounts.py b/enroll/accounts.py index cf2fcd3..b4f774b 100644 --- a/enroll/accounts.py +++ b/enroll/accounts.py @@ -1,8 +1,48 @@ from __future__ import annotations +import configparser import os -from dataclasses import dataclass -from typing import Dict, List, Set, Tuple +import re +import shutil +import subprocess # nosec +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Set, Tuple + + +@dataclass +class FlatpakInstall: + name: str + method: str + remote: Optional[str] = None + branch: Optional[str] = None + arch: Optional[str] = None + kind: Optional[str] = None + ref: Optional[str] = None + user: Optional[str] = None + home: Optional[str] = None + source: str = "filesystem" + + +@dataclass +class FlatpakRemote: + name: str + method: str + url: str + user: Optional[str] = None + home: Optional[str] = None + source: str = "filesystem" + + +@dataclass +class SnapInstall: + name: str + channel: Optional[str] = None + revision: Optional[int] = None + classic: bool = False + devmode: bool = False + dangerous: bool = False + notes: List[str] = field(default_factory=list) + source: str = "snap-list" @dataclass @@ -16,6 +56,7 @@ class UserRecord: primary_group: str supplementary_groups: List[str] ssh_files: List[str] + flatpaks: List[FlatpakInstall] = field(default_factory=list) def parse_login_defs(path: str = "/etc/login.defs") -> Dict[str, int]: @@ -115,6 +156,612 @@ def find_user_ssh_files(home: str) -> List[str]: return sorted(set(out)) +def _read_first_existing_text(paths: List[str]) -> Optional[str]: + for path in paths: + try: + with open(path, "r", encoding="utf-8", errors="replace") as f: + value = f.read().strip() + if value: + return value + except OSError: + continue + return None + + +def _parse_flatpak_ref( + ref: str, +) -> Tuple[Optional[str], str, Optional[str], Optional[str]]: + """Return (kind, name, arch, branch) for a Flatpak ref. + + refs look like app/org.example.App/x86_64/stable or + runtime/org.example.Platform/x86_64/23.08. If the value is already just an + application/runtime ID, keep it as the name and leave the other fields empty. + """ + parts = [p for p in (ref or "").strip().split("/") if p] + if len(parts) >= 4 and parts[0] in {"app", "runtime"}: + return parts[0], parts[1], parts[2], parts[3] + return None, (ref or "").strip(), None, None + + +def _parse_plain_flatpak_list_output( + output: str, + *, + method: str, + user: Optional[str] = None, + home: Optional[str] = None, +) -> List[FlatpakInstall]: + """Parse default `flatpak list` table output. + + Example: + Name Application ID Version Branch Installation + OnionShare org.onionshare.OnionShare 2.6.4 stable system + """ + out: List[FlatpakInstall] = [] + seen: Set[ + Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]] + ] = set() + id_re = re.compile(r"\b(?:[A-Za-z0-9_-]+\.)+[A-Za-z0-9_-]+\b") + for line in output.splitlines(): + line = line.rstrip() + if not line.strip(): + continue + if "Application ID" in line and "Installation" in line: + continue + match = id_re.search(line) + if not match: + continue + name = match.group(0) + tail = line[match.end() :].split() + installation = tail[-1] if tail else "" + if installation in {"system", "user"} and installation != method: + continue + branch = None + if len(tail) >= 2 and tail[-1] in {"system", "user"}: + branch = tail[-2] + elif tail: + branch = tail[-1] + + key = (name, None, branch, None, None) + if key in seen: + continue + seen.add(key) + out.append( + FlatpakInstall( + name=name, + method=method, + remote=None, + branch=branch, + arch=None, + kind=None, + ref=None, + user=user, + home=home, + source="flatpak-list", + ) + ) + return sorted(out, key=lambda f: (f.name, f.branch or "")) + + +def _parse_flatpak_list_output( + output: str, + *, + method: str, + columns: Optional[Tuple[str, ...]] = None, + user: Optional[str] = None, + home: Optional[str] = None, +) -> List[FlatpakInstall]: + """Parse Flatpak list output. + + If columns is None, parse the default table. Otherwise columns names must + match the order passed to `flatpak list --columns=...`. + """ + if columns is None: + return _parse_plain_flatpak_list_output( + output, method=method, user=user, home=home + ) + + out: List[FlatpakInstall] = [] + seen: Set[ + Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]] + ] = set() + for line in output.splitlines(): + line = line.strip() + if not line: + continue + lower = line.lower() + if lower.startswith("ref") or lower.startswith("application id"): + continue + + parts = line.split("\t") + if len(parts) < len(columns): + parts = line.split() + if not parts: + continue + + fields = { + name: parts[idx].strip() + for idx, name in enumerate(columns) + if idx < len(parts) + } + ref = fields.get("ref") or fields.get("application") or "" + kind, name, ref_arch, ref_branch = _parse_flatpak_ref(ref) + if not name: + continue + + remote = fields.get("origin") or None + branch = fields.get("branch") or ref_branch + arch = fields.get("arch") or ref_arch + if remote in {"", "-"}: + remote = None + if branch in {"", "-"}: + branch = None + if arch in {"", "-"}: + arch = None + + key = (name, remote, branch, arch, kind) + if key in seen: + continue + seen.add(key) + out.append( + FlatpakInstall( + name=name, + method=method, + remote=remote, + branch=branch, + arch=arch, + kind=kind, + ref=ref if "/" in ref else None, + user=user, + home=home, + source="flatpak-list", + ) + ) + return sorted( + out, + key=lambda f: ( + f.kind or "", + f.name, + f.remote or "", + f.branch or "", + f.arch or "", + ), + ) + + +_KNOWN_FLATPAK_LIST_COLUMNS = { + "name", + "description", + "application", + "version", + "branch", + "arch", + "origin", + "installation", + "ref", + "active", + "latest", + "size", + "options", +} + + +def _parse_flatpak_columns_help(output: str) -> Set[str]: + """Parse `flatpak list --columns=help` output into supported fields.""" + supported: Set[str] = set() + for line in output.splitlines(): + # Help output varies a bit between Flatpak versions. Treat any known + # token as a supported field, whether it appears alone or in a + # description table. + for token in re.findall(r"[A-Za-z_][A-Za-z0-9_-]*", line.lower()): + if token in _KNOWN_FLATPAK_LIST_COLUMNS: + supported.add(token) + return supported + + +def _run_flatpak_columns_help() -> Optional[Set[str]]: + if shutil.which("flatpak") is None: + return None + try: + proc = subprocess.run( # nosec + ["flatpak", "list", "--columns=help"], + shell=False, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=10, + ) + except Exception: + return None + if proc.returncode != 0: + return None + supported = _parse_flatpak_columns_help(proc.stdout or "") + return supported or None + + +def _flatpak_list_attempts( + scope: str, supported: Optional[Set[str]] +) -> List[Tuple[List[str], Optional[Tuple[str, ...]]]]: + def supported_columns(*wanted: str) -> Optional[Tuple[str, ...]]: + if supported is not None and not set(wanted).issubset(supported): + return None + return tuple(wanted) + + column_sets: List[Tuple[str, ...]] = [] + for wanted in ( + ("application", "origin", "branch", "arch"), + ("application", "branch", "arch"), + ("application", "branch"), + ("application",), + ("ref", "origin", "branch", "arch"), + ("ref", "branch", "arch"), + ("ref", "branch"), + ("ref",), + ): + cols = supported_columns(*wanted) + if cols is not None and cols not in column_sets: + column_sets.append(cols) + + attempts: List[Tuple[List[str], Optional[Tuple[str, ...]]]] = [ + ( + ["flatpak", "list", scope, "--columns=" + ",".join(cols)], + cols, + ) + for cols in column_sets + ] + attempts.append((["flatpak", "list", scope], None)) + return attempts + + +def _run_flatpak_list(method: str) -> Optional[Tuple[str, Optional[Tuple[str, ...]]]]: + if shutil.which("flatpak") is None: + return None + + scope = "--system" if method == "system" else "--user" + supported = _run_flatpak_columns_help() + for args, columns in _flatpak_list_attempts(scope, supported): + try: + proc = subprocess.run( # nosec + args, + shell=False, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=10, + ) + except Exception: # nosec B112 + continue + if proc.returncode == 0: + return proc.stdout or "", columns + return None + + +def _flatpak_remote_from_ref( + flatpak_root: str, app_id: str, arch: str, branch: str, remote_names: List[str] +) -> Optional[str]: + for remote_name in remote_names: + ref = os.path.join( + flatpak_root, + "repo", + "refs", + "remotes", + remote_name, + "app", + app_id, + arch, + branch, + ) + if os.path.exists(ref): + return remote_name + return None + + +def _parse_flatpak_deploy_origin(branch_dir: str) -> Optional[str]: + active_dir = os.path.join(branch_dir, "active") + candidates = [ + os.path.join(active_dir, "origin"), + os.path.join(active_dir, "metadata"), + ] + + origin = _read_first_existing_text([candidates[0]]) + if origin: + return origin + + metadata = candidates[1] + if os.path.isfile(metadata): + parser = configparser.ConfigParser(interpolation=None) + try: + parser.read(metadata, encoding="utf-8") + except Exception: + return None + for section in ("Application", "Runtime"): + if parser.has_option(section, "origin"): + value = parser.get(section, "origin", fallback="").strip() + if value: + return value + return None + + +def _find_flatpaks_in_root( + flatpak_root: str, + *, + method: str, + user: Optional[str] = None, + home: Optional[str] = None, +) -> List[FlatpakInstall]: + apps_dir = os.path.join(flatpak_root, "app") + if not os.path.isdir(apps_dir): + return [] + + remote_names = [ + r.name + for r in find_flatpak_remotes(flatpak_root, method=method, user=user, home=home) + ] + out: List[FlatpakInstall] = [] + + try: + app_ids = sorted(os.listdir(apps_dir)) + except OSError: + return [] + + seen: Set[Tuple[str, Optional[str], Optional[str], Optional[str]]] = set() + for app_id in app_ids: + app_path = os.path.join(apps_dir, app_id) + if not os.path.isdir(app_path): + continue + try: + arches = sorted(os.listdir(app_path)) + except OSError: + continue + for arch in arches: + arch_path = os.path.join(app_path, arch) + if not os.path.isdir(arch_path): + continue + try: + branches = sorted(os.listdir(arch_path)) + except OSError: + continue + for branch in branches: + branch_path = os.path.join(arch_path, branch) + if not os.path.isdir(branch_path): + continue + active_dir = os.path.join(branch_path, "active") + if not os.path.exists(active_dir): + continue + remote = _parse_flatpak_deploy_origin(branch_path) + if not remote: + remote = _flatpak_remote_from_ref( + flatpak_root, app_id, arch, branch, remote_names + ) + key = (app_id, remote, branch, arch) + if key in seen: + continue + seen.add(key) + out.append( + FlatpakInstall( + name=app_id, + method=method, + remote=remote, + branch=branch or None, + arch=arch or None, + kind="app", + ref=f"app/{app_id}/{arch}/{branch}", + user=user, + home=home, + ) + ) + + return sorted( + out, key=lambda f: (f.name, f.remote or "", f.branch or "", f.arch or "") + ) + + +def find_flatpak_remotes( + flatpak_root: str, + *, + method: str, + user: Optional[str] = None, + home: Optional[str] = None, +) -> List[FlatpakRemote]: + """Return configured Flatpak remotes for a Flatpak installation root. + + Flatpak stores remotes in the OSTree repo config. This gives us the remote + names and repository URLs. It does not reliably preserve the original + .flatpakref/.flatpakrepo URL that was used during installation. + """ + config_path = os.path.join(flatpak_root, "repo", "config") + if not os.path.isfile(config_path): + return [] + + parser = configparser.ConfigParser(interpolation=None, strict=False) + try: + parser.read(config_path, encoding="utf-8") + except Exception: + return [] + + out: List[FlatpakRemote] = [] + for section in parser.sections(): + match = re.fullmatch(r'remote\s+"(.+)"', section) + if not match: + continue + name = match.group(1).strip() + url = parser.get(section, "url", fallback="").strip() + if not name or not url: + continue + out.append( + FlatpakRemote( + name=name, + method=method, + url=url, + user=user, + home=home, + ) + ) + + return sorted(out, key=lambda r: (r.method, r.user or "", r.name)) + + +def find_user_flatpaks(home: str, user: Optional[str] = None) -> List[FlatpakInstall]: + """Return per-user Flatpak applications installed under a home directory.""" + flatpak_root = os.path.join(home, ".local", "share", "flatpak") + return _find_flatpaks_in_root(flatpak_root, method="user", user=user, home=home) + + +def find_user_flatpak_remotes( + home: str, user: Optional[str] = None +) -> List[FlatpakRemote]: + flatpak_root = os.path.join(home, ".local", "share", "flatpak") + return find_flatpak_remotes(flatpak_root, method="user", user=user, home=home) + + +def find_system_flatpaks() -> List[FlatpakInstall]: + """Return Flatpak refs installed system-wide. + + Prefer `flatpak list --system` because it is Flatpak's own view of + installed refs and includes layouts the filesystem scanner might miss. + Fall back to the on-disk app deployment tree when the command is + unavailable or produces unparsable output. + """ + listing = _run_flatpak_list("system") + if listing is not None: + output, columns = listing + parsed = _parse_flatpak_list_output(output, method="system", columns=columns) + if parsed or not output.strip(): + return parsed + return _find_flatpaks_in_root("/var/lib/flatpak", method="system") + + +def find_system_flatpak_remotes() -> List[FlatpakRemote]: + return find_flatpak_remotes("/var/lib/flatpak", method="system") + + +def _parse_snap_notes(notes: str) -> List[str]: + if not notes or notes == "-": + return [] + cleaned = notes.replace(",", " ").replace(";", " ") + return sorted( + {n.strip().lower() for n in cleaned.split() if n.strip() and n.strip() != "-"} + ) + + +def _parse_snap_list_output(output: str) -> List[SnapInstall]: + out: List[SnapInstall] = [] + for idx, line in enumerate(output.splitlines()): + line = line.strip() + if not line: + continue + if idx == 0 and line.lower().startswith("name"): + continue + parts = line.split(maxsplit=5) + if len(parts) < 5: + continue + name = parts[0] + revision: Optional[int] + try: + revision = int(parts[2]) + except ValueError: + revision = None + tracking = parts[3] + channel = None if tracking in {"-", ""} else tracking + notes = _parse_snap_notes(parts[5] if len(parts) > 5 else "") + out.append( + SnapInstall( + name=name, + channel=channel, + revision=revision, + classic="classic" in notes, + devmode="devmode" in notes, + dangerous="dangerous" in notes, + notes=notes, + source="snap-list", + ) + ) + return sorted(out, key=lambda s: s.name) + + +def _run_snap_list() -> Optional[str]: + if shutil.which("snap") is None: + return None + try: + proc = subprocess.run( # nosec + ["snap", "list"], + shell=False, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=10, + ) + except Exception: + return None + if proc.returncode != 0: + return None + return proc.stdout or "" + + +def _find_system_snaps_from_filesystem() -> List[SnapInstall]: + snapd_snaps = "/var/lib/snapd/snaps" + if not os.path.isdir(snapd_snaps): + return [] + + current_revisions: Dict[str, int] = {} + snap_mounts = "/snap" + if os.path.isdir(snap_mounts): + try: + mount_names = os.listdir(snap_mounts) + except OSError: + mount_names = [] + for name in mount_names: + current = os.path.join(snap_mounts, name, "current") + try: + target = os.readlink(current) + except OSError: + continue + try: + current_revisions[name] = int(os.path.basename(target.rstrip("/"))) + except ValueError: + continue + + candidates: Dict[str, List[int]] = {} + try: + entries = os.listdir(snapd_snaps) + except OSError: + return [] + + for entry in entries: + if not entry.endswith(".snap") or "_" not in entry: + continue + name, rev_text = entry[:-5].rsplit("_", 1) + try: + revision = int(rev_text) + except ValueError: + continue + candidates.setdefault(name, []).append(revision) + + out: List[SnapInstall] = [] + for name, revisions in candidates.items(): + revision = current_revisions.get(name) + if revision is None: + revision = max(revisions) + out.append(SnapInstall(name=name, revision=revision, source="filesystem")) + return sorted(out, key=lambda s: s.name) + + +def find_system_snaps() -> List[SnapInstall]: + """Return system-wide snap packages. + + Prefer `snap list` because it exposes channel tracking and confinement notes. + Fall back to snapd's on-disk snap filenames when the command is unavailable. + """ + output = _run_snap_list() + if output is not None: + parsed = _parse_snap_list_output(output) + if parsed: + return parsed + return _find_system_snaps_from_filesystem() + + def collect_non_system_users() -> List[UserRecord]: defs = parse_login_defs() uid_min = defs.get("UID_MIN", 1000) @@ -139,6 +786,10 @@ def collect_non_system_users() -> List[UserRecord]: ssh_files = find_user_ssh_files(home) if home and home.startswith("/") else [] + flatpaks: List[FlatpakInstall] = [] + if home and home.startswith("/"): + flatpaks = find_user_flatpaks(home, user=name) + users.append( UserRecord( name=name, @@ -150,6 +801,7 @@ def collect_non_system_users() -> List[UserRecord]: primary_group=primary_group, supplementary_groups=supp, ssh_files=ssh_files, + flatpaks=flatpaks, ) ) diff --git a/enroll/harvest.py b/enroll/harvest.py index a9d5526..01d3ff7 100644 --- a/enroll/harvest.py +++ b/enroll/harvest.py @@ -10,8 +10,9 @@ import stat import subprocess # nosec import time from dataclasses import dataclass, asdict, field -from typing import Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple +from .role_names import avoid_reserved_role_name from .systemd import ( list_enabled_services, list_enabled_timers, @@ -101,6 +102,23 @@ class UsersSnapshot: managed_files: List[ManagedFile] = field(default_factory=list) excluded: List[ExcludedFile] = field(default_factory=list) notes: List[str] = field(default_factory=list) + user_flatpaks: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict) + user_flatpak_remotes: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass +class FlatpakSnapshot: + role_name: str + system_flatpaks: List[Dict[str, Any]] = field(default_factory=list) + remotes: List[Dict[str, Any]] = field(default_factory=list) + notes: List[str] = field(default_factory=list) + + +@dataclass +class SnapSnapshot: + role_name: str + system_snaps: List[Dict[str, Any]] = field(default_factory=list) + notes: List[str] = field(default_factory=list) @dataclass @@ -364,11 +382,11 @@ def _role_id(raw: str) -> str: def _role_name_from_unit(unit: str) -> str: base = _role_id(unit.removesuffix(".service")) - return _safe_name(base) + return avoid_reserved_role_name(_safe_name(base), prefix="service") def _role_name_from_pkg(pkg: str) -> str: - return _safe_name(pkg) + return avoid_reserved_role_name(_safe_name(pkg), prefix="package") def _copy_into_bundle( @@ -1808,6 +1826,30 @@ def harvest( user_records = [] users_notes.append(f"Failed to enumerate users: {e!r}") + # Detect system-wide Flatpaks/Snaps and configured Flatpak remotes. + from .accounts import ( + find_system_flatpak_remotes, + find_system_flatpaks, + find_system_snaps, + find_user_flatpak_remotes, + ) + + system_flatpaks = [asdict(f) for f in find_system_flatpaks()] + system_snaps = [asdict(s) for s in find_system_snaps()] + system_flatpak_remotes = [asdict(r) for r in find_system_flatpak_remotes()] + flatpak_notes: List[str] = [] + snap_notes: List[str] = [] + if system_flatpaks: + flatpak_notes.append( + "System-wide flatpaks detected: " + + ", ".join(str(f.get("name")) for f in system_flatpaks) + ) + if system_snaps: + snap_notes.append( + "System-wide snaps detected: " + + ", ".join(str(s.get("name")) for s in system_snaps) + ) + users_role_name = "users" users_role_seen = seen_by_role.setdefault(users_role_name, set()) @@ -1823,6 +1865,9 @@ def harvest( (".bash_aliases", "user_shell_aliases"), ] + user_flatpaks_map: Dict[str, List[Dict[str, Any]]] = {} + user_flatpak_remotes: List[Dict[str, Any]] = [] + for u in user_records: users_list.append( { @@ -1899,12 +1944,36 @@ def harvest( seen_global=captured_global, ) + # Collect per-user Flatpak applications and remotes. Snap packages are + # system-wide; ~/snap/* is user data, not an install source. + if u.flatpaks: + user_flatpaks_map[u.name] = [asdict(fp) for fp in u.flatpaks] + if home and home.startswith("/"): + user_flatpak_remotes.extend( + asdict(r) for r in find_user_flatpak_remotes(home, user=u.name) + ) + users_snapshot = UsersSnapshot( role_name=users_role_name, users=users_list, managed_files=users_managed, excluded=users_excluded, notes=users_notes, + user_flatpaks=user_flatpaks_map, + user_flatpak_remotes=user_flatpak_remotes, + ) + + flatpak_snapshot = FlatpakSnapshot( + role_name="flatpak", + system_flatpaks=system_flatpaks, + remotes=system_flatpak_remotes, + notes=flatpak_notes, + ) + + snap_snapshot = SnapSnapshot( + role_name="snap", + system_snaps=system_snaps, + notes=snap_notes, ) # ------------------------- @@ -2512,6 +2581,8 @@ def harvest( }, "roles": { "users": asdict(users_snapshot), + "flatpak": asdict(flatpak_snapshot), + "snap": asdict(snap_snapshot), "services": [asdict(s) for s in service_snaps], "packages": [asdict(p) for p in pkg_snaps], "apt_config": asdict(apt_config_snapshot), diff --git a/enroll/manifest.py b/enroll/manifest.py index 97f8caf..219ea64 100644 --- a/enroll/manifest.py +++ b/enroll/manifest.py @@ -10,6 +10,8 @@ import tempfile from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple +from .role_names import avoid_reserved_role_name + from .jinjaturtle import ( can_jinjify_path, find_jinjaturtle_cmd, @@ -229,6 +231,72 @@ def _ensure_ansible_cfg(cfg_path: str) -> None: return +def _ensure_requirements_yaml(req_path: str) -> None: + if not os.path.exists(req_path): + with open(req_path, "w", encoding="utf-8") as f: + f.write("---\n") + f.write("collections:\n") + f.write(" - name: community.general\n") + f.write(' version: ">=13.0.0"\n') + return + + +def _normalise_flatpak_item( + item: Any, + *, + method: str, + user: Optional[str] = None, + home: Optional[str] = None, +) -> Dict[str, Any]: + if isinstance(item, str): + out: Dict[str, Any] = {"name": item, "method": method} + elif isinstance(item, dict): + out = dict(item) + out.setdefault("method", method) + else: + out = {"name": str(item), "method": method} + if user: + out.setdefault("user", user) + if home: + out.setdefault("home", home) + return out + + +def _normalise_flatpak_remote(item: Any) -> Dict[str, Any]: + if isinstance(item, dict): + out = dict(item) + else: + out = {"name": str(item)} + out.setdefault("method", "system") + return out + + +def _normalise_snap_item(item: Any) -> Dict[str, Any]: + if isinstance(item, str): + out: Dict[str, Any] = {"name": item} + elif isinstance(item, dict): + out = dict(item) + else: + out = {"name": str(item)} + + notes = out.get("notes") or [] + if isinstance(notes, str): + notes = [notes] + notes_l = {str(n).lower() for n in notes} + out["classic"] = bool(out.get("classic") or "classic" in notes_l) + out["devmode"] = bool(out.get("devmode") or "devmode" in notes_l) + out["dangerous"] = bool(out.get("dangerous") or "dangerous" in notes_l) + + # The Ansible snap module's revision parameter pins/holds the snap. For + # ordinary store snaps that track a channel, preserve the channel instead + # of freezing every harvested host at today's revision. + if out.get("revision") is not None and not out.get("channel"): + out["install_revision"] = True + else: + out["install_revision"] = False + return out + + def _ensure_inventory_host(inv_path: str, fqdn: str) -> None: os.makedirs(os.path.dirname(inv_path), exist_ok=True) if not os.path.exists(inv_path): @@ -836,6 +904,8 @@ def _manifest_from_bundle_dir( services: List[Dict[str, Any]] = roles.get("services", []) package_roles: List[Dict[str, Any]] = roles.get("packages", []) users_snapshot: Dict[str, Any] = roles.get("users", {}) + flatpak_snapshot: Dict[str, Any] = roles.get("flatpak", {}) + snap_snapshot: Dict[str, Any] = roles.get("snap", {}) apt_config_snapshot: Dict[str, Any] = roles.get("apt_config", {}) dnf_config_snapshot: Dict[str, Any] = roles.get("dnf_config", {}) firewall_runtime_snapshot: Dict[str, Any] = roles.get("firewall_runtime", {}) @@ -871,8 +941,11 @@ def _manifest_from_bundle_dir( os.path.join(out_dir, "inventory", "hosts.ini"), fqdn or "" ) _ensure_ansible_cfg(os.path.join(out_dir, "ansible.cfg")) + _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) manifested_users_roles: List[str] = [] + manifested_flatpak_roles: List[str] = [] + manifested_snap_roles: List[str] = [] manifested_apt_config_roles: List[str] = [] manifested_dnf_config_roles: List[str] = [] manifested_firewall_runtime_roles: List[str] = [] @@ -885,7 +958,7 @@ def _manifest_from_bundle_dir( # ------------------------- # Users role (non-system users) # ------------------------- - if users_snapshot and users_snapshot.get("users"): + if users_snapshot: role = users_snapshot.get("role_name", "users") role_dir = os.path.join(roles_root, role) _write_role_scaffold(role_dir) @@ -970,6 +1043,33 @@ def _manifest_from_bundle_dir( } ) + # Build Flatpak and Snap lists. Flatpak can be installed system-wide or + # per-user. Snap packages are system-wide; per-user ~/snap/* directories + # are runtime/user data and are not treated as install sources. + users_flatpaks: List[Dict[str, Any]] = [] + user_flatpak_map = users_snapshot.get("user_flatpaks", {}) or {} + home_by_user = { + str(u.get("name")): str(u.get("home") or "") for u in users_data + } + for uname, flatpaks in user_flatpak_map.items(): + for fp in flatpaks or []: + users_flatpaks.append( + _normalise_flatpak_item( + fp, + method="user", + user=str(uname), + home=home_by_user.get(str(uname)) or None, + ) + ) + + flatpak_remotes = [ + _normalise_flatpak_remote(r) + for r in (users_snapshot.get("user_flatpak_remotes", []) or []) + ] + users_needs_community = bool(flatpak_remotes or users_flatpaks) + if users_needs_community: + _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) + # Variables are host-specific in site mode; in non-site mode they live in role defaults. if site_mode: _write_role_defaults( @@ -978,6 +1078,8 @@ def _manifest_from_bundle_dir( "users_groups": [], "users_users": [], "users_ssh_files": [], + "users_flatpaks": [], + "users_flatpak_remotes": [], }, ) _write_hostvars( @@ -988,6 +1090,8 @@ def _manifest_from_bundle_dir( "users_groups": group_names, "users_users": users_data, "users_ssh_files": ssh_files, + "users_flatpaks": users_flatpaks, + "users_flatpak_remotes": flatpak_remotes, }, ) else: @@ -997,13 +1101,23 @@ def _manifest_from_bundle_dir( "users_groups": group_names, "users_users": users_data, "users_ssh_files": ssh_files, + "users_flatpaks": users_flatpaks, + "users_flatpak_remotes": flatpak_remotes, }, ) with open( os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" ) as f: - f.write("---\ndependencies: []\n") + if users_needs_community: + f.write( + "---\n" + "dependencies: []\n" + "collections:\n" + " - community.general\n" + ) + else: + f.write("---\ndependencies: []\n") # tasks (data-driven) users_tasks = """--- @@ -1056,6 +1170,52 @@ def _manifest_from_bundle_dir( group: "{{ item.group }}" mode: "{{ item.mode }}" loop: "{{ users_ssh_files | default([]) }}" + +""" + + if flatpak_remotes or users_flatpaks: + users_tasks += """ +- name: Ensure user Flatpak remotes exist + ansible.builtin.command: + argv: + - flatpak + - remote-add + - --user + - --if-not-exists + - "{{ item.name }}" + - "{{ item.url }}" + loop: "{{ users_flatpak_remotes | default([]) | selectattr('method', 'equalto', 'user') | list }}" + when: + - item.name is defined + - item.url is defined + - item.url | length > 0 + - item.user is defined + become: true + become_user: "{{ item.user }}" + environment: + HOME: "{{ item.home | default('/home/' ~ item.user, true) }}" + XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}" + changed_when: false + +- name: Install user Flatpaks + community.general.flatpak: + name: + - "{{ item.name }}" + state: present + method: user + remote: "{{ item.remote | default(omit) }}" + from_url: "{{ item.from_url | default(omit) }}" + loop: "{{ users_flatpaks | default([]) }}" + when: + - item.name is defined + - item.name | length > 0 + - item.user is defined + become: true + become_user: "{{ item.user }}" + environment: + HOME: "{{ item.home | default('/home/' ~ item.user, true) }}" + XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}" + """ with open( @@ -1068,10 +1228,67 @@ def _manifest_from_bundle_dir( ) as f: f.write("---\n") + def _fmt_app_list(items: List[Dict[str, Any]]) -> str: + lines = [] + for item in items: + name = item.get("name") + if not name: + continue + detail_parts = [] + for key in ("remote", "channel", "revision", "branch", "arch"): + value = item.get(key) + if value not in (None, "", []): + detail_parts.append(f"{key}={value}") + for key in ("classic", "devmode", "dangerous"): + if item.get(key): + detail_parts.append(key) + details = f" ({', '.join(detail_parts)})" if detail_parts else "" + lines.append(f"- {name}{details}") + return "\n".join(lines) or "- (none)" + + def _fmt_user_flatpaks(items: List[Dict[str, Any]]) -> str: + lines = [] + for item in items: + name = item.get("name") + user = item.get("user") + if not name or not user: + continue + detail_parts = [] + for key in ("remote", "branch", "arch"): + value = item.get(key) + if value not in (None, "", []): + detail_parts.append(f"{key}={value}") + details = f" ({', '.join(detail_parts)})" if detail_parts else "" + lines.append(f"- {user}: {name}{details}") + return "\n".join(lines) or "- (none)" + + def _fmt_remotes(items: List[Dict[str, Any]]) -> str: + lines = [] + for item in items: + name = item.get("name") + url = item.get("url") + method = item.get("method") or "system" + user = item.get("user") + if not name or not url: + continue + owner = f"user={user}" if user else "system" + lines.append(f"- {name} ({method}, {owner}): {url}") + return "\n".join(lines) or "- (none)" + readme = ( """# users -Generated non-system user accounts and SSH public material. +Generated non-system user accounts, SSH public material, and per-user Flatpak +applications/remotes. + +**Note:** User Flatpak tasks require the `community.general` Ansible collection. +Install it with: `ansible-galaxy collection install -r requirements.yml`. + +Flatpak `remote` is harvested from the installed deployment where detectable. +The original `.flatpakref` URL is generally not preserved by Flatpak after +installation, so `from_url` is only emitted if a future/hand-edited state file +contains it. + ## Users """ @@ -1089,6 +1306,14 @@ Generated non-system user accounts and SSH public material. or "- (none)" ) + """\n +## Flatpak remotes +""" + + _fmt_remotes(flatpak_remotes) + + """\n +## User Flatpaks +""" + + _fmt_user_flatpaks(users_flatpaks) + + """\n ## Excluded """ + ( @@ -1106,6 +1331,274 @@ Generated non-system user accounts and SSH public material. manifested_users_roles.append(role) + # ------------------------- + # Flatpak role (system-wide Flatpak remotes and applications) + # ------------------------- + raw_flatpak_apps = flatpak_snapshot.get("system_flatpaks", []) or [] + raw_flatpak_remotes = flatpak_snapshot.get("remotes", []) or [] + + if flatpak_snapshot: + role = flatpak_snapshot.get("role_name", "flatpak") + role_dir = os.path.join(roles_root, role) + _write_role_scaffold(role_dir) + _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) + + flatpak_system_flatpaks = [ + _normalise_flatpak_item(fp, method="system") for fp in raw_flatpak_apps + ] + flatpak_remotes = [_normalise_flatpak_remote(r) for r in raw_flatpak_remotes] + + vars_map = { + "flatpak_system_flatpaks": flatpak_system_flatpaks, + "flatpak_remotes": flatpak_remotes, + } + if site_mode: + _write_role_defaults( + role_dir, + {"flatpak_system_flatpaks": [], "flatpak_remotes": []}, + ) + _write_hostvars(out_dir, fqdn or "", role, vars_map) + else: + _write_role_defaults(role_dir, vars_map) + + with open( + os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write( + "---\n" "dependencies: []\n" "collections:\n" " - community.general\n" + ) + + tasks = """--- + +- name: Ensure system Flatpak remotes exist + ansible.builtin.command: + argv: + - flatpak + - remote-add + - --system + - --if-not-exists + - "{{ item.name }}" + - "{{ item.url }}" + loop: "{{ flatpak_remotes | default([]) }}" + when: + - item.name is defined + - item.url is defined + - item.url | length > 0 + become: true + changed_when: false + +- name: Install system-wide Flatpaks + community.general.flatpak: + name: + - "{{ item.name }}" + state: present + method: system + remote: "{{ item.remote | default(omit) }}" + from_url: "{{ item.from_url | default(omit) }}" + loop: "{{ flatpak_system_flatpaks | default([]) }}" + when: + - item.name is defined + - item.name | length > 0 + become: true +""" + with open( + os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write(tasks) + + with open( + os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write("---\n") + + def _fmt_flatpak_apps(items: List[Dict[str, Any]]) -> str: + lines = [] + for item in items: + name = item.get("name") + if not name: + continue + detail_parts = [] + for key in ("remote", "branch", "arch"): + value = item.get(key) + if value not in (None, "", []): + detail_parts.append(f"{key}={value}") + details = f" ({', '.join(detail_parts)})" if detail_parts else "" + lines.append(f"- {name}{details}") + return "\n".join(lines) or "- (none)" + + def _fmt_flatpak_remotes(items: List[Dict[str, Any]]) -> str: + lines = [] + for item in items: + name = item.get("name") + url = item.get("url") + if not name or not url: + continue + lines.append(f"- {name}: {url}") + return "\n".join(lines) or "- (none)" + + notes = flatpak_snapshot.get("notes", []) or [] + readme = ( + """# flatpak + +Generated system-wide Flatpak remotes and applications. + +**Note:** This role requires the `community.general` Ansible collection. +Install it with: `ansible-galaxy collection install -r requirements.yml`. + +Flatpak `remote` is harvested from the installed deployment where detectable. +The original `.flatpakref` URL is generally not preserved by Flatpak after +installation, so `from_url` is only emitted if a future/hand-edited state file +contains it. + +## System Flatpak remotes +""" + + _fmt_flatpak_remotes(flatpak_remotes) + + """\n +## System-wide Flatpaks +""" + + _fmt_flatpak_apps(flatpak_system_flatpaks) + + """\n +## Notes +""" + + ("\n".join([f"- {n}" for n in notes]) or "- (none)") + + """\n""" + ) + with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: + f.write(readme) + + manifested_flatpak_roles.append(role) + + # ------------------------- + # Snap role (system-wide snap packages) + # ------------------------- + raw_system_snaps = snap_snapshot.get("system_snaps", []) or [] + + if raw_system_snaps: + role = snap_snapshot.get("role_name", "snap") if snap_snapshot else "snap" + role_dir = os.path.join(roles_root, role) + _write_role_scaffold(role_dir) + _ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml")) + + snap_system_snaps = [_normalise_snap_item(s) for s in raw_system_snaps] + + vars_map = {"snap_system_snaps": snap_system_snaps} + if site_mode: + _write_role_defaults(role_dir, {"snap_system_snaps": []}) + _write_hostvars(out_dir, fqdn or "", role, vars_map) + else: + _write_role_defaults(role_dir, vars_map) + + with open( + os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write( + "---\n" "dependencies: []\n" "collections:\n" " - community.general\n" + ) + + tasks = """--- + +- name: Install system-wide snaps with full detected attributes + community.general.snap: + name: + - "{{ item.name }}" + state: present + channel: "{{ item.channel | default(omit) if not (item.install_revision | default(false)) else omit }}" + revision: "{{ item.revision | default(omit) if (item.install_revision | default(false)) else omit }}" + classic: "{{ item.classic | default(false) }}" + devmode: "{{ item.devmode | default(false) }}" + dangerous: "{{ item.dangerous | default(false) }}" + loop: "{{ snap_system_snaps | default([]) }}" + when: + - item.name is defined + - item.name | length > 0 + become: true + register: _enroll_snap_full_results + ignore_errors: true + +- name: Install system-wide snaps with compatibility options + community.general.snap: + name: + - "{{ item.item.name }}" + state: present + channel: "{{ item.item.channel | default(omit) if not (item.item.install_revision | default(false)) else omit }}" + classic: "{{ item.item.classic | default(false) }}" + loop: "{{ (_enroll_snap_full_results | default({})).results | default([]) }}" + when: + - item.failed | default(false) + - item.item.name is defined + - item.item.name | length > 0 + become: true + register: _enroll_snap_compat_results + ignore_errors: true + +- name: Install system-wide snaps with minimal options + community.general.snap: + name: + - "{{ item.item.item.name }}" + state: present + loop: "{{ (_enroll_snap_compat_results | default({})).results | default([]) }}" + when: + - item.failed | default(false) + - item.item.item.name is defined + - item.item.item.name | length > 0 + become: true + ignore_errors: true +""" + with open( + os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write(tasks) + + with open( + os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" + ) as f: + f.write("---\n") + + def _fmt_snap_apps(items: List[Dict[str, Any]]) -> str: + lines = [] + for item in items: + name = item.get("name") + if not name: + continue + detail_parts = [] + for key in ("channel", "revision"): + value = item.get(key) + if value not in (None, "", []): + detail_parts.append(f"{key}={value}") + for key in ("classic", "devmode", "dangerous"): + if item.get(key): + detail_parts.append(key) + details = f" ({', '.join(detail_parts)})" if detail_parts else "" + lines.append(f"- {name}{details}") + return "\n".join(lines) or "- (none)" + + notes = snap_snapshot.get("notes", []) or [] + readme = ( + """# snap + +Generated system-wide snap packages. + +**Note:** This role requires the `community.general` Ansible collection. +Install it with: `ansible-galaxy collection install -r requirements.yml`. + +The first install task uses all harvested attributes. If the installed +`community.general.snap` module is too old for some parameters, the generated +role falls back to reduced then minimal install tasks on a best-effort basis. + +## System-wide snaps +""" + + _fmt_snap_apps(snap_system_snaps) + + """\n +## Notes +""" + + ("\n".join([f"- {n}" for n in notes]) or "- (none)") + + """\n""" + ) + with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: + f.write(readme) + + manifested_snap_roles.append(role) + # ------------------------- # apt_config role (APT sources, pinning, and keyrings) # ------------------------- @@ -1880,7 +2373,8 @@ User-requested extra file harvesting. # Service roles # ------------------------- for svc in services: - role = svc["role_name"] + source_role = svc["role_name"] + role = avoid_reserved_role_name(source_role, prefix="service") unit = svc["unit"] pkgs = svc.get("packages", []) or [] managed_files = svc.get("managed_files", []) or [] @@ -1899,7 +2393,7 @@ User-requested extra file harvesting. templated, jt_vars = _jinjify_managed_files( bundle_dir, - role, + source_role, role_dir, managed_files, jt_exe=jt_exe, @@ -1911,14 +2405,14 @@ User-requested extra file harvesting. if site_mode: _copy_artifacts( bundle_dir, - role, + source_role, _host_role_files_dir(out_dir, fqdn or "", role), exclude_rels=templated, ) else: _copy_artifacts( bundle_dir, - role, + source_role, os.path.join(role_dir, "files"), exclude_rels=templated, ) @@ -2152,7 +2646,8 @@ This role was created by merging simple packages using the `--merge-simple-packa # Process package roles (those with configuration files) for pr in package_roles: - role = pr["role_name"] + source_role = pr["role_name"] + role = avoid_reserved_role_name(source_role, prefix="package") pkg = pr.get("package") or "" managed_files = pr.get("managed_files", []) or [] managed_dirs = pr.get("managed_dirs", []) or [] @@ -2165,7 +2660,7 @@ This role was created by merging simple packages using the `--merge-simple-packa templated, jt_vars = _jinjify_managed_files( bundle_dir, - role, + source_role, role_dir, managed_files, jt_exe=jt_exe, @@ -2177,14 +2672,14 @@ This role was created by merging simple packages using the `--merge-simple-packa if site_mode: _copy_artifacts( bundle_dir, - role, + source_role, _host_role_files_dir(out_dir, fqdn or "", role), exclude_rels=templated, ) else: _copy_artifacts( bundle_dir, - role, + source_role, os.path.join(role_dir, "files"), exclude_rels=templated, ) @@ -2294,6 +2789,8 @@ Generated for package `{pkg}`. + manifested_etc_custom_roles + manifested_usr_local_custom_roles + manifested_extra_paths_roles + + manifested_flatpak_roles + + manifested_snap_roles + manifested_users_roles + tail_roles + manifested_firewall_runtime_roles diff --git a/enroll/role_names.py b/enroll/role_names.py new file mode 100644 index 0000000..b3fa584 --- /dev/null +++ b/enroll/role_names.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +RESERVED_SINGLETON_ROLE_NAMES = { + "users", + "flatpak", + "snap", + "apt_config", + "dnf_config", + "firewall_runtime", + "etc_custom", + "usr_local_custom", + "extra_paths", + "common_packages", +} + + +def avoid_reserved_role_name(role_name: str, *, prefix: str) -> str: + """Return a role name that cannot collide with singleton roles. + + Singleton roles are generated once per manifest from dedicated top-level + state sections. Package and service roles can naturally have the same names + as those singletons, e.g. the OS package named ``flatpak``. Prefix those + generated package/service roles so they cannot overwrite singleton role + directories during manifestation. + """ + if role_name in RESERVED_SINGLETON_ROLE_NAMES: + return f"{prefix}_{role_name}" + return role_name diff --git a/enroll/schema/state.schema.json b/enroll/schema/state.schema.json index d8c136a..482b014 100644 --- a/enroll/schema/state.schema.json +++ b/enroll/schema/state.schema.json @@ -575,6 +575,21 @@ "$ref": "#/$defs/UserEntry" }, "type": "array" + }, + "user_flatpaks": { + "additionalProperties": { + "type": "array", + "items": { + "$ref": "#/$defs/FlatpakInstall" + } + }, + "type": "object" + }, + "user_flatpak_remotes": { + "type": "array", + "items": { + "$ref": "#/$defs/FlatpakRemote" + } } }, "required": [ @@ -656,6 +671,224 @@ "notes" ], "type": "object" + }, + "FlatpakInstall": { + "additionalProperties": false, + "properties": { + "name": { + "type": "string", + "minLength": 1 + }, + "method": { + "type": "string", + "enum": [ + "system", + "user" + ] + }, + "remote": { + "type": [ + "string", + "null" + ] + }, + "branch": { + "type": [ + "string", + "null" + ] + }, + "arch": { + "type": [ + "string", + "null" + ] + }, + "kind": { + "type": [ + "string", + "null" + ], + "enum": [ + "app", + "runtime", + null + ] + }, + "ref": { + "type": [ + "string", + "null" + ] + }, + "user": { + "type": [ + "string", + "null" + ] + }, + "home": { + "type": [ + "string", + "null" + ] + }, + "source": { + "type": "string" + }, + "from_url": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "name", + "method" + ], + "type": "object" + }, + "FlatpakRemote": { + "additionalProperties": false, + "properties": { + "name": { + "type": "string", + "minLength": 1 + }, + "method": { + "type": "string", + "enum": [ + "system", + "user" + ] + }, + "url": { + "type": "string", + "minLength": 1 + }, + "user": { + "type": [ + "string", + "null" + ] + }, + "home": { + "type": [ + "string", + "null" + ] + }, + "source": { + "type": "string" + } + }, + "required": [ + "name", + "method", + "url" + ], + "type": "object" + }, + "SnapInstall": { + "additionalProperties": false, + "properties": { + "name": { + "type": "string", + "minLength": 1 + }, + "channel": { + "type": [ + "string", + "null" + ] + }, + "revision": { + "type": [ + "integer", + "null" + ], + "minimum": 0 + }, + "classic": { + "type": "boolean" + }, + "devmode": { + "type": "boolean" + }, + "dangerous": { + "type": "boolean" + }, + "notes": { + "type": "array", + "items": { + "type": "string" + } + }, + "source": { + "type": "string" + }, + "install_revision": { + "type": "boolean" + } + }, + "required": [ + "name" + ], + "type": "object" + }, + "FlatpakSnapshot": { + "additionalProperties": false, + "properties": { + "role_name": { + "const": "flatpak" + }, + "system_flatpaks": { + "type": "array", + "items": { + "$ref": "#/$defs/FlatpakInstall" + } + }, + "remotes": { + "type": "array", + "items": { + "$ref": "#/$defs/FlatpakRemote" + } + }, + "notes": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "role_name" + ], + "type": "object" + + }, + "SnapSnapshot": { + "additionalProperties": false, + "properties": { + "role_name": { + "const": "snap" + }, + "system_snaps": { + "type": "array", + "items": { + "$ref": "#/$defs/SnapInstall" + } + }, + "notes": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "role_name" + ], + "type": "object" } }, "$id": "https://enroll.sh/schema/state.schema.json", @@ -766,6 +999,12 @@ }, "firewall_runtime": { "$ref": "#/$defs/FirewallRuntimeSnapshot" + }, + "flatpak": { + "$ref": "#/$defs/FlatpakSnapshot" + }, + "snap": { + "$ref": "#/$defs/SnapSnapshot" } }, "required": [ diff --git a/tests/test_accounts.py b/tests/test_accounts.py index 36e5af9..a78c5f6 100644 --- a/tests/test_accounts.py +++ b/tests/test_accounts.py @@ -312,3 +312,185 @@ def test_parse_group_handles_short_lines(tmp_path: Path): assert 1000 in gid_to_name assert 1001 not in gid_to_name # skipped due to short line assert 1002 in gid_to_name + + +def test_find_flatpaks_in_root_detects_remote_branch_and_arch(tmp_path: Path): + import enroll.accounts as a + + root = tmp_path / "flatpak" + (root / "repo").mkdir(parents=True) + (root / "repo" / "config").write_text( + '[remote "acme"]\nurl=https://flatpak.example/repo/\n', + encoding="utf-8", + ) + ref = ( + root + / "repo" + / "refs" + / "remotes" + / "acme" + / "app" + / "com.example.App" + / "x86_64" + / "stable" + ) + ref.parent.mkdir(parents=True) + ref.write_text("checksum\n", encoding="utf-8") + active = root / "app" / "com.example.App" / "x86_64" / "stable" / "active" + active.mkdir(parents=True) + + remotes = a.find_flatpak_remotes(str(root), method="system") + assert [(r.name, r.url, r.method) for r in remotes] == [ + ("acme", "https://flatpak.example/repo/", "system") + ] + + apps = a._find_flatpaks_in_root(str(root), method="system") + assert len(apps) == 1 + assert apps[0].name == "com.example.App" + assert apps[0].remote == "acme" + assert apps[0].branch == "stable" + assert apps[0].arch == "x86_64" + + +def test_parse_snap_list_output_detects_channel_revision_and_modes(): + import enroll.accounts as a + + output = """Name Version Rev Tracking Publisher Notes +code abc 123 latest/stable vscode✓ classic +mydev 1.0 42 latest/edge example devmode,dangerous +bare 1.0 5 latest/stable canonical✓ base +""" + + snaps = {snap.name: snap for snap in a._parse_snap_list_output(output)} + assert snaps["code"].channel == "latest/stable" + assert snaps["code"].revision == 123 + assert snaps["code"].classic is True + assert snaps["mydev"].devmode is True + assert snaps["mydev"].dangerous is True + assert snaps["bare"].notes == ["base"] + + +def test_parse_flatpak_list_output_detects_system_refs(): + from enroll.accounts import _parse_flatpak_list_output + + output = "\n".join( + [ + "app/org.example.App/x86_64/stable\tflathub\tstable\tx86_64", + "runtime/org.freedesktop.Platform/x86_64/24.08\tflathub\t24.08\tx86_64", + ] + ) + + refs = _parse_flatpak_list_output( + output, method="system", columns=("ref", "origin", "branch", "arch") + ) + + assert [(r.kind, r.name, r.remote, r.branch, r.arch) for r in refs] == [ + ("app", "org.example.App", "flathub", "stable", "x86_64"), + ("runtime", "org.freedesktop.Platform", "flathub", "24.08", "x86_64"), + ] + assert refs[0].source == "flatpak-list" + + +def test_find_system_flatpaks_prefers_flatpak_list(monkeypatch): + import subprocess + import enroll.accounts as a + + calls = [] + + def fake_run(args, **kwargs): + calls.append(args) + if args == ["flatpak", "list", "--columns=help"]: + return subprocess.CompletedProcess( + args, + 0, + stdout="application\norigin\nbranch\narch\n", + stderr="", + ) + return subprocess.CompletedProcess( + args, + 0, + stdout="app/org.example.App/x86_64/stable\tacme\tstable\tx86_64\n", + stderr="", + ) + + monkeypatch.setattr(a.shutil, "which", lambda cmd: "/usr/bin/flatpak") + monkeypatch.setattr(a.subprocess, "run", fake_run) + monkeypatch.setattr( + a, + "_find_flatpaks_in_root", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("fallback used")), + ) + + refs = a.find_system_flatpaks() + + assert calls[0] == ["flatpak", "list", "--columns=help"] + assert calls[1][:3] == ["flatpak", "list", "--system"] + assert refs[0].name == "org.example.App" + assert refs[0].method == "system" + assert refs[0].remote == "acme" + + +def test_parse_flatpak_list_output_detects_application_columns(): + from enroll.accounts import _parse_flatpak_list_output + + output = "org.example.App\tflathub\tstable\tx86_64\n" + refs = _parse_flatpak_list_output( + output, method="system", columns=("application", "origin", "branch", "arch") + ) + + assert len(refs) == 1 + assert refs[0].name == "org.example.App" + assert refs[0].kind is None + assert refs[0].remote == "flathub" + assert refs[0].branch == "stable" + assert refs[0].arch == "x86_64" + + +def test_parse_plain_flatpak_list_output_like_default_table(): + from enroll.accounts import _parse_flatpak_list_output + + output = """Name Application ID Version Branch Installation +Mesa org.freedesktop.Platform.GL.default 26.0.6 25.08 system +Mesa (Extra) org.freedesktop.Platform.GL.default 26.0.6 25.08-extra system +Codecs Extra Extension org.freedesktop.Platform.codecs-extra 25.08-extra system +KDE Application Platform org.kde.Platform 6.10 system +OnionShare org.onionshare.OnionShare 2.6.4 stable system +""" + + refs = _parse_flatpak_list_output(output, method="system", columns=None) + by_name_branch = {(r.name, r.branch) for r in refs} + + assert ("org.onionshare.OnionShare", "stable") in by_name_branch + assert ("org.freedesktop.Platform.GL.default", "25.08") in by_name_branch + assert ("org.freedesktop.Platform.GL.default", "25.08-extra") in by_name_branch + assert ("org.kde.Platform", "6.10") in by_name_branch + + +def test_parse_flatpak_columns_help_handles_description_table(): + from enroll.accounts import _parse_flatpak_columns_help + + output = """ +Available columns: + application The application ID + branch The branch + installation The installation +""" + + assert _parse_flatpak_columns_help(output) >= { + "application", + "branch", + "installation", + } + + +def test_flatpak_list_attempts_respect_supported_columns(): + from enroll.accounts import _flatpak_list_attempts + + attempts = _flatpak_list_attempts( + "--system", {"application", "branch", "installation"} + ) + command_strings = [" ".join(args) for args, _columns in attempts] + + assert any("--columns=application,branch" in cmd for cmd in command_strings) + assert not any("origin" in cmd for cmd in command_strings) + assert command_strings[-1] == "flatpak list --system" diff --git a/tests/test_harvest.py b/tests/test_harvest.py index 93dfd90..1b50da5 100644 --- a/tests/test_harvest.py +++ b/tests/test_harvest.py @@ -224,6 +224,19 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( monkeypatch.setattr(harvest, "collect_non_system_users", lambda: []) + import enroll.accounts as accounts + + monkeypatch.setattr(accounts, "find_system_flatpaks", lambda: []) + monkeypatch.setattr(accounts, "find_system_flatpak_remotes", lambda: []) + monkeypatch.setattr( + accounts, "find_user_flatpak_remotes", lambda home, user=None: [] + ) + monkeypatch.setattr( + accounts, + "find_system_snaps", + lambda: [accounts.SnapInstall(name="code", channel="latest/stable")], + ) + def fake_stat_triplet(p: str): if p == "/usr/local/bin/myscript": return ("root", "root", "0755") @@ -259,6 +272,9 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom( for o in openvpn_obs ) + assert st["roles"]["snap"]["role_name"] == "snap" + assert st["roles"]["snap"]["system_snaps"][0]["name"] == "code" + # Service role captured modified conffile svc = st["roles"]["services"][0] assert svc["unit"] == "openvpn.service" diff --git a/tests/test_harvest_helpers.py b/tests/test_harvest_helpers.py index a0d2c91..53e7d58 100644 --- a/tests/test_harvest_helpers.py +++ b/tests/test_harvest_helpers.py @@ -286,3 +286,20 @@ def test_collect_firewall_runtime_snapshot_is_per_family_fallback( assert ( tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "iptables.v6" ).exists() + + +def test_package_role_names_do_not_collide_with_singleton_roles(): + from enroll.harvest import _role_name_from_pkg + + assert _role_name_from_pkg("flatpak") == "package_flatpak" + assert _role_name_from_pkg("snap") == "package_snap" + assert _role_name_from_pkg("users") == "package_users" + assert _role_name_from_pkg("nginx") == "nginx" + + +def test_service_role_names_do_not_collide_with_singleton_roles(): + from enroll.harvest import _role_name_from_unit + + assert _role_name_from_unit("flatpak.service") == "service_flatpak" + assert _role_name_from_unit("users.service") == "service_users" + assert _role_name_from_unit("nginx.service") == "nginx" diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 1b78bcf..a8eaf0f 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -1064,3 +1064,317 @@ def test_render_firewall_runtime_tasks_with_ipv6(): } result = manifest._render_firewall_runtime_tasks(state) assert len(result) >= 1 + + +def test_manifest_renders_flatpak_and_snap_details(tmp_path: Path): + bundle = tmp_path / "bundle" + out = tmp_path / "ansible" + state = { + "schema_version": 3, + "host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"}, + "inventory": {"packages": {}}, + "roles": { + "users": { + "role_name": "users", + "users": [ + { + "name": "alice", + "uid": 1000, + "gid": 1000, + "gecos": "Alice", + "home": "/home/alice", + "shell": "/bin/bash", + "primary_group": "alice", + "supplementary_groups": [], + } + ], + "managed_files": [], + "excluded": [], + "notes": [], + "user_flatpak_remotes": [ + { + "name": "acme-user", + "method": "user", + "url": "https://flatpak.example/user-repo/", + "user": "alice", + "home": "/home/alice", + }, + ], + "user_flatpaks": { + "alice": [ + { + "name": "org.example.UserApp", + "method": "user", + "remote": "acme-user", + "branch": "stable", + "arch": "x86_64", + } + ] + }, + }, + "flatpak": { + "role_name": "flatpak", + "remotes": [ + { + "name": "acme", + "method": "system", + "url": "https://flatpak.example/repo/", + }, + ], + "system_flatpaks": [ + { + "name": "com.example.App", + "method": "system", + "remote": "acme", + "branch": "stable", + "arch": "x86_64", + } + ], + "notes": [], + }, + "snap": { + "role_name": "snap", + "system_snaps": [ + { + "name": "code", + "channel": "latest/stable", + "revision": 123, + "classic": True, + "notes": ["classic"], + } + ], + "notes": [], + }, + "services": [], + "packages": [], + "apt_config": { + "role_name": "apt_config", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "dnf_config": { + "role_name": "dnf_config", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "etc_custom": { + "role_name": "etc_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "usr_local_custom": { + "role_name": "usr_local_custom", + "managed_files": [], + "excluded": [], + "notes": [], + }, + "extra_paths": { + "role_name": "extra_paths", + "include_patterns": [], + "exclude_patterns": [], + "managed_files": [], + "excluded": [], + "notes": [], + }, + }, + } + bundle.mkdir(parents=True, exist_ok=True) + (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + + manifest.manifest(str(bundle), str(out)) + + users_defaults = (out / "roles" / "users" / "defaults" / "main.yml").read_text( + encoding="utf-8" + ) + users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text( + encoding="utf-8" + ) + users_readme = (out / "roles" / "users" / "README.md").read_text(encoding="utf-8") + flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text( + encoding="utf-8" + ) + flatpak_tasks = (out / "roles" / "flatpak" / "tasks" / "main.yml").read_text( + encoding="utf-8" + ) + snap_defaults = (out / "roles" / "snap" / "defaults" / "main.yml").read_text( + encoding="utf-8" + ) + snap_tasks = (out / "roles" / "snap" / "tasks" / "main.yml").read_text( + encoding="utf-8" + ) + + assert "users_flatpak_remotes:" in users_defaults + assert "remote: acme-user" in users_defaults + assert "community.general.snap" not in users_tasks + assert "Install system-wide snaps" not in users_tasks + assert "Install system-wide Flatpaks" not in users_tasks + assert "ansible-galaxy collection install -r requirements.yml" in users_readme + + assert "snap_system_snaps:" in snap_defaults + assert "channel: latest/stable" in snap_defaults + assert "classic: true" in snap_defaults + assert "community.general.snap" in snap_tasks + assert "Install system-wide snaps with full detected attributes" in snap_tasks + assert "Install system-wide snaps with compatibility options" in snap_tasks + assert "Install system-wide snaps with minimal options" in snap_tasks + assert "ignore_errors: true" in snap_tasks + + assert "flatpak_system_flatpaks:" in flatpak_defaults + assert "remote: acme" in flatpak_defaults + assert "community.general.flatpak" in flatpak_tasks + assert "Install system-wide Flatpaks" in flatpak_tasks + assert (out / "requirements.yml").exists() + + +def test_users_role_without_portable_apps_omits_community_general_tasks(tmp_path): + bundle = tmp_path / "bundle" + out = tmp_path / "out" + state = { + "roles": { + "users": { + "role_name": "users", + "users": [ + { + "name": "alice", + "uid": 1000, + "gid": 1000, + "gecos": "Alice", + "home": "/home/alice", + "shell": "/bin/bash", + "primary_group": "alice", + "supplementary_groups": [], + } + ], + "managed_files": [], + "excluded": [], + "notes": [], + }, + "services": [], + "packages": [], + }, + } + bundle.mkdir(parents=True, exist_ok=True) + (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + + manifest.manifest(str(bundle), str(out)) + + users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text( + encoding="utf-8" + ) + users_meta = (out / "roles" / "users" / "meta" / "main.yml").read_text( + encoding="utf-8" + ) + + assert "community.general.flatpak" not in users_tasks + assert "community.general.snap" not in users_tasks + assert "collections:" not in users_meta + + +def test_manifest_emits_flatpak_role_even_when_no_flatpaks(tmp_path): + bundle = tmp_path / "bundle" + out = tmp_path / "out" + state = { + "roles": { + "users": { + "role_name": "users", + "users": [], + "managed_files": [], + "excluded": [], + "notes": [], + "user_flatpaks": {}, + "user_flatpak_remotes": [], + }, + "flatpak": { + "role_name": "flatpak", + "system_flatpaks": [], + "remotes": [], + "notes": [], + }, + "services": [], + "packages": [], + } + } + bundle.mkdir(parents=True, exist_ok=True) + (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + + manifest.manifest(str(bundle), str(out)) + + flatpak_tasks = (out / "roles" / "flatpak" / "tasks" / "main.yml").read_text( + encoding="utf-8" + ) + flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text( + encoding="utf-8" + ) + + assert "flatpak_system_flatpaks: []" in flatpak_defaults + assert "flatpak_remotes: []" in flatpak_defaults + assert "Install system-wide Flatpaks" in flatpak_tasks + assert "Ensure system Flatpak remotes exist" in flatpak_tasks + + +def test_manifest_avoids_package_role_collision_with_flatpak_singleton(tmp_path): + bundle = tmp_path / "bundle" + out = tmp_path / "out" + state = { + "roles": { + "users": { + "role_name": "users", + "users": [], + "managed_files": [], + "excluded": [], + "notes": [], + "user_flatpaks": {}, + "user_flatpak_remotes": [], + }, + "flatpak": { + "role_name": "flatpak", + "remotes": [ + { + "name": "flathub", + "method": "system", + "url": "https://dl.flathub.org/repo/", + } + ], + "system_flatpaks": [ + { + "name": "org.onionshare.OnionShare", + "method": "system", + "remote": "flathub", + "branch": "stable", + "arch": "x86_64", + } + ], + "notes": [], + }, + "services": [], + "packages": [ + { + "package": "flatpak", + "role_name": "flatpak", + "managed_files": [], + "managed_dirs": [], + "managed_links": [], + "excluded": [], + "notes": [], + "has_config": True, + } + ], + } + } + bundle.mkdir(parents=True, exist_ok=True) + (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") + + manifest.manifest(str(bundle), str(out)) + + flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text( + encoding="utf-8" + ) + playbook = (out / "playbook.yml").read_text(encoding="utf-8") + + assert "org.onionshare.OnionShare" in flatpak_defaults + assert (out / "roles" / "package_flatpak" / "tasks" / "main.yml").exists() + assert "role: flatpak" in playbook + assert "role: package_flatpak" in playbook