Skip to content

Commit 1f24b62

Browse files
committed
[FLINK-34996][Connectors/Kafka] Simplify code and add more tests on serialization wrappers.
1 parent 8986f77 commit 1f24b62

File tree

4 files changed

+24
-18
lines changed

4 files changed

+24
-18
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java

+1-9
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> {
6363

6464
@Override
6565
public void open(InitializationContext context) throws Exception {
66-
final ClassLoader userCodeClassLoader = selectClassLoader(context);
66+
final ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
6767
try (TemporaryClassLoaderContext ignored =
6868
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
6969
initializeSerializer(userCodeClassLoader);
@@ -84,14 +84,6 @@ public byte[] serialize(IN element) {
8484
return serializer.serialize(topicSelector.apply(element), element);
8585
}
8686

87-
/**
88-
* Selects the class loader to be used when instantiating the serializer. Using a class loader
89-
* with user code allows users to customize the serializer.
90-
*/
91-
protected ClassLoader selectClassLoader(InitializationContext context) {
92-
return context.getUserCodeClassLoader().asClassLoader();
93-
}
94-
9587
@SuppressWarnings("unchecked")
9688
protected void initializeSerializer(ClassLoader classLoader) throws Exception {
9789
serializer =

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java

+1-9
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class KafkaValueOnlyDeserializerWrapper<T> implements KafkaRecordDeserialization
5656

5757
@Override
5858
public void open(DeserializationSchema.InitializationContext context) throws Exception {
59-
ClassLoader userCodeClassLoader = selectClassLoader(context);
59+
ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
6060
try (TemporaryClassLoaderContext ignored =
6161
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
6262
initializeDeserializer(userCodeClassLoader);
@@ -98,14 +98,6 @@ public TypeInformation<T> getProducedType() {
9898
return TypeExtractor.createTypeInfo(Deserializer.class, deserializerClass, 0, null, null);
9999
}
100100

101-
/**
102-
* Selects the class loader to be used when instantiating the deserializer. Using a class loader
103-
* with user code allows users to customize the deserializer.
104-
*/
105-
protected ClassLoader selectClassLoader(DeserializationSchema.InitializationContext context) {
106-
return context.getUserCodeClassLoader().asClassLoader();
107-
}
108-
109101
@SuppressWarnings("unchecked")
110102
protected void initializeDeserializer(ClassLoader classLoader) throws Exception {
111103
deserializer =

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java

+11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.flink.connector.kafka.sink;
22

33
import org.apache.flink.api.common.serialization.SerializationSchema;
4+
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
45
import org.apache.flink.metrics.MetricGroup;
56
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
67
import org.apache.flink.util.FlinkUserCodeClassLoaders;
@@ -43,6 +44,16 @@ public UserCodeClassLoader getUserCodeClassLoader() {
4344
assertEquals(classLoader, wrapper.getClassLoaderUsed());
4445
}
4546

47+
@Test
48+
public void testDefaultClassLoaderIsUsed() throws Exception {
49+
final KafkaSerializerWrapperCaptureForTest wrapper =
50+
new KafkaSerializerWrapperCaptureForTest();
51+
wrapper.open(new DummyInitializationContext());
52+
53+
assertEquals(
54+
DummyInitializationContext.class.getClassLoader(), wrapper.getClassLoaderUsed());
55+
}
56+
4657
static class KafkaSerializerWrapperCaptureForTest extends KafkaSerializerWrapper<String> {
4758
private ClassLoader classLoaderUsed;
4859

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java

+11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.flink.connector.kafka.source.reader.deserializer;
22

33
import org.apache.flink.api.common.serialization.DeserializationSchema;
4+
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
45
import org.apache.flink.metrics.MetricGroup;
56
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
67
import org.apache.flink.util.FlinkUserCodeClassLoaders;
@@ -44,6 +45,16 @@ public UserCodeClassLoader getUserCodeClassLoader() {
4445
assertEquals(classLoader, wrapper.getClassLoaderUsed());
4546
}
4647

48+
@Test
49+
public void testDefaultClassLoaderIsUsed() throws Exception {
50+
final KafkaValueOnlyDeserializerWrapperCaptureForTest wrapper =
51+
new KafkaValueOnlyDeserializerWrapperCaptureForTest();
52+
wrapper.open(new DummyInitializationContext());
53+
54+
assertEquals(
55+
DummyInitializationContext.class.getClassLoader(), wrapper.getClassLoaderUsed());
56+
}
57+
4758
static class KafkaValueOnlyDeserializerWrapperCaptureForTest
4859
extends KafkaValueOnlyDeserializerWrapper<String> {
4960
private ClassLoader classLoaderUsed;

0 commit comments

Comments
 (0)