Fix tests
This commit is contained in:
parent
824010b2ab
commit
6c3275b44a
3 changed files with 525 additions and 5 deletions
189
tests/test_cli_config_and_sops.py
Normal file
189
tests/test_cli_config_and_sops.py
Normal file
|
|
@ -0,0 +1,189 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import configparser
|
||||
import tarfile
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def test_discover_config_path_precedence(monkeypatch, tmp_path: Path):
|
||||
from enroll.cli import _discover_config_path
|
||||
|
||||
cfg = tmp_path / "cfg.ini"
|
||||
cfg.write_text("[enroll]\n", encoding="utf-8")
|
||||
|
||||
# --no-config always wins
|
||||
monkeypatch.setenv("ENROLL_CONFIG", str(cfg))
|
||||
assert _discover_config_path(["--no-config", "harvest"]) is None
|
||||
|
||||
# explicit --config wins
|
||||
assert _discover_config_path(["--config", str(cfg), "harvest"]) == cfg
|
||||
|
||||
# env var used when present
|
||||
assert _discover_config_path(["harvest"]) == cfg
|
||||
|
||||
|
||||
def test_discover_config_path_finds_local_and_xdg(monkeypatch, tmp_path: Path):
|
||||
from enroll.cli import _discover_config_path
|
||||
|
||||
# local file in cwd
|
||||
cwd = tmp_path / "cwd"
|
||||
cwd.mkdir()
|
||||
local = cwd / "enroll.ini"
|
||||
local.write_text("[enroll]\n", encoding="utf-8")
|
||||
|
||||
monkeypatch.chdir(cwd)
|
||||
monkeypatch.delenv("ENROLL_CONFIG", raising=False)
|
||||
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
|
||||
assert _discover_config_path(["harvest"]) == local
|
||||
|
||||
# xdg config fallback
|
||||
monkeypatch.chdir(tmp_path)
|
||||
xdg = tmp_path / "xdg"
|
||||
(xdg / "enroll").mkdir(parents=True)
|
||||
xcfg = xdg / "enroll" / "enroll.ini"
|
||||
xcfg.write_text("[enroll]\n", encoding="utf-8")
|
||||
monkeypatch.setenv("XDG_CONFIG_HOME", str(xdg))
|
||||
assert _discover_config_path(["harvest"]) == xcfg
|
||||
|
||||
|
||||
def test_section_to_argv_supports_bool_append_count_and_unknown(monkeypatch, capsys):
|
||||
from enroll.cli import _section_to_argv
|
||||
|
||||
ap = argparse.ArgumentParser(add_help=False)
|
||||
ap.add_argument("--flag", action="store_true")
|
||||
ap.add_argument("--no-flag", action="store_false", dest="flag2")
|
||||
ap.add_argument("--item", action="append", default=[])
|
||||
ap.add_argument("-v", action="count", default=0)
|
||||
|
||||
cfg = configparser.ConfigParser()
|
||||
cfg.read_dict(
|
||||
{
|
||||
"enroll": {
|
||||
"flag": "true",
|
||||
"no_flag": "false",
|
||||
"item": "a,b",
|
||||
"v": "2",
|
||||
"unknown_key": "zzz",
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
argv = _section_to_argv(ap, cfg, "enroll")
|
||||
|
||||
# bools set
|
||||
assert "--flag" in argv
|
||||
assert "--no-flag" in argv
|
||||
|
||||
# append expanded
|
||||
assert argv.count("--item") == 2
|
||||
assert "a" in argv and "b" in argv
|
||||
|
||||
# count flag expanded
|
||||
assert argv.count("-v") == 2
|
||||
|
||||
# unknown key prints warning
|
||||
err = capsys.readouterr().err
|
||||
assert "unknown option" in err
|
||||
|
||||
|
||||
def test_inject_config_argv_inserts_global_and_command_tokens(tmp_path: Path):
|
||||
from enroll.cli import _inject_config_argv
|
||||
|
||||
root = argparse.ArgumentParser(add_help=False)
|
||||
root.add_argument("--root-flag", action="store_true")
|
||||
sub = root.add_subparsers(dest="cmd", required=True)
|
||||
p_h = sub.add_parser("harvest", add_help=False)
|
||||
p_h.add_argument("--dangerous", action="store_true")
|
||||
p_h.add_argument("--include-path", action="append", default=[])
|
||||
|
||||
cfg_path = tmp_path / "enroll.ini"
|
||||
cfg_path.write_text(
|
||||
"""[enroll]
|
||||
root-flag = true
|
||||
|
||||
[harvest]
|
||||
dangerous = true
|
||||
include-path = /etc/one,/etc/two
|
||||
""",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
argv = ["harvest", "--include-path", "/etc/cli"]
|
||||
injected = _inject_config_argv(
|
||||
argv,
|
||||
cfg_path=cfg_path,
|
||||
root_parser=root,
|
||||
subparsers={"harvest": p_h},
|
||||
)
|
||||
|
||||
# global inserted before cmd, subcommand tokens right after cmd
|
||||
assert injected[:2] == ["--root-flag", "harvest"]
|
||||
# include-path from config inserted before CLI include-path (CLI wins later if duplicates)
|
||||
joined = " ".join(injected)
|
||||
assert "--include-path /etc/one" in joined
|
||||
assert "--include-path /etc/cli" in joined
|
||||
|
||||
|
||||
def test_resolve_sops_out_file_and_encrypt_path(monkeypatch, tmp_path: Path):
|
||||
from enroll import cli
|
||||
|
||||
# directory output should yield harvest.tar.gz.sops inside
|
||||
out_dir = tmp_path / "o"
|
||||
out_dir.mkdir()
|
||||
assert (
|
||||
cli._resolve_sops_out_file(str(out_dir), hint="h").name == "harvest.tar.gz.sops"
|
||||
)
|
||||
|
||||
# file-like output retained
|
||||
out_file = tmp_path / "x.sops"
|
||||
assert cli._resolve_sops_out_file(str(out_file), hint="h") == out_file
|
||||
|
||||
# None uses cache dir
|
||||
class HC:
|
||||
def __init__(self, d: Path):
|
||||
self.dir = d
|
||||
|
||||
monkeypatch.setattr(
|
||||
cli, "new_harvest_cache_dir", lambda hint: HC(tmp_path / "cache")
|
||||
)
|
||||
p = cli._resolve_sops_out_file(None, hint="h")
|
||||
assert str(p).endswith("harvest.tar.gz.sops")
|
||||
|
||||
# Cover _tar_dir_to quickly (writes a tarball)
|
||||
bundle = tmp_path / "bundle"
|
||||
bundle.mkdir()
|
||||
(bundle / "state.json").write_text("{}", encoding="utf-8")
|
||||
tar_path = tmp_path / "b.tar.gz"
|
||||
cli._tar_dir_to(bundle, tar_path)
|
||||
assert tar_path.exists()
|
||||
with tarfile.open(tar_path, "r:gz") as tf:
|
||||
names = tf.getnames()
|
||||
assert "state.json" in names or "./state.json" in names
|
||||
|
||||
|
||||
def test_encrypt_harvest_dir_to_sops_cleans_up_tmp_tgz(monkeypatch, tmp_path: Path):
|
||||
from enroll.cli import _encrypt_harvest_dir_to_sops
|
||||
|
||||
bundle = tmp_path / "bundle"
|
||||
bundle.mkdir()
|
||||
(bundle / "state.json").write_text("{}", encoding="utf-8")
|
||||
out_file = tmp_path / "out.sops"
|
||||
|
||||
seen = {}
|
||||
|
||||
def fake_encrypt(src: Path, dst: Path, pgp_fingerprints, mode): # noqa: ARG001
|
||||
# write something so we can see output created
|
||||
seen["src"] = src
|
||||
dst.write_bytes(b"enc")
|
||||
|
||||
monkeypatch.setattr("enroll.cli.encrypt_file_binary", fake_encrypt)
|
||||
|
||||
# Make os.unlink raise FileNotFoundError to hit the except branch in finally.
|
||||
monkeypatch.setattr(
|
||||
"enroll.cli.os.unlink", lambda p: (_ for _ in ()).throw(FileNotFoundError())
|
||||
)
|
||||
|
||||
res = _encrypt_harvest_dir_to_sops(bundle, out_file, fps=["ABC"])
|
||||
assert res == out_file
|
||||
assert out_file.read_bytes() == b"enc"
|
||||
323
tests/test_more_coverage.py
Normal file
323
tests/test_more_coverage.py
Normal file
|
|
@ -0,0 +1,323 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_cache_dir_defaults_to_home_cache(monkeypatch, tmp_path: Path):
|
||||
# Ensure default path uses ~/.cache when XDG_CACHE_HOME is unset.
|
||||
from enroll.cache import enroll_cache_dir
|
||||
|
||||
monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
|
||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||
|
||||
p = enroll_cache_dir()
|
||||
assert str(p).startswith(str(tmp_path))
|
||||
assert p.name == "enroll"
|
||||
|
||||
|
||||
def test_harvest_cache_state_json_property(tmp_path: Path):
|
||||
from enroll.cache import HarvestCache
|
||||
|
||||
hc = HarvestCache(tmp_path / "h1")
|
||||
assert hc.state_json == hc.dir / "state.json"
|
||||
|
||||
|
||||
def test_cache_dir_security_rejects_symlink(tmp_path: Path):
|
||||
from enroll.cache import _ensure_dir_secure
|
||||
|
||||
real = tmp_path / "real"
|
||||
real.mkdir()
|
||||
link = tmp_path / "link"
|
||||
link.symlink_to(real, target_is_directory=True)
|
||||
|
||||
with pytest.raises(RuntimeError, match="Refusing to use symlink"):
|
||||
_ensure_dir_secure(link)
|
||||
|
||||
|
||||
def test_cache_dir_chmod_failures_are_ignored(monkeypatch, tmp_path: Path):
|
||||
from enroll import cache
|
||||
|
||||
# Make the cache base path deterministic and writable.
|
||||
monkeypatch.setattr(cache, "enroll_cache_dir", lambda: tmp_path)
|
||||
|
||||
# Force os.chmod to fail to cover the "except OSError: pass" paths.
|
||||
monkeypatch.setattr(
|
||||
os, "chmod", lambda *a, **k: (_ for _ in ()).throw(OSError("nope"))
|
||||
)
|
||||
|
||||
hc = cache.new_harvest_cache_dir()
|
||||
assert hc.dir.exists()
|
||||
assert hc.dir.is_dir()
|
||||
|
||||
|
||||
def test_stat_triplet_falls_back_to_numeric_ids(monkeypatch, tmp_path: Path):
|
||||
from enroll.fsutil import stat_triplet
|
||||
import pwd
|
||||
import grp
|
||||
|
||||
p = tmp_path / "x"
|
||||
p.write_text("x", encoding="utf-8")
|
||||
|
||||
# Force username/group resolution failures.
|
||||
monkeypatch.setattr(
|
||||
pwd, "getpwuid", lambda _uid: (_ for _ in ()).throw(KeyError("no user"))
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
grp, "getgrgid", lambda _gid: (_ for _ in ()).throw(KeyError("no group"))
|
||||
)
|
||||
|
||||
owner, group, mode = stat_triplet(str(p))
|
||||
assert owner.isdigit()
|
||||
assert group.isdigit()
|
||||
assert len(mode) == 4
|
||||
|
||||
|
||||
def test_ignore_policy_iter_effective_lines_removes_block_comments():
|
||||
from enroll.ignore import IgnorePolicy
|
||||
|
||||
pol = IgnorePolicy()
|
||||
data = b"""keep1
|
||||
/*
|
||||
drop me
|
||||
*/
|
||||
keep2
|
||||
"""
|
||||
assert list(pol.iter_effective_lines(data)) == [b"keep1", b"keep2"]
|
||||
|
||||
|
||||
def test_ignore_policy_deny_reason_dir_variants(tmp_path: Path):
|
||||
from enroll.ignore import IgnorePolicy
|
||||
|
||||
pol = IgnorePolicy()
|
||||
|
||||
# denied by glob
|
||||
assert pol.deny_reason_dir("/etc/shadow") == "denied_path"
|
||||
|
||||
# symlink rejected
|
||||
d = tmp_path / "d"
|
||||
d.mkdir()
|
||||
link = tmp_path / "l"
|
||||
link.symlink_to(d, target_is_directory=True)
|
||||
assert pol.deny_reason_dir(str(link)) == "symlink"
|
||||
|
||||
# not a directory
|
||||
f = tmp_path / "f"
|
||||
f.write_text("x", encoding="utf-8")
|
||||
assert pol.deny_reason_dir(str(f)) == "not_directory"
|
||||
|
||||
# ok
|
||||
assert pol.deny_reason_dir(str(d)) is None
|
||||
|
||||
|
||||
def test_run_jinjaturtle_parses_outputs(monkeypatch, tmp_path: Path):
|
||||
# Fully unit-test enroll.jinjaturtle.run_jinjaturtle by stubbing subprocess.run.
|
||||
from enroll.jinjaturtle import run_jinjaturtle
|
||||
|
||||
def fake_run(cmd, **kwargs): # noqa: ARG001
|
||||
# cmd includes "-d <defaults> -t <template>"
|
||||
d_idx = cmd.index("-d") + 1
|
||||
t_idx = cmd.index("-t") + 1
|
||||
defaults = Path(cmd[d_idx])
|
||||
template = Path(cmd[t_idx])
|
||||
defaults.write_text("---\nfoo: 1\n", encoding="utf-8")
|
||||
template.write_text("value={{ foo }}\n", encoding="utf-8")
|
||||
return SimpleNamespace(returncode=0, stdout="ok", stderr="")
|
||||
|
||||
monkeypatch.setattr(subprocess, "run", fake_run)
|
||||
|
||||
src = tmp_path / "src.ini"
|
||||
src.write_text("foo=1\n", encoding="utf-8")
|
||||
|
||||
res = run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
|
||||
assert "foo: 1" in res.vars_text
|
||||
assert "value=" in res.template_text
|
||||
|
||||
|
||||
def test_run_jinjaturtle_raises_on_failure(monkeypatch, tmp_path: Path):
|
||||
from enroll.jinjaturtle import run_jinjaturtle
|
||||
|
||||
def fake_run(cmd, **kwargs): # noqa: ARG001
|
||||
return SimpleNamespace(returncode=2, stdout="out", stderr="bad")
|
||||
|
||||
monkeypatch.setattr(subprocess, "run", fake_run)
|
||||
|
||||
src = tmp_path / "src.ini"
|
||||
src.write_text("x", encoding="utf-8")
|
||||
with pytest.raises(RuntimeError, match="jinjaturtle failed"):
|
||||
run_jinjaturtle("/bin/jinjaturtle", str(src), role_name="role1")
|
||||
|
||||
|
||||
def test_require_sops_cmd_errors_when_missing(monkeypatch):
|
||||
from enroll.sopsutil import require_sops_cmd, SopsError
|
||||
|
||||
monkeypatch.setattr("enroll.sopsutil.shutil.which", lambda _: None)
|
||||
with pytest.raises(SopsError, match="not found on PATH"):
|
||||
require_sops_cmd()
|
||||
|
||||
|
||||
def test_get_enroll_version_reports_unknown_on_metadata_failure(monkeypatch):
|
||||
import enroll.version as v
|
||||
|
||||
fake_meta = types.ModuleType("importlib.metadata")
|
||||
|
||||
def boom():
|
||||
raise RuntimeError("boom")
|
||||
|
||||
fake_meta.packages_distributions = boom
|
||||
fake_meta.version = lambda _dist: boom()
|
||||
|
||||
monkeypatch.setitem(sys.modules, "importlib.metadata", fake_meta)
|
||||
assert v.get_enroll_version() == "unknown"
|
||||
|
||||
|
||||
def test_get_enroll_version_returns_unknown_if_importlib_metadata_unavailable(
|
||||
monkeypatch,
|
||||
):
|
||||
import builtins
|
||||
import enroll.version as v
|
||||
|
||||
real_import = builtins.__import__
|
||||
|
||||
def fake_import(
|
||||
name, globals=None, locals=None, fromlist=(), level=0
|
||||
): # noqa: A002
|
||||
if name == "importlib.metadata":
|
||||
raise ImportError("no metadata")
|
||||
return real_import(name, globals, locals, fromlist, level)
|
||||
|
||||
monkeypatch.setattr(builtins, "__import__", fake_import)
|
||||
assert v.get_enroll_version() == "unknown"
|
||||
|
||||
|
||||
def test_compare_harvests_and_format_report(tmp_path: Path):
|
||||
from enroll.diff import compare_harvests, format_report
|
||||
|
||||
old = tmp_path / "old"
|
||||
new = tmp_path / "new"
|
||||
(old / "artifacts").mkdir(parents=True)
|
||||
(new / "artifacts").mkdir(parents=True)
|
||||
|
||||
def write_state(base: Path, state: dict) -> None:
|
||||
base.mkdir(parents=True, exist_ok=True)
|
||||
(base / "state.json").write_text(json.dumps(state, indent=2), encoding="utf-8")
|
||||
|
||||
# Old bundle: pkg a@1.0, pkg b@1.0, one service, one user, one managed file.
|
||||
old_state = {
|
||||
"schema_version": 3,
|
||||
"host": {"hostname": "h1"},
|
||||
"inventory": {"packages": {"a": {"version": "1.0"}, "b": {"version": "1.0"}}},
|
||||
"roles": {
|
||||
"services": [
|
||||
{
|
||||
"unit": "svc.service",
|
||||
"role_name": "svc",
|
||||
"packages": ["a"],
|
||||
"active_state": "inactive",
|
||||
"sub_state": "dead",
|
||||
"unit_file_state": "enabled",
|
||||
"condition_result": None,
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/foo.conf",
|
||||
"src_rel": "etc/foo.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "modified_conffile",
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
"packages": [],
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [{"name": "alice", "shell": "/bin/sh"}],
|
||||
},
|
||||
"apt_config": {"role_name": "apt_config", "managed_files": []},
|
||||
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
|
||||
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
|
||||
"extra_paths": {"role_name": "extra_paths", "managed_files": []},
|
||||
},
|
||||
}
|
||||
(old / "artifacts" / "svc" / "etc").mkdir(parents=True, exist_ok=True)
|
||||
(old / "artifacts" / "svc" / "etc" / "foo.conf").write_text("old", encoding="utf-8")
|
||||
write_state(old, old_state)
|
||||
|
||||
# New bundle: pkg a@2.0, pkg c@1.0, service changed, user changed, file moved role+content.
|
||||
new_state = {
|
||||
"schema_version": 3,
|
||||
"host": {"hostname": "h2"},
|
||||
"inventory": {"packages": {"a": {"version": "2.0"}, "c": {"version": "1.0"}}},
|
||||
"roles": {
|
||||
"services": [
|
||||
{
|
||||
"unit": "svc.service",
|
||||
"role_name": "svc",
|
||||
"packages": ["a", "c"],
|
||||
"active_state": "active",
|
||||
"sub_state": "running",
|
||||
"unit_file_state": "enabled",
|
||||
"condition_result": None,
|
||||
"managed_files": [],
|
||||
}
|
||||
],
|
||||
"packages": [],
|
||||
"users": {
|
||||
"role_name": "users",
|
||||
"users": [{"name": "alice", "shell": "/bin/bash"}, {"name": "bob"}],
|
||||
},
|
||||
"apt_config": {"role_name": "apt_config", "managed_files": []},
|
||||
"etc_custom": {"role_name": "etc_custom", "managed_files": []},
|
||||
"usr_local_custom": {"role_name": "usr_local_custom", "managed_files": []},
|
||||
"extra_paths": {
|
||||
"role_name": "extra_paths",
|
||||
"managed_files": [
|
||||
{
|
||||
"path": "/etc/foo.conf",
|
||||
"src_rel": "etc/foo.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0600",
|
||||
"reason": "user_include",
|
||||
},
|
||||
{
|
||||
"path": "/etc/added.conf",
|
||||
"src_rel": "etc/added.conf",
|
||||
"owner": "root",
|
||||
"group": "root",
|
||||
"mode": "0644",
|
||||
"reason": "user_include",
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
(new / "artifacts" / "extra_paths" / "etc").mkdir(parents=True, exist_ok=True)
|
||||
(new / "artifacts" / "extra_paths" / "etc" / "foo.conf").write_text(
|
||||
"new", encoding="utf-8"
|
||||
)
|
||||
(new / "artifacts" / "extra_paths" / "etc" / "added.conf").write_text(
|
||||
"x", encoding="utf-8"
|
||||
)
|
||||
write_state(new, new_state)
|
||||
|
||||
report, changed = compare_harvests(str(old), str(new))
|
||||
assert changed is True
|
||||
|
||||
txt = format_report(report, fmt="text")
|
||||
assert "Packages" in txt
|
||||
|
||||
md = format_report(report, fmt="markdown")
|
||||
assert "# enroll diff report" in md
|
||||
|
||||
js = format_report(report, fmt="json")
|
||||
parsed = json.loads(js)
|
||||
assert parsed["packages"]["added"] == ["c"]
|
||||
|
|
@ -49,7 +49,7 @@ def test_safe_extract_tar_rejects_symlinks(tmp_path: Path):
|
|||
_safe_extract_tar(tf, tmp_path)
|
||||
|
||||
|
||||
def test_remote_harvest_happy_path(tmp_path: Path, monkeypatch):
|
||||
def test_remote_harvest_happy_path_requests_pty_for_sudo(tmp_path: Path, monkeypatch):
|
||||
import sys
|
||||
|
||||
import enroll.remote as r
|
||||
|
|
@ -65,7 +65,7 @@ def test_remote_harvest_happy_path(tmp_path: Path, monkeypatch):
|
|||
# Prepare a tiny harvest bundle tar stream from the "remote".
|
||||
tgz = _make_tgz_bytes({"state.json": b'{"ok": true}\n'})
|
||||
|
||||
calls: list[str] = []
|
||||
calls: list[tuple[str, bool]] = []
|
||||
|
||||
class _Chan:
|
||||
def __init__(self, rc: int = 0):
|
||||
|
|
@ -116,8 +116,9 @@ def test_remote_harvest_happy_path(tmp_path: Path, monkeypatch):
|
|||
def open_sftp(self):
|
||||
return self._sftp
|
||||
|
||||
def exec_command(self, cmd: str):
|
||||
calls.append(cmd)
|
||||
def exec_command(self, cmd: str, get_pty: bool = False):
|
||||
calls.append((cmd, bool(get_pty)))
|
||||
|
||||
# The tar stream uses exec_command directly.
|
||||
if cmd.startswith("tar -cz -C"):
|
||||
return (None, _Stdout(tgz, rc=0), _Stderr(b""))
|
||||
|
|
@ -168,8 +169,15 @@ def test_remote_harvest_happy_path(tmp_path: Path, monkeypatch):
|
|||
assert b"ok" in state_path.read_bytes()
|
||||
|
||||
# Ensure we attempted remote harvest with sudo and passed include/exclude and dangerous.
|
||||
joined = "\n".join(calls)
|
||||
joined = "\n".join([c for c, _ in calls])
|
||||
assert "sudo" in joined
|
||||
assert "--dangerous" in joined
|
||||
assert "--include-path" in joined
|
||||
assert "--exclude-path" in joined
|
||||
|
||||
# Assert PTY is requested for sudo commands (harvest & chown), not for tar streaming.
|
||||
sudo_cmds = [(c, pty) for c, pty in calls if c.startswith("sudo ")]
|
||||
assert sudo_cmds, "expected at least one sudo command"
|
||||
assert all(pty for _, pty in sudo_cmds)
|
||||
tar_cmds = [(c, pty) for c, pty in calls if c.startswith("tar -cz -C")]
|
||||
assert tar_cmds and all(not pty for _, pty in tar_cmds)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue