@@ -326,15 +326,8 @@ impl Interior {
326326 if message. exec_type . is_none ( ) {
327327 continue ;
328328 }
329- match pb:: ExecutionType :: try_from ( message. exec_type . unwrap ( ) ) {
330- Ok ( t) => debug ! ( "received {:?} command from server" , t) ,
331- Err ( _) => {
332- warn ! (
333- "unsupported remote exec type id {}" ,
334- message. exec_type. unwrap( )
335- ) ;
336- continue ;
337- }
329+ if let Ok ( t) = pb:: ExecutionType :: try_from ( message. exec_type . unwrap ( ) ) {
330+ debug ! ( "received {:?} command from server" , t) ;
338331 }
339332 if sender. send ( message) . await . is_err ( ) {
340333 debug ! ( "responser channel closed" ) ;
@@ -902,30 +895,41 @@ impl Responser {
902895 match self . msg_recv . poll_recv ( ctx) {
903896 // sender closed, terminate the current stream
904897 Poll :: Ready ( None ) => ControlFlow :: Return ( None ) ,
905- Poll :: Ready ( Some ( msg) ) => match pb:: ExecutionType :: try_from ( msg. exec_type . unwrap ( ) ) {
906- Ok ( pb:: ExecutionType :: ListCommand ) => {
907- let commands = Self :: generate_command_list ( ) ;
908- debug ! ( "list command returning {} entries" , commands. len( ) ) ;
909- ControlFlow :: Return ( Some ( pb:: RemoteExecResponse {
910- agent_id : Some ( self . agent_id . read ( ) . deref ( ) . into ( ) ) ,
911- request_id : msg. request_id ,
912- commands,
913- ..Default :: default ( )
914- } ) )
915- }
916- Ok ( pb:: ExecutionType :: ListNamespace ) => {
917- trace ! ( "pending list namespace" ) ;
918- self . pending_lsns = Some ( ( msg. request_id , Box :: pin ( ls_netns ( ) ) ) ) ;
919- ControlFlow :: Continue
920- }
921- Ok ( pb:: ExecutionType :: RunCommand ) => self . handle_run_command_message ( msg) ,
922- #[ cfg( feature = "enterprise" ) ]
923- Ok ( pb:: ExecutionType :: DryReplayPcap ) => self . handle_dry_replay_pcap_message ( msg) ,
924- _ => {
925- warn ! ( "unsupported execution type: {:?}" , msg. exec_type. unwrap( ) ) ;
926- ControlFlow :: Fallthrough
898+ Poll :: Ready ( Some ( msg) ) => {
899+ let raw_exec_type = msg. exec_type . unwrap ( ) ;
900+ match pb:: ExecutionType :: try_from ( raw_exec_type) {
901+ Ok ( pb:: ExecutionType :: ListCommand ) => {
902+ let commands = Self :: generate_command_list ( ) ;
903+ debug ! ( "list command returning {} entries" , commands. len( ) ) ;
904+ ControlFlow :: Return ( Some ( pb:: RemoteExecResponse {
905+ agent_id : Some ( self . agent_id . read ( ) . deref ( ) . into ( ) ) ,
906+ request_id : msg. request_id ,
907+ commands,
908+ ..Default :: default ( )
909+ } ) )
910+ }
911+ Ok ( pb:: ExecutionType :: ListNamespace ) => {
912+ trace ! ( "pending list namespace" ) ;
913+ self . pending_lsns = Some ( ( msg. request_id , Box :: pin ( ls_netns ( ) ) ) ) ;
914+ ControlFlow :: Continue
915+ }
916+ Ok ( pb:: ExecutionType :: RunCommand ) => self . handle_run_command_message ( msg) ,
917+ #[ cfg( feature = "enterprise" ) ]
918+ Ok ( pb:: ExecutionType :: DryReplayPcap ) => {
919+ self . handle_dry_replay_pcap_message ( msg)
920+ }
921+ Ok ( exec_type) => ControlFlow :: Return ( self . command_failed_helper (
922+ msg. request_id ,
923+ None ,
924+ format ! ( "unsupported execution type: {exec_type:?}" ) ,
925+ ) ) ,
926+ Err ( _) => ControlFlow :: Return ( self . command_failed_helper (
927+ msg. request_id ,
928+ None ,
929+ format ! ( "unsupported execution type: {raw_exec_type}" ) ,
930+ ) ) ,
927931 }
928- } ,
932+ }
929933 _ => ControlFlow :: Fallthrough ,
930934 }
931935 }
@@ -1418,3 +1422,79 @@ async fn kubectl_log(namespace: String, pod: String, previous: bool) -> Result<O
14181422 stderr : vec ! [ ] ,
14191423 } )
14201424}
1425+
1426+ #[ cfg( test) ]
1427+ mod tests {
1428+ use std:: {
1429+ net:: { IpAddr , Ipv4Addr } ,
1430+ sync:: Arc ,
1431+ } ;
1432+
1433+ use arc_swap:: { access:: Map , ArcSwap } ;
1434+ use futures:: StreamExt ;
1435+ use parking_lot:: RwLock ;
1436+
1437+ use super :: * ;
1438+ use crate :: config:: ModuleConfig ;
1439+
1440+ fn test_config ( ) -> Config {
1441+ let current_config = Arc :: new ( ArcSwap :: from_pointee ( ModuleConfig :: default ( ) ) ) ;
1442+ Config {
1443+ flow : Map :: new ( current_config. clone ( ) , |config| & config. flow ) ,
1444+ log_parser : Map :: new ( current_config, |config| & config. log_parser ) ,
1445+ }
1446+ }
1447+
1448+ fn test_agent_id ( ) -> Arc < RwLock < AgentId > > {
1449+ Arc :: new ( RwLock :: new ( AgentId {
1450+ ipmac : ( IpAddr :: V4 ( Ipv4Addr :: LOCALHOST ) , Default :: default ( ) ) . into ( ) ,
1451+ ..Default :: default ( )
1452+ } ) )
1453+ }
1454+
1455+ async fn next_response_for ( exec_type : i32 ) -> pb:: RemoteExecResponse {
1456+ let ( sender, receiver) = mpsc:: channel ( 1 ) ;
1457+ let mut responser = Responser :: new ( test_agent_id ( ) , receiver, test_config ( ) ) ;
1458+ sender
1459+ . send ( pb:: RemoteExecRequest {
1460+ request_id : Some ( 42 ) ,
1461+ exec_type : Some ( exec_type) ,
1462+ ..Default :: default ( )
1463+ } )
1464+ . await
1465+ . unwrap ( ) ;
1466+
1467+ responser. next ( ) . await . unwrap ( )
1468+ }
1469+
1470+ #[ tokio:: test]
1471+ async fn unsupported_exec_type_returns_error_response ( ) {
1472+ let response = next_response_for ( 99 ) . await ;
1473+
1474+ assert_eq ! ( response. request_id, Some ( 42 ) ) ;
1475+ assert_eq ! (
1476+ response. errmsg. as_deref( ) ,
1477+ Some ( "unsupported execution type: 99" )
1478+ ) ;
1479+ assert ! ( response. agent_id. is_some( ) ) ;
1480+ assert ! ( response. command_result. is_some( ) ) ;
1481+ assert ! ( response. commands. is_empty( ) ) ;
1482+ assert ! ( response. linux_namespaces. is_empty( ) ) ;
1483+ }
1484+
1485+ #[ cfg( not( feature = "enterprise" ) ) ]
1486+ #[ tokio:: test]
1487+ async fn dry_replay_pcap_returns_error_without_enterprise_feature ( ) {
1488+ let response = next_response_for ( pb:: ExecutionType :: DryReplayPcap as i32 ) . await ;
1489+
1490+ assert_eq ! ( response. request_id, Some ( 42 ) ) ;
1491+ assert_eq ! (
1492+ response. errmsg. as_deref( ) ,
1493+ Some ( "unsupported execution type: DryReplayPcap" )
1494+ ) ;
1495+ assert ! ( response. agent_id. is_some( ) ) ;
1496+ assert ! ( response. command_result. is_some( ) ) ;
1497+ assert ! ( response. commands. is_empty( ) ) ;
1498+ assert ! ( response. linux_namespaces. is_empty( ) ) ;
1499+ }
1500+ }
0 commit comments