284 lines
9.2 KiB
Python
284 lines
9.2 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import stat
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from enroll.capture import capture_file
|
|
from enroll.harvest import harvest
|
|
from enroll.harvest_types import ExcludedFile, ManagedFile
|
|
from enroll.ignore import FileInspection, IgnorePolicy
|
|
from enroll.manifest_safety import prepare_manifest_output_dir
|
|
from enroll.harvest_safety import OutputSafetyError, prepare_new_private_dir
|
|
from enroll.pathfilter import PathFilter
|
|
|
|
import enroll.harvest_safety as hs
|
|
|
|
|
|
class _RacePolicy(IgnorePolicy):
    """Ignore policy that rewrites the source file right after inspecting it.

    Used by the capture tests to simulate a file changing between the
    moment it is inspected and the moment it is written to the bundle.
    """

    def inspect_file(self, path: str):
        """Read ``path`` once, then overwrite it on disk.

        Returns a ``(None, FileInspection)`` pair carrying the bytes and the
        ``fstat`` result that were taken *before* the overwrite.
        """
        open_flags = os.O_RDONLY | getattr(os, "O_CLOEXEC", 0)
        handle = os.open(path, open_flags)
        try:
            info = os.fstat(handle)
            payload = os.read(handle, info.st_size)
        finally:
            os.close(handle)

        # The source is modified only after the snapshot has been taken.
        Path(path).write_bytes(b"changed-after-inspection")

        inspection = FileInspection(data=payload, stat_result=info)
        return None, inspection
|
|
|
|
|
|
def test_prepare_new_private_dir_refuses_existing_path(tmp_path: Path):
    """An already-existing target is refused with an "already exists" error."""
    existing = tmp_path / "bundle"
    existing.mkdir()

    expect_refusal = pytest.raises(OutputSafetyError, match="already exists")
    with expect_refusal:
        prepare_new_private_dir(existing, label="harvest output")
|
|
|
|
|
|
def test_prepare_new_private_dir_creates_0700(tmp_path: Path):
    """A fresh target is created as a directory with permission bits 0700."""
    out = prepare_new_private_dir(tmp_path / "bundle", label="harvest output")

    # ``is_dir()`` rather than ``exists()``: a regular file at the path would
    # satisfy ``exists()`` but is not what this function is expected to create.
    assert out.is_dir()
    # Mask with 0o777 so only the rwx bits are compared.
    assert (out.stat().st_mode & 0o777) == 0o700
|
|
|
|
|
|
def test_harvest_refuses_existing_plaintext_output_dir(tmp_path: Path):
    """``harvest`` raises ``OutputSafetyError`` when its output dir exists."""
    existing = tmp_path / "bundle"
    existing.mkdir()
    destination = str(existing)

    with pytest.raises(OutputSafetyError, match="already exists"):
        harvest(destination)
|
|
|
|
|
|
def test_manifest_output_dir_is_private_by_default(tmp_path: Path):
    """The prepared manifest output directory has permission bits 0700."""
    manifest_dir = prepare_manifest_output_dir(tmp_path / "manifest")

    permission_bits = manifest_dir.stat().st_mode & 0o777
    assert permission_bits == 0o700
|
|
|
|
|
|
def test_capture_file_writes_inspected_bytes_not_later_source(tmp_path: Path):
    """The captured artifact holds the inspected bytes, not the later rewrite.

    ``_RacePolicy`` overwrites the source right after inspecting it, so the
    artifact and the on-disk source are expected to differ afterwards.
    """
    source = tmp_path / "source.conf"
    source.write_bytes(b"safe-original")
    bundle = tmp_path / "bundle"
    bundle.mkdir()

    managed: list[ManagedFile] = []
    excluded: list[ExcludedFile] = []
    capture_kwargs = dict(
        bundle_dir=str(bundle),
        role_name="role",
        abs_path=str(source),
        reason="test",
        policy=_RacePolicy(),
        path_filter=PathFilter(),
        managed_out=managed,
        excluded_out=excluded,
    )
    ok = capture_file(**capture_kwargs)

    assert ok is True

    # The artifact path mirrors the absolute source path under the role dir.
    relative_source = str(source).lstrip("/")
    artifact = bundle.joinpath("artifacts", "role", relative_source)
    assert artifact.read_bytes() == b"safe-original"
    assert source.read_bytes() == b"changed-after-inspection"
|
|
|
|
|
|
def test_capture_file_rejects_symlink_source_with_ignore_policy(tmp_path: Path):
    """A symlinked source is not captured and is excluded as not a regular file."""
    target = tmp_path / "target.conf"
    target.write_text("safe=true\n", encoding="utf-8")
    link = tmp_path / "link.conf"
    link.symlink_to(target)
    bundle = tmp_path / "bundle"
    bundle.mkdir()

    managed: list[ManagedFile] = []
    excluded: list[ExcludedFile] = []
    capture_kwargs = dict(
        bundle_dir=str(bundle),
        role_name="role",
        abs_path=str(link),
        reason="test",
        policy=IgnorePolicy(),
        path_filter=PathFilter(),
        managed_out=managed,
        excluded_out=excluded,
    )
    ok = capture_file(**capture_kwargs)

    assert ok is False
    assert managed == []
    # The exclusion list must be non-empty and its first entry names the cause.
    assert excluded
    assert excluded[0].reason == "not_regular_file"
