Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 99b9a75

Browse files
committed
Add support for multiplexed sessions - part 1 (classes)
- Adds `SessionOptions` and `DatabaseSessionManager` class support multiplexed session in future work. - Update `Client`, `Instance`, `Database`, `Session` and associated `Checkout` classes accordingly. - Add unit tests for new classes and update those for existing classes. Signed-off-by: currantw <taylor.curran@improving.com>
1 parent 33f3750 commit 99b9a75

File tree

14 files changed

+765
-382
lines changed

14 files changed

+765
-382
lines changed

google/cloud/spanner_dbapi/connection.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,13 @@ def _session_checkout(self):
330330
"""
331331
if self.database is None:
332332
raise ValueError("Database needs to be passed for this operation")
333+
333334
if not self._session:
334-
self._session = self.database._pool.get()
335+
self._session = (
336+
self.database._session_manager.get_session_for_read_only()
337+
if self.read_only
338+
else self.database._session_manager.get_session_for_read_write()
339+
)
335340

336341
return self._session
337342

@@ -344,7 +349,7 @@ def _release_session(self):
344349
return
345350
if self.database is None:
346351
raise ValueError("Database needs to be passed for this operation")
347-
self.database._pool.put(self._session)
352+
self.database._session_manager.put_session(self._session)
348353
self._session = None
349354

350355
def transaction_checkout(self):
@@ -400,7 +405,7 @@ def close(self):
400405
self._transaction.rollback()
401406

402407
if self._own_pool and self.database:
403-
self.database._pool.clear()
408+
self.database._session_manager._pool.clear()
404409

405410
self.is_closed = True
406411

google/cloud/spanner_v1/client.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
except ImportError: # pragma: NO COVER
7171
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False
7272

73+
from google.cloud.spanner_v1.session_options import SessionOptions
7374

7475
_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
7576
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
@@ -171,6 +172,9 @@ class Client(ClientWithProject):
171172
or :class:`dict`
172173
:param default_transaction_options: (Optional) Default options to use for all transactions.
173174
175+
:type session_options: :class:`~google.cloud.spanner_v1.SessionOptions`
176+
:param session_options: (Optional) Options for client sessions.
177+
174178
:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
175179
and ``admin`` are :data:`True`
176180
"""
@@ -193,6 +197,7 @@ def __init__(
193197
directed_read_options=None,
194198
observability_options=None,
195199
default_transaction_options: Optional[DefaultTransactionOptions] = None,
200+
session_options=None,
196201
):
197202
self._emulator_host = _get_spanner_emulator_host()
198203

@@ -262,6 +267,8 @@ def __init__(
262267
)
263268
self._default_transaction_options = default_transaction_options
264269

