Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,43 @@ def ensure_decompressed!
end
end

class MetadataTimeEventStream < EventStream
def initialize(es)
@es = es
end

def empty?
@es.empty?
end

def dup
self.class.new(@es.dup)
end

def size
@es.size
end

def repeatable?
@es.repeatable?
end

def slice(index, num)
self.class.new(@es.slice(index, num))
end

def each(unpacker: nil, &block)
@es.each(unpacker: unpacker) do |time, record|
next if record.nil?

time = time[0] if time.is_a?(Array)
time = Fluent::EventTime.now if time.nil? || time.to_i == 0
block.call(time, record)
end
nil
end
end

module ChunkMessagePackEventStreamer
# chunk.extend(ChunkMessagePackEventStreamer)
# => chunk.each{|time, record| ... }
Expand Down
51 changes: 29 additions & 22 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ class ForwardInput < Input
helpers :server

LISTEN_PORT = 24224
OPTION_ACK = 'ack'.freeze
OPTION_CHUNK = 'chunk'.freeze
OPTION_COMPRESSED = 'compressed'.freeze
OPTION_FLUENT_SIGNAL = 'fluent_signal'.freeze
OPTION_SIZE = 'size'.freeze
OPTION_TEXT = 'text'.freeze

desc 'The port to listen to.'
config_param :port, :integer, default: LISTEN_PORT
Expand Down Expand Up @@ -275,8 +281,8 @@ def read_messages(conn, &block)
end

def response(option)
if option && option['chunk']
return { 'ack' => option['chunk'] }
if option && option[OPTION_CHUNK]
return { OPTION_ACK => option[OPTION_CHUNK] }
end
nil
end
Expand Down Expand Up @@ -310,34 +316,27 @@ def on_message(msg, chunk_size, conn)
when String
# PackedForward
option = msg[2] || {}
size = option['size'] || 0
size = option[OPTION_SIZE] || 0
compressed = option[OPTION_COMPRESSED]

if option['compressed'] && option['compressed'] != 'text'
es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym, decompression_size_limit: @decompression_size_limit)
if compressed && compressed != OPTION_TEXT
es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: compressed.to_sym, decompression_size_limit: @decompression_size_limit)
else
es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
end
es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
if @skip_invalid_event
es = normalize_event_stream(tag, es, conn.remote_host)
elsif option[OPTION_FLUENT_SIGNAL] == 0
es = Fluent::MetadataTimeEventStream.new(es)
end
if @enable_field_injection
es = add_source_info(es, conn)
end
router.emit_stream(tag, es)

when Array
# Forward
es = if @skip_invalid_event
check_and_skip_invalid_event(tag, entries, conn.remote_host)
else
es = Fluent::MultiEventStream.new
entries.each { |e|
record = e[1]
next if record.nil?
time = e[0]
time = Fluent::EventTime.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
es.add(time, record)
}
es
end
es = normalize_event_stream(tag, entries, conn.remote_host)
if @enable_field_injection
es = add_source_info(es, conn)
end
Expand All @@ -346,7 +345,7 @@ def on_message(msg, chunk_size, conn)

else
# Message
time = msg[1]
time = extract_event_time(msg[1])
record = msg[2]
if @skip_invalid_event && invalid_event?(tag, time, record)
log.warn "got invalid event and drop it:", host: conn.remote_host, tag: tag, time: time, record: record
Expand All @@ -367,21 +366,29 @@ def on_message(msg, chunk_size, conn)
end

def invalid_event?(tag, time, record)
time = extract_event_time(time)
!((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String))
end

def check_and_skip_invalid_event(tag, es, remote_host)
def normalize_event_stream(tag, es, remote_host)
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
if invalid_event?(tag, time, record)
time = extract_event_time(time)
if @skip_invalid_event && invalid_event?(tag, time, record)
log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record
next
end
next if record.nil?
time = Fluent::EventTime.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
new_es.add(time, record)
}
new_es
end

def extract_event_time(time)
time.is_a?(Array) ? time[0] : time
end

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practical use case, in same event stream, are there mixed event coming from fluentbit?
I mean that [EventTime, Record], [[EventTime, Hash*], Record], [EventTime, Record], ...

I know that based on the specifications, of course, that case is a possible.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it isn't possible to be contaminated. They'll be handled to switch with the retain_metadata_in_forward_mode parameter:
https://github.com/fluent/fluent-bit/blob/master/plugins/out_forward/forward.c#L1794C28-L1794C59

This should be set up with configuration file and not to be changed at runtime.


def add_source_info(es, conn)
new_es = Fluent::MultiEventStream.new
if @source_address_key && @source_hostname_key
Expand Down
70 changes: 70 additions & 0 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,26 @@ def create_driver(conf=base_config)
assert_equal(records, d.events)
end

test 'metadata attached logs type in entries array' do
@d = d = create_driver(base_config + "skip_invalid_event false")

time = event_time("2011-01-02 13:14:15 UTC")
metadata = {}

records = [
{"a"=>1},
{"a"=>2}
]

d.run(expect_records: records.length, timeout: 20) do
entries = records.map { |record| [[time, metadata], record] }
send_data packer.write(["tag1", entries, {"fluent_signal" => 0}]).to_s
end
assert_equal(["tag1", "tag1"], d.events.map { |tag, _time, _record| tag })
assert_equal([time, time], d.events.map { |_tag, time, _record| time })
assert_equal(records, d.events.map { |_tag, _time, record| record })
end

data(tag: {
param: "tag new_tag",
result: "new_tag"
Expand Down Expand Up @@ -407,6 +427,29 @@ def create_driver(conf=base_config)
assert_equal(records, d.events)
end

test 'metadata attached logs type in packed entries' do
@d = d = create_driver(base_config + "skip_invalid_event false")

time = event_time("2011-01-02 13:14:15 UTC")
metadata = {}

records = [
{"a"=>1},
{"a"=>2},
]

d.run(expect_records: records.length, timeout: 20) do
entries = ''
records.each {|record|
packer(entries).write([[time, metadata], record]).flush
}
send_data packer.write(["tag1", entries, {"fluent_signal" => 0}]).to_s
end
assert_equal(["tag1", "tag1"], d.events.map { |tag, _time, _record| tag })
assert_equal([time, time], d.events.map { |_tag, time, _record| time })
assert_equal(records, d.events.map { |_tag, _time, record| record })
end

data(tag: {
param: "tag new_tag",
result: "new_tag"
Expand Down Expand Up @@ -509,6 +552,33 @@ def create_driver(conf=base_config)
end

sub_test_case 'compressed packed forward' do
test 'metadata attached logs type without skip_invalid_event' do
@d = d = create_driver(base_config + "skip_invalid_event false")

time = event_time("2011-01-02 13:14:15 UTC")
metadata = {}
records = [
{"a"=>1},
{"a"=>2},
]

entries = ''
records.each do |record|
entries << compress([[time, metadata], record].to_msgpack)
end
chunk = ["tag1", entries, { 'compressed' => 'gzip', 'fluent_signal' => 0 }].to_msgpack

d.run do
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
end
end

assert_equal(["tag1", "tag1"], d.events.map { |tag, _time, _record| tag })
assert_equal([time, time], d.events.map { |_tag, time, _record| time })
assert_equal(records, d.events.map { |_tag, _time, record| record })
end

test 'set_compress_to_option_gzip' do
@d = d = create_driver

Expand Down
Loading