8f24168dcd
conversation_chunks_fts is a standalone FTS5 table (no content= option).
The old INSERT ... VALUES('rebuild') is a no-op on standalone tables and
left deleted chunks searchable in the FTS shadow tables.
Fix: collect IDs before deletion, explicitly DELETE FROM conversation_chunks_fts
WHERE rowid IN (...) before removing from the main table. This keeps FTS
in sync after every vacuum call.
Tests: 303/303 passing. Vacuum tests now pass for the right reason.
980 lines
36 KiB
Python
980 lines
36 KiB
Python
"""Memory store: CRUD operations for all BigMind tiers."""
|
|
import uuid
|
|
import os
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
from bigmind.db import db, get_db_path
|
|
|
|
logger = logging.getLogger("BigMindStore")
|
|
|
|
|
|
def get_current_username() -> str:
    """Resolve the username BigMind should operate as.

    The BIGMIND_USER, USER and USERNAME environment variables are consulted
    in that order; the first one holding a non-empty value wins. When none
    of them is set (or all are empty) the literal "default" is returned.
    """
    for variable in ("BIGMIND_USER", "USER", "USERNAME"):
        candidate = os.environ.get(variable)
        if candidate:
            return candidate
    return "default"
|
|
|
|
|
|
# ── USERS ──────────────────────────────────────────────────────────────────────
|
|
|
|
def get_or_create_user(username: str, display_name: str = None) -> dict:
    """Look up the user row for *username*, creating it if it does not exist.

    Existing user: ``last_seen`` is refreshed to the current UTC time and the
    row is returned as a dict exactly as it was read (so the returned
    ``last_seen`` is the value from before the refresh).

    New user: a row is inserted under a freshly generated UUID and a minimal
    dict with ``id``, ``username``, ``display_name`` and ``role`` is returned.
    ``display_name`` falls back to ``username`` when omitted. The returned
    ``role`` is the hard-coded string "member"; it is not read back from the
    database.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        found = conn.execute(
            "SELECT * FROM users WHERE username = ?", (username,)
        ).fetchone()

        if not found:
            # First time we see this username: register it.
            new_id = str(uuid.uuid4())
            shown_name = display_name or username
            conn.execute(
                "INSERT INTO users (id, username, display_name, last_seen) VALUES (?,?,?,?)",
                (new_id, username, shown_name, timestamp),
            )
            return {
                "id": new_id,
                "username": username,
                "display_name": shown_name,
                "role": "member",
            }

        # Known user: just bump the activity timestamp.
        conn.execute(
            "UPDATE users SET last_seen = ? WHERE id = ?",
            (timestamp, found["id"]),
        )
        return dict(found)
|
|
|
|
|
|
# ── TIER 0 ─────────────────────────────────────────────────────────────────────
|
|
|
|
def get_identity_profile(user_id: str) -> Optional[dict]:
    """Return the identity_profile row for *user_id* as a dict, or None if there is none."""
    lookup = "SELECT * FROM identity_profile WHERE user_id = ?"
    with db() as conn:
        profile = conn.execute(lookup, (user_id,)).fetchone()
        if not profile:
            return None
        return dict(profile)
|
|
|
|
|
|
def upsert_identity_profile(
    user_id: str,
    role: str = None,
    preferences: str = None,
    pinned_facts: str = None,
) -> dict:
    """Create or update the Tier-0 identity profile of a user.

    When a profile already exists, only the arguments that are not None
    overwrite the stored values (the UPDATE uses COALESCE to keep the old
    value otherwise). When no profile exists, one is inserted whose ``id``
    column is set to ``user_id`` itself.

    Returns the profile row, re-read after the write, as a dict.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        present = conn.execute(
            "SELECT id FROM identity_profile WHERE user_id = ?", (user_id,)
        ).fetchone()

        if not present:
            conn.execute(
                """INSERT INTO identity_profile
                   (id, user_id, role, preferences, pinned_facts, updated_at)
                   VALUES (?,?,?,?,?,?)""",
                (user_id, user_id, role, preferences, pinned_facts, stamp),
            )
        else:
            conn.execute(
                """UPDATE identity_profile
                   SET role=COALESCE(?,role),
                       preferences=COALESCE(?,preferences),
                       pinned_facts=COALESCE(?,pinned_facts),
                       updated_at=?
                   WHERE user_id=?""",
                (role, preferences, pinned_facts, stamp, user_id),
            )

        # Read back so the caller sees the merged, persisted state.
        saved = conn.execute(
            "SELECT * FROM identity_profile WHERE user_id=?", (user_id,)
        ).fetchone()
        return dict(saved)
|
|
|
|
|
|
# ── TIER 1 ─────────────────────────────────────────────────────────────────────
|
|
|
|
def create_session(user_id: str) -> str:
    """Insert a new open session for *user_id* and return its generated UUID.

    The row is stamped with the current UTC time as ``started_at`` and gets
    the placeholder one-liner '[session in progress]'.
    """
    new_id = str(uuid.uuid4())
    insert_sql = """INSERT INTO sessions (id, user_id, started_at, one_liner)
                    VALUES (?, ?, ?, '[session in progress]')"""
    with db() as conn:
        started = datetime.now(timezone.utc).isoformat()
        conn.execute(insert_sql, (new_id, user_id, started))
    return new_id
