Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

### Rust
- Format code with rustfmt.
- While still satisfying rustfmt, prefer code shapes that minimize indentation.
- Prefer captured formatting arguments such as `format!("{argument}")` over positional forms such as `format!("{}", argument)`.
- `@agent` uses the `stable` Rust toolchain via `@agent/rust-toolchain`.
- Prefer placing new Rust code in `@agent/crates`.
- Dependencies in any `Cargo.toml` must be sorted in alphabet order.
Expand Down
28 changes: 20 additions & 8 deletions agent/crates/public/src/codecs/hessian2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,18 @@ impl Hessian2Decoder {
}
}
// int
0x80..=0xbf | 0xc0..=0xcf | 0xd0..=0xd7 | BC_INT => {
Self::to_hessian_value(Self::decode_i32(bytes, start), bytes.len(), HessianValue::Int)
}
0x80..=0xbf | 0xc0..=0xcf | 0xd0..=0xd7 | BC_INT => Self::to_hessian_value(
Self::decode_i32(bytes, start),
bytes.len(),
HessianValue::Int,
),
// long
0xd8..=0xef | 0xf0..=0xff | 0x38..=0x3f | BC_LONG_INT | BC_LONG => {
Self::to_hessian_value(Self::decode_i64(bytes, start), bytes.len(), HessianValue::Long)
Self::to_hessian_value(
Self::decode_i64(bytes, start),
bytes.len(),
HessianValue::Long,
)
}
// date
BC_DATE | BC_DATE_MINUTE => Self::to_hessian_value(
Expand Down Expand Up @@ -139,12 +145,18 @@ impl Hessian2Decoder {
None => (None, bytes.len()),
},
// hashmap
BC_MAP | BC_MAP_UNTYPED => {
Self::to_hessian_value(self.decode_map(bytes, start), bytes.len(), HessianValue::Map)
}
BC_MAP | BC_MAP_UNTYPED => Self::to_hessian_value(
self.decode_map(bytes, start),
bytes.len(),
HessianValue::Map,
),
// object,只能处理为 hashmap
BC_OBJECT_DEF | BC_OBJECT | BC_OBJECT_DIRECT..=BC_OBJECT_DIRECT_MAX => {
Self::to_hessian_value(self.decode_obj(bytes, start), bytes.len(), HessianValue::Map)
Self::to_hessian_value(
self.decode_obj(bytes, start),
bytes.len(),
HessianValue::Map,
)
}
_ => (None, bytes.len()), // 如果不符合任何一种,表示这个 tag 没有意义,直接丢弃剩余所有数据
}
Expand Down
144 changes: 112 additions & 32 deletions agent/src/rpc/remote_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,8 @@ impl Interior {
if message.exec_type.is_none() {
continue;
}
match pb::ExecutionType::try_from(message.exec_type.unwrap()) {
Ok(t) => debug!("received {:?} command from server", t),
Err(_) => {
warn!(
"unsupported remote exec type id {}",
message.exec_type.unwrap()
);
continue;
}
if let Ok(t) = pb::ExecutionType::try_from(message.exec_type.unwrap()) {
debug!("received {:?} command from server", t);
}
if sender.send(message).await.is_err() {
debug!("responser channel closed");
Expand Down Expand Up @@ -902,30 +895,41 @@ impl Responser {
match self.msg_recv.poll_recv(ctx) {
// sender closed, terminate the current stream
Poll::Ready(None) => ControlFlow::Return(None),
Poll::Ready(Some(msg)) => match pb::ExecutionType::try_from(msg.exec_type.unwrap()) {
Ok(pb::ExecutionType::ListCommand) => {
let commands = Self::generate_command_list();
debug!("list command returning {} entries", commands.len());
ControlFlow::Return(Some(pb::RemoteExecResponse {
agent_id: Some(self.agent_id.read().deref().into()),
request_id: msg.request_id,
commands,
..Default::default()
}))
}
Ok(pb::ExecutionType::ListNamespace) => {
trace!("pending list namespace");
self.pending_lsns = Some((msg.request_id, Box::pin(ls_netns())));
ControlFlow::Continue
}
Ok(pb::ExecutionType::RunCommand) => self.handle_run_command_message(msg),
#[cfg(feature = "enterprise")]
Ok(pb::ExecutionType::DryReplayPcap) => self.handle_dry_replay_pcap_message(msg),
_ => {
warn!("unsupported execution type: {:?}", msg.exec_type.unwrap());
ControlFlow::Fallthrough
Poll::Ready(Some(msg)) => {
let raw_exec_type = msg.exec_type.unwrap();
match pb::ExecutionType::try_from(raw_exec_type) {
Ok(pb::ExecutionType::ListCommand) => {
let commands = Self::generate_command_list();
debug!("list command returning {} entries", commands.len());
ControlFlow::Return(Some(pb::RemoteExecResponse {
agent_id: Some(self.agent_id.read().deref().into()),
request_id: msg.request_id,
commands,
..Default::default()
}))
}
Ok(pb::ExecutionType::ListNamespace) => {
trace!("pending list namespace");
self.pending_lsns = Some((msg.request_id, Box::pin(ls_netns())));
ControlFlow::Continue
}
Ok(pb::ExecutionType::RunCommand) => self.handle_run_command_message(msg),
#[cfg(feature = "enterprise")]
Ok(pb::ExecutionType::DryReplayPcap) => {
self.handle_dry_replay_pcap_message(msg)
}
Ok(exec_type) => ControlFlow::Return(self.command_failed_helper(
msg.request_id,
None,
format!("unsupported execution type: {exec_type:?}"),
)),
Err(_) => ControlFlow::Return(self.command_failed_helper(
msg.request_id,
None,
format!("unsupported execution type: {raw_exec_type}"),
)),
}
},
}
_ => ControlFlow::Fallthrough,
}
}
Expand Down Expand Up @@ -1418,3 +1422,79 @@ async fn kubectl_log(namespace: String, pod: String, previous: bool) -> Result<O
stderr: vec![],
})
}

#[cfg(test)]
mod tests {
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
};

use arc_swap::{access::Map, ArcSwap};
use futures::StreamExt;
use parking_lot::RwLock;

use super::*;
use crate::config::ModuleConfig;

fn test_config() -> Config {
let current_config = Arc::new(ArcSwap::from_pointee(ModuleConfig::default()));
Config {
flow: Map::new(current_config.clone(), |config| &config.flow),
log_parser: Map::new(current_config, |config| &config.log_parser),
}
}

fn test_agent_id() -> Arc<RwLock<AgentId>> {
Arc::new(RwLock::new(AgentId {
ipmac: (IpAddr::V4(Ipv4Addr::LOCALHOST), Default::default()).into(),
..Default::default()
}))
}

async fn next_response_for(exec_type: i32) -> pb::RemoteExecResponse {
let (sender, receiver) = mpsc::channel(1);
let mut responser = Responser::new(test_agent_id(), receiver, test_config());
sender
.send(pb::RemoteExecRequest {
request_id: Some(42),
exec_type: Some(exec_type),
..Default::default()
})
.await
.unwrap();

responser.next().await.unwrap()
}

#[tokio::test]
async fn unsupported_exec_type_returns_error_response() {
let response = next_response_for(99).await;

assert_eq!(response.request_id, Some(42));
assert_eq!(
response.errmsg.as_deref(),
Some("unsupported execution type: 99")
);
assert!(response.agent_id.is_some());
assert!(response.command_result.is_some());
assert!(response.commands.is_empty());
assert!(response.linux_namespaces.is_empty());
}

#[cfg(not(feature = "enterprise"))]
#[tokio::test]
async fn dry_replay_pcap_returns_error_without_enterprise_feature() {
let response = next_response_for(pb::ExecutionType::DryReplayPcap as i32).await;

assert_eq!(response.request_id, Some(42));
assert_eq!(
response.errmsg.as_deref(),
Some("unsupported execution type: DryReplayPcap")
);
assert!(response.agent_id.is_some());
assert!(response.command_result.is_some());
assert!(response.commands.is_empty());
assert!(response.linux_namespaces.is_empty());
}
}
Loading