Skip to content

Commit 17922f0

Browse files
feat(transaction): implement transaction handling with sequential operations and validation (#2399)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent f2c567b commit 17922f0

File tree

2 files changed

+366
-1
lines changed

2 files changed

+366
-1
lines changed

packages/server/src/api/rpc/index.ts

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { lowerCaseFirst, safeJSONStringify } from '@zenstackhq/common-helpers';
2-
import { ORMError, ORMErrorReason, type ClientContract } from '@zenstackhq/orm';
2+
import { CoreCrudOperations, ORMError, ORMErrorReason, type ClientContract } from '@zenstackhq/orm';
33
import type { SchemaDef } from '@zenstackhq/orm/schema';
44
import SuperJSON from 'superjson';
55
import { match } from 'ts-pattern';
@@ -11,6 +11,9 @@ import { loggerSchema } from '../common/schemas';
1111
import { processSuperJsonRequestPayload, unmarshalQ } from '../common/utils';
1212
import { log, registerCustomSerializers } from '../utils';
1313

14+
const TRANSACTION_ROUTE_PREFIX = '$transaction' as const;
15+
const VALID_OPS = new Set(CoreCrudOperations as unknown as string[]);
16+
1417
registerCustomSerializers();
1518

1619
/**
@@ -71,6 +74,15 @@ export class RPCApiHandler<Schema extends SchemaDef = SchemaDef> implements ApiH
7174
});
7275
}
7376

77+
if (model === TRANSACTION_ROUTE_PREFIX) {
78+
return this.handleTransaction({
79+
client,
80+
method: method.toUpperCase(),
81+
type: op,
82+
requestBody,
83+
});
84+
}
85+
7486
model = lowerCaseFirst(model);
7587
method = method.toUpperCase();
7688
let args: unknown;
@@ -185,6 +197,91 @@ export class RPCApiHandler<Schema extends SchemaDef = SchemaDef> implements ApiH
185197
}
186198
}
187199

200+
private async handleTransaction({
201+
client,
202+
method,
203+
type,
204+
requestBody,
205+
}: {
206+
client: ClientContract<Schema>;
207+
method: string;
208+
type: string;
209+
requestBody?: unknown;
210+
}): Promise<Response> {
211+
if (method !== 'POST') {
212+
return this.makeBadInputErrorResponse('invalid request method, only POST is supported');
213+
}
214+
215+
if (type !== 'sequential') {
216+
return this.makeBadInputErrorResponse(`unsupported transaction type: ${type}`);
217+
}
218+
219+
if (!requestBody || !Array.isArray(requestBody) || requestBody.length === 0) {
220+
return this.makeBadInputErrorResponse('request body must be a non-empty array of operations');
221+
}
222+
223+
const processedOps: Array<{ model: string; op: string; args: unknown }> = [];
224+
225+
for (let i = 0; i < requestBody.length; i++) {
226+
const item = requestBody[i];
227+
if (!item || typeof item !== 'object') {
228+
return this.makeBadInputErrorResponse(`operation at index ${i} must be an object`);
229+
}
230+
const { model: itemModel, op: itemOp, args: itemArgs } = item as any;
231+
if (!itemModel || typeof itemModel !== 'string') {
232+
return this.makeBadInputErrorResponse(`operation at index ${i} is missing a valid "model" field`);
233+
}
234+
if (!itemOp || typeof itemOp !== 'string') {
235+
return this.makeBadInputErrorResponse(`operation at index ${i} is missing a valid "op" field`);
236+
}
237+
if (!VALID_OPS.has(itemOp)) {
238+
return this.makeBadInputErrorResponse(`operation at index ${i} has invalid op: ${itemOp}`);
239+
}
240+
if (!this.isValidModel(client, lowerCaseFirst(itemModel))) {
241+
return this.makeBadInputErrorResponse(`operation at index ${i} has unknown model: ${itemModel}`);
242+
}
243+
if (itemArgs !== undefined && itemArgs !== null && (typeof itemArgs !== 'object' || Array.isArray(itemArgs))) {
244+
return this.makeBadInputErrorResponse(`operation at index ${i} has invalid "args" field`);
245+
}
246+
247+
const { result: processedArgs, error: argsError } = await this.processRequestPayload(itemArgs ?? {});
248+
if (argsError) {
249+
return this.makeBadInputErrorResponse(`operation at index ${i}: ${argsError}`);
250+
}
251+
processedOps.push({ model: lowerCaseFirst(itemModel), op: itemOp, args: processedArgs });
252+
}
253+
254+
try {
255+
const promises = processedOps.map(({ model, op, args }) => {
256+
return (client as any)[model][op](args);
257+
});
258+
259+
log(this.options.log, 'debug', () => `handling "$transaction" request with ${promises.length} operations`);
260+
261+
const clientResult = await client.$transaction(promises as any);
262+
263+
const { json, meta } = SuperJSON.serialize(clientResult);
264+
const responseBody: any = { data: json };
265+
if (meta) {
266+
responseBody.meta = { serialization: meta };
267+
}
268+
269+
const response = { status: 200, body: responseBody };
270+
log(
271+
this.options.log,
272+
'debug',
273+
() => `sending response for "$transaction" request: ${safeJSONStringify(response)}`,
274+
);
275+
return response;
276+
} catch (err) {
277+
log(this.options.log, 'error', `error occurred when handling "$transaction" request`, err);
278+
if (err instanceof ORMError) {
279+
return this.makeORMErrorResponse(err);
280+
}
281+
return this.makeGenericErrorResponse(err);
282+
}
283+
}
284+
188285
private async handleProcedureRequest({
189286
client,
190287
method,

packages/server/test/api/rpc.test.ts

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,274 @@ procedure echoOverview(o: Overview): Overview
763763
expect(r.data.stringList).toEqual(['d', 'e', 'f']);
764764
});
765765

766+
describe('transaction', () => {
767+
it('runs sequential operations atomically', async () => {
768+
const handleRequest = makeHandler();
769+
770+
// Clean up
771+
await rawClient.post.deleteMany();
772+
await rawClient.user.deleteMany();
773+
774+
const r = await handleRequest({
775+
method: 'post',
776+
path: '/$transaction/sequential',
777+
requestBody: [
778+
{
779+
model: 'User',
780+
op: 'create',
781+
args: { data: { id: 'txuser1', email: 'txuser1@abc.com' } },
782+
},
783+
{
784+
model: 'Post',
785+
op: 'create',
786+
args: { data: { id: 'txpost1', title: 'Tx Post', authorId: 'txuser1' } },
787+
},
788+
{
789+
model: 'Post',
790+
op: 'findMany',
791+
args: { where: { authorId: 'txuser1' } },
792+
},
793+
],
794+
client: rawClient,
795+
});
796+
797+
expect(r.status).toBe(200);
798+
expect(Array.isArray(r.data)).toBe(true);
799+
expect(r.data).toHaveLength(3);
800+
expect(r.data[0]).toMatchObject({ id: 'txuser1', email: 'txuser1@abc.com' });
801+
expect(r.data[1]).toMatchObject({ id: 'txpost1', title: 'Tx Post' });
802+
expect(r.data[2]).toHaveLength(1);
803+
expect(r.data[2][0]).toMatchObject({ id: 'txpost1' });
804+
805+
// Clean up
806+
await rawClient.post.deleteMany();
807+
await rawClient.user.deleteMany();
808+
});
809+
810+
it('rejects non-POST methods', async () => {
811+
const handleRequest = makeHandler();
812+
813+
const r = await handleRequest({
814+
method: 'get',
815+
path: '/$transaction/sequential',
816+
client: rawClient,
817+
});
818+
expect(r.status).toBe(400);
819+
expect(r.error.message).toMatch(/only POST is supported/i);
820+
});
821+
822+
it('rejects missing or non-array body', async () => {
823+
const handleRequest = makeHandler();
824+
825+
let r = await handleRequest({
826+
method: 'post',
827+
path: '/$transaction/sequential',
828+
client: rawClient,
829+
});
830+
expect(r.status).toBe(400);
831+
expect(r.error.message).toMatch(/non-empty array/i);
832+
833+
r = await handleRequest({
834+
method: 'post',
835+
path: '/$transaction/sequential',
836+
requestBody: [],
837+
client: rawClient,
838+
});
839+
expect(r.status).toBe(400);
840+
expect(r.error.message).toMatch(/non-empty array/i);
841+
842+
r = await handleRequest({
843+
method: 'post',
844+
path: '/$transaction/sequential',
845+
requestBody: { model: 'User', op: 'findMany', args: {} },
846+
client: rawClient,
847+
});
848+
expect(r.status).toBe(400);
849+
expect(r.error.message).toMatch(/non-empty array/i);
850+
});
851+
852+
it('rejects unknown model in operation', async () => {
853+
const handleRequest = makeHandler();
854+
855+
const r = await handleRequest({
856+
method: 'post',
857+
path: '/$transaction/sequential',
858+
requestBody: [{ model: 'Ghost', op: 'create', args: { data: {} } }],
859+
client: rawClient,
860+
});
861+
expect(r.status).toBe(400);
862+
expect(r.error.message).toMatch(/unknown model/i);
863+
});
864+
865+
it('rejects invalid op in operation', async () => {
866+
const handleRequest = makeHandler();
867+
868+
const r = await handleRequest({
869+
method: 'post',
870+
path: '/$transaction/sequential',
871+
requestBody: [{ model: 'User', op: 'dropTable', args: {} }],
872+
client: rawClient,
873+
});
874+
expect(r.status).toBe(400);
875+
expect(r.error.message).toMatch(/invalid op/i);
876+
});
877+
878+
it('rejects operation missing model or op field', async () => {
879+
const handleRequest = makeHandler();
880+
881+
let r = await handleRequest({
882+
method: 'post',
883+
path: '/$transaction/sequential',
884+
requestBody: [{ op: 'create', args: { data: {} } }],
885+
client: rawClient,
886+
});
887+
expect(r.status).toBe(400);
888+
expect(r.error.message).toMatch(/"model"/i);
889+
890+
r = await handleRequest({
891+
method: 'post',
892+
path: '/$transaction/sequential',
893+
requestBody: [{ model: 'User', args: { data: {} } }],
894+
client: rawClient,
895+
});
896+
expect(r.status).toBe(400);
897+
expect(r.error.message).toMatch(/"op"/i);
898+
});
899+
900+
it('returns error for invalid args (non-existent field in where clause)', async () => {
901+
const handleRequest = makeHandler();
902+
903+
// findMany with a non-existent field in where → ORM validation error
904+
let r = await handleRequest({
905+
method: 'post',
906+
path: '/$transaction/sequential',
907+
requestBody: [
908+
{
909+
model: 'User',
910+
op: 'findMany',
911+
args: { where: { nonExistentField: 'value' } },
912+
},
913+
],
914+
client: rawClient,
915+
});
916+
expect(r.status).toBe(422);
917+
expect(r.error.message).toMatch(/validation error/i);
918+
919+
// findUnique missing required where clause → ORM validation error
920+
r = await handleRequest({
921+
method: 'post',
922+
path: '/$transaction/sequential',
923+
requestBody: [
924+
{
925+
model: 'Post',
926+
op: 'findUnique',
927+
args: {},
928+
},
929+
],
930+
client: rawClient,
931+
});
932+
expect(r.status).toBe(422);
933+
expect(r.error.message).toMatch(/validation error/i);
934+
935+
// create with missing required field → ORM validation error
936+
r = await handleRequest({
937+
method: 'post',
938+
path: '/$transaction/sequential',
939+
requestBody: [
940+
{
941+
model: 'Post',
942+
op: 'create',
943+
// title is required but omitted
944+
args: { data: {} },
945+
},
946+
],
947+
client: rawClient,
948+
});
949+
expect(r.status).toBe(422);
950+
expect(r.error.message).toMatch(/validation error/i);
951+
});
952+
953+
it('deserializes SuperJSON-encoded args per operation', async () => {
954+
const handleRequest = makeHandler();
955+
956+
// Clean up
957+
await rawClient.post.deleteMany();
958+
await rawClient.user.deleteMany();
959+
960+
// Serialize args containing a Date so they need SuperJSON deserialization
961+
const publishedAt = new Date('2025-01-15T00:00:00.000Z');
962+
const serialized = SuperJSON.serialize({
963+
data: { id: 'txuser3', email: 'txuser3@abc.com' },
964+
});
965+
const serializedPost = SuperJSON.serialize({
966+
data: { id: 'txpost3', title: 'Dated Post', authorId: 'txuser3', publishedAt },
967+
});
968+
969+
const r = await handleRequest({
970+
method: 'post',
971+
path: '/$transaction/sequential',
972+
requestBody: [
973+
{
974+
model: 'User',
975+
op: 'create',
976+
args: { ...(serialized.json as any), meta: { serialization: serialized.meta } },
977+
},
978+
{
979+
model: 'Post',
980+
op: 'create',
981+
args: { ...(serializedPost.json as any), meta: { serialization: serializedPost.meta } },
982+
},
983+
],
984+
client: rawClient,
985+
});
986+
987+
expect(r.status).toBe(200);
988+
expect(r.data).toHaveLength(2);
989+
expect(r.data[0]).toMatchObject({ id: 'txuser3' });
990+
expect(r.data[1]).toMatchObject({ id: 'txpost3' });
991+
992+
// Verify the Date was stored correctly
993+
const post = await (rawClient as any).post.findUnique({ where: { id: 'txpost3' } });
994+
expect(post?.publishedAt instanceof Date).toBe(true);
995+
expect((post?.publishedAt as Date)?.toISOString()).toBe(publishedAt.toISOString());
996+
997+
// Clean up
998+
await rawClient.post.deleteMany();
999+
await rawClient.user.deleteMany();
1000+
});
1001+
1002+
it('rolls back all operations when one fails', async () => {
1003+
const handleRequest = makeHandler();
1004+
1005+
// Ensure no users before
1006+
await rawClient.user.deleteMany();
1007+
1008+
const r = await handleRequest({
1009+
method: 'post',
1010+
path: '/$transaction/sequential',
1011+
requestBody: [
1012+
{
1013+
model: 'User',
1014+
op: 'create',
1015+
args: { data: { id: 'txuser2', email: 'txuser2@abc.com' } },
1016+
},
1017+
// duplicate id will cause a DB error → whole tx rolls back
1018+
{
1019+
model: 'User',
1020+
op: 'create',
1021+
args: { data: { id: 'txuser2', email: 'txuser2@abc.com' } },
1022+
},
1023+
],
1024+
client: rawClient,
1025+
});
1026+
expect(r.status).toBeGreaterThanOrEqual(400);
1027+
1028+
// User should not have been committed
1029+
const count = await rawClient.user.count();
1030+
expect(count).toBe(0);
1031+
});
1032+
});
1033+
7661034
function makeHandler() {
7671035
const handler = new RPCApiHandler({ schema: client.$schema });
7681036
return async (args: any) => {

0 commit comments

Comments
 (0)