diff --git a/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs b/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs index 5b19d8a266..3bde0d2071 100644 --- a/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs +++ b/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs @@ -1,6 +1,6 @@ use anyhow::Result; use epoxy_protocol::generated::v2::CachingBehavior; -use futures_util::{StreamExt, TryStreamExt}; +use futures_util::StreamExt; use gas::prelude::*; use crate::keys; @@ -59,27 +59,41 @@ async fn list_runner_config_enabled_dcs_inner( ctx: &OperationCtx, input: &Input, ) -> Result> { - futures_util::stream::iter(ctx.config().topology().datacenters.clone()) - .map(|dc| async move { - let runner_config_key = keys::runner_config::GlobalDataKey::new( - dc.datacenter_label, - input.namespace_id, - input.runner_name.clone(), - ); - let res = ctx - .op(epoxy::ops::kv::get_optimistic::Input { - replica_id: ctx.config().epoxy_replica_id(), - key: namespace::keys::subspace().pack(&runner_config_key), - caching_behavior: CachingBehavior::Optimistic, - target_replicas: None, - save_empty: true, - }) - .await?; + Ok( + futures_util::stream::iter(ctx.config().topology().datacenters.clone()) + .map(|dc| async move { + let runner_config_key = keys::runner_config::GlobalDataKey::new( + dc.datacenter_label, + input.namespace_id, + input.runner_name.clone(), + ); + let res = ctx + .op(epoxy::ops::kv::get_optimistic::Input { + replica_id: ctx.config().epoxy_replica_id(), + key: namespace::keys::subspace().pack(&runner_config_key), + caching_behavior: CachingBehavior::Optimistic, + target_replicas: None, + save_empty: true, + }) + .await; - Ok(res.value.map(|_| dc.datacenter_label)) - }) - .buffer_unordered(512) - .try_filter_map(|x| std::future::ready(Ok(x))) - .try_collect::>() - .await + match res { + Ok(res) => res.value.map(|_| dc.datacenter_label), + Err(err) => { + tracing::warn!( + ?err, + namespace_id=?input.namespace_id, + runner_name=%input.runner_name, + dc_label=dc.datacenter_label, + "failed to read runner config from dc" + ); + None + } + } + }) + .buffer_unordered(512) + .filter_map(std::future::ready) + .collect::>() + .await, + ) }