Skip to content

Commit dc94bf1

Browse files
committed
update redis to prevent cross-slot violations in cluster mode
1 parent 0598244 commit dc94bf1

3 files changed

Lines changed: 23 additions & 35 deletions

File tree

lib/logstash/inputs/redis.rb

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require "logstash/inputs/base"
44
require "logstash/inputs/threadable"
55
require 'redis'
6+
require 'redis-clustering'
67
require "stud/interval"
78

89
# This input will read events from a Redis instance; it supports both Redis channels and lists.
@@ -137,12 +138,12 @@ def redis_params
137138
if @path.nil?
138139
if !@cluster_hosts.nil?
139140
params = {
140-
:cluster => cluster_hosts,
141+
:nodes => cluster_hosts,
141142
}
142143
elsif !@sentinel_hosts.nil?
143144
hosts = @sentinel_hosts.map { |sentinel_host| { host: sentinel_host, port: @sentinel_port } }
144145
params = {
145-
:host => @sentinel_master_name,
146+
:name => @sentinel_master_name,
146147
:sentinels => hosts,
147148
:role => :master
148149
}
@@ -161,18 +162,13 @@ def redis_params
161162
end
162163

163164
def new_redis_instance
164-
::Redis.new(redis_params)
165+
@cluster_hosts.nil? ? ::Redis.new(redis_params) : ::Redis::Cluster.new(redis_params)
165166
end
166167

167168
# private
168169
def connect
169170
redis = new_redis_instance
170171

171-
# register any renamed Redis commands
172-
@command_map.each do |name, renamed|
173-
redis._client.command_map[name.to_sym] = renamed.to_sym
174-
end
175-
176172
load_batch_script(redis) if batched? && is_list_type?
177173

178174
redis
@@ -266,7 +262,7 @@ def list_batch_listener(redis, output_queue)
266262
end
267263

268264
def list_single_listener(redis, output_queue)
269-
item = redis.blpop(@key, 0, :timeout => 1)
265+
item = redis.blpop(@key, timeout: 1)
270266
return unless item # from timeout or other conditions
271267

272268
# blpop returns the 'key' read from as well as the item result

logstash-input-redis.gemspec

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ Gem::Specification.new do |s|
2323
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
2424

2525
s.add_runtime_dependency 'logstash-codec-json'
26-
s.add_runtime_dependency 'redis', '>= 4.0.1', '< 5'
26+
s.add_runtime_dependency 'redis', '>= 5.0', '< 6'
27+
s.add_runtime_dependency 'redis-clustering', '>= 5.0', '< 6'
2728

2829
s.add_development_dependency 'logstash-devutils'
2930
end

spec/inputs/redis_spec.rb

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -140,35 +140,25 @@ def process(conf, event_count)
140140
}
141141
end
142142

143-
it 'sets the renamed commands in the command map' do
144-
allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
143+
it 'registers and connects without error' do
144+
allow_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command|
145145
expect(command[0]).to eql :script
146-
expect(command[1]).to eql 'load'
147-
end
146+
expect(command[1].to_s.downcase).to eql 'load'
147+
end.and_return('a' * 40)
148148

149149
subject.register
150-
redis = subject.send :connect
151-
152-
command_map = redis._client.command_map
153-
154-
expect(command_map[:blpop]).to eq config['command_map']['blpop'].to_sym
155-
expect(command_map[:evalsha]).to eq config['command_map']['evalsha'].to_sym
156-
expect(command_map[:lrange]).to eq config['command_map']['lrange'].to_sym
157-
expect(command_map[:ltrim]).to eq config['command_map']['ltrim'].to_sym
158-
expect(command_map[:script]).to eq config['command_map']['script'].to_sym
159-
expect(command_map[:subscribe]).to eq config['command_map']['subscribe'].to_sym
160-
expect(command_map[:psubscribe]).to eq config['command_map']['psubscribe'].to_sym
150+
expect { subject.send :connect }.not_to raise_error
161151
end
162152

163153
it 'loads the batch script with the renamed command' do
164-
expect_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
154+
expect_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command|
165155
expect(command[0]).to eql :script
166-
expect(command[1]).to eql 'load'
156+
expect(command[1].to_s.downcase).to eql 'load'
167157

168158
script = command[2]
169159
expect(script).to include "redis.call('#{config['command_map']['lrange']}', KEYS[1], 0, batchsize)"
170160
expect(script).to include "redis.call('#{config['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)"
171-
end
161+
end.and_return('a' * 40)
172162

173163
subject.register
174164
subject.send :connect
@@ -199,9 +189,10 @@ def process(conf, event_count)
199189
end
200190

201191
it 'calling the run method, adds events to the queue' do
202-
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
192+
allow_any_instance_of( Redis::Client ).to receive(:blocking_call_v) do |_, timeout, command|
203193
expect(command[0]).to eql :blpop
204-
expect(command[1]).to eql ['foo', 0]
194+
expect(command[1]).to eql 'foo'
195+
expect(command[2]).to eql ({ timeout: 1 })
205196
end.and_return ['foo', "{\"foo1\":\"bar\"}"], nil
206197

207198
tt = Thread.new do
@@ -218,7 +209,7 @@ def process(conf, event_count)
218209

219210
it 'keeps running when a connection error occurs' do
220211
raised = false
221-
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
212+
allow_any_instance_of( Redis::Client ).to receive(:blocking_call_v) do |_, timeout, command|
222213
expect(command[0]).to eql :blpop
223214
unless raised
224215
raised = true
@@ -283,8 +274,8 @@ def process(conf, event_count)
283274
let(:batch_count) { 10 }
284275

285276
it 'calling the run method, adds events to the queue' do
286-
allow_any_instance_of( Redis ).to receive(:script)
287-
allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
277+
allow_any_instance_of( Redis ).to receive(:script).and_return('a' * 40)
278+
allow_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command|
288279
expect(command[0]).to eql :evalsha
289280
end.and_return ['{"a": 1}', '{"b": 2}'], []
290281

@@ -306,8 +297,8 @@ def process(conf, event_count)
306297
let(:rates) { [] }
307298

308299
it 'will throttle the loop' do
309-
allow_any_instance_of( Redis ).to receive(:script)
310-
allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
300+
allow_any_instance_of( Redis ).to receive(:script).and_return('a' * 40)
301+
allow_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command|
311302
expect(command[0]).to eql :evalsha
312303
rates.unshift Time.now.to_f
313304
end.and_return []

0 commit comments

Comments
 (0)