Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions google/cloud/bigtable/data_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,35 @@ std::shared_ptr<DataConnection> MakeDataConnection(Options options) {
google::cloud::internal::MakeBackgroundThreadsFactory(options)();
auto auth = google::cloud::internal::CreateAuthenticationStrategy(
background->cq(), options);
auto stub = bigtable_internal::CreateBigtableStub(std::move(auth),
background->cq(), options);
auto limiter =
bigtable_internal::MakeMutateRowsLimiter(background->cq(), options);
std::shared_ptr<DataConnection> conn =
std::make_shared<bigtable_internal::DataConnectionImpl>(
std::move(background), std::move(stub), std::move(limiter),
std::move(options));
std::shared_ptr<DataConnection> conn;

if (options.has<experimental::InstanceChannelAffinityOption>()) {
auto stub_creation_fn =
[auth, cq = background->cq(), options](
std::string_view instance_name,
bigtable_internal::StubManager::Priming priming) {
return bigtable_internal::CreateBigtableStub(auth, cq, instance_name,
priming, options);
};

auto affinity_stubs = bigtable_internal::CreateBigtableAffinityStubs(
options.get<experimental::InstanceChannelAffinityOption>(),
stub_creation_fn);
conn = std::make_shared<bigtable_internal::DataConnectionImpl>(
std::move(background),
std::make_unique<bigtable_internal::StubManager>(
std::move(affinity_stubs), stub_creation_fn),
std::move(limiter), std::move(options));
} else {
auto stub = bigtable_internal::CreateBigtableStub(
std::move(auth), background->cq(), options);
conn = std::make_shared<bigtable_internal::DataConnectionImpl>(
std::move(background),
std::make_unique<bigtable_internal::StubManager>(std::move(stub)),
std::move(limiter), std::move(options));
}
if (google::cloud::internal::TracingEnabled(conn->options())) {
conn = bigtable_internal::MakeDataTracingConnection(std::move(conn));
}
Expand Down
31 changes: 25 additions & 6 deletions google/cloud/bigtable/data_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/bigtable/data_connection.h"
#include "google/cloud/bigtable/internal/bigtable_stub_factory.h"
#include "google/cloud/bigtable/options.h"
#include "google/cloud/common_options.h"
#include "google/cloud/credentials.h"
Expand All @@ -29,6 +30,12 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

using ms = std::chrono::milliseconds;
using ::google::cloud::testing_util::DisableTracing;
using ::google::cloud::testing_util::EnableTracing;
using ::google::cloud::testing_util::SpanNamed;
using ::testing::Contains;
using ::testing::Eq;
using ::testing::IsEmpty;

Options TestOptions() {
return Options{}
Expand All @@ -54,12 +61,6 @@ TEST(MakeDataConnection, DefaultsOptions) {
<< "User supplied Options are overridden in MakeDataConnection()";
}

using ::google::cloud::testing_util::DisableTracing;
using ::google::cloud::testing_util::EnableTracing;
using ::google::cloud::testing_util::SpanNamed;
using ::testing::Contains;
using ::testing::IsEmpty;

TEST(MakeDataConnection, TracingEnabled) {
auto span_catcher = testing_util::InstallSpanCatcher();

Expand All @@ -71,6 +72,24 @@ TEST(MakeDataConnection, TracingEnabled) {
Contains(SpanNamed("bigtable::Table::Apply")));
}

TEST(MakeDataConnection, InstanceChannelAffinityOption) {
InstanceResource instance_a{Project("my-project"), "instance-a"};
InstanceResource instance_b{Project("my-project"), "instance-b"};
auto conn =
MakeDataConnection(TestOptions()
.set<AppProfileIdOption>("user-supplied")
.set<experimental::InstanceChannelAffinityOption>(
{instance_a, instance_b}));
auto options = conn->options();
EXPECT_TRUE(options.has<DataBackoffPolicyOption>())
<< "Options are not defaulted in MakeDataConnection()";
EXPECT_EQ(options.get<AppProfileIdOption>(), "user-supplied")
<< "User supplied Options are overridden in MakeDataConnection()";
ASSERT_TRUE(options.has<experimental::InstanceChannelAffinityOption>());
EXPECT_THAT(options.get<experimental::InstanceChannelAffinityOption>().size(),
Eq(2));
}

TEST(MakeDataConnection, TracingDisabled) {
auto span_catcher = testing_util::InstallSpanCatcher();

Expand Down
161 changes: 136 additions & 25 deletions google/cloud/bigtable/internal/bigtable_stub_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "google/cloud/bigtable/internal/bigtable_channel_refresh.h"
#include "google/cloud/bigtable/internal/bigtable_logging_decorator.h"
#include "google/cloud/bigtable/internal/bigtable_metadata_decorator.h"
#include "google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h"
#include "google/cloud/bigtable/internal/bigtable_round_robin_decorator.h"
#include "google/cloud/bigtable/internal/bigtable_tracing_stub.h"
#include "google/cloud/bigtable/internal/connection_refresh_state.h"
Expand Down Expand Up @@ -60,56 +61,154 @@ std::string FeaturesMetadata() {
return *kFeatures;
}

std::shared_ptr<BigtableStub> ApplyCommonDecorators(
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
std::shared_ptr<BigtableStub> stub, Options const& options) {
if (auth->RequiresConfigureContext()) {
stub = std::make_shared<BigtableAuth>(std::move(auth), std::move(stub));
}
stub = std::make_shared<BigtableMetadata>(
std::move(stub),
std::multimap<std::string, std::string>{
{"bigtable-features", FeaturesMetadata()}},
internal::HandCraftedLibClientHeader());
if (internal::Contains(options.get<LoggingComponentsOption>(), "rpc")) {
GCP_LOG(INFO) << "Enabled logging for gRPC calls";
stub = std::make_shared<BigtableLogging>(
std::move(stub), options.get<GrpcTracingOptionsOption>(),
options.get<LoggingComponentsOption>());
}
if (internal::TracingEnabled(options)) {
stub = MakeBigtableTracingStub(std::move(stub));
}
return stub;
}

} // namespace

std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin(
Options const& options,
std::function<std::shared_ptr<BigtableStub>(int)> child_factory) {
Options const& options, std::function<std::shared_ptr<BigtableStub>(int)>
refreshing_channel_stub_factory) {
std::vector<std::shared_ptr<BigtableStub>> children(
(std::max)(1, options.get<GrpcNumChannelsOption>()));
int id = 0;
std::generate(children.begin(), children.end(),
[&id, &child_factory] { return child_factory(id++); });
[&id, &refreshing_channel_stub_factory] {
return refreshing_channel_stub_factory(id++);
});
return std::make_shared<BigtableRoundRobin>(std::move(children));
}

std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
std::shared_ptr<internal::CompletionQueueImpl> cq_impl,
std::string_view instance_name, StubManager::Priming priming,
Options const& options, BaseBigtableStubFactory stub_factory,
std::shared_ptr<ConnectionRefreshState> refresh_state) {
auto refreshing_channel_stub_factory =
[stub_factory = std::move(stub_factory), cq_impl, refresh_state,
auth = std::move(auth),
options](std::uint32_t id, std::string_view instance_name,
StubManager::Priming priming)
-> StatusOr<std::shared_ptr<ChannelUsage<BigtableStub>>> {
auto wrapper = std::make_shared<ChannelUsage<BigtableStub>>();
auto connection_status_fn = [weak = wrapper->MakeWeak()](Status const& s) {
if (auto self = weak.lock()) {
self->set_last_refresh_status(s);
}
if (!s.ok()) {
GCP_LOG(WARNING) << "Failed to refresh connection. Error: " << s;
}
};

auto channel = CreateGrpcChannel(*auth, options, id);
auto stub = stub_factory(std::move(channel));
if (priming == StubManager::Priming::kSynchronousPriming) {
grpc::ClientContext client_context;
google::bigtable::v2::PingAndWarmRequest request;
request.set_name(std::string{instance_name});
auto response =
stub->PingAndWarm(client_context, options, std::move(request));
if (!response.ok()) return response.status();
}

ScheduleStubRefresh(cq_impl, refresh_state, stub,
std::string{instance_name},
std::move(connection_status_fn));

wrapper->set_stub(std::move(stub));
return wrapper;
};

std::vector<std::shared_ptr<ChannelUsage<BigtableStub>>> children;
children.reserve(std::max(1, options.get<GrpcNumChannelsOption>()));
std::uint32_t id = 0;
for (std::uint32_t i = 0; i < children.capacity(); ++i) {
auto stub = refreshing_channel_stub_factory(id++, instance_name, priming);
if (stub.ok()) {
children.push_back(*std::move(stub));
}
}

return std::make_shared<BigtableRandomTwoLeastUsed>(
DynamicChannelPool<BigtableStub>::Create(
std::string{instance_name}, CompletionQueue(std::move(cq_impl)),
std::move(children), std::move(refresh_state),
std::move(refreshing_channel_stub_factory)));
}

