enroll/enroll/systemd.py
Miguel Jacq 054a6192d1
Some checks failed
Lint / test (push) Waiting to run
Trivy / test (push) Waiting to run
CI / test (push) Has been cancelled
Capture more singletons in /etc and avoid apt duplication
2025-12-27 19:02:22 +11:00

216 lines
5.6 KiB
Python

from __future__ import annotations
import re
import subprocess # nosec
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class UnitInfo:
    """Properties of a single systemd unit as reported by ``systemctl show``.

    Attributes:
        name: Unit name that was queried (e.g. ``"ssh.service"``).
        fragment_path: Value of the ``FragmentPath`` property, or ``None``
            when it is empty or absent.
        dropin_paths: Whitespace-separated entries of ``DropInPaths``.
        env_files: Tokens taken from ``EnvironmentFiles`` with any leading
            ``-`` removed.
        exec_paths: ``path=`` values extracted from ``ExecStart``.
        active_state: Value of ``ActiveState``, or ``None``.
        sub_state: Value of ``SubState``, or ``None``.
        unit_file_state: Value of ``UnitFileState``, or ``None``.
        condition_result: Value of ``ConditionResult``, or ``None``.
    """

    name: str

    # Files that define or are referenced by the unit.
    fragment_path: Optional[str]
    dropin_paths: List[str]
    env_files: List[str]
    exec_paths: List[str]

    # State properties; None when systemctl reported nothing for them.
    active_state: Optional[str]
    sub_state: Optional[str]
    unit_file_state: Optional[str]
    condition_result: Optional[str]
class UnitQueryError(RuntimeError):
    """Raised when ``systemctl show`` fails for a unit.

    Attributes:
        unit: Name of the unit that was being queried.
        stderr: Captured standard error with surrounding whitespace
            removed; an empty string when nothing was captured.
    """

    def __init__(self, unit: str, stderr: str):
        # Normalise a missing/empty stderr to "" before trimming.
        cleaned = stderr.strip() if stderr else ""
        self.unit = unit
        self.stderr = cleaned
        super().__init__(f"systemctl show failed for {unit}: {cleaned}")
def _run(cmd: list[str]) -> str:
    """Run *cmd* and return its captured standard output as text.

    Args:
        cmd: Argument vector passed to ``subprocess.run`` (no shell).

    Returns:
        The command's standard output.

    Raises:
        RuntimeError: if the command exits with a non-zero status; the
            message contains the command and its standard error.
    """
    completed = subprocess.run(  # nosec
        cmd,
        capture_output=True,
        text=True,
        check=False,
    )
    if completed.returncode == 0:
        return completed.stdout
    raise RuntimeError(f"Command failed: {cmd}\n{completed.stderr}")
@dataclass
class TimerInfo:
    """Properties of a single systemd timer as reported by ``systemctl show``.

    Attributes:
        name: Timer unit name that was queried (e.g. ``"logrotate.timer"``).
        fragment_path: Value of the ``FragmentPath`` property, or ``None``
            when it is empty or absent.
        dropin_paths: Whitespace-separated entries of ``DropInPaths``.
        env_files: Tokens taken from ``EnvironmentFiles`` with any leading
            ``-`` removed.
        trigger_unit: Value of the timer's ``Unit`` property, or ``None``.
        active_state: Value of ``ActiveState``, or ``None``.
        sub_state: Value of ``SubState``, or ``None``.
        unit_file_state: Value of ``UnitFileState``, or ``None``.
        condition_result: Value of ``ConditionResult``, or ``None``.
    """

    name: str

    # Files that define or are referenced by the timer unit.
    fragment_path: Optional[str]
    dropin_paths: List[str]
    env_files: List[str]

    # Taken from the timer's "Unit" property.
    trigger_unit: Optional[str]

    # State properties; None when systemctl reported nothing for them.
    active_state: Optional[str]
    sub_state: Optional[str]
    unit_file_state: Optional[str]
    condition_result: Optional[str]
def list_enabled_services() -> List[str]:
    """Return the sorted, de-duplicated names of enabled service units.

    Runs ``systemctl list-unit-files --type=service --state=enabled
    --no-legend`` and takes the first column of every non-empty line.

    Returns:
        Unit names ending in ``.service``, excluding template units such
        as ``getty@.service``.

    Raises:
        RuntimeError: propagated from ``_run`` when systemctl fails.
    """
    listing = _run(
        [
            "systemctl",
            "list-unit-files",
            "--type=service",
            "--state=enabled",
            "--no-legend",
        ]
    )
    # First whitespace-separated column of each non-blank row is the unit name.
    first_columns = (
        columns[0] for columns in map(str.split, listing.splitlines()) if columns
    )
    found = {
        name
        for name in first_columns
        # Keep real services only; "@.service" marks a template unit.
        if name.endswith(".service") and "@.service" not in name
    }
    return sorted(found)
def list_enabled_timers() -> List[str]:
    """Return the sorted, de-duplicated names of enabled timer units.

    Runs ``systemctl list-unit-files --type=timer --state=enabled
    --no-legend`` and takes the first column of every non-empty line.

    Returns:
        Unit names ending in ``.timer``, excluding template units such as
        ``foo@.timer``.

    Raises:
        RuntimeError: propagated from ``_run`` when systemctl fails.
    """
    raw = _run(
        [
            "systemctl",
            "list-unit-files",
            "--type=timer",
            "--state=enabled",
            "--no-legend",
        ]
    )
    names = set()
    for fields in map(str.split, raw.splitlines()):
        if not fields:
            continue  # blank line
        candidate = fields[0]
        is_timer = candidate.endswith(".timer")
        is_template = candidate.endswith("@.timer")
        if is_timer and not is_template:
            names.add(candidate)
    return sorted(names)
