Add tests for symlinks management
This commit is contained in:
parent
aea58c8684
commit
7a9a0abcd1
3 changed files with 377 additions and 0 deletions
263
tests/test_harvest_symlinks.py
Normal file
263
tests/test_harvest_symlinks.py
Normal file
|
|
@ -0,0 +1,263 @@
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import enroll.harvest as h
|
||||||
|
from enroll.platform import PlatformInfo
|
||||||
|
from enroll.systemd import UnitInfo
|
||||||
|
|
||||||
|
|
||||||
|
class AllowAllPolicy:
|
||||||
|
def deny_reason(self, path: str):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def deny_reason_link(self, path: str):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class FakeBackend:
|
||||||
|
"""Minimal backend stub for harvest tests.
|
||||||
|
|
||||||
|
Keep harvest deterministic and avoid enumerating the real system.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name = "dpkg"
|
||||||
|
|
||||||
|
def build_etc_index(self):
|
||||||
|
return (set(), {}, {}, {})
|
||||||
|
|
||||||
|
def owner_of_path(self, path: str):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def list_manual_packages(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
def installed_packages(self):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def specific_paths_for_hints(self, hints: set[str]):
|
||||||
|
return []
|
||||||
|
|
||||||
|
def is_pkg_config_path(self, path: str) -> bool:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def modified_paths(self, pkg: str, etc_paths: list[str]):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
def _base_monkeypatches(monkeypatch, *, unit: str):
|
||||||
|
"""Patch harvest to avoid live system access."""
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
h, "detect_platform", lambda: PlatformInfo("debian", "dpkg", {})
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(h, "get_backend", lambda info=None: FakeBackend())
|
||||||
|
|
||||||
|
monkeypatch.setattr(h, "list_enabled_timers", lambda: [])
|
||||||
|
monkeypatch.setattr(
|
||||||
|
h,
|
||||||
|
"get_unit_info",
|
||||||
|
lambda u: UnitInfo(
|
||||||
|
name=u,
|
||||||
|
fragment_path=None,
|
||||||
|
dropin_paths=[],
|
||||||
|
env_files=[],
|
||||||
|
exec_paths=[],
|
||||||
|
active_state="inactive",
|
||||||
|
sub_state="dead",
|
||||||
|
unit_file_state="enabled",
|
||||||
|
condition_result=None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Keep users empty and avoid touching /etc/skel.
|
||||||
|
monkeypatch.setattr(h, "collect_non_system_users", lambda: [])
|
||||||
|
|
||||||
|
# Avoid warning spam from non-root test runs.
|
||||||
|
if hasattr(h.os, "geteuid"):
|
||||||
|
monkeypatch.setattr(h.os, "geteuid", lambda: 0)
|
||||||
|
|
||||||
|
# Avoid walking the real filesystem.
|
||||||
|
monkeypatch.setattr(h.os, "walk", lambda root: iter(()))
|
||||||
|
monkeypatch.setattr(h, "_copy_into_bundle", lambda *a, **k: None)
|
||||||
|
|
||||||
|
# Default to a "no files exist" view of the world unless a test overrides.
|
||||||
|
monkeypatch.setattr(h.os.path, "isfile", lambda p: False)
|
||||||
|
monkeypatch.setattr(h.os.path, "exists", lambda p: False)
|
||||||
|
|
||||||
|
# Minimal enabled services list.
|
||||||
|
monkeypatch.setattr(h, "list_enabled_services", lambda: [unit] if unit else [])
|
||||||
|
|
||||||
|
|
||||||
|
def test_harvest_captures_nginx_enabled_symlinks(monkeypatch, tmp_path: Path):
|
||||||
|
bundle = tmp_path / "bundle"
|
||||||
|
unit = "nginx.service"
|
||||||
|
_base_monkeypatches(monkeypatch, unit=unit)
|
||||||
|
|
||||||
|
# Fake filesystem for nginx enabled dirs.
|
||||||
|
dirs = {
|
||||||
|
"/etc",
|
||||||
|
"/etc/nginx",
|
||||||
|
"/etc/nginx/sites-enabled",
|
||||||
|
"/etc/nginx/modules-enabled",
|
||||||
|
}
|
||||||
|
links = {
|
||||||
|
"/etc/nginx/sites-enabled/default": "../sites-available/default",
|
||||||
|
"/etc/nginx/modules-enabled/mod-http": "../modules-available/mod-http",
|
||||||
|
}
|
||||||
|
|
||||||
|
monkeypatch.setattr(h.os.path, "isdir", lambda p: p in dirs)
|
||||||
|
monkeypatch.setattr(h.os.path, "islink", lambda p: p in links)
|
||||||
|
monkeypatch.setattr(h.os, "readlink", lambda p: links[p])
|
||||||
|
|
||||||
|
def fake_glob(pat: str):
|
||||||
|
if pat == "/etc/nginx/sites-enabled/*":
|
||||||
|
return [
|
||||||
|
"/etc/nginx/sites-enabled/default",
|
||||||
|
"/etc/nginx/sites-enabled/README",
|
||||||
|
]
|
||||||
|
if pat == "/etc/nginx/modules-enabled/*":
|
||||||
|
return ["/etc/nginx/modules-enabled/mod-http"]
|
||||||
|
return []
|
||||||
|
|
||||||
|
monkeypatch.setattr(h.glob, "glob", fake_glob)
|
||||||
|
|
||||||
|
state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
|
||||||
|
st = json.loads(Path(state_path).read_text(encoding="utf-8"))
|
||||||
|
|
||||||
|
svc = next(s for s in st["roles"]["services"] if s["role_name"] == "nginx")
|
||||||
|
managed_links = svc.get("managed_links") or []
|
||||||
|
assert {(ml["path"], ml["target"], ml["reason"]) for ml in managed_links} == {
|
||||||
|
(
|
||||||
|
"/etc/nginx/sites-enabled/default",
|
||||||
|
"../sites-available/default",
|
||||||
|
"enabled_symlink",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"/etc/nginx/modules-enabled/mod-http",
|
||||||
|
"../modules-available/mod-http",
|
||||||
|
"enabled_symlink",
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_harvest_does_not_capture_enabled_symlinks_without_role(
|
||||||
|
monkeypatch, tmp_path: Path
|
||||||
|
):
|
||||||
|
bundle = tmp_path / "bundle"
|
||||||
|
_base_monkeypatches(monkeypatch, unit="")
|
||||||
|
|
||||||
|
# Dirs exist but nginx isn't detected, so nothing should be captured.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
h.os.path,
|
||||||
|
"isdir",
|
||||||
|
lambda p: p
|
||||||
|
in {
|
||||||
|
"/etc",
|
||||||
|
"/etc/nginx/sites-enabled",
|
||||||
|
"/etc/nginx/modules-enabled",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
h.glob, "glob", lambda pat: ["/etc/nginx/sites-enabled/default"]
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(h.os.path, "islink", lambda p: True)
|
||||||
|
monkeypatch.setattr(h.os, "readlink", lambda p: "../sites-available/default")
|
||||||
|
|
||||||
|
state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
|
||||||
|
st = json.loads(Path(state_path).read_text(encoding="utf-8"))
|
||||||
|
|
||||||
|
# No services => no place to attach nginx links.
|
||||||
|
assert st["roles"]["services"] == []
|
||||||
|
# And no package snapshots either.
|
||||||
|
assert st["roles"]["packages"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_harvest_symlink_capture_respects_ignore_policy(monkeypatch, tmp_path: Path):
|
||||||
|
bundle = tmp_path / "bundle"
|
||||||
|
_base_monkeypatches(monkeypatch, unit="nginx.service")
|
||||||
|
|
||||||
|
dirs = {"/etc", "/etc/nginx/sites-enabled", "/etc/nginx/modules-enabled"}
|
||||||
|
links = {
|
||||||
|
"/etc/nginx/sites-enabled/default": "../sites-available/default",
|
||||||
|
"/etc/nginx/sites-enabled/ok": "../sites-available/ok",
|
||||||
|
}
|
||||||
|
|
||||||
|
monkeypatch.setattr(h.os.path, "isdir", lambda p: p in dirs)
|
||||||
|
monkeypatch.setattr(h.os.path, "islink", lambda p: p in links)
|
||||||
|
monkeypatch.setattr(h.os, "readlink", lambda p: links[p])
|
||||||
|
monkeypatch.setattr(
|
||||||
|
h.glob,
|
||||||
|
"glob",
|
||||||
|
lambda pat: (
|
||||||
|
sorted(list(links.keys())) if pat == "/etc/nginx/sites-enabled/*" else []
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
calls: list[str] = []
|
||||||
|
|
||||||
|
class Policy:
|
||||||
|
def deny_reason(self, path: str):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def deny_reason_link(self, path: str):
|
||||||
|
calls.append(path)
|
||||||
|
if path.endswith("/default"):
|
||||||
|
return "denied_path"
|
||||||
|
return None
|
||||||
|
|
||||||
|
state_path = h.harvest(str(bundle), policy=Policy())
|
||||||
|
st = json.loads(Path(state_path).read_text(encoding="utf-8"))
|
||||||
|
|
||||||
|
svc = next(s for s in st["roles"]["services"] if s["role_name"] == "nginx")
|
||||||
|
managed_links = svc.get("managed_links") or []
|
||||||
|
excluded = svc.get("excluded") or []
|
||||||
|
|
||||||
|
assert any(p.endswith("/default") for p in calls)
|
||||||
|
assert any(p.endswith("/ok") for p in calls)
|
||||||
|
assert {ml["path"] for ml in managed_links} == {"/etc/nginx/sites-enabled/ok"}
|
||||||
|
assert {ex["path"] for ex in excluded} == {"/etc/nginx/sites-enabled/default"}
|
||||||
|
assert (
|
||||||
|
next(ex["reason"] for ex in excluded if ex["path"].endswith("/default"))
|
||||||
|
== "denied_path"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_harvest_captures_apache2_enabled_symlinks(monkeypatch, tmp_path: Path):
|
||||||
|
bundle = tmp_path / "bundle"
|
||||||
|
_base_monkeypatches(monkeypatch, unit="apache2.service")
|
||||||
|
|
||||||
|
dirs = {
|
||||||
|
"/etc",
|
||||||
|
"/etc/apache2/conf-enabled",
|
||||||
|
"/etc/apache2/mods-enabled",
|
||||||
|
"/etc/apache2/sites-enabled",
|
||||||
|
}
|
||||||
|
links = {
|
||||||
|
"/etc/apache2/sites-enabled/000-default.conf": "../sites-available/000-default.conf",
|
||||||
|
"/etc/apache2/mods-enabled/rewrite.load": "../mods-available/rewrite.load",
|
||||||
|
"/etc/apache2/conf-enabled/security.conf": "../conf-available/security.conf",
|
||||||
|
}
|
||||||
|
|
||||||
|
monkeypatch.setattr(h.os.path, "isdir", lambda p: p in dirs)
|
||||||
|
monkeypatch.setattr(h.os.path, "islink", lambda p: p in links)
|
||||||
|
monkeypatch.setattr(h.os, "readlink", lambda p: links[p])
|
||||||
|
|
||||||
|
def fake_glob(pat: str):
|
||||||
|
if pat == "/etc/apache2/sites-enabled/*":
|
||||||
|
return ["/etc/apache2/sites-enabled/000-default.conf"]
|
||||||
|
if pat == "/etc/apache2/mods-enabled/*":
|
||||||
|
return ["/etc/apache2/mods-enabled/rewrite.load"]
|
||||||
|
if pat == "/etc/apache2/conf-enabled/*":
|
||||||
|
return ["/etc/apache2/conf-enabled/security.conf"]
|
||||||
|
return []
|
||||||
|
|
||||||
|
monkeypatch.setattr(h.glob, "glob", fake_glob)
|
||||||
|
|
||||||
|
state_path = h.harvest(str(bundle), policy=AllowAllPolicy())
|
||||||
|
st = json.loads(Path(state_path).read_text(encoding="utf-8"))
|
||||||
|
|
||||||
|
svc = next(s for s in st["roles"]["services"] if s["role_name"] == "apache2")
|
||||||
|
managed_links = svc.get("managed_links") or []
|
||||||
|
assert {ml["path"] for ml in managed_links} == set(links.keys())
|
||||||
|
assert {ml["target"] for ml in managed_links} == set(links.values())
|
||||||
|
assert all(ml["reason"] == "enabled_symlink" for ml in managed_links)
|
||||||
|
|
@ -8,3 +8,21 @@ def test_ignore_policy_denies_common_backup_files():
|
||||||
assert pol.deny_reason("/etc/group-") == "backup_file"
|
assert pol.deny_reason("/etc/group-") == "backup_file"
|
||||||
assert pol.deny_reason("/etc/something~") == "backup_file"
|
assert pol.deny_reason("/etc/something~") == "backup_file"
|
||||||
assert pol.deny_reason("/foobar") == "unreadable"
|
assert pol.deny_reason("/foobar") == "unreadable"
|
||||||
|
|
||||||
|
|
||||||
|
def test_ignore_policy_deny_reason_link(tmp_path):
|
||||||
|
pol = IgnorePolicy()
|
||||||
|
|
||||||
|
target = tmp_path / "target.txt"
|
||||||
|
target.write_text("hello", encoding="utf-8")
|
||||||
|
link = tmp_path / "link.txt"
|
||||||
|
link.symlink_to(target)
|
||||||
|
|
||||||
|
# File is not a symlink.
|
||||||
|
assert pol.deny_reason_link(str(target)) == "not_symlink"
|
||||||
|
|
||||||
|
# Symlink is accepted if readable.
|
||||||
|
assert pol.deny_reason_link(str(link)) is None
|
||||||
|
|
||||||
|
# Missing path should be unreadable.
|
||||||
|
assert pol.deny_reason_link(str(tmp_path / "missing")) == "unreadable"
|
||||||
|
|
|
||||||
96
tests/test_manifest_symlinks.py
Normal file
96
tests/test_manifest_symlinks.py
Normal file
|
|
@ -0,0 +1,96 @@
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import enroll.manifest as manifest
|
||||||
|
|
||||||
|
|
||||||
|
def test_manifest_emits_symlink_tasks_and_vars(tmp_path: Path):
|
||||||
|
bundle = tmp_path / "bundle"
|
||||||
|
out = tmp_path / "ansible"
|
||||||
|
|
||||||
|
state = {
|
||||||
|
"host": {"hostname": "test", "os": "debian", "pkg_backend": "dpkg"},
|
||||||
|
"inventory": {"packages": {}},
|
||||||
|
"roles": {
|
||||||
|
"users": {
|
||||||
|
"role_name": "users",
|
||||||
|
"users": [],
|
||||||
|
"managed_files": [],
|
||||||
|
"excluded": [],
|
||||||
|
"notes": [],
|
||||||
|
},
|
||||||
|
"services": [
|
||||||
|
{
|
||||||
|
"unit": "nginx.service",
|
||||||
|
"role_name": "nginx",
|
||||||
|
"packages": ["nginx"],
|
||||||
|
"active_state": "active",
|
||||||
|
"sub_state": "running",
|
||||||
|
"unit_file_state": "enabled",
|
||||||
|
"condition_result": None,
|
||||||
|
"managed_files": [],
|
||||||
|
"managed_links": [
|
||||||
|
{
|
||||||
|
"path": "/etc/nginx/sites-enabled/default",
|
||||||
|
"target": "../sites-available/default",
|
||||||
|
"reason": "enabled_symlink",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"excluded": [],
|
||||||
|
"notes": [],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"packages": [],
|
||||||
|
"apt_config": {
|
||||||
|
"role_name": "apt_config",
|
||||||
|
"managed_files": [],
|
||||||
|
"excluded": [],
|
||||||
|
"notes": [],
|
||||||
|
},
|
||||||
|
"dnf_config": {
|
||||||
|
"role_name": "dnf_config",
|
||||||
|
"managed_files": [],
|
||||||
|
"excluded": [],
|
||||||
|
"notes": [],
|
||||||
|
},
|
||||||
|
"etc_custom": {
|
||||||
|
"role_name": "etc_custom",
|
||||||
|
"managed_files": [],
|
||||||
|
"excluded": [],
|
||||||
|
"notes": [],
|
||||||
|
},
|
||||||
|
"usr_local_custom": {
|
||||||
|
"role_name": "usr_local_custom",
|
||||||
|
"managed_files": [],
|
||||||
|
"excluded": [],
|
||||||
|
"notes": [],
|
||||||
|
},
|
||||||
|
"extra_paths": {
|
||||||
|
"role_name": "extra_paths",
|
||||||
|
"include_patterns": [],
|
||||||
|
"exclude_patterns": [],
|
||||||
|
"managed_files": [],
|
||||||
|
"managed_links": [],
|
||||||
|
"excluded": [],
|
||||||
|
"notes": [],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
bundle.mkdir(parents=True, exist_ok=True)
|
||||||
|
(bundle / "artifacts").mkdir(parents=True, exist_ok=True)
|
||||||
|
(bundle / "state.json").write_text(json.dumps(state), encoding="utf-8")
|
||||||
|
|
||||||
|
manifest.manifest(str(bundle), str(out))
|
||||||
|
|
||||||
|
tasks = (out / "roles" / "nginx" / "tasks" / "main.yml").read_text(encoding="utf-8")
|
||||||
|
assert "- name: Ensure managed symlinks exist" in tasks
|
||||||
|
assert 'loop: "{{ nginx_managed_links | default([]) }}"' in tasks
|
||||||
|
|
||||||
|
defaults = (out / "roles" / "nginx" / "defaults" / "main.yml").read_text(
|
||||||
|
encoding="utf-8"
|
||||||
|
)
|
||||||
|
# The role defaults should include the converted link mapping.
|
||||||
|
assert "nginx_managed_links:" in defaults
|
||||||
|
assert "dest: /etc/nginx/sites-enabled/default" in defaults
|
||||||
|
assert "src: ../sites-available/default" in defaults
|
||||||
Loading…
Add table
Add a link
Reference in a new issue