|
|
|
|
|
|
def close_session(
    session_id: str,
    one_liner: str,
    topics: str = None,
    outcome: str = None,
    importance: int = 5,
) -> None:
    """Mark a session as ended and store its Tier-1 summary fields.

    ``ended_at`` is set to the current UTC time, ``one_liner`` is truncated
    to 120 characters, and the live-focus columns (``current_focus``,
    ``focus_files``, ``focus_updated_at``) are reset to NULL.
    """
    close_sql = """UPDATE sessions
                   SET ended_at=?, one_liner=?, topics=?, outcome=?, importance=?,
                       current_focus=NULL, focus_files=NULL, focus_updated_at=NULL
                   WHERE id=?"""
    with db() as conn:
        ended = datetime.now(timezone.utc).isoformat()
        headline = one_liner[:120]
        conn.execute(
            close_sql,
            (ended, headline, topics, outcome, importance, session_id),
        )
|
|
|
|
|
|
def save_session_summary(
    session_id: str,
    summary: str,
    key_facts: str = None,
    code_refs: str = None,
) -> None:
    """Store (or overwrite) the Tier-2 summary of a session.

    The summary row in ``session_summaries`` is keyed by the session id. An
    existing row has all three content columns replaced; otherwise a new row
    is inserted. In both cases the session is then flagged ``has_tier2=1``.
    """
    with db() as conn:
        already_there = conn.execute(
            "SELECT id FROM session_summaries WHERE id=?", (session_id,)
        ).fetchone()

        if not already_there:
            conn.execute(
                """INSERT INTO session_summaries (id, summary, key_facts, code_refs)
                   VALUES (?,?,?,?)""",
                (session_id, summary, key_facts, code_refs),
            )
        else:
            conn.execute(
                """UPDATE session_summaries
                   SET summary=?, key_facts=?, code_refs=? WHERE id=?""",
                (summary, key_facts, code_refs, session_id),
            )

        # Let Tier-1 listings know a detailed summary is available.
        conn.execute(
            "UPDATE sessions SET has_tier2=1 WHERE id=?", (session_id,)
        )
|
|
|
|
|
|
def get_recent_sessions(user_id: str, limit: int = 10) -> list:
    """Return up to *limit* closed sessions of *user_id*, newest first.

    Only sessions with a non-NULL ``ended_at`` are considered. Each entry is
    a dict of the Tier-1 columns selected by the query, ordered by
    ``started_at`` descending.
    """
    recent_sql = """SELECT id, started_at, ended_at, one_liner, topics,
                           outcome, importance, has_tier2
                    FROM sessions
                    WHERE user_id=? AND ended_at IS NOT NULL
                    ORDER BY started_at DESC LIMIT ?"""
    with db() as conn:
        cursor = conn.execute(recent_sql, (user_id, limit))
        return [dict(session) for session in cursor.fetchall()]
|
|
|
|
|
|
def get_session_detail(session_id: str) -> Optional[dict]:
    """Return the Tier-2 ``session_summaries`` row for a session, or None if absent."""
    with db() as conn:
        detail = conn.execute(
            "SELECT * FROM session_summaries WHERE id=?", (session_id,)
        ).fetchone()
        if not detail:
            return None
        return dict(detail)
|
|
|
|
|
|
def get_open_sessions(user_id: str) -> list:
    """Return every session of *user_id* that has no ``ended_at`` yet, as dicts."""
    open_sql = "SELECT * FROM sessions WHERE user_id=? AND ended_at IS NULL"
    with db() as conn:
        cursor = conn.execute(open_sql, (user_id,))
        return [dict(session) for session in cursor.fetchall()]
|
|
|
|
|
|
def announce_focus(
    session_id: str,
    description: str,
    files: list = None,
    ide_hint: str = None,
) -> dict:
    """Atomically update this session's focus and check for conflicts with other open sessions.

    Uses BEGIN IMMEDIATE to make the conflict-check + write atomic — eliminates the
    TOCTOU race condition where two sessions could both pass the conflict check before
    either writes.

    Args:
        session_id: Session whose focus is being announced.
        description: Free-text description stored as ``current_focus``.
        files: Files being worked on; stored JSON-encoded in ``focus_files``
            and compared against the other open sessions of the same user.
            None is treated as an empty list (no conflict check is run).
        ide_hint: When not None, also overwrites the session's ``ide_hint``.

    Returns:
        A dict with:
          * ``conflicts`` — one entry per other open session sharing at least
            one file (shortened session id, ide hint, focus, overlapping
            files, last focus update).
          * ``updated`` — True when the UPDATE actually matched a session row,
            False when ``session_id`` does not exist.

    Raises:
        Any database error, after rolling back the transaction.
    """
    import json
    from bigmind.db import get_connection

    files = files or []
    files_json = json.dumps(files)
    now = datetime.now(timezone.utc).isoformat()

    conflicts = []
    conn = get_connection()
    try:
        # BEGIN IMMEDIATE acquires the write lock before we read other sessions —
        # no other writer can sneak in between our check and our update.
        conn.execute("BEGIN IMMEDIATE")

        # Find other open sessions that share any of our files
        if files:
            other_sessions = conn.execute(
                """SELECT id, current_focus, focus_files, ide_hint, focus_updated_at
                   FROM sessions
                   WHERE user_id = (SELECT user_id FROM sessions WHERE id=?)
                     AND ended_at IS NULL
                     AND id != ?
                     AND focus_files IS NOT NULL""",
                (session_id, session_id),
            ).fetchall()

            for row in other_sessions:
                try:
                    other_files = json.loads(row["focus_files"] or "[]")
                except (json.JSONDecodeError, TypeError):
                    other_files = []
                # Valid JSON that is not an array (e.g. a number) cannot be
                # searched with `in`; treat it like corrupt data.
                if not isinstance(other_files, list):
                    other_files = []
                overlapping = [f for f in files if f in other_files]
                if overlapping:
                    conflicts.append({
                        "session_id": row["id"][:8],
                        "ide_hint": row["ide_hint"],
                        "focus": row["current_focus"],
                        "overlapping_files": overlapping,
                        "focus_updated_at": row["focus_updated_at"],
                    })

        # Write our focus atomically — under the same lock as the check above
        if ide_hint is not None:
            cursor = conn.execute(
                """UPDATE sessions
                   SET current_focus=?, focus_files=?, focus_updated_at=?, ide_hint=?
                   WHERE id=?""",
                (description, files_json, now, ide_hint, session_id),
            )
        else:
            cursor = conn.execute(
                """UPDATE sessions
                   SET current_focus=?, focus_files=?, focus_updated_at=?
                   WHERE id=?""",
                (description, files_json, now, session_id),
            )
        # Report honestly whether a session row was actually touched.
        updated = cursor.rowcount > 0

        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    return {"updated": updated, "conflicts": conflicts}
