Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #365 - fix problem of connecting to the influx api with URL which… #400

Merged
merged 12 commits into from
Jun 5, 2018
38 changes: 29 additions & 9 deletions compile-and-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,44 @@ INFLUXDB_VERSION="${INFLUXDB_VERSION:-$DEFAULT_INFLUXDB_VERSION}"
MAVEN_JAVA_VERSION="${MAVEN_JAVA_VERSION:-$DEFAULT_MAVEN_JAVA_VERSION}"

echo "Run tests with maven:${MAVEN_JAVA_VERSION} on onfluxdb-${INFLUXDB_VERSION}"

docker kill influxdb || true
docker rm influxdb || true
docker pull influxdb:${version}-alpine || true
docker run \
--detach \
--name influxdb \
--publish 8086:8086 \
--publish 8089:8089/udp \
--detach \
--name influxdb \
--publish 8086:8086 \
--publish 8089:8089/udp \
--volume ${PWD}/influxdb.conf:/etc/influxdb/influxdb.conf \
influxdb:${INFLUXDB_VERSION}-alpine
influxdb:${INFLUXDB_VERSION}-alpine

echo Starting Nginx
docker kill nginx || true
docker rm nginx || true

echo ----- STARTING NGINX CONTAINER -----
docker run \
--detach \
--name nginx \
--publish 8080:8080 \
--publish 8080:8080/udp \
--volume ${PWD}/src/test/nginx/nginx.conf:/etc/nginx/nginx.conf:ro \
--link influxdb:influxdb \
nginx:stable nginx '-g' 'daemon off;'

PROXY_API_URL=http://nginx:8080/influx-api/
PROXY_UDP_PORT=8080

docker run -it --rm \
--volume $PWD:/usr/src/mymaven \
--volume $PWD/.m2:/root/.m2 \
--workdir /usr/src/mymaven \
--link=influxdb \
--env INFLUXDB_IP=influxdb \
maven:${MAVEN_JAVA_VERSION} mvn clean install
--link=influxdb \
--link=nginx \
--env INFLUXDB_IP=influxdb \
--env PROXY_API_URL=$PROXY_API_URL \
--env PROXY_UDP_PORT=$PROXY_UDP_PORT \
maven:${MAVEN_JAVA_VERSION} mvn clean install

