Skip to content

Commit eb60077

Browse files
authored
impl(bigtable): update DataConnectionImpl to use StubManager (#16050)
1 parent 1841554 commit eb60077

File tree

2 files changed

+107
-43
lines changed

2 files changed

+107
-43
lines changed

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 93 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,12 @@ ResultType MakeStatusOnlyResult(Status status) {
189189
std::make_unique<StatusOnlyResultSetSource>(std::move(status)));
190190
}
191191

192+
std::string_view InstanceNameFromTableName(std::string_view table_name) {
193+
auto pos = table_name.rfind("/tables");
194+
if (pos == std::string_view::npos) return {};
195+
return table_name.substr(0, pos);
196+
}
197+
192198
} // namespace
193199

194200
bigtable::Row TransformReadModifyWriteRowResponse(
@@ -213,10 +219,10 @@ bigtable::Row TransformReadModifyWriteRowResponse(
213219

214220
DataConnectionImpl::DataConnectionImpl(
215221
std::unique_ptr<BackgroundThreads> background,
216-
std::shared_ptr<BigtableStub> stub,
222+
std::unique_ptr<StubManager> stub_manager,
217223
std::shared_ptr<MutateRowsLimiter> limiter, Options options)
218224
: background_(std::move(background)),
219-
stub_(std::move(stub)),
225+
stub_manager_(std::move(stub_manager)),
220226
limiter_(std::move(limiter)),
221227
options_(internal::MergeOptions(std::move(options),
222228
DataConnection::options())) {
@@ -244,15 +250,33 @@ DataConnectionImpl::DataConnectionImpl(
244250
DataConnectionImpl::DataConnectionImpl(
245251
std::unique_ptr<BackgroundThreads> background,
246252
std::shared_ptr<BigtableStub> stub,
253+
std::shared_ptr<MutateRowsLimiter> limiter, Options options)
254+
: DataConnectionImpl(std::move(background),
255+
std::make_unique<StubManager>(std::move(stub)),
256+
std::move(limiter), std::move(options)) {}
257+
258+
DataConnectionImpl::DataConnectionImpl(
259+
std::unique_ptr<BackgroundThreads> background,
260+
std::unique_ptr<StubManager> stub_manager,
247261
std::unique_ptr<OperationContextFactory> operation_context_factory,
248262
std::shared_ptr<MutateRowsLimiter> limiter, Options options)
249263
: background_(std::move(background)),
250-
stub_(std::move(stub)),
264+
stub_manager_(std::move(stub_manager)),
251265
operation_context_factory_(std::move(operation_context_factory)),
252266
limiter_(std::move(limiter)),
253267
options_(internal::MergeOptions(std::move(options),
254268
DataConnection::options())) {}
255269

270+
DataConnectionImpl::DataConnectionImpl(
271+
std::unique_ptr<BackgroundThreads> background,
272+
std::shared_ptr<BigtableStub> stub,
273+
std::unique_ptr<OperationContextFactory> operation_context_factory,
274+
std::shared_ptr<MutateRowsLimiter> limiter, Options options)
275+
: DataConnectionImpl(std::move(background),
276+
std::make_unique<StubManager>(std::move(stub)),
277+
std::move(operation_context_factory),
278+
std::move(limiter), std::move(options)) {}
279+
256280
Status DataConnectionImpl::Apply(std::string const& table_name,
257281
bigtable::SingleRowMutation mut) {
258282
auto current = google::cloud::internal::SaveCurrentOptions();
@@ -270,14 +294,16 @@ Status DataConnectionImpl::Apply(std::string const& table_name,
270294

271295
auto operation_context = operation_context_factory_->MutateRow(
272296
table_name, app_profile_id(*current));
297+
298+
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
273299
auto sor = google::cloud::internal::RetryLoop(
274300
retry_policy(*current), backoff_policy(*current),
275301
is_idempotent ? Idempotency::kIdempotent : Idempotency::kNonIdempotent,
276-
[this, &operation_context](
302+
[stub, &operation_context](
277303
grpc::ClientContext& context, Options const& options,
278304
google::bigtable::v2::MutateRowRequest const& request) {
279305
operation_context->PreCall(context);
280-
auto s = stub_->MutateRow(context, options, request);
306+
auto s = stub->MutateRow(context, options, request);
281307
operation_context->PostCall(context, s.status());
282308
return s;
283309
},
@@ -311,7 +337,9 @@ future<Status> DataConnectionImpl::AsyncApply(std::string const& table_name,
311337
is_idempotent ? Idempotency::kIdempotent
312338
: Idempotency::kNonIdempotent,
313339
background_->cq(),
314-
[stub = stub_, operation_context](
340+
[stub =
341+
stub_manager_->GetStub(InstanceNameFromTableName(table_name)),
342+
operation_context](
315343
CompletionQueue& cq,
316344
std::shared_ptr<grpc::ClientContext> context,
317345
google::cloud::internal::ImmutableOptions options,
@@ -350,8 +378,9 @@ std::vector<bigtable::FailedMutation> DataConnectionImpl::BulkApply(
350378
std::unique_ptr<bigtable::DataRetryPolicy> retry;
351379
std::unique_ptr<BackoffPolicy> backoff;
352380
Status status;
381+
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
353382
while (true) {
354-
status = mutator.MakeOneRequest(*stub_, *limiter_, *current);
383+
status = mutator.MakeOneRequest(*stub, *limiter_, *current);
355384
if (!mutator.HasPendingMutations()) break;
356385
if (!retry) retry = retry_policy(*current);
357386
if (!backoff) backoff = backoff_policy(*current);
@@ -372,18 +401,22 @@ DataConnectionImpl::AsyncBulkApply(std::string const& table_name,
372401
auto operation_context = operation_context_factory_->MutateRows(
373402
table_name, app_profile_id(*current));
374403
return AsyncBulkApplier::Create(
375-
background_->cq(), stub_, limiter_, retry_policy(*current),
376-
backoff_policy(*current), enable_server_retries(*current),
377-
*idempotency_policy(*current), app_profile_id(*current), table_name,
378-
std::move(mut), std::move(operation_context));
404+
background_->cq(),
405+
stub_manager_->GetStub(InstanceNameFromTableName(table_name)), limiter_,
406+
retry_policy(*current), backoff_policy(*current),
407+
enable_server_retries(*current), *idempotency_policy(*current),
408+
app_profile_id(*current), table_name, std::move(mut),
409+
std::move(operation_context));
379410
}
380411

381412
bigtable::RowReader DataConnectionImpl::ReadRowsFull(
382413
bigtable::ReadRowsParams params) {
383414
auto current = google::cloud::internal::SaveCurrentOptions();
384415
auto operation_context = operation_context_factory_->ReadRows(
385416
params.table_name, params.app_profile_id);
386-
return ReadRowsHelper(stub_, current, std::move(params),
417+
auto stub =
418+
stub_manager_->GetStub(InstanceNameFromTableName(params.table_name));
419+
return ReadRowsHelper(stub, current, std::move(params),
387420
std::move(operation_context));
388421
}
389422

@@ -398,8 +431,10 @@ StatusOr<std::pair<bool, bigtable::Row>> DataConnectionImpl::ReadRow(
398431
// OperationContextFactory::ReadRow to create the operation_context.
399432
auto operation_context =
400433
operation_context_factory_->ReadRow(table_name, app_profile_id(*current));
434+
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
435+
401436
auto reader =
402-
ReadRowsHelper(stub_, current,
437+
ReadRowsHelper(stub, current,
403438
bigtable::ReadRowsParams{
404439
table_name, app_profile_id(*current),
405440
std::move(row_set), rows_limit, std::move(filter)},
@@ -439,13 +474,14 @@ StatusOr<bigtable::MutationBranch> DataConnectionImpl::CheckAndMutateRow(
439474
: Idempotency::kNonIdempotent;
440475
auto operation_context = operation_context_factory_->CheckAndMutateRow(
441476
table_name, app_profile_id(*current));
477+
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
442478
auto sor = google::cloud::internal::RetryLoop(
443479
retry_policy(*current), backoff_policy(*current), idempotency,
444-
[this, &operation_context](
480+
[stub, &operation_context](
445481
grpc::ClientContext& context, Options const& options,
446482
google::bigtable::v2::CheckAndMutateRowRequest const& request) {
447483
operation_context->PreCall(context);
448-
auto s = stub_->CheckAndMutateRow(context, options, request);
484+
auto s = stub->CheckAndMutateRow(context, options, request);
449485
operation_context->PostCall(context, s.status());
450486
return s;
451487
},
@@ -483,10 +519,11 @@ DataConnectionImpl::AsyncCheckAndMutateRow(
483519
auto backoff = backoff_policy(*current);
484520
auto operation_context = operation_context_factory_->CheckAndMutateRow(
485521
table_name, app_profile_id(*current));
522+
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
486523
return google::cloud::internal::AsyncRetryLoop(
487524
std::move(retry), std::move(backoff), idempotency,
488525
background_->cq(),
489-
[stub = stub_, operation_context](
526+
[stub, operation_context](
490527
CompletionQueue& cq,
491528
std::shared_ptr<grpc::ClientContext> context,
492529
google::cloud::internal::ImmutableOptions options,
@@ -529,11 +566,12 @@ StatusOr<std::vector<bigtable::RowKeySample>> DataConnectionImpl::SampleRows(
529566
std::unique_ptr<BackoffPolicy> backoff;
530567
auto operation_context = operation_context_factory_->SampleRowKeys(
531568
table_name, app_profile_id(*current));
569+
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
532570
while (true) {
533571
auto context = std::make_shared<grpc::ClientContext>();
534572
internal::ConfigureContext(*context, internal::CurrentOptions());
535573
operation_context->PreCall(*context);
536-
auto stream = stub_->SampleRowKeys(context, Options{}, request);
574+
auto stream = stub->SampleRowKeys(context, Options{}, request);
537575

538576
absl::optional<Status> status;
539577
while (true) {
@@ -574,24 +612,28 @@ DataConnectionImpl::AsyncSampleRows(std::string const& table_name) {
574612
auto operation_context = operation_context_factory_->SampleRowKeys(
575613
table_name, app_profile_id(*current));
576614
return AsyncRowSampler::Create(
577-
background_->cq(), stub_, retry_policy(*current),
578-
backoff_policy(*current), enable_server_retries(*current),
579-
app_profile_id(*current), table_name, std::move(operation_context));
615+
background_->cq(),
616+
stub_manager_->GetStub(InstanceNameFromTableName(table_name)),
617+
retry_policy(*current), backoff_policy(*current),
618+
enable_server_retries(*current), app_profile_id(*current), table_name,
619+
std::move(operation_context));
580620
}
581621

582622
StatusOr<bigtable::Row> DataConnectionImpl::ReadModifyWriteRow(
583623
google::bigtable::v2::ReadModifyWriteRowRequest request) {
584624
auto current = google::cloud::internal::SaveCurrentOptions();
585625
auto operation_context = operation_context_factory_->ReadModifyWriteRow(
586626
request.table_name(), app_profile_id(*current));
627+
auto stub =
628+
stub_manager_->GetStub(InstanceNameFromTableName(request.table_name()));
587629
auto sor = google::cloud::internal::RetryLoop(
588630
retry_policy(*current), backoff_policy(*current),
589631
Idempotency::kNonIdempotent,
590-
[this, operation_context](
632+
[stub, operation_context](
591633
grpc::ClientContext& context, Options const& options,
592634
google::bigtable::v2::ReadModifyWriteRowRequest const& request) {
593635
operation_context->PreCall(context);
594-
auto result = stub_->ReadModifyWriteRow(context, options, request);
636+
auto result = stub->ReadModifyWriteRow(context, options, request);
595637
operation_context->PostCall(context, result.status());
596638
return result;
597639
},
@@ -608,10 +650,12 @@ future<StatusOr<bigtable::Row>> DataConnectionImpl::AsyncReadModifyWriteRow(
608650
request.table_name(), app_profile_id(*current));
609651
auto retry = retry_policy(*current);
610652
auto backoff = backoff_policy(*current);
653+
auto stub =
654+
stub_manager_->GetStub(InstanceNameFromTableName(request.table_name()));
611655
return google::cloud::internal::AsyncRetryLoop(
612656
std::move(retry), std::move(backoff), Idempotency::kNonIdempotent,
613657
background_->cq(),
614-
[stub = stub_, operation_context](
658+
[stub, operation_context](
615659
CompletionQueue& cq,
616660
std::shared_ptr<grpc::ClientContext> context,
617661
google::cloud::internal::ImmutableOptions options,
@@ -648,11 +692,12 @@ void DataConnectionImpl::AsyncReadRowsHelper(
648692
std::shared_ptr<OperationContext> operation_context) {
649693
auto reverse = internal::CurrentOptions().get<bigtable::ReverseScanOption>();
650694
bigtable_internal::AsyncRowReader::Create(
651-
background_->cq(), stub_, app_profile_id(*current), table_name,
652-
std::move(on_row), std::move(on_finish), std::move(row_set), rows_limit,
653-
std::move(filter), reverse, retry_policy(*current),
654-
backoff_policy(*current), enable_server_retries(*current),
655-
std::move(operation_context));
695+
background_->cq(),
696+
stub_manager_->GetStub(InstanceNameFromTableName(table_name)),
697+
app_profile_id(*current), table_name, std::move(on_row),
698+
std::move(on_finish), std::move(row_set), rows_limit, std::move(filter),
699+
reverse, retry_policy(*current), backoff_policy(*current),
700+
enable_server_retries(*current), std::move(operation_context));
656701
}
657702

658703
void DataConnectionImpl::AsyncReadRows(
@@ -743,14 +788,15 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
743788
}
744789
auto operation_context = operation_context_factory_->PrepareQuery(
745790
instance_full_name, app_profile_id(*current));
791+
auto stub = stub_manager_->GetStub(params.instance.FullName());
746792
auto response = google::cloud::internal::RetryLoop(
747793
retry_policy(*current), backoff_policy(*current),
748794
Idempotency::kIdempotent,
749-
[this, operation_context](
795+
[stub, operation_context](
750796
grpc::ClientContext& context, Options const& options,
751797
google::bigtable::v2::PrepareQueryRequest const& request) {
752798
operation_context->PreCall(context);
753-
auto const& result = stub_->PrepareQuery(context, options, request);
799+
auto const& result = stub->PrepareQuery(context, options, request);
754800
operation_context->PostCall(context, result.status());
755801
return result;
756802
},
@@ -779,17 +825,18 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
779825
auto backoff = backoff_policy(*current);
780826
auto operation_context = operation_context_factory_->PrepareQuery(
781827
request.instance_name(), app_profile_id(*current));
828+
auto stub = stub_manager_->GetStub(request.instance_name());
782829
return google::cloud::internal::AsyncRetryLoop(
783830
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
784831
background_->cq(),
785-
[this, operation_context](
832+
[stub, operation_context](
786833
CompletionQueue& cq,
787834
std::shared_ptr<grpc::ClientContext> context,
788835
google::cloud::internal::ImmutableOptions options,
789836
google::bigtable::v2::PrepareQueryRequest const& request) {
790837
operation_context->PreCall(*context);
791-
auto f = stub_->AsyncPrepareQuery(cq, context,
792-
std::move(options), request);
838+
auto f = stub->AsyncPrepareQuery(cq, context,
839+
std::move(options), request);
793840
return f.then(
794841
[operation_context, context = std::move(context)](auto f) {
795842
auto s = f.get();
@@ -827,17 +874,19 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
827874
auto operation_context = operation_context_factory_->PrepareQuery(
828875
instance_full_name, app_profile_id(*current));
829876
auto const* func = __func__;
877+
auto instance_name = params.instance.FullName();
878+
auto stub = stub_manager_->GetStub(instance_name);
830879
return google::cloud::internal::AsyncRetryLoop(
831880
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
832881
background_->cq(),
833-
[this, operation_context](
882+
[stub, instance_name, operation_context](
834883
CompletionQueue& cq,
835884
std::shared_ptr<grpc::ClientContext> context,
836885
google::cloud::internal::ImmutableOptions options,
837886
google::bigtable::v2::PrepareQueryRequest const& request) {
838887
operation_context->PreCall(*context);
839-
auto f = stub_->AsyncPrepareQuery(cq, context,
840-
std::move(options), request);
888+
auto f = stub->AsyncPrepareQuery(cq, context, std::move(options),
889+
request);
841890
return f.then(
842891
[operation_context, context = std::move(context)](auto f) {
843892
auto s = f.get();
@@ -846,7 +895,7 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
846895
});
847896
},
848897
current, request, func)
849-
.then([this, request, operation_context, current,
898+
.then([this, instance_name, request, operation_context, current,
850899
params = std::move(params),
851900
func](future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>
852901
future) -> StatusOr<bigtable::PreparedQuery> {
@@ -869,23 +918,25 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
869918
"Column type cannot be empty", GCP_ERROR_INFO()));
870919
}
871920
}
872-
auto refresh_fn = [this, request, func]() mutable {
921+
auto refresh_fn = [this, instance_name, request, func]() mutable {
873922
auto current = google::cloud::internal::SaveCurrentOptions();
874923
auto retry = query_plan_refresh_function_retry_policy(*current);
875924
auto backoff = backoff_policy(*current);
876925
auto operation_context = operation_context_factory_->PrepareQuery(
877926
request.instance_name(), app_profile_id(*current));
927+
// Get a new stub here to take advantage of the pool.
928+
auto stub = stub_manager_->GetStub(instance_name);
878929
return google::cloud::internal::AsyncRetryLoop(
879930
std::move(retry), std::move(backoff),
880931
Idempotency::kIdempotent, background_->cq(),
881-
[this, operation_context](
932+
[stub, instance_name, operation_context](
882933
CompletionQueue& cq,
883934
std::shared_ptr<grpc::ClientContext> context,
884935
google::cloud::internal::ImmutableOptions options,
885936
google::bigtable::v2::PrepareQueryRequest const&
886937
request) {
887938
operation_context->PreCall(*context);
888-
auto f = stub_->AsyncPrepareQuery(
939+
auto f = stub->AsyncPrepareQuery(
889940
cq, context, std::move(options), request);
890941
return f.then([operation_context,
891942
context = std::move(context)](auto f) {
@@ -1052,9 +1103,9 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
10521103

10531104
auto const tracing_enabled = RpcStreamTracingEnabled();
10541105
auto const& tracing_options = RpcTracingOptions();
1055-
1106+
auto stub = stub_manager_->GetStub(params.bound_query.instance().FullName());
10561107
auto source_fn =
1057-
[stub = stub_, tracing_enabled, tracing_options](
1108+
[stub, tracing_enabled, tracing_options](
10581109
google::bigtable::v2::ExecuteQueryRequest& request,
10591110
google::bigtable::v2::ResultSetMetadata metadata,
10601111
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_prototype,

0 commit comments

Comments
 (0)