1
1
package org .influxdb ;
2
2
3
- import static org .mockito .ArgumentMatchers .any ;
4
- import static org .mockito .Mockito .atLeastOnce ;
5
- import static org .mockito .Mockito .doAnswer ;
6
- import static org .mockito .Mockito .doThrow ;
7
- import static org .mockito .Mockito .mock ;
8
- import static org .mockito .Mockito .spy ;
9
- import static org .mockito .Mockito .times ;
10
- import static org .mockito .Mockito .verify ;
11
-
12
3
import java .io .IOException ;
13
- import java .util .List ;
14
- import java .util .function .BiConsumer ;
15
4
16
5
import org .influxdb .InfluxDB .ResponseFormat ;
17
- import org .influxdb .InfluxDBException .DatabaseNotFoundException ;
18
- import org .influxdb .dto .BatchPoints ;
19
- import org .influxdb .dto .Point ;
20
- import org .influxdb .dto .Query ;
21
- import org .influxdb .dto .QueryResult ;
22
- import org .influxdb .dto .QueryResult .Series ;
23
- import org .junit .jupiter .api .Assertions ;
24
6
import org .junit .jupiter .api .BeforeEach ;
25
- import org .junit .jupiter .api .Test ;
26
7
import org .junit .jupiter .api .condition .EnabledIfEnvironmentVariable ;
27
8
import org .junit .platform .runner .JUnitPlatform ;
28
9
import org .junit .runner .RunWith ;
29
- import org .mockito .invocation .InvocationOnMock ;
30
10
31
11
/**
32
- * Test the InfluxDB API over MessagePack format
12
+ * Test the BatchOptions with MessagePack format
33
13
*
34
14
* @author hoan.le [at] bonitoo.io
35
15
*
@@ -43,229 +23,4 @@ public class MessagePackBatchOptionsTest extends BatchOptionsTest {
43
23
public void setUp () throws InterruptedException , IOException {
44
24
influxDB = TestUtils .connectToInfluxDB (ResponseFormat .MSGPACK );
45
25
}
46
-
47
- /**
48
- * Test the implementation of {@link BatchOptions#flushDuration(int)} }.
49
- *
50
- * @throws InterruptedException
51
- */
52
- @ Override
53
- @ Test
54
- public void testFlushDuration () throws InterruptedException {
55
- String dbName = "write_unittest_" + System .currentTimeMillis ();
56
- try {
57
- BatchOptions options = BatchOptions .DEFAULTS .flushDuration (200 );
58
- influxDB .createDatabase (dbName );
59
- influxDB .setDatabase (dbName );
60
- influxDB .enableBatch (options );
61
- write20Points (influxDB );
62
-
63
- // check no points writen to DB before the flush duration
64
- QueryResult result = influxDB .query (new Query ("select * from weather" , dbName ));
65
- List <Series > series = result .getResults ().get (0 ).getSeries ();
66
- Assertions .assertNull (series );
67
- Assertions .assertNull (result .getResults ().get (0 ).getError ());
68
-
69
- // wait for at least one flush
70
- Thread .sleep (500 );
71
- result = influxDB .query (new Query ("select * from weather" , dbName ));
72
-
73
- // check points written already to DB
74
- Assertions .assertEquals (20 , result .getResults ().get (0 ).getSeries ().get (0 ).getValues ().size ());
75
- } finally {
76
- this .influxDB .disableBatch ();
77
- this .influxDB .deleteDatabase (dbName );
78
- }
79
- }
80
-
81
- /**
82
- * Test the implementation of {@link BatchOptions#jitterDuration(int)} }.
83
- *
84
- * @throws InterruptedException
85
- */
86
- @ Override
87
- @ Test
88
- public void testJitterDuration () throws InterruptedException {
89
-
90
- String dbName = "write_unittest_" + System .currentTimeMillis ();
91
- try {
92
- BatchOptions options = BatchOptions .DEFAULTS .flushDuration (100 ).jitterDuration (500 );
93
- influxDB .createDatabase (dbName );
94
- influxDB .setDatabase (dbName );
95
- influxDB .enableBatch (options );
96
- write20Points (influxDB );
97
-
98
- Thread .sleep (100 );
99
-
100
- QueryResult result = influxDB .query (new Query ("select * from weather" , dbName ));
101
- List <Series > series = result .getResults ().get (0 ).getSeries ();
102
- Assertions .assertNull (series );
103
- Assertions .assertNull (result .getResults ().get (0 ).getError ());
104
-
105
- // wait for at least one flush
106
- Thread .sleep (1000 );
107
- result = influxDB .query (new Query ("select * from weather" , dbName ));
108
- Assertions .assertEquals (20 , result .getResults ().get (0 ).getSeries ().get (0 ).getValues ().size ());
109
- } finally {
110
- influxDB .disableBatch ();
111
- influxDB .deleteDatabase (dbName );
112
- }
113
- }
114
-
115
- /**
116
- * Test the implementation of {@link BatchOptions#bufferLimit(int)} }. use a
117
- * bufferLimit that less than actions, then OneShotBatchWrite is used
118
- */
119
- @ Override
120
- @ Test
121
- public void testBufferLimitLessThanActions () throws InterruptedException {
122
-
123
- TestAnswer answer = new TestAnswer () {
124
-
125
- InfluxDBException influxDBException = InfluxDBException
126
- .buildExceptionForErrorState (createErrorBody (InfluxDBException .CACHE_MAX_MEMORY_SIZE_EXCEEDED_ERROR ));
127
-
128
- @ Override
129
- protected void check (InvocationOnMock invocation ) {
130
- if ((Boolean ) params .get ("throwException" )) {
131
- throw influxDBException ;
132
- }
133
- }
134
- };
135
-
136
- InfluxDB spy = spy (influxDB );
137
- // the spied influxDB.write(BatchPoints) will always throw InfluxDBException
138
- doAnswer (answer ).when (spy ).write (any (BatchPoints .class ));
139
-
140
- String dbName = "write_unittest_" + System .currentTimeMillis ();
141
- try {
142
- answer .params .put ("throwException" , true );
143
- BiConsumer <Iterable <Point >, Throwable > mockHandler = mock (BiConsumer .class );
144
- BatchOptions options = BatchOptions .DEFAULTS .bufferLimit (3 ).actions (4 ).flushDuration (100 )
145
- .exceptionHandler (mockHandler );
146
-
147
- spy .createDatabase (dbName );
148
- spy .setDatabase (dbName );
149
- spy .enableBatch (options );
150
- write20Points (spy );
151
-
152
- Thread .sleep (300 );
153
- verify (mockHandler , atLeastOnce ()).accept (any (), any ());
154
-
155
- QueryResult result = spy .query (new Query ("select * from weather" , dbName ));
156
- // assert 0 point written because of InfluxDBException and
157
- // OneShotBatchWriter did not retry
158
- List <Series > series = result .getResults ().get (0 ).getSeries ();
159
- Assertions .assertNull (series );
160
- Assertions .assertNull (result .getResults ().get (0 ).getError ());
161
-
162
- answer .params .put ("throwException" , false );
163
- write20Points (spy );
164
- Thread .sleep (300 );
165
- result = spy .query (new Query ("select * from weather" , dbName ));
166
- // assert all 20 points written to DB due to no exception
167
- Assertions .assertEquals (20 , result .getResults ().get (0 ).getSeries ().get (0 ).getValues ().size ());
168
- } finally {
169
- spy .disableBatch ();
170
- spy .deleteDatabase (dbName );
171
- }
172
- }
173
-
174
- /**
175
- * Test the implementation of {@link BatchOptions#bufferLimit(int)} }.
176
- * use a bufferLimit that greater than actions, then RetryCapableBatchWriter is used
177
- */
178
- @ Override
179
- @ Test
180
- public void testBufferLimitGreaterThanActions () throws InterruptedException {
181
- TestAnswer answer = new TestAnswer () {
182
-
183
- int nthCall = 0 ;
184
- InfluxDBException cacheMaxMemorySizeExceededException = InfluxDBException .buildExceptionForErrorState (createErrorBody (InfluxDBException .CACHE_MAX_MEMORY_SIZE_EXCEEDED_ERROR ));
185
- @ Override
186
- protected void check (InvocationOnMock invocation ) {
187
-
188
- switch (nthCall ++) {
189
- case 0 :
190
- throw InfluxDBException .buildExceptionForErrorState (createErrorBody (InfluxDBException .DATABASE_NOT_FOUND_ERROR ));
191
- case 1 :
192
- throw InfluxDBException .buildExceptionForErrorState (createErrorBody (InfluxDBException .CACHE_MAX_MEMORY_SIZE_EXCEEDED_ERROR ));
193
- default :
194
- break ;
195
- }
196
- }
197
- };
198
-
199
- InfluxDB spy = spy (influxDB );
200
- doAnswer (answer ).when (spy ).write (any (BatchPoints .class ));
201
-
202
- String dbName = "write_unittest_" + System .currentTimeMillis ();
203
- try {
204
- BiConsumer <Iterable <Point >, Throwable > mockHandler = mock (BiConsumer .class );
205
- BatchOptions options = BatchOptions .DEFAULTS .bufferLimit (10 ).actions (8 ).flushDuration (100 ).exceptionHandler (mockHandler );
206
-
207
- spy .createDatabase (dbName );
208
- spy .setDatabase (dbName );
209
- spy .enableBatch (options );
210
- writeSomePoints (spy , "measurement1" , 0 , 5 );
211
-
212
- Thread .sleep (300 );
213
- verify (mockHandler , atLeastOnce ()).accept (any (), any ());
214
-
215
- QueryResult result = spy .query (new Query ("select * from measurement1" , dbName ));
216
- //assert 0 point written because of non-retry capable DATABASE_NOT_FOUND_ERROR and RetryCapableBatchWriter did not retry
217
- List <Series > series = result .getResults ().get (0 ).getSeries ();
218
- Assertions .assertNull (series );
219
- Assertions .assertNull (result .getResults ().get (0 ).getError ());
220
-
221
- writeSomePoints (spy , "measurement2" , 0 , 5 );
222
-
223
- Thread .sleep (300 );
224
-
225
- result = spy .query (new Query ("select * from measurement2" , dbName ));
226
- //assert all 6 point written because of retry capable CACHE_MAX_MEMORY_SIZE_EXCEEDED_ERROR and RetryCapableBatchWriter did retry
227
- Assertions .assertEquals (6 , result .getResults ().get (0 ).getSeries ().get (0 ).getValues ().size ());
228
- }
229
- finally {
230
- spy .disableBatch ();
231
- spy .deleteDatabase (dbName );
232
- }
233
- }
234
-
235
-
236
- /**
237
- * Test the implementation of {@link BatchOptions#exceptionHandler(BiConsumer)} }.
238
- * @throws InterruptedException
239
- */
240
- @ Override
241
- @ Test
242
- public void testHandlerOnRetryImpossible () throws InterruptedException {
243
-
244
- String dbName = "write_unittest_" + System .currentTimeMillis ();
245
- InfluxDB spy = spy (influxDB );
246
- doThrow (DatabaseNotFoundException .class ).when (spy ).write (any (BatchPoints .class ));
247
-
248
- try {
249
- BiConsumer <Iterable <Point >, Throwable > mockHandler = mock (BiConsumer .class );
250
- BatchOptions options = BatchOptions .DEFAULTS .exceptionHandler (mockHandler ).flushDuration (100 );
251
-
252
- spy .createDatabase (dbName );
253
- spy .setDatabase (dbName );
254
- spy .enableBatch (options );
255
-
256
- writeSomePoints (spy , 1 );
257
-
258
- Thread .sleep (200 );
259
- verify (mockHandler , times (1 )).accept (any (), any ());
260
-
261
- QueryResult result = influxDB .query (new Query ("select * from weather" , dbName ));
262
- List <Series > series = result .getResults ().get (0 ).getSeries ();
263
- Assertions .assertNull (series );
264
- Assertions .assertNull (result .getResults ().get (0 ).getError ());
265
- } finally {
266
- spy .disableBatch ();
267
- spy .deleteDatabase (dbName );
268
- }
269
-
270
- }
271
26
}
0 commit comments