Compare commits
5 commits
| SHA1 |
|---|
| 2f77cd4d80 |
| 8f7f48dc91 |
| a5c860e463 |
| 14428ff89c |
| f92854382a |
12 changed files with 561 additions and 29 deletions
Dockerfile.rpmbuild

@@ -1,5 +1,6 @@
 # syntax=docker/dockerfile:1
-FROM fedora:42
+ARG BASE_IMAGE=fedora:42
+FROM ${BASE_IMAGE}
 
 RUN set -eux; \
     dnf -y update; \
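The new `ARG` is what `release.sh` (further down) passes per Fedora release; a one-off local build for a single release would look roughly like this (the image tag here is arbitrary):

```bash
docker build \
  -f Dockerfile.rpmbuild \
  --build-arg BASE_IMAGE=fedora:43 \
  -t jinjaturtle-rpm:43 \
  .
```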
17 README.md
@@ -74,7 +74,7 @@ sudo apt update
 sudo apt install jinjaturtle
 ```
 
-### Fedora 42
+### Fedora
 
 ```bash
 sudo rpm --import https://mig5.net/static/mig5.asc
@@ -82,7 +82,7 @@ sudo rpm --import https://mig5.net/static/mig5.asc
 sudo tee /etc/yum.repos.d/mig5.repo > /dev/null << 'EOF'
 [mig5]
 name=mig5 Repository
-baseurl=https://rpm.mig5.net/rpm/$basearch
+baseurl=https://rpm.mig5.net/$releasever/rpm/$basearch
 enabled=1
 gpgcheck=1
 repo_gpgcheck=1
@@ -127,12 +127,12 @@ jinjaturtle php.ini \
 ## Full usage info
 
 ```
-usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config
+usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml,postfix,systemd}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config
 
 Convert a config file into Ansible inventory and a Jinja2 template.
 
 positional arguments:
-  config                Path to the source configuration file (TOML or INI-style).
+  config                Path to the source configuration file.
 
 options:
   -h, --help            show this help message and exit
@@ -146,6 +146,15 @@ options:
                         Path to write the Jinja2 config template. If omitted, template is printed to stdout.
 ```
 
+## Additional supported formats
+
+JinjaTurtle can also template some common "bespoke" config formats:
+
+- **Postfix main.cf** (`main.cf`) → `--format postfix`
+- **systemd unit files** (`*.service`, `*.socket`, etc.) → `--format systemd`
+
+For ambiguous extensions like `*.conf`, JinjaTurtle uses lightweight content sniffing; you can always force a specific handler via `--format`.
+
 ## Found a bug, have a suggestion?
 
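Based on the updated usage text above, invoking one of the new handlers would look roughly like this; the role name and output paths are only illustrative:

```bash
jinjaturtle /etc/postfix/main.cf \
  -r postfix \
  -f postfix \
  -d defaults/main.yml \
  -t templates/main.cf.j2
```

`-f postfix` is optional here, since `main.cf` is one of the filenames the new detection logic recognises on its own.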
pyproject.toml

@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "jinjaturtle"
-version = "0.3.5"
+version = "0.4.0"
 description = "Convert config files into Ansible defaults and Jinja2 templates."
 authors = ["Miguel Jacq <mig@mig5.net>"]
 license = "GPL-3.0-or-later"
52 release.sh
@@ -45,30 +45,48 @@ done
 
 # RPM
 sudo apt-get -y install createrepo-c rpm
-docker build -f Dockerfile.rpmbuild -t jinjaturtle:f42 --progress=plain .
-docker run --rm -v "$PWD":/src -v "$PWD/dist/rpm":/out jinjaturtle:f42
-sudo chown -R "${USER}" "$PWD/dist"
 
-REPO_ROOT="${HOME}/git/repo_rpm"
-RPM_REPO="${REPO_ROOT}/rpm/x86_64"
 BUILD_OUTPUT="${HOME}/git/jinjaturtle/dist"
-REMOTE="letessier.mig5.net:/opt/repo_rpm"
 KEYID="00AE817C24A10C2540461A9C1D7CDE0234DB458D"
+REPO_ROOT="${HOME}/git/repo_rpm"
+REMOTE="letessier.mig5.net:/opt/repo_rpm"
 
-echo "==> Updating RPM repo..."
-mkdir -p "$RPM_REPO"
+DISTS=(
+  fedora:43
+  fedora:42
+)
 
-for file in `ls -1 "${BUILD_OUTPUT}/rpm"`; do
+for dist in ${DISTS[@]}; do
+  release=$(echo ${dist} | cut -d: -f2)
+  REPO_RELEASE_ROOT="${REPO_ROOT}/${release}"
+  RPM_REPO="${REPO_RELEASE_ROOT}/rpm/x86_64"
+  mkdir -p "$RPM_REPO"
+
+  docker build \
+    --no-cache \
+    -f Dockerfile.rpmbuild \
+    -t jinjaturtle-rpm:${release} \
+    --progress=plain \
+    --build-arg BASE_IMAGE=${dist} \
+    .
+
+  rm -rf "$PWD/dist/rpm"/*
+  mkdir -p "$PWD/dist/rpm"
+
+  docker run --rm -v "$PWD":/src -v "$PWD/dist/rpm":/out jinjaturtle-rpm:${release}
+  sudo chown -R "${USER}" "$PWD/dist"
+
+  for file in `ls -1 "${BUILD_OUTPUT}/rpm"`; do
     rpmsign --addsign "${BUILD_OUTPUT}/rpm/$file"
+  done
+
+  cp "${BUILD_OUTPUT}/rpm/"*.rpm "$RPM_REPO/"
+
+  createrepo_c "$RPM_REPO"
+
+  echo "==> Signing repomd.xml..."
+  qubes-gpg-client --local-user "$KEYID" --detach-sign --armor "$RPM_REPO/repodata/repomd.xml" > "$RPM_REPO/repodata/repomd.xml.asc"
 done
 
-cp "${BUILD_OUTPUT}/rpm/"*.rpm "$RPM_REPO/"
-
-createrepo_c "$RPM_REPO"
-
-echo "==> Signing repomd.xml..."
-qubes-gpg-client --local-user "$KEYID" --detach-sign --armor "$RPM_REPO/repodata/repomd.xml" > "$RPM_REPO/repodata/repomd.xml.asc"
-
 echo "==> Syncing repo to server..."
 rsync -aHPvz --exclude=.git --delete "$REPO_ROOT/" "$REMOTE/"
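For orientation, the per-release tree this loop builds under `REPO_ROOT` is what the README's new `$releasever` baseurl component points at; roughly:

```bash
# ~/git/repo_rpm/                  (rsynced to letessier.mig5.net:/opt/repo_rpm)
# ├── 43/rpm/x86_64/   *.rpm, repodata/ (including the signed repomd.xml.asc)
# └── 42/rpm/x86_64/   *.rpm, repodata/ (including the signed repomd.xml.asc)
```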
@@ -42,7 +42,7 @@ def _build_arg_parser() -> argparse.ArgumentParser:
     ap.add_argument(
         "-f",
         "--format",
-        choices=["ini", "json", "toml", "yaml", "xml"],
+        choices=["ini", "json", "toml", "yaml", "xml", "postfix", "systemd"],
         help="Force config format instead of auto-detecting from filename.",
     )
     ap.add_argument(
@@ -4,6 +4,7 @@ from pathlib import Path
 from typing import Any, Iterable
 
 import datetime
+import re
 import yaml
 
 from .loop_analyzer import LoopAnalyzer, LoopCandidate
@@ -14,6 +15,8 @@ from .handlers import (
     TomlHandler,
     YamlHandler,
     XmlHandler,
+    PostfixMainHandler,
+    SystemdUnitHandler,
 )
 
 
@@ -56,12 +59,18 @@ _TOML_HANDLER = TomlHandler()
 _YAML_HANDLER = YamlHandler()
 _XML_HANDLER = XmlHandler()
 
+_POSTFIX_HANDLER = PostfixMainHandler()
+_SYSTEMD_HANDLER = SystemdUnitHandler()
+
 _HANDLERS["ini"] = _INI_HANDLER
 _HANDLERS["json"] = _JSON_HANDLER
 _HANDLERS["toml"] = _TOML_HANDLER
 _HANDLERS["yaml"] = _YAML_HANDLER
 _HANDLERS["xml"] = _XML_HANDLER
+
+_HANDLERS["postfix"] = _POSTFIX_HANDLER
+_HANDLERS["systemd"] = _SYSTEMD_HANDLER
 
 
 def dump_yaml(data: Any, *, sort_keys: bool = True) -> str:
     """Dump YAML using JinjaTurtle's dumper settings.
@@ -86,24 +95,92 @@ def make_var_name(role_prefix: str, path: Iterable[str]) -> str:
     return BaseHandler.make_var_name(role_prefix, path)
 
 
+def _read_head(path: Path, max_bytes: int = 65536) -> str:
+    try:
+        with path.open("r", encoding="utf-8", errors="replace") as f:
+            return f.read(max_bytes)
+    except OSError:
+        return ""
+
+
+_SYSTEMD_SUFFIXES: set[str] = {
+    ".service",
+    ".socket",
+    ".target",
+    ".timer",
+    ".path",
+    ".mount",
+    ".automount",
+    ".slice",
+    ".swap",
+    ".scope",
+    ".link",
+    ".netdev",
+    ".network",
+}
+
+
+def _looks_like_systemd(text: str) -> bool:
+    # Be conservative: many INI-style configs have [section] and key=value.
+    # systemd unit files almost always contain one of these well-known sections.
+    if re.search(
+        r"^\s*\[(Unit|Service|Install|Socket|Timer|Path|Mount|Automount|Slice|Swap|Scope)\]\s*$",
+        text,
+        re.M,
+    ) and re.search(r"^\s*\w[\w\-]*\s*=", text, re.M):
+        return True
+    return False
+
+
 def detect_format(path: Path, explicit: str | None = None) -> str:
     """
-    Determine config format from argument or filename.
+    Determine config format.
+
+    For unambiguous extensions (json/yaml/toml/xml/ini), we rely on the suffix.
+    For ambiguous extensions like '.conf' (or no extension), we sniff the content.
     """
     if explicit:
         return explicit
 
     suffix = path.suffix.lower()
     name = path.name.lower()
+
+    # Unambiguous extensions
     if suffix == ".toml":
         return "toml"
     if suffix in {".yaml", ".yml"}:
         return "yaml"
     if suffix == ".json":
         return "json"
-    if suffix in {".ini", ".cfg", ".conf"} or name.endswith(".ini"):
-        return "ini"
     if suffix == ".xml":
         return "xml"
+
+    # Special-ish INI-like formats
+    if suffix in {".ini", ".cfg"} or name.endswith(".ini"):
+        return "ini"
+    if suffix == ".repo":
+        return "ini"
+
+    # systemd units
+    if suffix in _SYSTEMD_SUFFIXES:
+        return "systemd"
+
+    # well-known filenames
+    if name == "main.cf":
+        return "postfix"
+
+    head = _read_head(path)
+
+    # Content sniffing
+    if _looks_like_systemd(head):
+        return "systemd"
+
+    # Ambiguous .conf/.cf defaults to INI-ish if no better match
+    if suffix in {".conf", ".cf"}:
+        if name == "main.cf":
+            return "postfix"
+        return "ini"
+
     # Fallback: treat as INI-ish
     return "ini"
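Given the new sniffing logic, a concrete pair of invocations may help when reviewing (file and role names here are hypothetical):

```bash
# A .conf file whose head contains a [Service] section and key=value lines
# is now detected as systemd without any flag:
jinjaturtle override.conf -r myunit

# The handler can still be forced if sniffing would pick the wrong one:
jinjaturtle override.conf -r myunit -f ini
```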
@@ -8,6 +8,9 @@ from .toml import TomlHandler
 from .yaml import YamlHandler
 from .xml import XmlHandler
 
+from .postfix import PostfixMainHandler
+from .systemd import SystemdUnitHandler
+
 __all__ = [
     "BaseHandler",
     "DictLikeHandler",
@@ -16,4 +19,6 @@ __all__ = [
     "TomlHandler",
     "YamlHandler",
     "XmlHandler",
+    "PostfixMainHandler",
+    "SystemdUnitHandler",
 ]
177 src/jinjaturtle/handlers/postfix.py (new file)
@@ -0,0 +1,177 @@
from __future__ import annotations

from pathlib import Path
from typing import Any

from . import BaseHandler


class PostfixMainHandler(BaseHandler):
    """
    Handler for Postfix main.cf style configuration.

    Postfix main.cf is largely 'key = value' with:
    - '#' comments
    - continuation lines starting with whitespace (they continue the previous value)
    """

    fmt = "postfix"

    def parse(self, path: Path) -> dict[str, str]:
        text = path.read_text(encoding="utf-8")
        return self._parse_text_to_dict(text)

    def _parse_text_to_dict(self, text: str) -> dict[str, str]:
        lines = text.splitlines()
        out: dict[str, str] = {}
        i = 0
        while i < len(lines):
            line = lines[i]
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                i += 1
                continue

            if "=" not in line:
                i += 1
                continue

            eq_index = line.find("=")
            key = line[:eq_index].strip()
            if not key:
                i += 1
                continue

            # value + inline comment
            after = line[eq_index + 1 :]
            value_part, _comment = self._split_inline_comment(after, {"#"})
            value = value_part.strip()

            # collect continuation lines
            j = i + 1
            cont_parts: list[str] = []
            while j < len(lines):
                nxt = lines[j]
                if not nxt:
                    break
                if nxt.startswith((" ", "\t")):
                    if nxt.strip().startswith("#"):
                        # a commented continuation line - treat as a break
                        break
                    cont_parts.append(nxt.strip())
                    j += 1
                    continue
                break

            if cont_parts:
                value = " ".join([value] + cont_parts).strip()

            out[key] = value
            i = j if cont_parts else i + 1

        return out

    def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
        if not isinstance(parsed, dict):
            raise TypeError("Postfix parse result must be a dict[str, str]")
        items: list[tuple[tuple[str, ...], Any]] = []
        for k, v in parsed.items():
            items.append(((k,), v))
        return items

    def generate_jinja2_template(
        self,
        parsed: Any,
        role_prefix: str,
        original_text: str | None = None,
    ) -> str:
        if original_text is None:
            # Canonical render (lossy)
            if not isinstance(parsed, dict):
                raise TypeError("Postfix parse result must be a dict[str, str]")
            lines: list[str] = []
            for k, v in parsed.items():
                var = self.make_var_name(role_prefix, (k,))
                lines.append(f"{k} = {{{{ {var} }}}}")
            return "\n".join(lines).rstrip() + "\n"
        return self._generate_from_text(role_prefix, original_text)

    def _generate_from_text(self, role_prefix: str, text: str) -> str:
        lines = text.splitlines(keepends=True)
        out_lines: list[str] = []
        i = 0
        while i < len(lines):
            raw_line = lines[i]
            content = raw_line.rstrip("\n")
            newline = "\n" if raw_line.endswith("\n") else ""

            stripped = content.strip()
            if not stripped:
                out_lines.append(raw_line)
                i += 1
                continue
            if stripped.startswith("#"):
                out_lines.append(raw_line)
                i += 1
                continue

            if "=" not in content:
                out_lines.append(raw_line)
                i += 1
                continue

            eq_index = content.find("=")
            before_eq = content[:eq_index]
            after_eq = content[eq_index + 1 :]

            key = before_eq.strip()
            if not key:
                out_lines.append(raw_line)
                i += 1
                continue

            # whitespace after '='
            value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t"))
            leading_ws = after_eq[:value_ws_len]
            value_and_comment = after_eq[value_ws_len:]

            value_part, comment_part = self._split_inline_comment(
                value_and_comment, {"#"}
            )
            value = value_part.strip()

            # collect continuation physical lines to skip
            j = i + 1
            cont_parts: list[str] = []
            while j < len(lines):
                nxt_raw = lines[j]
                nxt = nxt_raw.rstrip("\n")
                if (
                    nxt.startswith((" ", "\t"))
                    and nxt.strip()
                    and not nxt.strip().startswith("#")
                ):
                    cont_parts.append(nxt.strip())
                    j += 1
                    continue
                break

            if cont_parts:
                value = " ".join([value] + cont_parts).strip()

            var = self.make_var_name(role_prefix, (key,))
            v = value
            quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"}
            if quoted:
                replacement = (
                    f'{before_eq}={leading_ws}"{{{{ {var} }}}}"{comment_part}{newline}'
                )
            else:
                replacement = (
                    f"{before_eq}={leading_ws}{{{{ {var} }}}}{comment_part}{newline}"
                )

            out_lines.append(replacement)
            i = j  # skip continuation lines (if any)

        return "".join(out_lines)
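A quick end-to-end check of the Postfix handler, mirroring the test added below; the path and role prefix are illustrative, and the variable names assume the usual `<role>_<key>` naming shown in the tests:

```bash
cat > /tmp/main.cf << 'EOF'
# comment
myhostname = mail.example.com
mynetworks = 127.0.0.0/8
    [::1]/128
EOF

# With -t omitted, the Jinja2 template is printed to stdout
jinjaturtle /tmp/main.cf -r postfix -f postfix
# Expected to contain, among others:
#   myhostname = {{ postfix_myhostname }}
#   mynetworks = {{ postfix_mynetworks }}
```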
177 src/jinjaturtle/handlers/systemd.py (new file)
@@ -0,0 +1,177 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any

from . import BaseHandler


@dataclass
class SystemdLine:
    kind: str  # 'blank' | 'comment' | 'section' | 'kv' | 'raw'
    raw: str
    lineno: int
    section: str | None = None
    key: str | None = None
    value: str | None = None
    comment: str = ""
    before_eq: str = ""
    leading_ws_after_eq: str = ""
    occ_index: int | None = None


@dataclass
class SystemdUnit:
    lines: list[SystemdLine]


class SystemdUnitHandler(BaseHandler):
    """
    Handler for systemd unit files.

    unit files are INI-like, but keys may repeat (e.g. multiple ExecStart= lines).
    We preserve repeated keys by indexing them when flattening and templating.
    """

    fmt = "systemd"

    def parse(self, path: Path) -> SystemdUnit:
        text = path.read_text(encoding="utf-8")
        return self._parse_text(text)

    def _parse_text(self, text: str) -> SystemdUnit:
        lines = text.splitlines(keepends=True)
        out: list[SystemdLine] = []
        current_section: str | None = None
        # counts per section+key to assign occ_index
        occ: dict[tuple[str, str], int] = {}

        for lineno, raw_line in enumerate(lines, start=1):
            content = raw_line.rstrip("\n")
            stripped = content.strip()

            if not stripped:
                out.append(SystemdLine(kind="blank", raw=raw_line, lineno=lineno))
                continue

            if stripped.startswith(("#", ";")):
                out.append(SystemdLine(kind="comment", raw=raw_line, lineno=lineno))
                continue

            # section header
            if (
                stripped.startswith("[")
                and stripped.endswith("]")
                and len(stripped) >= 2
            ):
                sec = stripped[1:-1].strip()
                current_section = sec
                out.append(
                    SystemdLine(
                        kind="section", raw=raw_line, lineno=lineno, section=sec
                    )
                )
                continue

            if "=" not in content:
                out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno))
                continue

            eq_index = content.find("=")
            before_eq = content[:eq_index]
            after_eq = content[eq_index + 1 :]

            key = before_eq.strip()
            if not key:
                out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno))
                continue

            # whitespace after '='
            value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t"))
            leading_ws = after_eq[:value_ws_len]
            value_and_comment = after_eq[value_ws_len:]

            value_part, comment = self._split_inline_comment(
                value_and_comment, {"#", ";"}
            )
            value = value_part.strip()

            sec = current_section or "DEFAULT"
            k = (sec, key)
            idx = occ.get(k, 0)
            occ[k] = idx + 1

            out.append(
                SystemdLine(
                    kind="kv",
                    raw=raw_line,
                    lineno=lineno,
                    section=sec,
                    key=key,
                    value=value,
                    comment=comment,
                    before_eq=before_eq,
                    leading_ws_after_eq=leading_ws,
                    occ_index=idx,
                )
            )

        return SystemdUnit(lines=out)

    def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
        if not isinstance(parsed, SystemdUnit):
            raise TypeError("systemd parse result must be a SystemdUnit")

        # determine duplicates per (section,key)
        counts: dict[tuple[str, str], int] = {}
        for ln in parsed.lines:
            if ln.kind == "kv" and ln.section and ln.key:
                counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1

        items: list[tuple[tuple[str, ...], Any]] = []
        for ln in parsed.lines:
            if ln.kind != "kv" or not ln.section or not ln.key:
                continue
            path: tuple[str, ...] = (ln.section, ln.key)
            if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None:
                path = path + (str(ln.occ_index),)
            items.append((path, ln.value or ""))
        return items

    def generate_jinja2_template(
        self,
        parsed: Any,
        role_prefix: str,
        original_text: str | None = None,
    ) -> str:
        if not isinstance(parsed, SystemdUnit):
            raise TypeError("systemd parse result must be a SystemdUnit")
        # We template using parsed lines so we preserve original formatting/comments.
        counts: dict[tuple[str, str], int] = {}
        for ln in parsed.lines:
            if ln.kind == "kv" and ln.section and ln.key:
                counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1

        out_lines: list[str] = []
        for ln in parsed.lines:
            if ln.kind != "kv" or not ln.section or not ln.key:
                out_lines.append(ln.raw)
                continue

            path: tuple[str, ...] = (ln.section, ln.key)
            if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None:
                path = path + (str(ln.occ_index),)
            var = self.make_var_name(role_prefix, path)

            v = (ln.value or "").strip()
            quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"}
            if quoted:
                repl = f'{ln.before_eq}={ln.leading_ws_after_eq}"{{{{ {var} }}}}"{ln.comment}'
            else:
                repl = f"{ln.before_eq}={ln.leading_ws_after_eq}{{{{ {var} }}}}{ln.comment}"

            newline = "\n" if ln.raw.endswith("\n") else ""
            out_lines.append(repl + newline)

        return "".join(out_lines)
@@ -36,7 +36,7 @@ SUPPORTED_SUFFIXES: dict[str, set[str]] = {
     "toml": {".toml"},
     "yaml": {".yaml", ".yml"},
     "json": {".json"},
-    "ini": {".ini", ".cfg", ".conf"},
+    "ini": {".ini", ".cfg", ".conf", ".repo"},
     "xml": {".xml"},
 }
 
@@ -584,6 +584,9 @@ class FormatOutput:
     items: list[dict[str, Any]]
 
 
+FOLDER_SUPPORTED_FORMATS: set[str] = {"json", "yaml", "toml", "ini", "xml"}
+
+
 def process_directory(
     root: Path, recursive: bool, role_prefix: str
 ) -> tuple[str, list[FormatOutput]]:
@@ -596,8 +599,14 @@ def process_directory(
     grouped: dict[str, list[tuple[Path, Any]]] = defaultdict(list)
     for p in files:
         fmt, parsed = parse_config(p, None)
+        if fmt not in FOLDER_SUPPORTED_FORMATS:
+            # Directory mode only supports a subset of formats for now.
+            continue
         grouped[fmt].append((p, parsed))
 
+    if not grouped:
+        raise ValueError(f"No folder-supported config files found under: {root}")
+
     multiple_formats = len(grouped) > 1
     outputs: list[FormatOutput] = []
33 tests/test_postfix_format.py (new file)
@@ -0,0 +1,33 @@
from __future__ import annotations

from pathlib import Path

import jinjaturtle.core as core


def test_postfix_main_cf_parsing_and_template(tmp_path: Path) -> None:
    p = tmp_path / "main.cf"
    p.write_text(
        "# comment\n"
        "myhostname = mail.example.com\n"
        "mynetworks = 127.0.0.0/8\n"
        " [::1]/128\n",
        encoding="utf-8",
    )

    fmt, parsed = core.parse_config(p)
    assert fmt == "postfix"

    flat = core.flatten_config(fmt, parsed)
    assert (("myhostname",), "mail.example.com") in flat
    assert any(
        path == ("mynetworks",) and value.startswith("127.0.0.0/8")
        for path, value in flat
    )

    template = core.generate_jinja2_template(
        fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8")
    )
    assert "myhostname = {{ role_myhostname }}" in template
    assert "mynetworks = {{ role_mynetworks }}" in template
    assert "# comment" in template
26 tests/test_systemd_format.py (new file)
@@ -0,0 +1,26 @@
from __future__ import annotations

from pathlib import Path

import jinjaturtle.core as core


def test_systemd_unit_repeated_keys(tmp_path: Path) -> None:
    p = tmp_path / "demo.service"
    p.write_text(
        "[Service]\n" "ExecStart=/bin/echo one\n" "ExecStart=/bin/echo two\n",
        encoding="utf-8",
    )

    fmt, parsed = core.parse_config(p)
    assert fmt == "systemd"

    flat = core.flatten_config(fmt, parsed)
    assert (("Service", "ExecStart", "0"), "/bin/echo one") in flat
    assert (("Service", "ExecStart", "1"), "/bin/echo two") in flat

    template = core.generate_jinja2_template(
        fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8")
    )
    assert "ExecStart={{ role_service_execstart_0 }}" in template
    assert "ExecStart={{ role_service_execstart_1 }}" in template