Skip to content

Commit 544bc15

Browse files
authored
Merge branch 'master' into centril/reuse-ws-msg-allocations
2 parents 42dc169 + c522c0f commit 544bc15

24 files changed

Lines changed: 1307 additions & 359 deletions

File tree

crates/cli/src/subcommands/dns.rs

Lines changed: 24 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::common_args;
22
use crate::config::Config;
3-
use crate::util::{add_auth_header_opt, decode_identity, get_auth_header, get_login_token_or_log_in, ResponseExt};
3+
use crate::util::{add_auth_header_opt, get_auth_header, ResponseExt};
44
use clap::ArgMatches;
55
use clap::{Arg, Command};
66

7-
use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult};
7+
use spacetimedb_client_api_messages::name::{DomainName, SetDomainsResult};
88

99
pub fn cli() -> Command {
1010
Command::new("rename")
@@ -30,70 +30,39 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E
3030
let database_identity = args.get_one::<String>("database-identity").unwrap();
3131
let server = args.get_one::<String>("server").map(|s| s.as_ref());
3232
let force = args.get_flag("force");
33-
let token = get_login_token_or_log_in(&mut config, server, !force).await?;
34-
let identity = decode_identity(&token)?;
3533
let auth_header = get_auth_header(&mut config, false, server, !force).await?;
3634

3735
let domain: DomainName = domain.parse()?;
3836

3937
let builder = reqwest::Client::new()
40-
.post(format!(
38+
.put(format!(
4139
"{}/v1/database/{database_identity}/names",
4240
config.get_host_url(server)?
4341
))
44-
.body(String::from(domain));
42+
.header(reqwest::header::CONTENT_TYPE, "application/json")
43+
.body(serde_json::to_string(&[&domain])?);
4544
let builder = add_auth_header_opt(builder, &auth_header);
4645

47-
let result = builder.send().await?.json_or_error().await?;
48-
match result {
49-
InsertDomainResult::Success {
50-
domain,
51-
database_identity,
52-
} => {
53-
println!("Domain set to {} for identity {}.", domain, database_identity);
54-
}
55-
InsertDomainResult::TldNotRegistered { domain } => {
56-
return Err(anyhow::anyhow!(
57-
"The top level domain that you provided is not registered.\n\
58-
This tld is not yet registered to any identity. You can register this domain with the following command:\n\
59-
\n\
60-
\tspacetime dns register-tld {}\n",
61-
domain.tld()
62-
));
63-
}
64-
InsertDomainResult::PermissionDenied { domain } => {
65-
//TODO(jdetter): Have a nice name generator here, instead of using some abstract characters
66-
// we should perhaps generate fun names like 'green-fire-dragon' instead
67-
let suggested_tld: String = identity.chars().take(12).collect();
68-
if let Some(sub_domain) = domain.sub_domain() {
69-
return Err(anyhow::anyhow!(
70-
"The top level domain {} is not registered to the identity you provided.\n\
71-
We suggest you register a new tld:\n\
72-
\tspacetime dns register-tld {}\n\
73-
\n\
74-
And then push to the domain that uses that tld:\n\
75-
\tspacetime publish {}/{}\n",
76-
domain.tld(),
77-
suggested_tld,
78-
suggested_tld,
79-
sub_domain
80-
));
81-
} else {
82-
return Err(anyhow::anyhow!(
83-
"The top level domain {} is not registered to the identity you provided.\n\
84-
We suggest you register a new tld:\n\
85-
\tspacetime dns register-tld {}\n\
86-
\n\
87-
And then push to the domain that uses that tld:\n\
88-
\tspacetime publish {}\n",
89-
domain.tld(),
90-
suggested_tld,
91-
suggested_tld
92-
));
93-
}
94-
}
95-
InsertDomainResult::OtherError(e) => return Err(anyhow::anyhow!(e)),
46+
let response = builder.send().await?;
47+
let status = &response.status();
48+
let result: SetDomainsResult = response.json_or_error().await?;
49+
50+
if !status.is_success() {
51+
anyhow::bail!(match result {
52+
SetDomainsResult::Success => "".to_string(),
53+
SetDomainsResult::PermissionDenied { domain } => format!("Permission denied for domain: {}", domain),
54+
SetDomainsResult::PermissionDeniedOnAny { domains } =>
55+
format!("Permission denied for domains: {:?}", domains),
56+
SetDomainsResult::DatabaseNotFound => format!("Database {} not found", database_identity),
57+
SetDomainsResult::NotYourDatabase { .. } => format!(
58+
"You cannot rename {} because it is owned by another identity.",
59+
database_identity
60+
),
61+
SetDomainsResult::OtherError(err) => err,
62+
});
9663
}
9764

65+
println!("Name set to {} for identity {}.", domain, database_identity);
66+
9867
Ok(())
9968
}

crates/client-api/src/routes/database.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use spacetimedb::messages::control_db::{Database, HostType};
2929
use spacetimedb_client_api_messages::name::{self, DatabaseName, DomainName, PublishOp, PublishResult};
3030
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9;
3131
use spacetimedb_lib::identity::AuthCtx;
32-
use spacetimedb_lib::sats;
32+
use spacetimedb_lib::{sats, Timestamp};
3333

3434
use super::subscribe::handle_websocket;
3535

@@ -371,7 +371,7 @@ fn mime_ndjson() -> mime::Mime {
371371
"application/x-ndjson".parse().unwrap()
372372
}
373373

374-
async fn worker_ctx_find_database(
374+
pub(crate) async fn worker_ctx_find_database(
375375
worker_ctx: &(impl ControlStateDelegate + ?Sized),
376376
database_identity: &Identity,
377377
) -> axum::response::Result<Option<Database>> {
@@ -704,6 +704,18 @@ pub async fn set_names<S: ControlStateDelegate>(
704704
));
705705
}
706706

707+
for name in &validated_names {
708+
if ctx.lookup_identity(name.as_str()).unwrap().is_some() {
709+
return Ok((
710+
StatusCode::BAD_REQUEST,
711+
axum::Json(name::SetDomainsResult::OtherError(format!(
712+
"Cannot rename to {} because it already is in use.",
713+
name.as_str()
714+
))),
715+
));
716+
}
717+
}
718+
707719
let response = ctx
708720
.replace_dns_records(&database_identity, &database.owner_identity, &validated_names)
709721
.await
@@ -720,6 +732,33 @@ pub async fn set_names<S: ControlStateDelegate>(
720732
Ok((status, axum::Json(response)))
721733
}
722734

735+
#[derive(serde::Deserialize)]
736+
pub struct TimestampParams {
737+
name_or_identity: NameOrIdentity,
738+
}
739+
740+
/// Returns the database's view of the current time,
741+
/// as a SATS-JSON encoded [`Timestamp`].
742+
///
743+
/// Takes a particular database's [`NameOrIdentity`] as an argument
744+
/// because in a clusterized SpacetimeDB-cloud deployment,
745+
/// this request will be routed to the node running the requested database.
746+
async fn get_timestamp<S: ControlStateDelegate>(
747+
State(worker_ctx): State<S>,
748+
Path(TimestampParams { name_or_identity }): Path<TimestampParams>,
749+
) -> axum::response::Result<impl IntoResponse> {
750+
let db_identity = name_or_identity.resolve(&worker_ctx).await?;
751+
752+
let _database = worker_ctx_find_database(&worker_ctx, &db_identity)
753+
.await?
754+
.ok_or_else(|| {
755+
log::error!("Could not find database: {}", db_identity.to_hex());
756+
NO_SUCH_DATABASE
757+
})?;
758+
759+
Ok(axum::Json(sats::serde::SerdeWrapper(Timestamp::now())).into_response())
760+
}
761+
723762
/// This struct allows the edition to customize `/database` routes more meticulously.
724763
pub struct DatabaseRoutes<S> {
725764
/// POST /database
@@ -748,6 +787,9 @@ pub struct DatabaseRoutes<S> {
748787
pub logs_get: MethodRouter<S>,
749788
/// POST: /database/:name_or_identity/sql
750789
pub sql_post: MethodRouter<S>,
790+
791+
/// GET: /database/: name_or_identity/unstable/timestamp
792+
pub timestamp_get: MethodRouter<S>,
751793
}
752794

753795
impl<S> Default for DatabaseRoutes<S>
@@ -770,6 +812,7 @@ where
770812
schema_get: get(schema::<S>),
771813
logs_get: get(logs::<S>),
772814
sql_post: post(sql::<S>),
815+
timestamp_get: get(get_timestamp::<S>),
773816
}
774817
}
775818
}
@@ -791,7 +834,8 @@ where
791834
.route("/call/:reducer", self.call_reducer_post)
792835
.route("/schema", self.schema_get)
793836
.route("/logs", self.logs_get)
794-
.route("/sql", self.sql_post);
837+
.route("/sql", self.sql_post)
838+
.route("/unstable/timestamp", self.timestamp_get);
795839

796840
axum::Router::new()
797841
.route("/", self.root_post)

crates/client-api/src/routes/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ mod internal;
1212
pub mod metrics;
1313
pub mod prometheus;
1414
pub mod subscribe;
15-
pub mod unstable;
1615

1716
/// This API call is just designed to allow clients to determine whether or not they can
1817
/// establish a connection to SpacetimeDB. This API call doesn't actually do anything.
@@ -41,5 +40,4 @@ where
4140
axum::Router::new()
4241
.nest("/v1", router.layer(cors))
4342
.nest("/internal", internal::router())
44-
.nest("/unstable", unstable::router())
4543
}

