Skip to content

Commit 16e79a9

Browse files
authored
Merge pull request #243 from jiafu1115/patch-25
support async write point with different rp. Close #162
2 parents 970aca8 + 3a90c2c commit 16e79a9

File tree

2 files changed

+30
-9
lines changed

2 files changed

+30
-9
lines changed

src/main/java/org/influxdb/impl/BatchProcessor.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,9 @@ void write() {
186186
if (this.queue.isEmpty()) {
187187
return;
188188
}
189-
190-
Map<String, BatchPoints> databaseToBatchPoints = Maps.newHashMap();
189+
//for batch on HTTP.
190+
Map<String, BatchPoints> batchKeyToBatchPoints = Maps.newHashMap();
191+
//for batch on UDP.
191192
Map<Integer, List<String>> udpPortToBatchPoints = Maps.newHashMap();
192193
List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());
193194
this.queue.drainTo(batchEntries);
@@ -197,12 +198,14 @@ void write() {
197198
if (batchEntry instanceof HttpBatchEntry) {
198199
HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry);
199200
String dbName = httpBatchEntry.getDb();
200-
if (!databaseToBatchPoints.containsKey(dbName)) {
201+
String rp = httpBatchEntry.getRp();
202+
String batchKey = dbName + "_" + rp;
203+
if (!batchKeyToBatchPoints.containsKey(batchKey)) {
201204
BatchPoints batchPoints = BatchPoints.database(dbName)
202-
.retentionPolicy(httpBatchEntry.getRp()).build();
203-
databaseToBatchPoints.put(dbName, batchPoints);
205+
.retentionPolicy(rp).build();
206+
batchKeyToBatchPoints.put(batchKey, batchPoints);
204207
}
205-
databaseToBatchPoints.get(dbName).point(point);
208+
batchKeyToBatchPoints.get(batchKey).point(point);
206209
} else if (batchEntry instanceof UdpBatchEntry) {
207210
UdpBatchEntry udpBatchEntry = UdpBatchEntry.class.cast(batchEntry);
208211
int udpPort = udpBatchEntry.getUdpPort();
@@ -214,8 +217,8 @@ void write() {
214217
}
215218
}
216219

217-
for (BatchPoints batchPoints : databaseToBatchPoints.values()) {
218-
BatchProcessor.this.influxDB.write(batchPoints);
220+
for (BatchPoints batchPoints : batchKeyToBatchPoints.values()) {
221+
BatchProcessor.this.influxDB.write(batchPoints);
219222
}
220223
for (Entry<Integer, List<String>> entry : udpPortToBatchPoints.entrySet()) {
221224
BatchProcessor.this.influxDB.write(entry.getKey(), entry.getValue());

src/test/java/org/influxdb/impl/BatchProcessorTest.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,25 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce
3939
// without try catch the 2nd time does not occur
4040
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
4141
}
42-
42+
43+
@Test
44+
public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOException {
45+
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
46+
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
47+
.interval(1, TimeUnit.NANOSECONDS).build();
48+
49+
Point point = Point.measurement("cpu").field("6", "").build();
50+
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_1");
51+
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db1", "rp_2");
52+
53+
batchProcessor.put(batchEntry1);
54+
batchProcessor.put(batchEntry2);
55+
56+
Thread.sleep(200); // wait for scheduler
57+
// same dbname with different rp should write two batchs instead of only one.
58+
verify(mockInfluxDB, times(2)).write(any(BatchPoints.class));
59+
}
60+
4361
@Test(expected = IllegalArgumentException.class)
4462
public void testActionsIsZero() throws InterruptedException, IOException {
4563
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);

0 commit comments

Comments
 (0)