Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit e42f099

Browse files
committed
feat: Multiplexed sessions - Only populate previous transaction ID for transactions with multiplexed session.
Signed-off-by: Taylor Curran <taylor.curran@improving.com>
1 parent 18f1a6c commit e42f099

File tree

3 files changed

+107
-26
lines changed

3 files changed

+107
-26
lines changed

google/cloud/spanner_v1/session.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -530,9 +530,11 @@ def run_in_transaction(self, func, *args, **kw):
530530
) as span, MetricsCapture():
531531
attempts: int = 0
532532

533-
# If the transaction is retried after an aborted user operation, it should include the previous transaction ID
534-
# in the transaction options used to begin the transaction. This allows the backend to recognize the transaction
535-
# and increase the lock order for the new transaction ID that is created.
533+
# If a transaction using a multiplexed session is retried after an aborted
534+
# user operation, it should include the previous transaction ID in the
535+
# transaction options used to begin the transaction. This allows the backend
536+
# to recognize the transaction and increase the lock order for the new
537+
# transaction that is created.
536538
# See :attr:`~google.cloud.spanner_v1.types.TransactionOptions.ReadWrite.multiplexed_session_previous_transaction_id`
537539
previous_transaction_id: Optional[bytes] = None
538540

@@ -541,7 +543,11 @@ def run_in_transaction(self, func, *args, **kw):
541543
txn.transaction_tag = transaction_tag
542544
txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams
543545
txn.isolation_level = isolation_level
544-
txn._previous_transaction_id = previous_transaction_id
546+
547+
if self.is_multiplexed:
548+
txn._multiplexed_session_previous_transaction_id = (
549+
previous_transaction_id
550+
)
545551

546552
attempts += 1
547553
span_attributes = dict(attempt=attempts)

google/cloud/spanner_v1/transaction.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ def __init__(self, session):
7070
super(Transaction, self).__init__(session)
7171
self.rolled_back: bool = False
7272

73-
# If this transaction is used to retry a previous aborted transaction, the
74-
# identifier for that transaction is used to increase the lock order of the new
75-
# transaction (see :meth:`_build_transaction_options_pb`). This attribute should
76-
# only be set by :meth:`~google.cloud.spanner_v1.session.Session.run_in_transaction`.
77-
self._previous_transaction_id: Optional[bytes] = None
73+
# If this transaction is used to retry a previous aborted transaction with a
74+
# multiplexed session, the identifier for that transaction is used to increase
75+
# the lock order of the new transaction (see :meth:`_build_transaction_options_pb`).
76+
# This attribute should only be set by :meth:`~google.cloud.spanner_v1.session.Session.run_in_transaction`.
77+
self._multiplexed_session_previous_transaction_id: Optional[bytes] = None
7878

