
[FLINK-32097][Connectors/Kinesis] Implement support for Kinesis deaggregation #188

Open · wants to merge 7 commits into base: main
22 changes: 22 additions & 0 deletions flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
@@ -33,6 +33,14 @@ under the License.
<name>Flink : Connectors : AWS : Amazon Kinesis Data Streams Connector v2</name>
<packaging>jar</packaging>

<repositories>
<!-- used for the Kinesis aggregator dependency since it is not available in Maven Central -->
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -52,6 +60,11 @@ under the License.
<artifactId>kinesis</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>arns</artifactId>
@@ -102,6 +115,15 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<!-- the Kinesis aggregator dependency, pulled from JitPack since it is not available in Maven Central -->
<!-- see https://github.com/awslabs/kinesis-aggregation/issues/120 -->
<groupId>com.github.awslabs.kinesis-aggregation</groupId>
<artifactId>amazon-kinesis-aggregator</artifactId>
Review comment: This has an Apache 2.0 licence, so all good: https://github.com/awslabs/kinesis-aggregation

<version>2.0.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
KinesisStreamsSource.java
Expand Up @@ -70,9 +70,9 @@
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.time.Duration;
import java.util.Map;
@@ -209,8 +209,10 @@ public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
}

private Supplier<SplitReader<Record, KinesisShardSplit>> getKinesisShardSplitReaderSupplier(
Configuration sourceConfig, Map<String, KinesisShardMetrics> shardMetricGroupMap) {
private Supplier<SplitReader<KinesisClientRecord, KinesisShardSplit>>
getKinesisShardSplitReaderSupplier(
Configuration sourceConfig,
Map<String, KinesisShardMetrics> shardMetricGroupMap) {
KinesisSourceConfigOptions.ReaderType readerType = sourceConfig.get(READER_TYPE);
switch (readerType) {
// We create a new stream proxy for each split reader since they have their own
KinesisShardSplitReaderBase.java
@@ -29,8 +29,8 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import javax.annotation.Nullable;

@@ -41,7 +41,6 @@
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -50,10 +49,10 @@
/** Base implementation of the SplitReader for reading from KinesisShardSplits. */
@Internal
public abstract class KinesisShardSplitReaderBase
implements SplitReader<Record, KinesisShardSplit> {
implements SplitReader<KinesisClientRecord, KinesisShardSplit> {
Review comment (Contributor): This change actually changes the interface exposed by the KinesisSource, so it would be a backwards-incompatible change. Is there a way we can wrap this internally?

Author reply: I have been looking at the implications of not changing from Record to KinesisClientRecord, and this is what I have found:

  • The deaggregated records would have to be converted from KinesisClientRecord back to Record (AggregatorUtil only works with KinesisClientRecord), adding a slight overhead.
  • KinesisClientRecord carries a subSequenceNumber that could and should be used for checkpointing.

Apart from that, I don't see any more issues. Either way, I don't mind changing it back to Record.
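
To make the first point concrete, here is a minimal sketch (not code from this PR; the helper name is hypothetical) of the conversion that would be needed. Record exposes no subSequenceNumber, so that metadata would be dropped:

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

// Hypothetical helper mapping a deaggregated KinesisClientRecord back to an
// SDK v2 Record. Record.builder() has no subSequenceNumber field, so the
// position of a sub-record inside an aggregated record is lost here.
private static Record toSdkRecord(KinesisClientRecord clientRecord) {
    return Record.builder()
            .data(SdkBytes.fromByteBuffer(clientRecord.data()))
            .partitionKey(clientRecord.partitionKey())
            .sequenceNumber(clientRecord.sequenceNumber())
            .approximateArrivalTimestamp(clientRecord.approximateArrivalTimestamp())
            .build();
}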


private static final Logger LOG = LoggerFactory.getLogger(KinesisShardSplitReaderBase.class);
private static final RecordsWithSplitIds<Record> INCOMPLETE_SHARD_EMPTY_RECORDS =
private static final RecordsWithSplitIds<KinesisClientRecord> INCOMPLETE_SHARD_EMPTY_RECORDS =
new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false);

private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque<>();
Expand All @@ -65,7 +64,7 @@ protected KinesisShardSplitReaderBase(Map<String, KinesisShardMetrics> shardMetr
}

@Override
public RecordsWithSplitIds<Record> fetch() throws IOException {
public RecordsWithSplitIds<KinesisClientRecord> fetch() throws IOException {
KinesisShardSplitState splitState = assignedSplits.poll();

// When there are no assigned splits, return quickly
@@ -103,7 +102,7 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
.get(splitState.getShardId())
.setMillisBehindLatest(recordBatch.getMillisBehindLatest());

if (recordBatch.getRecords().isEmpty()) {
if (recordBatch.getDeaggregatedRecords().isEmpty()) {
if (recordBatch.isCompleted()) {
return new KinesisRecordsWithSplitIds(
Collections.emptyIterator(), splitState.getSplitId(), true);
@@ -115,12 +114,12 @@
splitState.setNextStartingPosition(
StartingPosition.continueFromSequenceNumber(
recordBatch
.getRecords()
.get(recordBatch.getRecords().size() - 1)
.getDeaggregatedRecords()
.get(recordBatch.getDeaggregatedRecords().size() - 1)
.sequenceNumber()));

return new KinesisRecordsWithSplitIds(
recordBatch.getRecords().iterator(),
recordBatch.getDeaggregatedRecords().iterator(),
splitState.getSplitId(),
recordBatch.isCompleted());
}
@@ -154,48 +153,20 @@ public void pauseOrResumeSplits(
splitsToResume.forEach(split -> pausedSplitIds.remove(split.splitId()));
}

/**
* Dataclass to store a batch of Kinesis records with metadata. Used to pass Kinesis records
* from the SplitReader implementation to the SplitReaderBase.
*/
@Internal
protected static class RecordBatch {
private final List<Record> records;
private final long millisBehindLatest;
private final boolean completed;

public RecordBatch(List<Record> records, long millisBehindLatest, boolean completed) {
this.records = records;
this.millisBehindLatest = millisBehindLatest;
this.completed = completed;
}

public List<Record> getRecords() {
return records;
}

public long getMillisBehindLatest() {
return millisBehindLatest;
}

public boolean isCompleted() {
return completed;
}
}

/**
* Implementation of {@link RecordsWithSplitIds} for sending Kinesis records from fetcher to the
* SourceReader.
*/
@Internal
private static class KinesisRecordsWithSplitIds implements RecordsWithSplitIds<Record> {
private static class KinesisRecordsWithSplitIds
implements RecordsWithSplitIds<KinesisClientRecord> {

private final Iterator<Record> recordsIterator;
private final Iterator<KinesisClientRecord> recordsIterator;
private final String splitId;
private final boolean isComplete;

public KinesisRecordsWithSplitIds(
Iterator<Record> recordsIterator, String splitId, boolean isComplete) {
Iterator<KinesisClientRecord> recordsIterator, String splitId, boolean isComplete) {
this.recordsIterator = recordsIterator;
this.splitId = splitId;
this.isComplete = isComplete;
Expand All @@ -209,7 +180,7 @@ public String nextSplit() {

@Nullable
@Override
public Record nextRecordFromSplit() {
public KinesisClientRecord nextRecordFromSplit() {
return recordsIterator.hasNext() ? recordsIterator.next() : null;
}

KinesisStreamsRecordEmitter.java
@@ -26,7 +26,7 @@
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.util.Collector;

import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/**
* Emits record from the source into the Flink job graph. This serves as the interface between the
@@ -36,7 +36,7 @@
*/
@Internal
public class KinesisStreamsRecordEmitter<T>
implements RecordEmitter<Record, T, KinesisShardSplitState> {
implements RecordEmitter<KinesisClientRecord, T, KinesisShardSplitState> {

private final KinesisDeserializationSchema<T> deserializationSchema;
private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();
@@ -47,7 +47,7 @@ public KinesisStreamsRecordEmitter(KinesisDeserializationSchema<T> deserializati

@Override
public void emitRecord(
Record element, SourceOutput<T> output, KinesisShardSplitState splitState)
KinesisClientRecord element, SourceOutput<T> output, KinesisShardSplitState splitState)
throws Exception {
sourceOutputWrapper.setSourceOutput(output);
sourceOutputWrapper.setTimestamp(element.approximateArrivalTimestamp().toEpochMilli());
KinesisStreamsSourceReader.java
@@ -31,7 +31,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.util.HashSet;
import java.util.List;
@@ -45,14 +45,14 @@
@Internal
public class KinesisStreamsSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<
Record, T, KinesisShardSplit, KinesisShardSplitState> {
KinesisClientRecord, T, KinesisShardSplit, KinesisShardSplitState> {

private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class);
private final Map<String, KinesisShardMetrics> shardMetricGroupMap;

public KinesisStreamsSourceReader(
SingleThreadFetcherManager<Record, KinesisShardSplit> splitFetcherManager,
RecordEmitter<Record, T, KinesisShardSplitState> recordEmitter,
SingleThreadFetcherManager<KinesisClientRecord, KinesisShardSplit> splitFetcherManager,
RecordEmitter<KinesisClientRecord, T, KinesisShardSplitState> recordEmitter,
Configuration config,
SourceReaderContext context,
Map<String, KinesisShardMetrics> shardMetricGroupMap) {
RecordBatch.java
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kinesis.source.reader;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;

import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.util.ArrayList;
import java.util.List;

/**
* Dataclass to store a batch of Kinesis records with metadata. Used to pass Kinesis records from
* the SplitReader implementation to the SplitReaderBase.
*
* <p>Input records are de-aggregated using the KCL 3.x library. It is expected that AWS SDK
* v2.x messages are converted to KCL 3.x {@link KinesisClientRecord}.
*/
@Internal
public class RecordBatch {
private final List<KinesisClientRecord> deaggregatedRecords;
private final long millisBehindLatest;
private final boolean completed;

public RecordBatch(
final List<Record> records,
final KinesisShardSplit subscribedShard,
final long millisBehindLatest,
final boolean completed) {
this.deaggregatedRecords = deaggregateRecords(records, subscribedShard);
this.millisBehindLatest = millisBehindLatest;
this.completed = completed;
}

public List<KinesisClientRecord> getDeaggregatedRecords() {
return deaggregatedRecords;
}

public long getMillisBehindLatest() {
return millisBehindLatest;
}

public boolean isCompleted() {
return completed;
}

private List<KinesisClientRecord> deaggregateRecords(
final List<Record> records, final KinesisShardSplit subscribedShard) {
final List<KinesisClientRecord> kinesisClientRecords = new ArrayList<>();
for (Record record : records) {
kinesisClientRecords.add(KinesisClientRecord.fromRecord(record));
}

final String startingHashKey = subscribedShard.getStartingHashKey();
final String endingHashKey = subscribedShard.getEndingHashKey();

return new AggregatorUtil()
.deaggregate(kinesisClientRecords, startingHashKey, endingHashKey);
}
}
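
As a usage sketch (not part of this diff), the test-scoped amazon-kinesis-aggregator dependency declared in the pom above can produce an aggregated record to exercise this class. getTestSplit() is an assumed helper returning a KinesisShardSplit whose hash-key range covers the record's partition key:

import com.amazonaws.kinesis.agg.AggRecord;
import com.amazonaws.kinesis.agg.RecordAggregator;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.Record;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Collections;

// Inside a test method declared `throws Exception` (addUserRecord is checked):
// build one aggregated Kinesis record containing two user records.
RecordAggregator aggregator = new RecordAggregator();
aggregator.addUserRecord("pk", "payload-1".getBytes(StandardCharsets.UTF_8));
aggregator.addUserRecord("pk", "payload-2".getBytes(StandardCharsets.UTF_8));
AggRecord aggRecord = aggregator.clearAndGet();

Record aggregated = Record.builder()
        .data(SdkBytes.fromByteArray(aggRecord.toRecordBytes()))
        .partitionKey(aggRecord.getPartitionKey())
        .sequenceNumber("1")
        .approximateArrivalTimestamp(Instant.now())
        .build();

// RecordBatch deaggregates on construction: both user records come back as
// KinesisClientRecords, each carrying its own subSequenceNumber.
RecordBatch batch = new RecordBatch(
        Collections.singletonList(aggregated), getTestSplit(), 0L, false);
assert batch.getDeaggregatedRecords().size() == 2;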
FanOutKinesisShardSplitReader.java
@@ -23,6 +23,7 @@
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
import org.apache.flink.connector.kinesis.source.reader.RecordBatch;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;

@@ -69,7 +70,11 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
if (shardCompleted) {
splitSubscriptions.remove(splitState.getShardId());
}
return new RecordBatch(event.records(), event.millisBehindLatest(), shardCompleted);
return new RecordBatch(
event.records(),
splitState.getKinesisShardSplit(),
event.millisBehindLatest(),
shardCompleted);
}

@Override
PollingKinesisShardSplitReader.java
@@ -24,6 +24,7 @@
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
import org.apache.flink.connector.kinesis.source.reader.RecordBatch;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;

import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -59,8 +60,12 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
splitState.getNextStartingPosition(),
this.maxRecordsToGet);
boolean isCompleted = getRecordsResponse.nextShardIterator() == null;

return new RecordBatch(
getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted);
getRecordsResponse.records(),
splitState.getKinesisShardSplit(),
getRecordsResponse.millisBehindLatest(),
isCompleted);
}

@Override
KinesisDeserializationSchema.java
@@ -23,7 +23,7 @@
import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
import org.apache.flink.util.Collector;

import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.IOException;
import java.io.Serializable;
@@ -60,7 +60,7 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
* @param output the collector to emit the deserialized messages to
* @throws IOException exception when deserializing record
*/
void deserialize(Record record, String stream, String shardId, Collector<T> output)
void deserialize(KinesisClientRecord record, String stream, String shardId, Collector<T> output)
Review comment: This is a breaking change to the public interface.

throws IOException;

static <T> KinesisDeserializationSchema<T> of(DeserializationSchema<T> deserializationSchema) {
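
For illustration (not part of this PR), a user implementation migrated to the new signature would read the payload from the KinesisClientRecord's ByteBuffer rather than the SDK v2 Record's SdkBytes; the class below is a hypothetical example, not connector code:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical deserializer emitting each record's payload as a UTF-8 string.
public class Utf8DeserializationSchema implements KinesisDeserializationSchema<String> {

    @Override
    public void deserialize(
            KinesisClientRecord record, String stream, String shardId, Collector<String> output)
            throws IOException {
        // KinesisClientRecord exposes its payload as a ByteBuffer; read from a
        // read-only duplicate so the buffer's position is left untouched.
        ByteBuffer data = record.data().asReadOnlyBuffer();
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes);
        output.collect(new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}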