|
|
|
|
|
|
def get_active_sessions(user_id: str) -> list:
    """List the open sessions of *user_id* together with their focus data.

    Sessions are ordered by most recent activity (``focus_updated_at``,
    falling back to ``started_at``). Each entry carries the decoded
    ``files`` list and ``idle_minutes``: whole minutes elapsed since that
    activity timestamp, or None when the timestamp is missing or unparsable.
    """
    import json

    reference = datetime.now(timezone.utc)

    def minutes_idle(stamp):
        # Timestamps without tz info are interpreted as UTC.
        if not stamp:
            return None
        try:
            parsed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return int((reference - parsed).total_seconds() / 60)
        except (ValueError, TypeError):
            return None

    def decode_files(raw):
        # Corrupt or missing JSON degrades to "no files".
        try:
            return json.loads(raw or "[]")
        except (json.JSONDecodeError, TypeError):
            return []

    with db() as conn:
        open_rows = conn.execute(
            """SELECT id, started_at, current_focus, focus_files,
                      focus_updated_at, ide_hint
               FROM sessions
               WHERE user_id=? AND ended_at IS NULL
               ORDER BY COALESCE(focus_updated_at, started_at) DESC""",
            (user_id,),
        ).fetchall()

    active = []
    for raw_row in open_rows:
        session = dict(raw_row)
        last_activity = session.get("focus_updated_at") or session.get("started_at")
        idle = minutes_idle(last_activity)
        focus_files = decode_files(session.get("focus_files"))
        active.append({
            "session_id": session["id"],
            "ide_hint": session.get("ide_hint"),
            "focus": session.get("current_focus"),
            "files": focus_files,
            "focus_updated_at": session.get("focus_updated_at"),
            "idle_minutes": idle,
        })
    return active
|
|
|
|
|
|
# ── TOKEN SAVES ────────────────────────────────────────────────────────────────
|
|
|
|
def log_token_save(
    session_id: str,
    user_id: str,
    description: str,
    tokens_saved_estimate: int,
    method_used: str = None,
) -> int:
    """Insert one token-efficiency event into ``token_saves`` and return its row id."""
    record = (session_id, user_id, description, tokens_saved_estimate, method_used)
    with db() as conn:
        inserted = conn.execute(
            """INSERT INTO token_saves (session_id, user_id, description, tokens_saved_estimate, method_used)
               VALUES (?,?,?,?,?)""",
            record,
        )
        return inserted.lastrowid
|
|
|
|
|
|
def get_token_efficiency_stats(user_id: str, session_id: str = None) -> dict:
    """Aggregate the token-save history of a user for profile display.

    Returns a dict with:
      * ``total_tokens_saved`` — sum over all of the user's saves (0 if none)
      * ``session_tokens_saved`` — sum for *session_id*, or 0 when no
        session id is given
      * ``best_save`` — the single save with the largest estimate, or None
      * ``by_method`` — per-method totals, largest first
      * ``recent_saves`` — the five most recently created saves
    """
    only_user = (user_id,)
    with db() as conn:

        def scalar(sql, params):
            return conn.execute(sql, params).fetchone()[0]

        def as_dicts(sql, params):
            return [dict(entry) for entry in conn.execute(sql, params).fetchall()]

        lifetime_total = scalar(
            "SELECT COALESCE(SUM(tokens_saved_estimate),0) FROM token_saves WHERE user_id=?",
            only_user,
        )

        if session_id:
            this_session_total = scalar(
                "SELECT COALESCE(SUM(tokens_saved_estimate),0) FROM token_saves WHERE user_id=? AND session_id=?",
                (user_id, session_id),
            )
        else:
            this_session_total = 0

        top_save = conn.execute(
            """SELECT description, tokens_saved_estimate, method_used, created_at
               FROM token_saves WHERE user_id=?
               ORDER BY tokens_saved_estimate DESC LIMIT 1""",
            only_user,
        ).fetchone()

        per_method = as_dicts(
            """SELECT method_used, SUM(tokens_saved_estimate) as total
               FROM token_saves WHERE user_id=?
               GROUP BY method_used ORDER BY total DESC""",
            only_user,
        )

        latest = as_dicts(
            """SELECT description, tokens_saved_estimate, method_used, created_at
               FROM token_saves WHERE user_id=?
               ORDER BY created_at DESC LIMIT 5""",
            only_user,
        )

        return {
            "total_tokens_saved": lifetime_total,
            "session_tokens_saved": this_session_total,
            "best_save": dict(top_save) if top_save else None,
            "by_method": per_method,
            "recent_saves": latest,
        }
