Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 19e341a

Browse files
committed
chore(x-goog-spanner-request-id): more updates for batch_write
This change plumbs in some x-goog-spanner-request-id updates for batch_write and some tests too. Updates #1261
1 parent 3a91671 commit 19e341a

File tree

6 files changed

+105
-56
lines changed

6 files changed

+105
-56
lines changed

google/cloud/spanner_v1/batch.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
_metadata_with_prefix,
2727
_metadata_with_leader_aware_routing,
2828
_merge_Transaction_Options,
29+
AtomicCounter,
2930
)
3031
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
3132
from google.cloud.spanner_v1 import RequestOptions
@@ -385,13 +386,22 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
385386
observability_options=observability_options,
386387
metadata=metadata,
387388
), MetricsCapture():
388-
method = functools.partial(
389-
api.batch_write,
390-
request=request,
391-
metadata=metadata,
392-
)
389+
attempt = AtomicCounter(0)
390+
nth_request = database._next_nth_request
391+
392+
def wrapped_method(*args, **kwargs):
393+
return functools.partial(
394+
api.batch_write,
395+
request=request,
396+
metadata=database.metadata_with_request_id(
397+
nth_request,
398+
attempt.increment(),
399+
metadata,
400+
),
401+
)(*args, **kwargs)
402+
393403
response = _retry(
394-
method,
404+
wrapped_method,
395405
allowed_exceptions={
396406
InternalServerError: _check_rst_stream_error,
397407
},

google/cloud/spanner_v1/testing/interceptors.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,6 @@ def reset(self):
7171

7272

7373
class XGoogRequestIDHeaderInterceptor(ClientInterceptor):
74-
# TODO:(@odeke-em): delete this guard when PR #1367 is merged.
75-
X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED = True
76-
7774
def __init__(self):
7875
self._unary_req_segments = []
7976
self._stream_req_segments = []
@@ -87,24 +84,23 @@ def intercept(self, method, request_or_iterator, call_details):
8784
x_goog_request_id = value
8885
break
8986

90-
if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED and not x_goog_request_id:
87+
if not x_goog_request_id:
9188
raise Exception(
9289
f"Missing {X_GOOG_REQUEST_ID} header in {call_details.method}"
9390
)
9491

9592
response_or_iterator = method(request_or_iterator, call_details)
9693
streaming = getattr(response_or_iterator, "__iter__", None) is not None
9794

98-
if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED:
99-
with self.__lock:
100-
if streaming:
101-
self._stream_req_segments.append(
102-
(call_details.method, parse_request_id(x_goog_request_id))
103-
)
104-
else:
105-
self._unary_req_segments.append(
106-
(call_details.method, parse_request_id(x_goog_request_id))
107-
)
95+
with self.__lock:
96+
if streaming:
97+
self._stream_req_segments.append(
98+
(call_details.method, parse_request_id(x_goog_request_id))
99+
)
100+
else:
101+
self._unary_req_segments.append(
102+
(call_details.method, parse_request_id(x_goog_request_id))
103+
)
108104

109105
return response_or_iterator
110106

tests/unit/test_batch.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,9 +596,14 @@ def _test_batch_write_with_request_options(
596596
"traceparent is missing in metadata",
597597
)
598598

599+
expected_metadata.append(
600+
(
601+
"x-goog-spanner-request-id",
602+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
603+
)
604+
)
599605
# Remove traceparent from actual metadata for comparison
600606
filtered_metadata = [item for item in metadata if item[0] != "traceparent"]
601-
602607
self.assertEqual(filtered_metadata, expected_metadata)
603608

604609
if request_options is None:

tests/unit/test_database.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ def _make_database_admin_api():
120120
def _make_spanner_api():
121121
from google.cloud.spanner_v1 import SpannerClient
122122

123-
return mock.create_autospec(SpannerClient, instance=True)
123+
api = mock.create_autospec(SpannerClient, instance=True)
124+
api._transport = "transport"
125+
return api
124126

125127
def test_ctor_defaults(self):
126128
from google.cloud.spanner_v1.pool import BurstyPool
@@ -1300,6 +1302,19 @@ def _execute_partitioned_dml_helper(
13001302
],
13011303
)
13021304
self.assertEqual(api.begin_transaction.call_count, 2)
1305+
api.begin_transaction.assert_called_with(
1306+
session=session.name,
1307+
options=txn_options,
1308+
metadata=[
1309+
("google-cloud-resource-prefix", database.name),
1310+
("x-goog-spanner-route-to-leader", "true"),
1311+
(
1312+
"x-goog-spanner-request-id",
1313+
# Please note that this try was by an abort and not from service unavailable.
1314+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.3.1",
1315+
),
1316+
],
1317+
)
13031318
else:
13041319
api.begin_transaction.assert_called_with(
13051320
session=session.name,
@@ -1314,6 +1329,18 @@ def _execute_partitioned_dml_helper(
13141329
],
13151330
)
13161331
self.assertEqual(api.begin_transaction.call_count, 1)
1332+
api.begin_transaction.assert_called_with(
1333+
session=session.name,
1334+
options=txn_options,
1335+
metadata=[
1336+
("google-cloud-resource-prefix", database.name),
1337+
("x-goog-spanner-route-to-leader", "true"),
1338+
(
1339+
"x-goog-spanner-request-id",
1340+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
1341+
),
1342+
],
1343+
)
13171344

