feat: nearest asof joins #6953
Rust Dependency Diff
Head: ✅ OK: Within budget.
Greptile Summary
This PR adds a
Confidence Score: 4/5
The nearest-join logic is algorithmically sound, but an unresolved issue from a prior review (executor crash on unsupported on-key Arrow types) still needs attention before merging.
The core probe/fill/carryover pipeline is correct and well-tested. The is_nearer function ends with unreachable! for types outside its dispatch table, which panics the executor thread. This was flagged in a previous review and is still present in the current diff.
The type-dispatch fallthrough in is_nearer inside src/daft-local-execution/src/join/asof_join.rs needs to return a DaftError instead of panicking.
Reviews (3): Last reviewed commit: "fix docstring"
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #6953 +/- ##
==========================================
- Coverage 75.76% 75.51% -0.26%
==========================================
Files 1135 1149 +14
Lines 161527 161691 +164
==========================================
- Hits 122388 122095 -293
- Misses 39139 39596 +457
@greptileai review
colin-ho left a comment
Haven't read the local execution code yet, will get to that later
const PER_WORKER_CARRYOVER_BACKWARD_PHASE: &str = "per_worker_carryover_backward";
const PER_PARTITION_CARRYOVER_BACKWARD_PHASE: &str = "per_partition_carryover_backward";
const PER_WORKER_CARRYOVER_FORWARD_PHASE: &str = "per_worker_carryover_forward";
const PER_PARTITION_CARRYOVER_FORWARD_PHASE: &str = "per_partition_carryover_forward";
Took me a while to understand what these are. Per worker makes sense: you compute the carryovers per worker. But per partition doesn't make sense in the same context, because it's not computing carryovers per partition, it's computing carryovers across the per-worker carryovers.
I recommend changing to partial / final, or local / global. My preference is partial / final.
is this only for the phase string, or also for
    let mut per_partition_outputs: Vec<Option<MaterializedOutput>> =
        try_join_all(per_partition_tasks.into_iter().map(|t| async {
            match t {
                Some(task) => task.await.map(|mo| mo.filter(|m| m.num_rows() > 0)),
                None => Ok(None),
            }
        }))
        .await?;
    let n = per_partition_outputs.len();
    if propagate_forward {
        for i in 1..n {
            if per_partition_outputs[i].is_none() {
                let prev = per_partition_outputs[i - 1].clone();
                per_partition_outputs[i] = prev;
            }
        }
    } else {
        for i in (0..n.saturating_sub(1)).rev() {
            if per_partition_outputs[i].is_none() {
                let next = per_partition_outputs[i + 1].clone();
                per_partition_outputs[i] = next;
            }
        }
    }
    Ok(per_partition_outputs)
}
?
In your PR description: How much faster?
Nearest ASOF Join
Adds strategy="nearest" to join_asof, which matches each left row to the right row with the minimum absolute difference in the on-key. Ties prefer the larger (later/forward) value.
Changes: Native execution
Probe
Previously, we assigned each right row to exactly one left row and relied on a single directional fill to propagate matches. Nearest can't do this: when two left rows are equidistant from a right row, assigning the right row to only one of them means the other never gets to compare that candidate.
The fix is search_bucket_nearest_range: for each right row it returns a Range covering both the floor (last left ≤ right) and ceil (first left ≥ right). Every right row is offered to every position in the range via update_nearest_match, which keeps the closer candidate.
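The probe step described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's code: `nearest_range` and `offer` are hypothetical stand-ins for search_bucket_nearest_range and update_nearest_match, keys are plain i64 values in a single bucket sorted ascending, and `best` stores the candidate key itself rather than a right-row index.

```rust
use std::ops::Range;

/// Hypothetical stand-in for `search_bucket_nearest_range`.
/// `left` must be sorted ascending. Returns the positions from the ceil
/// (first left >= right_key) through the floor (last left <= right_key),
/// or the floor/ceil pair around the gap when there is no exact hit.
fn nearest_range(left: &[i64], right_key: i64) -> Range<usize> {
    // First index whose left key is >= right_key.
    let lo = left.partition_point(|&l| l < right_key);
    // One past the last index whose left key is <= right_key.
    let hi = left.partition_point(|&l| l <= right_key);
    if lo < hi {
        // Some left keys equal right_key: they are both floor and ceil.
        lo..hi
    } else {
        // No exact hit: floor is lo - 1 (if it exists), ceil is lo (if it exists).
        lo.saturating_sub(1)..(lo + 1).min(left.len())
    }
}

/// Hypothetical stand-in for `update_nearest_match`: offer one right key to
/// every left position in its range and keep whichever candidate is closer.
/// Ties go to the larger key, as the PR description states.
fn offer(best: &mut [Option<i64>], left: &[i64], right_key: i64) {
    for i in nearest_range(left, right_key) {
        let pivot = left[i];
        let better = match best[i] {
            None => true,
            Some(cur) => {
                let (dn, dc) = (right_key.abs_diff(pivot), cur.abs_diff(pivot));
                dn < dc || (dn == dc && right_key > cur)
            }
        };
        if better {
            best[i] = Some(right_key);
        }
    }
}
```

With left keys 10, 20, 30 and a right key of 15, both the row at 10 and the row at 20 receive the candidate, which is the case a one-row-per-right assignment would miss.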
Finalize
After per-worker probe states are merged, nearest_fill resolves unmatched left rows by running both a forward and backward fill on copies of global_best, then picking the closer candidate from the two directions using is_nearer.
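A rough sketch of the two-direction fill, assuming the merged state can be modelled as one `Option<i64>` candidate key per left row (the real global_best presumably tracks row indices). The function names mirror the description, but the signatures here are illustrative guesses only.

```rust
/// True if candidate `a` is a better match for `pivot` than candidate `b`:
/// smaller absolute distance wins, and ties go to the larger key.
fn is_nearer(a: i64, b: i64, pivot: i64) -> bool {
    let (da, db) = (a.abs_diff(pivot), b.abs_diff(pivot));
    da < db || (da == db && a > b)
}

/// Illustrative version of `nearest_fill` over key values.
/// `left` holds the sorted left keys and `global_best` the merged probe
/// result for each left row (`None` = unmatched). Both have the same length.
fn nearest_fill(left: &[i64], global_best: &[Option<i64>]) -> Vec<Option<i64>> {
    // Forward fill on one copy: an unmatched row inherits the match before it.
    let mut fwd = global_best.to_vec();
    for i in 1..fwd.len() {
        if fwd[i].is_none() {
            fwd[i] = fwd[i - 1];
        }
    }
    // Backward fill on a second copy: an unmatched row inherits the match after it.
    let mut bwd = global_best.to_vec();
    for i in (0..bwd.len().saturating_sub(1)).rev() {
        if bwd[i].is_none() {
            bwd[i] = bwd[i + 1];
        }
    }
    // Per row, keep the closer of the two directions (or whichever exists).
    left.iter()
        .zip(fwd.into_iter().zip(bwd))
        .map(|(&pivot, pair)| match pair {
            (Some(f), Some(b)) => Some(if is_nearer(b, f, pivot) { b } else { f }),
            (f, b) => f.or(b),
        })
        .collect()
}
```

Rows that already had a match see the same candidate from both directions, so only the unmatched rows change.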
is_nearer
For each comparison, is_nearer dispatches on the Arrow DataType once, downcasts to the concrete PrimitiveArray, then extracts three plain Rust scalars (candidate A, candidate B, pivot) and compares |a - pivot| with |b - pivot|.
We chose this approach because type-matching and computing distances on plain Rust scalars was faster than going through Arrow compute kernels or Daft Series/array primitives.
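The dispatch-then-scalar pattern might look like the sketch below. To stay dependency-free it uses a hypothetical `KeyColumn` enum in place of Arrow arrays and a hypothetical `UnsupportedKeyType` error in place of DaftError; neither name comes from the PR. The fallthrough arm returns an error rather than panicking, in line with the review comment about unreachable!.

```rust
/// Hypothetical stand-in for an Arrow array. The real code matches on
/// `DataType` and downcasts to a concrete `PrimitiveArray`.
#[allow(dead_code)]
#[derive(Debug)]
enum KeyColumn {
    Int64(Vec<i64>),
    Float64(Vec<f64>),
    Utf8(Vec<String>), // no meaningful distance, so unsupported as an on-key
}

/// Hypothetical error type standing in for `DaftError`.
#[derive(Debug, PartialEq)]
struct UnsupportedKeyType(&'static str);

/// Is right[ia] a better match than right[ib] for left[ip]?
/// Smaller |candidate - pivot| wins; ties go to the larger candidate.
fn is_nearer(
    right: &KeyColumn,
    ia: usize,
    ib: usize,
    left: &KeyColumn,
    ip: usize,
) -> Result<bool, UnsupportedKeyType> {
    // Dispatch on the type once, then work on plain scalars.
    match (right, left) {
        (KeyColumn::Int64(r), KeyColumn::Int64(l)) => {
            let (a, b, pivot) = (r[ia], r[ib], l[ip]);
            // abs_diff avoids overflow for keys far apart.
            let (da, db) = (a.abs_diff(pivot), b.abs_diff(pivot));
            Ok(da < db || (da == db && a > b))
        }
        (KeyColumn::Float64(r), KeyColumn::Float64(l)) => {
            let (a, b, pivot) = (r[ia], r[ib], l[ip]);
            let (da, db) = ((a - pivot).abs(), (b - pivot).abs());
            Ok(da < db || (da == db && a > b))
        }
        // Unsupported or mismatched types surface as an error, not a panic.
        _ => Err(UnsupportedKeyType("on-key type has no distance comparison")),
    }
}
```

Returning a `Result` lets the caller propagate the failure with `?` instead of taking down the executor thread.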
Changes: Distributed execution
We refactored the carryover computation into compute_carryovers(descending: bool), which runs a top_n(limit=1) pass over the right table:
For Nearest, both passes run concurrently via tokio::try_join!. Each partition's local join task then receives its own right data plus one boundary row from each direction (two carryovers), and the native nearest join handles the rest.
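To make the boundary-row idea concrete, here is a single-process sketch. It rests on several assumptions that the description does not confirm: partitions are range-partitioned on the key in ascending order, `descending = true` means "take each partition's largest key and hand it to later partitions", and `descending = false` means the mirror image. The signature is illustrative and keys are plain i64 values.

```rust
/// Illustrative, in-memory stand-in for `compute_carryovers`.
/// Returns, for each partition, the single boundary key it would receive
/// from its neighbours in one direction (`None` if there is nothing to carry).
fn compute_carryovers(partitions: &[Vec<i64>], descending: bool) -> Vec<Option<i64>> {
    // The top_n(limit = 1) step: one boundary key per partition.
    let tops: Vec<Option<i64>> = partitions
        .iter()
        .map(|p| {
            if descending {
                p.iter().copied().max()
            } else {
                p.iter().copied().min()
            }
        })
        .collect();

    let n = tops.len();
    let mut carry = vec![None; n];
    if descending {
        // Partition i gets the largest key of the closest non-empty partition before it.
        for i in 1..n {
            carry[i] = tops[i - 1].or(carry[i - 1]);
        }
    } else {
        // Partition i gets the smallest key of the closest non-empty partition after it.
        for i in (0..n.saturating_sub(1)).rev() {
            carry[i] = tops[i + 1].or(carry[i + 1]);
        }
    }
    carry
}
```

For Nearest, calling this once per direction gives each partition its two carryovers; the PR runs the two passes concurrently with tokio::try_join!, which this synchronous sketch leaves out.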