Group all package roles into Debian/RPM 'sections'
Some checks failed
Lint / test (push) Waiting to run
CI / test (push) Has been cancelled

This includes managed config files and unit state.

This mode is not used if `--fqdn` or `--no-common-roles` is set,
in which case, the traditional behaviour of preserving one role
per package/unit is used instead.

This is a breaking change.
This commit is contained in:
Miguel Jacq 2026-06-14 19:19:59 +10:00
parent e2339616fb
commit 1e996f4a43
Signed by: mig5
GPG key ID: 03906B4110AAD3B8
14 changed files with 909 additions and 90 deletions

View file

@ -312,6 +312,14 @@ def _add_common_manifest_args(p: argparse.ArgumentParser) -> None:
"--fqdn",
help="Host FQDN/name for site-mode output (creates inventory/, inventory/host_vars/, playbooks/).",
)
p.add_argument(
"--no-common-roles",
action="store_true",
help=(
"Do not group package and systemd-unit roles into common section/group roles. "
"This preserves one generated role per package/unit. --fqdn implies this."
),
)
g = p.add_mutually_exclusive_group()
g.add_argument(
"--jinjaturtle",
@ -503,11 +511,6 @@ def main() -> None:
"(binary) using the given GPG fingerprint(s). Requires `sops` on PATH."
),
)
m.add_argument(
"--merge-simple-packages",
action="store_true",
help="Merge packages with no configuration files into a single 'common_packages' role.",
)
_add_common_manifest_args(m)
s = sub.add_parser(
@ -570,11 +573,6 @@ def main() -> None:
"or a file path."
),
)
s.add_argument(
"--merge-simple-packages",
action="store_true",
help="Merge packages with no configuration files into a single 'common_packages' role.",
)
_add_common_manifest_args(s)
d = sub.add_parser("diff", help="Compare two harvests and report differences")
@ -921,7 +919,7 @@ def main() -> None:
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
sops_fingerprints=getattr(args, "sops", None),
merge_simple_packages=getattr(args, "merge_simple_packages", False),
no_common_roles=bool(getattr(args, "no_common_roles", False)),
)
if getattr(args, "sops", None) and out_enc:
print(str(out_enc))
@ -1059,9 +1057,7 @@ def main() -> None:
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
sops_fingerprints=list(sops_fps),
merge_simple_packages=getattr(
args, "merge_simple_packages", False
),
no_common_roles=bool(getattr(args, "no_common_roles", False)),
)
if not args.harvest:
print(str(out_file))
@ -1092,9 +1088,7 @@ def main() -> None:
args.out,
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
merge_simple_packages=getattr(
args, "merge_simple_packages", False
),
no_common_roles=bool(getattr(args, "no_common_roles", False)),
)
# For usability (when --harvest wasn't provided), print the harvest path.
if not args.harvest:
@ -1125,9 +1119,7 @@ def main() -> None:
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
sops_fingerprints=list(sops_fps),
merge_simple_packages=getattr(
args, "merge_simple_packages", False
),
no_common_roles=bool(getattr(args, "no_common_roles", False)),
)
if not args.harvest:
print(str(out_file))
@ -1147,9 +1139,7 @@ def main() -> None:
args.out,
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
merge_simple_packages=getattr(
args, "merge_simple_packages", False
),
no_common_roles=bool(getattr(args, "no_common_roles", False)),
)
except RemoteSudoPasswordRequired:
raise SystemExit(

View file

@ -69,7 +69,7 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
Uses dpkg-query and is expected to work on Debian/Ubuntu-like systems.
Output format:
{"pkg": [{"version": "...", "arch": "..."}, ...], ...}
{"pkg": [{"version": "...", "arch": "...", "section": "..."}, ...], ...}
"""
try:
@ -77,7 +77,7 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
[
"dpkg-query",
"-W",
"-f=${Package}\t${Version}\t${Architecture}\n",
"-f=${Package}\t${Version}\t${Architecture}\t${Section}\n",
],
text=True,
capture_output=True,
@ -97,7 +97,10 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
name, ver, arch = parts[0].strip(), parts[1].strip(), parts[2].strip()
if not name:
continue
out.setdefault(name, []).append({"version": ver, "arch": arch})
instance = {"version": ver, "arch": arch}
if len(parts) >= 4 and parts[3].strip():
instance["section"] = parts[3].strip()
out.setdefault(name, []).append(instance)
# Stable ordering for deterministic JSON dumps.
for k in list(out.keys()):

View file

@ -86,6 +86,7 @@ class ServiceSnapshot:
class PackageSnapshot:
package: str
role_name: str
section: Optional[str] = None
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
managed_links: List[ManagedLink] = field(default_factory=list)
@ -389,6 +390,30 @@ def _role_name_from_pkg(pkg: str) -> str:
return avoid_reserved_role_name(_safe_name(pkg), prefix="package")
def _package_section_from_installations(
installs: List[Dict[str, str]],
) -> Optional[str]:
"""Return a stable package grouping label from installed package metadata.
Debian exposes this as ``Section``. RPM-family distributions have a broadly
similar ``Group`` tag, although modern Fedora/RHEL packages may omit it or
set it to ``Unspecified``.
"""
values: Set[str] = set()
for inst in installs or []:
value = (inst.get("section") or inst.get("group") or "").strip()
if not value:
continue
if value.lower() in {"(none)", "none", "unspecified"}:
continue
values.add(value)
if not values:
return None
return sorted(values)[0]
def _copy_into_bundle(
bundle_dir: str, role_name: str, abs_path: str, src_rel: str
) -> None:
@ -1279,6 +1304,9 @@ def harvest(
cron_snapshot = PackageSnapshot(
package=cron_pkg,
role_name=cron_role_name,
section=_package_section_from_installations(
installed_pkgs.get(cron_pkg, [])
),
managed_files=cron_managed,
excluded=cron_excluded,
notes=cron_notes,
@ -1314,6 +1342,9 @@ def harvest(
logrotate_snapshot = PackageSnapshot(
package=logrotate_pkg,
role_name=logrotate_role_name,
section=_package_section_from_installations(
installed_pkgs.get(logrotate_pkg, [])
),
managed_files=lr_managed,
excluded=lr_excluded,
notes=lr_notes,
@ -1732,11 +1763,11 @@ def harvest(
seen_global=captured_global,
)
has_config = bool(pkg_to_etc_paths.get(pkg, []) or managed)
has_config = bool(managed or excluded)
if not has_config:
notes.append(
"No /etc files or custom configuration detected for this package."
"No changed or custom configuration detected for this package."
)
simple_packages.append(pkg)
@ -1744,6 +1775,9 @@ def harvest(
PackageSnapshot(
package=pkg,
role_name=role,
section=_package_section_from_installations(
installed_pkgs.get(pkg, [])
),
managed_files=managed,
managed_links=[],
excluded=excluded,
@ -2487,6 +2521,7 @@ def harvest(
arches = sorted({i.get("arch") for i in installs if i.get("arch")})
vers = sorted({i.get("version") for i in installs if i.get("version")})
version: Optional[str] = vers[0] if len(vers) == 1 else None
section = _package_section_from_installations(installs)
observed: List[Dict[str, str]] = []
if pkg in manual_set:
@ -2509,6 +2544,7 @@ def harvest(
"version": version,
"arches": arches,
"installations": installs,
"section": section,
"observed_via": observed,
"roles": roles,
}

View file

@ -80,6 +80,78 @@ def _yaml_dump_mapping(obj: Dict[str, Any], *, sort_keys: bool = True) -> str:
)
def _role_id(raw: str) -> str:
"""Return an Ansible-safe role identifier from an arbitrary label."""
s = re.sub(r"[^A-Za-z0-9]+", "_", raw or "misc")
s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s)
s = s.lower()
s = re.sub(r"_+", "_", s).strip("_")
if not s:
s = "misc"
if not re.match(r"^[a-z_]", s):
s = "r_" + s
return s
def _package_section_label(
package_role: Dict[str, Any], inventory_packages: Dict[str, Any]
) -> str:
"""Return the Debian Section/RPM Group label for a package role."""
pkg = str(package_role.get("package") or "")
inv = inventory_packages.get(pkg) or {}
candidates: List[str] = []
for value in (package_role.get("section"), inv.get("section")):
if isinstance(value, str) and value.strip():
candidates.append(value.strip())
for inst in inv.get("installations", []) or []:
if not isinstance(inst, dict):
continue
for key in ("section", "group"):
value = inst.get(key)
if isinstance(value, str) and value.strip():
candidates.append(value.strip())
for value in candidates:
if value.lower() in {"(none)", "none", "unspecified"}:
continue
return value
return "misc"
def _section_label_for_packages(
packages: List[str], inventory_packages: Dict[str, Any]
) -> str:
"""Return a stable section/group label for a set of packages.
Service roles can involve more than one package. Prefer the first
package with a concrete Debian Section/RPM Group and fall back to misc
when no package metadata is available.
"""
for pkg in packages or []:
label = _package_section_label({"package": pkg}, inventory_packages)
if label and label.lower() != "misc":
return label
return "misc"
def _section_role_name(label: str, occupied_roles: Set[str]) -> str:
"""Create a stable section role name, avoiding generated-role collisions."""
base = avoid_reserved_role_name(_role_id(label), prefix="section")
role = base if base not in occupied_roles else f"section_{base}"
n = 2
while role in occupied_roles:
role = f"section_{base}_{n}"
n += 1
occupied_roles.add(role)
return role
def _merge_mappings_overwrite(
existing: Dict[str, Any], incoming: Dict[str, Any]
) -> Dict[str, Any]:
@ -650,6 +722,39 @@ def _render_install_packages_tasks(role: str, var_prefix: str) -> str:
"""
def _render_grouped_systemd_tasks(var_prefix: str) -> str:
"""Render tasks to manage multiple systemd units in a common role."""
return f"""- name: Probe whether grouped systemd units exist and are manageable
ansible.builtin.systemd:
name: "{{{{ item.name }}}}"
check_mode: true
loop: "{{{{ {var_prefix}_systemd_units | default([]) }}}}"
register: _enroll_unit_probes
failed_when: false
changed_when: false
when: item.manage | default(false)
- name: Ensure grouped unit enablement matches harvest
ansible.builtin.systemd:
name: "{{{{ item.item.name }}}}"
enabled: "{{{{ item.item.enabled | bool }}}}"
loop: "{{{{ _enroll_unit_probes.results | default([]) }}}}"
when:
- item.item.manage | default(false)
- not (item.failed | default(false))
- name: Ensure grouped unit running state matches harvest
ansible.builtin.systemd:
name: "{{{{ item.item.name }}}}"
state: "{{{{ item.item.state }}}}"
loop: "{{{{ _enroll_unit_probes.results | default([]) }}}}"
when:
- item.item.manage | default(false)
- not (item.failed | default(false))
"""
def _render_firewall_runtime_tasks(var_prefix: str) -> str:
"""Render tasks for live ipset/iptables snapshots."""
return f"""- name: Ensure firewall runtime snapshot directory exists
@ -893,13 +998,16 @@ def _manifest_from_bundle_dir(
*,
fqdn: Optional[str] = None,
jinjaturtle: str = "auto", # auto|on|off
merge_simple_packages: bool = False,
no_common_roles: bool = False,
) -> None:
state_path = os.path.join(bundle_dir, "state.json")
with open(state_path, "r", encoding="utf-8") as f:
state = json.load(f)
roles: Dict[str, Any] = state.get("roles") or {}
inventory_packages: Dict[str, Any] = (state.get("inventory") or {}).get(
"packages"
) or {}
services: List[Dict[str, Any]] = roles.get("services", [])
package_roles: List[Dict[str, Any]] = roles.get("packages", [])
@ -914,6 +1022,7 @@ def _manifest_from_bundle_dir(
extra_paths_snapshot: Dict[str, Any] = roles.get("extra_paths", {})
site_mode = fqdn is not None and fqdn != ""
use_common_roles = (not site_mode) and (not no_common_roles)
jt_exe = find_jinjaturtle_cmd()
jt_enabled = False
@ -954,6 +1063,20 @@ def _manifest_from_bundle_dir(
manifested_extra_paths_roles: List[str] = []
manifested_service_roles: List[str] = []
manifested_pkg_roles: List[str] = []
common_role_groups: Dict[str, List[Dict[str, Any]]] = {}
common_tail_roles: List[str] = []
if use_common_roles:
for svc in services:
label = _section_label_for_packages(
svc.get("packages", []) or [], inventory_packages
)
common_role_groups.setdefault(label, []).append(
{"kind": "service", "snapshot": svc}
)
services_to_manifest: List[Dict[str, Any]] = []
else:
services_to_manifest = services
# -------------------------
# Users role (non-system users)
@ -2370,7 +2493,7 @@ User-requested extra file harvesting.
# -------------------------
# Service roles
# -------------------------
for svc in services:
for svc in services_to_manifest:
source_role = svc["role_name"]
role = avoid_reserved_role_name(source_role, prefix="service")
unit = svc["unit"]
@ -2549,64 +2672,200 @@ Generated from `{unit}`.
manifested_service_roles.append(role)
# -------------------------
# Merge simple packages (if --merge-simple-packages is set)
# Common package section/group roles
#
# Packages with no configuration files, systemd units, or cron jobs
# are merged into a single 'common_packages' role to reduce role count.
# Outside --fqdn/site mode, package and systemd-unit roles are grouped by
# Debian Section or RPM Group by default. Managed config and unit state can
# live in those section roles too; --no-common-roles preserves the historic
# one-role-per-package/unit output, and --fqdn implies that mode because
# grouped role contents would be unsafe across multiple harvested hosts.
# -------------------------
simple_packages_list: List[str] = []
if merge_simple_packages:
filtered_package_roles: List[Dict[str, Any]] = []
if use_common_roles:
for pr in package_roles:
has_config = pr.get("has_config", True)
managed_files = pr.get("managed_files", []) or []
# A package is "simple" if it has no config files AND no managed files
if not has_config and not managed_files:
pkg = pr.get("package")
if pkg:
simple_packages_list.append(pkg)
else:
filtered_package_roles.append(pr)
package_roles = filtered_package_roles
label = _package_section_label(pr, inventory_packages)
common_role_groups.setdefault(label, []).append(
{"kind": "package", "snapshot": pr}
)
package_roles = []
# -------------------------
# Manually installed package roles
# -------------------------
# First, create the common_packages role if we have simple packages to merge
if simple_packages_list:
role = "common_packages"
occupied_roles: Set[str] = set(
manifested_apt_config_roles
+ manifested_dnf_config_roles
+ manifested_users_roles
+ manifested_flatpak_roles
+ manifested_snap_roles
+ manifested_service_roles
+ manifested_firewall_runtime_roles
+ manifested_etc_custom_roles
+ manifested_usr_local_custom_roles
+ manifested_extra_paths_roles
)
for pr in package_roles:
occupied_roles.add(
avoid_reserved_role_name(str(pr.get("role_name") or ""), prefix="package")
)
for section_label, entries in sorted(common_role_groups.items()):
role = _section_role_name(section_label, occupied_roles)
role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir)
var_prefix = role
# No managed files for common_packages - just package installation
packages_set: Set[str] = set()
files_var: List[Dict[str, Any]] = []
links_var: List[Dict[str, Any]] = []
dirs_var: List[Dict[str, Any]] = []
links_var: List[Dict[str, Any]] = []
systemd_units: List[Dict[str, Any]] = []
excluded_all: List[Dict[str, Any]] = []
notes_all: List[str] = []
origin_lines: List[str] = []
jt_combined: Dict[str, Any] = {}
seen_files: Set[tuple] = set()
seen_dirs: Set[tuple] = set()
seen_links: Set[tuple] = set()
seen_units: Set[str] = set()
for entry in entries:
kind = entry.get("kind") or "package"
snap = entry.get("snapshot") or {}
source_role = str(snap.get("role_name") or "")
managed_files = snap.get("managed_files", []) or []
managed_dirs = snap.get("managed_dirs", []) or []
managed_links = snap.get("managed_links", []) or []
excluded = snap.get("excluded", []) or []
notes = snap.get("notes", []) or []
if kind == "service":
pkgs = snap.get("packages", []) or []
unit = str(snap.get("unit") or "")
origin_lines.append(f"service `{unit}` from role `{source_role}`")
else:
pkg = str(snap.get("package") or "")
pkgs = [pkg] if pkg else []
origin_lines.append(f"package `{pkg}` from role `{source_role}`")
for pkg in pkgs:
if pkg:
packages_set.add(str(pkg))
templated: Set[str] = set()
jt_vars = ""
if managed_files and source_role:
templated, jt_vars = _jinjify_managed_files(
bundle_dir,
source_role,
role_dir,
managed_files,
jt_exe=jt_exe,
jt_enabled=jt_enabled,
overwrite_templates=True,
)
_copy_artifacts(
bundle_dir,
source_role,
os.path.join(role_dir, "files"),
exclude_rels=templated,
)
notify_other = "Restart managed services" if kind == "service" else None
for item in _build_managed_files_var(
managed_files,
templated,
notify_other=notify_other,
notify_systemd="Run systemd daemon-reload",
):
key = (item.get("dest"), item.get("src_rel"), item.get("kind"))
if key not in seen_files:
seen_files.add(key)
files_var.append(item)
for item in _build_managed_dirs_var(managed_dirs):
key = (
item.get("dest"),
item.get("owner"),
item.get("group"),
item.get("mode"),
)
if key not in seen_dirs:
seen_dirs.add(key)
dirs_var.append(item)
for item in _build_managed_links_var(managed_links):
key = (item.get("dest"), item.get("src"))
if key not in seen_links:
seen_links.add(key)
links_var.append(item)
if kind == "service":
unit = str(snap.get("unit") or "")
if unit and unit not in seen_units:
seen_units.add(unit)
unit_file_state = str(snap.get("unit_file_state") or "")
enabled_at_harvest = unit_file_state in (
"enabled",
"enabled-runtime",
)
desired_state = (
"started" if snap.get("active_state") == "active" else "stopped"
)
systemd_units.append(
{
"name": unit,
"manage": True,
"enabled": bool(enabled_at_harvest),
"state": desired_state,
}
)
excluded_all.extend(excluded)
notes_all.extend(str(n) for n in notes)
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
jt_combined = _merge_mappings_overwrite(jt_combined, jt_map)
packages = sorted(packages_set)
files_var = sorted(files_var, key=lambda x: str(x.get("dest") or ""))
dirs_var = sorted(dirs_var, key=lambda x: str(x.get("dest") or ""))
links_var = sorted(links_var, key=lambda x: str(x.get("dest") or ""))
systemd_units = sorted(systemd_units, key=lambda x: str(x.get("name") or ""))
base_vars: Dict[str, Any] = {
f"{var_prefix}_packages": simple_packages_list,
f"{var_prefix}_packages": packages,
f"{var_prefix}_managed_files": files_var,
f"{var_prefix}_managed_dirs": dirs_var,
f"{var_prefix}_managed_links": links_var,
f"{var_prefix}_systemd_units": systemd_units,
}
base_vars = _merge_mappings_overwrite(base_vars, jt_combined)
if site_mode:
_write_role_defaults(
role_dir,
{
f"{var_prefix}_packages": [],
f"{var_prefix}_managed_files": [],
f"{var_prefix}_managed_dirs": [],
f"{var_prefix}_managed_links": [],
},
)
_write_hostvars(out_dir, fqdn or "", role, base_vars)
else:
_write_role_defaults(role_dir, base_vars)
_write_role_defaults(role_dir, base_vars)
handlers = "---\n"
if {"cron", "logrotate"}.intersection(packages_set):
common_tail_roles.append(role)
handlers = (
"""---
- name: Run systemd daemon-reload
ansible.builtin.systemd:
daemon_reload: true
- name: Restart managed services
ansible.builtin.service:
name: "{{ item.name }}"
state: restarted
loop: "{{ """
+ f"{var_prefix}_systemd_units"
+ """ | default([]) }}"
when:
- item.manage | default(false)
- (item.state | default('stopped')) == 'started'
"""
)
with open(
os.path.join(role_dir, "handlers", "main.yml"), "w", encoding="utf-8"
) as f:
@ -2614,6 +2873,10 @@ Generated from `{unit}`.
task_parts: List[str] = []
task_parts.append("---\n" + _render_install_packages_tasks(role, var_prefix))
task_parts.append(
_render_generic_files_tasks(var_prefix, include_restart_notify=True)
)
task_parts.append(_render_grouped_systemd_tasks(var_prefix))
tasks = "\n".join(task_parts).rstrip() + "\n"
with open(
@ -2628,14 +2891,28 @@ Generated from `{unit}`.
readme = f"""# {role}
Common packages with no configuration files.
Common role for package section/group `{section_label}`.
This role was created by merging simple packages using the `--merge-simple-packages` flag.
## Origin roles
{os.linesep.join("- " + line for line in sorted(origin_lines)) or "- (none)"}
## Packages
{os.linesep.join("- " + p for p in simple_packages_list) or "- (none)"}
{os.linesep.join("- " + p for p in packages) or "- (none)"}
> Note: This role only installs packages; it does not manage any configuration files or services.
## Managed files
{os.linesep.join("- " + mf["dest"] for mf in files_var) or "- (none)"}
## Managed symlinks
{os.linesep.join("- " + ml["dest"] + " -> " + ml["src"] for ml in links_var) or "- (none)"}
## Systemd units
{os.linesep.join("- " + u["name"] + " (enabled=" + str(u["enabled"]).lower() + ", state=" + u["state"] + ")" for u in systemd_units) or "- (none)"}
## Excluded (possible secrets / unsafe)
{os.linesep.join("- " + e.get("path", "") + " (" + e.get("reason", "") + ")" for e in excluded_all) or "- (none)"}
## Notes
{os.linesep.join("- " + n for n in notes_all) or "- (none)"}
"""
with open(os.path.join(role_dir, "README.md"), "w", encoding="utf-8") as f:
f.write(readme)
@ -2776,6 +3053,9 @@ Generated for package `{pkg}`.
for r in ("cron", "logrotate"):
if r in manifested_pkg_roles:
tail_roles.append(r)
for r in common_tail_roles:
if r in manifested_pkg_roles and r not in tail_roles:
tail_roles.append(r)
main_pkg_roles = [r for r in manifested_pkg_roles if r not in set(tail_roles)]
@ -2809,7 +3089,7 @@ def manifest(
fqdn: Optional[str] = None,
jinjaturtle: str = "auto", # auto|on|off
sops_fingerprints: Optional[List[str]] = None,
merge_simple_packages: bool = False,
no_common_roles: bool = False,
) -> Optional[str]:
"""Render an Ansible manifest from a harvest.
@ -2842,7 +3122,7 @@ def manifest(
out,
fqdn=fqdn,
jinjaturtle=jinjaturtle,
merge_simple_packages=merge_simple_packages,
no_common_roles=no_common_roles,
)
return None
@ -2862,7 +3142,7 @@ def manifest(
str(tmp_out),
fqdn=fqdn,
jinjaturtle=jinjaturtle,
merge_simple_packages=merge_simple_packages,
no_common_roles=no_common_roles,
)
enc = _encrypt_manifest_out_dir_to_sops(

View file

@ -148,7 +148,7 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
Uses `rpm -qa` and is expected to work on RHEL/Fedora-like systems.
Output format:
{"pkg": [{"version": "...", "arch": "..."}, ...], ...}
{"pkg": [{"version": "...", "arch": "...", "group": "..."}, ...], ...}
The version string is formatted as:
- "<version>-<release>" for typical packages
@ -161,7 +161,7 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
"rpm",
"-qa",
"--qf",
"%{NAME}\t%{EPOCHNUM}\t%{VERSION}\t%{RELEASE}\t%{ARCH}\n",
"%{NAME}\t%{EPOCHNUM}\t%{VERSION}\t%{RELEASE}\t%{ARCH}\t%{GROUP}\n",
],
allow_fail=False,
merge_err=True,
@ -190,7 +190,11 @@ def list_installed_packages() -> Dict[str, List[Dict[str, str]]]:
if epoch and epoch.isdigit() and epoch != "0":
v = f"{epoch}:{v}"
pkgs.setdefault(name, []).append({"version": v, "arch": arch})
instance = {"version": v, "arch": arch}
if len(parts) >= 6 and parts[5].strip():
instance["group"] = parts[5].strip()
pkgs.setdefault(name, []).append(instance)
for k in list(pkgs.keys()):
pkgs[k] = sorted(

View file

@ -117,6 +117,14 @@
"minLength": 1,
"type": "string"
},
"group": {
"minLength": 1,
"type": "string"
},
"section": {
"minLength": 1,
"type": "string"
},
"version": {
"minLength": 1,
"type": "string"
@ -364,6 +372,12 @@
},
"type": "array"
},
"section": {
"type": [
"string",
"null"
]
},
"version": {
"type": [
"string",
@ -394,6 +408,12 @@
"has_config": {
"type": "boolean",
"default": true
},
"section": {
"type": [
"string",
"null"
]
}
},
"required": [