Skip to content

Commit b8bd925

Browse files
committed
fix: restore explicit span.end() to resolve span end_time regression
PR #1293 wrapped event_loop_cycle() in use_span(end_on_exit=True) and removed explicit span.end() calls. Because event_loop_cycle is an async generator, yield keeps the context manager open across recursive cycles, causing all execute_event_loop_cycle spans to share the same OTel end_time. Switch to end_on_exit=False and explicitly call span.end() via _end_span() in end_event_loop_cycle_span() and end_model_invoke_span(), restoring end_span_with_error() in all exception paths.
1 parent 566e5ad commit b8bd925

File tree

5 files changed

+262
-76
lines changed

5 files changed

+262
-76
lines changed

src/strands/event_loop/event_loop.py

Lines changed: 68 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -139,25 +139,29 @@ async def event_loop_cycle(
139139
)
140140
invocation_state["event_loop_cycle_span"] = cycle_span
141141

142-
with trace_api.use_span(cycle_span, end_on_exit=True):
143-
# Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls.
144-
if agent._interrupt_state.activated:
145-
stop_reason: StopReason = "tool_use"
146-
message = agent._interrupt_state.context["tool_use_message"]
147-
# Skip model invocation if the latest message contains ToolUse
148-
elif _has_tool_use_in_latest_message(agent.messages):
149-
stop_reason = "tool_use"
150-
message = agent.messages[-1]
151-
else:
152-
model_events = _handle_model_execution(
153-
agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context
154-
)
155-
async for model_event in model_events:
156-
if not isinstance(model_event, ModelStopReason):
157-
yield model_event
142+
with trace_api.use_span(cycle_span, end_on_exit=False):
143+
try:
144+
# Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls.
145+
if agent._interrupt_state.activated:
146+
stop_reason: StopReason = "tool_use"
147+
message = agent._interrupt_state.context["tool_use_message"]
148+
# Skip model invocation if the latest message contains ToolUse
149+
elif _has_tool_use_in_latest_message(agent.messages):
150+
stop_reason = "tool_use"
151+
message = agent.messages[-1]
152+
else:
153+
model_events = _handle_model_execution(
154+
agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context
155+
)
156+
async for model_event in model_events:
157+
if not isinstance(model_event, ModelStopReason):
158+
yield model_event
158159

159-
stop_reason, message, *_ = model_event["stop"]
160-
yield ModelMessageEvent(message=message)
160+
stop_reason, message, *_ = model_event["stop"]
161+
yield ModelMessageEvent(message=message)
162+
except Exception as e:
163+
tracer.end_span_with_error(cycle_span, str(e), e)
164+
raise
161165

162166
try:
163167
if stop_reason == "max_tokens":
@@ -196,42 +200,48 @@ async def event_loop_cycle(
196200

197201
# End the cycle and return results
198202
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)
199-
# Set attributes before span auto-closes
203+
204+
# Force structured output tool call if LLM didn't use it automatically
205+
if structured_output_context.is_enabled and stop_reason == "end_turn":
206+
if structured_output_context.force_attempted:
207+
raise StructuredOutputException(
208+
"The model failed to invoke the structured output tool even after it was forced."
209+
)
210+
structured_output_context.set_forced_mode()
211+
logger.debug("Forcing structured output tool")
212+
await agent._append_messages(
213+
{"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
214+
)
215+
216+
tracer.end_event_loop_cycle_span(cycle_span, message)
217+
events = recurse_event_loop(
218+
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
219+
)
220+
async for typed_event in events:
221+
yield typed_event
222+
return
223+
200224
tracer.end_event_loop_cycle_span(cycle_span, message)
201-
except EventLoopException:
225+
yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])
226+
except StructuredOutputException as e:
227+
tracer.end_span_with_error(cycle_span, str(e), e)
228+
raise
229+
except EventLoopException as e:
230+
tracer.end_span_with_error(cycle_span, str(e), e)
202231
# Don't yield or log the exception - we already did it when we
203232
# raised the exception and we don't need that duplication.
204233
raise
205234
except (ContextWindowOverflowException, MaxTokensReachedException) as e:
206235
# Special cased exceptions which we want to bubble up rather than get wrapped in an EventLoopException
236+
tracer.end_span_with_error(cycle_span, str(e), e)
207237
raise e
208238
except Exception as e:
239+
tracer.end_span_with_error(cycle_span, str(e), e)
209240
# Handle any other exceptions
210241
yield ForceStopEvent(reason=e)
211242
logger.exception("cycle failed")
212243
raise EventLoopException(e, invocation_state["request_state"]) from e
213244

