Skip to content

Commit bab29b2

Browse files
authored
Merge pull request #90 from rolandpg/feat/rfc-009-phase0.5-instrumentation
feat(telemetry): per-phase timers in remember() — RFC-009 Phase 0.5
2 parents e6bb580 + 3871f85 commit bab29b2

2 files changed

Lines changed: 98 additions & 2 deletions

File tree

src/zettelforge/memory_manager.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,21 +230,33 @@ def remember(
230230
)
231231

232232
# Direct store path
233+
# [RFC-009 Phase 0.5] Per-phase timings to attribute remember() latency.
234+
# Emitted in ocsf_api_activity.phase_timings_ms so the RFC-007 aggregator
235+
# can bucket them without schema changes.
236+
phase_timings_ms: Dict[str, float] = {}
237+
238+
_p = time.perf_counter()
233239
note = self.constructor.construct(
234240
raw_content=content, source_type=source_type, source_ref=source_ref, domain=domain
235241
)
242+
phase_timings_ms["construct"] = (time.perf_counter() - _p) * 1000
236243

244+
_p = time.perf_counter()
237245
self.store.write_note(note)
246+
phase_timings_ms["write_note"] = (time.perf_counter() - _p) * 1000
238247
self.stats["notes_created"] += 1
239248

240249
# Keep LanceDB in sync for vector retrieval
241250
if self._lance_store.lancedb is not None:
251+
_p = time.perf_counter()
242252
try:
243253
self._lance_store._index_in_lance(note)
244254
except Exception:
245255
self._logger.warning("lance_index_sync_failed", note_id=note.id, exc_info=True)
256+
phase_timings_ms["lance_index"] = (time.perf_counter() - _p) * 1000
246257

247258
# Alias resolution and indexing (regex-only for speed; LLM NER runs on recall)
259+
_p = time.perf_counter()
248260
raw_entities = self.indexer.extractor.extract_all(note.content.raw, use_llm=False)
249261

