Remote mode and dangerous flag, other tweaks

* Add remote mode for harvesting a remote machine via a local workstation (no need to install enroll remotely)
   Optionally use `--no-sudo` if you don't want to require the remote user to have passwordless sudo when
   conducting the harvest, though you'll end up with less useful data (same as if running `enroll harvest`
   on a machine without sudo)
 * Add `--dangerous` flag to capture even sensitive data (use at your own risk!)
 * Do a better job at capturing other config files in `/etc/<package>/` even if that package doesn't normally
   ship or manage those files.
This commit is contained in:
Miguel Jacq 2025-12-17 17:02:16 +11:00
parent 026416d158
commit 6a36a9d2d5
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
13 changed files with 1083 additions and 155 deletions

79
enroll/cache.py Normal file
View file

@ -0,0 +1,79 @@
from __future__ import annotations
import os
import re
import tempfile
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional
def _safe_component(s: str) -> str:
s = s.strip()
if not s:
return "unknown"
s = re.sub(r"[^A-Za-z0-9_.-]+", "_", s)
s = re.sub(r"_+", "_", s)
return s[:64]
def enroll_cache_dir() -> Path:
    """Return the base cache directory for enroll.

    Honours ``$XDG_CACHE_HOME`` when set; otherwise falls back to
    ``~/.local/cache``.  NOTE(review): the XDG spec's default is
    ``~/.cache`` — ``~/.local/cache`` appears to be a deliberate project
    choice; confirm before changing.
    """
    xdg = os.environ.get("XDG_CACHE_HOME")
    if xdg:
        root = Path(xdg).expanduser()
    else:
        root = Path.home() / ".local" / "cache"
    return root / "enroll"
@dataclass(frozen=True)
class HarvestCache:
    """A locally-persistent directory holding one harvested bundle."""

    # Root directory of the bundle on the local filesystem.
    dir: Path

    @property
    def state_json(self) -> Path:
        """Path of the bundle's ``state.json`` file."""
        return self.dir.joinpath("state.json")
def _ensure_dir_secure(path: Path) -> None:
"""Create a directory with restrictive permissions; refuse symlinks."""
# Refuse a symlink at the leaf.
if path.exists() and path.is_symlink():
raise RuntimeError(f"Refusing to use symlink path: {path}")
path.mkdir(parents=True, exist_ok=True, mode=0o700)
try:
os.chmod(path, 0o700)
except OSError:
# Best-effort; on some FS types chmod may fail.
pass
def new_harvest_cache_dir(*, hint: Optional[str] = None) -> HarvestCache:
    """Create a fresh, unpredictable harvest directory under the user cache.

    Placing the bundle under the user's home (rather than a shared temp
    location) plus ``mkdtemp()`` randomness prevents an attacker from
    pre-creating the path before we do.
    """
    parent = enroll_cache_dir() / "harvest"
    _ensure_dir_secure(parent)

    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    label = _safe_component(hint or "harvest")
    # mkdtemp appends a random suffix and creates the directory atomically.
    created = Path(tempfile.mkdtemp(prefix=f"{stamp}-{label}-", dir=str(parent)))
    try:
        os.chmod(created, 0o700)
    except OSError:
        pass
    return HarvestCache(dir=created)

View file

