diff --git a/README.md b/README.md index 0cd022d..0e0ad48 100644 --- a/README.md +++ b/README.md @@ -127,12 +127,12 @@ jinjaturtle php.ini \ ## Full usage info ``` -usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config +usage: jinjaturtle [-h] -r ROLE_NAME [-f {json,ini,toml,yaml,xml,postfix,systemd}] [-d DEFAULTS_OUTPUT] [-t TEMPLATE_OUTPUT] config Convert a config file into Ansible inventory and a Jinja2 template. positional arguments: - config Path to the source configuration file (TOML or INI-style). + config Path to the source configuration file. options: -h, --help show this help message and exit @@ -146,6 +146,15 @@ options: Path to write the Jinja2 config template. If omitted, template is printed to stdout. ``` +## Additional supported formats + +JinjaTurtle can also template some common "bespoke" config formats: + +- **Postfix main.cf** (`main.cf`) → `--format postfix` +- **systemd unit files** (`*.service`, `*.socket`, etc.) → `--format systemd` + +For ambiguous extensions like `*.conf`, JinjaTurtle uses lightweight content sniffing; you can always force a specific handler via `--format`. + ## Found a bug, have a suggestion? diff --git a/pyproject.toml b/pyproject.toml index 3ed9e59..3fc5c4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "jinjaturtle" -version = "0.3.5" +version = "0.4.0" description = "Convert config files into Ansible defaults and Jinja2 templates." authors = ["Miguel Jacq "] license = "GPL-3.0-or-later" diff --git a/src/jinjaturtle/cli.py b/src/jinjaturtle/cli.py index e6f74ec..74fdd8e 100644 --- a/src/jinjaturtle/cli.py +++ b/src/jinjaturtle/cli.py @@ -42,7 +42,7 @@ def _build_arg_parser() -> argparse.ArgumentParser: ap.add_argument( "-f", "--format", - choices=["ini", "json", "toml", "yaml", "xml"], + choices=["ini", "json", "toml", "yaml", "xml", "postfix", "systemd"], help="Force config format instead of auto-detecting from filename.", ) ap.add_argument( diff --git a/src/jinjaturtle/core.py b/src/jinjaturtle/core.py index fee5e80..d53f182 100644 --- a/src/jinjaturtle/core.py +++ b/src/jinjaturtle/core.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Any, Iterable import datetime +import re import yaml from .loop_analyzer import LoopAnalyzer, LoopCandidate @@ -14,6 +15,8 @@ from .handlers import ( TomlHandler, YamlHandler, XmlHandler, + PostfixMainHandler, + SystemdUnitHandler, ) @@ -56,12 +59,18 @@ _TOML_HANDLER = TomlHandler() _YAML_HANDLER = YamlHandler() _XML_HANDLER = XmlHandler() +_POSTFIX_HANDLER = PostfixMainHandler() +_SYSTEMD_HANDLER = SystemdUnitHandler() + _HANDLERS["ini"] = _INI_HANDLER _HANDLERS["json"] = _JSON_HANDLER _HANDLERS["toml"] = _TOML_HANDLER _HANDLERS["yaml"] = _YAML_HANDLER _HANDLERS["xml"] = _XML_HANDLER +_HANDLERS["postfix"] = _POSTFIX_HANDLER +_HANDLERS["systemd"] = _SYSTEMD_HANDLER + def dump_yaml(data: Any, *, sort_keys: bool = True) -> str: """Dump YAML using JinjaTurtle's dumper settings. @@ -86,24 +95,92 @@ def make_var_name(role_prefix: str, path: Iterable[str]) -> str: return BaseHandler.make_var_name(role_prefix, path) +def _read_head(path: Path, max_bytes: int = 65536) -> str: + try: + with path.open("r", encoding="utf-8", errors="replace") as f: + return f.read(max_bytes) + except OSError: + return "" + + +_SYSTEMD_SUFFIXES: set[str] = { + ".service", + ".socket", + ".target", + ".timer", + ".path", + ".mount", + ".automount", + ".slice", + ".swap", + ".scope", + ".link", + ".netdev", + ".network", +} + + +def _looks_like_systemd(text: str) -> bool: + # Be conservative: many INI-style configs have [section] and key=value. + # systemd unit files almost always contain one of these well-known sections. + if re.search( + r"^\s*\[(Unit|Service|Install|Socket|Timer|Path|Mount|Automount|Slice|Swap|Scope)\]\s*$", + text, + re.M, + ) and re.search(r"^\s*\w[\w\-]*\s*=", text, re.M): + return True + return False + + def detect_format(path: Path, explicit: str | None = None) -> str: """ - Determine config format from argument or filename. + Determine config format. + + For unambiguous extensions (json/yaml/toml/xml/ini), we rely on the suffix. + For ambiguous extensions like '.conf' (or no extension), we sniff the content. """ if explicit: return explicit + suffix = path.suffix.lower() name = path.name.lower() + + # Unambiguous extensions if suffix == ".toml": return "toml" if suffix in {".yaml", ".yml"}: return "yaml" if suffix == ".json": return "json" - if suffix in {".ini", ".cfg", ".conf"} or name.endswith(".ini"): - return "ini" if suffix == ".xml": return "xml" + + # Special-ish INI-like formats + if suffix in {".ini", ".cfg"} or name.endswith(".ini"): + return "ini" + if suffix == ".repo": + return "ini" + + # systemd units + if suffix in _SYSTEMD_SUFFIXES: + return "systemd" + + # well-known filenames + if name == "main.cf": + return "postfix" + + head = _read_head(path) + + # Content sniffing + if _looks_like_systemd(head): + return "systemd" + + # Ambiguous .conf/.cf defaults to INI-ish if no better match + if suffix in {".conf", ".cf"}: + if name == "main.cf": + return "postfix" + return "ini" + # Fallback: treat as INI-ish return "ini" diff --git a/src/jinjaturtle/handlers/__init__.py b/src/jinjaturtle/handlers/__init__.py index 6bbcba1..97074c5 100644 --- a/src/jinjaturtle/handlers/__init__.py +++ b/src/jinjaturtle/handlers/__init__.py @@ -8,6 +8,9 @@ from .toml import TomlHandler from .yaml import YamlHandler from .xml import XmlHandler +from .postfix import PostfixMainHandler +from .systemd import SystemdUnitHandler + __all__ = [ "BaseHandler", "DictLikeHandler", @@ -16,4 +19,6 @@ __all__ = [ "TomlHandler", "YamlHandler", "XmlHandler", + "PostfixMainHandler", + "SystemdUnitHandler", ] diff --git a/src/jinjaturtle/handlers/postfix.py b/src/jinjaturtle/handlers/postfix.py new file mode 100644 index 0000000..65f6be9 --- /dev/null +++ b/src/jinjaturtle/handlers/postfix.py @@ -0,0 +1,177 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from . import BaseHandler + + +class PostfixMainHandler(BaseHandler): + """ + Handler for Postfix main.cf style configuration. + + Postfix main.cf is largely 'key = value' with: + - '#' comments + - continuation lines starting with whitespace (they continue the previous value) + """ + + fmt = "postfix" + + def parse(self, path: Path) -> dict[str, str]: + text = path.read_text(encoding="utf-8") + return self._parse_text_to_dict(text) + + def _parse_text_to_dict(self, text: str) -> dict[str, str]: + lines = text.splitlines() + out: dict[str, str] = {} + i = 0 + while i < len(lines): + line = lines[i] + stripped = line.strip() + if not stripped or stripped.startswith("#"): + i += 1 + continue + + if "=" not in line: + i += 1 + continue + + eq_index = line.find("=") + key = line[:eq_index].strip() + if not key: + i += 1 + continue + + # value + inline comment + after = line[eq_index + 1 :] + value_part, _comment = self._split_inline_comment(after, {"#"}) + value = value_part.strip() + + # collect continuation lines + j = i + 1 + cont_parts: list[str] = [] + while j < len(lines): + nxt = lines[j] + if not nxt: + break + if nxt.startswith((" ", "\t")): + if nxt.strip().startswith("#"): + # a commented continuation line - treat as a break + break + cont_parts.append(nxt.strip()) + j += 1 + continue + break + + if cont_parts: + value = " ".join([value] + cont_parts).strip() + + out[key] = value + i = j if cont_parts else i + 1 + + return out + + def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]: + if not isinstance(parsed, dict): + raise TypeError("Postfix parse result must be a dict[str, str]") + items: list[tuple[tuple[str, ...], Any]] = [] + for k, v in parsed.items(): + items.append(((k,), v)) + return items + + def generate_jinja2_template( + self, + parsed: Any, + role_prefix: str, + original_text: str | None = None, + ) -> str: + if original_text is None: + # Canonical render (lossy) + if not isinstance(parsed, dict): + raise TypeError("Postfix parse result must be a dict[str, str]") + lines: list[str] = [] + for k, v in parsed.items(): + var = self.make_var_name(role_prefix, (k,)) + lines.append(f"{k} = {{{{ {var} }}}}") + return "\n".join(lines).rstrip() + "\n" + return self._generate_from_text(role_prefix, original_text) + + def _generate_from_text(self, role_prefix: str, text: str) -> str: + lines = text.splitlines(keepends=True) + out_lines: list[str] = [] + i = 0 + while i < len(lines): + raw_line = lines[i] + content = raw_line.rstrip("\n") + newline = "\n" if raw_line.endswith("\n") else "" + + stripped = content.strip() + if not stripped: + out_lines.append(raw_line) + i += 1 + continue + if stripped.startswith("#"): + out_lines.append(raw_line) + i += 1 + continue + + if "=" not in content: + out_lines.append(raw_line) + i += 1 + continue + + eq_index = content.find("=") + before_eq = content[:eq_index] + after_eq = content[eq_index + 1 :] + + key = before_eq.strip() + if not key: + out_lines.append(raw_line) + i += 1 + continue + + # whitespace after '=' + value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t")) + leading_ws = after_eq[:value_ws_len] + value_and_comment = after_eq[value_ws_len:] + + value_part, comment_part = self._split_inline_comment( + value_and_comment, {"#"} + ) + value = value_part.strip() + + # collect continuation physical lines to skip + j = i + 1 + cont_parts: list[str] = [] + while j < len(lines): + nxt_raw = lines[j] + nxt = nxt_raw.rstrip("\n") + if ( + nxt.startswith((" ", "\t")) + and nxt.strip() + and not nxt.strip().startswith("#") + ): + cont_parts.append(nxt.strip()) + j += 1 + continue + break + + if cont_parts: + value = " ".join([value] + cont_parts).strip() + + var = self.make_var_name(role_prefix, (key,)) + v = value + quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"} + if quoted: + replacement = ( + f'{before_eq}={leading_ws}"{{{{ {var} }}}}"{comment_part}{newline}' + ) + else: + replacement = ( + f"{before_eq}={leading_ws}{{{{ {var} }}}}{comment_part}{newline}" + ) + + out_lines.append(replacement) + i = j # skip continuation lines (if any) + + return "".join(out_lines) diff --git a/src/jinjaturtle/handlers/systemd.py b/src/jinjaturtle/handlers/systemd.py new file mode 100644 index 0000000..044fd86 --- /dev/null +++ b/src/jinjaturtle/handlers/systemd.py @@ -0,0 +1,177 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from . import BaseHandler + + +@dataclass +class SystemdLine: + kind: str # 'blank' | 'comment' | 'section' | 'kv' | 'raw' + raw: str + lineno: int + section: str | None = None + key: str | None = None + value: str | None = None + comment: str = "" + before_eq: str = "" + leading_ws_after_eq: str = "" + occ_index: int | None = None + + +@dataclass +class SystemdUnit: + lines: list[SystemdLine] + + +class SystemdUnitHandler(BaseHandler): + """ + Handler for systemd unit files. + + unit files are INI-like, but keys may repeat (e.g. multiple ExecStart= lines). + We preserve repeated keys by indexing them when flattening and templating. + """ + + fmt = "systemd" + + def parse(self, path: Path) -> SystemdUnit: + text = path.read_text(encoding="utf-8") + return self._parse_text(text) + + def _parse_text(self, text: str) -> SystemdUnit: + lines = text.splitlines(keepends=True) + out: list[SystemdLine] = [] + current_section: str | None = None + # counts per section+key to assign occ_index + occ: dict[tuple[str, str], int] = {} + + for lineno, raw_line in enumerate(lines, start=1): + content = raw_line.rstrip("\n") + stripped = content.strip() + + if not stripped: + out.append(SystemdLine(kind="blank", raw=raw_line, lineno=lineno)) + continue + + if stripped.startswith(("#", ";")): + out.append(SystemdLine(kind="comment", raw=raw_line, lineno=lineno)) + continue + + # section header + if ( + stripped.startswith("[") + and stripped.endswith("]") + and len(stripped) >= 2 + ): + sec = stripped[1:-1].strip() + current_section = sec + out.append( + SystemdLine( + kind="section", raw=raw_line, lineno=lineno, section=sec + ) + ) + continue + + if "=" not in content: + out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno)) + continue + + eq_index = content.find("=") + before_eq = content[:eq_index] + after_eq = content[eq_index + 1 :] + + key = before_eq.strip() + if not key: + out.append(SystemdLine(kind="raw", raw=raw_line, lineno=lineno)) + continue + + # whitespace after '=' + value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t")) + leading_ws = after_eq[:value_ws_len] + value_and_comment = after_eq[value_ws_len:] + + value_part, comment = self._split_inline_comment( + value_and_comment, {"#", ";"} + ) + value = value_part.strip() + + sec = current_section or "DEFAULT" + k = (sec, key) + idx = occ.get(k, 0) + occ[k] = idx + 1 + + out.append( + SystemdLine( + kind="kv", + raw=raw_line, + lineno=lineno, + section=sec, + key=key, + value=value, + comment=comment, + before_eq=before_eq, + leading_ws_after_eq=leading_ws, + occ_index=idx, + ) + ) + + return SystemdUnit(lines=out) + + def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]: + if not isinstance(parsed, SystemdUnit): + raise TypeError("systemd parse result must be a SystemdUnit") + + # determine duplicates per (section,key) + counts: dict[tuple[str, str], int] = {} + for ln in parsed.lines: + if ln.kind == "kv" and ln.section and ln.key: + counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1 + + items: list[tuple[tuple[str, ...], Any]] = [] + for ln in parsed.lines: + if ln.kind != "kv" or not ln.section or not ln.key: + continue + path: tuple[str, ...] = (ln.section, ln.key) + if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None: + path = path + (str(ln.occ_index),) + items.append((path, ln.value or "")) + return items + + def generate_jinja2_template( + self, + parsed: Any, + role_prefix: str, + original_text: str | None = None, + ) -> str: + if not isinstance(parsed, SystemdUnit): + raise TypeError("systemd parse result must be a SystemdUnit") + # We template using parsed lines so we preserve original formatting/comments. + counts: dict[tuple[str, str], int] = {} + for ln in parsed.lines: + if ln.kind == "kv" and ln.section and ln.key: + counts[(ln.section, ln.key)] = counts.get((ln.section, ln.key), 0) + 1 + + out_lines: list[str] = [] + for ln in parsed.lines: + if ln.kind != "kv" or not ln.section or not ln.key: + out_lines.append(ln.raw) + continue + + path: tuple[str, ...] = (ln.section, ln.key) + if counts.get((ln.section, ln.key), 0) > 1 and ln.occ_index is not None: + path = path + (str(ln.occ_index),) + var = self.make_var_name(role_prefix, path) + + v = (ln.value or "").strip() + quoted = len(v) >= 2 and v[0] == v[-1] and v[0] in {'"', "'"} + if quoted: + repl = f'{ln.before_eq}={ln.leading_ws_after_eq}"{{{{ {var} }}}}"{ln.comment}' + else: + repl = f"{ln.before_eq}={ln.leading_ws_after_eq}{{{{ {var} }}}}{ln.comment}" + + newline = "\n" if ln.raw.endswith("\n") else "" + out_lines.append(repl + newline) + + return "".join(out_lines) diff --git a/src/jinjaturtle/multi.py b/src/jinjaturtle/multi.py index fb1737f..20cf544 100644 --- a/src/jinjaturtle/multi.py +++ b/src/jinjaturtle/multi.py @@ -36,7 +36,7 @@ SUPPORTED_SUFFIXES: dict[str, set[str]] = { "toml": {".toml"}, "yaml": {".yaml", ".yml"}, "json": {".json"}, - "ini": {".ini", ".cfg", ".conf"}, + "ini": {".ini", ".cfg", ".conf", ".repo"}, "xml": {".xml"}, } @@ -584,6 +584,9 @@ class FormatOutput: items: list[dict[str, Any]] +FOLDER_SUPPORTED_FORMATS: set[str] = {"json", "yaml", "toml", "ini", "xml"} + + def process_directory( root: Path, recursive: bool, role_prefix: str ) -> tuple[str, list[FormatOutput]]: @@ -596,8 +599,14 @@ def process_directory( grouped: dict[str, list[tuple[Path, Any]]] = defaultdict(list) for p in files: fmt, parsed = parse_config(p, None) + if fmt not in FOLDER_SUPPORTED_FORMATS: + # Directory mode only supports a subset of formats for now. + continue grouped[fmt].append((p, parsed)) + if not grouped: + raise ValueError(f"No folder-supported config files found under: {root}") + multiple_formats = len(grouped) > 1 outputs: list[FormatOutput] = [] diff --git a/tests/test_postfix_format.py b/tests/test_postfix_format.py new file mode 100644 index 0000000..6bb9e5d --- /dev/null +++ b/tests/test_postfix_format.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from pathlib import Path + +import jinjaturtle.core as core + + +def test_postfix_main_cf_parsing_and_template(tmp_path: Path) -> None: + p = tmp_path / "main.cf" + p.write_text( + "# comment\n" + "myhostname = mail.example.com\n" + "mynetworks = 127.0.0.0/8\n" + " [::1]/128\n", + encoding="utf-8", + ) + + fmt, parsed = core.parse_config(p) + assert fmt == "postfix" + + flat = core.flatten_config(fmt, parsed) + assert (("myhostname",), "mail.example.com") in flat + assert any( + path == ("mynetworks",) and value.startswith("127.0.0.0/8") + for path, value in flat + ) + + template = core.generate_jinja2_template( + fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8") + ) + assert "myhostname = {{ role_myhostname }}" in template + assert "mynetworks = {{ role_mynetworks }}" in template + assert "# comment" in template diff --git a/tests/test_systemd_format.py b/tests/test_systemd_format.py new file mode 100644 index 0000000..c310f72 --- /dev/null +++ b/tests/test_systemd_format.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from pathlib import Path + +import jinjaturtle.core as core + + +def test_systemd_unit_repeated_keys(tmp_path: Path) -> None: + p = tmp_path / "demo.service" + p.write_text( + "[Service]\n" "ExecStart=/bin/echo one\n" "ExecStart=/bin/echo two\n", + encoding="utf-8", + ) + + fmt, parsed = core.parse_config(p) + assert fmt == "systemd" + + flat = core.flatten_config(fmt, parsed) + assert (("Service", "ExecStart", "0"), "/bin/echo one") in flat + assert (("Service", "ExecStart", "1"), "/bin/echo two") in flat + + template = core.generate_jinja2_template( + fmt, parsed, role_prefix="role", original_text=p.read_text(encoding="utf-8") + ) + assert "ExecStart={{ role_service_execstart_0 }}" in template + assert "ExecStart={{ role_service_execstart_1 }}" in template