Initial commit: pi_mcps monorepo with BigMind MCP server

This commit is contained in:
Patrick Plate
2026-04-03 13:37:45 +02:00
commit 6623fe0337
33 changed files with 10314 additions and 0 deletions
View File
+21
View File
@@ -0,0 +1,21 @@
import os
import sys
import pytest
# Add project root and src/ to path so bigmind and server tools are importable
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
"""Redirect every test to a fresh temporary database."""
db_file = tmp_path / "test_memory.db"
monkeypatch.setenv("BIGMIND_DB_PATH", str(db_file))
monkeypatch.setenv("BIGMIND_USER", "testuser")
# Re-initialise DB with the new path
from bigmind.db import init_db
init_db()
yield db_file
+87
View File
@@ -0,0 +1,87 @@
"""Tests for context_builder — the bootstrapped markdown output."""
import pytest
from bigmind import memory_store
from bigmind.context_builder import build_context, _format_date
@pytest.fixture
def user():
return memory_store.get_or_create_user("ctxuser", "Context User")
class TestFormatDate:
def test_valid_iso_date(self):
assert _format_date("2026-03-30T09:15:00+00:00") == "2026-03-30"
def test_z_suffix(self):
assert _format_date("2026-03-30T00:00:00Z") == "2026-03-30"
def test_none_returns_dash(self):
assert _format_date(None) == ""
def test_empty_string_returns_dash(self):
assert _format_date("") == ""
class TestBuildContext:
def test_returns_string(self, temp_db, user):
output = build_context(user["id"])
assert isinstance(output, str)
assert len(output) > 0
def test_contains_bigmind_header(self, temp_db, user):
output = build_context(user["id"])
assert "🧠 BigMind Context" in output
def test_no_profile_shows_placeholder(self, temp_db, user):
output = build_context(user["id"])
assert "memory_update_profile" in output
def test_profile_shown_when_set(self, temp_db, user):
memory_store.upsert_identity_profile(
user["id"],
role="Principal Engineer",
preferences="Python first",
pinned_facts="- Uses uv for packages",
)
output = build_context(user["id"])
assert "Principal Engineer" in output
assert "Python first" in output
assert "Uses uv for packages" in output
def test_no_sessions_shows_placeholder(self, temp_db, user):
output = build_context(user["id"])
assert "No past sessions yet" in output
def test_closed_session_appears_in_index(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(
sid, "Implemented BigMind Phase 1",
topics="mcp,sqlite", outcome="All files created"
)
output = build_context(user["id"])
assert "Implemented BigMind Phase 1" in output
assert "mcp,sqlite" in output
def test_open_session_shown_as_in_progress(self, temp_db, user):
memory_store.create_session(user["id"])
output = build_context(user["id"])
# open session MUST appear — marked as [in progress] for parallel IDE visibility
assert "in progress" in output
# but "No past sessions yet" placeholder must NOT appear
assert "No past sessions yet" not in output
def test_tier2_hint_shown_for_sessions_with_summary(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Session with summary")
memory_store.save_session_summary(sid, "Full narrative here")
output = build_context(user["id"])
assert "📄" in output
def test_respects_n_sessions_limit(self, temp_db, user):
for i in range(15):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, f"Session {i}")
output = build_context(user["id"], n_sessions=5)
assert "last 5" in output
+227
View File
@@ -0,0 +1,227 @@
"""Tests for database initialisation."""
import sqlite3
import pytest
from bigmind.db import get_db_path, get_connection, init_db, _migrate_v1_to_v2, _migrate_v2_to_v3, _migrate_v3_to_v4, _migrate_v4_to_v5
class TestDbInit:
def test_db_file_created(self, temp_db):
assert temp_db.exists()
def test_schema_version_is_7(self, temp_db):
conn = get_connection()
row = conn.execute("SELECT version FROM schema_version").fetchone()
conn.close()
assert row is not None
assert row["version"] == 7
def test_all_tables_exist(self, temp_db):
expected = {
"users", "identity_profile", "sessions",
"session_summaries", "conversation_chunks", "facts",
"global_knowledge", "hypotheses", "upgrade_requests",
}
conn = get_connection()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
conn.close()
found = {r["name"] for r in rows}
assert expected.issubset(found)
def test_facts_fts_table_exists(self, temp_db):
conn = get_connection()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='facts_fts'"
).fetchall()
conn.close()
assert len(rows) == 1
def test_facts_has_deprecated_columns(self, temp_db):
conn = get_connection()
rows = conn.execute("PRAGMA table_info(facts)").fetchall()
conn.close()
col_names = {r["name"] for r in rows}
assert "deprecated" in col_names
assert "deprecation_reason" in col_names
def test_hypotheses_has_correct_columns(self, temp_db):
conn = get_connection()
rows = conn.execute("PRAGMA table_info(hypotheses)").fetchall()
conn.close()
col_names = {r["name"] for r in rows}
for col in ("id", "session_id", "user_id", "hypothesis",
"confidence", "status", "resolution",
"created_at", "resolved_at"):
assert col in col_names, f"Missing column: {col}"
def test_init_is_idempotent(self, temp_db):
"""Calling init_db() twice must not raise."""
init_db()
init_db()
def test_db_path_override_via_env(self, temp_db):
assert str(get_db_path()) == str(temp_db)
class TestMigrationV1ToV2:
def test_migration_adds_columns_to_existing_table(self, temp_db):
"""Simulate a v1 DB: facts table without deprecated columns."""
conn = get_connection()
conn.execute("DROP TABLE IF EXISTS facts")
conn.execute("""CREATE TABLE facts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
category TEXT NOT NULL,
fact TEXT NOT NULL,
source_session TEXT,
confidence REAL DEFAULT 1.0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)""")
conn.commit()
rows = conn.execute("PRAGMA table_info(facts)").fetchall()
assert "deprecated" not in {r["name"] for r in rows}
_migrate_v1_to_v2(conn)
conn.commit()
rows = conn.execute("PRAGMA table_info(facts)").fetchall()
conn.close()
col_names = {r["name"] for r in rows}
assert "deprecated" in col_names
assert "deprecation_reason" in col_names
def test_migration_is_idempotent(self, temp_db):
"""Running v1→v2 twice must not raise."""
conn = get_connection()
_migrate_v1_to_v2(conn)
conn.commit()
conn.close()
class TestMigrationV4ToV5:
def test_migration_creates_facts_fts(self, temp_db):
conn = get_connection()
conn.execute("DROP TABLE IF EXISTS facts_fts")
conn.commit()
_migrate_v4_to_v5(conn)
conn.commit()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='facts_fts'"
).fetchall()
conn.close()
assert len(rows) == 1
def test_migration_backfills_existing_facts(self, temp_db):
from bigmind import memory_store
user = memory_store.get_or_create_user("backfill_user")
# Insert a fact directly (bypassing FTS sync) to simulate pre-v5 data
conn = get_connection()
conn.execute(
"INSERT INTO facts (user_id, category, fact) VALUES (?,?,?)",
(user["id"], "codebase", "pre-migration fact about SQLite")
)
conn.execute("DROP TABLE IF EXISTS facts_fts")
conn.commit()
_migrate_v4_to_v5(conn)
conn.commit()
conn.close()
results = memory_store.search_facts(user["id"], "SQLite")
assert len(results) == 1
def test_migration_is_idempotent(self, temp_db):
conn = get_connection()
_migrate_v4_to_v5(conn)
conn.commit()
conn.close()
class TestMigrationV3ToV4:
def test_migration_creates_upgrade_requests_table(self, temp_db):
conn = get_connection()
conn.execute("DROP TABLE IF EXISTS upgrade_requests")
conn.commit()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='upgrade_requests'"
).fetchall()
assert len(rows) == 0
_migrate_v3_to_v4(conn)
conn.commit()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='upgrade_requests'"
).fetchall()
conn.close()
assert len(rows) == 1
def test_migration_creates_index(self, temp_db):
conn = get_connection()
conn.execute("DROP INDEX IF EXISTS idx_upgrade_requests_user_status")
conn.execute("DROP TABLE IF EXISTS upgrade_requests")
conn.commit()
_migrate_v3_to_v4(conn)
conn.commit()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='index' "
"AND name='idx_upgrade_requests_user_status'"
).fetchall()
conn.close()
assert len(rows) == 1
def test_migration_is_idempotent(self, temp_db):
conn = get_connection()
_migrate_v3_to_v4(conn)
conn.commit()
conn.close()
def test_migration_creates_hypotheses_table(self, temp_db):
"""Drop hypotheses and re-run migration — table must reappear."""
conn = get_connection()
conn.execute("DROP TABLE IF EXISTS hypotheses")
conn.commit()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='hypotheses'"
).fetchall()
assert len(rows) == 0
_migrate_v2_to_v3(conn)
conn.commit()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='hypotheses'"
).fetchall()
conn.close()
assert len(rows) == 1
def test_migration_creates_index(self, temp_db):
conn = get_connection()
conn.execute("DROP INDEX IF EXISTS idx_hypotheses_user_status")
conn.execute("DROP TABLE IF EXISTS hypotheses")
conn.commit()
_migrate_v2_to_v3(conn)
conn.commit()
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='index' AND name='idx_hypotheses_user_status'"
).fetchall()
conn.close()
assert len(rows) == 1
def test_migration_is_idempotent(self, temp_db):
"""Running v2→v3 twice must not raise."""
conn = get_connection()
_migrate_v2_to_v3(conn)
conn.commit()
conn.close()
def test_status_check_constraint_enforced(self, temp_db):
"""Inserting a bad status value must raise an integrity error."""
conn = get_connection()
user_row = conn.execute("SELECT id FROM users LIMIT 1").fetchone()
if not user_row:
conn.execute("INSERT INTO users (id, username) VALUES ('u1', 'testuser')")
conn.commit()
with pytest.raises(sqlite3.IntegrityError):
conn.execute(
"""INSERT INTO hypotheses (user_id, hypothesis, status)
VALUES ('u1', 'bad status test', 'invalid_status')"""
)
conn.commit()
conn.close()
@@ -0,0 +1,274 @@
"""Tests for Feature 7 — Live Session Awareness.
Covers:
- announce_focus() — atomic conflict detection, focus writes
- get_active_sessions() — idle_minutes, field values, exclusion of closed
- close_session() — must NULL focus columns
- Schema v6 — sessions focus columns + token_saves table
- log_token_save() — insert, accumulate, stats
- get_token_efficiency_stats() — totals, session filter, best, by_method
"""
import json
import pytest
from bigmind import memory_store
from bigmind.db import db
@pytest.fixture
def user(temp_db):
return memory_store.get_or_create_user("testuser", "Test User")
# ── announce_focus ─────────────────────────────────────────────────────────────
class TestAnnounceFocus:
def test_sets_focus_and_files(self, temp_db, user):
sid = memory_store.create_session(user["id"])
result = memory_store.announce_focus(sid, "Working on db.py", ["bigmind/db.py"])
assert result["updated"] is True
assert result["conflicts"] == []
sessions = memory_store.get_active_sessions(user["id"])
assert len(sessions) == 1
assert sessions[0]["focus"] == "Working on db.py"
assert "bigmind/db.py" in sessions[0]["files"]
def test_sets_ide_hint(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.announce_focus(sid, "task", ["file.py"], ide_hint="PyCharm")
sessions = memory_store.get_active_sessions(user["id"])
assert sessions[0]["ide_hint"] == "PyCharm"
def test_detects_file_conflict(self, temp_db, user):
sid1 = memory_store.create_session(user["id"])
sid2 = memory_store.create_session(user["id"])
memory_store.announce_focus(sid1, "Working on server.py", ["src/server.py"])
result = memory_store.announce_focus(sid2, "Also server.py", ["src/server.py"])
assert len(result["conflicts"]) == 1
assert result["conflicts"][0]["session_id"] == sid1[:8]
assert "src/server.py" in result["conflicts"][0]["overlapping_files"]
def test_no_conflict_for_different_files(self, temp_db, user):
sid1 = memory_store.create_session(user["id"])
sid2 = memory_store.create_session(user["id"])
memory_store.announce_focus(sid1, "Editing db.py", ["bigmind/db.py"])
result = memory_store.announce_focus(sid2, "Editing server.py", ["src/server.py"])
assert result["conflicts"] == []
def test_second_call_overwrites_focus(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.announce_focus(sid, "First task", ["a.py"])
memory_store.announce_focus(sid, "Second task", ["b.py"])
sessions = memory_store.get_active_sessions(user["id"])
assert sessions[0]["focus"] == "Second task"
assert "b.py" in sessions[0]["files"]
assert "a.py" not in sessions[0]["files"]
def test_empty_files_list_no_conflict(self, temp_db, user):
sid1 = memory_store.create_session(user["id"])
sid2 = memory_store.create_session(user["id"])
memory_store.announce_focus(sid1, "Task A", [])
result = memory_store.announce_focus(sid2, "Task B", [])
assert result["conflicts"] == []
def test_own_session_not_a_conflict(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.announce_focus(sid, "First focus", ["server.py"])
result = memory_store.announce_focus(sid, "Updated focus", ["server.py"])
assert result["conflicts"] == []
def test_conflict_lists_overlapping_files_only(self, temp_db, user):
sid1 = memory_store.create_session(user["id"])
sid2 = memory_store.create_session(user["id"])
memory_store.announce_focus(sid1, "Multiple files", ["a.py", "b.py", "c.py"])
result = memory_store.announce_focus(sid2, "Partial overlap", ["b.py", "z.py"])
assert len(result["conflicts"]) == 1
assert result["conflicts"][0]["overlapping_files"] == ["b.py"]
def test_conflict_not_raised_for_closed_session(self, temp_db, user):
sid1 = memory_store.create_session(user["id"])
memory_store.announce_focus(sid1, "Old work", ["x.py"])
memory_store.close_session(sid1, "done")
sid2 = memory_store.create_session(user["id"])
result = memory_store.announce_focus(sid2, "New work", ["x.py"])
assert result["conflicts"] == []
# ── get_active_sessions ────────────────────────────────────────────────────────
class TestGetActiveSessions:
def test_returns_all_open_sessions(self, temp_db, user):
sid1 = memory_store.create_session(user["id"])
sid2 = memory_store.create_session(user["id"])
sessions = memory_store.get_active_sessions(user["id"])
ids = [s["session_id"] for s in sessions]
assert sid1 in ids
assert sid2 in ids
def test_excludes_closed_sessions(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "done")
sessions = memory_store.get_active_sessions(user["id"])
ids = [s["session_id"] for s in sessions]
assert sid not in ids
def test_idle_minutes_non_negative(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.announce_focus(sid, "task", [])
sessions = memory_store.get_active_sessions(user["id"])
assert sessions[0]["idle_minutes"] is not None
assert sessions[0]["idle_minutes"] >= 0
def test_focus_null_when_not_announced(self, temp_db, user):
sid = memory_store.create_session(user["id"])
sessions = memory_store.get_active_sessions(user["id"])
s = next(x for x in sessions if x["session_id"] == sid)
assert s["focus"] is None
assert s["files"] == []
assert s["ide_hint"] is None
def test_focus_reflects_announce(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.announce_focus(sid, "Live task", ["db.py"], ide_hint="VS Code")
sessions = memory_store.get_active_sessions(user["id"])
s = next(x for x in sessions if x["session_id"] == sid)
assert s["focus"] == "Live task"
assert "db.py" in s["files"]
assert s["ide_hint"] == "VS Code"
def test_empty_when_no_open_sessions(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "done")
sessions = memory_store.get_active_sessions(user["id"])
assert sessions == []
# ── close_session clears focus ────────────────────────────────────────────────
class TestCloseClearsFocus:
def test_focus_columns_nulled_on_close(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.announce_focus(sid, "some work", ["x.py"], ide_hint="PyCharm")
# Verify focus was set
with db() as conn:
row = conn.execute(
"SELECT current_focus, focus_files, focus_updated_at FROM sessions WHERE id=?",
(sid,),
).fetchone()
assert row["current_focus"] == "some work"
memory_store.close_session(sid, "finished", topics="test")
# All focus columns must be NULL after close
with db() as conn:
row = conn.execute(
"SELECT current_focus, focus_files, focus_updated_at FROM sessions WHERE id=?",
(sid,),
).fetchone()
assert row["current_focus"] is None
assert row["focus_files"] is None
assert row["focus_updated_at"] is None
# ── Schema v6 ─────────────────────────────────────────────────────────────────
class TestSchemaV6:
def test_sessions_have_focus_columns(self, temp_db, user):
sid = memory_store.create_session(user["id"])
with db() as conn:
row = conn.execute("SELECT * FROM sessions WHERE id=?", (sid,)).fetchone()
col_names = row.keys()
assert "current_focus" in col_names
assert "focus_files" in col_names
assert "focus_updated_at" in col_names
assert "ide_hint" in col_names
def test_token_saves_table_exists(self, temp_db):
with db() as conn:
count = conn.execute("SELECT COUNT(*) FROM token_saves").fetchone()[0]
assert count == 0 # table exists, just empty
def test_schema_version_is_7(self, temp_db):
with db() as conn:
version = conn.execute(
"SELECT version FROM schema_version"
).fetchone()["version"]
assert version == 7
# ── Token Efficiency Tracker (Feature 6) ──────────────────────────────────────
class TestTokenSaves:
def test_log_returns_row_id(self, temp_db, user):
sid = memory_store.create_session(user["id"])
row_id = memory_store.log_token_save(
sid, user["id"], "grep instead of read", 50_000
)
assert isinstance(row_id, int) and row_id > 0
def test_total_accumulates(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.log_token_save(sid, user["id"], "save 1", 10_000, method_used="grep")
memory_store.log_token_save(sid, user["id"], "save 2", 20_000, method_used="memory_hit")
stats = memory_store.get_token_efficiency_stats(user["id"])
assert stats["total_tokens_saved"] == 30_000
def test_session_total_correct(self, temp_db, user):
sid1 = memory_store.create_session(user["id"])
sid2 = memory_store.create_session(user["id"])
memory_store.log_token_save(sid1, user["id"], "s1 save", 5_000)
memory_store.log_token_save(sid2, user["id"], "s2 save", 8_000)
stats = memory_store.get_token_efficiency_stats(user["id"], session_id=sid1)
assert stats["session_tokens_saved"] == 5_000
def test_best_save_returns_largest(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.log_token_save(sid, user["id"], "small", 1_000)
memory_store.log_token_save(sid, user["id"], "big one", 999_000)
memory_store.log_token_save(sid, user["id"], "medium", 50_000)
stats = memory_store.get_token_efficiency_stats(user["id"])
assert stats["best_save"]["tokens_saved_estimate"] == 999_000
assert stats["best_save"]["description"] == "big one"
def test_by_method_aggregation(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.log_token_save(sid, user["id"], "g1", 10_000, method_used="grep")
memory_store.log_token_save(sid, user["id"], "g2", 10_000, method_used="grep")
memory_store.log_token_save(sid, user["id"], "m1", 5_000, method_used="memory_hit")
stats = memory_store.get_token_efficiency_stats(user["id"])
by_method = {r["method_used"]: r["total"] for r in stats["by_method"]}
assert by_method["grep"] == 20_000
assert by_method["memory_hit"] == 5_000
def test_empty_stats_when_no_saves(self, temp_db, user):
stats = memory_store.get_token_efficiency_stats(user["id"])
assert stats["total_tokens_saved"] == 0
assert stats["best_save"] is None
assert stats["recent_saves"] == []
def test_method_stored_correctly(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.log_token_save(
sid, user["id"], "tail log file", 200_000, method_used="tail"
)
stats = memory_store.get_token_efficiency_stats(user["id"])
assert stats["best_save"]["method_used"] == "tail"
+756
View File
@@ -0,0 +1,756 @@
"""Tests for memory_store CRUD operations."""
import json
import pytest
from datetime import datetime, timezone, timedelta
from pathlib import Path
from bigmind import memory_store
from bigmind.db import db
@pytest.fixture
def user():
return memory_store.get_or_create_user("testuser", "Test User")
class TestUsers:
def test_create_user(self, temp_db):
u = memory_store.get_or_create_user("alice")
assert u["username"] == "alice"
assert u["id"]
def test_get_existing_user(self, temp_db):
u1 = memory_store.get_or_create_user("bob")
u2 = memory_store.get_or_create_user("bob")
assert u1["id"] == u2["id"]
def test_get_current_username_from_env(self, monkeypatch):
monkeypatch.setenv("BIGMIND_USER", "envuser")
assert memory_store.get_current_username() == "envuser"
class TestIdentityProfile:
def test_upsert_creates_profile(self, temp_db, user):
profile = memory_store.upsert_identity_profile(
user["id"], role="Engineer", preferences="Python first"
)
assert profile["role"] == "Engineer"
assert profile["preferences"] == "Python first"
def test_upsert_partial_update_preserves_other_fields(self, temp_db, user):
memory_store.upsert_identity_profile(
user["id"], role="Eng", preferences="Python", pinned_facts="- fact 1"
)
# Only update role — preferences and pinned_facts should be preserved
memory_store.upsert_identity_profile(user["id"], role="Senior Eng")
profile = memory_store.get_identity_profile(user["id"])
assert profile["role"] == "Senior Eng"
assert profile["preferences"] == "Python"
assert "fact 1" in profile["pinned_facts"]
def test_get_missing_profile_returns_none(self, temp_db):
assert memory_store.get_identity_profile("nonexistent-id") is None
class TestSessions:
def test_create_session_returns_id(self, temp_db, user):
sid = memory_store.create_session(user["id"])
assert sid and len(sid) == 36 # UUID format
def test_close_session(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Built a thing", topics="mcp", outcome="Done")
sessions = memory_store.get_recent_sessions(user["id"])
assert len(sessions) == 1
assert sessions[0]["one_liner"] == "Built a thing"
assert sessions[0]["topics"] == "mcp"
def test_one_liner_truncated_at_120_chars(self, temp_db, user):
sid = memory_store.create_session(user["id"])
long_title = "x" * 200
memory_store.close_session(sid, long_title)
sessions = memory_store.get_recent_sessions(user["id"])
assert len(sessions[0]["one_liner"]) == 120
def test_open_session_not_in_recent(self, temp_db, user):
memory_store.create_session(user["id"])
# Open sessions (no ended_at) must NOT appear in the recent list
assert memory_store.get_recent_sessions(user["id"]) == []
def test_save_session_summary(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Headline")
memory_store.save_session_summary(sid, "Detailed summary", key_facts="- fact")
detail = memory_store.get_session_detail(sid)
assert detail["summary"] == "Detailed summary"
assert detail["key_facts"] == "- fact"
# has_tier2 flag must be set
sessions = memory_store.get_recent_sessions(user["id"])
assert sessions[0]["has_tier2"] == 1
def test_get_open_sessions(self, temp_db, user):
sid = memory_store.create_session(user["id"])
open_s = memory_store.get_open_sessions(user["id"])
assert any(s["id"] == sid for s in open_s)
class TestChunks:
def test_append_and_search_chunks(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(
sid, user["id"], "assistant",
"We decided to use SQLite for the database",
"architectural decision"
)
results = memory_store.search_chunks(user["id"], "SQLite")
assert len(results) == 1
assert "SQLite" in results[0]["content"]
def test_chunk_seq_increments(self, temp_db, user):
sid = memory_store.create_session(user["id"])
id1 = memory_store.append_chunk(sid, user["id"], "user", "first")
id2 = memory_store.append_chunk(sid, user["id"], "assistant", "second")
assert id2 > id1
def test_search_isolated_by_user(self, temp_db):
u1 = memory_store.get_or_create_user("user_a")
u2 = memory_store.get_or_create_user("user_b")
sid1 = memory_store.create_session(u1["id"])
memory_store.append_chunk(sid1, u1["id"], "user", "secret data")
# u2 should not see u1's chunks
assert memory_store.search_chunks(u2["id"], "secret") == []
class TestFacts:
def test_store_and_retrieve_fact(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "preference", "Prefers dark mode")
facts = memory_store.get_facts(user["id"])
assert any(f["id"] == fid for f in facts)
def test_filter_by_category(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Python first")
memory_store.store_fact(user["id"], "decision", "Use SQLite")
prefs = memory_store.get_facts(user["id"], category="preference")
assert all(f["category"] == "preference" for f in prefs)
class TestFactDeprecation:
def test_deprecate_returns_true_on_success(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Old stack note")
result = memory_store.deprecate_fact(fid, user["id"], "No longer applicable")
assert result is True
def test_deprecate_hides_fact_from_get_facts(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Deprecated technology")
memory_store.deprecate_fact(fid, user["id"])
facts = memory_store.get_facts(user["id"])
assert not any(f["id"] == fid for f in facts)
def test_deprecated_fact_visible_with_include_deprecated(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Soft-deleted fact")
memory_store.deprecate_fact(fid, user["id"], "outdated")
all_facts = memory_store.get_facts(user["id"], include_deprecated=True)
assert any(f["id"] == fid for f in all_facts)
def test_deprecation_reason_stored(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "decision", "Use Gradle")
memory_store.deprecate_fact(fid, user["id"], "Switched to Maven")
all_facts = memory_store.get_facts(user["id"], include_deprecated=True)
deprecated = next(f for f in all_facts if f["id"] == fid)
assert deprecated["deprecation_reason"] == "Switched to Maven"
assert deprecated["deprecated"] == 1
def test_deprecate_no_reason_still_works(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "preference", "Some preference")
result = memory_store.deprecate_fact(fid, user["id"])
assert result is True
facts = memory_store.get_facts(user["id"])
assert not any(f["id"] == fid for f in facts)
def test_deprecate_unknown_id_returns_false(self, temp_db, user):
result = memory_store.deprecate_fact(99999, user["id"])
assert result is False
def test_deprecate_wrong_user_returns_false(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
fid = memory_store.store_fact(user_a["id"], "codebase", "User A's fact")
result = memory_store.deprecate_fact(fid, user_b["id"], "Should not work")
assert result is False
# Fact must still be visible for user_a
facts = memory_store.get_facts(user_a["id"])
assert any(f["id"] == fid for f in facts)
def test_non_deprecated_facts_still_returned_by_default(self, temp_db, user):
fid_keep = memory_store.store_fact(user["id"], "preference", "Keep this one")
fid_drop = memory_store.store_fact(user["id"], "preference", "Drop this one")
memory_store.deprecate_fact(fid_drop, user["id"])
facts = memory_store.get_facts(user["id"])
ids = [f["id"] for f in facts]
assert fid_keep in ids
assert fid_drop not in ids
class TestStats:
def test_stats_returns_expected_keys(self, temp_db, user):
stats = memory_store.get_stats(user["id"])
for key in ("sessions", "facts", "chunks", "db_size_kb", "db_path"):
assert key in stats
class TestHypotheses:
def test_add_hypothesis_returns_id(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "I believe X because Y")
assert isinstance(hid, int)
assert hid > 0
def test_add_hypothesis_default_confidence(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Default confidence belief")
results = memory_store.list_hypotheses(user["id"])
assert results[0]["confidence"] == 0.7
def test_add_hypothesis_custom_confidence(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_hypothesis(user["id"], sid, "Strong belief", confidence=0.95)
results = memory_store.list_hypotheses(user["id"])
assert results[0]["confidence"] == 0.95
def test_new_hypothesis_status_is_open(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_hypothesis(user["id"], sid, "Fresh thought")
results = memory_store.list_hypotheses(user["id"])
assert results[0]["status"] == "open"
def test_list_hypotheses_returns_all(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_hypothesis(user["id"], sid, "Thought one")
memory_store.add_hypothesis(user["id"], sid, "Thought two")
results = memory_store.list_hypotheses(user["id"])
assert len(results) == 2
def test_list_hypotheses_filter_by_status(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Will be confirmed")
memory_store.add_hypothesis(user["id"], sid, "Stays open")
memory_store.resolve_hypothesis(hid, user["id"], "confirmed", "It was true")
open_list = memory_store.list_hypotheses(user["id"], status="open")
confirmed_list = memory_store.list_hypotheses(user["id"], status="confirmed")
assert len(open_list) == 1
assert open_list[0]["hypothesis"] == "Stays open"
assert len(confirmed_list) == 1
assert confirmed_list[0]["hypothesis"] == "Will be confirmed"
def test_resolve_confirmed(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Bug is in serializer")
result = memory_store.resolve_hypothesis(
hid, user["id"], "confirmed", "Confirmed — the serializer had a null check missing"
)
assert result is True
resolved = memory_store.list_hypotheses(user["id"], status="confirmed")
assert len(resolved) == 1
assert resolved[0]["resolution"] == "Confirmed — the serializer had a null check missing"
assert resolved[0]["resolved_at"] is not None
def test_resolve_refuted(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Problem is in the network")
memory_store.resolve_hypothesis(hid, user["id"], "refuted", "Was actually a race condition")
results = memory_store.list_hypotheses(user["id"], status="refuted")
assert results[0]["status"] == "refuted"
def test_resolve_abandoned(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Might be a cache issue")
memory_store.resolve_hypothesis(hid, user["id"], "abandoned")
results = memory_store.list_hypotheses(user["id"], status="abandoned")
assert len(results) == 1
def test_resolve_invalid_status_raises(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Some thought")
with pytest.raises(ValueError, match="Invalid status"):
memory_store.resolve_hypothesis(hid, user["id"], "wrong_status")
def test_resolve_unknown_id_returns_false(self, temp_db, user):
result = memory_store.resolve_hypothesis(99999, user["id"], "confirmed")
assert result is False
def test_resolve_wrong_user_returns_false(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
sid = memory_store.create_session(user_a["id"])
hid = memory_store.add_hypothesis(user_a["id"], sid, "User A's private thought")
result = memory_store.resolve_hypothesis(hid, user_b["id"], "confirmed")
assert result is False
# Hypothesis must still be open for user_a
results = memory_store.list_hypotheses(user_a["id"], status="open")
assert len(results) == 1
def test_list_isolated_by_user(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
sid_a = memory_store.create_session(user_a["id"])
memory_store.add_hypothesis(user_a["id"], sid_a, "User A only sees this")
assert memory_store.list_hypotheses(user_b["id"]) == []
def test_resolve_no_resolution_text_is_allowed(self, temp_db, user):
sid = memory_store.create_session(user["id"])
hid = memory_store.add_hypothesis(user["id"], sid, "Quick thought")
result = memory_store.resolve_hypothesis(hid, user["id"], "abandoned")
assert result is True
class TestSearchFacts:
def test_search_returns_matching_fact(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase", "We use SQLite for local storage")
results = memory_store.search_facts(user["id"], "SQLite")
assert len(results) == 1
assert "SQLite" in results[0]["fact"]
def test_search_porter_stemming(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase", "FastMCP serialization rules")
# 'serialize' should match 'serialization' via Porter stemmer
results = memory_store.search_facts(user["id"], "serialize")
assert len(results) == 1
def test_search_no_results(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Prefers dark mode")
results = memory_store.search_facts(user["id"], "quantum")
assert results == []
def test_search_excludes_deprecated(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Old deprecated SQLite note")
memory_store.deprecate_fact(fid, user["id"])
results = memory_store.search_facts(user["id"], "SQLite")
assert results == []
def test_search_isolated_by_user(self, temp_db):
user_a = memory_store.get_or_create_user("search_a")
user_b = memory_store.get_or_create_user("search_b")
memory_store.store_fact(user_a["id"], "codebase", "User A secret fact")
assert memory_store.search_facts(user_b["id"], "secret") == []
def test_search_returns_category(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Prefers uv over pip")
results = memory_store.search_facts(user["id"], "uv")
assert results[0]["category"] == "preference"
def test_search_limit(self, temp_db, user):
for i in range(5):
memory_store.store_fact(user["id"], "codebase", f"SQLite fact number {i}")
results = memory_store.search_facts(user["id"], "SQLite", limit=3)
assert len(results) == 3
def test_search_multiword_and_match(self, temp_db, user):
# Regression: multi-word query must AND-match (both words anywhere),
# NOT phrase-match (words consecutive). The 2026-03-31 fix accidentally
# broke this by wrapping the whole query in one double-quoted phrase.
memory_store.store_fact(user["id"], "codebase",
"BigMind uses SQLite for persistent local storage")
# Both words present but NOT consecutive — phrase search would return nothing
results = memory_store.search_facts(user["id"], "SQLite persistent")
assert len(results) == 1, "Multi-word AND search must match even when words are not consecutive"
def test_search_multiword_partial_match_returns_nothing(self, temp_db, user):
# If only one of two required words is present, no match
memory_store.store_fact(user["id"], "codebase", "We use SQLite locally")
results = memory_store.search_facts(user["id"], "SQLite quantum")
assert results == []
def test_search_reserved_word_category(self, temp_db, user):
# Regression: FTS5 reserved words like 'rank', 'content', 'category'
# must not crash or return wrong results
memory_store.store_fact(user["id"], "codebase", "rank and content pipeline")
results = memory_store.search_facts(user["id"], "rank")
assert len(results) == 1
def test_search_reserved_word_content(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase", "the content table stores data")
results = memory_store.search_facts(user["id"], "content")
assert len(results) == 1
def test_search_three_word_query(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase",
"parallel sessions across IDEs share the same SQLite database")
results = memory_store.search_facts(user["id"], "parallel sessions SQLite")
assert len(results) == 1
class TestSearchChunksMultiword:
def test_chunk_multiword_and_match(self, temp_db, user):
# Same regression test for search_chunks
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(sid, user["id"], "assistant",
"WAL mode allows parallel reads from multiple IDE sessions",
"architectural decision")
# Words present but not consecutive — phrase search would fail
results = memory_store.search_chunks(user["id"], "parallel IDE")
assert len(results) == 1, "Multi-word AND search on chunks must not be a phrase search"
def test_chunk_reserved_word_rank(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(sid, user["id"], "assistant",
"bm25 rank scores the relevance of results", None)
results = memory_store.search_chunks(user["id"], "rank")
assert len(results) == 1
class TestUpgradeRequests:
def test_add_returns_id(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(
user["id"], sid, "Add FTS to facts", "Would speed up recall"
)
assert isinstance(rid, int) and rid > 0
def test_default_status_is_open(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
results = memory_store.list_upgrade_requests(user["id"])
assert results[0]["status"] == "open"
def test_default_priority_is_medium(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
results = memory_store.list_upgrade_requests(user["id"])
assert results[0]["priority"] == "medium"
def test_default_certainty(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
results = memory_store.list_upgrade_requests(user["id"])
assert results[0]["certainty"] == 0.7
def test_custom_priority_and_certainty(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.add_upgrade_request(
user["id"], sid, "Critical feature", "Blocks work",
priority="high", certainty=0.95
)
results = memory_store.list_upgrade_requests(user["id"])
assert results[0]["priority"] == "high"
assert results[0]["certainty"] == 0.95
def test_list_filter_by_status(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(user["id"], sid, "Feature A", "Reason A")
memory_store.add_upgrade_request(user["id"], sid, "Feature B", "Reason B")
memory_store.resolve_upgrade_request(rid, user["id"], "resolved", "Done")
open_list = memory_store.list_upgrade_requests(user["id"], status="open")
resolved_list = memory_store.list_upgrade_requests(user["id"], status="resolved")
assert len(open_list) == 1 and open_list[0]["description"] == "Feature B"
assert len(resolved_list) == 1 and resolved_list[0]["description"] == "Feature A"
def test_resolve_resolved(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
result = memory_store.resolve_upgrade_request(rid, user["id"], "resolved", "Shipped in v4")
assert result is True
results = memory_store.list_upgrade_requests(user["id"], status="resolved")
assert results[0]["resolution"] == "Shipped in v4"
assert results[0]["resolved_at"] is not None
def test_resolve_rejected(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
memory_store.resolve_upgrade_request(rid, user["id"], "rejected", "Out of scope")
results = memory_store.list_upgrade_requests(user["id"], status="rejected")
assert results[0]["status"] == "rejected"
def test_resolve_invalid_status_raises(self, temp_db, user):
sid = memory_store.create_session(user["id"])
rid = memory_store.add_upgrade_request(user["id"], sid, "Feature X", "Reason Y")
with pytest.raises(ValueError, match="Invalid status"):
memory_store.resolve_upgrade_request(rid, user["id"], "done")
def test_resolve_unknown_id_returns_false(self, temp_db, user):
assert memory_store.resolve_upgrade_request(99999, user["id"], "resolved") is False
def test_resolve_wrong_user_returns_false(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
sid = memory_store.create_session(user_a["id"])
rid = memory_store.add_upgrade_request(user_a["id"], sid, "Feature X", "Reason Y")
assert memory_store.resolve_upgrade_request(rid, user_b["id"], "resolved") is False
assert memory_store.list_upgrade_requests(user_a["id"], status="open")[0]["id"] == rid
def test_list_isolated_by_user(self, temp_db):
user_a = memory_store.get_or_create_user("user_a")
user_b = memory_store.get_or_create_user("user_b")
sid = memory_store.create_session(user_a["id"])
memory_store.add_upgrade_request(user_a["id"], sid, "Only A sees this", "Reason")
assert memory_store.list_upgrade_requests(user_b["id"]) == []
# ── Health Check ───────────────────────────────────────────────────────────────
class TestHealthCheck:
def test_returns_expected_keys(self, temp_db, user):
report = memory_store.health_check(user["id"])
for key in (
"stale_facts", "sessions_without_summary", "open_sessions",
"chunk_count", "fts_row_count", "fts_in_sync",
"low_confidence_facts", "stale_threshold_days",
):
assert key in report
def test_empty_db_no_issues(self, temp_db, user):
report = memory_store.health_check(user["id"])
assert report["stale_facts"] == []
assert report["sessions_without_summary"] == 0
assert report["open_sessions"] == []
assert report["fts_in_sync"] is True
def test_detects_stale_facts(self, temp_db, user):
fid = memory_store.store_fact(user["id"], "codebase", "Old deployment note")
old_date = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
with db() as conn:
conn.execute("UPDATE facts SET updated_at=? WHERE id=?", (old_date, fid))
report = memory_store.health_check(user["id"], stale_days=30)
assert len(report["stale_facts"]) == 1
assert report["stale_facts"][0]["id"] == fid
def test_fresh_facts_not_flagged(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Fresh fact today")
report = memory_store.health_check(user["id"], stale_days=30)
assert report["stale_facts"] == []
def test_detects_sessions_without_summary(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Closed without summary")
report = memory_store.health_check(user["id"])
assert report["sessions_without_summary"] == 1
def test_sessions_with_summary_not_flagged(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Closed with summary")
memory_store.save_session_summary(sid, "Full narrative here.")
report = memory_store.health_check(user["id"])
assert report["sessions_without_summary"] == 0
def test_detects_open_sessions(self, temp_db, user):
memory_store.create_session(user["id"])
report = memory_store.health_check(user["id"])
assert len(report["open_sessions"]) == 1
def test_detects_low_confidence_facts(self, temp_db, user):
memory_store.store_fact(user["id"], "codebase", "Uncertain thing", confidence=0.5)
report = memory_store.health_check(user["id"])
assert len(report["low_confidence_facts"]) == 1
assert report["low_confidence_facts"][0]["confidence"] == 0.5
def test_high_confidence_facts_not_flagged(self, temp_db, user):
memory_store.store_fact(user["id"], "preference", "Certain thing", confidence=1.0)
report = memory_store.health_check(user["id"])
assert report["low_confidence_facts"] == []
def test_fts_in_sync_after_append(self, temp_db, user):
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(sid, user["id"], "user", "FTS test chunk")
report = memory_store.health_check(user["id"])
assert report["fts_in_sync"] is True
assert report["chunk_count"] == 1
assert report["fts_row_count"] == 1
def test_stale_threshold_stored_in_report(self, temp_db, user):
report = memory_store.health_check(user["id"], stale_days=45)
assert report["stale_threshold_days"] == 45
class TestExportMemory:
def test_creates_file(self, temp_db, user, tmp_path):
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
assert Path(out).exists()
def test_json_structure(self, temp_db, user, tmp_path):
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
for key in (
"export_date", "bigmind_version", "user", "identity_profile",
"facts", "sessions", "conversation_chunks", "stats",
):
assert key in data
def test_facts_included(self, temp_db, user, tmp_path):
memory_store.store_fact(user["id"], "preference", "Exported preference fact")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
assert any(f["fact"] == "Exported preference fact" for f in data["facts"])
def test_sessions_included_with_tier2(self, temp_db, user, tmp_path):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Exported session")
memory_store.save_session_summary(sid, "Full story for export test.")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
exported = next(s for s in data["sessions"] if s["id"] == sid)
assert exported["one_liner"] == "Exported session"
assert exported["tier2_summary"]["summary"] == "Full story for export test."
def test_session_without_summary_has_null_tier2(self, temp_db, user, tmp_path):
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "No narrative session")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
exported = next(s for s in data["sessions"] if s["id"] == sid)
assert exported["tier2_summary"] is None
def test_chunks_included(self, temp_db, user, tmp_path):
sid = memory_store.create_session(user["id"])
memory_store.append_chunk(sid, user["id"], "assistant", "Exported chunk content")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
assert any("Exported chunk content" in c["content"] for c in data["conversation_chunks"])
def test_stats_accurate(self, temp_db, user, tmp_path):
memory_store.store_fact(user["id"], "pref", "Fact A")
memory_store.store_fact(user["id"], "pref", "Fact B")
out = str(tmp_path / "export.json")
memory_store.export_memory(user["id"], out)
data = json.loads(Path(out).read_text())
assert data["stats"]["facts_count"] == 2
def test_returns_result_dict(self, temp_db, user, tmp_path):
out = str(tmp_path / "export.json")
result = memory_store.export_memory(user["id"], out)
assert result["output_path"] == out
for key in ("facts_count", "sessions_count", "chunks_count", "file_size_kb"):
assert key in result
def test_default_path_in_home_dir(self, temp_db, user):
result = memory_store.export_memory(user["id"])
path = Path(result["output_path"])
try:
assert path.exists()
assert path.parent == Path.home()
finally:
path.unlink(missing_ok=True) # cleanup
# ── Auto-close / Orphaned Sessions ────────────────────────────────────────────
class TestCloseOrphanedSessions:
"""Tests for close_orphaned_sessions — the manual session cleanup tool."""
def test_closes_all_open_except_current(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
s1 = memory_store.create_session(user["id"])
s2 = memory_store.create_session(user["id"])
s3 = memory_store.create_session(user["id"]) # "current"
closed = close_orphaned_sessions(user["id"], keep_session_id=s3)
assert set(closed) == {s1, s2}
assert s3 not in closed
def test_closed_sessions_have_ended_at(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
s1 = memory_store.create_session(user["id"])
s2 = memory_store.create_session(user["id"]) # keep
close_orphaned_sessions(user["id"], keep_session_id=s2)
with db() as conn:
row = conn.execute(
"SELECT ended_at, one_liner FROM sessions WHERE id=?", (s1,)
).fetchone()
assert row["ended_at"] is not None
assert "orphaned" in row["one_liner"]
def test_keep_session_remains_open(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
memory_store.create_session(user["id"])
s_current = memory_store.create_session(user["id"])
close_orphaned_sessions(user["id"], keep_session_id=s_current)
with db() as conn:
row = conn.execute(
"SELECT ended_at FROM sessions WHERE id=?", (s_current,)
).fetchone()
assert row["ended_at"] is None
def test_returns_empty_when_no_orphans(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
s_current = memory_store.create_session(user["id"])
assert close_orphaned_sessions(user["id"], keep_session_id=s_current) == []
def test_does_not_touch_already_closed_sessions(self, temp_db, user):
from bigmind.auto_close import close_orphaned_sessions
s_old = memory_store.create_session(user["id"])
memory_store.close_session(s_old, "Already closed")
s_current = memory_store.create_session(user["id"])
closed = close_orphaned_sessions(user["id"], keep_session_id=s_current)
assert s_old not in closed
def test_isolated_by_user(self, temp_db):
from bigmind.auto_close import close_orphaned_sessions
u1 = memory_store.get_or_create_user("cleanup_user_a")
u2 = memory_store.get_or_create_user("cleanup_user_b")
memory_store.create_session(u1["id"])
s_u1_current = memory_store.create_session(u1["id"])
s_u2_active = memory_store.create_session(u2["id"])
close_orphaned_sessions(u1["id"], keep_session_id=s_u1_current)
still_open = memory_store.get_open_sessions(u2["id"])
assert any(s["id"] == s_u2_active for s in still_open)
# ── Restart Server ─────────────────────────────────────────────────────────────
class TestRestartServer:
"""Tests for restart_server_in_place — os.execv-based in-place restart."""
def test_calls_execv_with_correct_args(self, monkeypatch):
"""restart_server_in_place must call os.execv(sys.executable, [sys.executable] + sys.argv)."""
import os
import sys
import time
from bigmind.auto_close import restart_server_in_place
execv_calls = []
monkeypatch.setattr(os, "execv", lambda path, args: execv_calls.append((path, args)))
monkeypatch.setattr(time, "sleep", lambda _: None) # skip the 500ms delay
restart_server_in_place()
assert len(execv_calls) == 1
path, args = execv_calls[0]
assert path == sys.executable
assert args[0] == sys.executable
assert args[1:] == sys.argv
def test_sleep_called_before_execv(self, monkeypatch):
"""sleep must be called before execv so the MCP response is delivered first."""
import os
import time
from bigmind.auto_close import restart_server_in_place
call_order = []
monkeypatch.setattr(time, "sleep", lambda _: call_order.append("sleep"))
monkeypatch.setattr(os, "execv", lambda *_: call_order.append("execv"))
restart_server_in_place()
assert call_order == ["sleep", "execv"]
def test_sleep_duration_is_half_second(self, monkeypatch):
"""The delay must be 0.5s — long enough for the MCP response to be sent."""
import os
import time
from bigmind.auto_close import restart_server_in_place
sleep_durations = []
monkeypatch.setattr(time, "sleep", lambda d: sleep_durations.append(d))
monkeypatch.setattr(os, "execv", lambda *_: None)
restart_server_in_place()
assert sleep_durations == [0.5]
+116
View File
@@ -0,0 +1,116 @@
"""Tests for BigMind people/contacts directory (v3.0 — schema v7)."""
import pytest
from bigmind import memory_store
@pytest.fixture
def user(temp_db):
return memory_store.get_or_create_user("testuser", "Test User")
class TestRememberPerson:
def test_insert_minimal(self, temp_db, user):
pid = memory_store.upsert_person(user["id"], "elias")
assert pid > 0
def test_insert_full(self, temp_db, user):
pid = memory_store.upsert_person(
user["id"], "elias",
display_name="Elias Müller",
role="Engineer",
team="PI",
notes="Shared BigMind with him",
bigmind_user="elias",
bigmind_url="http://localhost:7701",
)
people = memory_store.list_people(user["id"])
assert len(people) == 1
p = people[0]
assert p["display_name"] == "Elias Müller"
assert p["bigmind_user"] == "elias"
assert p["bigmind_url"] == "http://localhost:7701"
def test_upsert_updates_existing(self, temp_db, user):
memory_store.upsert_person(user["id"], "elias", role="Intern")
memory_store.upsert_person(user["id"], "elias", role="Engineer")
people = memory_store.list_people(user["id"])
assert len(people) == 1
assert people[0]["role"] == "Engineer"
def test_upsert_preserves_unset_fields(self, temp_db, user):
memory_store.upsert_person(user["id"], "elias", notes="First note")
memory_store.upsert_person(user["id"], "elias", role="Engineer")
people = memory_store.list_people(user["id"])
assert people[0]["notes"] == "First note"
assert people[0]["role"] == "Engineer"
def test_different_users_isolated(self, temp_db):
u1 = memory_store.get_or_create_user("alice")
u2 = memory_store.get_or_create_user("bob")
memory_store.upsert_person(u1["id"], "elias")
assert memory_store.list_people(u2["id"]) == []
class TestRecallPerson:
def test_search_by_name(self, temp_db, user):
memory_store.upsert_person(user["id"], "elias", display_name="Elias Müller")
memory_store.upsert_person(user["id"], "klaus", display_name="Klaus Schmidt")
results = memory_store.recall_person(user["id"], "elias")
assert len(results) == 1
assert results[0]["username"] == "elias"
def test_search_by_role(self, temp_db, user):
memory_store.upsert_person(user["id"], "elias", role="Frontend Engineer")
memory_store.upsert_person(user["id"], "oliver", role="Backend Engineer")
results = memory_store.recall_person(user["id"], "Frontend")
assert len(results) == 1
assert results[0]["username"] == "elias"
def test_search_by_notes(self, temp_db, user):
memory_store.upsert_person(user["id"], "elias", notes="token efficiency idea")
results = memory_store.recall_person(user["id"], "token")
assert len(results) == 1
def test_no_results(self, temp_db, user):
results = memory_store.recall_person(user["id"], "nobody")
assert results == []
def test_search_isolated_by_user(self, temp_db):
u1 = memory_store.get_or_create_user("alice")
u2 = memory_store.get_or_create_user("bob")
memory_store.upsert_person(u1["id"], "elias", role="Engineer")
results = memory_store.recall_person(u2["id"], "Engineer")
assert results == []
class TestListPeople:
def test_empty(self, temp_db, user):
assert memory_store.list_people(user["id"]) == []
def test_returns_all(self, temp_db, user):
memory_store.upsert_person(user["id"], "elias")
memory_store.upsert_person(user["id"], "klaus")
memory_store.upsert_person(user["id"], "oliver")
assert len(memory_store.list_people(user["id"])) == 3
class TestLinkAI:
def test_link_existing_person(self, temp_db, user):
memory_store.upsert_person(user["id"], "elias")
result = memory_store.link_ai(user["id"], "elias", "elias_ai", "http://localhost:7701")
assert result is True
people = memory_store.list_people(user["id"])
assert people[0]["bigmind_user"] == "elias_ai"
assert people[0]["bigmind_url"] == "http://localhost:7701"
def test_link_nonexistent_person(self, temp_db, user):
result = memory_store.link_ai(user["id"], "ghost", "ghost_ai")
assert result is False
def test_link_without_url(self, temp_db, user):
memory_store.upsert_person(user["id"], "elias")
result = memory_store.link_ai(user["id"], "elias", "elias_ai")
assert result is True
people = memory_store.list_people(user["id"])
assert people[0]["bigmind_user"] == "elias_ai"
assert people[0]["bigmind_url"] is None
+328
View File
@@ -0,0 +1,328 @@
"""Tests for the Achievement Gallery — profile_builder.compute_achievements().
All tests use the temp_db fixture (auto-use in conftest.py) which wires
BIGMIND_DB_PATH + BIGMIND_USER to a fresh SQLite file per test.
"""
import pytest
from datetime import datetime, timezone, timedelta
from bigmind import memory_store
from bigmind.profile_builder import compute_achievements, build_profile_data
# ── Helpers ───────────────────────────────────────────────────────────────────
def _uid():
user = memory_store.get_or_create_user(memory_store.get_current_username())
return user["id"]
def _close_session(session_id: str, has_tier2: bool = False):
"""Close a session with a one-liner summary."""
memory_store.close_session(
session_id=session_id,
one_liner="test session",
topics="test",
outcome="ok",
importance=5,
)
if has_tier2:
memory_store.save_session_summary(session_id, summary="detailed summary")
# ── TestComputeAchievements ───────────────────────────────────────────────────
class TestComputeAchievements:
def test_returns_list_of_expected_ids(self):
uid = _uid()
achievements = compute_achievements(uid)
ids = {a["id"] for a in achievements}
expected = {
"first_breath", "first_thought", "eureka", "honest_mind",
"scholar", "deep_knowledge", "scientist", "veteran",
"on_fire", "storyteller", "night_owl", "speed_thinker",
"first_handshake", "birthday", "shared_mind",
"frugal_mind", "quarter_million", "token_millionaire", "sniper",
}
assert expected == ids
def test_all_locked_for_empty_db(self):
"""Fresh DB: most achievements locked, except First Handshake (hardcoded)."""
uid = _uid()
achievements = compute_achievements(uid)
by_id = {a["id"]: a for a in achievements}
# First Handshake is always unlocked (hardcoded to 2026-03-31)
assert by_id["first_handshake"]["unlocked"] is True
assert by_id["first_handshake"]["unlocked_at"] == "2026-03-31"
# Everything else locked
for aid in ["first_breath", "first_thought", "eureka", "honest_mind",
"scholar", "veteran", "on_fire", "storyteller", "night_owl",
"speed_thinker", "frugal_mind", "quarter_million",
"token_millionaire", "sniper"]:
assert by_id[aid]["unlocked"] is False, f"{aid} should be locked"
# Shared Mind is always locked (Phase 3 not yet)
assert by_id["shared_mind"]["unlocked"] is False
# ── First Breath ──────────────────────────────────────────────────────────
def test_first_breath_unlocks_after_first_session(self):
uid = _uid()
sid = memory_store.create_session(uid)
_close_session(sid)
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["first_breath"]["unlocked"] is True
assert ach["first_breath"]["unlocked_at"] is not None
def test_first_breath_locked_with_only_open_session(self):
"""Open (unclosed) session does NOT unlock First Breath."""
uid = _uid()
memory_store.create_session(uid) # not closed
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["first_breath"]["unlocked"] is False
# ── First Thought / Eureka / Honest Mind ─────────────────────────────────
def test_first_thought_unlocks_on_first_hypothesis(self):
uid = _uid()
sid = memory_store.create_session(uid)
memory_store.add_hypothesis(uid, sid, "test hypothesis", confidence=0.7)
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["first_thought"]["unlocked"] is True
def test_eureka_locked_until_confirmed(self):
uid = _uid()
sid = memory_store.create_session(uid)
hid = memory_store.add_hypothesis(uid, sid, "will be confirmed")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["eureka"]["unlocked"] is False # still open
memory_store.resolve_hypothesis(hid, uid, "confirmed", "yes it was true")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["eureka"]["unlocked"] is True
def test_honest_mind_locked_until_refuted(self):
uid = _uid()
sid = memory_store.create_session(uid)
hid = memory_store.add_hypothesis(uid, sid, "will be refuted")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["honest_mind"]["unlocked"] is False
memory_store.resolve_hypothesis(hid, uid, "refuted", "nope, was wrong")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["honest_mind"]["unlocked"] is True
# ── Scholar ───────────────────────────────────────────────────────────────
def test_scholar_locks_below_25_facts(self):
uid = _uid()
for i in range(24):
memory_store.store_fact(uid, "test", f"fact number {i}")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["scholar"]["unlocked"] is False
assert "24" in ach["scholar"]["condition"]
def test_scholar_unlocks_at_25_facts(self):
uid = _uid()
for i in range(25):
memory_store.store_fact(uid, "test", f"fact number {i}")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["scholar"]["unlocked"] is True
assert ach["scholar"]["unlocked_at"] is not None
def test_deep_knowledge_unlocks_at_100_facts(self):
uid = _uid()
for i in range(100):
memory_store.store_fact(uid, "test", f"fact number {i}")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["deep_knowledge"]["unlocked"] is True
# Scholar should also be unlocked
assert ach["scholar"]["unlocked"] is True
# ── Scientist ─────────────────────────────────────────────────────────────
def test_scientist_unlocks_at_10_hypotheses(self):
uid = _uid()
sid = memory_store.create_session(uid)
for i in range(9):
memory_store.add_hypothesis(uid, sid, f"hypothesis {i}")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["scientist"]["unlocked"] is False
memory_store.add_hypothesis(uid, sid, "hypothesis 9")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["scientist"]["unlocked"] is True
assert ach["scientist"]["unlocked_at"] is not None
# ── Veteran ───────────────────────────────────────────────────────────────
def test_veteran_unlocks_at_50_sessions(self):
uid = _uid()
for _ in range(50):
sid = memory_store.create_session(uid)
_close_session(sid)
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["veteran"]["unlocked"] is True
assert ach["veteran"]["unlocked_at"] is not None
def test_veteran_locks_at_49_sessions(self):
uid = _uid()
for _ in range(49):
sid = memory_store.create_session(uid)
_close_session(sid)
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["veteran"]["unlocked"] is False
# ── On Fire ───────────────────────────────────────────────────────────────
def test_on_fire_locked_below_5_sessions_per_day(self):
uid = _uid()
for _ in range(4):
sid = memory_store.create_session(uid)
_close_session(sid)
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["on_fire"]["unlocked"] is False
def test_on_fire_unlocks_at_5_sessions_same_day(self):
uid = _uid()
for _ in range(5):
sid = memory_store.create_session(uid)
_close_session(sid)
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["on_fire"]["unlocked"] is True
# ── Storyteller ───────────────────────────────────────────────────────────
def test_storyteller_requires_20_tier2_sessions(self):
uid = _uid()
# 19 sessions with tier2
for _ in range(19):
sid = memory_store.create_session(uid)
_close_session(sid, has_tier2=True)
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["storyteller"]["unlocked"] is False
sid = memory_store.create_session(uid)
_close_session(sid, has_tier2=True)
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["storyteller"]["unlocked"] is True
# ── Speed Thinker ─────────────────────────────────────────────────────────
def test_speed_thinker_unlocks_same_day_confirm(self):
uid = _uid()
sid = memory_store.create_session(uid)
hid = memory_store.add_hypothesis(uid, sid, "quick thought")
memory_store.resolve_hypothesis(hid, uid, "confirmed", "yes!")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["speed_thinker"]["unlocked"] is True
# ── Token achievements ────────────────────────────────────────────────────
def test_frugal_mind_unlocks_on_first_token_save(self):
uid = _uid()
sid = memory_store.create_session(uid)
memory_store.log_token_save(sid, uid, "saved tokens by grep", 5000, "grep")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["frugal_mind"]["unlocked"] is True
assert ach["frugal_mind"]["unlocked_at"] is not None
def test_quarter_million_unlocks_at_250k(self):
uid = _uid()
sid = memory_store.create_session(uid)
memory_store.log_token_save(sid, uid, "big save", 250_000, "grep")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["quarter_million"]["unlocked"] is True
def test_token_millionaire_unlocks_at_1m(self):
uid = _uid()
sid = memory_store.create_session(uid)
memory_store.log_token_save(sid, uid, "huge save", 1_000_000, "memory_hit")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["token_millionaire"]["unlocked"] is True
def test_sniper_requires_single_save_over_500k(self):
uid = _uid()
sid = memory_store.create_session(uid)
# Multiple saves that total > 500k but none individual exceeds it
memory_store.log_token_save(sid, uid, "save 1", 300_000, "grep")
memory_store.log_token_save(sid, uid, "save 2", 300_000, "grep")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["sniper"]["unlocked"] is False # no single save > 500k
memory_store.log_token_save(sid, uid, "sniper shot", 600_000, "grep")
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["sniper"]["unlocked"] is True
# ── Birthday ──────────────────────────────────────────────────────────────
def test_birthday_locked_shows_countdown(self):
uid = _uid()
sid = memory_store.create_session(uid)
_close_session(sid)
ach = {a["id"]: a for a in compute_achievements(uid)}
bday = ach["birthday"]
assert bday["unlocked"] is False
assert bday["extra"] is not None
assert "In " in bday["extra"] or "day" in bday["extra"]
# ── Hardcoded achievements ─────────────────────────────────────────────────
def test_first_handshake_always_unlocked(self):
uid = _uid()
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["first_handshake"]["unlocked"] is True
assert ach["first_handshake"]["unlocked_at"] == "2026-03-31"
def test_shared_mind_always_locked(self):
uid = _uid()
ach = {a["id"]: a for a in compute_achievements(uid)}
assert ach["shared_mind"]["unlocked"] is False
# ── Achievement structure ─────────────────────────────────────────────────
def test_all_achievements_have_required_keys(self):
uid = _uid()
achievements = compute_achievements(uid)
for a in achievements:
assert "id" in a
assert "icon" in a
assert "name" in a
assert "description" in a
assert "unlocked" in a
assert "unlocked_at" in a
assert "condition" in a
def test_unlocked_achievement_has_no_extra_for_non_birthday(self):
"""Non-birthday unlocked achievements should not have 'extra' countdown text."""
uid = _uid()
sid = memory_store.create_session(uid)
_close_session(sid)
ach = {a["id"]: a for a in compute_achievements(uid)}
fb = ach["first_breath"]
assert fb["unlocked"] is True
assert fb.get("extra") is None
# ── build_profile_data integration ────────────────────────────────────────
def test_build_profile_data_includes_achievements(self):
uid = _uid()
data = build_profile_data(uid)
assert "achievements" in data
assert isinstance(data["achievements"], list)
assert len(data["achievements"]) > 0
def test_build_profile_data_achievement_count_correct(self):
uid = _uid()
# Add one session so first_breath and on_fire can unlock
sid = memory_store.create_session(uid)
_close_session(sid)
data = build_profile_data(uid)
unlocked = [a for a in data["achievements"] if a["unlocked"]]
# At minimum: first_breath + first_handshake = 2
assert len(unlocked) >= 2
+864
View File
@@ -0,0 +1,864 @@
"""Tests for BigMind MCP server tools (src/server.py).
All tests run against an isolated temp database (autouse fixture in conftest.py).
Server functions are called directly — no MCP transport layer needed for unit tests.
The module-level init_db() in server.py runs once at import time (harmless, idempotent).
All DATA operations are redirected to the temp DB by the monkeypatched BIGMIND_DB_PATH.
"""
import json
import re
import pytest
from datetime import datetime, timezone, timedelta
from pathlib import Path
from server import (
memory_start_session,
memory_end_session,
memory_flag_important,
memory_get_context,
memory_get_session_detail,
memory_search_chunks,
memory_list_sessions,
memory_store_fact,
memory_update_profile,
memory_append_chunk,
memory_get_stats,
memory_vacuum,
memory_get_instructions,
memory_health_check,
memory_export,
memory_deprecate_fact,
memory_add_hypothesis,
memory_resolve_hypothesis,
memory_list_hypotheses,
)
from bigmind import memory_store
from bigmind.db import db
# ── Helpers ────────────────────────────────────────────────────────────────────
def _start_and_get_id() -> str:
"""Call memory_start_session and return the session UUID."""
result = memory_start_session()
match = re.search(r"id: `([^`]+)`", result)
assert match, f"Could not extract session id from:\n{result}"
return match.group(1)
def _close(sid: str, headline: str = "Test session", topics: str = "test") -> str:
"""Convenience wrapper for memory_end_session."""
return memory_end_session(
session_id=sid,
one_liner=headline,
topics=topics,
outcome="Test outcome",
summary="Test summary narrative.",
)
# ── memory_start_session ───────────────────────────────────────────────────────
class TestMemoryStartSession:
def test_returns_started_confirmation(self, temp_db):
result = memory_start_session()
assert "BigMind session started" in result
def test_returns_valid_session_id(self, temp_db):
sid = _start_and_get_id()
# UUIDs are 36 chars with 4 hyphens
assert len(sid) == 36
assert sid.count("-") == 4
def test_returns_context_markdown(self, temp_db):
result = memory_start_session()
assert "🧠 BigMind Context" in result
def test_creates_open_session_in_db(self, temp_db):
user = memory_store.get_or_create_user("testuser")
before = memory_store.get_open_sessions(user["id"])
memory_start_session()
after = memory_store.get_open_sessions(user["id"])
assert len(after) == len(before) + 1
def test_multiple_starts_create_multiple_sessions(self, temp_db):
user = memory_store.get_or_create_user("testuser")
memory_start_session()
memory_start_session()
open_sessions = memory_store.get_open_sessions(user["id"])
assert len(open_sessions) == 2
# ── memory_end_session ─────────────────────────────────────────────────────────
class TestMemoryEndSession:
def test_returns_confirmation_with_headline(self, temp_db):
sid = _start_and_get_id()
result = _close(sid, headline="My important session")
assert "✅ Session closed" in result
assert "My important session" in result
def test_session_no_longer_open(self, temp_db):
user = memory_store.get_or_create_user("testuser")
sid = _start_and_get_id()
_close(sid)
open_sessions = memory_store.get_open_sessions(user["id"])
assert all(s["id"] != sid for s in open_sessions)
def test_session_appears_in_closed_list(self, temp_db):
sid = _start_and_get_id()
_close(sid, headline="Findable session")
result = memory_list_sessions()
assert "Findable session" in result
def test_tier2_summary_saved(self, temp_db):
sid = _start_and_get_id()
memory_end_session(
session_id=sid,
one_liner="Narrative session",
topics="test",
outcome="Done",
summary="The full story lives here.",
key_facts="- Key insight one",
code_refs="src/server.py",
)
detail = memory_store.get_session_detail(sid)
assert detail is not None
assert "full story" in detail["summary"]
assert "Key insight one" in detail["key_facts"]
assert "src/server.py" in detail["code_refs"]
def test_importance_stored_correctly(self, temp_db):
sid = _start_and_get_id()
memory_end_session(
session_id=sid, one_liner="High importance",
topics="test", outcome="Done", summary="Summary", importance=9,
)
with db() as conn:
row = conn.execute(
"SELECT importance FROM sessions WHERE id=?", (sid,)
).fetchone()
assert row["importance"] == 9
def test_topics_stored(self, temp_db):
sid = _start_and_get_id()
_close(sid, topics="bigmind,sqlite,memory")
result = memory_list_sessions()
assert "bigmind" in result
# ── memory_flag_important ──────────────────────────────────────────────────────
class TestMemoryFlagImportant:
def test_returns_tier3_confirmation(self, temp_db):
sid = _start_and_get_id()
result = memory_flag_important(
session_id=sid,
content="We decided to use SQLite",
flag_reason="architectural decision",
)
assert "✅ Stored as Tier-3 memory chunk" in result
def test_chunk_id_increments(self, temp_db):
sid = _start_and_get_id()
r1 = memory_flag_important(session_id=sid, content="First chunk")
r2 = memory_flag_important(session_id=sid, content="Second chunk")
id1 = int(re.search(r"id: (\d+)", r1).group(1))
id2 = int(re.search(r"id: (\d+)", r2).group(1))
assert id2 > id1
def test_chunk_is_searchable_after_flagging(self, temp_db):
sid = _start_and_get_id()
memory_flag_important(
session_id=sid,
content="PostgreSQL migration plan discussed",
)
result = memory_search_chunks("PostgreSQL")
assert "PostgreSQL" in result
def test_flag_reason_stored(self, temp_db):
sid = _start_and_get_id()
memory_flag_important(
session_id=sid,
content="Some content",
flag_reason="user preference",
)
user = memory_store.get_or_create_user("testuser")
chunks = memory_store.search_chunks(user["id"], "Some content")
assert chunks[0]["flag_reason"] == "user preference"
def test_default_role_is_assistant(self, temp_db):
sid = _start_and_get_id()
memory_flag_important(session_id=sid, content="Default role check")
user = memory_store.get_or_create_user("testuser")
chunks = memory_store.search_chunks(user["id"], "Default role check")
assert chunks[0]["role"] == "assistant"
def test_custom_role_stored(self, temp_db):
sid = _start_and_get_id()
memory_flag_important(session_id=sid, content="User said this", role="user")
user = memory_store.get_or_create_user("testuser")
chunks = memory_store.search_chunks(user["id"], "User said this")
assert chunks[0]["role"] == "user"
# ── memory_get_context ─────────────────────────────────────────────────────────
class TestMemoryGetContext:
def test_returns_markdown(self, temp_db):
result = memory_get_context()
assert "🧠 BigMind Context" in result
def test_does_not_create_new_session(self, temp_db):
user = memory_store.get_or_create_user("testuser")
before = memory_store.get_open_sessions(user["id"])
memory_get_context()
after = memory_store.get_open_sessions(user["id"])
assert len(before) == len(after)
def test_reflects_profile_changes(self, temp_db):
memory_update_profile(role="Staff Engineer")
result = memory_get_context()
assert "Staff Engineer" in result
def test_reflects_stored_facts(self, temp_db):
memory_store_fact(category="codebase", fact="Uses FastMCP for all servers")
result = memory_get_context()
assert "Uses FastMCP for all servers" in result
# ── memory_get_session_detail ──────────────────────────────────────────────────
class TestMemoryGetSessionDetail:
def test_returns_detail_for_session_with_summary(self, temp_db):
sid = _start_and_get_id()
memory_end_session(
session_id=sid, one_liner="Detailed one",
topics="test", outcome="Done",
summary="The complete story is stored here.",
)
result = memory_get_session_detail(sid)
assert "complete story" in result
def test_returns_key_facts_when_present(self, temp_db):
sid = _start_and_get_id()
memory_end_session(
session_id=sid, one_liner="With facts",
topics="test", outcome="Done", summary="Summary",
key_facts="- Decided on Python\n- Chose SQLite",
)
result = memory_get_session_detail(sid)
assert "Decided on Python" in result
def test_returns_code_refs_when_present(self, temp_db):
sid = _start_and_get_id()
memory_end_session(
session_id=sid, one_liner="With refs",
topics="test", outcome="Done", summary="Summary",
code_refs="bigmind/db.py, src/server.py",
)
result = memory_get_session_detail(sid)
assert "bigmind/db.py" in result
def test_returns_error_for_nonexistent_session(self, temp_db):
result = memory_get_session_detail("00000000-0000-0000-0000-000000000000")
assert "No detailed summary found" in result
def test_returns_error_for_session_without_tier2(self, temp_db):
user = memory_store.get_or_create_user("testuser")
sid = memory_store.create_session(user["id"])
result = memory_get_session_detail(sid)
assert "No detailed summary found" in result
# ── memory_search_chunks ───────────────────────────────────────────────────────
class TestMemorySearchChunks:
def test_returns_matching_results(self, temp_db):
sid = _start_and_get_id()
memory_flag_important(session_id=sid, content="Chose WAL mode for SQLite concurrency")
result = memory_search_chunks("WAL mode")
assert "WAL mode" in result
assert "Result 1" in result
def test_returns_no_results_message_when_empty(self, temp_db):
result = memory_search_chunks("xyzzy_term_that_will_never_exist_42")
assert "No memory chunks found" in result
def test_result_count_in_header(self, temp_db):
sid = _start_and_get_id()
memory_flag_important(session_id=sid, content="alpha beta gamma")
result = memory_search_chunks("alpha beta")
assert "1 result" in result
def test_respects_limit(self, temp_db):
sid = _start_and_get_id()
for i in range(5):
memory_flag_important(session_id=sid, content=f"unique chunk item {i}")
result = memory_search_chunks("unique chunk item", limit=2)
assert "2 results" in result
def test_isolated_to_current_user(self, temp_db):
"""Chunks from a different user must not appear in search results."""
user = memory_store.get_or_create_user("testuser")
other_user = memory_store.get_or_create_user("otheruser")
other_sid = memory_store.create_session(other_user["id"])
memory_store.append_chunk(other_sid, other_user["id"], "user", "secret other data")
result = memory_search_chunks("secret other data")
assert "No memory chunks found" in result
# ── memory_list_sessions ───────────────────────────────────────────────────────
class TestMemoryListSessions:
def test_lists_closed_sessions(self, temp_db):
sid = _start_and_get_id()
_close(sid, headline="Listed session headline")
result = memory_list_sessions()
assert "Listed session headline" in result
def test_no_sessions_message_when_empty(self, temp_db):
result = memory_list_sessions()
assert "No sessions found" in result
def test_open_sessions_not_in_list(self, temp_db):
_start_and_get_id() # open session — should NOT appear
result = memory_list_sessions()
assert "No sessions found" in result
def test_topics_filter_includes_match(self, temp_db):
sid = _start_and_get_id()
_close(sid, headline="Python backend work", topics="python,backend")
result = memory_list_sessions(topics_filter="python")
assert "Python backend work" in result
def test_topics_filter_excludes_non_match(self, temp_db):
sid1 = _start_and_get_id()
_close(sid1, headline="Python session", topics="python")
sid2 = _start_and_get_id()
_close(sid2, headline="Design session", topics="design,frontend")
result = memory_list_sessions(topics_filter="python")
assert "Python session" in result
assert "Design session" not in result
def test_topics_filter_no_match_returns_message(self, temp_db):
sid = _start_and_get_id()
_close(sid, topics="test")
result = memory_list_sessions(topics_filter="nonexistenttopic")
assert "No sessions found" in result
def test_tier2_indicator_shown(self, temp_db):
sid = _start_and_get_id()
memory_end_session(
session_id=sid, one_liner="With detail",
topics="test", outcome="Done", summary="Full narrative.",
)
result = memory_list_sessions()
assert "📄" in result
# ── memory_store_fact ──────────────────────────────────────────────────────────
class TestMemoryStoreFact:
def test_returns_confirmation(self, temp_db):
result = memory_store_fact(category="preference", fact="Prefers dark mode always")
assert "✅ Fact stored" in result
assert "preference" in result
assert "Prefers dark mode always" in result
def test_fact_appears_in_context(self, temp_db):
memory_store_fact(category="codebase", fact="All servers use FastMCP pattern")
result = memory_get_context()
assert "All servers use FastMCP pattern" in result
def test_category_shown_in_context(self, temp_db):
memory_store_fact(category="constraint", fact="Must support Python 3.12+")
result = memory_get_context()
assert "[constraint]" in result
def test_multiple_facts_all_shown(self, temp_db):
memory_store_fact(category="preference", fact="Uses uv for packaging")
memory_store_fact(category="decision", fact="Chose SQLite over DuckDB")
result = memory_get_context()
assert "Uses uv for packaging" in result
assert "Chose SQLite over DuckDB" in result
# ── memory_update_profile ──────────────────────────────────────────────────────
class TestMemoryUpdateProfile:
def test_returns_confirmation(self, temp_db):
result = memory_update_profile(role="Senior Engineer")
assert "✅ Identity profile updated" in result
def test_role_appears_in_context(self, temp_db):
memory_update_profile(role="Principal Engineer")
result = memory_get_context()
assert "Principal Engineer" in result
def test_preferences_appear_in_context(self, temp_db):
memory_update_profile(preferences="Python first, no unnecessary abstractions")
result = memory_get_context()
assert "Python first" in result
def test_pinned_facts_appear_in_context(self, temp_db):
memory_update_profile(pinned_facts="- Always uses uv\n- macOS only")
result = memory_get_context()
assert "Always uses uv" in result
def test_partial_update_preserves_existing_fields(self, temp_db):
memory_update_profile(role="Engineer", preferences="Concise code")
memory_update_profile(pinned_facts="- New pinned fact") # only update pinned_facts
result = memory_get_context()
assert "Engineer" in result # role preserved
assert "Concise code" in result # preferences preserved
assert "New pinned fact" in result # new field added
# ── memory_append_chunk ────────────────────────────────────────────────────────
class TestMemoryAppendChunk:
def test_returns_confirmation(self, temp_db):
sid = _start_and_get_id()
result = memory_append_chunk(session_id=sid, content="Manually saved content")
assert "✅ Chunk stored" in result
def test_chunk_is_searchable(self, temp_db):
sid = _start_and_get_id()
memory_append_chunk(session_id=sid, content="Manually appended important data")
result = memory_search_chunks("Manually appended")
assert "Manually appended" in result
def test_flag_reason_stored(self, temp_db):
sid = _start_and_get_id()
memory_append_chunk(
session_id=sid, content="Some important note",
flag_reason="manual save by user",
)
user = memory_store.get_or_create_user("testuser")
chunks = memory_store.search_chunks(user["id"], "important note")
assert chunks[0]["flag_reason"] == "manual save by user"
# ── memory_get_stats ───────────────────────────────────────────────────────────
class TestMemoryGetStats:
def test_returns_stats_markdown(self, temp_db):
result = memory_get_stats()
assert "📊 BigMind Stats" in result
assert "Sessions" in result
assert "Facts" in result
assert "Database size" in result
assert "Database path" in result
def test_session_count_is_accurate(self, temp_db):
sid = _start_and_get_id()
_close(sid)
result = memory_get_stats()
assert "| Sessions | 1 |" in result
def test_chunk_count_is_accurate(self, temp_db):
sid = _start_and_get_id()
memory_flag_important(session_id=sid, content="Counted chunk")
result = memory_get_stats()
assert "| Memory chunks (Tier 3) | 1 |" in result
def test_fact_count_is_accurate(self, temp_db):
memory_store_fact(category="test", fact="A counted fact")
result = memory_get_stats()
assert "| Facts | 1 |" in result
# ── memory_vacuum ──────────────────────────────────────────────────────────────
class TestMemoryVacuum:
def test_returns_confirmation(self, temp_db):
result = memory_vacuum(older_than_days=90)
assert "✅ Removed" in result
assert "chunk(s)" in result
def test_removes_old_chunks(self, temp_db):
user = memory_store.get_or_create_user("testuser")
sid = memory_store.create_session(user["id"])
old_date = (datetime.now(timezone.utc) - timedelta(days=100)).isoformat()
with db() as conn:
cur = conn.execute(
"""INSERT INTO conversation_chunks
(session_id, user_id, role, content, flag_reason, seq, created_at)
VALUES (?,?,?,?,?,?,?)""",
(sid, user["id"], "user", "old stale content", "old", 1, old_date),
)
chunk_id = cur.lastrowid
conn.execute(
"INSERT INTO conversation_chunks_fts(rowid, content, flag_reason) VALUES(?,?,?)",
(chunk_id, "old stale content", "old"),
)
result = memory_vacuum(older_than_days=90)
assert "Removed 1 chunk(s)" in result
def test_preserves_recent_chunks(self, temp_db):
sid = _start_and_get_id()
memory_flag_important(session_id=sid, content="Very recent important thing")
result = memory_vacuum(older_than_days=90)
assert "Removed 0 chunk(s)" in result
def test_old_chunk_not_searchable_after_vacuum(self, temp_db):
user = memory_store.get_or_create_user("testuser")
sid = memory_store.create_session(user["id"])
old_date = (datetime.now(timezone.utc) - timedelta(days=100)).isoformat()
with db() as conn:
cur = conn.execute(
"""INSERT INTO conversation_chunks
(session_id, user_id, role, content, flag_reason, seq, created_at)
VALUES (?,?,?,?,?,?,?)""",
(sid, user["id"], "user", "ancient secret data", "old", 1, old_date),
)
chunk_id = cur.lastrowid
conn.execute(
"INSERT INTO conversation_chunks_fts(rowid, content, flag_reason) VALUES(?,?,?)",
(chunk_id, "ancient secret data", "old"),
)
memory_vacuum(older_than_days=90)
result = memory_search_chunks("ancient secret data")
assert "No memory chunks found" in result
# ── memory_get_instructions ────────────────────────────────────────────────────
class TestMemoryGetInstructions:
def test_returns_string(self, temp_db):
result = memory_get_instructions()
assert isinstance(result, str)
assert len(result) > 0
def test_contains_start_session_directive(self, temp_db):
result = memory_get_instructions()
assert "memory_start_session" in result
def test_contains_end_session_directive(self, temp_db):
result = memory_get_instructions()
assert "memory_end_session" in result
def test_contains_mandatory_language(self, temp_db):
result = memory_get_instructions()
assert "ALWAYS" in result
# ── memory_health_check ────────────────────────────────────────────────────────
class TestMemoryHealthCheck:
def test_returns_health_report_header(self, temp_db):
result = memory_health_check()
assert "🩺 BigMind Health Check" in result
def test_fts_in_sync_shown_on_clean_db(self, temp_db):
result = memory_health_check()
assert "FTS index" in result
assert "" in result
def test_no_warnings_on_clean_db(self, temp_db):
result = memory_health_check()
assert "⚠️" not in result
def test_flags_stale_facts(self, temp_db):
user = memory_store.get_or_create_user("testuser")
fid = memory_store.store_fact(user["id"], "codebase", "Stale old technology note")
old_date = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
with db() as conn:
conn.execute("UPDATE facts SET updated_at=? WHERE id=?", (old_date, fid))
result = memory_health_check(stale_days=30)
assert "Stale facts" in result
assert "Stale old technology note" in result
def test_fresh_facts_not_flagged_as_stale(self, temp_db):
memory_store_fact(category="preference", fact="Very fresh preference")
result = memory_health_check(stale_days=30)
assert "Stale facts: 0" not in result
# The "✅ Facts freshness" line should appear
assert "Facts freshness" in result
def test_flags_sessions_without_summary(self, temp_db):
user = memory_store.get_or_create_user("testuser")
sid = memory_store.create_session(user["id"])
memory_store.close_session(sid, "Session with no narrative")
result = memory_health_check()
assert "Sessions without Tier-2 summary" in result
def test_no_warning_when_all_sessions_have_summary(self, temp_db):
sid = _start_and_get_id()
_close(sid) # _close calls memory_end_session which saves a Tier-2 summary
result = memory_health_check()
assert "Sessions without Tier-2 summary" not in result
def test_flags_low_confidence_facts(self, temp_db):
user = memory_store.get_or_create_user("testuser")
memory_store.store_fact(user["id"], "codebase", "Uncertain technology choice", confidence=0.4)
result = memory_health_check()
assert "Low-confidence facts" in result
assert "Uncertain technology choice" in result
def test_open_sessions_listed(self, temp_db):
_start_and_get_id() # creates an open session
result = memory_health_check()
assert "Open sessions" in result
def test_default_stale_days_is_30(self, temp_db):
result = memory_health_check()
# Either "30 days" in the stale line or the clean version
assert "30" in result
# ── memory_export ──────────────────────────────────────────────────────────────
class TestMemoryExport:
def test_returns_confirmation(self, temp_db, tmp_path):
out = str(tmp_path / "test_export.json")
result = memory_export(output_path=out)
assert "" in result
assert "BigMind memory exported" in result
def test_shows_file_path_in_result(self, temp_db, tmp_path):
out = str(tmp_path / "test_export.json")
result = memory_export(output_path=out)
assert out in result
def test_file_created_on_disk(self, temp_db, tmp_path):
out = str(tmp_path / "test_export.json")
memory_export(output_path=out)
assert Path(out).exists()
def test_result_contains_count_rows(self, temp_db, tmp_path):
out = str(tmp_path / "test_export.json")
result = memory_export(output_path=out)
assert "| **Facts** |" in result
assert "| **Sessions** |" in result
assert "| **Chunks (Tier 3)** |" in result
assert "| **File size** |" in result
def test_exports_facts(self, temp_db, tmp_path):
memory_store_fact(category="preference", fact="Exported preference via tool")
out = str(tmp_path / "test_export.json")
memory_export(output_path=out)
data = json.loads(Path(out).read_text())
assert data["stats"]["facts_count"] == 1
assert any("Exported preference via tool" in f["fact"] for f in data["facts"])
def test_exports_sessions_and_summaries(self, temp_db, tmp_path):
sid = _start_and_get_id()
_close(sid, headline="Session for export")
out = str(tmp_path / "test_export.json")
memory_export(output_path=out)
data = json.loads(Path(out).read_text())
assert data["stats"]["sessions_count"] >= 1
matches = [s for s in data["sessions"] if s.get("one_liner") == "Session for export"]
assert len(matches) == 1
def test_exports_chunks(self, temp_db, tmp_path):
sid = _start_and_get_id()
memory_flag_important(session_id=sid, content="Chunk to be exported")
out = str(tmp_path / "test_export.json")
memory_export(output_path=out)
data = json.loads(Path(out).read_text())
assert data["stats"]["chunks_count"] == 1
assert any("Chunk to be exported" in c["content"] for c in data["conversation_chunks"])
def test_valid_json_output(self, temp_db, tmp_path):
out = str(tmp_path / "test_export.json")
memory_export(output_path=out)
# Should not raise
data = json.loads(Path(out).read_text())
assert isinstance(data, dict)
def test_export_date_present(self, temp_db, tmp_path):
out = str(tmp_path / "test_export.json")
memory_export(output_path=out)
data = json.loads(Path(out).read_text())
assert "export_date" in data
assert data["bigmind_version"] == "1.0"
# ── memory_deprecate_fact ──────────────────────────────────────────────────────
class TestMemoryDeprecateFact:
def test_returns_confirmation(self, temp_db):
result_store = memory_store_fact(category="codebase", fact="Old stack fact")
fid = int(re.search(r"id: (\d+)", result_store).group(1))
result = memory_deprecate_fact(fact_id=fid, reason="Technology replaced")
assert "" in result
assert str(fid) in result
def test_reason_shown_in_confirmation(self, temp_db):
result_store = memory_store_fact(category="decision", fact="Used Gradle")
fid = int(re.search(r"id: (\d+)", result_store).group(1))
result = memory_deprecate_fact(fact_id=fid, reason="Switched to Maven")
assert "Switched to Maven" in result
def test_no_reason_still_succeeds(self, temp_db):
result_store = memory_store_fact(category="preference", fact="Some old pref")
fid = int(re.search(r"id: (\d+)", result_store).group(1))
result = memory_deprecate_fact(fact_id=fid)
assert "" in result
def test_deprecated_fact_absent_from_context(self, temp_db):
result_store = memory_store_fact(category="codebase", fact="Outdated deployment detail")
fid = int(re.search(r"id: (\d+)", result_store).group(1))
# Confirm it's in context before deprecation
assert "Outdated deployment detail" in memory_get_context()
# Deprecate it
memory_deprecate_fact(fact_id=fid, reason="No longer true")
# Must be gone from context
assert "Outdated deployment detail" not in memory_get_context()
def test_other_facts_unaffected_by_deprecation(self, temp_db):
r1 = memory_store_fact(category="preference", fact="Keep this preference")
r2 = memory_store_fact(category="codebase", fact="Drop this codebase note")
fid_drop = int(re.search(r"id: (\d+)", r2).group(1))
memory_deprecate_fact(fact_id=fid_drop)
ctx = memory_get_context()
assert "Keep this preference" in ctx
assert "Drop this codebase note" not in ctx
def test_nonexistent_fact_returns_error(self, temp_db):
result = memory_deprecate_fact(fact_id=99999)
assert "" in result
assert "99999" in result
def test_health_check_does_not_flag_deprecated_facts_as_stale(self, temp_db):
"""Deprecated facts should not surface as actionable stale warnings."""
user = memory_store.get_or_create_user("testuser")
fid = memory_store.store_fact(user["id"], "codebase", "Will be deprecated")
from datetime import timedelta
old_date = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
from bigmind.db import db as _db
with _db() as conn:
conn.execute("UPDATE facts SET updated_at=? WHERE id=?", (old_date, fid))
memory_deprecate_fact(fact_id=fid, reason="Removed intentionally")
result = memory_health_check(stale_days=30)
assert "Will be deprecated" not in result
# ── memory_add_hypothesis / resolve / list ─────────────────────────────────────
class TestMemoryHypotheses:
def test_add_returns_confirmation(self, temp_db):
sid = _start_and_get_id()
result = memory_add_hypothesis(
session_id=sid,
hypothesis="I believe the issue is in the connection pool",
)
assert "💭 Hypothesis recorded" in result
assert "connection pool" in result
def test_add_shows_confidence_percent(self, temp_db):
sid = _start_and_get_id()
result = memory_add_hypothesis(
session_id=sid,
hypothesis="High confidence belief",
confidence=0.9,
)
assert "90%" in result
def test_add_returns_id(self, temp_db):
sid = _start_and_get_id()
result = memory_add_hypothesis(session_id=sid, hypothesis="Some thought")
assert re.search(r"id: \d+", result)
def test_list_empty_returns_message(self, temp_db):
result = memory_list_hypotheses()
assert "No hypotheses found" in result
def test_list_shows_open_hypotheses(self, temp_db):
sid = _start_and_get_id()
memory_add_hypothesis(session_id=sid, hypothesis="The cache is stale")
result = memory_list_hypotheses()
assert "The cache is stale" in result
assert "💭" in result
def test_list_filter_by_status(self, temp_db):
sid = _start_and_get_id()
memory_add_hypothesis(session_id=sid, hypothesis="Will be confirmed soon")
memory_add_hypothesis(session_id=sid, hypothesis="Will stay open")
user = memory_store.get_or_create_user("testuser")
hyps = memory_store.list_hypotheses(user["id"])
hid = next(h["id"] for h in hyps if "confirmed" in h["hypothesis"])
memory_resolve_hypothesis(hypothesis_id=hid, status="confirmed", resolution="Yes it was true")
open_result = memory_list_hypotheses(status="open")
confirmed_result = memory_list_hypotheses(status="confirmed")
assert "Will stay open" in open_result
assert "Will be confirmed soon" not in open_result
assert "Will be confirmed soon" in confirmed_result
def test_resolve_confirmed_shows_checkmark(self, temp_db):
sid = _start_and_get_id()
result_add = memory_add_hypothesis(session_id=sid, hypothesis="Bug in serializer")
hid = int(re.search(r"id: (\d+)", result_add).group(1))
result = memory_resolve_hypothesis(
hypothesis_id=hid,
status="confirmed",
resolution="Confirmed — null check was missing in BVV serializer",
)
assert "" in result
assert "confirmed" in result
def test_resolve_refuted_shows_cross(self, temp_db):
sid = _start_and_get_id()
result_add = memory_add_hypothesis(session_id=sid, hypothesis="Network latency")
hid = int(re.search(r"id: (\d+)", result_add).group(1))
result = memory_resolve_hypothesis(
hypothesis_id=hid,
status="refuted",
resolution="Was a race condition, not network",
)
assert "" in result
assert "refuted" in result
def test_resolve_abandoned_shows_icon(self, temp_db):
sid = _start_and_get_id()
result_add = memory_add_hypothesis(session_id=sid, hypothesis="Might be cache")
hid = int(re.search(r"id: (\d+)", result_add).group(1))
result = memory_resolve_hypothesis(hypothesis_id=hid, status="abandoned")
assert "🚫" in result
assert "abandoned" in result
def test_resolve_shows_resolution_text(self, temp_db):
sid = _start_and_get_id()
result_add = memory_add_hypothesis(session_id=sid, hypothesis="A theory")
hid = int(re.search(r"id: (\d+)", result_add).group(1))
result = memory_resolve_hypothesis(
hypothesis_id=hid, status="confirmed", resolution="The theory held up"
)
assert "The theory held up" in result
def test_resolve_invalid_status_returns_error(self, temp_db):
sid = _start_and_get_id()
result_add = memory_add_hypothesis(session_id=sid, hypothesis="Some belief")
hid = int(re.search(r"id: (\d+)", result_add).group(1))
result = memory_resolve_hypothesis(hypothesis_id=hid, status="maybe")
assert "" in result
def test_resolve_nonexistent_id_returns_error(self, temp_db):
result = memory_resolve_hypothesis(hypothesis_id=99999, status="confirmed")
assert "" in result
assert "99999" in result
def test_list_shows_resolution_when_resolved(self, temp_db):
sid = _start_and_get_id()
result_add = memory_add_hypothesis(session_id=sid, hypothesis="Root cause is threading")
hid = int(re.search(r"id: (\d+)", result_add).group(1))
memory_resolve_hypothesis(
hypothesis_id=hid, status="confirmed",
resolution="Thread-local storage was the culprit"
)
result = memory_list_hypotheses()
assert "Thread-local storage was the culprit" in result
def test_list_status_filter_no_match_returns_message(self, temp_db):
sid = _start_and_get_id()
memory_add_hypothesis(session_id=sid, hypothesis="Open thought")
result = memory_list_hypotheses(status="confirmed")
assert "No hypotheses found" in result