20
20
import org .msgpack .value .ValueType ;
21
21
22
22
/**
23
- * Traverse the MessagePack input stream and return Query Result objects .
23
+ * Traverse the MessagePack input stream and return Query Result object(s) .
24
24
*
25
25
* @author hoan.le [at] bonitoo.io
26
26
*
@@ -29,7 +29,16 @@ public class MessagePackTraverser {
29
29
30
30
private String lastStringNode ;
31
31
32
- public Iterable <QueryResult > traverse (final InputStream is ) throws IOException {
32
+ /**
33
+ * Traverse over the whole message pack stream.
34
+ * This method can be used for converting query results in chunk
35
+ *
36
+ * @param is
37
+ * The MessagePack format input stream
38
+ * @return an Iterable over the QueryResult objects
39
+
40
+ */
41
+ public Iterable <QueryResult > traverse (final InputStream is ) {
33
42
MessageUnpacker unpacker = MessagePack .newDefaultUnpacker (is );
34
43
35
44
return () -> {
@@ -45,23 +54,42 @@ public boolean hasNext() {
45
54
46
55
@ Override
47
56
public QueryResult next () {
48
- QueryResult queryResult = new QueryResult ();
49
- QueryResultModelPath queryResultPath = new QueryResultModelPath ();
50
- queryResultPath .add ("queryResult" , queryResult );
51
- try {
52
- traverse (unpacker , queryResultPath , 1 );
53
- } catch (IOException e ) {
54
- throw new InfluxDBException (e );
55
- }
56
- return queryResult ;
57
+ return parse (unpacker );
57
58
}
58
59
};
59
60
};
60
61
61
62
}
62
63
63
- void traverse (final MessageUnpacker unpacker , final QueryResultModelPath queryResultPath ,
64
- final int readAmount ) throws IOException {
64
+ /**
65
+ * Parse the message pack stream.
66
+ * This method can be used for converting query result from normal query response
67
+ * where exactly one QueryResult returned
68
+ *
69
+ * @param is
70
+ * The MessagePack format input stream
71
+ * @return QueryResult
72
+
73
+ */
74
+ public QueryResult parse (final InputStream is ) {
75
+ MessageUnpacker unpacker = MessagePack .newDefaultUnpacker (is );
76
+ return parse (unpacker );
77
+ }
78
+
79
+ private QueryResult parse (MessageUnpacker unpacker ) {
80
+ QueryResult queryResult = new QueryResult ();
81
+ QueryResultModelPath queryResultPath = new QueryResultModelPath ();
82
+ queryResultPath .add ("queryResult" , queryResult );
83
+ try {
84
+ traverse (unpacker , queryResultPath , 1 );
85
+ } catch (IOException e ) {
86
+ throw new InfluxDBException (e );
87
+ }
88
+ return queryResult ;
89
+ }
90
+
91
+ void traverse (final MessageUnpacker unpacker , final QueryResultModelPath queryResultPath , final int readAmount )
92
+ throws IOException {
65
93
int amount = 0 ;
66
94
67
95
while (unpacker .hasNext () && amount < readAmount ) {
0 commit comments