@@ -10,6 +10,7 @@ mod profiles_dictionary_translator;
1010pub use profiles_dictionary_translator:: * ;
1111
1212use self :: api:: UpscalingInfo ;
13+ use super :: upscaling:: ActiveUpscalingRules ;
1314use super :: * ;
1415use crate :: api:: ManagedStringId ;
1516use crate :: collections:: identifiable:: * ;
@@ -63,7 +64,7 @@ pub struct Profile {
6364 string_storage : Option < Arc < Mutex < ManagedStringStorage > > > ,
6465 string_storage_cached_profile_id : Option < CachedProfileId > ,
6566 timestamp_key : StringId ,
66- upscaling_rules : UpscalingRules ,
67+ upscaling_rules : ActiveUpscalingRules ,
6768}
6869
6970pub struct EncodedProfile {
@@ -604,23 +605,35 @@ impl Profile {
604605 // Collect label set attribute indices (including endpoints) before taking strings, since
605606 // get_endpoint_for_label_set needs self. Endpoints are deduped into labels so they appear
606607 // in the attribute_table built from labels.
607- let label_set_attr_indices: Vec < Vec < i32 > > = std:: mem:: take ( & mut self . label_sets )
608- . into_iter ( )
609- . map ( |label_set| {
610- let endpoint = self
611- . get_endpoint_for_label_set ( & label_set)
612- . ok ( )
613- . flatten ( )
614- . and_then ( |label| self . labels . try_dedup ( label) . ok ( ) )
615- . map ( |id| id. to_offset ( ) as i32 + 1 ) ;
616-
608+ // extended_label_sets holds the same data as Label values, indexed by LabelSetId raw id,
609+ // so upscaling rules can be matched by label key/value during sample emission.
610+ let label_sets_raw = std:: mem:: take ( & mut self . label_sets ) ;
611+ let mut label_set_attr_indices: Vec < Vec < i32 > > = Vec :: with_capacity ( label_sets_raw. len ( ) ) ;
612+ let mut extended_label_sets: Vec < Vec < Label > > = Vec :: with_capacity ( label_sets_raw. len ( ) ) ;
613+ for label_set in & label_sets_raw {
614+ // Label: Copy, so endpoint_label remains usable after the and_then chain below.
615+ let endpoint_label: Option < Label > =
616+ self . get_endpoint_for_label_set ( label_set) . ok ( ) . flatten ( ) ;
617+ let endpoint_id = endpoint_label
618+ . and_then ( |label| self . labels . try_dedup ( label) . ok ( ) )
619+ . map ( |id| id. to_offset ( ) as i32 + 1 ) ;
620+ label_set_attr_indices. push (
617621 label_set
618622 . iter ( )
619623 . map ( |id| id. to_offset ( ) as i32 + 1 )
620- . chain ( endpoint)
621- . collect ( )
622- } )
623- . collect ( ) ;
624+ . chain ( endpoint_id)
625+ . collect ( ) ,
626+ ) ;
627+ let mut labels: Vec < Label > = label_set
628+ . iter ( )
629+ . filter_map ( |l| self . get_label ( * l) . ok ( ) . copied ( ) )
630+ . collect ( ) ;
631+ if let Some ( ep) = endpoint_label {
632+ labels. push ( ep) ;
633+ }
634+ extended_label_sets. push ( labels) ;
635+ }
636+ drop ( label_sets_raw) ;
624637
625638 let mut lender = self . strings . into_lending_iter ( ) ;
626639 let string_table: Vec < String > =
@@ -719,6 +732,10 @@ impl Profile {
719732 . context ( "missing profile" ) ?;
720733 profile. sample . reserve ( samples. len ( ) ) ;
721734 for ( sample, val) in samples {
735+ let labels: & [ Label ] = & extended_label_sets[ sample. labels . to_raw_id ( ) ] ;
736+ let val = self
737+ . upscaling_rules
738+ . upscale_singleton ( sample_type, val, labels) ;
722739 let attribute_indices = label_set_attr_indices
723740 . get ( sample. labels . to_raw_id ( ) )
724741 . cloned ( )
@@ -739,12 +756,21 @@ impl Profile {
739756 . context ( "missing profile" ) ?;
740757 profile. sample . reserve ( samples. len ( ) ) ;
741758 for ( sample, vals) in samples {
759+ let labels: & [ Label ] = & extended_label_sets[ sample. labels . to_raw_id ( ) ] ;
742760 let attribute_indices = label_set_attr_indices
743761 . get ( sample. labels . to_raw_id ( ) )
744762 . cloned ( )
745763 . unwrap_or_default ( ) ;
746- let ( values, timestamps_unix_nano) : ( Vec < _ > , Vec < u64 > ) =
747- vals. into_iter ( ) . map ( |( v, ts) | ( v, ts. get ( ) as u64 ) ) . unzip ( ) ;
764+ let ( values, timestamps_unix_nano) : ( Vec < _ > , Vec < u64 > ) = vals
765+ . into_iter ( )
766+ . map ( |( v, ts) | {
767+ (
768+ self . upscaling_rules
769+ . upscale_singleton ( sample_type, v, labels) ,
770+ ts. get ( ) as u64 ,
771+ )
772+ } )
773+ . unzip ( ) ;
748774 profile. sample . push ( crate :: otel:: Sample {
749775 stack_index : sample. stacktrace . into_raw_id ( ) as i32 ,
750776 values,
@@ -762,6 +788,10 @@ impl Profile {
762788 let mut samples2 = Vec :: with_capacity ( samples. len ( ) ) ;
763789
764790 for ( sample, ( val1, val2) ) in samples. into_iter ( ) {
791+ let labels: & [ Label ] = & extended_label_sets[ sample. labels . to_raw_id ( ) ] ;
792+ let ( val1, val2) = self
793+ . upscaling_rules
794+ . upscale_pair ( st1, val1, st2, val2, labels) ?;
765795 let attribute_indices = label_set_attr_indices
766796 . get ( sample. labels . to_raw_id ( ) )
767797 . cloned ( )
@@ -791,27 +821,23 @@ impl Profile {
791821 let mut samples2 = Vec :: with_capacity ( samples. len ( ) ) ;
792822
793823 for ( sample, vals) in samples. into_iter ( ) {
824+ let labels: & [ Label ] = & extended_label_sets[ sample. labels . to_raw_id ( ) ] ;
794825 let attribute_indices = label_set_attr_indices
795826 . get ( sample. labels . to_raw_id ( ) )
796827 . cloned ( )
797828 . unwrap_or_default ( ) ;
798829 let cap = vals. len ( ) ;
799- let ( values1, values2, timestamps_unix_nano) = vals
800- . into_iter ( )
801- . map ( |( v1, v2, ts) | ( v1, v2, ts. get ( ) as u64 ) )
802- . fold (
803- (
804- Vec :: with_capacity ( cap) ,
805- Vec :: with_capacity ( cap) ,
806- Vec :: with_capacity ( cap) ,
807- ) ,
808- |( mut vs1, mut vs2, mut ts) , ( v1, v2, t) | {
809- vs1. push ( v1) ;
810- vs2. push ( v2) ;
811- ts. push ( t) ;
812- ( vs1, vs2, ts)
813- } ,
814- ) ;
830+ let mut values1 = Vec :: with_capacity ( cap) ;
831+ let mut values2 = Vec :: with_capacity ( cap) ;
832+ let mut timestamps_unix_nano = Vec :: with_capacity ( cap) ;
833+ for ( v1, v2, ts) in vals {
834+ let ( v1, v2) = self
835+ . upscaling_rules
836+ . upscale_pair ( st1, v1, st2, v2, labels) ?;
837+ values1. push ( v1) ;
838+ values2. push ( v2) ;
839+ timestamps_unix_nano. push ( ts. get ( ) as u64 ) ;
840+ }
815841 let stack_index = sample. stacktrace . into_raw_id ( ) as i32 ;
816842 profile1. sample . push ( crate :: otel:: Sample {
817843 stack_index,
@@ -920,6 +946,8 @@ impl Profile {
920946 extended_label_sets. push ( labels) ;
921947 }
922948
949+ #[ cfg( feature = "otel" ) ]
950+ let paired_samples = std:: mem:: take ( & mut self . observations . paired_samples ) ;
923951 let iter = std:: mem:: take ( & mut self . observations ) . try_into_iter ( ) ?;
924952 for ( sample, timestamp, mut values) in iter {
925953 let labels = & mut extended_label_sets[ sample. labels . to_raw_id ( ) ] ;
@@ -930,7 +958,11 @@ impl Profile {
930958 . map ( Id :: to_raw_id)
931959 . collect ( ) ;
932960 self . check_location_ids_are_valid ( & location_ids, self . locations . len ( ) ) ?;
961+ #[ cfg( not( feature = "otel" ) ) ]
933962 self . upscaling_rules . upscale_values ( & mut values, labels) ;
963+ #[ cfg( feature = "otel" ) ]
964+ self . upscaling_rules
965+ . upscale_values ( & mut values, & paired_samples, labels) ?;
934966
935967 // Use the extra slot in the labels vector to store the timestamp without any reallocs.
936968 if let Some ( ts) = timestamp {
@@ -1325,19 +1357,25 @@ impl Profile {
13251357 label_sets : Default :: default ( ) ,
13261358 locations : Default :: default ( ) ,
13271359 mappings : Default :: default ( ) ,
1360+ #[ cfg( not( feature = "otel" ) ) ]
13281361 observations : Default :: default ( ) ,
1362+ #[ cfg( feature = "otel" ) ]
1363+ observations : Observations :: new ( sample_types. clone ( ) , sample_type_index_map) ,
13291364 period,
13301365 #[ cfg( feature = "otel" ) ]
13311366 sample_type_index_map,
1332- sample_types,
1367+ sample_types : sample_types . clone ( ) ,
13331368 stack_traces : Default :: default ( ) ,
13341369 start_time,
13351370 strings : Default :: default ( ) ,
13361371 string_storage,
13371372 string_storage_cached_profile_id : None , /* Never reuse an id! See comments on
13381373 * CachedProfileId for why. */
13391374 timestamp_key : Default :: default ( ) ,
1375+ #[ cfg( not( feature = "otel" ) ) ]
13401376 upscaling_rules : Default :: default ( ) ,
1377+ #[ cfg( feature = "otel" ) ]
1378+ upscaling_rules : ActiveUpscalingRules :: new ( sample_types) ,
13411379 } ;
13421380
13431381 let _id = profile. intern ( "" ) ;
0 commit comments