Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Issue #389 : Support for MessagePack #462

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
- Support dynamic measurement name in InfluxDBResultMapper [PR #423](https://github.com/influxdata/influxdb-java/pull/423)
- Debug mode which allows HTTP requests being sent to the database to be logged [PR #450](https://github.com/influxdata/influxdb-java/pull/450)
- Fix problem of connecting to the influx api with URL which does not points to the url root (e.g. localhots:80/influx-api/) [PR #400] (https://github.com/influxdata/influxdb-java/pull/400)
- Support for MessagePack [PR #462] (https://github.com/influxdata/influxdb-java/pull/462)
- This PR basically improves the query performance by approximately 20%

## 2.10 [2018-04-26]

Expand Down
1 change: 1 addition & 0 deletions compile-and-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ docker run -it --rm \
--workdir /usr/src/mymaven \
--link=influxdb \
--link=nginx \
--env INFLUXDB_VERSION=${INFLUXDB_VERSION} \
--env INFLUXDB_IP=influxdb \
--env PROXY_API_URL=$PROXY_API_URL \
--env PROXY_UDP_PORT=$PROXY_UDP_PORT \
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@
<artifactId>converter-moshi</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.komamitsu</groupId>
<artifactId>retrofit-converter-msgpack</artifactId>
<version>1.0.0</version>
</dependency>
<!-- If we use okhttp instead of java urlconnection we achieve server failover
of the influxdb server address resolves to all influxdb server ips. -->
<dependency>
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ public String value() {
}
}

/**
* Format of HTTP Response body from InfluxDB server.
*/
public enum ResponseFormat {
/** application/json format. */
JSON,
/** application/x-msgpack format. */
MSGPACK
}
/**
* Set the loglevel which is used for REST related actions.
*
Expand Down
24 changes: 23 additions & 1 deletion src/main/java/org/influxdb/InfluxDBFactory.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.influxdb;

import org.influxdb.InfluxDB.ResponseFormat;
import org.influxdb.impl.InfluxDBImpl;

import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -78,9 +79,30 @@ public static InfluxDB connect(final String url, final OkHttpClient.Builder clie
*/
public static InfluxDB connect(final String url, final String username, final String password,
final OkHttpClient.Builder client) {
return connect(url, username, password, client, ResponseFormat.JSON);
}

/**
* Create a connection to a InfluxDB.
*
* @param url
* the url to connect to.
* @param username
* the username which is used to authorize against the influxDB instance.
* @param password
* the password for the username which is used to authorize against the influxDB
* instance.
* @param client
* the HTTP client to use
* @param responseFormat
* The {@code ResponseFormat} to use for response from InfluxDB server
* @return a InfluxDB adapter suitable to access a InfluxDB.
*/
public static InfluxDB connect(final String url, final String username, final String password,
final OkHttpClient.Builder client, final ResponseFormat responseFormat) {
Preconditions.checkNonEmptyString(url, "url");
Preconditions.checkNonEmptyString(username, "username");
Objects.requireNonNull(client, "client");
return new InfluxDBImpl(url, username, password, client);
return new InfluxDBImpl(url, username, password, client, responseFormat);
}
}
208 changes: 166 additions & 42 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package org.influxdb.impl;


import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;

import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okhttp3.logging.HttpLoggingInterceptor;
Expand All @@ -25,8 +32,13 @@
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.BatchProcessor.HttpBatchEntry;
import org.influxdb.impl.BatchProcessor.UdpBatchEntry;
import org.komamitsu.retrofit.converter.msgpack.MessagePackConverterFactory;
import org.msgpack.jackson.dataformat.ExtensionTypeCustomDeserializers;
import org.msgpack.jackson.dataformat.MessagePackFactory;

import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Converter.Factory;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.converter.moshi.MoshiConverterFactory;
Expand All @@ -38,6 +50,7 @@
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -56,6 +69,8 @@
*/
public class InfluxDBImpl implements InfluxDB {

private static final String APPLICATION_MSGPACK = "application/x-msgpack";

static final okhttp3.MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");

private static final String SHOW_DATABASE_COMMAND_ENCODED = Query.encode("SHOW DATABASES");
Expand All @@ -66,6 +81,9 @@ public class InfluxDBImpl implements InfluxDB {
*
* @see org.influxdb.impl.LOG_LEVEL_PROPERTY
*/

private static final ObjectMapper QUERY_RESULTS_OBJECT_MAPPER;
private static final ObjectReader QUERY_RESULTS_OBJECT_READER;
private static final LogLevel LOG_LEVEL = LogLevel.parseLogLevel(System.getProperty(LOG_LEVEL_PROPERTY));

private final InetAddress hostAddress;
Expand All @@ -82,15 +100,45 @@ public class InfluxDBImpl implements InfluxDB {
private final HttpLoggingInterceptor loggingInterceptor;
private final GzipRequestInterceptor gzipRequestInterceptor;
private LogLevel logLevel = LogLevel.NONE;
private JsonAdapter<QueryResult> adapter;
private String database;
private String retentionPolicy = "autogen";
private ConsistencyLevel consistency = ConsistencyLevel.ONE;
private final ChunkProccesor chunkProccesor;


static {
MessagePackFactory messagePackFactory = new MessagePackFactory();
ExtensionTypeCustomDeserializers extTypeCustomDeserializers = new ExtensionTypeCustomDeserializers();
final byte msgPackTimeExtType = (byte) 5;
final int timeOffset = 0;
final int timeByteArrayLength = 8;
extTypeCustomDeserializers.addCustomDeser(msgPackTimeExtType, data -> {
return ByteBuffer.wrap(data, timeOffset, timeByteArrayLength).getLong();
});
messagePackFactory.setExtTypeCustomDesers(extTypeCustomDeserializers);

QUERY_RESULTS_OBJECT_MAPPER = new ObjectMapper(messagePackFactory);
QUERY_RESULTS_OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

JavaType javaType = QUERY_RESULTS_OBJECT_MAPPER.getTypeFactory().constructType(QueryResult.class);
QUERY_RESULTS_OBJECT_READER = QUERY_RESULTS_OBJECT_MAPPER.readerFor(javaType);
}

/**
* Constructs a new {@code InfluxDBImpl}.
* @param url
* The InfluxDB server API URL
* @param username
* The InfluxDB user name
* @param password
* The InfluxDB user password
* @param client
* The OkHttp Client Builder
* @param responseFormat
* The {@code ResponseFormat} to use for response from InfluxDB server
*/
public InfluxDBImpl(final String url, final String username, final String password,
final OkHttpClient.Builder client) {
super();
Moshi moshi = new Moshi.Builder().build();
final OkHttpClient.Builder client, final ResponseFormat responseFormat) {
this.hostAddress = parseHostAddress(url);
this.username = username;
this.password = password;
Expand All @@ -99,34 +147,71 @@ public InfluxDBImpl(final String url, final String username, final String passwo
setLogLevel(LOG_LEVEL);

this.gzipRequestInterceptor = new GzipRequestInterceptor();
client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor);

Factory converterFactory = null;
switch (responseFormat) {
case MSGPACK:
client.addInterceptor(chain -> {
Request request = chain.request().newBuilder().addHeader("Accept", APPLICATION_MSGPACK).build();
return chain.proceed(request);
});

converterFactory = MessagePackConverterFactory.create(QUERY_RESULTS_OBJECT_MAPPER);
chunkProccesor = new MessagePackChunkProccesor(QUERY_RESULTS_OBJECT_READER);
break;
case JSON:
default:
converterFactory = MoshiConverterFactory.create();

Moshi moshi = new Moshi.Builder().build();
JsonAdapter<QueryResult> adapter = moshi.adapter(QueryResult.class);
chunkProccesor = new JSONChunkProccesor(adapter);
break;
}

this.retrofit = new Retrofit.Builder()
.baseUrl(url)
.client(client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor).build())
.addConverterFactory(MoshiConverterFactory.create())
.client(client.build())
.addConverterFactory(converterFactory)
.build();
this.influxDBService = this.retrofit.create(InfluxDBService.class);
this.adapter = moshi.adapter(QueryResult.class);
}

InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
final InfluxDBService influxDBService, final JsonAdapter<QueryResult> adapter) {
super();
this.hostAddress = parseHostAddress(url);
this.username = username;
this.password = password;

this.loggingInterceptor = new HttpLoggingInterceptor();
setLogLevel(LOG_LEVEL);

this.gzipRequestInterceptor = new GzipRequestInterceptor();
this.retrofit = new Retrofit.Builder()
.baseUrl(url)
.client(client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor).build())
.addConverterFactory(MoshiConverterFactory.create())
.build();
this.influxDBService = influxDBService;
this.adapter = adapter;

if (ResponseFormat.MSGPACK.equals(responseFormat)) {
String[] versionNumbers = version().split("\\.");
final int major = Integer.parseInt(versionNumbers[0]);
final int minor = Integer.parseInt(versionNumbers[1]);
final int fromMinor = 4;
if ((major < 2) && ((major != 1) || (minor < fromMinor))) {
throw new InfluxDBException("MessagePack format is only supported from InfluxDB version 1.4 and later");
}
}
}

public InfluxDBImpl(final String url, final String username, final String password,
final OkHttpClient.Builder client) {
this(url, username, password, client, ResponseFormat.JSON);

}

InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
final InfluxDBService influxDBService, final JsonAdapter<QueryResult> adapter) {
super();
this.hostAddress = parseHostAddress(url);
this.username = username;
this.password = password;

this.loggingInterceptor = new HttpLoggingInterceptor();
setLogLevel(LOG_LEVEL);

this.gzipRequestInterceptor = new GzipRequestInterceptor();
this.retrofit = new Retrofit.Builder().baseUrl(url)
.client(client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor).build())
.addConverterFactory(MoshiConverterFactory.create()).build();
this.influxDBService = influxDBService;

chunkProccesor = new JSONChunkProccesor(adapter);
}

