diff --git a/app-builder/fel/java/pom.xml b/app-builder/fel/java/pom.xml index b354708748..403135a264 100644 --- a/app-builder/fel/java/pom.xml +++ b/app-builder/fel/java/pom.xml @@ -24,7 +24,7 @@ 17 - 3.5.0-SNAPSHOT + 3.5.0-M1 1.0.0-SNAPSHOT 1.0.0-SNAPSHOT 1.0.0-SNAPSHOT diff --git a/app-builder/jane/dynamic-form-genericable/pom.xml b/app-builder/jane/dynamic-form-genericable/pom.xml index 6428f94e54..67680dd588 100644 --- a/app-builder/jane/dynamic-form-genericable/pom.xml +++ b/app-builder/jane/dynamic-form-genericable/pom.xml @@ -19,12 +19,12 @@ org.fitframework fit-api - 3.5.0-SNAPSHOT + 3.5.0-M1 org.fitframework fit-util - 3.5.0-SNAPSHOT + 3.5.0-M1 modelengine.fit.jane @@ -34,7 +34,7 @@ org.fitframework.service fit-http-classic - 3.5.0-SNAPSHOT + 3.5.0-M1 org.projectlombok diff --git a/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/fitable/FlowPublishSubscriber.java b/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/fitable/FlowPublishSubscriber.java index d31da9391c..1ade931657 100644 --- a/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/fitable/FlowPublishSubscriber.java +++ b/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/fitable/FlowPublishSubscriber.java @@ -9,6 +9,7 @@ import modelengine.fit.jober.aipp.constants.AippConst; import modelengine.fit.jober.aipp.domain.AppBuilderRuntimeInfo; import modelengine.fit.jober.aipp.dto.chat.AppChatRsp; +import modelengine.fit.jober.aipp.entity.ChatSession; import modelengine.fit.jober.aipp.repository.AppBuilderRuntimeInfoRepository; import modelengine.fit.jober.aipp.service.AppChatSessionService; import modelengine.fit.jober.aipp.service.AppChatSseService; @@ -88,13 +89,17 @@ public void publishNodeInfo(FlowNodePublishInfo flowNodePublishInfo) { private void stageProcessedHandle(FlowNodePublishInfo flowNodePublishInfo, Map businessData, String aippInstId) { + Optional> instanceSession = this.appChatSessionService.getSession(aippInstId); + if (instanceSession.isPresent() && !instanceSession.get().isDebug()) { + return; + } FlowPublishContext context = flowNodePublishInfo.getFlowContext(); String traceId = context.getTraceId(); String nodeId = flowNodePublishInfo.getNodeId(); String nodeType = flowNodePublishInfo.getNodeType(); FlowErrorInfo errorInfo = flowNodePublishInfo.getErrorMsg(); AtomicReference locale = new AtomicReference<>(Locale.getDefault()); - appChatSessionService.getSession(aippInstId).ifPresent(e -> locale.set(e.getLocale())); + instanceSession.ifPresent(session -> locale.set(session.getLocale())); ToolExceptionHandle.handleFitException(errorInfo); String finalErrorMsg = this.toolExceptionHandle.getFixErrorMsg(errorInfo, locale.get(), false); if (StringUtils.isBlank(finalErrorMsg)) { @@ -103,7 +108,7 @@ private void stageProcessedHandle(FlowNodePublishInfo flowNodePublishInfo, Map initContext, OperationContext context) { - Meta meta = getMetaByAppId(metaService, appId, isDebug, context); + Meta meta = CacheUtils.getMetaByAppId(metaService, appId, isDebug, context); String metaInstId = this.createAippInstance(meta.getId(), meta.getVersion(), initContext, context); return metaInstanceService.list(Collections.singletonList(metaInstId), 0, 1, context) .getResults() @@ -268,46 +268,16 @@ public String createLatestAippInstanceByAppId(String appId, boolean isDebug, Map @Override public Tuple createInstanceByApp(String appId, String question, Map businessData, OperationContext context, boolean isDebug) { - Meta meta = getMetaByAppId(metaService, appId, isDebug, context); + Meta meta = CacheUtils.getMetaByAppId(metaService, appId, isDebug, context); return this.createInstanceHandle(question, businessData, meta, context, isDebug); } @Override public MetaVo queryLatestMetaVoByAppId(String appId, boolean isDebug, OperationContext context) { - Meta meta = getMetaByAppId(metaService, appId, isDebug, context); + Meta meta = CacheUtils.getMetaByAppId(metaService, appId, isDebug, context); return MetaVo.builder().id(meta.getId()).version(meta.getVersion()).build(); } - /** - * 根据appid查询对应meta - * - * @param metaService 操作meta的service - * @param appId 应用appId - * @param isDebug 是否为debug会话 - * @param context 操作上下文 - * @return app对应的meta信息 - */ - public static Meta getMetaByAppId(MetaService metaService, String appId, boolean isDebug, - OperationContext context) { - if (isDebug) { - return getLatestMetaByAppId(metaService, appId, context); - } - // get 一个aipp_id的缓存,然后根据aipp_id查询最新发布版的meta - String aippId = CacheUtils.APP_ID_TO_AIPP_ID_CACHE.get(appId, - (id) -> getLatestMetaByAppId(metaService, id, context).getId()); - Meta lastPublishedMeta = MetaUtils.getLastPublishedMeta(metaService, aippId, context); - return Optional.ofNullable(lastPublishedMeta) - .orElseThrow(() -> new AippException(AippErrCode.APP_CHAT_PUBLISHED_META_NOT_FOUND)); - } - - private static Meta getLatestMetaByAppId(MetaService metaService, String appId, OperationContext context) { - List meta = MetaUtils.getAllMetasByAppId(metaService, appId, context); - if (CollectionUtils.isEmpty(meta)) { - throw new AippException(AippErrCode.APP_CHAT_DEBUG_META_NOT_FOUND); - } - return meta.get(0); - } - @Override public Choir startFlowWithUserSelectMemory(String metaInstId, Map initContext, OperationContext context, boolean isDebug) { diff --git a/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/service/impl/AppChatServiceImpl.java b/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/service/impl/AppChatServiceImpl.java index 1b8284bf39..3b1a9d6d71 100644 --- a/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/service/impl/AppChatServiceImpl.java +++ b/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/service/impl/AppChatServiceImpl.java @@ -12,7 +12,6 @@ import static modelengine.fit.jober.aipp.constants.AippConst.BUSINESS_INFOS_KEY; import static modelengine.fit.jober.aipp.constants.AippConst.BUSINESS_INPUT_KEY; import static modelengine.fit.jober.aipp.enums.AppTypeEnum.APP; -import static modelengine.fit.jober.aipp.service.impl.AippRunTimeServiceImpl.getMetaByAppId; import modelengine.fit.jade.waterflow.FlowsService; import modelengine.fit.jane.common.entity.OperationContext; @@ -43,9 +42,9 @@ import modelengine.fit.jober.aipp.service.AppChatService; import modelengine.fit.jober.aipp.util.AippLogUtils; import modelengine.fit.jober.aipp.util.AppUtils; +import modelengine.fit.jober.aipp.util.CacheUtils; import modelengine.fit.jober.aipp.util.FlowUtils; import modelengine.fit.jober.aipp.util.JsonUtils; -import modelengine.fit.jober.aipp.util.MetaUtils; import modelengine.fit.jober.aipp.util.UUIDUtil; import modelengine.fit.jober.common.ServerInternalException; import modelengine.fitframework.annotation.Component; @@ -132,11 +131,13 @@ public Choir chat(CreateAppChatRequest body, OperationContext context, b Map businessData = this.convertContextToBusinessData(body, isDebug); // 这里几行代码的顺序不可以调整,必须先把对话的appId查询出来,再去创建chatId String chatAppId = this.getAppId(body); - boolean hasAtOtherApp = this.hasAtOtherApp(body); + boolean hasAtOtherApp = body.hasAtOtherApp(); this.createChatId(body, hasAtOtherApp, businessData); // create instance —— 根据实际的那个app创建 AppUtils.setAppChatInfo(body.getAppId(), isDebug); - this.appService.updateFlow(chatAppId, context); + if (isDebug) { + this.appService.updateFlow(chatAppId, context); + } LOGGER.info("[perf] [{}] chat updateFlow end, appId={}", System.currentTimeMillis(), body.getAppId()); this.addUserContext(body, businessData, isDebug, context, app.getType()); Tuple tuple = this.aippRunTimeService.createInstanceByApp(chatAppId, @@ -150,8 +151,8 @@ public Choir chat(CreateAppChatRequest body, OperationContext context, b this.saveChatInfos(body, context, ObjectUtils.cast(tuple.get(0).orElseThrow(this::generalServerException)), - hasAtOtherApp, - chatAppId); + chatAppId, + isDebug); } catch (AippTaskNotFoundException e) { throw new AippException(TASK_NOT_FOUND); } @@ -243,15 +244,15 @@ private void validateApp(String appId) { } private void saveChatInfos(CreateAppChatRequest body, OperationContext context, String instId, - boolean hasAtOtherApp, String chatAppId) throws AippTaskNotFoundException { + String chatAppId, boolean isDebug) throws AippTaskNotFoundException { AppBuilderApp app = this.appFactory.create(body.getAppId()); Map attributes = new HashMap<>(); - List metas = MetaUtils.getAllMetasByAppId(this.metaService, chatAppId, context); - if (CollectionUtils.isEmpty(metas)) { - LOGGER.error("metas is empty."); + Meta meta = CacheUtils.getMetaByAppId(this.metaService, chatAppId, isDebug, context); + if (meta == null) { + LOGGER.error("Cannot find meta for chat app. [appId={}, instId={}]", chatAppId, instId); throw new AippTaskNotFoundException(TASK_NOT_FOUND); } - String aippId = metas.get(0).getId(); + String aippId = meta.getId(); attributes.put(AippConst.ATTR_CHAT_INST_ID_KEY, instId); attributes.put(AippConst.ATTR_CHAT_STATE_KEY, app.getState()); attributes.put(AippConst.BS_AIPP_ID_KEY, aippId); @@ -261,7 +262,7 @@ private void saveChatInfos(CreateAppChatRequest body, OperationContext context, String chatId = body.getChatId(); this.buildAndInsertChatInfo(app, attributes, body.getQuestion(), chatId, context.getOperator()); this.buildAndInsertWideRelationInfo(instId, chatId); - if (hasAtOtherApp) { + if (body.hasAtOtherApp()) { AppBuilderApp chatApp = this.appFactory.create(chatAppId); // 被@的应用的对话 Map originAttributes = new HashMap<>(); @@ -287,9 +288,9 @@ private Map convertContextToBusinessData(CreateAppChatRequest bo private void addUserContext(CreateAppChatRequest body, Map businessData, boolean isDebug, OperationContext context, String appType) { - Meta meta = getMetaByAppId(metaService, body.getAppId(), isDebug, context); + Meta meta = CacheUtils.getMetaByAppId(this.metaService, body.getAppId(), isDebug, context); String flowDefinitionId = ObjectUtils.cast(meta.getAttributes().get(ATTR_FLOW_DEF_ID_KEY)); - List inputParams = FlowUtils.getAppInputParams(flowsService, flowDefinitionId, context); + List inputParams = FlowUtils.getAppInputParams(this.flowsService, flowDefinitionId, context); if (StringUtils.equals(APP.code(), appType)) { inputParams = inputParams.stream() .filter(param -> !StringUtils.equals("Question", param.getName())) @@ -376,11 +377,6 @@ private String getAppId(CreateAppChatRequest body) { return body.getAppId(); } - private boolean hasAtOtherApp(CreateAppChatRequest body) { - return StringUtils.isNotBlank(body.getContext().getAtChatId()) || StringUtils.isNotBlank(body.getContext() - .getAtAppId()); - } - private void createChatId(CreateAppChatRequest body, boolean hasAtOtherApp, Map businessData) { // body里没有chatId:第一次对话 if (StringUtils.isBlank(body.getChatId())) { diff --git a/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/util/CacheUtils.java b/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/util/CacheUtils.java index 54b7feba3a..7eb2548fbe 100644 --- a/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/util/CacheUtils.java +++ b/app-builder/jane/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/util/CacheUtils.java @@ -12,9 +12,17 @@ import modelengine.fit.jade.waterflow.FlowsService; import modelengine.fit.jade.waterflow.dto.FlowInfo; import modelengine.fit.jane.common.entity.OperationContext; +import modelengine.fit.jane.meta.multiversion.MetaService; +import modelengine.fit.jane.meta.multiversion.definition.Meta; +import modelengine.fit.jober.aipp.common.exception.AippErrCode; +import modelengine.fit.jober.aipp.common.exception.AippException; import modelengine.fit.jober.aipp.po.AppBuilderAppPo; import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.log.Logger; +import modelengine.fitframework.util.CollectionUtils; +import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; /** @@ -26,29 +34,30 @@ */ @Component public class CacheUtils { + private static final Logger log = Logger.get(CacheUtils.class); /** * 用于缓存appId to app */ - public static final Cache APP_CACHE = Caffeine.newBuilder() - .expireAfterAccess(48, TimeUnit.HOURS) - .maximumSize(30) - .build(); + public static final Cache APP_CACHE = + Caffeine.newBuilder().expireAfterAccess(48, TimeUnit.HOURS).maximumSize(30).build(); /** * 用于缓存flowDefinitionId to flowInfo */ - public static final Cache FLOW_CACHE = Caffeine.newBuilder() - .expireAfterAccess(48, TimeUnit.HOURS) - .maximumSize(30) - .build(); + public static final Cache FLOW_CACHE = + Caffeine.newBuilder().expireAfterAccess(48, TimeUnit.HOURS).maximumSize(30).build(); /** * 用于缓存app_id和aipp_id的关系 */ - public static final Cache APP_ID_TO_AIPP_ID_CACHE = Caffeine.newBuilder() - .expireAfterAccess(30, TimeUnit.DAYS) - .maximumSize(1000) - .build(); + public static final Cache APP_ID_TO_AIPP_ID_CACHE = + Caffeine.newBuilder().expireAfterAccess(30, TimeUnit.DAYS).maximumSize(1000).build(); + + /** + * 用于缓存app_id和最新发布的meta关系 + */ + private static final Cache AIPP_ID_TO_LAST_META_CACHE = + Caffeine.newBuilder().expireAfterAccess(5, TimeUnit.SECONDS).maximumSize(1000).build(); /** * 清理缓存 @@ -57,10 +66,12 @@ public static void clear() { APP_CACHE.invalidateAll(); FLOW_CACHE.invalidateAll(); APP_ID_TO_AIPP_ID_CACHE.invalidateAll(); + AIPP_ID_TO_LAST_META_CACHE.invalidateAll(); APP_CACHE.cleanUp(); FLOW_CACHE.cleanUp(); APP_ID_TO_AIPP_ID_CACHE.cleanUp(); + AIPP_ID_TO_LAST_META_CACHE.cleanUp(); } /** @@ -72,7 +83,40 @@ public static void clear() { * @return 缓存的FlowInfo */ public static FlowInfo getPublishedFlowWithCache(FlowsService flowsService, String flowDefinitionId, - OperationContext context) { + OperationContext context) { return FLOW_CACHE.get(flowDefinitionId, id -> flowsService.getFlows(id, context)); } + + /** + * 根据appId查询对应meta + * + * @param metaService 用于查询meta的服务实例 + * @param appId 应用appId + * @param isDebug 是否为debug会话 + * @param context 操作上下文 + * @return app对应的meta信息 + */ + public static Meta getMetaByAppId(MetaService metaService, String appId, boolean isDebug, + OperationContext context) { + if (isDebug) { + return getLatestMetaByAppId(metaService, appId, context); + } + // 获取一个aipp_id的缓存,然后根据aipp_id查询最新发布版的meta。 + String aippId = + APP_ID_TO_AIPP_ID_CACHE.get(appId, id -> getLatestMetaByAppId(metaService, id, context).getId()); + return AIPP_ID_TO_LAST_META_CACHE.get(aippId, (ignore) -> { + Meta lastPublishedMeta = MetaUtils.getLastPublishedMeta(metaService, appId, context); + return Optional.ofNullable(lastPublishedMeta) + .orElseThrow(() -> new AippException(AippErrCode.APP_CHAT_PUBLISHED_META_NOT_FOUND)); + }); + } + + private static Meta getLatestMetaByAppId(MetaService metaService, String appId, OperationContext context) { + List metas = MetaUtils.getAllMetasByAppId(metaService, appId, context); + if (CollectionUtils.isEmpty(metas)) { + log.error("No metas found for appId: " + appId); + throw new AippException(AippErrCode.APP_CHAT_DEBUG_META_NOT_FOUND); + } + return metas.get(0); + } } diff --git a/app-builder/jane/services/aipp-service/src/main/java/modelengine/fit/jober/aipp/dto/chat/CreateAppChatRequest.java b/app-builder/jane/services/aipp-service/src/main/java/modelengine/fit/jober/aipp/dto/chat/CreateAppChatRequest.java index b2128fb5c8..7168939bb3 100644 --- a/app-builder/jane/services/aipp-service/src/main/java/modelengine/fit/jober/aipp/dto/chat/CreateAppChatRequest.java +++ b/app-builder/jane/services/aipp-service/src/main/java/modelengine/fit/jober/aipp/dto/chat/CreateAppChatRequest.java @@ -12,6 +12,7 @@ import lombok.NoArgsConstructor; import modelengine.fit.jober.aipp.constants.AippConst; import modelengine.fitframework.annotation.Property; +import modelengine.fitframework.util.StringUtils; import java.util.Map; @@ -38,6 +39,15 @@ public class CreateAppChatRequest { @Property(description = "context", name = "context") private Context context; + /** + * 判断是否有@应用 + * + * @return 表示是否有@应用的 {@link boolean} + */ + public boolean hasAtOtherApp() { + return StringUtils.isNotBlank(getContext().getAtChatId()) || StringUtils.isNotBlank(getContext().getAtAppId()); + } + /** * 本类表示对话的上下文 */ diff --git a/app-builder/plugins/plugins-show-case-parent/agent-test/pom.xml b/app-builder/plugins/plugins-show-case-parent/agent-test/pom.xml index 3d110bf15e..2ad798d826 100644 --- a/app-builder/plugins/plugins-show-case-parent/agent-test/pom.xml +++ b/app-builder/plugins/plugins-show-case-parent/agent-test/pom.xml @@ -82,7 +82,7 @@ org.fitframework fit-build-maven-plugin - 3.5.0-SNAPSHOT + 3.5.0-M1 build-plugin @@ -124,7 +124,7 @@ - + diff --git a/app-builder/plugins/plugins-show-case-parent/fortune-telling/pom.xml b/app-builder/plugins/plugins-show-case-parent/fortune-telling/pom.xml index 0ac32b3a75..d8725cbd0b 100644 --- a/app-builder/plugins/plugins-show-case-parent/fortune-telling/pom.xml +++ b/app-builder/plugins/plugins-show-case-parent/fortune-telling/pom.xml @@ -47,7 +47,7 @@ org.fitframework fit-build-maven-plugin - 3.5.0-SNAPSHOT + 3.5.0-M1 build-plugin diff --git a/app-builder/plugins/plugins-show-case-parent/wenjie-data/pom.xml b/app-builder/plugins/plugins-show-case-parent/wenjie-data/pom.xml index 318ddf4bc0..2f5f42f9e0 100644 --- a/app-builder/plugins/plugins-show-case-parent/wenjie-data/pom.xml +++ b/app-builder/plugins/plugins-show-case-parent/wenjie-data/pom.xml @@ -86,7 +86,7 @@ org.fitframework fit-build-maven-plugin - 3.5.0-SNAPSHOT + 3.5.0-M1 build-plugin diff --git a/app-builder/waterflow/java/pom.xml b/app-builder/waterflow/java/pom.xml index fe6237a519..ef3d90e0c1 100644 --- a/app-builder/waterflow/java/pom.xml +++ b/app-builder/waterflow/java/pom.xml @@ -25,7 +25,7 @@ 17 - 3.5.0-SNAPSHOT + 3.5.0-M1 1.0.0-SNAPSHOT diff --git a/app-builder/waterflow/java/waterflow-dependency/pom.xml b/app-builder/waterflow/java/waterflow-dependency/pom.xml index 2d3c7829fa..b3d90454f9 100644 --- a/app-builder/waterflow/java/waterflow-dependency/pom.xml +++ b/app-builder/waterflow/java/waterflow-dependency/pom.xml @@ -14,7 +14,7 @@ 17 - 3.5.0-SNAPSHOT + 3.5.0-M1 1.0.0-SNAPSHOT 1.0.0-SNAPSHOT 0.0.3.6-SNAPSHOT diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/common/utils/UUIDUtil.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/common/utils/UUIDUtil.java new file mode 100644 index 0000000000..a4fc63140d --- /dev/null +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/common/utils/UUIDUtil.java @@ -0,0 +1,46 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.common.utils; + +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Uuid的Utils类。 + * + * @author 孙怡菲 + * @since 2025-04-09 + */ +public class UUIDUtil { + /** + * 随机生成uuid。 + * + * @return 表示随机生成的uuid的 {@link String}。 + */ + public static String uuid() { + return UUID.randomUUID().toString().replace("-", ""); + } + + /** + * 使用线程本地随机数生成UUID。 + * 生成的 UUID 在唯一性和不可预测性上可能不如 UUID.randomUUID(),但在性能上有显著提升 + * + * @return 表示随机生成UUID的 {@link String}。 + */ + public static String fastUuid() { + long mostSigBits = ThreadLocalRandom.current().nextLong(); + long leastSigBits = ThreadLocalRandom.current().nextLong(); + + // 设置版本4和变体IETF + mostSigBits &= 0xffffffffffff0fffL; + mostSigBits |= 0x0000000000004000L; + leastSigBits &= 0x3fffffffffffffffL; + leastSigBits |= 0x8000000000000000L; + + return new UUID(mostSigBits, leastSigBits).toString().replace("-", ""); + } +} diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/biz/service/scheduletasks/CleanContextSchedule.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/biz/service/scheduletasks/CleanContextSchedule.java new file mode 100644 index 0000000000..d00ffefcaf --- /dev/null +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/biz/service/scheduletasks/CleanContextSchedule.java @@ -0,0 +1,76 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.flowsengine.biz.service.scheduletasks; + +import modelengine.fit.waterflow.common.utils.SleepUtil; +import modelengine.fit.waterflow.flowsengine.domain.flows.context.repo.flowcontext.FlowContextRepo; +import modelengine.fit.waterflow.flowsengine.domain.flows.context.repo.flowtrace.FlowTraceRepo; +import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.annotation.Fit; +import modelengine.fitframework.annotation.Value; +import modelengine.fitframework.log.Logger; +import modelengine.fitframework.schedule.annotation.Scheduled; +import modelengine.fitframework.transaction.Transactional; + +import java.util.List; + +/** + * 定时清理流程中已完成的context + * 包括成功、失败、终止的流程数据 + * + * @author 杨祥宇 + * @since 2025-04-02 + */ +@Component +public class CleanContextSchedule { + private static final Logger log = Logger.get(CleanContextSchedule.class); + private static final int LIMIT = 1000; + private final FlowTraceRepo flowTraceRepo; + private final FlowContextRepo flowContextRepo; + private final int expiredDays; + + public CleanContextSchedule(FlowTraceRepo flowTraceRepo, @Fit(alias = "flowContextPersistRepo") FlowContextRepo + flowContextRepo, @Value("${jane.flowsEngine.contextExpiredDays}") int expiredDays) { + this.flowTraceRepo = flowTraceRepo; + this.flowContextRepo = flowContextRepo; + this.expiredDays = expiredDays; + } + + /** + * 每天凌晨3点定时清理超期EXPIRED_DAYS天的流程运行数据 + * 多实例并发执行分析:会并发执行getExpiredTrace()查询,可能导致重复获取相同traceIds + * 重复删除trace以及context数据不会对结果有影响 + * + */ + @Scheduled(strategy = Scheduled.Strategy.CRON, value = "0 0 3 * * ?") + public void cleanContextSchedule() { + log.info("Start clean flow expired contexts"); + try { + while (true) { + List traceIds = flowTraceRepo.getExpiredTrace(expiredDays, LIMIT); + if (traceIds.isEmpty()) { + break; + } + deleteFlowContext(traceIds); + } + } catch (Exception ex) { + log.error("Clean context error, error message: {}" + ex.getMessage()); + } + } + + /** + * 根据traceId列表删除trace和context数据 + * + * @param traceIds 表示流程trace id列表的{@link List}{@code <}{@link String}{@code >}。 + */ + @Transactional + public void deleteFlowContext(List traceIds) { + flowContextRepo.deleteByTraceIdList(traceIds); + flowTraceRepo.deleteByIdList(traceIds); + } + +} diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextPersistRepo.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextPersistRepo.java index 25bc2ab0f9..843a788bf2 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextPersistRepo.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextPersistRepo.java @@ -169,24 +169,17 @@ public void save(List> flowContexts) { if (flowContexts == null || flowContexts.size() == 0) { return; } - log.warn("save before after"); FlowContextPO flowContextPO = contextMapper.find(flowContexts.get(0).getId()); - log.warn("save find after"); List flowContextPOS = flowContexts.stream().map(this::serializer).collect(Collectors.toList()); - log.warn("save after serializer"); if (flowContextPO == null) { contextMapper.batchCreate(flowContextPOS); - log.warn("save after batchCreate"); } else { batchUpdate(flowContextPOS); - log.warn("save after batchUpdate"); } } private void batchUpdate(List flowContextPOS) { - log.warn("batchUpdate before"); contextMapper.batchUpdate(flowContextPOS); - log.warn("batchUpdate after"); } @Override @@ -743,4 +736,12 @@ public boolean hasContextWithStatusAtPosition(List statusList, String tr public String getTransIdByTrace(String traceId) { return contextMapper.getTransIdByTrace(traceId); } + + @Override + public void deleteByTraceIdList(List traceIdList) { + if (traceIdList.isEmpty()) { + return; + } + contextMapper.deleteByTraceIdList(traceIdList); + } } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java index 073d58de63..af124433d9 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java @@ -671,5 +671,14 @@ default List> getWithoutFlowDataByToBatch(List toBat * @return trans id */ String getTransIdByTrace(String traceId); + + /** + * 根据trace列表删除对应的context数据 + * + * @param traceIdList trace id列表 + */ + default void deleteByTraceIdList(List traceIdList) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByTraceIdList"); + } } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/DefaultFlowTraceRepo.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/DefaultFlowTraceRepo.java index d7959d2f3f..49dd3bf867 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/DefaultFlowTraceRepo.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/DefaultFlowTraceRepo.java @@ -101,6 +101,13 @@ public void deleteByIdList(List traceIds) { flowTraceMapper.deleteByIdList(traceIds); } + @Override + public List getExpiredTrace(int expiredDays, int limit) { + LocalDateTime now = LocalDateTime.now(); + LocalDateTime expired = now.minusDays(expiredDays); + return flowTraceMapper.getExpiredTrace(expired, limit); + } + private FlowTracePO serializer(FlowTrace flowTrace) { String contextPool = String.join(", ", flowTrace.getContextPool()); return FlowTracePO.builder() diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/FlowTraceRepo.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/FlowTraceRepo.java index e09965fd67..60bfafac55 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/FlowTraceRepo.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/FlowTraceRepo.java @@ -100,4 +100,13 @@ public interface FlowTraceRepo { * @param traceIds traceId列表 */ void deleteByIdList(List traceIds); + + /** + * 查询超期并且已完成的trace id + * + * @param expiredDays 超期天数 + * @param limit 查询限制 + * @return trace id列表 + */ + List getExpiredTrace(int expiredDays, int limit); } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowContextMapper.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowContextMapper.java index 1fd1201684..082991d3e6 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowContextMapper.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowContextMapper.java @@ -523,4 +523,11 @@ List findFinishedContextsPagedByTraceId(String traceId, String en * @return trans id */ String getTransIdByTrace(String traceId); + + /** + * 根据trace列表删除对应的context + * + * @param traceIds trace id列表 + */ + void deleteByTraceIdList(List traceIds); } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowTraceMapper.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowTraceMapper.java index 4130495cf8..0fd9e9ea41 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowTraceMapper.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowTraceMapper.java @@ -113,4 +113,13 @@ public interface FlowTraceMapper { * @return trace列表 */ List findRunningTrace(List applications); + + /** + * 查询超期并且已完成的trace id + * + * @param expiredDays 超期天数 + * @param limit 查询数量 + * @return trace id列表 + */ + List getExpiredTrace(LocalDateTime expiredDays, int limit); } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/utils/OhScriptExecutor.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/utils/OhScriptExecutor.java index b5b25d0292..409991c45d 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/utils/OhScriptExecutor.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/utils/OhScriptExecutor.java @@ -16,8 +16,8 @@ import modelengine.fit.ohscript.script.interpreter.ASTEnv; import modelengine.fit.ohscript.script.parser.AST; import modelengine.fit.ohscript.script.parser.ParserBuilder; -import modelengine.fit.ohscript.util.UUIDUtil; import modelengine.fit.waterflow.common.Constant; +import modelengine.fit.waterflow.common.utils.UUIDUtil; import modelengine.fit.waterflow.flowsengine.domain.flows.context.FlowData; import modelengine.fitframework.log.Logger; import modelengine.fitframework.util.ObjectUtils; diff --git a/app-builder/waterflow/java/waterflow-service/src/main/resources/application.yml b/app-builder/waterflow/java/waterflow-service/src/main/resources/application.yml index 553e762dda..e2a62fd7f9 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/resources/application.yml +++ b/app-builder/waterflow/java/waterflow-service/src/main/resources/application.yml @@ -16,5 +16,6 @@ jane: scheduleRate: 10000 maxCount: 0 isNeedFlowCallbackAdapt: false + contextExpiredDays: 7 distributed-lock-provider: databaseDistributedLockProvider \ No newline at end of file diff --git a/app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowContextMapper.xml b/app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowContextMapper.xml index 9dbb51e0ec..ed40903489 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowContextMapper.xml +++ b/app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowContextMapper.xml @@ -765,4 +765,14 @@ trace_id = #{traceId} LIMIT 1 + + + DELETE FROM + + WHERE + trace_id IN + + #{id} + + \ No newline at end of file diff --git a/app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowTraceMapper.xml b/app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowTraceMapper.xml index 01353d0bfb..bbd56e67fd 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowTraceMapper.xml +++ b/app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowTraceMapper.xml @@ -163,4 +163,15 @@ #{traceId} + + \ No newline at end of file diff --git a/app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowContextMapperTest.java b/app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowContextMapperTest.java index 58c2f25114..910d3d31e5 100644 --- a/app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowContextMapperTest.java +++ b/app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowContextMapperTest.java @@ -445,6 +445,16 @@ public void testFindByToBatch() { Assertions.assertEquals("5", contextPO.get(1).getContextId()); } + @Test + @DisplayName("测试根据trace id列表删除成功") + void testDeleteByTraceIds() { + executeSqlInFile(sqlFile); + flowContextMapper.deleteByTraceIdList(Collections.singletonList("4")); + + FlowContextPO flowContextPO = flowContextMapper.find("4"); + Assertions.assertNull(flowContextPO); + } + private FlowContextPO getFlowContextPO() { LocalDateTime time = LocalDateTime.now(); return FlowContextPO.builder() diff --git a/app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowTraceMapperTest.java b/app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowTraceMapperTest.java index bcf3ce2f2c..66b4469811 100644 --- a/app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowTraceMapperTest.java +++ b/app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowTraceMapperTest.java @@ -133,4 +133,27 @@ public void testFindByTraceIdListSuccess() { Assertions.assertEquals("2", flowTracePOS.get(1).getTraceId()); Assertions.assertEquals("ERROR", flowTracePOS.get(1).getStatus()); } + + @Test + @DisplayName("查询过期trace成功") + public void testGetExpiredTraceSuccess() { + FlowTracePO tracePO = FlowTracePO + .builder() + .traceId("123") + .streamId("123") + .operator("yxy") + .application("flow") + .startNode("start") + .startTime(LocalDateTime.now()) + .status("ERROR") + .endTime(LocalDateTime.of(2023, 1, 1, 1, 1)) + .build(); + flowTraceMapper.create(tracePO); + LocalDateTime now = LocalDateTime.now(); + LocalDateTime expired = now.minusDays(30); + + List expiredTrace = flowTraceMapper.getExpiredTrace(expired, 2); + + Assertions.assertEquals(1, expiredTrace.size()); + } } diff --git a/app-engine/plugins/app-metrics/pom.xml b/app-engine/plugins/app-metrics/pom.xml index 79deb5a3d0..9ca0c94ed7 100644 --- a/app-engine/plugins/app-metrics/pom.xml +++ b/app-engine/plugins/app-metrics/pom.xml @@ -111,7 +111,7 @@ org.fitframework fit-build-maven-plugin - 3.5.0-SNAPSHOT + 3.5.0-M1 user 3 @@ -134,7 +134,7 @@ org.fitframework fit-dependency-maven-plugin - 3.5.0-SNAPSHOT + 3.5.0-M1 dependency diff --git a/common/components/ui-word-globalizer/pom.xml b/common/components/ui-word-globalizer/pom.xml index 0d038303a3..ad567b0d71 100644 --- a/common/components/ui-word-globalizer/pom.xml +++ b/common/components/ui-word-globalizer/pom.xml @@ -40,7 +40,7 @@ org.fitframework fit-dependency-maven-plugin - 3.5.0-SNAPSHOT + 3.5.0-M1 dependency diff --git a/common/dependency/pom.xml b/common/dependency/pom.xml index 5e5580f112..9cfb8d2907 100644 --- a/common/dependency/pom.xml +++ b/common/dependency/pom.xml @@ -14,7 +14,7 @@ 17 - 3.5.0-SNAPSHOT + 3.5.0-M1 1.0.0-SNAPSHOT 1.0.0-SNAPSHOT 1.0.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index f60d3f47c0..05fc1462a7 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ 17 - 3.5.0-SNAPSHOT + 3.5.0-M1 1.0.0-SNAPSHOT