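"""Unit tests for bouquin.db.DBManager using lightweight fake connections.

Covers the integrity check, connect() failure handling, rekey() error paths,
the export helpers and their extension-based router, and compact() error
reporting.
"""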
import types

import pytest

import bouquin.db as dbmod
from bouquin.db import DBConfig, DBManager


class FakeCursor:
    """In-memory cursor stand-in that replays canned rows and records SQL."""

    def __init__(self, rows=None):
        self._rows = rows or []
        self.executed = []

    def execute(self, sql, params=None):
        self.executed.append((sql, tuple(params) if params else None))
        return self

    def fetchall(self):
        return list(self._rows)

    def fetchone(self):
        return self._rows[0] if self._rows else None


class FakeConn:
    """In-memory connection stand-in that hands out FakeCursors."""

    def __init__(self, rows=None):
        self._rows = rows or []
        self.closed = False
        self.cursors = []
        self.row_factory = None

    def cursor(self):
        c = FakeCursor(rows=self._rows)
        self.cursors.append(c)
        return c

    def close(self):
        self.closed = True

    def commit(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, *a):
        pass


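# A minimal sketch of how the fakes above behave on their own: FakeConn hands
# out FakeCursors that replay the canned rows and record every statement. (The
# PRAGMA text is illustrative only; it is not tied to DBManager's real SQL.)
def test_fakes_record_statements():
    conn = FakeConn(rows=[("ok",)])
    cur = conn.cursor()
    cur.execute("PRAGMA integrity_check;")
    assert cur.fetchone() == ("ok",)
    assert cur.fetchall() == [("ok",)]
    assert conn.cursors == [cur]
    assert cur.executed == [("PRAGMA integrity_check;", None)]

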
def test_integrity_ok_ok(monkeypatch, tmp_path):
    mgr = DBManager(DBConfig(tmp_path / "db.sqlite", key="x"))
    mgr.conn = FakeConn(rows=[])
    assert mgr._integrity_ok() is None


def test_integrity_ok_raises(monkeypatch, tmp_path):
    mgr = DBManager(DBConfig(tmp_path / "db.sqlite", key="x"))
    mgr.conn = FakeConn(rows=[("oops",), (None,)])
    # pytest.raises fails the test if _integrity_ok() does not raise at all.
    with pytest.raises(dbmod.sqlite.IntegrityError):
        mgr._integrity_ok()


def test_connect_closes_on_integrity_failure(monkeypatch, tmp_path):
    # Use a non-empty key to avoid SQLCipher complaining before our patch runs
    mgr = DBManager(DBConfig(tmp_path / "db.sqlite", key="x"))
    # Make the integrity check raise so connect() takes the failure path
    monkeypatch.setattr(
        DBManager,
        "_integrity_ok",
        lambda self: (_ for _ in ()).throw(RuntimeError("bad")),
    )
    ok = mgr.connect()
    assert ok is False
    assert mgr.conn is None


def test_rekey_not_connected_raises(tmp_path):
    mgr = DBManager(DBConfig(tmp_path / "db.sqlite", key="old"))
    mgr.conn = None
    with pytest.raises(RuntimeError):
        mgr.rekey("new")


def test_rekey_reopen_failure(monkeypatch, tmp_path):
    mgr = DBManager(DBConfig(tmp_path / "db.sqlite", key="old"))
    mgr.conn = FakeConn(rows=[(None,)])
    monkeypatch.setattr(DBManager, "connect", lambda self: False)
    with pytest.raises(Exception):
        mgr.rekey("new")


def test_export_by_extension_and_unknown(tmp_path):
    mgr = DBManager(DBConfig(tmp_path / "db.sqlite", key="x"))
    entries = [("2025-01-01", "<b>Hi</b>")]

    # Each exporter should write the file
    p = tmp_path / "out.json"
    mgr.export_json(entries, str(p))
    assert p.exists() and p.stat().st_size > 0

    p = tmp_path / "out.csv"
    mgr.export_csv(entries, str(p))
    assert p.exists()

    p = tmp_path / "out.txt"
    mgr.export_txt(entries, str(p))
    assert p.exists()

    p = tmp_path / "out.html"
    mgr.export_html(entries, str(p))
    assert p.exists()

    p = tmp_path / "out.md"
    mgr.export_markdown(entries, str(p))
    assert p.exists()

    # Router: export_by_extension dispatches on the file extension
    mgr.get_all_entries = types.MethodType(lambda self: entries, mgr)
    for ext in [".json", ".csv", ".txt", ".html", ".md"]:
        path = tmp_path / f"route{ext}"
        mgr.export_by_extension(str(path))
        assert path.exists()

    # Unknown extensions are rejected
    with pytest.raises(ValueError):
        mgr.export_by_extension(str(tmp_path / "x.zzz"))


def test_compact_error_prints(monkeypatch, tmp_path, capsys):
    mgr = DBManager(DBConfig(tmp_path / "db.sqlite", key="x"))

    class BadConn:
        def cursor(self):
            raise RuntimeError("no")

    mgr.conn = BadConn()
    mgr.compact()
    out = capsys.readouterr().out
    assert "Error:" in out