250262
resolved_entities = {}
@@ -257,8 +269,10 @@ def remember(
257269
for etype, elist in resolved_entities.items():
258270
for evalue in elist:
259271
self.store.add_entity_mapping(etype, evalue, note.id)
272+
phase_timings_ms["entity_index"] = (time.perf_counter() - _p) * 1000
260273

261274
# GAM consolidation: observe note for semantic shift detection
275+
_p = time.perf_counter()
262276
try:
263277
is_shift, shift_meta = self.consolidation.before_write(
264278
note_entities=resolved_entities,
@@ -274,14 +288,24 @@ def remember(
274288
)
275289
except Exception as e:
276290
self._logger.warning("consolidation_observe_failed", error=str(e))
291+
phase_timings_ms["consolidation_observe"] = (time.perf_counter() - _p) * 1000
277292

278293
# Phase 3: Check supersession
294+
_p = time.perf_counter()
279295
self._check_supersession(note, resolved_entities)
296+
phase_timings_ms["supersession"] = (time.perf_counter() - _p) * 1000
280297

281298
# Phase 6: Knowledge Graph Update (heuristic edges — fast path)
299+
_p = time.perf_counter()
282300
self._update_knowledge_graph(note, resolved_entities)
283-
284-
# Phase 6b: LLM causal enrichment (slow path — background worker)
301+
phase_timings_ms["kg_update"] = (time.perf_counter() - _p) * 1000
302+
303+
# Phase 6b/6c/6d: dispatch background enrichment jobs (causal + NER + evolution).
304+
# The dispatch bucket measures job construction + put_nowait() + count_notes()
305+
# overhead only. In sync=True mode the LLM work runs inline and is intentionally
306+
# EXCLUDED from this bucket — mixing LLM latency into "dispatch" would corrupt
307+
# the Phase 0.5 attribution. sync=True is retained for tests/debug.
308+
dispatch_start = time.perf_counter() if not sync else None
285309
job = _EnrichmentJob(
286310
note_id=note.id,
287311
domain=domain,
@@ -331,6 +355,8 @@ def remember(
331355
self._pending_enrichment.add(note.id)
332356
except queue.Full:
333357
self._logger.warning("evolution_queue_full", note_id=note.id)
358+
if dispatch_start is not None:
359+
phase_timings_ms["enrichment_dispatch"] = (time.perf_counter() - dispatch_start) * 1000
334360

335361
duration_ms = (time.perf_counter() - start) * 1000
336362
log_api_activity(
@@ -341,6 +367,7 @@ def remember(
341367
duration_ms=duration_ms,
342368
request_id=request_id,
343369
evolve=False,
370+
phase_timings_ms={k: round(v, 2) for k, v in phase_timings_ms.items()},
344371
)
345372
return note, "created"
346373

tests/test_logging_compliance.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,75 @@ def test_authorization_deny_fields(self):
139139
assert event["policy"] == "GOV-011"
140140

141141

142+
class TestPhaseTimingsInstrumentation:
143+
"""[RFC-009 Phase 0.5] remember() emits per-phase timers for latency attribution."""
144+
145+
def test_remember_emits_phase_timings_ms(self):
146+
"""The ocsf_api_activity event for remember() must carry phase_timings_ms
147+
with numeric values for each instrumented phase."""
148+
from zettelforge import MemoryManager
149+
150+
with tempfile.TemporaryDirectory() as tmpdir:
151+
mm = MemoryManager(jsonl_path=f"{tmpdir}/notes.jsonl", lance_path=f"{tmpdir}/vectordb")
152+
with structlog.testing.capture_logs() as logs:
153+
mm.remember("APT28 deploys Cobalt Strike beacons", domain="cti")
154+
155+
remember_events = [
156+
e
157+
for e in logs
158+
if e.get("event") == "ocsf_api_activity" and e.get("activity_name") == "remember"
159+
]
160+
assert remember_events, "No ocsf_api_activity event for remember() was emitted"
161+
162+
event = remember_events[-1]
163+
assert "phase_timings_ms" in event, (
164+
"phase_timings_ms missing from remember() ocsf_api_activity event"
165+
)
166+
timings = event["phase_timings_ms"]
167+
assert isinstance(timings, dict)
168+
169+
# Phases that always run on the direct-store async path
170+
expected_keys = {
171+
"construct",
172+
"write_note",
173+
"entity_index",
174+
"consolidation_observe",
175+
"supersession",
176+
"kg_update",
177+
"enrichment_dispatch",
178+
}
179+
missing = expected_keys - set(timings)
180+
assert not missing, f"Missing phase timings: {missing}. Got: {set(timings)}"
181+
182+
for key, value in timings.items():
183+
assert isinstance(value, (int, float)), (
184+
f"phase_timings_ms[{key!r}] is {type(value).__name__}, expected numeric"
185+
)
186+
assert value >= 0, f"phase_timings_ms[{key!r}] = {value} (negative)"
187+
188+
def test_enrichment_dispatch_excluded_in_sync_mode(self):
189+
"""In sync=True mode the dispatch bucket is intentionally omitted so
190+
inline LLM work is not misattributed to dispatch latency."""
191+
from zettelforge import MemoryManager
192+
193+
with tempfile.TemporaryDirectory() as tmpdir:
194+
mm = MemoryManager(jsonl_path=f"{tmpdir}/notes.jsonl", lance_path=f"{tmpdir}/vectordb")
195+
with structlog.testing.capture_logs() as logs:
196+
mm.remember("APT29 phishing campaign observed", domain="cti", sync=True)
197+
198+
remember_events = [
199+
e
200+
for e in logs
201+
if e.get("event") == "ocsf_api_activity" and e.get("activity_name") == "remember"
202+
]
203+
assert remember_events
204+
timings = remember_events[-1]["phase_timings_ms"]
205+
assert "enrichment_dispatch" not in timings, (
206+
"sync=True should NOT emit enrichment_dispatch (would mix LLM work into "
207+
"dispatch bucket and corrupt Phase 0.5 attribution)"
208+
)
209+
210+
142211
class TestLanceDBFailureLogged:
143212
"""Regression test for #26: LanceDB failures must be logged, not swallowed."""
144213

0 commit comments

Comments
 (0)