Skip to content

Commit 48b3eff

Browse files
authored
ENG-1793 Roam sync does not recover from Concept upsert failure (#1075)
1 parent cb70908 commit 48b3eff

3 files changed

Lines changed: 73 additions & 18 deletions

File tree

apps/roam/src/components/settings/DiscourseNodeSuggestiveRules.tsx

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,9 @@ const DiscourseNodeSuggestiveRules = ({
4848
const handleUpdateEmbeddings = async (): Promise<void> => {
4949
setIsUpdating(true);
5050
try {
51-
const blockNodesSince = await getAllDiscourseNodesSince(
52-
new Date(0).toISOString(),
53-
[node],
54-
);
51+
const blockNodesSince = await getAllDiscourseNodesSince(undefined, [
52+
node,
53+
]);
5554
const supabaseClient = await getLoggedInClient();
5655
if (!supabaseClient) return;
5756

apps/roam/src/utils/getAllDiscourseNodesSince.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { type DiscourseNode } from "./getDiscourseNodes";
22
import getDiscourseNodeFormatExpression from "./getDiscourseNodeFormatExpression";
33
import extractRef from "roamjs-components/util/extractRef";
44

5-
type ISODateString = string;
5+
const DEFAULT_TIME = new Date("1970-01-01").getTime();
66

77
export type RoamDiscourseNodeData = {
88
author_local_id: string;
@@ -63,10 +63,10 @@ export const getDiscourseNodeTypeWithSettingsBlockNodes = async (
6363
};
6464

6565
export const getAllDiscourseNodesSince = async (
66-
since: ISODateString | undefined,
66+
since: number | undefined,
6767
nodeTypes: DiscourseNode[],
6868
): Promise<RoamDiscourseNodeData[]> => {
69-
const sinceMs = since ? new Date(since).getTime() : 0;
69+
const sinceMs = since ?? DEFAULT_TIME;
7070
if (!nodeTypes.length) {
7171
return [];
7272
}
@@ -144,10 +144,10 @@ export const getAllDiscourseNodesSince = async (
144144
};
145145

146146
export const nodeTypeSince = async (
147-
since: ISODateString,
147+
since: number | undefined,
148148
nodeTypes: DiscourseNode[],
149149
) => {
150-
const sinceMs = new Date(since).getTime();
150+
const sinceMs = since ?? DEFAULT_TIME;
151151
const filterMap = await Promise.all(
152152
nodeTypes.map((node) => {
153153
const query = `

apps/roam/src/utils/syncDgNodesToSupabase.ts

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ import {
1919
import { fetchEmbeddingsForNodes } from "./upsertNodesAsContentWithEmbeddings";
2020
import { convertRoamNodeToLocalContent } from "./upsertNodesAsContentWithEmbeddings";
2121
import type { DGSupabaseClient } from "@repo/database/lib/client";
22+
import { intersection } from "@repo/utils/setOperations";
2223
import type { Json, CompositeTypes, Enums } from "@repo/database/dbTypes";
2324
import { render as renderToast } from "roamjs-components/components/Toast";
2425
import internalError from "~/utils/internalError";
2526
type LocalContentDataInput = Partial<CompositeTypes<"content_local_input">>;
2627
type AccountLocalInput = CompositeTypes<"account_local_input">;
2728
import { FatalError } from "@repo/database/lib/contextFunctions";
29+
import { getAllPages } from "@repo/database/lib/pagination";
2830

2931
const SYNC_FUNCTION = "embedding";
3032
// Minimal interval between syncs of all clients for this task.
@@ -34,7 +36,6 @@ const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes
3436
const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL.
3537
const BATCH_SIZE = 200;
3638
const CONCEPT_BATCH_SIZE = 200;
37-
const DEFAULT_TIME = new Date("1970-01-01");
3839

3940
type SyncTaskInfo = {
4041
lastUpdateTime?: Date;
@@ -286,7 +287,7 @@ export const convertDgToSupabaseConcepts = async ({
286287
context,
287288
}: {
288289
nodesSince: RoamDiscourseNodeData[];
289-
since: string;
290+
since: number | undefined;
290291
allNodeTypes: DiscourseNode[];
291292
supabaseClient: DGSupabaseClient;
292293
context: SupabaseContext;
@@ -427,6 +428,7 @@ const upsertUsers = async (
427428
};
428429

429430
let doSync = true;
431+
let initialSync = true;
430432
let numFailures = 0;
431433
const MAX_FAILURES = 5;
432434
type TimeoutValue = ReturnType<typeof setTimeout>;
@@ -447,12 +449,60 @@ export const setSyncActivity = (active: boolean) => {
447449
}
448450
};
449451

452+
const getAllMissingOrNewDiscourseNodes = async ({
453+
supabaseClient,
454+
spaceId,
455+
since,
456+
nodeTypes,
457+
}: {
458+
supabaseClient: DGSupabaseClient;
459+
spaceId: number;
460+
since: number | undefined;
461+
nodeTypes: DiscourseNode[];
462+
}): Promise<RoamDiscourseNodeData[]> => {
463+
const allNodes = await getAllDiscourseNodesSince(undefined, nodeTypes);
464+
if (since === undefined) return allNodes;
465+
const newNodes = await getAllDiscourseNodesSince(since, nodeTypes);
466+
const existingContentIdsReq = await getAllPages(
467+
supabaseClient
468+
.from("my_contents")
469+
.select("source_local_id")
470+
.eq("space_id", spaceId)
471+
.order("id"),
472+
1000,
473+
);
474+
if (!Array.isArray(existingContentIdsReq)) throw existingContentIdsReq;
475+
const existingConceptIdsReq = await getAllPages(
476+
supabaseClient
477+
.from("my_concepts")
478+
.select("source_local_id")
479+
.eq("space_id", spaceId)
480+
.eq("arity", 0)
481+
.eq("is_schema", false)
482+
.order("id"),
483+
1000,
484+
);
485+
if (!Array.isArray(existingConceptIdsReq)) throw existingConceptIdsReq;
486+
const existingIds = new Set([
487+
...intersection(
488+
new Set(existingConceptIdsReq.map((d) => d.source_local_id)),
489+
new Set(existingContentIdsReq.map((d) => d.source_local_id)),
490+
),
491+
...newNodes.map((n) => n.source_local_id),
492+
]);
493+
return [
494+
...newNodes,
495+
...allNodes.filter((n) => !existingIds.has(n.source_local_id)),
496+
];
497+
};
498+
450499
export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
451500
if (!doSync) return;
452501
console.debug("starting createOrUpdateDiscourseEmbedding");
453502
const startTime = new Date();
454503
let success = true;
455504
let claimed = false;
505+
const isInitialSync = initialSync; // record state at start
456506
const worker = window.roamAlphaAPI.user.uid();
457507

458508
try {
@@ -489,16 +539,21 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
489539
}
490540
claimed = true;
491541
const allUsers = await getAllUsers();
492-
const sinceTime = (lastUpdateTime || DEFAULT_TIME).valueOf() - 1000; // add a one-second buffer
493-
const time = new Date(sinceTime).toISOString();
542+
const sinceTime = lastUpdateTime
543+
? lastUpdateTime.valueOf() - 1000 // add a one-second buffer
544+
: undefined;
494545
const allDgNodeTypes = getDiscourseNodes().filter(
495546
(n) => n.backedBy === "user",
496547
);
497548

498-
const allNodeInstances = await getAllDiscourseNodesSince(
499-
time,
500-
allDgNodeTypes,
501-
);
549+
const allNodeInstances = isInitialSync
550+
? await getAllMissingOrNewDiscourseNodes({
551+
supabaseClient,
552+
spaceId: context.spaceId,
553+
since: sinceTime,
554+
nodeTypes: allDgNodeTypes,
555+
})
556+
: await getAllDiscourseNodesSince(sinceTime, allDgNodeTypes);
502557
await upsertUsers(allUsers, supabaseClient, context);
503558
await upsertNodesToSupabaseAsContentWithEmbeddings(
504559
allNodeInstances,
@@ -507,13 +562,14 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
507562
);
508563
await convertDgToSupabaseConcepts({
509564
nodesSince: allNodeInstances,
510-
since: time,
565+
since: sinceTime,
511566
allNodeTypes: allDgNodeTypes,
512567
supabaseClient,
513568
context,
514569
});
515570
await cleanupOrphanedNodes(supabaseClient, context);
516571
await endSyncTask({ worker, status: "complete", showToast, startTime });
572+
initialSync = false;
517573
const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0;
518574
posthog.capture("Sync complete", { duration });
519575
} catch (error) {

0 commit comments

Comments
 (0)