|
25 | 25 | from google.cloud.spanner_v1._helpers import ( |
26 | 26 | _metadata_with_prefix, |
27 | 27 | _metadata_with_leader_aware_routing, |
| 28 | + AtomicCounter, |
28 | 29 | ) |
29 | 30 | from google.cloud.spanner_v1._opentelemetry_tracing import trace_call |
30 | 31 | from google.cloud.spanner_v1 import RequestOptions |
@@ -222,13 +223,24 @@ def commit( |
222 | 223 | trace_attributes, |
223 | 224 | observability_options=observability_options, |
224 | 225 | ): |
225 | | - method = functools.partial( |
226 | | - api.commit, |
227 | | - request=request, |
228 | | - metadata=metadata, |
229 | | - ) |
| 226 | + attempt = AtomicCounter(0) |
| 227 | + next_nth_request = database._next_nth_request |
| 228 | + |
| 229 | + def wrapped_method(*args, **kwargs): |
| 230 | + all_metadata = database.metadata_with_request_id( |
| 231 | + next_nth_request, |
| 232 | + attempt.increment(), |
| 233 | + metadata, |
| 234 | + ) |
| 235 | + method = functools.partial( |
| 236 | + api.commit, |
| 237 | + request=request, |
| 238 | + metadata=all_metadata, |
| 239 | + ) |
| 240 | + return method(*args, **kwargs) |
| 241 | + |
230 | 242 | response = _retry( |
231 | | - method, |
| 243 | + wrapped_method, |
232 | 244 | allowed_exceptions={InternalServerError: _check_rst_stream_error}, |
233 | 245 | ) |
234 | 246 | self.committed = response.commit_timestamp |
@@ -341,13 +353,24 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals |
341 | 353 | trace_attributes, |
342 | 354 | observability_options=observability_options, |
343 | 355 | ): |
344 | | - method = functools.partial( |
345 | | - api.batch_write, |
346 | | - request=request, |
347 | | - metadata=metadata, |
348 | | - ) |
| 356 | + attempt = AtomicCounter(0) |
| 357 | + next_nth_request = database._next_nth_request |
| 358 | + |
| 359 | + def wrapped_method(*args, **kwargs): |
| 360 | + all_metadata = database.metadata_with_request_id( |
| 361 | + next_nth_request, |
| 362 | + attempt.increment(), |
| 363 | + metadata, |
| 364 | + ) |
| 365 | + method = functools.partial( |
| 366 | + api.batch_write, |
| 367 | + request=request, |
| 368 | + metadata=all_metadata, |
| 369 | + ) |
| 370 | + return method(*args, **kwargs) |
| 371 | + |
349 | 372 | response = _retry( |
350 | | - method, |
| 373 | + wrapped_method, |
351 | 374 | allowed_exceptions={InternalServerError: _check_rst_stream_error}, |
352 | 375 | ) |
353 | 376 | self.committed = True |
|
0 commit comments