def get_unit_info(unit: str) -> UnitInfo:
    """Query ``systemctl show`` for *unit* and return the parsed properties.

    ``systemctl show`` prints list-valued properties such as ``ExecStart``
    and ``EnvironmentFiles`` on one line *per entry*, so every occurrence of
    a key is collected rather than only the last one.  ``EnvironmentFiles``
    entries are printed as ``<path> (ignore_errors=yes|no)``; the trailing
    annotation is dropped so that only paths end up in ``env_files``.

    Args:
        unit: Name of the unit to query (e.g. ``"ssh.service"``).

    Returns:
        A ``UnitInfo`` whose list fields are sorted and de-duplicated and
        whose scalar fields are ``None`` when systemctl reported an empty
        value or no value at all.

    Raises:
        UnitQueryError: if ``systemctl show`` exits with a non-zero status.
    """
    properties = (
        "FragmentPath",
        "DropInPaths",
        "EnvironmentFiles",
        "ExecStart",
        "ActiveState",
        "SubState",
        "UnitFileState",
        "ConditionResult",
    )
    cmd = ["systemctl", "show", unit]
    for prop in properties:
        cmd += ["-p", prop]
    cmd.append("--no-pager")

    p = subprocess.run(cmd, check=False, text=True, capture_output=True)  # nosec
    if p.returncode != 0:
        raise UnitQueryError(unit, p.stderr)

    # Collect every value seen for a key; repeated keys must not overwrite.
    values: dict[str, List[str]] = {}
    for line in (p.stdout or "").splitlines():
        key, sep, value = line.partition("=")
        if sep:
            values.setdefault(key, []).append(value.strip())

    def last(key: str) -> Optional[str]:
        """Return the last non-empty value reported for *key*, else None."""
        entries = values.get(key)
        return (entries[-1] or None) if entries else None

    def tokens(key: str) -> List[str]:
        """Return all whitespace-separated tokens across *key*'s lines."""
        return " ".join(values.get(key, [])).split()

    env_files = set()
    for token in tokens("EnvironmentFiles"):
        if token.startswith("(ignore_errors="):
            continue  # per-entry annotation, not a file path
        token = token.lstrip("-")
        if token:
            env_files.add(token)

    # Each ExecStart line looks like "{ path=/bin/x ; argv[]=... ; ... }".
    exec_paths = re.findall(
        r"path=([^ ;}]+)", " ".join(values.get("ExecStart", []))
    )

    return UnitInfo(
        name=unit,
        fragment_path=last("FragmentPath"),
        dropin_paths=sorted(set(tokens("DropInPaths"))),
        env_files=sorted(env_files),
        exec_paths=sorted(set(exec_paths)),
        active_state=last("ActiveState"),
        sub_state=last("SubState"),
        unit_file_state=last("UnitFileState"),
        condition_result=last("ConditionResult"),
    )
def get_timer_info(unit: str) -> TimerInfo:
    """Query ``systemctl show`` for timer *unit* and return its properties.

    Mirrors ``get_unit_info``: the command is run without raising on a
    non-zero exit, failures are reported as ``UnitQueryError`` (a
    ``RuntimeError`` subclass, so existing ``except RuntimeError`` handlers
    keep working), and repeated property lines are accumulated instead of
    overwriting each other.  Any ``(ignore_errors=yes|no)`` annotation that
    systemctl appends to an ``EnvironmentFiles`` entry is discarded.

    Args:
        unit: Name of the timer unit to query (e.g. ``"logrotate.timer"``).

    Returns:
        A ``TimerInfo``.  ``dropin_paths`` and ``env_files`` keep the order
        in which systemctl reported them; scalar fields are ``None`` when
        the value is empty or missing.

    Raises:
        UnitQueryError: if ``systemctl show`` exits with a non-zero status.
    """
    wanted = [
        "FragmentPath",
        "DropInPaths",
        "EnvironmentFiles",
        "Unit",
        "ActiveState",
        "SubState",
        "UnitFileState",
        "ConditionResult",
    ]
    argv = ["systemctl", "show", unit]
    for prop in wanted:
        argv.extend(("-p", prop))
    argv.append("--no-pager")

    p = subprocess.run(argv, check=False, text=True, capture_output=True)  # nosec
    if p.returncode != 0:
        raise UnitQueryError(unit, p.stderr)

    # key -> every value printed for it, in output order.
    seen: dict[str, List[str]] = {}
    for line in (p.stdout or "").splitlines():
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        seen.setdefault(k, []).append(v.strip())

    def scalar(key: str) -> Optional[str]:
        """Return the last non-empty value reported for *key*, else None."""
        got = seen.get(key)
        if not got:
            return None
        return got[-1] or None

    dropins = [pp for chunk in seen.get("DropInPaths", []) for pp in chunk.split()]

    env_files: List[str] = []
    for chunk in seen.get("EnvironmentFiles", []):
        for token in chunk.split():
            if token.startswith("(ignore_errors="):
                continue  # per-entry annotation, not a file path
            token = token.lstrip("-")
            if token:
                env_files.append(token)

    return TimerInfo(
        name=unit,
        fragment_path=scalar("FragmentPath"),
        dropin_paths=dropins,
        env_files=env_files,
        trigger_unit=scalar("Unit"),
        active_state=scalar("ActiveState"),
        sub_state=scalar("SubState"),
        unit_file_state=scalar("UnitFileState"),
        condition_result=scalar("ConditionResult"),
    )