Refactor schema to support versioning of pages. Add HistoryDialog and diff with ability to revert.

This commit is contained in:
Miguel Jacq 2025-11-02 17:02:03 +11:00
parent cf9102939f
commit 82069053be
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
9 changed files with 520 additions and 33 deletions

View file

@ -1,3 +1,7 @@
# 0.1.5
* Refactor schema to support versioning of pages. Add HistoryDialog and diff with ability to revert.
# 0.1.4
* Add auto-lock of app (configurable in Settings, defaults to 15 minutes)

View file

@ -26,6 +26,7 @@ There is deliberately no network connectivity or syncing intended.
* Search
* Automatic periodic saving (or explicitly save)
* Transparent integrity checking of the database when it opens
* Automatic locking of the app after a period of inactivity (default 15 min)
* Rekey the database (change the password)
* Export the database to json, txt, html or csv

View file

@ -26,14 +26,17 @@ class DBManager:
self.conn: sqlite.Connection | None = None
def connect(self) -> bool:
"""
Open, decrypt and install schema on the database.
"""
# Ensure parent dir exists
self.cfg.path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite.connect(str(self.cfg.path))
self.conn.row_factory = sqlite.Row
cur = self.conn.cursor()
cur.execute(f"PRAGMA key = '{self.cfg.key}';")
cur.execute("PRAGMA journal_mode = WAL;")
self.conn.commit()
cur.execute("PRAGMA foreign_keys = ON;")
cur.execute("PRAGMA journal_mode = WAL;").fetchone()
try:
self._integrity_ok()
except Exception:
@ -44,15 +47,18 @@ class DBManager:
return True
def _integrity_ok(self) -> bool:
"""
Runs the cipher_integrity_check PRAGMA on the database.
"""
cur = self.conn.cursor()
cur.execute("PRAGMA cipher_integrity_check;")
rows = cur.fetchall()
# OK
# OK: nothing returned
if not rows:
return
# Not OK
# Not OK: rows of problems returned
details = "; ".join(str(r[0]) for r in rows if r and r[0] is not None)
raise sqlite.IntegrityError(
"SQLCipher integrity check failed"
@ -60,16 +66,62 @@ class DBManager:
)
def _ensure_schema(self) -> None:
"""
Install the expected schema on the database.
We also handle upgrades here.
"""
cur = self.conn.cursor()
cur.execute(
# Always keep FKs on
cur.execute("PRAGMA foreign_keys = ON;")
# Create new versioned schema if missing (< 0.1.5)
cur.executescript(
"""
CREATE TABLE IF NOT EXISTS entries (
date TEXT PRIMARY KEY, -- ISO yyyy-MM-dd
content TEXT NOT NULL
CREATE TABLE IF NOT EXISTS pages (
date TEXT PRIMARY KEY, -- yyyy-MM-dd
current_version_id INTEGER,
FOREIGN KEY(current_version_id) REFERENCES versions(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS versions (
id INTEGER PRIMARY KEY,
date TEXT NOT NULL, -- FK to pages.date
version_no INTEGER NOT NULL, -- 1,2,3 per date
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
note TEXT,
content TEXT NOT NULL,
FOREIGN KEY(date) REFERENCES pages(date) ON DELETE CASCADE
);
CREATE UNIQUE INDEX IF NOT EXISTS ux_versions_date_ver ON versions(date, version_no);
CREATE INDEX IF NOT EXISTS ix_versions_date_created ON versions(date, created_at);
"""
)
cur.execute("PRAGMA user_version = 1;")
# If < 0.1.5 'entries' table exists and nothing has been migrated yet, try to migrate.
pre_0_1_5 = cur.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='entries';"
).fetchone()
pages_empty = cur.execute("SELECT 1 FROM pages LIMIT 1;").fetchone() is None
if pre_0_1_5 and pages_empty:
# Seed pages and versions (all as version 1)
cur.execute("INSERT OR IGNORE INTO pages(date) SELECT date FROM entries;")
cur.execute(
"INSERT INTO versions(date, version_no, content) "
"SELECT date, 1, content FROM entries;"
)
# Point head to v1 for each page
cur.execute(
"""
UPDATE pages
SET current_version_id = (
SELECT v.id FROM versions v
WHERE v.date = pages.date AND v.version_no = 1
);
"""
)
cur.execute("DROP TABLE IF EXISTS entries;")
self.conn.commit()
def rekey(self, new_key: str) -> None:
@ -92,42 +144,214 @@ class DBManager:
raise sqlite.Error("Re-open failed after rekey")
def get_entry(self, date_iso: str) -> str:
"""
Get a single entry by its date.
"""
cur = self.conn.cursor()
cur.execute("SELECT content FROM entries WHERE date = ?;", (date_iso,))
row = cur.fetchone()
row = cur.execute(
"""
SELECT v.content
FROM pages p
JOIN versions v ON v.id = p.current_version_id
WHERE p.date = ?;
""",
(date_iso,),
).fetchone()
return row[0] if row else ""
def upsert_entry(self, date_iso: str, content: str) -> None:
cur = self.conn.cursor()
cur.execute(
"""
INSERT INTO entries(date, content) VALUES(?, ?)
ON CONFLICT(date) DO UPDATE SET content = excluded.content;
""",
(date_iso, content),
)
self.conn.commit()
"""
Insert or update an entry.
"""
# Make a new version and set it as current
self.save_new_version(date_iso, content, note=None, set_current=True)
def search_entries(self, text: str) -> list[str]:
"""
Search for entries by term. This only works against the latest
version of the page.
"""
cur = self.conn.cursor()
pattern = f"%{text}%"
return cur.execute(
"SELECT * FROM entries WHERE TRIM(content) LIKE ?", (pattern,)
rows = cur.execute(
"""
SELECT p.date, v.content
FROM pages AS p
JOIN versions AS v
ON v.id = p.current_version_id
WHERE TRIM(v.content) <> ''
AND v.content LIKE LOWER(?) ESCAPE '\\'
ORDER BY p.date DESC;
""",
(pattern,),
).fetchall()
return [(r[0], r[1]) for r in rows]
def dates_with_content(self) -> list[str]:
"""
Find all entries and return the dates of them.
This is used to mark the calendar days in bold if they contain entries.
"""
cur = self.conn.cursor()
cur.execute("SELECT date FROM entries WHERE TRIM(content) <> '';")
return [r[0] for r in cur.fetchall()]
rows = cur.execute(
"""
SELECT p.date
FROM pages p
JOIN versions v ON v.id = p.current_version_id
WHERE TRIM(v.content) <> ''
ORDER BY p.date;
"""
).fetchall()
return [r[0] for r in rows]
def get_all_entries(self) -> List[Entry]:
# ------------------------- Versioning logic here ------------------------#
def save_new_version(
self,
date_iso: str,
content: str,
note: str | None = None,
set_current: bool = True,
) -> tuple[int, int]:
"""
Append a new version for this date. Returns (version_id, version_no).
If set_current=True, flips the page head to this new version.
"""
if self.conn is None:
raise RuntimeError("Database is not connected")
with self.conn: # transaction
cur = self.conn.cursor()
# Ensure page row exists
cur.execute("INSERT OR IGNORE INTO pages(date) VALUES (?);", (date_iso,))
# Next version number
row = cur.execute(
"SELECT COALESCE(MAX(version_no), 0) AS maxv FROM versions WHERE date=?;",
(date_iso,),
).fetchone()
next_ver = int(row["maxv"]) + 1
# Insert the version
cur.execute(
"INSERT INTO versions(date, version_no, content, note) "
"VALUES (?,?,?,?);",
(date_iso, next_ver, content, note),
)
ver_id = cur.lastrowid
if set_current:
cur.execute(
"UPDATE pages SET current_version_id=? WHERE date=?;",
(ver_id, date_iso),
)
return ver_id, next_ver
def list_versions(self, date_iso: str) -> list[dict]:
"""
Returns history for a given date (newest first), including which one is current.
Each item: {id, version_no, created_at, note, is_current}
"""
cur = self.conn.cursor()
rows = cur.execute("SELECT date, content FROM entries ORDER BY date").fetchall()
return [(row["date"], row["content"]) for row in rows]
rows = cur.execute(
"""
SELECT v.id, v.version_no, v.created_at, v.note,
CASE WHEN v.id = p.current_version_id THEN 1 ELSE 0 END AS is_current
FROM versions v
LEFT JOIN pages p ON p.date = v.date
WHERE v.date = ?
ORDER BY v.version_no DESC;
""",
(date_iso,),
).fetchall()
return [dict(r) for r in rows]
def get_version(
self,
*,
date_iso: str | None = None,
version_no: int | None = None,
version_id: int | None = None,
) -> dict | None:
"""
Fetch a specific version by (date, version_no) OR by version_id.
Returns a dict with keys: id, date, version_no, created_at, note, content.
"""
cur = self.conn.cursor()
if version_id is not None:
row = cur.execute(
"SELECT id, date, version_no, created_at, note, content "
"FROM versions WHERE id=?;",
(version_id,),
).fetchone()
else:
if date_iso is None or version_no is None:
raise ValueError(
"Provide either version_id OR (date_iso and version_no)"
)
row = cur.execute(
"SELECT id, date, version_no, created_at, note, content "
"FROM versions WHERE date=? AND version_no=?;",
(date_iso, version_no),
).fetchone()
return dict(row) if row else None
def revert_to_version(
self,
date_iso: str,
*,
version_no: int | None = None,
version_id: int | None = None,
) -> None:
"""
Point the page head (pages.current_version_id) to an existing version.
Fast revert: no content is rewritten.
"""
if self.conn is None:
raise RuntimeError("Database is not connected")
cur = self.conn.cursor()
if version_id is None:
if version_no is None:
raise ValueError("Provide version_no or version_id")
row = cur.execute(
"SELECT id FROM versions WHERE date=? AND version_no=?;",
(date_iso, version_no),
).fetchone()
if row is None:
raise ValueError("Version not found for this date")
version_id = int(row["id"])
else:
# Ensure that version_id belongs to the given date
row = cur.execute(
"SELECT date FROM versions WHERE id=?;", (version_id,)
).fetchone()
if row is None or row["date"] != date_iso:
raise ValueError("version_id does not belong to the given date")
with self.conn:
cur.execute(
"UPDATE pages SET current_version_id=? WHERE date=?;",
(version_id, date_iso),
)
# ------------------------- Export logic here ------------------------#
def get_all_entries(self) -> List[Entry]:
"""
Get all entries. Used for exports.
"""
cur = self.conn.cursor()
rows = cur.execute(
"""
SELECT p.date, v.content
FROM pages p
JOIN versions v ON v.id = p.current_version_id
ORDER BY p.date;
"""
).fetchall()
return [(r[0], r[1]) for r in rows]
def export_json(
self, entries: Sequence[Entry], file_path: str, pretty: bool = True
) -> None:
"""
Export to json.
"""
data = [{"date": d, "content": c} for d, c in entries]
with open(file_path, "w", encoding="utf-8") as f:
if pretty:

179
bouquin/history_dialog.py Normal file
View file

@ -0,0 +1,179 @@
from __future__ import annotations
import difflib, re, html as _html
from PySide6.QtCore import Qt, Slot
from PySide6.QtWidgets import (
QDialog,
QVBoxLayout,
QHBoxLayout,
QListWidget,
QListWidgetItem,
QPushButton,
QMessageBox,
QTextBrowser,
QTabWidget,
)
def _html_to_text(s: str) -> str:
"""Lightweight HTML→text for diff (keeps paragraphs/line breaks)."""
STYLE_SCRIPT_RE = re.compile(r"(?is)<(script|style)[^>]*>.*?</\1>")
COMMENT_RE = re.compile(r"<!--.*?-->", re.S)
BR_RE = re.compile(r"(?i)<br\s*/?>")
BLOCK_END_RE = re.compile(r"(?i)</(p|div|section|article|li|h[1-6])\s*>")
TAG_RE = re.compile(r"<[^>]+>")
MULTINL_RE = re.compile(r"\n{3,}")
s = STYLE_SCRIPT_RE.sub("", s)
s = COMMENT_RE.sub("", s)
s = BR_RE.sub("\n", s)
s = BLOCK_END_RE.sub("\n", s)
s = TAG_RE.sub("", s)
s = _html.unescape(s)
s = MULTINL_RE.sub("\n\n", s)
return s.strip()
def _colored_unified_diff_html(old_html: str, new_html: str) -> str:
"""Return HTML with colored unified diff (+ green, - red, context gray)."""
a = _html_to_text(old_html).splitlines()
b = _html_to_text(new_html).splitlines()
ud = difflib.unified_diff(a, b, fromfile="current", tofile="selected", lineterm="")
lines = []
for line in ud:
if line.startswith("+") and not line.startswith("+++"):
lines.append(
f"<span style='color:#116329'>+ {_html.escape(line[1:])}</span>"
)
elif line.startswith("-") and not line.startswith("---"):
lines.append(
f"<span style='color:#b31d28'>- {_html.escape(line[1:])}</span>"
)
elif line.startswith("@@"):
lines.append(f"<span style='color:#6f42c1'>{_html.escape(line)}</span>")
else:
lines.append(f"<span style='color:#586069'>{_html.escape(line)}</span>")
css = "pre { font-family: Consolas,Menlo,Monaco,monospace; font-size: 13px; }"
return f"<style>{css}</style><pre>{'<br>'.join(lines)}</pre>"
class HistoryDialog(QDialog):
"""Show versions for a date, preview, diff, and allow revert."""
def __init__(self, db, date_iso: str, parent=None):
super().__init__(parent)
self.setWindowTitle(f"History — {date_iso}")
self._db = db
self._date = date_iso
self._versions = [] # list[dict] from DB
self._current_id = None # id of current
root = QVBoxLayout(self)
# Top: list of versions
top = QHBoxLayout()
self.list = QListWidget()
self.list.setMinimumSize(500, 650)
self.list.currentItemChanged.connect(self._on_select)
top.addWidget(self.list, 1)
# Right: tabs (Preview / Diff vs current)
self.tabs = QTabWidget()
self.preview = QTextBrowser()
self.preview.setOpenExternalLinks(True)
self.diff = QTextBrowser()
self.diff.setOpenExternalLinks(False)
self.tabs.addTab(self.preview, "Preview")
self.tabs.addTab(self.diff, "Diff vs current")
self.tabs.setMinimumSize(500, 650)
top.addWidget(self.tabs, 2)
root.addLayout(top)
# Buttons
row = QHBoxLayout()
row.addStretch(1)
self.btn_revert = QPushButton("Revert to Selected")
self.btn_revert.clicked.connect(self._revert)
self.btn_close = QPushButton("Close")
self.btn_close.clicked.connect(self.reject)
row.addWidget(self.btn_revert)
row.addWidget(self.btn_close)
root.addLayout(row)
self._load_versions()
# --- Data/UX helpers ---
def _load_versions(self):
self._versions = self._db.list_versions(
self._date
) # [{id,version_no,created_at,note,is_current}]
self._current_id = next(
(v["id"] for v in self._versions if v["is_current"]), None
)
self.list.clear()
for v in self._versions:
label = f"v{v['version_no']}{v['created_at']}"
if v.get("note"):
label += f" · {v['note']}"
if v["is_current"]:
label += " **(current)**"
it = QListWidgetItem(label)
it.setData(Qt.UserRole, v["id"])
self.list.addItem(it)
# select the first non-current if available, else current
idx = 0
for i, v in enumerate(self._versions):
if not v["is_current"]:
idx = i
break
if self.list.count():
self.list.setCurrentRow(idx)
@Slot()
def _on_select(self):
item = self.list.currentItem()
if not item:
self.preview.clear()
self.diff.clear()
self.btn_revert.setEnabled(False)
return
sel_id = item.data(Qt.UserRole)
# Preview selected as HTML
sel = self._db.get_version(version_id=sel_id)
self.preview.setHtml(sel["content"])
# Diff vs current (textual diff)
cur = self._db.get_version(version_id=self._current_id)
self.diff.setHtml(_colored_unified_diff_html(cur["content"], sel["content"]))
# Enable revert only if selecting a non-current
self.btn_revert.setEnabled(sel_id != self._current_id)
@Slot()
def _revert(self):
item = self.list.currentItem()
if not item:
return
sel_id = item.data(Qt.UserRole)
if sel_id == self._current_id:
return
sel = self._db.get_version(version_id=sel_id)
vno = sel["version_no"]
# Confirm
if (
QMessageBox.question(
self,
"Revert",
f"Revert {self._date} to version v{vno}?\n\nYou can always change your mind later.",
QMessageBox.Yes | QMessageBox.No,
)
!= QMessageBox.Yes
):
return
# Flip head pointer
try:
self._db.revert_to_version(self._date, version_id=sel_id)
except Exception as e:
QMessageBox.critical(self, "Revert failed", str(e))
return
QMessageBox.information(self, "Reverted", f"{self._date} is now at v{vno}.")
self.accept() # let the caller refresh the editor

View file

@ -29,7 +29,9 @@ from PySide6.QtWidgets import (
from .db import DBManager
from .editor import Editor
from .history_dialog import HistoryDialog
from .key_prompt import KeyPrompt
from .save_dialog import SaveDialog
from .search import Search
from .settings import APP_ORG, APP_NAME, load_db_config, save_db_config
from .settings_dialog import SettingsDialog
@ -146,6 +148,7 @@ class MainWindow(QMainWindow):
self.toolBar.bulletsRequested.connect(self.editor.toggle_bullets)
self.toolBar.numbersRequested.connect(self.editor.toggle_numbers)
self.toolBar.alignRequested.connect(self.editor.setAlignment)
self.toolBar.historyRequested.connect(self._open_history)
split = QSplitter()
split.addWidget(left_panel)
@ -181,10 +184,15 @@ class MainWindow(QMainWindow):
# Menu bar (File)
mb = self.menuBar()
file_menu = mb.addMenu("&File")
act_save = QAction("&Save", self)
act_save = QAction("&Save a version", self)
act_save.setShortcut("Ctrl+S")
act_save.triggered.connect(lambda: self._save_current(explicit=True))
file_menu.addAction(act_save)
act_history = QAction("History", self)
act_history.setShortcut("Ctrl+H")
act_history.setShortcutContext(Qt.ApplicationShortcut)
act_history.triggered.connect(self._open_history)
file_menu.addAction(act_history)
act_settings = QAction("Settin&gs", self)
act_settings.setShortcut("Ctrl+G")
act_settings.triggered.connect(self._open_settings)
@ -330,7 +338,7 @@ class MainWindow(QMainWindow):
def _on_text_changed(self):
self._dirty = True
self._save_timer.start(1200) # autosave after idle
self._save_timer.start(10000) # autosave after idle
def _adjust_day(self, delta: int):
"""Move selection by delta days (negative for previous)."""
@ -358,7 +366,7 @@ class MainWindow(QMainWindow):
# Now load the newly selected date
self._load_selected_date()
def _save_date(self, date_iso: str, explicit: bool = False):
def _save_date(self, date_iso: str, explicit: bool = False, note: str = "autosave"):
"""
Save editor contents into the given date. Shows status on success.
explicit=True means user invoked Save: show feedback even if nothing changed.
@ -367,7 +375,7 @@ class MainWindow(QMainWindow):
return
text = self.editor.toHtml()
try:
self.db.upsert_entry(date_iso, text)
self.db.save_new_version(date_iso, text, note)
except Exception as e:
QMessageBox.critical(self, "Save Error", str(e))
return
@ -381,9 +389,34 @@ class MainWindow(QMainWindow):
)
def _save_current(self, explicit: bool = False):
try:
self._save_timer.stop()
except Exception:
pass
if explicit:
# Prompt for a note
dlg = SaveDialog(self)
if dlg.exec() != QDialog.Accepted:
return
note = dlg.note_text()
else:
note = "autosave"
# Delegate to _save_date for the currently selected date
self._save_date(self._current_date_iso(), explicit)
self._save_date(self._current_date_iso(), explicit, note)
try:
self._save_timer.start()
except Exception:
pass
def _open_history(self):
date_iso = self._current_date_iso()
dlg = HistoryDialog(self.db, date_iso, self)
if dlg.exec() == QDialog.Accepted:
# refresh editor + calendar (head pointer may have changed)
self._load_selected_date(date_iso)
self._refresh_calendar_marks()
# ----------- Settings handler ------------#
def _open_settings(self):
dlg = SettingsDialog(self.cfg, self.db, self)
if dlg.exec() != QDialog.Accepted:
@ -414,6 +447,7 @@ class MainWindow(QMainWindow):
self._load_selected_date()
self._refresh_calendar_marks()
# ------------ Window positioning --------------- #
def _restore_window_position(self):
geom = self.settings.value("main/geometry", None)
state = self.settings.value("main/windowState", None)
@ -447,6 +481,7 @@ class MainWindow(QMainWindow):
# Center the window in that screens available area
self.move(r.center() - self.rect().center())
# ----------------- Export handler ----------------- #
@Slot()
def _export(self):
try:

35
bouquin/save_dialog.py Normal file
View file

@ -0,0 +1,35 @@
from __future__ import annotations
import datetime
from PySide6.QtWidgets import (
QDialog,
QVBoxLayout,
QLabel,
QLineEdit,
QDialogButtonBox,
)
class SaveDialog(QDialog):
def __init__(
self,
parent=None,
title: str = "Enter a name for this version",
message: str = "Enter a name for this version?",
):
super().__init__(parent)
self.setWindowTitle(title)
v = QVBoxLayout(self)
v.addWidget(QLabel(message))
self.note = QLineEdit()
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.note.setText(f"New version I saved at {now}")
v.addWidget(self.note)
bb = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
bb.accepted.connect(self.accept)
bb.rejected.connect(self.reject)
v.addWidget(bb)
def note_text(self) -> str:
return self.note.text()

View file

@ -15,6 +15,7 @@ class ToolBar(QToolBar):
bulletsRequested = Signal()
numbersRequested = Signal()
alignRequested = Signal(Qt.AlignmentFlag)
historyRequested = Signal()
def __init__(self, parent=None):
super().__init__("Format", parent)
@ -76,6 +77,10 @@ class ToolBar(QToolBar):
lambda: self.alignRequested.emit(Qt.AlignRight)
)
# History button
self.actHistory = QAction("History", self)
self.actHistory.triggered.connect(self.historyRequested)
self.addActions(
[
self.actBold,
@ -92,6 +97,7 @@ class ToolBar(QToolBar):
self.actAlignL,
self.actAlignC,
self.actAlignR,
self.actHistory,
]
)
@ -120,6 +126,9 @@ class ToolBar(QToolBar):
self._style_letter_button(self.actAlignC, "C")
self._style_letter_button(self.actAlignR, "R")
# History
self._style_letter_button(self.actHistory, "View History")
def _style_letter_button(
self,
action: QAction,

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "bouquin"
version = "0.1.4"
version = "0.1.5"
description = "Bouquin is a simple, opinionated notebook application written in Python, PyQt and SQLCipher."
authors = ["Miguel Jacq <mig@mig5.net>"]
readme = "README.md"

Binary file not shown.

Before

Width:  |  Height:  |  Size: 80 KiB

After

Width:  |  Height:  |  Size: 82 KiB

Before After
Before After