File Tail Source for Data Prepper (v2)
Motivation
Data Prepper has a file source that reads a single file once and exits. It is a development utility. There is no way to continuously tail log files with offset tracking, rotation detection, or glob pattern matching.
Every major log pipeline (Filebeat, Fluent Bit, Vector, Logstash) ships file tailing as its most fundamental input. Without it, organizations must deploy Fluent Bit or Filebeat in front of Data Prepper just to collect logs from files. That adds operational complexity and another failure point in the pipeline.
Current Behavior
The existing file source in data-prepper-plugins/common:
- Reads a single file path once from start to end, then the source thread exits
- Supports plain and json formats with optional codec and compression
- Writes each line to the buffer with a fixed 5-second timeout. If the buffer is full, the write fails and the line is lost
- No glob patterns. No watching for new files
- No offset tracking. On restart, the entire file is re-read or skipped entirely
- No rotation handling. If the file is renamed mid-read, the source does not follow it
- No multiline assembly. Each line is one event regardless of content
- No acknowledgement support. areAcknowledgementsEnabled() returns false
- No file metadata on events (no path, no offset, no line number)
Proposed Solution
Extend the existing file source with a tail boolean config option (default false). When tail: true, continuous tailing with offset tracking, rotation detection, glob patterns, and back pressure handling is enabled. When tail: false, the current one-shot behavior is preserved.
The source moves from data-prepper-plugins/common to its own Gradle module data-prepper-plugins/file-source. The plugin name remains file. Existing pipeline configs continue to work without changes.
Key Design Decisions
Plugin name stays file: per dlvenable's feedback, the existing file source is enhanced rather than replaced. A tail boolean controls the mode.
Codec handles parsing; multiline is part of the newline codec: per dlvenable's feedback, the standard codec config handles input parsing. Multiline options live under the newline/plain codec, not as top-level source config. JSON and NDJSON codecs do not need multiline since they are record-delimited.
path and paths coexist: the existing path (singular) field continues to work for backward compatibility. A new paths (plural) field accepts a list of glob patterns. If both are provided, they are merged.
Config naming follows project convention: all acknowledgement-related config uses acknowledgment_* not ack_* (e.g., acknowledgment_timeout, max_acknowledgment_retries).
Architecture (when tail: true)
```
FileSource (Source<Record<Event>>)
|
+-- start(Buffer)
|   |
|   +-- DirectoryWatcher (thread)
|   |     Monitors paths via WatchService or polling.
|   |     Discovers new files matching glob.
|   |     Detects rotation events.
|   |
|   +-- FileReaderPool (thread pool)
|   |     One FileReader per active file.
|   |     Reads via FileChannel with byte offset tracking.
|   |     Writes Record<Event> to Buffer via codec.
|   |     Pauses on back pressure, resumes when buffer has space.
|   |     max_read_time_per_file prevents starvation.
|   |
|   +-- CheckpointRegistry
|   |     Persists (file identity, offset) to disk as JSON.
|   |     Atomic writes via temp file + rename.
|   |     Periodic flush + flush on acknowledgement callback.
|   |
|   +-- RotationDetector
|         Detects inode change at path (create/rename rotation).
|         Detects offset > file size (copytruncate rotation).
|         Drains old file before switching to new.
|
+-- stop()
      Signals all threads to stop.
      Persists final checkpoint.
      Closes all file handles.
```
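The order of the stop() steps in the diagram matters. The final checkpoint is only meaningful once readers can no longer advance offsets. Below is a minimal Java sketch of that sequence. It assumes a plain fixed-size ExecutorService stands in for FileReaderPool. TailLifecycle and its two Runnable hooks are hypothetical stand-ins, not Data Prepper APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the tail-mode start/stop sequence; not a Data Prepper API.
final class TailLifecycle {
    private final ExecutorService readerPool;
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final Runnable persistCheckpoint; // stand-in for a CheckpointRegistry flush
    private final Runnable closeHandles;      // stand-in for closing every open FileChannel

    TailLifecycle(int readerThreads, Runnable persistCheckpoint, Runnable closeHandles) {
        this.readerPool = Executors.newFixedThreadPool(readerThreads);
        this.persistCheckpoint = persistCheckpoint;
        this.closeHandles = closeHandles;
    }

    /** Reader and watcher loops check this flag between reads. */
    boolean isStopRequested() {
        return stopRequested.get();
    }

    void submitReader(Runnable reader) {
        readerPool.submit(reader);
    }

    void stop(long timeoutMillis) throws InterruptedException {
        stopRequested.set(true);                       // 1. signal every loop to exit
        readerPool.shutdown();
        if (!readerPool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
            readerPool.shutdownNow();                  // interrupt readers parked in back-pressure sleeps
            readerPool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        persistCheckpoint.run();                       // 2. offsets can no longer move, so persist them
        closeHandles.run();                            // 3. release file descriptors last
    }
}
```

A reader loop would check isStopRequested() between reads. shutdownNow() is only the fallback for a reader stuck in a back-pressure retry, which then sees an InterruptedException.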
File Discovery
The source accepts glob patterns (/var/log/app/*.log, /var/log/**/*.log) via the paths config and resolves them using java.nio.file.PathMatcher. An exclude_paths option skips files matching exclusion patterns. The existing path (singular) field is also supported and merged into paths for backward compatibility.
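A PathMatcher only answers whether a given path matches a pattern, so resolving a glob still requires walking a directory. Here is a minimal sketch that assumes absolute Unix-style patterns. It walks from the longest literal directory prefix of each pattern (for example /var/log/app for /var/log/app/*.log), then applies the include and exclude matchers. GlobDiscovery and fixedPrefix are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.stream.Stream;

// Hypothetical glob discovery: walk from each pattern's literal prefix, then filter with PathMatchers.
final class GlobDiscovery {
    static Set<Path> discover(List<String> includes, List<String> excludes) throws IOException {
        FileSystem fs = FileSystems.getDefault();
        List<PathMatcher> excludeMatchers = new ArrayList<>();
        for (String pattern : excludes) {
            excludeMatchers.add(fs.getPathMatcher("glob:" + pattern));
        }
        Set<Path> found = new TreeSet<>();
        for (String pattern : includes) {
            PathMatcher include = fs.getPathMatcher("glob:" + pattern);
            Path base = fixedPrefix(pattern);
            if (!Files.isDirectory(base)) {
                continue;                                // directory may appear later; the next scan retries
            }
            try (Stream<Path> walk = Files.walk(base)) { // does not descend into symlinked directories
                walk.filter(Files::isRegularFile)        // follows a symlink that points at a file
                    .filter(include::matches)
                    .filter(p -> excludeMatchers.stream().noneMatch(m -> m.matches(p)))
                    .forEach(found::add);
            }
        }
        return found;
    }

    /** Longest leading directory without glob metacharacters: /var/log/app for /var/log/app/*.log. */
    static Path fixedPrefix(String pattern) {
        int meta = pattern.length();
        for (int i = 0; i < pattern.length(); i++) {
            if ("*?[{".indexOf(pattern.charAt(i)) >= 0) {
                meta = i;
                break;
            }
        }
        int slash = pattern.lastIndexOf('/', meta - 1);
        if (slash < 0) {
            return Paths.get(".");
        }
        return Paths.get(slash == 0 ? "/" : pattern.substring(0, slash));
    }
}
```

Files.isRegularFile follows a symlink to a regular file by default. That matters for paths like /var/log/containers on Kubernetes nodes, where the entries are symlinks.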
On local filesystems, the source uses java.nio.file.WatchService (backed by inotify on Linux) for low-latency change detection. On network filesystems (NFS, CIFS), inotify does not see changes made from other hosts. The source detects these filesystems by checking FileStore.type() and falls back to periodic polling automatically. On macOS, the stock JDK WatchService has historically been a polling implementation rather than a native kqueue backend, and it is slow to deliver events. For that reason the source runs a supplementary polling scan there regardless.
When WatchService delivers an OVERFLOW event (OS queue overflowed), the source does a full directory rescan to reconcile state.
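The sketch below covers two of the decisions above. It chooses polling when a directory sits on a network filesystem, and on OVERFLOW it runs a full rescan instead of trusting the partial event list. It assumes the FileStore.type() strings Linux reports for network mounts, and that exact set is an assumption. ChangeDetection and its methods are hypothetical.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.Locale;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helpers for choosing WatchService vs polling and for handling OVERFLOW.
final class ChangeDetection {
    // Assumed FileStore.type() values for network mounts; the exact strings vary by OS.
    private static final Set<String> NETWORK_TYPES = Set.of("nfs", "nfs4", "cifs", "smb", "smb2", "smbfs");

    static boolean isNetworkFileSystem(String fileStoreType) {
        return NETWORK_TYPES.contains(fileStoreType.toLowerCase(Locale.ROOT));
    }

    /** Poll instead of watching when the directory lives on a network filesystem. */
    static boolean usePolling(Path dir) throws IOException {
        return isNetworkFileSystem(Files.getFileStore(dir).type());
    }

    /** macOS gets a supplementary polling scan in addition to WatchService. */
    static boolean needsSupplementaryPolling(String osName) {
        return osName.toLowerCase(Locale.ROOT).startsWith("mac");
    }

    /** Handles one signalled key; OVERFLOW means events were lost, so reconcile with a full rescan. */
    static boolean processKey(WatchKey key, Path dir, Runnable fullRescan, Consumer<Path> onChange) {
        for (WatchEvent<?> event : key.pollEvents()) {
            if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
                fullRescan.run();
                continue;
            }
            onChange.accept(dir.resolve((Path) event.context()));
        }
        return key.reset(); // false: the directory is no longer accessible, so stop watching it
    }
}
```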
File Identity
Reliable identity is critical for tracking offsets across renames and rotations.
- Primary: inode + device, extracted from BasicFileAttributes.fileKey() on Unix systems.
- Fallback: CRC fingerprint. If fileKey() returns null (Windows, some NFS mounts), the source reads the first N bytes and computes a CRC32 checksum combined with file size and creation time.
- Inode reuse guard. On ext4, deleted inodes are recycled quickly. The source compares creation time against the registry entry. If creation times differ, the file is treated as new regardless of matching inode.
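Here is a minimal identity sketch. It uses fileKey() when available (on Unix it wraps device and inode). Otherwise it falls back to a CRC32 over the first fingerprint_bytes bytes, and it adds the creation-time guard against inode reuse. The class and method names are hypothetical. This sketch hashes only the byte prefix; the proposal's additional file size and creation time inputs are left out.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.zip.CRC32;

// Hypothetical file identity helpers.
final class FileIdentity {
    /** fileKey() (device + inode on Unix) when available, otherwise a CRC32 prefix fingerprint. */
    static String identify(Path file, int fingerprintBytes) throws IOException {
        Object key = Files.readAttributes(file, BasicFileAttributes.class).fileKey();
        if (key != null) {
            return "key:" + key;
        }
        return "crc:" + crc32Prefix(file, fingerprintBytes);
    }

    /** CRC32 over the first n bytes (fewer if the file is shorter). */
    static long crc32Prefix(Path file, int n) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(n);
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            while (buf.hasRemaining() && ch.read(buf) > 0) {
                // keep reading until n bytes are buffered or EOF is reached
            }
        }
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 0, buf.position());
        return crc.getValue();
    }

    /** Inode reuse guard: same identity but a different creation time means a new file. */
    static boolean isSameTrackedFile(String storedId, FileTime storedCreated,
                                     String currentId, FileTime currentCreated) {
        return storedId.equals(currentId) && storedCreated.equals(currentCreated);
    }
}
```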
File Reading
Each active file gets a FileReader running in a thread pool. The reader uses FileChannel (not BufferedReader) because FileChannel.position() provides exact byte offsets for checkpointing.
Reading loop:
- Read into a ByteBuffer (default 64 KB)
- Pass bytes to the configured codec for parsing
- Write events to the Buffer
- On TimeoutException (buffer full), pause and retry at a configurable interval. Do not advance the offset until the write succeeds.
- After max_read_time_per_file (default 5 seconds), yield the thread to prevent starvation when active files outnumber reader threads.
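The reading loop above can be sketched as follows. LineSink is a hypothetical stand-in for codec parsing plus Buffer.write, assumed to throw TimeoutException when the buffer is full. The sketch splits on newline bytes, which is safe for UTF-8 because 0x0A never appears inside a multi-byte sequence. It keeps a trailing partial line for the next pass and advances the offset only after a successful write.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Hypothetical reader loop; not the plugin's actual FileReader.
final class FileReaderLoop {
    /** Stand-in for codec + Buffer.write; throws TimeoutException when the buffer is full. */
    interface LineSink {
        void write(List<String> lines) throws TimeoutException;
    }

    /** Reads complete lines from offset until EOF or until the time budget is spent; returns the new offset. */
    static long readSlice(FileChannel channel, long offset, int bufferSize, LineSink sink,
                          long retryMillis, long budgetNanos) throws IOException, InterruptedException {
        long deadline = System.nanoTime() + budgetNanos;
        ByteBuffer buf = ByteBuffer.allocate(bufferSize);
        while (System.nanoTime() - deadline < 0) {
            buf.clear();
            channel.position(offset);
            int n = channel.read(buf);
            if (n <= 0) {
                break;                                 // caught up with the writer
            }
            byte[] chunk = Arrays.copyOf(buf.array(), n);
            int lastNewline = -1;
            for (int i = n - 1; i >= 0; i--) {
                if (chunk[i] == '\n') { lastNewline = i; break; }
            }
            if (lastNewline < 0) {
                break;                                 // partial line only; assumes lines fit in the buffer
            }
            List<String> lines = Arrays.asList(
                    new String(chunk, 0, lastNewline, StandardCharsets.UTF_8).split("\n", -1));
            while (true) {                             // back pressure: retry without moving the offset
                try {
                    sink.write(lines);
                    break;
                } catch (TimeoutException bufferFull) {
                    Thread.sleep(retryMillis);
                }
            }
            offset += lastNewline + 1;                 // advance only after the write succeeded
        }
        return offset;                                 // caller yields and reschedules this file
    }
}
```

The returned offset is what the reader would report as its read offset. When the time budget expires, the caller resubmits the file to the pool, and that resubmission is the "yield".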
Codec Integration
The source uses the standard Data Prepper codec config for input parsing. The default codec is newline (one line per event). Multiline options are configured under the newline codec:
```yaml
source:
  file:
    tail: true
    paths:
      - "/var/log/app/*.log"
    codec:
      newline:
        multiline:
          pattern: "^\\d{4}-\\d{2}-\\d{2}"
          negate: true
          match: "after"
          max_lines: 500
          max_bytes: 1048576
          flush_timeout: "5s"
```
For JSON input, the json codec handles record boundaries without multiline:
```yaml
source:
  file:
    tail: true
    paths:
      - "/var/log/app/*.json"
    codec:
      json: {}
```
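Below is a sketch of multiline assembly for match: "after". It assumes Filebeat-style semantics for pattern and negate: a line is a continuation when the pattern's find() result differs from negate. With negate: true and a date pattern, lines that do not start with a date are therefore appended to the preceding event. MultilineAssembler is a hypothetical name. flush() is what a flush_timeout timer would call. Dropping lines past max_lines is an assumption, and max_bytes would be enforced the same way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical multiline assembler for match: "after" with Filebeat-style pattern/negate semantics.
final class MultilineAssembler {
    private final Pattern pattern;
    private final boolean negate;
    private final int maxLines;
    private final List<String> current = new ArrayList<>();

    MultilineAssembler(String regex, boolean negate, int maxLines) {
        this.pattern = Pattern.compile(regex);
        this.negate = negate;
        this.maxLines = maxLines;
    }

    /** Feeds one line; returns a completed event, or null while the current event is still open. */
    String accept(String line) {
        boolean continuation = pattern.matcher(line).find() != negate;
        if (continuation && !current.isEmpty()) {
            if (current.size() < maxLines) {
                current.add(line);       // append to the open event
            }                            // beyond max_lines the line is dropped (assumption)
            return null;
        }
        String completed = flush();      // a non-continuation line closes the previous event
        current.add(line);
        return completed;
    }

    /** Emits the open event, if any; called at EOF or when flush_timeout expires. */
    String flush() {
        if (current.isEmpty()) {
            return null;
        }
        String event = String.join("\n", current);
        current.clear();
        return event;
    }
}
```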
Rotation Handling
Create/rename rotation (logrotate default): The rotation tool renames app.log to app.log.1 and creates a new app.log. The source detects the inode change at the original path. The old reader drains the renamed file to EOF. A new reader starts on the new file at offset 0. A rotate_wait timer handles the gap between rename and create.
Copytruncate rotation: The rotation tool copies app.log to a backup, then truncates the original in place. The source detects that the file size dropped below the current offset, resets to 0, and re-reads from the start.
Copytruncate data loss: Between the copy and the truncate, lines written by the application exist in neither the backup nor the truncated file. This is inherent to copytruncate and cannot be fixed at the reader level. The source documents this and recommends create/rename rotation where possible.
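The two detection rules reduce to a small classification, sketched here with hypothetical names. It assumes file identity comes from fileKey(), which is non-null on Unix; on other platforms the CRC fingerprint would be passed instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Objects;

// Hypothetical rotation classification for one tracked path.
final class RotationCheck {
    enum State { UNCHANGED, CREATE_RENAME, COPYTRUNCATE, AWAITING_NEW_FILE }

    static State classify(boolean existsAtPath, Object trackedId, Object idAtPath,
                          long readOffset, long sizeAtPath) {
        if (!existsAtPath) {
            return State.AWAITING_NEW_FILE;   // renamed away; rotate_wait covers the gap before create
        }
        if (!Objects.equals(trackedId, idAtPath)) {
            return State.CREATE_RENAME;       // a different file now lives at the path
        }
        if (sizeAtPath < readOffset) {
            return State.COPYTRUNCATE;        // same file, shrunk below our offset
        }
        return State.UNCHANGED;
    }

    static State check(Path path, Object trackedId, long readOffset) throws IOException {
        if (!Files.exists(path)) {
            return classify(false, trackedId, null, readOffset, 0L);
        }
        BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
        // Assumes fileKey() is non-null (Unix); elsewhere substitute the CRC fingerprint identity.
        return classify(true, trackedId, attrs.fileKey(), readOffset, attrs.size());
    }
}
```

Each state maps to an action:
- CREATE_RENAME: the old reader keeps draining the renamed file through its open handle while a new reader starts at offset 0.
- COPYTRUNCATE: the offset resets to 0.
- AWAITING_NEW_FILE: this is the window that rotate_wait covers.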
Checkpoint Registry
A JSON file stores (file identity, offset, path, creation time, last updated, status) per tracked file. Writes use atomic rename (write to temp file, then Files.move with ATOMIC_MOVE). If the file is corrupt on startup, the source renames it with a .corrupt suffix and starts fresh.
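The write-temp-then-rename step and the .corrupt quarantine can be sketched without a JSON library. The caller supplies the serialized bytes and the parser (the plugin would presumably use Jackson, which is an assumption). CheckpointFile is hypothetical. Calling force(true) before the rename is an added precaution so the renamed file's contents are already on disk. On POSIX systems, ATOMIC_MOVE is a rename(2), which replaces an existing target.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical checkpoint file helpers: atomic replace plus corrupt-file quarantine.
final class CheckpointFile {
    static void writeAtomically(Path target, byte[] content) throws IOException {
        Path dir = target.toAbsolutePath().getParent();
        Files.createDirectories(dir);
        Path tmp = Files.createTempFile(dir, target.getFileName().toString(), ".tmp"); // same filesystem
        try {
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                ByteBuffer buf = ByteBuffer.wrap(content);
                while (buf.hasRemaining()) {
                    ch.write(buf);
                }
                ch.force(true);                  // data on disk before the rename makes it visible
            }
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            Files.deleteIfExists(tmp);           // no-op after a successful move
        }
    }

    static <T> T loadOrQuarantine(Path target, Function<byte[], T> parser, Supplier<T> fresh) throws IOException {
        if (!Files.exists(target)) {
            return fresh.get();
        }
        try {
            return parser.apply(Files.readAllBytes(target));
        } catch (RuntimeException corrupt) {     // stand-in for the JSON parser's exception type
            Path aside = target.resolveSibling(target.getFileName() + ".corrupt");
            Files.move(target, aside, StandardCopyOption.REPLACE_EXISTING);
            return fresh.get();                  // start fresh with an empty registry
        }
    }
}
```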
Without acknowledgements: offsets are flushed at a configurable interval (default 5 seconds). On crash, up to 5 seconds of data may be re-delivered. This gives at-least-once semantics.
With acknowledgements: offsets are committed only when the downstream pipeline acknowledges the batch. The registry maintains two offsets per file: readOffset (where the reader currently is) and committedOffset (last acknowledged). On restart, the source resumes from committedOffset.
Cleanup: entries for deleted files older than checkpoint_cleanup_after (default 7 days) are pruned hourly. This prevents unbounded registry growth in container environments.
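Registry maintenance can be sketched with a single scheduled executor. One task flushes every checkpoint_interval and another prunes hourly. Entry and RegistryMaintenance are hypothetical, and only a subset of the stored fields is modeled.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical registry maintenance: periodic flush plus hourly pruning of deleted-file entries.
final class RegistryMaintenance {
    /** Subset of a registry row. */
    static final class Entry {
        final long offset;
        final Instant lastUpdated;
        final boolean deleted;

        Entry(long offset, Instant lastUpdated, boolean deleted) {
            this.offset = offset;
            this.lastUpdated = lastUpdated;
            this.deleted = deleted;
        }
    }

    /** Drops entries for deleted files not updated within ttl (checkpoint_cleanup_after); returns the count removed. */
    static int prune(Map<String, Entry> entries, Instant now, Duration ttl) {
        int before = entries.size();
        entries.values().removeIf(e -> e.deleted && e.lastUpdated.plus(ttl).isBefore(now));
        return before - entries.size();
    }

    /** Flush every checkpoint_interval, prune hourly. A task that throws stops being rescheduled, so callers should catch inside. */
    static ScheduledExecutorService schedule(Runnable flush, Duration checkpointInterval, Runnable prune) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long every = checkpointInterval.toMillis();
        scheduler.scheduleAtFixedRate(flush, every, every, TimeUnit.MILLISECONDS);
        scheduler.scheduleAtFixedRate(prune, 1, 1, TimeUnit.HOURS);
        return scheduler;
    }
}
```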
Acknowledgement Integration
When the pipeline has acknowledgements: true, the source creates an AcknowledgementSet per batch (configurable batch_size, default 100 events). Each event is added to the set after successful buffer write. The source registers a callback:
- Positive ack: commit the batch end offset to the registry.
- Negative ack: do not advance the offset. The batch will be re-read on the next cycle. This is standard at-least-once behavior, consistent with the S3 and Kafka sources.
- Timeout: treat as negative ack.
acknowledgment_timeout defaults to 60 seconds.
A max_acknowledgment_retries (default 3) prevents infinite retry loops on persistently failing batches.
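Acknowledgements can arrive out of order, so committedOffset can only move to the end of the longest prefix of acknowledged batches. The sketch below models the readOffset/committedOffset pair under that assumption, using hypothetical names. A negative ack (or timeout) rewinds readOffset to the failed batch's start and discards later in-flight batches, which are then re-read. Using the batch start offset as the batch id is a simplification, and the max_acknowledgment_retries cap is not modeled.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical per-file offset state for acknowledgement mode.
final class FileOffsets {
    private long readOffset;
    private long committedOffset;
    private final TreeMap<Long, Long> inFlight = new TreeMap<>(); // batch start -> batch end
    private final Set<Long> acked = new HashSet<>();              // starts acked ahead of earlier batches

    FileOffsets(long resumeFrom) {
        this.readOffset = resumeFrom;       // on restart, resume from the committed offset
        this.committedOffset = resumeFrom;
    }

    /** Records a batch covering [readOffset, end) written to the buffer; returns its start as the batch id. */
    synchronized long openBatch(long end) {
        long start = readOffset;
        inFlight.put(start, end);
        readOffset = end;
        return start;
    }

    /** Acknowledgement callback; a timeout is reported as positive == false. */
    synchronized void onAcknowledgement(long batchStart, boolean positive) {
        if (!inFlight.containsKey(batchStart)) {
            return;                                     // stale: discarded by an earlier rewind
        }
        if (!positive) {
            inFlight.tailMap(batchStart, true).clear(); // this batch and everything after it is re-read
            acked.removeIf(start -> start >= batchStart);
            readOffset = batchStart;
            return;
        }
        acked.add(batchStart);
        while (!inFlight.isEmpty() && acked.remove(inFlight.firstKey())) {
            committedOffset = inFlight.pollFirstEntry().getValue(); // advance over the acked prefix
        }
    }

    synchronized long readOffset() {
        return readOffset;
    }

    synchronized long committedOffset() {
        return committedOffset;
    }
}
```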
Back Pressure
When the buffer is full, the reader pauses and retries. While paused, the file handle stays open. On Linux, even if the file is deleted, the open descriptor keeps the data on disk.
The critical interaction is back pressure + rotation + deletion. If a file is rotated and the old copy is deleted while the reader is paused, data can be lost on NFS (stale handle) but not on local filesystems (open descriptor preserves data). A rotation_drain_timeout (default 30 seconds) bounds how long the source tries to drain an old file. If back pressure persists beyond this, the source logs a warning and moves on.
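The sketch below shows how rotation_drain_timeout could bound draining. It assumes a hypothetical ReadStep that performs one read on the rotated file's still-open handle and reports whether it made progress, hit a full buffer, or reached EOF.

```java
import java.time.Duration;

// Hypothetical bounded drain of a rotated file.
final class RotationDrain {
    enum Step { PROGRESS, BLOCKED, EOF }

    /** One read attempt on the rotated file's open handle. */
    interface ReadStep {
        Step next();
    }

    /** Returns true if the old file was drained to EOF, false if rotation_drain_timeout expired first. */
    static boolean drain(ReadStep step, Duration drainTimeout, Duration backoff) throws InterruptedException {
        long deadline = System.nanoTime() + drainTimeout.toNanos();
        while (System.nanoTime() - deadline < 0) {
            Step s = step.next();
            if (s == Step.EOF) {
                return true;                      // fully drained; safe to close the old handle
            }
            if (s == Step.BLOCKED) {
                Thread.sleep(backoff.toMillis()); // buffer full: keep the handle open and retry
            }
        }
        return false; // caller logs a warning and moves on to the new file
    }
}
```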
Event Structure
Each event is a Record<Event> produced by the configured codec. With the default newline codec:
```json
{
  "message": "the log line or assembled multiline block",
  "file": {
    "path": "/var/log/app/server.log",
    "name": "server.log"
  },
  "offset": 48230
}
```
File metadata can be disabled via include_file_metadata: false.
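Here is a sketch that assembles the event payload shown above as a plain map, with include_file_metadata gating the file and offset fields. EventData is hypothetical; in the plugin the map would be wrapped in a Data Prepper Event. Whether offset marks the start or the end of the record is left open here as an assumption.

```java
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical event payload builder matching the structure above.
final class EventData {
    static Map<String, Object> build(String message, Path file, long offset, boolean includeFileMetadata) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("message", message);
        if (includeFileMetadata) {
            Map<String, Object> fileInfo = new LinkedHashMap<>();
            fileInfo.put("path", file.toString());
            fileInfo.put("name", file.getFileName().toString());
            data.put("file", fileInfo);
            data.put("offset", offset);   // byte offset within the file
        }
        return data;
    }
}
```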
Metrics
| Metric | Type | Description |
|---|---|---|
| files_active | Gauge | Files currently being tailed |
| files_opened_total | Counter | Total files opened |
| files_closed_total | Counter | Total files closed |
| files_rotated_total | Counter | Rotation events detected |
| bytes_read_total | Counter | Total bytes read |
| events_emitted_total | Counter | Events written to buffer |
| read_errors_total | Counter | Read errors |
| backpressure_time_total | Timer | Time spent waiting on full buffer |
| file_lag_bytes | Gauge | File size minus reader offset |
| truncation_events_total | Counter | Copytruncate detections |
| data_loss_events_total | Counter | Detected or suspected data loss |
| acknowledgment_failures_total | Counter | Negative acknowledgements |
Configuration
```yaml
source:
  file:
    path: "/var/log/app/server.log"    # Existing single file (backward compat).
    paths:                             # New: glob patterns. Merged with path.
      - "/var/log/app/*.log"
    tail: true                         # New: enable continuous tailing. Default: false.
    exclude_paths: []                  # Glob patterns to skip.
    start_position: "end"              # "beginning" or "end" for new files.
    poll_interval: "1s"                # File change check interval.
    encoding: "UTF-8"
    read_buffer_size: 65536            # Bytes per read.
    max_active_files: 1000             # Concurrent file limit.
    reader_threads: 4                  # Reader thread pool size.
    max_read_time_per_file: "5s"       # Yield after this to prevent starvation.
    rotate_wait: "5s"                  # Wait after EOF on rotated file.
    rotation_drain_timeout: "30s"      # Max time to drain old rotated file.
    checkpoint_file: "data/file-checkpoint.json"
    checkpoint_interval: "5s"          # Flush frequency.
    checkpoint_cleanup_after: "7d"     # Stale entry TTL.
    fingerprint_bytes: 1024            # CRC fingerprint size for NFS.
    close_inactive: "30m"              # Close idle file handles.
    close_removed: true                # Close handle when file deleted.
    batch_size: 100                    # Events per acknowledgement batch.
    batch_timeout: "5s"
    acknowledgment_timeout: "60s"
    max_acknowledgment_retries: 3
    include_file_metadata: true
    codec:                             # Standard Data Prepper codec.
      newline:
        multiline:
          pattern: "^\\d{4}-\\d{2}-\\d{2}"
          negate: true
          match: "after"
          max_lines: 500
          max_bytes: 1048576
          flush_timeout: "5s"
```
Examples
Basic Application Log
```yaml
app-logs:
  source:
    file:
      tail: true
      paths:
        - "/var/log/myapp/*.log"
      start_position: "end"
  processor:
    - grok:
        match:
          message: ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}"]
  sink:
    - opensearch:
        index: "app-logs"
```
Multiline Stack Traces
```yaml
java-logs:
  source:
    file:
      tail: true
      paths:
        - "/var/log/java-app/application.log"
      codec:
        newline:
          multiline:
            pattern: "^\\d{4}-\\d{2}-\\d{2}"
            negate: true
            match: "after"
            flush_timeout: "10s"
  sink:
    - opensearch:
        index: "java-logs"
```
Kubernetes Container Logs
```yaml
k8s-logs:
  source:
    file:
      tail: true
      paths:
        - "/var/log/containers/*.log"
      exclude_paths:
        - "/var/log/containers/dataprepper-*.log"
      start_position: "end"
      codec:
        json: {}
  sink:
    - opensearch:
        index: "k8s-logs"
```
High Throughput with Acknowledgements
```yaml
nginx-pipeline:
  source:
    file:
      tail: true
      paths:
        - "/var/log/nginx/access.log"
      read_buffer_size: 262144
      reader_threads: 8
      batch_size: 500
      checkpoint_interval: "1s"
  buffer:
    bounded_blocking:
      buffer_size: 100000
  sink:
    - opensearch:
        index: "nginx-access"
  acknowledgements: true
```
Backward Compatible (existing behavior, no tail)
```yaml
one-shot-pipeline:
  source:
    file:
      path: "/tmp/sample.log"
      format: "plain"
      record_type: "event"
  sink:
    - stdout:
```
Alternatives Considered
- Fluent Bit in front of Data Prepper. Works but adds a separate process, configuration, and failure point.
- S3 source with file uploads. Not real time. Requires an external process to move files to S3.
- Creating a separate file_tail plugin. dlvenable's feedback: keep the existing file source and add tail: true. Many of the configurations are valid for any file reading. A new plugin creates unnecessary fragmentation.
- Java WatchService only (no polling). WatchService does not see remote changes on NFS, has known issues on macOS, and drops events when its queue overflows (signalled only by an OVERFLOW event). Polling is required as a fallback.