282 lines
8.3 KiB
Python
282 lines
8.3 KiB
Python
from __future__ import annotations
|
|
|
|
import shutil
|
|
from dataclasses import dataclass
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
|
|
from .fsutil import file_md5
|
|
|
|
|
|
def _read_os_release(path: str = "/etc/os-release") -> Dict[str, str]:
|
|
out: Dict[str, str] = {}
|
|
try:
|
|
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
|
for raw in f:
|
|
line = raw.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
k, v = line.split("=", 1)
|
|
k = k.strip()
|
|
v = v.strip().strip('"')
|
|
out[k] = v
|
|
except OSError:
|
|
return {}
|
|
return out
|
|
|
|
|
|
@dataclass
|
|
class PlatformInfo:
|
|
os_family: str # debian|redhat|unknown
|
|
pkg_backend: str # dpkg|rpm|unknown
|
|
os_release: Dict[str, str]
|
|
|
|
|
|
def detect_platform() -> PlatformInfo:
|
|
"""Detect platform family and package backend.
|
|
|
|
Uses /etc/os-release when available, with a conservative fallback to
|
|
checking for dpkg/rpm binaries.
|
|
"""
|
|
|
|
osr = _read_os_release()
|
|
os_id = (osr.get("ID") or "").strip().lower()
|
|
likes = (osr.get("ID_LIKE") or "").strip().lower().split()
|
|
|
|
deb_ids = {"debian", "ubuntu", "linuxmint", "raspbian", "kali"}
|
|
rhel_ids = {
|
|
"fedora",
|
|
"rhel",
|
|
"centos",
|
|
"rocky",
|
|
"almalinux",
|
|
"ol",
|
|
"oracle",
|
|
"scientific",
|
|
}
|
|
|
|
if os_id in deb_ids or "debian" in likes:
|
|
return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr)
|
|
if os_id in rhel_ids or any(
|
|
x in likes for x in ("rhel", "fedora", "centos", "redhat")
|
|
):
|
|
return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr)
|
|
|
|
# Fallback heuristics.
|
|
if shutil.which("dpkg"):
|
|
return PlatformInfo(os_family="debian", pkg_backend="dpkg", os_release=osr)
|
|
if shutil.which("rpm"):
|
|
return PlatformInfo(os_family="redhat", pkg_backend="rpm", os_release=osr)
|
|
return PlatformInfo(os_family="unknown", pkg_backend="unknown", os_release=osr)
|
|
|
|
|
|
class PackageBackend:
|
|
"""Backend abstraction for package ownership, config detection, and manual package lists."""
|
|
|
|
name: str
|
|
pkg_config_prefixes: Tuple[str, ...]
|
|
|
|
def owner_of_path(self, path: str) -> Optional[str]: # pragma: no cover
|
|
raise NotImplementedError
|
|
|
|
def list_manual_packages(self) -> List[str]: # pragma: no cover
|
|
raise NotImplementedError
|
|
|
|
def installed_packages(self) -> Dict[str, List[Dict[str, str]]]: # pragma: no cover
|
|
"""Return mapping of package name -> installed instances.
|
|
|
|
Each instance is a dict with at least:
|
|
- version: package version string
|
|
- arch: architecture string
|
|
|
|
Backends should be best-effort and return an empty mapping on failure.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def build_etc_index(
|
|
self,
|
|
) -> Tuple[
|
|
Set[str], Dict[str, str], Dict[str, Set[str]], Dict[str, List[str]]
|
|
]: # pragma: no cover
|
|
raise NotImplementedError
|
|
|
|
def specific_paths_for_hints(self, hints: Set[str]) -> List[str]:
|
|
return []
|
|
|
|
def is_pkg_config_path(self, path: str) -> bool:
|
|
for pfx in self.pkg_config_prefixes:
|
|
if path == pfx or path.startswith(pfx):
|
|
return True
|
|
return False
|
|
|
|
def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]:
|
|
"""Return a mapping of modified file paths -> reason label."""
|
|
return {}
|
|
|
|
|
|
class DpkgBackend(PackageBackend):
|
|
name = "dpkg"
|
|
pkg_config_prefixes = ("/etc/apt/",)
|
|
|
|
def __init__(self) -> None:
|
|
from .debian import parse_status_conffiles
|
|
|
|
self._conffiles_by_pkg = parse_status_conffiles()
|
|
|
|
def owner_of_path(self, path: str) -> Optional[str]:
|
|
from .debian import dpkg_owner
|
|
|
|
return dpkg_owner(path)
|
|
|
|
def list_manual_packages(self) -> List[str]:
|
|
from .debian import list_manual_packages
|
|
|
|
return list_manual_packages()
|
|
|
|
def installed_packages(self) -> Dict[str, List[Dict[str, str]]]:
|
|
from .debian import list_installed_packages
|
|
|
|
return list_installed_packages()
|
|
|
|
def build_etc_index(self):
|
|
from .debian import build_dpkg_etc_index
|
|
|
|
return build_dpkg_etc_index()
|
|
|
|
def specific_paths_for_hints(self, hints: Set[str]) -> List[str]:
|
|
paths: List[str] = []
|
|
for h in hints:
|
|
paths.extend(
|
|
[
|
|
f"/etc/default/{h}",
|
|
f"/etc/init.d/{h}",
|
|
f"/etc/sysctl.d/{h}.conf",
|
|
]
|
|
)
|
|
return paths
|
|
|
|
def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]:
|
|
from .debian import read_pkg_md5sums
|
|
|
|
out: Dict[str, str] = {}
|
|
conff = self._conffiles_by_pkg.get(pkg, {})
|
|
md5sums = read_pkg_md5sums(pkg)
|
|
|
|
for path in etc_paths:
|
|
if not path.startswith("/etc/"):
|
|
continue
|
|
if self.is_pkg_config_path(path):
|
|
continue
|
|
if path in conff:
|
|
try:
|
|
current = file_md5(path)
|
|
except OSError:
|
|
continue
|
|
if current != conff[path]:
|
|
out[path] = "modified_conffile"
|
|
continue
|
|
|
|
rel = path.lstrip("/")
|
|
baseline = md5sums.get(rel)
|
|
if baseline:
|
|
try:
|
|
current = file_md5(path)
|
|
except OSError:
|
|
continue
|
|
if current != baseline:
|
|
out[path] = "modified_packaged_file"
|
|
return out
|
|
|
|
|
|
class RpmBackend(PackageBackend):
|
|
name = "rpm"
|
|
pkg_config_prefixes = (
|
|
"/etc/dnf/",
|
|
"/etc/yum/",
|
|
"/etc/yum.repos.d/",
|
|
"/etc/yum.conf",
|
|
)
|
|
|
|
def __init__(self) -> None:
|
|
self._modified_cache: Dict[str, Set[str]] = {}
|
|
self._config_cache: Dict[str, Set[str]] = {}
|
|
|
|
def owner_of_path(self, path: str) -> Optional[str]:
|
|
from .rpm import rpm_owner
|
|
|
|
return rpm_owner(path)
|
|
|
|
def list_manual_packages(self) -> List[str]:
|
|
from .rpm import list_manual_packages
|
|
|
|
return list_manual_packages()
|
|
|
|
def installed_packages(self) -> Dict[str, List[Dict[str, str]]]:
|
|
from .rpm import list_installed_packages
|
|
|
|
return list_installed_packages()
|
|
|
|
def build_etc_index(self):
|
|
from .rpm import build_rpm_etc_index
|
|
|
|
return build_rpm_etc_index()
|
|
|
|
def specific_paths_for_hints(self, hints: Set[str]) -> List[str]:
|
|
paths: List[str] = []
|
|
for h in hints:
|
|
paths.extend(
|
|
[
|
|
f"/etc/sysconfig/{h}",
|
|
f"/etc/sysconfig/{h}.conf",
|
|
f"/etc/sysctl.d/{h}.conf",
|
|
]
|
|
)
|
|
return paths
|
|
|
|
def _config_files(self, pkg: str) -> Set[str]:
|
|
if pkg in self._config_cache:
|
|
return self._config_cache[pkg]
|
|
from .rpm import rpm_config_files
|
|
|
|
s = rpm_config_files(pkg)
|
|
self._config_cache[pkg] = s
|
|
return s
|
|
|
|
def _modified_files(self, pkg: str) -> Set[str]:
|
|
if pkg in self._modified_cache:
|
|
return self._modified_cache[pkg]
|
|
from .rpm import rpm_modified_files
|
|
|
|
s = rpm_modified_files(pkg)
|
|
self._modified_cache[pkg] = s
|
|
return s
|
|
|
|
def modified_paths(self, pkg: str, etc_paths: List[str]) -> Dict[str, str]:
|
|
out: Dict[str, str] = {}
|
|
modified = self._modified_files(pkg)
|
|
if not modified:
|
|
return out
|
|
config = self._config_files(pkg)
|
|
|
|
for path in etc_paths:
|
|
if not path.startswith("/etc/"):
|
|
continue
|
|
if self.is_pkg_config_path(path):
|
|
continue
|
|
if path not in modified:
|
|
continue
|
|
out[path] = (
|
|
"modified_conffile" if path in config else "modified_packaged_file"
|
|
)
|
|
return out
|
|
|
|
|
|
def get_backend(info: Optional[PlatformInfo] = None) -> PackageBackend:
|
|
info = info or detect_platform()
|
|
if info.pkg_backend == "dpkg":
|
|
return DpkgBackend()
|
|
if info.pkg_backend == "rpm":
|
|
return RpmBackend()
|
|
# Unknown: be conservative and use an rpm backend if rpm exists, otherwise dpkg.
|
|
if shutil.which("rpm"):
|
|
return RpmBackend()
|
|
return DpkgBackend()
|