Skip to content

Commit e6de1a5

Browse files
committed
修复Get结束Emitter不close的问题
1 parent 65e0a0f commit e6de1a5

4 files changed

Lines changed: 44 additions & 8 deletions

File tree

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/DefaultMcpServer.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,19 @@
66

77
package modelengine.fel.tool.mcp.server;
88

9-
import com.fasterxml.jackson.databind.ObjectMapper;
109
import io.modelcontextprotocol.server.McpServerFeatures;
1110
import io.modelcontextprotocol.server.McpSyncServerExchange;
1211
import io.modelcontextprotocol.spec.McpSchema;
1312
import modelengine.fel.tool.mcp.entity.ServerSchema;
1413
import modelengine.fel.tool.mcp.entity.Tool;
1514
import modelengine.fel.tool.service.ToolChangedObserver;
1615
import modelengine.fel.tool.service.ToolExecuteService;
17-
import modelengine.fitframework.annotation.Bean;
1816
import modelengine.fitframework.annotation.Component;
1917
import io.modelcontextprotocol.server.McpSyncServer;
2018
import modelengine.fitframework.log.Logger;
2119
import modelengine.fitframework.util.MapUtils;
2220
import modelengine.fitframework.util.StringUtils;
2321

24-
import java.time.Duration;
2522
import java.util.ArrayList;
2623
import java.util.List;
2724
import java.util.Map;
@@ -34,7 +31,8 @@
3431
import static modelengine.fitframework.inspection.Validation.notNull;
3532

3633
/**
37-
* Mcp Server implementing interface {@link McpServer}, {@link ToolChangedObserver} with MCP SDK.
34+
* Mcp Server implementing interface {@link McpServer}, {@link ToolChangedObserver}
35+
* with MCP Server Bean {@link McpSyncServer}.
3836
*
3937
* @author 黄可欣
4038
* @since 2025-09-30

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/DefaultMcpStreamableServerTransportProvider.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,14 +278,17 @@ public void onEmittedData(TextEvent data) {
278278
@Override
279279
public void onCompleted() {
280280
logger.info("[SSE] Completed SSE emitting for session: {}", sessionId);
281-
listeningStream.close();
282281
}
283282

284283
@Override
285284
public void onFailed(Exception cause) {
286-
// No action needed
285+
logger.warn("[SSE] SSE failed for session: {}, cause: {}", sessionId, cause.getMessage());
287286
}
288287
});
288+
289+
// Add connection monitoring to detect client disconnection
290+
// This is a workaround to ensure listeningStream.close() is called when client disconnects
291+
startConnectionMonitoring(sessionId, listeningStream, response);
289292
}
290293
});
291294
}
@@ -296,6 +299,43 @@ public void onFailed(Exception cause) {
296299
}
297300
}
298301

302+
/**
303+
* Starts connection monitoring to detect client disconnection and ensure proper cleanup.
304+
* This is a workaround to ensure listeningStream.close() is called when client disconnects.
305+
*
306+
* @param sessionId The session ID
307+
* @param listeningStream The listening stream to close when connection is lost
308+
* @param response The HTTP response to check for connection status
309+
*/
310+
private void startConnectionMonitoring(String sessionId,
311+
McpStreamableServerSession.McpStreamableServerSessionStream listeningStream,
312+
HttpClassicServerResponse response) {
313+
// Use a separate thread to periodically check connection status
314+
Thread monitoringThread = new Thread(() -> {
315+
try {
316+
while (!Thread.currentThread().isInterrupted()) {
317+
Thread.sleep(1000); // Check every second
318+
319+
// Check if the HTTP response is still active
320+
if (!response.isActive()) {
321+
logger.info("[SSE] Connection lost for session, completing emitter to trigger cleanup");
322+
listeningStream.close();
323+
break;
324+
}
325+
}
326+
} catch (InterruptedException e) {
327+
logger.debug("[SSE] Connection monitoring interrupted for session");
328+
Thread.currentThread().interrupt();
329+
} catch (Exception e) {
330+
logger.warn("[SSE] Error in connection monitoring: {}", e.getMessage());
331+
}
332+
});
333+
334+
monitoringThread.setDaemon(true);
335+
monitoringThread.setName("sse-connection-monitor-" + sessionId);
336+
monitoringThread.start();
337+
}
338+
299339
/**
300340
* Handles POST requests for incoming JSON-RPC messages from clients.
301341
*

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import modelengine.fel.tool.mcp.entity.Tool;
1313

1414
import java.util.List;
15-
import java.util.Map;
1615
import java.util.function.BiFunction;
1716

1817
/**

framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/support/DefaultMcpServerTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import static org.mockito.Mockito.verify;
1414

1515
import io.modelcontextprotocol.server.McpSyncServer;
16-
import io.modelcontextprotocol.spec.McpSchema;
1716
import modelengine.fel.tool.mcp.entity.ServerSchema;
1817
import modelengine.fel.tool.mcp.entity.Tool;
1918
import modelengine.fel.tool.mcp.server.DefaultMcpServer;

0 commit comments

Comments
 (0)