Support aggregate event IDs for multi-toolkit MCP proxy#1791
Conversation
…ecycle In multi-route mode each upstream MCP server mints SSE event ids in its own namespace, so forwarding them verbatim collides on the agent's merged stream and breaks Last-Event-ID recovery on reconnect. The proxy now derives a unique short prefix per route from CRC32C of its `when[].toolkit` (URL-safe base64, shortest unique length), then on each outbound FlushEx that carries an id (resumable, listChanged, progress, elicitComplete) rewrites the id to a canonical sorted `<prefix>=<id>;<prefix>=<id>` aggregate of the latest known per-route ids. On inbound McpResumeChallengeEx the aggregate is decoded back to per-route ids and dispatched to each route's lifecycle client, opening clients lazily when needed; unknown prefixes are skipped without rejecting the resume. Single-route configs and the hydrater self-loop bypass aggregation entirely. McpBindingConfig validates that every route declares a toolkit when more than one route is configured.
Adds engine-driven McpProxyIT scenarios that exercise the demux path on McpLifecycleServer.onServerChallenge end-to-end: - lifecycle.events.resume.aggregate.event.id — client injects a resume challenge with the aggregate id "2=200;S=100"; the proxy decodes it, opens both upstream lifecycle clients lazily, and the upstreams each receive a resume challenge carrying their per-route id. When bluesky subsequently emits toolsListChanged id=101, the agent reads "2=200;S=101", proving the aggregate snapshot retained the inbound per-route ids. - lifecycle.events.resume.missing.prefix — client injects an aggregate containing an unknown prefix X plus a known prefix S; the proxy skips X (no route bound to that prefix) and only dispatches the bluesky route. When bluesky emits id=101 the agent reads "S=101". The challenge is injected via `read advise zilla:challenge` on the script's connect side — the same active-emit primitive the elicit scripts use on the accept side. This is the script analogue of mcp.server translating an HTTP Last-Event-ID header into a ChallengeFW on the lifecycle stream. Also fixes McpLifecycleServer.dispatchAggregateResume to seed lastEventIdsByPrefix with the per-route ids decoded from the inbound aggregate, so the first post-resume FlushEx from any one route still emits a complete aggregate covering every route present in the original Last-Event-ID rather than just the route that just emitted.
Tightens the mcp binding schema so `toolkit` is required on every
`when` item — and only permitted on `kind: proxy`. The `cache` option
is likewise restricted to `kind: proxy`. Adds invalid configs and
SchemaTest cases asserting that misconfigured server-kind bindings
(cache option, toolkit-bearing when) and proxy-kind routes missing
toolkit are rejected at config-parse time.
Drops the runtime IllegalArgumentException in McpBindingConfig that
checked multi-route toolkit presence; the schema now enforces this
earlier (config-parse, not engine startup), making the runtime check
dead code.
Splits the multi-route resume scenarios into prefixed and non-prefixed
variants per repo convention so both forms have peer-to-peer
ApplicationIT coverage and engine-driven McpProxyIT coverage:
- lifecycle.events.resume.aggregate.event.id{.prefixed}/
- lifecycle.events.resume.missing.prefix{.prefixed}/
The prefixed/client.rpt connects to the proxy's app surface and
injects the aggregate id; the non-prefixed/client.rpt simulates what
the proxy would do upstream — two direct connects, one per route,
each carrying the per-route id. The shared server.rpt observes the
per-route challenges via `write advised zilla:challenge`.
Replaces the StringBuilder-backed encode that allocated a StringBuilder plus a String per outbound FlushEx with a buffer-based encode that writes UTF-8 bytes directly into a factory-level MutableDirectBuffer and returns the byte count. McpLifecycleServer.mintAggregateEventId now wraps the encoded slice in a reusable OctetsFW (null if no route has emitted yet); the rewriteFlushExWithAggregateId helper passes buffer/offset/length to the generated id(DirectBuffer, int, int) builder overload on every applicable FlushEx variant (resumable, toolsListChanged, promptsListChanged, resourcesListChanged, progress, elicitComplete). McpAggregateEventIdTest exercises the new buffer API (including encode at a non-zero offset) and reads the encoded bytes back via getStringWithoutLengthUtf8 for assertion.
Tightens names that encoded preconditions or context that is already
clear from scope:
- McpBindingConfig.sortedPrefixes -> prefixes
- McpBindingConfig.sortedRoutedIdsByPrefix -> routedIds
- McpBindingConfig.prefixByRoutedId dropped (was never read)
- McpAggregateEventId.encode parameters
prefixesSortedAscending / idsAlignedWithPrefixes -> prefixes / ids
- McpProxyLifecycleFactory:
flushCodecBuffer -> flushExBuffer
aggregateIdBuffer -> aggregateBuffer
aggregateIdRO -> aggregateRO
lastEventIdsByPrefix -> eventIds
pendingResumeId -> resumeId
recordRouteEventId(sourceRoutedId, perRouteId) -> recordEventId(routedId, id)
mintAggregateEventId() -> mintAggregate()
dispatchAggregateResume(...) -> dispatchResume(...)
extractEventId(...) -> eventIdOf(...) (matches the existing
capabilityOf / identifierOf getter convention)
rewriteFlushExWithAggregateId(...) -> rewriteFlushEx(...)
Pure rename — no behavioural change. All 188 tests + checkstyle + license
remain green.
Resolves the "prefix"-overloading in the previous scenario names: - "prefix" in `missing.prefix` referred to the *toolkit* routing prefix - ".prefixed" suffix is the convention marker for the engine-driven variant (per tools.list.toolkit.multi.prefixed) The two collided, so `resume.missing.prefix.prefixed` read as nonsense. Renames: - lifecycle.events.resume.aggregate.event.id -> lifecycle.events.resume.aggregate - lifecycle.events.resume.aggregate.event.id.prefixed -> lifecycle.events.resume.aggregate.prefixed - lifecycle.events.resume.missing.prefix -> lifecycle.events.resume.partial - lifecycle.events.resume.missing.prefix.prefixed -> lifecycle.events.resume.partial.prefixed `event.id` is redundant once `resume` is in the name; `partial` describes the outcome (only the resolvable prefixes are dispatched) without re-using the "prefix" word. Test methods follow: - shouldResumeLifecycleEventsWithAggregateEventId -> shouldResumeLifecycleEventsAggregate - shouldResumeLifecycleEventsWithMissingPrefix -> shouldResumeLifecycleEventsPartial
| final int reserved = flush.reserved(); | ||
| final OctetsFW extension = flush.extension(); | ||
|
|
||
| OctetsFW forwardExtension = extension; |
There was a problem hiding this comment.
| OctetsFW forwardExtension = extension; | |
| OctetsFW newExtension = extension; |
| if (server.aggregating() && extension.sizeof() > 0) | ||
| { | ||
| final McpFlushExFW flushEx = | ||
| mcpFlushExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()); | ||
| if (flushEx != null) | ||
| { |
There was a problem hiding this comment.
Let's assume flushEx is non-null, so no need to check for extension.sizeof() > 0 either.
| final String perRouteId = extractEventId(flushEx); | ||
| if (perRouteId != null) | ||
| { | ||
| server.recordRouteEventId(routedId, perRouteId); |
There was a problem hiding this comment.
| server.recordRouteEventId(routedId, perRouteId); | |
| server.onDecodeEventId(routedId, perRouteId); |
| mcpFlushExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()); | ||
| if (flushEx != null) | ||
| { | ||
| final String perRouteId = extractEventId(flushEx); |
There was a problem hiding this comment.
| final String perRouteId = extractEventId(flushEx); | |
| final String eventId = extractEventId(flushEx); |
| if (perRouteId != null) | ||
| { | ||
| server.recordRouteEventId(routedId, perRouteId); | ||
| final OctetsFW aggregateId = server.mintAggregateEventId(); |
There was a problem hiding this comment.
| final OctetsFW aggregateId = server.mintAggregateEventId(); | |
| final OctetsFW aggregateId = server.nextEventId(); |
|
|
||
| @Test | ||
| public void shouldValidateProxyToolkitMulti() | ||
| { | ||
| JsonObject config = schema.validate("proxy.toolkit.multi.yaml"); | ||
|
|
||
| assertThat(config, not(nullValue())); | ||
| } | ||
|
|
||
| @Test(expected = JsonValidatingException.class) | ||
| public void shouldRejectProxyRouteMissingToolkit() | ||
| { | ||
| schema.validate("proxy.routes.missing.toolkit.invalid.yaml"); | ||
| } | ||
|
|
||
| @Test(expected = JsonValidatingException.class) | ||
| public void shouldRejectServerWithCache() | ||
| { | ||
| schema.validate("server.cache.invalid.yaml"); | ||
| } | ||
|
|
||
| @Test(expected = JsonValidatingException.class) | ||
| public void shouldRejectServerRouteWithToolkit() | ||
| { | ||
| schema.validate("server.toolkit.invalid.yaml"); | ||
| } |
There was a problem hiding this comment.
Just need a positive test one each for server, proxy and client with all the supported properties set.
| final String[] prefixes = routeByPrefix.keySet().stream() | ||
| .sorted(Comparator.naturalOrder()) | ||
| .toArray(String[]::new); | ||
| final long[] routedIds = new long[prefixes.length]; | ||
| for (int i = 0; i < prefixes.length; i++) | ||
| { | ||
| routedIds[i] = routeByPrefix.get(prefixes[i]).id; | ||
| } | ||
| this.prefixes = prefixes; | ||
| this.routedIds = routedIds; |
There was a problem hiding this comment.
Let's define a record and have a single array of records, instead of parallel arrays with same dimensions.
| final List<String> distinct = new ArrayList<>(new HashSet<>(toolkits)); | ||
| if (distinct.size() == 1) | ||
| { | ||
| return Map.of(distinct.get(0), encodeCrc32c(distinct.get(0)).substring(0, 1)); | ||
| } | ||
|
|
There was a problem hiding this comment.
We don't need this special case optimization on startup.
| final Map<String, String> encoded = new HashMap<>(); | ||
| for (String toolkit : distinct) | ||
| { | ||
| encoded.put(toolkit, encodeCrc32c(toolkit)); | ||
| } |
There was a problem hiding this comment.
Use toolkits.stream().collect(toMap(k -> k, this::encodeCrc32c)).
| final Map<String, String> result = new HashMap<>(); | ||
| for (Map.Entry<String, String> entry : encoded.entrySet()) | ||
| { | ||
| result.put(entry.getKey(), entry.getValue().substring(0, length)); | ||
| } | ||
| return result; |
There was a problem hiding this comment.
Use encoded.entrySet().stream().collect(toMap(k -> k, v -> v.substring(0, length)).
Naming and structure tweaks from the review: - McpBindingConfig: replace parallel arrays (prefixes[], routedIds[]) with a single McpAggregateRoute[] (new record (prefix, routedId)). - McpAggregateEventId.computePrefixes: drop the redundant single-toolkit special case (the general loop produces the same 1-char prefix); replace explicit Map builders with stream + toMap collectors. - McpAggregateEventId.encode: signature now takes McpAggregateRoute[] directly instead of a parallel String[] prefixes argument. - McpLifecycleServer.recordEventId -> onDecodeEventId (event-handler naming consistent with on*/do* convention). - McpLifecycleServer.mintAggregate -> nextEventId. - McpProxyLifecycleFactory.eventIdOf -> extractEventId. - onClientFlush now just forwards to doServerFlush(... routedId); the aggregation transform lives on doServerFlush where it can read its own server state directly. Inside, drop the extension.sizeof() > 0 guard and use wrap (not tryWrap) — the FlushEx is non-null on this path. - McpLifecycleClient.doClientResume: drop the resumeId parameter; the client's own resumeId field is read via a `this::injectResumeId` method reference on the builder. - McpLifecycleClient.sessionId: now private with a sessionId() accessor; external callers in McpProxyItemFactory / McpProxyListFactory updated (the McpLifecycleServer.sessionId field — a different class — stays package-private). - rewriteFlushEx locals renamed to (buffer, offset, length) to avoid shadowing the factory-level aggregateBuffer field. - forwardExtension -> newExtension. SchemaTest: positive cases for each kind with all supported options populated (server.options.yaml, proxy.options.yaml, client.options.yaml). Pure refactor — all 18 unit + 171 IT tests still green.
| final String identifier = server.identifier; | ||
| final String upstreamSessionId = lifecycle.sessionId; | ||
| final String upstreamSessionId = lifecycle.sessionId(); | ||
| final String outboundSessionId = upstreamSessionId != null | ||
| ? upstreamSessionId |
There was a problem hiding this comment.
Does it make sense to do this at the server call site instead, and pass sessionId as a parameter?
| this.writeBuffer = context.writeBuffer(); | ||
| this.codecBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); | ||
| this.flushExBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); | ||
| this.aggregateBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); |
There was a problem hiding this comment.
This is too large, agree?
| for (int i = 0; i < aggregateRoutes.length; i++) | ||
| { | ||
| if (aggregateRoutes[i].routedId() == routedId) | ||
| { | ||
| eventIds[i] = eventId; | ||
| break; | ||
| } | ||
| } |
There was a problem hiding this comment.
If we need lookup and store by routedId, should we store as Long2ObjectHashMap instead of an array?
This has zero-allocate iterator, plus deterministic ordering.
| if (McpState.replyOpened(client.state)) | ||
| { | ||
| client.doClientResume(traceId, authorization); | ||
| } | ||
| else | ||
| { | ||
| client.doClientBegin(traceId); | ||
| } |
There was a problem hiding this comment.
If we make doClientBegin a no-op if initialOpening and set that state when emitting initial BEGIN, and we also make doClientResume a no-op if !replyOpened, then this becomes client.doClientBegin(...) and client.doClientResume(...) unconditionally.
| final McpRouteConfig route = binding.routeByPrefix.get(prefix); | ||
| if (route != null) | ||
| { | ||
| onDecodeEventId(route.id, eventId); | ||
| final McpLifecycleClient client = supplyClient(route.id); | ||
| client.resumeId = eventId; | ||
| if (McpState.replyOpened(client.state)) | ||
| { | ||
| client.doClientResume(traceId, authorization); | ||
| } | ||
| else | ||
| { | ||
| client.doClientBegin(traceId); | ||
| } |
There was a problem hiding this comment.
Move this to a separate method to make the dispatchResume method a one-liner, then inline to dispatchResume call site.
| final McpFlushExFW flushEx = | ||
| mcpFlushExRO.wrap(extension.buffer(), extension.offset(), extension.limit()); | ||
| final String eventId = extractEventId(flushEx); | ||
| if (eventId != null) | ||
| { | ||
| onDecodeEventId(fromRoutedId, eventId); |
There was a problem hiding this comment.
This part should be in onClientFlush, the rest should remain here.
| String aggregate = null; | ||
| if (challengeEx != null && challengeEx.kind() == McpChallengeExFW.KIND_RESUME) | ||
| { | ||
| final String16FW resumeId = challengeEx.resume().id(); | ||
| if (resumeId != null && resumeId.length() != -1) | ||
| { | ||
| aggregate = resumeId.asString(); | ||
| } | ||
| } |
There was a problem hiding this comment.
| String aggregate = null; | |
| if (challengeEx != null && challengeEx.kind() == McpChallengeExFW.KIND_RESUME) | |
| { | |
| final String16FW resumeId = challengeEx.resume().id(); | |
| if (resumeId != null && resumeId.length() != -1) | |
| { | |
| aggregate = resumeId.asString(); | |
| } | |
| } | |
| if (challengeEx != null && | |
| challengeEx.kind() == McpChallengeExFW.KIND_RESUME) | |
| { | |
| String aggregate = challengeEx.resume().id().asString(); | |
| ... other statements dependent on challenge resume kind | |
| } |
| .resume(b -> {}) | ||
| .resume(this::injectResumeId) | ||
| .build(); | ||
| doClientChallenge(traceId, authorization, resumeEx); |
There was a problem hiding this comment.
| doClientChallenge(traceId, authorization, resumeEx); | |
| doClientChallenge(traceId, authorization, resumeEx); | |
| resumeId = null; |
| private long decodedParserProgress; // absolute streamOffset of buffer[offset] passed to decode | ||
| private int decodeDepth; // JSON nesting depth in the reply envelope | ||
| private int decodeItemDepth; // JSON nesting depth within the current item | ||
| private int decodeSkipDepth; // JSON nesting depth within a skipped value | ||
| private long decodedItemProgress = -1; // streamOffset of last byte emitted within the current item, -1 between items |
| final String upstreamSessionId = lifecycle.sessionId(); | ||
| final String sid = upstreamSessionId != null ? upstreamSessionId : server.lifecycle.sessionId; |
There was a problem hiding this comment.
Why not just guarantee that McpClientLifecycle stream has the correct sessionId perhaps derived from McpServerLifecycle stream, and then always use the client lifecycle sessionId here.
- Long2ObjectHashMap<String> replaces String[] eventIds for cleaner lookup and storage keyed by routedId - aggregateBuffer sized to fixed 1024 bytes instead of writeBuffer capacity - doClientResume guards on replyOpened and clears resumeId on send - doClientBegin pre-sets lifecycle sessionId from server, allowing list factory to drop fallback to server.lifecycle.sessionId - McpItem.doClientBegin accepts sessionId as parameter from server call site instead of computing locally - resumeClient extracted from dispatchResume lambda; onServerChallenge inlines aggregate decode under nested KIND_RESUME guard - onClientFlush records per-route event id; doServerFlush now only rewrites the aggregate - SchemaTest keeps a single positive options test per kind with all supported properties, drops the redundant toolkit-multi test
| { | ||
| lifecycle.doClientBegin(traceId); | ||
|
|
||
| final String identifier = server.identifier; |
There was a problem hiding this comment.
Should identifier also be a parameter?
| if (aggregate != null && aggregating()) | ||
| { | ||
| McpAggregateEventId.decode(aggregate, | ||
| (prefix, eventId) -> resumeClient(traceId, authorization, prefix, eventId)); |
There was a problem hiding this comment.
| (prefix, eventId) -> resumeClient(traceId, authorization, prefix, eventId)); | |
| (prefix, eventId) -> onDecodeAggregateEventId(traceId, authorization, prefix, eventId)); |
There was a problem hiding this comment.
| long originId, | |
| long routedId, |
There was a problem hiding this comment.
| this.originId = originId; | |
| this.routedId = routedId; |
There was a problem hiding this comment.
| sender = newStream(this::onClientMessage, originId, routedId, initialId, |
Same for other usages.
- Pass identifier as a parameter to McpItem.doClientBegin from the server call site instead of pulling from server.identifier inside - Rename resumeClient to onDecodeAggregateEventId for symmetry with onDecodeEventId - Replace rewriteFlushEx with kind-specific inject methods dispatched from a generic injectFlushEx; inline the builder chain at the call site in doServerFlush - Drop redundant shouldValidateServer; the with-options positive tests cover the supported properties for each kind
- Remove unused McpProxyItemFactory.McpServer.prefix field, its constructor parameter, the call-site argument, and the local variable from the resolve block - Inline McpProxyItemFactory.McpServer.sessionId() to direct lifecycle.sessionId field access at the two call sites - Promote McpProxyLifecycleFactory.McpLifecycleClient.sessionId to package-private and drop the sessionId() accessor; the sole external caller now reads the field directly
McpLifecycleClient, McpProxyItemFactory.McpClient, and McpProxyListFactory.McpListClient now hold originId and routedId as final fields set once in the constructor, instead of recomputing server.lifecycle.originId / server.routedId at every do* call site. The resolvedId field is renamed to routedId for consistency with the existing initialId/replyId/originId/routedId naming convention.
McpProxyIT references @configuration("proxy.toolkit.multi.yaml") for six multi-toolkit and aggregate-resume tests. The yaml was removed along with the redundant SchemaTest case, but the IT still depends on it for the multi-route proxy configuration. Local tests masked the gap because target/classes retained a stale copy; clean CI builds hit NullPointerException loading the missing config.
Resolves conflicts in McpProxyLifecycleFactory.java: - Take union of FlushEx flyweight type imports (develop adds aggregate-id rewrite paths for elicitComplete/progress/resumable/list-changed variants on top of the basic McpFlushExFW import). - Drop duplicate mcpFlushExRW builder field created by auto-merge accepting both sides' independently-added declaration. Both sides' work is complementary: branch adds initialize-time listChanged advertisement + cache→list_changed wiring + diff-gated fan-out + ITs; develop (PR #1791) adds multi-route aggregate event IDs. Verified all 182 binding-mcp tests pass. https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
Description
Implements support for aggregate event IDs in the MCP proxy binding to enable proper event tracking and resumption across multiple toolkit routes.
Changes
Core Implementation:
McpAggregateEventIdutility class to encode/decode composite event IDs using toolkit-specific prefixes (e.g.,"S=100;2=200")Lifecycle Management:
McpLifecycleServerto track per-route event IDs vialastEventIdsByPrefixarrayaggregating()check to determine if a session spans multiple routesrecordRouteEventId()to update tracked IDs from individual routesmintAggregateEventId()to generate composite IDs for client notificationdispatchAggregateResume()to decompose aggregate IDs and resume individual routesChallenge/Flush Handling:
McpChallengeExFWextensionsMcpFlushExFWto handle tools list changed notifications with aggregate event IDsConfiguration:
McpBindingConfigto compute and store sorted prefixes and routed IDs for efficient lookuptoolkitfield in route conditions for proxy bindingscacheoption for server bindingsTesting
McpAggregateEventIdTestcovering:Fixes #(issue)
https://claude.ai/code/session_018egHGhhcAWAQ1qZb6G6eva