Ensure directories in the tree of anything included with --include are defined in the state and manifest so we make dirs before we try to create files

This commit is contained in:
Miguel Jacq 2026-01-02 21:10:32 +11:00
parent 781efef467
commit c88405ef01
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
5 changed files with 170 additions and 5 deletions

View file

@ -1,6 +1,7 @@
# 0.2.1 # 0.2.1
* Don't accidentally add extra_paths role to usr_local_custom list, resulting in extra_paths appearing twice in manifested playbook * Don't accidentally add extra_paths role to usr_local_custom list, resulting in extra_paths appearing twice in manifested playbook
* Ensure directories in the tree of anything included with --include are defined in the state and manifest so we make dirs before we try to create files
# 0.2.0 # 0.2.0

View file

@ -24,7 +24,7 @@ def stat_triplet(path: str) -> Tuple[str, str, str]:
mode is a zero-padded octal string (e.g. "0644"). mode is a zero-padded octal string (e.g. "0644").
""" """
st = os.stat(path, follow_symlinks=True) st = os.stat(path, follow_symlinks=True)
mode = oct(st.st_mode & 0o777)[2:].zfill(4) mode = oct(st.st_mode & 0o7777)[2:].zfill(4)
import grp import grp
import pwd import pwd

View file

@ -34,6 +34,15 @@ class ManagedFile:
reason: str reason: str
@dataclass
class ManagedDir:
path: str
owner: str
group: str
mode: str
reason: str
@dataclass @dataclass
class ExcludedFile: class ExcludedFile:
path: str path: str
@ -109,6 +118,7 @@ class ExtraPathsSnapshot:
role_name: str role_name: str
include_patterns: List[str] include_patterns: List[str]
exclude_patterns: List[str] exclude_patterns: List[str]
managed_dirs: List[ManagedDir]
managed_files: List[ManagedFile] managed_files: List[ManagedFile]
excluded: List[ExcludedFile] excluded: List[ExcludedFile]
notes: List[str] notes: List[str]
@ -1484,12 +1494,78 @@ def harvest(
extra_notes: List[str] = [] extra_notes: List[str] = []
extra_excluded: List[ExcludedFile] = [] extra_excluded: List[ExcludedFile] = []
extra_managed: List[ManagedFile] = [] extra_managed: List[ManagedFile] = []
extra_managed_dirs: List[ManagedDir] = []
extra_dir_seen: Set[str] = set()
def _walk_and_capture_dirs(root: str) -> None:
root = os.path.normpath(root)
if not root.startswith("/"):
root = "/" + root
if not os.path.isdir(root) or os.path.islink(root):
return
for dirpath, dirnames, _ in os.walk(root, followlinks=False):
if len(extra_managed_dirs) >= MAX_FILES_CAP:
extra_notes.append(
f"Reached directory cap ({MAX_FILES_CAP}) while scanning {root}."
)
return
dirpath = os.path.normpath(dirpath)
if not dirpath.startswith("/"):
dirpath = "/" + dirpath
if path_filter.is_excluded(dirpath):
# Prune excluded subtrees.
dirnames[:] = []
continue
if os.path.islink(dirpath) or not os.path.isdir(dirpath):
dirnames[:] = []
continue
if dirpath not in extra_dir_seen:
deny = policy.deny_reason_dir(dirpath)
if not deny:
try:
owner, group, mode = stat_triplet(dirpath)
extra_managed_dirs.append(
ManagedDir(
path=dirpath,
owner=owner,
group=group,
mode=mode,
reason="user_include_dir",
)
)
except OSError:
pass
extra_dir_seen.add(dirpath)
# Prune excluded dirs and symlinks early.
pruned: List[str] = []
for d in dirnames:
p = os.path.join(dirpath, d)
if os.path.islink(p) or path_filter.is_excluded(p):
continue
pruned.append(d)
dirnames[:] = pruned
extra_role_name = "extra_paths" extra_role_name = "extra_paths"
extra_role_seen = seen_by_role.setdefault(extra_role_name, set()) extra_role_seen = seen_by_role.setdefault(extra_role_name, set())
include_specs = list(include_paths or []) include_specs = list(include_paths or [])
exclude_specs = list(exclude_paths or []) exclude_specs = list(exclude_paths or [])
# If any include pattern points at a directory, capture that directory tree's
# ownership/mode so the manifest can recreate it accurately.
include_pats = path_filter.iter_include_patterns()
for pat in include_pats:
if pat.kind == "prefix":
p = pat.value
if os.path.isdir(p) and not os.path.islink(p):
_walk_and_capture_dirs(p)
elif pat.kind == "glob":
for h in glob.glob(pat.value, recursive=True):
if os.path.isdir(h) and not os.path.islink(h):
_walk_and_capture_dirs(h)
if include_specs: if include_specs:
extra_notes.append("User include patterns:") extra_notes.append("User include patterns:")
extra_notes.extend([f"- {p}" for p in include_specs]) extra_notes.extend([f"- {p}" for p in include_specs])
@ -1529,6 +1605,7 @@ def harvest(
role_name=extra_role_name, role_name=extra_role_name,
include_patterns=include_specs, include_patterns=include_specs,
exclude_patterns=exclude_specs, exclude_patterns=exclude_specs,
managed_dirs=extra_managed_dirs,
managed_files=extra_managed, managed_files=extra_managed,
excluded=extra_excluded, excluded=extra_excluded,
notes=extra_notes, notes=extra_notes,

View file

@ -137,3 +137,33 @@ class IgnorePolicy:
return "sensitive_content" return "sensitive_content"
return None return None
def deny_reason_dir(self, path: str) -> Optional[str]:
"""Directory-specific deny logic.
deny_reason() is file-oriented (it rejects directories as "not_regular_file").
For directory metadata capture (so roles can recreate directory trees), we need
a lighter-weight check:
- apply deny_globs (unless dangerous)
- require the path to be a real directory (no symlink)
- ensure it's stat'able/readable
No size checks or content scanning are performed for directories.
"""
if not self.dangerous:
for g in self.deny_globs or []:
if fnmatch.fnmatch(path, g):
return "denied_path"
try:
os.stat(path, follow_symlinks=True)
except OSError:
return "unreadable"
if os.path.islink(path):
return "symlink"
if not os.path.isdir(path):
return "not_directory"
return None

View file

@ -344,6 +344,29 @@ def _write_role_defaults(role_dir: str, mapping: Dict[str, Any]) -> None:
f.write(out) f.write(out)
def _build_managed_dirs_var(
managed_dirs: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Convert enroll managed_dirs into an Ansible-friendly list of dicts.
Each dict drives a role task loop and is safe across hosts.
"""
out: List[Dict[str, Any]] = []
for d in managed_dirs:
dest = d.get("path") or ""
if not dest:
continue
out.append(
{
"dest": dest,
"owner": d.get("owner") or "root",
"group": d.get("group") or "root",
"mode": d.get("mode") or "0755",
}
)
return out
def _build_managed_files_var( def _build_managed_files_var(
managed_files: List[Dict[str, Any]], managed_files: List[Dict[str, Any]],
templated_src_rels: Set[str], templated_src_rels: Set[str],
@ -390,7 +413,22 @@ def _render_generic_files_tasks(
# Using first_found makes roles work in both modes: # Using first_found makes roles work in both modes:
# - site-mode: inventory/host_vars/<host>/<role>/.files/... # - site-mode: inventory/host_vars/<host>/<role>/.files/...
# - non-site: roles/<role>/files/... # - non-site: roles/<role>/files/...
return f"""- name: Deploy any systemd unit files (templates) return f"""- name: Ensure managed directories exist (preserve owner/group/mode)
ansible.builtin.file:
path: "{{{{ item.dest }}}}"
state: directory
owner: "{{{{ item.owner }}}}"
group: "{{{{ item.group }}}}"
mode: "{{{{ item.mode }}}}"
loop: "{{{{ {var_prefix}_managed_dirs | default([]) }}}}"
- name: Ensure destination directories exist
ansible.builtin.file:
path: "{{{{ item.dest | dirname }}}}"
state: directory
loop: "{{{{ {var_prefix}_managed_files | default([]) }}}}"
- name: Deploy any systemd unit files (templates)
ansible.builtin.template: ansible.builtin.template:
src: "{{{{ item.src_rel }}}}.j2" src: "{{{{ item.src_rel }}}}.j2"
dest: "{{{{ item.dest }}}}" dest: "{{{{ item.dest }}}}"
@ -1444,13 +1482,17 @@ Unowned /etc config files not attributed to packages or services.
# ------------------------- # -------------------------
# extra_paths role (user-requested includes) # extra_paths role (user-requested includes)
# ------------------------- # -------------------------
if extra_paths_snapshot and extra_paths_snapshot.get("managed_files"): if extra_paths_snapshot and (
extra_paths_snapshot.get("managed_files")
or extra_paths_snapshot.get("managed_dirs")
):
role = extra_paths_snapshot.get("role_name", "extra_paths") role = extra_paths_snapshot.get("role_name", "extra_paths")
role_dir = os.path.join(roles_root, role) role_dir = os.path.join(roles_root, role)
_write_role_scaffold(role_dir) _write_role_scaffold(role_dir)
var_prefix = role var_prefix = role
managed_dirs = extra_paths_snapshot.get("managed_dirs", []) or []
managed_files = extra_paths_snapshot.get("managed_files", []) managed_files = extra_paths_snapshot.get("managed_files", [])
excluded = extra_paths_snapshot.get("excluded", []) excluded = extra_paths_snapshot.get("excluded", [])
notes = extra_paths_snapshot.get("notes", []) notes = extra_paths_snapshot.get("notes", [])
@ -1489,12 +1531,23 @@ Unowned /etc config files not attributed to packages or services.
notify_systemd=None, notify_systemd=None,
) )
dirs_var = _build_managed_dirs_var(managed_dirs)
jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {} jt_map = _yaml_load_mapping(jt_vars) if jt_vars.strip() else {}
vars_map: Dict[str, Any] = {f"{var_prefix}_managed_files": files_var} vars_map: Dict[str, Any] = {
f"{var_prefix}_managed_dirs": dirs_var,
f"{var_prefix}_managed_files": files_var,
}
vars_map = _merge_mappings_overwrite(vars_map, jt_map) vars_map = _merge_mappings_overwrite(vars_map, jt_map)
if site_mode: if site_mode:
_write_role_defaults(role_dir, {f"{var_prefix}_managed_files": []}) _write_role_defaults(
role_dir,
{
f"{var_prefix}_managed_dirs": [],
f"{var_prefix}_managed_files": [],
},
)
_write_hostvars(out_dir, fqdn or "", role, vars_map) _write_hostvars(out_dir, fqdn or "", role, vars_map)
else: else:
_write_role_defaults(role_dir, vars_map) _write_role_defaults(role_dir, vars_map)
@ -1530,6 +1583,10 @@ User-requested extra file harvesting.
""" """
+ ("\n".join([f"- {p}" for p in exclude_pats]) or "- (none)") + ("\n".join([f"- {p}" for p in exclude_pats]) or "- (none)")
+ """\n + """\n
## Managed directories
"""
+ ("\n".join([f"- {d.get('path')}" for d in managed_dirs]) or "- (none)")
+ """\n
## Managed files ## Managed files
""" """
+ ("\n".join([f"- {mf.get('path')}" for mf in managed_files]) or "- (none)") + ("\n".join([f"- {mf.get('path')}" for mf in managed_files]) or "- (none)")