Compare commits
5 commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2f77cd4d80 | |||
| 8f7f48dc91 | |||
| a5c860e463 | |||
| 14428ff89c | |||
| f92854382a |
12 changed files with 561 additions and 29 deletions
|
|
@ -1,5 +1,6 @@
|
|||
# syntax=docker/dockerfile:1
|
||||
FROM fedora:42
|
||||
ARG BASE_IMAGE=fedora:42
|
||||
FROM ${BASE_IMAGE}
|
||||
|
||||
RUN set -eux; \
|
||||
dnf -y update; \
|
||||
|
|
|
|||
17
README.md
17
README.md
|
|
@ -74,7 +74,7 @@ sudo apt update
|
|||
sudo apt install jinjaturtle
|
||||
```
|
||||
|
||||
### Fedora 42
|
||||
### Fedora
|
||||
|
||||
```bash
|
||||
sudo rpm --import https://mig5.net/static/mig5.asc
|
||||
|
|
@ -82,7 +82,7 @@ sudo rpm --import https://mig5.net/static/mig5.asc
|
|||
sudo tee /etc/yum.repos.d/mig5.repo > /dev/null << 'EOF'
|
||||
[mig5]
|
||||
name=mig5 Repository
|
||||
baseurl=https://rpm.mig5.net/rpm/$basearch
|
||||
baseurl=https://rpm.mig5.net/$releasever/rpm/$basearch
|
||||
enabled=1
|
||||
gpgcheck=1
|
||||
repo_gpgcheck=1
|
||||
|
|
@ -127,12 +127,12 @@ jinjaturtle php.ini \
|
|||
## Full usage info
|
||||
|
||||
```
|
||||
usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config
|
||||
usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml,postfix,systemd}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config
|
||||
|
||||
Convert a config file into Ansible inventory and a Jinja2 template.
|
||||
|
||||
positional arguments:
|
||||
config Path to the source configuration file (TOML or INI-style).
|
||||
config Path to the source configuration file.
|
||||
|
||||
options:
|
||||
-h, --help show this help message and exit
|
||||
|
|
@ -146,6 +146,15 @@ options:
|
|||
Path to write the Jinja2 config template. If omitted, template is printed to stdout.
|
||||
```
|
||||
|
||||
## Additional supported formats
|
||||
|
||||
JinjaTurtle can also template some common "bespoke" config formats:
|
||||
|
||||
- **Postfix main.cf** (`main.cf`) → `--format postfix`
|
||||
- **systemd unit files** (`*.service`, `*.socket`, etc.) → `--format systemd`
|
||||
|
||||
For ambiguous extensions like `*.conf`, JinjaTurtle uses lightweight content sniffing; you can always force a specific handler via `--format`.
|
||||
|
||||
|
||||
## Found a bug, have a suggestion?
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[tool.poetry]
|
||||
name = "jinjaturtle"
|
||||
version = "0.3.5"
|
||||
version = "0.4.0"
|
||||
description = "Convert config files into Ansible defaults and Jinja2 templates."
|
||||
authors = ["Miguel Jacq <mig@mig5.net>"]
|
||||
license = "GPL-3.0-or-later"
|
||||
|
|
|
|||
34
release.sh
34
release.sh
|
|
@ -45,19 +45,36 @@ done
|
|||
|
||||
# RPM
|
||||
sudo apt-get -y install createrepo-c rpm
|
||||
docker build -f Dockerfile.rpmbuild -t jinjaturtle:f42 --progress=plain .
|
||||
docker run --rm -v "$PWD":/src -v "$PWD/dist/rpm":/out jinjaturtle:f42
|
||||
sudo chown -R "${USER}" "$PWD/dist"
|
||||
|
||||
REPO_ROOT="${HOME}/git/repo_rpm"
|
||||
RPM_REPO="${REPO_ROOT}/rpm/x86_64"
|
||||
BUILD_OUTPUT="${HOME}/git/jinjaturtle/dist"
|
||||
REMOTE="letessier.mig5.net:/opt/repo_rpm"
|
||||
KEYID="00AE817C24A10C2540461A9C1D7CDE0234DB458D"
|
||||
REPO_ROOT="${HOME}/git/repo_rpm"
|
||||
REMOTE="letessier.mig5.net:/opt/repo_rpm"
|
||||
|
||||
echo "==> Updating RPM repo..."
|
||||
DISTS=(
|
||||
fedora:43
|
||||
fedora:42
|
||||
)
|
||||
|
||||
for dist in ${DISTS[@]}; do
|
||||
release=$(echo ${dist} | cut -d: -f2)
|
||||
REPO_RELEASE_ROOT="${REPO_ROOT}/${release}"
|
||||
RPM_REPO="${REPO_RELEASE_ROOT}/rpm/x86_64"
|
||||
mkdir -p "$RPM_REPO"
|
||||
|
||||
docker build \
|
||||
--no-cache \
|
||||
-f Dockerfile.rpmbuild \
|
||||
-t jinjaturtle-rpm:${release} \
|
||||
--progress=plain \
|
||||
--build-arg BASE_IMAGE=${dist} \
|
||||
.
|
||||
|
||||
rm -rf "$PWD/dist/rpm"/*
|
||||
mkdir -p "$PWD/dist/rpm"
|
||||
|
||||
docker run --rm -v "$PWD":/src -v "$PWD/dist/rpm":/out jinjaturtle-rpm:${release}
|
||||
sudo chown -R "${USER}" "$PWD/dist"
|
||||
|
||||
for file in `ls -1 "${BUILD_OUTPUT}/rpm"`; do
|
||||
rpmsign --addsign "${BUILD_OUTPUT}/rpm/$file"
|
||||
done
|
||||
|
|
@ -68,6 +85,7 @@ createrepo_c "$RPM_REPO"
|
|||
|
||||
echo "==> Signing repomd.xml..."
|
||||
qubes-gpg-client --local-user "$KEYID" --detach-sign --armor "$RPM_REPO/repodata/repomd.xml" > "$RPM_REPO/repodata/repomd.xml.asc"
|
||||
done
|
||||
|
||||
echo "==> Syncing repo to server..."
|
||||
rsync -aHPvz --exclude=.git --delete "$REPO_ROOT/" "$REMOTE/"
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ def _build_arg_parser() -> argparse.ArgumentParser:
|
|||
ap.add_argument(
|
||||
"-f",
|
||||
"--format",
|
||||
choices=["ini", "json", "toml", "yaml", "xml"],
|
||||
choices=["ini", "json", "toml", "yaml", "xml", "postfix", "systemd"],
|
||||
help="Force config format instead of auto-detecting from filename.",
|
||||
)
|
||||
ap.add_argument(
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from pathlib import Path
|
|||
from typing import Any, Iterable
|
||||
|
||||
import datetime
|
||||
import re
|
||||
import yaml
|
||||
|
||||
from .loop_analyzer import LoopAnalyzer, LoopCandidate
|
||||
|
|
@ -14,6 +15,8 @@ from .handlers import (
|
|||
TomlHandler,
|
||||
YamlHandler,
|
||||
XmlHandler,
|
||||
PostfixMainHandler,
|
||||
SystemdUnitHandler,
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -56,12 +59,18 @@ _TOML_HANDLER = TomlHandler()
|
|||
_YAML_HANDLER = YamlHandler()
|
||||
_XML_HANDLER = XmlHandler()
|
||||
|
||||
_POSTFIX_HANDLER = PostfixMainHandler()
|
||||
_SYSTEMD_HANDLER = SystemdUnitHandler()
|
||||
|
||||
_HANDLERS["ini"] = _INI_HANDLER
|
||||
_HANDLERS["json"] = _JSON_HANDLER
|
||||
_HANDLERS["toml"] = _TOML_HANDLER
|
||||
_HANDLERS["yaml"] = _YAML_HANDLER
|
||||
_HANDLERS["xml"] = _XML_HANDLER
|
||||
|
||||
_HANDLERS["postfix"] = _POSTFIX_HANDLER
|
||||
_HANDLERS["systemd"] = _SYSTEMD_HANDLER
|
||||
|
||||
|
||||
def dump_yaml(data: Any, *, sort_keys: bool = True) -> str:
|
||||
"""Dump YAML using JinjaTurtle's dumper settings.
|
||||
|
|
@ -86,24 +95,92 @@ def make_var_name(role_prefix: str, path: Iterable[str]) -> str:
|
|||
return BaseHandler.make_var_name(role_prefix, path)
|
||||
|
||||
|
||||
def _read_head(path: Path, max_bytes: int = 65536) -> str:
|
||||
try:
|
||||
with path.open("r", encoding="utf-8", errors="replace") as f:
|
||||
return f.read(max_bytes)
|
||||
except OSError:
|
||||
return ""
|
||||
|
||||
|
||||
_SYSTEMD_SUFFIXES: set[str] = {
|
||||
".service",
|
||||
".socket",
|
||||
".target",
|
||||
".timer",
|
||||
".path",
|
||||
".mount",
|
||||
".automount",
|
||||
".slice",
|
||||
".swap",
|
||||
".scope",
|
||||
".link",
|
||||
".netdev",
|
||||
".network",
|
||||
}
|
||||
|
||||
|
||||
def _looks_like_systemd(text: str) -> bool:
|
||||
# Be conservative: many INI-style configs have [section] and key=value.
|
||||
# systemd unit files almost always contain one of these well-known sections.
|
||||
if re.search(
|
||||
r"^\s*\[(Unit|Service|Install|Socket|Timer|Path|Mount|Automount|Slice|Swap|Scope)\]\s*$",
|
||||
text,
|
||||
re.M,
|
||||
) and re.search(r"^\s*\w[\w\-]*\s*=", text, re.M):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def detect_format(path: Path, explicit: str | None = None) -> str:
|
||||
"""
|
||||
Determine config format from argument or filename.
|
||||
Determine config format.
|
||||
|
||||
For unambiguous extensions (json/yaml/toml/xml/ini), we rely on the suffix.
|
||||
For ambiguous extensions like '.conf' (or no extension), we sniff the content.
|
||||
"""
|
||||
if explicit:
|
||||
return explicit
|
||||
|
||||
suffix = path.suffix.lower()
|
||||
name = path.name.lower()
|
||||
|
||||
# Unambiguous extensions
|
||||
if suffix == ".toml":
|
||||
return "toml"
|
||||
if suffix in {".yaml", ".yml"}:
|
||||
return "yaml"
|
||||
if suffix == ".json":
|
||||
return "json"
|
||||
if suffix in {".ini", ".cfg", ".conf"} or name.endswith(".ini"):
|
||||
return "ini"
|
||||
if suffix == ".xml":
|
||||
return "xml"
|
||||
|
||||
# Special-ish INI-like formats
|
||||
if suffix in {".ini", ".cfg"} or name.endswith(".ini"):
|
||||
return "ini"
|
||||
if suffix == ".repo":
|
||||
return "ini"
|
||||
|
||||
# systemd units
|
||||
if suffix in _SYSTEMD_SUFFIXES:
|
||||
return "systemd"
|
||||
|
||||
# well-known filenames
|
||||
if name == "main.cf":
|
||||
return "postfix"
|
||||
|
||||
head = _read_head(path)
|
||||
|
||||
# Content sniffing
|
||||
if _looks_like_systemd(head):
|
||||
return "systemd"
|
||||
|
||||
# Ambiguous .conf/.cf defaults to INI-ish if no better match
|
||||
if suffix in {".conf", ".cf"}:
|
||||
if name == "main.cf":
|
||||
return "postfix"
|
||||
return "ini"
|
||||
|
||||
# Fallback: treat as INI-ish
|
||||
return "ini"
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,9 @@ from .toml import TomlHandler
|
|||
from .yaml import YamlHandler
|
||||
from .xml import XmlHandler
|
||||
|
||||
from .postfix import PostfixMainHandler
|
||||
from .systemd import SystemdUnitHandler
|
||||
|
||||
__all__ = [
|
||||
"BaseHandler",
|
||||
"DictLikeHandler",
|
||||
|
|
@ -16,4 +19,6 @@ __all__ = [
|
|||
"TomlHandler",
|
||||
"YamlHandler",
|
||||
"XmlHandler",
|
||||
"PostfixMainHandler",
|
||||
"SystemdUnitHandler",
|
||||
]
|
||||
|
|
|
|||
177
src/jinjaturtle/handlers/postfix.py
Normal file
177
src/jinjaturtle/handlers/postfix.py
Normal file
|
|
@ -0,0 +1,177 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from . import BaseHandler
|
||||
|
||||
|
||||
class PostfixMainHandler(BaseHandler):
|
||||
"""
|
||||
Handler for Postfix main.cf style configuration.
|
||||
|
||||
Postfix main.cf is largely 'key = value' with:
|
||||
- '#' comments
|
||||
- continuation lines starting with whitespace (they continue the previous value)
|
||||
"""
|
||||
|
||||
fmt = "postfix"
|
||||
|
||||
def parse(self, path: Path) -> dict[str, str]:
|
||||
text = path.read_text(encoding="utf-8")
|
||||
return self._parse_text_to_dict(text)
|
||||
|
||||
def _parse_text_to_dict(self, text: str) -> dict[str, str]:
|
||||
lines = text.splitlines()
|
||||
out: dict[str, str] = {}
|
||||
i = 0
|
||||
while i < len(lines):
|
||||
line = lines[i]
|
||||
stripped = line.strip()
|
||||
if not stripped or stripped.startswith("#"):
|
||||
i += 1
|
||||
continue
|
||||
|
||||
if "=" not in line:
|
||||
i += 1
|
||||
continue
|
||||
|
||||
eq_index = line.find("=")
|
||||
key = line[:eq_index].strip()
|
||||
if not key:
|
||||
i += 1
|
||||
continue
|
||||
|
||||
# value + inline comment
|
||||
after = line[eq_index + 1 :]
|
||||
value_part, _comment = self._split_inline_comment(after, {"#"})
|
||||
value = value_part.strip()
|
||||
|
||||
# collect continuation lines
|
||||
j = i + 1
|
||||
cont_parts: list[str] = []
|
||||
while j < len(lines):
|
||||
nxt = lines[j]
|
||||
if not nxt:
|
||||
break
|
||||
if nxt.startswith((" ", "\t")):
|
||||
if nxt.strip().startswith("#"):
|
||||
# a commented continuation line - treat as a break
|
||||
break
|
||||
cont_parts.append(nxt.strip())
|
||||
j += 1
|
||||
continue
|
||||
break
|
||||
|
||||
if cont_parts:
|
||||
value = " ".join([value] + cont_parts).strip()
|
||||
|
||||
out[key] = value
|
||||
i = j if cont_parts else i + 1
|
||||
|
||||
return out
|
||||
|
||||
def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
|
||||
if not isinstance(parsed, dict):
|
||||
raise TypeError("Postfix parse result must be a dict[str, str]")
|
||||
items: list[tuple[tuple[str, ...], Any]] = []
|
||||
for k, v in parsed.items():
|
||||
items.append(((k,), v))
|
||||
return items
|
||||
|
||||
def generate_jinja2_template(
|
||||
self,
|
||||
parsed: Any,
|
||||
role_prefix: str,
|
||||
original_text: str | None = None,
|
||||
) -> str:
|
||||
if original_text is None:
|
||||
# Canonical render (lossy)
|
||||
if not isinstance(parsed, dict):
|
||||
raise TypeError("Postfix parse result must be a dict[str, str]")
|
||||
lines: list[str] = []
|
||||
for k, v in parsed.items():
|
||||
var = self.make_var_name(role_prefix, (k,))
|
||||
lines.append(f"{k} = {{{{ {var} }}}}")
|
||||
return "\n".join(lines).rstrip() + "\n"
|
||||
return self._generate_from_text(role_prefix, original_text)
|
||||
|
||||
def _generate_from_text(self, role_prefix: str, text: str) -> str:
|
||||
lines = text.splitlines(keepends=True)
|
||||
out_lines: list[str] = []
|
||||
i = 0
|
||||
while i < len(lines):
|
||||
raw_line = lines[i]
|
||||
content = raw_line.rstrip("\n")
|
||||
newline = "\n" if raw_line.endswith("\n") else ""
|
||||
|
||||
stripped = content.strip()
|
||||
if not stripped:
|
||||
out_lines.append(raw_line)
|
||||
i += 1
|
||||
continue
|
||||
if stripped.startswith("#"):
|
||||
out_lines.append(raw_line)
|
||||
i += 1
|
||||
continue
|
||||
|
||||
if "=" not in content:
|
||||
out_lines.append(raw_line)
|
||||
i += 1
|
||||
continue
|
||||
|
||||
eq_index = content.find("=")
|
||||
before_eq = content[:eq_index]
|
||||
after_eq = content[eq_index + 1 :]
|
||||
|
||||
key = before_eq.strip()
|
||||
if not key:
|
||||
out_lines.append(raw_line)
|
||||
i += 1
|
||||
continue
|
||||
|
||||
# whitespace after '='
|
||||
value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t"))
|
||||
leading_ws = after_eq[:value_ws_len]
|
||||
value_and_comment = after_eq[value_ws_len:]
|
||||
|
||||
value_part, comment_part = self._split_inline_comment(
|
||||
value_and_comment, {"#"}
|
||||
)
|
||||
value = value_part.strip()
|
||||
|
||||
# collect continuation physical lines to skip
|
||||
j = i + 1
|
||||
cont_parts: list[str] = []
|
||||
while j < len(lines):
|
||||
nxt_raw = lines[j]
|
||||
nxt = nxt_raw.rstrip("\n")
|
||||
if (
|
||||
nxt.startswith((" ", "\t"))
|
||||
and nxt.strip()
|
||||
and not nxt.strip().startswith("#")
|
||||
):
|
||||
cont_parts.append(nxt.strip())
|
||||
j += 1
|
||||
continue
|
||||
break
|
||||
|
||||
if cont_parts:
|
||||
value = " ".join([value] + cont_parts).strip()
|
||||
|
||||
var = self.make_var_name(role_prefix, (key,))
|
||||
v = value
|
||||
quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"}
|
||||
if quoted:
|
||||
replacement = (
|
||||
f'{before_eq}={leading_ws}"{{{{ {var} }}}}"{comment_part}{newline}'
|
||||
)
|
||||
else:
|
||||
replacement = (
|
||||
f"{before_eq}={leading_ws}{{{{ {var} }}}}{comment_part}{newline}"
|
||||
)
|
||||
|
||||
out_lines.append(replacement)
|
||||
i = j # skip continuation lines (if any)
|
||||
|
||||
return "".join(out_lines)
|
||||
177
src/jinjaturtle/handlers/systemd.py
Normal file
177
src/jinjaturtle/handlers/systemd.py
Normal file
|
|
@ -0,0 +1,177 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from . import BaseHandler
|
||||
|
||||
|
||||
@dataclass
|
||||
class SystemdLine:
|
||||
kind: str # 'blank' | 'comment' | 'section' | 'kv' | 'raw'
|
||||
raw: str
|
||||
lineno: int
|
||||
section: str | None = None
|
||||
key: str | None = None
|
||||
value: str | None = None
|
||||
comment: str = ""
|
||||
before_eq: str = ""
|
||||
leading_ws_after_eq: str = ""
|
||||
occ_index: int | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class SystemdUnit:
|
||||
lines: list[SystemdLine]
|
||||
|
||||
|
||||
class SystemdUnitHandler(BaseHandler):
|
||||
"""
|
||||
Handler for systemd unit files.
|
||||
|
||||
unit files are INI-like, but keys may repeat (e.g. multiple ExecStart= lines).
|
||||
We preserve repeated keys by indexing them when flattening and templating.
|
||||
"""
|
||||
|
||||
fmt = "systemd"
|
||||
|
||||
def parse(self, path: Path) -> SystemdUnit:
|
||||
text = path.read_text(encoding="utf-8")
|
||||
return self._parse_text(text)
|
||||
|
||||
def _parse_text(self, text: str) -> SystemdUnit:
|
||||
lines = text.splitlines(keepends=True)
|
||||
out: list[SystemdLine] = []
|
||||
current_section: str | None = None
|
||||
# counts per section+key to assign occ_index
|
||||
occ: dict[tuple[str, str], int] = {}
|
||||
|
||||
for lineno, raw_line in enumerate(lines, start=1):
|
||||
content = raw_line.rstrip("\n")
|
||||
stripped = content.strip()
|
||||
|
||||
if not stripped:
|
||||
out.append(SystemdLine(kind="blank", raw=raw_line, lineno=lineno))
|
||||
continue
|
||||
|
||||
if stripped.startswith(("#", ";")):
|
||||
out.append(SystemdLine(kind="comment", raw=raw_line, lineno=lineno))
|
||||
continue
|
||||
|
||||
# section header
|
||||
if (
|
||||
stripped.startswith("[")
|
||||
and stripped.endswith("]")
|
||||
and len(stripped) >= 2
|
||||
):
|
||||
sec = stripped[1:-1].strip()
|
||||
current_section = sec
|
||||
out.append(
|
||||
SystemdLine(
|
||||
kind="section", raw=raw_line, lineno=lineno, section=sec
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
if "=" not in content:
|
||||
out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno))
|
||||
continue
|
||||
|
||||
eq_index = content.find("=")
|
||||
before_eq = content[:eq_index]
|
||||
after_eq = content[eq_index + 1 :]
|
||||
|
||||
key = before_eq.strip()
|
||||
if not key:
|
||||
out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno))
|
||||
continue
|
||||
|
||||
# whitespace after '='
|
||||
value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t"))
|
||||
leading_ws = after_eq[:value_ws_len]
|
||||
value_and_comment = after_eq[value_ws_len:]
|
||||
|
||||
value_part, comment = self._split_inline_comment(
|
||||
value_and_comment, {"#", ";"}
|
||||
)
|
||||
value = value_part.strip()
|
||||
|
||||
sec = current_section or "DEFAULT"
|
||||
k = (sec, key)
|
||||
idx = occ.get(k, 0)
|
||||
occ[k] = idx + 1
|
||||
|
||||
out.append(
|
||||
SystemdLine(
|
||||
kind="kv",
|
||||
raw=raw_line,
|
||||
lineno=lineno,
|
||||
section=sec,
|
||||
key=key,
|
||||
value=value,
|
||||
comment=comment,
|
||||
before_eq=before_eq,
|
||||
leading_ws_after_eq=leading_ws,
|
||||
occ_index=idx,
|
||||
)
|
||||
)
|
||||
|
||||
return SystemdUnit(lines=out)
|
||||
|
||||
def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
|
||||
if not isinstance(parsed, SystemdUnit):
|
||||
raise TypeError("systemd parse result must be a SystemdUnit")
|
||||
|
||||
# determine duplicates per (section,key)
|
||||
counts: dict[tuple[str, str], int] = {}
|
||||
for ln in parsed.lines:
|
||||
if ln.kind == "kv" and ln.section and ln.key:
|
||||
counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1
|
||||
|
||||
items: list[tuple[tuple[str, ...], Any]] = []
|
||||
for ln in parsed.lines:
|
||||
if ln.kind != "kv" or not ln.section or not ln.key:
|
||||
continue
|
||||
path: tuple[str, ...] = (ln.section, ln.key)
|
||||
if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None:
|
||||
path = path + (str(ln.occ_index),)
|
||||
items.append((path, ln.value or ""))
|
||||
return items
|
||||
|
||||
def generate_jinja2_template(
|
||||
self,
|
||||
parsed: Any,
|
||||
role_prefix: str,
|
||||
original_text: str | None = None,
|
||||
) -> str:
|
||||
if not isinstance(parsed, SystemdUnit):
|
||||
raise TypeError("systemd parse result must be a SystemdUnit")
|
||||
# We template using parsed lines so we preserve original formatting/comments.
|
||||
counts: dict[tuple[str, str], int] = {}
|
||||
for ln in parsed.lines:
|
||||
if ln.kind == "kv" and ln.section and ln.key:
|
||||
counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1
|
||||
|
||||
out_lines: list[str] = []
|
||||
for ln in parsed.lines:
|
||||
if ln.kind != "kv" or not ln.section or not ln.key:
|
||||
out_lines.append(ln.raw)
|
||||
continue
|
||||
|
||||
path: tuple[str, ...] = (ln.section, ln.key)
|
||||
if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None:
|
||||
path = path + (str(ln.occ_index),)
|
||||
var = self.make_var_name(role_prefix, path)
|
||||
|
||||
v = (ln.value or "").strip()
|
||||
quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"}
|
||||
if quoted:
|
||||
repl = f'{ln.before_eq}={ln.leading_ws_after_eq}"{{{{ {var} }}}}"{ln.comment}'
|
||||
else:
|
||||
repl = f"{ln.before_eq}={ln.leading_ws_after_eq}{{{{ {var} }}}}{ln.comment}"
|
||||
|
||||
newline = "\n" if ln.raw.endswith("\n") else ""
|
||||
out_lines.append(repl + newline)
|
||||
|
||||
return "".join(out_lines)
|
||||
|
|
@ -36,7 +36,7 @@ SUPPORTED_SUFFIXES: dict[str, set[str]] = {
|
|||
"toml": {".toml"},
|
||||
"yaml": {".yaml", ".yml"},
|
||||
"json": {".json"},
|
||||
"ini": {".ini", ".cfg", ".conf"},
|
||||
"ini": {".ini", ".cfg", ".conf", ".repo"},
|
||||
"xml": {".xml"},
|
||||
}
|
||||
|
||||
|
|
@ -584,6 +584,9 @@ class FormatOutput:
|
|||
items: list[dict[str, Any]]
|
||||
|
||||
|
||||
FOLDER_SUPPORTED_FORMATS: set[str] = {"json", "yaml", "toml", "ini", "xml"}
|
||||
|
||||
|
||||
def process_directory(
|
||||
root: Path, recursive: bool, role_prefix: str
|
||||
) -> tuple[str, list[FormatOutput]]:
|
||||
|
|
@ -596,8 +599,14 @@ def process_directory(
|
|||
grouped: dict[str, list[tuple[Path, Any]]] = defaultdict(list)
|
||||
for p in files:
|
||||
fmt, parsed = parse_config(p, None)
|
||||
if fmt not in FOLDER_SUPPORTED_FORMATS:
|
||||
# Directory mode only supports a subset of formats for now.
|
||||
continue
|
||||
grouped[fmt].append((p, parsed))
|
||||
|
||||
if not grouped:
|
||||
raise ValueError(f"No folder-supported config files found under: {root}")
|
||||
|
||||
multiple_formats = len(grouped) > 1
|
||||
outputs: list[FormatOutput] = []
|
||||
|
||||
|
|
|
|||
33
tests/test_postfix_format.py
Normal file
33
tests/test_postfix_format.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import jinjaturtle.core as core
|
||||
|
||||
|
||||
def test_postfix_main_cf_parsing_and_template(tmp_path: Path) -> None:
|
||||
p = tmp_path / "main.cf"
|
||||
p.write_text(
|
||||
"# comment\n"
|
||||
"myhostname = mail.example.com\n"
|
||||
"mynetworks = 127.0.0.0/8\n"
|
||||
" [::1]/128\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
fmt, parsed = core.parse_config(p)
|
||||
assert fmt == "postfix"
|
||||
|
||||
flat = core.flatten_config(fmt, parsed)
|
||||
assert (("myhostname",), "mail.example.com") in flat
|
||||
assert any(
|
||||
path == ("mynetworks",) and value.startswith("127.0.0.0/8")
|
||||
for path, value in flat
|
||||
)
|
||||
|
||||
template = core.generate_jinja2_template(
|
||||
fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8")
|
||||
)
|
||||
assert "myhostname = {{ role_myhostname }}" in template
|
||||
assert "mynetworks = {{ role_mynetworks }}" in template
|
||||
assert "# comment" in template
|
||||
26
tests/test_systemd_format.py
Normal file
26
tests/test_systemd_format.py
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import jinjaturtle.core as core
|
||||
|
||||
|
||||
def test_systemd_unit_repeated_keys(tmp_path: Path) -> None:
|
||||
p = tmp_path / "demo.service"
|
||||
p.write_text(
|
||||
"[Service]\n" "ExecStart=/bin/echo one\n" "ExecStart=/bin/echo two\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
fmt, parsed = core.parse_config(p)
|
||||
assert fmt == "systemd"
|
||||
|
||||
flat = core.flatten_config(fmt, parsed)
|
||||
assert (("Service", "ExecStart", "0"), "/bin/echo one") in flat
|
||||
assert (("Service", "ExecStart", "1"), "/bin/echo two") in flat
|
||||
|
||||
template = core.generate_jinja2_template(
|
||||
fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8")
|
||||
)
|
||||
assert "ExecStart={{ role_service_execstart_0 }}" in template
|
||||
assert "ExecStart={{ role_service_execstart_1 }}" in template
|
||||
Loading…
Add table
Add a link
Reference in a new issue