std::shared_ptr<BigtableStub> CreateDecoratedStubs(
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
CompletionQueue const& cq, std::string_view instance_name,
StubManager::Priming priming, Options const& options,
BaseBigtableStubFactory const& stub_factory) {
auto cq_impl = internal::GetCompletionQueueImpl(cq);
auto refresh = std::make_shared<ConnectionRefreshState>(
cq_impl, options.get<bigtable::MinConnectionRefreshOption>(),
options.get<bigtable::MaxConnectionRefreshOption>());
auto stub = CreateBigtableStubRandomTwoLeastUsed(
auth, std::move(cq_impl), instance_name, priming, options, stub_factory,
std::move(refresh));
return ApplyCommonDecorators(std::move(auth), std::move(stub), options);
}

std::shared_ptr<BigtableStub> CreateDecoratedStubs(
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
CompletionQueue const& cq, Options const& options,
BaseBigtableStubFactory const& base_factory) {
BaseBigtableStubFactory const& stub_factory) {
auto cq_impl = internal::GetCompletionQueueImpl(cq);
auto refresh = std::make_shared<ConnectionRefreshState>(
cq_impl, options.get<bigtable::MinConnectionRefreshOption>(),
options.get<bigtable::MaxConnectionRefreshOption>());
auto child_factory = [base_factory, cq_impl, refresh, &auth,
options](int id) {
// Cannot use Dynamic Channel Pool as it requires affinity.
auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh, &auth,
options](int id) {
auto channel = CreateGrpcChannel(*auth, options, id);
if (refresh->enabled()) ScheduleChannelRefresh(cq_impl, refresh, channel);
return base_factory(std::move(channel));
return stub_factory(std::move(channel));
};
auto stub = CreateBigtableStubRoundRobin(options, std::move(child_factory));
auto stub = CreateBigtableStubRoundRobin(
options, std::move(refreshing_channel_stub_factory));
if (refresh->enabled()) {
stub = std::make_shared<BigtableChannelRefresh>(std::move(stub),
std::move(refresh));
}
if (auth->RequiresConfigureContext()) {
stub = std::make_shared<BigtableAuth>(std::move(auth), std::move(stub));
}
stub = std::make_shared<BigtableMetadata>(
std::move(stub),
std::multimap<std::string, std::string>{
{"bigtable-features", FeaturesMetadata()}},
internal::HandCraftedLibClientHeader());
if (internal::Contains(options.get<LoggingComponentsOption>(), "rpc")) {
GCP_LOG(INFO) << "Enabled logging for gRPC calls";
stub = std::make_shared<BigtableLogging>(
std::move(stub), options.get<GrpcTracingOptionsOption>(),
options.get<LoggingComponentsOption>());
}
if (internal::TracingEnabled(options)) {
stub = MakeBigtableTracingStub(std::move(stub));
return ApplyCommonDecorators(std::move(auth), std::move(stub), options);
}

absl::flat_hash_map<std::string, std::shared_ptr<BigtableStub>>
CreateBigtableAffinityStubs(
std::vector<bigtable::InstanceResource> const& instances,
StubManager::StubCreationFn const& stub_creation_fn) {
absl::flat_hash_map<std::string, std::shared_ptr<BigtableStub>>
affinity_stubs;
for (auto const& instance : instances) {
affinity_stubs.insert(std::make_pair(
instance.FullName(),
stub_creation_fn(instance.FullName(),
StubManager::Priming::kSynchronousPriming)));
}
return stub;
return affinity_stubs;
}

std::shared_ptr<BigtableStub> CreateBigtableStub(
Expand All @@ -122,6 +221,18 @@ std::shared_ptr<BigtableStub> CreateBigtableStub(
});
}

std::shared_ptr<BigtableStub> CreateBigtableStub(
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
CompletionQueue const& cq, std::string_view instance_name,
StubManager::Priming priming, Options const& options) {
return CreateDecoratedStubs(
std::move(auth), cq, instance_name, priming, options,
[](std::shared_ptr<grpc::Channel> c) {
return std::make_shared<DefaultBigtableStub>(
google::bigtable::v2::Bigtable::NewStub(std::move(c)));
});
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable_internal
} // namespace cloud
Expand Down
Loading
Loading