Skip to content

Commit 62c8181

Browse files
committed
fix: for the paste node trpc procedure emit the migrated nodes instead of original one. Add support for updating multiple nodes and introduce NodesUpdated event
1 parent adce3a2 commit 62c8181

6 files changed

Lines changed: 93 additions & 17 deletions

File tree

apps/chaingraph-frontend/src/store/flow/stores.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import {
5151
removeNode,
5252
setNodeVersion,
5353
updateNode,
54+
updateNodes,
5455
updateNodeUILocal,
5556
} from '../nodes/stores'
5657
import {
@@ -432,14 +433,11 @@ function createEventHandlers(flowId: string, nodes: Record<string, INode>): Flow
432433

433434
[FlowEventType.NodeUpdated]: (data) => {
434435
const node = nodes[data.node.id]
435-
if (!node) {
436-
console.error(`[NodeUpdated] Node ${data.node.id} not found`)
437-
return
438-
}
439-
440-
if (data.node.getVersion() && data.node.getVersion() < node.getVersion()) {
441-
// console.warn(`[NodeUpdated] Received outdated node update event for node ${data.node.id}`)
442-
return
436+
if (node) {
437+
if (data.node.getVersion() && data.node.getVersion() < node.getVersion()) {
438+
// console.warn(`[NodeUpdated] Received outdated node update event for node ${data.node.id}`)
439+
return
440+
}
443441
}
444442

445443
// console.log(`[NodeUpdated] Updating node ${data.node.id} to version ${data.node.getVersion()}`)
@@ -448,6 +446,27 @@ function createEventHandlers(flowId: string, nodes: Record<string, INode>): Flow
448446
nodeUpdated(data.node.id)
449447
},
450448

449+
[FlowEventType.NodesUpdated]: (data) => {
450+
const validNodes = data.nodes.filter((node) => {
451+
const existingNode = nodes[node.id]
452+
if (existingNode) {
453+
if (node.getVersion() && node.getVersion() < existingNode.getVersion()) {
454+
// console.warn(`[NodesUpdated] Skipping outdated node update event for node ${node.id}`)
455+
return false
456+
}
457+
}
458+
return true
459+
})
460+
461+
if (validNodes.length === 0) {
462+
return
463+
}
464+
465+
// console.log(`[NodesUpdated] Updating nodes:`, validNodes.map(n => n.id).join(', '))
466+
updateNodes(validNodes)
467+
validNodes.forEach(node => nodeUpdated(node.id))
468+
},
469+
451470
[FlowEventType.NodeRemoved]: (data) => {
452471
removeNode(data.nodeId)
453472
},

apps/chaingraph-frontend/src/store/nodes/stores.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import { positionInterpolator } from './position-interpolation-advanced'
4242
export const addNode = nodesDomain.createEvent<INode>()
4343
export const addNodes = nodesDomain.createEvent<INode[]>()
4444
export const updateNode = nodesDomain.createEvent<INode>()
45+
export const updateNodes = nodesDomain.createEvent<INode[]>()
4546
export const removeNode = nodesDomain.createEvent<string>()
4647
export const setNodeMetadata = nodesDomain.createEvent<{ nodeId: string, metadata: NodeState['metadata'] }>()
4748
export const setNodeVersion = nodesDomain.createEvent<{ nodeId: string, version: number }>()
@@ -214,6 +215,16 @@ export const $nodes = nodesDomain.createStore<Record<string, INode>>({})
214215
return newState
215216
})
216217

218+
// Update nodes operation
219+
.on(updateNodes, (state, nodes) => {
220+
const newState = { ...state }
221+
nodes.forEach((node) => {
222+
newState[node.id] = node
223+
})
224+
225+
return newState
226+
})
227+
217228
.on(updateNode, (state, node) => {
218229
return { ...state, [node.id]: node }
219230
})

packages/chaingraph-trpc/server/procedures/flow/paste-nodes.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
*/
88

99
import type {
10-
Flow,
1110
IEdge,
1211
INode,
1312
IPort,
1413
SerializedEdge,
1514
} from '@badaitech/chaingraph-types'
15+
import {
16+
Flow,
17+
} from '@badaitech/chaingraph-types'
1618
import {
1719
FlowMigration,
1820
} from '@badaitech/chaingraph-types'
@@ -81,10 +83,12 @@ export const pasteNodes = flowContextProcedure
8183
throw new Error(`Flow ${flowId} not found`)
8284
}
8385

86+
const emptyFlow = new Flow()
87+
8488
await ctx.flowStore.lockFlow(flowId)
8589

8690
try {
87-
flow.setIsDisabledPropagationEvents(true)
91+
emptyFlow.setIsDisabledPropagationEvents(true)
8892

8993
// Step 1: Clone nodes with new IDs using cloneWithNewId()
9094
const nodeIdMapping = new Map<string, string>()
@@ -192,7 +196,7 @@ export const pasteNodes = flowContextProcedure
192196
}
193197
}
194198

