Skip to content
Closed
149 changes: 140 additions & 9 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::borrow::Cow;
use std::num::NonZeroU8;
use std::str::FromStr;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::{env, io};

use crate::auth::{
Expand All @@ -24,7 +24,6 @@ use axum_extra::TypedHeader;
use derive_more::From;
use futures::TryStreamExt;
use http::StatusCode;
use log::{info, warn};
use serde::Deserialize;
use spacetimedb::auth::identity::ConnectionAuthCtx;
use spacetimedb::database_logger::DatabaseLogger;
Expand All @@ -49,6 +48,7 @@ use spacetimedb_schema::auto_migrate::{
use tokio::sync::oneshot;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use tracing::{info, warn};

use super::subscribe::{handle_websocket, HasWebSocketOptions};

Expand Down Expand Up @@ -838,6 +838,15 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
};

let schema_migration_policy = schema_migration_policy(policy, token)?;
let publish_start = Instant::now();
info!(
database = %database_identity,
op = ?publish_op,
host_type = ?host_type,
num_replicas = ?num_replicas,
confirmation_timeout = ?confirmation_timeout,
"publishing database"
);
let maybe_updated = ctx
.publish_database(
&auth.claims.identity,
Expand All @@ -853,6 +862,13 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
)
.await
.map_err(log_and_500)?;
info!(
database = %database_identity,
op = ?publish_op,
result = update_database_result_name(&maybe_updated),
elapsed = ?publish_start.elapsed(),
"publish_database returned"
);

let success = || {
axum::Json(PublishResult::Success {
Expand All @@ -879,24 +895,139 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
durable_offset,
},
) => {
timeout(confirmation_timeout.min(MAX_UPDATE_CONFIRMATION_TIMEOUT), async {
let tx_offset = tx_offset.await?;
let confirmation_timeout = confirmation_timeout.min(MAX_UPDATE_CONFIRMATION_TIMEOUT);
let confirmation_start = Instant::now();
let initial_durable_offset = durable_offset.as_ref().and_then(|offset| offset.last_seen());
info!(
database = %database_identity,
op = ?publish_op,
confirmation_timeout = ?confirmation_timeout,
has_durable_offset = durable_offset.is_some(),
initial_durable_offset = ?initial_durable_offset,
"waiting for database update confirmation"
);
let confirmation_result = timeout(confirmation_timeout, async {
let tx_wait_start = Instant::now();
let tx_offset = match tx_offset.await {
Ok(tx_offset) => {
info!(
database = %database_identity,
op = ?publish_op,
tx_offset,
elapsed = ?tx_wait_start.elapsed(),
"database update tx offset confirmed"
);
tx_offset
}
Err(err) => {
warn!(
database = %database_identity,
op = ?publish_op,
elapsed = ?tx_wait_start.elapsed(),
error = %err,
"database update tx offset wait was cancelled"
);
return Err(UpdateConfirmationError::Cancelled(err));
}
};
if let Some(mut durable_offset) = durable_offset {
durable_offset.wait_for(tx_offset).await?;
let durable_wait_start = Instant::now();
let last_seen_before_wait = durable_offset.last_seen();
info!(
database = %database_identity,
op = ?publish_op,
tx_offset,
last_seen_durable_offset = ?last_seen_before_wait,
"waiting for database update durable offset"
);
match durable_offset.wait_for(tx_offset).await {
Ok(confirmed_durable_offset) => {
info!(
database = %database_identity,
op = ?publish_op,
tx_offset,
confirmed_durable_offset,
elapsed = ?durable_wait_start.elapsed(),
"database update durable offset confirmed"
);
}
Err(err) => {
warn!(
database = %database_identity,
op = ?publish_op,
tx_offset,
last_seen_durable_offset = ?durable_offset.last_seen(),
elapsed = ?durable_wait_start.elapsed(),
error = %err,
"database update durable offset wait failed"
);
return Err(UpdateConfirmationError::Crashed(err));
}
}
} else {
info!(
database = %database_identity,
op = ?publish_op,
tx_offset,
"database update has no durable offset to wait for"
);
}

Ok::<_, UpdateConfirmationError>(())
})
.await
.map_err(Into::into)
.flatten()?;
.await;

match confirmation_result {
Ok(Ok(())) => {
info!(
database = %database_identity,
op = ?publish_op,
elapsed = ?confirmation_start.elapsed(),
"database update confirmation completed"
);
}
Ok(Err(err)) => {
warn!(
database = %database_identity,
op = ?publish_op,
elapsed = ?confirmation_start.elapsed(),
error = ?err,
"database update confirmation failed"
);
return Err(err.into());
}
Err(err) => {
warn!(
database = %database_identity,
op = ?publish_op,
confirmation_timeout = ?confirmation_timeout,
elapsed = ?confirmation_start.elapsed(),
initial_durable_offset = ?initial_durable_offset,
"database update confirmation timed out"
);
return Err(UpdateConfirmationError::Timeout(err).into());
}
}

Ok(success())
}
}
}

#[derive(From)]
fn update_database_result_name(result: &Option<UpdateDatabaseResult>) -> &'static str {
match result {
None => "created",
Some(UpdateDatabaseResult::NoUpdateNeeded) => "no_update_needed",
Some(UpdateDatabaseResult::UpdatePerformed { .. }) => "update_performed",
Some(UpdateDatabaseResult::UpdatePerformedWithClientDisconnect { .. }) => {
"update_performed_with_client_disconnect"
}
Some(UpdateDatabaseResult::AutoMigrateError(_)) => "auto_migrate_error",
Some(UpdateDatabaseResult::ErrorExecutingMigration(_)) => "error_executing_migration",
}
}

#[derive(Debug, From)]
enum UpdateConfirmationError {
Cancelled(oneshot::error::RecvError),
Crashed(DurabilityExited),
Expand Down
63 changes: 62 additions & 1 deletion crates/smoketests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::process::{Command, Output, Stdio};
use std::sync::OnceLock;
use std::time::Instant;
use std::time::{Duration, Instant};
use which::which;

/// Returns the remote server URL if running against a remote server.
Expand Down Expand Up @@ -1199,6 +1199,12 @@ log = "0.4"
// Now publish with --bin-path to skip rebuild
let publish_start = Instant::now();
let mut args = vec!["publish", "--server", &self.server_url, "--bin-path", &wasm_path_str];
eprintln!(
"[SMOKETEST] publish start: server={} module={} target={}",
self.server_url,
self.module_name,
name.unwrap_or("<new>")
);

if opts.force {
args.push("--yes");
Expand Down Expand Up @@ -1256,6 +1262,10 @@ log = "0.4"
/// Arguments are passed directly to the CLI as strings.
pub fn call(&self, name: &str, args: &[&str]) -> Result<String> {
let identity = self.database_identity.as_ref().context("No database published")?;
eprintln!(
"[SMOKETEST] call start: server={} database={} reducer={} args={:?}",
self.server_url, identity, name, args
);

let mut cmd_args = vec!["call", "--server", &self.server_url, "--", identity.as_str(), name];
cmd_args.extend(args);
Expand Down Expand Up @@ -1315,13 +1325,21 @@ log = "0.4"
/// Executes a SQL query against the database.
pub fn sql(&self, query: &str) -> Result<String> {
let identity = self.database_identity.as_ref().context("No database published")?;
eprintln!(
"[SMOKETEST] sql start: server={} database={} query={}",
self.server_url, identity, query
);

self.spacetime(&["sql", "--server", &self.server_url, identity.as_str(), query])
}

/// Executes a SQL query with the --confirmed flag.
pub fn sql_confirmed(&self, query: &str) -> Result<String> {
let identity = self.database_identity.as_ref().context("No database published")?;
eprintln!(
"[SMOKETEST] confirmed sql start: server={} database={} query={}",
self.server_url, identity, query
);

self.spacetime(&[
"sql",
Expand Down Expand Up @@ -1350,6 +1368,49 @@ log = "0.4"
);
}

/// Asserts that a SQL query eventually produces the expected output.
///
/// Use this only for read-only queries after operations that can briefly
/// leave the database worker unavailable, such as publishing an update.
pub fn assert_sql_eventually(&self, query: &str, expected: &str) {
let expected_normalized = normalize_whitespace(expected);
let deadline = Instant::now() + Duration::from_secs(10);
let mut last_actual = None;
let mut last_error = None;

while Instant::now() < deadline {
match self.sql(query) {
Ok(actual) => {
let actual_normalized = normalize_whitespace(&actual);
if actual_normalized == expected_normalized {
return;
}
last_actual = Some(actual_normalized);
last_error = None;
}
Err(err) => {
last_error = Some(err.to_string());
}
}

std::thread::sleep(Duration::from_millis(200));
}

if let Some(actual_normalized) = last_actual {
assert_eq!(
actual_normalized, expected_normalized,
"SQL output mismatch for query after retry: {}\n\nExpected:\n{}\n\nActual:\n{}",
query, expected_normalized, actual_normalized
);
}

panic!(
"SQL query failed after retry: {}\n\nLast error:\n{}",
query,
last_error.unwrap_or_else(|| "no attempts completed".to_string())
);
}

/// Fetches the last N log entries from the database.
pub fn logs(&self, n: usize) -> Result<Vec<String>> {
let records = self.log_records(n)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/smoketests/tests/smoketests/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ fn test_views_auto_migration() {
test.use_precompiled_module("views-auto-migrate-updated");
test.publish_module_clear(false).unwrap();

test.assert_sql(
test.assert_sql_eventually(
"SELECT * FROM player",
r#" id | level
----+-------
Expand Down
Loading
Loading