Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 89b4d01

Browse files
committed
Add support for multiplexed sessions - part 1 (classes)
- Adds `SessionOptions` and `DatabaseSessionManager` class support multiplexed session in future work. - Update `Client`, `Instance`, `Database`, `Session` and associated `Checkout` classes accordingly. - Add unit tests for new classes and update those for existing classes. Signed-off-by: currantw <taylor.curran@improving.com>
1 parent 33f3750 commit 89b4d01

File tree

15 files changed

+822
-398
lines changed

15 files changed

+822
-398
lines changed

google/cloud/spanner_dbapi/connection.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,13 @@ def _session_checkout(self):
330330
"""
331331
if self.database is None:
332332
raise ValueError("Database needs to be passed for this operation")
333+
333334
if not self._session:
334-
self._session = self.database._pool.get()
335+
self._session = (
336+
self.database._session_manager.get_session_for_read_only()
337+
if self.read_only
338+
else self.database._session_manager.get_session_for_read_write()
339+
)
335340

336341
return self._session
337342

@@ -344,7 +349,7 @@ def _release_session(self):
344349
return
345350
if self.database is None:
346351
raise ValueError("Database needs to be passed for this operation")
347-
self.database._pool.put(self._session)
352+
self.database._session_manager.put_session(self._session)
348353
self._session = None
349354

350355
def transaction_checkout(self):
@@ -400,7 +405,7 @@ def close(self):
400405
self._transaction.rollback()
401406

402407
if self._own_pool and self.database:
403-
self.database._pool.clear()
408+
self.database._session_manager._pool.clear()
404409

405410
self.is_closed = True
406411

google/cloud/spanner_v1/client.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
except ImportError: # pragma: NO COVER
7171
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False
7272

73+
from google.cloud.spanner_v1.session_options import SessionOptions
7374

7475
_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
7576
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
@@ -171,6 +172,9 @@ class Client(ClientWithProject):
171172
or :class:`dict`
172173
:param default_transaction_options: (Optional) Default options to use for all transactions.
173174
175+
:type session_options: :class:`~google.cloud.spanner_v1.SessionOptions`
176+
:param session_options: (Optional) Options for client sessions.
177+
174178
:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
175179
and ``admin`` are :data:`True`
176180
"""
@@ -193,6 +197,7 @@ def __init__(
193197
directed_read_options=None,
194198
observability_options=None,
195199
default_transaction_options: Optional[DefaultTransactionOptions] = None,
200+
session_options=SessionOptions(),
196201
):
197202
self._emulator_host = _get_spanner_emulator_host()
198203

@@ -262,6 +267,8 @@ def __init__(
262267
)
263268
self._default_transaction_options = default_transaction_options
264269

