Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add InfluxDB.tryWrite(...) methods #287

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,18 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti
*/
public void write(final String database, final String retentionPolicy, final Point point);

/**
* Try to write a single Point to the database. Useful if batching is enabled with a bounded
* <code>actions</code> size: once the maximum batch size is reached, calls to
* {@link #tryWrite(String, String, Point)} will return <code>false</code> until the batch is
* written to Influx.
*
* @return <code>true</code> if the point was enqueued or written, <code>false</code> otherwise.
*
* @see #write(String, String, Point)
*/
public boolean tryWrite(final String database, final String retentionPolicy, final Point point);

/**
* Write a single Point to the database through UDP.
*
Expand All @@ -164,6 +176,18 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti
*/
public void write(final int udpPort, final Point point);

/**
* Try to write a single Point to the database through UDP. Useful if batching is enabled with a
* bounded <code>actions</code> size: once the maximum batch size is reached, calls to
* {@link #tryWrite(int, Point)} will return <code>false</code> until the batch is written to
* Influx.
*
* @return <code>true</code> if the point was enqueued or written, <code>false</code> otherwise.
*
* @see #write(int, Point)
*/
public boolean tryWrite(final int udpPort, final Point point);

/**
* Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol.
*
Expand Down
25 changes: 18 additions & 7 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,15 @@ void put(final AbstractBatchEntry batchEntry) {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (this.queue.size() >= this.actions) {
this.scheduler.submit(new Runnable() {
@Override
public void run() {
write();
}
});
scheduleWriteIfAtCapacity();
}

boolean offer(final AbstractBatchEntry batchEntry) {
if (this.queue.offer(batchEntry)) {
scheduleWriteIfAtCapacity();
return true;
}
return false;
}

/**
Expand All @@ -268,4 +269,14 @@ void flush() {
this.scheduler.shutdown();
}

private void scheduleWriteIfAtCapacity() {
if (this.queue.size() >= this.actions) {
this.scheduler.submit(new Runnable() {
@Override
public void run() {
write();
}
});
}
}
}
45 changes: 40 additions & 5 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,27 @@ public void write(final String database, final String retentionPolicy, final Poi
HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy);
this.batchProcessor.put(batchEntry);
} else {
BatchPoints batchPoints = BatchPoints.database(database)
.retentionPolicy(retentionPolicy).build();
batchPoints.point(point);
this.write(batchPoints);
this.unBatchedCount.incrementAndGet();
writeDirect(database, retentionPolicy, point);
}
this.writeCount.incrementAndGet();
}

/**
* {@inheritDoc}
*/
@Override
public boolean tryWrite(final String database, final String retentionPolicy, final Point point) {
boolean written = true;
if (this.batchEnabled.get()) {
HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy);
written = this.batchProcessor.offer(batchEntry);
} else {
writeDirect(database, retentionPolicy, point);
}
this.writeCount.incrementAndGet();
return written;
}

/**
* {@inheritDoc}
*/
Expand All @@ -250,6 +262,23 @@ public void write(final int udpPort, final Point point) {
this.writeCount.incrementAndGet();
}

/**
* {@inheritDoc}
*/
@Override
public boolean tryWrite(final int udpPort, final Point point) {
boolean written = true;
if (this.batchEnabled.get()) {
UdpBatchEntry batchEntry = new UdpBatchEntry(point, udpPort);
written = this.batchProcessor.offer(batchEntry);
} else {
this.write(udpPort, point.lineProtocol());
this.unBatchedCount.incrementAndGet();
}
this.writeCount.incrementAndGet();
return written;
}

@Override
public void write(final BatchPoints batchPoints) {
this.batchedCount.addAndGet(batchPoints.getPoints().size());
Expand Down Expand Up @@ -474,4 +503,10 @@ public void close() {
}
}

private void writeDirect(final String database, final String retentionPolicy, final Point point) {
BatchPoints batchPoints = BatchPoints.database(database).retentionPolicy(retentionPolicy).build();
batchPoints.point(point);
this.write(batchPoints);
this.unBatchedCount.incrementAndGet();
}
}
85 changes: 59 additions & 26 deletions src/test/java/org/influxdb/impl/BatchProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,37 @@
import org.influxdb.dto.Point;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;

public class BatchProcessorTest {

@Test
public void testSchedulerExceptionHandling() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
.interval(1, TimeUnit.NANOSECONDS).build();
try {
doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class));

doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class));

Point point = Point.measurement("cpu").field("6", "").build();
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "");
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", "");
Point point = Point.measurement("cpu").field("6", "").build();
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "");
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", "");

batchProcessor.put(batchEntry1);
Thread.sleep(200); // wait for scheduler
batchProcessor.put(batchEntry1);
Thread.sleep(200); // wait for scheduler

// first try throws an exception
verify(mockInfluxDB, times(1)).write(any(BatchPoints.class));
// first try throws an exception
verify(mockInfluxDB, times(1)).write(any(BatchPoints.class));

batchProcessor.put(batchEntry2);
Thread.sleep(200); // wait for scheduler
// without try catch the 2nd time does not occur
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
batchProcessor.put(batchEntry2);
Thread.sleep(200); // wait for scheduler
// without try catch the 2nd time does not occur
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
}
finally {
batchProcessor.flush();
}
}

@Test
Expand All @@ -46,36 +53,62 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
.interval(1, TimeUnit.NANOSECONDS).build();

Point point = Point.measurement("cpu").field("6", "").build();
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1");
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2");
try {
Point point = Point.measurement("cpu").field("6", "").build();
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1");
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2");

batchProcessor.put(batchEntry1);
batchProcessor.put(batchEntry2);

Thread.sleep(200); // wait for scheduler
// same dbname with different rp should write two batchs instead of only one.
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
}
finally {
batchProcessor.flush();
}
}

@Test
public void testNoBlockOnCapacityExceeded() {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(2)
.interval(10, TimeUnit.MINUTES).build();

batchProcessor.put(batchEntry1);
batchProcessor.put(batchEntry2);
try {
Point point = Point.measurement("cpu").addField("4", "").build();
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1");
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2");
BatchProcessor.HttpBatchEntry batchEntry3 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_3");

Thread.sleep(200); // wait for scheduler
// same dbname with different rp should write two batchs instead of only one.
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
assertThat(batchProcessor.offer(batchEntry1), is(true));
assertThat(batchProcessor.offer(batchEntry2), is(true));
assertThat(batchProcessor.offer(batchEntry3), is(false));
}
finally {
batchProcessor.flush();
}
}

@Test(expected = IllegalArgumentException.class)
public void testActionsIsZero() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor.builder(mockInfluxDB).actions(0)
.interval(1, TimeUnit.NANOSECONDS).build();
.interval(1, TimeUnit.NANOSECONDS).build().flush();
}

@Test(expected = IllegalArgumentException.class)
public void testIntervalIsZero() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor.builder(mockInfluxDB).actions(1)
.interval(0, TimeUnit.NANOSECONDS).build();
.interval(0, TimeUnit.NANOSECONDS).build().flush();
}

@Test(expected = NullPointerException.class)
public void testInfluxDBIsNull() throws InterruptedException, IOException {
InfluxDB mockInfluxDB = null;
BatchProcessor.builder(mockInfluxDB).actions(1)
.interval(1, TimeUnit.NANOSECONDS).build();
.interval(1, TimeUnit.NANOSECONDS).build().flush();
}
}