Skip to content

Commit 472f49a

Browse files
committed
fix(binding-mcp): hold cache lifecycle lock for binding lifetime
McpProxyCacheManager previously released the cache lifecycle lock the moment initial hydrate completed (in onCacheReady), opening a window where a different worker's reconnect retry would acquire the lock and also open its own lifecycle stream to upstream. The result was multiple workers redundantly subscribing to the upstream SSE for the same binding — wasted resources, and harder to reason about which worker drives TTL refreshes. Move releaseLifecycle out of onCacheReady. The lifecycle lock is acquired once at binding attach (or after lifecycle abort + reconnect) and held until detach / engine shutdown. Loser workers keep retrying the acquire; their attempts only succeed if the holder dies and the lock TTL expires (or releases explicitly on detach). This matches MCP semantics: one worker per binding owns the upstream SSE; others serve their agents from the shared cache populated via the store-watch propagation path. Migrate the per-kind and lifecycle locks in McpProxyCache from putIfAbsent/delete to lock/unlock from PR #1790. Ownership-checked unlock means a worker that never acquired the lock cannot accidentally release another worker's lock (which the old unauthenticated delete allowed). Token state is held on McpListCache and McpProxyCache; null token short-circuits release as a no-op. Engine TestStoreHandler updated to share watchers and locks per storeConfig.id (mirroring how entries are already shared via TestStoreContext.supplyEntries). The watcher record carries the registering worker's signaler so cross-worker notify dispatches listener invocations onto the registering worker's I/O thread, matching the contract documented on StoreHandler. Without these fixes cross-worker watch propagation either didn't fire at all or fired on the wrong thread. https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
1 parent bf62cdb commit 472f49a

7 files changed

Lines changed: 126 additions & 32 deletions

File tree

runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCache.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public final class McpProxyCache
4444
private static final String STORE_KEY_RESOURCES = "resources";
4545
private static final String STORE_KEY_PROMPTS = "prompts";
4646
private static final String STORE_LOCK_SUFFIX = ".lock";
47-
private static final String STORE_LOCK_VALUE = "1";
4847
private static final String STORE_LOCK_KEY_TOOLS = STORE_KEY_TOOLS + STORE_LOCK_SUFFIX;
4948
private static final String STORE_LOCK_KEY_RESOURCES = STORE_KEY_RESOURCES + STORE_LOCK_SUFFIX;
5049
private static final String STORE_LOCK_KEY_PROMPTS = STORE_KEY_PROMPTS + STORE_LOCK_SUFFIX;
@@ -67,6 +66,7 @@ public final class McpProxyCache
6766
private final CRC32 crc32 = new CRC32();
6867

6968
boolean populated;
69+
private String lifecycleLockToken;
7070

