Refactor and add much more robust tests (both automated and manual) to ensure loops and things work ok
This commit is contained in:
parent
3af628e22e
commit
d7c71f6349
17 changed files with 2126 additions and 91 deletions
|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
|||
from pathlib import Path
|
||||
from typing import Any, Iterable
|
||||
|
||||
import datetime
|
||||
import yaml
|
||||
|
||||
from .loop_analyzer import LoopAnalyzer, LoopCandidate
|
||||
|
|
@ -100,6 +101,9 @@ def parse_config(path: Path, fmt: str | None = None) -> tuple[str, Any]:
|
|||
if handler is None:
|
||||
raise ValueError(f"Unsupported config format: {fmt}")
|
||||
parsed = handler.parse(path)
|
||||
# Make sure datetime objects are treated as strings (TOML, YAML)
|
||||
parsed = _stringify_timestamps(parsed)
|
||||
|
||||
return fmt, parsed
|
||||
|
||||
|
||||
|
|
@ -158,17 +162,6 @@ def _path_starts_with(path: tuple[str, ...], prefix: tuple[str, ...]) -> bool:
|
|||
return path[: len(prefix)] == prefix
|
||||
|
||||
|
||||
def _normalize_default_value(value: Any) -> Any:
|
||||
"""
|
||||
Ensure that 'true' / 'false' end up as quoted strings in YAML.
|
||||
"""
|
||||
if isinstance(value, bool):
|
||||
return QuotedString("true" if value else "false")
|
||||
if isinstance(value, str) and value.lower() in {"true", "false"}:
|
||||
return QuotedString(value)
|
||||
return value
|
||||
|
||||
|
||||
def generate_ansible_yaml(
|
||||
role_prefix: str,
|
||||
flat_items: list[tuple[tuple[str, ...], Any]],
|
||||
|
|
@ -182,7 +175,7 @@ def generate_ansible_yaml(
|
|||
# Add scalar variables
|
||||
for path, value in flat_items:
|
||||
var_name = make_var_name(role_prefix, path)
|
||||
defaults[var_name] = _normalize_default_value(value)
|
||||
defaults[var_name] = value # No normalization - keep original types
|
||||
|
||||
# Add loop collections
|
||||
if loop_candidates:
|
||||
|
|
@ -226,3 +219,29 @@ def generate_jinja2_template(
|
|||
return handler.generate_jinja2_template(
|
||||
parsed, role_prefix, original_text=original_text
|
||||
)
|
||||
|
||||
|
||||
def _stringify_timestamps(obj: Any) -> Any:
|
||||
"""
|
||||
Recursively walk a parsed config and turn any datetime/date/time objects
|
||||
into plain strings in ISO-8601 form.
|
||||
|
||||
This prevents Python datetime objects from leaking into YAML/Jinja, which
|
||||
would otherwise reformat the value (e.g. replacing 'T' with a space).
|
||||
|
||||
This commonly occurs otherwise with TOML and YAML files, which sees
|
||||
Python automatically convert those sorts of strings into datetime objects.
|
||||
"""
|
||||
if isinstance(obj, dict):
|
||||
return {k: _stringify_timestamps(v) for k, v in obj.items()}
|
||||
if isinstance(obj, list):
|
||||
return [_stringify_timestamps(v) for v in obj]
|
||||
|
||||
# TOML & YAML both use the standard datetime types
|
||||
if isinstance(obj, datetime.datetime):
|
||||
# Use default ISO-8601: 'YYYY-MM-DDTHH:MM:SS±HH:MM' (with 'T')
|
||||
return obj.isoformat()
|
||||
if isinstance(obj, (datetime.date, datetime.time)):
|
||||
return obj.isoformat()
|
||||
|
||||
return obj
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ class IniHandler(BaseHandler):
|
|||
|
||||
def parse(self, path: Path) -> configparser.ConfigParser:
|
||||
parser = configparser.ConfigParser()
|
||||
parser.optionxform = str # preserve key case
|
||||
parser.optionxform = str # noqa
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
parser.read_file(f)
|
||||
return parser
|
||||
|
|
|
|||
|
|
@ -1,10 +1,12 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from . import DictLikeHandler
|
||||
from ..loop_analyzer import LoopCandidate
|
||||
|
||||
|
||||
class JsonHandler(DictLikeHandler):
|
||||
|
|
@ -21,17 +23,38 @@ class JsonHandler(DictLikeHandler):
|
|||
role_prefix: str,
|
||||
original_text: str | None = None,
|
||||
) -> str:
|
||||
"""Original scalar-only template generation."""
|
||||
if not isinstance(parsed, (dict, list)):
|
||||
raise TypeError("JSON parser result must be a dict or list")
|
||||
# As before: ignore original_text and rebuild structurally
|
||||
return self._generate_json_template(role_prefix, parsed)
|
||||
|
||||
def generate_jinja2_template_with_loops(
|
||||
self,
|
||||
parsed: Any,
|
||||
role_prefix: str,
|
||||
original_text: str | None,
|
||||
loop_candidates: list[LoopCandidate],
|
||||
) -> str:
|
||||
"""Generate template with Jinja2 for loops where appropriate."""
|
||||
if not isinstance(parsed, (dict, list)):
|
||||
raise TypeError("JSON parser result must be a dict or list")
|
||||
|
||||
# Build loop path set for quick lookup
|
||||
loop_paths = {candidate.path for candidate in loop_candidates}
|
||||
|
||||
return self._generate_json_template_with_loops(
|
||||
role_prefix, parsed, loop_paths, loop_candidates
|
||||
)
|
||||
|
||||
def _generate_json_template(self, role_prefix: str, data: Any) -> str:
|
||||
"""
|
||||
Generate a JSON Jinja2 template from parsed JSON data.
|
||||
|
||||
All scalar values are replaced with Jinja expressions whose names are
|
||||
derived from the path, similar to TOML/YAML.
|
||||
|
||||
Uses | tojson filter to preserve types (numbers, booleans, null).
|
||||
"""
|
||||
|
||||
def _walk(obj: Any, path: tuple[str, ...] = ()) -> Any:
|
||||
|
|
@ -39,9 +62,130 @@ class JsonHandler(DictLikeHandler):
|
|||
return {k: _walk(v, path + (str(k),)) for k, v in obj.items()}
|
||||
if isinstance(obj, list):
|
||||
return [_walk(v, path + (str(i),)) for i, v in enumerate(obj)]
|
||||
# scalar
|
||||
# scalar - use marker that will be replaced with tojson
|
||||
var_name = self.make_var_name(role_prefix, path)
|
||||
return f"{{{{ {var_name} }}}}"
|
||||
return f"__SCALAR__{var_name}__"
|
||||
|
||||
templated = _walk(data)
|
||||
return json.dumps(templated, indent=2, ensure_ascii=False) + "\n"
|
||||
json_str = json.dumps(templated, indent=2, ensure_ascii=False)
|
||||
|
||||
# Replace scalar markers with Jinja expressions using tojson filter
|
||||
# This preserves types (numbers stay numbers, booleans stay booleans)
|
||||
json_str = re.sub(
|
||||
r'"__SCALAR__([a-zA-Z_][a-zA-Z0-9_]*)__"', r"{{ \1 | tojson }}", json_str
|
||||
)
|
||||
|
||||
return json_str + "\n"
|
||||
|
||||
def _generate_json_template_with_loops(
|
||||
self,
|
||||
role_prefix: str,
|
||||
data: Any,
|
||||
loop_paths: set[tuple[str, ...]],
|
||||
loop_candidates: list[LoopCandidate],
|
||||
path: tuple[str, ...] = (),
|
||||
) -> str:
|
||||
"""
|
||||
Generate a JSON Jinja2 template with for loops where appropriate.
|
||||
"""
|
||||
|
||||
def _walk(obj: Any, current_path: tuple[str, ...] = ()) -> Any:
|
||||
# Check if this path is a loop candidate
|
||||
if current_path in loop_paths:
|
||||
# Find the matching candidate
|
||||
candidate = next(c for c in loop_candidates if c.path == current_path)
|
||||
collection_var = self.make_var_name(role_prefix, candidate.path)
|
||||
item_var = candidate.loop_var
|
||||
|
||||
if candidate.item_schema == "scalar":
|
||||
# Simple list of scalars - use special marker that we'll replace
|
||||
return f"__LOOP_SCALAR__{collection_var}__{item_var}__"
|
||||
elif candidate.item_schema in ("simple_dict", "nested"):
|
||||
# List of dicts - use special marker
|
||||
return f"__LOOP_DICT__{collection_var}__{item_var}__"
|
||||
|
||||
if isinstance(obj, dict):
|
||||
return {k: _walk(v, current_path + (str(k),)) for k, v in obj.items()}
|
||||
if isinstance(obj, list):
|
||||
# Check if this list is a loop candidate
|
||||
if current_path in loop_paths:
|
||||
# Already handled above
|
||||
return _walk(obj, current_path)
|
||||
return [_walk(v, current_path + (str(i),)) for i, v in enumerate(obj)]
|
||||
|
||||
# scalar - use marker to preserve type
|
||||
var_name = self.make_var_name(role_prefix, current_path)
|
||||
return f"__SCALAR__{var_name}__"
|
||||
|
||||
templated = _walk(data, path)
|
||||
|
||||
# Convert to JSON string
|
||||
json_str = json.dumps(templated, indent=2, ensure_ascii=False)
|
||||
|
||||
# Replace scalar markers with Jinja expressions using tojson filter
|
||||
json_str = re.sub(
|
||||
r'"__SCALAR__([a-zA-Z_][a-zA-Z0-9_]*)__"', r"{{ \1 | tojson }}", json_str
|
||||
)
|
||||
|
||||
# Post-process to replace loop markers with actual Jinja loops
|
||||
for candidate in loop_candidates:
|
||||
collection_var = self.make_var_name(role_prefix, candidate.path)
|
||||
item_var = candidate.loop_var
|
||||
|
||||
if candidate.item_schema == "scalar":
|
||||
# Replace scalar loop marker with Jinja for loop
|
||||
marker = f'"__LOOP_SCALAR__{collection_var}__{item_var}__"'
|
||||
replacement = self._generate_json_scalar_loop(
|
||||
collection_var, item_var, candidate
|
||||
)
|
||||
json_str = json_str.replace(marker, replacement)
|
||||
|
||||
elif candidate.item_schema in ("simple_dict", "nested"):
|
||||
# Replace dict loop marker with Jinja for loop
|
||||
marker = f'"__LOOP_DICT__{collection_var}__{item_var}__"'
|
||||
replacement = self._generate_json_dict_loop(
|
||||
collection_var, item_var, candidate
|
||||
)
|
||||
json_str = json_str.replace(marker, replacement)
|
||||
|
||||
return json_str + "\n"
|
||||
|
||||
def _generate_json_scalar_loop(
|
||||
self, collection_var: str, item_var: str, candidate: LoopCandidate
|
||||
) -> str:
|
||||
"""Generate a Jinja for loop for a scalar list in JSON."""
|
||||
# Use tojson filter to properly handle strings (quotes them) and other types
|
||||
# Include array brackets around the loop
|
||||
return (
|
||||
f"[{{% for {item_var} in {collection_var} %}}"
|
||||
f"{{{{ {item_var} | tojson }}}}"
|
||||
f"{{% if not loop.last %}}, {{% endif %}}"
|
||||
f"{{% endfor %}}]"
|
||||
)
|
||||
|
||||
def _generate_json_dict_loop(
|
||||
self, collection_var: str, item_var: str, candidate: LoopCandidate
|
||||
) -> str:
|
||||
"""Generate a Jinja for loop for a dict list in JSON."""
|
||||
if not candidate.items:
|
||||
return "[]"
|
||||
|
||||
# Get first item as template
|
||||
sample_item = candidate.items[0]
|
||||
|
||||
# Build the dict template - use tojson for all values to handle types correctly
|
||||
fields = []
|
||||
for key, value in sample_item.items():
|
||||
if key == "_key":
|
||||
continue
|
||||
# Use tojson filter to properly serialize all types (strings, numbers, booleans)
|
||||
fields.append(f'"{key}": {{{{ {item_var}.{key} | tojson }}}}')
|
||||
|
||||
dict_template = "{" + ", ".join(fields) + "}"
|
||||
|
||||
return (
|
||||
f"{{% for {item_var} in {collection_var} %}}"
|
||||
f"{dict_template}"
|
||||
f"{{% if not loop.last %}}, {{% endif %}}"
|
||||
f"{{% endfor %}}"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ from pathlib import Path
|
|||
from typing import Any
|
||||
|
||||
from . import DictLikeHandler
|
||||
from ..loop_analyzer import LoopCandidate
|
||||
|
||||
|
||||
class TomlHandler(DictLikeHandler):
|
||||
|
|
@ -25,12 +26,31 @@ class TomlHandler(DictLikeHandler):
|
|||
role_prefix: str,
|
||||
original_text: str | None = None,
|
||||
) -> str:
|
||||
"""Original scalar-only template generation."""
|
||||
if original_text is not None:
|
||||
return self._generate_toml_template_from_text(role_prefix, original_text)
|
||||
if not isinstance(parsed, dict):
|
||||
raise TypeError("TOML parser result must be a dict")
|
||||
return self._generate_toml_template(role_prefix, parsed)
|
||||
|
||||
def generate_jinja2_template_with_loops(
|
||||
self,
|
||||
parsed: Any,
|
||||
role_prefix: str,
|
||||
original_text: str | None,
|
||||
loop_candidates: list[LoopCandidate],
|
||||
) -> str:
|
||||
"""Generate template with Jinja2 for loops where appropriate."""
|
||||
if original_text is not None:
|
||||
return self._generate_toml_template_with_loops_from_text(
|
||||
role_prefix, original_text, loop_candidates
|
||||
)
|
||||
if not isinstance(parsed, dict):
|
||||
raise TypeError("TOML parser result must be a dict")
|
||||
return self._generate_toml_template_with_loops(
|
||||
role_prefix, parsed, loop_candidates
|
||||
)
|
||||
|
||||
def _generate_toml_template(self, role_prefix: str, data: dict[str, Any]) -> str:
|
||||
"""
|
||||
Generate a TOML Jinja2 template from parsed TOML dict.
|
||||
|
|
@ -45,6 +65,89 @@ class TomlHandler(DictLikeHandler):
|
|||
var_name = self.make_var_name(role_prefix, path + (key,))
|
||||
if isinstance(value, str):
|
||||
lines.append(f'{key} = "{{{{ {var_name} }}}}"')
|
||||
elif isinstance(value, bool):
|
||||
# Booleans need | lower filter (Python True/False → TOML true/false)
|
||||
lines.append(f"{key} = {{{{ {var_name} | lower }}}}")
|
||||
else:
|
||||
lines.append(f"{key} = {{{{ {var_name} }}}}")
|
||||
|
||||
def walk(obj: dict[str, Any], path: tuple[str, ...] = ()) -> None:
|
||||
scalar_items = {k: v for k, v in obj.items() if not isinstance(v, dict)}
|
||||
nested_items = {k: v for k, v in obj.items() if isinstance(v, dict)}
|
||||
|
||||
if path:
|
||||
header = ".".join(path)
|
||||
lines.append(f"[{header}]")
|
||||
|
||||
for key, val in scalar_items.items():
|
||||
emit_kv(path, str(key), val)
|
||||
|
||||
if scalar_items:
|
||||
lines.append("")
|
||||
|
||||
for key, val in nested_items.items():
|
||||
walk(val, path + (str(key),))
|
||||
|
||||
# Root scalars (no table header)
|
||||
root_scalars = {k: v for k, v in data.items() if not isinstance(v, dict)}
|
||||
for key, val in root_scalars.items():
|
||||
emit_kv((), str(key), val)
|
||||
if root_scalars:
|
||||
lines.append("")
|
||||
|
||||
# Tables
|
||||
for key, val in data.items():
|
||||
if isinstance(val, dict):
|
||||
walk(val, (str(key),))
|
||||
|
||||
return "\n".join(lines).rstrip() + "\n"
|
||||
|
||||
def _generate_toml_template_with_loops(
|
||||
self,
|
||||
role_prefix: str,
|
||||
data: dict[str, Any],
|
||||
loop_candidates: list[LoopCandidate],
|
||||
) -> str:
|
||||
"""
|
||||
Generate a TOML Jinja2 template with for loops where appropriate.
|
||||
"""
|
||||
lines: list[str] = []
|
||||
loop_paths = {candidate.path for candidate in loop_candidates}
|
||||
|
||||
def emit_kv(path: tuple[str, ...], key: str, value: Any) -> None:
|
||||
var_name = self.make_var_name(role_prefix, path + (key,))
|
||||
if isinstance(value, str):
|
||||
lines.append(f'{key} = "{{{{ {var_name} }}}}"')
|
||||
elif isinstance(value, bool):
|
||||
# Booleans need | lower filter (Python True/False → TOML true/false)
|
||||
lines.append(f"{key} = {{{{ {var_name} | lower }}}}")
|
||||
elif isinstance(value, list):
|
||||
# Check if this list is a loop candidate
|
||||
if path + (key,) in loop_paths:
|
||||
# Find the matching candidate
|
||||
candidate = next(
|
||||
c for c in loop_candidates if c.path == path + (key,)
|
||||
)
|
||||
collection_var = self.make_var_name(role_prefix, candidate.path)
|
||||
item_var = candidate.loop_var
|
||||
|
||||
if candidate.item_schema == "scalar":
|
||||
# Scalar list loop
|
||||
lines.append(
|
||||
f"{key} = ["
|
||||
f"{{% for {item_var} in {collection_var} %}}"
|
||||
f"{{{{ {item_var} }}}}"
|
||||
f"{{% if not loop.last %}}, {{% endif %}}"
|
||||
f"{{% endfor %}}"
|
||||
f"]"
|
||||
)
|
||||
elif candidate.item_schema in ("simple_dict", "nested"):
|
||||
# Dict list loop - TOML array of tables
|
||||
# This is complex for TOML, using simplified approach
|
||||
lines.append(f"{key} = {{{{ {var_name} | tojson }}}}")
|
||||
else:
|
||||
# Not a loop, treat as regular variable
|
||||
lines.append(f"{key} = {{{{ {var_name} }}}}")
|
||||
else:
|
||||
lines.append(f"{key} = {{{{ {var_name} }}}}")
|
||||
|
||||
|
|
@ -173,6 +276,236 @@ class TomlHandler(DictLikeHandler):
|
|||
nested_var = self.make_var_name(role_prefix, nested_path)
|
||||
if isinstance(sub_val, str):
|
||||
inner_bits.append(f'{sub_key} = "{{{{ {nested_var} }}}}"')
|
||||
elif isinstance(sub_val, bool):
|
||||
inner_bits.append(
|
||||
f"{sub_key} = {{{{ {nested_var} | lower }}}}"
|
||||
)
|
||||
else:
|
||||
inner_bits.append(f"{sub_key} = {{{ {nested_var} }}}")
|
||||
replacement_value = "{ " + ", ".join(inner_bits) + " }"
|
||||
new_content = (
|
||||
before_eq + "=" + leading_ws + replacement_value + comment_part
|
||||
)
|
||||
out_lines.append(new_content + newline)
|
||||
continue
|
||||
# If parsing fails, fall through to normal handling
|
||||
|
||||
# Normal scalar value handling (including bools, numbers, strings)
|
||||
var_name = self.make_var_name(role_prefix, path)
|
||||
use_quotes = (
|
||||
len(raw_value) >= 2
|
||||
and raw_value[0] == raw_value[-1]
|
||||
and raw_value[0] in {'"', "'"}
|
||||
)
|
||||
|
||||
# Check if value is a boolean in the text
|
||||
is_bool = raw_value.strip().lower() in ("true", "false")
|
||||
|
||||
if use_quotes:
|
||||
quote_char = raw_value[0]
|
||||
replacement_value = f"{quote_char}{{{{ {var_name} }}}}{quote_char}"
|
||||
elif is_bool:
|
||||
replacement_value = f"{{{{ {var_name} | lower }}}}"
|
||||
else:
|
||||
replacement_value = f"{{{{ {var_name} }}}}"
|
||||
|
||||
new_content = (
|
||||
before_eq + "=" + leading_ws + replacement_value + comment_part
|
||||
)
|
||||
out_lines.append(new_content + newline)
|
||||
|
||||
return "".join(out_lines)
|
||||
|
||||
def _generate_toml_template_with_loops_from_text(
|
||||
self, role_prefix: str, text: str, loop_candidates: list[LoopCandidate]
|
||||
) -> str:
|
||||
"""
|
||||
Generate a Jinja2 template for a TOML file with loop support.
|
||||
"""
|
||||
loop_paths = {candidate.path for candidate in loop_candidates}
|
||||
lines = text.splitlines(keepends=True)
|
||||
current_table: tuple[str, ...] = ()
|
||||
out_lines: list[str] = []
|
||||
skip_until_next_table = (
|
||||
False # Track when we're inside a looped array-of-tables
|
||||
)
|
||||
|
||||
for raw_line in lines:
|
||||
line = raw_line
|
||||
stripped = line.lstrip()
|
||||
|
||||
# Blank or pure comment
|
||||
if not stripped or stripped.startswith("#"):
|
||||
# Only output if we're not skipping
|
||||
if not skip_until_next_table:
|
||||
out_lines.append(raw_line)
|
||||
continue
|
||||
|
||||
# Table header: [server] or [server.tls] or [[array.of.tables]]
|
||||
if stripped.startswith("[") and "]" in stripped:
|
||||
header = stripped
|
||||
# Check if it's array-of-tables ([[name]]) or regular table ([name])
|
||||
is_array_table = header.startswith("[[") and "]]" in header
|
||||
|
||||
if is_array_table:
|
||||
# Extract content between [[ and ]]
|
||||
start = header.find("[[") + 2
|
||||
end = header.find("]]", start)
|
||||
inner = header[start:end].strip() if end != -1 else ""
|
||||
else:
|
||||
# Extract content between [ and ]
|
||||
start = header.find("[") + 1
|
||||
end = header.find("]", start)
|
||||
inner = header[start:end].strip() if end != -1 else ""
|
||||
|
||||
if inner:
|
||||
parts = [p.strip() for p in inner.split(".") if p.strip()]
|
||||
table_path = tuple(parts)
|
||||
|
||||
# Check if this is an array-of-tables that's a loop candidate
|
||||
if is_array_table and table_path in loop_paths:
|
||||
# If we're already skipping this table, this is a subsequent occurrence
|
||||
if skip_until_next_table and current_table == table_path:
|
||||
# This is a duplicate [[table]] - skip it
|
||||
continue
|
||||
|
||||
# This is the first occurrence - generate the loop
|
||||
current_table = table_path
|
||||
candidate = next(
|
||||
c for c in loop_candidates if c.path == table_path
|
||||
)
|
||||
|
||||
# Generate the loop header
|
||||
collection_var = self.make_var_name(role_prefix, candidate.path)
|
||||
item_var = candidate.loop_var
|
||||
|
||||
# Get sample item to build template
|
||||
if candidate.items:
|
||||
sample_item = candidate.items[0]
|
||||
|
||||
# Build loop
|
||||
out_lines.append(
|
||||
f"{{% for {item_var} in {collection_var} %}}\n"
|
||||
)
|
||||
out_lines.append(f"[[{'.'.join(table_path)}]]\n")
|
||||
|
||||
# Add fields from sample item
|
||||
for key, value in sample_item.items():
|
||||
if key == "_key":
|
||||
continue
|
||||
if isinstance(value, str):
|
||||
out_lines.append(
|
||||
f'{key} = "{{{{ {item_var}.{key} }}}}"\n'
|
||||
)
|
||||
else:
|
||||
out_lines.append(
|
||||
f"{key} = {{{{ {item_var}.{key} }}}}\n"
|
||||
)
|
||||
|
||||
out_lines.append("{% endfor %}\n")
|
||||
|
||||
# Skip all content until the next different table
|
||||
skip_until_next_table = True
|
||||
continue
|
||||
else:
|
||||
# Regular table or non-loop array - reset skip flag if it's a different table
|
||||
if current_table != table_path:
|
||||
skip_until_next_table = False
|
||||
current_table = table_path
|
||||
|
||||
out_lines.append(raw_line)
|
||||
continue
|
||||
|
||||
# If we're inside a skipped array-of-tables section, skip this line
|
||||
if skip_until_next_table:
|
||||
continue
|
||||
|
||||
# Try key = value
|
||||
newline = ""
|
||||
content = raw_line
|
||||
if content.endswith("\r\n"):
|
||||
newline = "\r\n"
|
||||
content = content[:-2]
|
||||
elif content.endswith("\n"):
|
||||
newline = content[-1]
|
||||
content = content[:-1]
|
||||
|
||||
eq_index = content.find("=")
|
||||
if eq_index == -1:
|
||||
out_lines.append(raw_line)
|
||||
continue
|
||||
|
||||
before_eq = content[:eq_index]
|
||||
after_eq = content[eq_index + 1 :]
|
||||
|
||||
key = before_eq.strip()
|
||||
if not key:
|
||||
out_lines.append(raw_line)
|
||||
continue
|
||||
|
||||
# Whitespace after '='
|
||||
value_ws_len = len(after_eq) - len(after_eq.lstrip(" \t"))
|
||||
leading_ws = after_eq[:value_ws_len]
|
||||
value_and_comment = after_eq[value_ws_len:]
|
||||
|
||||
value_part, comment_part = self._split_inline_comment(
|
||||
value_and_comment, {"#"}
|
||||
)
|
||||
raw_value = value_part.strip()
|
||||
|
||||
# Path for this key (table + key)
|
||||
path = current_table + (key,)
|
||||
|
||||
# Check if this path is a loop candidate
|
||||
if path in loop_paths:
|
||||
candidate = next(c for c in loop_candidates if c.path == path)
|
||||
collection_var = self.make_var_name(role_prefix, candidate.path)
|
||||
item_var = candidate.loop_var
|
||||
|
||||
if candidate.item_schema == "scalar":
|
||||
# Scalar list loop
|
||||
replacement_value = (
|
||||
f"["
|
||||
f"{{% for {item_var} in {collection_var} %}}"
|
||||
f"{{{{ {item_var} }}}}"
|
||||
f"{{% if not loop.last %}}, {{% endif %}}"
|
||||
f"{{% endfor %}}"
|
||||
f"]"
|
||||
)
|
||||
else:
|
||||
# Dict/nested loop - use tojson filter for complex arrays
|
||||
replacement_value = f"{{{{ {collection_var} | tojson }}}}"
|
||||
|
||||
new_content = (
|
||||
before_eq + "=" + leading_ws + replacement_value + comment_part
|
||||
)
|
||||
out_lines.append(new_content + newline)
|
||||
continue
|
||||
|
||||
# Special case: inline table
|
||||
if (
|
||||
raw_value.startswith("{")
|
||||
and raw_value.endswith("}")
|
||||
and tomllib is not None
|
||||
):
|
||||
try:
|
||||
# Parse the inline table as a tiny TOML document
|
||||
mini_source = "table = " + raw_value + "\n"
|
||||
mini_data = tomllib.loads(mini_source)["table"]
|
||||
except Exception:
|
||||
mini_data = None
|
||||
|
||||
if isinstance(mini_data, dict):
|
||||
inner_bits: list[str] = []
|
||||
for sub_key, sub_val in mini_data.items():
|
||||
nested_path = path + (sub_key,)
|
||||
nested_var = self.make_var_name(role_prefix, nested_path)
|
||||
if isinstance(sub_val, str):
|
||||
inner_bits.append(f'{sub_key} = "{{{{ {nested_var} }}}}"')
|
||||
elif isinstance(sub_val, bool):
|
||||
inner_bits.append(
|
||||
f"{sub_key} = {{{{ {nested_var} | lower }}}}"
|
||||
)
|
||||
else:
|
||||
inner_bits.append(f"{sub_key} = {{{{ {nested_var} }}}}")
|
||||
replacement_value = "{ " + ", ".join(inner_bits) + " }"
|
||||
|
|
@ -191,9 +524,14 @@ class TomlHandler(DictLikeHandler):
|
|||
and raw_value[0] in {'"', "'"}
|
||||
)
|
||||
|
||||
# Check if value is a boolean in the text
|
||||
is_bool = raw_value.strip().lower() in ("true", "false")
|
||||
|
||||
if use_quotes:
|
||||
quote_char = raw_value[0]
|
||||
replacement_value = f"{quote_char}{{{{ {var_name} }}}}{quote_char}"
|
||||
elif is_bool:
|
||||
replacement_value = f"{{{{ {var_name} | lower }}}}"
|
||||
else:
|
||||
replacement_value = f"{{{{ {var_name} }}}}"
|
||||
|
||||
|
|
|
|||
|
|
@ -418,8 +418,8 @@ class XmlHandler(BaseHandler):
|
|||
# Use simple variable reference - attributes should always exist
|
||||
elem.set(attr_name, f"{{{{ {loop_var}.{attr_name} }}}}")
|
||||
elif key == "_text":
|
||||
# Simple text content
|
||||
elem.text = f"{{{{ {loop_var} }}}}"
|
||||
# Simple text content - use ._text accessor for dict-based items
|
||||
elem.text = f"{{{{ {loop_var}._text }}}}"
|
||||
elif key == "value":
|
||||
# Text with attributes/children
|
||||
elem.text = f"{{{{ {loop_var}.value }}}}"
|
||||
|
|
|
|||
|
|
@ -124,7 +124,8 @@ class YamlHandler(DictLikeHandler):
|
|||
replacement = f"{{{{ {var_name} }}}}"
|
||||
|
||||
leading = rest[: len(rest) - len(rest.lstrip(" \t"))]
|
||||
new_stripped = f"{key}: {leading}{replacement}{comment_part}"
|
||||
new_rest = f"{leading}{replacement}{comment_part}"
|
||||
new_stripped = f"{key}:{new_rest}"
|
||||
out_lines.append(
|
||||
" " * indent
|
||||
+ new_stripped
|
||||
|
|
@ -281,7 +282,8 @@ class YamlHandler(DictLikeHandler):
|
|||
replacement = f"{{{{ {var_name} }}}}"
|
||||
|
||||
leading = rest[: len(rest) - len(rest.lstrip(" \t"))]
|
||||
new_stripped = f"{key}: {leading}{replacement}{comment_part}"
|
||||
new_rest = f"{leading}{replacement}{comment_part}"
|
||||
new_stripped = f"{key}:{new_rest}"
|
||||
out_lines.append(
|
||||
" " * indent
|
||||
+ new_stripped
|
||||
|
|
@ -378,10 +380,10 @@ class YamlHandler(DictLikeHandler):
|
|||
# Dict-style: key: {% for ... %}
|
||||
key = candidate.path[-1] if candidate.path else "items"
|
||||
lines.append(f"{indent_str}{key}:")
|
||||
lines.append(f"{indent_str} {{% for {item_var} in {collection_var} %}}")
|
||||
lines.append(f"{indent_str} {{% for {item_var} in {collection_var} -%}}")
|
||||
else:
|
||||
# List-style: just the loop
|
||||
lines.append(f"{indent_str}{{% for {item_var} in {collection_var} %}}")
|
||||
lines.append(f"{indent_str}{{% for {item_var} in {collection_var} -%}}")
|
||||
|
||||
# Generate template for item structure
|
||||
if candidate.items:
|
||||
|
|
|
|||
|
|
@ -85,14 +85,20 @@ class LoopAnalyzer:
|
|||
self._analyze_xml(parsed)
|
||||
elif fmt in ("yaml", "json", "toml"):
|
||||
self._analyze_dict_like(parsed, path=())
|
||||
# INI files are typically flat key-value, not suitable for loops
|
||||
elif fmt == "ini":
|
||||
# INI files are typically flat key-value, not suitable for loops
|
||||
pass
|
||||
|
||||
# Sort by path depth (process parent structures before children)
|
||||
self.candidates.sort(key=lambda c: len(c.path))
|
||||
return self.candidates
|
||||
|
||||
def _analyze_dict_like(
|
||||
self, obj: Any, path: tuple[str, ...], depth: int = 0
|
||||
self,
|
||||
obj: Any,
|
||||
path: tuple[str, ...],
|
||||
depth: int = 0,
|
||||
parent_is_list: bool = False,
|
||||
) -> None:
|
||||
"""Recursively analyze dict/list structures."""
|
||||
|
||||
|
|
@ -111,9 +117,16 @@ class LoopAnalyzer:
|
|||
|
||||
# Recurse into dict values
|
||||
for key, value in obj.items():
|
||||
self._analyze_dict_like(value, path + (str(key),), depth + 1)
|
||||
self._analyze_dict_like(
|
||||
value, path + (str(key),), depth + 1, parent_is_list=False
|
||||
)
|
||||
|
||||
elif isinstance(obj, list):
|
||||
# Don't create loop candidates for nested lists (lists inside lists)
|
||||
# These are too complex for clean template generation and should fall back to scalar handling
|
||||
if parent_is_list:
|
||||
return
|
||||
|
||||
# Check if this list is homogeneous
|
||||
if len(obj) >= self.MIN_ITEMS_FOR_LOOP:
|
||||
candidate = self._check_list_collection(obj, path)
|
||||
|
|
@ -123,8 +136,11 @@ class LoopAnalyzer:
|
|||
return
|
||||
|
||||
# If not a good loop candidate, recurse into items
|
||||
# Pass parent_is_list=True so nested lists won't create loop candidates
|
||||
for i, item in enumerate(obj):
|
||||
self._analyze_dict_like(item, path + (str(i),), depth + 1)
|
||||
self._analyze_dict_like(
|
||||
item, path + (str(i),), depth + 1, parent_is_list=True
|
||||
)
|
||||
|
||||
def _check_list_collection(
|
||||
self, items: list[Any], path: tuple[str, ...]
|
||||
|
|
@ -185,45 +201,55 @@ class LoopAnalyzer:
|
|||
|
||||
Example: {"server1": {...}, "server2": {...}} where all values
|
||||
have the same structure.
|
||||
|
||||
NOTE: Currently disabled for TOML compatibility. TOML's dict-of-tables
|
||||
syntax ([servers.alpha], [servers.beta]) cannot be easily converted to
|
||||
loops without restructuring the entire TOML format. To maintain consistency
|
||||
between Ansible YAML and Jinja2 templates, we treat these as scalars.
|
||||
"""
|
||||
|
||||
if not obj:
|
||||
return None
|
||||
|
||||
values = list(obj.values())
|
||||
|
||||
# Check type homogeneity
|
||||
value_types = [type(v).__name__ for v in values]
|
||||
type_counts = Counter(value_types)
|
||||
|
||||
if len(type_counts) != 1:
|
||||
return None
|
||||
|
||||
value_type = value_types[0]
|
||||
|
||||
# Only interested in dict values for dict collections
|
||||
# (scalar-valued dicts stay as scalars)
|
||||
if value_type != "dict":
|
||||
return None
|
||||
|
||||
# Check structural homogeneity
|
||||
schema = self._analyze_dict_schema(values)
|
||||
if schema in ("simple_dict", "homogeneous"):
|
||||
confidence = 0.9 if schema == "simple_dict" else 0.8
|
||||
|
||||
# Convert dict to list of items with 'key' added
|
||||
items_with_keys = [{"_key": k, **v} for k, v in obj.items()]
|
||||
|
||||
return LoopCandidate(
|
||||
path=path,
|
||||
loop_var=self._derive_loop_var(path, singular=True),
|
||||
items=items_with_keys,
|
||||
item_schema="simple_dict",
|
||||
confidence=confidence,
|
||||
)
|
||||
|
||||
# TODO: Re-enable this if we implement proper dict-of-tables loop generation
|
||||
# For now, return None to use scalar handling
|
||||
return None
|
||||
|
||||
# Original logic preserved below for reference:
|
||||
# if not obj:
|
||||
# return None
|
||||
#
|
||||
# values = list(obj.values())
|
||||
#
|
||||
# # Check type homogeneity
|
||||
# value_types = [type(v).__name__ for v in values]
|
||||
# type_counts = Counter(value_types)
|
||||
#
|
||||
# if len(type_counts) != 1:
|
||||
# return None
|
||||
#
|
||||
# value_type = value_types[0]
|
||||
#
|
||||
# # Only interested in dict values for dict collections
|
||||
# # (scalar-valued dicts stay as scalars)
|
||||
# if value_type != "dict":
|
||||
# return None
|
||||
#
|
||||
# # Check structural homogeneity
|
||||
# schema = self._analyze_dict_schema(values)
|
||||
# if schema in ("simple_dict", "homogeneous"):
|
||||
# confidence = 0.9 if schema == "simple_dict" else 0.8
|
||||
#
|
||||
# # Convert dict to list of items with 'key' added
|
||||
# items_with_keys = [{"_key": k, **v} for k, v in obj.items()]
|
||||
#
|
||||
# return LoopCandidate(
|
||||
# path=path,
|
||||
# loop_var=self._derive_loop_var(path, singular=True),
|
||||
# items=items_with_keys,
|
||||
# item_schema="simple_dict",
|
||||
# confidence=confidence,
|
||||
# )
|
||||
#
|
||||
# return None
|
||||
|
||||
def _analyze_dict_schema(
|
||||
self, dicts: list[dict[str, Any]]
|
||||
) -> Literal["simple_dict", "homogeneous", "heterogeneous"]:
|
||||
|
|
@ -316,7 +342,7 @@ class LoopAnalyzer:
|
|||
|
||||
XML is particularly suited for loops when we have repeated sibling elements.
|
||||
"""
|
||||
import xml.etree.ElementTree as ET
|
||||
import xml.etree.ElementTree as ET # nosec B405
|
||||
|
||||
if not isinstance(root, ET.Element):
|
||||
return
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue