Make the bucket a ledger event - add project log view for such events and others (time log, invoicing etc)

This commit is contained in:
Miguel Jacq 2026-06-07 17:37:52 +10:00
parent 58333bf93c
commit 54af723b53
Signed by: mig5
GPG key ID: 03906B4110AAD3B8
5 changed files with 1175 additions and 496 deletions

View file

@ -323,6 +323,24 @@ class DBManager:
)
);
CREATE TABLE IF NOT EXISTS project_bucket_ledger (
id INTEGER PRIMARY KEY,
project_id INTEGER NOT NULL
REFERENCES projects(id) ON DELETE CASCADE,
occurred_at TEXT NOT NULL DEFAULT (
strftime('%Y-%m-%dT%H:%M:%fZ','now')
),
entry_type TEXT NOT NULL,
baseline_delta_minutes INTEGER NOT NULL DEFAULT 0,
ceiling_delta_minutes INTEGER NOT NULL DEFAULT 0,
description TEXT,
invoice_id INTEGER,
FOREIGN KEY(invoice_id) REFERENCES invoices(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS ix_project_bucket_ledger_project
ON project_bucket_ledger(project_id, occurred_at);
CREATE TABLE IF NOT EXISTS company_profile (
id INTEGER PRIMARY KEY CHECK (id = 1),
@ -352,6 +370,9 @@ class DBManager:
paid_at TEXT,
payment_note TEXT,
document_id INTEGER,
created_at TEXT NOT NULL DEFAULT (
strftime('%Y-%m-%dT%H:%M:%fZ','now')
),
FOREIGN KEY(document_id) REFERENCES project_documents(id)
ON DELETE SET NULL,
UNIQUE(project_id, invoice_number)
@ -382,8 +403,20 @@ class DBManager:
);
"""
)
self._ensure_column(
"invoices",
"created_at",
"created_at TEXT",
)
self.conn.commit()
def _ensure_column(self, table: str, column: str, definition: str) -> None:
"""Add a simple column during startup schema upgrades if needed."""
rows = self.conn.execute(f"PRAGMA table_info({table})").fetchall()
if any(str(r["name"]) == column for r in rows):
return
self.conn.execute(f"ALTER TABLE {table} ADD COLUMN {definition}")
def rekey(self, new_key: str) -> None:
"""
Change the SQLCipher passphrase in-place, then reopen the connection
@ -1243,24 +1276,35 @@ class DBManager:
raise ValueError("invalid project id")
return project_id
def upsert_project_bucket(
self,
project_id: int,
baseline_minutes: int,
bucket_ceiling_minutes: int,
warn_at_percent: float = 80.0,
) -> None:
"""Save cumulative prepaid-hour bucket settings for a project.
def _normalise_minutes_delta(self, minutes: int | float | None) -> int:
return int(round(float(minutes or 0)))
``baseline_minutes`` represents already-spent hours that pre-date
Bouquin time logging. ``bucket_ceiling_minutes`` is the cumulative
prepaid ceiling purchased for this project.
def _normalise_bucket_warning(self, warn_at_percent: float | None) -> float:
return min(100.0, max(0.0, float(warn_at_percent or 0.0)))
def _project_bucket_ledger_totals(self, project_id: int) -> tuple[int, int]:
row = self.conn.execute(
"""
project_id = self._normalise_project_id(project_id)
baseline_minutes = max(0, int(baseline_minutes or 0))
bucket_ceiling_minutes = max(0, int(bucket_ceiling_minutes or 0))
warn_at_percent = min(100.0, max(0.0, float(warn_at_percent or 0.0)))
with self.conn:
SELECT
COALESCE(SUM(baseline_delta_minutes), 0) AS baseline_minutes,
COALESCE(SUM(ceiling_delta_minutes), 0) AS bucket_ceiling_minutes
FROM project_bucket_ledger
WHERE project_id = ?;
""",
(project_id,),
).fetchone()
baseline = max(0, int(row["baseline_minutes"] or 0))
ceiling = max(0, int(row["bucket_ceiling_minutes"] or 0))
return baseline, ceiling
def _sync_project_bucket_cache(self, project_id: int) -> None:
"""Refresh the project_buckets cache from the ledger."""
baseline, ceiling = self._project_bucket_ledger_totals(project_id)
existing = self.conn.execute(
"SELECT warn_at_percent FROM project_buckets WHERE project_id = ?;",
(project_id,),
).fetchone()
warn_at = float(existing["warn_at_percent"] or 80.0) if existing else 80.0
self.conn.execute(
"""
INSERT INTO project_buckets (
@ -1273,23 +1317,87 @@ class DBManager:
ON CONFLICT(project_id) DO UPDATE SET
baseline_minutes = excluded.baseline_minutes,
bucket_ceiling_minutes = excluded.bucket_ceiling_minutes,
warn_at_percent = excluded.warn_at_percent,
warn_at_percent = project_buckets.warn_at_percent,
updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now');
""",
(project_id, baseline, ceiling, warn_at),
)
def add_project_bucket_ledger_entry(
self,
project_id: int,
entry_type: str,
baseline_delta_minutes: int = 0,
ceiling_delta_minutes: int = 0,
description: str | None = None,
invoice_id: int | None = None,
) -> int:
"""Append an auditable project bucket ledger entry.
Positive ``baseline_delta_minutes`` increases the already-spent baseline.
Positive ``ceiling_delta_minutes`` increases the prepaid bucket ceiling.
Negative deltas are used for explicit corrections when the user lowers a
previously saved baseline or ceiling.
"""
project_id = self._normalise_project_id(project_id)
entry_type = str(entry_type or "adjustment").strip() or "adjustment"
baseline_delta_minutes = self._normalise_minutes_delta(baseline_delta_minutes)
ceiling_delta_minutes = self._normalise_minutes_delta(ceiling_delta_minutes)
description = (description or "").strip() or None
if baseline_delta_minutes == 0 and ceiling_delta_minutes == 0:
raise ValueError("bucket ledger entry has no minute delta")
with self.conn:
cur = self.conn.cursor()
cur.execute(
"""
INSERT INTO project_bucket_ledger (
project_id,
entry_type,
baseline_delta_minutes,
ceiling_delta_minutes,
description,
invoice_id
)
VALUES (?, ?, ?, ?, ?, ?);
""",
(
project_id,
baseline_minutes,
bucket_ceiling_minutes,
warn_at_percent,
entry_type,
baseline_delta_minutes,
ceiling_delta_minutes,
description,
invoice_id,
),
)
ledger_id = cur.lastrowid
self._sync_project_bucket_cache(project_id)
return ledger_id
def add_to_project_bucket_ceiling(self, project_id: int, add_minutes: int) -> None:
"""Increase a project's cumulative bucket ceiling by ``add_minutes``."""
def upsert_project_bucket(
self,
project_id: int,
baseline_minutes: int,
bucket_ceiling_minutes: int,
warn_at_percent: float = 80.0,
) -> None:
"""Save project bucket settings as an auditable ledger adjustment.
``baseline_minutes`` represents already-spent hours that pre-date
Bouquin time logging. ``bucket_ceiling_minutes`` is the cumulative
prepaid ceiling purchased for this project. The current values are
derived from ``project_bucket_ledger`` rather than silently overwritten.
"""
project_id = self._normalise_project_id(project_id)
add_minutes = max(0, int(add_minutes or 0))
if add_minutes <= 0:
return
baseline_minutes = max(0, int(baseline_minutes or 0))
bucket_ceiling_minutes = max(0, int(bucket_ceiling_minutes or 0))
warn_at_percent = self._normalise_bucket_warning(warn_at_percent)
current_baseline, current_ceiling = self._project_bucket_ledger_totals(
project_id
)
baseline_delta = baseline_minutes - current_baseline
ceiling_delta = bucket_ceiling_minutes - current_ceiling
with self.conn:
self.conn.execute(
"""
@ -1299,12 +1407,68 @@ class DBManager:
bucket_ceiling_minutes,
warn_at_percent
)
VALUES (?, 0, ?, 80.0)
VALUES (?, ?, ?, ?)
ON CONFLICT(project_id) DO UPDATE SET
bucket_ceiling_minutes = bucket_ceiling_minutes + excluded.bucket_ceiling_minutes,
warn_at_percent = excluded.warn_at_percent,
updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now');
""",
(project_id, add_minutes),
(
project_id,
current_baseline,
current_ceiling,
warn_at_percent,
),
)
if baseline_delta or ceiling_delta:
self.conn.execute(
"""
INSERT INTO project_bucket_ledger (
project_id,
entry_type,
baseline_delta_minutes,
ceiling_delta_minutes,
description
)
VALUES (?, ?, ?, ?, ?);
""",
(
project_id,
"settings_adjustment",
baseline_delta,
ceiling_delta,
"Bucket settings updated",
),
)
self._sync_project_bucket_cache(project_id)
self.conn.execute(
"""
UPDATE project_buckets
SET warn_at_percent = ?,
updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
WHERE project_id = ?;
""",
(warn_at_percent, project_id),
)
def add_to_project_bucket_ceiling(
self,
project_id: int,
add_minutes: int,
description: str | None = None,
invoice_id: int | None = None,
) -> None:
"""Increase a project's cumulative bucket ceiling by ``add_minutes``."""
project_id = self._normalise_project_id(project_id)
add_minutes = max(0, int(add_minutes or 0))
if add_minutes <= 0:
return
entry_type = "prepaid_invoice" if invoice_id is not None else "manual_topup"
self.add_project_bucket_ledger_entry(
project_id,
entry_type,
ceiling_delta_minutes=add_minutes,
description=description or "Bucket ceiling increased",
invoice_id=invoice_id,
)
def get_project_bucket(self, project_id: int):
@ -1326,6 +1490,24 @@ class DBManager:
""",
(project_id,),
).fetchone()
if row is None:
baseline, ceiling = self._project_bucket_ledger_totals(project_id)
if baseline == 0 and ceiling == 0:
return None
self._sync_project_bucket_cache(project_id)
row = self.conn.execute(
"""
SELECT
project_id,
baseline_minutes,
bucket_ceiling_minutes,
warn_at_percent,
updated_at
FROM project_buckets
WHERE project_id = ?;
""",
(project_id,),
).fetchone()
return row
def logged_minutes_for_project(self, project_id: int) -> int:
@ -1471,6 +1653,122 @@ class DBManager:
(project_id,),
).fetchall()
def project_bucket_ledger_for_project(self, project_id: int):
"""Return bucket ledger rows, including time-log consumption entries."""
try:
project_id = self._normalise_project_id(project_id)
except (TypeError, ValueError):
return []
rows = self.conn.execute(
"""
SELECT
l.occurred_at AS occurred_at,
l.entry_type AS entry_type,
l.description AS description,
l.baseline_delta_minutes AS baseline_delta_minutes,
l.ceiling_delta_minutes AS ceiling_delta_minutes,
0 AS used_delta_minutes,
l.invoice_id AS invoice_id,
i.invoice_number AS invoice_number,
NULL AS time_log_id,
NULL AS page_date,
NULL AS activity_name,
l.id AS source_id
FROM project_bucket_ledger AS l
LEFT JOIN invoices AS i ON i.id = l.invoice_id
WHERE l.project_id = ?
UNION ALL
SELECT
t.created_at AS occurred_at,
'time_log' AS entry_type,
COALESCE(NULLIF(t.note, ''), a.name) AS description,
0 AS baseline_delta_minutes,
0 AS ceiling_delta_minutes,
t.minutes AS used_delta_minutes,
NULL AS invoice_id,
NULL AS invoice_number,
t.id AS time_log_id,
t.page_date AS page_date,
a.name AS activity_name,
t.id AS source_id
FROM time_log AS t
JOIN activities AS a ON a.id = t.activity_id
WHERE t.project_id = ?
ORDER BY occurred_at DESC, source_id DESC;
""",
(project_id, project_id),
).fetchall()
return rows
def project_activity_log_for_project(self, project_id: int):
"""Return a generated project changelog from existing dated records."""
try:
project_id = self._normalise_project_id(project_id)
except (TypeError, ValueError):
return []
rows = self.conn.execute(
"""
SELECT
t.created_at AS occurred_at,
'time_log' AS event_type,
'Time logged' AS title,
printf('%.2f hours for %s%s',
t.minutes / 60.0,
a.name,
CASE
WHEN COALESCE(t.note, '') = '' THEN ''
ELSE ': ' || t.note
END) AS details,
t.id AS source_id
FROM time_log AS t
JOIN activities AS a ON a.id = t.activity_id
WHERE t.project_id = ?
UNION ALL
SELECT
d.uploaded_at AS occurred_at,
'document' AS event_type,
'Document added' AS title,
d.file_name || CASE
WHEN COALESCE(d.description, '') = '' THEN ''
ELSE ': ' || d.description
END AS details,
d.id AS source_id
FROM project_documents AS d
WHERE d.project_id = ?
UNION ALL
SELECT
COALESCE(i.created_at, i.issue_date) AS occurred_at,
'invoice' AS event_type,
'Invoice issued' AS title,
i.invoice_number || '' || printf('%.2f %s', i.total_cents / 100.0, i.currency) AS details,
i.id AS source_id
FROM invoices AS i
WHERE i.project_id = ?
UNION ALL
SELECT
l.occurred_at AS occurred_at,
'bucket' AS event_type,
'Bucket ledger updated' AS title,
COALESCE(l.description, l.entry_type) AS details,
l.id AS source_id
FROM project_bucket_ledger AS l
WHERE l.project_id = ?
ORDER BY occurred_at DESC, source_id DESC;
""",
(project_id, project_id, project_id, project_id),
).fetchall()
return rows
def list_activities(self) -> list[ActivityRow]:
cur = self.conn.cursor()
rows = cur.execute(
@ -2479,6 +2777,7 @@ class DBManager:
i.paid_at,
i.payment_note,
i.document_id,
i.created_at,
d.file_name AS document_file_name
FROM invoices AS i
LEFT JOIN projects AS p ON p.id = i.project_id

View file

@ -91,6 +91,7 @@ class InvoiceDialog(QDialog):
super().__init__(parent)
self._db = db
self._project_id = project_id
self.last_invoice_id: int | None = None
self._start = start_date_iso
self._end = end_date_iso
@ -661,6 +662,7 @@ class InvoiceDialog(QDialog):
line_items=[(li.description, li.hours, li.rate_cents) for li in items],
time_log_ids=time_log_ids,
)
self.last_invoice_id = invoice_id
# Automatically create a reminder for the invoice due date
if self.cfg.reminders:

View file

@ -469,5 +469,22 @@
"project_bucket_invoice_prepaid": "Invoice prepaid hours",
"project_prepaid_invoice_default_desc": "Prepaid support bucket ({hours:.2f} hours)",
"project_prepaid_invoice_hours_required": "Enter a prepaid-hours amount greater than zero before creating an invoice.",
"time_logs": "Time logs"
"time_logs": "Time logs",
"project_bucket_ledger_tab": "Bucket ledger",
"project_changelog_tab": "Project log",
"project_bucket_baseline_delta": "Baseline Δ",
"project_bucket_ceiling_delta": "Ceiling Δ",
"project_bucket_used_delta": "Used Δ",
"project_bucket_manual_topup_desc": "Manual bucket top-up ({hours:.2f} hours)",
"project_bucket_prepaid_invoice_desc": "Prepaid bucket invoice ({hours:.2f} hours)",
"project_bucket_ledger_type_settings_adjustment": "Bucket settings",
"project_bucket_ledger_type_manual_topup": "Manual top-up",
"project_bucket_ledger_type_prepaid_invoice": "Prepaid invoice",
"project_bucket_ledger_type_time_log": "Time logged",
"project_changelog_type_time_log": "Time log",
"project_changelog_type_document": "Document",
"project_changelog_type_invoice": "Invoice",
"project_changelog_type_bucket": "Bucket",
"summary": "Summary",
"details": "Details"
}

View file

@ -108,6 +108,18 @@ class ProjectsDialog(QDialog):
LOG_HOURS = 3
LOG_CREATED = 4
LEDGER_DATE = 0
LEDGER_TYPE = 1
LEDGER_BASELINE = 2
LEDGER_CEILING = 3
LEDGER_USED = 4
LEDGER_NOTE = 5
CHANGE_DATE = 0
CHANGE_TYPE = 1
CHANGE_TITLE = 2
CHANGE_DETAILS = 3
DOC_FILE = 0
DOC_ADDED = 1
DOC_DESCRIPTION = 2
@ -264,6 +276,72 @@ class ProjectsDialog(QDialog):
logs_layout.addWidget(self.time_logs_table, 1)
self.tabs.addTab(logs_tab, strings._("time_logs"))
ledger_tab = QWidget()
ledger_layout = QVBoxLayout(ledger_tab)
self.bucket_ledger_table = QTableWidget()
self.bucket_ledger_table.setColumnCount(6)
self.bucket_ledger_table.setHorizontalHeaderLabels(
[
strings._("date"),
strings._("type"),
strings._("project_bucket_baseline_delta"),
strings._("project_bucket_ceiling_delta"),
strings._("project_bucket_used_delta"),
strings._("note"),
]
)
self.bucket_ledger_table.setSelectionBehavior(QAbstractItemView.SelectRows)
self.bucket_ledger_table.setSelectionMode(QAbstractItemView.SingleSelection)
self.bucket_ledger_table.setEditTriggers(QAbstractItemView.NoEditTriggers)
ledger_header = self.bucket_ledger_table.horizontalHeader()
ledger_header.setSectionResizeMode(
self.LEDGER_DATE, QHeaderView.ResizeToContents
)
ledger_header.setSectionResizeMode(
self.LEDGER_TYPE, QHeaderView.ResizeToContents
)
ledger_header.setSectionResizeMode(
self.LEDGER_BASELINE, QHeaderView.ResizeToContents
)
ledger_header.setSectionResizeMode(
self.LEDGER_CEILING, QHeaderView.ResizeToContents
)
ledger_header.setSectionResizeMode(
self.LEDGER_USED, QHeaderView.ResizeToContents
)
ledger_header.setSectionResizeMode(self.LEDGER_NOTE, QHeaderView.Stretch)
ledger_layout.addWidget(self.bucket_ledger_table, 1)
self.tabs.addTab(ledger_tab, strings._("project_bucket_ledger_tab"))
changelog_tab = QWidget()
changelog_layout = QVBoxLayout(changelog_tab)
self.changelog_table = QTableWidget()
self.changelog_table.setColumnCount(4)
self.changelog_table.setHorizontalHeaderLabels(
[
strings._("date"),
strings._("type"),
strings._("summary"),
strings._("details"),
]
)
self.changelog_table.setSelectionBehavior(QAbstractItemView.SelectRows)
self.changelog_table.setSelectionMode(QAbstractItemView.SingleSelection)
self.changelog_table.setEditTriggers(QAbstractItemView.NoEditTriggers)
changelog_header = self.changelog_table.horizontalHeader()
changelog_header.setSectionResizeMode(
self.CHANGE_DATE, QHeaderView.ResizeToContents
)
changelog_header.setSectionResizeMode(
self.CHANGE_TYPE, QHeaderView.ResizeToContents
)
changelog_header.setSectionResizeMode(
self.CHANGE_TITLE, QHeaderView.ResizeToContents
)
changelog_header.setSectionResizeMode(self.CHANGE_DETAILS, QHeaderView.Stretch)
changelog_layout.addWidget(self.changelog_table, 1)
self.tabs.addTab(changelog_tab, strings._("project_changelog_tab"))
docs_tab = QWidget()
docs_layout = QVBoxLayout(docs_tab)
self.documents_table = QTableWidget()
@ -479,6 +557,8 @@ class ProjectsDialog(QDialog):
self.ceiling_spin.setValue(0.0)
self.warn_spin.setValue(80.0)
self.time_logs_table.setRowCount(0)
self.bucket_ledger_table.setRowCount(0)
self.changelog_table.setRowCount(0)
self.documents_table.setRowCount(0)
self.invoices_table.setRowCount(0)
return
@ -498,6 +578,8 @@ class ProjectsDialog(QDialog):
self.warn_spin.setValue(float(bucket["warn_at_percent"] if bucket else 80.0))
self._reload_time_logs(project_id)
self._reload_bucket_ledger(project_id)
self._reload_changelog(project_id)
self._reload_documents(project_id)
self._reload_invoices(project_id)
@ -523,6 +605,81 @@ class ProjectsDialog(QDialog):
row_idx, self.LOG_CREATED, QTableWidgetItem(r["created_at"] or "")
)
def _format_delta_hours(self, minutes: int | None, invert: bool = False) -> str:
minutes = int(minutes or 0)
if minutes == 0:
return ""
if invert:
minutes = -minutes
sign = "+" if minutes > 0 else "-"
return f"{sign}{hours_from_minutes(abs(minutes)):.2f}"
def _reload_bucket_ledger(self, project_id: int) -> None:
rows = self._db.project_bucket_ledger_for_project(project_id)
self.bucket_ledger_table.setRowCount(len(rows))
for row_idx, r in enumerate(rows):
entry_type = str(r["entry_type"] or "")
type_text = strings._(f"project_bucket_ledger_type_{entry_type}")
if type_text == f"project_bucket_ledger_type_{entry_type}":
type_text = entry_type.replace("_", " ").title()
note = r["description"] or ""
if r["invoice_number"]:
note = (
f"{note} ({r['invoice_number']})" if note else r["invoice_number"]
)
if entry_type == "time_log" and r["page_date"]:
activity = r["activity_name"] or ""
note = f"{r['page_date']}{activity}: {note}".strip()
self.bucket_ledger_table.setItem(
row_idx, self.LEDGER_DATE, QTableWidgetItem(r["occurred_at"] or "")
)
self.bucket_ledger_table.setItem(
row_idx, self.LEDGER_TYPE, QTableWidgetItem(type_text)
)
self.bucket_ledger_table.setItem(
row_idx,
self.LEDGER_BASELINE,
QTableWidgetItem(self._format_delta_hours(r["baseline_delta_minutes"])),
)
self.bucket_ledger_table.setItem(
row_idx,
self.LEDGER_CEILING,
QTableWidgetItem(self._format_delta_hours(r["ceiling_delta_minutes"])),
)
self.bucket_ledger_table.setItem(
row_idx,
self.LEDGER_USED,
QTableWidgetItem(
self._format_delta_hours(r["used_delta_minutes"], invert=True)
),
)
self.bucket_ledger_table.setItem(
row_idx, self.LEDGER_NOTE, QTableWidgetItem(note)
)
def _reload_changelog(self, project_id: int) -> None:
rows = self._db.project_activity_log_for_project(project_id)
self.changelog_table.setRowCount(len(rows))
for row_idx, r in enumerate(rows):
event_type = str(r["event_type"] or "")
type_text = strings._(f"project_changelog_type_{event_type}")
if type_text == f"project_changelog_type_{event_type}":
type_text = event_type.replace("_", " ").title()
self.changelog_table.setItem(
row_idx, self.CHANGE_DATE, QTableWidgetItem(r["occurred_at"] or "")
)
self.changelog_table.setItem(
row_idx, self.CHANGE_TYPE, QTableWidgetItem(type_text)
)
self.changelog_table.setItem(
row_idx, self.CHANGE_TITLE, QTableWidgetItem(r["title"] or "")
)
self.changelog_table.setItem(
row_idx, self.CHANGE_DETAILS, QTableWidgetItem(r["details"] or "")
)
def _reload_documents(self, project_id: int) -> None:
rows = self._db.documents_for_project(project_id)
self.documents_table.setRowCount(len(rows))
@ -592,7 +749,13 @@ class ProjectsDialog(QDialog):
add_minutes = minutes_from_hours(self.topup_spin.value())
if add_minutes <= 0:
return
self._db.add_to_project_bucket_ceiling(project_id, add_minutes)
self._db.add_to_project_bucket_ceiling(
project_id,
add_minutes,
description=strings._("project_bucket_manual_topup_desc").format(
hours=hours_from_minutes(add_minutes)
),
)
self.reload()
def _invoice_prepaid_hours(self) -> None:
@ -626,6 +789,16 @@ class ProjectsDialog(QDialog):
dialog._recalc_totals()
if dialog.exec() == QDialog.Accepted:
invoice_id = getattr(dialog, "last_invoice_id", None)
if invoice_id is not None:
self._db.add_to_project_bucket_ceiling(
project_id,
minutes_from_hours(hours),
description=strings._("project_bucket_prepaid_invoice_desc").format(
hours=hours
),
invoice_id=int(invoice_id),
)
self.reload()
def _selected_doc_id(self) -> tuple[int, str] | None:

View file

@ -578,3 +578,191 @@ def test_time_report_dialog_shows_bucket_for_selected_project_and_clears_for_all
dialog.project_combo.setCurrentIndex(dialog.project_combo.findData(None))
dialog._run_report()
assert dialog.bucket_label.text() == ""
def test_project_bucket_ledger_records_settings_topups_and_time_use(fresh_db):
project_id = _add_project(fresh_db, "Ledger Project")
fresh_db.upsert_project_bucket(project_id, 30, 120, 80.0)
fresh_db.add_to_project_bucket_ceiling(
project_id,
60,
description="Extra prepaid hours",
)
_add_minutes(fresh_db, project_id, 45, "Ledger work")
rows = fresh_db.project_bucket_ledger_for_project(project_id)
types = [r["entry_type"] for r in rows]
assert "settings_adjustment" in types
assert "manual_topup" in types
assert "time_log" in types
settings = next(r for r in rows if r["entry_type"] == "settings_adjustment")
assert settings["baseline_delta_minutes"] == 30
assert settings["ceiling_delta_minutes"] == 120
topup = next(r for r in rows if r["entry_type"] == "manual_topup")
assert topup["ceiling_delta_minutes"] == 60
assert topup["description"] == "Extra prepaid hours"
time_log = next(r for r in rows if r["entry_type"] == "time_log")
assert time_log["used_delta_minutes"] == 45
assert time_log["description"] == "Ledger work"
status = fresh_db.project_bucket_status(project_id)
assert status["baseline_minutes"] == 30
assert status["logged_minutes"] == 45
assert status["bucket_ceiling_minutes"] == 180
assert status["remaining_minutes"] == 105
def test_project_bucket_ledger_records_negative_corrections(fresh_db):
project_id = _add_project(fresh_db, "Correction Project")
fresh_db.upsert_project_bucket(project_id, 120, 240, 80.0)
fresh_db.upsert_project_bucket(project_id, 60, 180, 80.0)
rows = [
r
for r in fresh_db.project_bucket_ledger_for_project(project_id)
if r["entry_type"] == "settings_adjustment"
]
assert len(rows) == 2
assert any(r["baseline_delta_minutes"] == -60 for r in rows)
assert any(r["ceiling_delta_minutes"] == -60 for r in rows)
status = fresh_db.project_bucket_status(project_id)
assert status["baseline_minutes"] == 60
assert status["bucket_ceiling_minutes"] == 180
def test_project_activity_log_includes_time_documents_invoices_and_bucket_events(
fresh_db, tmp_path
):
project_id = _add_project(fresh_db, "Activity Log Project")
_add_minutes(fresh_db, project_id, 30, "Log work")
fresh_db.upsert_project_bucket(project_id, 0, 120, 80.0)
doc_path = tmp_path / "activity-doc.pdf"
doc_path.write_bytes(b"activity")
fresh_db.add_document_from_path(
project_id,
str(doc_path),
description="Activity document",
uploaded_at="2026-04-01",
)
fresh_db.create_invoice(
project_id=project_id,
invoice_number="ACT-001",
issue_date="2026-04-02",
due_date="2026-04-16",
currency="AUD",
tax_label=None,
tax_rate_percent=None,
detail_mode="summary",
line_items=[("Activity prepaid", 2.0, 10000)],
time_log_ids=[],
)
rows = fresh_db.project_activity_log_for_project(project_id)
event_types = [r["event_type"] for r in rows]
assert "time_log" in event_types
assert "document" in event_types
assert "invoice" in event_types
assert "bucket" in event_types
assert any("Log work" in r["details"] for r in rows)
assert any("activity-doc.pdf" in r["details"] for r in rows)
assert any("ACT-001" in r["details"] for r in rows)
assert any("Bucket settings updated" in r["details"] for r in rows)
def test_projects_dialog_shows_bucket_ledger_and_project_log(qtbot, fresh_db, tmp_path):
project_id = _add_project(fresh_db, "Ledger UI Project")
_add_minutes(fresh_db, project_id, 60, "UI ledger work")
fresh_db.upsert_project_bucket(project_id, 30, 120, 75.0)
fresh_db.add_to_project_bucket_ceiling(
project_id,
60,
description="UI top-up",
)
doc_path = tmp_path / "ledger-ui.pdf"
doc_path.write_bytes(b"ledger")
fresh_db.add_document_from_path(project_id, str(doc_path), uploaded_at="2026-05-01")
fresh_db.create_invoice(
project_id=project_id,
invoice_number="LEDGER-UI-001",
issue_date="2026-05-02",
due_date=None,
currency="AUD",
tax_label=None,
tax_rate_percent=None,
detail_mode="summary",
line_items=[("Ledger UI prepaid", 1.0, 10000)],
time_log_ids=[],
)
dialog = ProjectsDialog(fresh_db)
qtbot.addWidget(dialog)
assert dialog.bucket_ledger_table.rowCount() == 3
ledger_types = {
dialog.bucket_ledger_table.item(row, dialog.LEDGER_TYPE).text()
for row in range(dialog.bucket_ledger_table.rowCount())
}
assert "Bucket settings" in ledger_types
assert "Manual top-up" in ledger_types
assert "Time logged" in ledger_types
assert any(
dialog.bucket_ledger_table.item(row, dialog.LEDGER_NOTE).text()
for row in range(dialog.bucket_ledger_table.rowCount())
)
changelog_types = {
dialog.changelog_table.item(row, dialog.CHANGE_TYPE).text()
for row in range(dialog.changelog_table.rowCount())
}
assert {"Time log", "Document", "Invoice", "Bucket"}.issubset(changelog_types)
assert any(
"LEDGER-UI-001"
in dialog.changelog_table.item(row, dialog.CHANGE_DETAILS).text()
for row in range(dialog.changelog_table.rowCount())
)
def test_prepaid_invoice_dialog_adds_bucket_ledger_entry_when_invoice_is_created(
qtbot, fresh_db
):
project_id = _add_project(fresh_db, "Prepaid Ledger Project")
invoice_id = fresh_db.create_invoice(
project_id=project_id,
invoice_number="PREPAID-LEDGER-001",
issue_date="2026-06-01",
due_date=None,
currency="AUD",
tax_label=None,
tax_rate_percent=None,
detail_mode="summary",
line_items=[("Prepaid", 40.0, 15000)],
time_log_ids=[],
)
dialog = ProjectsDialog(fresh_db)
qtbot.addWidget(dialog)
dialog.topup_spin.setValue(40.0)
class _InvoiceDialogWithId(_FakeInvoiceDialog):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.last_invoice_id = invoice_id
with patch("bouquin.projects.InvoiceDialog", _InvoiceDialogWithId):
dialog._invoice_prepaid_hours()
row = next(
r
for r in fresh_db.project_bucket_ledger_for_project(project_id)
if r["entry_type"] == "prepaid_invoice"
)
assert row["ceiling_delta_minutes"] == 2400
assert row["invoice_id"] == invoice_id
assert "40.00 hours" in row["description"]