|
|
|
|
|
|
# ── TIER 3 ─────────────────────────────────────────────────────────────────────
|
|
|
|
def append_chunk(
    session_id: str,
    user_id: str,
    role: str,
    content: str,
    flag_reason: str = None,
) -> int:
    """Append a Tier-3 conversation chunk to a session and index it for search.

    The chunk receives the next sequence number within its session (highest
    existing ``seq`` plus one, starting at 1). A matching row is written to
    ``conversation_chunks_fts`` whose rowid equals the chunk id; a missing
    ``flag_reason`` is indexed as an empty string.

    Returns the id of the new chunk.
    """
    with db() as conn:
        highest_seq = conn.execute(
            "SELECT COALESCE(MAX(seq),0) FROM conversation_chunks WHERE session_id=?",
            (session_id,),
        ).fetchone()[0]
        next_seq = highest_seq + 1

        new_chunk_id = conn.execute(
            """INSERT INTO conversation_chunks
               (session_id, user_id, role, content, flag_reason, seq)
               VALUES (?,?,?,?,?,?)""",
            (session_id, user_id, role, content, flag_reason, next_seq),
        ).lastrowid

        # Mirror into the FTS index under the same rowid so both stay joined.
        conn.execute(
            "INSERT INTO conversation_chunks_fts(rowid, content, flag_reason) VALUES(?,?,?)",
            (new_chunk_id, content, flag_reason or ""),
        )
        return new_chunk_id
|
|
|
|
|
|
def _fts_safe_query(query: str) -> str:
|
|
"""Wrap each token in double-quotes for safe FTS5 matching.
|
|
|
|
Prevents FTS5 reserved-word collisions (rank, content, category, etc.)
|
|
while correctly AND-matching multi-word queries — NOT phrase matching.
|
|
|
|
FTS5 semantics:
|
|
"word1" "word2" → documents containing BOTH words anywhere (AND match ✅)
|
|
"word1 word2" → documents where word1 appears directly before word2 (phrase ❌)
|
|
|
|
Bug history: the 2026-03-31 fix used f'"{query}"' which wraps the entire string,
|
|
accidentally turning every multi-word query into a phrase search that almost never
|
|
matches. This helper fixes that by quoting each token independently.
|
|
"""
|
|
tokens = [t.strip('"\'') for t in query.split() if t.strip()]
|
|
return ' '.join(f'"{t}"' for t in tokens if t)
|
|
|
|
|
|
def search_chunks(user_id: str, query: str, limit: int = 10) -> list:
    """Full-text search over a user's Tier-3 conversation chunks.

    The query is tokenised by ``_fts_safe_query`` (every word must match).
    Results are dicts of the chunk columns plus the bm25 ``rank``, ordered
    by that rank and capped at *limit*.

    A query with no searchable tokens (empty or whitespace only) returns an
    empty list instead of passing an empty MATCH expression to FTS5, which
    FTS5 rejects as a syntax error.
    """
    match_expr = _fts_safe_query(query)
    if not match_expr:
        return []
    with db() as conn:
        rows = conn.execute(
            """SELECT cc.id, cc.session_id, cc.role, cc.content,
                      cc.flag_reason, cc.created_at,
                      bm25(conversation_chunks_fts) AS rank
               FROM conversation_chunks_fts
               JOIN conversation_chunks cc ON cc.id = conversation_chunks_fts.rowid
               WHERE conversation_chunks_fts MATCH ?
                 AND cc.user_id = ?
               ORDER BY rank
               LIMIT ?""",
            (match_expr, user_id, limit),
        ).fetchall()
        return [dict(r) for r in rows]
|
|
|
|
|
|
def delete_chunks_before(user_id: str, cutoff_iso: str) -> int:
    """Delete Tier-3 chunks older than cutoff. Returns count deleted.

    conversation_chunks_fts is a STANDALONE FTS5 table (no content= option),
    so FTS rows must be deleted explicitly by rowid. The old VALUES('rebuild')
    approach only works for content= backed tables and was a no-op here.

    The ids are removed in fixed-size batches: binding one parameter per id
    in a single ``IN (...)`` list can exceed SQLite's limit on host
    parameters per statement when a vacuum covers many chunks.

    Args:
        user_id: Owner of the chunks to purge.
        cutoff_iso: Chunks with ``created_at`` strictly below this value are
            removed (compared as stored, i.e. as ISO-8601 text).
    """
    # Comfortably below the smallest default parameter limit of SQLite builds.
    batch_size = 500
    with db() as conn:
        # Collect IDs first — needed for FTS sync.
        rows = conn.execute(
            "SELECT id FROM conversation_chunks WHERE user_id=? AND created_at < ?",
            (user_id, cutoff_iso),
        ).fetchall()
        ids = [r[0] for r in rows]

        for start in range(0, len(ids), batch_size):
            batch = ids[start:start + batch_size]
            placeholders = ",".join("?" * len(batch))
            # Delete FTS entries by rowid before removing from main table
            conn.execute(
                f"DELETE FROM conversation_chunks_fts WHERE rowid IN ({placeholders})",
                batch,
            )
            # Remove exactly the rows whose index entries were just dropped.
            conn.execute(
                f"DELETE FROM conversation_chunks WHERE id IN ({placeholders})",
                batch,
            )
        return len(ids)
