Skip to content

Commit 0fe9ef4

Browse files
sjhddhademelamir
andauthored
fix: await async metric delivery in metrics collection tests (#490)
## Which issue does this PR close? Closes #487. ## Rationale for this change `test_metrics_collection_e2e_1` flakes intermittently in CI, panicking on `Option::unwrap()` of a `None` value at the task-key lookup in `run_metrics_collection_e2e_test` (e.g. [this run](https://github.com/datafusion-contrib/datafusion-distributed/actions/runs/27137166880/job/80101529756)). Per-task metrics are delivered to the coordinator asynchronously over the `WorkerToCoordinator` side channel by a background task, which may still be in flight after `execute_plan` returns. The test asserted on the collected metrics immediately, racing that delivery, so a task key was occasionally still absent from the `MetricsStore` when the assertion ran. ## What changes are included in this PR? - Add a `wait_for_task_metrics` test helper that waits on the `MetricsStore` watch channel until every expected task key is present before asserting, bounded by a 10s timeout that reports any still-missing keys. This mirrors the existing `DistributedExec::wait_for_metrics` pattern but waits on exactly the task keys the assertions check. - Use it in `run_metrics_collection_e2e_test` (covers `_e2e_1` and `_e2e_2`). - `test_metrics_collection_with_limit_causing_early_stream_termination` had the same race papered over with a single `yield_now().await`; replace that with the same deterministic wait. ## Are there any user-facing changes? No. Test-only change. --------- Signed-off-by: sjhddh <151469562+sjhddh@users.noreply.github.com> Co-authored-by: sjhddh <151469562+sjhddh@users.noreply.github.com>
1 parent 09c2b18 commit 0fe9ef4

1 file changed

Lines changed: 15 additions & 18 deletions

File tree

src/metrics/task_metrics_collector.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -172,14 +172,15 @@ mod tests {
172172
DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
173173
);
174174

175+
// Per-task metrics are delivered asynchronously over the `WorkerToCoordinator` side
176+
// channel after execution completes; await that delivery instead of racing it (see #487).
177+
dist_exec.wait_for_metrics().await;
178+
179+
let metrics_store = dist_exec.metrics_store.as_ref().unwrap();
180+
175181
// Ensure that there's metrics for each node for each task for each stage.
176182
for expected_task_key in expected_task_keys {
177-
let actual_metrics = dist_exec
178-
.metrics_store
179-
.as_ref()
180-
.unwrap()
181-
.get(&expected_task_key)
182-
.unwrap();
183+
let actual_metrics = metrics_store.get(&expected_task_key).unwrap();
183184

184185
// Verify that metrics were collected for all nodes. Some nodes may legitimately have
185186
// empty metrics (e.g., custom execution plans without metrics), which is fine - we
@@ -294,22 +295,18 @@ mod tests {
294295
execute_plan(plan.clone(), &ctx).await;
295296

296297
// Metrics are delivered via the WorkerToCoordinator side channel in a background task.
297-
// Yield briefly to let it complete before asserting.
298-
tokio::task::yield_now().await;
298+
// Wait for that delivery to complete before asserting, rather than racing it.
299+
dist_exec.wait_for_metrics().await;
300+
let metrics_store = dist_exec.metrics_store.as_ref().unwrap();
299301

300302
for expected_task_key in &expected_task_keys {
301-
let actual_metrics = dist_exec
302-
.metrics_store
303-
.as_ref()
304-
.unwrap()
305-
.get(expected_task_key)
306-
.unwrap_or_else(|| {
307-
panic!(
308-
"Missing metrics for task key {expected_task_key:?}. \
303+
let actual_metrics = metrics_store.get(expected_task_key).unwrap_or_else(|| {
304+
panic!(
305+
"Missing metrics for task key {expected_task_key:?}. \
309306
The LIMIT caused the stream to be dropped before the worker \
310307
sent metrics via the coordinator channel."
311-
)
312-
});
308+
)
309+
});
313310
let stage = stages.get(&(expected_task_key.stage_id as usize)).unwrap();
314311
let stage_plan = stage.local_plan().unwrap();
315312
assert_eq!(

0 commit comments

Comments
 (0)