Skip to content

Commit 1841554

Browse files
authored
impl(bigtable): add RandomTwoLeastUsed stub decorator (#16049)
1 parent 98cabd9 commit 1841554

File tree

6 files changed

+707
-0
lines changed

6 files changed

+707
-0
lines changed

google/cloud/bigtable/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ add_library(
155155
internal/bigtable_logging_decorator.h
156156
internal/bigtable_metadata_decorator.cc
157157
internal/bigtable_metadata_decorator.h
158+
internal/bigtable_random_two_least_used_decorator.cc
159+
internal/bigtable_random_two_least_used_decorator.h
158160
internal/bigtable_round_robin_decorator.cc
159161
internal/bigtable_round_robin_decorator.h
160162
internal/bigtable_stub.cc
@@ -444,6 +446,7 @@ if (BUILD_TESTING)
444446
internal/async_row_sampler_test.cc
445447
internal/async_streaming_read_test.cc
446448
internal/bigtable_channel_refresh_test.cc
449+
internal/bigtable_random_two_least_used_decorator_test.cc
447450
internal/bigtable_stub_factory_test.cc
448451
internal/bulk_mutator_test.cc
449452
internal/channel_usage_test.cc

google/cloud/bigtable/bigtable_client_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ bigtable_client_unit_tests = [
4242
"internal/async_row_sampler_test.cc",
4343
"internal/async_streaming_read_test.cc",
4444
"internal/bigtable_channel_refresh_test.cc",
45+
"internal/bigtable_random_two_least_used_decorator_test.cc",
4546
"internal/bigtable_stub_factory_test.cc",
4647
"internal/bulk_mutator_test.cc",
4748
"internal/channel_usage_test.cc",

google/cloud/bigtable/google_cloud_cpp_bigtable.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ google_cloud_cpp_bigtable_hdrs = [
7474
"internal/bigtable_channel_refresh.h",
7575
"internal/bigtable_logging_decorator.h",
7676
"internal/bigtable_metadata_decorator.h",
77+
"internal/bigtable_random_two_least_used_decorator.h",
7778
"internal/bigtable_round_robin_decorator.h",
7879
"internal/bigtable_stub.h",
7980
"internal/bigtable_stub_factory.h",
@@ -190,6 +191,7 @@ google_cloud_cpp_bigtable_srcs = [
190191
"internal/bigtable_channel_refresh.cc",
191192
"internal/bigtable_logging_decorator.cc",
192193
"internal/bigtable_metadata_decorator.cc",
194+
"internal/bigtable_random_two_least_used_decorator.cc",
193195
"internal/bigtable_round_robin_decorator.cc",
194196
"internal/bigtable_stub.cc",
195197
"internal/bigtable_stub_factory.cc",
Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h"
16+
#include "google/cloud/internal/async_streaming_read_rpc.h"
17+
#include "google/cloud/internal/streaming_read_rpc.h"
18+
#include <functional>
19+
#include <memory>
20+
#include <optional>
21+
#include <utility>
22+
23+
namespace google {
24+
namespace cloud {
25+
namespace bigtable_internal {
26+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
27+
namespace {
28+
29+
template <typename T>
30+
class StreamingReadRpcTracking
31+
: public google::cloud::internal::StreamingReadRpc<T> {
32+
public:
33+
StreamingReadRpcTracking(
34+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<T>> child,
35+
std::function<void(void)> on_destruction)
36+
: child_(std::move(child)), on_destruction_(std::move(on_destruction)) {}
37+
38+
~StreamingReadRpcTracking() override { on_destruction_(); }
39+
40+
void Cancel() override { child_->Cancel(); }
41+
std::optional<Status> Read(T* response) override {
42+
return child_->Read(response);
43+
}
44+
RpcMetadata GetRequestMetadata() const override {
45+
return child_->GetRequestMetadata();
46+
}
47+
48+
private:
49+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<T>> child_;
50+
std::function<void(void)> on_destruction_;
51+
};
52+
53+
template <typename T>
54+
class AsyncStreamingReadRpcTracking
55+
: public google::cloud::internal::AsyncStreamingReadRpc<T> {
56+
public:
57+
AsyncStreamingReadRpcTracking(
58+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<T>> child,
59+
std::function<void(void)> on_destruction)
60+
: child_(std::move(child)), on_destruction_(std::move(on_destruction)) {}
61+
62+
~AsyncStreamingReadRpcTracking() override { on_destruction_(); }
63+
64+
void Cancel() override { child_->Cancel(); }
65+
future<bool> Start() override { return child_->Start(); }
66+
future<std::optional<T>> Read() override { return child_->Read(); }
67+
future<Status> Finish() override { return child_->Finish(); }
68+
RpcMetadata GetRequestMetadata() const override {
69+
return child_->GetRequestMetadata();
70+
}
71+
72+
private:
73+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<T>> child_;
74+
std::function<void(void)> on_destruction_;
75+
};
76+
77+
template <typename Response>
78+
Response UnaryHelper(std::shared_ptr<DynamicChannelPool<BigtableStub>>& pool,
79+
std::function<Response(BigtableStub&)> fn) {
80+
auto child = pool->GetChannelRandomTwoLeastUsed();
81+
auto stub = child->AcquireStub();
82+
auto result = fn(*stub);
83+
child->ReleaseStub();
84+
return result;
85+
}
86+
87+
template <typename Response>
88+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<Response>>
89+
StreamingHelper(
90+
std::shared_ptr<DynamicChannelPool<BigtableStub>>& pool,
91+
std::function<std::unique_ptr<
92+
google::cloud::internal::StreamingReadRpc<Response>>(BigtableStub&)>
93+
fn) {
94+
auto child = pool->GetChannelRandomTwoLeastUsed();
95+
auto stub = child->AcquireStub();
96+
auto result = fn(*stub);
97+
auto release_fn = [weak = child->MakeWeak()] {
98+
auto child = weak.lock();
99+
if (child) child->ReleaseStub();
100+
};
101+
return std::make_unique<StreamingReadRpcTracking<Response>>(
102+
std::move(result), std::move(release_fn));
103+
}
104+
105+
template <typename Response>
106+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<Response>>
107+
AsyncStreamingHelper(
108+
std::shared_ptr<DynamicChannelPool<BigtableStub>>& pool,
109+
std::function<std::unique_ptr<
110+
google::cloud::internal::AsyncStreamingReadRpc<Response>>(
111+
BigtableStub&)>
112+
fn) {
113+
auto child = pool->GetChannelRandomTwoLeastUsed();
114+
auto stub = child->AcquireStub();
115+
auto result = fn(*stub);
116+
auto release_fn = [weak = child->MakeWeak()] {
117+
auto child = weak.lock();
118+
if (child) child->ReleaseStub();
119+
};
120+
return std::make_unique<AsyncStreamingReadRpcTracking<Response>>(
121+
std::move(result), std::move(release_fn));
122+
}
123+
124+
} // namespace
125+
126+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
127+
google::bigtable::v2::ReadRowsResponse>>
128+
BigtableRandomTwoLeastUsed::ReadRows(
129+
std::shared_ptr<grpc::ClientContext> context, Options const& options,
130+
google::bigtable::v2::ReadRowsRequest const& request) {
131+
return StreamingHelper<google::bigtable::v2::ReadRowsResponse>(
132+
pool_, [&, context = std::move(context)](BigtableStub& stub) mutable {
133+
return stub.ReadRows(std::move(context), options, request);
134+
});
135+
}
136+
137+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
138+
google::bigtable::v2::SampleRowKeysResponse>>
139+
BigtableRandomTwoLeastUsed::SampleRowKeys(
140+
std::shared_ptr<grpc::ClientContext> context, Options const& options,
141+
google::bigtable::v2::SampleRowKeysRequest const& request) {
142+
return StreamingHelper<google::bigtable::v2::SampleRowKeysResponse>(
143+
pool_, [&, context = std::move(context)](BigtableStub& stub) mutable {
144+
return stub.SampleRowKeys(std::move(context), options, request);
145+
});
146+
}
147+
148+
StatusOr<google::bigtable::v2::MutateRowResponse>
149+
BigtableRandomTwoLeastUsed::MutateRow(
150+
grpc::ClientContext& context, Options const& options,
151+
google::bigtable::v2::MutateRowRequest const& request) {
152+
return UnaryHelper<StatusOr<google::bigtable::v2::MutateRowResponse>>(
153+
pool_, [&](BigtableStub& stub) {
154+
return stub.MutateRow(context, options, request);
155+
});
156+
}
157+
158+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
159+
google::bigtable::v2::MutateRowsResponse>>
160+
BigtableRandomTwoLeastUsed::MutateRows(
161+
std::shared_ptr<grpc::ClientContext> context, Options const& options,
162+
google::bigtable::v2::MutateRowsRequest const& request) {
163+
return StreamingHelper<google::bigtable::v2::MutateRowsResponse>(
164+
pool_, [&, context = std::move(context)](BigtableStub& stub) mutable {
165+
return stub.MutateRows(std::move(context), options, request);
166+
});
167+
}
168+
169+
StatusOr<google::bigtable::v2::CheckAndMutateRowResponse>
170+
BigtableRandomTwoLeastUsed::CheckAndMutateRow(
171+
grpc::ClientContext& context, Options const& options,
172+
google::bigtable::v2::CheckAndMutateRowRequest const& request) {
173+
return UnaryHelper<StatusOr<google::bigtable::v2::CheckAndMutateRowResponse>>(
174+
pool_, [&](BigtableStub& stub) {
175+
return stub.CheckAndMutateRow(context, options, request);
176+
});
177+
}
178+
179+
StatusOr<google::bigtable::v2::PingAndWarmResponse>
180+
BigtableRandomTwoLeastUsed::PingAndWarm(
181+
grpc::ClientContext& context, Options const& options,
182+
google::bigtable::v2::PingAndWarmRequest const& request) {
183+
return UnaryHelper<StatusOr<google::bigtable::v2::PingAndWarmResponse>>(
184+
pool_, [&](BigtableStub& stub) {
185+
return stub.PingAndWarm(context, options, request);
186+
});
187+
}
188+
189+
StatusOr<google::bigtable::v2::ReadModifyWriteRowResponse>
190+
BigtableRandomTwoLeastUsed::ReadModifyWriteRow(
191+
grpc::ClientContext& context, Options const& options,
192+
google::bigtable::v2::ReadModifyWriteRowRequest const& request) {
193+
return UnaryHelper<
194+
StatusOr<google::bigtable::v2::ReadModifyWriteRowResponse>>(
195+
pool_, [&](BigtableStub& stub) {
196+
return stub.ReadModifyWriteRow(context, options, request);
197+
});
198+
}
199+
200+
StatusOr<google::bigtable::v2::PrepareQueryResponse>
201+
BigtableRandomTwoLeastUsed::PrepareQuery(
202+
grpc::ClientContext& context, Options const& options,
203+
google::bigtable::v2::PrepareQueryRequest const& request) {
204+
return UnaryHelper<StatusOr<google::bigtable::v2::PrepareQueryResponse>>(
205+
pool_, [&](BigtableStub& stub) {
206+
return stub.PrepareQuery(context, options, request);
207+
});
208+
}
209+
210+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
211+
google::bigtable::v2::ExecuteQueryResponse>>
212+
BigtableRandomTwoLeastUsed::ExecuteQuery(
213+
std::shared_ptr<grpc::ClientContext> context, Options const& options,
214+
google::bigtable::v2::ExecuteQueryRequest const& request) {
215+
return StreamingHelper<google::bigtable::v2::ExecuteQueryResponse>(
216+
pool_, [&, context = std::move(context)](BigtableStub& stub) mutable {
217+
return stub.ExecuteQuery(std::move(context), options, request);
218+
});
219+
}
220+
221+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<
222+
google::bigtable::v2::ReadRowsResponse>>
223+
BigtableRandomTwoLeastUsed::AsyncReadRows(
224+
google::cloud::CompletionQueue const& cq,
225+
std::shared_ptr<grpc::ClientContext> context,
226+
google::cloud::internal::ImmutableOptions options,
227+
google::bigtable::v2::ReadRowsRequest const& request) {
228+
return AsyncStreamingHelper<google::bigtable::v2::ReadRowsResponse>(
229+
pool_, [&, context = std::move(context),
230+
options = std::move(options)](BigtableStub& stub) mutable {
231+
return stub.AsyncReadRows(cq, std::move(context), std::move(options),
232+
request);
233+
});
234+
}
235+
236+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<
237+
google::bigtable::v2::SampleRowKeysResponse>>
238+
BigtableRandomTwoLeastUsed::AsyncSampleRowKeys(
239+
google::cloud::CompletionQueue const& cq,
240+
std::shared_ptr<grpc::ClientContext> context,
241+
google::cloud::internal::ImmutableOptions options,
242+
google::bigtable::v2::SampleRowKeysRequest const& request) {
243+
return AsyncStreamingHelper<google::bigtable::v2::SampleRowKeysResponse>(
244+
pool_, [&, context = std::move(context),
245+
options = std::move(options)](BigtableStub& stub) mutable {
246+
return stub.AsyncSampleRowKeys(cq, std::move(context),
247+
std::move(options), request);
248+
});
249+
}
250+
251+
future<StatusOr<google::bigtable::v2::MutateRowResponse>>
252+
BigtableRandomTwoLeastUsed::AsyncMutateRow(
253+
google::cloud::CompletionQueue& cq,
254+
std::shared_ptr<grpc::ClientContext> context,
255+
google::cloud::internal::ImmutableOptions options,
256+
google::bigtable::v2::MutateRowRequest const& request) {
257+
return UnaryHelper<future<StatusOr<google::bigtable::v2::MutateRowResponse>>>(
258+
pool_, [&, context = std::move(context),
259+
options = std::move(options)](BigtableStub& stub) mutable {
260+
return stub.AsyncMutateRow(cq, std::move(context), std::move(options),
261+
request);
262+
});
263+
}
264+
265+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<
266+
google::bigtable::v2::MutateRowsResponse>>
267+
BigtableRandomTwoLeastUsed::AsyncMutateRows(
268+
google::cloud::CompletionQueue const& cq,
269+
std::shared_ptr<grpc::ClientContext> context,
270+
google::cloud::internal::ImmutableOptions options,
271+
google::bigtable::v2::MutateRowsRequest const& request) {
272+
return AsyncStreamingHelper<google::bigtable::v2::MutateRowsResponse>(
273+
pool_, [&, context = std::move(context),
274+
options = std::move(options)](BigtableStub& stub) mutable {
275+
return stub.AsyncMutateRows(cq, std::move(context), std::move(options),
276+
request);
277+
});
278+
}
279+
280+
future<StatusOr<google::bigtable::v2::CheckAndMutateRowResponse>>
281+
BigtableRandomTwoLeastUsed::AsyncCheckAndMutateRow(
282+
google::cloud::CompletionQueue& cq,
283+
std::shared_ptr<grpc::ClientContext> context,
284+
google::cloud::internal::ImmutableOptions options,
285+
google::bigtable::v2::CheckAndMutateRowRequest const& request) {
286+
return UnaryHelper<
287+
future<StatusOr<google::bigtable::v2::CheckAndMutateRowResponse>>>(
288+
pool_, [&, context = std::move(context),
289+
options = std::move(options)](BigtableStub& stub) mutable {
290+
return stub.AsyncCheckAndMutateRow(cq, std::move(context),
291+
std::move(options), request);
292+
});
293+
}
294+
295+
future<StatusOr<google::bigtable::v2::PingAndWarmResponse>>
296+
BigtableRandomTwoLeastUsed::AsyncPingAndWarm(
297+
google::cloud::CompletionQueue& cq,
298+
std::shared_ptr<grpc::ClientContext> context,
299+
google::cloud::internal::ImmutableOptions options,
300+
google::bigtable::v2::PingAndWarmRequest const& request) {
301+
return UnaryHelper<
302+
future<StatusOr<google::bigtable::v2::PingAndWarmResponse>>>(
303+
pool_, [&, context = std::move(context),
304+
options = std::move(options)](BigtableStub& stub) mutable {
305+
return stub.AsyncPingAndWarm(cq, std::move(context), std::move(options),
306+
request);
307+
});
308+
}
309+
310+
future<StatusOr<google::bigtable::v2::ReadModifyWriteRowResponse>>
311+
BigtableRandomTwoLeastUsed::AsyncReadModifyWriteRow(
312+
google::cloud::CompletionQueue& cq,
313+
std::shared_ptr<grpc::ClientContext> context,
314+
google::cloud::internal::ImmutableOptions options,
315+
google::bigtable::v2::ReadModifyWriteRowRequest const& request) {
316+
return UnaryHelper<
317+
future<StatusOr<google::bigtable::v2::ReadModifyWriteRowResponse>>>(
318+
pool_, [&, context = std::move(context),
319+
options = std::move(options)](BigtableStub& stub) mutable {
320+
return stub.AsyncReadModifyWriteRow(cq, std::move(context),
321+
std::move(options), request);
322+
});
323+
}
324+
325+
future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>
326+
BigtableRandomTwoLeastUsed::AsyncPrepareQuery(
327+
google::cloud::CompletionQueue& cq,
328+
std::shared_ptr<grpc::ClientContext> context,
329+
google::cloud::internal::ImmutableOptions options,
330+
google::bigtable::v2::PrepareQueryRequest const& request) {
331+
return UnaryHelper<
332+
future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>>(
333+
pool_, [&, context = std::move(context),
334+
options = std::move(options)](BigtableStub& stub) mutable {
335+
return stub.AsyncPrepareQuery(cq, std::move(context),
336+
std::move(options), request);
337+
});
338+
}
339+
340+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
341+
} // namespace bigtable_internal
342+
} // namespace cloud
343+
} // namespace google

0 commit comments

Comments
 (0)