|
|
|
|
|
|
# ── FACTS ───────────────────────────────────────────────────────────────────────
|
|
|
|
def store_fact(
    user_id: str,
    category: str,
    fact: str,
    source_session: str = None,
    confidence: float = 1.0,
) -> int:
    """Persist a fact for a user and add it to the ``facts_fts`` search index.

    The FTS row is stored under the same rowid as the new fact and indexes
    the fact text and its category. Returns the id of the new fact.
    """
    with db() as conn:
        new_fact_id = conn.execute(
            """INSERT INTO facts (user_id, category, fact, source_session, confidence)
               VALUES (?,?,?,?,?)""",
            (user_id, category, fact, source_session, confidence),
        ).lastrowid

        conn.execute(
            "INSERT INTO facts_fts(rowid, fact, category) VALUES (?,?,?)",
            (new_fact_id, fact, category),
        )
        return new_fact_id
|
|
|
|
|
|
def get_facts(user_id: str, category: str = None, include_deprecated: bool = False) -> list:
    """Return a user's facts as dicts, newest first.

    Args:
        user_id: Owner of the facts.
        category: When given (non-empty), restrict to this category.
        include_deprecated: When False (default), facts whose ``deprecated``
            flag is set are left out.
    """
    where_sql = "user_id=?"
    bound: list = [user_id]

    if category:
        where_sql += " AND category=?"
        bound.append(category)

    if not include_deprecated:
        where_sql += " AND (deprecated IS NULL OR deprecated=0)"

    with db() as conn:
        cursor = conn.execute(
            f"SELECT * FROM facts WHERE {where_sql} ORDER BY created_at DESC",
            bound,
        )
        return [dict(entry) for entry in cursor.fetchall()]
