From bd3f9bf8d26d1f6d56995d831c0e51c66e9a65dd Mon Sep 17 00:00:00 2001 From: Miguel Jacq Date: Thu, 27 Nov 2025 21:36:56 +1100 Subject: [PATCH 1/5] logo update --- jinjaturtle.svg | 2 -- 1 file changed, 2 deletions(-) diff --git a/jinjaturtle.svg b/jinjaturtle.svg index 4a0edb7..2e6fcf2 100644 --- a/jinjaturtle.svg +++ b/jinjaturtle.svg @@ -9,8 +9,6 @@ stroke-width="4"/> - Date: Thu, 27 Nov 2025 21:37:29 +1100 Subject: [PATCH 2/5] comment cleanup --- src/jinjaturtle/handlers/ini.py | 4 ++-- src/jinjaturtle/handlers/json.py | 2 +- src/jinjaturtle/handlers/yaml.py | 4 +--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/jinjaturtle/handlers/ini.py b/src/jinjaturtle/handlers/ini.py index 24bf44f..d18718a 100644 --- a/src/jinjaturtle/handlers/ini.py +++ b/src/jinjaturtle/handlers/ini.py @@ -72,8 +72,8 @@ class IniHandler(BaseHandler): def _generate_ini_template_from_text(self, role_prefix: str, text: str) -> str: """ - Generate a Jinja2 template for an INI/php.ini-style file, preserving - comments, blank lines, and section headers by patching values in-place. + Generate a Jinja2 template for an INI-style file, preserving comments, + blank lines, and section headers by patching values in-place. 
""" lines = text.splitlines(keepends=True) current_section: str | None = None diff --git a/src/jinjaturtle/handlers/json.py b/src/jinjaturtle/handlers/json.py index 5149238..544a9af 100644 --- a/src/jinjaturtle/handlers/json.py +++ b/src/jinjaturtle/handlers/json.py @@ -23,7 +23,7 @@ class JsonHandler(DictLikeHandler): ) -> str: if not isinstance(parsed, (dict, list)): raise TypeError("JSON parser result must be a dict or list") - # As before: ignore original_text and rebuild structurally + # Rebuild structurally return self._generate_json_template(role_prefix, parsed) def _generate_json_template(self, role_prefix: str, data: Any) -> str: diff --git a/src/jinjaturtle/handlers/yaml.py b/src/jinjaturtle/handlers/yaml.py index 2ebaf3e..f4b3fc5 100644 --- a/src/jinjaturtle/handlers/yaml.py +++ b/src/jinjaturtle/handlers/yaml.py @@ -9,7 +9,7 @@ from . import DictLikeHandler class YamlHandler(DictLikeHandler): fmt = "yaml" - flatten_lists = True # you flatten YAML lists + flatten_lists = True def parse(self, path: Path) -> Any: text = path.read_text(encoding="utf-8") @@ -97,8 +97,6 @@ class YamlHandler(DictLikeHandler): out_lines.append(raw_line) continue - # We have an inline scalar value on this same line. 
- # Separate value from inline comment value_part, comment_part = self._split_inline_comment( rest_stripped, {"#"} From 2db80cc6e12f600eac9f4635d5e2ef09bba09bd3 Mon Sep 17 00:00:00 2001 From: Miguel Jacq Date: Fri, 28 Nov 2025 12:14:17 +1100 Subject: [PATCH 3/5] Add ability to generate 'loops' in Jinja if the XML or YAML config supports it --- src/jinjaturtle/cli.py | 24 +- src/jinjaturtle/core.py | 123 ++++-- src/jinjaturtle/handlers/__init__.py | 4 + src/jinjaturtle/handlers/ini.py | 4 +- src/jinjaturtle/handlers/json.py | 2 +- src/jinjaturtle/handlers/xml_loopable.py | 405 +++++++++++++++++++ src/jinjaturtle/handlers/yaml.py | 4 +- src/jinjaturtle/handlers/yaml_loopable.py | 449 ++++++++++++++++++++++ src/jinjaturtle/loop_analyzer.py | 433 +++++++++++++++++++++ 9 files changed, 1411 insertions(+), 37 deletions(-) create mode 100644 src/jinjaturtle/handlers/xml_loopable.py create mode 100644 src/jinjaturtle/handlers/yaml_loopable.py create mode 100644 src/jinjaturtle/loop_analyzer.py diff --git a/src/jinjaturtle/cli.py b/src/jinjaturtle/cli.py index ce096c4..032aa7e 100644 --- a/src/jinjaturtle/cli.py +++ b/src/jinjaturtle/cli.py @@ -7,6 +7,7 @@ from pathlib import Path from .core import ( parse_config, + analyze_loops, flatten_config, generate_defaults_yaml, generate_template, @@ -53,12 +54,27 @@ def _main(argv: list[str] | None = None) -> int: args = parser.parse_args(argv) config_path = Path(args.config) - fmt, parsed = parse_config(config_path, args.format) - flat_items = flatten_config(fmt, parsed) - defaults_yaml = generate_defaults_yaml(args.role_name, flat_items) config_text = config_path.read_text(encoding="utf-8") + + # Parse the config + fmt, parsed = parse_config(config_path, args.format) + + # Analyze for loops + loop_candidates = analyze_loops(fmt, parsed) + + # Flatten config (excluding loop paths if loops are detected) + flat_items = flatten_config(fmt, parsed, loop_candidates) + + # Generate defaults YAML (with loop collections if detected) + 
defaults_yaml = generate_defaults_yaml(args.role_name, flat_items, loop_candidates) + + # Generate template (with loops if detected) template_str = generate_template( - fmt, parsed, args.role_name, original_text=config_text + fmt, + parsed, + args.role_name, + original_text=config_text, + loop_candidates=loop_candidates, ) if args.defaults_output: diff --git a/src/jinjaturtle/core.py b/src/jinjaturtle/core.py index 3fc46c5..b0c24b7 100644 --- a/src/jinjaturtle/core.py +++ b/src/jinjaturtle/core.py @@ -5,6 +5,7 @@ from typing import Any, Iterable import yaml +from .loop_analyzer import LoopAnalyzer, LoopCandidate from .handlers import ( BaseHandler, IniHandler, @@ -12,25 +13,30 @@ from .handlers import ( TomlHandler, YamlHandler, XmlHandler, + YamlHandlerLoopable, + XmlHandlerLoopable, ) class QuotedString(str): - """Marker type for strings that must be double-quoted in YAML output.""" + """ + Marker type for strings that must be double-quoted in YAML output. + """ pass def _fallback_str_representer(dumper: yaml.SafeDumper, data: Any): """ - Fallback for objects the dumper doesn't know about. Represent them as - plain strings. + Fallback for objects the dumper doesn't know about. """ return dumper.represent_scalar("tag:yaml.org,2002:str", str(data)) class _TurtleDumper(yaml.SafeDumper): - """Custom YAML dumper that always double-quotes QuotedString values.""" + """ + Custom YAML dumper that always double-quotes QuotedString values. 
+ """ pass @@ -42,6 +48,7 @@ def _quoted_str_representer(dumper: yaml.SafeDumper, data: QuotedString): _TurtleDumper.add_representer(QuotedString, _quoted_str_representer) # Use our fallback for any unknown object types _TurtleDumper.add_representer(None, _fallback_str_representer) + _HANDLERS: dict[str, BaseHandler] = {} _INI_HANDLER = IniHandler() @@ -49,6 +56,9 @@ _JSON_HANDLER = JsonHandler() _TOML_HANDLER = TomlHandler() _YAML_HANDLER = YamlHandler() _XML_HANDLER = XmlHandler() +_YAML_HANDLER_LOOPABLE = YamlHandlerLoopable() +_XML_HANDLER_LOOPABLE = XmlHandlerLoopable() + _HANDLERS["ini"] = _INI_HANDLER _HANDLERS["json"] = _JSON_HANDLER _HANDLERS["toml"] = _TOML_HANDLER @@ -57,17 +67,15 @@ _HANDLERS["xml"] = _XML_HANDLER def make_var_name(role_prefix: str, path: Iterable[str]) -> str: - """Wrapper for :meth:`BaseHandler.make_var_name`. - - This keeps the public API (and tests) working while the implementation - lives on the BaseHandler class. + """ + Wrapper for :meth:`BaseHandler.make_var_name`. """ return BaseHandler.make_var_name(role_prefix, path) def detect_format(path: Path, explicit: str | None = None) -> str: """ - Determine config format (toml, yaml, json, ini-ish, xml) from argument or filename. + Determine config format from argument or filename. """ if explicit: return explicit @@ -99,27 +107,66 @@ def parse_config(path: Path, fmt: str | None = None) -> tuple[str, Any]: return fmt, parsed -def flatten_config(fmt: str, parsed: Any) -> list[tuple[tuple[str, ...], Any]]: +def analyze_loops(fmt: str, parsed: Any) -> list[LoopCandidate]: """ - Flatten parsed config into a list of (path_tuple, value). + Analyze parsed config to find loop opportunities. 
+ """ + analyzer = LoopAnalyzer() + candidates = analyzer.analyze(parsed, fmt) + + # Filter by confidence threshold + return [c for c in candidates if c.confidence >= LoopAnalyzer.MIN_CONFIDENCE] + + +def flatten_config( + fmt: str, parsed: Any, loop_candidates: list[LoopCandidate] | None = None +) -> list[tuple[tuple[str, ...], Any]]: + """ + Flatten parsed config into (path, value) pairs. + + If loop_candidates is provided, paths within those loops are excluded + from flattening (they'll be handled via loops in the template). """ handler = _HANDLERS.get(fmt) if handler is None: - # preserve previous ValueError for unsupported formats raise ValueError(f"Unsupported format: {fmt}") - return handler.flatten(parsed) + + all_items = handler.flatten(parsed) + + if not loop_candidates: + return all_items + + # Build set of paths to exclude (anything under a loop path) + excluded_prefixes = {candidate.path for candidate in loop_candidates} + + # Filter out items that fall under loop paths + filtered_items = [] + for item_path, value in all_items: + # Check if this path starts with any loop path + is_excluded = False + for loop_path in excluded_prefixes: + if _path_starts_with(item_path, loop_path): + is_excluded = True + break + + if not is_excluded: + filtered_items.append((item_path, value)) + + return filtered_items + + +def _path_starts_with(path: tuple[str, ...], prefix: tuple[str, ...]) -> bool: + """Check if path starts with prefix.""" + if len(path) < len(prefix): + return False + return path[: len(prefix)] == prefix def _normalize_default_value(value: Any) -> Any: """ - Ensure that 'true' / 'false' end up as quoted strings in YAML, not booleans. - - - bool -> QuotedString("true"/"false") - - "true"/"false" (any case) -> QuotedString(original_text) - - everything else -> unchanged + Ensure that 'true' / 'false' end up as quoted strings in YAML. """ if isinstance(value, bool): - # YAML booleans are lower-case; we keep them as strings. 
return QuotedString("true" if value else "false") if isinstance(value, str) and value.lower() in {"true", "false"}: return QuotedString(value) @@ -129,19 +176,24 @@ def _normalize_default_value(value: Any) -> Any: def generate_defaults_yaml( role_prefix: str, flat_items: list[tuple[tuple[str, ...], Any]], + loop_candidates: list[LoopCandidate] | None = None, ) -> str: """ - Create YAML for defaults/main.yml from flattened items. - - Boolean/boolean-like values ("true"/"false") are forced to be *strings* - and double-quoted in the resulting YAML so that Ansible does not coerce - them back into Python booleans. + Create Ansible YAML for defaults/main.yml. """ defaults: dict[str, Any] = {} + + # Add scalar variables for path, value in flat_items: var_name = make_var_name(role_prefix, path) defaults[var_name] = _normalize_default_value(value) + # Add loop collections + if loop_candidates: + for candidate in loop_candidates: + var_name = make_var_name(role_prefix, candidate.path) + defaults[var_name] = candidate.items + return yaml.dump( defaults, Dumper=_TurtleDumper, @@ -158,16 +210,29 @@ def generate_template( parsed: Any, role_prefix: str, original_text: str | None = None, + loop_candidates: list[LoopCandidate] | None = None, ) -> str: """ Generate a Jinja2 template for the config. - - If original_text is provided, comments and blank lines are preserved by - patching values in-place. Otherwise we fall back to reconstructing from - the parsed structure (no comments). JSON of course does not support - comments. 
""" + # Use enhanced handler if we have loop candidates handler = _HANDLERS.get(fmt) + + if loop_candidates and fmt in ("yaml", "xml"): + # Use enhanced handlers for YAML and XML when we have loops + if fmt == "yaml": + handler = _YAML_HANDLER_LOOPABLE + elif fmt == "xml": + handler = _XML_HANDLER_LOOPABLE + if handler is None: raise ValueError(f"Unsupported format: {fmt}") + + # Check if handler supports loop-aware generation + if hasattr(handler, "generate_template_with_loops") and loop_candidates: + return handler.generate_template_with_loops( + parsed, role_prefix, original_text, loop_candidates + ) + + # Fallback to original scalar-only generation return handler.generate_template(parsed, role_prefix, original_text=original_text) diff --git a/src/jinjaturtle/handlers/__init__.py b/src/jinjaturtle/handlers/__init__.py index 6bbcba1..4bb73cf 100644 --- a/src/jinjaturtle/handlers/__init__.py +++ b/src/jinjaturtle/handlers/__init__.py @@ -7,6 +7,8 @@ from .json import JsonHandler from .toml import TomlHandler from .yaml import YamlHandler from .xml import XmlHandler +from .xml_loopable import XmlHandlerLoopable +from .yaml_loopable import YamlHandlerLoopable __all__ = [ "BaseHandler", @@ -16,4 +18,6 @@ __all__ = [ "TomlHandler", "YamlHandler", "XmlHandler", + "XmlHandlerLoopable", + "YamlHandlerLoopable", ] diff --git a/src/jinjaturtle/handlers/ini.py b/src/jinjaturtle/handlers/ini.py index d18718a..24bf44f 100644 --- a/src/jinjaturtle/handlers/ini.py +++ b/src/jinjaturtle/handlers/ini.py @@ -72,8 +72,8 @@ class IniHandler(BaseHandler): def _generate_ini_template_from_text(self, role_prefix: str, text: str) -> str: """ - Generate a Jinja2 template for an INI-style file, preserving comments, - blank lines, and section headers by patching values in-place. + Generate a Jinja2 template for an INI/php.ini-style file, preserving + comments, blank lines, and section headers by patching values in-place. 
""" lines = text.splitlines(keepends=True) current_section: str | None = None diff --git a/src/jinjaturtle/handlers/json.py b/src/jinjaturtle/handlers/json.py index 544a9af..5149238 100644 --- a/src/jinjaturtle/handlers/json.py +++ b/src/jinjaturtle/handlers/json.py @@ -23,7 +23,7 @@ class JsonHandler(DictLikeHandler): ) -> str: if not isinstance(parsed, (dict, list)): raise TypeError("JSON parser result must be a dict or list") - # Rebuild structurally + # As before: ignore original_text and rebuild structurally return self._generate_json_template(role_prefix, parsed) def _generate_json_template(self, role_prefix: str, data: Any) -> str: diff --git a/src/jinjaturtle/handlers/xml_loopable.py b/src/jinjaturtle/handlers/xml_loopable.py new file mode 100644 index 0000000..d2922aa --- /dev/null +++ b/src/jinjaturtle/handlers/xml_loopable.py @@ -0,0 +1,405 @@ +from __future__ import annotations + +from collections import Counter, defaultdict +from pathlib import Path +from typing import Any +import xml.etree.ElementTree as ET # nosec + +from .base import BaseHandler +from ..loop_analyzer import LoopCandidate + + +class XmlHandlerLoopable(BaseHandler): + """ + XML handler that can generate both scalar templates and loop-based templates. 
+ """ + + fmt = "xml" + + def parse(self, path: Path) -> ET.Element: + text = path.read_text(encoding="utf-8") + parser = ET.XMLParser( + target=ET.TreeBuilder(insert_comments=False) + ) # nosec B314 + parser.feed(text) + root = parser.close() + return root + + def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]: + if not isinstance(parsed, ET.Element): + raise TypeError("XML parser result must be an Element") + return self._flatten_xml(parsed) + + def generate_template( + self, + parsed: Any, + role_prefix: str, + original_text: str | None = None, + ) -> str: + """Original scalar-only template generation.""" + if original_text is not None: + return self._generate_xml_template_from_text(role_prefix, original_text) + if not isinstance(parsed, ET.Element): + raise TypeError("XML parser result must be an Element") + xml_str = ET.tostring(parsed, encoding="unicode") + return self._generate_xml_template_from_text(role_prefix, xml_str) + + def generate_template_with_loops( + self, + parsed: Any, + role_prefix: str, + original_text: str | None, + loop_candidates: list[LoopCandidate], + ) -> str: + """Generate template with Jinja2 for loops where appropriate.""" + + if original_text is not None: + return self._generate_xml_template_with_loops_from_text( + role_prefix, original_text, loop_candidates + ) + + if not isinstance(parsed, ET.Element): + raise TypeError("XML parser result must be an Element") + + xml_str = ET.tostring(parsed, encoding="unicode") + return self._generate_xml_template_with_loops_from_text( + role_prefix, xml_str, loop_candidates + ) + + def _flatten_xml(self, root: ET.Element) -> list[tuple[tuple[str, ...], Any]]: + """Flatten an XML tree into (path, value) pairs.""" + items: list[tuple[tuple[str, ...], Any]] = [] + + def walk(elem: ET.Element, path: tuple[str, ...]) -> None: + # Attributes + for attr_name, attr_val in elem.attrib.items(): + attr_path = path + (f"@{attr_name}",) + items.append((attr_path, attr_val)) + + # Children + 
children = [c for c in list(elem) if isinstance(c.tag, str)] + + # Text content + text = (elem.text or "").strip() + if text: + if not elem.attrib and not children: + items.append((path, text)) + else: + items.append((path + ("value",), text)) + + # Repeated siblings get an index; singletons just use the tag + counts = Counter(child.tag for child in children) + index_counters: dict[str, int] = defaultdict(int) + + for child in children: + tag = child.tag + if counts[tag] > 1: + idx = index_counters[tag] + index_counters[tag] += 1 + child_path = path + (tag, str(idx)) + else: + child_path = path + (tag,) + walk(child, child_path) + + walk(root, ()) + return items + + def _split_xml_prolog(self, text: str) -> tuple[str, str]: + """Split XML into (prolog, body).""" + i = 0 + n = len(text) + prolog_parts: list[str] = [] + + while i < n: + while i < n and text[i].isspace(): + prolog_parts.append(text[i]) + i += 1 + if i >= n: + break + + if text.startswith("<?", i): + end = text.find("?>", i + 2) + if end == -1: + break + prolog_parts.append(text[i : end + 2]) + i = end + 2 + continue + + if text.startswith("<!--", i): + end = text.find("-->", i + 4) + if end == -1: + break + prolog_parts.append(text[i : end + 3]) + i = end + 3 + continue + + if text.startswith("<!DOCTYPE", i): + end = text.find(">", i + 9) + if end == -1: + break + prolog_parts.append(text[i : end + 1]) + i = end + 1 + continue + + if text[i] == "<": + break + + break + + return "".join(prolog_parts), text[i:] + + def _apply_jinja_to_xml_tree( + self, + role_prefix: str, + root: ET.Element, + loop_candidates: list[LoopCandidate] | None = None, + ) -> None: + """ + Mutate XML tree in-place, replacing values with Jinja expressions. + + If loop_candidates is provided, repeated elements matching a candidate + will be replaced with a {% for %} loop. 
+ """ + + # Build a map of loop paths for quick lookup + loop_paths = {} + if loop_candidates: + for candidate in loop_candidates: + loop_paths[candidate.path] = candidate + + def walk(elem: ET.Element, path: tuple[str, ...]) -> None: + # Attributes (unless this element is in a loop) + for attr_name in list(elem.attrib.keys()): + attr_path = path + (f"@{attr_name}",) + var_name = self.make_var_name(role_prefix, attr_path) + elem.set(attr_name, f"{{{{ {var_name} }}}}") + + # Children + children = [c for c in list(elem) if isinstance(c.tag, str)] + + # Text content + text = (elem.text or "").strip() + if text: + if not elem.attrib and not children: + text_path = path + else: + text_path = path + ("value",) + var_name = self.make_var_name(role_prefix, text_path) + elem.text = f"{{{{ {var_name} }}}}" + + # Handle children - check for loops first + counts = Counter(child.tag for child in children) + index_counters: dict[str, int] = defaultdict(int) + + # Check each tag to see if it's a loop candidate + processed_tags = set() + + for child in children: + tag = child.tag + + # Skip if we've already processed this tag as a loop + if tag in processed_tags: + continue + + child_path = path + (tag,) + + # Check if this is a loop candidate + if child_path in loop_paths: + # Mark this tag as processed + processed_tags.add(tag) + + # Remove all children with this tag + for child_to_remove in [c for c in children if c.tag == tag]: + elem.remove(child_to_remove) + + # Create a loop comment/marker + # We'll handle the actual loop generation in text processing + loop_marker = ET.Comment(f"LOOP:{tag}") + elem.append(loop_marker) + + elif counts[tag] > 1: + # Multiple children but not a loop candidate - use indexed paths + idx = index_counters[tag] + index_counters[tag] += 1 + indexed_path = path + (tag, str(idx)) + walk(child, indexed_path) + else: + # Single child + walk(child, child_path) + + walk(root, ()) + + def _generate_xml_template_from_text(self, role_prefix: str, text: str) 
-> str: + """Generate scalar-only Jinja2 template.""" + prolog, body = self._split_xml_prolog(text) + + parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314 + parser.feed(body) + root = parser.close() + + self._apply_jinja_to_xml_tree(role_prefix, root) + + indent = getattr(ET, "indent", None) + if indent is not None: + indent(root, space=" ") # type: ignore[arg-type] + + xml_body = ET.tostring(root, encoding="unicode") + return prolog + xml_body + + def _generate_xml_template_with_loops_from_text( + self, + role_prefix: str, + text: str, + loop_candidates: list[LoopCandidate], + ) -> str: + """Generate Jinja2 template with for loops.""" + + prolog, body = self._split_xml_prolog(text) + + # Parse with comments preserved + parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314 + parser.feed(body) + root = parser.close() + + # Apply Jinja transformations (including loop markers) + self._apply_jinja_to_xml_tree(role_prefix, root, loop_candidates) + + # Convert to string + indent = getattr(ET, "indent", None) + if indent is not None: + indent(root, space=" ") # type: ignore[arg-type] + + xml_body = ET.tostring(root, encoding="unicode") + + # Post-process to replace loop markers with actual Jinja loops + xml_body = self._insert_xml_loops(xml_body, role_prefix, loop_candidates, root) + + return prolog + xml_body + + def _insert_xml_loops( + self, + xml_str: str, + role_prefix: str, + loop_candidates: list[LoopCandidate], + root: ET.Element, + ) -> str: + """ + Post-process XML string to insert Jinja2 for loops. + + This replaces markers with actual loop constructs. 
+ """ + + # Build a sample element for each loop to use as template + lines = xml_str.split("\n") + result_lines = [] + + for line in lines: + # Check if this line contains a loop marker + if "", start) + tag_name = line[start:end].strip() + + # Find matching loop candidate + candidate = None + for cand in loop_candidates: + if cand.path and cand.path[-1] == tag_name: + candidate = cand + break + + if candidate: + # Get indentation from current line + indent_level = len(line) - len(line.lstrip()) + indent_str = " " * indent_level + + # Generate loop variable name + collection_var = self.make_var_name(role_prefix, candidate.path) + item_var = candidate.loop_var + + # Create sample element from first item + if candidate.items: + sample_elem = self._dict_to_xml_element( + tag_name, candidate.items[0], item_var + ) + + # Apply indentation to the sample element + ET.indent(sample_elem, space=" ") + + # Convert sample to string + sample_str = ET.tostring( + sample_elem, encoding="unicode" + ).strip() + + # Add proper indentation to each line of the sample + sample_lines = sample_str.split("\n") + indented_sample_lines = [ + ( + f"{indent_str} {line}" + if i > 0 + else f"{indent_str} {line}" + ) + for i, line in enumerate(sample_lines) + ] + indented_sample = "\n".join(indented_sample_lines) + + # Build loop + result_lines.append( + f"{indent_str}{{% for {item_var} in {collection_var} %}}" + ) + result_lines.append(indented_sample) + result_lines.append(f"{indent_str}{{% endfor %}}") + else: + # Keep the marker if we can't find the candidate + result_lines.append(line) + else: + result_lines.append(line) + + return "\n".join(result_lines) + + def _dict_to_xml_element( + self, tag: str, data: dict[str, Any], loop_var: str + ) -> ET.Element: + """ + Convert a dict to an XML element with Jinja2 variable references. 
+ + Args: + tag: Element tag name + data: Dict representing element structure + loop_var: Loop variable name to use in Jinja expressions + """ + + elem = ET.Element(tag) + + # Handle attributes and child elements + for key, value in data.items(): + if key.startswith("@"): + # Attribute + attr_name = key[1:] # Remove @ prefix + elem.set(attr_name, f"{{{{ {loop_var}.{attr_name} }}}}") + elif key == "_text": + # Simple text content + elem.text = f"{{{{ {loop_var} }}}}" + elif key == "value": + # Text with attributes/children + elem.text = f"{{{{ {loop_var}.value }}}}" + elif key == "_key": + # This is the dict key (for dict collections), skip in XML + pass + elif isinstance(value, dict): + # Nested element - check if it has _text + child = ET.SubElement(elem, key) + if "_text" in value: + child.text = f"{{{{ {loop_var}.{key}._text }}}}" + else: + # More complex nested structure + for sub_key, sub_val in value.items(): + if not sub_key.startswith("_"): + grandchild = ET.SubElement(child, sub_key) + grandchild.text = f"{{{{ {loop_var}.{key}.{sub_key} }}}}" + elif not isinstance(value, list): + # Simple child element (scalar value) + child = ET.SubElement(elem, key) + child.text = f"{{{{ {loop_var}.{key} }}}}" + + return elem diff --git a/src/jinjaturtle/handlers/yaml.py b/src/jinjaturtle/handlers/yaml.py index f4b3fc5..2ebaf3e 100644 --- a/src/jinjaturtle/handlers/yaml.py +++ b/src/jinjaturtle/handlers/yaml.py @@ -9,7 +9,7 @@ from . import DictLikeHandler class YamlHandler(DictLikeHandler): fmt = "yaml" - flatten_lists = True + flatten_lists = True # you flatten YAML lists def parse(self, path: Path) -> Any: text = path.read_text(encoding="utf-8") @@ -97,6 +97,8 @@ class YamlHandler(DictLikeHandler): out_lines.append(raw_line) continue + # We have an inline scalar value on this same line. 
+ # Separate value from inline comment value_part, comment_part = self._split_inline_comment( rest_stripped, {"#"} diff --git a/src/jinjaturtle/handlers/yaml_loopable.py b/src/jinjaturtle/handlers/yaml_loopable.py new file mode 100644 index 0000000..2cc66a9 --- /dev/null +++ b/src/jinjaturtle/handlers/yaml_loopable.py @@ -0,0 +1,449 @@ +from __future__ import annotations + +import yaml +from pathlib import Path +from typing import Any + +from .dict import DictLikeHandler +from ..loop_analyzer import LoopCandidate + + +class YamlHandlerLoopable(DictLikeHandler): + """ + YAML handler that can generate both scalar templates and loop-based templates. + """ + + fmt = "yaml" + flatten_lists = True + + def parse(self, path: Path) -> Any: + text = path.read_text(encoding="utf-8") + return yaml.safe_load(text) or {} + + def generate_template( + self, + parsed: Any, + role_prefix: str, + original_text: str | None = None, + ) -> str: + """Original scalar-only template generation.""" + if original_text is not None: + return self._generate_yaml_template_from_text(role_prefix, original_text) + if not isinstance(parsed, (dict, list)): + raise TypeError("YAML parser result must be a dict or list") + dumped = yaml.safe_dump(parsed, sort_keys=False) + return self._generate_yaml_template_from_text(role_prefix, dumped) + + def generate_template_with_loops( + self, + parsed: Any, + role_prefix: str, + original_text: str | None, + loop_candidates: list[LoopCandidate], + ) -> str: + """Generate template with Jinja2 for loops where appropriate.""" + + # Build loop path set for quick lookup + loop_paths = {candidate.path for candidate in loop_candidates} + + if original_text is not None: + return self._generate_yaml_template_with_loops_from_text( + role_prefix, original_text, loop_candidates, loop_paths + ) + + if not isinstance(parsed, (dict, list)): + raise TypeError("YAML parser result must be a dict or list") + + dumped = yaml.safe_dump(parsed, sort_keys=False) + return 
self._generate_yaml_template_with_loops_from_text( + role_prefix, dumped, loop_candidates, loop_paths + ) + + def _generate_yaml_template_from_text( + self, + role_prefix: str, + text: str, + ) -> str: + """Original scalar-only template generation (unchanged from base).""" + lines = text.splitlines(keepends=True) + out_lines: list[str] = [] + + stack: list[tuple[int, tuple[str, ...], str]] = [] + seq_counters: dict[tuple[str, ...], int] = {} + + def current_path() -> tuple[str, ...]: + return stack[-1][1] if stack else () + + for raw_line in lines: + stripped = raw_line.lstrip() + indent = len(raw_line) - len(stripped) + + if not stripped or stripped.startswith("#"): + out_lines.append(raw_line) + continue + + while stack and indent < stack[-1][0]: + stack.pop() + + if ":" in stripped and not stripped.lstrip().startswith("- "): + key_part, rest = stripped.split(":", 1) + key = key_part.strip() + if not key: + out_lines.append(raw_line) + continue + + rest_stripped = rest.lstrip(" \t") + value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"}) + has_value = bool(value_candidate.strip()) + + if stack and stack[-1][0] == indent and stack[-1][2] == "map": + stack.pop() + path = current_path() + (key,) + stack.append((indent, path, "map")) + + if not has_value: + out_lines.append(raw_line) + continue + + value_part, comment_part = self._split_inline_comment( + rest_stripped, {"#"} + ) + raw_value = value_part.strip() + var_name = self.make_var_name(role_prefix, path) + + use_quotes = ( + len(raw_value) >= 2 + and raw_value[0] == raw_value[-1] + and raw_value[0] in {'"', "'"} + ) + + if use_quotes: + q = raw_value[0] + replacement = f"{q}{{{{ {var_name} }}}}{q}" + else: + replacement = f"{{{{ {var_name} }}}}" + + leading = rest[: len(rest) - len(rest.lstrip(" \t"))] + new_stripped = f"{key}: {leading}{replacement}{comment_part}" + out_lines.append( + " " * indent + + new_stripped + + ("\n" if raw_line.endswith("\n") else "") + ) + continue + + if 
stripped.startswith("- "): + if not stack or stack[-1][0] != indent or stack[-1][2] != "seq": + parent_path = current_path() + stack.append((indent, parent_path, "seq")) + + parent_path = stack[-1][1] + content = stripped[2:] + + index = seq_counters.get(parent_path, 0) + seq_counters[parent_path] = index + 1 + + path = parent_path + (str(index),) + + value_part, comment_part = self._split_inline_comment(content, {"#"}) + raw_value = value_part.strip() + var_name = self.make_var_name(role_prefix, path) + + use_quotes = ( + len(raw_value) >= 2 + and raw_value[0] == raw_value[-1] + and raw_value[0] in {'"', "'"} + ) + + if use_quotes: + q = raw_value[0] + replacement = f"{q}{{{{ {var_name} }}}}{q}" + else: + replacement = f"{{{{ {var_name} }}}}" + + new_stripped = f"- {replacement}{comment_part}" + out_lines.append( + " " * indent + + new_stripped + + ("\n" if raw_line.endswith("\n") else "") + ) + continue + + out_lines.append(raw_line) + + return "".join(out_lines) + + def _generate_yaml_template_with_loops_from_text( + self, + role_prefix: str, + text: str, + loop_candidates: list[LoopCandidate], + loop_paths: set[tuple[str, ...]], + ) -> str: + """ + Generate YAML template with Jinja2 for loops. + + Strategy: + 1. Parse YAML line-by-line maintaining context + 2. When we encounter a path that's a loop candidate: + - Replace that section with a {% for %} loop + - Use the first item as template structure + 3. 
Everything else gets scalar variable replacement + """ + + lines = text.splitlines(keepends=True) + out_lines: list[str] = [] + + stack: list[tuple[int, tuple[str, ...], str]] = [] + seq_counters: dict[tuple[str, ...], int] = {} + + # Track which lines are part of loop sections (to skip them) + skip_until_indent: int | None = None + + def current_path() -> tuple[str, ...]: + return stack[-1][1] if stack else () + + for raw_line in lines: + stripped = raw_line.lstrip() + indent = len(raw_line) - len(stripped) + + # If we're skipping lines (inside a loop section), check if we can stop + if skip_until_indent is not None: + if ( + indent <= skip_until_indent + and stripped + and not stripped.startswith("#") + ): + skip_until_indent = None + else: + continue # Skip this line + + # Blank or comment lines + if not stripped or stripped.startswith("#"): + out_lines.append(raw_line) + continue + + # Adjust stack based on indent + while stack and indent < stack[-1][0]: + stack.pop() + + # --- Handle mapping key lines: "key:" or "key: value" + if ":" in stripped and not stripped.lstrip().startswith("- "): + key_part, rest = stripped.split(":", 1) + key = key_part.strip() + if not key: + out_lines.append(raw_line) + continue + + rest_stripped = rest.lstrip(" \t") + value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"}) + has_value = bool(value_candidate.strip()) + + if stack and stack[-1][0] == indent and stack[-1][2] == "map": + stack.pop() + path = current_path() + (key,) + stack.append((indent, path, "map")) + + # Check if this path is a loop candidate + if path in loop_paths: + # Find the matching candidate + candidate = next(c for c in loop_candidates if c.path == path) + + # Generate loop + loop_str = self._generate_yaml_loop(candidate, role_prefix, indent) + out_lines.append(loop_str) + + # Skip subsequent lines that are part of this collection + skip_until_indent = indent + continue + + if not has_value: + out_lines.append(raw_line) + continue + + # 
Scalar value - replace with variable + value_part, comment_part = self._split_inline_comment( + rest_stripped, {"#"} + ) + raw_value = value_part.strip() + var_name = self.make_var_name(role_prefix, path) + + use_quotes = ( + len(raw_value) >= 2 + and raw_value[0] == raw_value[-1] + and raw_value[0] in {'"', "'"} + ) + + if use_quotes: + q = raw_value[0] + replacement = f"{q}{{{{ {var_name} }}}}{q}" + else: + replacement = f"{{{{ {var_name} }}}}" + + leading = rest[: len(rest) - len(rest.lstrip(" \t"))] + new_stripped = f"{key}: {leading}{replacement}{comment_part}" + out_lines.append( + " " * indent + + new_stripped + + ("\n" if raw_line.endswith("\n") else "") + ) + continue + + # --- Handle list items: "- value" or "- key: value" + if stripped.startswith("- "): + if not stack or stack[-1][0] != indent or stack[-1][2] != "seq": + parent_path = current_path() + stack.append((indent, parent_path, "seq")) + + parent_path = stack[-1][1] + + # Check if parent path is a loop candidate + if parent_path in loop_paths: + # Find the matching candidate + candidate = next( + c for c in loop_candidates if c.path == parent_path + ) + + # Generate loop (with indent for the '-' items) + loop_str = self._generate_yaml_loop( + candidate, role_prefix, indent, is_list=True + ) + out_lines.append(loop_str) + + # Skip subsequent items + skip_until_indent = indent - 1 if indent > 0 else None + continue + + content = stripped[2:] + index = seq_counters.get(parent_path, 0) + seq_counters[parent_path] = index + 1 + + path = parent_path + (str(index),) + + value_part, comment_part = self._split_inline_comment(content, {"#"}) + raw_value = value_part.strip() + var_name = self.make_var_name(role_prefix, path) + + use_quotes = ( + len(raw_value) >= 2 + and raw_value[0] == raw_value[-1] + and raw_value[0] in {'"', "'"} + ) + + if use_quotes: + q = raw_value[0] + replacement = f"{q}{{{{ {var_name} }}}}{q}" + else: + replacement = f"{{{{ {var_name} }}}}" + + new_stripped = f"- 
{replacement}{comment_part}" + out_lines.append( + " " * indent + + new_stripped + + ("\n" if raw_line.endswith("\n") else "") + ) + continue + + out_lines.append(raw_line) + + return "".join(out_lines) + + def _generate_yaml_loop( + self, + candidate: LoopCandidate, + role_prefix: str, + indent: int, + is_list: bool = False, + ) -> str: + """ + Generate a Jinja2 for loop for a YAML collection. + + Args: + candidate: Loop candidate with items and metadata + role_prefix: Variable prefix + indent: Indentation level in spaces + is_list: True if this is a YAML list, False if dict + + Returns: + YAML string with Jinja2 loop + """ + + indent_str = " " * indent + collection_var = self.make_var_name(role_prefix, candidate.path) + item_var = candidate.loop_var + + lines = [] + + if not is_list: + # Dict-style: key: {% for ... %} + key = candidate.path[-1] if candidate.path else "items" + lines.append(f"{indent_str}{key}:") + lines.append(f"{indent_str} {{% for {item_var} in {collection_var} %}}") + else: + # List-style: just the loop + lines.append(f"{indent_str}{{% for {item_var} in {collection_var} %}}") + + # Generate template for item structure + if candidate.items: + sample_item = candidate.items[0] + item_indent = indent + 2 if not is_list else indent + + if candidate.item_schema == "scalar": + # Simple list of scalars + if is_list: + lines.append(f"{indent_str}- {{{{ {item_var} }}}}") + else: + lines.append(f"{indent_str} - {{{{ {item_var} }}}}") + + elif candidate.item_schema in ("simple_dict", "nested"): + # List of dicts or complex items - these are ALWAYS list items in YAML + item_lines = self._dict_to_yaml_lines( + sample_item, item_var, item_indent, is_list_item=True + ) + lines.extend(item_lines) + + # Close loop + close_indent = indent + 2 if not is_list else indent + lines.append(f"{' ' * close_indent}{{% endfor %}}") + + return "\n".join(lines) + "\n" + + def _dict_to_yaml_lines( + self, + data: dict[str, Any], + loop_var: str, + indent: int, + 
is_list_item: bool = False, + ) -> list[str]: + """ + Convert a dict to YAML lines with Jinja2 variable references. + + Args: + data: Dict representing item structure + loop_var: Loop variable name + indent: Base indentation level + is_list_item: True if this should start with '-' + + Returns: + List of YAML lines + """ + + lines = [] + indent_str = " " * indent + + first_key = True + for key, value in data.items(): + if key == "_key": + # Special key for dict collections - output as comment or skip + continue + + if first_key and is_list_item: + # First key gets the list marker + lines.append(f"{indent_str}- {key}: {{{{ {loop_var}.{key} }}}}") + first_key = False + else: + # Subsequent keys are indented + sub_indent = indent + 2 if is_list_item else indent + lines.append(f"{' ' * sub_indent}{key}: {{{{ {loop_var}.{key} }}}}") + + return lines diff --git a/src/jinjaturtle/loop_analyzer.py b/src/jinjaturtle/loop_analyzer.py new file mode 100644 index 0000000..6835104 --- /dev/null +++ b/src/jinjaturtle/loop_analyzer.py @@ -0,0 +1,433 @@ +from __future__ import annotations + +from collections import Counter +from typing import Any, Literal + + +class LoopCandidate: + """ + Represents a detected loop opportunity in the config structure. + + Attributes: + path: Path to the collection (e.g. ("servers",) or ("config", "endpoints")) + loop_var: Variable name for loop items (e.g. 
"server", "endpoint") + items: The actual list/dict items that will be looped over + item_schema: Structure of each item ("scalar", "simple_dict", "nested") + confidence: How confident we are this should be a loop (0.0 to 1.0) + """ + + def __init__( + self, + path: tuple[str, ...], + loop_var: str, + items: list[Any] | dict[str, Any], + item_schema: Literal["scalar", "simple_dict", "nested"], + confidence: float = 1.0, + ): + self.path = path + self.loop_var = loop_var + self.items = items + self.item_schema = item_schema + self.confidence = confidence + + def __repr__(self) -> str: + path_str = ".".join(self.path) if self.path else "" + return ( + f"LoopCandidate(path={path_str}, var={self.loop_var}, " + f"count={len(self.items)}, schema={self.item_schema}, " + f"confidence={self.confidence:.2f})" + ) + + +class LoopAnalyzer: + """ + Analyzes parsed config structures to detect loop opportunities. + + Strategy: + 1. Detect homogeneous lists (all items same type/structure) + 2. Detect dict collections where all values have similar structure + 3. Assign confidence scores based on: + - Homogeneity of items + - Number of items (2+ for loops to make sense) + - Depth and complexity (too nested -> fallback to scalars) + - Structural patterns (e.g., repeated XML elements) + """ + + # Configuration thresholds + MIN_ITEMS_FOR_LOOP = 2 # Need at least 2 items to justify a loop + MAX_NESTING_DEPTH = 3 # Beyond this, use scalar fallback + MIN_CONFIDENCE = 0.7 # Minimum confidence to use a loop + + def __init__(self): + self.candidates: list[LoopCandidate] = [] + + def analyze(self, parsed: Any, fmt: str) -> list[LoopCandidate]: + """ + Analyze a parsed config structure and return loop candidates. 
+ + Args: + parsed: The parsed config (dict, list, or ET.Element for XML) + fmt: Format type ("yaml", "json", "toml", "xml", "ini") + + Returns: + List of LoopCandidate objects, sorted by path depth (shallowest first) + """ + self.candidates = [] + + if fmt == "xml": + self._analyze_xml(parsed) + elif fmt in ("yaml", "json", "toml"): + self._analyze_dict_like(parsed, path=()) + # INI files are typically flat key-value, not suitable for loops + + # Sort by path depth (process parent structures before children) + self.candidates.sort(key=lambda c: len(c.path)) + return self.candidates + + def _analyze_dict_like( + self, obj: Any, path: tuple[str, ...], depth: int = 0 + ) -> None: + """Recursively analyze dict/list structures.""" + + # Safety: don't go too deep + if depth > self.MAX_NESTING_DEPTH: + return + + if isinstance(obj, dict): + # Check if this dict's values form a homogeneous collection + if len(obj) >= self.MIN_ITEMS_FOR_LOOP: + candidate = self._check_dict_collection(obj, path) + if candidate: + self.candidates.append(candidate) + # Don't recurse into items we've marked as a loop + return + + # Recurse into dict values + for key, value in obj.items(): + self._analyze_dict_like(value, path + (str(key),), depth + 1) + + elif isinstance(obj, list): + # Check if this list is homogeneous + if len(obj) >= self.MIN_ITEMS_FOR_LOOP: + candidate = self._check_list_collection(obj, path) + if candidate: + self.candidates.append(candidate) + # Don't recurse into items we've marked as a loop + return + + # If not a good loop candidate, recurse into items + for i, item in enumerate(obj): + self._analyze_dict_like(item, path + (str(i),), depth + 1) + + def _check_list_collection( + self, items: list[Any], path: tuple[str, ...] 
+ ) -> LoopCandidate | None: + """Check if a list should be a loop.""" + + if not items: + return None + + # Analyze item types and structures + item_types = [type(item).__name__ for item in items] + type_counts = Counter(item_types) + + # Must be homogeneous (all same type) + if len(type_counts) != 1: + return None + + item_type = item_types[0] + + # Scalar list (strings, numbers, bools) + if item_type in ("str", "int", "float", "bool", "NoneType"): + return LoopCandidate( + path=path, + loop_var=self._derive_loop_var(path, singular=True), + items=items, + item_schema="scalar", + confidence=1.0, + ) + + # List of dicts - check structural homogeneity + if item_type == "dict": + schema = self._analyze_dict_schema(items) + if schema == "simple_dict": + return LoopCandidate( + path=path, + loop_var=self._derive_loop_var(path, singular=True), + items=items, + item_schema="simple_dict", + confidence=0.95, + ) + elif schema == "homogeneous": + return LoopCandidate( + path=path, + loop_var=self._derive_loop_var(path, singular=True), + items=items, + item_schema="simple_dict", + confidence=0.85, + ) + # If too complex/heterogeneous, return None (use scalar fallback) + + return None + + def _check_dict_collection( + self, obj: dict[str, Any], path: tuple[str, ...] + ) -> LoopCandidate | None: + """ + Check if a dict's values form a collection suitable for looping. + + Example: {"server1": {...}, "server2": {...}} where all values + have the same structure. 
+ """ + + if not obj: + return None + + values = list(obj.values()) + + # Check type homogeneity + value_types = [type(v).__name__ for v in values] + type_counts = Counter(value_types) + + if len(type_counts) != 1: + return None + + value_type = value_types[0] + + # Only interested in dict values for dict collections + # (scalar-valued dicts stay as scalars) + if value_type != "dict": + return None + + # Check structural homogeneity + schema = self._analyze_dict_schema(values) + if schema in ("simple_dict", "homogeneous"): + confidence = 0.9 if schema == "simple_dict" else 0.8 + + # Convert dict to list of items with 'key' added + items_with_keys = [{"_key": k, **v} for k, v in obj.items()] + + return LoopCandidate( + path=path, + loop_var=self._derive_loop_var(path, singular=True), + items=items_with_keys, + item_schema="simple_dict", + confidence=confidence, + ) + + return None + + def _analyze_dict_schema( + self, dicts: list[dict[str, Any]] + ) -> Literal["simple_dict", "homogeneous", "heterogeneous"]: + """ + Analyze a list of dicts to determine their structural homogeneity. 
+ + Returns: + "simple_dict": All dicts have same keys, all values are scalars + "homogeneous": All dicts have same keys, may have nested structures + "heterogeneous": Dicts have different structures + """ + + if not dicts: + return "heterogeneous" + + # Get key sets from each dict + key_sets = [set(d.keys()) for d in dicts] + + # Check if all have the same keys + first_keys = key_sets[0] + if not all(ks == first_keys for ks in key_sets): + # Allow minor variations (80% key overlap) + all_keys = set().union(*key_sets) + common_keys = set.intersection(*key_sets) + if len(common_keys) / len(all_keys) < 0.8: + return "heterogeneous" + + # Check if values are all scalars + all_scalars = True + for d in dicts: + for v in d.values(): + if isinstance(v, (dict, list)): + all_scalars = False + break + if not all_scalars: + break + + if all_scalars: + return "simple_dict" + else: + return "homogeneous" + + def _derive_loop_var(self, path: tuple[str, ...], singular: bool = True) -> str: + """ + Derive a sensible loop variable name from the path. 
+ + Examples: + ("servers",) -> "server" (singular) + ("config", "endpoints") -> "endpoint" + ("users",) -> "user" + ("databases",) -> "database" + """ + + if not path: + return "item" + + last_part = path[-1].lower() + + if singular: + # Simple English pluralization rules (order matters - most specific first) + if last_part.endswith("sses"): + return last_part[:-2] # "classes" -> "class" + elif last_part.endswith("xes"): + return last_part[:-2] # "boxes" -> "box" + elif last_part.endswith("ches"): + return last_part[:-2] # "watches" -> "watch" + elif last_part.endswith("shes"): + return last_part[:-2] # "dishes" -> "dish" + elif last_part.endswith("ies"): + return last_part[:-3] + "y" # "entries" -> "entry" + elif last_part.endswith("oes"): + return last_part[:-2] # "tomatoes" -> "tomato" + elif last_part.endswith("ses") and not last_part.endswith("sses"): + # Only for words ending in "se": "databases" -> "database" + # But NOT for "sses" which we already handled + if len(last_part) > 3 and last_part[-4] not in "aeiou": + # "databases" -> "database" (consonant before 's') + return last_part[:-1] + else: + # "houses" -> "house", "causes" -> "cause" + return last_part[:-1] + elif last_part.endswith("s") and not last_part.endswith("ss"): + return last_part[:-1] # "servers" -> "server" + + return last_part + + def _analyze_xml(self, root: Any) -> None: + """ + Analyze XML structure for loop opportunities. + + XML is particularly suited for loops when we have repeated sibling elements. 
+ """ + import xml.etree.ElementTree as ET + + if not isinstance(root, ET.Element): + return + + self._walk_xml_element(root, path=()) + + def _walk_xml_element(self, elem: Any, path: tuple[str, ...]) -> None: + """Recursively walk XML elements looking for repeated siblings.""" + import xml.etree.ElementTree as ET + + children = [c for c in list(elem) if isinstance(c.tag, str)] + + # Count sibling elements by tag + tag_counts = Counter(child.tag for child in children) + + # Find repeated tags + for tag, count in tag_counts.items(): + if count >= self.MIN_ITEMS_FOR_LOOP: + # Get all elements with this tag + tagged_elements = [c for c in children if c.tag == tag] + + # Check homogeneity + if self._are_xml_elements_homogeneous(tagged_elements): + # Convert to dict representation for easier handling + items = [self._xml_elem_to_dict(el) for el in tagged_elements] + + # Determine schema + if all(self._is_scalar_dict(item) for item in items): + schema = "simple_dict" + confidence = 1.0 + else: + schema = "nested" + confidence = 0.8 + + candidate = LoopCandidate( + path=path + (tag,), + loop_var=self._derive_loop_var((tag,), singular=True), + items=items, + item_schema=schema, + confidence=confidence, + ) + self.candidates.append(candidate) + + # Recurse into unique children (non-repeated ones will be processed normally) + for tag, count in tag_counts.items(): + if count == 1: + child = next(c for c in children if c.tag == tag) + self._walk_xml_element(child, path + (tag,)) + + def _are_xml_elements_homogeneous(self, elements: list[Any]) -> bool: + """Check if XML elements have similar structure.""" + + if not elements: + return False + + # Compare attribute sets + attr_sets = [set(el.attrib.keys()) for el in elements] + first_attrs = attr_sets[0] + + if not all(attrs == first_attrs for attrs in attr_sets): + # Allow some variation + all_attrs = set().union(*attr_sets) + common_attrs = set.intersection(*attr_sets) if attr_sets else set() + if len(common_attrs) / 
max(len(all_attrs), 1) < 0.7: + return False + + # Compare child element tags + child_tag_sets = [ + set(c.tag for c in el if hasattr(c, "tag")) for el in elements + ] + + if child_tag_sets: + first_tags = child_tag_sets[0] + if not all(tags == first_tags for tags in child_tag_sets): + # Allow some variation + all_tags = set().union(*child_tag_sets) + common_tags = ( + set.intersection(*child_tag_sets) if child_tag_sets else set() + ) + if len(common_tags) / max(len(all_tags), 1) < 0.7: + return False + + return True + + def _xml_elem_to_dict(self, elem: Any) -> dict[str, Any]: + """Convert an XML element to a dict representation.""" + result: dict[str, Any] = {} + + # Add attributes + for attr_name, attr_val in elem.attrib.items(): + result[f"@{attr_name}"] = attr_val + + # Add text content + text = (elem.text or "").strip() + if text: + children = [c for c in list(elem) if hasattr(c, "tag")] + if not elem.attrib and not children: + result["_text"] = text + else: + result["value"] = text + + # Add child elements + for child in elem: + if hasattr(child, "tag"): + child_dict = self._xml_elem_to_dict(child) + if child.tag in result: + # Multiple children with same tag - convert to list + if not isinstance(result[child.tag], list): + result[child.tag] = [result[child.tag]] + result[child.tag].append(child_dict) + else: + result[child.tag] = child_dict + + return result + + def _is_scalar_dict(self, obj: dict[str, Any]) -> bool: + """Check if a dict contains only scalar values (no nested dicts/lists).""" + for v in obj.values(): + if isinstance(v, (dict, list)): + return False + return True From f66f58a7bbb53591e6e72113a16d35f75e3ae21d Mon Sep 17 00:00:00 2001 From: Miguel Jacq Date: Fri, 28 Nov 2025 12:28:46 +1100 Subject: [PATCH 4/5] Rename some methods, merge the loopable classes and just always try it --- pyproject.toml | 2 +- src/jinjaturtle/cli.py | 12 +- src/jinjaturtle/core.py | 24 +- src/jinjaturtle/handlers/__init__.py | 4 - src/jinjaturtle/handlers/base.py | 
4 +- src/jinjaturtle/handlers/ini.py | 2 +- src/jinjaturtle/handlers/json.py | 2 +- src/jinjaturtle/handlers/toml.py | 2 +- src/jinjaturtle/handlers/xml.py | 357 ++++++++++++++--- src/jinjaturtle/handlers/xml_loopable.py | 405 ------------------- src/jinjaturtle/handlers/yaml.py | 346 +++++++++++++++-- src/jinjaturtle/handlers/yaml_loopable.py | 449 ---------------------- src/jinjaturtle/loop_analyzer.py | 18 +- tests/test_base_handler.py | 2 +- tests/test_core_utils.py | 32 +- tests/test_ini_handler.py | 16 +- tests/test_json_handler.py | 18 +- tests/test_toml_handler.py | 16 +- tests/test_xml_handler.py | 24 +- tests/test_yaml_handler.py | 18 +- 20 files changed, 702 insertions(+), 1051 deletions(-) delete mode 100644 src/jinjaturtle/handlers/xml_loopable.py delete mode 100644 src/jinjaturtle/handlers/yaml_loopable.py diff --git a/pyproject.toml b/pyproject.toml index a54c5c4..937cb9b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "jinjaturtle" -version = "0.1.4" +version = "0.2.0" description = "Convert config files into Ansible defaults and Jinja2 templates." 
authors = ["Miguel Jacq "] license = "GPL-3.0-or-later" diff --git a/src/jinjaturtle/cli.py b/src/jinjaturtle/cli.py index 032aa7e..40a9aba 100644 --- a/src/jinjaturtle/cli.py +++ b/src/jinjaturtle/cli.py @@ -9,8 +9,8 @@ from .core import ( parse_config, analyze_loops, flatten_config, - generate_defaults_yaml, - generate_template, + generate_ansible_yaml, + generate_jinja2_template, ) @@ -66,10 +66,10 @@ def _main(argv: list[str] | None = None) -> int: flat_items = flatten_config(fmt, parsed, loop_candidates) # Generate defaults YAML (with loop collections if detected) - defaults_yaml = generate_defaults_yaml(args.role_name, flat_items, loop_candidates) + ansible_yaml = generate_ansible_yaml(args.role_name, flat_items, loop_candidates) # Generate template (with loops if detected) - template_str = generate_template( + template_str = generate_jinja2_template( fmt, parsed, args.role_name, @@ -78,10 +78,10 @@ def _main(argv: list[str] | None = None) -> int: ) if args.defaults_output: - Path(args.defaults_output).write_text(defaults_yaml, encoding="utf-8") + Path(args.defaults_output).write_text(ansible_yaml, encoding="utf-8") else: print("# defaults/main.yml") - print(defaults_yaml, end="") + print(ansible_yaml, end="") if args.template_output: Path(args.template_output).write_text(template_str, encoding="utf-8") diff --git a/src/jinjaturtle/core.py b/src/jinjaturtle/core.py index b0c24b7..c8e6d71 100644 --- a/src/jinjaturtle/core.py +++ b/src/jinjaturtle/core.py @@ -13,8 +13,6 @@ from .handlers import ( TomlHandler, YamlHandler, XmlHandler, - YamlHandlerLoopable, - XmlHandlerLoopable, ) @@ -56,8 +54,6 @@ _JSON_HANDLER = JsonHandler() _TOML_HANDLER = TomlHandler() _YAML_HANDLER = YamlHandler() _XML_HANDLER = XmlHandler() -_YAML_HANDLER_LOOPABLE = YamlHandlerLoopable() -_XML_HANDLER_LOOPABLE = XmlHandlerLoopable() _HANDLERS["ini"] = _INI_HANDLER _HANDLERS["json"] = _JSON_HANDLER @@ -173,7 +169,7 @@ def _normalize_default_value(value: Any) -> Any: return value -def 
generate_defaults_yaml( +def generate_ansible_yaml( role_prefix: str, flat_items: list[tuple[tuple[str, ...], Any]], loop_candidates: list[LoopCandidate] | None = None, @@ -205,7 +201,7 @@ def generate_defaults_yaml( ) -def generate_template( +def generate_jinja2_template( fmt: str, parsed: Any, role_prefix: str, @@ -215,24 +211,18 @@ def generate_template( """ Generate a Jinja2 template for the config. """ - # Use enhanced handler if we have loop candidates handler = _HANDLERS.get(fmt) - if loop_candidates and fmt in ("yaml", "xml"): - # Use enhanced handlers for YAML and XML when we have loops - if fmt == "yaml": - handler = _YAML_HANDLER_LOOPABLE - elif fmt == "xml": - handler = _XML_HANDLER_LOOPABLE - if handler is None: raise ValueError(f"Unsupported format: {fmt}") # Check if handler supports loop-aware generation - if hasattr(handler, "generate_template_with_loops") and loop_candidates: - return handler.generate_template_with_loops( + if hasattr(handler, "generate_jinja2_template_with_loops") and loop_candidates: + return handler.generate_jinja2_template_with_loops( parsed, role_prefix, original_text, loop_candidates ) # Fallback to original scalar-only generation - return handler.generate_template(parsed, role_prefix, original_text=original_text) + return handler.generate_jinja2_template( + parsed, role_prefix, original_text=original_text + ) diff --git a/src/jinjaturtle/handlers/__init__.py b/src/jinjaturtle/handlers/__init__.py index 4bb73cf..6bbcba1 100644 --- a/src/jinjaturtle/handlers/__init__.py +++ b/src/jinjaturtle/handlers/__init__.py @@ -7,8 +7,6 @@ from .json import JsonHandler from .toml import TomlHandler from .yaml import YamlHandler from .xml import XmlHandler -from .xml_loopable import XmlHandlerLoopable -from .yaml_loopable import YamlHandlerLoopable __all__ = [ "BaseHandler", @@ -18,6 +16,4 @@ __all__ = [ "TomlHandler", "YamlHandler", "XmlHandler", - "XmlHandlerLoopable", - "YamlHandlerLoopable", ] diff --git 
a/src/jinjaturtle/handlers/base.py b/src/jinjaturtle/handlers/base.py index f427b76..14aaec7 100644 --- a/src/jinjaturtle/handlers/base.py +++ b/src/jinjaturtle/handlers/base.py @@ -11,7 +11,7 @@ class BaseHandler: Each handler is responsible for: - parse(path) -> parsed object - flatten(parsed) -> list[(path_tuple, value)] - - generate_template(parsed, role_prefix, original_text=None) -> str + - generate_jinja2_template(parsed, role_prefix, original_text=None) -> str """ fmt: str # e.g. "ini", "yaml", ... @@ -22,7 +22,7 @@ class BaseHandler: def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]: raise NotImplementedError - def generate_template( + def generate_jinja2_template( self, parsed: Any, role_prefix: str, diff --git a/src/jinjaturtle/handlers/ini.py b/src/jinjaturtle/handlers/ini.py index 24bf44f..ce5848e 100644 --- a/src/jinjaturtle/handlers/ini.py +++ b/src/jinjaturtle/handlers/ini.py @@ -32,7 +32,7 @@ class IniHandler(BaseHandler): items.append(((section, key), processed)) return items - def generate_template( + def generate_jinja2_template( self, parsed: Any, role_prefix: str, diff --git a/src/jinjaturtle/handlers/json.py b/src/jinjaturtle/handlers/json.py index 5149238..dbf7d82 100644 --- a/src/jinjaturtle/handlers/json.py +++ b/src/jinjaturtle/handlers/json.py @@ -15,7 +15,7 @@ class JsonHandler(DictLikeHandler): with path.open("r", encoding="utf-8") as f: return json.load(f) - def generate_template( + def generate_jinja2_template( self, parsed: Any, role_prefix: str, diff --git a/src/jinjaturtle/handlers/toml.py b/src/jinjaturtle/handlers/toml.py index b70a9c8..069b319 100644 --- a/src/jinjaturtle/handlers/toml.py +++ b/src/jinjaturtle/handlers/toml.py @@ -19,7 +19,7 @@ class TomlHandler(DictLikeHandler): with path.open("rb") as f: return tomllib.load(f) - def generate_template( + def generate_jinja2_template( self, parsed: Any, role_prefix: str, diff --git a/src/jinjaturtle/handlers/xml.py b/src/jinjaturtle/handlers/xml.py index 
4d99a7d..bc92c26 100644 --- a/src/jinjaturtle/handlers/xml.py +++ b/src/jinjaturtle/handlers/xml.py @@ -5,19 +5,19 @@ from pathlib import Path from typing import Any import xml.etree.ElementTree as ET # nosec -from . import BaseHandler +from .base import BaseHandler +from ..loop_analyzer import LoopCandidate class XmlHandler(BaseHandler): + """ + XML handler that can generate both scalar templates and loop-based templates. + """ + fmt = "xml" def parse(self, path: Path) -> ET.Element: text = path.read_text(encoding="utf-8") - # Parse with an explicit XMLParser instance so this stays compatible - # with Python versions where xml.etree.ElementTree.fromstring() may - # not accept a ``parser=`` keyword argument. - # defusedxml.defuse_stdlib() is called in the CLI entrypoint, so using - # the stdlib XMLParser here is safe. parser = ET.XMLParser( target=ET.TreeBuilder(insert_comments=False) ) # nosec B314 @@ -30,12 +30,13 @@ class XmlHandler(BaseHandler): raise TypeError("XML parser result must be an Element") return self._flatten_xml(parsed) - def generate_template( + def generate_jinja2_template( self, parsed: Any, role_prefix: str, original_text: str | None = None, ) -> str: + """Original scalar-only template generation.""" if original_text is not None: return self._generate_xml_template_from_text(role_prefix, original_text) if not isinstance(parsed, ET.Element): @@ -43,25 +44,30 @@ class XmlHandler(BaseHandler): xml_str = ET.tostring(parsed, encoding="unicode") return self._generate_xml_template_from_text(role_prefix, xml_str) - def _flatten_xml(self, root: ET.Element) -> list[tuple[tuple[str, ...], Any]]: - """ - Flatten an XML tree into (path, value) pairs. 
+ def generate_jinja2_template_with_loops( + self, + parsed: Any, + role_prefix: str, + original_text: str | None, + loop_candidates: list[LoopCandidate], + ) -> str: + """Generate template with Jinja2 for loops where appropriate.""" - Path conventions: - - Root element's children are treated as top-level (root tag is *not* included). - - Element text: - bar -> path ("foo",) value "bar" - bar -> path ("foo", "value") value "bar" - baz -> ("foo", "bar") / etc. - - Attributes: - - -> path ("server", "@host") value "localhost" - - Repeated sibling elements: - /a - /b - -> ("endpoint", "0") "/a" - ("endpoint", "1") "/b" - """ + if original_text is not None: + return self._generate_xml_template_with_loops_from_text( + role_prefix, original_text, loop_candidates + ) + + if not isinstance(parsed, ET.Element): + raise TypeError("XML parser result must be an Element") + + xml_str = ET.tostring(parsed, encoding="unicode") + return self._generate_xml_template_with_loops_from_text( + role_prefix, xml_str, loop_candidates + ) + + def _flatten_xml(self, root: ET.Element) -> list[tuple[tuple[str, ...], Any]]: + """Flatten an XML tree into (path, value) pairs.""" items: list[tuple[tuple[str, ...], Any]] = [] def walk(elem: ET.Element, path: tuple[str, ...]) -> None: @@ -77,10 +83,8 @@ class XmlHandler(BaseHandler): text = (elem.text or "").strip() if text: if not elem.attrib and not children: - # Simple bar items.append((path, text)) else: - # Text alongside attrs/children items.append((path + ("value",), text)) # Repeated siblings get an index; singletons just use the tag @@ -97,24 +101,16 @@ class XmlHandler(BaseHandler): child_path = path + (tag,) walk(child, child_path) - # Treat root as a container: its children are top-level walk(root, ()) return items def _split_xml_prolog(self, text: str) -> tuple[str, str]: - """ - Split an XML document into (prolog, body), where prolog includes: - - XML declaration () - - top-level comments - - DOCTYPE - The body starts at the root 
element. - """ + """Split XML into (prolog, body).""" i = 0 n = len(text) prolog_parts: list[str] = [] while i < n: - # Preserve leading whitespace while i < n and text[i].isspace(): prolog_parts.append(text[i]) i += 1 @@ -146,22 +142,33 @@ class XmlHandler(BaseHandler): continue if text[i] == "<": - # Assume root element starts here break - # Unexpected content: stop treating as prolog break return "".join(prolog_parts), text[i:] - def _apply_jinja_to_xml_tree(self, role_prefix: str, root: ET.Element) -> None: + def _apply_jinja_to_xml_tree( + self, + role_prefix: str, + root: ET.Element, + loop_candidates: list[LoopCandidate] | None = None, + ) -> None: """ - Mutate the XML tree in-place, replacing scalar values with Jinja - expressions based on the same paths used in _flatten_xml. + Mutate XML tree in-place, replacing values with Jinja expressions. + + If loop_candidates is provided, repeated elements matching a candidate + will be replaced with a {% for %} loop. """ + # Build a map of loop paths for quick lookup + loop_paths = {} + if loop_candidates: + for candidate in loop_candidates: + loop_paths[candidate.path] = candidate + def walk(elem: ET.Element, path: tuple[str, ...]) -> None: - # Attributes + # Attributes (unless this element is in a loop) for attr_name in list(elem.attrib.keys()): attr_path = path + (f"@{attr_name}",) var_name = self.make_var_name(role_prefix, attr_path) @@ -180,51 +187,273 @@ class XmlHandler(BaseHandler): var_name = self.make_var_name(role_prefix, text_path) elem.text = f"{{{{ {var_name} }}}}" - # Repeated children get indexes just like in _flatten_xml + # Handle children - check for loops first counts = Counter(child.tag for child in children) index_counters: dict[str, int] = defaultdict(int) + # Check each tag to see if it's a loop candidate + processed_tags = set() + for child in children: tag = child.tag - if counts[tag] > 1: + + # Skip if we've already processed this tag as a loop + if tag in processed_tags: + continue + + 
child_path = path + (tag,) + + # Check if this is a loop candidate + if child_path in loop_paths: + # Mark this tag as processed + processed_tags.add(tag) + + # Remove all children with this tag + for child_to_remove in [c for c in children if c.tag == tag]: + elem.remove(child_to_remove) + + # Create a loop comment/marker + # We'll handle the actual loop generation in text processing + loop_marker = ET.Comment(f"LOOP:{tag}") + elem.append(loop_marker) + + elif counts[tag] > 1: + # Multiple children but not a loop candidate - use indexed paths idx = index_counters[tag] index_counters[tag] += 1 - child_path = path + (tag, str(idx)) + indexed_path = path + (tag, str(idx)) + walk(child, indexed_path) else: - child_path = path + (tag,) - walk(child, child_path) + # Single child + walk(child, child_path) walk(root, ()) def _generate_xml_template_from_text(self, role_prefix: str, text: str) -> str: - """ - Generate a Jinja2 template for an XML file, preserving comments and prolog. - - - Attributes become Jinja placeholders: - - -> - - - Text nodes become placeholders: - 8080 - -> {{ prefix_port }} - - but if the element also has attributes/children, the value path - gets a trailing "value" component, matching flattening. 
- """ + """Generate scalar-only Jinja2 template.""" prolog, body = self._split_xml_prolog(text) - # Parse with comments included so are preserved - # defusedxml.defuse_stdlib() is called in CLI entrypoint parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314 parser.feed(body) root = parser.close() self._apply_jinja_to_xml_tree(role_prefix, root) - # Pretty indentation if available (Python 3.9+) indent = getattr(ET, "indent", None) if indent is not None: indent(root, space=" ") # type: ignore[arg-type] xml_body = ET.tostring(root, encoding="unicode") return prolog + xml_body + + def _generate_xml_template_with_loops_from_text( + self, + role_prefix: str, + text: str, + loop_candidates: list[LoopCandidate], + ) -> str: + """Generate Jinja2 template with for loops.""" + + prolog, body = self._split_xml_prolog(text) + + # Parse with comments preserved + parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314 + parser.feed(body) + root = parser.close() + + # Apply Jinja transformations (including loop markers) + self._apply_jinja_to_xml_tree(role_prefix, root, loop_candidates) + + # Convert to string + indent = getattr(ET, "indent", None) + if indent is not None: + indent(root, space=" ") # type: ignore[arg-type] + + xml_body = ET.tostring(root, encoding="unicode") + + # Post-process to replace loop markers with actual Jinja loops + xml_body = self._insert_xml_loops(xml_body, role_prefix, loop_candidates, root) + + return prolog + xml_body + + def _insert_xml_loops( + self, + xml_str: str, + role_prefix: str, + loop_candidates: list[LoopCandidate], + root: ET.Element, + ) -> str: + """ + Post-process XML string to insert Jinja2 for loops. + + This replaces markers with actual loop constructs. 
+ """ + + # Build a sample element for each loop to use as template + lines = xml_str.split("\n") + result_lines = [] + + for line in lines: + # Check if this line contains a loop marker + if "", start) + tag_name = line[start:end].strip() + + # Find matching loop candidate + candidate = None + for cand in loop_candidates: + if cand.path and cand.path[-1] == tag_name: + candidate = cand + break + + if candidate: + # Get indentation from current line + indent_level = len(line) - len(line.lstrip()) + indent_str = " " * indent_level + + # Generate loop variable name + collection_var = self.make_var_name(role_prefix, candidate.path) + item_var = candidate.loop_var + + # Create sample element with ALL possible fields from ALL items + if candidate.items: + # Merge all items to get the union of all fields + merged_dict = self._merge_dicts_for_template(candidate.items) + + sample_elem = self._dict_to_xml_element( + tag_name, merged_dict, item_var + ) + + # Apply indentation to the sample element + ET.indent(sample_elem, space=" ") + + # Convert sample to string + sample_str = ET.tostring( + sample_elem, encoding="unicode" + ).strip() + + # Add proper indentation to each line of the sample + sample_lines = sample_str.split("\n") + + # Build loop + result_lines.append( + f"{indent_str}{{% for {item_var} in {collection_var} %}}" + ) + # Add each line of the sample with proper indentation + for sample_line in sample_lines: + result_lines.append(f"{indent_str} {sample_line}") + result_lines.append(f"{indent_str}{{% endfor %}}") + else: + # Keep the marker if we can't find the candidate + result_lines.append(line) + else: + result_lines.append(line) + + # Post-process to replace and with Jinja2 conditionals + final_lines = [] + for line in result_lines: + # Replace with {% if var.field is defined %} + if "", start) + condition = line[start:end] + indent = len(line) - len(line.lstrip()) + final_lines.append(f"{' ' * indent}{{% if {condition} is defined %}}") + # Replace with {% 
endif %} + elif "", i + 4) - if end == -1: - break - prolog_parts.append(text[i : end + 3]) - i = end + 3 - continue - - if text.startswith("", i + 9) - if end == -1: - break - prolog_parts.append(text[i : end + 1]) - i = end + 1 - continue - - if text[i] == "<": - break - - break - - return "".join(prolog_parts), text[i:] - - def _apply_jinja_to_xml_tree( - self, - role_prefix: str, - root: ET.Element, - loop_candidates: list[LoopCandidate] | None = None, - ) -> None: - """ - Mutate XML tree in-place, replacing values with Jinja expressions. - - If loop_candidates is provided, repeated elements matching a candidate - will be replaced with a {% for %} loop. - """ - - # Build a map of loop paths for quick lookup - loop_paths = {} - if loop_candidates: - for candidate in loop_candidates: - loop_paths[candidate.path] = candidate - - def walk(elem: ET.Element, path: tuple[str, ...]) -> None: - # Attributes (unless this element is in a loop) - for attr_name in list(elem.attrib.keys()): - attr_path = path + (f"@{attr_name}",) - var_name = self.make_var_name(role_prefix, attr_path) - elem.set(attr_name, f"{{{{ {var_name} }}}}") - - # Children - children = [c for c in list(elem) if isinstance(c.tag, str)] - - # Text content - text = (elem.text or "").strip() - if text: - if not elem.attrib and not children: - text_path = path - else: - text_path = path + ("value",) - var_name = self.make_var_name(role_prefix, text_path) - elem.text = f"{{{{ {var_name} }}}}" - - # Handle children - check for loops first - counts = Counter(child.tag for child in children) - index_counters: dict[str, int] = defaultdict(int) - - # Check each tag to see if it's a loop candidate - processed_tags = set() - - for child in children: - tag = child.tag - - # Skip if we've already processed this tag as a loop - if tag in processed_tags: - continue - - child_path = path + (tag,) - - # Check if this is a loop candidate - if child_path in loop_paths: - # Mark this tag as processed - 
processed_tags.add(tag) - - # Remove all children with this tag - for child_to_remove in [c for c in children if c.tag == tag]: - elem.remove(child_to_remove) - - # Create a loop comment/marker - # We'll handle the actual loop generation in text processing - loop_marker = ET.Comment(f"LOOP:{tag}") - elem.append(loop_marker) - - elif counts[tag] > 1: - # Multiple children but not a loop candidate - use indexed paths - idx = index_counters[tag] - index_counters[tag] += 1 - indexed_path = path + (tag, str(idx)) - walk(child, indexed_path) - else: - # Single child - walk(child, child_path) - - walk(root, ()) - - def _generate_xml_template_from_text(self, role_prefix: str, text: str) -> str: - """Generate scalar-only Jinja2 template.""" - prolog, body = self._split_xml_prolog(text) - - parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314 - parser.feed(body) - root = parser.close() - - self._apply_jinja_to_xml_tree(role_prefix, root) - - indent = getattr(ET, "indent", None) - if indent is not None: - indent(root, space=" ") # type: ignore[arg-type] - - xml_body = ET.tostring(root, encoding="unicode") - return prolog + xml_body - - def _generate_xml_template_with_loops_from_text( - self, - role_prefix: str, - text: str, - loop_candidates: list[LoopCandidate], - ) -> str: - """Generate Jinja2 template with for loops.""" - - prolog, body = self._split_xml_prolog(text) - - # Parse with comments preserved - parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314 - parser.feed(body) - root = parser.close() - - # Apply Jinja transformations (including loop markers) - self._apply_jinja_to_xml_tree(role_prefix, root, loop_candidates) - - # Convert to string - indent = getattr(ET, "indent", None) - if indent is not None: - indent(root, space=" ") # type: ignore[arg-type] - - xml_body = ET.tostring(root, encoding="unicode") - - # Post-process to replace loop markers with actual Jinja loops - xml_body = 
self._insert_xml_loops(xml_body, role_prefix, loop_candidates, root) - - return prolog + xml_body - - def _insert_xml_loops( - self, - xml_str: str, - role_prefix: str, - loop_candidates: list[LoopCandidate], - root: ET.Element, - ) -> str: - """ - Post-process XML string to insert Jinja2 for loops. - - This replaces markers with actual loop constructs. - """ - - # Build a sample element for each loop to use as template - lines = xml_str.split("\n") - result_lines = [] - - for line in lines: - # Check if this line contains a loop marker - if "", start) - tag_name = line[start:end].strip() - - # Find matching loop candidate - candidate = None - for cand in loop_candidates: - if cand.path and cand.path[-1] == tag_name: - candidate = cand - break - - if candidate: - # Get indentation from current line - indent_level = len(line) - len(line.lstrip()) - indent_str = " " * indent_level - - # Generate loop variable name - collection_var = self.make_var_name(role_prefix, candidate.path) - item_var = candidate.loop_var - - # Create sample element from first item - if candidate.items: - sample_elem = self._dict_to_xml_element( - tag_name, candidate.items[0], item_var - ) - - # Apply indentation to the sample element - ET.indent(sample_elem, space=" ") - - # Convert sample to string - sample_str = ET.tostring( - sample_elem, encoding="unicode" - ).strip() - - # Add proper indentation to each line of the sample - sample_lines = sample_str.split("\n") - indented_sample_lines = [ - ( - f"{indent_str} {line}" - if i > 0 - else f"{indent_str} {line}" - ) - for i, line in enumerate(sample_lines) - ] - indented_sample = "\n".join(indented_sample_lines) - - # Build loop - result_lines.append( - f"{indent_str}{{% for {item_var} in {collection_var} %}}" - ) - result_lines.append(indented_sample) - result_lines.append(f"{indent_str}{{% endfor %}}") - else: - # Keep the marker if we can't find the candidate - result_lines.append(line) - else: - result_lines.append(line) - - return 
"\n".join(result_lines) - - def _dict_to_xml_element( - self, tag: str, data: dict[str, Any], loop_var: str - ) -> ET.Element: - """ - Convert a dict to an XML element with Jinja2 variable references. - - Args: - tag: Element tag name - data: Dict representing element structure - loop_var: Loop variable name to use in Jinja expressions - """ - - elem = ET.Element(tag) - - # Handle attributes and child elements - for key, value in data.items(): - if key.startswith("@"): - # Attribute - attr_name = key[1:] # Remove @ prefix - elem.set(attr_name, f"{{{{ {loop_var}.{attr_name} }}}}") - elif key == "_text": - # Simple text content - elem.text = f"{{{{ {loop_var} }}}}" - elif key == "value": - # Text with attributes/children - elem.text = f"{{{{ {loop_var}.value }}}}" - elif key == "_key": - # This is the dict key (for dict collections), skip in XML - pass - elif isinstance(value, dict): - # Nested element - check if it has _text - child = ET.SubElement(elem, key) - if "_text" in value: - child.text = f"{{{{ {loop_var}.{key}._text }}}}" - else: - # More complex nested structure - for sub_key, sub_val in value.items(): - if not sub_key.startswith("_"): - grandchild = ET.SubElement(child, sub_key) - grandchild.text = f"{{{{ {loop_var}.{key}.{sub_key} }}}}" - elif not isinstance(value, list): - # Simple child element (scalar value) - child = ET.SubElement(elem, key) - child.text = f"{{{{ {loop_var}.{key} }}}}" - - return elem diff --git a/src/jinjaturtle/handlers/yaml.py b/src/jinjaturtle/handlers/yaml.py index 2ebaf3e..1220f52 100644 --- a/src/jinjaturtle/handlers/yaml.py +++ b/src/jinjaturtle/handlers/yaml.py @@ -4,23 +4,29 @@ import yaml from pathlib import Path from typing import Any -from . import DictLikeHandler +from .dict import DictLikeHandler +from ..loop_analyzer import LoopCandidate class YamlHandler(DictLikeHandler): + """ + YAML handler that can generate both scalar templates and loop-based templates. 
+ """ + fmt = "yaml" - flatten_lists = True # you flatten YAML lists + flatten_lists = True def parse(self, path: Path) -> Any: text = path.read_text(encoding="utf-8") return yaml.safe_load(text) or {} - def generate_template( + def generate_jinja2_template( self, parsed: Any, role_prefix: str, original_text: str | None = None, ) -> str: + """Original scalar-only template generation.""" if original_text is not None: return self._generate_yaml_template_from_text(role_prefix, original_text) if not isinstance(parsed, (dict, list)): @@ -28,29 +34,41 @@ class YamlHandler(DictLikeHandler): dumped = yaml.safe_dump(parsed, sort_keys=False) return self._generate_yaml_template_from_text(role_prefix, dumped) + def generate_jinja2_template_with_loops( + self, + parsed: Any, + role_prefix: str, + original_text: str | None, + loop_candidates: list[LoopCandidate], + ) -> str: + """Generate template with Jinja2 for loops where appropriate.""" + + # Build loop path set for quick lookup + loop_paths = {candidate.path for candidate in loop_candidates} + + if original_text is not None: + return self._generate_yaml_template_with_loops_from_text( + role_prefix, original_text, loop_candidates, loop_paths + ) + + if not isinstance(parsed, (dict, list)): + raise TypeError("YAML parser result must be a dict or list") + + dumped = yaml.safe_dump(parsed, sort_keys=False) + return self._generate_yaml_template_with_loops_from_text( + role_prefix, dumped, loop_candidates, loop_paths + ) + def _generate_yaml_template_from_text( self, role_prefix: str, text: str, ) -> str: - """ - Generate a Jinja2 template for a YAML file, preserving comments and - blank lines by patching scalar values in-place. - - This handles common "config-ish" YAML: - - top-level and nested mappings - - lists of scalars - - lists of small mapping objects - It does *not* aim to support all YAML edge cases (anchors, tags, etc.). 
- """ + """Original scalar-only template generation (unchanged from base).""" lines = text.splitlines(keepends=True) out_lines: list[str] = [] - # Simple indentation-based context stack: (indent, path, kind) - # kind is "map" or "seq". stack: list[tuple[int, tuple[str, ...], str]] = [] - - # Track index per parent path for sequences seq_counters: dict[tuple[str, ...], int] = {} def current_path() -> tuple[str, ...]: @@ -60,7 +78,147 @@ class YamlHandler(DictLikeHandler): stripped = raw_line.lstrip() indent = len(raw_line) - len(stripped) - # Blank or pure comment lines unchanged + if not stripped or stripped.startswith("#"): + out_lines.append(raw_line) + continue + + while stack and indent < stack[-1][0]: + stack.pop() + + if ":" in stripped and not stripped.lstrip().startswith("- "): + key_part, rest = stripped.split(":", 1) + key = key_part.strip() + if not key: + out_lines.append(raw_line) + continue + + rest_stripped = rest.lstrip(" \t") + value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"}) + has_value = bool(value_candidate.strip()) + + if stack and stack[-1][0] == indent and stack[-1][2] == "map": + stack.pop() + path = current_path() + (key,) + stack.append((indent, path, "map")) + + if not has_value: + out_lines.append(raw_line) + continue + + value_part, comment_part = self._split_inline_comment( + rest_stripped, {"#"} + ) + raw_value = value_part.strip() + var_name = self.make_var_name(role_prefix, path) + + use_quotes = ( + len(raw_value) >= 2 + and raw_value[0] == raw_value[-1] + and raw_value[0] in {'"', "'"} + ) + + if use_quotes: + q = raw_value[0] + replacement = f"{q}{{{{ {var_name} }}}}{q}" + else: + replacement = f"{{{{ {var_name} }}}}" + + leading = rest[: len(rest) - len(rest.lstrip(" \t"))] + new_stripped = f"{key}: {leading}{replacement}{comment_part}" + out_lines.append( + " " * indent + + new_stripped + + ("\n" if raw_line.endswith("\n") else "") + ) + continue + + if stripped.startswith("- "): + if not stack or 
stack[-1][0] != indent or stack[-1][2] != "seq": + parent_path = current_path() + stack.append((indent, parent_path, "seq")) + + parent_path = stack[-1][1] + content = stripped[2:] + + index = seq_counters.get(parent_path, 0) + seq_counters[parent_path] = index + 1 + + path = parent_path + (str(index),) + + value_part, comment_part = self._split_inline_comment(content, {"#"}) + raw_value = value_part.strip() + var_name = self.make_var_name(role_prefix, path) + + use_quotes = ( + len(raw_value) >= 2 + and raw_value[0] == raw_value[-1] + and raw_value[0] in {'"', "'"} + ) + + if use_quotes: + q = raw_value[0] + replacement = f"{q}{{{{ {var_name} }}}}{q}" + else: + replacement = f"{{{{ {var_name} }}}}" + + new_stripped = f"- {replacement}{comment_part}" + out_lines.append( + " " * indent + + new_stripped + + ("\n" if raw_line.endswith("\n") else "") + ) + continue + + out_lines.append(raw_line) + + return "".join(out_lines) + + def _generate_yaml_template_with_loops_from_text( + self, + role_prefix: str, + text: str, + loop_candidates: list[LoopCandidate], + loop_paths: set[tuple[str, ...]], + ) -> str: + """ + Generate YAML template with Jinja2 for loops. + + Strategy: + 1. Parse YAML line-by-line maintaining context + 2. When we encounter a path that's a loop candidate: + - Replace that section with a {% for %} loop + - Use the first item as template structure + 3. 
Everything else gets scalar variable replacement + """ + + lines = text.splitlines(keepends=True) + out_lines: list[str] = [] + + stack: list[tuple[int, tuple[str, ...], str]] = [] + seq_counters: dict[tuple[str, ...], int] = {} + + # Track which lines are part of loop sections (to skip them) + skip_until_indent: int | None = None + + def current_path() -> tuple[str, ...]: + return stack[-1][1] if stack else () + + for raw_line in lines: + stripped = raw_line.lstrip() + indent = len(raw_line) - len(stripped) + + # If we're skipping lines (inside a loop section), check if we can stop + if skip_until_indent is not None: + if ( + indent <= skip_until_indent + and stripped + and not stripped.startswith("#") + ): + skip_until_indent = None + else: + continue # Skip this line + + # Blank or comment lines if not stripped or stripped.startswith("#"): out_lines.append(raw_line) continue @@ -71,42 +229,45 @@ class YamlHandler(DictLikeHandler): # --- Handle mapping key lines: "key:" or "key: value" if ":" in stripped and not stripped.lstrip().startswith("- "): - # separate key and rest key_part, rest = stripped.split(":", 1) key = key_part.strip() if not key: out_lines.append(raw_line) continue - # Is this just "key:" or "key: value"? 
rest_stripped = rest.lstrip(" \t") - - # Use the same inline-comment splitter to see if there's any real value value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"}) has_value = bool(value_candidate.strip()) - # Update stack/context: current mapping at this indent - # Replace any existing mapping at same indent if stack and stack[-1][0] == indent and stack[-1][2] == "map": stack.pop() path = current_path() + (key,) stack.append((indent, path, "map")) + # Check if this path is a loop candidate + if path in loop_paths: + # Find the matching candidate + candidate = next(c for c in loop_candidates if c.path == path) + + # Generate loop + loop_str = self._generate_yaml_loop(candidate, role_prefix, indent) + out_lines.append(loop_str) + + # Skip subsequent lines that are part of this collection + skip_until_indent = indent + continue + if not has_value: - # Just "key:" -> collection or nested structure begins on following lines. out_lines.append(raw_line) continue - # We have an inline scalar value on this same line. 
- - # Separate value from inline comment + # Scalar value - replace with variable value_part, comment_part = self._split_inline_comment( rest_stripped, {"#"} ) raw_value = value_part.strip() var_name = self.make_var_name(role_prefix, path) - # Keep quote-style if original was quoted use_quotes = ( len(raw_value) >= 2 and raw_value[0] == raw_value[-1] @@ -130,18 +291,30 @@ class YamlHandler(DictLikeHandler): # --- Handle list items: "- value" or "- key: value" if stripped.startswith("- "): - # Determine parent path - # If top of stack isn't sequence at this indent, push one using current path if not stack or stack[-1][0] != indent or stack[-1][2] != "seq": parent_path = current_path() stack.append((indent, parent_path, "seq")) parent_path = stack[-1][1] - content = stripped[2:] # after "- " - parent_path = stack[-1][1] - content = stripped[2:] # after "- " - # Determine index for this parent path + # Check if parent path is a loop candidate + if parent_path in loop_paths: + # Find the matching candidate + candidate = next( + c for c in loop_candidates if c.path == parent_path + ) + + # Generate loop (with indent for the '-' items) + loop_str = self._generate_yaml_loop( + candidate, role_prefix, indent, is_list=True + ) + out_lines.append(loop_str) + + # Skip subsequent items + skip_until_indent = indent - 1 if indent > 0 else None + continue + + content = stripped[2:] index = seq_counters.get(parent_path, 0) seq_counters[parent_path] = index + 1 @@ -151,8 +324,6 @@ class YamlHandler(DictLikeHandler): raw_value = value_part.strip() var_name = self.make_var_name(role_prefix, path) - # If it's of the form "key: value" inside the list, we could try to - # support that, but a simple scalar is the common case: use_quotes = ( len(raw_value) >= 2 and raw_value[0] == raw_value[-1] @@ -173,7 +344,106 @@ class YamlHandler(DictLikeHandler): ) continue - # Anything else (multi-line scalars, weird YAML): leave untouched out_lines.append(raw_line) return "".join(out_lines) + + def 
_generate_yaml_loop( + self, + candidate: LoopCandidate, + role_prefix: str, + indent: int, + is_list: bool = False, + ) -> str: + """ + Generate a Jinja2 for loop for a YAML collection. + + Args: + candidate: Loop candidate with items and metadata + role_prefix: Variable prefix + indent: Indentation level in spaces + is_list: True if this is a YAML list, False if dict + + Returns: + YAML string with Jinja2 loop + """ + + indent_str = " " * indent + collection_var = self.make_var_name(role_prefix, candidate.path) + item_var = candidate.loop_var + + lines = [] + + if not is_list: + # Dict-style: key: {% for ... %} + key = candidate.path[-1] if candidate.path else "items" + lines.append(f"{indent_str}{key}:") + lines.append(f"{indent_str} {{% for {item_var} in {collection_var} %}}") + else: + # List-style: just the loop + lines.append(f"{indent_str}{{% for {item_var} in {collection_var} %}}") + + # Generate template for item structure + if candidate.items: + sample_item = candidate.items[0] + item_indent = indent + 2 if not is_list else indent + + if candidate.item_schema == "scalar": + # Simple list of scalars + if is_list: + lines.append(f"{indent_str}- {{{{ {item_var} }}}}") + else: + lines.append(f"{indent_str} - {{{{ {item_var} }}}}") + + elif candidate.item_schema in ("simple_dict", "nested"): + # List of dicts or complex items - these are ALWAYS list items in YAML + item_lines = self._dict_to_yaml_lines( + sample_item, item_var, item_indent, is_list_item=True + ) + lines.extend(item_lines) + + # Close loop + close_indent = indent + 2 if not is_list else indent + lines.append(f"{' ' * close_indent}{{% endfor %}}") + + return "\n".join(lines) + "\n" + + def _dict_to_yaml_lines( + self, + data: dict[str, Any], + loop_var: str, + indent: int, + is_list_item: bool = False, + ) -> list[str]: + """ + Convert a dict to YAML lines with Jinja2 variable references. 
+ + Args: + data: Dict representing item structure + loop_var: Loop variable name + indent: Base indentation level + is_list_item: True if this should start with '-' + + Returns: + List of YAML lines + """ + + lines = [] + indent_str = " " * indent + + first_key = True + for key, value in data.items(): + if key == "_key": + # Special key for dict collections - output as comment or skip + continue + + if first_key and is_list_item: + # First key gets the list marker + lines.append(f"{indent_str}- {key}: {{{{ {loop_var}.{key} }}}}") + first_key = False + else: + # Subsequent keys are indented + sub_indent = indent + 2 if is_list_item else indent + lines.append(f"{' ' * sub_indent}{key}: {{{{ {loop_var}.{key} }}}}") + + return lines diff --git a/src/jinjaturtle/handlers/yaml_loopable.py b/src/jinjaturtle/handlers/yaml_loopable.py deleted file mode 100644 index 2cc66a9..0000000 --- a/src/jinjaturtle/handlers/yaml_loopable.py +++ /dev/null @@ -1,449 +0,0 @@ -from __future__ import annotations - -import yaml -from pathlib import Path -from typing import Any - -from .dict import DictLikeHandler -from ..loop_analyzer import LoopCandidate - - -class YamlHandlerLoopable(DictLikeHandler): - """ - YAML handler that can generate both scalar templates and loop-based templates. 
- """ - - fmt = "yaml" - flatten_lists = True - - def parse(self, path: Path) -> Any: - text = path.read_text(encoding="utf-8") - return yaml.safe_load(text) or {} - - def generate_template( - self, - parsed: Any, - role_prefix: str, - original_text: str | None = None, - ) -> str: - """Original scalar-only template generation.""" - if original_text is not None: - return self._generate_yaml_template_from_text(role_prefix, original_text) - if not isinstance(parsed, (dict, list)): - raise TypeError("YAML parser result must be a dict or list") - dumped = yaml.safe_dump(parsed, sort_keys=False) - return self._generate_yaml_template_from_text(role_prefix, dumped) - - def generate_template_with_loops( - self, - parsed: Any, - role_prefix: str, - original_text: str | None, - loop_candidates: list[LoopCandidate], - ) -> str: - """Generate template with Jinja2 for loops where appropriate.""" - - # Build loop path set for quick lookup - loop_paths = {candidate.path for candidate in loop_candidates} - - if original_text is not None: - return self._generate_yaml_template_with_loops_from_text( - role_prefix, original_text, loop_candidates, loop_paths - ) - - if not isinstance(parsed, (dict, list)): - raise TypeError("YAML parser result must be a dict or list") - - dumped = yaml.safe_dump(parsed, sort_keys=False) - return self._generate_yaml_template_with_loops_from_text( - role_prefix, dumped, loop_candidates, loop_paths - ) - - def _generate_yaml_template_from_text( - self, - role_prefix: str, - text: str, - ) -> str: - """Original scalar-only template generation (unchanged from base).""" - lines = text.splitlines(keepends=True) - out_lines: list[str] = [] - - stack: list[tuple[int, tuple[str, ...], str]] = [] - seq_counters: dict[tuple[str, ...], int] = {} - - def current_path() -> tuple[str, ...]: - return stack[-1][1] if stack else () - - for raw_line in lines: - stripped = raw_line.lstrip() - indent = len(raw_line) - len(stripped) - - if not stripped or 
stripped.startswith("#"): - out_lines.append(raw_line) - continue - - while stack and indent < stack[-1][0]: - stack.pop() - - if ":" in stripped and not stripped.lstrip().startswith("- "): - key_part, rest = stripped.split(":", 1) - key = key_part.strip() - if not key: - out_lines.append(raw_line) - continue - - rest_stripped = rest.lstrip(" \t") - value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"}) - has_value = bool(value_candidate.strip()) - - if stack and stack[-1][0] == indent and stack[-1][2] == "map": - stack.pop() - path = current_path() + (key,) - stack.append((indent, path, "map")) - - if not has_value: - out_lines.append(raw_line) - continue - - value_part, comment_part = self._split_inline_comment( - rest_stripped, {"#"} - ) - raw_value = value_part.strip() - var_name = self.make_var_name(role_prefix, path) - - use_quotes = ( - len(raw_value) >= 2 - and raw_value[0] == raw_value[-1] - and raw_value[0] in {'"', "'"} - ) - - if use_quotes: - q = raw_value[0] - replacement = f"{q}{{{{ {var_name} }}}}{q}" - else: - replacement = f"{{{{ {var_name} }}}}" - - leading = rest[: len(rest) - len(rest.lstrip(" \t"))] - new_stripped = f"{key}: {leading}{replacement}{comment_part}" - out_lines.append( - " " * indent - + new_stripped - + ("\n" if raw_line.endswith("\n") else "") - ) - continue - - if stripped.startswith("- "): - if not stack or stack[-1][0] != indent or stack[-1][2] != "seq": - parent_path = current_path() - stack.append((indent, parent_path, "seq")) - - parent_path = stack[-1][1] - content = stripped[2:] - - index = seq_counters.get(parent_path, 0) - seq_counters[parent_path] = index + 1 - - path = parent_path + (str(index),) - - value_part, comment_part = self._split_inline_comment(content, {"#"}) - raw_value = value_part.strip() - var_name = self.make_var_name(role_prefix, path) - - use_quotes = ( - len(raw_value) >= 2 - and raw_value[0] == raw_value[-1] - and raw_value[0] in {'"', "'"} - ) - - if use_quotes: - q = raw_value[0] - 
replacement = f"{q}{{{{ {var_name} }}}}{q}" - else: - replacement = f"{{{{ {var_name} }}}}" - - new_stripped = f"- {replacement}{comment_part}" - out_lines.append( - " " * indent - + new_stripped - + ("\n" if raw_line.endswith("\n") else "") - ) - continue - - out_lines.append(raw_line) - - return "".join(out_lines) - - def _generate_yaml_template_with_loops_from_text( - self, - role_prefix: str, - text: str, - loop_candidates: list[LoopCandidate], - loop_paths: set[tuple[str, ...]], - ) -> str: - """ - Generate YAML template with Jinja2 for loops. - - Strategy: - 1. Parse YAML line-by-line maintaining context - 2. When we encounter a path that's a loop candidate: - - Replace that section with a {% for %} loop - - Use the first item as template structure - 3. Everything else gets scalar variable replacement - """ - - lines = text.splitlines(keepends=True) - out_lines: list[str] = [] - - stack: list[tuple[int, tuple[str, ...], str]] = [] - seq_counters: dict[tuple[str, ...], int] = {} - - # Track which lines are part of loop sections (to skip them) - skip_until_indent: int | None = None - - def current_path() -> tuple[str, ...]: - return stack[-1][1] if stack else () - - for raw_line in lines: - stripped = raw_line.lstrip() - indent = len(raw_line) - len(stripped) - - # If we're skipping lines (inside a loop section), check if we can stop - if skip_until_indent is not None: - if ( - indent <= skip_until_indent - and stripped - and not stripped.startswith("#") - ): - skip_until_indent = None - else: - continue # Skip this line - - # Blank or comment lines - if not stripped or stripped.startswith("#"): - out_lines.append(raw_line) - continue - - # Adjust stack based on indent - while stack and indent < stack[-1][0]: - stack.pop() - - # --- Handle mapping key lines: "key:" or "key: value" - if ":" in stripped and not stripped.lstrip().startswith("- "): - key_part, rest = stripped.split(":", 1) - key = key_part.strip() - if not key: - out_lines.append(raw_line) - 
continue - - rest_stripped = rest.lstrip(" \t") - value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"}) - has_value = bool(value_candidate.strip()) - - if stack and stack[-1][0] == indent and stack[-1][2] == "map": - stack.pop() - path = current_path() + (key,) - stack.append((indent, path, "map")) - - # Check if this path is a loop candidate - if path in loop_paths: - # Find the matching candidate - candidate = next(c for c in loop_candidates if c.path == path) - - # Generate loop - loop_str = self._generate_yaml_loop(candidate, role_prefix, indent) - out_lines.append(loop_str) - - # Skip subsequent lines that are part of this collection - skip_until_indent = indent - continue - - if not has_value: - out_lines.append(raw_line) - continue - - # Scalar value - replace with variable - value_part, comment_part = self._split_inline_comment( - rest_stripped, {"#"} - ) - raw_value = value_part.strip() - var_name = self.make_var_name(role_prefix, path) - - use_quotes = ( - len(raw_value) >= 2 - and raw_value[0] == raw_value[-1] - and raw_value[0] in {'"', "'"} - ) - - if use_quotes: - q = raw_value[0] - replacement = f"{q}{{{{ {var_name} }}}}{q}" - else: - replacement = f"{{{{ {var_name} }}}}" - - leading = rest[: len(rest) - len(rest.lstrip(" \t"))] - new_stripped = f"{key}: {leading}{replacement}{comment_part}" - out_lines.append( - " " * indent - + new_stripped - + ("\n" if raw_line.endswith("\n") else "") - ) - continue - - # --- Handle list items: "- value" or "- key: value" - if stripped.startswith("- "): - if not stack or stack[-1][0] != indent or stack[-1][2] != "seq": - parent_path = current_path() - stack.append((indent, parent_path, "seq")) - - parent_path = stack[-1][1] - - # Check if parent path is a loop candidate - if parent_path in loop_paths: - # Find the matching candidate - candidate = next( - c for c in loop_candidates if c.path == parent_path - ) - - # Generate loop (with indent for the '-' items) - loop_str = self._generate_yaml_loop( 
- candidate, role_prefix, indent, is_list=True - ) - out_lines.append(loop_str) - - # Skip subsequent items - skip_until_indent = indent - 1 if indent > 0 else None - continue - - content = stripped[2:] - index = seq_counters.get(parent_path, 0) - seq_counters[parent_path] = index + 1 - - path = parent_path + (str(index),) - - value_part, comment_part = self._split_inline_comment(content, {"#"}) - raw_value = value_part.strip() - var_name = self.make_var_name(role_prefix, path) - - use_quotes = ( - len(raw_value) >= 2 - and raw_value[0] == raw_value[-1] - and raw_value[0] in {'"', "'"} - ) - - if use_quotes: - q = raw_value[0] - replacement = f"{q}{{{{ {var_name} }}}}{q}" - else: - replacement = f"{{{{ {var_name} }}}}" - - new_stripped = f"- {replacement}{comment_part}" - out_lines.append( - " " * indent - + new_stripped - + ("\n" if raw_line.endswith("\n") else "") - ) - continue - - out_lines.append(raw_line) - - return "".join(out_lines) - - def _generate_yaml_loop( - self, - candidate: LoopCandidate, - role_prefix: str, - indent: int, - is_list: bool = False, - ) -> str: - """ - Generate a Jinja2 for loop for a YAML collection. - - Args: - candidate: Loop candidate with items and metadata - role_prefix: Variable prefix - indent: Indentation level in spaces - is_list: True if this is a YAML list, False if dict - - Returns: - YAML string with Jinja2 loop - """ - - indent_str = " " * indent - collection_var = self.make_var_name(role_prefix, candidate.path) - item_var = candidate.loop_var - - lines = [] - - if not is_list: - # Dict-style: key: {% for ... 
%} - key = candidate.path[-1] if candidate.path else "items" - lines.append(f"{indent_str}{key}:") - lines.append(f"{indent_str} {{% for {item_var} in {collection_var} %}}") - else: - # List-style: just the loop - lines.append(f"{indent_str}{{% for {item_var} in {collection_var} %}}") - - # Generate template for item structure - if candidate.items: - sample_item = candidate.items[0] - item_indent = indent + 2 if not is_list else indent - - if candidate.item_schema == "scalar": - # Simple list of scalars - if is_list: - lines.append(f"{indent_str}- {{{{ {item_var} }}}}") - else: - lines.append(f"{indent_str} - {{{{ {item_var} }}}}") - - elif candidate.item_schema in ("simple_dict", "nested"): - # List of dicts or complex items - these are ALWAYS list items in YAML - item_lines = self._dict_to_yaml_lines( - sample_item, item_var, item_indent, is_list_item=True - ) - lines.extend(item_lines) - - # Close loop - close_indent = indent + 2 if not is_list else indent - lines.append(f"{' ' * close_indent}{{% endfor %}}") - - return "\n".join(lines) + "\n" - - def _dict_to_yaml_lines( - self, - data: dict[str, Any], - loop_var: str, - indent: int, - is_list_item: bool = False, - ) -> list[str]: - """ - Convert a dict to YAML lines with Jinja2 variable references. 
- - Args: - data: Dict representing item structure - loop_var: Loop variable name - indent: Base indentation level - is_list_item: True if this should start with '-' - - Returns: - List of YAML lines - """ - - lines = [] - indent_str = " " * indent - - first_key = True - for key, value in data.items(): - if key == "_key": - # Special key for dict collections - output as comment or skip - continue - - if first_key and is_list_item: - # First key gets the list marker - lines.append(f"{indent_str}- {key}: {{{{ {loop_var}.{key} }}}}") - first_key = False - else: - # Subsequent keys are indented - sub_indent = indent + 2 if is_list_item else indent - lines.append(f"{' ' * sub_indent}{key}: {{{{ {loop_var}.{key} }}}}") - - return lines diff --git a/src/jinjaturtle/loop_analyzer.py b/src/jinjaturtle/loop_analyzer.py index 6835104..fd7e0b5 100644 --- a/src/jinjaturtle/loop_analyzer.py +++ b/src/jinjaturtle/loop_analyzer.py @@ -1,3 +1,10 @@ +""" +Loop detection and analysis for intelligent Jinja2 template generation. + +This module determines when config structures should use Jinja2 'for' loops +instead of flattened scalar variables. +""" + from __future__ import annotations from collections import Counter @@ -373,7 +380,8 @@ class LoopAnalyzer: # Allow some variation all_attrs = set().union(*attr_sets) common_attrs = set.intersection(*attr_sets) if attr_sets else set() - if len(common_attrs) / max(len(all_attrs), 1) < 0.7: + # Very permissive for attributes - 20% overlap is OK + if len(common_attrs) / max(len(all_attrs), 1) < 0.2: return False # Compare child element tags @@ -384,12 +392,16 @@ class LoopAnalyzer: if child_tag_sets: first_tags = child_tag_sets[0] if not all(tags == first_tags for tags in child_tag_sets): - # Allow some variation + # Allow significant variation for XML - just need SOME commonality + # This is important for cases like OSSEC rules where each rule + # has different optional child elements (if_sid, url_pcre2, etc.) 
all_tags = set().union(*child_tag_sets) common_tags = ( set.intersection(*child_tag_sets) if child_tag_sets else set() ) - if len(common_tags) / max(len(all_tags), 1) < 0.7: + # Lower threshold to 20% - if they share at least 20% of tags, consider them similar + # Even if they just share 'description' or 'id' fields, that's enough + if len(common_tags) / max(len(all_tags), 1) < 0.2: return False return True diff --git a/tests/test_base_handler.py b/tests/test_base_handler.py index cd8b0c1..5ee761f 100644 --- a/tests/test_base_handler.py +++ b/tests/test_base_handler.py @@ -31,4 +31,4 @@ def test_base_handler_abstract_methods_raise_not_implemented(tmp_path: Path): handler.flatten(object()) with pytest.raises(NotImplementedError): - handler.generate_template(parsed=object(), role_prefix="role") + handler.generate_jinja2_template(parsed=object(), role_prefix="role") diff --git a/tests/test_core_utils.py b/tests/test_core_utils.py index 96e80e2..b907d5c 100644 --- a/tests/test_core_utils.py +++ b/tests/test_core_utils.py @@ -10,8 +10,8 @@ from jinjaturtle.core import ( detect_format, parse_config, flatten_config, - generate_defaults_yaml, - generate_template, + generate_ansible_yaml, + generate_jinja2_template, make_var_name, ) @@ -90,9 +90,9 @@ def test_parse_config_unsupported_format(tmp_path: Path): parse_config(cfg_path, fmt="bogus") -def test_generate_template_type_and_format_errors(): +def test_generate_jinja2_template_type_and_format_errors(): """ - Exercise the error branches in generate_template: + Exercise the error branches in generate_jinja2_template: - toml with non-dict parsed - ini with non-ConfigParser parsed - yaml with wrong parsed type @@ -101,27 +101,29 @@ def test_generate_template_type_and_format_errors(): """ # wrong type for TOML with pytest.raises(TypeError): - generate_template("toml", parsed="not a dict", role_prefix="role") + generate_jinja2_template("toml", parsed="not a dict", role_prefix="role") # wrong type for INI with 
pytest.raises(TypeError): - generate_template("ini", parsed={"not": "a configparser"}, role_prefix="role") + generate_jinja2_template( + "ini", parsed={"not": "a configparser"}, role_prefix="role" + ) # wrong type for YAML with pytest.raises(TypeError): - generate_template("yaml", parsed=None, role_prefix="role") + generate_jinja2_template("yaml", parsed=None, role_prefix="role") # wrong type for JSON with pytest.raises(TypeError): - generate_template("json", parsed=None, role_prefix="role") + generate_jinja2_template("json", parsed=None, role_prefix="role") # unsupported format, no original_text with pytest.raises(ValueError): - generate_template("bogusfmt", parsed=None, role_prefix="role") + generate_jinja2_template("bogusfmt", parsed=None, role_prefix="role") # unsupported format, with original_text with pytest.raises(ValueError): - generate_template( + generate_jinja2_template( "bogusfmt", parsed=None, role_prefix="role", @@ -135,8 +137,8 @@ def test_normalize_default_value_true_false_strings(): (("section", "foo"), "true"), (("section", "bar"), "FALSE"), ] - defaults_yaml = generate_defaults_yaml("role", flat_items) - data = yaml.safe_load(defaults_yaml) + ansible_yaml = generate_ansible_yaml("role", flat_items) + data = yaml.safe_load(ansible_yaml) assert data["role_section_foo"] == "true" assert data["role_section_bar"] == "FALSE" @@ -167,14 +169,14 @@ def test_fallback_str_representer_for_unknown_type(): def test_normalize_default_value_bool_inputs_are_stringified(): """ Real boolean values should be turned into quoted 'true'/'false' strings - by _normalize_default_value via generate_defaults_yaml. + by _normalize_default_value via generate_ansible_yaml. 
""" flat_items = [ (("section", "flag_true"), True), (("section", "flag_false"), False), ] - defaults_yaml = generate_defaults_yaml("role", flat_items) - data = yaml.safe_load(defaults_yaml) + ansible_yaml = generate_ansible_yaml("role", flat_items) + data = yaml.safe_load(ansible_yaml) assert data["role_section_flag_true"] == "true" assert data["role_section_flag_false"] == "false" diff --git a/tests/test_ini_handler.py b/tests/test_ini_handler.py index 51ae457..3bf1252 100644 --- a/tests/test_ini_handler.py +++ b/tests/test_ini_handler.py @@ -8,8 +8,8 @@ import yaml from jinjaturtle.core import ( parse_config, flatten_config, - generate_defaults_yaml, - generate_template, + generate_ansible_yaml, + generate_jinja2_template, ) from jinjaturtle.handlers.ini import IniHandler @@ -26,8 +26,8 @@ def test_ini_php_sample_roundtrip(): flat_items = flatten_config(fmt, parsed) assert flat_items, "Expected at least one flattened item from php.ini sample" - defaults_yaml = generate_defaults_yaml("php", flat_items) - defaults = yaml.safe_load(defaults_yaml) + ansible_yaml = generate_ansible_yaml("php", flat_items) + defaults = yaml.safe_load(ansible_yaml) # defaults should be a non-empty dict assert isinstance(defaults, dict) @@ -41,7 +41,7 @@ def test_ini_php_sample_roundtrip(): # template generation original_text = ini_path.read_text(encoding="utf-8") - template = generate_template(fmt, parsed, "php", original_text=original_text) + template = generate_jinja2_template(fmt, parsed, "php", original_text=original_text) assert "; About this file" in template assert isinstance(template, str) assert template.strip(), "Template for php.ini sample should not be empty" @@ -53,16 +53,16 @@ def test_ini_php_sample_roundtrip(): ), f"Variable {var_name} not referenced in INI template" -def test_generate_template_fallback_ini(): +def test_generate_jinja2_template_fallback_ini(): """ - When original_text is not provided, generate_template should use the + When original_text is not 
provided, generate_jinja2_template should use the structural fallback path for INI configs. """ parser = configparser.ConfigParser() # foo is quoted in the INI text to hit the "preserve quotes" branch parser["section"] = {"foo": '"bar"', "num": "42"} - tmpl_ini = generate_template("ini", parsed=parser, role_prefix="role") + tmpl_ini = generate_jinja2_template("ini", parsed=parser, role_prefix="role") assert "[section]" in tmpl_ini assert "role_section_foo" in tmpl_ini assert '"{{ role_section_foo }}"' in tmpl_ini # came from quoted INI value diff --git a/tests/test_json_handler.py b/tests/test_json_handler.py index 8e6efe2..b9a914a 100644 --- a/tests/test_json_handler.py +++ b/tests/test_json_handler.py @@ -9,7 +9,7 @@ import yaml from jinjaturtle.core import ( parse_config, flatten_config, - generate_defaults_yaml, + generate_ansible_yaml, ) from jinjaturtle.handlers.json import JsonHandler @@ -24,8 +24,8 @@ def test_json_roundtrip(): assert fmt == "json" flat_items = flatten_config(fmt, parsed) - defaults_yaml = generate_defaults_yaml("foobar", flat_items) - defaults = yaml.safe_load(defaults_yaml) + ansible_yaml = generate_ansible_yaml("foobar", flat_items) + defaults = yaml.safe_load(ansible_yaml) # Defaults: nested keys and list indices assert defaults["foobar_foo"] == "bar" @@ -35,10 +35,12 @@ def test_json_roundtrip(): assert defaults["foobar_list_0"] == 10 assert defaults["foobar_list_1"] == 20 - # Template generation is done via JsonHandler.generate_template; we just + # Template generation is done via JsonHandler.generate_jinja2_template; we just # make sure it produces a structure with the expected placeholders. 
handler = JsonHandler() - templated = json.loads(handler.generate_template(parsed, role_prefix="foobar")) + templated = json.loads( + handler.generate_jinja2_template(parsed, role_prefix="foobar") + ) assert templated["foo"] == "{{ foobar_foo }}" assert "foobar_nested_a" in str(templated) @@ -47,10 +49,10 @@ def test_json_roundtrip(): assert "foobar_list_1" in str(templated) -def test_generate_template_json_type_error(): +def test_generate_jinja2_template_json_type_error(): """ - Wrong type for JSON in JsonHandler.generate_template should raise TypeError. + Wrong type for JSON in JsonHandler.generate_jinja2_template should raise TypeError. """ handler = JsonHandler() with pytest.raises(TypeError): - handler.generate_template(parsed="not a dict", role_prefix="role") + handler.generate_jinja2_template(parsed="not a dict", role_prefix="role") diff --git a/tests/test_toml_handler.py b/tests/test_toml_handler.py index b36830f..a446536 100644 --- a/tests/test_toml_handler.py +++ b/tests/test_toml_handler.py @@ -8,8 +8,8 @@ import yaml from jinjaturtle.core import ( parse_config, flatten_config, - generate_defaults_yaml, - generate_template, + generate_ansible_yaml, + generate_jinja2_template, ) from jinjaturtle.handlers.toml import TomlHandler import jinjaturtle.handlers.toml as toml_module @@ -27,8 +27,8 @@ def test_toml_sample_roundtrip(): flat_items = flatten_config(fmt, parsed) assert flat_items - defaults_yaml = generate_defaults_yaml("jinjaturtle", flat_items) - defaults = yaml.safe_load(defaults_yaml) + ansible_yaml = generate_ansible_yaml("jinjaturtle", flat_items) + defaults = yaml.safe_load(ansible_yaml) # defaults should be a non-empty dict assert isinstance(defaults, dict) @@ -42,7 +42,7 @@ def test_toml_sample_roundtrip(): # template generation – **now with original_text** original_text = toml_path.read_text(encoding="utf-8") - template = generate_template( + template = generate_jinja2_template( fmt, parsed, "jinjaturtle", original_text=original_text ) 
assert isinstance(template, str) @@ -72,9 +72,9 @@ def test_parse_config_toml_missing_tomllib(monkeypatch): assert "tomllib/tomli is required" in str(exc.value) -def test_generate_template_fallback_toml(): +def test_generate_jinja2_template_fallback_toml(): """ - When original_text is not provided, generate_template should use the + When original_text is not provided, generate_jinja2_template should use the structural fallback path for TOML configs. """ parsed_toml = { @@ -84,7 +84,7 @@ def test_generate_template_fallback_toml(): "file": {"path": "/tmp/app.log"} }, # nested table to hit recursive walk } - tmpl_toml = generate_template("toml", parsed=parsed_toml, role_prefix="role") + tmpl_toml = generate_jinja2_template("toml", parsed=parsed_toml, role_prefix="role") assert "[server]" in tmpl_toml assert "role_server_port" in tmpl_toml assert "[logging]" in tmpl_toml or "[logging.file]" in tmpl_toml diff --git a/tests/test_xml_handler.py b/tests/test_xml_handler.py index 6cf5836..6b124c4 100644 --- a/tests/test_xml_handler.py +++ b/tests/test_xml_handler.py @@ -10,8 +10,8 @@ import yaml from jinjaturtle.core import ( parse_config, flatten_config, - generate_defaults_yaml, - generate_template, + generate_ansible_yaml, + generate_jinja2_template, ) from jinjaturtle.handlers.xml import XmlHandler @@ -28,8 +28,8 @@ def test_xml_roundtrip_ossec_web_rules(): flat_items = flatten_config(fmt, parsed) assert flat_items, "Expected at least one flattened item from XML sample" - defaults_yaml = generate_defaults_yaml("ossec", flat_items) - defaults = yaml.safe_load(defaults_yaml) + ansible_yaml = generate_ansible_yaml("ossec", flat_items) + defaults = yaml.safe_load(ansible_yaml) # defaults should be a non-empty dict assert isinstance(defaults, dict) @@ -55,7 +55,9 @@ def test_xml_roundtrip_ossec_web_rules(): # Template generation (preserving comments) original_text = xml_path.read_text(encoding="utf-8") - template = generate_template(fmt, parsed, "ossec", 
original_text=original_text) + template = generate_jinja2_template( + fmt, parsed, "ossec", original_text=original_text + ) assert isinstance(template, str) assert template.strip(), "Template for XML sample should not be empty" @@ -108,13 +110,13 @@ def test_generate_xml_template_from_text_edge_cases(): assert "role_child_1" in tmpl -def test_generate_template_xml_type_error(): +def test_generate_jinja2_template_xml_type_error(): """ - Wrong type for XML in XmlHandler.generate_template should raise TypeError. + Wrong type for XML in XmlHandler.generate_jinja2_template should raise TypeError. """ handler = XmlHandler() with pytest.raises(TypeError): - handler.generate_template(parsed="not an element", role_prefix="role") + handler.generate_jinja2_template(parsed="not an element", role_prefix="role") def test_flatten_config_xml_type_error(): @@ -125,9 +127,9 @@ def test_flatten_config_xml_type_error(): flatten_config("xml", parsed="not-an-element") -def test_generate_template_xml_structural_fallback(): +def test_generate_jinja2_template_xml_structural_fallback(): """ - When original_text is not provided for XML, generate_template should use + When original_text is not provided for XML, generate_jinja2_template should use the structural fallback path (ET.tostring + handler processing). 
""" xml_text = textwrap.dedent( @@ -140,7 +142,7 @@ def test_generate_template_xml_structural_fallback(): ) root = ET.fromstring(xml_text) - tmpl = generate_template("xml", parsed=root, role_prefix="role") + tmpl = generate_jinja2_template("xml", parsed=root, role_prefix="role") # Root attribute path ("@attr",) -> role_attr assert "role_attr" in tmpl diff --git a/tests/test_yaml_handler.py b/tests/test_yaml_handler.py index f2d89f1..c7bacb7 100644 --- a/tests/test_yaml_handler.py +++ b/tests/test_yaml_handler.py @@ -8,8 +8,8 @@ import yaml from jinjaturtle.core import ( parse_config, flatten_config, - generate_defaults_yaml, - generate_template, + generate_ansible_yaml, + generate_jinja2_template, ) from jinjaturtle.handlers.yaml import YamlHandler @@ -24,8 +24,8 @@ def test_yaml_roundtrip_with_list_and_comment(): assert fmt == "yaml" flat_items = flatten_config(fmt, parsed) - defaults_yaml = generate_defaults_yaml("foobar", flat_items) - defaults = yaml.safe_load(defaults_yaml) + ansible_yaml = generate_ansible_yaml("foobar", flat_items) + defaults = yaml.safe_load(ansible_yaml) # Defaults: keys are flattened with indices assert defaults["foobar_foo"] == "bar" @@ -34,7 +34,9 @@ def test_yaml_roundtrip_with_list_and_comment(): # Template generation (preserving comments) original_text = yaml_path.read_text(encoding="utf-8") - template = generate_template(fmt, parsed, "foobar", original_text=original_text) + template = generate_jinja2_template( + fmt, parsed, "foobar", original_text=original_text + ) # Comment preserved assert "# Top comment" in template @@ -86,14 +88,14 @@ def test_generate_yaml_template_from_text_edge_cases(): assert "role_list_1" in tmpl -def test_generate_template_yaml_structural_fallback(): +def test_generate_jinja2_template_yaml_structural_fallback(): """ - When original_text is not provided for YAML, generate_template should use + When original_text is not provided for YAML, generate_jinja2_template should use the structural fallback path 
(yaml.safe_dump + handler processing). """ parsed = {"outer": {"inner": "val"}} - tmpl = generate_template("yaml", parsed=parsed, role_prefix="role") + tmpl = generate_jinja2_template("yaml", parsed=parsed, role_prefix="role") # We don't care about exact formatting, just that the expected variable # name shows up, proving we went through the structural path. From edd1acdabdd37c4e05a8f5586aae7d10e90e16ea Mon Sep 17 00:00:00 2001 From: Miguel Jacq Date: Fri, 28 Nov 2025 12:30:26 +1100 Subject: [PATCH 5/5] Add notes to the README about looping config --- README.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c5702f3..27fc7c5 100644 --- a/README.md +++ b/README.md @@ -29,9 +29,12 @@ TOML, YAML, INI, JSON and XML-style config files should be okay. There are alway going to be some edge cases in very complex files that are difficult to work with, though, so you may still find that you need to tweak the results. -The tool does not do anything intelligent like detect common sections that -could practically be turned into 'for' loops in Jinja. You'd have to do those -sorts of optimisations yourself. +For XML and YAML files, JinjaTurtle will attempt to generate 'for' loops +and lists in the Ansible YAML if the config file looks homogeneous enough to +support it. However, if it is not confident enough, it will fall back to +using scalar-style flattened attributes. + +You may need or wish to tidy up the config to suit your needs. The goal here is really to *speed up* converting files into Ansible/Jinja2, but not necessarily to make it perfect.