Skip to content
This repository was archived by the owner on Feb 12, 2023. It is now read-only.

Commit 23b24f7

Browse files
committed
Implement Issue influxdata#389 : Support for MessagePack
1 parent f8bd921 commit 23b24f7

14 files changed

+1037
-113
lines changed

compile-and-test.sh

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ docker run -it --rm \
4545
--workdir /usr/src/mymaven \
4646
--link=influxdb \
4747
--link=nginx \
48+
--env INFLUXDB_VERSION=${INFLUXDB_VERSION} \
4849
--env INFLUXDB_IP=influxdb \
4950
--env PROXY_API_URL=${PROXY_API_URL} \
5051
--env PROXY_UDP_PORT=${PROXY_UDP_PORT} \

pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,11 @@
255255
<artifactId>converter-moshi</artifactId>
256256
<version>2.4.0</version>
257257
</dependency>
258+
<dependency>
259+
<groupId>org.msgpack</groupId>
260+
<artifactId>msgpack-core</artifactId>
261+
<version>0.8.16</version>
262+
</dependency>
258263
<!-- If we use okhttp instead of java urlconnection we achieve server failover
259264
of the influxdb server address resolves to all influxdb server ips. -->
260265
<dependency>

src/main/java/org/influxdb/InfluxDB.java

+9
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ public String value() {
9494
}
9595
}
9696

97+
/**
98+
* Format of HTTP Response body from InfluxDB server.
99+
*/
100+
public enum ResponseFormat {
101+
/** application/json format. */
102+
JSON,
103+
/** application/x-msgpack format. */
104+
MSGPACK
105+
}
97106
/**
98107
* Set the loglevel which is used for REST related actions.
99108
*

src/main/java/org/influxdb/InfluxDBFactory.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.influxdb;
22

3+
import org.influxdb.InfluxDB.ResponseFormat;
34
import org.influxdb.impl.InfluxDBImpl;
45

56
import okhttp3.OkHttpClient;
@@ -78,9 +79,30 @@ public static InfluxDB connect(final String url, final OkHttpClient.Builder clie
7879
*/
7980
public static InfluxDB connect(final String url, final String username, final String password,
8081
final OkHttpClient.Builder client) {
82+
return connect(url, username, password, client, ResponseFormat.JSON);
83+
}
84+
85+
/**
86+
* Create a connection to a InfluxDB.
87+
*
88+
* @param url
89+
* the url to connect to.
90+
* @param username
91+
* the username which is used to authorize against the influxDB instance.
92+
* @param password
93+
* the password for the username which is used to authorize against the influxDB
94+
* instance.
95+
* @param client
96+
* the HTTP client to use
97+
* @param responseFormat
98+
* The {@code ResponseFormat} to use for response from InfluxDB server
99+
* @return a InfluxDB adapter suitable to access a InfluxDB.
100+
*/
101+
public static InfluxDB connect(final String url, final String username, final String password,
102+
final OkHttpClient.Builder client, final ResponseFormat responseFormat) {
81103
Preconditions.checkNonEmptyString(url, "url");
82104
Preconditions.checkNonEmptyString(username, "username");
83105
Objects.requireNonNull(client, "client");
84-
return new InfluxDBImpl(url, username, password, client);
106+
return new InfluxDBImpl(url, username, password, client, responseFormat);
85107
}
86108
}

src/main/java/org/influxdb/impl/InfluxDBImpl.java

+146-57
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import okhttp3.HttpUrl;
88
import okhttp3.MediaType;
99
import okhttp3.OkHttpClient;
10+
import okhttp3.Request;
1011
import okhttp3.RequestBody;
1112
import okhttp3.ResponseBody;
1213
import okhttp3.logging.HttpLoggingInterceptor;
@@ -25,14 +26,19 @@
2526
import org.influxdb.dto.QueryResult;
2627
import org.influxdb.impl.BatchProcessor.HttpBatchEntry;
2728
import org.influxdb.impl.BatchProcessor.UdpBatchEntry;
29+
import org.influxdb.msgpack.MessagePackConverterFactory;
30+
import org.influxdb.msgpack.MessagePackTraverser;
31+
2832
import retrofit2.Call;
2933
import retrofit2.Callback;
34+
import retrofit2.Converter.Factory;
3035
import retrofit2.Response;
3136
import retrofit2.Retrofit;
3237
import retrofit2.converter.moshi.MoshiConverterFactory;
3338

