From 0b35a2ca31230e30216a1244106ff4a870563cee Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Thu, 4 May 2017 12:10:46 +0200 Subject: [PATCH 1/9] Add exception callback to handle async errors --- .../org/influxdb/impl/BatchProcessor.java | 25 ++++++++++++++++--- .../org/influxdb/impl/BatchProcessorTest.java | 24 ++++++++++++++++-- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 0ad0d0ae9..4cb9dc005 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -10,6 +10,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,6 +33,7 @@ public class BatchProcessor { private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName()); protected final BlockingQueue queue; private final ScheduledExecutorService scheduler; + private final Consumer exceptionHandler; final InfluxDBImpl influxDB; final int actions; private final TimeUnit flushIntervalUnit; @@ -46,6 +48,7 @@ public static final class Builder { private int actions; private TimeUnit flushIntervalUnit; private int flushInterval; + private Consumer exceptionHandler = throwable -> {}; /** * @param threadFactory @@ -92,6 +95,19 @@ public Builder interval(final int interval, final TimeUnit unit) { return this; } + /** + * A callback to be used when an error occurs during a batchwrite + * + * @param handler + * the handler + * + * @return this Builder to use it fluent + */ + public Builder exceptionHandler(Consumer handler) { + this.exceptionHandler = handler; + return this; + } + /** * Create the BatchProcessor. * @@ -103,8 +119,9 @@ public BatchProcessor build() { Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0"); Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null"); Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null"); + Preconditions.checkNotNull(this.exceptionHandler, "exceptionHandler may not be null"); return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit, - this.flushInterval); + this.flushInterval, exceptionHandler); } } @@ -164,14 +181,15 @@ public static Builder builder(final InfluxDB influxDB) { } BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, - final TimeUnit flushIntervalUnit, final int flushInterval) { + final TimeUnit flushIntervalUnit, final int flushInterval, Consumer exceptionHandler) { super(); this.influxDB = influxDB; this.actions = actions; this.flushIntervalUnit = flushIntervalUnit; this.flushInterval = flushInterval; this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); - if (actions > 1 && actions < Integer.MAX_VALUE) { + this.exceptionHandler = exceptionHandler; + if (actions > 1 && actions < Integer.MAX_VALUE) { this.queue = new LinkedBlockingQueue<>(actions); } else { this.queue = new LinkedBlockingQueue<>(); @@ -232,6 +250,7 @@ void write() { } } catch (Throwable t) { // any exception wouldn't stop the scheduler + exceptionHandler.accept(t); LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t); } } diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 82a2e8de4..0d8e6849f 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; @@ -41,6 +42,25 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); } + @Test + public void testSchedulerExceptionHandlingCallback() throws InterruptedException, IOException { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + Consumer mockHandler = mock(Consumer.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) + .interval(1, TimeUnit.NANOSECONDS).exceptionHandler(mockHandler).build(); + + doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); + + Point point = Point.measurement("cpu").field("6", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", ""); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", ""); + + batchProcessor.put(batchEntry1); + Thread.sleep(200); // wait for scheduler + + verify(mockHandler, times(1)).accept(any(RuntimeException.class)); + } + @Test public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); @@ -91,14 +111,14 @@ public void testActionsIsZero() throws InterruptedException, IOException { BatchProcessor.builder(mockInfluxDB).actions(0) .interval(1, TimeUnit.NANOSECONDS).build(); } - + @Test(expected = IllegalArgumentException.class) public void testIntervalIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor.builder(mockInfluxDB).actions(1) .interval(0, TimeUnit.NANOSECONDS).build(); } - + @Test(expected = NullPointerException.class) public void testInfluxDBIsNull() throws InterruptedException, IOException { InfluxDB mockInfluxDB = null; From ca47f84889c00d005bcc21d15f52026e006d92dd Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Thu, 4 May 2017 12:14:15 +0200 Subject: [PATCH 2/9] Add final modifiers --- src/main/java/org/influxdb/impl/BatchProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 4cb9dc005..52abb7092 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -103,7 +103,7 @@ public Builder interval(final int interval, final TimeUnit unit) { * * @return this Builder to use it fluent */ - public Builder exceptionHandler(Consumer handler) { + public Builder exceptionHandler(final Consumer handler) { this.exceptionHandler = handler; return this; } @@ -181,7 +181,7 @@ public static Builder builder(final InfluxDB influxDB) { } BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, - final TimeUnit flushIntervalUnit, final int flushInterval, Consumer exceptionHandler) { + final TimeUnit flushIntervalUnit, final int flushInterval, final Consumer exceptionHandler) { super(); this.influxDB = influxDB; this.actions = actions; From c445f9d08910de263c0b2f66b15627925cce71af Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Thu, 4 May 2017 12:38:23 +0200 Subject: [PATCH 3/9] Fix formatting --- src/main/java/org/influxdb/impl/BatchProcessor.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 52abb7092..bf111d404 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -48,7 +48,7 @@ public static final class Builder { private int actions; private TimeUnit flushIntervalUnit; private int flushInterval; - private Consumer exceptionHandler = throwable -> {}; + private Consumer exceptionHandler = throwable -> { }; /** * @param threadFactory @@ -96,7 +96,7 @@ public Builder interval(final int interval, final TimeUnit unit) { } /** - * A callback to be used when an error occurs during a batchwrite + * A callback to be used when an error occurs during a batchwrite. * * @param handler * the handler @@ -181,7 +181,8 @@ public static Builder builder(final InfluxDB influxDB) { } BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, - final TimeUnit flushIntervalUnit, final int flushInterval, final Consumer exceptionHandler) { + final TimeUnit flushIntervalUnit, final int flushInterval, + final Consumer exceptionHandler) { super(); this.influxDB = influxDB; this.actions = actions; From 02f3d9cfe9eb8ecf7721952ac49d5e578394677d Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Tue, 9 May 2017 17:55:02 +0200 Subject: [PATCH 4/9] New enableBatch method --- src/main/java/org/influxdb/InfluxDB.java | 13 ++++++++++++- .../java/org/influxdb/impl/InfluxDBImpl.java | 16 +++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 698a470b1..a78cfb91e 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -101,6 +101,15 @@ public String value() { */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit); + /** + * Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer)} + * using with a exceptionHandler that does nothing. + * + * @see #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer) + */ + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ThreadFactory threadFactory); + /** * Enable batching of single Point writes to speed up writes significant. If either actions or * flushDurations is reached first, a batch write is issued. @@ -113,10 +122,12 @@ public String value() { * the time to wait at most. * @param flushDurationTimeUnit * @param threadFactory + * @param exceptionHandler + * a consumer function to handle asynchronous errors * @return the InfluxDB instance to be able to use it in a fluent manner. */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, - final ThreadFactory threadFactory); + final ThreadFactory threadFactory, final Consumer exceptionHandler); /** * Disable Batching. diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 85e3d5d4a..3c09c0ccb 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -160,15 +160,21 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, @Override public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) { + enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (throwable) -> { }); + return this; + } + + @Override + public InfluxDB enableBatch(int actions, int flushDuration, TimeUnit flushDurationTimeUnit, ThreadFactory threadFactory, Consumer exceptionHandler) { if (this.batchEnabled.get()) { throw new IllegalStateException("BatchProcessing is already enabled."); } this.batchProcessor = BatchProcessor - .builder(this) - .actions(actions) - .interval(flushDuration, flushDurationTimeUnit) - .threadFactory(threadFactory) - .build(); + .builder(this) + .actions(actions) + .interval(flushDuration, flushDurationTimeUnit) + .threadFactory(threadFactory) + .build(); this.batchEnabled.set(true); return this; } From 59ad7976221fe9fc494aa482840526a723320179 Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Tue, 9 May 2017 17:55:12 +0200 Subject: [PATCH 5/9] Error handling doc --- README.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8199cc639..5bca2631a 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,16 @@ Query query = new Query("SELECT idle FROM cpu", dbName); influxDB.query(query); influxDB.deleteDatabase(dbName); ``` -Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a gracefull application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()``` +Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a graceful application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()``` + +Also note that any errors that happen during the batch flush won't leak into the caller of the `write` method. By default, any kind of errors will be just logged with "SEVERE" level. + +If you need to be notified and do some custom logic when such asynchronous errors happen, you can add an error handler with a `Consumer` using the overloaded `enableBatch` method: + +```java +// Flush every 2000 Points, at least every 100ms +influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), (throwable) -> { /* custom error handling here */ }); +``` ### Advanced Usages: From 8d83ebab71391da7ee7553b8b990ae0221bb2228 Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Tue, 9 May 2017 18:07:37 +0200 Subject: [PATCH 6/9] Fix offenses --- src/main/java/org/influxdb/InfluxDB.java | 3 ++- src/main/java/org/influxdb/impl/InfluxDBImpl.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index a78cfb91e..4d3e04a85 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -102,7 +102,8 @@ public String value() { public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit); /** - * Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer)} + * Enable batching of single Point writes as + * {@link #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer)} * using with a exceptionHandler that does nothing. * * @see #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer) diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 3c09c0ccb..ca65a5547 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -165,7 +165,8 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, } @Override - public InfluxDB enableBatch(int actions, int flushDuration, TimeUnit flushDurationTimeUnit, ThreadFactory threadFactory, Consumer exceptionHandler) { + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ThreadFactory threadFactory, final Consumer exceptionHandler) { if (this.batchEnabled.get()) { throw new IllegalStateException("BatchProcessing is already enabled."); } From bb394f2653e56a804a778e7455746708244d20c4 Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Thu, 11 May 2017 11:34:41 +0200 Subject: [PATCH 7/9] Pass failed batch to consumer --- src/main/java/org/influxdb/InfluxDB.java | 18 ++++++++++-------- .../java/org/influxdb/impl/BatchProcessor.java | 15 +++++++++------ .../java/org/influxdb/impl/InfluxDBImpl.java | 7 +++++-- .../org/influxdb/impl/BatchProcessorTest.java | 8 +++++--- 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 4d3e04a85..cad445c05 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -1,16 +1,17 @@ package org.influxdb; -import java.util.List; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; +import java.util.List; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + /** * Interface with all available methods to access a InfluxDB database. * @@ -103,10 +104,10 @@ public String value() { /** * Enable batching of single Point writes as - * {@link #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer)} + * {@link #enableBatch(int, int, TimeUnit, ThreadFactory, BiConsumer, Throwable>)} * using with a exceptionHandler that does nothing. * - * @see #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer) + * @see #enableBatch(int, int, TimeUnit, ThreadFactory, BiConsumer, Throwable>) */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory); @@ -128,7 +129,8 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti * @return the InfluxDB instance to be able to use it in a fluent manner. */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, - final ThreadFactory threadFactory, final Consumer exceptionHandler); + final ThreadFactory threadFactory, + final BiConsumer, Throwable> exceptionHandler); /** * Disable Batching. diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index bf111d404..cea38f0b4 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -10,7 +10,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -33,7 +33,7 @@ public class BatchProcessor { private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName()); protected final BlockingQueue queue; private final ScheduledExecutorService scheduler; - private final Consumer exceptionHandler; + private final BiConsumer, Throwable> exceptionHandler; final InfluxDBImpl influxDB; final int actions; private final TimeUnit flushIntervalUnit; @@ -48,7 +48,7 @@ public static final class Builder { private int actions; private TimeUnit flushIntervalUnit; private int flushInterval; - private Consumer exceptionHandler = throwable -> { }; + private BiConsumer, Throwable> exceptionHandler = (entries, throwable) -> { }; /** * @param threadFactory @@ -103,7 +103,7 @@ public Builder interval(final int interval, final TimeUnit unit) { * * @return this Builder to use it fluent */ - public Builder exceptionHandler(final Consumer handler) { + public Builder exceptionHandler(final BiConsumer, Throwable> handler) { this.exceptionHandler = handler; return this; } @@ -182,7 +182,7 @@ public static Builder builder(final InfluxDB influxDB) { BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, final TimeUnit flushIntervalUnit, final int flushInterval, - final Consumer exceptionHandler) { + final BiConsumer, Throwable> exceptionHandler) { super(); this.influxDB = influxDB; this.actions = actions; @@ -206,6 +206,7 @@ public void run() { } void write() { + List currentBatch = null; try { if (this.queue.isEmpty()) { return; @@ -216,9 +217,11 @@ void write() { Map> udpPortToBatchPoints = Maps.newHashMap(); List batchEntries = new ArrayList<>(this.queue.size()); this.queue.drainTo(batchEntries); + currentBatch = new ArrayList<>(batchEntries.size()); for (AbstractBatchEntry batchEntry : batchEntries) { Point point = batchEntry.getPoint(); + currentBatch.add(point); if (batchEntry instanceof HttpBatchEntry) { HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry); String dbName = httpBatchEntry.getDb(); @@ -251,7 +254,7 @@ void write() { } } catch (Throwable t) { // any exception wouldn't stop the scheduler - exceptionHandler.accept(t); + exceptionHandler.accept(currentBatch, t); LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t); } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index ca65a5547..884787cab 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.function.Consumer; /** @@ -160,19 +161,21 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, @Override public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) { - enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (throwable) -> { }); + enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (points, throwable) -> { }); return this; } @Override public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, - final ThreadFactory threadFactory, final Consumer exceptionHandler) { + final ThreadFactory threadFactory, + final BiConsumer, Throwable> exceptionHandler) { if (this.batchEnabled.get()) { throw new IllegalStateException("BatchProcessing is already enabled."); } this.batchProcessor = BatchProcessor .builder(this) .actions(actions) + .exceptionHandler(exceptionHandler) .interval(flushDuration, flushDurationTimeUnit) .threadFactory(threadFactory) .build(); diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 0d8e6849f..b43a6a322 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -1,15 +1,17 @@ package org.influxdb.impl; +import static org.hamcrest.CoreMatchers.hasItems; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; import java.io.IOException; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; @@ -45,7 +47,7 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce @Test public void testSchedulerExceptionHandlingCallback() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); - Consumer mockHandler = mock(Consumer.class); + BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) .interval(1, TimeUnit.NANOSECONDS).exceptionHandler(mockHandler).build(); @@ -58,7 +60,7 @@ public void testSchedulerExceptionHandlingCallback() throws InterruptedException batchProcessor.put(batchEntry1); Thread.sleep(200); // wait for scheduler - verify(mockHandler, times(1)).accept(any(RuntimeException.class)); + verify(mockHandler, times(1)).accept(argThat(hasItems(point, point)), any(RuntimeException.class)); } @Test From 8f790cf52ebfa74b9840d53e7ecae88238630e8a Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Thu, 11 May 2017 12:49:15 +0200 Subject: [PATCH 8/9] Update readme with biconsumer --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5bca2631a..94e1eb2e2 100644 --- a/README.md +++ b/README.md @@ -79,11 +79,11 @@ Note that the batching functionality creates an internal thread pool that needs Also note that any errors that happen during the batch flush won't leak into the caller of the `write` method. By default, any kind of errors will be just logged with "SEVERE" level. -If you need to be notified and do some custom logic when such asynchronous errors happen, you can add an error handler with a `Consumer` using the overloaded `enableBatch` method: +If you need to be notified and do some custom logic when such asynchronous errors happen, you can add an error handler with a `BiConsumer, Throwable>` using the overloaded `enableBatch` method: ```java // Flush every 2000 Points, at least every 100ms -influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), (throwable) -> { /* custom error handling here */ }); +influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), (failedPoints, throwable) -> { /* custom error handling here */ }); ``` ### Advanced Usages: From 0f57e39539c7e92846f4de1d2f6c18294388d9f4 Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Thu, 11 May 2017 13:07:29 +0200 Subject: [PATCH 9/9] Changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5db57f53..e593f0a49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Support chunking - Add a databaseExists method to InfluxDB interface - [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Pending asynchronous writes can be explicitly flushed via `InfluxDB.flush()`. + - Add a listener to notify asynchronous errors during batch flushes (https://github.com/influxdata/influxdb-java/pull/318). #### Fixes