diff --git a/framework/fel/java/plugins/tool-mcp-server/README.md b/framework/fel/java/plugins/tool-mcp-server/README.md index b9222ebdd..cbb1b4f84 100644 --- a/framework/fel/java/plugins/tool-mcp-server/README.md +++ b/framework/fel/java/plugins/tool-mcp-server/README.md @@ -1,188 +1,248 @@ -# FitMcpStreamableServerTransportProvider类维护文档 +# MCP Server 插件维护文档 ## 文档概述 -本文档用于记录 `FitMcpStreamableServerTransportProvider` 类的设计、实现细节以及维护更新指南。该类是基于 MCP SDK 中的 -`HttpServletStreamableServerTransportProvider` 类改造而来,用于在 FIT 框架中提供 MCP(Model Context Protocol)服务端的传输层实现。 - -**原始参考类**: MCP SDK 中的 `HttpServletStreamableServerTransportProvider` +本文档用于记录 MCP Server 插件的设计、实现细节以及维护更新指南。该插件基于 MCP SDK 改造而来,用于在 FIT 框架中提供 MCP(Model Context Protocol)服务端的传输层实现。 **创建时间**: 2025-11-04 --- -## 类的作用和职责 - -`FitMcpStreamableServerTransportProvider` 是 MCP 服务端传输层的核心实现类,负责: +## 架构概览 -1. **HTTP 端点处理**: 处理 GET、POST、DELETE 请求,实现 MCP 协议的 HTTP 传输层 -2. **会话管理**: 管理客户端会话的生命周期(创建、维护、销毁) -3. **SSE 通信**: 通过 Server-Sent Events (SSE) 实现服务端到客户端的实时消息推送 -4. **消息序列化**: 处理 JSON-RPC 消息的序列化和反序列化 -5. **连接保活**: 支持可选的 Keep-Alive 机制 -6. **优雅关闭**: 支持服务的优雅关闭和资源清理 - ---- +### 核心组件关系 -## 类结构概览 +本插件提供了两种独立的 MCP 服务器实例,分别支持不同的传输协议: -### 主要成员变量 +1. **McpSseServer** - 基于 SSE 传输的服务器实例 +2. **McpStreamableServer** - 基于 Streamable 传输的服务器实例 -| 变量名 | 类型 | 来源 | 说明 | -|----------------------|----------------------------------------------------------|------------|---------------------------------| -| `MESSAGE_ENDPOINT` | `String` | SDK 原始 | 消息端点路径 `/mcp/streamable` | -| `disallowDelete` | `boolean` | SDK 原始 | 是否禁用 DELETE 请求 | -| `jsonMapper` | `McpJsonMapper` | SDK 原始 | JSON 序列化器 | -| `contextExtractor` | `McpTransportContextExtractor` | **FIT 改造** | 上下文提取器(泛型参数改为 FIT 的 Request 类型) | -| `keepAliveScheduler` | `KeepAliveScheduler` | SDK 原始 | Keep-Alive 调度器 | -| `sessionFactory` | `McpStreamableServerSession.Factory` | SDK 原始 | 会话工厂 | -| `sessions` | `Map` | SDK 原始 | 活跃会话映射表 | -| `isClosing` | `volatile boolean` | SDK 原始 | 关闭标志 | +每个服务器实例由以下三个核心组件构成: -### 主要方法 +``` +配置类 (McpSseServerConfig / McpStreamableServerConfig) + │ + ├─> TransportProvider (传输层实现) + │ ├─> FitMcpSseServerTransportProvider + │ └─> FitMcpStreamableServerTransportProvider + │ + ├─> McpSyncServer (MCP SDK 提供的同步服务器) + │ + └─> FitMcpServer (服务器的Fit接口包装,实现工具注册和执行) +``` -| 方法名 | 来源 | 说明 | -| --------------------- | ------------ | ------------------------------- | -| `protocolVersions()` | SDK 原始 | 返回支持的 MCP 协议版本 | -| `setSessionFactory()` | SDK 原始 | 设置会话工厂 | -| `notifyClients()` | SDK 原始 | 广播通知到所有客户端 | -| `closeGracefully()` | SDK 原始 | 优雅关闭传输层 | -| `handleGet()` | **FIT 改造** | 处理 GET 请求(SSE 连接) | -| `handlePost()` | **FIT 改造** | 处理 POST 请求(JSON-RPC 消息) | -| `handleDelete()` | **FIT 改造** | 处理 DELETE 请求(会话删除) | +### FitMcpServer - Fit接口的MCP服务器 -### 重构后的辅助方法 +`FitMcpServer` 是连接 FIT 工具系统与 MCP SDK服务器的核心类,主要职责包括: -为提高代码可读性和可维护性,从原本的 `handleGet()`、`handlePost()`、`handleDelete()` 方法中抽取了以下辅助方法: +- **工具观察**: 实现 `ToolChangedObserver` 接口,监听工具的添加和移除 +- **工具注册**: 将 FIT 工具转换为 MCP 工具规范并注册到 MCP 服务器 +- **工具执行**: 处理来自 MCP 客户端的工具调用请求 +- **生命周期管理**: 在服务销毁时自动注销观察者 -#### 验证请求合法性的方法 +每个 `FitMcpServer` 实例持有一个 `McpSyncServer`,通过配置类注入。两个独立的实例(SSE 和 Streamable)分别管理各自的工具列表和执行逻辑。 -| 方法名 | 说明 | -|-------------------------------|----------------------------------------------------------| -| `validateGetAcceptHeaders()` | 验证 GET 请求的 Accept 头,确保包含 `text/event-stream` | -| `validatePostAcceptHeaders()` | 验证 POST 请求的 Accept 头,确保包含 `text/event-stream` 和 `application/json` | -| `validateRequestSessionId()` | 验证请求的 `mcp-session-id` 头是否存在,以及对应的会话是否存在 | +### FitMcpServerTransportProvider - 传输层基类 -#### 根据请求类型调用处理逻辑的方法 +`FitMcpServerTransportProvider` 是一个抽象基类,为 SSE 和 Streamable 两种传输方式提供通用功能: -| 方法名 | 处理的请求类型 | 说明 | -|---------------------------------|---------|------------------------------------------| -| `handleReplaySseRequest()` | GET | 处理 SSE 消息重放请求,用于断线重连后恢复错过的消息 | -| `handleEstablishSseRequest()` | GET | 处理 SSE 连接建立请求,创建新的持久化 SSE 监听流 | -| `handleInitializeRequest()` | POST | 处理客户端初始化连接请求,创建新的 MCP 会话 | -| `handleJsonRpcMessage()` | POST | 把非Initialize的客户端消息分流给下面三个方法,包含Session验证。 | -| `handleJsonRpcResponse()` | POST | 处理 JSON-RPC 响应消息(如 Elicitation 中的客户端响应) | -| `handleJsonRpcNotification()` | POST | 处理 JSON-RPC 通知消息(客户端单向通知) | -| `handleJsonRpcRequest()` | POST | 处理 JSON-RPC 请求消息,返回 SSE 流式响应 | +**通用职责**: +- **会话管理**: 维护客户端会话的生命周期(创建、存储、销毁) +- **消息序列化**: 使用 `McpJsonMapper` 处理 JSON-RPC 消息的序列化和反序列化 +- **上下文提取**: 从 HTTP 请求中提取传输上下文信息 +- **Keep-Alive**: 支持可选的连接保活机制 +- **优雅关闭**: 提供服务优雅关闭和资源清理 -### 内部类 +**成员变量**: +- `jsonMapper` - JSON 序列化器 +- `contextExtractor` - 上下文提取器 +- `keepAliveScheduler` - Keep-Alive 调度器 +- `sessions` - 会话映射表 (ConcurrentHashMap) +- `isClosing` - 关闭标志 -| 类名 | 来源 | 说明 | -|------------------------------------|------------|-----------------------------| -| `FitStreamableMcpSessionTransport` | **FIT 改造** | 用于SSE 会话`sendMessage()`传输实现 | -| `Builder` | SDK 原始 | 构建器模式 | +两种传输方式的具体实现继承此基类,泛型参数 `` 指定会话类型。 --- -## SDK 原始逻辑 +## 传输层实现 -以下是从 MCP SDK 的 `HttpServletStreamableServerTransportProvider` 类保留的原始逻辑: +### SSE 传输方式 -### 1. 会话管理核心逻辑 +`FitMcpSseServerTransportProvider` 基于 MCP SDK 的 `HttpServletSseServerTransportProvider` 改造,提供基本的 SSE 传输实现。 -```java -private final Map sessions = new ConcurrentHashMap<>(); -``` +#### 端点配置 -- 使用 `ConcurrentHashMap` 存储活跃会话 -- 会话以 `mcp-session-id` 作为键 +- **GET `/mcp/sse`**: 建立 SSE 连接,用于服务端向客户端推送消息 +- **POST `/mcp/message`**: 接收客户端发送的 JSON-RPC 消息 -### 2. 会话工厂设置 +#### 特点 -```java -public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) { - this.sessionFactory = sessionFactory; -} -``` +- **会话类型**: 使用 `McpServerSession` 管理客户端会话 +- **会话创建**: 在 GET 请求时创建会话,生成唯一的 session ID +- **协议版本**: 仅支持 `MCP_2024_11_05` +- **简洁设计**: 适合简单的服务端到客户端推送场景 -- 由外部设置会话工厂,用于创建新会话 +#### 请求处理流程 -### 3. 客户端通知 - -```java -public Mono notifyClients(String method, Object params) { - // ... 广播逻辑 -} -``` +**GET 请求**: +1. 检查服务器是否正在关闭 +2. 验证 Accept 头是否包含 `text/event-stream` +3. 提取传输上下文 +4. 生成会话 ID 并创建新会话 +5. 建立 SSE 监听流,持续推送消息 -- 向所有活跃会话并行发送通知 -- 使用 `parallelStream()` 提高效率 -- 单个会话失败不影响其他会话 +**POST 请求**: +1. 检查服务器是否正在关闭 +2. 验证 Accept 头包含 `application/json` +3. 验证 `mcp-session-id` 头及会话存在性 +4. 提取传输上下文 +5. 反序列化 JSON-RPC 消息并转发给会话处理 -### 4. 关闭逻辑 +#### 内部实现 -```java -public Mono closeGracefully() { - this.isClosing = true; - // ... 关闭所有会话 - // ... 关闭 keep-alive 调度器 -} -``` +- **Transport 类**: `FitSseMcpSessionTransport` +- **职责**: 封装 SSE 消息发送逻辑,通过 `Emitter` 发送消息 -- 设置关闭标志 -- 关闭所有活跃会话 -- 清理资源 +--- -## FIT 框架改造核心逻辑 +### Streamable 传输方式 -以下是为适配 FIT 框架而新增或改造的部分: +`FitMcpStreamableServerTransportProvider` 基于 MCP SDK 的 `HttpServletStreamableServerTransportProvider` 改造,提供功能更丰富的传输实现。 -### 1. HTTP 端点处理核心流程(核心改造) +#### 端点配置 -- 请求/响应对象类型变更: - - `HttpServletRequest` → `HttpClassicServerRequest` - - `HttpServletResponse` → `HttpClassicServerResponse` -- 返回类型改为通用的 `Object`,支持多种返回形式 +- **GET `/mcp/streamable`**: 建立 SSE 连接或重放消息 +- **POST `/mcp/streamable`**: 处理初始化请求和其他 JSON-RPC 消息 +- **DELETE `/mcp/streamable`**: 删除指定会话 -#### a. GET 请求处理流程 +#### 特点 -1. 检查服务器是否正在关闭 -2. **调用 `validateGetAcceptHeaders()`** - 验证 Accept 头是否包含 `text/event-stream` -3. **调用 `validateRequestSessionId()`** - 验证 `mcp-session-id` 头是否存在及对应会话是否存在 -4. 提取 `transportContext` 上下文 -5. 获取会话 ID 和会话对象 -6. 检查是否是重放请求(`Last-Event-ID` 头): - - 如果是,**调用 `handleReplaySseRequest()`** - 重放错过的消息 - - 如果否,**调用 `handleEstablishSseRequest()`** - 建立新的 SSE 监听流 +- **会话类型**: 使用 `McpStreamableServerSession` 管理客户端会话 +- **会话创建**: 在 POST 初始化请求时创建会话 +- **协议版本**: 支持 `MCP_2024_11_05`、`MCP_2025_03_26`、`MCP_2025_06_18` +- **消息重放**: 支持断线重连后恢复错过的消息(通过 `Last-Event-ID`) +- **会话管理**: 提供显式的会话删除机制 +- **功能完整**: 适合需要完整会话管理的复杂场景 -#### b. POST 请求处理流程 +#### 请求处理流程 +**GET 请求**: 1. 检查服务器是否正在关闭 -2. **调用 `validatePostAcceptHeaders()`** - 验证 Accept 头包含 `text/event-stream` 和 `application/json` -3. 提取 `transportContext` 上下文 +2. 验证 Accept 头是否包含 `text/event-stream` +3. 验证 `mcp-session-id` 头及会话存在性 +4. 提取传输上下文 +5. 检查是否为重放请求(`Last-Event-ID` 头): + - **重放模式**: 重放错过的消息 + - **监听模式**: 建立新的 SSE 监听流 + +**POST 请求**: +1. 检查服务器是否正在关闭 +2. 验证 Accept 头包含 `text/event-stream` 和 `application/json` +3. 提取传输上下文 4. 反序列化 JSON-RPC 消息 -5. 判断是否为初始化请求(`initialize` 方法): - - 如果是,**调用 `handleInitializeRequest()`** - 创建新会话并返回初始化结果 -6. **调用 `validateRequestSessionId()`** - 验证会话(仅非初始化请求) -7. 获取会话 ID 和会话对象 -8. 根据消息类型分发处理: - - `JSONRPCResponse` → **调用 `handleJsonRpcResponse()`** - - `JSONRPCNotification` → **调用 `handleJsonRpcNotification()`** - - `JSONRPCRequest` → **调用 `handleJsonRpcRequest()`** - -#### c. DELETE 请求处理流程 +5. 判断消息类型: + - **初始化请求**: 创建新会话并返回初始化结果 + - **其他消息**: 验证会话后分发处理(响应/通知/请求) +**DELETE 请求**: 1. 检查服务器是否正在关闭 2. 检查是否禁用 DELETE 操作 -3. **调用 `validateRequestSessionId()`** - 验证 `mcp-session-id` 头及会话存在性 -4. 提取 `transportContext` 上下文 -5. 获取会话 ID 和会话对象 -6. 删除会话并从会话映射表中移除 +3. 验证 `mcp-session-id` 头及会话存在性 +4. 提取传输上下文 +5. 删除会话并清理资源 + +#### 辅助方法 + +为提高代码可读性,从请求处理方法中抽取了以下辅助方法: -### 2. SSE 实现改造(核心改造) +**验证类**: +- `validateGetAcceptHeaders()` - 验证 GET 请求的 Accept 头 +- `validatePostAcceptHeaders()` - 验证 POST 请求的 Accept 头 +- `validateRequestSessionId()` - 验证会话 ID -**原始 SDK**: +**处理类**: +- `handleReplaySseRequest()` - 处理消息重放请求 +- `handleEstablishSseRequest()` - 处理 SSE 连接建立 +- `handleInitializeRequest()` - 处理初始化请求 +- `handleJsonRpcMessage()` - 分流非初始化消息 +- `handleJsonRpcResponse()` - 处理 JSON-RPC 响应 +- `handleJsonRpcNotification()` - 处理 JSON-RPC 通知 +- `handleJsonRpcRequest()` - 处理 JSON-RPC 请求 + +#### 内部实现 + +- **Transport 类**: `FitStreamableMcpSessionTransport` +- **职责**: 封装 SSE 消息发送逻辑,支持消息重放和连接状态检查 + +--- +## 传输方式对比 + +### 功能对比表 + +| 特性 | SSE | Streamable | +|------|-----|------------| +| **端点路径** | GET `/mcp/sse`
POST `/mcp/message` | GET/POST/DELETE `/mcp/streamable` | +| **支持的协议版本** | `MCP_2024_11_05` | `MCP_2024_11_05`
`MCP_2025_03_26`
`MCP_2025_06_18` | +| **会话类型** | `McpServerSession` | `McpStreamableServerSession` | +| **会话创建时机** | GET 请求时 | POST 初始化请求时 | +| **消息重放** | ❌ 不支持 | ✅ 支持 (通过 `Last-Event-ID`) | +| **显式会话删除** | ❌ 无 DELETE 端点 | ✅ 支持 DELETE 请求 | +| **Keep-Alive** | ✅ 支持 | ✅ 支持 | +| **代码复杂度** | 较低 | 较高 | +| **适用场景** | 简单的单向推送 | 复杂的双向通信和会话管理 | + +### 选择建议 + +**使用 SSE 方式**,当你需要: +- 简单的服务端到客户端消息推送 +- 最小化的会话管理开销 +- 单一协议版本支持 + +**使用 Streamable 方式**,当你需要: +- 完整的会话生命周期管理 +- 断线重连后的消息重放功能 +- 支持多个 MCP 协议版本 +- 显式的会话清理机制 + +--- + +## SDK 改造说明 + +以下是将 MCP SDK 适配到 FIT 框架的通用改造点,两种传输方式均涉及这些改造(详细实现可参考各自的 TransportProvider 类)。 + +### 1. HTTP 请求/响应对象 + +**SDK 原始**: +```java +HttpServletRequest request +HttpServletResponse response +``` + +**FIT 改造**: +```java +HttpClassicServerRequest request +HttpClassicServerResponse response +``` + +**HTTP 头操作**: +```java +// 获取 Header +String accept = request.headers().first(MessageHeaderNames.ACCEPT).orElse(""); +String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); +boolean hasSessionId = request.headers().contains(HttpHeaders.MCP_SESSION_ID); + +// 设置 Header +response.headers().set("Content-Type", MimeType.APPLICATION_JSON.value()); +response.headers().set(HttpHeaders.MCP_SESSION_ID, sessionId); + +// 设置状态码 +response.statusCode(HttpResponseStatus.OK.statusCode()); +``` + +### 2. SSE 事件流实现 + +**SDK 原始**: ```java SseEmitter sseEmitter = new SseEmitter(); sseEmitter.send(SseEmitter.event() @@ -192,121 +252,171 @@ sseEmitter.send(SseEmitter.event() sseEmitter.complete(); ``` -**FIT 框架改造**: - +**FIT 改造**: ```java -// 使用 Choir 和 Emitter 实现 SSE -Choir.create(emitter -> { - // 创建sessionTransport类,用于调用emitter发送消息 +return Choir.create(emitter -> { + // 创建 Transport 封装 emitter FitStreamableMcpSessionTransport sessionTransport = new FitStreamableMcpSessionTransport(sessionId, emitter, response); - // session的逻辑是SDK原有的,里面会调用sessionTransport发送事件流 + // 调用 SDK 的 session 逻辑发送消息 session.responseStream(jsonrpcRequest, sessionTransport) .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) .block(); - // 监听 Emitter 的生命周期 + // 监听生命周期 emitter.observe(new Emitter.Observer() { - @Override - public void onEmittedData(TextEvent data) { - // 数据发送完成 - } - - @Override - public void onCompleted() { - // SSE 流正常结束 - listeningStream.close(); - } - - @Override - public void onFailed(Exception cause) { - // SSE 流异常结束 - listeningStream.close(); - } + @Override + public void onEmittedData(TextEvent data) { } + + @Override + public void onCompleted() { + listeningStream.close(); + } + + @Override + public void onFailed(Exception cause) { + listeningStream.close(); + } }); }); ``` **关键变化**: +- 使用 `Choir` 替代 `SseEmitter` +- 使用 `Emitter` 发送事件 +- 使用 `Emitter.Observer` 监听生命周期 -- 使用 `Choir` 返回事件流 -- 使用 `Emitter` 替代 `SseEmitter` 的发送方法 -- 使用 `Emitter.Observer` 监听 SSE 生命周期事件 - -### 3. HTTP 响应处理改造 - -**FIT 特有的响应方式**: - -#### 返回纯文本 +### 3. HTTP 响应创建 +**返回纯文本**: ```java response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); -return Entity.createText(response, "Session ID required in mcp-session-id header"); +return Entity.createText(response, "Session ID required"); ``` -#### 返回 JSON 对象 - +**返回 JSON 对象**: ```java response.statusCode(HttpResponseStatus.NOT_FOUND.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INVALID_PARAMS) - .message("Session not found: "+sessionId) + .message("Session not found: " + sessionId) .build()); ``` -#### 返回 SSE 流(重要改造) - +**返回 SSE 流**: ```java -return Choir. create(emitter ->{ - // emitter封装在sessionTransport中,被session调用 +return Choir.create(emitter -> { emitter.emit(textEvent); }); ``` -### 4. HTTP 头处理改造 +### 4. Transport 实现类 -**FIT 框架的 Headers API**: +两种传输方式都实现了内部 Transport 类,封装 SSE 消息发送逻辑: +**核心职责**: +- 通过 `Emitter` 发送 SSE 消息 +- 在 `close()` 时关闭 Emitter +- 发送前检查连接是否活跃 + +**连接检查**: ```java -// 获取 Header -String acceptHeaders = request.headers().first(MessageHeaderNames.ACCEPT).orElse(""); -boolean hasSessionId = request.headers().contains(HttpHeaders.MCP_SESSION_ID); -String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); +@Override +public void sendMessage(JSONRPCMessage message) { + // 检查连接是否仍然活跃 + if (!this.response.isActive()) { + logger.warn("[SSE] Connection inactive, session: {}", this.sessionId); + this.close(); + return; + } + + // 发送消息 + String messageJson = jsonMapper.writeValueAsString(message); + Event event = new Event(messageId, "message", messageJson); + this.emitter.emit(new TextEvent(event.toString())); +} +``` -// 设置 Header -response.headers().set("Content-Type",MimeType.APPLICATION_JSON.value()); -response.headers().set(HttpHeaders.MCP_SESSION_ID, sessionId); +--- -// 设置状态码 -response.statusCode(HttpResponseStatus.OK.statusCode()); +## SDK 保留逻辑 + +以下是从 MCP SDK 保留的核心逻辑,两种传输方式共享。 + +### 1. 会话存储 + +```java +private final Map sessions = new ConcurrentHashMap<>(); ``` -**变化**: +- 使用线程安全的 `ConcurrentHashMap` 存储会话 +- 键为 `mcp-session-id`,值为会话对象 -- 使用 `request.headers().first(name).orElse(default)` 获取单个 Header -- 使用 `request.headers().contains(name)` 检查 Header 是否存在 -- 使用 FIT 的 `MessageHeaderNames` 和 `MimeType` 常量 -- 使用 `HttpResponseStatus` 枚举设置状态码 +### 2. 会话工厂 -### 5. 内部类 Transport 实现 +```java +public void setSessionFactory(S.Factory sessionFactory) { + this.sessionFactory = sessionFactory; +} +``` -`FitStreamableMcpSessionTransport` 类的核心职责是发送SSE事件: +- 由外部(MCP SDK)设置会话工厂 +- 用于创建新会话实例 -- `sendmessage()`方法通过`Emitter` 发送SSE消息到客户端 -- 保存了当前会话的事件的`Emitter`,负责close时关闭`Emitter` +### 3. 客户端通知 -- SSE的`Emitter`感知不到GET连接是否断开,因此在`sendmessage()`发送前检查GET连接是否活跃 +```java +public Mono notifyClients(String method, Object params) { + // 并行向所有活跃会话发送通知 + sessions.values().parallelStream() + .forEach(session -> session.sendNotification(method, params)); +} +``` + +- 向所有活跃会话并行发送通知 +- 单个会话失败不影响其他会话 + +### 4. 优雅关闭 ```java -// 在发送消息前检查连接是否仍然活跃 -if(!this.response.isActive()){ - logger.warn("[SSE] Connection inactive detected while sending message for session: {}", - this.sessionId); - this.close(); - return; +public Mono closeGracefully() { + this.isClosing = true; + // 关闭所有会话 + // 关闭 keep-alive 调度器 + // 清理资源 } ``` +- 设置关闭标志,拒绝新请求 +- 关闭所有活跃会话 +- 清理调度器和其他资源 + +--- + +## 配置说明 + +### SSE 配置类 (McpSseServerConfig) + +创建三个组件: +1. `FitMcpSseServerTransportProvider` - 传输层 +2. `McpSyncSseServer` - MCP 同步服务器 +3. `McpSseServer` - FIT 工具服务器 + +### Streamable 配置类 (McpStreamableServerConfig) + +创建三个组件: +1. `FitMcpStreamableServerTransportProvider` - 传输层 +2. `McpSyncStreamableServer` - MCP 同步服务器 +3. `McpStreamableServer` - FIT 工具服务器 + +### 配置参数 + +- `mcp.server.ping.interval-seconds` - Keep-Alive 间隔(秒) +- `mcp.server.request.timeout-seconds` - 请求超时时间(秒) +- `mcp.server.streamable.disallow-delete` - 是否禁用 DELETE 请求(仅 Streamable) + +--- + ## 参考资源 ### MCP 协议文档 @@ -319,4 +429,5 @@ if(!this.response.isActive()){ | 日期 | 更新内容 | 负责人 | |----------|---------------------------------|-----| | 2025-11-04 | 初始版本,从 SDK 改造为 FIT 框架实现 | 黄可欣 | -| 2025-11-05 | 代码重构,提取9个辅助方法提高可读性和可维护性 | 黄可欣 | \ No newline at end of file +| 2025-11-05 | 代码重构,提取辅助方法提高可读性和可维护性 | 黄可欣 | +| 2025-11-21 | 文档重构,调整结构使其与代码保持一致,简化技术术语 | 黄可欣 | diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/support/DefaultMcpStreamableServer.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/FitMcpServer.java similarity index 79% rename from framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/support/DefaultMcpStreamableServer.java rename to framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/FitMcpServer.java index f3de70277..12e4b915e 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/support/DefaultMcpStreamableServer.java +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/FitMcpServer.java @@ -4,7 +4,7 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ -package modelengine.fel.tool.mcp.server.support; +package modelengine.fel.tool.mcp.server; import static modelengine.fel.tool.info.schema.PluginSchema.TYPE; import static modelengine.fel.tool.info.schema.ToolsSchema.PROPERTIES; @@ -15,56 +15,53 @@ import io.modelcontextprotocol.server.McpSyncServer; import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.entity.Tool; -import modelengine.fel.tool.mcp.server.McpServer; import modelengine.fel.tool.service.ToolChangedObserver; +import modelengine.fel.tool.service.ToolChangedObserverRegistry; import modelengine.fel.tool.service.ToolExecuteService; -import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.ioc.annotation.PreDestroy; import modelengine.fitframework.log.Logger; import modelengine.fitframework.util.MapUtils; import modelengine.fitframework.util.StringUtils; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** - * Mcp Server implementing interface {@link McpServer}, {@link ToolChangedObserver} - * with MCP Server Bean {@link McpSyncServer}. + * Mcp Server implementing interface {@link ToolChangedObserver} + * with MCP Server {@link McpSyncServer} implemented with SDK. * * @author 季聿阶 * @since 2025-05-15 */ -@Component -public class DefaultMcpStreamableServer implements McpServer, ToolChangedObserver { - private static final Logger log = Logger.get(DefaultMcpStreamableServer.class); +public class FitMcpServer implements ToolChangedObserver { + private static final Logger log = Logger.get(FitMcpServer.class); private final McpSyncServer mcpSyncServer; - private final ToolExecuteService toolExecuteService; - private final List toolsChangedObservers = new ArrayList<>(); + private final ToolChangedObserverRegistry toolChangedObserverRegistry; /** - * Constructs a new instance of the DefaultMcpServer class. + * Constructs a new instance of the FitMcpServer class. * * @param toolExecuteService The service used to execute tools when handling tool call requests. - * @throws IllegalArgumentException If {@code toolExecuteService} is null. + * @param mcpSyncServer The MCP sync server. */ - public DefaultMcpStreamableServer(ToolExecuteService toolExecuteService, McpSyncServer mcpSyncServer) { + public FitMcpServer(ToolExecuteService toolExecuteService, McpSyncServer mcpSyncServer, + ToolChangedObserverRegistry toolChangedObserverRegistry) { this.toolExecuteService = notNull(toolExecuteService, "The tool execute service cannot be null."); this.mcpSyncServer = mcpSyncServer; + this.toolChangedObserverRegistry = toolChangedObserverRegistry; + this.toolChangedObserverRegistry.register(this); } - @Override - public List getTools() { - return this.mcpSyncServer.listTools().stream().map(this::convertToFelTool).collect(Collectors.toList()); + @PreDestroy + public void onDestroy() { + this.toolChangedObserverRegistry.unregister(this); } - @Override - public void registerToolsChangedObserver(ToolsChangedObserver observer) { - if (observer != null) { - this.toolsChangedObservers.add(observer); - } + public List getTools() { + return this.mcpSyncServer.listTools().stream().map(this::convertToFelTool).collect(Collectors.toList()); } @Override @@ -88,10 +85,13 @@ public void onToolAdded(String name, String description, Map par McpServerFeatures.SyncToolSpecification toolSpecification = createToolSpecification(name, description, parameters); - - this.mcpSyncServer.addTool(toolSpecification); + try { + this.mcpSyncServer.addTool(toolSpecification); + } catch (Exception e) { + log.error("Failed to added tool to MCP server. [toolName={}, error={}]", name, e.getMessage()); + throw e; + } log.info("Tool added to MCP server. [toolName={}, description={}, schema={}]", name, description, parameters); - this.toolsChangedObservers.forEach(ToolsChangedObserver::onToolsChanged); } @Override @@ -102,22 +102,15 @@ public void onToolRemoved(String name) { } this.mcpSyncServer.removeTool(name); log.info("Tool removed from MCP server. [toolName={}]", name); - this.toolsChangedObservers.forEach(ToolsChangedObserver::onToolsChanged); } /** * Creates a tool specification for the MCP server. - *

