Skip to content

Commit 837336b

Browse files
jfallowsclaude
andauthored
fix(binding-mcp): keep proxied tool-call result when upstream sends a resumable flush (#1823)
Real MCP SDK upstreams (StreamableHTTP transport) emit a leading empty-data SSE event on every response; Zilla's client relays it north as a resumable flush. For a unary tools/call the north response is plain application/json, so McpServer.onAppFlush hit the !sseUpgrade branch and called cleanupApp, tearing down the reply stream before the result arrived and dropping it (the client saw the connection close mid-response with no result). Ignore a KIND_RESUMABLE flush on a non-SSE response instead of cleaning up, so the result still flows. Adds a tools.call.resumable IT (McpServerIT against the engine, plus an ApplicationIT peer) covering a resumable flush delivered before the result on a non-SSE response. Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent f601d56 commit 837336b

5 files changed

Lines changed: 200 additions & 17 deletions

File tree

  • runtime/binding-mcp/src
  • specs/binding-mcp.spec/src
    • main/scripts/io/aklivity/zilla/specs/binding/mcp/streams/application/tools.call.resumable
    • test/java/io/aklivity/zilla/specs/binding/mcp/streams/application

runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpServerFactory.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4714,32 +4714,32 @@ private void onAppFlush(
47144714

47154715
assert replyAck <= replySeq;
47164716

4717+
final McpFlushExFW flushEx = extension.get(mcpFlushExRO::tryWrap);
4718+
47174719
if (replySeq > replyAck + decodeMax)
47184720
{
47194721
cleanupApp(traceId, authorization);
47204722
}
47214723
else if (!server.sseUpgrade)
47224724
{
4723-
cleanupApp(traceId, authorization);
4725+
if (flushEx == null || flushEx.kind() != McpFlushExFW.KIND_RESUMABLE)
4726+
{
4727+
cleanupApp(traceId, authorization);
4728+
}
47244729
}
4725-
else if (extension.sizeof() > 0)
4730+
else if (flushEx != null)
47264731
{
4727-
final McpFlushExFW flushEx =
4728-
mcpFlushExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit());
4729-
if (flushEx != null)
4732+
if (flushEx.kind() == McpFlushExFW.KIND_ELICIT_COMPLETE)
47304733
{
4731-
if (flushEx.kind() == McpFlushExFW.KIND_ELICIT_COMPLETE)
4732-
{
4733-
onAppFlushElicitComplete(traceId, authorization, flushEx.elicitComplete());
4734-
}
4735-
else if (sse != null)
4736-
{
4737-
encodeRequestEventViaEventStream(traceId, authorization, flushEx);
4738-
}
4739-
else
4740-
{
4741-
server.doEncodeRequestEvent(traceId, authorization, requestId, flushEx);
4742-
}
4734+
onAppFlushElicitComplete(traceId, authorization, flushEx.elicitComplete());
4735+
}
4736+
else if (sse != null)
4737+
{
4738+
encodeRequestEventViaEventStream(traceId, authorization, flushEx);
4739+
}
4740+
else
4741+
{
4742+
server.doEncodeRequestEvent(traceId, authorization, requestId, flushEx);
47434743
}
47444744
}
47454745
}

runtime/binding-mcp/src/test/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpServerIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,16 @@ public void shouldCallToolWithTimeout() throws Exception
306306

