Support for remote hosts that require password for sudo.
Some checks failed
Lint / test (push) Waiting to run
Trivy / test (push) Waiting to run
CI / test (push) Has been cancelled

Introduce --ask-become-pass or -K to support password-required sudo on remote hosts, just like Ansible.

It will also fall back to this prompt if a password is required but the arg wasn't passed in.

With thanks to slhck from HN for the initial patch, advice and feedback.
This commit is contained in:
Miguel Jacq 2026-01-04 20:49:10 +11:00
parent 9df4dc862d
commit a2be708a31
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
4 changed files with 678 additions and 31 deletions

View file

@ -13,7 +13,7 @@ from .cache import new_harvest_cache_dir
from .diff import compare_harvests, format_report, post_webhook, send_email
from .harvest import harvest
from .manifest import manifest
from .remote import remote_harvest
from .remote import remote_harvest, RemoteSudoPasswordRequired
from .sopsutil import SopsError, encrypt_file_binary
from .version import get_enroll_version
@ -352,6 +352,17 @@ def _add_remote_args(p: argparse.ArgumentParser) -> None:
help="SSH username for --remote-host (default: local $USER).",
)
# Align terminology with Ansible: "become" == sudo.
p.add_argument(
"--ask-become-pass",
"-K",
action="store_true",
help=(
"Prompt for the remote sudo (become) password when using --remote-host "
"(similar to ansible --ask-become-pass)."
),
)
def main() -> None:
ap = argparse.ArgumentParser(prog="enroll")
@ -623,6 +634,7 @@ def main() -> None:
except OSError:
pass
remote_harvest(
ask_become_pass=args.ask_become_pass,
local_out_dir=tmp_bundle,
remote_host=args.remote_host,
remote_port=int(args.remote_port),
@ -643,6 +655,7 @@ def main() -> None:
else new_harvest_cache_dir(hint=args.remote_host).dir
)
state = remote_harvest(
ask_become_pass=args.ask_become_pass,
local_out_dir=out_dir,
remote_host=args.remote_host,
remote_port=int(args.remote_port),
@ -769,6 +782,7 @@ def main() -> None:
except OSError:
pass
remote_harvest(
ask_become_pass=args.ask_become_pass,
local_out_dir=tmp_bundle,
remote_host=args.remote_host,
remote_port=int(args.remote_port),
@ -798,6 +812,7 @@ def main() -> None:
else new_harvest_cache_dir(hint=args.remote_host).dir
)
remote_harvest(
ask_become_pass=args.ask_become_pass,
local_out_dir=harvest_dir,
remote_host=args.remote_host,
remote_port=int(args.remote_port),
@ -912,5 +927,11 @@ def main() -> None:
if getattr(args, "exit_code", False) and has_changes:
raise SystemExit(2)
except RemoteSudoPasswordRequired:
raise SystemExit(
"error: remote sudo requires a password. Re-run with --ask-become-pass."
) from None
except RuntimeError as e:
raise SystemExit(f"error: {e}") from None
except SopsError as e:
raise SystemExit(f"error: {e}")
raise SystemExit(f"error: {e}") from None

View file

@ -1,14 +1,117 @@
from __future__ import annotations
import getpass
import os
import shlex
import shutil
import sys
import time
import tarfile
import tempfile
import zipapp
from pathlib import Path
from pathlib import PurePosixPath
from typing import Optional
from typing import Optional, Callable, TextIO
class RemoteSudoPasswordRequired(RuntimeError):
"""Raised when sudo requires a password but none was provided."""
def _sudo_password_required(out: str, err: str) -> bool:
"""Return True if sudo output indicates it needs a password/TTY."""
blob = (out + "\n" + err).lower()
patterns = (
"a password is required",
"password is required",
"a terminal is required to read the password",
"no tty present and no askpass program specified",
"must have a tty to run sudo",
"sudo: sorry, you must have a tty",
"askpass",
)
return any(p in blob for p in patterns)
def _sudo_not_permitted(out: str, err: str) -> bool:
"""Return True if sudo output indicates the user cannot sudo at all."""
blob = (out + "\n" + err).lower()
patterns = (
"is not in the sudoers file",
"not allowed to execute",
"may not run sudo",
"sorry, user",
)
return any(p in blob for p in patterns)
def _sudo_tty_required(out: str, err: str) -> bool:
"""Return True if sudo output indicates it requires a TTY (sudoers requiretty)."""
blob = (out + "\n" + err).lower()
patterns = (
"must have a tty",
"sorry, you must have a tty",
"sudo: sorry, you must have a tty",
"must have a tty to run sudo",
)
return any(p in blob for p in patterns)
def _resolve_become_password(
ask_become_pass: bool,
*,
prompt: str = "sudo password: ",
getpass_fn: Callable[[str], str] = getpass.getpass,
) -> Optional[str]:
if ask_become_pass:
return getpass_fn(prompt)
return None
def remote_harvest(
*,
ask_become_pass: bool = False,
no_sudo: bool = False,
prompt: str = "sudo password: ",
getpass_fn: Optional[Callable[[str], str]] = None,
stdin: Optional[TextIO] = None,
**kwargs,
):
"""Call _remote_harvest, with a safe sudo password fallback.
Behavior:
- Run without a password unless --ask-become-pass is set.
- If the remote sudo policy requires a password and none was provided,
prompt and retry when running interactively.
"""
# Resolve defaults at call time (easier to test/monkeypatch, and avoids capturing
# sys.stdin / getpass.getpass at import time).
if getpass_fn is None:
getpass_fn = getpass.getpass
if stdin is None:
stdin = sys.stdin
sudo_password = _resolve_become_password(
ask_become_pass and not no_sudo,
prompt=prompt,
getpass_fn=getpass_fn,
)
try:
return _remote_harvest(sudo_password=sudo_password, no_sudo=no_sudo, **kwargs)
except RemoteSudoPasswordRequired:
if sudo_password is not None:
raise
# Fallback prompt if interactive
if stdin is not None and getattr(stdin, "isatty", lambda: False)():
pw = getpass_fn(prompt)
return _remote_harvest(sudo_password=pw, no_sudo=no_sudo, **kwargs)
raise RemoteSudoPasswordRequired(
"Remote sudo requires a password. Re-run with --ask-become-pass."
)
def _safe_extract_tar(tar: tarfile.TarFile, dest: Path) -> None:
@ -79,7 +182,14 @@ def _build_enroll_pyz(tmpdir: Path) -> Path:
return pyz_path
def _ssh_run(ssh, cmd: str, *, get_pty: bool = False) -> tuple[int, str, str]:
def _ssh_run(
ssh,
cmd: str,
*,
get_pty: bool = False,
stdin_text: Optional[str] = None,
close_stdin: bool = False,
) -> tuple[int, str, str]:
"""Run a command over a Paramiko SSHClient.
Paramiko's exec_command runs commands without a TTY by default.
@ -90,14 +200,133 @@ def _ssh_run(ssh, cmd: str, *, get_pty: bool = False) -> tuple[int, str, str]:
We do not request a PTY for commands that stream binary data
(e.g. tar/gzip output), as a PTY can corrupt the byte stream.
"""
_stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=get_pty)
out = stdout.read().decode("utf-8", errors="replace")
err = stderr.read().decode("utf-8", errors="replace")
rc = stdout.channel.recv_exit_status()
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=get_pty)
# All three file-like objects share the same underlying Channel.
chan = stdout.channel
if stdin_text is not None and stdin is not None:
try:
stdin.write(stdin_text)
stdin.flush()
except Exception:
# If the remote side closed stdin early, ignore.
pass # nosec
finally:
if close_stdin:
# For sudo -S, a wrong password causes sudo to re-prompt and wait
# forever for more input. We try hard to deliver EOF so sudo can
# fail fast.
try:
chan.shutdown_write() # sends EOF to the remote process
except Exception:
pass # nosec
try:
stdin.close()
except Exception:
pass # nosec
# Read incrementally to avoid blocking forever on stdout.read()/stderr.read()
# if the remote process is waiting for more input (e.g. sudo password retry).
out_chunks: list[bytes] = []
err_chunks: list[bytes] = []
# Keep a small tail of stderr to detect sudo retry messages without
# repeatedly joining potentially large buffers.
err_tail = b""
while True:
progressed = False
if chan.recv_ready():
out_chunks.append(chan.recv(1024 * 64))
progressed = True
if chan.recv_stderr_ready():
chunk = chan.recv_stderr(1024 * 64)
err_chunks.append(chunk)
err_tail = (err_tail + chunk)[-4096:]
progressed = True
# If we just attempted sudo -S with a single password line and sudo is
# asking again, detect it and stop waiting.
if close_stdin and stdin_text is not None:
blob = err_tail.lower()
if b"sorry, try again" in blob or b"incorrect password" in blob:
try:
chan.close()
except Exception:
pass # nosec
break
# Exit once the process has exited and we have drained the buffers.
if (
chan.exit_status_ready()
and not chan.recv_ready()
and not chan.recv_stderr_ready()
):
break
if not progressed:
time.sleep(0.05)
out = b"".join(out_chunks).decode("utf-8", errors="replace")
err = b"".join(err_chunks).decode("utf-8", errors="replace")
rc = chan.recv_exit_status() if chan.exit_status_ready() else 1
return rc, out, err
def remote_harvest(
def _ssh_run_sudo(
ssh,
cmd: str,
*,
sudo_password: Optional[str] = None,
get_pty: bool = True,
) -> tuple[int, str, str]:
"""Run cmd via sudo with a safe non-interactive-first strategy.
Strategy:
1) Try `sudo -n`.
2) If sudo reports a password is required and we have one, retry with
`sudo -S` and feed it via stdin.
3) If sudo reports a password is required and we *don't* have one, raise
RemoteSudoPasswordRequired.
We avoid requesting a PTY unless the remote sudo policy requires it.
This makes sudo -S behavior more reliable (wrong passwords fail fast
instead of blocking on a PTY).
"""
cmd_n = f"sudo -n -p '' -- {cmd}"
# First try: never prompt, and prefer no PTY.
rc, out, err = _ssh_run(ssh, cmd_n, get_pty=False)
need_pty = False
# Some sudoers configurations require a TTY even for passwordless sudo.
if get_pty and rc != 0 and _sudo_tty_required(out, err):
need_pty = True
rc, out, err = _ssh_run(ssh, cmd_n, get_pty=True)
if rc == 0:
return rc, out, err
if _sudo_not_permitted(out, err):
return rc, out, err
if _sudo_password_required(out, err):
if sudo_password is None:
raise RemoteSudoPasswordRequired(
"Remote sudo requires a password, but none was provided."
)
cmd_s = f"sudo -S -p '' -- {cmd}"
return _ssh_run(
ssh,
cmd_s,
get_pty=need_pty,
stdin_text=str(sudo_password) + "\n",
close_stdin=True,
)
return rc, out, err
def _remote_harvest(
*,
local_out_dir: Path,
remote_host: str,
@ -106,6 +335,7 @@ def remote_harvest(
remote_python: str = "python3",
dangerous: bool = False,
no_sudo: bool = False,
sudo_password: Optional[str] = None,
include_paths: Optional[list[str]] = None,
exclude_paths: Optional[list[str]] = None,
) -> Path:
@ -190,10 +420,15 @@ def remote_harvest(
argv.extend(["--exclude-path", str(p)])
_cmd = " ".join(map(shlex.quote, argv))
cmd = f"sudo {_cmd}" if not no_sudo else _cmd
# PTY for sudo commands (helps sudoers requiretty).
rc, out, err = _ssh_run(ssh, cmd, get_pty=(not no_sudo))
if not no_sudo:
# Prefer non-interactive sudo first; retry with -S only when needed.
rc, out, err = _ssh_run_sudo(
ssh, _cmd, sudo_password=sudo_password, get_pty=True
)
cmd = f"sudo {_cmd}"
else:
cmd = _cmd
rc, out, err = _ssh_run(ssh, cmd, get_pty=False)
if rc != 0:
raise RuntimeError(
"Remote harvest failed.\n"
@ -210,12 +445,17 @@ def remote_harvest(
"Unable to determine remote username for chown. "
"Pass --remote-user explicitly or use --no-sudo."
)
cmd = f"sudo chown -R {resolved_user} {rbundle}"
rc, out, err = _ssh_run(ssh, cmd, get_pty=True)
chown_cmd = f"chown -R {resolved_user} {rbundle}"
rc, out, err = _ssh_run_sudo(
ssh,
chown_cmd,
sudo_password=sudo_password,
get_pty=True,
)
if rc != 0:
raise RuntimeError(
"chown of harvest failed.\n"
f"Command: {cmd}\n"
f"Command: sudo {chown_cmd}\n"
f"Exit code: {rc}\n"
f"Stdout: {out.strip()}\n"
f"Stderr: {err.strip()}"