Skip to content

Commit 22daade

Browse files
committed
[TS] Anonymous transactions
1 parent cb3ac50 commit 22daade

10 files changed

Lines changed: 266 additions & 92 deletions

File tree

crates/bindings-typescript/src/lib/procedures.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { ConnectionId } from '../lib/connection_id';
33
import type { Identity } from '../lib/identity';
44
import type { Timestamp } from '../lib/timestamp';
55
import type { HttpClient } from '../server/http_internal';
6-
import type { ParamsObj } from './reducers';
6+
import type { ParamsObj, ReducerCtx } from './reducers';
77
import {
88
MODULE_DEF,
99
registerTypesRecursively,
@@ -25,8 +25,12 @@ export interface ProcedureCtx<S extends UntypedSchemaDef> {
2525
readonly timestamp: Timestamp;
2626
readonly connectionId: ConnectionId | null;
2727
readonly http: HttpClient;
28+
withTx<T>(body: (ctx: TransactionCtx<S>) => T): T;
2829
}
2930

31+
export interface TransactionCtx<S extends UntypedSchemaDef>
32+
extends ReducerCtx<S> {}
33+
3034
export function procedure<
3135
S extends UntypedSchemaDef,
3236
Params extends ParamsObj,

crates/bindings-typescript/src/server/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export { reducers } from '../lib/reducers';
55
export { SenderError, SpacetimeHostError, errors } from './errors';
66
export { type Reducer, type ReducerCtx } from '../lib/reducers';
77
export { type DbView } from './db_view';
8+
export type { ProcedureCtx, TransactionCtx } from '../lib/procedures';
89

910
import './polyfills'; // Ensure polyfills are loaded
1011
import './register_hooks'; // Ensure module hooks are registered

crates/bindings-typescript/src/server/procedures.ts

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@ import BinaryReader from '../lib/binary_reader';
33
import BinaryWriter from '../lib/binary_writer';
44
import type { ConnectionId } from '../lib/connection_id';
55
import { Identity } from '../lib/identity';
6-
import { PROCEDURES, type ProcedureCtx } from '../lib/procedures';
6+
import {
7+
PROCEDURES,
8+
type ProcedureCtx,
9+
type TransactionCtx,
10+
} from '../lib/procedures';
711
import { MODULE_DEF, type UntypedSchemaDef } from '../lib/schema';
8-
import type { Timestamp } from '../lib/timestamp';
12+
import { Timestamp } from '../lib/timestamp';
13+
import type { DbView } from './db_view';
914
import { httpClient } from './http_internal';
10-
import { sys } from './runtime';
15+
import { makeReducerCtx, sys } from './runtime';
1116

1217
const { freeze } = Object;
1318

@@ -25,15 +30,45 @@ export function callProcedure(
2530
MODULE_DEF.typespace
2631
);
2732

28-
const ctx: ProcedureCtx<UntypedSchemaDef> = freeze({
33+
const ctx: ProcedureCtx<UntypedSchemaDef> = {
2934
sender,
3035
timestamp,
3136
connectionId,
3237
http: httpClient,
3338
get identity() {
3439
return new Identity(sys.identity().__identity__);
3540
},
36-
});
41+
withTx(body) {
42+
const run = () => {
43+
const timestamp = sys.procedure_start_mut_tx();
44+
45+
try {
46+
const ctx: TransactionCtx<UntypedSchemaDef> = freeze(
47+
makeReducerCtx(sender, new Timestamp(timestamp), connectionId)
48+
);
49+
return body(ctx);
50+
} catch (e) {
51+
sys.procedure_abort_mut_tx();
52+
throw e;
53+
}
54+
};
55+
56+
let res = run();
57+
try {
58+
sys.procedure_commit_mut_tx();
59+
return res;
60+
} catch {}
61+
console.warn('committing anonymous transaction failed');
62+
res = run();
63+
try {
64+
sys.procedure_commit_mut_tx();
65+
return res;
66+
} catch (e) {
67+
throw new Error('transaction retry failed again', { cause: e });
68+
}
69+
},
70+
};
71+
freeze(ctx);
3772

3873
const ret = fn(ctx, args);
3974
const retBuf = new BinaryWriter(returnTypeBaseSize);

crates/bindings-typescript/src/server/runtime.ts

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import {
2525
type JwtClaims,
2626
type ReducerCtx,
2727
} from '../lib/reducers';
28-
import { MODULE_DEF } from '../lib/schema';
28+
import { MODULE_DEF, type UntypedSchemaDef } from '../lib/schema';
2929
import { type RowType, type Table, type TableMethods } from '../lib/table';
3030
import { Timestamp } from '../lib/timestamp';
3131
import type { Infer } from '../lib/type_builders';
@@ -175,6 +175,21 @@ class AuthCtxImpl implements AuthCtx {
175175
}
176176
}
177177

