List pattern support #81

Open: wants to merge 6 commits into base: main
48 changes: 42 additions & 6 deletions docs/index.asciidoc
@@ -84,12 +84,14 @@ Redis allows for the renaming or disabling of commands in its protocol, see: ht
===== `data_type`

* This is a required setting.
* Value can be any of: `list`, `channel`, `pattern_channel`
* Value can be any of: `list`, `pattern_list`, `channel`, `pattern_channel`
* There is no default value for this setting.

Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
key. If `data_type` is `pattern_list`, then we will spawn a number of worker
threads that will LPOP from keys matching that pattern. If `data_type` is
`channel`, then we will SUBSCRIBE to the key. If `data_type` is
`pattern_channel`, then we will PSUBSCRIBE to the key.
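
The mapping described above can be summarized as a small lookup. This is only an illustrative sketch: the constant `REDIS_COMMAND_FOR` and the helper `redis_command_for` are hypothetical names that do not exist in the plugin, which selects its runner with an `if`/`elsif` chain in `register`.

```ruby
# Hypothetical lookup table (not part of the plugin): which Redis command
# each documented `data_type` value ends up issuing against the key.
REDIS_COMMAND_FOR = {
  "list"            => "BLPOP",
  "pattern_list"    => "LPOP",
  "channel"         => "SUBSCRIBE",
  "pattern_channel" => "PSUBSCRIBE"
}.freeze

# Returns the command name, or raises for a value outside the allowed set.
def redis_command_for(data_type)
  REDIS_COMMAND_FOR.fetch(data_type) do
    raise ArgumentError, "unsupported data_type: #{data_type.inspect}"
  end
end

command = redis_command_for("pattern_list")
```

Using `fetch` with a block makes an unsupported value fail loudly instead of silently returning `nil`.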

[id="plugins-{type}s-{plugin}-db"]
===== `db`
@@ -125,6 +127,7 @@ The unix socket path of your Redis server.

The name of a Redis list or channel.


[id="plugins-{type}s-{plugin}-password"]
===== `password`

@@ -133,6 +136,37 @@ The name of a Redis list or channel.

Password to authenticate with. There is no authentication by default.


[id="plugins-{type}s-{plugin}-pattern_list_max_items"]
===== `pattern_list_max_items`

* Value type is <<number,number>>
* Default value is `1000`

Maximum number of items a single worker thread processes when `data_type` is `pattern_list`.
Once the list is empty or this many items have been processed, the thread exits. A new thread
is started if any non-empty list matching the pattern has no consumer.
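
The per-worker limit can be pictured with a minimal sketch that uses an in-memory array in place of a Redis list. The helper name `drain_bounded` is hypothetical, and `Array#shift` stands in for `LPOP` on the assumption that both return `nil` once the list is empty.

```ruby
# In-memory stand-in for a Redis list: Array#shift plays the role of LPOP.
# Hypothetical helper, not part of the plugin.
def drain_bounded(list, max_items)
  taken = []
  max_items.times do
    item = list.shift
    break if item.nil? # list is empty: the worker would exit here
    taken << item
  end
  taken
end

backlog     = %w[a b c d e]
first_pass  = drain_bounded(backlog, 3) # stops at the item limit
second_pass = drain_bounded(backlog, 3) # stops because the list ran dry
```

With a limit of 3, the first pass takes three items and the second pass takes the remaining two, which mirrors a worker exiting and a fresh one being started for the same key.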


[id="plugins-{type}s-{plugin}-pattern_list_threadpool_sleep"]
===== `pattern_list_threadpool_sleep`

* Value type is <<number,number>>
* Default value is `0.2`

Time, in seconds, to sleep in the main loop after checking whether more worker threads can or need to be spawned.
Applies only when `data_type` is `pattern_list`.


[id="plugins-{type}s-{plugin}-pattern_list_threads"]
===== `pattern_list_threads`

* Value type is <<number,number>>
* Default value is `20`

Maximum number of worker threads to spawn when `data_type` is `pattern_list`.
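
As a rough illustration of how a cap on workers interacts with the one-consumer-per-key rule, the sketch below picks which keys should get a new worker. The helper `keys_needing_workers` and the `logs:*` key names are hypothetical. The sketch also simplifies the pull request's approach, which shuffles the matching keys and asks the thread pool for its remaining capacity.

```ruby
require "set"

# Hypothetical helper: skip keys that already have a consumer and never
# return more keys than there are free worker slots.
def keys_needing_workers(matching_keys, busy_keys, capacity)
  return [] if capacity <= 0
  matching_keys.reject { |key| busy_keys.include?(key) }.first(capacity)
end

max_threads = 3
busy        = Set.new(["logs:a"])
matching    = ["logs:a", "logs:b", "logs:c", "logs:d"]
to_start    = keys_needing_workers(matching, busy, max_threads - busy.size)
```

Here one of three slots is taken, so only two of the three idle keys are selected. The remaining key would be picked up on a later pass of the main loop.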


[id="plugins-{type}s-{plugin}-port"]
===== `port`

@@ -141,8 +175,9 @@ Password to authenticate with. There is no authentication by default.

The port to connect on.


[id="plugins-{type}s-{plugin}-ssl"]
===== `ssl`
===== `ssl`

* Value type is <<boolean,boolean>>
* Default value is `false`
@@ -157,7 +192,6 @@ Enable SSL support.
* Default value is `1`



[id="plugins-{type}s-{plugin}-timeout"]
===== `timeout`

@@ -166,7 +200,9 @@ Enable SSL support.

Initial connection timeout in seconds.



[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]

:default_codec!:
:default_codec!:
159 changes: 146 additions & 13 deletions lib/logstash/inputs/redis.rb
@@ -3,6 +3,8 @@
require "logstash/inputs/base"
require "logstash/inputs/threadable"
require 'redis'
require 'concurrent'
require 'concurrent/executors'
require "stud/interval"

# This input will read events from a Redis instance; it supports both Redis channels and lists.
@@ -50,25 +52,51 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
config :key, :validate => :string, :required => true

# Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
# key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
# If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true
# key. If `data_type` is `pattern_list`, then we will spawn a number of worker
# threads that will LPOP from keys matching that pattern. If `data_type` is
# `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`,
# then we will PSUBSCRIBE to the key.
config :data_type, :validate => [ "list", "pattern_list", "channel", "pattern_channel" ], :required => true

# The number of events to return from Redis using EVAL.
config :batch_count, :validate => :number, :default => 125

# Redefined Redis commands to be passed to the Redis client.
config :command_map, :validate => :hash, :default => {}

# Maximum number of worker threads to spawn when `data_type` is `pattern_list`.
config :pattern_list_threads, :validate => :number, :default => 20

# Maximum number of items a single worker thread processes when `data_type` is `pattern_list`.
# Once the list is empty or this many items have been processed, the thread exits. A new thread
# is started if any non-empty list matching the pattern has no consumer.
config :pattern_list_max_items, :validate => :number, :default => 1000

# Time, in seconds, to sleep in the main loop after checking whether more worker threads can or need to be spawned.
# Applies only when `data_type` is `pattern_list`.
config :pattern_list_threadpool_sleep, :validate => :number, :default => 0.2

public

def init_threadpool
@threadpool ||= Concurrent::ThreadPoolExecutor.new(
min_threads: @pattern_list_threads,
max_threads: @pattern_list_threads,
max_queue: 2 * @pattern_list_threads
)
@current_workers ||= Concurrent::Set.new
end

def register
@redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}"

# just switch on data_type once
if @data_type == 'list' || @data_type == 'dummy'
@run_method = method(:list_runner)
@stop_method = method(:list_stop)
elsif @data_type == 'pattern_list'
@run_method = method(:pattern_list_runner)
@stop_method = method(:pattern_list_stop)
elsif @data_type == 'channel'
@run_method = method(:channel_runner)
@stop_method = method(:subscribe_stop)
@@ -77,8 +105,6 @@ def register
@stop_method = method(:subscribe_stop)
end

@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)

@identity = "#{@redis_url} #{@data_type}:#{@key}"
@logger.info("Registering Redis", :identity => @identity)
end # def register
@@ -102,7 +128,7 @@ def batched?

# private
def is_list_type?
@data_type == 'list'
@data_type == 'list' || @data_type == 'pattern_list'
end

# private
@@ -169,7 +195,7 @@ def queue_event(msg, output_queue, channel=nil)
end

# private
def list_stop
def reset_redis
redis = @redis # might change during method invocation
return if redis.nil? || !redis.connected?

@@ -179,8 +205,14 @@ def list_stop
@redis = nil
end

# private
def list_stop
reset_redis
end

# private
def list_runner(output_queue)
@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
while !stop?
begin
@redis ||= connect
@@ -192,16 +224,113 @@ def list_runner(output_queue)
end
end

def list_batch_listener(redis, output_queue)
# private
def reset_threadpool
return if @threadpool.nil?
@threadpool.shutdown
@threadpool.wait_for_termination
@threadpool = nil
end

# private
def pattern_list_stop
reset_redis
reset_threadpool
end

# private
def pattern_list_process_item(redis, output_queue, key)
if stop?
@logger.debug("Breaking from thread #{key} as it was requested to stop")
return false
end
value = redis.lpop(key)
return false if value.nil?
queue_event(value, output_queue)
true
end

# private
def pattern_list_single_processor(redis, output_queue, key)
(0...@pattern_list_max_items).each do
break unless pattern_list_process_item(redis, output_queue, key)
end
end

# private
def pattern_list_batch_processor(redis, output_queue, key)
items_left = @pattern_list_max_items
while items_left > 0
limit = [items_left, @batch_count].min
processed = process_batch(redis, output_queue, key, limit, 0)
# A short batch (including an empty one) means the list is drained.
return if processed < limit
items_left -= processed
end
end

# private
def pattern_list_worker_consume(output_queue, key)
begin
results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])
results.each do |item|
queue_event(item, output_queue)
redis ||= connect
@pattern_list_processor.call(redis, output_queue, key)
rescue ::Redis::BaseError => e
@logger.warn("Redis connection problem in thread for key #{key}. Sleeping a while before exiting thread.", :exception => e)
sleep 1
return
ensure
redis.quit rescue nil
end
end

# private
def threadpool_capacity?
@threadpool.remaining_capacity > 0
end

# private
def pattern_list_launch_worker(output_queue, key)
@current_workers.add(key)
@threadpool.post do
begin
pattern_list_worker_consume(output_queue, key)
ensure
@current_workers.delete(key)
end
end
end

if results.size.zero?
sleep BATCH_EMPTY_SLEEP
# private
def pattern_list_ensure_workers(output_queue)
return unless threadpool_capacity?
redis_runner do
@redis.keys(@key).shuffle.each do |key|
next if @current_workers.include?(key)
pattern_list_launch_worker(output_queue, key)
break unless threadpool_capacity?
end
end
end

# private
def pattern_list_runner(output_queue)
@pattern_list_processor = batched? ? method(:pattern_list_batch_processor) : method(:pattern_list_single_processor)
while !stop?
init_threadpool if @threadpool.nil?
pattern_list_ensure_workers(output_queue)
sleep(@pattern_list_threadpool_sleep)
end
end

def process_batch(redis, output_queue, key, batch_size, sleep_time)
begin
results = redis.evalsha(@redis_script_sha, [key], [batch_size-1])
results.each do |item|
queue_event(item, output_queue)
end
sleep sleep_time if results.size.zero? && sleep_time > 0
results.size

# Below is a commented-out implementation of 'batch fetch'
# using pipelined LPOP calls. This in practice has been observed to
@@ -230,6 +359,10 @@ def list_batch_listener(redis, output_queue)
end
end

def list_batch_listener(redis, output_queue)
process_batch(redis, output_queue, @key, @batch_count, BATCH_EMPTY_SLEEP)
end

def list_single_listener(redis, output_queue)
item = redis.blpop(@key, 0, :timeout => 1)
return unless item # from timeout or other conditions