7171
Runnable onReady;
7272
public OnSettled onSettled;
@@ -142,14 +142,26 @@ public void register(
142142
void acquireLifecycle(
143143
Consumer<Boolean> completion)
144144
{
145-
store.putIfAbsent(STORE_LOCK_KEY_LIFECYCLE, STORE_LOCK_VALUE, leaseTtl,
146-
prior -> completion.accept(prior == null));
145+
store.lock(STORE_LOCK_KEY_LIFECYCLE, leaseTtl, (k, t) ->
146+
{
147+
lifecycleLockToken = t;
148+
completion.accept(t != null);
149+
});
147150
}
148151

149152
void releaseLifecycle(
150153
Consumer<String> completion)
151154
{
152-
store.delete(STORE_LOCK_KEY_LIFECYCLE, completion);
155+
final String token = lifecycleLockToken;
156+
lifecycleLockToken = null;
157+
if (token != null)
158+
{
159+
store.unlock(STORE_LOCK_KEY_LIFECYCLE, token, completion);
160+
}
161+
else
162+
{
163+
completion.accept(null);
164+
}
153165
}
154166

155167
void onPurged(
@@ -193,6 +205,7 @@ public final class McpListCache
193205

194206
boolean populated;
195207
private long lastChecksum = -1L;
208+
private String lockToken;
196209

197210
private McpListCache(
198211
int kind,
@@ -231,14 +244,26 @@ public void put(
231244
public void acquire(
232245
Consumer<Boolean> completion)
233246
{
234-
store.putIfAbsent(storeLockKey, STORE_LOCK_VALUE, leaseTtl,
235-
prior -> completion.accept(prior == null));
247+
store.lock(storeLockKey, leaseTtl, (k, t) ->
248+
{
249+
lockToken = t;
250+
completion.accept(t != null);
251+
});
236252
}
237253

238254
public void release(
239255
Consumer<String> completion)
240256
{
241-
store.delete(storeLockKey, completion);
257+
final String token = lockToken;
258+
lockToken = null;
259+
if (token != null)
260+
{
261+
store.unlock(storeLockKey, token, completion);
262+
}
263+
else
264+
{
265+
completion.accept(null);
266+
}
242267
}
243268

244269
public Closeable watch(

runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCacheManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ private void onCacheReady()
177177
{
178178
return;
179179
}
180-
cache.releaseLifecycle(k -> {});
181180
scheduleRefresh();
182181
}
183182

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.engine.test.internal.store;
17+
18+
record TestLockEntry(
19+
String token,
20+
long expiresAt)
21+
{
22+
}

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStore.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.aklivity.zilla.runtime.engine.test.internal.store;
1717

1818
import java.net.URL;
19+
import java.util.List;
1920
import java.util.concurrent.ConcurrentHashMap;
2021
import java.util.concurrent.ConcurrentMap;
2122

@@ -28,11 +29,15 @@ public final class TestStore implements Store
2829
public static final String NAME = "test";
2930

3031
private final ConcurrentMap<Long, ConcurrentMap<String, String>> storage;
32+
private final ConcurrentMap<Long, ConcurrentMap<String, List<TestWatcher>>> storageListeners;
33+
private final ConcurrentMap<Long, ConcurrentMap<String, TestLockEntry>> storageLocks;
3134

3235
public TestStore(
3336
Configuration config)
3437
{
3538
this.storage = new ConcurrentHashMap<>();
39+
this.storageListeners = new ConcurrentHashMap<>();
40+
this.storageLocks = new ConcurrentHashMap<>();
3641
}
3742

3843
@Override
@@ -51,12 +56,24 @@ public URL type()
5156
public TestStoreContext supply(
5257
EngineContext context)
5358
{
54-
return new TestStoreContext(context, this::acquireEntries);
59+
return new TestStoreContext(context, this::acquireEntries, this::acquireListeners, this::acquireLocks);
5560
}
5661

5762
private ConcurrentMap<String, String> acquireEntries(
5863
long storeId)
5964
{
6065
return storage.computeIfAbsent(storeId, id -> new ConcurrentHashMap<>());
6166
}
67+
68+
private ConcurrentMap<String, List<TestWatcher>> acquireListeners(
69+
long storeId)
70+
{
71+
return storageListeners.computeIfAbsent(storeId, id -> new ConcurrentHashMap<>());
72+
}
73+
74+
private ConcurrentMap<String, TestLockEntry> acquireLocks(
75+
long storeId)
76+
{
77+
return storageLocks.computeIfAbsent(storeId, id -> new ConcurrentHashMap<>());
78+
}
6279
}

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreContext.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.aklivity.zilla.runtime.engine.test.internal.store;
1717

18+
import java.util.List;
1819
import java.util.concurrent.ConcurrentMap;
1920
import java.util.function.LongFunction;
2021

@@ -28,13 +29,19 @@ public final class TestStoreContext implements StoreContext
2829
{
2930
private final Signaler signaler;
3031
private final LongFunction<ConcurrentMap<String, String>> supplyEntries;
32+
private final LongFunction<ConcurrentMap<String, List<TestWatcher>>> supplyListeners;
33+
private final LongFunction<ConcurrentMap<String, TestLockEntry>> supplyLocks;
3134

3235
public TestStoreContext(
3336
EngineContext context,
34-
LongFunction<ConcurrentMap<String, String>> supplyEntries)
37+
LongFunction<ConcurrentMap<String, String>> supplyEntries,
38+
LongFunction<ConcurrentMap<String, List<TestWatcher>>> supplyListeners,
39+
LongFunction<ConcurrentMap<String, TestLockEntry>> supplyLocks)
3540
{
3641
this.signaler = context.signaler();
3742
this.supplyEntries = supplyEntries;
43+
this.supplyListeners = supplyListeners;
44+
this.supplyLocks = supplyLocks;
3845
}
3946

4047
@Override
@@ -46,7 +53,9 @@ public TestStoreHandler attach(
4653
{
4754
options.entries.forEach(entries::putIfAbsent);
4855
}
49-
return new TestStoreHandler(store, signaler, entries);
56+
final ConcurrentMap<String, List<TestWatcher>> listeners = supplyListeners.apply(store.id);
57+
final ConcurrentMap<String, TestLockEntry> locks = supplyLocks.apply(store.id);
58+
return new TestStoreHandler(store, signaler, entries, listeners, locks);
5059
}
5160

5261
@Override

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.List;
2121
import java.util.Objects;
2222
import java.util.UUID;
23-
import java.util.concurrent.ConcurrentHashMap;
2423
import java.util.concurrent.ConcurrentMap;
2524
import java.util.concurrent.CopyOnWriteArrayList;
2625
import java.util.function.BiConsumer;
@@ -33,19 +32,21 @@
3332
public final class TestStoreHandler implements StoreHandler
3433
{
3534
private final ConcurrentMap<String, String> entries;
36-
private final ConcurrentMap<String, List<BiConsumer<String, String>>> listeners;
35+
private final ConcurrentMap<String, List<TestWatcher>> listeners;
3736
private final ConcurrentMap<String, TestLockEntry> locks;
3837
private final Signaler signaler;
3938

4039
public TestStoreHandler(
4140
StoreConfig store,
4241
Signaler signaler,
43-
ConcurrentMap<String, String> entries)
42+
ConcurrentMap<String, String> entries,
43+
ConcurrentMap<String, List<TestWatcher>> listeners,
44+
ConcurrentMap<String, TestLockEntry> locks)
4445
{
4546
this.entries = Objects.requireNonNull(entries);
4647
this.signaler = Objects.requireNonNull(signaler);
47-
this.listeners = new ConcurrentHashMap<>();
48-
this.locks = new ConcurrentHashMap<>();
48+
this.listeners = Objects.requireNonNull(listeners);
49+
this.locks = Objects.requireNonNull(locks);
4950
}
5051

5152
@Override
@@ -121,7 +122,7 @@ public void lock(
121122
final String token = UUID.randomUUID().toString();
122123
final TestLockEntry candidate = new TestLockEntry(token, expiresAt);
123124
TestLockEntry existing = locks.putIfAbsent(key, candidate);
124-
if (existing != null && existing.expiresAt <= now)
125+
if (existing != null && existing.expiresAt() <= now)
125126
{
126127
existing = locks.replace(key, existing, candidate) ? null : locks.get(key);
127128
}
@@ -138,14 +139,14 @@ public void unlock(
138139
final long now = System.currentTimeMillis();
139140
final TestLockEntry current = locks.get(key);
140141
final String result;
141-
if (current != null && current.expiresAt > now && current.token.equals(token))
142+
if (current != null && current.expiresAt() > now && current.token().equals(token))
142143
{
143144
locks.remove(key, current);
144145
result = token;
145146
}
146147
else
147148
{
148-
if (current != null && current.expiresAt <= now)
149+
if (current != null && current.expiresAt() <= now)
149150
{
150151
locks.remove(key, current);
151152
}
@@ -159,14 +160,15 @@ public Closeable watch(
159160
String key,
160161
BiConsumer<String, String> listener)
161162
{
162-
final List<BiConsumer<String, String>> list = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
163-
list.add(listener);
163+
final TestWatcher watcher = new TestWatcher(listener, signaler);
164+
final List<TestWatcher> list = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
165+
list.add(watcher);
164166
return () ->
165167
{
166-
final List<BiConsumer<String, String>> current = listeners.get(key);
168+
final List<TestWatcher> current = listeners.get(key);
167169
if (current != null)
168170
{
169-
current.remove(listener);
171+
current.remove(watcher);
170172
}
171173
};
172174
}
@@ -175,13 +177,13 @@ private void notifyListeners(
175177
String key,
176178
String value)
177179
{
178-
final List<BiConsumer<String, String>> list = listeners.get(key);
180+
final List<TestWatcher> list = listeners.get(key);
179181
if (list != null && !list.isEmpty())
180182
{
181183
final long now = System.currentTimeMillis();
182-
for (BiConsumer<String, String> listener : list)
184+
for (TestWatcher w : list)
183185
{
184-
signaler.signalAt(now, 0, ignored -> listener.accept(key, value));
186+
w.signaler().signalAt(now, 0, ignored -> w.listener().accept(key, value));
185187
}
186188
}
187189
}
@@ -192,10 +194,4 @@ private void defer(
192194
// contract: callback fires strictly later than the call, on the caller's I/O thread
193195
signaler.signalAt(System.currentTimeMillis(), 0, ignored -> task.run());
194196
}
195-
196-
private record TestLockEntry(
197-
String token,
198-
long expiresAt)
199-
{
200-
}
201197
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.engine.test.internal.store;
17+
18+
import java.util.function.BiConsumer;
19+
20+
import io.aklivity.zilla.runtime.engine.concurrent.Signaler;
21+
22+
record TestWatcher(
23+
BiConsumer<String, String> listener,
24+
Signaler signaler)
25+
{
26+
}

0 commit comments

Comments
 (0)