195-
const addedNodes = await flow.addNodes(createdNodes, false)
199+
const addedNodes = await emptyFlow.addNodes(createdNodes, true)
196200

197201
// Step 2: Recreate edges using new IDs
198202
const createdEdges: SerializedEdge[] = []
@@ -241,10 +245,13 @@ export const pasteNodes = flowContextProcedure
241245
})
242246
.filter((edge): edge is NonNullable<typeof edge> => edge !== null)
243247

244-
await flow.addEdges(edgesToAdd, false)
248+
await emptyFlow.addEdges(edgesToAdd, true)
245249

246250
// Force migrate flow to v2
247-
FlowMigration.migrateFlowFromV1ToV2(flow)
251+
const flowWithPastedNodes = FlowMigration.migrateFlowFromV1ToV2(emptyFlow)
252+
253+
await flow.addNodes(Array.from(flowWithPastedNodes.nodes.values()), false)
254+
await flow.addEdges(Array.from(flowWithPastedNodes.edges.values()), false)
248255

249256
// Save the updated flow
250257
await ctx.flowStore.updateFlow(flow as Flow)

packages/chaingraph-types/src/flow/events.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export enum FlowEventType {
2525
NodesAdded = 'flow:nodes:added',
2626
NodeAdded = 'flow:node:added',
2727
NodeRemoved = 'flow:node:removed',
28+
NodesUpdated = 'flow:nodes:updated',
2829
NodeUpdated = 'flow:node:updated',
2930
NodeParentUpdated = 'flow:node:parent-updated',
3031

@@ -69,6 +70,11 @@ export interface NodeUpdatedEventData {
6970
node: INode
7071
}
7172

73+
/** Data for NodesUpdated event */
74+
export interface NodesUpdatedEventData {
75+
nodes: INode[]
76+
}
77+
7278
/** Data for PortCreated event */
7379
export interface PortCreatedEventData {
7480
port: IPort
@@ -183,6 +189,7 @@ export interface EventDataMap {
183189
[FlowEventType.NodeAdded]: NodeAddedEventData
184190
[FlowEventType.NodeRemoved]: NodeRemovedEventData
185191
[FlowEventType.NodeUpdated]: NodeUpdatedEventData
192+
[FlowEventType.NodesUpdated]: NodesUpdatedEventData
186193
[FlowEventType.PortCreated]: PortCreatedEventData
187194
[FlowEventType.PortRemoved]: PortRemovedEventData
188195
[FlowEventType.PortUpdated]: PortUpdatedEventData

packages/chaingraph-types/src/flow/flow.ts

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,6 @@ export class Flow implements IFlow {
165165
}
166166

167167
async updateNode(node: INode): Promise<void> {
168-
if (!this.nodes.has(node.id)) {
169-
throw new Error(`Node with ID ${node.id} does not exist in the flow.`)
170-
}
171-
172168
// Commit any pending batch updates before updating the node
173169
// This ensures all collected port updates are emitted before the flow update
174170
await node.commitBatchUpdate({ sourceOfUpdate: 'flow:updateNode' })
@@ -199,6 +195,36 @@ export class Flow implements IFlow {
199195
))
200196
}
201197

198+
async updateNodes(nodes: INode[]): Promise<void> {
199+
if (nodes.length === 0) {
200+
return Promise.resolve()
201+
}
202+
203+
// Commit any pending batch updates before updating the nodes
204+
await Promise.all(nodes.map(node => node.commitBatchUpdate({ sourceOfUpdate: 'flow:updateNodes' })))
205+
206+
// cancel the previous event handler if it exists
207+
nodes.forEach((node) => {
208+
const oldCancel = this.nodeEventHandlersCancel.get(node.id)
209+
this.nodes.set(node.id, node)
210+
const newCancel = node.onAll(async (event: NodeEvent) => {
211+
return this.handleNodeEvent(node, event)
212+
})
213+
if (oldCancel) {
214+
oldCancel()
215+
}
216+
this.nodeEventHandlersCancel.set(node.id, newCancel)
217+
})
218+
219+
// Emit NodesUpdated event
220+
return this.emitEvent(newEvent(
221+
this.getNextEventIndex(),
222+
this.id,
223+
FlowEventType.NodesUpdated,
224+
{ nodes: nodes.map(n => n.clone()) },
225+
))
226+
}
227+
202228
async removeNode(nodeId: string): Promise<void> {
203229
const node = this.nodes.get(nodeId)
204230
if (!node) {

packages/chaingraph-types/src/flow/interface.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ export interface IFlow {
6363
*/
6464
updateNode: (node: INode) => Promise<void>
6565

66+
/**
67+
* Updates multiple nodes in the flow and triggers an event.
68+
* @param node
69+
*/
70+
updateNodes: (node: INode[]) => Promise<void>
71+
6672
/**
6773
* Removes a node from the flow.
6874
* @param nodeId The ID of the node to remove.

0 commit comments

Comments
 (0)