Skip to content

Commit e73085c

Browse files
committed
feat(transaction): implement transaction handling with sequential operations and validation
1 parent d49c39e commit e73085c

2 files changed

Lines changed: 322 additions & 0 deletions

File tree

packages/server/src/api/rpc/index.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import z from 'zod';
77
import { fromError } from 'zod-validation-error/v4';
88
import type { ApiHandler, LogConfig, RequestContext, Response } from '../../types';
99
import { getProcedureDef, mapProcedureArgs, PROCEDURE_ROUTE_PREFIXES } from '../common/procedures';
10+
11+
const TRANSACTION_ROUTE_PREFIX = '$transaction' as const;
1012
import { loggerSchema } from '../common/schemas';
1113
import { processSuperJsonRequestPayload, unmarshalQ } from '../common/utils';
1214
import { log, registerCustomSerializers } from '../utils';
@@ -71,6 +73,14 @@ export class RPCApiHandler<Schema extends SchemaDef = SchemaDef> implements ApiH
7173
});
7274
}
7375

76+
if (model === TRANSACTION_ROUTE_PREFIX) {
77+
return this.handleTransaction({
78+
client,
79+
method: method.toUpperCase(),
80+
requestBody,
81+
});
82+
}
83+
7484
model = lowerCaseFirst(model);
7585
method = method.toUpperCase();
7686
let args: unknown;
@@ -185,6 +195,99 @@ export class RPCApiHandler<Schema extends SchemaDef = SchemaDef> implements ApiH
185195
}
186196
}
187197

198+
private async handleTransaction({
199+
client,
200+
method,
201+
requestBody,
202+
}: {
203+
client: ClientContract<Schema>;
204+
method: string;
205+
requestBody?: unknown;
206+
}): Promise<Response> {
207+
if (method !== 'POST') {
208+
return this.makeBadInputErrorResponse('invalid request method, only POST is supported');
209+
}
210+
211+
if (!requestBody || !Array.isArray(requestBody) || requestBody.length === 0) {
212+
return this.makeBadInputErrorResponse('request body must be a non-empty array of operations');
213+
}
214+
215+
const VALID_OPS = new Set([
216+
'create',
217+
'createMany',
218+
'createManyAndReturn',
219+
'upsert',
220+
'findFirst',
221+
'findUnique',
222+
'findMany',
223+
'aggregate',
224+
'groupBy',
225+
'count',
226+
'exists',
227+
'update',
228+
'updateMany',
229+
'updateManyAndReturn',
230+
'delete',
231+
'deleteMany',
232+
]);
233+
234+
for (let i = 0; i < requestBody.length; i++) {
235+
const item = requestBody[i];
236+
if (!item || typeof item !== 'object') {
237+
return this.makeBadInputErrorResponse(`operation at index ${i} must be an object`);
238+
}
239+
const { model: itemModel, op: itemOp, args: itemArgs } = item as any;
240+
if (!itemModel || typeof itemModel !== 'string') {
241+
return this.makeBadInputErrorResponse(`operation at index ${i} is missing a valid "model" field`);
242+
}
243+
if (!itemOp || typeof itemOp !== 'string') {
244+
return this.makeBadInputErrorResponse(`operation at index ${i} is missing a valid "op" field`);
245+
}
246+
if (!VALID_OPS.has(itemOp)) {
247+
return this.makeBadInputErrorResponse(`operation at index ${i} has invalid op: ${itemOp}`);
248+
}
249+
if (!this.isValidModel(client, lowerCaseFirst(itemModel))) {
250+
return this.makeBadInputErrorResponse(`operation at index ${i} has unknown model: ${itemModel}`);
251+
}
252+
if (itemArgs !== undefined && itemArgs !== null && typeof itemArgs !== 'object') {
253+
return this.makeBadInputErrorResponse(`operation at index ${i} has invalid "args" field`);
254+
}
255+
}
256+
257+
const promises = (requestBody as any[]).map((item) => {
258+
const model = lowerCaseFirst(item.model as string);
259+
const op = item.op as string;
260+
const args = item.args ?? {};
261+
return (client as any)[model][op](args);
262+
});
263+
264+
try {
265+
log(this.options.log, 'debug', () => `handling "$transaction" request with ${promises.length} operations`);
266+
267+
const clientResult = await client.$transaction(promises as any);
268+
269+
const { json, meta } = SuperJSON.serialize(clientResult);
270+
const responseBody: any = { data: json };
271+
if (meta) {
272+
responseBody.meta = { serialization: meta };
273+
}
274+
275+
const response = { status: 200, body: responseBody };
276+
log(
277+
this.options.log,
278+
'debug',
279+
() => `sending response for "$transaction" request: ${safeJSONStringify(response)}`,
280+
);
281+
return response;
282+
} catch (err) {
283+
log(this.options.log, 'error', `error occurred when handling "$transaction" request`, err);
284+
if (err instanceof ORMError) {
285+
return this.makeORMErrorResponse(err);
286+
}
287+
return this.makeGenericErrorResponse(err);
288+
}
289+
}
290+
188291
private async handleProcedureRequest({
189292
client,
190293
method,

packages/server/test/api/rpc.test.ts

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,225 @@ procedure echoOverview(o: Overview): Overview
763763
expect(r.data.stringList).toEqual(['d', 'e', 'f']);
764764
});
765765