307307
@Test
308308
@Configuration("server.timeout.yaml")
309+
@Specification({
310+
"${net}/tools.call/client",
311+
"${app}/tools.call.resumable/server"})
312+
public void shouldCallToolWithUpstreamResumableFlush() throws Exception
313+
{
314+
k3po.finish();
315+
}
316+
317+
@Test
318+
@Configuration("server.yaml")
309319
@Specification({
310320
"${net}/tools.call.elicit.completed/client",
311321
"${app}/tools.call.elicit.completed/server"})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc
3+
#
4+
# Licensed under the Aklivity Community License (the "License"); you may not use
5+
# this file except in compliance with the License. You may obtain a copy of the
6+
# License at
7+
#
8+
# https://www.aklivity.io/aklivity-community-license/
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
# specific language governing permissions and limitations under the License.
14+
#
15+
16+
connect "zilla://streams/app0"
17+
option zilla:window 8192
18+
option zilla:transmission "half-duplex"
19+
20+
write zilla:begin.ext ${mcp:beginEx()
21+
.typeId(zilla:id("mcp"))
22+
.lifecycle()
23+
.build()
24+
.build()}
25+
26+
connected
27+
28+
read zilla:begin.ext ${mcp:matchBeginEx()
29+
.typeId(zilla:id("mcp"))
30+
.lifecycle()
31+
.sessionId("session-1")
32+
.build()
33+
.build()}
34+
35+
read notify LIFECYCLE_INITIALIZED
36+
37+
connect await LIFECYCLE_INITIALIZED
38+
"zilla://streams/app0"
39+
option zilla:window 8192
40+
option zilla:transmission "half-duplex"
41+
42+
write zilla:begin.ext ${mcp:beginEx()
43+
.typeId(zilla:id("mcp"))
44+
.toolsCall()
45+
.sessionId("session-1")
46+
.name("get_weather")
47+
.contentLength(59)
48+
.build()
49+
.build()}
50+
51+
connected
52+
53+
write '{'
54+
'"name":"get_weather",'
55+
'"arguments":'
56+
'{'
57+
'"location": "New York"'
58+
'}'
59+
'}'
60+
61+
read advised zilla:flush ${mcp:matchFlushEx()
62+
.typeId(zilla:id("mcp"))
63+
.resumable()
64+
.id("0")
65+
.build()
66+
.build()}
67+
68+
read '{'
69+
'"content":'
70+
'['
71+
'{'
72+
'"type": "text",'
73+
'"text": "Current weather in New York:\\nTemperature: 72°F\\nConditions: Partly cloudy"'
74+
'}'
75+
'],'
76+
'"isError": false'
77+
'}'
78+
read closed
79+
80+
write close
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc
3+
#
4+
# Licensed under the Aklivity Community License (the "License"); you may not use
5+
# this file except in compliance with the License. You may obtain a copy of the
6+
# License at
7+
#
8+
# https://www.aklivity.io/aklivity-community-license/
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
# specific language governing permissions and limitations under the License.
14+
#
15+
16+
property serverAddress "zilla://streams/app0"
17+
18+
accept ${serverAddress}
19+
option zilla:window 8192
20+
option zilla:transmission "half-duplex"
21+
22+
accepted
23+
24+
read zilla:begin.ext ${mcp:matchBeginEx()
25+
.typeId(zilla:id("mcp"))
26+
.lifecycle()
27+
.build()
28+
.build()}
29+
30+
connected
31+
32+
write zilla:begin.ext ${mcp:beginEx()
33+
.typeId(zilla:id("mcp"))
34+
.lifecycle()
35+
.sessionId("session-1")
36+
.build()
37+
.build()}
38+
write flush
39+
40+
accepted
41+
42+
read zilla:begin.ext ${mcp:matchBeginEx()
43+
.typeId(zilla:id("mcp"))
44+
.toolsCall()
45+
.sessionId("session-1")
46+
.name("get_weather")
47+
.contentLength(59)
48+
.build()
49+
.build()}
50+
51+
connected
52+
53+
read '{'
54+
'"name":"get_weather",'
55+
'"arguments":'
56+
'{'
57+
'"location": "New York"'
58+
'}'
59+
'}'
60+
61+
write flush
62+
63+
write advise zilla:flush ${mcp:flushEx()
64+
.typeId(zilla:id("mcp"))
65+
.resumable()
66+
.id("0")
67+
.build()
68+
.build()}
69+
70+
write '{'
71+
'"content":'
72+
'['
73+
'{'
74+
'"type": "text",'
75+
'"text": "Current weather in New York:\\nTemperature: 72°F\\nConditions: Partly cloudy"'
76+
'}'
77+
'],'
78+
'"isError": false'
79+
'}'
80+
write flush
81+
82+
write close
83+
84+
read closed

specs/binding-mcp.spec/src/test/java/io/aklivity/zilla/specs/binding/mcp/streams/application/ApplicationIT.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,15 @@ public void shouldReadResourceWith100kContents() throws Exception
198198
k3po.finish();
199199
}
200200

201+
@Test
202+
@Specification({
203+
"${app}/tools.call.resumable/client",
204+
"${app}/tools.call.resumable/server"})
205+
public void shouldCallToolWithUpstreamResumableFlush() throws Exception
206+
{
207+
k3po.finish();
208+
}
209+
201210
@Test
202211
@Specification({
203212
"${app}/tools.call/client",

0 commit comments

Comments
 (0)