Skip to content

Commit 5d11e5f

Browse files
authored
Merge pull request #242 from jiafu1115/patch-23
support writing Point by async/sync through UDP.
2 parents 359a9b6 + 1ad2916 commit 5d11e5f

File tree

5 files changed

+87
-2
lines changed

5 files changed

+87
-2
lines changed

README.md

+11
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,19 @@ influxDB.deleteDatabase(dbName);
7474
```
7575
Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a gracefull application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()```
7676

77+
Gzip's support:
78+
7779
influxdb-java client doesn't enable gzip compress for http request body by default. If you want to enable gzip to reduce transfter data's size , you can call: ```influxDB.enableGzip()```
7880

81+
UDP's support:
82+
83+
influxdb-java client support udp protocol now. you can call followed methods directly to wirte through UDP.
84+
```
85+
public void write(final int udpPort, final String records);
86+
public void write(final int udpPort, final List<String> records);
87+
public void write(final int udpPort, final Point point);
88+
```
89+
7990
### Changes in 2.4
8091
influxdb-java now uses okhttp3 and retrofit2. As a result, you can now pass an ``OkHttpClient.Builder``
8192
to the ``InfluxDBFactory.connect`` if you wish to add more interceptors, etc, to OkHttp.

src/main/java/org/influxdb/InfluxDB.java

+10
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,16 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti
153153
*/
154154
public void write(final String database, final String retentionPolicy, final Point point);
155155

156+
/**
157+
* Write a single Point to the database through UDP.
158+
*
159+
* @param udpPort
160+
* the udpPort to write to.
161+
* @param point
162+
* The point to write.
163+
*/
164+
public void write(final int udpPort, final Point point);
165+
156166
/**
157167
* Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol.
158168
*

src/main/java/org/influxdb/impl/BatchProcessor.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.ArrayList;
44
import java.util.List;
55
import java.util.Map;
6+
import java.util.Map.Entry;
67
import java.util.concurrent.BlockingQueue;
78
import java.util.concurrent.Executors;
89
import java.util.concurrent.LinkedBlockingQueue;
@@ -187,10 +188,12 @@ void write() {
187188
}
188189

189190
Map<String, BatchPoints> databaseToBatchPoints = Maps.newHashMap();
191+
Map<Integer, List<String>> udpPortToBatchPoints = Maps.newHashMap();
190192
List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());
191193
this.queue.drainTo(batchEntries);
192194

193195
for (AbstractBatchEntry batchEntry : batchEntries) {
196+
Point point = batchEntry.getPoint();
194197
if (batchEntry instanceof HttpBatchEntry) {
195198
HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry);
196199
String dbName = httpBatchEntry.getDb();
@@ -199,15 +202,24 @@ void write() {
199202
.retentionPolicy(httpBatchEntry.getRp()).build();
200203
databaseToBatchPoints.put(dbName, batchPoints);
201204
}
202-
Point point = batchEntry.getPoint();
203205
databaseToBatchPoints.get(dbName).point(point);
206+
} else if (batchEntry instanceof UdpBatchEntry) {
207+
UdpBatchEntry udpBatchEntry = UdpBatchEntry.class.cast(batchEntry);
208+
int udpPort = udpBatchEntry.getUdpPort();
209+
if (!udpPortToBatchPoints.containsKey(udpPort)) {
210+
List<String> batchPoints = new ArrayList<String>();
211+
udpPortToBatchPoints.put(udpPort, batchPoints);
212+
}
213+
udpPortToBatchPoints.get(udpPort).add(point.lineProtocol());
204214
}
205-
206215
}
207216

208217
for (BatchPoints batchPoints : databaseToBatchPoints.values()) {
209218
BatchProcessor.this.influxDB.write(batchPoints);
210219
}
220+
for (Entry<Integer, List<String>> entry : udpPortToBatchPoints.entrySet()) {
221+
BatchProcessor.this.influxDB.write(entry.getKey(), entry.getValue());
222+
}
211223
} catch (Throwable t) {
212224
// any exception wouldn't stop the scheduler
213225
LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t);

src/main/java/org/influxdb/impl/InfluxDBImpl.java

+17
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.influxdb.dto.Query;
1414
import org.influxdb.dto.QueryResult;
1515
import org.influxdb.impl.BatchProcessor.HttpBatchEntry;
16+
import org.influxdb.impl.BatchProcessor.UdpBatchEntry;
17+
1618
import okhttp3.Headers;
1719
import okhttp3.HttpUrl;
1820
import okhttp3.MediaType;
@@ -220,6 +222,21 @@ public void write(final String database, final String retentionPolicy, final Poi
220222
this.writeCount.incrementAndGet();
221223
}
222224

225+
/**
226+
* {@inheritDoc}
227+
*/
228+
@Override
229+
public void write(final int udpPort, final Point point) {
230+
if (this.batchEnabled.get()) {
231+
UdpBatchEntry batchEntry = new UdpBatchEntry(point, udpPort);
232+
this.batchProcessor.put(batchEntry);
233+
} else {
234+
this.write(udpPort, point.lineProtocol());
235+
this.unBatchedCount.incrementAndGet();
236+
}
237+
this.writeCount.incrementAndGet();
238+
}
239+
223240
@Override
224241
public void write(final BatchPoints batchPoints) {
225242
this.batchedCount.addAndGet(batchPoints.getPoints().size());

src/test/java/org/influxdb/InfluxDBTest.java

+35
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,41 @@ public void testWrite() {
147147
Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
148148
this.influxDB.deleteDatabase(dbName);
149149
}
150+
151+
/**
152+
* Test the implementation of {@link InfluxDB#write(int, Point)}'s sync support.
153+
*/
154+
@Test
155+
public void testSyncWritePointThroughUDP() {
156+
this.influxDB.disableBatch();
157+
String measurement = TestUtils.getRandomMeasurement();
158+
Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build();
159+
this.influxDB.write(UDP_PORT, point);
160+
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
161+
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", UDP_DATABASE);
162+
QueryResult result = this.influxDB.query(query);
163+
Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
164+
}
165+
166+
/**
167+
* Test the implementation of {@link InfluxDB#write(int, Point)}'s async support.
168+
*/
169+
@Test
170+
public void testAsyncWritePointThroughUDP() {
171+
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS);
172+
try{
173+
Assert.assertTrue(this.influxDB.isBatchEnabled());
174+
String measurement = TestUtils.getRandomMeasurement();
175+
Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build();
176+
this.influxDB.write(UDP_PORT, point);
177+
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
178+
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", UDP_DATABASE);
179+
QueryResult result = this.influxDB.query(query);
180+
Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
181+
}finally{
182+
this.influxDB.disableBatch();
183+
}
184+
}
150185

151186
/**
152187
* Test writing to the database using string protocol.

0 commit comments

Comments
 (0)