@@ -12,6 +12,7 @@ use crate::identity::Identity;
1212use crate :: messages:: control_db:: Database ;
1313use crate :: replica_context:: ReplicaContext ;
1414use crate :: sql:: ast:: SchemaViewer ;
15+ use crate :: sql:: parser:: RowLevelExpr ;
1516use crate :: subscription:: module_subscription_actor:: ModuleSubscriptions ;
1617use crate :: subscription:: tx:: DeltaTx ;
1718use crate :: subscription:: { execute_plan, record_exec_metrics} ;
@@ -39,6 +40,7 @@ use spacetimedb_sats::ProductValue;
3940use spacetimedb_schema:: auto_migrate:: AutoMigrateError ;
4041use spacetimedb_schema:: def:: deserialize:: ReducerArgsDeserializeSeed ;
4142use spacetimedb_schema:: def:: { ModuleDef , ReducerDef } ;
43+ use spacetimedb_schema:: schema:: { Schema , TableSchema } ;
4244use spacetimedb_vm:: relation:: RelValue ;
4345use std:: fmt;
4446use std:: sync:: { Arc , Weak } ;
@@ -263,9 +265,6 @@ pub trait Module: Send + Sync + 'static {
263265pub trait ModuleInstance : Send + ' static {
264266 fn trapped ( & self ) -> bool ;
265267
266- /// If the module instance's replica_ctx is uninitialized, initialize it.
267- fn init_database ( & mut self , program : Program ) -> anyhow:: Result < Option < ReducerCallResult > > ;
268-
269268 /// Update the module instance's database to match the schema of the module instance.
270269 fn update_database (
271270 & mut self ,
@@ -276,6 +275,79 @@ pub trait ModuleInstance: Send + 'static {
276275 fn call_reducer ( & mut self , tx : Option < MutTxId > , params : CallReducerParams ) -> ReducerCallResult ;
277276}
278277
278+ /// If the module instance's replica_ctx is uninitialized, initialize it.
279+ fn init_database (
280+ replica_ctx : & ReplicaContext ,
281+ module_def : & ModuleDef ,
282+ inst : & mut dyn ModuleInstance ,
283+ program : Program ,
284+ ) -> anyhow:: Result < Option < ReducerCallResult > > {
285+ log:: debug!( "init database" ) ;
286+ let timestamp = Timestamp :: now ( ) ;
287+ let stdb = & * replica_ctx. relational_db ;
288+ let logger = replica_ctx. logger . system_logger ( ) ;
289+
290+ let tx = stdb. begin_mut_tx ( IsolationLevel :: Serializable , Workload :: Internal ) ;
291+ let auth_ctx = AuthCtx :: for_current ( replica_ctx. database . owner_identity ) ;
292+ let ( tx, ( ) ) = stdb
293+ . with_auto_rollback ( tx, |tx| {
294+ let mut table_defs: Vec < _ > = module_def. tables ( ) . collect ( ) ;
295+ table_defs. sort_by ( |a, b| a. name . cmp ( & b. name ) ) ;
296+
297+ for def in table_defs {
298+ let table_name = & def. name ;
299+ logger. info ( & format ! ( "Creating table `{table_name}`" ) ) ;
300+ let schema = TableSchema :: from_module_def ( module_def, def, ( ) , TableId :: SENTINEL ) ;
301+ stdb. create_table ( tx, schema)
302+ . with_context ( || format ! ( "failed to create table {table_name}" ) ) ?;
303+ }
304+ // Insert the late-bound row-level security expressions.
305+ for rls in module_def. row_level_security ( ) {
306+ logger. info ( & format ! ( "Creating row level security `{}`" , rls. sql) ) ;
307+
308+ let rls = RowLevelExpr :: build_row_level_expr ( tx, & auth_ctx, rls)
309+ . with_context ( || format ! ( "failed to create row-level security: `{}`" , rls. sql) ) ?;
310+ let table_id = rls. def . table_id ;
311+ let sql = rls. def . sql . clone ( ) ;
312+ stdb. create_row_level_security ( tx, rls. def )
313+ . with_context ( || format ! ( "failed to create row-level security for table `{table_id}`: `{sql}`" , ) ) ?;
314+ }
315+
316+ stdb. set_initialized ( tx, replica_ctx. host_type , program) ?;
317+
318+ anyhow:: Ok ( ( ) )
319+ } )
320+ . inspect_err ( |e| log:: error!( "{e:?}" ) ) ?;
321+
322+ let rcr = match module_def. lifecycle_reducer ( Lifecycle :: Init ) {
323+ None => {
324+ stdb. commit_tx ( tx) ?;
325+ None
326+ }
327+
328+ Some ( ( reducer_id, _) ) => {
329+ logger. info ( "Invoking `init` reducer" ) ;
330+ let caller_identity = replica_ctx. database . owner_identity ;
331+ Some ( inst. call_reducer (
332+ Some ( tx) ,
333+ CallReducerParams {
334+ timestamp,
335+ caller_identity,
336+ caller_connection_id : ConnectionId :: ZERO ,
337+ client : None ,
338+ request_id : None ,
339+ timer : None ,
340+ reducer_id,
341+ args : ArgsTuple :: nullary ( ) ,
342+ } ,
343+ ) )
344+ }
345+ } ;
346+
347+ logger. info ( "Database initialized" ) ;
348+ Ok ( rcr)
349+ }
350+
279351pub struct CallReducerParams {
280352 pub timestamp : Timestamp ,
281353 pub caller_identity : Identity ,
@@ -306,11 +378,6 @@ impl<T: Module> ModuleInstance for AutoReplacingModuleInstance<T> {
306378 fn trapped ( & self ) -> bool {
307379 self . inst . trapped ( )
308380 }
309- fn init_database ( & mut self , program : Program ) -> anyhow:: Result < Option < ReducerCallResult > > {
310- let ret = self . inst . init_database ( program) ;
311- self . check_trap ( ) ;
312- ret
313- }
314381 fn update_database (
315382 & mut self ,
316383 program : Program ,
@@ -877,9 +944,13 @@ impl ModuleHost {
877944 }
878945
879946 pub async fn init_database ( & self , program : Program ) -> Result < Option < ReducerCallResult > , InitDatabaseError > {
880- self . call ( "<init_database>" , move |inst| inst. init_database ( program) )
881- . await ?
882- . map_err ( InitDatabaseError :: Other )
947+ let replica_ctx = self . inner . replica_ctx ( ) . clone ( ) ;
948+ let info = self . info . clone ( ) ;
949+ self . call ( "<init_database>" , move |inst| {
950+ init_database ( & replica_ctx, & info. module_def , inst, program)
951+ } )
952+ . await ?
953+ . map_err ( InitDatabaseError :: Other )
883954 }
884955
885956 pub async fn update_database (
0 commit comments