|
|
|
|
|
|
def test_prepare_new_private_dir_rejects_symlink_parent(tmp_path: Path):
    """A target whose parent is a symlink is refused."""
    real_dir = tmp_path / "real"
    real_dir.mkdir()
    symlinked_parent = tmp_path / "link"
    symlinked_parent.symlink_to(real_dir, target_is_directory=True)

    requested = symlinked_parent / "bundle"
    with pytest.raises(OutputSafetyError, match="parent path contains a symlink"):
        prepare_new_private_dir(requested, label="harvest output")
|
|
|
|
|
|
def test_manifest_output_dir_rejects_symlink_parent(tmp_path: Path):
    """A manifest output path under a symlinked parent is refused."""
    from enroll.manifest_safety import ManifestOutputError

    real_dir = tmp_path / "real"
    real_dir.mkdir()
    symlinked_parent = tmp_path / "link"
    symlinked_parent.symlink_to(real_dir, target_is_directory=True)

    requested = symlinked_parent / "manifest"
    with pytest.raises(ManifestOutputError, match="parent path contains a symlink"):
        prepare_manifest_output_dir(requested)
|
|
|
|
|
|
def test_prepare_new_private_dir_rejects_untrusted_root_parent(
    tmp_path: Path, monkeypatch
):
    """With the effective uid reported as 0, a non-root-owned parent is refused."""
    untrusted = tmp_path / "untrusted"
    untrusted.mkdir()

    running_as_root = hasattr(os, "geteuid") and os.geteuid() == 0
    if running_as_root:
        # When really root, the new directory is root-owned; try to hand it to
        # uid 65534 instead. Best effort: a failing chown is ignored.
        try:
            os.chown(untrusted, 65534, -1)
        except OSError:
            pass

    monkeypatch.setattr(hs, "_effective_uid", lambda: 0)

    requested = untrusted / "bundle"
    with pytest.raises(OutputSafetyError, match="not owned by root"):
        prepare_new_private_dir(requested, label="harvest output")
|
|
|
|
|
|
def test_prepare_new_private_dir_uses_real_euid_despite_os_geteuid_monkeypatch(
    tmp_path: Path, monkeypatch
):
    """Patching ``os.geteuid`` to return 0 does not stop directory creation."""

    def fake_root_euid():
        return 0

    monkeypatch.setattr(hs.os, "geteuid", fake_root_euid)

    created = prepare_new_private_dir(tmp_path / "bundle", label="harvest output")

    assert created.is_dir()
    permission_bits = created.stat().st_mode & 0o777
    assert permission_bits == 0o700
|
|
|
|
|
|
def test_write_text_output_file_replaces_final_symlink_not_target(tmp_path: Path):
    """Writing to a symlinked path replaces the link and leaves its target alone."""
    from enroll.harvest_safety import write_text_output_file

    old_text = "old\n"
    new_text = "new\n"

    target = tmp_path / "target.txt"
    target.write_text(old_text, encoding="utf-8")
    report = tmp_path / "report.txt"
    report.symlink_to(target)

    write_text_output_file(report, new_text, label="test report")

    # The report path is now a plain file with the new text ...
    assert not report.is_symlink()
    assert report.read_text(encoding="utf-8") == new_text
    # ... and the former link target still has its old content.
    assert target.read_text(encoding="utf-8") == old_text
|
|
|
|
|
|
def test_safe_output_parent_does_not_descend_into_raced_symlink(
    tmp_path: Path, monkeypatch
):
    """A symlink planted during ``mkdir`` must not be followed.

    ``os.mkdir`` is replaced with a wrapper that creates ``link`` as a symlink
    to ``target`` just before the real ``mkdir`` for ``link`` runs.
    """
    target = tmp_path / "target"
    target.mkdir()
    link = tmp_path / "link"
    original_mkdir = os.mkdir

    def racing_mkdir(path, mode=0o777, *, dir_fd=None):
        # Plant the symlink the first time ``link`` itself is being created.
        is_link_path = Path(path) == link
        if is_link_path and not link.exists():
            link.symlink_to(target, target_is_directory=True)
        if dir_fd is None:
            return original_mkdir(path, mode)
        return original_mkdir(path, mode, dir_fd=dir_fd)

    monkeypatch.setattr(hs.os, "mkdir", racing_mkdir)

    requested = link / "subdir" / "report.txt"
    with pytest.raises(OutputSafetyError, match="parent path contains a symlink"):
        hs.ensure_safe_output_parent(requested, label="report")

    # Nothing may have been created through the symlink.
    assert not (target / "subdir").exists()
