1111# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212# See the License for the specific language governing permissions and
1313# limitations under the License.
14+ import datetime
15+ import threading
16+ import time
17+ import weakref
18+
19+ from google .api_core .exceptions import MethodNotImplemented
20+
1421from google .cloud .spanner_v1 ._opentelemetry_tracing import (
1522 get_current_span ,
1623 add_span_event ,
@@ -34,27 +41,45 @@ class DatabaseSessionsManager(object):
3441
3542 :type pool: :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`
3643 :param pool: The pool to get non-multiplexed sessions from.
44+
45+ :type logger: :class:`logging.Logger`
46+ :param logger: Logger for the database session manager.
3747 """
3848
39- def __init__ (self , database , pool ):
49+ # Intervals for the maintenance thread to check and refresh the multiplexed session.
50+ _MAINTENANCE_THREAD_POLLING_INTERVAL = datetime .timedelta (hours = 1 )
51+ _MAINTENANCE_THREAD_REFRESH_INTERVAL = datetime .timedelta (days = 7 )
52+
53+ def __init__ (self , database , pool , logger ):
4054 self ._database = database
55+ self ._logger = logger
56+
57+ # The session pool manages non-multiplexed sessions, and
58+ # will only be used if multiplexed sessions are not enabled.
4159 self ._pool = pool
4260
61+ # Declare multiplexed session attributes. When a multiplexed session for the
62+ # database session manager is created, a maintenance thread is initialized to
63+ # periodically delete and recreate the multiplexed session so that it remains
64+ # valid. Because of this concurrency, we need to use a lock whenever we access
65+ # the multiplexed session to avoid any race conditions. We also create an event
66+ # so that the thread can terminate if the use of multiplexed session has been
67+ # disabled for all transactions.
68+ self ._multiplexed_session = None
69+ self ._multiplexed_session_maintenance_thread = None
70+ self ._multiplexed_session_lock = threading .Lock ()
71+ self ._is_multiplexed_sessions_disabled_event = threading .Event ()
72+
4373 def get_session_for_read_only (self ) -> Session :
4474 """Returns a session for read-only transactions from the database session manager.
4575
4676 :rtype: :class:`~google.cloud.spanner_v1.session.Session`
4777 :returns: a session for read-only transactions.
4878 """
4979
50- if (
51- self ._database ._instance ._client .session_options .use_multiplexed_for_read_only ()
52- ):
53- raise NotImplementedError (
54- "Multiplexed sessions are not yet supported for read-only transactions."
55- )
56-
57- return self ._get_pooled_session ()
80+ return self ._get_session (
81+ use_multiplexed = self ._database ._instance ._client .session_options .use_multiplexed_for_read_only ()
82+ )
5883
5984 def get_session_for_partitioned (self ) -> Session :
6085 """Returns a session for partitioned transactions from the database session manager.
@@ -70,7 +95,7 @@ def get_session_for_partitioned(self) -> Session:
7095 "Multiplexed sessions are not yet supported for partitioned transactions."
7196 )
7297
73- return self ._get_pooled_session ( )
98+ return self ._get_session ( use_multiplexed = False )
7499
75100 def get_session_for_read_write (self ) -> Session :
76101 """Returns a session for read/write transactions from the database session manager.
@@ -86,15 +111,20 @@ def get_session_for_read_write(self) -> Session:
86111 "Multiplexed sessions are not yet supported for read/write transactions."
87112 )
88113
89- return self ._get_pooled_session ( )
114+ return self ._get_session ( use_multiplexed = False )
90115
91116 def put_session (self , session : Session ) -> None :
92- """Returns the session to the database session manager."""
117+ """Returns the session to the database session manager.
93118
94- if session .is_multiplexed :
95- raise NotImplementedError ("Multiplexed sessions are not yet supported." )
119+ :type session: :class:`~google.cloud.spanner_v1.session.Session`
120+ :param session: The session to return to the database session manager.
121+ """
96122
97- self ._pool .put (session )
123+ # No action is needed for multiplexed sessions: the session
124+ # pool is only used for managing non-multiplexed sessions,
125+ # since they can only process one transaction at a time.
126+ if not session .is_multiplexed :
127+ self ._pool .put (session )
98128
99129 current_span = get_current_span ()
100130 add_span_event (
@@ -103,14 +133,186 @@ def put_session(self, session: Session) -> None:
103133 {"id" : session .session_id , "multiplexed" : session .is_multiplexed },
104134 )
105135
106- def _get_pooled_session (self ):
107- """Returns a non-multiplexed session from the session pool."""
136+ def _get_session (self , use_multiplexed : bool ) -> Session :
137+ """Returns a session from the database session manager.
138+
139+ If use_multiplexed is True, returns a multiplexed session if
140+ multiplexed sessions are supported. If multiplexed sessions are
141+ not supported or if use_multiplexed is False, returns a non-
142+ multiplexed session from the session pool.
143+
144+ :type use_multiplexed: bool
145+ :param use_multiplexed: Whether to try to get a multiplexed session.
146+
147+ :rtype: :class:`~google.cloud.spanner_v1.session.Session`
148+ :returns: a session for the database session manager.
149+ """
150+
151+ if use_multiplexed :
152+ try :
153+ session = self ._get_multiplexed_session ()
154+
155+ # If multiplexed sessions are not supported, disable
156+ # them for all transactions and return a non-multiplexed session.
157+ except MethodNotImplemented :
158+ self ._disable_multiplexed_sessions ()
159+ session = self ._pool .get ()
160+
161+ else :
162+ session = self ._pool .get ()
108163
109- session = self ._pool .get ()
110164 add_span_event (
111165 get_current_span (),
112166 "Using session" ,
113167 {"id" : session .session_id , "multiplexed" : session .is_multiplexed },
114168 )
115169
116170 return session
171+
172+ def _get_multiplexed_session (self ) -> Session :
173+ """Returns a multiplexed session from the database session manager.
174+
175+ If the multiplexed session is not defined, creates a new multiplexed
176+ session and starts a maintenance thread to periodically delete and
177+ recreate it so that it remains valid. Otherwise, simply returns the
178+ current multiplexed session.
179+
180+ :raises MethodNotImplemented:
181+ if multiplexed sessions are not supported.
182+
183+ :rtype: :class:`~google.cloud.spanner_v1.session.Session`
184+ :returns: a multiplexed session.
185+ """
186+
187+ with self ._multiplexed_session_lock :
188+ if self ._multiplexed_session is None :
189+ self ._multiplexed_session = self ._build_multiplexed_session ()
190+
191+ # Build and start a thread to maintain the multiplexed session.
192+ self ._multiplexed_session_maintenance_thread = (
193+ self ._build_maintenance_thread ()
194+ )
195+ self ._multiplexed_session_maintenance_thread .start ()
196+
197+ add_span_event (
198+ get_current_span (),
199+ "Using session" ,
200+ {"id" : self ._multiplexed_session .session_id , "multiplexed" : True },
201+ )
202+
203+ return self ._multiplexed_session
204+
205+ def _build_multiplexed_session (self ) -> Session :
206+ """Builds and returns a new multiplexed session for the database session manager.
207+
208+ :raises MethodNotImplemented:
209+ if multiplexed sessions are not supported.
210+
211+ :rtype: :class:`~google.cloud.spanner_v1.session.Session`
212+ :returns: a new multiplexed session.
213+ """
214+
215+ session = Session (
216+ database = self ._database ,
217+ database_role = self ._database .database_role ,
218+ is_multiplexed = True ,
219+ )
220+
221+ session .create ()
222+
223+ self ._logger .info ("Created multiplexed session." )
224+
225+ return session
226+
227+ def _disable_multiplexed_sessions (self ) -> None :
228+ """Disables multiplexed sessions for all transactions."""
229+
230+ self ._logger .warning (
231+ "Multiplexed session creation failed. Disabling multiplexed sessions."
232+ )
233+
234+ session_options = self ._database ._instance ._client .session_options
235+ session_options .disable_multiplexed_for_read_only ()
236+ session_options .disable_multiplexed_for_partitioned ()
237+ session_options .disable_multiplexed_for_read_write ()
238+
239+ self ._multiplexed_session = None
240+ self ._is_multiplexed_sessions_disabled_event .set ()
241+
242+ def _build_maintenance_thread (self ) -> threading .Thread :
243+ """Builds and returns a multiplexed session maintenance thread for
244+ the database session manager. This thread will periodically delete
245+ and recreate the multiplexed session to ensure that it is always valid.
246+
247+ :rtype: :class:`threading.Thread`
248+ :returns: a multiplexed session maintenance thread.
249+ """
250+
251+ # Use a weak reference to the database session manager to avoid
252+ # creating a circular reference that would prevent the database
253+ # session manager from being garbage collected.
254+ session_manager_ref = weakref .ref (self )
255+
256+ return threading .Thread (
257+ target = self ._maintain_multiplexed_session ,
258+ name = f"maintenance-multiplexed-session-{ self ._multiplexed_session .name } " ,
259+ args = [session_manager_ref ],
260+ daemon = True ,
261+ )
262+
263+ @staticmethod
264+ def _maintain_multiplexed_session (session_manager_ref ) -> None :
265+ """Maintains the multiplexed session for the database session manager.
266+
267+ This method will delete and recreate the referenced database session manager's
268+ multiplexed session to ensure that it is always valid. The method will run until
269+ the database session manager is deleted, the multiplexed session is deleted, or
270+ building a multiplexed session fails.
271+
272+ :type session_manager_ref: :class:`_weakref.ReferenceType`
273+ :param session_manager_ref: A weak reference to the database session manager.
274+ """
275+
276+ session_manager = session_manager_ref ()
277+ if session_manager is None :
278+ return
279+
280+ polling_interval_seconds = (
281+ session_manager ._MAINTENANCE_THREAD_POLLING_INTERVAL .total_seconds ()
282+ )
283+ refresh_interval_seconds = (
284+ session_manager ._MAINTENANCE_THREAD_REFRESH_INTERVAL .total_seconds ()
285+ )
286+
287+ session_created_time = time .time ()
288+
289+ while True :
290+ # Terminate the thread is the database session manager has been deleted.
291+ session_manager = session_manager_ref ()
292+ if session_manager is None :
293+ return
294+
295+ # Terminate the thread if the use of multiplexed sessions has been disabled.
296+ if session_manager ._is_multiplexed_sessions_disabled_event .is_set ():
297+ return
298+
299+ # Wait for until the refresh interval has elapsed.
300+ if time .time () - session_created_time < refresh_interval_seconds :
301+ time .sleep (polling_interval_seconds )
302+ continue
303+
304+ with session_manager ._multiplexed_session_lock :
305+ session_manager ._multiplexed_session .delete ()
306+
307+ try :
308+ session_manager ._multiplexed_session = (
309+ session_manager ._build_multiplexed_session ()
310+ )
311+
312+ # Disable multiplexed sessions for all transactions and terminate
313+ # the thread if building a multiplexed session fails.
314+ except MethodNotImplemented :
315+ session_manager ._disable_multiplexed_sessions ()
316+ return
317+
318+ session_created_time = time .time ()
0 commit comments