Rename some methods, merge the loopable handler classes into the regular handlers, and always attempt loop-aware template generation

This commit is contained in:
Miguel Jacq 2025-11-28 12:28:46 +11:00
parent 2db80cc6e1
commit f66f58a7bb
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
20 changed files with 702 additions and 1051 deletions

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "jinjaturtle"
version = "0.1.4"
version = "0.2.0"
description = "Convert config files into Ansible defaults and Jinja2 templates."
authors = ["Miguel Jacq <mig@mig5.net>"]
license = "GPL-3.0-or-later"

View file

@ -9,8 +9,8 @@ from .core import (
parse_config,
analyze_loops,
flatten_config,
generate_defaults_yaml,
generate_template,
generate_ansible_yaml,
generate_jinja2_template,
)
@ -66,10 +66,10 @@ def _main(argv: list[str] | None = None) -> int:
flat_items = flatten_config(fmt, parsed, loop_candidates)
# Generate defaults YAML (with loop collections if detected)
defaults_yaml = generate_defaults_yaml(args.role_name, flat_items, loop_candidates)
ansible_yaml = generate_ansible_yaml(args.role_name, flat_items, loop_candidates)
# Generate template (with loops if detected)
template_str = generate_template(
template_str = generate_jinja2_template(
fmt,
parsed,
args.role_name,
@ -78,10 +78,10 @@ def _main(argv: list[str] | None = None) -> int:
)
if args.defaults_output:
Path(args.defaults_output).write_text(defaults_yaml, encoding="utf-8")
Path(args.defaults_output).write_text(ansible_yaml, encoding="utf-8")
else:
print("# defaults/main.yml")
print(defaults_yaml, end="")
print(ansible_yaml, end="")
if args.template_output:
Path(args.template_output).write_text(template_str, encoding="utf-8")

View file

@ -13,8 +13,6 @@ from .handlers import (
TomlHandler,
YamlHandler,
XmlHandler,
YamlHandlerLoopable,
XmlHandlerLoopable,
)
@ -56,8 +54,6 @@ _JSON_HANDLER = JsonHandler()
_TOML_HANDLER = TomlHandler()
_YAML_HANDLER = YamlHandler()
_XML_HANDLER = XmlHandler()
_YAML_HANDLER_LOOPABLE = YamlHandlerLoopable()
_XML_HANDLER_LOOPABLE = XmlHandlerLoopable()
_HANDLERS["ini"] = _INI_HANDLER
_HANDLERS["json"] = _JSON_HANDLER
@ -173,7 +169,7 @@ def _normalize_default_value(value: Any) -> Any:
return value
def generate_defaults_yaml(
def generate_ansible_yaml(
role_prefix: str,
flat_items: list[tuple[tuple[str, ...], Any]],
loop_candidates: list[LoopCandidate] | None = None,
@ -205,7 +201,7 @@ def generate_defaults_yaml(
)
def generate_template(
def generate_jinja2_template(
fmt: str,
parsed: Any,
role_prefix: str,
@ -215,24 +211,18 @@ def generate_template(
"""
Generate a Jinja2 template for the config.
"""
# Use enhanced handler if we have loop candidates
handler = _HANDLERS.get(fmt)
if loop_candidates and fmt in ("yaml", "xml"):
# Use enhanced handlers for YAML and XML when we have loops
if fmt == "yaml":
handler = _YAML_HANDLER_LOOPABLE
elif fmt == "xml":
handler = _XML_HANDLER_LOOPABLE
if handler is None:
raise ValueError(f"Unsupported format: {fmt}")
# Check if handler supports loop-aware generation
if hasattr(handler, "generate_template_with_loops") and loop_candidates:
return handler.generate_template_with_loops(
if hasattr(handler, "generate_jinja2_template_with_loops") and loop_candidates:
return handler.generate_jinja2_template_with_loops(
parsed, role_prefix, original_text, loop_candidates
)
# Fallback to original scalar-only generation
return handler.generate_template(parsed, role_prefix, original_text=original_text)
return handler.generate_jinja2_template(
parsed, role_prefix, original_text=original_text
)

View file

@ -7,8 +7,6 @@ from .json import JsonHandler
from .toml import TomlHandler
from .yaml import YamlHandler
from .xml import XmlHandler
from .xml_loopable import XmlHandlerLoopable
from .yaml_loopable import YamlHandlerLoopable
__all__ = [
"BaseHandler",
@ -18,6 +16,4 @@ __all__ = [
"TomlHandler",
"YamlHandler",
"XmlHandler",
"XmlHandlerLoopable",
"YamlHandlerLoopable",
]

View file

@ -11,7 +11,7 @@ class BaseHandler:
Each handler is responsible for:
- parse(path) -> parsed object
- flatten(parsed) -> list[(path_tuple, value)]
- generate_template(parsed, role_prefix, original_text=None) -> str
- generate_jinja2_template(parsed, role_prefix, original_text=None) -> str
"""
fmt: str # e.g. "ini", "yaml", ...
@ -22,7 +22,7 @@ class BaseHandler:
def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
raise NotImplementedError
def generate_template(
def generate_jinja2_template(
self,
parsed: Any,
role_prefix: str,

View file

@ -32,7 +32,7 @@ class IniHandler(BaseHandler):
items.append(((section, key), processed))
return items
def generate_template(
def generate_jinja2_template(
self,
parsed: Any,
role_prefix: str,

View file

@ -15,7 +15,7 @@ class JsonHandler(DictLikeHandler):
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def generate_template(
def generate_jinja2_template(
self,
parsed: Any,
role_prefix: str,

View file

@ -19,7 +19,7 @@ class TomlHandler(DictLikeHandler):
with path.open("rb") as f:
return tomllib.load(f)
def generate_template(
def generate_jinja2_template(
self,
parsed: Any,
role_prefix: str,

View file

@ -5,19 +5,19 @@ from pathlib import Path
from typing import Any
import xml.etree.ElementTree as ET # nosec
from . import BaseHandler
from .base import BaseHandler
from ..loop_analyzer import LoopCandidate
class XmlHandler(BaseHandler):
"""
XML handler that can generate both scalar templates and loop-based templates.
"""
fmt = "xml"
def parse(self, path: Path) -> ET.Element:
text = path.read_text(encoding="utf-8")
# Parse with an explicit XMLParser instance so this stays compatible
# with Python versions where xml.etree.ElementTree.fromstring() may
# not accept a ``parser=`` keyword argument.
# defusedxml.defuse_stdlib() is called in the CLI entrypoint, so using
# the stdlib XMLParser here is safe.
parser = ET.XMLParser(
target=ET.TreeBuilder(insert_comments=False)
) # nosec B314
@ -30,12 +30,13 @@ class XmlHandler(BaseHandler):
raise TypeError("XML parser result must be an Element")
return self._flatten_xml(parsed)
def generate_template(
def generate_jinja2_template(
self,
parsed: Any,
role_prefix: str,
original_text: str | None = None,
) -> str:
"""Original scalar-only template generation."""
if original_text is not None:
return self._generate_xml_template_from_text(role_prefix, original_text)
if not isinstance(parsed, ET.Element):
@ -43,25 +44,30 @@ class XmlHandler(BaseHandler):
xml_str = ET.tostring(parsed, encoding="unicode")
return self._generate_xml_template_from_text(role_prefix, xml_str)
def _flatten_xml(self, root: ET.Element) -> list[tuple[tuple[str, ...], Any]]:
"""
Flatten an XML tree into (path, value) pairs.
def generate_jinja2_template_with_loops(
self,
parsed: Any,
role_prefix: str,
original_text: str | None,
loop_candidates: list[LoopCandidate],
) -> str:
"""Generate template with Jinja2 for loops where appropriate."""
Path conventions:
- Root element's children are treated as top-level (root tag is *not* included).
- Element text:
<foo>bar</foo> -> path ("foo",) value "bar"
<foo attr="x">bar</foo> -> path ("foo", "value") value "bar"
<foo><bar>baz</bar></foo> -> ("foo", "bar") / etc.
- Attributes:
<server host="localhost">
-> path ("server", "@host") value "localhost"
- Repeated sibling elements:
<endpoint>/a</endpoint>
<endpoint>/b</endpoint>
-> ("endpoint", "0") "/a"
("endpoint", "1") "/b"
"""
if original_text is not None:
return self._generate_xml_template_with_loops_from_text(
role_prefix, original_text, loop_candidates
)
if not isinstance(parsed, ET.Element):
raise TypeError("XML parser result must be an Element")
xml_str = ET.tostring(parsed, encoding="unicode")
return self._generate_xml_template_with_loops_from_text(
role_prefix, xml_str, loop_candidates
)
def _flatten_xml(self, root: ET.Element) -> list[tuple[tuple[str, ...], Any]]:
"""Flatten an XML tree into (path, value) pairs."""
items: list[tuple[tuple[str, ...], Any]] = []
def walk(elem: ET.Element, path: tuple[str, ...]) -> None:
@ -77,10 +83,8 @@ class XmlHandler(BaseHandler):
text = (elem.text or "").strip()
if text:
if not elem.attrib and not children:
# Simple <foo>bar</foo>
items.append((path, text))
else:
# Text alongside attrs/children
items.append((path + ("value",), text))
# Repeated siblings get an index; singletons just use the tag
@ -97,24 +101,16 @@ class XmlHandler(BaseHandler):
child_path = path + (tag,)
walk(child, child_path)
# Treat root as a container: its children are top-level
walk(root, ())
return items
def _split_xml_prolog(self, text: str) -> tuple[str, str]:
"""
Split an XML document into (prolog, body), where prolog includes:
- XML declaration (<?xml ...?>)
- top-level comments
- DOCTYPE
The body starts at the root element.
"""
"""Split XML into (prolog, body)."""
i = 0
n = len(text)
prolog_parts: list[str] = []
while i < n:
# Preserve leading whitespace
while i < n and text[i].isspace():
prolog_parts.append(text[i])
i += 1
@ -146,22 +142,33 @@ class XmlHandler(BaseHandler):
continue
if text[i] == "<":
# Assume root element starts here
break
# Unexpected content: stop treating as prolog
break
return "".join(prolog_parts), text[i:]
def _apply_jinja_to_xml_tree(self, role_prefix: str, root: ET.Element) -> None:
def _apply_jinja_to_xml_tree(
self,
role_prefix: str,
root: ET.Element,
loop_candidates: list[LoopCandidate] | None = None,
) -> None:
"""
Mutate the XML tree in-place, replacing scalar values with Jinja
expressions based on the same paths used in _flatten_xml.
Mutate XML tree in-place, replacing values with Jinja expressions.
If loop_candidates is provided, repeated elements matching a candidate
will be replaced with a {% for %} loop.
"""
# Build a map of loop paths for quick lookup
loop_paths = {}
if loop_candidates:
for candidate in loop_candidates:
loop_paths[candidate.path] = candidate
def walk(elem: ET.Element, path: tuple[str, ...]) -> None:
# Attributes
# Attributes (unless this element is in a loop)
for attr_name in list(elem.attrib.keys()):
attr_path = path + (f"@{attr_name}",)
var_name = self.make_var_name(role_prefix, attr_path)
@ -180,51 +187,273 @@ class XmlHandler(BaseHandler):
var_name = self.make_var_name(role_prefix, text_path)
elem.text = f"{{{{ {var_name} }}}}"
# Repeated children get indexes just like in _flatten_xml
# Handle children - check for loops first
counts = Counter(child.tag for child in children)
index_counters: dict[str, int] = defaultdict(int)
# Check each tag to see if it's a loop candidate
processed_tags = set()
for child in children:
tag = child.tag
if counts[tag] > 1:
# Skip if we've already processed this tag as a loop
if tag in processed_tags:
continue
child_path = path + (tag,)
# Check if this is a loop candidate
if child_path in loop_paths:
# Mark this tag as processed
processed_tags.add(tag)
# Remove all children with this tag
for child_to_remove in [c for c in children if c.tag == tag]:
elem.remove(child_to_remove)
# Create a loop comment/marker
# We'll handle the actual loop generation in text processing
loop_marker = ET.Comment(f"LOOP:{tag}")
elem.append(loop_marker)
elif counts[tag] > 1:
# Multiple children but not a loop candidate - use indexed paths
idx = index_counters[tag]
index_counters[tag] += 1
child_path = path + (tag, str(idx))
indexed_path = path + (tag, str(idx))
walk(child, indexed_path)
else:
child_path = path + (tag,)
walk(child, child_path)
# Single child
walk(child, child_path)
walk(root, ())
def _generate_xml_template_from_text(self, role_prefix: str, text: str) -> str:
"""
Generate a Jinja2 template for an XML file, preserving comments and prolog.
- Attributes become Jinja placeholders:
<server host="localhost" />
-> <server host="{{ prefix_server_host }}" />
- Text nodes become placeholders:
<port>8080</port>
-> <port>{{ prefix_port }}</port>
but if the element also has attributes/children, the value path
gets a trailing "value" component, matching flattening.
"""
"""Generate scalar-only Jinja2 template."""
prolog, body = self._split_xml_prolog(text)
# Parse with comments included so <!-- --> are preserved
# defusedxml.defuse_stdlib() is called in CLI entrypoint
parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314
parser.feed(body)
root = parser.close()
self._apply_jinja_to_xml_tree(role_prefix, root)
# Pretty indentation if available (Python 3.9+)
indent = getattr(ET, "indent", None)
if indent is not None:
indent(root, space=" ") # type: ignore[arg-type]
xml_body = ET.tostring(root, encoding="unicode")
return prolog + xml_body
def _generate_xml_template_with_loops_from_text(
    self,
    role_prefix: str,
    text: str,
    loop_candidates: list[LoopCandidate],
) -> str:
    """
    Build the loop-aware Jinja2 template for an XML document given as text.

    The prolog returned by ``_split_xml_prolog`` is kept verbatim; only the
    body is parsed, rewritten and re-serialized.

    Args:
        role_prefix: Prefix forwarded to the Jinja variable-name helpers.
        text: Full XML document text.
        loop_candidates: Loop candidates forwarded to the tree rewrite and
            to the marker post-processing step.

    Returns:
        The prolog followed by the templated XML body.
    """
    prolog, body = self._split_xml_prolog(text)

    # insert_comments=True keeps <!-- --> nodes in the tree so they survive
    # the round trip through ElementTree.
    builder = ET.TreeBuilder(insert_comments=True)
    xml_parser = ET.XMLParser(target=builder)  # nosec B314
    xml_parser.feed(body)
    tree_root = xml_parser.close()

    # Scalars become Jinja expressions; loop candidates become marker comments.
    self._apply_jinja_to_xml_tree(role_prefix, tree_root, loop_candidates)

    # ET.indent is looked up dynamically and skipped when it is absent.
    pretty = getattr(ET, "indent", None)
    if pretty is not None:
        pretty(tree_root, space=" ")  # type: ignore[arg-type]

    serialized = ET.tostring(tree_root, encoding="unicode")

    # Swap the marker comments for real Jinja2 loop constructs.
    with_loops = self._insert_xml_loops(
        serialized, role_prefix, loop_candidates, tree_root
    )
    return prolog + with_loops
def _insert_xml_loops(
self,
xml_str: str,
role_prefix: str,
loop_candidates: list[LoopCandidate],
root: ET.Element,
) -> str:
"""
Post-process XML string to insert Jinja2 for loops.
This replaces <!--LOOP:tagname--> markers with actual loop constructs.
"""
# Build a sample element for each loop to use as template
lines = xml_str.split("\n")
result_lines = []
for line in lines:
# Check if this line contains a loop marker
if "<!--LOOP:" in line:
# Extract tag name from marker
start = line.find("<!--LOOP:") + 9
end = line.find("-->", start)
tag_name = line[start:end].strip()
# Find matching loop candidate
candidate = None
for cand in loop_candidates:
if cand.path and cand.path[-1] == tag_name:
candidate = cand
break
if candidate:
# Get indentation from current line
indent_level = len(line) - len(line.lstrip())
indent_str = " " * indent_level
# Generate loop variable name
collection_var = self.make_var_name(role_prefix, candidate.path)
item_var = candidate.loop_var
# Create sample element with ALL possible fields from ALL items
if candidate.items:
# Merge all items to get the union of all fields
merged_dict = self._merge_dicts_for_template(candidate.items)
sample_elem = self._dict_to_xml_element(
tag_name, merged_dict, item_var
)
# Apply indentation to the sample element
ET.indent(sample_elem, space=" ")
# Convert sample to string
sample_str = ET.tostring(
sample_elem, encoding="unicode"
).strip()
# Add proper indentation to each line of the sample
sample_lines = sample_str.split("\n")
# Build loop
result_lines.append(
f"{indent_str}{{% for {item_var} in {collection_var} %}}"
)
# Add each line of the sample with proper indentation
for sample_line in sample_lines:
result_lines.append(f"{indent_str} {sample_line}")
result_lines.append(f"{indent_str}{{% endfor %}}")
else:
# Keep the marker if we can't find the candidate
result_lines.append(line)
else:
result_lines.append(line)
# Post-process to replace <!--IF:...--> and <!--ENDIF:...--> with Jinja2 conditionals
final_lines = []
for line in result_lines:
# Replace <!--IF:var.field--> with {% if var.field is defined %}
if "<!--IF:" in line:
start = line.find("<!--IF:") + 7
end = line.find("-->", start)
condition = line[start:end]
indent = len(line) - len(line.lstrip())
final_lines.append(f"{' ' * indent}{{% if {condition} is defined %}}")
# Replace <!--ENDIF:field--> with {% endif %}
elif "<!--ENDIF:" in line:
indent = len(line) - len(line.lstrip())
final_lines.append(f"{' ' * indent}{{% endif %}}")
else:
final_lines.append(line)
return "\n".join(final_lines)
def _merge_dicts_for_template(self, items: list[dict[str, Any]]) -> dict[str, Any]:
"""
Merge all dicts to get the union of all possible keys.
This is used to generate XML templates that include ALL possible child
elements, even if they only appear in some items (like OSSEC rules where
different rules have different optional elements).
Args:
items: List of dict representations of XML elements
Returns:
Merged dict with all possible keys, using first occurrence as example
"""
merged: dict[str, Any] = {}
for item in items:
for key, value in item.items():
if key not in merged:
merged[key] = value
return merged
def _dict_to_xml_element(
self, tag: str, data: dict[str, Any], loop_var: str
) -> ET.Element:
"""
Convert a dict to an XML element with Jinja2 variable references.
For heterogeneous XML (like OSSEC rules), this generates conditional
Jinja2 for optional child elements.
Args:
tag: Element tag name
data: Dict representing element structure (merged from all items)
loop_var: Loop variable name to use in Jinja expressions
"""
elem = ET.Element(tag)
# Handle attributes and child elements
for key, value in data.items():
if key.startswith("@"):
# Attribute - these come from element attributes
attr_name = key[1:] # Remove @ prefix
# Use simple variable reference - attributes should always exist
elem.set(attr_name, f"{{{{ {loop_var}.{attr_name} }}}}")
elif key == "_text":
# Simple text content
elem.text = f"{{{{ {loop_var} }}}}"
elif key == "value":
# Text with attributes/children
elem.text = f"{{{{ {loop_var}.value }}}}"
elif key == "_key":
# This is the dict key (for dict collections), skip in XML
pass
elif isinstance(value, dict):
# Nested element - wrap in conditional since it might not exist in all items
# Create a conditional wrapper comment
child = ET.Element(key)
if "_text" in value:
child.text = f"{{{{ {loop_var}.{key}._text }}}}"
else:
# More complex nested structure
for sub_key, sub_val in value.items():
if not sub_key.startswith("_"):
grandchild = ET.SubElement(child, sub_key)
grandchild.text = f"{{{{ {loop_var}.{key}.{sub_key} }}}}"
# Wrap the child in a Jinja if statement (will be done via text replacement)
# For now, add a marker comment before the element
marker = ET.Comment(f"IF:{loop_var}.{key}")
elem.append(marker)
elem.append(child)
end_marker = ET.Comment(f"ENDIF:{key}")
elem.append(end_marker)
elif not isinstance(value, list):
# Simple child element (scalar value) - also wrap in conditional
marker = ET.Comment(f"IF:{loop_var}.{key}")
elem.append(marker)
child = ET.SubElement(elem, key)
child.text = f"{{{{ {loop_var}.{key} }}}}"
end_marker = ET.Comment(f"ENDIF:{key}")
elem.append(end_marker)
return elem

View file

@ -1,405 +0,0 @@
from __future__ import annotations
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any
import xml.etree.ElementTree as ET # nosec
from .base import BaseHandler
from ..loop_analyzer import LoopCandidate
class XmlHandlerLoopable(BaseHandler):
"""
XML handler that can generate both scalar templates and loop-based templates.
"""
fmt = "xml"
def parse(self, path: Path) -> ET.Element:
text = path.read_text(encoding="utf-8")
parser = ET.XMLParser(
target=ET.TreeBuilder(insert_comments=False)
) # nosec B314
parser.feed(text)
root = parser.close()
return root
def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
if not isinstance(parsed, ET.Element):
raise TypeError("XML parser result must be an Element")
return self._flatten_xml(parsed)
def generate_template(
self,
parsed: Any,
role_prefix: str,
original_text: str | None = None,
) -> str:
"""Original scalar-only template generation."""
if original_text is not None:
return self._generate_xml_template_from_text(role_prefix, original_text)
if not isinstance(parsed, ET.Element):
raise TypeError("XML parser result must be an Element")
xml_str = ET.tostring(parsed, encoding="unicode")
return self._generate_xml_template_from_text(role_prefix, xml_str)
def generate_template_with_loops(
self,
parsed: Any,
role_prefix: str,
original_text: str | None,
loop_candidates: list[LoopCandidate],
) -> str:
"""Generate template with Jinja2 for loops where appropriate."""
if original_text is not None:
return self._generate_xml_template_with_loops_from_text(
role_prefix, original_text, loop_candidates
)
if not isinstance(parsed, ET.Element):
raise TypeError("XML parser result must be an Element")
xml_str = ET.tostring(parsed, encoding="unicode")
return self._generate_xml_template_with_loops_from_text(
role_prefix, xml_str, loop_candidates
)
def _flatten_xml(self, root: ET.Element) -> list[tuple[tuple[str, ...], Any]]:
"""Flatten an XML tree into (path, value) pairs."""
items: list[tuple[tuple[str, ...], Any]] = []
def walk(elem: ET.Element, path: tuple[str, ...]) -> None:
# Attributes
for attr_name, attr_val in elem.attrib.items():
attr_path = path + (f"@{attr_name}",)
items.append((attr_path, attr_val))
# Children
children = [c for c in list(elem) if isinstance(c.tag, str)]
# Text content
text = (elem.text or "").strip()
if text:
if not elem.attrib and not children:
items.append((path, text))
else:
items.append((path + ("value",), text))
# Repeated siblings get an index; singletons just use the tag
counts = Counter(child.tag for child in children)
index_counters: dict[str, int] = defaultdict(int)
for child in children:
tag = child.tag
if counts[tag] > 1:
idx = index_counters[tag]
index_counters[tag] += 1
child_path = path + (tag, str(idx))
else:
child_path = path + (tag,)
walk(child, child_path)
walk(root, ())
return items
def _split_xml_prolog(self, text: str) -> tuple[str, str]:
"""Split XML into (prolog, body)."""
i = 0
n = len(text)
prolog_parts: list[str] = []
while i < n:
while i < n and text[i].isspace():
prolog_parts.append(text[i])
i += 1
if i >= n:
break
if text.startswith("<?", i):
end = text.find("?>", i + 2)
if end == -1:
break
prolog_parts.append(text[i : end + 2])
i = end + 2
continue
if text.startswith("<!--", i):
end = text.find("-->", i + 4)
if end == -1:
break
prolog_parts.append(text[i : end + 3])
i = end + 3
continue
if text.startswith("<!DOCTYPE", i):
end = text.find(">", i + 9)
if end == -1:
break
prolog_parts.append(text[i : end + 1])
i = end + 1
continue
if text[i] == "<":
break
break
return "".join(prolog_parts), text[i:]
def _apply_jinja_to_xml_tree(
self,
role_prefix: str,
root: ET.Element,
loop_candidates: list[LoopCandidate] | None = None,
) -> None:
"""
Mutate XML tree in-place, replacing values with Jinja expressions.
If loop_candidates is provided, repeated elements matching a candidate
will be replaced with a {% for %} loop.
"""
# Build a map of loop paths for quick lookup
loop_paths = {}
if loop_candidates:
for candidate in loop_candidates:
loop_paths[candidate.path] = candidate
def walk(elem: ET.Element, path: tuple[str, ...]) -> None:
# Attributes (unless this element is in a loop)
for attr_name in list(elem.attrib.keys()):
attr_path = path + (f"@{attr_name}",)
var_name = self.make_var_name(role_prefix, attr_path)
elem.set(attr_name, f"{{{{ {var_name} }}}}")
# Children
children = [c for c in list(elem) if isinstance(c.tag, str)]
# Text content
text = (elem.text or "").strip()
if text:
if not elem.attrib and not children:
text_path = path
else:
text_path = path + ("value",)
var_name = self.make_var_name(role_prefix, text_path)
elem.text = f"{{{{ {var_name} }}}}"
# Handle children - check for loops first
counts = Counter(child.tag for child in children)
index_counters: dict[str, int] = defaultdict(int)
# Check each tag to see if it's a loop candidate
processed_tags = set()
for child in children:
tag = child.tag
# Skip if we've already processed this tag as a loop
if tag in processed_tags:
continue
child_path = path + (tag,)
# Check if this is a loop candidate
if child_path in loop_paths:
# Mark this tag as processed
processed_tags.add(tag)
# Remove all children with this tag
for child_to_remove in [c for c in children if c.tag == tag]:
elem.remove(child_to_remove)
# Create a loop comment/marker
# We'll handle the actual loop generation in text processing
loop_marker = ET.Comment(f"LOOP:{tag}")
elem.append(loop_marker)
elif counts[tag] > 1:
# Multiple children but not a loop candidate - use indexed paths
idx = index_counters[tag]
index_counters[tag] += 1
indexed_path = path + (tag, str(idx))
walk(child, indexed_path)
else:
# Single child
walk(child, child_path)
walk(root, ())
def _generate_xml_template_from_text(self, role_prefix: str, text: str) -> str:
"""Generate scalar-only Jinja2 template."""
prolog, body = self._split_xml_prolog(text)
parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314
parser.feed(body)
root = parser.close()
self._apply_jinja_to_xml_tree(role_prefix, root)
indent = getattr(ET, "indent", None)
if indent is not None:
indent(root, space=" ") # type: ignore[arg-type]
xml_body = ET.tostring(root, encoding="unicode")
return prolog + xml_body
def _generate_xml_template_with_loops_from_text(
self,
role_prefix: str,
text: str,
loop_candidates: list[LoopCandidate],
) -> str:
"""Generate Jinja2 template with for loops."""
prolog, body = self._split_xml_prolog(text)
# Parse with comments preserved
parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314
parser.feed(body)
root = parser.close()
# Apply Jinja transformations (including loop markers)
self._apply_jinja_to_xml_tree(role_prefix, root, loop_candidates)
# Convert to string
indent = getattr(ET, "indent", None)
if indent is not None:
indent(root, space=" ") # type: ignore[arg-type]
xml_body = ET.tostring(root, encoding="unicode")
# Post-process to replace loop markers with actual Jinja loops
xml_body = self._insert_xml_loops(xml_body, role_prefix, loop_candidates, root)
return prolog + xml_body
def _insert_xml_loops(
self,
xml_str: str,
role_prefix: str,
loop_candidates: list[LoopCandidate],
root: ET.Element,
) -> str:
"""
Post-process XML string to insert Jinja2 for loops.
This replaces <!--LOOP:tagname--> markers with actual loop constructs.
"""
# Build a sample element for each loop to use as template
lines = xml_str.split("\n")
result_lines = []
for line in lines:
# Check if this line contains a loop marker
if "<!--LOOP:" in line:
# Extract tag name from marker
start = line.find("<!--LOOP:") + 9
end = line.find("-->", start)
tag_name = line[start:end].strip()
# Find matching loop candidate
candidate = None
for cand in loop_candidates:
if cand.path and cand.path[-1] == tag_name:
candidate = cand
break
if candidate:
# Get indentation from current line
indent_level = len(line) - len(line.lstrip())
indent_str = " " * indent_level
# Generate loop variable name
collection_var = self.make_var_name(role_prefix, candidate.path)
item_var = candidate.loop_var
# Create sample element from first item
if candidate.items:
sample_elem = self._dict_to_xml_element(
tag_name, candidate.items[0], item_var
)
# Apply indentation to the sample element
ET.indent(sample_elem, space=" ")
# Convert sample to string
sample_str = ET.tostring(
sample_elem, encoding="unicode"
).strip()
# Add proper indentation to each line of the sample
sample_lines = sample_str.split("\n")
indented_sample_lines = [
(
f"{indent_str} {line}"
if i > 0
else f"{indent_str} {line}"
)
for i, line in enumerate(sample_lines)
]
indented_sample = "\n".join(indented_sample_lines)
# Build loop
result_lines.append(
f"{indent_str}{{% for {item_var} in {collection_var} %}}"
)
result_lines.append(indented_sample)
result_lines.append(f"{indent_str}{{% endfor %}}")
else:
# Keep the marker if we can't find the candidate
result_lines.append(line)
else:
result_lines.append(line)
return "\n".join(result_lines)
def _dict_to_xml_element(
self, tag: str, data: dict[str, Any], loop_var: str
) -> ET.Element:
"""
Convert a dict to an XML element with Jinja2 variable references.
Args:
tag: Element tag name
data: Dict representing element structure
loop_var: Loop variable name to use in Jinja expressions
"""
elem = ET.Element(tag)
# Handle attributes and child elements
for key, value in data.items():
if key.startswith("@"):
# Attribute
attr_name = key[1:] # Remove @ prefix
elem.set(attr_name, f"{{{{ {loop_var}.{attr_name} }}}}")
elif key == "_text":
# Simple text content
elem.text = f"{{{{ {loop_var} }}}}"
elif key == "value":
# Text with attributes/children
elem.text = f"{{{{ {loop_var}.value }}}}"
elif key == "_key":
# This is the dict key (for dict collections), skip in XML
pass
elif isinstance(value, dict):
# Nested element - check if it has _text
child = ET.SubElement(elem, key)
if "_text" in value:
child.text = f"{{{{ {loop_var}.{key}._text }}}}"
else:
# More complex nested structure
for sub_key, sub_val in value.items():
if not sub_key.startswith("_"):
grandchild = ET.SubElement(child, sub_key)
grandchild.text = f"{{{{ {loop_var}.{key}.{sub_key} }}}}"
elif not isinstance(value, list):
# Simple child element (scalar value)
child = ET.SubElement(elem, key)
child.text = f"{{{{ {loop_var}.{key} }}}}"
return elem

View file

@ -4,23 +4,29 @@ import yaml
from pathlib import Path
from typing import Any
from . import DictLikeHandler
from .dict import DictLikeHandler
from ..loop_analyzer import LoopCandidate
class YamlHandler(DictLikeHandler):
"""
YAML handler that can generate both scalar templates and loop-based templates.
"""
fmt = "yaml"
flatten_lists = True # you flatten YAML lists
flatten_lists = True
def parse(self, path: Path) -> Any:
text = path.read_text(encoding="utf-8")
return yaml.safe_load(text) or {}
def generate_template(
def generate_jinja2_template(
self,
parsed: Any,
role_prefix: str,
original_text: str | None = None,
) -> str:
"""Original scalar-only template generation."""
if original_text is not None:
return self._generate_yaml_template_from_text(role_prefix, original_text)
if not isinstance(parsed, (dict, list)):
@ -28,29 +34,41 @@ class YamlHandler(DictLikeHandler):
dumped = yaml.safe_dump(parsed, sort_keys=False)
return self._generate_yaml_template_from_text(role_prefix, dumped)
def generate_jinja2_template_with_loops(
    self,
    parsed: Any,
    role_prefix: str,
    original_text: str | None,
    loop_candidates: list[LoopCandidate],
) -> str:
    """
    Produce a YAML Jinja2 template that uses for-loops for loop candidates.

    When ``original_text`` is given it is templated directly; otherwise
    ``parsed`` is dumped to YAML (key order preserved) and that text is
    templated instead.

    Raises:
        TypeError: If ``original_text`` is None and ``parsed`` is neither a
            dict nor a list.
    """
    # Candidate paths collected once so the text generator can do set lookups.
    loop_paths = set()
    for cand in loop_candidates:
        loop_paths.add(cand.path)

    if original_text is None:
        if not isinstance(parsed, (dict, list)):
            raise TypeError("YAML parser result must be a dict or list")
        source_text = yaml.safe_dump(parsed, sort_keys=False)
    else:
        source_text = original_text

    return self._generate_yaml_template_with_loops_from_text(
        role_prefix, source_text, loop_candidates, loop_paths
    )
def _generate_yaml_template_from_text(
self,
role_prefix: str,
text: str,
) -> str:
"""
Generate a Jinja2 template for a YAML file, preserving comments and
blank lines by patching scalar values in-place.
This handles common "config-ish" YAML:
- top-level and nested mappings
- lists of scalars
- lists of small mapping objects
It does *not* aim to support all YAML edge cases (anchors, tags, etc.).
"""
"""Original scalar-only template generation (unchanged from base)."""
lines = text.splitlines(keepends=True)
out_lines: list[str] = []
# Simple indentation-based context stack: (indent, path, kind)
# kind is "map" or "seq".
stack: list[tuple[int, tuple[str, ...], str]] = []
# Track index per parent path for sequences
seq_counters: dict[tuple[str, ...], int] = {}
def current_path() -> tuple[str, ...]:
@ -60,7 +78,147 @@ class YamlHandler(DictLikeHandler):
stripped = raw_line.lstrip()
indent = len(raw_line) - len(stripped)
# Blank or pure comment lines unchanged
if not stripped or stripped.startswith("#"):
out_lines.append(raw_line)
continue
while stack and indent < stack[-1][0]:
stack.pop()
if ":" in stripped and not stripped.lstrip().startswith("- "):
key_part, rest = stripped.split(":", 1)
key = key_part.strip()
if not key:
out_lines.append(raw_line)
continue
rest_stripped = rest.lstrip(" \t")
value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"})
has_value = bool(value_candidate.strip())
if stack and stack[-1][0] == indent and stack[-1][2] == "map":
stack.pop()
path = current_path() + (key,)
stack.append((indent, path, "map"))
if not has_value:
out_lines.append(raw_line)
continue
value_part, comment_part = self._split_inline_comment(
rest_stripped, {"#"}
)
raw_value = value_part.strip()
var_name = self.make_var_name(role_prefix, path)
use_quotes = (
len(raw_value) >= 2
and raw_value[0] == raw_value[-1]
and raw_value[0] in {'"', "'"}
)
if use_quotes:
q = raw_value[0]
replacement = f"{q}{{{{ {var_name} }}}}{q}"
else:
replacement = f"{{{{ {var_name} }}}}"
leading = rest[: len(rest) - len(rest.lstrip(" \t"))]
new_stripped = f"{key}: {leading}{replacement}{comment_part}"
out_lines.append(
" " * indent
+ new_stripped
+ ("\n" if raw_line.endswith("\n") else "")
)
continue
if stripped.startswith("- "):
if not stack or stack[-1][0] != indent or stack[-1][2] != "seq":
parent_path = current_path()
stack.append((indent, parent_path, "seq"))
parent_path = stack[-1][1]
content = stripped[2:]
index = seq_counters.get(parent_path, 0)
seq_counters[parent_path] = index + 1
path = parent_path + (str(index),)
value_part, comment_part = self._split_inline_comment(content, {"#"})
raw_value = value_part.strip()
var_name = self.make_var_name(role_prefix, path)
use_quotes = (
len(raw_value) >= 2
and raw_value[0] == raw_value[-1]
and raw_value[0] in {'"', "'"}
)
if use_quotes:
q = raw_value[0]
replacement = f"{q}{{{{ {var_name} }}}}{q}"
else:
replacement = f"{{{{ {var_name} }}}}"
new_stripped = f"- {replacement}{comment_part}"
out_lines.append(
" " * indent
+ new_stripped
+ ("\n" if raw_line.endswith("\n") else "")
)
continue
out_lines.append(raw_line)
return "".join(out_lines)
def _generate_yaml_template_with_loops_from_text(
self,
role_prefix: str,
text: str,
loop_candidates: list[LoopCandidate],
loop_paths: set[tuple[str, ...]],
) -> str:
"""
Generate YAML template with Jinja2 for loops.
Strategy:
1. Parse YAML line-by-line maintaining context
2. When we encounter a path that's a loop candidate:
- Replace that section with a {% for %} loop
- Use the first item as template structure
3. Everything else gets scalar variable replacement
"""
lines = text.splitlines(keepends=True)
out_lines: list[str] = []
stack: list[tuple[int, tuple[str, ...], str]] = []
seq_counters: dict[tuple[str, ...], int] = {}
# Track which lines are part of loop sections (to skip them)
skip_until_indent: int | None = None
def current_path() -> tuple[str, ...]:
return stack[-1][1] if stack else ()
for raw_line in lines:
stripped = raw_line.lstrip()
indent = len(raw_line) - len(stripped)
# If we're skipping lines (inside a loop section), check if we can stop
if skip_until_indent is not None:
if (
indent <= skip_until_indent
and stripped
and not stripped.startswith("#")
):
skip_until_indent = None
else:
continue # Skip this line
# Blank or comment lines
if not stripped or stripped.startswith("#"):
out_lines.append(raw_line)
continue
@ -71,42 +229,45 @@ class YamlHandler(DictLikeHandler):
# --- Handle mapping key lines: "key:" or "key: value"
if ":" in stripped and not stripped.lstrip().startswith("- "):
# separate key and rest
key_part, rest = stripped.split(":", 1)
key = key_part.strip()
if not key:
out_lines.append(raw_line)
continue
# Is this just "key:" or "key: value"?
rest_stripped = rest.lstrip(" \t")
# Use the same inline-comment splitter to see if there's any real value
value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"})
has_value = bool(value_candidate.strip())
# Update stack/context: current mapping at this indent
# Replace any existing mapping at same indent
if stack and stack[-1][0] == indent and stack[-1][2] == "map":
stack.pop()
path = current_path() + (key,)
stack.append((indent, path, "map"))
# Check if this path is a loop candidate
if path in loop_paths:
# Find the matching candidate
candidate = next(c for c in loop_candidates if c.path == path)
# Generate loop
loop_str = self._generate_yaml_loop(candidate, role_prefix, indent)
out_lines.append(loop_str)
# Skip subsequent lines that are part of this collection
skip_until_indent = indent
continue
if not has_value:
# Just "key:" -> collection or nested structure begins on following lines.
out_lines.append(raw_line)
continue
# We have an inline scalar value on this same line.
# Separate value from inline comment
# Scalar value - replace with variable
value_part, comment_part = self._split_inline_comment(
rest_stripped, {"#"}
)
raw_value = value_part.strip()
var_name = self.make_var_name(role_prefix, path)
# Keep quote-style if original was quoted
use_quotes = (
len(raw_value) >= 2
and raw_value[0] == raw_value[-1]
@ -130,18 +291,30 @@ class YamlHandler(DictLikeHandler):
# --- Handle list items: "- value" or "- key: value"
if stripped.startswith("- "):
# Determine parent path
# If top of stack isn't sequence at this indent, push one using current path
if not stack or stack[-1][0] != indent or stack[-1][2] != "seq":
parent_path = current_path()
stack.append((indent, parent_path, "seq"))
parent_path = stack[-1][1]
content = stripped[2:] # after "- "
parent_path = stack[-1][1]
content = stripped[2:] # after "- "
# Determine index for this parent path
# Check if parent path is a loop candidate
if parent_path in loop_paths:
# Find the matching candidate
candidate = next(
c for c in loop_candidates if c.path == parent_path
)
# Generate loop (with indent for the '-' items)
loop_str = self._generate_yaml_loop(
candidate, role_prefix, indent, is_list=True
)
out_lines.append(loop_str)
# Skip subsequent items
skip_until_indent = indent - 1 if indent > 0 else None
continue
content = stripped[2:]
index = seq_counters.get(parent_path, 0)
seq_counters[parent_path] = index + 1
@ -151,8 +324,6 @@ class YamlHandler(DictLikeHandler):
raw_value = value_part.strip()
var_name = self.make_var_name(role_prefix, path)
# If it's of the form "key: value" inside the list, we could try to
# support that, but a simple scalar is the common case:
use_quotes = (
len(raw_value) >= 2
and raw_value[0] == raw_value[-1]
@ -173,7 +344,106 @@ class YamlHandler(DictLikeHandler):
)
continue
# Anything else (multi-line scalars, weird YAML): leave untouched
out_lines.append(raw_line)
return "".join(out_lines)
def _generate_yaml_loop(
self,
candidate: LoopCandidate,
role_prefix: str,
indent: int,
is_list: bool = False,
) -> str:
"""
Generate a Jinja2 for loop for a YAML collection.
Args:
candidate: Loop candidate with items and metadata
role_prefix: Variable prefix
indent: Indentation level in spaces
is_list: True if this is a YAML list, False if dict
Returns:
YAML string with Jinja2 loop
"""
indent_str = " " * indent
collection_var = self.make_var_name(role_prefix, candidate.path)
item_var = candidate.loop_var
lines = []
if not is_list:
# Dict-style: key: {% for ... %}
key = candidate.path[-1] if candidate.path else "items"
lines.append(f"{indent_str}{key}:")
lines.append(f"{indent_str} {{% for {item_var} in {collection_var} %}}")
else:
# List-style: just the loop
lines.append(f"{indent_str}{{% for {item_var} in {collection_var} %}}")
# Generate template for item structure
if candidate.items:
sample_item = candidate.items[0]
item_indent = indent + 2 if not is_list else indent
if candidate.item_schema == "scalar":
# Simple list of scalars
if is_list:
lines.append(f"{indent_str}- {{{{ {item_var} }}}}")
else:
lines.append(f"{indent_str} - {{{{ {item_var} }}}}")
elif candidate.item_schema in ("simple_dict", "nested"):
# List of dicts or complex items - these are ALWAYS list items in YAML
item_lines = self._dict_to_yaml_lines(
sample_item, item_var, item_indent, is_list_item=True
)
lines.extend(item_lines)
# Close loop
close_indent = indent + 2 if not is_list else indent
lines.append(f"{' ' * close_indent}{{% endfor %}}")
return "\n".join(lines) + "\n"
def _dict_to_yaml_lines(
self,
data: dict[str, Any],
loop_var: str,
indent: int,
is_list_item: bool = False,
) -> list[str]:
"""
Convert a dict to YAML lines with Jinja2 variable references.
Args:
data: Dict representing item structure
loop_var: Loop variable name
indent: Base indentation level
is_list_item: True if this should start with '-'
Returns:
List of YAML lines
"""
lines = []
indent_str = " " * indent
first_key = True
for key, value in data.items():
if key == "_key":
# Special key for dict collections - output as comment or skip
continue
if first_key and is_list_item:
# First key gets the list marker
lines.append(f"{indent_str}- {key}: {{{{ {loop_var}.{key} }}}}")
first_key = False
else:
# Subsequent keys are indented
sub_indent = indent + 2 if is_list_item else indent
lines.append(f"{' ' * sub_indent}{key}: {{{{ {loop_var}.{key} }}}}")
return lines

View file

@ -1,449 +0,0 @@
from __future__ import annotations
import yaml
from pathlib import Path
from typing import Any
from .dict import DictLikeHandler
from ..loop_analyzer import LoopCandidate
class YamlHandlerLoopable(DictLikeHandler):
"""
YAML handler that can generate both scalar templates and loop-based templates.

Scalar templates replace every value with its own variable; loop-based
templates additionally replace collections named by loop candidates with
Jinja2 ``for`` loops.
"""
# Format identifier for this handler.
fmt = "yaml"
# NOTE(review): presumably read by DictLikeHandler to decide whether list
# elements are flattened into indexed entries - confirm against the base class.
flatten_lists = True
def parse(self, path: Path) -> Any:
    """
    Read *path* as UTF-8 and parse it with ``yaml.safe_load``.

    A falsy parse result (for example an empty document) is replaced by an
    empty dict.
    """
    loaded = yaml.safe_load(path.read_text(encoding="utf-8"))
    return loaded or {}
def generate_template(
    self,
    parsed: Any,
    role_prefix: str,
    original_text: str | None = None,
) -> str:
    """
    Build the scalar-only YAML template (one variable per value, no loops).

    The original file text is templated when it is supplied; otherwise the
    parsed structure is dumped back to YAML (key order preserved) and that
    dump is templated instead.

    Raises:
        TypeError: if no text is supplied and ``parsed`` is neither a dict
            nor a list.
    """
    yaml_text = original_text
    if yaml_text is None:
        if not isinstance(parsed, (dict, list)):
            raise TypeError("YAML parser result must be a dict or list")
        yaml_text = yaml.safe_dump(parsed, sort_keys=False)
    return self._generate_yaml_template_from_text(role_prefix, yaml_text)
def generate_template_with_loops(
    self,
    parsed: Any,
    role_prefix: str,
    original_text: str | None,
    loop_candidates: list[LoopCandidate],
) -> str:
    """
    Build a YAML template that uses Jinja2 ``for`` loops for the given candidates.

    The original file text is templated when it is supplied; otherwise the
    parsed structure is dumped back to YAML (key order preserved) and that
    dump is templated instead.

    Raises:
        TypeError: if no text is supplied and ``parsed`` is neither a dict
            nor a list.
    """
    # Set of candidate paths, handed to the line walker for membership tests.
    loop_paths = {c.path for c in loop_candidates}

    source_text = original_text
    if source_text is None:
        if not isinstance(parsed, (dict, list)):
            raise TypeError("YAML parser result must be a dict or list")
        source_text = yaml.safe_dump(parsed, sort_keys=False)

    return self._generate_yaml_template_with_loops_from_text(
        role_prefix, source_text, loop_candidates, loop_paths
    )
def _generate_yaml_template_from_text(
self,
role_prefix: str,
text: str,
) -> str:
"""
Generate a scalar-only Jinja2 template from YAML text.

The text is walked line by line. Blank lines, comment lines and lines
that are neither ``key: ...`` nor ``- ...`` are copied through unchanged;
scalar values on mapping lines and list-item lines are replaced in place
by ``{{ variable }}`` placeholders, keeping surrounding quotes and any
inline comment.

Args:
role_prefix: Prefix passed to ``make_var_name`` for every variable.
text: YAML source text.

Returns:
The templated text.
"""
lines = text.splitlines(keepends=True)
out_lines: list[str] = []
# Indentation-based context stack of (indent, path, kind) entries,
# where kind is "map" or "seq".
stack: list[tuple[int, tuple[str, ...], str]] = []
# Next element index for each sequence, keyed by the sequence's parent path.
seq_counters: dict[tuple[str, ...], int] = {}
def current_path() -> tuple[str, ...]:
# Path of the innermost open context, or the empty path at top level.
return stack[-1][1] if stack else ()
for raw_line in lines:
stripped = raw_line.lstrip()
indent = len(raw_line) - len(stripped)
# Blank and pure-comment lines are copied through untouched.
if not stripped or stripped.startswith("#"):
out_lines.append(raw_line)
continue
# A dedent closes every context opened at a deeper indent.
while stack and indent < stack[-1][0]:
stack.pop()
# --- Mapping lines: "key:" or "key: value"
if ":" in stripped and not stripped.lstrip().startswith("- "):
# Split on the first colon only, so colons inside the value survive.
key_part, rest = stripped.split(":", 1)
key = key_part.strip()
if not key:
out_lines.append(raw_line)
continue
rest_stripped = rest.lstrip(" \t")
# NOTE(review): _split_inline_comment is defined outside this block; it is
# used here as returning a (value, comment) pair - confirm its exact contract.
value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"})
has_value = bool(value_candidate.strip())
# A sibling key at the same indent replaces the previous key's context.
if stack and stack[-1][0] == indent and stack[-1][2] == "map":
stack.pop()
path = current_path() + (key,)
stack.append((indent, path, "map"))
# Bare "key:" opens a nested structure; the line itself is kept as is.
if not has_value:
out_lines.append(raw_line)
continue
value_part, comment_part = self._split_inline_comment(
rest_stripped, {"#"}
)
raw_value = value_part.strip()
var_name = self.make_var_name(role_prefix, path)
# Keep the original quoting: a value wrapped in matching ' or " gets
# its placeholder wrapped in the same quote character.
use_quotes = (
len(raw_value) >= 2
and raw_value[0] == raw_value[-1]
and raw_value[0] in {'"', "'"}
)
if use_quotes:
q = raw_value[0]
replacement = f"{q}{{{{ {var_name} }}}}{q}"
else:
replacement = f"{{{{ {var_name} }}}}"
# Whitespace that stood between the colon and the value.
leading = rest[: len(rest) - len(rest.lstrip(" \t"))]
new_stripped = f"{key}: {leading}{replacement}{comment_part}"
out_lines.append(
" " * indent
+ new_stripped
+ ("\n" if raw_line.endswith("\n") else "")
)
continue
# --- List items: "- value"
if stripped.startswith("- "):
# Open a sequence context at this indent unless one is already on top.
if not stack or stack[-1][0] != indent or stack[-1][2] != "seq":
parent_path = current_path()
stack.append((indent, parent_path, "seq"))
parent_path = stack[-1][1]
content = stripped[2:]
# Elements are numbered per parent path; the index becomes the last path part.
index = seq_counters.get(parent_path, 0)
seq_counters[parent_path] = index + 1
path = parent_path + (str(index),)
# Everything after "- " is treated as one scalar, so an item written as
# "- key: value" is replaced as a whole by a single placeholder.
value_part, comment_part = self._split_inline_comment(content, {"#"})
raw_value = value_part.strip()
var_name = self.make_var_name(role_prefix, path)
use_quotes = (
len(raw_value) >= 2
and raw_value[0] == raw_value[-1]
and raw_value[0] in {'"', "'"}
)
if use_quotes:
q = raw_value[0]
replacement = f"{q}{{{{ {var_name} }}}}{q}"
else:
replacement = f"{{{{ {var_name} }}}}"
new_stripped = f"- {replacement}{comment_part}"
out_lines.append(
" " * indent
+ new_stripped
+ ("\n" if raw_line.endswith("\n") else "")
)
continue
# Any other line is left untouched.
out_lines.append(raw_line)
return "".join(out_lines)
def _generate_yaml_template_with_loops_from_text(
    self,
    role_prefix: str,
    text: str,
    loop_candidates: list[LoopCandidate],
    loop_paths: set[tuple[str, ...]],
) -> str:
    """
    Generate a YAML template that uses Jinja2 ``for`` loops.

    Strategy:
        1. Walk the YAML text line by line, tracking the current path with
           an indentation-based context stack.
        2. When a ``key:`` line or a ``- `` entry belongs to a path in
           ``loop_paths``, emit one ``{% for %}`` loop in its place and skip
           every remaining source line of that collection.
        3. Replace every other scalar value with its own variable.

    A collection's source lines are the lines indented deeper than the line
    that triggered the loop, plus ``- `` entries in the same column.  The
    latter covers both a top-level list and the compact style in which
    entries sit in the column of their parent key.  Blank and comment lines
    met while skipping are dropped with the collection.

    Args:
        role_prefix: Prefix passed to ``make_var_name`` for every variable.
        text: YAML source text.
        loop_candidates: Candidates to render as loops.
        loop_paths: Paths of ``loop_candidates``, for membership tests.

    Returns:
        The templated text.
    """
    lines = text.splitlines(keepends=True)
    out_lines: list[str] = []

    # Context stack of (indent, path, kind) entries; kind is "map" or "seq".
    stack: list[tuple[int, tuple[str, ...], str]] = []
    # Next element index for each sequence, keyed by the parent path.
    seq_counters: dict[tuple[str, ...], int] = {}

    # Column of the line that triggered the loop currently being skipped,
    # or None when no collection is being skipped.
    skip_indent: int | None = None

    def current_path() -> tuple[str, ...]:
        return stack[-1][1] if stack else ()

    def placeholder(raw_value: str, var_name: str) -> str:
        """Return ``{{ var_name }}``, wrapped in the quotes *raw_value* had."""
        quoted = (
            len(raw_value) >= 2
            and raw_value[0] == raw_value[-1]
            and raw_value[0] in {'"', "'"}
        )
        if quoted:
            q = raw_value[0]
            return f"{q}{{{{ {var_name} }}}}{q}"
        return f"{{{{ {var_name} }}}}"

    for raw_line in lines:
        stripped = raw_line.lstrip()
        indent = len(raw_line) - len(stripped)
        is_blank_or_comment = not stripped or stripped.startswith("#")
        newline = "\n" if raw_line.endswith("\n") else ""

        if skip_indent is not None:
            inside_collection = (
                is_blank_or_comment
                or indent > skip_indent
                or (indent == skip_indent and stripped.startswith("- "))
            )
            if inside_collection:
                continue
            skip_indent = None

        if is_blank_or_comment:
            out_lines.append(raw_line)
            continue

        # A dedent closes every context opened at a deeper indent.
        while stack and indent < stack[-1][0]:
            stack.pop()

        # --- Mapping lines: "key:" or "key: value"
        if ":" in stripped and not stripped.startswith("- "):
            key_part, rest = stripped.split(":", 1)
            key = key_part.strip()
            if not key:
                out_lines.append(raw_line)
                continue

            rest_stripped = rest.lstrip(" \t")
            value_part, comment_part = self._split_inline_comment(
                rest_stripped, {"#"}
            )

            # A sibling key at the same indent replaces the previous key.
            if stack and stack[-1][0] == indent and stack[-1][2] == "map":
                stack.pop()
            path = current_path() + (key,)
            stack.append((indent, path, "map"))

            if path in loop_paths:
                candidate = next(c for c in loop_candidates if c.path == path)
                out_lines.append(
                    self._generate_yaml_loop(candidate, role_prefix, indent)
                )
                skip_indent = indent
                continue

            raw_value = value_part.strip()
            if not raw_value:
                # Bare "key:" opens a nested structure on the next lines.
                out_lines.append(raw_line)
                continue

            var_name = self.make_var_name(role_prefix, path)
            # Whitespace that stood between the colon and the value.
            leading = rest[: len(rest) - len(rest_stripped)]
            replacement = placeholder(raw_value, var_name)
            out_lines.append(
                " " * indent
                + f"{key}: {leading}{replacement}{comment_part}"
                + newline
            )
            continue

        # --- List items: "- value"
        if stripped.startswith("- "):
            if not stack or stack[-1][0] != indent or stack[-1][2] != "seq":
                stack.append((indent, current_path(), "seq"))
            parent_path = stack[-1][1]

            if parent_path in loop_paths:
                candidate = next(
                    c for c in loop_candidates if c.path == parent_path
                )
                out_lines.append(
                    self._generate_yaml_loop(
                        candidate, role_prefix, indent, is_list=True
                    )
                )
                # Skip the remaining entries of this list, including when
                # the list starts in column 0.
                skip_indent = indent
                continue

            index = seq_counters.get(parent_path, 0)
            seq_counters[parent_path] = index + 1
            path = parent_path + (str(index),)

            value_part, comment_part = self._split_inline_comment(
                stripped[2:], {"#"}
            )
            var_name = self.make_var_name(role_prefix, path)
            replacement = placeholder(value_part.strip(), var_name)
            out_lines.append(
                " " * indent + f"- {replacement}{comment_part}" + newline
            )
            continue

        # Anything else (multi-line scalars, unusual YAML) is left untouched.
        out_lines.append(raw_line)

    return "".join(out_lines)
def _generate_yaml_loop(
self,
candidate: LoopCandidate,
role_prefix: str,
indent: int,
is_list: bool = False,
) -> str:
"""
Generate a Jinja2 for loop for a YAML collection.
Args:
candidate: Loop candidate with items and metadata
role_prefix: Variable prefix
indent: Indentation level in spaces
is_list: True if this is a YAML list, False if dict
Returns:
YAML string with Jinja2 loop
"""
indent_str = " " * indent
collection_var = self.make_var_name(role_prefix, candidate.path)
item_var = candidate.loop_var
lines = []
if not is_list:
# Dict-style: key: {% for ... %}
key = candidate.path[-1] if candidate.path else "items"
lines.append(f"{indent_str}{key}:")
lines.append(f"{indent_str} {{% for {item_var} in {collection_var} %}}")
else:
# List-style: just the loop
lines.append(f"{indent_str}{{% for {item_var} in {collection_var} %}}")
# Generate template for item structure
if candidate.items:
sample_item = candidate.items[0]
item_indent = indent + 2 if not is_list else indent
if candidate.item_schema == "scalar":
# Simple list of scalars
if is_list:
lines.append(f"{indent_str}- {{{{ {item_var} }}}}")
else:
lines.append(f"{indent_str} - {{{{ {item_var} }}}}")
elif candidate.item_schema in ("simple_dict", "nested"):
# List of dicts or complex items - these are ALWAYS list items in YAML
item_lines = self._dict_to_yaml_lines(
sample_item, item_var, item_indent, is_list_item=True
)
lines.extend(item_lines)
# Close loop
close_indent = indent + 2 if not is_list else indent
lines.append(f"{' ' * close_indent}{{% endfor %}}")
return "\n".join(lines) + "\n"
def _dict_to_yaml_lines(
self,
data: dict[str, Any],
loop_var: str,
indent: int,
is_list_item: bool = False,
) -> list[str]:
"""
Convert a dict to YAML lines with Jinja2 variable references.
Args:
data: Dict representing item structure
loop_var: Loop variable name
indent: Base indentation level
is_list_item: True if this should start with '-'
Returns:
List of YAML lines
"""
lines = []
indent_str = " " * indent
first_key = True
for key, value in data.items():
if key == "_key":
# Special key for dict collections - output as comment or skip
continue
if first_key and is_list_item:
# First key gets the list marker
lines.append(f"{indent_str}- {key}: {{{{ {loop_var}.{key} }}}}")
first_key = False
else:
# Subsequent keys are indented
sub_indent = indent + 2 if is_list_item else indent
lines.append(f"{' ' * sub_indent}{key}: {{{{ {loop_var}.{key} }}}}")
return lines

View file

@ -1,3 +1,10 @@
"""
Loop detection and analysis for intelligent Jinja2 template generation.
This module determines when config structures should use Jinja2 'for' loops
instead of flattened scalar variables.
"""
from __future__ import annotations
from collections import Counter
@ -373,7 +380,8 @@ class LoopAnalyzer:
# Allow some variation
all_attrs = set().union(*attr_sets)
common_attrs = set.intersection(*attr_sets) if attr_sets else set()
if len(common_attrs) / max(len(all_attrs), 1) < 0.7:
# Very permissive for attributes - 20% overlap is OK
if len(common_attrs) / max(len(all_attrs), 1) < 0.2:
return False
# Compare child element tags
@ -384,12 +392,16 @@ class LoopAnalyzer:
if child_tag_sets:
first_tags = child_tag_sets[0]
if not all(tags == first_tags for tags in child_tag_sets):
# Allow some variation
# Allow significant variation for XML - just need SOME commonality
# This is important for cases like OSSEC rules where each rule
# has different optional child elements (if_sid, url_pcre2, etc.)
all_tags = set().union(*child_tag_sets)
common_tags = (
set.intersection(*child_tag_sets) if child_tag_sets else set()
)
if len(common_tags) / max(len(all_tags), 1) < 0.7:
# Lower threshold to 20% - if they share at least 20% of tags, consider them similar
# Even if they just share 'description' or 'id' fields, that's enough
if len(common_tags) / max(len(all_tags), 1) < 0.2:
return False
return True

View file

@ -31,4 +31,4 @@ def test_base_handler_abstract_methods_raise_not_implemented(tmp_path: Path):
handler.flatten(object())
with pytest.raises(NotImplementedError):
handler.generate_template(parsed=object(), role_prefix="role")
handler.generate_jinja2_template(parsed=object(), role_prefix="role")

View file

@ -10,8 +10,8 @@ from jinjaturtle.core import (
detect_format,
parse_config,
flatten_config,
generate_defaults_yaml,
generate_template,
generate_ansible_yaml,
generate_jinja2_template,
make_var_name,
)
@ -90,9 +90,9 @@ def test_parse_config_unsupported_format(tmp_path: Path):
parse_config(cfg_path, fmt="bogus")
def test_generate_template_type_and_format_errors():
def test_generate_jinja2_template_type_and_format_errors():
"""
Exercise the error branches in generate_template:
Exercise the error branches in generate_jinja2_template:
- toml with non-dict parsed
- ini with non-ConfigParser parsed
- yaml with wrong parsed type
@ -101,27 +101,29 @@ def test_generate_template_type_and_format_errors():
"""
# wrong type for TOML
with pytest.raises(TypeError):
generate_template("toml", parsed="not a dict", role_prefix="role")
generate_jinja2_template("toml", parsed="not a dict", role_prefix="role")
# wrong type for INI
with pytest.raises(TypeError):
generate_template("ini", parsed={"not": "a configparser"}, role_prefix="role")
generate_jinja2_template(
"ini", parsed={"not": "a configparser"}, role_prefix="role"
)
# wrong type for YAML
with pytest.raises(TypeError):
generate_template("yaml", parsed=None, role_prefix="role")
generate_jinja2_template("yaml", parsed=None, role_prefix="role")
# wrong type for JSON
with pytest.raises(TypeError):
generate_template("json", parsed=None, role_prefix="role")
generate_jinja2_template("json", parsed=None, role_prefix="role")
# unsupported format, no original_text
with pytest.raises(ValueError):
generate_template("bogusfmt", parsed=None, role_prefix="role")
generate_jinja2_template("bogusfmt", parsed=None, role_prefix="role")
# unsupported format, with original_text
with pytest.raises(ValueError):
generate_template(
generate_jinja2_template(
"bogusfmt",
parsed=None,
role_prefix="role",
@ -135,8 +137,8 @@ def test_normalize_default_value_true_false_strings():
(("section", "foo"), "true"),
(("section", "bar"), "FALSE"),
]
defaults_yaml = generate_defaults_yaml("role", flat_items)
data = yaml.safe_load(defaults_yaml)
ansible_yaml = generate_ansible_yaml("role", flat_items)
data = yaml.safe_load(ansible_yaml)
assert data["role_section_foo"] == "true"
assert data["role_section_bar"] == "FALSE"
@ -167,14 +169,14 @@ def test_fallback_str_representer_for_unknown_type():
def test_normalize_default_value_bool_inputs_are_stringified():
"""
Real boolean values should be turned into quoted 'true'/'false' strings
by _normalize_default_value via generate_defaults_yaml.
by _normalize_default_value via generate_ansible_yaml.
"""
flat_items = [
(("section", "flag_true"), True),
(("section", "flag_false"), False),
]
defaults_yaml = generate_defaults_yaml("role", flat_items)
data = yaml.safe_load(defaults_yaml)
ansible_yaml = generate_ansible_yaml("role", flat_items)
data = yaml.safe_load(ansible_yaml)
assert data["role_section_flag_true"] == "true"
assert data["role_section_flag_false"] == "false"

View file

@ -8,8 +8,8 @@ import yaml
from jinjaturtle.core import (
parse_config,
flatten_config,
generate_defaults_yaml,
generate_template,
generate_ansible_yaml,
generate_jinja2_template,
)
from jinjaturtle.handlers.ini import IniHandler
@ -26,8 +26,8 @@ def test_ini_php_sample_roundtrip():
flat_items = flatten_config(fmt, parsed)
assert flat_items, "Expected at least one flattened item from php.ini sample"
defaults_yaml = generate_defaults_yaml("php", flat_items)
defaults = yaml.safe_load(defaults_yaml)
ansible_yaml = generate_ansible_yaml("php", flat_items)
defaults = yaml.safe_load(ansible_yaml)
# defaults should be a non-empty dict
assert isinstance(defaults, dict)
@ -41,7 +41,7 @@ def test_ini_php_sample_roundtrip():
# template generation
original_text = ini_path.read_text(encoding="utf-8")
template = generate_template(fmt, parsed, "php", original_text=original_text)
template = generate_jinja2_template(fmt, parsed, "php", original_text=original_text)
assert "; About this file" in template
assert isinstance(template, str)
assert template.strip(), "Template for php.ini sample should not be empty"
@ -53,16 +53,16 @@ def test_ini_php_sample_roundtrip():
), f"Variable {var_name} not referenced in INI template"
def test_generate_template_fallback_ini():
def test_generate_jinja2_template_fallback_ini():
"""
When original_text is not provided, generate_template should use the
When original_text is not provided, generate_jinja2_template should use the
structural fallback path for INI configs.
"""
parser = configparser.ConfigParser()
# foo is quoted in the INI text to hit the "preserve quotes" branch
parser["section"] = {"foo": '"bar"', "num": "42"}
tmpl_ini = generate_template("ini", parsed=parser, role_prefix="role")
tmpl_ini = generate_jinja2_template("ini", parsed=parser, role_prefix="role")
assert "[section]" in tmpl_ini
assert "role_section_foo" in tmpl_ini
assert '"{{ role_section_foo }}"' in tmpl_ini # came from quoted INI value

View file

@ -9,7 +9,7 @@ import yaml
from jinjaturtle.core import (
parse_config,
flatten_config,
generate_defaults_yaml,
generate_ansible_yaml,
)
from jinjaturtle.handlers.json import JsonHandler
@ -24,8 +24,8 @@ def test_json_roundtrip():
assert fmt == "json"
flat_items = flatten_config(fmt, parsed)
defaults_yaml = generate_defaults_yaml("foobar", flat_items)
defaults = yaml.safe_load(defaults_yaml)
ansible_yaml = generate_ansible_yaml("foobar", flat_items)
defaults = yaml.safe_load(ansible_yaml)
# Defaults: nested keys and list indices
assert defaults["foobar_foo"] == "bar"
@ -35,10 +35,12 @@ def test_json_roundtrip():
assert defaults["foobar_list_0"] == 10
assert defaults["foobar_list_1"] == 20
# Template generation is done via JsonHandler.generate_template; we just
# Template generation is done via JsonHandler.generate_jinja2_template; we just
# make sure it produces a structure with the expected placeholders.
handler = JsonHandler()
templated = json.loads(handler.generate_template(parsed, role_prefix="foobar"))
templated = json.loads(
handler.generate_jinja2_template(parsed, role_prefix="foobar")
)
assert templated["foo"] == "{{ foobar_foo }}"
assert "foobar_nested_a" in str(templated)
@ -47,10 +49,10 @@ def test_json_roundtrip():
assert "foobar_list_1" in str(templated)
def test_generate_template_json_type_error():
def test_generate_jinja2_template_json_type_error():
"""
Wrong type for JSON in JsonHandler.generate_template should raise TypeError.
Wrong type for JSON in JsonHandler.generate_jinja2_template should raise TypeError.
"""
handler = JsonHandler()
with pytest.raises(TypeError):
handler.generate_template(parsed="not a dict", role_prefix="role")
handler.generate_jinja2_template(parsed="not a dict", role_prefix="role")

View file

@ -8,8 +8,8 @@ import yaml
from jinjaturtle.core import (
parse_config,
flatten_config,
generate_defaults_yaml,
generate_template,
generate_ansible_yaml,
generate_jinja2_template,
)
from jinjaturtle.handlers.toml import TomlHandler
import jinjaturtle.handlers.toml as toml_module
@ -27,8 +27,8 @@ def test_toml_sample_roundtrip():
flat_items = flatten_config(fmt, parsed)
assert flat_items
defaults_yaml = generate_defaults_yaml("jinjaturtle", flat_items)
defaults = yaml.safe_load(defaults_yaml)
ansible_yaml = generate_ansible_yaml("jinjaturtle", flat_items)
defaults = yaml.safe_load(ansible_yaml)
# defaults should be a non-empty dict
assert isinstance(defaults, dict)
@ -42,7 +42,7 @@ def test_toml_sample_roundtrip():
# template generation **now with original_text**
original_text = toml_path.read_text(encoding="utf-8")
template = generate_template(
template = generate_jinja2_template(
fmt, parsed, "jinjaturtle", original_text=original_text
)
assert isinstance(template, str)
@ -72,9 +72,9 @@ def test_parse_config_toml_missing_tomllib(monkeypatch):
assert "tomllib/tomli is required" in str(exc.value)
def test_generate_template_fallback_toml():
def test_generate_jinja2_template_fallback_toml():
"""
When original_text is not provided, generate_template should use the
When original_text is not provided, generate_jinja2_template should use the
structural fallback path for TOML configs.
"""
parsed_toml = {
@ -84,7 +84,7 @@ def test_generate_template_fallback_toml():
"file": {"path": "/tmp/app.log"}
}, # nested table to hit recursive walk
}
tmpl_toml = generate_template("toml", parsed=parsed_toml, role_prefix="role")
tmpl_toml = generate_jinja2_template("toml", parsed=parsed_toml, role_prefix="role")
assert "[server]" in tmpl_toml
assert "role_server_port" in tmpl_toml
assert "[logging]" in tmpl_toml or "[logging.file]" in tmpl_toml

View file

@ -10,8 +10,8 @@ import yaml
from jinjaturtle.core import (
parse_config,
flatten_config,
generate_defaults_yaml,
generate_template,
generate_ansible_yaml,
generate_jinja2_template,
)
from jinjaturtle.handlers.xml import XmlHandler
@ -28,8 +28,8 @@ def test_xml_roundtrip_ossec_web_rules():
flat_items = flatten_config(fmt, parsed)
assert flat_items, "Expected at least one flattened item from XML sample"
defaults_yaml = generate_defaults_yaml("ossec", flat_items)
defaults = yaml.safe_load(defaults_yaml)
ansible_yaml = generate_ansible_yaml("ossec", flat_items)
defaults = yaml.safe_load(ansible_yaml)
# defaults should be a non-empty dict
assert isinstance(defaults, dict)
@ -55,7 +55,9 @@ def test_xml_roundtrip_ossec_web_rules():
# Template generation (preserving comments)
original_text = xml_path.read_text(encoding="utf-8")
template = generate_template(fmt, parsed, "ossec", original_text=original_text)
template = generate_jinja2_template(
fmt, parsed, "ossec", original_text=original_text
)
assert isinstance(template, str)
assert template.strip(), "Template for XML sample should not be empty"
@ -108,13 +110,13 @@ def test_generate_xml_template_from_text_edge_cases():
assert "role_child_1" in tmpl
def test_generate_template_xml_type_error():
def test_generate_jinja2_template_xml_type_error():
"""
Wrong type for XML in XmlHandler.generate_template should raise TypeError.
Wrong type for XML in XmlHandler.generate_jinja2_template should raise TypeError.
"""
handler = XmlHandler()
with pytest.raises(TypeError):
handler.generate_template(parsed="not an element", role_prefix="role")
handler.generate_jinja2_template(parsed="not an element", role_prefix="role")
def test_flatten_config_xml_type_error():
@ -125,9 +127,9 @@ def test_flatten_config_xml_type_error():
flatten_config("xml", parsed="not-an-element")
def test_generate_template_xml_structural_fallback():
def test_generate_jinja2_template_xml_structural_fallback():
"""
When original_text is not provided for XML, generate_template should use
When original_text is not provided for XML, generate_jinja2_template should use
the structural fallback path (ET.tostring + handler processing).
"""
xml_text = textwrap.dedent(
@ -140,7 +142,7 @@ def test_generate_template_xml_structural_fallback():
)
root = ET.fromstring(xml_text)
tmpl = generate_template("xml", parsed=root, role_prefix="role")
tmpl = generate_jinja2_template("xml", parsed=root, role_prefix="role")
# Root attribute path ("@attr",) -> role_attr
assert "role_attr" in tmpl

View file

@ -8,8 +8,8 @@ import yaml
from jinjaturtle.core import (
parse_config,
flatten_config,
generate_defaults_yaml,
generate_template,
generate_ansible_yaml,
generate_jinja2_template,
)
from jinjaturtle.handlers.yaml import YamlHandler
@ -24,8 +24,8 @@ def test_yaml_roundtrip_with_list_and_comment():
assert fmt == "yaml"
flat_items = flatten_config(fmt, parsed)
defaults_yaml = generate_defaults_yaml("foobar", flat_items)
defaults = yaml.safe_load(defaults_yaml)
ansible_yaml = generate_ansible_yaml("foobar", flat_items)
defaults = yaml.safe_load(ansible_yaml)
# Defaults: keys are flattened with indices
assert defaults["foobar_foo"] == "bar"
@ -34,7 +34,9 @@ def test_yaml_roundtrip_with_list_and_comment():
# Template generation (preserving comments)
original_text = yaml_path.read_text(encoding="utf-8")
template = generate_template(fmt, parsed, "foobar", original_text=original_text)
template = generate_jinja2_template(
fmt, parsed, "foobar", original_text=original_text
)
# Comment preserved
assert "# Top comment" in template
@ -86,14 +88,14 @@ def test_generate_yaml_template_from_text_edge_cases():
assert "role_list_1" in tmpl
def test_generate_template_yaml_structural_fallback():
def test_generate_jinja2_template_yaml_structural_fallback():
"""
When original_text is not provided for YAML, generate_template should use
When original_text is not provided for YAML, generate_jinja2_template should use
the structural fallback path (yaml.safe_dump + handler processing).
"""
parsed = {"outer": {"inner": "val"}}
tmpl = generate_template("yaml", parsed=parsed, role_prefix="role")
tmpl = generate_jinja2_template("yaml", parsed=parsed, role_prefix="role")
# We don't care about exact formatting, just that the expected variable
# name shows up, proving we went through the structural path.