2121#include " google/cloud/bigtable/internal/stub_manager.h"
2222#include " google/cloud/bigtable/options.h"
2323#include " google/cloud/completion_queue.h"
24- #include " google/cloud/internal/clock.h"
2524#include " google/cloud/internal/random.h"
2625#include " google/cloud/version.h"
2726#include < cmath>
@@ -96,6 +95,8 @@ template <typename T>
9695class DynamicChannelPool
9796 : public std::enable_shared_from_this<DynamicChannelPool<T>> {
9897 public:
98+ // This function should only return an error status if priming is attempted
99+ // and it is unsuccessful.
99100 using StubFactoryFn =
100101 std::function<StatusOr<std::shared_ptr<ChannelUsage<T>>>(
101102 std::uint32_t id, std::string const & instance_name,
@@ -140,8 +141,7 @@ class DynamicChannelPool
140141 return channels_.empty ();
141142 }
142143
143- // If the pool is not under a pool resize cooldown, call
144- // CheckPoolChannelHealth.
144+ // Calls CheckPoolChannelHealth before picking a channel.
145145 //
146146 // Pick two random channels from channels_ and return the channel with the
147147 // lower number of outstanding_rpcs. This is the "quick" path.
@@ -156,8 +156,47 @@ class DynamicChannelPool
156156 // If there are no healthy channels in channels_, create a new channel and
157157 // use that one. Also call ScheduleAddChannels to replenish channels_.
158158 std::shared_ptr<ChannelUsage<T>> GetChannelRandomTwoLeastUsed () {
159- // Not yet implemented.
160- return {};
159+ std::scoped_lock lk (mu_);
160+ CheckPoolChannelHealth (lk);
161+
162+ ChannelSelectionData d;
163+ d.iterators .reserve (channels_.size ());
164+ for (auto iter = channels_.begin (); iter != channels_.end (); ++iter) {
165+ d.iterators .push_back (iter);
166+ }
167+ std::shuffle (d.iterators .begin (), d.iterators .end (), rng_);
168+ d.shuffle_iter = d.iterators .begin ();
169+
170+ if (d.shuffle_iter != d.iterators .end ()) {
171+ d.channel_1_iter = *d.shuffle_iter ;
172+ d.channel_1_rpcs = (*d.channel_1_iter )->instant_outstanding_rpcs ();
173+ ++d.shuffle_iter ;
174+ }
175+
176+ if (d.shuffle_iter != d.iterators .end ()) {
177+ d.channel_2_iter = *d.shuffle_iter ;
178+ d.channel_2_rpcs = (*d.channel_2_iter )->instant_outstanding_rpcs ();
179+ }
180+
181+ // This is the most common case so we try it first.
182+ if (d.channel_1_rpcs .ok () && d.channel_2_rpcs .ok ()) {
183+ return *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter
184+ : *d.channel_2_iter ;
185+ }
186+ if (d.iterators .size () == 1 && d.channel_1_rpcs .ok ()) {
187+ // Pool contains exactly 1 good channel.
188+ return *d.channel_1_iter ;
189+ }
190+ if (d.iterators .empty ()) {
191+ // Pool is empty, create a channel immediately and return it. While the
192+ // return value is a StatusOr<T>, it will only ever contain an error if
193+ // priming is attempted.
194+ channels_.push_back (stub_factory_fn_ (next_channel_id_++, instance_name_,
195+ StubManager::Priming::kNoPriming )
196+ .value ());
197+ return channels_.front ();
198+ }
199+ return HandleBadChannels (lk, d);
161200 }
162201
163202 private:
@@ -180,6 +219,76 @@ class DynamicChannelPool
180219 SetSizeDecreaseCooldownTimer (lk);
181220 }
182221
222+ struct ChannelSelectionData {
223+ using ChannelSelect =
224+ typename std::vector<std::shared_ptr<ChannelUsage<T>>>::iterator;
225+ std::vector<ChannelSelect> iterators;
226+ ChannelSelect channel_1_iter;
227+ ChannelSelect channel_2_iter;
228+ StatusOr<int > channel_1_rpcs = Status{StatusCode::kNotFound , " " };
229+ StatusOr<int > channel_2_rpcs = Status{StatusCode::kNotFound , " " };
230+ typename std::vector<ChannelSelect>::iterator shuffle_iter;
231+
232+ static void FindGoodChannel (
233+ std::vector<ChannelSelect>& iterators, ChannelSelect& iter,
234+ StatusOr<int >& rpcs,
235+ typename std::vector<ChannelSelect>::iterator& shuffle_iter,
236+ std::vector<ChannelSelect>& bad_channel_iters) {
237+ if (!rpcs.ok ()) {
238+ bad_channel_iters.push_back (iter);
239+ while (shuffle_iter != iterators.end () && !rpcs.ok ()) {
240+ iter = *shuffle_iter;
241+ rpcs = (*iter)->instant_outstanding_rpcs ();
242+ if (!rpcs.ok ()) bad_channel_iters.push_back (iter);
243+ ++shuffle_iter;
244+ }
245+ }
246+ }
247+ };
248+
249+ // We have one or more bad channels. Spending time finding a good channel
250+ // will be cheaper than trying to use a bad channel in the long run.
251+ std::shared_ptr<ChannelUsage<T>> HandleBadChannels (
252+ std::scoped_lock<std::mutex> const & lk, ChannelSelectionData& d) {
253+ std::vector<typename ChannelSelectionData::ChannelSelect> bad_channel_iters;
254+ if (d.shuffle_iter != d.iterators .end ()) ++d.shuffle_iter ;
255+ ChannelSelectionData::FindGoodChannel (d.iterators , d.channel_1_iter ,
256+ d.channel_1_rpcs , d.shuffle_iter ,
257+ bad_channel_iters);
258+ ChannelSelectionData::FindGoodChannel (d.iterators , d.channel_2_iter ,
259+ d.channel_2_rpcs , d.shuffle_iter ,
260+ bad_channel_iters);
261+
262+ std::shared_ptr<ChannelUsage<T>> channel;
263+ if (d.channel_1_rpcs .ok () || d.channel_2_rpcs .ok ()) {
264+ if (d.channel_1_rpcs .ok () && d.channel_2_rpcs .ok ()) {
265+ channel = *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter
266+ : *d.channel_2_iter ;
267+ } else if (d.channel_1_rpcs .ok ()) {
268+ channel = *d.channel_1_iter ;
269+ } else if (d.channel_2_rpcs .ok ()) {
270+ channel = *d.channel_2_iter ;
271+ }
272+ // Wait until we no longer need valid iterators to call EvictBadChannels.
273+ EvictBadChannels (lk, bad_channel_iters);
274+ } else {
275+ // Call EvictBadChannels before we channels_.push_back to avoid
276+ // invalidating bad_channel_iters if there is a realloc of the vector.
277+ EvictBadChannels (lk, bad_channel_iters);
278+ // We have no usable channels in the entire pool; this is bad.
279+ // Create a channel immediately to unblock application. While the
280+ // return value is a StatusOr<T>, it will only ever contain an error if
281+ // priming is attempted.
282+ channels_.push_back (stub_factory_fn_ (next_channel_id_++, instance_name_,
283+ StubManager::Priming::kNoPriming )
284+ .value ());
285+ std::swap (channels_.front (), channels_.back ());
286+ channel = channels_.front ();
287+ }
288+ ScheduleRemoveChannels (lk);
289+ return channel;
290+ }
291+
183292 struct ChannelAddVisitor {
184293 std::size_t pool_size;
185294 explicit ChannelAddVisitor (std::size_t pool_size) : pool_size(pool_size) {}
@@ -297,22 +406,77 @@ class DynamicChannelPool
297406 void EvictBadChannels (
298407 std::scoped_lock<std::mutex> const &,
299408 std::vector<
300- typename std::vector<std::shared_ptr<ChannelUsage<T>>>::iterator>&) {
301- // Not yet implemented.
409+ typename std::vector<std::shared_ptr<ChannelUsage<T>>>::iterator>&
410+ bad_channel_iters) {
411+ auto back_iter = channels_.rbegin ();
412+ for (auto & bad_channel_iter : bad_channel_iters) {
413+ bool swapped = false ;
414+ while (!swapped && back_iter != channels_.rend ()) {
415+ auto b = (*back_iter)->instant_outstanding_rpcs ();
416+ if (b.ok ()) {
417+ std::swap (*back_iter, *bad_channel_iter);
418+ draining_channels_.push_back (std::move (*back_iter));
419+ swapped = true ;
420+ }
421+ ++back_iter;
422+ }
423+ }
424+ for (std::size_t i = 0 ; i < bad_channel_iters.size (); ++i) {
425+ channels_.pop_back ();
426+ }
302427 }
303428
304429 void SetSizeDecreaseCooldownTimer (std::scoped_lock<std::mutex> const &) {
305430 pool_size_decrease_cooldown_timer_ = cq_.MakeRelativeTimer (
306431 sizing_policy_.pool_size_decrease_cooldown_interval );
307432 }
308433
309- // Computes the average_rpcs_pre_channel across all channels in the pool,
310- // excluding any channels that are awaiting removal in draining_channels_.
434+ // Computes the average RPCs per channel across all channels in the pool,
435+ // by summing the outstanding_rpc from each channel and dividing by the
436+ // number of active channels plus the num_pending_channels_.
437+ // Any channels that are awaiting removal in draining_channels_ are excluded
438+ // from this calculation.
311439 // The computed average is compared to the thresholds in the sizing policy
312- // and calls either ScheduleRemoveChannels or ScheduleAddChannels as
313- // appropriate. If either is called the resize_cooldown_timer is also set.
314- void CheckPoolChannelHealth (std::scoped_lock<std::mutex> const &) {
315- // Not yet implemented
440+ // and calls either ScheduleRemoveChannel or ScheduleAddChannel as
441+ // appropriate. If ScheduleRemoveChannel is called the resize_cooldown_timer
442+ // is also set.
443+ void CheckPoolChannelHealth (std::scoped_lock<std::mutex> const & lk) {
444+ int average_rpcs_per_channel =
445+ channels_.empty ()
446+ ? 0
447+ : std::accumulate (channels_.begin (), channels_.end (), 0 ,
448+ [](int a, auto const & b) {
449+ auto rpcs_b = b->instant_outstanding_rpcs ();
450+ return a + (rpcs_b.ok () ? *rpcs_b : 0 );
451+ }) /
452+ static_cast <int >(channels_.size () + num_pending_channels_);
453+ if (channels_.size () < sizing_policy_.minimum_channel_pool_size ||
454+ (average_rpcs_per_channel >
455+ sizing_policy_.maximum_average_outstanding_rpcs_per_channel &&
456+ channels_.size () < sizing_policy_.maximum_channel_pool_size )) {
457+ // Channel/stub creation is expensive, instead of making the current RPC
458+ // wait on this, use an existing channel right now, and schedule a channel
459+ // to be added.
460+ ScheduleAddChannels (lk);
461+ return ;
462+ }
463+
464+ if ((!pool_size_decrease_cooldown_timer_.valid () ||
465+ pool_size_decrease_cooldown_timer_.is_ready ()) &&
466+ average_rpcs_per_channel <
467+ sizing_policy_.minimum_average_outstanding_rpcs_per_channel &&
468+ channels_.size () > sizing_policy_.minimum_channel_pool_size ) {
469+ if (pool_size_decrease_cooldown_timer_.is_ready ()) {
470+ pool_size_decrease_cooldown_timer_.get ();
471+ }
472+ auto random_channel = std::uniform_int_distribution<std::size_t >(
473+ 0 , channels_.size () - 1 )(rng_);
474+ std::swap (channels_[random_channel], channels_.back ());
475+ draining_channels_.push_back (std::move (channels_.back ()));
476+ channels_.pop_back ();
477+ ScheduleRemoveChannels (lk);
478+ SetSizeDecreaseCooldownTimer (lk);
479+ }
316480 }
317481
318482 mutable std::mutex mu_;
0 commit comments