This repository was archived by the owner on Mar 31, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 104
Expand file tree
/
Copy pathdatabase_sessions_manager.py
More file actions
242 lines (209 loc) · 10.1 KB
/
database_sessions_manager.py
File metadata and controls
242 lines (209 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Manage sessions for a database."""
# NOTE(review): presumably read by the CrossSync code generator as the dotted
# module path of the generated sync counterpart of this file -- confirm
# against the CrossSync tooling.
__CROSS_SYNC_OUTPUT__ = "google.cloud.spanner_v1.database_sessions_manager"
import asyncio
from datetime import timedelta
from enum import Enum
from os import getenv
import threading
from threading import Thread
from typing import Optional
from weakref import ref
from google.cloud.aio._cross_sync import CrossSync
from google.cloud.spanner_v1._async.session import Session
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_span_event,
get_current_span,
)
class TransactionType(Enum):
    """Kinds of transaction a session can be requested for.

    A member is passed to ``DatabaseSessionsManager.get_session`` to select
    which session option applies to the request.
    """

    # Read-only transactions.
    READ_ONLY = "read-only"

    # Partitioned operations.
    PARTITIONED = "partitioned"

    # Read/write transactions.
    READ_WRITE = "read/write"
@CrossSync.convert_class
class DatabaseSessionsManager(object):
    """Manages sessions for a Cloud Spanner database.

    Sessions can be checked out from the database session manager for a specific
    transaction type using :meth:`get_session`, and returned to the session manager
    using :meth:`put_session`.

    The sessions returned by the session manager depend on the configured environment variables
    and the provided session pool (see :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`).

    :type database: :class:`~google.cloud.spanner_v1.database.Database`
    :param database: The database to manage sessions for.

    :type pool: :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`
    :param pool: The pool to get non-multiplexed sessions from.
    """

    # Environment variables that toggle multiplexed sessions per transaction
    # type. Multiplexed sessions are used unless the variable is set to
    # "false" (case-insensitive, surrounding whitespace ignored); an unset
    # variable counts as enabled (see ``_getenv``).
    _ENV_VAR_MULTIPLEXED = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
    _ENV_VAR_MULTIPLEXED_PARTITIONED = (
        "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
    )
    _ENV_VAR_MULTIPLEXED_READ_WRITE = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"

    # How long the maintenance loop sleeps between checks.
    _MAINTENANCE_THREAD_POLLING_INTERVAL = timedelta(minutes=10)
    # Age at which the maintenance loop replaces the multiplexed session.
    _MAINTENANCE_THREAD_REFRESH_INTERVAL = timedelta(days=7)

    def __init__(self, database, pool):
        self._database = database
        self._pool = pool

        # Both are created lazily on the first call to
        # ``_get_multiplexed_session``.
        self._multiplexed_session: Optional[Session] = None
        self._multiplexed_session_thread: Optional[CrossSync.Task] = None

        # ``_init_lock`` guards the lazy creation of the two CrossSync
        # primitives below, which start out as ``None``.
        self._init_lock = threading.Lock()
        self._multiplexed_session_lock: Optional[CrossSync.Lock] = None
        self._multiplexed_session_terminate_event: Optional[CrossSync.Event] = None

    @CrossSync.convert
    async def get_session(self, transaction_type: TransactionType) -> Session:
        """Returns a session for the given transaction type from the database session manager.

        The multiplexed session is returned when multiplexed sessions are
        enabled for ``transaction_type`` or when the database has an
        experimental host configured; otherwise a session is taken from the
        pool.

        :type transaction_type: :class:`TransactionType`
        :param transaction_type: The type of transaction the session is for.

        :rtype: :class:`~google.cloud.spanner_v1.session.Session`
        :returns: a session for the given transaction type.

        :raises ValueError: if the transaction type is not supported.
        """
        session = (
            await self._get_multiplexed_session()
            if self._use_multiplexed(transaction_type)
            or self._database._experimental_host is not None
            else await CrossSync.run_if_async(self._pool.get)
        )

        add_span_event(
            get_current_span(),
            "Using session",
            {"id": session.session_id, "multiplexed": session.is_multiplexed},
        )

        return session

    @CrossSync.convert
    async def put_session(self, session: Session) -> None:
        """Returns the session to the database session manager.

        Only non-multiplexed sessions are handed back to the pool; a
        multiplexed session is not put anywhere.

        :type session: :class:`~google.cloud.spanner_v1.session.Session`
        :param session: The session to return to the database session manager.
        """
        add_span_event(
            get_current_span(),
            "Returning session",
            {"id": session.session_id, "multiplexed": session.is_multiplexed},
        )

        if not session.is_multiplexed:
            await CrossSync.run_if_async(self._pool.put, session)

    @CrossSync.convert
    async def _get_multiplexed_session(self) -> Session:
        """Returns a multiplexed session from the database session manager.

        If the multiplexed session is not defined, creates a new multiplexed
        session and starts a maintenance thread to periodically delete and
        recreate it so that it remains valid. Otherwise, simply returns the
        current multiplexed session.

        :rtype: :class:`~google.cloud.spanner_v1.session.Session`
        :returns: a multiplexed session.
        """
        # Create the CrossSync lock and terminate event on first use.
        with self._init_lock:
            if self._multiplexed_session_lock is None:
                self._multiplexed_session_lock = CrossSync.Lock()
            if self._multiplexed_session_terminate_event is None:
                self._multiplexed_session_terminate_event = CrossSync.Event()

        async with self._multiplexed_session_lock:
            if self._multiplexed_session is None:
                self._multiplexed_session = await self._build_multiplexed_session()
                self._multiplexed_session_thread = self._build_maintenance_thread()
                # In sync mode the builder returns an unstarted Thread.
                if not CrossSync.is_async:
                    self._multiplexed_session_thread.start()

            return self._multiplexed_session

    @CrossSync.convert
    async def _build_multiplexed_session(self) -> Session:
        """Builds and returns a new multiplexed session for the database session manager.

        :rtype: :class:`~google.cloud.spanner_v1.session.Session`
        :returns: a new multiplexed session.
        """
        session = Session(
            database=self._database,
            database_role=self._database.database_role,
            is_multiplexed=True,
        )
        await session.create()
        return session

    def _build_maintenance_thread(self) -> CrossSync.Task:
        """Builds and returns a multiplexed session maintenance thread for
        the database session manager. This thread will periodically delete
        and recreate the multiplexed session to ensure that it is always valid.

        In async mode the result comes from ``CrossSync.create_task``; in sync
        mode it is a daemon :class:`threading.Thread` that the caller must
        start.

        :rtype: :class:`CrossSync.Task`
        :returns: a multiplexed session maintenance thread.
        """
        # The maintenance loop gets a weak reference only, so that it does
        # not keep this manager alive.
        session_manager_ref = ref(self)

        if CrossSync.is_async:
            return CrossSync.create_task(
                self._maintain_multiplexed_session, session_manager_ref
            )
        else:
            return Thread(
                target=self._maintain_multiplexed_session,
                name=f"maintenance-multiplexed-session-{self._multiplexed_session.session_id}",
                args=[session_manager_ref],
                daemon=True,
            )

    @staticmethod
    @CrossSync.convert
    async def _maintain_multiplexed_session(session_manager_ref) -> None:
        """Maintains the multiplexed session for the database session manager.

        This method will delete and recreate the referenced database session manager's
        multiplexed session to ensure that it is always valid. The method will run until
        the database session manager is garbage collected or its terminate event is set.

        :type session_manager_ref: :class:`_weakref.ReferenceType`
        :param session_manager_ref: A weak reference to the database session manager.
        """
        manager = session_manager_ref()
        if manager is None:
            return

        polling_interval_seconds = (
            manager._MAINTENANCE_THREAD_POLLING_INTERVAL.total_seconds()
        )
        refresh_interval_seconds = (
            manager._MAINTENANCE_THREAD_REFRESH_INTERVAL.total_seconds()
        )

        # Do not keep a strong reference to the manager while idle, otherwise
        # the weak reference is pointless and the manager can never be
        # garbage collected while this loop is running.
        del manager

        from time import time

        session_created_time = time()

        while True:
            # Stop once the database session manager has been garbage collected.
            manager = session_manager_ref()
            if manager is None:
                return

            # Stop once termination has been requested (see ``close``).
            if manager._multiplexed_session_terminate_event.is_set():
                return

            # Not yet due for a refresh: sleep and poll again.
            if time() - session_created_time < refresh_interval_seconds:
                # Release the strong reference before sleeping so the manager
                # can be collected in the meantime.
                del manager
                await CrossSync.sleep(polling_interval_seconds)
                continue

            # Replace the session under the lock that
            # ``_get_multiplexed_session`` also holds when reading it.
            async with manager._multiplexed_session_lock:
                await CrossSync.run_if_async(manager._multiplexed_session.delete)
                manager._multiplexed_session = (
                    await manager._build_multiplexed_session()
                )

            session_created_time = time()

    @classmethod
    def _use_multiplexed(cls, transaction_type: TransactionType) -> bool:
        """Returns whether to use multiplexed sessions for the given transaction type.

        :type transaction_type: :class:`TransactionType`
        :param transaction_type: The type of transaction to check.

        :rtype: bool
        :returns: the boolean value of the environment variable that
            corresponds to the transaction type.

        :raises ValueError: if the transaction type is not supported.
        """
        if transaction_type is TransactionType.READ_ONLY:
            return cls._getenv(cls._ENV_VAR_MULTIPLEXED)
        elif transaction_type is TransactionType.PARTITIONED:
            return cls._getenv(cls._ENV_VAR_MULTIPLEXED_PARTITIONED)
        elif transaction_type is TransactionType.READ_WRITE:
            return cls._getenv(cls._ENV_VAR_MULTIPLEXED_READ_WRITE)

        raise ValueError(f"Transaction type {transaction_type} is not supported.")

    @classmethod
    def _getenv(cls, env_var_name: str) -> bool:
        """Returns the value of the given environment variable as a boolean.

        :type env_var_name: str
        :param env_var_name: The name of the environment variable to read.

        :rtype: bool
        :returns: ``False`` only if the variable is set to "false"
            (case-insensitive, surrounding whitespace ignored); ``True``
            otherwise, including when the variable is unset.
        """
        env_var_value = getenv(env_var_name, "true").lower().strip()
        return env_var_value != "false"

    @CrossSync.convert
    async def close(self) -> None:
        """Closes the database session manager and stops all background tasks.

        Sets the terminate event, waits for the maintenance thread/task to
        finish and then deletes the multiplexed session, if one was created.
        """
        if self._multiplexed_session_terminate_event is not None:
            self._multiplexed_session_terminate_event.set()

        if self._multiplexed_session_thread is not None:
            if CrossSync.is_async:
                # Cancel the task and wait for it; the resulting
                # CancelledError is the expected outcome.
                self._multiplexed_session_thread.cancel()
                try:
                    await self._multiplexed_session_thread
                except CrossSync.rm_aio(asyncio.CancelledError):
                    pass
            else:
                # NOTE(review): the maintenance loop only checks the terminate
                # event between sleeps of _MAINTENANCE_THREAD_POLLING_INTERVAL,
                # so this join can block for up to that long.
                self._multiplexed_session_thread.join()

        if self._multiplexed_session is not None:
            await self._multiplexed_session.delete()
            self._multiplexed_session = None