214-
# Force structured output tool call if LLM didn't use it automatically
215-
if structured_output_context.is_enabled and stop_reason == "end_turn":
216-
if structured_output_context.force_attempted:
217-
raise StructuredOutputException(
218-
"The model failed to invoke the structured output tool even after it was forced."
219-
)
220-
structured_output_context.set_forced_mode()
221-
logger.debug("Forcing structured output tool")
222-
await agent._append_messages(
223-
{"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
224-
)
225-
226-
events = recurse_event_loop(
227-
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
228-
)
229-
async for typed_event in events:
230-
yield typed_event
231-
return
232-
233-
yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])
234-
235245

236246
async def recurse_event_loop(
237247
agent: "Agent",
@@ -314,20 +324,21 @@ async def _handle_model_execution(
314324
model_id=model_id,
315325
custom_trace_attributes=agent.trace_attributes,
316326
)
317-
with trace_api.use_span(model_invoke_span, end_on_exit=True):
318-
await agent.hooks.invoke_callbacks_async(
319-
BeforeModelCallEvent(
320-
agent=agent,
321-
invocation_state=invocation_state,
327+
with trace_api.use_span(model_invoke_span, end_on_exit=False):
328+
try:
329+
await agent.hooks.invoke_callbacks_async(
330+
BeforeModelCallEvent(
331+
agent=agent,
332+
invocation_state=invocation_state,
333+
)
322334
)
323-
)
324335

325-
if structured_output_context.forced_mode:
326-
tool_spec = structured_output_context.get_tool_spec()
327-
tool_specs = [tool_spec] if tool_spec else []
328-
else:
329-
tool_specs = agent.tool_registry.get_all_tool_specs()
330-
try:
336+
if structured_output_context.forced_mode:
337+
tool_spec = structured_output_context.get_tool_spec()
338+
tool_specs = [tool_spec] if tool_spec else []
339+
else:
340+
tool_specs = agent.tool_registry.get_all_tool_specs()
341+
331342
async for event in stream_messages(
332343
agent.model,
333344
agent.system_prompt,
@@ -360,17 +371,17 @@ async def _handle_model_execution(
360371
"stop_reason=<%s>, retry_requested=<True> | hook requested model retry",
361372
stop_reason,
362373
)
374+
tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
363375
continue # Retry the model call
364376

365377
if stop_reason == "max_tokens":
366378
message = recover_message_on_max_tokens_reached(message)
367379

368-
# Set attributes before span auto-closes
369380
tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
370381
break # Success! Break out of retry loop
371382

372383
except Exception as e:
373-
# Exception is automatically recorded by use_span with end_on_exit=True
384+
tracer.end_span_with_error(model_invoke_span, str(e), e)
374385
after_model_call_event = AfterModelCallEvent(
375386
agent=agent,
376387
invocation_state=invocation_state,
@@ -538,7 +549,7 @@ async def _handle_tool_execution(
538549
interrupts,
539550
structured_output=structured_output_result,
540551
)
541-
# Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
552+
# End the cycle span before yielding the recursive cycle.
542553
if cycle_span:
543554
tracer.end_event_loop_cycle_span(span=cycle_span, message=message)
544555

@@ -556,7 +567,7 @@ async def _handle_tool_execution(
556567

557568
yield ToolResultMessageEvent(message=tool_result_message)
558569

559-
# Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
570+
# End the cycle span before yielding the recursive cycle.
560571
if cycle_span:
561572
tracer.end_event_loop_cycle_span(span=cycle_span, message=message, tool_result_message=tool_result_message)
562573

src/strands/telemetry/tracer.py

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,17 @@ def _end_span(
185185
span: Span,
186186
attributes: dict[str, AttributeValue] | None = None,
187187
error: Exception | None = None,
188+
error_message: str | None = None,
188189
) -> None:
189190
"""Generic helper method to end a span.
190191
191192
Args:
192193
span: The span to end
193194
attributes: Optional attributes to set before ending the span
194195
error: Optional exception if an error occurred
196+
error_message: Optional error message to set in the span status
195197
"""
196-
if not span:
198+
if not span or not span.is_recording():
197199
return
198200

199201
try:
@@ -206,7 +208,8 @@ def _end_span(
206208

207209
# Handle error if present
208210
if error:
209-
span.set_status(StatusCode.ERROR, str(error))
211+
status_description = error_message or str(error) or type(error).__name__
212+
span.set_status(StatusCode.ERROR, status_description)
210213
span.record_exception(error)
211214
else:
212215
span.set_status(StatusCode.OK)
@@ -229,11 +232,11 @@ def end_span_with_error(self, span: Span, error_message: str, exception: Excepti
229232
error_message: Error message to set in the span status.
230233
exception: Optional exception to record in the span.
231234
"""
232-
if not span:
235+
if not span or not span.is_recording():
233236
return
234237

235238
error = exception or Exception(error_message)
236-
self._end_span(span, error=error)
239+
self._end_span(span, error=error, error_message=error_message)
237240

238241
def _add_event(
239242
self, span: Span | None, event_name: str, event_attributes: Attributes, to_span_attributes: bool = False
@@ -325,18 +328,15 @@ def end_model_invoke_span(
325328
) -> None:
326329
"""End a model invocation span with results and metrics.
327330
328-
Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
329-
Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.
330-
331331
Args:
332-
span: The span to set attributes on.
332+
span: The span to end.
333333
message: The message response from the model.
334334
usage: Token usage information from the model call.
335335
metrics: Metrics from the model call.
336336
stop_reason: The reason the model stopped generating.
337337
"""
338-
# Set end time attribute
339-
span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())
338+
if not span or not span.is_recording():
339+
return
340340

341341
attributes: dict[str, AttributeValue] = {
342342
"gen_ai.usage.prompt_tokens": usage["inputTokens"],
@@ -373,7 +373,7 @@ def end_model_invoke_span(
373373
event_attributes={"finish_reason": str(stop_reason), "message": serialize(message["content"])},
374374
)
375375

376-
span.set_attributes(attributes)
376+
self._end_span(span, attributes)
377377

378378
def start_tool_call_span(
379379
self,
@@ -548,20 +548,14 @@ def end_event_loop_cycle_span(
548548
) -> None:
549549
"""End an event loop cycle span with results.
550550
551-
Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
552-
Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.
553-
554551
Args:
555-
span: The span to set attributes on.
552+
span: The span to end.
556553
message: The message response from this cycle.
557554
tool_result_message: Optional tool result message if a tool was called.
558555
"""
559-
if not span:
556+
if not span or not span.is_recording():
560557
return
561558

562-
# Set end time attribute
563-
span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())
564-
565559
event_attributes: dict[str, AttributeValue] = {"message": serialize(message["content"])}
566560

567561
if tool_result_message:
@@ -586,6 +580,8 @@ def end_event_loop_cycle_span(
586580
else:
587581
self._add_event(span, "gen_ai.choice", event_attributes=event_attributes)
588582

583+
self._end_span(span)
584+
589585
def start_agent_span(
590586
self,
591587
messages: Messages,

tests/strands/event_loop/test_event_loop.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
from unittest.mock import ANY, AsyncMock, MagicMock, call, patch
66

77
import pytest
8+
from opentelemetry.sdk.trace import TracerProvider
9+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
10+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
811

912
import strands
1013
import strands.telemetry
@@ -19,6 +22,7 @@
1922
)
2023
from strands.interrupt import Interrupt, _InterruptState
2124
from strands.telemetry.metrics import EventLoopMetrics
25+
from strands.telemetry.tracer import Tracer
2226
from strands.tools.executors import SequentialToolExecutor
2327
from strands.tools.registry import ToolRegistry
2428
from strands.types._events import EventLoopStopEvent
@@ -578,6 +582,14 @@ async def test_event_loop_tracing_with_model_error(
578582
)
579583
await alist(stream)
580584

585+
assert mock_tracer.end_span_with_error.call_count == 2
586+
mock_tracer.end_span_with_error.assert_has_calls(
587+
[
588+
call(model_span, "Input too long", model.stream.side_effect),
589+
call(cycle_span, "Input too long", model.stream.side_effect),
590+
]
591+
)
592+
581593

582594
@pytest.mark.asyncio
583595
async def test_event_loop_cycle_max_tokens_exception(
@@ -668,6 +680,53 @@ async def test_event_loop_tracing_with_tool_execution(
668680
assert mock_tracer.end_model_invoke_span.call_count == 2
669681

670682

683+
@pytest.mark.asyncio
684+
async def test_event_loop_cycle_closes_cycle_span_before_recursive_cycle(
685+
agent,
686+
model,
687+
tool_stream,
688+
agenerator,
689+
alist,
690+
):
691+
exporter = InMemorySpanExporter()
692+
provider = TracerProvider()
693+
provider.add_span_processor(SimpleSpanProcessor(exporter))
694+
695+
tracer = Tracer()
696+
tracer.tracer_provider = provider
697+
tracer.tracer = provider.get_tracer(tracer.service_name)
698+
699+
async def delayed_text_stream():
700+
yield {"contentBlockDelta": {"delta": {"text": "test text"}}}
701+
await asyncio.sleep(0.05)
702+
yield {"contentBlockStop": {}}
703+
704+
agent.trace_span = None
705+
agent._system_prompt_content = None
706+
model.config = {"model_id": "test-model"}
707+
model.stream.side_effect = [
708+
agenerator(tool_stream),
709+
delayed_text_stream(),
710+
]
711+
712+
with patch("strands.event_loop.event_loop.get_tracer", return_value=tracer):
713+
stream = strands.event_loop.event_loop.event_loop_cycle(
714+
agent=agent,
715+
invocation_state={},
716+
)
717+
await alist(stream)
718+
719+
provider.force_flush()
720+
cycle_spans = sorted(
721+
[span for span in exporter.get_finished_spans() if span.name == "execute_event_loop_cycle"],
722+
key=lambda span: span.start_time,
723+
)
724+
725+
assert len(cycle_spans) == 2
726+
assert cycle_spans[0].end_time <= cycle_spans[1].start_time
727+
assert cycle_spans[0].end_time < cycle_spans[1].end_time
728+
729+
671730
@patch("strands.event_loop.event_loop.get_tracer")
672731
@pytest.mark.asyncio
673732
async def test_event_loop_tracing_with_throttling_exception(
@@ -704,6 +763,7 @@ async def test_event_loop_tracing_with_throttling_exception(
704763
)
705764
await alist(stream)
706765

766+
assert mock_tracer.end_span_with_error.call_count == 1
707767
# Verify span was created for the successful retry
708768
assert mock_tracer.start_model_invoke_span.call_count == 2
709769
assert mock_tracer.end_model_invoke_span.call_count == 1

0 commit comments

Comments
 (0)