Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added InfluxDB.flush() and consistency level can be configured for batching #290

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Support chunking
- Add a databaseExists method to InfluxDB interface
- [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Pending asynchronous writes can be explicitly flushed via `InfluxDB.flush()`.

#### Fixes

Expand All @@ -15,6 +16,7 @@
- Update slf4j from 1.7.22 to 1.7.24
- Update okhttp3 from 3.5 to 3.6
- automatically adjust batch processor capacity [PR #282]
- [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Consistency Level may be specified when batching is enabled.

## v2.5 [2016-12-05]

Expand Down
29 changes: 26 additions & 3 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,23 @@ public String value() {
public boolean isGzipEnabled();

/**
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ThreadFactory)}}
* using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)}
* with a default consistency level of {@link ConsistencyLevel#QUORUM QUORUM} using
* {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
*
* @see #enableBatch(int, int, TimeUnit, ThreadFactory)
* @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit);

/**
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)}
* using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
*
* @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ConsistencyLevel consistencyLevel);

/**
* Enable batching of single Point writes to speed up writes significant. If either actions or
* flushDurations is reached first, a batch write is issued.
Expand All @@ -112,10 +122,15 @@ public String value() {
* @param flushDuration
* the time to wait at most.
* @param flushDurationTimeUnit
* the unit the flush duration is measured in.
* @param consistencyLevel
* The write consistency level to use when writing batched points.
* @param threadFactory
* The thread factory to use when creating new threads to handle asynchronous writes.
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ConsistencyLevel consistencyLevel,
final ThreadFactory threadFactory);

/**
Expand Down Expand Up @@ -273,6 +288,14 @@ public void write(final String database, final String retentionPolicy,
*/
public boolean databaseExists(final String name);

/**
* Send any buffered points to InfluxDB. This method is synchronous and will block while all pending points are
* written.
*
* @throws IllegalStateException if batching is not enabled.
*/
public void flush();

/**
* close thread for asynchronous batch write and UDP socket to release resources if need.
*/
Expand Down
30 changes: 27 additions & 3 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class BatchProcessor {
final int actions;
private final TimeUnit flushIntervalUnit;
private final int flushInterval;
private final InfluxDB.ConsistencyLevel consistencyLevel;

/**
* The Builder to create a BatchProcessor instance.
Expand All @@ -46,6 +47,7 @@ public static final class Builder {
private int actions;
private TimeUnit flushIntervalUnit;
private int flushInterval;
private InfluxDB.ConsistencyLevel consistencyLevel;

/**
* @param threadFactory
Expand Down Expand Up @@ -92,6 +94,18 @@ public Builder interval(final int interval, final TimeUnit unit) {
return this;
}

/**
* Set the consistency level writes should use.
*
* @param consistencyLevel
* The consistency level.
* @return this Builder to use it fluent
*/
public Builder consistencyLevel(final InfluxDB.ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

/**
* Create the BatchProcessor.
*
Expand All @@ -102,9 +116,10 @@ public BatchProcessor build() {
Preconditions.checkArgument(this.actions > 0, "actions should > 0");
Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0");
Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null");
Preconditions.checkNotNull(this.consistencyLevel, "consistencyLevel must not be null");
Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null");
return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit,
this.flushInterval);
this.flushInterval, this.consistencyLevel);
}
}

Expand Down Expand Up @@ -164,12 +179,14 @@ public static Builder builder(final InfluxDB influxDB) {
}

BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions,
final TimeUnit flushIntervalUnit, final int flushInterval) {
final TimeUnit flushIntervalUnit, final int flushInterval,
final InfluxDB.ConsistencyLevel consistencyLevel) {
super();
this.influxDB = influxDB;
this.actions = actions;
this.flushIntervalUnit = flushIntervalUnit;
this.flushInterval = flushInterval;
this.consistencyLevel = consistencyLevel;
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
if (actions > 1 && actions < Integer.MAX_VALUE) {
this.queue = new LinkedBlockingQueue<>(actions);
Expand Down Expand Up @@ -207,6 +224,7 @@ void write() {
String batchKey = dbName + "_" + rp;
if (!batchKeyToBatchPoints.containsKey(batchKey)) {
BatchPoints batchPoints = BatchPoints.database(dbName)
.consistency(consistencyLevel)
.retentionPolicy(rp).build();
batchKeyToBatchPoints.put(batchKey, batchPoints);
}
Expand Down Expand Up @@ -263,9 +281,15 @@ public void run() {
* called if no batch processing is needed anymore.
*
*/
void flush() {
void flushAndShutdown() {
this.write();
this.scheduler.shutdown();
}

/**
* Flush the current open writes to InfluxDB. This will block until all pending points are written.
*/
void flush() {
this.write();
}
}
27 changes: 23 additions & 4 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,28 @@ public boolean isGzipEnabled() {
@Override
public InfluxDB enableBatch(final int actions, final int flushDuration,
final TimeUnit flushDurationTimeUnit) {
enableBatch(actions, flushDuration, flushDurationTimeUnit, Executors.defaultThreadFactory());
enableBatch(actions, flushDuration, flushDurationTimeUnit, ConsistencyLevel.ONE);
return this;
}

@Override
public InfluxDB enableBatch(final int actions, final int flushDuration,
final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) {
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ConsistencyLevel consistencyLevel) {
enableBatch(actions, flushDuration, flushDurationTimeUnit, consistencyLevel, Executors.defaultThreadFactory());
return this;
}

@Override
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ConsistencyLevel consistencyLevel, final ThreadFactory threadFactory) {
if (this.batchEnabled.get()) {
throw new IllegalStateException("BatchProcessing is already enabled.");
}
this.batchProcessor = BatchProcessor
.builder(this)
.actions(actions)
.interval(flushDuration, flushDurationTimeUnit)
.consistencyLevel(consistencyLevel)
.threadFactory(threadFactory)
.build();
this.batchEnabled.set(true);
Expand All @@ -177,7 +185,7 @@ public InfluxDB enableBatch(final int actions, final int flushDuration,
public void disableBatch() {
this.batchEnabled.set(false);
if (this.batchProcessor != null) {
this.batchProcessor.flush();
this.batchProcessor.flushAndShutdown();
if (this.logLevel != LogLevel.NONE) {
System.out.println(
"total writes:" + this.writeCount.get()
Expand Down Expand Up @@ -460,6 +468,17 @@ private <T> T execute(final Call<T> call) {
}
}

/**
* {@inheritDoc}
*/
@Override
public void flush() {
if (!batchEnabled.get()) {
throw new IllegalStateException("BatchProcessing is not enabled.");
}
batchProcessor.flush();
}

/**
* {@inheritDoc}
*/
Expand Down
58 changes: 43 additions & 15 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
package org.influxdb;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import com.google.common.util.concurrent.Uninterruptibles;
import org.influxdb.InfluxDB.LogLevel;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
Expand All @@ -25,7 +14,17 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* Test the InfluxDB API.
Expand Down Expand Up @@ -414,12 +413,12 @@ public void testIsBatchEnabled() {
}

/**
* Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, ThreadFactory)}.
* Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, InfluxDB.ConsistencyLevel, ThreadFactory)}.
*/
@Test
public void testBatchEnabledWithThreadFactory() {
final String threadName = "async_influxdb_write";
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, new ThreadFactory() {
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, InfluxDB.ConsistencyLevel.ONE, new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
Expand Down Expand Up @@ -621,4 +620,33 @@ public void accept(QueryResult result) {
}
}

@Test
public void testFlushPendingWritesWhenBatchingEnabled() {
String dbName = "flush_tests_" + System.currentTimeMillis();
try {
this.influxDB.createDatabase(dbName);

// Enable batching with a very large buffer and flush interval so writes will be triggered by our call to flush().
this.influxDB.enableBatch(Integer.MAX_VALUE, Integer.MAX_VALUE, TimeUnit.HOURS);

String measurement = TestUtils.getRandomMeasurement();
Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build();
this.influxDB.write(dbName, TestUtils.defaultRetentionPolicy(this.influxDB.version()), point);
this.influxDB.flush();

Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", dbName);
QueryResult result = this.influxDB.query(query);
Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
} finally {
this.influxDB.deleteDatabase(dbName);
this.influxDB.disableBatch();
}
}

@Test(expected = IllegalStateException.class)
public void testFlushThrowsIfBatchingIsNotEnabled() {
Assert.assertFalse(this.influxDB.isBatchEnabled());
this.influxDB.flush();
}

}
Loading