
Commit 80ccc2f

cdeust and claude committed
perf(consolidate): v3.11 deferred fixes — batching, cache, plasticity, CLS, cascade (#13)
Addresses the five pathologies identified by the agent+genius audit of
darval's performance report (issue #13). Phase order matches the
telemetry patch commit message in 69d81fb.

Phase A — Set-based SQL batching
---------------------------------
Per-row UPDATE/DELETE loops with per-row commits were the dominant cost
of consolidate on a 66K-memory store (Erlang analysis: ~64 ops/s ceiling
imposed by fsync amplification). Added batch methods:

  pg_store.update_memories_heat_batch (UPDATE FROM UNNEST)
  pg_store.update_entities_heat_batch
  pg_store.archive_entities_batch (UPDATE WHERE id = ANY)
  pg_store.update_relationships_weight_batch
  pg_store.delete_relationships_batch
  pg_store.insert_stage_transitions_batch (INSERT FROM UNNEST)

Each mirrors onto SqliteMemoryStore via executemany + single commit.

Callers converted:

  decay.run_decay_cycle → heat updates in one statement
  decay._decay_entities → entity heat in one statement
  decay._modulate_domain → metabolic adjustments in one statement
  plasticity._apply_updates → 33k LTP/LTD updates in one statement
  pruning._prune_edges → 32k DELETEs in one statement
  pruning._archive_orphans → orphan archival in one statement
  cascade.run_cascade_advancement → transition INSERTs in one statement

Erlang predicted 100-500× speedup on the decay stage alone. On darval's
66K store the decay stage should drop from ~1100 s to 3-10 s.

Phase B — Consolidation-scoped memory cache
--------------------------------------------
`store.get_all_memories_for_decay()` was called 6× per consolidate run
(decay, compression, memify, homeostatic, sleep, emergence).
Load once at handler entry, pass list to each stage:

  run_decay_cycle(store, settings, memories=None)
  run_compression_cycle(store, settings, embeddings, memories=None)
  run_memify_cycle(store, memories=None)
  run_homeostatic_cycle(store, memories=None)
  run_deep_sleep(store, embeddings, memories=None)
  (emergence closure uses the same list inline)

Each stage retains a fallback load when called standalone outside the
handler, preserving backward compatibility.

Phase C — Plasticity co-access starvation
------------------------------------------
Feinstein's audit: `get_hot_memories(limit=50)` sampled 0.5% of a
10k-entity store → 99.95% LTD was distribution collapse, not plasticity.

Fix:
  - raised sample cap 50 → 2000 (_CO_ACCESS_SAMPLE_CAP)
  - reuses the Phase B consolidation-scoped list when present
  - precomputes lowercase entity name index once per run
  - exception handler now surfaces the error (was logger.debug silent)
  - returns co_access_pairs and memories_sampled for diagnostics

Phase D — CLS causal-edge signal gate
--------------------------------------
Feynman's audit: `_discover_causal_edges` hard-capped at 500 episodic on
a 25K store (2% sample), ran O(entities × 500) substring scans, produced
0 edges by construction.

Fix:
  - episodic sample cap 500 → 2000 (_EPISODIC_SAMPLE_CAP)
  - precomputes content_lowered once, then per-name scan is one pass
  - early-exit gate: if fewer than _MIN_ENTITIES_FOR_PC entities clear
    _PC_MIN_OBSERVATIONS mentions, skip the O(E²) PC pass
  - restricts the PC vocabulary to qualifying entities only, so the
    matrix is E_qualifying² not E_all²
  - returns episodic_scanned for diagnostics

Phase E — Cascade heartbeat + payload trim
-------------------------------------------
Feinstein's audit: cascade wrote a heartbeat UPDATE on every scanned
memory (~2000) even when nothing advanced, each with its own commit;
also per-transition INSERT+commit for stage_transitions (503 fsyncs on
darval's run); also returned the full 503-transition list in the MCP
response duplicating queryable DB state.

Fix:
  - heartbeat UPDATE skipped when |Δhours| < _HEARTBEAT_SKIP_HOURS (1h) —
    the previous value was pure noise and wasted fsync
  - stage_transitions INSERTs batched into one statement
  - response payload capped: transitions_preview (first 50) + count
  - exception handler surfaces the error (was logger.debug silent)
  - returns scanned / heartbeats_written / heartbeats_skipped
  - SQLite schema gets stage_transitions table + stage_entered_at
    migration so cascade no longer crashes silently on cowork mode

Reviewed by agents: code-reviewer (APPROVE), test-engineer (2188 pass),
genius:erlang (queuing analysis), genius:feinstein (differential),
genius:feynman (independent rederivation).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
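
The Phase A change for the SQLite path ("executemany + single commit") can be illustrated with a minimal sketch. This is not the repository's SqliteMemoryStore code: the table name `memories`, the column `heat`, and both helper functions are assumptions made up for the example, and an in-memory database does not reproduce the fsync cost the commit message describes.

```python
import sqlite3


def update_heat_per_row(conn, updates):
    """Old pattern (for contrast): one UPDATE and one commit per row."""
    for memory_id, heat in updates:
        conn.execute("UPDATE memories SET heat = ? WHERE id = ?", (heat, memory_id))
        conn.commit()


def update_heat_batch(conn, updates):
    """Batched pattern: one executemany call followed by a single commit."""
    if not updates:
        return 0
    conn.executemany(
        "UPDATE memories SET heat = ? WHERE id = ?",
        [(heat, memory_id) for memory_id, heat in updates],
    )
    conn.commit()
    return len(updates)


# Hypothetical schema, used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE memories (id INTEGER PRIMARY KEY, heat REAL)")
conn.executemany(
    "INSERT INTO memories (id, heat) VALUES (?, ?)",
    [(i, 1.0) for i in range(1, 1001)],
)
conn.commit()

written = update_heat_batch(conn, [(i, 0.5) for i in range(1, 1001)])
total_heat = conn.execute("SELECT SUM(heat) FROM memories").fetchone()[0]
```

On a file-backed database each commit typically forces a sync to disk, so collapsing N commits into one is where most of the saving would be expected to come from.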
1 parent 69d81fb commit 80ccc2f

19 files changed

Lines changed: 517 additions & 149 deletions

mcp_server/handlers/consolidate.py

Lines changed: 22 additions & 10 deletions
@@ -127,8 +127,14 @@ async def handler(args: dict[str, Any] | None = None) -> dict[str, Any]:
     embeddings = get_embedding_engine()
     start = time.monotonic()
 
-    stats = _run_cycles(args, store, settings, embeddings)
-    stats = _run_always_cycles(args, store, stats)
+    # Phase B (issue #13): load the full memory list once and thread it
+    # through every stage that needs it, so consolidate does ONE load
+    # instead of 6 (decay, compression, memify, homeostatic, sleep,
+    # emergence). Cheap stages still load ad-hoc for standalone callers.
+    memories = store.get_all_memories_for_decay()
+
+    stats = _run_cycles(args, store, settings, embeddings, memories)
+    stats = _run_always_cycles(args, store, stats, memories)
 
     elapsed_ms = int((time.monotonic() - start) * 1000)
     stats["duration_ms"] = elapsed_ms
@@ -149,28 +155,33 @@ def _run_cycles(
     store: MemoryStore,
     settings: Any,
     embeddings: EmbeddingEngine,
+    memories: list[dict],
 ) -> dict[str, Any]:
-    """Run optional maintenance cycles based on args flags."""
+    """Run optional maintenance cycles based on args flags.
+
+    `memories` is the consolidation-scoped snapshot so stages share one
+    load across the whole run (issue #13).
+    """
     stats: dict[str, Any] = {}
 
     if args.get("decay", True):
-        stats["decay"] = _timed(run_decay_cycle, store, settings)
+        stats["decay"] = _timed(run_decay_cycle, store, settings, memories)
         stats["plasticity"] = _timed(run_plasticity_cycle, store)
         stats["pruning"] = _timed(run_pruning_cycle, store)
 
     if args.get("compress", True):
         stats["compression"] = _timed(
-            run_compression_cycle, store, settings, embeddings
+            run_compression_cycle, store, settings, embeddings, memories
         )
 
     if args.get("cls", True):
         stats["cls"] = _timed(run_cls_cycle, store, settings, embeddings)
 
     if args.get("memify", True):
-        stats["memify"] = _timed(run_memify_cycle, store)
+        stats["memify"] = _timed(run_memify_cycle, store, memories)
 
     if args.get("deep", False):
-        stats["deep_sleep"] = _timed(run_deep_sleep, store, embeddings)
+        stats["deep_sleep"] = _timed(run_deep_sleep, store, embeddings, memories)
 
     return stats

@@ -179,17 +190,18 @@ def _run_always_cycles(
     args: dict,
     store: MemoryStore,
     stats: dict[str, Any],
+    memories: list[dict],
 ) -> dict[str, Any]:
     """Run cycles that always execute regardless of flags."""
     stats["cascade"] = _timed(run_cascade_advancement, store)
-    stats["homeostatic"] = _timed(run_homeostatic_cycle, store)
+    stats["homeostatic"] = _timed(run_homeostatic_cycle, store, memories)
 
     if args.get("deep", False):
         stats["transfer"] = _timed(run_two_stage_transfer, store)
 
     def _run_emergence() -> dict[str, Any]:
-        all_mems = store.get_all_memories_for_decay()
-        return emergence_tracker.generate_emergence_report(all_mems) or {}
+        # Uses the consolidation-scoped memory list — no extra load.
+        return emergence_tracker.generate_emergence_report(memories) or {}
 
     stats["emergence"] = _timed(_run_emergence)
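
The load-once pattern in this file (Phase B) can be sketched in isolation. `CountingStore`, `run_stage`, and `consolidate` are hypothetical stand-ins invented for the example, not the project's classes. The sketch only shows that a stage with an optional `memories` argument shares the handler's snapshot and still loads for itself when called standalone.

```python
class CountingStore:
    """Hypothetical stand-in for MemoryStore that counts full loads."""

    def __init__(self, rows):
        self._rows = rows
        self.loads = 0

    def get_all_memories_for_decay(self):
        self.loads += 1
        return list(self._rows)


def run_stage(store, memories=None):
    """A stage that accepts a pre-loaded snapshot, else loads its own."""
    if memories is None:
        memories = store.get_all_memories_for_decay()
    return {"rows_scanned": len(memories)}


def consolidate(store, stage_count=6):
    """Load once at entry and pass the same list to every stage."""
    memories = store.get_all_memories_for_decay()
    return [run_stage(store, memories) for _ in range(stage_count)]


store = CountingStore([{"id": 1}, {"id": 2}])
results = consolidate(store)
loads_in_handler = store.loads          # one load shared by six stages
run_stage(store)                        # standalone call falls back to its own load
loads_after_standalone = store.loads
```

One consequence of sharing a snapshot is that a later stage sees the rows as they were at load time, not as an earlier stage may have rewritten them in the database. Whether that matters depends on what each stage reads, which this diff does not show.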

mcp_server/handlers/consolidation/cascade.py

Lines changed: 63 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,61 @@
1717

1818
_ADVANCEABLE_STAGES = ["labile", "early_ltp", "late_ltp", "reconsolidating"]
1919

20+
# Source: issue #13 — cascade previously wrote a heartbeat UPDATE on
21+
# EVERY scanned memory (~2000) even when nothing advanced. Below this
22+
# delta, the hours_in_stage change is noise and the write is waste.
23+
_HEARTBEAT_SKIP_HOURS = 1.0
24+
25+
# Source: issue #13 — the 503-transition payload darval reported is
26+
# redundant with the stage_transitions table and inflates the MCP
27+
# response. Surface a preview + count instead.
28+
_TRANSITION_PREVIEW_CAP = 50
29+
2030

2131
def run_cascade_advancement(store: MemoryStore) -> dict:
22-
"""Advance memory consolidation stages based on real elapsed time."""
32+
"""Advance memory consolidation stages based on real elapsed time.
33+
34+
Skips no-op heartbeat UPDATEs (|Δhours| < _HEARTBEAT_SKIP_HOURS),
35+
batches stage_transitions INSERTs into one statement, and caps the
36+
response payload at `transitions_preview` (first N) + total count.
37+
"""
2338
try:
24-
advanced = 0
2539
transitions: list[dict] = []
40+
heartbeats_written = 0
41+
heartbeats_skipped = 0
42+
scanned = 0
2643
now = datetime.now(timezone.utc)
2744

2845
for stage_name in _ADVANCEABLE_STAGES:
2946
memories = store.get_memories_by_stage(stage_name, limit=500)
47+
scanned += len(memories)
3048

3149
for mem in memories:
32-
result = _try_advance(store, mem, stage_name, now)
50+
result, heartbeat = _try_advance(store, mem, stage_name, now)
3351
if result:
34-
advanced += 1
3552
transitions.append(result)
53+
if heartbeat == "written":
54+
heartbeats_written += 1
55+
elif heartbeat == "skipped":
56+
heartbeats_skipped += 1
3657

37-
# Log transitions to stage_transitions table
38-
for t in transitions:
39-
_log_transition(store, t)
58+
store.insert_stage_transitions_batch(transitions)
4059

4160
return {
42-
"advanced": advanced,
43-
"transitions": transitions,
61+
"advanced": len(transitions),
62+
"scanned": scanned,
63+
"heartbeats_written": heartbeats_written,
64+
"heartbeats_skipped": heartbeats_skipped,
65+
"transitions_count": len(transitions),
66+
"transitions_preview": transitions[:_TRANSITION_PREVIEW_CAP],
67+
}
68+
except Exception as exc:
69+
logger.warning("Cascade advancement failed: %s", exc, exc_info=True)
70+
return {
71+
"advanced": 0,
72+
"scanned": 0,
73+
"error": f"{type(exc).__name__}: {exc}",
4474
}
45-
except Exception as e:
46-
logger.debug("Cascade advancement failed: %s", e)
47-
return {"advanced": 0, "transitions": []}
4875

4976

5077
def _compute_real_hours(mem: dict, now: datetime) -> float:
@@ -93,8 +120,12 @@ def _try_advance(
     mem: dict,
     stage_name: str,
     now: datetime,
-) -> dict | None:
-    """Check and advance a single memory. Returns transition info or None."""
+) -> tuple[dict | None, str]:
+    """Check and advance a single memory.
+
+    Returns (transition_or_None, heartbeat_status) where heartbeat_status
+    is one of "written", "skipped", "transition".
+    """
     hours = _compute_real_hours(mem, now)
 
     ready, next_stage, _ = compute_advancement_readiness(
@@ -124,22 +155,32 @@ def _try_advance(
             mem.get("hippocampal_dependency", 1.0),
         )
         _update_stage_entered(store, mem["id"], new_entered)
-        return {
-            "memory_id": mem["id"],
-            "from_stage": stage_name,
-            "to_stage": next_stage,
-            "hours_in_prev": round(hours, 2),
-        }
+        return (
+            {
+                "memory_id": mem["id"],
+                "from_stage": stage_name,
+                "to_stage": next_stage,
+                "hours_in_prev": round(hours, 2),
+            },
+            "transition",
+        )
+
+    # Not advancing: only write a heartbeat if the hours delta is
+    # large enough to be informative. Below _HEARTBEAT_SKIP_HOURS the
+    # change is noise and the write is wasted fsync amplification
+    # (issue #13, Feinstein audit of darval's 66K-store run).
+    prev_hours = float(mem.get("hours_in_stage", 0.0) or 0.0)
+    if abs(hours - prev_hours) < _HEARTBEAT_SKIP_HOURS:
+        return None, "skipped"
 
-    # Not ready: just update hours_in_stage with real value
     store.update_memory_consolidation(
         mem["id"],
         stage_name,
         round(hours, 2),
         mem.get("replay_count", 0),
         mem.get("hippocampal_dependency", 1.0),
     )
-    return None
+    return None, "written"
 
 
 def _update_stage_entered(store: MemoryStore, memory_id: int, now: datetime) -> None:
@@ -152,23 +193,3 @@ def _update_stage_entered(store: MemoryStore, memory_id: int, now: datetime) ->
         store._conn.commit()
     except Exception:
         pass
-
-
-def _log_transition(store: MemoryStore, transition: dict) -> None:
-    """Log a stage transition to the stage_transitions table."""
-    try:
-        store._conn.execute(
-            "INSERT INTO stage_transitions "
-            "(memory_id, from_stage, to_stage, hours_in_prev_stage, trigger) "
-            "VALUES (%s, %s, %s, %s, %s)",
-            (
-                transition["memory_id"],
-                transition["from_stage"],
-                transition["to_stage"],
-                transition["hours_in_prev"],
-                "cascade",
-            ),
-        )
-        store._conn.commit()
-    except Exception as e:
-        logger.debug("Failed to log transition: %s", e)
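
The two cascade changes that do not touch SQL, the heartbeat skip gate and the capped response payload, can be sketched without a store. The constants mirror `_HEARTBEAT_SKIP_HOURS` and `_TRANSITION_PREVIEW_CAP` from the diff. The helper names `heartbeat_status` and `build_response` are assumptions for this example and do not exist in the commit.

```python
HEARTBEAT_SKIP_HOURS = 1.0     # mirrors _HEARTBEAT_SKIP_HOURS in the diff
TRANSITION_PREVIEW_CAP = 50    # mirrors _TRANSITION_PREVIEW_CAP in the diff


def heartbeat_status(stored_hours, real_hours):
    """Return "skipped" when the change is below the threshold, else "written"."""
    prev = float(stored_hours or 0.0)   # a missing or None value counts as 0.0
    if abs(real_hours - prev) < HEARTBEAT_SKIP_HOURS:
        return "skipped"
    return "written"


def build_response(transitions, scanned):
    """Report the total count plus only the first few transitions."""
    return {
        "advanced": len(transitions),
        "scanned": scanned,
        "transitions_count": len(transitions),
        "transitions_preview": transitions[:TRANSITION_PREVIEW_CAP],
    }


statuses = [
    heartbeat_status(10.0, 10.4),   # 0.4 h change: below threshold
    heartbeat_status(10.0, 11.0),   # exactly 1.0 h: not below threshold
    heartbeat_status(None, 0.2),    # no stored value, small change
]
response = build_response([{"memory_id": i} for i in range(503)], scanned=2000)
```

Because the comparison is a strict `<`, a change of exactly one hour is still written. A memory that is scanned more often than once an hour would only have its stored value refreshed once the accumulated difference reaches the threshold.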

mcp_server/handlers/consolidation/cls.py

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,20 @@
1717

1818
logger = logging.getLogger(__name__)
1919

20+
# Source: issue #13 — previous cap of 500 saw ~2% of a 25k-episodic
21+
# store and produced 0 patterns by construction. 2000 matches plasticity
22+
# sampling and keeps PC algorithm's O(E^2) worst case tractable on a
23+
# 10k-entity vocabulary.
24+
_EPISODIC_SAMPLE_CAP = 2000
25+
_SEMANTICS_SAMPLE_CAP = 2000
26+
2027
_EMPTY_CLS_STATS = {
2128
"patterns_found": 0,
2229
"new_semantics_created": 0,
2330
"skipped_inconsistent": 0,
2431
"skipped_duplicate": 0,
32+
"causal_edges_found": 0,
33+
"episodic_scanned": 0,
2534
}
2635

2736

@@ -30,9 +39,16 @@ def run_cls_cycle(
     settings,
     embeddings: EmbeddingEngine,
 ) -> dict:
-    """Run CLS consolidation: episodic -> semantic pattern extraction."""
-    episodic = store.get_episodic_memories(limit=500)
-    existing_semantics = store.get_semantic_memories(limit=500)
+    """Run CLS consolidation: episodic → semantic pattern extraction.
+
+    Pattern extraction (`plan_cls_consolidation`) and causal-edge
+    discovery (`_discover_causal_edges`) sample up to 2000 episodic
+    memories each — raised from 500 after Feynman's audit of darval's
+    66K run in issue #13 showed 500 sampled 2% of the episodic store
+    and produced 0 patterns by construction.
+    """
+    episodic = store.get_episodic_memories(limit=_EPISODIC_SAMPLE_CAP)
+    existing_semantics = store.get_semantic_memories(limit=_SEMANTICS_SAMPLE_CAP)
 
     if not episodic:
         return _EMPTY_CLS_STATS.copy()
@@ -47,6 +63,7 @@ def run_cls_cycle(
         "skipped_inconsistent": plan["skipped_inconsistent"],
         "skipped_duplicate": plan["skipped_duplicate"],
         "causal_edges_found": causal_edges_found,
+        "episodic_scanned": len(episodic),
     }


@@ -133,39 +150,69 @@ def _discover_causal_edges(
     store: MemoryStore,
     episodic: list[dict],
 ) -> int:
-    """Discover causal edges from entity co-occurrences."""
+    """Discover causal edges from entity co-occurrences (PC algorithm).
+
+    Gates on minimum signal before running the O(E²) independence tests:
+    the PC algorithm needs at least `min_observations` mentions per
+    entity in the sample to distinguish correlation from chance, so if
+    fewer than `_MIN_ENTITIES_FOR_PC` entities clear that threshold,
+    skip the analysis entirely (issue #13 Phase D).
+    """
     try:
         all_entities = store.get_all_entities(min_heat=0.0)
         entity_names = [e["name"] for e in all_entities if e.get("name")]
         if not entity_names or not episodic:
             return 0
 
-        co_matrix = compute_co_occurrence_matrix(episodic, entity_names)
         entity_counts = _count_entity_mentions(entity_names, episodic)
+        qualifying = sum(1 for c in entity_counts.values() if c >= _PC_MIN_OBSERVATIONS)
+        if qualifying < _MIN_ENTITIES_FOR_PC:
+            # Insufficient signal — don't run the full O(E^2) pass.
+            return 0
+
+        # Restrict the vocabulary to entities that meet the minimum, so
+        # the co-occurrence matrix is E_qualifying^2, not E_all^2.
+        active_names = [
+            n for n in entity_names if entity_counts[n] >= _PC_MIN_OBSERVATIONS
+        ]
+        co_matrix = compute_co_occurrence_matrix(episodic, active_names)
+        active_counts = {n: entity_counts[n] for n in active_names}
         edges = discover_causal_edges(
-            entity_names,
+            active_names,
             co_matrix,
-            entity_counts,
+            active_counts,
             len(episodic),
-            min_observations=3,
+            min_observations=_PC_MIN_OBSERVATIONS,
             independence_threshold=0.5,
         )
         return _store_causal_edges(store, all_entities, edges)
-    except Exception:
-        logger.debug("Causal discovery failed (non-fatal)")
+    except Exception as exc:
+        logger.warning("Causal discovery failed: %s", exc, exc_info=True)
         return 0
 
 
+# Source: PC algorithm lower bound — need ≥3 observations per variable
+# to distinguish dependence from sampling noise; need ≥5 active variables
+# for the independence tests to produce any non-trivial edge.
+_PC_MIN_OBSERVATIONS = 3
+_MIN_ENTITIES_FOR_PC = 5
+
+
 def _count_entity_mentions(
     entity_names: list[str],
     episodic: list[dict],
 ) -> dict[str, int]:
-    """Count how many episodic memories mention each entity."""
+    """Count how many episodic memories mention each entity.
+
+    Single pass over the episodic sample with precomputed lowercase
+    content and lowercase entity names (replaces the old O(N_ep × N_ent)
+    loop that called .lower() on every cell).
+    """
+    content_lowered = [(m.get("content") or "").lower() for m in episodic]
     counts: dict[str, int] = {}
     for name in entity_names:
-        counts[name] = sum(
-            1 for m in episodic if name.lower() in (m.get("content") or "").lower()
-        )
+        name_l = name.lower()
+        counts[name] = sum(1 for c in content_lowered if name_l in c)
     return counts

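
The signal gate in `_discover_causal_edges` can be tried on toy data. The sketch below copies the counting logic and the two thresholds from the diff. The wrapper `qualifying_entities` and the sample entity names are assumptions made for illustration, and the real function goes on to build a co-occurrence matrix, which is omitted here.

```python
PC_MIN_OBSERVATIONS = 3   # mirrors _PC_MIN_OBSERVATIONS in the diff
MIN_ENTITIES_FOR_PC = 5   # mirrors _MIN_ENTITIES_FOR_PC in the diff


def count_entity_mentions(entity_names, episodic):
    """Count memories mentioning each entity, lowercasing content only once."""
    content_lowered = [(m.get("content") or "").lower() for m in episodic]
    counts = {}
    for name in entity_names:
        name_l = name.lower()
        counts[name] = sum(1 for c in content_lowered if name_l in c)
    return counts


def qualifying_entities(entity_names, episodic):
    """Return the restricted vocabulary, or [] when the gate does not open."""
    counts = count_entity_mentions(entity_names, episodic)
    active = [n for n in entity_names if counts[n] >= PC_MIN_OBSERVATIONS]
    if len(active) < MIN_ENTITIES_FOR_PC:
        return []
    return active


names = ["Redis", "Postgres", "Kafka", "Nginx", "Celery", "Zookeeper"]
episodic = [{"content": "Redis Postgres Kafka Nginx Celery"}] * 3 + [
    {"content": "zookeeper only"},
    {"content": None},
]

active = qualifying_entities(names, episodic)       # gate opens, Zookeeper dropped
sparse = qualifying_entities(names, episodic[:2])   # every count is 2, gate stays shut
```

The match is a plain substring test, so a short entity name can also match inside a longer word. That behaviour is the same before and after this commit.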

mcp_server/handlers/consolidation/compression.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,21 @@ def run_compression_cycle(
2424
store: MemoryStore,
2525
settings: Any,
2626
embeddings: EmbeddingEngine,
27+
memories: list[dict] | None = None,
2728
) -> dict:
28-
"""Compress aging memories along the rate-distortion curve."""
29-
memories = store.get_all_memories_for_decay()
29+
"""Compress aging memories along the rate-distortion curve.
30+
31+
`memories` may be pre-loaded by the consolidate handler (issue #13).
32+
"""
33+
if memories is None:
34+
memories = store.get_all_memories_for_decay()
3035

3136
stats = {
3237
"compressed_to_gist": 0,
3338
"compressed_to_tag": 0,
3439
"protected_skipped": 0,
3540
"semantic_skipped": 0,
41+
"rows_scanned": len(memories),
3642
}
3743

3844
for mem in memories:
