From 4102b035abd74b38f2819ab99561a71531a252f3 Mon Sep 17 00:00:00 2001 From: Michael Gartner Date: Wed, 27 May 2026 22:05:52 -0600 Subject: [PATCH 1/6] Implement end_sync_task function with JSON response and telemetry context - Refactor end_sync_task to return a JSON object containing task status and metadata. - Enhance error handling to include telemetry context for better debugging. - Update related API routes and database schema to accommodate new return type. - Document changes in sync_functions.md to reflect the new response structure. --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 469 ++++++++++++++++-- .../sync-task/[fn]/[target]/[worker]/route.ts | 17 +- packages/database/doc/sync_functions.md | 3 +- packages/database/src/dbTypes.ts | 2 +- .../20260528033000_end_sync_task_result.sql | 129 +++++ packages/database/supabase/schemas/sync.sql | 57 ++- 6 files changed, 610 insertions(+), 67 deletions(-) create mode 100644 packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index c06b010ff..dbdc10932 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -30,6 +30,7 @@ import type { LocalContentDataInput, LocalAccountDataInput, } from "@repo/database/inputTypes"; +import type { Properties } from "posthog-js"; const SYNC_FUNCTION = "embedding"; // Minimal interval between syncs of all clients for this task. @@ -40,12 +41,141 @@ const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL. const BATCH_SIZE = 200; const CONCEPT_BATCH_SIZE = 200; +type SyncPhaseDurations = Record; + type SyncTaskInfo = { lastUpdateTime?: Date; nextUpdateTime?: Date; shouldProceed: boolean; }; +type EndSyncTaskRpcResult = { + ok?: boolean; + stale?: boolean; + reason?: string; + requestedStatus?: string; + callerWorker?: string; + currentWorker?: string; + currentStatus?: string; + callerStartedAt?: string; + currentStartedAt?: string; + lastTaskEnd?: string; + lastSuccessStart?: string; + taskTimesOutAt?: string; + failureCount?: number; +}; + +type EndSyncTaskResult = + | { + ok: true; + stale: false; + rpcResult?: EndSyncTaskRpcResult; + } + | { + ok: false; + stale: true; + rpcResult?: EndSyncTaskRpcResult; + } + | { + ok: false; + stale: false; + error: Error; + rpcResult?: EndSyncTaskRpcResult; + }; + +let syncWorkerId: string | null = null; + +const createRuntimeId = (): string => { + if (typeof crypto !== "undefined" && typeof crypto.randomUUID === "function") + return crypto.randomUUID(); + + return `${Date.now()}-${Math.random().toString(36).slice(2)}`; +}; + +const getSyncWorkerId = (): string => { + if (syncWorkerId === null) { + syncWorkerId = `roam-${createRuntimeId()}`; + } + + return syncWorkerId; +}; + +const isEndSyncTaskRpcResult = ( + data: Json | undefined, +): data is EndSyncTaskRpcResult => { + return typeof data === "object" && data !== null && !Array.isArray(data); +}; + +const measureSyncPhase = async ({ + phase, + phases, + operation, +}: { + phase: string; + phases: SyncPhaseDurations; + operation: () => Promise; +}): Promise => { + const phaseStart = performance.now(); + try { + return await operation(); + } finally { + phases[phase] = Math.round(performance.now() - phaseStart); + } +}; + +const syncTelemetryContext = ({ + attemptId, + worker, + userUid, + context, + startTime, + claimed, + phases, + status, + reason, + nextUpdateTime, + lastUpdateTime, + endSyncResult, +}: { + attemptId: string; + worker: string; + userUid: string; + context: SupabaseContext | null; + startTime: Date; + claimed: boolean; + phases: SyncPhaseDurations; + status: string; + reason?: string; + nextUpdateTime?: Date; + lastUpdateTime?: Date; + endSyncResult?: EndSyncTaskRpcResult; +}): Properties => { + const duration = (Date.now() - startTime.valueOf()) / 1000.0; + const phaseProperties = Object.fromEntries( + Object.entries(phases).map(([phase, durationMs]) => [ + `phase_${phase}_ms`, + durationMs, + ]), + ); + + return { + syncAttemptId: attemptId, + syncWorkerId: worker, + syncFunction: SYNC_FUNCTION, + syncUserUid: userUid, + claimed, + status, + duration, + reason, + spaceId: context?.spaceId, + startedAt: startTime.toISOString(), + nextUpdateTime: nextUpdateTime?.toISOString(), + lastUpdateTime: lastUpdateTime?.toISOString(), + endSyncResult, + ...phaseProperties, + }; +}; + const chunk = (array: T[], size: number): T[][] => { const chunks: T[][] = []; for (let i = 0; i < array.length; i += size) { @@ -103,10 +233,12 @@ const notifyEndSyncFailure = ({ status, showToast, reason, + context, }: { status: Enums<"task_status">; showToast: boolean; reason: string; + context?: Properties; }): void => { if (showToast) { renderToast({ @@ -120,7 +252,7 @@ const notifyEndSyncFailure = ({ internalError({ error: new Error(reason), type: "Sync Failed", - context: { status }, + context: { status, ...(context || {}) }, }); }; @@ -129,22 +261,32 @@ export const endSyncTask = async ({ status, showToast = false, startTime, + context, + supabaseClient, + telemetryContext, }: { worker: string; status: Enums<"task_status">; showToast: boolean; startTime: Date; -}): Promise => { + context?: SupabaseContext; + supabaseClient?: DGSupabaseClient; + telemetryContext?: Properties; +}): Promise => { try { - const supabaseClient = await getLoggedInClient(); - if (!supabaseClient) return; - const context = await getSupabaseContext(); - if (!context) { + const resolvedClient = supabaseClient || (await getLoggedInClient()); + if (!resolvedClient) { + const error = new Error("Missing Supabase client while ending sync task"); + return { ok: false, stale: false, error }; + } + const resolvedContext = context || (await getSupabaseContext()); + if (!resolvedContext) { console.error("endSyncTask: Unable to obtain Supabase context."); - return; + const error = new Error("Unable to obtain Supabase context"); + return { ok: false, stale: false, error }; } - const { error } = await supabaseClient.rpc("end_sync_task", { - s_target: context.spaceId, + const { data, error } = await resolvedClient.rpc("end_sync_task", { + s_target: resolvedContext.spaceId, s_function: SYNC_FUNCTION, s_worker: worker, s_status: status, @@ -152,14 +294,62 @@ export const endSyncTask = async ({ }); if (error) { console.error("endSyncTask: Error calling end_sync_task:", error); + const reason = `Supabase end_sync_task RPC failed: ${error.message ?? "Unknown error"}`; notifyEndSyncFailure({ status, showToast, - reason: `Supabase end_sync_task RPC failed: ${error.message ?? "Unknown error"}`, + reason, + context: { + ...telemetryContext, + endSyncErrorCode: error.code, + endSyncErrorDetails: error.details, + endSyncErrorHint: error.hint, + }, }); - return; - } else if (showToast) { + return { + ok: false, + stale: false, + error: new Error(reason), + }; + } + + const rpcResult = isEndSyncTaskRpcResult(data) ? data : undefined; + if (rpcResult?.stale === true) { + posthog.capture("Sync end task stale", { + ...telemetryContext, + endSyncResult: rpcResult, + }); + + return { + ok: false, + stale: true, + rpcResult, + }; + } + + if (rpcResult?.ok === false) { + const reason = + rpcResult.reason || "Supabase end_sync_task returned failure"; + notifyEndSyncFailure({ + status, + showToast, + reason, + context: { + ...telemetryContext, + endSyncResult: rpcResult, + }, + }); + + return { + ok: false, + stale: false, + error: new Error(reason), + rpcResult, + }; + } + + if (showToast) { if (status === "failed") { renderToast({ id: "discourse-embedding-failed", @@ -169,16 +359,26 @@ export const endSyncTask = async ({ }); } } + + return { ok: true, stale: false, rpcResult }; } catch (error) { console.error("endSyncTask: Error calling end_sync_task:", error); + const reason = + error instanceof Error + ? `Unexpected error ending sync task: ${error.message}` + : "Unexpected non-error thrown while ending sync task"; notifyEndSyncFailure({ status, showToast, - reason: - error instanceof Error - ? `Unexpected error ending sync task: ${error.message}` - : "Unexpected non-error thrown while ending sync task", + reason, + context: telemetryContext, }); + + return { + ok: false, + stale: false, + error: error instanceof Error ? error : new Error(reason), + }; } }; @@ -199,7 +399,7 @@ export const proposeSyncTask = async ( if (error) { console.error( - `proposeSyncTask: propose_sync_task failed – ${error.message}`, + `proposeSyncTask: propose_sync_task failed - ${error.message}`, ); return { shouldProceed: false }; } @@ -497,37 +697,97 @@ const getAllMissingOrNewDiscourseNodes = async ({ ]; }; -export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { +export const createOrUpdateDiscourseEmbedding = async ( + showToast = false, +): Promise => { if (!doSync) return; console.debug("starting createOrUpdateDiscourseEmbedding"); const startTime = new Date(); + const attemptId = createRuntimeId(); + const phases: SyncPhaseDurations = {}; let success = true; let claimed = false; const isInitialSync = initialSync; // record state at start - const worker = window.roamAlphaAPI.user.uid(); + let context: SupabaseContext | null = null; + let supabaseClient: DGSupabaseClient | null = null; + let userUid = ""; + const worker = getSyncWorkerId(); + + const buildTelemetry = ({ + status, + reason, + nextUpdateTime, + lastUpdateTime, + endSyncResult, + }: { + status: string; + reason?: string; + nextUpdateTime?: Date; + lastUpdateTime?: Date; + endSyncResult?: EndSyncTaskRpcResult; + }): Properties => + syncTelemetryContext({ + attemptId, + worker, + userUid, + context, + startTime, + claimed, + phases, + status, + reason, + nextUpdateTime, + lastUpdateTime, + endSyncResult, + }); try { - if (!worker) { + const resolvedUserUid = window.roamAlphaAPI.user.uid(); + if (!resolvedUserUid) { throw new FatalError("Unable to obtain user UID."); } + userUid = resolvedUserUid; - const supabaseClient = await getLoggedInClient(); + supabaseClient = await measureSyncPhase({ + phase: "getLoggedInClient", + phases, + operation: getLoggedInClient, + }); if (!supabaseClient) { // TODO: Distinguish connection vs credentials error throw new Error("Could not log in to client."); } - const context = await getSupabaseContext(); + context = await measureSyncPhase({ + phase: "getSupabaseContext", + phases, + operation: getSupabaseContext, + }); if (!context) { // not worth retrying: setup error throw new FatalError("Error connecting to client."); } + const activeSupabaseClient = supabaseClient; + const activeContext = context; const { shouldProceed, lastUpdateTime, nextUpdateTime } = - await proposeSyncTask(worker, supabaseClient, context); + await measureSyncPhase({ + phase: "proposeSyncTask", + phases, + operation: () => + proposeSyncTask(worker, activeSupabaseClient, activeContext), + }); if (!shouldProceed) { if (nextUpdateTime === undefined) { throw new Error("Can't obtain sync task"); } console.debug("postponed to ", nextUpdateTime); + posthog.capture( + "Sync postponed", + buildTelemetry({ + status: "postponed", + nextUpdateTime, + lastUpdateTime, + }), + ); if (doSync) { activeTimeout = setTimeout( createOrUpdateDiscourseEmbedding, // eslint-disable-line @typescript-eslint/no-misused-promises @@ -539,7 +799,11 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { return; } claimed = true; - const allUsers = await getAllUsers(); + const allUsers = await measureSyncPhase({ + phase: "getAllUsers", + phases, + operation: getAllUsers, + }); const sinceTime = lastUpdateTime ? lastUpdateTime.valueOf() - 1000 // add a one-second buffer : undefined; @@ -547,39 +811,138 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { (n) => n.backedBy === "user", ); - const allNodeInstances = isInitialSync - ? await getAllMissingOrNewDiscourseNodes({ - supabaseClient, - spaceId: context.spaceId, + const allNodeInstances = await measureSyncPhase({ + phase: isInitialSync + ? "getAllMissingOrNewDiscourseNodes" + : "getAllDiscourseNodesSince", + phases, + operation: () => + isInitialSync + ? getAllMissingOrNewDiscourseNodes({ + supabaseClient: activeSupabaseClient, + spaceId: activeContext.spaceId, + since: sinceTime, + nodeTypes: allDgNodeTypes, + }) + : getAllDiscourseNodesSince(sinceTime, allDgNodeTypes), + }); + await measureSyncPhase({ + phase: "upsertUsers", + phases, + operation: () => + upsertUsers(allUsers, activeSupabaseClient, activeContext), + }); + await measureSyncPhase({ + phase: "upsertNodes", + phases, + operation: () => + upsertNodesToSupabaseAsContentWithEmbeddings( + allNodeInstances, + activeSupabaseClient, + activeContext, + ), + }); + await measureSyncPhase({ + phase: "convertConcepts", + phases, + operation: () => + convertDgToSupabaseConcepts({ + nodesSince: allNodeInstances, since: sinceTime, - nodeTypes: allDgNodeTypes, - }) - : await getAllDiscourseNodesSince(sinceTime, allDgNodeTypes); - await upsertUsers(allUsers, supabaseClient, context); - await upsertNodesToSupabaseAsContentWithEmbeddings( - allNodeInstances, - supabaseClient, - context, - ); - await convertDgToSupabaseConcepts({ - nodesSince: allNodeInstances, - since: sinceTime, - allNodeTypes: allDgNodeTypes, - supabaseClient, - context, + allNodeTypes: allDgNodeTypes, + supabaseClient: activeSupabaseClient, + context: activeContext, + }), + }); + await measureSyncPhase({ + phase: "cleanupOrphanedNodes", + phases, + operation: () => + cleanupOrphanedNodes(activeSupabaseClient, activeContext), + }); + const completeEndResult = await measureSyncPhase({ + phase: "endSyncTask", + phases, + operation: () => + endSyncTask({ + worker, + status: "complete", + showToast, + startTime, + context: activeContext, + supabaseClient: activeSupabaseClient, + telemetryContext: buildTelemetry({ + status: "complete", + lastUpdateTime, + }), + }), }); - await cleanupOrphanedNodes(supabaseClient, context); - await endSyncTask({ worker, status: "complete", showToast, startTime }); - initialSync = false; - const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; - posthog.capture("Sync complete", { duration }); + + if (completeEndResult.ok) { + initialSync = false; + posthog.capture( + "Sync complete", + buildTelemetry({ + status: "complete", + lastUpdateTime, + endSyncResult: completeEndResult.rpcResult, + }), + ); + } else if (completeEndResult.stale) { + success = false; + posthog.capture( + "Sync stale", + buildTelemetry({ + status: "stale", + reason: + completeEndResult.rpcResult?.reason || + "end_sync_task completed by newer sync task", + lastUpdateTime, + endSyncResult: completeEndResult.rpcResult, + }), + ); + } else { + success = false; + posthog.capture( + "Sync error", + buildTelemetry({ + status: "end_task_failed", + reason: completeEndResult.error.message, + lastUpdateTime, + endSyncResult: completeEndResult.rpcResult, + }), + ); + } } catch (error) { console.error("createOrUpdateDiscourseEmbedding: Process failed:", error); success = false; - if (worker && claimed) - await endSyncTask({ worker, status: "failed", showToast, startTime }); - const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; - posthog.capture("Sync error", { duration }); + const reason = + error instanceof Error ? error.message : "Unknown sync error"; + let failedEndResult: EndSyncTaskResult | undefined; + if (claimed) { + failedEndResult = await measureSyncPhase({ + phase: "endSyncTaskFailed", + phases, + operation: () => + endSyncTask({ + worker, + status: "failed", + showToast, + startTime, + context: context || undefined, + supabaseClient: supabaseClient || undefined, + telemetryContext: buildTelemetry({ status: "failed", reason }), + }), + }); + } + posthog.capture( + "Sync error", + buildTelemetry({ + status: error instanceof FatalError ? "fatal" : "failed", + reason, + endSyncResult: failedEndResult?.rpcResult, + }), + ); if (error instanceof FatalError) { doSync = false; return; @@ -594,7 +957,7 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { doSync = false; return; } - const jitter = 0.9 + Math.random() * 0.2; // 0.9x–1.1x + const jitter = 0.9 + Math.random() * 0.2; // 0.9x-1.1x timeout *= 2 ** numFailures * jitter; } if (activeTimeout != null) { diff --git a/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts b/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts index c61865f1f..0cd41bd13 100644 --- a/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts +++ b/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts @@ -1,5 +1,4 @@ import { NextResponse, NextRequest } from "next/server"; -import { PostgrestSingleResponse } from "@supabase/supabase-js"; import { type Database, Constants } from "@repo/database/dbTypes"; import { asPostgrestFailure } from "@repo/database/lib/contextFunctions"; import { createClient } from "~/utils/supabase/server"; @@ -22,7 +21,14 @@ export const POST = async ( asPostgrestFailure(`${target} is not a number`, "type"), ); } - const infoS: string = await request.json(); + const infoRaw: unknown = await request.json(); + if (typeof infoRaw !== "string") { + return createApiResponse( + request, + asPostgrestFailure("Request body is not a task status", "type"), + ); + } + const infoS = infoRaw; if ( !(Constants.public.Enums.task_status as readonly string[]).includes(infoS) ) { @@ -33,15 +39,14 @@ export const POST = async ( } const info = infoS as Database["public"]["Enums"]["task_status"]; const supabase = await createClient(); - const response = (await supabase.rpc("end_sync_task", { + const response = await supabase.rpc("end_sync_task", { s_target: targetN, s_function: fn, s_worker: worker, s_status: info, - })) as PostgrestSingleResponse; - // Transform 204 No Content to 200 OK with success indicator for API consistency + }); if (response.status === 204) { - response.data = true; + response.data = { ok: true, stale: false }; response.status = 200; } diff --git a/packages/database/doc/sync_functions.md b/packages/database/doc/sync_functions.md index 20b280d80..3fa23bc03 100644 --- a/packages/database/doc/sync_functions.md +++ b/packages/database/doc/sync_functions.md @@ -15,4 +15,5 @@ When a worker calls `propose_sync_task`, it will receive either: 2. a timestamp in the past, which is also the last time the task was executed successfully. Your worker can ask the platform for all changes posterior to that time. (This will only be reliable if you make always make your queries after calling this function!) 3. Null, meaning the task was not executed successfully before, and your worker is tasked with starting from scratch. -When a worker finishes the task, it should clean up with `end_sync_task`, giving the same identifying arguments and a status ('complete' or 'failed'.) +When a worker finishes the task, it should clean up with `end_sync_task`, giving the same identifying arguments and a status (`complete` or `failed`). +The function returns a JSON object with `ok` and `stale` fields plus task metadata. A stale response means another attempt claimed a newer task while this attempt was still running, so the caller should not report it as a successful completion. diff --git a/packages/database/src/dbTypes.ts b/packages/database/src/dbTypes.ts index 06c723a94..48342b32c 100644 --- a/packages/database/src/dbTypes.ts +++ b/packages/database/src/dbTypes.ts @@ -1659,7 +1659,7 @@ export type Database = { s_target: number s_worker: string } - Returns: undefined + Returns: Json } extract_references: { Args: { refs: Json }; Returns: number[] } file_access: { Args: { hashvalue: string }; Returns: boolean } diff --git a/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql new file mode 100644 index 000000000..0e413630a --- /dev/null +++ b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql @@ -0,0 +1,129 @@ +DROP FUNCTION IF EXISTS public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz); + +CREATE OR REPLACE FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz = NULL +) RETURNS jsonb +SET search_path = '' +LANGUAGE plpgsql +AS $$ +DECLARE t_id INTEGER; +DECLARE t_worker varchar; +DECLARE t_status public.task_status; +DECLARE t_failure_count SMALLINT; +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; +DECLARE t_task_times_out_at TIMESTAMP WITH TIME ZONE; +BEGIN + SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start, task_times_out_at + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start, t_task_times_out_at + FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; + ASSERT s_status > 'active'; + IF COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN + -- we probably took too long. Let the other task have priority. + RETURN jsonb_build_object( + 'ok', false, + 'stale', true, + 'reason', 'completed_by_newer_task', + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + ); + END IF; + IF t_worker != s_worker THEN + RAISE EXCEPTION 'Wrong worker' + USING DETAIL = jsonb_build_object( + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + )::text; + END IF; + ASSERT s_status >= t_status, 'do not go back in status'; + IF s_status = 'complete' THEN + t_last_task_end := now(); + t_last_success_start := t_last_task_start; + t_failure_count := 0; + ELSE + IF t_status != s_status THEN + t_failure_count := t_failure_count + 1; + END IF; + END IF; + + t_task_times_out_at := NULL; + UPDATE public.sync_info + SET status = s_status, + task_times_out_at=null, + last_task_end=t_last_task_end, + last_success_start=t_last_success_start, + failure_count=t_failure_count + WHERE id=t_id; + + RETURN jsonb_build_object( + 'ok', true, + 'stale', false, + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + ); +END; +$$; + +ALTER FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) OWNER TO "postgres"; + +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) TO "anon"; +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) TO "authenticated"; +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) TO "service_role"; diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 10e73a91d..c05df02ee 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -52,7 +52,7 @@ CREATE OR REPLACE FUNCTION public.end_sync_task( s_worker character varying, s_status public.task_status, s_started_at timestamptz = NULL -) RETURNS void +) RETURNS jsonb SET search_path = '' LANGUAGE plpgsql AS $$ @@ -63,16 +63,45 @@ DECLARE t_failure_count SMALLINT; DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; +DECLARE t_task_times_out_at TIMESTAMP WITH TIME ZONE; BEGIN - SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start - INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start + SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start, task_times_out_at + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start, t_task_times_out_at FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; ASSERT s_status > 'active'; - IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN + IF COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. - RETURN; + RETURN jsonb_build_object( + 'ok', false, + 'stale', true, + 'reason', 'completed_by_newer_task', + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + ); + END IF; + IF t_worker != s_worker THEN + RAISE EXCEPTION 'Wrong worker' + USING DETAIL = jsonb_build_object( + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + )::text; END IF; - ASSERT t_worker = s_worker, 'Wrong worker'; ASSERT s_status >= t_status, 'do not go back in status'; IF s_status = 'complete' THEN t_last_task_end := now(); @@ -84,6 +113,7 @@ BEGIN END IF; END IF; + t_task_times_out_at := NULL; UPDATE public.sync_info SET status = s_status, task_times_out_at=null, @@ -91,6 +121,21 @@ BEGIN last_success_start=t_last_success_start, failure_count=t_failure_count WHERE id=t_id; + + RETURN jsonb_build_object( + 'ok', true, + 'stale', false, + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + ); END; $$; From ece01847857ec9dafb4c534c735abb7fe2b3b858 Mon Sep 17 00:00:00 2001 From: Michael Gartner Date: Wed, 27 May 2026 22:38:41 -0600 Subject: [PATCH 2/6] Refactor sync task handling to include claimed timestamp and improve error responses - Update EndSyncTaskRpcResult to require 'ok' and 'stale' fields. - Enhance isEndSyncTaskRpcResult function to validate response structure. - Modify endSyncTask function to handle claimed timestamps and improve error handling. - Update API route to parse and validate request body for task status and timestamps. - Document changes in sync_functions.md to reflect new requirements for end_sync_task. --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 49 ++++++++--- .../sync-task/[fn]/[target]/[worker]/route.ts | 84 +++++++++++++++---- packages/database/doc/sync_functions.md | 2 +- .../20260528033000_end_sync_task_result.sql | 3 +- packages/database/supabase/schemas/sync.sql | 3 +- 5 files changed, 113 insertions(+), 28 deletions(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index dbdc10932..48259f496 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -50,8 +50,8 @@ type SyncTaskInfo = { }; type EndSyncTaskRpcResult = { - ok?: boolean; - stale?: boolean; + ok: boolean; + stale: boolean; reason?: string; requestedStatus?: string; callerWorker?: string; @@ -103,7 +103,12 @@ const getSyncWorkerId = (): string => { const isEndSyncTaskRpcResult = ( data: Json | undefined, ): data is EndSyncTaskRpcResult => { - return typeof data === "object" && data !== null && !Array.isArray(data); + if (typeof data !== "object" || data === null || Array.isArray(data)) { + return false; + } + + const result = data as { ok?: unknown; stale?: unknown }; + return typeof result.ok === "boolean" && typeof result.stale === "boolean"; }; const measureSyncPhase = async ({ @@ -260,7 +265,7 @@ export const endSyncTask = async ({ worker, status, showToast = false, - startTime, + taskStartedAt, context, supabaseClient, telemetryContext, @@ -268,7 +273,7 @@ export const endSyncTask = async ({ worker: string; status: Enums<"task_status">; showToast: boolean; - startTime: Date; + taskStartedAt: Date; context?: SupabaseContext; supabaseClient?: DGSupabaseClient; telemetryContext?: Properties; @@ -290,7 +295,7 @@ export const endSyncTask = async ({ s_function: SYNC_FUNCTION, s_worker: worker, s_status: status, - s_started_at: startTime.toISOString(), + s_started_at: taskStartedAt.toISOString(), }); if (error) { console.error("endSyncTask: Error calling end_sync_task:", error); @@ -314,7 +319,26 @@ export const endSyncTask = async ({ }; } - const rpcResult = isEndSyncTaskRpcResult(data) ? data : undefined; + if (!isEndSyncTaskRpcResult(data)) { + const reason = "Supabase end_sync_task returned unexpected payload"; + notifyEndSyncFailure({ + status, + showToast, + reason, + context: { + ...telemetryContext, + endSyncPayload: data, + }, + }); + + return { + ok: false, + stale: false, + error: new Error(reason), + }; + } + + const rpcResult = data; if (rpcResult?.stale === true) { posthog.capture("Sync end task stale", { ...telemetryContext, @@ -708,6 +732,7 @@ export const createOrUpdateDiscourseEmbedding = async ( let success = true; let claimed = false; const isInitialSync = initialSync; // record state at start + let claimedAt: Date | null = null; let context: SupabaseContext | null = null; let supabaseClient: DGSupabaseClient | null = null; let userUid = ""; @@ -799,6 +824,8 @@ export const createOrUpdateDiscourseEmbedding = async ( return; } claimed = true; + const activeClaimedAt = new Date(); + claimedAt = activeClaimedAt; const allUsers = await measureSyncPhase({ phase: "getAllUsers", phases, @@ -868,7 +895,7 @@ export const createOrUpdateDiscourseEmbedding = async ( worker, status: "complete", showToast, - startTime, + taskStartedAt: activeClaimedAt, context: activeContext, supabaseClient: activeSupabaseClient, telemetryContext: buildTelemetry({ @@ -889,7 +916,6 @@ export const createOrUpdateDiscourseEmbedding = async ( }), ); } else if (completeEndResult.stale) { - success = false; posthog.capture( "Sync stale", buildTelemetry({ @@ -919,7 +945,8 @@ export const createOrUpdateDiscourseEmbedding = async ( const reason = error instanceof Error ? error.message : "Unknown sync error"; let failedEndResult: EndSyncTaskResult | undefined; - if (claimed) { + const failedClaimedAt = claimedAt; + if (failedClaimedAt !== null) { failedEndResult = await measureSyncPhase({ phase: "endSyncTaskFailed", phases, @@ -928,7 +955,7 @@ export const createOrUpdateDiscourseEmbedding = async ( worker, status: "failed", showToast, - startTime, + taskStartedAt: failedClaimedAt, context: context || undefined, supabaseClient: supabaseClient || undefined, telemetryContext: buildTelemetry({ status: "failed", reason }), diff --git a/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts b/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts index 0cd41bd13..90a2c46df 100644 --- a/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts +++ b/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts @@ -7,7 +7,71 @@ import { createApiResponse, handleRouteError } from "~/utils/supabase/apiUtils"; type ApiParams = Promise<{ target: string; fn: string; worker: string }>; export type SegmentDataType = { params: ApiParams }; -// POST the task status to the /supabase/sync-task/{function_name}/{target}/{worker} endpoint +type TaskStatus = Database["public"]["Enums"]["task_status"]; + +type ParsedEndSyncTaskBody = + | { + ok: true; + status: TaskStatus; + startedAt: string; + } + | { + ok: false; + error: string; + }; + +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null && !Array.isArray(value); + +const normalizeStartedAt = (value: unknown): string | null => { + if (typeof value === "number" && Number.isFinite(value)) { + const date = new Date(value); + return Number.isNaN(date.valueOf()) ? null : date.toISOString(); + } + + if (typeof value === "string" && !Number.isNaN(Date.parse(value))) { + return value; + } + + return null; +}; + +const parseEndSyncTaskBody = (body: unknown): ParsedEndSyncTaskBody => { + if (!isRecord(body)) { + return { + ok: false, + error: + "Request body must be { status: string, s_started_at: string | number }", + }; + } + + const { status, s_started_at: startedAtRaw } = body; + if ( + typeof status !== "string" || + !(Constants.public.Enums.task_status as readonly string[]).includes(status) + ) { + return { + ok: false, + error: `${String(status)} is not a task status`, + }; + } + + const startedAt = normalizeStartedAt(startedAtRaw); + if (startedAt === null) { + return { + ok: false, + error: "s_started_at must be an ISO timestamp string or epoch number", + }; + } + + return { + ok: true, + status: status as TaskStatus, + startedAt, + }; +}; + +// POST the task status and claim timestamp to the /supabase/sync-task/{function_name}/{target}/{worker} endpoint export const POST = async ( request: NextRequest, segmentData: SegmentDataType, @@ -22,28 +86,20 @@ export const POST = async ( ); } const infoRaw: unknown = await request.json(); - if (typeof infoRaw !== "string") { - return createApiResponse( - request, - asPostgrestFailure("Request body is not a task status", "type"), - ); - } - const infoS = infoRaw; - if ( - !(Constants.public.Enums.task_status as readonly string[]).includes(infoS) - ) { + const parsedBody = parseEndSyncTaskBody(infoRaw); + if (!parsedBody.ok) { return createApiResponse( request, - asPostgrestFailure(`${infoS} is not a task status`, "type"), + asPostgrestFailure(parsedBody.error, "type"), ); } - const info = infoS as Database["public"]["Enums"]["task_status"]; const supabase = await createClient(); const response = await supabase.rpc("end_sync_task", { s_target: targetN, s_function: fn, s_worker: worker, - s_status: info, + s_status: parsedBody.status, + s_started_at: parsedBody.startedAt, }); if (response.status === 204) { response.data = { ok: true, stale: false }; diff --git a/packages/database/doc/sync_functions.md b/packages/database/doc/sync_functions.md index 3fa23bc03..b9deec26c 100644 --- a/packages/database/doc/sync_functions.md +++ b/packages/database/doc/sync_functions.md @@ -15,5 +15,5 @@ When a worker calls `propose_sync_task`, it will receive either: 2. a timestamp in the past, which is also the last time the task was executed successfully. Your worker can ask the platform for all changes posterior to that time. (This will only be reliable if you make always make your queries after calling this function!) 3. Null, meaning the task was not executed successfully before, and your worker is tasked with starting from scratch. -When a worker finishes the task, it should clean up with `end_sync_task`, giving the same identifying arguments and a status (`complete` or `failed`). +When a worker finishes the task, it should clean up with `end_sync_task`, giving the same identifying arguments, a status (`complete` or `failed`), and the time at which the task was claimed. The function returns a JSON object with `ok` and `stale` fields plus task metadata. A stale response means another attempt claimed a newer task while this attempt was still running, so the caller should not report it as a successful completion. diff --git a/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql index 0e413630a..52bc1d44b 100644 --- a/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql +++ b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql @@ -26,7 +26,8 @@ DECLARE t_task_times_out_at TIMESTAMP WITH TIME ZONE; BEGIN SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start, task_times_out_at INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start, t_task_times_out_at - FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; + FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function + FOR UPDATE; ASSERT s_status > 'active'; IF COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index c05df02ee..2e493cb6a 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -67,7 +67,8 @@ DECLARE t_task_times_out_at TIMESTAMP WITH TIME ZONE; BEGIN SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start, task_times_out_at INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start, t_task_times_out_at - FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; + FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function + FOR UPDATE; ASSERT s_status > 'active'; IF COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. From c1c6b1f711f1a61c0bf40293ab025bf222ce48f2 Mon Sep 17 00:00:00 2001 From: Michael Gartner Date: Wed, 27 May 2026 23:02:52 -0600 Subject: [PATCH 3/6] Restored the worker guard --- .../supabase/migrations/20260528033000_end_sync_task_result.sql | 2 +- packages/database/supabase/schemas/sync.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql index 52bc1d44b..860090e23 100644 --- a/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql +++ b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql @@ -29,7 +29,7 @@ BEGIN FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function FOR UPDATE; ASSERT s_status > 'active'; - IF COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN + IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. RETURN jsonb_build_object( 'ok', false, diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 2e493cb6a..249f97152 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -70,7 +70,7 @@ BEGIN FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function FOR UPDATE; ASSERT s_status > 'active'; - IF COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN + IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. RETURN jsonb_build_object( 'ok', false, From b2fe9d35e87bb6dbd787d0c30acf6c2b95affaf6 Mon Sep 17 00:00:00 2001 From: Michael Gartner Date: Wed, 27 May 2026 23:41:44 -0600 Subject: [PATCH 4/6] Update end_sync_task API to allow optional startedAt parameter and improve request body validation - Made 'startedAt' in ParsedEndSyncTaskBody optional. - Enhanced parseEndSyncTaskBody to accept a task status string directly. - Updated error messages for better clarity on request body requirements. - Adjusted RPC call to conditionally include 's_started_at' based on availability. --- .../sync-task/[fn]/[target]/[worker]/route.ts | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts b/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts index 90a2c46df..b2b16745b 100644 --- a/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts +++ b/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts @@ -13,7 +13,7 @@ type ParsedEndSyncTaskBody = | { ok: true; status: TaskStatus; - startedAt: string; + startedAt?: string; } | { ok: false; @@ -37,11 +37,28 @@ const normalizeStartedAt = (value: unknown): string | null => { }; const parseEndSyncTaskBody = (body: unknown): ParsedEndSyncTaskBody => { + if (typeof body === "string") { + if ( + (Constants.public.Enums.task_status as readonly string[]).includes(body) + ) { + return { + ok: true, + status: body as TaskStatus, + }; + } + + return { + ok: false, + error: `${body} is not a task status`, + }; + } + if (!isRecord(body)) { return { ok: false, error: - "Request body must be { status: string, s_started_at: string | number }", + "Request body must be a task status string or " + + "{ status: string, s_started_at: string | number }", }; } @@ -94,13 +111,17 @@ export const POST = async ( ); } const supabase = await createClient(); - const response = await supabase.rpc("end_sync_task", { + const rpcArgs: Database["public"]["Functions"]["end_sync_task"]["Args"] = { s_target: targetN, s_function: fn, s_worker: worker, s_status: parsedBody.status, - s_started_at: parsedBody.startedAt, - }); + }; + if (parsedBody.startedAt !== undefined) { + rpcArgs.s_started_at = parsedBody.startedAt; + } + + const response = await supabase.rpc("end_sync_task", rpcArgs); if (response.status === 204) { response.data = { ok: true, stale: false }; response.status = 200; From c8ecd67050f337b640b360f8c59e422fbdca5322 Mon Sep 17 00:00:00 2001 From: Michael Gartner Date: Wed, 27 May 2026 23:59:12 -0600 Subject: [PATCH 5/6] Remove unnecessary success assignment in createOrUpdateDiscourseEmbedding function to streamline error handling. --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index 48259f496..46064becb 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -928,7 +928,6 @@ export const createOrUpdateDiscourseEmbedding = async ( }), ); } else { - success = false; posthog.capture( "Sync error", buildTelemetry({ From 9a72a284e48444ce7d6423841bef681669da94db Mon Sep 17 00:00:00 2001 From: Michael Gartner Date: Thu, 28 May 2026 14:21:54 -0600 Subject: [PATCH 6/6] add a version to the JSON output --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 57 +++++++++++++++++-- packages/database/doc/sync_functions.md | 2 +- .../20260528033000_end_sync_task_result.sql | 3 + packages/database/supabase/schemas/sync.sql | 3 + 4 files changed, 59 insertions(+), 6 deletions(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index 46064becb..69ab5d5cb 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -40,6 +40,7 @@ const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL. const BATCH_SIZE = 200; const CONCEPT_BATCH_SIZE = 200; +const END_SYNC_TASK_RESULT_VERSION = 1; type SyncPhaseDurations = Record; @@ -50,6 +51,7 @@ type SyncTaskInfo = { }; type EndSyncTaskRpcResult = { + version?: number; ok: boolean; stale: boolean; reason?: string; @@ -100,15 +102,39 @@ const getSyncWorkerId = (): string => { return syncWorkerId; }; -const isEndSyncTaskRpcResult = ( +const getJsonObject = ( data: Json | undefined, -): data is EndSyncTaskRpcResult => { +): Record | null => { if (typeof data !== "object" || data === null || Array.isArray(data)) { - return false; + return null; + } + + return data as Record; +}; + +const getEndSyncTaskResultVersion = (data: Json | undefined): number => { + const result = getJsonObject(data); + if (result === null || typeof result.version !== "number") { + return 0; } - const result = data as { ok?: unknown; stale?: unknown }; - return typeof result.ok === "boolean" && typeof result.stale === "boolean"; + return result.version; +}; + +const isEndSyncTaskRpcResult = ( + data: Json | undefined, +): data is EndSyncTaskRpcResult => { + const result = getJsonObject(data); + if (result === null) return false; + + const versionIsSupported = + result.version === undefined || typeof result.version === "number"; + + return ( + versionIsSupported && + typeof result.ok === "boolean" && + typeof result.stale === "boolean" + ); }; const measureSyncPhase = async ({ @@ -320,6 +346,27 @@ export const endSyncTask = async ({ } if (!isEndSyncTaskRpcResult(data)) { + const resultVersion = getEndSyncTaskResultVersion(data); + if (resultVersion > END_SYNC_TASK_RESULT_VERSION) { + const rpcResult: EndSyncTaskRpcResult = { + version: resultVersion, + ok: true, + stale: false, + reason: "unsupported_future_result_version", + }; + posthog.capture("Sync end task future result", { + ...telemetryContext, + endSyncPayload: data, + endSyncResult: rpcResult, + }); + + return { + ok: true, + stale: false, + rpcResult, + }; + } + const reason = "Supabase end_sync_task returned unexpected payload"; notifyEndSyncFailure({ status, diff --git a/packages/database/doc/sync_functions.md b/packages/database/doc/sync_functions.md index b9deec26c..8381c6248 100644 --- a/packages/database/doc/sync_functions.md +++ b/packages/database/doc/sync_functions.md @@ -16,4 +16,4 @@ When a worker calls `propose_sync_task`, it will receive either: 3. Null, meaning the task was not executed successfully before, and your worker is tasked with starting from scratch. When a worker finishes the task, it should clean up with `end_sync_task`, giving the same identifying arguments, a status (`complete` or `failed`), and the time at which the task was claimed. -The function returns a JSON object with `ok` and `stale` fields plus task metadata. A stale response means another attempt claimed a newer task while this attempt was still running, so the caller should not report it as a successful completion. +The function returns a versioned JSON object with `ok` and `stale` fields plus task metadata. A stale response means another attempt claimed a newer task while this attempt was still running, so the caller should not report it as a successful completion. diff --git a/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql index 860090e23..0c965e73d 100644 --- a/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql +++ b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql @@ -32,6 +32,7 @@ BEGIN IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. RETURN jsonb_build_object( + 'version', 1, 'ok', false, 'stale', true, 'reason', 'completed_by_newer_task', @@ -50,6 +51,7 @@ BEGIN IF t_worker != s_worker THEN RAISE EXCEPTION 'Wrong worker' USING DETAIL = jsonb_build_object( + 'version', 1, 'requestedStatus', s_status, 'callerWorker', s_worker, 'currentWorker', t_worker, @@ -83,6 +85,7 @@ BEGIN WHERE id=t_id; RETURN jsonb_build_object( + 'version', 1, 'ok', true, 'stale', false, 'requestedStatus', s_status, diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 249f97152..2316e658c 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -73,6 +73,7 @@ BEGIN IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. RETURN jsonb_build_object( + 'version', 1, 'ok', false, 'stale', true, 'reason', 'completed_by_newer_task', @@ -91,6 +92,7 @@ BEGIN IF t_worker != s_worker THEN RAISE EXCEPTION 'Wrong worker' USING DETAIL = jsonb_build_object( + 'version', 1, 'requestedStatus', s_status, 'callerWorker', s_worker, 'currentWorker', t_worker, @@ -124,6 +126,7 @@ BEGIN WHERE id=t_id; RETURN jsonb_build_object( + 'version', 1, 'ok', true, 'stale', false, 'requestedStatus', s_status,