Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 8880472

Browse files
committed
edits to correct for failing tests
1 parent 0fa4f69 commit 8880472

2 files changed

Lines changed: 34 additions & 12 deletions

File tree

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,13 +1041,10 @@ def _shutdown(self, reason: Any = None) -> None:
10411041
assert self._leaser is not None
10421042
self._leaser.stop()
10431043

1044-
total = len(dropped_messages) + len(
1045-
self._messages_on_hold._messages_on_hold
1046-
)
1044+
on_hold_msgs = self._messages_on_hold._messages_on_hold
1045+
total = len(dropped_messages) + len(on_hold_msgs)
10471046
_LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
1048-
messages_to_nack = itertools.chain(
1049-
dropped_messages, self._messages_on_hold._messages_on_hold
1050-
)
1047+
messages_to_nack = itertools.chain(dropped_messages, on_hold_msgs)
10511048
for msg in messages_to_nack:
10521049
msg.nack()
10531050

tests/unit/pubsub_v1/publisher/test_publisher_client.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -308,13 +308,38 @@ def test_opentelemetry_flow_control_exception(creds, span_exporter):
308308
future2.result()
309309

310310
spans = span_exporter.get_finished_spans()
311-
# The number of spans created is non-deterministic.
312-
# The first successful publish can create 2 or 4 spans. The second,
313-
# failing publish creates 2 spans.
314-
assert len(spans) in [4, 6]
315311

316-
failed_flow_control_span = spans[2]
317-
finished_publish_create_span = spans[3]
312+
failed_flow_control_span = None
313+
for span in spans:
314+
if (
315+
span.name == "publisher flow control"
316+
and span.status.status_code == trace.StatusCode.ERROR
317+
):
318+
failed_flow_control_span = span
319+
break
320+
assert failed_flow_control_span is not None, "Failed flow control span not found"
321+
322+
finished_publish_create_span = None
323+
for span in spans:
324+
if (
325+
span.name == "topicID create"
326+
and span.status.status_code == trace.StatusCode.ERROR
327+
):
328+
# We need the span for the second publish, which is the one that failed.
329+
# This is a bit heuristic, but the one with the flow control error as a child.
330+
for child_span in spans:
331+
if (
332+
child_span.parent
333+
and child_span.parent.span_id == span.get_span_context().span_id
334+
and child_span.name == "publisher flow control"
335+
):
336+
finished_publish_create_span = span
337+
break
338+
if finished_publish_create_span:
339+
break
340+
assert (
341+
finished_publish_create_span is not None
342+
), "Finished publish create span with flow control error not found"
318343

319344
# Verify failed flow control span values.
320345
assert failed_flow_control_span.name == "publisher flow control"

0 commit comments

Comments
 (0)