Skip to content
Merged
7 changes: 7 additions & 0 deletions crates/memory-usage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ impl<K: MemoryUsage, V: MemoryUsage> MemoryUsage for std::collections::BTreeMap<
}
}

impl<T: MemoryUsage> MemoryUsage for std::collections::BTreeSet<T> {
fn heap_usage(&self) -> usize {
// NB: this is best-effort, since we don't have a `capacity()` method on `BTreeMap`.
Comment thread
kim marked this conversation as resolved.
self.len() * mem::size_of::<T>() + self.iter().map(|t| t.heap_usage()).sum::<usize>()
}
}

#[cfg(feature = "smallvec")]
impl<A: smallvec::Array> MemoryUsage for smallvec::SmallVec<A>
where
Expand Down
15 changes: 14 additions & 1 deletion crates/table/src/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,13 @@ impl PageHeader {
pub(super) fn present_rows_storage_ptr_for_test(&self) -> *const () {
    // Take the address of whatever `present_rows.storage()` exposes,
    // then erase the element type so callers only see an opaque pointer.
    let storage = self.fixed.present_rows.storage();
    storage.as_ptr().cast()
}

/// Counts the var-len granules that are available for allocation in this page.
///
/// The total is the var-len freelist length plus the granules obtained from
/// the "gap" between the fixed-len and var-len part of the page.
fn available_var_len_granules(&self) -> usize {
    let gap = gap_remaining_size(self.var.first, self.fixed.last);
    let granules_in_gap = VarLenGranule::space_to_granules(gap);
    let granules_on_freelist = self.var.freelist_len as usize;
    granules_on_freelist + granules_in_gap
}
}

/// Fixed-length row portions must be at least large enough to store a `FreeCellRef`.
Expand Down Expand Up @@ -1195,6 +1202,12 @@ impl Page {
self.header.var.num_granules as usize
}

/// Reports how many var-len granules in this page are free to store data.
///
/// The count also covers the "gap" between the fixed-len and var-len part of the page;
/// the computation itself is delegated to the page header.
pub fn available_var_len_granules(&self) -> usize {
    let header = &self.header;
    header.available_var_len_granules()
}

