Skip to content

Commit 8989a78

Browse files
authored
impl(bigtable): add ChannelUsage class (#16031)
1 parent 6b6c478 commit 8989a78

File tree

5 files changed

+291
-0
lines changed

5 files changed

+291
-0
lines changed

google/cloud/bigtable/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ add_library(
165165
internal/bigtable_tracing_stub.h
166166
internal/bulk_mutator.cc
167167
internal/bulk_mutator.h
168+
internal/channel_usage.h
168169
internal/client_options_defaults.h
169170
internal/common_client.h
170171
internal/connection_refresh_state.cc
@@ -444,6 +445,7 @@ if (BUILD_TESTING)
444445
internal/bigtable_channel_refresh_test.cc
445446
internal/bigtable_stub_factory_test.cc
446447
internal/bulk_mutator_test.cc
448+
internal/channel_usage_test.cc
447449
internal/connection_refresh_state_test.cc
448450
internal/convert_policies_test.cc
449451
internal/crc32c_test.cc

google/cloud/bigtable/bigtable_client_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ bigtable_client_unit_tests = [
4444
"internal/bigtable_channel_refresh_test.cc",
4545
"internal/bigtable_stub_factory_test.cc",
4646
"internal/bulk_mutator_test.cc",
47+
"internal/channel_usage_test.cc",
4748
"internal/connection_refresh_state_test.cc",
4849
"internal/convert_policies_test.cc",
4950
"internal/crc32c_test.cc",

google/cloud/bigtable/google_cloud_cpp_bigtable.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ google_cloud_cpp_bigtable_hdrs = [
7979
"internal/bigtable_stub_factory.h",
8080
"internal/bigtable_tracing_stub.h",
8181
"internal/bulk_mutator.h",
82+
"internal/channel_usage.h",
8283
"internal/client_options_defaults.h",
8384
"internal/common_client.h",
8485
"internal/connection_refresh_state.h",
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CHANNEL_USAGE_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CHANNEL_USAGE_H
17+
18+
#include "google/cloud/internal/clock.h"
19+
#include "google/cloud/status_or.h"
20+
#include "google/cloud/version.h"
21+
#include <chrono>
22+
#include <deque>
23+
#include <memory>
24+
#include <mutex>
25+
#include <utility>
26+
27+
namespace google {
28+
namespace cloud {
29+
namespace bigtable_internal {
30+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
31+
32+
// This class wraps a `T`, typically a BigtableStub, and tracks the number of
33+
// outstanding RPCs by taking measurements when the wrapped stub is acquired
34+
// and released.
35+
template <typename T>
36+
class ChannelUsage : public std::enable_shared_from_this<ChannelUsage<T>> {
37+
public:
38+
using Clock = ::google::cloud::internal::SteadyClock;
39+
ChannelUsage() = default;
40+
explicit ChannelUsage(std::shared_ptr<T> stub, std::shared_ptr<Clock> clock =
41+
std::make_shared<Clock>())
42+
: stub_(std::move(stub)), clock_(std::move(clock)) {}
43+
44+
// Computes the weighted average of outstanding RPCs on the channel over the
45+
// past 60 seconds.
46+
StatusOr<int> average_outstanding_rpcs() {
47+
auto constexpr kWindowSeconds = 60;
48+
auto constexpr kWindowDuration = std::chrono::seconds(kWindowSeconds);
49+
std::scoped_lock lk(mu_);
50+
if (!last_refresh_status_.ok()) return last_refresh_status_;
51+
// If there are no measurements then the stub has never been used.
52+
if (measurements_.empty()) return 0;
53+
auto now = clock_->Now();
54+
auto last_time = now;
55+
auto window_start = now - kWindowDuration;
56+
57+
double sum = 0.0;
58+
double total_weight = 0.0;
59+
auto iter = measurements_.rbegin();
60+
while (iter != measurements_.rend() && iter->timestamp >= window_start) {
61+
double weight =
62+
std::chrono::duration<double>(last_time - iter->timestamp).count();
63+
last_time = iter->timestamp;
64+
sum += iter->outstanding_rpcs * weight;
65+
total_weight += weight;
66+
++iter;
67+
}
68+
69+
// It's unlikely we will have a measurement at precisely the beginning of
70+
// the window. So, we need to use the first measurement outside the window
71+
// to compute a measurement for the missing part of the window using a
72+
// weight equal to the missing time.
73+
if (iter != measurements_.rend()) {
74+
double weight = std::max(0.0, kWindowSeconds - total_weight);
75+
sum += iter->outstanding_rpcs * weight;
76+
total_weight += weight;
77+
// We want to keep one measurement that's at least 60s old to provide a
78+
// starting value for the next window.
79+
++iter;
80+
}
81+
82+
if (measurements_.size() > 1) {
83+
measurements_.erase(measurements_.begin(), iter.base());
84+
}
85+
// After iterating through the measurements if the total_weight is zero,
86+
// then all of the measurements occurred at time == now, and returning the
87+
// current number of outstanding RPCs is most correct.
88+
return total_weight == 0.0 ? outstanding_rpcs_
89+
: static_cast<int>(sum / total_weight);
90+
}
91+
92+
StatusOr<int> instant_outstanding_rpcs() {
93+
std::scoped_lock lk(mu_);
94+
if (!last_refresh_status_.ok()) return last_refresh_status_;
95+
return outstanding_rpcs_;
96+
}
97+
98+
ChannelUsage& set_last_refresh_status(Status s) {
99+
std::scoped_lock lk(mu_);
100+
last_refresh_status_ = std::move(s);
101+
return *this;
102+
}
103+
104+
// A channel can only be set if the current value is nullptr. This mutator
105+
// exists only so that we can obtain a std::weak_ptr to the ChannelUsage
106+
// object that will eventually hold the channel.
107+
ChannelUsage& set_stub(std::shared_ptr<T> stub) {
108+
std::scoped_lock lk(mu_);
109+
if (!stub_) stub_ = std::move(stub);
110+
return *this;
111+
}
112+
113+
std::weak_ptr<ChannelUsage<T>> MakeWeak() { return this->shared_from_this(); }
114+
115+
std::shared_ptr<T> AcquireStub() {
116+
std::scoped_lock lk(mu_);
117+
++outstanding_rpcs_;
118+
auto time = clock_->Now();
119+
measurements_.emplace_back(outstanding_rpcs_, time);
120+
return stub_;
121+
}
122+
123+
void ReleaseStub() {
124+
std::scoped_lock lk(mu_);
125+
--outstanding_rpcs_;
126+
measurements_.emplace_back(outstanding_rpcs_, clock_->Now());
127+
}
128+
129+
private:
130+
mutable std::mutex mu_;
131+
std::shared_ptr<T> stub_;
132+
std::shared_ptr<Clock> clock_ = std::make_shared<Clock>();
133+
int outstanding_rpcs_ = 0;
134+
Status last_refresh_status_;
135+
struct Measurement {
136+
Measurement(int outstanding_rpcs, std::chrono::steady_clock::time_point p)
137+
: outstanding_rpcs(outstanding_rpcs), timestamp(p) {}
138+
int outstanding_rpcs;
139+
std::chrono::steady_clock::time_point timestamp;
140+
};
141+
// Older measurements are removed as part of the average_outstanding_rpcs
142+
// method.
143+
std::deque<Measurement> measurements_;
144+
};
145+
146+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
147+
} // namespace bigtable_internal
148+
} // namespace cloud
149+
} // namespace google
150+
151+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_CHANNEL_USAGE_H
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/bigtable/internal/channel_usage.h"
16+
#include "google/cloud/bigtable/testing/mock_bigtable_stub.h"
17+
#include "google/cloud/internal/make_status.h"
18+
#include "google/cloud/testing_util/fake_clock.h"
19+
#include "google/cloud/testing_util/status_matchers.h"
20+
#include <gmock/gmock.h>
21+
22+
namespace google {
23+
namespace cloud {
24+
namespace bigtable_internal {
25+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
26+
namespace {
27+
28+
using ::google::cloud::bigtable::testing::MockBigtableStub;
29+
using ::google::cloud::testing_util::IsOkAndHolds;
30+
using ::google::cloud::testing_util::StatusIs;
31+
using ::testing::Eq;
32+
33+
TEST(ChannelUsageTest, SetChannel) {
34+
auto mock = std::make_shared<MockBigtableStub>();
35+
auto channel = std::make_shared<ChannelUsage<BigtableStub>>();
36+
EXPECT_THAT(channel->AcquireStub(), Eq(nullptr));
37+
channel->set_stub(mock);
38+
EXPECT_THAT(channel->AcquireStub(), Eq(mock));
39+
auto mock2 = std::make_shared<MockBigtableStub>();
40+
channel->set_stub(mock2);
41+
EXPECT_THAT(channel->AcquireStub(), Eq(mock));
42+
}
43+
44+
TEST(ChannelUsageTest, InstantOutstandingRpcs) {
45+
auto mock = std::make_shared<MockBigtableStub>();
46+
auto channel = std::make_shared<ChannelUsage<BigtableStub>>(mock);
47+
48+
auto stub = channel->AcquireStub();
49+
EXPECT_THAT(stub, Eq(mock));
50+
EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(1));
51+
stub = channel->AcquireStub();
52+
EXPECT_THAT(stub, Eq(mock));
53+
stub = channel->AcquireStub();
54+
EXPECT_THAT(stub, Eq(mock));
55+
EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(3));
56+
channel->ReleaseStub();
57+
EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(2));
58+
channel->ReleaseStub();
59+
channel->ReleaseStub();
60+
EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(0));
61+
}
62+
63+
TEST(ChannelUsageTest, SetLastRefreshStatus) {
64+
auto mock = std::make_shared<MockBigtableStub>();
65+
auto channel = std::make_shared<ChannelUsage<BigtableStub>>(mock);
66+
Status expected_status = internal::InternalError("uh oh");
67+
(void)channel->AcquireStub();
68+
EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(1));
69+
channel->set_last_refresh_status(expected_status);
70+
EXPECT_THAT(channel->instant_outstanding_rpcs(),
71+
StatusIs(expected_status.code()));
72+
EXPECT_THAT(channel->average_outstanding_rpcs(),
73+
StatusIs(expected_status.code()));
74+
}
75+
76+
TEST(ChannelUsageTest, AverageOutstandingRpcs) {
77+
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
78+
auto mock = std::make_shared<MockBigtableStub>();
79+
auto channel = std::make_shared<ChannelUsage<BigtableStub>>(mock, clock);
80+
EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(0));
81+
82+
auto start = std::chrono::steady_clock::now();
83+
clock->SetTime(start);
84+
85+
for (int i = 0; i < 10; ++i) (void)channel->AcquireStub();
86+
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10));
87+
88+
clock->AdvanceTime(std::chrono::seconds(1));
89+
// sum=10 total_weight=1
90+
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10));
91+
92+
for (int i = 0; i < 10; ++i) (void)channel->AcquireStub();
93+
clock->AdvanceTime(std::chrono::seconds(1));
94+
// sum=30, total_weight=2
95+
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(15));
96+
97+
for (int i = 0; i < 20; ++i) channel->ReleaseStub();
98+
clock->AdvanceTime(std::chrono::seconds(1));
99+
// sum=30, total_weight=3
100+
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10));
101+
102+
clock->AdvanceTime(std::chrono::seconds(2));
103+
// sum=30, total_weight=5
104+
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(6));
105+
106+
for (int i = 0; i < 100; ++i) (void)channel->AcquireStub();
107+
clock->AdvanceTime(std::chrono::seconds(25));
108+
// sum=2530, total_weight=84
109+
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(84));
110+
111+
clock->AdvanceTime(std::chrono::seconds(35));
112+
// First 5s of measurements have aged out.
113+
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100));
114+
115+
clock->AdvanceTime(std::chrono::seconds(60));
116+
// All measurements have aged out.
117+
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100));
118+
119+
clock->AdvanceTime(std::chrono::seconds(3600));
120+
// All measurements have aged out.
121+
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100));
122+
}
123+
124+
TEST(ChannelUsageTest, MakeWeak) {
125+
auto channel = std::make_shared<ChannelUsage<BigtableStub>>();
126+
auto weak = channel->MakeWeak();
127+
EXPECT_THAT(weak.lock(), Eq(channel));
128+
channel.reset();
129+
EXPECT_THAT(weak.lock(), Eq(nullptr));
130+
}
131+
132+
} // namespace
133+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
134+
} // namespace bigtable_internal
135+
} // namespace cloud
136+
} // namespace google

0 commit comments

Comments
 (0)