diff --git a/google/cloud/spanner/internal/connection_impl.cc b/google/cloud/spanner/internal/connection_impl.cc index 8c17114eca020..6f797bef3ada5 100644 --- a/google/cloud/spanner/internal/connection_impl.cc +++ b/google/cloud/spanner/internal/connection_impl.cc @@ -346,7 +346,7 @@ spanner::RowStream ConnectionImpl::Read(ReadParams params) { std::move(params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return ReadImpl(session, s, ctx, std::move(params)); }); } @@ -357,7 +357,7 @@ StatusOr> ConnectionImpl::PartitionRead( std::move(params.read_params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return PartitionReadImpl(session, s, ctx, params.read_params, params.partition_options); }); @@ -368,7 +368,7 @@ spanner::RowStream ConnectionImpl::ExecuteQuery(SqlParams params) { std::move(params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return ExecuteQueryImpl(session, s, ctx, std::move(params)); }); } @@ -378,7 +378,7 @@ StatusOr ConnectionImpl::ExecuteDml(SqlParams params) { std::move(params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return ExecuteDmlImpl(session, s, ctx, std::move(params)); }); } @@ -388,7 +388,7 @@ spanner::ProfileQueryResult ConnectionImpl::ProfileQuery(SqlParams params) { std::move(params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return ProfileQueryImpl(session, s, ctx, std::move(params)); }); } @@ -399,7 +399,7 @@ StatusOr ConnectionImpl::ProfileDml( std::move(params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return ProfileDmlImpl(session, s, ctx, std::move(params)); }); } @@ -409,7 +409,7 @@ StatusOr ConnectionImpl::AnalyzeSql(SqlParams params) { std::move(params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return AnalyzeSqlImpl(session, s, ctx, std::move(params)); }); } @@ -423,7 +423,7 @@ StatusOr ConnectionImpl::ExecutePartitionedDml( return Visit(txn, [this, ¶ms]( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return ExecutePartitionedDmlImpl(session, s, ctx, std::move(params)); }); } @@ -434,7 +434,7 @@ StatusOr> ConnectionImpl::PartitionQuery( std::move(params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return PartitionQueryImpl(session, s, ctx, params); }); } @@ -445,7 +445,7 @@ StatusOr ConnectionImpl::ExecuteBatchDml( std::move(params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return ExecuteBatchDmlImpl(session, s, ctx, std::move(params)); }); } @@ -455,7 +455,7 @@ StatusOr ConnectionImpl::Commit(CommitParams params) { std::move(params.transaction), [this, ¶ms](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return CommitImpl(session, s, ctx, std::move(params)); }); } @@ -464,7 +464,7 @@ Status ConnectionImpl::Rollback(RollbackParams params) { return Visit(std::move(params.transaction), [this](SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { return RollbackImpl(session, s, ctx); }); } @@ -479,9 +479,15 @@ spanner::BatchedCommitResultStream ConnectionImpl::BatchWrite( * an error if `session` is empty and no `Session` can be allocated. */ Status ConnectionImpl::PrepareSession(SessionHolder& session, - bool dissociate_from_pool) { + Session::Mode mode) { if (!session) { - auto session_or = session_pool_->Allocate(dissociate_from_pool); + StatusOr session_or; + if (opts_.has()) { + session_or = session_pool_->Multiplexed(mode); + } else { + session_or = session_pool_->Allocate(mode); + } + if (!session_or) { return std::move(session_or).status(); } @@ -500,7 +506,7 @@ Status ConnectionImpl::PrepareSession(SessionHolder& session, */ StatusOr ConnectionImpl::BeginTransaction( SessionHolder& session, google::spanner::v1::TransactionOptions options, - std::string request_tag, TransactionContext const& ctx, char const* func) { + std::string request_tag, TransactionContext& ctx, char const* func) { google::spanner::v1::BeginTransactionRequest begin; begin.set_session(session->session_name()); *begin.mutable_options() = std::move(options); @@ -510,7 +516,13 @@ StatusOr ConnectionImpl::BeginTransaction( begin.mutable_request_options()->set_request_tag(std::move(request_tag)); begin.mutable_request_options()->set_transaction_tag(ctx.tag); - auto stub = session_pool_->GetStub(*session); + std::shared_ptr stub; + if (session->is_multiplexed()) { + stub = session_pool_->GetStub(*session, ctx); + } else { + stub = session_pool_->GetStub(*session); + } + auto const& current = internal::CurrentOptions(); auto response = RetryLoop( RetryPolicyPrototype(current)->clone(), @@ -533,7 +545,7 @@ StatusOr ConnectionImpl::BeginTransaction( spanner::RowStream ConnectionImpl::ReadImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, ReadParams params) { + TransactionContext& ctx, ReadParams params) { if (!s.ok()) { return MakeStatusOnlyResult(s.status()); } @@ -573,7 +585,12 @@ spanner::RowStream ConnectionImpl::ReadImpl( // Capture a copy of `stub` to ensure the `shared_ptr<>` remains valid through // the lifetime of the lambda. - auto stub = session_pool_->GetStub(*session); + std::shared_ptr stub; + if (session->is_multiplexed()) { + stub = session_pool_->GetStub(*session, ctx); + } else { + stub = session_pool_->GetStub(*session); + } auto const tracing_enabled = RpcStreamTracingEnabled(); auto const& tracing_options = RpcTracingOptions(); auto factory = [stub, request, route_to_leader = ctx.route_to_leader, @@ -632,7 +649,7 @@ spanner::RowStream ConnectionImpl::ReadImpl( StatusOr> ConnectionImpl::PartitionReadImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, ReadParams const& params, + TransactionContext& ctx, ReadParams const& params, spanner::PartitionOptions const& partition_options) { if (!s.ok()) { return s.status(); @@ -640,7 +657,7 @@ StatusOr> ConnectionImpl::PartitionReadImpl( // Since the session may be sent to other machines, it should not be returned // to the pool when the Transaction is destroyed. - auto prepare_status = PrepareSession(session, /*dissociate_from_pool=*/true); + auto prepare_status = PrepareSession(session, Session::Mode::kDisassociated); if (!prepare_status.ok()) { return prepare_status; } @@ -656,7 +673,12 @@ StatusOr> ConnectionImpl::PartitionReadImpl( *request.mutable_key_set() = ToProto(params.keys); *request.mutable_partition_options() = ToProto(partition_options); - auto stub = session_pool_->GetStub(*session); + std::shared_ptr stub; + if (session->is_multiplexed()) { + stub = session_pool_->GetStub(*session, ctx); + } else { + stub = session_pool_->GetStub(*session); + } auto const& current = internal::CurrentOptions(); for (;;) { auto response = RetryLoop( @@ -714,7 +736,7 @@ template StatusOr ConnectionImpl::ExecuteSqlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params, + TransactionContext& ctx, SqlParams params, google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode, std::function>( google::spanner::v1::ExecuteSqlRequest& request)> const& @@ -792,7 +814,7 @@ template ResultType ConnectionImpl::CommonQueryImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params, + TransactionContext& ctx, SqlParams params, google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode) { if (!s.ok()) { return MakeStatusOnlyResult(s.status()); @@ -805,7 +827,12 @@ ResultType ConnectionImpl::CommonQueryImpl( // Capture a copy of of these to ensure the `shared_ptr<>` remains valid // through the lifetime of the lambda. Note that the local variables are a // reference to avoid increasing refcounts twice, but the capture is by value. - auto stub = session_pool_->GetStub(*session); + std::shared_ptr stub; + if (session->is_multiplexed()) { + stub = session_pool_->GetStub(*session, ctx); + } else { + stub = session_pool_->GetStub(*session); + } auto const& retry_policy_prototype = RetryPolicyPrototype(); auto const& backoff_policy_prototype = BackoffPolicyPrototype(); auto const tracing_enabled = RpcStreamTracingEnabled(); @@ -853,7 +880,7 @@ ResultType ConnectionImpl::CommonQueryImpl( spanner::RowStream ConnectionImpl::ExecuteQueryImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params) { + TransactionContext& ctx, SqlParams params) { return CommonQueryImpl( session, s, ctx, std::move(params), google::spanner::v1::ExecuteSqlRequest::NORMAL); @@ -862,7 +889,7 @@ spanner::RowStream ConnectionImpl::ExecuteQueryImpl( spanner::ProfileQueryResult ConnectionImpl::ProfileQueryImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params) { + TransactionContext& ctx, SqlParams params) { return CommonQueryImpl( session, s, ctx, std::move(params), google::spanner::v1::ExecuteSqlRequest::PROFILE); @@ -872,7 +899,7 @@ template StatusOr ConnectionImpl::CommonDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params, + TransactionContext& ctx, SqlParams params, google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode) { if (!s.ok()) { return s.status(); @@ -885,7 +912,12 @@ StatusOr ConnectionImpl::CommonDmlImpl( // Capture a copy of of these to ensure the `shared_ptr<>` remains valid // through the lifetime of the lambda. Note that the local variables are a // reference to avoid increasing refcounts twice, but the capture is by value. - auto stub = session_pool_->GetStub(*session); + std::shared_ptr stub; + if (session->is_multiplexed()) { + stub = session_pool_->GetStub(*session, ctx); + } else { + stub = session_pool_->GetStub(*session); + } auto current = google::cloud::internal::SaveCurrentOptions(); auto const& retry_policy_prototype = RetryPolicyPrototype(*current); auto const& backoff_policy_prototype = BackoffPolicyPrototype(*current); @@ -919,7 +951,7 @@ StatusOr ConnectionImpl::CommonDmlImpl( StatusOr ConnectionImpl::ExecuteDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params) { + TransactionContext& ctx, SqlParams params) { return CommonDmlImpl( session, s, ctx, std::move(params), google::spanner::v1::ExecuteSqlRequest::NORMAL); @@ -928,7 +960,7 @@ StatusOr ConnectionImpl::ExecuteDmlImpl( StatusOr ConnectionImpl::ProfileDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params) { + TransactionContext& ctx, SqlParams params) { return CommonDmlImpl( session, s, ctx, std::move(params), google::spanner::v1::ExecuteSqlRequest::PROFILE); @@ -937,7 +969,7 @@ StatusOr ConnectionImpl::ProfileDmlImpl( StatusOr ConnectionImpl::AnalyzeSqlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params) { + TransactionContext& ctx, SqlParams params) { auto result = CommonDmlImpl( session, s, ctx, std::move(params), google::spanner::v1::ExecuteSqlRequest::PLAN); @@ -951,14 +983,14 @@ StatusOr> ConnectionImpl::PartitionQueryImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, PartitionQueryParams const& params) { + TransactionContext& ctx, PartitionQueryParams const& params) { if (!s.ok()) { return s.status(); } // Since the session may be sent to other machines, it should not be returned // to the pool when the Transaction is destroyed. - auto prepare_status = PrepareSession(session, /*dissociate_from_pool=*/true); + auto prepare_status = PrepareSession(session, Session::Mode::kDisassociated); if (!prepare_status.ok()) { return prepare_status; } @@ -973,7 +1005,12 @@ ConnectionImpl::PartitionQueryImpl( std::move(*sql_statement.mutable_param_types()); *request.mutable_partition_options() = ToProto(params.partition_options); - auto stub = session_pool_->GetStub(*session); + std::shared_ptr stub; + if (session->is_multiplexed()) { + stub = session_pool_->GetStub(*session, ctx); + } else { + stub = session_pool_->GetStub(*session); + } auto const& current = internal::CurrentOptions(); for (;;) { auto response = RetryLoop( @@ -1023,7 +1060,7 @@ ConnectionImpl::PartitionQueryImpl( StatusOr ConnectionImpl::ExecuteBatchDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, ExecuteBatchDmlParams params) { + TransactionContext& ctx, ExecuteBatchDmlParams params) { if (!s.ok()) { return s.status(); } @@ -1050,7 +1087,12 @@ StatusOr ConnectionImpl::ExecuteBatchDmlImpl( request.mutable_request_options()->set_transaction_tag(ctx.tag); auto const& current = internal::CurrentOptions(); - auto stub = session_pool_->GetStub(*session); + std::shared_ptr stub; + if (session->is_multiplexed()) { + stub = session_pool_->GetStub(*session, ctx); + } else { + stub = session_pool_->GetStub(*session); + } for (;;) { auto response = RetryLoop( RetryPolicyPrototype()->clone(), BackoffPolicyPrototype()->clone(), @@ -1097,7 +1139,7 @@ StatusOr ConnectionImpl::ExecutePartitionedDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, ExecutePartitionedDmlParams params) { + TransactionContext& ctx, ExecutePartitionedDmlParams params) { if (!s.ok()) { return s.status(); } @@ -1141,7 +1183,7 @@ ConnectionImpl::ExecutePartitionedDmlImpl( StatusOr ConnectionImpl::CommitImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, CommitParams params) { + TransactionContext& ctx, CommitParams params) { if (!s.ok()) { // Fail the commit if the transaction has been invalidated. return s.status(); @@ -1197,7 +1239,12 @@ StatusOr ConnectionImpl::CommitImpl( } auto const& current = internal::CurrentOptions(); - auto stub = session_pool_->GetStub(*session); + std::shared_ptr stub; + if (session->is_multiplexed()) { + stub = session_pool_->GetStub(*session, ctx); + } else { + stub = session_pool_->GetStub(*session); + } auto response = RetryLoop( RetryPolicyPrototype(current)->clone(), BackoffPolicyPrototype(current)->clone(), Idempotency::kIdempotent, @@ -1224,7 +1271,7 @@ StatusOr ConnectionImpl::CommitImpl( Status ConnectionImpl::RollbackImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx) { + TransactionContext& ctx) { if (!s.ok()) { return s.status(); } @@ -1251,7 +1298,12 @@ Status ConnectionImpl::RollbackImpl( google::spanner::v1::RollbackRequest request; request.set_session(session->session_name()); request.set_transaction_id(s->id()); - auto stub = session_pool_->GetStub(*session); + std::shared_ptr stub; + if (session->is_multiplexed()) { + stub = session_pool_->GetStub(*session, ctx); + } else { + stub = session_pool_->GetStub(*session); + } auto const& current = internal::CurrentOptions(); auto status = RetryLoop( RetryPolicyPrototype(current)->clone(), @@ -1300,6 +1352,8 @@ spanner::BatchedCommitResultStream ConnectionImpl::BatchWriteImpl( request.set_exclude_txn_from_change_streams(true); } + // There's no client-side transaction involved with BatchWrite, so no need + // to store the resulting stub in the case of a Multiplexed Session. auto stub = session_pool_->GetStub(*session); auto factory = [stub = std::move(stub)]( google::spanner::v1::BatchWriteRequest const& request) { diff --git a/google/cloud/spanner/internal/connection_impl.h b/google/cloud/spanner/internal/connection_impl.h index 1c1a3aa408763..8f7c1d595d158 100644 --- a/google/cloud/spanner/internal/connection_impl.h +++ b/google/cloud/spanner/internal/connection_impl.h @@ -68,71 +68,71 @@ class ConnectionImpl : public spanner::Connection { private: Status PrepareSession(SessionHolder& session, - bool dissociate_from_pool = false); + Session::Mode mode = Session::Mode::kPooled); StatusOr BeginTransaction( SessionHolder& session, google::spanner::v1::TransactionOptions options, - std::string request_tag, TransactionContext const& ctx, char const* func); + std::string request_tag, TransactionContext& ctx, char const* func); spanner::RowStream ReadImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, ReadParams params); + TransactionContext& ctx, ReadParams params); StatusOr> PartitionReadImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, ReadParams const& params, + TransactionContext& ctx, ReadParams const& params, spanner::PartitionOptions const& partition_options); spanner::RowStream ExecuteQueryImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params); + TransactionContext& ctx, SqlParams params); StatusOr ExecuteDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params); + TransactionContext& ctx, SqlParams params); spanner::ProfileQueryResult ProfileQueryImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params); + TransactionContext& ctx, SqlParams params); StatusOr ProfileDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params); + TransactionContext& ctx, SqlParams params); StatusOr AnalyzeSqlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params); + TransactionContext& ctx, SqlParams params); StatusOr ExecutePartitionedDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, ExecutePartitionedDmlParams params); + TransactionContext& ctx, ExecutePartitionedDmlParams params); StatusOr> PartitionQueryImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, PartitionQueryParams const& params); + TransactionContext& ctx, PartitionQueryParams const& params); StatusOr ExecuteBatchDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, ExecuteBatchDmlParams params); + TransactionContext& ctx, ExecuteBatchDmlParams params); StatusOr CommitImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, CommitParams params); + TransactionContext& ctx, CommitParams params); Status RollbackImpl(SessionHolder& session, StatusOr& s, - TransactionContext const& ctx); + TransactionContext& ctx); spanner::BatchedCommitResultStream BatchWriteImpl(BatchWriteParams); @@ -140,7 +140,7 @@ class ConnectionImpl : public spanner::Connection { StatusOr ExecuteSqlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params, + TransactionContext& ctx, SqlParams params, google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode, std::function>( google::spanner::v1::ExecuteSqlRequest& request)> const& @@ -150,13 +150,13 @@ class ConnectionImpl : public spanner::Connection { ResultType CommonQueryImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params, + TransactionContext& ctx, SqlParams params, google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode); template StatusOr CommonDmlImpl( SessionHolder& session, StatusOr& s, - TransactionContext const& ctx, SqlParams params, + TransactionContext& ctx, SqlParams params, google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode); spanner::Database db_; diff --git a/google/cloud/spanner/internal/session_pool.cc b/google/cloud/spanner/internal/session_pool.cc index 380c6dd700259..6760a33d0ddf0 100644 --- a/google/cloud/spanner/internal/session_pool.cc +++ b/google/cloud/spanner/internal/session_pool.cc @@ -437,13 +437,20 @@ Status SessionPool::CreateSessions( return return_status; } -StatusOr SessionPool::Allocate(bool dissociate_from_pool) { - return Allocate(std::unique_lock(mu_), dissociate_from_pool); +StatusOr SessionPool::Allocate(Session::Mode mode) { + return Allocate(std::unique_lock(mu_), mode); } -StatusOr SessionPool::Multiplexed() { +StatusOr SessionPool::Multiplexed(Session::Mode mode) { if (opts_.has()) { std::unique_lock lk(mu_); + if (mode == Session::Mode::kDisassociated && multiplexed_session_.ok()) { + // For disassociated sessions, only used in partitioned operations, we do + // NOT want to maintain transaction to channel affinity. Instead, we want + // to "best effort" round-robin over the stubs. + return MakeDissociatedSessionHolder( + (*multiplexed_session_)->session_name()); + } return multiplexed_session_; } return internal::FailedPreconditionError( @@ -451,8 +458,10 @@ StatusOr SessionPool::Multiplexed() { } std::shared_ptr SessionPool::GetStub(Session const& session) { - auto const& channel = session.channel(); - if (channel) return channel->stub; + if (!session.is_disassociated() && !session.is_multiplexed()) { + auto const& channel = session.channel(); + if (channel) return channel->stub; + } // Multiplexed sessions, or sessions that were created for partitioned // Reads/Queries, do not have their own channel/stub, so return a stub @@ -460,6 +469,16 @@ std::shared_ptr SessionPool::GetStub(Session const& session) { return GetStub(std::unique_lock(mu_)); } +// In order to maintain transaction/channel affinity, we leverage the +// TransactionContext to make sure we use the same stub for the life of the +// transaction. +std::shared_ptr SessionPool::GetStub(Session const& session, + TransactionContext& context) { + if (context.stub.has_value()) return *(context.stub); + context.stub = GetStub(session); + return *context.stub; +} + int SessionPool::total_sessions() const { std::lock_guard lk(mu_); return total_sessions_; @@ -475,7 +494,7 @@ void SessionPool::DecrementSessionCount(std::unique_lock const&, } StatusOr SessionPool::Allocate(std::unique_lock lk, - bool dissociate_from_pool) { + Session::Mode mode) { // We choose to ignore the internal::CurrentOptions() here as it is // non-deterministic when RPCs to create sessions are actually made. // It is clearer if we just stick with the construction-time Options. @@ -485,10 +504,10 @@ StatusOr SessionPool::Allocate(std::unique_lock lk, // return the most recently used session. auto session = std::move(sessions_.back()); sessions_.pop_back(); - if (dissociate_from_pool) { + if (mode == Session::Mode::kDisassociated) { DecrementSessionCount(lk, *session); } - return {MakeSessionHolder(std::move(session), dissociate_from_pool)}; + return {MakeSessionHolder(std::move(session), mode)}; } // If the pool is at its max size, fail or wait until someone returns a @@ -601,8 +620,8 @@ void SessionPool::CreateSessionsAsync( } SessionHolder SessionPool::MakeSessionHolder(std::unique_ptr session, - bool dissociate_from_pool) { - if (dissociate_from_pool) { + Session::Mode mode) { + if (mode == Session::Mode::kDisassociated) { // Uses the default deleter; the `Session` is not returned to the pool. return {std::move(session)}; } diff --git a/google/cloud/spanner/internal/session_pool.h b/google/cloud/spanner/internal/session_pool.h index a5d08979c6055..ab108ae92af25 100644 --- a/google/cloud/spanner/internal/session_pool.h +++ b/google/cloud/spanner/internal/session_pool.h @@ -20,6 +20,7 @@ #include "google/cloud/spanner/internal/channel.h" #include "google/cloud/spanner/internal/session.h" #include "google/cloud/spanner/internal/spanner_stub.h" +#include "google/cloud/spanner/internal/transaction_impl.h" #include "google/cloud/spanner/retry_policy.h" #include "google/cloud/spanner/version.h" #include "google/cloud/backoff_policy.h" @@ -105,7 +106,7 @@ class SessionPool : public std::enable_shared_from_this { * @return a `SessionHolder` on success (which is guaranteed not to be * `nullptr`), or an error. */ - StatusOr Allocate(bool dissociate_from_pool = false); + StatusOr Allocate(Session::Mode mode = Session::Mode::kPooled); /** * Returns the multiplexed session, which allows an unbounded number of @@ -119,12 +120,15 @@ class SessionPool : public std::enable_shared_from_this { * @return a `SessionHolder` on success (which is guaranteed not to be * `nullptr`), or an error. */ - StatusOr Multiplexed(); + StatusOr Multiplexed( + Session::Mode mode = Session::Mode::kMultiplexed); /** * Return a `SpannerStub` to be used when making calls using `session`. */ std::shared_ptr GetStub(Session const& session); + std::shared_ptr GetStub(Session const& session, + TransactionContext& context); /** * Returns the number of sessions in the session pool plus the number of @@ -161,7 +165,7 @@ class SessionPool : public std::enable_shared_from_this { // Allocate a session from the pool. StatusOr Allocate(std::unique_lock, - bool dissociate_from_pool); + Session::Mode mode); // Returns a stub to use by round-robining between the channels. std::shared_ptr GetStub(std::unique_lock); @@ -185,6 +189,7 @@ class SessionPool : public std::enable_shared_from_this { std::shared_ptr); Status HandleMultiplexedCreateSessionDone( StatusOr response); + bool HasValidMultiplexedSession(std::unique_lock const&) const; Status Grow(std::unique_lock& lk, int sessions_to_create, WaitForSessionAllocation wait); // EXCLUSIVE_LOCKS_REQUIRED(mu_) @@ -202,7 +207,7 @@ class SessionPool : public std::enable_shared_from_this { int num_sessions); // LOCKS_EXCLUDED(mu_) SessionHolder MakeSessionHolder(std::unique_ptr session, - bool dissociate_from_pool); + Session::Mode mode); friend struct SessionPoolFriendForTest; // To test Async*() // Asynchronous calls used to maintain the pool. diff --git a/google/cloud/spanner/internal/transaction_impl.h b/google/cloud/spanner/internal/transaction_impl.h index ef122c5c66337..d3cbb06b679ef 100644 --- a/google/cloud/spanner/internal/transaction_impl.h +++ b/google/cloud/spanner/internal/transaction_impl.h @@ -36,13 +36,13 @@ struct TransactionContext { bool route_to_leader; std::string const& tag; std::int64_t seqno; + absl::optional> stub; }; template using VisitInvokeResult = ::google::cloud::internal::invoke_result_t< Functor, SessionHolder&, - StatusOr&, - TransactionContext const&>; + StatusOr&, TransactionContext&>; /** * The internal representation of a google::cloud::spanner::Transaction. @@ -97,13 +97,14 @@ class TransactionImpl { static_assert(google::cloud::internal::is_invocable< Functor, SessionHolder&, StatusOr&, - TransactionContext const&>::value, + TransactionContext&>::value, "TransactionImpl::Visit() functor has incompatible type."); - TransactionContext ctx{route_to_leader_, tag_, 0}; + TransactionContext ctx{route_to_leader_, tag_, 0, absl::nullopt}; { std::unique_lock lock(mu_); ctx.seqno = ++seqno_; // what about overflow? cond_.wait(lock, [this] { return state_ != State::kPending; }); + ctx.stub = stub_; if (state_ == State::kDone) { lock.unlock(); return f(session_, selector_, ctx); @@ -118,6 +119,7 @@ class TransactionImpl { bool done = false; { std::lock_guard lock(mu_); + stub_ = ctx.stub; state_ = selector_ && selector_->has_begin() ? State::kBegin : State::kDone; done = (state_ == State::kDone); @@ -155,6 +157,7 @@ class TransactionImpl { bool route_to_leader_; std::string tag_; std::int64_t seqno_; + absl::optional> stub_ = absl::nullopt; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/spanner/internal/transaction_impl_test.cc b/google/cloud/spanner/internal/transaction_impl_test.cc index 0ff3a7c3b8bb8..b4521b18d0c24 100644 --- a/google/cloud/spanner/internal/transaction_impl_test.cc +++ b/google/cloud/spanner/internal/transaction_impl_test.cc @@ -13,10 +13,12 @@ // limitations under the License. #include "google/cloud/spanner/internal/transaction_impl.h" +#include "google/cloud/spanner/testing/mock_spanner_stub.h" #include "google/cloud/spanner/timestamp.h" #include "google/cloud/spanner/transaction.h" #include "google/cloud/internal/port_platform.h" #include +#include #include #include #include @@ -49,15 +51,23 @@ class Client { kReadFailsAndTxnInvalidated, }; - explicit Client(Mode mode) : mode_(mode) {} + explicit Client(Mode mode) : mode_(mode) { + for (std::size_t i = 0; i < stubs_.max_size(); ++i) { + stubs_[i] = + std::shared_ptr(new spanner_testing::MockSpannerStub()); + } + next_stub_ = stubs_.begin(); + } // Set the `read_timestamp` we expect to see, and the `session_id` and // `txn_id` we want to use during the upcoming `Read()` calls. void Reset(spanner::Timestamp read_timestamp, std::string session_id, - std::string txn_id) { + std::string txn_id, + absl::optional> stub) { read_timestamp_ = read_timestamp; session_id_ = std::move(session_id); txn_id_ = std::move(txn_id); + expected_stub_ = std::move(stub); std::unique_lock lock(mu_); valid_visits_ = 0; } @@ -75,9 +85,8 @@ class Client { auto read = [this, &table, &keys, &columns]( SessionHolder& session, StatusOr& selector, - TransactionContext const& ctx) { - return this->Read(session, selector, ctx.tag, ctx.seqno, table, keys, - columns); + TransactionContext& ctx) { + return this->Read(session, selector, ctx, table, keys, columns); }; #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS try { @@ -90,97 +99,139 @@ class Client { #endif } + std::array, 3> const& stubs() const { + return stubs_; + } + private: ResultSet Read(SessionHolder& session, StatusOr& selector, - std::string const& tag, std::int64_t seqno, - std::string const& table, KeySet const& keys, - std::vector const& columns); + TransactionContext& ctx, std::string const& table, + KeySet const& keys, std::vector const& columns); + + bool NoSelector(StatusOr& selector); + + bool SelectorHasBegin(SessionHolder& session, + StatusOr& selector, + TransactionContext& ctx); + + void SelectorHasId(SessionHolder& session, + StatusOr& selector, + TransactionContext& ctx); + + std::shared_ptr GetStub() { return *next_stub_++; } Mode mode_; spanner::Timestamp read_timestamp_; std::string session_id_; std::string txn_id_; + absl::optional> expected_stub_; + std::array, 3>::iterator next_stub_; + std::array, 3> stubs_; std::mutex mu_; std::int64_t begin_seqno_{0}; // GUARDED_BY(mu_) int valid_visits_; // GUARDED_BY(mu_) }; -// Transaction callback. Normally we would use the TransactionSelector -// to make a StreamingRead() RPC, and then, if the selector was a `begin`, -// switch the selector to use the allocated transaction ID. Here we use -// the pre-assigned transaction ID after checking the read timestamp. -ResultSet Client::Read(SessionHolder& session, - StatusOr& selector, - std::string const& tag, std::int64_t seqno, - std::string const&, KeySet const&, - std::vector const&) { +bool Client::NoSelector(StatusOr& selector) { // when we mark a transaction invalid, we use this Status. Status const failed_txn_status(StatusCode::kInternal, "Bad transaction"); + bool fail_with_throw = false; + std::unique_lock lock(mu_); + switch (mode_) { + case Mode::kReadSucceeds: // visits never valid + case Mode::kReadFailsAndTxnRemainsBegin: + break; + case Mode::kReadFailsAndTxnInvalidated: + EXPECT_EQ(selector.status(), failed_txn_status); + ++valid_visits_; + fail_with_throw = (valid_visits_ % 2 == 0); + break; + } + return fail_with_throw; +} +bool Client::SelectorHasBegin(SessionHolder& session, + StatusOr& selector, + TransactionContext& ctx) { + // when we mark a transaction invalid, we use this Status. + Status const failed_txn_status(StatusCode::kInternal, "Bad transaction"); bool fail_with_throw = false; - EXPECT_THAT(tag, IsEmpty()); - if (!selector) { - std::unique_lock lock(mu_); - switch (mode_) { - case Mode::kReadSucceeds: // visits never valid - case Mode::kReadFailsAndTxnRemainsBegin: - break; - case Mode::kReadFailsAndTxnInvalidated: - EXPECT_EQ(selector.status(), failed_txn_status); - ++valid_visits_; - fail_with_throw = (valid_visits_ % 2 == 0); - break; - } - } else if (selector->has_begin()) { - EXPECT_THAT(session, IsNull()); - if (selector->begin().has_read_only() && - selector->begin().read_only().has_read_timestamp()) { - auto const& proto = selector->begin().read_only().read_timestamp(); - if (spanner::MakeTimestamp(proto).value() == read_timestamp_ && - seqno > 0) { - std::unique_lock lock(mu_); - switch (mode_) { - case Mode::kReadSucceeds: // first visit valid - if (valid_visits_ == 0) ++valid_visits_; - break; - case Mode::kReadFailsAndTxnRemainsBegin: // visits always valid - case Mode::kReadFailsAndTxnInvalidated: - ++valid_visits_; - fail_with_throw = (valid_visits_ % 2 == 0); - break; - } - if (valid_visits_ != 0) begin_seqno_ = seqno; + EXPECT_THAT(session, IsNull()); + if (selector->begin().has_read_only() && + selector->begin().read_only().has_read_timestamp()) { + auto const& proto = selector->begin().read_only().read_timestamp(); + if (spanner::MakeTimestamp(proto).value() == read_timestamp_ && + ctx.seqno > 0) { + std::unique_lock lock(mu_); + switch (mode_) { + case Mode::kReadSucceeds: // first visit valid + if (valid_visits_ == 0) ++valid_visits_; + break; + case Mode::kReadFailsAndTxnRemainsBegin: // visits always valid + case Mode::kReadFailsAndTxnInvalidated: + ++valid_visits_; + fail_with_throw = (valid_visits_ % 2 == 0); + break; } + if (valid_visits_ != 0) begin_seqno_ = ctx.seqno; } + } + std::shared_ptr stub; + switch (mode_) { + case Mode::kReadSucceeds: + // `begin` -> `id`, calls now parallelized + session = MakeDissociatedSessionHolder(session_id_); + ctx.stub = GetStub(); + selector->set_id(txn_id_); + break; + case Mode::kReadFailsAndTxnRemainsBegin: + // leave as `begin`, calls stay serialized + break; + case Mode::kReadFailsAndTxnInvalidated: + // `begin` -> `error`, calls now parallelized + selector = failed_txn_status; + break; + } + return fail_with_throw; +} + +void Client::SelectorHasId(SessionHolder& session, + StatusOr& selector, + TransactionContext& ctx) { + if (selector->id() == txn_id_) { + EXPECT_THAT(session, NotNull()); + EXPECT_EQ(session_id_, session->session_name()); + EXPECT_THAT(expected_stub_, ::testing::Optional(ctx.stub)); + + std::unique_lock lock(mu_); switch (mode_) { - case Mode::kReadSucceeds: - // `begin` -> `id`, calls now parallelized - session = MakeDissociatedSessionHolder(session_id_); - selector->set_id(txn_id_); - break; - case Mode::kReadFailsAndTxnRemainsBegin: - // leave as `begin`, calls stay serialized + case Mode::kReadSucceeds: // non-initial visits valid + if (valid_visits_ != 0 && ctx.seqno > begin_seqno_) ++valid_visits_; break; + case Mode::kReadFailsAndTxnRemainsBegin: // visits never valid case Mode::kReadFailsAndTxnInvalidated: - // `begin` -> `error`, calls now parallelized - selector = failed_txn_status; break; } + } +} + +// Transaction callback. Normally we would use the TransactionSelector +// to make a StreamingRead() RPC, and then, if the selector was a `begin`, +// switch the selector to use the allocated transaction ID. Here we use +// the pre-assigned transaction ID after checking the read timestamp. +ResultSet Client::Read(SessionHolder& session, + StatusOr& selector, + TransactionContext& ctx, std::string const&, + KeySet const&, std::vector const&) { + bool fail_with_throw = false; + EXPECT_THAT(ctx.tag, IsEmpty()); + if (!selector) { + fail_with_throw = NoSelector(selector); + } else if (selector->has_begin()) { + fail_with_throw = SelectorHasBegin(session, selector, ctx); } else { - if (selector->id() == txn_id_) { - EXPECT_THAT(session, NotNull()); - EXPECT_EQ(session_id_, session->session_name()); - std::unique_lock lock(mu_); - switch (mode_) { - case Mode::kReadSucceeds: // non-initial visits valid - if (valid_visits_ != 0 && seqno > begin_seqno_) ++valid_visits_; - break; - case Mode::kReadFailsAndTxnRemainsBegin: // visits never valid - case Mode::kReadFailsAndTxnInvalidated: - break; - } - } + SelectorHasId(session, selector, ctx); } if (fail_with_throw) { #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS @@ -196,12 +247,12 @@ ResultSet Client::Read(SessionHolder& session, // read-only transaction with an exact-staleness timestamp, and return the // number of valid visitations to that transaction (should be `n_threads`). int MultiThreadedRead(int n_threads, Client* client, std::time_t read_time, - std::string const& session_id, - std::string const& txn_id) { + std::string const& session_id, std::string const& txn_id, + absl::optional> stub) { auto read_timestamp = spanner::MakeTimestamp(std::chrono::system_clock::from_time_t(read_time)) .value(); - client->Reset(read_timestamp, session_id, txn_id); + client->Reset(read_timestamp, session_id, txn_id, std::move(stub)); spanner::Transaction::ReadOnlyOptions opts(read_timestamp); spanner::Transaction txn(opts); @@ -235,23 +286,32 @@ int MultiThreadedRead(int n_threads, Client* client, std::time_t read_time, TEST(InternalTransaction, ReadSucceeds) { Client client(Client::Mode::kReadSucceeds); - EXPECT_EQ(1, MultiThreadedRead(1, &client, 1562359982, "sess0", "txn0")); - EXPECT_EQ(64, MultiThreadedRead(64, &client, 1562360571, "sess1", "txn1")); - EXPECT_EQ(128, MultiThreadedRead(128, &client, 1562361252, "sess2", "txn2")); + EXPECT_EQ(1, MultiThreadedRead(1, &client, 1562359982, "sess0", "txn0", + client.stubs()[0])); + EXPECT_EQ(64, MultiThreadedRead(64, &client, 1562360571, "sess1", "txn1", + client.stubs()[1])); + EXPECT_EQ(128, MultiThreadedRead(128, &client, 1562361252, "sess2", "txn2", + client.stubs()[2])); } TEST(InternalTransaction, ReadFailsAndTxnRemainsBegin) { Client client(Client::Mode::kReadFailsAndTxnRemainsBegin); - EXPECT_EQ(1, MultiThreadedRead(1, &client, 1562359982, "sess0", "txn0")); - EXPECT_EQ(64, MultiThreadedRead(64, &client, 1562360571, "sess1", "txn1")); - EXPECT_EQ(128, MultiThreadedRead(128, &client, 1562361252, "sess2", "txn2")); + EXPECT_EQ(1, MultiThreadedRead(1, &client, 1562359982, "sess0", "txn0", + absl::nullopt)); + EXPECT_EQ(64, MultiThreadedRead(64, &client, 1562360571, "sess1", "txn1", + absl::nullopt)); + EXPECT_EQ(128, MultiThreadedRead(128, &client, 1562361252, "sess2", "txn2", + absl::nullopt)); } TEST(InternalTransaction, ReadFailsAndTxnInvalidated) { Client client(Client::Mode::kReadFailsAndTxnInvalidated); - EXPECT_EQ(1, MultiThreadedRead(1, &client, 1562359982, "sess0", "txn0")); - EXPECT_EQ(64, MultiThreadedRead(64, &client, 1562360571, "sess1", "txn1")); - EXPECT_EQ(128, MultiThreadedRead(128, &client, 1562361252, "sess2", "txn2")); + EXPECT_EQ(1, MultiThreadedRead(1, &client, 1562359982, "sess0", "txn0", + absl::nullopt)); + EXPECT_EQ(64, MultiThreadedRead(64, &client, 1562360571, "sess1", "txn1", + absl::nullopt)); + EXPECT_EQ(128, MultiThreadedRead(128, &client, 1562361252, "sess2", "txn2", + absl::nullopt)); } } // namespace