Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 0b6f5df

Browse files
committed
feat: Multiplexed sessions - Add test helper for multiplexed env vars.
Signed-off-by: Taylor Curran <taylor.curran@improving.com>
1 parent 607df64 commit 0b6f5df

File tree

4 files changed

+49
-44
lines changed

4 files changed

+49
-44
lines changed

tests/_helpers.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import unittest
2+
from os import getenv
3+
24
import mock
35

46
from google.cloud.spanner_v1 import gapic_version
7+
from google.cloud.spanner_v1.session_options import TransactionType
58

69
LIB_VERSION = gapic_version.__version__
710

@@ -31,6 +34,33 @@
3134
_TEST_OT_EXPORTER = None
3235
_TEST_OT_PROVIDER_INITIALIZED = False
3336

37+
# Environment variables for enabling multiplexed sessions
38+
"GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
39+
ENV_VAR_ENABLE_MULTIPLEXED_FOR_PARTITIONED = (
40+
"GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
41+
)
42+
ENV_VAR_ENABLE_MULTIPLEXED_FOR_READ_WRITE = (
43+
"GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"
44+
)
45+
46+
47+
def is_multiplexed_enabled(transaction_type: TransactionType) -> bool:
48+
"""Returns whether multiplexed sessions are enabled for the given transaction type."""
49+
50+
env_var = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
51+
env_var_partitioned = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
52+
env_var_read_write = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"
53+
54+
def _getenv(val: str) -> bool:
55+
return getenv(val, "false").lower() == "true"
56+
57+
if transaction_type is TransactionType.READ_ONLY:
58+
return _getenv(env_var)
59+
elif transaction_type is TransactionType.PARTITIONED:
60+
return _getenv(env_var) and _getenv(env_var_partitioned)
61+
else:
62+
return _getenv(env_var) and _getenv(env_var_read_write)
63+
3464

3565
def get_test_ot_exporter():
3666
global _TEST_OT_EXPORTER

tests/system/test_observability_options.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616
from mock import PropertyMock, patch
1717

1818
from google.cloud.spanner_v1.session import Session
19+
from google.cloud.spanner_v1.session_options import TransactionType
1920
from . import _helpers
2021
from google.cloud.spanner_v1 import Client
2122
from google.api_core.exceptions import Aborted
2223
from google.auth.credentials import AnonymousCredentials
2324
from google.rpc import code_pb2
2425

26+
from .._helpers import is_multiplexed_enabled
27+
2528
HAS_OTEL_INSTALLED = False
2629

2730
try:
@@ -113,11 +116,7 @@ def test_propagation(enable_extended_tracing):
113116
gotNames = [span.name for span in from_inject_spans]
114117

115118
# Check if multiplexed sessions are enabled
116-
import os
117-
118-
multiplexed_enabled = (
119-
os.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS", "").lower() == "true"
120-
)
119+
multiplexed_enabled = is_multiplexed_enabled(TransactionType.READ_ONLY)
121120

122121
# Determine expected session span name based on multiplexed sessions
123122
expected_session_span_name = (
@@ -213,13 +212,11 @@ def create_db_trace_exporter():
213212
reason="Tracing requires OpenTelemetry",
214213
)
215214
@patch.object(Session, "session_id", new_callable=PropertyMock)
216-
@patch.object(Session, "is_multiplexed", new_callable=PropertyMock)
217-
def test_transaction_abort_then_retry_spans(mock_session_multiplexed, mock_session_id):
215+
def test_transaction_abort_then_retry_spans(mock_session_id):
218216
from opentelemetry.trace.status import StatusCode
219217

220-
# Mock session properties for testing.
221-
mock_session_multiplexed.return_value = session_multiplexed = False
222218
mock_session_id.return_value = session_id = "session-id"
219+
multiplexed = is_multiplexed_enabled(TransactionType.READ_WRITE)
223220

224221
db, trace_exporter = create_db_trace_exporter()
225222

@@ -247,8 +244,8 @@ def select_in_txn(txn):
247244
("Waiting for a session to become available", {"kind": "BurstyPool"}),
248245
("No sessions available in pool. Creating session", {"kind": "BurstyPool"}),
249246
("Creating Session", {}),
250-
("Using session", {"id": session_id, "multiplexed": session_multiplexed}),
251-
("Returning session", {"id": session_id, "multiplexed": session_multiplexed}),
247+
("Using session", {"id": session_id, "multiplexed": multiplexed}),
248+
("Returning session", {"id": session_id, "multiplexed": multiplexed}),
252249
(
253250
"Transaction was aborted in user operation, retrying",
254251
{"delay_seconds": "EPHEMERAL", "cause": "EPHEMERAL", "attempt": 1},
@@ -417,7 +414,6 @@ def tx_update(txn):
417414
reason="Tracing requires OpenTelemetry",
418415
)
419416
def test_database_partitioned_error():
420-
import os
421417
from opentelemetry.trace.status import StatusCode
422418

423419
db, trace_exporter = create_db_trace_exporter()
@@ -428,12 +424,9 @@ def test_database_partitioned_error():
428424
pass
429425

430426
got_statuses, got_events = finished_spans_statuses(trace_exporter)
427+
multiplexed_enabled = is_multiplexed_enabled(TransactionType.PARTITIONED)
431428

