diff --git a/lib/fluent/plugin/filter_concat.rb b/lib/fluent/plugin/filter_concat.rb index 3a3f622..616e91d 100644 --- a/lib/fluent/plugin/filter_concat.rb +++ b/lib/fluent/plugin/filter_concat.rb @@ -433,20 +433,27 @@ def flush_buffer(stream_identity, new_element = nil) def flush_timeout_buffer now = Fluent::Engine.now - timeout_stream_identities = [] + expired_stream_identities = [] @timeout_map_mutex.synchronize do @timeout_map.each do |stream_identity, previous_timestamp| next if @flush_interval > (now - previous_timestamp) + expired_stream_identities << stream_identity next if @buffer[stream_identity].empty? time, flushed_record = flush_buffer(stream_identity) - timeout_stream_identities << stream_identity tag = stream_identity.split(":").first message = "Timeout flush: #{stream_identity}" handle_timeout_error(tag, @use_first_timestamp ? time : now, flushed_record, message) log.info(message) end - @timeout_map.reject! do |stream_identity, _| - timeout_stream_identities.include?(stream_identity) + # Purge expired streams from all the state hashes, not only the + # flushed ones. Streams which completed normally leave empty entries + # behind, so @buffer/@buffer_size/@timeout_map would otherwise grow + # unboundedly (notably in partial metadata mode where every split + # message has a unique stream identity). + expired_stream_identities.each do |stream_identity| + @timeout_map.delete(stream_identity) + @buffer.delete(stream_identity) + @buffer_size.delete(stream_identity) end end end diff --git a/test/plugin/test_filter_concat.rb b/test/plugin/test_filter_concat.rb index 9f06a60..0121719 100644 --- a/test/plugin/test_filter_concat.rb +++ b/test/plugin/test_filter_concat.rb @@ -1070,6 +1070,107 @@ def filter_with_time(conf, messages, wait: nil) end end + sub_test_case "stale stream state cleanup" do + def partial_messages(partial_id) + [ + { + "container_id" => "1", + "message" => "start", + "partial_message" => "true", + "partial_id" => partial_id, + "partial_ordinal" => "1", + "partial_last" => "false" + }, + { + "container_id" => "1", + "message" => "end", + "partial_message" => "true", + "partial_id" => partial_id, + "partial_ordinal" => "2", + "partial_last" => "true" + }, + ] + end + + def teardown + Fluent::Engine.now = @time + end + + # Fluent::Test.setup freezes Fluent::Engine.now, so real sleeps never + # make streams expire. Advance the frozen clock instead and wait for the + # next tick of the periodic timer (1s interval) to run + # flush_timeout_buffer with the advanced clock. + def advance_clock_and_snapshot_state(d, seconds) + Fluent::Engine.now = Fluent::EventTime.new(@time.sec + seconds, @time.nsec) + sleep 2 + d.instance.instance_variable_get(:@timeout_map_mutex).synchronize do + { + buffer: d.instance.instance_variable_get(:@buffer).dup, + buffer_size: d.instance.instance_variable_get(:@buffer_size).dup, + timeout_map: d.instance.instance_variable_get(:@timeout_map).dup, + } + end + end + + def run_and_snapshot_state(config, messages, advance:) + d = create_driver(config) + state = nil + d.run(default_tag: "test") do + sleep 0.1 # run event loop + messages.each do |message| + d.feed(@time, message) + end + state = advance_clock_and_snapshot_state(d, advance) + end + [d, state] + end + + test "purges state for streams which completed normally" do + config = <<-CONFIG + key message + use_partial_metadata true + flush_interval 1s + CONFIG + messages = partial_messages("partial1") + partial_messages("partial2") + d, state = run_and_snapshot_state(config, messages, advance: 2) + expected = [ + { "container_id" => "1", "message" => "start\nend" }, + { "container_id" => "1", "message" => "start\nend" }, + ] + assert_equal(expected, d.filtered_records) + assert_equal({}, state[:buffer]) + assert_equal({}, state[:buffer_size]) + assert_equal({}, state[:timeout_map]) + end + + test "purges state for streams flushed by timeout" do + config = <<-CONFIG + key message + use_partial_metadata true + flush_interval 1s + CONFIG + messages = [partial_messages("partial1").first] + d, state = run_and_snapshot_state(config, messages, advance: 2) + assert(d.logs.any? {|line| line.include?("Timeout flush: test:partial1") }) + assert_equal({}, state[:buffer]) + assert_equal({}, state[:buffer_size]) + assert_equal({}, state[:timeout_map]) + end + + test "keeps state for in-flight streams within flush_interval" do + config = <<-CONFIG + key message + use_partial_metadata true + flush_interval 30s + CONFIG + messages = [partial_messages("partial1").first] + d, state = run_and_snapshot_state(config, messages, advance: 2) + assert_equal(["test:partial1"], state[:buffer].keys) + assert_equal(1, state[:buffer]["test:partial1"].size) + assert_equal(["test:partial1"], state[:timeout_map].keys) + end + end + sub_test_case "overflow" do test "ignore" do config = <<-CONFIG