3439
import java.io.EOFException;
3540
import java.io.IOException;
41+
import java.io.InputStream;
3642
import java.net.DatagramPacket;
3743
import java.net.DatagramSocket;
3844
import java.net.InetAddress;
@@ -56,6 +62,8 @@
5662
*/
5763
public class InfluxDBImpl implements InfluxDB {
5864

65+
private static final String APPLICATION_MSGPACK = "application/x-msgpack";
66+
5967
static final okhttp3.MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");
6068

6169
private static final String SHOW_DATABASE_COMMAND_ENCODED = Query.encode("SHOW DATABASES");
@@ -82,15 +90,28 @@ public class InfluxDBImpl implements InfluxDB {
8290
private final HttpLoggingInterceptor loggingInterceptor;
8391
private final GzipRequestInterceptor gzipRequestInterceptor;
8492
private LogLevel logLevel = LogLevel.NONE;
85-
private JsonAdapter<QueryResult> adapter;
8693
private String database;
8794
private String retentionPolicy = "autogen";
8895
private ConsistencyLevel consistency = ConsistencyLevel.ONE;
96+
private final ChunkProccesor chunkProccesor;
8997

90-
public InfluxDBImpl(final String url, final String username, final String password,
91-
final OkHttpClient.Builder client) {
92-
super();
93-
Moshi moshi = new Moshi.Builder().build();
98+
/**
99+
* Constructs a new {@code InfluxDBImpl}.
100+
*
101+
* @param url
102+
* The InfluxDB server API URL
103+
* @param username
104+
* The InfluxDB user name
105+
* @param password
106+
* The InfluxDB user password
107+
* @param client
108+
* The OkHttp Client Builder
109+
* @param responseFormat
110+
* The {@code ResponseFormat} to use for response from InfluxDB
111+
* server
112+
*/
113+
public InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
114+
final ResponseFormat responseFormat) {
94115
this.hostAddress = parseHostAddress(url);
95116
this.username = username;
96117
this.password = password;
@@ -99,38 +120,72 @@ public InfluxDBImpl(final String url, final String username, final String passwo
99120
setLogLevel(LOG_LEVEL);
100121

101122
this.gzipRequestInterceptor = new GzipRequestInterceptor();
102-
this.retrofit = new Retrofit.Builder()
103-
.baseUrl(url)
104-
.client(client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor).build())
105-
.addConverterFactory(MoshiConverterFactory.create())
123+
client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor);
124+
125+
Factory converterFactory = null;
126+
switch (responseFormat) {
127+
case MSGPACK:
128+
client.addInterceptor(chain -> {
129+
Request request = chain.request().newBuilder().addHeader("Accept", APPLICATION_MSGPACK)
130+
.addHeader("Accept-Encoding", "identity").build();
131+
return chain.proceed(request);
132+
});
133+
134+
converterFactory = MessagePackConverterFactory.create();
135+
chunkProccesor = new MessagePackChunkProccesor();
136+
break;
137+
case JSON:
138+
default:
139+
converterFactory = MoshiConverterFactory.create();
140+
141+
Moshi moshi = new Moshi.Builder().build();
142+
JsonAdapter<QueryResult> adapter = moshi.adapter(QueryResult.class);
143+
chunkProccesor = new JSONChunkProccesor(adapter);
144+
break;
145+
}
146+
147+
this.retrofit = new Retrofit.Builder().baseUrl(url).client(client.build()).addConverterFactory(converterFactory)
106148
.build();
107149
this.influxDBService = this.retrofit.create(InfluxDBService.class);
108-
this.adapter = moshi.adapter(QueryResult.class);
109-
}
110-
111-
InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
112-
final InfluxDBService influxDBService, final JsonAdapter<QueryResult> adapter) {
113-
super();
114-
this.hostAddress = parseHostAddress(url);
115-
this.username = username;
116-
this.password = password;
117-
118-
this.loggingInterceptor = new HttpLoggingInterceptor();
119-
setLogLevel(LOG_LEVEL);
120-
121-
this.gzipRequestInterceptor = new GzipRequestInterceptor();
122-
this.retrofit = new Retrofit.Builder()
123-
.baseUrl(url)
124-
.client(client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor).build())
125-
.addConverterFactory(MoshiConverterFactory.create())
126-
.build();
127-
this.influxDBService = influxDBService;
128-
this.adapter = adapter;
150+
151+
if (ResponseFormat.MSGPACK.equals(responseFormat)) {
152+
String[] versionNumbers = version().split("\\.");
153+
final int major = Integer.parseInt(versionNumbers[0]);
154+
final int minor = Integer.parseInt(versionNumbers[1]);
155+
final int fromMinor = 4;
156+
if ((major < 2) && ((major != 1) || (minor < fromMinor))) {
157+
throw new InfluxDBException("MessagePack format is only supported from InfluxDB version 1.4 and later");
158+
}
129159
}
160+
}
130161

