Skip to content

Commit 7a6ce9f

Browse files
Fix drift tracer example to track per-run vocabulary
1 parent 35015cb commit 7a6ce9f

1 file changed

Lines changed: 28 additions & 17 deletions

File tree

docs-website/docs/development/tracing.mdx

Lines changed: 28 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -367,17 +367,18 @@ The example below implements a lightweight `DriftMonitorTracer` that tracks whic
367367
```python
368368
import contextlib
369369
import re
370-
from collections import defaultdict
371370
from typing import Any, Iterator, Optional
372371

373372
from haystack import tracing
374373
from haystack.tracing import Span, Tracer
375374

376375

377376
class InMemorySpan(Span):
378-
"""Lightweight span that accumulates tag values for drift inspection."""
377+
"""Lightweight span that shares a run-level vocabulary set."""
379378

380-
def __init__(self) -> None:
379+
def __init__(self, operation_name: str, run_vocab: set[str]) -> None:
380+
self.operation_name = operation_name
381+
self.run_vocab = run_vocab
381382
self._tags: dict[str, Any] = {}
382383

383384
def set_tag(self, key: str, value: Any) -> None:
@@ -405,25 +406,37 @@ class DriftMonitorTracer(Tracer):
405406
def __init__(self, alert_threshold: float = 0.40) -> None:
406407
self._alert_threshold = alert_threshold
407408
self._baseline_vocab: Optional[set[str]] = None
409+
self._latest_vocab: set[str] = set()
408410
self._run_count = 0
409411
self._current_span: Optional[InMemorySpan] = None
410412

411413
# ---- Tracer interface ----
412414

413415
@contextlib.contextmanager
414416
def trace(
415-
self, operation_name: str, tags: Optional[dict[str, Any]] = None
417+
self,
418+
operation_name: str,
419+
tags: Optional[dict[str, Any]] = None,
420+
parent_span: Optional[Span] = None,
416421
) -> Iterator[Span]:
417-
span = InMemorySpan()
422+
run_vocab = (
423+
parent_span.run_vocab
424+
if isinstance(parent_span, InMemorySpan)
425+
else set()
426+
)
427+
span = InMemorySpan(operation_name=operation_name, run_vocab=run_vocab)
418428
if tags:
419429
span.set_tags(tags)
430+
420431
old = self._current_span
421432
self._current_span = span
422433
try:
423434
yield span
424435
finally:
425436
self._current_span = old
426437
self._on_span_finished(span)
438+
if parent_span is None:
439+
self._finalize_run(span.run_vocab)
427440

428441
def current_span(self) -> Optional[Span]:
429442
return self._current_span
@@ -444,16 +457,20 @@ class DriftMonitorTracer(Tracer):
444457
for key, val in span.get_tags().items():
445458
if "input" in key or "output" in key or "content" in key:
446459
text_parts.append(str(val))
447-
if not text_parts:
460+
if text_parts:
461+
span.run_vocab.update(self._tokenize(" ".join(text_parts)))
462+
463+
def _finalize_run(self, run_vocab: set[str]) -> None:
464+
if not run_vocab:
448465
return
449466

450-
vocab = self._tokenize(" ".join(text_parts))
451-
if self._baseline_vocab is None:
452-
self._baseline_vocab = vocab
453467
self._run_count += 1
468+
if self._baseline_vocab is None:
469+
self._baseline_vocab = set(run_vocab)
470+
self._latest_vocab = set(run_vocab)
454471

455472
def check_drift(self) -> dict[str, Any]:
456-
"""Return a drift report after the latest run.
473+
"""Return a drift report after the latest pipeline run.
457474
458475
Returns a dict with keys:
459476
- ``gcs``: Ghost Consistency Score (1.0 = no drift, 0.0 = complete drift)
@@ -464,20 +481,14 @@ class DriftMonitorTracer(Tracer):
464481
if self._baseline_vocab is None or self._run_count < 2:
465482
return {"gcs": 1.0, "ghost_terms": [], "alert": False, "run": self._run_count}
466483

467-
current = self._current_vocab_snapshot()
468-
ghost = self._baseline_vocab - current
484+
ghost = self._baseline_vocab - self._latest_vocab
469485
gcs = 1.0 - len(ghost) / max(len(self._baseline_vocab), 1)
470486
return {
471487
"gcs": round(gcs, 3),
472488
"ghost_terms": sorted(ghost),
473489
"alert": gcs < self._alert_threshold,
474490
"run": self._run_count,
475491
}
476-
477-
def _current_vocab_snapshot(self) -> set[str]:
478-
"""Return the vocabulary seen in the most recent finished span."""
479-
# For a richer implementation, keep a rolling per-run vocab here.
480-
return self._baseline_vocab or set()
481492
```
482493

483494
### Using the tracer

0 commit comments

Comments (0)