From 76347c0fc48cdb60ba69b2984ba72b9bedc79aec Mon Sep 17 00:00:00 2001 From: Jordan Ganoff Date: Mon, 27 Feb 2017 22:45:50 -0800 Subject: [PATCH] Write consistency can be set for batching mode. --- CHANGELOG.md | 1 + src/main/java/org/influxdb/InfluxDB.java | 21 +++++- .../org/influxdb/impl/BatchProcessor.java | 22 +++++- .../java/org/influxdb/impl/InfluxDBImpl.java | 23 ++++++- src/test/java/org/influxdb/InfluxDBTest.java | 4 +- .../org/influxdb/impl/BatchProcessorTest.java | 67 +++++++++++++++---- 6 files changed, 114 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88d5978fc..4a9f358d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - Update slf4j from 1.7.22 to 1.7.24 - Update okhttp3 from 3.5 to 3.6 - automatically adjust batch processor capacity [PR #282] + - [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Consistency Level may be specified when batching is enabled. ## v2.5 [2016-12-05] diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 698a470b1..dc96b1b8c 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -94,13 +94,23 @@ public String value() { public boolean isGzipEnabled(); /** - * Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ThreadFactory)}} - * using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}. + * Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)} + * with a default consistency level of {@link ConsistencyLevel#ONE ONE} using + * {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}. * - * @see #enableBatch(int, int, TimeUnit, ThreadFactory) + * @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory) */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit); + /** + * Enable batching of single Point writes as {@link #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory)} + * using {@linkplain java.util.concurrent.Executors#defaultThreadFactory() default thread factory}. + * + * @see #enableBatch(int, int, TimeUnit, ConsistencyLevel, ThreadFactory) + */ + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ConsistencyLevel consistencyLevel); + /** * Enable batching of single Point writes to speed up writes significant. If either actions or * flushDurations is reached first, a batch write is issued. @@ -112,10 +122,15 @@ public String value() { * @param flushDuration * the time to wait at most. * @param flushDurationTimeUnit + * the unit the flush duration is measured in. + * @param consistencyLevel + * The write consistency level to use when writing batched points. * @param threadFactory + * The thread factory to use when creating new threads to handle asynchronous writes. * @return the InfluxDB instance to be able to use it in a fluent manner. */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ConsistencyLevel consistencyLevel, final ThreadFactory threadFactory); /** diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 0ad0d0ae9..1921e16f0 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -36,6 +36,7 @@ public class BatchProcessor { final int actions; private final TimeUnit flushIntervalUnit; private final int flushInterval; + private final InfluxDB.ConsistencyLevel consistencyLevel; /** * The Builder to create a BatchProcessor instance. @@ -46,6 +47,7 @@ public static final class Builder { private int actions; private TimeUnit flushIntervalUnit; private int flushInterval; + private InfluxDB.ConsistencyLevel consistencyLevel; /** * @param threadFactory @@ -92,6 +94,18 @@ public Builder interval(final int interval, final TimeUnit unit) { return this; } + /** + * Set the consistency level writes should use. + * + * @param consistencyLevel + * The consistency level. + * @return this Builder to use it fluent + */ + public Builder consistencyLevel(final InfluxDB.ConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + return this; + } + /** * Create the BatchProcessor. * @@ -102,9 +116,10 @@ public BatchProcessor build() { Preconditions.checkArgument(this.actions > 0, "actions should > 0"); Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0"); Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null"); + Preconditions.checkNotNull(this.consistencyLevel, "consistencyLevel must not be null"); Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null"); return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit, - this.flushInterval); + this.flushInterval, this.consistencyLevel); } } @@ -164,12 +179,14 @@ public static Builder builder(final InfluxDB influxDB) { } BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, - final TimeUnit flushIntervalUnit, final int flushInterval) { + final TimeUnit flushIntervalUnit, final int flushInterval, + final InfluxDB.ConsistencyLevel consistencyLevel) { super(); this.influxDB = influxDB; this.actions = actions; this.flushIntervalUnit = flushIntervalUnit; this.flushInterval = flushInterval; + this.consistencyLevel = consistencyLevel; this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); if (actions > 1 && actions < Integer.MAX_VALUE) { this.queue = new LinkedBlockingQueue<>(actions); @@ -207,6 +224,7 @@ void write() { String batchKey = dbName + "_" + rp; if (!batchKeyToBatchPoints.containsKey(batchKey)) { BatchPoints batchPoints = BatchPoints.database(dbName) + .consistency(consistencyLevel) .retentionPolicy(rp).build(); batchKeyToBatchPoints.put(batchKey, batchPoints); } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 85e3d5d4a..4c8f9760c 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -150,16 +150,32 @@ public boolean isGzipEnabled() { return this.gzipRequestInterceptor.isEnabled(); } + /** + * {@inheritDoc} + */ @Override public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit) { - enableBatch(actions, flushDuration, flushDurationTimeUnit, Executors.defaultThreadFactory()); + enableBatch(actions, flushDuration, flushDurationTimeUnit, ConsistencyLevel.ONE); return this; } + /** + * {@inheritDoc} + */ @Override - public InfluxDB enableBatch(final int actions, final int flushDuration, - final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) { + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ConsistencyLevel consistencyLevel) { + enableBatch(actions, flushDuration, flushDurationTimeUnit, consistencyLevel, Executors.defaultThreadFactory()); + return this; + } + + /** + * {@inheritDoc} + */ + @Override + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ConsistencyLevel consistencyLevel, final ThreadFactory threadFactory) { if (this.batchEnabled.get()) { throw new IllegalStateException("BatchProcessing is already enabled."); } @@ -167,6 +183,7 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, .builder(this) .actions(actions) .interval(flushDuration, flushDurationTimeUnit) + .consistencyLevel(consistencyLevel) .threadFactory(threadFactory) .build(); this.batchEnabled.set(true); diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 116f42552..52d23ed7e 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -414,12 +414,12 @@ public void testIsBatchEnabled() { } /** - * Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, ThreadFactory)}. + * Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, InfluxDB.ConsistencyLevel, ThreadFactory)}. */ @Test public void testBatchEnabledWithThreadFactory() { final String threadName = "async_influxdb_write"; - this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, new ThreadFactory() { + this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, InfluxDB.ConsistencyLevel.ONE, new ThreadFactory() { @Override public Thread newThread(Runnable r) { diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 82a2e8de4..833e2de2b 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -1,5 +1,15 @@ package org.influxdb.impl; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -7,21 +17,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; -import org.influxdb.dto.Point; -import org.junit.Test; - public class BatchProcessorTest { @Test public void testSchedulerExceptionHandling() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); - BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) - .interval(1, TimeUnit.NANOSECONDS).build(); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB) + .actions(Integer.MAX_VALUE) + .consistencyLevel(InfluxDB.ConsistencyLevel.ONE) + .interval(1, TimeUnit.NANOSECONDS).build(); doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); @@ -44,7 +48,9 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce @Test public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); - BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB) + .actions(Integer.MAX_VALUE) + .consistencyLevel(InfluxDB.ConsistencyLevel.ONE) .interval(1, TimeUnit.NANOSECONDS).build(); Point point = Point.measurement("cpu").field("6", "").build(); @@ -59,11 +65,37 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); } + @Test + public void testConsistencyLevelIsHonored() { + InfluxDB.ConsistencyLevel desiredConsistencyLevel = InfluxDB.ConsistencyLevel.QUORUM; + + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB) + .actions(Integer.MAX_VALUE) + .consistencyLevel(desiredConsistencyLevel) + .interval(1, TimeUnit.DAYS).build(); + + Point point = Point.measurement("test").addField("region", "a").build(); + BatchProcessor.HttpBatchEntry httpBatchEntry = new BatchProcessor.HttpBatchEntry(point, "http", "http-rp"); + + batchProcessor.put(httpBatchEntry); + + batchProcessor.flush(); + + ArgumentCaptor batchPoints = ArgumentCaptor.forClass(BatchPoints.class); + + verify(mockInfluxDB, times(1)).write(batchPoints.capture()); + + assertThat(batchPoints.getAllValues()).hasSize(1); + assertThat(batchPoints.getValue().getConsistency()).isEqualTo(desiredConsistencyLevel); + } + @Test public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws InterruptedException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB) .actions(Integer.MAX_VALUE) + .consistencyLevel(InfluxDB.ConsistencyLevel.ONE) .interval(1, TimeUnit.NANOSECONDS).build(); Point point = Point.measurement("test").addField("region", "a").build(); @@ -88,21 +120,28 @@ public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws In @Test(expected = IllegalArgumentException.class) public void testActionsIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); - BatchProcessor.builder(mockInfluxDB).actions(0) + BatchProcessor.builder(mockInfluxDB).actions(0).consistencyLevel(InfluxDB.ConsistencyLevel.ONE) .interval(1, TimeUnit.NANOSECONDS).build(); } @Test(expected = IllegalArgumentException.class) public void testIntervalIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); - BatchProcessor.builder(mockInfluxDB).actions(1) + BatchProcessor.builder(mockInfluxDB).actions(1).consistencyLevel(InfluxDB.ConsistencyLevel.ONE) .interval(0, TimeUnit.NANOSECONDS).build(); } @Test(expected = NullPointerException.class) public void testInfluxDBIsNull() throws InterruptedException, IOException { InfluxDB mockInfluxDB = null; - BatchProcessor.builder(mockInfluxDB).actions(1) + BatchProcessor.builder(mockInfluxDB).actions(1).consistencyLevel(InfluxDB.ConsistencyLevel.ONE) .interval(1, TimeUnit.NANOSECONDS).build(); } + + @Test(expected = NullPointerException.class) + public void testConsistencyLevelIsNull() { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + BatchProcessor.builder(mockInfluxDB).actions(1).consistencyLevel(null) + .interval(1, TimeUnit.NANOSECONDS).build(); + } }