Skip to content

Commit 3b829a6

Browse files
committed
feat: batch mutations
1 parent 7e414b6 commit 3b829a6

22 files changed

Lines changed: 768 additions & 407 deletions

packages/entity-database-adapter-knex-testing-utils/src/StubPostgresDatabaseAdapter.ts

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -242,75 +242,82 @@ export class StubPostgresDatabaseAdapter<
242242
}
243243
}
244244

245-
protected async insertInternalAsync(
245+
protected async insertManyInternalAsync(
246246
_queryInterface: any,
247247
tableName: string,
248-
object: object,
248+
objects: readonly object[],
249249
): Promise<object[]> {
250250
const objectCollection = this.getObjectCollectionForTable(tableName);
251251

252252
const idField = getDatabaseFieldForEntityField(
253253
this.entityConfiguration2,
254254
this.entityConfiguration2.idField,
255255
);
256-
const objectToInsert = {
257-
[idField]: this.generateRandomID(),
258-
...object,
259-
};
260-
objectCollection.push(objectToInsert);
261-
return [objectToInsert];
256+
const insertedObjects: object[] = [];
257+
for (const object of objects) {
258+
const objectToInsert = {
259+
[idField]: this.generateRandomID(),
260+
...object,
261+
};
262+
objectCollection.push(objectToInsert);
263+
insertedObjects.push(objectToInsert);
264+
}
265+
return insertedObjects;
262266
}
263267

264-
protected async updateInternalAsync(
268+
protected async updateManyInternalAsync(
265269
_queryInterface: any,
266270
tableName: string,
267271
tableIdField: string,
268-
id: any,
269-
object: object,
270-
): Promise<{ updatedRowCount: number }> {
271-
// SQL does not support empty updates, mirror behavior here for better test simulation
272-
if (Object.keys(object).length === 0) {
273-
throw new Error(`Empty update (${tableIdField} = ${id})`);
274-
}
272+
items: readonly { id: any; object: object }[],
273+
): Promise<readonly { updatedRowCount: number }[]> {
274+
const results: { updatedRowCount: number }[] = [];
275+
for (const item of items) {
276+
if (Object.keys(item.object).length === 0) {
277+
throw new Error(`Empty update (${tableIdField} = ${item.id})`);
278+
}
275279

276-
const objectCollection = this.getObjectCollectionForTable(tableName);
280+
const objectCollection = this.getObjectCollectionForTable(tableName);
277281

278-
const objectIndex = objectCollection.findIndex((obj) => {
279-
return obj[tableIdField] === id;
280-
});
282+
const objectIndex = objectCollection.findIndex((obj) => {
283+
return obj[tableIdField] === item.id;
284+
});
281285

282-
// SQL updates to a nonexistent row succeed but affect 0 rows,
283-
// mirror that behavior here for better test simulation
284-
if (objectIndex < 0) {
285-
return { updatedRowCount: 0 };
286-
}
286+
if (objectIndex < 0) {
287+
results.push({ updatedRowCount: 0 });
288+
continue;
289+
}
287290

288-
objectCollection[objectIndex] = {
289-
...objectCollection[objectIndex],
290-
...object,
291-
};
292-
return { updatedRowCount: 1 };
291+
objectCollection[objectIndex] = {
292+
...objectCollection[objectIndex],
293+
...item.object,
294+
};
295+
results.push({ updatedRowCount: 1 });
296+
}
297+
return results;
293298
}
294299

295-
protected async deleteInternalAsync(
300+
protected async deleteManyInternalAsync(
296301
_queryInterface: any,
297302
tableName: string,
298303
tableIdField: string,
299-
id: any,
304+
ids: readonly any[],
300305
): Promise<number> {
301306
const objectCollection = this.getObjectCollectionForTable(tableName);
307+
let numDeleted = 0;
302308

303-
const objectIndex = objectCollection.findIndex((obj) => {
304-
return obj[tableIdField] === id;
305-
});
309+
for (const id of ids) {
310+
const objectIndex = objectCollection.findIndex((obj) => {
311+
return obj[tableIdField] === id;
312+
});
306313

307-
// SQL deletes to a nonexistent row succeed and affect 0 rows,
308-
// mirror that behavior here for better test simulation
309-
if (objectIndex < 0) {
310-
return 0;
311-
}
314+
if (objectIndex < 0) {
315+
continue;
316+
}
312317

313-
objectCollection.splice(objectIndex, 1);
314-
return 1;
318+
objectCollection.splice(objectIndex, 1);
319+
numDeleted++;
320+
}
321+
return numDeleted;
315322
}
316323
}

packages/entity-database-adapter-knex-testing-utils/src/__tests__/StubPostgresDatabaseAdapter-test.ts

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -577,17 +577,19 @@ describe(StubPostgresDatabaseAdapter, () => {
577577
});
578578
});
579579