|
|
|
|
|
|
def _stat_result(mode: int, *, uid: int = 0) -> os.stat_result:
|
|
return os.stat_result((mode, 1, 1, 1, uid, 0, 0, 0, 0, 0))
|
|
|
|
|
|
def test_effective_uid_handles_missing_geteuid(monkeypatch):
    """``_effective_uid`` returns ``None`` when ``_OS_GETEUID`` is ``None``."""
    monkeypatch.setattr(hs, "_OS_GETEUID", None)

    observed = hs._effective_uid()
    assert observed is None
|
|
|
|
|
|
def test_effective_uid_handles_geteuid_error(monkeypatch):
    """``_effective_uid`` returns ``None`` when the euid lookup raises ``OSError``."""

    def failing_geteuid():
        raise OSError("no euid")

    monkeypatch.setattr(hs, "_OS_GETEUID", failing_geteuid)

    observed = hs._effective_uid()
    assert observed is None
|
|
|
|
|
|
def test_trusted_root_parent_skips_checks_when_not_root(monkeypatch):
    """With a non-zero effective uid, even a foreign-owned regular file passes."""
    monkeypatch.setattr(hs, "_effective_uid", lambda: 1000)

    # A regular file owned by uid 1234; the call is expected not to raise.
    foreign_file_stat = _stat_result(stat.S_IFREG | 0o644, uid=1234)
    hs._assert_trusted_root_parent(
        Path("not-a-dir"), foreign_file_stat, label="x"
    )
|
|
|
|
|
|
def test_trusted_root_parent_rejects_non_directory(monkeypatch):
    """With effective uid 0, a parent that is a regular file is rejected."""
    monkeypatch.setattr(hs, "_effective_uid", lambda: 0)

    regular_file_stat = _stat_result(stat.S_IFREG | 0o644)
    with pytest.raises(OutputSafetyError, match="parent is not a directory"):
        hs._assert_trusted_root_parent(
            Path("file"), regular_file_stat, label="x"
        )
|
|
|
|
|
|
def test_trusted_root_parent_rejects_group_or_world_writable(monkeypatch):
    """With effective uid 0, a mode-0777 directory without sticky bit is rejected."""
    monkeypatch.setattr(hs, "_effective_uid", lambda: 0)

    open_dir_stat = _stat_result(stat.S_IFDIR | 0o777)
    with pytest.raises(OutputSafetyError, match="writable by group/other"):
        hs._assert_trusted_root_parent(
            Path("open-dir"), open_dir_stat, label="x"
        )
|
|
|
|
|
|
def test_trusted_root_parent_allows_root_owned_sticky_shared_dir(monkeypatch):
    """With effective uid 0, a uid-0 mode-0777 directory with sticky bit passes."""
    monkeypatch.setattr(hs, "_effective_uid", lambda: 0)

    # World-writable but sticky, owned by uid 0 (the helper's default owner).
    sticky_dir_stat = _stat_result(stat.S_IFDIR | stat.S_ISVTX | 0o777)
    hs._assert_trusted_root_parent(Path("tmp"), sticky_dir_stat, label="x")
|
|
|
|
|
|
def test_assert_no_existing_symlink_components_without_root_trust_still_rejects_symlink(
    tmp_path: Path,
):
    """Symlinked parents are rejected even with ``require_trusted_root_parents=False``."""
    real_dir = tmp_path / "real"
    real_dir.mkdir()
    symlinked_parent = tmp_path / "link"
    symlinked_parent.symlink_to(real_dir, target_is_directory=True)

    leaf = symlinked_parent / "leaf"
    with pytest.raises(OutputSafetyError, match="parent path contains a symlink"):
        hs._assert_no_existing_symlink_components(
            leaf, label="x", require_trusted_root_parents=False
        )
|
|
|
|
|
|
def test_ensure_private_empty_dir_rejects_bad_existing_paths(tmp_path: Path):
    """Existing files, non-empty directories and symlinks are all refused."""
    # Case 1: the path is a regular file.
    plain_file = tmp_path / "file"
    plain_file.write_text("x", encoding="utf-8")
    with pytest.raises(OutputSafetyError, match="not a directory"):
        hs.ensure_private_empty_dir(plain_file, label="cache")

    # Case 2: the path is a directory that already has an entry in it.
    populated_dir = tmp_path / "nonempty"
    populated_dir.mkdir()
    child = populated_dir / "child"
    child.write_text("x", encoding="utf-8")
    with pytest.raises(OutputSafetyError, match="not empty"):
        hs.ensure_private_empty_dir(populated_dir, label="cache")

    # Case 3: the path is a symlink pointing at a real directory.
    real_dir = tmp_path / "real"
    real_dir.mkdir()
    symlinked_dir = tmp_path / "link"
    symlinked_dir.symlink_to(real_dir, target_is_directory=True)
    with pytest.raises(OutputSafetyError, match="symlink"):
        hs.ensure_private_empty_dir(symlinked_dir, label="cache")
|
|
|
|
|
|
def test_ensure_private_empty_dir_creates_private_dir(tmp_path: Path):
    """A missing path is created as a directory with permission bits 0700."""
    requested = tmp_path / "new-cache"
    created = hs.ensure_private_empty_dir(requested, label="cache")

    assert created.is_dir()
    permission_bits = created.stat().st_mode & 0o777
    assert permission_bits == 0o700
|