178+
export const makeReducerCtx = (
179+
sender: Identity,
180+
timestamp: Timestamp,
181+
connectionId: ConnectionId | null
182+
): ReducerCtx<UntypedSchemaDef> => ({
183+
sender,
184+
get identity() {
185+
return new Identity(sys.identity().__identity__);
186+
},
187+
timestamp,
188+
connectionId,
189+
db: getDbView(),
190+
senderAuth: AuthCtxImpl.fromSystemTables(connectionId, sender),
191+
});
192+
178193
export const hooks: ModuleHooks = {
179194
__describe_module__() {
180195
const writer = new BinaryWriter(128);
@@ -195,19 +210,13 @@ export const hooks: ModuleHooks = {
195210
MODULE_DEF.typespace
196211
);
197212
const senderIdentity = new Identity(sender);
198-
const ctx: ReducerCtx<any> = freeze({
199-
sender: senderIdentity,
200-
get identity() {
201-
return new Identity(sys.identity().__identity__);
202-
},
203-
timestamp: new Timestamp(timestamp),
204-
connectionId: ConnectionId.nullIfZero(new ConnectionId(connId)),
205-
db: getDbView(),
206-
senderAuth: AuthCtxImpl.fromSystemTables(
207-
ConnectionId.nullIfZero(new ConnectionId(connId)),
208-
senderIdentity
209-
),
210-
});
213+
const ctx: ReducerCtx<any> = freeze(
214+
makeReducerCtx(
215+
senderIdentity,
216+
new Timestamp(timestamp),
217+
ConnectionId.nullIfZero(new ConnectionId(connId))
218+
)
219+
);
211220
try {
212221
return REDUCERS[reducerId](ctx, args) ?? { tag: 'ok' };
213222
} catch (e) {

crates/bindings-typescript/src/server/sys.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,10 @@ declare module 'spacetime:sys@1.2' {
9393
request: Uint8Array,
9494
body: Uint8Array | string
9595
): [response: Uint8Array, body: Uint8Array];
96+
97+
export function procedure_start_mut_tx(): bigint;
98+
99+
export function procedure_commit_mut_tx();
100+
101+
export function procedure_abort_mut_tx();
96102
}

crates/core/src/host/instance_env.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,11 @@ impl InstanceEnv {
601601
written
602602
}
603603

604-
pub async fn start_mutable_tx(&mut self) -> Result<(), NodesError> {
604+
// Async procedure syscalls return a `Result<impl Future>`, so that we can check `get_tx()`
605+
// *before* requiring an async runtime. Otherwise, the v8 module host would have to call
606+
// on `tokio::runtime::Handle::try_current()` before being able to run the `get_tx()` check.
607+
608+
pub fn start_mutable_tx(&mut self) -> Result<impl Future<Output = ()> + use<'_>, NodesError> {
605609
if self.get_tx().is_ok() {
606610
return Err(NodesError::WouldBlockTransaction(
607611
super::AbiCall::ProcedureStartMutTransaction,
@@ -610,11 +614,13 @@ impl InstanceEnv {
610614

611615
let stdb = self.replica_ctx.relational_db.clone();
612616
// TODO(procedure-tx): should we add a new workload, e.g., `AnonTx`?
613-
let tx = asyncify(move || stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal)).await;
614-
self.tx.set_raw(tx);
615-
self.in_anon_tx = true;
617+
let fut = async move {
618+
let tx = asyncify(move || stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal)).await;
619+
self.tx.set_raw(tx);
620+
self.in_anon_tx = true;
621+
};
616622

617-
Ok(())
623+
Ok(fut)
618624
}
619625

620626
/// Finishes an anonymous transaction,
@@ -637,7 +643,7 @@ impl InstanceEnv {
637643
// *before* requiring an async runtime. Otherwise, the v8 module host would have to call
638644
// on `tokio::runtime::Handle::try_current()` before being able to run the `get_tx()` check.
639645

640-
pub async fn commit_mutable_tx(&mut self) -> Result<(), NodesError> {
646+
pub fn commit_mutable_tx(&mut self) -> Result<impl Future<Output = ()> + use<'_>, NodesError> {
641647
self.finish_anon_tx()?;
642648

643649
let stdb = self.relational_db().clone();
@@ -660,10 +666,12 @@ impl InstanceEnv {
660666
// This is somewhat expensive,
661667
// and can block for a while,
662668
// so we need to asyncify it.
663-
let event = asyncify(move || commit_and_broadcast_event(&subs, None, event, tx)).await;
664-
self.procedure_last_tx_offset = Some(event.tx_offset);
669+
let fut = async move {
670+
let event = asyncify(move || commit_and_broadcast_event(&subs, None, event, tx)).await;
671+
self.procedure_last_tx_offset = Some(event.tx_offset);
672+
};
665673

666-
Ok(())
674+
Ok(fut)
667675
}
668676

669677
pub fn abort_mutable_tx(&mut self) -> Result<(), NodesError> {

crates/core/src/host/v8/syscall/v1.rs

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, TimingSpan, Tim
2020
use crate::host::AbiCall;
2121
use anyhow::Context;
2222
use bytes::Bytes;
23-
use spacetimedb_lib::{bsatn, ConnectionId, Identity, RawModuleDef};
23+
use spacetimedb_lib::{bsatn, ConnectionId, Identity, RawModuleDef, Timestamp};
2424
use spacetimedb_primitives::{errno, ColId, IndexId, ProcedureId, ReducerId, TableId, ViewFnPtr};
2525
use spacetimedb_sats::Serialize;
2626
use v8::{
@@ -129,7 +129,22 @@ pub(super) fn sys_v1_2<'scope>(scope: &mut PinScope<'scope, '_>) -> Local<'scope
129129
with_sys_result_value,
130130
AbiCall::ProcedureHttpRequest,
131131
procedure_http_request
132-
)
132+
),
133+
(
134+
with_sys_result_value,
135+
AbiCall::ProcedureStartMutTransaction,
136+
procedure_start_mut_tx
137+
),
138+
(
139+
with_sys_result_noret,
140+
AbiCall::ProcedureAbortMutTransaction,
141+
procedure_abort_mut_tx
142+
),
143+
(
144+
with_sys_result_noret,
145+
AbiCall::ProcedureCommitMutTransaction,
146+
procedure_commit_mut_tx
147+
),
133148
)
134149
}
135150

@@ -1585,3 +1600,37 @@ fn procedure_http_request<'scope>(
15851600
&[response.into(), response_body.into()],
15861601
))
15871602
}
1603+
1604+
fn procedure_start_mut_tx<'scope>(
1605+
scope: &mut PinScope<'scope, '_>,
1606+
_args: FunctionCallbackArguments<'_>,
1607+
) -> SysCallResult<Local<'scope, v8::BigInt>> {
1608+
let env = get_env(scope)?;
1609+
1610+
let fut = env.instance_env.start_mutable_tx()?;
1611+
1612+
let rt = tokio::runtime::Handle::current();
1613+
rt.block_on(fut);
1614+
1615+
let timestamp = Timestamp::now().to_micros_since_unix_epoch() as u64;
1616+
1617+
Ok(v8::BigInt::new_from_u64(scope, timestamp))
1618+
}
1619+
1620+
fn procedure_abort_mut_tx(scope: &mut PinScope<'_, '_>, _args: FunctionCallbackArguments<'_>) -> SysCallResult<()> {
1621+
let env = get_env(scope)?;
1622+
1623+
env.instance_env.abort_mutable_tx()?;
1624+
Ok(())
1625+
}
1626+
1627+
fn procedure_commit_mut_tx(scope: &mut PinScope<'_, '_>, _args: FunctionCallbackArguments<'_>) -> SysCallResult<()> {
1628+
let env = get_env(scope)?;
1629+
1630+
let fut = env.instance_env.commit_mutable_tx()?;
1631+
1632+
let rt = tokio::runtime::Handle::current();
1633+
rt.block_on(fut);
1634+
1635+
Ok(())
1636+
}

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,7 +1465,11 @@ impl WasmInstanceEnv {
14651465
AbiCall::ProcedureStartMutTransaction,
14661466
move |mut caller| async move {
14671467
let (mem, env) = Self::mem_env(&mut caller);
1468-
let res = env.instance_env.start_mutable_tx().await.map_err(Into::into);
1468+
let res = async {
1469+
env.instance_env.start_mutable_tx()?.await;
1470+
Ok(())
1471+
}
1472+
.await;
14691473
let timestamp = Timestamp::now().to_micros_since_unix_epoch() as u64;
14701474
let res = res.and_then(|()| Ok(timestamp.write_to(mem, out)?));
14711475

@@ -1508,12 +1512,12 @@ impl WasmInstanceEnv {
15081512
|mut caller| async move {
15091513
let (_, env) = Self::mem_env(&mut caller);
15101514

1511-
let res = env
1512-
.instance_env
1513-
.commit_mutable_tx()
1514-
.await
1515-
.map(|()| 0u16.into())
1516-
.or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureCommitMutTransaction, err.into()));
1515+
let res = async {
1516+
env.instance_env.commit_mutable_tx()?.await;
1517+
Ok(0u16.into())
1518+
}
1519+
.await
1520+
.or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureCommitMutTransaction, err));
15171521