131162
public InfluxDBImpl(final String url, final String username, final String password,
132-
final OkHttpClient.Builder client, final String database,
133-
final String retentionPolicy, final ConsistencyLevel consistency) {
163+
final OkHttpClient.Builder client) {
164+
this(url, username, password, client, ResponseFormat.JSON);
165+
166+
}
167+
168+
InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
169+
final InfluxDBService influxDBService, final JsonAdapter<QueryResult> adapter) {
170+
super();
171+
this.hostAddress = parseHostAddress(url);
172+
this.username = username;
173+
this.password = password;
174+
175+
this.loggingInterceptor = new HttpLoggingInterceptor();
176+
setLogLevel(LOG_LEVEL);
177+
178+
this.gzipRequestInterceptor = new GzipRequestInterceptor();
179+
this.retrofit = new Retrofit.Builder().baseUrl(url)
180+
.client(client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor).build())
181+
.addConverterFactory(MoshiConverterFactory.create()).build();
182+
this.influxDBService = influxDBService;
183+
184+
chunkProccesor = new JSONChunkProccesor(adapter);
185+
}
186+
187+
public InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
188+
final String database, final String retentionPolicy, final ConsistencyLevel consistency) {
134189
this(url, username, password, client);
135190

136191
setConsistency(consistency);
@@ -493,32 +548,26 @@ public void query(final Query query, final int chunkSize, final Consumer<QueryRe
493548
query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize);
494549
}
495550

