Compare commits

..

2 commits

Author SHA1 Message Date
eb1d096c90
Add support for detecting flatpaks and snaps
Some checks failed
CI / test (push) Failing after 5m51s
Lint / test (push) Successful in 43s
2026-06-14 18:25:26 +10:00
11351cce87
Fix test 2026-06-14 16:23:06 +10:00
11 changed files with 2042 additions and 23 deletions

View file

@ -1,5 +1,6 @@
# 0.7.0 # 0.7.0
* Add support for detecting flatpaks and snaps
* Add --merge-simple-packages to reduce the number of roles, for packages that have no config files or services to maintain. * Add --merge-simple-packages to reduce the number of roles, for packages that have no config files or services to maintain.
# 0.6.0 # 0.6.0

View file

@ -1,8 +1,48 @@
from __future__ import annotations from __future__ import annotations
import configparser
import os import os
from dataclasses import dataclass import re
from typing import Dict, List, Set, Tuple import shutil
import subprocess # nosec
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple
@dataclass
class FlatpakInstall:
name: str
method: str
remote: Optional[str] = None
branch: Optional[str] = None
arch: Optional[str] = None
kind: Optional[str] = None
ref: Optional[str] = None
user: Optional[str] = None
home: Optional[str] = None
source: str = "filesystem"
@dataclass
class FlatpakRemote:
name: str
method: str
url: str
user: Optional[str] = None
home: Optional[str] = None
source: str = "filesystem"
@dataclass
class SnapInstall:
name: str
channel: Optional[str] = None
revision: Optional[int] = None
classic: bool = False
devmode: bool = False
dangerous: bool = False
notes: List[str] = field(default_factory=list)
source: str = "snap-list"
@dataclass @dataclass
@ -16,6 +56,7 @@ class UserRecord:
primary_group: str primary_group: str
supplementary_groups: List[str] supplementary_groups: List[str]
ssh_files: List[str] ssh_files: List[str]
flatpaks: List[FlatpakInstall] = field(default_factory=list)
def parse_login_defs(path: str = "/etc/login.defs") -> Dict[str, int]: def parse_login_defs(path: str = "/etc/login.defs") -> Dict[str, int]:
@ -115,6 +156,612 @@ def find_user_ssh_files(home: str) -> List[str]:
return sorted(set(out)) return sorted(set(out))
def _read_first_existing_text(paths: List[str]) -> Optional[str]:
for path in paths:
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
value = f.read().strip()
if value:
return value
except OSError:
continue
return None
def _parse_flatpak_ref(
ref: str,
) -> Tuple[Optional[str], str, Optional[str], Optional[str]]:
"""Return (kind, name, arch, branch) for a Flatpak ref.
refs look like app/org.example.App/x86_64/stable or
runtime/org.example.Platform/x86_64/23.08. If the value is already just an
application/runtime ID, keep it as the name and leave the other fields empty.
"""
parts = [p for p in (ref or "").strip().split("/") if p]
if len(parts) >= 4 and parts[0] in {"app", "runtime"}:
return parts[0], parts[1], parts[2], parts[3]
return None, (ref or "").strip(), None, None
def _parse_plain_flatpak_list_output(
output: str,
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> List[FlatpakInstall]:
"""Parse default `flatpak list` table output.
Example:
Name Application ID Version Branch Installation
OnionShare org.onionshare.OnionShare 2.6.4 stable system
"""
out: List[FlatpakInstall] = []
seen: Set[
Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]]
] = set()
id_re = re.compile(r"\b(?:[A-Za-z0-9_-]+\.)+[A-Za-z0-9_-]+\b")
for line in output.splitlines():
line = line.rstrip()
if not line.strip():
continue
if "Application ID" in line and "Installation" in line:
continue
match = id_re.search(line)
if not match:
continue
name = match.group(0)
tail = line[match.end() :].split()
installation = tail[-1] if tail else ""
if installation in {"system", "user"} and installation != method:
continue
branch = None
if len(tail) >= 2 and tail[-1] in {"system", "user"}:
branch = tail[-2]
elif tail:
branch = tail[-1]
key = (name, None, branch, None, None)
if key in seen:
continue
seen.add(key)
out.append(
FlatpakInstall(
name=name,
method=method,
remote=None,
branch=branch,
arch=None,
kind=None,
ref=None,
user=user,
home=home,
source="flatpak-list",
)
)
return sorted(out, key=lambda f: (f.name, f.branch or ""))
def _parse_flatpak_list_output(
output: str,
*,
method: str,
columns: Optional[Tuple[str, ...]] = None,
user: Optional[str] = None,
home: Optional[str] = None,
) -> List[FlatpakInstall]:
"""Parse Flatpak list output.
If columns is None, parse the default table. Otherwise columns names must
match the order passed to `flatpak list --columns=...`.
"""
if columns is None:
return _parse_plain_flatpak_list_output(
output, method=method, user=user, home=home
)
out: List[FlatpakInstall] = []
seen: Set[
Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]]
] = set()
for line in output.splitlines():
line = line.strip()
if not line:
continue
lower = line.lower()
if lower.startswith("ref") or lower.startswith("application id"):
continue
parts = line.split("\t")
if len(parts) < len(columns):
parts = line.split()
if not parts:
continue
fields = {
name: parts[idx].strip()
for idx, name in enumerate(columns)
if idx < len(parts)
}
ref = fields.get("ref") or fields.get("application") or ""
kind, name, ref_arch, ref_branch = _parse_flatpak_ref(ref)
if not name:
continue
remote = fields.get("origin") or None
branch = fields.get("branch") or ref_branch
arch = fields.get("arch") or ref_arch
if remote in {"", "-"}:
remote = None
if branch in {"", "-"}:
branch = None
if arch in {"", "-"}:
arch = None
key = (name, remote, branch, arch, kind)
if key in seen:
continue
seen.add(key)
out.append(
FlatpakInstall(
name=name,
method=method,
remote=remote,
branch=branch,
arch=arch,
kind=kind,
ref=ref if "/" in ref else None,
user=user,
home=home,
source="flatpak-list",
)
)
return sorted(
out,
key=lambda f: (
f.kind or "",
f.name,
f.remote or "",
f.branch or "",
f.arch or "",
),
)
_KNOWN_FLATPAK_LIST_COLUMNS = {
"name",
"description",
"application",
"version",
"branch",
"arch",
"origin",
"installation",
"ref",
"active",
"latest",
"size",
"options",
}
def _parse_flatpak_columns_help(output: str) -> Set[str]:
"""Parse `flatpak list --columns=help` output into supported fields."""
supported: Set[str] = set()
for line in output.splitlines():
# Help output varies a bit between Flatpak versions. Treat any known
# token as a supported field, whether it appears alone or in a
# description table.
for token in re.findall(r"[A-Za-z_][A-Za-z0-9_-]*", line.lower()):
if token in _KNOWN_FLATPAK_LIST_COLUMNS:
supported.add(token)
return supported
def _run_flatpak_columns_help() -> Optional[Set[str]]:
if shutil.which("flatpak") is None:
return None
try:
proc = subprocess.run( # nosec
["flatpak", "list", "--columns=help"],
shell=False,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10,
)
except Exception:
return None
if proc.returncode != 0:
return None
supported = _parse_flatpak_columns_help(proc.stdout or "")
return supported or None
def _flatpak_list_attempts(
scope: str, supported: Optional[Set[str]]
) -> List[Tuple[List[str], Optional[Tuple[str, ...]]]]:
def supported_columns(*wanted: str) -> Optional[Tuple[str, ...]]:
if supported is not None and not set(wanted).issubset(supported):
return None
return tuple(wanted)
column_sets: List[Tuple[str, ...]] = []
for wanted in (
("application", "origin", "branch", "arch"),
("application", "branch", "arch"),
("application", "branch"),
("application",),
("ref", "origin", "branch", "arch"),
("ref", "branch", "arch"),
("ref", "branch"),
("ref",),
):
cols = supported_columns(*wanted)
if cols is not None and cols not in column_sets:
column_sets.append(cols)
attempts: List[Tuple[List[str], Optional[Tuple[str, ...]]]] = [
(
["flatpak", "list", scope, "--columns=" + ",".join(cols)],
cols,
)
for cols in column_sets
]
attempts.append((["flatpak", "list", scope], None))
return attempts
def _run_flatpak_list(method: str) -> Optional[Tuple[str, Optional[Tuple[str, ...]]]]:
if shutil.which("flatpak") is None:
return None
scope = "--system" if method == "system" else "--user"
supported = _run_flatpak_columns_help()
for args, columns in _flatpak_list_attempts(scope, supported):
try:
proc = subprocess.run( # nosec
args,
shell=False,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10,
)
except Exception: # nosec B112
continue
if proc.returncode == 0:
return proc.stdout or "", columns
return None
def _flatpak_remote_from_ref(
flatpak_root: str, app_id: str, arch: str, branch: str, remote_names: List[str]
) -> Optional[str]:
for remote_name in remote_names:
ref = os.path.join(
flatpak_root,
"repo",
"refs",
"remotes",
remote_name,
"app",
app_id,
arch,
branch,
)
if os.path.exists(ref):
return remote_name
return None
def _parse_flatpak_deploy_origin(branch_dir: str) -> Optional[str]:
active_dir = os.path.join(branch_dir, "active")
candidates = [
os.path.join(active_dir, "origin"),
os.path.join(active_dir, "metadata"),
]
origin = _read_first_existing_text([candidates[0]])
if origin:
return origin
metadata = candidates[1]
if os.path.isfile(metadata):
parser = configparser.ConfigParser(interpolation=None)
try:
parser.read(metadata, encoding="utf-8")
except Exception:
return None
for section in ("Application", "Runtime"):
if parser.has_option(section, "origin"):
value = parser.get(section, "origin", fallback="").strip()
if value:
return value
return None
def _find_flatpaks_in_root(
flatpak_root: str,
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> List[FlatpakInstall]:
apps_dir = os.path.join(flatpak_root, "app")
if not os.path.isdir(apps_dir):
return []
remote_names = [
r.name
for r in find_flatpak_remotes(flatpak_root, method=method, user=user, home=home)
]
out: List[FlatpakInstall] = []
try:
app_ids = sorted(os.listdir(apps_dir))
except OSError:
return []
seen: Set[Tuple[str, Optional[str], Optional[str], Optional[str]]] = set()
for app_id in app_ids:
app_path = os.path.join(apps_dir, app_id)
if not os.path.isdir(app_path):
continue
try:
arches = sorted(os.listdir(app_path))
except OSError:
continue
for arch in arches:
arch_path = os.path.join(app_path, arch)
if not os.path.isdir(arch_path):
continue
try:
branches = sorted(os.listdir(arch_path))
except OSError:
continue
for branch in branches:
branch_path = os.path.join(arch_path, branch)
if not os.path.isdir(branch_path):
continue
active_dir = os.path.join(branch_path, "active")
if not os.path.exists(active_dir):
continue
remote = _parse_flatpak_deploy_origin(branch_path)
if not remote:
remote = _flatpak_remote_from_ref(
flatpak_root, app_id, arch, branch, remote_names
)
key = (app_id, remote, branch, arch)
if key in seen:
continue
seen.add(key)
out.append(
FlatpakInstall(
name=app_id,
method=method,
remote=remote,
branch=branch or None,
arch=arch or None,
kind="app",
ref=f"app/{app_id}/{arch}/{branch}",
user=user,
home=home,
)
)
return sorted(
out, key=lambda f: (f.name, f.remote or "", f.branch or "", f.arch or "")
)
def find_flatpak_remotes(
flatpak_root: str,
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> List[FlatpakRemote]:
"""Return configured Flatpak remotes for a Flatpak installation root.
Flatpak stores remotes in the OSTree repo config. This gives us the remote
names and repository URLs. It does not reliably preserve the original
.flatpakref/.flatpakrepo URL that was used during installation.
"""
config_path = os.path.join(flatpak_root, "repo", "config")
if not os.path.isfile(config_path):
return []
parser = configparser.ConfigParser(interpolation=None, strict=False)
try:
parser.read(config_path, encoding="utf-8")
except Exception:
return []
out: List[FlatpakRemote] = []
for section in parser.sections():
match = re.fullmatch(r'remote\s+"(.+)"', section)
if not match:
continue
name = match.group(1).strip()
url = parser.get(section, "url", fallback="").strip()
if not name or not url:
continue
out.append(
FlatpakRemote(
name=name,
method=method,
url=url,
user=user,
home=home,
)
)
return sorted(out, key=lambda r: (r.method, r.user or "", r.name))
def find_user_flatpaks(home: str, user: Optional[str] = None) -> List[FlatpakInstall]:
"""Return per-user Flatpak applications installed under a home directory."""
flatpak_root = os.path.join(home, ".local", "share", "flatpak")
return _find_flatpaks_in_root(flatpak_root, method="user", user=user, home=home)
def find_user_flatpak_remotes(
home: str, user: Optional[str] = None
) -> List[FlatpakRemote]:
flatpak_root = os.path.join(home, ".local", "share", "flatpak")
return find_flatpak_remotes(flatpak_root, method="user", user=user, home=home)
def find_system_flatpaks() -> List[FlatpakInstall]:
"""Return Flatpak refs installed system-wide.
Prefer `flatpak list --system` because it is Flatpak's own view of
installed refs and includes layouts the filesystem scanner might miss.
Fall back to the on-disk app deployment tree when the command is
unavailable or produces unparsable output.
"""
listing = _run_flatpak_list("system")
if listing is not None:
output, columns = listing
parsed = _parse_flatpak_list_output(output, method="system", columns=columns)
if parsed or not output.strip():
return parsed
return _find_flatpaks_in_root("/var/lib/flatpak", method="system")
def find_system_flatpak_remotes() -> List[FlatpakRemote]:
return find_flatpak_remotes("/var/lib/flatpak", method="system")
def _parse_snap_notes(notes: str) -> List[str]:
if not notes or notes == "-":
return []
cleaned = notes.replace(",", " ").replace(";", " ")
return sorted(
{n.strip().lower() for n in cleaned.split() if n.strip() and n.strip() != "-"}
)
def _parse_snap_list_output(output: str) -> List[SnapInstall]:
out: List[SnapInstall] = []
for idx, line in enumerate(output.splitlines()):
line = line.strip()
if not line:
continue
if idx == 0 and line.lower().startswith("name"):
continue
parts = line.split(maxsplit=5)
if len(parts) < 5:
continue
name = parts[0]
revision: Optional[int]
try:
revision = int(parts[2])
except ValueError:
revision = None
tracking = parts[3]
channel = None if tracking in {"-", ""} else tracking
notes = _parse_snap_notes(parts[5] if len(parts) > 5 else "")
out.append(
SnapInstall(
name=name,
channel=channel,
revision=revision,
classic="classic" in notes,
devmode="devmode" in notes,
dangerous="dangerous" in notes,
notes=notes,
source="snap-list",
)
)
return sorted(out, key=lambda s: s.name)
def _run_snap_list() -> Optional[str]:
if shutil.which("snap") is None:
return None
try:
proc = subprocess.run( # nosec
["snap", "list"],
shell=False,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10,
)
except Exception:
return None
if proc.returncode != 0:
return None
return proc.stdout or ""
def _find_system_snaps_from_filesystem() -> List[SnapInstall]:
snapd_snaps = "/var/lib/snapd/snaps"
if not os.path.isdir(snapd_snaps):
return []
current_revisions: Dict[str, int] = {}
snap_mounts = "/snap"
if os.path.isdir(snap_mounts):
try:
mount_names = os.listdir(snap_mounts)
except OSError:
mount_names = []
for name in mount_names:
current = os.path.join(snap_mounts, name, "current")
try:
target = os.readlink(current)
except OSError:
continue
try:
current_revisions[name] = int(os.path.basename(target.rstrip("/")))
except ValueError:
continue
candidates: Dict[str, List[int]] = {}
try:
entries = os.listdir(snapd_snaps)
except OSError:
return []
for entry in entries:
if not entry.endswith(".snap") or "_" not in entry:
continue
name, rev_text = entry[:-5].rsplit("_", 1)
try:
revision = int(rev_text)
except ValueError:
continue
candidates.setdefault(name, []).append(revision)
out: List[SnapInstall] = []
for name, revisions in candidates.items():
revision = current_revisions.get(name)
if revision is None:
revision = max(revisions)
out.append(SnapInstall(name=name, revision=revision, source="filesystem"))
return sorted(out, key=lambda s: s.name)
def find_system_snaps() -> List[SnapInstall]:
"""Return system-wide snap packages.
Prefer `snap list` because it exposes channel tracking and confinement notes.
Fall back to snapd's on-disk snap filenames when the command is unavailable.
"""
output = _run_snap_list()
if output is not None:
parsed = _parse_snap_list_output(output)
if parsed:
return parsed
return _find_system_snaps_from_filesystem()
def collect_non_system_users() -> List[UserRecord]: def collect_non_system_users() -> List[UserRecord]:
defs = parse_login_defs() defs = parse_login_defs()
uid_min = defs.get("UID_MIN", 1000) uid_min = defs.get("UID_MIN", 1000)
@ -139,6 +786,10 @@ def collect_non_system_users() -> List[UserRecord]:
ssh_files = find_user_ssh_files(home) if home and home.startswith("/") else [] ssh_files = find_user_ssh_files(home) if home and home.startswith("/") else []
flatpaks: List[FlatpakInstall] = []
if home and home.startswith("/"):
flatpaks = find_user_flatpaks(home, user=name)
users.append( users.append(
UserRecord( UserRecord(
name=name, name=name,
@ -150,6 +801,7 @@ def collect_non_system_users() -> List[UserRecord]:
primary_group=primary_group, primary_group=primary_group,
supplementary_groups=supp, supplementary_groups=supp,
ssh_files=ssh_files, ssh_files=ssh_files,
flatpaks=flatpaks,
) )
) )

View file

@ -10,8 +10,9 @@ import stat
import subprocess # nosec import subprocess # nosec
import time import time
from dataclasses import dataclass, asdict, field from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional, Set, Tuple from typing import Any, Dict, List, Optional, Set, Tuple
from .role_names import avoid_reserved_role_name
from .systemd import ( from .systemd import (
list_enabled_services, list_enabled_services,
list_enabled_timers, list_enabled_timers,
@ -101,6 +102,23 @@ class UsersSnapshot:
managed_files: List[ManagedFile] = field(default_factory=list) managed_files: List[ManagedFile] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list) excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list) notes: List[str] = field(default_factory=list)
user_flatpaks: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict)
user_flatpak_remotes: List[Dict[str, Any]] = field(default_factory=list)
@dataclass
class FlatpakSnapshot:
role_name: str
system_flatpaks: List[Dict[str, Any]] = field(default_factory=list)
remotes: List[Dict[str, Any]] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
class SnapSnapshot:
role_name: str
system_snaps: List[Dict[str, Any]] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass @dataclass
@ -364,11 +382,11 @@ def _role_id(raw: str) -> str:
def _role_name_from_unit(unit: str) -> str: def _role_name_from_unit(unit: str) -> str:
base = _role_id(unit.removesuffix(".service")) base = _role_id(unit.removesuffix(".service"))
return _safe_name(base) return avoid_reserved_role_name(_safe_name(base), prefix="service")
def _role_name_from_pkg(pkg: str) -> str: def _role_name_from_pkg(pkg: str) -> str:
return _safe_name(pkg) return avoid_reserved_role_name(_safe_name(pkg), prefix="package")
def _copy_into_bundle( def _copy_into_bundle(
@ -1808,6 +1826,30 @@ def harvest(
user_records = [] user_records = []
users_notes.append(f"Failed to enumerate users: {e!r}") users_notes.append(f"Failed to enumerate users: {e!r}")
# Detect system-wide Flatpaks/Snaps and configured Flatpak remotes.
from .accounts import (
find_system_flatpak_remotes,
find_system_flatpaks,
find_system_snaps,
find_user_flatpak_remotes,
)
system_flatpaks = [asdict(f) for f in find_system_flatpaks()]
system_snaps = [asdict(s) for s in find_system_snaps()]
system_flatpak_remotes = [asdict(r) for r in find_system_flatpak_remotes()]
flatpak_notes: List[str] = []
snap_notes: List[str] = []
if system_flatpaks:
flatpak_notes.append(
"System-wide flatpaks detected: "
+ ", ".join(str(f.get("name")) for f in system_flatpaks)
)
if system_snaps:
snap_notes.append(
"System-wide snaps detected: "
+ ", ".join(str(s.get("name")) for s in system_snaps)
)
users_role_name = "users" users_role_name = "users"
users_role_seen = seen_by_role.setdefault(users_role_name, set()) users_role_seen = seen_by_role.setdefault(users_role_name, set())
@ -1823,6 +1865,9 @@ def harvest(
(".bash_aliases", "user_shell_aliases"), (".bash_aliases", "user_shell_aliases"),
] ]
user_flatpaks_map: Dict[str, List[Dict[str, Any]]] = {}
user_flatpak_remotes: List[Dict[str, Any]] = []
for u in user_records: for u in user_records:
users_list.append( users_list.append(
{ {
@ -1899,12 +1944,36 @@ def harvest(
seen_global=captured_global, seen_global=captured_global,
) )
# Collect per-user Flatpak applications and remotes. Snap packages are
# system-wide; ~/snap/* is user data, not an install source.
if u.flatpaks:
user_flatpaks_map[u.name] = [asdict(fp) for fp in u.flatpaks]
if home and home.startswith("/"):
user_flatpak_remotes.extend(
asdict(r) for r in find_user_flatpak_remotes(home, user=u.name)
)
users_snapshot = UsersSnapshot( users_snapshot = UsersSnapshot(
role_name=users_role_name, role_name=users_role_name,
users=users_list, users=users_list,
managed_files=users_managed, managed_files=users_managed,
excluded=users_excluded, excluded=users_excluded,
notes=users_notes, notes=users_notes,
user_flatpaks=user_flatpaks_map,
user_flatpak_remotes=user_flatpak_remotes,
)
flatpak_snapshot = FlatpakSnapshot(
role_name="flatpak",
system_flatpaks=system_flatpaks,
remotes=system_flatpak_remotes,
notes=flatpak_notes,
)
snap_snapshot = SnapSnapshot(
role_name="snap",
system_snaps=system_snaps,
notes=snap_notes,
) )
# ------------------------- # -------------------------
@ -2512,6 +2581,8 @@ def harvest(
}, },
"roles": { "roles": {
"users": asdict(users_snapshot), "users": asdict(users_snapshot),
"flatpak": asdict(flatpak_snapshot),
"snap": asdict(snap_snapshot),
"services": [asdict(s) for s in service_snaps], "services": [asdict(s) for s in service_snaps],
"packages": [asdict(p) for p in pkg_snaps], "packages": [asdict(p) for p in pkg_snaps],
"apt_config": asdict(apt_config_snapshot), "apt_config": asdict(apt_config_snapshot),

View file

@ -10,6 +10,8 @@ import tempfile
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple from typing import Any, Dict, List, Optional, Set, Tuple
from .role_names import avoid_reserved_role_name
from .jinjaturtle import ( from .jinjaturtle import (
can_jinjify_path, can_jinjify_path,
find_jinjaturtle_cmd, find_jinjaturtle_cmd,
@ -229,6 +231,72 @@ def _ensure_ansible_cfg(cfg_path: str) -> None:
return return
def _ensure_requirements_yaml(req_path: str) -> None:
if not os.path.exists(req_path):
with open(req_path, "w", encoding="utf-8") as f:
f.write("---\n")
f.write("collections:\n")
f.write(" - name: community.general\n")
f.write(' version: ">=13.0.0"\n')
return
def _normalise_flatpak_item(
item: Any,
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> Dict[str, Any]:
if isinstance(item, str):
out: Dict[str, Any] = {"name": item, "method": method}
elif isinstance(item, dict):
out = dict(item)
out.setdefault("method", method)
else:
out = {"name": str(item), "method": method}
if user:
out.setdefault("user", user)
if home:
out.setdefault("home", home)
return out
def _normalise_flatpak_remote(item: Any) -> Dict[str, Any]:
if isinstance(item, dict):
out = dict(item)
else:
out = {"name": str(item)}
out.setdefault("method", "system")
return out
def _normalise_snap_item(item: Any) -> Dict[str, Any]:
if isinstance(item, str):
out: Dict[str, Any] = {"name": item}
elif isinstance(item, dict):
out = dict(item)
else:
out = {"name": str(item)}
notes = out.get("notes") or []
if isinstance(notes, str):
notes = [notes]
notes_l = {str(n).lower() for n in notes}
out["classic"] = bool(out.get("classic") or "classic" in notes_l)
out["devmode"] = bool(out.get("devmode") or "devmode" in notes_l)
out["dangerous"] = bool(out.get("dangerous") or "dangerous" in notes_l)
# The Ansible snap module's revision parameter pins/holds the snap. For
# ordinary store snaps that track a channel, preserve the channel instead
# of freezing every harvested host at today's revision.
if out.get("revision") is not None and not out.get("channel"):
out["install_revision"] = True
else:
out["install_revision"] = False
return out
def _ensure_inventory_host(inv_path: str, fqdn: str) -> None: def _ensure_inventory_host(inv_path: str, fqdn: str) -> None:
os.makedirs(os.path.dirname(inv_path), exist_ok=True) os.makedirs(os.path.dirname(inv_path), exist_ok=True)
if not os.path.exists(inv_path): if not os.path.exists(inv_path):
@ -836,6 +904,8 @@ def _manifest_from_bundle_dir(
services: List[Dict[str, Any]] = roles.get("services", []) services: List[Dict[str, Any]] = roles.get("services", [])
package_roles: List[Dict[str, Any]] = roles.get("packages", []) package_roles: List[Dict[str, Any]] = roles.get("packages", [])
users_snapshot: Dict[str, Any] = roles.get("users", {}) users_snapshot: Dict[str, Any] = roles.get("users", {})
flatpak_snapshot: Dict[str, Any] = roles.get("flatpak", {})
snap_snapshot: Dict[str, Any] = roles.get("snap", {})
apt_config_snapshot: Dict[str, Any] = roles.get("apt_config", {}) apt_config_snapshot: Dict[str, Any] = roles.get("apt_config", {})
dnf_config_snapshot: Dict[str, Any] = roles.get("dnf_config", {}) dnf_config_snapshot: Dict[str, Any] = roles.get("dnf_config", {})
firewall_runtime_snapshot: Dict[str, Any] = roles.get("firewall_runtime", {}) firewall_runtime_snapshot: Dict[str, Any] = roles.get("firewall_runtime", {})
@ -871,8 +941,11 @@ def _manifest_from_bundle_dir(
os.path.join(out_dir, "inventory", "hosts.ini"), fqdn or "" os.path.join(out_dir, "inventory", "hosts.ini"), fqdn or ""
) )
_ensure_ansible_cfg(os.path.join(out_dir, "ansible.cfg")) _ensure_ansible_cfg(os.path.join(out_dir, "ansible.cfg"))
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
manifested_users_roles: List[str] = [] manifested_users_roles: List[str] = []
manifested_flatpak_roles: List[str] = []
manifested_snap_roles: List[str] = []
manifested_apt_config_roles: List[str] = [] manifested_apt_config_roles: List[str] = []
manifested_dnf_config_roles: List[str] = [] manifested_dnf_config_roles: List[str] = []
manifested_firewall_runtime_roles: List[str] = [] manifested_firewall_runtime_roles: List[str] = []
@ -885,7 +958,7 @@ def _manifest_from_bundle_dir(
# ------------------------- # -------------------------
# Users role (non-system users) # Users role (non-system users)
# ------------------------- # -------------------------
if users_snapshot and users_snapshot.get("users"): if users_snapshot:
role = users_snapshot.get("role_name", "users") role = users_snapshot.get("role_name", "users")
role_dir = os.path.join(roles_root, role) role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir) _write_role_scaffold(role_dir)
@ -970,6 +1043,33 @@ def _manifest_from_bundle_dir(
} }
) )
# Build Flatpak and Snap lists. Flatpak can be installed system-wide or
# per-user. Snap packages are system-wide; per-user ~/snap/* directories
# are runtime/user data and are not treated as install sources.
users_flatpaks: List[Dict[str, Any]] = []
user_flatpak_map = users_snapshot.get("user_flatpaks", {}) or {}
home_by_user = {
str(u.get("name")): str(u.get("home") or "") for u in users_data
}
for uname, flatpaks in user_flatpak_map.items():
for fp in flatpaks or []:
users_flatpaks.append(
_normalise_flatpak_item(
fp,
method="user",
user=str(uname),
home=home_by_user.get(str(uname)) or None,
)
)
flatpak_remotes = [
_normalise_flatpak_remote(r)
for r in (users_snapshot.get("user_flatpak_remotes", []) or [])
]
users_needs_community = bool(flatpak_remotes or users_flatpaks)
if users_needs_community:
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
# Variables are host-specific in site mode; in non-site mode they live in role defaults. # Variables are host-specific in site mode; in non-site mode they live in role defaults.
if site_mode: if site_mode:
_write_role_defaults( _write_role_defaults(
@ -978,6 +1078,8 @@ def _manifest_from_bundle_dir(
"users_groups": [], "users_groups": [],
"users_users": [], "users_users": [],
"users_ssh_files": [], "users_ssh_files": [],
"users_flatpaks": [],
"users_flatpak_remotes": [],
}, },
) )
_write_hostvars( _write_hostvars(
@ -988,6 +1090,8 @@ def _manifest_from_bundle_dir(
"users_groups": group_names, "users_groups": group_names,
"users_users": users_data, "users_users": users_data,
"users_ssh_files": ssh_files, "users_ssh_files": ssh_files,
"users_flatpaks": users_flatpaks,
"users_flatpak_remotes": flatpak_remotes,
}, },
) )
else: else:
@ -997,13 +1101,23 @@ def _manifest_from_bundle_dir(
"users_groups": group_names, "users_groups": group_names,
"users_users": users_data, "users_users": users_data,
"users_ssh_files": ssh_files, "users_ssh_files": ssh_files,
"users_flatpaks": users_flatpaks,
"users_flatpak_remotes": flatpak_remotes,
}, },
) )
with open( with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8" os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f: ) as f:
f.write("---\ndependencies: []\n") if users_needs_community:
f.write(
"---\n"
"dependencies: []\n"
"collections:\n"
" - community.general\n"
)
else:
f.write("---\ndependencies: []\n")
# tasks (data-driven) # tasks (data-driven)
users_tasks = """--- users_tasks = """---
@ -1056,6 +1170,52 @@ def _manifest_from_bundle_dir(
group: "{{ item.group }}" group: "{{ item.group }}"
mode: "{{ item.mode }}" mode: "{{ item.mode }}"
loop: "{{ users_ssh_files | default([]) }}" loop: "{{ users_ssh_files | default([]) }}"
"""
if flatpak_remotes or users_flatpaks:
users_tasks += """
- name: Ensure user Flatpak remotes exist
ansible.builtin.command:
argv:
- flatpak
- remote-add
- --user
- --if-not-exists
- "{{ item.name }}"
- "{{ item.url }}"
loop: "{{ users_flatpak_remotes | default([]) | selectattr('method', 'equalto', 'user') | list }}"
when:
- item.name is defined
- item.url is defined
- item.url | length > 0
- item.user is defined
become: true
become_user: "{{ item.user }}"
environment:
HOME: "{{ item.home | default('/home/' ~ item.user, true) }}"
XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}"
changed_when: false
- name: Install user Flatpaks
community.general.flatpak:
name:
- "{{ item.name }}"
state: present
method: user
remote: "{{ item.remote | default(omit) }}"
from_url: "{{ item.from_url | default(omit) }}"
loop: "{{ users_flatpaks | default([]) }}"
when:
- item.name is defined
- item.name | length > 0
- item.user is defined
become: true
become_user: "{{ item.user }}"
environment:
HOME: "{{ item.home | default('/home/' ~ item.user, true) }}"
XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}"
""" """
with open( with open(
@ -1068,10 +1228,67 @@ def _manifest_from_bundle_dir(
) as f: ) as f:
f.write("---\n") f.write("---\n")
def _fmt_app_list(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
if not name:
continue
detail_parts = []
for key in ("remote", "channel", "revision", "branch", "arch"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
for key in ("classic", "devmode", "dangerous"):
if item.get(key):
detail_parts.append(key)
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {name}{details}")
return "\n".join(lines) or "- (none)"
def _fmt_user_flatpaks(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
user = item.get("user")
if not name or not user:
continue
detail_parts = []
for key in ("remote", "branch", "arch"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {user}: {name}{details}")
return "\n".join(lines) or "- (none)"
def _fmt_remotes(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
url = item.get("url")
method = item.get("method") or "system"
user = item.get("user")
if not name or not url:
continue
owner = f"user={user}" if user else "system"
lines.append(f"- {name} ({method}, {owner}): {url}")
return "\n".join(lines) or "- (none)"
readme = ( readme = (
"""# users """# users
Generated non-system user accounts and SSH public material. Generated non-system user accounts, SSH public material, and per-user Flatpak
applications/remotes.
**Note:** User Flatpak tasks require the `community.general` Ansible collection.
Install it with: `ansible-galaxy collection install -r requirements.yml`.
Flatpak `remote` is harvested from the installed deployment where detectable.
The original `.flatpakref` URL is generally not preserved by Flatpak after
installation, so `from_url` is only emitted if a future/hand-edited state file
contains it.
## Users ## Users
""" """
@ -1089,6 +1306,14 @@ Generated non-system user accounts and SSH public material.
or "- (none)" or "- (none)"
) )
+ """\n + """\n
## Flatpak remotes
"""
+ _fmt_remotes(flatpak_remotes)
+ """\n
## User Flatpaks
"""
+ _fmt_user_flatpaks(users_flatpaks)
+ """\n
## Excluded ## Excluded
""" """
+ ( + (
@ -1106,6 +1331,274 @@ Generated non-system user accounts and SSH public material.
manifested_users_roles.append(role) manifested_users_roles.append(role)
# -------------------------
# Flatpak role (system-wide Flatpak remotes and applications)
# -------------------------
raw_flatpak_apps = flatpak_snapshot.get("system_flatpaks", []) or []
raw_flatpak_remotes = flatpak_snapshot.get("remotes", []) or []
if flatpak_snapshot:
role = flatpak_snapshot.get("role_name", "flatpak")
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
flatpak_system_flatpaks = [
_normalise_flatpak_item(fp, method="system") for fp in raw_flatpak_apps
]
flatpak_remotes = [_normalise_flatpak_remote(r) for r in raw_flatpak_remotes]
vars_map = {
"flatpak_system_flatpaks": flatpak_system_flatpaks,
"flatpak_remotes": flatpak_remotes,
}
if site_mode:
_write_role_defaults(
role_dir,
{"flatpak_system_flatpaks": [], "flatpak_remotes": []},
)
_write_hostvars(out_dir, fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(
"---\n" "dependencies: []\n" "collections:\n" " - community.general\n"
)
tasks = """---
- name: Ensure system Flatpak remotes exist
ansible.builtin.command:
argv:
- flatpak
- remote-add
- --system
- --if-not-exists
- "{{ item.name }}"
- "{{ item.url }}"
loop: "{{ flatpak_remotes | default([]) }}"
when:
- item.name is defined
- item.url is defined
- item.url | length > 0
become: true
changed_when: false
- name: Install system-wide Flatpaks
community.general.flatpak:
name:
- "{{ item.name }}"
state: present
method: system
remote: "{{ item.remote | default(omit) }}"
from_url: "{{ item.from_url | default(omit) }}"
loop: "{{ flatpak_system_flatpaks | default([]) }}"
when:
- item.name is defined
- item.name | length > 0
become: true
"""
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
def _fmt_flatpak_apps(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
if not name:
continue
detail_parts = []
for key in ("remote", "branch", "arch"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {name}{details}")
return "\n".join(lines) or "- (none)"
def _fmt_flatpak_remotes(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
url = item.get("url")
if not name or not url:
continue
lines.append(f"- {name}: {url}")
return "\n".join(lines) or "- (none)"
notes = flatpak_snapshot.get("notes", []) or []
readme = (
"""# flatpak
Generated system-wide Flatpak remotes and applications.
**Note:** This role requires the `community.general` Ansible collection.
Install it with: `ansible-galaxy collection install -r requirements.yml`.
Flatpak `remote` is harvested from the installed deployment where detectable.
The original `.flatpakref` URL is generally not preserved by Flatpak after
installation, so `from_url` is only emitted if a future/hand-edited state file
contains it.
## System Flatpak remotes
"""
+ _fmt_flatpak_remotes(flatpak_remotes)
+ """\n
## System-wide Flatpaks
"""
+ _fmt_flatpak_apps(flatpak_system_flatpaks)
+ """\n
## Notes
"""
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ """\n"""
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifested_flatpak_roles.append(role)
# -------------------------
# Snap role (system-wide snap packages)
# -------------------------
raw_system_snaps = snap_snapshot.get("system_snaps", []) or []
if raw_system_snaps:
role = snap_snapshot.get("role_name", "snap") if snap_snapshot else "snap"
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
snap_system_snaps = [_normalise_snap_item(s) for s in raw_system_snaps]
vars_map = {"snap_system_snaps": snap_system_snaps}
if site_mode:
_write_role_defaults(role_dir, {"snap_system_snaps": []})
_write_hostvars(out_dir, fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(
"---\n" "dependencies: []\n" "collections:\n" " - community.general\n"
)
tasks = """---
- name: Install system-wide snaps with full detected attributes
community.general.snap:
name:
- "{{ item.name }}"
state: present
channel: "{{ item.channel | default(omit) if not (item.install_revision | default(false)) else omit }}"
revision: "{{ item.revision | default(omit) if (item.install_revision | default(false)) else omit }}"
classic: "{{ item.classic | default(false) }}"
devmode: "{{ item.devmode | default(false) }}"
dangerous: "{{ item.dangerous | default(false) }}"
loop: "{{ snap_system_snaps | default([]) }}"
when:
- item.name is defined
- item.name | length > 0
become: true
register: _enroll_snap_full_results
ignore_errors: true
- name: Install system-wide snaps with compatibility options
community.general.snap:
name:
- "{{ item.item.name }}"
state: present
channel: "{{ item.item.channel | default(omit) if not (item.item.install_revision | default(false)) else omit }}"
classic: "{{ item.item.classic | default(false) }}"
loop: "{{ (_enroll_snap_full_results | default({})).results | default([]) }}"
when:
- item.failed | default(false)
- item.item.name is defined
- item.item.name | length > 0
become: true
register: _enroll_snap_compat_results
ignore_errors: true
- name: Install system-wide snaps with minimal options
community.general.snap:
name:
- "{{ item.item.item.name }}"
state: present
loop: "{{ (_enroll_snap_compat_results | default({})).results | default([]) }}"
when:
- item.failed | default(false)
- item.item.item.name is defined
- item.item.item.name | length > 0
become: true
ignore_errors: true
"""
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
def _fmt_snap_apps(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
if not name:
continue
detail_parts = []
for key in ("channel", "revision"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
for key in ("classic", "devmode", "dangerous"):
if item.get(key):
detail_parts.append(key)
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {name}{details}")
return "\n".join(lines) or "- (none)"
notes = snap_snapshot.get("notes", []) or []
readme = (
"""# snap
Generated system-wide snap packages.
**Note:** This role requires the `community.general` Ansible collection.
Install it with: `ansible-galaxy collection install -r requirements.yml`.
The first install task uses all harvested attributes. If the installed
`community.general.snap` module is too old for some parameters, the generated
role falls back to reduced then minimal install tasks on a best-effort basis.
## System-wide snaps
"""
+ _fmt_snap_apps(snap_system_snaps)
+ """\n
## Notes
"""
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ """\n"""
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifested_snap_roles.append(role)
# ------------------------- # -------------------------
# apt_config role (APT sources, pinning, and keyrings) # apt_config role (APT sources, pinning, and keyrings)
# ------------------------- # -------------------------
@ -1880,7 +2373,8 @@ User-requested extra file harvesting.
# Service roles # Service roles
# ------------------------- # -------------------------
for svc in services: for svc in services:
role = svc["role_name"] source_role = svc["role_name"]
role = avoid_reserved_role_name(source_role, prefix="service")
unit = svc["unit"] unit = svc["unit"]
pkgs = svc.get("packages", []) or [] pkgs = svc.get("packages", []) or []
managed_files = svc.get("managed_files", []) or [] managed_files = svc.get("managed_files", []) or []
@ -1899,7 +2393,7 @@ User-requested extra file harvesting.
templated, jt_vars = _jinjify_managed_files( templated, jt_vars = _jinjify_managed_files(
bundle_dir, bundle_dir,
role, source_role,
role_dir, role_dir,
managed_files, managed_files,
jt_exe=jt_exe, jt_exe=jt_exe,
@ -1911,14 +2405,14 @@ User-requested extra file harvesting.
if site_mode: if site_mode:
_copy_artifacts( _copy_artifacts(
bundle_dir, bundle_dir,
role, source_role,
_host_role_files_dir(out_dir, fqdn or "", role), _host_role_files_dir(out_dir, fqdn or "", role),
exclude_rels=templated, exclude_rels=templated,
) )
else: else:
_copy_artifacts( _copy_artifacts(
bundle_dir, bundle_dir,
role, source_role,
os.path.join(role_dir, "files"), os.path.join(role_dir, "files"),
exclude_rels=templated, exclude_rels=templated,
) )
@ -2152,7 +2646,8 @@ This role was created by merging simple packages using the `--merge-simple-packa
# Process package roles (those with configuration files) # Process package roles (those with configuration files)
for pr in package_roles: for pr in package_roles:
role = pr["role_name"] source_role = pr["role_name"]
role = avoid_reserved_role_name(source_role, prefix="package")
pkg = pr.get("package") or "" pkg = pr.get("package") or ""
managed_files = pr.get("managed_files", []) or [] managed_files = pr.get("managed_files", []) or []
managed_dirs = pr.get("managed_dirs", []) or [] managed_dirs = pr.get("managed_dirs", []) or []
@ -2165,7 +2660,7 @@ This role was created by merging simple packages using the `--merge-simple-packa
templated, jt_vars = _jinjify_managed_files( templated, jt_vars = _jinjify_managed_files(
bundle_dir, bundle_dir,
role, source_role,
role_dir, role_dir,
managed_files, managed_files,
jt_exe=jt_exe, jt_exe=jt_exe,
@ -2177,14 +2672,14 @@ This role was created by merging simple packages using the `--merge-simple-packa
if site_mode: if site_mode:
_copy_artifacts( _copy_artifacts(
bundle_dir, bundle_dir,
role, source_role,
_host_role_files_dir(out_dir, fqdn or "", role), _host_role_files_dir(out_dir, fqdn or "", role),
exclude_rels=templated, exclude_rels=templated,
) )
else: else:
_copy_artifacts( _copy_artifacts(
bundle_dir, bundle_dir,
role, source_role,
os.path.join(role_dir, "files"), os.path.join(role_dir, "files"),
exclude_rels=templated, exclude_rels=templated,
) )
@ -2294,6 +2789,8 @@ Generated for package `{pkg}`.
+ manifested_etc_custom_roles + manifested_etc_custom_roles
+ manifested_usr_local_custom_roles + manifested_usr_local_custom_roles
+ manifested_extra_paths_roles + manifested_extra_paths_roles
+ manifested_flatpak_roles
+ manifested_snap_roles
+ manifested_users_roles + manifested_users_roles
+ tail_roles + tail_roles
+ manifested_firewall_runtime_roles + manifested_firewall_runtime_roles

28
enroll/role_names.py Normal file
View file

@ -0,0 +1,28 @@
from __future__ import annotations
RESERVED_SINGLETON_ROLE_NAMES = {
"users",
"flatpak",
"snap",
"apt_config",
"dnf_config",
"firewall_runtime",
"etc_custom",
"usr_local_custom",
"extra_paths",
"common_packages",
}
def avoid_reserved_role_name(role_name: str, *, prefix: str) -> str:
"""Return a role name that cannot collide with singleton roles.
Singleton roles are generated once per manifest from dedicated top-level
state sections. Package and service roles can naturally have the same names
as those singletons, e.g. the OS package named ``flatpak``. Prefix those
generated package/service roles so they cannot overwrite singleton role
directories during manifestation.
"""
if role_name in RESERVED_SINGLETON_ROLE_NAMES:
return f"{prefix}_{role_name}"
return role_name

View file

@ -575,6 +575,21 @@
"$ref": "#/$defs/UserEntry" "$ref": "#/$defs/UserEntry"
}, },
"type": "array" "type": "array"
},
"user_flatpaks": {
"additionalProperties": {
"type": "array",
"items": {
"$ref": "#/$defs/FlatpakInstall"
}
},
"type": "object"
},
"user_flatpak_remotes": {
"type": "array",
"items": {
"$ref": "#/$defs/FlatpakRemote"
}
} }
}, },
"required": [ "required": [
@ -656,6 +671,224 @@
"notes" "notes"
], ],
"type": "object" "type": "object"
},
"FlatpakInstall": {
"additionalProperties": false,
"properties": {
"name": {
"type": "string",
"minLength": 1
},
"method": {
"type": "string",
"enum": [
"system",
"user"
]
},
"remote": {
"type": [
"string",
"null"
]
},
"branch": {
"type": [
"string",
"null"
]
},
"arch": {
"type": [
"string",
"null"
]
},
"kind": {
"type": [
"string",
"null"
],
"enum": [
"app",
"runtime",
null
]
},
"ref": {
"type": [
"string",
"null"
]
},
"user": {
"type": [
"string",
"null"
]
},
"home": {
"type": [
"string",
"null"
]
},
"source": {
"type": "string"
},
"from_url": {
"type": "string",
"minLength": 1
}
},
"required": [
"name",
"method"
],
"type": "object"
},
"FlatpakRemote": {
"additionalProperties": false,
"properties": {
"name": {
"type": "string",
"minLength": 1
},
"method": {
"type": "string",
"enum": [
"system",
"user"
]
},
"url": {
"type": "string",
"minLength": 1
},
"user": {
"type": [
"string",
"null"
]
},
"home": {
"type": [
"string",
"null"
]
},
"source": {
"type": "string"
}
},
"required": [
"name",
"method",
"url"
],
"type": "object"
},
"SnapInstall": {
"additionalProperties": false,
"properties": {
"name": {
"type": "string",
"minLength": 1
},
"channel": {
"type": [
"string",
"null"
]
},
"revision": {
"type": [
"integer",
"null"
],
"minimum": 0
},
"classic": {
"type": "boolean"
},
"devmode": {
"type": "boolean"
},
"dangerous": {
"type": "boolean"
},
"notes": {
"type": "array",
"items": {
"type": "string"
}
},
"source": {
"type": "string"
},
"install_revision": {
"type": "boolean"
}
},
"required": [
"name"
],
"type": "object"
},
"FlatpakSnapshot": {
"additionalProperties": false,
"properties": {
"role_name": {
"const": "flatpak"
},
"system_flatpaks": {
"type": "array",
"items": {
"$ref": "#/$defs/FlatpakInstall"
}
},
"remotes": {
"type": "array",
"items": {
"$ref": "#/$defs/FlatpakRemote"
}
},
"notes": {
"type": "array",
"items": {
"type": "string"
}
}
},
"required": [
"role_name"
],
"type": "object"
},
"SnapSnapshot": {
"additionalProperties": false,
"properties": {
"role_name": {
"const": "snap"
},
"system_snaps": {
"type": "array",
"items": {
"$ref": "#/$defs/SnapInstall"
}
},
"notes": {
"type": "array",
"items": {
"type": "string"
}
}
},
"required": [
"role_name"
],
"type": "object"
} }
}, },
"$id": "https://enroll.sh/schema/state.schema.json", "$id": "https://enroll.sh/schema/state.schema.json",
@ -766,6 +999,12 @@
}, },
"firewall_runtime": { "firewall_runtime": {
"$ref": "#/$defs/FirewallRuntimeSnapshot" "$ref": "#/$defs/FirewallRuntimeSnapshot"
},
"flatpak": {
"$ref": "#/$defs/FlatpakSnapshot"
},
"snap": {
"$ref": "#/$defs/SnapSnapshot"
} }
}, },
"required": [ "required": [

View file

@ -44,6 +44,13 @@ poetry run \
--format json | jq --format json | jq
DEBIAN_FRONTEND=noninteractive apt-get remove -y --purge cowsay DEBIAN_FRONTEND=noninteractive apt-get remove -y --purge cowsay
# Common simple packages mode (is tested later)
poetry run \
enroll manifest \
--harvest "${BUNDLE_DIR}2" \
--out "${ANSIBLE_DIR}2" \
--merge-simple-packages
# Ansible test # Ansible test
builtin cd "${ANSIBLE_DIR}" builtin cd "${ANSIBLE_DIR}"
# Lint # Lint
@ -52,13 +59,8 @@ ansible-lint "${ANSIBLE_DIR}"
# Run # Run
ansible-playbook playbook.yml -i "localhost," -c local --check --diff ansible-playbook playbook.yml -i "localhost," -c local --check --diff
# Common simple packages mode # Test the --merge-simple-packages mode
poetry run \
enroll manifest \
--harvest "${BUNDLE_DIR}2" \
--out "${ANSIBLE_DIR}2" \
--merge-simple-packages
builtin cd "${ANSIBLE_DIR}2" builtin cd "${ANSIBLE_DIR}2"
ls "${ANSIBLE_DIR}2/roles" ls "${ANSIBLE_DIR}2/roles"
ansible-playbook playbook.yml -i "localhost," -c local --check --diff ansible-playbook playbook.yml -i "localhost," -c local --check --diff

View file

@ -312,3 +312,185 @@ def test_parse_group_handles_short_lines(tmp_path: Path):
assert 1000 in gid_to_name assert 1000 in gid_to_name
assert 1001 not in gid_to_name # skipped due to short line assert 1001 not in gid_to_name # skipped due to short line
assert 1002 in gid_to_name assert 1002 in gid_to_name
def test_find_flatpaks_in_root_detects_remote_branch_and_arch(tmp_path: Path):
import enroll.accounts as a
root = tmp_path / "flatpak"
(root / "repo").mkdir(parents=True)
(root / "repo" / "config").write_text(
'[remote "acme"]\nurl=https://flatpak.example/repo/\n',
encoding="utf-8",
)
ref = (
root
/ "repo"
/ "refs"
/ "remotes"
/ "acme"
/ "app"
/ "com.example.App"
/ "x86_64"
/ "stable"
)
ref.parent.mkdir(parents=True)
ref.write_text("checksum\n", encoding="utf-8")
active = root / "app" / "com.example.App" / "x86_64" / "stable" / "active"
active.mkdir(parents=True)
remotes = a.find_flatpak_remotes(str(root), method="system")
assert [(r.name, r.url, r.method) for r in remotes] == [
("acme", "https://flatpak.example/repo/", "system")
]
apps = a._find_flatpaks_in_root(str(root), method="system")
assert len(apps) == 1
assert apps[0].name == "com.example.App"
assert apps[0].remote == "acme"
assert apps[0].branch == "stable"
assert apps[0].arch == "x86_64"
def test_parse_snap_list_output_detects_channel_revision_and_modes():
import enroll.accounts as a
output = """Name Version Rev Tracking Publisher Notes
code abc 123 latest/stable vscode classic
mydev 1.0 42 latest/edge example devmode,dangerous
bare 1.0 5 latest/stable canonical base
"""
snaps = {snap.name: snap for snap in a._parse_snap_list_output(output)}
assert snaps["code"].channel == "latest/stable"
assert snaps["code"].revision == 123
assert snaps["code"].classic is True
assert snaps["mydev"].devmode is True
assert snaps["mydev"].dangerous is True
assert snaps["bare"].notes == ["base"]
def test_parse_flatpak_list_output_detects_system_refs():
from enroll.accounts import _parse_flatpak_list_output
output = "\n".join(
[
"app/org.example.App/x86_64/stable\tflathub\tstable\tx86_64",
"runtime/org.freedesktop.Platform/x86_64/24.08\tflathub\t24.08\tx86_64",
]
)
refs = _parse_flatpak_list_output(
output, method="system", columns=("ref", "origin", "branch", "arch")
)
assert [(r.kind, r.name, r.remote, r.branch, r.arch) for r in refs] == [
("app", "org.example.App", "flathub", "stable", "x86_64"),
("runtime", "org.freedesktop.Platform", "flathub", "24.08", "x86_64"),
]
assert refs[0].source == "flatpak-list"
def test_find_system_flatpaks_prefers_flatpak_list(monkeypatch):
import subprocess
import enroll.accounts as a
calls = []
def fake_run(args, **kwargs):
calls.append(args)
if args == ["flatpak", "list", "--columns=help"]:
return subprocess.CompletedProcess(
args,
0,
stdout="application\norigin\nbranch\narch\n",
stderr="",
)
return subprocess.CompletedProcess(
args,
0,
stdout="app/org.example.App/x86_64/stable\tacme\tstable\tx86_64\n",
stderr="",
)
monkeypatch.setattr(a.shutil, "which", lambda cmd: "/usr/bin/flatpak")
monkeypatch.setattr(a.subprocess, "run", fake_run)
monkeypatch.setattr(
a,
"_find_flatpaks_in_root",
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("fallback used")),
)
refs = a.find_system_flatpaks()
assert calls[0] == ["flatpak", "list", "--columns=help"]
assert calls[1][:3] == ["flatpak", "list", "--system"]
assert refs[0].name == "org.example.App"
assert refs[0].method == "system"
assert refs[0].remote == "acme"
def test_parse_flatpak_list_output_detects_application_columns():
from enroll.accounts import _parse_flatpak_list_output
output = "org.example.App\tflathub\tstable\tx86_64\n"
refs = _parse_flatpak_list_output(
output, method="system", columns=("application", "origin", "branch", "arch")
)
assert len(refs) == 1
assert refs[0].name == "org.example.App"
assert refs[0].kind is None
assert refs[0].remote == "flathub"
assert refs[0].branch == "stable"
assert refs[0].arch == "x86_64"
def test_parse_plain_flatpak_list_output_like_default_table():
from enroll.accounts import _parse_flatpak_list_output
output = """Name Application ID Version Branch Installation
Mesa org.freedesktop.Platform.GL.default 26.0.6 25.08 system
Mesa (Extra) org.freedesktop.Platform.GL.default 26.0.6 25.08-extra system
Codecs Extra Extension org.freedesktop.Platform.codecs-extra 25.08-extra system
KDE Application Platform org.kde.Platform 6.10 system
OnionShare org.onionshare.OnionShare 2.6.4 stable system
"""
refs = _parse_flatpak_list_output(output, method="system", columns=None)
by_name_branch = {(r.name, r.branch) for r in refs}
assert ("org.onionshare.OnionShare", "stable") in by_name_branch
assert ("org.freedesktop.Platform.GL.default", "25.08") in by_name_branch
assert ("org.freedesktop.Platform.GL.default", "25.08-extra") in by_name_branch
assert ("org.kde.Platform", "6.10") in by_name_branch
def test_parse_flatpak_columns_help_handles_description_table():
from enroll.accounts import _parse_flatpak_columns_help
output = """
Available columns:
application The application ID
branch The branch
installation The installation
"""
assert _parse_flatpak_columns_help(output) >= {
"application",
"branch",
"installation",
}
def test_flatpak_list_attempts_respect_supported_columns():
from enroll.accounts import _flatpak_list_attempts
attempts = _flatpak_list_attempts(
"--system", {"application", "branch", "installation"}
)
command_strings = [" ".join(args) for args, _columns in attempts]
assert any("--columns=application,branch" in cmd for cmd in command_strings)
assert not any("origin" in cmd for cmd in command_strings)
assert command_strings[-1] == "flatpak list --system"

View file

@ -224,6 +224,19 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
monkeypatch.setattr(harvest, "collect_non_system_users", lambda: []) monkeypatch.setattr(harvest, "collect_non_system_users", lambda: [])
import enroll.accounts as accounts
monkeypatch.setattr(accounts, "find_system_flatpaks", lambda: [])
monkeypatch.setattr(accounts, "find_system_flatpak_remotes", lambda: [])
monkeypatch.setattr(
accounts, "find_user_flatpak_remotes", lambda home, user=None: []
)
monkeypatch.setattr(
accounts,
"find_system_snaps",
lambda: [accounts.SnapInstall(name="code", channel="latest/stable")],
)
def fake_stat_triplet(p: str): def fake_stat_triplet(p: str):
if p == "/usr/local/bin/myscript": if p == "/usr/local/bin/myscript":
return ("root", "root", "0755") return ("root", "root", "0755")
@ -259,6 +272,9 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
for o in openvpn_obs for o in openvpn_obs
) )
assert st["roles"]["snap"]["role_name"] == "snap"
assert st["roles"]["snap"]["system_snaps"][0]["name"] == "code"
# Service role captured modified conffile # Service role captured modified conffile
svc = st["roles"]["services"][0] svc = st["roles"]["services"][0]
assert svc["unit"] == "openvpn.service" assert svc["unit"] == "openvpn.service"

View file

@ -286,3 +286,20 @@ def test_collect_firewall_runtime_snapshot_is_per_family_fallback(
assert ( assert (
tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "iptables.v6" tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "iptables.v6"
).exists() ).exists()
def test_package_role_names_do_not_collide_with_singleton_roles():
from enroll.harvest import _role_name_from_pkg
assert _role_name_from_pkg("flatpak") == "package_flatpak"
assert _role_name_from_pkg("snap") == "package_snap"
assert _role_name_from_pkg("users") == "package_users"
assert _role_name_from_pkg("nginx") == "nginx"
def test_service_role_names_do_not_collide_with_singleton_roles():
from enroll.harvest import _role_name_from_unit
assert _role_name_from_unit("flatpak.service") == "service_flatpak"
assert _role_name_from_unit("users.service") == "service_users"
assert _role_name_from_unit("nginx.service") == "nginx"

View file

@ -1064,3 +1064,317 @@ def test_render_firewall_runtime_tasks_with_ipv6():
} }
result = manifest._render_firewall_runtime_tasks(state) result = manifest._render_firewall_runtime_tasks(state)
assert len(result) >= 1 assert len(result) >= 1
def test_manifest_renders_flatpak_and_snap_details(tmp_path: Path):
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
state = {
"schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {"packages": {}},
"roles": {
"users": {
"role_name": "users",
"users": [
{
"name": "alice",
"uid": 1000,
"gid": 1000,
"gecos": "Alice",
"home": "/home/alice",
"shell": "/bin/bash",
"primary_group": "alice",
"supplementary_groups": [],
}
],
"managed_files": [],
"excluded": [],
"notes": [],
"user_flatpak_remotes": [
{
"name": "acme-user",
"method": "user",
"url": "https://flatpak.example/user-repo/",
"user": "alice",
"home": "/home/alice",
},
],
"user_flatpaks": {
"alice": [
{
"name": "org.example.UserApp",
"method": "user",
"remote": "acme-user",
"branch": "stable",
"arch": "x86_64",
}
]
},
},
"flatpak": {
"role_name": "flatpak",
"remotes": [
{
"name": "acme",
"method": "system",
"url": "https://flatpak.example/repo/",
},
],
"system_flatpaks": [
{
"name": "com.example.App",
"method": "system",
"remote": "acme",
"branch": "stable",
"arch": "x86_64",
}
],
"notes": [],
},
"snap": {
"role_name": "snap",
"system_snaps": [
{
"name": "code",
"channel": "latest/stable",
"revision": 123,
"classic": True,
"notes": ["classic"],
}
],
"notes": [],
},
"services": [],
"packages": [],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
}
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
users_defaults = (out / "roles" / "users" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
users_readme = (out / "roles" / "users" / "README.md").read_text(encoding="utf-8")
flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
flatpak_tasks = (out / "roles" / "flatpak" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
snap_defaults = (out / "roles" / "snap" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
snap_tasks = (out / "roles" / "snap" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
assert "users_flatpak_remotes:" in users_defaults
assert "remote: acme-user" in users_defaults
assert "community.general.snap" not in users_tasks
assert "Install system-wide snaps" not in users_tasks
assert "Install system-wide Flatpaks" not in users_tasks
assert "ansible-galaxy collection install -r requirements.yml" in users_readme
assert "snap_system_snaps:" in snap_defaults
assert "channel: latest/stable" in snap_defaults
assert "classic: true" in snap_defaults
assert "community.general.snap" in snap_tasks
assert "Install system-wide snaps with full detected attributes" in snap_tasks
assert "Install system-wide snaps with compatibility options" in snap_tasks
assert "Install system-wide snaps with minimal options" in snap_tasks
assert "ignore_errors: true" in snap_tasks
assert "flatpak_system_flatpaks:" in flatpak_defaults
assert "remote: acme" in flatpak_defaults
assert "community.general.flatpak" in flatpak_tasks
assert "Install system-wide Flatpaks" in flatpak_tasks
assert (out / "requirements.yml").exists()
def test_users_role_without_portable_apps_omits_community_general_tasks(tmp_path):
bundle = tmp_path / "bundle"
out = tmp_path / "out"
state = {
"roles": {
"users": {
"role_name": "users",
"users": [
{
"name": "alice",
"uid": 1000,
"gid": 1000,
"gecos": "Alice",
"home": "/home/alice",
"shell": "/bin/bash",
"primary_group": "alice",
"supplementary_groups": [],
}
],
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [],
"packages": [],
},
}
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
users_meta = (out / "roles" / "users" / "meta" / "main.yml").read_text(
encoding="utf-8"
)
assert "community.general.flatpak" not in users_tasks
assert "community.general.snap" not in users_tasks
assert "collections:" not in users_meta
def test_manifest_emits_flatpak_role_even_when_no_flatpaks(tmp_path):
bundle = tmp_path / "bundle"
out = tmp_path / "out"
state = {
"roles": {
"users": {
"role_name": "users",
"users": [],
"managed_files": [],
"excluded": [],
"notes": [],
"user_flatpaks": {},
"user_flatpak_remotes": [],
},
"flatpak": {
"role_name": "flatpak",
"system_flatpaks": [],
"remotes": [],
"notes": [],
},
"services": [],
"packages": [],
}
}
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
flatpak_tasks = (out / "roles" / "flatpak" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
assert "flatpak_system_flatpaks: []" in flatpak_defaults
assert "flatpak_remotes: []" in flatpak_defaults
assert "Install system-wide Flatpaks" in flatpak_tasks
assert "Ensure system Flatpak remotes exist" in flatpak_tasks
def test_manifest_avoids_package_role_collision_with_flatpak_singleton(tmp_path):
bundle = tmp_path / "bundle"
out = tmp_path / "out"
state = {
"roles": {
"users": {
"role_name": "users",
"users": [],
"managed_files": [],
"excluded": [],
"notes": [],
"user_flatpaks": {},
"user_flatpak_remotes": [],
},
"flatpak": {
"role_name": "flatpak",
"remotes": [
{
"name": "flathub",
"method": "system",
"url": "https://dl.flathub.org/repo/",
}
],
"system_flatpaks": [
{
"name": "org.onionshare.OnionShare",
"method": "system",
"remote": "flathub",
"branch": "stable",
"arch": "x86_64",
}
],
"notes": [],
},
"services": [],
"packages": [
{
"package": "flatpak",
"role_name": "flatpak",
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
"has_config": True,
}
],
}
}
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
playbook = (out / "playbook.yml").read_text(encoding="utf-8")
assert "org.onionshare.OnionShare" in flatpak_defaults
assert (out / "roles" / "package_flatpak" / "tasks" / "main.yml").exists()
assert "role: flatpak" in playbook
assert "role: package_flatpak" in playbook