diff --git a/.gitignore b/.gitignore index 4ef962d..07c956d 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,3 @@ dist *.pdf *.csv *.html -coverage.xml diff --git a/tests/test_accounts.py b/tests/test_accounts.py index 36e5af9..d5cc267 100644 --- a/tests/test_accounts.py +++ b/tests/test_accounts.py @@ -141,174 +141,3 @@ def test_collect_non_system_users(monkeypatch, tmp_path: Path): assert u.primary_group == "users" assert u.supplementary_groups == ["admins"] assert u.ssh_files == ["/home/alice/.ssh/authorized_keys"] - - -def test_parse_login_defs_file_not_found(tmp_path: Path): - from enroll.accounts import parse_login_defs - - nonexistent = tmp_path / "nonexistent" / "login.defs" - vals = parse_login_defs(str(nonexistent)) - assert vals == {} - - -def test_parse_login_defs_handles_invalid_numbers(tmp_path: Path): - from enroll.accounts import parse_login_defs - - p = tmp_path / "login.defs" - p.write_text("UID_MIN not_a_number\nUID_MAX 60000\n", encoding="utf-8") - vals = parse_login_defs(str(p)) - assert "UID_MIN" not in vals - assert vals["UID_MAX"] == 60000 - - -def test_parse_group_handles_invalid_gid(tmp_path: Path): - from enroll.accounts import parse_group - - p = tmp_path / "group" - p.write_text( - "valid:x:1000:user1\n" "invalid_gid:x:notanint:user2\n", - encoding="utf-8", - ) - gid_to_name, name_to_gid, members = parse_group(str(p)) - assert 1000 in gid_to_name - assert gid_to_name[1000] == "valid" - assert "invalid_gid" not in name_to_gid - - -def test_parse_group_line_too_short(tmp_path: Path): - from enroll.accounts import parse_group - - p = tmp_path / "group" - p.write_text( - "valid:x:1000:user1\n" "shortline:x:1001\n", - encoding="utf-8", - ) - gid_to_name, name_to_gid, members = parse_group(str(p)) - assert 1000 in gid_to_name - assert 1001 not in gid_to_name - - -def test_is_human_user_filters_by_uid_and_shell(): - from enroll.accounts import is_human_user - - assert is_human_user(1000, "/bin/bash", 1000) is True - assert is_human_user(999, "/bin/bash", 1000) is False - assert is_human_user(1000, "/usr/sbin/nologin", 1000) is False - assert is_human_user(1000, "/usr/bin/nologin", 1000) is False - assert is_human_user(1000, "/bin/false", 1000) is False - assert is_human_user(1000, "", 1000) is True - - -def test_find_user_ssh_files_no_ssh_dir(tmp_path: Path): - from enroll.accounts import find_user_ssh_files - - home = tmp_path / "home" / "user" - home.mkdir(parents=True) - assert find_user_ssh_files(str(home)) == [] - - -def test_find_user_ssh_files_ignores_symlink(tmp_path: Path): - from enroll.accounts import find_user_ssh_files - - home = tmp_path / "home" / "user" - sshdir = home / ".ssh" - sshdir.mkdir(parents=True) - target = sshdir / "real_file" - target.write_text("x", encoding="utf-8") - os.symlink(str(target), str(sshdir / "authorized_keys")) - - result = find_user_ssh_files(str(home)) - assert result == [] - - -def test_find_user_ssh_files_handles_home_not_starting_with_slash(): - from enroll.accounts import find_user_ssh_files - - assert find_user_ssh_files("relative/path") == [] - assert find_user_ssh_files("") == [] - - -def test_collect_non_system_users_skips_nologin_users(tmp_path: Path): - import enroll.accounts as a - - orig_parse_login_defs = a.parse_login_defs - orig_parse_passwd = a.parse_passwd - orig_parse_group = a.parse_group - - passwd = tmp_path / "passwd" - passwd.write_text( - "root:x:0:0:root:/root:/bin/bash\n" - "alice:x:1000:1000:Alice:/home/alice:/bin/bash\n" - "nobody:x:65534:65534:nobody:/nonexistent:/usr/sbin/nologin\n" - "sysuser:x:100:100:Sys:/home/sys:/bin/bash\n", - encoding="utf-8", - ) - group = tmp_path / "group" - group.write_text("users:x:1000:alice\n", encoding="utf-8") - defs = tmp_path / "login.defs" - defs.write_text("UID_MIN 1000\n", encoding="utf-8") - - monkeypatch_wrapper = lambda fn, p: lambda path=str(p): fn(path) - - a.parse_login_defs = monkeypatch_wrapper(orig_parse_login_defs, defs) - a.parse_passwd = monkeypatch_wrapper(orig_parse_passwd, passwd) - a.parse_group = monkeypatch_wrapper(orig_parse_group, group) - a.find_user_ssh_files = lambda home: [] - - users = a.collect_non_system_users() - assert [u.name for u in users] == ["alice"] - - -def test_collect_non_system_users_skips_below_uid_min(tmp_path: Path): - import enroll.accounts as a - - orig_parse_login_defs = a.parse_login_defs - orig_parse_passwd = a.parse_passwd - orig_parse_group = a.parse_group - - passwd = tmp_path / "passwd" - passwd.write_text( - "root:x:0:0:root:/root:/bin/bash\n" - "sysuser:x:999:999:Sys:/home/sys:/bin/bash\n" - "alice:x:1000:1000:Alice:/home/alice:/bin/bash\n", - encoding="utf-8", - ) - group = tmp_path / "group" - group.write_text("users:x:1000:alice\n", encoding="utf-8") - defs = tmp_path / "login.defs" - defs.write_text("UID_MIN 1000\n", encoding="utf-8") - - a.parse_login_defs = lambda path=str(defs): orig_parse_login_defs(path) - a.parse_passwd = lambda path=str(passwd): orig_parse_passwd(path) - a.parse_group = lambda path=str(group): orig_parse_group(path) - a.find_user_ssh_files = lambda home: [] - - users = a.collect_non_system_users() - assert [u.name for u in users] == ["alice"] - - -def test_parse_group_handles_empty_lines(tmp_path: Path): - from enroll.accounts import parse_group - - p = tmp_path / "group" - p.write_text( - "valid:x:1000:user1\n" "\n" "another:x:1001:user2\n", - encoding="utf-8", - ) - gid_to_name, name_to_gid, members = parse_group(str(p)) - assert 1000 in gid_to_name - assert 1001 in gid_to_name - - -def test_parse_group_handles_short_lines(tmp_path: Path): - from enroll.accounts import parse_group - - p = tmp_path / "group" - p.write_text( - "valid:x:1000:user1\n" "short:x:1001\n" "another:x:1002:user2\n", - encoding="utf-8", - ) - gid_to_name, name_to_gid, members = parse_group(str(p)) - assert 1000 in gid_to_name - assert 1001 not in gid_to_name # skipped due to short line - assert 1002 in gid_to_name diff --git a/tests/test_cache_security.py b/tests/test_cache_security.py index 4fda1e1..9f31587 100644 --- a/tests/test_cache_security.py +++ b/tests/test_cache_security.py @@ -31,67 +31,3 @@ def test_ensure_dir_secure_ignores_chmod_failures(tmp_path: Path, monkeypatch): # Should not raise. _ensure_dir_secure(d) assert d.exists() and d.is_dir() - - -def test_safe_component_returns_unknown_for_empty_string(): - from enroll.cache import _safe_component - - assert _safe_component("") == "unknown" - assert _safe_component(" ") == "unknown" - - -def test_safe_component_truncates_long_strings(): - from enroll.cache import _safe_component - - long_str = "a" * 100 - result = _safe_component(long_str) - assert len(result) <= 64 - - -def test_safe_component_replaces_special_chars(): - from enroll.cache import _safe_component - - result = _safe_component("hello world!") - assert result == "hello_world_" - - -def test_enroll_cache_dir_uses_xdg_cache_home(monkeypatch): - from enroll.cache import enroll_cache_dir - - monkeypatch.setenv("XDG_CACHE_HOME", "/custom/cache") - result = enroll_cache_dir() - assert str(result) == "/custom/cache/enroll" - - -def test_harvest_cache_state_json_property(): - from enroll.cache import HarvestCache - - cache_dir = HarvestCache(dir=Path("/tmp/test")) - assert cache_dir.state_json == Path("/tmp/test/state.json") - - -def test_new_harvest_cache_dir_chmod_fails(tmp_path: Path, monkeypatch): - from enroll.cache import new_harvest_cache_dir - - def fake_enroll_cache_dir(): - return tmp_path / "enroll" - - def fake_chmod(path, mode): - raise OSError("no") - - monkeypatch.setattr("enroll.cache.enroll_cache_dir", fake_enroll_cache_dir) - monkeypatch.setattr(os, "chmod", fake_chmod) - - # Should not raise even though chmod fails - cache = new_harvest_cache_dir(hint="test") - assert cache.dir.exists() - assert isinstance(cache.dir, Path) - - -def test_enroll_cache_dir_uses_default_when_xdg_not_set(monkeypatch): - from enroll.cache import enroll_cache_dir - - # Remove XDG_CACHE_HOME if it exists - monkeypatch.delenv("XDG_CACHE_HOME", raising=False) - result = enroll_cache_dir() - assert str(result).endswith("/.local/cache/enroll") diff --git a/tests/test_debian.py b/tests/test_debian.py index ed9df7a..abad361 100644 --- a/tests/test_debian.py +++ b/tests/test_debian.py @@ -1,7 +1,6 @@ from __future__ import annotations from pathlib import Path -import pytest def test_dpkg_owner_parses_output(monkeypatch): @@ -97,441 +96,3 @@ def test_parse_status_conffiles_handles_continuations(tmp_path: Path): assert m["nginx"]["/etc/nginx/nginx.conf"] == "abcdef" assert m["nginx"]["/etc/nginx/mime.types"] == "123456" assert "other" not in m - - -def test_dpkg_owner_returns_none_on_diversion_only(monkeypatch): - import enroll.debian as d - - class P: - def __init__(self, rc: int, out: str): - self.returncode = rc - self.stdout = out - self.stderr = "" - - def fake_run(cmd, text, capture_output): - return P(0, "diversion by foo from: /etc/something\n") - - monkeypatch.setattr(d.subprocess, "run", fake_run) - assert d.dpkg_owner("/etc/something") is None - - -def test_dpkg_owner_handles_line_without_colon(monkeypatch): - import enroll.debian as d - - class P: - def __init__(self, rc: int, out: str): - self.returncode = rc - self.stdout = out - self.stderr = "" - - def fake_run(cmd, text, capture_output): - return P(0, "invalid line without colon\n") - - monkeypatch.setattr(d.subprocess, "run", fake_run) - assert d.dpkg_owner("/etc/foo") is None - - -def test_list_manual_packages_returns_empty_on_error(monkeypatch): - import enroll.debian as d - - class P: - def __init__(self, rc: int, out: str): - self.returncode = rc - self.stdout = out - self.stderr = "" - - def fake_run(cmd, text, capture_output): - return P(1, "error") - - monkeypatch.setattr(d.subprocess, "run", fake_run) - assert d.list_manual_packages() == [] - - -def test_list_installed_packages_handles_exception(monkeypatch): - import enroll.debian as d - - def fake_run(*args, **kwargs): - raise Exception("simulated error") - - monkeypatch.setattr(d.subprocess, "run", fake_run) - assert d.list_installed_packages() == {} - - -def test_list_installed_packages_parses_output(): - import enroll.debian as d - - class P: - def __init__(self, rc: int, out: str): - self.returncode = rc - self.stdout = out - self.stderr = "" - - original_run = d.subprocess.run - - def fake_run(cmd, text, capture_output, check): - return P(0, "nginx\t1.18.0\tamd64\nvim\t8.2\tamd64\n") - - d.subprocess.run = fake_run - try: - result = d.list_installed_packages() - assert "nginx" in result - assert result["nginx"][0]["version"] == "1.18.0" - assert result["nginx"][0]["arch"] == "amd64" - assert "vim" in result - finally: - d.subprocess.run = original_run - - -def test_list_installed_packages_skips_invalid_lines(): - import enroll.debian as d - - class P: - def __init__(self, rc: int, out: str): - self.returncode = rc - self.stdout = out - self.stderr = "" - - original_run = d.subprocess.run - - def fake_run(cmd, text, capture_output, check): - return P(0, "nginx\t1.18.0\tamd64\ninvalid_line\n\t1.0\tamd64\n") - - d.subprocess.run = fake_run - try: - result = d.list_installed_packages() - assert "nginx" in result - assert "invalid_line" not in result - finally: - d.subprocess.run = original_run - - -def test_list_installed_packages_handles_empty_name(): - import enroll.debian as d - - class P: - def __init__(self, rc: int, out: str): - self.returncode = rc - self.stdout = out - self.stderr = "" - - original_run = d.subprocess.run - - def fake_run(cmd, text, capture_output, check): - return P(0, "\t1.0\tamd64\nnginx\t1.18.0\tamd64\n") - - d.subprocess.run = fake_run - try: - result = d.list_installed_packages() - assert "" not in result - assert "nginx" in result - finally: - d.subprocess.run = original_run - - -def test_list_installed_packages_sorts_output(): - import enroll.debian as d - - class P: - def __init__(self, rc: int, out: str): - self.returncode = rc - self.stdout = out - self.stderr = "" - - original_run = d.subprocess.run - - def fake_run(cmd, text, capture_output, check): - return P(0, "nginx\t1.18.0\tamd64\nnginx\t1.19.0\tarm64\n") - - d.subprocess.run = fake_run - try: - result = d.list_installed_packages() - assert len(result["nginx"]) == 2 - assert result["nginx"][0]["arch"] == "amd64" - assert result["nginx"][1]["arch"] == "arm64" - finally: - d.subprocess.run = original_run - - -def test_build_dpkg_etc_index_handles_missing_file(tmp_path: Path): - import enroll.debian as d - - info = tmp_path / "info" - info.mkdir() - # Don't create any .list files - - owned, owner_map, topdir_to_pkgs, pkg_to_etc = d.build_dpkg_etc_index(str(info)) - assert owned == set() - assert owner_map == {} - assert topdir_to_pkgs == {} - assert pkg_to_etc == {} - - -def test_build_dpkg_etc_index_skips_non_etc_paths(tmp_path: Path): - import enroll.debian as d - - info = tmp_path / "info" - info.mkdir() - (info / "foo.list").write_text("/usr/bin/foo\n/etc/bar\n", encoding="utf-8") - - owned, owner_map, topdir_to_pkgs, pkg_to_etc = d.build_dpkg_etc_index(str(info)) - assert "/usr/bin/foo" not in owned - assert "/etc/bar" in owned - assert "foo" not in topdir_to_pkgs - - -def test_parse_status_conffiles_handles_empty_status(tmp_path: Path): - import enroll.debian as d - - status = tmp_path / "status" - status.write_text("", encoding="utf-8") - m = d.parse_status_conffiles(str(status)) - assert m == {} - - -def test_parse_status_conffiles_handles_package_without_conffiles(tmp_path: Path): - import enroll.debian as d - - status = tmp_path / "status" - status.write_text( - "Package: nginx\nVersion: 1\nStatus: install ok installed\n", - encoding="utf-8", - ) - m = d.parse_status_conffiles(str(status)) - assert m == {} - - -def test_read_pkg_md5sums_returns_empty_if_file_not_exists(tmp_path: Path): - import enroll.debian as d - - result = d.read_pkg_md5sums("nonexistent_package") - assert result == {} - - -def test_read_pkg_md5sums_parses_md5sums_file(tmp_path: Path, monkeypatch): - import enroll.debian as d - - info_dir = tmp_path / "info" - info_dir.mkdir() - md5_file = info_dir / "nginx.md5sums" - md5_file.write_text( - "abcdef1234567890abcdef1234567890 etc/nginx/nginx.conf\n" - "1234567890abcdef1234567890abcdef etc/nginx/sites-enabled/default\n", - encoding="utf-8", - ) - - def fake_exists(path): - return str(path).endswith("nginx.md5sums") - - monkeypatch.setattr(d.os.path, "exists", fake_exists) - - original_open = open - - def fake_open(path, *args, **kwargs): - if "nginx.md5sums" in str(path): - return original_open(md5_file, *args, **kwargs) - return original_open(path, *args, **kwargs) - - monkeypatch.setattr("builtins.open", fake_open, raising=False) - - result = d.read_pkg_md5sums("nginx") - assert result["etc/nginx/nginx.conf"] == "abcdef1234567890abcdef1234567890" - assert ( - result["etc/nginx/sites-enabled/default"] == "1234567890abcdef1234567890abcdef" - ) - - -def test_dpkg_owner_raises_on_command_failure(monkeypatch): - """Test _run raises RuntimeError on non-zero exit.""" - import enroll.debian as d - - class P: - returncode = 1 - stdout = "" - stderr = "command failed" - - def fake_run(cmd, text, capture_output, check=False): - return P() - - monkeypatch.setattr(d.subprocess, "run", fake_run) - - with pytest.raises(RuntimeError) as exc_info: - d._run(["fake", "command"]) - - assert "Command failed" in str(exc_info.value) - assert "fake" in str(exc_info.value) - - -def test_build_dpkg_etc_index_skips_invalid_line_formats(tmp_path: Path): - """Test that lines with less than 3 parts are skipped.""" - import enroll.debian as d - - info = tmp_path / "info" - info.mkdir() - # Create a .list file with invalid format (missing tab-separated fields) - (info / "foo.list").write_text( - "/etc/foo/bar\n" # This is a path, not a tab-separated line - "/etc/foo/baz\n", - encoding="utf-8", - ) - - # Should handle gracefully - owned, owner_map, topdir_to_pkgs, pkg_to_etc = d.build_dpkg_etc_index(str(info)) - # The path lines should be processed normally - assert "/etc/foo/bar" in owned or "/etc/foo/baz" in owned - - -def test_build_dpkg_etc_index_handles_file_not_found(tmp_path: Path): - """Test that FileNotFoundError is handled gracefully.""" - import enroll.debian as d - - info = tmp_path / "info" - info.mkdir() - # Create a .list file that references a non-existent path - (info / "foo.list").write_text( - "/nonexistent/path\n", - encoding="utf-8", - ) - - # Should not raise - owned, owner_map, topdir_to_pkgs, pkg_to_etc = d.build_dpkg_etc_index(str(info)) - # The non-existent path should be skipped - assert "/nonexistent/path" not in owned - - -def test_parse_status_conffiles_skips_empty_lines(tmp_path: Path): - """Test that empty lines in conffiles are skipped.""" - import enroll.debian as d - - status = tmp_path / "status" - status.write_text( - "Package: nginx\n" - "Version: 1\n" - "Conffiles:\n" - " /etc/nginx/nginx.conf abcdef\n" - " /etc/nginx/mime.types 123456\n" - "\n", # Empty line to trigger flush - encoding="utf-8", - ) - - m = d.parse_status_conffiles(str(status)) - assert "/etc/nginx/nginx.conf" in m["nginx"] - assert "/etc/nginx/mime.types" in m["nginx"] - - -def test_read_pkg_md5sums_skips_invalid_md5_lines(tmp_path: Path, monkeypatch): - """Test that lines without proper MD5 format are skipped.""" - import enroll.debian as d - - info_dir = tmp_path / "info" - info_dir.mkdir() - md5_file = info_dir / "foo.md5sums" - md5_file.write_text( - "abcdef1234567890abcdef1234567890 etc/foo/bar\n" - "invalid line without proper format\n" - "1234567890abcdef1234567890abcdef etc/foo/baz\n", - encoding="utf-8", - ) - - def fake_exists(path): - return str(path).endswith("foo.md5sums") - - monkeypatch.setattr(d.os.path, "exists", fake_exists) - - original_open = open - - def fake_open(path, *args, **kwargs): - if "foo.md5sums" in str(path): - return original_open(md5_file, *args, **kwargs) - return original_open(path, *args, **kwargs) - - monkeypatch.setattr("builtins.open", fake_open, raising=False) - - result = d.read_pkg_md5sums("foo") - assert "etc/foo/bar" in result - assert "etc/foo/baz" in result - - -def test_build_dpkg_etc_index_skips_lines_without_tabs(tmp_path: Path): - """Test that lines without tab separators are skipped (parts < 3).""" - import enroll.debian as d - - info = tmp_path / "info" - info.mkdir() - # Create file with lines that don't have tab separators - (info / "foo.list").write_text( - "notabseparator\n" # No tab - should be skipped - "/etc/foo/bar\n", # This is a path line, processed differently - encoding="utf-8", - ) - - owned, owner_map, topdir_to_pkgs, pkg_to_etc = d.build_dpkg_etc_index(str(info)) - # Path lines are still processed - assert "/etc/foo/bar" in owned - - -def test_read_pkg_md5sums_skips_empty_lines(tmp_path: Path, monkeypatch): - """Test that empty lines in md5sums are skipped.""" - import enroll.debian as d - - info_dir = tmp_path / "info" - info_dir.mkdir() - md5_file = info_dir / "bar.md5sums" - md5_file.write_text( - "abcdef1234567890abcdef1234567890 etc/bar/file1\n" - "\n" # Empty line - "1234567890abcdef1234567890abcdef etc/bar/file2\n", - encoding="utf-8", - ) - - def fake_exists(path): - return str(path).endswith("bar.md5sums") - - monkeypatch.setattr(d.os.path, "exists", fake_exists) - - original_open = open - - def fake_open(path, *args, **kwargs): - if "bar.md5sums" in str(path): - return original_open(md5_file, *args, **kwargs) - return original_open(path, *args, **kwargs) - - monkeypatch.setattr("builtins.open", fake_open, raising=False) - - result = d.read_pkg_md5sums("bar") - assert "etc/bar/file1" in result - assert "etc/bar/file2" in result - - -def test_read_pkg_md5sums_skips_lines_not_starting_with_path( - tmp_path: Path, monkeypatch -): - """Test that lines not starting with / are skipped.""" - import enroll.debian as d - - info_dir = tmp_path / "info" - info_dir.mkdir() - md5_file = info_dir / "baz.md5sums" - md5_file.write_text( - "abcdef1234567890abcdef1234567890 etc/baz/file1\n" - "invalid line\n" # Doesn't start with / - "1234567890abcdef1234567890abcdef etc/baz/file2\n", - encoding="utf-8", - ) - - def fake_exists(path): - return str(path).endswith("baz.md5sums") - - monkeypatch.setattr(d.os.path, "exists", fake_exists) - - original_open = open - - def fake_open(path, *args, **kwargs): - if "baz.md5sums" in str(path): - return original_open(md5_file, *args, **kwargs) - return original_open(path, *args, **kwargs) - - monkeypatch.setattr("builtins.open", fake_open, raising=False) - - result = d.read_pkg_md5sums("baz") - assert "etc/baz/file1" in result - assert "etc/baz/file2" in result diff --git a/tests/test_diff_bundle.py b/tests/test_diff_bundle.py index 2895484..66ef094 100644 --- a/tests/test_diff_bundle.py +++ b/tests/test_diff_bundle.py @@ -6,15 +6,6 @@ from pathlib import Path import pytest -from enroll.diff import ( - _Spinner, - _enforcement_plan, - has_enforceable_drift, - _role_tag, - _utc_now_iso, - _report_markdown, -) - def _make_bundle_dir(tmp_path: Path) -> Path: b = tmp_path / "bundle" @@ -96,1278 +87,3 @@ def test_bundle_from_input_missing_path(tmp_path: Path): with pytest.raises(RuntimeError, match="not found"): d._bundle_from_input(str(tmp_path / "nope"), sops_mode=False) - - -import json -import sys - - -from enroll.diff import ( - _bundle_from_input, - _file_index, - _iter_managed_files, - _load_state, - _pkg_version_display, - _pkg_version_key, - _progress_enabled, - _roles, - _service_units, - _sha256, - _users_by_name, - compare_harvests, -) -from enroll.sopsutil import SopsError - - -def test_progress_enabled_when_tty(monkeypatch): - monkeypatch.setattr(sys.stderr, "isatty", lambda: True) - monkeypatch.delenv("ENROLL_NO_PROGRESS", raising=False) - assert _progress_enabled() is True - - -def test_progress_enabled_when_not_tty(monkeypatch): - monkeypatch.setattr(sys.stderr, "isatty", lambda: False) - monkeypatch.delenv("ENROLL_NO_PROGRESS", raising=False) - assert _progress_enabled() is False - - -def test_progress_enabled_with_env_var(monkeypatch): - monkeypatch.setattr(sys.stderr, "isatty", lambda: True) - monkeypatch.setenv("ENROLL_NO_PROGRESS", "1") - assert _progress_enabled() is False - - monkeypatch.setenv("ENROLL_NO_PROGRESS", "true") - assert _progress_enabled() is False - - monkeypatch.setenv("ENROLL_NO_PROGRESS", "yes") - assert _progress_enabled() is False - - -def test_sha256(tmp_path: Path): - test_file = tmp_path / "test.txt" - test_file.write_text("hello world", encoding="utf-8") - hash_result = _sha256(test_file) - assert len(hash_result) == 64 - - -def test_sha256_empty_file(tmp_path: Path): - test_file = tmp_path / "empty.txt" - test_file.write_bytes(b"") - hash_result = _sha256(test_file) - assert ( - hash_result - == "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" - ) - - -def test_bundle_from_input_directory(tmp_path: Path): - result = _bundle_from_input(str(tmp_path), sops_mode=False) - assert result.dir == tmp_path - assert result.tempdir is None - - -def test_bundle_from_input_state_json_path(tmp_path: Path): - state_file = tmp_path / "state.json" - state_file.write_text("{}", encoding="utf-8") - result = _bundle_from_input(str(state_file), sops_mode=False) - assert result.dir == tmp_path - assert result.tempdir is None - - -def test_bundle_from_input_not_found(): - with pytest.raises(RuntimeError) as exc_info: - _bundle_from_input("/nonexistent/path", sops_mode=False) - assert "not found" in str(exc_info.value).lower() - - -def test_bundle_from_input_tarball(tmp_path: Path): - bundle_dir = tmp_path / "bundle" - bundle_dir.mkdir() - state_file = bundle_dir / "state.json" - state_file.write_text("{}", encoding="utf-8") - - tar_path = tmp_path / "bundle.tar.gz" - with tarfile.open(tar_path, "w:gz") as tf: - tf.add(bundle_dir, arcname="bundle") - - result = _bundle_from_input(str(tar_path), sops_mode=False) - assert result.dir.exists() - assert result.tempdir is not None - result.tempdir.cleanup() - - -def test_bundle_from_input_invalid_type(tmp_path: Path): - test_file = tmp_path / "test.txt" - test_file.write_text("not a bundle", encoding="utf-8") - - with pytest.raises(RuntimeError) as exc_info: - _bundle_from_input(str(test_file), sops_mode=False) - assert "not a directory" in str(exc_info.value).lower() - - -def test_load_state(tmp_path: Path): - state_file = tmp_path / "state.json" - state_file.write_text('{"host": {"hostname": "test"}}', encoding="utf-8") - result = _load_state(tmp_path) - assert result["host"]["hostname"] == "test" - - -def test_roles_with_roles(): - state = {"roles": {"users": {}, "services": []}} - result = _roles(state) - assert "users" in result - - -def test_service_units_empty(): - assert _service_units({}) == {} - - -def test_service_units_with_services(): - state = { - "roles": { - "services": [ - {"unit": "nginx.service", "active_state": "active"}, - {"unit": "ssh.service", "active_state": "inactive"}, - ] - } - } - result = _service_units(state) - assert "nginx.service" in result - assert "ssh.service" in result - assert result["nginx.service"]["active_state"] == "active" - - -def test_users_by_name_empty(): - assert _users_by_name({}) == {} - - -def test_users_by_name_with_users(): - state = { - "roles": { - "users": { - "users": [ - {"name": "alice", "uid": 1000}, - {"name": "bob", "uid": 1001}, - ] - } - } - } - result = _users_by_name(state) - assert "alice" in result - assert "bob" in result - assert result["alice"]["uid"] == 1000 - - -def test_pkg_version_key_with_version(): - entry = {"version": "1.2.3"} - assert _pkg_version_key(entry) == "1.2.3" - - -def test_pkg_version_key_with_installations(): - entry = { - "installations": [ - {"arch": "x86_64", "version": "1.2.3"}, - {"arch": "aarch64", "version": "1.2.3"}, - ] - } - result = _pkg_version_key(entry) - assert "x86_64:1.2.3" in result - assert "aarch64:1.2.3" in result - - -def test_pkg_version_key_with_empty_version(): - entry = {"version": None} - assert _pkg_version_key(entry) is None - - -def test_pkg_version_key_with_invalid_installations(): - entry = {"installations": ["not_a_dict", {"arch": "x86_64", "version": "1.0"}]} - result = _pkg_version_key(entry) - assert "x86_64:1.0" in result - - -def test_pkg_version_display_with_version(): - entry = {"version": "1.2.3"} - assert _pkg_version_display(entry) == "1.2.3" - - -def test_pkg_version_display_with_installations(): - entry = { - "installations": [ - {"arch": "x86_64", "version": "1.2.3"}, - ] - } - assert _pkg_version_display(entry) == "1.2.3 (x86_64)" - - -def test_pkg_version_display_empty(): - assert _pkg_version_display({}) is None - - -def test_iter_managed_files_empty(): - state = {"roles": {}} - files = list(_iter_managed_files(state)) - assert files == [] - - -def test_iter_managed_files_services(): - state = { - "roles": { - "services": [ - { - "role_name": "nginx", - "managed_files": [ - {"path": "/etc/nginx/nginx.conf", "src_rel": "nginx.conf"} - ], - } - ] - } - } - files = list(_iter_managed_files(state)) - assert len(files) == 1 - assert files[0] == ( - "nginx", - {"path": "/etc/nginx/nginx.conf", "src_rel": "nginx.conf"}, - ) - - -def test_iter_managed_files_packages(): - state = { - "roles": { - "packages": [ - { - "role_name": "vim", - "managed_files": [{"path": "/usr/bin/vim", "src_rel": "bin/vim"}], - } - ] - } - } - files = list(_iter_managed_files(state)) - assert len(files) == 1 - assert files[0][0] == "vim" - - -def test_iter_managed_files_users(): - state = { - "roles": { - "users": { - "role_name": "users", - "managed_files": [{"path": "/etc/passwd", "src_rel": "passwd"}], - } - } - } - files = list(_iter_managed_files(state)) - assert len(files) == 1 - assert files[0][0] == "users" - - -def test_iter_managed_files_apt_config(): - state = { - "roles": { - "apt_config": { - "role_name": "apt_config", - "managed_files": [ - {"path": "/etc/apt/sources.list", "src_rel": "sources.list"} - ], - } - } - } - files = list(_iter_managed_files(state)) - assert len(files) == 1 - assert files[0][0] == "apt_config" - - -def test_iter_managed_files_etc_custom(): - state = { - "roles": { - "etc_custom": { - "role_name": "etc_custom", - "managed_files": [ - {"path": "/etc/custom.conf", "src_rel": "custom.conf"} - ], - } - } - } - files = list(_iter_managed_files(state)) - assert len(files) == 1 - assert files[0][0] == "etc_custom" - - -def test_iter_managed_files_usr_local_custom(): - state = { - "roles": { - "usr_local_custom": { - "role_name": "usr_local_custom", - "managed_files": [ - {"path": "/usr/local/bin/script", "src_rel": "bin/script"} - ], - } - } - } - files = list(_iter_managed_files(state)) - assert len(files) == 1 - assert files[0][0] == "usr_local_custom" - - -def test_iter_managed_files_extra_paths(): - state = { - "roles": { - "extra_paths": { - "role_name": "extra_paths", - "managed_files": [{"path": "/opt/app/config", "src_rel": "config"}], - } - } - } - files = list(_iter_managed_files(state)) - assert len(files) == 1 - assert files[0][0] == "extra_paths" - - -def test_file_index_empty(): - state = {"roles": {}} - index = _file_index(Path("/tmp"), state) - assert index == {} - - -def test_file_index_with_files(tmp_path: Path): - state = { - "roles": { - "users": { - "managed_files": [ - {"path": "/etc/passwd", "src_rel": "passwd", "owner": "root"}, - ] - } - } - } - index = _file_index(tmp_path, state) - assert "/etc/passwd" in index - assert index["/etc/passwd"].role == "users" - assert index["/etc/passwd"].owner == "root" - - -def test_file_index_duplicates_first_wins(tmp_path: Path): - state = { - "roles": { - "users": { - "managed_files": [ - {"path": "/etc/passwd", "src_rel": "passwd"}, - ] - }, - "etc_custom": { - "managed_files": [ - {"path": "/etc/passwd", "src_rel": "custom_passwd"}, - ] - }, - } - } - index = _file_index(tmp_path, state) - assert "/etc/passwd" in index - assert index["/etc/passwd"].src_rel == "passwd" - - -def test_file_index_skips_missing_path_or_src_rel(tmp_path: Path): - state = { - "roles": { - "users": { - "managed_files": [ - {"path": "/etc/passwd"}, # missing src_rel - {"src_rel": "passwd"}, # missing path - ] - } - } - } - index = _file_index(tmp_path, state) - assert index == {} - - -def test_compare_harvests_no_changes(tmp_path: Path): - old_bundle = tmp_path / "old" - old_bundle.mkdir() - (old_bundle / "state.json").write_text( - json.dumps( - { - "inventory": {"packages": {"vim": {"version": "1.0"}}}, - "roles": {}, - } - ), - encoding="utf-8", - ) - - new_bundle = tmp_path / "new" - new_bundle.mkdir() - (new_bundle / "state.json").write_text( - json.dumps( - { - "inventory": {"packages": {"vim": {"version": "1.0"}}}, - "roles": {}, - } - ), - encoding="utf-8", - ) - - report, has_changes = compare_harvests(str(old_bundle), str(new_bundle)) - assert has_changes is False - assert report["packages"]["added"] == [] - assert report["packages"]["removed"] == [] - - -def test_compare_harvests_package_added(tmp_path: Path): - old_bundle = tmp_path / "old" - old_bundle.mkdir() - (old_bundle / "state.json").write_text( - json.dumps({"inventory": {"packages": {}}, "roles": {}}), - encoding="utf-8", - ) - - new_bundle = tmp_path / "new" - new_bundle.mkdir() - (new_bundle / "state.json").write_text( - json.dumps( - {"inventory": {"packages": {"vim": {"version": "1.0"}}}, "roles": {}} - ), - encoding="utf-8", - ) - - report, has_changes = compare_harvests(str(old_bundle), str(new_bundle)) - assert has_changes is True - assert "vim" in report["packages"]["added"] - - -def test_compare_harvests_package_removed(tmp_path: Path): - old_bundle = tmp_path / "old" - old_bundle.mkdir() - (old_bundle / "state.json").write_text( - json.dumps( - {"inventory": {"packages": {"vim": {"version": "1.0"}}}, "roles": {}} - ), - encoding="utf-8", - ) - - new_bundle = tmp_path / "new" - new_bundle.mkdir() - (new_bundle / "state.json").write_text( - json.dumps({"inventory": {"packages": {}}, "roles": {}}), - encoding="utf-8", - ) - - report, has_changes = compare_harvests(str(old_bundle), str(new_bundle)) - assert has_changes is True - assert "vim" in report["packages"]["removed"] - - -def test_compare_harvests_package_version_changed(tmp_path: Path): - old_bundle = tmp_path / "old" - old_bundle.mkdir() - (old_bundle / "state.json").write_text( - json.dumps( - {"inventory": {"packages": {"vim": {"version": "1.0"}}}, "roles": {}} - ), - encoding="utf-8", - ) - - new_bundle = tmp_path / "new" - new_bundle.mkdir() - (new_bundle / "state.json").write_text( - json.dumps( - {"inventory": {"packages": {"vim": {"version": "2.0"}}}, "roles": {}} - ), - encoding="utf-8", - ) - - report, has_changes = compare_harvests(str(old_bundle), str(new_bundle)) - assert has_changes is True - assert len(report["packages"]["version_changed"]) == 1 - - -def test_compare_harvests_ignore_package_versions(tmp_path: Path): - old_bundle = tmp_path / "old" - old_bundle.mkdir() - (old_bundle / "state.json").write_text( - json.dumps( - {"inventory": {"packages": {"vim": {"version": "1.0"}}}, "roles": {}} - ), - encoding="utf-8", - ) - - new_bundle = tmp_path / "new" - new_bundle.mkdir() - (new_bundle / "state.json").write_text( - json.dumps( - {"inventory": {"packages": {"vim": {"version": "2.0"}}}, "roles": {}} - ), - encoding="utf-8", - ) - - report, has_changes = compare_harvests( - str(old_bundle), str(new_bundle), ignore_package_versions=True - ) - assert report["packages"]["version_changed_ignored_count"] == 1 - - -def test_compare_harvests_service_added(tmp_path: Path): - old_bundle = tmp_path / "old" - old_bundle.mkdir() - (old_bundle / "state.json").write_text( - json.dumps({"inventory": {"packages": {}}, "roles": {"services": []}}), - encoding="utf-8", - ) - - new_bundle = tmp_path / "new" - new_bundle.mkdir() - (new_bundle / "state.json").write_text( - json.dumps( - { - "inventory": {"packages": {}}, - "roles": {"services": [{"unit": "nginx.service"}]}, - } - ), - encoding="utf-8", - ) - - report, has_changes = compare_harvests(str(old_bundle), str(new_bundle)) - assert has_changes is True - assert "nginx.service" in report["services"]["enabled_added"] - - -def test_compare_harvests_user_added(tmp_path: Path): - old_bundle = tmp_path / "old" - old_bundle.mkdir() - (old_bundle / "state.json").write_text( - json.dumps({"inventory": {"packages": {}}, "roles": {"users": {"users": []}}}), - encoding="utf-8", - ) - - new_bundle = tmp_path / "new" - new_bundle.mkdir() - (new_bundle / "state.json").write_text( - json.dumps( - { - "inventory": {"packages": {}}, - "roles": {"users": {"users": [{"name": "alice", "uid": 1000}]}}, - } - ), - encoding="utf-8", - ) - - report, has_changes = compare_harvests(str(old_bundle), str(new_bundle)) - assert has_changes is True - assert "alice" in report["users"]["added"] - - -def test_compare_harvests_with_exclude_paths(tmp_path: Path): - old_bundle = tmp_path / "old" - old_bundle.mkdir() - old_artifacts = old_bundle / "artifacts" / "users" - old_artifacts.mkdir(parents=True) - (old_artifacts / "passwd").write_text("old", encoding="utf-8") - (old_bundle / "state.json").write_text( - json.dumps( - { - "inventory": {"packages": {}}, - "roles": { - "users": { - "managed_files": [{"path": "/etc/passwd", "src_rel": "passwd"}] - } - }, - } - ), - encoding="utf-8", - ) - - new_bundle = tmp_path / "new" - new_bundle.mkdir() - new_artifacts = new_bundle / "artifacts" / "users" - new_artifacts.mkdir(parents=True) - (new_artifacts / "passwd").write_text("new", encoding="utf-8") - (new_bundle / "state.json").write_text( - json.dumps( - { - "inventory": {"packages": {}}, - "roles": { - "users": { - "managed_files": [{"path": "/etc/passwd", "src_rel": "passwd"}] - } - }, - } - ), - encoding="utf-8", - ) - - report, has_changes = compare_harvests( - str(old_bundle), str(new_bundle), exclude_paths=["/etc/passwd"] - ) - assert "/etc/passwd" not in [f["path"] for f in report["files"]["added"]] - assert "/etc/passwd" not in [f["path"] for f in report["files"]["removed"]] - assert "/etc/passwd" not in [f["path"] for f in report["files"]["changed"]] - - -def test_utc_now_iso(): - result = _utc_now_iso() - assert "T" in result - assert "+" in result or "Z" in result - - -def test_spinner_stop_without_start(): - spinner = _Spinner("Test") - spinner.stop(final_line="Done") - # Should not raise - - -def test_spinner_run_exception(monkeypatch): - class FakeStderr: - def write(self, s): - raise Exception("Write error") - - def flush(self): - pass - - monkeypatch.setattr(sys, "stderr", FakeStderr()) - - spinner = _Spinner("Test") - spinner.start() - spinner.stop() - - -def test_spinner_double_start(): - spinner = _Spinner("Test") - spinner.start() - spinner.start() # Should not raise or spawn another thread - spinner.stop() - - -def test_role_tag_normal(): - assert _role_tag("nginx") == "role_nginx" - assert _role_tag("my-app") == "role_my-app" - - -def test_role_tag_with_special_chars(): - assert _role_tag("my.app") == "role_my_app" - assert _role_tag("my app") == "role_my_app" - - -def test_role_tag_empty(): - assert _role_tag("") == "role_other" - assert _role_tag(" ") == "role_other" - - -def test_has_enforceable_drift_packages_removed(): - report = {"packages": {"removed": ["vim"]}} - assert has_enforceable_drift(report) is True - - -def test_has_enforceable_drift_services_removed(): - report = {"services": {"enabled_removed": ["nginx.service"]}} - assert has_enforceable_drift(report) is True - - -def test_has_enforceable_drift_service_changed(): - report = { - "services": { - "changed": [ - { - "unit": "nginx.service", - "changes": {"active_state": {"old": "active", "new": "inactive"}}, - } - ] - } - } - assert has_enforceable_drift(report) is True - - -def test_has_enforceable_drift_service_package_only_changed(): - # Service changed only in packages - should NOT be enforceable - report = { - "services": { - "changed": [ - { - "unit": "nginx.service", - "changes": {"packages": {"added": ["nginx-extra"]}}, - } - ] - } - } - assert has_enforceable_drift(report) is False - - -def test_has_enforceable_drift_users_removed(): - report = {"users": {"removed": ["alice"]}} - assert has_enforceable_drift(report) is True - - -def test_has_enforceable_drift_users_changed(): - report = { - "users": { - "changed": [ - {"name": "alice", "changes": {"uid": {"old": 1000, "new": 1001}}} - ] - } - } - assert has_enforceable_drift(report) is True - - -def test_has_enforceable_drift_files_removed(): - report = { - "files": { - "removed": [{"path": "/etc/passwd", "role": "users", "reason": "conffile"}] - } - } - assert has_enforceable_drift(report) is True - - -def test_has_enforceable_drift_files_changed(): - report = { - "files": { - "changed": [ - { - "path": "/etc/passwd", - "changes": {"content": {"old": "sha1", "new": "sha2"}}, - } - ] - } - } - assert has_enforceable_drift(report) is True - - -def test_has_enforceable_drift_no_drift(): - report = { - "packages": {"added": ["newpkg"]}, - "services": {"enabled_added": ["new.service"]}, - "users": {"added": ["bob"]}, - "files": {"added": ["/opt/newfile"]}, - } - assert has_enforceable_drift(report) is False - - -def test_enforcement_plan_packages_removed(monkeypatch, tmp_path: Path): - old_state = { - "roles": { - "services": [{"role_name": "nginx", "packages": ["nginx"]}], - "packages": [{"role_name": "vim", "package": "vim"}], - } - } - report = {"packages": {"removed": ["nginx", "vim"]}} - - result = _enforcement_plan(report, old_state, tmp_path) - assert "nginx" in result.get("roles", []) - assert "vim" in result.get("roles", []) - assert "role_nginx" in result.get("tags", []) - - -def test_enforcement_plan_users_changed(): - old_state = { - "roles": {"users": {"role_name": "users", "users": [{"name": "alice"}]}} - } - report = {"users": {"changed": [{"name": "alice", "changes": {"uid": {}}}]}} - - result = _enforcement_plan(report, old_state, Path("/tmp")) - assert "users" in result.get("roles", []) - - -def test_enforcement_plan_files_removed(tmp_path: Path): - # Create the artifacts directory structure that _file_index expects - artifacts_dir = tmp_path / "artifacts" / "etc_custom" - artifacts_dir.mkdir(parents=True) - - old_state = { - "roles": { - "etc_custom": { - "role_name": "etc_custom", - "managed_files": [ - {"path": "/etc/custom.conf", "src_rel": "custom.conf"} - ], - } - } - } - report = { - "files": {"removed": [{"path": "/etc/custom.conf", "role": "etc_custom"}]} - } - - result = _enforcement_plan(report, old_state, tmp_path) - assert "etc_custom" in result.get("roles", []) - - -def test_enforcement_plan_no_drift(): - old_state = {"roles": {}} - report = {"packages": {"added": ["newpkg"]}} - - result = _enforcement_plan(report, old_state, Path("/tmp")) - assert result.get("roles", []) == [] - - -def test_bundle_from_input_tgz(monkeypatch, tmp_path: Path): - bundle_dir = tmp_path / "bundle" - bundle_dir.mkdir() - state_file = bundle_dir / "state.json" - state_file.write_text("{}", encoding="utf-8") - - tar_path = tmp_path / "bundle.tgz" - with tarfile.open(tar_path, "w:gz") as tf: - tf.add(bundle_dir, arcname="bundle") - - result = _bundle_from_input(str(tar_path), sops_mode=False) - assert result.dir.exists() - assert result.tempdir is not None - result.tempdir.cleanup() - - -def test_bundle_from_input_sops_mode_no_sops(monkeypatch, tmp_path: Path): - # Create a fake .sops file - sops_file = tmp_path / "harvest.sops" - sops_file.write_bytes(b"encrypted") - - def fake_require(): - raise SopsError("sops not found") - - import enroll.diff as d - - monkeypatch.setattr(d, "require_sops_cmd", fake_require) - - with pytest.raises(SopsError): - _bundle_from_input(str(sops_file), sops_mode=True) - - -def test_report_markdown_basic(): - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz", "host": "host1"}, - "new": {"input": "new.tar.gz", "host": "host2"}, - "packages": {"added": ["vim"], "removed": [], "version_changed": []}, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - } - result = _report_markdown(report) - assert "## Packages" in result - assert "+ vim" in result - - -def test_report_markdown_with_enforcement_applied(): - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz"}, - "new": {"input": "new.tar.gz"}, - "packages": {"added": [], "removed": [], "version_changed": []}, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - "enforcement": { - "status": "applied", - "tags": ["role_users"], - "returncode": 0, - "finished_at": "2024-01-01T00:01:00Z", - }, - } - result = _report_markdown(report) - assert "Applied old harvest" in result - assert "role_users" in result - - -def test_report_markdown_with_enforcement_failed(): - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz"}, - "new": {"input": "new.tar.gz"}, - "packages": {"added": [], "removed": [], "version_changed": []}, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - "enforcement": { - "status": "failed", - "returncode": 1, - }, - } - result = _report_markdown(report) - assert "ansible-playbook failed" in result - - -def test_report_markdown_with_enforcement_skipped(): - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz"}, - "new": {"input": "new.tar.gz"}, - "packages": {"added": [], "removed": [], "version_changed": []}, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - "enforcement": { - "status": "skipped", - "reason": "no drift", - }, - } - result = _report_markdown(report) - assert "Skipped" in result - assert "no drift" in result - - -def test_report_markdown_with_version_ignored(): - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz"}, - "new": {"input": "new.tar.gz"}, - "packages": { - "added": [], - "removed": [], - "version_changed": [{"package": "vim", "old": "1.0", "new": "2.0"}], - "version_changed_ignored_count": 1, - }, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - } - result = _report_markdown(report) - assert "ignored 1" in result - - -def test_report_markdown_with_service_package_changes(): - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz"}, - "new": {"input": "new.tar.gz"}, - "packages": {"added": [], "removed": [], "version_changed": []}, - "services": { - "enabled_added": [], - "enabled_removed": [], - "changed": [ - { - "unit": "nginx.service", - "changes": {"packages": {"added": ["nginx-extra"], "removed": []}}, - } - ], - }, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - } - result = _report_markdown(report) - assert "packages added" in result - - -def test_report_markdown_empty(): - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz"}, - "new": {"input": "new.tar.gz"}, - "packages": {}, - "services": {}, - "users": {}, - "files": {}, - } - result = _report_markdown(report) - assert "## Packages" in result - assert "## Services" in result - - -def test_spinner_start_stop(monkeypatch): - """Test spinner can be started and stopped.""" - import enroll.diff as d - - # Mock threading to avoid actual thread creation - class FakeThread: - def __init__(self, target, name, daemon): - self.target = target - self.daemon = daemon - - def start(self): - pass - - def join(self, timeout): - pass - - monkeypatch.setattr(d.threading, "Thread", FakeThread) - - spinner = d._Spinner("test message") - spinner.start() - spinner.stop() - - -def test_spinner_already_started(monkeypatch): - """Test spinner doesn't restart if already running.""" - import enroll.diff as d - - class FakeThread: - def __init__(self, target, name, daemon): - pass - - def start(self): - pass - - def join(self, timeout): - pass - - monkeypatch.setattr(d.threading, "Thread", FakeThread) - - spinner = d._Spinner("test message") - spinner.start() - spinner._thread = FakeThread(None, None, True) # Simulate already running - spinner.start() # Should return early - - -def test_spinner_stop_clears_line(monkeypatch, tmp_path): - """Test spinner stop clears the line.""" - import enroll.diff as d - import sys - - class FakeThread: - def __init__(self, target, name, daemon): - pass - - def start(self): - pass - - def join(self, timeout): - pass - - monkeypatch.setattr(d.threading, "Thread", FakeThread) - - # Capture stderr writes - writes = [] - original_write = sys.stderr.write - - def capture_write(s): - writes.append(s) - return original_write(s) - - monkeypatch.setattr(sys.stderr, "write", capture_write) - - spinner = d._Spinner("test message") - spinner._last_len = 20 - spinner.stop() - - # Should have written clearing sequence - assert any("\r" in w for w in writes) - - -def test_should_show_spinner_disabled_env(monkeypatch): - """Test spinner disabled via environment variable.""" - import enroll.diff as d - - monkeypatch.setenv("ENROLL_NO_PROGRESS", "1") - assert d._progress_enabled() is False - - monkeypatch.setenv("ENROLL_NO_PROGRESS", "true") - assert d._progress_enabled() is False - - monkeypatch.setenv("ENROLL_NO_PROGRESS", "yes") - assert d._progress_enabled() is False - - -def test_should_show_spinner_exception_on_isatty(monkeypatch): - """Test spinner returns False when isatty raises exception.""" - import enroll.diff as d - import sys - - original_stderr = sys.stderr - - class FakeStderr: - def isatty(self): - raise Exception("No tty") - - monkeypatch.setattr(sys, "stderr", FakeStderr()) - assert d._progress_enabled() is False - - # Restore - monkeypatch.setattr(sys, "stderr", original_stderr) - - -def test_all_packages_from_state(): - """Test _all_packages extracts sorted package list.""" - import enroll.diff as d - - state = { - "inventory": { - "packages": { - "nginx": [{"version": "1.0"}], - "vim": [{"version": "2.0"}], - "bash": [{"version": "3.0"}], - } - } - } - - result = d._all_packages(state) - assert result == ["bash", "nginx", "vim"] - - -def test_all_packages_empty_state(): - """Test _all_packages with empty state.""" - import enroll.diff as d - - state = {"inventory": {"packages": {}}} - result = d._all_packages(state) - assert result == [] - - -def test_roles_from_state(): - """Test _roles extracts roles from state.""" - import enroll.diff as d - - state = {"roles": {"web": {}, "db": {}}} - result = d._roles(state) - assert result == {"web": {}, "db": {}} - - -def test_roles_empty_state(): - """Test _roles with empty state.""" - import enroll.diff as d - - state = {} - result = d._roles(state) - assert result == {} - - -def test_pkg_version_key_with_multiple_versions(): - """Test _pkg_version_key handles multiple versions.""" - import enroll.diff as d - - entry = { - "installations": [ - {"version": "1.0", "arch": "amd64"}, - {"version": "2.0", "arch": "arm64"}, - ] - } - - result = d._pkg_version_key(entry) - # Just check it returns a non-None value with version info - assert result is not None - assert len(result) > 0 - - -def test_pkg_version_key_without_version(): - """Test _pkg_version_key skips entries without version.""" - import enroll.diff as d - - entry = { - "installations": [ - {"arch": "amd64"}, # No version - ] - } - - result = d._pkg_version_key(entry) - assert result is None - - -def test_pkg_version_key_with_empty_installations(): - """Test _pkg_version_key with empty installations.""" - import enroll.diff as d - - entry = {"installations": []} - result = d._pkg_version_key(entry) - assert result is None - - -def test_pkg_version_key_without_installations(): - """Test _pkg_version_key without installations key.""" - import enroll.diff as d - - entry = {} - result = d._pkg_version_key(entry) - assert result is None - - -def test_pkg_version_key_with_direct_version(): - """Test _pkg_version_key with direct version field.""" - import enroll.diff as d - - entry = {"version": "1.2.3"} - result = d._pkg_version_key(entry) - assert result == "1.2.3" - - -def test_report_text_with_exclude_paths(): - """Test _report_text includes exclude paths.""" - import enroll.diff as d - - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz", "host": "host1", "state_mtime": "mtime1"}, - "new": {"input": "new.tar.gz", "host": "host2", "state_mtime": "mtime2"}, - "filters": {"exclude_paths": ["/tmp/*", "/var/log/*"]}, - "packages": {"added": [], "removed": [], "version_changed": []}, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - } - result = d._report_text(report) - assert "file exclude patterns" in result - assert "/tmp/*" in result - - -def test_report_text_with_ignore_package_versions(): - """Test _report_text includes ignore package versions message.""" - import enroll.diff as d - - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz", "host": "host1", "state_mtime": "mtime1"}, - "new": {"input": "new.tar.gz", "host": "host2", "state_mtime": "mtime2"}, - "filters": {"ignore_package_versions": True}, - "packages": {"version_changed_ignored_count": 5}, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - } - result = d._report_text(report) - assert "package version drift: ignored" in result - assert "ignored 5 changes" in result - - -def test_report_text_with_enforcement_applied(): - """Test _report_text includes enforcement applied status.""" - import enroll.diff as d - - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz", "host": "host1", "state_mtime": "mtime1"}, - "new": {"input": "new.tar.gz", "host": "host2", "state_mtime": "mtime2"}, - "packages": {"added": [], "removed": [], "version_changed": []}, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - "enforcement": { - "status": "applied", - "returncode": 0, - "tags": ["test"], - "finished_at": "2024-01-01T01:00:00Z", - }, - } - result = d._report_text(report) - assert "Enforcement" in result - assert "applied old harvest via ansible-playbook" in result - assert "tags=test" in result - - -def test_report_text_with_enforcement_failed(): - """Test _report_text includes enforcement failed status.""" - import enroll.diff as d - - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz", "host": "host1", "state_mtime": "mtime1"}, - "new": {"input": "new.tar.gz", "host": "host2", "state_mtime": "mtime2"}, - "packages": {"added": [], "removed": [], "version_changed": []}, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - "enforcement": {"status": "failed", "returncode": 1}, - } - result = d._report_text(report) - assert "Enforcement" in result - assert "ansible-playbook failed" in result - - -def test_report_text_with_enforcement_skipped(): - """Test _report_text includes enforcement skipped status.""" - import enroll.diff as d - - report = { - "generated_at": "2024-01-01T00:00:00Z", - "old": {"input": "old.tar.gz", "host": "host1", "state_mtime": "mtime1"}, - "new": {"input": "new.tar.gz", "host": "host2", "state_mtime": "mtime2"}, - "packages": {"added": [], "removed": [], "version_changed": []}, - "services": {"enabled_added": [], "enabled_removed": [], "changed": []}, - "users": {"added": [], "removed": [], "changed": []}, - "files": {"added": [], "removed": [], "changed": []}, - "enforcement": {"status": "skipped", "reason": "no changes"}, - } - result = d._report_text(report) - assert "Enforcement" in result - assert "skipped" in result - assert "no changes" in result diff --git a/tests/test_harvest.py b/tests/test_harvest.py index 33b5302..1b884aa 100644 --- a/tests/test_harvest.py +++ b/tests/test_harvest.py @@ -1,5 +1,4 @@ import json -import enroll.harvest as harvest from pathlib import Path import enroll.harvest as h @@ -368,149 +367,3 @@ def test_shared_cron_snippet_prefers_matching_role_over_lexicographic( assert all( mf["path"] != "/etc/cron.d/ntpsec" for mf in svc_apparmor["managed_files"] ) - - -def test_files_differ_same_content(tmp_path: Path): - file1 = tmp_path / "file1.txt" - file2 = tmp_path / "file2.txt" - file1.write_text("same content", encoding="utf-8") - file2.write_text("same content", encoding="utf-8") - assert harvest._files_differ(str(file1), str(file2)) is False - - -def test_files_differ_different_content(tmp_path: Path): - file1 = tmp_path / "file1.txt" - file2 = tmp_path / "file2.txt" - file1.write_text("content1", encoding="utf-8") - file2.write_text("content2", encoding="utf-8") - assert harvest._files_differ(str(file1), str(file2)) is True - - -def test_files_differ_missing_file(tmp_path: Path): - file1 = tmp_path / "file1.txt" - file1.write_text("content", encoding="utf-8") - file2 = tmp_path / "file2.txt" - assert harvest._files_differ(str(file1), str(file2)) is True - - -def test_files_differ_both_missing(tmp_path: Path): - file1 = tmp_path / "file1.txt" - file2 = tmp_path / "file2.txt" - assert harvest._files_differ(str(file1), str(file2)) is True - - -def test_files_differ_binary(tmp_path: Path): - file1 = tmp_path / "file1.bin" - file2 = tmp_path / "file2.bin" - file1.write_bytes(b"\x00\x01\x02\x03") - file2.write_bytes(b"\x00\x01\x02\x03") - assert harvest._files_differ(str(file1), str(file2)) is False - - -def test_files_differ_binary_different(tmp_path: Path): - file1 = tmp_path / "file1.bin" - file2 = tmp_path / "file2.bin" - file1.write_bytes(b"\x00\x01\x02\x03") - file2.write_bytes(b"\x00\x01\x02\x04") - assert harvest._files_differ(str(file1), str(file2)) is True - - -def test_files_differ_non_regular_a(tmp_path: Path): - directory = tmp_path / "dir" - directory.mkdir() - file1 = tmp_path / "file1.txt" - file1.write_text("content", encoding="utf-8") - assert harvest._files_differ(str(directory), str(file1)) is True - - -def test_files_differ_non_regular_b(tmp_path: Path): - file1 = tmp_path / "file1.txt" - file1.write_text("content", encoding="utf-8") - directory = tmp_path / "dir" - directory.mkdir() - assert harvest._files_differ(str(file1), str(directory)) is True - - -def test_files_differ_size_mismatch(tmp_path: Path): - file1 = tmp_path / "file1.txt" - file1.write_text("short", encoding="utf-8") - file2 = tmp_path / "file2.txt" - file2.write_text("much longer content", encoding="utf-8") - assert harvest._files_differ(str(file1), str(file2)) is True - - -def test_files_differ_large_files(tmp_path: Path): - file1 = tmp_path / "file1.bin" - file2 = tmp_path / "file2.bin" - file1.write_bytes(b"x" * 3_000_000) - file2.write_bytes(b"x" * 3_000_000) - assert harvest._files_differ(str(file1), str(file2)) is True - - -def test_is_confish_with_conf(tmp_path: Path): - file1 = tmp_path / "test.conf" - file1.write_text("content", encoding="utf-8") - assert harvest._is_confish(str(file1)) is True - - -def test_is_confish_with_yaml(tmp_path: Path): - file1 = tmp_path / "test.yaml" - file1.write_text("content", encoding="utf-8") - assert harvest._is_confish(str(file1)) is True - - -def test_is_confish_with_json(tmp_path: Path): - file1 = tmp_path / "test.json" - file1.write_text("{}", encoding="utf-8") - assert harvest._is_confish(str(file1)) is True - - -def test_is_confish_with_service(tmp_path: Path): - file1 = tmp_path / "test.service" - file1.write_text("[Unit]", encoding="utf-8") - assert harvest._is_confish(str(file1)) is True - - -def test_is_confish_with_extensionless(tmp_path: Path): - file1 = tmp_path / "default" - file1.write_text("OPTIONS=", encoding="utf-8") - assert harvest._is_confish(str(file1)) is True - - -def test_is_confish_not_config(tmp_path: Path): - file1 = tmp_path / "test.log" - file1.write_text("log", encoding="utf-8") - assert harvest._is_confish(str(file1)) is False - - -def test_is_confish_nonexistent(): - assert harvest._is_confish("/nonexistent/file.xyz") is False - - -def test_topdirs_for_package_with_multiple_paths(): - pkg_to_etc_paths = { - "nginx": ["/etc/nginx/nginx.conf", "/etc/nginx/sites-enabled/default"], - } - result = harvest._topdirs_for_package("nginx", pkg_to_etc_paths) - assert result == {"nginx"} - - -def test_topdirs_for_package_with_multiple_topdirs(): - pkg_to_etc_paths = { - "multi": ["/etc/nginx/nginx.conf", "/etc/ssh/sshd_config"], - } - result = harvest._topdirs_for_package("multi", pkg_to_etc_paths) - assert result == {"nginx", "ssh"} - - -def test_topdirs_for_package_empty(): - result = harvest._topdirs_for_package("empty", {}) - assert result == set() - - -def test_topdirs_for_package_no_etc(): - pkg_to_etc_paths = { - "other": ["/usr/share/doc/file"], - } - result = harvest._topdirs_for_package("other", pkg_to_etc_paths) - assert result == set() diff --git a/tests/test_ignore.py b/tests/test_ignore.py index 2ba9a90..1eaae01 100644 --- a/tests/test_ignore.py +++ b/tests/test_ignore.py @@ -1,8 +1,3 @@ -from __future__ import annotations - -import os -from pathlib import Path - from enroll.ignore import IgnorePolicy @@ -13,238 +8,3 @@ def test_ignore_policy_denies_common_backup_files(): assert pol.deny_reason("/etc/group-") == "backup_file" assert pol.deny_reason("/etc/something~") == "backup_file" assert pol.deny_reason("/foobar") == "unreadable" - - -def test_deny_reason_dir_with_denied_path(): - pol = IgnorePolicy() - assert pol.deny_reason_dir("/etc/ssl/private/key") == "denied_path" - assert pol.deny_reason_dir("/etc/ssh/ssh_host_key") == "denied_path" - assert pol.deny_reason_dir("/etc/ssh") is None - - -def test_deny_reason_dir_unreadable(tmp_path: Path): - pol = IgnorePolicy() - nonexistent = tmp_path / "nonexistent" - assert pol.deny_reason_dir(str(nonexistent)) == "unreadable" - - -def test_deny_reason_dir_symlink(tmp_path: Path): - pol = IgnorePolicy() - real_dir = tmp_path / "real" - real_dir.mkdir() - link = tmp_path / "link" - os.symlink(str(real_dir), str(link)) - assert pol.deny_reason_dir(str(link)) == "symlink" - - -def test_deny_reason_dir_not_directory(tmp_path: Path): - pol = IgnorePolicy() - regular_file = tmp_path / "file.txt" - regular_file.write_text("content", encoding="utf-8") - assert pol.deny_reason_dir(str(regular_file)) == "not_directory" - - -def test_deny_reason_dir_dangerous_mode(tmp_path: Path): - pol = IgnorePolicy(dangerous=True) - real_dir = tmp_path / "private" - real_dir.mkdir() - assert pol.deny_reason_dir(str(real_dir)) is None - - -def test_deny_reason_link_basic(tmp_path: Path): - pol = IgnorePolicy() - real_file = tmp_path / "real" - real_file.write_text("content", encoding="utf-8") - link = tmp_path / "link" - os.symlink(str(real_file), str(link)) - assert pol.deny_reason_link(str(link)) is None - - -def test_deny_reason_link_denied_path(): - pol = IgnorePolicy() - assert pol.deny_reason_link("/etc/ssh/ssh_host_rsa_key") == "denied_path" - - -def test_deny_reason_link_unreadable(tmp_path: Path): - pol = IgnorePolicy() - # Create a symlink in a directory that doesn't exist - # This simulates an unreadable path - broken_link = tmp_path / "broken_link" - os.symlink("/nonexistent/target", str(broken_link)) - # Broken symlinks are still readable (we can readlink them) - # So they return None (allowed) unless they match deny globs - result = pol.deny_reason_link(str(broken_link)) - # Broken symlinks are allowed - we can still read the link target - assert result is None - - -def test_deny_reason_link_not_symlink(tmp_path: Path): - pol = IgnorePolicy() - regular_file = tmp_path / "file.txt" - regular_file.write_text("content", encoding="utf-8") - assert pol.deny_reason_link(str(regular_file)) == "not_symlink" - - -def test_deny_reason_link_log_file(): - pol = IgnorePolicy() - assert pol.deny_reason_link("/var/log/something.log") == "log_file" - - -def test_deny_reason_link_backup_file(): - pol = IgnorePolicy() - assert pol.deny_reason_link("/etc/passwd-") == "backup_file" - assert pol.deny_reason_link("/etc/something~") == "backup_file" - - -def test_deny_reason_link_dangerous_mode(tmp_path: Path): - pol = IgnorePolicy(dangerous=True) - real_file = tmp_path / "real" - real_file.write_text("content", encoding="utf-8") - link = tmp_path / "link" - os.symlink(str(real_file), str(link)) - assert pol.deny_reason_link(str(link)) is None - - -def test_iter_effective_lines_with_comments(): - pol = IgnorePolicy() - content = b""" -# This is a comment -; This is also a comment -* continuation -def main(): - pass -""" - lines = list(pol.iter_effective_lines(content)) - assert b"def main():" in lines - assert b"# This is a comment" not in lines - - -def test_iter_effective_lines_with_block_comments(): - pol = IgnorePolicy() - content = b""" -/* This is a block comment - spanning multiple lines */ -int x = 5; -""" - lines = list(pol.iter_effective_lines(content)) - assert b"int x = 5;" in lines - assert b"/*" not in lines - - -def test_iter_effective_lines_empty(): - pol = IgnorePolicy() - content = b"" - lines = list(pol.iter_effective_lines(content)) - assert lines == [] - - -def test_deny_reason_binary_not_allowed(tmp_path: Path): - pol = IgnorePolicy() - binary = tmp_path / "random.bin" - binary.write_bytes(b"\x00\x01\x02\x03") - reason = pol.deny_reason(str(binary)) - assert reason == "binary_like" - - -def test_deny_reason_sensitive_content(tmp_path: Path): - pol = IgnorePolicy() - config = tmp_path / "config.txt" - config.write_text("password=secret123", encoding="utf-8") - reason = pol.deny_reason(str(config)) - assert reason == "sensitive_content" - - -def test_deny_reason_sensitive_api_key(tmp_path: Path): - pol = IgnorePolicy() - config = tmp_path / "config.txt" - config.write_text("api_key=abc123", encoding="utf-8") - reason = pol.deny_reason(str(config)) - assert reason == "sensitive_content" - - -def test_deny_reason_private_key(tmp_path: Path): - pol = IgnorePolicy() - key = tmp_path / "key.pem" - key.write_text( - "-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEA...", encoding="utf-8" - ) - reason = pol.deny_reason(str(key)) - assert reason == "sensitive_content" - - -def test_deny_reason_too_large(tmp_path: Path): - pol = IgnorePolicy(max_file_bytes=100) - large = tmp_path / "large.txt" - large.write_bytes(b"x" * 200) - reason = pol.deny_reason(str(large)) - assert reason == "too_large" - - -def test_deny_reason_unreadable(tmp_path: Path): - pol = IgnorePolicy() - nonexistent = tmp_path / "nonexistent" - reason = pol.deny_reason(str(nonexistent)) - assert reason == "unreadable" - - -def test_deny_reason_not_regular_file(tmp_path: Path): - pol = IgnorePolicy() - directory = tmp_path / "dir" - directory.mkdir() - reason = pol.deny_reason(str(directory)) - assert reason == "not_regular_file" - - -def test_deny_reason_symlink_file(tmp_path: Path): - pol = IgnorePolicy() - real_file = tmp_path / "real" - real_file.write_text("content", encoding="utf-8") - link = tmp_path / "link" - os.symlink(str(real_file), str(link)) - reason = pol.deny_reason(str(link)) - assert reason == "not_regular_file" - - -def test_deny_reason_logs(tmp_path: Path): - pol = IgnorePolicy() - log = tmp_path / "test.log" - log.write_text("log content", encoding="utf-8") - assert pol.deny_reason(str(log)) == "log_file" - - -def test_deny_reason_backup_file(tmp_path: Path): - pol = IgnorePolicy() - backup = tmp_path / "file~" - backup.write_text("backup", encoding="utf-8") - assert pol.deny_reason(str(backup)) == "backup_file" - - -def test_deny_reason_shadow_file(): - pol = IgnorePolicy() - assert pol.deny_reason("/etc/shadow") == "denied_path" - assert pol.deny_reason("/etc/gshadow") == "denied_path" - - -def test_deny_reason_ssl_private(): - pol = IgnorePolicy() - assert pol.deny_reason("/etc/ssl/private/key.pem") == "denied_path" - - -def test_deny_reason_ssh_host_keys(): - pol = IgnorePolicy() - assert pol.deny_reason("/etc/ssh/ssh_host_rsa_key") == "denied_path" - assert pol.deny_reason("/etc/ssh/ssh_host_ed25519_key") == "denied_path" - - -def test_deny_reason_letsencrypt(): - pol = IgnorePolicy() - assert ( - pol.deny_reason("/etc/letsencrypt/live/example.com/fullchain.pem") - == "denied_path" - ) - - -def test_deny_reason_shadow_backup(): - pol = IgnorePolicy() - assert pol.deny_reason("/etc/shadow-") == "backup_file" - assert pol.deny_reason("/etc/passwd-") == "backup_file" diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 1b78bcf..658d77f 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -892,175 +892,3 @@ def test_manifest_writes_firewall_runtime_role(tmp_path: Path): assert ( out / "roles" / "firewall_runtime" / "files" / "firewall" / "ipset.save" ).exists() - - -def test_try_yaml_with_yaml_installed(): - result = manifest._try_yaml() - # PyYAML should be installed for tests - if result is None: - pytest.skip("PyYAML not installed") - assert hasattr(result, "safe_load") - assert hasattr(result, "dump") - - -def test_yaml_load_mapping_with_yaml(tmp_path: Path): - text = """ -key1: value1 -key2: - nested: value -list: - - item1 - - item2 -""" - result = manifest._yaml_load_mapping(text) - assert result["key1"] == "value1" - assert result["key2"]["nested"] == "value" - assert result["list"] == ["item1", "item2"] - - -def test_yaml_load_mapping_empty(): - result = manifest._yaml_load_mapping("") - assert result == {} - - -def test_yaml_load_mapping_invalid(): - result = manifest._yaml_load_mapping("invalid: yaml: :") - assert result == {} - - -def test_yaml_load_mapping_not_dict(): - result = manifest._yaml_load_mapping("- item1\n- item2") - assert result == {} - - -def test_yaml_load_mapping_none(): - result = manifest._yaml_load_mapping("~") - assert result == {} - - -def test_yaml_dump_mapping_with_yaml(tmp_path: Path): - obj = {"key1": "value1", "key2": 123} - result = manifest._yaml_dump_mapping(obj) - assert "key1: value1" in result - assert "key2:" in result - - -def test_yaml_dump_mapping_empty(): - result = manifest._yaml_dump_mapping({}) - # Empty dict produces '{}' - assert result.strip() == "{}" - - -def test_yaml_dump_mapping_with_nested(tmp_path: Path): - obj = {"key1": {"nested": "value"}} - result = manifest._yaml_dump_mapping(obj) - assert "nested:" in result - - -def test_merge_mappings_overwrite_simple(): - existing = {"key1": "old", "key2": "keep"} - incoming = {"key1": "new", "key3": "added"} - result = manifest._merge_mappings_overwrite(existing, incoming) - assert result["key1"] == "new" - assert result["key2"] == "keep" - assert result["key3"] == "added" - - -def test_merge_mappings_overwrite_nested(): - existing = {"key1": {"a": 1}} - incoming = {"key1": {"b": 2}} - result = manifest._merge_mappings_overwrite(existing, incoming) - # Nested dicts are replaced, not merged - assert result["key1"] == {"b": 2} - - -def test_merge_mappings_overwrite_empty(): - result = manifest._merge_mappings_overwrite({}, {"key": "value"}) - assert result == {"key": "value"} - - result = manifest._merge_mappings_overwrite({"key": "value"}, {}) - assert result == {"key": "value"} - - -def test_copy2_replace(tmp_path: Path): - src = tmp_path / "src.txt" - src.write_text("content", encoding="utf-8") - dst = tmp_path / "dst" / "subdir" / "dst.txt" - - manifest._copy2_replace(str(src), str(dst)) - - assert dst.exists() - assert dst.read_text(encoding="utf-8") == "content" - - -def test_copy2_replace_preserves_metadata(tmp_path: Path): - src = tmp_path / "src.txt" - src.write_text("content", encoding="utf-8") - os.chmod(str(src), 0o644) - dst = tmp_path / "dst.txt" - - manifest._copy2_replace(str(src), str(dst)) - - assert dst.exists() - st = dst.stat() - assert stat.S_IMODE(st.st_mode) == 0o644 - - -def test_copy2_replace_atomic(tmp_path: Path): - src = tmp_path / "src.txt" - src.write_text("content", encoding="utf-8") - dst = tmp_path / "dst.txt" - - # Write initial content - dst.write_text("old", encoding="utf-8") - - manifest._copy2_replace(str(src), str(dst)) - - assert dst.read_text(encoding="utf-8") == "content" - - -def test_render_firewall_runtime_tasks_empty(): - state = {"roles": {}} - result = manifest._render_firewall_runtime_tasks(state) - # Function always returns at least a basic playbook structure - assert isinstance(result, str) - assert len(result) > 0 - - -def test_render_firewall_runtime_tasks_with_iptables(): - state = { - "roles": { - "firewall_runtime": { - "role_name": "firewall_runtime", - "iptables_v4_save": "artifacts/firewall_runtime/iptables.save", - } - } - } - result = manifest._render_firewall_runtime_tasks(state) - assert len(result) >= 1 - - -def test_render_firewall_runtime_tasks_with_ipset(): - state = { - "roles": { - "firewall_runtime": { - "role_name": "firewall_runtime", - "ipset_save": "artifacts/firewall_runtime/ipset.save", - } - } - } - result = manifest._render_firewall_runtime_tasks(state) - assert len(result) >= 1 - - -def test_render_firewall_runtime_tasks_with_ipv6(): - state = { - "roles": { - "firewall_runtime": { - "role_name": "firewall_runtime", - "iptables_v6_save": "artifacts/firewall_runtime/ip6tables.save", - } - } - } - result = manifest._render_firewall_runtime_tasks(state) - assert len(result) >= 1 diff --git a/tests/test_misc_coverage.py b/tests/test_misc_coverage.py new file mode 100644 index 0000000..1ff6e98 --- /dev/null +++ b/tests/test_misc_coverage.py @@ -0,0 +1,416 @@ +from __future__ import annotations + +import json +import os +import stat +import subprocess +import sys +import types +from pathlib import Path +from types import SimpleNamespace + +import pytest + +from enroll.cache import _safe_component, new_harvest_cache_dir +from enroll.ignore import IgnorePolicy +from enroll.sopsutil import ( + SopsError, + _pgp_arg, + decrypt_file_binary_to, + encrypt_file_binary, +) + + +def test_safe_component_sanitizes_and_bounds_length(): + assert _safe_component(" ") == "unknown" + assert _safe_component("a/b c") == "a_b_c" + assert _safe_component("x" * 200) == "x" * 64 + + +def test_new_harvest_cache_dir_uses_xdg_cache_home(tmp_path: Path, monkeypatch): + monkeypatch.setenv("XDG_CACHE_HOME", str(tmp_path / "xdg")) + hc = new_harvest_cache_dir(hint="my host/01") + assert hc.dir.exists() + assert "my_host_01" in hc.dir.name + assert str(hc.dir).startswith(str(tmp_path / "xdg")) + # best-effort: ensure directory is not world-readable on typical FS + try: + mode = stat.S_IMODE(hc.dir.stat().st_mode) + assert mode & 0o077 == 0 + except OSError: + pass + + +def test_ignore_policy_denies_binary_and_sensitive_content(tmp_path: Path): + p_bin = tmp_path / "binfile" + p_bin.write_bytes(b"abc\x00def") + assert IgnorePolicy().deny_reason(str(p_bin)) == "binary_like" + + p_secret = tmp_path / "secret.conf" + p_secret.write_text("password=foo\n", encoding="utf-8") + assert IgnorePolicy().deny_reason(str(p_secret)) == "sensitive_content" + + # dangerous mode disables heuristic scanning (but still checks file-ness/size) + assert IgnorePolicy(dangerous=True).deny_reason(str(p_secret)) is None + + +def test_ignore_policy_denies_usr_local_shadow_by_glob(): + # This should short-circuit before stat() (path doesn't need to exist). + assert IgnorePolicy().deny_reason("/usr/local/etc/shadow") == "denied_path" + + +def test_sops_pgp_arg_and_encrypt_decrypt_roundtrip(tmp_path: Path, monkeypatch): + assert _pgp_arg([" ABC ", "DEF"]) == "ABC,DEF" + with pytest.raises(SopsError): + _pgp_arg([]) + + # Stub out sops and subprocess. + import enroll.sopsutil as s + + monkeypatch.setattr(s, "require_sops_cmd", lambda: "sops") + + class R: + def __init__(self, rc: int, out: bytes, err: bytes = b""): + self.returncode = rc + self.stdout = out + self.stderr = err + + calls = [] + + def fake_run(cmd, capture_output, check): + calls.append(cmd) + # Return a deterministic payload so we can assert file writes. + if "--encrypt" in cmd: + return R(0, b"ENCRYPTED") + if "--decrypt" in cmd: + return R(0, b"PLAINTEXT") + return R(1, b"", b"bad") + + monkeypatch.setattr(s.subprocess, "run", fake_run) + + src = tmp_path / "src.bin" + src.write_bytes(b"x") + enc = tmp_path / "out.sops" + dec = tmp_path / "out.bin" + + encrypt_file_binary(src, enc, pgp_fingerprints=["ABC"], mode=0o600) + assert enc.read_bytes() == b"ENCRYPTED" + + decrypt_file_binary_to(enc, dec, mode=0o644) + assert dec.read_bytes() == b"PLAINTEXT" + + # Sanity: we invoked encrypt and decrypt. + assert any("--encrypt" in c for c in calls) + assert any("--decrypt" in c for c in calls) + + +def test_cache_dir_defaults_to_home_cache(monkeypatch, tmp_path: Path): + # Ensure default path uses ~/.cache when XDG_CACHE_HOME is unset. + from enroll.cache import enroll_cache_dir + + monkeypatch.delenv("XDG_CACHE_HOME", raising=False) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + + p = enroll_cache_dir() + assert str(p).startswith(str(tmp_path)) + assert p.name == "enroll" + + +def test_harvest_cache_state_json_property(tmp_path: Path): + from enroll.cache import HarvestCache + + hc = HarvestCache(tmp_path / "h1") + assert hc.state_json == hc.dir / "state.json" + + +def test_cache_dir_security_rejects_symlink(tmp_path: Path): + from enroll.cache import _ensure_dir_secure + + real = tmp_path / "real" + real.mkdir() + link = tmp_path / "link" + link.symlink_to(real, target_is_directory=True) + + with pytest.raises(RuntimeError, match="Refusing to use symlink"): + _ensure_dir_secure(link) + + +def test_cache_dir_chmod_failures_are_ignored(monkeypatch, tmp_path: Path): + from enroll import cache + + # Make the cache base path deterministic and writable. + monkeypatch.setattr(cache, "enroll_cache_dir", lambda: tmp_path) + + # Force os.chmod to fail to cover the "except OSError: pass" paths. + monkeypatch.setattr( + os, "chmod", lambda *a, **k: (_ for _ in ()).throw(OSError("nope")) + ) + + hc = cache.new_harvest_cache_dir() + assert hc.dir.exists() + assert hc.dir.is_dir() + + +def test_stat_triplet_falls_back_to_numeric_ids(monkeypatch, tmp_path: Path): + from enroll.fsutil import stat_triplet + import pwd + import grp + + p = tmp_path / "x" + p.write_text("x", encoding="utf-8") + + # Force username/group resolution failures. + monkeypatch.setattr( + pwd, "getpwuid", lambda _uid: (_ for _ in ()).throw(KeyError("no user")) + ) + monkeypatch.setattr( + grp, "getgrgid", lambda _gid: (_ for _ in ()).throw(KeyError("no group")) + ) + + owner, group, mode = stat_triplet(str(p)) + assert owner.isdigit() + assert group.isdigit() + assert len(mode) == 4 + + +def test_ignore_policy_iter_effective_lines_removes_block_comments(): + from enroll.ignore import IgnorePolicy + + pol = IgnorePolicy() + data = b"""keep1 +/* +drop me +*/ +keep2 +""" + assert list(pol.iter_effective_lines(data)) == [b"keep1", b"keep2"] + + +def test_ignore_policy_deny_reason_dir_variants(tmp_path: Path): + from enroll.ignore import IgnorePolicy + + pol = IgnorePolicy() + + # denied by glob + assert pol.deny_reason_dir("/etc/shadow") == "denied_path" + + # symlink rejected + d = tmp_path / "d" + d.mkdir() + link = tmp_path / "l" + link.symlink_to(d, target_is_directory=True) + assert pol.deny_reason_dir(str(link)) == "symlink" + + # not a directory + f = tmp_path / "f" + f.write_text("x", encoding="utf-8") + assert pol.deny_reason_dir(str(f)) == "not_directory" + + # ok + assert pol.deny_reason_dir(str(d)) is None + + +def test_run_jinjaturtle_parses_outputs(monkeypatch, tmp_path: Path): + # Fully unit-test enroll.jinjaturtle.run_jinjaturtle by stubbing subprocess.run. + from enroll.jinjaturtle import run_jinjaturtle + + def fake_run(cmd, **kwargs): # noqa: ARG001 + # cmd includes "-d -t