diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index c06b010ff..69ab5d5cb 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -30,6 +30,7 @@ import type { LocalContentDataInput, LocalAccountDataInput, } from "@repo/database/inputTypes"; +import type { Properties } from "posthog-js"; const SYNC_FUNCTION = "embedding"; // Minimal interval between syncs of all clients for this task. @@ -39,6 +40,9 @@ const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL. const BATCH_SIZE = 200; const CONCEPT_BATCH_SIZE = 200; +const END_SYNC_TASK_RESULT_VERSION = 1; + +type SyncPhaseDurations = Record; type SyncTaskInfo = { lastUpdateTime?: Date; @@ -46,6 +50,163 @@ type SyncTaskInfo = { shouldProceed: boolean; }; +type EndSyncTaskRpcResult = { + version?: number; + ok: boolean; + stale: boolean; + reason?: string; + requestedStatus?: string; + callerWorker?: string; + currentWorker?: string; + currentStatus?: string; + callerStartedAt?: string; + currentStartedAt?: string; + lastTaskEnd?: string; + lastSuccessStart?: string; + taskTimesOutAt?: string; + failureCount?: number; +}; + +type EndSyncTaskResult = + | { + ok: true; + stale: false; + rpcResult?: EndSyncTaskRpcResult; + } + | { + ok: false; + stale: true; + rpcResult?: EndSyncTaskRpcResult; + } + | { + ok: false; + stale: false; + error: Error; + rpcResult?: EndSyncTaskRpcResult; + }; + +let syncWorkerId: string | null = null; + +const createRuntimeId = (): string => { + if (typeof crypto !== "undefined" && typeof crypto.randomUUID === "function") + return crypto.randomUUID(); + + return `${Date.now()}-${Math.random().toString(36).slice(2)}`; +}; + +const getSyncWorkerId = (): string => { + if (syncWorkerId === null) { + syncWorkerId = `roam-${createRuntimeId()}`; + } + + return syncWorkerId; +}; + +const getJsonObject = ( + data: Json | undefined, +): Record | null => { + if (typeof data !== "object" || data === null || Array.isArray(data)) { + return null; + } + + return data as Record; +}; + +const getEndSyncTaskResultVersion = (data: Json | undefined): number => { + const result = getJsonObject(data); + if (result === null || typeof result.version !== "number") { + return 0; + } + + return result.version; +}; + +const isEndSyncTaskRpcResult = ( + data: Json | undefined, +): data is EndSyncTaskRpcResult => { + const result = getJsonObject(data); + if (result === null) return false; + + const versionIsSupported = + result.version === undefined || typeof result.version === "number"; + + return ( + versionIsSupported && + typeof result.ok === "boolean" && + typeof result.stale === "boolean" + ); +}; + +const measureSyncPhase = async ({ + phase, + phases, + operation, +}: { + phase: string; + phases: SyncPhaseDurations; + operation: () => Promise; +}): Promise => { + const phaseStart = performance.now(); + try { + return await operation(); + } finally { + phases[phase] = Math.round(performance.now() - phaseStart); + } +}; + +const syncTelemetryContext = ({ + attemptId, + worker, + userUid, + context, + startTime, + claimed, + phases, + status, + reason, + nextUpdateTime, + lastUpdateTime, + endSyncResult, +}: { + attemptId: string; + worker: string; + userUid: string; + context: SupabaseContext | null; + startTime: Date; + claimed: boolean; + phases: SyncPhaseDurations; + status: string; + reason?: string; + nextUpdateTime?: Date; + lastUpdateTime?: Date; + endSyncResult?: EndSyncTaskRpcResult; +}): Properties => { + const duration = (Date.now() - startTime.valueOf()) / 1000.0; + const phaseProperties = Object.fromEntries( + Object.entries(phases).map(([phase, durationMs]) => [ + `phase_${phase}_ms`, + durationMs, + ]), + ); + + return { + syncAttemptId: attemptId, + syncWorkerId: worker, + syncFunction: SYNC_FUNCTION, + syncUserUid: userUid, + claimed, + status, + duration, + reason, + spaceId: context?.spaceId, + startedAt: startTime.toISOString(), + nextUpdateTime: nextUpdateTime?.toISOString(), + lastUpdateTime: lastUpdateTime?.toISOString(), + endSyncResult, + ...phaseProperties, + }; +}; + const chunk = (array: T[], size: number): T[][] => { const chunks: T[][] = []; for (let i = 0; i < array.length; i += size) { @@ -103,10 +264,12 @@ const notifyEndSyncFailure = ({ status, showToast, reason, + context, }: { status: Enums<"task_status">; showToast: boolean; reason: string; + context?: Properties; }): void => { if (showToast) { renderToast({ @@ -120,7 +283,7 @@ const notifyEndSyncFailure = ({ internalError({ error: new Error(reason), type: "Sync Failed", - context: { status }, + context: { status, ...(context || {}) }, }); }; @@ -128,38 +291,136 @@ export const endSyncTask = async ({ worker, status, showToast = false, - startTime, + taskStartedAt, + context, + supabaseClient, + telemetryContext, }: { worker: string; status: Enums<"task_status">; showToast: boolean; - startTime: Date; -}): Promise => { + taskStartedAt: Date; + context?: SupabaseContext; + supabaseClient?: DGSupabaseClient; + telemetryContext?: Properties; +}): Promise => { try { - const supabaseClient = await getLoggedInClient(); - if (!supabaseClient) return; - const context = await getSupabaseContext(); - if (!context) { + const resolvedClient = supabaseClient || (await getLoggedInClient()); + if (!resolvedClient) { + const error = new Error("Missing Supabase client while ending sync task"); + return { ok: false, stale: false, error }; + } + const resolvedContext = context || (await getSupabaseContext()); + if (!resolvedContext) { console.error("endSyncTask: Unable to obtain Supabase context."); - return; + const error = new Error("Unable to obtain Supabase context"); + return { ok: false, stale: false, error }; } - const { error } = await supabaseClient.rpc("end_sync_task", { - s_target: context.spaceId, + const { data, error } = await resolvedClient.rpc("end_sync_task", { + s_target: resolvedContext.spaceId, s_function: SYNC_FUNCTION, s_worker: worker, s_status: status, - s_started_at: startTime.toISOString(), + s_started_at: taskStartedAt.toISOString(), }); if (error) { console.error("endSyncTask: Error calling end_sync_task:", error); + const reason = `Supabase end_sync_task RPC failed: ${error.message ?? "Unknown error"}`; notifyEndSyncFailure({ status, showToast, - reason: `Supabase end_sync_task RPC failed: ${error.message ?? "Unknown error"}`, + reason, + context: { + ...telemetryContext, + endSyncErrorCode: error.code, + endSyncErrorDetails: error.details, + endSyncErrorHint: error.hint, + }, }); - return; - } else if (showToast) { + return { + ok: false, + stale: false, + error: new Error(reason), + }; + } + + if (!isEndSyncTaskRpcResult(data)) { + const resultVersion = getEndSyncTaskResultVersion(data); + if (resultVersion > END_SYNC_TASK_RESULT_VERSION) { + const rpcResult: EndSyncTaskRpcResult = { + version: resultVersion, + ok: true, + stale: false, + reason: "unsupported_future_result_version", + }; + posthog.capture("Sync end task future result", { + ...telemetryContext, + endSyncPayload: data, + endSyncResult: rpcResult, + }); + + return { + ok: true, + stale: false, + rpcResult, + }; + } + + const reason = "Supabase end_sync_task returned unexpected payload"; + notifyEndSyncFailure({ + status, + showToast, + reason, + context: { + ...telemetryContext, + endSyncPayload: data, + }, + }); + + return { + ok: false, + stale: false, + error: new Error(reason), + }; + } + + const rpcResult = data; + if (rpcResult?.stale === true) { + posthog.capture("Sync end task stale", { + ...telemetryContext, + endSyncResult: rpcResult, + }); + + return { + ok: false, + stale: true, + rpcResult, + }; + } + + if (rpcResult?.ok === false) { + const reason = + rpcResult.reason || "Supabase end_sync_task returned failure"; + notifyEndSyncFailure({ + status, + showToast, + reason, + context: { + ...telemetryContext, + endSyncResult: rpcResult, + }, + }); + + return { + ok: false, + stale: false, + error: new Error(reason), + rpcResult, + }; + } + + if (showToast) { if (status === "failed") { renderToast({ id: "discourse-embedding-failed", @@ -169,16 +430,26 @@ export const endSyncTask = async ({ }); } } + + return { ok: true, stale: false, rpcResult }; } catch (error) { console.error("endSyncTask: Error calling end_sync_task:", error); + const reason = + error instanceof Error + ? `Unexpected error ending sync task: ${error.message}` + : "Unexpected non-error thrown while ending sync task"; notifyEndSyncFailure({ status, showToast, - reason: - error instanceof Error - ? `Unexpected error ending sync task: ${error.message}` - : "Unexpected non-error thrown while ending sync task", + reason, + context: telemetryContext, }); + + return { + ok: false, + stale: false, + error: error instanceof Error ? error : new Error(reason), + }; } }; @@ -199,7 +470,7 @@ export const proposeSyncTask = async ( if (error) { console.error( - `proposeSyncTask: propose_sync_task failed – ${error.message}`, + `proposeSyncTask: propose_sync_task failed - ${error.message}`, ); return { shouldProceed: false }; } @@ -497,37 +768,98 @@ const getAllMissingOrNewDiscourseNodes = async ({ ]; }; -export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { +export const createOrUpdateDiscourseEmbedding = async ( + showToast = false, +): Promise => { if (!doSync) return; console.debug("starting createOrUpdateDiscourseEmbedding"); const startTime = new Date(); + const attemptId = createRuntimeId(); + const phases: SyncPhaseDurations = {}; let success = true; let claimed = false; const isInitialSync = initialSync; // record state at start - const worker = window.roamAlphaAPI.user.uid(); + let claimedAt: Date | null = null; + let context: SupabaseContext | null = null; + let supabaseClient: DGSupabaseClient | null = null; + let userUid = ""; + const worker = getSyncWorkerId(); + + const buildTelemetry = ({ + status, + reason, + nextUpdateTime, + lastUpdateTime, + endSyncResult, + }: { + status: string; + reason?: string; + nextUpdateTime?: Date; + lastUpdateTime?: Date; + endSyncResult?: EndSyncTaskRpcResult; + }): Properties => + syncTelemetryContext({ + attemptId, + worker, + userUid, + context, + startTime, + claimed, + phases, + status, + reason, + nextUpdateTime, + lastUpdateTime, + endSyncResult, + }); try { - if (!worker) { + const resolvedUserUid = window.roamAlphaAPI.user.uid(); + if (!resolvedUserUid) { throw new FatalError("Unable to obtain user UID."); } + userUid = resolvedUserUid; - const supabaseClient = await getLoggedInClient(); + supabaseClient = await measureSyncPhase({ + phase: "getLoggedInClient", + phases, + operation: getLoggedInClient, + }); if (!supabaseClient) { // TODO: Distinguish connection vs credentials error throw new Error("Could not log in to client."); } - const context = await getSupabaseContext(); + context = await measureSyncPhase({ + phase: "getSupabaseContext", + phases, + operation: getSupabaseContext, + }); if (!context) { // not worth retrying: setup error throw new FatalError("Error connecting to client."); } + const activeSupabaseClient = supabaseClient; + const activeContext = context; const { shouldProceed, lastUpdateTime, nextUpdateTime } = - await proposeSyncTask(worker, supabaseClient, context); + await measureSyncPhase({ + phase: "proposeSyncTask", + phases, + operation: () => + proposeSyncTask(worker, activeSupabaseClient, activeContext), + }); if (!shouldProceed) { if (nextUpdateTime === undefined) { throw new Error("Can't obtain sync task"); } console.debug("postponed to ", nextUpdateTime); + posthog.capture( + "Sync postponed", + buildTelemetry({ + status: "postponed", + nextUpdateTime, + lastUpdateTime, + }), + ); if (doSync) { activeTimeout = setTimeout( createOrUpdateDiscourseEmbedding, // eslint-disable-line @typescript-eslint/no-misused-promises @@ -539,7 +871,13 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { return; } claimed = true; - const allUsers = await getAllUsers(); + const activeClaimedAt = new Date(); + claimedAt = activeClaimedAt; + const allUsers = await measureSyncPhase({ + phase: "getAllUsers", + phases, + operation: getAllUsers, + }); const sinceTime = lastUpdateTime ? lastUpdateTime.valueOf() - 1000 // add a one-second buffer : undefined; @@ -547,39 +885,137 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { (n) => n.backedBy === "user", ); - const allNodeInstances = isInitialSync - ? await getAllMissingOrNewDiscourseNodes({ - supabaseClient, - spaceId: context.spaceId, + const allNodeInstances = await measureSyncPhase({ + phase: isInitialSync + ? "getAllMissingOrNewDiscourseNodes" + : "getAllDiscourseNodesSince", + phases, + operation: () => + isInitialSync + ? getAllMissingOrNewDiscourseNodes({ + supabaseClient: activeSupabaseClient, + spaceId: activeContext.spaceId, + since: sinceTime, + nodeTypes: allDgNodeTypes, + }) + : getAllDiscourseNodesSince(sinceTime, allDgNodeTypes), + }); + await measureSyncPhase({ + phase: "upsertUsers", + phases, + operation: () => + upsertUsers(allUsers, activeSupabaseClient, activeContext), + }); + await measureSyncPhase({ + phase: "upsertNodes", + phases, + operation: () => + upsertNodesToSupabaseAsContentWithEmbeddings( + allNodeInstances, + activeSupabaseClient, + activeContext, + ), + }); + await measureSyncPhase({ + phase: "convertConcepts", + phases, + operation: () => + convertDgToSupabaseConcepts({ + nodesSince: allNodeInstances, since: sinceTime, - nodeTypes: allDgNodeTypes, - }) - : await getAllDiscourseNodesSince(sinceTime, allDgNodeTypes); - await upsertUsers(allUsers, supabaseClient, context); - await upsertNodesToSupabaseAsContentWithEmbeddings( - allNodeInstances, - supabaseClient, - context, - ); - await convertDgToSupabaseConcepts({ - nodesSince: allNodeInstances, - since: sinceTime, - allNodeTypes: allDgNodeTypes, - supabaseClient, - context, + allNodeTypes: allDgNodeTypes, + supabaseClient: activeSupabaseClient, + context: activeContext, + }), }); - await cleanupOrphanedNodes(supabaseClient, context); - await endSyncTask({ worker, status: "complete", showToast, startTime }); - initialSync = false; - const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; - posthog.capture("Sync complete", { duration }); + await measureSyncPhase({ + phase: "cleanupOrphanedNodes", + phases, + operation: () => + cleanupOrphanedNodes(activeSupabaseClient, activeContext), + }); + const completeEndResult = await measureSyncPhase({ + phase: "endSyncTask", + phases, + operation: () => + endSyncTask({ + worker, + status: "complete", + showToast, + taskStartedAt: activeClaimedAt, + context: activeContext, + supabaseClient: activeSupabaseClient, + telemetryContext: buildTelemetry({ + status: "complete", + lastUpdateTime, + }), + }), + }); + + if (completeEndResult.ok) { + initialSync = false; + posthog.capture( + "Sync complete", + buildTelemetry({ + status: "complete", + lastUpdateTime, + endSyncResult: completeEndResult.rpcResult, + }), + ); + } else if (completeEndResult.stale) { + posthog.capture( + "Sync stale", + buildTelemetry({ + status: "stale", + reason: + completeEndResult.rpcResult?.reason || + "end_sync_task completed by newer sync task", + lastUpdateTime, + endSyncResult: completeEndResult.rpcResult, + }), + ); + } else { + posthog.capture( + "Sync error", + buildTelemetry({ + status: "end_task_failed", + reason: completeEndResult.error.message, + lastUpdateTime, + endSyncResult: completeEndResult.rpcResult, + }), + ); + } } catch (error) { console.error("createOrUpdateDiscourseEmbedding: Process failed:", error); success = false; - if (worker && claimed) - await endSyncTask({ worker, status: "failed", showToast, startTime }); - const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; - posthog.capture("Sync error", { duration }); + const reason = + error instanceof Error ? error.message : "Unknown sync error"; + let failedEndResult: EndSyncTaskResult | undefined; + const failedClaimedAt = claimedAt; + if (failedClaimedAt !== null) { + failedEndResult = await measureSyncPhase({ + phase: "endSyncTaskFailed", + phases, + operation: () => + endSyncTask({ + worker, + status: "failed", + showToast, + taskStartedAt: failedClaimedAt, + context: context || undefined, + supabaseClient: supabaseClient || undefined, + telemetryContext: buildTelemetry({ status: "failed", reason }), + }), + }); + } + posthog.capture( + "Sync error", + buildTelemetry({ + status: error instanceof FatalError ? "fatal" : "failed", + reason, + endSyncResult: failedEndResult?.rpcResult, + }), + ); if (error instanceof FatalError) { doSync = false; return; @@ -594,7 +1030,7 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { doSync = false; return; } - const jitter = 0.9 + Math.random() * 0.2; // 0.9x–1.1x + const jitter = 0.9 + Math.random() * 0.2; // 0.9x-1.1x timeout *= 2 ** numFailures * jitter; } if (activeTimeout != null) { diff --git a/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts b/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts index c61865f1f..b2b16745b 100644 --- a/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts +++ b/apps/website/app/api/supabase/sync-task/[fn]/[target]/[worker]/route.ts @@ -1,5 +1,4 @@ import { NextResponse, NextRequest } from "next/server"; -import { PostgrestSingleResponse } from "@supabase/supabase-js"; import { type Database, Constants } from "@repo/database/dbTypes"; import { asPostgrestFailure } from "@repo/database/lib/contextFunctions"; import { createClient } from "~/utils/supabase/server"; @@ -8,7 +7,88 @@ import { createApiResponse, handleRouteError } from "~/utils/supabase/apiUtils"; type ApiParams = Promise<{ target: string; fn: string; worker: string }>; export type SegmentDataType = { params: ApiParams }; -// POST the task status to the /supabase/sync-task/{function_name}/{target}/{worker} endpoint +type TaskStatus = Database["public"]["Enums"]["task_status"]; + +type ParsedEndSyncTaskBody = + | { + ok: true; + status: TaskStatus; + startedAt?: string; + } + | { + ok: false; + error: string; + }; + +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null && !Array.isArray(value); + +const normalizeStartedAt = (value: unknown): string | null => { + if (typeof value === "number" && Number.isFinite(value)) { + const date = new Date(value); + return Number.isNaN(date.valueOf()) ? null : date.toISOString(); + } + + if (typeof value === "string" && !Number.isNaN(Date.parse(value))) { + return value; + } + + return null; +}; + +const parseEndSyncTaskBody = (body: unknown): ParsedEndSyncTaskBody => { + if (typeof body === "string") { + if ( + (Constants.public.Enums.task_status as readonly string[]).includes(body) + ) { + return { + ok: true, + status: body as TaskStatus, + }; + } + + return { + ok: false, + error: `${body} is not a task status`, + }; + } + + if (!isRecord(body)) { + return { + ok: false, + error: + "Request body must be a task status string or " + + "{ status: string, s_started_at: string | number }", + }; + } + + const { status, s_started_at: startedAtRaw } = body; + if ( + typeof status !== "string" || + !(Constants.public.Enums.task_status as readonly string[]).includes(status) + ) { + return { + ok: false, + error: `${String(status)} is not a task status`, + }; + } + + const startedAt = normalizeStartedAt(startedAtRaw); + if (startedAt === null) { + return { + ok: false, + error: "s_started_at must be an ISO timestamp string or epoch number", + }; + } + + return { + ok: true, + status: status as TaskStatus, + startedAt, + }; +}; + +// POST the task status and claim timestamp to the /supabase/sync-task/{function_name}/{target}/{worker} endpoint export const POST = async ( request: NextRequest, segmentData: SegmentDataType, @@ -22,26 +102,28 @@ export const POST = async ( asPostgrestFailure(`${target} is not a number`, "type"), ); } - const infoS: string = await request.json(); - if ( - !(Constants.public.Enums.task_status as readonly string[]).includes(infoS) - ) { + const infoRaw: unknown = await request.json(); + const parsedBody = parseEndSyncTaskBody(infoRaw); + if (!parsedBody.ok) { return createApiResponse( request, - asPostgrestFailure(`${infoS} is not a task status`, "type"), + asPostgrestFailure(parsedBody.error, "type"), ); } - const info = infoS as Database["public"]["Enums"]["task_status"]; const supabase = await createClient(); - const response = (await supabase.rpc("end_sync_task", { + const rpcArgs: Database["public"]["Functions"]["end_sync_task"]["Args"] = { s_target: targetN, s_function: fn, s_worker: worker, - s_status: info, - })) as PostgrestSingleResponse; - // Transform 204 No Content to 200 OK with success indicator for API consistency + s_status: parsedBody.status, + }; + if (parsedBody.startedAt !== undefined) { + rpcArgs.s_started_at = parsedBody.startedAt; + } + + const response = await supabase.rpc("end_sync_task", rpcArgs); if (response.status === 204) { - response.data = true; + response.data = { ok: true, stale: false }; response.status = 200; } diff --git a/packages/database/doc/sync_functions.md b/packages/database/doc/sync_functions.md index 20b280d80..8381c6248 100644 --- a/packages/database/doc/sync_functions.md +++ b/packages/database/doc/sync_functions.md @@ -15,4 +15,5 @@ When a worker calls `propose_sync_task`, it will receive either: 2. a timestamp in the past, which is also the last time the task was executed successfully. Your worker can ask the platform for all changes posterior to that time. (This will only be reliable if you make always make your queries after calling this function!) 3. Null, meaning the task was not executed successfully before, and your worker is tasked with starting from scratch. -When a worker finishes the task, it should clean up with `end_sync_task`, giving the same identifying arguments and a status ('complete' or 'failed'.) +When a worker finishes the task, it should clean up with `end_sync_task`, giving the same identifying arguments, a status (`complete` or `failed`), and the time at which the task was claimed. +The function returns a versioned JSON object with `ok` and `stale` fields plus task metadata. A stale response means another attempt claimed a newer task while this attempt was still running, so the caller should not report it as a successful completion. diff --git a/packages/database/src/dbTypes.ts b/packages/database/src/dbTypes.ts index 06c723a94..48342b32c 100644 --- a/packages/database/src/dbTypes.ts +++ b/packages/database/src/dbTypes.ts @@ -1659,7 +1659,7 @@ export type Database = { s_target: number s_worker: string } - Returns: undefined + Returns: Json } extract_references: { Args: { refs: Json }; Returns: number[] } file_access: { Args: { hashvalue: string }; Returns: boolean } diff --git a/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql new file mode 100644 index 000000000..0c965e73d --- /dev/null +++ b/packages/database/supabase/migrations/20260528033000_end_sync_task_result.sql @@ -0,0 +1,133 @@ +DROP FUNCTION IF EXISTS public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz); + +CREATE OR REPLACE FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz = NULL +) RETURNS jsonb +SET search_path = '' +LANGUAGE plpgsql +AS $$ +DECLARE t_id INTEGER; +DECLARE t_worker varchar; +DECLARE t_status public.task_status; +DECLARE t_failure_count SMALLINT; +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; +DECLARE t_task_times_out_at TIMESTAMP WITH TIME ZONE; +BEGIN + SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start, task_times_out_at + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start, t_task_times_out_at + FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function + FOR UPDATE; + ASSERT s_status > 'active'; + IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN + -- we probably took too long. Let the other task have priority. + RETURN jsonb_build_object( + 'version', 1, + 'ok', false, + 'stale', true, + 'reason', 'completed_by_newer_task', + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + ); + END IF; + IF t_worker != s_worker THEN + RAISE EXCEPTION 'Wrong worker' + USING DETAIL = jsonb_build_object( + 'version', 1, + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + )::text; + END IF; + ASSERT s_status >= t_status, 'do not go back in status'; + IF s_status = 'complete' THEN + t_last_task_end := now(); + t_last_success_start := t_last_task_start; + t_failure_count := 0; + ELSE + IF t_status != s_status THEN + t_failure_count := t_failure_count + 1; + END IF; + END IF; + + t_task_times_out_at := NULL; + UPDATE public.sync_info + SET status = s_status, + task_times_out_at=null, + last_task_end=t_last_task_end, + last_success_start=t_last_success_start, + failure_count=t_failure_count + WHERE id=t_id; + + RETURN jsonb_build_object( + 'version', 1, + 'ok', true, + 'stale', false, + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + ); +END; +$$; + +ALTER FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) OWNER TO "postgres"; + +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) TO "anon"; +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) TO "authenticated"; +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) TO "service_role"; diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 10e73a91d..2316e658c 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -52,7 +52,7 @@ CREATE OR REPLACE FUNCTION public.end_sync_task( s_worker character varying, s_status public.task_status, s_started_at timestamptz = NULL -) RETURNS void +) RETURNS jsonb SET search_path = '' LANGUAGE plpgsql AS $$ @@ -63,16 +63,48 @@ DECLARE t_failure_count SMALLINT; DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; +DECLARE t_task_times_out_at TIMESTAMP WITH TIME ZONE; BEGIN - SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start - INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start - FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; + SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start, task_times_out_at + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start, t_task_times_out_at + FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function + FOR UPDATE; ASSERT s_status > 'active'; IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. - RETURN; + RETURN jsonb_build_object( + 'version', 1, + 'ok', false, + 'stale', true, + 'reason', 'completed_by_newer_task', + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + ); + END IF; + IF t_worker != s_worker THEN + RAISE EXCEPTION 'Wrong worker' + USING DETAIL = jsonb_build_object( + 'version', 1, + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + )::text; END IF; - ASSERT t_worker = s_worker, 'Wrong worker'; ASSERT s_status >= t_status, 'do not go back in status'; IF s_status = 'complete' THEN t_last_task_end := now(); @@ -84,6 +116,7 @@ BEGIN END IF; END IF; + t_task_times_out_at := NULL; UPDATE public.sync_info SET status = s_status, task_times_out_at=null, @@ -91,6 +124,22 @@ BEGIN last_success_start=t_last_success_start, failure_count=t_failure_count WHERE id=t_id; + + RETURN jsonb_build_object( + 'version', 1, + 'ok', true, + 'stale', false, + 'requestedStatus', s_status, + 'callerWorker', s_worker, + 'currentWorker', t_worker, + 'currentStatus', t_status, + 'callerStartedAt', s_started_at, + 'currentStartedAt', t_last_task_start, + 'lastTaskEnd', t_last_task_end, + 'lastSuccessStart', t_last_success_start, + 'taskTimesOutAt', t_task_times_out_at, + 'failureCount', t_failure_count + ); END; $$;