7979
def _build_transaction_options_pb(self) -> TransactionOptions:
8080
"""Builds and returns transaction options for this transaction.
@@ -89,7 +89,7 @@ def _build_transaction_options_pb(self) -> TransactionOptions:
8989

9090
merge_transaction_options = TransactionOptions(
9191
read_write=TransactionOptions.ReadWrite(
92-
multiplexed_session_previous_transaction_id=self._previous_transaction_id
92+
multiplexed_session_previous_transaction_id=self._multiplexed_session_previous_transaction_id
9393
),
9494
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
9595
isolation_level=self.isolation_level,

tests/unit/test_session.py

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@
3333
from google.cloud._helpers import UTC, _datetime_to_pb_timestamp
3434
from google.cloud.spanner_v1._helpers import _delay_until_retry
3535
from google.cloud.spanner_v1.transaction import Transaction
36-
from tests._builders import build_spanner_api, build_session, build_transaction_pb
36+
from tests._builders import (
37+
build_spanner_api,
38+
build_session,
39+
build_transaction_pb,
40+
build_commit_response_pb,
41+
)
3742
from tests._helpers import (
3843
OpenTelemetryBase,
3944
LIB_VERSION,
@@ -1051,6 +1056,41 @@ def unit_of_work(txn, *args, **kw):
10511056
def test_run_in_transaction_retry_callback_raises_abort(self):
10521057
session = build_session()
10531058
database = session._database
1059+
1060+
# Build API responses.
1061+
api = database.spanner_api
1062+
begin_transaction = api.begin_transaction
1063+
streaming_read = api.streaming_read
1064+
streaming_read.side_effect = [_make_rpc_error(Aborted), []]
1065+
1066+
# Run in transaction.
1067+
def unit_of_work(transaction):
1068+
transaction.begin()
1069+
list(transaction.read(TABLE_NAME, COLUMNS, KEYSET))
1070+
1071+
session.create()
1072+
session.run_in_transaction(unit_of_work)
1073+
1074+
self.assertEqual(begin_transaction.call_count, 2)
1075+
1076+
begin_transaction.assert_called_with(
1077+
request=BeginTransactionRequest(
1078+
session=session.name,
1079+
options=TransactionOptions(read_write=TransactionOptions.ReadWrite()),
1080+
),
1081+
metadata=[
1082+
("google-cloud-resource-prefix", database.name),
1083+
("x-goog-spanner-route-to-leader", "true"),
1084+
(
1085+
"x-goog-spanner-request-id",
1086+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.4.1",
1087+
),
1088+
],
1089+
)
1090+
1091+
def test_run_in_transaction_retry_callback_raises_abort_multiplexed(self):
1092+
session = build_session(is_multiplexed=True)
1093+
database = session._database
10541094
api = database.spanner_api
10551095

10561096
# Build API responses
@@ -1093,6 +1133,51 @@ def unit_of_work(transaction):
10931133
],
10941134
)
10951135

1136+
def test_run_in_transaction_retry_commit_raises_abort_multiplexed(self):
1137+
session = build_session(is_multiplexed=True)
1138+
database = session._database
1139+
1140+
# Build API responses
1141+
api = database.spanner_api
1142+
previous_transaction_id = b"transaction-id"
1143+
begin_transaction = api.begin_transaction
1144+
begin_transaction.return_value = build_transaction_pb(
1145+
id=previous_transaction_id
1146+
)
1147+
1148+
commit = api.commit
1149+
commit.side_effect = [_make_rpc_error(Aborted), build_commit_response_pb()]
1150+
1151+
# Run in transaction.
1152+
def unit_of_work(transaction):
1153+
transaction.begin()
1154+
list(transaction.read(TABLE_NAME, COLUMNS, KEYSET))
1155+
1156+
session.create()
1157+
session.run_in_transaction(unit_of_work)
1158+
1159+
# Verify retried BeginTransaction API call.
1160+
self.assertEqual(begin_transaction.call_count, 2)
1161+
1162+
begin_transaction.assert_called_with(
1163+
request=BeginTransactionRequest(
1164+
session=session.name,
1165+
options=TransactionOptions(
1166+
read_write=TransactionOptions.ReadWrite(
1167+
multiplexed_session_previous_transaction_id=previous_transaction_id
1168+
)
1169+
),
1170+
),
1171+
metadata=[
1172+
("google-cloud-resource-prefix", database.name),
1173+
("x-goog-spanner-route-to-leader", "true"),
1174+
(
1175+
"x-goog-spanner-request-id",
1176+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.5.1",
1177+
),
1178+
],
1179+
)
1180+
10961181
def test_run_in_transaction_w_args_w_kwargs_wo_abort(self):
10971182
VALUES = [
10981183
["phred@exammple.com", "Phred", "Phlyntstone", 32],
@@ -1279,9 +1364,7 @@ def unit_of_work(txn, *args, **kw):
12791364
request=BeginTransactionRequest(
12801365
session=session.name,
12811366
options=TransactionOptions(
1282-
read_write=TransactionOptions.ReadWrite(
1283-
multiplexed_session_previous_transaction_id=TRANSACTION_ID
1284-
)
1367+
read_write=TransactionOptions.ReadWrite()
12851368
),
12861369
),
12871370
metadata=[
@@ -1395,9 +1478,7 @@ def unit_of_work(txn, *args, **kw):
13951478
request=BeginTransactionRequest(
13961479
session=session.name,
13971480
options=TransactionOptions(
1398-
read_write=TransactionOptions.ReadWrite(
1399-
multiplexed_session_previous_transaction_id=TRANSACTION_ID
1400-
)
1481+
read_write=TransactionOptions.ReadWrite()
14011482
),
14021483
),
14031484
metadata=[
@@ -1662,9 +1743,7 @@ def _time(_results=[1, 2, 4, 8]):
16621743
request=BeginTransactionRequest(
16631744
session=session.name,
16641745
options=TransactionOptions(
1665-
read_write=TransactionOptions.ReadWrite(
1666-
multiplexed_session_previous_transaction_id=TRANSACTION_ID
1667-
)
1746+
read_write=TransactionOptions.ReadWrite()
16681747
),
16691748
),
16701749
metadata=[
@@ -1680,9 +1759,7 @@ def _time(_results=[1, 2, 4, 8]):
16801759
request=BeginTransactionRequest(
16811760
session=session.name,
16821761
options=TransactionOptions(
1683-
read_write=TransactionOptions.ReadWrite(
1684-
multiplexed_session_previous_transaction_id=TRANSACTION_ID
1685-
)
1762+
read_write=TransactionOptions.ReadWrite()
16861763
),
16871764
),
16881765
metadata=[
@@ -2074,9 +2151,7 @@ def unit_of_work(txn, *args, **kw):
20742151
request=BeginTransactionRequest(
20752152
session=session.name,
20762153
options=TransactionOptions(
2077-
read_write=TransactionOptions.ReadWrite(
2078-
multiplexed_session_previous_transaction_id=TRANSACTION_ID
2079-
),
2154+
read_write=TransactionOptions.ReadWrite(),
20802155
exclude_txn_from_change_streams=True,
20812156
),
20822157
),

0 commit comments

Comments
 (0)