diff --git a/.forgejo/workflows/build-deb.yml b/.forgejo/workflows/build-deb.yml deleted file mode 100644 index fefc55f..0000000 --- a/.forgejo/workflows/build-deb.yml +++ /dev/null @@ -1,67 +0,0 @@ -name: CI - -on: - push: - -jobs: - test: - runs-on: docker - - steps: - - name: Install system dependencies - run: | - apt-get update - DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ - build-essential \ - devscripts \ - debhelper \ - dh-python \ - pybuild-plugin-pyproject \ - python3-all \ - python3-poetry-core \ - python3-yaml \ - python3-defusedxml \ - python3-jinja2 \ - python3-toml \ - rsync \ - ca-certificates - - - name: Checkout - uses: actions/checkout@v4 - with: - submodules: recursive - - - name: Build deb - run: | - mkdir /out - - rsync -a --delete \ - --exclude '.git' \ - --exclude '.venv' \ - --exclude 'dist' \ - --exclude 'build' \ - --exclude '__pycache__' \ - --exclude '.pytest_cache' \ - --exclude '.mypy_cache' \ - ./ /out/ - - cd /out/ - export DEBEMAIL="mig@mig5.net" - export DEBFULLNAME="Miguel Jacq" - - dch --distribution "trixie" --local "~trixie" "CI build for trixie" - dpkg-buildpackage -us -uc -b - - # Notify if any previous step in this job failed - - name: Notify on failure - if: ${{ failure() }} - env: - WEBHOOK_URL: ${{ secrets.NODERED_WEBHOOK_URL }} - REPOSITORY: ${{ forgejo.repository }} - RUN_NUMBER: ${{ forgejo.run_number }} - SERVER_URL: ${{ forgejo.server_url }} - run: | - curl -X POST \ - -H "Content-Type: application/json" \ - -d "{\"repository\":\"$REPOSITORY\",\"run_number\":\"$RUN_NUMBER\",\"status\":\"failure\",\"url\":\"$SERVER_URL/$REPOSITORY/actions/runs/$RUN_NUMBER\"}" \ - "$WEBHOOK_URL" diff --git a/.forgejo/workflows/trivy.yml b/.forgejo/workflows/trivy.yml index d5585f4..fad2f6f 100644 --- a/.forgejo/workflows/trivy.yml +++ b/.forgejo/workflows/trivy.yml @@ -23,7 +23,7 @@ jobs: - name: Run trivy run: | - trivy fs --no-progress --ignore-unfixed --format table --disable-telemetry --skip-version-check --exit-code 1 . + trivy fs --no-progress --ignore-unfixed --format table --disable-telemetry . # Notify if any previous step in this job failed - name: Notify on failure diff --git a/Dockerfile.rpmbuild b/Dockerfile.rpmbuild deleted file mode 100644 index 7dfb0b0..0000000 --- a/Dockerfile.rpmbuild +++ /dev/null @@ -1,87 +0,0 @@ -# syntax=docker/dockerfile:1 -ARG BASE_IMAGE=fedora:42 -FROM ${BASE_IMAGE} - -RUN set -eux; \ - dnf -y update; \ - dnf -y install \ - rpm-build \ - rpmdevtools \ - redhat-rpm-config \ - gcc \ - make \ - findutils \ - tar \ - gzip \ - rsync \ - python3 \ - python3-devel \ - python3-setuptools \ - python3-wheel \ - pyproject-rpm-macros \ - python3-rpm-macros \ - python3-yaml \ - python3-tomli \ - python3-defusedxml \ - python3-jinja2 \ - openssl-devel \ - python3-poetry-core ; \ - dnf -y clean all - -# Build runner script (copies repo, tars, runs rpmbuild) -RUN set -eux; cat > /usr/local/bin/build-rpm <<'EOF' -#!/usr/bin/env bash -set -euo pipefail - -SRC="${SRC:-/src}" -WORKROOT="${WORKROOT:-/work}" -OUT="${OUT:-/out}" - -mkdir -p "${WORKROOT}" "${OUT}" -WORK="${WORKROOT}/src" -rm -rf "${WORK}" -mkdir -p "${WORK}" - -rsync -a --delete \ - --exclude '.git' \ - --exclude '.venv' \ - --exclude 'dist' \ - --exclude 'build' \ - --exclude '__pycache__' \ - --exclude '.pytest_cache' \ - --exclude '.mypy_cache' \ - "${SRC}/" "${WORK}/" - -cd "${WORK}" - -# Determine version from pyproject.toml unless provided -if [ -n "${VERSION:-}" ]; then - ver="${VERSION}" -else - ver="$(grep -m1 '^version = ' pyproject.toml | sed -E 's/version = "([^"]+)".*/\1/')" -fi - -TOPDIR="${WORKROOT}/rpmbuild" -mkdir -p "${TOPDIR}"/{BUILD,BUILDROOT,RPMS,SOURCES,SPECS,SRPMS} - -tarball="${TOPDIR}/SOURCES/jinjaturtle-${ver}.tar.gz" -tar -czf "${tarball}" --transform "s#^#jinjaturtle/#" . - -spec_src="rpm/jinjaturtle.spec" - -cp -v "${spec_src}" "${TOPDIR}/SPECS/jinjaturtle.spec" - -rpmbuild -ba "${TOPDIR}/SPECS/jinjaturtle.spec" \ - --define "_topdir ${TOPDIR}" \ - --define "upstream_version ${ver}" - -shopt -s nullglob -cp -v "${TOPDIR}"/RPMS/*/*.rpm "${OUT}/" || true -cp -v "${TOPDIR}"/SRPMS/*.src.rpm "${OUT}/" || true -echo "Artifacts copied to ${OUT}" -EOF - -RUN chmod +x /usr/local/bin/build-rpm - -WORKDIR /work -ENTRYPOINT ["/usr/local/bin/build-rpm"] diff --git a/README.md b/README.md index 0e0ad48..80763f3 100644 --- a/README.md +++ b/README.md @@ -5,12 +5,11 @@ JinjaTurtle is a command-line tool to help you generate Jinja2 templates and -Ansible inventory from a native configuration file (or files) of a piece of -software. +Ansible inventory from a native configuration file of a piece of software. ## How it works - * The config file(s) is/are examined + * The config file is examined * Parameter key names are generated based on the parameter names in the config file. In keeping with Ansible best practices, you pass a prefix for the key names, which should typically match the name of your Ansible @@ -18,7 +17,7 @@ software. * A Jinja2 file is generated from the file with those parameter key names injected as the `{{ variable }}` names. * An Ansible inventory YAML file is generated with those key names and the - *values* taken from the original config file as the default vars. + *values* taken from the original config file as the defaults. By default, the Jinja2 template and the Ansible inventory are printed to stdout. However, it is possible to output the results to new files. @@ -39,29 +38,6 @@ You may need or wish to tidy up the config to suit your needs. The goal here is really to *speed up* converting files into Ansible/Jinja2, but not necessarily to make it perfect. -## Can I convert multiple files at once? - -Certainly! Pass the folder name instead of a specific file name, and JinjaTurtle -will convert any files it understands in that folder, storing all the various -vars in the destination defaults yaml file, and converting each file into a -Jinja2 template per file type. - -If all the files had the same 'type', there'll be one Jinja2 template. - -You can also pass `--recursive` to recurse into subfolders. - -Note: when using 'folder' mode and multiple files of the same type, their vars -will be listed under an 'items' parent key in the yaml, each with an `id` key. -You'll then want to use a `loop` in Ansible later, e.g: - -```yaml -- name: Render configs - template: - src: config.j2 - dest: "/somewhere/{{ item.id }}" - loop: "{{ myrole_items }}" -``` - ## How to install it ### Ubuntu/Debian apt repository @@ -74,25 +50,6 @@ sudo apt update sudo apt install jinjaturtle ``` -### Fedora - -```bash -sudo rpm --import https://mig5.net/static/mig5.asc - -sudo tee /etc/yum.repos.d/mig5.repo > /dev/null << 'EOF' -[mig5] -name=mig5 Repository -baseurl=https://rpm.mig5.net/$releasever/rpm/$basearch -enabled=1 -gpgcheck=1 -repo_gpgcheck=1 -gpgkey=https://mig5.net/static/mig5.asc -EOF - -sudo dnf upgrade --refresh -sudo dnf install jinjaturtle -``` - ### From PyPi ``` @@ -127,12 +84,12 @@ jinjaturtle php.ini \ ## Full usage info ``` -usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml,postfix,systemd}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config +usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config Convert a config file into Ansible inventory and a Jinja2 template. positional arguments: - config Path to the source configuration file. + config Path to the source configuration file (TOML or INI-style). options: -h, --help show this help message and exit @@ -141,20 +98,11 @@ options: -f, --format {ini,json,toml,xml} Force config format instead of auto-detecting from filename. -d, --defaults-output DEFAULTS_OUTPUT - Path to write defaults/main.yml. If omitted, default vars are printed to stdout. + Path to write defaults/main.yml. If omitted, defaults YAML is printed to stdout. -t, --template-output TEMPLATE_OUTPUT Path to write the Jinja2 config template. If omitted, template is printed to stdout. ``` -## Additional supported formats - -JinjaTurtle can also template some common "bespoke" config formats: - -- **Postfix main.cf** (`main.cf`) → `--format postfix` -- **systemd unit files** (`*.service`, `*.socket`, etc.) → `--format systemd` - -For ambiguous extensions like `*.conf`, JinjaTurtle uses lightweight content sniffing; you can always force a specific handler via `--format`. - ## Found a bug, have a suggestion? diff --git a/debian/changelog b/debian/changelog index 7c6473c..909c34d 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,9 +1,3 @@ -jinjaturtle (0.3.5) unstable; urgency=medium - - * Support converting a directory (optionally recursively) instead of just an individual file. - - -- Miguel Jacq Tue, 30 Dec 2025 16:30:00 +1100 - jinjaturtle (0.3.4) unstable; urgency=medium * Render json files in a more pretty way diff --git a/debian/control b/debian/control index 648f3d9..72a7e21 100644 --- a/debian/control +++ b/debian/control @@ -25,4 +25,4 @@ Depends: python3-toml, python3-defusedxml, python3-jinja2 -Description: Convert config files into Ansible vars and Jinja2 templates. +Description: Convert config files into Ansible defaults and Jinja2 templates. diff --git a/pyproject.toml b/pyproject.toml index 3fc5c4a..c983dd1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "jinjaturtle" -version = "0.4.0" +version = "0.3.4" description = "Convert config files into Ansible defaults and Jinja2 templates." authors = ["Miguel Jacq "] license = "GPL-3.0-or-later" diff --git a/release.sh b/release.sh index 394a969..8133992 100755 --- a/release.sh +++ b/release.sh @@ -42,52 +42,3 @@ for dist in ${DISTS[@]}; do debfile=$(ls -1 dist/${release}/*.deb) reprepro -b /home/user/git/repo includedeb "${release}" "${debfile}" done - -# RPM -sudo apt-get -y install createrepo-c rpm -BUILD_OUTPUT="${HOME}/git/jinjaturtle/dist" -KEYID="00AE817C24A10C2540461A9C1D7CDE0234DB458D" -REPO_ROOT="${HOME}/git/repo_rpm" -REMOTE="letessier.mig5.net:/opt/repo_rpm" - -DISTS=( - fedora:43 - fedora:42 -) - -for dist in ${DISTS[@]}; do - release=$(echo ${dist} | cut -d: -f2) - REPO_RELEASE_ROOT="${REPO_ROOT}/${release}" - RPM_REPO="${REPO_RELEASE_ROOT}/rpm/x86_64" - mkdir -p "$RPM_REPO" - - docker build \ - --no-cache \ - -f Dockerfile.rpmbuild \ - -t jinjaturtle-rpm:${release} \ - --progress=plain \ - --build-arg BASE_IMAGE=${dist} \ - . - - rm -rf "$PWD/dist/rpm"/* - mkdir -p "$PWD/dist/rpm" - - docker run --rm -v "$PWD":/src -v "$PWD/dist/rpm":/out jinjaturtle-rpm:${release} - sudo chown -R "${USER}" "$PWD/dist" - - for file in `ls -1 "${BUILD_OUTPUT}/rpm"`; do - rpmsign --addsign "${BUILD_OUTPUT}/rpm/$file" - done - - cp "${BUILD_OUTPUT}/rpm/"*.rpm "$RPM_REPO/" - - createrepo_c "$RPM_REPO" - - echo "==> Signing repomd.xml..." - qubes-gpg-client --local-user "$KEYID" --detach-sign --armor "$RPM_REPO/repodata/repomd.xml" > "$RPM_REPO/repodata/repomd.xml.asc" -done - -echo "==> Syncing repo to server..." -rsync -aHPvz --exclude=.git --delete "$REPO_ROOT/" "$REMOTE/" - -echo "Done!" diff --git a/rpm/jinjaturtle.spec b/rpm/jinjaturtle.spec deleted file mode 100644 index 13182e8..0000000 --- a/rpm/jinjaturtle.spec +++ /dev/null @@ -1,48 +0,0 @@ -%global upstream_version 0.3.5 - -Name: jinjaturtle -Version: %{upstream_version} -Release: 1%{?dist}.jinjaturtle1 -Summary: Convert config files into Ansible vars and Jinja2 templates. - -License: GPL-3.0-or-later -URL: https://git.mig5.net/mig5/jinjaturtle -Source0: %{name}-%{version}.tar.gz - -BuildArch: noarch - -BuildRequires: pyproject-rpm-macros -BuildRequires: python3-devel -BuildRequires: python3-poetry-core - -Requires: python3-yaml -Requires: python3-tomli -Requires: python3-defusedxml -Requires: python3-jinja2 - -%description -Convert config files into Ansible defaults and Jinja2 templates. - -%prep -%autosetup -n jinjaturtle - -%generate_buildrequires -%pyproject_buildrequires - -%build -%pyproject_wheel - -%install -%pyproject_install -%pyproject_save_files jinjaturtle - -%files -f %{pyproject_files} -%license LICENSE -%doc README.md -%{_bindir}/jinjaturtle - -%changelog -* Tue Dec 30 2025 Miguel Jacq - %{version}-%{release} -- Support converting a directory (optionally recursively) instead of just an individual file. -* Sat Dec 27 2025 Miguel Jacq - %{version}-%{release} -- Initial RPM packaging for Fedora 42 diff --git a/src/jinjaturtle/cli.py b/src/jinjaturtle/cli.py index 74fdd8e..c222e86 100644 --- a/src/jinjaturtle/cli.py +++ b/src/jinjaturtle/cli.py @@ -13,8 +13,6 @@ from .core import ( generate_jinja2_template, ) -from .multi import process_directory - def _build_arg_parser() -> argparse.ArgumentParser: ap = argparse.ArgumentParser( @@ -23,26 +21,18 @@ def _build_arg_parser() -> argparse.ArgumentParser: ) ap.add_argument( "config", - help=( - "Path to a config file OR a folder containing supported config files. " - "Supported: .toml, .yaml/.yml, .json, .ini/.cfg/.conf, .xml" - ), + help="Path to the source configuration file (TOML, YAML, JSON or INI-style).", ) ap.add_argument( "-r", "--role-name", - default="jinjaturtle", - help="Ansible role name, used as variable prefix (default: jinjaturtle).", - ) - ap.add_argument( - "--recursive", - action="store_true", - help="When CONFIG is a folder, recurse into subfolders.", + required=True, + help="Ansible role name, used as variable prefix (e.g. cometbft).", ) ap.add_argument( "-f", "--format", - choices=["ini", "json", "toml", "yaml", "xml", "postfix", "systemd"], + choices=["ini", "json", "toml", "yaml", "xml"], help="Force config format instead of auto-detecting from filename.", ) ap.add_argument( @@ -64,40 +54,6 @@ def _main(argv: list[str] | None = None) -> int: args = parser.parse_args(argv) config_path = Path(args.config) - - # Folder mode - if config_path.is_dir(): - defaults_yaml, outputs = process_directory( - config_path, args.recursive, args.role_name - ) - - # Write defaults - if args.defaults_output: - Path(args.defaults_output).write_text(defaults_yaml, encoding="utf-8") - else: - print("# defaults/main.yml") - print(defaults_yaml, end="") - - # Write templates - if args.template_output: - out_path = Path(args.template_output) - if len(outputs) == 1 and not out_path.is_dir(): - out_path.write_text(outputs[0].template, encoding="utf-8") - else: - out_path.mkdir(parents=True, exist_ok=True) - for o in outputs: - (out_path / f"config.{o.fmt}.j2").write_text( - o.template, encoding="utf-8" - ) - else: - for o in outputs: - name = "config.j2" if len(outputs) == 1 else f"config.{o.fmt}.j2" - print(f"# {name}") - print(o.template, end="") - - return 0 - - # Single-file mode (existing behaviour) config_text = config_path.read_text(encoding="utf-8") # Parse the config @@ -133,7 +89,7 @@ def _main(argv: list[str] | None = None) -> int: print("# config.j2") print(template_str, end="") - return 0 + return True def main() -> None: diff --git a/src/jinjaturtle/core.py b/src/jinjaturtle/core.py index d53f182..e4f3d13 100644 --- a/src/jinjaturtle/core.py +++ b/src/jinjaturtle/core.py @@ -4,7 +4,6 @@ from pathlib import Path from typing import Any, Iterable import datetime -import re import yaml from .loop_analyzer import LoopAnalyzer, LoopCandidate @@ -15,8 +14,6 @@ from .handlers import ( TomlHandler, YamlHandler, XmlHandler, - PostfixMainHandler, - SystemdUnitHandler, ) @@ -59,34 +56,12 @@ _TOML_HANDLER = TomlHandler() _YAML_HANDLER = YamlHandler() _XML_HANDLER = XmlHandler() -_POSTFIX_HANDLER = PostfixMainHandler() -_SYSTEMD_HANDLER = SystemdUnitHandler() - _HANDLERS["ini"] = _INI_HANDLER _HANDLERS["json"] = _JSON_HANDLER _HANDLERS["toml"] = _TOML_HANDLER _HANDLERS["yaml"] = _YAML_HANDLER _HANDLERS["xml"] = _XML_HANDLER -_HANDLERS["postfix"] = _POSTFIX_HANDLER -_HANDLERS["systemd"] = _SYSTEMD_HANDLER - - -def dump_yaml(data: Any, *, sort_keys: bool = True) -> str: - """Dump YAML using JinjaTurtle's dumper settings. - - This is used by both the single-file and multi-file code paths. - """ - return yaml.dump( - data, - Dumper=_TurtleDumper, - sort_keys=sort_keys, - default_flow_style=False, - allow_unicode=True, - explicit_start=True, - indent=2, - ) - def make_var_name(role_prefix: str, path: Iterable[str]) -> str: """ @@ -95,92 +70,24 @@ def make_var_name(role_prefix: str, path: Iterable[str]) -> str: return BaseHandler.make_var_name(role_prefix, path) -def _read_head(path: Path, max_bytes: int = 65536) -> str: - try: - with path.open("r", encoding="utf-8", errors="replace") as f: - return f.read(max_bytes) - except OSError: - return "" - - -_SYSTEMD_SUFFIXES: set[str] = { - ".service", - ".socket", - ".target", - ".timer", - ".path", - ".mount", - ".automount", - ".slice", - ".swap", - ".scope", - ".link", - ".netdev", - ".network", -} - - -def _looks_like_systemd(text: str) -> bool: - # Be conservative: many INI-style configs have [section] and key=value. - # systemd unit files almost always contain one of these well-known sections. - if re.search( - r"^\s*\[(Unit|Service|Install|Socket|Timer|Path|Mount|Automount|Slice|Swap|Scope)\]\s*$", - text, - re.M, - ) and re.search(r"^\s*\w[\w\-]*\s*=", text, re.M): - return True - return False - - def detect_format(path: Path, explicit: str | None = None) -> str: """ - Determine config format. - - For unambiguous extensions (json/yaml/toml/xml/ini), we rely on the suffix. - For ambiguous extensions like '.conf' (or no extension), we sniff the content. + Determine config format from argument or filename. """ if explicit: return explicit - suffix = path.suffix.lower() name = path.name.lower() - - # Unambiguous extensions if suffix == ".toml": return "toml" if suffix in {".yaml", ".yml"}: return "yaml" if suffix == ".json": return "json" + if suffix in {".ini", ".cfg", ".conf"} or name.endswith(".ini"): + return "ini" if suffix == ".xml": return "xml" - - # Special-ish INI-like formats - if suffix in {".ini", ".cfg"} or name.endswith(".ini"): - return "ini" - if suffix == ".repo": - return "ini" - - # systemd units - if suffix in _SYSTEMD_SUFFIXES: - return "systemd" - - # well-known filenames - if name == "main.cf": - return "postfix" - - head = _read_head(path) - - # Content sniffing - if _looks_like_systemd(head): - return "systemd" - - # Ambiguous .conf/.cf defaults to INI-ish if no better match - if suffix in {".conf", ".cf"}: - if name == "main.cf": - return "postfix" - return "ini" - # Fallback: treat as INI-ish return "ini" @@ -276,7 +183,15 @@ def generate_ansible_yaml( var_name = make_var_name(role_prefix, candidate.path) defaults[var_name] = candidate.items - return dump_yaml(defaults, sort_keys=True) + return yaml.dump( + defaults, + Dumper=_TurtleDumper, + sort_keys=True, + default_flow_style=False, + allow_unicode=True, + explicit_start=True, + indent=2, + ) def generate_jinja2_template( diff --git a/src/jinjaturtle/handlers/__init__.py b/src/jinjaturtle/handlers/__init__.py index 97074c5..6bbcba1 100644 --- a/src/jinjaturtle/handlers/__init__.py +++ b/src/jinjaturtle/handlers/__init__.py @@ -8,9 +8,6 @@ from .toml import TomlHandler from .yaml import YamlHandler from .xml import XmlHandler -from .postfix import PostfixMainHandler -from .systemd import SystemdUnitHandler - __all__ = [ "BaseHandler", "DictLikeHandler", @@ -19,6 +16,4 @@ __all__ = [ "TomlHandler", "YamlHandler", "XmlHandler", - "PostfixMainHandler", - "SystemdUnitHandler", ] diff --git a/src/jinjaturtle/handlers/postfix.py b/src/jinjaturtle/handlers/postfix.py deleted file mode 100644 index 65f6be9..0000000 --- a/src/jinjaturtle/handlers/postfix.py +++ /dev/null @@ -1,177 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from typing import Any - -from . import BaseHandler - - -class PostfixMainHandler(BaseHandler): - """ - Handler for Postfix main.cf style configuration. - - Postfix main.cf is largely 'key = value' with: - - '#' comments - - continuation lines starting with whitespace (they continue the previous value) - """ - - fmt = "postfix" - - def parse(self, path: Path) -> dict[str, str]: - text = path.read_text(encoding="utf-8") - return self._parse_text_to_dict(text) - - def _parse_text_to_dict(self, text: str) -> dict[str, str]: - lines = text.splitlines() - out: dict[str, str] = {} - i = 0 - while i < len(lines): - line = lines[i] - stripped = line.strip() - if not stripped or stripped.startswith("#"): - i += 1 - continue - - if "=" not in line: - i += 1 - continue - - eq_index = line.find("=") - key = line[:eq_index].strip() - if not key: - i += 1 - continue - - # value + inline comment - after = line[eq_index + 1 :] - value_part, _comment = self._split_inline_comment(after, {"#"}) - value = value_part.strip() - - # collect continuation lines - j = i + 1 - cont_parts: list[str] = [] - while j < len(lines): - nxt = lines[j] - if not nxt: - break - if nxt.startswith((" ", "\t")): - if nxt.strip().startswith("#"): - # a commented continuation line - treat as a break - break - cont_parts.append(nxt.strip()) - j += 1 - continue - break - - if cont_parts: - value = " ".join([value] + cont_parts).strip() - - out[key] = value - i = j if cont_parts else i + 1 - - return out - - def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]: - if not isinstance(parsed, dict): - raise TypeError("Postfix parse result must be a dict[str, str]") - items: list[tuple[tuple[str, ...], Any]] = [] - for k, v in parsed.items(): - items.append(((k,), v)) - return items - - def generate_jinja2_template( - self, - parsed: Any, - role_prefix: str, - original_text: str | None = None, - ) -> str: - if original_text is None: - # Canonical render (lossy) - if not isinstance(parsed, dict): - raise TypeError("Postfix parse result must be a dict[str, str]") - lines: list[str] = [] - for k, v in parsed.items(): - var = self.make_var_name(role_prefix, (k,)) - lines.append(f"{k} = {{{{ {var} }}}}") - return "\n".join(lines).rstrip() + "\n" - return self._generate_from_text(role_prefix, original_text) - - def _generate_from_text(self, role_prefix: str, text: str) -> str: - lines = text.splitlines(keepends=True) - out_lines: list[str] = [] - i = 0 - while i < len(lines): - raw_line = lines[i] - content = raw_line.rstrip("\n") - newline = "\n" if raw_line.endswith("\n") else "" - - stripped = content.strip() - if not stripped: - out_lines.append(raw_line) - i += 1 - continue - if stripped.startswith("#"): - out_lines.append(raw_line) - i += 1 - continue - - if "=" not in content: - out_lines.append(raw_line) - i += 1 - continue - - eq_index = content.find("=") - before_eq = content[:eq_index] - after_eq = content[eq_index + 1 :] - - key = before_eq.strip() - if not key: - out_lines.append(raw_line) - i += 1 - continue - - # whitespace after '=' - value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t")) - leading_ws = after_eq[:value_ws_len] - value_and_comment = after_eq[value_ws_len:] - - value_part, comment_part = self._split_inline_comment( - value_and_comment, {"#"} - ) - value = value_part.strip() - - # collect continuation physical lines to skip - j = i + 1 - cont_parts: list[str] = [] - while j < len(lines): - nxt_raw = lines[j] - nxt = nxt_raw.rstrip("\n") - if ( - nxt.startswith((" ", "\t")) - and nxt.strip() - and not nxt.strip().startswith("#") - ): - cont_parts.append(nxt.strip()) - j += 1 - continue - break - - if cont_parts: - value = " ".join([value] + cont_parts).strip() - - var = self.make_var_name(role_prefix, (key,)) - v = value - quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"} - if quoted: - replacement = ( - f'{before_eq}={leading_ws}"{{{{ {var} }}}}"{comment_part}{newline}' - ) - else: - replacement = ( - f"{before_eq}={leading_ws}{{{{ {var} }}}}{comment_part}{newline}" - ) - - out_lines.append(replacement) - i = j # skip continuation lines (if any) - - return "".join(out_lines) diff --git a/src/jinjaturtle/handlers/systemd.py b/src/jinjaturtle/handlers/systemd.py deleted file mode 100644 index 044fd86..0000000 --- a/src/jinjaturtle/handlers/systemd.py +++ /dev/null @@ -1,177 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from pathlib import Path -from typing import Any - -from . import BaseHandler - - -@dataclass -class SystemdLine: - kind: str # 'blank' | 'comment' | 'section' | 'kv' | 'raw' - raw: str - lineno: int - section: str | None = None - key: str | None = None - value: str | None = None - comment: str = "" - before_eq: str = "" - leading_ws_after_eq: str = "" - occ_index: int | None = None - - -@dataclass -class SystemdUnit: - lines: list[SystemdLine] - - -class SystemdUnitHandler(BaseHandler): - """ - Handler for systemd unit files. - - unit files are INI-like, but keys may repeat (e.g. multiple ExecStart= lines). - We preserve repeated keys by indexing them when flattening and templating. - """ - - fmt = "systemd" - - def parse(self, path: Path) -> SystemdUnit: - text = path.read_text(encoding="utf-8") - return self._parse_text(text) - - def _parse_text(self, text: str) -> SystemdUnit: - lines = text.splitlines(keepends=True) - out: list[SystemdLine] = [] - current_section: str | None = None - # counts per section+key to assign occ_index - occ: dict[tuple[str, str], int] = {} - - for lineno, raw_line in enumerate(lines, start=1): - content = raw_line.rstrip("\n") - stripped = content.strip() - - if not stripped: - out.append(SystemdLine(kind="blank", raw=raw_line, lineno=lineno)) - continue - - if stripped.startswith(("#", ";")): - out.append(SystemdLine(kind="comment", raw=raw_line, lineno=lineno)) - continue - - # section header - if ( - stripped.startswith("[") - and stripped.endswith("]") - and len(stripped) >= 2 - ): - sec = stripped[1:-1].strip() - current_section = sec - out.append( - SystemdLine( - kind="section", raw=raw_line, lineno=lineno, section=sec - ) - ) - continue - - if "=" not in content: - out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno)) - continue - - eq_index = content.find("=") - before_eq = content[:eq_index] - after_eq = content[eq_index + 1 :] - - key = before_eq.strip() - if not key: - out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno)) - continue - - # whitespace after '=' - value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t")) - leading_ws = after_eq[:value_ws_len] - value_and_comment = after_eq[value_ws_len:] - - value_part, comment = self._split_inline_comment( - value_and_comment, {"#", ";"} - ) - value = value_part.strip() - - sec = current_section or "DEFAULT" - k = (sec, key) - idx = occ.get(k, 0) - occ[k] = idx + 1 - - out.append( - SystemdLine( - kind="kv", - raw=raw_line, - lineno=lineno, - section=sec, - key=key, - value=value, - comment=comment, - before_eq=before_eq, - leading_ws_after_eq=leading_ws, - occ_index=idx, - ) - ) - - return SystemdUnit(lines=out) - - def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]: - if not isinstance(parsed, SystemdUnit): - raise TypeError("systemd parse result must be a SystemdUnit") - - # determine duplicates per (section,key) - counts: dict[tuple[str, str], int] = {} - for ln in parsed.lines: - if ln.kind == "kv" and ln.section and ln.key: - counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1 - - items: list[tuple[tuple[str, ...], Any]] = [] - for ln in parsed.lines: - if ln.kind != "kv" or not ln.section or not ln.key: - continue - path: tuple[str, ...] = (ln.section, ln.key) - if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None: - path = path + (str(ln.occ_index),) - items.append((path, ln.value or "")) - return items - - def generate_jinja2_template( - self, - parsed: Any, - role_prefix: str, - original_text: str | None = None, - ) -> str: - if not isinstance(parsed, SystemdUnit): - raise TypeError("systemd parse result must be a SystemdUnit") - # We template using parsed lines so we preserve original formatting/comments. - counts: dict[tuple[str, str], int] = {} - for ln in parsed.lines: - if ln.kind == "kv" and ln.section and ln.key: - counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1 - - out_lines: list[str] = [] - for ln in parsed.lines: - if ln.kind != "kv" or not ln.section or not ln.key: - out_lines.append(ln.raw) - continue - - path: tuple[str, ...] = (ln.section, ln.key) - if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None: - path = path + (str(ln.occ_index),) - var = self.make_var_name(role_prefix, path) - - v = (ln.value or "").strip() - quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"} - if quoted: - repl = f'{ln.before_eq}={ln.leading_ws_after_eq}"{{{{ {var} }}}}"{ln.comment}' - else: - repl = f"{ln.before_eq}={ln.leading_ws_after_eq}{{{{ {var} }}}}{ln.comment}" - - newline = "\n" if ln.raw.endswith("\n") else "" - out_lines.append(repl + newline) - - return "".join(out_lines) diff --git a/src/jinjaturtle/multi.py b/src/jinjaturtle/multi.py deleted file mode 100644 index 20cf544..0000000 --- a/src/jinjaturtle/multi.py +++ /dev/null @@ -1,771 +0,0 @@ -from __future__ import annotations - -"""Directory / multi-file processing. - -Folder mode: - * discover supported config files under a directory (optionally recursively) - * group them by detected format - * generate one *union* Jinja2 template per format - * generate a single defaults YAML containing a list of per-file values - -The union templates use `{% if ... is defined %}` blocks for paths that are -missing in some input files ("option B"), so missing keys/sections/elements are -omitted rather than rendered as empty values. - -Notes: - * If the folder contains *multiple* formats, we generate one template per - format (e.g. config.yaml.j2, config.xml.j2) and emit one list variable per - format in the defaults YAML. - * JSON union templates are emitted using a simple `{{ data | tojson }}` - approach to avoid comma-management complexity for optional keys. -""" - -from collections import Counter, defaultdict -from copy import deepcopy -import configparser -from dataclasses import dataclass -from pathlib import Path -from typing import Any, Iterable -import xml.etree.ElementTree as ET # nosec - -from .core import dump_yaml, flatten_config, make_var_name, parse_config -from .handlers.xml import XmlHandler - - -SUPPORTED_SUFFIXES: dict[str, set[str]] = { - "toml": {".toml"}, - "yaml": {".yaml", ".yml"}, - "json": {".json"}, - "ini": {".ini", ".cfg", ".conf", ".repo"}, - "xml": {".xml"}, -} - - -def is_supported_file(path: Path) -> bool: - if not path.is_file(): - return False - suffix = path.suffix.lower() - for exts in SUPPORTED_SUFFIXES.values(): - if suffix in exts: - return True - return False - - -def iter_supported_files(root: Path, recursive: bool) -> list[Path]: - if not root.exists(): - raise FileNotFoundError(str(root)) - if root.is_file(): - return [root] if is_supported_file(root) else [] - if not root.is_dir(): - return [] - - it = root.rglob("*") if recursive else root.glob("*") - files = [p for p in it if is_supported_file(p)] - files.sort() - return files - - -def defined_var_name(role_prefix: str, path: Iterable[str]) -> str: - """Presence marker var for a container path.""" - return make_var_name(role_prefix, ("defined",) + tuple(path)) - - -def _is_scalar(obj: Any) -> bool: - return not isinstance(obj, (dict, list)) - - -def _merge_union(a: Any, b: Any) -> Any: - """Merge two parsed objects into a union structure. - - - dicts: union keys, recursive - - lists: max length, merge by index - - scalars: keep the first (as a representative sample) - """ - if isinstance(a, dict) and isinstance(b, dict): - out: dict[str, Any] = {} - for k in b.keys(): - if k not in out and k not in a: - # handled later - pass - # preserve insertion order roughly: keys from a, then new keys from b - for k in a.keys(): - out[k] = _merge_union(a.get(k), b.get(k)) if k in b else a.get(k) - for k in b.keys(): - if k not in out: - out[k] = b.get(k) - return out - if isinstance(a, list) and isinstance(b, list): - n = max(len(a), len(b)) - out_list: list[Any] = [] - for i in range(n): - if i < len(a) and i < len(b): - out_list.append(_merge_union(a[i], b[i])) - elif i < len(a): - out_list.append(a[i]) - else: - out_list.append(b[i]) - return out_list - # different types or scalar - return a if a is not None else b - - -def _collect_dict_like_paths( - obj: Any, -) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]: - """Return (container_paths, leaf_paths) for dict/list structures.""" - containers: set[tuple[str, ...]] = set() - leaves: set[tuple[str, ...]] = set() - - def walk(o: Any, path: tuple[str, ...]) -> None: - if isinstance(o, dict): - for k, v in o.items(): - kp = path + (str(k),) - containers.add(kp) - walk(v, kp) - return - if isinstance(o, list): - for i, v in enumerate(o): - ip = path + (str(i),) - containers.add(ip) - walk(v, ip) - return - leaves.add(path) - - walk(obj, ()) - return containers, leaves - - -def _yaml_scalar_placeholder( - role_prefix: str, path: tuple[str, ...], sample: Any -) -> str: - var = make_var_name(role_prefix, path) - if isinstance(sample, str): - return f'"{{{{ {var} }}}}"' - return f"{{{{ {var} }}}}" - - -def _yaml_render_union( - role_prefix: str, - union_obj: Any, - optional_containers: set[tuple[str, ...]], - indent: int = 0, - path: tuple[str, ...] = (), - in_list: bool = False, -) -> list[str]: - """Render YAML for union_obj with conditionals for optional containers.""" - lines: list[str] = [] - ind = " " * indent - - if isinstance(union_obj, dict): - for key, val in union_obj.items(): - key_path = path + (str(key),) - cond_var = ( - defined_var_name(role_prefix, key_path) - if key_path in optional_containers - else None - ) - - if _is_scalar(val) or val is None: - value = _yaml_scalar_placeholder(role_prefix, key_path, val) - if cond_var: - lines.append(f"{ind}{{% if {cond_var} is defined %}}") - lines.append(f"{ind}{key}: {value}") - if cond_var: - lines.append(f"{ind}{{% endif %}}") - else: - if cond_var: - lines.append(f"{ind}{{% if {cond_var} is defined %}}") - lines.append(f"{ind}{key}:") - lines.extend( - _yaml_render_union( - role_prefix, - val, - optional_containers, - indent=indent + 2, - path=key_path, - in_list=False, - ) - ) - if cond_var: - lines.append(f"{ind}{{% endif %}}") - return lines - - if isinstance(union_obj, list): - for i, item in enumerate(union_obj): - item_path = path + (str(i),) - cond_var = ( - defined_var_name(role_prefix, item_path) - if item_path in optional_containers - else None - ) - - if _is_scalar(item) or item is None: - value = _yaml_scalar_placeholder(role_prefix, item_path, item) - if cond_var: - lines.append(f"{ind}{{% if {cond_var} is defined %}}") - lines.append(f"{ind}- {value}") - if cond_var: - lines.append(f"{ind}{{% endif %}}") - elif isinstance(item, dict): - if cond_var: - lines.append(f"{ind}{{% if {cond_var} is defined %}}") - # First line: list marker with first key if possible - first = True - for k, v in item.items(): - kp = item_path + (str(k),) - k_cond = ( - defined_var_name(role_prefix, kp) - if kp in optional_containers - else None - ) - if _is_scalar(v) or v is None: - value = _yaml_scalar_placeholder(role_prefix, kp, v) - if first: - if k_cond: - lines.append(f"{ind}{{% if {k_cond} is defined %}}") - lines.append(f"{ind}- {k}: {value}") - if k_cond: - lines.append(f"{ind}{{% endif %}}") - first = False - else: - if k_cond: - lines.append(f"{ind} {{% if {k_cond} is defined %}}") - lines.append(f"{ind} {k}: {value}") - if k_cond: - lines.append(f"{ind} {{% endif %}}") - else: - # nested - if first: - if k_cond: - lines.append(f"{ind}{{% if {k_cond} is defined %}}") - lines.append(f"{ind}- {k}:") - lines.extend( - _yaml_render_union( - role_prefix, - v, - optional_containers, - indent=indent + 4, - path=kp, - ) - ) - if k_cond: - lines.append(f"{ind}{{% endif %}}") - first = False - else: - if k_cond: - lines.append(f"{ind} {{% if {k_cond} is defined %}}") - lines.append(f"{ind} {k}:") - lines.extend( - _yaml_render_union( - role_prefix, - v, - optional_containers, - indent=indent + 4, - path=kp, - ) - ) - if k_cond: - lines.append(f"{ind} {{% endif %}}") - if first: - # empty dict item - lines.append(f"{ind}- {{}}") - if cond_var: - lines.append(f"{ind}{{% endif %}}") - else: - # list of lists - emit as scalar-ish fallback - value = f"{{{{ {make_var_name(role_prefix, item_path)} }}}}" - if cond_var: - lines.append(f"{ind}{{% if {cond_var} is defined %}}") - lines.append(f"{ind}- {value}") - if cond_var: - lines.append(f"{ind}{{% endif %}}") - return lines - - # scalar at root - value = _yaml_scalar_placeholder(role_prefix, path, union_obj) - if in_list: - lines.append(f"{ind}- {value}") - else: - lines.append(f"{ind}{value}") - return lines - - -def _toml_render_union( - role_prefix: str, - union_obj: dict[str, Any], - optional_containers: set[tuple[str, ...]], -) -> str: - """Render TOML union template with optional tables/keys.""" - lines: list[str] = [] - - def emit_kv(path: tuple[str, ...], key: str, value: Any) -> None: - var_name = make_var_name(role_prefix, path + (key,)) - cond = ( - defined_var_name(role_prefix, path + (key,)) - if (path + (key,)) in optional_containers - else None - ) - if cond: - lines.append(f"{{% if {cond} is defined %}}") - if isinstance(value, str): - lines.append(f'{key} = "{{{{ {var_name} }}}}"') - elif isinstance(value, bool): - lines.append(f"{key} = {{{{ {var_name} | lower }}}}") - else: - lines.append(f"{key} = {{{{ {var_name} }}}}") - if cond: - lines.append("{% endif %}") - - def walk(obj: dict[str, Any], path: tuple[str, ...]) -> None: - if path: - cond = ( - defined_var_name(role_prefix, path) - if path in optional_containers - else None - ) - if cond: - lines.append(f"{{% if {cond} is defined %}}") - lines.append(f"[{'.'.join(path)}]") - - scalar_items = {k: v for k, v in obj.items() if not isinstance(v, dict)} - nested_items = {k: v for k, v in obj.items() if isinstance(v, dict)} - - for k, v in scalar_items.items(): - emit_kv(path, str(k), v) - - if scalar_items: - lines.append("") - - for k, v in nested_items.items(): - walk(v, path + (str(k),)) - - if path and (path in optional_containers): - lines.append("{% endif %}") - lines.append("") - - # root scalars - root_scalars = {k: v for k, v in union_obj.items() if not isinstance(v, dict)} - for k, v in root_scalars.items(): - emit_kv((), str(k), v) - if root_scalars: - lines.append("") - for k, v in union_obj.items(): - if isinstance(v, dict): - walk(v, (str(k),)) - - return "\n".join(lines).rstrip() + "\n" - - -def _ini_union_and_presence( - parsers: list[configparser.ConfigParser], -) -> tuple[configparser.ConfigParser, set[str], set[tuple[str, str]]]: - """Build a union ConfigParser and compute optional sections/keys.""" - union = configparser.ConfigParser() - union.optionxform = str # noqa - - section_sets: list[set[str]] = [] - key_sets: list[set[tuple[str, str]]] = [] - - for p in parsers: - sections = set(p.sections()) - section_sets.append(sections) - keys: set[tuple[str, str]] = set() - for s in p.sections(): - for k, _ in p.items(s, raw=True): - keys.add((s, k)) - key_sets.append(keys) - - for s in p.sections(): - if not union.has_section(s): - union.add_section(s) - for k, v in p.items(s, raw=True): - if not union.has_option(s, k): - union.set(s, k, v) - - if not section_sets: - return union, set(), set() - - sec_union = set().union(*section_sets) - sec_inter = set.intersection(*section_sets) - optional_sections = sec_union - sec_inter - - key_union = set().union(*key_sets) - key_inter = set.intersection(*key_sets) - optional_keys = key_union - key_inter - - return union, optional_sections, optional_keys - - -def _ini_render_union( - role_prefix: str, - union: configparser.ConfigParser, - optional_sections: set[str], - optional_keys: set[tuple[str, str]], -) -> str: - lines: list[str] = [] - for section in union.sections(): - sec_cond = ( - defined_var_name(role_prefix, (section,)) - if section in optional_sections - else None - ) - if sec_cond: - lines.append(f"{{% if {sec_cond} is defined %}}") - lines.append(f"[{section}]") - for key, raw_val in union.items(section, raw=True): - path = (section, key) - var = make_var_name(role_prefix, path) - key_cond = ( - defined_var_name(role_prefix, path) if path in optional_keys else None - ) - v = (raw_val or "").strip() - quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"} - if key_cond: - lines.append(f"{{% if {key_cond} is defined %}}") - if quoted: - lines.append(f'{key} = "{{{{ {var} }}}}"') - else: - lines.append(f"{key} = {{{{ {var} }}}}") - if key_cond: - lines.append("{% endif %}") - lines.append("") - if sec_cond: - lines.append("{% endif %}") - lines.append("") - return "\n".join(lines).rstrip() + "\n" - - -def _xml_collect_paths( - root: ET.Element, -) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]: - """Return (element_paths, leaf_paths) based on XmlHandler's flatten rules.""" - element_paths: set[tuple[str, ...]] = set() - leaf_paths: set[tuple[str, ...]] = set() - - def walk(elem: ET.Element, path: tuple[str, ...]) -> None: - element_paths.add(path) - - for attr in elem.attrib: - leaf_paths.add(path + (f"@{attr}",)) - - children = [c for c in list(elem) if isinstance(c.tag, str)] - text = (elem.text or "").strip() - if text: - if not elem.attrib and not children: - leaf_paths.add(path) - else: - leaf_paths.add(path + ("value",)) - - counts = Counter(child.tag for child in children) - index_counters: dict[str, int] = defaultdict(int) - for child in children: - tag = child.tag - if counts[tag] > 1: - idx = index_counters[tag] - index_counters[tag] += 1 - child_path = path + (tag, str(idx)) - else: - child_path = path + (tag,) - walk(child, child_path) - - walk(root, ()) - return element_paths, leaf_paths - - -def _xml_merge_union(base: ET.Element, other: ET.Element) -> None: - """Merge other into base in-place.""" - # attributes - for k, v in other.attrib.items(): - if k not in base.attrib: - base.set(k, v) - - # text - if (base.text is None or not base.text.strip()) and ( - other.text and other.text.strip() - ): - base.text = other.text - - # children - base_children = [c for c in list(base) if isinstance(c.tag, str)] - other_children = [c for c in list(other) if isinstance(c.tag, str)] - - base_by_tag: dict[str, list[ET.Element]] = defaultdict(list) - other_by_tag: dict[str, list[ET.Element]] = defaultdict(list) - for c in base_children: - base_by_tag[c.tag].append(c) - for c in other_children: - other_by_tag[c.tag].append(c) - - # preserve base ordering; append new tags at end - seen_tags = set(base_by_tag.keys()) - tag_order = [c.tag for c in base_children if isinstance(c.tag, str)] - for t in other_by_tag.keys(): - if t not in seen_tags: - tag_order.append(t) - - # unique tags in order - ordered_tags: list[str] = [] - for t in tag_order: - if t not in ordered_tags: - ordered_tags.append(t) - - for tag in ordered_tags: - b_list = base_by_tag.get(tag, []) - o_list = other_by_tag.get(tag, []) - n = max(len(b_list), len(o_list)) - for i in range(n): - if i < len(b_list) and i < len(o_list): - _xml_merge_union(b_list[i], o_list[i]) - elif i < len(o_list): - base.append(deepcopy(o_list[i])) - - -def _xml_apply_jinja_union( - role_prefix: str, - root: ET.Element, - optional_elements: set[tuple[str, ...]], -) -> str: - """Generate XML template with optional element conditionals.""" - handler = XmlHandler() - - def wrap_optional_children(elem: ET.Element, path: tuple[str, ...]) -> None: - children = [c for c in list(elem) if isinstance(c.tag, str)] - if not children: - return - - # compute indexed paths the same way as flatten - counts = Counter(child.tag for child in children) - index_counters: dict[str, int] = defaultdict(int) - new_children: list[ET.Element] = [] - for child in children: - tag = child.tag - if counts[tag] > 1: - idx = index_counters[tag] - index_counters[tag] += 1 - child_path = path + (tag, str(idx)) - else: - child_path = path + (tag,) - - if child_path in optional_elements: - cond = defined_var_name(role_prefix, child_path) - new_children.append(ET.Comment(f"IF:{cond}")) - new_children.append(child) - new_children.append(ET.Comment(f"ENDIF:{cond}")) - else: - new_children.append(child) - - wrap_optional_children(child, child_path) - - # replace - for c in children: - elem.remove(c) - for c in new_children: - elem.append(c) - - # Wrap optionals before applying scalar substitution so markers stay - wrap_optional_children(root, ()) - handler._apply_jinja_to_xml_tree(role_prefix, root, loop_candidates=None) # type: ignore[attr-defined] - - indent = getattr(ET, "indent", None) - if indent is not None: - indent(root, space=" ") # type: ignore[arg-type] - - xml_body = ET.tostring(root, encoding="unicode") - # Reuse handler's conditional-marker replacement - xml_body = handler._insert_xml_loops(xml_body, role_prefix, [], root) # type: ignore[attr-defined] - return xml_body - - -@dataclass -class FormatOutput: - fmt: str - template: str - list_var: str - items: list[dict[str, Any]] - - -FOLDER_SUPPORTED_FORMATS: set[str] = {"json", "yaml", "toml", "ini", "xml"} - - -def process_directory( - root: Path, recursive: bool, role_prefix: str -) -> tuple[str, list[FormatOutput]]: - """Process a directory (or single file) into defaults YAML + template(s).""" - files = iter_supported_files(root, recursive) - if not files: - raise ValueError(f"No supported config files found under: {root}") - - # Parse and group by format - grouped: dict[str, list[tuple[Path, Any]]] = defaultdict(list) - for p in files: - fmt, parsed = parse_config(p, None) - if fmt not in FOLDER_SUPPORTED_FORMATS: - # Directory mode only supports a subset of formats for now. - continue - grouped[fmt].append((p, parsed)) - - if not grouped: - raise ValueError(f"No folder-supported config files found under: {root}") - - multiple_formats = len(grouped) > 1 - outputs: list[FormatOutput] = [] - - for fmt, entries in sorted(grouped.items()): - rel_ids = [ - e[0].relative_to(root).as_posix() if root.is_dir() else e[0].name - for e in entries - ] - parsed_list = [e[1] for e in entries] - - # JSON: simplest robust union template - if fmt == "json": - list_var = ( - f"{role_prefix}_{fmt}_items" - if multiple_formats - else f"{role_prefix}_items" - ) - template = "{{ data | tojson(indent=2) }}\n" - items: list[dict[str, Any]] = [] - for rid, parsed in zip(rel_ids, parsed_list): - items.append({"id": rid, "data": parsed}) - outputs.append( - FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items) - ) - continue - - # Dict-like formats (YAML/TOML) use union merge on parsed objects - if fmt in {"yaml", "toml"}: - union_obj: Any = deepcopy(parsed_list[0]) - for p in parsed_list[1:]: - union_obj = _merge_union(union_obj, p) - - container_sets: list[set[tuple[str, ...]]] = [] - leaf_sets: list[set[tuple[str, ...]]] = [] - for p in parsed_list: - containers, leaves = _collect_dict_like_paths(p) - container_sets.append(containers) - leaf_sets.append(leaves) - - cont_union = set().union(*container_sets) - cont_inter = set.intersection(*container_sets) if container_sets else set() - optional_containers = cont_union - cont_inter - - list_var = ( - f"{role_prefix}_{fmt}_items" - if multiple_formats - else f"{role_prefix}_items" - ) - - if fmt == "yaml": - template_lines = _yaml_render_union( - role_prefix, union_obj, optional_containers - ) - template = "\n".join(template_lines).rstrip() + "\n" - else: - if not isinstance(union_obj, dict): - raise TypeError("TOML union must be a dict") - template = _toml_render_union( - role_prefix, union_obj, optional_containers - ) - - # Build per-file item dicts (leaf vars + presence markers) - items: list[dict[str, Any]] = [] - for rid, parsed, containers in zip(rel_ids, parsed_list, container_sets): - item: dict[str, Any] = {"id": rid} - flat = flatten_config(fmt, parsed, loop_candidates=None) - for path, value in flat: - item[make_var_name(role_prefix, path)] = value - for cpath in optional_containers: - if cpath in containers: - item[defined_var_name(role_prefix, cpath)] = True - items.append(item) - - outputs.append( - FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items) - ) - continue - - if fmt == "ini": - parsers = parsed_list - if not all(isinstance(p, configparser.ConfigParser) for p in parsers): - raise TypeError("INI parse must produce ConfigParser") - union, opt_sections, opt_keys = _ini_union_and_presence(parsers) # type: ignore[arg-type] - - list_var = ( - f"{role_prefix}_{fmt}_items" - if multiple_formats - else f"{role_prefix}_items" - ) - template = _ini_render_union(role_prefix, union, opt_sections, opt_keys) - - items: list[dict[str, Any]] = [] - for rid, parser in zip(rel_ids, parsers): # type: ignore[arg-type] - item: dict[str, Any] = {"id": rid} - flat = flatten_config(fmt, parser, loop_candidates=None) - for path, value in flat: - item[make_var_name(role_prefix, path)] = value - # section presence - for sec in opt_sections: - if parser.has_section(sec): - item[defined_var_name(role_prefix, (sec,))] = True - # key presence - for sec, key in opt_keys: - if parser.has_option(sec, key): - item[defined_var_name(role_prefix, (sec, key))] = True - items.append(item) - - outputs.append( - FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items) - ) - continue - - if fmt == "xml": - if not all(isinstance(p, ET.Element) for p in parsed_list): - raise TypeError("XML parse must produce Element") - union_root = deepcopy(parsed_list[0]) - for p in parsed_list[1:]: - _xml_merge_union(union_root, p) - - elem_sets: list[set[tuple[str, ...]]] = [] - for p in parsed_list: - elem_paths, _ = _xml_collect_paths(p) - elem_sets.append(elem_paths) - - elem_union = set().union(*elem_sets) - elem_inter = set.intersection(*elem_sets) if elem_sets else set() - optional_elements = (elem_union - elem_inter) - {()} # never wrap root - - list_var = ( - f"{role_prefix}_{fmt}_items" - if multiple_formats - else f"{role_prefix}_items" - ) - template = _xml_apply_jinja_union( - role_prefix, union_root, optional_elements - ) - - items: list[dict[str, Any]] = [] - for rid, parsed, elems in zip(rel_ids, parsed_list, elem_sets): - item: dict[str, Any] = {"id": rid} - flat = flatten_config(fmt, parsed, loop_candidates=None) - for path, value in flat: - item[make_var_name(role_prefix, path)] = value - for epath in optional_elements: - if epath in elems: - item[defined_var_name(role_prefix, epath)] = True - items.append(item) - - outputs.append( - FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items) - ) - continue - - raise ValueError(f"Unsupported format in folder mode: {fmt}") - - # Build combined defaults YAML - defaults_doc: dict[str, Any] = {} - for out in outputs: - defaults_doc[out.list_var] = out.items - defaults_yaml = dump_yaml(defaults_doc, sort_keys=True) - - return defaults_yaml, outputs diff --git a/tests/test_cli.py b/tests/test_cli.py index 2f299a1..a880135 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -14,7 +14,7 @@ def test_cli_stdout_toml(capsys): cfg_path = SAMPLES_DIR / "tom.toml" exit_code = cli._main([str(cfg_path), "-r", "jinjaturtle"]) - assert exit_code == 0 + assert exit_code captured = capsys.readouterr() out = captured.out @@ -48,7 +48,7 @@ def test_cli_writes_output_files(tmp_path, capsys): ] ) - assert exit_code == 0 + assert exit_code assert defaults_path.is_file() assert template_path.is_file() diff --git a/tests/test_postfix_format.py b/tests/test_postfix_format.py deleted file mode 100644 index 6bb9e5d..0000000 --- a/tests/test_postfix_format.py +++ /dev/null @@ -1,33 +0,0 @@ -from __future__ import annotations - -from pathlib import Path - -import jinjaturtle.core as core - - -def test_postfix_main_cf_parsing_and_template(tmp_path: Path) -> None: - p = tmp_path / "main.cf" - p.write_text( - "# comment\n" - "myhostname = mail.example.com\n" - "mynetworks = 127.0.0.0/8\n" - " [::1]/128\n", - encoding="utf-8", - ) - - fmt, parsed = core.parse_config(p) - assert fmt == "postfix" - - flat = core.flatten_config(fmt, parsed) - assert (("myhostname",), "mail.example.com") in flat - assert any( - path == ("mynetworks",) and value.startswith("127.0.0.0/8") - for path, value in flat - ) - - template = core.generate_jinja2_template( - fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8") - ) - assert "myhostname = {{ role_myhostname }}" in template - assert "mynetworks = {{ role_mynetworks }}" in template - assert "# comment" in template diff --git a/tests/test_systemd_format.py b/tests/test_systemd_format.py deleted file mode 100644 index c310f72..0000000 --- a/tests/test_systemd_format.py +++ /dev/null @@ -1,26 +0,0 @@ -from __future__ import annotations - -from pathlib import Path - -import jinjaturtle.core as core - - -def test_systemd_unit_repeated_keys(tmp_path: Path) -> None: - p = tmp_path / "demo.service" - p.write_text( - "[Service]\n" "ExecStart=/bin/echo one\n" "ExecStart=/bin/echo two\n", - encoding="utf-8", - ) - - fmt, parsed = core.parse_config(p) - assert fmt == "systemd" - - flat = core.flatten_config(fmt, parsed) - assert (("Service", "ExecStart", "0"), "/bin/echo one") in flat - assert (("Service", "ExecStart", "1"), "/bin/echo two") in flat - - template = core.generate_jinja2_template( - fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8") - ) - assert "ExecStart={{ role_service_execstart_0 }}" in template - assert "ExecStart={{ role_service_execstart_1 }}" in template