diff --git a/AGENTS.md b/AGENTS.md index 516876f2b12..909b6285238 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -44,6 +44,8 @@ ### Rust - Format code with rustfmt. +- While still satisfying rustfmt, prefer code shapes that minimize indentation. +- Prefer captured formatting arguments such as `format!("{argument}")` over positional forms such as `format!("{}", argument)`. - `@agent` uses the `stable` Rust toolchain via `@agent/rust-toolchain`. - Prefer placing new Rust code in `@agent/crates`. - Dependencies in any `Cargo.toml` must be sorted in alphabet order. diff --git a/agent/crates/public/src/codecs/hessian2.rs b/agent/crates/public/src/codecs/hessian2.rs index 3336fea13a1..a52d265dce5 100644 --- a/agent/crates/public/src/codecs/hessian2.rs +++ b/agent/crates/public/src/codecs/hessian2.rs @@ -90,12 +90,18 @@ impl Hessian2Decoder { } } // int - 0x80..=0xbf | 0xc0..=0xcf | 0xd0..=0xd7 | BC_INT => { - Self::to_hessian_value(Self::decode_i32(bytes, start), bytes.len(), HessianValue::Int) - } + 0x80..=0xbf | 0xc0..=0xcf | 0xd0..=0xd7 | BC_INT => Self::to_hessian_value( + Self::decode_i32(bytes, start), + bytes.len(), + HessianValue::Int, + ), // long 0xd8..=0xef | 0xf0..=0xff | 0x38..=0x3f | BC_LONG_INT | BC_LONG => { - Self::to_hessian_value(Self::decode_i64(bytes, start), bytes.len(), HessianValue::Long) + Self::to_hessian_value( + Self::decode_i64(bytes, start), + bytes.len(), + HessianValue::Long, + ) } // date BC_DATE | BC_DATE_MINUTE => Self::to_hessian_value( @@ -139,12 +145,18 @@ impl Hessian2Decoder { None => (None, bytes.len()), }, // hashmap - BC_MAP | BC_MAP_UNTYPED => { - Self::to_hessian_value(self.decode_map(bytes, start), bytes.len(), HessianValue::Map) - } + BC_MAP | BC_MAP_UNTYPED => Self::to_hessian_value( + self.decode_map(bytes, start), + bytes.len(), + HessianValue::Map, + ), // object,只能处理为 hashmap BC_OBJECT_DEF | BC_OBJECT | BC_OBJECT_DIRECT..=BC_OBJECT_DIRECT_MAX => { - Self::to_hessian_value(self.decode_obj(bytes, start), bytes.len(), HessianValue::Map) + Self::to_hessian_value( + self.decode_obj(bytes, start), + bytes.len(), + HessianValue::Map, + ) } _ => (None, bytes.len()), // 如果不符合任何一种,表示这个 tag 没有意义,直接丢弃剩余所有数据 } diff --git a/agent/src/rpc/remote_exec.rs b/agent/src/rpc/remote_exec.rs index 52945b771ca..505b0a5f28a 100644 --- a/agent/src/rpc/remote_exec.rs +++ b/agent/src/rpc/remote_exec.rs @@ -326,15 +326,8 @@ impl Interior { if message.exec_type.is_none() { continue; } - match pb::ExecutionType::try_from(message.exec_type.unwrap()) { - Ok(t) => debug!("received {:?} command from server", t), - Err(_) => { - warn!( - "unsupported remote exec type id {}", - message.exec_type.unwrap() - ); - continue; - } + if let Ok(t) = pb::ExecutionType::try_from(message.exec_type.unwrap()) { + debug!("received {:?} command from server", t); } if sender.send(message).await.is_err() { debug!("responser channel closed"); @@ -902,30 +895,41 @@ impl Responser { match self.msg_recv.poll_recv(ctx) { // sender closed, terminate the current stream Poll::Ready(None) => ControlFlow::Return(None), - Poll::Ready(Some(msg)) => match pb::ExecutionType::try_from(msg.exec_type.unwrap()) { - Ok(pb::ExecutionType::ListCommand) => { - let commands = Self::generate_command_list(); - debug!("list command returning {} entries", commands.len()); - ControlFlow::Return(Some(pb::RemoteExecResponse { - agent_id: Some(self.agent_id.read().deref().into()), - request_id: msg.request_id, - commands, - ..Default::default() - })) - } - Ok(pb::ExecutionType::ListNamespace) => { - trace!("pending list namespace"); - self.pending_lsns = Some((msg.request_id, Box::pin(ls_netns()))); - ControlFlow::Continue - } - Ok(pb::ExecutionType::RunCommand) => self.handle_run_command_message(msg), - #[cfg(feature = "enterprise")] - Ok(pb::ExecutionType::DryReplayPcap) => self.handle_dry_replay_pcap_message(msg), - _ => { - warn!("unsupported execution type: {:?}", msg.exec_type.unwrap()); - ControlFlow::Fallthrough + Poll::Ready(Some(msg)) => { + let raw_exec_type = msg.exec_type.unwrap(); + match pb::ExecutionType::try_from(raw_exec_type) { + Ok(pb::ExecutionType::ListCommand) => { + let commands = Self::generate_command_list(); + debug!("list command returning {} entries", commands.len()); + ControlFlow::Return(Some(pb::RemoteExecResponse { + agent_id: Some(self.agent_id.read().deref().into()), + request_id: msg.request_id, + commands, + ..Default::default() + })) + } + Ok(pb::ExecutionType::ListNamespace) => { + trace!("pending list namespace"); + self.pending_lsns = Some((msg.request_id, Box::pin(ls_netns()))); + ControlFlow::Continue + } + Ok(pb::ExecutionType::RunCommand) => self.handle_run_command_message(msg), + #[cfg(feature = "enterprise")] + Ok(pb::ExecutionType::DryReplayPcap) => { + self.handle_dry_replay_pcap_message(msg) + } + Ok(exec_type) => ControlFlow::Return(self.command_failed_helper( + msg.request_id, + None, + format!("unsupported execution type: {exec_type:?}"), + )), + Err(_) => ControlFlow::Return(self.command_failed_helper( + msg.request_id, + None, + format!("unsupported execution type: {raw_exec_type}"), + )), } - }, + } _ => ControlFlow::Fallthrough, } } @@ -1418,3 +1422,79 @@ async fn kubectl_log(namespace: String, pod: String, previous: bool) -> Result Config { + let current_config = Arc::new(ArcSwap::from_pointee(ModuleConfig::default())); + Config { + flow: Map::new(current_config.clone(), |config| &config.flow), + log_parser: Map::new(current_config, |config| &config.log_parser), + } + } + + fn test_agent_id() -> Arc> { + Arc::new(RwLock::new(AgentId { + ipmac: (IpAddr::V4(Ipv4Addr::LOCALHOST), Default::default()).into(), + ..Default::default() + })) + } + + async fn next_response_for(exec_type: i32) -> pb::RemoteExecResponse { + let (sender, receiver) = mpsc::channel(1); + let mut responser = Responser::new(test_agent_id(), receiver, test_config()); + sender + .send(pb::RemoteExecRequest { + request_id: Some(42), + exec_type: Some(exec_type), + ..Default::default() + }) + .await + .unwrap(); + + responser.next().await.unwrap() + } + + #[tokio::test] + async fn unsupported_exec_type_returns_error_response() { + let response = next_response_for(99).await; + + assert_eq!(response.request_id, Some(42)); + assert_eq!( + response.errmsg.as_deref(), + Some("unsupported execution type: 99") + ); + assert!(response.agent_id.is_some()); + assert!(response.command_result.is_some()); + assert!(response.commands.is_empty()); + assert!(response.linux_namespaces.is_empty()); + } + + #[cfg(not(feature = "enterprise"))] + #[tokio::test] + async fn dry_replay_pcap_returns_error_without_enterprise_feature() { + let response = next_response_for(pb::ExecutionType::DryReplayPcap as i32).await; + + assert_eq!(response.request_id, Some(42)); + assert_eq!( + response.errmsg.as_deref(), + Some("unsupported execution type: DryReplayPcap") + ); + assert!(response.agent_id.is_some()); + assert!(response.command_result.is_some()); + assert!(response.commands.is_empty()); + assert!(response.linux_namespaces.is_empty()); + } +}