1111# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212# See the License for the specific language governing permissions and
1313# limitations under the License.
14+ from enum import Enum
15+ from os import getenv
1416from datetime import timedelta
1517from threading import Event , Lock , Thread
1618from time import sleep , time
1719from typing import Optional
1820from weakref import ref
1921
2022from google .cloud .spanner_v1 .session import Session
21- from google .cloud .spanner_v1 .session_options import TransactionType
2223from google .cloud .spanner_v1 ._opentelemetry_tracing import (
2324 get_current_span ,
2425 add_span_event ,
2526)
2627
2728
29+ class TransactionType (Enum ):
30+ """Transaction types for session options."""
31+
32+ READ_ONLY = "read-only"
33+ PARTITIONED = "partitioned"
34+ READ_WRITE = "read/write"
35+
36+
2837class DatabaseSessionsManager (object ):
2938 """Manages sessions for a Cloud Spanner database.
3039
3140 Sessions can be checked out from the database session manager for a specific
3241 transaction type using :meth:`get_session`, and returned to the session manager
3342 using :meth:`put_session`.
3443
35- The sessions returned by the session manager depend on the client's session options
36- (see :class:`~google.cloud.spanner_v1.session_options.SessionOptions`) and the
37- provided session pool (see :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`).
44+ The sessions returned by the session manager depend on the configured environment variables
45+ and the provided session pool (see :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`).
3846
3947 :type database: :class:`~google.cloud.spanner_v1.database.Database`
4048 :param database: The database to manage sessions for.
@@ -43,6 +51,13 @@ class DatabaseSessionsManager(object):
4351 :param pool: The pool to get non-multiplexed sessions from.
4452 """
4553
54+ # Environment variables for multiplexed sessions
55+ _ENV_VAR_MULTIPLEXED = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
56+ _ENV_VAR_MULTIPLEXED_PARTITIONED = (
57+ "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
58+ )
59+ _ENV_VAR_MULTIPLEXED_READ_WRITE = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"
60+
4661 # Intervals for the maintenance thread to check and refresh the multiplexed session.
4762 _MAINTENANCE_THREAD_POLLING_INTERVAL = timedelta (minutes = 10 )
4863 _MAINTENANCE_THREAD_REFRESH_INTERVAL = timedelta (days = 7 )
@@ -55,13 +70,14 @@ def __init__(self, database, pool):
5570 # database session manager is created, a maintenance thread is initialized to
5671 # periodically delete and recreate the multiplexed session so that it remains
5772 # valid. Because of this concurrency, we need to use a lock whenever we access
58- # the multiplexed session to avoid any race conditions. We also create an event
59- # so that the thread can terminate if the use of multiplexed session has been
60- # disabled for all transactions.
73+ # the multiplexed session to avoid any race conditions.
6174 self ._multiplexed_session : Optional [Session ] = None
6275 self ._multiplexed_session_thread : Optional [Thread ] = None
6376 self ._multiplexed_session_lock : Lock = Lock ()
64- self ._multiplexed_session_disabled_event : Event = Event ()
77+
78+ # Event to terminate the maintenance thread.
79+ # Only used for testing purposes.
80+ self ._multiplexed_session_terminate_event : Event = Event ()
6581
6682 def get_session (self , transaction_type : TransactionType ) -> Session :
6783 """Returns a session for the given transaction type from the database session manager.
@@ -70,8 +86,7 @@ def get_session(self, transaction_type: TransactionType) -> Session:
7086 :returns: a session for the given transaction type.
7187 """
7288
73- session_options = self ._database ._instance ._client ._session_options
74- use_multiplexed = session_options .use_multiplexed (transaction_type )
89+ use_multiplexed = self ._use_multiplexed (transaction_type )
7590
7691 # TODO multiplexed: enable for read/write transactions
7792 if use_multiplexed and transaction_type == TransactionType .READ_WRITE :
@@ -149,15 +164,6 @@ def _build_multiplexed_session(self) -> Session:
149164
150165 return session
151166
152- def _disable_multiplexed_sessions (self ) -> None :
153- """Disables multiplexed sessions for all transactions."""
154-
155- self ._multiplexed_session = None
156- self ._multiplexed_session_disabled_event .set ()
157-
158- session_options = self ._database ._instance ._client ._session_options
159- session_options .disable_multiplexed (self ._database .logger )
160-
161167 def _build_maintenance_thread (self ) -> Thread :
162168 """Builds and returns a multiplexed session maintenance thread for
163169 the database session manager. This thread will periodically delete
@@ -185,45 +191,99 @@ def _maintain_multiplexed_session(session_manager_ref) -> None:
185191
186192 This method will delete and recreate the referenced database session manager's
187193 multiplexed session to ensure that it is always valid. The method will run until
188- the database session manager is deleted, the multiplexed session is deleted, or
189- building a multiplexed session fails.
194+ the database session manager is deleted or the multiplexed session is deleted.
190195
191196 :type session_manager_ref: :class:`_weakref.ReferenceType`
192197 :param session_manager_ref: A weak reference to the database session manager.
193198 """
194199
195- session_manager = session_manager_ref ()
196- if session_manager is None :
200+ manager = session_manager_ref ()
201+ if manager is None :
197202 return
198203
199204 polling_interval_seconds = (
200- session_manager ._MAINTENANCE_THREAD_POLLING_INTERVAL .total_seconds ()
205+ manager ._MAINTENANCE_THREAD_POLLING_INTERVAL .total_seconds ()
201206 )
202207 refresh_interval_seconds = (
203- session_manager ._MAINTENANCE_THREAD_REFRESH_INTERVAL .total_seconds ()
208+ manager ._MAINTENANCE_THREAD_REFRESH_INTERVAL .total_seconds ()
204209 )
205210
206211 session_created_time = time ()
207212
208213 while True :
209214 # Terminate the thread is the database session manager has been deleted.
210- session_manager = session_manager_ref ()
211- if session_manager is None :
215+ manager = session_manager_ref ()
216+ if manager is None :
212217 return
213218
214- # Terminate the thread if the use of multiplexed sessions has been disabled .
215- if session_manager . _multiplexed_session_disabled_event .is_set ():
219+ # Terminate the thread if corresponding event is set .
220+ if manager . _multiplexed_session_terminate_event .is_set ():
216221 return
217222
218223 # Wait for until the refresh interval has elapsed.
219224 if time () - session_created_time < refresh_interval_seconds :
220225 sleep (polling_interval_seconds )
221226 continue
222227
223- with session_manager ._multiplexed_session_lock :
224- session_manager ._multiplexed_session .delete ()
225- session_manager ._multiplexed_session = (
226- session_manager ._build_multiplexed_session ()
227- )
228+ with manager ._multiplexed_session_lock :
229+ manager ._multiplexed_session .delete ()
230+ manager ._multiplexed_session = manager ._build_multiplexed_session ()
228231
229232 session_created_time = time ()
233+
234+ @classmethod
235+ def _use_multiplexed (cls , transaction_type : TransactionType ) -> bool :
236+ """Returns whether to use multiplexed sessions for the given transaction type.
237+
238+ Multiplexed sessions are enabled for read-only transactions if:
239+ * _ENV_VAR_MULTIPLEXED is set to true.
240+
241+ Multiplexed sessions are enabled for partitioned transactions if:
242+ * _ENV_VAR_MULTIPLEXED is set to true; and
243+ * _ENV_VAR_MULTIPLEXED_PARTITIONED is set to true.
244+
245+ Multiplexed sessions are enabled for read/write transactions if:
246+ * _ENV_VAR_MULTIPLEXED is set to true; and
247+ * _ENV_VAR_MULTIPLEXED_READ_WRITE is set to true.
248+
249+ :type transaction_type: :class:`TransactionType`
250+ :param transaction_type: the type of transaction
251+
252+ :rtype: bool
253+ :returns: True if multiplexed sessions should be used for the given transaction
254+ type, False otherwise.
255+
256+ :raises ValueError: if the transaction type is not supported.
257+ """
258+
259+ if transaction_type is TransactionType .READ_ONLY :
260+ return cls ._getenv (cls ._ENV_VAR_MULTIPLEXED )
261+
262+ elif transaction_type is TransactionType .PARTITIONED :
263+ return cls ._getenv (cls ._ENV_VAR_MULTIPLEXED ) and cls ._getenv (
264+ cls ._ENV_VAR_MULTIPLEXED_PARTITIONED
265+ )
266+
267+ elif transaction_type is TransactionType .READ_WRITE :
268+ return cls ._getenv (cls ._ENV_VAR_MULTIPLEXED ) and cls ._getenv (
269+ cls ._ENV_VAR_MULTIPLEXED_READ_WRITE
270+ )
271+
272+ raise ValueError (f"Transaction type { transaction_type } is not supported." )
273+
274+ @classmethod
275+ def _getenv (cls , env_var_name : str ) -> bool :
276+ """Returns the value of the given environment variable as a boolean.
277+
278+ True values are '1' and 'true' (case-insensitive).
279+ All other values are considered false.
280+
281+ :type env_var_name: str
282+ :param env_var_name: the name of the boolean environment variable
283+
284+ :rtype: bool
285+ :returns: True if the environment variable is set to a true value, False otherwise.
286+ """
287+
288+ env_var_value = getenv (env_var_name , "" ).lower ().strip ()
289+ return env_var_value in ["1" , "true" ]
0 commit comments