diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 2d1793dfb..b7f9c91b5 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -154,6 +154,18 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti */ public void write(final String database, final String retentionPolicy, final Point point); + /** + * Try to write a single Point to the database. Useful if batching is enabled with a bounded + * actions size: once the maximum batch size is reached, calls to + * {@link #tryWrite(String, String, Point)} will return false until the batch is + * written to Influx. + * + * @return true if the point was enqueued or written, false otherwise. + * + * @see #write(String, String, Point) + */ + public boolean tryWrite(final String database, final String retentionPolicy, final Point point); + /** * Write a single Point to the database through UDP. * @@ -164,6 +176,18 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti */ public void write(final int udpPort, final Point point); + /** + * Try to write a single Point to the database through UDP. Useful if batching is enabled with a + * bounded actions size: once the maximum batch size is reached, calls to + * {@link #tryWrite(int, Point)} will return false until the batch is written to + * Influx. + * + * @return true if the point was enqueued or written, false otherwise. + * + * @see #write(int, Point) + */ + public boolean tryWrite(final int udpPort, final Point point); + /** * Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol. * diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index e276e6c46..61d2bdda6 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -248,14 +248,15 @@ void put(final AbstractBatchEntry batchEntry) { } catch (InterruptedException e) { throw new RuntimeException(e); } - if (this.queue.size() >= this.actions) { - this.scheduler.submit(new Runnable() { - @Override - public void run() { - write(); - } - }); + scheduleWriteIfAtCapacity(); + } + + boolean offer(final AbstractBatchEntry batchEntry) { + if (this.queue.offer(batchEntry)) { + scheduleWriteIfAtCapacity(); + return true; } + return false; } /** @@ -268,4 +269,14 @@ void flush() { this.scheduler.shutdown(); } + private void scheduleWriteIfAtCapacity() { + if (this.queue.size() >= this.actions) { + this.scheduler.submit(new Runnable() { + @Override + public void run() { + write(); + } + }); + } + } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 8cc047e40..ee8062bb9 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -226,15 +226,27 @@ public void write(final String database, final String retentionPolicy, final Poi HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy); this.batchProcessor.put(batchEntry); } else { - BatchPoints batchPoints = BatchPoints.database(database) - .retentionPolicy(retentionPolicy).build(); - batchPoints.point(point); - this.write(batchPoints); - this.unBatchedCount.incrementAndGet(); + writeDirect(database, retentionPolicy, point); } this.writeCount.incrementAndGet(); } + /** + * {@inheritDoc} + */ + @Override + public boolean tryWrite(final String database, final String retentionPolicy, final Point point) { + boolean written = true; + if (this.batchEnabled.get()) { + HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy); + written = this.batchProcessor.offer(batchEntry); + } else { + writeDirect(database, retentionPolicy, point); + } + this.writeCount.incrementAndGet(); + return written; + } + /** * {@inheritDoc} */ @@ -250,6 +262,23 @@ public void write(final int udpPort, final Point point) { this.writeCount.incrementAndGet(); } + /** + * {@inheritDoc} + */ + @Override + public boolean tryWrite(final int udpPort, final Point point) { + boolean written = true; + if (this.batchEnabled.get()) { + UdpBatchEntry batchEntry = new UdpBatchEntry(point, udpPort); + written = this.batchProcessor.offer(batchEntry); + } else { + this.write(udpPort, point.lineProtocol()); + this.unBatchedCount.incrementAndGet(); + } + this.writeCount.incrementAndGet(); + return written; + } + @Override public void write(final BatchPoints batchPoints) { this.batchedCount.addAndGet(batchPoints.getPoints().size()); @@ -474,4 +503,10 @@ public void close() { } } + private void writeDirect(final String database, final String retentionPolicy, final Point point) { + BatchPoints batchPoints = BatchPoints.database(database).retentionPolicy(retentionPolicy).build(); + batchPoints.point(point); + this.write(batchPoints); + this.unBatchedCount.incrementAndGet(); + } } diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 039d301f1..6e41ee32f 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -14,6 +14,9 @@ import org.influxdb.dto.Point; import org.junit.Test; +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + public class BatchProcessorTest { @Test @@ -21,23 +24,27 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) .interval(1, TimeUnit.NANOSECONDS).build(); + try { + doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); - doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); - - Point point = Point.measurement("cpu").field("6", "").build(); - BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", ""); - BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", ""); + Point point = Point.measurement("cpu").field("6", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", ""); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", ""); - batchProcessor.put(batchEntry1); - Thread.sleep(200); // wait for scheduler + batchProcessor.put(batchEntry1); + Thread.sleep(200); // wait for scheduler - // first try throws an exception - verify(mockInfluxDB, times(1)).write(any(BatchPoints.class)); + // first try throws an exception + verify(mockInfluxDB, times(1)).write(any(BatchPoints.class)); - batchProcessor.put(batchEntry2); - Thread.sleep(200); // wait for scheduler - // without try catch the 2nd time does not occur - verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + batchProcessor.put(batchEntry2); + Thread.sleep(200); // wait for scheduler + // without try catch the 2nd time does not occur + verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + } + finally { + batchProcessor.flush(); + } } @Test @@ -46,36 +53,62 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) .interval(1, TimeUnit.NANOSECONDS).build(); - Point point = Point.measurement("cpu").field("6", "").build(); - BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1"); - BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2"); + try { + Point point = Point.measurement("cpu").field("6", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1"); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2"); + + batchProcessor.put(batchEntry1); + batchProcessor.put(batchEntry2); + + Thread.sleep(200); // wait for scheduler + // same dbname with different rp should write two batchs instead of only one. + verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + } + finally { + batchProcessor.flush(); + } + } + + @Test + public void testNoBlockOnCapacityExceeded() { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(2) + .interval(10, TimeUnit.MINUTES).build(); - batchProcessor.put(batchEntry1); - batchProcessor.put(batchEntry2); + try { + Point point = Point.measurement("cpu").addField("4", "").build(); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1"); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2"); + BatchProcessor.HttpBatchEntry batchEntry3 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_3"); - Thread.sleep(200); // wait for scheduler - // same dbname with different rp should write two batchs instead of only one. - verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + assertThat(batchProcessor.offer(batchEntry1), is(true)); + assertThat(batchProcessor.offer(batchEntry2), is(true)); + assertThat(batchProcessor.offer(batchEntry3), is(false)); + } + finally { + batchProcessor.flush(); + } } @Test(expected = IllegalArgumentException.class) public void testActionsIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor.builder(mockInfluxDB).actions(0) - .interval(1, TimeUnit.NANOSECONDS).build(); + .interval(1, TimeUnit.NANOSECONDS).build().flush(); } - + @Test(expected = IllegalArgumentException.class) public void testIntervalIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); BatchProcessor.builder(mockInfluxDB).actions(1) - .interval(0, TimeUnit.NANOSECONDS).build(); + .interval(0, TimeUnit.NANOSECONDS).build().flush(); } - + @Test(expected = NullPointerException.class) public void testInfluxDBIsNull() throws InterruptedException, IOException { InfluxDB mockInfluxDB = null; BatchProcessor.builder(mockInfluxDB).actions(1) - .interval(1, TimeUnit.NANOSECONDS).build(); + .interval(1, TimeUnit.NANOSECONDS).build().flush(); } }