Commit c739b58

pscls, yschroeder97, and westphal-jan committed
[FLINK-21373] RabbitMQ Connector using FLIP-143 Sink API
RabbitMQ Connector using new Sink API
https://issues.apache.org/jira/browse/FLINK-21373

Co-authored-by: Yannik Schroeder
Co-authored-by: Jan Westphal
1 parent b1cbead commit c739b58

33 files changed, +1873 -113 lines changed

flink-connector-rabbitmq/README.md

+141-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# License of the Rabbit MQ Connector
1+
# License of the RabbitMQ Connector
22

33
Flink's RabbitMQ connector defines a Maven dependency on the
44
"RabbitMQ AMQP Java Client", which is triple-licensed under the Mozilla Public License 1.1 ("MPL"),
@@ -19,6 +19,143 @@ and the Sink API specified in [FLIP-143](https://cwiki.apache.org/confluence/dis
1919

2020
For more information about RabbitMQ visit https://www.rabbitmq.com/.
2121

22-
In order to view how to use the connector inspect
23-
[Source](src/main/java/org/apache/flink/connector/rabbitmq2/source/README.md) and
24-
[Sink](src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md).
22+
# RabbitMQ Source
23+
24+
Flink's RabbitMQ connector provides a streaming-only source which enables you to receive messages
25+
from a RabbitMQ queue in three different consistency modes: at-most-once, at-least-once,
26+
and exactly-once.
27+
28+
## Consistency Modes
29+
30+
With **at-most-once**, the source receives each message and automatically acknowledges it to
31+
RabbitMQ. The message content is then polled by the output. If the system crashes in the meantime,
32+
the messages that the source buffers are lost.
33+
34+
By contrast, the messages in the **at-least-once** mode are not automatically acknowledged but
35+
instead the delivery tag is stored in order to acknowledge it later to RabbitMQ. Messages are polled
36+
by the output and when the notification for a completed checkpoint is received the messages that were
37+
polled are acknowledged to RabbitMQ. Therefore, the mode requires _checkpointing enabled_. This way,
38+
it is assured that the messages are correctly processed by the system. If the system crashes in the
39+
meantime, the unacknowledged messages will be resent by RabbitMQ to ensure at-least-once behavior.
40+
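The at-least-once bookkeeping described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the connector's implementation: `AtLeastOnceAckTracker` and its methods are hypothetical names, delivery tags are grouped per checkpoint id (an assumption about how "the messages that were polled" are scoped), and the `acknowledged` list stands in for acknowledging to RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not a class of the connector. */
class AtLeastOnceAckTracker {
    // Delivery tags of messages polled since the last checkpoint snapshot.
    private List<Long> polledSinceSnapshot = new ArrayList<>();
    // Delivery tags grouped by the checkpoint that covers them.
    private final NavigableMap<Long, List<Long>> tagsPerCheckpoint = new TreeMap<>();
    // Stand-in for the broker: tags that have been acknowledged so far.
    private final List<Long> acknowledged = new ArrayList<>();

    /** Called when the output polls a message; the tag is stored, not acknowledged. */
    void onMessagePolled(long deliveryTag) {
        polledSinceSnapshot.add(deliveryTag);
    }

    /** Called when a checkpoint is taken: the tags polled so far belong to it. */
    void snapshotState(long checkpointId) {
        tagsPerCheckpoint.put(checkpointId, polledSinceSnapshot);
        polledSinceSnapshot = new ArrayList<>();
    }

    /** Called when a checkpoint completed: acknowledge every tag it covers. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<Long>> covered = tagsPerCheckpoint.headMap(checkpointId, true);
        for (List<Long> tags : covered.values()) {
            acknowledged.addAll(tags); // a real source would acknowledge to RabbitMQ here
        }
        covered.clear(); // clearing the view removes the entries from the backing map
    }

    List<Long> acknowledgedTags() {
        return new ArrayList<>(acknowledged);
    }

    int unacknowledgedCount() {
        int count = polledSinceSnapshot.size();
        for (List<Long> tags : tagsPerCheckpoint.values()) {
            count += tags.size();
        }
        return count;
    }

    public static void main(String[] args) {
        AtLeastOnceAckTracker tracker = new AtLeastOnceAckTracker();
        tracker.onMessagePolled(1L);
        tracker.snapshotState(1L);
        tracker.onMessagePolled(2L); // polled after the snapshot, stays unacknowledged
        tracker.notifyCheckpointComplete(1L);
        System.out.println("acknowledged: " + tracker.acknowledgedTags());
        System.out.println("still pending: " + tracker.unacknowledgedCount());
    }
}
```

If the job fails before `notifyCheckpointComplete` runs, nothing has been acknowledged, so RabbitMQ redelivers those messages. That redelivery is the source of possible duplicates in this mode.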
41+
The **exactly-once** mode uses _correlation ids_ to deduplicate messages. Correlation ids are
42+
properties of the messages and need to be set by the message publisher (who publishes the messages
43+
to RabbitMQ) in order for the mode to function. The user is responsible for ensuring that the
44+
correlation id set for each message is unique; otherwise exactly-once cannot be guaranteed, since
45+
RabbitMQ itself neither generates unique ids nor provides the required deduplication. In addition,
46+
the mode requires _checkpointing enabled_ and only _parallelism 1_ is allowed. Similar to at-least-once,
47+
the messages are received from RabbitMQ, buffered, and passed to the output when polled. A set of
48+
seen correlation ids is maintained to apply the deduplication. During a checkpoint, the seen
49+
correlation ids are stored so that in case of failure they can be recovered and used for
50+
deduplication. When the notification for a completed checkpoint is received, all polled messages are
51+
acknowledged as one transaction to ensure the reception by RabbitMQ. Afterwards, the set of
52+
correlation ids is updated as RabbitMQ will not send the acknowledged messages again. This behavior
53+
assures exactly-once processing but also has a performance drawback. Committing many messages will
54+
take time and will thus increase the overall time it takes to do a checkpoint. This can result in
55+
checkpoint delays and in peaks where checkpoints have either many or just a few messages.
56+
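The correlation-id deduplication described above can be sketched as follows. This is a simplified illustration, not the connector's code: `CorrelationIdDeduplicator` and its method names are hypothetical, and checkpoint state is modeled as a plain copy of the set.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Hypothetical helper for illustration only; not a class of the connector. */
class CorrelationIdDeduplicator {
    // Correlation ids of messages that were received but not yet acknowledged.
    private final Set<String> seenCorrelationIds = new HashSet<>();

    /** Returns true if the message is new and should be emitted, false for a duplicate. */
    boolean shouldEmit(String correlationId) {
        Objects.requireNonNull(correlationId, "the publisher must set a correlation id");
        return seenCorrelationIds.add(correlationId);
    }

    /** During a checkpoint, the seen ids are stored (modeled here as a copy). */
    Set<String> snapshotState() {
        return new HashSet<>(seenCorrelationIds);
    }

    /** After a failure, the stored ids are recovered and used for deduplication again. */
    void restoreState(Set<String> snapshot) {
        seenCorrelationIds.clear();
        seenCorrelationIds.addAll(snapshot);
    }

    /** Once messages are acknowledged, RabbitMQ will not resend them, so their ids are dropped. */
    void onAcknowledged(List<String> acknowledgedIds) {
        seenCorrelationIds.removeAll(acknowledgedIds);
    }

    public static void main(String[] args) {
        CorrelationIdDeduplicator deduplicator = new CorrelationIdDeduplicator();
        System.out.println("first delivery emitted: " + deduplicator.shouldEmit("id-1"));
        System.out.println("redelivery emitted: " + deduplicator.shouldEmit("id-1"));
    }
}
```

Because the set is a single in-memory structure in this sketch, it only deduplicates correctly with one reader, which is in line with the parallelism 1 restriction stated above.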
57+
## How to use it
58+
59+
```java
60+
public class Main {
61+
public static void main(String[] args) {
62+
63+
RabbitMQSource<T> source =
64+
RabbitMQSource.<T>builder()
65+
.setConnectionConfig(RMQ_CONNECTION_CONFIG)
66+
.setQueueName(RABBITMQ_QUEUE_NAME)
67+
.setDeserializationSchema(DESERIALIZATION_SCHEMA)
68+
.setConsistencyMode(CONSISTENCY_MODE)
69+
.build();
70+
71+
// ******************* An example usage looks like this *******************
72+
73+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
74+
75+
RMQConnectionConfig rmqConnectionConfig =
76+
new RMQConnectionConfig.Builder()
77+
.setHost("localhost")
78+
.setVirtualHost("/")
79+
.setUserName("guest")
80+
.setPassword("guest")
81+
.setPort(5672)
82+
.build();
83+
84+
RabbitMQSource<String> rmqSource =
85+
RabbitMQSource.<String>builder()
86+
.setConnectionConfig(rmqConnectionConfig)
87+
.setQueueName("consume-queue")
88+
.setDeserializationSchema(new SimpleStringSchema())
89+
.setConsistencyMode(ConsistencyMode.AT_MOST_ONCE)
90+
.build();
91+
92+
DataStream<String> stream = env.fromSource(rmqSource, WatermarkStrategy.noWatermarks(), "RMQSource");
93+
}
94+
}
95+
```
96+
97+
# RabbitMQ Sink
98+
99+
Flink's RabbitMQ connector provides a sink which enables you to publish your stream directly
100+
to a RabbitMQ exchange in three different consistency modes: at-most-once, at-least-once,
101+
and exactly-once. Furthermore, user-defined publish options can be used to customize each message's
102+
exchange and publish settings in the RabbitMQ context.
103+
104+
## Consistency Modes
105+
106+
With **at-most-once**, the sink will simply take each message and publish the serialization of it
107+
(with publish options if given) to RabbitMQ. At this point the sink gives up the ownership of the message.
108+
109+
For **at-least-once** the same process as for at-most-once is executed except that the ownership of
110+
the message does not end immediately with publishing it. The sink will keep the individual publishing
111+
id for each message as well as the message itself and buffer both until it receives the
112+
acknowledgment from RabbitMQ. Since the sink needs to buffer messages and wait
113+
for their asynchronous acknowledgment, this mode requires _checkpointing enabled_. On each checkpoint,
114+
all messages buffered up to this point (and thus sent) but not yet acknowledged will be stored. Simultaneously,
115+
each checkpoint triggers a resend of all unacknowledged messages, since
116+
the sink has to assume that something went wrong with them during publishing. Since it can take a
117+
moment until RabbitMQ acknowledges messages, this can and probably will result in message
118+
duplication, which is why this logic is at-least-once.
119+
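The buffering and resend behavior of the at-least-once sink can be sketched like this. It is an illustration under assumptions rather than the connector's implementation: `AtLeastOnceSinkBuffer` and its methods are hypothetical, sequence numbers stand in for the publishing ids, and the `published` list stands in for what reaches RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not a class of the connector. */
class AtLeastOnceSinkBuffer {
    // Messages that were sent but not yet acknowledged, keyed by publishing id.
    private final SortedMap<Long, String> unacknowledged = new TreeMap<>();
    // Stand-in for the broker: every message that was sent, including resends.
    private final List<String> published = new ArrayList<>();
    private long nextSequenceNumber = 1;

    /** Publishes a message and keeps it buffered until it is acknowledged. */
    long publish(String message) {
        long sequenceNumber = nextSequenceNumber++;
        unacknowledged.put(sequenceNumber, message);
        published.add(message);
        return sequenceNumber;
    }

    /** Called when the asynchronous acknowledgment for a publishing id arrives. */
    void onAcknowledged(long sequenceNumber) {
        unacknowledged.remove(sequenceNumber);
    }

    /** On a checkpoint: return the pending messages to be stored and resend them. */
    List<String> snapshotAndResend() {
        List<String> pending = new ArrayList<>(unacknowledged.values());
        unacknowledged.clear();
        for (String message : pending) {
            publish(message); // resent under a new publishing id
        }
        return pending;
    }

    List<String> publishedMessages() {
        return new ArrayList<>(published);
    }

    int pendingCount() {
        return unacknowledged.size();
    }

    public static void main(String[] args) {
        AtLeastOnceSinkBuffer buffer = new AtLeastOnceSinkBuffer();
        buffer.onAcknowledged(buffer.publish("acknowledged in time"));
        buffer.publish("acknowledgment still pending");
        buffer.snapshotAndResend();
        System.out.println(buffer.publishedMessages());
    }
}
```

A message whose acknowledgment is merely late, not lost, is published twice in this sketch, which illustrates why the mode can produce duplicates.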
120+
By contrast, the **exactly-once** mode will not send messages on receipt. All incoming messages
121+
will be buffered until a checkpoint is triggered. On each checkpoint all messages will be
122+
published/committed as one transaction to ensure that RabbitMQ acknowledges their reception.
123+
If successful, all committed messages will be given up; otherwise they will be stored
124+
and committed again in the next transaction during the next checkpoint.
125+
This consistency mode ensures that each message will be stored in RabbitMQ exactly once but also has
126+
a performance drawback. Committing many messages will take time and will thus increase the overall
127+
time it takes to do a checkpoint. This can result in checkpoint delays and in peaks where
128+
checkpoints have either many or just a few messages. It also increases the latency of each message, since messages are only published on checkpoints.
129+
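The exactly-once sink's "buffer, then commit on checkpoint" behavior can be sketched as below. This is a minimal illustration, not the connector's code: `ExactlyOnceSinkBuffer` is a hypothetical name, and the `transaction` callback is an assumed stand-in for publishing a batch to RabbitMQ inside one transaction and reporting whether the commit succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper for illustration only; not a class of the connector. */
class ExactlyOnceSinkBuffer {
    // Messages received since the last successful commit; nothing is sent on receipt.
    private final List<String> buffered = new ArrayList<>();

    /** Incoming messages are only buffered. */
    void write(String message) {
        buffered.add(message);
    }

    /**
     * On a checkpoint: hand the whole buffer to the transaction callback.
     * The buffer is cleared only if the commit succeeded; otherwise the
     * messages stay buffered and are retried at the next checkpoint.
     */
    boolean commitOnCheckpoint(Predicate<List<String>> transaction) {
        if (buffered.isEmpty()) {
            return true; // nothing to commit
        }
        boolean committed = transaction.test(new ArrayList<>(buffered));
        if (committed) {
            buffered.clear();
        }
        return committed;
    }

    int bufferedCount() {
        return buffered.size();
    }

    public static void main(String[] args) {
        ExactlyOnceSinkBuffer sink = new ExactlyOnceSinkBuffer();
        List<String> broker = new ArrayList<>(); // stand-in for the RabbitMQ exchange
        sink.write("a");
        sink.write("b");
        sink.commitOnCheckpoint(batch -> false);           // simulated failed commit
        sink.commitOnCheckpoint(broker::addAll);           // retried at the next checkpoint
        System.out.println("messages at broker: " + broker);
    }
}
```

The batch grows with the checkpoint interval, so a longer interval means larger transactions and higher per-message latency, which matches the performance drawback described above.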
130+
## How to use it
131+
132+
```java
133+
RabbitMQSink<T> sink =
134+
RabbitMQSink.<T>builder()
135+
.setConnectionConfig(<RMQConnectionConfig>)
136+
.setQueueName(<RabbitMQ Queue Name>)
137+
.setSerializationSchema(<Serialization Schema>)
138+
.setConsistencyMode(<ConsistencyMode>)
139+
.build();
140+
141+
// ******************* An example usage looks like this *******************
142+
143+
RMQConnectionConfig rmqConnectionConfig =
144+
new RMQConnectionConfig.Builder()
145+
.setHost("localhost")
146+
.setVirtualHost("/")
147+
.setUserName("guest")
148+
.setPassword("guest")
149+
.setPort(5672)
150+
.build();
151+
152+
RabbitMQSink<String> rmqSink =
153+
RabbitMQSink.<String>builder()
154+
.setConnectionConfig(rmqConnectionConfig)
155+
.setQueueName("publish-queue")
156+
.setSerializationSchema(new SimpleStringSchema())
157+
.setConsistencyMode(ConsistencyMode.AT_MOST_ONCE)
158+
.build();
159+
160+
(DataStream<String>).sinkTo(rmqSink)
161+
```

flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQConnectionConfig.java → flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java

+7-10
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.security.NoSuchAlgorithmException;
3131
import java.util.Optional;
3232

33+
import static java.util.Objects.requireNonNull;
34+
3335
/**
3436
* This class is copied from the previous RabbitMQ connector. Connection Configuration for RMQ. If
3537
* {@link Builder#setUri(String)} has been set then {@link
@@ -92,16 +94,11 @@ private RabbitMQConnectionConfig(
9294
Integer requestedFrameMax,
9395
Integer requestedHeartbeat,
9496
Integer prefetchCount) {
95-
Preconditions.checkNotNull(host, "host can not be null");
96-
Preconditions.checkNotNull(port, "port can not be null");
97-
Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
98-
Preconditions.checkNotNull(username, "username can not be null");
99-
Preconditions.checkNotNull(password, "password can not be null");
100-
this.host = host;
101-
this.port = port;
102-
this.virtualHost = virtualHost;
103-
this.username = username;
104-
this.password = password;
97+
this.host = requireNonNull(host);
98+
this.port = requireNonNull(port);
99+
this.virtualHost = requireNonNull(virtualHost);
100+
this.username = requireNonNull(username);
101+
this.password = requireNonNull(password);
105102

106103
this.networkRecoveryInterval = networkRecoveryInterval;
107104
this.automaticRecovery = automaticRecovery;
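Note on the hunk above: replacing `Preconditions.checkNotNull(value, message)` with single-argument `requireNonNull(value)` drops the descriptive error messages. As a sketch only, the JDK's two-argument overload `Objects.requireNonNull(T, String)` would keep them; the class `ConnectionSettings` below is a hypothetical stand-in and not the connector's `RabbitMQConnectionConfig`.

```java
import java.util.Objects;

/** Hypothetical stand-in for illustration only; not the connector's config class. */
class ConnectionSettings {
    final String host;
    final Integer port;

    ConnectionSettings(String host, Integer port) {
        // The two-argument overload assigns and validates in one step
        // while still naming the offending field in the exception.
        this.host = Objects.requireNonNull(host, "host can not be null");
        this.port = Objects.requireNonNull(port, "port can not be null");
    }

    public static void main(String[] args) {
        try {
            new ConnectionSettings(null, 5672);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```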
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# License of the RabbitMQ Connector
2+
3+
Flink's RabbitMQ connector defines a Maven dependency on the
4+
"RabbitMQ AMQP Java Client", which is triple-licensed under the Mozilla Public License 1.1 ("MPL"),
5+
the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
6+
7+
Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
8+
nor packages binaries from the "RabbitMQ AMQP Java Client".
9+
10+
Users that create and publish derivative work based on Flink's
11+
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
12+
must be aware that this may be subject to conditions declared in the
13+
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
14+
and the Apache License version 2 ("ASL").
