diff --git a/docs-website/docs/development/tracing.mdx b/docs-website/docs/development/tracing.mdx
index 589c638766..efb8d4a74b 100644
--- a/docs-website/docs/development/tracing.mdx
+++ b/docs-website/docs/development/tracing.mdx
@@ -356,3 +356,170 @@ tracing.enable_tracing(
 Here’s what the resulting log would look like when a pipeline is run:
+
+
+## Cross-Run Behavioral Drift Monitoring with a Custom Tracer
+
+Haystack's `Tracer` interface can be used for more than routing spans to a backend — it can also detect **behavioral drift** across pipeline runs. This is a workaround for users who want extra observability today without adding a first-class context-compaction event to Haystack itself.
+
+The example below implements a lightweight `DriftMonitorTracer` that tracks which domain-specific terms (the "ghost lexicon") appear in the first pipeline run but disappear in later runs. A Ghost Consistency Score (GCS) below 0.40 typically signals that the pipeline is losing context-critical vocabulary across runs.
+
+```python
+import contextlib
+import re
+from typing import Any, Iterator, Optional
+
+from haystack import tracing
+from haystack.tracing import Span, Tracer
+
+
+class InMemorySpan(Span):
+    """Lightweight span that shares a run-level vocabulary set."""
+
+    def __init__(self, operation_name: str, run_vocab: set[str]) -> None:
+        self.operation_name = operation_name
+        self.run_vocab = run_vocab
+        self._tags: dict[str, Any] = {}
+
+    def set_tag(self, key: str, value: Any) -> None:
+        self._tags[key] = value
+
+    def get_tags(self) -> dict[str, Any]:
+        return self._tags
+
+
+class DriftMonitorTracer(Tracer):
+    """Custom Haystack tracer that measures ghost-lexicon decay across pipeline runs.
+
+    Use this when you want to detect silent behavioral drift caused by context
+    compression or truncation in long-running pipelines.
+
+    Usage::
+
+        from haystack import tracing
+        drift_tracer = DriftMonitorTracer(alert_threshold=0.40)
+        tracing.enable_tracing(drift_tracer)
+
+        # Run your pipeline; call drift_tracer.check_drift() after each run.
+    """
+
+    def __init__(self, alert_threshold: float = 0.40) -> None:
+        self._alert_threshold = alert_threshold
+        self._baseline_vocab: Optional[set[str]] = None
+        self._latest_vocab: set[str] = set()
+        self._run_count = 0
+        self._current_span: Optional[InMemorySpan] = None
+
+    # ---- Tracer interface ----
+
+    @contextlib.contextmanager
+    def trace(
+        self,
+        operation_name: str,
+        tags: Optional[dict[str, Any]] = None,
+        parent_span: Optional[Span] = None,
+    ) -> Iterator[Span]:
+        # Child spans share their parent's vocabulary set so the whole
+        # pipeline run accumulates terms into a single set.
+        run_vocab = (
+            parent_span.run_vocab
+            if isinstance(parent_span, InMemorySpan)
+            else set()
+        )
+        span = InMemorySpan(operation_name=operation_name, run_vocab=run_vocab)
+        if tags:
+            # set_tags is provided by the Span base class and loops over set_tag.
+            span.set_tags(tags)
+
+        old = self._current_span
+        self._current_span = span
+        try:
+            yield span
+        finally:
+            self._current_span = old
+            self._on_span_finished(span)
+            # A span with no parent is the run's top-level span: when it
+            # closes, the pipeline run is complete.
+            if parent_span is None:
+                self._finalize_run(span.run_vocab)
+
+    def current_span(self) -> Optional[Span]:
+        return self._current_span
+
+    # ---- Drift logic ----
+
+    _STOP = frozenset(
+        "the a an and or but in on at to for of with by from is are was were".split()
+    )
+
+    @classmethod
+    def _tokenize(cls, text: str) -> set[str]:
+        """Return lowercase word-like tokens of length >= 3, minus stop words."""
+        words = re.findall(r"[a-z][a-z0-9_]{2,}", text.lower())
+        return {w for w in words if w not in cls._STOP}
+
+    def _on_span_finished(self, span: InMemorySpan) -> None:
+        # Only tags carrying input/output content contribute vocabulary.
+        text_parts: list[str] = []
+        for key, val in span.get_tags().items():
+            if "input" in key or "output" in key or "content" in key:
+                text_parts.append(str(val))
+        if text_parts:
+            span.run_vocab.update(self._tokenize(" ".join(text_parts)))
+
+    def _finalize_run(self, run_vocab: set[str]) -> None:
+        if not run_vocab:
+            # Content tracing was off or the run produced no text: skip it.
+            return
+
+        self._run_count += 1
+        if self._baseline_vocab is None:
+            self._baseline_vocab = set(run_vocab)
+        # Always record the most recent run (unconditionally), so that
+        # check_drift() compares the baseline against the latest run rather
+        # than against itself.
+        self._latest_vocab = set(run_vocab)
+
+    def check_drift(self) -> dict[str, Any]:
+        """Return a drift report after the latest pipeline run.
+
+        Returns a dict with keys:
+        - ``gcs``: Ghost Consistency Score (1.0 = no drift, 0.0 = complete drift)
+        - ``ghost_terms``: vocabulary present in the baseline but absent now
+        - ``alert``: True if GCS is below the configured threshold
+        - ``run``: the run number this report covers
+        """
+        if self._baseline_vocab is None or self._run_count < 2:
+            # Fewer than two completed runs: nothing to compare yet.
+            return {"gcs": 1.0, "ghost_terms": [], "alert": False, "run": self._run_count}
+
+        ghost = self._baseline_vocab - self._latest_vocab
+        gcs = 1.0 - len(ghost) / max(len(self._baseline_vocab), 1)
+        return {
+            "gcs": round(gcs, 3),
+            "ghost_terms": sorted(ghost),
+            "alert": gcs < self._alert_threshold,
+            "run": self._run_count,
+        }
+```
+
+### Using the tracer
+
+```python
+from haystack import Pipeline, tracing
+from haystack.components.generators import OpenAIGenerator
+from haystack.components.builders import PromptBuilder
+
+drift_tracer = DriftMonitorTracer(alert_threshold=0.40)
+tracing.enable_tracing(drift_tracer)
+tracing.tracer.is_content_tracing_enabled = True  # needed to capture content tags
+
+pipeline = Pipeline()
+pipeline.add_component("prompt", PromptBuilder(template="Answer: {{ query }}"))
+pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))
+pipeline.connect("prompt.prompt", "llm.prompt")
+
+# First run establishes the baseline vocab
+pipeline.run({"prompt": {"query": "Explain JWT authentication with bcrypt hashing"}})
+
+# Later run — after context compression or session boundary
+pipeline.run({"prompt": {"query": "What should I do next?"}})
+
+report = drift_tracer.check_drift()
+if report["alert"]:
+    print(f"⚠ Drift detected (GCS={report['gcs']}). Ghost terms: {report['ghost_terms']}")
+```
+
+This pattern requires no changes to Haystack internals. The `Tracer` interface is the only extension point needed.
+For production use, extend `_on_span_finished` to maintain a per-run rolling window and compare against a configurable baseline depth rather than only the first run.
+
+:::note
+This addresses part of the behavioral-drift monitoring use case from [#10971](https://github.com/deepset-ai/haystack/issues/10971) using the existing `Tracer` interface. It is not a first-class boundary event for summarizers or truncators; it is an explicit workaround based on traced inputs and outputs.
+:::