From 8df44aab4f4d5d423946a375b16fcb02760a0e52 Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Mon, 23 Mar 2026 17:24:00 -0400 Subject: [PATCH 1/3] impl(bigtable): update stub_factory and connection creation functions for dynamic pool --- google/cloud/bigtable/data_connection.cc | 33 +++- google/cloud/bigtable/data_connection_test.cc | 31 +++- .../internal/bigtable_stub_factory.cc | 161 +++++++++++++++--- .../bigtable/internal/bigtable_stub_factory.h | 62 ++++++- .../internal/bigtable_stub_factory_test.cc | 82 +++++++++ 5 files changed, 329 insertions(+), 40 deletions(-) diff --git a/google/cloud/bigtable/data_connection.cc b/google/cloud/bigtable/data_connection.cc index bada0bb3d7952..91ee5a61d5b46 100644 --- a/google/cloud/bigtable/data_connection.cc +++ b/google/cloud/bigtable/data_connection.cc @@ -184,14 +184,35 @@ std::shared_ptr MakeDataConnection(Options options) { google::cloud::internal::MakeBackgroundThreadsFactory(options)(); auto auth = google::cloud::internal::CreateAuthenticationStrategy( background->cq(), options); - auto stub = bigtable_internal::CreateBigtableStub(std::move(auth), - background->cq(), options); auto limiter = bigtable_internal::MakeMutateRowsLimiter(background->cq(), options); - std::shared_ptr conn = - std::make_shared( - std::move(background), std::move(stub), std::move(limiter), - std::move(options)); + std::shared_ptr conn; + + if (options.has()) { + auto stub_creation_fn = + [auth, cq = background->cq(), options]( + std::string_view instance_name, + bigtable_internal::StubManager::Priming priming) { + return bigtable_internal::CreateBigtableStub(auth, cq, instance_name, + priming, options); + }; + + auto affinity_stubs = bigtable_internal::CreateBigtableAffinityStubs( + options.get(), + stub_creation_fn); + conn = std::make_shared( + std::move(background), + std::make_unique( + std::move(affinity_stubs), stub_creation_fn), + std::move(limiter), std::move(options)); + } else { + auto stub = bigtable_internal::CreateBigtableStub( + std::move(auth), background->cq(), options); + conn = std::make_shared( + std::move(background), + std::make_unique(std::move(stub)), + std::move(limiter), std::move(options)); + } if (google::cloud::internal::TracingEnabled(conn->options())) { conn = bigtable_internal::MakeDataTracingConnection(std::move(conn)); } diff --git a/google/cloud/bigtable/data_connection_test.cc b/google/cloud/bigtable/data_connection_test.cc index e51d7f38819e8..2682aa2dee32e 100644 --- a/google/cloud/bigtable/data_connection_test.cc +++ b/google/cloud/bigtable/data_connection_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/bigtable/data_connection.h" +#include "google/cloud/bigtable/internal/bigtable_stub_factory.h" #include "google/cloud/bigtable/options.h" #include "google/cloud/common_options.h" #include "google/cloud/credentials.h" @@ -29,6 +30,12 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { using ms = std::chrono::milliseconds; +using ::google::cloud::testing_util::DisableTracing; +using ::google::cloud::testing_util::EnableTracing; +using ::google::cloud::testing_util::SpanNamed; +using ::testing::Contains; +using ::testing::Eq; +using ::testing::IsEmpty; Options TestOptions() { return Options{} @@ -54,12 +61,6 @@ TEST(MakeDataConnection, DefaultsOptions) { << "User supplied Options are overridden in MakeDataConnection()"; } -using ::google::cloud::testing_util::DisableTracing; -using ::google::cloud::testing_util::EnableTracing; -using ::google::cloud::testing_util::SpanNamed; -using ::testing::Contains; -using ::testing::IsEmpty; - TEST(MakeDataConnection, TracingEnabled) { auto span_catcher = testing_util::InstallSpanCatcher(); @@ -71,6 +72,24 @@ TEST(MakeDataConnection, TracingEnabled) { Contains(SpanNamed("bigtable::Table::Apply"))); } +TEST(MakeDataConnection, InstanceChannelAffinityOption) { + InstanceResource instance_a{Project("my-project"), "instance-a"}; + InstanceResource instance_b{Project("my-project"), "instance-b"}; + auto conn = + MakeDataConnection(TestOptions() + .set("user-supplied") + .set( + {instance_a, instance_b})); + auto options = conn->options(); + EXPECT_TRUE(options.has()) + << "Options are not defaulted in MakeDataConnection()"; + EXPECT_EQ(options.get(), "user-supplied") + << "User supplied Options are overridden in MakeDataConnection()"; + ASSERT_TRUE(options.has()); + EXPECT_THAT(options.get().size(), + Eq(2)); +} + TEST(MakeDataConnection, TracingDisabled) { auto span_catcher = testing_util::InstallSpanCatcher(); diff --git a/google/cloud/bigtable/internal/bigtable_stub_factory.cc b/google/cloud/bigtable/internal/bigtable_stub_factory.cc index 8f201cf522438..408be01438630 100644 --- a/google/cloud/bigtable/internal/bigtable_stub_factory.cc +++ b/google/cloud/bigtable/internal/bigtable_stub_factory.cc @@ -17,6 +17,7 @@ #include "google/cloud/bigtable/internal/bigtable_channel_refresh.h" #include "google/cloud/bigtable/internal/bigtable_logging_decorator.h" #include "google/cloud/bigtable/internal/bigtable_metadata_decorator.h" +#include "google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h" #include "google/cloud/bigtable/internal/bigtable_round_robin_decorator.h" #include "google/cloud/bigtable/internal/bigtable_tracing_stub.h" #include "google/cloud/bigtable/internal/connection_refresh_state.h" @@ -60,56 +61,154 @@ std::string FeaturesMetadata() { return *kFeatures; } +std::shared_ptr ApplyCommonDecorators( + std::shared_ptr auth, + std::shared_ptr stub, Options const& options) { + if (auth->RequiresConfigureContext()) { + stub = std::make_shared(std::move(auth), std::move(stub)); + } + stub = std::make_shared( + std::move(stub), + std::multimap{ + {"bigtable-features", FeaturesMetadata()}}, + internal::HandCraftedLibClientHeader()); + if (internal::Contains(options.get(), "rpc")) { + GCP_LOG(INFO) << "Enabled logging for gRPC calls"; + stub = std::make_shared( + std::move(stub), options.get(), + options.get()); + } + if (internal::TracingEnabled(options)) { + stub = MakeBigtableTracingStub(std::move(stub)); + } + return stub; +} + } // namespace std::shared_ptr CreateBigtableStubRoundRobin( - Options const& options, - std::function(int)> child_factory) { + Options const& options, std::function(int)> + refreshing_channel_stub_factory) { std::vector> children( (std::max)(1, options.get())); int id = 0; std::generate(children.begin(), children.end(), - [&id, &child_factory] { return child_factory(id++); }); + [&id, &refreshing_channel_stub_factory] { + return refreshing_channel_stub_factory(id++); + }); return std::make_shared(std::move(children)); } +std::shared_ptr CreateBigtableStubRandomTwoLeastUsed( + std::shared_ptr auth, + std::shared_ptr cq_impl, + std::string_view instance_name, StubManager::Priming priming, + Options const& options, BaseBigtableStubFactory stub_factory, + std::shared_ptr refresh_state) { + auto refreshing_channel_stub_factory = + [stub_factory = std::move(stub_factory), cq_impl, refresh_state, + auth = std::move(auth), + options](std::uint32_t id, std::string_view instance_name, + StubManager::Priming priming) + -> StatusOr>> { + auto wrapper = std::make_shared>(); + auto connection_status_fn = [weak = wrapper->MakeWeak()](Status const& s) { + if (auto self = weak.lock()) { + self->set_last_refresh_status(s); + } + if (!s.ok()) { + GCP_LOG(WARNING) << "Failed to refresh connection. Error: " << s; + } + }; + + auto channel = CreateGrpcChannel(*auth, options, id); + auto stub = stub_factory(std::move(channel)); + if (priming == StubManager::Priming::kSynchronousPriming) { + grpc::ClientContext client_context; + google::bigtable::v2::PingAndWarmRequest request; + request.set_name(instance_name); + auto response = + stub->PingAndWarm(client_context, options, std::move(request)); + if (!response.ok()) return response.status(); + } + + ScheduleStubRefresh(cq_impl, refresh_state, stub, + std::string{instance_name}, + std::move(connection_status_fn)); + + wrapper->set_stub(std::move(stub)); + return wrapper; + }; + + std::vector>> children; + children.reserve(std::max(1, options.get())); + std::uint32_t id = 0; + for (std::uint32_t i = 0; i < children.capacity(); ++i) { + auto stub = refreshing_channel_stub_factory(id++, instance_name, priming); + if (stub.ok()) { + children.push_back(*std::move(stub)); + } + } + + return std::make_shared( + DynamicChannelPool::Create( + std::string{instance_name}, CompletionQueue(std::move(cq_impl)), + std::move(children), std::move(refresh_state), + std::move(refreshing_channel_stub_factory))); +} + +std::shared_ptr CreateDecoratedStubs( + std::shared_ptr auth, + CompletionQueue const& cq, std::string_view instance_name, + StubManager::Priming priming, Options const& options, + BaseBigtableStubFactory const& stub_factory) { + auto cq_impl = internal::GetCompletionQueueImpl(cq); + auto refresh = std::make_shared( + cq_impl, options.get(), + options.get()); + auto stub = CreateBigtableStubRandomTwoLeastUsed( + auth, std::move(cq_impl), instance_name, priming, options, stub_factory, + std::move(refresh)); + return ApplyCommonDecorators(std::move(auth), std::move(stub), options); +} + std::shared_ptr CreateDecoratedStubs( std::shared_ptr auth, CompletionQueue const& cq, Options const& options, - BaseBigtableStubFactory const& base_factory) { + BaseBigtableStubFactory const& stub_factory) { auto cq_impl = internal::GetCompletionQueueImpl(cq); auto refresh = std::make_shared( cq_impl, options.get(), options.get()); - auto child_factory = [base_factory, cq_impl, refresh, &auth, - options](int id) { + // Cannot use Dynamic Channel Pool as it requires affinity. + auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh, &auth, + options](int id) { auto channel = CreateGrpcChannel(*auth, options, id); if (refresh->enabled()) ScheduleChannelRefresh(cq_impl, refresh, channel); - return base_factory(std::move(channel)); + return stub_factory(std::move(channel)); }; - auto stub = CreateBigtableStubRoundRobin(options, std::move(child_factory)); + auto stub = CreateBigtableStubRoundRobin( + options, std::move(refreshing_channel_stub_factory)); if (refresh->enabled()) { stub = std::make_shared(std::move(stub), std::move(refresh)); } - if (auth->RequiresConfigureContext()) { - stub = std::make_shared(std::move(auth), std::move(stub)); - } - stub = std::make_shared( - std::move(stub), - std::multimap{ - {"bigtable-features", FeaturesMetadata()}}, - internal::HandCraftedLibClientHeader()); - if (internal::Contains(options.get(), "rpc")) { - GCP_LOG(INFO) << "Enabled logging for gRPC calls"; - stub = std::make_shared( - std::move(stub), options.get(), - options.get()); - } - if (internal::TracingEnabled(options)) { - stub = MakeBigtableTracingStub(std::move(stub)); + return ApplyCommonDecorators(std::move(auth), std::move(stub), options); +} + +absl::flat_hash_map> +CreateBigtableAffinityStubs( + std::vector const& instances, + StubManager::StubCreationFn const& stub_creation_fn) { + absl::flat_hash_map> + affinity_stubs; + for (auto const& instance : instances) { + affinity_stubs.insert(std::make_pair( + instance.FullName(), + stub_creation_fn(instance.FullName(), + StubManager::Priming::kSynchronousPriming))); } - return stub; + return affinity_stubs; } std::shared_ptr CreateBigtableStub( @@ -122,6 +221,18 @@ std::shared_ptr CreateBigtableStub( }); } +std::shared_ptr CreateBigtableStub( + std::shared_ptr auth, + CompletionQueue const& cq, std::string_view instance_name, + StubManager::Priming priming, Options const& options) { + return CreateDecoratedStubs( + std::move(auth), cq, instance_name, priming, options, + [](std::shared_ptr c) { + return std::make_shared( + google::bigtable::v2::Bigtable::NewStub(std::move(c))); + }); +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal } // namespace cloud diff --git a/google/cloud/bigtable/internal/bigtable_stub_factory.h b/google/cloud/bigtable/internal/bigtable_stub_factory.h index cdf7a633b0f42..03d762b9dad03 100644 --- a/google/cloud/bigtable/internal/bigtable_stub_factory.h +++ b/google/cloud/bigtable/internal/bigtable_stub_factory.h @@ -15,7 +15,10 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_BIGTABLE_STUB_FACTORY_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_BIGTABLE_STUB_FACTORY_H +#include "google/cloud/bigtable/instance_resource.h" #include "google/cloud/bigtable/internal/bigtable_stub.h" +#include "google/cloud/bigtable/internal/connection_refresh_state.h" +#include "google/cloud/bigtable/internal/stub_manager.h" #include "google/cloud/completion_queue.h" #include "google/cloud/internal/unified_grpc_credentials.h" #include "google/cloud/options.h" @@ -23,6 +26,30 @@ #include #include +namespace google::cloud::bigtable { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +// TODO(#16035): Move this Option to bigtable/options.h in the experimental +// namespace when the feature is ready. +namespace experimental { +/** + * If set, a dynamic channel pool is created for each instance that requests + * are destined. Instances specified as part of this Option have dynamic + * channel pools created and primed as part of DataConnection construction. If + * no Instances are specified, then dynamic channel pool creation is deferred + * until the first request sent, increasing time to first byte latency. + * + * @note This option must be supplied to `MakeDataConnection()` in order to take + * effect. + */ +struct InstanceChannelAffinityOption { + using Type = std::vector; +}; +} // namespace experimental + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace google::cloud::bigtable + namespace google { namespace cloud { namespace bigtable_internal { @@ -32,20 +59,49 @@ using BaseBigtableStubFactory = std::function( std::shared_ptr)>; std::shared_ptr CreateBigtableStubRoundRobin( - Options const& options, - std::function(int)> child_factory); + Options const& options, std::function(int)> + refreshing_channel_stub_factory); + +std::shared_ptr CreateBigtableStubRandomTwoLeastUsed( + std::shared_ptr auth, + std::shared_ptr cq_impl, + std::string_view instance_name, StubManager::Priming priming, + Options const& options, BaseBigtableStubFactory stub_factory, + std::shared_ptr refresh_state); /// Used in testing to create decorated mocks. std::shared_ptr CreateDecoratedStubs( std::shared_ptr auth, CompletionQueue const& cq, Options const& options, - BaseBigtableStubFactory const& base_factory); + BaseBigtableStubFactory const& stub_factory); + +/// Used in testing to create decorated mocks. +std::shared_ptr CreateDecoratedStubs( + std::shared_ptr auth, + CompletionQueue const& cq, std::string_view instance_name, + StubManager::Priming priming, Options const& options, + BaseBigtableStubFactory const& stub_factory); /// Default function used by `DataConnectionImpl`. +/// No instance affinity, uses legacy grpc::GetState to refresh using the +/// round-robin channel selection strategy. std::shared_ptr CreateBigtableStub( std::shared_ptr auth, CompletionQueue const& cq, Options const& options); +/// Creates a stub with instance affinity using PingAndWarm priming using the +/// random two least used channel selection strategy. +std::shared_ptr CreateBigtableStub( + std::shared_ptr auth, + CompletionQueue const& cq, std::string_view instance_name, + StubManager::Priming priming, Options const& options); + +/// Creates a map of instance to stub pairs. +absl::flat_hash_map> +CreateBigtableAffinityStubs( + std::vector const& instances, + StubManager::StubCreationFn const& stub_creation_fn); + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal } // namespace cloud diff --git a/google/cloud/bigtable/internal/bigtable_stub_factory_test.cc b/google/cloud/bigtable/internal/bigtable_stub_factory_test.cc index a94e233fd2928..e188d05f6a6d6 100644 --- a/google/cloud/bigtable/internal/bigtable_stub_factory_test.cc +++ b/google/cloud/bigtable/internal/bigtable_stub_factory_test.cc @@ -21,6 +21,7 @@ #include "google/cloud/internal/api_client_header.h" #include "google/cloud/internal/async_streaming_read_rpc_impl.h" #include "google/cloud/internal/make_status.h" +#include "google/cloud/testing_util/fake_completion_queue_impl.h" #include "google/cloud/testing_util/mock_grpc_authentication_strategy.h" #include "google/cloud/testing_util/opentelemetry_matchers.h" #include "google/cloud/testing_util/scoped_log.h" @@ -44,8 +45,10 @@ using ::google::cloud::testing_util::StatusIs; using ::google::cloud::testing_util::ValidateMetadataFixture; using ::testing::_; using ::testing::Contains; +using ::testing::Eq; using ::testing::HasSubstr; using ::testing::IsEmpty; +using ::testing::MockFunction; using ::testing::Not; using ::testing::Optional; using ::testing::Pair; @@ -107,6 +110,56 @@ TEST(BigtableStubFactory, RoundRobin) { } } +TEST(BigtableStubFactory, RandomTwoLeastUsed) { + auto constexpr kTestChannels = 3; + + MockFactory factory; + auto mock = std::make_shared(); + EXPECT_CALL(*mock, PingAndWarm) + .WillRepeatedly(Return(google::bigtable::v2::PingAndWarmResponse{})); + EXPECT_CALL(*mock, MutateRow) + .WillRepeatedly(Return(internal::AbortedError("fail"))); + + EXPECT_CALL(factory, Call) + .Times(kTestChannels) + .WillRepeatedly( + [&](std::shared_ptr const&) { return mock; }); + + auto expect_channel_id = [](int id) { + return ResultOf( + "channel ID", + [](grpc::ChannelArguments const& args) { + return internal::GetIntChannelArgument(args, "grpc.channel_id"); + }, + Optional(id)); + }; + + auto auth = MakeStubFactoryMockAuth(); + EXPECT_CALL(*auth, CreateChannel("localhost:1", expect_channel_id(0))); + EXPECT_CALL(*auth, CreateChannel("localhost:1", expect_channel_id(1))); + EXPECT_CALL(*auth, CreateChannel("localhost:1", expect_channel_id(2))); + EXPECT_CALL(*auth, RequiresConfigureContext).WillOnce(Return(false)); + + auto fake_cq_impl = std::make_shared(); + CompletionQueue cq(fake_cq_impl); + auto stub = CreateDecoratedStubs( + std::move(auth), cq, "projects/my-projects/instances/my-instance", + StubManager::Priming::kSynchronousPriming, + Options{} + .set(kTestChannels) + .set("localhost:1") + .set(MakeInsecureCredentials()), + factory.AsStdFunction()); + + grpc::ClientContext context; + for (int i = 0; i != kTestChannels; ++i) { + auto response = stub->MutateRow(context, Options{}, {}); + EXPECT_THAT(response, StatusIs(StatusCode::kAborted, "fail")); + } + + fake_cq_impl->SimulateCompletion(false); +} + // Note that the channel refreshing decorator is tested in // bigtable_channel_refresh_test.cc @@ -335,6 +388,35 @@ TEST(BigtableStubFactory, TracingDisabled) { EXPECT_THAT(span_catcher->GetSpans(), IsEmpty()); } +TEST(BigtableStubFactory, CreateBigtableAffinityStubs) { + bigtable::InstanceResource instance_a{Project("my-project"), "instance-a"}; + bigtable::InstanceResource instance_b{Project("my-project"), "instance-b"}; + std::vector instances; + instances.push_back(instance_a); + instances.push_back(instance_b); + + MockFunction(std::string_view, + StubManager::Priming)> + stub_creation_fn; + + EXPECT_CALL(stub_creation_fn, Call) + .WillOnce([&](std::string_view instance, StubManager::Priming priming) { + EXPECT_THAT(instance, Eq(instance_a.FullName())); + EXPECT_THAT(priming, Eq(StubManager::Priming::kSynchronousPriming)); + return std::make_shared(); + }) + .WillOnce([&](std::string_view instance, StubManager::Priming priming) { + EXPECT_THAT(instance, Eq(instance_b.FullName())); + EXPECT_THAT(priming, Eq(StubManager::Priming::kSynchronousPriming)); + return std::make_shared(); + }); + + auto stubs = + CreateBigtableAffinityStubs(instances, stub_creation_fn.AsStdFunction()); + + EXPECT_THAT(stubs, testing::SizeIs(2)); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal From 1144d49082fe592360d2496494548c627569b913 Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Mon, 23 Mar 2026 17:50:27 -0400 Subject: [PATCH 2/3] create string from string_view for proto method --- google/cloud/bigtable/internal/bigtable_stub_factory.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/internal/bigtable_stub_factory.cc b/google/cloud/bigtable/internal/bigtable_stub_factory.cc index 408be01438630..c95d23a0db618 100644 --- a/google/cloud/bigtable/internal/bigtable_stub_factory.cc +++ b/google/cloud/bigtable/internal/bigtable_stub_factory.cc @@ -126,7 +126,7 @@ std::shared_ptr CreateBigtableStubRandomTwoLeastUsed( if (priming == StubManager::Priming::kSynchronousPriming) { grpc::ClientContext client_context; google::bigtable::v2::PingAndWarmRequest request; - request.set_name(instance_name); + request.set_name(std::string{instance_name}); auto response = stub->PingAndWarm(client_context, options, std::move(request)); if (!response.ok()) return response.status(); From c5cefffe48735b0eed4b2929fb480171def3ca65 Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Tue, 24 Mar 2026 11:47:58 -0400 Subject: [PATCH 3/3] add comments --- google/cloud/bigtable/internal/bigtable_stub_factory.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/google/cloud/bigtable/internal/bigtable_stub_factory.h b/google/cloud/bigtable/internal/bigtable_stub_factory.h index 03d762b9dad03..750e2e0739b42 100644 --- a/google/cloud/bigtable/internal/bigtable_stub_factory.h +++ b/google/cloud/bigtable/internal/bigtable_stub_factory.h @@ -58,10 +58,13 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN using BaseBigtableStubFactory = std::function( std::shared_ptr)>; +/// Creates a static pool of stubs that use a round-robin strategy to select. std::shared_ptr CreateBigtableStubRoundRobin( Options const& options, std::function(int)> refreshing_channel_stub_factory); +/// Creates a dynamic pool of stubs that selects the least used from a random +/// pair of stubs. std::shared_ptr CreateBigtableStubRandomTwoLeastUsed( std::shared_ptr auth, std::shared_ptr cq_impl,