Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 7c75774

Browse files
committed
feat: Multiplexed sessions - Update session to remove class attributes, add TODOs, and make Session._transaction default to None. Plus add some Optional typing hints.
Signed-off-by: Taylor Curran <taylor.curran@improving.com>
1 parent a7ff9f9 commit 7c75774

File tree

4 files changed

+54
-38
lines changed

4 files changed

+54
-38
lines changed

google/cloud/spanner_v1/batch.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"""Context manager for Cloud Spanner batched writes."""
1616
import functools
1717
from datetime import datetime
18-
from typing import List
18+
from typing import List, Optional
1919

2020
from google.cloud.spanner_v1 import CommitRequest, CommitResponse
2121
from google.cloud.spanner_v1 import Mutation
@@ -53,11 +53,11 @@ def __init__(self, session):
5353
super(_BatchBase, self).__init__(session)
5454

5555
self._mutations: List[Mutation] = []
56-
self.transaction_tag: str = None
56+
self.transaction_tag: Optional[str] = None
5757

58-
self.committed: datetime = None
58+
self.committed: Optional[datetime] = None
5959
"""Timestamp at which the batch was successfully committed."""
60-
self.commit_stats: CommitResponse.CommitStats = None
60+
self.commit_stats: Optional[CommitResponse.CommitStats] = None
6161

6262
# TODO multiplexed - cleanup
6363
def _check_state(self):

google/cloud/spanner_v1/session.py

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from functools import total_ordering
1818
import time
1919
from datetime import datetime
20+
from typing import MutableMapping, Optional
2021

2122
from google.api_core.exceptions import Aborted
2223
from google.api_core.exceptions import GoogleAPICallError
@@ -69,17 +70,20 @@ class Session(object):
6970
:param is_multiplexed: (Optional) whether this session is a multiplexed session.
7071
"""
7172

72-
_session_id = None
73-
_transaction = None
74-
7573
def __init__(self, database, labels=None, database_role=None, is_multiplexed=False):
7674
self._database = database
75+
self._session_id: Optional[str] = None
76+
77+
# TODO multiplexed - remove
78+
self._transaction: Optional[Transaction] = None
79+
7780
if labels is None:
7881
labels = {}
79-
self._labels = labels
80-
self._database_role = database_role
81-
self._is_multiplexed = is_multiplexed
82-
self._last_use_time = datetime.utcnow()
82+
83+
self._labels: MutableMapping[str, str] = labels
84+
self._database_role: Optional[str] = database_role
85+
self._is_multiplexed: bool = is_multiplexed
86+
self._last_use_time: datetime = datetime.utcnow()
8387

8488
def __lt__(self, other):
8589
return self._session_id < other._session_id
@@ -100,7 +104,7 @@ def is_multiplexed(self):
100104

101105
@property
102106
def last_use_time(self):
103-
""" "Approximate last use time of this session
107+
"""Approximate last use time of this session
104108
105109
:rtype: datetime
106110
:returns: the approximate last use time of this session"""
@@ -157,27 +161,28 @@ def create(self):
157161

158162
if self._session_id is not None:
159163
raise ValueError("Session ID already set by back-end")
160-
api = self._database.spanner_api
161-
metadata = _metadata_with_prefix(self._database.name)
162-
if self._database._route_to_leader_enabled:
164+
165+
database = self._database
166+
api = database.spanner_api
167+
168+
metadata = _metadata_with_prefix(database.name)
169+
if database._route_to_leader_enabled:
163170
metadata.append(
164-
_metadata_with_leader_aware_routing(
165-
self._database._route_to_leader_enabled
166-
)
171+
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
167172
)
168173

169-
request = CreateSessionRequest(database=self._database.name)
170-
if self._database.database_role is not None:
171-
request.session.creator_role = self._database.database_role
174+
create_session_request = CreateSessionRequest(database=database.name)
175+
if database.database_role is not None:
176+
create_session_request.session.creator_role = database.database_role
172177

173178
if self._labels:
174-
request.session.labels = self._labels
179+
create_session_request.session.labels = self._labels
175180

176181
# Set the multiplexed field for multiplexed sessions
177182
if self._is_multiplexed:
178-
request.session.multiplexed = True
183+
create_session_request.session.multiplexed = True
179184

