Skip to content

Commit 6255838

Browse files
committed
improvement(mothership): workflow edits via sockets
1 parent e8f7fe0 commit 6255838

File tree

3 files changed

+93
-0
lines changed

3 files changed

+93
-0
lines changed

apps/sim/app/workspace/providers/socket-provider.tsx

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ interface SocketContextType {
9090
onSelectionUpdate: (handler: (data: any) => void) => void
9191
onWorkflowDeleted: (handler: (data: any) => void) => void
9292
onWorkflowReverted: (handler: (data: any) => void) => void
93+
onWorkflowUpdated: (handler: (data: any) => void) => void
9394
onOperationConfirmed: (handler: (data: any) => void) => void
9495
onOperationFailed: (handler: (data: any) => void) => void
9596
}
@@ -118,6 +119,7 @@ const SocketContext = createContext<SocketContextType>({
118119
onSelectionUpdate: () => {},
119120
onWorkflowDeleted: () => {},
120121
onWorkflowReverted: () => {},
122+
onWorkflowUpdated: () => {},
121123
onOperationConfirmed: () => {},
122124
onOperationFailed: () => {},
123125
})
@@ -155,6 +157,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
155157
selectionUpdate?: (data: any) => void
156158
workflowDeleted?: (data: any) => void
157159
workflowReverted?: (data: any) => void
160+
workflowUpdated?: (data: any) => void
158161
operationConfirmed?: (data: any) => void
159162
operationFailed?: (data: any) => void
160163
}>({})
@@ -382,6 +385,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
382385
eventHandlers.current.workflowReverted?.(data)
383386
})
384387

388+
socketInstance.on('workflow-updated', (data) => {
389+
logger.info(`Workflow ${data.workflowId} has been updated externally`)
390+
eventHandlers.current.workflowUpdated?.(data)
391+
})
392+
385393
const rehydrateWorkflowStores = async (workflowId: string, workflowState: any) => {
386394
const [
387395
{ useOperationQueueStore },
@@ -804,6 +812,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
804812
eventHandlers.current.workflowReverted = handler
805813
}, [])
806814

815+
const onWorkflowUpdated = useCallback((handler: (data: any) => void) => {
816+
eventHandlers.current.workflowUpdated = handler
817+
}, [])
818+
807819
const onOperationConfirmed = useCallback((handler: (data: any) => void) => {
808820
eventHandlers.current.operationConfirmed = handler
809821
}, [])
@@ -837,6 +849,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
837849
onSelectionUpdate,
838850
onWorkflowDeleted,
839851
onWorkflowReverted,
852+
onWorkflowUpdated,
840853
onOperationConfirmed,
841854
onOperationFailed,
842855
}),
@@ -864,6 +877,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
864877
onSelectionUpdate,
865878
onWorkflowDeleted,
866879
onWorkflowReverted,
880+
onWorkflowUpdated,
867881
onOperationConfirmed,
868882
onOperationFailed,
869883
]

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ export function useCollaborativeWorkflow() {
122122
onVariableUpdate,
123123
onWorkflowDeleted,
124124
onWorkflowReverted,
125+
onWorkflowUpdated,
125126
onOperationConfirmed,
126127
onOperationFailed,
127128
} = useSocket()
@@ -615,6 +616,67 @@ export function useCollaborativeWorkflow() {
615616
}
616617
}
617618

619+
const handleWorkflowUpdated = async (data: any) => {
620+
const { workflowId } = data
621+
logger.info(`Workflow ${workflowId} has been updated externally`)
622+
623+
if (activeWorkflowId !== workflowId) return
624+
625+
const { hasActiveDiff } = useWorkflowDiffStore.getState()
626+
if (hasActiveDiff) {
627+
logger.info('Skipping workflow-updated: active diff in progress', { workflowId })
628+
return
629+
}
630+
631+
try {
632+
const response = await fetch(`/api/workflows/${workflowId}`)
633+
if (response.ok) {
634+
const responseData = await response.json()
635+
const workflowData = responseData.data
636+
637+
if (workflowData?.state) {
638+
isApplyingRemoteChange.current = true
639+
try {
640+
useWorkflowStore.getState().replaceWorkflowState({
641+
blocks: workflowData.state.blocks || {},
642+
edges: workflowData.state.edges || [],
643+
loops: workflowData.state.loops || {},
644+
parallels: workflowData.state.parallels || {},
645+
lastSaved: workflowData.state.lastSaved || Date.now(),
646+
deploymentStatuses: workflowData.state.deploymentStatuses || {},
647+
})
648+
649+
const subblockValues: Record<string, Record<string, any>> = {}
650+
Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => {
651+
const blockState = block as any
652+
subblockValues[blockId] = {}
653+
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
654+
subblockValues[blockId][subblockId] = (subblock as any).value
655+
})
656+
})
657+
658+
useSubBlockStore.setState((state: any) => ({
659+
workflowValues: {
660+
...state.workflowValues,
661+
[workflowId]: subblockValues,
662+
},
663+
}))
664+
665+
logger.info(`Successfully applied externally updated workflow state`, { workflowId })
666+
} finally {
667+
isApplyingRemoteChange.current = false
668+
}
669+
}
670+
} else {
671+
logger.error(
672+
`Failed to fetch workflow data after external update: ${response.statusText}`
673+
)
674+
}
675+
} catch (error) {
676+
logger.error('Error reloading workflow state after external update:', error)
677+
}
678+
}
679+
618680
const handleOperationConfirmed = (data: any) => {
619681
const { operationId } = data
620682
logger.debug('Operation confirmed', { operationId })
@@ -633,6 +695,7 @@ export function useCollaborativeWorkflow() {
633695
onVariableUpdate(handleVariableUpdate)
634696
onWorkflowDeleted(handleWorkflowDeleted)
635697
onWorkflowReverted(handleWorkflowReverted)
698+
onWorkflowUpdated(handleWorkflowUpdated)
636699
onOperationConfirmed(handleOperationConfirmed)
637700
onOperationFailed(handleOperationFailed)
638701
}, [
@@ -641,6 +704,7 @@ export function useCollaborativeWorkflow() {
641704
onVariableUpdate,
642705
onWorkflowDeleted,
643706
onWorkflowReverted,
707+
onWorkflowUpdated,
644708
onOperationConfirmed,
645709
onOperationFailed,
646710
activeWorkflowId,

apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
type BaseServerTool,
88
type ServerToolContext,
99
} from '@/lib/copilot/tools/server/base-tool'
10+
import { env } from '@/lib/core/config/env'
1011
import { applyTargetedLayout, getTargetedLayoutImpact } from '@/lib/workflows/autolayout'
1112
import {
1213
DEFAULT_HORIZONTAL_SPACING,
@@ -287,6 +288,20 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, unknown>
287288

288289
logger.info('Workflow state persisted to database', { workflowId })
289290

291+
try {
292+
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
293+
await fetch(`${socketUrl}/api/workflow-updated`, {
294+
method: 'POST',
295+
headers: {
296+
'Content-Type': 'application/json',
297+
'x-api-key': env.INTERNAL_API_SECRET,
298+
},
299+
body: JSON.stringify({ workflowId }),
300+
})
301+
} catch (error) {
302+
logger.warn('Failed to notify socket server of workflow update', { workflowId, error })
303+
}
304+
290305
const sanitizationWarnings = validation.warnings.length > 0 ? validation.warnings : undefined
291306

292307
return {

0 commit comments

Comments
 (0)