public InfluxDBImpl(final String url, final String username, final String password,
final OkHttpClient.Builder client, final String database,
Expand Down Expand Up @@ -498,28 +583,21 @@ public void query(final Query query, final int chunkSize, final Consumer<QueryRe
public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {
try {
if (response.isSuccessful()) {
BufferedSource source = response.body().source();
while (true) {
QueryResult result = InfluxDBImpl.this.adapter.fromJson(source);
if (result != null) {
consumer.accept(result);
}
}
}
try (ResponseBody errorBody = response.errorBody()) {
throw new InfluxDBException(errorBody.string());
ResponseBody chunkedBody = response.body();
chunkProccesor.process(chunkedBody, consumer);
} else {
//REVIEW: must be handled consistently with IOException.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm not wrong, IOException will be thrown by Moshi when the stream consumer arrived at the end of the stream and it's not a real error (like errorBody != null).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's actually EOFException on reaching end of stream (handled in JSONChunkProccesor.process())

ResponseBody errorBody = response.errorBody();
if (errorBody != null) {
throw new InfluxDBException(errorBody.string());
}
}
} catch (EOFException e) {
QueryResult queryResult = new QueryResult();
queryResult.setError("DONE");
consumer.accept(queryResult);
} catch (IOException e) {
QueryResult queryResult = new QueryResult();
queryResult.setError(e.toString());
consumer.accept(queryResult);
}
}

@Override
public void onFailure(final Call<ResponseBody> call, final Throwable t) {
throw new InfluxDBException(t);
Expand Down Expand Up @@ -752,4 +830,50 @@ public void dropRetentionPolicy(final String rpName, final String database) {
Query.encode(queryBuilder.toString())));
}

private interface ChunkProccesor {
void process(ResponseBody chunkedBody, Consumer<QueryResult> consumer) throws IOException;
}

private class MessagePackChunkProccesor implements ChunkProccesor {

private ObjectReader objectReader;
public MessagePackChunkProccesor(final ObjectReader objectReader) {
this.objectReader = objectReader;
}

@Override
public void process(final ResponseBody chunkedBody, final Consumer<QueryResult> consumer) throws IOException {
BufferedSource source = chunkedBody.source();
QueryResult result = null;
MappingIterator<QueryResult> results = objectReader.readValues(source.inputStream());
while (results.hasNext()) {
result = results.nextValue();
consumer.accept(result);
}
}
}

private class JSONChunkProccesor implements ChunkProccesor {
private JsonAdapter<QueryResult> adapter;
public JSONChunkProccesor(final JsonAdapter<QueryResult> adapter) {
this.adapter = adapter;
}

@Override
public void process(final ResponseBody chunkedBody, final Consumer<QueryResult> consumer) throws IOException {
try {
BufferedSource source = chunkedBody.source();
while (true) {
QueryResult result = adapter.fromJson(source);
if (result != null) {
consumer.accept(result);
}
}
} catch (EOFException e) {
QueryResult queryResult = new QueryResult();
queryResult.setError("DONE");
consumer.accept(queryResult);
}
}
}
}
Loading