Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit a9b3d02

Browse files
committed
feat: Add support for multiplexed sessions - part 2 (read-only transactions)
- Enabled use of multiplexed sessions for read-only transactions - Add maintenance threads for refreshing multiplexed sessions Signed-off-by: currantw <taylor.curran@improving.com>
1 parent 8619929 commit a9b3d02

File tree

5 files changed

+411
-35
lines changed

5 files changed

+411
-35
lines changed

google/cloud/spanner_v1/database.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,9 @@ def __init__(
192192
pool = BurstyPool(database_role=database_role)
193193
pool.bind(self)
194194

195-
self._session_manager = DatabaseSessionsManager(database=self, pool=pool)
195+
self._session_manager = DatabaseSessionsManager(
196+
database=self, pool=pool, logger=self.logger
197+
)
196198

197199
@classmethod
198200
def from_pb(cls, database_pb, instance, pool=None):

google/cloud/spanner_v1/database_sessions_manager.py

Lines changed: 220 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import datetime
15+
import threading
16+
import time
17+
import weakref
18+
19+
from google.api_core.exceptions import MethodNotImplemented
20+
1421
from google.cloud.spanner_v1._opentelemetry_tracing import (
1522
get_current_span,
1623
add_span_event,
@@ -34,27 +41,45 @@ class DatabaseSessionsManager(object):
3441
3542
:type pool: :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`
3643
:param pool: The pool to get non-multiplexed sessions from.
44+
45+
:type logger: :class:`logging.Logger`
46+
:param logger: Logger for the database session manager.
3747
"""
3848

39-
def __init__(self, database, pool):
49+
# Intervals for the maintenance thread to check and refresh the multiplexed session.
50+
_MAINTENANCE_THREAD_POLLING_INTERVAL = datetime.timedelta(hours=1)
51+
_MAINTENANCE_THREAD_REFRESH_INTERVAL = datetime.timedelta(days=7)
52+
53+
def __init__(self, database, pool, logger):
4054
self._database = database
55+
self._logger = logger
56+
57+
# The session pool manages non-multiplexed sessions, and
58+
# will only be used if multiplexed sessions are not enabled.
4159
self._pool = pool
4260

61+
# Declare multiplexed session attributes. When a multiplexed session for the
62+
# database session manager is created, a maintenance thread is initialized to
63+
# periodically delete and recreate the multiplexed session so that it remains
64+
# valid. Because of this concurrency, we need to use a lock whenever we access
65+
# the multiplexed session to avoid any race conditions. We also create an event
66+
# so that the thread can terminate if the use of multiplexed session has been
67+
# disabled for all transactions.
68+
self._multiplexed_session = None
69+
self._multiplexed_session_maintenance_thread = None
70+
self._multiplexed_session_lock = threading.Lock()
71+
self._is_multiplexed_sessions_disabled_event = threading.Event()
72+
4373
def get_session_for_read_only(self) -> Session:
4474
"""Returns a session for read-only transactions from the database session manager.
4575
4676
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
4777
:returns: a session for read-only transactions.
4878
"""
4979

50-
if (
51-
self._database._instance._client.session_options.use_multiplexed_for_read_only()
52-
):
53-
raise NotImplementedError(
54-
"Multiplexed sessions are not yet supported for read-only transactions."
55-
)
56-
57-
return self._get_pooled_session()
80+
return self._get_session(
81+
use_multiplexed=self._database._instance._client.session_options.use_multiplexed_for_read_only()
82+
)
5883

5984
def get_session_for_partitioned(self) -> Session:
6085
"""Returns a session for partitioned transactions from the database session manager.
@@ -70,7 +95,7 @@ def get_session_for_partitioned(self) -> Session:
7095
"Multiplexed sessions are not yet supported for partitioned transactions."
7196
)
7297

73-
return self._get_pooled_session()
98+
return self._get_session(use_multiplexed=False)
7499

75100
def get_session_for_read_write(self) -> Session:
76101
"""Returns a session for read/write transactions from the database session manager.
@@ -86,15 +111,20 @@ def get_session_for_read_write(self) -> Session:
86111
"Multiplexed sessions are not yet supported for read/write transactions."
87112
)
88113

89-
return self._get_pooled_session()
114+
return self._get_session(use_multiplexed=False)
90115

91116
def put_session(self, session: Session) -> None:
92-
"""Returns the session to the database session manager."""
117+
"""Returns the session to the database session manager.
93118
94-
if session.is_multiplexed:
95-
raise NotImplementedError("Multiplexed sessions are not yet supported.")
119+
:type session: :class:`~google.cloud.spanner_v1.session.Session`
120+
:param session: The session to return to the database session manager.
121+
"""
96122

97-
self._pool.put(session)
123+
# No action is needed for multiplexed sessions: the session
124+
# pool is only used for managing non-multiplexed sessions,
125+
# since they can only process one transaction at a time.
126+
if not session.is_multiplexed:
127+
self._pool.put(session)
98128

99129
current_span = get_current_span()
100130
add_span_event(
@@ -103,14 +133,186 @@ def put_session(self, session: Session) -> None:
103133
{"id": session.session_id, "multiplexed": session.is_multiplexed},
104134
)
105135

106-
def _get_pooled_session(self):
107-
"""Returns a non-multiplexed session from the session pool."""
136+
def _get_session(self, use_multiplexed: bool) -> Session:
137+
"""Returns a session from the database session manager.
138+
139+
If use_multiplexed is True, returns a multiplexed session if
140+
multiplexed sessions are supported. If multiplexed sessions are
141+
not supported or if use_multiplexed is False, returns a non-
142+
multiplexed session from the session pool.
143+
144+
:type use_multiplexed: bool
145+
:param use_multiplexed: Whether to try to get a multiplexed session.
146+
147+
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
148+
:returns: a session for the database session manager.
149+
"""
150+
151+
if use_multiplexed:
152+
try:
153+
session = self._get_multiplexed_session()
154+
155+
# If multiplexed sessions are not supported, disable
156+
# them for all transactions and return a non-multiplexed session.
157+
except MethodNotImplemented:
158+
self._disable_multiplexed_sessions()
159+
session = self._pool.get()
160+
161+
else:
162+
session = self._pool.get()
108163

109-
session = self._pool.get()
110164
add_span_event(
111165
get_current_span(),
112166
"Using session",
113167
{"id": session.session_id, "multiplexed": session.is_multiplexed},
114168
)
115169

116170
return session
171+
172+
def _get_multiplexed_session(self) -> Session:
173+
"""Returns a multiplexed session from the database session manager.
174+
175+
If the multiplexed session is not defined, creates a new multiplexed
176+
session and starts a maintenance thread to periodically delete and
177+
recreate it so that it remains valid. Otherwise, simply returns the
178+
current multiplexed session.
179+
180+
:raises MethodNotImplemented:
181+
if multiplexed sessions are not supported.
182+
183+
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
184+
:returns: a multiplexed session.
185+
"""
186+
187+
with self._multiplexed_session_lock:
188+
if self._multiplexed_session is None:
189+
self._multiplexed_session = self._build_multiplexed_session()
190+
191+
# Build and start a thread to maintain the multiplexed session.
192+
self._multiplexed_session_maintenance_thread = (
193+
self._build_maintenance_thread()
194+
)
195+
self._multiplexed_session_maintenance_thread.start()
196+
197+
add_span_event(
198+
get_current_span(),
199+
"Using session",
200+
{"id": self._multiplexed_session.session_id, "multiplexed": True},
201+
)
202+
203+
return self._multiplexed_session
204+
205+
def _build_multiplexed_session(self) -> Session:
206+
"""Builds and returns a new multiplexed session for the database session manager.
207+
208+
:raises MethodNotImplemented:
209+
if multiplexed sessions are not supported.
210+
211+
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
212+
:returns: a new multiplexed session.
213+
"""
214+
215+
session = Session(
216+
database=self._database,
217+
database_role=self._database.database_role,
218+
is_multiplexed=True,
219+
)
220+
221+
session.create()
222+
223+
self._logger.info("Created multiplexed session.")
224+
225+
return session
226+
227+
def _disable_multiplexed_sessions(self) -> None:
228+
"""Disables multiplexed sessions for all transactions."""
229+
230+
self._logger.warning(
231+
"Multiplexed session creation failed. Disabling multiplexed sessions."
232+
)
233+
234+
session_options = self._database._instance._client.session_options
235+
session_options.disable_multiplexed_for_read_only()
236+
session_options.disable_multiplexed_for_partitioned()
237+
session_options.disable_multiplexed_for_read_write()
238+
239+
self._multiplexed_session = None
240+
self._is_multiplexed_sessions_disabled_event.set()
241+
242+
def _build_maintenance_thread(self) -> threading.Thread:
243+
"""Builds and returns a multiplexed session maintenance thread for
244+
the database session manager. This thread will periodically delete
245+
and recreate the multiplexed session to ensure that it is always valid.
246+
247+
:rtype: :class:`threading.Thread`
248+
:returns: a multiplexed session maintenance thread.
249+
"""
250+
251+
# Use a weak reference to the database session manager to avoid
252+
# creating a circular reference that would prevent the database
253+
# session manager from being garbage collected.
254+
session_manager_ref = weakref.ref(self)
255+
256+
return threading.Thread(
257+
target=self._maintain_multiplexed_session,
258+
name=f"maintenance-multiplexed-session-{self._multiplexed_session.name}",
259+
args=[session_manager_ref],
260+
daemon=True,
261+
)
262+
263+
@staticmethod
264+
def _maintain_multiplexed_session(session_manager_ref) -> None:
265+
"""Maintains the multiplexed session for the database session manager.
266+
267+
This method will delete and recreate the referenced database session manager's
268+
multiplexed session to ensure that it is always valid. The method will run until
269+
the database session manager is deleted, the multiplexed session is deleted, or
270+
building a multiplexed session fails.
271+
272+
:type session_manager_ref: :class:`_weakref.ReferenceType`
273+
:param session_manager_ref: A weak reference to the database session manager.
274+
"""
275+
276+
session_manager = session_manager_ref()
277+
if session_manager is None:
278+
return
279+
280+
polling_interval_seconds = (
281+
session_manager._MAINTENANCE_THREAD_POLLING_INTERVAL.total_seconds()
282+
)
283+
refresh_interval_seconds = (
284+
session_manager._MAINTENANCE_THREAD_REFRESH_INTERVAL.total_seconds()
285+
)
286+
287+
session_created_time = time.time()
288+
289+
while True:
290+
# Terminate the thread is the database session manager has been deleted.
291+
session_manager = session_manager_ref()
292+
if session_manager is None:
293+
return
294+
295+
# Terminate the thread if the use of multiplexed sessions has been disabled.
296+
if session_manager._is_multiplexed_sessions_disabled_event.is_set():
297+
return
298+
299+
# Wait for until the refresh interval has elapsed.
300+
if time.time() - session_created_time < refresh_interval_seconds:
301+
time.sleep(polling_interval_seconds)
302+
continue
303+
304+
with session_manager._multiplexed_session_lock:
305+
session_manager._multiplexed_session.delete()
306+
307+
try:
308+
session_manager._multiplexed_session = (
309+
session_manager._build_multiplexed_session()
310+
)
311+
312+
# Disable multiplexed sessions for all transactions and terminate
313+
# the thread if building a multiplexed session fails.
314+
except MethodNotImplemented:
315+
session_manager._disable_multiplexed_sessions()
316+
return
317+
318+
session_created_time = time.time()

google/cloud/spanner_v1/session_options.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import datetime
1514
import os
1615

16+
from google.cloud.spanner_v1._opentelemetry_tracing import (
17+
get_current_span,
18+
add_span_event,
19+
)
20+
1721

1822
class SessionOptions(object):
1923
"""Represents the session options for the Cloud Spanner Python client.
@@ -22,9 +26,12 @@ class SessionOptions(object):
2226
* read-only transactions (:meth:`use_multiplexed_for_read_only`)
2327
* partitioned transactions (:meth:`use_multiplexed_for_partitioned`)
2428
* read/write transactions (:meth:`use_multiplexed_for_read_write`).
25-
"""
2629
27-
MULTIPLEXED_SESSIONS_REFRESH_INTERVAL = datetime.timedelta(days=7)
30+
The use of multiplexed session can be disabled for corresponding transaction types by calling:
31+
* :meth:`disable_multiplexed_for_read_only`
32+
* :meth:`disable_multiplexed_for_partitioned`
33+
* :meth:`disable_multiplexed_for_read_write`.
34+
"""
2835

2936
# Environment variables for multiplexed sessions
3037
ENV_VAR_ENABLE_MULTIPLEXED = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
@@ -61,6 +68,13 @@ def use_multiplexed_for_read_only(self) -> bool:
6168

6269
def disable_multiplexed_for_read_only(self) -> None:
6370
"""Disables the use of multiplexed sessions for read-only transactions."""
71+
72+
current_span = get_current_span()
73+
add_span_event(
74+
current_span,
75+
"Disabling use of multiplexed session for read-only transactions",
76+
)
77+
6478
self._is_multiplexed_enabled_for_read_only = False
6579

6680
def use_multiplexed_for_partitioned(self) -> bool:
@@ -81,6 +95,13 @@ def use_multiplexed_for_partitioned(self) -> bool:
8195

8296
def disable_multiplexed_for_partitioned(self) -> None:
8397
"""Disables the use of multiplexed sessions for read-only transactions."""
98+
99+
current_span = get_current_span()
100+
add_span_event(
101+
current_span,
102+
"Disabling use of multiplexed session for partitioned transactions",
103+
)
104+
84105
self._is_multiplexed_enabled_for_partitioned = False
85106

86107
def use_multiplexed_for_read_write(self) -> bool:
@@ -101,12 +122,19 @@ def use_multiplexed_for_read_write(self) -> bool:
101122

102123
def disable_multiplexed_for_read_write(self) -> None:
103124
"""Disables the use of multiplexed sessions for read/write transactions."""
125+
126+
current_span = get_current_span()
127+
add_span_event(
128+
current_span,
129+
"Disabling use of multiplexed session for read/write transactions",
130+
)
131+
104132
self._is_multiplexed_enabled_for_read_write = False
105133

106134
@staticmethod
107135
def _getenv(name: str) -> bool:
108136
"""Returns the value of the given environment variable as a boolean.
109137
True values are '1' and 'true' (case-insensitive); all other values are considered false.
110138
"""
111-
env_var = os.getenv(name, "").lower()
139+
env_var = os.getenv(name, "").lower().strip()
112140
return env_var in ["1", "true"]

0 commit comments

Comments
 (0)