Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 72574d4

Browse files
committed
feat: Multiplexed sessions - Attempt to fix test_transaction_read_and_insert_then_rollback and add build_request_id helper method, fix test_snapshot and test_transaction failures.
Signed-off-by: Taylor Curran <taylor.curran@improving.com>
1 parent 36c9775 commit 72574d4

File tree

4 files changed

+163
-199
lines changed

4 files changed

+163
-199
lines changed

google/cloud/spanner_v1/request_id_header.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def generate_rand_uint64():
3939
def with_request_id(
4040
client_id, channel_id, nth_request, attempt, other_metadata=[], span=None
4141
):
42-
req_id = f"{REQ_ID_VERSION}.{REQ_RAND_PROCESS_ID}.{client_id}.{channel_id}.{nth_request}.{attempt}"
42+
req_id = build_request_id(client_id, channel_id, nth_request, attempt)
4343
all_metadata = (other_metadata or []).copy()
4444
all_metadata.append((REQ_ID_HEADER_KEY, req_id))
4545

@@ -49,6 +49,10 @@ def with_request_id(
4949
return all_metadata
5050

5151

52+
def build_request_id(client_id, channel_id, nth_request, attempt):
53+
return f"{REQ_ID_VERSION}.{REQ_RAND_PROCESS_ID}.{client_id}.{channel_id}.{nth_request}.{attempt}"
54+
55+
5256
def parse_request_id(request_id_str):
5357
splits = request_id_str.split(".")
5458
version, rand_process_id, client_id, channel_id, nth_request, nth_attempt = list(

tests/system/test_session_api.py

Lines changed: 134 additions & 183 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import collections
1616
import datetime
1717
import decimal
18+
import functools
19+
1820
import math
1921
import struct
2022
import threading
@@ -28,6 +30,7 @@
2830
from google.cloud import spanner_v1
2931
from google.cloud.spanner_admin_database_v1 import DatabaseDialect
3032
from google.cloud._helpers import UTC
33+
3134
from google.cloud.spanner_v1.data_types import JsonObject
3235
from google.cloud.spanner_v1.session_options import TransactionType
3336
from .testdata import singer_pb2
@@ -37,6 +40,7 @@
3740
from google.cloud.spanner_v1.request_id_header import (
3841
REQ_RAND_PROCESS_ID,
3942
parse_request_id,
43+
build_request_id,
4044
)
4145
from .._helpers import is_multiplexed_enabled
4246

@@ -687,211 +691,158 @@ def transaction_work(transaction):
687691

688692
if ot_exporter is not None:
689693
multiplexed_enabled = is_multiplexed_enabled(TransactionType.READ_ONLY)
690-
span_list = ot_exporter.get_finished_spans()
691-
got_span_names = [span.name for span in span_list]
692694

693-
if multiplexed_enabled:
694-
# With multiplexed sessions enabled:
695-
# - Batch operations use multiplexed sessions (GetSession)
696-
# - run_in_transaction uses regular sessions (GetSession)
697-
# - Snapshot (read-only) re-use existing multiplexed sessions
698-
# Note: Session creation span may not appear if session is reused from pool
699-
expected_span_names = [
700-
"CloudSpanner.Batch.commit", # Batch commit
701-
"CloudSpanner.GetSession", # Transaction session
702-
"CloudSpanner.Transaction.read", # First read
703-
"CloudSpanner.Transaction.read", # Second read
704-
"CloudSpanner.Transaction.rollback", # Rollback due to exception
705-
"CloudSpanner.Session.run_in_transaction", # Session transaction wrapper
706-
"CloudSpanner.Database.run_in_transaction", # Database transaction wrapper
707-
"CloudSpanner.Snapshot.read", # Snapshot read
708-
]
709-
# Check if we have a multiplexed session creation span
710-
if "CloudSpanner.CreateMultiplexedSession" in got_span_names:
711-
expected_span_names.insert(-1, "CloudSpanner.CreateMultiplexedSession")
712-
else:
713-
# Without multiplexed sessions, all operations use regular sessions
714-
expected_span_names = [
715-
"CloudSpanner.GetSession", # Batch operation
716-
"CloudSpanner.Batch.commit", # Batch commit
717-
"CloudSpanner.GetSession", # Transaction session
718-
"CloudSpanner.Transaction.read", # First read
719-
"CloudSpanner.Transaction.read", # Second read
720-
"CloudSpanner.Transaction.rollback", # Rollback due to exception
721-
"CloudSpanner.Session.run_in_transaction", # Session transaction wrapper
722-
"CloudSpanner.Database.run_in_transaction", # Database transaction wrapper
723-
"CloudSpanner.Snapshot.read", # Snapshot read
724-
]
725-
# Check if we have a session creation span for snapshot
726-
if len(got_span_names) > len(expected_span_names):
727-
expected_span_names.insert(-1, "CloudSpanner.GetSession")
728-
729-
assert got_span_names == expected_span_names
730-
731-
sampling_req_id = parse_request_id(
732-
span_list[0].attributes["x_goog_spanner_request_id"]
695+
_build_request_id = functools.partial(
696+
build_request_id,
697+
client_id=sessions_database._nth_client_id,
698+
channel_id=sessions_database._channel_id,
699+
attempt=1,
733700
)
734-
nth_req0 = sampling_req_id[-2]
735701

736-
db = sessions_database
737-
738-
# Span 0: batch operation (always uses GetSession from pool)
739-
assert_span_attributes(
740-
ot_exporter,
741-
"CloudSpanner.GetSession",
742-
attributes=_make_attributes(
743-
db_name,
744-
session_found=True,
745-
x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0 + 0}.1",
746-
),
747-
span=span_list[0],
748-
)
702+
expected_span_properties = []
703+
704+
# [A] Batch spans
705+
if not multiplexed_enabled:
706+
expected_span_properties.append(
707+
{
708+
"name": "CloudSpanner.GetSession",
709+
"attributes": _make_attributes(
710+
db_name,
711+
session_found=True,
712+
x_goog_spanner_request_id=_build_request_id(
713+
nth_request=len(expected_span_properties)
714+
),
715+
),
716+
}
717+
)
749718

750-
# Span 1: batch commit
751-
assert_span_attributes(
752-
ot_exporter,
753-
"CloudSpanner.Batch.commit",
754-
attributes=_make_attributes(
755-
db_name,
756-
num_mutations=1,
757-
x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0 + 1}.1",
758-
),
759-
span=span_list[1],
719+
expected_span_properties.append(
720+
{
721+
"name": "CloudSpanner.Batch.commit",
722+
"attributes": _make_attributes(
723+
db_name,
724+
num_mutations=1,
725+
x_goog_spanner_request_id=_build_request_id(
726+
nth_request=len(expected_span_properties)
727+
),
728+
),
729+
}
760730
)
761731

762-
# Span 2: GetSession for transaction
763-
assert_span_attributes(
764-
ot_exporter,
765-
"CloudSpanner.GetSession",
766-
attributes=_make_attributes(
767-
db_name,
768-
session_found=True,
769-
x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0 + 2}.1",
770-
),
771-
span=span_list[2],
732+
# [B] Transaction spans
733+
expected_span_properties.append(
734+
{
735+
"name": "CloudSpanner.GetSession",
736+
"attributes": _make_attributes(
737+
db_name,
738+
session_found=True,
739+
x_goog_spanner_request_id=_build_request_id(
740+
nth_request=len(expected_span_properties)
741+
),
742+
),
743+
}
772744
)
773745

774-
# Span 3: First transaction read
775-
assert_span_attributes(
776-
ot_exporter,
777-
"CloudSpanner.Transaction.read",
778-
attributes=_make_attributes(
779-
db_name,
780-
table_id=sd.TABLE,
781-
columns=sd.COLUMNS,
782-
x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0 + 3}.1",
783-
),
784-
span=span_list[3],
746+
expected_span_properties.append(
747+
{
748+
"name": "CloudSpanner.Transaction.read",
749+
"attributes": _make_attributes(
750+
db_name,
751+
table_id=sd.TABLE,
752+
columns=sd.COLUMNS,
753+
x_goog_spanner_request_id=_build_request_id(
754+
nth_request=len(expected_span_properties)
755+
),
756+
),
757+
}
785758
)
786759

787-
# Span 4: Second transaction read
788-
assert_span_attributes(
789-
ot_exporter,
790-
"CloudSpanner.Transaction.read",
791-
attributes=_make_attributes(
792-
db_name,
793-
table_id=sd.TABLE,
794-
columns=sd.COLUMNS,
795-
x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0 + 4}.1",
796-
),
797-
span=span_list[4],
760+
expected_span_properties.append(
761+
{
762+
"name": "CloudSpanner.Transaction.read",
763+
"attributes": _make_attributes(
764+
db_name,
765+
table_id=sd.TABLE,
766+
columns=sd.COLUMNS,
767+
num_mutations=1,
768+
x_goog_spanner_request_id=_build_request_id(
769+
nth_request=len(expected_span_properties)
770+
),
771+
),
772+
}
798773
)
799774

800-
# Span 5: Transaction rollback
801-
assert_span_attributes(
802-
ot_exporter,
803-
"CloudSpanner.Transaction.rollback",
804-
attributes=_make_attributes(
805-
db_name,
806-
x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0 + 5}.1",
807-
),
808-
span=span_list[5],
775+
expected_span_properties.append(
776+
{
777+
"name": "CloudSpanner.Transaction.rollback",
778+
"attributes": _make_attributes(
779+
db_name,
780+
x_goog_spanner_request_id=_build_request_id(
781+
nth_request=len(expected_span_properties)
782+
),
783+
),
784+
}
809785
)
810786

811-
# Span 6: Session.run_in_transaction (ERROR status due to intentional exception)
812-
assert_span_attributes(
813-
ot_exporter,
814-
"CloudSpanner.Session.run_in_transaction",
815-
status=ot_helpers.StatusCode.ERROR,
816-
attributes=_make_attributes(db_name),
817-
span=span_list[6],
787+
expected_span_properties.append(
788+
{
789+
"name": "CloudSpanner.Session.run_in_transaction",
790+
"status": ot_helpers.StatusCode.ERROR,
791+
"attributes": _make_attributes(db_name),
792+
}
818793
)
819794

820-
# Span 7: Database.run_in_transaction (ERROR status due to intentional exception)
821-
assert_span_attributes(
822-
ot_exporter,
823-
"CloudSpanner.Database.run_in_transaction",
824-
status=ot_helpers.StatusCode.ERROR,
825-
attributes=_make_attributes(db_name),
826-
span=span_list[7],
795+
expected_span_properties.append(
796+
{
797+
"name": "CloudSpanner.Database.run_in_transaction",
798+
"status": ot_helpers.StatusCode.ERROR,
799+
"attributes": _make_attributes(db_name),
800+
}
827801
)
828802

829-
# Check if we have a snapshot session creation span
830-
snapshot_read_span_index = -1
831-
snapshot_session_span_index = -1
832-
833-
for i, span in enumerate(span_list):
834-
if span.name == "CloudSpanner.Snapshot.read":
835-
snapshot_read_span_index = i
836-
break
837-
838-
# Look for session creation span before the snapshot read
839-
if snapshot_read_span_index > 8:
840-
snapshot_session_span_index = snapshot_read_span_index - 1
803+
# [C] Snapshot spans
804+
if not multiplexed_enabled:
805+
expected_span_properties.append(
806+
{
807+
"name": "CloudSpanner.GetSession",
808+
"attributes": _make_attributes(
809+
db_name,
810+
session_found=True,
811+
x_goog_spanner_request_id=_build_request_id(
812+
nth_request=len(expected_span_properties)
813+
),
814+
),
815+
}
816+
)
841817

842-
if (
843-
multiplexed_enabled
844-
and span_list[snapshot_session_span_index].name
845-
== "CloudSpanner.CreateMultiplexedSession"
846-
):
847-
expected_snapshot_span_name = "CloudSpanner.CreateMultiplexedSession"
848-
snapshot_session_attributes = _make_attributes(
849-
db_name,
850-
x_goog_spanner_request_id=span_list[
851-
snapshot_session_span_index
852-
].attributes["x_goog_spanner_request_id"],
853-
)
854-
assert_span_attributes(
855-
ot_exporter,
856-
expected_snapshot_span_name,
857-
attributes=snapshot_session_attributes,
858-
span=span_list[snapshot_session_span_index],
859-
)
860-
elif (
861-
not multiplexed_enabled
862-
and span_list[snapshot_session_span_index].name
863-
== "CloudSpanner.GetSession"
864-
):
865-
expected_snapshot_span_name = "CloudSpanner.GetSession"
866-
snapshot_session_attributes = _make_attributes(
818+
expected_span_properties.append(
819+
{
820+
"name": "CloudSpanner.Snapshot.read",
821+
"attributes": _make_attributes(
867822
db_name,
868-
session_found=True,
869-
x_goog_spanner_request_id=span_list[
870-
snapshot_session_span_index
871-
].attributes["x_goog_spanner_request_id"],
872-
)
873-
assert_span_attributes(
874-
ot_exporter,
875-
expected_snapshot_span_name,
876-
attributes=snapshot_session_attributes,
877-
span=span_list[snapshot_session_span_index],
878-
)
879-
880-
# Snapshot read span
881-
assert_span_attributes(
882-
ot_exporter,
883-
"CloudSpanner.Snapshot.read",
884-
attributes=_make_attributes(
885-
db_name,
886-
table_id=sd.TABLE,
887-
columns=sd.COLUMNS,
888-
x_goog_spanner_request_id=span_list[
889-
snapshot_read_span_index
890-
].attributes["x_goog_spanner_request_id"],
891-
),
892-
span=span_list[snapshot_read_span_index],
823+
table_id=sd.TABLE,
824+
columns=sd.COLUMNS,
825+
x_goog_spanner_request_id=_build_request_id(
826+
nth_request=len(expected_span_properties)
827+
),
828+
),
829+
}
893830
)
894831

832+
# Verify spans.
833+
span_list = ot_exporter.get_finished_spans()
834+
assert len(span_list) == len(expected_span_properties)
835+
836+
for i, expected in enumerate(expected_span_properties):
837+
expected = expected_span_properties[i]
838+
assert_span_attributes(
839+
span=span_list[i],
840+
name=expected["name"],
841+
status=expected.get("status", ot_helpers.StatusCode.OK),
842+
attributes=expected["attributes"],
843+
ot_exporter=ot_exporter,
844+
)
845+
895846

896847
@_helpers.retry_maybe_conflict
897848
def test_transaction_read_and_insert_then_exception(sessions_database):

0 commit comments

Comments
 (0)