Skip to content

Commit 36f8b0a

Browse files
committed
feat(fumadb): add bulk upsert queries
1 parent 24bccd6 commit 36f8b0a

7 files changed

Lines changed: 307 additions & 3 deletions

File tree

packages/core/fumadb/src/adapters/drizzle/query.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,16 @@ function buildWhere(
121121
);
122122
}
123123

124+
function countConditionParameters(condition: Condition): number {
125+
if (condition.type === ConditionType.Compare) {
126+
if (condition.b instanceof Column) return 0;
127+
if (Array.isArray(condition.b)) return condition.b.length;
128+
return 1;
129+
}
130+
if (condition.type === ConditionType.Not) return countConditionParameters(condition.item);
131+
return condition.items.reduce((count, item) => count + countConditionParameters(item), 0);
132+
}
133+
124134
function mapValues(
125135
values: Record<string, unknown>,
126136
table: AnyTable
@@ -303,6 +313,70 @@ export function fromDrizzle(
303313
await this.createMany(table, [v.create]);
304314
}
305315
},
316+
async upsertMany(table, v) {
317+
if (v.values.length === 0) return;
318+
if (v.update.length === 0) {
319+
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape
320+
throw new Error("[FumaDB] upsertMany requires at least one update column.");
321+
}
322+
if (provider !== "sqlite" && provider !== "postgresql") {
323+
for (const value of v.values) {
324+
const targetCondition: Condition = {
325+
type: ConditionType.And,
326+
items: v.target.map((column) => ({
327+
type: ConditionType.Compare,
328+
a: column,
329+
operator: "=",
330+
b: value[column.ormName],
331+
})),
332+
};
333+
await this.upsert(table, {
334+
where: v.where
335+
? { type: ConditionType.And, items: [targetCondition, v.where] }
336+
: targetCondition,
337+
update: Object.fromEntries(
338+
v.update.map((column) => [column.ormName, value[column.ormName]]),
339+
),
340+
create: value,
341+
});
342+
}
343+
return;
344+
}
345+
346+
const drizzleTable = toDrizzle(table);
347+
const values = v.values.map((value) => mapValues(value, table));
348+
const where = v.where ? buildWhere(toDrizzleColumn, v.where) : undefined;
349+
const whereParameters = v.where ? countConditionParameters(v.where) : 0;
350+
const columnsPerRow = values.length > 0 ? Math.max(1, Object.keys(values[0]!).length) : 1;
351+
const batchSize = maxBoundParameters
352+
? Math.max(
353+
1,
354+
Math.min(
355+
CREATE_MANY_BATCH_SIZE,
356+
Math.floor(Math.max(1, maxBoundParameters - whereParameters) / columnsPerRow),
357+
),
358+
)
359+
: CREATE_MANY_BATCH_SIZE;
360+
const target = v.target.map((column) => drizzleTable[column.names.drizzle]);
361+
const set = Object.fromEntries(
362+
v.update.map((column) => [
363+
column.names.drizzle,
364+
Drizzle.sql.raw(`excluded.${column.names.sql}`),
365+
]),
366+
);
367+
368+
for (let i = 0; i < values.length; i += batchSize) {
369+
const batch = values.slice(i, i + batchSize);
370+
await (db as any)
371+
.insert(drizzleTable)
372+
.values(batch)
373+
.onConflictDoUpdate({
374+
target,
375+
set,
376+
...(where === undefined ? {} : { where }),
377+
});
378+
}
379+
},
306380
async findMany(table, v) {
307381
return (
308382
await db.query[table.names.drizzle].findMany(buildQueryConfig(table, v))

packages/core/fumadb/src/adapters/memory/index.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,31 @@ export function memoryAdapter(options: MemoryAdapterOptions = {}): FumaDBAdapter
174174
}
175175
await this.create(table, v.create);
176176
},
177+
async upsertMany(table, v) {
178+
if (v.update.length === 0) {
179+
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape
180+
throw new Error("[FumaDB] upsertMany requires at least one update column.");
181+
}
182+
for (const value of v.values) {
183+
const existing = tableRows(db, table).find(
184+
(row) =>
185+
matchesCondition(row, v.where) &&
186+
v.target.every((column) => row[column.ormName] === value[column.ormName]),
187+
);
188+
if (existing) {
189+
Object.assign(
190+
existing,
191+
cloneValue(
192+
Object.fromEntries(
193+
v.update.map((column) => [column.ormName, value[column.ormName]]),
194+
),
195+
),
196+
);
197+
continue;
198+
}
199+
await this.create(table, value);
200+
}
201+
},
177202
async create(table, values) {
178203
const row = applyDefaults(table, values);
179204
tableRows(db, table).push(row);

packages/core/fumadb/src/query/index.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,22 @@ export interface AbstractQuery<S extends AnySchema> {
174174
}
175175
) => Promise<void>;
176176