- * This method constructs a {@link McpServerFeatures.SyncToolSpecification} that includes: - *

    - *
  • Tool metadata (name, description, input schema)
  • - *
  • Call handler that executes the tool and handles exceptions
  • - *
* * @param name The name of the tool. * @param description The description of the tool. * @param parameters The parameter schema containing type, properties, and required fields. - * @return A fully configured {@link McpServerFeatures.SyncToolSpecification}. + * @return A configured {@link McpServerFeatures.SyncToolSpecification}. */ private McpServerFeatures.SyncToolSpecification createToolSpecification(String name, String description, Map parameters) { @@ -137,12 +130,6 @@ private McpServerFeatures.SyncToolSpecification createToolSpecification(String n /** * Executes a tool and handles any exceptions that may occur. - *

- * This method handles two types of exceptions: - *

    - *
  • {@link IllegalArgumentException}: Invalid tool arguments (logged as warning)
  • - *
  • {@link Exception}: Any other execution failure (logged as error)
  • - *
* * @param toolName The name of the tool to execute. * @param request The tool call request containing arguments. diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/FitMcpServerTransportProvider.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/FitMcpServerTransportProvider.java new file mode 100644 index 000000000..c2f4c1ce3 --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/FitMcpServerTransportProvider.java @@ -0,0 +1,353 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.server; + +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.server.McpTransportContextExtractor; +import io.modelcontextprotocol.spec.McpError; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.util.KeepAliveScheduler; +import modelengine.fel.tool.mcp.entity.Event; +import modelengine.fit.http.entity.Entity; +import modelengine.fit.http.entity.TextEvent; +import modelengine.fit.http.protocol.HttpResponseStatus; +import modelengine.fit.http.server.HttpClassicServerRequest; +import modelengine.fit.http.server.HttpClassicServerResponse; +import modelengine.fitframework.flowable.Emitter; +import modelengine.fitframework.inspection.Validation; +import modelengine.fitframework.log.Logger; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Abstract base class for FIT MCP Server Transport Providers. + * This class provides common functionality for both SSE and Streamable transport implementations. + * + * @param The session type + * @author 黄可欣 + * @since 2025-11-19 + */ +public abstract class FitMcpServerTransportProvider { + private static final Logger logger = Logger.get(FitMcpServerTransportProvider.class); + protected final McpJsonMapper jsonMapper; + protected final McpTransportContextExtractor contextExtractor; + protected KeepAliveScheduler keepAliveScheduler; + + protected volatile boolean isClosing = false; + protected final Map sessions = new ConcurrentHashMap<>(); + + /** + * Constructs a new FitMcpServerTransportProvider instance. + * + * @param jsonMapper The JSON mapper for serialization/deserialization. + * @param contextExtractor The context extractor for HTTP requests. + * @param keepAliveInterval The interval for keep-alive messages, or null to disable. + */ + protected FitMcpServerTransportProvider(McpJsonMapper jsonMapper, + McpTransportContextExtractor contextExtractor, Duration keepAliveInterval) { + Validation.notNull(jsonMapper, "MCP Json mapper must not be null."); + Validation.notNull(contextExtractor, "Context extractor must not be null."); + + this.jsonMapper = jsonMapper; + this.contextExtractor = contextExtractor; + if (keepAliveInterval != null) { + this.initKeepAliveScheduler(keepAliveInterval); + } + } + + /** + * Initializes the keep-alive scheduler with the specified interval. + * + * @param keepAliveInterval The interval for keep-alive messages. + */ + protected abstract void initKeepAliveScheduler(Duration keepAliveInterval); + + /** + * Gets the session ID from a session object. + * + * @param session The session object. + * @return The session ID. + */ + protected abstract String getSessionId(S session); + + /** + * Closes a session gracefully. + * + * @param session The session to close. + * @return A Mono that completes when the session is closed. + */ + protected abstract Mono closeSession(S session); + + /** + * Sends a notification to a specific session. + * + * @param session The session to send to. + * @param method The notification method name. + * @param params The notification parameters. + * @return A Mono that completes when the notification is sent. + */ + protected abstract Mono sendNotificationToSession(S session, String method, Object params); + + /** + * Broadcasts a notification to all connected clients. + * If any errors occur during sending to a particular client, they are logged but + * don't prevent sending to other clients. + * + * @param method The method name for the notification. + * @param params The parameters for the notification. + * @return A Mono that completes when the broadcast attempt is finished. + */ + public Mono notifyClients(String method, Object params) { + if (this.sessions.isEmpty()) { + logger.debug("No active sessions to broadcast message to."); + return Mono.empty(); + } + + logger.debug("Attempting to broadcast message. [activeSessions={}]", this.sessions.size()); + + return Mono.fromRunnable(() -> this.sessions.values().parallelStream().forEach(session -> { + try { + this.sendNotificationToSession(session, method, params).block(); + } catch (Exception e) { + logger.error("Failed to send message to session. [sessionId={}, error={}]", + this.getSessionId(session), + e.getMessage(), + e); + } + })); + } + + /** + * Initiates a graceful shutdown of the transport. + * + * @return A Mono that completes when all cleanup operations are finished. + */ + public Mono closeGracefully() { + this.isClosing = true; + logger.debug("Initiating graceful shutdown. [activeSessions={}]", this.sessions.size()); + + return Mono.fromRunnable(() -> { + this.sessions.values().parallelStream().forEach(session -> { + try { + this.closeSession(session).block(); + } catch (Exception e) { + logger.error("Failed to close session. [sessionId={}, error={}]", + this.getSessionId(session), + e.getMessage(), + e); + } + }); + + logger.debug("Graceful shutdown completed."); + this.sessions.clear(); + if (this.keepAliveScheduler != null) { + this.keepAliveScheduler.shutdown(); + } + }); + } + + /** + * Creates a response indicating the server is shutting down. + * + * @param response The HTTP response. + * @return An Entity with the shutdown message. + */ + protected Object createShuttingDownResponse(HttpClassicServerResponse response) { + response.statusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.statusCode()); + return Entity.createText(response, "Server is shutting down."); + } + + /** + * Validates that a session exists for the given session ID. + * + * @param sessionId The session ID to validate. + * @param response The HTTP response to set status code if validation fails. + * @return An error Entity if validation fails, null if validation succeeds. + */ + protected Object validateSessionExists(String sessionId, HttpClassicServerResponse response) { + if (sessionId == null || sessionId.isEmpty()) { + response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); + return Entity.createText(response, "Session ID missing."); + } + if (this.sessions.get(sessionId) == null) { + response.statusCode(HttpResponseStatus.NOT_FOUND.statusCode()); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.INVALID_PARAMS) + .message("Session not found: " + sessionId) + .build()); + } + return null; + } + + /** + * Deserializes a JSON-RPC message from the request body. + * + * @param requestBody The request body string to deserialize. + * @param response The HTTP response to set error status if deserialization fails. + * @return The deserialized {@link McpSchema.JSONRPCMessage}, or {@code null} if deserialization fails. + */ + protected McpSchema.JSONRPCMessage deserializeMessage(String requestBody, HttpClassicServerResponse response) { + try { + return McpSchema.deserializeJsonRpcMessage(this.jsonMapper, requestBody); + } catch (IllegalArgumentException | IOException e) { + logger.error("Failed to deserialize message. [error={}]", e.getMessage(), e); + response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); + return null; + } + } + + /** + * Abstract base class for session transport implementations. + * Provides common functionality for sending messages over SSE connections. + */ + protected abstract class AbstractFitMcpSessionTransport { + protected final String sessionId; + protected final Emitter emitter; + protected final HttpClassicServerResponse response; + + protected final ReentrantLock lock = new ReentrantLock(); + protected volatile boolean closed = false; + + /** + * Creates a new session transport. + * + * @param sessionId The unique identifier for this session. + * @param emitter The emitter for sending SSE events. + * @param response The HTTP response for checking connection status. + */ + protected AbstractFitMcpSessionTransport(String sessionId, Emitter emitter, + HttpClassicServerResponse response) { + this.sessionId = sessionId; + this.emitter = emitter; + this.response = response; + FitMcpServerTransportProvider.logger.info("[SSE] Building SSE emitter. [sessionId={}]", sessionId); + } + + /** + * Sends a JSON-RPC message to the client through the SSE connection. + * This method is thread-safe and checks if the connection is still active before sending. + * + * @param message The JSON-RPC message to send. + * @return A Mono that completes when the message has been sent. + */ + protected Mono doSendMessage(McpSchema.JSONRPCMessage message, String messageId) { + return Mono.fromRunnable(() -> { + if (this.closed) { + FitMcpServerTransportProvider.logger.info( + "[SSE] Attempted to send message to closed session. [sessionId={}]", + this.sessionId); + return; + } + this.lock.lock(); + try { + if (this.closed) { + FitMcpServerTransportProvider.logger.info( + "[SSE] Session was closed during message send attempt. [sessionId={}]", + this.sessionId); + return; + } + + if (!this.response.isActive()) { + FitMcpServerTransportProvider.logger.warn( + "[SSE] Connection inactive detected while sending message. [sessionId={}]", + this.sessionId); + this.doClose(); + return; + } + + String jsonText = FitMcpServerTransportProvider.this.jsonMapper.writeValueAsString(message); + TextEvent textEvent = TextEvent.custom() + .id(messageId != null ? messageId : this.sessionId) + .event(Event.MESSAGE.code()) + .data(jsonText) + .build(); + this.emitter.emit(textEvent); + + FitMcpServerTransportProvider.logger.info( + "[SSE] Sending message to session. [sessionId={}, eventId={}, jsonText={}]", + this.sessionId, + messageId != null ? messageId : this.sessionId, + jsonText); + } catch (Exception e) { + FitMcpServerTransportProvider.logger.error( + "[SSE] Failed to send message to session. [sessionId={}, error={}]", + this.sessionId, + e.getMessage(), + e); + try { + this.emitter.fail(e); + } catch (Exception errorException) { + FitMcpServerTransportProvider.logger.error( + "[SSE] Failed to send error to SSE builder. [sessionId={}, error={}]", + this.sessionId, + errorException.getMessage(), + errorException); + } + } finally { + this.lock.unlock(); + } + }); + } + + /** + * Converts data from one type to another using the configured McpJsonMapper. + * + * @param data The source data object to convert. + * @param typeRef The target type reference. + * @param The target type. + * @return The converted object of type T. + */ + public T unmarshalFrom(Object data, TypeRef typeRef) { + return FitMcpServerTransportProvider.this.jsonMapper.convertValue(data, typeRef); + } + + /** + * Initiates a graceful shutdown of the transport. + * + * @return A Mono that completes when the shutdown is complete. + */ + public Mono closeGracefully() { + return Mono.fromRunnable(this::doClose); + } + + /** + * Closes the transport immediately. + * Completes the SSE emitter and releases any associated resources. + */ + protected void doClose() { + this.lock.lock(); + try { + if (this.closed) { + FitMcpServerTransportProvider.logger.info("[SSE] Session transport already closed. [sessionId={}]", + this.sessionId); + return; + } + + this.closed = true; + FitMcpServerTransportProvider.logger.debug("[SSE] Closing session transport. [sessionId={}]", + this.sessionId); + + this.emitter.complete(); + FitMcpServerTransportProvider.logger.info("[SSE] Closed SSE builder successfully. [sessionId={}]", + this.sessionId); + } catch (Exception e) { + FitMcpServerTransportProvider.logger.warn( + "[SSE] Failed to complete SSE builder. [sessionId={}, error={}]", + this.sessionId, + e.getMessage()); + } finally { + this.lock.unlock(); + } + } + } +} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java deleted file mode 100644 index 7febd4ddd..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java +++ /dev/null @@ -1,43 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server; - -import modelengine.fel.tool.mcp.entity.Tool; - -import java.util.List; - -/** - * Represents the MCP Server. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -public interface McpServer { - /** - * Gets MCP server tools. - * - * @return The MCP server tools as a {@link List}{@code <}{@link Tool}{@code >}. - */ - List getTools(); - - /** - * Registers MCP server tools changed observer. - * - * @param observer The MCP server tools changed observer as a {@link ToolsChangedObserver}. - */ - void registerToolsChangedObserver(ToolsChangedObserver observer); - - /** - * Represents the MCP server tools changed observer. - */ - interface ToolsChangedObserver { - /** - * Called when MCP server tools changed. - */ - void onToolsChanged(); - } -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/config/McpSseServerConfig.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/config/McpSseServerConfig.java new file mode 100644 index 000000000..91baf3a94 --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/config/McpSseServerConfig.java @@ -0,0 +1,57 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.server.config; + +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.server.McpServer; +import io.modelcontextprotocol.server.McpSyncServer; +import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fel.tool.mcp.server.FitMcpServer; +import modelengine.fel.tool.mcp.server.transport.FitMcpSseServerTransportProvider; +import modelengine.fel.tool.service.ToolChangedObserverRegistry; +import modelengine.fel.tool.service.ToolExecuteService; +import modelengine.fitframework.annotation.Bean; +import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.annotation.Fit; +import modelengine.fitframework.annotation.Value; + +import java.time.Duration; + +/** + * MCP SSE Server Bean implemented with MCP SDK. + * + * @author 黄可欣 + * @since 2025-11-10 + */ +@Component +public class McpSseServerConfig { + @Bean + public FitMcpSseServerTransportProvider fitMcpSseServerTransportProvider( + @Value("${mcp.server.ping.interval-seconds}") int keepAliveIntervalSeconds) { + return FitMcpSseServerTransportProvider.builder() + .jsonMapper(McpJsonMapper.getDefault()) + .keepAliveInterval(Duration.ofSeconds(keepAliveIntervalSeconds)) + .build(); + } + + @Bean("McpSyncSseServer") + public McpSyncServer mcpSyncSseServer(FitMcpSseServerTransportProvider transportProvider, + @Value("${mcp.server.request.timeout-seconds}") int requestTimeoutSeconds) { + return McpServer.sync(transportProvider) + .serverInfo("FIT Store MCP SSE Server", "3.6.1-SNAPSHOT") + .capabilities(McpSchema.ServerCapabilities.builder().tools(true).logging().build()) + .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds)) + .build(); + } + + @Bean("McpSseServer") + public FitMcpServer defaultMcpSseServer(ToolExecuteService toolExecuteService, + @Fit(alias = "McpSyncSseServer") McpSyncServer mcpSyncServer, + ToolChangedObserverRegistry toolChangedObserverRegistry) { + return new FitMcpServer(toolExecuteService, mcpSyncServer, toolChangedObserverRegistry); + } +} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerConfig.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/config/McpStreamableServerConfig.java similarity index 51% rename from framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerConfig.java rename to framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/config/McpStreamableServerConfig.java index e5ff98f9b..8c4e8ed2b 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerConfig.java +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/config/McpStreamableServerConfig.java @@ -4,39 +4,56 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ -package modelengine.fel.tool.mcp.server; +package modelengine.fel.tool.mcp.server.config; import io.modelcontextprotocol.json.McpJsonMapper; import io.modelcontextprotocol.server.McpServer; import io.modelcontextprotocol.server.McpSyncServer; import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fel.tool.mcp.server.FitMcpServer; import modelengine.fel.tool.mcp.server.transport.FitMcpStreamableServerTransportProvider; +import modelengine.fel.tool.service.ToolChangedObserverRegistry; +import modelengine.fel.tool.service.ToolExecuteService; import modelengine.fitframework.annotation.Bean; import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.annotation.Fit; import modelengine.fitframework.annotation.Value; import java.time.Duration; /** - * Mcp Server Bean implemented with MCP SDK. + * MCP Streamable Server Bean implemented with MCP SDK. * * @author 黄可欣 * @since 2025-10-22 */ @Component -public class McpServerConfig { +public class McpStreamableServerConfig { @Bean - public FitMcpStreamableServerTransportProvider fitMcpStreamableServerTransportProvider() { - return FitMcpStreamableServerTransportProvider.builder().jsonMapper(McpJsonMapper.getDefault()).build(); + public FitMcpStreamableServerTransportProvider fitMcpStreamableServerTransportProvider( + @Value("${mcp.server.ping.interval-seconds}") int keepAliveIntervalSeconds, + @Value("${mcp.server.streamable.disallow-delete}") boolean disallowDelete) { + return FitMcpStreamableServerTransportProvider.builder() + .jsonMapper(McpJsonMapper.getDefault()) + .keepAliveInterval(Duration.ofSeconds(keepAliveIntervalSeconds)) + .disallowDelete(disallowDelete) + .build(); } - @Bean - public McpSyncServer mcpSyncServer(FitMcpStreamableServerTransportProvider transportProvider, + @Bean("McpSyncStreamableServer") + public McpSyncServer mcpSyncStreamableServer(FitMcpStreamableServerTransportProvider transportProvider, @Value("${mcp.server.request.timeout-seconds}") int requestTimeoutSeconds) { return McpServer.sync(transportProvider) - .serverInfo("FIT Store MCP Server", "3.6.1-SNAPSHOT") + .serverInfo("FIT Store MCP Streamable Server", "3.6.1-SNAPSHOT") .capabilities(McpSchema.ServerCapabilities.builder().tools(true).logging().build()) .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds)) .build(); } + + @Bean("McpStreamableServer") + public FitMcpServer defaultMcpStreamableServer(ToolExecuteService toolExecuteService, + @Fit(alias = "McpSyncStreamableServer") McpSyncServer mcpSyncServer, + ToolChangedObserverRegistry toolChangedObserverRegistry) { + return new FitMcpServer(toolExecuteService, mcpSyncServer, toolChangedObserverRegistry); + } } diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpSseServerTransportProvider.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpSseServerTransportProvider.java new file mode 100644 index 000000000..86abcd76e --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpSseServerTransportProvider.java @@ -0,0 +1,366 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.server.transport; + +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.server.McpTransportContextExtractor; +import io.modelcontextprotocol.spec.McpError; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpServerSession; +import io.modelcontextprotocol.spec.McpServerTransport; +import io.modelcontextprotocol.spec.McpServerTransportProvider; +import io.modelcontextprotocol.spec.ProtocolVersions; +import io.modelcontextprotocol.util.KeepAliveScheduler; +import modelengine.fel.tool.mcp.server.FitMcpServerTransportProvider; +import modelengine.fit.http.annotation.GetMapping; +import modelengine.fit.http.annotation.PostMapping; +import modelengine.fit.http.annotation.RequestParam; +import modelengine.fit.http.entity.Entity; +import modelengine.fit.http.entity.TextEvent; +import modelengine.fit.http.protocol.HttpResponseStatus; +import modelengine.fit.http.server.HttpClassicServerRequest; +import modelengine.fit.http.server.HttpClassicServerResponse; +import modelengine.fitframework.flowable.Choir; +import modelengine.fitframework.flowable.Emitter; +import modelengine.fitframework.inspection.Validation; +import modelengine.fitframework.log.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.List; +import java.util.UUID; + +/** + * The default implementation of {@link McpServerTransportProvider}. + * The FIT transport provider for MCP SSE Server, according to {@code HttpServletSseServerTransportProvider} in MCP + * SDK. + * + * @author 黄可欣 + * @since 2025-11-19 + */ +public class FitMcpSseServerTransportProvider extends FitMcpServerTransportProvider + implements McpServerTransportProvider { + private static final Logger logger = Logger.get(FitMcpSseServerTransportProvider.class); + private static final String MESSAGE_ENDPOINT = "/mcp/message"; + private static final String SSE_ENDPOINT = "/mcp/sse"; + public static final String ENDPOINT_EVENT_TYPE = "endpoint"; + private McpServerSession.Factory sessionFactory; + + /** + * Constructs a new FitMcpSseServerTransportProvider instance. + * + * @param jsonMapper The McpJsonMapper to use for JSON serialization/deserialization + * of messages. + * @param keepAliveInterval The interval for sending keep-alive messages to clients. + * @param contextExtractor The contextExtractor to fill in a + * {@link McpTransportContext}. + * @throws IllegalArgumentException if any parameter is null. + */ + private FitMcpSseServerTransportProvider(McpJsonMapper jsonMapper, Duration keepAliveInterval, + McpTransportContextExtractor contextExtractor) { + super(jsonMapper, contextExtractor, keepAliveInterval); + } + + @Override + protected void initKeepAliveScheduler(Duration keepAliveInterval) { + this.keepAliveScheduler = KeepAliveScheduler.builder(() -> this.isClosing + ? Flux.empty() + : Flux.fromIterable(this.sessions.values())) + .initialDelay(keepAliveInterval) + .interval(keepAliveInterval) + .build(); + this.keepAliveScheduler.start(); + } + + @Override + protected String getSessionId(McpServerSession session) { + return session.getId(); + } + + @Override + protected Mono closeSession(McpServerSession session) { + return session.closeGracefully(); + } + + @Override + protected Mono sendNotificationToSession(McpServerSession session, String method, Object params) { + return session.sendNotification(method, params); + } + + /** + * Returns the list of supported MCP protocol versions. + * + * @return A list of supported protocol version strings. + */ + @Override + public List protocolVersions() { + return List.of(ProtocolVersions.MCP_2024_11_05); + } + + /** + * Sets the session factory used to create new MCP server sessions. + * + * @param sessionFactory The factory for creating server sessions. + */ + @Override + public void setSessionFactory(McpServerSession.Factory sessionFactory) { + this.sessionFactory = sessionFactory; + } + + /** + * Handles new SSE connection requests from clients by creating a new session and + * establishing an SSE connection. This method: + *
    + *
  • Generates a unique session ID
  • + *
  • Creates a new session with a {@link FitSseMcpSessionTransport}
  • + *
  • Sends an initial endpoint event to inform the client where to send messages
  • + *
  • Maintains the session in the sessions map
  • + *
+ * + * @param request The incoming server request. + * @param response The HTTP response for SSE communication. + * @return A {@link Choir}{@code <}{@link TextEvent}{@code >} object for SSE streaming, + * or an error response if the server is shutting down or the connection fails. + */ + @GetMapping(path = SSE_ENDPOINT) + public Object handleSseConnection(HttpClassicServerRequest request, HttpClassicServerResponse response) { + if (this.isClosing) { + return this.createShuttingDownResponse(response); + } + + String sessionId = UUID.randomUUID().toString(); + logger.debug("Creating new SSE connection. [sessionId={}]", sessionId); + try { + return Choir.create(emitter -> { + this.addEmitterObserver(emitter, sessionId); + FitSseMcpSessionTransport sessionTransport = + new FitSseMcpSessionTransport(sessionId, emitter, response); + McpServerSession session = this.sessionFactory.create(sessionTransport); + this.sessions.put(sessionId, session); + try { + String initData = MESSAGE_ENDPOINT + "?sessionId=" + sessionId; + TextEvent textEvent = + TextEvent.custom().id(sessionId).event(ENDPOINT_EVENT_TYPE).data(initData).build(); + emitter.emit(textEvent); + logger.info("[SSE] Sending init data to session. [sessionId={}, initData={}]", sessionId, initData); + + } catch (Exception e) { + logger.error("Failed to send initial endpoint event. [error={}]", e.getMessage(), e); + emitter.fail(e); + } + }); + } catch (Exception e) { + logger.error("[GET] Failed to handle GET request. [sessionId={}, error={}]", sessionId, e.getMessage(), e); + this.sessions.remove(sessionId); + response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); + return null; + } + } + + /** + * Handles incoming JSON-RPC messages from clients. This method: + *
    + *
  • Validates the session ID from the request parameter
  • + *
  • Deserializes the request body into a JSON-RPC message
  • + *
  • Processes the message through the session's handle method
  • + *
  • Returns appropriate HTTP responses based on the processing result
  • + *
+ * + * @param request The incoming server request containing the JSON-RPC message. + * @param response The HTTP response to set status code and return data. + * @param sessionId The session ID from the request parameter. + * @return An error {@link Entity} if validation fails, or {@code null} on success. + */ + @PostMapping(path = MESSAGE_ENDPOINT) + public Object handleMessage(HttpClassicServerRequest request, HttpClassicServerResponse response, + @RequestParam("sessionId") String sessionId) { + if (this.isClosing) { + return this.createShuttingDownResponse(response); + } + Object sessionError = this.validateRequestSessionId(sessionId, response); + if (sessionError != null) { + return sessionError; + } + McpServerSession session = this.sessions.get(sessionId); + + String requestBody = new String(request.entityBytes(), StandardCharsets.UTF_8); + McpSchema.JSONRPCMessage message = this.deserializeMessage(requestBody, response); + if (message == null) { + logger.error("[POST] Invalid message format. [sessionId={}, requestBody={}]", sessionId, requestBody); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR).message("Invalid message format.").build()); + } + logger.info("[POST] Receiving message from session. [sessionId={}, requestBody={}]", sessionId, requestBody); + McpTransportContext transportContext = this.contextExtractor.extract(request); + try { + session.handle(message).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)).block(); + response.statusCode(HttpResponseStatus.OK.statusCode()); + return null; + } catch (Exception e) { + logger.error("[POST] Error handling message. [error={}]", e.getMessage(), e); + response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); + } + } + + /** + * Adds an observer to the SSE emitter to handle connection lifecycle events. + * The observer removes the session from the sessions map when the connection + * completes or fails. + * + * @param emitter The SSE emitter to observe. + * @param sessionId The session ID associated with this emitter. + */ + private void addEmitterObserver(Emitter emitter, String sessionId) { + emitter.observe(new Emitter.Observer() { + @Override + public void onEmittedData(TextEvent data) { + // No action needed + } + + @Override + public void onCompleted() { + FitMcpSseServerTransportProvider.this.sessions.remove(sessionId); + FitMcpSseServerTransportProvider.logger.info( + "[SSE] Completed SSE emitting and closed session successfully. [sessionId={}]", + sessionId); + } + + @Override + public void onFailed(Exception cause) { + FitMcpSseServerTransportProvider.this.sessions.remove(sessionId); + FitMcpSseServerTransportProvider.logger.warn( + "[SSE] SSE failed, session closed. [sessionId={}, cause={}]", + sessionId, + cause.getMessage()); + } + }); + } + + /** + * Validates the MCP session ID in the request headers and verifies the session exists. + * This method checks both the presence of the {@code mcp-session-id} header and + * the existence of the corresponding session in the active sessions map. + * + * @param sessionId The {@link String} session ID in request parameter. + * @param response The {@link HttpClassicServerResponse} to set status code if validation fails. + * @return An error {@link Entity} if validation fails (either missing session ID or session not found), + * {@code null} if validation succeeds. + */ + private Object validateRequestSessionId(String sessionId, HttpClassicServerResponse response) { + if (sessionId.isEmpty()) { + response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); + return Entity.createText(response, "Session ID missing in message endpoint."); + } + return this.validateSessionExists(sessionId, response); + } + + /** + * Implementation of {@link McpServerTransport} for FIT SSE sessions. + * This class handles the transport-level communication for a specific client session. + * + *

+ * This class is thread-safe and uses a {@link java.util.concurrent.locks.ReentrantLock} to synchronize access to + * the + * underlying SSE emitter to prevent race conditions when multiple threads attempt to + * send messages concurrently. + */ + private class FitSseMcpSessionTransport extends AbstractFitMcpSessionTransport implements McpServerTransport { + FitSseMcpSessionTransport(String sessionId, Emitter emitter, HttpClassicServerResponse response) { + super(sessionId, emitter, response); + } + + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message) { + return this.doSendMessage(message, null); + } + + @Override + public void close() { + this.doClose(); + } + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for creating instances of FitMcpSseServerTransportProvider. + *

+ * This builder provides a fluent API for configuring and creating instances of + * FitMcpSseServerTransportProvider with custom settings. + */ + public static class Builder { + private McpJsonMapper jsonMapper; + private Duration keepAliveInterval; + private McpTransportContextExtractor contextExtractor = + (serverRequest) -> McpTransportContext.EMPTY; + + /** + * Sets the JSON object mapper to use for message serialization/deserialization. + * + * @param jsonMapper The object mapper to use. + * @return This builder instance for method chaining. + */ + public Builder jsonMapper(McpJsonMapper jsonMapper) { + Validation.notNull(jsonMapper, "MCP Json mapper must not be null."); + this.jsonMapper = jsonMapper; + return this; + } + + /** + * Sets the interval for keep-alive pings. + *

+ * If not specified, keep-alive pings will be disabled. + * + * @param keepAliveInterval The interval duration for keep-alive pings. + * @return This builder instance for method chaining. + */ + public Builder keepAliveInterval(Duration keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + return this; + } + + /** + * Sets the context extractor that allows providing the MCP feature + * implementations to inspect HTTP transport level metadata that was present at + * HTTP request processing time. This allows to extract custom headers and other + * useful data for use during execution later on in the process. + * + * @param contextExtractor The contextExtractor to fill in a + * {@link McpTransportContext}. + * @return This builder instance. + * @throws IllegalArgumentException if contextExtractor is null. + */ + public Builder contextExtractor(McpTransportContextExtractor contextExtractor) { + Validation.notNull(contextExtractor, "Context extractor must not be null."); + this.contextExtractor = contextExtractor; + return this; + } + + /** + * Builds a new instance of FitMcpSseServerTransportProvider with the configured + * settings. + * + * @return A new FitMcpSseServerTransportProvider instance. + * @throws IllegalStateException if jsonMapper or messageEndpoint is not set. + */ + public FitMcpSseServerTransportProvider build() { + Validation.notNull(this.jsonMapper, "Json mapper must be set."); + + return new FitMcpSseServerTransportProvider( + this.jsonMapper == null ? McpJsonMapper.getDefault() : this.jsonMapper, + this.keepAliveInterval, + this.contextExtractor); + } + } +} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java index 9bf5bbfe4..0cd19c363 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java @@ -18,7 +18,7 @@ import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider; import io.modelcontextprotocol.spec.ProtocolVersions; import io.modelcontextprotocol.util.KeepAliveScheduler; -import modelengine.fel.tool.mcp.entity.Event; +import modelengine.fel.tool.mcp.server.FitMcpServerTransportProvider; import modelengine.fit.http.annotation.DeleteMapping; import modelengine.fit.http.annotation.GetMapping; import modelengine.fit.http.annotation.PostMapping; @@ -36,46 +36,25 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; /** * The default implementation of {@link McpStreamableServerTransportProvider}. - * The FIT transport provider for MCP Server, according to {@code HttpServletStreamableServerTransportProvider} in MCP + * The FIT transport provider for MCP Streamable Server, according to + * {@code HttpServletStreamableServerTransportProvider} in MCP * SDK. * * @author 黄可欣 * @since 2025-09-30 */ -public class FitMcpStreamableServerTransportProvider implements McpStreamableServerTransportProvider { +public class FitMcpStreamableServerTransportProvider extends FitMcpServerTransportProvider + implements McpStreamableServerTransportProvider { private static final Logger logger = Logger.get(FitMcpStreamableServerTransportProvider.class); - private static final String MESSAGE_ENDPOINT = "/mcp/streamable"; - - /** - * Flag indicating whether DELETE requests are disallowed on the endpoint. - */ - private final boolean disallowDelete; - private final McpJsonMapper jsonMapper; - private final McpTransportContextExtractor contextExtractor; - private KeepAliveScheduler keepAliveScheduler; - private McpStreamableServerSession.Factory sessionFactory; - - /** - * Map of active client sessions, keyed by mcp-session-id. - */ - private final Map sessions = new ConcurrentHashMap<>(); - - /** - * Flag indicating if the transport is shutting down. - */ - private volatile boolean isClosing = false; + private final boolean disallowDelete; /** * Constructs a new FitMcpStreamableServerTransportProvider instance, @@ -86,125 +65,73 @@ public class FitMcpStreamableServerTransportProvider implements McpStreamableSer * @param disallowDelete Whether to disallow DELETE requests on the endpoint. * @param contextExtractor The context extractor to fill in a {@link McpTransportContext}. * @param keepAliveInterval The interval for sending keep-alive messages to clients. - * @throws IllegalArgumentException if any parameter is null + * @throws IllegalArgumentException if any parameter is null. */ private FitMcpStreamableServerTransportProvider(McpJsonMapper jsonMapper, boolean disallowDelete, McpTransportContextExtractor contextExtractor, Duration keepAliveInterval) { - Validation.notNull(jsonMapper, "jsonMapper must not be null"); - Validation.notNull(contextExtractor, "McpTransportContextExtractor must not be null"); - - this.jsonMapper = jsonMapper; + super(jsonMapper, contextExtractor, keepAliveInterval); this.disallowDelete = disallowDelete; - this.contextExtractor = contextExtractor; - - if (keepAliveInterval != null) { - this.keepAliveScheduler = KeepAliveScheduler.builder(() -> (isClosing) - ? Flux.empty() - : Flux.fromIterable(this.sessions.values())) - .initialDelay(keepAliveInterval) - .interval(keepAliveInterval) - .build(); - - this.keepAliveScheduler.start(); - } } @Override - public List protocolVersions() { - return List.of(ProtocolVersions.MCP_2024_11_05, - ProtocolVersions.MCP_2025_03_26, - ProtocolVersions.MCP_2025_06_18); + protected void initKeepAliveScheduler(Duration keepAliveInterval) { + this.keepAliveScheduler = KeepAliveScheduler.builder(() -> this.isClosing + ? Flux.empty() + : Flux.fromIterable(this.sessions.values())) + .initialDelay(keepAliveInterval) + .interval(keepAliveInterval) + .build(); + this.keepAliveScheduler.start(); } @Override - public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) { - this.sessionFactory = sessionFactory; + protected String getSessionId(McpStreamableServerSession session) { + return session.getId(); } - /** - * Broadcasts a notification to all connected clients through their SSE connections. - * If any errors occur during sending to a particular client, they are logged but - * don't prevent sending to other clients. - * - * @param method The method name for the notification - * @param params The parameters for the notification - * @return A Mono that completes when the broadcast attempt is finished - */ @Override - public Mono notifyClients(String method, Object params) { - if (this.sessions.isEmpty()) { - logger.debug("No active sessions to broadcast message."); - return Mono.empty(); - } - - logger.info("Attempting to broadcast message. [sessionCount={}]", this.sessions.size()); - - return Mono.fromRunnable(() -> { - this.sessions.values().parallelStream().forEach(session -> { - try { - session.sendNotification(method, params).block(); - } catch (Exception e) { - logger.error("Failed to send message to session. [sessionId={}, error={}]", - session.getId(), - e.getMessage(), - e); - } - }); - }); + protected Mono closeSession(McpStreamableServerSession session) { + return session.closeGracefully(); } - /** - * Initiates a graceful shutdown of the transport. - * - * @return A Mono that completes when all cleanup operations are finished - */ @Override - public Mono closeGracefully() { - return Mono.fromRunnable(() -> { - this.isClosing = true; - logger.info("Initiating graceful shutdown. [sessionCount={}]", this.sessions.size()); + protected Mono sendNotificationToSession(McpStreamableServerSession session, String method, Object params) { + return session.sendNotification(method, params); + } - this.sessions.values().parallelStream().forEach(session -> { - try { - session.closeGracefully().block(); - } catch (Exception e) { - logger.error("Failed to close session. [sessionId={}, error={}]", - session.getId(), - e.getMessage(), - e); - } - }); + @Override + public List protocolVersions() { + return List.of(ProtocolVersions.MCP_2024_11_05, + ProtocolVersions.MCP_2025_03_26, + ProtocolVersions.MCP_2025_06_18); + } - this.sessions.clear(); - logger.info("Graceful shutdown completed."); - }).then().doOnSuccess(v -> { - if (this.keepAliveScheduler != null) { - this.keepAliveScheduler.shutdown(); - } - }); + @Override + public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) { + this.sessionFactory = sessionFactory; } /** * Set up the listening SSE connections and message replay. * - * @param request The incoming server request - * @param response The HTTP response - * @return Return the HTTP response body {@link Entity} or a {@link Choir}{@code <}{@link TextEvent}{@code >} object + * @param request The incoming server request. + * @param response The HTTP response. + * @return Return the HTTP response body {@link Entity} or a {@link Choir}{@code <}{@link TextEvent}{@code >} + * object. */ @GetMapping(path = MESSAGE_ENDPOINT) public Object handleGet(HttpClassicServerRequest request, HttpClassicServerResponse response) { if (this.isClosing) { - response.statusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.statusCode()); - return Entity.createText(response, "Server is shutting down"); + return this.createShuttingDownResponse(response); } - Object headerError = validateGetAcceptHeaders(request, response); + Object headerError = this.validateGetAcceptHeaders(request, response); if (headerError != null) { return headerError; } // Get session ID and session - Object sessionError = validateRequestSessionId(request, response); + Object sessionError = this.validateRequestSessionId(request, response); if (sessionError != null) { return sessionError; } @@ -220,9 +147,17 @@ public Object handleGet(HttpClassicServerRequest request, HttpClassicServerRespo // Handle building SSE, and check if this is a replay request if (request.headers().contains(HttpHeaders.LAST_EVENT_ID)) { - handleReplaySseRequest(request, transportContext, sessionId, session, sessionTransport, emitter); + FitMcpStreamableServerTransportProvider.this.handleReplaySseRequest(request, + transportContext, + sessionId, + session, + sessionTransport, + emitter); } else { - handleEstablishSseRequest(sessionId, session, sessionTransport, emitter); + FitMcpStreamableServerTransportProvider.this.handleEstablishSseRequest(sessionId, + session, + sessionTransport, + emitter); } }); } catch (Exception e) { @@ -235,39 +170,38 @@ public Object handleGet(HttpClassicServerRequest request, HttpClassicServerRespo /** * Handles POST requests for incoming JSON-RPC messages from clients. * - * @param request The incoming server request containing the JSON-RPC message - * @param response The HTTP response - * @return Return the HTTP response body {@link Entity} or a {@link Choir}{@code <}{@link TextEvent}{@code >} object + * @param request The incoming server request containing the JSON-RPC message. + * @param response The HTTP response. + * @return Return the HTTP response body {@link Entity} or a {@link Choir}{@code <}{@link TextEvent}{@code >} + * object. */ @PostMapping(path = MESSAGE_ENDPOINT) public Object handlePost(HttpClassicServerRequest request, HttpClassicServerResponse response) { if (this.isClosing) { - response.statusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.statusCode()); - return Entity.createText(response, "Server is shutting down"); + return this.createShuttingDownResponse(response); } - Object headerError = validatePostAcceptHeaders(request, response); + Object headerError = this.validatePostAcceptHeaders(request, response); if (headerError != null) { return headerError; } + String requestBody = new String(request.entityBytes(), StandardCharsets.UTF_8); + McpSchema.JSONRPCMessage message = this.deserializeMessage(requestBody, response); + if (message == null) { + logger.error("[POST] Invalid message format. [requestBody={}]", requestBody); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR).message("Invalid message format.").build()); + } McpTransportContext transportContext = this.contextExtractor.extract(request); try { - String requestBody = new String(request.entityBytes(), StandardCharsets.UTF_8); - McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, requestBody); - // Handle JSONRPCMessage if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest && jsonrpcRequest.method() .equals(McpSchema.METHOD_INITIALIZE)) { logger.info("[POST] Handling initialize method. [requestBody={}]", requestBody); - return handleInitializeRequest(request, response, jsonrpcRequest); + return this.handleInitializeRequest(request, response, jsonrpcRequest); } else { - return handleJsonRpcMessage(message, request, requestBody, transportContext, response); + return this.handleJsonRpcMessage(message, request, requestBody, transportContext, response); } - } catch (IllegalArgumentException | IOException e) { - logger.error("[POST] Failed to deserialize message. [error={}]", e.getMessage(), e); - response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); - return Entity.createObject(response, - McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR).message("Invalid message format").build()); } catch (Exception e) { logger.error("[POST] Error handling message. [error={}]", e.getMessage(), e); response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); @@ -279,15 +213,14 @@ public Object handlePost(HttpClassicServerRequest request, HttpClassicServerResp /** * Handles DELETE requests for session deletion. * - * @param request The incoming server request - * @param response The HTTP response + * @param request The incoming server request. + * @param response The HTTP response. * @return Return HTTP response body {@link Entity}. */ @DeleteMapping(path = MESSAGE_ENDPOINT) public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerResponse response) { if (this.isClosing) { - response.statusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.statusCode()); - return Entity.createText(response, "Server is shutting down"); + return this.createShuttingDownResponse(response); } if (this.disallowDelete) { response.statusCode(HttpResponseStatus.METHOD_NOT_ALLOWED.statusCode()); @@ -295,7 +228,7 @@ public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerRe } // Get session ID and session - Object sessionError = validateRequestSessionId(request, response); + Object sessionError = this.validateRequestSessionId(request, response); if (sessionError != null) { return sessionError; } @@ -321,15 +254,15 @@ public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerRe * Validates the Accept header for SSE (Server-Sent Events) connections in GET requests. * Checks if the request contains the required {@code text/event-stream} content type. * - * @param request The incoming {@link HttpClassicServerRequest} - * @param response The {@link HttpClassicServerResponse} to set status code if validation fails - * @return An error {@link Entity} if validation fails, {@code null} if validation succeeds + * @param request The incoming {@link HttpClassicServerRequest}. + * @param response The {@link HttpClassicServerResponse} to set status code if validation fails. + * @return An error {@link Entity} if validation fails, {@code null} if validation succeeds. */ private Object validateGetAcceptHeaders(HttpClassicServerRequest request, HttpClassicServerResponse response) { String acceptHeaders = request.headers().first(MessageHeaderNames.ACCEPT).orElse(""); if (!acceptHeaders.contains(MimeType.TEXT_EVENT_STREAM.value())) { response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); - return Entity.createText(response, "Invalid Accept header. Expected TEXT_EVENT_STREAM"); + return Entity.createText(response, "Invalid Accept header. Expected TEXT_EVENT_STREAM."); } return null; } @@ -339,9 +272,9 @@ private Object validateGetAcceptHeaders(HttpClassicServerRequest request, HttpCl * Checks if the request contains both {@code text/event-stream} and {@code application/json} content types, * as POST requests may return either SSE streams or JSON responses. * - * @param request The incoming {@link HttpClassicServerRequest} - * @param response The {@link HttpClassicServerResponse} to set status code if validation fails - * @return An error {@link Entity} with {@link McpError} if validation fails, {@code null} if validation succeeds + * @param request The incoming {@link HttpClassicServerRequest}. + * @param response The {@link HttpClassicServerResponse} to set status code if validation fails. + * @return An error {@link Entity} with {@link McpError} if validation fails, {@code null} if validation succeeds. */ private Object validatePostAcceptHeaders(HttpClassicServerRequest request, HttpClassicServerResponse response) { String acceptHeaders = request.headers().first(MessageHeaderNames.ACCEPT).orElse(""); @@ -350,7 +283,7 @@ private Object validatePostAcceptHeaders(HttpClassicServerRequest request, HttpC response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST) - .message("Invalid Accept headers. Expected TEXT_EVENT_STREAM and APPLICATION_JSON") + .message("Invalid Accept headers. Expected TEXT_EVENT_STREAM and APPLICATION_JSON.") .build()); } return null; @@ -361,25 +294,18 @@ private Object validatePostAcceptHeaders(HttpClassicServerRequest request, HttpC * This method checks both the presence of the {@code mcp-session-id} header and * the existence of the corresponding session in the active sessions map. * - * @param request The incoming {@link HttpClassicServerRequest} containing the session ID header - * @param response The {@link HttpClassicServerResponse} to set status code if validation fails + * @param request The incoming {@link HttpClassicServerRequest} containing the session ID header. + * @param response The {@link HttpClassicServerResponse} to set status code if validation fails. * @return An error {@link Entity} if validation fails (either missing session ID or session not found), - * {@code null} if validation succeeds + * {@code null} if validation succeeds. */ private Object validateRequestSessionId(HttpClassicServerRequest request, HttpClassicServerResponse response) { if (!request.headers().contains(HttpHeaders.MCP_SESSION_ID)) { response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); - return Entity.createText(response, "Session ID required in mcp-session-id header"); + return Entity.createText(response, "Session ID required in mcp-session-id header."); } String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); - if (this.sessions.get(sessionId) == null) { - response.statusCode(HttpResponseStatus.NOT_FOUND.statusCode()); - return Entity.createObject(response, - McpError.builder(McpSchema.ErrorCodes.INVALID_PARAMS) - .message("Session not found: " + sessionId) - .build()); - } - return null; + return this.validateSessionExists(sessionId, response); } /** @@ -387,12 +313,12 @@ private Object validateRequestSessionId(HttpClassicServerRequest request, HttpCl * Replays previously sent messages starting from the last received event ID, * allowing clients to recover missed messages after reconnection. * - * @param request The incoming {@link HttpClassicServerRequest} containing the {@code Last-Event-ID} header - * @param transportContext The {@link McpTransportContext} for request context propagation - * @param sessionId The MCP session identifier - * @param session The {@link McpStreamableServerSession} to replay messages from - * @param sessionTransport The {@link FitStreamableMcpSessionTransport} for sending replayed messages - * @param emitter The SSE {@link Emitter} to send {@link TextEvent} to the client + * @param request The incoming {@link HttpClassicServerRequest} containing the {@code Last-Event-ID} header. + * @param transportContext The {@link McpTransportContext} for request context propagation. + * @param sessionId The MCP session identifier. + * @param session The {@link McpStreamableServerSession} to replay messages from. + * @param sessionTransport The {@link FitStreamableMcpSessionTransport} for sending replayed messages. + * @param emitter The SSE {@link Emitter} to send {@link TextEvent} to the client. */ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTransportContext transportContext, String sessionId, McpStreamableServerSession session, FitStreamableMcpSessionTransport sessionTransport, @@ -425,10 +351,10 @@ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTranspo * Creates a persistent connection that allows the server to push messages to the client * as they become available. The stream remains open until explicitly closed or an error occurs. * - * @param sessionId The MCP session identifier - * @param session The {@link McpStreamableServerSession} to establish the listening stream for - * @param sessionTransport The {@link FitStreamableMcpSessionTransport} for bidirectional communication - * @param emitter The SSE {@link Emitter} to send {@link TextEvent} to the client + * @param sessionId The MCP session identifier. + * @param session The {@link McpStreamableServerSession} to establish the listening stream for. + * @param sessionTransport The {@link FitStreamableMcpSessionTransport} for bidirectional communication. + * @param emitter The SSE {@link Emitter} to send {@link TextEvent} to the client. */ private void handleEstablishSseRequest(String sessionId, McpStreamableServerSession session, FitStreamableMcpSessionTransport sessionTransport, Emitter emitter) { @@ -444,11 +370,13 @@ public void onEmittedData(TextEvent data) { @Override public void onCompleted() { - logger.info("[SSE] Completed SSE emitting. [sessionId={}]", sessionId); + FitMcpStreamableServerTransportProvider.logger.info("[SSE] Completed SSE emitting. [sessionId={}]", + sessionId); try { listeningStream.close(); } catch (Exception e) { - logger.warn("[SSE] Error closing listeningStream on complete. [sessionId={}, error={}]", + FitMcpStreamableServerTransportProvider.logger.warn( + "[SSE] Error closing listeningStream on complete. [sessionId={}, error={}]", sessionId, e.getMessage()); } @@ -456,11 +384,14 @@ public void onCompleted() { @Override public void onFailed(Exception cause) { - logger.warn("[SSE] SSE failed. [sessionId={}, cause={}]", sessionId, cause.getMessage()); + FitMcpStreamableServerTransportProvider.logger.warn("[SSE] SSE failed. [sessionId={}, cause={}]", + sessionId, + cause.getMessage()); try { listeningStream.close(); } catch (Exception e) { - logger.warn("[SSE] Error closing listeningStream on failure. [sessionId={}, error={}]", + FitMcpStreamableServerTransportProvider.logger.warn( + "[SSE] Error closing listeningStream on failure. [sessionId={}, error={}]", sessionId, e.getMessage()); } @@ -473,18 +404,18 @@ public void onFailed(Exception cause) { * Creates a new {@link McpStreamableServerSession} and returns the initialization result * with the assigned session ID in the response headers. * - * @param request The incoming {@link HttpClassicServerRequest} - * @param response The {@link HttpClassicServerResponse} to set session ID and initialization result + * @param request The incoming {@link HttpClassicServerRequest}. + * @param response The {@link HttpClassicServerResponse} to set session ID and initialization result. * @param jsonrpcRequest The {@link McpSchema.JSONRPCRequest} containing {@link McpSchema.InitializeRequest} - * parameters + * parameters. * @return An {@link Entity} containing the {@link McpSchema.JSONRPCResponse} with * {@link McpSchema.InitializeResult} - * on success, or an error {@link Entity} with {@link McpError} on failure + * on success, or an error {@link Entity} with {@link McpError} on failure. */ private Object handleInitializeRequest(HttpClassicServerRequest request, HttpClassicServerResponse response, McpSchema.JSONRPCRequest jsonrpcRequest) { McpSchema.InitializeRequest initializeRequest = - jsonMapper.convertValue(jsonrpcRequest.params(), new TypeRef() {}); + this.jsonMapper.convertValue(jsonrpcRequest.params(), new TypeRef() {}); McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory.startSession(initializeRequest); this.sessions.put(init.session().getId(), init.session()); @@ -509,17 +440,17 @@ private Object handleInitializeRequest(HttpClassicServerRequest request, HttpCla * Handles different types of JSON-RPC messages (Response, Notification, Request). * Routes the message to the appropriate handler method based on its type. * - * @param message The {@link McpSchema.JSONRPCMessage} to handle - * @param request The incoming {@link HttpClassicServerRequest} - * @param requestBody The {@link String} of request body. - * @param transportContext The {@link McpTransportContext} for request context propagation - * @param response The {@link HttpClassicServerResponse} to set status code and return data - * @return An {@link Entity} or {@link Choir} containing the response data, or {@code null} for accepted messages + * @param message The {@link McpSchema.JSONRPCMessage} to handle. + * @param request The incoming {@link HttpClassicServerRequest}. + * @param requestBody The {@link String} of request body.. + * @param transportContext The {@link McpTransportContext} for request context propagation. + * @param response The {@link HttpClassicServerResponse} to set status code and return data. + * @return An {@link Entity} or {@link Choir} containing the response data, or {@code null} for accepted messages. */ private Object handleJsonRpcMessage(McpSchema.JSONRPCMessage message, HttpClassicServerRequest request, String requestBody, McpTransportContext transportContext, HttpClassicServerResponse response) { // Get session ID and session - Object sessionError = validateRequestSessionId(request, response); + Object sessionError = this.validateRequestSessionId(request, response); if (sessionError != null) { return sessionError; } @@ -528,17 +459,17 @@ private Object handleJsonRpcMessage(McpSchema.JSONRPCMessage message, HttpClassi logger.info("[POST] Receiving message from session. [sessionId={}, requestBody={}]", sessionId, requestBody); if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) { - handleJsonRpcResponse(jsonrpcResponse, session, transportContext, response); + this.handleJsonRpcResponse(jsonrpcResponse, session, transportContext, response); return null; } else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) { - handleJsonRpcNotification(jsonrpcNotification, session, transportContext, response); + this.handleJsonRpcNotification(jsonrpcNotification, session, transportContext, response); return null; } else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) { - return handleJsonRpcRequest(jsonrpcRequest, session, sessionId, transportContext, response); + return this.handleJsonRpcRequest(jsonrpcRequest, session, sessionId, transportContext, response); } else { response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); return Entity.createObject(response, - McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message("Unknown message type").build()); + McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message("Unknown message type.").build()); } } @@ -547,10 +478,10 @@ private Object handleJsonRpcMessage(McpSchema.JSONRPCMessage message, HttpClassi * Accepts the response and delivers it to the corresponding pending request within the session. * Sets the HTTP response status to {@code 202 Accepted} to acknowledge receipt. * - * @param jsonrpcResponse The {@link McpSchema.JSONRPCResponse} from the client - * @param session The {@link McpStreamableServerSession} to accept the response - * @param transportContext The {@link McpTransportContext} for request context propagation - * @param response The {@link HttpClassicServerResponse} to set the status code + * @param jsonrpcResponse The {@link McpSchema.JSONRPCResponse} from the client. + * @param session The {@link McpStreamableServerSession} to accept the response. + * @param transportContext The {@link McpTransportContext} for request context propagation. + * @param response The {@link HttpClassicServerResponse} to set the status code. */ private void handleJsonRpcResponse(McpSchema.JSONRPCResponse jsonrpcResponse, McpStreamableServerSession session, McpTransportContext transportContext, HttpClassicServerResponse response) { @@ -563,10 +494,10 @@ private void handleJsonRpcResponse(McpSchema.JSONRPCResponse jsonrpcResponse, Mc * Notifications are one-way messages that do not require a response. * Sets the HTTP response status to {@code 202 Accepted} to acknowledge receipt. * - * @param jsonrpcNotification The {@link McpSchema.JSONRPCNotification} from the client - * @param session The {@link McpStreamableServerSession} to accept the notification - * @param transportContext The {@link McpTransportContext} for request context propagation - * @param response The {@link HttpClassicServerResponse} to set the status code + * @param jsonrpcNotification The {@link McpSchema.JSONRPCNotification} from the client. + * @param session The {@link McpStreamableServerSession} to accept the notification. + * @param transportContext The {@link McpTransportContext} for request context propagation. + * @param response The {@link HttpClassicServerResponse} to set the status code. */ private void handleJsonRpcNotification(McpSchema.JSONRPCNotification jsonrpcNotification, McpStreamableServerSession session, McpTransportContext transportContext, @@ -582,12 +513,12 @@ private void handleJsonRpcNotification(McpSchema.JSONRPCNotification jsonrpcNoti * Creates an SSE stream to send the response and any subsequent messages back to the client. * This allows for real-time, bidirectional communication during request processing. * - * @param jsonrpcRequest The {@link McpSchema.JSONRPCRequest} from the client - * @param session The {@link McpStreamableServerSession} to process the request - * @param sessionId The MCP session identifier for logging and tracking - * @param transportContext The {@link McpTransportContext} for request context propagation - * @param response The {@link HttpClassicServerResponse} for the SSE stream - * @return A {@link Choir} containing {@link TextEvent} for SSE streaming of the response + * @param jsonrpcRequest The {@link McpSchema.JSONRPCRequest} from the client. + * @param session The {@link McpStreamableServerSession} to process the request. + * @param sessionId The MCP session identifier for logging and tracking. + * @param transportContext The {@link McpTransportContext} for request context propagation. + * @param response The {@link HttpClassicServerResponse} for the SSE stream. + * @return A {@link Choir} containing {@link TextEvent} for SSE streaming of the response. */ private Object handleJsonRpcRequest(McpSchema.JSONRPCRequest jsonrpcRequest, McpStreamableServerSession session, String sessionId, McpTransportContext transportContext, HttpClassicServerResponse response) { @@ -600,12 +531,15 @@ public void onEmittedData(TextEvent data) { @Override public void onCompleted() { - logger.info("[SSE] Completed SSE emitting. [sessionId={}]", sessionId); + FitMcpStreamableServerTransportProvider.logger.info("[SSE] Completed SSE emitting. [sessionId={}]", + sessionId); } @Override public void onFailed(Exception e) { - logger.warn("[SSE] SSE failed. [sessionId={}, cause={}]", sessionId, e.getMessage()); + FitMcpStreamableServerTransportProvider.logger.warn("[SSE] SSE failed. [sessionId={}, cause={}]", + sessionId, + e.getMessage()); } }); @@ -632,148 +566,34 @@ public void onFailed(Exception e) { * underlying SSE builder to prevent race conditions when multiple threads attempt to * send messages concurrently. */ - private class FitStreamableMcpSessionTransport implements McpStreamableServerTransport { - private final String sessionId; - private final Emitter emitter; - private final HttpClassicServerResponse response; - - private final ReentrantLock lock = new ReentrantLock(); - - private volatile boolean closed = false; - + private class FitStreamableMcpSessionTransport extends AbstractFitMcpSessionTransport + implements McpStreamableServerTransport { /** * Creates a new session transport with the specified ID and SSE builder. * - * @param sessionId The unique identifier for this session - * @param emitter The emitter for sending events - * @param response The HTTP response for checking connection status + * @param sessionId The unique identifier for this session. + * @param emitter The emitter for sending events. + * @param response The HTTP response for checking connection status. */ FitStreamableMcpSessionTransport(String sessionId, Emitter emitter, HttpClassicServerResponse response) { - this.sessionId = sessionId; - this.emitter = emitter; - this.response = response; - logger.info("[SSE] Building SSE emitter. [sessionId={}]", sessionId); + super(sessionId, emitter, response); } - /** - * Sends a JSON-RPC message to the client through the SSE connection. - * - * @param message The JSON-RPC message to send - * @return A Mono that completes when the message has been sent - */ @Override public Mono sendMessage(McpSchema.JSONRPCMessage message) { - return sendMessage(message, null); + return this.doSendMessage(message, null); } - /** - * Sends a JSON-RPC message to the client through the SSE connection with a - * specific message ID. - * - * @param message The JSON-RPC message to send - * @param messageId The message ID for SSE event identification - * @return A Mono that completes when the message has been sent - */ @Override public Mono sendMessage(McpSchema.JSONRPCMessage message, String messageId) { - return Mono.fromRunnable(() -> { - if (this.closed) { - logger.info("[SSE] Attempted to send message to closed session. [sessionId={}]", this.sessionId); - return; - } - - this.lock.lock(); - try { - if (this.closed) { - logger.info("[SSE] Session was closed during message send attempt. [sessionId={}]", - this.sessionId); - return; - } - - // Check if connection is still active before sending - if (!this.response.isActive()) { - logger.warn("[SSE] Connection inactive detected while sending message. [sessionId={}]", - this.sessionId); - this.close(); - return; - } - - String jsonText = jsonMapper.writeValueAsString(message); - TextEvent textEvent = - TextEvent.custom().id(this.sessionId).event(Event.MESSAGE.code()).data(jsonText).build(); - this.emitter.emit(textEvent); - - logger.info("[SSE] Sending message to session. [sessionId={}, jsonText={}]", - this.sessionId, - jsonText); - } catch (Exception e) { - logger.error("[SSE] Failed to send message to session. [sessionId={}, error={}]", - this.sessionId, - e.getMessage(), - e); - try { - this.emitter.fail(e); - } catch (Exception errorException) { - logger.error("[SSE] Failed to send error to SSE builder. [sessionId={}, error={}]", - this.sessionId, - errorException.getMessage(), - errorException); - } - } finally { - this.lock.unlock(); - } - }); + return this.doSendMessage(message, messageId); } - /** - * Converts data from one type to another using the configured jsonMapper. - * - * @param data The source data object to convert - * @param typeRef The target type reference - * @param The target type - * @return The converted object of type T - */ - @Override - public T unmarshalFrom(Object data, TypeRef typeRef) { - return jsonMapper.convertValue(data, typeRef); - } - - /** - * Initiates a graceful shutdown of the transport. - * - * @return A Mono that completes when the shutdown is complete - */ - @Override - public Mono closeGracefully() { - return Mono.fromRunnable(FitStreamableMcpSessionTransport.this::close); - } - - /** - * Closes the transport immediately. - */ @Override public void close() { - this.lock.lock(); - try { - if (this.closed) { - logger.info("[SSE] Session transport already closed. [sessionId={}]", this.sessionId); - return; - } - - this.closed = true; - - this.emitter.complete(); - logger.info("[SSE] Closed SSE builder successfully. [sessionId={}]", sessionId); - } catch (Exception e) { - logger.warn("[SSE] Failed to complete SSE builder. [sessionId={}, error={}]", - sessionId, - e.getMessage()); - } finally { - this.lock.unlock(); - } + this.doClose(); } - } public static Builder builder() { @@ -794,11 +614,11 @@ public static class Builder { * Sets the jsonMapper to use for JSON serialization/deserialization of MCP messages. * * @param jsonMapper The jsonMapper instance. Must not be null. - * @return this builder instance - * @throws IllegalArgumentException if jsonMapper is null + * @return This builder instance. + * @throws IllegalArgumentException if jsonMapper is null. */ public Builder jsonMapper(McpJsonMapper jsonMapper) { - Validation.notNull(jsonMapper, "jsonMapper must not be null"); + Validation.notNull(jsonMapper, "Json mapper must not be null."); this.jsonMapper = jsonMapper; return this; } @@ -806,8 +626,8 @@ public Builder jsonMapper(McpJsonMapper jsonMapper) { /** * Sets whether to disallow DELETE requests on the endpoint. * - * @param disallowDelete true to disallow DELETE requests, false otherwise - * @return this builder instance + * @param disallowDelete true to disallow DELETE requests, false otherwise. + * @return This builder instance. */ public Builder disallowDelete(boolean disallowDelete) { this.disallowDelete = disallowDelete; @@ -822,11 +642,11 @@ public Builder disallowDelete(boolean disallowDelete) { * * @param contextExtractor The contextExtractor to fill in a * {@link McpTransportContext}. - * @return this builder instance - * @throws IllegalArgumentException if contextExtractor is null + * @return This builder instance. + * @throws IllegalArgumentException if contextExtractor is null. */ public Builder contextExtractor(McpTransportContextExtractor contextExtractor) { - Validation.notNull(contextExtractor, "contextExtractor must not be null"); + Validation.notNull(contextExtractor, "Context extractor must not be null."); this.contextExtractor = contextExtractor; return this; } @@ -836,8 +656,8 @@ public Builder contextExtractor(McpTransportContextExtractor new DefaultMcpStreamableServer(null, mcpSyncServer)); + () -> new FitMcpServer(null, mcpSyncServer, toolChangedObserverRegistry)); assertThat(exception).isNotNull().hasMessage("The tool execute service cannot be null."); } } - @Nested - @DisplayName("registerToolsChangedObserver and Notification Tests") - class GivenRegisterAndNotify { - @Test - @DisplayName("Should notify observers when tools are added or removed") - void notifyObserversOnToolAddOrRemove() { - DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); - McpServer.ToolsChangedObserver observer = mock(McpServer.ToolsChangedObserver.class); - server.registerToolsChangedObserver(observer); - - Map schema = MapBuilder.get() - .put("type", "object") - .put("properties", Collections.emptyMap()) - .put("required", Collections.emptyList()) - .build(); - server.onToolAdded("tool1", "description1", schema); - verify(observer, times(1)).onToolsChanged(); - - server.onToolRemoved("tool1"); - verify(observer, times(2)).onToolsChanged(); - } - } - @Nested @DisplayName("onToolAdded Method Tests") class GivenOnToolAdded { @Test @DisplayName("Should add tool successfully with valid parameters") void addToolSuccessfully() { - DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); + FitMcpServer server = new FitMcpServer(toolExecuteService, mcpSyncServer, toolChangedObserverRegistry); String name = "tool1"; String description = "description1"; Map schema = MapBuilder.get() @@ -110,7 +90,7 @@ void addToolSuccessfully() { @Test @DisplayName("Should ignore invalid parameters and not add any tool") void ignoreInvalidParameters() { - DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); + FitMcpServer server = new FitMcpServer(toolExecuteService, mcpSyncServer, toolChangedObserverRegistry); Map schema = MapBuilder.get() .put("type", "object") .put("properties", Collections.emptyMap()) @@ -134,7 +114,7 @@ class GivenOnToolRemoved { @Test @DisplayName("Should remove an added tool correctly") void removeToolSuccessfully() { - DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); + FitMcpServer server = new FitMcpServer(toolExecuteService, mcpSyncServer, toolChangedObserverRegistry); Map schema = MapBuilder.get() .put("type", "object") .put("properties", Collections.emptyMap()) @@ -150,7 +130,7 @@ void removeToolSuccessfully() { @Test @DisplayName("Should ignore removal if name is blank") void ignoreBlankName() { - DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); + FitMcpServer server = new FitMcpServer(toolExecuteService, mcpSyncServer, toolChangedObserverRegistry); Map schema = MapBuilder.get() .put("type", "object") .put("properties", Collections.emptyMap()) diff --git a/framework/fel/java/plugins/tool-repository-simple/src/main/java/modelengine/fel/tool/support/SimpleToolRepository.java b/framework/fel/java/plugins/tool-repository-simple/src/main/java/modelengine/fel/tool/support/SimpleToolRepository.java index ae9bd7739..0385805ff 100644 --- a/framework/fel/java/plugins/tool-repository-simple/src/main/java/modelengine/fel/tool/support/SimpleToolRepository.java +++ b/framework/fel/java/plugins/tool-repository-simple/src/main/java/modelengine/fel/tool/support/SimpleToolRepository.java @@ -7,17 +7,18 @@ package modelengine.fel.tool.support; import static modelengine.fitframework.inspection.Validation.notBlank; -import static modelengine.fitframework.inspection.Validation.notNull; import static modelengine.fitframework.util.ObjectUtils.cast; import modelengine.fel.core.tool.ToolInfo; import modelengine.fel.tool.ToolInfoEntity; import modelengine.fel.tool.service.ToolChangedObserver; +import modelengine.fel.tool.service.ToolChangedObserverRegistry; import modelengine.fel.tool.service.ToolRepository; import modelengine.fitframework.annotation.Component; import modelengine.fitframework.log.Logger; import modelengine.fitframework.util.StringUtils; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -31,21 +32,24 @@ * @since 2024-08-15 */ @Component -public class SimpleToolRepository implements ToolRepository { +public class SimpleToolRepository implements ToolRepository, ToolChangedObserverRegistry { private static final Logger log = Logger.get(SimpleToolRepository.class); - private final ToolChangedObserver toolChangedObserver; private final Map toolCache = new ConcurrentHashMap<>(); + private final List toolChangedObservers = new ArrayList<>(); - /** - * Constructs a new instance of the SimpleToolRepository class. - * - * @param toolChangedObserver The observer to be notified when tools are added or removed, as a - * {@link ToolChangedObserver}. - * @throws IllegalStateException If {@code toolChangedObserver} is null. - */ - public SimpleToolRepository(ToolChangedObserver toolChangedObserver) { - this.toolChangedObserver = notNull(toolChangedObserver, "The tool changed observer cannot be null."); + @Override + public void register(ToolChangedObserver observer) { + if (observer != null) { + this.toolChangedObservers.add(observer); + } + } + + @Override + public void unregister(ToolChangedObserver observer) { + if (observer != null) { + this.toolChangedObservers.remove(observer); + } } @Override @@ -57,7 +61,17 @@ public void addTool(ToolInfoEntity tool) { this.toolCache.put(uniqueName, tool); log.info("Register tool[uniqueName={}] success.", uniqueName); Map parameters = cast(tool.schema().get("parameters")); - this.toolChangedObserver.onToolAdded(uniqueName, tool.description(), parameters); + this.toolChangedObservers.forEach(observer -> { + try { + observer.onToolAdded(uniqueName, tool.description(), parameters); + } catch (Exception e) { + log.error("Failed to notify observer of tool added. [observer={}, uniqueName={}, error={}]", + observer.getClass().getName(), + uniqueName, + e.getMessage(), + e); + } + }); } @Override @@ -68,7 +82,17 @@ public void deleteTool(String namespace, String toolName) { String uniqueName = ToolInfo.identify(namespace, toolName); this.toolCache.remove(uniqueName); log.info("Unregister tool[uniqueName={}] success.", uniqueName); - this.toolChangedObserver.onToolRemoved(uniqueName); + this.toolChangedObservers.forEach(observer -> { + try { + observer.onToolRemoved(uniqueName); + } catch (Exception e) { + log.error("Failed to notify observer of tool removed. [observer={}, uniqueName={}, error={}]", + observer.getClass().getName(), + uniqueName, + e.getMessage(), + e); + } + }); } @Override diff --git a/framework/fel/java/services/tool-service/src/main/java/modelengine/fel/tool/service/ToolChangedObserverRegistry.java b/framework/fel/java/services/tool-service/src/main/java/modelengine/fel/tool/service/ToolChangedObserverRegistry.java new file mode 100644 index 000000000..0e1211d26 --- /dev/null +++ b/framework/fel/java/services/tool-service/src/main/java/modelengine/fel/tool/service/ToolChangedObserverRegistry.java @@ -0,0 +1,29 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.service; + +/** + * 工具变更观察者注册表接口。 + * + * @author 黄可欣 + * @since 2025-11-20 + */ +public interface ToolChangedObserverRegistry { + /** + * 注册工具变更观察者。 + * + * @param observer 待注册的工具变更观察者。 + */ + void register(ToolChangedObserver observer); + + /** + * 注销工具变更观察者。 + * + * @param observer 需要注销的工具变更观察者。 + */ + void unregister(ToolChangedObserver observer); +}