496-
call.enqueue(new Callback<ResponseBody>() {
497-
@Override
498-
public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {
499-
try {
500-
if (response.isSuccessful()) {
501-
BufferedSource source = response.body().source();
502-
while (true) {
503-
QueryResult result = InfluxDBImpl.this.adapter.fromJson(source);
504-
if (result != null) {
505-
consumer.accept(result);
506-
}
507-
}
508-
}
509-
try (ResponseBody errorBody = response.errorBody()) {
510-
throw new InfluxDBException(errorBody.string());
511-
}
512-
} catch (EOFException e) {
513-
QueryResult queryResult = new QueryResult();
514-
queryResult.setError("DONE");
515-
consumer.accept(queryResult);
516-
} catch (IOException e) {
517-
QueryResult queryResult = new QueryResult();
518-
queryResult.setError(e.toString());
519-
consumer.accept(queryResult);
520-
}
551+
call.enqueue(new Callback<ResponseBody>() {
552+
@Override
553+
public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {
554+
try {
555+
if (response.isSuccessful()) {
556+
ResponseBody chunkedBody = response.body();
557+
chunkProccesor.process(chunkedBody, consumer);
558+
} else {
559+
// REVIEW: must be handled consistently with IOException.
560+
ResponseBody errorBody = response.errorBody();
561+
if (errorBody != null) {
562+
throw new InfluxDBException(errorBody.string());
521563
}
564+
}
565+
} catch (IOException e) {
566+
QueryResult queryResult = new QueryResult();
567+
queryResult.setError(e.toString());
568+
consumer.accept(queryResult);
569+
}
570+
}
522571

523572
@Override
524573
public void onFailure(final Call<ResponseBody> call, final Throwable t) {
@@ -752,4 +801,44 @@ public void dropRetentionPolicy(final String rpName, final String database) {
752801
Query.encode(queryBuilder.toString())));
753802
}
754803

804+
private interface ChunkProccesor {
805+
void process(ResponseBody chunkedBody, Consumer<QueryResult> consumer) throws IOException;
806+
}
807+
808+
private class MessagePackChunkProccesor implements ChunkProccesor {
809+
@Override
810+
public void process(final ResponseBody chunkedBody, final Consumer<QueryResult> consumer) throws IOException {
811+
MessagePackTraverser traverser = new MessagePackTraverser();
812+
try (InputStream is = chunkedBody.byteStream()) {
813+
for (QueryResult result : traverser.traverse(is)) {
814+
consumer.accept(result);
815+
}
816+
}
817+
}
818+
}
819+
820+
private class JSONChunkProccesor implements ChunkProccesor {
821+
private JsonAdapter<QueryResult> adapter;
822+
823+
public JSONChunkProccesor(final JsonAdapter<QueryResult> adapter) {
824+
this.adapter = adapter;
825+
}
826+
827+
@Override
828+
public void process(final ResponseBody chunkedBody, final Consumer<QueryResult> consumer) throws IOException {
829+
try {
830+
BufferedSource source = chunkedBody.source();
831+
while (true) {
832+
QueryResult result = adapter.fromJson(source);
833+
if (result != null) {
834+
consumer.accept(result);
835+
}
836+
}
837+
} catch (EOFException e) {
838+
QueryResult queryResult = new QueryResult();
839+
queryResult.setError("DONE");
840+
consumer.accept(queryResult);
841+
}
842+
}
843+
}
755844
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.influxdb.msgpack;
2+
3+
import java.lang.annotation.Annotation;
4+
import java.lang.reflect.Type;
5+
6+
import okhttp3.ResponseBody;
7+
import retrofit2.Converter;
8+
import retrofit2.Retrofit;
9+
10+
/**
11+
* A Retrofit Convertor Factory for MessagePack response.
12+
*
13+
* @author hoan.le [at] bonitoo.io
14+
*
15+
*/
16+
public class MessagePackConverterFactory extends Converter.Factory {
17+
public static MessagePackConverterFactory create() {
18+
return new MessagePackConverterFactory();
19+
}
20+
21+
@Override
22+
public Converter<ResponseBody, ?> responseBodyConverter(final Type type, final Annotation[] annotations,
23+
final Retrofit retrofit) {
24+
return new MessagePackResponseBodyConverter();
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.influxdb.msgpack;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
6+
import org.influxdb.dto.QueryResult;
7+
import okhttp3.ResponseBody;
8+
import retrofit2.Converter;
9+
10+
/**
11+
* Test the InfluxDB API over MessagePack format.
12+
*
13+
* @author hoan.le [at] bonitoo.io
14+
*
15+
*/
16+
public class MessagePackResponseBodyConverter implements Converter<ResponseBody, QueryResult> {
17+
18+
@Override
19+
public QueryResult convert(final ResponseBody value) throws IOException {
20+
try (InputStream is = value.byteStream()) {
21+
MessagePackTraverser traverser = new MessagePackTraverser();
22+
for (QueryResult queryResult : traverser.traverse(is)) {
23+
return queryResult;
24+
}
25+
return null;
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)