Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions langfuse/_client/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,32 @@ def _serialize(obj: Any) -> Optional[str]:
return json.dumps(obj, cls=EventSerializer)


def _flatten_and_serialize_metadata_values(
metadata: Optional[Dict[str, Any]],
) -> Optional[Dict[str, str]]:
if metadata is None:
return None

flattened_metadata: Dict[str, str] = {}

def flatten_value(path: str, value: Any) -> None:
if isinstance(value, dict):
for nested_key, nested_value in value.items():
flatten_value(f"{path}.{nested_key}", nested_value)

return

serialized_value = _serialize(value)

if serialized_value is not None:
flattened_metadata[path] = serialized_value
Comment thread
hassiebp marked this conversation as resolved.

for key, value in metadata.items():
flatten_value(str(key), value)

return flattened_metadata


def _flatten_and_serialize_metadata(
metadata: Any, type: Literal["observation", "trace"]
) -> dict:
Expand Down
14 changes: 11 additions & 3 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
from packaging.version import Version
from typing_extensions import deprecated

from langfuse._client.attributes import LangfuseOtelSpanAttributes, _serialize
from langfuse._client.attributes import (
LangfuseOtelSpanAttributes,
_flatten_and_serialize_metadata_values,
_serialize,
)
from langfuse._client.constants import (
LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT,
ObservationTypeGenerationLike,
Expand Down Expand Up @@ -2791,10 +2795,14 @@ async def _process_experiment_item(
propagated_experiment_attributes = PropagatedExperimentAttributes(
experiment_id=experiment_id,
experiment_name=experiment_run_name,
experiment_metadata=_serialize(experiment_metadata),
experiment_metadata=_flatten_and_serialize_metadata_values(
experiment_metadata
),
Comment thread
hassiebp marked this conversation as resolved.
experiment_dataset_id=dataset_id,
experiment_item_id=experiment_item_id,
experiment_item_metadata=_serialize(item_metadata),
experiment_item_metadata=_flatten_and_serialize_metadata_values(
item_metadata if isinstance(item_metadata, dict) else None
),
experiment_item_root_observation_id=span.id,
)

Expand Down
50 changes: 35 additions & 15 deletions langfuse/_client/propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@
class PropagatedExperimentAttributes(TypedDict):
experiment_id: str
experiment_name: str
experiment_metadata: Optional[str]
experiment_metadata: Optional[Dict[str, str]]
experiment_dataset_id: Optional[str]
experiment_item_id: str
experiment_item_metadata: Optional[str]
experiment_item_metadata: Optional[Dict[str, str]]
experiment_item_root_observation_id: str


Expand Down Expand Up @@ -247,9 +247,20 @@ def _propagate_attributes(
"trace_name": trace_name,
}

propagated_string_attributes = propagated_string_attributes | (
cast(Dict[str, Union[str, List[str], None]], experiment) or {}
)
propagated_metadata_attributes: Dict[str, Optional[Dict[str, str]]] = {
"metadata": metadata,
}

if experiment:
for key, value in experiment.items():
if key in ("experiment_metadata", "experiment_item_metadata"):
propagated_metadata_attributes[key] = cast(
Optional[Dict[str, str]], value
)
else:
propagated_string_attributes[key] = cast(
Optional[Union[str, List[str]]], value
)

# Filter out None values
propagated_string_attributes = {
Expand All @@ -268,16 +279,19 @@ def _propagate_attributes(
as_baggage=as_baggage,
)

if metadata is not None:
for metadata_key, metadata_value in propagated_metadata_attributes.items():
if metadata_value is None:
continue

validated_metadata: Dict[str, str] = {}

for key, value in metadata.items():
if _validate_string_value(value=value, key=f"metadata.{key}"):
for key, value in metadata_value.items():
if _validate_string_value(value=value, key=f"{metadata_key}.{key}"):
validated_metadata[key] = value

if validated_metadata:
context = _set_propagated_attribute(
key="metadata",
key=metadata_key,
value=validated_metadata,
context=context,
span=current_span,
Expand Down Expand Up @@ -322,9 +336,10 @@ def _get_propagated_attributes_from_context(

if isinstance(value, dict):
# Handle metadata
span_key = _get_propagated_span_key(key)

for k, v in value.items():
span_key = f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{k}"
propagated_attributes[span_key] = v
propagated_attributes[f"{span_key}.{k}"] = v

else:
span_key = _get_propagated_span_key(key)
Expand Down Expand Up @@ -387,7 +402,7 @@ def _set_propagated_attribute(
# Handle metadata
for k, v in value.items():
span.set_attribute(
key=f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{k}",
key=f"{span_key}.{k}",
value=v,
)

Expand Down Expand Up @@ -469,10 +484,14 @@ def _get_span_key_from_baggage_key(key: str) -> Optional[str]:
# Remove prefix to get the actual key name
suffix = key[len(LANGFUSE_BAGGAGE_PREFIX) :]

if suffix.startswith("metadata_"):
metadata_key = suffix[len("metadata_") :]
for metadata_key in ("metadata", "experiment_metadata", "experiment_item_metadata"):
baggage_metadata_prefix = f"{metadata_key}_"

return _get_propagated_span_key(metadata_key)
if suffix.startswith(baggage_metadata_prefix):
return (
f"{_get_propagated_span_key(metadata_key)}."
f"{suffix[len(baggage_metadata_prefix) :]}"
)

return _get_propagated_span_key(suffix)

Expand All @@ -484,6 +503,7 @@ def _get_propagated_span_key(key: str) -> str:
"version": LangfuseOtelSpanAttributes.VERSION,
"tags": LangfuseOtelSpanAttributes.TRACE_TAGS,
"trace_name": LangfuseOtelSpanAttributes.TRACE_NAME,
"metadata": LangfuseOtelSpanAttributes.TRACE_METADATA,
"experiment_id": LangfuseOtelSpanAttributes.EXPERIMENT_ID,
"experiment_name": LangfuseOtelSpanAttributes.EXPERIMENT_NAME,
"experiment_metadata": LangfuseOtelSpanAttributes.EXPERIMENT_METADATA,
Expand Down
59 changes: 59 additions & 0 deletions tests/e2e/test_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from typing import Any, Dict, List

import pytest
from opentelemetry import trace as otel_trace_api

from langfuse import get_client
from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse.experiment import (
Evaluation,
ExperimentData,
Expand Down Expand Up @@ -135,6 +137,63 @@ def test_run_experiment_on_local_dataset(sample_dataset):
)


def test_run_experiment_flattens_large_metadata_for_server_ingestion():
"""Server ingestion handles flattened experiment metadata on non-SDK child spans."""
langfuse_client = get_client()
external_tracer = otel_trace_api.get_tracer("ai.langfuse-python.e2e")
external_span_name = "external-experiment-metadata-child-" + create_uuid()[:8]

experiment_metadata = {
"mode": "offline",
"job_name": "agent-eval/PR-4",
"build_url": "https://example.com/job/agent-eval-example/job/PR-4",
"agent_name": "agent-eval-example",
}

def task_with_external_child(*, item: ExperimentItem, **kwargs: Dict[str, Any]):
with external_tracer.start_as_current_span(external_span_name) as span:
span.set_attribute("gen_ai.operation.name", "experiment-metadata-e2e")

return "processed"

result = langfuse_client.run_experiment(
name="Flattened Experiment Metadata " + create_uuid()[:8],
data=[{"input": "test input", "expected_output": "processed"}],
task=task_with_external_child,
metadata=experiment_metadata,
)

langfuse_client.flush()

trace_id = result.item_results[0].trace_id
assert trace_id is not None

trace = wait_for_trace(
trace_id,
is_result_ready=lambda fetched_trace: any(
observation.name == external_span_name
for observation in fetched_trace.observations
),
)

assert trace.metadata is not None
for metadata_key, metadata_value in experiment_metadata.items():
assert trace.metadata[metadata_key] == metadata_value
Comment thread
hassiebp marked this conversation as resolved.

external_observation = next(
observation
for observation in trace.observations
if observation.name == external_span_name
)
external_metadata = external_observation.metadata or {}

assert not any(
key == LangfuseOtelSpanAttributes.EXPERIMENT_METADATA
or key.startswith(f"{LangfuseOtelSpanAttributes.EXPERIMENT_METADATA}.")
for key in external_metadata
)


def test_run_experiment_on_langfuse_dataset():
"""Test running experiment on Langfuse dataset."""
langfuse_client = get_client()
Expand Down
Loading
Loading