Skip to content

Commit 8986f77

Browse files
committed
[FLINK-34996][Connectors/Kafka] Allow custom Serializer/Deserializer initialization and remove mockito.
1 parent 68da758 commit 8986f77

File tree

6 files changed

+138
-110
lines changed

6 files changed

+138
-110
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,12 @@ class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> {
6161
this(serializerClass, isKey, Collections.emptyMap(), topicSelector);
6262
}
6363

64-
@SuppressWarnings("unchecked")
6564
@Override
6665
public void open(InitializationContext context) throws Exception {
67-
final ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
66+
final ClassLoader userCodeClassLoader = selectClassLoader(context);
6867
try (TemporaryClassLoaderContext ignored =
6968
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
70-
serializer =
71-
InstantiationUtil.instantiate(
72-
serializerClass.getName(),
73-
Serializer.class,
74-
userCodeClassLoader);
69+
initializeSerializer(userCodeClassLoader);
7570

7671
if (serializer instanceof Configurable) {
7772
((Configurable) serializer).configure(config);
@@ -88,4 +83,19 @@ public byte[] serialize(IN element) {
8883
checkState(serializer != null, "Call open() once before trying to serialize elements.");
8984
return serializer.serialize(topicSelector.apply(element), element);
9085
}
86+
87+
/**
88+
* Selects the class loader to be used when instantiating the serializer. Using a class loader
89+
* with user code allows users to customize the serializer.
90+
*/
91+
protected ClassLoader selectClassLoader(InitializationContext context) {
92+
return context.getUserCodeClassLoader().asClassLoader();
93+
}
94+
95+
@SuppressWarnings("unchecked")
96+
protected void initializeSerializer(ClassLoader classLoader) throws Exception {
97+
serializer =
98+
InstantiationUtil.instantiate(
99+
serializerClass.getName(), Serializer.class, classLoader);
100+
}
91101
}

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java

+17-8
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,11 @@ class KafkaValueOnlyDeserializerWrapper<T> implements KafkaRecordDeserialization
5555
}
5656

5757
@Override
58-
@SuppressWarnings("unchecked")
5958
public void open(DeserializationSchema.InitializationContext context) throws Exception {
60-
ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
59+
ClassLoader userCodeClassLoader = selectClassLoader(context);
6160
try (TemporaryClassLoaderContext ignored =
6261
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
63-
deserializer =
64-
(Deserializer<T>)
65-
InstantiationUtil.instantiate(
66-
deserializerClass.getName(),
67-
Deserializer.class,
68-
userCodeClassLoader);
62+
initializeDeserializer(userCodeClassLoader);
6963

7064
if (deserializer instanceof Configurable) {
7165
((Configurable) deserializer).configure(config);
@@ -103,4 +97,19 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> coll
10397
public TypeInformation<T> getProducedType() {
10498
return TypeExtractor.createTypeInfo(Deserializer.class, deserializerClass, 0, null, null);
10599
}
100+
101+
/**
102+
* Selects the class loader to be used when instantiating the deserializer. Using a class loader
103+
* with user code allows users to customize the deserializer.
104+
*/
105+
protected ClassLoader selectClassLoader(DeserializationSchema.InitializationContext context) {
106+
return context.getUserCodeClassLoader().asClassLoader();
107+
}
108+
109+
@SuppressWarnings("unchecked")
110+
protected void initializeDeserializer(ClassLoader classLoader) throws Exception {
111+
deserializer =
112+
InstantiationUtil.instantiate(
113+
deserializerClass.getName(), Deserializer.class, classLoader);
114+
}
106115
}
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,63 @@
11
package org.apache.flink.connector.kafka.sink;
22

3-
import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase;
3+
import org.apache.flink.api.common.serialization.SerializationSchema;
4+
import org.apache.flink.metrics.MetricGroup;
5+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
6+
import org.apache.flink.util.FlinkUserCodeClassLoaders;
7+
import org.apache.flink.util.SimpleUserCodeClassLoader;
8+
import org.apache.flink.util.UserCodeClassLoader;
9+
410
import org.apache.kafka.common.serialization.StringSerializer;
511
import org.junit.Test;
6-
import org.junit.runner.RunWith;
7-
import org.mockito.junit.MockitoJUnitRunner;
812

9-
import static org.mockito.Mockito.when;
13+
import java.net.URL;
1014

11-
@RunWith(MockitoJUnitRunner.class)
12-
public class KafkaSerializerWrapperTest extends SerializationTestBase {
13-
@Override
14-
protected void setupContext() {
15-
when(serializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
16-
}
15+
import static org.junit.Assert.assertEquals;
1716

17+
/** Tests for {@link KafkaSerializerWrapper}. */
18+
public class KafkaSerializerWrapperTest {
1819
@Test
1920
public void testUserCodeClassLoaderIsUsed() throws Exception {
20-
final KafkaSerializerWrapper<String> wrapper =
21-
new KafkaSerializerWrapper<>(StringSerializer.class, true, (value) -> "topic");
21+
final KafkaSerializerWrapperCaptureForTest wrapper =
22+
new KafkaSerializerWrapperCaptureForTest();
23+
final ClassLoader classLoader =
24+
FlinkUserCodeClassLoaders.childFirst(
25+
new URL[0],
26+
getClass().getClassLoader(),
27+
new String[0],
28+
throwable -> {},
29+
true);
30+
wrapper.open(
31+
new SerializationSchema.InitializationContext() {
32+
@Override
33+
public MetricGroup getMetricGroup() {
34+
return new UnregisteredMetricsGroup();
35+
}
36+
37+
@Override
38+
public UserCodeClassLoader getUserCodeClassLoader() {
39+
return SimpleUserCodeClassLoader.create(classLoader);
40+
}
41+
});
42+
43+
assertEquals(classLoader, wrapper.getClassLoaderUsed());
44+
}
45+
46+
static class KafkaSerializerWrapperCaptureForTest extends KafkaSerializerWrapper<String> {
47+
private ClassLoader classLoaderUsed;
48+
49+
KafkaSerializerWrapperCaptureForTest() {
50+
super(StringSerializer.class, true, (value) -> "topic");
51+
}
52+
53+
public ClassLoader getClassLoaderUsed() {
54+
return classLoaderUsed;
55+
}
2256

23-
testUserClassLoaderIsUsedWhen(() -> {
24-
wrapper.open(serializationContext);
25-
return null;
26-
}, new StringSerializer());
57+
@Override
58+
protected void initializeSerializer(ClassLoader classLoader) throws Exception {
59+
classLoaderUsed = classLoader;
60+
super.initializeSerializer(classLoader);
61+
}
2762
}
2863
}
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,65 @@
11
package org.apache.flink.connector.kafka.source.reader.deserializer;
22

3-
import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase;
3+
import org.apache.flink.api.common.serialization.DeserializationSchema;
4+
import org.apache.flink.metrics.MetricGroup;
5+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
6+
import org.apache.flink.util.FlinkUserCodeClassLoaders;
7+
import org.apache.flink.util.SimpleUserCodeClassLoader;
8+
import org.apache.flink.util.UserCodeClassLoader;
9+
410
import org.apache.kafka.common.serialization.StringDeserializer;
511
import org.junit.Test;
6-
import org.junit.runner.RunWith;
7-
import org.mockito.junit.MockitoJUnitRunner;
812

13+
import java.net.URL;
914
import java.util.HashMap;
10-
import java.util.Map;
11-
12-
import static org.mockito.Mockito.when;
1315

14-
@RunWith(MockitoJUnitRunner.class)
15-
public class KafkaValueOnlyDeserializerWrapperTest extends SerializationTestBase {
16-
@Override
17-
protected void setupContext() {
18-
when(deserializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
19-
}
16+
import static org.junit.Assert.assertEquals;
2017

18+
/** Tests for {@link KafkaValueOnlyDeserializerWrapper}. */
19+
public class KafkaValueOnlyDeserializerWrapperTest {
2120
@Test
2221
public void testUserCodeClassLoaderIsUsed() throws Exception {
23-
final Map<String, String> config = new HashMap<>();
24-
final KafkaValueOnlyDeserializerWrapper<String> wrapper =
25-
new KafkaValueOnlyDeserializerWrapper<>(StringDeserializer.class, config);
26-
27-
testUserClassLoaderIsUsedWhen(() -> {
28-
wrapper.open(deserializationContext);
29-
return null;
30-
}, new StringDeserializer());
22+
final KafkaValueOnlyDeserializerWrapperCaptureForTest wrapper =
23+
new KafkaValueOnlyDeserializerWrapperCaptureForTest();
24+
final ClassLoader classLoader =
25+
FlinkUserCodeClassLoaders.childFirst(
26+
new URL[0],
27+
getClass().getClassLoader(),
28+
new String[0],
29+
throwable -> {},
30+
true);
31+
wrapper.open(
32+
new DeserializationSchema.InitializationContext() {
33+
@Override
34+
public MetricGroup getMetricGroup() {
35+
return new UnregisteredMetricsGroup();
36+
}
37+
38+
@Override
39+
public UserCodeClassLoader getUserCodeClassLoader() {
40+
return SimpleUserCodeClassLoader.create(classLoader);
41+
}
42+
});
43+
44+
assertEquals(classLoader, wrapper.getClassLoaderUsed());
45+
}
46+
47+
static class KafkaValueOnlyDeserializerWrapperCaptureForTest
48+
extends KafkaValueOnlyDeserializerWrapper<String> {
49+
private ClassLoader classLoaderUsed;
50+
51+
KafkaValueOnlyDeserializerWrapperCaptureForTest() {
52+
super(StringDeserializer.class, new HashMap<>());
53+
}
54+
55+
public ClassLoader getClassLoaderUsed() {
56+
return classLoaderUsed;
57+
}
58+
59+
@Override
60+
protected void initializeDeserializer(ClassLoader classLoader) throws Exception {
61+
classLoaderUsed = classLoader;
62+
super.initializeDeserializer(classLoader);
63+
}
3164
}
3265
}

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java

-58
This file was deleted.

flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker

-1
This file was deleted.

0 commit comments

Comments
 (0)