Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35299] Respect initial position for new streams #140

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
32 changes: 32 additions & 0 deletions docs/content/docs/connectors/datastream/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,38 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME
If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
(for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern).

### Configuring starting position for new streams

By default, the Flink Kinesis Consumer handles new streams the same way it handles a new shard for an existing stream, and it starts consuming from the earliest record (same behaviour as TRIM_HORIZON).

This behaviour is fine if you're consuming from a stream that you don't want to lose any data from, but if you're consuming from a stream with a large retention and where it is fine to start consuming from "now",
or more generally started from that is defined in `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, this was not possible before.

This behaviour can now be enabled by setting the `ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to true, which will make ALL new streams "reset" to consume from the initial position
instead of starting from the beginning.

If you just want to force a particular new stream to start consuming from the defined `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, you can use the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property (described below) instead.

### Resetting specific streams to the starting position

One of the features of the Flink Kinesis Consumer is that it keeps track of the offset that the application is at for each shard, so that if the application is restarted we can start consuming from that offset
when restoring from snapshot.

This is the ideal behaviour most of the time, but what if you want to jump to `LATEST` or go back to `TRIM_HORIZON` for a stream that is already being tracked by the Flink Kinesis Consumer?

You can now do this via the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property, which expects a comma separated list of strings referring to the names of the Kinesis Streams to reset.

For example, if you configure your application with
```
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if we are able to set different streams with different INITIAL POSITION. Let's say we would add streamA, streamB and streamC as new streams, I want to have streamA and streamB to consume from LATEST and streamC from AT_TIMESTAMP. Is this possible?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not possible before and is still not possible now 😅

Even though it would be possible to do that with minimal changes, I feel like that is another feature request on its own and probably doesn't belong in this one.
The main goal of this change is: allow users to say when INITIAL position should be used regardless of the stored state.

consumerConfig.put(ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "streamA, streamB");
```
then `streamA` and `streamB` would start consuming from LATEST, even if they are already being tracked by the application.

{{< hint warning >}}
Note that you need to remove this property after the value is reset and a savepoint is taken, otherwise the Flink Kinesis Consumer will always be resetting those streams to the configured initial position.
{{< /hint >}}

### Fault Tolerance for Exactly-Once User-Defined State Update Semantics

With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
Expand Down
Loading
Loading