Skip to content

Commit 211ead4

Browse files
jsonbaileyclaude
andcommitted
feat!: Add per-execution runId, at-most-once tracking, and cross-process tracker resumption
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 2b997fa commit 211ead4

9 files changed

Lines changed: 424 additions & 42 deletions

File tree

packages/sdk/server-ai/src/ldai/client.py

Lines changed: 74 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from typing import Any, Dict, List, Optional, Tuple
1+
import base64
2+
import json
3+
from typing import Any, Callable, Dict, List, Optional, Tuple
24

35
import chevron
46
from ldclient import Context
@@ -61,14 +63,53 @@ def __init__(self, client: LDClient):
6163
1,
6264
)
6365

66+
def create_tracker(self, token: str, context: Context) -> LDAIConfigTracker:
67+
"""
68+
Reconstruct a tracker from a resumption token.
69+
70+
This is used for cross-process scenarios such as deferred feedback,
71+
where a different service needs to associate tracking events with the
72+
original execution's ``runId``.
73+
74+
:param token: A URL-safe Base64-encoded resumption token obtained from
75+
:attr:`LDAIConfigTracker.resumption_token`.
76+
:param context: The context to use for track events.
77+
:return: A new :class:`LDAIConfigTracker` bound to the original
78+
``runId`` from the token.
79+
:raises ValueError: If the token is invalid or missing required fields.
80+
"""
81+
try:
82+
# Add padding back before decoding
83+
padded = token + "=" * (-len(token) % 4)
84+
payload = json.loads(
85+
base64.urlsafe_b64decode(padded.encode("utf-8")).decode("utf-8")
86+
)
87+
except (json.JSONDecodeError, Exception) as e:
88+
raise ValueError(f"Invalid resumption token: {e}") from e
89+
90+
for field in ("runId", "configKey", "version"):
91+
if field not in payload:
92+
raise ValueError(f"Invalid resumption token: missing required field '{field}'")
93+
94+
return LDAIConfigTracker(
95+
ld_client=self._client,
96+
variation_key=payload.get("variationKey", ""),
97+
config_key=payload["configKey"],
98+
version=payload["version"],
99+
model_name="",
100+
provider_name="",
101+
context=context,
102+
run_id=payload["runId"],
103+
)
104+
64105
def _completion_config(
65106
self,
66107
key: str,
67108
context: Context,
68109
default: AICompletionConfigDefault,
69110
variables: Optional[Dict[str, Any]] = None,
70111
) -> AICompletionConfig:
71-
model, provider, messages, instructions, tracker, enabled, judge_configuration, _ = self.__evaluate(
112+
model, provider, messages, instructions, tracker, tracker_factory, enabled, judge_configuration, _ = self.__evaluate(
72113
key, context, default.to_dict(), variables
73114
)
74115

@@ -79,6 +120,7 @@ def _completion_config(
79120
messages=messages,
80121
provider=provider,
81122
tracker=tracker,
123+
create_tracker=tracker_factory if enabled else None,
82124
judge_configuration=judge_configuration,
83125
)
84126

@@ -134,7 +176,7 @@ def _judge_config(
134176
default: AIJudgeConfigDefault,
135177
variables: Optional[Dict[str, Any]] = None,
136178
) -> AIJudgeConfig:
137-
model, provider, messages, instructions, tracker, enabled, judge_configuration, variation = self.__evaluate(
179+
model, provider, messages, instructions, tracker, tracker_factory, enabled, judge_configuration, variation = self.__evaluate(
138180
key, context, default.to_dict(), variables
139181
)
140182

@@ -163,6 +205,7 @@ def _extract_evaluation_metric_key(variation: Dict[str, Any]) -> Optional[str]:
163205
messages=messages,
164206
provider=provider,
165207
tracker=tracker,
208+
create_tracker=tracker_factory if enabled else None,
166209
)
167210

168211
return config
@@ -248,14 +291,14 @@ async def create_judge(
248291
key, context, default or AIJudgeConfigDefault.disabled(), extended_variables
249292
)
250293

251-
if not judge_config.enabled or not judge_config.tracker:
294+
if not judge_config.enabled or not judge_config.create_tracker:
252295
return None
253296

254297
provider = RunnerFactory.create_model(judge_config, default_ai_provider)
255298
if not provider:
256299
return None
257300

258-
return Judge(judge_config, judge_config.tracker, provider)
301+
return Judge(judge_config, judge_config.create_tracker(), provider)
259302
except Exception as error:
260303
return None
261304

@@ -345,7 +388,7 @@ async def create_model(
345388
log.debug(f"Creating managed model for key: {key}")
346389
config = self._completion_config(key, context, default or AICompletionConfigDefault.disabled(), variables)
347390

348-
if not config.enabled or not config.tracker:
391+
if not config.enabled or not config.create_tracker:
349392
return None
350393

351394
runner = RunnerFactory.create_model(config, default_ai_provider)
@@ -361,7 +404,7 @@ async def create_model(
361404
default_ai_provider,
362405
)
363406

364-
return ManagedModel(config, config.tracker, runner, judges)
407+
return ManagedModel(config, config.create_tracker(), runner, judges)
365408

366409
async def create_chat(
367410
self,
@@ -428,14 +471,14 @@ async def create_agent(
428471
log.debug(f"Creating managed agent for key: {key}")
429472
config = self.__evaluate_agent(key, context, default or AIAgentConfigDefault.disabled(), variables)
430473

431-
if not config.enabled or not config.tracker:
474+
if not config.enabled or not config.create_tracker:
432475
return None
433476

434477
runner = RunnerFactory.create_agent(config, tools or {}, default_ai_provider)
435478
if not runner:
436479
return None
437480

438-
return ManagedAgent(config, config.tracker, runner)
481+
return ManagedAgent(config, config.create_tracker(), runner)
439482

440483
def agent_config(
441484
self,
@@ -754,7 +797,7 @@ def __evaluate(
754797
graph_key: Optional[str] = None,
755798
) -> Tuple[
756799
Optional[ModelConfig], Optional[ProviderConfig], Optional[List[LDMessage]],
757-
Optional[str], LDAIConfigTracker, bool, Optional[Any], Dict[str, Any]
800+
Optional[str], LDAIConfigTracker, Callable[[], LDAIConfigTracker], bool, Optional[Any], Dict[str, Any]
758801
]:
759802
"""
760803
Internal method to evaluate a configuration and extract components.
@@ -806,16 +849,24 @@ def __evaluate(
806849
custom=custom
807850
)
808851

809-
tracker = LDAIConfigTracker(
810-
self._client,
811-
variation.get('_ldMeta', {}).get('variationKey', ''),
812-
key,
813-
int(variation.get('_ldMeta', {}).get('version', 1)),
814-
model.name if model else '',
815-
provider_config.name if provider_config else '',
816-
context,
817-
graph_key=graph_key,
818-
)
852+
variation_key = variation.get('_ldMeta', {}).get('variationKey', '')
853+
version = int(variation.get('_ldMeta', {}).get('version', 1))
854+
model_name = model.name if model else ''
855+
provider_name = provider_config.name if provider_config else ''
856+
857+
def tracker_factory() -> LDAIConfigTracker:
858+
return LDAIConfigTracker(
859+
self._client,
860+
variation_key,
861+
key,
862+
version,
863+
model_name,
864+
provider_name,
865+
context,
866+
graph_key=graph_key,
867+
)
868+
869+
tracker = tracker_factory()
819870

820871
enabled = variation.get('_ldMeta', {}).get('enabled', False)
821872

@@ -834,7 +885,7 @@ def __evaluate(
834885
if judges:
835886
judge_configuration = JudgeConfiguration(judges=judges)
836887

837-
return model, provider_config, messages, instructions, tracker, enabled, judge_configuration, variation
888+
return model, provider_config, messages, instructions, tracker, tracker_factory, enabled, judge_configuration, variation
838889

839890
def __evaluate_agent(
840891
self,
@@ -854,7 +905,7 @@ def __evaluate_agent(
854905
:param graph_key: When set, passed to the tracker so all events include ``graphKey``.
855906
:return: Configured AIAgentConfig instance.
856907
"""
857-
model, provider, messages, instructions, tracker, enabled, judge_configuration, _ = self.__evaluate(
908+
model, provider, messages, instructions, tracker, tracker_factory, enabled, judge_configuration, _ = self.__evaluate(
858909
key, context, default.to_dict(), variables, graph_key=graph_key
859910
)
860911

@@ -868,6 +919,7 @@ def __evaluate_agent(
868919
provider=provider or default.provider,
869920
instructions=final_instructions,
870921
tracker=tracker,
922+
create_tracker=tracker_factory if enabled else None,
871923
judge_configuration=judge_configuration or default.judge_configuration,
872924
)
873925

packages/sdk/server-ai/src/ldai/judge/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ async def evaluate(
7373
return judge_result
7474

7575
judge_result.sampled = True
76+
7677
messages = self._construct_evaluation_messages(input_text, output_text)
7778
assert self._evaluation_response_structure is not None
7879

packages/sdk/server-ai/src/ldai/managed_agent.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ async def run(self, input: str) -> AgentResult:
3030
:param input: The user prompt or input to the agent
3131
:return: AgentResult containing the agent's output and metrics
3232
"""
33-
return await self._tracker.track_metrics_of_async(
33+
tracker = self._ai_config.create_tracker() if self._ai_config.create_tracker else self._tracker
34+
return await tracker.track_metrics_of_async(
3435
lambda: self._agent_runner.run(input),
3536
lambda result: result.metrics,
3637
)

packages/sdk/server-ai/src/ldai/managed_model.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,15 @@ async def invoke(self, prompt: str) -> ModelResponse:
4242
:param prompt: The user prompt to send to the model
4343
:return: ModelResponse containing the model's response and metrics
4444
"""
45+
tracker = self._ai_config.create_tracker() if self._ai_config.create_tracker else self._tracker
46+
4547
user_message = LDMessage(role='user', content=prompt)
4648
self._messages.append(user_message)
4749

4850
config_messages = self._ai_config.messages or []
4951
all_messages = config_messages + self._messages
5052

51-
response = await self._tracker.track_metrics_of_async(
53+
response = await tracker.track_metrics_of_async(
5254
lambda: self._model_runner.invoke_model(all_messages),
5355
lambda result: result.metrics,
5456
)
@@ -57,13 +59,14 @@ async def invoke(self, prompt: str) -> ModelResponse:
5759
self._ai_config.judge_configuration
5860
and self._ai_config.judge_configuration.judges
5961
):
60-
response.evaluations = self._start_judge_evaluations(self._messages, response)
62+
response.evaluations = self._start_judge_evaluations(tracker, self._messages, response)
6163

6264
self._messages.append(response.message)
6365
return response
6466

6567
def _start_judge_evaluations(
6668
self,
69+
tracker: LDAIConfigTracker,
6770
messages: List[LDMessage],
6871
response: ModelResponse,
6972
) -> List[asyncio.Task[Optional[JudgeResult]]]:
@@ -77,7 +80,7 @@ async def evaluate_judge(judge_config: Any) -> Optional[JudgeResult]:
7780
return None
7881
judge_result = await judge.evaluate_messages(messages, response, judge_config.sampling_rate)
7982
if judge_result.success:
80-
self._tracker.track_judge_result(judge_result)
83+
tracker.track_judge_result(judge_result)
8184
return judge_result
8285

8386
return [

packages/sdk/server-ai/src/ldai/models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import warnings
22
from dataclasses import dataclass, field
3-
from typing import Any, Dict, List, Literal, Optional, Union
3+
from typing import Any, Callable, Dict, List, Literal, Optional, Union
44

55

66
@dataclass
@@ -181,6 +181,7 @@ class AIConfig:
181181
model: Optional[ModelConfig] = None
182182
provider: Optional[ProviderConfig] = None
183183
tracker: Optional[Any] = None
184+
create_tracker: Optional[Callable[[], Any]] = None
184185

185186
def _base_to_dict(self) -> Dict[str, Any]:
186187
"""

packages/sdk/server-ai/src/ldai/tracker.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import base64
2+
import json
13
import logging
24
import time
35
import uuid
@@ -82,6 +84,7 @@ def __init__(
8284
provider_name: str,
8385
context: Context,
8486
graph_key: Optional[str] = None,
87+
run_id: Optional[str] = None,
8588
):
8689
"""
8790
Initialize an AI Config tracker.
@@ -95,6 +98,7 @@ def __init__(
9598
:param context: Context for evaluation.
9699
:param graph_key: When set, include ``graphKey`` in all event payloads
97100
(e.g. config-level metrics inside a graph).
101+
:param run_id: Optional run ID. When not provided, a new UUID is generated.
98102
"""
99103
self._ld_client = ld_client
100104
self._variation_key = variation_key
@@ -105,9 +109,26 @@ def __init__(
105109
self._context = context
106110
self._graph_key = graph_key
107111
self._summary = LDAIMetricSummary()
108-
self._run_id = str(uuid.uuid4())
112+
self._run_id = run_id or str(uuid.uuid4())
109113
self._tracked: Dict[str, bool] = {}
110114

115+
@property
116+
def resumption_token(self) -> str:
117+
"""
118+
A URL-safe Base64-encoded JSON string that can be used to reconstruct
119+
a tracker in a different process (e.g. for deferred feedback).
120+
121+
The token contains ``runId``, ``configKey``, ``variationKey``, and
122+
``version``. ``modelName`` and ``providerName`` are **not** included.
123+
"""
124+
payload = json.dumps({
125+
"runId": self._run_id,
126+
"configKey": self._config_key,
127+
"variationKey": self._variation_key,
128+
"version": self._version,
129+
})
130+
return base64.urlsafe_b64encode(payload.encode("utf-8")).rstrip(b"=").decode("utf-8")
131+
111132
def __get_track_data(self) -> dict:
112133
"""
113134
Get tracking data for events.

packages/sdk/server-ai/tests/test_managed_agent.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class TestManagedAgentRun:
5555
async def test_run_delegates_to_agent_runner(self):
5656
"""Should delegate run() to the underlying AgentRunner."""
5757
mock_config = MagicMock(spec=AIAgentConfig)
58+
mock_config.create_tracker = None # fall back to passed tracker
5859
mock_tracker = MagicMock()
5960
mock_tracker.track_metrics_of_async = AsyncMock(
6061
return_value=AgentResult(
@@ -79,6 +80,31 @@ async def test_run_delegates_to_agent_runner(self):
7980
assert result.metrics.success is True
8081
mock_tracker.track_metrics_of_async.assert_called_once()
8182

83+
@pytest.mark.asyncio
84+
async def test_run_uses_create_tracker_when_available(self):
85+
"""Should use create_tracker() factory for a fresh tracker per invocation."""
86+
mock_config = MagicMock(spec=AIAgentConfig)
87+
fresh_tracker = MagicMock()
88+
fresh_tracker.track_metrics_of_async = AsyncMock(
89+
return_value=AgentResult(
90+
output="Fresh tracker response",
91+
raw=None,
92+
metrics=LDAIMetrics(success=True, usage=None),
93+
)
94+
)
95+
mock_config.create_tracker = MagicMock(return_value=fresh_tracker)
96+
97+
old_tracker = MagicMock()
98+
mock_runner = MagicMock()
99+
100+
agent = ManagedAgent(mock_config, old_tracker, mock_runner)
101+
result = await agent.run("Hello")
102+
103+
assert result.output == "Fresh tracker response"
104+
mock_config.create_tracker.assert_called_once()
105+
fresh_tracker.track_metrics_of_async.assert_called_once()
106+
old_tracker.track_metrics_of_async.assert_not_called()
107+
82108
def test_get_agent_runner_returns_runner(self):
83109
"""Should return the underlying AgentRunner."""
84110
mock_runner = MagicMock()

0 commit comments

Comments
 (0)