From 558e127dcfe2fe0a61dee705cce1630c746fbc36 Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Wed, 21 Oct 2015 10:31:48 +0200 Subject: [PATCH 01/13] Removed unnecessary 'this.' references The excessive use of 'this.' is confusing, as it implies access to an object attribute/method that needs disambiguation. --- .../org/influxdb/impl/BatchProcessor.java | 26 ++++---- .../java/org/influxdb/impl/InfluxDBImpl.java | 62 +++++++++---------- 2 files changed, 44 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index fb3328da6..88415c40a 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -82,10 +82,10 @@ public Builder interval(final int interval, final TimeUnit unit) { * @return the BatchProcessor instance. */ public BatchProcessor build() { - Preconditions.checkNotNull(this.actions, "actions may not be null"); - Preconditions.checkNotNull(this.flushInterval, "flushInterval may not be null"); - Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null"); - return new BatchProcessor(this.influxDB, this.actions, this.flushIntervalUnit, this.flushInterval); + Preconditions.checkNotNull(actions, "actions may not be null"); + Preconditions.checkNotNull(flushInterval, "flushInterval may not be null"); + Preconditions.checkNotNull(flushIntervalUnit, "flushIntervalUnit may not be null"); + return new BatchProcessor(influxDB, actions, flushIntervalUnit, flushInterval); } } @@ -134,7 +134,7 @@ public static Builder builder(final InfluxDB influxDB) { this.flushInterval = flushInterval; // Flush at specified Rate - this.scheduler.scheduleAtFixedRate(new Runnable() { + scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { write(); @@ -144,13 +144,13 @@ public void run() { } void write() { - if (this.queue.isEmpty()) { + if (queue.isEmpty()) { return; } Map databaseToBatchPoints = Maps.newHashMap(); - List batchEntries = new ArrayList<>(this.queue.size()); - this.queue.drainTo(batchEntries); + List batchEntries = new ArrayList<>(queue.size()); + queue.drainTo(batchEntries); for (BatchEntry batchEntry : batchEntries) { String dbName = batchEntry.getDb(); @@ -163,7 +163,7 @@ void write() { } for (BatchPoints batchPoints : databaseToBatchPoints.values()) { - BatchProcessor.this.influxDB.write(batchPoints); + influxDB.write(batchPoints); } } @@ -174,8 +174,8 @@ void write() { * the batchEntry to write to the cache. 
*/ void put(final BatchEntry batchEntry) { - this.queue.add(batchEntry); - if (this.queue.size() >= this.actions) { + queue.add(batchEntry); + if (queue.size() >= actions) { write(); } } @@ -186,8 +186,8 @@ void put(final BatchEntry batchEntry) { * */ void flush() { - this.write(); - this.scheduler.shutdown(); + write(); + scheduler.shutdown(); } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index f1f8f8d51..5a644ffb3 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -58,28 +58,28 @@ public InfluxDBImpl(final String url, final String username, final String passwo this.username = username; this.password = password; Client client = new OkClient(new OkHttpClient()); - this.restAdapter = new RestAdapter.Builder() + restAdapter = new RestAdapter.Builder() .setEndpoint(url) .setErrorHandler(new InfluxDBErrorHandler()) .setClient(client) .build(); - this.influxDBService = this.restAdapter.create(InfluxDBService.class); + influxDBService = restAdapter.create(InfluxDBService.class); } @Override public InfluxDB setLogLevel(final LogLevel logLevel) { switch (logLevel) { case NONE: - this.restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.NONE); + restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.NONE); break; case BASIC: - this.restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.BASIC); + restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.BASIC); break; case HEADERS: - this.restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.HEADERS); + restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.HEADERS); break; case FULL: - this.restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.FULL); + restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.FULL); break; default: break; @@ -90,33 +90,33 @@ public InfluxDB setLogLevel(final LogLevel logLevel) { @Override public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit) { - if (this.batchEnabled.get()) { + if (batchEnabled.get()) { throw new IllegalArgumentException("BatchProcessing is already enabled."); } - this.batchProcessor = BatchProcessor + batchProcessor = BatchProcessor .builder(this) .actions(actions) .interval(flushDuration, flushDurationTimeUnit) .build(); - this.batchEnabled.set(true); + batchEnabled.set(true); return this; } @Override public void disableBatch() { - this.batchEnabled.set(false); - this.batchProcessor.flush(); - if (this.logLevel != LogLevel.NONE) { + batchEnabled.set(false); + batchProcessor.flush(); + if (logLevel != LogLevel.NONE) { System.out.println( - "total writes:" + this.writeCount.get() + " unbatched:" + this.unBatchedCount.get() + "batchPoints:" - + this.batchedCount); + "total writes:" + writeCount.get() + " unbatched:" + unBatchedCount.get() + "batchPoints:" + + batchedCount); } } @Override public Pong ping() { Stopwatch watch = Stopwatch.createStarted(); - Response response = this.influxDBService.ping(); + Response response = influxDBService.ping(); List
headers = response.getHeaders(); String version = "unknown"; for (Header header : headers) { @@ -137,25 +137,25 @@ public String version() { @Override public void write(final String database, final String retentionPolicy, final Point point) { - if (this.batchEnabled.get()) { + if (batchEnabled.get()) { BatchEntry batchEntry = new BatchEntry(point, database, retentionPolicy); - this.batchProcessor.put(batchEntry); + batchProcessor.put(batchEntry); } else { BatchPoints batchPoints = BatchPoints.database(database).retentionPolicy(retentionPolicy).build(); batchPoints.point(point); - this.write(batchPoints); - this.unBatchedCount.incrementAndGet(); + write(batchPoints); + unBatchedCount.incrementAndGet(); } - this.writeCount.incrementAndGet(); + writeCount.incrementAndGet(); } @Override public void write(final BatchPoints batchPoints) { - this.batchedCount.addAndGet(batchPoints.getPoints().size()); + batchedCount.addAndGet(batchPoints.getPoints().size()); TypedString lineProtocol = new TypedString(batchPoints.lineProtocol()); - this.influxDBService.writePoints( - this.username, - this.password, + influxDBService.writePoints( + username, + password, batchPoints.getDatabase(), batchPoints.getRetentionPolicy(), TimeUtil.toTimePrecision(TimeUnit.NANOSECONDS), @@ -169,8 +169,8 @@ public void write(final BatchPoints batchPoints) { */ @Override public QueryResult query(final Query query) { - QueryResult response = this.influxDBService - .query(this.username, this.password, query.getDatabase(), query.getCommand()); + QueryResult response = influxDBService + .query(username, password, query.getDatabase(), query.getCommand()); return response; } @@ -179,8 +179,8 @@ public QueryResult query(final Query query) { */ @Override public QueryResult query(final Query query, final TimeUnit timeUnit) { - QueryResult response = this.influxDBService - .query(this.username, this.password, query.getDatabase(), TimeUtil.toTimePrecision(timeUnit) , query.getCommand()); + QueryResult response = influxDBService + .query(username, password, query.getDatabase(), TimeUtil.toTimePrecision(timeUnit) , query.getCommand()); return response; } @@ -190,7 +190,7 @@ public QueryResult query(final Query query, final TimeUnit timeUnit) { @Override public void createDatabase(final String name) { Preconditions.checkArgument(!name.contains("-"), "Databasename cant contain -"); - this.influxDBService.query(this.username, this.password, "CREATE DATABASE " + name); + influxDBService.query(username, password, "CREATE DATABASE " + name); } /** @@ -198,7 +198,7 @@ public void createDatabase(final String name) { */ @Override public void deleteDatabase(final String name) { - this.influxDBService.query(this.username, this.password, "DROP DATABASE " + name); + influxDBService.query(username, password, "DROP DATABASE " + name); } /** @@ -206,7 +206,7 @@ public void deleteDatabase(final String name) { */ @Override public List describeDatabases() { - QueryResult result = this.influxDBService.query(this.username, this.password, "SHOW DATABASES"); + QueryResult result = influxDBService.query(username, password, "SHOW DATABASES"); // {"results":[{"series":[{"name":"databases","columns":["name"],"values":[["mydb"]]}]}]} // Series [name=databases, columns=[name], values=[[mydb], [unittest_1433605300968]]] List> databaseNames = result.getResults().get(0).getSeries().get(0).getValues(); From fcb7fd85457895c481b0373893901a9d86123369 Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Wed, 21 Oct 2015 10:53:48 +0200 Subject: [PATCH 02/13] Changing attributes to 
full words The non-abbreviated field and method names are clearer. --- .../org/influxdb/impl/BatchProcessor.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 88415c40a..a898f8bf6 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -91,26 +91,26 @@ public BatchProcessor build() { static class BatchEntry { private final Point point; - private final String db; - private final String rp; + private final String database; + private final String retentionPolicy; - public BatchEntry(final Point point, final String db, final String rp) { + public BatchEntry(final Point point, final String database, final String retentionPolicy) { super(); this.point = point; - this.db = db; - this.rp = rp; + this.database = database; + this.retentionPolicy = retentionPolicy; } public Point getPoint() { return this.point; } - public String getDb() { - return this.db; + public String getDatabase() { + return this.database; } - public String getRp() { - return this.rp; + public String getRetentionPolicy() { + return this.retentionPolicy; } } @@ -153,9 +153,9 @@ void write() { queue.drainTo(batchEntries); for (BatchEntry batchEntry : batchEntries) { - String dbName = batchEntry.getDb(); + String dbName = batchEntry.getDatabase(); if (!databaseToBatchPoints.containsKey(dbName)) { - BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(batchEntry.getRp()).build(); + BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(batchEntry.getRetentionPolicy()).build(); databaseToBatchPoints.put(dbName, batchPoints); } Point point = batchEntry.getPoint(); From b93d0e375e1448c91ab4c9c49816d77aad2e47be Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Wed, 21 Oct 2015 11:15:59 +0200 Subject: [PATCH 03/13] Refactoring batched & unbatched writes These changes introduce two helper functions for writing batched and unbatched points separately. 
This helps by: - Removing the need to wrap 'unlatched' points in a 'BatchPoints' object - Locating the updates of the counters in a sensible location --- .../java/org/influxdb/impl/InfluxDBImpl.java | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 5a644ffb3..bd2d0cc61 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -107,9 +107,8 @@ public void disableBatch() { batchEnabled.set(false); batchProcessor.flush(); if (logLevel != LogLevel.NONE) { - System.out.println( - "total writes:" + writeCount.get() + " unbatched:" + unBatchedCount.get() + "batchPoints:" - + batchedCount); + System.out.println(String.format("Total writes:%d Unbatched:%d Batched:%d", + writeCount.get(), unBatchedCount.get(), batchedCount.get())); } } @@ -141,27 +140,40 @@ public void write(final String database, final String retentionPolicy, final Poi BatchEntry batchEntry = new BatchEntry(point, database, retentionPolicy); batchProcessor.put(batchEntry); } else { - BatchPoints batchPoints = BatchPoints.database(database).retentionPolicy(retentionPolicy).build(); - batchPoints.point(point); - write(batchPoints); - unBatchedCount.incrementAndGet(); + writeUnbatched(database, retentionPolicy, ConsistencyLevel.ONE, point); } - writeCount.incrementAndGet(); } - + @Override - public void write(final BatchPoints batchPoints) { - batchedCount.addAndGet(batchPoints.getPoints().size()); - TypedString lineProtocol = new TypedString(batchPoints.lineProtocol()); + public void write(final BatchPoints points) { + writeBatched(points); + } + + protected void writeBatched(final BatchPoints points) { + batchedCount.addAndGet(points.getPoints().size()); + writeCount.addAndGet(points.getPoints().size()); + writeLine(points.getDatabase(), + points.getRetentionPolicy(), + points.getConsistency(), + points.lineProtocol()); + } + + protected void writeUnbatched(String database, String retentionPolicy, ConsistencyLevel consistencyLevel, Point point) { + unBatchedCount.incrementAndGet(); + writeCount.incrementAndGet(); + writeLine(database, retentionPolicy, consistencyLevel, point.lineProtocol()); + } + + private void writeLine(String database, String retentionPolicy, ConsistencyLevel consistency, String line) { + TypedString lineProtocol = new TypedString(line); influxDBService.writePoints( username, password, - batchPoints.getDatabase(), - batchPoints.getRetentionPolicy(), + database, + retentionPolicy, TimeUtil.toTimePrecision(TimeUnit.NANOSECONDS), - batchPoints.getConsistency().value(), + consistency.value(), lineProtocol); - } /** From c7a49a31fd3fd6a8677f4c7a0c17d3da8cb424a0 Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Wed, 21 Oct 2015 12:16:54 +0200 Subject: [PATCH 04/13] Interface Change! These changes: - Add the ConsistencyLevel parameter to both the single and batched write methods. This was previously hidden from the user (defaulted to ConsistencyLevel.ONE). - Added a batched write method that takes a List parameter, rather than requiring a BatchPoints object. - Deprecated the BatchPoints object and both of the previous write() methods. - Provide implementations of the deprecated interface that use the new interface. - Updates the tests to use the new interface, though the old one will still work. - Fixes the BatchProcessor to use all of the common fields on the BatchEntry objects when performing the write (i.e. 
the current implementation creates the Map object only on the database name, but it should use all of the common fields). Justification 1. The current interface forces the user to create a BatchPoints object, even if they already have a list of Point objects that they want to send. 2. The BatchPoints object does not offer much convenience. The one feature that has not been reproduced is the ability to put the same tag on all points within a batch. I do not believe this is necessary because: - This tag information should be created and stored on the Point itself; and - It is pretty easy to add a function to add the same tag to a collection of points into the Point class. --- src/main/java/org/influxdb/InfluxDB.java | 26 ++++++- .../java/org/influxdb/dto/BatchPoints.java | 1 + src/main/java/org/influxdb/dto/Point.java | 10 +++ .../org/influxdb/impl/BatchProcessor.java | 76 +++++++++++++++---- .../java/org/influxdb/impl/InfluxDBImpl.java | 37 +++++---- src/test/java/org/influxdb/InfluxDBTest.java | 20 +++-- .../java/org/influxdb/PerformanceTests.java | 37 +++++---- src/test/java/org/influxdb/TicketTests.java | 20 ++--- src/test/java/org/influxdb/dto/PointTest.java | 19 ++--- 9 files changed, 165 insertions(+), 81 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 30504cf3f..c229a0512 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -109,7 +109,7 @@ public String value() { public String version(); /** - * Write a single Point to the database. + * Write a single Point to the database with ConsistencyLevel.One. * * @param database * the database to write to. @@ -118,16 +118,38 @@ public String value() { * @param point * The point to write */ + @Deprecated public void write(final String database, final String retentionPolicy, final Point point); + + /** + * Write a single Point to the database. + * + * @param database + * @param retentionPolicy + * @param consistencyLevel + * @param point + */ + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final Point point); /** * Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol. * * {@linkplain "https://github.com/influxdb/influxdb/pull/2696"} - * + * * @param batchPoints */ + @Deprecated public void write(final BatchPoints batchPoints); + + /** + * Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol. + * + * {@linkplain "https://github.com/influxdb/influxdb/pull/2696"} + * + * @param batchPoints + */ + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final List points); + /** * Execute a query agains a database. 
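As an illustration of the interface change above (the database name, tag values and the Guava Lists helper below are example choices only, not part of the patch), a caller holding an existing InfluxDB instance could write points roughly like this:

    Point cpu = Point.measurement("cpu")
            .tag("host", "server01")   // example tag only
            .field("idle", 90L)
            .build();

    // Single point; the consistency level is now an explicit argument instead of a hidden ConsistencyLevel.ONE.
    influxDB.write("mydb", "default", ConsistencyLevel.ONE, cpu);

    // A plain list of points can be written directly, without building a BatchPoints wrapper.
    influxDB.write("mydb", "default", ConsistencyLevel.ALL, Lists.newArrayList(cpu));
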
diff --git a/src/main/java/org/influxdb/dto/BatchPoints.java b/src/main/java/org/influxdb/dto/BatchPoints.java index e49e35f72..0d5fabb56 100644 --- a/src/main/java/org/influxdb/dto/BatchPoints.java +++ b/src/main/java/org/influxdb/dto/BatchPoints.java @@ -19,6 +19,7 @@ * @author stefan * */ +@Deprecated public class BatchPoints { private String database; private String retentionPolicy; diff --git a/src/main/java/org/influxdb/dto/Point.java b/src/main/java/org/influxdb/dto/Point.java index 76ce9bc6f..f0763ef38 100644 --- a/src/main/java/org/influxdb/dto/Point.java +++ b/src/main/java/org/influxdb/dto/Point.java @@ -1,6 +1,7 @@ package org.influxdb.dto; import java.text.NumberFormat; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; @@ -288,4 +289,13 @@ private StringBuilder formatedTime() { return sb; } + public static String toLineProtocol(List points) { + StringBuilder sb = new StringBuilder(); + for (Point point : points) { + sb.append(point.lineProtocol()).append("\n"); + } + return sb.toString(); + } + + } diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index a898f8bf6..a06557039 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -10,9 +11,10 @@ import java.util.concurrent.TimeUnit; import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; +import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.Point; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -93,24 +95,30 @@ static class BatchEntry { private final Point point; private final String database; private final String retentionPolicy; + private final ConsistencyLevel consistencyLevel; - public BatchEntry(final Point point, final String database, final String retentionPolicy) { + public BatchEntry(final Point point, final String database, ConsistencyLevel consistencyLevel, final String retentionPolicy) { super(); this.point = point; this.database = database; this.retentionPolicy = retentionPolicy; + this.consistencyLevel = consistencyLevel; } public Point getPoint() { - return this.point; + return point; } public String getDatabase() { - return this.database; + return database; } public String getRetentionPolicy() { - return this.retentionPolicy; + return retentionPolicy; + } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; } } @@ -148,22 +156,23 @@ void write() { return; } - Map databaseToBatchPoints = Maps.newHashMap(); + Map> databaseToBatchPoints = Maps.newHashMap(); List batchEntries = new ArrayList<>(queue.size()); queue.drainTo(batchEntries); for (BatchEntry batchEntry : batchEntries) { - String dbName = batchEntry.getDatabase(); - if (!databaseToBatchPoints.containsKey(dbName)) { - BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(batchEntry.getRetentionPolicy()).build(); - databaseToBatchPoints.put(dbName, batchPoints); + BatchCommonFields common = BatchCommonFields.fromEntry(batchEntry); + + if (!databaseToBatchPoints.containsKey(common)) { + databaseToBatchPoints.put(common, new ArrayList()); } - Point point = batchEntry.getPoint(); - 
databaseToBatchPoints.get(dbName).point(point); + databaseToBatchPoints.get(common).add(batchEntry.getPoint()); } - for (BatchPoints batchPoints : databaseToBatchPoints.values()) { - influxDB.write(batchPoints); + for (Entry> entry : databaseToBatchPoints.entrySet()) { + BatchCommonFields common = entry.getKey(); + List points = entry.getValue(); + influxDB.write(common.database, common.retentionPolicy, common.consistencyLevel, points); } } @@ -190,4 +199,43 @@ void flush() { scheduler.shutdown(); } + private static class BatchCommonFields { + private final String database; + private final String retentionPolicy; + private final ConsistencyLevel consistencyLevel; + + public BatchCommonFields(final String database, final String retentionPolicy, + final ConsistencyLevel consistencyLevel) { + this.database = database; + this.retentionPolicy = retentionPolicy; + this.consistencyLevel = consistencyLevel; + } + + @Override + public int hashCode() { + return Objects.hashCode(database, retentionPolicy, consistencyLevel); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + BatchCommonFields other = (BatchCommonFields) obj; + + return (Objects.equal(database, other.database) + && Objects.equal(retentionPolicy, other.retentionPolicy) + && Objects.equal(consistencyLevel, other.consistencyLevel)); + } + + public static BatchCommonFields fromEntry(BatchEntry batchEntry) { + return new BatchCommonFields(batchEntry.getDatabase(), + batchEntry.getRetentionPolicy(), batchEntry.getConsistencyLevel()); + } + + + } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index bd2d0cc61..1d86098d3 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -13,6 +13,11 @@ import org.influxdb.dto.QueryResult; import org.influxdb.impl.BatchProcessor.BatchEntry; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.squareup.okhttp.OkHttpClient; + import retrofit.RestAdapter; import retrofit.client.Client; import retrofit.client.Header; @@ -20,11 +25,6 @@ import retrofit.client.Response; import retrofit.mime.TypedString; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Lists; -import com.squareup.okhttp.OkHttpClient; - /** * Implementation of a InluxDB API. 
* @@ -136,8 +136,13 @@ public String version() { @Override public void write(final String database, final String retentionPolicy, final Point point) { + write(database, retentionPolicy, point); + } + + @Override + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final Point point) { if (batchEnabled.get()) { - BatchEntry batchEntry = new BatchEntry(point, database, retentionPolicy); + BatchEntry batchEntry = new BatchEntry(point, database, consistencyLevel, retentionPolicy); batchProcessor.put(batchEntry); } else { writeUnbatched(database, retentionPolicy, ConsistencyLevel.ONE, point); @@ -145,17 +150,19 @@ public void write(final String database, final String retentionPolicy, final Poi } @Override - public void write(final BatchPoints points) { - writeBatched(points); + public void write(BatchPoints batchPoints) { + write(batchPoints.getDatabase(), batchPoints.getRetentionPolicy(), ConsistencyLevel.ONE, batchPoints.getPoints()); + } + + @Override + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final List points) { + writeBatched(database, retentionPolicy, consistencyLevel, points); } - protected void writeBatched(final BatchPoints points) { - batchedCount.addAndGet(points.getPoints().size()); - writeCount.addAndGet(points.getPoints().size()); - writeLine(points.getDatabase(), - points.getRetentionPolicy(), - points.getConsistency(), - points.lineProtocol()); + protected void writeBatched(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final List points) { + batchedCount.addAndGet(points.size()); + writeCount.addAndGet(points.size()); + writeLine(database, retentionPolicy, consistencyLevel, Point.toLineProtocol(points)); } protected void writeUnbatched(String database, String retentionPolicy, ConsistencyLevel consistencyLevel, Point point) { diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 481a7efed..2c7741e24 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -5,8 +5,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDB.LogLevel; -import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; import org.influxdb.dto.Query; @@ -20,6 +20,7 @@ import com.github.dockerjava.api.command.CreateContainerResponse; import com.github.dockerjava.core.DockerClientBuilder; import com.github.dockerjava.core.DockerClientConfig; +import com.google.common.collect.Lists; /** * Test the InfluxDB API. 
@@ -164,23 +165,26 @@ public void testDescribeDatabases() { @Test(enabled = true) public void testWrite() { String dbName = "write_unittest_" + System.currentTimeMillis(); - this.influxDB.createDatabase(dbName); + influxDB.createDatabase(dbName); - BatchPoints batchPoints = BatchPoints.database(dbName).tag("async", "true").retentionPolicy("default").build(); Point point1 = Point .measurement("cpu") .tag("atag", "test") + .tag("async", "true") .field("idle", 90L) .field("usertime", 9L) .field("system", 1L) .build(); - Point point2 = Point.measurement("disk").tag("atag", "test").field("used", 80L).field("free", 1L).build(); - batchPoints.point(point1); - batchPoints.point(point2); - this.influxDB.write(batchPoints); + Point point2 = Point.measurement("disk") + .tag("atag", "test") + .tag("async", "true") + .field("used", 80L).field("free", 1L).build(); + + influxDB.write(dbName, "default", ConsistencyLevel.ONE, Lists.newArrayList(point1, point2)); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); QueryResult result = this.influxDB.query(query); Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); - this.influxDB.deleteDatabase(dbName); + influxDB.deleteDatabase(dbName); } } diff --git a/src/test/java/org/influxdb/PerformanceTests.java b/src/test/java/org/influxdb/PerformanceTests.java index b89a0a973..f077e8c89 100644 --- a/src/test/java/org/influxdb/PerformanceTests.java +++ b/src/test/java/org/influxdb/PerformanceTests.java @@ -1,9 +1,11 @@ package org.influxdb; +import java.util.List; import java.util.concurrent.TimeUnit; +import org.assertj.core.util.Lists; +import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDB.LogLevel; -import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -26,59 +28,56 @@ public void setUp() { @Test(threadPoolSize = 10, enabled = false) public void writeSinglePointPerformance() throws InterruptedException { String dbName = "write_" + System.currentTimeMillis(); - this.influxDB.createDatabase(dbName); - this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS); + influxDB.createDatabase(dbName); + influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS); Stopwatch watch = Stopwatch.createStarted(); for (int j = 0; j < SINGLE_POINT_COUNT; j++) { Point point = Point.measurement("cpu").field("idle", j).field("user", 2 * j).field("system", 3 * j).build(); - this.influxDB.write(dbName, "default", point); + influxDB.write(dbName, "default", ConsistencyLevel.ONE, point); } - this.influxDB.disableBatch(); + influxDB.disableBatch(); System.out.println("Single Point Write for " + SINGLE_POINT_COUNT + " writes of Points took:" + watch); - this.influxDB.deleteDatabase(dbName); + influxDB.deleteDatabase(dbName); } @Test(enabled = false) public void writePerformance() { String dbName = "writepoints_" + System.currentTimeMillis(); - this.influxDB.createDatabase(dbName); + influxDB.createDatabase(dbName); Stopwatch watch = Stopwatch.createStarted(); for (int i = 0; i < COUNT; i++) { - BatchPoints batchPoints = BatchPoints - .database(dbName) - .tag("blubber", "bla") - .retentionPolicy("default") - .build(); + List points = Lists.newArrayList(); for (int j = 0; j < POINT_COUNT; j++) { Point point = Point .measurement("cpu") .field("idle", j) .field("user", 2 * j) .field("system", 3 * j) + .tag("blubber", "bla") .build(); - batchPoints.point(point); + points.add(point); } - this.influxDB.write(batchPoints); + 
influxDB.write(dbName, "default", ConsistencyLevel.ONE, points); } System.out.println("WritePoints for " + COUNT + " writes of " + POINT_COUNT + " Points took:" + watch); - this.influxDB.deleteDatabase(dbName); + influxDB.deleteDatabase(dbName); } @Test(enabled = true) public void maxWritePointsPerformance() { String dbName = "d"; - this.influxDB.createDatabase(dbName); - this.influxDB.enableBatch(100000, 60, TimeUnit.SECONDS); + influxDB.createDatabase(dbName); + influxDB.enableBatch(100000, 60, TimeUnit.SECONDS); Stopwatch watch = Stopwatch.createStarted(); for (int i = 0; i < 2000000; i++) { Point point = Point.measurement("s").field("v", 1).build(); - this.influxDB.write(dbName, "default", point); + influxDB.write(dbName, "default", ConsistencyLevel.ONE, point); } System.out.println("5Mio points:" + watch); - this.influxDB.deleteDatabase(dbName); + influxDB.deleteDatabase(dbName); } } diff --git a/src/test/java/org/influxdb/TicketTests.java b/src/test/java/org/influxdb/TicketTests.java index 69ef44354..a80c9268d 100644 --- a/src/test/java/org/influxdb/TicketTests.java +++ b/src/test/java/org/influxdb/TicketTests.java @@ -5,13 +5,14 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDB.LogLevel; -import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import org.testng.collections.Lists; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.CreateContainerResponse; @@ -101,7 +102,7 @@ public void testTicket38() { .tag("host", "host-\"C") .tag("region", "region") .build(); - this.influxDB.write(dbName, "default", point1); + this.influxDB.write(dbName, "default", ConsistencyLevel.ONE, point1); this.influxDB.deleteDatabase(dbName); } @@ -113,17 +114,12 @@ public void testTicket38() { public void testTicket39() { String dbName = "ticket39_" + System.currentTimeMillis(); this.influxDB.createDatabase(dbName); - BatchPoints batchPoints = BatchPoints - .database(dbName) + + Point point = Point.measurement("my_type") + .field("my_field", "string_value") .tag("async", "true") - .retentionPolicy("default") - .consistency(InfluxDB.ConsistencyLevel.ALL) .build(); - Point.Builder builder = Point.measurement("my_type"); - builder.field("my_field", "string_value"); - Point point = builder.build(); - batchPoints.point(point); - this.influxDB.write(batchPoints); + this.influxDB.write(dbName, "default", ConsistencyLevel.ALL, Lists.newArrayList(point)); this.influxDB.deleteDatabase(dbName); } @@ -137,7 +133,7 @@ public void testTicket40() { this.influxDB.enableBatch(100, 100, TimeUnit.MICROSECONDS); for (int i = 0; i < 1000; i++) { Point point = Point.measurement("cpu").field("idle", 99).build(); - this.influxDB.write(dbName, "default", point); + this.influxDB.write(dbName, "default", ConsistencyLevel.ONE, point); } this.influxDB.deleteDatabase(dbName); } diff --git a/src/test/java/org/influxdb/dto/PointTest.java b/src/test/java/org/influxdb/dto/PointTest.java index 981786f73..ca37e08b4 100644 --- a/src/test/java/org/influxdb/dto/PointTest.java +++ b/src/test/java/org/influxdb/dto/PointTest.java @@ -1,7 +1,6 @@ package org.influxdb.dto; -import org.testng.annotations.Test; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import java.math.BigDecimal; import 
java.math.BigInteger; @@ -9,6 +8,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.assertj.core.util.Lists; +import org.testng.annotations.Test; + /** * Test for the Point DTO. * @@ -65,21 +67,16 @@ public void testTicket44() { assertThat(point.lineProtocol()).asString().isEqualTo("test a=1.0 1000000"); point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).field("a", 1).build(); - BatchPoints batchPoints = BatchPoints.database("db").point(point).build(); - assertThat(batchPoints.lineProtocol()).asString().isEqualTo("test a=1.0 1\n"); + assertThat(Point.toLineProtocol(Lists.newArrayList(point))).asString().isEqualTo("test a=1.0 1\n"); point = Point.measurement("test").time(1, TimeUnit.MICROSECONDS).field("a", 1).build(); - batchPoints = BatchPoints.database("db").point(point).build(); - assertThat(batchPoints.lineProtocol()).asString().isEqualTo("test a=1.0 1000\n"); + assertThat(Point.toLineProtocol(Lists.newArrayList(point))).asString().isEqualTo("test a=1.0 1000\n"); point = Point.measurement("test").time(1, TimeUnit.MILLISECONDS).field("a", 1).build(); - batchPoints = BatchPoints.database("db").point(point).build(); - assertThat(batchPoints.lineProtocol()).asString().isEqualTo("test a=1.0 1000000\n"); + assertThat(Point.toLineProtocol(Lists.newArrayList(point))).asString().isEqualTo("test a=1.0 1000000\n"); point = Point.measurement("test").field("a", 1).time(1, TimeUnit.MILLISECONDS).build(); - batchPoints = BatchPoints.database("db").build(); - batchPoints = batchPoints.point(point); - assertThat(batchPoints.lineProtocol()).asString().isEqualTo("test a=1.0 1000000\n"); + assertThat(Point.toLineProtocol(Lists.newArrayList(point))).asString().isEqualTo("test a=1.0 1000000\n"); } From 77a16a7413a4d90eb07950bb0b2c64d6d3dab895 Mon Sep 17 00:00:00 2001 From: andrewdodd Date: Wed, 21 Oct 2015 14:19:18 +0200 Subject: [PATCH 05/13] Update InfluxDBImpl.java Forgot to include the ConsistencyLevel.ONE parameter. --- src/main/java/org/influxdb/impl/InfluxDBImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 1d86098d3..36fc6700a 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -136,7 +136,7 @@ public String version() { @Override public void write(final String database, final String retentionPolicy, final Point point) { - write(database, retentionPolicy, point); + write(database, retentionPolicy, ConsistencyLevel.ONE, point); } @Override From e8bd99b36290724a217ea2e2123427bb7919e26c Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Wed, 21 Oct 2015 15:36:47 +0200 Subject: [PATCH 06/13] Data Retention Options Issue - #107 These changes add the configuration options and functionality to allow the internal BatchProcessor's data retention behaviour to be configured. The new configurable items are: - maxBatchWriteSize: The maximum number of points to attempt in one batch. - discardOnFailedWrite: If the BatchProcessor should throw away batched points if it fails to successfully write them (i.e. the current behaviour is to do this with buffered points). - BufferFailBehaviour: The behaviour the BatchProcessor should exhibit when adding to the queue fails (due to capacity problems, either implicit or explicit). 
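As a sketch of the resulting configuration surface (all numbers below are illustrative, and BufferFailBehaviour refers to the enum introduced by this change), enabling batching with the new options looks roughly like:

    influxDB.enableBatch(
            10000,                           // capacity: hold at most 10000 points in the buffer
            2000,                            // actions: trigger a batched write once 2000 points are queued
            100, TimeUnit.MILLISECONDS,      // ...or at the latest every 100 ms
            BufferFailBehaviour.DROP_OLDEST, // when the buffer is full, evict the oldest point
            false,                           // discardOnFailedWrite: keep points buffered if a write fails
            500);                            // maxBatchWriteSize: send at most 500 points per write attempt
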
--- src/main/java/org/influxdb/dto/Point.java | 8 +- .../org/influxdb/impl/BatchProcessor.java | 244 +++++++++++++--- .../java/org/influxdb/impl/InfluxDBImpl.java | 34 ++- .../org/influxdb/impl/BatchProcessorTest.java | 266 ++++++++++++++++++ 4 files changed, 512 insertions(+), 40 deletions(-) create mode 100644 src/test/java/org/influxdb/impl/BatchProcessorTest.java diff --git a/src/main/java/org/influxdb/dto/Point.java b/src/main/java/org/influxdb/dto/Point.java index f0763ef38..faf164d13 100644 --- a/src/main/java/org/influxdb/dto/Point.java +++ b/src/main/java/org/influxdb/dto/Point.java @@ -61,7 +61,7 @@ public static final class Builder { /** * @param measurement */ - Builder(final String measurement) { + protected Builder(final String measurement) { this.measurement = measurement; } @@ -162,6 +162,10 @@ public Point build() { void setMeasurement(final String measurement) { this.measurement = measurement; } + + public String getMeasurement() { + return measurement; + } /** * @param time @@ -239,7 +243,7 @@ public String lineProtocol() { sb.append(formatedTime()); return sb.toString(); } - + private StringBuilder concatenatedTags() { final StringBuilder sb = new StringBuilder(); for (Entry tag : this.tags.entrySet()) { diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index a06557039..627adf409 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -4,18 +4,21 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.influxdb.InfluxDB; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.Point; +import com.google.common.base.Function; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** @@ -26,21 +29,40 @@ * */ public class BatchProcessor { - protected final BlockingQueue queue = new LinkedBlockingQueue<>(); + public static final int DEFAULT_ACTIONS = 10; + public static final int DEFAULT_FLUSH_INTERVAL = 100; + public static final TimeUnit DEFAULT_FLUSH_INTERVAL_TIME_UINT = TimeUnit.MILLISECONDS; + public static final int DEFAULT_MAX_BATCH_WRITE_SIZE = 50; + + protected final BlockingDeque queue; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); final InfluxDBImpl influxDB; final int actions; private final TimeUnit flushIntervalUnit; private final int flushInterval; + private BufferFailBehaviour behaviour; + private boolean discardOnFailedWrite = true; + AtomicBoolean writeLockout = new AtomicBoolean(false); + private int maxBatchWriteSize; + + public enum BufferFailBehaviour { + THROW_EXCEPTION, DROP_CURRENT, DROP_OLDEST, BLOCK_THREAD, + } + + /** * The Builder to create a BatchProcessor instance. 
*/ public static final class Builder { private final InfluxDBImpl influxDB; - private int actions; - private TimeUnit flushIntervalUnit; - private int flushInterval; + private int actions = DEFAULT_ACTIONS; + private TimeUnit flushIntervalUnit = DEFAULT_FLUSH_INTERVAL_TIME_UINT; + private int flushInterval = DEFAULT_FLUSH_INTERVAL; + private Integer capacity = null; + private BufferFailBehaviour behaviour = BufferFailBehaviour.THROW_EXCEPTION; + private boolean discardOnFailedWrite = true; + private int maxBatchWriteSize = DEFAULT_MAX_BATCH_WRITE_SIZE; /** * @param influxDB @@ -53,12 +75,13 @@ public Builder(final InfluxDB influxDB) { /** * The number of actions after which a batchwrite must be performed. * - * @param maxActions - * number of Points written after which a write must happen. + * @param actions + * number of Points written after which a write should + * happen. * @return this Builder to use it fluent */ - public Builder actions(final int maxActions) { - this.actions = maxActions; + public Builder actions(final int actions) { + this.actions = actions; return this; } @@ -78,16 +101,87 @@ public Builder interval(final int interval, final TimeUnit unit) { return this; } + /** + * The maximum queue capacity. + * + * @param capacity + * the maximum number of points to hold Should be NULL, for + * no buffering OR > 0 for buffering (NB: a capacity of 1 + * will not really buffer) + * @return this {@code Builder}, to allow chaining + */ + public Builder capacity(final Integer capacity) { + this.capacity = capacity; + return this; + } + + /** + * Set both the capacity and actions + * @param capacity + * @param actions + * @return this builder instance, for fluent usage + */ + public Builder capacityAndActions(final Integer capacity, final int actions) { + this.capacity = capacity; + this.actions = actions; + return this; + } + + /** + * The behaviour when a put to the buffer fails + * + * @param behaviour + * @return this builder instance, for fluent usage + */ + public Builder behaviour(final BufferFailBehaviour behaviour) { + this.behaviour = behaviour; + return this; + } + + /** + * Controls whether the buffer will keep or discard buffered points on + * network errors. + * + * @param discardOnFailedWrite + * @return this builder instance, for fluent usage + */ + public Builder discardOnFailedWrite(final boolean discardOnFailedWrite) { + this.discardOnFailedWrite = discardOnFailedWrite; + return this; + } + + /** + * The maximum number of points to write in a batch + * + * @param maxBatchWriteSize + * @return this builder instance, for fluent usage + */ + public Builder maxBatchWriteSize(final int maxBatchWriteSize) { + this.maxBatchWriteSize = maxBatchWriteSize; + return this; + } + /** * Create the BatchProcessor. * * @return the BatchProcessor instance. 
*/ public BatchProcessor build() { - Preconditions.checkNotNull(actions, "actions may not be null"); - Preconditions.checkNotNull(flushInterval, "flushInterval may not be null"); + Preconditions.checkArgument(actions > 0, "actions must be > 0"); + Preconditions.checkArgument(flushInterval > 0, "flushInterval must be > 0"); Preconditions.checkNotNull(flushIntervalUnit, "flushIntervalUnit may not be null"); - return new BatchProcessor(influxDB, actions, flushIntervalUnit, flushInterval); + Preconditions.checkArgument(maxBatchWriteSize > 0, "maxBatchWriteSize must be > 0"); + + if (capacity != null) { + Preconditions.checkArgument(capacity > 0, "Capacity should be > 0 or NULL"); + Preconditions.checkArgument(capacity >= actions, "Capacity must be >= than actions"); + } else { + Preconditions.checkArgument(behaviour != BufferFailBehaviour.DROP_OLDEST, + "Behaviour cannot be DROP_OLDEST if capacity not set"); + } + + return new BatchProcessor(influxDB, actions, flushIntervalUnit, flushInterval, capacity, behaviour, + discardOnFailedWrite, maxBatchWriteSize); } } @@ -134,17 +228,31 @@ public static Builder builder(final InfluxDB influxDB) { } BatchProcessor(final InfluxDBImpl influxDB, final int actions, final TimeUnit flushIntervalUnit, - final int flushInterval) { + final int flushInterval, final Integer capacity, final BufferFailBehaviour behaviour, + boolean discardOnFailedWrite, final int maxBatchWriteSize) { super(); this.influxDB = influxDB; this.actions = actions; this.flushIntervalUnit = flushIntervalUnit; this.flushInterval = flushInterval; + this.behaviour = behaviour; + this.discardOnFailedWrite = discardOnFailedWrite; + this.maxBatchWriteSize = maxBatchWriteSize; + + if (capacity != null) { + if (capacity == 0) { + throw new IllegalArgumentException("capacity cannot be 0"); + } + queue = new LinkedBlockingDeque(capacity); + } else { + queue = new LinkedBlockingDeque(); + } // Flush at specified Rate scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { + writeLockout.set(false); write(); } }, this.flushInterval, this.flushInterval, this.flushIntervalUnit); @@ -156,42 +264,116 @@ void write() { return; } - Map> databaseToBatchPoints = Maps.newHashMap(); - List batchEntries = new ArrayList<>(queue.size()); - queue.drainTo(batchEntries); + // Never write the whole queue, it could be very big, so just get a temporary list + List writeList = new ArrayList(maxBatchWriteSize); + queue.drainTo(writeList, maxBatchWriteSize); - for (BatchEntry batchEntry : batchEntries) { + // Map the writeList by the common (and hence batchable) fields + Map> databaseToBatchPoints = Maps.newHashMap(); + + for (BatchEntry batchEntry : writeList) { BatchCommonFields common = BatchCommonFields.fromEntry(batchEntry); - + if (!databaseToBatchPoints.containsKey(common)) { - databaseToBatchPoints.put(common, new ArrayList()); + databaseToBatchPoints.put(common, new ArrayList()); } - databaseToBatchPoints.get(common).add(batchEntry.getPoint()); + databaseToBatchPoints.get(common).add(batchEntry); } - for (Entry> entry : databaseToBatchPoints.entrySet()) { + // For each collection of batchable fields, attempt a batched write + for (Entry> entry : databaseToBatchPoints.entrySet()) { BatchCommonFields common = entry.getKey(); - List points = entry.getValue(); - influxDB.write(common.database, common.retentionPolicy, common.consistencyLevel, points); + List batchEntries = entry.getValue(); + + List points = Lists.transform(batchEntries, new Function() { + @Override + public Point apply(BatchEntry 
input) { + return input.point; + } + }); + + try { + influxDB.writeBatched(common.database, common.retentionPolicy, common.consistencyLevel, points); + writeList.removeAll(batchEntries); + } catch (Exception e) { + // TODO: we should probably include some logging here + e.printStackTrace(); + } + } + + + if (!writeList.isEmpty()) { + // Some points were not written, return them to the queue if necessary + if (!discardOnFailedWrite) { + // If we failed our write, add back the elements from this + // attempt in REVERSE order to maintain queue ordering + for (BatchEntry batchEntry : Lists.reverse(writeList)) { + if (!queue.offerFirst(batchEntry)) { + break; + } + } + + // Enable the rate throttling + writeLockout.set(true); + } } } + /** * Put a single BatchEntry to the cache for later processing. * * @param batchEntry * the batchEntry to write to the cache. + * @return */ - void put(final BatchEntry batchEntry) { - queue.add(batchEntry); - if (queue.size() >= actions) { - write(); + public boolean put(String database, String retentionPolicy, ConsistencyLevel consistency, Point point) { + BatchEntry entry = new BatchEntry(point, database, consistency, retentionPolicy); + boolean added = false; + + switch(behaviour) { + case DROP_CURRENT: + added = queue.offer(entry); + break; + case DROP_OLDEST: + added = addAndDropIfNecessary(entry); + break; + case THROW_EXCEPTION: + added = queue.add(entry); + break; + case BLOCK_THREAD: + try { + queue.put(entry); + added = true; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + break; + default: + throw new UnsupportedOperationException("Behaviour not yet supported"); + } + + if (!writeLockout.get()) { + if (queue.size() >= actions) { + write(); + } } + + return added; + } + + public boolean addAndDropIfNecessary(BatchEntry entry) { + boolean added = queue.offer(entry); + if (!added) { + queue.poll(); // Remove the front of the queue + added = queue.add(entry); + } + return added; } /** - * Flush the current open writes to influxdb and end stop the reaper thread. This should only be - * called if no batch processing is needed anymore. + * Flush the current open writes to influxdb and end stop the reaper thread. + * This should only be called if no batch processing is needed anymore. 
* */ void flush() { @@ -235,7 +417,5 @@ public static BatchCommonFields fromEntry(BatchEntry batchEntry) { return new BatchCommonFields(batchEntry.getDatabase(), batchEntry.getRetentionPolicy(), batchEntry.getConsistencyLevel()); } - - } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 1d86098d3..2af5368f9 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -11,7 +11,7 @@ import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; -import org.influxdb.impl.BatchProcessor.BatchEntry; +import org.influxdb.impl.BatchProcessor.BufferFailBehaviour; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -88,19 +88,42 @@ public InfluxDB setLogLevel(final LogLevel logLevel) { return this; } - @Override - public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit) { + public InfluxDB enableBatch( + final int capacity, + final int actions, + final int flushDuration, + final TimeUnit flushDurationTimeUnit, + BufferFailBehaviour behaviour, + boolean discardOnFailedWrite, + int maxBatchWriteSize) { if (batchEnabled.get()) { throw new IllegalArgumentException("BatchProcessing is already enabled."); } batchProcessor = BatchProcessor .builder(this) - .actions(actions) + .capacityAndActions(capacity, actions) .interval(flushDuration, flushDurationTimeUnit) + .behaviour(behaviour) + .discardOnFailedWrite(discardOnFailedWrite) + .maxBatchWriteSize(maxBatchWriteSize) .build(); batchEnabled.set(true); return this; } + + @Override + public InfluxDB enableBatch(final int actions, + final int flushDuration, + final TimeUnit flushDurationTimeUnit) { + + enableBatch(0, + actions, + flushDuration, + flushDurationTimeUnit, + BufferFailBehaviour.THROW_EXCEPTION, + true, actions); + return this; + } @Override public void disableBatch() { @@ -142,8 +165,7 @@ public void write(final String database, final String retentionPolicy, final Poi @Override public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final Point point) { if (batchEnabled.get()) { - BatchEntry batchEntry = new BatchEntry(point, database, consistencyLevel, retentionPolicy); - batchProcessor.put(batchEntry); + batchProcessor.put(database, retentionPolicy, consistencyLevel, point); } else { writeUnbatched(database, retentionPolicy, ConsistencyLevel.ONE, point); } diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java new file mode 100644 index 000000000..6316a2074 --- /dev/null +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -0,0 +1,266 @@ +package org.influxdb.impl; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.influxdb.InfluxDB.ConsistencyLevel; +import org.influxdb.dto.Point; +import org.influxdb.impl.BatchProcessor.BufferFailBehaviour; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class BatchProcessorTest { + private static class AnonInfluxDBImpl extends InfluxDBImpl { + private static final String FAIL_DATABASE = "fail_db"; + private final boolean throwErrorOnWriteBatched; + private int writeCalled = 0; + + public AnonInfluxDBImpl(final boolean throwErrorOnWriteBatched) { + super("temp", "user", "pass"); + this.throwErrorOnWriteBatched = throwErrorOnWriteBatched; + } + + @Override + 
protected void writeBatched(String database, String retentionPolicy, ConsistencyLevel consistencyLevel, + List points) { + writeCalled++; + if (throwErrorOnWriteBatched) { + throw new RuntimeException("Anon error"); + } + + if (FAIL_DATABASE.equals(database)) { + throw new RuntimeException("Will not write to fail db"); + } + } + + @Override + protected void writeUnbatched(String database, String retentionPolicy, ConsistencyLevel consistencyLevel, + Point point) { + } + } + + private static Point getAnonPoint() { + return getPoint("anon"); + } + + private static Point getPoint(String measurement) { + return Point.measurement(measurement) + .field("field", "value").build(); + } + + private static AnonInfluxDBImpl getAnonInfluxDB() { + return new AnonInfluxDBImpl(false); + } + + private static AnonInfluxDBImpl getErrorThrowingDB() { + return new AnonInfluxDBImpl(true); + } + + private final String ANON_DB = "db"; + private final String ANON_RETENTION = "default"; + private final ConsistencyLevel ANON_CONSISTENCY = ConsistencyLevel.ONE; + + @Test(expectedExceptions={IllegalArgumentException.class}) + public void cannotBuildWithActionsGreaterThanCapacity() { + BatchProcessor.builder(getAnonInfluxDB()) + .capacityAndActions(1, 2) + .build(); + } + + @Test(expectedExceptions={IllegalStateException.class}) + public void addingThrowsExceptionWhenBehaviourIsThrowExceptionAndQueueAtCapacity() { + BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) + .interval(1, TimeUnit.DAYS) + .capacityAndActions(1, 1) + .discardOnFailedWrite(false) + .behaviour(BufferFailBehaviour.THROW_EXCEPTION) + .build(); + + boolean putResult; + putResult = subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertTrue(putResult); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + } + + @Test + public void addingEvictsOldestWhenBehaviourIsDropOldestAndQueueAtCapacity() { + BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) + .interval(1, TimeUnit.DAYS) + .capacityAndActions(1, 1) + .discardOnFailedWrite(false) + .behaviour(BufferFailBehaviour.DROP_OLDEST) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure2"); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure3"); + + } + + @Test + public void addingDoesNotInsertCurrentWhenBehaviourIsDropCurrentAndKeepOnFailedWriteAndQueueAtCapacity() { + BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) + .interval(1, TimeUnit.DAYS) + .capacityAndActions(1, 1) + .behaviour(BufferFailBehaviour.DROP_CURRENT) + .discardOnFailedWrite(false) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); + + } + + @Test(expectedExceptions = { 
IllegalArgumentException.class }) + public void cannotBeBuiltWithDropOldestBehaviourAndWithoutCapacityLimit() { + BatchProcessor.builder(getAnonInfluxDB()) + .interval(1, TimeUnit.DAYS) + .behaviour(BufferFailBehaviour.DROP_OLDEST) + .build(); + + } + + @Test + public void pointsAreRemovedFromQueueAfterSuccessfulWrite() { + BatchProcessor subject = BatchProcessor.builder(getAnonInfluxDB()) + .interval(1, TimeUnit.DAYS) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 1); + subject.write(); + Assert.assertEquals(subject.queue.size(), 0); + } + + @Test + public void keepOnFailedWriteProcessorRetainsPointsAfterExceptionThrown() { + BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) + .interval(1, TimeUnit.DAYS) + .discardOnFailedWrite(false) + .build(); + + Point point = getAnonPoint(); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, point); + Assert.assertEquals(subject.queue.size(), 1); + subject.write(); + Assert.assertEquals(subject.queue.size(), 1); + // TODO this is bad, Law of Demeter violation! + Assert.assertEquals(subject.queue.peek().getPoint(), point); + } + + @Test + public void discardOnFailedWriteProcessorDropsPointsAfterExceptionThrown() { + BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) + .interval(1, TimeUnit.DAYS) + .discardOnFailedWrite(true) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 1); + subject.write(); + Assert.assertEquals(subject.queue.size(), 0); + } + + @Test + public void writeCalledAfterActionsReached() { + AnonInfluxDBImpl influxDb = getAnonInfluxDB(); + BatchProcessor subject = BatchProcessor.builder(influxDb) + .interval(1, TimeUnit.DAYS) + .actions(2) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 1); + Assert.assertEquals(influxDb.writeCalled, 0); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 0); + Assert.assertEquals(influxDb.writeCalled, 1); + } + + @Test + public void writeNotCascadedAfterWriteFailure() { + AnonInfluxDBImpl influxDb = getErrorThrowingDB(); + BatchProcessor subject = BatchProcessor.builder(influxDb) + .interval(1, TimeUnit.DAYS) + .capacityAndActions(3, 1) + .discardOnFailedWrite(false) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 1); + Assert.assertEquals(influxDb.writeCalled, 1); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 2); + Assert.assertEquals(influxDb.writeCalled, 1); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 3); + Assert.assertEquals(influxDb.writeCalled, 1); + } + + @Test + public void successfullyWrittenPointsAreNotReturnedToQueue() { + AnonInfluxDBImpl influxDb = getAnonInfluxDB(); + BatchProcessor subject = BatchProcessor.builder(influxDb) + .interval(1, TimeUnit.DAYS) + .capacityAndActions(3, 3) + .discardOnFailedWrite(false) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); + subject.put(AnonInfluxDBImpl.FAIL_DATABASE, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + 
Assert.assertEquals(influxDb.writeCalled, 2); // Once for ANON_DB, once for FAIL_DATABASE + Assert.assertEquals(subject.queue.size(), 1); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure3"); + } + + @Test + public void unsuccessfullyWrittenPointsAreReturnedToQueueInCorrectOrder() { + AnonInfluxDBImpl influxDb = getErrorThrowingDB(); + BatchProcessor subject = BatchProcessor.builder(influxDb) + .interval(1, TimeUnit.DAYS) + .capacityAndActions(4, 4) + .discardOnFailedWrite(false) + .build(); + + subject.put("db1", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted1")); + subject.put("db2", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted2")); + subject.put("db2", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted3")); + subject.put("db1", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted4")); + + Assert.assertEquals(influxDb.writeCalled, 2); // Once for db1, once for db2 + Assert.assertEquals(subject.queue.size(), 4); + Assert.assertEquals(subject.queue.peekFirst().getPoint().getMeasurement(), "inserted1"); + Assert.assertEquals(subject.queue.peekLast().getPoint().getMeasurement(), "inserted4"); + } + + @Test + public void writeOnlyAttemptsUpToMaxBatchWrite() { + AnonInfluxDBImpl influxDb = getAnonInfluxDB(); + BatchProcessor subject = BatchProcessor.builder(influxDb) + .interval(1, TimeUnit.DAYS) + .capacityAndActions(3, 3) + .maxBatchWriteSize(2) + .discardOnFailedWrite(false) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + + Assert.assertEquals(influxDb.writeCalled, 1); + Assert.assertEquals(subject.queue.size(), 1); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure3"); + } +} From 2db394cdc23f1f871ef005c0cfcf9228632daede Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Thu, 22 Oct 2015 08:03:30 +1000 Subject: [PATCH 07/13] Data Retention Enhancement Fixing the public API to include the required enums and configuration methods. Cleaning up the attribute definitions in BatchProcessor. --- src/main/java/org/influxdb/InfluxDB.java | 54 ++++++++++++++++++- .../org/influxdb/impl/BatchProcessor.java | 19 +++---- .../java/org/influxdb/impl/InfluxDBImpl.java | 3 +- .../org/influxdb/impl/BatchProcessorTest.java | 2 +- 4 files changed, 62 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index c229a0512..af9fcc7f3 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -66,6 +66,21 @@ public String value() { return this.value; } } + + /** + * Behaviour options for when a put fails to add to the buffer. This is + * particularly important when using capacity limited buffering. + */ + public enum BufferFailBehaviour { + /** Throw an exception if cannot add to buffer */ + THROW_EXCEPTION, + /** Drop (do not add) the element attempting to be added */ + DROP_CURRENT, + /** Drop the oldest element in the queue and add the current element */ + DROP_OLDEST, + /** Block the thread until the space becomes available (NB: Not tested) */ + BLOCK_THREAD, + } /** * Set the loglevel which is used for REST related actions. * * @param logLevel * the loglevel to set. * @return the InfluxDB instance to be able to use it in a fluent manner. 
*/ public InfluxDB setLogLevel(final LogLevel logLevel); - + /** * Enable Batching of single Point writes to speed up writes significant. If either actions or * flushDurations is reached first, a batchwrite is issued. @@ -89,6 +104,43 @@ public String value() { */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit); + /** + * Enable Batching of single Point writes with a capacity limit. Batching provides + * a significant performance improvement in write speed. If either actions + * or flushDurations is reached first, a batchwrite is issued. + * + * This allows greater control over the behaviour when the capacity of the + * underlying buffer is limited. + * + * @param capacity + * the maximum number of points to hold. Should be NULL for an + * unbounded buffer, OR > 0 for a capacity-limited buffer (NB: a + * capacity of 1 will not really buffer) + * @param actions + * the number of actions to collect before triggering a batched + * write + * @param flushDuration + * the amount of time to wait, at most, before triggering a + * batched write + * @param flushDurationTimeUnit + * the time unit for the flushDuration parameter + * @param behaviour + * the desired behaviour when capacity constraints are met + * @param discardOnFailedWrite + * if FALSE, the points from a failed batch write will be put + * back onto the queue; if TRUE, the points from a failed batch + * write will be discarded + * @param maxBatchWriteSize + * the maximum number of points to include in one batch write + * attempt. NB: this is different from the actions parameter, as + * the buffer can hold more than the actions parameter + * @return the InfluxDB instance to be able to use it in a fluent manner + */ + public InfluxDB enableBatch(final Integer capacity, final int actions, + final int flushDuration, final TimeUnit flushDurationTimeUnit, + BufferFailBehaviour behaviour, boolean discardOnFailedWrite, + int maxBatchWriteSize); + /** * Disable Batching. */ diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 627adf409..57c9879fd 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.influxdb.InfluxDB; +import org.influxdb.InfluxDB.BufferFailBehaviour; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.Point; @@ -36,20 +37,15 @@ public class BatchProcessor { protected final BlockingDeque queue; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - final InfluxDBImpl influxDB; - final int actions; + private final InfluxDBImpl influxDB; + private final int actions; private final TimeUnit flushIntervalUnit; private final int flushInterval; - private BufferFailBehaviour behaviour; - private boolean discardOnFailedWrite = true; - AtomicBoolean writeLockout = new AtomicBoolean(false); - private int maxBatchWriteSize; - - public enum BufferFailBehaviour { - THROW_EXCEPTION, DROP_CURRENT, DROP_OLDEST, BLOCK_THREAD, - } - + private final BufferFailBehaviour behaviour; + private final boolean discardOnFailedWrite; + private final int maxBatchWriteSize; + private final AtomicBoolean writeLockout = new AtomicBoolean(false); /** * The Builder to create a BatchProcessor instance. 
@@ -297,7 +293,6 @@ public Point apply(BatchEntry input) { writeList.removeAll(batchEntries); } catch (Exception e) { // TODO: we should probably include some logging here - e.printStackTrace(); } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 2af5368f9..0455d28ba 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -11,7 +11,6 @@ import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; -import org.influxdb.impl.BatchProcessor.BufferFailBehaviour; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -89,7 +88,7 @@ public InfluxDB setLogLevel(final LogLevel logLevel) { } public InfluxDB enableBatch( - final int capacity, + final Integer capacity, final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 6316a2074..1da5a1123 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -3,9 +3,9 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import org.influxdb.InfluxDB.BufferFailBehaviour; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.Point; -import org.influxdb.impl.BatchProcessor.BufferFailBehaviour; import org.testng.Assert; import org.testng.annotations.Test; From 7f8b497435b5fefe2636184fa21185a98625b20e Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Thu, 22 Oct 2015 08:08:36 +1000 Subject: [PATCH 08/13] Data Retention Enhancement Adding 'interrogation' functions to the InfluxDB interface, for investigating the state of the underlying buffer. NB: This now requires synchronisation, eek! --- src/main/java/org/influxdb/InfluxDB.java | 18 +++ src/main/java/org/influxdb/dto/Point.java | 8 ++ .../org/influxdb/impl/BatchProcessor.java | 131 ++++++++++++------ .../java/org/influxdb/impl/InfluxDBImpl.java | 25 ++++ .../org/influxdb/impl/BatchProcessorTest.java | 47 +++++++ 5 files changed, 188 insertions(+), 41 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index af9fcc7f3..236c76622 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -9,6 +9,8 @@ import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; +import com.google.common.base.Optional; + /** * Interface with all available methods to access a InfluxDB database. 
* @@ -245,4 +247,20 @@ public InfluxDB enableBatch(final Integer capacity, final int actions, */ public List describeDatabases(); + /** + * Get the number of buffered points NB: If batching is not enabled this + * will return 0 + * + * @return + */ + public int getBufferedCount(); + + /** + * Retrieves, but does not remove, the first element of the buffer + * + * @return an Optional containing the first element in the queue if + * it is present + */ + public Optional peekFirstBuffered(); + } \ No newline at end of file diff --git a/src/main/java/org/influxdb/dto/Point.java b/src/main/java/org/influxdb/dto/Point.java index faf164d13..214607e28 100644 --- a/src/main/java/org/influxdb/dto/Point.java +++ b/src/main/java/org/influxdb/dto/Point.java @@ -174,6 +174,10 @@ public String getMeasurement() { void setTime(final Long time) { this.time = time; } + + public Long getTime() { + return time; + } /** * @param tags @@ -197,6 +201,10 @@ Map getTags() { void setPrecision(final TimeUnit precision) { this.precision = precision; } + + public TimeUnit getPrecision() { + return precision; + } /** * @param fields diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 57c9879fd..a88661e91 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -18,6 +18,7 @@ import com.google.common.base.Function; import com.google.common.base.Objects; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -47,6 +48,9 @@ public class BatchProcessor { private final AtomicBoolean writeLockout = new AtomicBoolean(false); + private final Object queueLock = new Object(); + private final ArrayList writeList; + /** * The Builder to create a BatchProcessor instance. 
*/ @@ -234,6 +238,7 @@ public static Builder builder(final InfluxDB influxDB) { this.behaviour = behaviour; this.discardOnFailedWrite = discardOnFailedWrite; this.maxBatchWriteSize = maxBatchWriteSize; + this.writeList = Lists.newArrayListWithCapacity(maxBatchWriteSize); if (capacity != null) { if (capacity == 0) { @@ -260,9 +265,12 @@ void write() { return; } - // Never write the whole queue, it could be very big, so just get a temporary list - List writeList = new ArrayList(maxBatchWriteSize); - queue.drainTo(writeList, maxBatchWriteSize); + synchronized (queueLock) { + writeList.clear(); // probably redundant + // Never write the whole queue, it could be very big, so just get a + // temporary list + queue.drainTo(writeList, maxBatchWriteSize); + } // Map the writeList by the common (and hence batchable) fields Map> databaseToBatchPoints = Maps.newHashMap(); @@ -282,11 +290,11 @@ void write() { List batchEntries = entry.getValue(); List points = Lists.transform(batchEntries, new Function() { - @Override - public Point apply(BatchEntry input) { - return input.point; - } - }); + @Override + public Point apply(BatchEntry input) { + return input.point; + } + }); try { influxDB.writeBatched(common.database, common.retentionPolicy, common.consistencyLevel, points); @@ -296,25 +304,35 @@ public Point apply(BatchEntry input) { } } - if (!writeList.isEmpty()) { - // Some points were not written, return them to the queue if necessary - if (!discardOnFailedWrite) { - // If we failed our write, add back the elements from this - // attempt in REVERSE order to maintain queue ordering - for (BatchEntry batchEntry : Lists.reverse(writeList)) { - if (!queue.offerFirst(batchEntry)) { - break; + // Some points were not written, return them to the queue if + // necessary + synchronized (queueLock) { + if (!discardOnFailedWrite) { + // If we failed our write, add back the elements from this + // attempt in REVERSE order to maintain queue ordering + for (BatchEntry batchEntry : Lists.reverse(writeList)) { + boolean insertedAtStart = queue.offerFirst(batchEntry); + if (!insertedAtStart) { + // We have inserted as much as we can, may as well + // stop. + + // NB: There is possibly a need for an enhancement + // here based on the behaviour attribute, but for + // now I cannot think of a more reasonable action + // than the current behaviour + break; + } } - } - // Enable the rate throttling - writeLockout.set(true); + // Enable the rate throttling + writeLockout.set(true); + } + writeList.clear(); } } } - /** * Put a single BatchEntry to the cache for later processing. 
* @@ -326,28 +344,31 @@ public boolean put(String database, String retentionPolicy, ConsistencyLevel con BatchEntry entry = new BatchEntry(point, database, consistency, retentionPolicy); boolean added = false; - switch(behaviour) { - case DROP_CURRENT: - added = queue.offer(entry); - break; - case DROP_OLDEST: - added = addAndDropIfNecessary(entry); - break; - case THROW_EXCEPTION: - added = queue.add(entry); - break; - case BLOCK_THREAD: - try { - queue.put(entry); - added = true; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - break; - default: + synchronized (queueLock) { + + switch (behaviour) { + case DROP_CURRENT: + added = queue.offer(entry); + break; + case DROP_OLDEST: + added = addAndDropIfNecessary(entry); + break; + case THROW_EXCEPTION: + added = queue.add(entry); + break; + case BLOCK_THREAD: + try { + queue.put(entry); + added = true; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + break; + default: throw new UnsupportedOperationException("Behaviour not yet supported"); + } } - + if (!writeLockout.get()) { if (queue.size() >= actions) { write(); @@ -356,7 +377,7 @@ public boolean put(String database, String retentionPolicy, ConsistencyLevel con return added; } - + public boolean addAndDropIfNecessary(BatchEntry entry) { boolean added = queue.offer(entry); if (!added) { @@ -413,4 +434,32 @@ public static BatchCommonFields fromEntry(BatchEntry batchEntry) { batchEntry.getRetentionPolicy(), batchEntry.getConsistencyLevel()); } } + + public int getBufferedCount() { + synchronized (queueLock) { + return writeList.size() + queue.size(); + } + } + + /** + * Retrieves, but does not remove, the first element of the buffer + * + * @return an Optional containing the first element in the queue + */ + public Optional peekFirstBuffered() { + BatchEntry batchEntry = null; + synchronized (queueLock) { + if (!writeList.isEmpty()) { + batchEntry = writeList.get(0); + } else { + batchEntry = queue.peekFirst(); + } + } + + if (batchEntry == null) { + return Optional.absent(); + } + + return Optional.of(batchEntry.point); + } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 0455d28ba..6b8d49136 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -12,6 +12,7 @@ import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; @@ -64,6 +65,10 @@ public InfluxDBImpl(final String url, final String username, final String passwo .build(); influxDBService = restAdapter.create(InfluxDBService.class); } + + protected BatchProcessor getBatchProcessor() { + return batchProcessor; + } @Override public InfluxDB setLogLevel(final LogLevel logLevel) { @@ -259,4 +264,24 @@ public List describeDatabases() { return databases; } + public int getBufferedCount() { + if (batchEnabled.get()) { + return batchProcessor.getBufferedCount(); + } + + return 0; + } + + @Override + public Optional peekFirstBuffered() { + if (batchEnabled.get()) { + Optional point = batchProcessor.peekFirstBuffered(); + + if (point.isPresent()) { + return Optional.of(point.get()); + } + } + + return Optional.absent(); + } } diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 1da5a1123..f6e17e32b 100644 
--- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -38,6 +38,30 @@ protected void writeUnbatched(String database, String retentionPolicy, Consisten Point point) { } } + + private static class QueueDepthRecordingDBImpl extends InfluxDBImpl { + private int queueDepth = 0; + private int writeCalled = 0; + + public QueueDepthRecordingDBImpl() { + super("temp", "user", "pass"); + } + + @Override + protected void writeBatched(String database, String retentionPolicy, + ConsistencyLevel consistencyLevel, List points) { + writeCalled++; + queueDepth = getBufferedCount(); + } + + public int getQueueDepth() { + return queueDepth; + } + + public BatchProcessor getBatchProcessor() { + return super.getBatchProcessor(); + } + } private static Point getAnonPoint() { return getPoint("anon"); @@ -56,6 +80,10 @@ private static AnonInfluxDBImpl getErrorThrowingDB() { return new AnonInfluxDBImpl(true); } + private static QueueDepthRecordingDBImpl getQueueDepthRecordingDBImpl() { + return new QueueDepthRecordingDBImpl(); + } + private final String ANON_DB = "db"; private final String ANON_RETENTION = "default"; private final ConsistencyLevel ANON_CONSISTENCY = ConsistencyLevel.ONE; @@ -263,4 +291,23 @@ public void writeOnlyAttemptsUpToMaxBatchWrite() { Assert.assertEquals(subject.queue.size(), 1); Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure3"); } + + @Test + public void testGetBufferedCountWorksInTheMiddleOfAWrite() { + QueueDepthRecordingDBImpl influxDb = getQueueDepthRecordingDBImpl(); + influxDb.enableBatch(5, 5, 5, TimeUnit.SECONDS, BufferFailBehaviour.THROW_EXCEPTION, false, 5); + BatchProcessor subject = influxDb.getBatchProcessor(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + Assert.assertEquals(subject.queue.size(), 3); + Assert.assertEquals(subject.getBufferedCount(), 3); + + subject.write(); + Assert.assertEquals(influxDb.writeCalled, 1); + Assert.assertEquals(influxDb.getQueueDepth(), 3); + Assert.assertEquals(subject.queue.size(), 0); + Assert.assertEquals(subject.getBufferedCount(), 0); + } } From 3c1e430bea195f21bffd356d64cda6086a23f79e Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Thu, 22 Oct 2015 21:30:33 +1000 Subject: [PATCH 09/13] Introduce slf4j to allow logging This library could really do with some logging. 
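For illustration, a minimal sketch of the logging pattern this dependency enables (the class name here is hypothetical, and a runtime binding such as slf4j-simple or logback, which this patch does not add, is needed to actually see output):

    package org.influxdb.impl;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingSketch {
        // One logger per class, resolved once and reused.
        private static final Logger logger = LoggerFactory.getLogger(LoggingSketch.class);

        void onWrite(final int queueSize) {
            // Parameterised messages avoid building strings when DEBUG is disabled.
            logger.debug("Attempting to write, queue size: {}", queueSize);
        }
    }

The later patches in this series add exactly this kind of class-level logger and debug calls to BatchProcessor.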
--- pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pom.xml b/pom.xml index 163166343..58e7f8907 100644 --- a/pom.xml +++ b/pom.xml @@ -106,5 +106,10 @@ okhttp 2.4.0 + + org.slf4j + slf4j-api + 1.7.12 + From aae92ce4135e38d6ae119f939add35c85c37ba73 Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Thu, 22 Oct 2015 22:09:15 +1000 Subject: [PATCH 10/13] index on DataRetentionEnhancement: eef0c3a Merge branch 'master' into DataRetentionEnhancement --- pom.xml | 9 + .../org/influxdb/impl/BatchProcessor.java | 208 ++++++++++++------ .../java/org/influxdb/impl/InfluxDBImpl.java | 1 + .../org/influxdb/impl/BatchProcessorTest.java | 73 +++--- 4 files changed, 193 insertions(+), 98 deletions(-) diff --git a/pom.xml b/pom.xml index 58e7f8907..b87fff60a 100644 --- a/pom.xml +++ b/pom.xml @@ -106,10 +106,19 @@ okhttp 2.4.0 +<<<<<<< Updated upstream org.slf4j slf4j-api 1.7.12 +||||||| merged common ancestors +======= + + org.slf4j + slf4j-api + 1.7.12 + +>>>>>>> Stashed changes diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index a88661e91..8358892d0 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -15,6 +15,8 @@ import org.influxdb.InfluxDB.BufferFailBehaviour; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.Point; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.base.Objects; @@ -31,34 +33,41 @@ * */ public class BatchProcessor { + private static final Logger logger = LoggerFactory.getLogger(BatchProcessor.class); public static final int DEFAULT_ACTIONS = 10; - public static final int DEFAULT_FLUSH_INTERVAL = 100; + public static final int DEFAULT_FLUSH_INTERVAL_MIN = 1000; + public static final int DEFAULT_FLUSH_INTERVAL_MAX = 60000; public static final TimeUnit DEFAULT_FLUSH_INTERVAL_TIME_UINT = TimeUnit.MILLISECONDS; public static final int DEFAULT_MAX_BATCH_WRITE_SIZE = 50; protected final BlockingDeque queue; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final InfluxDBImpl influxDB; - private final int actions; + private final int flushActions; private final TimeUnit flushIntervalUnit; - private final int flushInterval; + private int flushInterval; + private final int flushIntervalMin; + private final int flushIntervalMax; private final BufferFailBehaviour behaviour; private final boolean discardOnFailedWrite; private final int maxBatchWriteSize; - private final AtomicBoolean writeLockout = new AtomicBoolean(false); + private final AtomicBoolean writeInProgressLock = new AtomicBoolean(false); + private final AtomicBoolean waitForFlushIntervalToWriteLock = new AtomicBoolean(false); private final Object queueLock = new Object(); private final ArrayList writeList; + /** * The Builder to create a BatchProcessor instance. 
*/ public static final class Builder { private final InfluxDBImpl influxDB; - private int actions = DEFAULT_ACTIONS; + private int flushActions = DEFAULT_ACTIONS; private TimeUnit flushIntervalUnit = DEFAULT_FLUSH_INTERVAL_TIME_UINT; - private int flushInterval = DEFAULT_FLUSH_INTERVAL; + private int flushIntervalMin = DEFAULT_FLUSH_INTERVAL_MIN; + private int flushIntervalMax = DEFAULT_FLUSH_INTERVAL_MAX; private Integer capacity = null; private BufferFailBehaviour behaviour = BufferFailBehaviour.THROW_EXCEPTION; private boolean discardOnFailedWrite = true; @@ -80,8 +89,8 @@ public Builder(final InfluxDB influxDB) { * happen. * @return this Builder to use it fluent */ - public Builder actions(final int actions) { - this.actions = actions; + public Builder actions(final int flushActions) { + this.flushActions = flushActions; return this; } @@ -95,8 +104,9 @@ public Builder actions(final int actions) { * * @return this Builder to use it fluent */ - public Builder interval(final int interval, final TimeUnit unit) { - this.flushInterval = interval; + public Builder interval(final int intervalMin, final int intervalMax, final TimeUnit unit) { + this.flushIntervalMin = intervalMin; + this.flushIntervalMax = intervalMax; this.flushIntervalUnit = unit; return this; } @@ -121,9 +131,9 @@ public Builder capacity(final Integer capacity) { * @param actions * @return this builder instance, for fluent usage */ - public Builder capacityAndActions(final Integer capacity, final int actions) { + public Builder capacityAndActions(final Integer capacity, final int flushActions) { this.capacity = capacity; - this.actions = actions; + this.flushActions = flushActions; return this; } @@ -167,20 +177,22 @@ public Builder maxBatchWriteSize(final int maxBatchWriteSize) { * @return the BatchProcessor instance. 
*/ public BatchProcessor build() { - Preconditions.checkArgument(actions > 0, "actions must be > 0"); - Preconditions.checkArgument(flushInterval > 0, "flushInterval must be > 0"); + Preconditions.checkArgument(flushActions > 0, "flushActions must be > 0"); + Preconditions.checkArgument(flushIntervalMin > 0, "flushIntervalMin must be > 0"); Preconditions.checkNotNull(flushIntervalUnit, "flushIntervalUnit may not be null"); + Preconditions.checkArgument(flushIntervalMin <= flushIntervalMax, "flushIntervalMin must be <= flushIntervalMax"); Preconditions.checkArgument(maxBatchWriteSize > 0, "maxBatchWriteSize must be > 0"); if (capacity != null) { Preconditions.checkArgument(capacity > 0, "Capacity should be > 0 or NULL"); - Preconditions.checkArgument(capacity >= actions, "Capacity must be >= than actions"); + Preconditions.checkArgument(capacity >= flushActions, "Capacity must be >= than flushActions"); } else { Preconditions.checkArgument(behaviour != BufferFailBehaviour.DROP_OLDEST, "Behaviour cannot be DROP_OLDEST if capacity not set"); } - return new BatchProcessor(influxDB, actions, flushIntervalUnit, flushInterval, capacity, behaviour, + return new BatchProcessor(influxDB, flushActions, flushIntervalUnit, flushIntervalMin, flushIntervalMax, + capacity, behaviour, discardOnFailedWrite, maxBatchWriteSize); } } @@ -227,19 +239,22 @@ public static Builder builder(final InfluxDB influxDB) { return new Builder(influxDB); } - BatchProcessor(final InfluxDBImpl influxDB, final int actions, final TimeUnit flushIntervalUnit, - final int flushInterval, final Integer capacity, final BufferFailBehaviour behaviour, + BatchProcessor(final InfluxDBImpl influxDB, final int flushActions, final TimeUnit flushIntervalUnit, + final int flushIntervalMin, final int flushIntervalMax, final Integer capacity, final BufferFailBehaviour behaviour, boolean discardOnFailedWrite, final int maxBatchWriteSize) { super(); this.influxDB = influxDB; - this.actions = actions; + this.flushActions = flushActions; this.flushIntervalUnit = flushIntervalUnit; - this.flushInterval = flushInterval; + this.flushIntervalMin = flushIntervalMin; + this.flushIntervalMax = flushIntervalMax; this.behaviour = behaviour; this.discardOnFailedWrite = discardOnFailedWrite; this.maxBatchWriteSize = maxBatchWriteSize; - this.writeList = Lists.newArrayListWithCapacity(maxBatchWriteSize); - + writeList = Lists.newArrayListWithCapacity(maxBatchWriteSize); + + flushInterval = this.flushIntervalMin; + if (capacity != null) { if (capacity == 0) { throw new IllegalArgumentException("capacity cannot be 0"); @@ -248,21 +263,79 @@ public static Builder builder(final InfluxDB influxDB) { } else { queue = new LinkedBlockingDeque(); } - + // Flush at specified Rate - scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - writeLockout.set(false); - write(); + scheduleNextFlush(); + } + + private void scheduleNextFlush() { + logger.debug("scheduling next flush for {} {}", flushInterval, flushIntervalUnit); + scheduler.schedule(new FlushIntervalRunnable(), flushInterval, flushIntervalUnit); + } + + private class FlushIntervalRunnable implements Runnable { + public void run() { + logger.debug("Flush interval commenced"); + WriteResult result = attemptWrite(); + + switch (result){ + case FAILED: + logger.debug("Flush interval - FAILED"); + flushInterval = Math.min(flushInterval * 2, flushIntervalMax); + break; + case NOT_ATTEMPTED: + logger.debug("Flush interval - NOT ATTEMPTED"); + break; + case SUCCESSFUL: + logger.debug("Flush 
interval - SUCCESS"); + flushInterval = flushIntervalMin; + waitForFlushIntervalToWriteLock.set(false); + break; + default: + throw new RuntimeException("Unhandled WriteResult enum value:" + result); } - }, this.flushInterval, this.flushInterval, this.flushIntervalUnit); + + scheduleNextFlush(); + } + } + + private class WriteRunnable implements Runnable{ + @Override + public void run() { + attemptWrite(); + } + } + + enum WriteResult { + NOT_ATTEMPTED, + SUCCESSFUL, + FAILED, + } + + WriteResult attemptWrite() { + if (writeInProgressLock.compareAndSet(false, true)) { + logger.debug("Attempting to write"); + boolean success = write(); + writeInProgressLock.set(false); + + return success ? WriteResult.SUCCESSFUL: WriteResult.FAILED; + } + logger.debug("Write already in progress, not attempting"); + return WriteResult.NOT_ATTEMPTED; + } + + void writeNow() { + // If there is no write in progress, schedule an immediate write + if (!writeInProgressLock.get()) { + logger.debug("Write NOT already in progress, scheduling WriteRunnable"); + scheduler.execute(new WriteRunnable()); + } } - void write() { + boolean write() { if (queue.isEmpty()) { - return; + return true; } synchronized (queueLock) { @@ -324,13 +397,14 @@ public Point apply(BatchEntry input) { break; } } - - // Enable the rate throttling - writeLockout.set(true); + waitForFlushIntervalToWriteLock.set(true); } writeList.clear(); } + return false; } + + return true; } /** @@ -344,47 +418,49 @@ public boolean put(String database, String retentionPolicy, ConsistencyLevel con BatchEntry entry = new BatchEntry(point, database, consistency, retentionPolicy); boolean added = false; - synchronized (queueLock) { - - switch (behaviour) { - case DROP_CURRENT: - added = queue.offer(entry); - break; - case DROP_OLDEST: - added = addAndDropIfNecessary(entry); - break; - case THROW_EXCEPTION: - added = queue.add(entry); - break; - case BLOCK_THREAD: - try { - queue.put(entry); - added = true; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - break; - default: - throw new UnsupportedOperationException("Behaviour not yet supported"); + switch (behaviour) { + case DROP_CURRENT: + added = queue.offer(entry); + break; + case DROP_OLDEST: + added = addAndDropIfNecessary(entry); + break; + case THROW_EXCEPTION: + added = queue.add(entry); + break; + case BLOCK_THREAD: + try { + queue.put(entry); + added = true; + } catch (InterruptedException e) { + throw new RuntimeException(e); } + break; + default: + throw new UnsupportedOperationException("Behaviour not yet supported"); } - if (!writeLockout.get()) { - if (queue.size() >= actions) { - write(); + logger.debug("Queue size:{}", queue.size()); + + if (!waitForFlushIntervalToWriteLock.get()) { + if (queue.size() >= flushActions) { + logger.debug("No flush lock - Queue size[{}] actions[{}]", queue.size(), flushActions); + writeNow(); } } return added; } - public boolean addAndDropIfNecessary(BatchEntry entry) { - boolean added = queue.offer(entry); - if (!added) { - queue.poll(); // Remove the front of the queue - added = queue.add(entry); + private boolean addAndDropIfNecessary(BatchEntry entry) { + synchronized (queueLock) { + boolean added = queue.offer(entry); + if (!added) { + queue.poll(); // Remove the front of the queue + added = queue.add(entry); + } + return added; } - return added; } /** diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index bca57793e..690930929 100644 --- 
a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.influxdb.InfluxDB; +import org.influxdb.InfluxDB.BufferFailBehaviour; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index f6e17e32b..761a58792 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -63,6 +63,19 @@ public BatchProcessor getBatchProcessor() { } } + private static class NonScheduledWriteBatchProcessor extends BatchProcessor { + + NonScheduledWriteBatchProcessor(InfluxDBImpl influxDB, int actions, TimeUnit flushIntervalUnit, + int flushInterval, Integer capacity, BufferFailBehaviour behaviour, boolean discardOnFailedWrite, + int maxBatchWriteSize) { + super(influxDB, actions, flushIntervalUnit, flushInterval, capacity, behaviour, discardOnFailedWrite, + maxBatchWriteSize); + } + @Override + void writeNow() { + attemptWrite(); + } + } private static Point getAnonPoint() { return getPoint("anon"); } @@ -132,14 +145,10 @@ public void addingEvictsOldestWhenBehaviourIsDropOldestAndQueueAtCapacity() { @Test public void addingDoesNotInsertCurrentWhenBehaviourIsDropCurrentAndKeepOnFailedWriteAndQueueAtCapacity() { - BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(1, 1) - .behaviour(BufferFailBehaviour.DROP_CURRENT) - .discardOnFailedWrite(false) - .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(getErrorThrowingDB(), 1, TimeUnit.SECONDS, 1, 1, BufferFailBehaviour.DROP_CURRENT, false, 50); subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); +// subject.attemptWrite(); Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); @@ -203,11 +212,13 @@ public void discardOnFailedWriteProcessorDropsPointsAfterExceptionThrown() { @Test public void writeCalledAfterActionsReached() { AnonInfluxDBImpl influxDb = getAnonInfluxDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .actions(2) - .build(); - +// BatchProcessor subject = BatchProcessor.builder(influxDb) +// .interval(1, TimeUnit.DAYS) +// .actions(2) +// .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 2, TimeUnit.DAYS, 1, null, + BufferFailBehaviour.THROW_EXCEPTION, false, 50); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); Assert.assertEquals(subject.queue.size(), 1); Assert.assertEquals(influxDb.writeCalled, 0); @@ -219,11 +230,14 @@ public void writeCalledAfterActionsReached() { @Test public void writeNotCascadedAfterWriteFailure() { AnonInfluxDBImpl influxDb = getErrorThrowingDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(3, 1) - .discardOnFailedWrite(false) - .build(); +// BatchProcessor subject = BatchProcessor.builder(influxDb) +// .interval(1, TimeUnit.DAYS) +// .capacityAndActions(3, 1) +// .discardOnFailedWrite(false) +// .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 1, TimeUnit.DAYS, 1, 3, + BufferFailBehaviour.THROW_EXCEPTION, false, 50); + 
subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); Assert.assertEquals(subject.queue.size(), 1); @@ -239,11 +253,7 @@ public void writeNotCascadedAfterWriteFailure() { @Test public void successfullyWrittenPointsAreNotReturnedToQueue() { AnonInfluxDBImpl influxDb = getAnonInfluxDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(3, 3) - .discardOnFailedWrite(false) - .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 3, TimeUnit.DAYS, 1, 3, BufferFailBehaviour.DROP_CURRENT, false, 50); subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); subject.put(AnonInfluxDBImpl.FAIL_DATABASE, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); @@ -256,11 +266,7 @@ public void successfullyWrittenPointsAreNotReturnedToQueue() { @Test public void unsuccessfullyWrittenPointsAreReturnedToQueueInCorrectOrder() { AnonInfluxDBImpl influxDb = getErrorThrowingDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(4, 4) - .discardOnFailedWrite(false) - .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 4, TimeUnit.DAYS, 1, 4, BufferFailBehaviour.THROW_EXCEPTION, false, 50); subject.put("db1", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted1")); subject.put("db2", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted2")); @@ -276,12 +282,15 @@ public void unsuccessfullyWrittenPointsAreReturnedToQueueInCorrectOrder() { @Test public void writeOnlyAttemptsUpToMaxBatchWrite() { AnonInfluxDBImpl influxDb = getAnonInfluxDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(3, 3) - .maxBatchWriteSize(2) - .discardOnFailedWrite(false) - .build(); +// BatchProcessor subject = BatchProcessor.builder(influxDb) +// .interval(1, TimeUnit.DAYS) +// .capacityAndActions(3, 3) +// .maxBatchWriteSize(2) +// .discardOnFailedWrite(false) +// .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 3, TimeUnit.DAYS, 1, 3, + BufferFailBehaviour.THROW_EXCEPTION, false, 2); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); From 75f8b835f7a3141bbacf562ce4ae90103107fc19 Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Thu, 22 Oct 2015 22:30:46 +1000 Subject: [PATCH 11/13] Write of buffered points occur in worker thread These changes make the 'write()' call happen in the worker thread, rather than in the 'put()' thread, even when the 'actions' trigger is reached. 
--- pom.xml | 2 +- .../org/influxdb/impl/BatchProcessor.java | 149 +++++++++++++----- .../java/org/influxdb/impl/InfluxDBImpl.java | 1 + .../org/influxdb/impl/BatchProcessorTest.java | 73 +++++---- 4 files changed, 149 insertions(+), 76 deletions(-) diff --git a/pom.xml b/pom.xml index 58e7f8907..b909bfe36 100644 --- a/pom.xml +++ b/pom.xml @@ -110,6 +110,6 @@ org.slf4j slf4j-api 1.7.12 - + diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index a88661e91..42c73c30c 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -15,6 +15,8 @@ import org.influxdb.InfluxDB.BufferFailBehaviour; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.Point; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.base.Objects; @@ -31,13 +33,14 @@ * */ public class BatchProcessor { + private static final Logger logger = LoggerFactory.getLogger(BatchProcessor.class); public static final int DEFAULT_ACTIONS = 10; public static final int DEFAULT_FLUSH_INTERVAL = 100; public static final TimeUnit DEFAULT_FLUSH_INTERVAL_TIME_UINT = TimeUnit.MILLISECONDS; public static final int DEFAULT_MAX_BATCH_WRITE_SIZE = 50; protected final BlockingDeque queue; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final InfluxDBImpl influxDB; private final int actions; private final TimeUnit flushIntervalUnit; @@ -46,7 +49,8 @@ public class BatchProcessor { private final boolean discardOnFailedWrite; private final int maxBatchWriteSize; - private final AtomicBoolean writeLockout = new AtomicBoolean(false); + private final AtomicBoolean writeInProgressLock = new AtomicBoolean(false); + private final AtomicBoolean waitForFlushIntervalToWriteLock = new AtomicBoolean(false); private final Object queueLock = new Object(); private final ArrayList writeList; @@ -250,19 +254,75 @@ public static Builder builder(final InfluxDB influxDB) { } // Flush at specified Rate - scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - writeLockout.set(false); - write(); + scheduleNextFlush(); + } + + private void scheduleNextFlush() { + logger.debug("scheduling next flush for {} {}", flushInterval, flushIntervalUnit); + scheduler.schedule(new FlushIntervalRunnable(), flushInterval, flushIntervalUnit); + } + + private class FlushIntervalRunnable implements Runnable { + public void run() { + logger.debug("Flush interval commenced"); + WriteResult result = attemptWrite(); + + switch (result){ + case FAILED: + logger.debug("Flush interval - FAILED"); + break; + case NOT_ATTEMPTED: + logger.debug("Flush interval - NOT ATTEMPTED"); + break; + case SUCCESSFUL: + logger.debug("Flush interval - SUCCESS"); + waitForFlushIntervalToWriteLock.set(false); + break; + default: + throw new RuntimeException("Unhandled WriteResult enum value:" + result); } - }, this.flushInterval, this.flushInterval, this.flushIntervalUnit); + + scheduleNextFlush(); + } + } + + private class WriteRunnable implements Runnable{ + @Override + public void run() { + attemptWrite(); + } + } + + enum WriteResult { + NOT_ATTEMPTED, + SUCCESSFUL, + FAILED, + } + + WriteResult attemptWrite() { + if (writeInProgressLock.compareAndSet(false, true)) { + logger.debug("Attempting to write"); + boolean 
success = write(); + writeInProgressLock.set(false); + + return success ? WriteResult.SUCCESSFUL: WriteResult.FAILED; + } + logger.debug("Write already in progress, not attempting"); + return WriteResult.NOT_ATTEMPTED; + } + + void writeNow() { + // If there is no write in progress, schedule an immediate write + if (!writeInProgressLock.get()) { + logger.debug("Write NOT already in progress, scheduling WriteRunnable"); + scheduler.execute(new WriteRunnable()); + } } - void write() { + boolean write() { if (queue.isEmpty()) { - return; + return true; } synchronized (queueLock) { @@ -324,13 +384,14 @@ public Point apply(BatchEntry input) { break; } } - - // Enable the rate throttling - writeLockout.set(true); + waitForFlushIntervalToWriteLock.set(true); } writeList.clear(); } + return false; } + + return true; } /** @@ -344,47 +405,49 @@ public boolean put(String database, String retentionPolicy, ConsistencyLevel con BatchEntry entry = new BatchEntry(point, database, consistency, retentionPolicy); boolean added = false; - synchronized (queueLock) { - - switch (behaviour) { - case DROP_CURRENT: - added = queue.offer(entry); - break; - case DROP_OLDEST: - added = addAndDropIfNecessary(entry); - break; - case THROW_EXCEPTION: - added = queue.add(entry); - break; - case BLOCK_THREAD: - try { - queue.put(entry); - added = true; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - break; - default: - throw new UnsupportedOperationException("Behaviour not yet supported"); + switch (behaviour) { + case DROP_CURRENT: + added = queue.offer(entry); + break; + case DROP_OLDEST: + added = addAndDropIfNecessary(entry); + break; + case THROW_EXCEPTION: + added = queue.add(entry); + break; + case BLOCK_THREAD: + try { + queue.put(entry); + added = true; + } catch (InterruptedException e) { + throw new RuntimeException(e); } + break; + default: + throw new UnsupportedOperationException("Behaviour not yet supported"); } - if (!writeLockout.get()) { + logger.debug("Queue size:{}", queue.size()); + + if (!waitForFlushIntervalToWriteLock.get()) { if (queue.size() >= actions) { - write(); + logger.debug("No flush lock - Queue size[{}] actions[{}]", queue.size(), actions); + writeNow(); } } return added; } - public boolean addAndDropIfNecessary(BatchEntry entry) { - boolean added = queue.offer(entry); - if (!added) { - queue.poll(); // Remove the front of the queue - added = queue.add(entry); + private boolean addAndDropIfNecessary(BatchEntry entry) { + synchronized (queueLock) { + boolean added = queue.offer(entry); + if (!added) { + queue.poll(); // Remove the front of the queue + added = queue.add(entry); + } + return added; } - return added; } /** diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index bca57793e..690930929 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.influxdb.InfluxDB; +import org.influxdb.InfluxDB.BufferFailBehaviour; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index f6e17e32b..761a58792 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -63,6 +63,19 @@ public BatchProcessor getBatchProcessor() { } } + 
private static class NonScheduledWriteBatchProcessor extends BatchProcessor { + + NonScheduledWriteBatchProcessor(InfluxDBImpl influxDB, int actions, TimeUnit flushIntervalUnit, + int flushInterval, Integer capacity, BufferFailBehaviour behaviour, boolean discardOnFailedWrite, + int maxBatchWriteSize) { + super(influxDB, actions, flushIntervalUnit, flushInterval, capacity, behaviour, discardOnFailedWrite, + maxBatchWriteSize); + } + @Override + void writeNow() { + attemptWrite(); + } + } private static Point getAnonPoint() { return getPoint("anon"); } @@ -132,14 +145,10 @@ public void addingEvictsOldestWhenBehaviourIsDropOldestAndQueueAtCapacity() { @Test public void addingDoesNotInsertCurrentWhenBehaviourIsDropCurrentAndKeepOnFailedWriteAndQueueAtCapacity() { - BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(1, 1) - .behaviour(BufferFailBehaviour.DROP_CURRENT) - .discardOnFailedWrite(false) - .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(getErrorThrowingDB(), 1, TimeUnit.SECONDS, 1, 1, BufferFailBehaviour.DROP_CURRENT, false, 50); subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); +// subject.attemptWrite(); Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); @@ -203,11 +212,13 @@ public void discardOnFailedWriteProcessorDropsPointsAfterExceptionThrown() { @Test public void writeCalledAfterActionsReached() { AnonInfluxDBImpl influxDb = getAnonInfluxDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .actions(2) - .build(); - +// BatchProcessor subject = BatchProcessor.builder(influxDb) +// .interval(1, TimeUnit.DAYS) +// .actions(2) +// .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 2, TimeUnit.DAYS, 1, null, + BufferFailBehaviour.THROW_EXCEPTION, false, 50); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); Assert.assertEquals(subject.queue.size(), 1); Assert.assertEquals(influxDb.writeCalled, 0); @@ -219,11 +230,14 @@ public void writeCalledAfterActionsReached() { @Test public void writeNotCascadedAfterWriteFailure() { AnonInfluxDBImpl influxDb = getErrorThrowingDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(3, 1) - .discardOnFailedWrite(false) - .build(); +// BatchProcessor subject = BatchProcessor.builder(influxDb) +// .interval(1, TimeUnit.DAYS) +// .capacityAndActions(3, 1) +// .discardOnFailedWrite(false) +// .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 1, TimeUnit.DAYS, 1, 3, + BufferFailBehaviour.THROW_EXCEPTION, false, 50); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); Assert.assertEquals(subject.queue.size(), 1); @@ -239,11 +253,7 @@ public void writeNotCascadedAfterWriteFailure() { @Test public void successfullyWrittenPointsAreNotReturnedToQueue() { AnonInfluxDBImpl influxDb = getAnonInfluxDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(3, 3) - .discardOnFailedWrite(false) - .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 3, TimeUnit.DAYS, 1, 3, BufferFailBehaviour.DROP_CURRENT, false, 50); subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); 
subject.put(AnonInfluxDBImpl.FAIL_DATABASE, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); @@ -256,11 +266,7 @@ public void successfullyWrittenPointsAreNotReturnedToQueue() { @Test public void unsuccessfullyWrittenPointsAreReturnedToQueueInCorrectOrder() { AnonInfluxDBImpl influxDb = getErrorThrowingDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(4, 4) - .discardOnFailedWrite(false) - .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 4, TimeUnit.DAYS, 1, 4, BufferFailBehaviour.THROW_EXCEPTION, false, 50); subject.put("db1", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted1")); subject.put("db2", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted2")); @@ -276,12 +282,15 @@ public void unsuccessfullyWrittenPointsAreReturnedToQueueInCorrectOrder() { @Test public void writeOnlyAttemptsUpToMaxBatchWrite() { AnonInfluxDBImpl influxDb = getAnonInfluxDB(); - BatchProcessor subject = BatchProcessor.builder(influxDb) - .interval(1, TimeUnit.DAYS) - .capacityAndActions(3, 3) - .maxBatchWriteSize(2) - .discardOnFailedWrite(false) - .build(); +// BatchProcessor subject = BatchProcessor.builder(influxDb) +// .interval(1, TimeUnit.DAYS) +// .capacityAndActions(3, 3) +// .maxBatchWriteSize(2) +// .discardOnFailedWrite(false) +// .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 3, TimeUnit.DAYS, 1, 3, + BufferFailBehaviour.THROW_EXCEPTION, false, 2); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); From 9e2c5e9a389ff6c03225aff4547753a32d8c8008 Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Wed, 20 Apr 2016 18:00:35 +0200 Subject: [PATCH 12/13] Using correct default (should be `null` not `0`) Thanks @FlavioF for catching it here: https://github.com/influxdata/influxdb-java/pull/108/files#r60393226 --- src/main/java/org/influxdb/impl/InfluxDBImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index c2cde2377..67927823d 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -120,7 +120,7 @@ public InfluxDB enableBatch(final int flushActions, final int flushIntervalMax, final TimeUnit flushIntervalTimeUnit) { - enableBatch(0, + enableBatch(null, flushActions, flushIntervalMin, flushIntervalMax, From a60c0e5e64fe9ce6a811d089b5629cfd6c1e844f Mon Sep 17 00:00:00 2001 From: Andrew Dodd Date: Wed, 20 Apr 2016 18:01:12 +0200 Subject: [PATCH 13/13] Improving the names of the tests --- src/test/java/org/influxdb/impl/BatchProcessorTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index d64461180..aa4d22068 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -113,7 +113,7 @@ public void cannotBuildWithActionsGreaterThanCapacity() { } @Test(expectedExceptions={IllegalStateException.class}) - public void addingThrowsExceptionWhenBehaviourIsThrowExceptionAndQueueAtCapacity() { + public void itThrowsExceptionWhenQueueAtCapacityAndBehaviourIsThrowException() { BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) 
.interval(1, 2, TimeUnit.DAYS) .capacityAndActions(1, 1) @@ -128,7 +128,7 @@ public void itThrowsExceptionWhenQueueAtCapacityAndBehaviourIsThrowException() } @Test - public void addingEvictsOldestWhenBehaviourIsDropOldestAndQueueAtCapacity() { + public void itEvictsTheOldestWhenQueueAtCapacityAndBehaviourIsDropOldest() { BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) .interval(1, 2, TimeUnit.DAYS) .capacityAndActions(1, 1) @@ -148,7 +148,7 @@ public void addingEvictsOldestWhenBehaviourIsDropOldestAndQueueAtCapacity() { } @Test - public void addingDoesNotInsertCurrentWhenBehaviourIsDropCurrentAndKeepOnFailedWriteAndQueueAtCapacity() { + public void itDoesNotInsertIfQueueAtCapacityAndBehaviourIsDropCurrentAndKeepOnFailedWrite() { BatchProcessor subject = new NonScheduledWriteBatchProcessor(getErrorThrowingDB(), 1, TimeUnit.SECONDS, 1, 1, BufferFailBehaviour.DROP_CURRENT, false, 50); subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); @@ -173,7 +173,7 @@ public void cannotBeBuiltWithDropOldestBehaviourAndWithoutCapacityLimit() { } @Test - public void pointsAreRemovedFromQueueAfterSuccessfulWrite() { + public void itRemovesPointsFromQueueAfterSuccessfulWrite() { BatchProcessor subject = BatchProcessor.builder(getAnonInfluxDB()) .interval(1, 2, TimeUnit.DAYS) .build();