Skip to content

Commit cda2a55

Browse files
zastrowm and Di-Is authored
fix: restore explicit span.end() to fix span end_time regression (#2032)
Co-authored-by: Di-Is <rhoxbox@gmail.com>
Co-authored-by: Mackenzie Zastrow <zastrowm@users.noreply.github.com>
1 parent 5391794 commit cda2a55

File tree

5 files changed

+264
-81
lines changed

5 files changed

+264
-81
lines changed

src/strands/event_loop/event_loop.py

Lines changed: 70 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -139,25 +139,29 @@ async def event_loop_cycle(
139139
)
140140
invocation_state["event_loop_cycle_span"] = cycle_span
141141

142-
with trace_api.use_span(cycle_span, end_on_exit=True):
143-
# Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls.
144-
if agent._interrupt_state.activated:
145-
stop_reason: StopReason = "tool_use"
146-
message = agent._interrupt_state.context["tool_use_message"]
147-
# Skip model invocation if the latest message contains ToolUse
148-
elif _has_tool_use_in_latest_message(agent.messages):
149-
stop_reason = "tool_use"
150-
message = agent.messages[-1]
151-
else:
152-
model_events = _handle_model_execution(
153-
agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context
154-
)
155-
async for model_event in model_events:
156-
if not isinstance(model_event, ModelStopReason):
157-
yield model_event
142+
with trace_api.use_span(cycle_span, end_on_exit=False):
143+
try:
144+
# Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls.
145+
if agent._interrupt_state.activated:
146+
stop_reason: StopReason = "tool_use"
147+
message = agent._interrupt_state.context["tool_use_message"]
148+
# Skip model invocation if the latest message contains ToolUse
149+
elif _has_tool_use_in_latest_message(agent.messages):
150+
stop_reason = "tool_use"
151+
message = agent.messages[-1]
152+
else:
153+
model_events = _handle_model_execution(
154+
agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context
155+
)
156+
async for model_event in model_events:
157+
if not isinstance(model_event, ModelStopReason):
158+
yield model_event
158159

159-
stop_reason, message, *_ = model_event["stop"]
160-
yield ModelMessageEvent(message=message)
160+
stop_reason, message, *_ = model_event["stop"]
161+
yield ModelMessageEvent(message=message)
162+
except Exception as e:
163+
tracer.end_span_with_error(cycle_span, str(e), e)
164+
raise
161165

162166
try:
163167
if stop_reason == "max_tokens":
@@ -196,42 +200,45 @@ async def event_loop_cycle(
196200

197201
# End the cycle and return results
198202
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)
199-
# Set attributes before span auto-closes
203+
204+
# Force structured output tool call if LLM didn't use it automatically
205+
if structured_output_context.is_enabled and stop_reason == "end_turn":
206+
if structured_output_context.force_attempted:
207+
raise StructuredOutputException(
208+
"The model failed to invoke the structured output tool even after it was forced."
209+
)
210+
structured_output_context.set_forced_mode()
211+
logger.debug("Forcing structured output tool")
212+
await agent._append_messages(
213+
{"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
214+
)
215+
216+
tracer.end_event_loop_cycle_span(cycle_span, message)
217+
events = recurse_event_loop(
218+
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
219+
)
220+
async for typed_event in events:
221+
yield typed_event
222+
return
223+
200224
tracer.end_event_loop_cycle_span(cycle_span, message)
201-
except EventLoopException:
202-
# Don't yield or log the exception - we already did it when we
203-
# raised the exception and we don't need that duplication.
225+
yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])
226+
except (
227+
StructuredOutputException,
228+
EventLoopException,
229+
ContextWindowOverflowException,
230+
MaxTokensReachedException,
231+
) as e:
232+
# These exceptions should bubble up directly rather than get wrapped in an EventLoopException
233+
tracer.end_span_with_error(cycle_span, str(e), e)
204234
raise
205-
except (ContextWindowOverflowException, MaxTokensReachedException) as e:
206-
# Special cased exceptions which we want to bubble up rather than get wrapped in an EventLoopException
207-
raise e
208235
except Exception as e:
236+
tracer.end_span_with_error(cycle_span, str(e), e)
209237
# Handle any other exceptions
210238
yield ForceStopEvent(reason=e)
211239
logger.exception("cycle failed")
212240
raise EventLoopException(e, invocation_state["request_state"]) from e
213241

214-
# Force structured output tool call if LLM didn't use it automatically
215-
if structured_output_context.is_enabled and stop_reason == "end_turn":
216-
if structured_output_context.force_attempted:
217-
raise StructuredOutputException(
218-
"The model failed to invoke the structured output tool even after it was forced."
219-
)
220-
structured_output_context.set_forced_mode()
221-
logger.debug("Forcing structured output tool")
222-
await agent._append_messages(
223-
{"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
224-
)
225-
226-
events = recurse_event_loop(
227-
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
228-
)
229-
async for typed_event in events:
230-
yield typed_event
231-
return
232-
233-
yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])
234-
235242

