Skip to content

Commit 50ec22a

Browse files
committed
in_forward: Improve interoperability with metadata encapsulated timestamps
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent 99c7074 commit 50ec22a

1 file changed

Lines changed: 7 additions & 1 deletion

File tree

lib/fluent/plugin/in_forward.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ def on_message(msg, chunk_size, conn)
332332
entries.each { |e|
333333
record = e[1]
334334
next if record.nil?
335-
time = e[0]
335+
time = extract_event_time(e[0])
336336
time = Fluent::EventTime.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
337337
es.add(time, record)
338338
}
@@ -367,12 +367,14 @@ def on_message(msg, chunk_size, conn)
367367
end
368368

369369
def invalid_event?(tag, time, record)
370+
time = extract_event_time(time)
370371
!((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String))
371372
end
372373

373374
def check_and_skip_invalid_event(tag, es, remote_host)
374375
new_es = Fluent::MultiEventStream.new
375376
es.each { |time, record|
377+
time = extract_event_time(time)
376378
if invalid_event?(tag, time, record)
377379
log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record
378380
next
@@ -382,6 +384,10 @@ def check_and_skip_invalid_event(tag, es, remote_host)
382384
new_es
383385
end
384386

387+
def extract_event_time(time)
388+
time.is_a?(Array) ? time[0] : time
389+
end
390+
385391
def add_source_info(es, conn)
386392
new_es = Fluent::MultiEventStream.new
387393
if @source_address_key && @source_hostname_key

0 commit comments

Comments
 (0)