@@ -314,26 +314,23 @@ impl CascadingCompressor {
314314 // Run the winning scheme's `compress`. On failure, emit an ERROR event carrying the
315315 // scheme name and cascade history before propagating.
316316 let error_ctx = trace:: enabled_error_context ( & compress_ctx) ;
317- let compressed = match winner. compress ( self , & data, compress_ctx, exec_ctx) {
318- Ok ( compressed) => compressed,
319- Err ( err) => {
317+ let _winner_span = trace:: winner_compress_span ( winner. id ( ) , before_nbytes) . entered ( ) ;
318+ let compressed = winner
319+ . compress ( self , & data, compress_ctx, exec_ctx)
320+ . inspect_err ( |err| {
320321 // NB: this is the only way we can tell which scheme panicked / bailed on their
321322 // data, especially for third-party schemes where the error site may not carry any
322323 // compressor context.
323- trace:: scheme_compress_failed ( winner. id ( ) , before_nbytes, error_ctx. as_ref ( ) , & err) ;
324- return Err ( err) ;
325- }
326- } ;
324+ trace:: scheme_compress_failed ( winner. id ( ) , before_nbytes, error_ctx. as_ref ( ) , err) ;
325+ } ) ?;
327326
328327 let after_nbytes = compressed. nbytes ( ) ;
329328 let actual_ratio = ( after_nbytes != 0 ) . then ( || before_nbytes as f64 / after_nbytes as f64 ) ;
330329
331330 // TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!!
332331 let accepted = after_nbytes < before_nbytes || compressed. is :: < AnyScalarFn > ( ) ;
333332
334- trace:: scheme_compress_result (
335- winner. id ( ) ,
336- before_nbytes,
333+ trace:: record_winner_compress_result (
337334 after_nbytes,
338335 winner_estimate. trace_ratio ( ) ,
339336 actual_ratio,
@@ -373,28 +370,32 @@ impl CascadingCompressor {
373370 let mut deferred: Vec < ( & ' static dyn Scheme , DeferredEstimate ) > = Vec :: new ( ) ;
374371
375372 // Pass 1: evaluate every immediate verdict. Stash deferred work for pass 2.
376- for & scheme in schemes {
377- match scheme. expected_compression_ratio ( data, compress_ctx. clone ( ) , exec_ctx) {
378- CompressionEstimate :: Verdict ( EstimateVerdict :: Skip ) => { }
379- CompressionEstimate :: Verdict ( EstimateVerdict :: AlwaysUse ) => {
380- return Ok ( Some ( ( scheme, WinnerEstimate :: AlwaysUse ) ) ) ;
381- }
382- CompressionEstimate :: Verdict ( EstimateVerdict :: Ratio ( ratio) ) => {
383- let score = EstimateScore :: FiniteCompression ( ratio) ;
373+ {
374+ let _verdict_pass = trace:: verdict_pass_span ( ) . entered ( ) ;
375+ for & scheme in schemes {
376+ match scheme. expected_compression_ratio ( data, compress_ctx. clone ( ) , exec_ctx) {
377+ CompressionEstimate :: Verdict ( EstimateVerdict :: Skip ) => { }
378+ CompressionEstimate :: Verdict ( EstimateVerdict :: AlwaysUse ) => {
379+ return Ok ( Some ( ( scheme, WinnerEstimate :: AlwaysUse ) ) ) ;
380+ }
381+ CompressionEstimate :: Verdict ( EstimateVerdict :: Ratio ( ratio) ) => {
382+ let score = EstimateScore :: FiniteCompression ( ratio) ;
384383
385- if is_better_score ( score, & best) {
386- best = Some ( ( scheme, score) ) ;
384+ if is_better_score ( score, & best) {
385+ best = Some ( ( scheme, score) ) ;
386+ }
387+ }
388+ CompressionEstimate :: Deferred ( deferred_estimate) => {
389+ deferred. push ( ( scheme, deferred_estimate) ) ;
387390 }
388- }
389- CompressionEstimate :: Deferred ( deferred_estimate) => {
390- deferred. push ( ( scheme, deferred_estimate) ) ;
391391 }
392392 }
393393 }
394394
395395 // Pass 2: run deferred work. Callbacks receive the current best as a threshold so they can
396396 // short-circuit with `Skip` when they cannot beat it.
397397 for ( scheme, deferred_estimate) in deferred {
398+ let _span = trace:: scheme_eval_span ( scheme. id ( ) ) . entered ( ) ;
398399 let threshold: Option < EstimateScore > = best. map ( |( _, score) | score) ;
399400 match deferred_estimate {
400401 DeferredEstimate :: Sample => {
@@ -560,18 +561,9 @@ impl CascadingCompressor {
560561
561562#[ cfg( test) ]
562563mod tests {
563- use std:: collections:: BTreeMap ;
564- use std:: sync:: Arc ;
565564 use std:: sync:: LazyLock ;
566565
567566 use parking_lot:: Mutex ;
568- use tracing:: Event ;
569- use tracing:: Subscriber ;
570- use tracing:: field:: Field ;
571- use tracing:: field:: Visit ;
572- use tracing_subscriber:: Layer ;
573- use tracing_subscriber:: layer:: Context ;
574- use tracing_subscriber:: prelude:: * ;
575567 use vortex_array:: ArrayRef ;
576568 use vortex_array:: Canonical ;
577569 use vortex_array:: VortexSessionExecute ;
@@ -595,7 +587,6 @@ mod tests {
595587 use crate :: estimate:: EstimateVerdict ;
596588 use crate :: estimate:: WinnerEstimate ;
597589 use crate :: scheme:: SchemeExt ;
598- use crate :: trace:: TARGET_TRACE ;
599590
600591 static SESSION : LazyLock < VortexSession > =
601592 LazyLock :: new ( || VortexSession :: empty ( ) . with :: < ArraySession > ( ) ) ;
@@ -613,98 +604,6 @@ mod tests {
613604 matches ! ( canonical, Canonical :: Primitive ( primitive) if primitive. ptype( ) . is_int( ) )
614605 }
615606
616- fn test_integer_array ( ) -> ArrayRef {
617- PrimitiveArray :: new ( buffer ! [ 1i32 , 2 , 3 , 4 ] , Validity :: NonNullable ) . into_array ( )
618- }
619-
620- #[ derive( Debug , Clone , PartialEq , Eq ) ]
621- struct RecordedEvent {
622- target : String ,
623- fields : BTreeMap < String , String > ,
624- }
625-
626- #[ derive( Default ) ]
627- struct EventVisitor {
628- fields : BTreeMap < String , String > ,
629- }
630-
631- impl Visit for EventVisitor {
632- fn record_debug ( & mut self , field : & Field , value : & dyn std:: fmt:: Debug ) {
633- self . fields
634- . insert ( field. name ( ) . to_owned ( ) , format ! ( "{value:?}" ) ) ;
635- }
636-
637- fn record_i64 ( & mut self , field : & Field , value : i64 ) {
638- self . fields
639- . insert ( field. name ( ) . to_owned ( ) , value. to_string ( ) ) ;
640- }
641-
642- fn record_u64 ( & mut self , field : & Field , value : u64 ) {
643- self . fields
644- . insert ( field. name ( ) . to_owned ( ) , value. to_string ( ) ) ;
645- }
646-
647- fn record_bool ( & mut self , field : & Field , value : bool ) {
648- self . fields
649- . insert ( field. name ( ) . to_owned ( ) , value. to_string ( ) ) ;
650- }
651-
652- fn record_str ( & mut self , field : & Field , value : & str ) {
653- self . fields
654- . insert ( field. name ( ) . to_owned ( ) , value. to_owned ( ) ) ;
655- }
656- }
657-
658- struct RecordingLayer {
659- events : Arc < Mutex < Vec < RecordedEvent > > > ,
660- }
661-
662- impl RecordingLayer {
663- fn new ( events : Arc < Mutex < Vec < RecordedEvent > > > ) -> Self {
664- Self { events }
665- }
666- }
667-
668- impl < S > Layer < S > for RecordingLayer
669- where
670- S : Subscriber ,
671- {
672- fn on_event ( & self , event : & Event < ' _ > , _ctx : Context < ' _ , S > ) {
673- let mut visitor = EventVisitor :: default ( ) ;
674- event. record ( & mut visitor) ;
675- self . events . lock ( ) . push ( RecordedEvent {
676- target : event. metadata ( ) . target ( ) . to_owned ( ) ,
677- fields : visitor. fields ,
678- } ) ;
679- }
680- }
681-
682- fn record_events < T > ( f : impl FnOnce ( ) -> T ) -> ( T , Vec < RecordedEvent > ) {
683- let events = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
684- let subscriber =
685- tracing_subscriber:: registry ( ) . with ( RecordingLayer :: new ( Arc :: clone ( & events) ) ) ;
686- let result = tracing:: subscriber:: with_default ( subscriber, f) ;
687- let recorded = events. lock ( ) . clone ( ) ;
688- ( result, recorded)
689- }
690-
691- fn find_event < ' a > (
692- events : & ' a [ RecordedEvent ] ,
693- target : & str ,
694- message : & str ,
695- ) -> & ' a RecordedEvent {
696- events
697- . iter ( )
698- . find ( |event| {
699- event. target == target
700- && event
701- . fields
702- . get ( "message" )
703- . is_some_and ( |value| value == message)
704- } )
705- . expect ( "expected event not found" )
706- }
707-
708607 #[ derive( Debug ) ]
709608 struct DirectRatioScheme ;
710609
@@ -935,102 +834,6 @@ mod tests {
935834 }
936835 }
937836
938- #[ derive( Debug ) ]
939- struct NestedFailureParentScheme ;
940-
941- impl Scheme for NestedFailureParentScheme {
942- fn scheme_name ( & self ) -> & ' static str {
943- "test.nested_failure_parent"
944- }
945-
946- fn matches ( & self , canonical : & Canonical ) -> bool {
947- matches_integer_primitive ( canonical)
948- }
949-
950- fn expected_compression_ratio (
951- & self ,
952- _data : & ArrayAndStats ,
953- _compress_ctx : CompressorContext ,
954- _exec_ctx : & mut ExecutionCtx ,
955- ) -> CompressionEstimate {
956- CompressionEstimate :: Verdict ( EstimateVerdict :: AlwaysUse )
957- }
958-
959- fn compress (
960- & self ,
961- compressor : & CascadingCompressor ,
962- data : & ArrayAndStats ,
963- compress_ctx : CompressorContext ,
964- exec_ctx : & mut ExecutionCtx ,
965- ) -> VortexResult < ArrayRef > {
966- compressor. compress_child ( data. array ( ) , & compress_ctx, self . id ( ) , 1 , exec_ctx)
967- }
968- }
969-
970- #[ derive( Debug ) ]
971- struct NestedFailureLeafScheme ;
972-
973- impl Scheme for NestedFailureLeafScheme {
974- fn scheme_name ( & self ) -> & ' static str {
975- "test.nested_failure_leaf"
976- }
977-
978- fn matches ( & self , canonical : & Canonical ) -> bool {
979- matches_integer_primitive ( canonical)
980- }
981-
982- fn expected_compression_ratio (
983- & self ,
984- _data : & ArrayAndStats ,
985- _compress_ctx : CompressorContext ,
986- _exec_ctx : & mut ExecutionCtx ,
987- ) -> CompressionEstimate {
988- CompressionEstimate :: Verdict ( EstimateVerdict :: AlwaysUse )
989- }
990-
991- fn compress (
992- & self ,
993- _compressor : & CascadingCompressor ,
994- _data : & ArrayAndStats ,
995- _compress_ctx : CompressorContext ,
996- _exec_ctx : & mut ExecutionCtx ,
997- ) -> VortexResult < ArrayRef > {
998- vortex_error:: vortex_bail!( "nested failure" )
999- }
1000- }
1001-
1002- #[ derive( Debug ) ]
1003- struct SamplingFailureScheme ;
1004-
1005- impl Scheme for SamplingFailureScheme {
1006- fn scheme_name ( & self ) -> & ' static str {
1007- "test.sampling_failure"
1008- }
1009-
1010- fn matches ( & self , canonical : & Canonical ) -> bool {
1011- matches_integer_primitive ( canonical)
1012- }
1013-
1014- fn expected_compression_ratio (
1015- & self ,
1016- _data : & ArrayAndStats ,
1017- _compress_ctx : CompressorContext ,
1018- _exec_ctx : & mut ExecutionCtx ,
1019- ) -> CompressionEstimate {
1020- CompressionEstimate :: Deferred ( DeferredEstimate :: Sample )
1021- }
1022-
1023- fn compress (
1024- & self ,
1025- _compressor : & CascadingCompressor ,
1026- _data : & ArrayAndStats ,
1027- _compress_ctx : CompressorContext ,
1028- _exec_ctx : & mut ExecutionCtx ,
1029- ) -> VortexResult < ArrayRef > {
1030- vortex_error:: vortex_bail!( "sample failure" )
1031- }
1032- }
1033-
1034837 #[ test]
1035838 fn test_self_exclusion ( ) {
1036839 let c = compressor ( ) ;
@@ -1494,85 +1297,4 @@ mod tests {
14941297 assert ! ( matches!( score, EstimateScore :: FiniteCompression ( ratio) if ratio. is_finite( ) ) ) ;
14951298 Ok ( ( ) )
14961299 }
1497-
1498- #[ test]
1499- fn compress_failure_event_includes_cascade_path_and_depth ( ) {
1500- let compressor =
1501- CascadingCompressor :: new ( vec ! [ & NestedFailureParentScheme , & NestedFailureLeafScheme ] ) ;
1502- let array = test_integer_array ( ) ;
1503-
1504- let ( result, events) = record_events ( || {
1505- let mut exec_ctx = SESSION . create_execution_ctx ( ) ;
1506- compressor. compress ( & array, & mut exec_ctx)
1507- } ) ;
1508-
1509- assert ! ( result. is_err( ) ) ;
1510- let event = find_event ( & events, TARGET_TRACE , "scheme.compress_failed" ) ;
1511- assert_eq ! (
1512- event. fields. get( "scheme" ) . map( String :: as_str) ,
1513- Some ( "test.nested_failure_leaf" )
1514- ) ;
1515- assert_eq ! (
1516- event. fields. get( "cascade_path" ) . map( String :: as_str) ,
1517- Some ( "test.nested_failure_parent[1]" )
1518- ) ;
1519- assert_eq ! (
1520- event. fields. get( "cascade_depth" ) . map( String :: as_str) ,
1521- Some ( "1" )
1522- ) ;
1523- }
1524-
1525- #[ test]
1526- fn sample_failure_event_includes_cascade_path_and_depth ( ) {
1527- let compressor = CascadingCompressor :: new ( vec ! [ & SamplingFailureScheme ] ) ;
1528- let array = test_integer_array ( ) ;
1529-
1530- let ( result, events) = record_events ( || {
1531- let mut exec_ctx = SESSION . create_execution_ctx ( ) ;
1532- compressor. compress ( & array, & mut exec_ctx)
1533- } ) ;
1534-
1535- assert ! ( result. is_err( ) ) ;
1536- let event = find_event ( & events, TARGET_TRACE , "sample.compress_failed" ) ;
1537- assert_eq ! (
1538- event. fields. get( "scheme" ) . map( String :: as_str) ,
1539- Some ( "test.sampling_failure" )
1540- ) ;
1541- assert_eq ! (
1542- event. fields. get( "cascade_path" ) . map( String :: as_str) ,
1543- Some ( "root" )
1544- ) ;
1545- assert_eq ! (
1546- event. fields. get( "cascade_depth" ) . map( String :: as_str) ,
1547- Some ( "0" )
1548- ) ;
1549- }
1550-
1551- #[ test]
1552- fn zero_byte_sample_result_omits_ratio_fields_and_selects_no_scheme ( ) {
1553- let compressor = CascadingCompressor :: new ( vec ! [ & ZeroBytesSamplingScheme ] ) ;
1554- let array = test_integer_array ( ) ;
1555-
1556- let ( result, events) = record_events ( || {
1557- let mut exec_ctx = SESSION . create_execution_ctx ( ) ;
1558- compressor. compress ( & array, & mut exec_ctx)
1559- } ) ;
1560-
1561- assert ! ( result. is_ok( ) ) ;
1562-
1563- let sample_event = find_event ( & events, TARGET_TRACE , "sample.result" ) ;
1564- assert_eq ! (
1565- sample_event. fields. get( "sampled_after" ) . map( String :: as_str) ,
1566- Some ( "0" )
1567- ) ;
1568- assert ! ( !sample_event. fields. contains_key( "sampled_ratio" ) ) ;
1569-
1570- assert ! ( !events. iter( ) . any( |event| {
1571- event. target == TARGET_TRACE
1572- && event
1573- . fields
1574- . get( "message" )
1575- . is_some_and( |value| value == "scheme.compress_result" )
1576- } ) ) ;
1577- }
15781300}
0 commit comments