Commit d1922d5

jfallows and claude authored
Support aggregate event IDs for multi-toolkit MCP proxy (#1791)
* feat(binding-mcp): aggregate event ids across routes in mcp.proxy lifecycle

In multi-route mode each upstream MCP server mints SSE event ids in its own namespace, so forwarding them verbatim collides on the agent's merged stream and breaks Last-Event-ID recovery on reconnect.

The proxy now derives a unique short prefix per route from CRC32C of its `when[].toolkit` (URL-safe base64, shortest unique length), then on each outbound FlushEx that carries an id (resumable, listChanged, progress, elicitComplete) rewrites the id to a canonical sorted `<prefix>=<id>;<prefix>=<id>` aggregate of the latest known per-route ids. On inbound McpResumeChallengeEx the aggregate is decoded back to per-route ids and dispatched to each route's lifecycle client, opening clients lazily when needed; unknown prefixes are skipped without rejecting the resume.

Single-route configs and the hydrater self-loop bypass aggregation entirely. McpBindingConfig validates that every route declares a toolkit when more than one route is configured.

* test(binding-mcp): k3po IT coverage for aggregate-id resume demux

Adds engine-driven McpProxyIT scenarios that exercise the demux path on McpLifecycleServer.onServerChallenge end-to-end:

- lifecycle.events.resume.aggregate.event.id: client injects a resume challenge with the aggregate id "2=200;S=100"; the proxy decodes it, opens both upstream lifecycle clients lazily, and the upstreams each receive a resume challenge carrying their per-route id. When bluesky subsequently emits toolsListChanged id=101, the agent reads "2=200;S=101", proving the aggregate snapshot retained the inbound per-route ids.
- lifecycle.events.resume.missing.prefix: client injects an aggregate containing an unknown prefix X plus a known prefix S; the proxy skips X (no route bound to that prefix) and only dispatches the bluesky route. When bluesky emits id=101 the agent reads "S=101".

The challenge is injected via `read advise zilla:challenge` on the script's connect side, the same active-emit primitive the elicit scripts use on the accept side. This is the script analogue of mcp.server translating an HTTP Last-Event-ID header into a ChallengeFW on the lifecycle stream.

Also fixes McpLifecycleServer.dispatchAggregateResume to seed lastEventIdsByPrefix with the per-route ids decoded from the inbound aggregate, so the first post-resume FlushEx from any one route still emits a complete aggregate covering every route present in the original Last-Event-ID rather than just the route that just emitted.

* feat(binding-mcp): scope toolkit and cache to kind: proxy in schema

Tightens the mcp binding schema so `toolkit` is required on every `when` item, and only permitted on `kind: proxy`. The `cache` option is likewise restricted to `kind: proxy`. Adds invalid configs and SchemaTest cases asserting that misconfigured server-kind bindings (cache option, toolkit-bearing when) and proxy-kind routes missing toolkit are rejected at config-parse time.

Drops the runtime IllegalArgumentException in McpBindingConfig that checked multi-route toolkit presence; the schema now enforces this earlier (config-parse, not engine startup), making the runtime check dead code.

Splits the multi-route resume scenarios into prefixed and non-prefixed variants per repo convention so both forms have peer-to-peer ApplicationIT coverage and engine-driven McpProxyIT coverage:

- lifecycle.events.resume.aggregate.event.id{.prefixed}/
- lifecycle.events.resume.missing.prefix{.prefixed}/

The prefixed/client.rpt connects to the proxy's app surface and injects the aggregate id; the non-prefixed/client.rpt simulates what the proxy would do upstream: two direct connects, one per route, each carrying the per-route id. The shared server.rpt observes the per-route challenges via `write advised zilla:challenge`.

* perf(binding-mcp): zero-allocation aggregate event id encode

Replaces the StringBuilder-backed encode that allocated a StringBuilder plus a String per outbound FlushEx with a buffer-based encode that writes UTF-8 bytes directly into a factory-level MutableDirectBuffer and returns the byte count. McpLifecycleServer.mintAggregateEventId now wraps the encoded slice in a reusable OctetsFW (null if no route has emitted yet); the rewriteFlushExWithAggregateId helper passes buffer/offset/length to the generated id(DirectBuffer, int, int) builder overload on every applicable FlushEx variant (resumable, toolsListChanged, promptsListChanged, resourcesListChanged, progress, elicitComplete).

McpAggregateEventIdTest exercises the new buffer API (including encode at a non-zero offset) and reads the encoded bytes back via getStringWithoutLengthUtf8 for assertion.

* refactor(binding-mcp): trim verbose names in aggregate event id paths

Tightens names that encoded preconditions or context that is already clear from scope:

- McpBindingConfig.sortedPrefixes -> prefixes
- McpBindingConfig.sortedRoutedIdsByPrefix -> routedIds
- McpBindingConfig.prefixByRoutedId dropped (was never read)
- McpAggregateEventId.encode parameters prefixesSortedAscending / idsAlignedWithPrefixes -> prefixes / ids
- McpProxyLifecycleFactory:
    flushCodecBuffer -> flushExBuffer
    aggregateIdBuffer -> aggregateBuffer
    aggregateIdRO -> aggregateRO
    lastEventIdsByPrefix -> eventIds
    pendingResumeId -> resumeId
    recordRouteEventId(sourceRoutedId, perRouteId) -> recordEventId(routedId, id)
    mintAggregateEventId() -> mintAggregate()
    dispatchAggregateResume(...) -> dispatchResume(...)
    extractEventId(...) -> eventIdOf(...) (matches the existing capabilityOf / identifierOf getter convention)
    rewriteFlushExWithAggregateId(...) -> rewriteFlushEx(...)

Pure rename, no behavioural change. All 188 tests + checkstyle + license remain green.

* refactor(binding-mcp): rename aggregate-resume scenario dirs

Resolves the "prefix"-overloading in the previous scenario names:

- "prefix" in `missing.prefix` referred to the *toolkit* routing prefix
- ".prefixed" suffix is the convention marker for the engine-driven variant (per tools.list.toolkit.multi.prefixed)

The two collided, so `resume.missing.prefix.prefixed` read as nonsense.

Renames:

- lifecycle.events.resume.aggregate.event.id -> lifecycle.events.resume.aggregate
- lifecycle.events.resume.aggregate.event.id.prefixed -> lifecycle.events.resume.aggregate.prefixed
- lifecycle.events.resume.missing.prefix -> lifecycle.events.resume.partial
- lifecycle.events.resume.missing.prefix.prefixed -> lifecycle.events.resume.partial.prefixed

`event.id` is redundant once `resume` is in the name; `partial` describes the outcome (only the resolvable prefixes are dispatched) without re-using the "prefix" word.

Test methods follow:

- shouldResumeLifecycleEventsWithAggregateEventId -> shouldResumeLifecycleEventsAggregate
- shouldResumeLifecycleEventsWithMissingPrefix -> shouldResumeLifecycleEventsPartial

* refactor(binding-mcp): apply PR #1791 review feedback

Naming and structure tweaks from the review:

- McpBindingConfig: replace parallel arrays (prefixes[], routedIds[]) with a single McpAggregateRoute[] (new record (prefix, routedId)).
- McpAggregateEventId.computePrefixes: drop the redundant single-toolkit special case (the general loop produces the same 1-char prefix); replace explicit Map builders with stream + toMap collectors.
- McpAggregateEventId.encode: signature now takes McpAggregateRoute[] directly instead of a parallel String[] prefixes argument.
- McpLifecycleServer.recordEventId -> onDecodeEventId (event-handler naming consistent with on*/do* convention).
- McpLifecycleServer.mintAggregate -> nextEventId.
- McpProxyLifecycleFactory.eventIdOf -> extractEventId.
- onClientFlush now just forwards to doServerFlush(... routedId); the aggregation transform lives on doServerFlush where it can read its own server state directly. Inside, drop the extension.sizeof() > 0 guard and use wrap (not tryWrap); the FlushEx is non-null on this path.
- McpLifecycleClient.doClientResume: drop the resumeId parameter; the client's own resumeId field is read via a `this::injectResumeId` method reference on the builder.
- McpLifecycleClient.sessionId: now private with a sessionId() accessor; external callers in McpProxyItemFactory / McpProxyListFactory updated (the McpLifecycleServer.sessionId field, a different class, stays package-private).
- rewriteFlushEx locals renamed to (buffer, offset, length) to avoid shadowing the factory-level aggregateBuffer field.
- forwardExtension -> newExtension.

SchemaTest: positive cases for each kind with all supported options populated (server.options.yaml, proxy.options.yaml, client.options.yaml).

Pure refactor, all 18 unit + 171 IT tests still green.

* refactor(binding-mcp): apply second round PR #1791 feedback

- Long2ObjectHashMap<String> replaces String[] eventIds for cleaner lookup and storage keyed by routedId
- aggregateBuffer sized to fixed 1024 bytes instead of writeBuffer capacity
- doClientResume guards on replyOpened and clears resumeId on send
- doClientBegin pre-sets lifecycle sessionId from server, allowing list factory to drop fallback to server.lifecycle.sessionId
- McpItem.doClientBegin accepts sessionId as parameter from server call site instead of computing locally
- resumeClient extracted from dispatchResume lambda; onServerChallenge inlines aggregate decode under nested KIND_RESUME guard
- onClientFlush records per-route event id; doServerFlush now only rewrites the aggregate
- SchemaTest keeps a single positive options test per kind with all supported properties, drops the redundant toolkit-multi test

* refactor(binding-mcp): apply third round PR #1791 feedback

- Pass identifier as a parameter to McpItem.doClientBegin from the server call site instead of pulling from server.identifier inside
- Rename resumeClient to onDecodeAggregateEventId for symmetry with onDecodeEventId
- Replace rewriteFlushEx with kind-specific inject methods dispatched from a generic injectFlushEx; inline the builder chain at the call site in doServerFlush
- Drop redundant shouldValidateServer; the with-options positive tests cover the supported properties for each kind

* refactor(binding-mcp): drop WithOptions suffix from schema test names

* refactor(binding-mcp): drop dead prefix field and sessionId accessors

- Remove unused McpProxyItemFactory.McpServer.prefix field, its constructor parameter, the call-site argument, and the local variable from the resolve block
- Inline McpProxyItemFactory.McpServer.sessionId() to direct lifecycle.sessionId field access at the two call sites
- Promote McpProxyLifecycleFactory.McpLifecycleClient.sessionId to package-private and drop the sessionId() accessor; the sole external caller now reads the field directly

* refactor(binding-mcp): store originId and routedId on proxy clients

McpLifecycleClient, McpProxyItemFactory.McpClient, and McpProxyListFactory.McpListClient now hold originId and routedId as final fields set once in the constructor, instead of recomputing server.lifecycle.originId / server.routedId at every do* call site. The resolvedId field is renamed to routedId for consistency with the existing initialId/replyId/originId/routedId naming convention.

* fix(binding-mcp): restore proxy.toolkit.multi.yaml for IT tests

McpProxyIT references @configuration("proxy.toolkit.multi.yaml") for six multi-toolkit and aggregate-resume tests. The yaml was removed along with the redundant SchemaTest case, but the IT still depends on it for the multi-route proxy configuration. Local tests masked the gap because target/classes retained a stale copy; clean CI builds hit NullPointerException loading the missing config.

---------

Co-authored-by: Claude <noreply@anthropic.com>
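
The prefix derivation described in the first commit of the message (CRC32C of the toolkit name, URL-safe base64, shortest length at which all prefixes are distinct) can be sketched in plain Java. This is a minimal illustration under stated assumptions rather than the binding's own code: the class name `PrefixSketch`, the helper `code`, and the toolkit name `github` are hypothetical (only `bluesky` appears in the message), and no particular prefix values are claimed.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.zip.CRC32C;

// Hypothetical sketch class, not part of the commit.
final class PrefixSketch
{
    // 4 CRC bytes encode to 6 base64 characters when padding is omitted.
    static final int MAX_PREFIX_LENGTH = 6;

    // CRC32C of the UTF-8 toolkit name, big-endian, URL-safe base64 without padding.
    static String code(String toolkit)
    {
        CRC32C crc = new CRC32C();
        crc.update(toolkit.getBytes(StandardCharsets.UTF_8));
        long v = crc.getValue();
        byte[] bytes = {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v};
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    // Tries lengths 1..6 and returns toolkit -> prefix at the first length with no collision.
    static Map<String, String> computePrefixes(List<String> toolkits)
    {
        Set<String> distinct = new LinkedHashSet<>(toolkits);
        for (int length = 1; length <= MAX_PREFIX_LENGTH; length++)
        {
            Map<String, String> prefixes = new TreeMap<>();
            Set<String> seen = new HashSet<>();
            boolean unique = true;
            for (String toolkit : distinct)
            {
                String prefix = code(toolkit).substring(0, length);
                unique &= seen.add(prefix);
                prefixes.put(toolkit, prefix);
            }
            if (unique)
            {
                return prefixes;
            }
        }
        throw new IllegalStateException("unable to derive unique prefixes for toolkits: " + distinct);
    }

    public static void main(String[] args)
    {
        // "github" is an assumed second toolkit name used only for illustration.
        System.out.println(computePrefixes(List.of("bluesky", "github")));
    }
}
```

Because every prefix is cut to the same length, adding a toolkit whose code collides at the current length would lengthen all prefixes, so previously issued aggregate ids may stop resolving after such a config change; the message's rule that unknown prefixes are skipped rather than rejected covers that case.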
1 parent a21ba2e commit d1922d5

26 files changed

Lines changed: 1659 additions & 66 deletions

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2021-2024 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ *   https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.mcp.internal.config;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.zip.CRC32C;
+
+import org.agrona.MutableDirectBuffer;
+import org.agrona.collections.Long2ObjectHashMap;
+
+public final class McpAggregateEventId
+{
+    static final byte PAIR_DELIMITER = (byte) ';';
+    static final byte KEY_VALUE_DELIMITER = (byte) '=';
+    static final int MAX_PREFIX_LENGTH = 6;
+
+    private McpAggregateEventId()
+    {
+    }
+
+    public static Map<String, String> computePrefixes(
+        Collection<String> toolkits)
+    {
+        if (toolkits == null || toolkits.isEmpty())
+        {
+            return Map.of();
+        }
+
+        final Set<String> distinct = new HashSet<>(toolkits);
+        final Map<String, String> encoded = distinct.stream()
+            .collect(toMap(identity(), McpAggregateEventId::encodeCrc32c));
+
+        int length = 1;
+        while (length <= MAX_PREFIX_LENGTH)
+        {
+            final Set<String> seen = new HashSet<>();
+            boolean unique = true;
+            for (String code : encoded.values())
+            {
+                if (!seen.add(code.substring(0, length)))
+                {
+                    unique = false;
+                    break;
+                }
+            }
+            if (unique)
+            {
+                break;
+            }
+            length++;
+        }
+
+        if (length > MAX_PREFIX_LENGTH)
+        {
+            throw new IllegalStateException("unable to derive unique prefixes for toolkits: " + distinct);
+        }
+
+        final int prefixLength = length;
+        return encoded.entrySet().stream()
+            .collect(toMap(Map.Entry::getKey, e -> e.getValue().substring(0, prefixLength)));
+    }
+
+    public static int encode(
+        McpAggregateRoute[] routes,
+        Long2ObjectHashMap<String> ids,
+        MutableDirectBuffer buffer,
+        int offset)
+    {
+        int progress = offset;
+        for (McpAggregateRoute route : routes)
+        {
+            final String id = ids.get(route.routedId());
+            if (id == null)
+            {
+                continue;
+            }
+            if (progress > offset)
+            {
+                buffer.putByte(progress++, PAIR_DELIMITER);
+            }
+            progress += buffer.putStringWithoutLengthUtf8(progress, route.prefix());
+            buffer.putByte(progress++, KEY_VALUE_DELIMITER);
+            progress += buffer.putStringWithoutLengthUtf8(progress, id);
+        }
+        return progress == offset ? -1 : progress - offset;
+    }
+
+    public static void decode(
+        String aggregate,
+        BiConsumer<String, String> visitor)
+    {
+        if (aggregate == null || aggregate.isEmpty())
+        {
+            return;
+        }
+
+        int start = 0;
+        final int length = aggregate.length();
+        while (start < length)
+        {
+            int end = aggregate.indexOf((char) PAIR_DELIMITER, start);
+            if (end < 0)
+            {
+                end = length;
+            }
+            final int sep = aggregate.indexOf((char) KEY_VALUE_DELIMITER, start);
+            if (sep > start && sep < end)
+            {
+                final String prefix = aggregate.substring(start, sep);
+                final String value = aggregate.substring(sep + 1, end);
+                visitor.accept(prefix, value);
+            }
+            start = end + 1;
+        }
+    }
+
+    private static String encodeCrc32c(
+        String toolkit)
+    {
+        final CRC32C crc = new CRC32C();
+        crc.update(toolkit.getBytes(StandardCharsets.UTF_8));
+        final long value = crc.getValue();
+        final byte[] bytes = new byte[]
+        {
+            (byte) (value >>> 24),
+            (byte) (value >>> 16),
+            (byte) (value >>> 8),
+            (byte) value
+        };
+        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
+    }
+}
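
The partial-resume behaviour named in the commit message (unknown prefixes are skipped, known ones are dispatched) can be traced with a small standalone sketch. It mirrors the pair-walking logic of `decode` using only the JDK; the class `ResumeSketch`, the method `resumable`, and the aggregate `X=999;S=100` are assumptions made for illustration, since the message does not give the exact id used by the missing-prefix scenario.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical sketch class, not part of the commit.
final class ResumeSketch
{
    // Walks "<prefix>=<id>;<prefix>=<id>" and reports each well-formed pair.
    static void decode(String aggregate, BiConsumer<String, String> visitor)
    {
        int start = 0;
        while (start < aggregate.length())
        {
            int end = aggregate.indexOf(';', start);
            if (end < 0)
            {
                end = aggregate.length();
            }
            int sep = aggregate.indexOf('=', start);
            // A pair needs a non-empty prefix and a '=' inside the current segment.
            if (sep > start && sep < end)
            {
                visitor.accept(aggregate.substring(start, sep), aggregate.substring(sep + 1, end));
            }
            start = end + 1;
        }
    }

    // Keeps only the ids whose prefix maps to a configured route.
    static Map<String, String> resumable(String aggregate, Set<String> knownPrefixes)
    {
        Map<String, String> ids = new LinkedHashMap<>();
        decode(aggregate, (prefix, id) ->
        {
            if (knownPrefixes.contains(prefix))
            {
                ids.put(prefix, id);
            }
        });
        return ids;
    }

    public static void main(String[] args)
    {
        System.out.println(resumable("2=200;S=100", Set.of("2", "S"))); // prints {2=200, S=100}
        System.out.println(resumable("X=999;S=100", Set.of("2", "S"))); // prints {S=100}
    }
}
```

Malformed segments (no `=`, or an empty prefix) fall through the same guard as unknown prefixes, so a damaged Last-Event-ID degrades to a partial resume instead of failing outright.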
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2021-2024 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ *   https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.mcp.internal.config;
+
+public record McpAggregateRoute(
+    String prefix,
+    long routedId)
+{
+}
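
To show how an array of (prefix, routedId) records drives the buffer-based encode, here is a JDK-only sketch. It is an approximation under assumptions: a plain `byte[]` stands in for Agrona's `MutableDirectBuffer`, a `Map<Long, String>` stands in for `Long2ObjectHashMap<String>`, ids and prefixes are assumed to be ASCII, and the names `EncodeSketch` and `Route` and the routed ids 10 and 20 are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical sketch class, not part of the commit.
final class EncodeSketch
{
    // Stand-in for McpAggregateRoute.
    record Route(String prefix, long routedId)
    {
    }

    // Writes "<prefix>=<id>" pairs joined by ';' into buffer starting at offset.
    // Returns the number of bytes written, or -1 when no route has an id yet.
    static int encode(Route[] routes, Map<Long, String> ids, byte[] buffer, int offset)
    {
        int progress = offset;
        for (Route route : routes)
        {
            String id = ids.get(route.routedId());
            if (id == null)
            {
                continue; // this route has not emitted an event id yet
            }
            if (progress > offset)
            {
                buffer[progress++] = ';';
            }
            progress = put(buffer, progress, route.prefix());
            buffer[progress++] = '=';
            progress = put(buffer, progress, id);
        }
        return progress == offset ? -1 : progress - offset;
    }

    // Copies an ASCII string into the buffer and returns the next free index.
    static int put(byte[] buffer, int index, String value)
    {
        for (int i = 0; i < value.length(); i++)
        {
            buffer[index++] = (byte) value.charAt(i);
        }
        return index;
    }

    public static void main(String[] args)
    {
        // Routes are assumed to be pre-sorted by prefix so the output is canonical.
        Route[] routes = {new Route("2", 20L), new Route("S", 10L)};
        byte[] buffer = new byte[64];
        int length = encode(routes, Map.of(10L, "100", 20L, "200"), buffer, 3);
        System.out.println(new String(buffer, 3, length, StandardCharsets.US_ASCII)); // prints 2=200;S=100
    }
}
```

Returning a length and letting the caller slice the shared buffer is what avoids a per-event `String`; the -1 sentinel lets the caller distinguish "nothing to emit" from an empty id.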

runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpBindingConfig.java

Lines changed: 23 additions & 0 deletions
@@ -17,6 +17,7 @@
 import static io.aklivity.zilla.runtime.binding.mcp.config.McpElicitationConfig.DEFAULT_CALLBACK_PATH;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -43,6 +44,8 @@ public final class McpBindingConfig
     public final GuardHandler guard;
     public final McpProxyCache cache;
     public final Map<String, McpProxySession> sessions;
+    public final Map<String, McpRouteConfig> routeByPrefix;
+    public final McpAggregateRoute[] aggregateRoutes;
 
     private final List<McpRouteConfig> routes;
 
@@ -57,6 +60,26 @@ public McpBindingConfig(
             .map(McpRouteConfig::new)
             .collect(Collectors.toList());
 
+        final Map<String, McpRouteConfig> routeByPrefix = new LinkedHashMap<>();
+        if (routes.size() > 1)
+        {
+            final List<String> toolkits = routes.stream()
+                .map(McpRouteConfig::toolkit)
+                .collect(Collectors.toList());
+            final Map<String, String> prefixesByToolkit = McpAggregateEventId.computePrefixes(toolkits);
+            for (McpRouteConfig route : routes)
+            {
+                final String prefix = prefixesByToolkit.get(route.toolkit());
+                routeByPrefix.put(prefix, route);
+            }
+        }
+        this.routeByPrefix = routeByPrefix;
+
+        this.aggregateRoutes = routeByPrefix.entrySet().stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(e -> new McpAggregateRoute(e.getKey(), e.getValue().id))
+            .toArray(McpAggregateRoute[]::new);
+
         this.guard = Optional.ofNullable(options)
             .map(o -> o.authorization)
             .map(a -> a.name)
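
The constructor change above builds the route table only when more than one route is configured and sorts it by prefix, which is what makes the emitted aggregate canonical. A simplified, JDK-only sketch of that step follows; the records `Route` and `AggregateRoute`, the class `RouteTableSketch`, the prefixes `S` and `2`, and the toolkit name `github` are illustrative assumptions, and unlike the real code it assumes each route has a distinct toolkit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch class, not part of the commit.
final class RouteTableSketch
{
    // Stand-ins for McpRouteConfig and McpAggregateRoute.
    record Route(String toolkit, long id)
    {
    }

    record AggregateRoute(String prefix, long routedId)
    {
    }

    // Returns the prefix-sorted table, or an empty table for single-route configs.
    static List<AggregateRoute> aggregateRoutes(List<Route> routes, Map<String, String> prefixByToolkit)
    {
        List<AggregateRoute> table = new ArrayList<>();
        if (routes.size() > 1)
        {
            for (Route route : routes)
            {
                table.add(new AggregateRoute(prefixByToolkit.get(route.toolkit()), route.id()));
            }
            // Sorting by prefix fixes the pair order in every aggregate id.
            table.sort(Comparator.comparing(AggregateRoute::prefix));
        }
        return table;
    }

    public static void main(String[] args)
    {
        List<Route> routes = List.of(new Route("bluesky", 1L), new Route("github", 2L));
        System.out.println(aggregateRoutes(routes, Map.of("bluesky", "S", "github", "2")));
    }
}
```

An empty table is a cheap signal for the single-route case: with nothing to encode, event ids can be forwarded unchanged.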

runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpRouteConfig.java

Lines changed: 13 additions & 0 deletions
@@ -49,6 +49,7 @@ public final class McpRouteConfig
 
     private final List<ConditionMatcher> matchers;
     private final LongObjectPredicate<UnaryOperator<String>> authorized;
+    private final String toolkit;
 
     public McpRouteConfig(
         RouteConfig route)
@@ -60,6 +61,16 @@ public McpRouteConfig(
             .map(ConditionMatcher::new)
             .collect(toList());
         this.authorized = route.authorized;
+        this.toolkit = matchers.stream()
+            .map(m -> m.toolkit)
+            .filter(t -> t != null)
+            .findFirst()
+            .orElse(null);
+    }
+
+    public String toolkit()
+    {
+        return toolkit;
     }
 
     public boolean authorized(
@@ -216,6 +227,7 @@ static String identifierOf(
 
     private static final class ConditionMatcher
     {
+        private final String toolkit;
         private final String toolsPrefix;
         private final String promptsPrefix;
         private final String resourcesPrefix;
@@ -225,6 +237,7 @@ private ConditionMatcher(
         {
             final List<String> capabilities = condition.capability;
             final String toolkit = condition.toolkit;
+            this.toolkit = toolkit;
 
             final boolean anyCapability = capabilities == null;
             final boolean tools = anyCapability || capabilities.contains(CAPABILITY_TOOLS);
