Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 1f21bb9

Browse files
committed
feat: Multiplexed sessions - Refactor logic for creating transaction selector to base class.
Signed-off-by: Taylor Curran <taylor.curran@improving.com>
1 parent bda6e55 commit 1f21bb9

File tree

5 files changed

+293
-341
lines changed

5 files changed

+293
-341
lines changed

google/cloud/spanner_v1/snapshot.py

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def _restart_on_unavailable(
9393
item_buffer: List[PartialResultSet] = []
9494

9595
if transaction is not None:
96-
transaction_selector = transaction._make_txn_selector()
96+
transaction_selector = transaction._build_transaction_selector_pb()
9797
elif transaction_selector is None:
9898
raise InvalidArgument(
9999
"Either transaction or transaction_selector should be set"
@@ -149,7 +149,7 @@ def _restart_on_unavailable(
149149
) as span, MetricsCapture():
150150
request.resume_token = resume_token
151151
if transaction is not None:
152-
transaction_selector = transaction._make_txn_selector()
152+
transaction_selector = transaction._build_transaction_selector_pb()
153153
request.transaction = transaction_selector
154154
attempt += 1
155155
iterator = method(
@@ -180,7 +180,7 @@ def _restart_on_unavailable(
180180
) as span, MetricsCapture():
181181
request.resume_token = resume_token
182182
if transaction is not None:
183-
transaction_selector = transaction._make_txn_selector()
183+
transaction_selector = transaction._build_transaction_selector_pb()
184184
attempt += 1
185185
request.transaction = transaction_selector
186186
iterator = method(
@@ -238,17 +238,6 @@ def __init__(self, session):
238238
# threads, so we need to use a lock when updating the transaction.
239239
self._lock: threading.Lock = threading.Lock()
240240

241-
def _make_txn_selector(self):
242-
"""Helper for :meth:`read` / :meth:`execute_sql`.
243-
244-
Subclasses must override, returning an instance of
245-
:class:`transaction_pb2.TransactionSelector`
246-
appropriate for making ``read`` / ``execute_sql`` requests
247-
248-
:raises: NotImplementedError, always
249-
"""
250-
raise NotImplementedError
251-
252241
def begin(self) -> bytes:
253242
"""Begins a transaction on the database.
254243
@@ -732,7 +721,7 @@ def partition_read(
732721
metadata.append(
733722
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
734723
)
735-
transaction = self._make_txn_selector()
724+
transaction = self._build_transaction_selector_pb()
736725
partition_options = PartitionOptions(
737726
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
738727
)
@@ -854,7 +843,7 @@ def partition_query(
854843
metadata.append(
855844
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
856845
)
857-
transaction = self._make_txn_selector()
846+
transaction = self._build_transaction_selector_pb()
858847
partition_options = PartitionOptions(
859848
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
860849
)
@@ -944,7 +933,7 @@ def _begin_transaction(self, mutation: Mutation = None) -> bytes:
944933
def wrapped_method():
945934
begin_transaction_request = BeginTransactionRequest(
946935
session=session.name,
947-
options=self._make_txn_selector().begin,
936+
options=self._build_transaction_selector_pb().begin,
948937
mutation_key=mutation,
949938
)
950939
begin_transaction_method = functools.partial(
@@ -983,6 +972,34 @@ def before_next_retry(nth_retry, delay_in_seconds):
983972
self._update_for_transaction_pb(transaction_pb)
984973
return self._transaction_id
985974

975+
def _build_transaction_options_pb(self) -> TransactionOptions:
976+
"""Builds and returns the transaction options for this snapshot.
977+
978+
:rtype: :class:`transaction_pb2.TransactionOptions`
979+
:returns: the transaction options for this snapshot.
980+
"""
981+
raise NotImplementedError
982+
983+
def _build_transaction_selector_pb(self) -> TransactionSelector:
984+
"""Builds and returns a transaction selector for this snapshot.
985+
986+
:rtype: :class:`transaction_pb2.TransactionSelector`
987+
:returns: a transaction selector for this snapshot.
988+
"""
989+
990+
# Select a previously begun transaction.
991+
if self._transaction_id is not None:
992+
return TransactionSelector(id=self._transaction_id)
993+
994+
options = self._build_transaction_options_pb()
995+
996+
# Select a single-use transaction.
997+
if not self._multi_use:
998+
return TransactionSelector(single_use=options)
999+
1000+
# Select a new, multi-use transaction.
1001+
return TransactionSelector(begin=options)
1002+
9861003
def _update_for_result_set_pb(
9871004
self, result_set_pb: Union[ResultSet, PartialResultSet]
9881005
) -> None:
@@ -1101,38 +1118,28 @@ def __init__(
11011118
self._multi_use = multi_use
11021119
self._transaction_id = transaction_id
11031120

1104-
# TODO multiplexed - refactor to base class
1105-
def _make_txn_selector(self):
1106-
"""Helper for :meth:`read`."""
1107-
if self._transaction_id is not None:
1108-
return TransactionSelector(id=self._transaction_id)
1121+
def _build_transaction_options_pb(self) -> TransactionOptions:
1122+
"""Builds and returns transaction options for this snapshot.
1123+
1124+
:rtype: :class:`transaction_pb2.TransactionOptions`
1125+
:returns: transaction options for this snapshot.
1126+
"""
1127+
1128+
read_only_pb_args = dict(return_read_timestamp=True)
11091129

11101130
if self._read_timestamp:
1111-
key = "read_timestamp"
1112-
value = self._read_timestamp
1131+
read_only_pb_args["read_timestamp"] = self._read_timestamp
11131132
elif self._min_read_timestamp:
1114-
key = "min_read_timestamp"
1115-
value = self._min_read_timestamp
1133+
read_only_pb_args["min_read_timestamp"] = self._min_read_timestamp
11161134
elif self._max_staleness:
1117-
key = "max_staleness"
1118-
value = self._max_staleness
1135+
read_only_pb_args["max_staleness"] = self._max_staleness
11191136
elif self._exact_staleness:
1120-
key = "exact_staleness"
1121-
value = self._exact_staleness
1137+
read_only_pb_args["exact_staleness"] = self._exact_staleness
11221138
else:
1123-
key = "strong"
1124-
value = True
1125-
1126-
options = TransactionOptions(
1127-
read_only=TransactionOptions.ReadOnly(
1128-
**{key: value, "return_read_timestamp": True}
1129-
)
1130-
)
1139+
read_only_pb_args["strong"] = True
11311140

1132-
if self._multi_use:
1133-
return TransactionSelector(begin=options)
1134-
else:
1135-
return TransactionSelector(single_use=options)
1141+
read_only_pb = TransactionOptions.ReadOnly(**read_only_pb_args)
1142+
return TransactionOptions(read_only=read_only_pb)
11361143

11371144
def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
11381145
"""Updates the snapshot for the given transaction.

google/cloud/spanner_v1/transaction.py

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
)
3636
from google.cloud.spanner_v1 import ExecuteBatchDmlRequest
3737
from google.cloud.spanner_v1 import ExecuteSqlRequest
38-
from google.cloud.spanner_v1 import TransactionSelector
3938
from google.cloud.spanner_v1 import TransactionOptions
4039
from google.cloud.spanner_v1._helpers import AtomicCounter
4140
from google.cloud.spanner_v1.snapshot import _SnapshotBase
@@ -71,27 +70,27 @@ def __init__(self, session):
7170
super(Transaction, self).__init__(session)
7271
self.rolled_back: bool = False
7372

74-
def _make_txn_selector(self):
75-
"""Helper for :meth:`read`.
73+
def _build_transaction_options_pb(self) -> TransactionOptions:
74+
"""Builds and returns transaction options for this transaction.
7675
77-
:rtype: :class:`~.transaction_pb2.TransactionSelector`
78-
:returns: a selector configured for read-write transaction semantics.
76+
:rtype: :class:`~.transaction_pb2.TransactionOptions`
77+
:returns: transaction options for this transaction.
7978
"""
8079

81-
if self._transaction_id is None:
82-
txn_options = TransactionOptions(
83-
read_write=TransactionOptions.ReadWrite(),
84-
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
85-
isolation_level=self.isolation_level,
86-
)
80+
default_transaction_options = (
81+
self._session._database.default_transaction_options.default_read_write_transaction_options
82+
)
8783

88-
txn_options = _merge_Transaction_Options(
89-
self._session._database.default_transaction_options.default_read_write_transaction_options,
90-
txn_options,
91-
)
92-
return TransactionSelector(begin=txn_options)
93-
else:
94-
return TransactionSelector(id=self._transaction_id)
84+
merge_transaction_options = TransactionOptions(
85+
read_write=TransactionOptions.ReadWrite(),
86+
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
87+
isolation_level=self.isolation_level,
88+
)
89+
90+
return _merge_Transaction_Options(
91+
defaultTransactionOptions=default_transaction_options,
92+
mergeTransactionOptions=merge_transaction_options,
93+
)
9594

9695
def _execute_request(
9796
self,
@@ -118,7 +117,7 @@ def _execute_request(
118117
raise ValueError("Transaction already rolled back.")
119118

120119
session = self._session
121-
transaction = self._make_txn_selector()
120+
transaction = self._build_transaction_selector_pb()
122121
request.transaction = transaction
123122

124123
with trace_call(
@@ -469,7 +468,7 @@ def execute_update(
469468

470469
execute_sql_request = ExecuteSqlRequest(
471470
session=session.name,
472-
transaction=self._make_txn_selector(),
471+
transaction=self._build_transaction_selector_pb(),
473472
sql=dml,
474473
params=params_pb,
475474
param_types=param_types,
@@ -617,7 +616,7 @@ def batch_update(
617616

618617
execute_batch_dml_request = ExecuteBatchDmlRequest(
619618
session=session.name,
620-
transaction=self._make_txn_selector(),
619+
transaction=self._build_transaction_selector_pb(),
621620
statements=parsed,
622621
seqno=seqno,
623622
request_options=request_options,

tests/_builders.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,19 @@ def build_session(**kwargs: Mapping) -> Session:
172172
return Session(**kwargs)
173173

174174

175+
def build_snapshot(**kwargs):
176+
"""Builds and returns a snapshot for testing using the given arguments.
177+
If a required argument is not provided, a default value will be used."""
178+
179+
session = kwargs.pop("session", build_session())
180+
181+
# Ensure session exists.
182+
if session.session_id is None:
183+
session._session_id = _SESSION_ID
184+
185+
return session.snapshot(**kwargs)
186+
187+
175188
def build_transaction(session=None) -> Transaction:
176189
"""Builds and returns a transaction for testing using the given arguments.
177190
If a required argument is not provided, a default value will be used."""

0 commit comments

Comments
 (0)