236243
async def recurse_event_loop(
237244
agent: "Agent",
@@ -316,20 +323,21 @@ async def _handle_model_execution(
316323
system_prompt=agent.system_prompt,
317324
system_prompt_content=agent._system_prompt_content,
318325
)
319-
with trace_api.use_span(model_invoke_span, end_on_exit=True):
320-
await agent.hooks.invoke_callbacks_async(
321-
BeforeModelCallEvent(
322-
agent=agent,
323-
invocation_state=invocation_state,
326+
with trace_api.use_span(model_invoke_span, end_on_exit=False):
327+
try:
328+
await agent.hooks.invoke_callbacks_async(
329+
BeforeModelCallEvent(
330+
agent=agent,
331+
invocation_state=invocation_state,
332+
)
324333
)
325-
)
326334

327-
if structured_output_context.forced_mode:
328-
tool_spec = structured_output_context.get_tool_spec()
329-
tool_specs = [tool_spec] if tool_spec else []
330-
else:
331-
tool_specs = agent.tool_registry.get_all_tool_specs()
332-
try:
335+
if structured_output_context.forced_mode:
336+
tool_spec = structured_output_context.get_tool_spec()
337+
tool_specs = [tool_spec] if tool_spec else []
338+
else:
339+
tool_specs = agent.tool_registry.get_all_tool_specs()
340+
333341
async for event in stream_messages(
334342
agent.model,
335343
agent.system_prompt,
@@ -363,17 +371,17 @@ async def _handle_model_execution(
363371
"stop_reason=<%s>, retry_requested=<True> | hook requested model retry",
364372
stop_reason,
365373
)
374+
tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
366375
continue # Retry the model call
367376

368377
if stop_reason == "max_tokens":
369378
message = recover_message_on_max_tokens_reached(message)
370379

371-
# Set attributes before span auto-closes
372380
tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
373381
break # Success! Break out of retry loop
374382

375383
except Exception as e:
376-
# Exception is automatically recorded by use_span with end_on_exit=True
384+
tracer.end_span_with_error(model_invoke_span, str(e), e)
377385
after_model_call_event = AfterModelCallEvent(
378386
agent=agent,
379387
invocation_state=invocation_state,
@@ -541,7 +549,7 @@ async def _handle_tool_execution(
541549
interrupts,
542550
structured_output=structured_output_result,
543551
)
544-
# Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
552+
# End the cycle span before yielding the recursive cycle.
545553
if cycle_span:
546554
tracer.end_event_loop_cycle_span(span=cycle_span, message=message)
547555

@@ -559,7 +567,7 @@ async def _handle_tool_execution(
559567

560568
yield ToolResultMessageEvent(message=tool_result_message)
561569

562-
# Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
570+
# End the cycle span before yielding the recursive cycle.
563571
if cycle_span:
564572
tracer.end_event_loop_cycle_span(span=cycle_span, message=message, tool_result_message=tool_result_message)
565573

src/strands/telemetry/tracer.py

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,17 @@ def _end_span(
185185
span: Span,
186186
attributes: dict[str, AttributeValue] | None = None,
187187
error: Exception | None = None,
188+
error_message: str | None = None,
188189
) -> None:
189190
"""Generic helper method to end a span.
190191
191192
Args:
192193
span: The span to end
193194
attributes: Optional attributes to set before ending the span
194195
error: Optional exception if an error occurred
196+
error_message: Optional error message to set in the span status
195197
"""
196-
if not span:
198+
if not span or not span.is_recording():
197199
return
198200

199201
try:
@@ -206,7 +208,8 @@ def _end_span(
206208

207209
# Handle error if present
208210
if error:
209-
span.set_status(StatusCode.ERROR, str(error))
211+
status_description = error_message or str(error) or type(error).__name__
212+
span.set_status(StatusCode.ERROR, status_description)
210213
span.record_exception(error)
211214
else:
212215
span.set_status(StatusCode.OK)
@@ -229,11 +232,11 @@ def end_span_with_error(self, span: Span, error_message: str, exception: Excepti
229232
error_message: Error message to set in the span status.
230233
exception: Optional exception to record in the span.
231234
"""
232-
if not span:
235+
if not span or not span.is_recording():
233236
return
234237

235238
error = exception or Exception(error_message)
236-
self._end_span(span, error=error)
239+
self._end_span(span, error=error, error_message=error_message)
237240

238241
def _add_event(
239242
self, span: Span | None, event_name: str, event_attributes: Attributes, to_span_attributes: bool = False
@@ -330,18 +333,15 @@ def end_model_invoke_span(
330333
) -> None:
331334
"""End a model invocation span with results and metrics.
332335
333-
Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
334-
Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.
335-
336336
Args:
337-
span: The span to set attributes on.
337+
span: The span to end.
338338
message: The message response from the model.
339339
usage: Token usage information from the model call.
340340
metrics: Metrics from the model call.
341341
stop_reason: The reason the model stopped generating.
342342
"""
343-
# Set end time attribute
344-
span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())
343+
if not span or not span.is_recording():
344+
return
345345

346346
attributes: dict[str, AttributeValue] = {
347347
"gen_ai.usage.prompt_tokens": usage["inputTokens"],
@@ -378,7 +378,7 @@ def end_model_invoke_span(
378378
event_attributes={"finish_reason": str(stop_reason), "message": serialize(message["content"])},
379379
)
380380

381-
span.set_attributes(attributes)
381+
self._end_span(span, attributes)
382382

383383
def start_tool_call_span(
384384
self,
@@ -553,20 +553,14 @@ def end_event_loop_cycle_span(
553553
) -> None:
554554
"""End an event loop cycle span with results.
555555
556-
Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
557-
Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.
558-
559556
Args:
560-
span: The span to set attributes on.
557+
span: The span to end.
561558
message: The message response from this cycle.
562559
tool_result_message: Optional tool result message if a tool was called.
563560
"""
564-
if not span:
561+
if not span or not span.is_recording():
565562
return
566563

567-
# Set end time attribute
568-
span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())
569-
570564
event_attributes: dict[str, AttributeValue] = {"message": serialize(message["content"])}
571565

572566
if tool_result_message:
@@ -591,6 +585,8 @@ def end_event_loop_cycle_span(
591585
else:
592586
self._add_event(span, "gen_ai.choice", event_attributes=event_attributes)
593587

588+
self._end_span(span)
589+
594590
def start_agent_span(
595591
self,
596592
messages: Messages,

tests/strands/event_loop/test_event_loop.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
from unittest.mock import ANY, AsyncMock, MagicMock, call, patch
66

77
import pytest
8+
from opentelemetry.sdk.trace import TracerProvider
9+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
10+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
811

912
import strands
1013
import strands.telemetry
@@ -19,6 +22,7 @@
1922
)
2023
from strands.interrupt import Interrupt, _InterruptState
2124
from strands.telemetry.metrics import EventLoopMetrics
25+
from strands.telemetry.tracer import Tracer
2226
from strands.tools.executors import SequentialToolExecutor
2327
from strands.tools.registry import ToolRegistry
2428
from strands.types._events import EventLoopStopEvent
@@ -583,6 +587,14 @@ async def test_event_loop_tracing_with_model_error(
583587
)
584588
await alist(stream)
585589

590+
assert mock_tracer.end_span_with_error.call_count == 2
591+
mock_tracer.end_span_with_error.assert_has_calls(
592+
[
593+
call(model_span, "Input too long", model.stream.side_effect),
594+
call(cycle_span, "Input too long", model.stream.side_effect),
595+
]
596+
)
597+
586598

587599
@pytest.mark.asyncio
588600
async def test_event_loop_cycle_max_tokens_exception(
@@ -673,6 +685,53 @@ async def test_event_loop_tracing_with_tool_execution(
673685
assert mock_tracer.end_model_invoke_span.call_count == 2
674686

675687

688+
@pytest.mark.asyncio
689+
async def test_event_loop_cycle_closes_cycle_span_before_recursive_cycle(
690+
agent,
691+
model,
692+
tool_stream,
693+
agenerator,
694+
alist,
695+
):
696+
exporter = InMemorySpanExporter()
697+
provider = TracerProvider()
698+
provider.add_span_processor(SimpleSpanProcessor(exporter))
699+
700+
tracer = Tracer()
701+
tracer.tracer_provider = provider
702+
tracer.tracer = provider.get_tracer(tracer.service_name)
703+
704+
async def delayed_text_stream():
705+
yield {"contentBlockDelta": {"delta": {"text": "test text"}}}
706+
await asyncio.sleep(0.05)
707+
yield {"contentBlockStop": {}}
708+
709+
agent.trace_span = None
710+
agent._system_prompt_content = None
711+
model.config = {"model_id": "test-model"}
712+
model.stream.side_effect = [
713+
agenerator(tool_stream),
714+
delayed_text_stream(),
715+
]
716+
717+
with patch("strands.event_loop.event_loop.get_tracer", return_value=tracer):
718+
stream = strands.event_loop.event_loop.event_loop_cycle(
719+
agent=agent,
720+
invocation_state={},
721+
)
722+
await alist(stream)
723+
724+
provider.force_flush()
725+
cycle_spans = sorted(
726+
[span for span in exporter.get_finished_spans() if span.name == "execute_event_loop_cycle"],
727+
key=lambda span: span.start_time,
728+
)
729+
730+
assert len(cycle_spans) == 2
731+
assert cycle_spans[0].end_time <= cycle_spans[1].start_time
732+
assert cycle_spans[0].end_time < cycle_spans[1].end_time
733+
734+
676735
@patch("strands.event_loop.event_loop.get_tracer")
677736
@pytest.mark.asyncio
678737
async def test_event_loop_tracing_with_throttling_exception(
@@ -709,6 +768,7 @@ async def test_event_loop_tracing_with_throttling_exception(
709768
)
710769
await alist(stream)
711770

771+
assert mock_tracer.end_span_with_error.call_count == 1
712772
# Verify span was created for the successful retry
713773
assert mock_tracer.start_model_invoke_span.call_count == 2
714774
assert mock_tracer.end_model_invoke_span.call_count == 1

0 commit comments

Comments
 (0)