Skip to content

Commit 30fa0ee

Browse files
authored
Merge pull request #292 from jganoff/feature-289-flush
Added InfluxDB.flush()
2 parents 46c29a3 + 4bf7459 commit 30fa0ee

File tree

6 files changed

+84
-2
lines changed

6 files changed

+84
-2
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- Support chunking
66
- Add a databaseExists method to InfluxDB interface
7+
- [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Pending asynchronous writes can be explicitly flushed via `InfluxDB.flush()`.
78

89
#### Fixes
910

src/main/java/org/influxdb/InfluxDB.java

+8
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,14 @@ public void write(final String database, final String retentionPolicy,
273273
*/
274274
public boolean databaseExists(final String name);
275275

276+
/**
277+
* Send any buffered points to InfluxDB. This method is synchronous and will block while all pending points are
278+
* written.
279+
*
280+
* @throws IllegalStateException if batching is not enabled.
281+
*/
282+
public void flush();
283+
276284
/**
277285
* close thread for asynchronous batch write and UDP socket to release resources if need.
278286
*/

src/main/java/org/influxdb/impl/BatchProcessor.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -263,9 +263,15 @@ public void run() {
263263
* called if no batch processing is needed anymore.
264264
*
265265
*/
266-
void flush() {
266+
void flushAndShutdown() {
267267
this.write();
268268
this.scheduler.shutdown();
269269
}
270270

271+
/**
272+
* Flush the current open writes to InfluxDB. This will block until all pending points are written.
273+
*/
274+
void flush() {
275+
this.write();
276+
}
271277
}

src/main/java/org/influxdb/impl/InfluxDBImpl.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public InfluxDB enableBatch(final int actions, final int flushDuration,
177177
public void disableBatch() {
178178
this.batchEnabled.set(false);
179179
if (this.batchProcessor != null) {
180-
this.batchProcessor.flush();
180+
this.batchProcessor.flushAndShutdown();
181181
if (this.logLevel != LogLevel.NONE) {
182182
System.out.println(
183183
"total writes:" + this.writeCount.get()
@@ -460,6 +460,17 @@ private <T> T execute(final Call<T> call) {
460460
}
461461
}
462462

463+
/**
464+
* {@inheritDoc}
465+
*/
466+
@Override
467+
public void flush() {
468+
if (!batchEnabled.get()) {
469+
throw new IllegalStateException("BatchProcessing is not enabled.");
470+
}
471+
batchProcessor.flush();
472+
}
473+
463474
/**
464475
* {@inheritDoc}
465476
*/

src/test/java/org/influxdb/InfluxDBTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -621,4 +621,33 @@ public void accept(QueryResult result) {
621621
}
622622
}
623623

624+
@Test
625+
public void testFlushPendingWritesWhenBatchingEnabled() {
626+
String dbName = "flush_tests_" + System.currentTimeMillis();
627+
try {
628+
this.influxDB.createDatabase(dbName);
629+
630+
// Enable batching with a very large buffer and flush interval so writes will be triggered by our call to flush().
631+
this.influxDB.enableBatch(Integer.MAX_VALUE, Integer.MAX_VALUE, TimeUnit.HOURS);
632+
633+
String measurement = TestUtils.getRandomMeasurement();
634+
Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build();
635+
this.influxDB.write(dbName, TestUtils.defaultRetentionPolicy(this.influxDB.version()), point);
636+
this.influxDB.flush();
637+
638+
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", dbName);
639+
QueryResult result = this.influxDB.query(query);
640+
Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
641+
} finally {
642+
this.influxDB.deleteDatabase(dbName);
643+
this.influxDB.disableBatch();
644+
}
645+
}
646+
647+
@Test(expected = IllegalStateException.class)
648+
public void testFlushThrowsIfBatchingIsNotEnabled() {
649+
Assert.assertFalse(this.influxDB.isBatchEnabled());
650+
this.influxDB.flush();
651+
}
652+
624653
}

src/test/java/org/influxdb/impl/BatchProcessorTest.java

+27
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static org.mockito.Mockito.mock;
66
import static org.mockito.Mockito.times;
77
import static org.mockito.Mockito.verify;
8+
import static org.mockito.Mockito.verifyNoMoreInteractions;
89

910
import java.io.IOException;
1011
import java.util.concurrent.TimeUnit;
@@ -58,6 +59,32 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce
5859
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
5960
}
6061

62+
@Test
63+
public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws InterruptedException {
64+
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
65+
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB)
66+
.actions(Integer.MAX_VALUE)
67+
.interval(1, TimeUnit.NANOSECONDS).build();
68+
69+
Point point = Point.measurement("test").addField("region", "a").build();
70+
BatchProcessor.HttpBatchEntry httpBatchEntry = new BatchProcessor.HttpBatchEntry(point, "http", "http-rp");
71+
72+
batchProcessor.put(httpBatchEntry);
73+
Thread.sleep(100); // wait for scheduler
74+
// Our put should have been written
75+
verify(mockInfluxDB).write(any(BatchPoints.class));
76+
77+
// Force a flush which should not stop the scheduler
78+
batchProcessor.flush();
79+
80+
batchProcessor.put(httpBatchEntry);
81+
Thread.sleep(100); // wait for scheduler
82+
// Our second put should have been written if the scheduler is still running
83+
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
84+
85+
verifyNoMoreInteractions(mockInfluxDB);
86+
}
87+
6188
@Test(expected = IllegalArgumentException.class)
6289
public void testActionsIsZero() throws InterruptedException, IOException {
6390
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);

0 commit comments

Comments
 (0)