Skip to content

Commit cc26688

Browse files
authored
ENG-1699 - Implement batch upsert for concepts in Supabase (#1000)
- Introduced a new function `upsertConceptBatches` to handle the upsert of concepts in batches, improving efficiency and error handling.
- Added utility functions for chunking arrays and summarizing failed upsert IDs.
- Updated the `convertDgToSupabaseConcepts` function to utilize the new batching logic, ensuring better management of concept dependencies during synchronization.
- Removed redundant chunking logic from `upsertNodesToSupabaseAsContentWithEmbeddings` for cleaner code.

This change enhances the synchronization process with Supabase by allowing for more manageable data handling and clearer error reporting.
1 parent bb5607f commit cc26688

1 file changed

Lines changed: 68 additions & 16 deletions

File tree

apps/roam/src/utils/syncDgNodesToSupabase.ts

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const SYNC_INTERVAL = "130s";
3434
const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes
3535
const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL.
3636
const BATCH_SIZE = 200;
37+
// Maximum number of concepts sent in a single `upsert_concepts` RPC call.
const CONCEPT_BATCH_SIZE = 200;
3738
const DEFAULT_TIME = new Date("1970-01-01");
3839

3940
type SyncTaskInfo = {
@@ -42,6 +43,61 @@ type SyncTaskInfo = {
4243
shouldProceed: boolean;
4344
};
4445

46+
/**
 * One concept row as passed to the `upsert_concepts` RPC. `Partial` makes every
 * field of the `concept_local_input` composite type optional.
 */
type LocalConceptDataInput = Partial<CompositeTypes<"concept_local_input">>;
47+
48+
/**
 * Splits `array` into consecutive slices of at most `size` elements each.
 * The input array is not modified; the final slice may be shorter than `size`.
 *
 * @param array - Items to split. An empty array yields an empty result.
 * @param size - Maximum number of items per slice; must be a positive integer.
 * @returns The slices, in the original order.
 * @throws RangeError if `size` is not a positive integer.
 */
const chunk = <T>(array: T[], size: number): T[][] => {
  // Without this guard a size of 0 or less never advances the loop index
  // (infinite loop), NaN silently drops every element, and fractional sizes
  // produce irregular or empty slices.
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(
      `chunk size must be a positive integer, received ${size}`,
    );
  }

  const chunks: T[][] = [];
  for (let i = 0; i < array.length; i += size) {
    chunks.push(array.slice(i, i + size));
  }
  return chunks;
};
55+
56+
/**
 * Builds a compact, human-readable tally of the given ids.
 *
 * Each distinct id is rendered as `<id>: <occurrences>` and the entries are
 * joined with ", ". An empty input produces an empty string.
 *
 * @param failedIds - Ids to tally; duplicates are counted.
 * @returns The formatted tally.
 */
const summarizeFailedConceptUpsertIds = (failedIds: number[]): string => {
  // A plain object keeps the key ordering of Object.entries.
  const occurrences: Record<string, number> = {};
  for (const failedId of failedIds) {
    occurrences[failedId] = 1 + (occurrences[failedId] ?? 0);
  }

  const parts: string[] = [];
  for (const [key, total] of Object.entries(occurrences)) {
    parts.push(`${key}: ${total}`);
  }
  return parts.join(", ");
};
66+
67+
/**
 * Sends `concepts` to the `upsert_concepts` RPC in batches of at most
 * `CONCEPT_BATCH_SIZE`, one batch at a time and in the order given.
 *
 * Processing stops at the first failing batch; batches before it have already
 * been submitted.
 *
 * @param concepts - Concepts to upsert, in the order they should be sent.
 * @param supabaseClient - Client used to issue the RPC calls.
 * @param spaceId - Passed to the RPC as `v_space_id`.
 * @throws Error if the RPC reports an error for a batch, or if the ids it
 *   returns include negative values, which are treated as row failures.
 */
const upsertConceptBatches = async ({
  concepts,
  supabaseClient,
  spaceId,
}: {
  concepts: LocalConceptDataInput[];
  supabaseClient: DGSupabaseClient;
  spaceId: number;
}): Promise<void> => {
  const batches = chunk(concepts, CONCEPT_BATCH_SIZE);
  const totalBatches = batches.length;

  for (const [position, batch] of batches.entries()) {
    // 1-based "current/total" label used in both error messages.
    const batchLabel = `${position + 1}/${totalBatches}`;

    const { data, error } = await supabaseClient.rpc("upsert_concepts", {
      data: batch as Json,
      v_space_id: spaceId,
    });

    if (error) {
      const details = JSON.stringify(error, null, 2);
      throw new Error(
        `upsert_concepts failed for batch ${batchLabel}: ${details}`,
      );
    }

    // Negative ids in the response are treated as per-row failures.
    const failedIds = (data ?? []).filter((id) => id < 0);
    if (failedIds.length === 0) {
      continue;
    }

    const summary = summarizeFailedConceptUpsertIds(failedIds);
    throw new Error(
      `upsert_concepts returned row failures for batch ${batchLabel}: ${summary}`,
    );
  }
};
100+
45101
const notifyEndSyncFailure = ({
46102
status,
47103
showToast,
@@ -261,16 +317,20 @@ export const convertDgToSupabaseConcepts = async ({
261317
...nodesTypesToLocalConcepts,
262318
...nodeBlockToLocalConcepts,
263319
];
264-
const { ordered } = orderConceptsByDependency(conceptsToUpsert);
265-
const { error } = await supabaseClient.rpc("upsert_concepts", {
266-
data: ordered,
267-
v_space_id: context.spaceId,
268-
});
269-
if (error) {
270-
throw new Error(
271-
`upsert_concepts failed: ${JSON.stringify(error, null, 2)}`,
320+
const { ordered, missing } = orderConceptsByDependency(conceptsToUpsert);
321+
322+
if (missing.length > 0) {
323+
console.warn(
324+
"Some concept dependencies were not in the current sync batch:",
325+
missing,
272326
);
273327
}
328+
329+
await upsertConceptBatches({
330+
concepts: ordered,
331+
supabaseClient,
332+
spaceId: context.spaceId,
333+
});
274334
};
275335

276336
export const upsertNodesToSupabaseAsContentWithEmbeddings = async (
@@ -309,14 +369,6 @@ export const upsertNodesToSupabaseAsContentWithEmbeddings = async (
309369
);
310370
}
311371

312-
const chunk = <T>(array: T[], size: number): T[][] => {
313-
const chunks: T[][] = [];
314-
for (let i = 0; i < array.length; i += size) {
315-
chunks.push(array.slice(i, i + size));
316-
}
317-
return chunks;
318-
};
319-
320372
const uploadBatches = async (batches: LocalContentDataInput[][]) => {
321373
for (let idx = 0; idx < batches.length; idx++) {
322374
const batch = batches[idx];

0 commit comments

Comments
 (0)