432-
multiplexed_partitioned_enabled = (
433-
os.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS") == "true"
434-
)
435-
436-
if multiplexed_partitioned_enabled:
429+
if multiplexed_enabled:
437430
expected_event_names = [
438431
"Creating Session",
439432
"Using session",
@@ -496,7 +489,7 @@ def test_database_partitioned_error():
496489

497490
expected_session_span_name = (
498491
"CloudSpanner.CreateMultiplexedSession"
499-
if multiplexed_partitioned_enabled
492+
if multiplexed_enabled
500493
else "CloudSpanner.CreateSession"
501494
)
502495
want_statuses = [

tests/system/test_session_api.py

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from google.cloud.spanner_admin_database_v1 import DatabaseDialect
3030
from google.cloud._helpers import UTC
3131
from google.cloud.spanner_v1.data_types import JsonObject
32+
from google.cloud.spanner_v1.session_options import TransactionType
3233
from .testdata import singer_pb2
3334
from tests import _helpers as ot_helpers
3435
from . import _helpers
@@ -37,7 +38,7 @@
3738
REQ_RAND_PROCESS_ID,
3839
parse_request_id,
3940
)
40-
41+
from .._helpers import is_multiplexed_enabled
4142

4243
SOME_DATE = datetime.date(2011, 1, 17)
4344
SOME_TIME = datetime.datetime(1989, 1, 17, 17, 59, 12, 345612)
@@ -430,8 +431,6 @@ def test_session_crud(sessions_database):
430431

431432

432433
def test_batch_insert_then_read(sessions_database, ot_exporter):
433-
import os
434-
435434
db_name = sessions_database.name
436435
sd = _sample_data
437436

@@ -453,10 +452,7 @@ def test_batch_insert_then_read(sessions_database, ot_exporter):
453452
nth_req0 = sampling_req_id[-2]
454453

455454
db = sessions_database
456-
457-
multiplexed_enabled = (
458-
os.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS", "").lower() == "true"
459-
)
455+
multiplexed_enabled = is_multiplexed_enabled(TransactionType.READ_ONLY)
460456

461457
# [A] Verify batch checkout spans
462458
# -------------------------------
@@ -690,12 +686,7 @@ def transaction_work(transaction):
690686
assert rows == []
691687

692688
if ot_exporter is not None:
693-
import os
694-
695-
multiplexed_enabled = (
696-
os.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS", "").lower() == "true"
697-
)
698-
689+
multiplexed_enabled = is_multiplexed_enabled(TransactionType.READ_ONLY)
699690
span_list = ot_exporter.get_finished_spans()
700691
got_span_names = [span.name for span in span_list]
701692

@@ -3332,17 +3323,13 @@ def test_interval_array_cast(transaction):
33323323

33333324

33343325
def test_session_id_and_multiplexed_flag_behavior(sessions_database, ot_exporter):
3335-
import os
3336-
33373326
sd = _sample_data
33383327

33393328
with sessions_database.batch() as batch:
33403329
batch.delete(sd.TABLE, sd.ALL)
33413330
batch.insert(sd.TABLE, sd.COLUMNS, sd.ROW_DATA)
33423331

3343-
multiplexed_enabled = (
3344-
os.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS", "").lower() == "true"
3345-
)
3332+
multiplexed_enabled = is_multiplexed_enabled(TransactionType.READ_ONLY)
33463333

33473334
snapshot1_session_id = None
33483335
snapshot2_session_id = None

tests/unit/test_database.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from google.cloud.spanner_v1.session import Session
3939
from google.cloud.spanner_v1.session_options import SessionOptions, TransactionType
4040
from tests._builders import build_spanner_api
41+
from tests._helpers import is_multiplexed_enabled
4142

4243
DML_WO_PARAM = """
4344
DELETE FROM citizens
@@ -1527,7 +1528,6 @@ def test_session_factory_w_labels(self):
15271528
self.assertEqual(session.labels, labels)
15281529

15291530
def test_snapshot_defaults(self):
1530-
import os
15311531
from google.cloud.spanner_v1.database import SnapshotCheckout
15321532
from google.cloud.spanner_v1.snapshot import Snapshot
15331533

@@ -1539,9 +1539,7 @@ def test_snapshot_defaults(self):
15391539
database = self._make_one(self.DATABASE_ID, instance, pool=pool)
15401540

15411541
# Check if multiplexed sessions are enabled for read operations
1542-
multiplexed_enabled = (
1543-
os.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true"
1544-
)
1542+
multiplexed_enabled = is_multiplexed_enabled(TransactionType.READ_ONLY)
15451543

15461544
if multiplexed_enabled:
15471545
# When multiplexed sessions are enabled, configure the sessions manager
@@ -1575,7 +1573,6 @@ def test_snapshot_defaults(self):
15751573

15761574
def test_snapshot_w_read_timestamp_and_multi_use(self):
15771575
import datetime
1578-
import os
15791576
from google.cloud._helpers import UTC
15801577
from google.cloud.spanner_v1.database import SnapshotCheckout
15811578
from google.cloud.spanner_v1.snapshot import Snapshot
@@ -1589,9 +1586,7 @@ def test_snapshot_w_read_timestamp_and_multi_use(self):
15891586
database = self._make_one(self.DATABASE_ID, instance, pool=pool)
15901587

15911588
# Check if multiplexed sessions are enabled for read operations
1592-
multiplexed_enabled = (
1593-
os.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true"
1594-
)
1589+
multiplexed_enabled = is_multiplexed_enabled(TransactionType.READ_ONLY)
15951590

15961591
if multiplexed_enabled:
15971592
# When multiplexed sessions are enabled, configure the sessions manager

0 commit comments

Comments
 (0)