Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ def __init__(
pool = BurstyPool(database_role=database_role)
pool.bind(self)

self._session_manager = DatabaseSessionsManager(database=self, pool=pool)
self._session_manager = DatabaseSessionsManager(
database=self, pool=pool, logger=self.logger
)

@classmethod
def from_pb(cls, database_pb, instance, pool=None):
Expand Down
238 changes: 220 additions & 18 deletions google/cloud/spanner_v1/database_sessions_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import threading
import time
import weakref

from google.api_core.exceptions import MethodNotImplemented

from google.cloud.spanner_v1._opentelemetry_tracing import (
get_current_span,
add_span_event,
Expand All @@ -34,27 +41,45 @@ class DatabaseSessionsManager(object):

:type pool: :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`
:param pool: The pool to get non-multiplexed sessions from.

:type logger: :class:`logging.Logger`
:param logger: Logger for the database session manager.
"""

def __init__(self, database, pool):
# Intervals for the maintenance thread to check and refresh the multiplexed session.
_MAINTENANCE_THREAD_POLLING_INTERVAL = datetime.timedelta(hours=1)
_MAINTENANCE_THREAD_REFRESH_INTERVAL = datetime.timedelta(days=7)

def __init__(self, database, pool, logger):
self._database = database
self._logger = logger

# The session pool manages non-multiplexed sessions, and
# will only be used if multiplexed sessions are not enabled.
self._pool = pool

# Declare multiplexed session attributes. When a multiplexed session for the
# database session manager is created, a maintenance thread is initialized to
# periodically delete and recreate the multiplexed session so that it remains
# valid. Because of this concurrency, we need to use a lock whenever we access
# the multiplexed session to avoid any race conditions. We also create an event
# so that the thread can terminate if the use of multiplexed session has been
# disabled for all transactions.
self._multiplexed_session = None
self._multiplexed_session_maintenance_thread = None
self._multiplexed_session_lock = threading.Lock()
self._is_multiplexed_sessions_disabled_event = threading.Event()

def get_session_for_read_only(self) -> Session:
"""Returns a session for read-only transactions from the database session manager.

:rtype: :class:`~google.cloud.spanner_v1.session.Session`
:returns: a session for read-only transactions.
"""

if (
self._database._instance._client.session_options.use_multiplexed_for_read_only()
):
raise NotImplementedError(
"Multiplexed sessions are not yet supported for read-only transactions."
)

return self._get_pooled_session()
return self._get_session(
use_multiplexed=self._database._instance._client.session_options.use_multiplexed_for_read_only()
)

def get_session_for_partitioned(self) -> Session:
"""Returns a session for partitioned transactions from the database session manager.
Expand All @@ -70,7 +95,7 @@ def get_session_for_partitioned(self) -> Session:
"Multiplexed sessions are not yet supported for partitioned transactions."
)

return self._get_pooled_session()
return self._get_session(use_multiplexed=False)

def get_session_for_read_write(self) -> Session:
"""Returns a session for read/write transactions from the database session manager.
Expand All @@ -86,15 +111,20 @@ def get_session_for_read_write(self) -> Session:
"Multiplexed sessions are not yet supported for read/write transactions."
)

return self._get_pooled_session()
return self._get_session(use_multiplexed=False)

def put_session(self, session: Session) -> None:
"""Returns the session to the database session manager."""
"""Returns the session to the database session manager.

if session.is_multiplexed:
raise NotImplementedError("Multiplexed sessions are not yet supported.")
:type session: :class:`~google.cloud.spanner_v1.session.Session`
:param session: The session to return to the database session manager.
"""

self._pool.put(session)
# No action is needed for multiplexed sessions: the session
# pool is only used for managing non-multiplexed sessions,
# since they can only process one transaction at a time.
if not session.is_multiplexed:
self._pool.put(session)

current_span = get_current_span()
add_span_event(
Expand All @@ -103,14 +133,186 @@ def put_session(self, session: Session) -> None:
{"id": session.session_id, "multiplexed": session.is_multiplexed},
)

def _get_pooled_session(self):
"""Returns a non-multiplexed session from the session pool."""
def _get_session(self, use_multiplexed: bool) -> Session:
"""Returns a session from the database session manager.

If use_multiplexed is True, returns a multiplexed session if
multiplexed sessions are supported. If multiplexed sessions are
not supported or if use_multiplexed is False, returns a non-
multiplexed session from the session pool.

:type use_multiplexed: bool
:param use_multiplexed: Whether to try to get a multiplexed session.

:rtype: :class:`~google.cloud.spanner_v1.session.Session`
:returns: a session for the database session manager.
"""

if use_multiplexed:
try:
session = self._get_multiplexed_session()

# If multiplexed sessions are not supported, disable
# them for all transactions and return a non-multiplexed session.
except MethodNotImplemented:
self._disable_multiplexed_sessions()
session = self._pool.get()

else:
session = self._pool.get()

session = self._pool.get()
add_span_event(
get_current_span(),
"Using session",
{"id": session.session_id, "multiplexed": session.is_multiplexed},
)

return session

def _get_multiplexed_session(self) -> Session:
"""Returns a multiplexed session from the database session manager.

If the multiplexed session is not defined, creates a new multiplexed
session and starts a maintenance thread to periodically delete and
recreate it so that it remains valid. Otherwise, simply returns the
current multiplexed session.

:raises MethodNotImplemented:
if multiplexed sessions are not supported.

:rtype: :class:`~google.cloud.spanner_v1.session.Session`
:returns: a multiplexed session.
"""

with self._multiplexed_session_lock:
if self._multiplexed_session is None:
self._multiplexed_session = self._build_multiplexed_session()

# Build and start a thread to maintain the multiplexed session.
self._multiplexed_session_maintenance_thread = (
self._build_maintenance_thread()
)
self._multiplexed_session_maintenance_thread.start()

add_span_event(
get_current_span(),
"Using session",
{"id": self._multiplexed_session.session_id, "multiplexed": True},
)

return self._multiplexed_session

def _build_multiplexed_session(self) -> Session:
"""Builds and returns a new multiplexed session for the database session manager.

:raises MethodNotImplemented:
if multiplexed sessions are not supported.

:rtype: :class:`~google.cloud.spanner_v1.session.Session`
:returns: a new multiplexed session.
"""

session = Session(
database=self._database,
database_role=self._database.database_role,
is_multiplexed=True,
)

session.create()

self._logger.info("Created multiplexed session.")

return session

def _disable_multiplexed_sessions(self) -> None:
"""Disables multiplexed sessions for all transactions."""

self._logger.warning(
"Multiplexed session creation failed. Disabling multiplexed sessions."
)

session_options = self._database._instance._client.session_options
session_options.disable_multiplexed_for_read_only()
session_options.disable_multiplexed_for_partitioned()
session_options.disable_multiplexed_for_read_write()

self._multiplexed_session = None
self._is_multiplexed_sessions_disabled_event.set()

def _build_maintenance_thread(self) -> threading.Thread:
"""Builds and returns a multiplexed session maintenance thread for
the database session manager. This thread will periodically delete
and recreate the multiplexed session to ensure that it is always valid.

:rtype: :class:`threading.Thread`
:returns: a multiplexed session maintenance thread.
"""

# Use a weak reference to the database session manager to avoid
# creating a circular reference that would prevent the database
# session manager from being garbage collected.
session_manager_ref = weakref.ref(self)

return threading.Thread(
target=self._maintain_multiplexed_session,
name=f"maintenance-multiplexed-session-{self._multiplexed_session.name}",
args=[session_manager_ref],
daemon=True,
)

@staticmethod
def _maintain_multiplexed_session(session_manager_ref) -> None:
"""Maintains the multiplexed session for the database session manager.

This method will delete and recreate the referenced database session manager's
multiplexed session to ensure that it is always valid. The method will run until
the database session manager is deleted, the multiplexed session is deleted, or
building a multiplexed session fails.

:type session_manager_ref: :class:`_weakref.ReferenceType`
:param session_manager_ref: A weak reference to the database session manager.
"""

session_manager = session_manager_ref()
if session_manager is None:
return

polling_interval_seconds = (
session_manager._MAINTENANCE_THREAD_POLLING_INTERVAL.total_seconds()
)
refresh_interval_seconds = (
session_manager._MAINTENANCE_THREAD_REFRESH_INTERVAL.total_seconds()
)

session_created_time = time.time()

while True:
# Terminate the thread is the database session manager has been deleted.
session_manager = session_manager_ref()
if session_manager is None:
return

# Terminate the thread if the use of multiplexed sessions has been disabled.
if session_manager._is_multiplexed_sessions_disabled_event.is_set():
return

# Wait for until the refresh interval has elapsed.
if time.time() - session_created_time < refresh_interval_seconds:
time.sleep(polling_interval_seconds)
continue

with session_manager._multiplexed_session_lock:
session_manager._multiplexed_session.delete()

try:
session_manager._multiplexed_session = (
session_manager._build_multiplexed_session()
)

# Disable multiplexed sessions for all transactions and terminate
# the thread if building a multiplexed session fails.
except MethodNotImplemented:
session_manager._disable_multiplexed_sessions()
return

session_created_time = time.time()
36 changes: 32 additions & 4 deletions google/cloud/spanner_v1/session_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import os

from google.cloud.spanner_v1._opentelemetry_tracing import (
get_current_span,
add_span_event,
)


class SessionOptions(object):
"""Represents the session options for the Cloud Spanner Python client.
Expand All @@ -22,9 +26,12 @@ class SessionOptions(object):
* read-only transactions (:meth:`use_multiplexed_for_read_only`)
* partitioned transactions (:meth:`use_multiplexed_for_partitioned`)
* read/write transactions (:meth:`use_multiplexed_for_read_write`).
"""

MULTIPLEXED_SESSIONS_REFRESH_INTERVAL = datetime.timedelta(days=7)
The use of multiplexed session can be disabled for corresponding transaction types by calling:
* :meth:`disable_multiplexed_for_read_only`
* :meth:`disable_multiplexed_for_partitioned`
* :meth:`disable_multiplexed_for_read_write`.
"""

# Environment variables for multiplexed sessions
ENV_VAR_ENABLE_MULTIPLEXED = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
Expand Down Expand Up @@ -61,6 +68,13 @@ def use_multiplexed_for_read_only(self) -> bool:

def disable_multiplexed_for_read_only(self) -> None:
"""Disables the use of multiplexed sessions for read-only transactions."""

current_span = get_current_span()
add_span_event(
current_span,
"Disabling use of multiplexed session for read-only transactions",
)

self._is_multiplexed_enabled_for_read_only = False

def use_multiplexed_for_partitioned(self) -> bool:
Expand All @@ -81,6 +95,13 @@ def use_multiplexed_for_partitioned(self) -> bool:

def disable_multiplexed_for_partitioned(self) -> None:
"""Disables the use of multiplexed sessions for read-only transactions."""

current_span = get_current_span()
add_span_event(
current_span,
"Disabling use of multiplexed session for partitioned transactions",
)

self._is_multiplexed_enabled_for_partitioned = False

def use_multiplexed_for_read_write(self) -> bool:
Expand All @@ -101,12 +122,19 @@ def use_multiplexed_for_read_write(self) -> bool:

def disable_multiplexed_for_read_write(self) -> None:
"""Disables the use of multiplexed sessions for read/write transactions."""

current_span = get_current_span()
add_span_event(
current_span,
"Disabling use of multiplexed session for read/write transactions",
)

self._is_multiplexed_enabled_for_read_write = False

@staticmethod
def _getenv(name: str) -> bool:
"""Returns the value of the given environment variable as a boolean.
True values are '1' and 'true' (case-insensitive); all other values are considered false.
"""
env_var = os.getenv(name, "").lower()
env_var = os.getenv(name, "").lower().strip()
return env_var in ["1", "true"]
Loading