
[Feature] - File Tail Source Plugin #6782

@srikanthpadakanti

Description


File Tail Source for Data Prepper (v2)

Motivation

Data Prepper has a file source that reads a single file once and exits. It is a development utility. There is no way to continuously tail log files with offset tracking, rotation detection, or glob pattern matching.

Every major log pipeline (Filebeat, Fluent Bit, Vector, Logstash) ships file tailing as its most fundamental input. Without it, organizations must deploy Fluent Bit or Filebeat in front of Data Prepper just to collect logs from files, which adds operational complexity and another failure point to the pipeline.

Current Behavior

The existing file source in data-prepper-plugins/common:

  • Reads a single file path once from start to end, then the source thread exits
  • Supports plain and json formats with optional codec and compression
  • Writes each line to the buffer with a fixed 5-second timeout. If the buffer is full, the write fails and the line is lost
  • No glob patterns. No watching for new files
  • No offset tracking. On restart, the entire file is either re-read or skipped
  • No rotation handling. If the file is renamed mid-read, the source does not follow it
  • No multiline assembly. Each line is one event regardless of content
  • No acknowledgement support. areAcknowledgementsEnabled() returns false
  • No file metadata on events (no path, no offset, no line number)

Proposed Solution

Extend the existing file source with a tail boolean config option (default false). When tail: true, the source enables continuous tailing with offset tracking, rotation detection, glob patterns, and back pressure handling. When tail: false, the current one-shot behavior is preserved.

The source moves from data-prepper-plugins/common to its own Gradle module data-prepper-plugins/file-source. The plugin name remains file. Existing pipeline configs continue to work without changes.

Key Design Decisions

Plugin name stays file: per dlvenable's feedback, the existing file source is enhanced rather than replaced. A tail boolean controls the mode.

Codec handles parsing; multiline is part of the newline codec: per dlvenable's feedback, the standard codec config handles input parsing. Multiline options live under the newline/plain codec, not as top-level source config. JSON and NDJSON codecs do not need multiline since they are record-delimited.

path and paths coexist: the existing path (singular) field continues to work for backward compatibility. A new paths (plural) field accepts a list of glob patterns. If both are provided, they are merged.

Config naming follows project convention: all acknowledgement-related config uses the acknowledgment_* prefix, not ack_* (e.g., acknowledgment_timeout, max_acknowledgment_retries).

Architecture (when tail: true)

   FileSource (Source<Record<Event>>)
        |
        +-- start(Buffer)
        |     |
        |     +-- DirectoryWatcher (thread)
        |     |     Monitors paths via WatchService or polling.
        |     |     Discovers new files matching glob.
        |     |     Detects rotation events.
        |     |
        |     +-- FileReaderPool (thread pool)
        |     |     One FileReader per active file.
        |     |     Reads via FileChannel with byte offset tracking.
        |     |     Writes Record<Event> to Buffer via codec.
        |     |     Pauses on back pressure, resumes when buffer has space.
        |     |     max_read_time_per_file prevents starvation.
        |     |
        |     +-- CheckpointRegistry
        |     |     Persists (file identity, offset) to disk as JSON.
        |     |     Atomic writes via temp file + rename.
        |     |     Periodic flush + flush on acknowledgement callback.
        |     |
        |     +-- RotationDetector
        |           Detects inode change at path (create/rename rotation).
        |           Detects offset > file size (copytruncate rotation).
        |           Drains old file before switching to new.
        |
        +-- stop()
              Signals all threads to stop.
              Persists final checkpoint.
              Closes all file handles.

File Discovery

The source accepts glob patterns (/var/log/app/*.log, /var/log/**/*.log) via the paths config and matches candidate files against them with java.nio.file.PathMatcher (glob syntax). An exclude_paths option skips files matching exclusion patterns. The existing path (singular) field is also supported and merged into paths for backward compatibility.
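A minimal sketch of include/exclude matching, assuming the source walks each pattern's non-wildcard base directory and tests every regular file with PathMatcher. GlobFileFilter and its methods are hypothetical names, not part of Data Prepper.

```java
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: include/exclude glob filtering for file discovery. */
final class GlobFileFilter {
    private final List<PathMatcher> includes;
    private final List<PathMatcher> excludes;

    GlobFileFilter(List<String> includeGlobs, List<String> excludeGlobs) {
        FileSystem fs = FileSystems.getDefault();
        this.includes = toMatchers(fs, includeGlobs);
        this.excludes = toMatchers(fs, excludeGlobs);
    }

    private static List<PathMatcher> toMatchers(FileSystem fs, List<String> globs) {
        List<PathMatcher> matchers = new ArrayList<>();
        for (String glob : globs) {
            // "glob:" syntax: '*' stays within one directory, '**' crosses directories.
            matchers.add(fs.getPathMatcher("glob:" + glob));
        }
        return matchers;
    }

    /** True if the absolute path matches at least one include glob and no exclude glob. */
    boolean accepts(Path absolutePath) {
        return includes.stream().anyMatch(m -> m.matches(absolutePath))
                && excludes.stream().noneMatch(m -> m.matches(absolutePath));
    }

    /** Walks baseDir (assumed to be the non-wildcard prefix of the globs) and returns accepted files. */
    List<Path> discover(Path baseDir) throws IOException {
        try (Stream<Path> walk = Files.walk(baseDir)) {
            return walk.filter(Files::isRegularFile)
                    .map(Path::toAbsolutePath)
                    .filter(this::accepts)
                    .collect(Collectors.toList());
        }
    }
}
```

With glob syntax, `*` stops at a directory separator while `**` crosses it, so `/var/log/**/*.log` matches only files at least one subdirectory below `/var/log`.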

On local filesystems, the source uses java.nio.file.WatchService (backed by inotify on Linux) for low-latency change detection. On network filesystems (NFS, CIFS), inotify does not see changes made by other hosts. The source detects network mounts by checking FileStore.type() and automatically falls back to periodic polling. On macOS, the OpenJDK WatchService has historically been implemented by polling rather than kqueue and is known to deliver events late, so the source runs a supplementary polling scan regardless.
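The mount-type check could look like the following sketch. WatchStrategySelector is hypothetical, and the set of FileStore.type() strings treated as network filesystems is an assumption; the exact strings reported depend on the OS.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Set;

/** Hypothetical sketch: choose between WatchService and polling for a directory. */
final class WatchStrategySelector {
    enum Strategy { WATCH_SERVICE, WATCH_SERVICE_PLUS_POLLING, POLLING_ONLY }

    // Assumed FileStore.type() values for network mounts (Linux reports e.g. "nfs4" or "cifs").
    private static final Set<String> NETWORK_FS_TYPES =
            Set.of("nfs", "nfs4", "cifs", "smbfs", "smb3");

    static Strategy choose(String fileStoreType, String osName) {
        if (NETWORK_FS_TYPES.contains(fileStoreType.toLowerCase(Locale.ROOT))) {
            return Strategy.POLLING_ONLY; // change notifications do not cover remote writers
        }
        if (osName.toLowerCase(Locale.ROOT).contains("mac")) {
            return Strategy.WATCH_SERVICE_PLUS_POLLING; // supplementary scan on macOS
        }
        return Strategy.WATCH_SERVICE;
    }

    static Strategy choose(Path directory) throws IOException {
        FileStore store = Files.getFileStore(directory);
        return choose(store.type(), System.getProperty("os.name", ""));
    }
}
```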

When WatchService delivers an OVERFLOW event (OS queue overflowed), the source does a full directory rescan to reconcile state.
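A sketch of one iteration of the DirectoryWatcher loop for a single registered directory, assuming the source supplies two hypothetical callbacks: one for per-file events and one that triggers the full rescan when the OS reports OVERFLOW. DirectoryEventPump is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical sketch of the DirectoryWatcher inner loop for one directory. */
final class DirectoryEventPump implements AutoCloseable {
    private final WatchService watchService;
    private final BiConsumer<WatchEvent.Kind<?>, Path> onEvent; // receives kind + absolute path
    private final Runnable onOverflow;                          // triggers a full rescan

    DirectoryEventPump(Path dir, BiConsumer<WatchEvent.Kind<?>, Path> onEvent, Runnable onOverflow)
            throws IOException {
        this.watchService = dir.getFileSystem().newWatchService();
        this.onEvent = onEvent;
        this.onOverflow = onOverflow;
        dir.register(watchService,
                StandardWatchEventKinds.ENTRY_CREATE,
                StandardWatchEventKinds.ENTRY_MODIFY,
                StandardWatchEventKinds.ENTRY_DELETE);
    }

    /** Waits up to the timeout for one signalled key; returns false once the key is no longer valid. */
    boolean pollOnce(long timeout, TimeUnit unit) throws InterruptedException {
        WatchKey key = watchService.poll(timeout, unit);
        if (key == null) {
            return true; // nothing signalled; the polling scan still covers missed changes
        }
        Path dir = (Path) key.watchable();
        for (WatchEvent<?> event : key.pollEvents()) {
            if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
                onOverflow.run(); // the OS queue dropped events: reconcile with a rescan
                continue;
            }
            onEvent.accept(event.kind(), dir.resolve((Path) event.context()));
        }
        return key.reset(); // without a reset the key stops delivering events
    }

    @Override
    public void close() throws IOException {
        watchService.close();
    }
}
```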

File Identity

Reliable identity is critical for tracking offsets across renames and rotations.

  1. Primary: inode + device. Extracted from BasicFileAttributes.fileKey() on Unix systems.
  2. Fallback: CRC fingerprint. If fileKey() returns null (Windows, some NFS mounts), the source reads the first N bytes and computes a CRC32 checksum combined with file size and creation time.
  3. Inode reuse guard. On ext4, deleted inodes are recycled quickly. The source compares creation time against the registry entry. If creation times differ, the file is treated as new regardless of matching inode.
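The identity rules above might be sketched as follows. FileIdentity is a hypothetical class. Unlike the description, this sketch leaves file size out of the CRC fingerprint, because the size of a file being tailed changes on every append and would change its identity.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.zip.CRC32;

/** Hypothetical sketch: fileKey() first, CRC32 of the file head as fallback. */
final class FileIdentity {
    final String id;        // "key:<fileKey>" or "crc:<checksum>"
    final FileTime created; // used by the inode reuse guard

    private FileIdentity(String id, FileTime created) {
        this.id = id;
        this.created = created;
    }

    static FileIdentity of(Path file, int fingerprintBytes) throws IOException {
        BasicFileAttributes attrs = Files.readAttributes(file, BasicFileAttributes.class);
        Object key = attrs.fileKey(); // (device, inode) on Unix; null on Windows and some NFS mounts
        String id = key != null ? "key:" + key : "crc:" + headCrc(file, fingerprintBytes);
        return new FileIdentity(id, attrs.creationTime());
    }

    /** CRC32 over the first n bytes; stable once the file is longer than n bytes and only grows. */
    static long headCrc(Path file, int n) throws IOException {
        CRC32 crc = new CRC32();
        try (InputStream in = Files.newInputStream(file)) {
            byte[] head = in.readNBytes(n);
            crc.update(head, 0, head.length);
        }
        return crc.getValue();
    }

    /** Inode reuse guard: a matching id with a different creation time means a new file. */
    boolean sameFileAs(FileIdentity stored) {
        return id.equals(stored.id) && created.equals(stored.created);
    }
}
```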

File Reading

Each active file gets a FileReader running in a thread pool. The reader uses FileChannel (not BufferedReader) because FileChannel.position() provides exact byte offsets for checkpointing.

Reading loop:

  1. Read into a ByteBuffer (default 64 KB)
  2. Pass bytes to the configured codec for parsing
  3. Write events to the Buffer
  4. On TimeoutException (buffer full), pause and retry with a configurable interval. Do not advance the offset until the write succeeds.
  5. After max_read_time_per_file (default 5 seconds), yield the thread to prevent starvation when active files outnumber reader threads.
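A simplified sketch of one reader pass, assuming a hypothetical LineSink in place of the Data Prepper Buffer and plain newline splitting in place of the codec. It reads with a positional FileChannel.read, advances the offset only after a successful write, retries on TimeoutException, and returns once the time budget (max_read_time_per_file) has elapsed so the pool can schedule another file.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the pipeline buffer: throws TimeoutException when full. */
interface LineSink {
    void write(List<String> lines, long timeoutMillis) throws TimeoutException;
}

/** Hypothetical sketch of one FileReader pass; newline splitting stands in for the codec. */
final class TailReader {
    private final Path file;
    private long offset; // first byte not yet written to the sink

    TailReader(Path file, long startOffset) {
        this.file = file;
        this.offset = startOffset;
    }

    long offset() {
        return offset;
    }

    /** Reads until EOF (or only a partial line remains) or until maxReadMillis has elapsed. */
    void readPass(LineSink sink, int bufferSize, long maxReadMillis, long retryMillis)
            throws IOException, InterruptedException {
        long deadline = System.nanoTime() + maxReadMillis * 1_000_000L;
        ByteBuffer buf = ByteBuffer.allocate(bufferSize);
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            while (System.nanoTime() - deadline < 0) {
                buf.clear();
                int n = ch.read(buf, offset); // positional read at the checkpointed byte offset
                if (n <= 0) {
                    return; // at EOF for now
                }
                int lastNewline = -1;
                for (int i = n - 1; i >= 0; i--) {
                    if (buf.get(i) == '\n') { lastNewline = i; break; }
                }
                if (lastNewline < 0) {
                    return; // partial line: wait for more data (a line longer than bufferSize stalls here)
                }
                // Cutting at '\n' is safe: in UTF-8 that byte never occurs inside a multi-byte character.
                String chunk = new String(buf.array(), 0, lastNewline, StandardCharsets.UTF_8);
                writeWithRetry(sink, Arrays.asList(chunk.split("\n", -1)), retryMillis);
                offset += lastNewline + 1; // advance only after the write succeeded
            }
        }
    }

    private static void writeWithRetry(LineSink sink, List<String> lines, long retryMillis)
            throws InterruptedException {
        while (true) {
            try {
                sink.write(lines, retryMillis);
                return;
            } catch (TimeoutException bufferFull) {
                Thread.sleep(retryMillis); // back pressure: keep the file open, do not move the offset
            }
        }
    }
}
```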

Codec Integration

The source uses the standard Data Prepper codec config for input parsing. The default codec is newline (one line per event). Multiline options are configured under the newline codec:

source:
  file:
    tail: true
    paths:
      - "/var/log/app/*.log"
    codec:
      newline:
        multiline:
          pattern: "^\\d{4}-\\d{2}-\\d{2}"
          negate: true
          match: "after"
          max_lines: 500
          max_bytes: 1048576
          flush_timeout: "5s"
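A sketch of how pattern, negate, match, and max_lines could combine, assuming Filebeat-style semantics: a line is a continuation when the pattern matches (inverted by negate), match: after attaches continuations to the preceding line, and match: before joins a continuation to the line that follows it. MultilineAssembler is hypothetical, and max_bytes and flush_timeout are omitted. In the usage below the date pattern is combined with negate = true, so stack-trace lines, which do not start with a date, are appended to the preceding dated line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Pattern;

/** Hypothetical sketch of Filebeat-style pattern/negate/match line assembly. */
final class MultilineAssembler {
    enum Match { AFTER, BEFORE }

    private final Pattern pattern;
    private final boolean negate;
    private final Match match;
    private final int maxLines;
    private final Consumer<String> emit;
    private final List<String> pending = new ArrayList<>();

    MultilineAssembler(String regex, boolean negate, Match match, int maxLines, Consumer<String> emit) {
        this.pattern = Pattern.compile(regex);
        this.negate = negate;
        this.match = match;
        this.maxLines = maxLines;
        this.emit = emit;
    }

    /** A line is a continuation when the pattern matches, inverted when negate is true. */
    private boolean isContinuation(String line) {
        return pattern.matcher(line).find() != negate;
    }

    void accept(String line) {
        boolean continuation = isContinuation(line);
        if (match == Match.AFTER) {
            // after: continuations join the event before them; any other line starts a new event.
            if (!continuation) {
                flush();
            }
            append(line);
        } else {
            // before: a continuation line is joined with the line that follows it.
            append(line);
            if (!continuation) {
                flush();
            }
        }
    }

    private void append(String line) {
        if (pending.size() < maxLines) {
            pending.add(line); // lines beyond max_lines are dropped
        }
    }

    /** Emits the pending event; a real source would also call this when flush_timeout expires. */
    void flush() {
        if (!pending.isEmpty()) {
            emit.accept(String.join("\n", pending));
            pending.clear();
        }
    }
}
```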

For JSON input, the json codec handles record boundaries without multiline:

source:
  file:
    tail: true
    paths:
      - "/var/log/app/*.json"
    codec:
      json: {}

Rotation Handling

Create/rename rotation (logrotate default): The rotation tool renames app.log to app.log.1 and creates a new app.log. The source detects the inode change at the original path. The old reader drains the renamed file to EOF. A new reader starts on the new file at offset 0. A rotate_wait timer handles the gap between rename and create.

Copytruncate rotation: The rotation tool copies app.log to a backup, then truncates the original in place. The source detects that the file size has dropped below the current offset, resets the offset to 0, and re-reads from the start.

Copytruncate data loss: Between the copy and the truncate, lines written by the application exist in neither the backup nor the truncated file. This is inherent to copytruncate and cannot be fixed at the reader level. The source documents this and recommends create/rename rotation where possible.
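The two detection rules could be expressed as a small classifier. RotationCheck is a hypothetical name, and the sketch assumes fileKey() is available (Unix), omitting the CRC fallback.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

/** Hypothetical sketch: classify what happened at a tailed path since the last check. */
final class RotationCheck {
    enum Result { UNCHANGED, ROTATED, TRUNCATED, MISSING }

    /** currentKey == null means nothing exists at the path (e.g., between rename and create). */
    static Result classify(Object trackedKey, long readOffset, Object currentKey, long currentSize) {
        if (currentKey == null) {
            return Result.MISSING;   // wait up to rotate_wait for the new file to appear
        }
        if (!currentKey.equals(trackedKey)) {
            return Result.ROTATED;   // create/rename: drain the old handle, open the new file at 0
        }
        if (currentSize < readOffset) {
            return Result.TRUNCATED; // copytruncate: same file shrank, reset the offset to 0
        }
        return Result.UNCHANGED;
    }

    /** Assumes fileKey() is non-null (Unix); the CRC fallback is omitted here. */
    static Result check(Path path, Object trackedKey, long readOffset) throws IOException {
        try {
            BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
            return classify(trackedKey, readOffset, attrs.fileKey(), attrs.size());
        } catch (NoSuchFileException e) {
            return classify(trackedKey, readOffset, null, 0);
        }
    }
}
```

A size comparison alone misses a copytruncate if the application writes past the old offset before the next check; this sketch does not handle that case.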

Checkpoint Registry

A JSON file stores (file identity, offset, path, creation time, last updated, status) per tracked file. Writes use atomic rename (write to temp file, then Files.move with ATOMIC_MOVE). If the file is corrupt on startup, the source renames it with a .corrupt suffix and starts fresh.
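A sketch of the registry file handling, assuming JSON (de)serialization happens elsewhere and a caller-supplied validity check decides whether the content parses. CheckpointStore is hypothetical. The temp file is created in the same directory as the checkpoint so the rename stays on one filesystem, which ATOMIC_MOVE requires.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch of the registry file I/O; JSON handling is left to the caller. */
final class CheckpointStore {
    private final Path file;

    CheckpointStore(Path file) {
        this.file = file.toAbsolutePath();
    }

    /** Temp file in the same directory, fsync, then an atomic rename over the old checkpoint. */
    void save(String json) throws IOException {
        Path dir = file.getParent();
        Files.createDirectories(dir);
        Path tmp = Files.createTempFile(dir, file.getFileName().toString(), ".tmp");
        try {
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                ByteBuffer data = ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
                while (data.hasRemaining()) {
                    ch.write(data);
                }
                ch.force(true); // make the bytes durable before the rename exposes them
            }
            Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE); // rename(2) replaces the target on Linux
        } finally {
            Files.deleteIfExists(tmp); // no-op after a successful move
        }
    }

    /** Returns the content, or empty after moving an unreadable file aside with a .corrupt suffix. */
    Optional<String> load(Predicate<String> isValid) throws IOException {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        String content;
        try {
            content = Files.readString(file, StandardCharsets.UTF_8);
        } catch (CharacterCodingException e) {
            content = null; // not valid UTF-8: treat as corrupt
        }
        if (content != null && isValid.test(content)) {
            return Optional.of(content);
        }
        Files.move(file, file.resolveSibling(file.getFileName() + ".corrupt"),
                StandardCopyOption.REPLACE_EXISTING);
        return Optional.empty(); // start fresh
    }
}
```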

Without acknowledgements: offsets are flushed at a configurable interval (default 5 seconds). On crash, up to one interval (5 seconds by default) of data may be re-delivered, giving at-least-once semantics.

With acknowledgements: offsets are committed only when the downstream pipeline acknowledges the batch. The registry maintains two offsets per file: readOffset (where the reader currently is) and committedOffset (last acknowledged). On restart, the source resumes from committedOffset.
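Batches may be acknowledged out of order. One way to keep committedOffset safe under that assumption is to commit only the longest prefix of acknowledged batches, as in this sketch. OffsetTracker is a hypothetical class.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: readOffset vs committedOffset with out-of-order acknowledgements. */
final class OffsetTracker {
    private long readOffset;
    private long committedOffset;
    // End offset of each batch handed to the buffer -> whether it has been positively acknowledged.
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();

    OffsetTracker(long resumeOffset) {
        this.readOffset = resumeOffset;
        this.committedOffset = resumeOffset; // on restart, resume from the committed offset
    }

    synchronized void batchSent(long batchEndOffset) {
        readOffset = batchEndOffset;
        inFlight.put(batchEndOffset, Boolean.FALSE);
    }

    synchronized void acknowledged(long batchEndOffset) {
        inFlight.replace(batchEndOffset, Boolean.TRUE);
        // Commit only a contiguous prefix: an earlier unacknowledged batch blocks later ones.
        Map.Entry<Long, Boolean> first;
        while ((first = inFlight.firstEntry()) != null && first.getValue()) {
            committedOffset = first.getKey();
            inFlight.pollFirstEntry();
        }
    }

    synchronized long readOffset() {
        return readOffset;
    }

    synchronized long committedOffset() {
        return committedOffset;
    }
}
```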

Cleanup: entries for deleted files older than checkpoint_cleanup_after (default 7 days) are pruned hourly. This prevents unbounded registry growth in container environments.
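The cleanup rule might look like this, assuming each registry entry records whether its file still exists and when the entry was last updated. RegistryCleanup is hypothetical; running it hourly could use ScheduledExecutorService.scheduleAtFixedRate.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;

/** Hypothetical sketch of the periodic prune of stale registry entries. */
final class RegistryCleanup {
    static final class Entry {
        final boolean fileDeleted;
        final Instant lastUpdated;

        Entry(boolean fileDeleted, Instant lastUpdated) {
            this.fileDeleted = fileDeleted;
            this.lastUpdated = lastUpdated;
        }
    }

    /** Removes entries whose file is gone and whose last update is older than cleanupAfter. */
    static int prune(Map<String, Entry> registry, Duration cleanupAfter, Instant now) {
        Instant cutoff = now.minus(cleanupAfter);
        int before = registry.size();
        registry.values().removeIf(e -> e.fileDeleted && e.lastUpdated.isBefore(cutoff));
        return before - registry.size(); // number of entries removed
    }
}
```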

Acknowledgement Integration

When the pipeline has acknowledgements: true, the source creates an AcknowledgementSet per batch (configurable batch_size, default 100 events). Each event is added to the set after successful buffer write. The source registers a callback:

  • Positive ack: commit the batch end offset to the registry.
  • Negative ack: do not advance the offset. The batch will be re-read on the next cycle. This is standard at-least-once behavior, consistent with the S3 and Kafka sources.
  • Timeout: treat as negative ack. acknowledgment_timeout defaults to 60 seconds.

A max_acknowledgment_retries (default 3) prevents infinite retry loops on persistently failing batches.
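The callback's decision logic could be sketched as below, with a timeout reported the same way as a negative ack. BatchAckHandler and its GIVE_UP outcome are hypothetical: the proposal caps retries with max_acknowledgment_retries but does not specify what happens to a batch after the cap, so the sketch leaves that to the caller.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the acknowledgement callback decision with a retry cap. */
final class BatchAckHandler {
    enum Action { COMMIT, RETRY, GIVE_UP }

    private final int maxRetries;
    private final Map<Long, Integer> failures = new HashMap<>(); // batch start offset -> failed attempts

    BatchAckHandler(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** positive is true for a positive ack, false for a negative ack or acknowledgment_timeout. */
    synchronized Action onAcknowledgement(long batchStartOffset, boolean positive) {
        if (positive) {
            failures.remove(batchStartOffset);
            return Action.COMMIT; // commit the batch end offset
        }
        int attempts = failures.merge(batchStartOffset, 1, Integer::sum);
        if (attempts > maxRetries) {
            failures.remove(batchStartOffset);
            return Action.GIVE_UP; // hypothetical: the caller decides, e.g. log and skip
        }
        return Action.RETRY; // leave the offset alone so the batch is re-read
    }
}
```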

Back Pressure

When the buffer is full, the reader pauses and retries. While paused, the file handle stays open. On Linux, even if the file is deleted, the open descriptor keeps the data on disk.

The critical interaction is back pressure + rotation + deletion. If a file is rotated and the old copy is deleted while the reader is paused, data can be lost on NFS (stale handle) but not on local filesystems (open descriptor preserves data). A rotation_drain_timeout (default 30 seconds) bounds how long the source tries to drain an old file. If back pressure persists beyond this, the source logs a warning and moves on.
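A sketch of the bounded drain, assuming a hypothetical ChunkReader step that returns true at EOF and throws TimeoutException while the buffer is full. The clock is injected so the deadline logic is easy to test; RotationDrain is a hypothetical name.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

/** Hypothetical sketch of bounding the drain of a rotated file with rotation_drain_timeout. */
final class RotationDrain {
    /** Hypothetical reader step: returns true at EOF, throws TimeoutException when the buffer is full. */
    interface ChunkReader {
        boolean readChunk() throws TimeoutException;
    }

    /** Returns true if the old file was drained to EOF, false if the drain timeout expired first. */
    static boolean drain(ChunkReader reader, Duration drainTimeout, Duration retryInterval,
                         LongSupplier nanoClock) throws InterruptedException {
        long deadline = nanoClock.getAsLong() + drainTimeout.toNanos();
        while (nanoClock.getAsLong() - deadline < 0) {
            try {
                if (reader.readChunk()) {
                    return true;
                }
            } catch (TimeoutException bufferFull) {
                Thread.sleep(retryInterval.toMillis()); // back pressure: retry until the deadline
            }
        }
        return false; // caller logs a warning and switches to the new file
    }
}
```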

Event Structure

Each event is a Record<Event> produced by the configured codec. With the default newline codec:

{
  "message": "the log line or assembled multiline block",
  "file": {
    "path": "/var/log/app/server.log",
    "name": "server.log"
  },
  "offset": 48230
}

File metadata can be disabled via include_file_metadata: false.
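A sketch of the event body above as a plain map, assuming offset is the byte offset where the line starts and that include_file_metadata: false drops both the file object and offset. EventFields is hypothetical; the resulting map would still need to be wrapped in a Data Prepper Event.

```java
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the event body fields produced with the default newline codec. */
final class EventFields {
    static Map<String, Object> build(String message, Path file, long offset, boolean includeFileMetadata) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("message", message);
        if (includeFileMetadata) {
            Map<String, Object> fileInfo = new LinkedHashMap<>();
            fileInfo.put("path", file.toString());
            fileInfo.put("name", file.getFileName().toString());
            data.put("file", fileInfo);
            data.put("offset", offset); // assumed: byte offset where the line starts
        }
        return data;
    }
}
```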

Metrics

| Metric | Type | Description |
|---|---|---|
| files_active | Gauge | Files currently being tailed |
| files_opened_total | Counter | Total files opened |
| files_closed_total | Counter | Total files closed |
| files_rotated_total | Counter | Rotation events detected |
| bytes_read_total | Counter | Total bytes read |
| events_emitted_total | Counter | Events written to buffer |
| read_errors_total | Counter | Read errors |
| backpressure_time_total | Timer | Time spent waiting on a full buffer |
| file_lag_bytes | Gauge | File size minus reader offset |
| truncation_events_total | Counter | Copytruncate detections |
| data_loss_events_total | Counter | Detected or suspected data loss |
| acknowledgment_failures_total | Counter | Negative acknowledgements |

Configuration

source:
  file:
    path: "/var/log/app/server.log"       # Existing single file (backward compat).
    paths:                                 # New: glob patterns. Merged with path.
      - "/var/log/app/*.log"
    tail: true                             # New: enable continuous tailing. Default: false.
    exclude_paths: []                      # Glob patterns to skip.
    start_position: "end"                  # "beginning" or "end" for new files.
    poll_interval: "1s"                    # File change check interval.
    encoding: "UTF-8"
    read_buffer_size: 65536                # Bytes per read.
    max_active_files: 1000                 # Concurrent file limit.
    reader_threads: 4                      # Reader thread pool size.
    max_read_time_per_file: "5s"           # Yield after this to prevent starvation.
    rotate_wait: "5s"                      # Wait after EOF on rotated file.
    rotation_drain_timeout: "30s"          # Max time to drain old rotated file.
    checkpoint_file: "data/file-checkpoint.json"
    checkpoint_interval: "5s"              # Flush frequency.
    checkpoint_cleanup_after: "7d"         # Stale entry TTL.
    fingerprint_bytes: 1024                # CRC fingerprint size when fileKey() is unavailable (Windows, some NFS mounts).
    close_inactive: "30m"                  # Close idle file handles.
    close_removed: true                    # Close handle when file deleted.
    batch_size: 100                        # Events per acknowledgement batch.
    batch_timeout: "5s"
    acknowledgment_timeout: "60s"
    max_acknowledgment_retries: 3
    include_file_metadata: true
    codec:                                 # Standard Data Prepper codec.
      newline:
        multiline:
          pattern: "^\\d{4}-\\d{2}-\\d{2}"
          negate: true
          match: "after"
          max_lines: 500
          max_bytes: 1048576
          flush_timeout: "5s"

Examples

Basic Application Log

app-logs:
  source:
    file:
      tail: true
      paths:
        - "/var/log/myapp/*.log"
      start_position: "end"
  processor:
    - grok:
        match:
          message: ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}"]
  sink:
    - opensearch:
        index: "app-logs"

Multiline Stack Traces

java-logs:
  source:
    file:
      tail: true
      paths:
        - "/var/log/java-app/application.log"
      codec:
        newline:
          multiline:
            pattern: "^\\d{4}-\\d{2}-\\d{2}"
            negate: true
            match: "after"
            flush_timeout: "10s"
  sink:
    - opensearch:
        index: "java-logs"

Kubernetes Container Logs

k8s-logs:
  source:
    file:
      tail: true
      paths:
        - "/var/log/containers/*.log"
      exclude_paths:
        - "/var/log/containers/dataprepper-*.log"
      start_position: "end"
      codec:
        json: {}
  sink:
    - opensearch:
        index: "k8s-logs"

High Throughput with Acknowledgements

nginx-pipeline:
  source:
    file:
      tail: true
      paths:
        - "/var/log/nginx/access.log"
      read_buffer_size: 262144
      reader_threads: 8
      batch_size: 500
      checkpoint_interval: "1s"
  buffer:
    bounded_blocking:
      buffer_size: 100000
  sink:
    - opensearch:
        index: "nginx-access"
  acknowledgements: true

Backward Compatible (existing behavior, no tail)

one-shot-pipeline:
  source:
    file:
      path: "/tmp/sample.log"
      format: "plain"
      record_type: "event"
  sink:
    - stdout:

Alternatives Considered

  • Fluent Bit in front of Data Prepper. Works but adds a separate process, configuration, and failure point.
  • S3 source with file uploads. Not real time. Requires an external process to move files to S3.
  • Creating a separate file_tail plugin. dlvenable's feedback: keep the existing file source and add tail: true. Many of the configurations are valid for any file reading. A new plugin creates unnecessary fragmentation.
  • Java WatchService only (no polling). WatchService does not work on NFS, has known problems on macOS, and drops events when its queue overflows (signalled only by an OVERFLOW event). Polling is required as a fallback.

Metadata

Labels: enhancement (New feature or request)
Type: none
Project status: In review
Relationships: none yet
Development: no branches or pull requests