From 28e0146a09052e6a4183ffd038bf59306dbd3eef Mon Sep 17 00:00:00 2001 From: Tom Lee Date: Wed, 15 Feb 2017 19:29:15 -0800 Subject: [PATCH 1/3] Add InfluxDb.tryWrite(...) methods --- src/main/java/org/influxdb/InfluxDB.java | 24 ++++++++++ .../org/influxdb/impl/BatchProcessor.java | 25 +++++++--- .../java/org/influxdb/impl/InfluxDBImpl.java | 47 +++++++++++++++++-- .../org/influxdb/impl/BatchProcessorTest.java | 25 +++++++++- 4 files changed, 107 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 2d1793dfb..b7f9c91b5 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -154,6 +154,18 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti */ public void write(final String database, final String retentionPolicy, final Point point); + /** + * Try to write a single Point to the database. Useful if batching is enabled with a bounded + * actions size: once the maximum batch size is reached, calls to + * {@link #tryWrite(String, String, Point)} will return false until the batch is + * written to Influx. + * + * @return true if the point was enqueued or written, false otherwise. + * + * @see #write(String, String, Point) + */ + public boolean tryWrite(final String database, final String retentionPolicy, final Point point); + /** * Write a single Point to the database through UDP. * @@ -164,6 +176,18 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti */ public void write(final int udpPort, final Point point); + /** + * Try to write a single Point to the database through UDP. Useful if batching is enabled with a + * bounded actions size: once the maximum batch size is reached, calls to + * {@link #tryWrite(int, Point)} will return false until the batch is written to + * Influx. + * + * @return true if the point was enqueued or written, false otherwise. + * + * @see #write(int, Point) + */ + public boolean tryWrite(final int udpPort, final Point point); + /** * Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol. * diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index e276e6c46..61d2bdda6 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -248,14 +248,15 @@ void put(final AbstractBatchEntry batchEntry) { } catch (InterruptedException e) { throw new RuntimeException(e); } - if (this.queue.size() >= this.actions) { - this.scheduler.submit(new Runnable() { - @Override - public void run() { - write(); - } - }); + scheduleWriteIfAtCapacity(); + } + + boolean offer(final AbstractBatchEntry batchEntry) { + if (this.queue.offer(batchEntry)) { + scheduleWriteIfAtCapacity(); + return true; } + return false; } /** @@ -268,4 +269,14 @@ void flush() { this.scheduler.shutdown(); } + private void scheduleWriteIfAtCapacity() { + if (this.queue.size() >= this.actions) { + this.scheduler.submit(new Runnable() { + @Override + public void run() { + write(); + } + }); + } + } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 8cc047e40..a4a51b758 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -226,15 +226,28 @@ public void write(final String database, final String retentionPolicy, final Poi HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy); this.batchProcessor.put(batchEntry); } else { - BatchPoints batchPoints = BatchPoints.database(database) - .retentionPolicy(retentionPolicy).build(); - batchPoints.point(point); - this.write(batchPoints); - this.unBatchedCount.incrementAndGet(); + writeDirect(database, retentionPolicy, point); } this.writeCount.incrementAndGet(); } + /** + * {@inheritDoc} + */ + @Override + public boolean tryWrite(String database, String retentionPolicy, Point point) { + boolean written = true; + if (this.batchEnabled.get()) { + HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy); + written = this.batchProcessor.offer(batchEntry); + } + else { + writeDirect(database, retentionPolicy, point); + } + this.writeCount.incrementAndGet(); + return written; + } + /** * {@inheritDoc} */ @@ -250,6 +263,24 @@ public void write(final int udpPort, final Point point) { this.writeCount.incrementAndGet(); } + /** + * {@inheritDoc} + */ + @Override + public boolean tryWrite(final int udpPort, final Point point) { + boolean written = true; + if (this.batchEnabled.get()) { + UdpBatchEntry batchEntry = new UdpBatchEntry(point, udpPort); + written = this.batchProcessor.offer(batchEntry); + } + else { + this.write(udpPort, point.lineProtocol()); + this.unBatchedCount.incrementAndGet(); + } + this.writeCount.incrementAndGet(); + return written; + } + @Override public void write(final BatchPoints batchPoints) { this.batchedCount.addAndGet(batchPoints.getPoints().size()); @@ -474,4 +505,10 @@ public void close() { } } + private void writeDirect(final String database, final String retentionPolicy, final Point point) { + BatchPoints batchPoints = BatchPoints.database(database).retentionPolicy(retentionPolicy).build(); + batchPoints.point(point); + this.write(batchPoints); + this.unBatchedCount.incrementAndGet(); + } } diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 039d301f1..1f1597024 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -14,6 +14,9 @@ import org.influxdb.dto.Point; import org.junit.Test; +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + public class BatchProcessorTest { @Test @@ -58,20 +61,38 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); } + @Test + public void testNoBlockOnCapacityExceeded() { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(2) + .interval(10, TimeUnit.MINUTES).build(); + + Point point = Point.measurement("cpu").addField("4", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1"); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2"); + BatchProcessor.HttpBatchEntry batchEntry3 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_3"); + + assertThat(batchProcessor.offer(batchEntry1), is(true)); + assertThat(batchProcessor.offer(batchEntry2), is(true)); + assertThat(batchProcessor.offer(batchEntry3), is(false)); + + batchProcessor.flush(); + } + @Test(expected = IllegalArgumentException.class) public void testActionsIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor.builder(mockInfluxDB).actions(0) .interval(1, TimeUnit.NANOSECONDS).build(); } - + @Test(expected = IllegalArgumentException.class) public void testIntervalIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor.builder(mockInfluxDB).actions(1) .interval(0, TimeUnit.NANOSECONDS).build(); } - + @Test(expected = NullPointerException.class) public void testInfluxDBIsNull() throws InterruptedException, IOException { InfluxDB mockInfluxDB = null; From 671d5ac69b434e9f37158910698463858292658f Mon Sep 17 00:00:00 2001 From: Tom Lee Date: Wed, 15 Feb 2017 19:46:12 -0800 Subject: [PATCH 2/3] Ensure BatchProcessor threads get properly cleaned up in tests --- .../org/influxdb/impl/BatchProcessorTest.java | 92 +++++++++++-------- 1 file changed, 52 insertions(+), 40 deletions(-) diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 1f1597024..6e41ee32f 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -24,23 +24,27 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) .interval(1, TimeUnit.NANOSECONDS).build(); - - doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); - - Point point = Point.measurement("cpu").field("6", "").build(); - BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", ""); - BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", ""); - - batchProcessor.put(batchEntry1); - Thread.sleep(200); // wait for scheduler - - // first try throws an exception - verify(mockInfluxDB, times(1)).write(any(BatchPoints.class)); - - batchProcessor.put(batchEntry2); - Thread.sleep(200); // wait for scheduler - // without try catch the 2nd time does not occur - verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + try { + doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); + + Point point = Point.measurement("cpu").field("6", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", ""); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", ""); + + batchProcessor.put(batchEntry1); + Thread.sleep(200); // wait for scheduler + + // first try throws an exception + verify(mockInfluxDB, times(1)).write(any(BatchPoints.class)); + + batchProcessor.put(batchEntry2); + Thread.sleep(200); // wait for scheduler + // without try catch the 2nd time does not occur + verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + } + finally { + batchProcessor.flush(); + } } @Test @@ -49,16 +53,21 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) .interval(1, TimeUnit.NANOSECONDS).build(); - Point point = Point.measurement("cpu").field("6", "").build(); - BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1"); - BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2"); - - batchProcessor.put(batchEntry1); - batchProcessor.put(batchEntry2); - - Thread.sleep(200); // wait for scheduler - // same dbname with different rp should write two batchs instead of only one. - verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + try { + Point point = Point.measurement("cpu").field("6", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1"); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2"); + + batchProcessor.put(batchEntry1); + batchProcessor.put(batchEntry2); + + Thread.sleep(200); // wait for scheduler + // same dbname with different rp should write two batchs instead of only one. + verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + } + finally { + batchProcessor.flush(); + } } @Test @@ -67,36 +76,39 @@ public void testNoBlockOnCapacityExceeded() { BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(2) .interval(10, TimeUnit.MINUTES).build(); - Point point = Point.measurement("cpu").addField("4", "").build(); - BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1"); - BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2"); - BatchProcessor.HttpBatchEntry batchEntry3 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_3"); - - assertThat(batchProcessor.offer(batchEntry1), is(true)); - assertThat(batchProcessor.offer(batchEntry2), is(true)); - assertThat(batchProcessor.offer(batchEntry3), is(false)); - - batchProcessor.flush(); + try { + Point point = Point.measurement("cpu").addField("4", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1"); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2"); + BatchProcessor.HttpBatchEntry batchEntry3 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_3"); + + assertThat(batchProcessor.offer(batchEntry1), is(true)); + assertThat(batchProcessor.offer(batchEntry2), is(true)); + assertThat(batchProcessor.offer(batchEntry3), is(false)); + } + finally { + batchProcessor.flush(); + } } @Test(expected = IllegalArgumentException.class) public void testActionsIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor.builder(mockInfluxDB).actions(0) - .interval(1, TimeUnit.NANOSECONDS).build(); + .interval(1, TimeUnit.NANOSECONDS).build().flush(); } @Test(expected = IllegalArgumentException.class) public void testIntervalIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor.builder(mockInfluxDB).actions(1) - .interval(0, TimeUnit.NANOSECONDS).build(); + .interval(0, TimeUnit.NANOSECONDS).build().flush(); } @Test(expected = NullPointerException.class) public void testInfluxDBIsNull() throws InterruptedException, IOException { InfluxDB mockInfluxDB = null; BatchProcessor.builder(mockInfluxDB).actions(1) - .interval(1, TimeUnit.NANOSECONDS).build(); + .interval(1, TimeUnit.NANOSECONDS).build().flush(); } } From 54c824df4e8122822513465679ddfeed9bd3bd55 Mon Sep 17 00:00:00 2001 From: Tom Lee Date: Wed, 15 Feb 2017 20:04:17 -0800 Subject: [PATCH 3/3] Keep checkstyle happy --- src/main/java/org/influxdb/impl/InfluxDBImpl.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index a4a51b758..ee8062bb9 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -235,13 +235,12 @@ public void write(final String database, final String retentionPolicy, final Poi * {@inheritDoc} */ @Override - public boolean tryWrite(String database, String retentionPolicy, Point point) { + public boolean tryWrite(final String database, final String retentionPolicy, final Point point) { boolean written = true; if (this.batchEnabled.get()) { HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy); written = this.batchProcessor.offer(batchEntry); - } - else { + } else { writeDirect(database, retentionPolicy, point); } this.writeCount.incrementAndGet(); @@ -272,8 +271,7 @@ public boolean tryWrite(final int udpPort, final Point point) { if (this.batchEnabled.get()) { UdpBatchEntry batchEntry = new UdpBatchEntry(point, udpPort); written = this.batchProcessor.offer(batchEntry); - } - else { + } else { this.write(udpPort, point.lineProtocol()); this.unBatchedCount.incrementAndGet(); }