Skip to content

Commit 1176f29

Browse files
HolyGrailclaude
andauthored
Fix unbounded growth of stream state in timeout handling (#150)
flush_buffer leaves an empty entry behind in @buffer/@buffer_size, and flush_timeout_buffer skips expired streams whose buffer is empty without removing them from @timeout_map. Streams which complete normally therefore leave entries in all three hashes forever. This is especially harmful in partial metadata mode: every split message has a unique partial_id and thus a unique stream identity, so a long-running fluentd accumulates state for every split message it has ever processed. Memory grows without bound, and the periodic flush_timeout_buffer scan (which iterates the whole @timeout_map every second while holding @timeout_map_mutex) consumes more and more CPU and increasingly delays filter_stream. Purge expired streams from @timeout_map, @buffer and @buffer_size whether or not they still have buffered content. In-flight streams within flush_interval are untouched. The new tests advance the frozen Fluent::Engine.now (Fluent::Test.setup memoizes it) instead of sleeping, because the expiry condition can never become true under the frozen clock; this is also why existing timeout tests only exercised the shutdown path. Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
1 parent ba79741 commit 1176f29

2 files changed

Lines changed: 112 additions & 4 deletions

File tree

lib/fluent/plugin/filter_concat.rb

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -433,20 +433,27 @@ def flush_buffer(stream_identity, new_element = nil)
433433

434434
def flush_timeout_buffer
435435
now = Fluent::Engine.now
436-
timeout_stream_identities = []
436+
expired_stream_identities = []
437437
@timeout_map_mutex.synchronize do
438438
@timeout_map.each do |stream_identity, previous_timestamp|
439439
next if @flush_interval > (now - previous_timestamp)
440+
expired_stream_identities << stream_identity
440441
next if @buffer[stream_identity].empty?
441442
time, flushed_record = flush_buffer(stream_identity)
442-
timeout_stream_identities << stream_identity
443443
tag = stream_identity.split(":").first
444444
message = "Timeout flush: #{stream_identity}"
445445
handle_timeout_error(tag, @use_first_timestamp ? time : now, flushed_record, message)
446446
log.info(message)
447447
end
448-
@timeout_map.reject! do |stream_identity, _|
449-
timeout_stream_identities.include?(stream_identity)
448+
# Purge expired streams from all the state hashes, not only the
449+
# flushed ones. Streams which completed normally leave empty entries
450+
# behind, so @buffer/@buffer_size/@timeout_map would otherwise grow
451+
# unboundedly (notably in partial metadata mode where every split
452+
# message has a unique stream identity).
453+
expired_stream_identities.each do |stream_identity|
454+
@timeout_map.delete(stream_identity)
455+
@buffer.delete(stream_identity)
456+
@buffer_size.delete(stream_identity)
450457
end
451458
end
452459
end

test/plugin/test_filter_concat.rb

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,6 +1070,107 @@ def filter_with_time(conf, messages, wait: nil)
10701070
end
10711071
end
10721072

1073+
sub_test_case "stale stream state cleanup" do
1074+
def partial_messages(partial_id)
1075+
[
1076+
{
1077+
"container_id" => "1",
1078+
"message" => "start",
1079+
"partial_message" => "true",
1080+
"partial_id" => partial_id,
1081+
"partial_ordinal" => "1",
1082+
"partial_last" => "false"
1083+
},
1084+
{
1085+
"container_id" => "1",
1086+
"message" => "end",
1087+
"partial_message" => "true",
1088+
"partial_id" => partial_id,
1089+
"partial_ordinal" => "2",
1090+
"partial_last" => "true"
1091+
},
1092+
]
1093+
end
1094+
1095+
def teardown
1096+
Fluent::Engine.now = @time
1097+
end
1098+
1099+
# Fluent::Test.setup freezes Fluent::Engine.now, so real sleeps never
1100+
# make streams expire. Advance the frozen clock instead and wait for the
1101+
# next tick of the periodic timer (1s interval) to run
1102+
# flush_timeout_buffer with the advanced clock.
1103+
def advance_clock_and_snapshot_state(d, seconds)
1104+
Fluent::Engine.now = Fluent::EventTime.new(@time.sec + seconds, @time.nsec)
1105+
sleep 2
1106+
d.instance.instance_variable_get(:@timeout_map_mutex).synchronize do
1107+
{
1108+
buffer: d.instance.instance_variable_get(:@buffer).dup,
1109+
buffer_size: d.instance.instance_variable_get(:@buffer_size).dup,
1110+
timeout_map: d.instance.instance_variable_get(:@timeout_map).dup,
1111+
}
1112+
end
1113+
end
1114+
1115+
def run_and_snapshot_state(config, messages, advance:)
1116+
d = create_driver(config)
1117+
state = nil
1118+
d.run(default_tag: "test") do
1119+
sleep 0.1 # run event loop
1120+
messages.each do |message|
1121+
d.feed(@time, message)
1122+
end
1123+
state = advance_clock_and_snapshot_state(d, advance)
1124+
end
1125+
[d, state]
1126+
end
1127+
1128+
test "purges state for streams which completed normally" do
1129+
config = <<-CONFIG
1130+
key message
1131+
use_partial_metadata true
1132+
flush_interval 1s
1133+
CONFIG
1134+
messages = partial_messages("partial1") + partial_messages("partial2")
1135+
d, state = run_and_snapshot_state(config, messages, advance: 2)
1136+
expected = [
1137+
{ "container_id" => "1", "message" => "start\nend" },
1138+
{ "container_id" => "1", "message" => "start\nend" },
1139+
]
1140+
assert_equal(expected, d.filtered_records)
1141+
assert_equal({}, state[:buffer])
1142+
assert_equal({}, state[:buffer_size])
1143+
assert_equal({}, state[:timeout_map])
1144+
end
1145+
1146+
test "purges state for streams flushed by timeout" do
1147+
config = <<-CONFIG
1148+
key message
1149+
use_partial_metadata true
1150+
flush_interval 1s
1151+
CONFIG
1152+
messages = [partial_messages("partial1").first]
1153+
d, state = run_and_snapshot_state(config, messages, advance: 2)
1154+
assert(d.logs.any? {|line| line.include?("Timeout flush: test:partial1") })
1155+
assert_equal({}, state[:buffer])
1156+
assert_equal({}, state[:buffer_size])
1157+
assert_equal({}, state[:timeout_map])
1158+
end
1159+
1160+
test "keeps state for in-flight streams within flush_interval" do
1161+
config = <<-CONFIG
1162+
key message
1163+
use_partial_metadata true
1164+
flush_interval 30s
1165+
CONFIG
1166+
messages = [partial_messages("partial1").first]
1167+
d, state = run_and_snapshot_state(config, messages, advance: 2)
1168+
assert_equal(["test:partial1"], state[:buffer].keys)
1169+
assert_equal(1, state[:buffer]["test:partial1"].size)
1170+
assert_equal(["test:partial1"], state[:timeout_map].keys)
1171+
end
1172+
end
1173+
10731174
sub_test_case "overflow" do
10741175
test "ignore" do
10751176
config = <<-CONFIG

0 commit comments

Comments
 (0)