Compare commits

...

2 commits

Author SHA1 Message Date
1e996f4a43
Group all package roles into Debian/RPM 'sections'
Some checks failed
Lint / test (push) Waiting to run
CI / test (push) Has been cancelled
This includes managed config files and unit state.

This mode is not used if `--fqdn` or `--no-common-roles` is set,
in which case, the traditional behaviour of preserving one role
per package/unit is used instead.

This is a breaking change.
2026-06-14 19:19:59 +10:00
e2339616fb
remove flatpak tests which don't work great in CI 2026-06-14 18:49:26 +10:00
14 changed files with 908 additions and 104 deletions

View file

@ -1,7 +1,7 @@
# 0.7.0 # 0.7.0
* Add support for detecting flatpaks and snaps * Add support for detecting flatpaks and snaps
* Add --merge-simple-packages to reduce the number of roles, for packages that have no config files or services to maintain. * BREAKING CHANGE: Group all package and systemd-unit roles into Debian Section/RPM Group roles by default, including managed config files and unit state. This mode is not used if `--fqdn` or `--no-common-roles` is set, in which case, the traditional behaviour of preserving one role per package/unit is used instead.
# 0.6.0 # 0.6.0

View file

@ -129,7 +129,7 @@ Generate Ansible output from an existing harvest bundle.
**Common flags** **Common flags**
- `--fqdn <host>`: enables **multi-site** output style - `--fqdn <host>`: enables **multi-site** output style
- `--merge-simple-packages`: Puts all packages that don't have config files or services to maintain, in a `common_packages` role, to reduce the number of overall Ansible roles to run. - `--no-common-roles`: disables the default grouping of package and systemd-unit roles into Debian Section/RPM Group roles, preserving one generated role per package/unit. `--fqdn` implies this behaviour.
**Role tags** **Role tags**
Generated playbooks tag each role so you can target just the parts you need: Generated playbooks tag each role so you can target just the parts you need:
@ -149,7 +149,7 @@ Convenience wrapper that runs **harvest → manifest** in one command.
Use this when you want “get me something workable ASAP”. Use this when you want “get me something workable ASAP”.
Supports the same general flags as harvest/manifest, including `--fqdn`, remote harvest flags, `--merge-simple-packages`, and `--sops`. Supports the same general flags as harvest/manifest, including `--fqdn`, `--no-common-roles`, remote harvest flags, and `--sops`.
--- ---

View file

