Skip to content

Commit fd524cf

Browse files
authored
[TS] Anonymous transactions (#3743)
# Description of Changes Mirrors the Rust API. # Expected complexity level and risk 2 # Testing - [x] Automated procedure testing now enabled for typescript (from `sdks/rust/tests`)
1 parent cb3ac50 commit fd524cf

10 files changed

Lines changed: 268 additions & 93 deletions

File tree

crates/bindings-typescript/src/lib/procedures.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { ConnectionId } from '../lib/connection_id';
33
import type { Identity } from '../lib/identity';
44
import type { Timestamp } from '../lib/timestamp';
55
import type { HttpClient } from '../server/http_internal';
6-
import type { ParamsObj } from './reducers';
6+
import type { ParamsObj, ReducerCtx } from './reducers';
77
import {
88
MODULE_DEF,
99
registerTypesRecursively,
@@ -18,15 +18,19 @@ export type ProcedureFn<
1818
Ret extends TypeBuilder<any, any>,
1919
> = (ctx: ProcedureCtx<S>, args: InferTypeOfRow<Params>) => Infer<Ret>;
2020

21-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
2221
export interface ProcedureCtx<S extends UntypedSchemaDef> {
2322
readonly sender: Identity;
2423
readonly identity: Identity;
2524
readonly timestamp: Timestamp;
2625
readonly connectionId: ConnectionId | null;
2726
readonly http: HttpClient;
27+
withTx<T>(body: (ctx: TransactionCtx<S>) => T): T;
2828
}
2929

30+
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
31+
export interface TransactionCtx<S extends UntypedSchemaDef>
32+
extends ReducerCtx<S> {}
33+
3034
export function procedure<
3135
S extends UntypedSchemaDef,
3236
Params extends ParamsObj,

crates/bindings-typescript/src/server/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export { reducers } from '../lib/reducers';
55
export { SenderError, SpacetimeHostError, errors } from './errors';
66
export { type Reducer, type ReducerCtx } from '../lib/reducers';
77
export { type DbView } from './db_view';
8+
export type { ProcedureCtx, TransactionCtx } from '../lib/procedures';
89

910
import './polyfills'; // Ensure polyfills are loaded
1011
import './register_hooks'; // Ensure module hooks are registered

crates/bindings-typescript/src/server/procedures.ts

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@ import BinaryReader from '../lib/binary_reader';
33
import BinaryWriter from '../lib/binary_writer';
44
import type { ConnectionId } from '../lib/connection_id';
55
import { Identity } from '../lib/identity';
6-
import { PROCEDURES, type ProcedureCtx } from '../lib/procedures';
6+
import {
7+
PROCEDURES,
8+
type ProcedureCtx,
9+
type TransactionCtx,
10+
} from '../lib/procedures';
711
import { MODULE_DEF, type UntypedSchemaDef } from '../lib/schema';
8-
import type { Timestamp } from '../lib/timestamp';
12+
import { Timestamp } from '../lib/timestamp';
913
import { httpClient } from './http_internal';
10-
import { sys } from './runtime';
14+
import { makeReducerCtx, sys } from './runtime';
1115

1216
const { freeze } = Object;
1317

@@ -25,15 +29,47 @@ export function callProcedure(
2529
MODULE_DEF.typespace
2630
);
2731

28-
const ctx: ProcedureCtx<UntypedSchemaDef> = freeze({
32+
const ctx: ProcedureCtx<UntypedSchemaDef> = {
2933
sender,
3034
timestamp,
3135
connectionId,
3236
http: httpClient,
3337
get identity() {
3438
return new Identity(sys.identity().__identity__);
3539
},
36-
});
40+
withTx(body) {
41+
const run = () => {
42+
const timestamp = sys.procedure_start_mut_tx();
43+
44+
try {
45+
const ctx: TransactionCtx<UntypedSchemaDef> = freeze(
46+
makeReducerCtx(sender, new Timestamp(timestamp), connectionId)
47+
);
48+
return body(ctx);
49+
} catch (e) {
50+
sys.procedure_abort_mut_tx();
51+
throw e;
52+
}
53+
};
54+
55+
let res = run();
56+
try {
57+
sys.procedure_commit_mut_tx();
58+
return res;
59+
} catch {
60+
// ignore the commit error
61+
}
62+
console.warn('committing anonymous transaction failed');
63+
res = run();
64+
try {
65+
sys.procedure_commit_mut_tx();
66+
return res;
67+
} catch (e) {
68+
throw new Error('transaction retry failed again', { cause: e });
69+
}
70+
},
71+
};
72+
freeze(ctx);
3773

3874
const ret = fn(ctx, args);
3975
const retBuf = new BinaryWriter(returnTypeBaseSize);

crates/bindings-typescript/src/server/runtime.ts

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import {
2525
type JwtClaims,
2626
type ReducerCtx,
2727
} from '../lib/reducers';
28-
import { MODULE_DEF } from '../lib/schema';
28+
import { MODULE_DEF, type UntypedSchemaDef } from '../lib/schema';
2929
import { type RowType, type Table, type TableMethods } from '../lib/table';
3030
import { Timestamp } from '../lib/timestamp';
3131
import type { Infer } from '../lib/type_builders';
@@ -175,6 +175,21 @@ class AuthCtxImpl implements AuthCtx {
175175
}
176176
}
177177

178+
export const makeReducerCtx = (
179+
sender: Identity,
180+
timestamp: Timestamp,
181+
connectionId: ConnectionId | null
182+
): ReducerCtx<UntypedSchemaDef> => ({
183+
sender,
184+
get identity() {
185+
return new Identity(sys.identity().__identity__);
186+
},
187+
timestamp,
188+
connectionId,
189+
db: getDbView(),
190+
senderAuth: AuthCtxImpl.fromSystemTables(connectionId, sender),
191+
});
192+
178193
export const hooks: ModuleHooks = {
179194
__describe_module__() {
180195
const writer = new BinaryWriter(128);
@@ -195,19 +210,13 @@ export const hooks: ModuleHooks = {
195210
MODULE_DEF.typespace
196211
);
197212
const senderIdentity = new Identity(sender);
198-
const ctx: ReducerCtx<any> = freeze({
199-
sender: senderIdentity,
200-
get identity() {
201-
return new Identity(sys.identity().__identity__);
202-
},
203-
timestamp: new Timestamp(timestamp),
204-
connectionId: ConnectionId.nullIfZero(new ConnectionId(connId)),
205-
db: getDbView(),
206-
senderAuth: AuthCtxImpl.fromSystemTables(
207-
ConnectionId.nullIfZero(new ConnectionId(connId)),
208-
senderIdentity
209-
),
210-
});
213+
const ctx: ReducerCtx<any> = freeze(
214+
makeReducerCtx(
215+
senderIdentity,
216+
new Timestamp(timestamp),
217+
ConnectionId.nullIfZero(new ConnectionId(connId))
218+
)
219+
);
211220
try {
212221
return REDUCERS[reducerId](ctx, args) ?? { tag: 'ok' };
213222
} catch (e) {

crates/bindings-typescript/src/server/sys.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,10 @@ declare module 'spacetime:sys@1.2' {
9393
request: Uint8Array,
9494
body: Uint8Array | string
9595
): [response: Uint8Array, body: Uint8Array];
96+
97+
export function procedure_start_mut_tx(): bigint;
98+
99+
export function procedure_commit_mut_tx();
100+
101+
export function procedure_abort_mut_tx();
96102
}

crates/core/src/host/instance_env.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,11 @@ impl InstanceEnv {
601601
written
602602
}
603603

604-
pub async fn start_mutable_tx(&mut self) -> Result<(), NodesError> {
604+
// Async procedure syscalls return a `Result<impl Future>`, so that we can check `get_tx()`
605+
// *before* requiring an async runtime. Otherwise, the v8 module host would have to call
606+
// on `tokio::runtime::Handle::try_current()` before being able to run the `get_tx()` check.
607+
608+
pub fn start_mutable_tx(&mut self) -> Result<impl Future<Output = ()> + use<'_>, NodesError> {
605609
if self.get_tx().is_ok() {
606610
return Err(NodesError::WouldBlockTransaction(
607611
super::AbiCall::ProcedureStartMutTransaction,
@@ -610,11 +614,13 @@ impl InstanceEnv {
610614

611615
let stdb = self.replica_ctx.relational_db.clone();
612616
// TODO(procedure-tx): should we add a new workload, e.g., `AnonTx`?
613-
let tx = asyncify(move || stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal)).await;
614-
self.tx.set_raw(tx);
615-
self.in_anon_tx = true;
617+
let fut = async move {
618+
let tx = asyncify(move || stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal)).await;
619+
self.tx.set_raw(tx);
620+
self.in_anon_tx = true;
621+
};
616622

617-
Ok(())
623+
Ok(fut)
618624
}
619625

620626
/// Finishes an anonymous transaction,
@@ -637,7 +643,7 @@ impl InstanceEnv {
637643
// *before* requiring an async runtime. Otherwise, the v8 module host would have to call
638644
// on `tokio::runtime::Handle::try_current()` before being able to run the `get_tx()` check.
639645

640-
pub async fn commit_mutable_tx(&mut self) -> Result<(), NodesError> {
646+
pub fn commit_mutable_tx(&mut self) -> Result<impl Future<Output = ()> + use<'_>, NodesError> {
641647
self.finish_anon_tx()?;
642648

643649
let stdb = self.relational_db().clone();
@@ -660,10 +666,12 @@ impl InstanceEnv {
660666
// This is somewhat expensive,
661667
// and can block for a while,
662668
// so we need to asyncify it.
663-
let event = asyncify(move || commit_and_broadcast_event(&subs, None, event, tx)).await;
664-
self.procedure_last_tx_offset = Some(event.tx_offset);
669+
let fut = async move {
670+
let event = asyncify(move || commit_and_broadcast_event(&subs, None, event, tx)).await;
671+
self.procedure_last_tx_offset = Some(event.tx_offset);
672+
};
665673

666-
Ok(())
674+
Ok(fut)
667675
}
668676

669677
pub fn abort_mutable_tx(&mut self) -> Result<(), NodesError> {

crates/core/src/host/v8/syscall/v1.rs

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, TimingSpan, Tim
2020
use crate::host::AbiCall;
2121
use anyhow::Context;
2222
use bytes::Bytes;
23-
use spacetimedb_lib::{bsatn, ConnectionId, Identity, RawModuleDef};
23+
use spacetimedb_lib::{bsatn, ConnectionId, Identity, RawModuleDef, Timestamp};
2424
use spacetimedb_primitives::{errno, ColId, IndexId, ProcedureId, ReducerId, TableId, ViewFnPtr};
2525
use spacetimedb_sats::Serialize;
2626
use v8::{
@@ -129,7 +129,22 @@ pub(super) fn sys_v1_2<'scope>(scope: &mut PinScope<'scope, '_>) -> Local<'scope
129129
with_sys_result_value,
130130
AbiCall::ProcedureHttpRequest,
131131
procedure_http_request
132-
)
132+
),
133+
(
134+
with_sys_result_value,
135+
AbiCall::ProcedureStartMutTransaction,
136+
procedure_start_mut_tx
137+
),
138+
(
139+
with_sys_result_noret,
140+
AbiCall::ProcedureAbortMutTransaction,
141+
procedure_abort_mut_tx
142+
),
143+
(
144+
with_sys_result_noret,
145+
AbiCall::ProcedureCommitMutTransaction,
146+
procedure_commit_mut_tx
147+
),
133148
)
134149
}
135150

@@ -1585,3 +1600,37 @@ fn procedure_http_request<'scope>(
15851600
&[response.into(), response_body.into()],
15861601
))
15871602
}
1603+
1604+
fn procedure_start_mut_tx<'scope>(
1605+
scope: &mut PinScope<'scope, '_>,
1606+
_args: FunctionCallbackArguments<'_>,
1607+
) -> SysCallResult<Local<'scope, v8::BigInt>> {
1608+
let env = get_env(scope)?;
1609+
1610+
let fut = env.instance_env.start_mutable_tx()?;
1611+
1612+
let rt = tokio::runtime::Handle::current();
1613+
rt.block_on(fut);
1614+
1615+
let timestamp = Timestamp::now().to_micros_since_unix_epoch() as u64;
1616+
1617+
Ok(v8::BigInt::new_from_u64(scope, timestamp))
1618+
}
1619+
1620+
fn procedure_abort_mut_tx(scope: &mut PinScope<'_, '_>, _args: FunctionCallbackArguments<'_>) -> SysCallResult<()> {
1621+
let env = get_env(scope)?;
1622+
1623+
env.instance_env.abort_mutable_tx()?;
1624+
Ok(())
1625+
}
1626+
1627+
fn procedure_commit_mut_tx(scope: &mut PinScope<'_, '_>, _args: FunctionCallbackArguments<'_>) -> SysCallResult<()> {
1628+
let env = get_env(scope)?;
1629+
1630+
let fut = env.instance_env.commit_mutable_tx()?;
1631+
1632+
let rt = tokio::runtime::Handle::current();
1633+
rt.block_on(fut);
1634+
1635+
Ok(())
1636+
}

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,7 +1465,11 @@ impl WasmInstanceEnv {
14651465
AbiCall::ProcedureStartMutTransaction,
14661466
move |mut caller| async move {
14671467
let (mem, env) = Self::mem_env(&mut caller);
1468-
let res = env.instance_env.start_mutable_tx().await.map_err(Into::into);
1468+
let res = async {
1469+
env.instance_env.start_mutable_tx()?.await;
1470+
Ok(())
1471+
}
1472+
.await;
14691473
let timestamp = Timestamp::now().to_micros_since_unix_epoch() as u64;
14701474
let res = res.and_then(|()| Ok(timestamp.write_to(mem, out)?));
14711475

@@ -1508,12 +1512,12 @@ impl WasmInstanceEnv {
15081512
|mut caller| async move {
15091513
let (_, env) = Self::mem_env(&mut caller);
15101514

1511-
let res = env
1512-
.instance_env
1513-
.commit_mutable_tx()
1514-
.await
1515-
.map(|()| 0u16.into())
1516-
.or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureCommitMutTransaction, err.into()));
1515+
let res = async {
1516+
env.instance_env.commit_mutable_tx()?.await;
1517+
Ok(0u16.into())
1518+
}
1519+
.await
1520+
.or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureCommitMutTransaction, err));
15171521

15181522
(caller, res)
15191523
},

0 commit comments

Comments
 (0)