Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions datafusion/bio-function-ranges/src/complement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use ahash::AHashMap;
use ahash::{AHashMap, AHashSet};
use async_trait::async_trait;
use datafusion::arrow::array::{Int64Builder, RecordBatch, StringBuilder};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
Expand Down Expand Up @@ -261,7 +261,7 @@ impl ExecutionPlan for ComplementExec {
group_idx: 0,
trailing_iter_idx: 0,
trailing_contigs: Vec::new(),
seen_contigs: Vec::new(),
seen_contigs: AHashSet::new(),
contig_builder: StringBuilder::new(),
start_builder: Int64Builder::new(),
end_builder: Int64Builder::new(),
Expand Down Expand Up @@ -290,7 +290,7 @@ struct ComplementStream {
group_idx: usize,
trailing_iter_idx: usize,
trailing_contigs: Vec<String>,
seen_contigs: Vec<String>,
seen_contigs: AHashSet<String>,
contig_builder: StringBuilder,
start_builder: Int64Builder,
end_builder: Int64Builder,
Expand Down Expand Up @@ -337,6 +337,7 @@ impl ComplementStream {
}

/// Compute complement of merged intervals against view intervals for a contig.
/// Uses a merge_cursor to avoid re-scanning from the start for each view interval.
fn emit_contig_complement(
contig: &str,
merged_intervals: &[(i64, i64)],
Expand All @@ -346,12 +347,18 @@ impl ComplementStream {
end_builder: &mut Int64Builder,
) -> usize {
let mut rows = 0;
let mut merge_cursor = 0;
for &(view_start, view_end) in view_intervals {
let mut cursor = view_start;
for &(ms, me) in merged_intervals {
if me <= view_start {
continue;
}
// Advance merge_cursor past intervals entirely before this view
while merge_cursor < merged_intervals.len()
&& merged_intervals[merge_cursor].1 <= view_start
{
merge_cursor += 1;
}
let mut j = merge_cursor;
while j < merged_intervals.len() {
let (ms, me) = merged_intervals[j];
if ms >= view_end {
break;
}
Expand All @@ -364,6 +371,7 @@ impl ComplementStream {
rows += 1;
}
cursor = interval_end;
j += 1;
}
if cursor < view_end {
contig_builder.append_value(contig);
Expand Down Expand Up @@ -436,7 +444,7 @@ impl Stream for ComplementStream {
&mut this.end_builder,
);

this.seen_contigs.push(contig.clone());
this.seen_contigs.insert(contig.clone());
this.group_idx += 1;

if this.pending_rows >= this.batch_size {
Expand Down
10 changes: 6 additions & 4 deletions datafusion/bio-function-ranges/src/grouped_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ impl StreamCollector {
None => {
let idx = self.groups.len();
self.groups.push(Vec::with_capacity(1024));
self.contig_names.push(contig.to_string());
self.contig_to_idx.insert(contig.to_string(), idx);
let owned = contig.to_string();
self.contig_to_idx.insert(owned.clone(), idx);
self.contig_names.push(owned);
idx
}
};
Expand Down Expand Up @@ -192,8 +193,9 @@ impl FullBatchCollector {
None => {
let idx = self.groups.len();
self.groups.push(Vec::with_capacity(1024));
self.contig_names.push(contig.to_string());
self.contig_to_idx.insert(contig.to_string(), idx);
let owned = contig.to_string();
self.contig_to_idx.insert(owned.clone(), idx);
self.contig_names.push(owned);
idx
}
};
Expand Down
15 changes: 11 additions & 4 deletions datafusion/bio-function-ranges/src/interval_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@ pub fn build_coitree_from_batches(
let (contig_arr, start_arr, end_arr) =
get_join_col_arrays(&batch, (columns.0, columns.1, columns.2))?;

// Resolve once per batch for zero-dispatch inner loop
let start_resolved = start_arr.resolve()?;
let end_resolved = end_arr.resolve()?;

for i in 0..batch.num_rows() {
let contig = contig_arr.value(i);
let pos_start = start_arr.value(i)?;
let pos_end = end_arr.value(i)?;
let pos_start = start_resolved[i];
let pos_end = end_resolved[i];
let interval = Interval::new(pos_start, pos_end, ());

if let Some(seqname_nodes) = nodes.get_mut(contig) {
Expand Down Expand Up @@ -106,12 +110,15 @@ pub fn get_stream(
get_join_col_arrays(&rb, (&columns_2.0, &columns_2.1, &columns_2.2))?;
let mut count_arr = Vec::with_capacity(rb.num_rows());
let num_rows = rb.num_rows();
// Resolve once per batch for zero-dispatch inner loop
let start_resolved = pos_start.resolve()?;
let end_resolved = pos_end.resolve()?;
let mut cached_contig: Option<&str> = None;
let mut cached_tree: Option<&COITree<(), u32>> = None;
for i in 0..num_rows {
let contig = contig.value(i);
let mut query_start = pos_start.value(i)?;
let mut query_end = pos_end.value(i)?;
let mut query_start = start_resolved[i];
let mut query_end = end_resolved[i];
if strict_filter {
query_start += 1;
query_end -= 1;
Expand Down
34 changes: 34 additions & 0 deletions datafusion/bio-function-ranges/src/subtract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ use futures::{Stream, ready};
use crate::filter_op::FilterOp;
use crate::grouped_stream::{FullBatchCollector, IndexedGroups, StreamCollector};

/// Collapse a start-ordered list of `(start, end)` intervals into disjoint runs, in place.
///
/// An interval is absorbed into the run that precedes it whenever its start is
/// less than or equal to that run's end, so overlapping *and* touching
/// intervals become a single entry whose end is the larger of the two ends.
/// Lists with zero or one entry are left untouched.
///
/// The input is expected to be ordered by start already; nothing is sorted
/// here. Fewer, wider intervals mean fewer candidates to walk when the result
/// is later used for subtraction.
fn normalize_intervals(intervals: &mut Vec<(i64, i64)>) {
    // `dedup_by` passes (candidate, most recently kept element). Returning
    // `true` drops the candidate, so its extent is folded into the kept
    // element first.
    intervals.dedup_by(|candidate, kept| {
        let absorbed = candidate.0 <= kept.1;
        if absorbed {
            kept.1 = kept.1.max(candidate.1);
        }
        absorbed
    });
}

pub struct SubtractProvider {
session: Arc<SessionContext>,
left_table: String,
Expand Down Expand Up @@ -392,6 +412,11 @@ impl Stream for SubtractStream {
}
this.right_groups =
this.right_collector.take().unwrap().take_groups_as_map();
// Normalize (merge) right-side intervals per contig to
// shrink the candidate set for subtraction.
for intervals in this.right_groups.values_mut() {
normalize_intervals(intervals);
}
}
this.phase = SubtractPhase::CollectLeft;
}
Expand Down Expand Up @@ -419,6 +444,8 @@ impl Stream for SubtractStream {
let (ls, le) = left_intervals[this.interval_idx];
this.interval_idx += 1;

// Advance persistent cursor past right intervals
// that end before (or at) this left start.
while this.right_cursor < right_intervals.len() {
let skip = if this.strict {
right_intervals[this.right_cursor].1 <= ls
Expand Down Expand Up @@ -567,6 +594,11 @@ impl Stream for SubtractStreamExtra {
}
this.right_groups =
this.right_collector.take().unwrap().take_groups_as_map();
// Normalize (merge) right-side intervals per contig to
// shrink the candidate set for subtraction.
for intervals in this.right_groups.values_mut() {
normalize_intervals(intervals);
}
}
this.phase = SubtractPhase::CollectLeft;
}
Expand Down Expand Up @@ -604,6 +636,8 @@ impl Stream for SubtractStreamExtra {
let (ls, le, row_idx) = left_intervals[this.interval_idx];
this.interval_idx += 1;

// Advance persistent cursor past right intervals
// that end before (or at) this left start.
while this.right_cursor < right_intervals.len() {
let skip = if this.strict {
right_intervals[this.right_cursor].1 <= ls
Expand Down
Loading
Loading