Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
12 changes: 6 additions & 6 deletions crates/sdk/examples/quickstart-chat/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,22 @@ fn creds_store() -> credentials::File {
/// Our `on_connect` callback: save our credentials to a file.
fn on_connected(_ctx: &DbConnection, _identity: Identity, token: &str) {
if let Err(e) = creds_store().save(token) {
eprintln!("Failed to save credentials: {:?}", e);
eprintln!("Failed to save credentials: {e:?}");
}
}

// ### Handle errors and disconnections

/// Our `on_connect_error` callback: print the error, then exit the process.
fn on_connect_error(_ctx: &ErrorContext, err: Error) {
eprintln!("Connection error: {}", err);
eprintln!("Connection error: {err}");
std::process::exit(1);
}

/// Our `on_disconnect` callback: print a note, then exit the process.
fn on_disconnected(_ctx: &ErrorContext, err: Option<Error>) {
if let Some(err) = err {
eprintln!("Disconnected: {}", err);
eprintln!("Disconnected: {err}");
std::process::exit(1);
} else {
println!("Disconnected.");
Expand Down Expand Up @@ -166,14 +166,14 @@ fn print_message(ctx: &impl RemoteDbContext, message: &Message) {
/// Our `on_set_name` callback: print a warning if the reducer failed.
fn on_name_set(ctx: &ReducerEventContext, name: &String) {
if let Status::Failed(err) = &ctx.event.status {
eprintln!("Failed to change name to {:?}: {}", name, err);
eprintln!("Failed to change name to {name:?}: {err}");
}
}

/// Our `on_send_message` callback: print a warning if the reducer failed.
fn on_message_sent(ctx: &ReducerEventContext, text: &String) {
if let Status::Failed(err) = &ctx.event.status {
eprintln!("Failed to send message {:?}: {}", text, err);
eprintln!("Failed to send message {text:?}: {err}");
}
}

Expand Down Expand Up @@ -206,7 +206,7 @@ fn on_sub_applied(ctx: &SubscriptionEventContext) {
/// Or `on_error` callback:
/// print the error, then exit the process.
fn on_sub_error(_ctx: &ErrorContext, err: Error) {
eprintln!("Subscription failed: {}", err);
eprintln!("Subscription failed: {err}");
std::process::exit(1);
}

Expand Down
31 changes: 26 additions & 5 deletions crates/sdk/src/db_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ pub struct DbContextImpl<M: SpacetimeModule> {
/// May be `None` if we connected anonymously
/// and have not yet received the [`ws::IdentityToken`] message.
identity: SharedCell<Option<Identity>>,

/// This connection's `Identity`.
Comment thread
jsdt marked this conversation as resolved.
Outdated
///
/// This may be none if we have not yet received the [`ws::IdentityToken`] message.
connection_id: SharedCell<Option<ConnectionId>>,
}

impl<M: SpacetimeModule> Clone for DbContextImpl<M> {
Expand All @@ -99,6 +104,7 @@ impl<M: SpacetimeModule> Clone for DbContextImpl<M> {
pending_mutations_send: self.pending_mutations_send.clone(),
pending_mutations_recv: Arc::clone(&self.pending_mutations_recv),
identity: Arc::clone(&self.identity),
connection_id: Arc::clone(&self.connection_id),
}
}
}
Expand Down Expand Up @@ -130,7 +136,18 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
}
*ident_store = Some(identity);
}
assert_eq!(get_connection_id(), conn_id);
{
// Don't hold the `self.connection_id` lock while running callbacks.
// Callbacks can (will) call [`DbContext::connection_id`], which acquires that lock,
// so holding it while running a callback causes deadlocks.
let mut conn_id_store = self.connection_id.lock().unwrap();
// This would only happen if the client is using the unstable `set_connection_id` method.
if let Some(prev_conn_id) = *conn_id_store {
assert_eq!(prev_conn_id, conn_id);
}
*conn_id_store = Some(conn_id);
}
// assert_eq!(get_connection_id(), conn_id);
Comment thread
jsdt marked this conversation as resolved.
Outdated
let mut inner = self.inner.lock().unwrap();
if let Some(on_connect) = inner.on_connect.take() {
let ctx = <M::DbConnection as DbConnection>::new(self.clone());
Expand Down Expand Up @@ -674,8 +691,9 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
}

/// Called by the autogenerated `DbConnection` method of the same name.
/// TODO: Deprecate and add a `try_identity`.
pub fn connection_id(&self) -> ConnectionId {
get_connection_id()
self.connection_id.lock().unwrap().unwrap()
}
}

Expand Down Expand Up @@ -754,8 +772,8 @@ pub struct DbConnectionBuilder<M: SpacetimeModule> {
// TODO: rip this out. Make the connection id a property of the `DbConnection`. Cloud can supply it to the builder.
static CONNECTION_ID: OnceLock<ConnectionId> = OnceLock::new();

fn get_connection_id() -> ConnectionId {
*CONNECTION_ID.get_or_init(|| ConnectionId::from_le_byte_array(rand::random()))
fn get_connection_id_override() -> Option<ConnectionId> {
CONNECTION_ID.get().copied()
}

#[doc(hidden)]
Expand All @@ -772,6 +790,7 @@ fn get_connection_id() -> ConnectionId {
/// Returns `Err` if this process's connection ID has already been initialized to a random value.
pub fn set_connection_id(id: ConnectionId) -> crate::Result<()> {
let stored = *CONNECTION_ID.get_or_init(|| id);

if stored != id {
return Err(InternalError::new(
"Call to set_connection_id after CONNECTION_ID was initialized to a different value ",
Expand Down Expand Up @@ -834,12 +853,13 @@ but you must call one of them, or else the connection will never progress.
let db_callbacks = DbCallbacks::default();
let reducer_callbacks = ReducerCallbacks::default();

let connection_id_override = get_connection_id_override();
let ws_connection = tokio::task::block_in_place(|| {
handle.block_on(WsConnection::connect(
self.uri.unwrap(),
self.module_name.as_ref().unwrap(),
self.token.as_deref(),
get_connection_id(),
connection_id_override,
self.params,
))
})
Expand Down Expand Up @@ -878,6 +898,7 @@ but you must call one of them, or else the connection will never progress.
pending_mutations_send,
pending_mutations_recv: Arc::new(TokioMutex::new(pending_mutations_recv)),
identity: Arc::new(StdMutex::new(None)),
connection_id: Arc::new(StdMutex::new(connection_id_override)),
};

Ok(ctx_imp)
Expand Down
5 changes: 2 additions & 3 deletions crates/sdk/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use once_cell::sync::Lazy;
use prometheus::{HistogramVec, IntCounterVec};
use spacetimedb_lib::ConnectionId;
use spacetimedb_metrics::metrics_group;

metrics_group!(
pub struct ClientMetrics {
#[name = spacetime_client_received_total]
#[help = "The cumulative number of received websocket messages"]
#[labels(db: Box<str>, connection_id: ConnectionId)]
#[labels(db: Box<str>)]
pub websocket_received: IntCounterVec,

#[name = spacetime_client_received_msg_size]
#[help = "The size of received websocket messages"]
#[labels(db: Box<str>, connection_id: ConnectionId)]
#[labels(db: Box<str>)]
pub websocket_received_msg_size: HistogramVec,
}
);
Expand Down
33 changes: 17 additions & 16 deletions crates/sdk/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub enum WsError {

pub(crate) struct WsConnection {
db_name: Box<str>,
connection_id: ConnectionId,
// connection_id: ConnectionId,
Comment thread
jsdt marked this conversation as resolved.
Outdated
sock: WebSocketStream<MaybeTlsStream<TcpStream>>,
}

Expand All @@ -112,7 +112,7 @@ pub(crate) struct WsParams {
pub light: bool,
}

fn make_uri(host: Uri, db_name: &str, connection_id: ConnectionId, params: WsParams) -> Result<Uri, UriError> {
fn make_uri(host: Uri, db_name: &str, connection_id: Option<ConnectionId>, params: WsParams) -> Result<Uri, UriError> {
let mut parts = host.into_parts();
let scheme = parse_scheme(parts.scheme.take())?;
parts.scheme = Some(scheme);
Expand All @@ -134,19 +134,22 @@ fn make_uri(host: Uri, db_name: &str, connection_id: ConnectionId, params: WsPar
path.push_str(db_name);
path.push_str("/subscribe");

// Provide the connection ID.
path.push_str("?connection_id=");
path.push_str(&connection_id.to_hex());

// Specify the desired compression for host->client replies.
match params.compression {
Compression::None => path.push_str("&compression=None"),
Compression::Gzip => path.push_str("&compression=Gzip"),
Compression::None => path.push_str("?compression=None"),
Compression::Gzip => path.push_str("?compression=Gzip"),
// The host uses the same default as the sdk,
// but in case this changes, we prefer to be explicit now.
Compression::Brotli => path.push_str("&compression=Brotli"),
Compression::Brotli => path.push_str("?compression=Brotli"),
};

// Provide the connection ID if the client provided one.
if let Some(cid) = connection_id {
// If a connection ID is provided, append it to the path.
path.push_str("&connection_id=");
path.push_str(&cid.to_hex());
}

// Specify the `light` mode if requested.
if params.light {
path.push_str("&light=true");
Expand All @@ -173,7 +176,7 @@ fn make_request(
host: Uri,
db_name: &str,
token: Option<&str>,
connection_id: ConnectionId,
connection_id: Option<ConnectionId>,
params: WsParams,
) -> Result<http::Request<()>, WsError> {
let uri = make_uri(host, db_name, connection_id, params)?;
Expand Down Expand Up @@ -205,7 +208,7 @@ impl WsConnection {
host: Uri,
db_name: &str,
token: Option<&str>,
connection_id: ConnectionId,
connection_id: Option<ConnectionId>,
params: WsParams,
) -> Result<Self, WsError> {
let req = make_request(host, db_name, token, connection_id, params)?;
Expand All @@ -228,7 +231,7 @@ impl WsConnection {
})?;
Ok(WsConnection {
db_name: db_name.into(),
connection_id,
// connection_id,
Comment thread
jsdt marked this conversation as resolved.
Outdated
sock,
})
}
Expand Down Expand Up @@ -275,12 +278,10 @@ impl WsConnection {
incoming_messages: mpsc::UnboundedSender<ServerMessage<BsatnFormat>>,
outgoing_messages: mpsc::UnboundedReceiver<ClientMessage<Bytes>>,
) {
let websocket_received = CLIENT_METRICS
.websocket_received
.with_label_values(&self.db_name, &self.connection_id);
let websocket_received = CLIENT_METRICS.websocket_received.with_label_values(&self.db_name);
let websocket_received_msg_size = CLIENT_METRICS
.websocket_received_msg_size
.with_label_values(&self.db_name, &self.connection_id);
.with_label_values(&self.db_name);
let record_metrics = |msg_size: usize| {
websocket_received.inc();
websocket_received_msg_size.observe(msg_size as f64);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading