@@ -9,6 +9,7 @@ use crate::auth::{
99 SpacetimeIdentityToken ,
1010} ;
1111use crate :: routes:: subscribe:: generate_random_connection_id;
12+ use crate :: util:: serde:: humantime_duration;
1213pub use crate :: util:: { ByteStringBody , NameOrIdentity } ;
1314use crate :: {
1415 log_and_500, Action , Authorization , ControlStateDelegate , DatabaseDef , DatabaseResetDef , Host , MaybeMisdirected ,
@@ -20,13 +21,14 @@ use axum::response::{ErrorResponse, IntoResponse};
2021use axum:: routing:: MethodRouter ;
2122use axum:: Extension ;
2223use axum_extra:: TypedHeader ;
24+ use derive_more:: From ;
2325use futures:: TryStreamExt ;
2426use http:: StatusCode ;
2527use log:: { info, warn} ;
2628use serde:: Deserialize ;
2729use spacetimedb:: auth:: identity:: ConnectionAuthCtx ;
2830use spacetimedb:: database_logger:: DatabaseLogger ;
29- use spacetimedb:: host:: module_host:: ClientConnectedError ;
31+ use spacetimedb:: host:: module_host:: { ClientConnectedError , DurabilityExited } ;
3032use spacetimedb:: host:: { CallResult , UpdateDatabaseResult } ;
3133use spacetimedb:: host:: { FunctionArgs , MigratePlanResult } ;
3234use spacetimedb:: host:: { ModuleHost , ReducerOutcome } ;
@@ -44,6 +46,9 @@ use spacetimedb_lib::{sats, AlgebraicValue, Hash, ProductValue, Timestamp};
4446use spacetimedb_schema:: auto_migrate:: {
4547 MigrationPolicy as SchemaMigrationPolicy , MigrationToken , PrettyPrintStyle as AutoMigratePrettyPrintStyle ,
4648} ;
49+ use tokio:: sync:: oneshot;
50+ use tokio:: time:: error:: Elapsed ;
51+ use tokio:: time:: timeout;
4752
4853use super :: subscribe:: { handle_websocket, HasWebSocketOptions } ;
4954
@@ -695,8 +700,33 @@ pub struct PublishDatabaseQueryParams {
695700 parent : Option < NameOrIdentity > ,
696701 #[ serde( alias = "org" ) ]
697702 organization : Option < NameOrIdentity > ,
703+ /// Duration to wait for a database update to become confirmed (i.e. durable).
704+ ///
705+ /// The value is parsed via the `humantime` crate, e.g. "1m", "23s", "5min".
706+ ///
707+ /// If not given, defaults to [default_update_confirmation_timeout].
708+ /// The maximum timeout is capped by [MAX_UPDATE_CONFIRMATION_TIMEOUT].
709+ ///
710+ /// The parameter has no effect when creating a new database.
711+ #[ serde( with = "humantime_duration" , default = "default_update_confirmation_timeout" ) ]
712+ update_confirmation_timeout : Duration ,
713+ }
714+
715+ /// Default timeout for a database update to become confirmed / durable.
716+ ///
717+ /// Currently, the value is 5s.
718+ const fn default_update_confirmation_timeout ( ) -> Duration {
719+ Duration :: from_secs ( 5 )
698720}
699721
722+ /// Maximum timeout for a database update to become confirmed / durable.
723+ ///
724+ /// If a replication group doesn't converge within this time span, it is
725+ /// probably not making progress at all.
726+ ///
727+ /// Currently, the value is 5min.
728+ const MAX_UPDATE_CONFIRMATION_TIMEOUT : Duration = Duration :: from_secs ( 5 * 60 ) ;
729+
700730pub async fn publish < S : NodeDelegate + ControlStateDelegate + Authorization > (
701731 State ( ctx) : State < S > ,
702732 Path ( PublishDatabaseParams { name_or_identity } ) : Path < PublishDatabaseParams > ,
@@ -708,6 +738,7 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
708738 host_type,
709739 parent,
710740 organization,
741+ update_confirmation_timeout : confirmation_timeout,
711742 } ) : Query < PublishDatabaseQueryParams > ,
712743 Extension ( auth) : Extension < SpacetimeAuth > ,
713744 program_bytes : Bytes ,
@@ -823,23 +854,72 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
823854 . await
824855 . map_err ( log_and_500) ?;
825856
857+ let success = || {
858+ axum:: Json ( PublishResult :: Success {
859+ domain : db_name. cloned ( ) ,
860+ database_identity,
861+ op : publish_op,
862+ } )
863+ } ;
826864 match maybe_updated {
827865 Some ( UpdateDatabaseResult :: AutoMigrateError ( errs) ) => {
828866 Err ( bad_request ( format ! ( "Database update rejected: {errs}" ) . into ( ) ) )
829867 }
830868 Some ( UpdateDatabaseResult :: ErrorExecutingMigration ( err) ) => Err ( bad_request (
831869 format ! ( "Failed to create or update the database: {err}" ) . into ( ) ,
832870 ) ) ,
833- None
834- | Some (
835- UpdateDatabaseResult :: NoUpdateNeeded
836- | UpdateDatabaseResult :: UpdatePerformed
837- | UpdateDatabaseResult :: UpdatePerformedWithClientDisconnect ,
838- ) => Ok ( axum:: Json ( PublishResult :: Success {
839- domain : db_name. cloned ( ) ,
840- database_identity,
841- op : publish_op,
842- } ) ) ,
871+ None | Some ( UpdateDatabaseResult :: NoUpdateNeeded ) => Ok ( success ( ) ) ,
872+ Some (
873+ UpdateDatabaseResult :: UpdatePerformed {
874+ tx_offset,
875+ durable_offset,
876+ }
877+ | UpdateDatabaseResult :: UpdatePerformedWithClientDisconnect {
878+ tx_offset,
879+ durable_offset,
880+ } ,
881+ ) => {
882+ timeout ( confirmation_timeout. min ( MAX_UPDATE_CONFIRMATION_TIMEOUT ) , async {
883+ let tx_offset = tx_offset. await ?;
884+ if let Some ( mut durable_offset) = durable_offset {
885+ durable_offset. wait_for ( tx_offset) . await ?;
886+ }
887+
888+ Ok :: < _ , UpdateConfirmationError > ( ( ) )
889+ } )
890+ . await
891+ . map_err ( Into :: into)
892+ . flatten ( ) ?;
893+
894+ Ok ( success ( ) )
895+ }
896+ }
897+ }
898+
899+ #[ derive( From ) ]
900+ enum UpdateConfirmationError {
901+ Cancelled ( oneshot:: error:: RecvError ) ,
902+ Crashed ( DurabilityExited ) ,
903+ Timeout ( Elapsed ) ,
904+ }
905+
906+ impl From < UpdateConfirmationError > for ErrorResponse {
907+ fn from ( e : UpdateConfirmationError ) -> Self {
908+ match e {
909+ UpdateConfirmationError :: Cancelled ( _) => (
910+ StatusCode :: SERVICE_UNAVAILABLE ,
911+ "Database update failed: transaction was cancelled" ,
912+ ) ,
913+ UpdateConfirmationError :: Crashed ( _) => (
914+ StatusCode :: SERVICE_UNAVAILABLE ,
915+ "Database update failed: database crashed while waiting for transaction confirmation" ,
916+ ) ,
917+ UpdateConfirmationError :: Timeout ( _) => (
918+ StatusCode :: GATEWAY_TIMEOUT ,
919+ "Database update failed: timeout waiting for transaction confirmation" ,
920+ ) ,
921+ }
922+ . into ( )
843923 }
844924}
845925
0 commit comments