Skip to content

Commit 97fccab

Browse files
feat: Add channel pooling for gapic clients (#969)
1 parent ed284a9 commit 97fccab

8 files changed

Lines changed: 923 additions & 8 deletions

File tree

.github/workflows/ci.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ on:
1212
env:
1313
MT_COMPAT: true
1414

15-
1615
jobs:
1716
tests:
1817
if: ${{ github.repository == 'googleapis/gapic-generator-ruby' }}

gapic-common/gapic-common.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ Gem::Specification.new do |spec|
4343
spec.add_dependency "google-protobuf", "~> 3.14"
4444
spec.add_dependency "grpc", "~> 1.36"
4545

46+
spec.add_development_dependency "concurrent-ruby", "~> 1.2.2"
4647
spec.add_development_dependency "google-cloud-core", "~> 1.5"
4748
spec.add_development_dependency "google-style", "~> 1.26.0"
4849
spec.add_development_dependency "minitest", "~> 5.16"

gapic-common/lib/gapic/grpc/service_stub.rb

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,24 @@
1515
require "grpc"
1616
require "googleauth"
1717
require "gapic/grpc/service_stub/rpc_call"
18+
require "gapic/grpc/service_stub/channel"
19+
require "gapic/grpc/service_stub/channel_pool"
20+
1821

1922
module Gapic
2023
##
2124
# Gapic gRPC Stub
2225
#
23-
# This class wraps the actual gRPC Stub object and it's RPC methods.
26+
# This class wraps the actual gRPC Stub and ChannelPool objects.
2427
#
2528
# @!attribute [r] grpc_stub
2629
# @return [Object] The instance of the gRPC stub class (`grpc_stub_class`) constructor argument.
30+
# @!attribute [r] channel_pool
31+
# @return [Gapic::ServiceStub::ChannelPool] The instance of the ChannelPool class.
2732
#
2833
class ServiceStub
2934
attr_reader :grpc_stub
35+
attr_reader :channel_pool
3036

3137
##
3238
# Creates a Gapic gRPC stub object.
@@ -49,22 +55,49 @@ class ServiceStub
4955
# provided as a `::GRPC::Core::Channel`.)
5056
# @param interceptors [Array<::GRPC::ClientInterceptor>] An array of {::GRPC::ClientInterceptor} objects that will
5157
# be used for intercepting calls before they are executed Interceptors are an EXPERIMENTAL API.
58+
# @param channel_pool_config [::Gapic::ServiceStub:ChannelPool::Configuration] The configuration for channel
59+
# pool. This argument will raise error when `credentials` is provided as a `::GRPC::Core::Channel`.
5260
#
53-
def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, interceptors: nil
61+
def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil,
62+
interceptors: nil, channel_pool_config: nil
5463
raise ArgumentError, "grpc_stub_class is required" if grpc_stub_class.nil?
5564
raise ArgumentError, "endpoint is required" if endpoint.nil?
5665
raise ArgumentError, "credentials is required" if credentials.nil?
5766

67+
@channel_pool = nil
68+
@grpc_stub = nil
5869
channel_args = Hash channel_args
5970
interceptors = Array interceptors
6071

72+
73+
if channel_pool_config && channel_pool_config.channel_count > 1
74+
create_channel_pool grpc_stub_class, endpoint: endpoint, credentials: credentials,
75+
channel_args: channel_args, interceptors: interceptors,
76+
channel_pool_config: channel_pool_config
77+
else
78+
create_grpc_stub grpc_stub_class, endpoint: endpoint, credentials: credentials,
79+
channel_args: channel_args, interceptors: interceptors
80+
end
81+
end
82+
83+
def create_channel_pool grpc_stub_class, endpoint:, credentials:, channel_args: nil,
84+
interceptors: nil, channel_pool_config: nil
85+
if credentials.is_a? ::GRPC::Core::Channel
86+
raise ArgumentError, "Cannot create a channel pool with GRPC::Core::Channel as credentials"
87+
end
88+
@channel_pool = ChannelPool.new grpc_stub_class, endpoint: endpoint, credentials: credentials,
89+
channel_args: channel_args, interceptors: interceptors,
90+
config: channel_pool_config
91+
end
92+
93+
def create_grpc_stub grpc_stub_class, endpoint:, credentials:, channel_args: nil, interceptors: nil
6194
@grpc_stub = case credentials
6295
when ::GRPC::Core::Channel
6396
grpc_stub_class.new endpoint, nil, channel_override: credentials,
64-
interceptors: interceptors
97+
interceptors: interceptors
6598
when ::GRPC::Core::ChannelCredentials, Symbol
6699
grpc_stub_class.new endpoint, credentials, channel_args: channel_args,
67-
interceptors: interceptors
100+
interceptors: interceptors
68101
else
69102
updater_proc = credentials.updater_proc if credentials.respond_to? :updater_proc
70103
updater_proc ||= credentials if credentials.is_a? Proc
@@ -73,7 +106,7 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
73106
call_creds = ::GRPC::Core::CallCredentials.new updater_proc
74107
chan_creds = ::GRPC::Core::ChannelCredentials.new.compose call_creds
75108
grpc_stub_class.new endpoint, chan_creds, channel_args: channel_args,
76-
interceptors: interceptors
109+
interceptors: interceptors
77110
end
78111
end
79112

@@ -152,8 +185,12 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
152185
# end
153186
#
154187
def call_rpc method_name, request, options: nil, &block
155-
rpc_call = RpcCall.new @grpc_stub.method method_name
156-
rpc_call.call request, options: options, &block
188+
if @channel_pool.nil?
189+
rpc_call = RpcCall.new @grpc_stub.method method_name
190+
rpc_call.call request, options: options, &block
191+
else
192+
@channel_pool.call_rpc method_name, request, options: options, &block
193+
end
157194
end
158195
end
159196
end
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require "grpc"
16+
require "googleauth"
17+
require "gapic/grpc/service_stub/rpc_call"
18+
19+
module Gapic
20+
class ServiceStub
21+
##
22+
# @private
23+
#
24+
# Gapic gRPC ServiceStub Channel.
25+
#
26+
# This class wraps the gRPC stub object and its RPC methods.
27+
#
28+
class Channel
29+
attr_reader :concurrent_streams
30+
31+
##
32+
# Creates a new Channel instance
33+
#
34+
def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, interceptors: nil,
35+
on_channel_create: nil
36+
@grpc_stub_class = grpc_stub_class
37+
@endpoint = endpoint
38+
@credentials = credentials
39+
@channel_args = Hash channel_args
40+
@interceptors = Array interceptors
41+
@concurrent_streams = 0
42+
@mutex = Mutex.new
43+
setup_grpc_stub
44+
on_channel_create&.call self
45+
end
46+
47+
##
48+
# Creates a gRPC stub object
49+
#
50+
def setup_grpc_stub
51+
raise ArgumentError, "grpc_stub_class is required" if @grpc_stub_class.nil?
52+
raise ArgumentError, "endpoint is required" if @endpoint.nil?
53+
raise ArgumentError, "credentials is required" if @credentials.nil?
54+
55+
@grpc_stub = case @credentials
56+
when ::GRPC::Core::Channel
57+
@grpc_stub_class.new @endpoint, nil, channel_override: @credentials, interceptors: @interceptors
58+
when ::GRPC::Core::ChannelCredentials, Symbol
59+
@grpc_stub_class.new @endpoint, @credentials, channel_args: @channel_args,
60+
interceptors: @interceptors
61+
else
62+
updater_proc = @credentials.updater_proc if @credentials.respond_to? :updater_proc
63+
updater_proc ||= @credentials if @credentials.is_a? Proc
64+
raise ArgumentError, "invalid credentials (#{credentials.class})" if updater_proc.nil?
65+
66+
call_creds = ::GRPC::Core::CallCredentials.new updater_proc
67+
chan_creds = ::GRPC::Core::ChannelCredentials.new.compose call_creds
68+
@grpc_stub_class.new @endpoint, chan_creds, channel_args: @channel_args,
69+
interceptors: @interceptors
70+
end
71+
end
72+
73+
##
74+
# Invoke the specified RPC call.
75+
#
76+
# @param method_name [Symbol] The RPC method name.
77+
# @param request [Object] The request object.
78+
# @param options [Gapic::CallOptions, Hash] The options for making the RPC call. A Hash can be provided to
79+
# customize the options object, using keys that match the arguments for {Gapic::CallOptions.new}. This object
80+
# should only be used once.
81+
#
82+
# @yield [response, operation] Access the response along with the RPC operation.
83+
# @yieldparam response [Object] The response object.
84+
# @yieldparam operation [::GRPC::ActiveCall::Operation] The RPC operation for the response.
85+
#
86+
# @return [Object] The response object.
87+
#
88+
def call_rpc method_name, request, options: nil, &block
89+
@mutex.synchronize { @concurrent_streams += 1 }
90+
begin
91+
rpc_call = RpcCall.new @grpc_stub.method method_name
92+
response = rpc_call.call request, options: options, &block
93+
response
94+
rescue StandardError => e
95+
raise e
96+
ensure
97+
@mutex.synchronize { @concurrent_streams -= 1 }
98+
end
99+
end
100+
end
101+
end
102+
end
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require "grpc"
16+
require "googleauth"
17+
require "gapic/config"
18+
19+
20+
module Gapic
21+
class ServiceStub
22+
##
23+
# @private
24+
#
25+
# Gapic gRPC ServiceStub ChannelPool
26+
#
27+
# This class wraps multiple channels for sending RPCs.
28+
#
29+
class ChannelPool
30+
##
31+
# Initialize an instance of ServiceStub::ChannelPool
32+
#
33+
def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, interceptors: nil, config: nil
34+
if credentials.is_a? ::GRPC::Core::Channel
35+
raise ArgumentError, "Can't create a channel pool with GRPC::Core::Channel as credentials"
36+
end
37+
38+
@grpc_stub_class = grpc_stub_class
39+
@endpoint = endpoint
40+
@credentials = credentials
41+
@channel_args = channel_args
42+
@interceptors = interceptors
43+
@config = config || Configuration.new
44+
45+
@channels = (1..@config.channel_count).map { create_channel }
46+
end
47+
48+
##
49+
# Creates a new channel.
50+
def create_channel
51+
Channel.new @grpc_stub_class, endpoint: @endpoint, credentials: @credentials, channel_args: @channel_args,
52+
interceptors: @interceptors, on_channel_create: @config.on_channel_create
53+
end
54+
55+
##
56+
# Invoke the specified RPC call.
57+
#
58+
# @param method_name [Symbol] The RPC method name.
59+
# @param request [Object] The request object.
60+
# @param options [Gapic::CallOptions, Hash] The options for making the RPC call. A Hash can be provided to
61+
# customize the options object, using keys that match the arguments for {Gapic::CallOptions.new}. This object
62+
# should only be used once.
63+
#
64+
# @yield [response, operation] Access the response along with the RPC operation.
65+
# @yieldparam response [Object] The response object.
66+
# @yieldparam operation [::GRPC::ActiveCall::Operation] The RPC operation for the response.
67+
#
68+
# @return [Object] The response object.
69+
#
70+
def call_rpc method_name, request, options: nil, &block
71+
unless @config.channel_selection == :least_loaded
72+
warn "Invalid channel selection configuration, resorting to least loaded channel"
73+
end
74+
channel = least_loaded_channel
75+
channel.call_rpc method_name, request, options: options, &block
76+
end
77+
78+
79+
private
80+
81+
##
82+
# Return the least loaded channel in the pool
83+
#
84+
# @return [::Grpc::ServiceStub::Channel]
85+
#
86+
def least_loaded_channel
87+
@channels.min_by(&:concurrent_streams)
88+
end
89+
90+
##
91+
# Configuration class for ChannelPool
92+
#
93+
# @!attribute [rw] channel_count
94+
# The number of channels in the channel pool.
95+
# return [Integer]
96+
# @!attribute [rw] on_channel_create
97+
# Proc to run at the end of each channel initialization.
98+
# Proc is provided ::Gapic::ServiceStub::Channel object as input.
99+
# return [Proc]
100+
# @!attribute [rw] channel_selection
101+
# The algorithm for selecting a channel for an RPC.
102+
# return [Symbol]
103+
#
104+
class Configuration
105+
extend ::Gapic::Config
106+
107+
config_attr :channel_count, 1, ::Integer
108+
config_attr :on_channel_create, nil, ::Proc
109+
config_attr :channel_selection, :least_loaded, :least_loaded
110+
end
111+
end
112+
end
113+
end

0 commit comments

Comments
 (0)