Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,38 +124,34 @@ Specify first line of multiline by regular expression.
</filter>
```

You can handle timeout events and remaining buffers on shutdown this plugin.
If you want to intentionally separate timeout logs or handle remaining buffers on plugin shutdown, you can use the `timeout_label` option to route them to a specific label.

```aconf
<label @ERROR>
<filter docker.log>
@type concat
key message
multiline_start_regexp /^Start/
flush_interval 5
timeout_label @TIMEOUT_HANDLER
</filter>

<label @TIMEOUT_HANDLER>
<match docker.log>
@type file
path /path/to/error.log
path /path/to/timeout_and_shutdown.log
</match>
</label>
```

Handle timeout log lines the same as normal logs.
By default, when a timeout occurs while waiting for the next multiline log, the plugin outputs an info log and emits the buffered logs normally to the next pipeline. You don't need any special configuration to keep your last log lines.

```aconf
<filter **>
@type concat
key message
multiline_start_regexp /^Start/
flush_interval 5
timeout_label @NORMAL
</filter>

<match **>
@type relabel
@label @NORMAL
</match>

<label @NORMAL>
<match **>
@type stdout
</match>
</label>
```

Handle single line JSON from Docker containers.
Expand Down
10 changes: 5 additions & 5 deletions lib/fluent/plugin/filter_concat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -460,19 +460,19 @@ def flush_remaining_buffer
@key => lines.join(@separator)
}
tag, time, record = elements.first
message = "Flush remaining buffer: #{stream_identity}"
handle_timeout_error(tag, time, record.merge(new_record), message)
log.info(message)
handle_timeout_error(tag, time, record.merge(new_record))
log.info("Flush remaining buffer: #{stream_identity}")
end
@buffer.clear
end

def handle_timeout_error(tag, time, record, message)
def handle_timeout_error(tag, time, record)
if @timeout_label
event_router = event_emitter_router(@timeout_label)
event_router.emit(tag, time, record)
else
router.emit_error_event(tag, time, record, TimeoutError.new(message))
log.info "Timeout occurred while waiting for the next multiline log. Emitting the buffered log normally. tag: #{tag}"
router.emit(tag, time, record)
end
end

Expand Down
13 changes: 7 additions & 6 deletions test/plugin/test_filter_concat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def filter_with_time(conf, messages, wait: nil)
]
filtered = filter(CONFIG + "flush_interval 2s", messages, wait: 3) do |d|
errored = { "container_id" => "1", "message" => "message 1\nmessage 2" }
mock(d.instance.router).emit_error_event("test", anything, errored, anything)
mock(d.instance.router).emit("test", anything, errored)
end
assert_equal([], filtered)
end
Expand Down Expand Up @@ -307,8 +307,8 @@ def filter_with_time(conf, messages, wait: nil)
errored1 = { "container_id" => "1", "message" => "start" }
errored2 = { "container_id" => "2", "message" => "start" }
router = d.instance.router
mock(router).emit_error_event("test", anything, errored1, anything)
mock(router).emit_error_event("test", anything, errored2, anything)
mock(router).emit("test", anything, errored1)
mock(router).emit("test", anything, errored2)
end
assert_equal(expected, filtered)
end
Expand Down Expand Up @@ -456,7 +456,7 @@ def filter_with_time(conf, messages, wait: nil)
]
filtered = filter(config, messages, wait: 3) do |d|
errored = { "container_id" => "1", "message" => "start\n message 1\n message 2" }
mock(d.instance.router).emit_error_event("test", anything, errored, anything)
mock(d.instance.router).emit("test", anything, errored)
end
assert_equal([], filtered)
end
Expand Down Expand Up @@ -976,7 +976,7 @@ def filter_with_time(conf, messages, wait: nil)
]
filtered = filter_with_time(config, messages, wait: 3) do |d|
errored = { "container_id" => "1", "message" => "start\n message 3\n message 4" }
mock(d.instance.router).emit_error_event("test", @time, errored, anything)
mock(d.instance.router).emit("test", @time, errored)
end
expected = [
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }]
Expand All @@ -1000,7 +1000,7 @@ def filter_with_time(conf, messages, wait: nil)
filtered = filter_with_time(config, messages, wait: 3) do |d|
mock(d.instance).flush_timeout_buffer.at_most(0)
errored = { "container_id" => "1", "message" => "start" }
mock(d.instance.router).emit_error_event("test", @time, errored, anything)
mock(d.instance.router).emit("test", @time, errored)
end
expected = [
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }]
Expand Down Expand Up @@ -1060,6 +1060,7 @@ def filter_with_time(conf, messages, wait: nil)
"[error]: failed to flush timeout buffer error_class=StandardError error=\"timeout\"",
"[error]: failed to flush timeout buffer error_class=StandardError error=\"timeout\"",
"[error]: failed to flush timeout buffer error_class=StandardError error=\"timeout\"",
"[info]: Timeout occurred while waiting for the next multiline log. Emitting the buffered log normally. tag: test",
"[info]: Flush remaining buffer: test:default"
]
log_messages = logs.map do |line|
Expand Down
Loading