15181522
(caller, res)
15191523
},

modules/sdk-test-procedure-ts/src/index.ts

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
// ─────────────────────────────────────────────────────────────────────────────
22
// IMPORTS
33
// ─────────────────────────────────────────────────────────────────────────────
4-
import { errors, schema, t, table } from 'spacetimedb/server';
4+
import {
5+
errors,
6+
schema,
7+
t,
8+
table,
9+
type ProcedureCtx,
10+
type TransactionCtx,
11+
} from 'spacetimedb/server';
512

613
const ReturnStruct = t.object('ReturnStruct', {
714
a: t.u32(),
@@ -13,7 +20,12 @@ const ReturnEnum = t.enum('ReturnEnum', {
1320
B: t.string(),
1421
});
1522

16-
const spacetimedb = schema();
23+
const MyTable = table(
24+
{ name: 'my_table', public: true },
25+
{ field: ReturnStruct }
26+
);
27+
28+
const spacetimedb = schema(MyTable);
1729

1830
spacetimedb.procedure(
1931
'return_primitive',
@@ -68,3 +80,42 @@ spacetimedb.procedure('invalid_request', t.string(), ctx => {
6880
throw e;
6981
}
7082
});
83+
84+
function insertMyTable(ctx: TransactionCtx<typeof spacetimedb.schemaType>) {
85+
ctx.db.myTable.insert({ field: { a: 42, b: 'magic' } });
86+
}
87+
88+
function assertRowCount(
89+
ctx: ProcedureCtx<typeof spacetimedb.schemaType>,
90+
count: number
91+
) {
92+
ctx.withTx(ctx => {
93+
assertEqual(ctx.db.myTable.count(), BigInt(count));
94+
});
95+
}
96+
97+
function assertEqual<T>(a: T, b: T) {
98+
if (a !== b) {
99+
throw new Error(`assertion failed: ${a} != ${b}`);
100+
}
101+
}
102+
103+
spacetimedb.procedure('insert_with_tx_commit', t.unit(), ctx => {
104+
ctx.withTx(insertMyTable);
105+
assertRowCount(ctx, 1);
106+
return {};
107+
});
108+
109+
spacetimedb.procedure('insert_with_tx_rollback', t.unit(), ctx => {
110+
const error = {};
111+
try {
112+
ctx.withTx(ctx => {
113+
insertMyTable(ctx);
114+
throw error;
115+
});
116+
} catch (e) {
117+
if (e !== error) throw e;
118+
}
119+
assertRowCount(ctx, 0);
120+
return {};
121+
});

0 commit comments

Comments
 (0)