Skip to content

Commit ad39b7b

Browse files
authored
spacetimedb_execution: avoid get_row_ref (#2806)
1 parent c3b0009 commit ad39b7b

3 files changed

Lines changed: 60 additions & 85 deletions

File tree

crates/execution/src/iter.rs

Lines changed: 35 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use spacetimedb_lib::{query::Delta, AlgebraicValue, ProductValue};
55
use spacetimedb_physical_plan::plan::{
66
HashJoin, IxJoin, IxScan, PhysicalExpr, PhysicalPlan, ProjectField, ProjectPlan, Sarg, Semi, TableScan, TupleField,
77
};
8+
use spacetimedb_primitives::{IndexId, TableId};
89
use spacetimedb_table::{
9-
blob_store::BlobStore,
10-
table::{IndexScanPointIter, IndexScanRangeIter, Table, TableScanIter},
10+
table::{IndexScanPointIter, IndexScanRangeIter, TableAndIndex, TableScanIter},
1111
table_index::{TableIndex, TableIndexPointIter},
1212
};
1313

@@ -383,30 +383,28 @@ pub struct UniqueIxJoin<'a> {
383383
/// The lhs of the join
384384
lhs: Box<Iter<'a>>,
385385
/// The rhs index
386-
rhs_index: &'a TableIndex,
387-
/// A handle to the datastore
388-
rhs_table: &'a Table,
389-
/// A handle to the blobstore
390-
blob_store: &'a dyn BlobStore,
386+
rhs_index: TableAndIndex<'a>,
391387
/// The lhs probe field
392388
lhs_field: &'a TupleField,
393389
}
394390

391+
pub(super) fn get_index(tx: &impl Datastore, table_id: TableId, index_id: IndexId) -> Result<TableAndIndex<'_>> {
392+
let table = tx.table_or_err(table_id)?;
393+
table
394+
.get_index_by_id_with_table(tx.blob_store(), index_id)
395+
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", index_id))
396+
}
397+
395398
impl<'a> UniqueIxJoin<'a> {
396399
fn build_from<Tx>(join: &'a IxJoin, tx: &'a Tx) -> Result<Self>
397400
where
398401
Tx: Datastore + DeltaStore,
399402
{
400403
let lhs = Iter::build(&join.lhs, tx)?;
401-
let rhs_table = tx.table_or_err(join.rhs.table_id)?;
402-
let rhs_index = rhs_table
403-
.get_index_by_id(join.rhs_index)
404-
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?;
404+
let rhs_index = get_index(tx, join.rhs.table_id, join.rhs_index)?;
405405
Ok(Self {
406406
lhs: Box::new(lhs),
407407
rhs_index,
408-
rhs_table,
409-
blob_store: tx.blob_store(),
410408
lhs_field: &join.lhs_field,
411409
})
412410
}
@@ -420,7 +418,6 @@ impl<'a> Iterator for UniqueIxJoin<'a> {
420418
self.rhs_index
421419
.seek_point(&tuple.project(self.lhs_field))
422420
.next()
423-
.and_then(|ptr| self.rhs_table.get_row_ref(self.blob_store, ptr))
424421
.map(Row::Ptr)
425422
.map(|ptr| (tuple, ptr))
426423
})
@@ -468,11 +465,7 @@ pub struct UniqueIxJoinRhs<'a> {
468465
/// The lhs of the join
469466
lhs: Box<Iter<'a>>,
470467
/// The rhs index
471-
rhs_index: &'a TableIndex,
472-
/// A handle to the datastore
473-
rhs_table: &'a Table,
474-
/// A handle to the blobstore
475-
blob_store: &'a dyn BlobStore,
468+
rhs_index: TableAndIndex<'a>,
476469
/// The lhs probe field
477470
lhs_field: &'a TupleField,
478471
}
@@ -483,15 +476,10 @@ impl<'a> UniqueIxJoinRhs<'a> {
483476
Tx: Datastore + DeltaStore,
484477
{
485478
let lhs = Iter::build(&join.lhs, tx)?;
486-
let rhs_table = tx.table_or_err(join.rhs.table_id)?;
487-
let rhs_index = rhs_table
488-
.get_index_by_id(join.rhs_index)
489-
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?;
479+
let rhs_index = get_index(tx, join.rhs.table_id, join.rhs_index)?;
490480
Ok(Self {
491481
lhs: Box::new(lhs),
492482
rhs_index,
493-
rhs_table,
494-
blob_store: tx.blob_store(),
495483
lhs_field: &join.lhs_field,
496484
})
497485
}
@@ -505,7 +493,6 @@ impl<'a> Iterator for UniqueIxJoinRhs<'a> {
505493
self.rhs_index
506494
.seek_point(&tuple.project(self.lhs_field))
507495
.next()
508-
.and_then(|ptr| self.rhs_table.get_row_ref(self.blob_store, ptr))
509496
.map(Row::Ptr)
510497
})
511498
}
@@ -518,13 +505,9 @@ pub struct IxJoinIter<'a> {
518505
/// The current lhs tuple
519506
lhs_tuple: Option<Tuple<'a>>,
520507
/// The rhs index
521-
rhs_index: &'a TableIndex,
508+
rhs_index: TableAndIndex<'a>,
522509
/// The current rhs index cursor
523510
rhs_index_cursor: Option<TableIndexPointIter<'a>>,
524-
/// A handle to the datastore
525-
rhs_table: &'a Table,
526-
/// A handle to the blobstore
527-
blob_store: &'a dyn BlobStore,
528511
/// The lhs probe field
529512
lhs_field: &'a TupleField,
530513
}
@@ -535,17 +518,12 @@ impl<'a> IxJoinIter<'a> {
535518
Tx: Datastore + DeltaStore,
536519
{
537520
let lhs = Iter::build(&join.lhs, tx)?;
538-
let rhs_table = tx.table_or_err(join.rhs.table_id)?;
539-
let rhs_index = rhs_table
540-
.get_index_by_id(join.rhs_index)
541-
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?;
521+
let rhs_index = get_index(tx, join.rhs.table_id, join.rhs_index)?;
542522
Ok(Self {
543523
lhs: Box::new(lhs),
544524
lhs_tuple: None,
545525
rhs_index,
546526
rhs_index_cursor: None,
547-
rhs_table,
548-
blob_store: tx.blob_store(),
549527
lhs_field: &join.lhs_field,
550528
})
551529
}
@@ -558,27 +536,22 @@ impl<'a> Iterator for IxJoinIter<'a> {
558536
self.lhs_tuple
559537
.as_ref()
560538
.and_then(|tuple| {
561-
self.rhs_index_cursor.as_mut().and_then(|cursor| {
562-
cursor.next().and_then(|ptr| {
563-
self.rhs_table
564-
.get_row_ref(self.blob_store, ptr)
565-
.map(Row::Ptr)
566-
.map(|ptr| (tuple.clone(), ptr))
567-
})
568-
})
539+
self.rhs_index_cursor
540+
.as_mut()
541+
.and_then(|cursor| cursor.next())
542+
// SAFETY: `ptr` came from `self.rhs_index`.
543+
.map(|ptr| unsafe { self.rhs_index.combine_with_ptr(ptr) })
544+
.map(Row::Ptr)
545+
.map(|ptr| (tuple.clone(), ptr))
569546
})
570547
.or_else(|| {
571548
self.lhs.find_map(|tuple| {
572549
let mut cursor = self.rhs_index.seek_point(&tuple.project(self.lhs_field));
573-
cursor.next().and_then(|ptr| {
574-
self.rhs_table
575-
.get_row_ref(self.blob_store, ptr)
576-
.map(Row::Ptr)
577-
.map(|ptr| {
578-
self.lhs_tuple = Some(tuple.clone());
579-
self.rhs_index_cursor = Some(cursor);
580-
(tuple, ptr)
581-
})
550+
cursor.next().map(|ptr| {
551+
let ptr = Row::Ptr(ptr);
552+
self.lhs_tuple = Some(tuple.clone());
553+
self.rhs_index_cursor = Some(cursor.index());
554+
(tuple, ptr)
582555
})
583556
})
584557
})
@@ -645,13 +618,9 @@ pub struct IxJoinRhs<'a> {
645618
/// The lhs of the join
646619
lhs: Box<Iter<'a>>,
647620
/// The rhs index
648-
rhs_index: &'a TableIndex,
621+
rhs_index: TableAndIndex<'a>,
649622
/// The current rhs index cursor
650623
rhs_index_cursor: Option<TableIndexPointIter<'a>>,
651-
/// A handle to the datastore
652-
rhs_table: &'a Table,
653-
/// A handle to the blobstore
654-
blob_store: &'a dyn BlobStore,
655624
/// The lhs probe field
656625
lhs_field: &'a TupleField,
657626
}
@@ -662,16 +631,11 @@ impl<'a> IxJoinRhs<'a> {
662631
Tx: Datastore + DeltaStore,
663632
{
664633
let lhs = Iter::build(&join.lhs, tx)?;
665-
let rhs_table = tx.table_or_err(join.rhs.table_id)?;
666-
let rhs_index = rhs_table
667-
.get_index_by_id(join.rhs_index)
668-
.ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?;
634+
let rhs_index = get_index(tx, join.rhs.table_id, join.rhs_index)?;
669635
Ok(Self {
670636
lhs: Box::new(lhs),
671637
rhs_index,
672638
rhs_index_cursor: None,
673-
rhs_table,
674-
blob_store: tx.blob_store(),
675639
lhs_field: &join.lhs_field,
676640
})
677641
}
@@ -686,18 +650,17 @@ impl<'a> Iterator for IxJoinRhs<'a> {
686650
.and_then(|cursor| {
687651
cursor
688652
.next()
689-
.and_then(|ptr| self.rhs_table.get_row_ref(self.blob_store, ptr))
653+
// SAFETY: `ptr` came from `self.rhs_index`.
654+
.map(|ptr| unsafe { self.rhs_index.combine_with_ptr(ptr) })
690655
.map(Row::Ptr)
691656
})
692657
.or_else(|| {
693658
self.lhs.find_map(|tuple| {
694659
let mut cursor = self.rhs_index.seek_point(&tuple.project(self.lhs_field));
695-
cursor.next().and_then(|ptr| {
696-
self.rhs_table
697-
.get_row_ref(self.blob_store, ptr)
698-
.map(Row::Ptr)
699-
.inspect(|_| self.rhs_index_cursor = Some(cursor))
700-
})
660+
cursor
661+
.next()
662+
.map(Row::Ptr)
663+
.inspect(|_| self.rhs_index_cursor = Some(cursor.index()))
701664
})
702665
})
703666
}

crates/execution/src/pipelined.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
ops::Bound,
44
};
55

6-
use anyhow::{anyhow, Result};
6+
use anyhow::Result;
77
use itertools::Either;
88
use spacetimedb_expr::expr::AggType;
99
use spacetimedb_lib::{metrics::ExecutionMetrics, query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue};
@@ -14,7 +14,7 @@ use spacetimedb_physical_plan::plan::{
1414
use spacetimedb_primitives::{ColId, IndexId, TableId};
1515
use spacetimedb_sats::product;
1616

17-
use crate::{Datastore, DeltaStore, Row, Tuple};
17+
use crate::{iter::get_index, Datastore, DeltaStore, Row, Tuple};
1818

1919
/// An executor for explicit column projections.
2020
/// Note, this plan can only be constructed from the http api,
@@ -728,11 +728,7 @@ impl PipelinedIxJoin {
728728
metrics: &mut ExecutionMetrics,
729729
f: &mut dyn FnMut(Tuple<'a>) -> Result<()>,
730730
) -> Result<()> {
731-
let blob_store = tx.blob_store();
732-
let rhs_table = tx.table_or_err(self.rhs_table)?;
733-
let rhs_index = rhs_table
734-
.get_index_by_id(self.rhs_index)
735-
.ok_or_else(|| anyhow!("IndexId `{0}` does not exist", self.rhs_index))?;
731+
let rhs_index = get_index(tx, self.rhs_table, self.rhs_index)?;
736732

737733
let mut n = 0;
738734
let mut index_seeks = 0;
@@ -751,7 +747,10 @@ impl PipelinedIxJoin {
751747
lhs.execute(tx, metrics, &mut |u| {
752748
n += 1;
753749
index_seeks += 1;
754-
if rhs_index.contains_any(&project(&u, lhs_field, &mut bytes_scanned)) {
750+
if rhs_index
751+
.index()
752+
.contains_any(&project(&u, lhs_field, &mut bytes_scanned))
753+
{
755754
f(u)?;
756755
}
757756
Ok(())
@@ -771,7 +770,6 @@ impl PipelinedIxJoin {
771770
if let Some(v) = rhs_index
772771
.seek_point(&project(&u, lhs_field, &mut bytes_scanned))
773772
.next()
774-
.and_then(|ptr| rhs_table.get_row_ref(blob_store, ptr))
775773
.map(Row::Ptr)
776774
.map(Tuple::Row)
777775
{
@@ -794,7 +792,6 @@ impl PipelinedIxJoin {
794792
if let Some(v) = rhs_index
795793
.seek_point(&project(&u, lhs_field, &mut bytes_scanned))
796794
.next()
797-
.and_then(|ptr| rhs_table.get_row_ref(blob_store, ptr))
798795
.map(Row::Ptr)
799796
.map(Tuple::Row)
800797
{
@@ -815,7 +812,7 @@ impl PipelinedIxJoin {
815812
lhs.execute(tx, metrics, &mut |u| {
816813
n += 1;
817814
index_seeks += 1;
818-
if let Some(n) = rhs_index.count(&project(&u, lhs_field, &mut bytes_scanned)) {
815+
if let Some(n) = rhs_index.index().count(&project(&u, lhs_field, &mut bytes_scanned)) {
819816
for _ in 0..n {
820817
f(u.clone())?;
821818
}
@@ -836,7 +833,6 @@ impl PipelinedIxJoin {
836833
index_seeks += 1;
837834
for v in rhs_index
838835
.seek_point(&project(&u, lhs_field, &mut bytes_scanned))
839-
.filter_map(|ptr| rhs_table.get_row_ref(blob_store, ptr))
840836
.map(Row::Ptr)
841837
.map(Tuple::Row)
842838
{
@@ -858,7 +854,6 @@ impl PipelinedIxJoin {
858854
index_seeks += 1;
859855
for v in rhs_index
860856
.seek_point(&project(&u, lhs_field, &mut bytes_scanned))
861-
.filter_map(|ptr| rhs_table.get_row_ref(blob_store, ptr))
862857
.map(Row::Ptr)
863858
.map(Tuple::Row)
864859
{

crates/table/src/table.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1752,6 +1752,16 @@ impl<'a> TableAndIndex<'a> {
17521752
self.index
17531753
}
17541754

1755+
/// Wraps `ptr` in a [`RowRef`].
1756+
///
1757+
/// # Safety
1758+
///
1759+
/// The `self.table().is_row_present(ptr)` must hold.
1760+
pub unsafe fn combine_with_ptr(&self, ptr: RowPointer) -> RowRef<'a> {
1761+
// SAFETY: forward caller requirement.
1762+
unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
1763+
}
1764+
17551765
/// Returns an iterator yielding all rows in this index for `key`.
17561766
///
17571767
/// Matching is defined by `Ord for AlgebraicValue`.
@@ -1788,6 +1798,13 @@ pub struct IndexScanPointIter<'a> {
17881798
btree_index_iter: TableIndexPointIter<'a>,
17891799
}
17901800

1801+
impl<'a> IndexScanPointIter<'a> {
1802+
/// Consume the iterator, returning the inner one.
1803+
pub fn index(self) -> TableIndexPointIter<'a> {
1804+
self.btree_index_iter
1805+
}
1806+
}
1807+
17911808
impl<'a> Iterator for IndexScanPointIter<'a> {
17921809
type Item = RowRef<'a>;
17931810

0 commit comments

Comments
 (0)