Compare commits

..

11 commits
0.3.4 ... main

Author SHA1 Message Date
2f77cd4d80
Add support for systemd and postfix config files
All checks were successful
CI / test (push) Successful in 50s
Lint / test (push) Successful in 29s
Trivy / test (push) Successful in 23s
2026-01-06 11:57:50 +11:00
8f7f48dc91
fix fedora release
All checks were successful
CI / test (push) Successful in 51s
Lint / test (push) Successful in 30s
Trivy / test (push) Successful in 22s
2026-01-03 14:13:45 +11:00
a5c860e463
remove 'fc' from release root
All checks were successful
CI / test (push) Successful in 51s
Lint / test (push) Successful in 30s
Trivy / test (push) Successful in 23s
2026-01-03 12:49:59 +11:00
14428ff89c
Separate rpm dirs for different Fedora versions
All checks were successful
CI / test (push) Successful in 50s
Lint / test (push) Successful in 30s
Trivy / test (push) Successful in 23s
2026-01-03 09:33:06 +11:00
f92854382a
Build for Fedora 43
All checks were successful
CI / test (push) Successful in 50s
Lint / test (push) Successful in 30s
Trivy / test (push) Successful in 23s
2026-01-01 15:25:11 +11:00
3c40e55976
Remove unused method
All checks were successful
CI / test (push) Successful in 36s
Lint / test (push) Successful in 28s
Trivy / test (push) Successful in 17s
2025-12-30 16:46:20 +11:00
f0748e98e0
Support converting a folder of files, not just individual files each time, optionally recursively
Some checks failed
CI / test (push) Successful in 36s
Lint / test (push) Failing after 28s
Trivy / test (push) Successful in 16s
2025-12-30 16:36:05 +11:00
4d58107b22
Add Fedora install steps to README
All checks were successful
CI / test (push) Successful in 38s
Lint / test (push) Successful in 27s
Trivy / test (push) Successful in 17s
2025-12-27 19:14:42 +11:00
ad32d27ea2
Add fedora rpm building
All checks were successful
CI / test (push) Successful in 36s
Lint / test (push) Successful in 26s
Trivy / test (push) Successful in 16s
2025-12-27 16:54:46 +11:00
e652e9dbf3
Add build-deb action workflow
All checks were successful
CI / test (push) Successful in 36s
Lint / test (push) Successful in 28s
Trivy / test (push) Successful in 17s
2025-12-23 17:24:50 +11:00
d35630e5b6
Fix trivy exit code
All checks were successful
CI / test (push) Successful in 36s
Lint / test (push) Successful in 26s
Trivy / test (push) Successful in 16s
2025-12-22 17:28:32 +11:00
18 changed files with 1655 additions and 28 deletions

View file

@ -0,0 +1,67 @@
name: CI
on:
  push:
jobs:
  test:
    runs-on: docker
    steps:
      - name: Install system dependencies
        run: |
          apt-get update
          DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
            build-essential \
            devscripts \
            debhelper \
            dh-python \
            pybuild-plugin-pyproject \
            python3-all \
            python3-poetry-core \
            python3-yaml \
            python3-defusedxml \
            python3-jinja2 \
            python3-toml \
            rsync \
            ca-certificates
      - name: Checkout
        uses: actions/checkout@v4
        with:
          submodules: recursive
      - name: Build deb
        run: |
          mkdir /out
          rsync -a --delete \
            --exclude '.git' \
            --exclude '.venv' \
            --exclude 'dist' \
            --exclude 'build' \
            --exclude '__pycache__' \
            --exclude '.pytest_cache' \
            --exclude '.mypy_cache' \
            ./ /out/
          cd /out/
          export DEBEMAIL="mig@mig5.net"
          export DEBFULLNAME="Miguel Jacq"
          dch --distribution "trixie" --local "~trixie" "CI build for trixie"
          dpkg-buildpackage -us -uc -b
      # Notify if any previous step in this job failed
      - name: Notify on failure
        if: ${{ failure() }}
        env:
          WEBHOOK_URL: ${{ secrets.NODERED_WEBHOOK_URL }}
          REPOSITORY: ${{ forgejo.repository }}
          RUN_NUMBER: ${{ forgejo.run_number }}
          SERVER_URL: ${{ forgejo.server_url }}
        run: |
          curl -X POST \
            -H "Content-Type: application/json" \
            -d "{\"repository\":\"$REPOSITORY\",\"run_number\":\"$RUN_NUMBER\",\"status\":\"failure\",\"url\":\"$SERVER_URL/$REPOSITORY/actions/runs/$RUN_NUMBER\"}" \
            "$WEBHOOK_URL"

View file

@ -23,7 +23,7 @@ jobs:
- name: Run trivy
run: |
trivy fs --no-progress --ignore-unfixed --format table --disable-telemetry .
trivy fs --no-progress --ignore-unfixed --format table --disable-telemetry --skip-version-check --exit-code 1 .
# Notify if any previous step in this job failed
- name: Notify on failure

87
Dockerfile.rpmbuild Normal file
View file

@ -0,0 +1,87 @@
# syntax=docker/dockerfile:1

# Base image is parameterised so the same Dockerfile can build RPMs for
# multiple Fedora releases (e.g. --build-arg BASE_IMAGE=fedora:43).
ARG BASE_IMAGE=fedora:42
FROM ${BASE_IMAGE}

# RPM build toolchain plus the Python build backend (poetry-core via
# pyproject-rpm-macros) and the runtime Python deps the spec requires.
RUN set -eux; \
    dnf -y update; \
    dnf -y install \
    rpm-build \
    rpmdevtools \
    redhat-rpm-config \
    gcc \
    make \
    findutils \
    tar \
    gzip \
    rsync \
    python3 \
    python3-devel \
    python3-setuptools \
    python3-wheel \
    pyproject-rpm-macros \
    python3-rpm-macros \
    python3-yaml \
    python3-tomli \
    python3-defusedxml \
    python3-jinja2 \
    openssl-devel \
    python3-poetry-core ; \
    dnf -y clean all

