Skip to content

Commit 3530d3b

Browse files
committed
fix: add _end_span method to prevent double-ending generators
1 parent f9ad052 commit 3530d3b

1 file changed

Lines changed: 48 additions & 40 deletions

File tree

langfuse/_client/observe.py

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,26 @@ def __init__(
560560
self.items: List[Any] = []
561561
self.span = span
562562
self.transform_fn = transform_fn
563+
self._ended = False
564+
565+
def _end_span(
566+
self, *, level: Optional[str] = None, status_message: Optional[str] = None
567+
) -> None:
568+
if self._ended:
569+
return
570+
self._ended = True
571+
572+
output: Any = self.items
573+
574+
if self.transform_fn is not None:
575+
output = self.transform_fn(self.items)
576+
elif all(isinstance(item, str) for item in self.items):
577+
output = "".join(self.items)
578+
579+
if level is not None:
580+
self.span.update(output=output, level=level, status_message=status_message).end()
581+
else:
582+
self.span.update(output=output).end()
563583

564584
def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
565585
return self
@@ -573,35 +593,19 @@ def __next__(self) -> Any:
573593
return item
574594

575595
except StopIteration:
576-
# Handle output and span cleanup when generator is exhausted
577-
output: Any = self.items
578-
579-
if self.transform_fn is not None:
580-
output = self.transform_fn(self.items)
581-
582-
elif all(isinstance(item, str) for item in self.items):
583-
output = "".join(self.items)
584-
585-
self.span.update(output=output).end()
596+
self._end_span()
586597

587598
raise # Re-raise StopIteration
588599

589600
except (Exception, asyncio.CancelledError) as e:
590-
self.span.update(
601+
self._end_span(
591602
level="ERROR", status_message=str(e) or type(e).__name__
592-
).end()
603+
)
593604

594605
raise
595606

596607
def close(self) -> None:
597-
output: Any = self.items
598-
599-
if self.transform_fn is not None:
600-
output = self.transform_fn(self.items)
601-
elif all(isinstance(item, str) for item in self.items):
602-
output = "".join(self.items)
603-
604-
self.span.update(output=output).end()
608+
self._end_span()
605609
self.generator.close()
606610

607611

@@ -630,6 +634,26 @@ def __init__(
630634
self.items: List[Any] = []
631635
self.span = span
632636
self.transform_fn = transform_fn
637+
self._ended = False
638+
639+
def _end_span(
640+
self, *, level: Optional[str] = None, status_message: Optional[str] = None
641+
) -> None:
642+
if self._ended:
643+
return
644+
self._ended = True
645+
646+
output: Any = self.items
647+
648+
if self.transform_fn is not None:
649+
output = self.transform_fn(self.items)
650+
elif all(isinstance(item, str) for item in self.items):
651+
output = "".join(self.items)
652+
653+
if level is not None:
654+
self.span.update(output=output, level=level, status_message=status_message).end()
655+
else:
656+
self.span.update(output=output).end()
633657

634658
def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
635659
return self
@@ -652,32 +676,16 @@ async def __anext__(self) -> Any:
652676
return item
653677

654678
except StopAsyncIteration:
655-
# Handle output and span cleanup when generator is exhausted
656-
output: Any = self.items
657-
658-
if self.transform_fn is not None:
659-
output = self.transform_fn(self.items)
660-
661-
elif all(isinstance(item, str) for item in self.items):
662-
output = "".join(self.items)
663-
664-
self.span.update(output=output).end()
679+
self._end_span()
665680

666681
raise # Re-raise StopAsyncIteration
667682
except (Exception, asyncio.CancelledError) as e:
668-
self.span.update(
683+
self._end_span(
669684
level="ERROR", status_message=str(e) or type(e).__name__
670-
).end()
685+
)
671686

672687
raise
673688

674689
async def aclose(self) -> None:
675-
output: Any = self.items
676-
677-
if self.transform_fn is not None:
678-
output = self.transform_fn(self.items)
679-
elif all(isinstance(item, str) for item in self.items):
680-
output = "".join(self.items)
681-
682-
self.span.update(output=output).end()
690+
self._end_span()
683691
await self.generator.aclose()

0 commit comments

Comments
 (0)