Skip to content

Commit 0c7f17e

Browse files
committed
Merge branch 'develop' into claude/add-listchanged-support-epMnQ
Resolves conflicts in McpProxyLifecycleFactory.java: - Take union of FlushEx flyweight type imports (develop adds aggregate-id rewrite paths for elicitComplete/progress/resumable/list-changed variants on top of the basic McpFlushExFW import). - Drop duplicate mcpFlushExRW builder field created by auto-merge accepting both sides' independently-added declaration. Both sides' work is complementary: branch adds initialize-time listChanged advertisement + cache→list_changed wiring + diff-gated fan-out + ITs; develop (PR #1791) adds multi-route aggregate event IDs. Verified all 182 binding-mcp tests pass. https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
2 parents 2c50420 + d1922d5 commit 0c7f17e

39 files changed

Lines changed: 2209 additions & 91 deletions

File tree

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc
3+
*
4+
* Licensed under the Aklivity Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* https://www.aklivity.io/aklivity-community-license/
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
package io.aklivity.zilla.runtime.binding.mcp.internal.config;
16+
17+
import static java.util.function.Function.identity;
18+
import static java.util.stream.Collectors.toMap;
19+
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.Base64;
22+
import java.util.Collection;
23+
import java.util.HashSet;
24+
import java.util.Map;
25+
import java.util.Set;
26+
import java.util.function.BiConsumer;
27+
import java.util.zip.CRC32C;
28+
29+
import org.agrona.MutableDirectBuffer;
30+
import org.agrona.collections.Long2ObjectHashMap;
31+
32+
public final class McpAggregateEventId
33+
{
34+
static final byte PAIR_DELIMITER = (byte) ';';
35+
static final byte KEY_VALUE_DELIMITER = (byte) '=';
36+
static final int MAX_PREFIX_LENGTH = 6;
37+
38+
private McpAggregateEventId()
39+
{
40+
}
41+
42+
public static Map<String, String> computePrefixes(
43+
Collection<String> toolkits)
44+
{
45+
if (toolkits == null || toolkits.isEmpty())
46+
{
47+
return Map.of();
48+
}
49+
50+
final Set<String> distinct = new HashSet<>(toolkits);
51+
final Map<String, String> encoded = distinct.stream()
52+
.collect(toMap(identity(), McpAggregateEventId::encodeCrc32c));
53+
54+
int length = 1;
55+
while (length <= MAX_PREFIX_LENGTH)
56+
{
57+
final Set<String> seen = new HashSet<>();
58+
boolean unique = true;
59+
for (String code : encoded.values())
60+
{
61+
if (!seen.add(code.substring(0, length)))
62+
{
63+
unique = false;
64+
break;
65+
}
66+
}
67+
if (unique)
68+
{
69+
break;
70+
}
71+
length++;
72+
}
73+
74+
if (length > MAX_PREFIX_LENGTH)
75+
{
76+
throw new IllegalStateException("unable to derive unique prefixes for toolkits: " + distinct);
77+
}
78+
79+
final int prefixLength = length;
80+
return encoded.entrySet().stream()
81+
.collect(toMap(Map.Entry::getKey, e -> e.getValue().substring(0, prefixLength)));
82+
}
83+
84+
public static int encode(
85+
McpAggregateRoute[] routes,
86+
Long2ObjectHashMap<String> ids,
87+
MutableDirectBuffer buffer,
88+
int offset)
89+
{
90+
int progress = offset;
91+
for (McpAggregateRoute route : routes)
92+
{
93+
final String id = ids.get(route.routedId());
94+
if (id == null)
95+
{
96+
continue;
97+
}
98+
if (progress > offset)
99+
{
100+
buffer.putByte(progress++, PAIR_DELIMITER);
101+
}
102+
progress += buffer.putStringWithoutLengthUtf8(progress, route.prefix());
103+
buffer.putByte(progress++, KEY_VALUE_DELIMITER);
104+
progress += buffer.putStringWithoutLengthUtf8(progress, id);
105+
}
106+
return progress == offset ? -1 : progress - offset;
107+
}
108+
109+
public static void decode(
110+
String aggregate,
111+
BiConsumer<String, String> visitor)
112+
{
113+
if (aggregate == null || aggregate.isEmpty())
114+
{
115+
return;
116+
}
117+
118+
int start = 0;
119+
final int length = aggregate.length();
120+
while (start < length)
121+
{
122+
int end = aggregate.indexOf((char) PAIR_DELIMITER, start);
123+
if (end < 0)
124+
{
125+
end = length;
126+
}
127+
final int sep = aggregate.indexOf((char) KEY_VALUE_DELIMITER, start);
128+
if (sep > start && sep < end)
129+
{
130+
final String prefix = aggregate.substring(start, sep);
131+
final String value = aggregate.substring(sep + 1, end);
132+
visitor.accept(prefix, value);
133+
}
134+
start = end + 1;
135+
}
136+
}
137+
138+
private static String encodeCrc32c(
139+
String toolkit)
140+
{
141+
final CRC32C crc = new CRC32C();
142+
crc.update(toolkit.getBytes(StandardCharsets.UTF_8));
143+
final long value = crc.getValue();
144+
final byte[] bytes = new byte[]
145+
{
146+
(byte) (value >>> 24),
147+
(byte) (value >>> 16),
148+
(byte) (value >>> 8),
149+
(byte) value
150+
};
151+
return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
152+
}
153+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc
3+
*
4+
* Licensed under the Aklivity Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* https://www.aklivity.io/aklivity-community-license/
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
package io.aklivity.zilla.runtime.binding.mcp.internal.config;
16+
17+
public record McpAggregateRoute(
18+
String prefix,
19+
long routedId)
20+
{
21+
}

runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpBindingConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static io.aklivity.zilla.runtime.binding.mcp.internal.types.McpCapabilities.SERVER_TOOLS_LIST_CHANGED;
2121

2222
import java.util.ArrayList;
23+
import java.util.LinkedHashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Optional;
@@ -46,6 +47,8 @@ public final class McpBindingConfig
4647
public final GuardHandler guard;
4748
public final McpProxyCache cache;
4849
public final Map<String, McpProxySession> sessions;
50+
public final Map<String, McpRouteConfig> routeByPrefix;
51+
public final McpAggregateRoute[] aggregateRoutes;
4952

5053
private final List<McpRouteConfig> routes;
5154

@@ -60,6 +63,26 @@ public McpBindingConfig(
6063
.map(McpRouteConfig::new)
6164
.collect(Collectors.toList());
6265

66+
final Map<String, McpRouteConfig> routeByPrefix = new LinkedHashMap<>();
67+
if (routes.size() > 1)
68+
{
69+
final List<String> toolkits = routes.stream()
70+
.map(McpRouteConfig::toolkit)
71+
.collect(Collectors.toList());
72+
final Map<String, String> prefixesByToolkit = McpAggregateEventId.computePrefixes(toolkits);
73+
for (McpRouteConfig route : routes)
74+
{
75+
final String prefix = prefixesByToolkit.get(route.toolkit());
76+
routeByPrefix.put(prefix, route);
77+
}
78+
}
79+
this.routeByPrefix = routeByPrefix;
80+
81+
this.aggregateRoutes = routeByPrefix.entrySet().stream()
82+
.sorted(Map.Entry.comparingByKey())
83+
.map(e -> new McpAggregateRoute(e.getKey(), e.getValue().id))
84+
.toArray(McpAggregateRoute[]::new);
85+
6386
this.guard = Optional.ofNullable(options)
6487
.map(o -> o.authorization)
6588
.map(a -> a.name)

runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpRouteConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public final class McpRouteConfig
4949

5050
private final List<ConditionMatcher> matchers;
5151
private final LongObjectPredicate<UnaryOperator<String>> authorized;
52+
private final String toolkit;
5253

5354
public McpRouteConfig(
5455
RouteConfig route)
@@ -60,6 +61,16 @@ public McpRouteConfig(
6061
.map(ConditionMatcher::new)
6162
.collect(toList());
6263
this.authorized = route.authorized;
64+
this.toolkit = matchers.stream()
65+
.map(m -> m.toolkit)
66+
.filter(t -> t != null)
67+
.findFirst()
68+
.orElse(null);
69+
}
70+
71+
public String toolkit()
72+
{
73+
return toolkit;
6374
}
6475

6576
public boolean authorized(
@@ -216,6 +227,7 @@ static String identifierOf(
216227

217228
private static final class ConditionMatcher
218229
{
230+
private final String toolkit;
219231
private final String toolsPrefix;
220232
private final String promptsPrefix;
221233
private final String resourcesPrefix;
@@ -225,6 +237,7 @@ private ConditionMatcher(
225237
{
226238
final List<String> capabilities = condition.capability;
227239
final String toolkit = condition.toolkit;
240+
this.toolkit = toolkit;
228241

229242
final boolean anyCapability = capabilities == null;
230243
final boolean tools = anyCapability || capabilities.contains(CAPABILITY_TOOLS);

0 commit comments

Comments
 (0)