Skip to content

Commit 19e1445

Browse files
committed
update redis to prevent cross-slot violations in cluster mode
1 parent 2e020bd commit 19e1445

3 files changed

Lines changed: 24 additions & 36 deletions

File tree

lib/logstash/inputs/redis.rb

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require "logstash/inputs/base"
44
require "logstash/inputs/threadable"
55
require 'redis'
6+
require 'redis-clustering'
67
require "stud/interval"
78

89
# This input will read events from a Redis instance; it supports both Redis channels and lists.
@@ -137,12 +138,12 @@ def redis_params
137138
if @path.nil?
138139
if !@cluster_hosts.nil?
139140
params = {
140-
:cluster => cluster_hosts,
141+
:nodes => cluster_hosts,
141142
}
142143
elsif !@sentinel_hosts.nil?
143144
hosts = @sentinel_hosts.map { |sentinel_host| { host: sentinel_host, port: @sentinel_port } }
144145
params = {
145-
:host => @sentinel_master_name,
146+
:name => @sentinel_master_name,
146147
:sentinels => hosts,
147148
:role => :master
148149
}
@@ -161,18 +162,13 @@ def redis_params
161162
end
162163

163164
def new_redis_instance
164-
::Redis.new(redis_params)
165+
@cluster_hosts.nil? ? ::Redis.new(redis_params) : ::Redis::Cluster.new(redis_params)
165166
end
166167

167168
# private
168169
def connect
169170
redis = new_redis_instance
170171

171-
# register any renamed Redis commands
172-
@command_map.each do |name, renamed|
173-
redis._client.command_map[name.to_sym] = renamed.to_sym
174-
end
175-
176172
load_batch_script(redis) if batched? && is_list_type?
177173

178174
redis
@@ -266,7 +262,7 @@ def list_batch_listener(redis, output_queue)
266262
end
267263

268264
def list_single_listener(redis, output_queue)
269-
item = redis.blpop(@key, 0, :timeout => 1)
265+
item = redis.blpop(@key, timeout: 1)
270266
return unless item # from timeout or other conditions
271267

272268
# blpop returns the 'key' read from as well as the item result

logstash-input-redis.gemspec

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ Gem::Specification.new do |s|
2323
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
2424

2525
s.add_runtime_dependency 'logstash-codec-json'
26-
s.add_runtime_dependency 'redis', '>= 4.0.1', '< 5'
26+
s.add_runtime_dependency 'redis', '>= 5.0', '< 6'
27+
s.add_runtime_dependency 'redis-clustering', '>= 5.0', '< 6'
2728

2829
s.add_development_dependency 'logstash-devutils'
2930
end

spec/inputs/redis_spec.rb

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
STANDALONE_REDIS_PARAMS = { host: '127.0.0.1', port: 6379 }.freeze
77

88
SENTINEL_REDIS_PARAMS = {
9-
host: 'mymaster',
9+
name: 'mymaster',
1010
sentinels: [
1111
{ host: '127.0.0.1', port: 26379 },
1212
{ host: '127.0.0.1', port: 26380 },
@@ -139,35 +139,25 @@ def process(conf, event_count)
139139
}
140140
end
141141

142-
it 'sets the renamed commands in the command map' do
143-
allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
142+
it 'registers and connects without error' do
143+
allow_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command|
144144
expect(command[0]).to eql :script
145-
expect(command[1]).to eql 'load'
146-
end
145+
expect(command[1].to_s.downcase).to eql 'load'
146+
end.and_return('a' * 40)
147147

148148
subject.register
149-
redis = subject.send :connect
150-
151-
command_map = redis._client.command_map
152-
153-
expect(command_map[:blpop]).to eq config['command_map']['blpop'].to_sym
154-
expect(command_map[:evalsha]).to eq config['command_map']['evalsha'].to_sym
155-
expect(command_map[:lrange]).to eq config['command_map']['lrange'].to_sym
156-
expect(command_map[:ltrim]).to eq config['command_map']['ltrim'].to_sym
157-
expect(command_map[:script]).to eq config['command_map']['script'].to_sym
158-
expect(command_map[:subscribe]).to eq config['command_map']['subscribe'].to_sym
159-
expect(command_map[:psubscribe]).to eq config['command_map']['psubscribe'].to_sym
149+
expect { subject.send :connect }.not_to raise_error
160150
end
161151

162152
it 'loads the batch script with the renamed command' do
163-
expect_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
153+
expect_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command|
164154
expect(command[0]).to eql :script
165-
expect(command[1]).to eql 'load'
155+
expect(command[1].to_s.downcase).to eql 'load'
166156

167157
script = command[2]
168158
expect(script).to include "redis.call('#{config['command_map']['lrange']}', KEYS[1], 0, batchsize)"
169159
expect(script).to include "redis.call('#{config['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)"
170-
end
160+
end.and_return('a' * 40)
171161

172162
subject.register
173163
subject.send :connect
@@ -198,9 +188,10 @@ def process(conf, event_count)
198188
end
199189

200190
it 'calling the run method, adds events to the queue' do
201-
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
191+
allow_any_instance_of( Redis::Client ).to receive(:blocking_call_v) do |_, timeout, command|
202192
expect(command[0]).to eql :blpop
203-
expect(command[1]).to eql ['foo', 0]
193+
expect(command[1]).to eql 'foo'
194+
expect(command[2]).to eql 1
204195
end.and_return ['foo', "{\"foo1\":\"bar\"}"], nil
205196

206197
tt = Thread.new do
@@ -217,7 +208,7 @@ def process(conf, event_count)
217208

218209
it 'keeps running when a connection error occurs' do
219210
raised = false
220-
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
211+
allow_any_instance_of( Redis::Client ).to receive(:blocking_call_v) do |_, timeout, command|
221212
expect(command[0]).to eql :blpop
222213
unless raised
223214
raised = true
@@ -282,8 +273,8 @@ def process(conf, event_count)
282273
let(:batch_count) { 10 }
283274

284275
it 'calling the run method, adds events to the queue' do
285-
allow_any_instance_of( Redis ).to receive(:script)
286-
allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
276+
allow_any_instance_of( Redis ).to receive(:script).and_return('a' * 40)
277+
allow_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command|
287278
expect(command[0]).to eql :evalsha
288279
end.and_return ['{"a": 1}', '{"b": 2}'], []
289280

@@ -305,8 +296,8 @@ def process(conf, event_count)
305296
let(:rates) { [] }
306297

307298
it 'will throttle the loop' do
308-
allow_any_instance_of( Redis ).to receive(:script)
309-
allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
299+
allow_any_instance_of( Redis ).to receive(:script).and_return('a' * 40)
300+
allow_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command|
310301
expect(command[0]).to eql :evalsha
311302
rates.unshift Time.now.to_f
312303
end.and_return []

0 commit comments

Comments
 (0)