Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit de4eac6

Browse files
committed
adjust message logic
1 parent e4140d5 commit de4eac6

File tree

2 files changed

+67
-18
lines changed

2 files changed

+67
-18
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@
6060

6161

6262
_LOGGER = logging.getLogger(__name__)
63+
_SLOW_ACK_LOGGER = logging.getLogger("slow-ack")
64+
_STREAMS_LOGGER = logging.getLogger("subscriber-streams")
65+
_FLOW_CONTROL_LOGGER = logging.getLogger("subscriber-flow-control")
66+
_CALLBACK_DELIVERY_LOGGER = logging.getLogger("callback-delivery")
67+
_CALLBACK_EXCEPTION_LOGGER = logging.getLogger("callback-exceptions")
68+
_EXPIRY_LOGGER = logging.getLogger("expiry")
6369
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
6470
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
6571
_RETRYABLE_STREAM_ERRORS = (
@@ -145,6 +151,14 @@ def _wrap_callback_errors(
145151
callback: The user callback.
146152
message: The Pub/Sub message.
147153
"""
154+
_CALLBACK_DELIVERY_LOGGER.debug(
155+
"Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s) received by subscriber callback",
156+
message.message_id,
157+
message.ack_id,
158+
message.ordering_key,
159+
message.exactly_once_enabled,
160+
)
161+
148162
try:
149163
if message.opentelemetry_data:
150164
message.opentelemetry_data.end_subscribe_concurrency_control_span()
@@ -156,9 +170,15 @@ def _wrap_callback_errors(
156170
# Note: the likelihood of this failing is extremely low. This just adds
157171
# a message to a queue, so if this doesn't work the world is in an
158172
# unrecoverable state and this thread should just bail.
159-
_LOGGER.exception(
160-
"Top-level exception occurred in callback while processing a message"
173+
174+
_CALLBACK_EXCEPTION_LOGGER.exception(
175+
"Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)'s callback threw exception, nacking message.",
176+
message.message_id,
177+
message.ack_id,
178+
message.ordering_key,
179+
message.exactly_once_enabled,
161180
)
181+
162182
message.nack()
163183
on_callback_error(exc)
164184

@@ -199,6 +219,7 @@ def _process_requests(
199219
error_status: Optional["status_pb2.Status"],
200220
ack_reqs_dict: Dict[str, requests.AckRequest],
201221
errors_dict: Optional[Dict[str, str]],
222+
ack_histogram: Optional[histogram.Histogram] = None,
202223
):
203224
"""Process requests when exactly-once delivery is enabled by referring to
204225
error_status and errors_dict.
@@ -210,6 +231,16 @@ def _process_requests(
210231
requests_completed = []
211232
requests_to_retry = []
212233
for ack_id in ack_reqs_dict:
234+
# Debug logging: slow acks
235+
if ack_histogram and ack_reqs_dict[
236+
ack_id
237+
].time_to_ack > ack_histogram.percentile(percent=99):
238+
_SLOW_ACK_LOGGER.debug(
239+
"Message (id=%s, ack_id=%s) ack duration of %s s is higher than the p99 ack duration",
240+
ack_reqs_dict[ack_id].message_id,
241+
ack_reqs_dict[ack_id].ack_id, ack_reqs_dict[ack_id].time_to_ack,
242+
)
243+
213244
# Handle special errors returned for ack/modack RPCs via the ErrorInfo
214245
# sidecar metadata when exactly-once delivery is enabled.
215246
if errors_dict and ack_id in errors_dict:
@@ -560,8 +591,10 @@ def maybe_pause_consumer(self) -> None:
560591
with self._pause_resume_lock:
561592
if self.load >= _MAX_LOAD:
562593
if self._consumer is not None and not self._consumer.is_paused:
563-
_LOGGER.debug(
564-
"Message backlog over load at %.2f, pausing.", self.load
594+
_FLOW_CONTROL_LOGGER.debug(
595+
"Message backlog over load at %.2f (threshold %.2f), initiating client-side flow control",
596+
self.load,
597+
_MAX_LOAD,
565598
)
566599
self._consumer.pause()
567600

@@ -588,10 +621,18 @@ def maybe_resume_consumer(self) -> None:
588621
self._maybe_release_messages()
589622

590623
if self.load < _RESUME_THRESHOLD:
591-
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
624+
_FLOW_CONTROL_LOGGER.debug(
625+
"Current load is %.2f (threshold %.2f), suspending client-side flow control.",
626+
self.load,
627+
_RESUME_THRESHOLD,
628+
)
592629
self._consumer.resume()
593630
else:
594-
_LOGGER.debug("Did not resume, current load is %.2f.", self.load)
631+
_FLOW_CONTROL_LOGGER.debug(
632+
"Current load is %.2f (threshold %.2f), retaining client-side flow control.",
633+
self.load,
634+
_RESUME_THRESHOLD,
635+
)
595636

596637
def _maybe_release_messages(self) -> None:
597638
"""Release (some of) the held messages if the current load allows for it.
@@ -702,7 +743,7 @@ def send_unary_ack(
702743

703744
if self._exactly_once_delivery_enabled():
704745
requests_completed, requests_to_retry = _process_requests(
705-
error_status, ack_reqs_dict, ack_errors_dict
746+
error_status, ack_reqs_dict, ack_errors_dict, self.ack_histogram
706747
)
707748
else:
708749
requests_completed = []
@@ -796,7 +837,7 @@ def send_unary_modack(
796837

797838
if self._exactly_once_delivery_enabled():
798839
requests_completed, requests_to_retry = _process_requests(
799-
error_status, ack_reqs_dict, modack_errors_dict
840+
error_status, ack_reqs_dict, modack_errors_dict, self.ack_histogram
800841
)
801842
else:
802843
requests_completed = []
@@ -1239,6 +1280,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
12391280
receipt_modack=True,
12401281
)
12411282

1283+
if len(expired_ack_ids):
1284+
_EXPIRY_LOGGER.debug(
1285+
"ack ids %s were dropped as they have already expired.", expired_ack_ids
1286+
)
1287+
12421288
with self._pause_resume_lock:
12431289
if self._scheduler is None or self._leaser is None:
12441290
_LOGGER.debug(
@@ -1304,9 +1350,13 @@ def _should_recover(self, exception: BaseException) -> bool:
13041350
# If this is in the list of idempotent exceptions, then we want to
13051351
# recover.
13061352
if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
1307-
_LOGGER.debug("Observed recoverable stream error %s", exception)
1353+
_STREAMS_LOGGER.debug(
1354+
"Observed recoverable stream error %s, reopening stream", exception
1355+
)
13081356
return True
1309-
_LOGGER.debug("Observed non-recoverable stream error %s", exception)
1357+
_STREAMS_LOGGER.debug(
1358+
"Observed non-recoverable stream error %s, shutting down stream", exception
1359+
)
13101360
return False
13111361

13121362
def _should_terminate(self, exception: BaseException) -> bool:
@@ -1326,9 +1376,13 @@ def _should_terminate(self, exception: BaseException) -> bool:
13261376
is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
13271377
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
13281378
if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
1329-
_LOGGER.debug("Observed terminating stream error %s", exception)
1379+
_STREAMS_LOGGER.debug(
1380+
"Observed terminating stream error %s, shutting down stream", exception
1381+
)
13301382
return True
1331-
_LOGGER.debug("Observed non-terminating stream error %s", exception)
1383+
_STREAMS_LOGGER.debug(
1384+
"Observed non-terminating stream error %s, attempting to reopen", exception
1385+
)
13321386
return False
13331387

13341388
def _on_rpc_done(self, future: Any) -> None:

google/cloud/pubsub_v1/subscriber/message.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def __init__(
133133
self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None
134134
self._request_queue = request_queue
135135
self._exactly_once_delivery_enabled_func = exactly_once_delivery_enabled_func
136-
self._message_id = message.message_id
136+
self.message_id = message.message_id
137137

138138
# The instantiation time is the time that this message
139139
# was received. Tracking this provides us a way to be smart about
@@ -231,11 +231,6 @@ def ack_id(self) -> str:
231231
"""the ID used to ack the message."""
232232
return self._ack_id
233233

234-
@property
235-
def message_id(self) -> str:
236-
"""The message id of the message"""
237-
return self._message_id
238-
239234
@property
240235
def delivery_attempt(self) -> Optional[int]:
241236
"""The delivery attempt counter is 1 + (the sum of number of NACKs

0 commit comments

Comments
 (0)