Early work on tags
This commit is contained in:
parent
8cd9538a50
commit
0a04b25fe5
5 changed files with 520 additions and 9 deletions
190
bouquin/db.py
190
bouquin/db.py
|
|
@ -1,18 +1,29 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import hashlib
|
||||
import html
|
||||
import json
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from sqlcipher3 import dbapi2 as sqlite
|
||||
from typing import List, Sequence, Tuple
|
||||
from typing import List, Sequence, Tuple, Iterable
|
||||
|
||||
|
||||
from . import strings
|
||||
|
||||
Entry = Tuple[str, str]
|
||||
TagRow = Tuple[int, str, str]
|
||||
|
||||
_TAG_COLORS = [
|
||||
"#FFB3BA", # soft red
|
||||
"#FFDFBA", # soft orange
|
||||
"#FFFFBA", # soft yellow
|
||||
"#BAFFC9", # soft green
|
||||
"#BAE1FF", # soft blue
|
||||
"#E0BAFF", # soft purple
|
||||
]
|
||||
|
||||
@dataclass
|
||||
class DBConfig:
|
||||
|
|
@ -82,7 +93,6 @@ class DBManager:
|
|||
# Always keep FKs on
|
||||
cur.execute("PRAGMA foreign_keys = ON;")
|
||||
|
||||
# Create new versioned schema if missing (< 0.1.5)
|
||||
cur.executescript(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS pages (
|
||||
|
|
@ -103,6 +113,24 @@ class DBManager:
|
|||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS ux_versions_date_ver ON versions(date, version_no);
|
||||
CREATE INDEX IF NOT EXISTS ix_versions_date_created ON versions(date, created_at);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tags (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL UNIQUE,
|
||||
color TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS ix_tags_name ON tags(name);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS page_tags (
|
||||
page_date TEXT NOT NULL, -- FK to pages.date
|
||||
tag_id INTEGER NOT NULL, -- FK to tags.id
|
||||
PRIMARY KEY (page_date, tag_id),
|
||||
FOREIGN KEY(page_date) REFERENCES pages(date) ON DELETE CASCADE,
|
||||
FOREIGN KEY(tag_id) REFERENCES tags(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS ix_page_tags_tag_id ON page_tags(tag_id);
|
||||
"""
|
||||
)
|
||||
self.conn.commit()
|
||||
|
|
@ -142,25 +170,35 @@ class DBManager:
|
|||
|
||||
def search_entries(self, text: str) -> list[str]:
|
||||
"""
|
||||
Search for entries by term. This only works against the latest
|
||||
version of the page.
|
||||
Search for entries by term or tag name.
|
||||
This only works against the latest version of the page.
|
||||
"""
|
||||
cur = self.conn.cursor()
|
||||
pattern = f"%{text}%"
|
||||
q = text.strip()
|
||||
pattern = f"%{q.lower()}%"
|
||||
|
||||
rows = cur.execute(
|
||||
"""
|
||||
SELECT p.date, v.content
|
||||
SELECT DISTINCT p.date, v.content
|
||||
FROM pages AS p
|
||||
JOIN versions AS v
|
||||
ON v.id = p.current_version_id
|
||||
LEFT JOIN page_tags pt
|
||||
ON pt.page_date = p.date
|
||||
LEFT JOIN tags t
|
||||
ON t.id = pt.tag_id
|
||||
WHERE TRIM(v.content) <> ''
|
||||
AND v.content LIKE LOWER(?) ESCAPE '\\'
|
||||
AND (
|
||||
LOWER(v.content) LIKE ?
|
||||
OR LOWER(COALESCE(t.name, '')) LIKE ?
|
||||
)
|
||||
ORDER BY p.date DESC;
|
||||
""",
|
||||
(pattern,),
|
||||
(pattern, pattern),
|
||||
).fetchall()
|
||||
return [(r[0], r[1]) for r in rows]
|
||||
|
||||
|
||||
def dates_with_content(self) -> list[str]:
|
||||
"""
|
||||
Find all entries and return the dates of them.
|
||||
|
|
@ -386,6 +424,142 @@ class DBManager:
|
|||
except Exception as e:
|
||||
print(f"{strings._('error')}: {e}")
|
||||
|
||||
# -------- Tags: helpers -------------------------------------------
|
||||
|
||||
def _default_tag_colour(self, name: str) -> str:
|
||||
"""
|
||||
Deterministically pick a colour for a tag name from a small palette.
|
||||
"""
|
||||
if not name:
|
||||
return "#CCCCCC"
|
||||
h = int(hashlib.sha1(name.encode("utf-8")).hexdigest()[:8], 16)
|
||||
return _TAG_COLORS[h % len(_TAG_COLORS)]
|
||||
|
||||
# -------- Tags: per-page -------------------------------------------
|
||||
|
||||
def get_tags_for_page(self, date_iso: str) -> list[TagRow]:
|
||||
"""
|
||||
Return (id, name, color) for all tags attached to this page/date.
|
||||
"""
|
||||
cur = self.conn.cursor()
|
||||
rows = cur.execute(
|
||||
"""
|
||||
SELECT t.id, t.name, t.color
|
||||
FROM page_tags pt
|
||||
JOIN tags t ON t.id = pt.tag_id
|
||||
WHERE pt.page_date = ?
|
||||
ORDER BY LOWER(t.name);
|
||||
""",
|
||||
(date_iso,),
|
||||
).fetchall()
|
||||
return [(r[0], r[1], r[2]) for r in rows]
|
||||
|
||||
def set_tags_for_page(self, date_iso: str, tag_names: Sequence[str]) -> None:
|
||||
"""
|
||||
Replace the tag set for a page with the given names.
|
||||
Creates new tags as needed (with auto colours).
|
||||
"""
|
||||
# Normalise + dedupe
|
||||
clean_names = []
|
||||
seen = set()
|
||||
for name in tag_names:
|
||||
name = name.strip()
|
||||
if not name:
|
||||
continue
|
||||
if name.lower() in seen:
|
||||
continue
|
||||
seen.add(name.lower())
|
||||
clean_names.append(name)
|
||||
|
||||
with self.conn:
|
||||
cur = self.conn.cursor()
|
||||
|
||||
# Ensure the page row exists even if there's no content yet
|
||||
cur.execute("INSERT OR IGNORE INTO pages(date) VALUES (?);", (date_iso,))
|
||||
|
||||
if not clean_names:
|
||||
# Just clear all tags for this page
|
||||
cur.execute("DELETE FROM page_tags WHERE page_date=?;", (date_iso,))
|
||||
return
|
||||
|
||||
# Ensure tag rows exist
|
||||
for name in clean_names:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT OR IGNORE INTO tags(name, color)
|
||||
VALUES (?, ?);
|
||||
""",
|
||||
(name, self._default_tag_colour(name)),
|
||||
)
|
||||
|
||||
# Lookup ids
|
||||
placeholders = ",".join("?" for _ in clean_names)
|
||||
rows = cur.execute(
|
||||
f"""
|
||||
SELECT id, name
|
||||
FROM tags
|
||||
WHERE name IN ({placeholders});
|
||||
""",
|
||||
tuple(clean_names),
|
||||
).fetchall()
|
||||
ids_by_name = {r["name"]: r["id"] for r in rows}
|
||||
|
||||
# Reset page_tags for this page
|
||||
cur.execute("DELETE FROM page_tags WHERE page_date=?;", (date_iso,))
|
||||
for name in clean_names:
|
||||
tag_id = ids_by_name.get(name)
|
||||
if tag_id is not None:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT OR IGNORE INTO page_tags(page_date, tag_id)
|
||||
VALUES (?, ?);
|
||||
""",
|
||||
(date_iso, tag_id),
|
||||
)
|
||||
|
||||
# -------- Tags: global management ----------------------------------
|
||||
|
||||
def list_tags(self) -> list[TagRow]:
|
||||
"""
|
||||
Return all tags in the database.
|
||||
"""
|
||||
cur = self.conn.cursor()
|
||||
rows = cur.execute(
|
||||
"""
|
||||
SELECT id, name, color
|
||||
FROM tags
|
||||
ORDER BY LOWER(name);
|
||||
"""
|
||||
).fetchall()
|
||||
return [(r[0], r[1], r[2]) for r in rows]
|
||||
|
||||
def update_tag(self, tag_id: int, name: str, color: str) -> None:
|
||||
"""
|
||||
Update a tag's name and colour.
|
||||
"""
|
||||
name = name.strip()
|
||||
color = color.strip() or "#CCCCCC"
|
||||
|
||||
with self.conn:
|
||||
cur = self.conn.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE tags
|
||||
SET name = ?, color = ?
|
||||
WHERE id = ?;
|
||||
""",
|
||||
(name, color, tag_id),
|
||||
)
|
||||
|
||||
def delete_tag(self, tag_id: int) -> None:
|
||||
"""
|
||||
Delete a tag entirely (removes it from all pages).
|
||||
"""
|
||||
with self.conn:
|
||||
cur = self.conn.cursor()
|
||||
cur.execute("DELETE FROM page_tags WHERE tag_id=?;", (tag_id,))
|
||||
cur.execute("DELETE FROM tags WHERE id=?;", (tag_id,))
|
||||
|
||||
def close(self) -> None:
|
||||
if self.conn is not None:
|
||||
self.conn.close()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue