Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 35 additions & 72 deletions crates/execution/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use spacetimedb_lib::{query::Delta, AlgebraicValue, ProductValue};
use spacetimedb_physical_plan::plan::{
HashJoin, IxJoin, IxScan, PhysicalExpr, PhysicalPlan, ProjectField, ProjectPlan, Sarg, Semi, TableScan, TupleField,
};
use spacetimedb_primitives::{IndexId, TableId};
use spacetimedb_table::{
blob_store::BlobStore,
table::{IndexScanPointIter, IndexScanRangeIter, Table, TableScanIter},
table::{IndexScanPointIter, IndexScanRangeIter, TableAndIndex, TableScanIter},
table_index::{TableIndex, TableIndexPointIter},
};

Expand Down Expand Up @@ -383,30 +383,28 @@ pub struct UniqueIxJoin<'a> {
/// The lhs of the join
lhs: Box<Iter<'a>>,
/// The rhs index
rhs_index: &'a TableIndex,
/// A handle to the datastore
rhs_table: &'a Table,
/// A handle to the blobstore
blob_store: &'a dyn BlobStore,
rhs_index: TableAndIndex<'a>,
/// The lhs probe field
lhs_field: &'a TupleField,
}

pub(super) fn get_index(tx: &impl Datastore, table_id: TableId, index_id: IndexId) -> Result<TableAndIndex<'_>> {
let table = tx.table_or_err(table_id)?;
table
.get_index_by_id_with_table(tx.blob_store(), index_id)
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", index_id))
}

