Skip to content

Commit 5af3cb7

Browse files
authored
impl(spanner): support multiplexed session transaction priority (#15361)
1 parent 16b6833 commit 5af3cb7

File tree

4 files changed

+84
-17
lines changed

4 files changed

+84
-17
lines changed

google/cloud/spanner/internal/transaction_impl.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,45 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2121

2222
TransactionImpl::~TransactionImpl() = default;
2323

24+
TransactionImpl::TransactionImpl(
25+
google::spanner::v1::TransactionSelector selector, bool route_to_leader,
26+
std::string tag)
27+
: TransactionImpl(/*session=*/{}, std::move(selector), route_to_leader,
28+
std::move(tag), absl::nullopt) {}
29+
30+
TransactionImpl::TransactionImpl(
31+
TransactionImpl const& impl,
32+
google::spanner::v1::TransactionSelector selector, bool route_to_leader,
33+
std::string tag)
34+
: TransactionImpl(impl.session_, std::move(selector), route_to_leader,
35+
std::move(tag),
36+
(impl.session_ && impl.session_->is_multiplexed() &&
37+
impl.selector_->has_id())
38+
? absl::optional<std::string>(impl.selector_->id())
39+
: absl::nullopt) {}
40+
41+
TransactionImpl::TransactionImpl(
42+
SessionHolder session, google::spanner::v1::TransactionSelector selector,
43+
bool route_to_leader, std::string tag,
44+
absl::optional<std::string> multiplexed_session_previous_transaction_id)
45+
: session_(std::move(session)),
46+
selector_(std::move(selector)),
47+
route_to_leader_(route_to_leader),
48+
tag_(std::move(tag)),
49+
seqno_(0) {
50+
state_ = selector_->has_begin() ? State::kBegin : State::kDone;
51+
// If we're attempting to retry an aborted ReadWrite transaction on a
52+
// multiplexed session, then propagate the aborted transaction id.
53+
if (session_ && session_->is_multiplexed() && selector_.ok() &&
54+
selector_->has_begin() && selector_->begin().has_read_write() &&
55+
multiplexed_session_previous_transaction_id.has_value()) {
56+
selector_->mutable_begin()
57+
->mutable_read_write()
58+
->set_multiplexed_session_previous_transaction_id(
59+
*multiplexed_session_previous_transaction_id);
60+
}
61+
}
62+
2463
void TransactionImpl::UpdatePrecommitToken(
2564
std::unique_lock<std::mutex> const&,
2665
absl::optional<google::spanner::v1::MultiplexedSessionPrecommitToken>

google/cloud/spanner/internal/transaction_impl.h

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,26 +52,16 @@ using VisitInvokeResult = ::google::cloud::internal::invoke_result_t<
5252
class TransactionImpl {
5353
public:
5454
TransactionImpl(google::spanner::v1::TransactionSelector selector,
55-
bool route_to_leader, std::string tag)
56-
: TransactionImpl(/*session=*/{}, std::move(selector), route_to_leader,
57-
std::move(tag)) {}
55+
bool route_to_leader, std::string tag);
5856

5957
TransactionImpl(TransactionImpl const& impl,
6058
google::spanner::v1::TransactionSelector selector,
61-
bool route_to_leader, std::string tag)
62-
: TransactionImpl(impl.session_, std::move(selector), route_to_leader,
63-
std::move(tag)) {}
59+
bool route_to_leader, std::string tag);
6460

65-
TransactionImpl(SessionHolder session,
66-
google::spanner::v1::TransactionSelector selector,
67-
bool route_to_leader, std::string tag)
68-
: session_(std::move(session)),
69-
selector_(std::move(selector)),
70-
route_to_leader_(route_to_leader),
71-
tag_(std::move(tag)),
72-
seqno_(0) {
73-
state_ = selector_->has_begin() ? State::kBegin : State::kDone;
74-
}
61+
TransactionImpl(
62+
SessionHolder session, google::spanner::v1::TransactionSelector selector,
63+
bool route_to_leader, std::string tag,
64+
absl::optional<std::string> multiplexed_session_previous_transaction_id);
7565

7666
~TransactionImpl();
7767

google/cloud/spanner/transaction.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ Transaction::Transaction(std::string session_id, std::string transaction_id,
144144
selector.set_id(std::move(transaction_id));
145145
impl_ = std::make_shared<spanner_internal::TransactionImpl>(
146146
spanner_internal::MakeDissociatedSessionHolder(std::move(session_id)),
147-
std::move(selector), route_to_leader, std::move(transaction_tag));
147+
std::move(selector), route_to_leader, std::move(transaction_tag),
148+
absl::nullopt);
148149
}
149150

150151
Transaction::~Transaction() = default;

google/cloud/spanner/transaction_test.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,43 @@ TEST(Transaction, SessionAffinity) {
126126
EXPECT_EQ(a_session, session); // session affinity
127127
EXPECT_TRUE(s->has_begin()); // but a new transaction
128128
EXPECT_EQ(ctx.tag, "app=cart,env=dev");
129+
EXPECT_THAT(s->begin()
130+
.read_write()
131+
.multiplexed_session_previous_transaction_id(),
132+
IsEmpty());
133+
return 0;
134+
});
135+
}
136+
137+
TEST(Transaction, MultiplexedPreviousTransactionId) {
138+
std::string aborted_txn_id = "aborted-txn-id";
139+
auto mux_session = spanner_internal::MakeMultiplexedSessionHolder(
140+
"multiplexed", std::make_shared<spanner_internal::Session::Clock>());
141+
auto opts = Transaction::ReadWriteOptions().WithTag("app=cart,env=dev");
142+
Transaction aborted_txn = MakeReadWriteTransaction(opts);
143+
spanner_internal::Visit(
144+
aborted_txn, [&](spanner_internal::SessionHolder& session,
145+
StatusOr<google::spanner::v1::TransactionSelector>& s,
146+
spanner_internal::TransactionContext const& ctx) {
147+
EXPECT_FALSE(session);
148+
EXPECT_TRUE(s->has_begin());
149+
session = mux_session;
150+
s->set_id(aborted_txn_id);
151+
EXPECT_EQ(ctx.tag, "app=cart,env=dev");
152+
return 0;
153+
});
154+
Transaction retry_txn = MakeReadWriteTransaction(aborted_txn, opts);
155+
spanner_internal::Visit(
156+
retry_txn, [&](spanner_internal::SessionHolder& session,
157+
StatusOr<google::spanner::v1::TransactionSelector>& s,
158+
spanner_internal::TransactionContext const& ctx) {
159+
EXPECT_EQ(mux_session, session);
160+
EXPECT_TRUE(s->has_begin());
161+
EXPECT_EQ(s->begin()
162+
.read_write()
163+
.multiplexed_session_previous_transaction_id(),
164+
aborted_txn_id);
165+
EXPECT_EQ(ctx.tag, "app=cart,env=dev");
129166
return 0;
130167
});
131168
}

0 commit comments

Comments
 (0)