From 21a3ef3447b6a3a5deec71f77d73d8c4a8bb58ed Mon Sep 17 00:00:00 2001 From: Miguel Jacq Date: Mon, 22 Jun 2026 12:21:33 +1000 Subject: [PATCH] More safety about writing output harvests/manifests to safe locations, including SOPS and diff. --- enroll/cli.py | 91 ++++++++++++--- enroll/harvest_safety.py | 211 ++++++++++++++++++++++++++++++----- enroll/manifest.py | 5 +- enroll/manifest_safety.py | 28 +++-- enroll/sopsutil.py | 6 +- tests/test_cli_helpers.py | 17 +++ tests/test_harvest_safety.py | 82 ++++++++++++++ 7 files changed, 384 insertions(+), 56 deletions(-) diff --git a/enroll/cli.py b/enroll/cli.py index 1715c98..bb868e5 100644 --- a/enroll/cli.py +++ b/enroll/cli.py @@ -22,6 +22,7 @@ from .diff import ( ) from .explain import explain_state from .harvest import harvest +from .harvest_safety import ensure_safe_output_parent, write_text_output_file from .manifest import manifest from .remote import ( remote_harvest, @@ -135,14 +136,75 @@ def _split_list_value(v: str) -> list[str]: return [raw] if raw else [] +def _root_trust_reason(path: Path, *, final: bool) -> Optional[str]: + """Return why a PATH directory/ancestor is unsafe for root execution.""" + + running_as_root = _is_effective_root() + if not final and not running_as_root: + return None + try: + st = os.stat(path) + except OSError: + return None + if not stat.S_ISDIR(st.st_mode): + return None + + subject = "directory" if final else "parent directory" + if running_as_root and st.st_uid != 0: + return f"{subject} is not owned by root" + + writable_by_group = bool(st.st_mode & stat.S_IWGRP) + writable_by_other = bool(st.st_mode & stat.S_IWOTH) + sticky = bool(st.st_mode & stat.S_ISVTX) + + # A sticky shared ancestor such as /tmp may contain a root-owned PATH + # directory safely enough for this check, but the PATH entry itself must + # never be writable by group/other because that permits command planting. + if final or not sticky: + if writable_by_other: + return f"{subject} is world-writable" + if writable_by_group: + return f"{subject} is group-writable" + return None + + +def _root_parent_trust_reason(path: Path) -> Optional[str]: + """Check original and resolved PATH ancestors for root trust.""" + + if not _is_effective_root(): + return None + + candidates: list[Path] = [] + candidates.extend(reversed(path.parents)) + try: + resolved = path.resolve(strict=True) + except OSError: + resolved = None + if resolved is not None and resolved != path: + candidates.extend(reversed(resolved.parents)) + + seen: set[str] = set() + for parent in candidates: + key = str(parent) + if key in seen: + continue + seen.add(key) + reason = _root_trust_reason(parent, final=False) + if reason: + return f"{reason}: {parent}" + return None + + def _path_entry_is_unsafe(entry: str) -> Optional[str]: """Return a human-readable reason if a PATH entry is unsafe for root. Empty PATH entries and relative entries resolve via the current working directory, which is equivalent to trusting whatever directory the operator - happens to be in. Existing group/world-writable directories are also risky + happens to be in. Existing group/world-writable directories are also risky when Enroll is run as root because Enroll deliberately invokes host tools - from PATH while harvesting and enforcing state. + from PATH while harvesting and enforcing state. When running as root, an + existing PATH directory must also be root-owned; a non-root-owned 0755 + directory is still attacker-controlled by its owner. """ if entry == "": @@ -152,16 +214,21 @@ def _path_entry_is_unsafe(entry: str) -> Optional[str]: if not os.path.isabs(entry): return "relative PATH entry resolves from the current directory" + p = Path(entry) + parent_reason = _root_parent_trust_reason(p) + if parent_reason: + return parent_reason + try: st = os.stat(entry) except OSError: return None if not stat.S_ISDIR(st.st_mode): return None - if st.st_mode & stat.S_IWOTH: - return "directory is world-writable" - if st.st_mode & stat.S_IWGRP: - return "directory is group-writable" + + final_reason = _root_trust_reason(p, final=True) + if final_reason: + return final_reason return None @@ -353,7 +420,7 @@ def _resolve_sops_out_file(out: Optional[str], *, hint: str) -> Path: def _tar_dir_to(path_dir: Path, tar_path: Path) -> None: - tar_path.parent.mkdir(parents=True, exist_ok=True) + ensure_safe_output_parent(tar_path, label="harvest tar output") with tarfile.open(tar_path, mode="w:gz") as tf: # Keep a stable on-disk layout when extracted: state.json + artifacts/ tf.add(str(path_dir), arcname=".") @@ -363,7 +430,7 @@ def _encrypt_harvest_dir_to_sops( bundle_dir: Path, out_file: Path, fps: list[str] ) -> Path: out_file = Path(out_file) - out_file.parent.mkdir(parents=True, exist_ok=True) + ensure_safe_output_parent(out_file, label="encrypted harvest output") # Create the tarball alongside the output file (keeps filesystem permissions/locality sane). fd, tmp_tgz = tempfile.mkstemp( @@ -1021,9 +1088,7 @@ def main() -> None: out_path = getattr(args, "out", None) if out_path: - p = Path(out_path).expanduser() - p.parent.mkdir(parents=True, exist_ok=True) - p.write_text(txt, encoding="utf-8") + write_text_output_file(out_path, txt, label="validation report") else: sys.stdout.write(txt) @@ -1094,9 +1159,7 @@ def main() -> None: txt = format_report(report, fmt=str(getattr(args, "format", "text"))) out_path = getattr(args, "out", None) if out_path: - p = Path(out_path).expanduser() - p.parent.mkdir(parents=True, exist_ok=True) - p.write_text(txt, encoding="utf-8") + write_text_output_file(out_path, txt, label="diff report") else: print(txt, end="" if txt.endswith("\n") else "\n") diff --git a/enroll/harvest_safety.py b/enroll/harvest_safety.py index 75bed5a..b6e4bc4 100644 --- a/enroll/harvest_safety.py +++ b/enroll/harvest_safety.py @@ -2,6 +2,7 @@ from __future__ import annotations import os import stat +import tempfile from pathlib import Path @@ -9,6 +10,13 @@ class OutputSafetyError(RuntimeError): """Raised when an output path is unsafe for root-run plaintext output.""" +# Keep a reference to the real euid getter so tests that monkeypatch +# enroll.harvest.os.geteuid do not accidentally make output-safety code +# believe a non-root test process is running as root. Tests that need to +# exercise root behavior can still monkeypatch _effective_uid directly. +_OS_GETEUID = getattr(os, "geteuid", None) + + def _chmod_private(path: Path) -> None: try: os.chmod(path, 0o700) @@ -17,8 +25,111 @@ def _chmod_private(path: Path) -> None: pass -def _assert_no_existing_symlink_components(path: Path, *, label: str) -> None: - """Reject symlinks in existing parent components of an output path.""" +def _effective_uid() -> int | None: + if _OS_GETEUID is None: + return None + try: + return int(_OS_GETEUID()) + except OSError: + return None + + +def _assert_trusted_root_parent(path: Path, st: os.stat_result, *, label: str) -> None: + """Reject parent directories that are unsafe when Enroll runs as root. + + Enroll deliberately invokes host tools and writes host configuration state, + so root-run output should not pass through parent directories controlled by + an unprivileged user. Root-owned sticky shared directories such as /tmp are + allowed as a boundary, but any existing child below them must still be + root-owned and non-writable by group/other. + """ + + if _effective_uid() != 0: + return + if not stat.S_ISDIR(st.st_mode): + raise OutputSafetyError(f"{label} parent is not a directory: {path}") + if st.st_uid != 0: + raise OutputSafetyError( + f"{label} parent is not owned by root; refusing root-run output: {path}" + ) + writable_by_group_or_other = st.st_mode & (stat.S_IWGRP | stat.S_IWOTH) + sticky = st.st_mode & stat.S_ISVTX + if writable_by_group_or_other and not sticky: + raise OutputSafetyError( + f"{label} parent is writable by group/other; refusing root-run output: {path}" + ) + + +def _assert_existing_output_dir_component(path: Path, *, label: str) -> None: + try: + st = path.lstat() + except OSError as e: + raise OutputSafetyError(f"unable to inspect {label} parent: {path}") from e + if stat.S_ISLNK(st.st_mode): + raise OutputSafetyError( + f"{label} parent path contains a symlink; refusing: {path}" + ) + _assert_trusted_root_parent(path, st, label=label) + + +def _mkdir_private_dir_tree( + path: Path, *, label: str, final_must_be_new: bool = False +) -> Path: + """Create a directory tree one component at a time with safety checks. + + pathlib.mkdir(parents=True) can traverse a symlink inserted after a parent + pre-check and create deeper components in the symlink target. Walking one + component at a time avoids that class of race for root-run output paths. + """ + + out = Path(path).expanduser() + parts = out.parts + if not parts: + return out + + if out.is_absolute(): + cur = Path(parts[0]) + rest = parts[1:] + _assert_existing_output_dir_component(cur, label=label) + else: + cur = Path.cwd() + rest = parts + _assert_existing_output_dir_component(cur, label=label) + + for idx, part in enumerate(rest): + cur = cur / part + is_final = idx == len(rest) - 1 + if os.path.lexists(cur): + if is_final and final_must_be_new: + raise OutputSafetyError( + f"{label} path already exists; refusing to overwrite or merge: {cur}" + ) + _assert_existing_output_dir_component(cur, label=label) + continue + try: + os.mkdir(cur, 0o700) + except FileExistsError: + if is_final and final_must_be_new: + raise OutputSafetyError( + f"{label} path already exists; refusing to overwrite or merge: {cur}" + ) + _assert_existing_output_dir_component(cur, label=label) + continue + _chmod_private(cur) + _assert_existing_output_dir_component(cur, label=label) + + return out + + +def _assert_no_existing_symlink_components( + path: Path, *, label: str, require_trusted_root_parents: bool = True +) -> None: + """Reject unsafe existing parent components of an output path. + + This catches symlink parents for all users. When running as root, it also + rejects existing parents controlled by an unprivileged user so an attacker + cannot redirect root output by racing or replacing a parent directory. + """ parts = path.parts if not parts: @@ -30,52 +141,96 @@ def _assert_no_existing_symlink_components(path: Path, *, label: str) -> None: else: cur = Path.cwd() rest = parts[:-1] + if require_trusted_root_parents: + _assert_existing_output_dir_component(cur, label=label) for part in rest: cur = cur / part if not os.path.lexists(cur): return + if require_trusted_root_parents: + _assert_existing_output_dir_component(cur, label=label) + else: + try: + st = cur.lstat() + except OSError as e: + raise OutputSafetyError( + f"unable to inspect {label} parent: {cur}" + ) from e + if stat.S_ISLNK(st.st_mode): + raise OutputSafetyError( + f"{label} parent path contains a symlink; refusing: {cur}" + ) + + +def ensure_safe_output_parent(path: str | Path, *, label: str = "output") -> Path: + """Create and validate the parent directory for a root-run output file. + + The parent is checked with the same symlink/root-trust rules as plaintext + bundle directories. This is for output *files* such as reports and SOPS + bundles, where replacing an existing regular file is acceptable but + following attacker-controlled parent paths is not. + """ + + out = Path(path).expanduser() + parent = out.parent if out.parent != Path("") else Path(".") + sentinel = parent / ".enroll-output-parent-check" + _assert_no_existing_symlink_components(sentinel, label=label) + _mkdir_private_dir_tree(parent, label=label, final_must_be_new=False) + _assert_no_existing_symlink_components(sentinel, label=label) + return parent + + +def write_text_output_file( + path: str | Path, + text: str, + *, + label: str = "output file", + mode: int = 0o600, +) -> Path: + """Safely write a user-facing output text file. + + The write is staged in the destination directory and atomically renamed into + place. A final-path symlink is replaced rather than followed, while parent + symlinks or root-unsafe parents are refused by ensure_safe_output_parent(). + """ + + out = Path(path).expanduser() + parent = ensure_safe_output_parent(out, label=label) + fd, tmp_name = tempfile.mkstemp(prefix=".enroll-output-", dir=str(parent)) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(text) try: - st = cur.lstat() - except OSError as e: - raise OutputSafetyError(f"unable to inspect {label} parent: {cur}") from e - if stat.S_ISLNK(st.st_mode): - raise OutputSafetyError( - f"{label} parent path contains a symlink; refusing: {cur}" - ) + os.chmod(tmp_name, mode) + except OSError: + pass + os.replace(tmp_name, out) + finally: + try: + os.unlink(tmp_name) + except FileNotFoundError: + pass + return out def prepare_new_private_dir(path: str | Path, *, label: str = "output") -> Path: """Create a brand-new private output directory. - Refuse existing paths, including symlinks. This prevents root-run harvests + Refuse existing paths, including symlinks. This prevents root-run harvests from writing into attacker-precreated directories in shared locations such as /tmp, and keeps plaintext bundles private by default. """ out = Path(path).expanduser() _assert_no_existing_symlink_components(out, label=label) - if os.path.lexists(out): - raise OutputSafetyError( - f"{label} path already exists; refusing to overwrite or merge: {out}" - ) - - out.mkdir(parents=True, exist_ok=False, mode=0o700) - _chmod_private(out) - - try: - st = out.lstat() - except OSError as e: - raise OutputSafetyError(f"unable to inspect {label} path: {out}") from e - if stat.S_ISLNK(st.st_mode) or not stat.S_ISDIR(st.st_mode): - raise OutputSafetyError(f"{label} path is not a real directory: {out}") - return out + return _mkdir_private_dir_tree(out, label=label, final_must_be_new=True) def ensure_private_empty_dir(path: str | Path, *, label: str = "output") -> Path: """Create or validate a private empty directory. - This is for internally-generated random cache/temp directories. User-facing + This is for internally-generated random cache/temp directories. User-facing --out paths should normally use prepare_new_private_dir() instead. """ @@ -99,6 +254,4 @@ def ensure_private_empty_dir(path: str | Path, *, label: str = "output") -> Path _chmod_private(out) return out - out.mkdir(parents=True, exist_ok=False, mode=0o700) - _chmod_private(out) - return out + return _mkdir_private_dir_tree(out, label=label, final_must_be_new=True) diff --git a/enroll/manifest.py b/enroll/manifest.py index 9c6806a..3ebfefe 100644 --- a/enroll/manifest.py +++ b/enroll/manifest.py @@ -10,6 +10,7 @@ from typing import List, Optional from .ansible import manifest_from_bundle_dir as manifest_ansible_from_bundle_dir from .puppet import manifest_from_bundle_dir as manifest_puppet_from_bundle_dir from .salt import manifest_from_bundle_dir as manifest_salt_from_bundle_dir +from .harvest_safety import ensure_safe_output_parent from .manifest_safety import validate_site_fqdn from .remote import _safe_extract_tar from .sopsutil import ( @@ -91,7 +92,7 @@ def _tar_dir_to_with_progress( """Create a tar.gz of src_dir at tar_path, with a simple per-entry progress display.""" src_dir = Path(src_dir) tar_path = Path(tar_path) - tar_path.parent.mkdir(parents=True, exist_ok=True) + ensure_safe_output_parent(tar_path, label="manifest tar output") # Collect paths (dirs + files) paths: list[Path] = [src_dir] @@ -144,7 +145,7 @@ def _encrypt_manifest_out_dir_to_sops( """Tar+encrypt the generated manifest output directory into a single .sops file.""" require_sops_cmd() out_file = Path(out_file) - out_file.parent.mkdir(parents=True, exist_ok=True) + ensure_safe_output_parent(out_file, label="encrypted manifest output") fd, tmp_tgz = tempfile.mkstemp( prefix=".enroll-manifest-", diff --git a/enroll/manifest_safety.py b/enroll/manifest_safety.py index 41623bd..c7e64e1 100644 --- a/enroll/manifest_safety.py +++ b/enroll/manifest_safety.py @@ -7,6 +7,12 @@ import stat from pathlib import Path from typing import Iterator, Tuple +from .harvest_safety import ( + OutputSafetyError, + ensure_safe_output_parent, + prepare_new_private_dir, +) + class ArtifactSafetyError(RuntimeError): """Raised when a harvest artifact path is unsafe to consume.""" @@ -105,13 +111,13 @@ def _safe_relative_path(value: str, *, field: str) -> Path: def prepare_manifest_output_dir( out_dir: str | Path, *, allow_existing: bool = False ) -> Path: - """Create a manifest output directory, refusing to overwrite anything. + """Create a manifest output directory, refusing unsafe root output paths. Rendering a manifest may be run by root and may target configuration- - management trees. Refuse an existing path rather than deleting or merging + management trees. Refuse an existing path rather than deleting or merging with it by default; callers that intentionally support accumulation, such - as --fqdn site mode, may allow an existing directory but never a symlink or - non-directory path. + as --fqdn site mode, may allow an existing directory but never a symlink, + non-directory path, symlinked parent, or root-unsafe parent. """ out = Path(out_dir).expanduser() @@ -120,6 +126,12 @@ def prepare_manifest_output_dir( raise ManifestOutputError( "manifest output path already exists; refusing to overwrite: " f"{out}" ) + try: + ensure_safe_output_parent( + out / ".enroll-manifest-output-check", label="manifest output" + ) + except OutputSafetyError as e: + raise ManifestOutputError(str(e)) from e st = out.lstat() if stat.S_ISLNK(st.st_mode): raise ManifestOutputError( @@ -131,12 +143,10 @@ def prepare_manifest_output_dir( ) _assert_no_output_symlinks(out) return out - out.mkdir(parents=True, exist_ok=False, mode=0o700) try: - os.chmod(out, 0o700) - except OSError: - pass - return out + return prepare_new_private_dir(out, label="manifest output") + except OutputSafetyError as e: + raise ManifestOutputError(str(e)) from e def _assert_no_symlink_components(path: Path, *, root: Path) -> None: diff --git a/enroll/sopsutil.py b/enroll/sopsutil.py index de36d4f..e8d31ac 100644 --- a/enroll/sopsutil.py +++ b/enroll/sopsutil.py @@ -7,6 +7,8 @@ import tempfile from pathlib import Path from typing import Iterable, List, Optional +from .harvest_safety import ensure_safe_output_parent + class SopsError(RuntimeError): pass @@ -46,7 +48,7 @@ def encrypt_file_binary( sops = require_sops_cmd() src_path = Path(src_path) dst_path = Path(dst_path) - dst_path.parent.mkdir(parents=True, exist_ok=True) + ensure_safe_output_parent(dst_path, label="sops output") res = subprocess.run( [ @@ -98,7 +100,7 @@ def decrypt_file_binary_to( sops = require_sops_cmd() src_path = Path(src_path) dst_path = Path(dst_path) - dst_path.parent.mkdir(parents=True, exist_ok=True) + ensure_safe_output_parent(dst_path, label="sops output") res = subprocess.run( [ diff --git a/tests/test_cli_helpers.py b/tests/test_cli_helpers.py index 7313b33..b749597 100644 --- a/tests/test_cli_helpers.py +++ b/tests/test_cli_helpers.py @@ -243,3 +243,20 @@ def test_confirm_root_path_safety_force_skips_prompt(monkeypatch): ) cli._confirm_root_path_safety(force=True) + + +def test_unsafe_root_path_reasons_flags_non_root_owned_dir(tmp_path: Path, monkeypatch): + from enroll import cli + + non_root_owned = tmp_path / "user-bin" + non_root_owned.mkdir() + if hasattr(os, "geteuid") and os.geteuid() == 0: + try: + os.chown(non_root_owned, 65534, -1) + except OSError: + pass + + monkeypatch.setattr(cli, "_is_effective_root", lambda: True) + reasons = cli._unsafe_root_path_reasons(str(non_root_owned)) + + assert any("not owned by root" in reason for reason in reasons) diff --git a/tests/test_harvest_safety.py b/tests/test_harvest_safety.py index c6038a7..5084788 100644 --- a/tests/test_harvest_safety.py +++ b/tests/test_harvest_safety.py @@ -110,3 +110,85 @@ def test_prepare_new_private_dir_rejects_symlink_parent(tmp_path: Path): with pytest.raises(OutputSafetyError, match="parent path contains a symlink"): prepare_new_private_dir(link / "bundle", label="harvest output") + + +def test_manifest_output_dir_rejects_symlink_parent(tmp_path: Path): + from enroll.manifest_safety import ManifestOutputError + + real = tmp_path / "real" + real.mkdir() + link = tmp_path / "link" + link.symlink_to(real, target_is_directory=True) + + with pytest.raises(ManifestOutputError, match="parent path contains a symlink"): + prepare_manifest_output_dir(link / "manifest") + + +def test_prepare_new_private_dir_rejects_untrusted_root_parent( + tmp_path: Path, monkeypatch +): + import enroll.harvest_safety as hs + + untrusted = tmp_path / "untrusted" + untrusted.mkdir() + if hasattr(os, "geteuid") and os.geteuid() == 0: + try: + os.chown(untrusted, 65534, -1) + except OSError: + pass + + monkeypatch.setattr(hs, "_effective_uid", lambda: 0) + with pytest.raises(OutputSafetyError, match="not owned by root"): + prepare_new_private_dir(untrusted / "bundle", label="harvest output") + + +def test_prepare_new_private_dir_uses_real_euid_despite_os_geteuid_monkeypatch( + tmp_path: Path, monkeypatch +): + import enroll.harvest_safety as hs + + monkeypatch.setattr(hs.os, "geteuid", lambda: 0) + out = prepare_new_private_dir(tmp_path / "bundle", label="harvest output") + + assert out.is_dir() + assert (out.stat().st_mode & 0o777) == 0o700 + + +def test_write_text_output_file_replaces_final_symlink_not_target(tmp_path: Path): + from enroll.harvest_safety import write_text_output_file + + target = tmp_path / "target.txt" + target.write_text("old\n", encoding="utf-8") + link = tmp_path / "report.txt" + link.symlink_to(target) + + write_text_output_file(link, "new\n", label="test report") + + assert not link.is_symlink() + assert link.read_text(encoding="utf-8") == "new\n" + assert target.read_text(encoding="utf-8") == "old\n" + + +def test_safe_output_parent_does_not_descend_into_raced_symlink( + tmp_path: Path, monkeypatch +): + import enroll.harvest_safety as hs + + target = tmp_path / "target" + target.mkdir() + link = tmp_path / "link" + real_mkdir = os.mkdir + + def racing_mkdir(path, mode=0o777, *, dir_fd=None): + if Path(path) == link and not link.exists(): + link.symlink_to(target, target_is_directory=True) + if dir_fd is not None: + return real_mkdir(path, mode, dir_fd=dir_fd) + return real_mkdir(path, mode) + + monkeypatch.setattr(hs.os, "mkdir", racing_mkdir) + + with pytest.raises(OutputSafetyError, match="parent path contains a symlink"): + hs.ensure_safe_output_parent(link / "subdir" / "report.txt", label="report") + + assert not (target / "subdir").exists()