Allow the user to add extra paths to harvest, or
All checks were successful
CI / test (push) Successful in 5m31s
Lint / test (push) Successful in 34s
Trivy / test (push) Successful in 19s

paths to ignore, using the `--include-path` and
`--exclude-path` arguments, respectively.
This commit is contained in:
Miguel Jacq 2025-12-20 17:47:00 +11:00
parent 25add369dc
commit 240e79706f
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
9 changed files with 687 additions and 12 deletions

View file

@ -19,6 +19,7 @@ from .debian import (
stat_triplet,
)
from .ignore import IgnorePolicy
from .pathfilter import PathFilter, expand_includes
from .accounts import collect_non_system_users
@ -86,6 +87,16 @@ class UsrLocalCustomSnapshot:
notes: List[str]
@dataclass
class ExtraPathsSnapshot:
    """Result of harvesting the user-requested include paths.

    One instance is built at the end of the ``extra_paths`` section of
    ``harvest`` and serialised with ``asdict`` under the ``"extra_paths"``
    key of the bundle state.
    """

    # Role the harvested files are copied under; harvest() sets this to
    # "extra_paths".
    role_name: str
    # The include patterns exactly as supplied by the caller (empty list
    # when none were given).
    include_patterns: List[str]
    # The exclude patterns exactly as supplied by the caller (empty list
    # when none were given).
    exclude_patterns: List[str]
    # Files that were copied into the bundle for this role; harvest() records
    # them with reason "user_include".
    managed_files: List[ManagedFile]
    # Files matched by an include pattern but not copied; harvest() records
    # "user_excluded", the policy's deny reason, or "unreadable".
    excluded: List[ExcludedFile]
    # Free-form notes: the listing of the user patterns plus whatever notes
    # expand_includes() returned.
    notes: List[str]