impl<'a> UniqueIxJoin<'a> {
fn build_from<Tx>(join: &'a IxJoin, tx: &'a Tx) -> Result<Self>
where
Tx: Datastore + DeltaStore,
{
let lhs = Iter::build(&join.lhs, tx)?;
let rhs_table = tx.table_or_err(join.rhs.table_id)?;
let rhs_index = rhs_table
.get_index_by_id(join.rhs_index)
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?;
let rhs_index = get_index(tx, join.rhs.table_id, join.rhs_index)?;
Ok(Self {
lhs: Box::new(lhs),
rhs_index,
rhs_table,
blob_store: tx.blob_store(),
lhs_field: &join.lhs_field,
})
}
Expand All @@ -420,7 +418,6 @@ impl<'a> Iterator for UniqueIxJoin<'a> {
self.rhs_index
.seek_point(&tuple.project(self.lhs_field))
.next()
.and_then(|ptr| self.rhs_table.get_row_ref(self.blob_store, ptr))
.map(Row::Ptr)
.map(|ptr| (tuple, ptr))
})
Expand Down Expand Up @@ -468,11 +465,7 @@ pub struct UniqueIxJoinRhs<'a> {
/// The lhs of the join
lhs: Box<Iter<'a>>,
/// The rhs index
rhs_index: &'a TableIndex,
/// A handle to the datastore
rhs_table: &'a Table,
/// A handle to the blobstore
blob_store: &'a dyn BlobStore,
rhs_index: TableAndIndex<'a>,
/// The lhs probe field
lhs_field: &'a TupleField,
}
Expand All @@ -483,15 +476,10 @@ impl<'a> UniqueIxJoinRhs<'a> {
Tx: Datastore + DeltaStore,
{
let lhs = Iter::build(&join.lhs, tx)?;
let rhs_table = tx.table_or_err(join.rhs.table_id)?;
let rhs_index = rhs_table
.get_index_by_id(join.rhs_index)
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?;
let rhs_index = get_index(tx, join.rhs.table_id, join.rhs_index)?;
Ok(Self {
lhs: Box::new(lhs),
rhs_index,
rhs_table,
blob_store: tx.blob_store(),
lhs_field: &join.lhs_field,
})
}
Expand All @@ -505,7 +493,6 @@ impl<'a> Iterator for UniqueIxJoinRhs<'a> {
self.rhs_index
.seek_point(&tuple.project(self.lhs_field))
.next()
.and_then(|ptr| self.rhs_table.get_row_ref(self.blob_store, ptr))
.map(Row::Ptr)
})
}
Expand All @@ -518,13 +505,9 @@ pub struct IxJoinIter<'a> {
/// The current lhs tuple
lhs_tuple: Option<Tuple<'a>>,
/// The rhs index
rhs_index: &'a TableIndex,
rhs_index: TableAndIndex<'a>,
/// The current rhs index cursor
rhs_index_cursor: Option<TableIndexPointIter<'a>>,
/// A handle to the datastore
rhs_table: &'a Table,
/// A handle to the blobstore
blob_store: &'a dyn BlobStore,
/// The lhs probe field
lhs_field: &'a TupleField,
}
Expand All @@ -535,17 +518,12 @@ impl<'a> IxJoinIter<'a> {
Tx: Datastore + DeltaStore,
{
let lhs = Iter::build(&join.lhs, tx)?;
let rhs_table = tx.table_or_err(join.rhs.table_id)?;
let rhs_index = rhs_table
.get_index_by_id(join.rhs_index)
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?;
let rhs_index = get_index(tx, join.rhs.table_id, join.rhs_index)?;
Ok(Self {
lhs: Box::new(lhs),
lhs_tuple: None,
rhs_index,
rhs_index_cursor: None,
rhs_table,
blob_store: tx.blob_store(),
lhs_field: &join.lhs_field,
})
}
Expand All @@ -558,27 +536,22 @@ impl<'a> Iterator for IxJoinIter<'a> {
self.lhs_tuple
.as_ref()
.and_then(|tuple| {
self.rhs_index_cursor.as_mut().and_then(|cursor| {
cursor.next().and_then(|ptr| {
self.rhs_table
.get_row_ref(self.blob_store, ptr)
.map(Row::Ptr)
.map(|ptr| (tuple.clone(), ptr))
})
})
self.rhs_index_cursor
.as_mut()
.and_then(|cursor| cursor.next())
// SAFETY: `ptr` came from `self.rhs_index`.
.map(|ptr| unsafe { self.rhs_index.combine_with_ptr(ptr) })
.map(Row::Ptr)
.map(|ptr| (tuple.clone(), ptr))
})
.or_else(|| {
self.lhs.find_map(|tuple| {
let mut cursor = self.rhs_index.seek_point(&tuple.project(self.lhs_field));
cursor.next().and_then(|ptr| {
self.rhs_table
.get_row_ref(self.blob_store, ptr)
.map(Row::Ptr)
.map(|ptr| {
self.lhs_tuple = Some(tuple.clone());
self.rhs_index_cursor = Some(cursor);
(tuple, ptr)
})
cursor.next().map(|ptr| {
let ptr = Row::Ptr(ptr);
self.lhs_tuple = Some(tuple.clone());
self.rhs_index_cursor = Some(cursor.index());
(tuple, ptr)
})
})
})
Expand Down Expand Up @@ -645,13 +618,9 @@ pub struct IxJoinRhs<'a> {
/// The lhs of the join
lhs: Box<Iter<'a>>,
/// The rhs index
rhs_index: &'a TableIndex,
rhs_index: TableAndIndex<'a>,
/// The current rhs index cursor
rhs_index_cursor: Option<TableIndexPointIter<'a>>,
/// A handle to the datastore
rhs_table: &'a Table,
/// A handle to the blobstore
blob_store: &'a dyn BlobStore,
/// The lhs probe field
lhs_field: &'a TupleField,
}
Expand All @@ -662,16 +631,11 @@ impl<'a> IxJoinRhs<'a> {
Tx: Datastore + DeltaStore,
{
let lhs = Iter::build(&join.lhs, tx)?;
let rhs_table = tx.table_or_err(join.rhs.table_id)?;
let rhs_index = rhs_table
.get_index_by_id(join.rhs_index)
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?;
let rhs_index = get_index(tx, join.rhs.table_id, join.rhs_index)?;
Ok(Self {
lhs: Box::new(lhs),
rhs_index,
rhs_index_cursor: None,
rhs_table,
blob_store: tx.blob_store(),
lhs_field: &join.lhs_field,
})
}
Expand All @@ -686,18 +650,17 @@ impl<'a> Iterator for IxJoinRhs<'a> {
.and_then(|cursor| {
cursor
.next()
.and_then(|ptr| self.rhs_table.get_row_ref(self.blob_store, ptr))
// SAFETY: `ptr` came from `self.rhs_index`.
.map(|ptr| unsafe { self.rhs_index.combine_with_ptr(ptr) })
.map(Row::Ptr)
})
.or_else(|| {
self.lhs.find_map(|tuple| {
let mut cursor = self.rhs_index.seek_point(&tuple.project(self.lhs_field));
cursor.next().and_then(|ptr| {
self.rhs_table
.get_row_ref(self.blob_store, ptr)
.map(Row::Ptr)
.inspect(|_| self.rhs_index_cursor = Some(cursor))
})
cursor
.next()
.map(Row::Ptr)
.inspect(|_| self.rhs_index_cursor = Some(cursor.index()))
})
})
}
Expand Down
21 changes: 8 additions & 13 deletions crates/execution/src/pipelined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
ops::Bound,
};

use anyhow::{anyhow, Result};
use anyhow::Result;
use itertools::Either;
use spacetimedb_expr::expr::AggType;
use spacetimedb_lib::{metrics::ExecutionMetrics, query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue};
Expand All @@ -14,7 +14,7 @@ use spacetimedb_physical_plan::plan::{
use spacetimedb_primitives::{ColId, IndexId, TableId};
use spacetimedb_sats::product;

use crate::{Datastore, DeltaStore, Row, Tuple};
use crate::{iter::get_index, Datastore, DeltaStore, Row, Tuple};

/// An executor for explicit column projections.
/// Note, this plan can only be constructed from the http api,
Expand Down Expand Up @@ -514,11 +514,7 @@ impl PipelinedIxJoin {
metrics: &mut ExecutionMetrics,
f: &mut dyn FnMut(Tuple<'a>) -> Result<()>,
) -> Result<()> {
let blob_store = tx.blob_store();
let rhs_table = tx.table_or_err(self.rhs_table)?;
let rhs_index = rhs_table
.get_index_by_id(self.rhs_index)
.ok_or_else(|| anyhow!("IndexId `{0}` does not exist", self.rhs_index))?;
let rhs_index = get_index(tx, self.rhs_table, self.rhs_index)?;

let mut n = 0;
let mut index_seeks = 0;
Expand All @@ -537,7 +533,10 @@ impl PipelinedIxJoin {
lhs.execute(tx, metrics, &mut |u| {
n += 1;
index_seeks += 1;
if rhs_index.contains_any(&project(&u, lhs_field, &mut bytes_scanned)) {
if rhs_index
.index()
.contains_any(&project(&u, lhs_field, &mut bytes_scanned))
{
f(u)?;
}
Ok(())
Expand All @@ -557,7 +556,6 @@ impl PipelinedIxJoin {
if let Some(v) = rhs_index
.seek_point(&project(&u, lhs_field, &mut bytes_scanned))
.next()
.and_then(|ptr| rhs_table.get_row_ref(blob_store, ptr))
.map(Row::Ptr)
.map(Tuple::Row)
{
Expand All @@ -580,7 +578,6 @@ impl PipelinedIxJoin {
if let Some(v) = rhs_index
.seek_point(&project(&u, lhs_field, &mut bytes_scanned))
.next()
.and_then(|ptr| rhs_table.get_row_ref(blob_store, ptr))
.map(Row::Ptr)
.map(Tuple::Row)
{
Expand All @@ -601,7 +598,7 @@ impl PipelinedIxJoin {
lhs.execute(tx, metrics, &mut |u| {
n += 1;
index_seeks += 1;
if let Some(n) = rhs_index.count(&project(&u, lhs_field, &mut bytes_scanned)) {
if let Some(n) = rhs_index.index().count(&project(&u, lhs_field, &mut bytes_scanned)) {
for _ in 0..n {
f(u.clone())?;
}
Expand All @@ -622,7 +619,6 @@ impl PipelinedIxJoin {
index_seeks += 1;
for v in rhs_index
.seek_point(&project(&u, lhs_field, &mut bytes_scanned))
.filter_map(|ptr| rhs_table.get_row_ref(blob_store, ptr))
.map(Row::Ptr)
.map(Tuple::Row)
{
Expand All @@ -644,7 +640,6 @@ impl PipelinedIxJoin {
index_seeks += 1;
for v in rhs_index
.seek_point(&project(&u, lhs_field, &mut bytes_scanned))
.filter_map(|ptr| rhs_table.get_row_ref(blob_store, ptr))
.map(Row::Ptr)
.map(Tuple::Row)
{
Expand Down
17 changes: 17 additions & 0 deletions crates/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1752,6 +1752,16 @@ impl<'a> TableAndIndex<'a> {
self.index
}

/// Wraps `ptr` in a [`RowRef`].
///
/// # Safety
///
/// The `self.table().is_row_present(ptr)` must hold.
pub unsafe fn combine_with_ptr(&self, ptr: RowPointer) -> RowRef<'a> {
// SAFETY: forward caller requirement.
unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
}

/// Returns an iterator yielding all rows in this index for `key`.
///
/// Matching is defined by `Ord for AlgebraicValue`.
Expand Down Expand Up @@ -1788,6 +1798,13 @@ pub struct IndexScanPointIter<'a> {
btree_index_iter: TableIndexPointIter<'a>,
}

impl<'a> IndexScanPointIter<'a> {
/// Consume the iterator, returning the inner one.
pub fn index(self) -> TableIndexPointIter<'a> {
self.btree_index_iter
}
}

impl<'a> Iterator for IndexScanPointIter<'a> {
type Item = RowRef<'a>;

Expand Down
Loading