Several bug fixes and prep for 0.2.2
Some checks failed
CI / test (push) Failing after 1m40s
Lint / test (push) Successful in 31s
Trivy / test (push) Successful in 24s

- Fix stat() of parent directory so that we set directory perms correct on --include paths.
 - Set pty for remote calls when sudo is required, to help systems with limits on sudo without pty
This commit is contained in:
Miguel Jacq 2026-01-03 11:39:57 +11:00
parent 29b52d451d
commit 824010b2ab
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
9 changed files with 249 additions and 61 deletions

View file

@ -6,7 +6,7 @@ import os
import re
import shutil
import time
from dataclasses import dataclass, asdict
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional, Set
from .systemd import (
@ -58,59 +58,66 @@ class ServiceSnapshot:
sub_state: Optional[str]
unit_file_state: Optional[str]
condition_result: Optional[str]
managed_files: List[ManagedFile]
excluded: List[ExcludedFile]
notes: List[str]
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
class PackageSnapshot:
package: str
role_name: str
managed_files: List[ManagedFile]
excluded: List[ExcludedFile]
notes: List[str]
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
class UsersSnapshot:
role_name: str
users: List[dict]
managed_files: List[ManagedFile]
excluded: List[ExcludedFile]
notes: List[str]
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
class AptConfigSnapshot:
role_name: str
managed_files: List[ManagedFile]
excluded: List[ExcludedFile]
notes: List[str]
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
class DnfConfigSnapshot:
role_name: str
managed_files: List[ManagedFile]
excluded: List[ExcludedFile]
notes: List[str]
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
class EtcCustomSnapshot:
role_name: str
managed_files: List[ManagedFile]
excluded: List[ExcludedFile]
notes: List[str]
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
class UsrLocalCustomSnapshot:
role_name: str
managed_files: List[ManagedFile]
excluded: List[ExcludedFile]
notes: List[str]
managed_dirs: List[ManagedDir] = field(default_factory=list)
managed_files: List[ManagedFile] = field(default_factory=list)
excluded: List[ExcludedFile] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
@dataclass
@ -149,6 +156,71 @@ ALLOWED_UNOWNED_EXTS = {
MAX_FILES_CAP = 4000
MAX_UNOWNED_FILES_PER_ROLE = 500
def _merge_parent_dirs(
existing_dirs: List[ManagedDir],
managed_files: List[ManagedFile],
*,
policy: IgnorePolicy,
) -> List[ManagedDir]:
"""Ensure parent directories for managed_files are present in managed_dirs.
This is used so the Ansible manifest can create destination directories with
explicit owner/group/mode (ansible-lint friendly) without needing a separate
"mkdir without perms" task.
We only add the immediate parent directory for each managed file. For
explicit directory includes (extra_paths), existing_dirs will already
contain the walked directory tree.
"""
by_path: Dict[str, ManagedDir] = {
d.path: d for d in (existing_dirs or []) if d.path
}
for mf in managed_files or []:
p = str(mf.path or "").rstrip("/")
if not p:
continue
dpath = os.path.dirname(p)
if not dpath or dpath == "/":
continue
if dpath in by_path:
continue
# Directory-deny logic: newer IgnorePolicy implementations provide
# deny_reason_dir(). Older/simple policies (including unit tests) may
# only implement deny_reason(), which is file-oriented and may return
# "not_regular_file" for directories.
deny = None
deny_dir = getattr(policy, "deny_reason_dir", None)
if callable(deny_dir):
deny = deny_dir(dpath)
else:
deny = policy.deny_reason(dpath)
if deny in ("not_regular_file", "not_file", "not_regular"):
deny = None
if deny:
# If the file itself was captured, its parent directory is likely safe,
# but still respect deny globs for directories to avoid managing
# sensitive/forbidden trees.
continue
try:
owner, group, mode = stat_triplet(dpath)
except OSError:
continue
by_path[dpath] = ManagedDir(
path=dpath,
owner=owner,
group=group,
mode=mode,
reason="parent_of_managed_file",
)
return [by_path[k] for k in sorted(by_path)]
# Directories that are shared across many packages.
# Never attribute all unowned files in these trees
# to one single package.
@ -1521,7 +1593,14 @@ def harvest(
continue
if dirpath not in extra_dir_seen:
deny = policy.deny_reason_dir(dirpath)
deny = None
deny_dir = getattr(policy, "deny_reason_dir", None)
if callable(deny_dir):
deny = deny_dir(dirpath)
else:
deny = policy.deny_reason(dirpath)
if deny in ("not_regular_file", "not_file", "not_regular"):
deny = None
if not deny:
try:
owner, group, mode = stat_triplet(dirpath)
@ -1661,6 +1740,52 @@ def harvest(
"roles": roles,
}
# Ensure every role has explicit managed_dirs for parent directories of managed files.
# This lets the manifest create directories with owner/group/mode (ansible-lint friendly)
# without a separate "mkdir without perms" task.
users_snapshot.managed_dirs = _merge_parent_dirs(
users_snapshot.managed_dirs, users_snapshot.managed_files, policy=policy
)
for s in service_snaps:
s.managed_dirs = _merge_parent_dirs(
s.managed_dirs, s.managed_files, policy=policy
)
for p in pkg_snaps:
p.managed_dirs = _merge_parent_dirs(
p.managed_dirs, p.managed_files, policy=policy
)
if apt_config_snapshot:
apt_config_snapshot.managed_dirs = _merge_parent_dirs(
apt_config_snapshot.managed_dirs,
apt_config_snapshot.managed_files,
policy=policy,
)
if dnf_config_snapshot:
dnf_config_snapshot.managed_dirs = _merge_parent_dirs(
dnf_config_snapshot.managed_dirs,
dnf_config_snapshot.managed_files,
policy=policy,
)
if etc_custom_snapshot:
etc_custom_snapshot.managed_dirs = _merge_parent_dirs(
etc_custom_snapshot.managed_dirs,
etc_custom_snapshot.managed_files,
policy=policy,
)
if usr_local_custom_snapshot:
usr_local_custom_snapshot.managed_dirs = _merge_parent_dirs(
usr_local_custom_snapshot.managed_dirs,
usr_local_custom_snapshot.managed_files,
policy=policy,
)
if extra_paths_snapshot:
extra_paths_snapshot.managed_dirs = _merge_parent_dirs(
extra_paths_snapshot.managed_dirs,
extra_paths_snapshot.managed_files,
policy=policy,
)
state = {
"enroll": {
"version": get_enroll_version(),