Compare commits
No commits in common. "main" and "0.3.4" have entirely different histories.
18 changed files with 28 additions and 1655 deletions
|
|
@ -1,67 +0,0 @@
|
||||||
name: CI
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
test:
|
|
||||||
runs-on: docker
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- name: Install system dependencies
|
|
||||||
run: |
|
|
||||||
apt-get update
|
|
||||||
DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
|
|
||||||
build-essential \
|
|
||||||
devscripts \
|
|
||||||
debhelper \
|
|
||||||
dh-python \
|
|
||||||
pybuild-plugin-pyproject \
|
|
||||||
python3-all \
|
|
||||||
python3-poetry-core \
|
|
||||||
python3-yaml \
|
|
||||||
python3-defusedxml \
|
|
||||||
python3-jinja2 \
|
|
||||||
python3-toml \
|
|
||||||
rsync \
|
|
||||||
ca-certificates
|
|
||||||
|
|
||||||
- name: Checkout
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
with:
|
|
||||||
submodules: recursive
|
|
||||||
|
|
||||||
- name: Build deb
|
|
||||||
run: |
|
|
||||||
mkdir /out
|
|
||||||
|
|
||||||
rsync -a --delete \
|
|
||||||
--exclude '.git' \
|
|
||||||
--exclude '.venv' \
|
|
||||||
--exclude 'dist' \
|
|
||||||
--exclude 'build' \
|
|
||||||
--exclude '__pycache__' \
|
|
||||||
--exclude '.pytest_cache' \
|
|
||||||
--exclude '.mypy_cache' \
|
|
||||||
./ /out/
|
|
||||||
|
|
||||||
cd /out/
|
|
||||||
export DEBEMAIL="mig@mig5.net"
|
|
||||||
export DEBFULLNAME="Miguel Jacq"
|
|
||||||
|
|
||||||
dch --distribution "trixie" --local "~trixie" "CI build for trixie"
|
|
||||||
dpkg-buildpackage -us -uc -b
|
|
||||||
|
|
||||||
# Notify if any previous step in this job failed
|
|
||||||
- name: Notify on failure
|
|
||||||
if: ${{ failure() }}
|
|
||||||
env:
|
|
||||||
WEBHOOK_URL: ${{ secrets.NODERED_WEBHOOK_URL }}
|
|
||||||
REPOSITORY: ${{ forgejo.repository }}
|
|
||||||
RUN_NUMBER: ${{ forgejo.run_number }}
|
|
||||||
SERVER_URL: ${{ forgejo.server_url }}
|
|
||||||
run: |
|
|
||||||
curl -X POST \
|
|
||||||
-H "Content-Type: application/json" \
|
|
||||||
-d "{\"repository\":\"$REPOSITORY\",\"run_number\":\"$RUN_NUMBER\",\"status\":\"failure\",\"url\":\"$SERVER_URL/$REPOSITORY/actions/runs/$RUN_NUMBER\"}" \
|
|
||||||
"$WEBHOOK_URL"
|
|
||||||
|
|
@ -23,7 +23,7 @@ jobs:
|
||||||
|
|
||||||
- name: Run trivy
|
- name: Run trivy
|
||||||
run: |
|
run: |
|
||||||
trivy fs --no-progress --ignore-unfixed --format table --disable-telemetry --skip-version-check --exit-code 1 .
|
trivy fs --no-progress --ignore-unfixed --format table --disable-telemetry .
|
||||||
|
|
||||||
# Notify if any previous step in this job failed
|
# Notify if any previous step in this job failed
|
||||||
- name: Notify on failure
|
- name: Notify on failure
|
||||||
|
|
|
||||||
|
|
@ -1,87 +0,0 @@
|
||||||
# syntax=docker/dockerfile:1
|
|
||||||
ARG BASE_IMAGE=fedora:42
|
|
||||||
FROM ${BASE_IMAGE}
|
|
||||||
|
|
||||||
RUN set -eux; \
|
|
||||||
dnf -y update; \
|
|
||||||
dnf -y install \
|
|
||||||
rpm-build \
|
|
||||||
rpmdevtools \
|
|
||||||
redhat-rpm-config \
|
|
||||||
gcc \
|
|
||||||
make \
|
|
||||||
findutils \
|
|
||||||
tar \
|
|
||||||
gzip \
|
|
||||||
rsync \
|
|
||||||
python3 \
|
|
||||||
python3-devel \
|
|
||||||
python3-setuptools \
|
|
||||||
python3-wheel \
|
|
||||||
pyproject-rpm-macros \
|
|
||||||
python3-rpm-macros \
|
|
||||||
python3-yaml \
|
|
||||||
python3-tomli \
|
|
||||||
python3-defusedxml \
|
|
||||||
python3-jinja2 \
|
|
||||||
openssl-devel \
|
|
||||||
python3-poetry-core ; \
|
|
||||||
dnf -y clean all
|
|
||||||
|
|
||||||
# Build runner script (copies repo, tars, runs rpmbuild)
|
|
||||||
RUN set -eux; cat > /usr/local/bin/build-rpm <<'EOF'
|
|
||||||
#!/usr/bin/env bash
|
|
||||||
set -euo pipefail
|
|
||||||
|
|
||||||
SRC="${SRC:-/src}"
|
|
||||||
WORKROOT="${WORKROOT:-/work}"
|
|
||||||
OUT="${OUT:-/out}"
|
|
||||||
|
|
||||||
mkdir -p "${WORKROOT}" "${OUT}"
|
|
||||||
WORK="${WORKROOT}/src"
|
|
||||||
rm -rf "${WORK}"
|
|
||||||
mkdir -p "${WORK}"
|
|
||||||
|
|
||||||
rsync -a --delete \
|
|
||||||
--exclude '.git' \
|
|
||||||
--exclude '.venv' \
|
|
||||||
--exclude 'dist' \
|
|
||||||
--exclude 'build' \
|
|
||||||
--exclude '__pycache__' \
|
|
||||||
--exclude '.pytest_cache' \
|
|
||||||
--exclude '.mypy_cache' \
|
|
||||||
"${SRC}/" "${WORK}/"
|
|
||||||
|
|
||||||
cd "${WORK}"
|
|
||||||
|
|
||||||
# Determine version from pyproject.toml unless provided
|
|
||||||
if [ -n "${VERSION:-}" ]; then
|
|
||||||
ver="${VERSION}"
|
|
||||||
else
|
|
||||||
ver="$(grep -m1 '^version = ' pyproject.toml | sed -E 's/version = "([^"]+)".*/\1/')"
|
|
||||||
fi
|
|
||||||
|
|
||||||
TOPDIR="${WORKROOT}/rpmbuild"
|
|
||||||
mkdir -p "${TOPDIR}"/{BUILD,BUILDROOT,RPMS,SOURCES,SPECS,SRPMS}
|
|
||||||
|
|
||||||
tarball="${TOPDIR}/SOURCES/jinjaturtle-${ver}.tar.gz"
|
|
||||||
tar -czf "${tarball}" --transform "s#^#jinjaturtle/#" .
|
|
||||||
|
|
||||||
spec_src="rpm/jinjaturtle.spec"
|
|
||||||
|
|
||||||
cp -v "${spec_src}" "${TOPDIR}/SPECS/jinjaturtle.spec"
|
|
||||||
|
|
||||||
rpmbuild -ba "${TOPDIR}/SPECS/jinjaturtle.spec" \
|
|
||||||
--define "_topdir ${TOPDIR}" \
|
|
||||||
--define "upstream_version ${ver}"
|
|
||||||
|
|
||||||
shopt -s nullglob
|
|
||||||
cp -v "${TOPDIR}"/RPMS/*/*.rpm "${OUT}/" || true
|
|
||||||
cp -v "${TOPDIR}"/SRPMS/*.src.rpm "${OUT}/" || true
|
|
||||||
echo "Artifacts copied to ${OUT}"
|
|
||||||
EOF
|
|
||||||
|
|
||||||
RUN chmod +x /usr/local/bin/build-rpm
|
|
||||||
|
|
||||||
WORKDIR /work
|
|
||||||
ENTRYPOINT ["/usr/local/bin/build-rpm"]
|
|
||||||
64
README.md
64
README.md
|
|
@ -5,12 +5,11 @@
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
JinjaTurtle is a command-line tool to help you generate Jinja2 templates and
|
JinjaTurtle is a command-line tool to help you generate Jinja2 templates and
|
||||||
Ansible inventory from a native configuration file (or files) of a piece of
|
Ansible inventory from a native configuration file of a piece of software.
|
||||||
software.
|
|
||||||
|
|
||||||
## How it works
|
## How it works
|
||||||
|
|
||||||
* The config file(s) is/are examined
|
* The config file is examined
|
||||||
* Parameter key names are generated based on the parameter names in the
|
* Parameter key names are generated based on the parameter names in the
|
||||||
config file. In keeping with Ansible best practices, you pass a prefix
|
config file. In keeping with Ansible best practices, you pass a prefix
|
||||||
for the key names, which should typically match the name of your Ansible
|
for the key names, which should typically match the name of your Ansible
|
||||||
|
|
@ -18,7 +17,7 @@ software.
|
||||||
* A Jinja2 file is generated from the file with those parameter key names
|
* A Jinja2 file is generated from the file with those parameter key names
|
||||||
injected as the `{{ variable }}` names.
|
injected as the `{{ variable }}` names.
|
||||||
* An Ansible inventory YAML file is generated with those key names and the
|
* An Ansible inventory YAML file is generated with those key names and the
|
||||||
*values* taken from the original config file as the default vars.
|
*values* taken from the original config file as the defaults.
|
||||||
|
|
||||||
By default, the Jinja2 template and the Ansible inventory are printed to
|
By default, the Jinja2 template and the Ansible inventory are printed to
|
||||||
stdout. However, it is possible to output the results to new files.
|
stdout. However, it is possible to output the results to new files.
|
||||||
|
|
@ -39,29 +38,6 @@ You may need or wish to tidy up the config to suit your needs.
|
||||||
The goal here is really to *speed up* converting files into Ansible/Jinja2,
|
The goal here is really to *speed up* converting files into Ansible/Jinja2,
|
||||||
but not necessarily to make it perfect.
|
but not necessarily to make it perfect.
|
||||||
|
|
||||||
## Can I convert multiple files at once?
|
|
||||||
|
|
||||||
Certainly! Pass the folder name instead of a specific file name, and JinjaTurtle
|
|
||||||
will convert any files it understands in that folder, storing all the various
|
|
||||||
vars in the destination defaults yaml file, and converting each file into a
|
|
||||||
Jinja2 template per file type.
|
|
||||||
|
|
||||||
If all the files had the same 'type', there'll be one Jinja2 template.
|
|
||||||
|
|
||||||
You can also pass `--recursive` to recurse into subfolders.
|
|
||||||
|
|
||||||
Note: when using 'folder' mode and multiple files of the same type, their vars
|
|
||||||
will be listed under an 'items' parent key in the yaml, each with an `id` key.
|
|
||||||
You'll then want to use a `loop` in Ansible later, e.g:
|
|
||||||
|
|
||||||
```yaml
|
|
||||||
- name: Render configs
|
|
||||||
template:
|
|
||||||
src: config.j2
|
|
||||||
dest: "/somewhere/{{ item.id }}"
|
|
||||||
loop: "{{ myrole_items }}"
|
|
||||||
```
|
|
||||||
|
|
||||||
## How to install it
|
## How to install it
|
||||||
|
|
||||||
### Ubuntu/Debian apt repository
|
### Ubuntu/Debian apt repository
|
||||||
|
|
@ -74,25 +50,6 @@ sudo apt update
|
||||||
sudo apt install jinjaturtle
|
sudo apt install jinjaturtle
|
||||||
```
|
```
|
||||||
|
|
||||||
### Fedora
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo rpm --import https://mig5.net/static/mig5.asc
|
|
||||||
|
|
||||||
sudo tee /etc/yum.repos.d/mig5.repo > /dev/null << 'EOF'
|
|
||||||
[mig5]
|
|
||||||
name=mig5 Repository
|
|
||||||
baseurl=https://rpm.mig5.net/$releasever/rpm/$basearch
|
|
||||||
enabled=1
|
|
||||||
gpgcheck=1
|
|
||||||
repo_gpgcheck=1
|
|
||||||
gpgkey=https://mig5.net/static/mig5.asc
|
|
||||||
EOF
|
|
||||||
|
|
||||||
sudo dnf upgrade --refresh
|
|
||||||
sudo dnf install jinjaturtle
|
|
||||||
```
|
|
||||||
|
|
||||||
### From PyPi
|
### From PyPi
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
@ -127,12 +84,12 @@ jinjaturtle php.ini \
|
||||||
## Full usage info
|
## Full usage info
|
||||||
|
|
||||||
```
|
```
|
||||||
usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml,postfix,systemd}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config
|
usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config
|
||||||
|
|
||||||
Convert a config file into Ansible inventory and a Jinja2 template.
|
Convert a config file into Ansible inventory and a Jinja2 template.
|
||||||
|
|
||||||
positional arguments:
|
positional arguments:
|
||||||
config Path to the source configuration file.
|
config Path to the source configuration file (TOML or INI-style).
|
||||||
|
|
||||||
options:
|
options:
|
||||||
-h, --help show this help message and exit
|
-h, --help show this help message and exit
|
||||||
|
|
@ -141,20 +98,11 @@ options:
|
||||||
-f, --format {ini,json,toml,xml}
|
-f, --format {ini,json,toml,xml}
|
||||||
Force config format instead of auto-detecting from filename.
|
Force config format instead of auto-detecting from filename.
|
||||||
-d, --defaults-output DEFAULTS_OUTPUT
|
-d, --defaults-output DEFAULTS_OUTPUT
|
||||||
Path to write defaults/main.yml. If omitted, default vars are printed to stdout.
|
Path to write defaults/main.yml. If omitted, defaults YAML is printed to stdout.
|
||||||
-t, --template-output TEMPLATE_OUTPUT
|
-t, --template-output TEMPLATE_OUTPUT
|
||||||
Path to write the Jinja2 config template. If omitted, template is printed to stdout.
|
Path to write the Jinja2 config template. If omitted, template is printed to stdout.
|
||||||
```
|
```
|
||||||
|
|
||||||
## Additional supported formats
|
|
||||||
|
|
||||||
JinjaTurtle can also template some common "bespoke" config formats:
|
|
||||||
|
|
||||||
- **Postfix main.cf** (`main.cf`) → `--format postfix`
|
|
||||||
- **systemd unit files** (`*.service`, `*.socket`, etc.) → `--format systemd`
|
|
||||||
|
|
||||||
For ambiguous extensions like `*.conf`, JinjaTurtle uses lightweight content sniffing; you can always force a specific handler via `--format`.
|
|
||||||
|
|
||||||
|
|
||||||
## Found a bug, have a suggestion?
|
## Found a bug, have a suggestion?
|
||||||
|
|
||||||
|
|
|
||||||
6
debian/changelog
vendored
6
debian/changelog
vendored
|
|
@ -1,9 +1,3 @@
|
||||||
jinjaturtle (0.3.5) unstable; urgency=medium
|
|
||||||
|
|
||||||
* Support converting a directory (optionally recursively) instead of just an individual file.
|
|
||||||
|
|
||||||
-- Miguel Jacq <mig@mig5.net> Tue, 30 Dec 2025 16:30:00 +1100
|
|
||||||
|
|
||||||
jinjaturtle (0.3.4) unstable; urgency=medium
|
jinjaturtle (0.3.4) unstable; urgency=medium
|
||||||
|
|
||||||
* Render json files in a more pretty way
|
* Render json files in a more pretty way
|
||||||
|
|
|
||||||
2
debian/control
vendored
2
debian/control
vendored
|
|
@ -25,4 +25,4 @@ Depends:
|
||||||
python3-toml,
|
python3-toml,
|
||||||
python3-defusedxml,
|
python3-defusedxml,
|
||||||
python3-jinja2
|
python3-jinja2
|
||||||
Description: Convert config files into Ansible vars and Jinja2 templates.
|
Description: Convert config files into Ansible defaults and Jinja2 templates.
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "jinjaturtle"
|
name = "jinjaturtle"
|
||||||
version = "0.4.0"
|
version = "0.3.4"
|
||||||
description = "Convert config files into Ansible defaults and Jinja2 templates."
|
description = "Convert config files into Ansible defaults and Jinja2 templates."
|
||||||
authors = ["Miguel Jacq <mig@mig5.net>"]
|
authors = ["Miguel Jacq <mig@mig5.net>"]
|
||||||
license = "GPL-3.0-or-later"
|
license = "GPL-3.0-or-later"
|
||||||
|
|
|
||||||
49
release.sh
49
release.sh
|
|
@ -42,52 +42,3 @@ for dist in ${DISTS[@]}; do
|
||||||
debfile=$(ls -1 dist/${release}/*.deb)
|
debfile=$(ls -1 dist/${release}/*.deb)
|
||||||
reprepro -b /home/user/git/repo includedeb "${release}" "${debfile}"
|
reprepro -b /home/user/git/repo includedeb "${release}" "${debfile}"
|
||||||
done
|
done
|
||||||
|
|
||||||
# RPM
|
|
||||||
sudo apt-get -y install createrepo-c rpm
|
|
||||||
BUILD_OUTPUT="${HOME}/git/jinjaturtle/dist"
|
|
||||||
KEYID="00AE817C24A10C2540461A9C1D7CDE0234DB458D"
|
|
||||||
REPO_ROOT="${HOME}/git/repo_rpm"
|
|
||||||
REMOTE="letessier.mig5.net:/opt/repo_rpm"
|
|
||||||
|
|
||||||
DISTS=(
|
|
||||||
fedora:43
|
|
||||||
fedora:42
|
|
||||||
)
|
|
||||||
|
|
||||||
for dist in ${DISTS[@]}; do
|
|
||||||
release=$(echo ${dist} | cut -d: -f2)
|
|
||||||
REPO_RELEASE_ROOT="${REPO_ROOT}/${release}"
|
|
||||||
RPM_REPO="${REPO_RELEASE_ROOT}/rpm/x86_64"
|
|
||||||
mkdir -p "$RPM_REPO"
|
|
||||||
|
|
||||||
docker build \
|
|
||||||
--no-cache \
|
|
||||||
-f Dockerfile.rpmbuild \
|
|
||||||
-t jinjaturtle-rpm:${release} \
|
|
||||||
--progress=plain \
|
|
||||||
--build-arg BASE_IMAGE=${dist} \
|
|
||||||
.
|
|
||||||
|
|
||||||
rm -rf "$PWD/dist/rpm"/*
|
|
||||||
mkdir -p "$PWD/dist/rpm"
|
|
||||||
|
|
||||||
docker run --rm -v "$PWD":/src -v "$PWD/dist/rpm":/out jinjaturtle-rpm:${release}
|
|
||||||
sudo chown -R "${USER}" "$PWD/dist"
|
|
||||||
|
|
||||||
for file in `ls -1 "${BUILD_OUTPUT}/rpm"`; do
|
|
||||||
rpmsign --addsign "${BUILD_OUTPUT}/rpm/$file"
|
|
||||||
done
|
|
||||||
|
|
||||||
cp "${BUILD_OUTPUT}/rpm/"*.rpm "$RPM_REPO/"
|
|
||||||
|
|
||||||
createrepo_c "$RPM_REPO"
|
|
||||||
|
|
||||||
echo "==> Signing repomd.xml..."
|
|
||||||
qubes-gpg-client --local-user "$KEYID" --detach-sign --armor "$RPM_REPO/repodata/repomd.xml" > "$RPM_REPO/repodata/repomd.xml.asc"
|
|
||||||
done
|
|
||||||
|
|
||||||
echo "==> Syncing repo to server..."
|
|
||||||
rsync -aHPvz --exclude=.git --delete "$REPO_ROOT/" "$REMOTE/"
|
|
||||||
|
|
||||||
echo "Done!"
|
|
||||||
|
|
|
||||||
|
|
@ -1,48 +0,0 @@
|
||||||
%global upstream_version 0.3.5
|
|
||||||
|
|
||||||
Name: jinjaturtle
|
|
||||||
Version: %{upstream_version}
|
|
||||||
Release: 1%{?dist}.jinjaturtle1
|
|
||||||
Summary: Convert config files into Ansible vars and Jinja2 templates.
|
|
||||||
|
|
||||||
License: GPL-3.0-or-later
|
|
||||||
URL: https://git.mig5.net/mig5/jinjaturtle
|
|
||||||
Source0: %{name}-%{version}.tar.gz
|
|
||||||
|
|
||||||
BuildArch: noarch
|
|
||||||
|
|
||||||
BuildRequires: pyproject-rpm-macros
|
|
||||||
BuildRequires: python3-devel
|
|
||||||
BuildRequires: python3-poetry-core
|
|
||||||
|
|
||||||
Requires: python3-yaml
|
|
||||||
Requires: python3-tomli
|
|
||||||
Requires: python3-defusedxml
|
|
||||||
Requires: python3-jinja2
|
|
||||||
|
|
||||||
%description
|
|
||||||
Convert config files into Ansible defaults and Jinja2 templates.
|
|
||||||
|
|
||||||
%prep
|
|
||||||
%autosetup -n jinjaturtle
|
|
||||||
|
|
||||||
%generate_buildrequires
|
|
||||||
%pyproject_buildrequires
|
|
||||||
|
|
||||||
%build
|
|
||||||
%pyproject_wheel
|
|
||||||
|
|
||||||
%install
|
|
||||||
%pyproject_install
|
|
||||||
%pyproject_save_files jinjaturtle
|
|
||||||
|
|
||||||
%files -f %{pyproject_files}
|
|
||||||
%license LICENSE
|
|
||||||
%doc README.md
|
|
||||||
%{_bindir}/jinjaturtle
|
|
||||||
|
|
||||||
%changelog
|
|
||||||
* Tue Dec 30 2025 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
|
|
||||||
- Support converting a directory (optionally recursively) instead of just an individual file.
|
|
||||||
* Sat Dec 27 2025 Miguel Jacq <mig@mig5.net> - %{version}-%{release}
|
|
||||||
- Initial RPM packaging for Fedora 42
|
|
||||||
|
|
@ -13,8 +13,6 @@ from .core import (
|
||||||
generate_jinja2_template,
|
generate_jinja2_template,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .multi import process_directory
|
|
||||||
|
|
||||||
|
|
||||||
def _build_arg_parser() -> argparse.ArgumentParser:
|
def _build_arg_parser() -> argparse.ArgumentParser:
|
||||||
ap = argparse.ArgumentParser(
|
ap = argparse.ArgumentParser(
|
||||||
|
|
@ -23,26 +21,18 @@ def _build_arg_parser() -> argparse.ArgumentParser:
|
||||||
)
|
)
|
||||||
ap.add_argument(
|
ap.add_argument(
|
||||||
"config",
|
"config",
|
||||||
help=(
|
help="Path to the source configuration file (TOML, YAML, JSON or INI-style).",
|
||||||
"Path to a config file OR a folder containing supported config files. "
|
|
||||||
"Supported: .toml, .yaml/.yml, .json, .ini/.cfg/.conf, .xml"
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
ap.add_argument(
|
ap.add_argument(
|
||||||
"-r",
|
"-r",
|
||||||
"--role-name",
|
"--role-name",
|
||||||
default="jinjaturtle",
|
required=True,
|
||||||
help="Ansible role name, used as variable prefix (default: jinjaturtle).",
|
help="Ansible role name, used as variable prefix (e.g. cometbft).",
|
||||||
)
|
|
||||||
ap.add_argument(
|
|
||||||
"--recursive",
|
|
||||||
action="store_true",
|
|
||||||
help="When CONFIG is a folder, recurse into subfolders.",
|
|
||||||
)
|
)
|
||||||
ap.add_argument(
|
ap.add_argument(
|
||||||
"-f",
|
"-f",
|
||||||
"--format",
|
"--format",
|
||||||
choices=["ini", "json", "toml", "yaml", "xml", "postfix", "systemd"],
|
choices=["ini", "json", "toml", "yaml", "xml"],
|
||||||
help="Force config format instead of auto-detecting from filename.",
|
help="Force config format instead of auto-detecting from filename.",
|
||||||
)
|
)
|
||||||
ap.add_argument(
|
ap.add_argument(
|
||||||
|
|
@ -64,40 +54,6 @@ def _main(argv: list[str] | None = None) -> int:
|
||||||
args = parser.parse_args(argv)
|
args = parser.parse_args(argv)
|
||||||
|
|
||||||
config_path = Path(args.config)
|
config_path = Path(args.config)
|
||||||
|
|
||||||
# Folder mode
|
|
||||||
if config_path.is_dir():
|
|
||||||
defaults_yaml, outputs = process_directory(
|
|
||||||
config_path, args.recursive, args.role_name
|
|
||||||
)
|
|
||||||
|
|
||||||
# Write defaults
|
|
||||||
if args.defaults_output:
|
|
||||||
Path(args.defaults_output).write_text(defaults_yaml, encoding="utf-8")
|
|
||||||
else:
|
|
||||||
print("# defaults/main.yml")
|
|
||||||
print(defaults_yaml, end="")
|
|
||||||
|
|
||||||
# Write templates
|
|
||||||
if args.template_output:
|
|
||||||
out_path = Path(args.template_output)
|
|
||||||
if len(outputs) == 1 and not out_path.is_dir():
|
|
||||||
out_path.write_text(outputs[0].template, encoding="utf-8")
|
|
||||||
else:
|
|
||||||
out_path.mkdir(parents=True, exist_ok=True)
|
|
||||||
for o in outputs:
|
|
||||||
(out_path / f"config.{o.fmt}.j2").write_text(
|
|
||||||
o.template, encoding="utf-8"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
for o in outputs:
|
|
||||||
name = "config.j2" if len(outputs) == 1 else f"config.{o.fmt}.j2"
|
|
||||||
print(f"# {name}")
|
|
||||||
print(o.template, end="")
|
|
||||||
|
|
||||||
return 0
|
|
||||||
|
|
||||||
# Single-file mode (existing behaviour)
|
|
||||||
config_text = config_path.read_text(encoding="utf-8")
|
config_text = config_path.read_text(encoding="utf-8")
|
||||||
|
|
||||||
# Parse the config
|
# Parse the config
|
||||||
|
|
@ -133,7 +89,7 @@ def _main(argv: list[str] | None = None) -> int:
|
||||||
print("# config.j2")
|
print("# config.j2")
|
||||||
print(template_str, end="")
|
print(template_str, end="")
|
||||||
|
|
||||||
return 0
|
return True
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ from pathlib import Path
|
||||||
from typing import Any, Iterable
|
from typing import Any, Iterable
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
import re
|
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from .loop_analyzer import LoopAnalyzer, LoopCandidate
|
from .loop_analyzer import LoopAnalyzer, LoopCandidate
|
||||||
|
|
@ -15,8 +14,6 @@ from .handlers import (
|
||||||
TomlHandler,
|
TomlHandler,
|
||||||
YamlHandler,
|
YamlHandler,
|
||||||
XmlHandler,
|
XmlHandler,
|
||||||
PostfixMainHandler,
|
|
||||||
SystemdUnitHandler,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -59,34 +56,12 @@ _TOML_HANDLER = TomlHandler()
|
||||||
_YAML_HANDLER = YamlHandler()
|
_YAML_HANDLER = YamlHandler()
|
||||||
_XML_HANDLER = XmlHandler()
|
_XML_HANDLER = XmlHandler()
|
||||||
|
|
||||||
_POSTFIX_HANDLER = PostfixMainHandler()
|
|
||||||
_SYSTEMD_HANDLER = SystemdUnitHandler()
|
|
||||||
|
|
||||||
_HANDLERS["ini"] = _INI_HANDLER
|
_HANDLERS["ini"] = _INI_HANDLER
|
||||||
_HANDLERS["json"] = _JSON_HANDLER
|
_HANDLERS["json"] = _JSON_HANDLER
|
||||||
_HANDLERS["toml"] = _TOML_HANDLER
|
_HANDLERS["toml"] = _TOML_HANDLER
|
||||||
_HANDLERS["yaml"] = _YAML_HANDLER
|
_HANDLERS["yaml"] = _YAML_HANDLER
|
||||||
_HANDLERS["xml"] = _XML_HANDLER
|
_HANDLERS["xml"] = _XML_HANDLER
|
||||||
|
|
||||||
_HANDLERS["postfix"] = _POSTFIX_HANDLER
|
|
||||||
_HANDLERS["systemd"] = _SYSTEMD_HANDLER
|
|
||||||
|
|
||||||
|
|
||||||
def dump_yaml(data: Any, *, sort_keys: bool = True) -> str:
|
|
||||||
"""Dump YAML using JinjaTurtle's dumper settings.
|
|
||||||
|
|
||||||
This is used by both the single-file and multi-file code paths.
|
|
||||||
"""
|
|
||||||
return yaml.dump(
|
|
||||||
data,
|
|
||||||
Dumper=_TurtleDumper,
|
|
||||||
sort_keys=sort_keys,
|
|
||||||
default_flow_style=False,
|
|
||||||
allow_unicode=True,
|
|
||||||
explicit_start=True,
|
|
||||||
indent=2,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def make_var_name(role_prefix: str, path: Iterable[str]) -> str:
|
def make_var_name(role_prefix: str, path: Iterable[str]) -> str:
|
||||||
"""
|
"""
|
||||||
|
|
@ -95,92 +70,24 @@ def make_var_name(role_prefix: str, path: Iterable[str]) -> str:
|
||||||
return BaseHandler.make_var_name(role_prefix, path)
|
return BaseHandler.make_var_name(role_prefix, path)
|
||||||
|
|
||||||
|
|
||||||
def _read_head(path: Path, max_bytes: int = 65536) -> str:
|
|
||||||
try:
|
|
||||||
with path.open("r", encoding="utf-8", errors="replace") as f:
|
|
||||||
return f.read(max_bytes)
|
|
||||||
except OSError:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
|
|
||||||
_SYSTEMD_SUFFIXES: set[str] = {
|
|
||||||
".service",
|
|
||||||
".socket",
|
|
||||||
".target",
|
|
||||||
".timer",
|
|
||||||
".path",
|
|
||||||
".mount",
|
|
||||||
".automount",
|
|
||||||
".slice",
|
|
||||||
".swap",
|
|
||||||
".scope",
|
|
||||||
".link",
|
|
||||||
".netdev",
|
|
||||||
".network",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _looks_like_systemd(text: str) -> bool:
|
|
||||||
# Be conservative: many INI-style configs have [section] and key=value.
|
|
||||||
# systemd unit files almost always contain one of these well-known sections.
|
|
||||||
if re.search(
|
|
||||||
r"^\s*\[(Unit|Service|Install|Socket|Timer|Path|Mount|Automount|Slice|Swap|Scope)\]\s*$",
|
|
||||||
text,
|
|
||||||
re.M,
|
|
||||||
) and re.search(r"^\s*\w[\w\-]*\s*=", text, re.M):
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def detect_format(path: Path, explicit: str | None = None) -> str:
|
def detect_format(path: Path, explicit: str | None = None) -> str:
|
||||||
"""
|
"""
|
||||||
Determine config format.
|
Determine config format from argument or filename.
|
||||||
|
|
||||||
For unambiguous extensions (json/yaml/toml/xml/ini), we rely on the suffix.
|
|
||||||
For ambiguous extensions like '.conf' (or no extension), we sniff the content.
|
|
||||||
"""
|
"""
|
||||||
if explicit:
|
if explicit:
|
||||||
return explicit
|
return explicit
|
||||||
|
|
||||||
suffix = path.suffix.lower()
|
suffix = path.suffix.lower()
|
||||||
name = path.name.lower()
|
name = path.name.lower()
|
||||||
|
|
||||||
# Unambiguous extensions
|
|
||||||
if suffix == ".toml":
|
if suffix == ".toml":
|
||||||
return "toml"
|
return "toml"
|
||||||
if suffix in {".yaml", ".yml"}:
|
if suffix in {".yaml", ".yml"}:
|
||||||
return "yaml"
|
return "yaml"
|
||||||
if suffix == ".json":
|
if suffix == ".json":
|
||||||
return "json"
|
return "json"
|
||||||
|
if suffix in {".ini", ".cfg", ".conf"} or name.endswith(".ini"):
|
||||||
|
return "ini"
|
||||||
if suffix == ".xml":
|
if suffix == ".xml":
|
||||||
return "xml"
|
return "xml"
|
||||||
|
|
||||||
# Special-ish INI-like formats
|
|
||||||
if suffix in {".ini", ".cfg"} or name.endswith(".ini"):
|
|
||||||
return "ini"
|
|
||||||
if suffix == ".repo":
|
|
||||||
return "ini"
|
|
||||||
|
|
||||||
# systemd units
|
|
||||||
if suffix in _SYSTEMD_SUFFIXES:
|
|
||||||
return "systemd"
|
|
||||||
|
|
||||||
# well-known filenames
|
|
||||||
if name == "main.cf":
|
|
||||||
return "postfix"
|
|
||||||
|
|
||||||
head = _read_head(path)
|
|
||||||
|
|
||||||
# Content sniffing
|
|
||||||
if _looks_like_systemd(head):
|
|
||||||
return "systemd"
|
|
||||||
|
|
||||||
# Ambiguous .conf/.cf defaults to INI-ish if no better match
|
|
||||||
if suffix in {".conf", ".cf"}:
|
|
||||||
if name == "main.cf":
|
|
||||||
return "postfix"
|
|
||||||
return "ini"
|
|
||||||
|
|
||||||
# Fallback: treat as INI-ish
|
# Fallback: treat as INI-ish
|
||||||
return "ini"
|
return "ini"
|
||||||
|
|
||||||
|
|
@ -276,7 +183,15 @@ def generate_ansible_yaml(
|
||||||
var_name = make_var_name(role_prefix, candidate.path)
|
var_name = make_var_name(role_prefix, candidate.path)
|
||||||
defaults[var_name] = candidate.items
|
defaults[var_name] = candidate.items
|
||||||
|
|
||||||
return dump_yaml(defaults, sort_keys=True)
|
return yaml.dump(
|
||||||
|
defaults,
|
||||||
|
Dumper=_TurtleDumper,
|
||||||
|
sort_keys=True,
|
||||||
|
default_flow_style=False,
|
||||||
|
allow_unicode=True,
|
||||||
|
explicit_start=True,
|
||||||
|
indent=2,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def generate_jinja2_template(
|
def generate_jinja2_template(
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,6 @@ from .toml import TomlHandler
|
||||||
from .yaml import YamlHandler
|
from .yaml import YamlHandler
|
||||||
from .xml import XmlHandler
|
from .xml import XmlHandler
|
||||||
|
|
||||||
from .postfix import PostfixMainHandler
|
|
||||||
from .systemd import SystemdUnitHandler
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"BaseHandler",
|
"BaseHandler",
|
||||||
"DictLikeHandler",
|
"DictLikeHandler",
|
||||||
|
|
@ -19,6 +16,4 @@ __all__ = [
|
||||||
"TomlHandler",
|
"TomlHandler",
|
||||||
"YamlHandler",
|
"YamlHandler",
|
||||||
"XmlHandler",
|
"XmlHandler",
|
||||||
"PostfixMainHandler",
|
|
||||||
"SystemdUnitHandler",
|
|
||||||
]
|
]
|
||||||
|
|
|
||||||
|
|
@ -1,177 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from . import BaseHandler
|
|
||||||
|
|
||||||
|
|
||||||
class PostfixMainHandler(BaseHandler):
|
|
||||||
"""
|
|
||||||
Handler for Postfix main.cf style configuration.
|
|
||||||
|
|
||||||
Postfix main.cf is largely 'key = value' with:
|
|
||||||
- '#' comments
|
|
||||||
- continuation lines starting with whitespace (they continue the previous value)
|
|
||||||
"""
|
|
||||||
|
|
||||||
fmt = "postfix"
|
|
||||||
|
|
||||||
def parse(self, path: Path) -> dict[str, str]:
|
|
||||||
text = path.read_text(encoding="utf-8")
|
|
||||||
return self._parse_text_to_dict(text)
|
|
||||||
|
|
||||||
def _parse_text_to_dict(self, text: str) -> dict[str, str]:
|
|
||||||
lines = text.splitlines()
|
|
||||||
out: dict[str, str] = {}
|
|
||||||
i = 0
|
|
||||||
while i < len(lines):
|
|
||||||
line = lines[i]
|
|
||||||
stripped = line.strip()
|
|
||||||
if not stripped or stripped.startswith("#"):
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
if "=" not in line:
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
eq_index = line.find("=")
|
|
||||||
key = line[:eq_index].strip()
|
|
||||||
if not key:
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
# value + inline comment
|
|
||||||
after = line[eq_index + 1 :]
|
|
||||||
value_part, _comment = self._split_inline_comment(after, {"#"})
|
|
||||||
value = value_part.strip()
|
|
||||||
|
|
||||||
# collect continuation lines
|
|
||||||
j = i + 1
|
|
||||||
cont_parts: list[str] = []
|
|
||||||
while j < len(lines):
|
|
||||||
nxt = lines[j]
|
|
||||||
if not nxt:
|
|
||||||
break
|
|
||||||
if nxt.startswith((" ", "\t")):
|
|
||||||
if nxt.strip().startswith("#"):
|
|
||||||
# a commented continuation line - treat as a break
|
|
||||||
break
|
|
||||||
cont_parts.append(nxt.strip())
|
|
||||||
j += 1
|
|
||||||
continue
|
|
||||||
break
|
|
||||||
|
|
||||||
if cont_parts:
|
|
||||||
value = " ".join([value] + cont_parts).strip()
|
|
||||||
|
|
||||||
out[key] = value
|
|
||||||
i = j if cont_parts else i + 1
|
|
||||||
|
|
||||||
return out
|
|
||||||
|
|
||||||
def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
|
|
||||||
if not isinstance(parsed, dict):
|
|
||||||
raise TypeError("Postfix parse result must be a dict[str, str]")
|
|
||||||
items: list[tuple[tuple[str, ...], Any]] = []
|
|
||||||
for k, v in parsed.items():
|
|
||||||
items.append(((k,), v))
|
|
||||||
return items
|
|
||||||
|
|
||||||
def generate_jinja2_template(
|
|
||||||
self,
|
|
||||||
parsed: Any,
|
|
||||||
role_prefix: str,
|
|
||||||
original_text: str | None = None,
|
|
||||||
) -> str:
|
|
||||||
if original_text is None:
|
|
||||||
# Canonical render (lossy)
|
|
||||||
if not isinstance(parsed, dict):
|
|
||||||
raise TypeError("Postfix parse result must be a dict[str, str]")
|
|
||||||
lines: list[str] = []
|
|
||||||
for k, v in parsed.items():
|
|
||||||
var = self.make_var_name(role_prefix, (k,))
|
|
||||||
lines.append(f"{k} = {{{{ {var} }}}}")
|
|
||||||
return "\n".join(lines).rstrip() + "\n"
|
|
||||||
return self._generate_from_text(role_prefix, original_text)
|
|
||||||
|
|
||||||
def _generate_from_text(self, role_prefix: str, text: str) -> str:
|
|
||||||
lines = text.splitlines(keepends=True)
|
|
||||||
out_lines: list[str] = []
|
|
||||||
i = 0
|
|
||||||
while i < len(lines):
|
|
||||||
raw_line = lines[i]
|
|
||||||
content = raw_line.rstrip("\n")
|
|
||||||
newline = "\n" if raw_line.endswith("\n") else ""
|
|
||||||
|
|
||||||
stripped = content.strip()
|
|
||||||
if not stripped:
|
|
||||||
out_lines.append(raw_line)
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
if stripped.startswith("#"):
|
|
||||||
out_lines.append(raw_line)
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
if "=" not in content:
|
|
||||||
out_lines.append(raw_line)
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
eq_index = content.find("=")
|
|
||||||
before_eq = content[:eq_index]
|
|
||||||
after_eq = content[eq_index + 1 :]
|
|
||||||
|
|
||||||
key = before_eq.strip()
|
|
||||||
if not key:
|
|
||||||
out_lines.append(raw_line)
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
# whitespace after '='
|
|
||||||
value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t"))
|
|
||||||
leading_ws = after_eq[:value_ws_len]
|
|
||||||
value_and_comment = after_eq[value_ws_len:]
|
|
||||||
|
|
||||||
value_part, comment_part = self._split_inline_comment(
|
|
||||||
value_and_comment, {"#"}
|
|
||||||
)
|
|
||||||
value = value_part.strip()
|
|
||||||
|
|
||||||
# collect continuation physical lines to skip
|
|
||||||
j = i + 1
|
|
||||||
cont_parts: list[str] = []
|
|
||||||
while j < len(lines):
|
|
||||||
nxt_raw = lines[j]
|
|
||||||
nxt = nxt_raw.rstrip("\n")
|
|
||||||
if (
|
|
||||||
nxt.startswith((" ", "\t"))
|
|
||||||
and nxt.strip()
|
|
||||||
and not nxt.strip().startswith("#")
|
|
||||||
):
|
|
||||||
cont_parts.append(nxt.strip())
|
|
||||||
j += 1
|
|
||||||
continue
|
|
||||||
break
|
|
||||||
|
|
||||||
if cont_parts:
|
|
||||||
value = " ".join([value] + cont_parts).strip()
|
|
||||||
|
|
||||||
var = self.make_var_name(role_prefix, (key,))
|
|
||||||
v = value
|
|
||||||
quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"}
|
|
||||||
if quoted:
|
|
||||||
replacement = (
|
|
||||||
f'{before_eq}={leading_ws}"{{{{ {var} }}}}"{comment_part}{newline}'
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
replacement = (
|
|
||||||
f"{before_eq}={leading_ws}{{{{ {var} }}}}{comment_part}{newline}"
|
|
||||||
)
|
|
||||||
|
|
||||||
out_lines.append(replacement)
|
|
||||||
i = j # skip continuation lines (if any)
|
|
||||||
|
|
||||||
return "".join(out_lines)
|
|
||||||
|
|
@ -1,177 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from . import BaseHandler
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class SystemdLine:
|
|
||||||
kind: str # 'blank' | 'comment' | 'section' | 'kv' | 'raw'
|
|
||||||
raw: str
|
|
||||||
lineno: int
|
|
||||||
section: str | None = None
|
|
||||||
key: str | None = None
|
|
||||||
value: str | None = None
|
|
||||||
comment: str = ""
|
|
||||||
before_eq: str = ""
|
|
||||||
leading_ws_after_eq: str = ""
|
|
||||||
occ_index: int | None = None
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class SystemdUnit:
|
|
||||||
lines: list[SystemdLine]
|
|
||||||
|
|
||||||
|
|
||||||
class SystemdUnitHandler(BaseHandler):
|
|
||||||
"""
|
|
||||||
Handler for systemd unit files.
|
|
||||||
|
|
||||||
unit files are INI-like, but keys may repeat (e.g. multiple ExecStart= lines).
|
|
||||||
We preserve repeated keys by indexing them when flattening and templating.
|
|
||||||
"""
|
|
||||||
|
|
||||||
fmt = "systemd"
|
|
||||||
|
|
||||||
def parse(self, path: Path) -> SystemdUnit:
|
|
||||||
text = path.read_text(encoding="utf-8")
|
|
||||||
return self._parse_text(text)
|
|
||||||
|
|
||||||
def _parse_text(self, text: str) -> SystemdUnit:
|
|
||||||
lines = text.splitlines(keepends=True)
|
|
||||||
out: list[SystemdLine] = []
|
|
||||||
current_section: str | None = None
|
|
||||||
# counts per section+key to assign occ_index
|
|
||||||
occ: dict[tuple[str, str], int] = {}
|
|
||||||
|
|
||||||
for lineno, raw_line in enumerate(lines, start=1):
|
|
||||||
content = raw_line.rstrip("\n")
|
|
||||||
stripped = content.strip()
|
|
||||||
|
|
||||||
if not stripped:
|
|
||||||
out.append(SystemdLine(kind="blank", raw=raw_line, lineno=lineno))
|
|
||||||
continue
|
|
||||||
|
|
||||||
if stripped.startswith(("#", ";")):
|
|
||||||
out.append(SystemdLine(kind="comment", raw=raw_line, lineno=lineno))
|
|
||||||
continue
|
|
||||||
|
|
||||||
# section header
|
|
||||||
if (
|
|
||||||
stripped.startswith("[")
|
|
||||||
and stripped.endswith("]")
|
|
||||||
and len(stripped) >= 2
|
|
||||||
):
|
|
||||||
sec = stripped[1:-1].strip()
|
|
||||||
current_section = sec
|
|
||||||
out.append(
|
|
||||||
SystemdLine(
|
|
||||||
kind="section", raw=raw_line, lineno=lineno, section=sec
|
|
||||||
)
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if "=" not in content:
|
|
||||||
out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno))
|
|
||||||
continue
|
|
||||||
|
|
||||||
eq_index = content.find("=")
|
|
||||||
before_eq = content[:eq_index]
|
|
||||||
after_eq = content[eq_index + 1 :]
|
|
||||||
|
|
||||||
key = before_eq.strip()
|
|
||||||
if not key:
|
|
||||||
out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno))
|
|
||||||
continue
|
|
||||||
|
|
||||||
# whitespace after '='
|
|
||||||
value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t"))
|
|
||||||
leading_ws = after_eq[:value_ws_len]
|
|
||||||
value_and_comment = after_eq[value_ws_len:]
|
|
||||||
|
|
||||||
value_part, comment = self._split_inline_comment(
|
|
||||||
value_and_comment, {"#", ";"}
|
|
||||||
)
|
|
||||||
value = value_part.strip()
|
|
||||||
|
|
||||||
sec = current_section or "DEFAULT"
|
|
||||||
k = (sec, key)
|
|
||||||
idx = occ.get(k, 0)
|
|
||||||
occ[k] = idx + 1
|
|
||||||
|
|
||||||
out.append(
|
|
||||||
SystemdLine(
|
|
||||||
kind="kv",
|
|
||||||
raw=raw_line,
|
|
||||||
lineno=lineno,
|
|
||||||
section=sec,
|
|
||||||
key=key,
|
|
||||||
value=value,
|
|
||||||
comment=comment,
|
|
||||||
before_eq=before_eq,
|
|
||||||
leading_ws_after_eq=leading_ws,
|
|
||||||
occ_index=idx,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
return SystemdUnit(lines=out)
|
|
||||||
|
|
||||||
def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
|
|
||||||
if not isinstance(parsed, SystemdUnit):
|
|
||||||
raise TypeError("systemd parse result must be a SystemdUnit")
|
|
||||||
|
|
||||||
# determine duplicates per (section,key)
|
|
||||||
counts: dict[tuple[str, str], int] = {}
|
|
||||||
for ln in parsed.lines:
|
|
||||||
if ln.kind == "kv" and ln.section and ln.key:
|
|
||||||
counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1
|
|
||||||
|
|
||||||
items: list[tuple[tuple[str, ...], Any]] = []
|
|
||||||
for ln in parsed.lines:
|
|
||||||
if ln.kind != "kv" or not ln.section or not ln.key:
|
|
||||||
continue
|
|
||||||
path: tuple[str, ...] = (ln.section, ln.key)
|
|
||||||
if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None:
|
|
||||||
path = path + (str(ln.occ_index),)
|
|
||||||
items.append((path, ln.value or ""))
|
|
||||||
return items
|
|
||||||
|
|
||||||
def generate_jinja2_template(
|
|
||||||
self,
|
|
||||||
parsed: Any,
|
|
||||||
role_prefix: str,
|
|
||||||
original_text: str | None = None,
|
|
||||||
) -> str:
|
|
||||||
if not isinstance(parsed, SystemdUnit):
|
|
||||||
raise TypeError("systemd parse result must be a SystemdUnit")
|
|
||||||
# We template using parsed lines so we preserve original formatting/comments.
|
|
||||||
counts: dict[tuple[str, str], int] = {}
|
|
||||||
for ln in parsed.lines:
|
|
||||||
if ln.kind == "kv" and ln.section and ln.key:
|
|
||||||
counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1
|
|
||||||
|
|
||||||
out_lines: list[str] = []
|
|
||||||
for ln in parsed.lines:
|
|
||||||
if ln.kind != "kv" or not ln.section or not ln.key:
|
|
||||||
out_lines.append(ln.raw)
|
|
||||||
continue
|
|
||||||
|
|
||||||
path: tuple[str, ...] = (ln.section, ln.key)
|
|
||||||
if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None:
|
|
||||||
path = path + (str(ln.occ_index),)
|
|
||||||
var = self.make_var_name(role_prefix, path)
|
|
||||||
|
|
||||||
v = (ln.value or "").strip()
|
|
||||||
quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"}
|
|
||||||
if quoted:
|
|
||||||
repl = f'{ln.before_eq}={ln.leading_ws_after_eq}"{{{{ {var} }}}}"{ln.comment}'
|
|
||||||
else:
|
|
||||||
repl = f"{ln.before_eq}={ln.leading_ws_after_eq}{{{{ {var} }}}}{ln.comment}"
|
|
||||||
|
|
||||||
newline = "\n" if ln.raw.endswith("\n") else ""
|
|
||||||
out_lines.append(repl + newline)
|
|
||||||
|
|
||||||
return "".join(out_lines)
|
|
||||||
|
|
@ -1,771 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
"""Directory / multi-file processing.
|
|
||||||
|
|
||||||
Folder mode:
|
|
||||||
* discover supported config files under a directory (optionally recursively)
|
|
||||||
* group them by detected format
|
|
||||||
* generate one *union* Jinja2 template per format
|
|
||||||
* generate a single defaults YAML containing a list of per-file values
|
|
||||||
|
|
||||||
The union templates use `{% if ... is defined %}` blocks for paths that are
|
|
||||||
missing in some input files ("option B"), so missing keys/sections/elements are
|
|
||||||
omitted rather than rendered as empty values.
|
|
||||||
|
|
||||||
Notes:
|
|
||||||
* If the folder contains *multiple* formats, we generate one template per
|
|
||||||
format (e.g. config.yaml.j2, config.xml.j2) and emit one list variable per
|
|
||||||
format in the defaults YAML.
|
|
||||||
* JSON union templates are emitted using a simple `{{ data | tojson }}`
|
|
||||||
approach to avoid comma-management complexity for optional keys.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from collections import Counter, defaultdict
|
|
||||||
from copy import deepcopy
|
|
||||||
import configparser
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any, Iterable
|
|
||||||
import xml.etree.ElementTree as ET # nosec
|
|
||||||
|
|
||||||
from .core import dump_yaml, flatten_config, make_var_name, parse_config
|
|
||||||
from .handlers.xml import XmlHandler
|
|
||||||
|
|
||||||
|
|
||||||
SUPPORTED_SUFFIXES: dict[str, set[str]] = {
|
|
||||||
"toml": {".toml"},
|
|
||||||
"yaml": {".yaml", ".yml"},
|
|
||||||
"json": {".json"},
|
|
||||||
"ini": {".ini", ".cfg", ".conf", ".repo"},
|
|
||||||
"xml": {".xml"},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def is_supported_file(path: Path) -> bool:
|
|
||||||
if not path.is_file():
|
|
||||||
return False
|
|
||||||
suffix = path.suffix.lower()
|
|
||||||
for exts in SUPPORTED_SUFFIXES.values():
|
|
||||||
if suffix in exts:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def iter_supported_files(root: Path, recursive: bool) -> list[Path]:
|
|
||||||
if not root.exists():
|
|
||||||
raise FileNotFoundError(str(root))
|
|
||||||
if root.is_file():
|
|
||||||
return [root] if is_supported_file(root) else []
|
|
||||||
if not root.is_dir():
|
|
||||||
return []
|
|
||||||
|
|
||||||
it = root.rglob("*") if recursive else root.glob("*")
|
|
||||||
files = [p for p in it if is_supported_file(p)]
|
|
||||||
files.sort()
|
|
||||||
return files
|
|
||||||
|
|
||||||
|
|
||||||
def defined_var_name(role_prefix: str, path: Iterable[str]) -> str:
|
|
||||||
"""Presence marker var for a container path."""
|
|
||||||
return make_var_name(role_prefix, ("defined",) + tuple(path))
|
|
||||||
|
|
||||||
|
|
||||||
def _is_scalar(obj: Any) -> bool:
|
|
||||||
return not isinstance(obj, (dict, list))
|
|
||||||
|
|
||||||
|
|
||||||
def _merge_union(a: Any, b: Any) -> Any:
|
|
||||||
"""Merge two parsed objects into a union structure.
|
|
||||||
|
|
||||||
- dicts: union keys, recursive
|
|
||||||
- lists: max length, merge by index
|
|
||||||
- scalars: keep the first (as a representative sample)
|
|
||||||
"""
|
|
||||||
if isinstance(a, dict) and isinstance(b, dict):
|
|
||||||
out: dict[str, Any] = {}
|
|
||||||
for k in b.keys():
|
|
||||||
if k not in out and k not in a:
|
|
||||||
# handled later
|
|
||||||
pass
|
|
||||||
# preserve insertion order roughly: keys from a, then new keys from b
|
|
||||||
for k in a.keys():
|
|
||||||
out[k] = _merge_union(a.get(k), b.get(k)) if k in b else a.get(k)
|
|
||||||
for k in b.keys():
|
|
||||||
if k not in out:
|
|
||||||
out[k] = b.get(k)
|
|
||||||
return out
|
|
||||||
if isinstance(a, list) and isinstance(b, list):
|
|
||||||
n = max(len(a), len(b))
|
|
||||||
out_list: list[Any] = []
|
|
||||||
for i in range(n):
|
|
||||||
if i < len(a) and i < len(b):
|
|
||||||
out_list.append(_merge_union(a[i], b[i]))
|
|
||||||
elif i < len(a):
|
|
||||||
out_list.append(a[i])
|
|
||||||
else:
|
|
||||||
out_list.append(b[i])
|
|
||||||
return out_list
|
|
||||||
# different types or scalar
|
|
||||||
return a if a is not None else b
|
|
||||||
|
|
||||||
|
|
||||||
def _collect_dict_like_paths(
|
|
||||||
obj: Any,
|
|
||||||
) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]:
|
|
||||||
"""Return (container_paths, leaf_paths) for dict/list structures."""
|
|
||||||
containers: set[tuple[str, ...]] = set()
|
|
||||||
leaves: set[tuple[str, ...]] = set()
|
|
||||||
|
|
||||||
def walk(o: Any, path: tuple[str, ...]) -> None:
|
|
||||||
if isinstance(o, dict):
|
|
||||||
for k, v in o.items():
|
|
||||||
kp = path + (str(k),)
|
|
||||||
containers.add(kp)
|
|
||||||
walk(v, kp)
|
|
||||||
return
|
|
||||||
if isinstance(o, list):
|
|
||||||
for i, v in enumerate(o):
|
|
||||||
ip = path + (str(i),)
|
|
||||||
containers.add(ip)
|
|
||||||
walk(v, ip)
|
|
||||||
return
|
|
||||||
leaves.add(path)
|
|
||||||
|
|
||||||
walk(obj, ())
|
|
||||||
return containers, leaves
|
|
||||||
|
|
||||||
|
|
||||||
def _yaml_scalar_placeholder(
|
|
||||||
role_prefix: str, path: tuple[str, ...], sample: Any
|
|
||||||
) -> str:
|
|
||||||
var = make_var_name(role_prefix, path)
|
|
||||||
if isinstance(sample, str):
|
|
||||||
return f'"{{{{ {var} }}}}"'
|
|
||||||
return f"{{{{ {var} }}}}"
|
|
||||||
|
|
||||||
|
|
||||||
def _yaml_render_union(
|
|
||||||
role_prefix: str,
|
|
||||||
union_obj: Any,
|
|
||||||
optional_containers: set[tuple[str, ...]],
|
|
||||||
indent: int = 0,
|
|
||||||
path: tuple[str, ...] = (),
|
|
||||||
in_list: bool = False,
|
|
||||||
) -> list[str]:
|
|
||||||
"""Render YAML for union_obj with conditionals for optional containers."""
|
|
||||||
lines: list[str] = []
|
|
||||||
ind = " " * indent
|
|
||||||
|
|
||||||
if isinstance(union_obj, dict):
|
|
||||||
for key, val in union_obj.items():
|
|
||||||
key_path = path + (str(key),)
|
|
||||||
cond_var = (
|
|
||||||
defined_var_name(role_prefix, key_path)
|
|
||||||
if key_path in optional_containers
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
|
|
||||||
if _is_scalar(val) or val is None:
|
|
||||||
value = _yaml_scalar_placeholder(role_prefix, key_path, val)
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% if {cond_var} is defined %}}")
|
|
||||||
lines.append(f"{ind}{key}: {value}")
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% endif %}}")
|
|
||||||
else:
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% if {cond_var} is defined %}}")
|
|
||||||
lines.append(f"{ind}{key}:")
|
|
||||||
lines.extend(
|
|
||||||
_yaml_render_union(
|
|
||||||
role_prefix,
|
|
||||||
val,
|
|
||||||
optional_containers,
|
|
||||||
indent=indent + 2,
|
|
||||||
path=key_path,
|
|
||||||
in_list=False,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% endif %}}")
|
|
||||||
return lines
|
|
||||||
|
|
||||||
if isinstance(union_obj, list):
|
|
||||||
for i, item in enumerate(union_obj):
|
|
||||||
item_path = path + (str(i),)
|
|
||||||
cond_var = (
|
|
||||||
defined_var_name(role_prefix, item_path)
|
|
||||||
if item_path in optional_containers
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
|
|
||||||
if _is_scalar(item) or item is None:
|
|
||||||
value = _yaml_scalar_placeholder(role_prefix, item_path, item)
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% if {cond_var} is defined %}}")
|
|
||||||
lines.append(f"{ind}- {value}")
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% endif %}}")
|
|
||||||
elif isinstance(item, dict):
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% if {cond_var} is defined %}}")
|
|
||||||
# First line: list marker with first key if possible
|
|
||||||
first = True
|
|
||||||
for k, v in item.items():
|
|
||||||
kp = item_path + (str(k),)
|
|
||||||
k_cond = (
|
|
||||||
defined_var_name(role_prefix, kp)
|
|
||||||
if kp in optional_containers
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
if _is_scalar(v) or v is None:
|
|
||||||
value = _yaml_scalar_placeholder(role_prefix, kp, v)
|
|
||||||
if first:
|
|
||||||
if k_cond:
|
|
||||||
lines.append(f"{ind}{{% if {k_cond} is defined %}}")
|
|
||||||
lines.append(f"{ind}- {k}: {value}")
|
|
||||||
if k_cond:
|
|
||||||
lines.append(f"{ind}{{% endif %}}")
|
|
||||||
first = False
|
|
||||||
else:
|
|
||||||
if k_cond:
|
|
||||||
lines.append(f"{ind} {{% if {k_cond} is defined %}}")
|
|
||||||
lines.append(f"{ind} {k}: {value}")
|
|
||||||
if k_cond:
|
|
||||||
lines.append(f"{ind} {{% endif %}}")
|
|
||||||
else:
|
|
||||||
# nested
|
|
||||||
if first:
|
|
||||||
if k_cond:
|
|
||||||
lines.append(f"{ind}{{% if {k_cond} is defined %}}")
|
|
||||||
lines.append(f"{ind}- {k}:")
|
|
||||||
lines.extend(
|
|
||||||
_yaml_render_union(
|
|
||||||
role_prefix,
|
|
||||||
v,
|
|
||||||
optional_containers,
|
|
||||||
indent=indent + 4,
|
|
||||||
path=kp,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if k_cond:
|
|
||||||
lines.append(f"{ind}{{% endif %}}")
|
|
||||||
first = False
|
|
||||||
else:
|
|
||||||
if k_cond:
|
|
||||||
lines.append(f"{ind} {{% if {k_cond} is defined %}}")
|
|
||||||
lines.append(f"{ind} {k}:")
|
|
||||||
lines.extend(
|
|
||||||
_yaml_render_union(
|
|
||||||
role_prefix,
|
|
||||||
v,
|
|
||||||
optional_containers,
|
|
||||||
indent=indent + 4,
|
|
||||||
path=kp,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if k_cond:
|
|
||||||
lines.append(f"{ind} {{% endif %}}")
|
|
||||||
if first:
|
|
||||||
# empty dict item
|
|
||||||
lines.append(f"{ind}- {{}}")
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% endif %}}")
|
|
||||||
else:
|
|
||||||
# list of lists - emit as scalar-ish fallback
|
|
||||||
value = f"{{{{ {make_var_name(role_prefix, item_path)} }}}}"
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% if {cond_var} is defined %}}")
|
|
||||||
lines.append(f"{ind}- {value}")
|
|
||||||
if cond_var:
|
|
||||||
lines.append(f"{ind}{{% endif %}}")
|
|
||||||
return lines
|
|
||||||
|
|
||||||
# scalar at root
|
|
||||||
value = _yaml_scalar_placeholder(role_prefix, path, union_obj)
|
|
||||||
if in_list:
|
|
||||||
lines.append(f"{ind}- {value}")
|
|
||||||
else:
|
|
||||||
lines.append(f"{ind}{value}")
|
|
||||||
return lines
|
|
||||||
|
|
||||||
|
|
||||||
def _toml_render_union(
|
|
||||||
role_prefix: str,
|
|
||||||
union_obj: dict[str, Any],
|
|
||||||
optional_containers: set[tuple[str, ...]],
|
|
||||||
) -> str:
|
|
||||||
"""Render TOML union template with optional tables/keys."""
|
|
||||||
lines: list[str] = []
|
|
||||||
|
|
||||||
def emit_kv(path: tuple[str, ...], key: str, value: Any) -> None:
|
|
||||||
var_name = make_var_name(role_prefix, path + (key,))
|
|
||||||
cond = (
|
|
||||||
defined_var_name(role_prefix, path + (key,))
|
|
||||||
if (path + (key,)) in optional_containers
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
if cond:
|
|
||||||
lines.append(f"{{% if {cond} is defined %}}")
|
|
||||||
if isinstance(value, str):
|
|
||||||
lines.append(f'{key} = "{{{{ {var_name} }}}}"')
|
|
||||||
elif isinstance(value, bool):
|
|
||||||
lines.append(f"{key} = {{{{ {var_name} | lower }}}}")
|
|
||||||
else:
|
|
||||||
lines.append(f"{key} = {{{{ {var_name} }}}}")
|
|
||||||
if cond:
|
|
||||||
lines.append("{% endif %}")
|
|
||||||
|
|
||||||
def walk(obj: dict[str, Any], path: tuple[str, ...]) -> None:
|
|
||||||
if path:
|
|
||||||
cond = (
|
|
||||||
defined_var_name(role_prefix, path)
|
|
||||||
if path in optional_containers
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
if cond:
|
|
||||||
lines.append(f"{{% if {cond} is defined %}}")
|
|
||||||
lines.append(f"[{'.'.join(path)}]")
|
|
||||||
|
|
||||||
scalar_items = {k: v for k, v in obj.items() if not isinstance(v, dict)}
|
|
||||||
nested_items = {k: v for k, v in obj.items() if isinstance(v, dict)}
|
|
||||||
|
|
||||||
for k, v in scalar_items.items():
|
|
||||||
emit_kv(path, str(k), v)
|
|
||||||
|
|
||||||
if scalar_items:
|
|
||||||
lines.append("")
|
|
||||||
|
|
||||||
for k, v in nested_items.items():
|
|
||||||
walk(v, path + (str(k),))
|
|
||||||
|
|
||||||
if path and (path in optional_containers):
|
|
||||||
lines.append("{% endif %}")
|
|
||||||
lines.append("")
|
|
||||||
|
|
||||||
# root scalars
|
|
||||||
root_scalars = {k: v for k, v in union_obj.items() if not isinstance(v, dict)}
|
|
||||||
for k, v in root_scalars.items():
|
|
||||||
emit_kv((), str(k), v)
|
|
||||||
if root_scalars:
|
|
||||||
lines.append("")
|
|
||||||
for k, v in union_obj.items():
|
|
||||||
if isinstance(v, dict):
|
|
||||||
walk(v, (str(k),))
|
|
||||||
|
|
||||||
return "\n".join(lines).rstrip() + "\n"
|
|
||||||
|
|
||||||
|
|
||||||
def _ini_union_and_presence(
|
|
||||||
parsers: list[configparser.ConfigParser],
|
|
||||||
) -> tuple[configparser.ConfigParser, set[str], set[tuple[str, str]]]:
|
|
||||||
"""Build a union ConfigParser and compute optional sections/keys."""
|
|
||||||
union = configparser.ConfigParser()
|
|
||||||
union.optionxform = str # noqa
|
|
||||||
|
|
||||||
section_sets: list[set[str]] = []
|
|
||||||
key_sets: list[set[tuple[str, str]]] = []
|
|
||||||
|
|
||||||
for p in parsers:
|
|
||||||
sections = set(p.sections())
|
|
||||||
section_sets.append(sections)
|
|
||||||
keys: set[tuple[str, str]] = set()
|
|
||||||
for s in p.sections():
|
|
||||||
for k, _ in p.items(s, raw=True):
|
|
||||||
keys.add((s, k))
|
|
||||||
key_sets.append(keys)
|
|
||||||
|
|
||||||
for s in p.sections():
|
|
||||||
if not union.has_section(s):
|
|
||||||
union.add_section(s)
|
|
||||||
for k, v in p.items(s, raw=True):
|
|
||||||
if not union.has_option(s, k):
|
|
||||||
union.set(s, k, v)
|
|
||||||
|
|
||||||
if not section_sets:
|
|
||||||
return union, set(), set()
|
|
||||||
|
|
||||||
sec_union = set().union(*section_sets)
|
|
||||||
sec_inter = set.intersection(*section_sets)
|
|
||||||
optional_sections = sec_union - sec_inter
|
|
||||||
|
|
||||||
key_union = set().union(*key_sets)
|
|
||||||
key_inter = set.intersection(*key_sets)
|
|
||||||
optional_keys = key_union - key_inter
|
|
||||||
|
|
||||||
return union, optional_sections, optional_keys
|
|
||||||
|
|
||||||
|
|
||||||
def _ini_render_union(
|
|
||||||
role_prefix: str,
|
|
||||||
union: configparser.ConfigParser,
|
|
||||||
optional_sections: set[str],
|
|
||||||
optional_keys: set[tuple[str, str]],
|
|
||||||
) -> str:
|
|
||||||
lines: list[str] = []
|
|
||||||
for section in union.sections():
|
|
||||||
sec_cond = (
|
|
||||||
defined_var_name(role_prefix, (section,))
|
|
||||||
if section in optional_sections
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
if sec_cond:
|
|
||||||
lines.append(f"{{% if {sec_cond} is defined %}}")
|
|
||||||
lines.append(f"[{section}]")
|
|
||||||
for key, raw_val in union.items(section, raw=True):
|
|
||||||
path = (section, key)
|
|
||||||
var = make_var_name(role_prefix, path)
|
|
||||||
key_cond = (
|
|
||||||
defined_var_name(role_prefix, path) if path in optional_keys else None
|
|
||||||
)
|
|
||||||
v = (raw_val or "").strip()
|
|
||||||
quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"}
|
|
||||||
if key_cond:
|
|
||||||
lines.append(f"{{% if {key_cond} is defined %}}")
|
|
||||||
if quoted:
|
|
||||||
lines.append(f'{key} = "{{{{ {var} }}}}"')
|
|
||||||
else:
|
|
||||||
lines.append(f"{key} = {{{{ {var} }}}}")
|
|
||||||
if key_cond:
|
|
||||||
lines.append("{% endif %}")
|
|
||||||
lines.append("")
|
|
||||||
if sec_cond:
|
|
||||||
lines.append("{% endif %}")
|
|
||||||
lines.append("")
|
|
||||||
return "\n".join(lines).rstrip() + "\n"
|
|
||||||
|
|
||||||
|
|
||||||
def _xml_collect_paths(
|
|
||||||
root: ET.Element,
|
|
||||||
) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]:
|
|
||||||
"""Return (element_paths, leaf_paths) based on XmlHandler's flatten rules."""
|
|
||||||
element_paths: set[tuple[str, ...]] = set()
|
|
||||||
leaf_paths: set[tuple[str, ...]] = set()
|
|
||||||
|
|
||||||
def walk(elem: ET.Element, path: tuple[str, ...]) -> None:
|
|
||||||
element_paths.add(path)
|
|
||||||
|
|
||||||
for attr in elem.attrib:
|
|
||||||
leaf_paths.add(path + (f"@{attr}",))
|
|
||||||
|
|
||||||
children = [c for c in list(elem) if isinstance(c.tag, str)]
|
|
||||||
text = (elem.text or "").strip()
|
|
||||||
if text:
|
|
||||||
if not elem.attrib and not children:
|
|
||||||
leaf_paths.add(path)
|
|
||||||
else:
|
|
||||||
leaf_paths.add(path + ("value",))
|
|
||||||
|
|
||||||
counts = Counter(child.tag for child in children)
|
|
||||||
index_counters: dict[str, int] = defaultdict(int)
|
|
||||||
for child in children:
|
|
||||||
tag = child.tag
|
|
||||||
if counts[tag] > 1:
|
|
||||||
idx = index_counters[tag]
|
|
||||||
index_counters[tag] += 1
|
|
||||||
child_path = path + (tag, str(idx))
|
|
||||||
else:
|
|
||||||
child_path = path + (tag,)
|
|
||||||
walk(child, child_path)
|
|
||||||
|
|
||||||
walk(root, ())
|
|
||||||
return element_paths, leaf_paths
|
|
||||||
|
|
||||||
|
|
||||||
def _xml_merge_union(base: ET.Element, other: ET.Element) -> None:
|
|
||||||
"""Merge other into base in-place."""
|
|
||||||
# attributes
|
|
||||||
for k, v in other.attrib.items():
|
|
||||||
if k not in base.attrib:
|
|
||||||
base.set(k, v)
|
|
||||||
|
|
||||||
# text
|
|
||||||
if (base.text is None or not base.text.strip()) and (
|
|
||||||
other.text and other.text.strip()
|
|
||||||
):
|
|
||||||
base.text = other.text
|
|
||||||
|
|
||||||
# children
|
|
||||||
base_children = [c for c in list(base) if isinstance(c.tag, str)]
|
|
||||||
other_children = [c for c in list(other) if isinstance(c.tag, str)]
|
|
||||||
|
|
||||||
base_by_tag: dict[str, list[ET.Element]] = defaultdict(list)
|
|
||||||
other_by_tag: dict[str, list[ET.Element]] = defaultdict(list)
|
|
||||||
for c in base_children:
|
|
||||||
base_by_tag[c.tag].append(c)
|
|
||||||
for c in other_children:
|
|
||||||
other_by_tag[c.tag].append(c)
|
|
||||||
|
|
||||||
# preserve base ordering; append new tags at end
|
|
||||||
seen_tags = set(base_by_tag.keys())
|
|
||||||
tag_order = [c.tag for c in base_children if isinstance(c.tag, str)]
|
|
||||||
for t in other_by_tag.keys():
|
|
||||||
if t not in seen_tags:
|
|
||||||
tag_order.append(t)
|
|
||||||
|
|
||||||
# unique tags in order
|
|
||||||
ordered_tags: list[str] = []
|
|
||||||
for t in tag_order:
|
|
||||||
if t not in ordered_tags:
|
|
||||||
ordered_tags.append(t)
|
|
||||||
|
|
||||||
for tag in ordered_tags:
|
|
||||||
b_list = base_by_tag.get(tag, [])
|
|
||||||
o_list = other_by_tag.get(tag, [])
|
|
||||||
n = max(len(b_list), len(o_list))
|
|
||||||
for i in range(n):
|
|
||||||
if i < len(b_list) and i < len(o_list):
|
|
||||||
_xml_merge_union(b_list[i], o_list[i])
|
|
||||||
elif i < len(o_list):
|
|
||||||
base.append(deepcopy(o_list[i]))
|
|
||||||
|
|
||||||
|
|
||||||
def _xml_apply_jinja_union(
|
|
||||||
role_prefix: str,
|
|
||||||
root: ET.Element,
|
|
||||||
optional_elements: set[tuple[str, ...]],
|
|
||||||
) -> str:
|
|
||||||
"""Generate XML template with optional element conditionals."""
|
|
||||||
handler = XmlHandler()
|
|
||||||
|
|
||||||
def wrap_optional_children(elem: ET.Element, path: tuple[str, ...]) -> None:
|
|
||||||
children = [c for c in list(elem) if isinstance(c.tag, str)]
|
|
||||||
if not children:
|
|
||||||
return
|
|
||||||
|
|
||||||
# compute indexed paths the same way as flatten
|
|
||||||
counts = Counter(child.tag for child in children)
|
|
||||||
index_counters: dict[str, int] = defaultdict(int)
|
|
||||||
new_children: list[ET.Element] = []
|
|
||||||
for child in children:
|
|
||||||
tag = child.tag
|
|
||||||
if counts[tag] > 1:
|
|
||||||
idx = index_counters[tag]
|
|
||||||
index_counters[tag] += 1
|
|
||||||
child_path = path + (tag, str(idx))
|
|
||||||
else:
|
|
||||||
child_path = path + (tag,)
|
|
||||||
|
|
||||||
if child_path in optional_elements:
|
|
||||||
cond = defined_var_name(role_prefix, child_path)
|
|
||||||
new_children.append(ET.Comment(f"IF:{cond}"))
|
|
||||||
new_children.append(child)
|
|
||||||
new_children.append(ET.Comment(f"ENDIF:{cond}"))
|
|
||||||
else:
|
|
||||||
new_children.append(child)
|
|
||||||
|
|
||||||
wrap_optional_children(child, child_path)
|
|
||||||
|
|
||||||
# replace
|
|
||||||
for c in children:
|
|
||||||
elem.remove(c)
|
|
||||||
for c in new_children:
|
|
||||||
elem.append(c)
|
|
||||||
|
|
||||||
# Wrap optionals before applying scalar substitution so markers stay
|
|
||||||
wrap_optional_children(root, ())
|
|
||||||
handler._apply_jinja_to_xml_tree(role_prefix, root, loop_candidates=None) # type: ignore[attr-defined]
|
|
||||||
|
|
||||||
indent = getattr(ET, "indent", None)
|
|
||||||
if indent is not None:
|
|
||||||
indent(root, space=" ") # type: ignore[arg-type]
|
|
||||||
|
|
||||||
xml_body = ET.tostring(root, encoding="unicode")
|
|
||||||
# Reuse handler's conditional-marker replacement
|
|
||||||
xml_body = handler._insert_xml_loops(xml_body, role_prefix, [], root) # type: ignore[attr-defined]
|
|
||||||
return xml_body
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class FormatOutput:
|
|
||||||
fmt: str
|
|
||||||
template: str
|
|
||||||
list_var: str
|
|
||||||
items: list[dict[str, Any]]
|
|
||||||
|
|
||||||
|
|
||||||
FOLDER_SUPPORTED_FORMATS: set[str] = {"json", "yaml", "toml", "ini", "xml"}
|
|
||||||
|
|
||||||
|
|
||||||
def process_directory(
|
|
||||||
root: Path, recursive: bool, role_prefix: str
|
|
||||||
) -> tuple[str, list[FormatOutput]]:
|
|
||||||
"""Process a directory (or single file) into defaults YAML + template(s)."""
|
|
||||||
files = iter_supported_files(root, recursive)
|
|
||||||
if not files:
|
|
||||||
raise ValueError(f"No supported config files found under: {root}")
|
|
||||||
|
|
||||||
# Parse and group by format
|
|
||||||
grouped: dict[str, list[tuple[Path, Any]]] = defaultdict(list)
|
|
||||||
for p in files:
|
|
||||||
fmt, parsed = parse_config(p, None)
|
|
||||||
if fmt not in FOLDER_SUPPORTED_FORMATS:
|
|
||||||
# Directory mode only supports a subset of formats for now.
|
|
||||||
continue
|
|
||||||
grouped[fmt].append((p, parsed))
|
|
||||||
|
|
||||||
if not grouped:
|
|
||||||
raise ValueError(f"No folder-supported config files found under: {root}")
|
|
||||||
|
|
||||||
multiple_formats = len(grouped) > 1
|
|
||||||
outputs: list[FormatOutput] = []
|
|
||||||
|
|
||||||
for fmt, entries in sorted(grouped.items()):
|
|
||||||
rel_ids = [
|
|
||||||
e[0].relative_to(root).as_posix() if root.is_dir() else e[0].name
|
|
||||||
for e in entries
|
|
||||||
]
|
|
||||||
parsed_list = [e[1] for e in entries]
|
|
||||||
|
|
||||||
# JSON: simplest robust union template
|
|
||||||
if fmt == "json":
|
|
||||||
list_var = (
|
|
||||||
f"{role_prefix}_{fmt}_items"
|
|
||||||
if multiple_formats
|
|
||||||
else f"{role_prefix}_items"
|
|
||||||
)
|
|
||||||
template = "{{ data | tojson(indent=2) }}\n"
|
|
||||||
items: list[dict[str, Any]] = []
|
|
||||||
for rid, parsed in zip(rel_ids, parsed_list):
|
|
||||||
items.append({"id": rid, "data": parsed})
|
|
||||||
outputs.append(
|
|
||||||
FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items)
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Dict-like formats (YAML/TOML) use union merge on parsed objects
|
|
||||||
if fmt in {"yaml", "toml"}:
|
|
||||||
union_obj: Any = deepcopy(parsed_list[0])
|
|
||||||
for p in parsed_list[1:]:
|
|
||||||
union_obj = _merge_union(union_obj, p)
|
|
||||||
|
|
||||||
container_sets: list[set[tuple[str, ...]]] = []
|
|
||||||
leaf_sets: list[set[tuple[str, ...]]] = []
|
|
||||||
for p in parsed_list:
|
|
||||||
containers, leaves = _collect_dict_like_paths(p)
|
|
||||||
container_sets.append(containers)
|
|
||||||
leaf_sets.append(leaves)
|
|
||||||
|
|
||||||
cont_union = set().union(*container_sets)
|
|
||||||
cont_inter = set.intersection(*container_sets) if container_sets else set()
|
|
||||||
optional_containers = cont_union - cont_inter
|
|
||||||
|
|
||||||
list_var = (
|
|
||||||
f"{role_prefix}_{fmt}_items"
|
|
||||||
if multiple_formats
|
|
||||||
else f"{role_prefix}_items"
|
|
||||||
)
|
|
||||||
|
|
||||||
if fmt == "yaml":
|
|
||||||
template_lines = _yaml_render_union(
|
|
||||||
role_prefix, union_obj, optional_containers
|
|
||||||
)
|
|
||||||
template = "\n".join(template_lines).rstrip() + "\n"
|
|
||||||
else:
|
|
||||||
if not isinstance(union_obj, dict):
|
|
||||||
raise TypeError("TOML union must be a dict")
|
|
||||||
template = _toml_render_union(
|
|
||||||
role_prefix, union_obj, optional_containers
|
|
||||||
)
|
|
||||||
|
|
||||||
# Build per-file item dicts (leaf vars + presence markers)
|
|
||||||
items: list[dict[str, Any]] = []
|
|
||||||
for rid, parsed, containers in zip(rel_ids, parsed_list, container_sets):
|
|
||||||
item: dict[str, Any] = {"id": rid}
|
|
||||||
flat = flatten_config(fmt, parsed, loop_candidates=None)
|
|
||||||
for path, value in flat:
|
|
||||||
item[make_var_name(role_prefix, path)] = value
|
|
||||||
for cpath in optional_containers:
|
|
||||||
if cpath in containers:
|
|
||||||
item[defined_var_name(role_prefix, cpath)] = True
|
|
||||||
items.append(item)
|
|
||||||
|
|
||||||
outputs.append(
|
|
||||||
FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items)
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if fmt == "ini":
|
|
||||||
parsers = parsed_list
|
|
||||||
if not all(isinstance(p, configparser.ConfigParser) for p in parsers):
|
|
||||||
raise TypeError("INI parse must produce ConfigParser")
|
|
||||||
union, opt_sections, opt_keys = _ini_union_and_presence(parsers) # type: ignore[arg-type]
|
|
||||||
|
|
||||||
list_var = (
|
|
||||||
f"{role_prefix}_{fmt}_items"
|
|
||||||
if multiple_formats
|
|
||||||
else f"{role_prefix}_items"
|
|
||||||
)
|
|
||||||
template = _ini_render_union(role_prefix, union, opt_sections, opt_keys)
|
|
||||||
|
|
||||||
items: list[dict[str, Any]] = []
|
|
||||||
for rid, parser in zip(rel_ids, parsers): # type: ignore[arg-type]
|
|
||||||
item: dict[str, Any] = {"id": rid}
|
|
||||||
flat = flatten_config(fmt, parser, loop_candidates=None)
|
|
||||||
for path, value in flat:
|
|
||||||
item[make_var_name(role_prefix, path)] = value
|
|
||||||
# section presence
|
|
||||||
for sec in opt_sections:
|
|
||||||
if parser.has_section(sec):
|
|
||||||
item[defined_var_name(role_prefix, (sec,))] = True
|
|
||||||
# key presence
|
|
||||||
for sec, key in opt_keys:
|
|
||||||
if parser.has_option(sec, key):
|
|
||||||
item[defined_var_name(role_prefix, (sec, key))] = True
|
|
||||||
items.append(item)
|
|
||||||
|
|
||||||
outputs.append(
|
|
||||||
FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items)
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if fmt == "xml":
|
|
||||||
if not all(isinstance(p, ET.Element) for p in parsed_list):
|
|
||||||
raise TypeError("XML parse must produce Element")
|
|
||||||
union_root = deepcopy(parsed_list[0])
|
|
||||||
for p in parsed_list[1:]:
|
|
||||||
_xml_merge_union(union_root, p)
|
|
||||||
|
|
||||||
elem_sets: list[set[tuple[str, ...]]] = []
|
|
||||||
for p in parsed_list:
|
|
||||||
elem_paths, _ = _xml_collect_paths(p)
|
|
||||||
elem_sets.append(elem_paths)
|
|
||||||
|
|
||||||
elem_union = set().union(*elem_sets)
|
|
||||||
elem_inter = set.intersection(*elem_sets) if elem_sets else set()
|
|
||||||
optional_elements = (elem_union - elem_inter) - {()} # never wrap root
|
|
||||||
|
|
||||||
list_var = (
|
|
||||||
f"{role_prefix}_{fmt}_items"
|
|
||||||
if multiple_formats
|
|
||||||
else f"{role_prefix}_items"
|
|
||||||
)
|
|
||||||
template = _xml_apply_jinja_union(
|
|
||||||
role_prefix, union_root, optional_elements
|
|
||||||
)
|
|
||||||
|
|
||||||
items: list[dict[str, Any]] = []
|
|
||||||
for rid, parsed, elems in zip(rel_ids, parsed_list, elem_sets):
|
|
||||||
item: dict[str, Any] = {"id": rid}
|
|
||||||
flat = flatten_config(fmt, parsed, loop_candidates=None)
|
|
||||||
for path, value in flat:
|
|
||||||
item[make_var_name(role_prefix, path)] = value
|
|
||||||
for epath in optional_elements:
|
|
||||||
if epath in elems:
|
|
||||||
item[defined_var_name(role_prefix, epath)] = True
|
|
||||||
items.append(item)
|
|
||||||
|
|
||||||
outputs.append(
|
|
||||||
FormatOutput(fmt=fmt, template=template, list_var=list_var, items=items)
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
raise ValueError(f"Unsupported format in folder mode: {fmt}")
|
|
||||||
|
|
||||||
# Build combined defaults YAML
|
|
||||||
defaults_doc: dict[str, Any] = {}
|
|
||||||
for out in outputs:
|
|
||||||
defaults_doc[out.list_var] = out.items
|
|
||||||
defaults_yaml = dump_yaml(defaults_doc, sort_keys=True)
|
|
||||||
|
|
||||||
return defaults_yaml, outputs
|
|
||||||
|
|
@ -14,7 +14,7 @@ def test_cli_stdout_toml(capsys):
|
||||||
cfg_path = SAMPLES_DIR / "tom.toml"
|
cfg_path = SAMPLES_DIR / "tom.toml"
|
||||||
exit_code = cli._main([str(cfg_path), "-r", "jinjaturtle"])
|
exit_code = cli._main([str(cfg_path), "-r", "jinjaturtle"])
|
||||||
|
|
||||||
assert exit_code == 0
|
assert exit_code
|
||||||
|
|
||||||
captured = capsys.readouterr()
|
captured = capsys.readouterr()
|
||||||
out = captured.out
|
out = captured.out
|
||||||
|
|
@ -48,7 +48,7 @@ def test_cli_writes_output_files(tmp_path, capsys):
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
assert exit_code == 0
|
assert exit_code
|
||||||
assert defaults_path.is_file()
|
assert defaults_path.is_file()
|
||||||
assert template_path.is_file()
|
assert template_path.is_file()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,33 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import jinjaturtle.core as core
|
|
||||||
|
|
||||||
|
|
||||||
def test_postfix_main_cf_parsing_and_template(tmp_path: Path) -> None:
|
|
||||||
p = tmp_path / "main.cf"
|
|
||||||
p.write_text(
|
|
||||||
"# comment\n"
|
|
||||||
"myhostname = mail.example.com\n"
|
|
||||||
"mynetworks = 127.0.0.0/8\n"
|
|
||||||
" [::1]/128\n",
|
|
||||||
encoding="utf-8",
|
|
||||||
)
|
|
||||||
|
|
||||||
fmt, parsed = core.parse_config(p)
|
|
||||||
assert fmt == "postfix"
|
|
||||||
|
|
||||||
flat = core.flatten_config(fmt, parsed)
|
|
||||||
assert (("myhostname",), "mail.example.com") in flat
|
|
||||||
assert any(
|
|
||||||
path == ("mynetworks",) and value.startswith("127.0.0.0/8")
|
|
||||||
for path, value in flat
|
|
||||||
)
|
|
||||||
|
|
||||||
template = core.generate_jinja2_template(
|
|
||||||
fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8")
|
|
||||||
)
|
|
||||||
assert "myhostname = {{ role_myhostname }}" in template
|
|
||||||
assert "mynetworks = {{ role_mynetworks }}" in template
|
|
||||||
assert "# comment" in template
|
|
||||||
|
|
@ -1,26 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import jinjaturtle.core as core
|
|
||||||
|
|
||||||
|
|
||||||
def test_systemd_unit_repeated_keys(tmp_path: Path) -> None:
|
|
||||||
p = tmp_path / "demo.service"
|
|
||||||
p.write_text(
|
|
||||||
"[Service]\n" "ExecStart=/bin/echo one\n" "ExecStart=/bin/echo two\n",
|
|
||||||
encoding="utf-8",
|
|
||||||
)
|
|
||||||
|
|
||||||
fmt, parsed = core.parse_config(p)
|
|
||||||
assert fmt == "systemd"
|
|
||||||
|
|
||||||
flat = core.flatten_config(fmt, parsed)
|
|
||||||
assert (("Service", "ExecStart", "0"), "/bin/echo one") in flat
|
|
||||||
assert (("Service", "ExecStart", "1"), "/bin/echo two") in flat
|
|
||||||
|
|
||||||
template = core.generate_jinja2_template(
|
|
||||||
fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8")
|
|
||||||
)
|
|
||||||
assert "ExecStart={{ role_service_execstart_0 }}" in template
|
|
||||||
assert "ExecStart={{ role_service_execstart_1 }}" in template
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue