Skip to content

Commit 54a9e7e

Browse files
committed
Merge branch 'master' into jsdt/store-client-creds
2 parents fc7cd3d + 321e430 commit 54a9e7e

8 files changed

Lines changed: 162 additions & 160 deletions

File tree

crates/core/src/host/host_controller.rs

Lines changed: 1 addition & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::subscription::module_subscription_manager::{spawn_send_worker, Subscr
1616
use crate::util::asyncify;
1717
use crate::util::jobs::{JobCore, JobCores};
1818
use crate::worker_metrics::WORKER_METRICS;
19-
use anyhow::{anyhow, ensure, Context};
19+
use anyhow::{anyhow, Context};
2020
use async_trait::async_trait;
2121
use durability::{Durability, EmptyHistory};
2222
use log::{info, trace, warn};
@@ -448,117 +448,6 @@ impl HostController {
448448
host.migrate_plan(host_type, program, style).await
449449
}
450450

451-
/// Start the host `replica_id` and conditionally update it.
452-
///
453-
/// If the host was not initialized before, it is initialized with the
454-
/// program [`Database::initial_program`], which is loaded from the
455-
/// controller's [`ProgramStorage`].
456-
///
457-
/// If it was already initialized and its stored program hash matches
458-
/// [`Database::initial_program`], no further action is taken.
459-
///
460-
/// Otherwise, if `expected_hash` is `Some` and does **not** match the
461-
/// stored hash, an error is returned.
462-
///
463-
/// Otherwise, the host is updated to [`Database::initial_program`], loading
464-
/// the program data from the controller's [`ProgramStorage`].
465-
///
466-
/// > Note that this ascribes different semantics to [`Database::initial_program`]
467-
/// > than elsewhere, where the [`Database`] value is provided by the control
468-
/// > database. The method is mainly useful for bootstrapping the control
469-
/// > database itself.
470-
pub async fn init_maybe_update_module_host(
471-
&self,
472-
database: Database,
473-
replica_id: u64,
474-
expected_hash: Option<Hash>,
475-
) -> anyhow::Result<watch::Receiver<ModuleHost>> {
476-
trace!("custom bootstrap {}/{}", database.database_identity, replica_id);
477-
478-
let db_addr = database.database_identity;
479-
let host_type = database.host_type;
480-
let program_hash = database.initial_program;
481-
482-
let mut guard = self.acquire_write_lock(replica_id).await;
483-
484-
// `HostController::clone` is fast,
485-
// as all of its fields are either `Copy` or wrapped in `Arc`.
486-
let this = self.clone();
487-
488-
// `try_init_host` is not cancel safe, as it will spawn other async tasks
489-
// which hold a filesystem lock past when `try_init_host` returns or is cancelled.
490-
// This means that, if `try_init_host` is cancelled, subsequent calls will fail.
491-
//
492-
// The rest of this future is also not cancel safe, as it will `Option::take` out of the guard
493-
// at the start of the block and then store back into it at the end.
494-
//
495-
// This is problematic because Axum will cancel its handler tasks if the client disconnects,
496-
// and this method is called from Axum handlers, e.g. for the publish route.
497-
// `tokio::spawn` a task to update the contents of `guard`,
498-
// so that it will run to completion even if the caller goes away.
499-
//
500-
// Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
501-
// at which point we won't be calling `try_init_host` again anyways.
502-
let module = tokio::spawn(async move {
503-
let mut host = match guard.take() {
504-
Some(host) => host,
505-
None => this.try_init_host(database, replica_id).await?,
506-
};
507-
let module = host.module.subscribe();
508-
509-
// The program is now either:
510-
//
511-
// - the desired one from [Database], in which case we do nothing
512-
// - `Some` expected hash, in which case we update to the desired one
513-
// - `None` expected hash, in which case we also update
514-
let stored_hash = stored_program_hash(host.db())?
515-
.with_context(|| format!("[{db_addr}] database improperly initialized"))?;
516-
if stored_hash == program_hash {
517-
info!("[{db_addr}] database up-to-date with {program_hash}");
518-
*guard = Some(host);
519-
} else {
520-
if let Some(expected_hash) = expected_hash {
521-
ensure!(
522-
expected_hash == stored_hash,
523-
"[{}] expected program {} found {}",
524-
db_addr,
525-
expected_hash,
526-
stored_hash
527-
);
528-
}
529-
info!("[{db_addr}] updating database from `{stored_hash}` to `{program_hash}`");
530-
let program = load_program(&this.program_storage, program_hash).await?;
531-
let update_result = host
532-
.update_module(
533-
this.runtimes.clone(),
534-
host_type,
535-
program,
536-
MigrationPolicy::Compatible,
537-
this.energy_monitor.clone(),
538-
this.unregister_fn(replica_id),
539-
this.db_cores.take(),
540-
)
541-
.await?;
542-
match update_result {
543-
UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
544-
*guard = Some(host);
545-
}
546-
UpdateDatabaseResult::AutoMigrateError(e) => {
547-
return Err(anyhow::anyhow!(e));
548-
}
549-
UpdateDatabaseResult::ErrorExecutingMigration(e) => {
550-
return Err(e);
551-
}
552-
}
553-
}
554-
555-
Ok::<_, anyhow::Error>(module)
556-
})
557-
.await??;
558-
559-
Ok(module)
560-
}
561-
562451
/// Release all resources of the [`ModuleHost`] identified by `replica_id`,
563452
/// and deregister it from the controller.
564453
#[tracing::instrument(level = "trace", skip_all)]
@@ -1121,10 +1010,6 @@ impl Host {
11211010

11221011
Ok(res)
11231012
}
1124-
1125-
fn db(&self) -> &RelationalDB {
1126-
&self.replica_ctx.relational_db
1127-
}
11281013
}
11291014