580-
describe('insertAsync', () => {
580+
describe('insertManyAsync', () => {
581581
it('inserts a record', async () => {
582582
const queryContext = instance(mock(EntityQueryContext));
583583
const databaseAdapter = new StubPostgresDatabaseAdapter<TestFields, 'customIdField'>(
584584
testEntityConfiguration,
585585
new Map(),
586586
);
587-
const result = await databaseAdapter.insertAsync(queryContext, {
588-
stringField: 'hello',
589-
});
590-
expect(result).toMatchObject({
587+
const result = await databaseAdapter.insertManyAsync(queryContext, [
588+
{
589+
stringField: 'hello',
590+
},
591+
]);
592+
expect(result[0]).toMatchObject({
591593
stringField: 'hello',
592594
});
593595

@@ -602,16 +604,18 @@ describe(StubPostgresDatabaseAdapter, () => {
602604
testEntityConfiguration,
603605
new Map(),
604606
);
605-
const result = await databaseAdapter.insertAsync(queryContext, {
606-
stringField: 'hello',
607-
});
607+
const result = await databaseAdapter.insertManyAsync(queryContext, [
608+
{
609+
stringField: 'hello',
610+
},
611+
]);
608612

609-
const ts = getTimeFromUUIDv7(result.customIdField);
613+
const ts = getTimeFromUUIDv7(result[0]!.customIdField);
610614
expect(ts).toEqual(expectedTime);
611615
});
612616
});
613617

614-
describe('updateAsync', () => {
618+
describe('updateManyAsync', () => {
615619
it('updates a record', async () => {
616620
const queryContext = instance(mock(EntityQueryContext));
617621
const databaseAdapter = new StubPostgresDatabaseAdapter<TestFields, 'customIdField'>(
@@ -635,9 +639,9 @@ describe(StubPostgresDatabaseAdapter, () => {
635639
]),
636640
),
637641
);
638-
await databaseAdapter.updateAsync(queryContext, 'customIdField', 'hello', {
639-
stringField: 'b',
640-
});
642+
await databaseAdapter.updateManyAsync(queryContext, 'customIdField', [
643+
{ id: 'hello', object: { stringField: 'b' } },
644+
]);
641645
});
642646

643647
it('throws error when empty update to match common DBMS behavior', async () => {
@@ -664,7 +668,9 @@ describe(StubPostgresDatabaseAdapter, () => {
664668
),
665669
);
666670
await expect(
667-
databaseAdapter.updateAsync(queryContext, 'customIdField', 'hello', {}),
671+
databaseAdapter.updateManyAsync(queryContext, 'customIdField', [
672+
{ id: 'hello', object: {} },
673+
]),
668674
).rejects.toThrow(`Empty update (custom_id = hello)`);
669675
});
670676

@@ -694,17 +700,17 @@ describe(StubPostgresDatabaseAdapter, () => {
694700

695701
// Try to update a record that doesn't exist
696702
await expect(
697-
databaseAdapter.updateAsync(
698-
queryContext,
699-
'customIdField',
700-
'nonexistent-id', // This ID doesn't exist in the data store
701-
{ stringField: 'updated' },
702-
),
703+
databaseAdapter.updateManyAsync(queryContext, 'customIdField', [
704+
{
705+
id: 'nonexistent-id', // This ID doesn't exist in the data store
706+
object: { stringField: 'updated' },
707+
},
708+
]),
703709
).rejects.toThrow('Empty results from database adapter update');
704710
});
705711
});
706712

707-
describe('deleteAsync', () => {
713+
describe('deleteManyAsync', () => {
708714
it('deletes an object', async () => {
709715
const queryContext = instance(mock(EntityQueryContext));
710716
const databaseAdapter = new StubPostgresDatabaseAdapter<TestFields, 'customIdField'>(
@@ -729,7 +735,7 @@ describe(StubPostgresDatabaseAdapter, () => {
729735
),
730736
);
731737

732-
await databaseAdapter.deleteAsync(queryContext, 'customIdField', 'hello');
738+
await databaseAdapter.deleteManyAsync(queryContext, 'customIdField', ['hello']);
733739

734740
expect(
735741
databaseAdapter.getObjectCollectionForTable(testEntityConfiguration.tableName),
@@ -762,11 +768,9 @@ describe(StubPostgresDatabaseAdapter, () => {
762768

763769
// Delete a record that doesn't exist
764770
// Unlike update, delete doesn't throw an error when the record doesn't exist
765-
await databaseAdapter.deleteAsync(
766-
queryContext,
767-
'customIdField',
771+
await databaseAdapter.deleteManyAsync(queryContext, 'customIdField', [
768772
'nonexistent-id', // This ID doesn't exist in the data store
769-
);
773+
]);
770774
});
771775
});
772776

@@ -776,21 +780,21 @@ describe(StubPostgresDatabaseAdapter, () => {
776780
simpleTestEntityConfiguration,
777781
new Map(),
778782
);
779-
const insertedObject1 = await databaseAdapter1.insertAsync(queryContext, {});
780-
expect(typeof insertedObject1.id).toBe('string');
783+
const insertedObjects1 = await databaseAdapter1.insertManyAsync(queryContext, [{}]);
784+
expect(typeof insertedObjects1[0]!.id).toBe('string');
781785

782786
const databaseAdapter2 = new StubPostgresDatabaseAdapter<NumberKeyFields, 'id'>(
783787
numberKeyEntityConfiguration,
784788
new Map(),
785789
);
786-
const insertedObject2 = await databaseAdapter2.insertAsync(queryContext, {});
787-
expect(typeof insertedObject2.id).toBe('number');
790+
const insertedObjects2 = await databaseAdapter2.insertManyAsync(queryContext, [{}]);
791+
expect(typeof insertedObjects2[0]!.id).toBe('number');
788792

789793
const databaseAdapter3 = new StubPostgresDatabaseAdapter<DateIDTestFields, 'id'>(
790794
dateIDTestEntityConfiguration,
791795
new Map(),
792796
);
793-
await expect(databaseAdapter3.insertAsync(queryContext, {})).rejects.toThrow(
797+
await expect(databaseAdapter3.insertManyAsync(queryContext, [{}])).rejects.toThrow(
794798
'Unsupported ID type for StubPostgresDatabaseAdapter: DateField',
795799
);
796800
});

packages/entity-database-adapter-knex/src/PostgresEntityDatabaseAdapter.ts

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -277,37 +277,97 @@ export class PostgresEntityDatabaseAdapter<
277277
return parseInt(String(result[0][RESERVED_ENTITY_COUNT_QUERY_ALIAS]), 10);
278278
}
279279

280-
protected async insertInternalAsync(
280+
protected async insertManyInternalAsync(
281281
queryInterface: Knex,
282282
tableName: string,
283-
object: object,
283+
objects: readonly object[],
284284
): Promise<object[]> {
285285
return await wrapNativePostgresCallAsync(() =>
286-
queryInterface.insert(object).into(tableName).returning('*'),
286+
queryInterface
287+
.insert([...objects])
288+
.into(tableName)
289+
.returning('*'),
287290
);
288291
}
289292

290-
protected async updateInternalAsync(
293+
protected async updateManyInternalAsync(
291294
queryInterface: Knex,
292295
tableName: string,
293296
tableIdField: string,
294-
id: any,
295-
object: object,
296-
): Promise<{ updatedRowCount: number }> {
297-
const updatedRowCount = await wrapNativePostgresCallAsync(() =>
298-
queryInterface.update(object).into(tableName).where(tableIdField, id),
299-
);
300-
return { updatedRowCount };
297+
items: readonly { id: any; object: object }[],
298+
): Promise<readonly { updatedRowCount: number }[]> {
299+
if (items.length === 0) {
300+
return [];
301+
}
302+
303+
if (items.length === 1) {
304+
const item = items[0]!;
305+
const updatedRowCount = await wrapNativePostgresCallAsync(() =>
306+
queryInterface.update(item.object).into(tableName).where(tableIdField, item.id),
307+
);
308+
return [{ updatedRowCount }];
309+
}
310+
311+
// Bulk update using UPDATE ... FROM (VALUES ...) for same-column-set items.
312+
// All items are guaranteed to have the same set of columns.
313+
const columns = Object.keys(items[0]!.object);
314+
const allColumns = [tableIdField, ...columns];
315+
316+
const valuePlaceholders = items
317+
.map(() => `(${allColumns.map(() => '?').join(', ')})`)
318+
.join(', ');
319+
const bindings: any[] = items.flatMap((item) => [
320+
item.id,
321+
...columns.map((col) => (item.object as Record<string, any>)[col]),
322+
]);
323+
324+
const setClause = columns.map(() => `?? = ??`).join(', ');
325+
const setBindings = columns.flatMap((col) => [col, `_data_table_.${col}`]);
326+
327+
const columnList = allColumns.map(() => '??').join(', ');
328+
const columnBindings = allColumns;
329+
330+
const sql = [
331+
`UPDATE ?? SET ${setClause}`,
332+
`FROM (VALUES ${valuePlaceholders}) AS "_data_table_"(${columnList})`,
333+
`WHERE ?? = ??`,
334+
`RETURNING ??`,
335+
].join(' ');
336+
337+
const allBindings = [
338+
tableName,
339+
...setBindings,
340+
...bindings,
341+
...columnBindings,
342+
`${tableName}.${tableIdField}`,
343+
`_data_table_.${tableIdField}`,
344+
`${tableName}.${tableIdField}`,
345+
];
346+
347+
const result = await wrapNativePostgresCallAsync(() => queryInterface.raw(sql, allBindings));
348+
349+
const updatedIdCounts = new Map<any, number>();
350+
for (const row of result.rows as Record<string, any>[]) {
351+
const id = row[tableIdField];
352+
updatedIdCounts.set(id, (updatedIdCounts.get(id) ?? 0) + 1);
353+
}
354+
355+
return items.map((item) => ({
356+
updatedRowCount: updatedIdCounts.get(item.id) ?? 0,
357+
}));
301358
}
302359

303-
protected async deleteInternalAsync(
360+
protected async deleteManyInternalAsync(
304361
queryInterface: Knex,
305362
tableName: string,
306363
tableIdField: string,
307-
id: any,
364+
ids: readonly any[],
308365
): Promise<number> {
309366
return await wrapNativePostgresCallAsync(() =>
310-
queryInterface.into(tableName).where(tableIdField, id).del(),
367+
queryInterface
368+
.from(tableName)
369+
.whereIn(tableIdField, [...ids])
370+
.del(),
311371
);
312372
}
313373
}

0 commit comments

Comments
 (0)