Skip to content

Commit a2ddba4

Browse files
authored
Merge pull request #319 from csokol/biconsumer-exception-callback
Biconsumer exception callback
2 parents a3d315a + 0f57e39 commit a2ddba4

File tree

6 files changed

+96
-17
lines changed

6 files changed

+96
-17
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- Support chunking
77
- Add a databaseExists method to InfluxDB interface
88
- [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Pending asynchronous writes can be explicitly flushed via `InfluxDB.flush()`.
9+
- Add a listener to notify asynchronous errors during batch flushes (https://github.com/influxdata/influxdb-java/pull/318).
910

1011
#### Fixes
1112

README.md

+10-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,16 @@ Query query = new Query("SELECT idle FROM cpu", dbName);
7575
influxDB.query(query);
7676
influxDB.deleteDatabase(dbName);
7777
```
78-
Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a gracefull application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()```
78+
Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a graceful application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()```
79+
80+
Also note that any errors that happen during the batch flush won't leak into the caller of the `write` method. By default, any kind of errors will be just logged with "SEVERE" level.
81+
82+
If you need to be notified and do some custom logic when such asynchronous errors happen, you can add an error handler with a `BiConsumer<Iterable<Point>, Throwable>` using the overloaded `enableBatch` method:
83+
84+
```java
85+
// Flush every 2000 Points, at least every 100ms
86+
influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), (failedPoints, throwable) -> { /* custom error handling here */ });
87+
```
7988

8089
### Advanced Usages:
8190

src/main/java/org/influxdb/InfluxDB.java

+20-6
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
package org.influxdb;
22

3-
import java.util.List;
4-
import java.util.concurrent.ThreadFactory;
5-
import java.util.concurrent.TimeUnit;
6-
import java.util.function.Consumer;
7-
83
import org.influxdb.dto.BatchPoints;
94
import org.influxdb.dto.Point;
105
import org.influxdb.dto.Pong;
116
import org.influxdb.dto.Query;
127
import org.influxdb.dto.QueryResult;
138

9+
import java.util.List;
10+
import java.util.concurrent.ThreadFactory;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.function.BiConsumer;
13+
import java.util.function.Consumer;
14+
1415
/**
1516
* Interface with all available methods to access a InfluxDB database.
1617
*
@@ -101,6 +102,16 @@ public String value() {
101102
*/
102103
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit);
103104

105+
/**
106+
* Enable batching of single Point writes as
107+
* {@link #enableBatch(int, int, TimeUnit, ThreadFactory, BiConsumer<Iterable<Point>, Throwable>)}
108+
* using with a exceptionHandler that does nothing.
109+
*
110+
* @see #enableBatch(int, int, TimeUnit, ThreadFactory, BiConsumer<Iterable<Point>, Throwable>)
111+
*/
112+
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
113+
final ThreadFactory threadFactory);
114+
104115
/**
105116
* Enable batching of single Point writes to speed up writes significant. If either actions or
106117
* flushDurations is reached first, a batch write is issued.
@@ -113,10 +124,13 @@ public String value() {
113124
* the time to wait at most.
114125
* @param flushDurationTimeUnit
115126
* @param threadFactory
127+
* @param exceptionHandler
128+
* a consumer function to handle asynchronous errors
116129
* @return the InfluxDB instance to be able to use it in a fluent manner.
117130
*/
118131
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
119-
final ThreadFactory threadFactory);
132+
final ThreadFactory threadFactory,
133+
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler);
120134

121135
/**
122136
* Disable Batching.

src/main/java/org/influxdb/impl/BatchProcessor.java

+26-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.ScheduledExecutorService;
1111
import java.util.concurrent.ThreadFactory;
1212
import java.util.concurrent.TimeUnit;
13+
import java.util.function.BiConsumer;
1314
import java.util.logging.Level;
1415
import java.util.logging.Logger;
1516

@@ -32,6 +33,7 @@ public class BatchProcessor {
3233
private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
3334
protected final BlockingQueue<AbstractBatchEntry> queue;
3435
private final ScheduledExecutorService scheduler;
36+
private final BiConsumer<Iterable<Point>, Throwable> exceptionHandler;
3537
final InfluxDBImpl influxDB;
3638
final int actions;
3739
private final TimeUnit flushIntervalUnit;
@@ -46,6 +48,7 @@ public static final class Builder {
4648
private int actions;
4749
private TimeUnit flushIntervalUnit;
4850
private int flushInterval;
51+
private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (entries, throwable) -> { };
4952

5053
/**
5154
* @param threadFactory
@@ -92,6 +95,19 @@ public Builder interval(final int interval, final TimeUnit unit) {
9295
return this;
9396
}
9497

98+
/**
99+
* A callback to be used when an error occurs during a batchwrite.
100+
*
101+
* @param handler
102+
* the handler
103+
*
104+
* @return this Builder to use it fluent
105+
*/
106+
public Builder exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> handler) {
107+
this.exceptionHandler = handler;
108+
return this;
109+
}
110+
95111
/**
96112
* Create the BatchProcessor.
97113
*
@@ -103,8 +119,9 @@ public BatchProcessor build() {
103119
Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0");
104120
Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null");
105121
Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null");
122+
Preconditions.checkNotNull(this.exceptionHandler, "exceptionHandler may not be null");
106123
return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit,
107-
this.flushInterval);
124+
this.flushInterval, exceptionHandler);
108125
}
109126
}
110127

@@ -164,14 +181,16 @@ public static Builder builder(final InfluxDB influxDB) {
164181
}
165182

166183
BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions,
167-
final TimeUnit flushIntervalUnit, final int flushInterval) {
184+
final TimeUnit flushIntervalUnit, final int flushInterval,
185+
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
168186
super();
169187
this.influxDB = influxDB;
170188
this.actions = actions;
171189
this.flushIntervalUnit = flushIntervalUnit;
172190
this.flushInterval = flushInterval;
173191
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
174-
if (actions > 1 && actions < Integer.MAX_VALUE) {
192+
this.exceptionHandler = exceptionHandler;
193+
if (actions > 1 && actions < Integer.MAX_VALUE) {
175194
this.queue = new LinkedBlockingQueue<>(actions);
176195
} else {
177196
this.queue = new LinkedBlockingQueue<>();
@@ -187,6 +206,7 @@ public void run() {
187206
}
188207

189208
void write() {
209+
List<Point> currentBatch = null;
190210
try {
191211
if (this.queue.isEmpty()) {
192212
return;
@@ -197,9 +217,11 @@ void write() {
197217
Map<Integer, List<String>> udpPortToBatchPoints = Maps.newHashMap();
198218
List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());
199219
this.queue.drainTo(batchEntries);
220+
currentBatch = new ArrayList<>(batchEntries.size());
200221

201222
for (AbstractBatchEntry batchEntry : batchEntries) {
202223
Point point = batchEntry.getPoint();
224+
currentBatch.add(point);
203225
if (batchEntry instanceof HttpBatchEntry) {
204226
HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry);
205227
String dbName = httpBatchEntry.getDb();
@@ -232,6 +254,7 @@ void write() {
232254
}
233255
} catch (Throwable t) {
234256
// any exception wouldn't stop the scheduler
257+
exceptionHandler.accept(currentBatch, t);
235258
LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t);
236259
}
237260
}

src/main/java/org/influxdb/impl/InfluxDBImpl.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.concurrent.TimeUnit;
4848
import java.util.concurrent.atomic.AtomicBoolean;
4949
import java.util.concurrent.atomic.AtomicLong;
50+
import java.util.function.BiConsumer;
5051
import java.util.function.Consumer;
5152

5253
/**
@@ -160,15 +161,24 @@ public InfluxDB enableBatch(final int actions, final int flushDuration,
160161
@Override
161162
public InfluxDB enableBatch(final int actions, final int flushDuration,
162163
final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) {
164+
enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (points, throwable) -> { });
165+
return this;
166+
}
167+
168+
@Override
169+
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
170+
final ThreadFactory threadFactory,
171+
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
163172
if (this.batchEnabled.get()) {
164173
throw new IllegalStateException("BatchProcessing is already enabled.");
165174
}
166175
this.batchProcessor = BatchProcessor
167-
.builder(this)
168-
.actions(actions)
169-
.interval(flushDuration, flushDurationTimeUnit)
170-
.threadFactory(threadFactory)
171-
.build();
176+
.builder(this)
177+
.actions(actions)
178+
.exceptionHandler(exceptionHandler)
179+
.interval(flushDuration, flushDurationTimeUnit)
180+
.threadFactory(threadFactory)
181+
.build();
172182
this.batchEnabled.set(true);
173183
return this;
174184
}

src/test/java/org/influxdb/impl/BatchProcessorTest.java

+24-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package org.influxdb.impl;
22

3+
import static org.hamcrest.CoreMatchers.hasItems;
34
import static org.mockito.Mockito.any;
45
import static org.mockito.Mockito.doThrow;
56
import static org.mockito.Mockito.mock;
67
import static org.mockito.Mockito.times;
78
import static org.mockito.Mockito.verify;
89
import static org.mockito.Mockito.verifyNoMoreInteractions;
10+
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
911

1012
import java.io.IOException;
1113
import java.util.concurrent.TimeUnit;
14+
import java.util.function.BiConsumer;
1215

1316
import org.influxdb.InfluxDB;
1417
import org.influxdb.dto.BatchPoints;
@@ -41,6 +44,25 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce
4144
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
4245
}
4346

47+
@Test
48+
public void testSchedulerExceptionHandlingCallback() throws InterruptedException, IOException {
49+
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
50+
BiConsumer<Iterable<Point>, Throwable> mockHandler = mock(BiConsumer.class);
51+
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
52+
.interval(1, TimeUnit.NANOSECONDS).exceptionHandler(mockHandler).build();
53+
54+
doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class));
55+
56+
Point point = Point.measurement("cpu").field("6", "").build();
57+
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "");
58+
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", "");
59+
60+
batchProcessor.put(batchEntry1);
61+
Thread.sleep(200); // wait for scheduler
62+
63+
verify(mockHandler, times(1)).accept(argThat(hasItems(point, point)), any(RuntimeException.class));
64+
}
65+
4466
@Test
4567
public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException {
4668
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
@@ -91,14 +113,14 @@ public void testActionsIsZero() throws InterruptedException, IOException {
91113
BatchProcessor.builder(mockInfluxDB).actions(0)
92114
.interval(1, TimeUnit.NANOSECONDS).build();
93115
}
94-
116+
95117
@Test(expected = IllegalArgumentException.class)
96118
public void testIntervalIsZero() throws InterruptedException, IOException {
97119
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
98120
BatchProcessor.builder(mockInfluxDB).actions(1)
99121
.interval(0, TimeUnit.NANOSECONDS).build();
100122
}
101-
123+
102124
@Test(expected = NullPointerException.class)
103125
public void testInfluxDBIsNull() throws InterruptedException, IOException {
104126
InfluxDB mockInfluxDB = null;

0 commit comments

Comments
 (0)