Skip to content

Commit ce57396

Browse files
authored
Filesource compression support (opensearch-project#5255)
Add support for compressed files in FileSource Signed-off-by: Joël Marty <jmarty@twilio.com> Signed-off-by: Joël Marty <134835+joelmarty@users.noreply.github.com>
1 parent a99a3a5 commit ce57396

4 files changed

Lines changed: 63 additions & 18 deletions

File tree

data-prepper-plugins/common/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ A source plugin to read input data from the specified file path. The file source
3535
Temporarily, `type` can either be `event` or `string`. If you would like to use the file source for log analytics use cases like grok,
3636
change this to `event`.
3737
38+
* `compression` (String): The source file compression format, if any. Valid options are `none`, `gzip` and `snappy`. Default is `none`.
39+
3840
## `file` (sink)
3941
4042
A sink plugin to write output data to the specified file path.

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
1313
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
1414
import org.opensearch.dataprepper.model.buffer.Buffer;
15+
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
1516
import org.opensearch.dataprepper.model.codec.InputCodec;
1617
import org.opensearch.dataprepper.model.configuration.PluginModel;
1718
import org.opensearch.dataprepper.model.configuration.PluginSetting;
@@ -23,11 +24,14 @@
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

27+
2628
import java.io.BufferedReader;
27-
import java.io.FileInputStream;
2829
import java.io.IOException;
30+
import java.io.InputStream;
31+
import java.io.InputStreamReader;
2932
import java.nio.charset.StandardCharsets;
3033
import java.nio.file.Files;
34+
import java.nio.file.Path;
3135
import java.nio.file.Paths;
3236
import java.util.HashMap;
3337
import java.util.Map;
@@ -48,6 +52,7 @@ public class FileSource implements Source<Record<Object>> {
4852
private final FileSourceConfig fileSourceConfig;
4953
private final FileStrategy fileStrategy;
5054
private final EventFactory eventFactory;
55+
private final DecompressionEngine decompressionEngine;
5156

5257
private Thread readThread;
5358

@@ -63,6 +68,7 @@ public FileSource(
6368
this.fileSourceConfig = fileSourceConfig;
6469
this.isStopRequested = false;
6570
this.writeTimeout = FileSourceConfig.DEFAULT_TIMEOUT;
71+
this.decompressionEngine = fileSourceConfig.getCompression().getDecompressionEngine();
6672

6773
if(fileSourceConfig.getCodec() != null) {
6874
fileStrategy = new CodecFileStrategy(pluginFactory);
@@ -104,7 +110,8 @@ private interface FileStrategy {
104110
private class ClassicFileStrategy implements FileStrategy {
105111
@Override
106112
public void start(Buffer<Record<Object>> buffer) {
107-
try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) {
113+
Path filePath = Paths.get(fileSourceConfig.getFilePathToRead());
114+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(decompressionEngine.createInputStream(Files.newInputStream(filePath)), StandardCharsets.UTF_8))) {
108115
String line;
109116
while ((line = reader.readLine()) != null && !isStopRequested) {
110117
writeLineAsEventOrString(line, buffer);
@@ -166,13 +173,13 @@ private class CodecFileStrategy implements FileStrategy {
166173
final PluginModel codecConfiguration = fileSourceConfig.getCodec();
167174
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
168175
codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
169-
170176
}
171177

172178
@Override
173179
public void start(final Buffer<Record<Object>> buffer) {
174-
try {
175-
codec.parse(new FileInputStream(fileSourceConfig.getFilePathToRead()), eventRecord -> {
180+
Path filePath = Paths.get(fileSourceConfig.getFilePathToRead());
181+
try(InputStream is = decompressionEngine.createInputStream(Files.newInputStream(filePath))) {
182+
codec.parse(is, eventRecord -> {
176183
try {
177184
buffer.write((Record) eventRecord, writeTimeout);
178185
} catch (TimeoutException e) {
@@ -186,4 +193,4 @@ public void start(final Buffer<Record<Object>> buffer) {
186193
}
187194
}
188195

189-
}
196+
}

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.google.common.base.Preconditions;
1111
import jakarta.validation.constraints.AssertTrue;
1212
import org.opensearch.dataprepper.model.configuration.PluginModel;
13+
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
1314

1415
import java.util.Objects;
1516

@@ -35,6 +36,9 @@ public class FileSourceConfig {
3536
@JsonProperty("codec")
3637
private PluginModel codec;
3738

39+
@JsonProperty("compression")
40+
private CompressionOption compression = CompressionOption.NONE;
41+
3842
public String getFilePathToRead() {
3943
return filePathToRead;
4044
}
@@ -52,6 +56,10 @@ public PluginModel getCodec() {
5256
return codec;
5357
}
5458

59+
public CompressionOption getCompression() {
60+
return compression;
61+
}
62+
5563
void validate() {
5664
Objects.requireNonNull(filePathToRead, "File path is required");
5765
Preconditions.checkArgument(recordType.equals(EVENT_TYPE) || recordType.equals(DEFAULT_TYPE), "Invalid type: must be either [event] or [string]");

data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,18 @@
1818
import org.opensearch.dataprepper.event.TestEventFactory;
1919
import org.opensearch.dataprepper.metrics.PluginMetrics;
2020
import org.opensearch.dataprepper.model.buffer.Buffer;
21+
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
2122
import org.opensearch.dataprepper.model.codec.InputCodec;
2223
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
24+
import org.opensearch.dataprepper.model.configuration.PluginModel;
2325
import org.opensearch.dataprepper.model.configuration.PluginSetting;
2426
import org.opensearch.dataprepper.model.event.Event;
2527
import org.opensearch.dataprepper.model.event.EventBuilder;
2628
import org.opensearch.dataprepper.model.plugin.PluginFactory;
2729
import org.opensearch.dataprepper.model.record.Record;
2830
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
2931
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig;
32+
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
3235

@@ -90,6 +93,31 @@ private FileSource createObjectUnderTest() {
9093
return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory, TestEventFactory.getTestEventFactory());
9194
}
9295

96+
/**
97+
* Variant of creatgeObjectUnderTest that uses mocks for the configuration instead of object mapper, so we can
98+
* pass concrete mocks to the FileSource through the FileSourceConfig.
99+
* @param codec the codec to use in the configuration
100+
* @param engine the {@link DecompressionEngine} to use in the configuration
101+
* @return
102+
*/
103+
private FileSource createObjectUnderTest(PluginModel codec, DecompressionEngine engine) {
104+
FileSourceConfig fileSourceConfig = mock(FileSourceConfig.class);
105+
106+
when(fileSourceConfig.getFilePathToRead()).thenReturn(TEST_FILE_PATH_PLAIN);
107+
108+
if (codec != null) {
109+
when(fileSourceConfig.getCodec()).thenReturn(codec);
110+
}
111+
112+
if (engine != null) {
113+
CompressionOption compressionOption = mock(CompressionOption.class);
114+
when(compressionOption.getDecompressionEngine()).thenReturn(engine);
115+
when(fileSourceConfig.getCompression()).thenReturn(compressionOption);
116+
}
117+
118+
return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory, TestEventFactory.getTestEventFactory());
119+
}
120+
93121
@Nested
94122
class WithRecord {
95123
private static final String TEST_PIPELINE_NAME = "pipeline";
@@ -285,6 +313,9 @@ class WithCodec {
285313
@Mock
286314
private Buffer buffer;
287315

316+
@Mock
317+
private DecompressionEngine decompressionEngine;
318+
288319
@BeforeEach
289320
void setUp() {
290321
Map<String, String> codecConfiguration = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
@@ -297,21 +328,18 @@ void setUp() {
297328

298329
@Test
299330
void start_will_parse_codec_with_correct_inputStream() throws IOException {
300-
createObjectUnderTest().start(buffer);
331+
final FileInputStream decompressedStream = new FileInputStream(TEST_FILE_PATH_PLAIN);
332+
DecompressionEngine mockEngine = mock(DecompressionEngine.class);
333+
when(mockEngine.createInputStream(any(InputStream.class))).thenReturn(decompressedStream);
301334

302-
final ArgumentCaptor<InputStream> inputStreamArgumentCaptor = ArgumentCaptor.forClass(InputStream.class);
335+
PluginModel fakeCodec = mock(PluginModel.class);
336+
when(fakeCodec.getPluginName()).thenReturn("fake_codec");
337+
when(fakeCodec.getPluginSettings()).thenReturn(Map.of());
303338

304-
await().atMost(2, TimeUnit.SECONDS)
305-
.untilAsserted(() -> verify(inputCodec).parse(any(InputStream.class), any(Consumer.class)));
306-
verify(inputCodec).parse(inputStreamArgumentCaptor.capture(), any(Consumer.class));
307-
308-
final InputStream actualInputStream = inputStreamArgumentCaptor.getValue();
339+
createObjectUnderTest(fakeCodec, mockEngine).start(buffer);
309340

310-
final byte[] actualBytes = actualInputStream.readAllBytes();
311-
final FileInputStream fileInputStream = new FileInputStream(TEST_FILE_PATH_PLAIN);
312-
final byte[] expectedBytes = fileInputStream.readAllBytes();
313-
314-
assertThat(actualBytes, equalTo(expectedBytes));
341+
await().atMost(2, TimeUnit.SECONDS)
342+
.untilAsserted(() -> verify(inputCodec).parse(eq(decompressedStream), any(Consumer.class)));
315343
}
316344

317345
@Test

0 commit comments

Comments
 (0)