Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions docs-website/docs/development/tracing.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,170 @@ tracing.enable_tracing(

Here’s what the resulting log would look like when a pipeline is run:
<ClickableImage src="/img/55c3d5c84282d726c95fb3350ec36be49a354edca8a6164f5dffdab7121cec58-image_2.png" alt="Console output showing Haystack pipeline execution with DEBUG level tracing logs including component names, types, and input/output specifications" />


## Cross-Run Behavioral Drift Monitoring with a Custom Tracer

Haystack's `Tracer` interface can be used for more than routing spans to a backend — it can also detect **behavioral drift** across pipeline runs. This is a workaround for users who want extra observability today without adding a first-class context-compaction event to Haystack itself.

The example below implements a lightweight `DriftMonitorTracer` that tracks which domain-specific terms (the "ghost lexicon") appear in the first pipeline run but disappear in later runs. A Ghost Consistency Score (GCS) below 0.40 typically signals that the pipeline is losing context-critical vocabulary across runs.

```python
import contextlib
import re
from typing import Any, Iterator, Optional

from haystack import tracing
from haystack.tracing import Span, Tracer


class InMemorySpan(Span):
    """Minimal in-memory span whose vocabulary set is shared by every span of one run.

    All spans created for the same pipeline run hold a reference to the same
    ``run_vocab`` set, so tokens collected on any span accumulate run-wide.
    """

    def __init__(self, operation_name: str, run_vocab: set[str]) -> None:
        # Name of the traced operation (as passed by the tracing caller).
        self.operation_name = operation_name
        # Run-level vocabulary accumulator, shared across sibling/child spans.
        self.run_vocab = run_vocab
        # Private per-span tag storage, exposed read-only via get_tags().
        self._tag_store: dict[str, Any] = {}

    def set_tag(self, key: str, value: Any) -> None:
        """Record one tag on this span (last write wins per key)."""
        self._tag_store[key] = value

    def get_tags(self) -> dict[str, Any]:
        """Return every tag recorded on this span so far."""
        return self._tag_store


class DriftMonitorTracer(Tracer):
    """Haystack tracer that measures ghost-lexicon decay across pipeline runs.

    The first run with a non-empty vocabulary becomes the baseline. Each later
    run is compared against it: the Ghost Consistency Score (GCS) is the share
    of baseline terms still present in the latest run. Use this to detect
    silent behavioral drift caused by context compression or truncation in
    long-running pipelines.

    Usage::

        from haystack import tracing
        drift_tracer = DriftMonitorTracer(alert_threshold=0.40)
        tracing.enable_tracing(drift_tracer)

        # Run your pipeline; call drift_tracer.check_drift() after each run.
    """

    # Common English function words excluded from the tracked vocabulary.
    _STOP = frozenset(
        "the a an and or but in on at to for of with by from is are was were".split()
    )

    def __init__(self, alert_threshold: float = 0.40) -> None:
        # GCS values strictly below this trigger the alert flag.
        self._alert_threshold = alert_threshold
        # Vocabulary of the first non-empty run; None until established.
        self._baseline_vocab: Optional[set[str]] = None
        # Vocabulary of the most recent non-empty run.
        self._latest_vocab: set[str] = set()
        # Number of non-empty runs observed so far.
        self._run_count = 0
        # Innermost span currently open, if any.
        self._current_span: Optional[InMemorySpan] = None

    # ---- Tracer interface ----

    @contextlib.contextmanager
    def trace(
        self,
        operation_name: str,
        tags: Optional[dict[str, Any]] = None,
        parent_span: Optional[Span] = None,
    ) -> Iterator[Span]:
        # Child spans share their parent's vocabulary set; a root (or
        # foreign-parented) span starts a fresh one.
        if isinstance(parent_span, InMemorySpan):
            shared_vocab = parent_span.run_vocab
        else:
            shared_vocab = set()
        span = InMemorySpan(operation_name=operation_name, run_vocab=shared_vocab)
        if tags:
            span.set_tags(tags)

        previous = self._current_span
        self._current_span = span
        try:
            yield span
        finally:
            # Restore the enclosing span, harvest this span's content, and —
            # when the outermost span closes — finalize the whole run.
            self._current_span = previous
            self._on_span_finished(span)
            if parent_span is None:
                self._finalize_run(span.run_vocab)

    def current_span(self) -> Optional[Span]:
        """Return the innermost span currently open, or None."""
        return self._current_span

    # ---- Drift logic ----

    @classmethod
    def _tokenize(cls, text: str) -> set[str]:
        """Extract lowercase word tokens (length >= 3), minus stop words."""
        candidates = re.findall(r"[a-z][a-z0-9_]{2,}", text.lower())
        return set(candidates) - cls._STOP

    def _on_span_finished(self, span: InMemorySpan) -> None:
        """Fold content-bearing tag values of a closed span into the run vocab."""
        content_values = [
            str(value)
            for tag_key, value in span.get_tags().items()
            if any(marker in tag_key for marker in ("input", "output", "content"))
        ]
        if content_values:
            span.run_vocab |= self._tokenize(" ".join(content_values))

    def _finalize_run(self, run_vocab: set[str]) -> None:
        """Commit a finished run's vocabulary; empty runs are ignored."""
        if not run_vocab:
            return

        self._run_count += 1
        if self._baseline_vocab is None:
            # First non-empty run establishes the baseline.
            self._baseline_vocab = set(run_vocab)
        self._latest_vocab = set(run_vocab)

    def check_drift(self) -> dict[str, Any]:
        """Return a drift report after the latest pipeline run.

        Returns a dict with keys:
        - ``gcs``: Ghost Consistency Score (1.0 = no drift, 0.0 = complete drift)
        - ``ghost_terms``: vocabulary present in the baseline but absent now
        - ``alert``: True if GCS is below the configured threshold
        - ``run``: the run number this report covers
        """
        # Before a second run there is nothing to compare against.
        if self._baseline_vocab is None or self._run_count < 2:
            return {"gcs": 1.0, "ghost_terms": [], "alert": False, "run": self._run_count}

        missing = self._baseline_vocab - self._latest_vocab
        score = 1.0 - len(missing) / max(len(self._baseline_vocab), 1)
        return {
            "gcs": round(score, 3),
            "ghost_terms": sorted(missing),
            "alert": score < self._alert_threshold,
            "run": self._run_count,
        }
```

### Using the tracer

```python
# `tracing` must be imported for enable_tracing / tracer below — without it
# this snippet raises NameError as pasted.
from haystack import Pipeline, tracing
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder

# DriftMonitorTracer is defined in the previous snippet.
drift_tracer = DriftMonitorTracer(alert_threshold=0.40)
tracing.enable_tracing(drift_tracer)
tracing.tracer.is_content_tracing_enabled = True  # needed to capture content tags

pipeline = Pipeline()
pipeline.add_component("prompt", PromptBuilder(template="Answer: {{ query }}"))
pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))
pipeline.connect("prompt.prompt", "llm.prompt")

# First run establishes the baseline vocab
pipeline.run({"prompt": {"query": "Explain JWT authentication with bcrypt hashing"}})

# Later run — after context compression or session boundary
pipeline.run({"prompt": {"query": "What should I do next?"}})

report = drift_tracer.check_drift()
if report["alert"]:
    print(f"⚠ Drift detected (GCS={report['gcs']}). Ghost terms: {report['ghost_terms']}")
```

This pattern requires no changes to Haystack internals. The `Tracer` interface is the only extension point needed. For production use, extend `_on_span_finished` to maintain a per-run rolling window and compare against a configurable baseline depth rather than only the first run.

:::note
This addresses part of the behavioral-drift monitoring use case from [#10971](https://github.com/deepset-ai/haystack/issues/10971) using the existing `Tracer` interface. It is not a first-class boundary event for summarizers or truncators; it is an explicit workaround based on traced inputs and outputs.
:::