Skip to content

Commit 3d387c0

Browse files
committed
imlement issue #439 : Retry capability for write(final BatchPoints batchPoints) as well
1 parent 24c5542 commit 3d387c0

File tree

4 files changed

+139
-2
lines changed

4 files changed

+139
-2
lines changed

src/main/java/org/influxdb/InfluxDB.java

+17
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,23 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti
312312
*/
313313
public void write(final BatchPoints batchPoints);
314314

315+
/**
316+
* Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol.
317+
*
318+
* If batching is enabled with appropriate {@code BatchOptions} settings
319+
* ({@code BatchOptions.bufferLimit} greater than {@code BatchOptions.actions})
320+
* This method will try to retry in case of some recoverable errors.
321+
* Otherwise it just works as {@link #write(BatchPoints)}
322+
*
323+
* @see <a href="https://github.com/influxdb/influxdb/pull/2696">2696</a>
324+
* @see <a href="https://github.com/influxdata/influxdb-java/wiki/Handling-errors-of-InfluxDB-under-high-load">
325+
* Retry worth errors</a>
326+
*
327+
* @param batchPoints
328+
* the points to write in BatchPoints.
329+
*/
330+
public void writeWithRetry(final BatchPoints batchPoints);
331+
315332
/**
316333
* Write a set of Points to the influxdb database with the string records.
317334
*

src/main/java/org/influxdb/impl/BatchProcessor.java

+4
Original file line numberDiff line numberDiff line change
@@ -376,4 +376,8 @@ public ConsistencyLevel getConsistencyLevel() {
376376
return consistencyLevel;
377377
}
378378

379+
BatchWriter getBatchWriter() {
380+
return batchWriter;
381+
}
382+
379383
}

src/main/java/org/influxdb/impl/InfluxDBImpl.java

+9
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.net.UnknownHostException;
4747
import java.nio.charset.StandardCharsets;
4848
import java.util.ArrayList;
49+
import java.util.Collections;
4950
import java.util.List;
5051
import java.util.concurrent.Executors;
5152
import java.util.concurrent.ThreadFactory;
@@ -425,6 +426,14 @@ public void write(final BatchPoints batchPoints) {
425426
lineProtocol));
426427
}
427428

429+
@Override
430+
public void writeWithRetry(final BatchPoints batchPoints) {
431+
if (isBatchEnabled()) {
432+
batchProcessor.getBatchWriter().write(Collections.singleton(batchPoints));
433+
} else {
434+
write(batchPoints);
435+
}
436+
}
428437

429438
@Override
430439
public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,

src/test/java/org/influxdb/BatchOptionsTest.java

+109-2
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,100 @@ protected void check(InvocationOnMock invocation) {
484484
spy.deleteDatabase(dbName);
485485
}
486486
}
487-
487+
488+
489+
@Test
490+
public void testWriteWithRetryOnRecoverableError() throws InterruptedException {
491+
String dbName = "write_unittest_" + System.currentTimeMillis();
492+
InfluxDB spy = spy(influxDB);
493+
doAnswer(new Answer() {
494+
boolean firstCall = true;
495+
496+
@Override
497+
public Object answer(InvocationOnMock invocation) throws Throwable {
498+
if (firstCall) {
499+
firstCall = false;
500+
throw new InfluxDBException("error");
501+
} else {
502+
return invocation.callRealMethod();
503+
}
504+
}
505+
}).when(spy).write(any(BatchPoints.class));
506+
try {
507+
BiConsumer<Iterable<Point>, Throwable> mockHandler = mock(BiConsumer.class);
508+
BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100);
509+
510+
spy.createDatabase(dbName);
511+
spy.setDatabase(dbName);
512+
spy.enableBatch(options);
513+
514+
BatchPoints batchPoints = createBatchPoints(dbName, "m0", 200);
515+
spy.writeWithRetry(batchPoints);
516+
Thread.sleep(500);
517+
verify(mockHandler, never()).accept(any(), any());
518+
519+
verify(spy, times(2)).write(any(BatchPoints.class));
520+
521+
QueryResult result = influxDB.query(new Query("select * from m0", dbName));
522+
Assertions.assertNotNull(result.getResults().get(0).getSeries());
523+
Assertions.assertEquals(200, result.getResults().get(0).getSeries().get(0).getValues().size());
524+
525+
} finally {
526+
spy.disableBatch();
527+
spy.deleteDatabase(dbName);
528+
}
529+
}
530+
531+
@Test
532+
public void testWriteWithRetryOnUnrecoverableError() throws InterruptedException {
533+
534+
String dbName = "write_unittest_" + System.currentTimeMillis();
535+
InfluxDB spy = spy((InfluxDB) influxDB);
536+
doThrow(DatabaseNotFoundException.class).when(spy).write(any(BatchPoints.class));
537+
538+
try {
539+
BiConsumer<Iterable<Point>, Throwable> mockHandler = mock(BiConsumer.class);
540+
BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100);
541+
542+
spy.createDatabase(dbName);
543+
spy.setDatabase(dbName);
544+
spy.enableBatch(options);
545+
546+
BatchPoints batchPoints = createBatchPoints(dbName, "m0", 200);
547+
spy.writeWithRetry(batchPoints);
548+
Thread.sleep(500);
549+
550+
verify(mockHandler, times(1)).accept(any(), any());
551+
552+
QueryResult result = influxDB.query(new Query("select * from m0", dbName));
553+
Assertions.assertNull(result.getResults().get(0).getSeries());
554+
Assertions.assertNull(result.getResults().get(0).getError());
555+
} finally {
556+
spy.disableBatch();
557+
spy.deleteDatabase(dbName);
558+
}
559+
560+
}
561+
562+
@Test
563+
public void testWriteWithRetryOnBatchingNotEnabled() {
564+
String dbName = "write_unittest_" + System.currentTimeMillis();
565+
try {
566+
567+
influxDB.createDatabase(dbName);
568+
influxDB.setDatabase(dbName);
569+
570+
BatchPoints batchPoints = createBatchPoints(dbName, "m0", 200);
571+
influxDB.writeWithRetry(batchPoints);
572+
573+
QueryResult result = influxDB.query(new Query("select * from m0", dbName));
574+
Assertions.assertNotNull(result.getResults().get(0).getSeries());
575+
Assertions.assertEquals(200, result.getResults().get(0).getSeries().get(0).getValues().size());
576+
} finally {
577+
influxDB.deleteDatabase(dbName);
578+
}
579+
580+
}
488581
void writeSomePoints(InfluxDB influxDB, String measurement, int firstIndex, int lastIndex) {
489582
for (int i = firstIndex; i <= lastIndex; i++) {
490583
Point point = Point.measurement(measurement)
@@ -514,7 +607,21 @@ void write20Points(InfluxDB influxDB) {
514607
void writeSomePoints(InfluxDB influxDB, int n) {
515608
writeSomePoints(influxDB, 0, n - 1);
516609
}
517-
610+
611+
private BatchPoints createBatchPoints(String dbName, String measurement, int n) {
612+
BatchPoints batchPoints = BatchPoints.database(dbName).build();
613+
for (int i = 1; i <= n; i++) {
614+
Point point = Point.measurement(measurement)
615+
.time(i,TimeUnit.MILLISECONDS)
616+
.addField("f1", (double) i)
617+
.addField("f2", (double) (i) * 1.1)
618+
.addField("f3", "f_v3").build();
619+
batchPoints.point(point);
620+
}
621+
622+
return batchPoints;
623+
}
624+
518625
static String createErrorBody(String errorMessage) {
519626
return MessageFormat.format("'{' \"error\": \"{0}\" '}'", errorMessage);
520627
}

0 commit comments

Comments
 (0)