Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write consistency can be set for batching mode #293

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- Update slf4j from 1.7.22 to 1.7.24
- Update okhttp3 from 3.5 to 3.6
- automatically adjust batch processor capacity [PR #282]
- [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Consistency Level may be specified when batching is enabled.

## v2.5 [2016-12-05]

Expand Down
21 changes: 18 additions & 3 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,23 @@ public String value() {
public boolean isGzipEnabled();

/**
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ThreadFactory)}}
* using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)}
* with a default consistency level of {@link ConsistencyLevel#ONE ONE} using
* {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
*
* @see #enableBatch(int, int, TimeUnit, ThreadFactory)
* @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit);

/**
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)}
* using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
*
* @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ConsistencyLevel consistencyLevel);

/**
* Enable batching of single Point writes to speed up writes significant. If either actions or
* flushDurations is reached first, a batch write is issued.
Expand All @@ -112,10 +122,15 @@ public String value() {
* @param flushDuration
* the time to wait at most.
* @param flushDurationTimeUnit
* the unit the flush duration is measured in.
* @param consistencyLevel
* The write consistency level to use when writing batched points.
* @param threadFactory
* The thread factory to use when creating new threads to handle asynchronous writes.
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ConsistencyLevel consistencyLevel,
final ThreadFactory threadFactory);

/**
Expand Down
22 changes: 20 additions & 2 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class BatchProcessor {
final int actions;
private final TimeUnit flushIntervalUnit;
private final int flushInterval;
private final InfluxDB.ConsistencyLevel consistencyLevel;

/**
* The Builder to create a BatchProcessor instance.
Expand All @@ -46,6 +47,7 @@ public static final class Builder {
private int actions;
private TimeUnit flushIntervalUnit;
private int flushInterval;
private InfluxDB.ConsistencyLevel consistencyLevel;

/**
* @param threadFactory
Expand Down Expand Up @@ -92,6 +94,18 @@ public Builder interval(final int interval, final TimeUnit unit) {
return this;
}

/**
* Set the consistency level writes should use.
*
* @param consistencyLevel
* The consistency level.
* @return this Builder to use it fluent
*/
public Builder consistencyLevel(final InfluxDB.ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

/**
* Create the BatchProcessor.
*
Expand All @@ -102,9 +116,10 @@ public BatchProcessor build() {
Preconditions.checkArgument(this.actions > 0, "actions should > 0");
Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0");
Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null");
Preconditions.checkNotNull(this.consistencyLevel, "consistencyLevel must not be null");
Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null");
return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit,
this.flushInterval);
this.flushInterval, this.consistencyLevel);
}
}

Expand Down Expand Up @@ -164,12 +179,14 @@ public static Builder builder(final InfluxDB influxDB) {
}

BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions,
final TimeUnit flushIntervalUnit, final int flushInterval) {
final TimeUnit flushIntervalUnit, final int flushInterval,
final InfluxDB.ConsistencyLevel consistencyLevel) {
super();
this.influxDB = influxDB;
this.actions = actions;
this.flushIntervalUnit = flushIntervalUnit;
this.flushInterval = flushInterval;
this.consistencyLevel = consistencyLevel;
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
if (actions > 1 && actions < Integer.MAX_VALUE) {
this.queue = new LinkedBlockingQueue<>(actions);
Expand Down Expand Up @@ -207,6 +224,7 @@ void write() {
String batchKey = dbName + "_" + rp;
if (!batchKeyToBatchPoints.containsKey(batchKey)) {
BatchPoints batchPoints = BatchPoints.database(dbName)
.consistency(consistencyLevel)
.retentionPolicy(rp).build();
batchKeyToBatchPoints.put(batchKey, batchPoints);
}
Expand Down
23 changes: 20 additions & 3 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,40 @@ public boolean isGzipEnabled() {
return this.gzipRequestInterceptor.isEnabled();
}

/**
* {@inheritDoc}
*/
@Override
public InfluxDB enableBatch(final int actions, final int flushDuration,
final TimeUnit flushDurationTimeUnit) {
enableBatch(actions, flushDuration, flushDurationTimeUnit, Executors.defaultThreadFactory());
enableBatch(actions, flushDuration, flushDurationTimeUnit, ConsistencyLevel.ONE);
return this;
}

/**
* {@inheritDoc}
*/
@Override
public InfluxDB enableBatch(final int actions, final int flushDuration,
final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) {
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ConsistencyLevel consistencyLevel) {
enableBatch(actions, flushDuration, flushDurationTimeUnit, consistencyLevel, Executors.defaultThreadFactory());
return this;
}

/**
* {@inheritDoc}
*/
@Override
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ConsistencyLevel consistencyLevel, final ThreadFactory threadFactory) {
if (this.batchEnabled.get()) {
throw new IllegalStateException("BatchProcessing is already enabled.");
}
this.batchProcessor = BatchProcessor
.builder(this)
.actions(actions)
.interval(flushDuration, flushDurationTimeUnit)
.consistencyLevel(consistencyLevel)
.threadFactory(threadFactory)
.build();
this.batchEnabled.set(true);
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,12 @@ public void testIsBatchEnabled() {
}

/**
* Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, ThreadFactory)}.
* Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, InfluxDB.ConsistencyLevel, ThreadFactory)}.
*/
@Test
public void testBatchEnabledWithThreadFactory() {
final String threadName = "async_influxdb_write";
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, new ThreadFactory() {
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, InfluxDB.ConsistencyLevel.ONE, new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
Expand Down
67 changes: 53 additions & 14 deletions src/test/java/org/influxdb/impl/BatchProcessorTest.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
package org.influxdb.impl;

import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.junit.Test;

public class BatchProcessorTest {

@Test
public void testSchedulerExceptionHandling() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
.interval(1, TimeUnit.NANOSECONDS).build();
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB)
.actions(Integer.MAX_VALUE)
.consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
.interval(1, TimeUnit.NANOSECONDS).build();

doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class));

Expand All @@ -44,7 +48,9 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce
@Test
public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB)
.actions(Integer.MAX_VALUE)
.consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
.interval(1, TimeUnit.NANOSECONDS).build();

Point point = Point.measurement("cpu").field("6", "").build();
Expand All @@ -59,11 +65,37 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
}

@Test
public void testConsistencyLevelIsHonored() {
InfluxDB.ConsistencyLevel desiredConsistencyLevel = InfluxDB.ConsistencyLevel.QUORUM;

InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB)
.actions(Integer.MAX_VALUE)
.consistencyLevel(desiredConsistencyLevel)
.interval(1, TimeUnit.DAYS).build();

Point point = Point.measurement("test").addField("region", "a").build();
BatchProcessor.HttpBatchEntry httpBatchEntry = new BatchProcessor.HttpBatchEntry(point, "http", "http-rp");

batchProcessor.put(httpBatchEntry);

batchProcessor.flush();

ArgumentCaptor<BatchPoints> batchPoints = ArgumentCaptor.forClass(BatchPoints.class);

verify(mockInfluxDB, times(1)).write(batchPoints.capture());

assertThat(batchPoints.getAllValues()).hasSize(1);
assertThat(batchPoints.getValue().getConsistency()).isEqualTo(desiredConsistencyLevel);
}

@Test
public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws InterruptedException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB)
.actions(Integer.MAX_VALUE)
.consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
.interval(1, TimeUnit.NANOSECONDS).build();

Point point = Point.measurement("test").addField("region", "a").build();
Expand All @@ -88,21 +120,28 @@ public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws In
@Test(expected = IllegalArgumentException.class)
public void testActionsIsZero() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor.builder(mockInfluxDB).actions(0)
BatchProcessor.builder(mockInfluxDB).actions(0).consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
.interval(1, TimeUnit.NANOSECONDS).build();
}

@Test(expected = IllegalArgumentException.class)
public void testIntervalIsZero() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor.builder(mockInfluxDB).actions(1)
BatchProcessor.builder(mockInfluxDB).actions(1).consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
.interval(0, TimeUnit.NANOSECONDS).build();
}

@Test(expected = NullPointerException.class)
public void testInfluxDBIsNull() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = null;
BatchProcessor.builder(mockInfluxDB).actions(1)
BatchProcessor.builder(mockInfluxDB).actions(1).consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
.interval(1, TimeUnit.NANOSECONDS).build();
}

@Test(expected = NullPointerException.class)
public void testConsistencyLevelIsNull() {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor.builder(mockInfluxDB).actions(1).consistencyLevel(null)
.interval(1, TimeUnit.NANOSECONDS).build();
}
}