Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis supports failover through Sentinel #50

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions lib/logstash/inputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
# The number of events to return from Redis using EVAL.
config :batch_count, :validate => :number, :default => 125

config :sentinel_hosts, :validate => :array

config :master, :validate => :string, :default => "mymaster"

public
# public API
# use to store a proc that can provide a redis instance or mock
Expand All @@ -72,8 +76,8 @@ def new_redis_instance
end

def register
@redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}"

@redis_url = identity
@redis_builder ||= method(:internal_redis_builder)

# just switch on data_type once
Expand All @@ -90,10 +94,17 @@ def register

@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)

@identity = "#{@redis_url} #{@data_type}:#{@key}"
@identity = identity
@logger.info("Registering Redis", :identity => @identity)
end # def register

def identity
if @sentinel_hosts
return "redis-sentinel://#{@password} #{$sentinel_hosts} #{@db} #{@data_type}:#{@key}"
end
@path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}"
end

def run(output_queue)
@run_method.call(output_queue)
rescue LogStash::ShutdownSignal
Expand All @@ -119,10 +130,26 @@ def is_list_type?
# private
def redis_params
if @path.nil?
connectionParams = {
:host => @host,
:port => @port
}
if @sentinel_hosts.nil?
connectionParams = {
:host => @host,
:port => @port
}
else
hosts = []
for sentinel_host in @sentinel_hosts
host, port = sentinel_host.split(":")
unless port
port = @sentinel_port
end
hosts.push({:host => host, :port => port})
end
connectionParams = {
:url => 'redis://' + @master,
:sentinels => hosts,
:role => :master
}
end
else
@logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'")
connectionParams = {
Expand Down