Skip to content

Commit 624cf4a

Browse files
Make config options orthogonal so paths and glob work with tail false
Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent 849dbb8 commit 624cf4a

4 files changed

Lines changed: 73 additions & 31 deletions

File tree

data-prepper-plugins/file-source/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.nio.file.Paths;
4242
import java.util.HashMap;
4343
import java.util.Map;
44+
import java.util.Set;
4445
import java.util.concurrent.TimeoutException;
4546

4647
import static java.lang.String.format;
@@ -108,7 +109,7 @@ public void start(final Buffer<Record<Object>> buffer) {
108109
return;
109110
}
110111

111-
LOG.info("Starting file source with {} path.", fileSourceConfig.getFilePathToRead());
112+
LOG.info("Starting file source with paths: {}", fileSourceConfig.getAllPaths());
112113

113114
readThread = new Thread(() -> {
114115
fileStrategy.start(buffer);
@@ -242,16 +243,29 @@ private interface FileStrategy {
242243
private class ClassicFileStrategy implements FileStrategy {
243244
@Override
244245
public void start(Buffer<Record<Object>> buffer) {
245-
Path filePath = Paths.get(fileSourceConfig.getFilePathToRead());
246+
final GlobPathResolver resolver = new GlobPathResolver(
247+
fileSourceConfig.getAllPaths(), fileSourceConfig.getExcludePaths());
248+
final Set<Path> resolvedPaths = resolver.resolve();
249+
if (resolvedPaths.isEmpty() && fileSourceConfig.getFilePathToRead() != null) {
250+
resolvedPaths.add(Paths.get(fileSourceConfig.getFilePathToRead()).toAbsolutePath().normalize());
251+
}
252+
for (final Path filePath : resolvedPaths) {
253+
if (isStopRequested) {
254+
break;
255+
}
256+
readFile(filePath, buffer);
257+
}
258+
}
259+
260+
private void readFile(final Path filePath, final Buffer<Record<Object>> buffer) {
246261
try (BufferedReader reader = new BufferedReader(new InputStreamReader(decompressionEngine.createInputStream(Files.newInputStream(filePath)), Charset.forName(fileSourceConfig.getEncoding())))) {
247262
String line;
248263
while ((line = reader.readLine()) != null && !isStopRequested) {
249264
writeLineAsEventOrString(line, buffer);
250265
}
251266
} catch (IOException | TimeoutException | IllegalArgumentException ex) {
252-
LOG.error("Error processing the input file path [{}]", fileSourceConfig.getFilePathToRead(), ex);
253-
throw new RuntimeException(format("Error processing the input file %s",
254-
fileSourceConfig.getFilePathToRead()), ex);
267+
LOG.error("Error processing the input file path [{}]", filePath, ex);
268+
throw new RuntimeException(format("Error processing the input file %s", filePath), ex);
255269
}
256270
}
257271

@@ -305,19 +319,29 @@ private class CodecFileStrategy implements FileStrategy {
305319

306320
@Override
307321
public void start(final Buffer<Record<Object>> buffer) {
308-
Path filePath = Paths.get(fileSourceConfig.getFilePathToRead());
309-
try(InputStream is = decompressionEngine.createInputStream(Files.newInputStream(filePath))) {
310-
codec.parse(is, eventRecord -> {
311-
try {
312-
buffer.write((Record) eventRecord, writeTimeout);
313-
} catch (TimeoutException e) {
314-
throw new RuntimeException(e);
315-
}
316-
});
317-
} catch (final IOException e) {
318-
throw new RuntimeException(e);
322+
final GlobPathResolver resolver = new GlobPathResolver(
323+
fileSourceConfig.getAllPaths(), fileSourceConfig.getExcludePaths());
324+
final Set<Path> resolvedPaths = resolver.resolve();
325+
if (resolvedPaths.isEmpty() && fileSourceConfig.getFilePathToRead() != null) {
326+
resolvedPaths.add(Paths.get(fileSourceConfig.getFilePathToRead()).toAbsolutePath().normalize());
327+
}
328+
for (final Path filePath : resolvedPaths) {
329+
if (isStopRequested) {
330+
break;
331+
}
332+
try (InputStream is = decompressionEngine.createInputStream(Files.newInputStream(filePath))) {
333+
codec.parse(is, eventRecord -> {
334+
try {
335+
buffer.write((Record) eventRecord, writeTimeout);
336+
} catch (TimeoutException e) {
337+
throw new RuntimeException(e);
338+
}
339+
});
340+
} catch (final IOException e) {
341+
LOG.error("Error processing file with codec [{}]", filePath, e);
342+
throw new RuntimeException(e);
343+
}
319344
}
320-
321345
}
322346
}
323347

data-prepper-plugins/file-source/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -248,14 +248,9 @@ public List<String> getExcludePaths() {
248248
}
249249

250250
void validate() {
251-
if (tail) {
252-
Preconditions.checkArgument(
253-
(filePathToRead != null && !filePathToRead.isEmpty()) || !paths.isEmpty(),
254-
"At least one of path or paths is required when tail is enabled");
255-
} else {
256-
Preconditions.checkArgument(filePathToRead != null,
257-
"path is required when tail is disabled. Use paths with tail: true for glob patterns.");
258-
}
251+
Preconditions.checkArgument(
252+
(filePathToRead != null && !filePathToRead.isEmpty()) || !paths.isEmpty(),
253+
"At least one of path or paths is required");
259254
}
260255

261256
@AssertTrue(message = "The file source requires recordType to be event when using a codec.")

data-prepper-plugins/file-source/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfigTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,21 @@ void validate_succeeds_with_path_when_tail_false() {
121121
}
122122

123123
@Test
124-
void validate_fails_without_path_when_tail_false() {
124+
void validate_fails_without_any_path() {
125125
final Map<String, Object> configMap = Map.of("format", "plain", "record_type", "string");
126126
final FileSourceConfig config = OBJECT_MAPPER.convertValue(configMap, FileSourceConfig.class);
127127

128128
assertThrows(IllegalArgumentException.class, config::validate);
129129
}
130130

131+
@Test
132+
void validate_succeeds_with_paths_when_tail_false() {
133+
final Map<String, Object> configMap = Map.of("paths", List.of("/var/log/*.log"));
134+
final FileSourceConfig config = OBJECT_MAPPER.convertValue(configMap, FileSourceConfig.class);
135+
136+
config.validate();
137+
}
138+
131139
@Test
132140
void validate_succeeds_with_paths_when_tail_true() {
133141
final Map<String, Object> configMap = Map.of("tail", true, "paths", List.of("/var/log/*.log"));

data-prepper-plugins/file-source/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,9 @@ private BlockingBuffer<Record<Object>> getBuffer() throws JsonProcessingExceptio
182182
}
183183

184184
@Test
185-
public void testFileSourceWithEmptyFilePathDoesNotWriteToBuffer() throws TimeoutException {
186-
buffer = mock(Buffer.class);
185+
public void testFileSourceWithEmptyFilePathThrowsValidationError() {
187186
pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, "");
188-
fileSource = createObjectUnderTest();
189-
fileSource.start(buffer);
190-
verify(buffer, after(500).never()).write(any(Record.class), anyInt());
187+
assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest());
191188
}
192189

193190
@Test
@@ -205,6 +202,15 @@ public void testFileSourceWithNullFilePathThrowsNullPointerException() {
205202
assertThrows(IllegalArgumentException.class, FileSourceTests.this::createObjectUnderTest);
206203
}
207204

205+
@Test
206+
public void testStopBeforeStartPreventsProcessing() throws TimeoutException {
207+
buffer = mock(Buffer.class);
208+
fileSource = createObjectUnderTest();
209+
fileSource.stop();
210+
fileSource.start(buffer);
211+
verify(buffer, after(500).never()).write(any(Record.class), anyInt());
212+
}
213+
208214
@Test
209215
public void testFileWithPlainTextAddsEventsToBufferCorrectly() throws JsonProcessingException {
210216
fileSource = createObjectUnderTest();
@@ -386,6 +392,15 @@ void start_codec_consumer_wraps_timeout_exception() throws IOException, TimeoutE
386392

387393
assertThrows(RuntimeException.class, () -> actualConsumer.accept(record));
388394
}
395+
396+
@Test
397+
void stop_before_start_prevents_codec_processing() throws IOException {
398+
final FileSource objectUnderTest = createObjectUnderTest();
399+
objectUnderTest.stop();
400+
objectUnderTest.start(buffer);
401+
402+
verify(inputCodec, after(500).never()).parse(any(InputStream.class), any(Consumer.class));
403+
}
389404
}
390405

391406
@Nested

0 commit comments

Comments
 (0)