Skip to content

Commit 3c7dadf

Browse files
committed
in_forward: Make lazy evaluation for sent by Fluent Bit suspected payloads
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent fe52d26 commit 3c7dadf

1 file changed

Lines changed: 46 additions & 6 deletions

File tree

lib/fluent/plugin/in_forward.rb

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,43 @@ module Fluent::Plugin
2525
class ForwardInput < Input
2626
Fluent::Plugin.register_input('forward', self)
2727

28+
class MetadataTimeEventStream < Fluent::EventStream
29+
def initialize(es)
30+
@es = es
31+
end
32+
33+
def empty?
34+
@es.empty?
35+
end
36+
37+
def dup
38+
self.class.new(@es.dup)
39+
end
40+
41+
def size
42+
@es.size
43+
end
44+
45+
def repeatable?
46+
@es.repeatable?
47+
end
48+
49+
def slice(index, num)
50+
self.class.new(@es.slice(index, num))
51+
end
52+
53+
def each(unpacker: nil, &block)
54+
@es.each(unpacker: unpacker) do |time, record|
55+
next if record.nil?
56+
57+
time = time[0] if time.is_a?(Array)
58+
time = Fluent::EventTime.now if time.nil? || time.to_i == 0
59+
block.call(time, record)
60+
end
61+
nil
62+
end
63+
end
64+
2865
# See the wiki page below for protocol specification
2966
# https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1
3067

@@ -317,7 +354,13 @@ def on_message(msg, chunk_size, conn)
317354
else
318355
es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
319356
end
320-
es = normalize_event_stream(tag, es, conn.remote_host)
357+
es = if @skip_invalid_event
358+
normalize_event_stream(tag, es, conn.remote_host)
359+
elsif option['fluent_signal'] == 0
360+
MetadataTimeEventStream.new(es)
361+
else
362+
es
363+
end
321364
if @enable_field_injection
322365
es = add_source_info(es, conn)
323366
end
@@ -361,11 +404,8 @@ def invalid_event?(tag, time, record)
361404

362405
def normalize_event_stream(tag, es, remote_host)
363406
new_es = Fluent::MultiEventStream.new
364-
normalized = @skip_invalid_event || es.is_a?(Array)
365407
es.each { |time, record|
366-
extracted_time = extract_event_time(time)
367-
normalized ||= !time.equal?(extracted_time)
368-
time = extracted_time
408+
time = extract_event_time(time)
369409
if @skip_invalid_event && invalid_event?(tag, time, record)
370410
log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record
371411
next
@@ -374,7 +414,7 @@ def normalize_event_stream(tag, es, remote_host)
374414
time = Fluent::EventTime.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
375415
new_es.add(time, record)
376416
}
377-
normalized ? new_es : es
417+
new_es
378418
end
379419

380420
def extract_event_time(time)

0 commit comments

Comments
 (0)