Skip to content

Commit a26004a

Browse files
author
Juuso Mäyränen
committed
Extract redis connection logic out of input class
1 parent 12e270c commit a26004a

File tree

2 files changed

+146
-136
lines changed

2 files changed

+146
-136
lines changed

lib/logstash/inputs/redis.rb

+12-136
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# encoding: utf-8
2+
require_relative '../plugin_mixins/redis_connection'
23
require "logstash/namespace"
34
require "logstash/inputs/base"
45
require "logstash/inputs/threadable"
@@ -19,34 +20,15 @@
1920
# newer. Anything older does not support the operations used by batching.
2021
#
2122
module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
23+
24+
include ::LogStash::PluginMixins::RedisConnection
25+
2226
BATCH_EMPTY_SLEEP = 0.25
2327

2428
config_name "redis"
2529

2630
default :codec, "json"
2731

28-
# The hostname of your Redis server.
29-
config :host, :validate => :string, :default => "127.0.0.1"
30-
31-
# The port to connect on.
32-
config :port, :validate => :number, :default => 6379
33-
34-
# SSL
35-
config :ssl, :validate => :boolean, :default => false
36-
37-
# The unix socket path to connect on. Will override host and port if defined.
38-
# There is no unix socket path by default.
39-
config :path, :validate => :string
40-
41-
# The Redis database number.
42-
config :db, :validate => :number, :default => 0
43-
44-
# Initial connection timeout in seconds.
45-
config :timeout, :validate => :number, :default => 5
46-
47-
# Password to authenticate with. There is no authentication by default.
48-
config :password, :validate => :password
49-
5032
# The name of a Redis list or channel.
5133
config :key, :validate => :string, :required => true
5234

@@ -60,9 +42,6 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
6042
# The number of events to return from Redis using EVAL.
6143
config :batch_count, :validate => :number, :default => 125
6244

63-
# Redefined Redis commands to be passed to the Redis client.
64-
config :command_map, :validate => :hash, :default => {}
65-
6645
# Maximum number of worker threads to spawn when using `data_type` `pattern_list`.
6746
config :pattern_list_threads, :validate => :number, :default => 20
6847

@@ -76,23 +55,6 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
7655
config :pattern_list_threadpool_sleep, :validate => :number, :default => 0.2
7756

