diff --git a/CHANGELOG.md b/CHANGELOG.md index f5db57f53..e593f0a49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Support chunking - Add a databaseExists method to InfluxDB interface - [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Pending asynchronous writes can be explicitly flushed via `InfluxDB.flush()`. + - Add a listener to notify asynchronous errors during batch flushes (https://github.com/influxdata/influxdb-java/pull/318). #### Fixes diff --git a/README.md b/README.md index 8199cc639..94e1eb2e2 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,16 @@ Query query = new Query("SELECT idle FROM cpu", dbName); influxDB.query(query); influxDB.deleteDatabase(dbName); ``` -Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a gracefull application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()``` +Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a graceful application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()``` + +Also note that any errors that happen during the batch flush won't leak into the caller of the `write` method. By default, any kind of errors will be just logged with "SEVERE" level. + +If you need to be notified and do some custom logic when such asynchronous errors happen, you can add an error handler with a `BiConsumer, Throwable>` using the overloaded `enableBatch` method: + +```java +// Flush every 2000 Points, at least every 100ms +influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), (failedPoints, throwable) -> { /* custom error handling here */ }); +``` ### Advanced Usages: diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 698a470b1..cad445c05 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -1,16 +1,17 @@ package org.influxdb; -import java.util.List; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; +import java.util.List; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + /** * Interface with all available methods to access a InfluxDB database. * @@ -101,6 +102,16 @@ public String value() { */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit); + /** + * Enable batching of single Point writes as + * {@link #enableBatch(int, int, TimeUnit, ThreadFactory, BiConsumer, Throwable>)} + * using with a exceptionHandler that does nothing. + * + * @see #enableBatch(int, int, TimeUnit, ThreadFactory, BiConsumer, Throwable>) + */ + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ThreadFactory threadFactory); + /** * Enable batching of single Point writes to speed up writes significant. If either actions or * flushDurations is reached first, a batch write is issued. @@ -113,10 +124,13 @@ public String value() { * the time to wait at most. * @param flushDurationTimeUnit * @param threadFactory + * @param exceptionHandler + * a consumer function to handle asynchronous errors * @return the InfluxDB instance to be able to use it in a fluent manner. */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, - final ThreadFactory threadFactory); + final ThreadFactory threadFactory, + final BiConsumer, Throwable> exceptionHandler); /** * Disable Batching. diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 0ad0d0ae9..cea38f0b4 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -10,6 +10,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,6 +33,7 @@ public class BatchProcessor { private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName()); protected final BlockingQueue queue; private final ScheduledExecutorService scheduler; + private final BiConsumer, Throwable> exceptionHandler; final InfluxDBImpl influxDB; final int actions; private final TimeUnit flushIntervalUnit; @@ -46,6 +48,7 @@ public static final class Builder { private int actions; private TimeUnit flushIntervalUnit; private int flushInterval; + private BiConsumer, Throwable> exceptionHandler = (entries, throwable) -> { }; /** * @param threadFactory @@ -92,6 +95,19 @@ public Builder interval(final int interval, final TimeUnit unit) { return this; } + /** + * A callback to be used when an error occurs during a batchwrite. + * + * @param handler + * the handler + * + * @return this Builder to use it fluent + */ + public Builder exceptionHandler(final BiConsumer, Throwable> handler) { + this.exceptionHandler = handler; + return this; + } + /** * Create the BatchProcessor. * @@ -103,8 +119,9 @@ public BatchProcessor build() { Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0"); Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null"); Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null"); + Preconditions.checkNotNull(this.exceptionHandler, "exceptionHandler may not be null"); return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit, - this.flushInterval); + this.flushInterval, exceptionHandler); } } @@ -164,14 +181,16 @@ public static Builder builder(final InfluxDB influxDB) { } BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, - final TimeUnit flushIntervalUnit, final int flushInterval) { + final TimeUnit flushIntervalUnit, final int flushInterval, + final BiConsumer, Throwable> exceptionHandler) { super(); this.influxDB = influxDB; this.actions = actions; this.flushIntervalUnit = flushIntervalUnit; this.flushInterval = flushInterval; this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); - if (actions > 1 && actions < Integer.MAX_VALUE) { + this.exceptionHandler = exceptionHandler; + if (actions > 1 && actions < Integer.MAX_VALUE) { this.queue = new LinkedBlockingQueue<>(actions); } else { this.queue = new LinkedBlockingQueue<>(); @@ -187,6 +206,7 @@ public void run() { } void write() { + List currentBatch = null; try { if (this.queue.isEmpty()) { return; @@ -197,9 +217,11 @@ void write() { Map> udpPortToBatchPoints = Maps.newHashMap(); List batchEntries = new ArrayList<>(this.queue.size()); this.queue.drainTo(batchEntries); + currentBatch = new ArrayList<>(batchEntries.size()); for (AbstractBatchEntry batchEntry : batchEntries) { Point point = batchEntry.getPoint(); + currentBatch.add(point); if (batchEntry instanceof HttpBatchEntry) { HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry); String dbName = httpBatchEntry.getDb(); @@ -232,6 +254,7 @@ void write() { } } catch (Throwable t) { // any exception wouldn't stop the scheduler + exceptionHandler.accept(currentBatch, t); LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t); } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 85e3d5d4a..884787cab 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.function.Consumer; /** @@ -160,15 +161,24 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, @Override public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) { + enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (points, throwable) -> { }); + return this; + } + + @Override + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ThreadFactory threadFactory, + final BiConsumer, Throwable> exceptionHandler) { if (this.batchEnabled.get()) { throw new IllegalStateException("BatchProcessing is already enabled."); } this.batchProcessor = BatchProcessor - .builder(this) - .actions(actions) - .interval(flushDuration, flushDurationTimeUnit) - .threadFactory(threadFactory) - .build(); + .builder(this) + .actions(actions) + .exceptionHandler(exceptionHandler) + .interval(flushDuration, flushDurationTimeUnit) + .threadFactory(threadFactory) + .build(); this.batchEnabled.set(true); return this; } diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 82a2e8de4..b43a6a322 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -1,14 +1,17 @@ package org.influxdb.impl; +import static org.hamcrest.CoreMatchers.hasItems; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; @@ -41,6 +44,25 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); } + @Test + public void testSchedulerExceptionHandlingCallback() throws InterruptedException, IOException { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) + .interval(1, TimeUnit.NANOSECONDS).exceptionHandler(mockHandler).build(); + + doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); + + Point point = Point.measurement("cpu").field("6", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", ""); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", ""); + + batchProcessor.put(batchEntry1); + Thread.sleep(200); // wait for scheduler + + verify(mockHandler, times(1)).accept(argThat(hasItems(point, point)), any(RuntimeException.class)); + } + @Test public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); @@ -91,14 +113,14 @@ public void testActionsIsZero() throws InterruptedException, IOException { BatchProcessor.builder(mockInfluxDB).actions(0) .interval(1, TimeUnit.NANOSECONDS).build(); } - + @Test(expected = IllegalArgumentException.class) public void testIntervalIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor.builder(mockInfluxDB).actions(1) .interval(0, TimeUnit.NANOSECONDS).build(); } - + @Test(expected = NullPointerException.class) public void testInfluxDBIsNull() throws InterruptedException, IOException { InfluxDB mockInfluxDB = null;