
Commit 92a7615: Flink Connector Redshift

1 parent: 27db878
24 files changed, +3249 −0 lines
@@ -0,0 +1,121 @@
# Flink Redshift Connector

This is the initial proof of concept for a Flink Redshift connector supporting two modes:

- read.mode = JDBC
- read.mode = COPY

This POC only supports sink tables (writing to Redshift).
## Connector Options

| Option | Required | Default | Type | Description |
|:-------|:---------|:--------|:-----|:------------|
| hostname | required | none | String | Redshift connection hostname. |
| port | required | 5439 | Integer | Redshift connection port. |
| username | required | none | String | Redshift username. |
| password | required | none | String | Redshift user password. |
| database-name | required | dev | String | Redshift database to connect to. |
| table-name | required | none | String | Redshift table name. |
| sink.batch-size | optional | 1000 | Integer | Maximum number of buffered records; exceeding it triggers a flush. |
| sink.flush-interval | optional | 1s | Duration | Interval after which an asynchronous thread flushes buffered data. |
| sink.max-retries | optional | 3 | Integer | Maximum number of retries when writing records to the database fails. |
| copy-mode | required | false | Boolean | Whether to use the Redshift COPY command for insert/upsert. |
| copy-temp-s3-uri | conditionally required | none | String | If copy-mode = true, the Redshift COPY command requires a temporary S3 URI. |
| iam-role-arn | conditionally required | none | String | If copy-mode = true, the Redshift COPY command requires an IAM role with the necessary privileges, attached to the Redshift cluster. |
**Update/Delete Data Considerations:**
Data is updated and deleted by the primary key, as the sketch below illustrates.
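For example, with the `t_user` table registered in the examples further below, writing a second record with an existing `user_id` updates the earlier row instead of appending a duplicate. A minimal sketch (the values are illustrative):

```SQL
-- With PRIMARY KEY (`user_id`) declared, the second insert updates the
-- row for user 1001 rather than inserting a duplicate.
INSERT INTO t_user VALUES (1001, 1, 'en', 'US', 'M', 85.0);
INSERT INTO t_user VALUES (1001, 1, 'en', 'US', 'M', 92.5);
```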
## Data Type Mapping

| Flink Type | Redshift Type |
|:--------------------|:--------------|
| CHAR | VARCHAR |
| VARCHAR | VARCHAR |
| STRING | VARCHAR |
| BOOLEAN | Boolean |
| BYTES | Not supported |
| DECIMAL | Decimal |
| TINYINT | Int8 |
| SMALLINT | Int16 |
| INTEGER | Int32 |
| BIGINT | Int64 |
| FLOAT | Float32 |
| DOUBLE | Float64 |
| DATE | Date |
| TIME | Timestamp |
| TIMESTAMP | Timestamp |
| TIMESTAMP_LTZ | Timestamp |
| INTERVAL_YEAR_MONTH | Int32 |
| INTERVAL_DAY_TIME | Int64 |
| ARRAY | Not supported |
| MAP | Not supported |
| ROW | Not supported |
| MULTISET | Not supported |
| RAW | Not supported |
## How the POC Is Tested

### Create and sink a table in pure JDBC mode

```SQL
-- Register a Redshift table `t_user` in Flink SQL.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);

-- Write data into the Redshift table from the table `T`.
INSERT INTO t_user
SELECT CAST(`user_id` AS BIGINT), `user_type`, `lang`, `country`, `gender`, `score` FROM T;
```
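The commit does not include the Redshift-side schema, but the target table `users` in database `tutorial` could be created roughly as follows (a sketch using standard Redshift DDL; the VARCHAR lengths are assumptions):

```SQL
-- Hypothetical Redshift-side DDL backing the example above. Redshift treats
-- the primary key as informational; the sink relies on it for upserts.
CREATE TABLE users (
    user_id   BIGINT,
    user_type INTEGER,
    language  VARCHAR(32),
    country   VARCHAR(32),
    gender    VARCHAR(8),
    score     DOUBLE PRECISION,
    PRIMARY KEY (user_id)
);
```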
### Create and sink a table in COPY mode

```SQL
-- Register a Redshift table `t_user` in Flink SQL.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3',
    'copy-mode' = 'true',
    'copy-temp-s3-uri' = 's3://bucket-name/key/temp',
    'iam-role-arn' = 'arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx'
);
```
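In this mode the sink stages each batch as a file under `copy-temp-s3-uri` and loads it with a Redshift COPY statement, roughly as sketched below. The staged file name and CSV format are assumptions (though the pom's commons-csv dependency suggests CSV staging):

```SQL
-- Hypothetical COPY statement issued after a batch is staged to S3.
COPY users
FROM 's3://bucket-name/key/temp/part-0001.csv'
IAM_ROLE 'arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx'
FORMAT AS CSV;
```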
@@ -0,0 +1,125 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-aws-parent</artifactId>
        <version>4.3-SNAPSHOT</version>
    </parent>

    <artifactId>flink-connector-redshift</artifactId>
    <name>Flink : Connectors : AWS : Amazon Redshift</name>
    <packaging>jar</packaging>

    <properties>
        <redshift.jdbc.version>2.1.0.17</redshift.jdbc.version>
        <commons-logging.version>1.2</commons-logging.version>
        <commons-csv.version>1.10.0</commons-csv.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-aws-base</artifactId>
            <version>${parent.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Apache Flink dependencies are provided so they are not packaged into the JAR. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.amazon.redshift</groupId>
            <artifactId>redshift-jdbc42</artifactId>
            <version>${redshift.jdbc.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>${commons-csv.version}</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-core</artifactId>
            <version>${aws.sdkv1.version}</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>${aws.sdkv1.version}</version>
        </dependency>

        <!-- Logging framework, to produce console output when running in the IDE. -->
        <!-- Excluded from the application JAR by default. -->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>${commons-logging.version}</version>
        </dependency>
    </dependencies>

</project>
@@ -0,0 +1,97 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.redshift.connection;

import org.apache.flink.connector.redshift.options.RedshiftOptions;

import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.sql.DriverManager;
import java.sql.SQLException;

/** Redshift connection provider. Lazily creates and caches a single JDBC connection. */
public class RedshiftConnectionProvider implements Serializable {
    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RedshiftConnectionProvider.class);

    private static final String REDSHIFT_DRIVER_NAME = "com.amazon.redshift.Driver";

    /** Cached connection; transient because JDBC connections are not serializable. */
    private transient RedshiftConnectionImpl connection;

    private final RedshiftOptions options;

    public RedshiftConnectionProvider(RedshiftOptions options) {
        this.options = options;
    }

    /** Returns the cached connection, creating it on first use. */
    public synchronized RedshiftConnectionImpl getConnection() throws SQLException {
        if (connection == null) {
            connection =
                    createConnection(
                            options.getHostname(), options.getPort(), options.getDatabaseName());
        }
        return connection;
    }

    /** Alias for {@link #getConnection()}, kept for API compatibility. */
    public RedshiftConnectionImpl getOrCreateConnection() throws SQLException {
        return getConnection();
    }

    private RedshiftConnectionImpl createConnection(String hostname, int port, String dbName)
            throws SQLException {
        String url = "jdbc:redshift://" + hostname + ":" + port + "/" + dbName;
        LOG.info("Connecting to {}", url);

        // Make sure the Redshift JDBC driver is registered with the DriverManager.
        try {
            Class.forName(REDSHIFT_DRIVER_NAME);
        } catch (ClassNotFoundException e) {
            throw new SQLException(e);
        }

        if (options.getUsername().isPresent()) {
            return (RedshiftConnectionImpl)
                    DriverManager.getConnection(
                            url,
                            options.getUsername().get(),
                            options.getPassword().orElse(null));
        }
        return (RedshiftConnectionImpl) DriverManager.getConnection(url);
    }

    public void closeConnection() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
        }
    }
}
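A minimal usage sketch of the provider's lifecycle. The `ConnectionProviderUsage` class, the `writeProbeRow` method, and the `SELECT 1` probe are hypothetical; how a `RedshiftOptions` instance is constructed is outside this file:

```java
package org.apache.flink.connector.redshift.connection;

import org.apache.flink.connector.redshift.options.RedshiftOptions;

import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical caller showing the provider's create/reuse/close lifecycle. */
final class ConnectionProviderUsage {

    static void writeProbeRow(RedshiftOptions options) throws SQLException {
        RedshiftConnectionProvider provider = new RedshiftConnectionProvider(options);
        try (Statement stmt = provider.getOrCreateConnection().createStatement()) {
            // The first call opens the JDBC connection; later calls reuse it.
            stmt.execute("SELECT 1");
        } finally {
            // Release the cached connection, e.g. when the sink is disposed.
            provider.closeConnection();
        }
    }
}
```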