7857
public
79-
# public API
80-
# use to store a proc that can provide a Redis instance or mock
81-
def add_external_redis_builder(builder) #callable
82-
@redis_builder = builder
83-
self
84-
end
85-
86-
# use to apply an instance directly and bypass the builder
87-
def use_redis(instance)
88-
@redis = instance
89-
self
90-
end
91-
92-
def new_redis_instance
93-
@redis_builder.call
94-
end
95-
9658
def init_threadpool
9759
@threadpool ||= Concurrent::ThreadPoolExecutor.new(
9860
min_threads: @pattern_list_threads,
@@ -122,6 +84,8 @@ def register
12284
@stop_method = method(:subscribe_stop)
12385
end
12486

87+
@batched = is_list_type? && batched?
88+
12589
@identity = "#{@redis_url} #{@data_type}:#{@key}"
12690
@logger.info("Registering Redis", :identity => @identity)
12791
end # def register
@@ -148,63 +112,6 @@ def is_list_type?
148112
@data_type == 'list' || @data_type == 'pattern_list'
149113
end
150114

151-
# private
152-
def redis_params
153-
if @path.nil?
154-
connectionParams = {
155-
:host => @host,
156-
:port => @port
157-
}
158-
else
159-
@logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'")
160-
connectionParams = {
161-
:path => @path
162-
}
163-
end
164-
165-
baseParams = {
166-
:timeout => @timeout,
167-
:db => @db,
168-
:password => @password.nil? ? nil : @password.value,
169-
:ssl => @ssl
170-
}
171-
172-
return connectionParams.merge(baseParams)
173-
end
174-
175-
# private
176-
def internal_redis_builder
177-
::Redis.new(redis_params)
178-
end
179-
180-
# private
181-
def connect
182-
redis = new_redis_instance
183-
184-
# register any renamed Redis commands
185-
if @command_map.any?
186-
client_command_map = redis.client.command_map
187-
@command_map.each do |name, renamed|
188-
client_command_map[name.to_sym] = renamed.to_sym
189-
end
190-
end
191-
192-
load_batch_script(redis) if batched? && is_list_type?
193-
redis
194-
end # def connect
195-
196-
# private
197-
def load_batch_script(redis)
198-
#A Redis Lua EVAL script to fetch a count of keys
199-
redis_script = <<EOF
200-
local batchsize = tonumber(ARGV[1])
201-
local result = redis.call(\'#{@command_map.fetch('lrange', 'lrange')}\', KEYS[1], 0, batchsize)
202-
redis.call(\'#{@command_map.fetch('ltrim', 'ltrim')}\', KEYS[1], batchsize + 1, -1)
203-
return result
204-
EOF
205-
@redis_script_sha = redis.script(:load, redis_script)
206-
end
207-
208115
# private
209116
def queue_event(msg, output_queue, channel=nil)
210117
begin
@@ -218,33 +125,16 @@ def queue_event(msg, output_queue, channel=nil)
218125
end
219126
end
220127

221-
# private
222-
def reset_redis
223-
return if @redis.nil? || !@redis.connected?
224-
225-
@redis.quit rescue nil
226-
@redis = nil
227-
end
228-
229-
# private
230128
def list_stop
231129
reset_redis
232130
end
233131

234132
# private
235133
def list_runner(output_queue)
236-
@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
134+
@list_method = @batched ? method(:list_batch_listener) : method(:list_single_listener)
237135
while !stop?
238-
begin
239-
@redis ||= connect
136+
redis_runner(@batched) do
240137
@list_method.call(@redis, output_queue)
241-
rescue ::Redis::BaseError => e
242-
@logger.warn("Redis connection problem", :exception => e)
243-
# Reset the redis variable to trigger reconnect
244-
@redis = nil
245-
# this sleep does not need to be stoppable as its
246-
# in a while !stop? loop
247-
sleep 1
248138
end
249139
end
250140
end
@@ -298,7 +188,7 @@ def pattern_list_batch_processor(redis, output_queue, key)
298188
# private
299189
def pattern_list_worker_consume(output_queue, key)
300190
begin
301-
redis ||= connect
191+
redis ||= connect(@batched)
302192
@pattern_list_processor.call(redis, output_queue, key)
303193
rescue ::Redis::BaseError => e
304194
@logger.warn("Redis connection problem in thread for key #{key}. Sleeping a while before exiting thread.", :exception => e)
@@ -329,7 +219,7 @@ def pattern_list_launch_worker(output_queue, key)
329219
# private
330220
def pattern_list_ensure_workers(output_queue)
331221
return unless threadpool_capacity?
332-
redis_runner do
222+
redis_runner(@batched) do
333223
@redis.keys(@key).shuffle.each do |key|
334224
next if @current_workers.include?(key)
335225
pattern_list_launch_worker(output_queue, key)
@@ -340,7 +230,7 @@ def pattern_list_ensure_workers(output_queue)
340230

341231
# private
342232
def pattern_list_runner(output_queue)
343-
@pattern_list_processor = batched? ? method(:pattern_list_batch_processor) : method(:pattern_list_single_processor)
233+
@pattern_list_processor = @batched ? method(:pattern_list_batch_processor) : method(:pattern_list_single_processor)
344234
while !stop?
345235
init_threadpool if @threadpool.nil?
346236
pattern_list_ensure_workers(output_queue)
@@ -375,7 +265,7 @@ def process_batch(redis, output_queue, key, batch_size, sleep_time)
375265
# which should further improve the efficiency of the script
376266
rescue ::Redis::CommandError => e
377267
if e.to_s =~ /NOSCRIPT/ then
378-
@logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
268+
@logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e)
379269
load_batch_script(redis)
380270
retry
381271
else
@@ -414,20 +304,6 @@ def subscribe_stop
414304
@redis = nil
415305
end
416306

417-
# private
418-
def redis_runner
419-
begin
420-
@redis ||= connect
421-
yield
422-
rescue ::Redis::BaseError => e
423-
@logger.warn("Redis connection problem", :exception => e)
424-
# Reset the redis variable to trigger reconnect
425-
@redis = nil
426-
Stud.stoppable_sleep(1) { stop? }
427-
retry if !stop?
428-
end
429-
end
430-
431307
# private
432308
def channel_runner(output_queue)
433309
redis_runner do
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# encoding: utf-8
2+
require "logstash/namespace"
3+
4+
module LogStash module PluginMixins module RedisConnection
5+
def self.included(base)
6+
base.extend(self)
7+
base.setup_redis_connection_config
8+
end
9+
10+
public
11+
12+
def setup_redis_connection_config
13+
# The hostname of your Redis server.
14+
config :host, :validate => :string, :default => "127.0.0.1"
15+
16+
# The port to connect on.
17+
config :port, :validate => :number, :default => 6379
18+
19+
# SSL
20+
config :ssl, :validate => :boolean, :default => false
21+
22+
# The unix socket path to connect on. Will override host and port if defined.
23+
# There is no unix socket path by default.
24+
config :path, :validate => :string
25+
26+
# The Redis database number.
27+
config :db, :validate => :number, :default => 0
28+
29+
# Initial connection timeout in seconds.
30+
config :timeout, :validate => :number, :default => 5
31+
32+
# Password to authenticate with. There is no authentication by default.
33+
config :password, :validate => :password
34+
35+
# Redefined Redis commands to be passed to the Redis client.
36+
config :command_map, :validate => :hash, :default => {}
37+
end
38+
39+
# public API
40+
# use to store a proc that can provide a Redis instance or mock
41+
def add_external_redis_builder(builder) #callable
42+
@redis_builder = builder
43+
self
44+
end
45+
46+
# use to apply an instance directly and bypass the builder
47+
def use_redis(instance)
48+
@redis = instance
49+
self
50+
end
51+
52+
def new_redis_instance
53+
@redis_builder.call
54+
end
55+
56+
private
57+
58+
# private
59+
def redis_params
60+
if @path.nil?
61+
connection_params = {
62+
:host => @host,
63+
:port => @port
64+
}
65+
else
66+
@logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'")
67+
connection_params = {
68+
:path => @path
69+
}
70+
end
71+
72+
base_params = {
73+
:timeout => @timeout,
74+
:db => @db,
75+
:password => @password.nil? ? nil : @password.value,
76+
:ssl => @ssl
77+
}
78+
79+
connection_params.merge(base_params)
80+
end
81+
82+
# private
83+
def internal_redis_builder
84+
::Redis.new(redis_params)
85+
end
86+
87+
# private
88+
def reset_redis
89+
return if @redis.nil? || !@redis.connected?
90+
91+
@redis.quit rescue nil
92+
@redis = nil
93+
end
94+
95+
# private
96+
def connect(batch=false)
97+
redis = new_redis_instance
98+
# register any renamed Redis commands
99+
if @command_map.any?
100+
client_command_map = redis.client.command_map
101+
@command_map.each do |name, renamed|
102+
client_command_map[name.to_sym] = renamed.to_sym
103+
end
104+
end
105+
load_batch_script(redis) if batch
106+
redis
107+
end
108+
109+
# private
110+
def load_batch_script(redis)
111+
#A Redis Lua EVAL script to fetch a count of keys
112+
redis_script = <<EOF
113+
local batchsize = tonumber(ARGV[1])
114+
local result = redis.call(\'#{@command_map.fetch('lrange', 'lrange')}\', KEYS[1], 0, batchsize)
115+
redis.call(\'#{@command_map.fetch('ltrim', 'ltrim')}\', KEYS[1], batchsize + 1, -1)
116+
return result
117+
EOF
118+
@redis_script_sha = redis.script(:load, redis_script)
119+
end
120+
121+
# private
122+
def redis_runner(batched=false)
123+
begin
124+
@redis ||= connect(batched)
125+
yield
126+
rescue ::Redis::BaseError => e
127+
@logger.warn("Redis connection problem", :exception => e)
128+
# Reset the redis variable to trigger reconnect
129+
@redis = nil
130+
Stud.stoppable_sleep(1) { stop? }
131+
retry unless stop?
132+
end
133+
end
134+
end end end # RedisConnection PluginMixins LogStash

0 commit comments

Comments
 (0)