Add tail mode to file source for continuous log tailing #6853
srikanthpadakanti wants to merge 19 commits into
Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
…FileSourceTests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
…ilFileReaderPoolTest Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
…tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
Hello @dlvenable @kkondaka, please review.
dlvenable left a comment:
@srikanthpadakanti , Thank you for this contribution! I have a few high-level comments.
- checkNotNull(buffer, "Buffer cannot be null for file source to start");
+ Objects.requireNonNull(buffer, "Buffer cannot be null for file source to start");

  if (fileSourceConfig.isTail()) {
dlvenable:
This creates a rather stark difference between tailing and a one-time read. The configurations should be consistent between the two. I think the one case we don't need to care about is the legacy configurations, which do not need to gain any new features:
- record_type
- format
But somebody could want to set tail: false and use multiple paths or end-to-end acknowledgements. The tail just controls repeated reading.
I'd even say that you could use parallel reads (multi threads) without tail to read many different paths.
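To make the one-shot parallel case concrete, here is a minimal sketch of reading many paths once with a small thread pool and no tailing. The class name `OneShotParallelRead` and the `countLines` helper are hypothetical and not part of this PR; `readerThreads` only mirrors the idea behind the `reader_threads` option.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

// Hypothetical illustration only; not the PR's implementation.
final class OneShotParallelRead {

    /** Reads each path exactly once on a fixed pool and returns the total line count. */
    static long countLines(final List<Path> paths, final int readerThreads)
            throws InterruptedException, ExecutionException {
        final ExecutorService pool = Executors.newFixedThreadPool(readerThreads);
        try {
            final List<Future<Long>> futures = new ArrayList<>();
            for (final Path path : paths) {
                // One task per file; each task reads to end-of-file and stops (no tailing).
                futures.add(pool.submit(() -> {
                    try (Stream<String> lines = Files.lines(path, StandardCharsets.UTF_8)) {
                        return lines.count();
                    }
                }));
            }
            long total = 0;
            for (final Future<Long> future : futures) {
                total += future.get();
            }
            return total;
        } finally {
            // One-shot mode: nothing keeps watching, so the pool is shut down when done.
            pool.shutdown();
        }
    }
}
```

With `readerThreads` set to 1 this degrades to a sequential read, which is why a single code path can plausibly serve both small and large one-shot jobs.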
srikanthpadakanti:
Done. Removed the validation gate that restricted paths/globs to tail-only. Now ClassicFileStrategy and CodecFileStrategy both resolve glob patterns via GlobPathResolver and iterate all matched files.
tail only controls whether the source continuously monitors for new content after initial read.
dlvenable:
I think we should have them unified more structurally. Right now they are still split at the code level.
At a high level we have two paths: the legacy path and the modern path. The legacy path is the format/record_type configuration and doesn't have a codec. We don't need to add any new features to the legacy path. The modern path would include tail or no tail, acknowledgements, globs, etc.
FileSource.start(buffer)
│
├─ Legacy path (format/record_type, no codec, no globs, no tail)
│ └─ ClassicFileStrategy [unchanged, no new features]
│
└─ Modern path (everything else)
└─ FileReaderStrategy
├─ GlobPathResolver (resolve paths/globs + excludes)
├─ FileReaderPool (thread pool, schedules FileReaders)
├─ FileReader (per-file: codec, acks, metadata, offset tracking)
├─ CheckpointRegistry (optional, for resume)
└─ DirectoryWatcher (only when tail: true)
We'd have some decision like this:
if (isLegacyConfig()) {
    fileStrategy = new ClassicFileStrategy();
} else {
    fileStrategy = new FileReaderStrategy(...);
}
The FileReaderStrategy would use a new FileReader which is essentially what you have with TailFileReader. When tail is enabled, then it would detect rotations and read again. But overall it would be the same code for tail versus non-tail.
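As a rough, self-contained sketch of that decision: every type below is a hypothetical stand-in (the real Data Prepper classes have different constructors and fields), and the `isLegacyConfig` rule is an assumption drawn from the description above, namely format/record_type set and no codec.

```java
// Hypothetical stand-ins for illustration; not the actual Data Prepper classes.
interface FileStrategy {
    String describe();
}

final class ClassicFileStrategy implements FileStrategy {
    @Override
    public String describe() {
        return "legacy: unchanged, no new features";
    }
}

final class FileReaderStrategy implements FileStrategy {
    private final boolean tail;

    FileReaderStrategy(final boolean tail) {
        this.tail = tail;
    }

    @Override
    public String describe() {
        // Same reader code for both modes; only the watcher is conditional on tail.
        return tail ? "modern: readers + DirectoryWatcher" : "modern: readers only";
    }
}

final class SketchConfig {
    final String format;      // legacy option, may be null
    final String recordType;  // legacy option, may be null
    final String codec;       // modern option, may be null
    final boolean tail;

    SketchConfig(final String format, final String recordType, final String codec, final boolean tail) {
        this.format = format;
        this.recordType = recordType;
        this.codec = codec;
        this.tail = tail;
    }

    /** Assumed rule: legacy means format/record_type is set and no codec is configured. */
    boolean isLegacyConfig() {
        return codec == null && (format != null || recordType != null);
    }
}

final class StrategySelector {
    static FileStrategy select(final SketchConfig config) {
        if (config.isLegacyConfig()) {
            return new ClassicFileStrategy();
        }
        return new FileReaderStrategy(config.tail);
    }
}
```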
Some options become meaningful for both modes:
- reader_threads — parallel reads in one-shot mode too
- max_active_files — bounds concurrency for both
- include_file_metadata — available in one-shot
- acknowledgments — available in one-shot
- batch_size / batch_timeout — applies to both
Some remain tail-only:
- close_inactive, close_removed — no meaning for one-shot
- rotate_wait, rotation_drain_timeout — only relevant when watching
- poll_interval — only when polling a directory
- start_position: end (some other positions could make sense for one-shot, but certainly not end)
Also, our validation should reject tail-only options when tail: false.
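A minimal sketch of what such a validation could look like, assuming the configured options are available as a simple key/value map. The class `TailOptionValidator`, the map-based input, and the error wording are all hypothetical; only the option names come from the list above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical validator; the PR may structure its validation differently.
final class TailOptionValidator {

    // Options listed above as meaningful only when tail: true.
    static final Set<String> TAIL_ONLY = Set.of(
            "close_inactive", "close_removed",
            "rotate_wait", "rotation_drain_timeout",
            "poll_interval");

    /** Returns one message per violation; an empty list means the config is acceptable. */
    static List<String> validate(final boolean tail, final Map<String, ?> configured) {
        final List<String> errors = new ArrayList<>();
        if (tail) {
            return errors; // every option is allowed when tailing
        }
        // Sorted iteration keeps the error order deterministic.
        for (final String key : new TreeSet<>(configured.keySet())) {
            if (TAIL_ONLY.contains(key)) {
                errors.add(key + " requires tail: true");
            }
        }
        if ("end".equals(configured.get("start_position"))) {
            errors.add("start_position: end requires tail: true");
        }
        return errors;
    }
}
```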
srikanthpadakanti:
Thanks for the detailed architecture diagram. I agree the modern path should be unified; having TailFileReader/TailFileReaderPool handle both modes, with DirectoryWatcher as the only tail-specific addition, makes sense structurally.
One concern: for one-shot mode with 3 files and tail: false, spinning up a FileReaderPool with thread scheduling, checkpoint registry, and the full lifecycle feels heavyweight compared to what ClassicFileStrategy does today (sequential read, done). Users who just want to read a single file with a codec and exit would now go through the pool machinery. Is the overhead acceptable, or should there be a lightweight path for small file counts?
Also on validation: rejecting tail-only configs when tail: false is clean, but start_position is a gray area. beginning makes sense for one-shot (read the full file), but end with tail: false would mean "read nothing and exit", which is valid but confusing. Should we default start_position to beginning when tail: false and only allow end with tail: true?
Happy to start the refactor. Want me to rename TailFileReader to FileReader, TailFileReaderPool to FileReaderPool, create FileReaderStrategy, and wire DirectoryWatcher conditionally? Or would you prefer I put up a design first?
dlvenable:
> for one shot mode with 3 files and tail: false, spinning up a FileReaderPool with thread scheduling, checkpoint registry, and the full lifecycle feels heavyweight compared to what ClassicFileStrategy does today (sequential read, done).
I agree with that point. But I think the options should be consistent. One may have a one-shot with 1000 files to read and want to spin up multiple threads. Perhaps the solution is to have different defaults: when tail: false, set reader_threads: 1 by default; when tail: true, set reader_threads: 4 (which is what this PR has). That seems a simple way to keep the approach consistent.
> Should we default start_position to beginning when tail: false and only allow end with tail: true?
Yes. This is what I was suggesting above, but maybe didn't express it clearly.
> Happy to start the refactor. Want me to rename TailFileReader to FileReader, TailFileReaderPool to FileReaderPool, create FileReaderStrategy, and wire DirectoryWatcher conditionally? Or would you prefer I put up a design first?
I'm happy for you to make the change directly. If you want to put up a design first, I can also review that before you code. But I think we are generally aligning, so I'll leave that up to you.
srikanthpadakanti:
Got it. Will default reader_threads to 1 for tail:false and 4 for tail:true, reject start_position:end when tail:false, and unify into a single FileReaderStrategy with DirectoryWatcher conditional on tail. Going straight to code.
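The agreed defaults could be sketched roughly as follows. `ModeDefaults` and its method names are hypothetical; the values 1 and 4 and the rejection of start_position: end for tail: false come from the discussion above. The default start position for tail: true is not stated in this thread, so the sketch covers only the one-shot case.

```java
// Hypothetical helper; names and shape are assumptions, not the PR's code.
final class ModeDefaults {
    static final int ONE_SHOT_READER_THREADS = 1;
    static final int TAIL_READER_THREADS = 4;

    /** An explicit reader_threads value always wins; otherwise the default depends on tail. */
    static int readerThreads(final boolean tail, final Integer configured) {
        if (configured != null) {
            return configured;
        }
        return tail ? TAIL_READER_THREADS : ONE_SHOT_READER_THREADS;
    }

    /** Resolves start_position for tail: false only: defaults to beginning and rejects end. */
    static String oneShotStartPosition(final String configured) {
        if (configured == null) {
            return "beginning";
        }
        if ("end".equals(configured)) {
            throw new IllegalArgumentException("start_position: end requires tail: true");
        }
        return configured;
    }
}
```

A user with 1000 files in one-shot mode can still set reader_threads explicitly, so the options stay identical across modes and only the defaults differ.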
}

public Set<Path> resolve() {
    final Set<Path> result = new HashSet<>();
dlvenable:
Is there a library that we can use which provides this?
srikanthpadakanti:
I looked into available libraries. Apache Commons IO has WildcardFileFilter but uses non-standard wildcard syntax (not POSIX glob). There's no existing utility in Data Prepper for filesystem glob resolution. The implementation uses JDK's PathMatcher + Files.walkFileTree which is the standard approach for this. I didn't find a library that would be a net improvement. Let me know if you have one in mind.
dlvenable:
Thanks for looking into it. I think these came originally from Ant and I thought they might have a library.
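For readers unfamiliar with the JDK approach mentioned in this thread, here is a minimal sketch of glob resolution with PathMatcher and Files.walkFileTree. It is not the PR's GlobPathResolver: the class name `GlobResolveSketch`, the single-root signature, and matching against the path relative to the root are assumptions made for illustration, and excludes are left out.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; the PR's GlobPathResolver may differ in shape and behavior.
final class GlobResolveSketch {

    /** Walks root and returns the regular files whose root-relative path matches the glob. */
    static Set<Path> resolve(final Path root, final String glob) throws IOException {
        final PathMatcher matcher = root.getFileSystem().getPathMatcher("glob:" + glob);
        final Set<Path> result = new TreeSet<>();
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) {
                // Match on the relative path so a pattern like "sub/*.log" is anchored at root.
                if (attrs.isRegularFile() && matcher.matches(root.relativize(file))) {
                    result.add(file);
                }
                return FileVisitResult.CONTINUE;
            }
        });
        return result;
    }
}
```

In JDK glob syntax a single `*` does not cross directory boundaries, so `*.log` matches only files directly under the root, while `sub/*.log` matches files one level down.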
…consolidate codec creation, add file prefix to thread names Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
…-tail-source-6782
8346cb8 to 624cf4a
…ne-shot modes Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
Hello @dlvenable, please review this.
Description
Add continuous file tailing support to the existing file source plugin. When tail: true is configured, the source continuously monitors log files using glob patterns, tracks byte offsets via checkpoint persistence, detects log rotation (both create/rename and copytruncate), handles back pressure with bounded retry, and supports end-to-end acknowledgements. When tail: false (default), existing one-shot behavior is preserved with zero changes.
Moves the file source from data-prepper-plugins/common to its own module data-prepper-plugins/file-source. Plugin name remains file. No breaking change for existing pipelines.
Key capabilities:
- … max_active_files and pending queue
- start_position: end or beginning for new files
- … .corrupt)
- max_read_time_per_file to prevent thread starvation
- close_inactive and close_removed for file handle lifecycle
- … codec: json: {}, codec: newline: {})
- … file.path, file.name, offset)
Multiline support will be a follow-up enhancement to the newline-codecs plugin. The file source architecture supports it: once the newline codec adds multiline config, it works with this source out of the box.
Issues Resolved
Resolves #6782
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.