Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exception callback to handle async errors #318

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,16 @@ Query query = new Query("SELECT idle FROM cpu", dbName);
influxDB.query(query);
influxDB.deleteDatabase(dbName);
```
Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a gracefull application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()```
Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a graceful application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()```

Also note that any errors that happen during the batch flush won't leak into the caller of the `write` method. By default, any kind of errors will be just logged with "SEVERE" level.

If you need to be notified and do some custom logic when such asynchronous errors happen, you can add an error handler with a `Consumer<Throwable>` using the overloaded `enableBatch` method:

```java
// Flush every 2000 Points, at least every 100ms
influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), (throwable) -> { /* custom error handling here */ });
```

### Advanced Usages:

Expand Down
14 changes: 13 additions & 1 deletion src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ public String value() {
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit);

/**
* Enable batching of single Point writes as
* {@link #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer<Throwable>)}
* using with a exceptionHandler that does nothing.
*
* @see #enableBatch(int, int, TimeUnit, ThreadFactory, Consumer<Throwable>)
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ThreadFactory threadFactory);

/**
* Enable batching of single Point writes to speed up writes significant. If either actions or
* flushDurations is reached first, a batch write is issued.
Expand All @@ -113,10 +123,12 @@ public String value() {
* the time to wait at most.
* @param flushDurationTimeUnit
* @param threadFactory
* @param exceptionHandler
* a consumer function to handle asynchronous errors
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ThreadFactory threadFactory);
final ThreadFactory threadFactory, final Consumer<Throwable> exceptionHandler);

/**
* Disable Batching.
Expand Down
26 changes: 23 additions & 3 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -32,6 +33,7 @@ public class BatchProcessor {
private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
protected final BlockingQueue<AbstractBatchEntry> queue;
private final ScheduledExecutorService scheduler;
private final Consumer<Throwable> exceptionHandler;
final InfluxDBImpl influxDB;
final int actions;
private final TimeUnit flushIntervalUnit;
Expand All @@ -46,6 +48,7 @@ public static final class Builder {
private int actions;
private TimeUnit flushIntervalUnit;
private int flushInterval;
private Consumer<Throwable> exceptionHandler = throwable -> { };

/**
* @param threadFactory
Expand Down Expand Up @@ -92,6 +95,19 @@ public Builder interval(final int interval, final TimeUnit unit) {
return this;
}

/**
* A callback to be used when an error occurs during a batchwrite.
*
* @param handler
* the handler
*
* @return this Builder to use it fluent
*/
public Builder exceptionHandler(final Consumer<Throwable> handler) {
this.exceptionHandler = handler;
return this;
}

/**
* Create the BatchProcessor.
*
Expand All @@ -103,8 +119,9 @@ public BatchProcessor build() {
Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0");
Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null");
Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null");
Preconditions.checkNotNull(this.exceptionHandler, "exceptionHandler may not be null");
return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit,
this.flushInterval);
this.flushInterval, exceptionHandler);
}
}

Expand Down Expand Up @@ -164,14 +181,16 @@ public static Builder builder(final InfluxDB influxDB) {
}

BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions,
final TimeUnit flushIntervalUnit, final int flushInterval) {
final TimeUnit flushIntervalUnit, final int flushInterval,
final Consumer<Throwable> exceptionHandler) {
super();
this.influxDB = influxDB;
this.actions = actions;
this.flushIntervalUnit = flushIntervalUnit;
this.flushInterval = flushInterval;
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
if (actions > 1 && actions < Integer.MAX_VALUE) {
this.exceptionHandler = exceptionHandler;
if (actions > 1 && actions < Integer.MAX_VALUE) {
this.queue = new LinkedBlockingQueue<>(actions);
} else {
this.queue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -232,6 +251,7 @@ void write() {
}
} catch (Throwable t) {
// any exception wouldn't stop the scheduler
exceptionHandler.accept(t);
LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t);
}
}
Expand Down
17 changes: 12 additions & 5 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,22 @@ public InfluxDB enableBatch(final int actions, final int flushDuration,
@Override
public InfluxDB enableBatch(final int actions, final int flushDuration,
final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) {
enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (throwable) -> { });
return this;
}

@Override
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ThreadFactory threadFactory, final Consumer<Throwable> exceptionHandler) {
if (this.batchEnabled.get()) {
throw new IllegalStateException("BatchProcessing is already enabled.");
}
this.batchProcessor = BatchProcessor
.builder(this)
.actions(actions)
.interval(flushDuration, flushDurationTimeUnit)
.threadFactory(threadFactory)
.build();
.builder(this)
.actions(actions)
.interval(flushDuration, flushDurationTimeUnit)
.threadFactory(threadFactory)
.build();
this.batchEnabled.set(true);
return this;
}
Expand Down
24 changes: 22 additions & 2 deletions src/test/java/org/influxdb/impl/BatchProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
Expand Down Expand Up @@ -41,6 +42,25 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
}

@Test
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like the tests are not formatted with the same settings as the production code. I'm not sure if that's on purpose or accidental but I decided to use the same formatting as in the main code.

public void testSchedulerExceptionHandlingCallback() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
Consumer<Throwable> mockHandler = mock(Consumer.class);
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
.interval(1, TimeUnit.NANOSECONDS).exceptionHandler(mockHandler).build();

doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class));

Point point = Point.measurement("cpu").field("6", "").build();
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "");
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", "");

batchProcessor.put(batchEntry1);
Thread.sleep(200); // wait for scheduler

verify(mockHandler, times(1)).accept(any(RuntimeException.class));
}

@Test
public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
Expand Down Expand Up @@ -91,14 +111,14 @@ public void testActionsIsZero() throws InterruptedException, IOException {
BatchProcessor.builder(mockInfluxDB).actions(0)
.interval(1, TimeUnit.NANOSECONDS).build();
}

@Test(expected = IllegalArgumentException.class)
public void testIntervalIsZero() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor.builder(mockInfluxDB).actions(1)
.interval(0, TimeUnit.NANOSECONDS).build();
}

@Test(expected = NullPointerException.class)
public void testInfluxDBIsNull() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = null;
Expand Down