From 07b07e60c51f0db0023d7a2c2d8900a12de6c932 Mon Sep 17 00:00:00 2001 From: Miguel Jacq Date: Mon, 22 Jun 2026 15:32:40 +1000 Subject: [PATCH] Ensure paths are not followed through parent links --- enroll/accounts.py | 7 +- enroll/capture.py | 13 +--- enroll/explain.py | 6 ++ enroll/fsutil.py | 121 +++++++++++++++++++++++++++++++++++ enroll/ignore.py | 29 ++++++--- tests/test_accounts.py | 19 ++++++ tests/test_fsutil.py | 51 +++++++++++++++ tests/test_harvest_safety.py | 40 +++++++++++- tests/test_ignore.py | 60 ++++++++++++++++- 9 files changed, 323 insertions(+), 23 deletions(-) diff --git a/enroll/accounts.py b/enroll/accounts.py index b4f774b..16890d6 100644 --- a/enroll/accounts.py +++ b/enroll/accounts.py @@ -146,7 +146,12 @@ def is_human_user(uid: int, shell: str, uid_min: int) -> bool: def find_user_ssh_files(home: str) -> List[str]: sshdir = os.path.join(home, ".ssh") out: List[str] = [] - if not os.path.isdir(sshdir): + # ``os.path.isdir`` follows symlinks, so a user who replaces ``~/.ssh`` + # with a link to a sensitive directory (e.g. /etc/ssl/private) could + # otherwise have a regular file inside it harvested through the symlinked + # parent. Refuse a symlinked .ssh outright; capture_file() applies the + # same parent-symlink protection at copy time as defense in depth. + if os.path.islink(sshdir) or not os.path.isdir(sshdir): return out ak = os.path.join(sshdir, "authorized_keys") diff --git a/enroll/capture.py b/enroll/capture.py index 1edf405..66065e6 100644 --- a/enroll/capture.py +++ b/enroll/capture.py @@ -5,7 +5,7 @@ import errno import stat from typing import List, Optional, Set -from .fsutil import stat_triplet, stat_triplet_from_stat +from .fsutil import open_no_follow_path, stat_triplet, stat_triplet_from_stat from .harvest_types import ExcludedFile, ManagedFile, ManagedLink from .ignore import IgnorePolicy from .pathfilter import PathFilter @@ -55,10 +55,7 @@ def files_differ(a: str, b: str, *, max_bytes: int = 2_000_000) -> bool: def _open_no_follow_write(path: str, mode: int = 0o600) -> int: - flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_CLOEXEC", 0) - if hasattr(os, "O_NOFOLLOW"): - flags |= os.O_NOFOLLOW - return os.open(path, flags, mode) + return open_no_follow_path(path, write=True, mode=mode) def write_bytes_into_bundle( @@ -92,14 +89,10 @@ def copy_into_bundle( symlinks at copy time and refuses destination symlink overwrites. """ - flags = os.O_RDONLY | getattr(os, "O_CLOEXEC", 0) - if hasattr(os, "O_NOFOLLOW"): - flags |= os.O_NOFOLLOW - fd = -1 try: try: - fd = os.open(abs_path, flags) + fd = open_no_follow_path(abs_path) except OSError as e: if e.errno in {errno.ELOOP, errno.ENOTDIR}: raise OSError("refusing to copy symlink source") from e diff --git a/enroll/explain.py b/enroll/explain.py index 84d5de0..17f1b4b 100644 --- a/enroll/explain.py +++ b/enroll/explain.py @@ -189,6 +189,12 @@ _EXCLUDED_REASONS: Dict[str, ReasonInfo] = { "Not a regular file", "Excluded because it was not a regular file (device, socket, etc.).", ), + "symlink_component": ReasonInfo( + "Unsafe symlinked path", + "Excluded because a directory in the path was a symlink, which could " + "redirect capture into a sensitive location; Enroll refuses to follow " + "symlinked parents when harvesting files.", + ), "binary_like": ReasonInfo( "Binary-like", "Excluded because it looked like binary content (not useful for config management).", diff --git a/enroll/fsutil.py b/enroll/fsutil.py index 6360058..b4f6c3d 100644 --- a/enroll/fsutil.py +++ b/enroll/fsutil.py @@ -1,10 +1,131 @@ from __future__ import annotations +import errno import hashlib import os +import stat from typing import Tuple +def open_no_follow_path(path: str, *, write: bool = False, mode: int = 0o600) -> int: + """Open ``path`` without following a symlink in *any* path component. + + ``O_NOFOLLOW`` only protects the final component of a path. A regular + file reached through a symlinked *parent* directory (for example a user + replacing ``~/.ssh`` with a link to a sensitive directory) would still be + opened by a plain ``os.open(path, O_NOFOLLOW)``. + + This helper resolves the path one component at a time with ``openat`` + semantics: + + - each intermediate component is opened relative to its parent's + descriptor without following symlinks; + - the final component is opened with ``O_NOFOLLOW`` (read, or + ``O_WRONLY | O_CREAT | O_EXCL`` when ``write`` is True). + + The important detail is that intermediate components are opened with + ``O_PATH | O_NOFOLLOW`` when ``O_PATH`` is available, and then verified + with ``fstat()``. On Linux, ``O_RDONLY | O_DIRECTORY | O_NOFOLLOW`` is not + sufficient for this job: a symlink whose target is a directory can still be + opened as the target directory on some kernels. Opening with ``O_PATH`` and + checking the resulting descriptor reliably exposes such a component as a + symlink instead. + + A symlink (or a ``..`` component) anywhere in the path raises + ``OSError(ELOOP)``. On platforms without ``openat``/``O_DIRECTORY`` + support, this falls back to a single ``O_NOFOLLOW`` open of the whole path, + which is no worse than the historical behaviour. + """ + + cloexec = getattr(os, "O_CLOEXEC", 0) + nofollow = getattr(os, "O_NOFOLLOW", 0) + o_directory = getattr(os, "O_DIRECTORY", 0) + o_path = getattr(os, "O_PATH", 0) + + if write: + final_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | cloexec | nofollow + else: + final_flags = os.O_RDONLY | cloexec | nofollow + + supports_openat = bool( + o_directory and nofollow and os.open in getattr(os, "supports_dir_fd", set()) + ) + if not supports_openat: + return os.open(path, final_flags, mode) + + absolute = path.startswith("/") + parts = [p for p in path.split("/") if p not in ("", ".")] + if not parts: + return os.open(path, final_flags, mode) + + *parent_parts, leaf = parts + + # Use O_PATH for directory descriptors when available. O_PATH descriptors + # can be used as dir_fd anchors for later openat-style calls, and with + # O_NOFOLLOW they let us fstat() a symlink component instead of silently + # following it. If O_PATH is unavailable, use O_RDONLY and an lstat() + # pre-check for intermediate components as a best-effort fallback. + dir_base_flags = (o_path if o_path else os.O_RDONLY) | cloexec | o_directory + component_flags = ( + (o_path if o_path else os.O_RDONLY) | cloexec | o_directory | nofollow + ) + + dir_fd = os.open("/" if absolute else ".", dir_base_flags) + try: + for component in parent_parts: + if component == "..": + raise OSError(errno.ELOOP, "unsafe '..' path component", path) + + if not o_path: + # Best-effort fallback for platforms without O_PATH. This is not + # as race-resistant as the descriptor-only path, but it avoids + # known symlink parents where we cannot open the component itself + # as a non-followed O_PATH descriptor. + try: + st = os.lstat(component, dir_fd=dir_fd) + except OSError: + raise + if stat.S_ISLNK(st.st_mode): + raise OSError(errno.ELOOP, "symlinked path component", path) + if not stat.S_ISDIR(st.st_mode): + raise OSError(errno.ENOTDIR, "non-directory path component", path) + + try: + next_fd = os.open(component, component_flags, dir_fd=dir_fd) + except OSError as e: + if e.errno in {errno.ELOOP, errno.ENOTDIR}: + try: + st = os.lstat(component, dir_fd=dir_fd) + except OSError: + raise + if stat.S_ISLNK(st.st_mode): + raise OSError( + errno.ELOOP, + "symlinked path component", + path, + ) from e + raise + + try: + st = os.fstat(next_fd) + if stat.S_ISLNK(st.st_mode): + raise OSError(errno.ELOOP, "symlinked path component", path) + if not stat.S_ISDIR(st.st_mode): + raise OSError(errno.ENOTDIR, "non-directory path component", path) + except Exception: + os.close(next_fd) + raise + + os.close(dir_fd) + dir_fd = next_fd + + if leaf == "..": + raise OSError(errno.ELOOP, "unsafe '..' path component", path) + return os.open(leaf, final_flags, mode, dir_fd=dir_fd) + finally: + os.close(dir_fd) + + def stat_triplet_from_stat(st: os.stat_result) -> Tuple[str, str, str]: """Return (owner, group, mode) for an existing stat result.""" diff --git a/enroll/ignore.py b/enroll/ignore.py index dc1532f..eed4035 100644 --- a/enroll/ignore.py +++ b/enroll/ignore.py @@ -8,6 +8,8 @@ import stat from dataclasses import dataclass from typing import Optional +from .fsutil import open_no_follow_path + DEFAULT_DENY_GLOBS = [ # Common backup copies created by passwd tools (can contain sensitive data) @@ -213,26 +215,33 @@ class IgnorePolicy: def inspect_file(self, path: str) -> tuple[Optional[str], Optional[FileInspection]]: """Safely inspect a regular file and return the exact bytes to copy. - The source is opened with O_NOFOLLOW where available, fstat() is taken - from that file descriptor, and the whole file is read only after the - size cap passes. With the default 256 KiB cap this avoids a memory DoS - while ensuring secret scanning covers every byte that may be copied. + The source is opened with O_NOFOLLOW on every path component (see + ``fsutil.open_no_follow_path``), fstat() is taken from that file + descriptor, and the whole file is read only after the size cap passes. + With the default 256 KiB cap this avoids a memory DoS while ensuring + secret scanning covers every byte that may be copied. + + Opening every component without following symlinks means a regular + file reached through a symlinked *parent* directory is refused with + ``symlink_component`` rather than silently captured -- its logical + path would not have matched the deny globs. """ deny = self._path_deny_reason(path) if deny: return deny, None - flags = os.O_RDONLY | getattr(os, "O_CLOEXEC", 0) - if hasattr(os, "O_NOFOLLOW"): - flags |= os.O_NOFOLLOW - fd: Optional[int] = None try: try: - fd = os.open(path, flags) + fd = open_no_follow_path(path) except OSError as e: - if e.errno in {errno.ELOOP, errno.ENOTDIR}: + if e.errno == errno.ELOOP: + # A symlink (or unsafe '..') somewhere in the path. This is + # distinct from "not a regular file" so operators can see + # why a path under a symlinked parent was skipped. + return "symlink_component", None + if e.errno == errno.ENOTDIR: return "not_regular_file", None return "unreadable", None diff --git a/tests/test_accounts.py b/tests/test_accounts.py index a78c5f6..ac12774 100644 --- a/tests/test_accounts.py +++ b/tests/test_accounts.py @@ -221,6 +221,25 @@ def test_find_user_ssh_files_ignores_symlink(tmp_path: Path): assert result == [] +def test_find_user_ssh_files_ignores_symlinked_ssh_dir(tmp_path: Path): + """A user who replaces ~/.ssh with a symlink to a sensitive directory must + not have files inside it harvested through the symlinked parent. os.path.isdir + follows symlinks, so the directory itself must be checked with islink(). + """ + + from enroll.accounts import find_user_ssh_files + + sensitive = tmp_path / "sensitive" + sensitive.mkdir() + (sensitive / "authorized_keys").write_text("ssh-rsa AAAA...\n", encoding="utf-8") + + home = tmp_path / "home" / "mallory" + home.mkdir(parents=True) + os.symlink(str(sensitive), str(home / ".ssh")) + + assert find_user_ssh_files(str(home)) == [] + + def test_find_user_ssh_files_handles_home_not_starting_with_slash(): from enroll.accounts import find_user_ssh_files diff --git a/tests/test_fsutil.py b/tests/test_fsutil.py index ebe2224..7b5771b 100644 --- a/tests/test_fsutil.py +++ b/tests/test_fsutil.py @@ -23,3 +23,54 @@ def test_stat_triplet_reports_mode(tmp_path: Path): assert mode == "0600" assert owner # non-empty string assert group # non-empty string + + +def test_open_no_follow_path_reads_regular_file(tmp_path: Path): + from enroll.fsutil import open_no_follow_path + + nested = tmp_path / "a" / "b" + nested.mkdir(parents=True) + f = nested / "file.txt" + f.write_text("hello\n", encoding="utf-8") + + fd = open_no_follow_path(str(f)) + try: + assert os.read(fd, 100) == b"hello\n" + finally: + os.close(fd) + + +def test_open_no_follow_path_refuses_symlinked_parent(tmp_path: Path): + import errno + + from enroll.fsutil import open_no_follow_path + + real = tmp_path / "real" + real.mkdir() + (real / "file.txt").write_text("x\n", encoding="utf-8") + (tmp_path / "link").symlink_to(real) + + try: + fd = open_no_follow_path(str(tmp_path / "link" / "file.txt")) + os.close(fd) + raise AssertionError("expected OSError for symlinked parent") + except OSError as e: + assert e.errno == errno.ELOOP + + +def test_open_no_follow_path_refuses_symlinked_leaf(tmp_path: Path): + import errno + + from enroll.fsutil import open_no_follow_path + + target = tmp_path / "target.txt" + target.write_text("x\n", encoding="utf-8") + link = tmp_path / "link.txt" + link.symlink_to(target) + + try: + fd = open_no_follow_path(str(link)) + os.close(fd) + raise AssertionError("expected OSError for symlinked leaf") + except OSError as e: + assert e.errno == errno.ELOOP diff --git a/tests/test_harvest_safety.py b/tests/test_harvest_safety.py index 6c5b338..ef01cc5 100644 --- a/tests/test_harvest_safety.py +++ b/tests/test_harvest_safety.py @@ -102,7 +102,45 @@ def test_capture_file_rejects_symlink_source_with_ignore_policy(tmp_path: Path): assert ok is False assert managed == [] - assert excluded and excluded[0].reason == "not_regular_file" + # Symlinked sources are now reported with the dedicated symlink_component + # reason (covers both symlinked leaves and symlinked parent directories), + # which is more precise than the old generic not_regular_file. + assert excluded and excluded[0].reason == "symlink_component" + + +def test_capture_file_rejects_symlinked_parent_with_ignore_policy(tmp_path: Path): + """O_NOFOLLOW only guards the final component. A regular file reached + through a symlinked *parent* directory must still be refused, otherwise a + file whose real location is deny-globbed could be captured while its + logical (recorded) path looks safe. + """ + + secret = tmp_path / "secretroot" + secret.mkdir() + (secret / "config").write_text("listen_port=8080\n", encoding="utf-8") + (tmp_path / "allowed").symlink_to(secret, target_is_directory=True) + bundle = tmp_path / "bundle" + bundle.mkdir() + + managed: list[ManagedFile] = [] + excluded: list[ExcludedFile] = [] + ok = capture_file( + bundle_dir=str(bundle), + role_name="role", + abs_path=str(tmp_path / "allowed" / "config"), + reason="test", + policy=IgnorePolicy(), + path_filter=PathFilter(), + managed_out=managed, + excluded_out=excluded, + ) + + assert ok is False + assert managed == [] + assert excluded and excluded[0].reason == "symlink_component" + # Nothing should have been written into the bundle. + artifact = bundle / "artifacts" / "role" / "allowed" / "config" + assert not artifact.exists() def test_prepare_new_private_dir_rejects_symlink_parent(tmp_path: Path): diff --git a/tests/test_ignore.py b/tests/test_ignore.py index 227bf65..2f138ef 100644 --- a/tests/test_ignore.py +++ b/tests/test_ignore.py @@ -236,7 +236,10 @@ def test_deny_reason_symlink_file(tmp_path: Path): link = tmp_path / "link" os.symlink(str(real_file), str(link)) reason = pol.deny_reason(str(link)) - assert reason == "not_regular_file" + # A symlinked path (final component or parent) is refused with the + # dedicated symlink_component reason so operators can tell symlink + # redirection apart from genuine non-regular files (sockets, devices). + assert reason == "symlink_component" def test_deny_reason_logs(tmp_path: Path): @@ -358,3 +361,58 @@ def test_noncanonical_backup_and_log_fastpaths(): assert pol._path_deny_reason("/var/log/foo/../bar.log") == "log_file" assert pol._path_deny_reason("/etc/foo/../something~") == "backup_file" assert pol._path_deny_reason("/etc//passwd-") == "backup_file" + + +def test_inspect_file_refuses_symlinked_parent_directory(tmp_path: Path): + """A regular file reached through a symlinked *parent* directory must be + refused, even though O_NOFOLLOW alone would only guard the final + component. Otherwise a file whose real location is deny-globbed (or whose + content is benign) could be captured while its logical path looks safe. + """ + + pol = IgnorePolicy() + secret = tmp_path / "secretroot" + secret.mkdir() + (secret / "config").write_text("listen_port=8080\n", encoding="utf-8") + (tmp_path / "allowed").symlink_to(secret) + + reason, inspection = pol.inspect_file(str(tmp_path / "allowed" / "config")) + assert reason == "symlink_component" + assert inspection is None + + +def test_inspect_file_refuses_denyglob_evasion_via_symlinked_parent(tmp_path: Path): + """The strongest variant: the real file lives under a deny-globbed dir, + but is reached via a symlinked parent so the *logical* path does not match + the deny glob. Content is non-secret-looking (DH params), so only the + parent-symlink check stands between the operator and disclosure. + """ + + pol = IgnorePolicy() + realdir = tmp_path / "ssl_private" + realdir.mkdir() + (realdir / "dhparam.pem").write_text( + "-----BEGIN DH PARAMETERS-----\nMII...\n-----END DH PARAMETERS-----\n", + encoding="utf-8", + ) + (tmp_path / "innocent").symlink_to(realdir) + + reason, inspection = pol.inspect_file(str(tmp_path / "innocent" / "dhparam.pem")) + assert reason == "symlink_component" + assert inspection is None + + +def test_inspect_file_still_captures_normal_nested_file(tmp_path: Path): + """Regression guard: ordinary files in real (non-symlinked) directories + must still be inspected and returned. + """ + + pol = IgnorePolicy() + nested = tmp_path / "etc" / "myapp" + nested.mkdir(parents=True) + (nested / "app.conf").write_text("workers=4\n", encoding="utf-8") + + reason, inspection = pol.inspect_file(str(nested / "app.conf")) + assert reason is None + assert inspection is not None + assert inspection.data == b"workers=4\n"