from __future__ import annotations import os from pathlib import Path def test_parse_login_defs_parses_known_keys(tmp_path: Path): from enroll.accounts import parse_login_defs p = tmp_path / "login.defs" p.write_text( """ # comment UID_MIN 1000 UID_MAX 60000 SYS_UID_MIN 100 SYS_UID_MAX 999 UID_MIN not_an_int OTHER 123 """, encoding="utf-8", ) vals = parse_login_defs(str(p)) assert vals["UID_MIN"] == 1000 assert vals["UID_MAX"] == 60000 assert vals["SYS_UID_MIN"] == 100 assert vals["SYS_UID_MAX"] == 999 assert "OTHER" not in vals def test_parse_passwd_and_group_and_ssh_files(tmp_path: Path): from enroll.accounts import find_user_ssh_files, parse_group, parse_passwd passwd = tmp_path / "passwd" passwd.write_text( "\n".join( [ "root:x:0:0:root:/root:/bin/bash", "# comment", "alice:x:1000:1000:Alice:/home/alice:/bin/bash", "bob:x:1001:1000:Bob:/home/bob:/usr/sbin/nologin", "badline", "cathy:x:notint:1000:Cathy:/home/cathy:/bin/bash", "", ] ), encoding="utf-8", ) group = tmp_path / "group" group.write_text( "\n".join( [ "root:x:0:", "users:x:1000:alice,bob", "admins:x:1002:alice", "badgroup:x:notint:alice", "", ] ), encoding="utf-8", ) rows = parse_passwd(str(passwd)) assert ("alice", 1000, 1000, "Alice", "/home/alice", "/bin/bash") in rows assert all(r[0] != "cathy" for r in rows) # skipped invalid UID gid_to_name, name_to_gid, members = parse_group(str(group)) assert gid_to_name[1000] == "users" assert name_to_gid["admins"] == 1002 assert "alice" in members["admins"] # ssh discovery: only authorized_keys, no symlinks home = tmp_path / "home" / "alice" sshdir = home / ".ssh" sshdir.mkdir(parents=True) ak = sshdir / "authorized_keys" ak.write_text("ssh-ed25519 AAA...", encoding="utf-8") # a symlink should be ignored (sshdir / "authorized_keys2").write_text("x", encoding="utf-8") os.symlink(str(sshdir / "authorized_keys2"), str(sshdir / "authorized_keys_link")) assert find_user_ssh_files(str(home)) == [str(ak)] def test_collect_non_system_users(monkeypatch, tmp_path: Path): import enroll.accounts as a orig_parse_login_defs = a.parse_login_defs orig_parse_passwd = a.parse_passwd orig_parse_group = a.parse_group # Provide controlled passwd/group/login.defs inputs via monkeypatch. passwd = tmp_path / "passwd" passwd.write_text( "\n".join( [ "root:x:0:0:root:/root:/bin/bash", "nobody:x:65534:65534:nobody:/nonexistent:/usr/sbin/nologin", "alice:x:1000:1000:Alice:/home/alice:/bin/bash", "sysuser:x:200:200:Sys:/home/sys:/bin/bash", "bob:x:1001:1000:Bob:/home/bob:/bin/false", "", ] ), encoding="utf-8", ) group = tmp_path / "group" group.write_text( "\n".join( [ "users:x:1000:alice,bob", "admins:x:1002:alice", "", ] ), encoding="utf-8", ) defs = tmp_path / "login.defs" defs.write_text("UID_MIN 1000\n", encoding="utf-8") monkeypatch.setattr( a, "parse_login_defs", lambda path=str(defs): orig_parse_login_defs(path) ) monkeypatch.setattr( a, "parse_passwd", lambda path=str(passwd): orig_parse_passwd(path) ) monkeypatch.setattr( a, "parse_group", lambda path=str(group): orig_parse_group(path) ) # Use a stable fake ssh discovery. monkeypatch.setattr( a, "find_user_ssh_files", lambda home: [f"{home}/.ssh/authorized_keys"] ) users = a.collect_non_system_users() assert [u.name for u in users] == ["alice"] u = users[0] assert u.primary_group == "users" assert u.supplementary_groups == ["admins"] assert u.ssh_files == ["/home/alice/.ssh/authorized_keys"] def test_parse_login_defs_file_not_found(tmp_path: Path): from enroll.accounts import parse_login_defs nonexistent = tmp_path / "nonexistent" / "login.defs" vals = parse_login_defs(str(nonexistent)) assert vals == {} def test_parse_login_defs_handles_invalid_numbers(tmp_path: Path): from enroll.accounts import parse_login_defs p = tmp_path / "login.defs" p.write_text("UID_MIN not_a_number\nUID_MAX 60000\n", encoding="utf-8") vals = parse_login_defs(str(p)) assert "UID_MIN" not in vals assert vals["UID_MAX"] == 60000 def test_parse_group_handles_invalid_gid(tmp_path: Path): from enroll.accounts import parse_group p = tmp_path / "group" p.write_text( "valid:x:1000:user1\n" "invalid_gid:x:notanint:user2\n", encoding="utf-8", ) gid_to_name, name_to_gid, members = parse_group(str(p)) assert 1000 in gid_to_name assert gid_to_name[1000] == "valid" assert "invalid_gid" not in name_to_gid def test_parse_group_line_too_short(tmp_path: Path): from enroll.accounts import parse_group p = tmp_path / "group" p.write_text( "valid:x:1000:user1\n" "shortline:x:1001\n", encoding="utf-8", ) gid_to_name, name_to_gid, members = parse_group(str(p)) assert 1000 in gid_to_name assert 1001 not in gid_to_name def test_is_human_user_filters_by_uid_and_shell(): from enroll.accounts import is_human_user assert is_human_user(1000, "/bin/bash", 1000) is True assert is_human_user(999, "/bin/bash", 1000) is False assert is_human_user(1000, "/usr/sbin/nologin", 1000) is False assert is_human_user(1000, "/usr/bin/nologin", 1000) is False assert is_human_user(1000, "/bin/false", 1000) is False assert is_human_user(1000, "", 1000) is True def test_find_user_ssh_files_no_ssh_dir(tmp_path: Path): from enroll.accounts import find_user_ssh_files home = tmp_path / "home" / "user" home.mkdir(parents=True) assert find_user_ssh_files(str(home)) == [] def test_find_user_ssh_files_ignores_symlink(tmp_path: Path): from enroll.accounts import find_user_ssh_files home = tmp_path / "home" / "user" sshdir = home / ".ssh" sshdir.mkdir(parents=True) target = sshdir / "real_file" target.write_text("x", encoding="utf-8") os.symlink(str(target), str(sshdir / "authorized_keys")) result = find_user_ssh_files(str(home)) assert result == [] def test_find_user_ssh_files_handles_home_not_starting_with_slash(): from enroll.accounts import find_user_ssh_files assert find_user_ssh_files("relative/path") == [] assert find_user_ssh_files("") == [] def test_collect_non_system_users_skips_nologin_users(tmp_path: Path): import enroll.accounts as a orig_parse_login_defs = a.parse_login_defs orig_parse_passwd = a.parse_passwd orig_parse_group = a.parse_group passwd = tmp_path / "passwd" passwd.write_text( "root:x:0:0:root:/root:/bin/bash\n" "alice:x:1000:1000:Alice:/home/alice:/bin/bash\n" "nobody:x:65534:65534:nobody:/nonexistent:/usr/sbin/nologin\n" "sysuser:x:100:100:Sys:/home/sys:/bin/bash\n", encoding="utf-8", ) group = tmp_path / "group" group.write_text("users:x:1000:alice\n", encoding="utf-8") defs = tmp_path / "login.defs" defs.write_text("UID_MIN 1000\n", encoding="utf-8") monkeypatch_wrapper = lambda fn, p: lambda path=str(p): fn(path) a.parse_login_defs = monkeypatch_wrapper(orig_parse_login_defs, defs) a.parse_passwd = monkeypatch_wrapper(orig_parse_passwd, passwd) a.parse_group = monkeypatch_wrapper(orig_parse_group, group) a.find_user_ssh_files = lambda home: [] users = a.collect_non_system_users() assert [u.name for u in users] == ["alice"] def test_collect_non_system_users_skips_below_uid_min(tmp_path: Path): import enroll.accounts as a orig_parse_login_defs = a.parse_login_defs orig_parse_passwd = a.parse_passwd orig_parse_group = a.parse_group passwd = tmp_path / "passwd" passwd.write_text( "root:x:0:0:root:/root:/bin/bash\n" "sysuser:x:999:999:Sys:/home/sys:/bin/bash\n" "alice:x:1000:1000:Alice:/home/alice:/bin/bash\n", encoding="utf-8", ) group = tmp_path / "group" group.write_text("users:x:1000:alice\n", encoding="utf-8") defs = tmp_path / "login.defs" defs.write_text("UID_MIN 1000\n", encoding="utf-8") a.parse_login_defs = lambda path=str(defs): orig_parse_login_defs(path) a.parse_passwd = lambda path=str(passwd): orig_parse_passwd(path) a.parse_group = lambda path=str(group): orig_parse_group(path) a.find_user_ssh_files = lambda home: [] users = a.collect_non_system_users() assert [u.name for u in users] == ["alice"] def test_parse_group_handles_empty_lines(tmp_path: Path): from enroll.accounts import parse_group p = tmp_path / "group" p.write_text( "valid:x:1000:user1\n" "\n" "another:x:1001:user2\n", encoding="utf-8", ) gid_to_name, name_to_gid, members = parse_group(str(p)) assert 1000 in gid_to_name assert 1001 in gid_to_name def test_parse_group_handles_short_lines(tmp_path: Path): from enroll.accounts import parse_group p = tmp_path / "group" p.write_text( "valid:x:1000:user1\n" "short:x:1001\n" "another:x:1002:user2\n", encoding="utf-8", ) gid_to_name, name_to_gid, members = parse_group(str(p)) assert 1000 in gid_to_name assert 1001 not in gid_to_name # skipped due to short line assert 1002 in gid_to_name def test_find_flatpaks_in_root_detects_remote_branch_and_arch(tmp_path: Path): import enroll.accounts as a root = tmp_path / "flatpak" (root / "repo").mkdir(parents=True) (root / "repo" / "config").write_text( '[remote "acme"]\nurl=https://flatpak.example/repo/\n', encoding="utf-8", ) ref = ( root / "repo" / "refs" / "remotes" / "acme" / "app" / "com.example.App" / "x86_64" / "stable" ) ref.parent.mkdir(parents=True) ref.write_text("checksum\n", encoding="utf-8") active = root / "app" / "com.example.App" / "x86_64" / "stable" / "active" active.mkdir(parents=True) remotes = a.find_flatpak_remotes(str(root), method="system") assert [(r.name, r.url, r.method) for r in remotes] == [ ("acme", "https://flatpak.example/repo/", "system") ] apps = a._find_flatpaks_in_root(str(root), method="system") assert len(apps) == 1 assert apps[0].name == "com.example.App" assert apps[0].remote == "acme" assert apps[0].branch == "stable" assert apps[0].arch == "x86_64" def test_parse_snap_list_output_detects_channel_revision_and_modes(): import enroll.accounts as a output = """Name Version Rev Tracking Publisher Notes code abc 123 latest/stable vscode✓ classic mydev 1.0 42 latest/edge example devmode,dangerous bare 1.0 5 latest/stable canonical✓ base """ snaps = {snap.name: snap for snap in a._parse_snap_list_output(output)} assert snaps["code"].channel == "latest/stable" assert snaps["code"].revision == 123 assert snaps["code"].classic is True assert snaps["mydev"].devmode is True assert snaps["mydev"].dangerous is True assert snaps["bare"].notes == ["base"] def test_parse_flatpak_list_output_detects_system_refs(): from enroll.accounts import _parse_flatpak_list_output output = "\n".join( [ "app/org.example.App/x86_64/stable\tflathub\tstable\tx86_64", "runtime/org.freedesktop.Platform/x86_64/24.08\tflathub\t24.08\tx86_64", ] ) refs = _parse_flatpak_list_output( output, method="system", columns=("ref", "origin", "branch", "arch") ) assert [(r.kind, r.name, r.remote, r.branch, r.arch) for r in refs] == [ ("app", "org.example.App", "flathub", "stable", "x86_64"), ("runtime", "org.freedesktop.Platform", "flathub", "24.08", "x86_64"), ] assert refs[0].source == "flatpak-list" def test_find_system_flatpaks_prefers_flatpak_list(monkeypatch): import subprocess import enroll.accounts as a calls = [] def fake_run(args, **kwargs): calls.append(args) if args == ["flatpak", "list", "--columns=help"]: return subprocess.CompletedProcess( args, 0, stdout="application\norigin\nbranch\narch\n", stderr="", ) return subprocess.CompletedProcess( args, 0, stdout="app/org.example.App/x86_64/stable\tacme\tstable\tx86_64\n", stderr="", ) monkeypatch.setattr(a.shutil, "which", lambda cmd: "/usr/bin/flatpak") monkeypatch.setattr(a.subprocess, "run", fake_run) monkeypatch.setattr( a, "_find_flatpaks_in_root", lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("fallback used")), ) refs = a.find_system_flatpaks() assert calls[0] == ["flatpak", "list", "--columns=help"] assert calls[1][:3] == ["flatpak", "list", "--system"] assert refs[0].name == "org.example.App" assert refs[0].method == "system" assert refs[0].remote == "acme" def test_parse_flatpak_list_output_detects_application_columns(): from enroll.accounts import _parse_flatpak_list_output output = "org.example.App\tflathub\tstable\tx86_64\n" refs = _parse_flatpak_list_output( output, method="system", columns=("application", "origin", "branch", "arch") ) assert len(refs) == 1 assert refs[0].name == "org.example.App" assert refs[0].kind is None assert refs[0].remote == "flathub" assert refs[0].branch == "stable" assert refs[0].arch == "x86_64" def test_parse_plain_flatpak_list_output_like_default_table(): from enroll.accounts import _parse_flatpak_list_output output = """Name Application ID Version Branch Installation Mesa org.freedesktop.Platform.GL.default 26.0.6 25.08 system Mesa (Extra) org.freedesktop.Platform.GL.default 26.0.6 25.08-extra system Codecs Extra Extension org.freedesktop.Platform.codecs-extra 25.08-extra system KDE Application Platform org.kde.Platform 6.10 system OnionShare org.onionshare.OnionShare 2.6.4 stable system """ refs = _parse_flatpak_list_output(output, method="system", columns=None) by_name_branch = {(r.name, r.branch) for r in refs} assert ("org.onionshare.OnionShare", "stable") in by_name_branch assert ("org.freedesktop.Platform.GL.default", "25.08") in by_name_branch assert ("org.freedesktop.Platform.GL.default", "25.08-extra") in by_name_branch assert ("org.kde.Platform", "6.10") in by_name_branch def test_parse_flatpak_columns_help_handles_description_table(): from enroll.accounts import _parse_flatpak_columns_help output = """ Available columns: application The application ID branch The branch installation The installation """ assert _parse_flatpak_columns_help(output) >= { "application", "branch", "installation", } def test_flatpak_list_attempts_respect_supported_columns(): from enroll.accounts import _flatpak_list_attempts attempts = _flatpak_list_attempts( "--system", {"application", "branch", "installation"} ) command_strings = [" ".join(args) for args, _columns in attempts] assert any("--columns=application,branch" in cmd for cmd in command_strings) assert not any("origin" in cmd for cmd in command_strings) assert command_strings[-1] == "flatpak list --system"