Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 76347c0

Browse files
committedFeb 28, 2017
Write consistency can be set for batching mode.
1 parent 30fa0ee commit 76347c0

File tree

6 files changed

+114
-24
lines changed

6 files changed

+114
-24
lines changed
 

‎CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
- Update slf4j from 1.7.22 to 1.7.24
1717
- Update okhttp3 from 3.5 to 3.6
1818
- automatically adjust batch processor capacity [PR #282]
19+
- [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Consistency Level may be specified when batching is enabled.
1920

2021
## v2.5 [2016-12-05]
2122

‎src/main/java/org/influxdb/InfluxDB.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,23 @@ public String value() {
9494
public boolean isGzipEnabled();
9595

9696
/**
97-
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ThreadFactory)}}
98-
* using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
97+
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)}
98+
* with a default consistency level of {@link ConsistencyLevel#ONE ONE} using
99+
* {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
99100
*
100-
* @see #enableBatch(int, int, TimeUnit, ThreadFactory)
101+
* @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)
101102
*/
102103
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit);
103104

105+
/**
106+
* Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)}
107+
* using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}.
108+
*
109+
* @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)
110+
*/
111+
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
112+
final ConsistencyLevel consistencyLevel);
113+
104114
/**
105115
* Enable batching of single Point writes to speed up writes significant. If either actions or
106116
* flushDurations is reached first, a batch write is issued.
@@ -112,10 +122,15 @@ public String value() {
112122
* @param flushDuration
113123
* the time to wait at most.
114124
* @param flushDurationTimeUnit
125+
* the unit the flush duration is measured in.
126+
* @param consistencyLevel
127+
* The write consistency level to use when writing batched points.
115128
* @param threadFactory
129+
* The thread factory to use when creating new threads to handle asynchronous writes.
116130
* @return the InfluxDB instance to be able to use it in a fluent manner.
117131
*/
118132
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
133+
final ConsistencyLevel consistencyLevel,
119134
final ThreadFactory threadFactory);
120135

121136
/**

‎src/main/java/org/influxdb/impl/BatchProcessor.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class BatchProcessor {
3636
final int actions;
3737
private final TimeUnit flushIntervalUnit;
3838
private final int flushInterval;
39+
private final InfluxDB.ConsistencyLevel consistencyLevel;
3940

4041
/**
4142
* The Builder to create a BatchProcessor instance.
@@ -46,6 +47,7 @@ public static final class Builder {
4647
private int actions;
4748
private TimeUnit flushIntervalUnit;
4849
private int flushInterval;
50+
private InfluxDB.ConsistencyLevel consistencyLevel;
4951

5052
/**
5153
* @param threadFactory
@@ -92,6 +94,18 @@ public Builder interval(final int interval, final TimeUnit unit) {
9294
return this;
9395
}
9496

97+
/**
98+
* Set the consistency level writes should use.
99+
*
100+
* @param consistencyLevel
101+
* The consistency level.
102+
* @return this Builder to use it fluent
103+
*/
104+
public Builder consistencyLevel(final InfluxDB.ConsistencyLevel consistencyLevel) {
105+
this.consistencyLevel = consistencyLevel;
106+
return this;
107+
}
108+
95109
/**
96110
* Create the BatchProcessor.
97111
*
@@ -102,9 +116,10 @@ public BatchProcessor build() {
102116
Preconditions.checkArgument(this.actions > 0, "actions should > 0");
103117
Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0");
104118
Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null");
119+
Preconditions.checkNotNull(this.consistencyLevel, "consistencyLevel must not be null");
105120
Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null");
106121
return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit,
107-
this.flushInterval);
122+
this.flushInterval, this.consistencyLevel);
108123
}
109124
}
110125

@@ -164,12 +179,14 @@ public static Builder builder(final InfluxDB influxDB) {
164179
}
165180

166181
BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions,
167-
final TimeUnit flushIntervalUnit, final int flushInterval) {
182+
final TimeUnit flushIntervalUnit, final int flushInterval,
183+
final InfluxDB.ConsistencyLevel consistencyLevel) {
168184
super();
169185
this.influxDB = influxDB;
170186
this.actions = actions;
171187
this.flushIntervalUnit = flushIntervalUnit;
172188
this.flushInterval = flushInterval;
189+
this.consistencyLevel = consistencyLevel;
173190
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
174191
if (actions > 1 && actions < Integer.MAX_VALUE) {
175192
this.queue = new LinkedBlockingQueue<>(actions);
@@ -207,6 +224,7 @@ void write() {
207224
String batchKey = dbName + "_" + rp;
208225
if (!batchKeyToBatchPoints.containsKey(batchKey)) {
209226
BatchPoints batchPoints = BatchPoints.database(dbName)
227+
.consistency(consistencyLevel)
210228
.retentionPolicy(rp).build();
211229
batchKeyToBatchPoints.put(batchKey, batchPoints);
212230
}

‎src/main/java/org/influxdb/impl/InfluxDBImpl.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -150,23 +150,40 @@ public boolean isGzipEnabled() {
150150
return this.gzipRequestInterceptor.isEnabled();
151151
}
152152

153+
/**
154+
* {@inheritDoc}
155+
*/
153156
@Override
154157
public InfluxDB enableBatch(final int actions, final int flushDuration,
155158
final TimeUnit flushDurationTimeUnit) {
156-
enableBatch(actions, flushDuration, flushDurationTimeUnit, Executors.defaultThreadFactory());
159+
enableBatch(actions, flushDuration, flushDurationTimeUnit, ConsistencyLevel.ONE);
157160
return this;
158161
}
159162

