Skip to content

Commit 028154b

Browse files
Di-Iszastrowm
authored and committed
fix: restore explicit span.end() to fix span end_time regression
PR #1293 wrapped event_loop_cycle() in use_span(end_on_exit=True) and removed explicit span.end() calls. Because event_loop_cycle is an async generator, yield keeps the context manager open across recursive cycles, causing all execute_event_loop_cycle spans to share the same OTel end_time. Switch to end_on_exit=False and explicitly call span.end() via _end_span() in end_event_loop_cycle_span() and end_model_invoke_span(), restoring end_span_with_error() in all exception paths.
1 parent 5391794 commit 028154b

File tree

5 files changed

+262
-76
lines changed

5 files changed

+262
-76
lines changed

src/strands/event_loop/event_loop.py

Lines changed: 68 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -139,25 +139,29 @@ async def event_loop_cycle(
139139
)
140140
invocation_state["event_loop_cycle_span"] = cycle_span
141141

142-
with trace_api.use_span(cycle_span, end_on_exit=True):
143-
# Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls.
144-
if agent._interrupt_state.activated:
145-
stop_reason: StopReason = "tool_use"
146-
message = agent._interrupt_state.context["tool_use_message"]
147-
# Skip model invocation if the latest message contains ToolUse
148-
elif _has_tool_use_in_latest_message(agent.messages):
149-
stop_reason = "tool_use"
150-
message = agent.messages[-1]
151-
else:
152-
model_events = _handle_model_execution(
153-
agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context
154-
)
155-
async for model_event in model_events:
156-
if not isinstance(model_event, ModelStopReason):
157-
yield model_event
142+
with trace_api.use_span(cycle_span, end_on_exit=False):
143+
try:
144+
# Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls.
145+
if agent._interrupt_state.activated:
146+
stop_reason: StopReason = "tool_use"
147+
message = agent._interrupt_state.context["tool_use_message"]
148+
# Skip model invocation if the latest message contains ToolUse
149+
elif _has_tool_use_in_latest_message(agent.messages):
150+
stop_reason = "tool_use"
151+
message = agent.messages[-1]
152+
else:
153+
model_events = _handle_model_execution(
154+
agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context
155+
)
156+
async for model_event in model_events:
157+
if not isinstance(model_event, ModelStopReason):
158+
yield model_event
158159

159-
stop_reason, message, *_ = model_event["stop"]
160-
yield ModelMessageEvent(message=message)
160+
stop_reason, message, *_ = model_event["stop"]
161+
yield ModelMessageEvent(message=message)
162+
except Exception as e:
163+
tracer.end_span_with_error(cycle_span, str(e), e)
164+
raise
161165

162166
try:
163167
if stop_reason == "max_tokens":
@@ -196,42 +200,48 @@ async def event_loop_cycle(
196200

197201
# End the cycle and return results
198202
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)
199-
# Set attributes before span auto-closes
203+
204+
# Force structured output tool call if LLM didn't use it automatically
205+
if structured_output_context.is_enabled and stop_reason == "end_turn":
206+
if structured_output_context.force_attempted:
207+
raise StructuredOutputException(
208+
"The model failed to invoke the structured output tool even after it was forced."
209+
)
210+
structured_output_context.set_forced_mode()
211+
logger.debug("Forcing structured output tool")
212+
await agent._append_messages(
213+
{"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
214+
)
215+
216+
tracer.end_event_loop_cycle_span(cycle_span, message)
217+
events = recurse_event_loop(
218+
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
219+
)
220+
async for typed_event in events:
221+
yield typed_event
222+
return
223+
200224
tracer.end_event_loop_cycle_span(cycle_span, message)
201-
except EventLoopException:
225+
yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])
226+
except StructuredOutputException as e:
227+
tracer.end_span_with_error(cycle_span, str(e), e)
228+
raise
229+
except EventLoopException as e:
230+
tracer.end_span_with_error(cycle_span, str(e), e)
202231
# Don't yield or log the exception - we already did it when we
203232
# raised the exception and we don't need that duplication.
204233
raise
205234
except (ContextWindowOverflowException, MaxTokensReachedException) as e:
206235
# Special cased exceptions which we want to bubble up rather than get wrapped in an EventLoopException
236+
tracer.end_span_with_error(cycle_span, str(e), e)
207237
raise e
208238
except Exception as e:
239+
tracer.end_span_with_error(cycle_span, str(e), e)
209240
# Handle any other exceptions
210241
yield ForceStopEvent(reason=e)
211242
logger.exception("cycle failed")
212243
raise EventLoopException(e, invocation_state["request_state"]) from e
213244

