@@ -42,6 +42,7 @@ use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema};
4242use spacetimedb_table:: {
4343 blob_store:: { BlobStore , HashMapBlobStore } ,
4444 indexes:: { RowPointer , SquashedOffset } ,
45+ page_pool:: PagePool ,
4546 table:: { IndexScanRangeIter , InsertError , RowRef , Table , TableAndIndex } ,
4647 MemoryUsage ,
4748} ;
@@ -53,13 +54,21 @@ use std::sync::Arc;
5354/// logs directly. For normal usage, see the RelationalDB struct instead.
5455///
5556/// NOTE: unstable API, this may change at any point in the future.
56- #[ derive( Default ) ]
5757pub struct CommittedState {
5858 pub ( crate ) next_tx_offset : u64 ,
5959 pub ( crate ) tables : IntMap < TableId , Table > ,
6060 pub ( crate ) blob_store : HashMapBlobStore ,
6161 /// Provides fast lookup for index id -> an index.
6262 pub ( super ) index_id_map : IndexIdMap ,
63+ /// The page pool used to retrieve new/unused pages for tables.
64+ ///
65+ /// Between transactions, this is untouched.
66+ /// During transactions, the [`MutTxId`] can steal pages from the committed state.
67+ ///
68+ /// This is a handle on a shared structure.
69+ /// Pages are shared between all modules running on a particular host,
70+ /// not allocated per-module.
71+ pub ( super ) page_pool : PagePool ,
6372}
6473
6574impl MemoryUsage for CommittedState {
@@ -69,7 +78,9 @@ impl MemoryUsage for CommittedState {
6978 tables,
7079 blob_store,
7180 index_id_map,
81+ page_pool : _,
7282 } = self ;
83+ // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
7384 next_tx_offset. heap_usage ( ) + tables. heap_usage ( ) + blob_store. heap_usage ( ) + index_id_map. heap_usage ( )
7485 }
7586}
@@ -143,6 +154,16 @@ fn ignore_duplicate_insert_error<T>(res: std::result::Result<T, InsertError>) ->
143154}
144155
145156impl CommittedState {
157+ pub ( super ) fn new ( page_pool : PagePool ) -> Self {
158+ Self {
159+ next_tx_offset : <_ >:: default ( ) ,
160+ tables : <_ >:: default ( ) ,
161+ blob_store : <_ >:: default ( ) ,
162+ index_id_map : <_ >:: default ( ) ,
163+ page_pool,
164+ }
165+ }
166+
146167 /// Extremely delicate function to bootstrap the system tables.
147168 /// Don't update this unless you know what you're doing.
148169 pub ( super ) fn bootstrap_system_tables ( & mut self , database_identity : Identity ) -> Result < ( ) > {
@@ -158,7 +179,8 @@ impl CommittedState {
158179 let ref_schemas = schemas. each_ref ( ) . map ( |s| & * * s) ;
159180
160181 // Insert the table row into st_tables, creating st_tables if it's missing.
161- let ( st_tables, blob_store) = self . get_table_and_blob_store_or_create ( ST_TABLE_ID , & schemas[ ST_TABLE_IDX ] ) ;
182+ let ( st_tables, blob_store, pool) =
183+ self . get_table_and_blob_store_or_create ( ST_TABLE_ID , & schemas[ ST_TABLE_IDX ] ) ;
162184 // Insert the table row into `st_tables` for all system tables
163185 for schema in ref_schemas {
164186 let table_id = schema. table_id ;
@@ -175,11 +197,12 @@ impl CommittedState {
175197 let row = ProductValue :: from ( row) ;
176198 // Insert the meta-row into the in-memory ST_TABLES.
177199 // If the row is already there, no-op.
178- ignore_duplicate_insert_error ( st_tables. insert ( blob_store, & row) ) ?;
200+ ignore_duplicate_insert_error ( st_tables. insert ( pool , blob_store, & row) ) ?;
179201 }
180202
181203 // Insert the columns into `st_columns`
182- let ( st_columns, blob_store) = self . get_table_and_blob_store_or_create ( ST_COLUMN_ID , & schemas[ ST_COLUMN_IDX ] ) ;
204+ let ( st_columns, blob_store, pool) =
205+ self . get_table_and_blob_store_or_create ( ST_COLUMN_ID , & schemas[ ST_COLUMN_IDX ] ) ;
183206 for col in ref_schemas. iter ( ) . flat_map ( |x| x. columns ( ) ) . cloned ( ) {
184207 let row = StColumnRow {
185208 table_id : col. table_id ,
@@ -190,15 +213,15 @@ impl CommittedState {
190213 let row = ProductValue :: from ( row) ;
191214 // Insert the meta-row into the in-memory ST_COLUMNS.
192215 // If the row is already there, no-op.
193- ignore_duplicate_insert_error ( st_columns. insert ( blob_store, & row) ) ?;
216+ ignore_duplicate_insert_error ( st_columns. insert ( pool , blob_store, & row) ) ?;
194217 // Increment row count for st_columns.
195218 with_label_values ( ST_COLUMN_ID , ST_COLUMN_NAME ) . inc ( ) ;
196219 }
197220
198221 // Insert the FK sorted by table/column so it show together when queried.
199222
200223 // Insert constraints into `st_constraints`
201- let ( st_constraints, blob_store) =
224+ let ( st_constraints, blob_store, pool ) =
202225 self . get_table_and_blob_store_or_create ( ST_CONSTRAINT_ID , & schemas[ ST_CONSTRAINT_IDX ] ) ;
203226 for ( i, constraint) in ref_schemas
204227 . iter ( )
@@ -219,13 +242,14 @@ impl CommittedState {
219242 let row = ProductValue :: from ( row) ;
220243 // Insert the meta-row into the in-memory ST_CONSTRAINTS.
221244 // If the row is already there, no-op.
222- ignore_duplicate_insert_error ( st_constraints. insert ( blob_store, & row) ) ?;
245+ ignore_duplicate_insert_error ( st_constraints. insert ( pool , blob_store, & row) ) ?;
223246 // Increment row count for st_constraints.
224247 with_label_values ( ST_CONSTRAINT_ID , ST_CONSTRAINT_NAME ) . inc ( ) ;
225248 }
226249
227250 // Insert the indexes into `st_indexes`
228- let ( st_indexes, blob_store) = self . get_table_and_blob_store_or_create ( ST_INDEX_ID , & schemas[ ST_INDEX_IDX ] ) ;
251+ let ( st_indexes, blob_store, pool) =
252+ self . get_table_and_blob_store_or_create ( ST_INDEX_ID , & schemas[ ST_INDEX_IDX ] ) ;
229253 for ( i, mut index) in ref_schemas
230254 . iter ( )
231255 . flat_map ( |x| & x. indexes )
@@ -240,7 +264,7 @@ impl CommittedState {
240264 let row = ProductValue :: from ( row) ;
241265 // Insert the meta-row into the in-memory ST_INDEXES.
242266 // If the row is already there, no-op.
243- ignore_duplicate_insert_error ( st_indexes. insert ( blob_store, & row) ) ?;
267+ ignore_duplicate_insert_error ( st_indexes. insert ( pool , blob_store, & row) ) ?;
244268 // Increment row count for st_indexes.
245269 with_label_values ( ST_INDEX_ID , ST_INDEX_NAME ) . inc ( ) ;
246270 }
@@ -260,7 +284,7 @@ impl CommittedState {
260284 // IMPORTANT: It is crucial that the `st_sequences` table is created last
261285
262286 // Insert the sequences into `st_sequences`
263- let ( st_sequences, blob_store) =
287+ let ( st_sequences, blob_store, pool ) =
264288 self . get_table_and_blob_store_or_create ( ST_SEQUENCE_ID , & schemas[ ST_SEQUENCE_IDX ] ) ;
265289 // We create sequences last to get right the starting number
266290 // so, we don't sort here
@@ -286,7 +310,7 @@ impl CommittedState {
286310 let row = ProductValue :: from ( row) ;
287311 // Insert the meta-row into the in-memory ST_SEQUENCES.
288312 // If the row is already there, no-op.
289- ignore_duplicate_insert_error ( st_sequences. insert ( blob_store, & row) ) ?;
313+ ignore_duplicate_insert_error ( st_sequences. insert ( pool , blob_store, & row) ) ?;
290314 // Increment row count for st_sequences
291315 with_label_values ( ST_SEQUENCE_ID , ST_SEQUENCE_NAME ) . inc ( ) ;
292316 }
@@ -320,7 +344,7 @@ impl CommittedState {
320344 . ok_or ( TableError :: IdNotFoundState ( table_id) ) ?;
321345 let blob_store = & mut self . blob_store ;
322346 table
323- . delete_equal_row ( blob_store, rel)
347+ . delete_equal_row ( & self . page_pool , blob_store, rel)
324348 . map_err ( TableError :: Bflatn ) ?
325349 . ok_or_else ( || anyhow ! ( "Delete for non-existent row when replaying transaction" ) ) ?;
326350 Ok ( ( ) )
@@ -332,8 +356,8 @@ impl CommittedState {
332356 schema : & Arc < TableSchema > ,
333357 row : & ProductValue ,
334358 ) -> Result < ( ) > {
335- let ( table, blob_store) = self . get_table_and_blob_store_or_create ( table_id, schema) ;
336- table. insert ( blob_store, row) . map ( drop) . map_err ( |e| match e {
359+ let ( table, blob_store, pool ) = self . get_table_and_blob_store_or_create ( table_id, schema) ;
360+ table. insert ( pool , blob_store, row) . map ( drop) . map_err ( |e| match e {
337361 InsertError :: Bflatn ( e) => TableError :: Bflatn ( e) . into ( ) ,
338362 InsertError :: Duplicate ( e) => TableError :: Duplicate ( e) . into ( ) ,
339363 InsertError :: IndexError ( e) => IndexError :: UniqueConstraintViolation ( e) . into ( ) ,
@@ -553,9 +577,8 @@ impl CommittedState {
553577 deletes. push ( pv) ;
554578 }
555579
556- let table_name = & * table. get_schema ( ) . table_name ;
557-
558580 if !deletes. is_empty ( ) {
581+ let table_name = & * table. get_schema ( ) . table_name ;
559582 tx_data. set_deletes_for_table ( table_id, table_name, deletes. into ( ) ) ;
560583 }
561584 } else if !row_ptrs. is_empty ( ) {
@@ -579,43 +602,47 @@ impl CommittedState {
579602 // and the fullness of the page.
580603
581604 for ( table_id, tx_table) in insert_tables {
582- let ( commit_table, commit_blob_store) =
605+ let ( commit_table, commit_blob_store, page_pool ) =
583606 self . get_table_and_blob_store_or_create ( table_id, tx_table. get_schema ( ) ) ;
584607
585- // TODO(perf): Allocate with capacity?
586- let mut inserts = vec ! [ ] ;
587608 // For each newly-inserted row, insert it into the committed state.
609+ let mut inserts = Vec :: with_capacity ( tx_table. row_count as usize ) ;
588610 for row_ref in tx_table. scan_rows ( & tx_blob_store) {
589611 let pv = row_ref. to_product_value ( ) ;
590612 commit_table
591- . insert ( commit_blob_store, & pv)
613+ . insert ( page_pool , commit_blob_store, & pv)
592614 . expect ( "Failed to insert when merging commit" ) ;
593615
594616 inserts. push ( pv) ;
595617 }
596618
597- let table_name = & * commit_table. get_schema ( ) . table_name ;
598-
619+ // Add the table to `TxData` if there were insertions.
599620 if !inserts. is_empty ( ) {
621+ let table_name = & * commit_table. get_schema ( ) . table_name ;
600622 tx_data. set_inserts_for_table ( table_id, table_name, inserts. into ( ) ) ;
601623 }
602624
625+ let ( schema, indexes, pages) = tx_table. consume_for_merge ( ) ;
626+
603627 // Add all newly created indexes to the committed state.
604- for ( cols , mut index) in tx_table . indexes {
605- if !commit_table. indexes . contains_key ( & cols ) {
628+ for ( index_id , mut index) in indexes {
629+ if !commit_table. indexes . contains_key ( & index_id ) {
606630 index. clear ( ) ;
607631 // SAFETY: `tx_table` is derived from `commit_table`,
608632 // so they have the same row type.
609633 // This entails that all indices in `tx_table`
610634 // were constructed with the same row type/layout as `commit_table`.
611- unsafe { commit_table. insert_index ( commit_blob_store, cols , index) } ;
635+ unsafe { commit_table. insert_index ( commit_blob_store, index_id , index) } ;
612636 }
613637 }
614638
615639 // The schema may have been modified in the transaction.
616640 // Update this last to placate borrowck and avoid a clone.
617641 // None of the above operations will inspect the schema.
618- commit_table. schema = tx_table. schema ;
642+ commit_table. schema = schema;
643+
644+ // Put all the pages in the table back into the pool.
645+ self . page_pool . put_many ( pages) ;
619646 }
620647 }
621648
@@ -642,8 +669,8 @@ impl CommittedState {
642669 self . tables . get ( & table_id)
643670 }
644671
645- pub ( super ) fn get_table_mut ( & mut self , table_id : TableId ) -> Option < & mut Table > {
646- self . tables . get_mut ( & table_id)
672+ pub ( super ) fn get_table_mut ( & mut self , table_id : TableId ) -> ( Option < & mut Table > , & PagePool ) {
673+ ( self . tables . get_mut ( & table_id) , & self . page_pool )
647674 }
648675
649676 pub fn get_table_and_blob_store_immutable ( & self , table_id : TableId ) -> Option < ( & Table , & dyn BlobStore ) > {
@@ -670,13 +697,14 @@ impl CommittedState {
670697 & ' this mut self ,
671698 table_id : TableId ,
672699 schema : & Arc < TableSchema > ,
673- ) -> ( & ' this mut Table , & ' this mut dyn BlobStore ) {
700+ ) -> ( & ' this mut Table , & ' this mut dyn BlobStore , & ' this PagePool ) {
674701 let table = self
675702 . tables
676703 . entry ( table_id)
677704 . or_insert_with ( || Self :: make_table ( schema. clone ( ) ) ) ;
678705 let blob_store = & mut self . blob_store ;
679- ( table, blob_store)
706+ let pool = & mut self . page_pool ;
707+ ( table, blob_store, pool)
680708 }
681709
682710 pub fn report_data_size ( & self , database_identity : Identity ) {
0 commit comments