@ -1,9 +1,14 @@
from __future__ import annotations
import argparse
import os
from pathlib import Path
from typing import Optional
from .cache import new_harvest_cache_dir
from .harvest import harvest
from .manifest import manifest
from .remote import remote_harvest
def _add_common_manifest_args(p: argparse.ArgumentParser) -> None:
@ -32,46 +37,129 @@ def _jt_mode(args: argparse.Namespace) -> str:
return "auto"
def _add_remote_args(p: argparse.ArgumentParser) -> None:
p.add_argument(
"--remote-host",
help="SSH host to run harvesting on (if set, harvest runs remotely and is pulled locally).",
)
p.add_argument(
"--remote-port",
type=int,
default=22,
help="SSH port for --remote-host (default: 22).",
)
p.add_argument(
"--remote-user",
default=os.environ.get("USER") or None,
help="SSH username for --remote-host (default: local $USER).",
)
def main() -> None:
ap = argparse.ArgumentParser(prog="enroll")
sub = ap.add_subparsers(dest="cmd", required=True)
h = sub.add_parser("harvest", help="Harvest service/package/config state")
h.add_argument("--out", required=True, help="Harvest output directory")
h.add_argument("--out", help="Harvest output directory")
h.add_argument(
"--dangerous",
action="store_true",
help="Collect files more aggressively (may include secrets). Disables secret-avoidance checks.",
)
h.add_argument(
"--no-sudo",
action="store_true",
help="Don't use sudo on the remote host (when using --remote options). This may result in a limited harvest due to permission restrictions.",
)
_add_remote_args(h)
r = sub.add_parser("manifest", help="Render Ansible roles from a harvest")
r.add_argument(
m = sub.add_parser("manifest", help="Render Ansible roles from a harvest")
m.add_argument(
"--harvest",
required=True,
help="Path to the directory created by the harvest command",
)
r.add_argument(
m.add_argument(
"--out",
required=True,
help="Output directory for generated roles/playbook Ansible manifest",
)
_add_common_manifest_args(r)
_add_common_manifest_args(m)
e = sub.add_parser(
s = sub.add_parser(
"single-shot", help="Harvest state, then manifest Ansible code, in one shot"
)
e.add_argument(
"--harvest", required=True, help="Path to the directory to place the harvest in"
s.add_argument("--harvest", help="Path to the directory to place the harvest in")
s.add_argument(
"--dangerous",
action="store_true",
help="Collect files more aggressively (may include secrets). Disables secret-avoidance checks.",
)
e.add_argument(
s.add_argument(
"--no-sudo",
action="store_true",
help="Don't use sudo on the remote host (when using --remote options). This may result in a limited harvest due to permission restrictions.",
)
s.add_argument(
"--out",
required=True,
help="Output directory for generated roles/playbook Ansible manifest",
)
_add_common_manifest_args(e)
_add_common_manifest_args(s)
_add_remote_args(s)
args = ap.parse_args()
remote_host: Optional[str] = getattr(args, "remote_host", None)
if args.cmd == "harvest":
path = harvest(args.out)
print(path)
if remote_host:
out_dir = (
Path(args.out)
if args.out
else new_harvest_cache_dir(hint=remote_host).dir
)
state = remote_harvest(
local_out_dir=out_dir,
remote_host=remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
no_sudo=bool(args.no_sudo),
)
print(str(state))
else:
if not args.out:
raise SystemExit("error: --out is required unless --remote-host is set")
path = harvest(args.out, dangerous=bool(args.dangerous))
print(path)
elif args.cmd == "manifest":
manifest(args.harvest, args.out, fqdn=args.fqdn, jinjaturtle=_jt_mode(args))
elif args.cmd == "single-shot":
harvest(args.harvest)
manifest(args.harvest, args.out, fqdn=args.fqdn, jinjaturtle=_jt_mode(args))
if remote_host:
harvest_dir = (
Path(args.harvest)
if args.harvest
else new_harvest_cache_dir(hint=remote_host).dir
)
remote_harvest(
local_out_dir=harvest_dir,
remote_host=remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
no_sudo=bool(args.no_sudo),
)
manifest(
str(harvest_dir), args.out, fqdn=args.fqdn, jinjaturtle=_jt_mode(args)
)
# For usability (when --harvest wasn't provided), print the harvest path.
if not args.harvest:
print(str(harvest_dir / "state.json"))
else:
if not args.harvest:
raise SystemExit(
"error: --harvest is required unless --remote-host is set"
)
harvest(args.harvest, dangerous=bool(args.dangerous))
manifest(args.harvest, args.out, fqdn=args.fqdn, jinjaturtle=_jt_mode(args))

View file

@ -199,7 +199,11 @@ def _maybe_add_specific_paths(hints: Set[str]) -> List[str]:
def _scan_unowned_under_roots(
roots: List[str], owned_etc: Set[str], limit: int = MAX_UNOWNED_FILES_PER_ROLE
roots: List[str],
owned_etc: Set[str],
limit: int = MAX_UNOWNED_FILES_PER_ROLE,
*,
confish_only: bool = True,
) -> List[str]:
found: List[str] = []
for root in roots:
@ -218,7 +222,7 @@ def _scan_unowned_under_roots(
continue
if not os.path.isfile(p) or os.path.islink(p):
continue
if not _is_confish(p):
if confish_only and not _is_confish(p):
continue
found.append(p)
return found
@ -233,8 +237,20 @@ def _topdirs_for_package(pkg: str, pkg_to_etc_paths: Dict[str, List[str]]) -> Se
return topdirs
def harvest(bundle_dir: str, policy: Optional[IgnorePolicy] = None) -> str:
policy = policy or IgnorePolicy()
def harvest(
bundle_dir: str,
policy: Optional[IgnorePolicy] = None,
*,
dangerous: bool = False,
) -> str:
# If a policy is not supplied, build one. `--dangerous` relaxes secret
# detection and deny-glob skipping.
if policy is None:
policy = IgnorePolicy(dangerous=dangerous)
elif dangerous:
# If callers explicitly provided a policy but also requested
# dangerous behavior, honour the CLI intent.
policy.dangerous = True
os.makedirs(bundle_dir, exist_ok=True)
if hasattr(os, "geteuid") and os.geteuid() != 0:
@ -338,10 +354,42 @@ def harvest(bundle_dir: str, policy: Optional[IgnorePolicy] = None) -> str:
if current != baseline:
candidates.setdefault(path, "modified_packaged_file")
roots: List[str] = []
# Capture custom/unowned files living under /etc/<name> for this service.
#
# Historically we only captured "config-ish" files (by extension). That
# misses important runtime-generated artifacts like certificates and
# key material under service directories (e.g. /etc/openvpn/*.crt).
#
# To avoid exploding output for shared trees (e.g. /etc/systemd), keep
# the older "config-ish only" behavior for known shared topdirs.
any_roots: List[str] = []
confish_roots: List[str] = []
for h in hints:
roots.extend([f"/etc/{h}", f"/etc/{h}.d"])
for pth in _scan_unowned_under_roots(roots, owned_etc):
roots_for_h = [f"/etc/{h}", f"/etc/{h}.d"]
if h in SHARED_ETC_TOPDIRS:
confish_roots.extend(roots_for_h)
else:
any_roots.extend(roots_for_h)
found: List[str] = []
found.extend(
_scan_unowned_under_roots(
any_roots,
owned_etc,
limit=MAX_UNOWNED_FILES_PER_ROLE,
confish_only=False,
)
)
if len(found) < MAX_UNOWNED_FILES_PER_ROLE:
found.extend(
_scan_unowned_under_roots(
confish_roots,
owned_etc,
limit=MAX_UNOWNED_FILES_PER_ROLE - len(found),
confish_only=True,
)
)
for pth in found:
candidates.setdefault(pth, "custom_unowned")
if not pkgs and not candidates:
@ -449,8 +497,14 @@ def harvest(bundle_dir: str, policy: Optional[IgnorePolicy] = None) -> str:
roots.extend([f"/etc/logrotate.d/{td}"])
roots.extend([f"/etc/sysctl.d/{td}.conf"])
# Capture any custom/unowned files under /etc/<topdir> for this
# manually-installed package. This may include runtime-generated
# artifacts like certificates, key files, and helper scripts which are
# not owned by any .deb.
for pth in _scan_unowned_under_roots(
[r for r in roots if os.path.isdir(r)], owned_etc
[r for r in roots if os.path.isdir(r)],
owned_etc,
confish_only=False,
):
candidates.setdefault(pth, "custom_unowned")

View file

@ -38,9 +38,13 @@ BLOCK_END = b"*/"
@dataclass
class IgnorePolicy:
deny_globs: list[str] = None
deny_globs: Optional[list[str]] = None
max_file_bytes: int = 256_000
sample_bytes: int = 64_000
# If True, be much less conservative about collecting potentially
# sensitive files. This disables deny globs (e.g. /etc/shadow,
# /etc/ssl/private/*) and skips heuristic content scanning.
dangerous: bool = False
def __post_init__(self) -> None:
if self.deny_globs is None:
@ -69,9 +73,10 @@ class IgnorePolicy:
yield raw
def deny_reason(self, path: str) -> Optional[str]:
for g in self.deny_globs:
if fnmatch.fnmatch(path, g):
return "denied_path"
if not self.dangerous:
for g in self.deny_globs or []:
if fnmatch.fnmatch(path, g):
return "denied_path"
try:
st = os.stat(path, follow_symlinks=True)
@ -93,9 +98,10 @@ class IgnorePolicy:
if b"\x00" in data:
return "binary_like"
for line in self.iter_effective_lines(data):
for pat in SENSITIVE_CONTENT_PATTERNS:
if pat.search(line):
return "sensitive_content"
if not self.dangerous:
for line in self.iter_effective_lines(data):
for pat in SENSITIVE_CONTENT_PATTERNS:
if pat.search(line):
return "sensitive_content"
return None

View file

@ -81,25 +81,3 @@ def run_jinjaturtle(
return JinjifyResult(
template_text=template_text, vars_text=vars_text.rstrip() + "\n"
)
def replace_or_append_block(
    base_text: str,
    *,
    begin: str,
    end: str,
    block_body: str,
) -> str:
    """Replace a marked block if present; else append it.

    The block is delimited by the literal *begin*/*end* markers.  An
    existing block is replaced in place (result normalised to a single
    trailing newline); otherwise the new block is appended after a blank
    separator line.
    """
    pattern = re.compile(
        re.escape(begin) + r".*?" + re.escape(end),
        flags=re.DOTALL,
    )
    new_block = f"{begin}\n{block_body.rstrip()}\n{end}"
    if pattern.search(base_text):
        # Use a callable replacement so backslashes / group references in
        # the body are inserted literally (a plain string replacement is
        # re-parsed by re.sub and e.g. "\1" would raise re.error).
        return pattern.sub(lambda _m: new_block, base_text).rstrip() + "\n"
    # No existing block: append. rstrip()+"\n" already guarantees a trailing
    # newline, so the old re-check of endswith("\n") was dead code.
    return base_text.rstrip() + "\n" + "\n" + new_block + "\n"

View file

@ -3,6 +3,8 @@ from __future__ import annotations
import json
import os
import shutil
import stat
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
@ -70,36 +72,6 @@ def _yaml_dump_mapping(obj: Dict[str, Any], *, sort_keys: bool = True) -> str:
)
def _merge_list_keep_order(existing: List[Any], new: List[Any]) -> List[Any]:
out = list(existing)
seen = set(existing)
for item in new:
if item not in seen:
out.append(item)
seen.add(item)
return out
def _merge_mappings_preserve(
existing: Dict[str, Any], incoming: Dict[str, Any]
) -> Dict[str, Any]:
"""Merge incoming into existing:
- lists: union (preserve existing order)
- scalars/dicts: only set if missing (do not overwrite)
"""
merged = dict(existing)
for k, v in incoming.items():
if k in merged:
if isinstance(merged[k], list) and isinstance(v, list):
merged[k] = _merge_list_keep_order(merged[k], v)
else:
# keep existing value (non-overwriting)
continue
else:
merged[k] = v
return merged
def _merge_mappings_overwrite(
existing: Dict[str, Any], incoming: Dict[str, Any]
) -> Dict[str, Any]:
@ -113,33 +85,6 @@ def _merge_mappings_overwrite(
return merged
def _write_role_defaults_merge(role_dir: str, incoming: Dict[str, Any]) -> None:
    """Write/merge role defaults without clobbering existing values.

    Used in site mode to keep roles reusable across hosts: existing keys
    win and list values are unioned (see _merge_mappings_preserve).
    """
    defaults_path = os.path.join(role_dir, "defaults", "main.yml")

    current: Dict[str, Any] = {}
    if os.path.exists(defaults_path):
        try:
            current = _yaml_load_mapping(
                Path(defaults_path).read_text(encoding="utf-8")
            )
        except Exception:
            # Unreadable/unparseable defaults: start fresh rather than fail.
            current = {}

    merged = _merge_mappings_preserve(current, incoming)
    with open(defaults_path, "w", encoding="utf-8") as f:
        f.write("---\n" + _yaml_dump_mapping(merged, sort_keys=True))
def _extract_jinjaturtle_block(text: str) -> str:
    """Return the YAML between JINJATURTLE_BEGIN/END markers, or all of *text*.

    Falls back to the whole text when either marker is absent; the result
    is stripped and normalised to end with exactly one newline.
    """
    if JINJATURTLE_BEGIN not in text or JINJATURTLE_END not in text:
        return text.strip() + "\n"
    after_begin = text.split(JINJATURTLE_BEGIN, 1)[1]
    inner = after_begin.split(JINJATURTLE_END, 1)[0]
    return inner.strip() + "\n"
def _normalise_jinjaturtle_vars_text(vars_text: str) -> str:
"""Deduplicate keys in a vars fragment by parsing as YAML and dumping it back."""
m = _yaml_load_mapping(vars_text)
@ -158,6 +103,30 @@ def _yaml_list(items: List[str], indent: int = 2) -> str:
return "\n".join(f"{pad}- {x}" for x in items)
def _copy2_replace(src: str, dst: str) -> None:
dst_dir = os.path.dirname(dst)
os.makedirs(dst_dir, exist_ok=True)
# Copy to a temp file in the same directory, then atomically replace.
fd, tmp = tempfile.mkstemp(prefix=".enroll-tmp-", dir=dst_dir)
os.close(fd)
try:
shutil.copy2(src, tmp)
# Ensure the working tree stays mergeable: make the file user-writable.
st = os.stat(tmp, follow_symlinks=False)
mode = stat.S_IMODE(st.st_mode)
if not (mode & stat.S_IWUSR):
os.chmod(tmp, mode | stat.S_IWUSR)
os.replace(tmp, dst)
finally:
try:
os.unlink(tmp)
except FileNotFoundError:
pass
def _copy_artifacts(
bundle_dir: str,
role: str,
@ -195,7 +164,7 @@ def _copy_artifacts(
if preserve_existing and os.path.exists(dst):
continue
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copy2(src, dst)
_copy2_replace(src, dst)
def _write_role_scaffold(role_dir: str) -> None:
@ -380,11 +349,6 @@ def _jinjify_managed_files(
return templated, ""
def _hostvars_only_jinjaturtle(vars_text: str) -> str:
    """Render a standalone host_vars YAML document from the jinjaturtle vars.

    Delegates to _defaults_with_jinjaturtle with an empty ("---") base so
    the output stays a valid YAML file.
    """
    empty_defaults = "---\n"
    return _defaults_with_jinjaturtle(empty_defaults, vars_text)
def _defaults_with_jinjaturtle(base_defaults: str, vars_text: str) -> str:
if not vars_text.strip():
return base_defaults.rstrip() + "\n"

209
enroll/remote.py Normal file
View file

@ -0,0 +1,209 @@
from __future__ import annotations
import os
import shutil
import tarfile
import tempfile
import zipapp
from pathlib import Path
from pathlib import PurePosixPath
from typing import Optional
def _safe_extract_tar(tar: tarfile.TarFile, dest: Path) -> None:
"""Safely extract a tar archive into dest.
Protects against path traversal (e.g. entries containing ../).
"""
# Note: tar member names use POSIX separators regardless of platform.
dest = dest.resolve()
for m in tar.getmembers():
name = m.name
# Some tar implementations include a top-level '.' entry when created
# with `tar -C <dir> .`. That's harmless and should be allowed.
if name in {".", "./"}:
continue
# Reject absolute paths and any '..' components up front.
p = PurePosixPath(name)
if p.is_absolute() or ".." in p.parts:
raise RuntimeError(f"Unsafe tar member path: {name}")
# Refuse to extract links or device nodes from an untrusted archive.
# (A symlink can be used to redirect subsequent writes outside dest.)
if m.issym() or m.islnk() or m.isdev():
raise RuntimeError(f"Refusing to extract special tar member: {name}")
member_path = (dest / Path(*p.parts)).resolve()
if member_path != dest and not str(member_path).startswith(str(dest) + os.sep):
raise RuntimeError(f"Unsafe tar member path: {name}")
# Extract members one-by-one after validation.
for m in tar.getmembers():
if m.name in {".", "./"}:
continue
tar.extract(m, path=dest)
def _build_enroll_pyz(tmpdir: Path) -> Path:
    """Build a self-contained enroll zipapp (.pyz) on the local machine.

    The archive is stdlib-only, so the remote host only needs a Python 3
    interpreter to run it.
    """
    import enroll as pkg

    package_dir = Path(pkg.__file__).resolve().parent
    stage = tmpdir / "stage"
    (stage / "enroll").mkdir(parents=True, exist_ok=True)

    def _skip_caches(d: str, names: list[str]) -> set[str]:
        # Keep build/test caches and compiled bytecode out of the archive.
        unwanted = {"__pycache__", ".pytest_cache"}
        return {n for n in names if n in unwanted or n.endswith(".pyc")}

    shutil.copytree(
        package_dir, stage / "enroll", dirs_exist_ok=True, ignore=_skip_caches
    )

    target = tmpdir / "enroll.pyz"
    zipapp.create_archive(
        stage,
        target=target,
        main="enroll.cli:main",
        compressed=True,
    )
    return target
def _ssh_run(ssh, cmd: str) -> tuple[int, str, str]:
"""Run a command over a Paramiko SSHClient."""
_stdin, stdout, stderr = ssh.exec_command(cmd)
out = stdout.read().decode("utf-8", errors="replace")
err = stderr.read().decode("utf-8", errors="replace")
rc = stdout.channel.recv_exit_status()
return rc, out, err
def remote_harvest(
    *,
    local_out_dir: Path,
    remote_host: str,
    remote_port: int = 22,
    remote_user: Optional[str] = None,
    remote_python: str = "python3",
    dangerous: bool = False,
    no_sudo: bool = False,
) -> Path:
    """Run enroll harvest on a remote host via SSH and pull the bundle locally.

    A stdlib-only zipapp of enroll is built locally, uploaded over SFTP and
    executed with *remote_python* (under sudo unless *no_sudo*).  The bundle
    is tarred remotely, downloaded, and safely extracted into *local_out_dir*.

    Returns the local path to state.json inside local_out_dir.
    Raises RuntimeError if paramiko is unavailable or any remote step fails.
    """
    try:
        import paramiko  # type: ignore
    except Exception as e:
        raise RuntimeError(
            "Remote harvesting requires the 'paramiko' package. "
            "Install it with: pip install paramiko"
        ) from e

    local_out_dir = Path(local_out_dir)
    local_out_dir.mkdir(parents=True, exist_ok=True)
    try:
        # Harvest bundles may contain sensitive data: keep them private.
        os.chmod(local_out_dir, 0o700)
    except OSError:
        pass

    # Build a zipapp locally and upload it to the remote.
    with tempfile.TemporaryDirectory(prefix="enroll-remote-") as td:
        td_path = Path(td)
        pyz = _build_enroll_pyz(td_path)
        local_tgz = td_path / "bundle.tgz"

        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        # Default: refuse unknown host keys.
        # Users should add the key to known_hosts.
        ssh.set_missing_host_key_policy(paramiko.RejectPolicy())
        ssh.connect(
            hostname=remote_host,
            port=int(remote_port),
            username=remote_user,
            allow_agent=True,
            look_for_keys=True,
        )
        try:
            sftp = ssh.open_sftp()
            try:
                rc, out, err = _ssh_run(ssh, "mktemp -d")
                if rc != 0:
                    raise RuntimeError(f"Remote mktemp failed: {err.strip()}")
                rtmp = out.strip()
                rapp = f"{rtmp}/enroll.pyz"
                rbundle = f"{rtmp}/bundle"
                rtgz = f"{rtmp}/bundle.tgz"

                sftp.put(str(pyz), rapp)

                # Run remote harvest.
                _cmd = f"{remote_python} {rapp} harvest --out {rbundle}"
                cmd = _cmd if no_sudo else f"sudo {_cmd}"
                if dangerous:
                    cmd += " --dangerous"
                rc, out, err = _ssh_run(ssh, cmd)
                if rc != 0:
                    raise RuntimeError(
                        "Remote harvest failed.\n"
                        f"Command: {cmd}\n"
                        f"Exit code: {rc}\n"
                        f"Stderr: {err.strip()}"
                    )

                if not no_sudo:
                    # Ensure user can read the files, before we tar it.
                    # When no --remote-user was resolved, fall back to the
                    # remote login name via $(id -un) — the old code
                    # interpolated the literal string "None" here.
                    owner = remote_user or '"$(id -un)"'
                    cmd = f"sudo chown -R {owner} {rbundle}"
                    rc, out, err = _ssh_run(ssh, cmd)
                    if rc != 0:
                        raise RuntimeError(
                            "chown of harvest failed.\n"
                            f"Command: {cmd}\n"
                            f"Exit code: {rc}\n"
                            f"Stderr: {err.strip()}"
                        )

                # Tar the bundle for efficient download.
                cmd = f"tar -czf {rtgz} -C {rbundle} ."
                rc, out, err = _ssh_run(ssh, cmd)
                if rc != 0:
                    raise RuntimeError(
                        "Remote tar failed.\n"
                        f"Command: {cmd}\n"
                        f"Exit code: {rc}\n"
                        f"Stderr: {err.strip()}"
                    )

                sftp.get(rtgz, str(local_tgz))

                # Extract into the destination (validated against traversal).
                with tarfile.open(local_tgz, mode="r:gz") as tf:
                    _safe_extract_tar(tf, local_out_dir)

                # Cleanup remote tmpdir (best-effort; ignore failures).
                _ssh_run(ssh, f"rm -rf {rtmp}")
            finally:
                # The previous cleanup masked the in-flight exception with a
                # generic RuntimeError whenever close() failed; the nested
                # finally preserves the original error and still closes both.
                sftp.close()
        finally:
            ssh.close()

    return local_out_dir / "state.json"