@@ -651,20 +651,24 @@ fn route(
651651 ( "POST" , "/api/v1/lance/search" ) => handle_lance_search ( body, state, format) ,
652652
653653 // =====================================================================
654- // UNIFIED SURFACE — CrewAI + n8n proxied through one gateway
654+ // UNIFIED SURFACE — CrewAI + n8n
655+ //
656+ // When vendor-crewai / vendor-n8n features are enabled, these are
657+ // direct in-process calls (single binary, no HTTP). Otherwise,
658+ // they proxy to external services via HTTP.
655659 // =====================================================================
656660
657- // CrewAI endpoints → proxy to crewai-rust service
658- ( "POST" , "/api/v1/crew/execute" ) => handle_proxy ( "crew" , "/execute" , body, format) ,
659- ( "POST" , "/api/v1/crew/kickoff" ) => handle_proxy ( "crew" , "/execute" , body, format) ,
660- ( "GET" , "/api/v1/crew/health" ) => handle_proxy_get ( "crew" , "/health" , format) ,
661- ( "GET" , "/api/v1/crew/modules" ) => handle_proxy_get ( "crew" , "/modules" , format) ,
661+ // CrewAI endpoints
662+ ( "POST" , "/api/v1/crew/execute" ) => dispatch_crew_post ( "/execute" , body, format) ,
663+ ( "POST" , "/api/v1/crew/kickoff" ) => dispatch_crew_post ( "/execute" , body, format) ,
664+ ( "GET" , "/api/v1/crew/health" ) => dispatch_crew_get ( "/health" , format) ,
665+ ( "GET" , "/api/v1/crew/modules" ) => dispatch_crew_get ( "/modules" , format) ,
662666
663- // n8n endpoints → proxy to n8n-rs service
664- ( "GET" , "/api/v1/workflows" ) => handle_proxy_get ( "n8n" , "/api/v1/workflows" , format) ,
665- ( "POST" , "/api/v1/workflows" ) => handle_proxy ( "n8n" , "/api/v1/workflows" , body, format) ,
666- ( "POST" , "/api/v1/executions" ) => handle_proxy ( "n8n" , "/api/v1/executions" , body, format) ,
667- ( "GET" , "/api/v1/executions" ) => handle_proxy_get ( "n8n" , "/api/v1/executions" , format) ,
667+ // n8n endpoints
668+ ( "GET" , "/api/v1/workflows" ) => dispatch_n8n_get ( "/api/v1/workflows" , format) ,
669+ ( "POST" , "/api/v1/workflows" ) => dispatch_n8n_post ( "/api/v1/workflows" , body, format) ,
670+ ( "POST" , "/api/v1/executions" ) => dispatch_n8n_post ( "/api/v1/executions" , body, format) ,
671+ ( "GET" , "/api/v1/executions" ) => dispatch_n8n_get ( "/api/v1/executions" , format) ,
668672
669673 // Unified command surface — Redis-like protocol for all 3 engines
670674 ( "POST" , "/cmd" ) => handle_unified_command ( body, state, format) ,
@@ -3264,6 +3268,150 @@ fn read_http_response(stream: &mut TcpStream) -> Result<String, String> {
32643268 Ok ( response)
32653269}
32663270
3271+ // =============================================================================
3272+ // VENDOR DISPATCH — single-binary vs HTTP proxy
3273+ // =============================================================================
3274+ //
3275+ // When vendor-crewai / vendor-n8n features are enabled, these functions call
3276+ // directly into the linked library crates (zero HTTP overhead, one process).
3277+ // When disabled, they fall back to the HTTP proxy (multi-service mode).
3278+
3279+ /// Dispatch a POST to crewai — in-process if vendor-crewai, else HTTP proxy.
3280+ fn dispatch_crew_post ( path : & str , body : & str , format : ResponseFormat ) -> Vec < u8 > {
3281+ #[ cfg( feature = "vendor-crewai" ) ]
3282+ {
3283+ // Direct in-process call to crewai-rust library.
3284+ // The crewai crate exposes its server module as a library.
3285+ // We use a thread-local tokio runtime to bridge sync → async.
3286+ match call_vendor_async ( || async {
3287+ crewai_vendor:: server:: handle_request_body ( path, body) . await
3288+ } ) {
3289+ Ok ( response) => match format {
3290+ ResponseFormat :: Json => http_json ( 200 , & response) ,
3291+ ResponseFormat :: Arrow => wrap_proxy_arrow ( "crew" , & response) ,
3292+ } ,
3293+ Err ( e) => http_json ( 500 , & format ! (
3294+ r#"{{"error":"vendor_crewai_error","detail":"{}"}}"# , e
3295+ ) ) ,
3296+ }
3297+ }
3298+ #[ cfg( not( feature = "vendor-crewai" ) ) ]
3299+ {
3300+ handle_proxy ( "crew" , path, body, format)
3301+ }
3302+ }
3303+
3304+ /// Dispatch a GET to crewai.
3305+ fn dispatch_crew_get ( path : & str , format : ResponseFormat ) -> Vec < u8 > {
3306+ #[ cfg( feature = "vendor-crewai" ) ]
3307+ {
3308+ match call_vendor_async ( || async {
3309+ crewai_vendor:: server:: handle_get ( path) . await
3310+ } ) {
3311+ Ok ( response) => match format {
3312+ ResponseFormat :: Json => http_json ( 200 , & response) ,
3313+ ResponseFormat :: Arrow => wrap_proxy_arrow ( "crew" , & response) ,
3314+ } ,
3315+ Err ( e) => http_json ( 500 , & format ! (
3316+ r#"{{"error":"vendor_crewai_error","detail":"{}"}}"# , e
3317+ ) ) ,
3318+ }
3319+ }
3320+ #[ cfg( not( feature = "vendor-crewai" ) ) ]
3321+ {
3322+ handle_proxy_get ( "crew" , path, format)
3323+ }
3324+ }
3325+
3326+ /// Dispatch a POST to n8n — in-process if vendor-n8n, else HTTP proxy.
3327+ fn dispatch_n8n_post ( path : & str , body : & str , format : ResponseFormat ) -> Vec < u8 > {
3328+ #[ cfg( feature = "vendor-n8n" ) ]
3329+ {
3330+ match call_vendor_async ( || async {
3331+ n8n_grpc:: handle_api_post ( path, body) . await
3332+ } ) {
3333+ Ok ( response) => match format {
3334+ ResponseFormat :: Json => http_json ( 200 , & response) ,
3335+ ResponseFormat :: Arrow => wrap_proxy_arrow ( "n8n" , & response) ,
3336+ } ,
3337+ Err ( e) => http_json ( 500 , & format ! (
3338+ r#"{{"error":"vendor_n8n_error","detail":"{}"}}"# , e
3339+ ) ) ,
3340+ }
3341+ }
3342+ #[ cfg( not( feature = "vendor-n8n" ) ) ]
3343+ {
3344+ handle_proxy ( "n8n" , path, body, format)
3345+ }
3346+ }
3347+
3348+ /// Dispatch a GET to n8n.
3349+ fn dispatch_n8n_get ( path : & str , format : ResponseFormat ) -> Vec < u8 > {
3350+ #[ cfg( feature = "vendor-n8n" ) ]
3351+ {
3352+ match call_vendor_async ( || async {
3353+ n8n_grpc:: handle_api_get ( path) . await
3354+ } ) {
3355+ Ok ( response) => match format {
3356+ ResponseFormat :: Json => http_json ( 200 , & response) ,
3357+ ResponseFormat :: Arrow => wrap_proxy_arrow ( "n8n" , & response) ,
3358+ } ,
3359+ Err ( e) => http_json ( 500 , & format ! (
3360+ r#"{{"error":"vendor_n8n_error","detail":"{}"}}"# , e
3361+ ) ) ,
3362+ }
3363+ }
3364+ #[ cfg( not( feature = "vendor-n8n" ) ) ]
3365+ {
3366+ handle_proxy_get ( "n8n" , path, format)
3367+ }
3368+ }
3369+
3370+ /// Bridge sync server → async vendor crate call.
3371+ ///
3372+ /// Uses a thread-local tokio runtime to execute the async future.
3373+ /// This is the sync→async bridge for the single-binary mode.
3374+ #[ cfg( any( feature = "vendor-crewai" , feature = "vendor-n8n" ) ) ]
3375+ fn call_vendor_async < F , Fut > ( f : F ) -> Result < String , String >
3376+ where
3377+ F : FnOnce ( ) -> Fut ,
3378+ Fut : std:: future:: Future < Output = Result < String , Box < dyn std:: error:: Error + Send + Sync > > > ,
3379+ {
3380+ // Try to use an existing runtime, or create a temporary one.
3381+ match tokio:: runtime:: Handle :: try_current ( ) {
3382+ Ok ( handle) => {
3383+ // We're inside a tokio context — use block_in_place.
3384+ tokio:: task:: block_in_place ( || handle. block_on ( f ( ) ) ) . map_err ( |e| e. to_string ( ) )
3385+ }
3386+ Err ( _) => {
3387+ // No runtime — create a one-shot runtime.
3388+ let rt = tokio:: runtime:: Builder :: new_current_thread ( )
3389+ . enable_all ( )
3390+ . build ( )
3391+ . map_err ( |e| e. to_string ( ) ) ?;
3392+ rt. block_on ( f ( ) ) . map_err ( |e| e. to_string ( ) )
3393+ }
3394+ }
3395+ }
3396+
3397+ /// Wrap a JSON string response in Arrow IPC (shared helper for vendor dispatch).
3398+ #[ cfg( any( feature = "vendor-crewai" , feature = "vendor-n8n" ) ) ]
3399+ fn wrap_proxy_arrow ( service : & str , json_response : & str ) -> Vec < u8 > {
3400+ let schema = Arc :: new ( Schema :: new ( vec ! [
3401+ Field :: new( "service" , DataType :: Utf8 , false ) ,
3402+ Field :: new( "response" , DataType :: Utf8 , false ) ,
3403+ ] ) ) ;
3404+ let batch = RecordBatch :: try_new (
3405+ schema,
3406+ vec ! [
3407+ Arc :: new( StringArray :: from( vec![ service] ) ) as ArrayRef ,
3408+ Arc :: new( StringArray :: from( vec![ json_response] ) ) as ArrayRef ,
3409+ ] ,
3410+ )
3411+ . unwrap ( ) ;
3412+ http_arrow ( 200 , & batch)
3413+ }
3414+
32673415// =============================================================================
32683416// MAIN
32693417// =============================================================================
0 commit comments