Skip to content

Commit 896a1bb

Browse files
committed
fix: dont upsert for ensure if missing
1 parent 6b3850b commit 896a1bb

File tree

3 files changed

+33
-33
lines changed

3 files changed

+33
-33
lines changed

engine/packages/pegboard/src/ops/runner_config/ensure_normal_if_missing.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,25 @@ pub async fn pegboard_runner_config_ensure_normal_if_missing(
1313
ctx: &OperationCtx,
1414
input: &Input,
1515
) -> Result<()> {
16-
ctx.op(crate::ops::runner_config::upsert::Input {
17-
namespace_id: input.namespace_id,
18-
name: input.name.clone(),
19-
config: rivet_types::runner_configs::RunnerConfig {
20-
kind: rivet_types::runner_configs::RunnerConfigKind::Normal {},
21-
metadata: None,
22-
drain_on_version_upgrade: false,
23-
},
24-
})
25-
.await?;
16+
let pool_res = ctx
17+
.op(crate::ops::runner_config::get::Input {
18+
runners: vec![(input.namespace_id, input.name.clone())],
19+
bypass_cache: true,
20+
})
21+
.await?;
22+
23+
if !pool_res.is_empty() {
24+
ctx.op(crate::ops::runner_config::upsert::Input {
25+
namespace_id: input.namespace_id,
26+
name: input.name.clone(),
27+
config: rivet_types::runner_configs::RunnerConfig {
28+
kind: rivet_types::runner_configs::RunnerConfigKind::Normal {},
29+
metadata: None,
30+
drain_on_version_upgrade: false,
31+
},
32+
})
33+
.await?;
34+
}
2635

2736
Ok(())
2837
}

engine/sdks/rust/envoy-client/src/envoy.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -404,30 +404,20 @@ async fn handle_shutdown(ctx: &mut EnvoyContext) {
404404

405405
ws_send(&ctx.shared, protocol::ToRivet::ToRivetStopping).await;
406406

407-
// Check if any actors are still active
408-
let has_actors = ctx
407+
// Wait for all actors to finish. The process manager (Docker,
408+
// k8s, etc.) provides the ultimate shutdown deadline.
409+
let actor_handles: Vec<mpsc::UnboundedSender<ToActor>> = ctx
409410
.actors
410411
.values()
411-
.any(|gens| gens.values().any(|entry| !entry.handle.is_closed()));
412+
.flat_map(|gens| gens.values())
413+
.filter(|entry| !entry.handle.is_closed())
414+
.map(|entry| entry.handle.clone())
415+
.collect();
412416

413-
if !has_actors {
414-
let _ = ctx.shared.envoy_tx.send(ToEnvoyMessage::Stop);
415-
} else {
416-
// Wait for all actors to finish. The process manager (Docker,
417-
// k8s, etc.) provides the ultimate shutdown deadline.
418-
let actor_handles: Vec<mpsc::UnboundedSender<ToActor>> = ctx
419-
.actors
420-
.values()
421-
.flat_map(|gens| gens.values())
422-
.filter(|entry| !entry.handle.is_closed())
423-
.map(|entry| entry.handle.clone())
424-
.collect();
425-
426-
let envoy_tx = ctx.shared.envoy_tx.clone();
427-
tokio::spawn(async move {
428-
futures_util::future::join_all(actor_handles.iter().map(|h| h.closed())).await;
429-
tracing::debug!("all actors stopped during graceful shutdown");
430-
let _ = envoy_tx.send(ToEnvoyMessage::Stop);
431-
});
432-
}
417+
let envoy_tx = ctx.shared.envoy_tx.clone();
418+
tokio::spawn(async move {
419+
futures_util::future::join_all(actor_handles.iter().map(|h| h.closed())).await;
420+
tracing::debug!("all actors stopped during graceful shutdown");
421+
let _ = envoy_tx.send(ToEnvoyMessage::Stop);
422+
});
433423
}

engine/sdks/rust/envoy-client/src/handle.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct EnvoyHandle {
1717
impl EnvoyHandle {
1818
pub fn shutdown(&self, immediate: bool) {
1919
self.shared.shutting_down.store(true, Ordering::Release);
20+
2021
if immediate {
2122
let _ = self.shared.envoy_tx.send(ToEnvoyMessage::Stop);
2223
} else {

0 commit comments

Comments
 (0)