Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 51 additions & 46 deletions crates/runc-shim/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,56 +161,61 @@ async fn process_exits(
if let Subject::Pid(pid) = e.subject {
debug!("receive exit event: {}", &e);
let exit_code = e.exit_code;
for (_k, cont) in containers.write().await.iter_mut() {
let bundle = cont.bundle.to_string();
let container_id = cont.id.clone();
let mut change_process: Vec<&mut (dyn Process + Send + Sync)> = Vec::new();
// pid belongs to container init process
if cont.init.pid == pid {
// kill all children process if the container has a private PID namespace
if should_kill_all_on_exit(&bundle).await {
cont.kill(None, 9, true).await.unwrap_or_else(|e| {
error!("failed to kill init's children: {}", e)
});
}
if let Ok(process_d) = cont.get_mut_process(None) {
change_process.push(process_d);
let containers = containers.clone();
let tx = tx.clone();
tokio::spawn(async move {
for (_k, cont) in containers.write().await.iter_mut() {
let bundle = cont.bundle.to_string();
let container_id = cont.id.clone();
let mut change_process: Vec<&mut (dyn Process + Send + Sync)> = Vec::new();
// pid belongs to container init process
if cont.init.pid == pid {
// kill all children process if the container has a private PID namespace
if should_kill_all_on_exit(&bundle).await {
cont.kill(None, 9, true).await.unwrap_or_else(|e| {
error!("failed to kill init's children: {}", e)
});
}
if let Ok(process_d) = cont.get_mut_process(None) {
change_process.push(process_d);
} else {
break;
}
} else {
break;
// pid belongs to container common process
if let Some((_, p)) =
cont.processes.iter_mut().find(|(_, p)| p.pid == pid)
{
change_process.push(p as &mut (dyn Process + Send + Sync));
}
}
} else {
// pid belongs to container common process
if let Some((_, p)) = cont.processes.iter_mut().find(|(_, p)| p.pid == pid)
{
change_process.push(p as &mut (dyn Process + Send + Sync));
let process_len = change_process.len();
for process in change_process {
// set exit for process
process.set_exited(exit_code).await;
let code = process.exit_code().await;
let exited_at = process.exited_at().await;
// publish event
let ts = convert_to_timestamp(exited_at);
let event = TaskExit {
container_id: container_id.clone(),
id: process.id().await.to_string(),
pid: process.pid().await as u32,
exit_status: code as u32,
exited_at: Some(ts).into(),
..Default::default()
};
let topic = event.topic();
tx.send((topic.to_string(), Box::new(event)))
.await
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
}
//if process has been find , no need to keep search
if process_len != 0 {
break;
}
}
let process_len = change_process.len();
for process in change_process {
// set exit for process
process.set_exited(exit_code).await;
let code = process.exit_code().await;
let exited_at = process.exited_at().await;
// publish event
let ts = convert_to_timestamp(exited_at);
let event = TaskExit {
container_id: container_id.clone(),
id: process.id().await.to_string(),
pid: process.pid().await as u32,
exit_status: code as u32,
exited_at: Some(ts).into(),
..Default::default()
};
let topic = event.topic();
tx.send((topic.to_string(), Box::new(event)))
.await
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
}
//if process has been find , no need to keep search
if process_len != 0 {
break;
}
}
});
}
}
monitor_unsubscribe(s.id).await.unwrap_or_default();
Expand Down
Loading