Skip to content

Commit bdf7384

Browse files
jsonbaileyclaude
andcommitted
feat!: Add per-execution runId, at-most-once tracking, and cross-process tracker resumption
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a55fa40 commit bdf7384

9 files changed

Lines changed: 425 additions & 42 deletions

File tree

packages/sdk/server-ai/src/ldai/client.py

Lines changed: 73 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from typing import Any, Dict, List, Optional, Tuple
1+
import base64
2+
import json
3+
from typing import Any, Callable, Dict, List, Optional, Tuple
24

35
import chevron
46
from ldclient import Context
@@ -61,14 +63,53 @@ def __init__(self, client: LDClient):
6163
1,
6264
)
6365

66+
def create_tracker(self, token: str, context: Context) -> LDAIConfigTracker:
67+
"""
68+
Reconstruct a tracker from a resumption token.
69+
70+
This is used for cross-process scenarios such as deferred feedback,
71+
where a different service needs to associate tracking events with the
72+
original execution's ``runId``.
73+
74+
:param token: A URL-safe Base64-encoded resumption token obtained from
75+
:attr:`LDAIConfigTracker.resumption_token`.
76+
:param context: The context to use for track events.
77+
:return: A new :class:`LDAIConfigTracker` bound to the original
78+
``runId`` from the token.
79+
:raises ValueError: If the token is invalid or missing required fields.
80+
"""
81+
try:
82+
# Add padding back before decoding
83+
padded = token + "=" * (-len(token) % 4)
84+
payload = json.loads(
85+
base64.urlsafe_b64decode(padded.encode("utf-8")).decode("utf-8")
86+
)
87+
except (json.JSONDecodeError, Exception) as e:
88+
raise ValueError(f"Invalid resumption token: {e}") from e
89+
90+
for field in ("runId", "configKey", "version"):
91+
if field not in payload:
92+
raise ValueError(f"Invalid resumption token: missing required field '{field}'")
93+
94+
return LDAIConfigTracker(
95+
ld_client=self._client,
96+
variation_key=payload.get("variationKey", ""),
97+
config_key=payload["configKey"],
98+
version=payload["version"],
99+
model_name="",
100+
provider_name="",
101+
context=context,
102+
run_id=payload["runId"],
103+
)
104+
64105
def _completion_config(
65106
self,
66107
key: str,
67108
context: Context,
68109
default: AICompletionConfigDefault,
69110
variables: Optional[Dict[str, Any]] = None,
70111
) -> AICompletionConfig:
71-
model, provider, messages, instructions, tracker, enabled, judge_configuration, _ = self.__evaluate(
112+
model, provider, messages, instructions, tracker, tracker_factory, enabled, judge_configuration, _ = self.__evaluate(
72113
key, context, default.to_dict(), variables
73114
)
74115

@@ -79,6 +120,7 @@ def _completion_config(
79120
messages=messages,
80121
provider=provider,
81122
tracker=tracker,
123+
create_tracker=tracker_factory if enabled else None,
82124
judge_configuration=judge_configuration,
83125
)
84126

@@ -134,7 +176,7 @@ def _judge_config(
134176
default: AIJudgeConfigDefault,
135177
variables: Optional[Dict[str, Any]] = None,
136178
) -> AIJudgeConfig:
137-
model, provider, messages, instructions, tracker, enabled, judge_configuration, variation = self.__evaluate(
179+
model, provider, messages, instructions, tracker, tracker_factory, enabled, judge_configuration, variation = self.__evaluate(
138180
key, context, default.to_dict(), variables
139181
)
140182

@@ -163,6 +205,7 @@ def _extract_evaluation_metric_key(variation: Dict[str, Any]) -> Optional[str]:
163205
messages=messages,
164206
provider=provider,
165207
tracker=tracker,
208+
create_tracker=tracker_factory if enabled else None,
166209
)
167210

168211
return config
@@ -248,14 +291,14 @@ async def create_judge(
248291
key, context, default or AIJudgeConfigDefault.disabled(), extended_variables
249292
)
250293

251-
if not judge_config.enabled or not judge_config.tracker:
294+
if not judge_config.enabled or not judge_config.create_tracker:
252295
return None
253296

254297
provider = RunnerFactory.create_model(judge_config, default_ai_provider)
255298
if not provider:
256299
return None
257300

258-
return Judge(judge_config, judge_config.tracker, provider)
301+
return Judge(judge_config, judge_config.create_tracker(), provider)
259302
except Exception as error:
260303
return None
261304

@@ -345,7 +388,7 @@ async def create_model(
345388
log.debug(f"Creating managed model for key: {key}")
346389
config = self._completion_config(key, context, default or AICompletionConfigDefault.disabled(), variables)
347390

348-
if not config.enabled or not config.tracker:
391+
if not config.enabled or not config.create_tracker:
349392
return None
350393

351394
runner = RunnerFactory.create_model(config, default_ai_provider)
@@ -361,7 +404,7 @@ async def create_model(
361404
default_ai_provider,
362405
)
363406

364-
return ManagedModel(config, config.tracker, runner, judges)
407+
return ManagedModel(config, config.create_tracker(), runner, judges)
365408

366409
async def create_chat(
367410
self,
@@ -428,14 +471,14 @@ async def create_agent(
428471
log.debug(f"Creating managed agent for key: {key}")
429472
config = self.__evaluate_agent(key, context, default or AIAgentConfigDefault.disabled(), variables)
430473

431-
if not config.enabled or not config.tracker:
474+
if not config.enabled or not config.create_tracker:
432475
return None
433476

434477
runner = RunnerFactory.create_agent(config, tools or {}, default_ai_provider)
435478
if not runner:
436479
return None
437480

438-
return ManagedAgent(config, config.tracker, runner)
481+
return ManagedAgent(config, config.create_tracker(), runner)
439482

440483
def agent_config(
441484
self,
@@ -750,7 +793,7 @@ def __evaluate(
750793
variables: Optional[Dict[str, Any]] = None,
751794
) -> Tuple[
752795
Optional[ModelConfig], Optional[ProviderConfig], Optional[List[LDMessage]],
753-
Optional[str], LDAIConfigTracker, bool, Optional[Any], Dict[str, Any]
796+
Optional[str], LDAIConfigTracker, Callable[[], LDAIConfigTracker], bool, Optional[Any], Dict[str, Any]
754797
]:
755798
"""
756799
Internal method to evaluate a configuration and extract components.
@@ -801,15 +844,23 @@ def __evaluate(
801844
custom=custom
802845
)
803846

804-
tracker = LDAIConfigTracker(
805-
self._client,
806-
variation.get('_ldMeta', {}).get('variationKey', ''),
807-
key,
808-
int(variation.get('_ldMeta', {}).get('version', 1)),
809-
model.name if model else '',
810-
provider_config.name if provider_config else '',
811-
context,
812-
)
847+
variation_key = variation.get('_ldMeta', {}).get('variationKey', '')
848+
version = int(variation.get('_ldMeta', {}).get('version', 1))
849+
model_name = model.name if model else ''
850+
provider_name = provider_config.name if provider_config else ''
851+
852+
def tracker_factory() -> LDAIConfigTracker:
853+
return LDAIConfigTracker(
854+
self._client,
855+
variation_key,
856+
key,
857+
version,
858+
model_name,
859+
provider_name,
860+
context,
861+
)
862+
863+
tracker = tracker_factory()
813864

814865
enabled = variation.get('_ldMeta', {}).get('enabled', False)
815866

@@ -828,7 +879,7 @@ def __evaluate(
828879
if judges:
829880
judge_configuration = JudgeConfiguration(judges=judges)
830881

831-
return model, provider_config, messages, instructions, tracker, enabled, judge_configuration, variation
882+
return model, provider_config, messages, instructions, tracker, tracker_factory, enabled, judge_configuration, variation
832883

833884
def __evaluate_agent(
834885
self,
@@ -846,7 +897,7 @@ def __evaluate_agent(
846897
:param variables: Variables for interpolation.
847898
:return: Configured AIAgentConfig instance.
848899
"""
849-
model, provider, messages, instructions, tracker, enabled, judge_configuration, _ = self.__evaluate(
900+
model, provider, messages, instructions, tracker, tracker_factory, enabled, judge_configuration, _ = self.__evaluate(
850901
key, context, default.to_dict(), variables
851902
)
852903

@@ -860,6 +911,7 @@ def __evaluate_agent(
860911
provider=provider or default.provider,
861912
instructions=final_instructions,
862913
tracker=tracker,
914+
create_tracker=tracker_factory if enabled else None,
863915
judge_configuration=judge_configuration or default.judge_configuration,
864916
)
865917

packages/sdk/server-ai/src/ldai/judge/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,12 @@ async def evaluate(
6868
log.debug(f'Judge evaluation skipped due to sampling rate: {sampling_rate}')
6969
return None
7070

71+
tracker = self._ai_config.create_tracker() if self._ai_config.create_tracker else self._ai_config_tracker
72+
7173
messages = self._construct_evaluation_messages(input_text, output_text)
7274
assert self._evaluation_response_structure is not None
7375

74-
response = await self._ai_config_tracker.track_metrics_of_async(
76+
response = await tracker.track_metrics_of_async(
7577
lambda: self._model_runner.invoke_structured_model(messages, self._evaluation_response_structure),
7678
lambda result: result.metrics,
7779
)

packages/sdk/server-ai/src/ldai/managed_agent.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ async def run(self, input: str) -> AgentResult:
3030
:param input: The user prompt or input to the agent
3131
:return: AgentResult containing the agent's output and metrics
3232
"""
33-
return await self._tracker.track_metrics_of_async(
33+
tracker = self._ai_config.create_tracker() if self._ai_config.create_tracker else self._tracker
34+
return await tracker.track_metrics_of_async(
3435
lambda: self._agent_runner.run(input),
3536
lambda result: result.metrics,
3637
)

packages/sdk/server-ai/src/ldai/managed_model.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,15 @@ async def invoke(self, prompt: str) -> ModelResponse:
4242
:param prompt: The user prompt to send to the model
4343
:return: ModelResponse containing the model's response and metrics
4444
"""
45+
tracker = self._ai_config.create_tracker() if self._ai_config.create_tracker else self._tracker
46+
4547
user_message = LDMessage(role='user', content=prompt)
4648
self._messages.append(user_message)
4749

4850
config_messages = self._ai_config.messages or []
4951
all_messages = config_messages + self._messages
5052

51-
response = await self._tracker.track_metrics_of_async(
53+
response = await tracker.track_metrics_of_async(
5254
lambda: self._model_runner.invoke_model(all_messages),
5355
lambda result: result.metrics,
5456
)
@@ -57,13 +59,14 @@ async def invoke(self, prompt: str) -> ModelResponse:
5759
self._ai_config.judge_configuration
5860
and self._ai_config.judge_configuration.judges
5961
):
60-
response.evaluations = self._start_judge_evaluations(self._messages, response)
62+
response.evaluations = self._start_judge_evaluations(tracker, self._messages, response)
6163

6264
self._messages.append(response.message)
6365
return response
6466

6567
def _start_judge_evaluations(
6668
self,
69+
tracker: LDAIConfigTracker,
6770
messages: List[LDMessage],
6871
response: ModelResponse,
6972
) -> List[asyncio.Task[Optional[JudgeResponse]]]:
@@ -77,7 +80,7 @@ async def evaluate_judge(judge_config: Any) -> Optional[JudgeResponse]:
7780
return None
7881
eval_result = await judge.evaluate_messages(messages, response, judge_config.sampling_rate)
7982
if eval_result and eval_result.success:
80-
self._tracker.track_judge_response(eval_result)
83+
tracker.track_judge_response(eval_result)
8184
return eval_result
8285

8386
return [

packages/sdk/server-ai/src/ldai/models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import warnings
22
from dataclasses import dataclass, field
3-
from typing import Any, Dict, List, Literal, Optional, Union
3+
from typing import Any, Callable, Dict, List, Literal, Optional, Union
44

55
from ldai.tracker import LDAIConfigTracker
66

@@ -183,6 +183,7 @@ class AIConfig:
183183
model: Optional[ModelConfig] = None
184184
provider: Optional[ProviderConfig] = None
185185
tracker: Optional[LDAIConfigTracker] = None
186+
create_tracker: Optional[Callable[[], LDAIConfigTracker]] = None
186187

187188
def _base_to_dict(self) -> Dict[str, Any]:
188189
"""

packages/sdk/server-ai/src/ldai/tracker.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import base64
2+
import json
13
import logging
24
import time
35
import uuid
@@ -81,6 +83,7 @@ def __init__(
8183
model_name: str,
8284
provider_name: str,
8385
context: Context,
86+
run_id: Optional[str] = None,
8487
):
8588
"""
8689
Initialize an AI Config tracker.
@@ -92,6 +95,7 @@ def __init__(
9295
:param model_name: Name of the model used.
9396
:param provider_name: Name of the provider used.
9497
:param context: Context for evaluation.
98+
:param run_id: Optional run ID. When not provided, a new UUID is generated.
9599
"""
96100
self._ld_client = ld_client
97101
self._variation_key = variation_key
@@ -101,9 +105,26 @@ def __init__(
101105
self._provider_name = provider_name
102106
self._context = context
103107
self._summary = LDAIMetricSummary()
104-
self._run_id = str(uuid.uuid4())
108+
self._run_id = run_id or str(uuid.uuid4())
105109
self._tracked: Dict[str, bool] = {}
106110

111+
@property
112+
def resumption_token(self) -> str:
113+
"""
114+
A URL-safe Base64-encoded JSON string that can be used to reconstruct
115+
a tracker in a different process (e.g. for deferred feedback).
116+
117+
The token contains ``runId``, ``configKey``, ``variationKey``, and
118+
``version``. ``modelName`` and ``providerName`` are **not** included.
119+
"""
120+
payload = json.dumps({
121+
"runId": self._run_id,
122+
"configKey": self._config_key,
123+
"variationKey": self._variation_key,
124+
"version": self._version,
125+
})
126+
return base64.urlsafe_b64encode(payload.encode("utf-8")).rstrip(b"=").decode("utf-8")
127+
107128
def __get_track_data(self, graph_key: Optional[str] = None) -> dict:
108129
"""
109130
Get tracking data for events.

packages/sdk/server-ai/tests/test_managed_agent.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class TestManagedAgentRun:
5555
async def test_run_delegates_to_agent_runner(self):
5656
"""Should delegate run() to the underlying AgentRunner."""
5757
mock_config = MagicMock(spec=AIAgentConfig)
58+
mock_config.create_tracker = None # fall back to passed tracker
5859
mock_tracker = MagicMock()
5960
mock_tracker.track_metrics_of_async = AsyncMock(
6061
return_value=AgentResult(
@@ -79,6 +80,31 @@ async def test_run_delegates_to_agent_runner(self):
7980
assert result.metrics.success is True
8081
mock_tracker.track_metrics_of_async.assert_called_once()
8182

83+
@pytest.mark.asyncio
84+
async def test_run_uses_create_tracker_when_available(self):
85+
"""Should use create_tracker() factory for a fresh tracker per invocation."""
86+
mock_config = MagicMock(spec=AIAgentConfig)
87+
fresh_tracker = MagicMock()
88+
fresh_tracker.track_metrics_of_async = AsyncMock(
89+
return_value=AgentResult(
90+
output="Fresh tracker response",
91+
raw=None,
92+
metrics=LDAIMetrics(success=True, usage=None),
93+
)
94+
)
95+
mock_config.create_tracker = MagicMock(return_value=fresh_tracker)
96+
97+
old_tracker = MagicMock()
98+
mock_runner = MagicMock()
99+
100+
agent = ManagedAgent(mock_config, old_tracker, mock_runner)
101+
result = await agent.run("Hello")
102+
103+
assert result.output == "Fresh tracker response"
104+
mock_config.create_tracker.assert_called_once()
105+
fresh_tracker.track_metrics_of_async.assert_called_once()
106+
old_tracker.track_metrics_of_async.assert_not_called()
107+
82108
def test_get_agent_runner_returns_runner(self):
83109
"""Should return the underlying AgentRunner."""
84110
mock_runner = MagicMock()

0 commit comments

Comments
 (0)