crates/client-api/src/routes/unstable.rs

Lines changed: 0 additions & 21 deletions
This file was deleted.

crates/core/src/client/client_connection.rs

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use futures::prelude::*;
1919
use prometheus::{Histogram, IntCounter, IntGauge};
2020
use spacetimedb_client_api_messages::websocket::{
2121
BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe,
22-
UnsubscribeMulti, WebsocketFormat,
22+
UnsubscribeMulti,
2323
};
2424
use spacetimedb_lib::identity::RequestId;
2525
use spacetimedb_lib::metrics::ExecutionMetrics;
@@ -450,41 +450,40 @@ impl ClientConnection {
450450
.await
451451
}
452452

453-
pub fn one_off_query_json(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
454-
let response = self.one_off_query::<JsonFormat>(query, message_id, timer);
455-
self.send_message(response)?;
456-
Ok(())
457-
}
458-
459-
pub fn one_off_query_bsatn(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
460-
let response = self.one_off_query::<BsatnFormat>(query, message_id, timer);
461-
self.send_message(response)?;
462-
Ok(())
453+
pub async fn one_off_query_json(
454+
&self,
455+
query: &str,
456+
message_id: &[u8],
457+
timer: Instant,
458+
) -> Result<(), anyhow::Error> {
459+
self.module
460+
.one_off_query::<JsonFormat>(
461+
self.id.identity,
462+
query.to_owned(),
463+
self.sender.clone(),
464+
message_id.to_owned(),
465+
timer,
466+
|msg: OneOffQueryResponseMessage<JsonFormat>| msg.into(),
467+
)
468+
.await
463469
}
464470

465-
fn one_off_query<F: WebsocketFormat>(
471+
pub async fn one_off_query_bsatn(
466472
&self,
467473
query: &str,
468474
message_id: &[u8],
469475
timer: Instant,
470-
) -> OneOffQueryResponseMessage<F> {
471-
let result = self.module.one_off_query::<F>(self.id.identity, query.to_owned());
472-
let message_id = message_id.to_owned();
473-
let total_host_execution_duration = timer.elapsed().into();
474-
match result {
475-
Ok(results) => OneOffQueryResponseMessage {
476-
message_id,
477-
error: None,
478-
results: vec![results],
479-
total_host_execution_duration,
480-
},
481-
Err(err) => OneOffQueryResponseMessage {
482-
message_id,
483-
error: Some(format!("{}", err)),
484-
results: vec![],
485-
total_host_execution_duration,
486-
},
487-
}
476+
) -> Result<(), anyhow::Error> {
477+
self.module
478+
.one_off_query::<BsatnFormat>(
479+
self.id.identity,
480+
query.to_owned(),
481+
self.sender.clone(),
482+
message_id.to_owned(),
483+
timer,
484+
|msg: OneOffQueryResponseMessage<BsatnFormat>| msg.into(),
485+
)
486+
.await
488487
}
489488

490489
pub async fn disconnect(self) {

crates/core/src/client/message_handlers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
120120
message_id,
121121
}) => {
122122
let res = match client.config.protocol {
123-
Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer),
124-
Protocol::Text => client.one_off_query_json(&query, &message_id, timer),
123+
Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer).await,
124+
Protocol::Text => client.one_off_query_json(&query, &message_id, timer).await,
125125
};
126126
mod_metrics
127127
.request_round_trip_sql