270+
self._session_options = session_options or SessionOptions()
271+
265272
@property
266273
def credentials(self):
267274
"""Getter for client's credentials.
@@ -525,3 +532,12 @@ def default_transaction_options(
525532
)
526533

527534
self._default_transaction_options = default_transaction_options
535+
536+
@property
537+
def session_options(self):
538+
"""Returns the session options for the client.
539+
540+
:rtype: :class:`~google.cloud.spanner_v1.SessionOptions`
541+
:returns: The session options for the client.
542+
"""
543+
return self._session_options

google/cloud/spanner_v1/database.py

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from google.cloud.spanner_admin_database_v1 import RestoreDatabaseRequest
4141
from google.cloud.spanner_admin_database_v1 import UpdateDatabaseDdlRequest
4242
from google.cloud.spanner_admin_database_v1.types import DatabaseDialect
43+
from google.cloud.spanner_v1.database_sessions_manager import DatabaseSessionsManager
4344
from google.cloud.spanner_v1.transaction import BatchTransactionId
4445
from google.cloud.spanner_v1 import ExecuteSqlRequest
4546
from google.cloud.spanner_v1 import Type
@@ -59,7 +60,6 @@
5960
from google.cloud.spanner_v1.keyset import KeySet
6061
from google.cloud.spanner_v1.merged_result_set import MergedResultSet
6162
from google.cloud.spanner_v1.pool import BurstyPool
62-
from google.cloud.spanner_v1.pool import SessionCheckout
6363
from google.cloud.spanner_v1.session import Session
6464
from google.cloud.spanner_v1.snapshot import _restart_on_unavailable
6565
from google.cloud.spanner_v1.snapshot import Snapshot
@@ -70,7 +70,6 @@
7070
from google.cloud.spanner_v1.table import Table
7171
from google.cloud.spanner_v1._opentelemetry_tracing import (
7272
add_span_event,
73-
get_current_span,
7473
trace_call,
7574
)
7675
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
@@ -191,10 +190,10 @@ def __init__(
191190

192191
if pool is None:
193192
pool = BurstyPool(database_role=database_role)
194-
195-
self._pool = pool
196193
pool.bind(self)
197194

195+
self._session_manager = DatabaseSessionsManager(database=self, pool=pool)
196+
198197
@classmethod
199198
def from_pb(cls, database_pb, instance, pool=None):
200199
"""Creates an instance of this class from a protobuf.
@@ -708,7 +707,7 @@ def execute_pdml():
708707
"CloudSpanner.Database.execute_partitioned_pdml",
709708
observability_options=self.observability_options,
710709
) as span, MetricsCapture():
711-
with SessionCheckout(self._pool) as session:
710+
with SessionCheckout(self) as session:
712711
add_span_event(span, "Starting BeginTransaction")
713712
txn = api.begin_transaction(
714713
session=session.name, options=txn_options, metadata=metadata
@@ -759,6 +758,7 @@ def session(self, labels=None, database_role=None):
759758
"""
760759
# If role is specified in param, then that role is used
761760
# instead.
761+
762762
role = database_role or self._database_role
763763
return Session(self, labels=labels, database_role=role)
764764

@@ -923,7 +923,7 @@ def run_in_transaction(self, func, *args, **kw):
923923
# Check out a session and run the function in a transaction; once
924924
# done, flip the sanity check bit back.
925925
try:
926-
with SessionCheckout(self._pool) as session:
926+
with SessionCheckout(self) as session:
927927
return session.run_in_transaction(func, *args, **kw)
928928
finally:
929929
self._local.transaction_running = False
@@ -1160,6 +1160,29 @@ def observability_options(self):
11601160
return opts
11611161

11621162

1163+
class SessionCheckout(object):
1164+
"""Context manager for using a session from a database.
1165+
1166+
:type database: :class:`~google.cloud.spanner_v1.database.Database`
1167+
:param database: database to use the session from
1168+
"""
1169+
1170+
_session = None # Not checked out until '__enter__'.
1171+
1172+
def __init__(self, database):
1173+
if not isinstance(database, Database):
1174+
raise TypeError(f"database must be an instance of {Database.__name__}")
1175+
1176+
self._database = database
1177+
1178+
def __enter__(self):
1179+
self._session = self._database._session_manager.get_session_for_read_write()
1180+
return self._session
1181+
1182+
def __exit__(self, *ignored):
1183+
self._database._session_manager.put_session(self._session)
1184+
1185+
11631186
class BatchCheckout(object):
11641187
"""Context manager for using a batch from a database.
11651188
@@ -1194,6 +1217,9 @@ def __init__(
11941217
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
11951218
**kw,
11961219
):
1220+
if not isinstance(database, Database):
1221+
raise TypeError(f"database must be an instance of {Database.__name__}")
1222+
11971223
self._database = database
11981224
self._session = self._batch = None
11991225
if request_options is None:
@@ -1209,9 +1235,9 @@ def __init__(
12091235

12101236
def __enter__(self):
12111237
"""Begin ``with`` block."""
1212-
current_span = get_current_span()
1213-
session = self._session = self._database._pool.get()
1214-
add_span_event(current_span, "Using session", {"id": session.session_id})
1238+
session = (
1239+
self._session
1240+
) = self._database._session_manager.get_session_for_read_only()
12151241
batch = self._batch = Batch(session)
12161242
if self._request_options.transaction_tag:
12171243
batch.transaction_tag = self._request_options.transaction_tag
@@ -1235,13 +1261,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
12351261
"CommitStats: {}".format(self._batch.commit_stats),
12361262
extra={"commit_stats": self._batch.commit_stats},
12371263
)
1238-
self._database._pool.put(self._session)
1239-
current_span = get_current_span()
1240-
add_span_event(
1241-
current_span,
1242-
"Returned session to pool",
1243-
{"id": self._session.session_id},
1244-
)
1264+
self._database._session_manager.put_session(self._session)
12451265

12461266

12471267
class MutationGroupsCheckout(object):
@@ -1258,23 +1278,22 @@ class MutationGroupsCheckout(object):
12581278
"""
12591279

12601280
def __init__(self, database):
1281+
if not isinstance(database, Database):
1282+
raise TypeError(f"database must be an instance of {Database.__name__}")
1283+
12611284
self._database = database
12621285
self._session = None
12631286

12641287
def __enter__(self):
12651288
"""Begin ``with`` block."""
1266-
session = self._session = self._database._pool.get()
1289+
session = (
1290+
self._session
1291+
) = self._database._session_manager.get_session_for_read_write()
12671292
return MutationGroups(session)
12681293

12691294
def __exit__(self, exc_type, exc_val, exc_tb):
12701295
"""End ``with`` block."""
1271-
if isinstance(exc_val, NotFound):
1272-
# If NotFound exception occurs inside the with block
1273-
# then we validate if the session still exists.
1274-
if not self._session.exists():
1275-
self._session = self._database._pool._new_session()
1276-
self._session.create()
1277-
self._database._pool.put(self._session)
1296+
self._database._session_manager.put_session(self._session)
12781297

12791298

12801299
class SnapshotCheckout(object):
@@ -1296,24 +1315,23 @@ class SnapshotCheckout(object):
12961315
"""
12971316

12981317
def __init__(self, database, **kw):
1318+
if not isinstance(database, Database):
1319+
raise TypeError(f"database must be an instance of {Database.__name__}")
1320+
12991321
self._database = database
13001322
self._session = None
13011323
self._kw = kw
13021324

13031325
def __enter__(self):
13041326
"""Begin ``with`` block."""
1305-
session = self._session = self._database._pool.get()
1327+
session = (
1328+
self._session
1329+
) = self._database._session_manager.get_session_for_read_only()
13061330
return Snapshot(session, **self._kw)
13071331

13081332
def __exit__(self, exc_type, exc_val, exc_tb):
13091333
"""End ``with`` block."""
1310-
if isinstance(exc_val, NotFound):
1311-
# If NotFound exception occurs inside the with block
1312-
# then we validate if the session still exists.
1313-
if not self._session.exists():
1314-
self._session = self._database._pool._new_session()
1315-
self._session.create()
1316-
self._database._pool.put(self._session)
1334+
self._database._session_manager.put_session(self._session)
13171335

13181336

13191337
class BatchSnapshot(object):
@@ -1392,6 +1410,7 @@ def _get_session(self):
13921410
Caller is responsible for cleaning up the session after
13931411
all partitions have been processed.
13941412
"""
1413+
13951414
if self._session is None:
13961415
session = self._session = self._database.session()
13971416
if self._session_id is None:
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Copyright 2025 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from google.cloud.spanner_v1.session import Session
16+
17+
18+
class DatabaseSessionsManager(object):
19+
"""Manages sessions for a Cloud Spanner database.
20+
21+
Sessions can be checked out from the session manager using :meth:`get_session_for_read_only`,
22+
:meth:`get_session_for_partitioned`, and :meth:`get_session_for_read_write`, and returned to
23+
the session manager using :meth:`put_session`.
24+
25+
The sessions returned by the session manager depend on the client's session options (see
26+
:class:`~google.cloud.spanner_v1.session_options.SessionOptions`) and the provided session
27+
(see :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`).
28+
29+
:type database: :class:`~google.cloud.spanner_v1.database.Database`
30+
:param database: The database to manage sessions for.
31+
32+
:type pool: :class:`~google.cloud.spanner_v1.pool.SessionPool`
33+
:param pool: The pool to get non-multiplexed sessions from.
34+
"""
35+
36+
def __init__(self, database, pool):
37+
self._database = database
38+
self._pool = pool
39+
40+
def get_session_for_read_only(self) -> Session:
41+
"""Checks out and returns a session for read-only transactions from the session manager.
42+
43+
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
44+
:returns: a session for read-only transactions.
45+
"""
46+
47+
if (
48+
self._database._instance._client.session_options.use_multiplexed_for_partitioned
49+
):
50+
raise NotImplementedError(
51+
"Multiplexed sessions are not yet supported for read-only transactions."
52+
)
53+
54+
return self._pool.get()
55+
56+
def get_session_for_partitioned(self) -> Session:
57+
"""Checks out and returns a session for partitioned transactions from the session manager.
58+
59+
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
60+
:returns: a session for partitioned transactions.
61+
"""
62+
63+
if (
64+
self._database._instance._client.session_options.use_multiplexed_for_partitioned
65+
):
66+
raise NotImplementedError(
67+
"Multiplexed sessions are not yet supported for partitioned transactions."
68+
)
69+
70+
return self._pool.get()
71+
72+
def get_session_for_read_write(self) -> Session:
73+
"""Checks out and returns a session for read/write transactions from the session manager.
74+
75+
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
76+
:returns: a session for read-write transactions.
77+
"""
78+
79+
if (
80+
self._database._instance._client.session_options.use_multiplexed_for_read_write
81+
):
82+
raise NotImplementedError(
83+
"Multiplexed sessions are not yet supported for read/write transactions."
84+
)
85+
86+
return self._pool.get()
87+
88+
def put_session(self, session: Session) -> None:
89+
"""Returns the session to the database session manager."""
90+
91+
if session.is_multiplexed:
92+
raise NotImplementedError("Multiplexed sessions are not yet supported.")
93+
94+
self._pool.put(session)

0 commit comments

Comments
 (0)