Skip to content

Commit 5851d0a

Browse files
committed
perf(sdk): upsert plugin storage bulk writes
1 parent 36f8b0a commit 5851d0a

4 files changed

Lines changed: 82 additions & 35 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@executor-js/fumadb": patch
3+
"@executor-js/sdk": patch
4+
---
5+
6+
Add a FumaDB bulk upsert query path and route plugin-storage bulk writes through
7+
it so existing rows are updated without delete/reinsert churn.

packages/core/sdk/src/executor.ts

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ import { connectionIdentifier } from "./connection-name-identifier";
143143
import { annotateToolResultOutcome } from "./tool-result";
144144

145145
const PLUGIN_STORAGE_DELETE_KEY_BATCH_SIZE = 90;
146-
const PLUGIN_STORAGE_CREATE_ROW_BATCH_SIZE = 90;
147146
const MAX_APPROVAL_ARGUMENT_PREVIEW_CHARS = 4_000;
148147

149148
// ---------------------------------------------------------------------------
@@ -601,6 +600,14 @@ type LooseStorageDb = {
601600
tableName: string,
602601
rows: readonly Record<string, unknown>[],
603602
) => Promise<readonly unknown[]>;
603+
readonly upsertMany: (
604+
tableName: string,
605+
options: {
606+
readonly target: readonly string[];
607+
readonly update: readonly string[];
608+
readonly values: readonly Record<string, unknown>[];
609+
},
610+
) => Promise<void>;
604611
readonly deleteMany: (tableName: string, options?: unknown) => Promise<void>;
605612
readonly findFirst: (
606613
tableName: string,
@@ -637,6 +644,19 @@ const makeCoreDb = (fuma: ReturnType<typeof makeFumaClient>) => ({
637644
: fuma
638645
.use(`${tableName}.createMany`, (db) => asLooseStorageDb(db).createMany(tableName, rows))
639646
.pipe(Effect.asVoid),
647+
upsertMany: <TName extends CoreTableName>(
648+
tableName: TName,
649+
options: {
650+
readonly target: readonly string[];
651+
readonly update: readonly string[];
652+
readonly values: readonly Record<string, unknown>[];
653+
},
654+
): Effect.Effect<void, StorageFailure> =>
655+
options.values.length === 0
656+
? Effect.void
657+
: fuma.use(`${tableName}.upsertMany`, (db) =>
658+
asLooseStorageDb(db).upsertMany(tableName, options),
659+
),
640660
deleteMany: <TName extends CoreTableName>(
641661
tableName: TName,
642662
options: { readonly where?: CoreWhere } = {},
@@ -986,33 +1006,22 @@ const makePluginStorageFacade = (input: {
9861006
const uniqueEntries = [...entriesById.values()];
9871007
if (uniqueEntries.length === 0) return;
9881008

989-
yield* deleteManyImpl(owner, os.subject, uniqueEntries);
990-
9911009
const now = new Date();
992-
for (
993-
let offset = 0;
994-
offset < uniqueEntries.length;
995-
offset += PLUGIN_STORAGE_CREATE_ROW_BATCH_SIZE
996-
) {
997-
const batchEntries = uniqueEntries.slice(
998-
offset,
999-
offset + PLUGIN_STORAGE_CREATE_ROW_BATCH_SIZE,
1000-
);
1001-
yield* input.core.createMany(
1002-
"plugin_storage",
1003-
batchEntries.map((entry) => ({
1004-
tenant,
1005-
owner: os.owner,
1006-
subject: os.subject,
1007-
plugin_id: input.pluginId,
1008-
collection: entry.collection,
1009-
key: entry.key,
1010-
data: entry.data,
1011-
created_at: now,
1012-
updated_at: now,
1013-
})),
1014-
);
1015-
}
1010+
yield* input.core.upsertMany("plugin_storage", {
1011+
target: ["tenant", "owner", "subject", "plugin_id", "collection", "key"],
1012+
update: ["data", "updated_at"],
1013+
values: uniqueEntries.map((entry) => ({
1014+
tenant,
1015+
owner: os.owner,
1016+
subject: os.subject,
1017+
plugin_id: input.pluginId,
1018+
collection: entry.collection,
1019+
key: entry.key,
1020+
data: entry.data,
1021+
created_at: now,
1022+
updated_at: now,
1023+
})),
1024+
});
10161025
});
10171026

10181027
const removeManyImpl = (
@@ -1109,10 +1118,24 @@ const makePluginStorageFacade = (input: {
11091118
PluginStorageEntry<PluginStorageCollectionData<typeof definition>>,
11101119
StorageFailure
11111120
>,
1121+
putMany: (storageInput) =>
1122+
putManyImpl(
1123+
storageInput.owner,
1124+
storageInput.entries.map((entry) => ({
1125+
collection: definition.name,
1126+
key: entry.key,
1127+
data: entry.data,
1128+
})),
1129+
),
11121130
query: (storageInput) => queryCollection(definition, storageInput),
11131131
count: (storageInput) =>
11141132
queryCollection(definition, storageInput).pipe(Effect.map((rows) => rows.length)),
11151133
remove: (storageInput) => removeImpl(storageInput.owner, definition.name, storageInput.key),
1134+
removeMany: (storageInput) =>
1135+
removeManyImpl(
1136+
storageInput.owner,
1137+
storageInput.keys.map((key) => ({ collection: definition.name, key })),
1138+
),
11161139
}),
11171140
get: (storageInput) => getVisible(storageInput.collection, storageInput.key),
11181141
getForOwner: (storageInput) =>

packages/core/sdk/src/plugin-storage.test.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,14 @@ const executionHistoryPlugin = definePlugin(() => ({
6565
owner: Owner,
6666
rows: readonly { readonly key: string; readonly data: ToolCall }[],
6767
) =>
68-
ctx.pluginStorage.putMany({
68+
ctx.storage.toolCalls.putMany({
6969
owner,
70-
entries: rows.map((row) => ({
71-
collection: toolCalls.name,
72-
key: row.key,
73-
data: row.data,
74-
})),
70+
entries: rows,
7571
}),
7672
removeMany: (owner: Owner, keys: readonly string[]) =>
77-
ctx.pluginStorage.removeMany({
73+
ctx.storage.toolCalls.removeMany({
7874
owner,
79-
entries: keys.map((key) => ({ collection: toolCalls.name, key })),
75+
keys,
8076
}),
8177
get: (key: string) => ctx.storage.toolCalls.get({ key }),
8278
getForOwner: (owner: Owner, key: string) => ctx.storage.toolCalls.getForOwner({ owner, key }),

packages/core/sdk/src/plugin-storage.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,21 @@ export interface PluginStorageCollectionScopedKeyInput extends PluginStorageColl
135135
readonly owner: Owner;
136136
}
137137

138+
export interface PluginStorageCollectionPutManyEntry<TData extends object> {
139+
readonly key: string;
140+
readonly data: TData;
141+
}
142+
143+
export interface PluginStorageCollectionPutManyInput<TData extends object> {
144+
readonly owner: Owner;
145+
readonly entries: readonly PluginStorageCollectionPutManyEntry<TData>[];
146+
}
147+
148+
export interface PluginStorageCollectionRemoveManyInput {
149+
readonly owner: Owner;
150+
readonly keys: readonly string[];
151+
}
152+
138153
export interface PluginStorageCollectionListInput {
139154
readonly keyPrefix?: string;
140155
}
@@ -188,6 +203,9 @@ export interface PluginStorageCollectionFacade<
188203
readonly put: (
189204
input: PluginStorageCollectionPutInput<PluginStorageCollectionData<TDefinition>>,
190205
) => Effect.Effect<PluginStorageEntry<PluginStorageCollectionData<TDefinition>>, StorageFailure>;
206+
readonly putMany: (
207+
input: PluginStorageCollectionPutManyInput<PluginStorageCollectionData<TDefinition>>,
208+
) => Effect.Effect<void, StorageFailure>;
191209
readonly query: (
192210
input?: PluginStorageCollectionQueryInput<TDefinition>,
193211
) => Effect.Effect<
@@ -200,6 +218,9 @@ export interface PluginStorageCollectionFacade<
200218
readonly remove: (
201219
input: PluginStorageCollectionScopedKeyInput,
202220
) => Effect.Effect<void, StorageFailure>;
221+
readonly removeMany: (
222+
input: PluginStorageCollectionRemoveManyInput,
223+
) => Effect.Effect<void, StorageFailure>;
203224
}
204225

205226
export interface PluginStorageFacade {

0 commit comments

Comments
 (0)