@ -312,6 +312,14 @@ def _add_common_manifest_args(p: argparse.ArgumentParser) -> None:
"--fqdn", "--fqdn",
help="Host FQDN/name for site-mode output (creates inventory/, inventory/host_vars/, playbooks/).", help="Host FQDN/name for site-mode output (creates inventory/, inventory/host_vars/, playbooks/).",
) )
p.add_argument(
"--no-common-roles",
action="store_true",
help=(
"Do not group package and systemd-unit roles into common section/group roles. "
"This preserves one generated role per package/unit. --fqdn implies this."
),
)
g = p.add_mutually_exclusive_group() g = p.add_mutually_exclusive_group()
g.add_argument( g.add_argument(
"--jinjaturtle", "--jinjaturtle",
@ -503,11 +511,6 @@ def main() -> None:
"(binary) using the given GPG fingerprint(s). Requires `sops` on PATH." "(binary) using the given GPG fingerprint(s). Requires `sops` on PATH."
), ),
) )
m.add_argument(
"--merge-simple-packages",
action="store_true",
help="Merge packages with no configuration files into a single 'common_packages' role.",
)
_add_common_manifest_args(m) _add_common_manifest_args(m)
s = sub.add_parser( s = sub.add_parser(
@ -570,11 +573,6 @@ def main() -> None:
"or a file path." "or a file path."
), ),
) )
s.add_argument(
"--merge-simple-packages",
action="store_true",
help="Merge packages with no configuration files into a single 'common_packages' role.",
)
_add_common_manifest_args(s) _add_common_manifest_args(s)
d = sub.add_parser("diff", help="Compare two harvests and report differences") d = sub.add_parser("diff", help="Compare two harvests and report differences")
@ -921,7 +919,7 @@ def main() -> None:
fqdn=args.fqdn, fqdn=args.fqdn,
jinjaturtle=_jt_mode(args), jinjaturtle=_jt_mode(args),
sops_fingerprints=getattr(args, "sops", None), sops_fingerprints=getattr(args, "sops", None),
merge_simple_packages=getattr(args, "merge_simple_packages", False), no_common_roles=bool(getattr(args, "no_common_roles", False)),
) )
if getattr(args, "sops", None) and out_enc: if getattr(args, "sops", None) and out_enc:
print(str(out_enc)) print(str(out_enc))
@ -1059,9 +1057,7 @@ def main() -> None:
fqdn=args.fqdn, fqdn=args.fqdn,
jinjaturtle=_jt_mode(args), jinjaturtle=_jt_mode(args),
sops_fingerprints=list(sops_fps), sops_fingerprints=list(sops_fps),
merge_simple_packages=getattr( no_common_roles=bool(getattr(args, "no_common_roles", False)),
args, "merge_simple_packages", False
),
) )
if not args.harvest: if not args.harvest:
print(str(out_file)) print(str(out_file))
@ -1092,9 +1088,7 @@ def main() -> None:
args.out, args.out,
fqdn=args.fqdn, fqdn=args.fqdn,
jinjaturtle=_jt_mode(args), jinjaturtle=_jt_mode(args),
merge_simple_packages=getattr( no_common_roles=bool(getattr(args, "no_common_roles", False)),
args, "merge_simple_packages", False
),
) )
# For usability (when --harvest wasn't provided), print the harvest path. # For usability (when --harvest wasn't provided), print the harvest path.
if not args.harvest: if not args.harvest:
@ -1125,9 +1119,7 @@ def main() -> None:
fqdn=args.fqdn, fqdn=args.fqdn,
jinjaturtle=_jt_mode(args), jinjaturtle=_jt_mode(args),
sops_fingerprints=list(sops_fps), sops_fingerprints=list(sops_fps),
merge_simple_packages=getattr( no_common_roles=bool(getattr(args, "no_common_roles", False)),
args, "merge_simple_packages", False
),
) )
if not args.harvest: if not args.harvest:
print(str(out_file)) print(str(out_file))
@ -1147,9 +1139,7 @@ def main() -> None:
args.out, args.out,
fqdn=args.fqdn, fqdn=args.fqdn,
jinjaturtle=_jt_mode(args), jinjaturtle=_jt_mode(args),
merge_simple_packages=getattr( no_common_roles=bool(getattr(args, "no_common_roles", False)),
args, "merge_simple_packages", False
),
) )
except RemoteSudoPasswordRequired: except RemoteSudoPasswordRequired:
raise SystemExit( raise SystemExit(

View file

@ -69,7 +69,7 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
Uses dpkg-query and is expected to work on Debian/Ubuntu-like systems. Uses dpkg-query and is expected to work on Debian/Ubuntu-like systems.
Output format: Output format:
{"pkg": [{"version": "...", "arch": "..."}, ...], ...} {"pkg": [{"version": "...", "arch": "...", "section": "..."}, ...], ...}
""" """
try: try:
@ -77,7 +77,7 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
[ [
"dpkg-query", "dpkg-query",
"-W", "-W",
"-f=${Package}\t${Version}\t${Architecture}\n", "-f=${Package}\t${Version}\t${Architecture}\t${Section}\n",
], ],
text=True, text=True,
capture_output=True, capture_output=True,
@ -97,7 +97,10 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
name, ver, arch = parts[0].strip(), parts[1].strip(), parts[2].strip() name, ver, arch = parts[0].strip(), parts[1].strip(), parts[2].strip()
if not name: if not name:
continue continue
out.setdefault(name, []).append({"version": ver, "arch": arch}) instance = {"version": ver, "arch": arch}
if len(parts) >= 4 and parts[3].strip():
instance["section"] = parts[3].strip()
out.setdefault(name, []).append(instance)
# Stable ordering for deterministic JSON dumps. # Stable ordering for deterministic JSON dumps.
for k in list(out.keys()): for k in list(out.keys()):

View file

@ -86,6 +86,7 @@ class ServiceSnapshot:
class PackageSnapshot: class PackageSnapshot:
package: str package: str
role_name: str role_name: str
section: Optional[str] = None
managed_dirs: List[ManagedDir] = field(default_factory=list) managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list) managed_files: List[ManagedFile] = field(default_factory=list)
managed_links: List[ManagedLink] = field(default_factory=list) managed_links: List[ManagedLink] = field(default_factory=list)
@ -389,6 +390,30 @@ def _role_name_from_pkg(pkg: str) -> str:
return avoid_reserved_role_name(_safe_name(pkg), prefix="package") return avoid_reserved_role_name(_safe_name(pkg), prefix="package")
def _package_section_from_installations(
installs: List[Dict[str, str]],
) -> Optional[str]:
"""Return a stable package grouping label from installed package metadata.
Debian exposes this as ``Section``. RPM-family distributions have a broadly
similar ``Group`` tag, although modern Fedora/RHEL packages may omit it or
set it to ``Unspecified``.
"""
values: Set[str] = set()
for inst in installs or []:
value = (inst.get("section") or inst.get("group") or "").strip()
if not value:
continue
if value.lower() in {"(none)", "none", "unspecified"}:
continue
values.add(value)
if not values:
return None
return sorted(values)[0]
def _copy_into_bundle( def _copy_into_bundle(
bundle_dir: str, role_name: str, abs_path: str, src_rel: str bundle_dir: str, role_name: str, abs_path: str, src_rel: str
) -> None: ) -> None:
@ -1279,6 +1304,9 @@ def harvest(
cron_snapshot = PackageSnapshot( cron_snapshot = PackageSnapshot(
package=cron_pkg, package=cron_pkg,
role_name=cron_role_name, role_name=cron_role_name,
section=_package_section_from_installations(
installed_pkgs.get(cron_pkg, [])
),
managed_files=cron_managed, managed_files=cron_managed,
excluded=cron_excluded, excluded=cron_excluded,
notes=cron_notes, notes=cron_notes,
@ -1314,6 +1342,9 @@ def harvest(
logrotate_snapshot = PackageSnapshot( logrotate_snapshot = PackageSnapshot(
package=logrotate_pkg, package=logrotate_pkg,
role_name=logrotate_role_name, role_name=logrotate_role_name,
section=_package_section_from_installations(
installed_pkgs.get(logrotate_pkg, [])
),
managed_files=lr_managed, managed_files=lr_managed,
excluded=lr_excluded, excluded=lr_excluded,
notes=lr_notes, notes=lr_notes,
@ -1732,11 +1763,11 @@ def harvest(
seen_global=captured_global, seen_global=captured_global,
) )
has_config = bool(pkg_to_etc_paths.get(pkg, []) or managed) has_config = bool(managed or excluded)
if not has_config: if not has_config:
notes.append( notes.append(
"No /etc files or custom configuration detected for this package." "No changed or custom configuration detected for this package."
) )
simple_packages.append(pkg) simple_packages.append(pkg)
@ -1744,6 +1775,9 @@ def harvest(
PackageSnapshot( PackageSnapshot(
package=pkg, package=pkg,
role_name=role, role_name=role,
section=_package_section_from_installations(
installed_pkgs.get(pkg, [])
),
managed_files=managed, managed_files=managed,
managed_links=[], managed_links=[],
excluded=excluded, excluded=excluded,
@ -2487,6 +2521,7 @@ def harvest(
arches = sorted({i.get("arch") for i in installs if i.get("arch")}) arches = sorted({i.get("arch") for i in installs if i.get("arch")})
vers = sorted({i.get("version") for i in installs if i.get("version")}) vers = sorted({i.get("version") for i in installs if i.get("version")})
version: Optional[str] = vers[0] if len(vers) == 1 else None version: Optional[str] = vers[0] if len(vers) == 1 else None
section = _package_section_from_installations(installs)
observed: List[Dict[str, str]] = [] observed: List[Dict[str, str]] = []
if pkg in manual_set: if pkg in manual_set:
@ -2509,6 +2544,7 @@ def harvest(
"version": version, "version": version,
"arches": arches, "arches": arches,
"installations": installs, "installations": installs,
"section": section,
"observed_via": observed, "observed_via": observed,
"roles": roles, "roles": roles,
} }

View file

@ -80,6 +80,78 @@ def _yaml_dump_mapping(obj: Dict[str, Any], *, sort_keys: bool = True) -> str:
) )
def _role_id(raw: str) -> str:
"""Return an Ansible-safe role identifier from an arbitrary label."""
s = re.sub(r"[^A-Za-z0-9]+", "_", raw or "misc")
s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s)
s = s.lower()
s = re.sub(r"_+", "_", s).strip("_")
if not s:
s = "misc"
if not re.match(r"^[a-z_]", s):
s = "r_" + s
return s
def _package_section_label(
package_role: Dict[str, Any], inventory_packages: Dict[str, Any]
) -> str:
"""Return the Debian Section/RPM Group label for a package role."""
pkg = str(package_role.get("package") or "")
inv = inventory_packages.get(pkg) or {}
candidates: List[str] = []
for value in (package_role.get("section"), inv.get("section")):
if isinstance(value, str) and value.strip():
candidates.append(value.strip())
for inst in inv.get("installations", []) or []:
if not isinstance(inst, dict):
continue
for key in ("section", "group"):
value = inst.get(key)
if isinstance(value, str) and value.strip():
candidates.append(value.strip())
for value in candidates:
if value.lower() in {"(none)", "none", "unspecified"}:
continue
return value
return "misc"
def _section_label_for_packages(
packages: List[str], inventory_packages: Dict[str, Any]
) -> str:
"""Return a stable section/group label for a set of packages.
Service roles can involve more than one package. Prefer the first
package with a concrete Debian Section/RPM Group and fall back to misc
when no package metadata is available.
"""
for pkg in packages or []:
label = _package_section_label({"package": pkg}, inventory_packages)
if label and label.lower() != "misc":
return label
return "misc"
def _section_role_name(label: str, occupied_roles: Set[str]) -> str:
"""Create a stable section role name, avoiding generated-role collisions."""
base = avoid_reserved_role_name(_role_id(label), prefix="section")
role = base if base not in occupied_roles else f"section_{base}"
n = 2
while role in occupied_roles:
role = f"section_{base}_{n}"
n += 1
occupied_roles.add(role)
return role
def _merge_mappings_overwrite( def _merge_mappings_overwrite(
existing: Dict[str, Any], incoming: Dict[str, Any] existing: Dict[str, Any], incoming: Dict[str, Any]
) -> Dict[str, Any]: ) -> Dict[str, Any]:
@ -650,6 +722,39 @@ def _render_install_packages_tasks(role: str, var_prefix: str) -> str:
""" """
def _render_grouped_systemd_tasks(var_prefix: str) -> str:
"""Render tasks to manage multiple systemd units in a common role."""
return f"""- name: Probe whether grouped systemd units exist and are manageable
ansible.builtin.systemd:
name: "{{{{ item.name }}}}"
check_mode: true
loop: "{{{{ {var_prefix}_systemd_units | default([]) }}}}"
register: _enroll_unit_probes
failed_when: false
changed_when: false
when: item.manage | default(false)
- name: Ensure grouped unit enablement matches harvest
ansible.builtin.systemd:
name: "{{{{ item.item.name }}}}"
enabled: "{{{{ item.item.enabled | bool }}}}"
loop: "{{{{ _enroll_unit_probes.results | default([]) }}}}"
when:
- item.item.manage | default(false)
- not (item.failed | default(false))
- name: Ensure grouped unit running state matches harvest
ansible.builtin.systemd:
name: "{{{{ item.item.name }}}}"
state: "{{{{ item.item.state }}}}"
loop: "{{{{ _enroll_unit_probes.results | default([]) }}}}"
when:
- item.item.manage | default(false)
- not (item.failed | default(false))
"""
def _render_firewall_runtime_tasks(var_prefix: str) -> str: def _render_firewall_runtime_tasks(var_prefix: str) -> str:
"""Render tasks for live ipset/iptables snapshots.""" """Render tasks for live ipset/iptables snapshots."""
return f"""- name: Ensure firewall runtime snapshot directory exists return f"""- name: Ensure firewall runtime snapshot directory exists
@ -893,13 +998,16 @@ def _manifest_from_bundle_dir(
*, *,
fqdn: Optional[str] = None, fqdn: Optional[str] = None,
jinjaturtle: str = "auto", # auto|on|off jinjaturtle: str = "auto", # auto|on|off
merge_simple_packages: bool = False, no_common_roles: bool = False,
) -> None: ) -> None:
state_path = os.path.join(bundle_dir, "state.json") state_path = os.path.join(bundle_dir, "state.json")
with open(state_path, "r", encoding="utf-8") as f: with open(state_path, "r", encoding="utf-8") as f:
state = json.load(f) state = json.load(f)
roles: Dict[str, Any] = state.get("roles") or {} roles: Dict[str, Any] = state.get("roles") or {}
inventory_packages: Dict[str, Any] = (state.get("inventory") or {}).get(
"packages"
) or {}
services: List[Dict[str, Any]] = roles.get("services", []) services: List[Dict[str, Any]] = roles.get("services", [])
package_roles: List[Dict[str, Any]] = roles.get("packages", []) package_roles: List[Dict[str, Any]] = roles.get("packages", [])
@ -914,6 +1022,7 @@ def _manifest_from_bundle_dir(
extra_paths_snapshot: Dict[str, Any] = roles.get("extra_paths", {}) extra_paths_snapshot: Dict[str, Any] = roles.get("extra_paths", {})
site_mode = fqdn is not None and fqdn != "" site_mode = fqdn is not None and fqdn != ""
use_common_roles = (not site_mode) and (not no_common_roles)
jt_exe = find_jinjaturtle_cmd() jt_exe = find_jinjaturtle_cmd()
jt_enabled = False jt_enabled = False
@ -954,6 +1063,20 @@ def _manifest_from_bundle_dir(
manifested_extra_paths_roles: List[str] = [] manifested_extra_paths_roles: List[str] = []
manifested_service_roles: List[str] = [] manifested_service_roles: List[str] = []
manifested_pkg_roles: List[str] = [] manifested_pkg_roles: List[str] = []
common_role_groups: Dict[str, List[Dict[str, Any]]] = {}
common_tail_roles: List[str] = []
if use_common_roles:
for svc in services:
label = _section_label_for_packages(
svc.get("packages", []) or [], inventory_packages
)
common_role_groups.setdefault(label, []).append(
{"kind": "service", "snapshot": svc}
)
services_to_manifest: List[Dict[str, Any]] = []
else:
services_to_manifest = services
# ------------------------- # -------------------------
# Users role (non-system users) # Users role (non-system users)
@ -2370,7 +2493,7 @@ User-requested extra file harvesting.
# ------------------------- # -------------------------
# Service roles # Service roles
# ------------------------- # -------------------------
for svc in services: for svc in services_to_manifest:
source_role = svc["role_name"] source_role = svc["role_name"]
role = avoid_reserved_role_name(source_role, prefix="service") role = avoid_reserved_role_name(source_role, prefix="service")
unit = svc["unit"] unit = svc["unit"]
@ -2549,64 +2672,200 @@ Generated from `{unit}`.
manifested_service_roles.append(role) manifested_service_roles.append(role)
# ------------------------- # -------------------------
# Merge simple packages (if --merge-simple-packages is set) # Common package section/group roles
# #
# Packages with no configuration files, systemd units, or cron jobs # Outside --fqdn/site mode, package and systemd-unit roles are grouped by
# are merged into a single 'common_packages' role to reduce role count. # Debian Section or RPM Group by default. Managed config and unit state can
# live in those section roles too; --no-common-roles preserves the historic
# one-role-per-package/unit output, and --fqdn implies that mode because
# grouped role contents would be unsafe across multiple harvested hosts.
# ------------------------- # -------------------------
simple_packages_list: List[str] = [] if use_common_roles:
if merge_simple_packages:
filtered_package_roles: List[Dict[str, Any]] = []
for pr in package_roles: for pr in package_roles:
has_config = pr.get("has_config", True) label = _package_section_label(pr, inventory_packages)
managed_files = pr.get("managed_files", []) or [] common_role_groups.setdefault(label, []).append(
# A package is "simple" if it has no config files AND no managed files {"kind": "package", "snapshot": pr}
if not has_config and not managed_files: )
pkg = pr.get("package") package_roles = []
if pkg:
simple_packages_list.append(pkg)
else:
filtered_package_roles.append(pr)
package_roles = filtered_package_roles
# ------------------------- # -------------------------
# Manually installed package roles # Manually installed package roles
# ------------------------- # -------------------------
# First, create the common_packages role if we have simple packages to merge occupied_roles: Set[str] = set(
if simple_packages_list: manifested_apt_config_roles
role = "common_packages" + manifested_dnf_config_roles
+ manifested_users_roles
+ manifested_flatpak_roles
+ manifested_snap_roles
+ manifested_service_roles
+ manifested_firewall_runtime_roles
+ manifested_etc_custom_roles
+ manifested_usr_local_custom_roles
+ manifested_extra_paths_roles
)
for pr in package_roles:
occupied_roles.add(
avoid_reserved_role_name(str(pr.get("role_name") or ""), prefix="package")
)
for section_label, entries in sorted(common_role_groups.items()):
role = _section_role_name(section_label, occupied_roles)
role_dir = os.path.join(roles_root, role) role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir) _write_role_scaffold(role_dir)
var_prefix = role var_prefix = role
packages_set: Set[str] = set()
# No managed files for common_packages - just package installation
files_var: List[Dict[str, Any]] = [] files_var: List[Dict[str, Any]] = []
links_var: List[Dict[str, Any]] = []
dirs_var: List[Dict[str, Any]] = [] dirs_var: List[Dict[str, Any]] = []
links_var: List[Dict[str, Any]] = []
systemd_units: List[Dict[str, Any]] = []
excluded_all: List[Dict[str, Any]] = []
notes_all: List[str] = []
origin_lines: List[str] = []
jt_combined: Dict[str, Any] = {}
seen_files: Set[tuple] = set()
seen_dirs: Set[tuple] = set()
seen_links: Set[tuple] = set()
seen_units: Set[str] = set()
for entry in entries:
kind = entry.get("kind") or "package"
snap = entry.get("snapshot") or {}
source_role = str(snap.get("role_name") or "")
managed_files = snap.get("managed_files", []) or []
managed_dirs = snap.get("managed_dirs", []) or []
managed_links = snap.get("managed_links", []) or []
excluded = snap.get("excluded", []) or []
notes = snap.get("notes", []) or []
if kind == "service":
pkgs = snap.get("packages", []) or []
unit = str(snap.get("unit") or "")
origin_lines.append(f"service `{unit}` from role `{source_role}`")
else:
pkg = str(snap.get("package") or "")
pkgs = [pkg] if pkg else []
origin_lines.append(f"package `{pkg}` from role `{source_role}`")
for pkg in pkgs:
if pkg:
packages_set.add(str(pkg))
templated: Set[str] = set()
jt_vars = ""
if managed_files and source_role:
templated, jt_vars = _jinjify_managed_files(
bundle_dir,
source_role,
role_dir,
managed_files,
jt_exe=jt_exe,
jt_enabled=jt_enabled,
overwrite_templates=True,
)
_copy_artifacts(
bundle_dir,
source_role,
os.path.join(role_dir, "files"),
exclude_rels=templated,
)
notify_other = "Restart managed services" if kind == "service" else None
for item in _build_managed_files_var(
managed_files,
templated,
notify_other=notify_other,
notify_systemd="Run systemd daemon-reload",
):
key = (item.get("dest"), item.get("src_rel"), item.get("kind"))
if key not in seen_files:
seen_files.add(key)
files_var.append(item)
for item in _build_managed_dirs_var(managed_dirs):
key = (
item.get("dest"),
item.get("owner"),
item.get("group"),
item.get("mode"),
)
if key not in seen_dirs:
seen_dirs.add(key)
dirs_var.append(item)
for item in _build_managed_links_var(managed_links):
key = (item.get("dest"), item.get("src"))
if key not in seen_links:
seen_links.add(key)
links_var.append(item)
if kind == "service":
unit = str(snap.get("unit") or "")
if unit and unit not in seen_units:
seen_units.add(unit)
unit_file_state = str(snap.get("unit_file_state") or "")
enabled_at_harvest = unit_file_state in (
"enabled",
"enabled-runtime",
)
desired_state = (
"started" if snap.get("active_state") == "active" else "stopped"
)
systemd_units.append(
{
"name": unit,
"manage": True,
"enabled": bool(enabled_at_harvest),
"state": desired_state,
}
)
excluded_all.extend(excluded)
notes_all.extend(str(n) for n in notes)
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
jt_combined = _merge_mappings_overwrite(jt_combined, jt_map)
packages = sorted(packages_set)
files_var = sorted(files_var, key=lambda x: str(x.get("dest") or ""))
dirs_var = sorted(dirs_var, key=lambda x: str(x.get("dest") or ""))
links_var = sorted(links_var, key=lambda x: str(x.get("dest") or ""))
systemd_units = sorted(systemd_units, key=lambda x: str(x.get("name") or ""))
base_vars: Dict[str, Any] = { base_vars: Dict[str, Any] = {
f"{var_prefix}_packages": simple_packages_list, f"{var_prefix}_packages": packages,
f"{var_prefix}_managed_files": files_var, f"{var_prefix}_managed_files": files_var,
f"{var_prefix}_managed_dirs": dirs_var, f"{var_prefix}_managed_dirs": dirs_var,
f"{var_prefix}_managed_links": links_var, f"{var_prefix}_managed_links": links_var,
f"{var_prefix}_systemd_units": systemd_units,
} }
base_vars = _merge_mappings_overwrite(base_vars, jt_combined)
if site_mode:
_write_role_defaults(
role_dir,
{
f"{var_prefix}_packages": [],
f"{var_prefix}_managed_files": [],
f"{var_prefix}_managed_dirs": [],
f"{var_prefix}_managed_links": [],
},
)
_write_hostvars(out_dir, fqdn or "", role, base_vars)
else:
_write_role_defaults(role_dir, base_vars) _write_role_defaults(role_dir, base_vars)
handlers = "---\n" if {"cron", "logrotate"}.intersection(packages_set):
common_tail_roles.append(role)
handlers = (
"""---
- name: Run systemd daemon-reload
ansible.builtin.systemd:
daemon_reload: true
- name: Restart managed services
ansible.builtin.service:
name: "{{ item.name }}"
state: restarted
loop: "{{ """
+ f"{var_prefix}_systemd_units"
+ """ | default([]) }}"
when:
- item.manage | default(false)
- (item.state | default('stopped')) == 'started'
"""
)
with open( with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8" os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f: ) as f:
@ -2614,6 +2873,10 @@ Generated from `{unit}`.
task_parts: List[str] = [] task_parts: List[str] = []
task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix)) task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix))
task_parts.append(
_render_generic_files_tasks(var_prefix, include_restart_notify=True)
)
task_parts.append(_render_grouped_systemd_tasks(var_prefix))
tasks = "\n".join(task_parts).rstrip() + "\n" tasks = "\n".join(task_parts).rstrip() + "\n"
with open( with open(
@ -2628,14 +2891,28 @@ Generated from `{unit}`.
readme = f"""# {role} readme = f"""# {role}
Common packages with no configuration files. Common role for package section/group `{section_label}`.
This role was created by merging simple packages using the `--merge-simple-packages` flag. ## Origin roles
{os.linesep.join("- " + line for line in sorted(origin_lines)) or "- (none)"}
## Packages ## Packages
{os.linesep.join("- " + p for p in simple_packages_list) or "- (none)"} {os.linesep.join("- " + p for p in packages) or "- (none)"}
> Note: This role only installs packages; it does not manage any configuration files or services. ## Managed files
{os.linesep.join("- " + mf["dest"] for mf in files_var) or "- (none)"}
## Managed symlinks
{os.linesep.join("- " + ml["dest"] + " -> " + ml["src"] for ml in links_var) or "- (none)"}
## Systemd units
{os.linesep.join("- " + u["name"] + " (enabled=" + str(u["enabled"]).lower() + ", state=" + u["state"] + ")" for u in systemd_units) or "- (none)"}
## Excluded (possible secrets / unsafe)
{os.linesep.join("- " + e.get("path", "") + " (" + e.get("reason", "") + ")" for e in excluded_all) or "- (none)"}
## Notes
{os.linesep.join("- " + n for n in notes_all) or "- (none)"}
""" """
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f: with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme) f.write(readme)
@ -2776,6 +3053,9 @@ Generated for package `{pkg}`.
for r in ("cron", "logrotate"): for r in ("cron", "logrotate"):
if r in manifested_pkg_roles: if r in manifested_pkg_roles:
tail_roles.append(r) tail_roles.append(r)
for r in common_tail_roles:
if r in manifested_pkg_roles and r not in tail_roles:
tail_roles.append(r)
main_pkg_roles = [r for r in manifested_pkg_roles if r not in set(tail_roles)] main_pkg_roles = [r for r in manifested_pkg_roles if r not in set(tail_roles)]
@ -2809,7 +3089,7 @@ def manifest(
fqdn: Optional[str] = None, fqdn: Optional[str] = None,
jinjaturtle: str = "auto", # auto|on|off jinjaturtle: str = "auto", # auto|on|off
sops_fingerprints: Optional[List[str]] = None, sops_fingerprints: Optional[List[str]] = None,
merge_simple_packages: bool = False, no_common_roles: bool = False,
) -> Optional[str]: ) -> Optional[str]:
"""Render an Ansible manifest from a harvest. """Render an Ansible manifest from a harvest.
@ -2842,7 +3122,7 @@ def manifest(
out, out,
fqdn=fqdn, fqdn=fqdn,
jinjaturtle=jinjaturtle, jinjaturtle=jinjaturtle,
merge_simple_packages=merge_simple_packages, no_common_roles=no_common_roles,
) )
return None return None
@ -2862,7 +3142,7 @@ def manifest(
str(tmp_out), str(tmp_out),
fqdn=fqdn, fqdn=fqdn,
jinjaturtle=jinjaturtle, jinjaturtle=jinjaturtle,
merge_simple_packages=merge_simple_packages, no_common_roles=no_common_roles,
) )
enc = _encrypt_manifest_out_dir_to_sops( enc = _encrypt_manifest_out_dir_to_sops(

View file

@ -148,7 +148,7 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
Uses `rpm -qa` and is expected to work on RHEL/Fedora-like systems. Uses `rpm -qa` and is expected to work on RHEL/Fedora-like systems.
Output format: Output format:
{"pkg": [{"version": "...", "arch": "..."}, ...], ...} {"pkg": [{"version": "...", "arch": "...", "group": "..."}, ...], ...}
The version string is formatted as: The version string is formatted as:
- "<version>-<release>" for typical packages - "<version>-<release>" for typical packages
@ -161,7 +161,7 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
"rpm", "rpm",
"-qa", "-qa",
"--qf", "--qf",
"%{NAME}\t%{EPOCHNUM}\t%{VERSION}\t%{RELEASE}\t%{ARCH}\n", "%{NAME}\t%{EPOCHNUM}\t%{VERSION}\t%{RELEASE}\t%{ARCH}\t%{GROUP}\n",
], ],
allow_fail=False, allow_fail=False,
merge_err=True, merge_err=True,
@ -190,7 +190,11 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
if epoch and epoch.isdigit() and epoch != "0": if epoch and epoch.isdigit() and epoch != "0":
v = f"{epoch}:{v}" v = f"{epoch}:{v}"
pkgs.setdefault(name, []).append({"version": v, "arch": arch}) instance = {"version": v, "arch": arch}
if len(parts) >= 6 and parts[5].strip():
instance["group"] = parts[5].strip()
pkgs.setdefault(name, []).append(instance)
for k in list(pkgs.keys()): for k in list(pkgs.keys()):
pkgs[k] = sorted( pkgs[k] = sorted(

View file

@ -117,6 +117,14 @@
"minLength": 1, "minLength": 1,
"type": "string" "type": "string"
}, },
"group": {
"minLength": 1,
"type": "string"
},
"section": {
"minLength": 1,
"type": "string"
},
"version": { "version": {
"minLength": 1, "minLength": 1,
"type": "string" "type": "string"
@ -364,6 +372,12 @@
}, },
"type": "array" "type": "array"
}, },
"section": {
"type": [
"string",
"null"
]
},
"version": { "version": {
"type": [ "type": [
"string", "string",
@ -394,6 +408,12 @@
"has_config": { "has_config": {
"type": "boolean", "type": "boolean",
"default": true "default": true
},
"section": {
"type": [
"string",
"null"
]
} }
}, },
"required": [ "required": [

View file

@ -44,20 +44,12 @@ poetry run \
--format json | jq --format json | jq
DEBIAN_FRONTEND=noninteractive apt-get remove -y --purge cowsay DEBIAN_FRONTEND=noninteractive apt-get remove -y --purge cowsay
# Ensure some flatpaks are installed # No common roles mode (tested later)
DEBIAN_FRONTEND=noninteractive apt-get install -y flatpak
flatpak remote-add --if-not-exists flathub https://dl.flathub.org/repo/flathub.flatpakrepo
flatpak install -y flathub org.onionshare.OnionShare
poetry run \ poetry run \
enroll manifest \ enroll manifest \
--harvest "${BUNDLE_DIR}2" \ --harvest "${BUNDLE_DIR}2" \
--out "${ANSIBLE_DIR}2" --out "${ANSIBLE_DIR}2" \
--no-common-roles
# Test the presence of OnionShare
builtin cd "${ANSIBLE_DIR}2"
grep -r org.onionshare.OnionShare "${ANSIBLE_DIR}2/roles"
ansible-playbook playbook.yml -i "localhost," -c local --check --diff --tags role_flatpak
# Ansible test # Ansible test
builtin cd "${ANSIBLE_DIR}" builtin cd "${ANSIBLE_DIR}"
@ -66,3 +58,8 @@ ansible-lint "${ANSIBLE_DIR}"
# Run # Run
ansible-playbook playbook.yml -i "localhost," -c local --check --diff ansible-playbook playbook.yml -i "localhost," -c local --check --diff
# Test the --no-common-roles mode
builtin cd "${ANSIBLE_DIR}2"
ls "${ANSIBLE_DIR}2/roles"
ansible-playbook playbook.yml -i "localhost," -c local --check --diff

View file

@ -47,6 +47,7 @@ def test_cli_manifest_subcommand_calls_manifest(monkeypatch, tmp_path):
# Common manifest args should be passed through by the CLI. # Common manifest args should be passed through by the CLI.
called["fqdn"] = kwargs.get("fqdn") called["fqdn"] = kwargs.get("fqdn")
called["jinjaturtle"] = kwargs.get("jinjaturtle") called["jinjaturtle"] = kwargs.get("jinjaturtle")
called["no_common_roles"] = kwargs.get("no_common_roles")
monkeypatch.setattr(cli, "manifest", fake_manifest) monkeypatch.setattr(cli, "manifest", fake_manifest)
monkeypatch.setattr( monkeypatch.setattr(
@ -67,6 +68,36 @@ def test_cli_manifest_subcommand_calls_manifest(monkeypatch, tmp_path):
assert called["out"] == str(tmp_path / "ansible") assert called["out"] == str(tmp_path / "ansible")
assert called["fqdn"] is None assert called["fqdn"] is None
assert called["jinjaturtle"] == "auto" assert called["jinjaturtle"] == "auto"
assert called["no_common_roles"] is False
def test_cli_manifest_no_common_roles_is_forwarded(monkeypatch, tmp_path):
called = {}
def fake_manifest(harvest_dir: str, out_dir: str, **kwargs):
called["harvest"] = harvest_dir
called["out"] = out_dir
called["no_common_roles"] = kwargs.get("no_common_roles")
monkeypatch.setattr(cli, "manifest", fake_manifest)
monkeypatch.setattr(
sys,
"argv",
[
"enroll",
"manifest",
"--harvest",
str(tmp_path / "bundle"),
"--out",
str(tmp_path / "ansible"),
"--no-common-roles",
],
)
cli.main()
assert called["harvest"] == str(tmp_path / "bundle")
assert called["out"] == str(tmp_path / "ansible")
assert called["no_common_roles"] is True
def test_cli_enroll_subcommand_runs_harvest_then_manifest(monkeypatch, tmp_path): def test_cli_enroll_subcommand_runs_harvest_then_manifest(monkeypatch, tmp_path):

View file

@ -169,7 +169,7 @@ def test_list_installed_packages_parses_output():
original_run = d.subprocess.run original_run = d.subprocess.run
def fake_run(cmd, text, capture_output, check): def fake_run(cmd, text, capture_output, check):
return P(0, "nginx\t1.18.0\tamd64\nvim\t8.2\tamd64\n") return P(0, "nginx\t1.18.0\tamd64\tweb\nvim\t8.2\tamd64\teditors\n")
d.subprocess.run = fake_run d.subprocess.run = fake_run
try: try:
@ -177,6 +177,7 @@ def test_list_installed_packages_parses_output():
assert "nginx" in result assert "nginx" in result
assert result["nginx"][0]["version"] == "1.18.0" assert result["nginx"][0]["version"] == "1.18.0"
assert result["nginx"][0]["arch"] == "amd64" assert result["nginx"][0]["arch"] == "amd64"
assert result["nginx"][0]["section"] == "web"
assert "vim" in result assert "vim" in result
finally: finally:
d.subprocess.run = original_run d.subprocess.run = original_run

View file

@ -202,7 +202,12 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
owned_etc = {"/etc/openvpn/server.conf"} owned_etc = {"/etc/openvpn/server.conf"}
etc_owner_map = {"/etc/openvpn/server.conf": "openvpn"} etc_owner_map = {"/etc/openvpn/server.conf": "openvpn"}
topdir_to_pkgs = {"openvpn": {"openvpn"}} topdir_to_pkgs = {"openvpn": {"openvpn"}}
pkg_to_etc_paths = {"openvpn": ["/etc/openvpn/server.conf"], "curl": []} # curl has a package-owned /etc path, but no changed/custom harvested
# artifacts. That should still be considered a simple package role.
pkg_to_etc_paths = {
"openvpn": ["/etc/openvpn/server.conf"],
"curl": ["/etc/curl/curlrc"],
}
backend = FakeBackend( backend = FakeBackend(
name="dpkg", name="dpkg",
@ -264,6 +269,9 @@ def test_harvest_dedup_manual_packages_and_builds_etc_custom(
pkg_roles = st["roles"]["packages"] pkg_roles = st["roles"]["packages"]
assert all(pr["package"] != "openvpn" for pr in pkg_roles) assert all(pr["package"] != "openvpn" for pr in pkg_roles)
assert any(pr["package"] == "curl" for pr in pkg_roles) assert any(pr["package"] == "curl" for pr in pkg_roles)
curl_role = next(pr for pr in pkg_roles if pr["package"] == "curl")
assert curl_role["has_config"] is False
assert any("No changed or custom configuration" in n for n in curl_role["notes"])
# Inventory provenance: openvpn should be observed via systemd unit. # Inventory provenance: openvpn should be observed via systemd unit.
openvpn_obs = inv["openvpn"]["observed_via"] openvpn_obs = inv["openvpn"]["observed_via"]

View file

@ -9,6 +9,80 @@ import pytest
import enroll.manifest as manifest import enroll.manifest as manifest
def _minimal_package_state(packages):
return {
"schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {
"packages": {
p["package"]: {
"version": "1.0",
"arches": ["amd64"],
"installations": [
{
"version": "1.0",
"arch": "amd64",
"section": p.get("section") or "misc",
}
],
"section": p.get("section") or "misc",
"observed_via": [{"kind": "package_role", "ref": p["role_name"]}],
"roles": [p["role_name"]],
}
for p in packages
}
},
"roles": {
"users": {
"role_name": "users",
"users": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [],
"packages": packages,
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
}
def _write_state(bundle: Path, state: dict) -> None:
bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path): def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
bundle = tmp_path / "bundle" bundle = tmp_path / "bundle"
out = tmp_path / "ansible" out = tmp_path / "ansible"
@ -181,7 +255,7 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin" / "myscript" bundle / "artifacts" / "usr_local_custom" / "usr" / "local" / "bin" / "myscript"
).write_text("#!/bin/sh\necho hi\n", encoding="utf-8") ).write_text("#!/bin/sh\necho hi\n", encoding="utf-8")
manifest.manifest(str(bundle), str(out)) manifest.manifest(str(bundle), str(out), no_common_roles=True)
# Service role: systemd management should be gated on foo_manage_unit and a probe. # Service role: systemd management should be gated on foo_manage_unit and a probe.
tasks = (out / "roles" / "foo" / "tasks" / "main.yml").read_text(encoding="utf-8") tasks = (out / "roles" / "foo" / "tasks" / "main.yml").read_text(encoding="utf-8")
@ -213,6 +287,365 @@ def test_manifest_writes_roles_and_playbook_with_clean_when(tmp_path: Path):
assert "role: foo" in pb assert "role: foo" in pb
def test_manifest_groups_simple_packages_by_section_by_default(tmp_path: Path):
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
state = _minimal_package_state(
[
{
"package": "curl",
"role_name": "curl",
"section": "net",
"has_config": False,
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
},
{
"package": "rsync",
"role_name": "rsync",
"section": "net",
"has_config": False,
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
},
{
"package": "vim",
"role_name": "vim",
"section": "editors",
"has_config": False,
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
},
{
"package": "nginx",
"role_name": "nginx",
"section": "httpd",
"has_config": True,
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
},
]
)
_write_state(bundle, state)
manifest.manifest(str(bundle), str(out))
assert (out / "roles" / "net").exists()
assert (out / "roles" / "editors").exists()
assert (out / "roles" / "httpd").exists()
assert not (out / "roles" / "curl").exists()
assert not (out / "roles" / "rsync").exists()
assert not (out / "roles" / "vim").exists()
assert not (out / "roles" / "nginx").exists()
net_defaults = (out / "roles" / "net" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
assert "- curl" in net_defaults
assert "- rsync" in net_defaults
pb = (out / "playbook.yml").read_text(encoding="utf-8")
assert "role: net" in pb
assert "role: editors" in pb
assert "role: httpd" in pb
def test_manifest_no_common_roles_preserves_package_roles(tmp_path: Path):
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
state = _minimal_package_state(
[
{
"package": "curl",
"role_name": "curl",
"section": "net",
"has_config": False,
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
},
{
"package": "vim",
"role_name": "vim",
"section": "editors",
"has_config": False,
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
},
]
)
_write_state(bundle, state)
manifest.manifest(str(bundle), str(out), no_common_roles=True)
assert (out / "roles" / "curl").exists()
assert (out / "roles" / "vim").exists()
assert not (out / "roles" / "net").exists()
assert not (out / "roles" / "editors").exists()
def test_manifest_groups_excluded_package_paths_into_common_roles(tmp_path: Path):
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
state = _minimal_package_state(
[
{
"package": "secret-agent",
"role_name": "secret_agent",
"section": "net",
"has_config": False,
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [
{"path": "/etc/secret-agent/key", "reason": "possible_secret"}
],
"notes": [],
}
]
)
_write_state(bundle, state)
manifest.manifest(str(bundle), str(out))
assert (out / "roles" / "net").exists()
assert not (out / "roles" / "secret_agent").exists()
readme = (out / "roles" / "net" / "README.md").read_text(encoding="utf-8")
assert "/etc/secret-agent/key" in readme
def test_manifest_groups_managed_package_config_into_common_role(tmp_path: Path):
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
(bundle / "artifacts" / "nginx" / "etc" / "nginx").mkdir(
parents=True, exist_ok=True
)
(bundle / "artifacts" / "nginx" / "etc" / "nginx" / "nginx.conf").write_text(
"worker_processes auto;\n", encoding="utf-8"
)
state = _minimal_package_state(
[
{
"package": "nginx",
"role_name": "nginx",
"section": "httpd",
"has_config": True,
"managed_files": [
{
"path": "/etc/nginx/nginx.conf",
"src_rel": "etc/nginx/nginx.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "modified_conffile",
}
],
"managed_dirs": [
{
"path": "/etc/nginx",
"owner": "root",
"group": "root",
"mode": "0755",
"reason": "parent_of_managed_file",
}
],
"managed_links": [],
"excluded": [],
"notes": [],
}
]
)
_write_state(bundle, state)
manifest.manifest(str(bundle), str(out))
assert (out / "roles" / "httpd").exists()
assert not (out / "roles" / "nginx").exists()
defaults = (out / "roles" / "httpd" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
assert "- nginx" in defaults
assert "dest: /etc/nginx/nginx.conf" in defaults
assert (out / "roles" / "httpd" / "files" / "etc" / "nginx" / "nginx.conf").exists()
def test_manifest_groups_systemd_units_into_common_role(tmp_path: Path):
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
(bundle / "artifacts" / "network_manager" / "etc" / "NetworkManager").mkdir(
parents=True, exist_ok=True
)
(
bundle
/ "artifacts"
/ "network_manager"
/ "etc"
/ "NetworkManager"
/ "NetworkManager.conf"
).write_text("[main]\n", encoding="utf-8")
state = {
"schema_version": 3,
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
"inventory": {
"packages": {
"network-manager": {
"version": "1.0",
"arches": ["amd64"],
"installations": [
{"version": "1.0", "arch": "amd64", "section": "net"}
],
"section": "net",
"observed_via": [
{"kind": "systemd_unit", "ref": "NetworkManager.service"}
],
"roles": ["network_manager", "network_manager_dispatcher"],
}
}
},
"roles": {
"users": {
"role_name": "users",
"users": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
"services": [
{
"unit": "NetworkManager.service",
"role_name": "network_manager",
"packages": ["network-manager"],
"active_state": "active",
"sub_state": "running",
"unit_file_state": "enabled",
"condition_result": "yes",
"managed_files": [
{
"path": "/etc/NetworkManager/NetworkManager.conf",
"src_rel": "etc/NetworkManager/NetworkManager.conf",
"owner": "root",
"group": "root",
"mode": "0644",
"reason": "modified_conffile",
}
],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
},
{
"unit": "NetworkManager-dispatcher.service",
"role_name": "network_manager_dispatcher",
"packages": ["network-manager"],
"active_state": "inactive",
"sub_state": "dead",
"unit_file_state": "enabled",
"condition_result": "no",
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
},
],
"packages": [],
"apt_config": {
"role_name": "apt_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"dnf_config": {
"role_name": "dnf_config",
"managed_files": [],
"excluded": [],
"notes": [],
},
"etc_custom": {
"role_name": "etc_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"usr_local_custom": {
"role_name": "usr_local_custom",
"managed_files": [],
"excluded": [],
"notes": [],
},
"extra_paths": {
"role_name": "extra_paths",
"include_patterns": [],
"exclude_patterns": [],
"managed_files": [],
"excluded": [],
"notes": [],
},
},
}
_write_state(bundle, state)
manifest.manifest(str(bundle), str(out))
assert (out / "roles" / "net").exists()
assert not (out / "roles" / "network_manager").exists()
assert not (out / "roles" / "network_manager_dispatcher").exists()
defaults = (out / "roles" / "net" / "defaults" / "main.yml").read_text(
encoding="utf-8"
)
assert "- network-manager" in defaults
assert "name: NetworkManager.service" in defaults
assert "name: NetworkManager-dispatcher.service" in defaults
assert "dest: /etc/NetworkManager/NetworkManager.conf" in defaults
tasks = (out / "roles" / "net" / "tasks" / "main.yml").read_text(encoding="utf-8")
assert "Ensure grouped unit enablement matches harvest" in tasks
def test_manifest_fqdn_implies_no_common_roles(tmp_path: Path):
bundle = tmp_path / "bundle"
out = tmp_path / "ansible"
state = _minimal_package_state(
[
{
"package": "curl",
"role_name": "curl",
"section": "net",
"has_config": False,
"managed_files": [],
"managed_dirs": [],
"managed_links": [],
"excluded": [],
"notes": [],
}
]
)
_write_state(bundle, state)
manifest.manifest(str(bundle), str(out), fqdn="host1.example.test")
assert (out / "roles" / "curl").exists()
assert not (out / "roles" / "net").exists()
def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path): def test_manifest_site_mode_creates_host_inventory_and_raw_files(tmp_path: Path):
"""In --fqdn mode, host-specific state goes into inventory/host_vars.""" """In --fqdn mode, host-specific state goes into inventory/host_vars."""
@ -631,10 +1064,10 @@ def test_manifest_orders_cron_and_logrotate_at_playbook_tail(tmp_path: Path):
ln.strip().removeprefix("- ").strip() for ln in pb if ln.startswith(" - ") ln.strip().removeprefix("- ").strip() for ln in pb if ln.startswith(" - ")
] ]
# Ensure tail ordering. # Ensure the grouped role containing cron/logrotate is still ordered after users.
assert roles[-2:] == ["role: cron", "role: logrotate"] assert roles[-1] == "role: misc"
assert roles.index("role: users") < roles.index("role: misc")
assert "role: users" in roles assert "role: users" in roles
assert roles.index("role: users") < roles.index("role: cron")
def test_yaml_helpers_fallback_when_yaml_unavailable(monkeypatch): def test_yaml_helpers_fallback_when_yaml_unavailable(monkeypatch):
@ -1367,7 +1800,7 @@ def test_manifest_avoids_package_role_collision_with_flatpak_singleton(tmp_path)
bundle.mkdir(parents=True, exist_ok=True) bundle.mkdir(parents=True, exist_ok=True)
(bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8") (bundle / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
manifest.manifest(str(bundle), str(out)) manifest.manifest(str(bundle), str(out), no_common_roles=True)
flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text( flatpak_defaults = (out / "roles" / "flatpak" / "defaults" / "main.yml").read_text(
encoding="utf-8" encoding="utf-8"

View file

@ -149,9 +149,9 @@ def test_list_manual_packages_uses_yum_fallback(monkeypatch):
def test_list_installed_packages_parses_epoch_and_sorts(monkeypatch): def test_list_installed_packages_parses_epoch_and_sorts(monkeypatch):
out = ( out = (
"bash\t0\t5.2.26\t1.el9\tx86_64\n" "bash\t0\t5.2.26\t1.el9\tx86_64\tSystem Environment/Shells\n"
"bash\t1\t5.2.26\t1.el9\taarch64\n" "bash\t1\t5.2.26\t1.el9\taarch64\tSystem Environment/Shells\n"
"coreutils\t(none)\t9.1\t2.el9\tx86_64\n" "coreutils\t(none)\t9.1\t2.el9\tx86_64\tSystem Environment/Base\n"
) )
monkeypatch.setattr( monkeypatch.setattr(
rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (0, out) rpm, "_run", lambda cmd, allow_fail=False, merge_err=False: (0, out)
@ -159,6 +159,7 @@ def test_list_installed_packages_parses_epoch_and_sorts(monkeypatch):
pkgs = rpm.list_installed_packages() pkgs = rpm.list_installed_packages()
assert pkgs["bash"][0]["arch"] == "aarch64" # sorted by arch then version assert pkgs["bash"][0]["arch"] == "aarch64" # sorted by arch then version
assert pkgs["bash"][0]["version"].startswith("1:") assert pkgs["bash"][0]["version"].startswith("1:")
assert pkgs["bash"][0]["group"] == "System Environment/Shells"
assert pkgs["coreutils"][0]["version"] == "9.1-2.el9" assert pkgs["coreutils"][0]["version"] == "9.1-2.el9"