Skip to content

Commit bca8d0b

Browse files
author
qq_62395577
committed
新增分布式锁,防值两个node同时落库
1 parent a7c760d commit bca8d0b

1 file changed

Lines changed: 30 additions & 16 deletions

File tree

app-builder/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/fitable/AippFlowEndCallback.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,34 @@ public void callback(List<Map<String, Object>> contexts) {
151151
String aippInstId = ObjectUtils.cast(businessData.get(AippConst.BS_AIPP_INST_ID_KEY));
152152
String parentCallbackId = ObjectUtils.cast(businessData.get(AippConst.PARENT_CALLBACK_ID));
153153
boolean allowTerminalSignal = StringUtils.isEmpty(parentCallbackId);
154-
this.insertEndNodeStatus(contexts, aippInstId, appTask.getEntity().getFlowConfigId());
155-
boolean readyToTerminate = this.shouldSendLastData(contexts, appTask);
154+
String lockKey = TERMINATE_LOCK_PREFIX + aippInstId;
155+
Lock lock = this.distributedLockProvider.get(lockKey);
156+
lock.lock();
157+
try {
158+
this.insertEndNodeStatus(contexts, aippInstId, appTask.getEntity().getFlowConfigId());
159+
boolean readyToTerminate = this.shouldSendLastData(contexts, appTask);
160+
this.executeAfterReadyToTerminateCheck(contexts, businessData, versionId, aippInstId, context, appTask,
161+
readyToTerminate, allowTerminalSignal);
162+
} finally {
163+
lock.unlock();
164+
}
165+
166+
// 子流程 callback 主流程
167+
if (StringUtils.isNotEmpty(parentCallbackId)) {
168+
this.brokerClient.getRouter(FlowCallbackService.class, "w8onlgq9xsw13jce4wvbcz3kbmjv3tuw")
169+
.route(new FitableIdFilter(parentCallbackId))
170+
.format(SerializationFormat.CBOR)
171+
.invoke(contexts);
172+
}
173+
}
174+
175+
private void executeAfterReadyToTerminateCheck(List<Map<String, Object>> contexts,
176+
Map<String, Object> businessData, String versionId, String aippInstId,
177+
OperationContext context, AppTask appTask, boolean readyToTerminate, boolean allowTerminalSignal) {
156178
this.saveInstance(businessData, versionId, aippInstId, context, appTask, readyToTerminate);
157179
String parentInstanceId = ObjectUtils.cast(businessData.get(AippConst.PARENT_INSTANCE_ID));
158180
String appId = ObjectUtils.cast(appTask.getEntity().getAppId());
159181
businessData.put(AippConst.ATTR_APP_ID_KEY, appId);
160-
// 表明流程结果是否需要再经过模型加工,当前场景全为false。
161-
// 正常情况下应该是在结束节点配上该key并放入businessData中,此处模拟该过程。
162-
// 如果子流程结束后需要再经过模型加工,子流程结束节点不打印日志;否则子流程结束节点需要打印日志。
163-
// 如果前一个节点是人工检查节点,并在结束节点reference到了表单,那么这里一定会打印消息。
164182
businessData.put(AippConst.BS_AIPP_OUTPUT_IS_NEEDED_LLM, false);
165183
if (businessData.containsKey(AippConst.BS_END_FORM_ID_KEY)) {
166184
String endFormId = ObjectUtils.cast(businessData.get(AippConst.BS_END_FORM_ID_KEY));
@@ -183,7 +201,7 @@ public void callback(List<Map<String, Object>> contexts) {
183201
.instanceId(aippInstId).logId(returnedLogId)
184202
.build();
185203
if (readyToTerminate && allowTerminalSignal) {
186-
this.sendTerminalSignalWithLock(aippInstId, appChatRsp);
204+
this.doSendTerminalSignal(aippInstId, appChatRsp);
187205
} else {
188206
this.appChatSseService.send(aippInstId, appChatRsp);
189207
}
@@ -193,14 +211,6 @@ public void callback(List<Map<String, Object>> contexts) {
193211
this.sendTerminalEventIfReady(readyToTerminate && allowTerminalSignal, contexts, appTask,
194212
businessData, context, aippInstId);
195213
}
196-
197-
// 子流程 callback 主流程
198-
if (StringUtils.isNotEmpty(parentCallbackId)) {
199-
this.brokerClient.getRouter(FlowCallbackService.class, "w8onlgq9xsw13jce4wvbcz3kbmjv3tuw")
200-
.route(new FitableIdFilter(parentCallbackId))
201-
.format(SerializationFormat.CBOR)
202-
.invoke(contexts);
203-
}
204214
}
205215

206216
private void sendTerminalEventIfReady(boolean readyToTerminate, List<Map<String, Object>> contexts, AppTask appTask,
@@ -217,7 +227,7 @@ private void sendTerminalEventIfReady(boolean readyToTerminate, List<Map<String,
217227
.answer(Collections.emptyList())
218228
.extension(this.buildEndNodeSummary(contexts, appTask))
219229
.build();
220-
this.sendTerminalSignalWithLock(aippInstId, terminalRsp);
230+
this.doSendTerminalSignal(aippInstId, terminalRsp);
221231
}
222232

223233
private void sendTerminalSignalWithLock(String aippInstId, AppChatRsp appChatRsp) {
@@ -233,6 +243,10 @@ private void sendTerminalSignalWithLock(String aippInstId, AppChatRsp appChatRsp
233243
// If lock acquisition fails, another thread has already sent the terminal signal, skip.
234244
}
235245

246+
private void doSendTerminalSignal(String aippInstId, AppChatRsp appChatRsp) {
247+
this.appChatSseService.sendLastData(aippInstId, appChatRsp);
248+
}
249+
236250
private void insertEndNodeStatus(List<Map<String, Object>> contexts, String aippInstId, String flowConfigId) {
237251
String flowTraceId = DataUtils.getFlowTraceId(contexts);
238252
String currentNodeId = ObjectUtils.cast(contexts.get(0).get(AippConst.BS_NODE_ID_KEY));

0 commit comments

Comments
 (0)