Skip to content

Commit 434558b

Browse files
committed
[FLINK-21373] Add RabbitMQ SinkV2 Implementation, Port Flink version to 1.19
1 parent 66e323a commit 434558b

29 files changed

+3156
-599
lines changed

.github/workflows/push_pr.yml

+2-5
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,8 @@ jobs:
2828
compile_and_test:
2929
strategy:
3030
matrix:
31-
flink: [ 1.18-SNAPSHOT ]
32-
jdk: [ '8, 11, 17' ]
33-
include:
34-
- flink: 1.19-SNAPSHOT
35-
jdk: '8, 11, 17, 21'
31+
flink: [ 1.19-SNAPSHOT ]
32+
jdk: [ '8, 11, 17, 21' ]
3633
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
3734
with:
3835
flink_version: ${{ matrix.flink }}

.github/workflows/weekly.yml

-4
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@ jobs:
3030
strategy:
3131
matrix:
3232
flink_branches: [{
33-
flink: 1.18-SNAPSHOT,
34-
jdk: '8, 11, 17',
35-
branch: main
36-
}, {
3733
flink: 1.19-SNAPSHOT,
3834
jdk: '8, 11, 17, 21',
3935
branch: main
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource$RMQCollectorImpl.setMessageIdentifiers(java.lang.String, long)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (RMQSource.java:415)
2+
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close()> calls method <org.apache.flink.util.ExceptionUtils.firstOrSuppressed(java.lang.Throwable, java.lang.Throwable)> in (RMQSource.java:303)
3+
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close()> calls method <org.apache.flink.util.IOUtils.closeAll([Ljava.lang.AutoCloseable;)> in (RMQSource.java:300)
4+
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters.deserializationAdapter(org.apache.flink.api.common.functions.RuntimeContext, java.util.function.Function)> in (RMQSource.java:275)
5+
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()> in (RMQSource.java:254)
6+
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.util.IOUtils.closeAllQuietly([Ljava.lang.AutoCloseable;)> in (RMQSource.java:266)
7+
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> checks instanceof <org.apache.flink.streaming.api.operators.StreamingRuntimeContext> in (RMQSource.java:253)
8+
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.setupConnection()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (RMQSource.java:0)
9+
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.setupQueue()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (RMQSource.java:0)
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
#Mon Apr 03 14:18:47 CEST 2023
2+
#Thu May 02 14:30:30 BST 2024
33
Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @Public\ must\ be\ annotated\ with\ @Public.=f67f70fc-4a24-448c-a247-354e7ce69167
44
Connector\ production\ code\ must\ not\ depend\ on\ non-public\ API\ outside\ of\ connector\ packages=deb59a69-6a64-49f2-8aa3-84985ee63d70
55
ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=6fdbfe74-a937-4a8a-8e1b-9f0a3391f3fe
@@ -8,3 +8,4 @@ Options\ for\ connectors\ and\ formats\ should\ reside\ in\ a\ consistent\ packa
88
Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=675cade4-c44e-4b2b-aacf-0c23d2032e4a
99
Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @PublicEvolving\ must\ be\ annotated\ with\ @Public(Evolving).=871721c9-4c5f-4523-b8f6-a419e8a0085f
1010
Classes\ in\ API\ packages\ should\ have\ at\ least\ one\ API\ visibility\ annotation.=54a3d1fc-24ac-4bdc-bf15-56e8d7831aed
11+
Connector\ production\ code\ must\ depend\ only\ on\ public\ API\ when\ outside\ of\ connector\ packages=a6cee285-bdbf-4479-a652-8143c2bc1a69

flink-connector-rabbitmq/pom.xml

+15
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,21 @@ under the License.
7676
<scope>test</scope>
7777
</dependency>
7878

79+
<dependency>
80+
<groupId>org.apache.flink</groupId>
81+
<artifactId>flink-connector-test-utils</artifactId>
82+
<version>${flink.version}</version>
83+
<scope>test</scope>
84+
</dependency>
85+
86+
<dependency>
87+
<groupId>org.apache.flink</groupId>
88+
<artifactId>flink-connector-base</artifactId>
89+
<version>${flink.version}</version>
90+
<scope>test</scope>
91+
<type>test-jar</type>
92+
</dependency>
93+
7994
<dependency>
8095
<groupId>org.testcontainers</groupId>
8196
<artifactId>rabbitmq</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.apache.flink.connector.rabbitmq.common;
2+
3+
import org.apache.flink.annotation.PublicEvolving;
4+
5+
/** Constants for the RabbitMQ connector. */
6+
@PublicEvolving
7+
public class Constants {
8+
9+
/** The default RabbitMQ host Exchange used when exchange routing is disabled. */
10+
public static final String DEFAULT_EXCHANGE = "";
11+
12+
/** The default maximum number of inflight messages handled by SinkWriter at the same time. */
13+
public static final int DEFAULT_MAX_INFLIGHT = 100;
14+
15+
/** The default behaviour of sink on failing to send elements. */
16+
public static final boolean DEFAULT_FAIL_ON_ERROR = false;
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.apache.flink.connector.rabbitmq.common;
2+
3+
import org.apache.flink.annotation.PublicEvolving;
4+
5+
import static org.apache.flink.connector.rabbitmq.common.Constants.DEFAULT_EXCHANGE;
6+
7+
/**
8+
* Default implementation of {@link RabbitMQMessageConverter}.
9+
*
10+
* @param <T> type of the message to be converted
11+
*/
12+
@PublicEvolving
13+
public class DefaultRabbitMQMessageConverter<T> implements RabbitMQMessageConverter<T> {
14+
@Override
15+
public RabbitMQMessage<T> toRabbitMQMessage(T value) {
16+
return RabbitMQMessage.<T>builder().setMessage(value).setExchange(DEFAULT_EXCHANGE).build();
17+
}
18+
19+
@Override
20+
public boolean supportsExchangeRouting() {
21+
return false;
22+
}
23+
}

0 commit comments

Comments
 (0)