11301015
impl Drop for Host {

crates/core/src/host/module_host.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext,
3535
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
3636
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID};
3737
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
38+
use spacetimedb_durability::DurableOffset;
3839
use spacetimedb_execution::pipelined::PipelinedProject;
3940
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
4041
use spacetimedb_lib::identity::{AuthCtx, RequestId};
@@ -1263,6 +1264,10 @@ impl ModuleHost {
12631264
&self.replica_ctx().database
12641265
}
12651266

1267+
pub fn durable_tx_offset(&self) -> Option<DurableOffset> {
1268+
self.replica_ctx().relational_db.durable_tx_offset()
1269+
}
1270+
12661271
pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
12671272
self.module.replica_ctx()
12681273
}

sdks/typescript/packages/sdk/src/client_cache.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ export class ClientCache {
55
/**
66
* The tables in the database.
77
*/
8-
tables: Map<string, TableCache>;
8+
tables: Map<string, TableCache<any>>;
99

1010
constructor() {
1111
this.tables = new Map();
@@ -16,7 +16,9 @@ export class ClientCache {
1616
* @param name The name of the table.
1717
* @returns The table
1818
*/
19-
getTable(name: string): TableCache {
19+
getTable<RowType extends Record<string, any>>(
20+
name: string
21+
): TableCache<RowType> {
2022
const table = this.tables.get(name);
2123

2224
// ! This should not happen as the table should be available but an exception is thrown just in case.
@@ -30,10 +32,10 @@ export class ClientCache {
3032
return table;
3133
}
3234

33-
getOrCreateTable<RowType>(
35+
getOrCreateTable<RowType extends Record<string, any>>(
3436
tableTypeInfo: TableRuntimeTypeInfo
3537
): TableCache<RowType> {
36-
let table: TableCache;
38+
let table: TableCache<RowType>;
3739
if (!this.tables.has(tableTypeInfo.tableName)) {
3840
table = new TableCache<RowType>(tableTypeInfo);
3941
this.tables.set(tableTypeInfo.tableName, table);

sdks/typescript/packages/sdk/src/db_connection_impl.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,9 @@ export class DbConnectionImpl<
483483
const parsedTableUpdates = await parseDatabaseUpdate(
484484
message.value.update
485485
);
486-
const subscribeAppliedMessage: SubscribeAppliedMessage = {
486+
const subscribeAppliedMessage: SubscribeAppliedMessage<
487+
Record<string, any>
488+
> = {
487489
tag: 'SubscribeApplied',
488490
queryId: message.value.queryId.id,
489491
tableUpdates: parsedTableUpdates,
@@ -495,7 +497,9 @@ export class DbConnectionImpl<
495497
const parsedTableUpdates = await parseDatabaseUpdate(
496498
message.value.update
497499
);
498-
const unsubscribeAppliedMessage: UnsubscribeAppliedMessage = {
500+
const unsubscribeAppliedMessage: UnsubscribeAppliedMessage<
501+
Record<string, any>
502+
> = {
499503
tag: 'UnsubscribeApplied',
500504
queryId: message.value.queryId.id,
501505
tableUpdates: parsedTableUpdates,

sdks/typescript/packages/sdk/src/message_types.ts

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import { Identity } from 'spacetimedb';
44
import type { TableUpdate } from './table_cache.ts';
55
import { Timestamp } from 'spacetimedb';
66

7-
export type InitialSubscriptionMessage = {
7+
export type InitialSubscriptionMessage<RowType extends Record<string, any>> = {
88
tag: 'InitialSubscription';
9-
tableUpdates: TableUpdate[];
9+
tableUpdates: TableUpdate<RowType>[];
1010
};
1111

12-
export type TransactionUpdateMessage = {
12+
export type TransactionUpdateMessage<RowType extends Record<string, any>> = {
1313
tag: 'TransactionUpdate';
14-
tableUpdates: TableUpdate[];
14+
tableUpdates: TableUpdate<RowType>[];
1515
identity: Identity;
1616
connectionId: ConnectionId | null;
1717
reducerInfo?: {
@@ -24,10 +24,11 @@ export type TransactionUpdateMessage = {
2424
energyConsumed: bigint;
2525
};
2626

27-
export type TransactionUpdateLightMessage = {
28-
tag: 'TransactionUpdateLight';
29-
tableUpdates: TableUpdate[];
30-
};
27+
export type TransactionUpdateLightMessage<RowType extends Record<string, any>> =
28+
{
29+
tag: 'TransactionUpdateLight';
30+
tableUpdates: TableUpdate<RowType>[];
31+
};
3132

3233
export type IdentityTokenMessage = {
3334
tag: 'IdentityToken';
@@ -36,16 +37,16 @@ export type IdentityTokenMessage = {
3637
connectionId: ConnectionId;
3738
};
3839

39-
export type SubscribeAppliedMessage = {
40+
export type SubscribeAppliedMessage<RowType extends Record<string, any>> = {
4041
tag: 'SubscribeApplied';
4142
queryId: number;
42-
tableUpdates: TableUpdate[];
43+
tableUpdates: TableUpdate<RowType>[];
4344
};
4445

45-
export type UnsubscribeAppliedMessage = {
46+
export type UnsubscribeAppliedMessage<RowType extends Record<string, any>> = {
4647
tag: 'UnsubscribeApplied';
4748
queryId: number;
48-
tableUpdates: TableUpdate[];
49+
tableUpdates: TableUpdate<RowType>[];
4950
};
5051

5152
export type SubscriptionError = {
@@ -54,11 +55,12 @@ export type SubscriptionError = {
5455
error: string;
5556
};
5657

57-
export type Message =
58-
| InitialSubscriptionMessage
59-
| TransactionUpdateMessage
60-
| TransactionUpdateLightMessage
61-
| IdentityTokenMessage
62-
| SubscribeAppliedMessage
63-
| UnsubscribeAppliedMessage
64-
| SubscriptionError;
58+
export type Message<RowType extends Record<string, any> = Record<string, any>> =
59+
60+
| InitialSubscriptionMessage<RowType>
61+
| TransactionUpdateMessage<RowType>
62+
| TransactionUpdateLightMessage<RowType>
63+
| IdentityTokenMessage
64+
| SubscribeAppliedMessage<RowType>
65+
| UnsubscribeAppliedMessage<RowType>
66+
| SubscriptionError;

sdks/typescript/packages/sdk/src/table_cache.ts

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,21 @@ import {
88
import { stdbLogger } from './logger.ts';
99
import type { ComparablePrimitive } from 'spacetimedb';
1010

11-
export type Operation = {
11+
export type Operation<
12+
RowType extends Record<string, any> = Record<string, any>,
13+
> = {
1214
type: 'insert' | 'delete';
1315
// For tables with a primary key, this is the primary key value, as a primitive or string.
1416
// Otherwise, it is an encoding of the full row.
1517
rowId: ComparablePrimitive;
16-
// TODO: Refine this type to at least reflect that it is a product.
17-
row: any;
18+
row: RowType;
1819
};
1920

20-
export type TableUpdate = {
21+
export type TableUpdate<
22+
RowType extends Record<string, any> = Record<string, any>,
23+
> = {
2124
tableName: string;
22-
operations: Operation[];
25+
operations: Operation<RowType>[];
2326
};
2427

2528
export type PendingCallback = {
@@ -30,7 +33,9 @@ export type PendingCallback = {
3033
/**
3134
* Builder to generate calls to query a `table` in the database
3235
*/
33-
export class TableCache<RowType = any> {
36+
export class TableCache<
37+
RowType extends Record<string, any> = Record<string, any>,
38+
> {
3439
private rows: Map<ComparablePrimitive, [RowType, number]>;
3540
private tableTypeInfo: TableRuntimeTypeInfo;
3641
private emitter: EventEmitter<'insert' | 'delete' | 'update'>;
@@ -57,18 +62,24 @@ export class TableCache<RowType = any> {
5762
/**
5863
* @returns The values of the rows in the table
5964
*/
60-
iter(): any[] {
65+
iter(): RowType[] {
6166
return Array.from(this.rows.values()).map(([row]) => row);
6267
}
6368

6469
applyOperations = (
65-
operations: Operation[],
70+
operations: Operation<RowType>[],
6671
ctx: EventContextInterface
6772
): PendingCallback[] => {
6873
const pendingCallbacks: PendingCallback[] = [];
6974
if (this.tableTypeInfo.primaryKeyInfo !== undefined) {
70-
const insertMap = new Map<ComparablePrimitive, [Operation, number]>();
71-
const deleteMap = new Map<ComparablePrimitive, [Operation, number]>();
75+
const insertMap = new Map<
76+
ComparablePrimitive,
77+
[Operation<RowType>, number]
78+
>();
79+
const deleteMap = new Map<
80+
ComparablePrimitive,
81+
[Operation<RowType>, number]
82+
>();
7283
for (const op of operations) {
7384
if (op.type === 'insert') {
7485
const [_, prevCount] = insertMap.get(op.rowId) || [op, 0];
@@ -177,7 +188,7 @@ export class TableCache<RowType = any> {
177188

178189
insert = (
179190
ctx: EventContextInterface,
180-
operation: Operation,
191+
operation: Operation<RowType>,
181192
count: number = 1
182193
): PendingCallback | undefined => {
183194
const [_, previousCount] = this.rows.get(operation.rowId) || [
@@ -200,7 +211,7 @@ export class TableCache<RowType = any> {
200211

201212
delete = (
202213
ctx: EventContextInterface,
203-
operation: Operation,
214+
operation: Operation<RowType>,
204215
count: number = 1
205216
): PendingCallback | undefined => {
206217
const [_, previousCount] = this.rows.get(operation.rowId) || [

smoketests/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ def log_records(self, n):
216216
logs = self.spacetime("logs", "--format=json", "-n", str(n), "--", self.database_identity)
217217
return list(map(json.loads, logs.splitlines()))
218218

219-
def publish_module(self, domain=None, *, clear=True, capture_stderr=True):
219+
def publish_module(self, domain=None, *, clear=True, capture_stderr=True, num_replicas=None):
220220
print("publishing module", self.publish_module)
221221
publish_output = self.spacetime(
222222
"publish",
@@ -227,10 +227,11 @@ def publish_module(self, domain=None, *, clear=True, capture_stderr=True):
227227
# because the server address is `node` which doesn't look like `localhost` or `127.0.0.1`
228228
# and so the publish step prompts for confirmation.
229229
"--yes",
230+
*["--num-replicas", f"{num_replicas}"] if num_replicas is not None else [],
230231
capture_stderr=capture_stderr,
231232
)
232233
self.resolved_identity = re.search(r"identity: ([0-9a-fA-F]+)", publish_output)[1]
233-
self.database_identity = domain if domain is not None else self.resolved_identity
234+
self.database_identity = self.resolved_identity
234235

235236
@classmethod
236237
def reset_config(cls):

0 commit comments

Comments
 (0)