Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 4817e5e

Browse files
committed
feat: Multiplexed sessions - Update BatchSnapshot to use database session manager.
Signed-off-by: Taylor Curran <taylor.curran@improving.com>
1 parent e7f1f70 commit 4817e5e

File tree

2 files changed

+42
-31
lines changed

2 files changed

+42
-31
lines changed

google/cloud/spanner_v1/database.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1481,11 +1481,15 @@ def from_dict(cls, database, mapping):
14811481
14821482
:rtype: :class:`BatchSnapshot`
14831483
"""
1484+
14841485
instance = cls(database)
1485-
session = instance._session = database.session()
1486-
session._session_id = mapping["session_id"]
1486+
1487+
session = instance._session = Session(database=database)
1488+
instance._session_id = session._session_id = mapping["session_id"]
1489+
14871490
snapshot = instance._snapshot = session.snapshot()
1488-
snapshot._transaction_id = mapping["transaction_id"]
1491+
instance._transaction_id = snapshot._transaction_id = mapping["transaction_id"]
1492+
14891493
return instance
14901494

14911495
def to_dict(self):
@@ -1516,28 +1520,39 @@ def _get_session(self):
15161520
all partitions have been processed.
15171521
"""
15181522
if self._session is None:
1519-
# Use sessions manager for partition operations
1520-
transaction_type = TransactionType.PARTITIONED
1521-
self._session = self._database.sessions_manager.get_session(
1522-
transaction_type
1523-
)
1523+
database = self._database
1524+
1525+
# If the session ID is not specified, check out a new session for
1526+
# partitioned transactions from the database session manager; otherwise,
1527+
# the session has already been checked out, so just create a session to
1528+
# represent it.
1529+
if self._session_id is None:
1530+
transaction_type = TransactionType.PARTITIONED
1531+
session = database.sessions_manager.get_session(transaction_type)
1532+
self._session_id = session.session_id
1533+
1534+
else:
1535+
session = Session(database=database)
1536+
session._session_id = self._session_id
15241537

1525-
if self._session_id is not None:
1526-
self._session._session_id = self._session_id
1538+
self._session = session
15271539

15281540
return self._session
15291541

15301542
def _get_snapshot(self):
15311543
"""Create snapshot if needed."""
1544+
15321545
if self._snapshot is None:
15331546
self._snapshot = self._get_session().snapshot(
15341547
read_timestamp=self._read_timestamp,
15351548
exact_staleness=self._exact_staleness,
15361549
multi_use=True,
15371550
transaction_id=self._transaction_id,
15381551
)
1552+
15391553
if self._transaction_id is None:
15401554
self._snapshot.begin()
1555+
15411556
return self._snapshot
15421557

15431558
def get_batch_transaction_id(self):

tests/unit/test_database.py

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
_metadata_with_request_id,
3636
)
3737
from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID
38-
from google.cloud.spanner_v1.session_options import SessionOptions
38+
from google.cloud.spanner_v1.session import Session
39+
from google.cloud.spanner_v1.session_options import SessionOptions, TransactionType
40+
from tests._builders import build_spanner_api
3941

4042
DML_WO_PARAM = """
4143
DELETE FROM citizens
@@ -1509,8 +1511,6 @@ def test_execute_partitioned_dml_w_exclude_txn_from_change_streams(self):
15091511
)
15101512

15111513
def test_session_factory_defaults(self):
1512-
from google.cloud.spanner_v1.session import Session
1513-
15141514
client = _Client()
15151515
instance = _Instance(self.INSTANCE_NAME, client=client)
15161516
pool = _Pool()
@@ -1524,8 +1524,6 @@ def test_session_factory_defaults(self):
15241524
self.assertEqual(session.labels, {})
15251525

15261526
def test_session_factory_w_labels(self):
1527-
from google.cloud.spanner_v1.session import Session
1528-
15291527
client = _Client()
15301528
instance = _Instance(self.INSTANCE_NAME, client=client)
15311529
pool = _Pool()
@@ -2475,8 +2473,6 @@ def _make_database(**kwargs):
24752473

24762474
@staticmethod
24772475
def _make_session(**kwargs):
2478-
from google.cloud.spanner_v1.session import Session
2479-
24802476
return mock.create_autospec(Session, instance=True, **kwargs)
24812477

24822478
@staticmethod
@@ -2533,20 +2529,22 @@ def test_ctor_w_exact_staleness(self):
25332529
def test_from_dict(self):
25342530
klass = self._get_target_class()
25352531
database = self._make_database()
2536-
session = database.session.return_value = self._make_session()
2537-
snapshot = session.snapshot.return_value = self._make_snapshot()
2538-
api_repr = {
2539-
"session_id": self.SESSION_ID,
2540-
"transaction_id": self.TRANSACTION_ID,
2541-
}
2532+
api = database.spanner_api = build_spanner_api()
2533+
2534+
batch_txn = klass.from_dict(
2535+
database,
2536+
{
2537+
"session_id": self.SESSION_ID,
2538+
"transaction_id": self.TRANSACTION_ID,
2539+
},
2540+
)
25422541

2543-
batch_txn = klass.from_dict(database, api_repr)
25442542
self.assertIs(batch_txn._database, database)
2545-
self.assertIs(batch_txn._session, session)
2546-
self.assertEqual(session._session_id, self.SESSION_ID)
2547-
self.assertEqual(snapshot._transaction_id, self.TRANSACTION_ID)
2548-
snapshot.begin.assert_not_called()
2549-
self.assertIs(batch_txn._snapshot, snapshot)
2543+
self.assertEqual(batch_txn._session._session_id, self.SESSION_ID)
2544+
self.assertEqual(batch_txn._snapshot._transaction_id, self.TRANSACTION_ID)
2545+
2546+
api.create_session.assert_not_called()
2547+
api.begin_transaction.assert_not_called()
25502548

25512549
def test_to_dict(self):
25522550
database = self._make_database()
@@ -2574,8 +2572,6 @@ def test__get_session_new(self):
25742572
batch_txn = self._make_one(database)
25752573
self.assertIs(batch_txn._get_session(), session)
25762574
# Verify that sessions_manager.get_session was called with PARTITIONED transaction type
2577-
from google.cloud.spanner_v1.session_options import TransactionType
2578-
25792575
database.sessions_manager.get_session.assert_called_once_with(
25802576
TransactionType.PARTITIONED
25812577
)

0 commit comments

Comments
 (0)