Skip to content

Commit ddcb1a9

Browse files
committed
[FLINK-32097][Connectors/Kinesis] Added tests for RecordBatch regarding deaggregation.
Aggregated records are needed to test that records are deaggregated correctly. There is a dependency that can create aggregated records, but no version compatible with KCL 3.x is published to the Maven repository, so this change uses a compatible GitHub release instead (resolved through JitPack). In addition, the protobuf version pinned in the dependency management section was not compatible with KCL 3.x, so that pin is removed.
1 parent 767f576 commit ddcb1a9

File tree

3 files changed

+118
-5
lines changed

3 files changed

+118
-5
lines changed

flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml

+17
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ under the License.
3333
<name>Flink : Connectors : AWS : Amazon Kinesis Data Streams Connector v2</name>
3434
<packaging>jar</packaging>
3535

36+
<repositories>
37+
<!-- Required to resolve the Kinesis aggregator test dependency, which is not available in Maven Central -->
38+
<repository>
39+
<id>jitpack.io</id>
40+
<url>https://jitpack.io</url>
41+
</repository>
42+
</repositories>
43+
3644
<dependencies>
3745
<dependency>
3846
<groupId>org.apache.flink</groupId>
@@ -107,6 +115,15 @@ under the License.
107115
<scope>test</scope>
108116
</dependency>
109117

118+
<dependency>
119+
<!-- Kinesis aggregator dependency, resolved from JitPack because it is not available in Maven Central -->
120+
<!-- For background, see https://github.com/awslabs/kinesis-aggregation/issues/120 -->
121+
<groupId>com.github.awslabs.kinesis-aggregation</groupId>
122+
<artifactId>amazon-kinesis-aggregator</artifactId>
123+
<version>2.0.3</version>
124+
<scope>test</scope>
125+
</dependency>
126+
110127
<dependency>
111128
<groupId>nl.jqno.equalsverifier</groupId>
112129
<artifactId>equalsverifier</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.kinesis.source.reader;
20+
21+
import com.amazonaws.kinesis.agg.AggRecord;
22+
import com.amazonaws.kinesis.agg.RecordAggregator;
23+
import org.junit.jupiter.api.Test;
24+
import software.amazon.awssdk.core.SdkBytes;
25+
import software.amazon.awssdk.services.kinesis.model.Record;
26+
27+
import java.time.Instant;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.stream.Collectors;
31+
import java.util.stream.Stream;
32+
33+
import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestRecord;
34+
import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
37+
class RecordBatchTest {
38+
39+
@Test
40+
public void testDeaggregateRecordsPassThrough() {
41+
List<Record> records =
42+
Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
43+
.collect(Collectors.toList());
44+
45+
RecordBatch result =
46+
new RecordBatch(records, getTestSplit(), 100L, true);
47+
48+
assertThat(result.getDeaggregatedRecords().size()).isEqualTo(3);
49+
}
50+
51+
@Test
52+
public void testDeaggregateRecordsWithAggregatedRecords() {
53+
List<Record> records =
54+
Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
55+
.collect(Collectors.toList());
56+
57+
Record aggregatedRecord = createAggregatedRecord(records);
58+
59+
RecordBatch result =
60+
new RecordBatch(Collections.singletonList(aggregatedRecord), getTestSplit(), 100L, true);
61+
62+
assertThat(result.getDeaggregatedRecords().size()).isEqualTo(3);
63+
}
64+
65+
@Test
66+
public void testGetMillisBehindLatest() {
67+
RecordBatch result =
68+
new RecordBatch(
69+
Collections.singletonList(getTestRecord("data-1")), getTestSplit(), 100L, true);
70+
71+
assertThat(result.getMillisBehindLatest()).isEqualTo(100L);
72+
}
73+
74+
@Test
75+
public void testIsCompleted() {
76+
RecordBatch result =
77+
new RecordBatch(
78+
Collections.singletonList(getTestRecord("data-1")), getTestSplit(), 100L, true);
79+
80+
assertThat(result.isCompleted()).isTrue();
81+
}
82+
83+
private static Record createAggregatedRecord(List<Record> records) {
84+
RecordAggregator recordAggregator = new RecordAggregator();
85+
86+
for (Record record : records) {
87+
try {
88+
recordAggregator.addUserRecord("key", record.data().asByteArray());
89+
} catch (Exception e) {
90+
throw new RuntimeException("Failed to add record to aggregator", e);
91+
}
92+
}
93+
94+
AggRecord aggRecord = recordAggregator.clearAndGet();
95+
96+
return Record.builder()
97+
.data(SdkBytes.fromByteArray(aggRecord.toRecordBytes()))
98+
.approximateArrivalTimestamp(Instant.now())
99+
.build();
100+
}
101+
}

pom.xml

-5
Original file line numberDiff line numberDiff line change
@@ -332,11 +332,6 @@ under the License.
332332
<artifactId>javassist</artifactId>
333333
<version>3.24.0-GA</version>
334334
</dependency>
335-
<dependency>
336-
<groupId>com.google.protobuf</groupId>
337-
<artifactId>protobuf-java</artifactId>
338-
<version>3.25.5</version>
339-
</dependency>
340335
<dependency>
341336
<groupId>com.google.guava</groupId>
342337
<artifactId>guava</artifactId>

0 commit comments

Comments
 (0)