Fix unbounded growth of internal stream state in timeout handling #150
Merged
Watson1978 merged 1 commit on Jun 12, 2026
flush_buffer leaves an empty entry behind in @buffer/@buffer_size, and flush_timeout_buffer skips expired streams whose buffer is empty without removing them from @timeout_map. Streams that complete normally therefore leave entries in all three hashes forever.

This is especially harmful in partial metadata mode: every split message has a unique partial_id and thus a unique stream identity, so a long-running fluentd accumulates state for every split message it has ever processed. Memory grows without bound, and the periodic flush_timeout_buffer scan (which iterates the whole @timeout_map every second while holding @timeout_map_mutex) consumes more and more CPU and increasingly delays filter_stream.

Purge expired streams from @timeout_map, @buffer and @buffer_size whether or not they still have buffered content. In-flight streams within flush_interval are untouched.

The new tests advance the frozen Fluent::Engine.now (Fluent::Test.setup memoizes it) instead of sleeping, because the expiry condition can never become true under the frozen clock; this is also why existing timeout tests only exercised the shutdown path.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Problem
ConcatFilter keeps per-stream state in three hashes (@buffer, @buffer_size, @timeout_map), but entries are never removed at runtime once a stream finishes:
- flush_buffer resets the entry instead of deleting it, leaving @buffer[stream_identity] = [] / @buffer_size[stream_identity] = 0 behind.
- flush_timeout_buffer skips expired streams whose buffer is empty (next if @buffer[stream_identity].empty?) and removes only the identities it actually flushed from @timeout_map, so streams that completed normally stay in @timeout_map forever.
- Nothing else removes entries from @buffer / @buffer_size (flush_remaining_buffer only runs at shutdown).

As a result, every stream identity ever seen leaks three hash entries plus the identity string.
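The leak can be illustrated with a minimal plain-Ruby model of the bookkeeping described above. This is a sketch, not the plugin code: LeakyState, its add method, and the identity strings are hypothetical, and only the reset-instead-of-delete behavior of flush_buffer is taken from the description.

```ruby
# Simplified model of the pre-fix bookkeeping. LeakyState is a hypothetical
# name; this is not the plugin's actual implementation.
class LeakyState
  attr_reader :buffer, :buffer_size, :timeout_map

  def initialize
    # Default blocks create an entry on first access.
    @buffer = Hash.new { |h, k| h[k] = [] }
    @buffer_size = Hash.new { |h, k| h[k] = 0 }
    @timeout_map = {}
  end

  # Buffer one line for a stream and remember when it was last seen.
  def add(stream_identity, line, now)
    @buffer[stream_identity] << line
    @buffer_size[stream_identity] += line.bytesize
    @timeout_map[stream_identity] = now
  end

  # Pre-fix behavior as described above: the entry is reset, not deleted.
  def flush_buffer(stream_identity)
    lines = @buffer[stream_identity]
    @buffer[stream_identity] = []
    @buffer_size[stream_identity] = 0
    lines
  end
end

state = LeakyState.new
1000.times do |i|
  id = "tag:stream-#{i}"          # every stream has a unique identity
  state.add(id, "first half ", i)
  state.add(id, "second half", i)
  state.flush_buffer(id)          # the stream completes normally
end

puts state.buffer.size       # 1000 (all values are empty arrays)
puts state.buffer_size.size  # 1000 (all values are 0)
puts state.timeout_map.size  # 1000 (stale timestamps)
```

Every stream in this model finished cleanly, yet all three hashes keep one entry per identity.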
This is especially harmful in use_partial_metadata mode: the stream identity is "#{tag}:#{partial_id}", and Docker generates a unique partial_id per split message, so a long-running fluentd accumulates state for every split (>16KB) log message it has ever processed.

The impact is twofold:
- Memory grows without bound.
- flush_timeout_buffer runs every second and iterates the whole ever-growing @timeout_map while holding @timeout_map_mutex, which process (the ingest path) also needs. Under high traffic this eventually stalls ingestion.

Fix
In flush_timeout_buffer, collect all expired stream identities (not just the flushed ones) and purge them from @timeout_map, @buffer, and @buffer_size.

In-flight streams within flush_interval are untouched. Expired streams that still had buffered content are flushed exactly as before; the only behavioral change is that their (empty) state entries are now removed as well. If the same stream identity appears again later, the entries are transparently recreated by the existing Hash.new default blocks, so per-stream behavior is unchanged.

This also replaces the reject! { include? } scan (O(map size × flushed)) with direct deletes.

Tests
Added a stale stream state cleanup sub test case: [...] flush_interval

The first two fail against the current master.
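To make the behavior under test concrete, the following is a minimal plain-Ruby model of the purge step. It is a sketch, not the plugin code: purge_expired is a hypothetical helper, the clock is passed in as a plain integer, and the strict > expiry comparison is an assumption.

```ruby
# Hypothetical stand-in for the fixed cleanup. It purges every expired
# identity from all three hashes and returns the content that was still
# buffered for the expired streams.
def purge_expired(buffer, buffer_size, timeout_map, now, flush_interval)
  flushed = {}
  # Collect every expired identity, whether or not it still has content.
  expired = timeout_map.select { |_, last_seen| now - last_seen > flush_interval }.keys
  expired.each do |id|
    lines = buffer.fetch(id, [])          # fetch avoids creating a new entry
    flushed[id] = lines unless lines.empty?
    # Direct deletes instead of a scan over the whole map.
    timeout_map.delete(id)
    buffer.delete(id)
    buffer_size.delete(id)
  end
  flushed
end

buffer = Hash.new { |h, k| h[k] = [] }
buffer_size = Hash.new { |h, k| h[k] = 0 }
timeout_map = {}

# "done" completed normally long ago and left empty state behind.
buffer["done"] = []
buffer_size["done"] = 0
timeout_map["done"] = 0

# "stuck" timed out with content still buffered.
buffer["stuck"] = ["partial line"]
buffer_size["stuck"] = 12
timeout_map["stuck"] = 0

# "live" was seen recently and is still within flush_interval.
buffer["live"] = ["head"]
buffer_size["live"] = 4
timeout_map["live"] = 98

flushed = purge_expired(buffer, buffer_size, timeout_map, 100, 60)

p flushed.keys      # ["stuck"]
p timeout_map.keys  # ["live"]
p buffer.keys       # ["live"]
```

Only the in-flight stream keeps its state. The stream that finished normally is purely removed, and the stream that timed out with content is flushed and then removed.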
Note on how the tests advance time
While writing these tests I noticed that Fluent::Test.setup memoizes Fluent::Engine.now, so the clock is effectively frozen during driver runs and now - previous_timestamp is always 0; the timeout condition can never become true with real sleeps. (The existing timeout tests pass because flush_remaining_buffer at shutdown emits the same error event the mocks expect, i.e. the timer-driven timeout path was not actually exercised.)

The new tests therefore advance the frozen clock via the Fluent::Engine.now= setter provided by fluent/test and let the already-running 1s periodic timer evaluate the expiry, restoring the clock in teardown.

Reproduction snippet
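A standalone sketch of the frozen-clock effect described in the note above. It is a simplified plain-Ruby model, not the PR's own reproduction code: FrozenClock and expired? are hypothetical names that stand in for the memoized test clock and the expiry check, and the strict > comparison is an assumption.

```ruby
# FrozenClock is a hypothetical stand-in for a memoized test clock; it is
# not the fluent/test implementation.
class FrozenClock
  # The reader returns the memoized value; the writer advances it.
  attr_accessor :now

  def initialize(start)
    @now = start
  end
end

# Assumed shape of the expiry condition.
def expired?(clock, previous_timestamp, flush_interval)
  clock.now - previous_timestamp > flush_interval
end

clock = FrozenClock.new(1_000)
previous_timestamp = clock.now
flush_interval = 1

# Real time passes, but the frozen clock does not move.
sleep 0.01
before = expired?(clock, previous_timestamp, flush_interval)

original = clock.now
begin
  # Advance the clock explicitly instead of sleeping.
  clock.now = original + flush_interval + 1
  after = expired?(clock, previous_timestamp, flush_interval)
ensure
  # Restore the clock, as a teardown step would.
  clock.now = original
end

puts before  # false
puts after   # true
```

Restoring the clock in an ensure block mirrors the teardown step, so later checks in the same process see the original time again.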
🤖 Generated with Claude Code