Skip to content

Commit 3674048

Browse files
authored
fix(pegboard): isolate runner config dc lookup failures (#4625)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. ## Checklist: - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes
1 parent 0215b9f commit 3674048

File tree

1 file changed

+37
-23
lines changed

1 file changed

+37
-23
lines changed

engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use epoxy_protocol::generated::v2::CachingBehavior;
3-
use futures_util::{StreamExt, TryStreamExt};
3+
use futures_util::StreamExt;
44
use gas::prelude::*;
55

66
use crate::keys;
@@ -59,27 +59,41 @@ async fn list_runner_config_enabled_dcs_inner(
5959
ctx: &OperationCtx,
6060
input: &Input,
6161
) -> Result<Vec<u16>> {
62-
futures_util::stream::iter(ctx.config().topology().datacenters.clone())
63-
.map(|dc| async move {
64-
let runner_config_key = keys::runner_config::GlobalDataKey::new(
65-
dc.datacenter_label,
66-
input.namespace_id,
67-
input.runner_name.clone(),
68-
);
69-
let res = ctx
70-
.op(epoxy::ops::kv::get_optimistic::Input {
71-
replica_id: ctx.config().epoxy_replica_id(),
72-
key: namespace::keys::subspace().pack(&runner_config_key),
73-
caching_behavior: CachingBehavior::Optimistic,
74-
target_replicas: None,
75-
save_empty: true,
76-
})
77-
.await?;
62+
Ok(
63+
futures_util::stream::iter(ctx.config().topology().datacenters.clone())
64+
.map(|dc| async move {
65+
let runner_config_key = keys::runner_config::GlobalDataKey::new(
66+
dc.datacenter_label,
67+
input.namespace_id,
68+
input.runner_name.clone(),
69+
);
70+
let res = ctx
71+
.op(epoxy::ops::kv::get_optimistic::Input {
72+
replica_id: ctx.config().epoxy_replica_id(),
73+
key: namespace::keys::subspace().pack(&runner_config_key),
74+
caching_behavior: CachingBehavior::Optimistic,
75+
target_replicas: None,
76+
save_empty: true,
77+
})
78+
.await;
7879

79-
Ok(res.value.map(|_| dc.datacenter_label))
80-
})
81-
.buffer_unordered(512)
82-
.try_filter_map(|x| std::future::ready(Ok(x)))
83-
.try_collect::<Vec<_>>()
84-
.await
80+
match res {
81+
Ok(res) => res.value.map(|_| dc.datacenter_label),
82+
Err(err) => {
83+
tracing::warn!(
84+
?err,
85+
namespace_id=?input.namespace_id,
86+
runner_name=%input.runner_name,
87+
dc_label=dc.datacenter_label,
88+
"failed to read runner config from dc"
89+
);
90+
None
91+
}
92+
}
93+
})
94+
.buffer_unordered(512)
95+
.filter_map(std::future::ready)
96+
.collect::<Vec<_>>()
97+
.await,
98+
)
8599
}

0 commit comments

Comments
 (0)