crates/core/src/db/datastore/system_tables.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -813,10 +813,18 @@ impl From<StRowLevelSecurityRow> for RowLevelSecuritySchema {
813813
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
814814
pub struct ModuleKind(u8);
815815

816-
/// The [`ModuleKind`] of WASM-based modules.
817-
///
818-
/// This is currently the only known kind.
819-
pub const WASM_MODULE: ModuleKind = ModuleKind(0);
816+
impl ModuleKind {
817+
/// The [`ModuleKind`] of WASM-based modules.
818+
pub const WASM: ModuleKind = ModuleKind(0);
819+
}
820+
821+
impl From<crate::messages::control_db::HostType> for ModuleKind {
822+
fn from(host_type: crate::messages::control_db::HostType) -> Self {
823+
match host_type {
824+
crate::messages::control_db::HostType::Wasm => Self::WASM,
825+
}
826+
}
827+
}
820828

821829
impl_serialize!([] ModuleKind, (self, ser) => self.0.serialize(ser));
822830
impl_deserialize!([] ModuleKind, de => u8::deserialize(de).map(Self));
@@ -852,7 +860,7 @@ impl From<Identity> for IdentityViaU256 {
852860
///
853861
/// * `database_identity` is the [`Identity`] of the database.
854862
/// * `owner_identity` is the [`Identity`] of the owner of the database.
855-
/// * `program_kind` is the [`ModuleKind`] (currently always [`WASM_MODULE`]).
863+
/// * `program_kind` is the [`ModuleKind`] (currently always [`ModuleKind::WASM`]).
856864
/// * `program_hash` is the [`Hash`] of the raw bytes of the (compiled) module.
857865
/// * `program_bytes` are the raw bytes of the (compiled) module.
858866
/// * `module_version` is the version of the module.

crates/core/src/db/db_metrics/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,36 @@ metrics_group!(
155155
#[help = "Number of subscriptions via the legacy api"]
156156
#[labels(database_identity: Identity)]
157157
pub num_legacy_subscriptions: IntGaugeVec,
158+
159+
#[name = spacetime_subscription_compile_time_sec]
160+
#[help = "How much time (in seconds) do we spend compiling subscriptions"]
161+
#[labels(db: Identity, workload: WorkloadType)]
162+
pub subscription_compile_time: HistogramVec,
163+
164+
#[name = spacetime_subscription_lock_num_waiters]
165+
#[help = "The number of clients waiting to acquire the subscription lock"]
166+
#[labels(db: Identity, workload: WorkloadType)]
167+
pub subscription_lock_waiters: IntGaugeVec,
168+
169+
#[name = spacetime_subscription_lock_wait_time_sec]
170+
#[help = "How much time (in seconds) do we spend waiting to acquire the subscription lock"]
171+
#[labels(db: Identity, workload: WorkloadType)]
172+
pub subscription_lock_wait_time: HistogramVec,
173+
174+
#[name = spacetime_num_queries_subscribed]
175+
#[help = "How many total queries make up each subscribe call"]
176+
#[labels(db: Identity)]
177+
pub num_queries_subscribed: IntCounterVec,
178+
179+
#[name = spacetime_num_new_queries_subscribed]
180+
#[help = "How many new (uncached) queries are make up each subscribe call"]
181+
#[labels(db: Identity)]
182+
pub num_new_queries_subscribed: IntCounterVec,
183+
184+
#[name = spacetime_num_queries_evaluated]
185+
#[help = "How many queries are evaluated in each subscribe and unsubscribe"]
186+
#[labels(db: Identity, workload: WorkloadType)]
187+
pub num_queries_evaluated: IntCounterVec,
158188
}
159189
);
160190

0 commit comments

Comments
 (0)