Strict validation of PATH when running as root in case it could contain potentially unsafe binaries
This commit is contained in:
parent
205c419a7a
commit
a0914e1369
3 changed files with 109 additions and 0 deletions
102
enroll/cli.py
102
enroll/cli.py
|
|
@ -4,6 +4,7 @@ import argparse
|
|||
import configparser
|
||||
import json
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
import tarfile
|
||||
import tempfile
|
||||
|
|
@ -134,6 +135,83 @@ def _split_list_value(v: str) -> list[str]:
|
|||
return [raw] if raw else []
|
||||
|
||||
|
||||
def _path_entry_is_unsafe(entry: str) -> Optional[str]:
|
||||
"""Return a human-readable reason if a PATH entry is unsafe for root.
|
||||
|
||||
Empty PATH entries and relative entries resolve via the current working
|
||||
directory, which is equivalent to trusting whatever directory the operator
|
||||
happens to be in. Existing group/world-writable directories are also risky
|
||||
when Enroll is run as root because Enroll deliberately invokes host tools
|
||||
from PATH while harvesting and enforcing state.
|
||||
"""
|
||||
|
||||
if entry == "":
|
||||
return "empty PATH entry resolves to the current directory"
|
||||
if entry == ".":
|
||||
return "'.' resolves to the current directory"
|
||||
if not os.path.isabs(entry):
|
||||
return "relative PATH entry resolves from the current directory"
|
||||
|
||||
try:
|
||||
st = os.stat(entry)
|
||||
except OSError:
|
||||
return None
|
||||
if not stat.S_ISDIR(st.st_mode):
|
||||
return None
|
||||
if st.st_mode & stat.S_IWOTH:
|
||||
return "directory is world-writable"
|
||||
if st.st_mode & stat.S_IWGRP:
|
||||
return "directory is group-writable"
|
||||
return None
|
||||
|
||||
|
||||
def _unsafe_root_path_reasons(path_value: Optional[str] = None) -> list[str]:
|
||||
"""Return unsafe PATH entries that should make root execution interactive."""
|
||||
|
||||
raw = os.environ.get("PATH", "") if path_value is None else str(path_value)
|
||||
out: list[str] = []
|
||||
for entry in raw.split(os.pathsep):
|
||||
reason = _path_entry_is_unsafe(entry)
|
||||
if reason:
|
||||
label = entry if entry else "<empty>"
|
||||
out.append(f"{label}: {reason}")
|
||||
return out
|
||||
|
||||
|
||||
def _is_effective_root() -> bool:
|
||||
geteuid = getattr(os, "geteuid", None)
|
||||
return bool(geteuid is not None and geteuid() == 0)
|
||||
|
||||
|
||||
def _confirm_root_path_safety(*, force: bool = False) -> None:
|
||||
"""Prompt before running as root with a PATH that trusts writable entries."""
|
||||
|
||||
if force or not _is_effective_root():
|
||||
return
|
||||
|
||||
reasons = _unsafe_root_path_reasons()
|
||||
if not reasons:
|
||||
return
|
||||
|
||||
details = "\n".join(f" - {r}" for r in reasons)
|
||||
msg = (
|
||||
"warning: enroll is running as root and PATH contains entries that "
|
||||
"could allow an untrusted binary to be executed:\n"
|
||||
f"{details}\n"
|
||||
)
|
||||
|
||||
if not sys.stdin.isatty():
|
||||
raise SystemExit(
|
||||
msg + "error: refusing to continue non-interactively. Re-run with "
|
||||
"--assume-safe-path if you intentionally trust this PATH."
|
||||
)
|
||||
|
||||
print(msg, file=sys.stderr, end="")
|
||||
answer = input("Are you sure you want to continue? [y/N] ")
|
||||
if answer.strip().lower() not in {"y", "yes"}:
|
||||
raise SystemExit("aborted: unsafe root PATH was not confirmed")
|
||||
|
||||
|
||||
def _section_to_argv(
|
||||
p: argparse.ArgumentParser, cfg: configparser.ConfigParser, section: str
|
||||
) -> list[str]:
|
||||
|
|
@ -359,6 +437,21 @@ def _add_config_args(p: argparse.ArgumentParser) -> None:
|
|||
)
|
||||
|
||||
|
||||
def _add_path_safety_args(
|
||||
p: argparse.ArgumentParser, *, default: object = False
|
||||
) -> None:
|
||||
p.add_argument(
|
||||
"--assume-safe-path",
|
||||
action="store_true",
|
||||
default=default,
|
||||
help=(
|
||||
"When running as root, continue without confirmation even if PATH "
|
||||
"contains '.', an empty/relative entry, or a group/world-writable "
|
||||
"directory. Intended for trusted non-interactive automation."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _add_remote_args(p: argparse.ArgumentParser) -> None:
|
||||
p.add_argument(
|
||||
"--remote-host",
|
||||
|
|
@ -432,10 +525,12 @@ def main() -> None:
|
|||
version=f"{get_enroll_version()}",
|
||||
)
|
||||
_add_config_args(ap)
|
||||
_add_path_safety_args(ap)
|
||||
sub = ap.add_subparsers(dest="cmd", required=True)
|
||||
|
||||
h = sub.add_parser("harvest", help="Harvest service/package/config state")
|
||||
_add_config_args(h)
|
||||
_add_path_safety_args(h, default=argparse.SUPPRESS)
|
||||
_add_remote_args(h)
|
||||
h.add_argument(
|
||||
"--out",
|
||||
|
|
@ -488,6 +583,7 @@ def main() -> None:
|
|||
"manifest", help="Render configuration-management code from a harvest"
|
||||
)
|
||||
_add_config_args(m)
|
||||
_add_path_safety_args(m, default=argparse.SUPPRESS)
|
||||
m.add_argument(
|
||||
"--harvest",
|
||||
required=True,
|
||||
|
|
@ -522,6 +618,7 @@ def main() -> None:
|
|||
help="Harvest state, then manifest configuration-management code, in one shot",
|
||||
)
|
||||
_add_config_args(s)
|
||||
_add_path_safety_args(s, default=argparse.SUPPRESS)
|
||||
_add_remote_args(s)
|
||||
s.add_argument(
|
||||
"--harvest",
|
||||
|
|
@ -582,6 +679,7 @@ def main() -> None:
|
|||
|
||||
d = sub.add_parser("diff", help="Compare two harvests and report differences")
|
||||
_add_config_args(d)
|
||||
_add_path_safety_args(d, default=argparse.SUPPRESS)
|
||||
d.add_argument(
|
||||
"--old",
|
||||
required=True,
|
||||
|
|
@ -703,6 +801,7 @@ def main() -> None:
|
|||
|
||||
e = sub.add_parser("explain", help="Explain a harvest state.json")
|
||||
_add_config_args(e)
|
||||
_add_path_safety_args(e, default=argparse.SUPPRESS)
|
||||
e.add_argument(
|
||||
"harvest",
|
||||
help=(
|
||||
|
|
@ -731,6 +830,7 @@ def main() -> None:
|
|||
"validate", help="Validate a harvest bundle (state.json + artifacts)"
|
||||
)
|
||||
_add_config_args(v)
|
||||
_add_path_safety_args(v, default=argparse.SUPPRESS)
|
||||
v.add_argument(
|
||||
"harvest",
|
||||
help=(
|
||||
|
|
@ -787,6 +887,8 @@ def main() -> None:
|
|||
)
|
||||
args = ap.parse_args(argv)
|
||||
|
||||
_confirm_root_path_safety(force=bool(getattr(args, "assume_safe_path", False)))
|
||||
|
||||
# Preserve historical defaults for remote harvesting unless ssh_config lookup is enabled.
|
||||
# This lets ssh_config values take effect when the user did not explicitly set
|
||||
# --remote-user / --remote-port.
|
||||
|
|
|
|||
Reference in a new issue