Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/plugin-storage-bulk-upserts.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@executor-js/fumadb": patch
"@executor-js/sdk": patch
---

Add a FumaDB bulk upsert query path and route plugin-storage bulk writes through
it so existing rows are updated without delete/reinsert churn.
74 changes: 74 additions & 0 deletions packages/core/fumadb/src/adapters/drizzle/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ function buildWhere(
);
}

function countConditionParameters(condition: Condition): number {
if (condition.type === ConditionType.Compare) {
if (condition.b instanceof Column) return 0;
if (Array.isArray(condition.b)) return condition.b.length;
return 1;
}
if (condition.type === ConditionType.Not) return countConditionParameters(condition.item);
return condition.items.reduce((count, item) => count + countConditionParameters(item), 0);
}

function mapValues(
values: Record<string, unknown>,
table: AnyTable
Expand Down Expand Up @@ -303,6 +313,70 @@ export function fromDrizzle(
await this.createMany(table, [v.create]);
}
},
async upsertMany(table, v) {
if (v.values.length === 0) return;
if (v.update.length === 0) {
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape
throw new Error("[FumaDB] upsertMany requires at least one update column.");
}
if (provider !== "sqlite" && provider !== "postgresql") {
for (const value of v.values) {
const targetCondition: Condition = {
type: ConditionType.And,
items: v.target.map((column) => ({
type: ConditionType.Compare,
a: column,
operator: "=",
b: value[column.ormName],
})),
};
await this.upsert(table, {
where: v.where
? { type: ConditionType.And, items: [targetCondition, v.where] }
: targetCondition,
update: Object.fromEntries(
v.update.map((column) => [column.ormName, value[column.ormName]]),
),
create: value,
});
}
return;
}

const drizzleTable = toDrizzle(table);
const values = v.values.map((value) => mapValues(value, table));
const where = v.where ? buildWhere(toDrizzleColumn, v.where) : undefined;
const whereParameters = v.where ? countConditionParameters(v.where) : 0;
const columnsPerRow = values.length > 0 ? Math.max(1, Object.keys(values[0]!).length) : 1;
const batchSize = maxBoundParameters
? Math.max(
1,
Math.min(
CREATE_MANY_BATCH_SIZE,
Math.floor(Math.max(1, maxBoundParameters - whereParameters) / columnsPerRow),
),
)
: CREATE_MANY_BATCH_SIZE;
const target = v.target.map((column) => drizzleTable[column.names.drizzle]);
const set = Object.fromEntries(
v.update.map((column) => [
column.names.drizzle,
Drizzle.sql.raw(`excluded.${column.names.sql}`),
]),
);

for (let i = 0; i < values.length; i += batchSize) {
const batch = values.slice(i, i + batchSize);
await (db as any)
.insert(drizzleTable)
.values(batch)
.onConflictDoUpdate({
target,
set,
...(where === undefined ? {} : { where }),
});
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
},
async findMany(table, v) {
return (
await db.query[table.names.drizzle].findMany(buildQueryConfig(table, v))
Expand Down
25 changes: 25 additions & 0 deletions packages/core/fumadb/src/adapters/memory/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,31 @@ export function memoryAdapter(options: MemoryAdapterOptions = {}): FumaDBAdapter
}
await this.create(table, v.create);
},
async upsertMany(table, v) {
if (v.update.length === 0) {
// oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: adapter rejects invalid upsert shape
throw new Error("[FumaDB] upsertMany requires at least one update column.");
}
for (const value of v.values) {
const existing = tableRows(db, table).find(
(row) =>
matchesCondition(row, v.where) &&
v.target.every((column) => row[column.ormName] === value[column.ormName]),
);
if (existing) {
Object.assign(
existing,
cloneValue(
Object.fromEntries(
v.update.map((column) => [column.ormName, value[column.ormName]]),
),
),
);
continue;
}
await this.create(table, value);
}
},
Comment thread
greptile-apps[bot] marked this conversation as resolved.
async create(table, values) {
const row = applyDefaults(table, values);
tableRows(db, table).push(row);
Expand Down
16 changes: 16 additions & 0 deletions packages/core/fumadb/src/query/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,22 @@ export interface AbstractQuery<S extends AnySchema> {
}
) => Promise<void>;

/**
* Bulk upsert rows by a unique column target.
*
* Adapters with native conflict support should implement this as one or more
* `INSERT ... ON CONFLICT ... DO UPDATE` statements. Other adapters may fall
* back to per-row `upsert`.
*/
upsertMany: <TableName extends keyof S["tables"]>(
table: TableName,
v: {
target: (keyof S["tables"][TableName]["columns"])[];
update: (keyof S["tables"][TableName]["columns"])[];
values: TableToInsertValues<S["tables"][TableName]>[];
}
) => Promise<void>;

/**
* Note: you cannot update the id of a row, some databases don't support that (including MongoDB).
*/
Expand Down
118 changes: 117 additions & 1 deletion packages/core/fumadb/src/query/orm/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
AnySchema,
AnyTable,
} from "../../schema";
import { Column } from "../../schema";
import type {
AbstractQuery,
AnySelectClause,
Expand All @@ -12,7 +13,12 @@ import type {
JoinBuilder,
OrderBy,
} from "..";
import { buildCondition, createBuilder, type Condition } from "../condition-builder";
import {
buildCondition,
createBuilder,
type Condition,
ConditionType,
} from "../condition-builder";

export interface CompiledJoin {
relation: AnyRelation;
Expand Down Expand Up @@ -231,6 +237,27 @@ const applyUpdatePolicies = async (
return nextWhere;
};

const conditionKey = (condition: Condition | undefined): string => {
if (!condition) return "none";
if (condition.type === ConditionType.Compare) {
const right =
condition.b instanceof Column ? { column: condition.b.ormName } : { value: condition.b };
return JSON.stringify({
type: "compare",
left: condition.a.ormName,
operator: condition.operator,
right,
});
}
if (condition.type === ConditionType.Not) {
return JSON.stringify({ type: "not", item: conditionKey(condition.item) });
}
return JSON.stringify({
type: condition.type === ConditionType.And ? "and" : "or",
items: condition.items.map(conditionKey),
});
};

const applyDeletePolicies = async (
table: AnyTable,
where: Condition | undefined,
Expand Down Expand Up @@ -284,6 +311,16 @@ export interface ORMAdapter<S extends AnySchema = AnySchema> {
},
) => Promise<void>;

upsertMany?: (
table: AnyTable,
v: {
target: AnyColumn[];
update: AnyColumn[];
values: Record<string, unknown>[];
where?: Condition;
},
) => Promise<void>;

create: (
table: AnyTable,
values: Record<string, unknown>,
Expand Down Expand Up @@ -367,6 +404,85 @@ export function toORM<S extends AnySchema>(
...options,
});
},
async upsertMany(name, { target, update, values }) {
const table = toTable(name);
if (values.length === 0) return;

const targetColumns = target.map((columnName) => {
const column = table.columns[columnName as string];
if (!column) throw new Error(`[FumaDB] unknown column name ${String(columnName)}.`);
return column;
});
const updateColumns = update.map((columnName) => {
const column = table.columns[columnName as string];
if (!column) throw new Error(`[FumaDB] unknown column name ${String(columnName)}.`);
return column;
});

const builder = createBuilder(table.columns);
const permittedRows: {
readonly value: Record<string, unknown>;
readonly where: Condition | undefined;
}[] = [];
for (const value of values) {
const updateValues = Object.fromEntries(
updateColumns.map((column) => [column.ormName, value[column.ormName]]),
);
const constrainedWhere = await applyUpdatePolicies(
table,
undefined,
updateValues,
context,
"upsert",
value,
);
if (constrainedWhere === false) continue;
await runCreatePolicies(table, value, context);
permittedRows.push({ value, where: constrainedWhere });
}
if (permittedRows.length === 0) return;

if (internal.upsertMany) {
const groups = new Map<
string,
{ readonly where: Condition | undefined; readonly values: Record<string, unknown>[] }
>();
for (const row of permittedRows) {
const key = conditionKey(row.where);
const group = groups.get(key);
if (group) {
group.values.push(row.value);
} else {
groups.set(key, { where: row.where, values: [row.value] });
}
}
for (const group of groups.values()) {
await internal.upsertMany(table, {
target: targetColumns,
update: updateColumns,
values: group.values,
where: group.where,
});
}
return;
}

for (const row of permittedRows) {
const value = row.value;
const targetWhere = builder.and(
...targetColumns.map((column) => builder(column.ormName, "=", value[column.ormName])),
);
const where = builder.and(targetWhere, row.where ?? true);
if (where === false) continue;
await internal.upsert(table, {
where: where === true ? undefined : where,
update: Object.fromEntries(
updateColumns.map((column) => [column.ormName, value[column.ormName]]),
),
create: value,
});
}
},
async create(name, values) {
const table = toTable(name);
await runCreatePolicies(table, values, context);
Expand Down
Loading
Loading