feat: nearest asof joins #6953

Open
euanlimzx wants to merge 27 commits into main from euan/nearest-asof

euanlimzx (Contributor) commented May 18, 2026


Nearest ASOF Join

Adds strategy="nearest" to join_asof, which matches each left row to the right row with the minimum absolute difference in the on-key. Ties prefer the larger (later/forward) value.
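As a reference for the matching rule stated above, here is a minimal brute-force sketch. `nearest_match` is a hypothetical helper written for illustration and is not part of Daft; it assumes integer on-keys and ignores nulls and group-by keys.

```rust
/// Brute-force reference for the "nearest" rule: choose the right key with the
/// smallest |right - left|; when two keys are equally close, keep the larger one.
/// Hypothetical helper for illustration only.
fn nearest_match(left_key: i64, right_keys: &[i64]) -> Option<i64> {
    let mut best: Option<i64> = None;
    for &candidate in right_keys {
        best = match best {
            None => Some(candidate),
            Some(current) => {
                // abs_diff returns an unsigned distance and cannot overflow.
                let d_new = candidate.abs_diff(left_key);
                let d_old = current.abs_diff(left_key);
                // Strictly closer wins; equal distance prefers the larger key.
                if d_new < d_old || (d_new == d_old && candidate > current) {
                    Some(candidate)
                } else {
                    Some(current)
                }
            }
        };
    }
    best
}
```

With a left key of 10 and right keys 7, 13 and 20, both 7 and 13 are three away, so the rule selects 13.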


Changes: Native execution
Probe
Previously, we assigned each right row to exactly one left row and relied on a single directional fill to propagate matches. Nearest can't do this: when two left rows are equidistant from a right row, assigning the right row to only one of them means the other never gets to compare that candidate.

The fix is search_bucket_nearest_range: for each right row it returns a Range covering both the floor (last left ≤ right) and ceil (first left ≥ right). Every right row is offered to every position in the range via update_nearest_match, which keeps the closer candidate.
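The floor-to-ceil range idea can be sketched roughly as follows. This is a simplified model, assuming the left keys are a sorted `&[i64]` slice; `floor_ceil_range`, `update_slot` and `probe` are hypothetical names and do not reproduce the PR's actual signatures.

```rust
use std::ops::Range;

/// Indices of `left` (sorted ascending) from the floor (last left <= right)
/// through the ceil (first left >= right). Hypothetical stand-in.
fn floor_ceil_range(left: &[i64], right: i64) -> Range<usize> {
    // Index of the first left key that is >= right.
    let first_ge = left.partition_point(|&l| l < right);
    // One past the last left key that is <= right.
    let end_le = left.partition_point(|&l| l <= right);
    if first_ge < end_le {
        // Some left keys equal `right`: floor and ceil both lie in this run.
        first_ge..end_le
    } else {
        // No exact match: floor is one slot before `first_ge`, ceil is `first_ge`.
        first_ge.saturating_sub(1)..(first_ge + 1).min(left.len())
    }
}

/// Keep whichever right key is closer to `left_key`; ties prefer the larger key.
fn update_slot(slot: &mut Option<i64>, left_key: i64, candidate: i64) {
    let better = match *slot {
        None => true,
        Some(cur) => {
            let (d_new, d_old) = (candidate.abs_diff(left_key), cur.abs_diff(left_key));
            d_new < d_old || (d_new == d_old && candidate > cur)
        }
    };
    if better {
        *slot = Some(candidate);
    }
}

/// Offer every right key to every left position in its floor..ceil range.
fn probe(left: &[i64], right: &[i64]) -> Vec<Option<i64>> {
    let mut best = vec![None; left.len()];
    for &r in right {
        for i in floor_ceil_range(left, r) {
            update_slot(&mut best[i], left[i], r);
        }
    }
    best
}
```

In this model, left keys 10 and 20 are equidistant from a right key of 15, and both slots receive it. Left rows outside every range stay unmatched until the finalize step.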

Finalize
After per-worker probe states are merged, nearest_fill resolves unmatched left rows by running both a forward and backward fill on copies of global_best, then picking the closer candidate from the two directions using is_nearer.
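A rough sketch of the two-direction fill, assuming the merged state is a `Vec<Option<i64>>` of matched right keys aligned with sorted left keys. `nearest_fill_sketch` is a hypothetical name, and the tie rule (prefer the larger key) is carried over from the PR description rather than taken from the real `is_nearer`.

```rust
/// Fill unmatched slots from both directions, then keep the closer candidate.
/// `best[i]` is the right key matched to `left[i]` during probe, if any.
fn nearest_fill_sketch(left: &[i64], best: &[Option<i64>]) -> Vec<Option<i64>> {
    assert_eq!(left.len(), best.len());

    // Forward fill on a copy: carry the last seen match toward later rows.
    let mut fwd = best.to_vec();
    for i in 1..fwd.len() {
        if fwd[i].is_none() {
            fwd[i] = fwd[i - 1];
        }
    }

    // Backward fill on a second copy: carry the next match toward earlier rows.
    let mut bwd = best.to_vec();
    for i in (0..bwd.len().saturating_sub(1)).rev() {
        if bwd[i].is_none() {
            bwd[i] = bwd[i + 1];
        }
    }

    let mut out = Vec::with_capacity(left.len());
    for i in 0..left.len() {
        let pick = match (fwd[i], bwd[i]) {
            (Some(f), Some(b)) => {
                let (d_f, d_b) = (f.abs_diff(left[i]), b.abs_diff(left[i]));
                // Closer candidate wins; equal distance prefers the larger key.
                if d_b < d_f || (d_b == d_f && b > f) { Some(b) } else { Some(f) }
            }
            // Only one direction (or neither) produced a candidate.
            (f, b) => f.or(b),
        };
        out.push(pick);
    }
    out
}
```

For left keys 10, 20, 30, 40 with probe results 15, 15, none, 41, the unmatched slot at 30 sees 15 from the forward fill and 41 from the backward fill, and keeps 41 because it is closer.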

is_nearer
For each comparison, is_nearer dispatches on the Arrow DataType once, downcasts to the concrete PrimitiveArray, then extracts three plain Rust scalars (candidate A, candidate B, pivot) and compares |a - pivot| against |b - pivot|.

Chose this approach as type-matching and computing distances as plain Rust scalars was faster than going through Arrow compute kernels or Daft Series/array primitives.
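The dispatch-then-scalar pattern might look something like the sketch below. To stay self-contained it replaces Arrow's DataType and PrimitiveArray with a hypothetical `KeyColumn` enum, and it assumes a strict "nearer" comparison; how the real function treats ties is not stated here.

```rust
/// Hypothetical stand-in for a typed key column; the PR dispatches on
/// Arrow's DataType and downcasts to a PrimitiveArray instead.
enum KeyColumn {
    Int64(Vec<i64>),
    Float64(Vec<f64>),
}

/// True when right[a] is strictly nearer to left[pivot] than right[b] is.
/// Returns None when the two columns do not share a supported type.
fn is_nearer_sketch(
    right: &KeyColumn,
    a: usize,
    b: usize,
    left: &KeyColumn,
    pivot: usize,
) -> Option<bool> {
    // Match on the types once, then work with plain scalars.
    match (right, left) {
        (KeyColumn::Int64(r), KeyColumn::Int64(l)) => {
            let (va, vb, vp) = (r[a], r[b], l[pivot]);
            // abs_diff avoids overflow for keys far apart.
            Some(va.abs_diff(vp) < vb.abs_diff(vp))
        }
        (KeyColumn::Float64(r), KeyColumn::Float64(l)) => {
            let (va, vb, vp) = (r[a], r[b], l[pivot]);
            Some((va - vp).abs() < (vb - vp).abs())
        }
        _ => None,
    }
}
```

Each arm only reads three values and compares two distances, so no intermediate arrays are allocated per comparison.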


Changes: Distributed execution
We refactored the carryover computation into compute_carryovers(descending: bool), which runs a top_n(limit=1) pass over the right table:

  • descending=true picks the per-partition max and propagates it left→right, giving each partition the closest right row from behind;
  • descending=false picks the per-partition min and propagates right→left, giving each partition the closest right row from ahead.

For Nearest, both passes run concurrently via tokio::try_join!. Each partition's local join task then receives its own right data plus one boundary row from each direction (two carryovers), and the native nearest join handles the rest.
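The two carryover passes can be modelled on plain vectors as below. This is a simplified sketch: partitions are assumed to be range-ordered `Vec<i64>` key lists, `carryovers_sketch` is a hypothetical name, and it returns the boundary key each partition receives directly instead of going through tasks and materialized outputs.

```rust
/// For each partition, the boundary right key it receives from its neighbours.
/// descending = true: per-partition max, propagated left to right (from behind).
/// descending = false: per-partition min, propagated right to left (from ahead).
fn carryovers_sketch(partitions: &[Vec<i64>], descending: bool) -> Vec<Option<i64>> {
    // Equivalent of a top_n(limit = 1) per partition; empty partitions give None.
    let tops: Vec<Option<i64>> = partitions
        .iter()
        .map(|p| {
            if descending {
                p.iter().copied().max()
            } else {
                p.iter().copied().min()
            }
        })
        .collect();

    let n = tops.len();
    let mut carry = vec![None; n];
    if descending {
        // Partition i gets the max of the closest non-empty partition before it.
        let mut last = None;
        for i in 0..n {
            carry[i] = last;
            if tops[i].is_some() {
                last = tops[i];
            }
        }
    } else {
        // Partition i gets the min of the closest non-empty partition after it.
        let mut next = None;
        for i in (0..n).rev() {
            carry[i] = next;
            if tops[i].is_some() {
                next = tops[i];
            }
        }
    }
    carry
}
```

Empty partitions are skipped, so a partition still receives a boundary row from the nearest non-empty neighbour on each side.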

@euanlimzx euanlimzx requested a review from a team as a code owner May 18, 2026 04:44
github-actions Bot commented May 18, 2026

Rust Dependency Diff

Head: c37128c570eac15e1246072277975410b885e55a vs Base: cad4cd5d921e85b4794465baddf2f8f37e027367.

OK: Within budget.

  • New Crates: 7
  • Removed Crates: 0

Added

  • chacha20: 0.10.0
  • cpufeatures: 0.3.0
  • parquet-variant: 57.3.0
  • parquet-variant-compute: 57.3.0
  • parquet-variant-json: 57.3.0
  • rand: 0.10.1
  • rand_core: 0.10.1

@github-actions github-actions Bot added the feat label May 18, 2026
greptile-apps Bot commented May 18, 2026

Greptile Summary

This PR adds a "nearest" strategy to the asof join, which matches each left row to the right row with the minimum absolute on_key distance (ties prefer the larger/forward value). Both the local and distributed execution paths are updated.

  • Local execution (asof_join.rs): adds type-dispatched is_nearer distance comparison, a new search_bucket_update_range probe helper, update_nearest_match for slot updates, and nearest_fill for the gap-filling finalization pass.
  • Distributed execution: generalises the single carryover to a (backward, forward) tuple so each partition receives the closest cross-boundary candidate from both directions.
  • Tests: a new test_asof_join_nearest.py covers correctness, tie-breaking, nulls, group-by, empty tables, float keys, and multi-partition distributed scenarios.

Confidence Score: 4/5

The nearest-join logic is algorithmically sound but an unresolved issue from a prior review (executor crash on unsupported on-key Arrow types) still needs attention before merging.

The core probe/fill/carryover pipeline is correct and well-tested. The is_nearer function ends with unreachable! for types outside its dispatch table, which panics the executor thread. This was flagged in a previous review and is still present in the current diff.

The type-dispatch fallthrough in is_nearer inside src/daft-local-execution/src/join/asof_join.rs needs to return a DaftError instead of panicking.
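One way to picture the suggested change is to have the fallthrough arm return an error value instead of panicking. The sketch below uses hypothetical `Scalar` and `NearestError` types; it does not use Daft's actual DaftError API, whose variants are not shown in this thread.

```rust
/// Hypothetical scalar type standing in for a typed on-key value.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Int64(i64),
    Float64(f64),
    Utf8(String),
}

/// Hypothetical error type; a real fix would return the project's own error.
#[derive(Debug, PartialEq)]
enum NearestError {
    UnsupportedKeyType(&'static str),
}

/// Distance between a candidate and the pivot, or an error for unsupported types.
fn distance(candidate: &Scalar, pivot: &Scalar) -> Result<f64, NearestError> {
    match (candidate, pivot) {
        (Scalar::Int64(c), Scalar::Int64(p)) => Ok(c.abs_diff(*p) as f64),
        (Scalar::Float64(c), Scalar::Float64(p)) => Ok((c - p).abs()),
        // A panic here would take down the executor thread; returning Err
        // lets the caller report the unsupported type to the user.
        _ => Err(NearestError::UnsupportedKeyType(
            "nearest asof join needs matching numeric on-keys",
        )),
    }
}
```

Callers can then propagate the failure with `?` and the query fails with a message, not a crashed worker.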

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/daft-local-execution/src/join/asof_join.rs | Core nearest-join implementation: adds is_nearer dispatch, search_bucket_update_range, update_nearest_match, and nearest_fill; previously flagged unreachable! panic for unsupported on-key types still present |
| src/daft-distributed/src/pipeline_node/join/asof_join.rs | Refactors carryover to dual-direction tuple; forward_pass[i+1] / backward_pass[i-1] indexing logic is correct |
| tests/dataframe/test_asof_join_nearest.py | Comprehensive new test suite covering match correctness, tie-breaking, nulls, group-by, empty tables, float keys, and distributed execution across 1/2/4/8 partitions |
| src/daft-core/src/join.rs | Adds Nearest variant to AsofJoinStrategy enum, iterator, and FromStr |
| daft/dataframe/dataframe.py | Adds nearest to strategy Literal type and corrects docstring tie-break description |
| src/daft-local-execution/Cargo.toml | Adds arrow workspace dependency needed for DataType dispatch in is_nearer |

Reviews (3): Last reviewed commit: "fix docstring"

Comment thread daft/dataframe/dataframe.py Outdated
Comment thread src/daft-local-execution/src/join/asof_join.rs Outdated
Comment thread src/daft-local-execution/src/join/asof_join.rs Outdated
codecov Bot commented May 18, 2026

Codecov Report

❌ Patch coverage is 61.76471% with 169 lines in your changes missing coverage. Please review.
✅ Project coverage is 75.51%. Comparing base (b5ae03a) to head (2ec667a).
⚠️ Report is 27 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...ft-distributed/src/pipeline_node/join/asof_join.rs | 0.00% | 116 Missing ⚠️ |
| src/daft-core/src/kernels/cmp.rs | 72.56% | 31 Missing ⚠️ |
| src/daft-local-execution/src/join/asof_join.rs | 89.62% | 22 Missing ⚠️ |

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #6953      +/-   ##
==========================================
- Coverage   75.76%   75.51%   -0.26%     
==========================================
  Files        1135     1149      +14     
  Lines      161527   161691     +164     
==========================================
- Hits       122388   122095     -293     
- Misses      39139    39596     +457     

| Files with missing lines | Coverage Δ |
| --- | --- |
| daft/dataframe/dataframe.py | 85.20% <ø> (+4.57%) ⬆️ |
| src/daft-core/src/array/ops/arrow/comparison.rs | 97.70% <ø> (ø) |
| src/daft-core/src/array/ops/sort.rs | 77.33% <ø> (+1.77%) ⬆️ |
| src/daft-core/src/join.rs | 53.48% <100.00%> (+0.54%) ⬆️ |
| src/daft-core/src/kernels/search_sorted.rs | 91.62% <ø> (-0.58%) ⬇️ |
| src/daft-recordbatch/src/ops/joins/merge_join.rs | 78.99% <ø> (ø) |
| src/daft-local-execution/src/join/asof_join.rs | 95.20% <89.62%> (-2.27%) ⬇️ |
| src/daft-core/src/kernels/cmp.rs | 72.56% <72.56%> (ø) |
| ...ft-distributed/src/pipeline_node/join/asof_join.rs | 12.50% <0.00%> (-1.91%) ⬇️ |

... and 157 files with indirect coverage changes

@euanlimzx euanlimzx closed this May 18, 2026
@euanlimzx euanlimzx reopened this May 18, 2026
euanlimzx (Contributor Author) commented May 18, 2026

@greptileai review

@euanlimzx euanlimzx requested a review from colin-ho May 19, 2026 21:14
colin-ho (Collaborator) left a comment

Haven't read the local execution code yet, will get to that later

Comment on lines +31 to +34
const PER_WORKER_CARRYOVER_BACKWARD_PHASE: &str = "per_worker_carryover_backward";
const PER_PARTITION_CARRYOVER_BACKWARD_PHASE: &str = "per_partition_carryover_backward";
const PER_WORKER_CARRYOVER_FORWARD_PHASE: &str = "per_worker_carryover_forward";
const PER_PARTITION_CARRYOVER_FORWARD_PHASE: &str = "per_partition_carryover_forward";
Collaborator

Took me a while to understand what these are. Per worker makes sense, you compute the carryovers per worker, but per partition doesn't make sense in the same context, because it's not computing carryovers per partition, it's computing carryovers across the per-worker carryovers.

I recommend changing to partial / final, or local / global. My preference is partial / final.

Contributor Author

is this only for the phase string, or also for

        let mut per_partition_outputs: Vec<Option<MaterializedOutput>> =
            try_join_all(per_partition_tasks.into_iter().map(|t| async {
                match t {
                    Some(task) => task.await.map(|mo| mo.filter(|m| m.num_rows() > 0)),
                    None => Ok(None),
                }
            }))
            .await?;

        let n = per_partition_outputs.len();
        if propagate_forward {
            for i in 1..n {
                if per_partition_outputs[i].is_none() {
                    let prev = per_partition_outputs[i - 1].clone();
                    per_partition_outputs[i] = prev;
                }
            }
        } else {
            for i in (0..n.saturating_sub(1)).rev() {
                if per_partition_outputs[i].is_none() {
                    let next = per_partition_outputs[i + 1].clone();
                    per_partition_outputs[i] = next;
                }
            }
        }

        Ok(per_partition_outputs)
    }

?

Collaborator

that too

Comment thread src/daft-distributed/src/pipeline_node/join/asof_join.rs Outdated
Comment thread src/daft-distributed/src/pipeline_node/join/asof_join.rs Outdated
Comment thread src/daft-distributed/src/pipeline_node/join/asof_join.rs Outdated
colin-ho (Collaborator) commented

In your PR description:
Chose this approach as type-matching and computing distances as plain Rust scalars was faster than going through Arrow compute kernels or Daft Series/array primitives

How much faster?

Comment thread src/daft-distributed/src/pipeline_node/join/asof_join.rs Outdated
Comment thread src/daft-distributed/src/pipeline_node/join/asof_join.rs Outdated
Comment thread src/daft-local-execution/src/join/asof_join.rs Outdated
@euanlimzx euanlimzx requested a review from colin-ho May 22, 2026 16:22
Comment thread src/daft-core/src/kernels/search_sorted.rs Outdated
Comment thread src/daft-local-execution/src/join/asof_join.rs Outdated
@euanlimzx euanlimzx requested a review from colin-ho May 26, 2026 21:09