enroll/enroll/sopsutil.py
Miguel Jacq a235028f3b
All checks were successful
CI / test (push) Successful in 5m38s
Lint / test (push) Successful in 27s
Trivy / test (push) Successful in 21s
black
2025-12-18 13:34:37 +11:00

137 lines
3.5 KiB
Python

from __future__ import annotations
import os
import shutil
import subprocess # nosec
import tempfile
from pathlib import Path
from typing import Iterable, List, Optional
class SopsError(RuntimeError):
pass
def find_sops_cmd() -> Optional[str]:
"""Return the `sops` executable path if present on PATH."""
return shutil.which("sops")
def require_sops_cmd() -> str:
exe = find_sops_cmd()
if not exe:
raise SopsError(
"--sops was requested but `sops` was not found on PATH. "
"Install sops and ensure it is available as `sops`."
)
return exe
def _pgp_arg(fingerprints: Iterable[str]) -> str:
fps = [f.strip() for f in fingerprints if f and f.strip()]
if not fps:
raise SopsError("No GPG fingerprints provided for --sops")
# sops accepts a comma-separated list for --pgp.
return ",".join(fps)
def encrypt_file_binary(
src_path: Path,
dst_path: Path,
*,
pgp_fingerprints: List[str],
mode: int = 0o600,
) -> None:
"""Encrypt src_path with sops (binary) and write to dst_path atomically."""
sops = require_sops_cmd()
src_path = Path(src_path)
dst_path = Path(dst_path)
dst_path.parent.mkdir(parents=True, exist_ok=True)
res = subprocess.run(
[
sops,
"--encrypt",
"--input-type",
"binary",
"--output-type",
"binary",
"--pgp",
_pgp_arg(pgp_fingerprints),
str(src_path),
],
capture_output=True,
check=False,
) # nosec
if res.returncode != 0:
raise SopsError(
"sops encryption failed:\n"
f" cmd: {sops} --encrypt ... {src_path}\n"
f" rc: {res.returncode}\n"
f" stderr: {res.stderr.decode('utf-8', errors='replace').strip()}"
)
# Write atomically in the destination directory.
fd, tmp = tempfile.mkstemp(prefix=".enroll-sops-", dir=str(dst_path.parent))
try:
with os.fdopen(fd, "wb") as f:
f.write(res.stdout)
try:
os.chmod(tmp, mode)
except OSError:
pass
os.replace(tmp, dst_path)
finally:
try:
os.unlink(tmp)
except FileNotFoundError:
pass
def decrypt_file_binary_to(
src_path: Path,
dst_path: Path,
*,
mode: int = 0o600,
) -> None:
"""Decrypt a sops-encrypted file (binary) into dst_path."""
sops = require_sops_cmd()
src_path = Path(src_path)
dst_path = Path(dst_path)
dst_path.parent.mkdir(parents=True, exist_ok=True)
res = subprocess.run(
[
sops,
"--decrypt",
"--input-type",
"binary",
"--output-type",
"binary",
str(src_path),
],
capture_output=True,
check=False,
) # nosec
if res.returncode != 0:
raise SopsError(
"sops decryption failed:\n"
f" cmd: {sops} --decrypt ... {src_path}\n"
f" rc: {res.returncode}\n"
f" stderr: {res.stderr.decode('utf-8', errors='replace').strip()}"
)
fd, tmp = tempfile.mkstemp(prefix=".enroll-sops-", dir=str(dst_path.parent))
try:
with os.fdopen(fd, "wb") as f:
f.write(res.stdout)
try:
os.chmod(tmp, mode)
except OSError:
pass
os.replace(tmp, dst_path)
finally:
try:
os.unlink(tmp)
except FileNotFoundError:
pass