Skip to content

Commit aa3523a

Browse files
committed
add a version to the JSON output
1 parent 7d342f3 commit aa3523a

4 files changed

Lines changed: 59 additions & 6 deletions

File tree

apps/roam/src/utils/syncDgNodesToSupabase.ts

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL.
3737
const BATCH_SIZE = 200;
3838
const CONCEPT_BATCH_SIZE = 200;
3939
const DEFAULT_TIME = new Date("1970-01-01");
40+
const END_SYNC_TASK_RESULT_VERSION = 1;
4041

4142
type SyncPhaseDurations = Record<string, number>;
4243

@@ -49,6 +50,7 @@ type SyncTaskInfo = {
4950
type LocalConceptDataInput = Partial<CompositeTypes<"concept_local_input">>;
5051

5152
type EndSyncTaskRpcResult = {
53+
version?: number;
5254
ok: boolean;
5355
stale: boolean;
5456
reason?: string;
@@ -99,15 +101,39 @@ const getSyncWorkerId = (): string => {
99101
return syncWorkerId;
100102
};
101103

102-
const isEndSyncTaskRpcResult = (
104+
const getJsonObject = (
103105
data: Json | undefined,
104-
): data is EndSyncTaskRpcResult => {
106+
): Record<string, unknown> | null => {
105107
if (typeof data !== "object" || data === null || Array.isArray(data)) {
106-
return false;
108+
return null;
109+
}
110+
111+
return data as Record<string, unknown>;
112+
};
113+
114+
const getEndSyncTaskResultVersion = (data: Json | undefined): number => {
115+
const result = getJsonObject(data);
116+
if (result === null || typeof result.version !== "number") {
117+
return 0;
107118
}
108119

109-
const result = data as { ok?: unknown; stale?: unknown };
110-
return typeof result.ok === "boolean" && typeof result.stale === "boolean";
120+
return result.version;
121+
};
122+
123+
const isEndSyncTaskRpcResult = (
124+
data: Json | undefined,
125+
): data is EndSyncTaskRpcResult => {
126+
const result = getJsonObject(data);
127+
if (result === null) return false;
128+
129+
const versionIsSupported =
130+
result.version === undefined || typeof result.version === "number";
131+
132+
return (
133+
versionIsSupported &&
134+
typeof result.ok === "boolean" &&
135+
typeof result.stale === "boolean"
136+
);
111137
};
112138

113139
const measureSyncPhase = async <T>({
@@ -319,6 +345,27 @@ export const endSyncTask = async ({
319345
}
320346

321347
if (!isEndSyncTaskRpcResult(data)) {
348+
const resultVersion = getEndSyncTaskResultVersion(data);
349+
if (resultVersion > END_SYNC_TASK_RESULT_VERSION) {
350+
const rpcResult: EndSyncTaskRpcResult = {
351+
version: resultVersion,
352+
ok: true,
353+
stale: false,
354+
reason: "unsupported_future_result_version",
355+
};
356+
posthog.capture("Sync end task future result", {
357+
...telemetryContext,
358+
endSyncPayload: data,
359+
endSyncResult: rpcResult,
360+
});
361+
362+
return {
363+
ok: true,
364+
stale: false,
365+
rpcResult,
366+
};
367+
}
368+
322369
const reason = "Supabase end_sync_task returned unexpected payload";
323370
notifyEndSyncFailure({
324371
status,

packages/database/doc/sync_functions.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ When a worker calls `propose_sync_task`, it will receive either:
1616
3. Null, meaning the task was not executed successfully before, and your worker is tasked with starting from scratch.
1717

1818
When a worker finishes the task, it should clean up with `end_sync_task`, giving the same identifying arguments, a status (`complete` or `failed`), and the time at which the task was claimed.
19-
The function returns a JSON object with `ok` and `stale` fields plus task metadata. A stale response means another attempt claimed a newer task while this attempt was still running, so the caller should not report it as a successful completion.
19+
The function returns a versioned JSON object with `ok` and `stale` fields plus task metadata. A stale response means another attempt claimed a newer task while this attempt was still running, so the caller should not report it as a successful completion.

packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ BEGIN
3232
IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN
3333
-- we probably took too long. Let the other task have priority.
3434
RETURN jsonb_build_object(
35+
'version', 1,
3536
'ok', false,
3637
'stale', true,
3738
'reason', 'completed_by_newer_task',
@@ -50,6 +51,7 @@ BEGIN
5051
IF t_worker != s_worker THEN
5152
RAISE EXCEPTION 'Wrong worker'
5253
USING DETAIL = jsonb_build_object(
54+
'version', 1,
5355
'requestedStatus', s_status,
5456
'callerWorker', s_worker,
5557
'currentWorker', t_worker,
@@ -83,6 +85,7 @@ BEGIN
8385
WHERE id=t_id;
8486

8587
RETURN jsonb_build_object(
88+
'version', 1,
8689
'ok', true,
8790
'stale', false,
8891
'requestedStatus', s_status,

packages/database/supabase/schemas/sync.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ BEGIN
7373
IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN
7474
-- we probably took too long. Let the other task have priority.
7575
RETURN jsonb_build_object(
76+
'version', 1,
7677
'ok', false,
7778
'stale', true,
7879
'reason', 'completed_by_newer_task',
@@ -91,6 +92,7 @@ BEGIN
9192
IF t_worker != s_worker THEN
9293
RAISE EXCEPTION 'Wrong worker'
9394
USING DETAIL = jsonb_build_object(
95+
'version', 1,
9496
'requestedStatus', s_status,
9597
'callerWorker', s_worker,
9698
'currentWorker', t_worker,
@@ -124,6 +126,7 @@ BEGIN
124126
WHERE id=t_id;
125127

126128
RETURN jsonb_build_object(
129+
'version', 1,
127130
'ok', true,
128131
'stale', false,
129132
'requestedStatus', s_status,

0 commit comments

Comments
 (0)