diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx index 57fd7a41706..6d513074196 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx @@ -265,7 +265,7 @@ const WorkflowContent = React.memo( const { fitViewToBounds, getViewportCenter } = useCanvasViewport(reactFlowInstance, { embedded, }) - const { emitCursorUpdate } = useSocket() + const { emitCursorUpdate, joinWorkflow, leaveWorkflow } = useSocket() useDynamicHandleRefresh() const workspaceId = propWorkspaceId || (params.workspaceId as string) @@ -273,6 +273,14 @@ const WorkflowContent = React.memo( const addNotification = useNotificationStore((state) => state.addNotification) + useEffect(() => { + if (!embedded || !workflowIdParam) return + joinWorkflow(workflowIdParam) + return () => { + leaveWorkflow() + } + }, [embedded, workflowIdParam, joinWorkflow, leaveWorkflow]) + useOAuthReturnForWorkflow(workflowIdParam) const { @@ -2144,12 +2152,9 @@ const WorkflowContent = React.memo( const handleCanvasPointerMove = useCallback( (event: React.PointerEvent) => { - const target = event.currentTarget as HTMLElement - const bounds = target.getBoundingClientRect() - const position = screenToFlowPosition({ - x: event.clientX - bounds.left, - y: event.clientY - bounds.top, + x: event.clientX, + y: event.clientY, }) emitCursorUpdate(position) diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 47c00157d0f..91a415df132 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -90,6 +90,7 @@ interface SocketContextType { onSelectionUpdate: (handler: (data: any) => void) => void onWorkflowDeleted: (handler: (data: any) => void) => void onWorkflowReverted: (handler: (data: any) => void) => void + onWorkflowUpdated: (handler: (data: any) => void) => void onOperationConfirmed: (handler: (data: any) => void) => void onOperationFailed: (handler: (data: any) => void) => void } @@ -118,6 +119,7 @@ const SocketContext = createContext({ onSelectionUpdate: () => {}, onWorkflowDeleted: () => {}, onWorkflowReverted: () => {}, + onWorkflowUpdated: () => {}, onOperationConfirmed: () => {}, onOperationFailed: () => {}, }) @@ -155,6 +157,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { selectionUpdate?: (data: any) => void workflowDeleted?: (data: any) => void workflowReverted?: (data: any) => void + workflowUpdated?: (data: any) => void operationConfirmed?: (data: any) => void operationFailed?: (data: any) => void }>({}) @@ -334,7 +337,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => { isRejoiningRef.current = false // Ignore stale success responses from previous navigation - if (workflowId !== urlWorkflowIdRef.current) { + if (urlWorkflowIdRef.current && workflowId !== urlWorkflowIdRef.current) { logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`) return } @@ -382,6 +385,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { eventHandlers.current.workflowReverted?.(data) }) + socketInstance.on('workflow-updated', (data) => { + logger.info(`Workflow ${data.workflowId} has been updated externally`) + eventHandlers.current.workflowUpdated?.(data) + }) + const rehydrateWorkflowStores = async (workflowId: string, workflowState: any) => { const [ { useOperationQueueStore }, @@ -803,6 +811,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) { eventHandlers.current.workflowReverted = handler }, []) + const onWorkflowUpdated = useCallback((handler: (data: any) => void) => { + eventHandlers.current.workflowUpdated = handler + }, []) + const onOperationConfirmed = useCallback((handler: (data: any) => void) => { eventHandlers.current.operationConfirmed = handler }, []) @@ -836,6 +848,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { onSelectionUpdate, onWorkflowDeleted, onWorkflowReverted, + onWorkflowUpdated, onOperationConfirmed, onOperationFailed, }), @@ -863,6 +876,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { onSelectionUpdate, onWorkflowDeleted, onWorkflowReverted, + onWorkflowUpdated, onOperationConfirmed, onOperationFailed, ] diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index 0b22a47de4b..5bf03b2683c 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -122,6 +122,7 @@ export function useCollaborativeWorkflow() { onVariableUpdate, onWorkflowDeleted, onWorkflowReverted, + onWorkflowUpdated, onOperationConfirmed, onOperationFailed, } = useSocket() @@ -536,81 +537,99 @@ export function useCollaborativeWorkflow() { } } + const reloadWorkflowFromApi = async (workflowId: string, reason: string): Promise => { + const response = await fetch(`/api/workflows/${workflowId}`) + if (!response.ok) { + logger.error(`Failed to fetch workflow data after ${reason}: ${response.statusText}`) + return false + } + + const responseData = await response.json() + const workflowData = responseData.data + + if (!workflowData?.state) { + logger.error(`No state found in workflow data after ${reason}`, { workflowData }) + return false + } + + isApplyingRemoteChange.current = true + try { + useWorkflowStore.getState().replaceWorkflowState({ + blocks: workflowData.state.blocks || {}, + edges: workflowData.state.edges || [], + loops: workflowData.state.loops || {}, + parallels: workflowData.state.parallels || {}, + lastSaved: workflowData.state.lastSaved || Date.now(), + }) + + const subblockValues: Record> = {} + Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => { + const blockState = block as any + subblockValues[blockId] = {} + Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => { + subblockValues[blockId][subblockId] = (subblock as any).value + }) + }) + + useSubBlockStore.setState((state: any) => ({ + workflowValues: { + ...state.workflowValues, + [workflowId]: subblockValues, + }, + })) + + const graph = { + blocksById: workflowData.state.blocks || {}, + edgesById: Object.fromEntries( + (workflowData.state.edges || []).map((e: any) => [e.id, e]) + ), + } + + const undoRedoStore = useUndoRedoStore.getState() + const stackKeys = Object.keys(undoRedoStore.stacks) + stackKeys.forEach((key) => { + const [wfId, userId] = key.split(':') + if (wfId === workflowId) { + undoRedoStore.pruneInvalidEntries(wfId, userId, graph) + } + }) + + logger.info(`Successfully reloaded workflow state after ${reason}`, { workflowId }) + return true + } finally { + isApplyingRemoteChange.current = false + } + } + const handleWorkflowReverted = async (data: any) => { const { workflowId } = data logger.info(`Workflow ${workflowId} has been reverted to deployed state`) - // If the reverted workflow is the currently active one, reload the workflow state - if (activeWorkflowId === workflowId) { - logger.info(`Currently active workflow ${workflowId} was reverted, reloading state`) - - try { - // Fetch the updated workflow state from the server (which loads from normalized tables) - const response = await fetch(`/api/workflows/${workflowId}`) - if (response.ok) { - const responseData = await response.json() - const workflowData = responseData.data - - if (workflowData?.state) { - // Update the workflow store with the reverted state - isApplyingRemoteChange.current = true - try { - // Update the main workflow state using the API response - useWorkflowStore.getState().replaceWorkflowState({ - blocks: workflowData.state.blocks || {}, - edges: workflowData.state.edges || [], - loops: workflowData.state.loops || {}, - parallels: workflowData.state.parallels || {}, - lastSaved: workflowData.state.lastSaved || Date.now(), - }) + if (activeWorkflowId !== workflowId) return - // Update subblock store with reverted values - const subblockValues: Record> = {} - Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => { - const blockState = block as any - subblockValues[blockId] = {} - Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => { - subblockValues[blockId][subblockId] = (subblock as any).value - }) - }) + try { + await reloadWorkflowFromApi(workflowId, 'revert') + } catch (error) { + logger.error('Error reloading workflow state after revert:', error) + } + } - // Update subblock store for this workflow - useSubBlockStore.setState((state: any) => ({ - workflowValues: { - ...state.workflowValues, - [workflowId]: subblockValues, - }, - })) + const handleWorkflowUpdated = async (data: any) => { + const { workflowId } = data + logger.info(`Workflow ${workflowId} has been updated externally`) - logger.info(`Successfully loaded reverted workflow state for ${workflowId}`) + if (activeWorkflowId !== workflowId) return - const graph = { - blocksById: workflowData.state.blocks || {}, - edgesById: Object.fromEntries( - (workflowData.state.edges || []).map((e: any) => [e.id, e]) - ), - } + const { hasActiveDiff } = useWorkflowDiffStore.getState() + if (hasActiveDiff) { + logger.info('Skipping workflow-updated: active diff in progress', { workflowId }) + return + } - const undoRedoStore = useUndoRedoStore.getState() - const stackKeys = Object.keys(undoRedoStore.stacks) - stackKeys.forEach((key) => { - const [wfId, userId] = key.split(':') - if (wfId === workflowId) { - undoRedoStore.pruneInvalidEntries(wfId, userId, graph) - } - }) - } finally { - isApplyingRemoteChange.current = false - } - } else { - logger.error('No state found in workflow data after revert', { workflowData }) - } - } else { - logger.error(`Failed to fetch workflow data after revert: ${response.statusText}`) - } - } catch (error) { - logger.error('Error reloading workflow state after revert:', error) - } + try { + await reloadWorkflowFromApi(workflowId, 'external update') + } catch (error) { + logger.error('Error reloading workflow state after external update:', error) } } @@ -632,6 +651,7 @@ export function useCollaborativeWorkflow() { onVariableUpdate(handleVariableUpdate) onWorkflowDeleted(handleWorkflowDeleted) onWorkflowReverted(handleWorkflowReverted) + onWorkflowUpdated(handleWorkflowUpdated) onOperationConfirmed(handleOperationConfirmed) onOperationFailed(handleOperationFailed) }, [ @@ -640,6 +660,7 @@ export function useCollaborativeWorkflow() { onVariableUpdate, onWorkflowDeleted, onWorkflowReverted, + onWorkflowUpdated, onOperationConfirmed, onOperationFailed, activeWorkflowId, diff --git a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts index eb0a0f23ed6..7be066d2989 100644 --- a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts +++ b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts @@ -7,6 +7,7 @@ import { type BaseServerTool, type ServerToolContext, } from '@/lib/copilot/tools/server/base-tool' +import { env } from '@/lib/core/config/env' import { applyTargetedLayout, getTargetedLayoutImpact } from '@/lib/workflows/autolayout' import { DEFAULT_HORIZONTAL_SPACING, @@ -287,6 +288,18 @@ export const editWorkflowServerTool: BaseServerTool logger.info('Workflow state persisted to database', { workflowId }) + const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002' + fetch(`${socketUrl}/api/workflow-updated`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': env.INTERNAL_API_SECRET, + }, + body: JSON.stringify({ workflowId }), + }).catch((error) => { + logger.warn('Failed to notify socket server of workflow update', { workflowId, error }) + }) + const sanitizationWarnings = validation.warnings.length > 0 ? validation.warnings : undefined return {