Skip to content

Commit cce2c9b

Browse files
committed
Added InfluxDB.flush() and consistency level can be configured for batching.
1 parent 46c29a3 commit cce2c9b

File tree

6 files changed

+193
-31
lines changed

6 files changed

+193
-31
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- Support chunking
66
- Add a databaseExists method to InfluxDB interface
7+
- [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Pending asynchronous writes can be explicitly flushed via `InfluxDB.flush()`.
78

89
#### Fixes
910

@@ -15,6 +16,7 @@
1516
- Update slf4j from 1.7.22 to 1.7.24
1617
- Update okhttp3 from 3.5 to 3.6
1718
- automatically adjust batch processor capacity [PR #282]
19+
- [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Consistency Level may be specified when batching is enabled.
1820

1921
## v2.5 [2016-12-05]
2022

src/main/java/org/influxdb/InfluxDB.java

+26-3
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,23 @@ public String value() {
9494
public boolean isGzipEnabled();
9595

9696
/**
97-
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ThreadFactory)}}
98-
* using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
97+
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)}
98+
* with a default consistency level of {@link ConsistencyLevel#QUORUM QUORUM} using
99+
* {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
99100
*
100-
* @see #enableBatch(int, int, TimeUnit, ThreadFactory)
101+
* @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)
101102
*/
102103
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit);
103104

105+
/**
106+
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)}
107+
* using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
108+
*
109+
* @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)
110+
*/
111+
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
112+
final ConsistencyLevel consistencyLevel);
113+
104114
/**
105115
* Enable batching of single Point writes to speed up writes significant. If either actions or
106116
* flushDurations is reached first, a batch write is issued.
@@ -112,10 +122,15 @@ public String value() {
112122
* @param flushDuration
113123
* the time to wait at most.
114124
* @param flushDurationTimeUnit
125+
* the unit the flush duration is measured in.
126+
* @param consistencyLevel
127+
* The write consistency level to use when writing batched points.
115128
* @param threadFactory
129+
* The thread factory to use when creating new threads to handle asynchronous writes.
116130
* @return the InfluxDB instance to be able to use it in a fluent manner.
117131
*/
118132
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
133+
final ConsistencyLevel consistencyLevel,
119134
final ThreadFactory threadFactory);
120135

121136
/**
@@ -273,6 +288,14 @@ public void write(final String database, final String retentionPolicy,
273288
*/
274289
public boolean databaseExists(final String name);
275290

291+
/**
292+
* Send any buffered points to InfluxDB. This method is synchronous and will block while all pending points are
293+
* written.
294+
*
295+
* @throws IllegalStateException if batching is not enabled.
296+
*/
297+
public void flush();
298+
276299
/**
277300
* close thread for asynchronous batch write and UDP socket to release resources if need.
278301
*/

src/main/java/org/influxdb/impl/BatchProcessor.java

+27-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class BatchProcessor {
3636
final int actions;
3737
private final TimeUnit flushIntervalUnit;
3838
private final int flushInterval;
39+
private final InfluxDB.ConsistencyLevel consistencyLevel;
3940

4041
/**
4142
* The Builder to create a BatchProcessor instance.
@@ -46,6 +47,7 @@ public static final class Builder {
4647
private int actions;
4748
private TimeUnit flushIntervalUnit;
4849
private int flushInterval;
50+
private InfluxDB.ConsistencyLevel consistencyLevel;
4951

5052
/**
5153
* @param threadFactory
@@ -92,6 +94,18 @@ public Builder interval(final int interval, final TimeUnit unit) {
9294
return this;
9395
}
9496

97+
/**
98+
* Set the consistency level writes should use.
99+
*
100+
* @param consistencyLevel
101+
* The consistency level.
102+
* @return this Builder to use it fluent
103+
*/
104+
public Builder consistencyLevel(final InfluxDB.ConsistencyLevel consistencyLevel) {
105+
this.consistencyLevel = consistencyLevel;
106+
return this;
107+
}
108+
95109
/**
96110
* Create the BatchProcessor.
97111
*
@@ -102,9 +116,10 @@ public BatchProcessor build() {
102116
Preconditions.checkArgument(this.actions > 0, "actions should > 0");
103117
Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0");
104118
Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null");
119+
Preconditions.checkNotNull(this.consistencyLevel, "consistencyLevel must not be null");
105120
Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null");
106121
return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit,
107-
this.flushInterval);
122+
this.flushInterval, this.consistencyLevel);
108123
}
109124
}
110125

@@ -164,12 +179,14 @@ public static Builder builder(final InfluxDB influxDB) {
164179
}
165180

166181
BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions,
167-
final TimeUnit flushIntervalUnit, final int flushInterval) {
182+
final TimeUnit flushIntervalUnit, final int flushInterval,
183+
final InfluxDB.ConsistencyLevel consistencyLevel) {
168184
super();
169185
this.influxDB = influxDB;
170186
this.actions = actions;
171187
this.flushIntervalUnit = flushIntervalUnit;
172188
this.flushInterval = flushInterval;
189+
this.consistencyLevel = consistencyLevel;
173190
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
174191
if (actions > 1 && actions < Integer.MAX_VALUE) {
175192
this.queue = new LinkedBlockingQueue<>(actions);
@@ -207,6 +224,7 @@ void write() {
207224
String batchKey = dbName + "_" + rp;
208225
if (!batchKeyToBatchPoints.containsKey(batchKey)) {
209226
BatchPoints batchPoints = BatchPoints.database(dbName)
227+
.consistency(consistencyLevel)
210228
.retentionPolicy(rp).build();
211229
batchKeyToBatchPoints.put(batchKey, batchPoints);
212230
}
@@ -263,9 +281,15 @@ public void run() {
263281
* called if no batch processing is needed anymore.
264282
*
265283
*/
266-
void flush() {
284+
void flushAndShutdown() {
267285
this.write();
268286
this.scheduler.shutdown();
269287
}
270288

289+
/**
290+
* Flush the current open writes to InfluxDB. This will block until all pending points are written.
291+
*/
292+
void flush() {
293+
this.write();
294+
}
271295
}

src/main/java/org/influxdb/impl/InfluxDBImpl.java

+23-4
Original file line numberDiff line numberDiff line change
@@ -153,20 +153,28 @@ public boolean isGzipEnabled() {
153153
@Override
154154
public InfluxDB enableBatch(final int actions, final int flushDuration,
155155
final TimeUnit flushDurationTimeUnit) {
156-
enableBatch(actions, flushDuration, flushDurationTimeUnit, Executors.defaultThreadFactory());
156+
enableBatch(actions, flushDuration, flushDurationTimeUnit, ConsistencyLevel.ONE);
157157
return this;
158158
}
159159

160160
@Override
161-
public InfluxDB enableBatch(final int actions, final int flushDuration,
162-
final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) {
161+
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
162+
final ConsistencyLevel consistencyLevel) {
163+
enableBatch(actions, flushDuration, flushDurationTimeUnit, consistencyLevel, Executors.defaultThreadFactory());
164+
return this;
165+
}
166+
167+
@Override
168+
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
169+
final ConsistencyLevel consistencyLevel, final ThreadFactory threadFactory) {
163170
if (this.batchEnabled.get()) {
164171
throw new IllegalStateException("BatchProcessing is already enabled.");
165172
}
166173
this.batchProcessor = BatchProcessor
167174
.builder(this)
168175
.actions(actions)
169176
.interval(flushDuration, flushDurationTimeUnit)
177+
.consistencyLevel(consistencyLevel)
170178
.threadFactory(threadFactory)
171179
.build();
172180
this.batchEnabled.set(true);
@@ -177,7 +185,7 @@ public InfluxDB enableBatch(final int actions, final int flushDuration,
177185
public void disableBatch() {
178186
this.batchEnabled.set(false);
179187
if (this.batchProcessor != null) {
180-
this.batchProcessor.flush();
188+
this.batchProcessor.flushAndShutdown();
181189
if (this.logLevel != LogLevel.NONE) {
182190
System.out.println(
183191
"total writes:" + this.writeCount.get()
@@ -460,6 +468,17 @@ private <T> T execute(final Call<T> call) {
460468
}
461469
}
462470

471+
/**
472+
* {@inheritDoc}
473+
*/
474+
@Override
475+
public void flush() {
476+
if (!batchEnabled.get()) {
477+
throw new IllegalStateException("BatchProcessing is not enabled.");
478+
}
479+
batchProcessor.flush();
480+
}
481+
463482
/**
464483
* {@inheritDoc}
465484
*/

src/test/java/org/influxdb/InfluxDBTest.java

+43-15
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,6 @@
11
package org.influxdb;
22

3-
import java.io.IOException;
4-
import java.util.ArrayList;
5-
import java.util.Arrays;
6-
import java.util.List;
7-
import java.util.Set;
8-
import java.util.concurrent.BlockingQueue;
9-
import java.util.concurrent.CountDownLatch;
10-
import java.util.concurrent.LinkedBlockingQueue;
11-
import java.util.concurrent.ThreadFactory;
12-
import java.util.concurrent.TimeUnit;
13-
import java.util.function.Consumer;
14-
3+
import com.google.common.util.concurrent.Uninterruptibles;
154
import org.influxdb.InfluxDB.LogLevel;
165
import org.influxdb.dto.BatchPoints;
176
import org.influxdb.dto.Point;
@@ -25,7 +14,17 @@
2514
import org.junit.Test;
2615
import org.junit.rules.ExpectedException;
2716

28-
import com.google.common.util.concurrent.Uninterruptibles;
17+
import java.io.IOException;
18+
import java.util.ArrayList;
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.Set;
22+
import java.util.concurrent.BlockingQueue;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.LinkedBlockingQueue;
25+
import java.util.concurrent.ThreadFactory;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.function.Consumer;
2928

3029
/**
3130
* Test the InfluxDB API.
@@ -414,12 +413,12 @@ public void testIsBatchEnabled() {
414413
}
415414

416415
/**
417-
* Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, ThreadFactory)}.
416+
* Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, InfluxDB.ConsistencyLevel, ThreadFactory)}.
418417
*/
419418
@Test
420419
public void testBatchEnabledWithThreadFactory() {
421420
final String threadName = "async_influxdb_write";
422-
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, new ThreadFactory() {
421+
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, InfluxDB.ConsistencyLevel.ONE, new ThreadFactory() {
423422

424423
@Override
425424
public Thread newThread(Runnable r) {
@@ -621,4 +620,33 @@ public void accept(QueryResult result) {
621620
}
622621
}
623622

623+
@Test
624+
public void testFlushPendingWritesWhenBatchingEnabled() {
625+
String dbName = "flush_tests_" + System.currentTimeMillis();
626+
try {
627+
this.influxDB.createDatabase(dbName);
628+
629+
// Enable batching with a very large buffer and flush interval so writes will be triggered by our call to flush().
630+
this.influxDB.enableBatch(Integer.MAX_VALUE, Integer.MAX_VALUE, TimeUnit.HOURS);
631+
632+
String measurement = TestUtils.getRandomMeasurement();
633+
Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build();
634+
this.influxDB.write(dbName, TestUtils.defaultRetentionPolicy(this.influxDB.version()), point);
635+
this.influxDB.flush();
636+
637+
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", dbName);
638+
QueryResult result = this.influxDB.query(query);
639+
Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
640+
} finally {
641+
this.influxDB.deleteDatabase(dbName);
642+
this.influxDB.disableBatch();
643+
}
644+
}
645+
646+
@Test(expected = IllegalStateException.class)
647+
public void testFlushThrowsIfBatchingIsNotEnabled() {
648+
Assert.assertFalse(this.influxDB.isBatchEnabled());
649+
this.influxDB.flush();
650+
}
651+
624652
}

0 commit comments

Comments
 (0)