Skip to content

Commit ac17703

Browse files
declan-scaleclaude
andcommitted
fix(messages): stamp agent messages with workflow.now() for monotonic ordering
Pairs with scaleapi/scale-agentex#233 (server now respects caller-supplied created_at). Without an SDK-side timestamp, the user-echo and the assistant- shell messages.create calls can land at the server within the same ms, and the assistant turn sorts before the user message that triggered it on reload. - MessagesModule.create / create_batch auto-inject workflow.now() inside a Temporal workflow; caller-supplied values are respected; outside workflows remains None so the server wall-clock applies (sync / plain-async agents). - CreateMessageParams / CreateMessagesBatchParams and MessagesService thread created_at to the SDK client (omit sentinel when None). - StreamingTaskMessageContext + factory accept created_at and forward to the initial messages.create at open(). - Auto-send dispatchers (adk.providers.litellm / .openai) capture workflow.now() at the workflow->activity boundary and thread it through ChatCompletion* AutoSendParams / RunAgent(Streamed)AutoSendParams. Inside the activity the timestamp is consumed once on the first streaming context per turn; later contexts in the same turn fall back to server wall-clock (naturally separated by network/processing latency plus PR 233's ms-stagger). - workflow_now_if_in_workflow() helper in lib.utils.temporal. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 96e2b0f commit ac17703

17 files changed

Lines changed: 634 additions & 178 deletions

File tree

src/agentex/lib/adk/_modules/messages.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# ruff: noqa: I001
22
# Import order matters - AsyncTracer must come after client import to avoid circular imports
33
from __future__ import annotations
4-
from datetime import timedelta
4+
from datetime import datetime, timedelta
55

66
from temporalio.common import RetryPolicy
77

@@ -22,7 +22,7 @@
2222
from agentex.lib.core.tracing.tracer import AsyncTracer
2323
from agentex.types.task_message import TaskMessage, TaskMessageContent
2424
from agentex.lib.utils.logging import make_logger
25-
from agentex.lib.utils.temporal import in_temporal_workflow
25+
from agentex.lib.utils.temporal import in_temporal_workflow, workflow_now_if_in_workflow
2626

2727
logger = make_logger(__name__)
2828

