From a35b42adcb5d986b16b94357191ab6376e957ae2 Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Thu, 4 May 2017 12:10:46 +0200 Subject: [PATCH 1/6] Add exception callback to handle async errors --- .../org/influxdb/impl/BatchProcessor.java | 25 ++++++++++++++++--- .../org/influxdb/impl/BatchProcessorTest.java | 24 ++++++++++++++++-- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 0ad0d0ae9..4cb9dc005 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -10,6 +10,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,6 +33,7 @@ public class BatchProcessor { private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName()); protected final BlockingQueue queue; private final ScheduledExecutorService scheduler; + private final Consumer exceptionHandler; final InfluxDBImpl influxDB; final int actions; private final TimeUnit flushIntervalUnit; @@ -46,6 +48,7 @@ public static final class Builder { private int actions; private TimeUnit flushIntervalUnit; private int flushInterval; + private Consumer exceptionHandler = throwable -> {}; /** * @param threadFactory @@ -92,6 +95,19 @@ public Builder interval(final int interval, final TimeUnit unit) { return this; } + /** + * A callback to be used when an error occurs during a batchwrite + * + * @param handler + * the handler + * + * @return this Builder to use it fluent + */ + public Builder exceptionHandler(Consumer handler) { + this.exceptionHandler = handler; + return this; + } + /** * Create the BatchProcessor. * @@ -103,8 +119,9 @@ public BatchProcessor build() { Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0"); Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null"); Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null"); + Preconditions.checkNotNull(this.exceptionHandler, "exceptionHandler may not be null"); return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit, - this.flushInterval); + this.flushInterval, exceptionHandler); } } @@ -164,14 +181,15 @@ public static Builder builder(final InfluxDB influxDB) { } BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, - final TimeUnit flushIntervalUnit, final int flushInterval) { + final TimeUnit flushIntervalUnit, final int flushInterval, Consumer exceptionHandler) { super(); this.influxDB = influxDB; this.actions = actions; this.flushIntervalUnit = flushIntervalUnit; this.flushInterval = flushInterval; this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); - if (actions > 1 && actions < Integer.MAX_VALUE) { + this.exceptionHandler = exceptionHandler; + if (actions > 1 && actions < Integer.MAX_VALUE) { this.queue = new LinkedBlockingQueue<>(actions); } else { this.queue = new LinkedBlockingQueue<>(); @@ -232,6 +250,7 @@ void write() { } } catch (Throwable t) { // any exception wouldn't stop the scheduler + exceptionHandler.accept(t); LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t); } } diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 82a2e8de4..0d8e6849f 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; @@ -41,6 +42,25 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); } + @Test + public void testSchedulerExceptionHandlingCallback() throws InterruptedException, IOException { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + Consumer mockHandler = mock(Consumer.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) + .interval(1, TimeUnit.NANOSECONDS).exceptionHandler(mockHandler).build(); + + doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); + + Point point = Point.measurement("cpu").field("6", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", ""); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", ""); + + batchProcessor.put(batchEntry1); + Thread.sleep(200); // wait for scheduler + + verify(mockHandler, times(1)).accept(any(RuntimeException.class)); + } + @Test public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); @@ -91,14 +111,14 @@ public void testActionsIsZero() throws InterruptedException, IOException { BatchProcessor.builder(mockInfluxDB).actions(0) .interval(1, TimeUnit.NANOSECONDS).build(); } - + @Test(expected = IllegalArgumentException.class) public void testIntervalIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor.builder(mockInfluxDB).actions(1) .interval(0, TimeUnit.NANOSECONDS).build(); } - + @Test(expected = NullPointerException.class) public void testInfluxDBIsNull() throws InterruptedException, IOException { InfluxDB mockInfluxDB = null; From 9045937d3fae8ec12dddcdeb2198f4c27125c628 Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Thu, 4 May 2017 12:14:15 +0200 Subject: [PATCH 2/6] Add final modifiers --- src/main/java/org/influxdb/impl/BatchProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 4cb9dc005..52abb7092 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -103,7 +103,7 @@ public Builder interval(final int interval, final TimeUnit unit) { * * @return this Builder to use it fluent */ - public Builder exceptionHandler(Consumer handler) { + public Builder exceptionHandler(final Consumer handler) { this.exceptionHandler = handler; return this; } @@ -181,7 +181,7 @@ public static Builder builder(final InfluxDB influxDB) { } BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, - final TimeUnit flushIntervalUnit, final int flushInterval, Consumer exceptionHandler) { + final TimeUnit flushIntervalUnit, final int flushInterval, final Consumer exceptionHandler) { super(); this.influxDB = influxDB; this.actions = actions; From 87487a0be56b240c4253c5211a754906b8c7f93d Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Thu, 4 May 2017 12:38:23 +0200 Subject: [PATCH 3/6] Fix formatting --- src/main/java/org/influxdb/impl/BatchProcessor.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 52abb7092..bf111d404 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -48,7 +48,7 @@ public static final class Builder { private int actions; private TimeUnit flushIntervalUnit; private int flushInterval; - private Consumer exceptionHandler = throwable -> {}; + private Consumer exceptionHandler = throwable -> { }; /** * @param threadFactory @@ -96,7 +96,7 @@ public Builder interval(final int interval, final TimeUnit unit) { } /** - * A callback to be used when an error occurs during a batchwrite + * A callback to be used when an error occurs during a batchwrite. * * @param handler * the handler @@ -181,7 +181,8 @@ public static Builder builder(final InfluxDB influxDB) { } BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, - final TimeUnit flushIntervalUnit, final int flushInterval, final Consumer exceptionHandler) { + final TimeUnit flushIntervalUnit, final int flushInterval, + final Consumer exceptionHandler) { super(); this.influxDB = influxDB; this.actions = actions; From cb12f314382a27c01fa45752f740eac452e5e7fe Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Tue, 9 May 2017 17:55:02 +0200 Subject: [PATCH 4/6] New enableBatch method --- src/main/java/org/influxdb/InfluxDB.java | 13 ++++++++++++- .../java/org/influxdb/impl/InfluxDBImpl.java | 16 +++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 698a470b1..a78cfb91e 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -101,6 +101,15 @@ public String value() { */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit); + /** + * Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer)} + * using with a exceptionHandler that does nothing. + * + * @see #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer) + */ + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ThreadFactory threadFactory); + /** * Enable batching of single Point writes to speed up writes significant. If either actions or * flushDurations is reached first, a batch write is issued. @@ -113,10 +122,12 @@ public String value() { * the time to wait at most. * @param flushDurationTimeUnit * @param threadFactory + * @param exceptionHandler + * a consumer function to handle asynchronous errors * @return the InfluxDB instance to be able to use it in a fluent manner. */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, - final ThreadFactory threadFactory); + final ThreadFactory threadFactory, final Consumer exceptionHandler); /** * Disable Batching. diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 85e3d5d4a..3c09c0ccb 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -160,15 +160,21 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, @Override public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) { + enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (throwable) -> { }); + return this; + } + + @Override + public InfluxDB enableBatch(int actions, int flushDuration, TimeUnit flushDurationTimeUnit, ThreadFactory threadFactory, Consumer exceptionHandler) { if (this.batchEnabled.get()) { throw new IllegalStateException("BatchProcessing is already enabled."); } this.batchProcessor = BatchProcessor - .builder(this) - .actions(actions) - .interval(flushDuration, flushDurationTimeUnit) - .threadFactory(threadFactory) - .build(); + .builder(this) + .actions(actions) + .interval(flushDuration, flushDurationTimeUnit) + .threadFactory(threadFactory) + .build(); this.batchEnabled.set(true); return this; } From ea861ac592ad69dcd9cb6ae54e3a3ffc95602a92 Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Tue, 9 May 2017 17:55:12 +0200 Subject: [PATCH 5/6] Error handling doc --- README.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8199cc639..5bca2631a 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,16 @@ Query query = new Query("SELECT idle FROM cpu", dbName); influxDB.query(query); influxDB.deleteDatabase(dbName); ``` -Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a gracefull application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()``` +Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a graceful application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()``` + +Also note that any errors that happen during the batch flush won't leak into the caller of the `write` method. By default, any kind of errors will be just logged with "SEVERE" level. + +If you need to be notified and do some custom logic when such asynchronous errors happen, you can add an error handler with a `Consumer` using the overloaded `enableBatch` method: + +```java +// Flush every 2000 Points, at least every 100ms +influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), (throwable) -> { /* custom error handling here */ }); +``` ### Advanced Usages: From 1261031e88a0b0010979d9a3b9d14cecfb5e4431 Mon Sep 17 00:00:00 2001 From: Chico Sokol Date: Tue, 9 May 2017 18:07:37 +0200 Subject: [PATCH 6/6] Fix offenses --- src/main/java/org/influxdb/InfluxDB.java | 3 ++- src/main/java/org/influxdb/impl/InfluxDBImpl.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index a78cfb91e..4d3e04a85 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -102,7 +102,8 @@ public String value() { public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit); /** - * Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer)} + * Enable batching of single Point writes as + * {@link #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer)} * using with a exceptionHandler that does nothing. * * @see #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer) diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 3c09c0ccb..ca65a5547 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -165,7 +165,8 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, } @Override - public InfluxDB enableBatch(int actions, int flushDuration, TimeUnit flushDurationTimeUnit, ThreadFactory threadFactory, Consumer exceptionHandler) { + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ThreadFactory threadFactory, final Consumer exceptionHandler) { if (this.batchEnabled.get()) { throw new IllegalStateException("BatchProcessing is already enabled."); }