180-
observability_options = getattr(self._database, "observability_options", None)
185+
observability_options = getattr(database, "observability_options", None)
181186
span_name = (
182187
"CloudSpanner.CreateMultiplexedSession"
183188
if self._is_multiplexed
@@ -191,9 +196,9 @@ def create(self):
191196
metadata=metadata,
192197
) as span, MetricsCapture():
193198
session_pb = api.create_session(
194-
request=request,
195-
metadata=self._database.metadata_with_request_id(
196-
self._database._next_nth_request,
199+
request=create_session_request,
200+
metadata=database.metadata_with_request_id(
201+
database._next_nth_request,
197202
1,
198203
metadata,
199204
span,
@@ -462,6 +467,7 @@ def batch(self):
462467

463468
return Batch(self)
464469

470+
# TODO multiplexed - remove
465471
def transaction(self):
466472
"""Create a transaction to perform a set of reads with shared staleness.
467473
@@ -474,7 +480,7 @@ def transaction(self):
474480

475481
if self._transaction is not None:
476482
self._transaction.rolled_back = True
477-
del self._transaction
483+
self._transaction = None
478484

479485
txn = self._transaction = Transaction(self)
480486
return txn
@@ -531,6 +537,7 @@ def run_in_transaction(self, func, *args, **kw):
531537
observability_options=observability_options,
532538
) as span, MetricsCapture():
533539
while True:
540+
# TODO multiplexed - remove
534541
if self._transaction is None:
535542
txn = self.transaction()
536543
txn.transaction_tag = transaction_tag
@@ -552,8 +559,11 @@ def run_in_transaction(self, func, *args, **kw):
552559

553560
return_value = func(txn, *args, **kw)
554561

562+
# TODO multiplexed: store previous transaction ID.
555563
except Aborted as exc:
556-
del self._transaction
564+
# TODO multiplexed - remove
565+
self._transaction = None
566+
557567
if span:
558568
delay_seconds = _get_retry_delay(
559569
exc.errors[0],
@@ -573,7 +583,9 @@ def run_in_transaction(self, func, *args, **kw):
573583
)
574584
continue
575585
except GoogleAPICallError:
576-
del self._transaction
586+
# TODO multiplexed - remove
587+
self._transaction = None
588+
577589
add_span_event(
578590
span,
579591
"User operation failed due to GoogleAPICallError, not retrying",
@@ -596,7 +608,9 @@ def run_in_transaction(self, func, *args, **kw):
596608
max_commit_delay=max_commit_delay,
597609
)
598610
except Aborted as exc:
599-
del self._transaction
611+
# TODO multiplexed - remove
612+
self._transaction = None
613+
600614
if span:
601615
delay_seconds = _get_retry_delay(
602616
exc.errors[0],
@@ -615,7 +629,9 @@ def run_in_transaction(self, func, *args, **kw):
615629
exc, deadline, attempts, default_retry_delay=default_retry_delay
616630
)
617631
except GoogleAPICallError:
618-
del self._transaction
632+
# TODO multiplexed - remove
633+
self._transaction = None
634+
619635
add_span_event(
620636
span,
621637
"Transaction.commit failed due to GoogleAPICallError, not retrying",

google/cloud/spanner_v1/snapshot.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import functools
1818
import threading
19-
from typing import List, Union
19+
from typing import List, Union, Optional
2020

2121
from google.protobuf.struct_pb2 import Struct
2222
from google.cloud.spanner_v1 import (
@@ -225,12 +225,12 @@ def __init__(self, session):
225225
self._read_request_count: int = 0
226226

227227
# Identifier for the transaction.
228-
self._transaction_id: bytes = None
228+
self._transaction_id: Optional[bytes] = None
229229

230230
# Precommit tokens are returned for transactions with
231231
# multiplexed sessions. The precommit token with the
232232
# highest sequence number is included in the commit request.
233-
self._precommit_token: MultiplexedSessionPrecommitToken = None
233+
self._precommit_token: Optional[MultiplexedSessionPrecommitToken] = None
234234

235235
# Operations within a transaction can be performed using multiple
236236
# threads, so we need to use a lock when updating the transaction.

google/cloud/spanner_v1/transaction.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ def wrapped_method(*args, **kwargs):
259259
attempt.increment()
260260
rollback_method = functools.partial(
261261
api.rollback,
262-
session=self._session.name,
262+
session=session.name,
263263
transaction_id=self._transaction_id,
264264
metadata=database.metadata_with_request_id(
265265
nth_request,
@@ -278,7 +278,7 @@ def wrapped_method(*args, **kwargs):
278278
self.rolled_back = True
279279

280280
# TODO multiplexed - remove
281-
del self._session._transaction
281+
self._session._transaction = None
282282

283283
def commit(
284284
self, return_commit_stats=False, request_options=None, max_commit_delay=None
@@ -396,7 +396,7 @@ def before_next_retry(nth_retry, delay_in_seconds):
396396
self.commit_stats = response_pb.commit_stats
397397

398398
# TODO multiplexed - remove
399-
del self._session._transaction
399+
self._session._transaction = None
400400

401401
return self.committed
402402

0 commit comments

Comments
 (0)