Skip to content

Commit 24bf01a

Browse files
committed
Merge branch 'master' into jsdt/broadcast-one-offs
2 parents 0c0cd08 + 053fc6d commit 24bf01a

8 files changed

Lines changed: 72 additions & 17 deletions

File tree

crates/client-api/src/routes/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ mod internal;
1212
pub mod metrics;
1313
pub mod prometheus;
1414
pub mod subscribe;
15+
pub mod unstable;
1516

1617
/// This API call is just designed to allow clients to determine whether or not they can
1718
/// establish a connection to SpacetimeDB. This API call doesn't actually do anything.
@@ -40,4 +41,5 @@ where
4041
axum::Router::new()
4142
.nest("/v1", router.layer(cors))
4243
.nest("/internal", internal::router())
44+
.nest("/unstable", unstable::router())
4345
}

crates/client-api/src/routes/subscribe.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,8 @@ async fn ws_client_actor_inner(
296296
// TODO: this isn't great. when we receive a close request from the peer,
297297
// tungstenite doesn't let us send any new messages on the socket,
298298
// even though the websocket RFC allows it. should we fork tungstenite?
299-
log::info!("dropping messages due to ws already being closed: {:?}", &rx_buf[..n]);
299+
log::info!("dropping {n} messages due to ws already being closed");
300+
log::debug!("dropped messages: {:?}", &rx_buf[..n]);
300301
} else {
301302
let send_all = async {
302303
for msg in rx_buf.drain(..n) {
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use axum::response::IntoResponse;
2+
use spacetimedb_lib::{sats, Timestamp};
3+
4+
use crate::NodeDelegate;
5+
6+
/// Returns the database's view of the current time,
7+
/// as a SATS-JSON encoded [`Timestamp`].
8+
async fn get_timestamp() -> impl IntoResponse {
9+
axum::Json(sats::serde::SerdeWrapper(Timestamp::now())).into_response()
10+
}
11+
12+
/// The internal router is for routes which are early in design,
13+
/// and may incompatibly change or be removed without a major version bump.
14+
pub fn router<S>() -> axum::Router<S>
15+
where
16+
S: NodeDelegate + Clone + 'static,
17+
{
18+
use axum::routing::get;
19+
20+
axum::Router::new().route("/timestamp", get(get_timestamp))
21+
}

crates/core/src/db/datastore/system_tables.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -813,10 +813,18 @@ impl From<StRowLevelSecurityRow> for RowLevelSecuritySchema {
813813
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
814814
pub struct ModuleKind(u8);
815815

816-
/// The [`ModuleKind`] of WASM-based modules.
817-
///
818-
/// This is currently the only known kind.
819-
pub const WASM_MODULE: ModuleKind = ModuleKind(0);
816+
impl ModuleKind {
817+
/// The [`ModuleKind`] of WASM-based modules.
818+
pub const WASM: ModuleKind = ModuleKind(0);
819+
}
820+
821+
impl From<crate::messages::control_db::HostType> for ModuleKind {
822+
fn from(host_type: crate::messages::control_db::HostType) -> Self {
823+
match host_type {
824+
crate::messages::control_db::HostType::Wasm => Self::WASM,
825+
}
826+
}
827+
}
820828

821829
impl_serialize!([] ModuleKind, (self, ser) => self.0.serialize(ser));
822830
impl_deserialize!([] ModuleKind, de => u8::deserialize(de).map(Self));
@@ -852,7 +860,7 @@ impl From<Identity> for IdentityViaU256 {
852860
///
853861
/// * `database_identity` is the [`Identity`] of the database.
854862
/// * `owner_identity` is the [`Identity`] of the owner of the database.
855-
/// * `program_kind` is the [`ModuleKind`] (currently always [`WASM_MODULE`]).
863+
/// * `program_kind` is the [`ModuleKind`] (currently always [`ModuleKind::WASM`]).
856864
/// * `program_hash` is the [`Hash`] of the raw bytes of the (compiled) module.
857865
/// * `program_bytes` are the raw bytes of the (compiled) module.
858866
/// * `module_version` is the version of the module.

crates/core/src/db/relational_db.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use super::datastore::{
1717
traits::TxData,
1818
};
1919
use super::db_metrics::DB_METRICS;
20-
use crate::db::datastore::system_tables::{StModuleRow, WASM_MODULE};
20+
use crate::db::datastore::system_tables::StModuleRow;
2121
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
2222
use crate::execution_context::{ReducerContext, Workload, WorkloadType};
2323
use crate::messages::control_db::HostType;
@@ -427,9 +427,7 @@ impl RelationalDB {
427427
database_identity: self.database_identity.into(),
428428
owner_identity: self.owner_identity.into(),
429429

430-
program_kind: match host_type {
431-
HostType::Wasm => WASM_MODULE,
432-
},
430+
program_kind: host_type.into(),
433431
program_hash: program.hash,
434432
program_bytes: program.bytes,
435433
module_version: ONLY_MODULE_VERSION.into(),
@@ -473,10 +471,7 @@ impl RelationalDB {
473471
/// - the `__init__` reducer contained in the module has been executed
474472
/// within the transactional context `tx`.
475473
pub fn update_program(&self, tx: &mut MutTx, host_type: HostType, program: Program) -> Result<(), DBError> {
476-
let program_kind = match host_type {
477-
HostType::Wasm => WASM_MODULE,
478-
};
479-
Ok(self.inner.update_program(tx, program_kind, program)?)
474+
Ok(self.inner.update_program(tx, host_type.into(), program)?)
480475
}
481476

482477
fn restore_from_snapshot_or_bootstrap(

crates/core/src/startup.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use core_affinity::CoreId;
22
use crossbeam_queue::ArrayQueue;
3+
use itertools::Itertools;
34
use spacetimedb_paths::server::{ConfigToml, LogsDir};
45
use std::path::PathBuf;
56
use std::time::Duration;
@@ -194,7 +195,14 @@ pub struct Cores {
194195
impl Cores {
195196
fn get() -> Option<Self> {
196197
let cores = &mut core_affinity::get_core_ids()
197-
.filter(|cores| cores.len() >= 8)?
198+
.filter(|cores| cores.len() >= 10)?
199+
.into_iter()
200+
// We reserve the first two cores for the OS.
201+
// This allows us to pin interrupt handlers (IRQs) to these cores,
202+
// particularly those for incoming network traffic,
203+
// preventing them from preempting the main reducer threads.
204+
.filter(|core_id| core_id.id > 1)
205+
.collect_vec()
198206
.into_iter();
199207

200208
let total = cores.len() as f64;

smoketests/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,11 @@ def api_call(self, method, path, body = None, headers = {}):
264264
log_cmd([method, path])
265265
conn.request(method, path, body, headers)
266266
resp = conn.getresponse()
267-
logging.debug(f"{resp.status} {resp.read()}")
267+
body = resp.read()
268+
logging.debug(f"{resp.status} {body}")
268269
if resp.status != 200:
269270
raise resp
270-
resp
271+
return body
271272

272273

273274
@classmethod
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from .. import Smoketest, random_string
2+
import unittest
3+
import json
4+
import io
5+
6+
TIMESTAMP_TAG = "__timestamp_micros_since_unix_epoch__"
7+
8+
class TimestampRoute(Smoketest):
9+
AUTOPUBLISH = False
10+
11+
def test_timestamp_route(self):
12+
resp = self.api_call(
13+
"GET",
14+
"/unstable/timestamp",
15+
)
16+
timestamp = json.load(io.BytesIO(resp))
17+
self.assertIsInstance(timestamp, dict)
18+
self.assertIn(TIMESTAMP_TAG, timestamp)
19+
self.assertIsInstance(timestamp[TIMESTAMP_TAG], int)

0 commit comments

Comments
 (0)