Initial work on time logging
Some checks failed
CI / test (push) Has been cancelled
Lint / test (push) Has been cancelled
Trivy / test (push) Has been cancelled

This commit is contained in:
Miguel Jacq 2025-11-18 21:51:04 +11:00
parent 83f25405db
commit 55b78833ac
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
6 changed files with 1199 additions and 10 deletions

View file

@ -17,6 +17,18 @@ from . import strings
Entry = Tuple[str, str]
TagRow = Tuple[int, str, str]
ProjectRow = Tuple[int, str] # (id, name)
ActivityRow = Tuple[int, str] # (id, name)
TimeLogRow = Tuple[
int, # id
str, # page_date (yyyy-MM-dd)
int,
str, # project_id, project_name
int,
str, # activity_id, activity_name
int, # minutes
str | None, # note
]
_TAG_COLORS = [
"#FFB3BA", # soft red
@ -148,6 +160,38 @@ class DBManager:
);
CREATE INDEX IF NOT EXISTS ix_page_tags_tag_id ON page_tags(tag_id);
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS activities (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS time_log (
id INTEGER PRIMARY KEY,
page_date TEXT NOT NULL, -- FK to pages.date (yyyy-MM-dd)
project_id INTEGER NOT NULL, -- FK to projects.id
activity_id INTEGER NOT NULL, -- FK to activities.id
minutes INTEGER NOT NULL, -- duration in minutes
note TEXT,
created_at TEXT NOT NULL DEFAULT (
strftime('%Y-%m-%dT%H:%M:%fZ','now')
),
FOREIGN KEY(page_date) REFERENCES pages(date) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE RESTRICT,
FOREIGN KEY(activity_id) REFERENCES activities(id) ON DELETE RESTRICT
);
CREATE INDEX IF NOT EXISTS ix_time_log_date
ON time_log(page_date);
CREATE INDEX IF NOT EXISTS ix_time_log_project
ON time_log(project_id);
CREATE INDEX IF NOT EXISTS ix_time_log_activity
ON time_log(activity_id);
"""
)
self.conn.commit()
@ -789,6 +833,216 @@ class DBManager:
edges = [(r["tag1"], r["tag2"], r["c"]) for r in rows]
return tags_by_id, edges, tag_page_counts
# -------- Time logging: projects & activities ---------------------
def list_projects(self) -> list[ProjectRow]:
cur = self.conn.cursor()
rows = cur.execute(
"SELECT id, name FROM projects ORDER BY LOWER(name);"
).fetchall()
return [(r["id"], r["name"]) for r in rows]
def add_project(self, name: str) -> int:
name = name.strip()
if not name:
raise ValueError("empty project name")
with self.conn:
cur = self.conn.cursor()
cur.execute(
"INSERT OR IGNORE INTO projects(name) VALUES (?);",
(name,),
)
row = cur.execute(
"SELECT id, name FROM projects WHERE name = ?;",
(name,),
).fetchone()
return row["id"]
def rename_project(self, project_id: int, new_name: str) -> None:
new_name = new_name.strip()
if not new_name:
return
with self.conn:
self.conn.execute(
"UPDATE projects SET name = ? WHERE id = ?;",
(new_name, project_id),
)
def delete_project(self, project_id: int) -> None:
with self.conn:
self.conn.execute(
"DELETE FROM projects WHERE id = ?;",
(project_id,),
)
def list_activities(self) -> list[ActivityRow]:
cur = self.conn.cursor()
rows = cur.execute(
"SELECT id, name FROM activities ORDER BY LOWER(name);"
).fetchall()
return [(r["id"], r["name"]) for r in rows]
def add_activity(self, name: str) -> int:
name = name.strip()
if not name:
raise ValueError("empty activity name")
with self.conn:
cur = self.conn.cursor()
cur.execute(
"INSERT OR IGNORE INTO activities(name) VALUES (?);",
(name,),
)
row = cur.execute(
"SELECT id, name FROM activities WHERE name = ?;",
(name,),
).fetchone()
return row["id"]
def rename_activity(self, activity_id: int, new_name: str) -> None:
new_name = new_name.strip()
if not new_name:
return
with self.conn:
self.conn.execute(
"UPDATE activities SET name = ? WHERE id = ?;",
(new_name, activity_id),
)
def delete_activity(self, activity_id: int) -> None:
with self.conn:
self.conn.execute(
"DELETE FROM activities WHERE id = ?;",
(activity_id,),
)
# -------- Time logging: entries -----------------------------------
def add_time_log(
self,
date_iso: str,
project_id: int,
activity_id: int,
minutes: int,
note: str | None = None,
) -> int:
with self.conn:
cur = self.conn.cursor()
# Ensure a page row exists even if there is no text content yet
cur.execute("INSERT OR IGNORE INTO pages(date) VALUES (?);", (date_iso,))
cur.execute(
"""
INSERT INTO time_log(page_date, project_id, activity_id, minutes, note)
VALUES (?, ?, ?, ?, ?);
""",
(date_iso, project_id, activity_id, minutes, note),
)
return cur.lastrowid
def update_time_log(
self,
entry_id: int,
project_id: int,
activity_id: int,
minutes: int,
note: str | None = None,
) -> None:
with self.conn:
self.conn.execute(
"""
UPDATE time_log
SET project_id = ?, activity_id = ?, minutes = ?, note = ?
WHERE id = ?;
""",
(project_id, activity_id, minutes, note, entry_id),
)
def delete_time_log(self, entry_id: int) -> None:
with self.conn:
self.conn.execute(
"DELETE FROM time_log WHERE id = ?;",
(entry_id,),
)
def time_log_for_date(self, date_iso: str) -> list[TimeLogRow]:
cur = self.conn.cursor()
rows = cur.execute(
"""
SELECT
t.id,
t.page_date,
t.project_id,
p.name AS project_name,
t.activity_id,
a.name AS activity_name,
t.minutes,
t.note
FROM time_log t
JOIN projects p ON p.id = t.project_id
JOIN activities a ON a.id = t.activity_id
WHERE t.page_date = ?
ORDER BY LOWER(p.name), LOWER(a.name), t.id;
""",
(date_iso,),
).fetchall()
result: list[TimeLogRow] = []
for r in rows:
result.append(
(
r["id"],
r["page_date"],
r["project_id"],
r["project_name"],
r["activity_id"],
r["activity_name"],
r["minutes"],
r["note"],
)
)
return result
def time_report(
self,
project_id: int,
start_date_iso: str,
end_date_iso: str,
granularity: str = "day", # 'day' | 'week' | 'month'
) -> list[tuple[str, str, int]]:
"""
Return (time_period, activity_name, total_minutes) tuples between start and end
for a project, grouped by period and activity.
time_period is:
- 'YYYY-MM-DD' for day
- 'YYYY-WW' for week
- 'YYYY-MM' for month
"""
if granularity == "day":
bucket_expr = "page_date"
elif granularity == "week":
# ISO-like year-week; SQLite weeks start at 00
bucket_expr = "strftime('%Y-%W', page_date)"
else: # month
bucket_expr = "substr(page_date, 1, 7)" # YYYY-MM
cur = self.conn.cursor()
rows = cur.execute(
f"""
SELECT
{bucket_expr} AS bucket,
a.name AS activity_name,
SUM(t.minutes) AS total_minutes
FROM time_log t
JOIN activities a ON a.id = t.activity_id
WHERE t.project_id = ?
AND t.page_date BETWEEN ? AND ?
GROUP BY bucket, activity_name
ORDER BY bucket, LOWER(activity_name);
""", # nosec B608: bucket_expr comes from a fixed internal list
(project_id, start_date_iso, end_date_iso),
).fetchall()
return [(r["bucket"], r["activity_name"], r["total_minutes"]) for r in rows]
def close(self) -> None:
if self.conn is not None:
self.conn.close()