diff --git a/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs b/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs index a8abf1404c..3bde0d2071 100644 --- a/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs +++ b/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs @@ -59,39 +59,41 @@ async fn list_runner_config_enabled_dcs_inner( ctx: &OperationCtx, input: &Input, ) -> Result> { - futures_util::stream::iter(ctx.config().topology().datacenters.clone()) - .map(|dc| async move { - let runner_config_key = keys::runner_config::GlobalDataKey::new( - dc.datacenter_label, - input.namespace_id, - input.runner_name.clone(), - ); - let res = ctx - .op(epoxy::ops::kv::get_optimistic::Input { - replica_id: ctx.config().epoxy_replica_id(), - key: namespace::keys::subspace().pack(&runner_config_key), - caching_behavior: CachingBehavior::Optimistic, - target_replicas: None, - save_empty: true, - }) - .await; + Ok( + futures_util::stream::iter(ctx.config().topology().datacenters.clone()) + .map(|dc| async move { + let runner_config_key = keys::runner_config::GlobalDataKey::new( + dc.datacenter_label, + input.namespace_id, + input.runner_name.clone(), + ); + let res = ctx + .op(epoxy::ops::kv::get_optimistic::Input { + replica_id: ctx.config().epoxy_replica_id(), + key: namespace::keys::subspace().pack(&runner_config_key), + caching_behavior: CachingBehavior::Optimistic, + target_replicas: None, + save_empty: true, + }) + .await; - match res { - Ok(res) => res.value.map(|_| dc.datacenter_label), - Err(err) => { - tracing::warn!( - ?err, - namespace_id=?input.namespace_id, - runner_name=%input.runner_name, - dc_label=dc.datacenter_label, - "failed to read runner config from dc" - ); - None + match res { + Ok(res) => res.value.map(|_| dc.datacenter_label), + Err(err) => { + tracing::warn!( + ?err, + namespace_id=?input.namespace_id, + runner_name=%input.runner_name, + dc_label=dc.datacenter_label, + "failed to read runner config from dc" + ); + None + } } - } - }) - .buffer_unordered(512) - .filter_map(std::future::ready) - .collect::>() - .await + }) + .buffer_unordered(512) + .filter_map(std::future::ready) + .collect::>() + .await, + ) } diff --git a/engine/packages/pegboard/src/workflows/runner_pool_backfill.rs b/engine/packages/pegboard/src/workflows/runner_pool_backfill.rs index 1d60a1c673..d05069dfa8 100644 --- a/engine/packages/pegboard/src/workflows/runner_pool_backfill.rs +++ b/engine/packages/pegboard/src/workflows/runner_pool_backfill.rs @@ -99,10 +99,10 @@ async fn backfill_chunk(ctx: &ActivityCtx, input: &BackfillChunkInput) -> Result }; new_last_key = [entry.key(), &[0xff]].concat(); - if let Ok((_, (_, _, leaf))) = + if let Ok((_, _, leaf)) = runner_config_subspace.unpack::<(Id, String, usize)>(entry.key()) { - if leaf == keys::runner_config::PROTOCOL_VERSION { + if leaf == universaldb::utils::keys::PROTOCOL_VERSION { continue; } }