Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def end_subscribe_scheduler_span(self) -> None:
assert self._scheduler_span is not None
self._scheduler_span.end()

def start_process_span(self) -> None:
def start_process_span(self) -> trace.Span:
assert self._subscribe_span is not None
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
publish_create_span_link: Optional[trace.Link] = None
Expand All @@ -186,6 +186,7 @@ def start_process_span(self) -> None:
end_on_exit=False,
) as process_span:
self._process_span = process_span
return process_span

def end_process_span(self) -> None:
assert self._process_span is not None
Expand All @@ -200,6 +201,13 @@ def add_process_span_event(self, event: str) -> None:
},
)

def __enter__(self) -> trace.Span:
return self.start_process_span()

def __exit__(self, exc_type, exc_val, traceback):
if self._process_span:
self.end_process_span()


def start_modack_span(
subscribe_span_links: List[trace.Link],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,10 @@ def _wrap_callback_errors(
try:
if message.opentelemetry_data:
message.opentelemetry_data.end_subscribe_concurrency_control_span()
message.opentelemetry_data.start_process_span()
callback(message)
with message.opentelemetry_data:
callback(message)
else:
callback(message)
except BaseException as exc:
# Note: the likelihood of this failing is extremely low. This just adds
# a message to a queue, so if this doesn't work the world is in an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2956,10 +2956,10 @@ def test_opentelemetry_subscriber_concurrency_control_span_end(span_exporter):
streaming_pull_manager._wrap_callback_errors(mock.Mock(), mock.Mock(), msg)

spans = span_exporter.get_finished_spans()
assert len(spans) == 1
assert len(spans) == 2

concurrency_control_span = spans[0]
concurrency_control_span.name == "subscriber concurrency control"
assert concurrency_control_span.name == "subscriber concurrency control"


def test_opentelemetry_wrap_callback_error(span_exporter):
Expand Down