@@ -16,11 +16,17 @@ def populate(key, event_count)
16
16
end
17
17
end
18
18
19
- def process ( conf , event_count )
19
+ def wait_events ( conf , event_count )
20
20
events = input ( conf ) do |_ , queue |
21
21
sleep 0.1 until queue . size >= event_count
22
22
queue . size . times . map { queue . pop }
23
23
end
24
+ expect ( events . size ) . to eq event_count
25
+ events
26
+ end
27
+
28
+ def process ( conf , event_count )
29
+ events = wait_events ( conf , event_count )
24
30
# due multiple workers we get events out-of-order in the output
25
31
events . sort! { |a , b | a . get ( 'sequence' ) <=> b . get ( 'sequence' ) }
26
32
expect ( events [ 0 ] . get ( 'sequence' ) ) . to eq ( 0 )
@@ -73,7 +79,7 @@ def process(conf, event_count)
73
79
input {
74
80
redis {
75
81
type => "blah"
76
- key => "#{ key } .*"
82
+ key => "#{ key_base } .*"
77
83
data_type => "pattern_list"
78
84
batch_count => 1
79
85
}
@@ -85,7 +91,7 @@ def process(conf, event_count)
85
91
total_event_count += event_count
86
92
populate ( "#{ key_base } .#{ idx } " , event_count )
87
93
end
88
- process ( conf , total_event_count )
94
+ wait_events ( conf , total_event_count )
89
95
end
90
96
91
97
it "should read events from a list pattern using batch_count (default 125)" do
@@ -94,7 +100,7 @@ def process(conf, event_count)
94
100
input {
95
101
redis {
96
102
type => "blah"
97
- key => "#{ key } .*"
103
+ key => "#{ key_base } .*"
98
104
data_type => "pattern_list"
99
105
batch_count => 125
100
106
}
@@ -106,7 +112,7 @@ def process(conf, event_count)
106
112
total_event_count += event_count
107
113
populate ( "#{ key_base } .#{ idx } " , event_count )
108
114
end
109
- process ( conf , total_event_count )
115
+ wait_events ( conf , total_event_count )
110
116
end
111
117
end
112
118
0 commit comments