#[cfg(test)]
/// # Safety
///
Expand Down Expand Up @@ -1369,7 +1382,7 @@ impl Page {

/// Returns whether the page is full with respect to storing a fixed row with `fixed_row_size`
/// and no variable component.
pub fn is_full(&self, fixed_row_size: Size) -> bool {
pub(crate) fn is_full(&self, fixed_row_size: Size) -> bool {
!self.has_space_for_row(fixed_row_size, 0)
}

Expand Down
192 changes: 153 additions & 39 deletions crates/table/src/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::var_len::VarLenMembers;
use core::ops::{ControlFlow, Deref, Index, IndexMut};
use spacetimedb_sats::layout::Size;
use spacetimedb_sats::memory_usage::MemoryUsage;
use std::collections::BTreeSet;
use std::ops::DerefMut;
use thiserror::Error;

Expand Down Expand Up @@ -39,8 +40,21 @@ impl IndexMut<PageIndex> for Pages {
pub struct Pages {
/// The collection of pages under management.
pages: Vec<Box<Page>>,
/// The set of pages that aren't yet full.
non_full_pages: Vec<PageIndex>,
/// The set of pages that aren't yet full,
/// sorted by the number of var-len granules available in each page.
///
/// Used during insertion to locate a page with enough space to store a given row.
///
/// The first value in the tuple is [`Page::available_var_len_granules`], and the second value is the page index.
///
/// Pages for which [`Page::is_full`] is true are not stored.
///
/// If multiple pages have the same number of granules available, they are then sorted by `PageIndex`.
/// This maintains a deterministic sort order,
/// so that replaying the same set of operations on multiple datastores
/// will always result in the same layout of rows in pages,
/// regardless of when those datastores were (re)started prior to or during the sequence of operations.
non_full_pages: BTreeSet<(usize, PageIndex)>,
}

impl MemoryUsage for Pages {
Expand All @@ -51,6 +65,48 @@ impl MemoryUsage for Pages {
}

impl Pages {
#[cfg(test)]
/// Test-only check that `non_full_pages` matches the actual state of every page in `pages`.
///
/// Panics if:
/// - a page appears more than once in `non_full_pages`,
/// - a page for which [`Page::is_full`] holds (for `fixed_row_size`) has an entry,
/// - a non-full page is missing, or is recorded with a granule count other than
///   [`Page::available_var_len_granules`], or
/// - an entry refers to a page index that is not present in `pages`.
pub(crate) fn assert_non_full_pages_consistent(&self, fixed_row_size: Size) {
    // Group the entries by page in a single pass,
    // rather than re-scanning the whole set once per page.
    let mut entries_by_page = std::collections::BTreeMap::<PageIndex, Vec<(usize, PageIndex)>>::new();
    for &entry in &self.non_full_pages {
        entries_by_page.entry(entry.1).or_default().push(entry);
    }

    for (page_index, entries) in &entries_by_page {
        assert!(
            entries.len() == 1,
            "page {:?} appears multiple times in non_full_pages",
            page_index
        );
    }

    for (idx, page) in self.pages.iter().enumerate() {
        let page_index = PageIndex(idx as u64);
        // Taking the entries out of the map leaves behind only entries for unknown pages.
        let entries_for_page = entries_by_page.remove(&page_index).unwrap_or_default();

        if page.is_full(fixed_row_size) {
            // Fullness is decided by `Page::is_full`, not by the var-len granule count,
            // so the message reports exactly that condition.
            assert!(
                entries_for_page.is_empty(),
                "page {:?} is full but appears in non_full_pages as {:?}",
                page_index,
                entries_for_page
            );
        } else {
            let available_granules = page.available_var_len_granules();
            assert_eq!(
                entries_for_page,
                vec![(available_granules, page_index)],
                "page {:?} has {} available var-len granules but non_full_pages has {:?}",
                page_index,
                available_granules,
                entries_for_page
            );
        }
    }

    // Anything still in the map names a page index beyond the end of `self.pages`.
    assert!(
        entries_by_page.is_empty(),
        "non_full_pages has entries for pages that do not exist: {:?}",
        entries_by_page
    );
}

/// Is there space to allocate another page?
pub fn can_allocate_new_page(&self) -> Result<PageIndex, Error> {
let new_idx = self.len();
Expand Down Expand Up @@ -78,7 +134,14 @@ impl Pages {
page.clear();
}
// Mark every page non-full.
self.non_full_pages = (0..self.pages.len()).map(|idx| PageIndex(idx as u64)).collect();
self.non_full_pages = (0..self.pages.len())
// We could probably compute the number of available granules once and use it for all pages,
// rather than calling the method on each page,
// but we'd have to do some amount of reasoning to demonstrate it was correct
// based on the definition of `Page::clear`,
// and why bother?
.map(|idx| (self.pages[idx].available_var_len_granules(), PageIndex(idx as u64)))
Comment thread
kim marked this conversation as resolved.
.collect();
}

/// Get a reference to fixed-len row data.
Expand All @@ -94,7 +157,7 @@ impl Pages {
/// returning an error if the new number of pages would overflow `PageIndex::MAX`.
///
/// The new page is initially empty, but is not added to the non-full set.
/// Callers should call [`Pages::maybe_mark_page_non_full`] after operating on the new page.
/// Callers should call [`Pages::record_page_non_full`] after operating on the new page.
fn allocate_new_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
let new_idx = self.can_allocate_new_page()?;

Expand All @@ -107,23 +170,10 @@ impl Pages {
/// Reserve a new, initially empty page.
pub fn reserve_empty_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
let idx = self.allocate_new_page(pool, fixed_row_size)?;
self.mark_page_non_full(idx);
self.record_page_non_full(idx, fixed_row_size);
Ok(idx)
}

/// Mark the page at `idx` as non-full.
pub fn mark_page_non_full(&mut self, idx: PageIndex) {
self.non_full_pages.push(idx);
}

/// If the page at `page_index` is not full,
/// add it to the non-full set so that later insertions can access it.
pub fn maybe_mark_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) {
if !self[page_index].is_full(fixed_row_size) {
self.non_full_pages.push(page_index);
}
}

/// Call `f` with a reference to a page which satisfies
/// `page.has_space_for_row(fixed_row_size, num_var_len_granules)`.
pub fn with_page_to_insert_row<Res>(
Expand All @@ -135,30 +185,29 @@ impl Pages {
) -> Result<(PageIndex, Res), Error> {
let page_index = self.find_page_with_space_for_row(pool, fixed_row_size, num_var_len_granules)?;
let res = f(&mut self[page_index]);
self.maybe_mark_page_non_full(page_index, fixed_row_size);
self.record_page_non_full(page_index, fixed_row_size);
Ok((page_index, res))
}

/// Find a page with sufficient available space to store a row of size `fixed_row_size`
/// containing `num_var_len_granules` granules of var-len data.
///
/// Retrieving a page in this way will remove it from the non-full set.
/// After performing an insertion, the caller should use [`Pages::maybe_mark_page_non_full`]
/// After performing an insertion, the caller should use [`Pages::record_page_non_full`]
/// to restore the page to the non-full set.
fn find_page_with_space_for_row(
&mut self,
pool: &PagePool,
fixed_row_size: Size,
num_var_len_granules: usize,
) -> Result<PageIndex, Error> {
if let Some((page_idx_idx, page_idx)) = self
if let Some((page_num_free_granules, page_idx)) = self
.non_full_pages
.iter()
.range((num_var_len_granules, PageIndex(0))..)
.copied()
.enumerate()
.find(|(_, page_idx)| self[*page_idx].has_space_for_row(fixed_row_size, num_var_len_granules))
{
self.non_full_pages.swap_remove(page_idx_idx);
self.non_full_pages.remove(&(page_num_free_granules, page_idx));
Comment thread
gefjon marked this conversation as resolved.
return Ok(page_idx);
}

Expand Down Expand Up @@ -232,23 +281,86 @@ impl Pages {
row_ptr: RowPointer,
blob_store: &mut dyn BlobStore,
) -> BlobNumBytes {
let page = &mut self[row_ptr.page_index()];
let page_index = row_ptr.page_index();

self.with_updating_non_full_pages(page_index, fixed_row_size, |this| {
let page = &mut this[page_index];

// SAFETY:
// - `row_ptr.page_offset()` does point to a valid row in this page
// as the caller promised that `row_ptr` points to a valid row in `self`.
//
// - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row.
// The size is also consistent with `var_len_visitor`.
unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) }
})
}

/// Collect information about the page `self[page_index]` sufficient to update [`Self::non_full_pages`],
/// then run `body` to update the page, and finally update [`Self::non_full_pages`] for its new fullness and capacity.
///
/// `body` should not update any pages other than the one identified by `page_index`.
fn with_updating_non_full_pages<Ret>(
&mut self,
page_index: PageIndex,
fixed_row_size: Size,
body: impl FnOnce(&mut Self) -> Ret,
) -> Ret {
let page = &self[page_index];

let full_before = page.is_full(fixed_row_size);
// SAFETY:
// - `row_ptr.page_offset()` does point to a valid row in this page
// as the caller promised that `row_ptr` points to a valid row in `self`.
//
// - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row.
// The size is also consistent with `var_len_visitor`.
let blob_store_deleted_bytes =
unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) };

// If the page was previously full, mark it as non-full now,
// since we just opened a space in it.
let available_granules_before = page.available_var_len_granules();

let ret = body(self);

self.update_page_non_full(available_granules_before, full_before, page_index, fixed_row_size);

ret
}

/// Update [`Self::non_full_pages`] to change the number of var-len granules available in the page at `self[page_index]`,
/// first deleting any old entry and then re-inserting the new entry.
///
/// The entry for `page_index` in `self.non_full_pages` should not have been deleted prior to calling this method.
/// If the entry has already been deleted or was never present, instead use [`Self::record_page_non_full`].
///
/// `available_granules_before` should be the previous count from [`Page::available_var_len_granules`],
/// prior to whatever operation made space available in the page.
/// This is necessary because `non_full_pages` is a `BTreeSet` sorted by `(available_granules, page_index)`,
/// so locating the `page_index` without the `available_granules` would be slow.
///
/// `full_before` should be the result of [`Page::is_full`] prior to whatever operation made space available in the page.
/// This is necessary because `non_full_pages` does not store full pages (as the name implies),
/// so we should not attempt to delete the previous entry if the page was previously full.
fn update_page_non_full(
&mut self,
available_granules_before: usize,
full_before: bool,
page_index: PageIndex,
fixed_row_size: Size,
) {
if full_before {
self.mark_page_non_full(row_ptr.page_index());
debug_assert!(!self.non_full_pages.remove(&(available_granules_before, page_index)));
} else {
let _prev = self.non_full_pages.remove(&(available_granules_before, page_index));
debug_assert!(_prev);
}

self.record_page_non_full(page_index, fixed_row_size);
}

/// Record the number of available var-len granules in the page at `self[page_index]` into [`Self::non_full_pages`].
///
/// Prior to calling this function, there must not be an entry for `page_index` in [`Self::non_full_pages`].
fn record_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) {
debug_assert!(!self.non_full_pages.iter().any(|(_, idx)| *idx == page_index));

let page = &self[page_index];
let available_granules = page.available_var_len_granules();

if !page.is_full(fixed_row_size) {
self.non_full_pages.insert((available_granules, page_index));
}
blob_store_deleted_bytes
}

/// Materialize a view of rows in `self` for which the `filter` returns `true`.
Expand Down Expand Up @@ -358,7 +470,9 @@ impl Pages {
self.non_full_pages = pages
.iter()
.enumerate()
.filter_map(|(idx, page)| (!page.is_full(fixed_row_size)).then_some(PageIndex(idx as _)))
.filter_map(|(idx, page)| {
(!page.is_full(fixed_row_size)).then_some((page.available_var_len_granules(), PageIndex(idx as _)))
})
.collect();
self.pages = pages;
}
Expand Down
Loading
Loading