@@ -536,7 +536,7 @@ impl<'a> Builder<'a> {
536536 . insert ( key. clone ( ) , ( input_tx, node. inputs . clone ( ) ) ) ;
537537
538538 let ( transform_task, transform_outputs) =
539- build_transform ( transform, node, input_rx, & self . utilization_registry ) ;
539+ self . build_transform ( transform, node, input_rx) ;
540540
541541 self . outputs . extend ( transform_outputs) ;
542542 self . tasks . insert ( key. clone ( ) , transform_task) ;
@@ -730,6 +730,153 @@ impl<'a> Builder<'a> {
730730 self . detach_triggers . insert ( key. clone ( ) , trigger) ;
731731 }
732732 }
733+
734+ fn build_transform (
735+ & self ,
736+ transform : Transform ,
737+ node : TransformNode ,
738+ input_rx : BufferReceiver < EventArray > ,
739+ ) -> ( Task , HashMap < OutputId , fanout:: ControlChannel > ) {
740+ match transform {
741+ // TODO: avoid the double boxing for function transforms here
742+ Transform :: Function ( t) => self . build_sync_transform ( Box :: new ( t) , node, input_rx) ,
743+ Transform :: Synchronous ( t) => self . build_sync_transform ( t, node, input_rx) ,
744+ Transform :: Task ( t) => self . build_task_transform (
745+ t,
746+ input_rx,
747+ node. input_details . data_type ( ) ,
748+ node. typetag ,
749+ & node. key ,
750+ & node. outputs ,
751+ ) ,
752+ }
753+ }
754+
755+ fn build_sync_transform (
756+ & self ,
757+ t : Box < dyn SyncTransform > ,
758+ node : TransformNode ,
759+ input_rx : BufferReceiver < EventArray > ,
760+ ) -> ( Task , HashMap < OutputId , fanout:: ControlChannel > ) {
761+ let ( outputs, controls) = TransformOutputs :: new ( node. outputs , & node. key ) ;
762+
763+ let sender = self
764+ . utilization_registry
765+ . add_component ( node. key . clone ( ) , gauge ! ( "utilization" ) ) ;
766+ let runner = Runner :: new ( t, input_rx, sender, node. input_details . data_type ( ) , outputs) ;
767+ let transform = if node. enable_concurrency {
768+ runner. run_concurrently ( ) . boxed ( )
769+ } else {
770+ runner. run_inline ( ) . boxed ( )
771+ } ;
772+
773+ let transform = async move {
774+ debug ! ( "Synchronous transform starting." ) ;
775+
776+ match transform. await {
777+ Ok ( v) => {
778+ debug ! ( "Synchronous transform finished normally." ) ;
779+ Ok ( v)
780+ }
781+ Err ( e) => {
782+ debug ! ( "Synchronous transform finished with an error." ) ;
783+ Err ( e)
784+ }
785+ }
786+ } ;
787+
788+ let mut output_controls = HashMap :: new ( ) ;
789+ for ( name, control) in controls {
790+ let id = name
791+ . map ( |name| OutputId :: from ( ( & node. key , name) ) )
792+ . unwrap_or_else ( || OutputId :: from ( & node. key ) ) ;
793+ output_controls. insert ( id, control) ;
794+ }
795+
796+ let task = Task :: new ( node. key . clone ( ) , node. typetag , transform) ;
797+
798+ ( task, output_controls)
799+ }
800+
801+ fn build_task_transform (
802+ & self ,
803+ t : Box < dyn TaskTransform < EventArray > > ,
804+ input_rx : BufferReceiver < EventArray > ,
805+ input_type : DataType ,
806+ typetag : & str ,
807+ key : & ComponentKey ,
808+ outputs : & [ TransformOutput ] ,
809+ ) -> ( Task , HashMap < OutputId , fanout:: ControlChannel > ) {
810+ let ( mut fanout, control) = Fanout :: new ( ) ;
811+
812+ let sender = self
813+ . utilization_registry
814+ . add_component ( key. clone ( ) , gauge ! ( "utilization" ) ) ;
815+ let input_rx = wrap ( sender, key. clone ( ) , input_rx. into_stream ( ) ) ;
816+
817+ let events_received = register ! ( EventsReceived ) ;
818+ let filtered = input_rx
819+ . filter ( move |events| ready ( filter_events_type ( events, input_type) ) )
820+ . inspect ( move |events| {
821+ events_received. emit ( CountByteSize (
822+ events. len ( ) ,
823+ events. estimated_json_encoded_size_of ( ) ,
824+ ) )
825+ } ) ;
826+ let events_sent = register ! ( EventsSent :: from( internal_event:: Output ( None ) ) ) ;
827+ let output_id = Arc :: new ( OutputId {
828+ component : key. clone ( ) ,
829+ port : None ,
830+ } ) ;
831+
832+ // Task transforms can only write to the default output, so only a single schema def map is needed
833+ let schema_definition_map = outputs
834+ . iter ( )
835+ . find ( |x| x. port . is_none ( ) )
836+ . expect ( "output for default port required for task transforms" )
837+ . log_schema_definitions
838+ . clone ( )
839+ . into_iter ( )
840+ . map ( |( key, value) | ( key, Arc :: new ( value) ) )
841+ . collect ( ) ;
842+
843+ let stream = t
844+ . transform ( Box :: pin ( filtered) )
845+ . map ( move |mut events| {
846+ for event in events. iter_events_mut ( ) {
847+ update_runtime_schema_definition ( event, & output_id, & schema_definition_map) ;
848+ }
849+ ( events, Instant :: now ( ) )
850+ } )
851+ . inspect ( move |( events, _) : & ( EventArray , Instant ) | {
852+ events_sent. emit ( CountByteSize (
853+ events. len ( ) ,
854+ events. estimated_json_encoded_size_of ( ) ,
855+ ) ) ;
856+ } ) ;
857+ let transform = async move {
858+ debug ! ( "Task transform starting." ) ;
859+
860+ match fanout. send_stream ( stream) . await {
861+ Ok ( ( ) ) => {
862+ debug ! ( "Task transform finished normally." ) ;
863+ Ok ( TaskOutput :: Transform )
864+ }
865+ Err ( e) => {
866+ debug ! ( "Task transform finished with an error." ) ;
867+ Err ( TaskError :: wrapped ( e) )
868+ }
869+ }
870+ }
871+ . boxed ( ) ;
872+
873+ let mut outputs = HashMap :: new ( ) ;
874+ outputs. insert ( OutputId :: from ( key) , control) ;
875+
876+ let task = Task :: new ( key. clone ( ) , typetag, transform) ;
877+
878+ ( task, outputs)
879+ }
733880}
734881
735882pub async fn reload_enrichment_tables ( config : & Config ) {
@@ -940,74 +1087,6 @@ impl TransformNode {
9401087 }
9411088}
9421089
943- fn build_transform (
944- transform : Transform ,
945- node : TransformNode ,
946- input_rx : BufferReceiver < EventArray > ,
947- utilization_registry : & UtilizationRegistry ,
948- ) -> ( Task , HashMap < OutputId , fanout:: ControlChannel > ) {
949- match transform {
950- // TODO: avoid the double boxing for function transforms here
951- Transform :: Function ( t) => {
952- build_sync_transform ( Box :: new ( t) , node, input_rx, utilization_registry)
953- }
954- Transform :: Synchronous ( t) => build_sync_transform ( t, node, input_rx, utilization_registry) ,
955- Transform :: Task ( t) => build_task_transform (
956- t,
957- input_rx,
958- node. input_details . data_type ( ) ,
959- node. typetag ,
960- & node. key ,
961- & node. outputs ,
962- utilization_registry,
963- ) ,
964- }
965- }
966-
967- fn build_sync_transform (
968- t : Box < dyn SyncTransform > ,
969- node : TransformNode ,
970- input_rx : BufferReceiver < EventArray > ,
971- utilization_registry : & UtilizationRegistry ,
972- ) -> ( Task , HashMap < OutputId , fanout:: ControlChannel > ) {
973- let ( outputs, controls) = TransformOutputs :: new ( node. outputs , & node. key ) ;
974-
975- let sender = utilization_registry. add_component ( node. key . clone ( ) , gauge ! ( "utilization" ) ) ;
976- let runner = Runner :: new ( t, input_rx, sender, node. input_details . data_type ( ) , outputs) ;
977- let transform = if node. enable_concurrency {
978- runner. run_concurrently ( ) . boxed ( )
979- } else {
980- runner. run_inline ( ) . boxed ( )
981- } ;
982-
983- let transform = async move {
984- debug ! ( "Synchronous transform starting." ) ;
985-
986- match transform. await {
987- Ok ( v) => {
988- debug ! ( "Synchronous transform finished normally." ) ;
989- Ok ( v)
990- }
991- Err ( e) => {
992- debug ! ( "Synchronous transform finished with an error." ) ;
993- Err ( e)
994- }
995- }
996- } ;
997-
998- let mut output_controls = HashMap :: new ( ) ;
999- for ( name, control) in controls {
1000- let id = name
1001- . map ( |name| OutputId :: from ( ( & node. key , name) ) )
1002- . unwrap_or_else ( || OutputId :: from ( & node. key ) ) ;
1003- output_controls. insert ( id, control) ;
1004- }
1005-
1006- let task = Task :: new ( node. key . clone ( ) , node. typetag , transform) ;
1007-
1008- ( task, output_controls)
1009- }
1010-
10111090struct Runner {
10121091 transform : Box < dyn SyncTransform > ,
10131092 input_rx : Option < BufferReceiver < EventArray > > ,
@@ -1095,8 +1174,7 @@ impl Runner {
10951174
10961175 result = in_flight. next( ) , if !in_flight. is_empty( ) => {
10971176 match result {
1098- Some ( Ok ( outputs_buf) ) => {
1099- let mut outputs_buf: TransformOutputsBuf = outputs_buf;
1177+ Some ( Ok ( mut outputs_buf) ) => {
11001178 self . send_outputs( & mut outputs_buf) . await
11011179 . map_err( TaskError :: wrapped) ?;
11021180 }
@@ -1141,81 +1219,3 @@ impl Runner {
11411219 Ok ( TaskOutput :: Transform )
11421220 }
11431221}
1144-
1145- fn build_task_transform (
1146- t : Box < dyn TaskTransform < EventArray > > ,
1147- input_rx : BufferReceiver < EventArray > ,
1148- input_type : DataType ,
1149- typetag : & str ,
1150- key : & ComponentKey ,
1151- outputs : & [ TransformOutput ] ,
1152- utilization_registry : & UtilizationRegistry ,
1153- ) -> ( Task , HashMap < OutputId , fanout:: ControlChannel > ) {
1154- let ( mut fanout, control) = Fanout :: new ( ) ;
1155-
1156- let sender = utilization_registry. add_component ( key. clone ( ) , gauge ! ( "utilization" ) ) ;
1157- let input_rx = wrap ( sender, key. clone ( ) , input_rx. into_stream ( ) ) ;
1158-
1159- let events_received = register ! ( EventsReceived ) ;
1160- let filtered = input_rx
1161- . filter ( move |events| ready ( filter_events_type ( events, input_type) ) )
1162- . inspect ( move |events| {
1163- events_received. emit ( CountByteSize (
1164- events. len ( ) ,
1165- events. estimated_json_encoded_size_of ( ) ,
1166- ) )
1167- } ) ;
1168- let events_sent = register ! ( EventsSent :: from( internal_event:: Output ( None ) ) ) ;
1169- let output_id = Arc :: new ( OutputId {
1170- component : key. clone ( ) ,
1171- port : None ,
1172- } ) ;
1173-
1174- // Task transforms can only write to the default output, so only a single schema def map is needed
1175- let schema_definition_map = outputs
1176- . iter ( )
1177- . find ( |x| x. port . is_none ( ) )
1178- . expect ( "output for default port required for task transforms" )
1179- . log_schema_definitions
1180- . clone ( )
1181- . into_iter ( )
1182- . map ( |( key, value) | ( key, Arc :: new ( value) ) )
1183- . collect ( ) ;
1184-
1185- let stream = t
1186- . transform ( Box :: pin ( filtered) )
1187- . map ( move |mut events| {
1188- for event in events. iter_events_mut ( ) {
1189- update_runtime_schema_definition ( event, & output_id, & schema_definition_map) ;
1190- }
1191- ( events, Instant :: now ( ) )
1192- } )
1193- . inspect ( move |( events, _) : & ( EventArray , Instant ) | {
1194- events_sent. emit ( CountByteSize (
1195- events. len ( ) ,
1196- events. estimated_json_encoded_size_of ( ) ,
1197- ) ) ;
1198- } ) ;
1199- let transform = async move {
1200- debug ! ( "Task transform starting." ) ;
1201-
1202- match fanout. send_stream ( stream) . await {
1203- Ok ( ( ) ) => {
1204- debug ! ( "Task transform finished normally." ) ;
1205- Ok ( TaskOutput :: Transform )
1206- }
1207- Err ( e) => {
1208- debug ! ( "Task transform finished with an error." ) ;
1209- Err ( TaskError :: wrapped ( e) )
1210- }
1211- }
1212- }
1213- . boxed ( ) ;
1214-
1215- let mut outputs = HashMap :: new ( ) ;
1216- outputs. insert ( OutputId :: from ( key) , control) ;
1217-
1218- let task = Task :: new ( key. clone ( ) , typetag, transform) ;
1219-
1220- ( task, outputs)
1221- }
0 commit comments