From b4aeaa368d3ebf519eb55c4f58e80a0eff277ccb Mon Sep 17 00:00:00 2001 From: Hoan Xuan Le Date: Sun, 26 Aug 2018 17:41:36 +0700 Subject: [PATCH] imlement issue #439 : Retry capability for write(final BatchPoints batchPoints) as well --- .../org/influxdb/impl/BatchProcessor.java | 3 + .../java/org/influxdb/impl/InfluxDBImpl.java | 10 +- .../org/influxdb/impl/OneShotBatchWriter.java | 7 +- .../impl/RetryCapableBatchWriter.java | 7 +- .../java/org/influxdb/BatchOptionsTest.java | 255 ++---------- src/test/java/org/influxdb/TestAnswer.java | 2 +- .../impl/BatchOptionsRetryCapibilityTest.java | 384 ++++++++++++++++++ .../org/influxdb/impl/BatchProcessorTest.java | 20 +- .../impl/RetryCapableBatchWriterTest.java | 47 ++- 9 files changed, 472 insertions(+), 263 deletions(-) create mode 100644 src/test/java/org/influxdb/impl/BatchOptionsRetryCapibilityTest.java diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index b0f153c98..84d0a657b 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -376,4 +376,7 @@ public ConsistencyLevel getConsistencyLevel() { return consistencyLevel; } + BatchWriter getBatchWriter() { + return batchWriter; + } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index a2f1019b6..1a1046d88 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -46,6 +46,7 @@ import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -415,6 +416,14 @@ public void write(final int udpPort, final Point point) { @Override public void write(final BatchPoints batchPoints) { + if (isBatchEnabled()) { + batchProcessor.getBatchWriter().write(Collections.singleton(batchPoints)); + } else { + writeNoRetry(batchPoints); + } + } + + void writeNoRetry(final BatchPoints batchPoints) { this.batchedCount.add(batchPoints.getPoints().size()); RequestBody lineProtocol = RequestBody.create(MEDIA_TYPE_STRING, batchPoints.lineProtocol()); execute(this.influxDBService.writePoints( @@ -425,7 +434,6 @@ public void write(final BatchPoints batchPoints) { lineProtocol)); } - @Override public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final TimeUnit precision, final String records) { diff --git a/src/main/java/org/influxdb/impl/OneShotBatchWriter.java b/src/main/java/org/influxdb/impl/OneShotBatchWriter.java index 96754f144..956ee6572 100644 --- a/src/main/java/org/influxdb/impl/OneShotBatchWriter.java +++ b/src/main/java/org/influxdb/impl/OneShotBatchWriter.java @@ -1,6 +1,5 @@ package org.influxdb.impl; -import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; import java.util.Collection; @@ -10,16 +9,16 @@ */ class OneShotBatchWriter implements BatchWriter { - private InfluxDB influxDB; + private InfluxDBImpl influxDB; - OneShotBatchWriter(final InfluxDB influxDB) { + OneShotBatchWriter(final InfluxDBImpl influxDB) { this.influxDB = influxDB; } @Override public void write(final Collection batchPointsCollection) { for (BatchPoints batchPoints : batchPointsCollection) { - influxDB.write(batchPoints); + influxDB.writeNoRetry(batchPoints); } } diff --git a/src/main/java/org/influxdb/impl/RetryCapableBatchWriter.java b/src/main/java/org/influxdb/impl/RetryCapableBatchWriter.java index 141b215de..5f3f14e40 100644 --- a/src/main/java/org/influxdb/impl/RetryCapableBatchWriter.java +++ b/src/main/java/org/influxdb/impl/RetryCapableBatchWriter.java @@ -1,6 +1,5 @@ package org.influxdb.impl; -import org.influxdb.InfluxDB; import org.influxdb.InfluxDBException; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; @@ -18,14 +17,14 @@ */ class RetryCapableBatchWriter implements BatchWriter { - private InfluxDB influxDB; + private InfluxDBImpl influxDB; private BiConsumer, Throwable> exceptionHandler; private LinkedList batchQueue; private int requestActionsLimit; private int retryBufferCapacity; private int usedRetryBufferCapacity; - RetryCapableBatchWriter(final InfluxDB influxDB, final BiConsumer, Throwable> exceptionHandler, + RetryCapableBatchWriter(final InfluxDBImpl influxDB, final BiConsumer, Throwable> exceptionHandler, final int retryBufferCapacity, final int requestActionsLimit) { this.influxDB = influxDB; this.exceptionHandler = exceptionHandler; @@ -124,7 +123,7 @@ public synchronized void close() { private WriteResult tryToWrite(final BatchPoints batchPoints) { try { - influxDB.write(batchPoints); + influxDB.writeNoRetry(batchPoints); return WriteResult.WRITTEN; } catch (InfluxDBException e) { return new WriteResult(e); diff --git a/src/test/java/org/influxdb/BatchOptionsTest.java b/src/test/java/org/influxdb/BatchOptionsTest.java index 9ae453756..dc954a886 100644 --- a/src/test/java/org/influxdb/BatchOptionsTest.java +++ b/src/test/java/org/influxdb/BatchOptionsTest.java @@ -1,8 +1,7 @@ package org.influxdb; import org.influxdb.InfluxDB.ConsistencyLevel; -import org.influxdb.InfluxDBException.DatabaseNotFoundException; -import org.influxdb.dto.BatchPoints; +import org.influxdb.InfluxDB.ResponseFormat; import org.influxdb.dto.Point; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; @@ -11,13 +10,17 @@ import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; + +import okhttp3.HttpUrl; +import okhttp3.Interceptor; +import okhttp3.OkHttpClient; import static org.mockito.Mockito.*; import java.io.IOException; import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -176,8 +179,6 @@ public void testJitterDuration() throws InterruptedException { influxDB.disableBatch(); influxDB.deleteDatabase(dbName); } - - } /** @@ -192,121 +193,7 @@ public void testNegativeJitterDuration() { influxDB.disableBatch(); }); } - - /** - * Test the implementation of {@link BatchOptions#bufferLimit(int)} }. - * use a bufferLimit that less than actions, then OneShotBatchWrite is used - */ - @Test - public void testBufferLimitLessThanActions() throws InterruptedException { - - TestAnswer answer = new TestAnswer() { - - InfluxDBException influxDBException = InfluxDBException.buildExceptionForErrorState(createErrorBody(InfluxDBException.CACHE_MAX_MEMORY_SIZE_EXCEEDED_ERROR)); - @Override - protected void check(InvocationOnMock invocation) { - if ((Boolean) params.get("throwException")) { - throw influxDBException; - } - } - }; - - InfluxDB spy = spy(influxDB); - //the spied influxDB.write(BatchPoints) will always throw InfluxDBException - doAnswer(answer).when(spy).write(any(BatchPoints.class)); - - String dbName = "write_unittest_" + System.currentTimeMillis(); - try { - answer.params.put("throwException", true); - BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); - BatchOptions options = BatchOptions.DEFAULTS.bufferLimit(3).actions(4).flushDuration(100).exceptionHandler(mockHandler); - spy.createDatabase(dbName); - spy.setDatabase(dbName); - spy.enableBatch(options); - write20Points(spy); - - Thread.sleep(300); - verify(mockHandler, atLeastOnce()).accept(any(), any()); - - QueryResult result = spy.query(new Query("select * from weather", dbName)); - //assert 0 point written because of InfluxDBException and OneShotBatchWriter did not retry - Assertions.assertNull(result.getResults().get(0).getSeries()); - Assertions.assertNull(result.getResults().get(0).getError()); - - answer.params.put("throwException", false); - write20Points(spy); - Thread.sleep(300); - result = spy.query(new Query("select * from weather", dbName)); - //assert all 20 points written to DB due to no exception - Assertions.assertEquals(20, result.getResults().get(0).getSeries().get(0).getValues().size()); - } - finally { - spy.disableBatch(); - spy.deleteDatabase(dbName); - } - - } - - /** - * Test the implementation of {@link BatchOptions#bufferLimit(int)} }. - * use a bufferLimit that greater than actions, then RetryCapableBatchWriter is used - */ - @Test - public void testBufferLimitGreaterThanActions() throws InterruptedException { - TestAnswer answer = new TestAnswer() { - - int nthCall = 0; - InfluxDBException cacheMaxMemorySizeExceededException = InfluxDBException.buildExceptionForErrorState(createErrorBody(InfluxDBException.CACHE_MAX_MEMORY_SIZE_EXCEEDED_ERROR)); - @Override - protected void check(InvocationOnMock invocation) { - - switch (nthCall++) { - case 0: - throw InfluxDBException.buildExceptionForErrorState(createErrorBody(InfluxDBException.DATABASE_NOT_FOUND_ERROR)); - case 1: - throw InfluxDBException.buildExceptionForErrorState(createErrorBody(InfluxDBException.CACHE_MAX_MEMORY_SIZE_EXCEEDED_ERROR)); - default: - break; - } - } - }; - - InfluxDB spy = spy(influxDB); - doAnswer(answer).when(spy).write(any(BatchPoints.class)); - - String dbName = "write_unittest_" + System.currentTimeMillis(); - try { - BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); - BatchOptions options = BatchOptions.DEFAULTS.bufferLimit(10).actions(8).flushDuration(100).exceptionHandler(mockHandler); - - spy.createDatabase(dbName); - spy.setDatabase(dbName); - spy.enableBatch(options); - writeSomePoints(spy, "measurement1", 0, 5); - - Thread.sleep(300); - verify(mockHandler, atLeastOnce()).accept(any(), any()); - - QueryResult result = spy.query(new Query("select * from measurement1", dbName)); - //assert 0 point written because of non-retry capable DATABASE_NOT_FOUND_ERROR and RetryCapableBatchWriter did not retry - Assertions.assertNull(result.getResults().get(0).getSeries()); - Assertions.assertNull(result.getResults().get(0).getError()); - - writeSomePoints(spy, "measurement2", 0, 5); - - Thread.sleep(300); - - result = spy.query(new Query("select * from measurement2", dbName)); - //assert all 6 point written because of retry capable CACHE_MAX_MEMORY_SIZE_EXCEEDED_ERROR and RetryCapableBatchWriter did retry - Assertions.assertEquals(6, result.getResults().get(0).getSeries().get(0).getValues().size()); - } - finally { - spy.disableBatch(); - spy.deleteDatabase(dbName); - } - - } /** * Test the implementation of {@link BatchOptions#bufferLimit(int)} }. */ @@ -357,131 +244,47 @@ public Thread newThread(Runnable r) { } - /** - * Test the implementation of {@link BatchOptions#exceptionHandler(BiConsumer)} }. - * @throws InterruptedException - */ - @Test - public void testHandlerOnRetryImpossible() throws InterruptedException { - - String dbName = "write_unittest_" + System.currentTimeMillis(); - InfluxDB spy = spy(influxDB); - doThrow(DatabaseNotFoundException.class).when(spy).write(any(BatchPoints.class)); - - try { - BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); - BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100); - spy.createDatabase(dbName); - spy.setDatabase(dbName); - spy.enableBatch(options); - - writeSomePoints(spy, 1); - - Thread.sleep(200); - verify(mockHandler, times(1)).accept(any(), any()); - - QueryResult result = influxDB.query(new Query("select * from weather", dbName)); - Assertions.assertNull(result.getResults().get(0).getSeries()); - Assertions.assertNull(result.getResults().get(0).getError()); - } finally { - spy.disableBatch(); - spy.deleteDatabase(dbName); - } - - } - - /** - * Test the implementation of {@link BatchOptions#exceptionHandler(BiConsumer)} }. - * @throws InterruptedException - */ - @Test - public void testHandlerOnRetryPossible() throws InterruptedException { - - String dbName = "write_unittest_" + System.currentTimeMillis(); - InfluxDB spy = spy(influxDB); - doAnswer(new Answer() { - boolean firstCall = true; - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - if (firstCall) { - firstCall = false; - throw new InfluxDBException("error"); - } else { - return invocation.callRealMethod(); - } - } - }).when(spy).write(any(BatchPoints.class)); - - try { - BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); - BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100); - - spy.createDatabase(dbName); - spy.setDatabase(dbName); - spy.enableBatch(options); - - writeSomePoints(spy, 1); - - Thread.sleep(500); - verify(mockHandler, never()).accept(any(), any()); - - verify(spy, times(2)).write(any(BatchPoints.class)); - - QueryResult result = influxDB.query(new Query("select * from weather", dbName)); - Assertions.assertNotNull(result.getResults().get(0).getSeries()); - Assertions.assertEquals(1, result.getResults().get(0).getSeries().get(0).getValues().size()); - - } finally { - spy.disableBatch(); - spy.deleteDatabase(dbName); - } - - } /** * Test the implementation of {@link BatchOptions#consistency(InfluxDB.ConsistencyLevel)} }. * @throws InterruptedException + * @throws IOException */ @Test - public void testConsistency() throws InterruptedException { + public void testConsistency() throws InterruptedException, IOException { String dbName = "write_unittest_" + System.currentTimeMillis(); - - InfluxDB spy = spy(influxDB); - spy.createDatabase(dbName); - spy.setDatabase(dbName); + final Map params = new HashMap<>(); + InfluxDB influxDB = createInterceptedInfluxDb(chain -> { + HttpUrl url = chain.request().url(); + if ("/write".equals(url.encodedPath())) { + String consistencyLevel = url.queryParameter("consistency"); + Assertions.assertEquals(params.get("consistencyLevel").toString(), consistencyLevel.toUpperCase()); + } + return chain.proceed(chain.request()); + }); + influxDB.createDatabase(dbName); + influxDB.setDatabase(dbName); try { - TestAnswer answer = new TestAnswer() { - @Override - protected void check(InvocationOnMock invocation) { - BatchPoints batchPoints = (BatchPoints) invocation.getArgument(0); - Assertions.assertEquals(params.get("consistencyLevel"), batchPoints.getConsistency()); - - } - }; - doAnswer(answer).when(spy).write(any(BatchPoints.class)); - int n = 0; for (final ConsistencyLevel consistencyLevel : ConsistencyLevel.values()) { - answer.params.put("consistencyLevel", consistencyLevel); + params.put("consistencyLevel", consistencyLevel); BatchOptions options = BatchOptions.DEFAULTS.consistency(consistencyLevel).flushDuration(100); - spy.enableBatch(options); + influxDB.enableBatch(options); Assertions.assertEquals(options.getConsistency(), consistencyLevel); - writeSomePoints(spy, n, n + 4); + writeSomePoints(influxDB, n, n + 4); n += 5; Thread.sleep(300); - verify(spy, atLeastOnce()).write(any(BatchPoints.class)); - QueryResult result = spy.query(new Query("select * from weather", dbName)); + QueryResult result = influxDB.query(new Query("select * from weather", dbName)); Assertions.assertEquals(n, result.getResults().get(0).getSeries().get(0).getValues().size()); - - spy.disableBatch(); + influxDB.disableBatch(); } } finally { - spy.deleteDatabase(dbName); + influxDB.deleteDatabase(dbName); } } @@ -518,4 +321,10 @@ void writeSomePoints(InfluxDB influxDB, int n) { static String createErrorBody(String errorMessage) { return MessageFormat.format("'{' \"error\": \"{0}\" '}'", errorMessage); } + + private InfluxDB createInterceptedInfluxDb(Interceptor interceptor) throws InterruptedException, IOException { + OkHttpClient.Builder client = new OkHttpClient.Builder(); + client.addInterceptor(interceptor); + return TestUtils.connectToInfluxDB(client, null, ResponseFormat.JSON); + } } diff --git a/src/test/java/org/influxdb/TestAnswer.java b/src/test/java/org/influxdb/TestAnswer.java index 8b0a2cd41..ae11e5f5c 100644 --- a/src/test/java/org/influxdb/TestAnswer.java +++ b/src/test/java/org/influxdb/TestAnswer.java @@ -9,7 +9,7 @@ public abstract class TestAnswer implements Answer { - Map params = new HashMap<>(); + public Map params = new HashMap<>(); protected abstract void check(InvocationOnMock invocation); diff --git a/src/test/java/org/influxdb/impl/BatchOptionsRetryCapibilityTest.java b/src/test/java/org/influxdb/impl/BatchOptionsRetryCapibilityTest.java new file mode 100644 index 000000000..f5747b584 --- /dev/null +++ b/src/test/java/org/influxdb/impl/BatchOptionsRetryCapibilityTest.java @@ -0,0 +1,384 @@ +package org.influxdb.impl; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.text.MessageFormat; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import org.influxdb.BatchOptions; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBException; +import org.influxdb.TestAnswer; +import org.influxdb.TestUtils; +import org.influxdb.InfluxDBException.DatabaseNotFoundException; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test cases of BatchOptions on a failure retry-capable InfluxDB Implementation + * + * @author hoan.le [at] bonitoo.io + * + */ + +@RunWith(JUnitPlatform.class) +public class BatchOptionsRetryCapibilityTest { + + private InfluxDB influxDB; + + @BeforeEach + public void setUp() throws InterruptedException, IOException { + this.influxDB = TestUtils.connectToInfluxDB(); + } + /** + * Test the implementation of {@link BatchOptions#bufferLimit(int)} }. use a + * bufferLimit that less than actions, then OneShotBatchWrite is used + */ + @Test + public void testBufferLimitLessThanActions() throws InterruptedException { + + TestAnswer answer = new TestAnswer() { + + InfluxDBException influxDBException = InfluxDBException + .buildExceptionForErrorState(createErrorBody("cache-max-memory-size exceeded")); + + @Override + protected void check(InvocationOnMock invocation) { + if ((Boolean) params.get("throwException")) { + throw influxDBException; + } + } + }; + + InfluxDBImpl spy = spy((InfluxDBImpl) influxDB); + // the spied influxDBImpl.write(BatchPoints, boolean) will always throw + // InfluxDBException + doAnswer(answer).when(spy).writeNoRetry(any(BatchPoints.class)); + + String dbName = "write_unittest_" + System.currentTimeMillis(); + try { + answer.params.put("throwException", true); + BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); + BatchOptions options = BatchOptions.DEFAULTS.bufferLimit(3).actions(4).flushDuration(100) + .exceptionHandler(mockHandler); + + spy.createDatabase(dbName); + spy.setDatabase(dbName); + spy.enableBatch(options); + write20Points(spy); + + Thread.sleep(300); + verify(mockHandler, atLeastOnce()).accept(any(), any()); + + QueryResult result = spy.query(new Query("select * from weather", dbName)); + // assert 0 point written because of InfluxDBException and + // OneShotBatchWriter did not retry + Assertions.assertNull(result.getResults().get(0).getSeries()); + Assertions.assertNull(result.getResults().get(0).getError()); + + answer.params.put("throwException", false); + write20Points(spy); + Thread.sleep(300); + result = spy.query(new Query("select * from weather", dbName)); + // assert all 20 points written to DB due to no exception + Assertions.assertEquals(20, result.getResults().get(0).getSeries().get(0).getValues().size()); + } finally { + spy.disableBatch(); + spy.deleteDatabase(dbName); + } + + } + + /** + * Test the implementation of {@link BatchOptions#bufferLimit(int)} }. use a + * bufferLimit that greater than actions, then RetryCapableBatchWriter is used + */ + @Test + public void testBufferLimitGreaterThanActions() throws InterruptedException { + TestAnswer answer = new TestAnswer() { + + int nthCall = 0; + InfluxDBException cacheMaxMemorySizeExceededException = InfluxDBException + .buildExceptionForErrorState(createErrorBody("cache-max-memory-size exceeded")); + + @Override + protected void check(InvocationOnMock invocation) { + + switch (nthCall++) { + case 0: + throw InfluxDBException + .buildExceptionForErrorState(createErrorBody("database not found")); + case 1: + throw InfluxDBException + .buildExceptionForErrorState(createErrorBody("cache-max-memory-size exceeded")); + default: + break; + } + } + }; + + InfluxDBImpl spy = spy((InfluxDBImpl) influxDB); + doAnswer(answer).when(spy).writeNoRetry(any(BatchPoints.class)); + + String dbName = "write_unittest_" + System.currentTimeMillis(); + try { + BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); + BatchOptions options = BatchOptions.DEFAULTS.bufferLimit(10).actions(8).flushDuration(100) + .exceptionHandler(mockHandler); + + spy.createDatabase(dbName); + spy.setDatabase(dbName); + spy.enableBatch(options); + writeSomePoints(spy, "measurement1", 0, 5); + + Thread.sleep(300); + verify(mockHandler, atLeastOnce()).accept(any(), any()); + + QueryResult result = spy.query(new Query("select * from measurement1", dbName)); + // assert 0 point written because of non-retry capable + // DATABASE_NOT_FOUND_ERROR and RetryCapableBatchWriter did not retry + Assertions.assertNull(result.getResults().get(0).getSeries()); + Assertions.assertNull(result.getResults().get(0).getError()); + + writeSomePoints(spy, "measurement2", 0, 5); + + Thread.sleep(300); + + result = spy.query(new Query("select * from measurement2", dbName)); + // assert all 6 point written because of retry capable + // CACHE_MAX_MEMORY_SIZE_EXCEEDED_ERROR and RetryCapableBatchWriter did + // retry + Assertions.assertEquals(6, result.getResults().get(0).getSeries().get(0).getValues().size()); + } finally { + spy.disableBatch(); + spy.deleteDatabase(dbName); + } + + } + + /** + * Test the implementation of + * {@link BatchOptions#exceptionHandler(BiConsumer)} }. + * + * @throws InterruptedException + */ + @Test + public void testHandlerOnRetryImpossible() throws InterruptedException { + + String dbName = "write_unittest_" + System.currentTimeMillis(); + InfluxDBImpl spy = spy((InfluxDBImpl) influxDB); + doThrow(DatabaseNotFoundException.class).when(spy).writeNoRetry(any(BatchPoints.class)); + + try { + BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); + BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100); + + spy.createDatabase(dbName); + spy.setDatabase(dbName); + spy.enableBatch(options); + + writeSomePoints(spy, 1); + + Thread.sleep(200); + verify(mockHandler, times(1)).accept(any(), any()); + + QueryResult result = influxDB.query(new Query("select * from weather", dbName)); + Assertions.assertNull(result.getResults().get(0).getSeries()); + Assertions.assertNull(result.getResults().get(0).getError()); + } finally { + spy.disableBatch(); + spy.deleteDatabase(dbName); + } + + } + + /** + * Test the implementation of + * {@link BatchOptions#exceptionHandler(BiConsumer)} }. + * + * @throws InterruptedException + */ + @Test + public void testHandlerOnRetryPossible() throws InterruptedException { + + String dbName = "write_unittest_" + System.currentTimeMillis(); + InfluxDBImpl spy = spy((InfluxDBImpl) influxDB); + doAnswer(new Answer() { + boolean firstCall = true; + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (firstCall) { + firstCall = false; + throw new InfluxDBException("error"); + } else { + return invocation.callRealMethod(); + } + } + }).when(spy).writeNoRetry(any(BatchPoints.class)); + + try { + BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); + BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100); + + spy.createDatabase(dbName); + spy.setDatabase(dbName); + spy.enableBatch(options); + + writeSomePoints(spy, 1); + + Thread.sleep(500); + verify(mockHandler, never()).accept(any(), any()); + + verify(spy, times(2)).writeNoRetry(any(BatchPoints.class)); + + QueryResult result = influxDB.query(new Query("select * from weather", dbName)); + Assertions.assertNotNull(result.getResults().get(0).getSeries()); + Assertions.assertEquals(1, result.getResults().get(0).getSeries().get(0).getValues().size()); + + } finally { + spy.disableBatch(); + spy.deleteDatabase(dbName); + } + } + + @Test + public void testWriteBatchOnRetryPossible() throws InterruptedException { + String dbName = "write_unittest_" + System.currentTimeMillis(); + InfluxDBImpl spy = spy((InfluxDBImpl) influxDB); + doAnswer(new Answer() { + boolean firstCall = true; + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (firstCall) { + firstCall = false; + throw new InfluxDBException("error"); + } else { + return invocation.callRealMethod(); + } + } + }).when(spy).writeNoRetry(any(BatchPoints.class)); + try { + BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); + BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100); + + spy.createDatabase(dbName); + spy.setDatabase(dbName); + spy.enableBatch(options); + + BatchPoints batchPoints = createBatchPoints(dbName, "m0", 200); + spy.write(batchPoints); + Thread.sleep(500); + verify(mockHandler, never()).accept(any(), any()); + + verify(spy, times(2)).writeNoRetry(any(BatchPoints.class)); + + QueryResult result = influxDB.query(new Query("select * from m0", dbName)); + Assertions.assertNotNull(result.getResults().get(0).getSeries()); + Assertions.assertEquals(200, result.getResults().get(0).getSeries().get(0).getValues().size()); + + } finally { + spy.disableBatch(); + spy.deleteDatabase(dbName); + } + } + + @Test + public void testWriteBatchOnRetryImPossible() throws InterruptedException { + + String dbName = "write_unittest_" + System.currentTimeMillis(); + InfluxDBImpl spy = spy((InfluxDBImpl) influxDB); + doThrow(DatabaseNotFoundException.class).when(spy).writeNoRetry(any(BatchPoints.class)); + + try { + BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); + BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100); + + spy.createDatabase(dbName); + spy.setDatabase(dbName); + spy.enableBatch(options); + + BatchPoints batchPoints = createBatchPoints(dbName, "m0", 200); + spy.write(batchPoints); + Thread.sleep(500); + + verify(mockHandler, times(1)).accept(any(), any()); + + QueryResult result = influxDB.query(new Query("select * from weather", dbName)); + Assertions.assertNull(result.getResults().get(0).getSeries()); + Assertions.assertNull(result.getResults().get(0).getError()); + } finally { + spy.disableBatch(); + spy.deleteDatabase(dbName); + } + + } + + private void writeSomePoints(InfluxDB influxDB, String measurement, int firstIndex, int lastIndex) { + for (int i = firstIndex; i <= lastIndex; i++) { + Point point = Point.measurement(measurement) + .time(i,TimeUnit.HOURS) + .addField("field1", (double) i) + .addField("field2", (double) (i) * 1.1) + .addField("field3", "moderate").build(); + influxDB.write(point); + } + } + + private void writeSomePoints(InfluxDB influxDB, int firstIndex, int lastIndex) { + for (int i = firstIndex; i <= lastIndex; i++) { + Point point = Point.measurement("weather") + .time(i,TimeUnit.HOURS) + .addField("temperature", (double) i) + .addField("humidity", (double) (i) * 1.1) + .addField("uv_index", "moderate").build(); + influxDB.write(point); + } + } + + private void write20Points(InfluxDB influxDB) { + writeSomePoints(influxDB, 0, 19); + } + + private void writeSomePoints(InfluxDB influxDB, int n) { + writeSomePoints(influxDB, 0, n - 1); + } + + private BatchPoints createBatchPoints(String dbName, String measurement, int n) { + BatchPoints batchPoints = BatchPoints.database(dbName).build(); + for (int i = 1; i <= n; i++) { + Point point = Point.measurement(measurement) + .time(i,TimeUnit.MILLISECONDS) + .addField("f1", (double) i) + .addField("f2", (double) (i) * 1.1) + .addField("f3", "f_v3").build(); + batchPoints.point(point); + } + + return batchPoints; + } + + private static String createErrorBody(String errorMessage) { + return MessageFormat.format("'{' \"error\": \"{0}\" '}'", errorMessage); + } +} diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 8a17245f0..44af75c78 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -31,7 +31,7 @@ public class BatchProcessorTest { @Test public void testSchedulerExceptionHandling() throws InterruptedException, IOException { - InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + InfluxDBImpl mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) .interval(1, TimeUnit.NANOSECONDS).build(); @@ -45,22 +45,22 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce Thread.sleep(200); // wait for scheduler // first try throws an exception - verify(mockInfluxDB, times(1)).write(any(BatchPoints.class)); + verify(mockInfluxDB, times(1)).writeNoRetry(any(BatchPoints.class)); batchProcessor.put(batchEntry2); Thread.sleep(200); // wait for scheduler // without try catch the 2nd time does not occur - verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + verify(mockInfluxDB, times(2)).writeNoRetry(any(BatchPoints.class)); } @Test public void testSchedulerExceptionHandlingCallback() throws InterruptedException, IOException { - InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + InfluxDBImpl mockInfluxDB = mock(InfluxDBImpl.class); BiConsumer, Throwable> mockHandler = mock(BiConsumer.class); BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) .interval(1, TimeUnit.NANOSECONDS).exceptionHandler(mockHandler).build(); - doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); + doThrow(new RuntimeException()).when(mockInfluxDB).writeNoRetry(any(BatchPoints.class)); Point point = Point.measurement("cpu").field("6", "").build(); BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", ""); @@ -74,7 +74,7 @@ public void testSchedulerExceptionHandlingCallback() throws InterruptedException @Test public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException { - InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + InfluxDBImpl mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) .interval(1, TimeUnit.NANOSECONDS).build(); @@ -87,12 +87,12 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce Thread.sleep(200); // wait for scheduler // same dbname with different rp should write two batchs instead of only one. - verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + verify(mockInfluxDB, times(2)).writeNoRetry(any(BatchPoints.class)); } @Test public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws InterruptedException { - InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + InfluxDBImpl mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB) .actions(Integer.MAX_VALUE) .interval(1, TimeUnit.NANOSECONDS).build(); @@ -103,7 +103,7 @@ public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws In batchProcessor.put(httpBatchEntry); Thread.sleep(100); // wait for scheduler // Our put should have been written - verify(mockInfluxDB).write(any(BatchPoints.class)); + verify(mockInfluxDB).writeNoRetry(any(BatchPoints.class)); // Force a flush which should not stop the scheduler batchProcessor.flush(); @@ -111,7 +111,7 @@ public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws In batchProcessor.put(httpBatchEntry); Thread.sleep(100); // wait for scheduler // Our second put should have been written if the scheduler is still running - verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + verify(mockInfluxDB, times(2)).writeNoRetry(any(BatchPoints.class)); verifyNoMoreInteractions(mockInfluxDB); } diff --git a/src/test/java/org/influxdb/impl/RetryCapableBatchWriterTest.java b/src/test/java/org/influxdb/impl/RetryCapableBatchWriterTest.java index 189f4d6d1..d3b5c1346 100644 --- a/src/test/java/org/influxdb/impl/RetryCapableBatchWriterTest.java +++ b/src/test/java/org/influxdb/impl/RetryCapableBatchWriterTest.java @@ -1,6 +1,5 @@ package org.influxdb.impl; -import org.influxdb.InfluxDB; import org.influxdb.InfluxDBException; import org.influxdb.TestAnswer; import org.influxdb.dto.BatchPoints; @@ -37,7 +36,7 @@ BatchPoints getBP(int count) { @Test public void test() { - InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + InfluxDBImpl mockInfluxDB = mock(InfluxDBImpl.class); BiConsumer errorHandler = mock(BiConsumer.class); RetryCapableBatchWriter rw = new RetryCapableBatchWriter(mockInfluxDB, errorHandler, 150, 100); @@ -49,10 +48,10 @@ public void test() { Exception nonRecoverable = InfluxDBException.buildExceptionForErrorState("{ \"error\": \"database not found: cvfdgf\" }"); Exception recoverable = InfluxDBException.buildExceptionForErrorState("{ \"error\": \"cache-max-memory-size exceeded 104/1400\" }"); - Mockito.doThrow(nonRecoverable).when(mockInfluxDB).write(bp0); - Mockito.doThrow(recoverable).when(mockInfluxDB).write(bp1); - Mockito.doThrow(recoverable).when(mockInfluxDB).write(bp2); - Mockito.doThrow(recoverable).when(mockInfluxDB).write(bp3); + Mockito.doThrow(nonRecoverable).when(mockInfluxDB).writeNoRetry(bp0); + Mockito.doThrow(recoverable).when(mockInfluxDB).writeNoRetry(bp1); + Mockito.doThrow(recoverable).when(mockInfluxDB).writeNoRetry(bp2); + Mockito.doThrow(recoverable).when(mockInfluxDB).writeNoRetry(bp3); // first one will fail with non-recoverable error rw.write(Collections.singletonList(bp0)); // second one will fail with recoverable error @@ -63,7 +62,7 @@ public void test() { rw.write(Collections.singletonList(bp3)); ArgumentCaptor captor = ArgumentCaptor.forClass(BatchPoints.class); - verify(mockInfluxDB, times(4)).write(captor.capture()); + verify(mockInfluxDB, times(4)).writeNoRetry(captor.capture()); final List capturedArgument1 = captor.getAllValues(); for (BatchPoints b : capturedArgument1) { System.out.println("batchSize written " + b.getPoints().size()); @@ -82,7 +81,7 @@ public void test() { rw.write(Collections.singletonList(bp4)); ArgumentCaptor captor2 = ArgumentCaptor.forClass(BatchPoints.class); - verify(mockInfluxDB, times(2)).write(captor2.capture()); + verify(mockInfluxDB, times(2)).writeNoRetry(captor2.capture()); final List capturedArgument2 = captor2.getAllValues(); for (BatchPoints b : capturedArgument2) { System.out.println("batchSize written " + b.getPoints().size()); @@ -95,7 +94,7 @@ public void test() { @Test public void testAllNonRecoverableExceptions() { - InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + InfluxDBImpl mockInfluxDB = mock(InfluxDBImpl.class); BiConsumer errorHandler = mock(BiConsumer.class); RetryCapableBatchWriter rw = new RetryCapableBatchWriter(mockInfluxDB, errorHandler, 150, 100); @@ -111,7 +110,7 @@ public void testAllNonRecoverableExceptions() { List exceptions = Arrays.asList(nonRecoverable1, nonRecoverable2, nonRecoverable3, nonRecoverable4, nonRecoverable5, nonRecoverable6, nonRecoverable7); int size = exceptions.size(); - doAnswer(new TestAnswer() { + doAnswer(new NullTestAnswer() { int i = 0; @Override protected void check(InvocationOnMock invocation) { @@ -119,7 +118,7 @@ protected void check(InvocationOnMock invocation) { throw exceptions.get(i++); } } - }).when(mockInfluxDB).write(any(BatchPoints.class)); + }).when(mockInfluxDB).writeNoRetry(any(BatchPoints.class)); BatchPoints bp = getBP(8); for (int i = 0; i < size; i++) { @@ -130,14 +129,14 @@ protected void check(InvocationOnMock invocation) { @Test public void testClosingWriter() { - InfluxDB mockInfluxDB = mock(InfluxDB.class); + InfluxDBImpl mockInfluxDB = mock(InfluxDBImpl.class); BiConsumer, Throwable> errorHandler = mock(BiConsumer.class); BatchPoints bp5 = getBP(5); BatchPoints bp6 = getBP(6); BatchPoints bp90 = getBP(90); - doAnswer(new TestAnswer() { + doAnswer(new NullTestAnswer() { int i = 0; @Override protected void check(InvocationOnMock invocation) { @@ -147,7 +146,7 @@ protected void check(InvocationOnMock invocation) { } return; } - }).when(mockInfluxDB).write(any(BatchPoints.class)); + }).when(mockInfluxDB).writeNoRetry(any(BatchPoints.class)); RetryCapableBatchWriter rw = new RetryCapableBatchWriter(mockInfluxDB, errorHandler, 150, 100); @@ -157,14 +156,14 @@ protected void check(InvocationOnMock invocation) { rw.write(Collections.singletonList(bp90)); //recoverable exception -> never errorHandler verify(errorHandler, never()).accept(any(), any()); - verify(mockInfluxDB, times(3)).write(any(BatchPoints.class)); + verify(mockInfluxDB, times(3)).writeNoRetry(any(BatchPoints.class)); rw.close(); ArgumentCaptor captor4Write = ArgumentCaptor.forClass(BatchPoints.class); ArgumentCaptor> captor4Accept = ArgumentCaptor.forClass(List.class); verify(errorHandler, times(1)).accept(captor4Accept.capture(), any()); - verify(mockInfluxDB, times(5)).write(captor4Write.capture()); + verify(mockInfluxDB, times(5)).writeNoRetry(captor4Write.capture()); //bp5 and bp6 were merged and writing of the merged batch points on closing should be failed Assertions.assertEquals(11, captor4Accept.getValue().size()); @@ -197,11 +196,11 @@ public void testRetryingKeepChronologicalOrder() { } BatchPoints bp2 = b.build(); - InfluxDB mockInfluxDB = mock(InfluxDB.class); + InfluxDBImpl mockInfluxDB = mock(InfluxDBImpl.class); BiConsumer, Throwable> errorHandler = mock(BiConsumer.class); RetryCapableBatchWriter rw = new RetryCapableBatchWriter(mockInfluxDB, errorHandler, 450, 150); - doAnswer(new TestAnswer() { + doAnswer(new NullTestAnswer() { int i = 0; @Override protected void check(InvocationOnMock invocation) { @@ -210,13 +209,13 @@ protected void check(InvocationOnMock invocation) { } return; } - }).when(mockInfluxDB).write(any(BatchPoints.class)); + }).when(mockInfluxDB).writeNoRetry(any(BatchPoints.class)); rw.write(Collections.singletonList(bp1)); rw.write(Collections.singletonList(bp2)); ArgumentCaptor captor4Write = ArgumentCaptor.forClass(BatchPoints.class); - verify(mockInfluxDB, times(3)).write(captor4Write.capture()); + verify(mockInfluxDB, times(3)).writeNoRetry(captor4Write.capture()); //bp1 written but failed because of recoverable cache-max-memory-size error Assertions.assertEquals(bp1, captor4Write.getAllValues().get(0)); @@ -229,4 +228,12 @@ protected void check(InvocationOnMock invocation) { private static String createErrorBody(String errorMessage) { return MessageFormat.format("'{' \"error\": \"{0}\" '}'", errorMessage); } + + private abstract class NullTestAnswer extends TestAnswer { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + check(invocation); + return null; + } + } }