Add lock, unlock, and watch operations to StoreHandler#1790
Merged
Conversation
Adds three new primitives to the StoreHandler SPI: - watch(key, listener) returns a Closeable; listener fires on the registering caller's signaler after every successful mutation of the watched key (null value indicates removal). - lock(key, ttl, completion) acquires a TTL-bounded lock, returning a non-null token on success or null on contention. - unlock(key, token, completion) atomically releases when the token matches the current holder; ownership-mismatch returns the holder token, expired/missing locks are treated as a no-op success. Implementations decide change-visibility scope (in-process, node-wide, cluster-wide). All three methods follow the existing async contract: callbacks fire strictly later than the call, on the caller's I/O thread. store-memory implements all three using shared per-store-config maps (entries / watchers / locks) so cross-worker dispatch is correct: the handler that received watch captures its own signaler in the Watcher record; iteration on put dispatches each listener via its captured signaler, routing the invocation onto the registering worker's event loop regardless of which worker did the mutation. TestStoreHandler updated with the same model so engine.spec IT coverage carries through.
Extends the test binding's storeAssertions vocabulary with lock, unlock, and watch ops so YAML-driven IT scripts can drive the new StoreHandler primitives the same way as get/put/delete. Adds three new YAML configs and IT methods for store-memory: - store.lock.yaml: lock acquires, then second attempt fails - store.unlock.yaml: unlock against absent lock is a no-op success - store.watch.yaml: watch registers, subsequent put fires listener with the matching value
SPI changes: - put / putIfAbsent / lock now take Duration ttl; null = no expiry, replacing the Long.MAX_VALUE sentinel. Read more naturally and parses from ISO-8601 in YAML configs (e.g. PT1M, PT8H). - unlock completion now returns the supplied token on a successful ownership-checked release, or null otherwise — mirroring lock's shape (non-null = you own it). Previous semantics inverted this AND leaked the current holder's token back to a caller that did not prove ownership, defeating the ownership check. store-memory + TestStoreHandler updated to match. Test binding's StoreAssertion config carries ttl as Duration; the adapter serialises/parses via Duration.toString / Duration.parse. store.lock.yaml uses ttl: PT1M; expect: "" still matches the not-acquired path of lock the same way it now matches the not-released path of unlock.
The SPI change to use Duration for ttl in commit 2f7c53e missed the in-tree consumer in McpProxyCache. The autobuild compile failures (zilla CI #1790) pointed at three call sites that still passed ttl.toMillis() and STORE_TTL_FOREVER (= Long.MAX_VALUE). Switch to passing the Duration directly and treat null as the no-expiry sentinel — matches the new SPI contract.
jfallows
pushed a commit
that referenced
this pull request
May 22, 2026
McpProxyCacheManager previously released the cache lifecycle lock the moment initial hydrate completed (in onCacheReady), opening a window where a different worker's reconnect retry would acquire the lock and also open its own lifecycle stream to upstream. The result was multiple workers redundantly subscribing to the upstream SSE for the same binding — wasted resources, and harder to reason about which worker drives TTL refreshes. Move releaseLifecycle out of onCacheReady. The lifecycle lock is acquired once at binding attach (or after lifecycle abort + reconnect) and held until detach / engine shutdown. Loser workers keep retrying the acquire; their attempts only succeed if the holder dies and the lock TTL expires (or releases explicitly on detach). This matches MCP semantics: one worker per binding owns the upstream SSE; others serve their agents from the shared cache populated via the store-watch propagation path. Migrate the per-kind and lifecycle locks in McpProxyCache from putIfAbsent/delete to lock/unlock from PR #1790. Ownership-checked unlock means a worker that never acquired the lock cannot accidentally release another worker's lock (which the old unauthenticated delete allowed). Token state is held on McpListCache and McpProxyCache; null token short-circuits release as a no-op. Engine TestStoreHandler updated to share watchers and locks per storeConfig.id (mirroring how entries are already shared via TestStoreContext.supplyEntries). The watcher record carries the registering worker's signaler so cross-worker notify dispatches listener invocations onto the registering worker's I/O thread, matching the contract documented on StoreHandler. Without these fixes cross-worker watch propagation either didn't fire at all or fired on the wrong thread. https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
3 tasks
jfallows
added a commit
that referenced
this pull request
May 23, 2026
* test(engine): share watchers and locks per store across workers in TestStoreHandler The engine test store handler is reused across multiple specs to exercise store lock/unlock/watch (introduced in PR #1790). Each worker constructs its own TestStoreHandler, so per-store state — entries, watchers, locks — must be shared via TestStoreContext to give cross-worker semantics that match what production store implementations (memory, redis, hazelcast) provide. Extends TestStoreContext to supply per-store watcher and lock maps (mirroring the existing supplyEntries pattern, keyed by storeConfig.id). TestStoreHandler now stores TestWatcher records that carry the registering worker's signaler, so a put on worker A correctly dispatches the listener back onto the registering worker's I/O thread instead of firing inline on whichever worker performed the put. Without this, a listener registered on worker A but fired by worker B would access state from the wrong thread, violating the engine's single-threaded-per- worker contract. The new TestLockEntry record carries the lease token and expiresAt for ownership-checked unlock. TestStoreHandler.lock uses ConcurrentMap.compute for atomic acquire-or-fail; unlock uses ConcurrentMap.computeIfPresent with a token check so a worker that never acquired the lock cannot release another worker's lock. This brings the in-tree test store implementation up to the same contract surface as the lock/unlock/watch SPI requires, so spec-level ITs that rely on those operations can be written against `type: test` without depending on `store-memory` for correctness. * feat(engine): add renew operation to StoreHandler Adds StoreHandler.renew(key, token, ttl, completion) to the engine SPI, following the same ownership-checked, async-completion contract as unlock. Callers that hold a coordination lock for longer than its initial TTL — e.g. a singleton worker that owns a binding-scoped resource for the lifetime of the binding — schedule renewals at an interval shorter than the lease TTL. A failed renewal signals that ownership has been lost (the lock was reacquired by another holder after a TTL expiry), giving callers a deterministic cue to surrender state and let the new owner take over. store-memory and the engine TestStoreHandler implement renew with an atomic ConcurrentMap.replace against the previously-observed LockEntry: if the token matches the unexpired current holder, the entry is replaced with a renewed expiresAt and the original token is returned; otherwise null is returned. Expired entries are evicted opportunistically, mirroring the unlock cleanup behaviour. TestBindingFactory gains a renew assertion alongside the existing lock/unlock/watch ops. The new spec config store-memory.spec/config/ store.renew.yaml exercises the full acquire-renew-release cycle for the IT. --------- Co-authored-by: Claude <noreply@anthropic.com>
jfallows
added a commit
that referenced
this pull request
May 23, 2026
* feat(binding-mcp): advertise capabilities.{tools,prompts,resources}.listChanged in initialize
Widen McpLifecycleBeginEx.capabilities to uint16 and add three new bits
(SERVER_TOOLS_LIST_CHANGED, SERVER_PROMPTS_LIST_CHANGED,
SERVER_RESOURCES_LIST_CHANGED) so the listChanged capability can flow
end-to-end through the lifecycle BEGIN_EX.
Server kind: replace the hardcoded "capabilities" JSON fragment with one
of 8 pre-computed variants selected by the listChanged bits in the
application's lifecycle reply. The reply is now opened immediately on
"initialize" so the JSON-RPC response can be deferred until the app
declares its capabilities.
Client kind: parse the upstream "initialize" result for
capabilities.{tools,prompts,resources}.listChanged and OR the matching
bits into the lifecycle BEGIN_EX. HttpInitializeRequest's decoder is now
decodeJsonRpc rather than decodeIgnore, with a new onResponseComplete
hook that runs after the result body is fully accumulated.
https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
* feat(binding-mcp): cache-enabled proxy advertises all three listChanged bits
When McpBindingConfig.cache is non-null, OR in
SERVER_TOOLS_LIST_CHANGED, SERVER_PROMPTS_LIST_CHANGED, and
SERVER_RESOURCES_LIST_CHANGED into the binding's serverCapabilities. The
cache's TTL refresh path is itself a listChanged mechanism, so the proxy
can credibly emit list_changed notifications for any kind regardless of
whether downstream advertises listChanged.
https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
* feat(binding-mcp): wire notifications/.../list_changed to cache refresh
McpProxyCacheHydrater.McpHydrateLifecycleStream gains a FlushFW branch
in onLifecycleMessage that decodes the McpFlushEx, maps the
toolsListChanged / promptsListChanged / resourcesListChanged kind to
the corresponding KIND_*_LIST, and calls handler.onListChanged(kind).
The new callback chain (McpProxyCacheHandler.onListChanged →
McpProxyCacheListener.onListChanged → McpProxyCacheManager.onListChanged)
cancels the pending TTL signal and invokes handler.hydrate(kind) — the
same code path the TTL callback uses. The existing settle-arms-next-signal
cycle re-arms the next TTL from "now" once the refresh completes, so the
TTL countdown naturally resets.
Per-kind lock semantics unchanged: only the hydrate-lock-winning worker
receives notifications and runs the refresh, so there is no contention
on the per-kind <kind>.lock from a notification.
https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
* feat(binding-mcp): diff-gated fan-out of list_changed to agents on cache refresh
When McpListCache.put receives a value whose CRC32 differs from the
previously cached value, the cache invokes onChanged(kind). The
McpProxyFactory.attach registers a broadcaster lambda on
McpProxyCache.onChanged that iterates the binding's open agent
sessions and calls McpProxySession.doNotifyListChanged(kind, traceId)
on each.
McpLifecycleServer (proxy-side, per-agent lifecycle) implements
doNotifyListChanged by building an McpFlushExFW with the matching
toolsListChanged / promptsListChanged / resourcesListChanged union
case, stamping a per-session monotonic id, and writing it on the
agent's lifecycle reply stream. The downstream MCP server kind then
echoes the id onto its outbound SSE GET event and emits a
notifications/{kind}/list_changed line.
The diff-gate ensures TTL refreshes that produce identical content
don't spam agents; a fresh worker that reads existing store content on
attach seeds its CRC from that value so subsequent puts diff against
the correct baseline.
Adds cache.refresh.tools.notify scenario (agent script only) and
McpProxyCacheIT.shouldNotifyToolsListChangedAfterRefresh to verify
end-to-end: initial hydrate populates with [get_weather], TTL fires,
refresh returns [get_weather, get_time] (differs), fan-out delivers
FlushEx-toolsListChanged with id=0 to the connected agent.
https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
* test(binding-mcp): cover end-to-end notification-driven refresh + fan-out
Adds cache.notify.tools.list.changed scenario:
- Downstream simulator (server.rpt): completes initial tools/list, then
emits FlushEx-toolsListChanged on its lifecycle reply, then accepts a
second tools/list call with updated content
- Agent simulator (client.rpt): opens a lifecycle session and verifies
it receives FlushEx-toolsListChanged
McpProxyCacheIT.shouldRefreshToolsOnListChangedNotification wires these
through Zilla under proxy.cache.yaml (no TTL) with
MCP_HYDRATE_FILTER=tools, so the second tools/list must be triggered by
the downstream's notification — and the diff-gated fan-out delivers the
FlushEx to the connected agent only when content actually differs.
The peer-to-peer variant is intentionally omitted: notification-driven
refresh requires the proxy mediating between hydrate and agent sessions
(distinct session ids), which can't be replayed by a single pair of
peer scripts.
https://claude.ai/code/session_01XW5Ph2usPaFkkewqMmcVtH
* refactor(binding-mcp): rename McpListCache.onChanged to onSettled (kind, changed)
Cache callback is now fired on every successful put — not just when the
CRC32 differs from the prior value. Signature changes from IntConsumer
to a small @FunctionalInterface carrying both the kind and the changed
flag, so downstream broadcasters can act on the no-change case (e.g.,
clear pending state without emitting).
Broadcaster lambda in McpProxyFactory.attach gates emission on
'changed' to preserve current behavior. Prepares the surface for the
upcoming defer-list-changed-in-cache-mode work; no behavior change in
this commit.
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
* feat(binding-mcp): defer upstream list_changed FlushEx in agent lifecycle when cache mode
Per-route lifecycle clients on agent-facing lifecycle servers
(originId != routedId, the multi-route or single-route exit pattern)
no longer forward toolsListChanged / promptsListChanged /
resourcesListChanged FlushEx to the agent when the binding has cache
enabled. The cache settle path (McpListCache.put → onSettled →
broadcaster in McpProxyFactory.attach) is now the sole emission path
for these notifications in cache mode.
Eliminates the duplicate notification that previously fired when an
agent had invoked a per-route operation before an upstream
list_changed event: once via the per-route forward, then again via
the cache broadcaster after the refresh settled. With deferral the
agent sees exactly one notification, and crucially it arrives after
the cache holds the new value — so any immediate re-fetch by the
agent sees fresh content.
Hydrater lifecycle servers (originId == routedId, self-loop) are
unchanged: they continue to forward via doServerFlush so that
McpHydrateLifecycleStream.onLifecycleFlush observes the upstream
notification and triggers cache.hydrate(kind).
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
* feat(binding-mcp): cross-worker cache settle propagation via StoreHandler.watch
McpProxyCacheManager.start() registers one watch per kind on the
kind's store value key. When any worker (local or remote) updates the
key, the watch listener fires; the manager re-reads the value via the
cache's existing get path, which routes through McpListCache.checkGet
where the CRC32 of the returned value is compared against the
worker-local lastChecksum.
McpListCache.checkGet now fires onSettled(kind, changed) on every read
— mirroring the put path. Initial loads and watch fires of identical
content report changed=false (broadcaster matrix no-ops); watch fires
after a remote worker's content-changing put report changed=true on
this worker (broadcaster matrix emits to local sessions).
stop() unsubscribes via the Closeable handle returned at registration;
the listener's get-on-fire is harmless if it races a concurrent stop
because checkGet honours stopped=true upstream and onSettled is
already gated on cache state. The IOException from Closeable.close is
swallowed — unsubscribe is best-effort, parallels the engine
signaler-cancel pattern.
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
* test(binding-mcp): cover deferred list_changed release in cache mode
New k3po scenario cache.notify.tools.list.changed.after.tools.call
exercises the duplicate-fix: the agent's per-route lifecycle client
(opened by the agent invoking tools/call) receives an upstream
toolsListChanged FlushEx with id="200", which McpLifecycleClient.
onClientFlush now defers (no doServerFlush forwarding) because the
binding has cache enabled and the lifecycle server is agent-facing.
The hydrater's per-route lifecycle client receives its own copy of the
notification with id="100", which is forwarded to the hydrater's
self-loop lifecycle server, triggering McpProxyCacheHydrater
.onLifecycleFlush -> McpProxyCacheManager.onListChanged ->
handler.hydrate -> cache.put. The put detects a CRC32 diff against
the initial tools list, fires onSettled(kind, true), and the
broadcaster in McpProxyFactory.attach emits a single
toolsListChanged FlushEx with synthetic id="0" to the agent.
Without the defer-in-cache-mode fix the agent would have read
id="200" first (the forwarded upstream id), failing the id("0")
match. With the fix the cache-broadcast wins and the agent sees
exactly one notification at id="0".
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
* fix(binding-mcp): hold cache lifecycle lock for binding lifetime
McpProxyCacheManager previously released the cache lifecycle lock the
moment initial hydrate completed (in onCacheReady), opening a window
where a different worker's reconnect retry would acquire the lock and
also open its own lifecycle stream to upstream. The result was
multiple workers redundantly subscribing to the upstream SSE for the
same binding — wasted resources, and harder to reason about which
worker drives TTL refreshes.
Move releaseLifecycle out of onCacheReady. The lifecycle lock is
acquired once at binding attach (or after lifecycle abort + reconnect)
and held until detach / engine shutdown. Loser workers keep retrying
the acquire; their attempts only succeed if the holder dies and the
lock TTL expires (or releases explicitly on detach). This matches MCP
semantics: one worker per binding owns the upstream SSE; others serve
their agents from the shared cache populated via the store-watch
propagation path.
Migrate the per-kind and lifecycle locks in McpProxyCache from
putIfAbsent/delete to lock/unlock from PR #1790. Ownership-checked
unlock means a worker that never acquired the lock cannot accidentally
release another worker's lock (which the old unauthenticated delete
allowed). Token state is held on McpListCache and McpProxyCache; null
token short-circuits release as a no-op.
Engine TestStoreHandler updated to share watchers and locks per
storeConfig.id (mirroring how entries are already shared via
TestStoreContext.supplyEntries). The watcher record carries the
registering worker's signaler so cross-worker notify dispatches
listener invocations onto the registering worker's I/O thread,
matching the contract documented on StoreHandler. Without these
fixes cross-worker watch propagation either didn't fire at all or
fired on the wrong thread.
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
* feat(engine): add renew operation to StoreHandler
Adds StoreHandler.renew(key, token, ttl, completion) to the engine SPI,
following the same ownership-checked, async-completion contract as
unlock. Callers that hold a coordination lock for longer than its
initial TTL — e.g. a singleton worker that owns the cache lifecycle
for the lifetime of a binding — schedule renewals at an interval
shorter than the lease TTL. A failed renewal signals that ownership
has been lost (the lock was reacquired by another holder after a TTL
expiry), giving callers a deterministic cue to surrender state and
let the new owner take over.
store-memory and the engine TestStoreHandler implement renew with an
atomic ConcurrentMap.replace against the previously-observed
LockEntry: if the token matches the unexpired current holder, the
entry is replaced with a renewed expiresAt and the original token is
returned; otherwise null is returned. Expired entries are evicted
opportunistically, mirroring the unlock cleanup behaviour.
TestBindingFactory gains a renew assertion alongside the existing
lock/unlock/watch ops. Lock now stashes the acquired token in a
heldLockTokens map keyed by lock key, so subsequent renew (and
unlock) assertions can target the same key without the YAML needing
to surface the token. Explicit value: in the assertion still wins
when set. store-memory ships an IT covering renew of an owned lock
(success) and renew with a non-matching token (null).
McpProxyCache exposes renewLifecycle for the cache manager.
McpProxyCacheManager schedules a renewal at leaseTtl / 3 once the
hydrater opens, re-scheduling on each successful renewal so the
cache owner holds its lifecycle lock uninterrupted while the node
runs. On a failed renewal the manager treats it as a lifecycle loss:
it stops the current handler, falls into the existing reconnect
path, and the next race winner — possibly this worker, possibly
another — takes over. Combined with TTL-bounded recovery this gives
uninterrupted ownership during normal operation and timely takeover
when a holding node crashes.
https://claude.ai/code/session_01Gx5yC2CuFd54Fyoy7kL3qg
* refactor(binding-mcp): always advertise listChanged; emit cache-driven notifications id-less
Initialize response now unconditionally advertises listChanged:true for prompts,
resources, and tools. MCP spec treats the bit as advisory (SHOULD), and the
zilla server delivers list_changed notifications in both cache-enabled mode
(via cache settle path) and passthrough mode (forwarded from upstream). The
8-variant byte-array selector and the pendingInitialize back-reference from
McpLifecycleStream to McpServer are removed; the initialize JSON is now
encoded synchronously on receipt of the JSON-RPC initialize request.
Cache-driven list_changed FlushEx no longer carries a synthetic monotonic id.
The id field is left as the absent sentinel (string16 length=-1), and the SSE
encoder skips the entire `id:` line when the supplied String16FW length is -1.
This preserves the agent's previously-anchored upstream Last-Event-ID across
cache-driven notifications, instead of overwriting it with a proxy-local
counter that no upstream could resume from.
The McpProxyCacheManager unconditional OR-in of SERVER_*_LIST_CHANGED bits on
the agent-facing BEGIN_EX is removed - the BEGIN_EX bits now reflect upstream's
actual capabilities. Tests that asserted the cache-enabled proxy adds those
bits are deleted alongside the now-collapsed 3-of-4 lifecycle.initialize.*.
list.changed scenarios; one shouldInitializeLifecycleAllListChanged scenario
remains in each of network/ and application/ to assert the unconditional JSON
shape.
McpFunctions adds null-tolerant matcher semantics for the three listChanged
FlushEx variants: .id("X") matches exact value, omitting .id(...) matches any
id (wildcard, unchanged), and .id(null) matches when the id field is absent
(length=-1) - distinct from wildcard.
* test(binding-mcp): drop redundant lifecycle.initialize.all.list.changed scenario
Since the initialize advertise is now unconditional (every initialize.*
scenario already asserts the listChanged JSON output regardless of the
input BEGIN_EX capability bits), the .all.list.changed scenario adds no
regression coverage that the base lifecycle.initialize doesn't already
provide. Any future change that re-couples input bits to output JSON
would break the base scenario first.
Removes the scenario directory and its IT methods in network/
ApplicationIT, runtime McpClientIT, and runtime McpServerIT.
* refactor(binding-mcp): address review feedback on cache + lifecycle naming
- Rename cache lock methods to drop redundant Lifecycle suffix:
acquireLifecycle → acquireLock, releaseLifecycle → releaseLock,
renewLifecycle → renewLock; lifecycleLockToken field → lockToken.
- Rename onListChanged → onChanged on McpProxyCacheHandler and
McpProxyCacheListener; rename OnSettled SAM type to ListChangedListener
and initialize the field to a no-op so callers can chain via andThen
without null checks at every fire site.
- McpProxyCacheManager: define a NO_OP BiConsumer constant for the
watch-fired get callback; use Agrona CloseHelper.quietClose for
best-effort watch unsubscribe (drops the IOException import and the
inline try/catch).
- McpProxyCacheHydrater.onLifecycleFlush: assume the extension wraps a
valid McpFlushExFW (consistent with peer call sites); drop the
null/size guards and inline the listKind temporary into a switch
statement that calls handler.onChanged directly.
- McpProxyLifecycleFactory.doNotifyListChanged: switch expression
instead of switch statement.
- McpListCache: move the boolean populated field below the private
fields with a blank line separator.
- mcp.idl: group SERVER_ capabilities (TOOLS, PROMPTS, RESOURCES, and
their LIST_CHANGED variants) together.
- McpFunctions: use static import for UTF_8.
* refactor(binding-mcp): hoist renewTtl, simplify cancel-id field names, renumber capability bits
- McpProxyCache: add renewTtl as leaseTtl.dividedBy(3) so the renew
schedule is derived once at construction; consumers use
cache.renewTtl.toMillis() directly without inline arithmetic.
- McpProxyCacheManager: rename refreshCancelId / reconnectCancelId /
renewCancelId fields to refreshId / reconnectId / renewId (drops the
redundant Cancel infix); drop the inline comment over the renew
schedule now that the duration is named.
- mcp.idl: renumber the McpCapabilities bitmask so values are ascending
within and across groups — SERVER_* (1..32) before CLIENT_* (64..256).
CLIENT_ROOTS / CLIENT_SAMPLING / CLIENT_ELICITATION shift from 8/16/32
to 64/128/256; SERVER_*_LIST_CHANGED shift from 64/128/256 to 8/16/32.
No literal bit values are referenced from scripts or Java; all
consumers go through the generated enum constants so the renumber is
source-compatible.
* fix(binding-mcp): declare id as nullable on the three list_changed FlushEx structs
McpToolsListChangedFlushEx, McpPromptsListChangedFlushEx, and
McpResourcesListChangedFlushEx each carried a non-nullable string16 id.
The generated builder asserts that all non-default fields are set, so a
caller that wanted to emit an id-less notification (the cache-driven
emit path in McpProxyLifecycleFactory.doNotifyListChanged after the
recent refactor) hit an AssertionError at flyweight build time when
invoked through .toolsListChanged(b -> {}).
The AssertionError propagated out of an EngineSignaler deferred lambda
inside the engine worker loop, terminated the worker via
AgentTerminationException, and left ring buffers and timer-wheel state
inconsistent. Teardown of the test engine then tried to drain those
buffers and crashed with SIGSEGV in C2-compiled UnsafeApi.getLongVolatile
— the apparent JVM crash was the symptom, not the root cause.
Adds = null as the IDL default for all three list_changed id fields,
mirroring how McpResumableFlushEx already declares its nullable id.
The builder now skips the assertion when id is not set, and the
flyweight serializes length() == -1 on the wire — which is exactly
what the SSE encoder relies on to skip the id: line for cache-driven
notifications.
Full binding-mcp IT suite now passes locally end-to-end
(McpAggregateEventIdTest, McpClientIT, McpProxyCacheIT, McpServerIT,
McpProxyIT, McpProxyLifecycleIT).
---------
Co-authored-by: Claude <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
This change extends the
StoreHandlerinterface with three new operations to support distributed coordination and reactive data patterns:lock(key, ttlMillis, completion)— Non-blocking TTL-bounded lock acquisition. Returns a unique token on success (null on contention). Locks auto-expire when TTL elapses.unlock(key, token, completion)— Releases a lock by token. Returns null on success, or the current holder's token if ownership doesn't match. Treats expired/already-released locks as no-op success.watch(key, listener)— Registers a listener that fires asynchronously whenever the value at a key changes (including deletion). Returns aCloseablehandle to unsubscribe.Implementation Details
MemoryStoreHandler:
ConcurrentMap<String, List<Watcher>>to track listeners per keyConcurrentMap<String, LockEntry>to track active locks with expiry timesCopyOnWriteArrayListfor thread-safe listener managementput,putIfAbsent,delete,getAndDelete) now notify watchers vianotifyWatchers()WatcherandLockEntryrecords to encapsulate lock/listener stateMemoryStore & MemoryStoreContext:
TestStoreHandler:
StoreHandler interface:
Test Coverage
Added three integration test configurations:
store.lock.yaml— Verifies lock acquisition and contentionstore.unlock.yaml— Verifies unlock on absent resourcesstore.watch.yaml— Verifies listener fires on mutationsUpdated
TestBindingFactoryto support lock/unlock/watch assertions in test specifications.All operations maintain the existing async contract: callbacks fire strictly later than the call returns, on the caller's I/O thread.
https://claude.ai/code/session_01DcjrFrkfvhdhrbykZ8GdyQ