Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions lib/fluent/plugin/filter_concat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -433,20 +433,27 @@ def flush_buffer(stream_identity, new_element = nil)

def flush_timeout_buffer
now = Fluent::Engine.now
timeout_stream_identities = []
expired_stream_identities = []
@timeout_map_mutex.synchronize do
@timeout_map.each do |stream_identity, previous_timestamp|
next if @flush_interval > (now - previous_timestamp)
expired_stream_identities << stream_identity
next if @buffer[stream_identity].empty?
time, flushed_record = flush_buffer(stream_identity)
timeout_stream_identities << stream_identity
tag = stream_identity.split(":").first
message = "Timeout flush: #{stream_identity}"
handle_timeout_error(tag, @use_first_timestamp ? time : now, flushed_record, message)
log.info(message)
end
@timeout_map.reject! do |stream_identity, _|
timeout_stream_identities.include?(stream_identity)
# Purge expired streams from all the state hashes, not only the
# flushed ones. Streams which completed normally leave empty entries
# behind, so @buffer/@buffer_size/@timeout_map would otherwise grow
# unboundedly (notably in partial metadata mode where every split
# message has a unique stream identity).
expired_stream_identities.each do |stream_identity|
@timeout_map.delete(stream_identity)
@buffer.delete(stream_identity)
@buffer_size.delete(stream_identity)
end
end
end
Expand Down
101 changes: 101 additions & 0 deletions test/plugin/test_filter_concat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,107 @@ def filter_with_time(conf, messages, wait: nil)
end
end

sub_test_case "stale stream state cleanup" do
def partial_messages(partial_id)
[
{
"container_id" => "1",
"message" => "start",
"partial_message" => "true",
"partial_id" => partial_id,
"partial_ordinal" => "1",
"partial_last" => "false"
},
{
"container_id" => "1",
"message" => "end",
"partial_message" => "true",
"partial_id" => partial_id,
"partial_ordinal" => "2",
"partial_last" => "true"
},
]
end

def teardown
Fluent::Engine.now = @time
end

# Fluent::Test.setup freezes Fluent::Engine.now, so real sleeps never
# make streams expire. Advance the frozen clock instead and wait for the
# next tick of the periodic timer (1s interval) to run
# flush_timeout_buffer with the advanced clock.
def advance_clock_and_snapshot_state(d, seconds)
Fluent::Engine.now = Fluent::EventTime.new(@time.sec + seconds, @time.nsec)
sleep 2
d.instance.instance_variable_get(:@timeout_map_mutex).synchronize do
{
buffer: d.instance.instance_variable_get(:@buffer).dup,
buffer_size: d.instance.instance_variable_get(:@buffer_size).dup,
timeout_map: d.instance.instance_variable_get(:@timeout_map).dup,
}
end
end

def run_and_snapshot_state(config, messages, advance:)
d = create_driver(config)
state = nil
d.run(default_tag: "test") do
sleep 0.1 # run event loop
messages.each do |message|
d.feed(@time, message)
end
state = advance_clock_and_snapshot_state(d, advance)
end
[d, state]
end

test "purges state for streams which completed normally" do
config = <<-CONFIG
key message
use_partial_metadata true
flush_interval 1s
CONFIG
messages = partial_messages("partial1") + partial_messages("partial2")
d, state = run_and_snapshot_state(config, messages, advance: 2)
expected = [
{ "container_id" => "1", "message" => "start\nend" },
{ "container_id" => "1", "message" => "start\nend" },
]
assert_equal(expected, d.filtered_records)
assert_equal({}, state[:buffer])
assert_equal({}, state[:buffer_size])
assert_equal({}, state[:timeout_map])
end

test "purges state for streams flushed by timeout" do
config = <<-CONFIG
key message
use_partial_metadata true
flush_interval 1s
CONFIG
messages = [partial_messages("partial1").first]
d, state = run_and_snapshot_state(config, messages, advance: 2)
assert(d.logs.any? {|line| line.include?("Timeout flush: test:partial1") })
assert_equal({}, state[:buffer])
assert_equal({}, state[:buffer_size])
assert_equal({}, state[:timeout_map])
end

test "keeps state for in-flight streams within flush_interval" do
config = <<-CONFIG
key message
use_partial_metadata true
flush_interval 30s
CONFIG
messages = [partial_messages("partial1").first]
d, state = run_and_snapshot_state(config, messages, advance: 2)
assert_equal(["test:partial1"], state[:buffer].keys)
assert_equal(1, state[:buffer]["test:partial1"].size)
assert_equal(["test:partial1"], state[:timeout_map].keys)
end
end

sub_test_case "overflow" do
test "ignore" do
config = <<-CONFIG
Expand Down
Loading