@@ -276,6 +276,26 @@ def create_driver(conf=base_config)
276276 assert_equal ( records , d . events )
277277 end
278278
279+ test 'metadata attached logs type in entries array' do
280+ @d = d = create_driver ( base_config + "skip_invalid_event false" )
281+
282+ time = event_time ( "2011-01-02 13:14:15 UTC" )
283+ metadata = { }
284+
285+ records = [
286+ { "a" => 1 } ,
287+ { "a" => 2 }
288+ ]
289+
290+ d . run ( expect_records : records . length , timeout : 20 ) do
291+ entries = records . map { |record | [ [ time , metadata ] , record ] }
292+ send_data packer . write ( [ "tag1" , entries , { "fluent_signal" => 0 } ] ) . to_s
293+ end
294+ assert_equal ( [ "tag1" , "tag1" ] , d . events . map { |tag , _time , _record | tag } )
295+ assert_equal ( [ time , time ] , d . events . map { |_tag , time , _record | time } )
296+ assert_equal ( records , d . events . map { |_tag , _time , record | record } )
297+ end
298+
279299 data ( tag : {
280300 param : "tag new_tag" ,
281301 result : "new_tag"
@@ -407,6 +427,29 @@ def create_driver(conf=base_config)
407427 assert_equal ( records , d . events )
408428 end
409429
430+ test 'metadata attached logs type in packed entries' do
431+ @d = d = create_driver ( base_config + "skip_invalid_event false" )
432+
433+ time = event_time ( "2011-01-02 13:14:15 UTC" )
434+ metadata = { }
435+
436+ records = [
437+ { "a" => 1 } ,
438+ { "a" => 2 } ,
439+ ]
440+
441+ d . run ( expect_records : records . length , timeout : 20 ) do
442+ entries = ''
443+ records . each { |record |
444+ packer ( entries ) . write ( [ [ time , metadata ] , record ] ) . flush
445+ }
446+ send_data packer . write ( [ "tag1" , entries , { "fluent_signal" => 0 } ] ) . to_s
447+ end
448+ assert_equal ( [ "tag1" , "tag1" ] , d . events . map { |tag , _time , _record | tag } )
449+ assert_equal ( [ time , time ] , d . events . map { |_tag , time , _record | time } )
450+ assert_equal ( records , d . events . map { |_tag , _time , record | record } )
451+ end
452+
410453 data ( tag : {
411454 param : "tag new_tag" ,
412455 result : "new_tag"
@@ -509,6 +552,33 @@ def create_driver(conf=base_config)
509552 end
510553
511554 sub_test_case 'compressed packed forward' do
555+ test 'metadata attached logs type without skip_invalid_event' do
556+ @d = d = create_driver ( base_config + "skip_invalid_event false" )
557+
558+ time = event_time ( "2011-01-02 13:14:15 UTC" )
559+ metadata = { }
560+ records = [
561+ { "a" => 1 } ,
562+ { "a" => 2 } ,
563+ ]
564+
565+ entries = ''
566+ records . each do |record |
567+ entries << compress ( [ [ time , metadata ] , record ] . to_msgpack )
568+ end
569+ chunk = [ "tag1" , entries , { 'compressed' => 'gzip' , 'fluent_signal' => 0 } ] . to_msgpack
570+
571+ d . run do
572+ Fluent ::MessagePackFactory . msgpack_unpacker . feed_each ( chunk ) do |obj |
573+ d . instance . send ( :on_message , obj , chunk . size , DUMMY_SOCK )
574+ end
575+ end
576+
577+ assert_equal ( [ "tag1" , "tag1" ] , d . events . map { |tag , _time , _record | tag } )
578+ assert_equal ( [ time , time ] , d . events . map { |_tag , time , _record | time } )
579+ assert_equal ( records , d . events . map { |_tag , _time , record | record } )
580+ end
581+
512582 test 'set_compress_to_option_gzip' do
513583 @d = d = create_driver
514584
0 commit comments