Skip to content

Commit 75d12ae

Browse files
committed
Emit buffered logs normally on timeout instead of throwing error
Previously, when a timeout occurred while waiting for the next multiline log, the plugin raised a `TimeoutError` via `emit_error_event`. This caused the remaining buffered logs—most notably the last line of log outputs—to be dropped or incorrectly routed. While the plugin provides a `timeout_label` option to route logs on timeout, users often use it without realizing they must explicitly configure a `<label>` block to catch and handle the rerouted logs. Without this awareness, it simply results in silent data loss. This commit changes the behavior to log an info message and emit the buffered record normally using `router.emit` when no `timeout_label` is set, ensuring no logs are lost by default. Signed-off-by: Shizuo Fujita <fujita@clear-code.com>
1 parent 602a134 commit 75d12ae

2 files changed

Lines changed: 12 additions & 11 deletions

File tree

lib/fluent/plugin/filter_concat.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -460,19 +460,19 @@ def flush_remaining_buffer
460460
@key => lines.join(@separator)
461461
}
462462
tag, time, record = elements.first
463-
message = "Flush remaining buffer: #{stream_identity}"
464-
handle_timeout_error(tag, time, record.merge(new_record), message)
465-
log.info(message)
463+
handle_timeout_error(tag, time, record.merge(new_record))
464+
log.info("Flush remaining buffer: #{stream_identity}")
466465
end
467466
@buffer.clear
468467
end
469468

470-
def handle_timeout_error(tag, time, record, message)
469+
def handle_timeout_error(tag, time, record)
471470
if @timeout_label
472471
event_router = event_emitter_router(@timeout_label)
473472
event_router.emit(tag, time, record)
474473
else
475-
router.emit_error_event(tag, time, record, TimeoutError.new(message))
474+
log.info "Timeout occurred while waiting for the next multiline log. Emitting the buffered log normally. tag: #{tag}"
475+
router.emit(tag, time, record)
476476
end
477477
end
478478

test/plugin/test_filter_concat.rb

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def filter_with_time(conf, messages, wait: nil)
224224
]
225225
filtered = filter(CONFIG + "flush_interval 2s", messages, wait: 3) do |d|
226226
errored = { "container_id" => "1", "message" => "message 1\nmessage 2" }
227-
mock(d.instance.router).emit_error_event("test", anything, errored, anything)
227+
mock(d.instance.router).emit("test", anything, errored)
228228
end
229229
assert_equal([], filtered)
230230
end
@@ -307,8 +307,8 @@ def filter_with_time(conf, messages, wait: nil)
307307
errored1 = { "container_id" => "1", "message" => "start" }
308308
errored2 = { "container_id" => "2", "message" => "start" }
309309
router = d.instance.router
310-
mock(router).emit_error_event("test", anything, errored1, anything)
311-
mock(router).emit_error_event("test", anything, errored2, anything)
310+
mock(router).emit("test", anything, errored1)
311+
mock(router).emit("test", anything, errored2)
312312
end
313313
assert_equal(expected, filtered)
314314
end
@@ -456,7 +456,7 @@ def filter_with_time(conf, messages, wait: nil)
456456
]
457457
filtered = filter(config, messages, wait: 3) do |d|
458458
errored = { "container_id" => "1", "message" => "start\n message 1\n message 2" }
459-
mock(d.instance.router).emit_error_event("test", anything, errored, anything)
459+
mock(d.instance.router).emit("test", anything, errored)
460460
end
461461
assert_equal([], filtered)
462462
end
@@ -976,7 +976,7 @@ def filter_with_time(conf, messages, wait: nil)
976976
]
977977
filtered = filter_with_time(config, messages, wait: 3) do |d|
978978
errored = { "container_id" => "1", "message" => "start\n message 3\n message 4" }
979-
mock(d.instance.router).emit_error_event("test", @time, errored, anything)
979+
mock(d.instance.router).emit("test", @time, errored)
980980
end
981981
expected = [
982982
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }]
@@ -1000,7 +1000,7 @@ def filter_with_time(conf, messages, wait: nil)
10001000
filtered = filter_with_time(config, messages, wait: 3) do |d|
10011001
mock(d.instance).flush_timeout_buffer.at_most(0)
10021002
errored = { "container_id" => "1", "message" => "start" }
1003-
mock(d.instance.router).emit_error_event("test", @time, errored, anything)
1003+
mock(d.instance.router).emit("test", @time, errored)
10041004
end
10051005
expected = [
10061006
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }]
@@ -1060,6 +1060,7 @@ def filter_with_time(conf, messages, wait: nil)
10601060
"[error]: failed to flush timeout buffer error_class=StandardError error=\"timeout\"",
10611061
"[error]: failed to flush timeout buffer error_class=StandardError error=\"timeout\"",
10621062
"[error]: failed to flush timeout buffer error_class=StandardError error=\"timeout\"",
1063+
"[info]: Timeout occurred while waiting for the next multiline log. Emitting the buffered log normally. tag: test",
10631064
"[info]: Flush remaining buffer: test:default"
10641065
]
10651066
log_messages = logs.map do |line|

0 commit comments

Comments
 (0)