This repository was archived by the owner on Mar 31, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 104
feat: Add support for multiplexed sessions - part 1 (supporting classes and refactoring) #1329
Merged
rahul2393
merged 1 commit into
googleapis:multiplexed-sessions
from
currantw:multiplexed_session_1
Apr 4, 2025
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,7 @@ | |
| from google.cloud.spanner_admin_database_v1 import RestoreDatabaseRequest | ||
| from google.cloud.spanner_admin_database_v1 import UpdateDatabaseDdlRequest | ||
| from google.cloud.spanner_admin_database_v1.types import DatabaseDialect | ||
| from google.cloud.spanner_v1.database_sessions_manager import DatabaseSessionsManager | ||
| from google.cloud.spanner_v1.transaction import BatchTransactionId | ||
| from google.cloud.spanner_v1 import ExecuteSqlRequest | ||
| from google.cloud.spanner_v1 import Type | ||
|
|
@@ -59,7 +60,6 @@ | |
| from google.cloud.spanner_v1.keyset import KeySet | ||
| from google.cloud.spanner_v1.merged_result_set import MergedResultSet | ||
| from google.cloud.spanner_v1.pool import BurstyPool | ||
| from google.cloud.spanner_v1.pool import SessionCheckout | ||
| from google.cloud.spanner_v1.session import Session | ||
| from google.cloud.spanner_v1.snapshot import _restart_on_unavailable | ||
| from google.cloud.spanner_v1.snapshot import Snapshot | ||
|
|
@@ -70,7 +70,6 @@ | |
| from google.cloud.spanner_v1.table import Table | ||
| from google.cloud.spanner_v1._opentelemetry_tracing import ( | ||
| add_span_event, | ||
| get_current_span, | ||
| trace_call, | ||
| ) | ||
| from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture | ||
|
|
@@ -191,10 +190,10 @@ def __init__( | |
|
|
||
| if pool is None: | ||
| pool = BurstyPool(database_role=database_role) | ||
|
|
||
| self._pool = pool | ||
| pool.bind(self) | ||
|
|
||
| self._session_manager = DatabaseSessionsManager(database=self, pool=pool) | ||
|
|
||
| @classmethod | ||
| def from_pb(cls, database_pb, instance, pool=None): | ||
| """Creates an instance of this class from a protobuf. | ||
|
|
@@ -708,7 +707,7 @@ def execute_pdml(): | |
| "CloudSpanner.Database.execute_partitioned_pdml", | ||
| observability_options=self.observability_options, | ||
| ) as span, MetricsCapture(): | ||
| with SessionCheckout(self._pool) as session: | ||
| with SessionCheckout(self) as session: | ||
| add_span_event(span, "Starting BeginTransaction") | ||
| txn = api.begin_transaction( | ||
| session=session.name, options=txn_options, metadata=metadata | ||
|
|
@@ -923,7 +922,7 @@ def run_in_transaction(self, func, *args, **kw): | |
| # Check out a session and run the function in a transaction; once | ||
| # done, flip the sanity check bit back. | ||
| try: | ||
| with SessionCheckout(self._pool) as session: | ||
| with SessionCheckout(self) as session: | ||
| return session.run_in_transaction(func, *args, **kw) | ||
| finally: | ||
| self._local.transaction_running = False | ||
|
|
@@ -1160,6 +1159,35 @@ def observability_options(self): | |
| return opts | ||
|
|
||
|
|
||
| class SessionCheckout(object): | ||
| """Context manager for using a session from a database. | ||
|
|
||
| :type database: :class:`~google.cloud.spanner_v1.database.Database` | ||
| :param database: database to use the session from | ||
| """ | ||
|
|
||
| _session = None # Not checked out until '__enter__'. | ||
|
|
||
| def __init__(self, database): | ||
| if not isinstance(database, Database): | ||
| raise TypeError( | ||
| "{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format( | ||
| class_name=self.__class__.__name__, | ||
| expected_class_name=Database.__name__, | ||
| actual_class_name=database.__class__.__name__, | ||
| ) | ||
| ) | ||
|
|
||
| self._database = database | ||
|
|
||
| def __enter__(self): | ||
| self._session = self._database._session_manager.get_session_for_read_write() | ||
| return self._session | ||
|
|
||
| def __exit__(self, *ignored): | ||
| self._database._session_manager.put_session(self._session) | ||
|
|
||
|
|
||
| class BatchCheckout(object): | ||
| """Context manager for using a batch from a database. | ||
|
|
||
|
|
@@ -1194,6 +1222,15 @@ def __init__( | |
| isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED, | ||
| **kw, | ||
| ): | ||
| if not isinstance(database, Database): | ||
| raise TypeError( | ||
| "{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format( | ||
| class_name=self.__class__.__name__, | ||
| expected_class_name=Database.__name__, | ||
| actual_class_name=database.__class__.__name__, | ||
| ) | ||
| ) | ||
|
Comment on lines
+1225
to
+1232
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For better error handling, added a type check to all |
||
|
|
||
| self._database = database | ||
| self._session = self._batch = None | ||
| if request_options is None: | ||
|
|
@@ -1209,10 +1246,8 @@ def __init__( | |
|
|
||
| def __enter__(self): | ||
| """Begin ``with`` block.""" | ||
| current_span = get_current_span() | ||
| session = self._session = self._database._pool.get() | ||
| add_span_event(current_span, "Using session", {"id": session.session_id}) | ||
| batch = self._batch = Batch(session) | ||
| self._session = self._database._session_manager.get_session_for_read_only() | ||
| batch = self._batch = Batch(self._session) | ||
| if self._request_options.transaction_tag: | ||
| batch.transaction_tag = self._request_options.transaction_tag | ||
| return batch | ||
|
|
@@ -1235,13 +1270,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): | |
| "CommitStats: {}".format(self._batch.commit_stats), | ||
| extra={"commit_stats": self._batch.commit_stats}, | ||
| ) | ||
| self._database._pool.put(self._session) | ||
| current_span = get_current_span() | ||
| add_span_event( | ||
| current_span, | ||
| "Returned session to pool", | ||
| {"id": self._session.session_id}, | ||
| ) | ||
| self._database._session_manager.put_session(self._session) | ||
|
|
||
|
|
||
| class MutationGroupsCheckout(object): | ||
|
|
@@ -1258,23 +1287,26 @@ class MutationGroupsCheckout(object): | |
| """ | ||
|
|
||
| def __init__(self, database): | ||
| if not isinstance(database, Database): | ||
| raise TypeError( | ||
| "{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format( | ||
| class_name=self.__class__.__name__, | ||
| expected_class_name=Database.__name__, | ||
| actual_class_name=database.__class__.__name__, | ||
| ) | ||
| ) | ||
|
|
||
| self._database = database | ||
| self._session = None | ||
|
|
||
| def __enter__(self): | ||
| """Begin ``with`` block.""" | ||
| session = self._session = self._database._pool.get() | ||
| return MutationGroups(session) | ||
| self._session = self._database._session_manager.get_session_for_read_write() | ||
| return MutationGroups(self._session) | ||
|
|
||
| def __exit__(self, exc_type, exc_val, exc_tb): | ||
| """End ``with`` block.""" | ||
| if isinstance(exc_val, NotFound): | ||
| # If NotFound exception occurs inside the with block | ||
| # then we validate if the session still exists. | ||
| if not self._session.exists(): | ||
| self._session = self._database._pool._new_session() | ||
| self._session.create() | ||
|
Comment on lines
-1271
to
-1276
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check has been removed from
We recommend removing this code, and instead depend on |
||
| self._database._pool.put(self._session) | ||
| self._database._session_manager.put_session(self._session) | ||
|
|
||
|
|
||
| class SnapshotCheckout(object): | ||
|
|
@@ -1296,24 +1328,27 @@ class SnapshotCheckout(object): | |
| """ | ||
|
|
||
| def __init__(self, database, **kw): | ||
| if not isinstance(database, Database): | ||
| raise TypeError( | ||
| "{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format( | ||
| class_name=self.__class__.__name__, | ||
| expected_class_name=Database.__name__, | ||
| actual_class_name=database.__class__.__name__, | ||
| ) | ||
| ) | ||
|
|
||
| self._database = database | ||
| self._session = None | ||
| self._kw = kw | ||
|
|
||
| def __enter__(self): | ||
| """Begin ``with`` block.""" | ||
| session = self._session = self._database._pool.get() | ||
| return Snapshot(session, **self._kw) | ||
| self._session = self._database._session_manager.get_session_for_read_only() | ||
| return Snapshot(self._session, **self._kw) | ||
|
|
||
| def __exit__(self, exc_type, exc_val, exc_tb): | ||
| """End ``with`` block.""" | ||
| if isinstance(exc_val, NotFound): | ||
| # If NotFound exception occurs inside the with block | ||
| # then we validate if the session still exists. | ||
| if not self._session.exists(): | ||
| self._session = self._database._pool._new_session() | ||
| self._session.create() | ||
| self._database._pool.put(self._session) | ||
| self._database._session_manager.put_session(self._session) | ||
|
|
||
|
|
||
| class BatchSnapshot(object): | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SessionCheckouthas been moved frompool.pytodatabase.py, and updated so that the session is checked out from the database rather than from the pool.Furthermore,
SessionCheckout's constructor previous accepted arbitrary keyword arguments, which were passed to the pool'sgetmethod. WhileAbstractSessionPool.getdoes not describe any keyword arguments, bothFixedSizePool.getandPingingPool.getaccept an optionaltimeoutkeyword argument, which can be used to override the default timeout.Because
DatabaseSessionManagernow manages which session is returned, and it not guaranteed to return a session from the pool, we are suggesting removing the ability to provide arbitrary keyword arguments to the pool viaSessionCheckout.__init__.No keyword arguments are supported by the default pool implementation (
BurstyPool), and users are already able to specify their own default timeout forFixedSizePoolandPingingPool. Moreover, it is not clear to us where this functionality is even used: remaining uses ofSessionCheckoutdo not support these keyword arguments.Please let us know:
timeoutarguments toFixedSizePoolandPingingPool, for consistency?