177+
/**
178+
* Bulk upsert rows by a unique column target.
179+
*
180+
* Adapters with native conflict support should implement this as one or more
181+
* `INSERT ... ON CONFLICT ... DO UPDATE` statements. Other adapters may fall
182+
* back to per-row `upsert`.
183+
*/
184+
upsertMany: <TableName extends keyof S["tables"]>(
185+
table: TableName,
186+
v: {
187+
target: (keyof S["tables"][TableName]["columns"])[];
188+
update: (keyof S["tables"][TableName]["columns"])[];
189+
values: TableToInsertValues<S["tables"][TableName]>[];
190+
}
191+
) => Promise<void>;
192+
177193
/**
178194
* Note: you cannot update the id of a row, some databases don't support that (including MongoDB).
179195
*/

packages/core/fumadb/src/query/orm/index.ts

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type {
44
AnySchema,
55
AnyTable,
66
} from "../../schema";
7+
import { Column } from "../../schema";
78
import type {
89
AbstractQuery,
910
AnySelectClause,
@@ -12,7 +13,12 @@ import type {
1213
JoinBuilder,
1314
OrderBy,
1415
} from "..";
15-
import { buildCondition, createBuilder, type Condition } from "../condition-builder";
16+
import {
17+
buildCondition,
18+
createBuilder,
19+
type Condition,
20+
ConditionType,
21+
} from "../condition-builder";
1622

1723
export interface CompiledJoin {
1824
relation: AnyRelation;
@@ -231,6 +237,27 @@ const applyUpdatePolicies = async (
231237
return nextWhere;
232238
};
233239

240+
const conditionKey = (condition: Condition | undefined): string => {
241+
if (!condition) return "none";
242+
if (condition.type === ConditionType.Compare) {
243+
const right =
244+
condition.b instanceof Column ? { column: condition.b.ormName } : { value: condition.b };
245+
return JSON.stringify({
246+
type: "compare",
247+
left: condition.a.ormName,
248+
operator: condition.operator,
249+
right,
250+
});
251+
}
252+
if (condition.type === ConditionType.Not) {
253+
return JSON.stringify({ type: "not", item: conditionKey(condition.item) });
254+
}
255+
return JSON.stringify({
256+
type: condition.type === ConditionType.And ? "and" : "or",
257+
items: condition.items.map(conditionKey),
258+
});
259+
};
260+
234261
const applyDeletePolicies = async (
235262
table: AnyTable,
236263
where: Condition | undefined,
@@ -284,6 +311,16 @@ export interface ORMAdapter<S extends AnySchema = AnySchema> {
284311
},
285312
) => Promise<void>;
286313

314+
upsertMany?: (
315+
table: AnyTable,
316+
v: {
317+
target: AnyColumn[];
318+
update: AnyColumn[];
319+
values: Record<string, unknown>[];
320+
where?: Condition;
321+
},
322+
) => Promise<void>;
323+
287324
create: (
288325
table: AnyTable,
289326
values: Record<string, unknown>,
@@ -367,6 +404,85 @@ export function toORM<S extends AnySchema>(
367404
...options,
368405
});
369406
},
407+
async upsertMany(name, { target, update, values }) {
408+
const table = toTable(name);
409+
if (values.length === 0) return;
410+
411+
const targetColumns = target.map((columnName) => {
412+
const column = table.columns[columnName as string];
413+
if (!column) throw new Error(`[FumaDB] unknown column name ${String(columnName)}.`);
414+
return column;
415+
});
416+
const updateColumns = update.map((columnName) => {
417+
const column = table.columns[columnName as string];
418+
if (!column) throw new Error(`[FumaDB] unknown column name ${String(columnName)}.`);
419+
return column;
420+
});
421+
422+
const builder = createBuilder(table.columns);
423+
const permittedRows: {
424+
readonly value: Record<string, unknown>;
425+
readonly where: Condition | undefined;
426+
}[] = [];
427+
for (const value of values) {
428+
const updateValues = Object.fromEntries(
429+
updateColumns.map((column) => [column.ormName, value[column.ormName]]),
430+
);
431+
const constrainedWhere = await applyUpdatePolicies(
432+
table,
433+
undefined,
434+
updateValues,
435+
context,
436+
"upsert",
437+
value,
438+
);
439+
if (constrainedWhere === false) continue;
440+
await runCreatePolicies(table, value, context);
441+
permittedRows.push({ value, where: constrainedWhere });
442+
}
443+
if (permittedRows.length === 0) return;
444+
445+
if (internal.upsertMany) {
446+
const groups = new Map<
447+
string,
448+
{ readonly where: Condition | undefined; readonly values: Record<string, unknown>[] }
449+
>();
450+
for (const row of permittedRows) {
451+
const key = conditionKey(row.where);
452+
const group = groups.get(key);
453+
if (group) {
454+
group.values.push(row.value);
455+
} else {
456+
groups.set(key, { where: row.where, values: [row.value] });
457+
}
458+
}
459+
for (const group of groups.values()) {
460+
await internal.upsertMany(table, {
461+
target: targetColumns,
462+
update: updateColumns,
463+
values: group.values,
464+
where: group.where,
465+
});
466+
}
467+
return;
468+
}
469+
470+
for (const row of permittedRows) {
471+
const value = row.value;
472+
const targetWhere = builder.and(
473+
...targetColumns.map((column) => builder(column.ormName, "=", value[column.ormName])),
474+
);
475+
const where = builder.and(targetWhere, row.where ?? true);
476+
if (where === false) continue;
477+
await internal.upsert(table, {
478+
where: where === true ? undefined : where,
479+
update: Object.fromEntries(
480+
updateColumns.map((column) => [column.ormName, value[column.ormName]]),
481+
),
482+
create: value,
483+
});
484+
}
485+
},
370486
async create(name, values) {
371487
const table = toTable(name);
372488
await runCreatePolicies(table, values, context);

packages/core/fumadb/src/query/table-policy.test.ts

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,24 @@ describe("FumaDB table policies", () => {
382382
title: "A Three",
383383
},
384384
});
385+
await tenantA.upsertMany("posts", {
386+
target: ["id"],
387+
update: ["title"],
388+
values: [
389+
{
390+
id: "post-a-1",
391+
tenantId: "tenant-a",
392+
authorId: "author-a",
393+
title: "tenant-a-bulk-upserted",
394+
},
395+
{
396+
id: "post-a-4",
397+
tenantId: "tenant-a",
398+
authorId: "author-a",
399+
title: "A Four",
400+
},
401+
],
402+
});
385403

386404
await expect(
387405
tenantA.findMany("posts", {
@@ -391,7 +409,7 @@ describe("FumaDB table policies", () => {
391409
).resolves.toEqual([
392410
{
393411
id: "post-a-1",
394-
title: "tenant-a-updated",
412+
title: "tenant-a-bulk-upserted",
395413
},
396414
{
397415
id: "post-a-2",
@@ -401,6 +419,10 @@ describe("FumaDB table policies", () => {
401419
id: "post-a-3",
402420
title: "A Three",
403421
},
422+
{
423+
id: "post-a-4",
424+
title: "A Four",
425+
},
404426
]);
405427

406428
expect(tenantAContext.observed).toEqual(
@@ -471,7 +493,7 @@ describe("FumaDB table policies", () => {
471493
);
472494

473495
it.effect(
474-
"rejects out-of-context writes across createMany, updateMany, upsert, and transactions",
496+
"rejects out-of-context writes across createMany, updateMany, upsert, upsertMany, and transactions",
475497
() =>
476498
useHarness(async (orm) => {
477499
await seedTenants(orm);
@@ -524,6 +546,47 @@ describe("FumaDB table policies", () => {
524546
}),
525547
).rejects.toThrow("tenant tenant-b is not allowed for posts");
526548

549+
await expect(
550+
tenantA.upsertMany("posts", {
551+
target: ["id"],
552+
update: ["title"],
553+
values: [
554+
{
555+
id: "post-a-bulk-upsert",
556+
tenantId: "tenant-a",
557+
authorId: "author-a",
558+
title: "A bulk upsert",
559+
},
560+
{
561+
id: "post-b-bulk-upsert",
562+
tenantId: "tenant-b",
563+
authorId: "author-b",
564+
title: "B bulk upsert",
565+
},
566+
],
567+
}),
568+
).rejects.toThrow("tenant tenant-b is not allowed for posts");
569+
await expect(
570+
tenantA.findFirst("posts", {
571+
where: (builder) => builder("id", "=", "post-a-bulk-upsert"),
572+
}),
573+
).resolves.toBeNull();
574+
575+
await expect(
576+
tenantA.upsertMany("posts", {
577+
target: ["id"],
578+
update: ["tenantId"],
579+
values: [
580+
{
581+
id: "post-a-1",
582+
tenantId: "tenant-b",
583+
authorId: "author-b",
584+
title: "tenant move",
585+
},
586+
],
587+
}),
588+
).rejects.toThrow("tenant tenant-b is not allowed for posts");
589+
527590
await expect(
528591
tenantA.transaction(async (tx) => {
529592
await tx.create("posts", {

0 commit comments

Comments
 (0)