Add ability to generate 'loops' in Jinja if the XML or YAML config supports it
This commit is contained in:
parent
4f9d1a0442
commit
2db80cc6e1
9 changed files with 1411 additions and 37 deletions
|
|
@ -7,6 +7,7 @@ from pathlib import Path
|
||||||
|
|
||||||
from .core import (
|
from .core import (
|
||||||
parse_config,
|
parse_config,
|
||||||
|
analyze_loops,
|
||||||
flatten_config,
|
flatten_config,
|
||||||
generate_defaults_yaml,
|
generate_defaults_yaml,
|
||||||
generate_template,
|
generate_template,
|
||||||
|
|
@ -53,12 +54,27 @@ def _main(argv: list[str] | None = None) -> int:
|
||||||
args = parser.parse_args(argv)
|
args = parser.parse_args(argv)
|
||||||
|
|
||||||
config_path = Path(args.config)
|
config_path = Path(args.config)
|
||||||
fmt, parsed = parse_config(config_path, args.format)
|
|
||||||
flat_items = flatten_config(fmt, parsed)
|
|
||||||
defaults_yaml = generate_defaults_yaml(args.role_name, flat_items)
|
|
||||||
config_text = config_path.read_text(encoding="utf-8")
|
config_text = config_path.read_text(encoding="utf-8")
|
||||||
|
|
||||||
|
# Parse the config
|
||||||
|
fmt, parsed = parse_config(config_path, args.format)
|
||||||
|
|
||||||
|
# Analyze for loops
|
||||||
|
loop_candidates = analyze_loops(fmt, parsed)
|
||||||
|
|
||||||
|
# Flatten config (excluding loop paths if loops are detected)
|
||||||
|
flat_items = flatten_config(fmt, parsed, loop_candidates)
|
||||||
|
|
||||||
|
# Generate defaults YAML (with loop collections if detected)
|
||||||
|
defaults_yaml = generate_defaults_yaml(args.role_name, flat_items, loop_candidates)
|
||||||
|
|
||||||
|
# Generate template (with loops if detected)
|
||||||
template_str = generate_template(
|
template_str = generate_template(
|
||||||
fmt, parsed, args.role_name, original_text=config_text
|
fmt,
|
||||||
|
parsed,
|
||||||
|
args.role_name,
|
||||||
|
original_text=config_text,
|
||||||
|
loop_candidates=loop_candidates,
|
||||||
)
|
)
|
||||||
|
|
||||||
if args.defaults_output:
|
if args.defaults_output:
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ from typing import Any, Iterable
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
|
from .loop_analyzer import LoopAnalyzer, LoopCandidate
|
||||||
from .handlers import (
|
from .handlers import (
|
||||||
BaseHandler,
|
BaseHandler,
|
||||||
IniHandler,
|
IniHandler,
|
||||||
|
|
@ -12,25 +13,30 @@ from .handlers import (
|
||||||
TomlHandler,
|
TomlHandler,
|
||||||
YamlHandler,
|
YamlHandler,
|
||||||
XmlHandler,
|
XmlHandler,
|
||||||
|
YamlHandlerLoopable,
|
||||||
|
XmlHandlerLoopable,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class QuotedString(str):
|
class QuotedString(str):
|
||||||
"""Marker type for strings that must be double-quoted in YAML output."""
|
"""
|
||||||
|
Marker type for strings that must be double-quoted in YAML output.
|
||||||
|
"""
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _fallback_str_representer(dumper: yaml.SafeDumper, data: Any):
|
def _fallback_str_representer(dumper: yaml.SafeDumper, data: Any):
|
||||||
"""
|
"""
|
||||||
Fallback for objects the dumper doesn't know about. Represent them as
|
Fallback for objects the dumper doesn't know about.
|
||||||
plain strings.
|
|
||||||
"""
|
"""
|
||||||
return dumper.represent_scalar("tag:yaml.org,2002:str", str(data))
|
return dumper.represent_scalar("tag:yaml.org,2002:str", str(data))
|
||||||
|
|
||||||
|
|
||||||
class _TurtleDumper(yaml.SafeDumper):
|
class _TurtleDumper(yaml.SafeDumper):
|
||||||
"""Custom YAML dumper that always double-quotes QuotedString values."""
|
"""
|
||||||
|
Custom YAML dumper that always double-quotes QuotedString values.
|
||||||
|
"""
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
@ -42,6 +48,7 @@ def _quoted_str_representer(dumper: yaml.SafeDumper, data: QuotedString):
|
||||||
_TurtleDumper.add_representer(QuotedString, _quoted_str_representer)
|
_TurtleDumper.add_representer(QuotedString, _quoted_str_representer)
|
||||||
# Use our fallback for any unknown object types
|
# Use our fallback for any unknown object types
|
||||||
_TurtleDumper.add_representer(None, _fallback_str_representer)
|
_TurtleDumper.add_representer(None, _fallback_str_representer)
|
||||||
|
|
||||||
_HANDLERS: dict[str, BaseHandler] = {}
|
_HANDLERS: dict[str, BaseHandler] = {}
|
||||||
|
|
||||||
_INI_HANDLER = IniHandler()
|
_INI_HANDLER = IniHandler()
|
||||||
|
|
@ -49,6 +56,9 @@ _JSON_HANDLER = JsonHandler()
|
||||||
_TOML_HANDLER = TomlHandler()
|
_TOML_HANDLER = TomlHandler()
|
||||||
_YAML_HANDLER = YamlHandler()
|
_YAML_HANDLER = YamlHandler()
|
||||||
_XML_HANDLER = XmlHandler()
|
_XML_HANDLER = XmlHandler()
|
||||||
|
_YAML_HANDLER_LOOPABLE = YamlHandlerLoopable()
|
||||||
|
_XML_HANDLER_LOOPABLE = XmlHandlerLoopable()
|
||||||
|
|
||||||
_HANDLERS["ini"] = _INI_HANDLER
|
_HANDLERS["ini"] = _INI_HANDLER
|
||||||
_HANDLERS["json"] = _JSON_HANDLER
|
_HANDLERS["json"] = _JSON_HANDLER
|
||||||
_HANDLERS["toml"] = _TOML_HANDLER
|
_HANDLERS["toml"] = _TOML_HANDLER
|
||||||
|
|
@ -57,17 +67,15 @@ _HANDLERS["xml"] = _XML_HANDLER
|
||||||
|
|
||||||
|
|
||||||
def make_var_name(role_prefix: str, path: Iterable[str]) -> str:
|
def make_var_name(role_prefix: str, path: Iterable[str]) -> str:
|
||||||
"""Wrapper for :meth:`BaseHandler.make_var_name`.
|
"""
|
||||||
|
Wrapper for :meth:`BaseHandler.make_var_name`.
|
||||||
This keeps the public API (and tests) working while the implementation
|
|
||||||
lives on the BaseHandler class.
|
|
||||||
"""
|
"""
|
||||||
return BaseHandler.make_var_name(role_prefix, path)
|
return BaseHandler.make_var_name(role_prefix, path)
|
||||||
|
|
||||||
|
|
||||||
def detect_format(path: Path, explicit: str | None = None) -> str:
|
def detect_format(path: Path, explicit: str | None = None) -> str:
|
||||||
"""
|
"""
|
||||||
Determine config format (toml, yaml, json, ini-ish, xml) from argument or filename.
|
Determine config format from argument or filename.
|
||||||
"""
|
"""
|
||||||
if explicit:
|
if explicit:
|
||||||
return explicit
|
return explicit
|
||||||
|
|
@ -99,27 +107,66 @@ def parse_config(path: Path, fmt: str | None = None) -> tuple[str, Any]:
|
||||||
return fmt, parsed
|
return fmt, parsed
|
||||||
|
|
||||||
|
|
||||||
def flatten_config(fmt: str, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
|
def analyze_loops(fmt: str, parsed: Any) -> list[LoopCandidate]:
|
||||||
"""
|
"""
|
||||||
Flatten parsed config into a list of (path_tuple, value).
|
Analyze parsed config to find loop opportunities.
|
||||||
|
"""
|
||||||
|
analyzer = LoopAnalyzer()
|
||||||
|
candidates = analyzer.analyze(parsed, fmt)
|
||||||
|
|
||||||
|
# Filter by confidence threshold
|
||||||
|
return [c for c in candidates if c.confidence >= LoopAnalyzer.MIN_CONFIDENCE]
|
||||||
|
|
||||||
|
|
||||||
|
def flatten_config(
|
||||||
|
fmt: str, parsed: Any, loop_candidates: list[LoopCandidate] | None = None
|
||||||
|
) -> list[tuple[tuple[str, ...], Any]]:
|
||||||
|
"""
|
||||||
|
Flatten parsed config into (path, value) pairs.
|
||||||
|
|
||||||
|
If loop_candidates is provided, paths within those loops are excluded
|
||||||
|
from flattening (they'll be handled via loops in the template).
|
||||||
"""
|
"""
|
||||||
handler = _HANDLERS.get(fmt)
|
handler = _HANDLERS.get(fmt)
|
||||||
if handler is None:
|
if handler is None:
|
||||||
# preserve previous ValueError for unsupported formats
|
|
||||||
raise ValueError(f"Unsupported format: {fmt}")
|
raise ValueError(f"Unsupported format: {fmt}")
|
||||||
return handler.flatten(parsed)
|
|
||||||
|
all_items = handler.flatten(parsed)
|
||||||
|
|
||||||
|
if not loop_candidates:
|
||||||
|
return all_items
|
||||||
|
|
||||||
|
# Build set of paths to exclude (anything under a loop path)
|
||||||
|
excluded_prefixes = {candidate.path for candidate in loop_candidates}
|
||||||
|
|
||||||
|
# Filter out items that fall under loop paths
|
||||||
|
filtered_items = []
|
||||||
|
for item_path, value in all_items:
|
||||||
|
# Check if this path starts with any loop path
|
||||||
|
is_excluded = False
|
||||||
|
for loop_path in excluded_prefixes:
|
||||||
|
if _path_starts_with(item_path, loop_path):
|
||||||
|
is_excluded = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if not is_excluded:
|
||||||
|
filtered_items.append((item_path, value))
|
||||||
|
|
||||||
|
return filtered_items
|
||||||
|
|
||||||
|
|
||||||
|
def _path_starts_with(path: tuple[str, ...], prefix: tuple[str, ...]) -> bool:
|
||||||
|
"""Check if path starts with prefix."""
|
||||||
|
if len(path) < len(prefix):
|
||||||
|
return False
|
||||||
|
return path[: len(prefix)] == prefix
|
||||||
|
|
||||||
|
|
||||||
def _normalize_default_value(value: Any) -> Any:
|
def _normalize_default_value(value: Any) -> Any:
|
||||||
"""
|
"""
|
||||||
Ensure that 'true' / 'false' end up as quoted strings in YAML, not booleans.
|
Ensure that 'true' / 'false' end up as quoted strings in YAML.
|
||||||
|
|
||||||
- bool -> QuotedString("true"/"false")
|
|
||||||
- "true"/"false" (any case) -> QuotedString(original_text)
|
|
||||||
- everything else -> unchanged
|
|
||||||
"""
|
"""
|
||||||
if isinstance(value, bool):
|
if isinstance(value, bool):
|
||||||
# YAML booleans are lower-case; we keep them as strings.
|
|
||||||
return QuotedString("true" if value else "false")
|
return QuotedString("true" if value else "false")
|
||||||
if isinstance(value, str) and value.lower() in {"true", "false"}:
|
if isinstance(value, str) and value.lower() in {"true", "false"}:
|
||||||
return QuotedString(value)
|
return QuotedString(value)
|
||||||
|
|
@ -129,19 +176,24 @@ def _normalize_default_value(value: Any) -> Any:
|
||||||
def generate_defaults_yaml(
|
def generate_defaults_yaml(
|
||||||
role_prefix: str,
|
role_prefix: str,
|
||||||
flat_items: list[tuple[tuple[str, ...], Any]],
|
flat_items: list[tuple[tuple[str, ...], Any]],
|
||||||
|
loop_candidates: list[LoopCandidate] | None = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Create YAML for defaults/main.yml from flattened items.
|
Create Ansible YAML for defaults/main.yml.
|
||||||
|
|
||||||
Boolean/boolean-like values ("true"/"false") are forced to be *strings*
|
|
||||||
and double-quoted in the resulting YAML so that Ansible does not coerce
|
|
||||||
them back into Python booleans.
|
|
||||||
"""
|
"""
|
||||||
defaults: dict[str, Any] = {}
|
defaults: dict[str, Any] = {}
|
||||||
|
|
||||||
|
# Add scalar variables
|
||||||
for path, value in flat_items:
|
for path, value in flat_items:
|
||||||
var_name = make_var_name(role_prefix, path)
|
var_name = make_var_name(role_prefix, path)
|
||||||
defaults[var_name] = _normalize_default_value(value)
|
defaults[var_name] = _normalize_default_value(value)
|
||||||
|
|
||||||
|
# Add loop collections
|
||||||
|
if loop_candidates:
|
||||||
|
for candidate in loop_candidates:
|
||||||
|
var_name = make_var_name(role_prefix, candidate.path)
|
||||||
|
defaults[var_name] = candidate.items
|
||||||
|
|
||||||
return yaml.dump(
|
return yaml.dump(
|
||||||
defaults,
|
defaults,
|
||||||
Dumper=_TurtleDumper,
|
Dumper=_TurtleDumper,
|
||||||
|
|
@ -158,16 +210,29 @@ def generate_template(
|
||||||
parsed: Any,
|
parsed: Any,
|
||||||
role_prefix: str,
|
role_prefix: str,
|
||||||
original_text: str | None = None,
|
original_text: str | None = None,
|
||||||
|
loop_candidates: list[LoopCandidate] | None = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Generate a Jinja2 template for the config.
|
Generate a Jinja2 template for the config.
|
||||||
|
|
||||||
If original_text is provided, comments and blank lines are preserved by
|
|
||||||
patching values in-place. Otherwise we fall back to reconstructing from
|
|
||||||
the parsed structure (no comments). JSON of course does not support
|
|
||||||
comments.
|
|
||||||
"""
|
"""
|
||||||
|
# Use enhanced handler if we have loop candidates
|
||||||
handler = _HANDLERS.get(fmt)
|
handler = _HANDLERS.get(fmt)
|
||||||
|
|
||||||
|
if loop_candidates and fmt in ("yaml", "xml"):
|
||||||
|
# Use enhanced handlers for YAML and XML when we have loops
|
||||||
|
if fmt == "yaml":
|
||||||
|
handler = _YAML_HANDLER_LOOPABLE
|
||||||
|
elif fmt == "xml":
|
||||||
|
handler = _XML_HANDLER_LOOPABLE
|
||||||
|
|
||||||
if handler is None:
|
if handler is None:
|
||||||
raise ValueError(f"Unsupported format: {fmt}")
|
raise ValueError(f"Unsupported format: {fmt}")
|
||||||
|
|
||||||
|
# Check if handler supports loop-aware generation
|
||||||
|
if hasattr(handler, "generate_template_with_loops") and loop_candidates:
|
||||||
|
return handler.generate_template_with_loops(
|
||||||
|
parsed, role_prefix, original_text, loop_candidates
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fallback to original scalar-only generation
|
||||||
return handler.generate_template(parsed, role_prefix, original_text=original_text)
|
return handler.generate_template(parsed, role_prefix, original_text=original_text)
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,8 @@ from .json import JsonHandler
|
||||||
from .toml import TomlHandler
|
from .toml import TomlHandler
|
||||||
from .yaml import YamlHandler
|
from .yaml import YamlHandler
|
||||||
from .xml import XmlHandler
|
from .xml import XmlHandler
|
||||||
|
from .xml_loopable import XmlHandlerLoopable
|
||||||
|
from .yaml_loopable import YamlHandlerLoopable
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"BaseHandler",
|
"BaseHandler",
|
||||||
|
|
@ -16,4 +18,6 @@ __all__ = [
|
||||||
"TomlHandler",
|
"TomlHandler",
|
||||||
"YamlHandler",
|
"YamlHandler",
|
||||||
"XmlHandler",
|
"XmlHandler",
|
||||||
|
"XmlHandlerLoopable",
|
||||||
|
"YamlHandlerLoopable",
|
||||||
]
|
]
|
||||||
|
|
|
||||||
|
|
@ -72,8 +72,8 @@ class IniHandler(BaseHandler):
|
||||||
|
|
||||||
def _generate_ini_template_from_text(self, role_prefix: str, text: str) -> str:
|
def _generate_ini_template_from_text(self, role_prefix: str, text: str) -> str:
|
||||||
"""
|
"""
|
||||||
Generate a Jinja2 template for an INI-style file, preserving comments,
|
Generate a Jinja2 template for an INI/php.ini-style file, preserving
|
||||||
blank lines, and section headers by patching values in-place.
|
comments, blank lines, and section headers by patching values in-place.
|
||||||
"""
|
"""
|
||||||
lines = text.splitlines(keepends=True)
|
lines = text.splitlines(keepends=True)
|
||||||
current_section: str | None = None
|
current_section: str | None = None
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ class JsonHandler(DictLikeHandler):
|
||||||
) -> str:
|
) -> str:
|
||||||
if not isinstance(parsed, (dict, list)):
|
if not isinstance(parsed, (dict, list)):
|
||||||
raise TypeError("JSON parser result must be a dict or list")
|
raise TypeError("JSON parser result must be a dict or list")
|
||||||
# Rebuild structurally
|
# As before: ignore original_text and rebuild structurally
|
||||||
return self._generate_json_template(role_prefix, parsed)
|
return self._generate_json_template(role_prefix, parsed)
|
||||||
|
|
||||||
def _generate_json_template(self, role_prefix: str, data: Any) -> str:
|
def _generate_json_template(self, role_prefix: str, data: Any) -> str:
|
||||||
|
|
|
||||||
405
src/jinjaturtle/handlers/xml_loopable.py
Normal file
405
src/jinjaturtle/handlers/xml_loopable.py
Normal file
|
|
@ -0,0 +1,405 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections import Counter, defaultdict
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
import xml.etree.ElementTree as ET # nosec
|
||||||
|
|
||||||
|
from .base import BaseHandler
|
||||||
|
from ..loop_analyzer import LoopCandidate
|
||||||
|
|
||||||
|
|
||||||
|
class XmlHandlerLoopable(BaseHandler):
|
||||||
|
"""
|
||||||
|
XML handler that can generate both scalar templates and loop-based templates.
|
||||||
|
"""
|
||||||
|
|
||||||
|
fmt = "xml"
|
||||||
|
|
||||||
|
def parse(self, path: Path) -> ET.Element:
|
||||||
|
text = path.read_text(encoding="utf-8")
|
||||||
|
parser = ET.XMLParser(
|
||||||
|
target=ET.TreeBuilder(insert_comments=False)
|
||||||
|
) # nosec B314
|
||||||
|
parser.feed(text)
|
||||||
|
root = parser.close()
|
||||||
|
return root
|
||||||
|
|
||||||
|
def flatten(self, parsed: Any) -> list[tuple[tuple[str, ...], Any]]:
|
||||||
|
if not isinstance(parsed, ET.Element):
|
||||||
|
raise TypeError("XML parser result must be an Element")
|
||||||
|
return self._flatten_xml(parsed)
|
||||||
|
|
||||||
|
def generate_template(
|
||||||
|
self,
|
||||||
|
parsed: Any,
|
||||||
|
role_prefix: str,
|
||||||
|
original_text: str | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""Original scalar-only template generation."""
|
||||||
|
if original_text is not None:
|
||||||
|
return self._generate_xml_template_from_text(role_prefix, original_text)
|
||||||
|
if not isinstance(parsed, ET.Element):
|
||||||
|
raise TypeError("XML parser result must be an Element")
|
||||||
|
xml_str = ET.tostring(parsed, encoding="unicode")
|
||||||
|
return self._generate_xml_template_from_text(role_prefix, xml_str)
|
||||||
|
|
||||||
|
def generate_template_with_loops(
|
||||||
|
self,
|
||||||
|
parsed: Any,
|
||||||
|
role_prefix: str,
|
||||||
|
original_text: str | None,
|
||||||
|
loop_candidates: list[LoopCandidate],
|
||||||
|
) -> str:
|
||||||
|
"""Generate template with Jinja2 for loops where appropriate."""
|
||||||
|
|
||||||
|
if original_text is not None:
|
||||||
|
return self._generate_xml_template_with_loops_from_text(
|
||||||
|
role_prefix, original_text, loop_candidates
|
||||||
|
)
|
||||||
|
|
||||||
|
if not isinstance(parsed, ET.Element):
|
||||||
|
raise TypeError("XML parser result must be an Element")
|
||||||
|
|
||||||
|
xml_str = ET.tostring(parsed, encoding="unicode")
|
||||||
|
return self._generate_xml_template_with_loops_from_text(
|
||||||
|
role_prefix, xml_str, loop_candidates
|
||||||
|
)
|
||||||
|
|
||||||
|
def _flatten_xml(self, root: ET.Element) -> list[tuple[tuple[str, ...], Any]]:
|
||||||
|
"""Flatten an XML tree into (path, value) pairs."""
|
||||||
|
items: list[tuple[tuple[str, ...], Any]] = []
|
||||||
|
|
||||||
|
def walk(elem: ET.Element, path: tuple[str, ...]) -> None:
|
||||||
|
# Attributes
|
||||||
|
for attr_name, attr_val in elem.attrib.items():
|
||||||
|
attr_path = path + (f"@{attr_name}",)
|
||||||
|
items.append((attr_path, attr_val))
|
||||||
|
|
||||||
|
# Children
|
||||||
|
children = [c for c in list(elem) if isinstance(c.tag, str)]
|
||||||
|
|
||||||
|
# Text content
|
||||||
|
text = (elem.text or "").strip()
|
||||||
|
if text:
|
||||||
|
if not elem.attrib and not children:
|
||||||
|
items.append((path, text))
|
||||||
|
else:
|
||||||
|
items.append((path + ("value",), text))
|
||||||
|
|
||||||
|
# Repeated siblings get an index; singletons just use the tag
|
||||||
|
counts = Counter(child.tag for child in children)
|
||||||
|
index_counters: dict[str, int] = defaultdict(int)
|
||||||
|
|
||||||
|
for child in children:
|
||||||
|
tag = child.tag
|
||||||
|
if counts[tag] > 1:
|
||||||
|
idx = index_counters[tag]
|
||||||
|
index_counters[tag] += 1
|
||||||
|
child_path = path + (tag, str(idx))
|
||||||
|
else:
|
||||||
|
child_path = path + (tag,)
|
||||||
|
walk(child, child_path)
|
||||||
|
|
||||||
|
walk(root, ())
|
||||||
|
return items
|
||||||
|
|
||||||
|
def _split_xml_prolog(self, text: str) -> tuple[str, str]:
|
||||||
|
"""Split XML into (prolog, body)."""
|
||||||
|
i = 0
|
||||||
|
n = len(text)
|
||||||
|
prolog_parts: list[str] = []
|
||||||
|
|
||||||
|
while i < n:
|
||||||
|
while i < n and text[i].isspace():
|
||||||
|
prolog_parts.append(text[i])
|
||||||
|
i += 1
|
||||||
|
if i >= n:
|
||||||
|
break
|
||||||
|
|
||||||
|
if text.startswith("<?", i):
|
||||||
|
end = text.find("?>", i + 2)
|
||||||
|
if end == -1:
|
||||||
|
break
|
||||||
|
prolog_parts.append(text[i : end + 2])
|
||||||
|
i = end + 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
if text.startswith("<!--", i):
|
||||||
|
end = text.find("-->", i + 4)
|
||||||
|
if end == -1:
|
||||||
|
break
|
||||||
|
prolog_parts.append(text[i : end + 3])
|
||||||
|
i = end + 3
|
||||||
|
continue
|
||||||
|
|
||||||
|
if text.startswith("<!DOCTYPE", i):
|
||||||
|
end = text.find(">", i + 9)
|
||||||
|
if end == -1:
|
||||||
|
break
|
||||||
|
prolog_parts.append(text[i : end + 1])
|
||||||
|
i = end + 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
if text[i] == "<":
|
||||||
|
break
|
||||||
|
|
||||||
|
break
|
||||||
|
|
||||||
|
return "".join(prolog_parts), text[i:]
|
||||||
|
|
||||||
|
def _apply_jinja_to_xml_tree(
|
||||||
|
self,
|
||||||
|
role_prefix: str,
|
||||||
|
root: ET.Element,
|
||||||
|
loop_candidates: list[LoopCandidate] | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Mutate XML tree in-place, replacing values with Jinja expressions.
|
||||||
|
|
||||||
|
If loop_candidates is provided, repeated elements matching a candidate
|
||||||
|
will be replaced with a {% for %} loop.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Build a map of loop paths for quick lookup
|
||||||
|
loop_paths = {}
|
||||||
|
if loop_candidates:
|
||||||
|
for candidate in loop_candidates:
|
||||||
|
loop_paths[candidate.path] = candidate
|
||||||
|
|
||||||
|
def walk(elem: ET.Element, path: tuple[str, ...]) -> None:
|
||||||
|
# Attributes (unless this element is in a loop)
|
||||||
|
for attr_name in list(elem.attrib.keys()):
|
||||||
|
attr_path = path + (f"@{attr_name}",)
|
||||||
|
var_name = self.make_var_name(role_prefix, attr_path)
|
||||||
|
elem.set(attr_name, f"{{{{ {var_name} }}}}")
|
||||||
|
|
||||||
|
# Children
|
||||||
|
children = [c for c in list(elem) if isinstance(c.tag, str)]
|
||||||
|
|
||||||
|
# Text content
|
||||||
|
text = (elem.text or "").strip()
|
||||||
|
if text:
|
||||||
|
if not elem.attrib and not children:
|
||||||
|
text_path = path
|
||||||
|
else:
|
||||||
|
text_path = path + ("value",)
|
||||||
|
var_name = self.make_var_name(role_prefix, text_path)
|
||||||
|
elem.text = f"{{{{ {var_name} }}}}"
|
||||||
|
|
||||||
|
# Handle children - check for loops first
|
||||||
|
counts = Counter(child.tag for child in children)
|
||||||
|
index_counters: dict[str, int] = defaultdict(int)
|
||||||
|
|
||||||
|
# Check each tag to see if it's a loop candidate
|
||||||
|
processed_tags = set()
|
||||||
|
|
||||||
|
for child in children:
|
||||||
|
tag = child.tag
|
||||||
|
|
||||||
|
# Skip if we've already processed this tag as a loop
|
||||||
|
if tag in processed_tags:
|
||||||
|
continue
|
||||||
|
|
||||||
|
child_path = path + (tag,)
|
||||||
|
|
||||||
|
# Check if this is a loop candidate
|
||||||
|
if child_path in loop_paths:
|
||||||
|
# Mark this tag as processed
|
||||||
|
processed_tags.add(tag)
|
||||||
|
|
||||||
|
# Remove all children with this tag
|
||||||
|
for child_to_remove in [c for c in children if c.tag == tag]:
|
||||||
|
elem.remove(child_to_remove)
|
||||||
|
|
||||||
|
# Create a loop comment/marker
|
||||||
|
# We'll handle the actual loop generation in text processing
|
||||||
|
loop_marker = ET.Comment(f"LOOP:{tag}")
|
||||||
|
elem.append(loop_marker)
|
||||||
|
|
||||||
|
elif counts[tag] > 1:
|
||||||
|
# Multiple children but not a loop candidate - use indexed paths
|
||||||
|
idx = index_counters[tag]
|
||||||
|
index_counters[tag] += 1
|
||||||
|
indexed_path = path + (tag, str(idx))
|
||||||
|
walk(child, indexed_path)
|
||||||
|
else:
|
||||||
|
# Single child
|
||||||
|
walk(child, child_path)
|
||||||
|
|
||||||
|
walk(root, ())
|
||||||
|
|
||||||
|
def _generate_xml_template_from_text(self, role_prefix: str, text: str) -> str:
|
||||||
|
"""Generate scalar-only Jinja2 template."""
|
||||||
|
prolog, body = self._split_xml_prolog(text)
|
||||||
|
|
||||||
|
parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314
|
||||||
|
parser.feed(body)
|
||||||
|
root = parser.close()
|
||||||
|
|
||||||
|
self._apply_jinja_to_xml_tree(role_prefix, root)
|
||||||
|
|
||||||
|
indent = getattr(ET, "indent", None)
|
||||||
|
if indent is not None:
|
||||||
|
indent(root, space=" ") # type: ignore[arg-type]
|
||||||
|
|
||||||
|
xml_body = ET.tostring(root, encoding="unicode")
|
||||||
|
return prolog + xml_body
|
||||||
|
|
||||||
|
def _generate_xml_template_with_loops_from_text(
|
||||||
|
self,
|
||||||
|
role_prefix: str,
|
||||||
|
text: str,
|
||||||
|
loop_candidates: list[LoopCandidate],
|
||||||
|
) -> str:
|
||||||
|
"""Generate Jinja2 template with for loops."""
|
||||||
|
|
||||||
|
prolog, body = self._split_xml_prolog(text)
|
||||||
|
|
||||||
|
# Parse with comments preserved
|
||||||
|
parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True)) # nosec B314
|
||||||
|
parser.feed(body)
|
||||||
|
root = parser.close()
|
||||||
|
|
||||||
|
# Apply Jinja transformations (including loop markers)
|
||||||
|
self._apply_jinja_to_xml_tree(role_prefix, root, loop_candidates)
|
||||||
|
|
||||||
|
# Convert to string
|
||||||
|
indent = getattr(ET, "indent", None)
|
||||||
|
if indent is not None:
|
||||||
|
indent(root, space=" ") # type: ignore[arg-type]
|
||||||
|
|
||||||
|
xml_body = ET.tostring(root, encoding="unicode")
|
||||||
|
|
||||||
|
# Post-process to replace loop markers with actual Jinja loops
|
||||||
|
xml_body = self._insert_xml_loops(xml_body, role_prefix, loop_candidates, root)
|
||||||
|
|
||||||
|
return prolog + xml_body
|
||||||
|
|
||||||
|
def _insert_xml_loops(
|
||||||
|
self,
|
||||||
|
xml_str: str,
|
||||||
|
role_prefix: str,
|
||||||
|
loop_candidates: list[LoopCandidate],
|
||||||
|
root: ET.Element,
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Post-process XML string to insert Jinja2 for loops.
|
||||||
|
|
||||||
|
This replaces <!--LOOP:tagname--> markers with actual loop constructs.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Build a sample element for each loop to use as template
|
||||||
|
lines = xml_str.split("\n")
|
||||||
|
result_lines = []
|
||||||
|
|
||||||
|
for line in lines:
|
||||||
|
# Check if this line contains a loop marker
|
||||||
|
if "<!--LOOP:" in line:
|
||||||
|
# Extract tag name from marker
|
||||||
|
start = line.find("<!--LOOP:") + 9
|
||||||
|
end = line.find("-->", start)
|
||||||
|
tag_name = line[start:end].strip()
|
||||||
|
|
||||||
|
# Find matching loop candidate
|
||||||
|
candidate = None
|
||||||
|
for cand in loop_candidates:
|
||||||
|
if cand.path and cand.path[-1] == tag_name:
|
||||||
|
candidate = cand
|
||||||
|
break
|
||||||
|
|
||||||
|
if candidate:
|
||||||
|
# Get indentation from current line
|
||||||
|
indent_level = len(line) - len(line.lstrip())
|
||||||
|
indent_str = " " * indent_level
|
||||||
|
|
||||||
|
# Generate loop variable name
|
||||||
|
collection_var = self.make_var_name(role_prefix, candidate.path)
|
||||||
|
item_var = candidate.loop_var
|
||||||
|
|
||||||
|
# Create sample element from first item
|
||||||
|
if candidate.items:
|
||||||
|
sample_elem = self._dict_to_xml_element(
|
||||||
|
tag_name, candidate.items[0], item_var
|
||||||
|
)
|
||||||
|
|
||||||
|
# Apply indentation to the sample element
|
||||||
|
ET.indent(sample_elem, space=" ")
|
||||||
|
|
||||||
|
# Convert sample to string
|
||||||
|
sample_str = ET.tostring(
|
||||||
|
sample_elem, encoding="unicode"
|
||||||
|
).strip()
|
||||||
|
|
||||||
|
# Add proper indentation to each line of the sample
|
||||||
|
sample_lines = sample_str.split("\n")
|
||||||
|
indented_sample_lines = [
|
||||||
|
(
|
||||||
|
f"{indent_str} {line}"
|
||||||
|
if i > 0
|
||||||
|
else f"{indent_str} {line}"
|
||||||
|
)
|
||||||
|
for i, line in enumerate(sample_lines)
|
||||||
|
]
|
||||||
|
indented_sample = "\n".join(indented_sample_lines)
|
||||||
|
|
||||||
|
# Build loop
|
||||||
|
result_lines.append(
|
||||||
|
f"{indent_str}{{% for {item_var} in {collection_var} %}}"
|
||||||
|
)
|
||||||
|
result_lines.append(indented_sample)
|
||||||
|
result_lines.append(f"{indent_str}{{% endfor %}}")
|
||||||
|
else:
|
||||||
|
# Keep the marker if we can't find the candidate
|
||||||
|
result_lines.append(line)
|
||||||
|
else:
|
||||||
|
result_lines.append(line)
|
||||||
|
|
||||||
|
return "\n".join(result_lines)
|
||||||
|
|
||||||
|
def _dict_to_xml_element(
|
||||||
|
self, tag: str, data: dict[str, Any], loop_var: str
|
||||||
|
) -> ET.Element:
|
||||||
|
"""
|
||||||
|
Convert a dict to an XML element with Jinja2 variable references.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tag: Element tag name
|
||||||
|
data: Dict representing element structure
|
||||||
|
loop_var: Loop variable name to use in Jinja expressions
|
||||||
|
"""
|
||||||
|
|
||||||
|
elem = ET.Element(tag)
|
||||||
|
|
||||||
|
# Handle attributes and child elements
|
||||||
|
for key, value in data.items():
|
||||||
|
if key.startswith("@"):
|
||||||
|
# Attribute
|
||||||
|
attr_name = key[1:] # Remove @ prefix
|
||||||
|
elem.set(attr_name, f"{{{{ {loop_var}.{attr_name} }}}}")
|
||||||
|
elif key == "_text":
|
||||||
|
# Simple text content
|
||||||
|
elem.text = f"{{{{ {loop_var} }}}}"
|
||||||
|
elif key == "value":
|
||||||
|
# Text with attributes/children
|
||||||
|
elem.text = f"{{{{ {loop_var}.value }}}}"
|
||||||
|
elif key == "_key":
|
||||||
|
# This is the dict key (for dict collections), skip in XML
|
||||||
|
pass
|
||||||
|
elif isinstance(value, dict):
|
||||||
|
# Nested element - check if it has _text
|
||||||
|
child = ET.SubElement(elem, key)
|
||||||
|
if "_text" in value:
|
||||||
|
child.text = f"{{{{ {loop_var}.{key}._text }}}}"
|
||||||
|
else:
|
||||||
|
# More complex nested structure
|
||||||
|
for sub_key, sub_val in value.items():
|
||||||
|
if not sub_key.startswith("_"):
|
||||||
|
grandchild = ET.SubElement(child, sub_key)
|
||||||
|
grandchild.text = f"{{{{ {loop_var}.{key}.{sub_key} }}}}"
|
||||||
|
elif not isinstance(value, list):
|
||||||
|
# Simple child element (scalar value)
|
||||||
|
child = ET.SubElement(elem, key)
|
||||||
|
child.text = f"{{{{ {loop_var}.{key} }}}}"
|
||||||
|
|
||||||
|
return elem
|
||||||
|
|
@ -9,7 +9,7 @@ from . import DictLikeHandler
|
||||||
|
|
||||||
class YamlHandler(DictLikeHandler):
|
class YamlHandler(DictLikeHandler):
|
||||||
fmt = "yaml"
|
fmt = "yaml"
|
||||||
flatten_lists = True
|
flatten_lists = True # you flatten YAML lists
|
||||||
|
|
||||||
def parse(self, path: Path) -> Any:
|
def parse(self, path: Path) -> Any:
|
||||||
text = path.read_text(encoding="utf-8")
|
text = path.read_text(encoding="utf-8")
|
||||||
|
|
@ -97,6 +97,8 @@ class YamlHandler(DictLikeHandler):
|
||||||
out_lines.append(raw_line)
|
out_lines.append(raw_line)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# We have an inline scalar value on this same line.
|
||||||
|
|
||||||
# Separate value from inline comment
|
# Separate value from inline comment
|
||||||
value_part, comment_part = self._split_inline_comment(
|
value_part, comment_part = self._split_inline_comment(
|
||||||
rest_stripped, {"#"}
|
rest_stripped, {"#"}
|
||||||
|
|
|
||||||
449
src/jinjaturtle/handlers/yaml_loopable.py
Normal file
449
src/jinjaturtle/handlers/yaml_loopable.py
Normal file
|
|
@ -0,0 +1,449 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from .dict import DictLikeHandler
|
||||||
|
from ..loop_analyzer import LoopCandidate
|
||||||
|
|
||||||
|
|
||||||
|
class YamlHandlerLoopable(DictLikeHandler):
|
||||||
|
"""
|
||||||
|
YAML handler that can generate both scalar templates and loop-based templates.
|
||||||
|
"""
|
||||||
|
|
||||||
|
fmt = "yaml"
|
||||||
|
flatten_lists = True
|
||||||
|
|
||||||
|
def parse(self, path: Path) -> Any:
|
||||||
|
text = path.read_text(encoding="utf-8")
|
||||||
|
return yaml.safe_load(text) or {}
|
||||||
|
|
||||||
|
def generate_template(
|
||||||
|
self,
|
||||||
|
parsed: Any,
|
||||||
|
role_prefix: str,
|
||||||
|
original_text: str | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""Original scalar-only template generation."""
|
||||||
|
if original_text is not None:
|
||||||
|
return self._generate_yaml_template_from_text(role_prefix, original_text)
|
||||||
|
if not isinstance(parsed, (dict, list)):
|
||||||
|
raise TypeError("YAML parser result must be a dict or list")
|
||||||
|
dumped = yaml.safe_dump(parsed, sort_keys=False)
|
||||||
|
return self._generate_yaml_template_from_text(role_prefix, dumped)
|
||||||
|
|
||||||
|
def generate_template_with_loops(
|
||||||
|
self,
|
||||||
|
parsed: Any,
|
||||||
|
role_prefix: str,
|
||||||
|
original_text: str | None,
|
||||||
|
loop_candidates: list[LoopCandidate],
|
||||||
|
) -> str:
|
||||||
|
"""Generate template with Jinja2 for loops where appropriate."""
|
||||||
|
|
||||||
|
# Build loop path set for quick lookup
|
||||||
|
loop_paths = {candidate.path for candidate in loop_candidates}
|
||||||
|
|
||||||
|
if original_text is not None:
|
||||||
|
return self._generate_yaml_template_with_loops_from_text(
|
||||||
|
role_prefix, original_text, loop_candidates, loop_paths
|
||||||
|
)
|
||||||
|
|
||||||
|
if not isinstance(parsed, (dict, list)):
|
||||||
|
raise TypeError("YAML parser result must be a dict or list")
|
||||||
|
|
||||||
|
dumped = yaml.safe_dump(parsed, sort_keys=False)
|
||||||
|
return self._generate_yaml_template_with_loops_from_text(
|
||||||
|
role_prefix, dumped, loop_candidates, loop_paths
|
||||||
|
)
|
||||||
|
|
||||||
|
def _generate_yaml_template_from_text(
|
||||||
|
self,
|
||||||
|
role_prefix: str,
|
||||||
|
text: str,
|
||||||
|
) -> str:
|
||||||
|
"""Original scalar-only template generation (unchanged from base)."""
|
||||||
|
lines = text.splitlines(keepends=True)
|
||||||
|
out_lines: list[str] = []
|
||||||
|
|
||||||
|
stack: list[tuple[int, tuple[str, ...], str]] = []
|
||||||
|
seq_counters: dict[tuple[str, ...], int] = {}
|
||||||
|
|
||||||
|
def current_path() -> tuple[str, ...]:
|
||||||
|
return stack[-1][1] if stack else ()
|
||||||
|
|
||||||
|
for raw_line in lines:
|
||||||
|
stripped = raw_line.lstrip()
|
||||||
|
indent = len(raw_line) - len(stripped)
|
||||||
|
|
||||||
|
if not stripped or stripped.startswith("#"):
|
||||||
|
out_lines.append(raw_line)
|
||||||
|
continue
|
||||||
|
|
||||||
|
while stack and indent < stack[-1][0]:
|
||||||
|
stack.pop()
|
||||||
|
|
||||||
|
if ":" in stripped and not stripped.lstrip().startswith("- "):
|
||||||
|
key_part, rest = stripped.split(":", 1)
|
||||||
|
key = key_part.strip()
|
||||||
|
if not key:
|
||||||
|
out_lines.append(raw_line)
|
||||||
|
continue
|
||||||
|
|
||||||
|
rest_stripped = rest.lstrip(" \t")
|
||||||
|
value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"})
|
||||||
|
has_value = bool(value_candidate.strip())
|
||||||
|
|
||||||
|
if stack and stack[-1][0] == indent and stack[-1][2] == "map":
|
||||||
|
stack.pop()
|
||||||
|
path = current_path() + (key,)
|
||||||
|
stack.append((indent, path, "map"))
|
||||||
|
|
||||||
|
if not has_value:
|
||||||
|
out_lines.append(raw_line)
|
||||||
|
continue
|
||||||
|
|
||||||
|
value_part, comment_part = self._split_inline_comment(
|
||||||
|
rest_stripped, {"#"}
|
||||||
|
)
|
||||||
|
raw_value = value_part.strip()
|
||||||
|
var_name = self.make_var_name(role_prefix, path)
|
||||||
|
|
||||||
|
use_quotes = (
|
||||||
|
len(raw_value) >= 2
|
||||||
|
and raw_value[0] == raw_value[-1]
|
||||||
|
and raw_value[0] in {'"', "'"}
|
||||||
|
)
|
||||||
|
|
||||||
|
if use_quotes:
|
||||||
|
q = raw_value[0]
|
||||||
|
replacement = f"{q}{{{{ {var_name} }}}}{q}"
|
||||||
|
else:
|
||||||
|
replacement = f"{{{{ {var_name} }}}}"
|
||||||
|
|
||||||
|
leading = rest[: len(rest) - len(rest.lstrip(" \t"))]
|
||||||
|
new_stripped = f"{key}: {leading}{replacement}{comment_part}"
|
||||||
|
out_lines.append(
|
||||||
|
" " * indent
|
||||||
|
+ new_stripped
|
||||||
|
+ ("\n" if raw_line.endswith("\n") else "")
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if stripped.startswith("- "):
|
||||||
|
if not stack or stack[-1][0] != indent or stack[-1][2] != "seq":
|
||||||
|
parent_path = current_path()
|
||||||
|
stack.append((indent, parent_path, "seq"))
|
||||||
|
|
||||||
|
parent_path = stack[-1][1]
|
||||||
|
content = stripped[2:]
|
||||||
|
|
||||||
|
index = seq_counters.get(parent_path, 0)
|
||||||
|
seq_counters[parent_path] = index + 1
|
||||||
|
|
||||||
|
path = parent_path + (str(index),)
|
||||||
|
|
||||||
|
value_part, comment_part = self._split_inline_comment(content, {"#"})
|
||||||
|
raw_value = value_part.strip()
|
||||||
|
var_name = self.make_var_name(role_prefix, path)
|
||||||
|
|
||||||
|
use_quotes = (
|
||||||
|
len(raw_value) >= 2
|
||||||
|
and raw_value[0] == raw_value[-1]
|
||||||
|
and raw_value[0] in {'"', "'"}
|
||||||
|
)
|
||||||
|
|
||||||
|
if use_quotes:
|
||||||
|
q = raw_value[0]
|
||||||
|
replacement = f"{q}{{{{ {var_name} }}}}{q}"
|
||||||
|
else:
|
||||||
|
replacement = f"{{{{ {var_name} }}}}"
|
||||||
|
|
||||||
|
new_stripped = f"- {replacement}{comment_part}"
|
||||||
|
out_lines.append(
|
||||||
|
" " * indent
|
||||||
|
+ new_stripped
|
||||||
|
+ ("\n" if raw_line.endswith("\n") else "")
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
out_lines.append(raw_line)
|
||||||
|
|
||||||
|
return "".join(out_lines)
|
||||||
|
|
||||||
|
def _generate_yaml_template_with_loops_from_text(
|
||||||
|
self,
|
||||||
|
role_prefix: str,
|
||||||
|
text: str,
|
||||||
|
loop_candidates: list[LoopCandidate],
|
||||||
|
loop_paths: set[tuple[str, ...]],
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Generate YAML template with Jinja2 for loops.
|
||||||
|
|
||||||
|
Strategy:
|
||||||
|
1. Parse YAML line-by-line maintaining context
|
||||||
|
2. When we encounter a path that's a loop candidate:
|
||||||
|
- Replace that section with a {% for %} loop
|
||||||
|
- Use the first item as template structure
|
||||||
|
3. Everything else gets scalar variable replacement
|
||||||
|
"""
|
||||||
|
|
||||||
|
lines = text.splitlines(keepends=True)
|
||||||
|
out_lines: list[str] = []
|
||||||
|
|
||||||
|
stack: list[tuple[int, tuple[str, ...], str]] = []
|
||||||
|
seq_counters: dict[tuple[str, ...], int] = {}
|
||||||
|
|
||||||
|
# Track which lines are part of loop sections (to skip them)
|
||||||
|
skip_until_indent: int | None = None
|
||||||
|
|
||||||
|
def current_path() -> tuple[str, ...]:
|
||||||
|
return stack[-1][1] if stack else ()
|
||||||
|
|
||||||
|
for raw_line in lines:
|
||||||
|
stripped = raw_line.lstrip()
|
||||||
|
indent = len(raw_line) - len(stripped)
|
||||||
|
|
||||||
|
# If we're skipping lines (inside a loop section), check if we can stop
|
||||||
|
if skip_until_indent is not None:
|
||||||
|
if (
|
||||||
|
indent <= skip_until_indent
|
||||||
|
and stripped
|
||||||
|
and not stripped.startswith("#")
|
||||||
|
):
|
||||||
|
skip_until_indent = None
|
||||||
|
else:
|
||||||
|
continue # Skip this line
|
||||||
|
|
||||||
|
# Blank or comment lines
|
||||||
|
if not stripped or stripped.startswith("#"):
|
||||||
|
out_lines.append(raw_line)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Adjust stack based on indent
|
||||||
|
while stack and indent < stack[-1][0]:
|
||||||
|
stack.pop()
|
||||||
|
|
||||||
|
# --- Handle mapping key lines: "key:" or "key: value"
|
||||||
|
if ":" in stripped and not stripped.lstrip().startswith("- "):
|
||||||
|
key_part, rest = stripped.split(":", 1)
|
||||||
|
key = key_part.strip()
|
||||||
|
if not key:
|
||||||
|
out_lines.append(raw_line)
|
||||||
|
continue
|
||||||
|
|
||||||
|
rest_stripped = rest.lstrip(" \t")
|
||||||
|
value_candidate, _ = self._split_inline_comment(rest_stripped, {"#"})
|
||||||
|
has_value = bool(value_candidate.strip())
|
||||||
|
|
||||||
|
if stack and stack[-1][0] == indent and stack[-1][2] == "map":
|
||||||
|
stack.pop()
|
||||||
|
path = current_path() + (key,)
|
||||||
|
stack.append((indent, path, "map"))
|
||||||
|
|
||||||
|
# Check if this path is a loop candidate
|
||||||
|
if path in loop_paths:
|
||||||
|
# Find the matching candidate
|
||||||
|
candidate = next(c for c in loop_candidates if c.path == path)
|
||||||
|
|
||||||
|
# Generate loop
|
||||||
|
loop_str = self._generate_yaml_loop(candidate, role_prefix, indent)
|
||||||
|
out_lines.append(loop_str)
|
||||||
|
|
||||||
|
# Skip subsequent lines that are part of this collection
|
||||||
|
skip_until_indent = indent
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not has_value:
|
||||||
|
out_lines.append(raw_line)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Scalar value - replace with variable
|
||||||
|
value_part, comment_part = self._split_inline_comment(
|
||||||
|
rest_stripped, {"#"}
|
||||||
|
)
|
||||||
|
raw_value = value_part.strip()
|
||||||
|
var_name = self.make_var_name(role_prefix, path)
|
||||||
|
|
||||||
|
use_quotes = (
|
||||||
|
len(raw_value) >= 2
|
||||||
|
and raw_value[0] == raw_value[-1]
|
||||||
|
and raw_value[0] in {'"', "'"}
|
||||||
|
)
|
||||||
|
|
||||||
|
if use_quotes:
|
||||||
|
q = raw_value[0]
|
||||||
|
replacement = f"{q}{{{{ {var_name} }}}}{q}"
|
||||||
|
else:
|
||||||
|
replacement = f"{{{{ {var_name} }}}}"
|
||||||
|
|
||||||
|
leading = rest[: len(rest) - len(rest.lstrip(" \t"))]
|
||||||
|
new_stripped = f"{key}: {leading}{replacement}{comment_part}"
|
||||||
|
out_lines.append(
|
||||||
|
" " * indent
|
||||||
|
+ new_stripped
|
||||||
|
+ ("\n" if raw_line.endswith("\n") else "")
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# --- Handle list items: "- value" or "- key: value"
|
||||||
|
if stripped.startswith("- "):
|
||||||
|
if not stack or stack[-1][0] != indent or stack[-1][2] != "seq":
|
||||||
|
parent_path = current_path()
|
||||||
|
stack.append((indent, parent_path, "seq"))
|
||||||
|
|
||||||
|
parent_path = stack[-1][1]
|
||||||
|
|
||||||
|
# Check if parent path is a loop candidate
|
||||||
|
if parent_path in loop_paths:
|
||||||
|
# Find the matching candidate
|
||||||
|
candidate = next(
|
||||||
|
c for c in loop_candidates if c.path == parent_path
|
||||||
|
)
|
||||||
|
|
||||||
|
# Generate loop (with indent for the '-' items)
|
||||||
|
loop_str = self._generate_yaml_loop(
|
||||||
|
candidate, role_prefix, indent, is_list=True
|
||||||
|
)
|
||||||
|
out_lines.append(loop_str)
|
||||||
|
|
||||||
|
# Skip subsequent items
|
||||||
|
skip_until_indent = indent - 1 if indent > 0 else None
|
||||||
|
continue
|
||||||
|
|
||||||
|
content = stripped[2:]
|
||||||
|
index = seq_counters.get(parent_path, 0)
|
||||||
|
seq_counters[parent_path] = index + 1
|
||||||
|
|
||||||
|
path = parent_path + (str(index),)
|
||||||
|
|
||||||
|
value_part, comment_part = self._split_inline_comment(content, {"#"})
|
||||||
|
raw_value = value_part.strip()
|
||||||
|
var_name = self.make_var_name(role_prefix, path)
|
||||||
|
|
||||||
|
use_quotes = (
|
||||||
|
len(raw_value) >= 2
|
||||||
|
and raw_value[0] == raw_value[-1]
|
||||||
|
and raw_value[0] in {'"', "'"}
|
||||||
|
)
|
||||||
|
|
||||||
|
if use_quotes:
|
||||||
|
q = raw_value[0]
|
||||||
|
replacement = f"{q}{{{{ {var_name} }}}}{q}"
|
||||||
|
else:
|
||||||
|
replacement = f"{{{{ {var_name} }}}}"
|
||||||
|
|
||||||
|
new_stripped = f"- {replacement}{comment_part}"
|
||||||
|
out_lines.append(
|
||||||
|
" " * indent
|
||||||
|
+ new_stripped
|
||||||
|
+ ("\n" if raw_line.endswith("\n") else "")
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
out_lines.append(raw_line)
|
||||||
|
|
||||||
|
return "".join(out_lines)
|
||||||
|
|
||||||
|
def _generate_yaml_loop(
|
||||||
|
self,
|
||||||
|
candidate: LoopCandidate,
|
||||||
|
role_prefix: str,
|
||||||
|
indent: int,
|
||||||
|
is_list: bool = False,
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Generate a Jinja2 for loop for a YAML collection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
candidate: Loop candidate with items and metadata
|
||||||
|
role_prefix: Variable prefix
|
||||||
|
indent: Indentation level in spaces
|
||||||
|
is_list: True if this is a YAML list, False if dict
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
YAML string with Jinja2 loop
|
||||||
|
"""
|
||||||
|
|
||||||
|
indent_str = " " * indent
|
||||||
|
collection_var = self.make_var_name(role_prefix, candidate.path)
|
||||||
|
item_var = candidate.loop_var
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
|
||||||
|
if not is_list:
|
||||||
|
# Dict-style: key: {% for ... %}
|
||||||
|
key = candidate.path[-1] if candidate.path else "items"
|
||||||
|
lines.append(f"{indent_str}{key}:")
|
||||||
|
lines.append(f"{indent_str} {{% for {item_var} in {collection_var} %}}")
|
||||||
|
else:
|
||||||
|
# List-style: just the loop
|
||||||
|
lines.append(f"{indent_str}{{% for {item_var} in {collection_var} %}}")
|
||||||
|
|
||||||
|
# Generate template for item structure
|
||||||
|
if candidate.items:
|
||||||
|
sample_item = candidate.items[0]
|
||||||
|
item_indent = indent + 2 if not is_list else indent
|
||||||
|
|
||||||
|
if candidate.item_schema == "scalar":
|
||||||
|
# Simple list of scalars
|
||||||
|
if is_list:
|
||||||
|
lines.append(f"{indent_str}- {{{{ {item_var} }}}}")
|
||||||
|
else:
|
||||||
|
lines.append(f"{indent_str} - {{{{ {item_var} }}}}")
|
||||||
|
|
||||||
|
elif candidate.item_schema in ("simple_dict", "nested"):
|
||||||
|
# List of dicts or complex items - these are ALWAYS list items in YAML
|
||||||
|
item_lines = self._dict_to_yaml_lines(
|
||||||
|
sample_item, item_var, item_indent, is_list_item=True
|
||||||
|
)
|
||||||
|
lines.extend(item_lines)
|
||||||
|
|
||||||
|
# Close loop
|
||||||
|
close_indent = indent + 2 if not is_list else indent
|
||||||
|
lines.append(f"{' ' * close_indent}{{% endfor %}}")
|
||||||
|
|
||||||
|
return "\n".join(lines) + "\n"
|
||||||
|
|
||||||
|
def _dict_to_yaml_lines(
|
||||||
|
self,
|
||||||
|
data: dict[str, Any],
|
||||||
|
loop_var: str,
|
||||||
|
indent: int,
|
||||||
|
is_list_item: bool = False,
|
||||||
|
) -> list[str]:
|
||||||
|
"""
|
||||||
|
Convert a dict to YAML lines with Jinja2 variable references.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: Dict representing item structure
|
||||||
|
loop_var: Loop variable name
|
||||||
|
indent: Base indentation level
|
||||||
|
is_list_item: True if this should start with '-'
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of YAML lines
|
||||||
|
"""
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
indent_str = " " * indent
|
||||||
|
|
||||||
|
first_key = True
|
||||||
|
for key, value in data.items():
|
||||||
|
if key == "_key":
|
||||||
|
# Special key for dict collections - output as comment or skip
|
||||||
|
continue
|
||||||
|
|
||||||
|
if first_key and is_list_item:
|
||||||
|
# First key gets the list marker
|
||||||
|
lines.append(f"{indent_str}- {key}: {{{{ {loop_var}.{key} }}}}")
|
||||||
|
first_key = False
|
||||||
|
else:
|
||||||
|
# Subsequent keys are indented
|
||||||
|
sub_indent = indent + 2 if is_list_item else indent
|
||||||
|
lines.append(f"{' ' * sub_indent}{key}: {{{{ {loop_var}.{key} }}}}")
|
||||||
|
|
||||||
|
return lines
|
||||||
433
src/jinjaturtle/loop_analyzer.py
Normal file
433
src/jinjaturtle/loop_analyzer.py
Normal file
|
|
@ -0,0 +1,433 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections import Counter
|
||||||
|
from typing import Any, Literal
|
||||||
|
|
||||||
|
|
||||||
|
class LoopCandidate:
|
||||||
|
"""
|
||||||
|
Represents a detected loop opportunity in the config structure.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
path: Path to the collection (e.g. ("servers",) or ("config", "endpoints"))
|
||||||
|
loop_var: Variable name for loop items (e.g. "server", "endpoint")
|
||||||
|
items: The actual list/dict items that will be looped over
|
||||||
|
item_schema: Structure of each item ("scalar", "simple_dict", "nested")
|
||||||
|
confidence: How confident we are this should be a loop (0.0 to 1.0)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
path: tuple[str, ...],
|
||||||
|
loop_var: str,
|
||||||
|
items: list[Any] | dict[str, Any],
|
||||||
|
item_schema: Literal["scalar", "simple_dict", "nested"],
|
||||||
|
confidence: float = 1.0,
|
||||||
|
):
|
||||||
|
self.path = path
|
||||||
|
self.loop_var = loop_var
|
||||||
|
self.items = items
|
||||||
|
self.item_schema = item_schema
|
||||||
|
self.confidence = confidence
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
path_str = ".".join(self.path) if self.path else "<root>"
|
||||||
|
return (
|
||||||
|
f"LoopCandidate(path={path_str}, var={self.loop_var}, "
|
||||||
|
f"count={len(self.items)}, schema={self.item_schema}, "
|
||||||
|
f"confidence={self.confidence:.2f})"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class LoopAnalyzer:
|
||||||
|
"""
|
||||||
|
Analyzes parsed config structures to detect loop opportunities.
|
||||||
|
|
||||||
|
Strategy:
|
||||||
|
1. Detect homogeneous lists (all items same type/structure)
|
||||||
|
2. Detect dict collections where all values have similar structure
|
||||||
|
3. Assign confidence scores based on:
|
||||||
|
- Homogeneity of items
|
||||||
|
- Number of items (2+ for loops to make sense)
|
||||||
|
- Depth and complexity (too nested -> fallback to scalars)
|
||||||
|
- Structural patterns (e.g., repeated XML elements)
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Configuration thresholds
|
||||||
|
MIN_ITEMS_FOR_LOOP = 2 # Need at least 2 items to justify a loop
|
||||||
|
MAX_NESTING_DEPTH = 3 # Beyond this, use scalar fallback
|
||||||
|
MIN_CONFIDENCE = 0.7 # Minimum confidence to use a loop
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.candidates: list[LoopCandidate] = []
|
||||||
|
|
||||||
|
def analyze(self, parsed: Any, fmt: str) -> list[LoopCandidate]:
|
||||||
|
"""
|
||||||
|
Analyze a parsed config structure and return loop candidates.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
parsed: The parsed config (dict, list, or ET.Element for XML)
|
||||||
|
fmt: Format type ("yaml", "json", "toml", "xml", "ini")
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of LoopCandidate objects, sorted by path depth (shallowest first)
|
||||||
|
"""
|
||||||
|
self.candidates = []
|
||||||
|
|
||||||
|
if fmt == "xml":
|
||||||
|
self._analyze_xml(parsed)
|
||||||
|
elif fmt in ("yaml", "json", "toml"):
|
||||||
|
self._analyze_dict_like(parsed, path=())
|
||||||
|
# INI files are typically flat key-value, not suitable for loops
|
||||||
|
|
||||||
|
# Sort by path depth (process parent structures before children)
|
||||||
|
self.candidates.sort(key=lambda c: len(c.path))
|
||||||
|
return self.candidates
|
||||||
|
|
||||||
|
def _analyze_dict_like(
|
||||||
|
self, obj: Any, path: tuple[str, ...], depth: int = 0
|
||||||
|
) -> None:
|
||||||
|
"""Recursively analyze dict/list structures."""
|
||||||
|
|
||||||
|
# Safety: don't go too deep
|
||||||
|
if depth > self.MAX_NESTING_DEPTH:
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(obj, dict):
|
||||||
|
# Check if this dict's values form a homogeneous collection
|
||||||
|
if len(obj) >= self.MIN_ITEMS_FOR_LOOP:
|
||||||
|
candidate = self._check_dict_collection(obj, path)
|
||||||
|
if candidate:
|
||||||
|
self.candidates.append(candidate)
|
||||||
|
# Don't recurse into items we've marked as a loop
|
||||||
|
return
|
||||||
|
|
||||||
|
# Recurse into dict values
|
||||||
|
for key, value in obj.items():
|
||||||
|
self._analyze_dict_like(value, path + (str(key),), depth + 1)
|
||||||
|
|
||||||
|
elif isinstance(obj, list):
|
||||||
|
# Check if this list is homogeneous
|
||||||
|
if len(obj) >= self.MIN_ITEMS_FOR_LOOP:
|
||||||
|
candidate = self._check_list_collection(obj, path)
|
||||||
|
if candidate:
|
||||||
|
self.candidates.append(candidate)
|
||||||
|
# Don't recurse into items we've marked as a loop
|
||||||
|
return
|
||||||
|
|
||||||
|
# If not a good loop candidate, recurse into items
|
||||||
|
for i, item in enumerate(obj):
|
||||||
|
self._analyze_dict_like(item, path + (str(i),), depth + 1)
|
||||||
|
|
||||||
|
def _check_list_collection(
|
||||||
|
self, items: list[Any], path: tuple[str, ...]
|
||||||
|
) -> LoopCandidate | None:
|
||||||
|
"""Check if a list should be a loop."""
|
||||||
|
|
||||||
|
if not items:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Analyze item types and structures
|
||||||
|
item_types = [type(item).__name__ for item in items]
|
||||||
|
type_counts = Counter(item_types)
|
||||||
|
|
||||||
|
# Must be homogeneous (all same type)
|
||||||
|
if len(type_counts) != 1:
|
||||||
|
return None
|
||||||
|
|
||||||
|
item_type = item_types[0]
|
||||||
|
|
||||||
|
# Scalar list (strings, numbers, bools)
|
||||||
|
if item_type in ("str", "int", "float", "bool", "NoneType"):
|
||||||
|
return LoopCandidate(
|
||||||
|
path=path,
|
||||||
|
loop_var=self._derive_loop_var(path, singular=True),
|
||||||
|
items=items,
|
||||||
|
item_schema="scalar",
|
||||||
|
confidence=1.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
# List of dicts - check structural homogeneity
|
||||||
|
if item_type == "dict":
|
||||||
|
schema = self._analyze_dict_schema(items)
|
||||||
|
if schema == "simple_dict":
|
||||||
|
return LoopCandidate(
|
||||||
|
path=path,
|
||||||
|
loop_var=self._derive_loop_var(path, singular=True),
|
||||||
|
items=items,
|
||||||
|
item_schema="simple_dict",
|
||||||
|
confidence=0.95,
|
||||||
|
)
|
||||||
|
elif schema == "homogeneous":
|
||||||
|
return LoopCandidate(
|
||||||
|
path=path,
|
||||||
|
loop_var=self._derive_loop_var(path, singular=True),
|
||||||
|
items=items,
|
||||||
|
item_schema="simple_dict",
|
||||||
|
confidence=0.85,
|
||||||
|
)
|
||||||
|
# If too complex/heterogeneous, return None (use scalar fallback)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _check_dict_collection(
|
||||||
|
self, obj: dict[str, Any], path: tuple[str, ...]
|
||||||
|
) -> LoopCandidate | None:
|
||||||
|
"""
|
||||||
|
Check if a dict's values form a collection suitable for looping.
|
||||||
|
|
||||||
|
Example: {"server1": {...}, "server2": {...}} where all values
|
||||||
|
have the same structure.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not obj:
|
||||||
|
return None
|
||||||
|
|
||||||
|
values = list(obj.values())
|
||||||
|
|
||||||
|
# Check type homogeneity
|
||||||
|
value_types = [type(v).__name__ for v in values]
|
||||||
|
type_counts = Counter(value_types)
|
||||||
|
|
||||||
|
if len(type_counts) != 1:
|
||||||
|
return None
|
||||||
|
|
||||||
|
value_type = value_types[0]
|
||||||
|
|
||||||
|
# Only interested in dict values for dict collections
|
||||||
|
# (scalar-valued dicts stay as scalars)
|
||||||
|
if value_type != "dict":
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Check structural homogeneity
|
||||||
|
schema = self._analyze_dict_schema(values)
|
||||||
|
if schema in ("simple_dict", "homogeneous"):
|
||||||
|
confidence = 0.9 if schema == "simple_dict" else 0.8
|
||||||
|
|
||||||
|
# Convert dict to list of items with 'key' added
|
||||||
|
items_with_keys = [{"_key": k, **v} for k, v in obj.items()]
|
||||||
|
|
||||||
|
return LoopCandidate(
|
||||||
|
path=path,
|
||||||
|
loop_var=self._derive_loop_var(path, singular=True),
|
||||||
|
items=items_with_keys,
|
||||||
|
item_schema="simple_dict",
|
||||||
|
confidence=confidence,
|
||||||
|
)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _analyze_dict_schema(
|
||||||
|
self, dicts: list[dict[str, Any]]
|
||||||
|
) -> Literal["simple_dict", "homogeneous", "heterogeneous"]:
|
||||||
|
"""
|
||||||
|
Analyze a list of dicts to determine their structural homogeneity.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
"simple_dict": All dicts have same keys, all values are scalars
|
||||||
|
"homogeneous": All dicts have same keys, may have nested structures
|
||||||
|
"heterogeneous": Dicts have different structures
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not dicts:
|
||||||
|
return "heterogeneous"
|
||||||
|
|
||||||
|
# Get key sets from each dict
|
||||||
|
key_sets = [set(d.keys()) for d in dicts]
|
||||||
|
|
||||||
|
# Check if all have the same keys
|
||||||
|
first_keys = key_sets[0]
|
||||||
|
if not all(ks == first_keys for ks in key_sets):
|
||||||
|
# Allow minor variations (80% key overlap)
|
||||||
|
all_keys = set().union(*key_sets)
|
||||||
|
common_keys = set.intersection(*key_sets)
|
||||||
|
if len(common_keys) / len(all_keys) < 0.8:
|
||||||
|
return "heterogeneous"
|
||||||
|
|
||||||
|
# Check if values are all scalars
|
||||||
|
all_scalars = True
|
||||||
|
for d in dicts:
|
||||||
|
for v in d.values():
|
||||||
|
if isinstance(v, (dict, list)):
|
||||||
|
all_scalars = False
|
||||||
|
break
|
||||||
|
if not all_scalars:
|
||||||
|
break
|
||||||
|
|
||||||
|
if all_scalars:
|
||||||
|
return "simple_dict"
|
||||||
|
else:
|
||||||
|
return "homogeneous"
|
||||||
|
|
||||||
|
def _derive_loop_var(self, path: tuple[str, ...], singular: bool = True) -> str:
|
||||||
|
"""
|
||||||
|
Derive a sensible loop variable name from the path.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
("servers",) -> "server" (singular)
|
||||||
|
("config", "endpoints") -> "endpoint"
|
||||||
|
("users",) -> "user"
|
||||||
|
("databases",) -> "database"
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not path:
|
||||||
|
return "item"
|
||||||
|
|
||||||
|
last_part = path[-1].lower()
|
||||||
|
|
||||||
|
if singular:
|
||||||
|
# Simple English pluralization rules (order matters - most specific first)
|
||||||
|
if last_part.endswith("sses"):
|
||||||
|
return last_part[:-2] # "classes" -> "class"
|
||||||
|
elif last_part.endswith("xes"):
|
||||||
|
return last_part[:-2] # "boxes" -> "box"
|
||||||
|
elif last_part.endswith("ches"):
|
||||||
|
return last_part[:-2] # "watches" -> "watch"
|
||||||
|
elif last_part.endswith("shes"):
|
||||||
|
return last_part[:-2] # "dishes" -> "dish"
|
||||||
|
elif last_part.endswith("ies"):
|
||||||
|
return last_part[:-3] + "y" # "entries" -> "entry"
|
||||||
|
elif last_part.endswith("oes"):
|
||||||
|
return last_part[:-2] # "tomatoes" -> "tomato"
|
||||||
|
elif last_part.endswith("ses") and not last_part.endswith("sses"):
|
||||||
|
# Only for words ending in "se": "databases" -> "database"
|
||||||
|
# But NOT for "sses" which we already handled
|
||||||
|
if len(last_part) > 3 and last_part[-4] not in "aeiou":
|
||||||
|
# "databases" -> "database" (consonant before 's')
|
||||||
|
return last_part[:-1]
|
||||||
|
else:
|
||||||
|
# "houses" -> "house", "causes" -> "cause"
|
||||||
|
return last_part[:-1]
|
||||||
|
elif last_part.endswith("s") and not last_part.endswith("ss"):
|
||||||
|
return last_part[:-1] # "servers" -> "server"
|
||||||
|
|
||||||
|
return last_part
|
||||||
|
|
||||||
|
def _analyze_xml(self, root: Any) -> None:
|
||||||
|
"""
|
||||||
|
Analyze XML structure for loop opportunities.
|
||||||
|
|
||||||
|
XML is particularly suited for loops when we have repeated sibling elements.
|
||||||
|
"""
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
|
||||||
|
if not isinstance(root, ET.Element):
|
||||||
|
return
|
||||||
|
|
||||||
|
self._walk_xml_element(root, path=())
|
||||||
|
|
||||||
|
def _walk_xml_element(self, elem: Any, path: tuple[str, ...]) -> None:
|
||||||
|
"""Recursively walk XML elements looking for repeated siblings."""
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
|
||||||
|
children = [c for c in list(elem) if isinstance(c.tag, str)]
|
||||||
|
|
||||||
|
# Count sibling elements by tag
|
||||||
|
tag_counts = Counter(child.tag for child in children)
|
||||||
|
|
||||||
|
# Find repeated tags
|
||||||
|
for tag, count in tag_counts.items():
|
||||||
|
if count >= self.MIN_ITEMS_FOR_LOOP:
|
||||||
|
# Get all elements with this tag
|
||||||
|
tagged_elements = [c for c in children if c.tag == tag]
|
||||||
|
|
||||||
|
# Check homogeneity
|
||||||
|
if self._are_xml_elements_homogeneous(tagged_elements):
|
||||||
|
# Convert to dict representation for easier handling
|
||||||
|
items = [self._xml_elem_to_dict(el) for el in tagged_elements]
|
||||||
|
|
||||||
|
# Determine schema
|
||||||
|
if all(self._is_scalar_dict(item) for item in items):
|
||||||
|
schema = "simple_dict"
|
||||||
|
confidence = 1.0
|
||||||
|
else:
|
||||||
|
schema = "nested"
|
||||||
|
confidence = 0.8
|
||||||
|
|
||||||
|
candidate = LoopCandidate(
|
||||||
|
path=path + (tag,),
|
||||||
|
loop_var=self._derive_loop_var((tag,), singular=True),
|
||||||
|
items=items,
|
||||||
|
item_schema=schema,
|
||||||
|
confidence=confidence,
|
||||||
|
)
|
||||||
|
self.candidates.append(candidate)
|
||||||
|
|
||||||
|
# Recurse into unique children (non-repeated ones will be processed normally)
|
||||||
|
for tag, count in tag_counts.items():
|
||||||
|
if count == 1:
|
||||||
|
child = next(c for c in children if c.tag == tag)
|
||||||
|
self._walk_xml_element(child, path + (tag,))
|
||||||
|
|
||||||
|
def _are_xml_elements_homogeneous(self, elements: list[Any]) -> bool:
|
||||||
|
"""Check if XML elements have similar structure."""
|
||||||
|
|
||||||
|
if not elements:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Compare attribute sets
|
||||||
|
attr_sets = [set(el.attrib.keys()) for el in elements]
|
||||||
|
first_attrs = attr_sets[0]
|
||||||
|
|
||||||
|
if not all(attrs == first_attrs for attrs in attr_sets):
|
||||||
|
# Allow some variation
|
||||||
|
all_attrs = set().union(*attr_sets)
|
||||||
|
common_attrs = set.intersection(*attr_sets) if attr_sets else set()
|
||||||
|
if len(common_attrs) / max(len(all_attrs), 1) < 0.7:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Compare child element tags
|
||||||
|
child_tag_sets = [
|
||||||
|
set(c.tag for c in el if hasattr(c, "tag")) for el in elements
|
||||||
|
]
|
||||||
|
|
||||||
|
if child_tag_sets:
|
||||||
|
first_tags = child_tag_sets[0]
|
||||||
|
if not all(tags == first_tags for tags in child_tag_sets):
|
||||||
|
# Allow some variation
|
||||||
|
all_tags = set().union(*child_tag_sets)
|
||||||
|
common_tags = (
|
||||||
|
set.intersection(*child_tag_sets) if child_tag_sets else set()
|
||||||
|
)
|
||||||
|
if len(common_tags) / max(len(all_tags), 1) < 0.7:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _xml_elem_to_dict(self, elem: Any) -> dict[str, Any]:
|
||||||
|
"""Convert an XML element to a dict representation."""
|
||||||
|
result: dict[str, Any] = {}
|
||||||
|
|
||||||
|
# Add attributes
|
||||||
|
for attr_name, attr_val in elem.attrib.items():
|
||||||
|
result[f"@{attr_name}"] = attr_val
|
||||||
|
|
||||||
|
# Add text content
|
||||||
|
text = (elem.text or "").strip()
|
||||||
|
if text:
|
||||||
|
children = [c for c in list(elem) if hasattr(c, "tag")]
|
||||||
|
if not elem.attrib and not children:
|
||||||
|
result["_text"] = text
|
||||||
|
else:
|
||||||
|
result["value"] = text
|
||||||
|
|
||||||
|
# Add child elements
|
||||||
|
for child in elem:
|
||||||
|
if hasattr(child, "tag"):
|
||||||
|
child_dict = self._xml_elem_to_dict(child)
|
||||||
|
if child.tag in result:
|
||||||
|
# Multiple children with same tag - convert to list
|
||||||
|
if not isinstance(result[child.tag], list):
|
||||||
|
result[child.tag] = [result[child.tag]]
|
||||||
|
result[child.tag].append(child_dict)
|
||||||
|
else:
|
||||||
|
result[child.tag] = child_dict
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _is_scalar_dict(self, obj: dict[str, Any]) -> bool:
|
||||||
|
"""Check if a dict contains only scalar values (no nested dicts/lists)."""
|
||||||
|
for v in obj.values():
|
||||||
|
if isinstance(v, (dict, list)):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
Loading…
Add table
Add a link
Reference in a new issue