@@ -357,3 +357,190 @@ fn process_raw_large_batch() {
357357 assert_eq ! ( snap. received, 5_000 ) ;
358358 assert_eq ! ( snap. processed, 5_000 ) ;
359359}
360+
361+ #[ test]
362+ fn parse_error_action_skip ( ) {
363+ // With Skip action, invalid messages are silently dropped — not included in results.
364+ let config = BatchProcessingConfig {
365+ parse_error_action : hyperi_rustlib:: worker:: engine:: ParseErrorAction :: Skip ,
366+ ..Default :: default ( )
367+ } ;
368+ let engine = BatchEngine :: new ( config) ;
369+
370+ let mut msgs = make_json_messages ( 3 ) ;
371+ // Insert 2 invalid messages at positions 1 and 3
372+ msgs. insert ( 1 , make_raw ( b"not json {{{" ) ) ;
373+ msgs. push ( make_raw ( b"also not json <<<" ) ) ;
374+ // msgs is now: valid, invalid, valid, valid, invalid → 5 total, 3 valid
375+
376+ let results: Vec < Result < ( ) , String > > = engine. process_mid_tier ( & msgs, |_| Ok ( ( ) ) ) ;
377+ // Skip drops the 2 invalid ones entirely — only 3 Ok entries
378+ assert_eq ! (
379+ results. len( ) ,
380+ 3 ,
381+ "skipped messages must not appear in results"
382+ ) ;
383+ assert ! ( results. iter( ) . all( |r| r. is_ok( ) ) ) ;
384+
385+ let snap = engine. stats ( ) . snapshot ( ) ;
386+ assert_eq ! ( snap. received, 5 ) ;
387+ assert_eq ! ( snap. processed, 3 ) ;
388+ // Errors are still counted even when skipped
389+ assert_eq ! ( snap. errors, 2 ) ;
390+ }
391+
392+ #[ test]
393+ fn parse_error_action_fail_batch ( ) {
394+ // With FailBatch, any parse error causes the entire batch to return Err.
395+ let config = BatchProcessingConfig {
396+ parse_error_action : hyperi_rustlib:: worker:: engine:: ParseErrorAction :: FailBatch ,
397+ ..Default :: default ( )
398+ } ;
399+ let engine = BatchEngine :: new ( config) ;
400+
401+ let mut msgs = make_json_messages ( 4 ) ;
402+ // Inject one invalid message at position 2
403+ msgs. insert ( 2 , make_raw ( b"totally not json!!!" ) ) ;
404+ // msgs: valid, valid, invalid, valid, valid → 5 total
405+
406+ let results: Vec < Result < ( ) , String > > = engine. process_mid_tier ( & msgs, |_| Ok ( ( ) ) ) ;
407+ // FailBatch: all results in the batch (up to and including the error) are Err
408+ assert ! (
409+ !results. is_empty( ) ,
410+ "FailBatch must return results, not empty"
411+ ) ;
412+ assert ! (
413+ results. iter( ) . all( |r| r. is_err( ) ) ,
414+ "FailBatch: every result must be Err, got {} ok and {} err" ,
415+ results. iter( ) . filter( |r| r. is_ok( ) ) . count( ) ,
416+ results. iter( ) . filter( |r| r. is_err( ) ) . count( ) ,
417+ ) ;
418+ }
419+
420+ #[ cfg( feature = "worker-msgpack" ) ]
421+ #[ test]
422+ fn msgpack_auto_detection ( ) {
423+ // Encode {"key": "value"} as MsgPack and verify Auto detection + parsing.
424+ let json_value: serde_json:: Value = serde_json:: json!( { "key" : "value" , "_table" : "events" } ) ;
425+ let msgpack_bytes = rmp_serde:: to_vec ( & json_value) . expect ( "msgpack encode failed" ) ;
426+
427+ let engine = default_engine ( ) ;
428+ // Use Auto format — engine should sniff the MsgPack header bytes
429+ let msg = RawMessage {
430+ payload : bytes:: Bytes :: from ( msgpack_bytes) ,
431+ key : None ,
432+ headers : vec ! [ ] ,
433+ metadata : MessageMetadata {
434+ timestamp_ms : None ,
435+ format : PayloadFormat :: Auto ,
436+ commit_token : None ,
437+ } ,
438+ } ;
439+
440+ let results: Vec < Result < String , String > > = engine. process_mid_tier ( & [ msg] , |pm| {
441+ pm. field ( "key" )
442+ . and_then ( |v| sonic_rs:: JsonValueTrait :: as_str ( v) )
443+ . map ( String :: from)
444+ . ok_or_else ( || "missing key field" . to_string ( ) )
445+ } ) ;
446+
447+ assert_eq ! ( results. len( ) , 1 ) ;
448+ assert ! ( results[ 0 ] . is_ok( ) , "msgpack parse failed: {:?}" , results[ 0 ] ) ;
449+ assert_eq ! ( results[ 0 ] . as_ref( ) . unwrap( ) , "value" ) ;
450+ }
451+
452+ #[ test]
453+ fn pre_route_field_error_on_invalid_json ( ) {
454+ // Routing + DropFieldMissing filter applied to messages with completely invalid JSON.
455+ // Invalid JSON cannot be parsed for field extraction — should be treated as
456+ // field-missing (dropped) or parse error (DLQ) depending on engine phase ordering.
457+ let config = BatchProcessingConfig {
458+ routing_field : Some ( "_table" . to_string ( ) ) ,
459+ pre_route_filters : vec ! [ PreRouteFilterConfig :: DropFieldMissing {
460+ field: "_table" . to_string( ) ,
461+ } ] ,
462+ ..Default :: default ( )
463+ } ;
464+ let engine = BatchEngine :: new ( config) ;
465+
466+ let msgs = vec ! [
467+ make_raw( r#"{"_table":"events","id":1}"# . as_bytes( ) ) , // valid, has field
468+ make_raw( b"not json at all <<<" ) , // completely invalid
469+ make_raw( r#"{"_table":"logs","id":2}"# . as_bytes( ) ) , // valid, has field
470+ ] ;
471+
472+ let results: Vec < Result < ( ) , String > > = engine. process_mid_tier ( & msgs, |_| Ok ( ( ) ) ) ;
473+
474+ // The two valid messages should succeed.
475+ // The invalid JSON message is either filtered (field extraction fails → treated as
476+ // missing) or produces an Err (DLQ from parse phase after pre-route passes).
477+ // Either way, no panic and exactly 2 Ok results.
478+ let ok_count = results. iter ( ) . filter ( |r| r. is_ok ( ) ) . count ( ) ;
479+ assert_eq ! ( ok_count, 2 , "expected 2 successful results, got {ok_count}" ) ;
480+
481+ let snap = engine. stats ( ) . snapshot ( ) ;
482+ assert_eq ! ( snap. received, 3 ) ;
483+ // The 2 valid messages must be processed
484+ assert_eq ! ( snap. processed, 2 ) ;
485+ }
486+
487+ #[ test]
488+ fn concurrent_process_mid_tier ( ) {
489+ // 4 threads each calling process_mid_tier on shared Arc<BatchEngine>.
490+ // Verifies thread safety of the engine, stats, and interner.
491+ let engine = Arc :: new ( BatchEngine :: new ( BatchProcessingConfig :: default ( ) ) ) ;
492+
493+ let num_threads = 4 ;
494+ let msgs_per_thread = 1_000 ;
495+
496+ let handles: Vec < _ > = ( 0 ..num_threads)
497+ . map ( |t| {
498+ let engine = Arc :: clone ( & engine) ;
499+ std:: thread:: spawn ( move || {
500+ let msgs = ( 0 ..msgs_per_thread)
501+ . map ( |i| {
502+ make_raw (
503+ format ! ( r#"{{"_table":"events","thread":{t},"id":{i}}}"# ) . as_bytes ( ) ,
504+ )
505+ } )
506+ . collect :: < Vec < _ > > ( ) ;
507+ let results: Vec < Result < usize , String > > =
508+ engine. process_mid_tier ( & msgs, |pm| Ok ( pm. raw_payload ( ) . len ( ) ) ) ;
509+ assert_eq ! ( results. len( ) , msgs_per_thread) ;
510+ assert ! (
511+ results. iter( ) . all( |r| r. is_ok( ) ) ,
512+ "thread {t}: unexpected errors"
513+ ) ;
514+ } )
515+ } )
516+ . collect ( ) ;
517+
518+ for h in handles {
519+ h. join ( ) . expect ( "thread panicked" ) ;
520+ }
521+
522+ let snap = engine. stats ( ) . snapshot ( ) ;
523+ let total = ( num_threads * msgs_per_thread) as u64 ;
524+ assert_eq ! ( snap. received, total, "stats.received mismatch: {snap:?}" ) ;
525+ assert_eq ! ( snap. processed, total, "stats.processed mismatch: {snap:?}" ) ;
526+ assert_eq ! ( snap. errors, 0 ) ;
527+ }
528+
529+ #[ test]
530+ fn large_batch_20k ( ) {
531+ // Kafka-scale: 20 000 messages across 2 chunks of 10 000 (default max_chunk_size).
532+ let engine = default_engine ( ) ;
533+ let msgs = make_json_messages ( 20_000 ) ;
534+
535+ let results: Vec < Result < usize , String > > =
536+ engine. process_mid_tier ( & msgs, |pm| Ok ( pm. raw_payload ( ) . len ( ) ) ) ;
537+
538+ assert_eq ! ( results. len( ) , 20_000 ) ;
539+ assert ! ( results. iter( ) . all( |r| r. is_ok( ) ) ) ;
540+
541+ let snap = engine. stats ( ) . snapshot ( ) ;
542+ assert_eq ! ( snap. received, 20_000 ) ;
543+ assert_eq ! ( snap. processed, 20_000 ) ;
544+ assert_eq ! ( snap. errors, 0 ) ;
545+ assert_eq ! ( snap. filtered, 0 ) ;
546+ }
0 commit comments