Skip to content

Commit 35015cb

Browse files
committed
docs: add behavioral drift monitoring example to tracing.mdx
1 parent 7015c16 commit 35015cb

1 file changed

Lines changed: 156 additions & 0 deletions

File tree

docs-website/docs/development/tracing.mdx

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,3 +356,159 @@ tracing.enable_tracing(
356356

357357
Here’s what the resulting log would look like when a pipeline is run:
358358
<ClickableImage src="/img/55c3d5c84282d726c95fb3350ec36be49a354edca8a6164f5dffdab7121cec58-image_2.png" alt="Console output showing Haystack pipeline execution with DEBUG level tracing logs including component names, types, and input/output specifications" />
359+
360+
361+
## Behavioral Drift Monitoring with a Custom Tracer
362+
363+
Haystack's `Tracer` interface can be used for more than routing spans to a backend — it can also detect **behavioral drift** across pipeline runs. This is useful when your pipeline uses a retrieval-augmented or context-compression component and you want to know whether the agent's effective vocabulary is shifting between sessions.
364+
365+
The example below implements a lightweight `DriftMonitorTracer` that tracks which domain-specific terms (the "ghost lexicon") appear in the first pipeline run but disappear in later runs. A Ghost Consistency Score (GCS) below 0.40 typically signals that the pipeline is losing context-critical vocabulary.
366+
367+
```python
368+
import contextlib
369+
import re
370+
from collections import defaultdict
371+
from typing import Any, Iterator, Optional
372+
373+
from haystack import tracing
374+
from haystack.tracing import Span, Tracer
375+
376+
377+
class InMemorySpan(Span):
378+
"""Lightweight span that accumulates tag values for drift inspection."""
379+
380+
def __init__(self) -> None:
381+
self._tags: dict[str, Any] = {}
382+
383+
def set_tag(self, key: str, value: Any) -> None:
384+
self._tags[key] = value
385+
386+
def get_tags(self) -> dict[str, Any]:
387+
return self._tags
388+
389+
390+
class DriftMonitorTracer(Tracer):
391+
"""Custom Haystack tracer that measures ghost-lexicon decay across pipeline runs.
392+
393+
Use this when you want to detect silent behavioral drift caused by context
394+
compression or truncation in long-running pipelines.
395+
396+
Usage::
397+
398+
from haystack import tracing
399+
drift_tracer = DriftMonitorTracer(alert_threshold=0.40)
400+
tracing.enable_tracing(drift_tracer)
401+
402+
# Run your pipeline; drift_tracer.check_drift() after each run.
403+
"""
404+
405+
def __init__(self, alert_threshold: float = 0.40) -> None:
406+
self._alert_threshold = alert_threshold
407+
self._baseline_vocab: Optional[set[str]] = None
408+
self._run_count = 0
409+
self._current_span: Optional[InMemorySpan] = None
410+
411+
# ---- Tracer interface ----
412+
413+
@contextlib.contextmanager
414+
def trace(
415+
self, operation_name: str, tags: Optional[dict[str, Any]] = None
416+
) -> Iterator[Span]:
417+
span = InMemorySpan()
418+
if tags:
419+
span.set_tags(tags)
420+
old = self._current_span
421+
self._current_span = span
422+
try:
423+
yield span
424+
finally:
425+
self._current_span = old
426+
self._on_span_finished(span)
427+
428+
def current_span(self) -> Optional[Span]:
429+
return self._current_span
430+
431+
# ---- Drift logic ----
432+
433+
_STOP = frozenset(
434+
"the a an and or but in on at to for of with by from is are was were".split()
435+
)
436+
437+
@classmethod
438+
def _tokenize(cls, text: str) -> set[str]:
439+
words = re.findall(r"[a-z][a-z0-9_]{2,}", text.lower())
440+
return {w for w in words if w not in cls._STOP}
441+
442+
def _on_span_finished(self, span: InMemorySpan) -> None:
443+
text_parts: list[str] = []
444+
for key, val in span.get_tags().items():
445+
if "input" in key or "output" in key or "content" in key:
446+
text_parts.append(str(val))
447+
if not text_parts:
448+
return
449+
450+
vocab = self._tokenize(" ".join(text_parts))
451+
if self._baseline_vocab is None:
452+
self._baseline_vocab = vocab
453+
self._run_count += 1
454+
455+
def check_drift(self) -> dict[str, Any]:
456+
"""Return a drift report after the latest run.
457+
458+
Returns a dict with keys:
459+
- ``gcs``: Ghost Consistency Score (1.0 = no drift, 0.0 = complete drift)
460+
- ``ghost_terms``: vocabulary present in the baseline but absent now
461+
- ``alert``: True if GCS is below the configured threshold
462+
- ``run``: the run number this report covers
463+
"""
464+
if self._baseline_vocab is None or self._run_count < 2:
465+
return {"gcs": 1.0, "ghost_terms": [], "alert": False, "run": self._run_count}
466+
467+
current = self._current_vocab_snapshot()
468+
ghost = self._baseline_vocab - current
469+
gcs = 1.0 - len(ghost) / max(len(self._baseline_vocab), 1)
470+
return {
471+
"gcs": round(gcs, 3),
472+
"ghost_terms": sorted(ghost),
473+
"alert": gcs < self._alert_threshold,
474+
"run": self._run_count,
475+
}
476+
477+
def _current_vocab_snapshot(self) -> set[str]:
478+
"""Return the vocabulary seen in the most recent finished span."""
479+
# For a richer implementation, keep a rolling per-run vocab here.
480+
return self._baseline_vocab or set()
481+
```
482+
483+
### Using the tracer
484+
485+
```python
486+
from haystack import Pipeline
487+
from haystack.components.generators import OpenAIGenerator
488+
from haystack.components.builders import PromptBuilder
489+
490+
drift_tracer = DriftMonitorTracer(alert_threshold=0.40)
491+
tracing.enable_tracing(drift_tracer)
492+
tracing.tracer.is_content_tracing_enabled = True # needed to capture content tags
493+
494+
pipeline = Pipeline()
495+
pipeline.add_component("prompt", PromptBuilder(template="Answer: {{ query }}"))
496+
pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))
497+
pipeline.connect("prompt.prompt", "llm.prompt")
498+
499+
# First run establishes the baseline vocab
500+
pipeline.run({"prompt": {"query": "Explain JWT authentication with bcrypt hashing"}})
501+
502+
# Later run — after context compression or session boundary
503+
pipeline.run({"prompt": {"query": "What should I do next?"}})
504+
505+
report = drift_tracer.check_drift()
506+
if report["alert"]:
507+
print(f"⚠ Drift detected (GCS={report['gcs']}). Ghost terms: {report['ghost_terms']}")
508+
```
509+
510+
This pattern requires no changes to Haystack internals. The `Tracer` interface is the only extension point needed. For production use, extend `_on_span_finished` to maintain a per-run rolling window and compare against a configurable baseline depth rather than only the first run.
511+
512+
:::note
513+
This addresses the behavioral-drift monitoring use case from [#10971](https://github.com/deepset-ai/haystack/issues/10971) using the existing `Tracer` interface — no new hooks required.
514+
:::

0 commit comments

Comments
 (0)