Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit b41d8ab

Browse files
committed
adjust logic to log ack requests
1 parent 7fbf91b commit b41d8ab

1 file changed

Lines changed: 8 additions & 9 deletions

File tree

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,6 @@ def _get_ack_errors(
214214
return info.metadata
215215
return None
216216

217-
218217
def _process_requests(
219218
error_status: Optional["status_pb2.Status"],
220219
ack_reqs_dict: Dict[str, requests.AckRequest],
@@ -230,17 +229,17 @@ def _process_requests(
230229
"""
231230
requests_completed = []
232231
requests_to_retry = []
233-
for ack_id, ack_request in ack_reqs_dict.items():
232+
for ack_id in ack_reqs_dict:
234233
# Debug logging: slow acks
235-
if ack_histogram and ack_request.time_to_ack > ack_histogram.percentile(
234+
if ack_histogram and ack_reqs_dict[ack_id].time_to_ack > ack_histogram.percentile(
236235
percent=99
237236
):
238237
_SLOW_ACK_LOGGER.debug(
239238
"Message (id=%s, ack_id=%s) ack duration of %s s is higher than the p99 ack duration",
240-
ack_request.message_id,
241-
ack_request.ack_id,
239+
ack_reqs_dict[ack_id].message_id,
240+
ack_reqs_dict[ack_id].ack_id,
242241
)
243-
242+
244243
# Handle special errors returned for ack/modack RPCs via the ErrorInfo
245244
# sidecar metadata when exactly-once delivery is enabled.
246245
if errors_dict and ack_id in errors_dict:
@@ -252,16 +251,16 @@ def _process_requests(
252251
exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
253252
else:
254253
exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)
255-
future = ack_request.future
254+
future = ack_reqs_dict[ack_id].future
256255
if future is not None:
257256
future.set_exception(exc)
258-
requests_completed.append(ack_request)
257+
requests_completed.append(ack_reqs_dict[ack_id])
259258
# Temporary GRPC errors are retried
260259
elif (
261260
error_status
262261
and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
263262
):
264-
requests_to_retry.append(ack_request)
263+
requests_to_retry.append(ack_reqs_dict[ack_id])
265264
# Other GRPC errors are NOT retried
266265
elif error_status:
267266
if error_status.code == code_pb2.PERMISSION_DENIED:

0 commit comments

Comments
 (0)