Add --sops mode to encrypt harvest and manifest data at rest (especially useful if using --dangerous)
Some checks failed
CI / test (push) Successful in 5m35s
Lint / test (push) Failing after 29s
Trivy / test (push) Successful in 18s

This commit is contained in:
Miguel Jacq 2025-12-17 18:51:40 +11:00
parent 6a36a9d2d5
commit 33b1176800
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
12 changed files with 760 additions and 117 deletions

View file

@ -2,6 +2,8 @@ from __future__ import annotations
import argparse
import os
import tarfile
import tempfile
from pathlib import Path
from typing import Optional
@ -9,6 +11,56 @@ from .cache import new_harvest_cache_dir
from .harvest import harvest
from .manifest import manifest
from .remote import remote_harvest
from .sopsutil import SopsError, encrypt_file_binary
def _resolve_sops_out_file(out: Optional[str], *, hint: str) -> Path:
"""Resolve an output *file* path for --sops mode.
If `out` looks like a directory (or points to an existing directory), we
place the encrypted harvest inside it as harvest.tar.gz.sops.
"""
if out:
p = Path(out).expanduser()
if p.exists() and p.is_dir():
return p / "harvest.tar.gz.sops"
# Heuristic: treat paths with a suffix as files; otherwise directories.
if p.suffix:
return p
return p / "harvest.tar.gz.sops"
# Default: use a secure cache directory.
d = new_harvest_cache_dir(hint=hint).dir
return d / "harvest.tar.gz.sops"
def _tar_dir_to(path_dir: Path, tar_path: Path) -> None:
tar_path.parent.mkdir(parents=True, exist_ok=True)
with tarfile.open(tar_path, mode="w:gz") as tf:
# Keep a stable on-disk layout when extracted: state.json + artifacts/
tf.add(str(path_dir), arcname=".")
def _encrypt_harvest_dir_to_sops(
bundle_dir: Path, out_file: Path, fps: list[str]
) -> Path:
out_file = Path(out_file)
out_file.parent.mkdir(parents=True, exist_ok=True)
# Create the tarball alongside the output file (keeps filesystem permissions/locality sane).
fd, tmp_tgz = tempfile.mkstemp(
prefix=".enroll-harvest-", suffix=".tar.gz", dir=str(out_file.parent)
)
os.close(fd)
try:
_tar_dir_to(bundle_dir, Path(tmp_tgz))
encrypt_file_binary(Path(tmp_tgz), out_file, pgp_fingerprints=fps, mode=0o600)
finally:
try:
os.unlink(tmp_tgz)
except FileNotFoundError:
pass
return out_file
def _add_common_manifest_args(p: argparse.ArgumentParser) -> None:
@ -60,12 +112,27 @@ def main() -> None:
sub = ap.add_subparsers(dest="cmd", required=True)
h = sub.add_parser("harvest", help="Harvest service/package/config state")
h.add_argument("--out", help="Harvest output directory")
h.add_argument(
"--out",
help=(
"Harvest output directory. If --sops is set, this may be either a directory "
"(an encrypted file named harvest.tar.gz.sops will be created inside) or a file path."
),
)
h.add_argument(
"--dangerous",
action="store_true",
help="Collect files more aggressively (may include secrets). Disables secret-avoidance checks.",
)
h.add_argument(
"--sops",
nargs="+",
metavar="GPG_FINGERPRINT",
help=(
"Encrypt the harvest output as a SOPS-encrypted tarball using the given GPG fingerprint(s). "
"Requires `sops` on PATH."
),
)
h.add_argument(
"--no-sudo",
action="store_true",
@ -77,24 +144,56 @@ def main() -> None:
m.add_argument(
"--harvest",
required=True,
help="Path to the directory created by the harvest command",
help=(
"Path to the directory created by the harvest command, or (with --sops) "
"a SOPS-encrypted harvest tarball."
),
)
m.add_argument(
"--out",
required=True,
help="Output directory for generated roles/playbook Ansible manifest",
help=(
"Output location for the generated manifest. In plain mode this is a directory. "
"In --sops mode this may be either a directory (an encrypted file named manifest.tar.gz.sops will be created inside) "
"or a file path."
),
)
m.add_argument(
"--sops",
nargs="+",
metavar="GPG_FINGERPRINT",
help=(
"In --sops mode, decrypt the harvest using `sops -d` (if the harvest is an encrypted file) "
"and then bundle+encrypt the entire generated manifest output into a single SOPS-encrypted tarball "
"(binary) using the given GPG fingerprint(s). Requires `sops` on PATH."
),
)
_add_common_manifest_args(m)
s = sub.add_parser(
"single-shot", help="Harvest state, then manifest Ansible code, in one shot"
)
s.add_argument("--harvest", help="Path to the directory to place the harvest in")
s.add_argument(
"--harvest",
help=(
"Where to place the harvest. In plain mode this is a directory; in --sops mode this may be "
"a directory or a file path (an encrypted file is produced)."
),
)
s.add_argument(
"--dangerous",
action="store_true",
help="Collect files more aggressively (may include secrets). Disables secret-avoidance checks.",
)
s.add_argument(
"--sops",
nargs="+",
metavar="GPG_FINGERPRINT",
help=(
"Encrypt the harvest as a SOPS-encrypted tarball, and bundle+encrypt the manifest output in --out "
"(same behavior as `harvest --sops` and `manifest --sops`)."
),
)
s.add_argument(
"--no-sudo",
action="store_true",
@ -103,7 +202,11 @@ def main() -> None:
s.add_argument(
"--out",
required=True,
help="Output directory for generated roles/playbook Ansible manifest",
help=(
"Output location for the generated manifest. In plain mode this is a directory. "
"In --sops mode this may be either a directory (an encrypted file named manifest.tar.gz.sops will be created inside) "
"or a file path."
),
)
_add_common_manifest_args(s)
_add_remote_args(s)
@ -112,54 +215,169 @@ def main() -> None:
remote_host: Optional[str] = getattr(args, "remote_host", None)
if args.cmd == "harvest":
if remote_host:
out_dir = (
Path(args.out)
if args.out
else new_harvest_cache_dir(hint=remote_host).dir
try:
if args.cmd == "harvest":
sops_fps = getattr(args, "sops", None)
if remote_host:
if sops_fps:
out_file = _resolve_sops_out_file(args.out, hint=remote_host)
with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td:
tmp_bundle = Path(td) / "bundle"
tmp_bundle.mkdir(parents=True, exist_ok=True)
try:
os.chmod(tmp_bundle, 0o700)
except OSError:
pass
remote_harvest(
local_out_dir=tmp_bundle,
remote_host=remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
no_sudo=bool(args.no_sudo),
)
_encrypt_harvest_dir_to_sops(
tmp_bundle, out_file, list(sops_fps)
)
print(str(out_file))
else:
out_dir = (
Path(args.out)
if args.out
else new_harvest_cache_dir(hint=remote_host).dir
)
state = remote_harvest(
local_out_dir=out_dir,
remote_host=remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
no_sudo=bool(args.no_sudo),
)
print(str(state))
else:
if sops_fps:
out_file = _resolve_sops_out_file(args.out, hint="local")
with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td:
tmp_bundle = Path(td) / "bundle"
tmp_bundle.mkdir(parents=True, exist_ok=True)
try:
os.chmod(tmp_bundle, 0o700)
except OSError:
pass
harvest(str(tmp_bundle), dangerous=bool(args.dangerous))
_encrypt_harvest_dir_to_sops(
tmp_bundle, out_file, list(sops_fps)
)
print(str(out_file))
else:
if not args.out:
raise SystemExit(
"error: --out is required unless --remote-host is set"
)
path = harvest(args.out, dangerous=bool(args.dangerous))
print(path)
elif args.cmd == "manifest":
out_enc = manifest(
args.harvest,
args.out,
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
sops_fingerprints=getattr(args, "sops", None),
)
state = remote_harvest(
local_out_dir=out_dir,
remote_host=remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
no_sudo=bool(args.no_sudo),
)
print(str(state))
else:
if not args.out:
raise SystemExit("error: --out is required unless --remote-host is set")
path = harvest(args.out, dangerous=bool(args.dangerous))
print(path)
elif args.cmd == "manifest":
manifest(args.harvest, args.out, fqdn=args.fqdn, jinjaturtle=_jt_mode(args))
elif args.cmd == "single-shot":
if remote_host:
harvest_dir = (
Path(args.harvest)
if args.harvest
else new_harvest_cache_dir(hint=remote_host).dir
)
remote_harvest(
local_out_dir=harvest_dir,
remote_host=remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
no_sudo=bool(args.no_sudo),
)
manifest(
str(harvest_dir), args.out, fqdn=args.fqdn, jinjaturtle=_jt_mode(args)
)
# For usability (when --harvest wasn't provided), print the harvest path.
if not args.harvest:
print(str(harvest_dir / "state.json"))
else:
if not args.harvest:
raise SystemExit(
"error: --harvest is required unless --remote-host is set"
)
harvest(args.harvest, dangerous=bool(args.dangerous))
manifest(args.harvest, args.out, fqdn=args.fqdn, jinjaturtle=_jt_mode(args))
if getattr(args, "sops", None) and out_enc:
print(str(out_enc))
elif args.cmd == "single-shot":
sops_fps = getattr(args, "sops", None)
if remote_host:
if sops_fps:
out_file = _resolve_sops_out_file(args.harvest, hint=remote_host)
with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td:
tmp_bundle = Path(td) / "bundle"
tmp_bundle.mkdir(parents=True, exist_ok=True)
try:
os.chmod(tmp_bundle, 0o700)
except OSError:
pass
remote_harvest(
local_out_dir=tmp_bundle,
remote_host=remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
no_sudo=bool(args.no_sudo),
)
_encrypt_harvest_dir_to_sops(
tmp_bundle, out_file, list(sops_fps)
)
manifest(
str(out_file),
args.out,
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
sops_fingerprints=list(sops_fps),
)
if not args.harvest:
print(str(out_file))
else:
harvest_dir = (
Path(args.harvest)
if args.harvest
else new_harvest_cache_dir(hint=remote_host).dir
)
remote_harvest(
local_out_dir=harvest_dir,
remote_host=remote_host,
remote_port=int(args.remote_port),
remote_user=args.remote_user,
dangerous=bool(args.dangerous),
no_sudo=bool(args.no_sudo),
)
manifest(
str(harvest_dir),
args.out,
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
)
# For usability (when --harvest wasn't provided), print the harvest path.
if not args.harvest:
print(str(harvest_dir / "state.json"))
else:
if sops_fps:
out_file = _resolve_sops_out_file(args.harvest, hint="local")
with tempfile.TemporaryDirectory(prefix="enroll-harvest-") as td:
tmp_bundle = Path(td) / "bundle"
tmp_bundle.mkdir(parents=True, exist_ok=True)
try:
os.chmod(tmp_bundle, 0o700)
except OSError:
pass
harvest(str(tmp_bundle), dangerous=bool(args.dangerous))
_encrypt_harvest_dir_to_sops(
tmp_bundle, out_file, list(sops_fps)
)
manifest(
str(out_file),
args.out,
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
sops_fingerprints=list(sops_fps),
)
if not args.harvest:
print(str(out_file))
else:
if not args.harvest:
raise SystemExit(
"error: --harvest is required unless --remote-host is set"
)
harvest(args.harvest, dangerous=bool(args.dangerous))
manifest(
args.harvest,
args.out,
fqdn=args.fqdn,
jinjaturtle=_jt_mode(args),
)
except SopsError as e:
raise SystemExit(f"error: {e}")

View file

@ -73,6 +73,10 @@ class IgnorePolicy:
yield raw
def deny_reason(self, path: str) -> Optional[str]:
# Always ignore plain *.log files (rarely useful as config, often noisy).
if path.endswith(".log"):
return "log_file"
if not self.dangerous:
for g in self.deny_globs or []:
if fnmatch.fnmatch(path, g):

View file

@ -1,6 +1,5 @@
from __future__ import annotations
import re
import shutil
import subprocess # nosec
import tempfile

View file

@ -4,6 +4,7 @@ import json
import os
import shutil
import stat
import tarfile
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
@ -14,9 +15,12 @@ from .jinjaturtle import (
run_jinjaturtle,
)
JINJATURTLE_BEGIN = "# BEGIN JINJATURTLE (generated by enroll)"
JINJATURTLE_END = "# END JINJATURTLE"
from .remote import _safe_extract_tar
from .sopsutil import (
decrypt_file_binary_to,
encrypt_file_binary,
require_sops_cmd,
)
def _try_yaml():
@ -85,24 +89,6 @@ def _merge_mappings_overwrite(
return merged
def _normalise_jinjaturtle_vars_text(vars_text: str) -> str:
"""Deduplicate keys in a vars fragment by parsing as YAML and dumping it back."""
m = _yaml_load_mapping(vars_text)
if not m:
# if YAML isn't available or parsing failed, return raw text (best-effort)
return vars_text.rstrip() + (
"\n" if vars_text and not vars_text.endswith("\n") else ""
)
return _yaml_dump_mapping(m, sort_keys=True)
def _yaml_list(items: List[str], indent: int = 2) -> str:
pad = " " * indent
if not items:
return f"{pad}[]"
return "\n".join(f"{pad}- {x}" for x in items)
def _copy2_replace(src: str, dst: str) -> None:
dst_dir = os.path.dirname(dst)
os.makedirs(dst_dir, exist_ok=True)
@ -349,23 +335,6 @@ def _jinjify_managed_files(
return templated, ""
def _defaults_with_jinjaturtle(base_defaults: str, vars_text: str) -> str:
if not vars_text.strip():
return base_defaults.rstrip() + "\n"
vars_text = _normalise_jinjaturtle_vars_text(vars_text)
# Always regenerate the block (we regenerate whole defaults files anyway)
return (
base_defaults.rstrip()
+ "\n\n"
+ JINJATURTLE_BEGIN
+ "\n"
+ vars_text.rstrip()
+ "\n"
+ JINJATURTLE_END
+ "\n"
)
def _write_role_defaults(role_dir: str, mapping: Dict[str, Any]) -> None:
"""Overwrite role defaults/main.yml with the provided mapping."""
defaults_path = os.path.join(role_dir, "defaults", "main.yml")
@ -499,7 +468,153 @@ def _render_generic_files_tasks(
"""
def manifest(
def _prepare_bundle_dir(
bundle: str,
*,
sops_mode: bool,
) -> tuple[str, Optional[tempfile.TemporaryDirectory]]:
"""Return (bundle_dir, tempdir).
- In non-sops mode, `bundle` must be a directory.
- In sops mode, `bundle` may be a directory (already-decrypted) *or*
a SOPS-encrypted tarball. In the tarball case we decrypt+extract into
a secure temp directory.
"""
p = Path(bundle).expanduser()
if p.is_dir():
return str(p), None
if not sops_mode:
raise RuntimeError(f"Harvest path is not a directory: {p}")
if not p.exists():
raise RuntimeError(f"Harvest path not found: {p}")
# Ensure sops is available early for clear error messages.
require_sops_cmd()
td = tempfile.TemporaryDirectory(prefix="enroll-harvest-")
td_path = Path(td.name)
try:
os.chmod(td_path, 0o700)
except OSError:
pass
tar_path = td_path / "harvest.tar.gz"
out_dir = td_path / "bundle"
out_dir.mkdir(parents=True, exist_ok=True)
try:
os.chmod(out_dir, 0o700)
except OSError:
pass
decrypt_file_binary_to(p, tar_path, mode=0o600)
# Extract using the same safe extraction rules as remote harvesting.
with tarfile.open(tar_path, mode="r:gz") as tf:
_safe_extract_tar(tf, out_dir)
return str(out_dir), td
def _resolve_sops_manifest_out_file(out: str) -> Path:
"""Resolve an output *file* path for manifest --sops mode.
If `out` looks like a directory (or points to an existing directory), we
place the encrypted manifest bundle inside it as manifest.tar.gz.sops.
"""
p = Path(out).expanduser()
if p.exists() and p.is_dir():
return p / "manifest.tar.gz.sops"
# Heuristic: treat paths with a suffix as files; otherwise directories.
if p.suffix:
return p
return p / "manifest.tar.gz.sops"
def _tar_dir_to_with_progress(
src_dir: Path, tar_path: Path, *, desc: str = "tarring"
) -> None:
"""Create a tar.gz of src_dir at tar_path, with a simple per-entry progress display."""
src_dir = Path(src_dir)
tar_path = Path(tar_path)
tar_path.parent.mkdir(parents=True, exist_ok=True)
# Collect paths (dirs + files)
paths: list[Path] = [src_dir]
for root, dirs, files in os.walk(str(src_dir)):
root_p = Path(root)
for d in sorted(dirs):
paths.append(root_p / d)
for f in sorted(files):
paths.append(root_p / f)
total = len(paths)
is_tty = hasattr(os, "isatty") and os.isatty(2)
def _print_progress(i: int, p: Path) -> None:
if not is_tty:
return
pct = (i / total * 100.0) if total else 100.0
rel = "."
try:
rel = str(p.relative_to(src_dir))
except Exception:
rel = str(p)
msg = f"{desc}: {i}/{total} ({pct:5.1f}%) {rel}"
try:
cols = shutil.get_terminal_size((80, 20)).columns
msg = msg[: cols - 1]
except Exception:
pass
os.write(2, ("\r" + msg).encode("utf-8", errors="replace"))
with tarfile.open(tar_path, mode="w:gz") as tf:
prefix = Path("manifest")
for i, p in enumerate(paths, start=1):
if p == src_dir:
arcname = str(prefix)
else:
rel = p.relative_to(src_dir)
arcname = str(prefix / rel)
tf.add(str(p), arcname=arcname, recursive=False)
_print_progress(i, p)
if is_tty:
os.write(2, b"\n")
def _encrypt_manifest_out_dir_to_sops(
out_dir: Path, out_file: Path, fps: list[str]
) -> Path:
"""Tar+encrypt the generated manifest output directory into a single .sops file."""
require_sops_cmd()
out_file = Path(out_file)
out_file.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_tgz = tempfile.mkstemp(
prefix=".enroll-manifest-",
suffix=".tar.gz",
dir=str(out_file.parent),
)
os.close(fd)
try:
_tar_dir_to_with_progress(
Path(out_dir), Path(tmp_tgz), desc="Bundling manifest"
)
encrypt_file_binary(Path(tmp_tgz), out_file, pgp_fingerprints=fps, mode=0o600)
finally:
try:
os.unlink(tmp_tgz)
except FileNotFoundError:
pass
return out_file
def _manifest_from_bundle_dir(
bundle_dir: str,
out_dir: str,
*,
@ -1204,3 +1319,69 @@ Generated for package `{pkg}`.
)
else:
_write_playbook_all(os.path.join(out_dir, "playbook.yml"), all_roles)
def manifest(
bundle_dir: str,
out: str,
*,
fqdn: Optional[str] = None,
jinjaturtle: str = "auto", # auto|on|off
sops_fingerprints: Optional[List[str]] = None,
) -> Optional[str]:
"""Render an Ansible manifest from a harvest.
Plain mode:
- `bundle_dir` must be a directory
- `out` is a directory written in-place
SOPS mode (when `sops_fingerprints` is provided):
- `bundle_dir` may be either a directory (already decrypted) or a SOPS
encrypted tarball (binary) produced by `harvest --sops`
- the manifest output is bundled (tar.gz) and encrypted into a single
SOPS file (binary) at the resolved output path.
Returns:
- In SOPS mode: the path to the encrypted manifest bundle (.sops)
- In plain mode: None
"""
sops_mode = bool(sops_fingerprints)
# Decrypt/extract the harvest bundle if needed.
resolved_bundle_dir, td_bundle = _prepare_bundle_dir(
bundle_dir, sops_mode=sops_mode
)
td_out: Optional[tempfile.TemporaryDirectory] = None
try:
if not sops_mode:
_manifest_from_bundle_dir(
resolved_bundle_dir, out, fqdn=fqdn, jinjaturtle=jinjaturtle
)
return None
# SOPS mode: generate into a secure temp dir, then tar+encrypt into a single file.
out_file = _resolve_sops_manifest_out_file(out)
td_out = tempfile.TemporaryDirectory(prefix="enroll-manifest-")
tmp_out = Path(td_out.name) / "out"
tmp_out.mkdir(parents=True, exist_ok=True)
try:
os.chmod(tmp_out, 0o700)
except OSError:
pass
_manifest_from_bundle_dir(
resolved_bundle_dir, str(tmp_out), fqdn=fqdn, jinjaturtle=jinjaturtle
)
enc = _encrypt_manifest_out_dir_to_sops(
tmp_out, out_file, list(sops_fingerprints or [])
)
return str(enc)
finally:
if td_out is not None:
td_out.cleanup()
if td_bundle is not None:
td_bundle.cleanup()

View file

@ -138,15 +138,29 @@ def remote_harvest(
look_for_keys=True,
)
# If no username was explicitly provided, SSH may have selected a default.
# We need a concrete username for the (sudo) chown step below.
resolved_user = remote_user
if not resolved_user:
rc, out, err = _ssh_run(ssh, "id -un")
if rc == 0 and out.strip():
resolved_user = out.strip()
sftp = ssh.open_sftp()
rtmp: Optional[str] = None
try:
rc, out, err = _ssh_run(ssh, "mktemp -d")
if rc != 0:
raise RuntimeError(f"Remote mktemp failed: {err.strip()}")
rtmp = out.strip()
# Be explicit: restrict the remote staging area to the current user.
rc, out, err = _ssh_run(ssh, f"chmod 700 {rtmp}")
if rc != 0:
raise RuntimeError(f"Remote chmod failed: {err.strip()}")
rapp = f"{rtmp}/enroll.pyz"
rbundle = f"{rtmp}/bundle"
rtgz = f"{rtmp}/bundle.tgz"
sftp.put(str(pyz), rapp)
@ -169,7 +183,12 @@ def remote_harvest(
if not no_sudo:
# Ensure user can read the files, before we tar it
cmd = f"sudo chown -R {remote_user} {rbundle}"
if not resolved_user:
raise RuntimeError(
"Unable to determine remote username for chown. "
"Pass --remote-user explicitly or use --no-sudo."
)
cmd = f"sudo chown -R {resolved_user} {rbundle}"
rc, out, err = _ssh_run(ssh, cmd)
if rc != 0:
raise RuntimeError(
@ -179,26 +198,33 @@ def remote_harvest(
f"Stderr: {err.strip()}"
)
# Tar the bundle for efficient download.
cmd = f"tar -czf {rtgz} -C {rbundle} ."
rc, out, err = _ssh_run(ssh, cmd)
# Stream a tarball back to the local machine (avoid creating a tar file on the remote).
cmd = f"tar -cz -C {rbundle} ."
_stdin, stdout, stderr = ssh.exec_command(cmd)
with open(local_tgz, "wb") as f:
while True:
chunk = stdout.read(1024 * 128)
if not chunk:
break
f.write(chunk)
rc = stdout.channel.recv_exit_status()
err_text = stderr.read().decode("utf-8", errors="replace")
if rc != 0:
raise RuntimeError(
"Remote tar failed.\n"
"Remote tar stream failed.\n"
f"Command: {cmd}\n"
f"Exit code: {rc}\n"
f"Stderr: {err.strip()}"
f"Stderr: {err_text.strip()}"
)
sftp.get(rtgz, str(local_tgz))
# Extract into the destination.
with tarfile.open(local_tgz, mode="r:gz") as tf:
_safe_extract_tar(tf, local_out_dir)
# Cleanup remote tmpdir.
_ssh_run(ssh, f"rm -rf {rtmp}")
finally:
# Cleanup remote tmpdir even on failure.
if rtmp:
_ssh_run(ssh, f"rm -rf {rtmp}")
try:
sftp.close()
ssh.close()

137
enroll/sopsutil.py Normal file
View file

@ -0,0 +1,137 @@
from __future__ import annotations
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Iterable, List, Optional
class SopsError(RuntimeError):
pass
def find_sops_cmd() -> Optional[str]:
"""Return the `sops` executable path if present on PATH."""
return shutil.which("sops")
def require_sops_cmd() -> str:
exe = find_sops_cmd()
if not exe:
raise SopsError(
"--sops was requested but `sops` was not found on PATH. "
"Install sops and ensure it is available as `sops`."
)
return exe
def _pgp_arg(fingerprints: Iterable[str]) -> str:
fps = [f.strip() for f in fingerprints if f and f.strip()]
if not fps:
raise SopsError("No GPG fingerprints provided for --sops")
# sops accepts a comma-separated list for --pgp.
return ",".join(fps)
def encrypt_file_binary(
src_path: Path,
dst_path: Path,
*,
pgp_fingerprints: List[str],
mode: int = 0o600,
) -> None:
"""Encrypt src_path with sops (binary) and write to dst_path atomically."""
sops = require_sops_cmd()
src_path = Path(src_path)
dst_path = Path(dst_path)
dst_path.parent.mkdir(parents=True, exist_ok=True)
res = subprocess.run(
[
sops,
"--encrypt",
"--input-type",
"binary",
"--output-type",
"binary",
"--pgp",
_pgp_arg(pgp_fingerprints),
str(src_path),
],
capture_output=True,
check=False,
)
if res.returncode != 0:
raise SopsError(
"sops encryption failed:\n"
f" cmd: {sops} --encrypt ... {src_path}\n"
f" rc: {res.returncode}\n"
f" stderr: {res.stderr.decode('utf-8', errors='replace').strip()}"
)
# Write atomically in the destination directory.
fd, tmp = tempfile.mkstemp(prefix=".enroll-sops-", dir=str(dst_path.parent))
try:
with os.fdopen(fd, "wb") as f:
f.write(res.stdout)
try:
os.chmod(tmp, mode)
except OSError:
pass
os.replace(tmp, dst_path)
finally:
try:
os.unlink(tmp)
except FileNotFoundError:
pass
def decrypt_file_binary_to(
src_path: Path,
dst_path: Path,
*,
mode: int = 0o600,
) -> None:
"""Decrypt a sops-encrypted file (binary) into dst_path."""
sops = require_sops_cmd()
src_path = Path(src_path)
dst_path = Path(dst_path)
dst_path.parent.mkdir(parents=True, exist_ok=True)
res = subprocess.run(
[
sops,
"--decrypt",
"--input-type",
"binary",
"--output-type",
"binary",
str(src_path),
],
capture_output=True,
check=False,
)
if res.returncode != 0:
raise SopsError(
"sops decryption failed:\n"
f" cmd: {sops} --decrypt ... {src_path}\n"
f" rc: {res.returncode}\n"
f" stderr: {res.stderr.decode('utf-8', errors='replace').strip()}"
)
fd, tmp = tempfile.mkstemp(prefix=".enroll-sops-", dir=str(dst_path.parent))
try:
with os.fdopen(fd, "wb") as f:
f.write(res.stdout)
try:
os.chmod(tmp, mode)
except OSError:
pass
os.replace(tmp, dst_path)
finally:
try:
os.unlink(tmp)
except FileNotFoundError:
pass