docker kill influxdb || true
docker kill nginx || true
22 changes: 11 additions & 11 deletions src/main/java/org/influxdb/impl/InfluxDBService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ interface InfluxDBService {
public static final String EPOCH = "epoch";
public static final String CHUNK_SIZE = "chunk_size";

@GET("/ping")
@GET("ping")
public Call<ResponseBody> ping();

/**
Expand All @@ -38,49 +38,49 @@ interface InfluxDBService {
* @param consistency optional The write consistency level required for the write to succeed.
* Can be one of one, any, all, quorum. Defaults to all.
*/
@POST("/write")
@POST("write")
public Call<ResponseBody> writePoints(@Query(U) String username,
@Query(P) String password, @Query(DB) String database,
@Query(RP) String retentionPolicy, @Query(PRECISION) String precision,
@Query(CONSISTENCY) String consistency, @Body RequestBody batchPoints);

@GET("/query")
@GET("query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(EPOCH) String epoch, @Query(value = Q, encoded = true) String query);

@POST("/query")
@POST("query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(EPOCH) String epoch, @Query(value = Q, encoded = true) String query,
@Query(value = PARAMS, encoded = true) String params);

@GET("/query")
@GET("query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(value = Q, encoded = true) String query);

@POST("/query")
@POST("query")
public Call<QueryResult> postQuery(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(value = Q, encoded = true) String query);

@POST("/query")
@POST("query")
public Call<QueryResult> postQuery(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(value = Q, encoded = true) String query, @Query(value = PARAMS, encoded = true) String params);

@GET("/query")
@GET("query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password,
@Query(value = Q, encoded = true) String query);

@POST("/query")
@POST("query")
public Call<QueryResult> postQuery(@Query(U) String username,
@Query(P) String password, @Query(value = Q, encoded = true) String query);

@Streaming
@GET("/query?chunked=true")
@GET("query?chunked=true")
public Call<ResponseBody> query(@Query(U) String username,
@Query(P) String password, @Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@POST("/query?chunked=true")
@POST("query?chunked=true")
public Call<ResponseBody> query(@Query(U) String username,
@Query(P) String password, @Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
Expand Down
84 changes: 84 additions & 0 deletions src/test/java/org/influxdb/InfluxDBProxyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.influxdb;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
/**
* Test the InfluxDB API.
*
* @author hoan.le [at] bonitoo.io
*
*/
@RunWith(JUnitPlatform.class)
public class InfluxDBProxyTest {
private InfluxDB influxDB;
private static final String TEST_DB = "InfluxDBProxyTest_db";
private static final String UDP_DB = "udp";

@BeforeEach
public void setUp() throws InterruptedException, IOException {
influxDB = TestUtils.connectToInfluxDB(TestUtils.getProxyApiUrl());
}

/**
* delete database after all tests end.
*/
@AfterEach
public void cleanup(){
influxDB.close();
}

@Test
public void testWriteSomePointThroughTcpProxy() {
influxDB.createDatabase(TEST_DB);
influxDB.setDatabase(TEST_DB);

for(int i = 0; i < 20; i++) {
Point point = Point.measurement("weather")
.time(i,TimeUnit.HOURS)
.addField("temperature", (double) i)
.addField("humidity", (double) (i) * 1.1)
.addField("uv_index", "moderate").build();
influxDB.write(point);
}

QueryResult result = influxDB.query(new Query("select * from weather", TEST_DB));
//check points written already to DB
Assertions.assertEquals(20, result.getResults().get(0).getSeries().get(0).getValues().size());

influxDB.deleteDatabase(TEST_DB);
}

@Test
public void testWriteSomePointThroughUdpProxy() throws InterruptedException {
influxDB.createDatabase(UDP_DB);
influxDB.setDatabase(UDP_DB);

int proxyUdpPort = Integer.parseInt(TestUtils.getProxyUdpPort());
for(int i = 0; i < 20; i++) {
Point point = Point.measurement("weather")
.time(i,TimeUnit.HOURS)
.addField("temperature", (double) i)
.addField("humidity", (double) (i) * 1.1)
.addField("uv_index", "moderate").build();
influxDB.write(proxyUdpPort, point);
}

Thread.sleep(2000);
QueryResult result = influxDB.query(new Query("select * from weather", UDP_DB));
//check points written already to DB
Assertions.assertEquals(20, result.getResults().get(0).getSeries().get(0).getValues().size());

influxDB.deleteDatabase(UDP_DB);
}

}
90 changes: 0 additions & 90 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,42 +220,6 @@ public void testWrite() {
this.influxDB.deleteDatabase(dbName);
}

/**
* Test the implementation of {@link InfluxDB#write(int, Point)}'s sync support.
*/
@Test
public void testSyncWritePointThroughUDP() throws InterruptedException {
this.influxDB.disableBatch();
String measurement = TestUtils.getRandomMeasurement();
Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build();
this.influxDB.write(UDP_PORT, point);
Thread.sleep(2000);
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", UDP_DATABASE);
QueryResult result = this.influxDB.query(query);
Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
}

/**
* Test the implementation of {@link InfluxDB#write(int, Point)}'s async support.
*/
@Test
public void testAsyncWritePointThroughUDP() throws InterruptedException {
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS);
try{
Assertions.assertTrue(this.influxDB.isBatchEnabled());
String measurement = TestUtils.getRandomMeasurement();
Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build();
this.influxDB.write(UDP_PORT, point);
Thread.sleep(2000);
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", UDP_DATABASE);
QueryResult result = this.influxDB.query(query);
Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
}finally{
this.influxDB.disableBatch();
}
}


/**
* Test the implementation of {@link InfluxDB#write(int, Point)}'s async support.
*/
Expand Down Expand Up @@ -307,60 +271,6 @@ public void testWriteStringDataSimple() {
this.influxDB.deleteDatabase(dbName);
}

/**
* Test writing to the database using string protocol through UDP.
*/
@Test
public void testWriteStringDataThroughUDP() throws InterruptedException {
String measurement = TestUtils.getRandomMeasurement();
this.influxDB.write(UDP_PORT, measurement + ",atag=test idle=90,usertime=9,system=1");
//write with UDP may be executed on server after query with HTTP. so sleep 2s to handle this case
Thread.sleep(2000);
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", UDP_DATABASE);
QueryResult result = this.influxDB.query(query);
Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
}

/**
* Test writing multiple records to the database using string protocol through UDP.
*/
@Test
public void testWriteMultipleStringDataThroughUDP() throws InterruptedException {
String measurement = TestUtils.getRandomMeasurement();
this.influxDB.write(UDP_PORT, measurement + ",atag=test1 idle=100,usertime=10,system=1\n" +
measurement + ",atag=test2 idle=200,usertime=20,system=2\n" +
measurement + ",atag=test3 idle=300,usertime=30,system=3");
Thread.sleep(2000);
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", UDP_DATABASE);
QueryResult result = this.influxDB.query(query);

Assertions.assertEquals(3, result.getResults().get(0).getSeries().size());
Assertions.assertEquals("test1", result.getResults().get(0).getSeries().get(0).getTags().get("atag"));
Assertions.assertEquals("test2", result.getResults().get(0).getSeries().get(1).getTags().get("atag"));
Assertions.assertEquals("test3", result.getResults().get(0).getSeries().get(2).getTags().get("atag"));
}

/**
* Test writing multiple separate records to the database using string protocol through UDP.
*/
@Test
public void testWriteMultipleStringDataLinesThroughUDP() throws InterruptedException {
String measurement = TestUtils.getRandomMeasurement();
this.influxDB.write(UDP_PORT, Arrays.asList(
measurement + ",atag=test1 idle=100,usertime=10,system=1",
measurement + ",atag=test2 idle=200,usertime=20,system=2",
measurement + ",atag=test3 idle=300,usertime=30,system=3"
));
Thread.sleep(2000);
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", UDP_DATABASE);
QueryResult result = this.influxDB.query(query);

Assertions.assertEquals(3, result.getResults().get(0).getSeries().size());
Assertions.assertEquals("test1", result.getResults().get(0).getSeries().get(0).getTags().get("atag"));
Assertions.assertEquals("test2", result.getResults().get(0).getSeries().get(1).getTags().get("atag"));
Assertions.assertEquals("test3", result.getResults().get(0).getSeries().get(2).getTags().get("atag"));
}

/**
* When batch of points' size is over UDP limit, the expected exception
* is java.lang.RuntimeException: java.net.SocketException:
Expand Down
Loading