Compare commits

..

2 commits

Author SHA1 Message Date
eb1d096c90
Add support for detecting flatpaks and snaps
Some checks failed
CI / test (push) Failing after 5m51s
Lint / test (push) Successful in 43s
2026-06-14 18:25:26 +10:00
11351cce87
Fix test 2026-06-14 16:23:06 +10:00
11 changed files with 2042 additions and 23 deletions

View file

@ -1,5 +1,6 @@
# 0.7.0
* Add support for detecting flatpaks and snaps
* Add --merge-simple-packages to reduce the number of roles, for packages that have no config files or services to maintain.
# 0.6.0

View file

@ -1,8 +1,48 @@
from __future__ import annotations
import configparser
import os
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple
import re
import shutil
import subprocess # nosec
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple
@dataclass
class FlatpakInstall:
name: str
method: str
remote: Optional[str] = None
branch: Optional[str] = None
arch: Optional[str] = None
kind: Optional[str] = None
ref: Optional[str] = None
user: Optional[str] = None
home: Optional[str] = None
source: str = "filesystem"
@dataclass
class FlatpakRemote:
name: str
method: str
url: str
user: Optional[str] = None
home: Optional[str] = None
source: str = "filesystem"
@dataclass
class SnapInstall:
name: str
channel: Optional[str] = None
revision: Optional[int] = None
classic: bool = False
devmode: bool = False
dangerous: bool = False
notes: List[str] = field(default_factory=list)
source: str = "snap-list"
@dataclass
@ -16,6 +56,7 @@ class UserRecord:
primary_group: str
supplementary_groups: List[str]
ssh_files: List[str]
flatpaks: List[FlatpakInstall] = field(default_factory=list)
def parse_login_defs(path: str = "/etc/login.defs") -> Dict[str, int]:
@ -115,6 +156,612 @@ def find_user_ssh_files(home: str) -> List[str]:
return sorted(set(out))
def _read_first_existing_text(paths: List[str]) -> Optional[str]:
for path in paths:
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
value = f.read().strip()
if value:
return value
except OSError:
continue
return None
def _parse_flatpak_ref(
ref: str,
) -> Tuple[Optional[str], str, Optional[str], Optional[str]]:
"""Return (kind, name, arch, branch) for a Flatpak ref.
refs look like app/org.example.App/x86_64/stable or
runtime/org.example.Platform/x86_64/23.08. If the value is already just an
application/runtime ID, keep it as the name and leave the other fields empty.
"""
parts = [p for p in (ref or "").strip().split("/") if p]
if len(parts) >= 4 and parts[0] in {"app", "runtime"}:
return parts[0], parts[1], parts[2], parts[3]
return None, (ref or "").strip(), None, None
def _parse_plain_flatpak_list_output(
output: str,
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> List[FlatpakInstall]:
"""Parse default `flatpak list` table output.
Example:
Name Application ID Version Branch Installation
OnionShare org.onionshare.OnionShare 2.6.4 stable system
"""
out: List[FlatpakInstall] = []
seen: Set[
Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]]
] = set()
id_re = re.compile(r"\b(?:[A-Za-z0-9_-]+\.)+[A-Za-z0-9_-]+\b")
for line in output.splitlines():
line = line.rstrip()
if not line.strip():
continue
if "Application ID" in line and "Installation" in line:
continue
match = id_re.search(line)
if not match:
continue
name = match.group(0)
tail = line[match.end() :].split()
installation = tail[-1] if tail else ""
if installation in {"system", "user"} and installation != method:
continue
branch = None
if len(tail) >= 2 and tail[-1] in {"system", "user"}:
branch = tail[-2]
elif tail:
branch = tail[-1]
key = (name, None, branch, None, None)
if key in seen:
continue
seen.add(key)
out.append(
FlatpakInstall(
name=name,
method=method,
remote=None,
branch=branch,
arch=None,
kind=None,
ref=None,
user=user,
home=home,
source="flatpak-list",
)
)
return sorted(out, key=lambda f: (f.name, f.branch or ""))
def _parse_flatpak_list_output(
output: str,
*,
method: str,
columns: Optional[Tuple[str, ...]] = None,
user: Optional[str] = None,
home: Optional[str] = None,
) -> List[FlatpakInstall]:
"""Parse Flatpak list output.
If columns is None, parse the default table. Otherwise columns names must
match the order passed to `flatpak list --columns=...`.
"""
if columns is None:
return _parse_plain_flatpak_list_output(
output, method=method, user=user, home=home
)
out: List[FlatpakInstall] = []
seen: Set[
Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]]
] = set()
for line in output.splitlines():
line = line.strip()
if not line:
continue
lower = line.lower()
if lower.startswith("ref") or lower.startswith("application id"):
continue
parts = line.split("\t")
if len(parts) < len(columns):
parts = line.split()
if not parts:
continue
fields = {
name: parts[idx].strip()
for idx, name in enumerate(columns)
if idx < len(parts)
}
ref = fields.get("ref") or fields.get("application") or ""
kind, name, ref_arch, ref_branch = _parse_flatpak_ref(ref)
if not name:
continue
remote = fields.get("origin") or None
branch = fields.get("branch") or ref_branch
arch = fields.get("arch") or ref_arch
if remote in {"", "-"}:
remote = None
if branch in {"", "-"}:
branch = None
if arch in {"", "-"}:
arch = None
key = (name, remote, branch, arch, kind)
if key in seen:
continue
seen.add(key)
out.append(
FlatpakInstall(
name=name,
method=method,
remote=remote,
branch=branch,
arch=arch,
kind=kind,
ref=ref if "/" in ref else None,
user=user,
home=home,
source="flatpak-list",
)
)
return sorted(
out,
key=lambda f: (
f.kind or "",
f.name,
f.remote or "",
f.branch or "",
f.arch or "",
),
)
_KNOWN_FLATPAK_LIST_COLUMNS = {
"name",
"description",
"application",
"version",
"branch",
"arch",
"origin",
"installation",
"ref",
"active",
"latest",
"size",
"options",
}
def _parse_flatpak_columns_help(output: str) -> Set[str]:
"""Parse `flatpak list --columns=help` output into supported fields."""
supported: Set[str] = set()
for line in output.splitlines():
# Help output varies a bit between Flatpak versions. Treat any known
# token as a supported field, whether it appears alone or in a
# description table.
for token in re.findall(r"[A-Za-z_][A-Za-z0-9_-]*", line.lower()):
if token in _KNOWN_FLATPAK_LIST_COLUMNS:
supported.add(token)
return supported
def _run_flatpak_columns_help() -> Optional[Set[str]]:
if shutil.which("flatpak") is None:
return None
try:
proc = subprocess.run( # nosec
["flatpak", "list", "--columns=help"],
shell=False,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10,
)
except Exception:
return None
if proc.returncode != 0:
return None
supported = _parse_flatpak_columns_help(proc.stdout or "")
return supported or None
def _flatpak_list_attempts(
scope: str, supported: Optional[Set[str]]
) -> List[Tuple[List[str], Optional[Tuple[str, ...]]]]:
def supported_columns(*wanted: str) -> Optional[Tuple[str, ...]]:
if supported is not None and not set(wanted).issubset(supported):
return None
return tuple(wanted)
column_sets: List[Tuple[str, ...]] = []
for wanted in (
("application", "origin", "branch", "arch"),
("application", "branch", "arch"),
("application", "branch"),
("application",),
("ref", "origin", "branch", "arch"),
("ref", "branch", "arch"),
("ref", "branch"),
("ref",),
):
cols = supported_columns(*wanted)
if cols is not None and cols not in column_sets:
column_sets.append(cols)
attempts: List[Tuple[List[str], Optional[Tuple[str, ...]]]] = [
(
["flatpak", "list", scope, "--columns=" + ",".join(cols)],
cols,
)
for cols in column_sets
]
attempts.append((["flatpak", "list", scope], None))
return attempts
def _run_flatpak_list(method: str) -> Optional[Tuple[str, Optional[Tuple[str, ...]]]]:
if shutil.which("flatpak") is None:
return None
scope = "--system" if method == "system" else "--user"
supported = _run_flatpak_columns_help()
for args, columns in _flatpak_list_attempts(scope, supported):
try:
proc = subprocess.run( # nosec
args,
shell=False,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10,
)
except Exception: # nosec B112
continue
if proc.returncode == 0:
return proc.stdout or "", columns
return None
def _flatpak_remote_from_ref(
flatpak_root: str, app_id: str, arch: str, branch: str, remote_names: List[str]
) -> Optional[str]:
for remote_name in remote_names:
ref = os.path.join(
flatpak_root,
"repo",
"refs",
"remotes",
remote_name,
"app",
app_id,
arch,
branch,
)
if os.path.exists(ref):
return remote_name
return None
def _parse_flatpak_deploy_origin(branch_dir: str) -> Optional[str]:
active_dir = os.path.join(branch_dir, "active")
candidates = [
os.path.join(active_dir, "origin"),
os.path.join(active_dir, "metadata"),
]
origin = _read_first_existing_text([candidates[0]])
if origin:
return origin
metadata = candidates[1]
if os.path.isfile(metadata):
parser = configparser.ConfigParser(interpolation=None)
try:
parser.read(metadata, encoding="utf-8")
except Exception:
return None
for section in ("Application", "Runtime"):
if parser.has_option(section, "origin"):
value = parser.get(section, "origin", fallback="").strip()
if value:
return value
return None
def _find_flatpaks_in_root(
flatpak_root: str,
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> List[FlatpakInstall]:
apps_dir = os.path.join(flatpak_root, "app")
if not os.path.isdir(apps_dir):
return []
remote_names = [
r.name
for r in find_flatpak_remotes(flatpak_root, method=method, user=user, home=home)
]
out: List[FlatpakInstall] = []
try:
app_ids = sorted(os.listdir(apps_dir))
except OSError:
return []
seen: Set[Tuple[str, Optional[str], Optional[str], Optional[str]]] = set()
for app_id in app_ids:
app_path = os.path.join(apps_dir, app_id)
if not os.path.isdir(app_path):
continue
try:
arches = sorted(os.listdir(app_path))
except OSError:
continue
for arch in arches:
arch_path = os.path.join(app_path, arch)
if not os.path.isdir(arch_path):
continue
try:
branches = sorted(os.listdir(arch_path))
except OSError:
continue
for branch in branches:
branch_path = os.path.join(arch_path, branch)
if not os.path.isdir(branch_path):
continue
active_dir = os.path.join(branch_path, "active")
if not os.path.exists(active_dir):
continue
remote = _parse_flatpak_deploy_origin(branch_path)
if not remote:
remote = _flatpak_remote_from_ref(
flatpak_root, app_id, arch, branch, remote_names
)
key = (app_id, remote, branch, arch)
if key in seen:
continue
seen.add(key)
out.append(
FlatpakInstall(
name=app_id,
method=method,
remote=remote,
branch=branch or None,
arch=arch or None,
kind="app",
ref=f"app/{app_id}/{arch}/{branch}",
user=user,
home=home,
)
)
return sorted(
out, key=lambda f: (f.name, f.remote or "", f.branch or "", f.arch or "")
)
def find_flatpak_remotes(
flatpak_root: str,
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> List[FlatpakRemote]:
"""Return configured Flatpak remotes for a Flatpak installation root.
Flatpak stores remotes in the OSTree repo config. This gives us the remote
names and repository URLs. It does not reliably preserve the original
.flatpakref/.flatpakrepo URL that was used during installation.
"""
config_path = os.path.join(flatpak_root, "repo", "config")
if not os.path.isfile(config_path):
return []
parser = configparser.ConfigParser(interpolation=None, strict=False)
try:
parser.read(config_path, encoding="utf-8")
except Exception:
return []
out: List[FlatpakRemote] = []
for section in parser.sections():
match = re.fullmatch(r'remote\s+"(.+)"', section)
if not match:
continue
name = match.group(1).strip()
url = parser.get(section, "url", fallback="").strip()
if not name or not url:
continue
out.append(
FlatpakRemote(
name=name,
method=method,
url=url,
user=user,
home=home,
)
)
return sorted(out, key=lambda r: (r.method, r.user or "", r.name))
def find_user_flatpaks(home: str, user: Optional[str] = None) -> List[FlatpakInstall]:
"""Return per-user Flatpak applications installed under a home directory."""
flatpak_root = os.path.join(home, ".local", "share", "flatpak")
return _find_flatpaks_in_root(flatpak_root, method="user", user=user, home=home)
def find_user_flatpak_remotes(
home: str, user: Optional[str] = None
) -> List[FlatpakRemote]:
flatpak_root = os.path.join(home, ".local", "share", "flatpak")
return find_flatpak_remotes(flatpak_root, method="user", user=user, home=home)
def find_system_flatpaks() -> List[FlatpakInstall]:
"""Return Flatpak refs installed system-wide.
Prefer `flatpak list --system` because it is Flatpak's own view of
installed refs and includes layouts the filesystem scanner might miss.
Fall back to the on-disk app deployment tree when the command is
unavailable or produces unparsable output.
"""
listing = _run_flatpak_list("system")
if listing is not None:
output, columns = listing
parsed = _parse_flatpak_list_output(output, method="system", columns=columns)
if parsed or not output.strip():
return parsed
return _find_flatpaks_in_root("/var/lib/flatpak", method="system")
def find_system_flatpak_remotes() -> List[FlatpakRemote]:
return find_flatpak_remotes("/var/lib/flatpak", method="system")
def _parse_snap_notes(notes: str) -> List[str]:
if not notes or notes == "-":
return []
cleaned = notes.replace(",", " ").replace(";", " ")
return sorted(
{n.strip().lower() for n in cleaned.split() if n.strip() and n.strip() != "-"}
)
def _parse_snap_list_output(output: str) -> List[SnapInstall]:
out: List[SnapInstall] = []
for idx, line in enumerate(output.splitlines()):
line = line.strip()
if not line:
continue
if idx == 0 and line.lower().startswith("name"):
continue
parts = line.split(maxsplit=5)
if len(parts) < 5:
continue
name = parts[0]
revision: Optional[int]
try:
revision = int(parts[2])
except ValueError:
revision = None
tracking = parts[3]
channel = None if tracking in {"-", ""} else tracking
notes = _parse_snap_notes(parts[5] if len(parts) > 5 else "")
out.append(
SnapInstall(
name=name,
channel=channel,
revision=revision,
classic="classic" in notes,
devmode="devmode" in notes,
dangerous="dangerous" in notes,
notes=notes,
source="snap-list",
)
)
return sorted(out, key=lambda s: s.name)
def _run_snap_list() -> Optional[str]:
if shutil.which("snap") is None:
return None
try:
proc = subprocess.run( # nosec
["snap", "list"],
shell=False,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10,
)
except Exception:
return None
if proc.returncode != 0:
return None
return proc.stdout or ""
def _find_system_snaps_from_filesystem() -> List[SnapInstall]:
snapd_snaps = "/var/lib/snapd/snaps"
if not os.path.isdir(snapd_snaps):
return []
current_revisions: Dict[str, int] = {}
snap_mounts = "/snap"
if os.path.isdir(snap_mounts):
try:
mount_names = os.listdir(snap_mounts)
except OSError:
mount_names = []
for name in mount_names:
current = os.path.join(snap_mounts, name, "current")
try:
target = os.readlink(current)
except OSError:
continue
try:
current_revisions[name] = int(os.path.basename(target.rstrip("/")))
except ValueError:
continue
candidates: Dict[str, List[int]] = {}
try:
entries = os.listdir(snapd_snaps)
except OSError:
return []
for entry in entries:
if not entry.endswith(".snap") or "_" not in entry:
continue
name, rev_text = entry[:-5].rsplit("_", 1)
try:
revision = int(rev_text)
except ValueError:
continue
candidates.setdefault(name, []).append(revision)
out: List[SnapInstall] = []
for name, revisions in candidates.items():
revision = current_revisions.get(name)
if revision is None:
revision = max(revisions)
out.append(SnapInstall(name=name, revision=revision, source="filesystem"))
return sorted(out, key=lambda s: s.name)
def find_system_snaps() -> List[SnapInstall]:
"""Return system-wide snap packages.
Prefer `snap list` because it exposes channel tracking and confinement notes.
Fall back to snapd's on-disk snap filenames when the command is unavailable.
"""
output = _run_snap_list()
if output is not None:
parsed = _parse_snap_list_output(output)
if parsed:
return parsed
return _find_system_snaps_from_filesystem()
def collect_non_system_users() -> List[UserRecord]:
defs = parse_login_defs()
uid_min = defs.get("UID_MIN", 1000)
@ -139,6 +786,10 @@ def collect_non_system_users() -> List[UserRecord]:
ssh_files = find_user_ssh_files(home) if home and home.startswith("/") else []
flatpaks: List[FlatpakInstall] = []
if home and home.startswith("/"):
flatpaks = find_user_flatpaks(home, user=name)
users.append(
UserRecord(
name=name,
@ -150,6 +801,7 @@ def collect_non_system_users() -> List[UserRecord]:
primary_group=primary_group,
supplementary_groups=supp,
ssh_files=ssh_files,
flatpaks=flatpaks,
)
)

View file

@ -10,8 +10,9 @@ import stat
import subprocess # nosec
import time
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional, Set, Tuple
from typing import Any, Dict, List, Optional, Set, Tuple
from .role_names import avoid_reserved_role_name
from .systemd import (
list_enabled_services,
list_enabled_timers,
@ -101,6 +102,23 @@ class UsersSnapshot:
managed_files: List[ManagedFile] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
user_flatpaks: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict)
user_flatpak_remotes: List[Dict[str, Any]] = field(default_factory=list)
@dataclass
class FlatpakSnapshot:
role_name: str
system_flatpaks: List[Dict[str, Any]] = field(default_factory=list)
remotes: List[Dict[str, Any]] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
class SnapSnapshot:
role_name: str
system_snaps: List[Dict[str, Any]] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
@ -364,11 +382,11 @@ def _role_id(raw: str) -> str:
def _role_name_from_unit(unit: str) -> str:
base = _role_id(unit.removesuffix(".service"))
return _safe_name(base)
return avoid_reserved_role_name(_safe_name(base), prefix="service")
def _role_name_from_pkg(pkg: str) -> str:
return _safe_name(pkg)
return avoid_reserved_role_name(_safe_name(pkg), prefix="package")
def _copy_into_bundle(
@ -1808,6 +1826,30 @@ def harvest(
user_records = []
users_notes.append(f"Failed to enumerate users: {e!r}")
# Detect system-wide Flatpaks/Snaps and configured Flatpak remotes.
from .accounts import (
find_system_flatpak_remotes,
find_system_flatpaks,
find_system_snaps,
find_user_flatpak_remotes,
)
system_flatpaks = [asdict(f) for f in find_system_flatpaks()]
system_snaps = [asdict(s) for s in find_system_snaps()]
system_flatpak_remotes = [asdict(r) for r in find_system_flatpak_remotes()]
flatpak_notes: List[str] = []
snap_notes: List[str] = []
if system_flatpaks:
flatpak_notes.append(
"System-wide flatpaks detected: "
+ ", ".join(str(f.get("name")) for f in system_flatpaks)
)
if system_snaps:
snap_notes.append(
"System-wide snaps detected: "
+ ", ".join(str(s.get("name")) for s in system_snaps)
)
users_role_name = "users"
users_role_seen = seen_by_role.setdefault(users_role_name, set())
@ -1823,6 +1865,9 @@ def harvest(
(".bash_aliases", "user_shell_aliases"),
]
user_flatpaks_map: Dict[str, List[Dict[str, Any]]] = {}
user_flatpak_remotes: List[Dict[str, Any]] = []
for u in user_records:
users_list.append(
{
@ -1899,12 +1944,36 @@ def harvest(
seen_global=captured_global,
)
# Collect per-user Flatpak applications and remotes. Snap packages are
# system-wide; ~/snap/* is user data, not an install source.
if u.flatpaks:
user_flatpaks_map[u.name] = [asdict(fp) for fp in u.flatpaks]
if home and home.startswith("/"):
user_flatpak_remotes.extend(
asdict(r) for r in find_user_flatpak_remotes(home, user=u.name)
)
users_snapshot = UsersSnapshot(
role_name=users_role_name,
users=users_list,
managed_files=users_managed,
excluded=users_excluded,
notes=users_notes,
user_flatpaks=user_flatpaks_map,
user_flatpak_remotes=user_flatpak_remotes,
)
flatpak_snapshot = FlatpakSnapshot(
role_name="flatpak",
system_flatpaks=system_flatpaks,
remotes=system_flatpak_remotes,
notes=flatpak_notes,
)
snap_snapshot = SnapSnapshot(
role_name="snap",
system_snaps=system_snaps,
notes=snap_notes,
)
# -------------------------
@ -2512,6 +2581,8 @@ def harvest(
},
"roles": {
"users": asdict(users_snapshot),
"flatpak": asdict(flatpak_snapshot),
"snap": asdict(snap_snapshot),
"services": [asdict(s) for s in service_snaps],
"packages": [asdict(p) for p in pkg_snaps],
"apt_config": asdict(apt_config_snapshot),

View file

@ -10,6 +10,8 @@ import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from .role_names import avoid_reserved_role_name
from .jinjaturtle import (
can_jinjify_path,
find_jinjaturtle_cmd,
@ -229,6 +231,72 @@ def _ensure_ansible_cfg(cfg_path: str) -> None:
return
def _ensure_requirements_yaml(req_path: str) -> None:
if not os.path.exists(req_path):
with open(req_path, "w", encoding="utf-8") as f:
f.write("---\n")
f.write("collections:\n")
f.write(" - name: community.general\n")
f.write(' version: ">=13.0.0"\n')
return
def _normalise_flatpak_item(
item: Any,
*,
method: str,
user: Optional[str] = None,
home: Optional[str] = None,
) -> Dict[str, Any]:
if isinstance(item, str):
out: Dict[str, Any] = {"name": item, "method": method}
elif isinstance(item, dict):
out = dict(item)
out.setdefault("method", method)
else:
out = {"name": str(item), "method": method}
if user:
out.setdefault("user", user)
if home:
out.setdefault("home", home)
return out
def _normalise_flatpak_remote(item: Any) -> Dict[str, Any]:
if isinstance(item, dict):
out = dict(item)
else:
out = {"name": str(item)}
out.setdefault("method", "system")
return out
def _normalise_snap_item(item: Any) -> Dict[str, Any]:
if isinstance(item, str):
out: Dict[str, Any] = {"name": item}
elif isinstance(item, dict):
out = dict(item)
else:
out = {"name": str(item)}
notes = out.get("notes") or []
if isinstance(notes, str):
notes = [notes]
notes_l = {str(n).lower() for n in notes}
out["classic"] = bool(out.get("classic") or "classic" in notes_l)
out["devmode"] = bool(out.get("devmode") or "devmode" in notes_l)
out["dangerous"] = bool(out.get("dangerous") or "dangerous" in notes_l)
# The Ansible snap module's revision parameter pins/holds the snap. For
# ordinary store snaps that track a channel, preserve the channel instead
# of freezing every harvested host at today's revision.
if out.get("revision") is not None and not out.get("channel"):
out["install_revision"] = True
else:
out["install_revision"] = False
return out
def _ensure_inventory_host(inv_path: str, fqdn: str) -> None:
os.makedirs(os.path.dirname(inv_path), exist_ok=True)
if not os.path.exists(inv_path):
@ -836,6 +904,8 @@ def _manifest_from_bundle_dir(
services: List[Dict[str, Any]] = roles.get("services", [])
package_roles: List[Dict[str, Any]] = roles.get("packages", [])
users_snapshot: Dict[str, Any] = roles.get("users", {})
flatpak_snapshot: Dict[str, Any] = roles.get("flatpak", {})
snap_snapshot: Dict[str, Any] = roles.get("snap", {})
apt_config_snapshot: Dict[str, Any] = roles.get("apt_config", {})
dnf_config_snapshot: Dict[str, Any] = roles.get("dnf_config", {})
firewall_runtime_snapshot: Dict[str, Any] = roles.get("firewall_runtime", {})
@ -871,8 +941,11 @@ def _manifest_from_bundle_dir(
os.path.join(out_dir, "inventory", "hosts.ini"), fqdn or ""
)
_ensure_ansible_cfg(os.path.join(out_dir, "ansible.cfg"))
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
manifested_users_roles: List[str] = []
manifested_flatpak_roles: List[str] = []
manifested_snap_roles: List[str] = []
manifested_apt_config_roles: List[str] = []
manifested_dnf_config_roles: List[str] = []
manifested_firewall_runtime_roles: List[str] = []
@ -885,7 +958,7 @@ def _manifest_from_bundle_dir(
# -------------------------
# Users role (non-system users)
# -------------------------
if users_snapshot and users_snapshot.get("users"):
if users_snapshot:
role = users_snapshot.get("role_name", "users")
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
@ -970,6 +1043,33 @@ def _manifest_from_bundle_dir(
}
)
# Build Flatpak and Snap lists. Flatpak can be installed system-wide or
# per-user. Snap packages are system-wide; per-user ~/snap/* directories
# are runtime/user data and are not treated as install sources.
users_flatpaks: List[Dict[str, Any]] = []
user_flatpak_map = users_snapshot.get("user_flatpaks", {}) or {}
home_by_user = {
str(u.get("name")): str(u.get("home") or "") for u in users_data
}
for uname, flatpaks in user_flatpak_map.items():
for fp in flatpaks or []:
users_flatpaks.append(
_normalise_flatpak_item(
fp,
method="user",
user=str(uname),
home=home_by_user.get(str(uname)) or None,
)
)
flatpak_remotes = [
_normalise_flatpak_remote(r)
for r in (users_snapshot.get("user_flatpak_remotes", []) or [])
]
users_needs_community = bool(flatpak_remotes or users_flatpaks)
if users_needs_community:
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
# Variables are host-specific in site mode; in non-site mode they live in role defaults.
if site_mode:
_write_role_defaults(
@ -978,6 +1078,8 @@ def _manifest_from_bundle_dir(
"users_groups": [],
"users_users": [],
"users_ssh_files": [],
"users_flatpaks": [],
"users_flatpak_remotes": [],
},
)
_write_hostvars(
@ -988,6 +1090,8 @@ def _manifest_from_bundle_dir(
"users_groups": group_names,
"users_users": users_data,
"users_ssh_files": ssh_files,
"users_flatpaks": users_flatpaks,
"users_flatpak_remotes": flatpak_remotes,
},
)
else:
@ -997,13 +1101,23 @@ def _manifest_from_bundle_dir(
"users_groups": group_names,
"users_users": users_data,
"users_ssh_files": ssh_files,
"users_flatpaks": users_flatpaks,
"users_flatpak_remotes": flatpak_remotes,
},
)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\ndependencies: []\n")
if users_needs_community:
f.write(
"---\n"
"dependencies: []\n"
"collections:\n"
" - community.general\n"
)
else:
f.write("---\ndependencies: []\n")
# tasks (data-driven)
users_tasks = """---
@ -1056,6 +1170,52 @@ def _manifest_from_bundle_dir(
group: "{{ item.group }}"
mode: "{{ item.mode }}"
loop: "{{ users_ssh_files | default([]) }}"
"""
if flatpak_remotes or users_flatpaks:
users_tasks += """
- name: Ensure user Flatpak remotes exist
ansible.builtin.command:
argv:
- flatpak
- remote-add
- --user
- --if-not-exists
- "{{ item.name }}"
- "{{ item.url }}"
loop: "{{ users_flatpak_remotes | default([]) | selectattr('method', 'equalto', 'user') | list }}"
when:
- item.name is defined
- item.url is defined
- item.url | length > 0
- item.user is defined
become: true
become_user: "{{ item.user }}"
environment:
HOME: "{{ item.home | default('/home/' ~ item.user, true) }}"
XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}"
changed_when: false
- name: Install user Flatpaks
community.general.flatpak:
name:
- "{{ item.name }}"
state: present
method: user
remote: "{{ item.remote | default(omit) }}"
from_url: "{{ item.from_url | default(omit) }}"
loop: "{{ users_flatpaks | default([]) }}"
when:
- item.name is defined
- item.name | length > 0
- item.user is defined
become: true
become_user: "{{ item.user }}"
environment:
HOME: "{{ item.home | default('/home/' ~ item.user, true) }}"
XDG_DATA_HOME: "{{ (item.home | default('/home/' ~ item.user, true)) ~ '/.local/share' }}"
"""
with open(
@ -1068,10 +1228,67 @@ def _manifest_from_bundle_dir(
) as f:
f.write("---\n")
def _fmt_app_list(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
if not name:
continue
detail_parts = []
for key in ("remote", "channel", "revision", "branch", "arch"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
for key in ("classic", "devmode", "dangerous"):
if item.get(key):
detail_parts.append(key)
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {name}{details}")
return "\n".join(lines) or "- (none)"
def _fmt_user_flatpaks(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
user = item.get("user")
if not name or not user:
continue
detail_parts = []
for key in ("remote", "branch", "arch"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {user}: {name}{details}")
return "\n".join(lines) or "- (none)"
def _fmt_remotes(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
url = item.get("url")
method = item.get("method") or "system"
user = item.get("user")
if not name or not url:
continue
owner = f"user={user}" if user else "system"
lines.append(f"- {name} ({method}, {owner}): {url}")
return "\n".join(lines) or "- (none)"
readme = (
"""# users
Generated non-system user accounts and SSH public material.
Generated non-system user accounts, SSH public material, and per-user Flatpak
applications/remotes.
**Note:** User Flatpak tasks require the `community.general` Ansible collection.
Install it with: `ansible-galaxy collection install -r requirements.yml`.
Flatpak `remote` is harvested from the installed deployment where detectable.
The original `.flatpakref` URL is generally not preserved by Flatpak after
installation, so `from_url` is only emitted if a future/hand-edited state file
contains it.
## Users
"""
@ -1089,6 +1306,14 @@ Generated non-system user accounts and SSH public material.
or "- (none)"
)
+ """\n
## Flatpak remotes
"""
+ _fmt_remotes(flatpak_remotes)
+ """\n
## User Flatpaks
"""
+ _fmt_user_flatpaks(users_flatpaks)
+ """\n
## Excluded
"""
+ (
@ -1106,6 +1331,274 @@ Generated non-system user accounts and SSH public material.
manifested_users_roles.append(role)
# -------------------------
# Flatpak role (system-wide Flatpak remotes and applications)
# -------------------------
raw_flatpak_apps = flatpak_snapshot.get("system_flatpaks", []) or []
raw_flatpak_remotes = flatpak_snapshot.get("remotes", []) or []
if flatpak_snapshot:
role = flatpak_snapshot.get("role_name", "flatpak")
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
flatpak_system_flatpaks = [
_normalise_flatpak_item(fp, method="system") for fp in raw_flatpak_apps
]
flatpak_remotes = [_normalise_flatpak_remote(r) for r in raw_flatpak_remotes]
vars_map = {
"flatpak_system_flatpaks": flatpak_system_flatpaks,
"flatpak_remotes": flatpak_remotes,
}
if site_mode:
_write_role_defaults(
role_dir,
{"flatpak_system_flatpaks": [], "flatpak_remotes": []},
)
_write_hostvars(out_dir, fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(
"---\n" "dependencies: []\n" "collections:\n" " - community.general\n"
)
tasks = """---
- name: Ensure system Flatpak remotes exist
ansible.builtin.command:
argv:
- flatpak
- remote-add
- --system
- --if-not-exists
- "{{ item.name }}"
- "{{ item.url }}"
loop: "{{ flatpak_remotes | default([]) }}"
when:
- item.name is defined
- item.url is defined
- item.url | length > 0
become: true
changed_when: false
- name: Install system-wide Flatpaks
community.general.flatpak:
name:
- "{{ item.name }}"
state: present
method: system
remote: "{{ item.remote | default(omit) }}"
from_url: "{{ item.from_url | default(omit) }}"
loop: "{{ flatpak_system_flatpaks | default([]) }}"
when:
- item.name is defined
- item.name | length > 0
become: true
"""
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
def _fmt_flatpak_apps(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
if not name:
continue
detail_parts = []
for key in ("remote", "branch", "arch"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {name}{details}")
return "\n".join(lines) or "- (none)"
def _fmt_flatpak_remotes(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
url = item.get("url")
if not name or not url:
continue
lines.append(f"- {name}: {url}")
return "\n".join(lines) or "- (none)"
notes = flatpak_snapshot.get("notes", []) or []
readme = (
"""# flatpak
Generated system-wide Flatpak remotes and applications.
**Note:** This role requires the `community.general` Ansible collection.
Install it with: `ansible-galaxy collection install -r requirements.yml`.
Flatpak `remote` is harvested from the installed deployment where detectable.
The original `.flatpakref` URL is generally not preserved by Flatpak after
installation, so `from_url` is only emitted if a future/hand-edited state file
contains it.
## System Flatpak remotes
"""
+ _fmt_flatpak_remotes(flatpak_remotes)
+ """\n
## System-wide Flatpaks
"""
+ _fmt_flatpak_apps(flatpak_system_flatpaks)
+ """\n
## Notes
"""
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ """\n"""
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifested_flatpak_roles.append(role)
# -------------------------
# Snap role (system-wide snap packages)
# -------------------------
raw_system_snaps = snap_snapshot.get("system_snaps", []) or []
if raw_system_snaps:
role = snap_snapshot.get("role_name", "snap") if snap_snapshot else "snap"
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
_ensure_requirements_yaml(os.path.join(out_dir, "requirements.yml"))
snap_system_snaps = [_normalise_snap_item(s) for s in raw_system_snaps]
vars_map = {"snap_system_snaps": snap_system_snaps}
if site_mode:
_write_role_defaults(role_dir, {"snap_system_snaps": []})
_write_hostvars(out_dir, fqdn or "", role, vars_map)
else:
_write_role_defaults(role_dir, vars_map)
with open(
os.path.join(role_dir, "meta", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(
"---\n" "dependencies: []\n" "collections:\n" " - community.general\n"
)
tasks = """---
- name: Install system-wide snaps with full detected attributes
community.general.snap:
name:
- "{{ item.name }}"
state: present
channel: "{{ item.channel | default(omit) if not (item.install_revision | default(false)) else omit }}"
revision: "{{ item.revision | default(omit) if (item.install_revision | default(false)) else omit }}"
classic: "{{ item.classic | default(false) }}"
devmode: "{{ item.devmode | default(false) }}"
dangerous: "{{ item.dangerous | default(false) }}"
loop: "{{ snap_system_snaps | default([]) }}"
when:
- item.name is defined
- item.name | length > 0
become: true
register: _enroll_snap_full_results
ignore_errors: true
- name: Install system-wide snaps with compatibility options
community.general.snap:
name:
- "{{ item.item.name }}"
state: present
channel: "{{ item.item.channel | default(omit) if not (item.item.install_revision | default(false)) else omit }}"
classic: "{{ item.item.classic | default(false) }}"
loop: "{{ (_enroll_snap_full_results | default({})).results | default([]) }}"
when:
- item.failed | default(false)
- item.item.name is defined
- item.item.name | length > 0
become: true
register: _enroll_snap_compat_results
ignore_errors: true
- name: Install system-wide snaps with minimal options
community.general.snap:
name:
- "{{ item.item.item.name }}"
state: present
loop: "{{ (_enroll_snap_compat_results | default({})).results | default([]) }}"
when:
- item.failed | default(false)
- item.item.item.name is defined
- item.item.item.name | length > 0
become: true
ignore_errors: true
"""
with open(
os.path.join(role_dir, "tasks", "main.yml"), "w", encoding="utf-8"
) as f:
f.write(tasks)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
f.write("---\n")
def _fmt_snap_apps(items: List[Dict[str, Any]]) -> str:
lines = []
for item in items:
name = item.get("name")
if not name:
continue
detail_parts = []
for key in ("channel", "revision"):
value = item.get(key)
if value not in (None, "", []):
detail_parts.append(f"{key}={value}")
for key in ("classic", "devmode", "dangerous"):
if item.get(key):
detail_parts.append(key)
details = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {name}{details}")
return "\n".join(lines) or "- (none)"
notes = snap_snapshot.get("notes", []) or []
readme = (
"""# snap
Generated system-wide snap packages.
**Note:** This role requires the `community.general` Ansible collection.
Install it with: `ansible-galaxy collection install -r requirements.yml`.
The first install task uses all harvested attributes. If the installed
`community.general.snap` module is too old for some parameters, the generated
role falls back to reduced then minimal install tasks on a best-effort basis.
## System-wide snaps
"""
+ _fmt_snap_apps(snap_system_snaps)
+ """\n
## Notes
"""
+ ("\n".join([f"- {n}" for n in notes]) or "- (none)")
+ """\n"""
)
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
manifested_snap_roles.append(role)
# -------------------------
# apt_config role (APT sources, pinning, and keyrings)
# -------------------------
@ -1880,7 +2373,8 @@ User-requested extra file harvesting.
# Service roles
# -------------------------
for svc in services:
role = svc["role_name"]
source_role = svc["role_name"]
role = avoid_reserved_role_name(source_role, prefix="service")
unit = svc["unit"]
pkgs = svc.get("packages", []) or []
managed_files = svc.get("managed_files", []) or []
@ -1899,7 +2393,7 @@ User-requested extra file harvesting.
templated, jt_vars = _jinjify_managed_files(
bundle_dir,
role,
source_role,
role_dir,
managed_files,
jt_exe=jt_exe,
@ -1911,14 +2405,14 @@ User-requested extra file harvesting.
if site_mode:
_copy_artifacts(
bundle_dir,
role,
source_role,
_host_role_files_dir(out_dir, fqdn or "", role),
exclude_rels=templated,
)
else:
_copy_artifacts(
bundle_dir,
role,
source_role,
os.path.join(role_dir, "files"),
exclude_rels=templated,
)
@ -2152,7 +2646,8 @@ This role was created by merging simple packages using the `--merge-simple-packa
# Process package roles (those with configuration files)
for pr in package_roles:
role = pr["role_name"]
source_role = pr["role_name"]
role = avoid_reserved_role_name(source_role, prefix="package")
pkg = pr.get("package") or ""
managed_files = pr.get("managed_files", []) or []
managed_dirs = pr.get("managed_dirs", []) or []
@ -2165,7 +2660,7 @@ This role was created by merging simple packages using the `--merge-simple-packa
templated, jt_vars = _jinjify_managed_files(
bundle_dir,
role,
source_role,
role_dir,
managed_files,
jt_exe=jt_exe,
@ -2177,14 +2672,14 @@ This role was created by merging simple packages using the `--merge-simple-packa
if site_mode:
_copy_artifacts(
bundle_dir,
role,
source_role,
_host_role_files_dir(out_dir, fqdn or "", role),
exclude_rels=templated,
)
else:
_copy_artifacts(
bundle_dir,
role,
source_role,
os.path.join(role_dir, "files"),
exclude_rels=templated,
)
@ -2294,6 +2789,8 @@ Generated for package `{pkg}`.
+ manifested_etc_custom_roles
+ manifested_usr_local_custom_roles
+ manifested_extra_paths_roles
+ manifested_flatpak_roles
+ manifested_snap_roles
+ manifested_users_roles
+ tail_roles
+ manifested_firewall_runtime_roles

28
enroll/role_names.py Normal file
View file

@ -0,0 +1,28 @@
from __future__ import annotations
RESERVED_SINGLETON_ROLE_NAMES = {
"users",
"flatpak",
"snap",
"apt_config",
"dnf_config",
"firewall_runtime",
"etc_custom",
"usr_local_custom",
"extra_paths",
"common_packages",
}
def avoid_reserved_role_name(role_name: str, *, prefix: str) -> str:
"""Return a role name that cannot collide with singleton roles.
Singleton roles are generated once per manifest from dedicated top-level
state sections. Package and service roles can naturally have the same names
as those singletons, e.g. the OS package named ``flatpak``. Prefix those
generated package/service roles so they cannot overwrite singleton role
directories during manifestation.
"""
if role_name in RESERVED_SINGLETON_ROLE_NAMES:
return f"{prefix}_{role_name}"
return role_name

View file

@ -575,6 +575,21 @@
"$ref": "#/$defs/UserEntry"
},
"type": "array"
},
"user_flatpaks": {
"additionalProperties": {
"type": "array",
"items": {
"$ref": "#/$defs/FlatpakInstall"
}
},
"type": "object"
},
"user_flatpak_remotes": {
"type": "array",
"items": {
"$ref": "#/$defs/FlatpakRemote"
}
}
},
"required": [
@ -656,6 +671,224 @@
"notes"
],
"type": "object"
},
"FlatpakInstall": {
"additionalProperties": false,
"properties": {
"name": {
"type": "string",
"minLength": 1
},
"method": {
"type": "string",
"enum": [
"system",
"user"
]
},
"remote": {
"type": [
"string",
"null"
]
},
"branch": {
"type": [
"string",
"null"
]
},
"arch": {
"type": [
"string",
"null"
]
},
"kind": {
"type": [
"string",
"null"
],
"enum": [
"app",
"runtime",
null
]
},
"ref": {
"type": [
"string",
"null"
]
},
"user": {
"type": [
"string",
"null"
]
},
"home": {
"type": [
"string",
"null"
]
},
"source": {
"type": "string"
},
"from_url": {
"type": "string",
"minLength": 1
}
},
"required": [
"name",
"method"
],
"type": "object"
},
"FlatpakRemote": {
"additionalProperties": false,
"properties": {
"name": {
"type": "string",
"minLength": 1
},
"method": {
"type": "string",
"enum": [
"system",
"user"
]
},
"url": {
"type": "string",
"minLength": 1
},
"user": {
"type": [
"string",
"null"
]
},
"home": {
"type": [
"string",
"null"
]
},
"source": {
"type": "string"
}
},
"required": [
"name",
"method",
"url"
],
"type": "object"
},
"SnapInstall": {
"additionalProperties": false,
"properties": {
"name": {
"type": "string",
"minLength": 1
},
"channel": {
"type": [
"string",
"null"
]
},
"revision": {
"type": [
"integer",
"null"
],
"minimum": 0
},
"classic": {
"type": "boolean"
},
"devmode": {
"type": "boolean"
},
"dangerous": {
"type": "boolean"
},
"notes": {
"type": "array",
"items": {
"type": "string"
}
},
"source": {
"type": "string"
},
"install_revision": {
"type": "boolean"
}
},
"required": [
"name"
],
"type": "object"
},
"FlatpakSnapshot": {
"additionalProperties": false,
"properties": {
"role_name": {
"const": "flatpak"
},
"system_flatpaks": {
"type": "array",
"items": {
"$ref": "#/$defs/FlatpakInstall"
}
},
"remotes": {
"type": "array",
"items": {
"$ref": "#/$defs/FlatpakRemote"
}
},
"notes": {
"type": "array",
"items": {
"type": "string"
}
}
},
"required": [
"role_name"
],
"type": "object"
},
"SnapSnapshot": {
"additionalProperties": false,
"properties": {
"role_name": {
"const": "snap"
},
"system_snaps": {
"type": "array",
"items": {
"$ref": "#/$defs/SnapInstall"
}
},
"notes": {
"type": "array",
"items": {
"type": "string"
}
}
},
"required": [
"role_name"
],
"type": "object"
}
},
"$id": "https://enroll.sh/schema/state.schema.json",
@ -766,6 +999,12 @@
},
"firewall_runtime": {
"$ref": "#/$defs/FirewallRuntimeSnapshot"
},
"flatpak": {
"$ref": "#/$defs/FlatpakSnapshot"
},
"snap": {
"$ref": "#/$defs/SnapSnapshot"
}
},
"required": [

View file

@ -44,6 +44,13 @@ poetry run \
--format json | jq
DEBIAN_FRONTEND=noninteractive apt-get remove -y --purge cowsay
# Common simple packages mode (is tested later)
poetry run \
enroll manifest \
--harvest "${BUNDLE_DIR}2" \
--out "${ANSIBLE_DIR}2" \
--merge-simple-packages
# Ansible test
builtin cd "${ANSIBLE_DIR}"
# Lint
@ -52,13 +59,8 @@ ansible-lint "${ANSIBLE_DIR}"
# Run
ansible-playbook playbook.yml -i "localhost," -c local --check --diff
# Common simple packages mode
poetry run \
enroll manifest \
--harvest "${BUNDLE_DIR}2" \
--out "${ANSIBLE_DIR}2" \
--merge-simple-packages
# Test the --merge-simple-packages mode
builtin cd "${ANSIBLE_DIR}2"
ls "${ANSIBLE_DIR}2/roles"
ansible-playbook playbook.yml -i "localhost," -c local --check --diff

View file

@ -312,3 +312,185 @@ def test_parse_group_handles_short_lines(tmp_path: Path):
assert 1000 in gid_to_name
assert 1001 not in gid_to_name # skipped due to short line
assert 1002 in gid_to_name
def test_find_flatpaks_in_root_detects_remote_branch_and_arch(tmp_path: Path):
import enroll.accounts as a
root = tmp_path / "flatpak"
(root / "repo").mkdir(parents=True)
(root / "repo" / "config").write_text(
'[remote "acme"]\nurl=https://flatpak.example/repo/\n',
encoding="utf-8",
)
ref = (
root
/ "repo"
/ "refs"
/ "remotes"
/ "acme"
/ "app"
/ "com.example.App"
/ "x86_64"
/ "stable"
)
ref.parent.mkdir(parents=True)
ref.write_text("checksum\n", encoding="utf-8")
active = root / "app" / "com.example.App" / "x86_64" / "stable" / "active"
active.mkdir(parents=True)
remotes = a.find_flatpak_remotes(str(root), method="system")
assert [(r.name, r.url, r.method) for r in remotes] == [
("acme", "https://flatpak.example/repo/", "system")
]
apps = a._find_flatpaks_in_root(str(root), method="system")
assert len(apps) == 1
assert apps[0].name == "com.example.App"
assert apps[0].remote == "acme"
assert apps[0].branch == "stable"
assert apps[0].arch == "x86_64"
def test_parse_snap_list_output_detects_channel_revision_and_modes():
import enroll.accounts as a
output = """Name Version Rev Tracking Publisher Notes
code abc 123 latest/stable vscode classic
mydev 1.0 42 latest/edge example devmode,dangerous
bare 1.0 5 latest/stable canonical base
"""
snaps = {snap.name: snap for snap in a._parse_snap_list_output(output)}
assert snaps["code"].channel == "latest/stable"
assert snaps["code"].revision == 123
assert snaps["code"].classic is True
assert snaps["mydev"].devmode is True
assert snaps["mydev"].dangerous is True
assert snaps["bare"].notes == ["base"]
def test_parse_flatpak_list_output_detects_system_refs():
from enroll.accounts import _parse_flatpak_list_output
output = "\n".join(
[
"app/org.example.App/x86_64/stable\tflathub\tstable\tx86_64",
"runtime/org.freedesktop.Platform/x86_64/24.08\tflathub\t24.08\tx86_64",
]
)
refs = _parse_flatpak_list_output(
output, method="system", columns=("ref", "origin", "branch", "arch")
)
assert [(r.kind, r.name, r.remote, r.branch, r.arch) for r in refs] == [
("app", "org.example.App", "flathub", "stable", "x86_64"),
("runtime", "org.freedesktop.Platform", "flathub", "24.08", "x86_64"),
]
assert refs[0].source == "flatpak-list"
def test_find_system_flatpaks_prefers_flatpak_list(monkeypatch):
import subprocess
import enroll.accounts as a
calls = []
def fake_run(args, **kwargs):
calls.append(args)
if args == ["flatpak", "list", "--columns=help"]:
return subprocess.CompletedProcess(
args,
0,
stdout="application\norigin\nbranch\narch\n",
stderr="",
)
return subprocess.CompletedProcess(
args,
0,
stdout="app/org.example.App/x86_64/stable\tacme\tstable\tx86_64\n",
stderr="",
)
monkeypatch.setattr(a.shutil, "which", lambda cmd: "/usr/bin/flatpak")
monkeypatch.setattr(a.subprocess, "run", fake_run)
monkeypatch.setattr(
a,
"_find_flatpaks_in_root",
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("fallback used")),
)
refs = a.find_system_flatpaks()
assert calls[0] == ["flatpak", "list", "--columns=help"]
assert calls[1][:3] == ["flatpak", "list", "--system"]
assert refs[0].name == "org.example.App"
assert refs[0].method == "system"
assert refs[0].remote == "acme"
def test_parse_flatpak_list_output_detects_application_columns():
from enroll.accounts import _parse_flatpak_list_output
output = "org.example.App\tflathub\tstable\tx86_64\n"
refs = _parse_flatpak_list_output(
output, method="system", columns=("application", "origin", "branch", "arch")
)
assert len(refs) == 1
assert refs[0].name == "org.example.App"
assert refs[0].kind is None
assert refs[0].remote == "flathub"
assert refs[0].branch == "stable"
assert refs[0].arch == "x86_64"
def test_parse_plain_flatpak_list_output_like_default_table():
from enroll.accounts import _parse_flatpak_list_output
output = """Name Application ID Version Branch Installation
Mesa org.freedesktop.Platform.GL.default 26.0.6 25.08 system
Mesa (Extra) org.freedesktop.Platform.GL.default 26.0.6 25.08-extra system
Codecs Extra Extension org.freedesktop.Platform.codecs-extra 25.08-extra system
KDE Application Platform org.kde.Platform 6.10 system
OnionShare org.onionshare.OnionShare 2.6.4 stable system
"""
refs = _parse_flatpak_list_output(output, method="system", columns=None)
by_name_branch = {(r.name, r.branch) for r in refs}
assert ("org.onionshare.OnionShare", "stable") in by_name_branch
assert ("org.freedesktop.Platform.GL.default", "25.08") in by_name_branch
assert ("org.freedesktop.Platform.GL.default", "25.08-extra") in by_name_branch
assert ("org.kde.Platform", "6.10") in by_name_branch
def test_parse_flatpak_columns_help_handles_description_table():
from enroll.accounts import _parse_flatpak_columns_help
output = """
Available columns:
application The application ID
branch The branch
installation The installation
"""
assert _parse_flatpak_columns_help(output) >= {
"application",
"branch",
"installation",
}
def test_flatpak_list_attempts_respect_supported_columns():
from enroll.accounts import _flatpak_list_attempts
attempts = _flatpak_list_attempts(
"--system", {"application", "branch", "installation"}
)
command_strings = [" ".join(args) for args, _columns in attempts]
assert any("--columns=application,branch" in cmd for cmd in command_strings)
assert not any("origin" in cmd for cmd in command_strings)
assert command_strings[-1] == "flatpak list --system"

View file

@ -224,6 +224,19 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
monkeypatch.setattr(harvest, "collect_non_system_users", lambda: [])
import enroll.accounts as accounts
monkeypatch.setattr(accounts, "find_system_flatpaks", lambda: [])
monkeypatch.setattr(accounts, "find_system_flatpak_remotes", lambda: [])
monkeypatch.setattr(
accounts, "find_user_flatpak_remotes", lambda home, user=None: []
)
monkeypatch.setattr(
accounts,
"find_system_snaps",
lambda: [accounts.SnapInstall(name="code", channel="latest/stable")],
)
def fake_stat_triplet(p: str):
if p == "/usr/local/bin/myscript":
return ("root", "root", "0755")
@ -259,6 +272,9 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
for o in openvpn_obs
)
assert st["roles"]["snap"]["role_name"] == "snap"
assert st["roles"]["snap"]["system_snaps"][0]["name"] == "code"
# Service role captured modified conffile
svc = st["roles"]["services"][0]
assert svc["unit"] == "openvpn.service"

View file

@ -286,3 +286,20 @@ def test_collect_firewall_runtime_snapshot_is_per_family_fallback(
assert (
tmp_path / "artifacts" / "firewall_runtime" / "firewall" / "iptables.v6"
).exists()
def test_package_role_names_do_not_collide_with_singleton_roles():
from enroll.harvest import _role_name_from_pkg
assert _role_name_from_pkg("flatpak") == "package_flatpak"
assert _role_name_from_pkg("snap") == "package_snap"
assert _role_name_from_pkg("users") == "package_users"
assert _role_name_from_pkg("nginx") == "nginx"
def test_service_role_names_do_not_collide_with_singleton_roles():
from enroll.harvest import _role_name_from_unit
assert _role_name_from_unit("flatpak.service") == "service_flatpak"
assert _role_name_from_unit("users.service") == "service_users"
assert _role_name_from_unit("nginx.service") == "nginx"

View file

@ -1064,3 +1064,317 @@ def test_render_firewall_runtime_tasks_with_ipv6():
}
result = manifest._render_firewall_runtime_tasks(state)
assert len(result) >= 1
def test_manifest_renders_flatpak_and_snap_details(tmp_path: Path):
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
state = {
"schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {"packages": {}},
"roles": {
"users": {
"role_name": "users",
"users": [
{
"name": "alice",
"uid": 1000,
"gid": 1000,
"gecos": "Alice",
"home": "/home/alice",
"shell": "/bin/bash",
"primary_group": "alice",
"supplementary_groups": [],
}
],
"managed_files": [],
"excluded": [],
"notes": [],
"user_flatpak_remotes": [
{
"name": "acme-user",
"method": "user",
"url": "https://flatpak.example/user-repo/",
"user": "alice",
"home": "/home/alice",
},
],
"user_flatpaks": {
"alice": [
{
"name": "org.example.UserApp",
"method": "user",
"remote": "acme-user",
"branch": "stable",
"arch": "x86_64",
}
]
},
},
"flatpak": {
"role_name": "flatpak",
"remotes": [
{
"name": "acme",
"method": "system",
"url": "https://flatpak.example/repo/",
},
],
"system_flatpaks": [
{
"name": "com.example.App",
"method": "system",
"remote": "acme",
"branch": "stable",
"arch": "x86_64",
}
],
"notes": [],
},
"snap": {
"role_name": "snap",
"system_snaps": [
{
"name": "code",
"channel": "latest/stable",
"revision": 123,
"classic": True,
"notes": ["classic"],
}
],
"notes": [],
},
"services": [],
"packages": [],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
}
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
users_defaults = (out / "roles" / "users" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
users_readme = (out / "roles" / "users" / "README.md").read_text(encoding="utf-8")
flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
flatpak_tasks = (out / "roles" / "flatpak" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
snap_defaults = (out / "roles" / "snap" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
snap_tasks = (out / "roles" / "snap" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
assert "users_flatpak_remotes:" in users_defaults
assert "remote: acme-user" in users_defaults
assert "community.general.snap" not in users_tasks
assert "Install system-wide snaps" not in users_tasks
assert "Install system-wide Flatpaks" not in users_tasks
assert "ansible-galaxy collection install -r requirements.yml" in users_readme
assert "snap_system_snaps:" in snap_defaults
assert "channel: latest/stable" in snap_defaults
assert "classic: true" in snap_defaults
assert "community.general.snap" in snap_tasks
assert "Install system-wide snaps with full detected attributes" in snap_tasks
assert "Install system-wide snaps with compatibility options" in snap_tasks
assert "Install system-wide snaps with minimal options" in snap_tasks
assert "ignore_errors: true" in snap_tasks
assert "flatpak_system_flatpaks:" in flatpak_defaults
assert "remote: acme" in flatpak_defaults
assert "community.general.flatpak" in flatpak_tasks
assert "Install system-wide Flatpaks" in flatpak_tasks
assert (out / "requirements.yml").exists()
def test_users_role_without_portable_apps_omits_community_general_tasks(tmp_path):
bundle = tmp_path / "bundle"
out = tmp_path / "out"
state = {
"roles": {
"users": {
"role_name": "users",
"users": [
{
"name": "alice",
"uid": 1000,
"gid": 1000,
"gecos": "Alice",
"home": "/home/alice",
"shell": "/bin/bash",
"primary_group": "alice",
"supplementary_groups": [],
}
],
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [],
"packages": [],
},
}
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
users_tasks = (out / "roles" / "users" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
users_meta = (out / "roles" / "users" / "meta" / "main.yml").read_text(
encoding="utf-8"
)
assert "community.general.flatpak" not in users_tasks
assert "community.general.snap" not in users_tasks
assert "collections:" not in users_meta
def test_manifest_emits_flatpak_role_even_when_no_flatpaks(tmp_path):
bundle = tmp_path / "bundle"
out = tmp_path / "out"
state = {
"roles": {
"users": {
"role_name": "users",
"users": [],
"managed_files": [],
"excluded": [],
"notes": [],
"user_flatpaks": {},
"user_flatpak_remotes": [],
},
"flatpak": {
"role_name": "flatpak",
"system_flatpaks": [],
"remotes": [],
"notes": [],
},
"services": [],
"packages": [],
}
}
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
flatpak_tasks = (out / "roles" / "flatpak" / "tasks" / "main.yml").read_text(
encoding="utf-8"
)
flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
assert "flatpak_system_flatpaks: []" in flatpak_defaults
assert "flatpak_remotes: []" in flatpak_defaults
assert "Install system-wide Flatpaks" in flatpak_tasks
assert "Ensure system Flatpak remotes exist" in flatpak_tasks
def test_manifest_avoids_package_role_collision_with_flatpak_singleton(tmp_path):
bundle = tmp_path / "bundle"
out = tmp_path / "out"
state = {
"roles": {
"users": {
"role_name": "users",
"users": [],
"managed_files": [],
"excluded": [],
"notes": [],
"user_flatpaks": {},
"user_flatpak_remotes": [],
},
"flatpak": {
"role_name": "flatpak",
"remotes": [
{
"name": "flathub",
"method": "system",
"url": "https://dl.flathub.org/repo/",
}
],
"system_flatpaks": [
{
"name": "org.onionshare.OnionShare",
"method": "system",
"remote": "flathub",
"branch": "stable",
"arch": "x86_64",
}
],
"notes": [],
},
"services": [],
"packages": [
{
"package": "flatpak",
"role_name": "flatpak",
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
"has_config": True,
}
],
}
}
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out))
flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
playbook = (out / "playbook.yml").read_text(encoding="utf-8")
assert "org.onionshare.OnionShare" in flatpak_defaults
assert (out / "roles" / "package_flatpak" / "tasks" / "main.yml").exists()
assert "role: flatpak" in playbook
assert "role: package_flatpak" in playbook