Skip to content

Commit 9a72a28

Browse files
committed
add a version to the JSON output
1 parent c8ecd67 commit 9a72a28

4 files changed

Lines changed: 59 additions & 6 deletions

File tree

apps/roam/src/utils/syncDgNodesToSupabase.ts

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes
4040
const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL.
4141
const BATCH_SIZE = 200;
4242
const CONCEPT_BATCH_SIZE = 200;
43+
const END_SYNC_TASK_RESULT_VERSION = 1;
4344

4445
type SyncPhaseDurations = Record<string, number>;
4546

@@ -50,6 +51,7 @@ type SyncTaskInfo = {
5051
};
5152

5253
type EndSyncTaskRpcResult = {
54+
version?: number;
5355
ok: boolean;
5456
stale: boolean;
5557
reason?: string;
@@ -100,15 +102,39 @@ const getSyncWorkerId = (): string => {
100102
return syncWorkerId;
101103
};
102104

103-
const isEndSyncTaskRpcResult = (
105+
const getJsonObject = (
104106
data: Json | undefined,
105-
): data is EndSyncTaskRpcResult => {
107+
): Record<string, unknown> | null => {
106108
if (typeof data !== "object" || data === null || Array.isArray(data)) {
107-
return false;
109+
return null;
110+
}
111+
112+
return data as Record<string, unknown>;
113+
};
114+
115+
const getEndSyncTaskResultVersion = (data: Json | undefined): number => {
116+
const result = getJsonObject(data);
117+
if (result === null || typeof result.version !== "number") {
118+
return 0;
108119
}
109120

110-
const result = data as { ok?: unknown; stale?: unknown };
111-
return typeof result.ok === "boolean" && typeof result.stale === "boolean";
121+
return result.version;
122+
};
123+
124+
const isEndSyncTaskRpcResult = (
125+
data: Json | undefined,
126+
): data is EndSyncTaskRpcResult => {
127+
const result = getJsonObject(data);
128+
if (result === null) return false;
129+
130+
const versionIsSupported =
131+
result.version === undefined || typeof result.version === "number";
132+
133+
return (
134+
versionIsSupported &&
135+
typeof result.ok === "boolean" &&
136+
typeof result.stale === "boolean"
137+
);
112138
};
113139

114140
const measureSyncPhase = async <T>({
@@ -320,6 +346,27 @@ export const endSyncTask = async ({
320346
}
321347

322348
if (!isEndSyncTaskRpcResult(data)) {
349+
const resultVersion = getEndSyncTaskResultVersion(data);
350+
if (resultVersion > END_SYNC_TASK_RESULT_VERSION) {
351+
const rpcResult: EndSyncTaskRpcResult = {
352+
version: resultVersion,
353+
ok: true,
354+
stale: false,
355+
reason: "unsupported_future_result_version",
356+
};
357+
posthog.capture("Sync end task future result", {
358+
...telemetryContext,
359+
endSyncPayload: data,
360+
endSyncResult: rpcResult,
361+
});
362+
363+
return {
364+
ok: true,
365+
stale: false,
366+
rpcResult,
367+
};
368+
}
369+
323370
const reason = "Supabase end_sync_task returned unexpected payload";
324371
notifyEndSyncFailure({
325372
status,

packages/database/doc/sync_functions.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ When a worker calls `propose_sync_task`, it will receive either:
1616
3. Null, meaning the task was not executed successfully before, and your worker is tasked with starting from scratch.
1717

1818
When a worker finishes the task, it should clean up with `end_sync_task`, giving the same identifying arguments, a status (`complete` or `failed`), and the time at which the task was claimed.
19-
The function returns a JSON object with `ok` and `stale` fields plus task metadata. A stale response means another attempt claimed a newer task while this attempt was still running, so the caller should not report it as a successful completion.
19+
The function returns a versioned JSON object with `ok` and `stale` fields plus task metadata. A stale response means another attempt claimed a newer task while this attempt was still running, so the caller should not report it as a successful completion.

packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ BEGIN
3232
IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN
3333
-- we probably took too long. Let the other task have priority.
3434
RETURN jsonb_build_object(
35+
'version', 1,
3536
'ok', false,
3637
'stale', true,
3738
'reason', 'completed_by_newer_task',
@@ -50,6 +51,7 @@ BEGIN
5051
IF t_worker != s_worker THEN
5152
RAISE EXCEPTION 'Wrong worker'
5253
USING DETAIL = jsonb_build_object(
54+
'version', 1,
5355
'requestedStatus', s_status,
5456
'callerWorker', s_worker,
5557
'currentWorker', t_worker,
@@ -83,6 +85,7 @@ BEGIN
8385
WHERE id=t_id;
8486

8587
RETURN jsonb_build_object(
88+
'version', 1,
8689
'ok', true,
8790
'stale', false,
8891
'requestedStatus', s_status,

packages/database/supabase/schemas/sync.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ BEGIN
7373
IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN
7474
-- we probably took too long. Let the other task have priority.
7575
RETURN jsonb_build_object(
76+
'version', 1,
7677
'ok', false,
7778
'stale', true,
7879
'reason', 'completed_by_newer_task',
@@ -91,6 +92,7 @@ BEGIN
9192
IF t_worker != s_worker THEN
9293
RAISE EXCEPTION 'Wrong worker'
9394
USING DETAIL = jsonb_build_object(
95+
'version', 1,
9496
'requestedStatus', s_status,
9597
'callerWorker', s_worker,
9698
'currentWorker', t_worker,
@@ -124,6 +126,7 @@ BEGIN
124126
WHERE id=t_id;
125127

126128
RETURN jsonb_build_object(
129+
'version', 1,
127130
'ok', true,
128131
'stale', false,
129132
'requestedStatus', s_status,

0 commit comments

Comments
 (0)