13181345
if params:
13191346
expected_params = Struct(
@@ -3241,6 +3268,10 @@ def test_context_mgr_success(self):
32413268
metadata=[
32423269
("google-cloud-resource-prefix", database.name),
32433270
("x-goog-spanner-route-to-leader", "true"),
3271+
(
3272+
"x-goog-spanner-request-id",
3273+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
3274+
),
32443275
],
32453276
)
32463277

tests/unit/test_snapshot.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
)
2727
from google.cloud.spanner_v1._helpers import (
2828
_metadata_with_request_id,
29+
AtomicCounter,
2930
)
3031
from google.cloud.spanner_v1.param_types import INT64
3132
from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID
@@ -165,7 +166,7 @@ def test_iteration_w_empty_raw(self):
165166
metadata=[
166167
(
167168
"x-goog-spanner-request-id",
168-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
169+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
169170
)
170171
],
171172
)
@@ -187,7 +188,7 @@ def test_iteration_w_non_empty_raw(self):
187188
metadata=[
188189
(
189190
"x-goog-spanner-request-id",
190-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
191+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
191192
)
192193
],
193194
)
@@ -214,7 +215,7 @@ def test_iteration_w_raw_w_resume_tken(self):
214215
metadata=[
215216
(
216217
"x-goog-spanner-request-id",
217-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
218+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
218219
)
219220
],
220221
)
@@ -293,7 +294,7 @@ def test_iteration_w_raw_raising_non_retryable_internal_error_no_token(self):
293294
metadata=[
294295
(
295296
"x-goog-spanner-request-id",
296-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
297+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
297298
)
298299
],
299300
)
@@ -371,7 +372,7 @@ def test_iteration_w_raw_raising_non_retryable_internal_error(self):
371372
metadata=[
372373
(
373374
"x-goog-spanner-request-id",
374-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
375+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
375376
)
376377
],
377378
)
@@ -550,7 +551,7 @@ def test_iteration_w_raw_raising_non_retryable_internal_error_after_token(self):
550551
metadata=[
551552
(
552553
"x-goog-spanner-request-id",
553-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
554+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
554555
)
555556
],
556557
)
@@ -1089,7 +1090,7 @@ def _execute_sql_helper(
10891090
("google-cloud-resource-prefix", database.name),
10901091
(
10911092
"x-goog-spanner-request-id",
1092-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
1093+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
10931094
),
10941095
],
10951096
timeout=timeout,
@@ -1266,7 +1267,7 @@ def _partition_read_helper(
12661267
("x-goog-spanner-route-to-leader", "true"),
12671268
(
12681269
"x-goog-spanner-request-id",
1269-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
1270+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
12701271
),
12711272
],
12721273
retry=retry,
@@ -1449,7 +1450,7 @@ def _partition_query_helper(
14491450
("x-goog-spanner-route-to-leader", "true"),
14501451
(
14511452
"x-goog-spanner-request-id",
1452-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
1453+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1",
14531454
),
14541455
],
14551456
retry=retry,
@@ -1906,10 +1907,18 @@ def test_begin_ok_exact_strong(self):
19061907

19071908

19081909
class _Client(object):
1910+
NTH_CLIENT = AtomicCounter()
1911+
19091912
def __init__(self):
19101913
from google.cloud.spanner_v1 import ExecuteSqlRequest
19111914

19121915
self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1")
1916+
self._nth_client_id = _Client.NTH_CLIENT.increment()
1917+
self._nth_request = AtomicCounter()
1918+
1919+
@property
1920+
def _next_nth_request(self):
1921+
return self._nth_request.increment()
19131922

19141923

19151924
class _Instance(object):

0 commit comments

Comments
 (0)