diff --git a/.travis.yml b/.travis.yml index 028f060..c10cfd7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,4 +10,57 @@ addons: before_install: - sudo service redis-server stop - - sudo service redis-server start --bind 0.0.0.0 + + # --- Standalone (port 6379) --- + - redis-server --port 6379 --daemonize yes --logfile /tmp/redis-standalone.log + + # --- Sentinel Setup (ports 7000–7002, sentinels 26379–26381) --- + - mkdir -p /tmp/redis-sentinel/{7000,7001,7002} + + # Primary + - redis-server --port 7000 --daemonize yes --logfile /tmp/redis-sentinel/7000/redis.log --dir /tmp/redis-sentinel/7000 + + # Replicas + - redis-server --port 7001 --daemonize yes --replicaof 127.0.0.1 7000 --logfile /tmp/redis-sentinel/7001/redis.log --dir /tmp/redis-sentinel/7001 + - redis-server --port 7002 --daemonize yes --replicaof 127.0.0.1 7000 --logfile /tmp/redis-sentinel/7002/redis.log --dir /tmp/redis-sentinel/7002 + + # Sentinel config files + - | + for port in 26379 26380 26381; do + { + echo "port ${port}" + echo "sentinel monitor mymaster 127.0.0.1 7000 2" + echo "sentinel down-after-milliseconds mymaster 5000" + echo "sentinel failover-timeout mymaster 10000" + echo "sentinel parallel-syncs mymaster 1" + echo "daemonize yes" + echo "logfile /tmp/sentinel-${port}.log" + } > /tmp/sentinel-${port}.conf + redis-server /tmp/sentinel-${port}.conf --sentinel + done + + # --- Cluster Setup (ports 7003–7008) --- + - mkdir -p /tmp/redis-cluster/{7003,7004,7005,7006,7007,7008} + + - | + for port in 7003 7004 7005 7006 7007 7008; do + { + echo "port ${port}" + echo "cluster-enabled yes" + echo "cluster-config-file /tmp/redis-cluster/${port}/nodes.conf" + echo "cluster-node-timeout 5000" + echo "appendonly yes" + echo "daemonize yes" + echo "logfile /tmp/redis-cluster/${port}/redis.log" + echo "dir /tmp/redis-cluster/${port}" + } > /tmp/redis-cluster/${port}/redis.conf + redis-server /tmp/redis-cluster/${port}/redis.conf + done + + # Bootstrap the cluster (3 masters + 3 replicas) + - sleep 1 + - | + echo "yes" | redis-cli --cluster create \ + 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \ + 127.0.0.1:7006 127.0.0.1:7007 127.0.0.1:7008 \ + --cluster-replicas 1 \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index beea6c7..03a4e71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 3.8.0 + - Add Redis sentinel and cluster support [#101](https://github.com/logstash-plugins/logstash-input-redis/pull/101/changes) + - Updated redis client dependency to ~> 5 + ## 3.7.1 - Add documentation to "threads" option [#95](https://github.com/logstash-plugins/logstash-input-redis/pull/95) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index e5ea025..6bcc1b5 100755 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -43,6 +43,7 @@ This plugin supports the following configuration options plus the <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>, one of `["list", "channel", "pattern_channel"]`|Yes | <> |<>|No @@ -51,6 +52,8 @@ This plugin supports the following configuration options plus the <> |<>|Yes | <> |<>|No | <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -69,6 +72,15 @@ input plugins. The number of events to return from Redis using EVAL. +[id="plugins-{type}s-{plugin}-cluster_hosts"] +===== `cluster_hosts` + + * Value type is <> + * There is no default value for this setting. + * Cluster will override Host or Sentinel configuration if both specified. + +The connection URLs of your Redis Cluster nodes, e.g. `["redis://127.0.0.1:7000", "redis://127.0.0.1:7001"]`. + [id="plugins-{type}s-{plugin}-command_map"] ===== `command_map` @@ -112,7 +124,7 @@ The hostname of your Redis server. * Value type is <> * There is no default value for this setting. - * Path will override Host configuration if both specified. + * Path will override Host, Sentinel or Cluster configuration if both specified. The unix socket path of your Redis server. @@ -141,6 +153,23 @@ Password to authenticate with. There is no authentication by default. The port to connect on. +[id="plugins-{type}s-{plugin}-sentinel_hosts"] +===== `sentinel_hosts` + + * Value type is <> + * There is no default value for this setting. + * Sentinel will override Host configuration if both specified. + +The hostnames and ports of your Redis Sentinel servers, e.g. `["127.0.0.1:26379", "127.0.0.1:26380"]`. + +[id="plugins-{type}s-{plugin}-sentinel_master_name"] +===== `sentinel_master_name` + + * Value type is <> + * Default value is `"mymaster"` + +The name of the Redis Sentinel master group to connect to. Only used when `sentinel_hosts` is set. + [id="plugins-{type}s-{plugin}-ssl"] ===== `ssl` diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index e8a2667..9856d36 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -3,6 +3,7 @@ require "logstash/inputs/base" require "logstash/inputs/threadable" require 'redis' +require 'redis-clustering' require "stud/interval" # This input will read events from a Redis instance; it supports both Redis channels and lists. @@ -27,13 +28,22 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable # The hostname of your Redis server. config :host, :validate => :string, :default => "127.0.0.1" + # The hostnames and ports of your sentinel servers. + config :sentinel_hosts, :validate => :array + + # The connection URLs of your cluster servers. + config :cluster_hosts, :validate => :array + # The port to connect on. config :port, :validate => :number, :default => 6379 + # The name of the sentinel master to connect to. + config :sentinel_master_name, :validate => :string, :default => "mymaster" + # SSL config :ssl, :validate => :boolean, :default => false - # The unix socket path to connect on. Will override host and port if defined. + # The unix socket path to connect on. Will override host and port and sentinel_hosts and cluster_hosts if defined. # There is no unix socket path by default. config :path, :validate => :string @@ -63,7 +73,15 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable public def register - @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}" + if !@path.nil? + @redis_url = "#{@password}@#{@path}/#{@db}" + elsif !@cluster_hosts.nil? + @redis_url = "#{@password}@#{@cluster_hosts.map { |h| "#{h}" }.join(',')}/#{@db}" + elsif !@sentinel_hosts.nil? + @redis_url = "redis://#{@password}@#{@sentinel_master_name}(#{@sentinel_hosts.map { |h| "#{h}" }.join(',')})/#{@db}" + else + @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}" + end # just switch on data_type once if @data_type == 'list' || @data_type == 'dummy' @@ -115,10 +133,28 @@ def redis_params } if @path.nil? - params[:host] = @host - params[:port] = @port + if !@cluster_hosts.nil? + params = { + :nodes => cluster_hosts, + } + elsif !@sentinel_hosts.nil? + hosts = @sentinel_hosts.map do |sentinel_host| + host, port = sentinel_host.split(':', 2) + { host: host, port: port ? port.to_i : 26379 } + end + params = { + :name => @sentinel_master_name, + :sentinels => hosts, + :role => :master + } + else + params = { + :host => @host, + :port => @port + } + end else - @logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'") + @logger.warn("Parameter 'path' is set, ignoring parameters: 'host', 'port', 'sentinel_hosts' and 'cluster_hosts'") params[:path] = @path end @@ -126,18 +162,13 @@ def redis_params end def new_redis_instance - ::Redis.new(redis_params) + @cluster_hosts.nil? ? ::Redis.new(redis_params) : ::Redis::Cluster.new(redis_params) end # private def connect redis = new_redis_instance - # register any renamed Redis commands - @command_map.each do |name, renamed| - redis._client.command_map[name.to_sym] = renamed.to_sym - end - load_batch_script(redis) if batched? && is_list_type? redis @@ -231,7 +262,7 @@ def list_batch_listener(redis, output_queue) end def list_single_listener(redis, output_queue) - item = redis.blpop(@key, 0, :timeout => 1) + item = redis.blpop(@key, timeout: 1) return unless item # from timeout or other conditions # blpop returns the 'key' read from as well as the item result diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec index 5ce35df..f563288 100755 --- a/logstash-input-redis.gemspec +++ b/logstash-input-redis.gemspec @@ -23,7 +23,8 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-json' - s.add_runtime_dependency 'redis', '>= 4.0.1', '< 5' + s.add_runtime_dependency 'redis', '>= 5.0', '< 6' + s.add_runtime_dependency 'redis-clustering', '>= 5.0', '< 6' s.add_development_dependency 'logstash-devutils' end diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index b8b9472..ccf6964 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -3,11 +3,36 @@ require 'logstash/inputs/redis' require 'securerandom' -def populate(key, event_count) +STANDALONE_REDIS_PARAMS = { host: '127.0.0.1', port: 6379 }.freeze + +SENTINEL_REDIS_PARAMS = { + name: 'mymaster', + sentinels: [ + { host: '127.0.0.1', port: 26379 }, + { host: '127.0.0.1', port: 26380 }, + { host: '127.0.0.1', port: 26381 } + ], + role: :master +}.freeze + +CLUSTER_REDIS_PARAMS = { + nodes: %w[redis://127.0.0.1:7003 redis://127.0.0.1:7004 redis://127.0.0.1:7005] +}.freeze + +SENTINEL_EXTRA_CONFIG = <<~CONF.strip + sentinel_hosts => ["127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"] +CONF + +CLUSTER_EXTRA_CONFIG = <<~CONF.strip + cluster_hosts => ["redis://127.0.0.1:7003", "redis://127.0.0.1:7004", "redis://127.0.0.1:7005"] +CONF + +def populate(key, event_count, redis_params = STANDALONE_REDIS_PARAMS) require "logstash/event" require "redis" + require "redis-clustering" require "stud/try" - redis = Redis.new(:host => "localhost") + redis = redis_params.key?(:nodes) ? ::Redis::Cluster.new(redis_params) : ::Redis.new(redis_params) event_count.times do |value| event = LogStash::Event.new("sequence" => value) Stud.try(10.times) do @@ -30,11 +55,10 @@ def process(conf, event_count) # integration tests --------------------- -describe "inputs/redis", :redis => true do - +shared_examples "redis list integration" do |redis_params, extra_config| it "should read events from a list" do key = SecureRandom.hex - event_count = 1000 + rand(50) + event_count = 1001 + rand(50) conf = <<-CONFIG input { redis { @@ -42,32 +66,44 @@ def process(conf, event_count) key => "#{key}" data_type => "list" batch_count => 1 + #{extra_config} } } CONFIG - - populate(key, event_count) + populate(key, event_count, redis_params) process(conf, event_count) end it "should read events from a list using batch_count (default 125)" do key = SecureRandom.hex - event_count = 1000 + rand(50) + event_count = 1001 + rand(50) conf = <<-CONFIG input { redis { type => "blah" key => "#{key}" data_type => "list" + #{extra_config} } } CONFIG - - populate(key, event_count) + populate(key, event_count, redis_params) process(conf, event_count) end end +describe "inputs/redis standalone", :redis => true do + include_examples "redis list integration", STANDALONE_REDIS_PARAMS, "" +end + +describe "inputs/redis sentinel", :redis => true do + include_examples "redis list integration", SENTINEL_REDIS_PARAMS, SENTINEL_EXTRA_CONFIG +end + +describe "inputs/redis cluster", :redis => true do + include_examples "redis list integration", CLUSTER_REDIS_PARAMS, CLUSTER_EXTRA_CONFIG +end + describe LogStash::Inputs::Redis do let(:queue) { Queue.new } @@ -104,35 +140,25 @@ def process(conf, event_count) } end - it 'sets the renamed commands in the command map' do - allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + it 'registers and connects without error' do + allow_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command| expect(command[0]).to eql :script - expect(command[1]).to eql 'load' - end + expect(command[1].to_s.downcase).to eql 'load' + end.and_return('a' * 40) subject.register - redis = subject.send :connect - - command_map = redis._client.command_map - - expect(command_map[:blpop]).to eq config['command_map']['blpop'].to_sym - expect(command_map[:evalsha]).to eq config['command_map']['evalsha'].to_sym - expect(command_map[:lrange]).to eq config['command_map']['lrange'].to_sym - expect(command_map[:ltrim]).to eq config['command_map']['ltrim'].to_sym - expect(command_map[:script]).to eq config['command_map']['script'].to_sym - expect(command_map[:subscribe]).to eq config['command_map']['subscribe'].to_sym - expect(command_map[:psubscribe]).to eq config['command_map']['psubscribe'].to_sym + expect { subject.send :connect }.not_to raise_error end it 'loads the batch script with the renamed command' do - expect_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command| expect(command[0]).to eql :script - expect(command[1]).to eql 'load' + expect(command[1].to_s.downcase).to eql 'load' script = command[2] expect(script).to include "redis.call('#{config['command_map']['lrange']}', KEYS[1], 0, batchsize)" expect(script).to include "redis.call('#{config['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)" - end + end.and_return('a' * 40) subject.register subject.send :connect @@ -163,9 +189,10 @@ def process(conf, event_count) end it 'calling the run method, adds events to the queue' do - allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| + allow_any_instance_of( Redis::Client ).to receive(:blocking_call_v) do |_, timeout, command| expect(command[0]).to eql :blpop - expect(command[1]).to eql ['foo', 0] + expect(command[1]).to eql 'foo' + expect(command[2]).to eql 1 end.and_return ['foo', "{\"foo1\":\"bar\"}"], nil tt = Thread.new do @@ -182,7 +209,7 @@ def process(conf, event_count) it 'keeps running when a connection error occurs' do raised = false - allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| + allow_any_instance_of( Redis::Client ).to receive(:blocking_call_v) do |_, timeout, command| expect(command[0]).to eql :blpop unless raised raised = true @@ -247,8 +274,8 @@ def process(conf, event_count) let(:batch_count) { 10 } it 'calling the run method, adds events to the queue' do - allow_any_instance_of( Redis ).to receive(:script) - allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + allow_any_instance_of( Redis ).to receive(:script).and_return('a' * 40) + allow_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command| expect(command[0]).to eql :evalsha end.and_return ['{"a": 1}', '{"b": 2}'], [] @@ -270,8 +297,8 @@ def process(conf, event_count) let(:rates) { [] } it 'will throttle the loop' do - allow_any_instance_of( Redis ).to receive(:script) - allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + allow_any_instance_of( Redis ).to receive(:script).and_return('a' * 40) + allow_any_instance_of( Redis::Client ).to receive(:call_v) do |_, command| expect(command[0]).to eql :evalsha rates.unshift Time.now.to_f end.and_return [] @@ -292,7 +319,7 @@ def process(conf, event_count) expect( queue.size ).to eq(0) inters.each do |delta| - expect(delta).to be_within(0.01).of(LogStash::Inputs::Redis::BATCH_EMPTY_SLEEP) + expect(delta).to be_within(0.05).of(LogStash::Inputs::Redis::BATCH_EMPTY_SLEEP) end end end diff --git a/version b/version index a76ccff..1981190 100644 --- a/version +++ b/version @@ -1 +1 @@ -3.7.1 +3.8.0