# Build runner script (copies repo, tars, runs rpmbuild)
# NOTE: the heredoc delimiter is quoted ('EOF'), so nothing below is expanded
# at image-build time; SRC/WORKROOT/OUT/VERSION are read from the container
# environment when the script runs.
RUN set -eux; cat > /usr/local/bin/build-rpm <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
SRC="${SRC:-/src}"
WORKROOT="${WORKROOT:-/work}"
OUT="${OUT:-/out}"
mkdir -p "${WORKROOT}" "${OUT}"
WORK="${WORKROOT}/src"
rm -rf "${WORK}"
mkdir -p "${WORK}"
rsync -a --delete \
--exclude '.git' \
--exclude '.venv' \
--exclude 'dist' \
--exclude 'build' \
--exclude '__pycache__' \
--exclude '.pytest_cache' \
--exclude '.mypy_cache' \
"${SRC}/" "${WORK}/"
cd "${WORK}"
# Determine version from pyproject.toml unless provided
if [ -n "${VERSION:-}" ]; then
ver="${VERSION}"
else
ver="$(grep -m1 '^version = ' pyproject.toml | sed -E 's/version = "([^"]+)".*/\1/')"
fi
TOPDIR="${WORKROOT}/rpmbuild"
mkdir -p "${TOPDIR}"/{BUILD,BUILDROOT,RPMS,SOURCES,SPECS,SRPMS}
tarball="${TOPDIR}/SOURCES/jinjaturtle-${ver}.tar.gz"
tar -czf "${tarball}" --transform "s#^#jinjaturtle/#" .
spec_src="rpm/jinjaturtle.spec"
cp -v "${spec_src}" "${TOPDIR}/SPECS/jinjaturtle.spec"
rpmbuild -ba "${TOPDIR}/SPECS/jinjaturtle.spec" \
--define "_topdir ${TOPDIR}" \
--define "upstream_version ${ver}"
shopt -s nullglob
cp -v "${TOPDIR}"/RPMS/*/*.rpm "${OUT}/" || true
cp -v "${TOPDIR}"/SRPMS/*.src.rpm "${OUT}/" || true
echo "Artifacts copied to ${OUT}"
EOF

# Make the runner executable and the image's default entrypoint.
RUN chmod +x /usr/local/bin/build-rpm

WORKDIR /work
ENTRYPOINT ["/usr/local/bin/build-rpm"]

View file

@ -5,11 +5,12 @@
</div>
JinjaTurtle is a command-line tool to help you generate Jinja2 templates and
Ansible inventory from a native configuration file of a piece of software.
Ansible inventory from a native configuration file (or files) of a piece of
software.
## How it works
* The config file is examined
* The config file(s) is/are examined
* Parameter key names are generated based on the parameter names in the
config file. In keeping with Ansible best practices, you pass a prefix
for the key names, which should typically match the name of your Ansible
@ -17,7 +18,7 @@ Ansible inventory from a native configuration file of a piece of software.
* A Jinja2 file is generated from the file with those parameter key names
injected as the `{{ variable }}` names.
* An Ansible inventory YAML file is generated with those key names and the
*values* taken from the original config file as the defaults.
*values* taken from the original config file as the default vars.
By default, the Jinja2 template and the Ansible inventory are printed to
stdout. However, it is possible to output the results to new files.
@ -38,6 +39,29 @@ You may need or wish to tidy up the config to suit your needs.
The goal here is really to *speed up* converting files into Ansible/Jinja2,
but not necessarily to make it perfect.
## Can I convert multiple files at once?
Certainly! Pass the folder name instead of a specific file name, and JinjaTurtle
will convert any files it understands in that folder, storing all the various
vars in the destination defaults yaml file, and converting each file into a
Jinja2 template per file type.
If all the files had the same 'type', there'll be one Jinja2 template.
You can also pass `--recursive` to recurse into subfolders.
Note: when using 'folder' mode and multiple files of the same type, their vars
will be listed under an 'items' parent key in the yaml, each with an `id` key.
You'll then want to use a `loop` in Ansible later, e.g:
```yaml
- name: Render configs
template:
src: config.j2
dest: "/somewhere/{{ item.id }}"
loop: "{{ myrole_items }}"
```
## How to install it
### Ubuntu/Debian apt repository
@ -50,6 +74,25 @@ sudo apt update
sudo apt install jinjaturtle
```
### Fedora
```bash
sudo rpm --import https://mig5.net/static/mig5.asc
sudo tee /etc/yum.repos.d/mig5.repo > /dev/null << 'EOF'
[mig5]
name=mig5 Repository
baseurl=https://rpm.mig5.net/$releasever/rpm/$basearch
enabled=1
gpgcheck=1
repo_gpgcheck=1
gpgkey=https://mig5.net/static/mig5.asc
EOF
sudo dnf upgrade --refresh
sudo dnf install jinjaturtle
```
### From PyPI
```
@ -84,12 +127,12 @@ jinjaturtle php.ini \
## Full usage info
```
usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config
usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml,postfix,systemd}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config
Convert a config file into Ansible inventory and a Jinja2 template.
positional arguments:
config Path to the source configuration file (TOML or INI-style).
config Path to the source configuration file.
options:
-h, --help show this help message and exit
@ -98,11 +141,20 @@ options:
-f, --format {ini,json,toml,yaml,xml,postfix,systemd}
Force config format instead of auto-detecting from filename.
-d, --defaults-output DEFAULTS_OUTPUT
Path to write defaults/main.yml. If omitted, defaults YAML is printed to stdout.
Path to write defaults/main.yml. If omitted, default vars are printed to stdout.
-t, --template-output TEMPLATE_OUTPUT
Path to write the Jinja2 config template. If omitted, template is printed to stdout.
```
## Additional supported formats
JinjaTurtle can also template some common "bespoke" config formats:
- **Postfix main.cf** (`main.cf`) → `--format postfix`
- **systemd unit files** (`*.service`, `*.socket`, etc.) → `--format systemd`
For ambiguous extensions like `*.conf`, JinjaTurtle uses lightweight content sniffing; you can always force a specific handler via `--format`.
## Found a bug, have a suggestion?

6
debian/changelog vendored
View file

@ -1,3 +1,9 @@
jinjaturtle (0.3.5) unstable; urgency=medium
* Support converting a directory (optionally recursively) instead of just an individual file.
-- Miguel Jacq <mig@mig5.net> Tue, 30 Dec 2025 16:30:00 +1100
jinjaturtle (0.3.4) unstable; urgency=medium
* Render json files in a more pretty way

2
debian/control vendored
View file

@ -25,4 +25,4 @@ Depends:
python3-toml,
python3-defusedxml,
python3-jinja2
Description: Convert config files into Ansible defaults and Jinja2 templates.
Description: Convert config files into Ansible vars and Jinja2 templates.

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "jinjaturtle"
version = "0.3.4"
version = "0.4.0"
description = "Convert config files into Ansible defaults and Jinja2 templates."
authors = ["Miguel Jacq <mig@mig5.net>"]
license = "GPL-3.0-or-later"

View file

@ -42,3 +42,52 @@ for dist in ${DISTS[@]}; do
debfile=$(ls -1 dist/${release}/*.deb)
reprepro -b /home/user/git/repo includedeb "${release}" "${debfile}"
done
# RPM
sudo apt-get -y install createrepo-c rpm

BUILD_OUTPUT="${HOME}/git/jinjaturtle/dist"
KEYID="00AE817C24A10C2540461A9C1D7CDE0234DB458D"
REPO_ROOT="${HOME}/git/repo_rpm"
REMOTE="letessier.mig5.net:/opt/repo_rpm"

DISTS=(
  fedora:43
  fedora:42
)

# Quote the array expansion so entries survive word splitting intact.
for dist in "${DISTS[@]}"; do
  release=$(echo "${dist}" | cut -d: -f2)
  REPO_RELEASE_ROOT="${REPO_ROOT}/${release}"
  RPM_REPO="${REPO_RELEASE_ROOT}/rpm/x86_64"
  mkdir -p "$RPM_REPO"

  # Build a per-release builder image, then run it to drop RPMs into dist/rpm
  docker build \
    --no-cache \
    -f Dockerfile.rpmbuild \
    -t "jinjaturtle-rpm:${release}" \
    --progress=plain \
    --build-arg "BASE_IMAGE=${dist}" \
    .
  rm -rf "$PWD/dist/rpm"/*
  mkdir -p "$PWD/dist/rpm"
  docker run --rm -v "$PWD":/src -v "$PWD/dist/rpm":/out "jinjaturtle-rpm:${release}"
  sudo chown -R "${USER}" "$PWD/dist"

  # Sign every built artifact. Glob instead of parsing `ls` output, which
  # breaks on unusual filenames.
  for file in "${BUILD_OUTPUT}/rpm"/*; do
    rpmsign --addsign "${file}"
  done

  cp "${BUILD_OUTPUT}/rpm/"*.rpm "$RPM_REPO/"
  createrepo_c "$RPM_REPO"

  echo "==> Signing repomd.xml..."
  qubes-gpg-client --local-user "$KEYID" --detach-sign --armor "$RPM_REPO/repodata/repomd.xml" > "$RPM_REPO/repodata/repomd.xml.asc"
done

echo "==> Syncing repo to server..."
rsync -aHPvz --exclude=.git --delete "$REPO_ROOT/" "$REMOTE/"
echo "Done!"

48
rpm/jinjaturtle.spec Normal file
View file

@ -0,0 +1,48 @@
# Default version used for local builds. The Docker build pipeline passes
# --define "upstream_version X.Y.Z" (read from pyproject.toml); define the
# macro conditionally so that command-line definition actually wins — an
# unconditional %global here would override it.
%{!?upstream_version: %global upstream_version 0.3.5}

Name:           jinjaturtle
Version:        %{upstream_version}
Release:        1%{?dist}.jinjaturtle1
Summary:        Convert config files into Ansible vars and Jinja2 templates.

License:        GPL-3.0-or-later
URL:            https://git.mig5.net/mig5/jinjaturtle
Source0:        %{name}-%{version}.tar.gz

BuildArch:      noarch

BuildRequires:  pyproject-rpm-macros
BuildRequires:  python3-devel
BuildRequires:  python3-poetry-core

Requires:       python3-yaml
Requires:       python3-tomli
Requires:       python3-defusedxml
Requires:       python3-jinja2

%description
Convert config files into Ansible defaults and Jinja2 templates.

%prep
# Tarball is created with a fixed "jinjaturtle/" prefix (see build-rpm script)
%autosetup -n jinjaturtle

%generate_buildrequires
%pyproject_buildrequires

%build
%pyproject_wheel

%install
%pyproject_install
%pyproject_save_files jinjaturtle

%files -f %{pyproject_files}
%license LICENSE
%doc README.md
%{_bindir}/jinjaturtle

# Changelog entries must use literal version-release strings: macros like
# %%{version} expand to the *current* version in every historical entry.
%changelog
* Tue Dec 30 2025 Miguel Jacq <mig@mig5.net> - 0.3.5-1
- Support converting a directory (optionally recursively) instead of just an individual file.

* Sat Dec 27 2025 Miguel Jacq <mig@mig5.net> - 0.3.4-1
- Initial RPM packaging for Fedora 42

View file

@ -13,6 +13,8 @@ from .core import (
generate_jinja2_template,
)
from .multi import process_directory
def _build_arg_parser() -> argparse.ArgumentParser:
ap = argparse.ArgumentParser(
@ -21,18 +23,26 @@ def _build_arg_parser() -> argparse.ArgumentParser:
)
ap.add_argument(
"config",
help="Path to the source configuration file (TOML, YAML, JSON or INI-style).",
help=(
"Path to a config file OR a folder containing supported config files. "
"Supported: .toml, .yaml/.yml, .json, .ini/.cfg/.conf, .xml"
),
)
ap.add_argument(
"-r",
"--role-name",
required=True,
help="Ansible role name, used as variable prefix (e.g. cometbft).",
default="jinjaturtle",
help="Ansible role name, used as variable prefix (default: jinjaturtle).",
)
ap.add_argument(
"--recursive",
action="store_true",
help="When CONFIG is a folder, recurse into subfolders.",
)
ap.add_argument(
"-f",
"--format",
choices=["ini", "json", "toml", "yaml", "xml"],
choices=["ini", "json", "toml", "yaml", "xml", "postfix", "systemd"],
help="Force config format instead of auto-detecting from filename.",
)
ap.add_argument(
@ -54,6 +64,40 @@ def _main(argv: list[str] | None = None) -> int:
args = parser.parse_args(argv)
config_path = Path(args.config)
# Folder mode
if config_path.is_dir():
defaults_yaml, outputs = process_directory(
config_path, args.recursive, args.role_name
)
# Write defaults
if args.defaults_output:
Path(args.defaults_output).write_text(defaults_yaml, encoding="utf-8")
else:
print("# defaults/main.yml")
print(defaults_yaml, end="")
# Write templates
if args.template_output:
out_path = Path(args.template_output)
if len(outputs) == 1 and not out_path.is_dir():
out_path.write_text(outputs[0].template, encoding="utf-8")
else:
out_path.mkdir(parents=True, exist_ok=True)
for o in outputs:
(out_path / f"config.{o.fmt}.j2").write_text(
o.template, encoding="utf-8"
)
else:
for o in outputs:
name = "config.j2" if len(outputs) == 1 else f"config.{o.fmt}.j2"
print(f"# {name}")
print(o.template, end="")
return 0
# Single-file mode (existing behaviour)
config_text = config_path.read_text(encoding="utf-8")
# Parse the config
@ -89,7 +133,7 @@ def _main(argv: list[str] | None = None) -> int:
print("# config.j2")
print(template_str, end="")
return True
return 0
def main() -> None:

View file

@ -4,6 +4,7 @@ from pathlib import Path
from typing import Any, Iterable
import datetime
import re
import yaml
from .loop_analyzer import LoopAnalyzer, LoopCandidate
@ -14,6 +15,8 @@ from .handlers import (
TomlHandler,
YamlHandler,
XmlHandler,
PostfixMainHandler,
SystemdUnitHandler,
)
@ -56,12 +59,34 @@ _TOML_HANDLER = TomlHandler()
_YAML_HANDLER = YamlHandler()
_XML_HANDLER = XmlHandler()
_POSTFIX_HANDLER = PostfixMainHandler()
_SYSTEMD_HANDLER = SystemdUnitHandler()
_HANDLERS["ini"] = _INI_HANDLER
_HANDLERS["json"] = _JSON_HANDLER
_HANDLERS["toml"] = _TOML_HANDLER
_HANDLERS["yaml"] = _YAML_HANDLER
_HANDLERS["xml"] = _XML_HANDLER
_HANDLERS["postfix"] = _POSTFIX_HANDLER
_HANDLERS["systemd"] = _SYSTEMD_HANDLER
def dump_yaml(data: Any, *, sort_keys: bool = True) -> str:
    """Serialise *data* to a YAML string with JinjaTurtle's dumper settings.

    Shared by the single-file and multi-file code paths so both emit
    identically formatted YAML (explicit '---' start, 2-space indent,
    block style, unicode preserved).
    """
    dump_options = {
        "Dumper": _TurtleDumper,
        "sort_keys": sort_keys,
        "default_flow_style": False,
        "allow_unicode": True,
        "explicit_start": True,
        "indent": 2,
    }
    return yaml.dump(data, **dump_options)
def make_var_name(role_prefix: str, path: Iterable[str]) -> str:
"""
@ -70,24 +95,92 @@ def make_var_name(role_prefix: str, path: Iterable[str]) -> str:
return BaseHandler.make_var_name(role_prefix, path)
def _read_head(path: Path, max_bytes: int = 65536) -> str:
    """Return up to *max_bytes* characters from the start of *path*.

    Used for content sniffing; unreadable files yield an empty string so
    detection can still fall through to a default.
    """
    try:
        with path.open("r", encoding="utf-8", errors="replace") as f:
            return f.read(max_bytes)
    except OSError:
        return ""


# File suffixes that unambiguously identify systemd unit (and network) files.
_SYSTEMD_SUFFIXES: set[str] = {
    ".service",
    ".socket",
    ".target",
    ".timer",
    ".path",
    ".mount",
    ".automount",
    ".slice",
    ".swap",
    ".scope",
    ".link",
    ".netdev",
    ".network",
}


def _looks_like_systemd(text: str) -> bool:
    # Be conservative: many INI-style configs have [section] and key=value.
    # systemd unit files almost always contain one of these well-known sections.
    if re.search(
        r"^\s*\[(Unit|Service|Install|Socket|Timer|Path|Mount|Automount|Slice|Swap|Scope)\]\s*$",
        text,
        re.M,
    ) and re.search(r"^\s*\w[\w\-]*\s*=", text, re.M):
        return True
    return False


def detect_format(path: Path, explicit: str | None = None) -> str:
    """
    Determine config format.

    For unambiguous extensions (json/yaml/toml/xml/ini/cfg), we rely on the
    suffix. For ambiguous extensions like '.conf' or '.cf' (or no extension),
    we sniff the content.

    Bug fix: '.conf' was previously short-circuited to "ini" before the
    content sniffing ran, so systemd-style drop-ins (e.g. override.conf)
    could never be detected; '.conf' now goes through sniffing as the
    docstring and README promise.
    """
    if explicit:
        return explicit

    suffix = path.suffix.lower()
    name = path.name.lower()

    # Unambiguous extensions
    if suffix == ".toml":
        return "toml"
    if suffix in {".yaml", ".yml"}:
        return "yaml"
    if suffix == ".json":
        return "json"
    if suffix == ".xml":
        return "xml"

    # Unambiguously INI-like
    if suffix in {".ini", ".cfg"} or name.endswith(".ini"):
        return "ini"
    if suffix == ".repo":
        return "ini"

    # systemd units
    if suffix in _SYSTEMD_SUFFIXES:
        return "systemd"

    # well-known filenames
    if name == "main.cf":
        return "postfix"

    # Content sniffing for ambiguous names (.conf, .cf, no extension, ...)
    head = _read_head(path)
    if _looks_like_systemd(head):
        return "systemd"

    # Fallback: treat as INI-ish (covers plain .conf/.cf as before)
    return "ini"
@ -183,15 +276,7 @@ def generate_ansible_yaml(
var_name = make_var_name(role_prefix, candidate.path)
defaults[var_name] = candidate.items
return yaml.dump(
defaults,
Dumper=_TurtleDumper,
sort_keys=True,
default_flow_style=False,
allow_unicode=True,
explicit_start=True,
indent=2,
)
return dump_yaml(defaults, sort_keys=True)
def generate_jinja2_template(

View file

@ -8,6 +8,9 @@ from .toml import TomlHandler
from .yaml import YamlHandler
from .xml import XmlHandler
from .postfix import PostfixMainHandler
from .systemd import SystemdUnitHandler
__all__ = [
"BaseHandler",
"DictLikeHandler",
@ -16,4 +19,6 @@ __all__ = [
"TomlHandler",
"YamlHandler",
"XmlHandler",
"PostfixMainHandler",
"SystemdUnitHandler",
]

View file

@ -0,0 +1,177 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from . import BaseHandler
class PostfixMainHandler(BaseHandler):
    """
    Handler for Postfix main.cf style configuration.

    Postfix main.cf is largely 'key = value' with:
      - '#' comments
      - continuation lines starting with whitespace (they continue the previous value)
    """

    fmt = "postfix"

    def parse(self, path: Path) -> dict[str, str]:
        """Read *path* and return a flat {parameter: value} mapping."""
        text = path.read_text(encoding="utf-8")
        return self._parse_text_to_dict(text)

    def _parse_text_to_dict(self, text: str) -> dict[str, str]:
        """Parse main.cf text into {key: value}.

        Continuation lines (indented, non-comment) are joined onto the
        preceding value with single spaces. Later occurrences of a key
        overwrite earlier ones.
        """
        lines = text.splitlines()
        out: dict[str, str] = {}
        i = 0
        while i < len(lines):
            line = lines[i]
            stripped = line.strip()
            # Skip blanks and full-line comments
            if not stripped or stripped.startswith("#"):
                i += 1
                continue
            # Lines without '=' are not key/value pairs; ignore them
            if "=" not in line:
                i += 1
                continue
            eq_index = line.find("=")
            key = line[:eq_index].strip()
            if not key:
                i += 1
                continue
            # value + inline comment
            after = line[eq_index + 1 :]
            value_part, _comment = self._split_inline_comment(after, {"#"})
            value = value_part.strip()
            # collect continuation lines
            j = i + 1
            cont_parts: list[str] = []
            while j < len(lines):
                nxt = lines[j]
                # A truly empty line ends the continuation run
                if not nxt:
                    break
                if nxt.startswith((" ", "\t")):
                    if nxt.strip().startswith("#"):
                        # a commented continuation line - treat as a break
                        break
                    cont_parts.append(nxt.strip())
                    j += 1
                    continue
                break
            if cont_parts:
                value = " ".join([value] + cont_parts).strip()
            out[key] = value
            # Jump past any consumed continuation lines
            i = j if cont_parts else i + 1
        return out

    def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
        """Flatten the parsed dict into (path, value) pairs.

        Postfix has no nesting, so each path is a 1-tuple of the key name.
        """
        if not isinstance(parsed, dict):
            raise TypeError("Postfix parse result must be a dict[str, str]")
        items: list[tuple[tuple[str, ...], Any]] = []
        for k, v in parsed.items():
            items.append(((k,), v))
        return items

    def generate_jinja2_template(
        self,
        parsed: Any,
        role_prefix: str,
        original_text: str | None = None,
    ) -> str:
        """Render a Jinja2 template for the config.

        With *original_text*, comments/layout are preserved and only values
        are replaced; otherwise a canonical (lossy) 'key = {{ var }}' list
        is emitted in parse order.
        """
        if original_text is None:
            # Canonical render (lossy)
            if not isinstance(parsed, dict):
                raise TypeError("Postfix parse result must be a dict[str, str]")
            lines: list[str] = []
            for k, v in parsed.items():
                var = self.make_var_name(role_prefix, (k,))
                lines.append(f"{k} = {{{{ {var} }}}}")
            return "\n".join(lines).rstrip() + "\n"
        return self._generate_from_text(role_prefix, original_text)

    def _generate_from_text(self, role_prefix: str, text: str) -> str:
        """Template *text* in place: each value becomes '{{ var }}'.

        Blank lines, comments and non-kv lines pass through untouched;
        physical continuation lines of a value are consumed (the joined
        value collapses onto the first line of the assignment).
        """
        lines = text.splitlines(keepends=True)
        out_lines: list[str] = []
        i = 0
        while i < len(lines):
            raw_line = lines[i]
            content = raw_line.rstrip("\n")
            newline = "\n" if raw_line.endswith("\n") else ""
            stripped = content.strip()
            if not stripped:
                out_lines.append(raw_line)
                i += 1
                continue
            if stripped.startswith("#"):
                out_lines.append(raw_line)
                i += 1
                continue
            if "=" not in content:
                out_lines.append(raw_line)
                i += 1
                continue
            eq_index = content.find("=")
            before_eq = content[:eq_index]
            after_eq = content[eq_index + 1 :]
            key = before_eq.strip()
            if not key:
                out_lines.append(raw_line)
                i += 1
                continue
            # whitespace after '='
            value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t"))
            leading_ws = after_eq[:value_ws_len]
            value_and_comment = after_eq[value_ws_len:]
            value_part, comment_part = self._split_inline_comment(
                value_and_comment, {"#"}
            )
            value = value_part.strip()
            # collect continuation physical lines to skip
            j = i + 1
            cont_parts: list[str] = []
            while j < len(lines):
                nxt_raw = lines[j]
                nxt = nxt_raw.rstrip("\n")
                if (
                    nxt.startswith((" ", "\t"))
                    and nxt.strip()
                    and not nxt.strip().startswith("#")
                ):
                    cont_parts.append(nxt.strip())
                    j += 1
                    continue
                break
            if cont_parts:
                value = " ".join([value] + cont_parts).strip()
            var = self.make_var_name(role_prefix, (key,))
            v = value
            # If the original value was wrapped in matching quotes, keep
            # the quotes around the placeholder so rendering stays quoted.
            quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"}
            if quoted:
                replacement = (
                    f'{before_eq}={leading_ws}"{{{{ {var} }}}}"{comment_part}{newline}'
                )
            else:
                replacement = (
                    f"{before_eq}={leading_ws}{{{{ {var} }}}}{comment_part}{newline}"
                )
            out_lines.append(replacement)
            i = j  # skip continuation lines (if any)
        return "".join(out_lines)

View file

@ -0,0 +1,177 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from . import BaseHandler
@dataclass
class SystemdLine:
    """One physical line of a unit file, classified for round-trip templating."""

    kind: str  # 'blank' | 'comment' | 'section' | 'kv' | 'raw'
    raw: str  # original line text, including trailing newline when present
    lineno: int  # 1-based line number in the source text
    section: str | None = None  # enclosing [Section] name (section/kv lines)
    key: str | None = None  # key part of a kv line, stripped
    value: str | None = None  # value with any inline comment removed
    comment: str = ""  # inline-comment remainder returned by _split_inline_comment
    before_eq: str = ""  # verbatim text before '=' (preserves key spacing)
    leading_ws_after_eq: str = ""  # whitespace between '=' and the value
    occ_index: int | None = None  # 0-based occurrence index for repeated keys
@dataclass
class SystemdUnit:
    """Parse result for a unit file: ordered list of classified physical lines."""

    lines: list[SystemdLine]
class SystemdUnitHandler(BaseHandler):
    """
    Handler for systemd unit files.

    unit files are INI-like, but keys may repeat (e.g. multiple ExecStart= lines).
    We preserve repeated keys by indexing them when flattening and templating.
    """

    fmt = "systemd"

    def parse(self, path: Path) -> SystemdUnit:
        """Read *path* and return its line-by-line parse as a SystemdUnit."""
        text = path.read_text(encoding="utf-8")
        return self._parse_text(text)

    def _parse_text(self, text: str) -> SystemdUnit:
        """Classify every physical line (blank/comment/section/kv/raw).

        Key/value lines are tagged with their enclosing section (or
        "DEFAULT" before any section header) and a per-(section, key)
        occurrence index so repeated keys stay distinguishable.
        """
        lines = text.splitlines(keepends=True)
        out: list[SystemdLine] = []
        current_section: str | None = None
        # counts per section+key to assign occ_index
        occ: dict[tuple[str, str], int] = {}
        for lineno, raw_line in enumerate(lines, start=1):
            content = raw_line.rstrip("\n")
            stripped = content.strip()
            if not stripped:
                out.append(SystemdLine(kind="blank", raw=raw_line, lineno=lineno))
                continue
            # systemd accepts both '#' and ';' comments
            if stripped.startswith(("#", ";")):
                out.append(SystemdLine(kind="comment", raw=raw_line, lineno=lineno))
                continue
            # section header
            if (
                stripped.startswith("[")
                and stripped.endswith("]")
                and len(stripped) >= 2
            ):
                sec = stripped[1:-1].strip()
                current_section = sec
                out.append(
                    SystemdLine(
                        kind="section", raw=raw_line, lineno=lineno, section=sec
                    )
                )
                continue
            # Anything else without '=' is kept verbatim as 'raw'
            if "=" not in content:
                out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno))
                continue
            eq_index = content.find("=")
            before_eq = content[:eq_index]
            after_eq = content[eq_index + 1 :]
            key = before_eq.strip()
            if not key:
                out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno))
                continue
            # whitespace after '='
            value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t"))
            leading_ws = after_eq[:value_ws_len]
            value_and_comment = after_eq[value_ws_len:]
            value_part, comment = self._split_inline_comment(
                value_and_comment, {"#", ";"}
            )
            value = value_part.strip()
            sec = current_section or "DEFAULT"
            k = (sec, key)
            idx = occ.get(k, 0)
            occ[k] = idx + 1
            out.append(
                SystemdLine(
                    kind="kv",
                    raw=raw_line,
                    lineno=lineno,
                    section=sec,
                    key=key,
                    value=value,
                    comment=comment,
                    before_eq=before_eq,
                    leading_ws_after_eq=leading_ws,
                    occ_index=idx,
                )
            )
        return SystemdUnit(lines=out)

    def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
        """Return (path, value) pairs for each kv line.

        Paths are (section, key); when a key repeats within a section the
        0-based occurrence index is appended so each line gets its own var.
        """
        if not isinstance(parsed, SystemdUnit):
            raise TypeError("systemd parse result must be a SystemdUnit")
        # determine duplicates per (section,key)
        counts: dict[tuple[str, str], int] = {}
        for ln in parsed.lines:
            if ln.kind == "kv" and ln.section and ln.key:
                counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1
        items: list[tuple[tuple[str, ...], Any]] = []
        for ln in parsed.lines:
            if ln.kind != "kv" or not ln.section or not ln.key:
                continue
            path: tuple[str, ...] = (ln.section, ln.key)
            if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None:
                path = path + (str(ln.occ_index),)
            items.append((path, ln.value or ""))
        return items

    def generate_jinja2_template(
        self,
        parsed: Any,
        role_prefix: str,
        original_text: str | None = None,
    ) -> str:
        """Render the unit file with each kv value replaced by '{{ var }}'.

        Non-kv lines are emitted verbatim; variable names mirror flatten()
        (including occurrence indices for repeated keys) so the template and
        the defaults YAML agree. *original_text* is unused: the parsed lines
        already carry the original formatting.
        """
        if not isinstance(parsed, SystemdUnit):
            raise TypeError("systemd parse result must be a SystemdUnit")
        # We template using parsed lines so we preserve original formatting/comments.
        counts: dict[tuple[str, str], int] = {}
        for ln in parsed.lines:
            if ln.kind == "kv" and ln.section and ln.key:
                counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1
        out_lines: list[str] = []
        for ln in parsed.lines:
            if ln.kind != "kv" or not ln.section or not ln.key:
                out_lines.append(ln.raw)
                continue
            path: tuple[str, ...] = (ln.section, ln.key)
            if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None:
                path = path + (str(ln.occ_index),)
            var = self.make_var_name(role_prefix, path)
            v = (ln.value or "").strip()
            # Keep surrounding matching quotes around the placeholder
            quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"}
            if quoted:
                repl = f'{ln.before_eq}={ln.leading_ws_after_eq}"{{{{ {var} }}}}"{ln.comment}'
            else:
                repl = f"{ln.before_eq}={ln.leading_ws_after_eq}{{{{ {var} }}}}{ln.comment}"
            newline = "\n" if ln.raw.endswith("\n") else ""
            out_lines.append(repl + newline)
        return "".join(out_lines)

771
src/jinjaturtle/multi.py Normal file
View file

@ -0,0 +1,771 @@
from __future__ import annotations
"""Directory / multi-file processing.
Folder mode:
* discover supported config files under a directory (optionally recursively)
* group them by detected format
* generate one *union* Jinja2 template per format
* generate a single defaults YAML containing a list of per-file values
The union templates use `{% if ... is defined %}` blocks for paths that are
missing in some input files ("option B"), so missing keys/sections/elements are
omitted rather than rendered as empty values.
Notes:
* If the folder contains *multiple* formats, we generate one template per
format (e.g. config.yaml.j2, config.xml.j2) and emit one list variable per
format in the defaults YAML.
* JSON union templates are emitted using a simple `{{ data | tojson }}`
approach to avoid comma-management complexity for optional keys.
"""
from collections import Counter, defaultdict
from copy import deepcopy
import configparser
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable
import xml.etree.ElementTree as ET # nosec
from .core import dump_yaml, flatten_config, make_var_name, parse_config
from .handlers.xml import XmlHandler
SUPPORTED_SUFFIXES: dict[str, set[str]] = {
    "toml": {".toml"},
    "yaml": {".yaml", ".yml"},
    "json": {".json"},
    "ini": {".ini", ".cfg", ".conf", ".repo"},
    "xml": {".xml"},
}


def is_supported_file(path: Path) -> bool:
    """Return True when *path* is an existing file with a recognised suffix."""
    if not path.is_file():
        return False
    ext = path.suffix.lower()
    return any(ext in group for group in SUPPORTED_SUFFIXES.values())


def iter_supported_files(root: Path, recursive: bool) -> list[Path]:
    """Collect supported config files under *root*, sorted by path.

    Raises FileNotFoundError when *root* does not exist. A file argument
    yields a single-element list (when supported); anything that is neither
    a file nor a directory yields an empty list.
    """
    if not root.exists():
        raise FileNotFoundError(str(root))
    if root.is_file():
        return [root] if is_supported_file(root) else []
    if not root.is_dir():
        return []
    candidates = root.rglob("*") if recursive else root.glob("*")
    return sorted(p for p in candidates if is_supported_file(p))
def defined_var_name(role_prefix: str, path: Iterable[str]) -> str:
    """Presence-marker variable name for a container path."""
    marker_path = ("defined", *tuple(path))
    return make_var_name(role_prefix, marker_path)
def _is_scalar(obj: Any) -> bool:
return not isinstance(obj, (dict, list))
def _merge_union(a: Any, b: Any) -> Any:
"""Merge two parsed objects into a union structure.
- dicts: union keys, recursive
- lists: max length, merge by index
- scalars: keep the first (as a representative sample)
"""
if isinstance(a, dict) and isinstance(b, dict):
out: dict[str, Any] = {}
for k in b.keys():
if k not in out and k not in a:
# handled later
pass
# preserve insertion order roughly: keys from a, then new keys from b
for k in a.keys():
out[k] = _merge_union(a.get(k), b.get(k)) if k in b else a.get(k)
for k in b.keys():
if k not in out:
out[k] = b.get(k)
return out
if isinstance(a, list) and isinstance(b, list):
n = max(len(a), len(b))
out_list: list[Any] = []
for i in range(n):
if i < len(a) and i < len(b):
out_list.append(_merge_union(a[i], b[i]))
elif i < len(a):
out_list.append(a[i])
else:
out_list.append(b[i])
return out_list
# different types or scalar
return a if a is not None else b
def _collect_dict_like_paths(
obj: Any,
) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]:
"""Return (container_paths, leaf_paths) for dict/list structures."""
containers: set[tuple[str, ...]] = set()
leaves: set[tuple[str, ...]] = set()
def walk(o: Any, path: tuple[str, ...]) -> None:
if isinstance(o, dict):
for k, v in o.items():
kp = path + (str(k),)
containers.add(kp)
walk(v, kp)
return
if isinstance(o, list):
for i, v in enumerate(o):
ip = path + (str(i),)
containers.add(ip)
walk(v, ip)
return
leaves.add(path)
walk(obj, ())
return containers, leaves
def _yaml_scalar_placeholder(
    role_prefix: str, path: tuple[str, ...], sample: Any
) -> str:
    """Jinja2 placeholder for a scalar value.

    String samples are wrapped in double quotes so the rendered YAML stays
    a string; all other samples emit the bare placeholder.
    """
    var = make_var_name(role_prefix, path)
    placeholder = f"{{{{ {var} }}}}"
    if isinstance(sample, str):
        return f'"{placeholder}"'
    return placeholder
def _yaml_render_union(
    role_prefix: str,
    union_obj: Any,
    optional_containers: set[tuple[str, ...]],
    indent: int = 0,
    path: tuple[str, ...] = (),
    in_list: bool = False,
) -> list[str]:
    """Render YAML for union_obj with conditionals for optional containers.

    Walks the merged (union) structure and emits YAML template lines,
    replacing every scalar with a Jinja placeholder.  Any key or list item
    whose path is in ``optional_containers`` is wrapped in a
    ``{% if <presence-var> is defined %}`` / ``{% endif %}`` pair, so files
    that lack that key render without it.

    Args:
        role_prefix: prefix used when generating variable names.
        union_obj: merged dict/list/scalar structure to render.
        optional_containers: paths not present in every source file.
        indent: current indentation depth, in spaces.
        path: path of ``union_obj`` within the full structure.
        in_list: when True, a scalar is emitted as a list item (``- value``).

    Returns:
        The rendered template lines (joined/terminated by the caller).
    """
    lines: list[str] = []
    ind = " " * indent
    if isinstance(union_obj, dict):
        # Mapping: one "key: value" (or nested block) per entry.
        for key, val in union_obj.items():
            key_path = path + (str(key),)
            cond_var = (
                defined_var_name(role_prefix, key_path)
                if key_path in optional_containers
                else None
            )
            if _is_scalar(val) or val is None:
                value = _yaml_scalar_placeholder(role_prefix, key_path, val)
                if cond_var:
                    lines.append(f"{ind}{{% if {cond_var} is defined %}}")
                lines.append(f"{ind}{key}: {value}")
                if cond_var:
                    lines.append(f"{ind}{{% endif %}}")
            else:
                # Nested container: emit "key:" then recurse two spaces deeper.
                if cond_var:
                    lines.append(f"{ind}{{% if {cond_var} is defined %}}")
                lines.append(f"{ind}{key}:")
                lines.extend(
                    _yaml_render_union(
                        role_prefix,
                        val,
                        optional_containers,
                        indent=indent + 2,
                        path=key_path,
                        in_list=False,
                    )
                )
                if cond_var:
                    lines.append(f"{ind}{{% endif %}}")
        return lines
    if isinstance(union_obj, list):
        # Sequence: each item addressed by its stringified index.
        for i, item in enumerate(union_obj):
            item_path = path + (str(i),)
            cond_var = (
                defined_var_name(role_prefix, item_path)
                if item_path in optional_containers
                else None
            )
            if _is_scalar(item) or item is None:
                value = _yaml_scalar_placeholder(role_prefix, item_path, item)
                if cond_var:
                    lines.append(f"{ind}{{% if {cond_var} is defined %}}")
                lines.append(f"{ind}- {value}")
                if cond_var:
                    lines.append(f"{ind}{{% endif %}}")
            elif isinstance(item, dict):
                # Dict inside a list: first key shares the "- " line, the
                # remaining keys are indented to align under it.
                if cond_var:
                    lines.append(f"{ind}{{% if {cond_var} is defined %}}")
                # First line: list marker with first key if possible
                first = True
                for k, v in item.items():
                    kp = item_path + (str(k),)
                    k_cond = (
                        defined_var_name(role_prefix, kp)
                        if kp in optional_containers
                        else None
                    )
                    if _is_scalar(v) or v is None:
                        value = _yaml_scalar_placeholder(role_prefix, kp, v)
                        if first:
                            if k_cond:
                                lines.append(f"{ind}{{% if {k_cond} is defined %}}")
                            lines.append(f"{ind}- {k}: {value}")
                            if k_cond:
                                lines.append(f"{ind}{{% endif %}}")
                            first = False
                        else:
                            if k_cond:
                                lines.append(f"{ind} {{% if {k_cond} is defined %}}")
                            lines.append(f"{ind} {k}: {value}")
                            if k_cond:
                                lines.append(f"{ind} {{% endif %}}")
                    else:
                        # nested
                        if first:
                            if k_cond:
                                lines.append(f"{ind}{{% if {k_cond} is defined %}}")
                            lines.append(f"{ind}- {k}:")
                            lines.extend(
                                _yaml_render_union(
                                    role_prefix,
                                    v,
                                    optional_containers,
                                    indent=indent + 4,
                                    path=kp,
                                )
                            )
                            if k_cond:
                                lines.append(f"{ind}{{% endif %}}")
                            first = False
                        else:
                            if k_cond:
                                lines.append(f"{ind} {{% if {k_cond} is defined %}}")
                            lines.append(f"{ind} {k}:")
                            lines.extend(
                                _yaml_render_union(
                                    role_prefix,
                                    v,
                                    optional_containers,
                                    indent=indent + 4,
                                    path=kp,
                                )
                            )
                            if k_cond:
                                lines.append(f"{ind} {{% endif %}}")
                if first:
                    # empty dict item
                    lines.append(f"{ind}- {{}}")
                if cond_var:
                    lines.append(f"{ind}{{% endif %}}")
            else:
                # list of lists - emit as scalar-ish fallback
                value = f"{{{{ {make_var_name(role_prefix, item_path)} }}}}"
                if cond_var:
                    lines.append(f"{ind}{{% if {cond_var} is defined %}}")
                lines.append(f"{ind}- {value}")
                if cond_var:
                    lines.append(f"{ind}{{% endif %}}")
        return lines
    # scalar at root
    value = _yaml_scalar_placeholder(role_prefix, path, union_obj)
    if in_list:
        lines.append(f"{ind}- {value}")
    else:
        lines.append(f"{ind}{value}")
    return lines
def _toml_render_union(
    role_prefix: str,
    union_obj: dict[str, Any],
    optional_containers: set[tuple[str, ...]],
) -> str:
    """Render TOML union template with optional tables/keys.

    Root-level scalars are emitted first, then nested dicts become
    ``[dotted.table]`` sections (depth-first).  Tables or keys whose paths
    are in ``optional_containers`` are wrapped in Jinja ``{% if %}`` blocks
    keyed on their presence-marker variables.
    """
    lines: list[str] = []

    def emit_kv(path: tuple[str, ...], key: str, value: Any) -> None:
        # Emit one "key = <placeholder>" line, conditionally guarded.
        var_name = make_var_name(role_prefix, path + (key,))
        cond = (
            defined_var_name(role_prefix, path + (key,))
            if (path + (key,)) in optional_containers
            else None
        )
        if cond:
            lines.append(f"{{% if {cond} is defined %}}")
        if isinstance(value, str):
            # Strings stay quoted in the rendered TOML.
            lines.append(f'{key} = "{{{{ {var_name} }}}}"')
        elif isinstance(value, bool):
            # Jinja's lower filter turns Python True/False into TOML true/false.
            lines.append(f"{key} = {{{{ {var_name} | lower }}}}")
        else:
            lines.append(f"{key} = {{{{ {var_name} }}}}")
        if cond:
            lines.append("{% endif %}")

    def walk(obj: dict[str, Any], path: tuple[str, ...]) -> None:
        # Emit a [table] header (path is non-empty below root), its scalar
        # keys, then recurse into nested tables.
        if path:
            cond = (
                defined_var_name(role_prefix, path)
                if path in optional_containers
                else None
            )
            if cond:
                lines.append(f"{{% if {cond} is defined %}}")
            lines.append(f"[{'.'.join(path)}]")
        scalar_items = {k: v for k, v in obj.items() if not isinstance(v, dict)}
        nested_items = {k: v for k, v in obj.items() if isinstance(v, dict)}
        for k, v in scalar_items.items():
            emit_kv(path, str(k), v)
        if scalar_items:
            lines.append("")
        for k, v in nested_items.items():
            walk(v, path + (str(k),))
        if path and (path in optional_containers):
            lines.append("{% endif %}")
            lines.append("")

    # root scalars
    root_scalars = {k: v for k, v in union_obj.items() if not isinstance(v, dict)}
    for k, v in root_scalars.items():
        emit_kv((), str(k), v)
    if root_scalars:
        lines.append("")
    for k, v in union_obj.items():
        if isinstance(v, dict):
            walk(v, (str(k),))
    return "\n".join(lines).rstrip() + "\n"
def _ini_union_and_presence(
parsers: list[configparser.ConfigParser],
) -> tuple[configparser.ConfigParser, set[str], set[tuple[str, str]]]:
"""Build a union ConfigParser and compute optional sections/keys."""
union = configparser.ConfigParser()
union.optionxform = str # noqa
section_sets: list[set[str]] = []
key_sets: list[set[tuple[str, str]]] = []
for p in parsers:
sections = set(p.sections())
section_sets.append(sections)
keys: set[tuple[str, str]] = set()
for s in p.sections():
for k, _ in p.items(s, raw=True):
keys.add((s, k))
key_sets.append(keys)
for s in p.sections():
if not union.has_section(s):
union.add_section(s)
for k, v in p.items(s, raw=True):
if not union.has_option(s, k):
union.set(s, k, v)
if not section_sets:
return union, set(), set()
sec_union = set().union(*section_sets)
sec_inter = set.intersection(*section_sets)
optional_sections = sec_union - sec_inter
key_union = set().union(*key_sets)
key_inter = set.intersection(*key_sets)
optional_keys = key_union - key_inter
return union, optional_sections, optional_keys
def _ini_render_union(
    role_prefix: str,
    union: configparser.ConfigParser,
    optional_sections: set[str],
    optional_keys: set[tuple[str, str]],
) -> str:
    """Render an INI template, guarding optional sections/keys with Jinja ifs."""
    out: list[str] = []
    for section in union.sections():
        section_optional = section in optional_sections
        if section_optional:
            marker = defined_var_name(role_prefix, (section,))
            out.append(f"{{% if {marker} is defined %}}")
        out.append(f"[{section}]")
        for key, raw_val in union.items(section, raw=True):
            path = (section, key)
            var = make_var_name(role_prefix, path)
            stripped = (raw_val or "").strip()
            # Preserve the sample value's quoting style in the template.
            was_quoted = (
                len(stripped) >= 2
                and stripped[0] == stripped[-1]
                and stripped[0] in {'"', "'"}
            )
            key_optional = path in optional_keys
            if key_optional:
                key_marker = defined_var_name(role_prefix, path)
                out.append(f"{{% if {key_marker} is defined %}}")
            if was_quoted:
                out.append(f'{key} = "{{{{ {var} }}}}"')
            else:
                out.append(f"{key} = {{{{ {var} }}}}")
            if key_optional:
                out.append("{% endif %}")
        out.append("")
        if section_optional:
            out.append("{% endif %}")
        out.append("")
    return "\n".join(out).rstrip() + "\n"
def _xml_collect_paths(
root: ET.Element,
) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]:
"""Return (element_paths, leaf_paths) based on XmlHandler's flatten rules."""
element_paths: set[tuple[str, ...]] = set()
leaf_paths: set[tuple[str, ...]] = set()
def walk(elem: ET.Element, path: tuple[str, ...]) -> None:
element_paths.add(path)
for attr in elem.attrib:
leaf_paths.add(path + (f"@{attr}",))
children = [c for c in list(elem) if isinstance(c.tag, str)]
text = (elem.text or "").strip()
if text:
if not elem.attrib and not children:
leaf_paths.add(path)
else:
leaf_paths.add(path + ("value",))
counts = Counter(child.tag for child in children)
index_counters: dict[str, int] = defaultdict(int)
for child in children:
tag = child.tag
if counts[tag] > 1:
idx = index_counters[tag]
index_counters[tag] += 1
child_path = path + (tag, str(idx))
else:
child_path = path + (tag,)
walk(child, child_path)
walk(root, ())
return element_paths, leaf_paths
def _xml_merge_union(base: ET.Element, other: ET.Element) -> None:
"""Merge other into base in-place."""
# attributes
for k, v in other.attrib.items():
if k not in base.attrib:
base.set(k, v)
# text
if (base.text is None or not base.text.strip()) and (
other.text and other.text.strip()
):
base.text = other.text
# children
base_children = [c for c in list(base) if isinstance(c.tag, str)]
other_children = [c for c in list(other) if isinstance(c.tag, str)]
base_by_tag: dict[str, list[ET.Element]] = defaultdict(list)
other_by_tag: dict[str, list[ET.Element]] = defaultdict(list)
for c in base_children:
base_by_tag[c.tag].append(c)
for c in other_children:
other_by_tag[c.tag].append(c)
# preserve base ordering; append new tags at end
seen_tags = set(base_by_tag.keys())
tag_order = [c.tag for c in base_children if isinstance(c.tag, str)]
for t in other_by_tag.keys():
if t not in seen_tags:
tag_order.append(t)
# unique tags in order
ordered_tags: list[str] = []
for t in tag_order:
if t not in ordered_tags:
ordered_tags.append(t)
for tag in ordered_tags:
b_list = base_by_tag.get(tag, [])
o_list = other_by_tag.get(tag, [])
n = max(len(b_list), len(o_list))
for i in range(n):
if i < len(b_list) and i < len(o_list):
_xml_merge_union(b_list[i], o_list[i])
elif i < len(o_list):
base.append(deepcopy(o_list[i]))
def _xml_apply_jinja_union(
    role_prefix: str,
    root: ET.Element,
    optional_elements: set[tuple[str, ...]],
) -> str:
    """Generate XML template with optional element conditionals.

    Optional elements are bracketed with ``IF:``/``ENDIF:`` comment markers
    before scalar substitution, then XmlHandler's marker replacement turns
    those comments into Jinja conditionals in the serialized output.
    """
    handler = XmlHandler()

    def wrap_optional_children(elem: ET.Element, path: tuple[str, ...]) -> None:
        # Child paths must be computed exactly like _xml_collect_paths
        # (index component only when a tag repeats) so set membership in
        # optional_elements lines up.
        children = [c for c in list(elem) if isinstance(c.tag, str)]
        if not children:
            return
        # compute indexed paths the same way as flatten
        counts = Counter(child.tag for child in children)
        index_counters: dict[str, int] = defaultdict(int)
        new_children: list[ET.Element] = []
        for child in children:
            tag = child.tag
            if counts[tag] > 1:
                idx = index_counters[tag]
                index_counters[tag] += 1
                child_path = path + (tag, str(idx))
            else:
                child_path = path + (tag,)
            if child_path in optional_elements:
                cond = defined_var_name(role_prefix, child_path)
                # Comment markers survive serialization and are rewritten
                # to {% if %}/{% endif %} by _insert_xml_loops below.
                new_children.append(ET.Comment(f"IF:{cond}"))
                new_children.append(child)
                new_children.append(ET.Comment(f"ENDIF:{cond}"))
            else:
                new_children.append(child)
            wrap_optional_children(child, child_path)
        # replace
        for c in children:
            elem.remove(c)
        for c in new_children:
            elem.append(c)

    # Wrap optionals before applying scalar substitution so markers stay
    wrap_optional_children(root, ())
    handler._apply_jinja_to_xml_tree(role_prefix, root, loop_candidates=None)  # type: ignore[attr-defined]
    # ET.indent may be absent on older Pythons; skip pretty-printing then.
    indent = getattr(ET, "indent", None)
    if indent is not None:
        indent(root, space=" ")  # type: ignore[arg-type]
    xml_body = ET.tostring(root, encoding="unicode")
    # Reuse handler's conditional-marker replacement
    xml_body = handler._insert_xml_loops(xml_body, role_prefix, [], root)  # type: ignore[attr-defined]
    return xml_body
@dataclass
class FormatOutput:
    """Per-format result bundle produced by ``process_directory``."""

    fmt: str  # config format name, e.g. "json", "yaml", "toml", "ini", "xml"
    template: str  # rendered Jinja2 template text for this format
    list_var: str  # name of the defaults variable holding the items list
    items: list[dict[str, Any]]  # one variables dict per source file
# Formats that directory mode knows how to merge into a single template;
# files parsed as any other format are skipped by process_directory.
FOLDER_SUPPORTED_FORMATS: set[str] = {"json", "yaml", "toml", "ini", "xml"}
def process_directory(
    root: Path, recursive: bool, role_prefix: str
) -> tuple[str, list[FormatOutput]]:
    """Process a directory (or single file) into defaults YAML + template(s).

    Files under ``root`` are parsed and grouped by detected format.  Each
    group is merged into one "union" template (keys/sections/elements
    missing from some files become Jinja-conditional), and every source
    file contributes one variables dict (including presence markers for
    the optional parts it has).

    Args:
        root: directory to scan, or a single file path.
        recursive: passed to ``iter_supported_files`` to control descent.
        role_prefix: prefix for all generated variable names.

    Returns:
        ``(defaults_yaml, outputs)`` — the combined defaults document and
        one :class:`FormatOutput` per format found.

    Raises:
        ValueError: when no supported/folder-supported files are found.
        TypeError: when a parser returns an unexpected object type.
    """
    files = iter_supported_files(root, recursive)
    if not files:
        raise ValueError(f"No supported config files found under: {root}")
    # Parse and group by format
    grouped: dict[str, list[tuple[Path, Any]]] = defaultdict(list)
    for p in files:
        fmt, parsed = parse_config(p, None)
        if fmt not in FOLDER_SUPPORTED_FORMATS:
            # Directory mode only supports a subset of formats for now.
            continue
        grouped[fmt].append((p, parsed))
    if not grouped:
        raise ValueError(f"No folder-supported config files found under: {root}")
    # With several formats present, each gets a format-specific list var
    # name so the defaults documents don't collide.
    multiple_formats = len(grouped) > 1
    outputs: list[FormatOutput] = []
    for fmt, entries in sorted(grouped.items()):
        # Stable per-file ids: path relative to root (posix form), or just
        # the file name when root is itself a single file.
        rel_ids = [
            e[0].relative_to(root).as_posix() if root.is_dir() else e[0].name
            for e in entries
        ]
        parsed_list = [e[1] for e in entries]
        # JSON: simplest robust union template
        if fmt == "json":
            list_var = (
                f"{role_prefix}_{fmt}_items"
                if multiple_formats
                else f"{role_prefix}_items"
            )
            # Whole parsed object is emitted via tojson, no per-key vars.
            template = "{{ data | tojson(indent=2) }}\n"
            items: list[dict[str, Any]] = []
            for rid, parsed in zip(rel_ids, parsed_list):
                items.append({"id": rid, "data": parsed})
            outputs.append(
                FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items)
            )
            continue
        # Dict-like formats (YAML/TOML) use union merge on parsed objects
        if fmt in {"yaml", "toml"}:
            union_obj: Any = deepcopy(parsed_list[0])
            for p in parsed_list[1:]:
                union_obj = _merge_union(union_obj, p)
            container_sets: list[set[tuple[str, ...]]] = []
            leaf_sets: list[set[tuple[str, ...]]] = []
            for p in parsed_list:
                containers, leaves = _collect_dict_like_paths(p)
                container_sets.append(containers)
                leaf_sets.append(leaves)
            # Optional = present in some files but not all.
            cont_union = set().union(*container_sets)
            cont_inter = set.intersection(*container_sets) if container_sets else set()
            optional_containers = cont_union - cont_inter
            list_var = (
                f"{role_prefix}_{fmt}_items"
                if multiple_formats
                else f"{role_prefix}_items"
            )
            if fmt == "yaml":
                template_lines = _yaml_render_union(
                    role_prefix, union_obj, optional_containers
                )
                template = "\n".join(template_lines).rstrip() + "\n"
            else:
                if not isinstance(union_obj, dict):
                    raise TypeError("TOML union must be a dict")
                template = _toml_render_union(
                    role_prefix, union_obj, optional_containers
                )
            # Build per-file item dicts (leaf vars + presence markers)
            items: list[dict[str, Any]] = []
            for rid, parsed, containers in zip(rel_ids, parsed_list, container_sets):
                item: dict[str, Any] = {"id": rid}
                flat = flatten_config(fmt, parsed, loop_candidates=None)
                for path, value in flat:
                    item[make_var_name(role_prefix, path)] = value
                # Mark which optional containers this file actually has.
                for cpath in optional_containers:
                    if cpath in containers:
                        item[defined_var_name(role_prefix, cpath)] = True
                items.append(item)
            outputs.append(
                FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items)
            )
            continue
        if fmt == "ini":
            parsers = parsed_list
            if not all(isinstance(p, configparser.ConfigParser) for p in parsers):
                raise TypeError("INI parse must produce ConfigParser")
            union, opt_sections, opt_keys = _ini_union_and_presence(parsers)  # type: ignore[arg-type]
            list_var = (
                f"{role_prefix}_{fmt}_items"
                if multiple_formats
                else f"{role_prefix}_items"
            )
            template = _ini_render_union(role_prefix, union, opt_sections, opt_keys)
            items: list[dict[str, Any]] = []
            for rid, parser in zip(rel_ids, parsers):  # type: ignore[arg-type]
                item: dict[str, Any] = {"id": rid}
                flat = flatten_config(fmt, parser, loop_candidates=None)
                for path, value in flat:
                    item[make_var_name(role_prefix, path)] = value
                # section presence
                for sec in opt_sections:
                    if parser.has_section(sec):
                        item[defined_var_name(role_prefix, (sec,))] = True
                # key presence
                for sec, key in opt_keys:
                    if parser.has_option(sec, key):
                        item[defined_var_name(role_prefix, (sec, key))] = True
                items.append(item)
            outputs.append(
                FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items)
            )
            continue
        if fmt == "xml":
            if not all(isinstance(p, ET.Element) for p in parsed_list):
                raise TypeError("XML parse must produce Element")
            union_root = deepcopy(parsed_list[0])
            for p in parsed_list[1:]:
                _xml_merge_union(union_root, p)
            elem_sets: list[set[tuple[str, ...]]] = []
            for p in parsed_list:
                elem_paths, _ = _xml_collect_paths(p)
                elem_sets.append(elem_paths)
            elem_union = set().union(*elem_sets)
            elem_inter = set.intersection(*elem_sets) if elem_sets else set()
            optional_elements = (elem_union - elem_inter) - {()}  # never wrap root
            list_var = (
                f"{role_prefix}_{fmt}_items"
                if multiple_formats
                else f"{role_prefix}_items"
            )
            template = _xml_apply_jinja_union(
                role_prefix, union_root, optional_elements
            )
            items: list[dict[str, Any]] = []
            for rid, parsed, elems in zip(rel_ids, parsed_list, elem_sets):
                item: dict[str, Any] = {"id": rid}
                flat = flatten_config(fmt, parsed, loop_candidates=None)
                for path, value in flat:
                    item[make_var_name(role_prefix, path)] = value
                for epath in optional_elements:
                    if epath in elems:
                        item[defined_var_name(role_prefix, epath)] = True
                items.append(item)
            outputs.append(
                FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items)
            )
            continue
        raise ValueError(f"Unsupported format in folder mode: {fmt}")
    # Build combined defaults YAML
    defaults_doc: dict[str, Any] = {}
    for out in outputs:
        defaults_doc[out.list_var] = out.items
    defaults_yaml = dump_yaml(defaults_doc, sort_keys=True)
    return defaults_yaml, outputs

View file

@ -14,7 +14,7 @@ def test_cli_stdout_toml(capsys):
cfg_path = SAMPLES_DIR / "tom.toml"
exit_code = cli._main([str(cfg_path), "-r", "jinjaturtle"])
assert exit_code
assert exit_code == 0
captured = capsys.readouterr()
out = captured.out
@ -48,7 +48,7 @@ def test_cli_writes_output_files(tmp_path, capsys):
]
)
assert exit_code
assert exit_code == 0
assert defaults_path.is_file()
assert template_path.is_file()

View file

@ -0,0 +1,33 @@
from __future__ import annotations
from pathlib import Path
import jinjaturtle.core as core
def test_postfix_main_cf_parsing_and_template(tmp_path: Path) -> None:
p = tmp_path / "main.cf"
p.write_text(
"# comment\n"
"myhostname = mail.example.com\n"
"mynetworks = 127.0.0.0/8\n"
" [::1]/128\n",
encoding="utf-8",
)
fmt, parsed = core.parse_config(p)
assert fmt == "postfix"
flat = core.flatten_config(fmt, parsed)
assert (("myhostname",), "mail.example.com") in flat
assert any(
path == ("mynetworks",) and value.startswith("127.0.0.0/8")
for path, value in flat
)
template = core.generate_jinja2_template(
fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8")
)
assert "myhostname = {{ role_myhostname }}" in template
assert "mynetworks = {{ role_mynetworks }}" in template
assert "# comment" in template

View file

@ -0,0 +1,26 @@
from __future__ import annotations
from pathlib import Path
import jinjaturtle.core as core
def test_systemd_unit_repeated_keys(tmp_path: Path) -> None:
p = tmp_path / "demo.service"
p.write_text(
"[Service]\n" "ExecStart=/bin/echo one\n" "ExecStart=/bin/echo two\n",
encoding="utf-8",
)
fmt, parsed = core.parse_config(p)
assert fmt == "systemd"
flat = core.flatten_config(fmt, parsed)
assert (("Service", "ExecStart", "0"), "/bin/echo one") in flat
assert (("Service", "ExecStart", "1"), "/bin/echo two") in flat
template = core.generate_jinja2_template(
fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8")
)
assert "ExecStart={{ role_service_execstart_0 }}" in template
assert "ExecStart={{ role_service_execstart_1 }}" in template