163+
/**
164+
* {@inheritDoc}
165+
*/
160166
@Override
161-
public InfluxDB enableBatch(final int actions, final int flushDuration,
162-
final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) {
167+
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
168+
final ConsistencyLevel consistencyLevel) {
169+
enableBatch(actions, flushDuration, flushDurationTimeUnit, consistencyLevel, Executors.defaultThreadFactory());
170+
return this;
171+
}
172+
173+
/**
174+
* {@inheritDoc}
175+
*/
176+
@Override
177+
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
178+
final ConsistencyLevel consistencyLevel, final ThreadFactory threadFactory) {
163179
if (this.batchEnabled.get()) {
164180
throw new IllegalStateException("BatchProcessing is already enabled.");
165181
}
166182
this.batchProcessor = BatchProcessor
167183
.builder(this)
168184
.actions(actions)
169185
.interval(flushDuration, flushDurationTimeUnit)
186+
.consistencyLevel(consistencyLevel)
170187
.threadFactory(threadFactory)
171188
.build();
172189
this.batchEnabled.set(true);

‎src/test/java/org/influxdb/InfluxDBTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -414,12 +414,12 @@ public void testIsBatchEnabled() {
414414
}
415415

416416
/**
417-
* Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, ThreadFactory)}.
417+
* Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, InfluxDB.ConsistencyLevel, ThreadFactory)}.
418418
*/
419419
@Test
420420
public void testBatchEnabledWithThreadFactory() {
421421
final String threadName = "async_influxdb_write";
422-
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, new ThreadFactory() {
422+
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, InfluxDB.ConsistencyLevel.ONE, new ThreadFactory() {
423423

424424
@Override
425425
public Thread newThread(Runnable r) {

‎src/test/java/org/influxdb/impl/BatchProcessorTest.java

+53-14
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,31 @@
11
package org.influxdb.impl;
22

3+
import org.influxdb.InfluxDB;
4+
import org.influxdb.dto.BatchPoints;
5+
import org.influxdb.dto.Point;
6+
import org.junit.Test;
7+
import org.mockito.ArgumentCaptor;
8+
9+
import java.io.IOException;
10+
import java.util.concurrent.TimeUnit;
11+
12+
import static org.assertj.core.api.Assertions.assertThat;
313
import static org.mockito.Mockito.any;
414
import static org.mockito.Mockito.doThrow;
515
import static org.mockito.Mockito.mock;
616
import static org.mockito.Mockito.times;
717
import static org.mockito.Mockito.verify;
818
import static org.mockito.Mockito.verifyNoMoreInteractions;
919

10-
import java.io.IOException;
11-
import java.util.concurrent.TimeUnit;
12-
13-
import org.influxdb.InfluxDB;
14-
import org.influxdb.dto.BatchPoints;
15-
import org.influxdb.dto.Point;
16-
import org.junit.Test;
17-
1820
public class BatchProcessorTest {
1921

2022
@Test
2123
public void testSchedulerExceptionHandling() throws InterruptedException, IOException {
2224
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
23-
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
24-
.interval(1, TimeUnit.NANOSECONDS).build();
25+
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB)
26+
.actions(Integer.MAX_VALUE)
27+
.consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
28+
.interval(1, TimeUnit.NANOSECONDS).build();
2529

2630
doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class));
2731

@@ -44,7 +48,9 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce
4448
@Test
4549
public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException {
4650
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
47-
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
51+
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB)
52+
.actions(Integer.MAX_VALUE)
53+
.consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
4854
.interval(1, TimeUnit.NANOSECONDS).build();
4955

5056
Point point = Point.measurement("cpu").field("6", "").build();
@@ -59,11 +65,37 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce
5965
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
6066
}
6167

68+
@Test
69+
public void testConsistencyLevelIsHonored() {
70+
InfluxDB.ConsistencyLevel desiredConsistencyLevel = InfluxDB.ConsistencyLevel.QUORUM;
71+
72+
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
73+
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB)
74+
.actions(Integer.MAX_VALUE)
75+
.consistencyLevel(desiredConsistencyLevel)
76+
.interval(1, TimeUnit.DAYS).build();
77+
78+
Point point = Point.measurement("test").addField("region", "a").build();
79+
BatchProcessor.HttpBatchEntry httpBatchEntry = new BatchProcessor.HttpBatchEntry(point, "http", "http-rp");
80+
81+
batchProcessor.put(httpBatchEntry);
82+
83+
batchProcessor.flush();
84+
85+
ArgumentCaptor<BatchPoints> batchPoints = ArgumentCaptor.forClass(BatchPoints.class);
86+
87+
verify(mockInfluxDB, times(1)).write(batchPoints.capture());
88+
89+
assertThat(batchPoints.getAllValues()).hasSize(1);
90+
assertThat(batchPoints.getValue().getConsistency()).isEqualTo(desiredConsistencyLevel);
91+
}
92+
6293
@Test
6394
public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws InterruptedException {
6495
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
6596
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB)
6697
.actions(Integer.MAX_VALUE)
98+
.consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
6799
.interval(1, TimeUnit.NANOSECONDS).build();
68100

69101
Point point = Point.measurement("test").addField("region", "a").build();
@@ -88,21 +120,28 @@ public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws In
88120
@Test(expected = IllegalArgumentException.class)
89121
public void testActionsIsZero() throws InterruptedException, IOException {
90122
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
91-
BatchProcessor.builder(mockInfluxDB).actions(0)
123+
BatchProcessor.builder(mockInfluxDB).actions(0).consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
92124
.interval(1, TimeUnit.NANOSECONDS).build();
93125
}
94126

95127
@Test(expected = IllegalArgumentException.class)
96128
public void testIntervalIsZero() throws InterruptedException, IOException {
97129
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
98-
BatchProcessor.builder(mockInfluxDB).actions(1)
130+
BatchProcessor.builder(mockInfluxDB).actions(1).consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
99131
.interval(0, TimeUnit.NANOSECONDS).build();
100132
}
101133

102134
@Test(expected = NullPointerException.class)
103135
public void testInfluxDBIsNull() throws InterruptedException, IOException {
104136
InfluxDB mockInfluxDB = null;
105-
BatchProcessor.builder(mockInfluxDB).actions(1)
137+
BatchProcessor.builder(mockInfluxDB).actions(1).consistencyLevel(InfluxDB.ConsistencyLevel.ONE)
106138
.interval(1, TimeUnit.NANOSECONDS).build();
107139
}
140+
141+
@Test(expected = NullPointerException.class)
142+
public void testConsistencyLevelIsNull() {
143+
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
144+
BatchProcessor.builder(mockInfluxDB).actions(1).consistencyLevel(null)
145+
.interval(1, TimeUnit.NANOSECONDS).build();
146+
}
108147
}

0 commit comments

Comments
 (0)
Please sign in to comment.