|
|
|
|
|
|
def deprecate_fact(fact_id: int, user_id: str, reason: str = None) -> bool:
    """Flag a fact as deprecated and drop it from the search index.

    The fact must belong to *user_id*; otherwise nothing is changed and
    False is returned. On success the fact gets ``deprecated=1``, the given
    reason and a fresh ``updated_at``, its ``facts_fts`` row is deleted, and
    True is returned.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        owned = conn.execute(
            "SELECT id FROM facts WHERE id=? AND user_id=?", (fact_id, user_id)
        ).fetchone()

        if owned:
            conn.execute(
                """UPDATE facts
                   SET deprecated=1, deprecation_reason=?, updated_at=?
                   WHERE id=?""",
                (reason, stamp, fact_id),
            )
            # Deprecated facts must not surface in full-text search any more.
            conn.execute("DELETE FROM facts_fts WHERE rowid=?", (fact_id,))
            return True

        return False
|
|
|
|
|
|
def search_facts(user_id: str, query: str, limit: int = 10) -> list:
    """Full-text search across non-deprecated facts for a user.

    The query is tokenised by ``_fts_safe_query`` (every word must match).
    Results are dicts with the fact columns plus the bm25 ``rank``, ordered
    by that rank and capped at *limit*.

    A query with no searchable tokens (empty or whitespace only) returns an
    empty list instead of passing an empty MATCH expression to FTS5, which
    FTS5 rejects as a syntax error.
    """
    match_expr = _fts_safe_query(query)
    if not match_expr:
        return []
    with db() as conn:
        rows = conn.execute(
            """SELECT f.id, f.category, f.fact, f.confidence, f.created_at,
                      bm25(facts_fts) AS rank
               FROM facts_fts
               JOIN facts f ON f.id = facts_fts.rowid
               WHERE facts_fts MATCH ?
                 AND f.user_id = ?
                 AND (f.deprecated IS NULL OR f.deprecated = 0)
               ORDER BY rank
               LIMIT ?""",
            (match_expr, user_id, limit),
        ).fetchall()
        return [dict(r) for r in rows]
|
|
|
|
|
|
# ── THOUGHT JOURNAL ──────────────────────────────────────────────────────────────
|
|
|
|
def add_hypothesis(
    user_id: str,
    session_id: str,
    hypothesis: str,
    confidence: float = 0.7,
) -> int:
    """Insert a hypothesis into the thought journal and return its id."""
    entry = (user_id, session_id, hypothesis, confidence)
    with db() as conn:
        inserted = conn.execute(
            """INSERT INTO hypotheses (user_id, session_id, hypothesis, confidence)
               VALUES (?, ?, ?, ?)""",
            entry,
        )
        return inserted.lastrowid
|
|
|
|
|
|
def resolve_hypothesis(
    hypothesis_id: int,
    user_id: str,
    status: str,
    resolution: str = None,
) -> bool:
    """Close a hypothesis with a final status.

    Args:
        hypothesis_id: Id of the hypothesis to resolve.
        user_id: Must own the hypothesis, otherwise nothing is changed.
        status: One of "confirmed", "refuted" or "abandoned".
        resolution: Optional explanatory text stored alongside the status.

    Returns:
        True when the hypothesis was updated, False when it does not exist
        or belongs to another user.

    Raises:
        ValueError: If *status* is not one of the accepted values. This is
            checked before the database is touched.
    """
    allowed = ("confirmed", "refuted", "abandoned")
    if status not in allowed:
        raise ValueError(f"Invalid status '{status}'. Must be confirmed, refuted, or abandoned.")

    resolved_at = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        owned = conn.execute(
            "SELECT id FROM hypotheses WHERE id=? AND user_id=?",
            (hypothesis_id, user_id),
        ).fetchone()

        if owned:
            conn.execute(
                """UPDATE hypotheses
                   SET status=?, resolution=?, resolved_at=?
                   WHERE id=?""",
                (status, resolution, resolved_at, hypothesis_id),
            )
            return True

        return False
|
|
|
|
|
|
def list_hypotheses(user_id: str, status: str = None) -> list:
    """Return a user's hypotheses as dicts, newest first.

    When *status* is given (non-empty) only hypotheses with exactly that
    status are returned.
    """
    if status:
        sql = """SELECT * FROM hypotheses WHERE user_id=? AND status=?
                 ORDER BY created_at DESC"""
        bound = (user_id, status)
    else:
        sql = "SELECT * FROM hypotheses WHERE user_id=? ORDER BY created_at DESC"
        bound = (user_id,)

    with db() as conn:
        return [dict(entry) for entry in conn.execute(sql, bound).fetchall()]
|
|
|
|
|
|
# ── UPGRADE REQUESTS ────────────────────────────────────────────────────────────────────────────────────
|
|
|
|
def add_upgrade_request(
    user_id: str,
    session_id: str,
    description: str,
    reason: str,
    priority: str = "medium",
    certainty: float = 0.7,
) -> int:
    """Insert an upgrade request into ``upgrade_requests`` and return its id."""
    entry = (user_id, session_id, description, reason, priority, certainty)
    with db() as conn:
        inserted = conn.execute(
            """INSERT INTO upgrade_requests
               (user_id, session_id, description, reason, priority, certainty)
               VALUES (?, ?, ?, ?, ?, ?)""",
            entry,
        )
        return inserted.lastrowid
|
|
|
|
|
|
def list_upgrade_requests(user_id: str, status: str = None) -> list:
    """Return a user's upgrade requests as dicts, newest first.

    When *status* is given (non-empty) only requests with exactly that
    status are returned.
    """
    if status:
        sql = """SELECT * FROM upgrade_requests WHERE user_id=? AND status=?
                 ORDER BY created_at DESC"""
        bound = (user_id, status)
    else:
        sql = "SELECT * FROM upgrade_requests WHERE user_id=? ORDER BY created_at DESC"
        bound = (user_id,)

    with db() as conn:
        return [dict(entry) for entry in conn.execute(sql, bound).fetchall()]
|
|
|
|
|
|
def resolve_upgrade_request(
    request_id: int,
    user_id: str,
    status: str,
    resolution: str = None,
) -> bool:
    """Close an upgrade request with a final status.

    Args:
        request_id: Id of the upgrade request to resolve.
        user_id: Must own the request, otherwise nothing is changed.
        status: Either "resolved" or "rejected".
        resolution: Optional explanatory text stored alongside the status.

    Returns:
        True when the request was updated, False when it does not exist or
        belongs to another user.

    Raises:
        ValueError: If *status* is not one of the accepted values. This is
            checked before the database is touched.
    """
    allowed = ("resolved", "rejected")
    if status not in allowed:
        raise ValueError(f"Invalid status '{status}'. Must be resolved or rejected.")

    resolved_at = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        owned = conn.execute(
            "SELECT id FROM upgrade_requests WHERE id=? AND user_id=?",
            (request_id, user_id),
        ).fetchone()

        if owned:
            conn.execute(
                """UPDATE upgrade_requests
                   SET status=?, resolution=?, resolved_at=?
                   WHERE id=?""",
                (status, resolution, resolved_at, request_id),
            )
            return True

        return False
|
|
|
|
|
|
# ── HEALTH CHECK ────────────────────────────────────────────────────────────────
|
|
|
|
def health_check(user_id: str, stale_days: int = 30) -> dict:
    """Run a diagnostic pass over a user's BigMind memory.

    Returns a dict with:
      * ``stale_facts`` — facts whose ``updated_at`` is older than
        *stale_days* days, oldest first
      * ``sessions_without_summary`` — number of closed sessions with
        ``has_tier2=0``
      * ``open_sessions`` — id/started_at of sessions not yet ended
      * ``chunk_count`` / ``fts_row_count`` / ``fts_in_sync`` — row counts of
        the chunk table and its FTS index and whether they are equal; these
        counts are global, not restricted to *user_id*
      * ``low_confidence_facts`` — facts with confidence below 0.8,
        least confident first
      * ``stale_threshold_days`` — the *stale_days* value used
    """
    from datetime import timedelta

    stale_before = (datetime.now(timezone.utc) - timedelta(days=stale_days)).isoformat()

    with db() as conn:

        def as_dicts(sql, params):
            return [dict(entry) for entry in conn.execute(sql, params).fetchall()]

        def count(sql, params=()):
            return conn.execute(sql, params).fetchone()[0]

        # Facts not updated since the cutoff
        stale_facts = as_dicts(
            """SELECT id, category, fact, updated_at, confidence
               FROM facts WHERE user_id=? AND updated_at < ?
               ORDER BY updated_at""",
            (user_id, stale_before),
        )

        # Closed sessions with no Tier-2 narrative
        unsummarised = count(
            """SELECT COUNT(*) FROM sessions
               WHERE user_id=? AND ended_at IS NOT NULL AND has_tier2=0""",
            (user_id,),
        )

        # Sessions still open (ended_at IS NULL)
        still_open = as_dicts(
            "SELECT id, started_at FROM sessions WHERE user_id=? AND ended_at IS NULL",
            (user_id,),
        )

        # FTS integrity: global count (FTS rowid = chunk id, no user_id column)
        total_chunks = count("SELECT COUNT(*) FROM conversation_chunks")
        total_fts_rows = count("SELECT COUNT(*) FROM conversation_chunks_fts")

        # Low confidence facts (< 0.8)
        shaky_facts = as_dicts(
            """SELECT id, category, fact, confidence
               FROM facts WHERE user_id=? AND confidence < 0.8
               ORDER BY confidence""",
            (user_id,),
        )

    return {
        "stale_facts": stale_facts,
        "sessions_without_summary": unsummarised,
        "open_sessions": still_open,
        "chunk_count": total_chunks,
        "fts_row_count": total_fts_rows,
        "fts_in_sync": total_chunks == total_fts_rows,
        "low_confidence_facts": shaky_facts,
        "stale_threshold_days": stale_days,
    }
|
|
|
|
|
|
# ── EXPORT ───────────────────────────────────────────────────────────────────────
|
|
|
|
def export_memory(user_id: str, output_path: str = None) -> dict:
    """Dump everything stored for a user into one portable JSON file.

    The export contains the user row, identity profile, facts, sessions
    (each with its Tier-2 summary under ``tier2_summary`` or None),
    conversation chunks, hypotheses, upgrade requests, token saves and
    people, plus a ``stats`` section with the row counts.

    Args:
        user_id: User whose memory is exported.
        output_path: Target file. When omitted, a timestamped
            ``bigmind_export_<UTC date>_<time>.json`` in the home directory
            is used. Missing parent directories are created.

    Returns:
        A summary dict with the output path, selected row counts and the
        size of the written file in KiB.
    """
    import json
    from pathlib import Path
    from bigmind.db import SCHEMA_VERSION

    if not output_path:
        stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        output_path = str(Path.home() / f"bigmind_export_{stamp}.json")

    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    owner = (user_id,)
    with db() as conn:

        def one_or_empty(sql, params):
            # Single-row lookup; a missing row is exported as {}.
            found = conn.execute(sql, params).fetchone()
            return dict(found) if found else {}

        def all_rows(sql):
            # Every per-user table is filtered by the same single parameter.
            return [dict(entry) for entry in conn.execute(sql, owner).fetchall()]

        user_info = one_or_empty(
            "SELECT id, username, display_name, role, created_at, last_seen FROM users WHERE id=?",
            owner,
        )
        profile = one_or_empty(
            "SELECT * FROM identity_profile WHERE user_id=?", owner
        )

        facts = all_rows("SELECT * FROM facts WHERE user_id=? ORDER BY created_at")

        sessions = []
        session_rows = conn.execute(
            "SELECT * FROM sessions WHERE user_id=? ORDER BY started_at", owner
        ).fetchall()
        for session_row in session_rows:
            exported = dict(session_row)
            tier2 = conn.execute(
                "SELECT * FROM session_summaries WHERE id=?", (session_row["id"],)
            ).fetchone()
            exported["tier2_summary"] = dict(tier2) if tier2 else None
            sessions.append(exported)

        chunks = all_rows(
            "SELECT * FROM conversation_chunks WHERE user_id=? ORDER BY created_at, seq"
        )

        # ── v3+ tables ───────────────────────────────────────────────────────
        hypotheses = all_rows(
            "SELECT * FROM hypotheses WHERE user_id=? ORDER BY created_at"
        )
        upgrade_requests = all_rows(
            "SELECT * FROM upgrade_requests WHERE user_id=? ORDER BY created_at"
        )

        # ── v6+ tables ───────────────────────────────────────────────────────
        token_saves = all_rows(
            "SELECT * FROM token_saves WHERE user_id=? ORDER BY created_at"
        )

        # ── v7+ tables ───────────────────────────────────────────────────────
        people = all_rows(
            "SELECT * FROM people WHERE user_id=? ORDER BY created_at"
        )

    counts = {
        "facts_count": len(facts),
        "sessions_count": len(sessions),
        "chunks_count": len(chunks),
        "hypotheses_count": len(hypotheses),
        "upgrade_requests_count": len(upgrade_requests),
        "token_saves_count": len(token_saves),
        "people_count": len(people),
    }

    payload = {
        "export_date": datetime.now(timezone.utc).isoformat(),
        "bigmind_schema_version": SCHEMA_VERSION,
        "user": user_info,
        "identity_profile": profile,
        "facts": facts,
        "sessions": sessions,
        "conversation_chunks": chunks,
        "hypotheses": hypotheses,
        "upgrade_requests": upgrade_requests,
        "token_saves": token_saves,
        "people": people,
        "stats": counts,
    }

    with open(output_path, "w", encoding="utf-8") as handle:
        # default=str: values json cannot encode natively are written as text.
        json.dump(payload, handle, indent=2, default=str)

    return {
        "output_path": str(output_path),
        "facts_count": counts["facts_count"],
        "sessions_count": counts["sessions_count"],
        "chunks_count": counts["chunks_count"],
        "hypotheses_count": counts["hypotheses_count"],
        "people_count": counts["people_count"],
        "file_size_kb": round(target.stat().st_size / 1024, 1),
    }
|
|
|
|
|
|
# ── STATS ───────────────────────────────────────────────────────────────────────
|
|
|
|
def get_stats(user_id: str) -> dict:
    """Return row counts for a user plus size information about the database file.

    ``sessions``, ``facts`` and ``chunks`` are counted for *user_id*;
    ``global_knowledge_entries`` counts all approved global-knowledge rows
    regardless of user. The size fields are 0 when the database file does
    not exist at the path reported by ``get_db_path()``.
    """
    location = get_db_path()
    owner = (user_id,)

    with db() as conn:

        def count(sql, params=()):
            return conn.execute(sql, params).fetchone()[0]

        session_total = count("SELECT COUNT(*) FROM sessions WHERE user_id=?", owner)
        fact_total = count("SELECT COUNT(*) FROM facts WHERE user_id=?", owner)
        chunk_total = count(
            "SELECT COUNT(*) FROM conversation_chunks WHERE user_id=?", owner
        )
        approved_global = count(
            "SELECT COUNT(*) FROM global_knowledge WHERE status='approved'"
        )

    if location.exists():
        size_bytes = location.stat().st_size
    else:
        size_bytes = 0

    return {
        "sessions": session_total,
        "facts": fact_total,
        "chunks": chunk_total,
        "global_knowledge_entries": approved_global,
        "db_size_bytes": size_bytes,
        "db_size_kb": round(size_bytes / 1024, 1),
        "db_path": str(location),
    }
|
|
|
|
|
|
# ── PEOPLE / CONTACTS ────────────────────────────────────────────────────────
|
|
|
|
def upsert_person(
    user_id: str,
    username: str,
    display_name: str = None,
    role: str = None,
    team: str = None,
    notes: str = None,
    bigmind_user: str = None,
    bigmind_url: str = None,
) -> int:
    """Create or update a contact in a user's people directory.

    A contact is identified by (user_id, username). For an existing contact
    only the arguments that are not None are overwritten, and
    ``last_mentioned_at`` is always refreshed. The contact's ``people_fts``
    row is then rebuilt from the stored values (NULL text columns are
    indexed as empty strings).

    Returns the row id of the contact.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        known = conn.execute(
            "SELECT id FROM people WHERE user_id=? AND username=?",
            (user_id, username),
        ).fetchone()

        if known:
            person_id = known["id"]
            # Only columns the caller actually supplied are touched.
            supplied = {
                "display_name": display_name,
                "role": role,
                "team": team,
                "notes": notes,
                "bigmind_user": bigmind_user,
                "bigmind_url": bigmind_url,
            }
            changes = {"last_mentioned_at": stamp}
            changes.update(
                {column: value for column, value in supplied.items() if value is not None}
            )
            assignments = ", ".join(f"{column}=?" for column in changes)
            conn.execute(
                f"UPDATE people SET {assignments} WHERE id=?",
                (*changes.values(), person_id),
            )
            # Drop the stale index entry; it is re-inserted below.
            conn.execute("DELETE FROM people_fts WHERE rowid=?", (person_id,))
        else:
            person_id = conn.execute(
                """INSERT INTO people
                   (user_id, username, display_name, role, team, notes,
                    bigmind_user, bigmind_url, last_mentioned_at)
                   VALUES (?,?,?,?,?,?,?,?,?)""",
                (user_id, username, display_name, role, team, notes,
                 bigmind_user, bigmind_url, stamp),
            ).lastrowid

        # Index what is actually stored, not what was passed in.
        stored = conn.execute(
            "SELECT * FROM people WHERE id=?", (person_id,)
        ).fetchone()
        conn.execute(
            "INSERT INTO people_fts(rowid, username, display_name, role, team, notes) "
            "VALUES (?,?,?,?,?,?)",
            (
                person_id,
                stored["username"],
                stored["display_name"] or "",
                stored["role"] or "",
                stored["team"] or "",
                stored["notes"] or "",
            ),
        )
        return person_id
