Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app-builder/jane/plugins/aipp-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@
<groupId>modelengine.fit.jade.waterflow</groupId>
<artifactId>waterflow-graph-service</artifactId>
</dependency>
<dependency>
<groupId>org.fitframework.fel</groupId>
<artifactId>tool-mcp-client-service</artifactId>
</dependency>

<!-- Redis -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ public enum AippErrCode implements ErrorCode, RetCode {
*/
INVALID_FILE_PATH(90002003, "无效文件路径。"),

/**
* 调用 MCP 服务失败。
*/
CALL_MCP_SERVER_FAILED(90002004, "调用 MCP 服务失败,原因:{0}。"),

/**
* json解析失败
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import modelengine.fel.core.chat.ChatModel;
import modelengine.fel.core.chat.Prompt;
import modelengine.fel.engine.operators.patterns.AbstractAgent;
import modelengine.fel.tool.mcp.client.McpClientFactory;
import modelengine.fit.jade.tool.SyncToolCall;
import modelengine.fit.jober.aipp.constants.AippConst;
import modelengine.fitframework.annotation.Bean;
Expand All @@ -28,11 +29,12 @@ public class FelComponentConfig {
*
* @param syncToolCall 表示同步工具调用服务的 {@link SyncToolCall}。
* @param chatModel 表示模型流式服务的 {@link ChatModel}。
* @param mcpClientFactory 表示大模型上下文客户端工厂的 {@link McpClientFactory}。
* @return 返回 WaterFlow 场景的 Agent 服务的 {@link AbstractAgent}{@code <}{@link Prompt}{@code ,
* }{@link Prompt}{@code >}。
*/
@Bean(AippConst.WATER_FLOW_AGENT_BEAN)
public AbstractAgent getWaterFlowAgent(@Fit SyncToolCall syncToolCall, ChatModel chatModel) {
return new WaterFlowAgent(syncToolCall, chatModel);
public AbstractAgent getWaterFlowAgent(@Fit SyncToolCall syncToolCall, ChatModel chatModel, McpClientFactory mcpClientFactory) {
return new WaterFlowAgent(syncToolCall, chatModel, mcpClientFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,40 @@

package modelengine.fit.jober.aipp.fel;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import modelengine.fel.core.chat.ChatMessage;
import modelengine.fel.core.chat.ChatModel;
import modelengine.fel.core.chat.Prompt;
import modelengine.fel.core.chat.support.ChatMessages;
import modelengine.fel.core.chat.support.FlatChatMessage;
import modelengine.fel.core.chat.support.ToolMessage;
import modelengine.fel.core.tool.ToolCall;
import modelengine.fel.core.tool.ToolInfo;
import modelengine.fel.engine.flows.AiFlows;
import modelengine.fel.engine.flows.AiProcessFlow;
import modelengine.fel.engine.operators.models.ChatChunk;
import modelengine.fel.engine.operators.models.ChatFlowModel;
import modelengine.fel.engine.operators.patterns.AbstractAgent;
import modelengine.fel.tool.mcp.client.McpClient;
import modelengine.fel.tool.mcp.client.McpClientFactory;
import modelengine.fit.jade.tool.SyncToolCall;
import modelengine.fit.jober.aipp.common.exception.AippErrCode;
import modelengine.fit.jober.aipp.common.exception.AippException;
import modelengine.fit.jober.aipp.constants.AippConst;
import modelengine.fit.jober.aipp.util.McpUtils;
import modelengine.fit.waterflow.domain.context.StateContext;
import modelengine.fitframework.annotation.Fit;
import modelengine.fitframework.inspection.Validation;
import modelengine.fitframework.util.CollectionUtils;
import modelengine.fitframework.util.ObjectUtils;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -42,28 +54,30 @@ public class WaterFlowAgent extends AbstractAgent {

private final String agentMsgKey;
private final SyncToolCall syncToolCall;
private final McpClientFactory mcpClientFactory;

/**
* {@link WaterFlowAgent} 的构造方法。
*
* @param syncToolCall 表示工具调用服务的 {@link SyncToolCall}。
* @param chatStreamModel 表示流式对话大模型的 {@link ChatModel}。
* @param mcpClientFactory 表示大模型上下文客户端工厂的 {@link McpClientFactory}。
*/
public WaterFlowAgent(@Fit SyncToolCall syncToolCall, ChatModel chatStreamModel) {
public WaterFlowAgent(@Fit SyncToolCall syncToolCall, ChatModel chatStreamModel,
McpClientFactory mcpClientFactory) {
super(new ChatFlowModel(chatStreamModel, null));
this.syncToolCall = Validation.notNull(syncToolCall, "The tool sync tool call cannot be null.");
this.syncToolCall = Validation.notNull(syncToolCall, "The tool sync tool call cannot be null.");
this.mcpClientFactory = Validation.notNull(mcpClientFactory, "The mcp client factory cannot be null.");
this.agentMsgKey = AGENT_MSG_KEY;
}

@Override
protected Prompt doToolCall(List<ToolCall> toolCalls, StateContext ctx) {
Validation.notNull(ctx, "The state context cannot be null.");
Map<String, Object> toolContext = ObjectUtils.getIfNull(ctx.getState(AippConst.TOOL_CONTEXT_KEY),
Collections::emptyMap);
return toolCalls.stream()
.map(toolCall -> (ChatMessage) new ToolMessage(toolCall.id(),
this.syncToolCall.call(toolCall.name(), toolCall.arguments(), toolContext)))
.collect(Collectors.collectingAndThen(Collectors.toList(), ChatMessages::from));
return ChatMessages.from(this.callTools(toolCalls, ctx)
.stream()
.map(message -> (ChatMessage) FlatChatMessage.from(message))
.collect(Collectors.toList()));
}

@Override
Expand All @@ -87,18 +101,53 @@ public AiProcessFlow<Prompt, ChatMessage> buildFlow() {
private ChatMessage handleTool(ChatMessage input, StateContext ctx) {
Validation.notNull(ctx, "The state context cannot be null.");
Validation.notNull(input, "The input message cannot be null.");

Map<String, Object> toolContext = ObjectUtils.getIfNull(ctx.getState(AippConst.TOOL_CONTEXT_KEY),
Collections::emptyMap);
ChatMessages lastRequest = ctx.getState(this.agentMsgKey);
lastRequest.add(input);
input.toolCalls().forEach(toolCall -> {
lastRequest.add(FlatChatMessage.from(new ToolMessage(toolCall.id(),
this.syncToolCall.call(toolCall.name(), toolCall.arguments(), toolContext))));
});
lastRequest.addAll(this.callTools(input.toolCalls(), ctx));
return input;
}

private List<ChatMessage> callTools(List<ToolCall> toolCalls, StateContext ctx) {
if (CollectionUtils.isEmpty(toolCalls)) {
return Collections.emptyList();
}
List<ToolInfo> tools = ctx.getState(AippConst.TOOLS_KEY);
Validation.notEmpty(tools, "Missing tool detected during call.");
Map<String, ToolInfo> toolsMap = tools.stream().collect(Collectors.toMap(ToolInfo::name, Function.identity()));
Map<String, Object> toolContext =
ObjectUtils.getIfNull(ctx.getState(AippConst.TOOL_CONTEXT_KEY), Collections::emptyMap);
return toolCalls.stream()
.map(toolCall -> this.callTool(toolCall, toolsMap, toolContext))
.collect(Collectors.toList());
}

private ChatMessage callTool(ToolCall toolCall, Map<String, ToolInfo> toolsMap, Map<String, Object> toolContext) {
ToolInfo toolInfo = toolsMap.get(toolCall.name());
if (toolInfo == null) {
throw new IllegalStateException(String.format("The tool call's tool is not exist. [toolName=%s]",
toolCall.name()));
}
Map<String, Object> extensions = Validation.notNull(toolInfo.extensions(),
"The tool call's extension is not exist. [toolName={0}]", toolCall.name());
String toolRealName = Validation.notBlank(ObjectUtils.cast(extensions.get(AippConst.TOOL_REAL_NAME)),
"Can not find the tool real name. [toolName={0}]",
toolCall.name());
Map<String, Object> mcpServerConfig = ObjectUtils.cast(extensions.get(AippConst.MCP_SERVER_KEY));
if (mcpServerConfig != null) {
String url = Validation.notBlank(ObjectUtils.cast(mcpServerConfig.get(AippConst.MCP_SERVER_URL_KEY)),
"The mcp url should not be empty.");
try (McpClient mcpClient = this.mcpClientFactory.create(McpUtils.getBaseUrl(url),
McpUtils.getSseEndpoint(url))) {
mcpClient.initialize();
Object result = mcpClient.callTool(toolRealName, JSONObject.parseObject(toolCall.arguments()));
return new ToolMessage(toolCall.id(), JSON.toJSONString(result));
} catch (IOException exception) {
throw new AippException(AippErrCode.CALL_MCP_SERVER_FAILED, exception.getMessage());
}
}
return new ToolMessage(toolCall.id(), this.syncToolCall.call(toolRealName, toolCall.arguments(), toolContext));
}

private ChatMessages getAgentMsg(ChatMessage input, StateContext ctx) {
Validation.notNull(ctx, "The state context cannot be null.");
return ctx.getState(this.agentMsgKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
import modelengine.fel.engine.flows.AiProcessFlow;
import modelengine.fel.engine.operators.patterns.AbstractAgent;
import modelengine.fel.engine.operators.prompts.Prompts;
import modelengine.fel.tool.mcp.client.McpClient;
import modelengine.fel.tool.mcp.client.McpClientFactory;
import modelengine.fel.tool.mcp.entity.Tool;
import modelengine.fel.tool.model.transfer.ToolData;
import modelengine.fit.jober.aipp.util.McpUtils;
import modelengine.fitframework.inspection.Validation;
import modelengine.jade.store.service.ToolService;
import modelengine.fit.jade.aipp.model.dto.ModelAccessInfo;
import modelengine.fit.jade.aipp.model.service.AippModelCenter;
Expand Down Expand Up @@ -60,6 +65,7 @@
import modelengine.fitframework.util.StringUtils;
import modelengine.fitframework.util.UuidUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -69,6 +75,7 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* LLM 组件实现
Expand Down Expand Up @@ -101,6 +108,7 @@ public class LlmComponent implements FlowableService {
private final AippModelCenter aippModelCenter;
private final PromptBuilderChain promptBuilderChain;
private final AppTaskInstanceService appTaskInstanceService;
private final McpClientFactory mcpClientFactory;

/**
* 大模型节点构造器,内部通过提供的 agent 和 tool 构建智能体工作流。
Expand All @@ -114,6 +122,7 @@ public class LlmComponent implements FlowableService {
* @param aippModelCenter 表示模型中心的 {@link AippModelCenter}。
* @param promptBuilderChain 表示提示器构造器职责链的 {@link PromptBuilderChain}。
* @param appTaskInstanceService 表示任务实例服务的 {@link AppTaskInstanceService}。
* @param mcpClientFactory 表示大模型上下文客户端工厂的 {@link McpClientFactory}。
*/
public LlmComponent(FlowInstanceService flowInstanceService,
@Fit ToolService toolService,
Expand All @@ -123,7 +132,8 @@ public LlmComponent(FlowInstanceService flowInstanceService,
@Fit(alias = "json") ObjectSerializer serializer,
AippModelCenter aippModelCenter,
PromptBuilderChain promptBuilderChain,
AppTaskInstanceService appTaskInstanceService) {
AppTaskInstanceService appTaskInstanceService,
McpClientFactory mcpClientFactory) {
this.flowInstanceService = flowInstanceService;
this.toolService = toolService;
this.aippLogService = aippLogService;
Expand All @@ -139,6 +149,7 @@ public LlmComponent(FlowInstanceService flowInstanceService,
.close();
this.promptBuilderChain = promptBuilderChain;
this.appTaskInstanceService = appTaskInstanceService;
this.mcpClientFactory = notNull(mcpClientFactory, "The mcp client factory cannot be null.");
}

/**
Expand Down Expand Up @@ -177,6 +188,7 @@ public List<Map<String, Object>> handleTask(List<Map<String, Object>> flowData)
StreamMsgSender streamMsgSender =
new StreamMsgSender(this.aippLogStreamService, this.serializer, path, msgId, instId);
streamMsgSender.sendKnowledge(promptMessage.getMetadata(), businessData);
ChatOption chatOption = this.buildChatOptions(businessData);
agentFlow.converse()
.bind((acc, chunk) -> {
if (firstTokenFlag[0]) {
Expand All @@ -195,7 +207,8 @@ public List<Map<String, Object>> handleTask(List<Map<String, Object>> flowData)
.doOnConsume(msg -> llmOutputConsumer(llmMeta, msg, promptMessage.getMetadata()))
.doOnError(throwable -> doOnAgentError(llmMeta,
throwable.getCause() == null ? throwable.getMessage() : throwable.getCause().getMessage()))
.bind(buildChatOptions(businessData))
.bind(chatOption)
.bind(AippConst.TOOLS_KEY, chatOption.tools())
.offer(Tip.fromArray(promptMessage.getSystemMessage(), promptMessage.getHumanMessage()));
log.info("[perf] [{}] handleTask end, instId={}", System.currentTimeMillis(), instId);
return flowData;
Expand Down Expand Up @@ -393,10 +406,6 @@ private String getFilePath(Map<String, Object> businessData) {
* @return 返回表示自定义参数。
*/
private ChatOption buildChatOptions(Map<String, Object> businessData) {
List<String> skillNameList = new ArrayList<>(ObjectUtils.cast(businessData.get("tools")));
if (businessData.containsKey("workflows")) {
skillNameList.addAll(ObjectUtils.cast(businessData.get("workflows")));
}
String model = ObjectUtils.cast(businessData.get("model"));
Map<String, String> accessInfo = ObjectUtils.nullIf(ObjectUtils.cast(businessData.get("accessInfo")),
MapBuilder.<String, String>get().put("serviceName", model).put("tag", "INTERNAL").build());
Expand All @@ -413,10 +422,40 @@ private ChatOption buildChatOptions(Map<String, Object> businessData) {
.secureConfig(modelAccessInfo.isSystemModel() ? null : SecureConfig.custom().ignoreTrust(true).build())
.apiKey(modelAccessInfo.getAccessKey())
.temperature(ObjectUtils.cast(businessData.get("temperature")))
.tools(this.buildToolInfos(skillNameList))
.tools(this.buildToolInfos(businessData))
.build();
}

private List<ToolInfo> buildToolInfos(Map<String, Object> businessData) {
List<String> skillNameList = new ArrayList<>(ObjectUtils.cast(businessData.get("tools")));
if (businessData.containsKey("workflows")) {
skillNameList.addAll(ObjectUtils.cast(businessData.get("workflows")));
}
Map<String, Object> mcpServersConfig = ObjectUtils.cast(businessData.get(AippConst.MCP_SERVERS_KEY));

return Stream.concat(this.buildToolInfos(skillNameList).stream(),
this.buildMcpToolInfos(mcpServersConfig).stream()).collect(Collectors.toList());
}

private List<ToolInfo> buildMcpToolInfos(Map<String, Object> mcpServersConfig) {
List<ToolInfo> result = new ArrayList<>();
ObjectUtils.nullIf(mcpServersConfig, new HashMap<String, Object>()).forEach((serverName, value) -> {
Map<String, Object> serverConfig = ObjectUtils.cast(value);
Comment thread
CodeCasterX marked this conversation as resolved.
String url = Validation.notBlank(ObjectUtils.cast(serverConfig.get(AippConst.MCP_SERVER_URL_KEY)),
"The mcp url should not be empty.");

try (McpClient mcpClient = this.mcpClientFactory.create(McpUtils.getBaseUrl(url),
McpUtils.getSseEndpoint(url))) {
mcpClient.initialize();
List<Tool> tools = mcpClient.getTools();
result.addAll(tools.stream().map(tool -> buildMcpToolInfo(serverName, tool, serverConfig)).toList());
} catch (IOException exception) {
throw new AippException(AippErrCode.CALL_MCP_SERVER_FAILED, exception.getMessage());
}
});
return result;
}

private List<ToolInfo> buildToolInfos(List<String> skillNameList) {
return skillNameList.stream()
.map(this.toolService::getTool)
Expand All @@ -427,12 +466,39 @@ private List<ToolInfo> buildToolInfos(List<String> skillNameList) {

private ToolInfo buildToolInfo(ToolData toolData) {
return ToolInfo.custom()
.name(toolData.getUniqueName())
.name(buildUniqueToolName(AippConst.STORE_SERVER_TYPE,
AippConst.STORE_SERVER_NAME,
toolData.getUniqueName()))
.description(toolData.getDescription())
.parameters(new HashMap<>(toolData.getSchema()))
.extensions(MapBuilder.<String, Object>get()
.put(AippConst.TOOL_REAL_NAME, toolData.getUniqueName())
.build())
.build();
}

private static ToolInfo buildMcpToolInfo(String serverName, Tool tool, Map<String, Object> serverConfig) {
return ToolInfo.custom()
.name(buildUniqueToolName(AippConst.MCP_SERVER_TYPE, serverName, tool.getName()))
.description(tool.getDescription())
.parameters(tool.getInputSchema())
.extensions(MapBuilder.<String, Object>get()
.put(AippConst.MCP_SERVER_KEY, serverConfig)
.put(AippConst.TOOL_REAL_NAME, tool.getName())
.build())
.build();
}

private static String buildUniqueToolName(String type, String serverName, String toolName) {
return StringUtils.format("{0}_{1}_{2}", type, serverName, toolName);
}

/**
* 判断是否启用日志。
*
* @param businessData 表示业务上下文数据的 {@link Map}{@code <}{@link String}{@code , }{@link Object}{@code >}。
* @return 表示是否启用日志的 {@code boolean}。
*/
public static boolean checkEnableLog(Map<String, Object> businessData) {
Comment thread
CodeCasterX marked this conversation as resolved.
Object value = businessData.get(AippConst.BS_LLM_ENABLE_LOG);
if (value == null) {
Expand Down
Loading