Commit bde1b52

Fix: Updated GZipCompressionEngine to use GzipCompressorInputStream (#1570)
Signed-off-by: Asif Sohail Mohammed <[email protected]>
1 parent 2f91aa7 commit bde1b52

4 files changed (+12, -7 lines)

data-prepper-plugins/s3-source/build.gradle (+1)

@@ -18,6 +18,7 @@ dependencies {
     implementation 'software.amazon.awssdk:sts'
     implementation 'software.amazon.awssdk:sqs'
     implementation 'com.amazonaws:aws-java-sdk-s3:1.12.220'
+    implementation 'org.apache.commons:commons-compress:1.21'
     implementation 'org.hibernate.validator:hibernate-validator:7.0.4.Final'
     testImplementation 'org.apache.commons:commons-lang3:3.12.0'
 }

data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/NewlineDelimitedCodec.java (+3, -3)

@@ -36,9 +36,9 @@ public NewlineDelimitedCodec(final NewlineDelimitedConfig config) {
 
     @Override
     public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
-        final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
-
-        parseBufferedReader(reader, eventConsumer);
+        try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
+            parseBufferedReader(reader, eventConsumer);
+        }
     }
 
     private void parseBufferedReader(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {

data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngine.java (+6, -2)

@@ -5,13 +5,17 @@
 
 package com.amazon.dataprepper.plugins.source.compression;
 
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.zip.GZIPInputStream;
 
 public class GZipCompressionEngine implements CompressionEngine {
     @Override
     public InputStream createInputStream(final String s3Key, final InputStream responseInputStream) throws IOException {
-        return new GZIPInputStream(responseInputStream);
+        // We are using GzipCompressorInputStream here to decompress because GZIPInputStream doesn't decompress concatenated .gz files:
+        // it stops after the first member and silently ignores the rest.
+        // It doesn't leave the read position to point to the beginning of the next member.
+        return new GzipCompressorInputStream(responseInputStream, true);
     }
 }
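A minimal standalone sketch (not part of this commit; the class name ConcatenatedGzipDemo is illustrative) showing the behavior the in-code comment describes. With decompressConcatenated=true, GzipCompressorInputStream reads every member of a concatenated .gz stream, whereas java.util.zip.GZIPInputStream may stop after the first member depending on what the underlying stream reports from available(), which is why data can be silently dropped on streaming sources such as an S3 response body.

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class ConcatenatedGzipDemo {
    public static void main(String[] args) throws IOException {
        // Write two independent gzip members back to back, as produced by `cat a.gz b.gz`.
        // ByteArrayOutputStream#close is a no-op, so reusing the buffer is safe here.
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (final GZIPOutputStream first = new GZIPOutputStream(bytes)) {
            first.write("first line\n".getBytes(StandardCharsets.UTF_8));
        }
        try (final GZIPOutputStream second = new GZIPOutputStream(bytes)) {
            second.write("second line\n".getBytes(StandardCharsets.UTF_8));
        }

        // decompressConcatenated=true: all members are decompressed in order.
        try (final GzipCompressorInputStream in = new GzipCompressorInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), true)) {
            // Prints both "first line" and "second line".
            System.out.print(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}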

data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngineTest.java (+2, -2)

@@ -5,6 +5,7 @@
 
 package com.amazon.dataprepper.plugins.source.compression;
 
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.core.ResponseInputStream;
@@ -16,7 +17,6 @@
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
-import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -51,7 +51,7 @@ void createInputStream_with_gzip_should_return_instance_of_GZIPInputStream() thr
 
         final InputStream inputStream = compressionEngine.createInputStream(s3Key, byteInStream);
 
-        assertThat(inputStream, instanceOf(GZIPInputStream.class));
+        assertThat(inputStream, instanceOf(GzipCompressorInputStream.class));
         assertThat(inputStream.readAllBytes(), equalTo(testStringBytes));
     }
 }
