Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 1126b0d

Browse files
committed
feat: Multiplexed sessions - Add retry logic to run_in_transaction with previous transaction ID.
Signed-off-by: Taylor Curran <taylor.curran@improving.com>
1 parent 1f21bb9 commit 1126b0d

File tree

3 files changed

+125
-93
lines changed

3 files changed

+125
-93
lines changed

google/cloud/spanner_v1/session.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,6 @@ def batch(self):
464464

465465
return Batch(self)
466466

467-
# TODO multiplexed - deprecate
468467
def transaction(self) -> Transaction:
469468
"""Create a transaction to perform a set of reads with shared staleness.
470469
@@ -531,6 +530,12 @@ def run_in_transaction(self, func, *args, **kw):
531530
) as span, MetricsCapture():
532531
attempts: int = 0
533532

533+
# If the transaction is retried after an aborted user operation, it should include the previous transaction ID
534+
# in the transaction options used to begin the transaction. This allows the backend to recognize the transaction
535+
# and increase the lock order for the new transaction ID that is created.
536+
# See :attr:`~google.cloud.spanner_v1.types.TransactionOptions.ReadWrite.multiplexed_session_previous_transaction_id`
537+
previous_transaction_id: Optional[bytes] = None
538+
534539
while True:
535540
# [A] Build transaction
536541
# ---------------------
@@ -539,6 +544,7 @@ def run_in_transaction(self, func, *args, **kw):
539544
txn.transaction_tag = transaction_tag
540545
txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams
541546
txn.isolation_level = isolation_level
547+
txn._previous_transaction_id = previous_transaction_id
542548

543549
# [B] Run user operation
544550
# ----------------------
@@ -549,8 +555,8 @@ def run_in_transaction(self, func, *args, **kw):
549555
try:
550556
return_value = func(txn, *args, **kw)
551557

552-
# TODO multiplexed: store previous transaction ID.
553558
except Aborted as exc:
559+
previous_transaction_id = txn._transaction_id
554560
if span:
555561
delay_seconds = _get_retry_delay(
556562
exc.errors[0],
@@ -598,6 +604,7 @@ def run_in_transaction(self, func, *args, **kw):
598604
)
599605

600606
except Aborted as exc:
607+
previous_transaction_id = txn._transaction_id
601608
if span:
602609
delay_seconds = _get_retry_delay(
603610
exc.errors[0],
@@ -615,13 +622,15 @@ def run_in_transaction(self, func, *args, **kw):
615622
_delay_until_retry(
616623
exc, deadline, attempts, default_retry_delay=default_retry_delay
617624
)
625+
618626
except GoogleAPICallError:
619627
add_span_event(
620628
span,
621629
"Transaction.commit failed due to GoogleAPICallError, not retrying",
622630
span_attributes,
623631
)
624632
raise
633+
625634
else:
626635
if log_commit_stats and txn.commit_stats:
627636
database.logger.info(

google/cloud/spanner_v1/transaction.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ def __init__(self, session):
7070
super(Transaction, self).__init__(session)
7171
self.rolled_back: bool = False
7272

73+
# If this transaction is used to retry a previous aborted transaction, the
74+
# identifier for that transaction is used to increase the lock order of the new
75+
# transaction (see :meth:`_build_transaction_options_pb`). This attribute should
76+
# only be set by :meth:`~google.cloud.spanner_v1.session.Session.run_in_transaction`.
77+
self._previous_transaction_id: Optional[bytes] = None
78+
7379
def _build_transaction_options_pb(self) -> TransactionOptions:
7480
"""Builds and returns transaction options for this transaction.
7581
@@ -82,7 +88,9 @@ def _build_transaction_options_pb(self) -> TransactionOptions:
8288
)
8389

8490
merge_transaction_options = TransactionOptions(
85-
read_write=TransactionOptions.ReadWrite(),
91+
read_write=TransactionOptions.ReadWrite(
92+
multiplexed_session_previous_transaction_id=self._previous_transaction_id
93+
),
8694
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
8795
isolation_level=self.isolation_level,
8896
)

0 commit comments

Comments
 (0)