270+
self._session_options = session_options
271+
265272
@property
266273
def credentials(self):
267274
"""Getter for client's credentials.
@@ -525,3 +532,12 @@ def default_transaction_options(
525532
)
526533

527534
self._default_transaction_options = default_transaction_options
535+
536+
@property
537+
def session_options(self):
538+
"""Returns the session options for the client.
539+
540+
:rtype: :class:`~google.cloud.spanner_v1.SessionOptions`
541+
:returns: The session options for the client.
542+
"""
543+
return self._session_options

google/cloud/spanner_v1/database.py

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from google.cloud.spanner_admin_database_v1 import RestoreDatabaseRequest
4141
from google.cloud.spanner_admin_database_v1 import UpdateDatabaseDdlRequest
4242
from google.cloud.spanner_admin_database_v1.types import DatabaseDialect
43+
from google.cloud.spanner_v1.database_sessions_manager import DatabaseSessionsManager
4344
from google.cloud.spanner_v1.transaction import BatchTransactionId
4445
from google.cloud.spanner_v1 import ExecuteSqlRequest
4546
from google.cloud.spanner_v1 import Type
@@ -59,7 +60,6 @@
5960
from google.cloud.spanner_v1.keyset import KeySet
6061
from google.cloud.spanner_v1.merged_result_set import MergedResultSet
6162
from google.cloud.spanner_v1.pool import BurstyPool
62-
from google.cloud.spanner_v1.pool import SessionCheckout
6363
from google.cloud.spanner_v1.session import Session
6464
from google.cloud.spanner_v1.snapshot import _restart_on_unavailable
6565
from google.cloud.spanner_v1.snapshot import Snapshot
@@ -70,7 +70,6 @@
7070
from google.cloud.spanner_v1.table import Table
7171
from google.cloud.spanner_v1._opentelemetry_tracing import (
7272
add_span_event,
73-
get_current_span,
7473
trace_call,
7574
)
7675
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
@@ -191,10 +190,10 @@ def __init__(
191190

192191
if pool is None:
193192
pool = BurstyPool(database_role=database_role)
194-
195-
self._pool = pool
196193
pool.bind(self)
197194

195+
self._session_manager = DatabaseSessionsManager(database=self, pool=pool)
196+
198197
@classmethod
199198
def from_pb(cls, database_pb, instance, pool=None):
200199
"""Creates an instance of this class from a protobuf.
@@ -708,7 +707,7 @@ def execute_pdml():
708707
"CloudSpanner.Database.execute_partitioned_pdml",
709708
observability_options=self.observability_options,
710709
) as span, MetricsCapture():
711-
with SessionCheckout(self._pool) as session:
710+
with SessionCheckout(self) as session:
712711
add_span_event(span, "Starting BeginTransaction")
713712
txn = api.begin_transaction(
714713
session=session.name, options=txn_options, metadata=metadata
@@ -923,7 +922,7 @@ def run_in_transaction(self, func, *args, **kw):
923922
# Check out a session and run the function in a transaction; once
924923
# done, flip the sanity check bit back.
925924
try:
926-
with SessionCheckout(self._pool) as session:
925+
with SessionCheckout(self) as session:
927926
return session.run_in_transaction(func, *args, **kw)
928927
finally:
929928
self._local.transaction_running = False
@@ -1160,6 +1159,35 @@ def observability_options(self):
11601159
return opts
11611160

11621161

1162+
class SessionCheckout(object):
1163+
"""Context manager for using a session from a database.
1164+
1165+
:type database: :class:`~google.cloud.spanner_v1.database.Database`
1166+
:param database: database to use the session from
1167+
"""
1168+
1169+
_session = None # Not checked out until '__enter__'.
1170+
1171+
def __init__(self, database):
1172+
if not isinstance(database, Database):
1173+
raise TypeError(
1174+
"{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format(
1175+
class_name=self.__class__.__name__,
1176+
expected_class_name=Database.__name__,
1177+
actual_class_name=database.__class__.__name__,
1178+
)
1179+
)
1180+
1181+
self._database = database
1182+
1183+
def __enter__(self):
1184+
self._session = self._database._session_manager.get_session_for_read_write()
1185+
return self._session
1186+
1187+
def __exit__(self, *ignored):
1188+
self._database._session_manager.put_session(self._session)
1189+
1190+
11631191
class BatchCheckout(object):
11641192
"""Context manager for using a batch from a database.
11651193
@@ -1194,6 +1222,15 @@ def __init__(
11941222
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
11951223
**kw,
11961224
):
1225+
if not isinstance(database, Database):
1226+
raise TypeError(
1227+
"{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format(
1228+
class_name=self.__class__.__name__,
1229+
expected_class_name=Database.__name__,
1230+
actual_class_name=database.__class__.__name__,
1231+
)
1232+
)
1233+
11971234
self._database = database
11981235
self._session = self._batch = None
11991236
if request_options is None:
@@ -1209,10 +1246,8 @@ def __init__(
12091246

12101247
def __enter__(self):
12111248
"""Begin ``with`` block."""
1212-
current_span = get_current_span()
1213-
session = self._session = self._database._pool.get()
1214-
add_span_event(current_span, "Using session", {"id": session.session_id})
1215-
batch = self._batch = Batch(session)
1249+
self._session = self._database._session_manager.get_session_for_read_only()
1250+
batch = self._batch = Batch(self._session)
12161251
if self._request_options.transaction_tag:
12171252
batch.transaction_tag = self._request_options.transaction_tag
12181253
return batch
@@ -1235,13 +1270,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
12351270
"CommitStats: {}".format(self._batch.commit_stats),
12361271
extra={"commit_stats": self._batch.commit_stats},
12371272
)
1238-
self._database._pool.put(self._session)
1239-
current_span = get_current_span()
1240-
add_span_event(
1241-
current_span,
1242-
"Returned session to pool",
1243-
{"id": self._session.session_id},
1244-
)
1273+
self._database._session_manager.put_session(self._session)
12451274

12461275

12471276
class MutationGroupsCheckout(object):
@@ -1258,23 +1287,26 @@ class MutationGroupsCheckout(object):
12581287
"""
12591288

12601289
def __init__(self, database):
1290+
if not isinstance(database, Database):
1291+
raise TypeError(
1292+
"{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format(
1293+
class_name=self.__class__.__name__,
1294+
expected_class_name=Database.__name__,
1295+
actual_class_name=database.__class__.__name__,
1296+
)
1297+
)
1298+
12611299
self._database = database
12621300
self._session = None
12631301

12641302
def __enter__(self):
12651303
"""Begin ``with`` block."""
1266-
session = self._session = self._database._pool.get()
1267-
return MutationGroups(session)
1304+
self._session = self._database._session_manager.get_session_for_read_write()
1305+
return MutationGroups(self._session)
12681306

12691307
def __exit__(self, exc_type, exc_val, exc_tb):
12701308
"""End ``with`` block."""
1271-
if isinstance(exc_val, NotFound):
1272-
# If NotFound exception occurs inside the with block
1273-
# then we validate if the session still exists.
1274-
if not self._session.exists():
1275-
self._session = self._database._pool._new_session()
1276-
self._session.create()
1277-
self._database._pool.put(self._session)
1309+
self._database._session_manager.put_session(self._session)
12781310

12791311

12801312
class SnapshotCheckout(object):
@@ -1296,24 +1328,27 @@ class SnapshotCheckout(object):
12961328
"""
12971329

12981330
def __init__(self, database, **kw):
1331+
if not isinstance(database, Database):
1332+
raise TypeError(
1333+
"{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format(
1334+
class_name=self.__class__.__name__,
1335+
expected_class_name=Database.__name__,
1336+
actual_class_name=database.__class__.__name__,
1337+
)
1338+
)
1339+
12991340
self._database = database
13001341
self._session = None
13011342
self._kw = kw
13021343

13031344
def __enter__(self):
13041345
"""Begin ``with`` block."""
1305-
session = self._session = self._database._pool.get()
1306-
return Snapshot(session, **self._kw)
1346+
self._session = self._database._session_manager.get_session_for_read_only()
1347+
return Snapshot(self._session, **self._kw)
13071348

13081349
def __exit__(self, exc_type, exc_val, exc_tb):
13091350
"""End ``with`` block."""
1310-
if isinstance(exc_val, NotFound):
1311-
# If NotFound exception occurs inside the with block
1312-
# then we validate if the session still exists.
1313-
if not self._session.exists():
1314-
self._session = self._database._pool._new_session()
1315-
self._session.create()
1316-
self._database._pool.put(self._session)
1351+
self._database._session_manager.put_session(self._session)
13171352

13181353

13191354
class BatchSnapshot(object):

0 commit comments

Comments
 (0)