214-
# Force structured output tool call if LLM didn't use it automatically
215-
if structured_output_context.is_enabled and stop_reason == "end_turn":
216-
if structured_output_context.force_attempted:
217-
raise StructuredOutputException(
218-
"The model failed to invoke the structured output tool even after it was forced."
219-
)
220-
structured_output_context.set_forced_mode()
221-
logger.debug("Forcing structured output tool")
222-
await agent._append_messages(
223-
{"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
224-
)
225-
226-
events = recurse_event_loop(
227-
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
228-
)
229-
async for typed_event in events:
230-
yield typed_event
231-
return
232-
233-
yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])
234-
235245

236246
async def recurse_event_loop(
237247
agent: "Agent",
@@ -316,20 +326,21 @@ async def _handle_model_execution(
316326
system_prompt=agent.system_prompt,
317327
system_prompt_content=agent._system_prompt_content,
318328
)
319-
with trace_api.use_span(model_invoke_span, end_on_exit=True):
320-
await agent.hooks.invoke_callbacks_async(
321-
BeforeModelCallEvent(
322-
agent=agent,
323-
invocation_state=invocation_state,
329+
with trace_api.use_span(model_invoke_span, end_on_exit=False):
330+
try:
331+
await agent.hooks.invoke_callbacks_async(
332+
BeforeModelCallEvent(
333+
agent=agent,
334+
invocation_state=invocation_state,
335+
)
324336
)
325-
)
326337

327-
if structured_output_context.forced_mode:
328-
tool_spec = structured_output_context.get_tool_spec()
329-
tool_specs = [tool_spec] if tool_spec else []
330-
else:
331-
tool_specs = agent.tool_registry.get_all_tool_specs()
332-
try:
338+
if structured_output_context.forced_mode:
339+
tool_spec = structured_output_context.get_tool_spec()
340+
tool_specs = [tool_spec] if tool_spec else []
341+
else:
342+
tool_specs = agent.tool_registry.get_all_tool_specs()
343+
333344
async for event in stream_messages(
334345
agent.model,
335346
agent.system_prompt,
@@ -363,17 +374,17 @@ async def _handle_model_execution(
363374
"stop_reason=<%s>, retry_requested=<True> | hook requested model retry",
364375
stop_reason,
365376
)
377+
tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
366378
continue # Retry the model call
367379

368380
if stop_reason == "max_tokens":
369381
message = recover_message_on_max_tokens_reached(message)
370382

371-
# Set attributes before span auto-closes
372383
tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
373384
break # Success! Break out of retry loop
374385

375386
except Exception as e:
376-
# Exception is automatically recorded by use_span with end_on_exit=True
387+
tracer.end_span_with_error(model_invoke_span, str(e), e)
377388
after_model_call_event = AfterModelCallEvent(
378389
agent=agent,
379390
invocation_state=invocation_state,
@@ -541,7 +552,7 @@ async def _handle_tool_execution(
541552
interrupts,
542553
structured_output=structured_output_result,
543554
)
544-
# Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
555+
# End the cycle span before yielding the recursive cycle.
545556
if cycle_span:
546557
tracer.end_event_loop_cycle_span(span=cycle_span, message=message)
547558

@@ -559,7 +570,7 @@ async def _handle_tool_execution(
559570

560571
yield ToolResultMessageEvent(message=tool_result_message)
561572

562-
# Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
573+
# End the cycle span before yielding the recursive cycle.
563574
if cycle_span:
564575
tracer.end_event_loop_cycle_span(span=cycle_span, message=message, tool_result_message=tool_result_message)
565576

src/strands/telemetry/tracer.py

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,17 @@ def _end_span(
185185
span: Span,
186186
attributes: dict[str, AttributeValue] | None = None,
187187
error: Exception | None = None,
188+
error_message: str | None = None,
188189
) -> None:
189190
"""Generic helper method to end a span.
190191
191192
Args:
192193
span: The span to end
193194
attributes: Optional attributes to set before ending the span
194195
error: Optional exception if an error occurred
196+
error_message: Optional error message to set in the span status
195197
"""
196-
if not span:
198+
if not span or not span.is_recording():
197199
return
198200

199201
try:
@@ -206,7 +208,8 @@ def _end_span(
206208

207209
# Handle error if present
208210
if error:
209-
span.set_status(StatusCode.ERROR, str(error))
211+
status_description = error_message or str(error) or type(error).__name__
212+
span.set_status(StatusCode.ERROR, status_description)
210213
span.record_exception(error)
211214
else:
212215
span.set_status(StatusCode.OK)
@@ -229,11 +232,11 @@ def end_span_with_error(self, span: Span, error_message: str, exception: Excepti
229232
error_message: Error message to set in the span status.
230233
exception: Optional exception to record in the span.
231234
"""
232-
if not span:
235+
if not span or not span.is_recording():
233236
return
234237

235238
error = exception or Exception(error_message)
236-
self._end_span(span, error=error)
239+
self._end_span(span, error=error, error_message=error_message)
237240

238241
def _add_event(
239242
self, span: Span | None, event_name: str, event_attributes: Attributes, to_span_attributes: bool = False
@@ -330,18 +333,15 @@ def end_model_invoke_span(
330333
) -> None:
331334
"""End a model invocation span with results and metrics.
332335
333-
Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
334-
Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.
335-
336336
Args:
337-
span: The span to set attributes on.
337+
span: The span to end.
338338
message: The message response from the model.
339339
usage: Token usage information from the model call.
340340
metrics: Metrics from the model call.
341341
stop_reason: The reason the model stopped generating.
342342
"""
343-
# Set end time attribute
344-
span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())
343+
if not span or not span.is_recording():
344+
return
345345

346346
attributes: dict[str, AttributeValue] = {
347347
"gen_ai.usage.prompt_tokens": usage["inputTokens"],
@@ -378,7 +378,7 @@ def end_model_invoke_span(
378378
event_attributes={"finish_reason": str(stop_reason), "message": serialize(message["content"])},
379379
)
380380

381-
span.set_attributes(attributes)
381+
self._end_span(span, attributes)
382382

383383
def start_tool_call_span(
384384
self,
@@ -553,20 +553,14 @@ def end_event_loop_cycle_span(
553553
) -> None:
554554
"""End an event loop cycle span with results.
555555
556-
Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
557-
Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.
558-
559556
Args:
560-
span: The span to set attributes on.
557+
span: The span to end.
561558
message: The message response from this cycle.
562559
tool_result_message: Optional tool result message if a tool was called.
563560
"""
564-
if not span:
561+
if not span or not span.is_recording():
565562
return
566563

567-
# Set end time attribute
568-
span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())
569-
570564
event_attributes: dict[str, AttributeValue] = {"message": serialize(message["content"])}
571565

572566
if tool_result_message:
@@ -591,6 +585,8 @@ def end_event_loop_cycle_span(
591585
else:
592586
self._add_event(span, "gen_ai.choice", event_attributes=event_attributes)
593587

588+
self._end_span(span)
589+
594590
def start_agent_span(
595591
self,
596592
messages: Messages,

tests/strands/event_loop/test_event_loop.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
from unittest.mock import ANY, AsyncMock, MagicMock, call, patch
66

77
import pytest
8+
from opentelemetry.sdk.trace import TracerProvider
9+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
10+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
811

912
import strands
1013
import strands.telemetry
@@ -19,6 +22,7 @@
1922
)
2023
from strands.interrupt import Interrupt, _InterruptState
2124
from strands.telemetry.metrics import EventLoopMetrics
25+
from strands.telemetry.tracer import Tracer
2226
from strands.tools.executors import SequentialToolExecutor
2327
from strands.tools.registry import ToolRegistry
2428
from strands.types._events import EventLoopStopEvent
@@ -583,6 +587,14 @@ async def test_event_loop_tracing_with_model_error(
583587
)
584588
await alist(stream)
585589

590+
assert mock_tracer.end_span_with_error.call_count == 2
591+
mock_tracer.end_span_with_error.assert_has_calls(
592+
[
593+
call(model_span, "Input too long", model.stream.side_effect),
594+
call(cycle_span, "Input too long", model.stream.side_effect),
595+
]
596+
)
597+
586598

587599
@pytest.mark.asyncio
588600
async def test_event_loop_cycle_max_tokens_exception(
@@ -673,6 +685,53 @@ async def test_event_loop_tracing_with_tool_execution(
673685
assert mock_tracer.end_model_invoke_span.call_count == 2
674686

675687

688+
@pytest.mark.asyncio
689+
async def test_event_loop_cycle_closes_cycle_span_before_recursive_cycle(
690+
agent,
691+
model,
692+
tool_stream,
693+
agenerator,
694+
alist,
695+
):
696+
exporter = InMemorySpanExporter()
697+
provider = TracerProvider()
698+
provider.add_span_processor(SimpleSpanProcessor(exporter))
699+
700+
tracer = Tracer()
701+
tracer.tracer_provider = provider
702+
tracer.tracer = provider.get_tracer(tracer.service_name)
703+
704+
async def delayed_text_stream():
705+
yield {"contentBlockDelta": {"delta": {"text": "test text"}}}
706+
await asyncio.sleep(0.05)
707+
yield {"contentBlockStop": {}}
708+
709+
agent.trace_span = None
710+
agent._system_prompt_content = None
711+
model.config = {"model_id": "test-model"}
712+
model.stream.side_effect = [
713+
agenerator(tool_stream),
714+
delayed_text_stream(),
715+
]
716+
717+
with patch("strands.event_loop.event_loop.get_tracer", return_value=tracer):
718+
stream = strands.event_loop.event_loop.event_loop_cycle(
719+
agent=agent,
720+
invocation_state={},
721+
)
722+
await alist(stream)
723+
724+
provider.force_flush()
725+
cycle_spans = sorted(
726+
[span for span in exporter.get_finished_spans() if span.name == "execute_event_loop_cycle"],
727+
key=lambda span: span.start_time,
728+
)
729+
730+
assert len(cycle_spans) == 2
731+
assert cycle_spans[0].end_time <= cycle_spans[1].start_time
732+
assert cycle_spans[0].end_time < cycle_spans[1].end_time
733+
734+
676735
@patch("strands.event_loop.event_loop.get_tracer")
677736
@pytest.mark.asyncio
678737
async def test_event_loop_tracing_with_throttling_exception(
@@ -709,6 +768,7 @@ async def test_event_loop_tracing_with_throttling_exception(
709768
)
710769
await alist(stream)
711770

771+
assert mock_tracer.end_span_with_error.call_count == 1
712772
# Verify span was created for the successful retry
713773
assert mock_tracer.start_model_invoke_span.call_count == 2
714774
assert mock_tracer.end_model_invoke_span.call_count == 1

0 commit comments

Comments
 (0)