|
|
|
|
|
|
def recall_person(user_id: str, query: str, limit: int = 10) -> list:
    """Full-text search across the people directory.

    The query is tokenised by ``_fts_safe_query`` (every word must match).
    Results are dicts with all ``people`` columns plus the bm25 ``rank``,
    ordered by that rank and capped at *limit*.

    A query with no searchable tokens (empty or whitespace only) returns an
    empty list instead of passing an empty MATCH expression to FTS5, which
    FTS5 rejects as a syntax error.
    """
    match_expr = _fts_safe_query(query)
    if not match_expr:
        return []
    with db() as conn:
        rows = conn.execute(
            """SELECT p.*, bm25(people_fts) AS rank
               FROM people_fts
               JOIN people p ON p.id = people_fts.rowid
               WHERE people_fts MATCH ?
                 AND p.user_id = ?
               ORDER BY rank
               LIMIT ?""",
            (match_expr, user_id, limit),
        ).fetchall()
        return [dict(r) for r in rows]
|
|
|
|
|
|
def list_people(user_id: str) -> list:
    """Return every contact of *user_id* as a dict, most recently mentioned first."""
    listing_sql = "SELECT * FROM people WHERE user_id=? ORDER BY last_mentioned_at DESC"
    with db() as conn:
        cursor = conn.execute(listing_sql, (user_id,))
        return [dict(person) for person in cursor.fetchall()]
|
|
|
|
|
|
def link_ai(user_id: str, username: str, bigmind_user: str, bigmind_url: str = None) -> bool:
    """Attach a BigMind AI instance to an existing contact.

    Overwrites the contact's ``bigmind_user`` and ``bigmind_url`` (the URL
    becomes NULL when not given) and refreshes ``last_mentioned_at``.

    Returns True when the contact (user_id, username) exists, False
    otherwise; nothing is written in the latter case.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        contact = conn.execute(
            "SELECT id FROM people WHERE user_id=? AND username=?",
            (user_id, username),
        ).fetchone()

        if contact:
            conn.execute(
                "UPDATE people SET bigmind_user=?, bigmind_url=?, last_mentioned_at=? WHERE id=?",
                (bigmind_user, bigmind_url, stamp, contact["id"]),
            )
            return True

        return False
|