Skip to content

Commit e6b147a

Browse files
committed
Merge branch 'master' into centril/clear-table-and-fix-drop-table
2 parents 8c48efb + 2f9554c commit e6b147a

10 files changed

Lines changed: 191 additions & 166 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1621,13 +1621,22 @@ pub async fn local_durability(commitlog_dir: CommitLogDir) -> io::Result<(LocalD
16211621
/// [DurabilityProvider]: crate::host::host_controller::DurabilityProvider
16221622
pub async fn snapshot_watching_commitlog_compressor(
16231623
mut snapshot_rx: watch::Receiver<u64>,
1624+
mut clog_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1625+
mut snap_tx: Option<tokio::sync::mpsc::Sender<u64>>,
16241626
durability: LocalDurability,
16251627
) {
16261628
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
16271629
while snapshot_rx.changed().await.is_ok() {
16281630
let snapshot_offset = *snapshot_rx.borrow_and_update();
16291631
let durability = durability.clone();
1630-
let res = asyncify(move || {
1632+
1633+
if let Some(snap_tx) = &mut snap_tx {
1634+
if let Err(err) = snap_tx.try_send(snapshot_offset) {
1635+
tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}");
1636+
}
1637+
}
1638+
1639+
let res: io::Result<_> = asyncify(move || {
16311640
let segment_offsets = durability.existing_segment_offsets()?;
16321641
let start_idx = segment_offsets
16331642
.binary_search(&prev_snapshot_offset)
@@ -1641,15 +1650,27 @@ pub async fn snapshot_watching_commitlog_compressor(
16411650
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
16421651
// which we don't want to compress, so an exclusive range is correct.
16431652
let segment_offsets = &segment_offsets[..end_idx];
1644-
durability.compress_segments(segment_offsets)
1653+
durability.compress_segments(segment_offsets)?;
1654+
let n = segment_offsets.len();
1655+
let last_compressed_segment = if n > 0 { Some(segment_offsets[n - 1]) } else { None };
1656+
Ok(last_compressed_segment)
16451657
})
16461658
.await;
16471659

1648-
if let Err(e) = res {
1649-
tracing::warn!("failed to compress segments: {e}");
1650-
continue;
1651-
}
1660+
let last_compressed_segment = match res {
1661+
Ok(opt_offset) => opt_offset,
1662+
Err(err) => {
1663+
tracing::warn!("failed to compress segments: {err}");
1664+
continue;
1665+
}
1666+
};
16521667
prev_snapshot_offset = snapshot_offset;
1668+
1669+
if let Some((clog_tx, last_compressed_segment)) = clog_tx.as_mut().zip(last_compressed_segment) {
1670+
if let Err(err) = clog_tx.try_send(last_compressed_segment) {
1671+
tracing::warn!("failed to send offset {last_compressed_segment} after compression: {err}");
1672+
}
1673+
}
16531674
}
16541675
}
16551676

crates/core/src/host/host_controller.rs

Lines changed: 1 addition & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::subscription::module_subscription_manager::{spawn_send_worker, Subscr
1616
use crate::util::asyncify;
1717
use crate::util::jobs::{JobCore, JobCores};
1818
use crate::worker_metrics::WORKER_METRICS;
19-
use anyhow::{anyhow, ensure, Context};
19+
use anyhow::{anyhow, Context};
2020
use async_trait::async_trait;
2121
use durability::{Durability, EmptyHistory};
2222
use log::{info, trace, warn};
@@ -448,117 +448,6 @@ impl HostController {
448448
host.migrate_plan(host_type, program, style).await
449449
}
450450

451-
/// Start the host `replica_id` and conditionally update it.
452-
///
453-
/// If the host was not initialized before, it is initialized with the
454-
/// program [`Database::initial_program`], which is loaded from the
455-
/// controller's [`ProgramStorage`].
456-
///
457-
/// If it was already initialized and its stored program hash matches
458-
/// [`Database::initial_program`], no further action is taken.
459-
///
460-
/// Otherwise, if `expected_hash` is `Some` and does **not** match the
461-
/// stored hash, an error is returned.
462-
///
463-
/// Otherwise, the host is updated to [`Database::initial_program`], loading
464-
/// the program data from the controller's [`ProgramStorage`].
465-
///
466-
/// > Note that this ascribes different semantics to [`Database::initial_program`]
467-
/// > than elsewhere, where the [`Database`] value is provided by the control
468-
/// > database. The method is mainly useful for bootstrapping the control
469-
/// > database itself.
470-
pub async fn init_maybe_update_module_host(
471-
&self,
472-
database: Database,
473-
replica_id: u64,
474-
expected_hash: Option<Hash>,
475-
) -> anyhow::Result<watch::Receiver<ModuleHost>> {
476-
trace!("custom bootstrap {}/{}", database.database_identity, replica_id);
477-
478-
let db_addr = database.database_identity;
479-
let host_type = database.host_type;
480-
let program_hash = database.initial_program;
481-
482-
let mut guard = self.acquire_write_lock(replica_id).await;
483-
484-
// `HostController::clone` is fast,
485-
// as all of its fields are either `Copy` or wrapped in `Arc`.
486-
let this = self.clone();
487-
488-
// `try_init_host` is not cancel safe, as it will spawn other async tasks
489-
// which hold a filesystem lock past when `try_init_host` returns or is cancelled.
490-
// This means that, if `try_init_host` is cancelled, subsequent calls will fail.
491-
//
492-
// The rest of this future is also not cancel safe, as it will `Option::take` out of the guard
493-
// at the start of the block and then store back into it at the end.
494-
//
495-
// This is problematic because Axum will cancel its handler tasks if the client disconnects,
496-
// and this method is called from Axum handlers, e.g. for the publish route.
497-
// `tokio::spawn` a task to update the contents of `guard`,
498-
// so that it will run to completion even if the caller goes away.
499-
//
500-
// Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
501-
// at which point we won't be calling `try_init_host` again anyways.
502-
let module = tokio::spawn(async move {
503-
let mut host = match guard.take() {
504-
Some(host) => host,
505-
None => this.try_init_host(database, replica_id).await?,
506-
};
507-
let module = host.module.subscribe();
508-
509-
// The program is now either:
510-
//
511-
// - the desired one from [Database], in which case we do nothing
512-
// - `Some` expected hash, in which case we update to the desired one
513-
// - `None` expected hash, in which case we also update
514-
let stored_hash = stored_program_hash(host.db())?
515-
.with_context(|| format!("[{db_addr}] database improperly initialized"))?;
516-
if stored_hash == program_hash {
517-
info!("[{db_addr}] database up-to-date with {program_hash}");
518-
*guard = Some(host);
519-
} else {
520-
if let Some(expected_hash) = expected_hash {
521-
ensure!(
522-
expected_hash == stored_hash,
523-
"[{}] expected program {} found {}",
524-
db_addr,
525-
expected_hash,
526-
stored_hash
527-
);
528-
}
529-
info!("[{db_addr}] updating database from `{stored_hash}` to `{program_hash}`");
530-
let program = load_program(&this.program_storage, program_hash).await?;
531-
let update_result = host
532-
.update_module(
533-
this.runtimes.clone(),
534-
host_type,
535-
program,
536-
MigrationPolicy::Compatible,
537-
this.energy_monitor.clone(),
538-
this.unregister_fn(replica_id),
539-
this.db_cores.take(),
540-
)
541-
.await?;
542-
match update_result {
543-
UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
544-
*guard = Some(host);
545-
}
546-
UpdateDatabaseResult::AutoMigrateError(e) => {
547-
return Err(anyhow::anyhow!(e));
548-
}
549-
UpdateDatabaseResult::ErrorExecutingMigration(e) => {
550-
return Err(e);
551-
}
552-
}
553-
}
554-
555-
Ok::<_, anyhow::Error>(module)
556-
})
557-
.await??;
558-
559-
Ok(module)
560-
}
561-
562451
/// Release all resources of the [`ModuleHost`] identified by `replica_id`,
563452
/// and deregister it from the controller.
564453
#[tracing::instrument(level = "trace", skip_all)]
@@ -1115,10 +1004,6 @@ impl Host {
11151004

11161005
Ok(res)
11171006
}
1118-
1119-
fn db(&self) -> &RelationalDB {
1120-
&self.replica_ctx.relational_db
1121-
}
11221007
}
11231008

11241009
impl Drop for Host {

crates/core/src/host/module_host.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3232
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
3333
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
3434
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
35+
use spacetimedb_durability::DurableOffset;
3536
use spacetimedb_execution::pipelined::PipelinedProject;
3637
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
3738
use spacetimedb_lib::identity::{AuthCtx, RequestId};
@@ -1233,6 +1234,10 @@ impl ModuleHost {
12331234
&self.replica_ctx().database
12341235
}
12351236

1237+
pub fn durable_tx_offset(&self) -> Option<DurableOffset> {
1238+
self.replica_ctx().relational_db.durable_tx_offset()
1239+
}
1240+
12361241
pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
12371242
self.module.replica_ctx()
12381243
}

crates/standalone/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ impl DurabilityProvider for StandaloneDurabilityProvider {
127127
|snapshot_rx| {
128128
tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
129129
snapshot_rx,
130+
None,
131+
None,
130132
durability,
131133
));
132134
}

sdks/typescript/packages/sdk/src/client_cache.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ export class ClientCache {
55
/**
66
* The tables in the database.
77
*/
8-
tables: Map<string, TableCache>;
8+
tables: Map<string, TableCache<any>>;
99

1010
constructor() {
1111
this.tables = new Map();
@@ -16,7 +16,9 @@ export class ClientCache {
1616
* @param name The name of the table.
1717
* @returns The table
1818
*/
19-
getTable(name: string): TableCache {
19+
getTable<RowType extends Record<string, any>>(
20+
name: string
21+
): TableCache<RowType> {
2022
const table = this.tables.get(name);
2123

2224
// ! This should not happen as the table should be available but an exception is thrown just in case.
@@ -30,10 +32,10 @@ export class ClientCache {
3032
return table;
3133
}
3234

33-
getOrCreateTable<RowType>(
35+
getOrCreateTable<RowType extends Record<string, any>>(
3436
tableTypeInfo: TableRuntimeTypeInfo
3537
): TableCache<RowType> {
36-
let table: TableCache;
38+
let table: TableCache<RowType>;
3739
if (!this.tables.has(tableTypeInfo.tableName)) {
3840
table = new TableCache<RowType>(tableTypeInfo);
3941
this.tables.set(tableTypeInfo.tableName, table);

sdks/typescript/packages/sdk/src/db_connection_impl.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,9 @@ export class DbConnectionImpl<
483483
const parsedTableUpdates = await parseDatabaseUpdate(
484484
message.value.update
485485
);
486-
const subscribeAppliedMessage: SubscribeAppliedMessage = {
486+
const subscribeAppliedMessage: SubscribeAppliedMessage<
487+
Record<string, any>
488+
> = {
487489
tag: 'SubscribeApplied',
488490
queryId: message.value.queryId.id,
489491
tableUpdates: parsedTableUpdates,
@@ -495,7 +497,9 @@ export class DbConnectionImpl<
495497
const parsedTableUpdates = await parseDatabaseUpdate(
496498
message.value.update
497499
);
498-
const unsubscribeAppliedMessage: UnsubscribeAppliedMessage = {
500+
const unsubscribeAppliedMessage: UnsubscribeAppliedMessage<
501+
Record<string, any>
502+
> = {
499503
tag: 'UnsubscribeApplied',
500504
queryId: message.value.queryId.id,
501505
tableUpdates: parsedTableUpdates,

sdks/typescript/packages/sdk/src/message_types.ts

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import { Identity } from 'spacetimedb';
44
import type { TableUpdate } from './table_cache.ts';
55
import { Timestamp } from 'spacetimedb';
66

7-
export type InitialSubscriptionMessage = {
7+
export type InitialSubscriptionMessage<RowType extends Record<string, any>> = {
88
tag: 'InitialSubscription';
9-
tableUpdates: TableUpdate[];
9+
tableUpdates: TableUpdate<RowType>[];
1010
};
1111

12-
export type TransactionUpdateMessage = {
12+
export type TransactionUpdateMessage<RowType extends Record<string, any>> = {
1313
tag: 'TransactionUpdate';
14-
tableUpdates: TableUpdate[];
14+
tableUpdates: TableUpdate<RowType>[];
1515
identity: Identity;
1616
connectionId: ConnectionId | null;
1717
reducerInfo?: {
@@ -24,10 +24,11 @@ export type TransactionUpdateMessage = {
2424
energyConsumed: bigint;
2525
};
2626

27-
export type TransactionUpdateLightMessage = {
28-
tag: 'TransactionUpdateLight';
29-
tableUpdates: TableUpdate[];
30-
};
27+
export type TransactionUpdateLightMessage<RowType extends Record<string, any>> =
28+
{
29+
tag: 'TransactionUpdateLight';
30+
tableUpdates: TableUpdate<RowType>[];
31+
};
3132

3233
export type IdentityTokenMessage = {
3334
tag: 'IdentityToken';
@@ -36,16 +37,16 @@ export type IdentityTokenMessage = {
3637
connectionId: ConnectionId;
3738
};
3839

39-
export type SubscribeAppliedMessage = {
40+
export type SubscribeAppliedMessage<RowType extends Record<string, any>> = {
4041
tag: 'SubscribeApplied';
4142
queryId: number;
42-
tableUpdates: TableUpdate[];
43+
tableUpdates: TableUpdate<RowType>[];
4344
};
4445

45-
export type UnsubscribeAppliedMessage = {
46+
export type UnsubscribeAppliedMessage<RowType extends Record<string, any>> = {
4647
tag: 'UnsubscribeApplied';
4748
queryId: number;
48-
tableUpdates: TableUpdate[];
49+
tableUpdates: TableUpdate<RowType>[];
4950
};
5051

5152
export type SubscriptionError = {
@@ -54,11 +55,12 @@ export type SubscriptionError = {
5455
error: string;
5556
};
5657

57-
export type Message =
58-
| InitialSubscriptionMessage
59-
| TransactionUpdateMessage
60-
| TransactionUpdateLightMessage
61-
| IdentityTokenMessage
62-
| SubscribeAppliedMessage
63-
| UnsubscribeAppliedMessage
64-
| SubscriptionError;
58+
export type Message<RowType extends Record<string, any> = Record<string, any>> =
59+
60+
| InitialSubscriptionMessage<RowType>
61+
| TransactionUpdateMessage<RowType>
62+
| TransactionUpdateLightMessage<RowType>
63+
| IdentityTokenMessage
64+
| SubscribeAppliedMessage<RowType>
65+
| UnsubscribeAppliedMessage<RowType>
66+
| SubscriptionError;

0 commit comments

Comments
 (0)