766+
describe('transaction', () => {
767+
it('runs sequential operations atomically', async () => {
768+
const handleRequest = makeHandler();
769+
770+
// Clean up
771+
await rawClient.post.deleteMany();
772+
await rawClient.user.deleteMany();
773+
774+
const r = await handleRequest({
775+
method: 'post',
776+
path: '/$transaction/sequential',
777+
requestBody: [
778+
{
779+
model: 'User',
780+
op: 'create',
781+
args: { data: { id: 'txuser1', email: 'txuser1@abc.com' } },
782+
},
783+
{
784+
model: 'Post',
785+
op: 'create',
786+
args: { data: { id: 'txpost1', title: 'Tx Post', authorId: 'txuser1' } },
787+
},
788+
{
789+
model: 'Post',
790+
op: 'findMany',
791+
args: { where: { authorId: 'txuser1' } },
792+
},
793+
],
794+
client: rawClient,
795+
});
796+
797+
expect(r.status).toBe(200);
798+
expect(Array.isArray(r.data)).toBe(true);
799+
expect(r.data).toHaveLength(3);
800+
expect(r.data[0]).toMatchObject({ id: 'txuser1', email: 'txuser1@abc.com' });
801+
expect(r.data[1]).toMatchObject({ id: 'txpost1', title: 'Tx Post' });
802+
expect(r.data[2]).toHaveLength(1);
803+
expect(r.data[2][0]).toMatchObject({ id: 'txpost1' });
804+
805+
// Clean up
806+
await rawClient.post.deleteMany();
807+
await rawClient.user.deleteMany();
808+
});
809+
810+
it('rejects non-POST methods', async () => {
811+
const handleRequest = makeHandler();
812+
813+
const r = await handleRequest({
814+
method: 'get',
815+
path: '/$transaction/sequential',
816+
client: rawClient,
817+
});
818+
expect(r.status).toBe(400);
819+
expect(r.error.message).toMatch(/only POST is supported/i);
820+
});
821+
822+
it('rejects missing or non-array body', async () => {
823+
const handleRequest = makeHandler();
824+
825+
let r = await handleRequest({
826+
method: 'post',
827+
path: '/$transaction/sequential',
828+
client: rawClient,
829+
});
830+
expect(r.status).toBe(400);
831+
expect(r.error.message).toMatch(/non-empty array/i);
832+
833+
r = await handleRequest({
834+
method: 'post',
835+
path: '/$transaction/sequential',
836+
requestBody: [],
837+
client: rawClient,
838+
});
839+
expect(r.status).toBe(400);
840+
expect(r.error.message).toMatch(/non-empty array/i);
841+
842+
r = await handleRequest({
843+
method: 'post',
844+
path: '/$transaction/sequential',
845+
requestBody: { model: 'User', op: 'findMany', args: {} },
846+
client: rawClient,
847+
});
848+
expect(r.status).toBe(400);
849+
expect(r.error.message).toMatch(/non-empty array/i);
850+
});
851+
852+
it('rejects unknown model in operation', async () => {
853+
const handleRequest = makeHandler();
854+
855+
const r = await handleRequest({
856+
method: 'post',
857+
path: '/$transaction/sequential',
858+
requestBody: [{ model: 'Ghost', op: 'create', args: { data: {} } }],
859+
client: rawClient,
860+
});
861+
expect(r.status).toBe(400);
862+
expect(r.error.message).toMatch(/unknown model/i);
863+
});
864+
865+
it('rejects invalid op in operation', async () => {
866+
const handleRequest = makeHandler();
867+
868+
const r = await handleRequest({
869+
method: 'post',
870+
path: '/$transaction/sequential',
871+
requestBody: [{ model: 'User', op: 'dropTable', args: {} }],
872+
client: rawClient,
873+
});
874+
expect(r.status).toBe(400);
875+
expect(r.error.message).toMatch(/invalid op/i);
876+
});
877+
878+
it('rejects operation missing model or op field', async () => {
879+
const handleRequest = makeHandler();
880+
881+
let r = await handleRequest({
882+
method: 'post',
883+
path: '/$transaction/sequential',
884+
requestBody: [{ op: 'create', args: { data: {} } }],
885+
client: rawClient,
886+
});
887+
expect(r.status).toBe(400);
888+
expect(r.error.message).toMatch(/"model"/i);
889+
890+
r = await handleRequest({
891+
method: 'post',
892+
path: '/$transaction/sequential',
893+
requestBody: [{ model: 'User', args: { data: {} } }],
894+
client: rawClient,
895+
});
896+
expect(r.status).toBe(400);
897+
expect(r.error.message).toMatch(/"op"/i);
898+
});
899+
900+
it('returns error for invalid args (non-existent field in where clause)', async () => {
901+
const handleRequest = makeHandler();
902+
903+
// findMany with a non-existent field in where → ORM validation error
904+
let r = await handleRequest({
905+
method: 'post',
906+
path: '/$transaction/sequential',
907+
requestBody: [
908+
{
909+
model: 'User',
910+
op: 'findMany',
911+
args: { where: { nonExistentField: 'value' } },
912+
},
913+
],
914+
client: rawClient,
915+
});
916+
expect(r.status).toBe(422);
917+
expect(r.error.message).toMatch(/validation error/i);
918+
919+
// findUnique missing required where clause → ORM validation error
920+
r = await handleRequest({
921+
method: 'post',
922+
path: '/$transaction/sequential',
923+
requestBody: [
924+
{
925+
model: 'Post',
926+
op: 'findUnique',
927+
args: {},
928+
},
929+
],
930+
client: rawClient,
931+
});
932+
expect(r.status).toBe(422);
933+
expect(r.error.message).toMatch(/validation error/i);
934+
935+
// create with missing required field → ORM validation error
936+
r = await handleRequest({
937+
method: 'post',
938+
path: '/$transaction/sequential',
939+
requestBody: [
940+
{
941+
model: 'Post',
942+
op: 'create',
943+
// title is required but omitted
944+
args: { data: {} },
945+
},
946+
],
947+
client: rawClient,
948+
});
949+
expect(r.status).toBe(422);
950+
expect(r.error.message).toMatch(/validation error/i);
951+
});
952+
953+
it('rolls back all operations when one fails', async () => {
954+
const handleRequest = makeHandler();
955+
956+
// Ensure no users before
957+
await rawClient.user.deleteMany();
958+
959+
const r = await handleRequest({
960+
method: 'post',
961+
path: '/$transaction/sequential',
962+
requestBody: [
963+
{
964+
model: 'User',
965+
op: 'create',
966+
args: { data: { id: 'txuser2', email: 'txuser2@abc.com' } },
967+
},
968+
// duplicate id will cause a DB error → whole tx rolls back
969+
{
970+
model: 'User',
971+
op: 'create',
972+
args: { data: { id: 'txuser2', email: 'txuser2@abc.com' } },
973+
},
974+
],
975+
client: rawClient,
976+
});
977+
expect(r.status).toBeGreaterThanOrEqual(400);
978+
979+
// User should not have been committed
980+
const count = await rawClient.user.count();
981+
expect(count).toBe(0);
982+
});
983+
});
984+
766985
function makeHandler() {
767986
const handler = new RPCApiHandler({ schema: client.$schema });
768987
return async (args: any) => {

0 commit comments

Comments
 (0)