ALLOWED_UNOWNED_EXTS = {
".conf",
".cfg",
@ -250,6 +261,8 @@ def harvest(
policy: Optional[IgnorePolicy] = None,
*,
dangerous: bool = False,
include_paths: Optional[List[str]] = None,
exclude_paths: Optional[List[str]] = None,
) -> str:
# If a policy is not supplied, build one. `--dangerous` relaxes secret
# detection and deny-glob skipping.
@ -261,6 +274,10 @@ def harvest(
policy.dangerous = True
os.makedirs(bundle_dir, exist_ok=True)
# User-provided includes/excludes. Excludes apply to all harvesting;
# includes are harvested into an extra role.
path_filter = PathFilter(include=include_paths or (), exclude=exclude_paths or ())
if hasattr(os, "geteuid") and os.geteuid() != 0:
print(
"Warning: not running as root; harvest may miss files or metadata.",
@ -406,6 +423,9 @@ def harvest(
)
for path, reason in sorted(candidates.items()):
if path_filter.is_excluded(path):
excluded.append(ExcludedFile(path=path, reason="user_excluded"))
continue
deny = policy.deny_reason(path)
if deny:
excluded.append(ExcludedFile(path=path, reason=deny))
@ -522,6 +542,9 @@ def harvest(
candidates.setdefault(r, "custom_specific_path")
for path, reason in sorted(candidates.items()):
if path_filter.is_excluded(path):
excluded.append(ExcludedFile(path=path, reason="user_excluded"))
continue
deny = policy.deny_reason(path)
if deny:
excluded.append(ExcludedFile(path=path, reason=deny))
@ -593,6 +616,9 @@ def harvest(
# Copy only safe SSH public material: authorized_keys + *.pub
for sf in u.ssh_files:
if path_filter.is_excluded(sf):
users_excluded.append(ExcludedFile(path=sf, reason="user_excluded"))
continue
deny = policy.deny_reason(sf)
if deny:
users_excluded.append(ExcludedFile(path=sf, reason=deny))
@ -665,6 +691,10 @@ def harvest(
if not _is_confish(path):
continue
if path_filter.is_excluded(path):
etc_excluded.append(ExcludedFile(path=path, reason="user_excluded"))
continue
deny = policy.deny_reason(path)
if deny:
etc_excluded.append(ExcludedFile(path=path, reason=deny))
@ -754,6 +784,10 @@ def harvest(
ul_excluded.append(ExcludedFile(path=path, reason="unreadable"))
continue
if path_filter.is_excluded(path):
ul_excluded.append(ExcludedFile(path=path, reason="user_excluded"))
continue
deny = policy.deny_reason(path)
if deny:
ul_excluded.append(ExcludedFile(path=path, reason=deny))
@ -806,6 +840,81 @@ def harvest(
notes=ul_notes,
)
# -------------------------
# extra_paths role (user-requested includes)
# -------------------------
# Accumulators for this role; they are handed to ExtraPathsSnapshot below.
extra_notes: List[str] = []
extra_excluded: List[ExcludedFile] = []
extra_managed: List[ManagedFile] = []
extra_role_name = "extra_paths"

# Copy the caller's pattern lists (None becomes an empty list) so the
# snapshot records exactly what was requested.
include_specs = list(include_paths or [])
exclude_specs = list(exclude_paths or [])

# Record the raw patterns in the notes, one "- pattern" line each.
if include_specs:
    extra_notes.append("User include patterns:")
    extra_notes.extend([f"- {p}" for p in include_specs])
if exclude_specs:
    extra_notes.append("User exclude patterns:")
    extra_notes.extend([f"- {p}" for p in exclude_specs])

# Expand the include patterns into a list of paths. Nothing is expanded
# when no include patterns were supplied.
included_files: List[str] = []
if include_specs:
    # expand_includes returns (files, notes).
    # NOTE(review): max_files=4000 presumably caps how many files the
    # expansion may return - confirm against pathfilter.expand_includes.
    files, inc_notes = expand_includes(
        path_filter.iter_include_patterns(),
        exclude=path_filter,
        max_files=4000,
    )
    included_files = files
    extra_notes.extend(inc_notes)

for path in included_files:
    # Skip paths already present in already_all (defined earlier in
    # harvest; presumably paths captured by earlier roles - verify).
    if path in already_all:
        continue

    # User excludes take precedence over includes.
    # NOTE(review): path_filter was also passed to expand_includes as
    # exclude=, so this check may be redundant - confirm.
    if path_filter.is_excluded(path):
        extra_excluded.append(ExcludedFile(path=path, reason="user_excluded"))
        continue

    # The ignore policy can still veto a user-included path; its reason
    # string is recorded as-is.
    deny = policy.deny_reason(path)
    if deny:
        extra_excluded.append(ExcludedFile(path=path, reason=deny))
        continue

    # Failure to stat the file is recorded as "unreadable".
    try:
        owner, group, mode = stat_triplet(path)
    except OSError:
        extra_excluded.append(ExcludedFile(path=path, reason="unreadable"))
        continue

    # Strip leading slashes; the result is passed to _copy_into_bundle
    # and stored as the file's src_rel.
    src_rel = path.lstrip("/")
    try:
        _copy_into_bundle(bundle_dir, extra_role_name, path, src_rel)
    except OSError:
        # A failed copy is reported the same way as a failed stat.
        extra_excluded.append(ExcludedFile(path=path, reason="unreadable"))
        continue

    extra_managed.append(
        ManagedFile(
            path=path,
            src_rel=src_rel,
            owner=owner,
            group=group,
            mode=mode,
            reason="user_include",
        )
    )
    # Mark the path as handled so it is not harvested again.
    already_all.add(path)

# The snapshot is always built (even with no include patterns) because the
# state dict below serialises it unconditionally.
extra_paths_snapshot = ExtraPathsSnapshot(
    role_name=extra_role_name,
    include_patterns=include_specs,
    exclude_patterns=exclude_specs,
    managed_files=extra_managed,
    excluded=extra_excluded,
    notes=extra_notes,
)
state = {
"host": {"hostname": os.uname().nodename, "os": "debian"},
"users": asdict(users_snapshot),
@ -815,6 +924,7 @@ def harvest(
"package_roles": [asdict(p) for p in pkg_snaps],
"etc_custom": asdict(etc_custom_snapshot),
"usr_local_custom": asdict(usr_local_custom_snapshot),
"extra_paths": asdict(extra_paths_snapshot),
}
state_path = os.path.join(bundle_dir, "state.json")