@@ -66,6 +66,7 @@ async def create(
6666
start_to_close_timeout: timedelta = timedelta(seconds=5),
6767
heartbeat_timeout: timedelta = timedelta(seconds=5),
6868
retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
69+
created_at: datetime | None = None,
6970
) -> TaskMessage:
7071
"""
7172
Create a new message for a task.
@@ -82,12 +83,17 @@ async def create(
8283
Returns:
8384
TaskMessageEntity: The created message.
8485
"""
86+
# Default created_at to workflow.now() so two awaited adk.messages.create
87+
# calls from the same workflow are guaranteed monotonic at the server.
88+
if created_at is None and in_temporal_workflow():
89+
created_at = workflow_now_if_in_workflow()
8590
params = CreateMessageParams(
8691
trace_id=trace_id,
8792
parent_span_id=parent_span_id,
8893
task_id=task_id,
8994
content=content,
9095
emit_updates=emit_updates,
96+
created_at=created_at,
9197
)
9298
if in_temporal_workflow():
9399
return await ActivityHelpers.execute_activity(
@@ -103,6 +109,7 @@ async def create(
103109
task_id=task_id,
104110
content=content,
105111
emit_updates=emit_updates,
112+
created_at=created_at,
106113
)
107114

108115
async def update(
@@ -163,6 +170,7 @@ async def create_batch(
163170
start_to_close_timeout: timedelta = timedelta(seconds=5),
164171
heartbeat_timeout: timedelta = timedelta(seconds=5),
165172
retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
173+
created_at: datetime | None = None,
166174
) -> list[TaskMessage]:
167175
"""
168176
Create a batch of messages for a task.
@@ -177,12 +185,15 @@ async def create_batch(
177185
Returns:
178186
List[TaskMessageEntity]: The created messages.
179187
"""
188+
if created_at is None and in_temporal_workflow():
189+
created_at = workflow_now_if_in_workflow()
180190
params = CreateMessagesBatchParams(
181191
task_id=task_id,
182192
contents=contents,
183193
emit_updates=emit_updates,
184194
trace_id=trace_id,
185195
parent_span_id=parent_span_id,
196+
created_at=created_at,
186197
)
187198
if in_temporal_workflow():
188199
return await ActivityHelpers.execute_activity(
@@ -198,6 +209,7 @@ async def create_batch(
198209
task_id=task_id,
199210
contents=contents,
200211
emit_updates=emit_updates,
212+
created_at=created_at,
201213
)
202214

203215
async def update_batch(

src/agentex/lib/adk/_modules/streaming.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# ruff: noqa: I001
22
# Import order matters - AsyncTracer must come after client import to avoid circular imports
33
from __future__ import annotations
4+
from datetime import datetime
45
from temporalio.common import RetryPolicy
56

67
from agentex import AsyncAgentex # noqa: F401
@@ -52,6 +53,7 @@ def streaming_task_message_context(
5253
task_id: str,
5354
initial_content: TaskMessageContent,
5455
streaming_mode: StreamingMode = "coalesced",
56+
created_at: datetime | None = None,
5557
) -> StreamingTaskMessageContext:
5658
"""
5759
Create a streaming context for managing TaskMessage lifecycle.
@@ -83,4 +85,5 @@ def streaming_task_message_context(
8385
task_id=task_id,
8486
initial_content=initial_content,
8587
streaming_mode=streaming_mode,
88+
created_at=created_at,
8689
)

src/agentex/lib/adk/providers/_modules/litellm.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from temporalio.common import RetryPolicy
77

88
from agentex.lib.utils.logging import make_logger
9-
from agentex.lib.utils.temporal import in_temporal_workflow
9+
from agentex.lib.utils.temporal import in_temporal_workflow, workflow_now_if_in_workflow
1010
from agentex.types.task_message import TaskMessage
1111
from agentex.lib.types.llm_messages import LLMConfig, Completion
1212
from agentex.lib.core.tracing.tracer import AsyncTracer
@@ -88,9 +88,7 @@ async def chat_completion(
8888
Completion: An OpenAI compatible Completion object
8989
"""
9090
if in_temporal_workflow():
91-
params = ChatCompletionParams(
92-
trace_id=trace_id, parent_span_id=parent_span_id, llm_config=llm_config
93-
)
91+
params = ChatCompletionParams(trace_id=trace_id, parent_span_id=parent_span_id, llm_config=llm_config)
9492
return await ActivityHelpers.execute_activity(
9593
activity_name=LiteLLMActivityName.CHAT_COMPLETION,
9694
request=params,
@@ -138,6 +136,7 @@ async def chat_completion_auto_send(
138136
parent_span_id=parent_span_id,
139137
task_id=task_id,
140138
llm_config=llm_config,
139+
created_at=workflow_now_if_in_workflow(),
141140
)
142141
return await ActivityHelpers.execute_activity(
143142
activity_name=LiteLLMActivityName.CHAT_COMPLETION_AUTO_SEND,
@@ -222,6 +221,7 @@ async def chat_completion_stream_auto_send(
222221
parent_span_id=parent_span_id,
223222
task_id=task_id,
224223
llm_config=llm_config,
224+
created_at=workflow_now_if_in_workflow(),
225225
)
226226
return await ActivityHelpers.execute_activity(
227227
activity_name=LiteLLMActivityName.CHAT_COMPLETION_STREAM_AUTO_SEND,

src/agentex/lib/adk/providers/_modules/openai.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing_extensions import deprecated
2121

2222
from agentex.lib.utils.logging import make_logger
23-
from agentex.lib.utils.temporal import in_temporal_workflow
23+
from agentex.lib.utils.temporal import in_temporal_workflow, workflow_now_if_in_workflow
2424
from agentex.lib.core.tracing.tracer import AsyncTracer
2525
from agentex.lib.types.agent_results import (
2626
SerializableRunResult,
@@ -265,6 +265,7 @@ async def run_agent_auto_send(
265265
output_guardrails=output_guardrails, # type: ignore[arg-type]
266266
max_turns=max_turns,
267267
previous_response_id=previous_response_id,
268+
created_at=workflow_now_if_in_workflow(),
268269
)
269270
return await ActivityHelpers.execute_activity(
270271
activity_name=OpenAIActivityName.RUN_AGENT_AUTO_SEND,
@@ -479,6 +480,7 @@ async def run_agent_streamed_auto_send(
479480
input_guardrails=input_guardrails,
480481
output_guardrails=output_guardrails,
481482
max_turns=max_turns,
483+
created_at=workflow_now_if_in_workflow(),
482484
)
483485
return await ActivityHelpers.execute_activity(
484486
activity_name=OpenAIActivityName.RUN_AGENT_STREAMED_AUTO_SEND,
@@ -509,4 +511,4 @@ async def run_agent_streamed_auto_send(
509511
output_guardrails=output_guardrails,
510512
max_turns=max_turns,
511513
previous_response_id=previous_response_id,
512-
)
514+
)

src/agentex/lib/core/services/adk/messages.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
import asyncio
44
from typing import Any, Coroutine
5+
from datetime import datetime
56

67
from agentex import AsyncAgentex
8+
from agentex._types import omit
79
from agentex.lib.utils.logging import make_logger
810
from agentex.lib.utils.temporal import heartbeat_if_in_workflow
911
from agentex.types.task_message import TaskMessage, TaskMessageContent
@@ -32,6 +34,7 @@ async def create_message(
3234
emit_updates: bool = True,
3335
trace_id: str | None = None,
3436
parent_span_id: str | None = None,
37+
created_at: datetime | None = None,
3538
) -> TaskMessage:
3639
trace = self._tracer.trace(trace_id)
3740
async with trace.span(
@@ -43,6 +46,7 @@ async def create_message(
4346
task_message = await self._agentex_client.messages.create(
4447
task_id=task_id,
4548
content=content.model_dump(),
49+
created_at=created_at if created_at is not None else omit,
4650
)
4751
if emit_updates:
4852
await self._emit_updates([task_message])
@@ -85,6 +89,7 @@ async def create_messages_batch(
8589
emit_updates: bool = True,
8690
trace_id: str | None = None,
8791
parent_span_id: str | None = None,
92+
created_at: datetime | None = None,
8893
) -> list[TaskMessage]:
8994
trace = self._tracer.trace(trace_id)
9095
async with trace.span(
@@ -96,6 +101,7 @@ async def create_messages_batch(
96101
task_messages = await self._agentex_client.messages.batch.create(
97102
task_id=task_id,
98103
contents=[content.model_dump() for content in contents],
104+
created_at=created_at if created_at is not None else omit,
99105
)
100106
if emit_updates:
101107
await self._emit_updates(task_messages)
@@ -119,10 +125,7 @@ async def update_messages_batch(
119125
heartbeat_if_in_workflow("update messages batch")
120126
task_messages = await self._agentex_client.messages.batch.update(
121127
task_id=task_id,
122-
updates={
123-
message_id: content.model_dump()
124-
for message_id, content in updates.items()
125-
},
128+
updates={message_id: content.model_dump() for message_id, content in updates.items()},
126129
)
127130
if span:
128131
span.output = [task_message.model_dump() for task_message in task_messages]

src/agentex/lib/core/services/adk/providers/litellm.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
from datetime import datetime
34
from collections.abc import AsyncGenerator
45

56
from agentex import AsyncAgentex
@@ -63,6 +64,7 @@ async def chat_completion_auto_send(
6364
llm_config: LLMConfig,
6465
trace_id: str | None = None,
6566
parent_span_id: str | None = None,
67+
created_at: datetime | None = None,
6668
) -> TaskMessage | None:
6769
"""
6870
Chat completion with automatic TaskMessage creation. This does not stream the completion. To stream use chat_completion_stream_auto_send.
@@ -98,13 +100,10 @@ async def chat_completion_auto_send(
98100
content="",
99101
format="markdown",
100102
),
103+
created_at=created_at,
101104
) as streaming_context:
102105
completion = await self.llm_gateway.acompletion(**llm_config.model_dump())
103-
if (
104-
completion.choices
105-
and len(completion.choices) > 0
106-
and completion.choices[0].message
107-
):
106+
if completion.choices and len(completion.choices) > 0 and completion.choices[0].message:
108107
final_content = TextContent(
109108
author="agent",
110109
content=completion.choices[0].message.content or "",
@@ -159,9 +158,7 @@ async def chat_completion_stream(
159158
) as span:
160159
# Direct streaming outside temporal - yield each chunk as it comes
161160
chunks: list[Completion] = []
162-
async for chunk in self.llm_gateway.acompletion_stream(
163-
**llm_config.model_dump()
164-
):
161+
async for chunk in self.llm_gateway.acompletion_stream(**llm_config.model_dump()):
165162
chunks.append(chunk)
166163
yield chunk
167164
if span:
@@ -173,6 +170,7 @@ async def chat_completion_stream_auto_send(
173170
llm_config: LLMConfig,
174171
trace_id: str | None = None,
175172
parent_span_id: str | None = None,
173+
created_at: datetime | None = None,
176174
) -> TaskMessage | None:
177175
"""
178176
Stream chat completion with automatic TaskMessage creation and streaming.
@@ -206,18 +204,13 @@ async def chat_completion_stream_auto_send(
206204
content="",
207205
format="markdown",
208206
),
207+
created_at=created_at,
209208
) as streaming_context:
210209
# Get the streaming response
211210
chunks = []
212-
async for response in self.llm_gateway.acompletion_stream(
213-
**llm_config.model_dump()
214-
):
211+
async for response in self.llm_gateway.acompletion_stream(**llm_config.model_dump()):
215212
heartbeat_if_in_workflow("chat completion streaming")
216-
if (
217-
response.choices
218-
and len(response.choices) > 0
219-
and response.choices[0].delta
220-
):
213+
if response.choices and len(response.choices) > 0 and response.choices[0].delta:
221214
delta = response.choices[0].delta.content
222215
if delta:
223216
# Stream the chunk via the context manager
@@ -235,11 +228,7 @@ async def chat_completion_stream_auto_send(
235228

236229
# Update the final message content
237230
complete_message = concat_completion_chunks(chunks)
238-
if (
239-
complete_message
240-
and complete_message.choices
241-
and complete_message.choices[0].message
242-
):
231+
if complete_message and complete_message.choices and complete_message.choices[0].message:
243232
final_content = TextContent(
244233
author="agent",
245234
content=complete_message.choices[0].message.content or "",

0 commit comments

Comments
 (0)