Skip to content

Commit 8c61336

Browse files
authored
ENG-298 repeat the sync at regular intervals (#513)
* ENG-298 isolate the repetition logic from the refactoring * stop sync un unload * Not finding the Space is not a reason not to sync; it will get created with getContext * reorder task preconditions, avoid ending a task that failed to start * clarify different interval values * timing suggestions * coderabbit correction
1 parent edc933c commit 8c61336

2 files changed

Lines changed: 144 additions & 84 deletions

File tree

apps/roam/src/index.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ import {
2727
installDiscourseFloatingMenu,
2828
removeDiscourseFloatingMenu,
2929
} from "./components/DiscourseFloatingMenu";
30-
import { initializeSupabaseSync } from "./utils/syncDgNodesToSupabase";
30+
import {
31+
initializeSupabaseSync,
32+
setSyncActivity,
33+
} from "./utils/syncDgNodesToSupabase";
3134
import { initPluginTimer } from "./utils/pluginTimer";
3235

3336
const initPostHog = () => {
@@ -137,7 +140,7 @@ export default runExtension(async (onloadArgs) => {
137140
},
138141
listActiveQueries: () => listActiveQueries(extensionAPI),
139142
isDiscourseNode: isDiscourseNode,
140-
// @ts-ignore - we are still using roamjs-components global definition
143+
// @ts-expect-error - we are still using roamjs-components global definition
141144
getDiscourseNodes: getDiscourseNodes,
142145
};
143146

@@ -152,8 +155,9 @@ export default runExtension(async (onloadArgs) => {
152155
],
153156
observers: observers,
154157
unload: () => {
158+
setSyncActivity(false);
155159
window.roamjs.extension?.smartblocks?.unregisterCommand("QUERYBUILDER");
156-
// @ts-ignore - tldraw throws a warning on multiple loads
160+
// @ts-expect-error - tldraw throws a warning on multiple loads
157161
delete window[Symbol.for("__signia__")];
158162
document.removeEventListener(
159163
"roamjs:query-builder:action",

apps/roam/src/utils/syncDgNodesToSupabase.ts

Lines changed: 137 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,33 @@ import { fetchEmbeddingsForNodes } from "./upsertNodesAsContentWithEmbeddings";
2020
import { convertRoamNodeToLocalContent } from "./upsertNodesAsContentWithEmbeddings";
2121
import { getRoamUrl } from "roamjs-components/dom";
2222
import { render as renderToast } from "roamjs-components/components/Toast";
23-
import type { DGSupabaseClient } from "@repo/database/lib/client";
24-
import type { Json, CompositeTypes } from "@repo/database/dbTypes";
23+
import { createClient, type DGSupabaseClient } from "@repo/database/lib/client";
24+
import type { Json, CompositeTypes, Enums } from "@repo/database/dbTypes";
2525

2626
type LocalContentDataInput = Partial<CompositeTypes<"content_local_input">>;
2727
type AccountLocalInput = CompositeTypes<"account_local_input">;
28-
const { createClient } = require("@repo/database/lib/client");
2928

3029
const SYNC_FUNCTION = "embedding";
30+
// Minimal interval between syncs of all clients for this task.
3131
const SYNC_INTERVAL = "45s";
32+
// Interval between syncs for each client individually
33+
const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes
3234
const SYNC_TIMEOUT = "20s";
3335
const BATCH_SIZE = 200;
34-
const DEFAULT_TIME = "1970-01-01";
36+
const DEFAULT_TIME = new Date("1970-01-01");
37+
38+
class FatalError extends Error {}
3539

3640
type SyncTaskInfo = {
37-
lastUpdateTime: string | null;
38-
spaceId: number;
39-
worker: string;
41+
lastUpdateTime?: Date;
42+
nextUpdateTime?: Date;
4043
shouldProceed: boolean;
4144
};
4245

4346
export const endSyncTask = async (
4447
worker: string,
45-
status: "complete" | "failed",
48+
status: Enums<"task_status">,
49+
showToast: boolean = false,
4650
): Promise<void> => {
4751
try {
4852
const supabaseClient = await getLoggedInClient();
@@ -60,13 +64,15 @@ export const endSyncTask = async (
6064
});
6165
if (error) {
6266
console.error("endSyncTask: Error calling end_sync_task:", error);
63-
renderToast({
64-
id: "discourse-embedding-error",
65-
content: "Failed to complete discourse node embeddings sync",
66-
intent: "danger",
67-
timeout: 5000,
68-
});
69-
} else {
67+
if (showToast)
68+
renderToast({
69+
id: "discourse-embedding-error",
70+
content: "Failed to complete discourse node embeddings sync",
71+
intent: "danger",
72+
timeout: 5000,
73+
});
74+
return;
75+
} else if (showToast) {
7076
if (status === "complete") {
7177
renderToast({
7278
id: "discourse-embedding-complete",
@@ -85,39 +91,23 @@ export const endSyncTask = async (
8591
}
8692
} catch (error) {
8793
console.error("endSyncTask: Error calling end_sync_task:", error);
88-
renderToast({
89-
id: "discourse-embedding-error",
90-
content: "Failed to complete discourse node embeddings sync",
91-
intent: "danger",
92-
timeout: 5000,
93-
});
94+
if (showToast)
95+
renderToast({
96+
id: "discourse-embedding-error",
97+
content: "Failed to complete discourse node embeddings sync",
98+
intent: "danger",
99+
timeout: 5000,
100+
});
94101
}
95102
};
96103

97-
export const proposeSyncTask = async (): Promise<SyncTaskInfo> => {
104+
export const proposeSyncTask = async (
105+
worker: string,
106+
supabaseClient: DGSupabaseClient,
107+
context: SupabaseContext,
108+
): Promise<SyncTaskInfo> => {
98109
try {
99-
const supabaseClient = await getLoggedInClient();
100-
const context = supabaseClient ? await getSupabaseContext() : null;
101-
if (!context || !supabaseClient) {
102-
console.error("proposeSyncTask: Unable to obtain Supabase context.");
103-
return {
104-
lastUpdateTime: null,
105-
spaceId: 0,
106-
worker: "",
107-
shouldProceed: false,
108-
};
109-
}
110-
const worker = window.roamAlphaAPI.user.uid();
111-
if (!worker) {
112-
console.error("proposeSyncTask: Unable to obtain user UID.");
113-
return {
114-
lastUpdateTime: null,
115-
spaceId: 0,
116-
worker: "",
117-
shouldProceed: false,
118-
};
119-
}
120-
110+
const now = new Date();
121111
const { data, error } = await supabaseClient.rpc("propose_sync_task", {
122112
s_target: context.spaceId,
123113
s_function: SYNC_FUNCTION,
@@ -126,36 +116,36 @@ export const proposeSyncTask = async (): Promise<SyncTaskInfo> => {
126116
timeout: SYNC_TIMEOUT,
127117
});
128118

129-
const { spaceId } = context;
130-
131119
if (error) {
132120
console.error(
133121
`proposeSyncTask: propose_sync_task failed – ${error.message}`,
134122
);
135-
return { lastUpdateTime: null, spaceId, worker, shouldProceed: false };
123+
return { shouldProceed: false };
136124
}
137125

138126
if (typeof data === "string") {
139127
const timestamp = new Date(data);
140-
const now = new Date();
141128

142129
if (timestamp > now) {
143-
return { lastUpdateTime: null, spaceId, worker, shouldProceed: false };
130+
return {
131+
nextUpdateTime: timestamp,
132+
shouldProceed: false,
133+
};
144134
} else {
145-
return { lastUpdateTime: data, spaceId, worker, shouldProceed: true };
135+
return {
136+
lastUpdateTime: timestamp,
137+
shouldProceed: true,
138+
};
146139
}
147140
}
148141

149-
return { lastUpdateTime: null, spaceId, worker, shouldProceed: true };
142+
return { shouldProceed: true };
150143
} catch (error) {
151144
console.error(
152145
`proposeSyncTask: Unexpected error while contacting sync-task API:`,
153146
error,
154147
);
155148
return {
156-
lastUpdateTime: null,
157-
spaceId: 0,
158-
worker: "",
159149
shouldProceed: false,
160150
};
161151
}
@@ -188,7 +178,6 @@ const upsertNodeSchemaToContent = async ({
188178
189179
]
190180
`;
191-
//@ts-ignore - backend to be added to roamjs-components
192181
const result = (await window.roamAlphaAPI.data.async.q(
193182
query,
194183
nodeTypesUids,
@@ -369,30 +358,77 @@ const upsertUsers = async (
369358
}
370359
};
371360

372-
export const createOrUpdateDiscourseEmbedding = async () => {
373-
const { shouldProceed, lastUpdateTime, worker } = await proposeSyncTask();
374-
375-
if (!shouldProceed) {
376-
return;
361+
let doSync = true;
362+
let numFailures = 0;
363+
const MAX_FAILURES = 5;
364+
type TimeoutValue = ReturnType<typeof setTimeout>;
365+
let activeTimeout: TimeoutValue | null = null;
366+
// TODO: Maybe also pause sync while the window is not active?
367+
368+
export const setSyncActivity = (active: boolean) => {
369+
doSync = active;
370+
if (!active && activeTimeout !== null) {
371+
clearTimeout(activeTimeout);
372+
activeTimeout = null;
373+
} else if (active && activeTimeout === null) {
374+
activeTimeout = setTimeout(
375+
// eslint-disable-next-line @typescript-eslint/no-misused-promises
376+
createOrUpdateDiscourseEmbedding,
377+
100,
378+
);
377379
}
380+
};
381+
382+
export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
383+
if (!doSync) return;
384+
console.debug("starting createOrUpdateDiscourseEmbedding");
385+
let success = true;
386+
let claimed = false;
387+
const worker = window.roamAlphaAPI.user.uid();
378388

379389
try {
390+
if (!worker) {
391+
throw new FatalError("Unable to obtain user UID.");
392+
}
393+
if (!createClient()) {
394+
// not worth retrying
395+
// TODO: Differentiate setup vs connetion error
396+
throw new FatalError("Could not access supabase.");
397+
}
398+
const supabaseClient = await getLoggedInClient();
399+
if (!supabaseClient) {
400+
// TODO: Distinguish connection vs credentials error
401+
throw new Error("Could not log in to client.");
402+
}
403+
const context = await getSupabaseContext();
404+
if (!context) {
405+
// not worth retrying: setup error
406+
throw new FatalError("Error connecting to client.");
407+
}
408+
const { shouldProceed, lastUpdateTime, nextUpdateTime } =
409+
await proposeSyncTask(worker, supabaseClient, context);
410+
if (!shouldProceed) {
411+
if (nextUpdateTime === undefined) {
412+
throw new Error("Can't obtain sync task");
413+
}
414+
console.debug("postponed to ", nextUpdateTime);
415+
if (doSync) {
416+
activeTimeout = setTimeout(
417+
createOrUpdateDiscourseEmbedding, // eslint-disable-line @typescript-eslint/no-misused-promises
418+
Math.max(0, nextUpdateTime.valueOf() - Date.now()) + 100,
419+
);
420+
}
421+
return;
422+
}
423+
claimed = true;
380424
const allUsers = await getAllUsers();
381-
const time = lastUpdateTime === null ? DEFAULT_TIME : lastUpdateTime;
425+
const time = (lastUpdateTime || DEFAULT_TIME).toISOString();
382426
const { allDgNodeTypes, dgNodeTypesWithSettings } = getDgNodeTypes();
383427

384428
const allNodeInstances = await getAllDiscourseNodesSince(
385429
time,
386430
dgNodeTypesWithSettings,
387431
);
388-
const supabaseClient = await getLoggedInClient();
389-
if (!supabaseClient) return null;
390-
const context = await getSupabaseContext();
391-
if (!context) {
392-
console.error("No Supabase context found.");
393-
await endSyncTask(worker, "failed");
394-
return;
395-
}
396432
await upsertUsers(allUsers, supabaseClient, context);
397433
await upsertNodesToSupabaseAsContentWithEmbeddings(
398434
allNodeInstances,
@@ -407,25 +443,45 @@ export const createOrUpdateDiscourseEmbedding = async () => {
407443
context,
408444
});
409445
await cleanupOrphanedNodes(supabaseClient, context);
410-
await endSyncTask(worker, "complete");
446+
await endSyncTask(worker, "complete", showToast);
411447
} catch (error) {
412448
console.error("createOrUpdateDiscourseEmbedding: Process failed:", error);
413-
await endSyncTask(worker, "failed");
414-
throw error;
449+
success = false;
450+
if (worker && claimed) await endSyncTask(worker, "failed", showToast);
451+
if (error instanceof FatalError) {
452+
doSync = false;
453+
return;
454+
}
455+
}
456+
let timeout = BASE_SYNC_INTERVAL;
457+
if (success) {
458+
numFailures = 0;
459+
} else {
460+
numFailures += 1;
461+
if (numFailures >= MAX_FAILURES) {
462+
doSync = false;
463+
return;
464+
}
465+
const jitter = 0.9 + Math.random() * 0.2; // 0.9x–1.1x
466+
timeout *= 2 ** numFailures * jitter;
467+
}
468+
if (activeTimeout != null) {
469+
clearTimeout(activeTimeout);
470+
activeTimeout = null;
471+
}
472+
if (doSync) {
473+
// eslint-disable-next-line @typescript-eslint/no-misused-promises
474+
activeTimeout = setTimeout(createOrUpdateDiscourseEmbedding, timeout);
415475
}
416476
};
417477

418478
export const initializeSupabaseSync = async () => {
419479
const supabase = createClient();
420-
if (supabase === null) return;
421-
const result = await supabase
422-
.from("Space")
423-
.select()
424-
.eq("url", getRoamUrl())
425-
.maybeSingle();
426-
if (!result.data) {
427-
return;
480+
if (supabase === null) {
481+
doSync = false;
428482
} else {
429-
createOrUpdateDiscourseEmbedding();
483+
doSync = true;
484+
// eslint-disable-next-line @typescript-eslint/no-misused-promises
485+
activeTimeout = setTimeout(createOrUpdateDiscourseEmbedding, 100, true);
430486
}
431487
};

0 commit comments

Comments
 (0)