From 026303990d2f8444dfd47381ae6f6499f0e7c81b Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 00:27:46 +0000 Subject: [PATCH 1/2] test(engine): share watchers and locks per store across workers in TestStoreHandler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The engine test store handler is reused across multiple specs to exercise store lock/unlock/watch (introduced in PR #1790). Each worker constructs its own TestStoreHandler, so per-store state — entries, watchers, locks — must be shared via TestStoreContext to give cross-worker semantics that match what production store implementations (memory, redis, hazelcast) provide. Extends TestStoreContext to supply per-store watcher and lock maps (mirroring the existing supplyEntries pattern, keyed by storeConfig.id). TestStoreHandler now stores TestWatcher records that carry the registering worker's signaler, so a put on worker A correctly dispatches the listener back onto the registering worker's I/O thread instead of firing inline on whichever worker performed the put. Without this, a listener registered on worker A but fired by worker B would access state from the wrong thread, violating the engine's single-threaded-per- worker contract. The new TestLockEntry record carries the lease token and expiresAt for ownership-checked unlock. TestStoreHandler.lock uses ConcurrentMap.compute for atomic acquire-or-fail; unlock uses ConcurrentMap.computeIfPresent with a token check so a worker that never acquired the lock cannot release another worker's lock. This brings the in-tree test store implementation up to the same contract surface as the lock/unlock/watch SPI requires, so spec-level ITs that rely on those operations can be written against `type: test` without depending on `store-memory` for correctness. --- .../test/internal/store/TestLockEntry.java | 22 +++++++++++ .../engine/test/internal/store/TestStore.java | 19 +++++++++- .../test/internal/store/TestStoreContext.java | 13 ++++++- .../test/internal/store/TestStoreHandler.java | 38 +++++++++---------- .../test/internal/store/TestWatcher.java | 26 +++++++++++++ 5 files changed, 94 insertions(+), 24 deletions(-) create mode 100644 runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestLockEntry.java create mode 100644 runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestWatcher.java diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestLockEntry.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestLockEntry.java new file mode 100644 index 0000000000..45ab4c1420 --- /dev/null +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestLockEntry.java @@ -0,0 +1,22 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.engine.test.internal.store; + +record TestLockEntry( + String token, + long expiresAt) +{ +} diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStore.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStore.java index c4276685ff..f2738b75a0 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStore.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStore.java @@ -16,6 +16,7 @@ package io.aklivity.zilla.runtime.engine.test.internal.store; import java.net.URL; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -28,11 +29,15 @@ public final class TestStore implements Store public static final String NAME = "test"; private final ConcurrentMap> storage; + private final ConcurrentMap>> storageListeners; + private final ConcurrentMap> storageLocks; public TestStore( Configuration config) { this.storage = new ConcurrentHashMap<>(); + this.storageListeners = new ConcurrentHashMap<>(); + this.storageLocks = new ConcurrentHashMap<>(); } @Override @@ -51,7 +56,7 @@ public URL type() public TestStoreContext supply( EngineContext context) { - return new TestStoreContext(context, this::acquireEntries); + return new TestStoreContext(context, this::acquireEntries, this::acquireListeners, this::acquireLocks); } private ConcurrentMap acquireEntries( @@ -59,4 +64,16 @@ private ConcurrentMap acquireEntries( { return storage.computeIfAbsent(storeId, id -> new ConcurrentHashMap<>()); } + + private ConcurrentMap> acquireListeners( + long storeId) + { + return storageListeners.computeIfAbsent(storeId, id -> new ConcurrentHashMap<>()); + } + + private ConcurrentMap acquireLocks( + long storeId) + { + return storageLocks.computeIfAbsent(storeId, id -> new ConcurrentHashMap<>()); + } } diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreContext.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreContext.java index 1ee82975be..a4539ea18d 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreContext.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreContext.java @@ -15,6 +15,7 @@ */ package io.aklivity.zilla.runtime.engine.test.internal.store; +import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.function.LongFunction; @@ -28,13 +29,19 @@ public final class TestStoreContext implements StoreContext { private final Signaler signaler; private final LongFunction> supplyEntries; + private final LongFunction>> supplyListeners; + private final LongFunction> supplyLocks; public TestStoreContext( EngineContext context, - LongFunction> supplyEntries) + LongFunction> supplyEntries, + LongFunction>> supplyListeners, + LongFunction> supplyLocks) { this.signaler = context.signaler(); this.supplyEntries = supplyEntries; + this.supplyListeners = supplyListeners; + this.supplyLocks = supplyLocks; } @Override @@ -46,7 +53,9 @@ public TestStoreHandler attach( { options.entries.forEach(entries::putIfAbsent); } - return new TestStoreHandler(store, signaler, entries); + final ConcurrentMap> listeners = supplyListeners.apply(store.id); + final ConcurrentMap locks = supplyLocks.apply(store.id); + return new TestStoreHandler(store, signaler, entries, listeners, locks); } @Override diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java index 67c2e92c4c..bbb3d8e9d4 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Objects; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.BiConsumer; @@ -33,19 +32,21 @@ public final class TestStoreHandler implements StoreHandler { private final ConcurrentMap entries; - private final ConcurrentMap>> listeners; + private final ConcurrentMap> listeners; private final ConcurrentMap locks; private final Signaler signaler; public TestStoreHandler( StoreConfig store, Signaler signaler, - ConcurrentMap entries) + ConcurrentMap entries, + ConcurrentMap> listeners, + ConcurrentMap locks) { this.entries = Objects.requireNonNull(entries); this.signaler = Objects.requireNonNull(signaler); - this.listeners = new ConcurrentHashMap<>(); - this.locks = new ConcurrentHashMap<>(); + this.listeners = Objects.requireNonNull(listeners); + this.locks = Objects.requireNonNull(locks); } @Override @@ -121,7 +122,7 @@ public void lock( final String token = UUID.randomUUID().toString(); final TestLockEntry candidate = new TestLockEntry(token, expiresAt); TestLockEntry existing = locks.putIfAbsent(key, candidate); - if (existing != null && existing.expiresAt <= now) + if (existing != null && existing.expiresAt() <= now) { existing = locks.replace(key, existing, candidate) ? null : locks.get(key); } @@ -138,14 +139,14 @@ public void unlock( final long now = System.currentTimeMillis(); final TestLockEntry current = locks.get(key); final String result; - if (current != null && current.expiresAt > now && current.token.equals(token)) + if (current != null && current.expiresAt() > now && current.token().equals(token)) { locks.remove(key, current); result = token; } else { - if (current != null && current.expiresAt <= now) + if (current != null && current.expiresAt() <= now) { locks.remove(key, current); } @@ -159,14 +160,15 @@ public Closeable watch( String key, BiConsumer listener) { - final List> list = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()); - list.add(listener); + final TestWatcher watcher = new TestWatcher(listener, signaler); + final List list = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()); + list.add(watcher); return () -> { - final List> current = listeners.get(key); + final List current = listeners.get(key); if (current != null) { - current.remove(listener); + current.remove(watcher); } }; } @@ -175,13 +177,13 @@ private void notifyListeners( String key, String value) { - final List> list = listeners.get(key); + final List list = listeners.get(key); if (list != null && !list.isEmpty()) { final long now = System.currentTimeMillis(); - for (BiConsumer listener : list) + for (TestWatcher w : list) { - signaler.signalAt(now, 0, ignored -> listener.accept(key, value)); + w.signaler().signalAt(now, 0, ignored -> w.listener().accept(key, value)); } } } @@ -192,10 +194,4 @@ private void defer( // contract: callback fires strictly later than the call, on the caller's I/O thread signaler.signalAt(System.currentTimeMillis(), 0, ignored -> task.run()); } - - private record TestLockEntry( - String token, - long expiresAt) - { - } } diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestWatcher.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestWatcher.java new file mode 100644 index 0000000000..e058ff3f68 --- /dev/null +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestWatcher.java @@ -0,0 +1,26 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.engine.test.internal.store; + +import java.util.function.BiConsumer; + +import io.aklivity.zilla.runtime.engine.concurrent.Signaler; + +record TestWatcher( + BiConsumer listener, + Signaler signaler) +{ +} From aba1812270cdc91da1710b2f32689bff93d4894b Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 00:28:41 +0000 Subject: [PATCH 2/2] feat(engine): add renew operation to StoreHandler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds StoreHandler.renew(key, token, ttl, completion) to the engine SPI, following the same ownership-checked, async-completion contract as unlock. Callers that hold a coordination lock for longer than its initial TTL — e.g. a singleton worker that owns a binding-scoped resource for the lifetime of the binding — schedule renewals at an interval shorter than the lease TTL. A failed renewal signals that ownership has been lost (the lock was reacquired by another holder after a TTL expiry), giving callers a deterministic cue to surrender state and let the new owner take over. store-memory and the engine TestStoreHandler implement renew with an atomic ConcurrentMap.replace against the previously-observed LockEntry: if the token matches the unexpired current holder, the entry is replaced with a renewed expiresAt and the original token is returned; otherwise null is returned. Expired entries are evicted opportunistically, mirroring the unlock cleanup behaviour. TestBindingFactory gains a renew assertion alongside the existing lock/unlock/watch ops. The new spec config store-memory.spec/config/ store.renew.yaml exercises the full acquire-renew-release cycle for the IT. --- .../runtime/engine/store/StoreHandler.java | 34 +++++++++++++++ .../internal/binding/TestBindingFactory.java | 42 +++++++++++++++++- .../test/internal/store/TestStoreHandler.java | 27 ++++++++++++ .../memory/internal/MemoryStoreHandler.java | 28 ++++++++++++ .../memory/internal/MemoryStoreHandlerIT.java | 10 +++++ .../store/memory/config/store.renew.yaml | 43 +++++++++++++++++++ 6 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.renew.yaml diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java index 1f9bcedd06..99fa6d5650 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java @@ -157,6 +157,40 @@ void unlock( String token, Consumer completion); + /** + * Extends the TTL of a lock previously acquired via {@link #lock}, provided the supplied token + * matches the current holder. + *

+ * Used by callers that hold a lock longer than its initial TTL — for example, a singleton + * worker that owns a coordination lease for the lifetime of its binding. The caller schedules + * renewals at an interval shorter than the lease TTL; if any renewal fails the caller has + * lost ownership and must surrender any externally-visible state predicated on holding the + * lock. + *

+ *

+ * On a successful ownership-checked renewal, the completion receives the supplied token and + * the lock's expiry is reset to {@code now + ttl}. On failure — the token does not match the + * current holder, or the lock has already expired and possibly been reacquired by another + * holder — the completion receives {@code null}. As with {@link #unlock}, the implementation + * does not surface information about another holder to a caller that did not prove ownership. + *

+ *

+ * The completion fires strictly later than the call returns, on the caller's I/O + * thread, following the same async contract as the other {@code StoreHandler} operations. + *

+ * + * @param key the key whose lock is being renewed + * @param token the ownership token returned by a prior {@link #lock} call + * @param ttl the new lease duration counted from now, or {@code null} for no expiry + * @param completion a callback that receives the supplied {@code token} on a successful + * ownership-checked renewal, or {@code null} otherwise + */ + void renew( + String key, + String token, + Duration ttl, + Consumer completion); + /** * Registers a listener invoked when the value associated with the given key changes. *

diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java index f96e2ecced..48c865f251 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java @@ -23,6 +23,7 @@ import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.security.KeyStore; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -118,6 +119,7 @@ final class TestBindingFactory implements BindingHandler private VaultAssertion vaultAssertion; private StoreHandler store; private List storeAssertions; + private final Map heldLockTokens = new HashMap<>(); private long authorization; TestBindingFactory( @@ -730,11 +732,15 @@ private void runStoreAssertions( doInitialReset(traceId); } } + if (token != null) + { + heldLockTokens.put(k, token); + } callbackFired.value = true; }); break; case "unlock": - store.unlock(a.key, a.value, v -> + store.unlock(a.key, resolveToken(a), v -> { if (Thread.currentThread() != dispatchThread || callbackFired.value || @@ -742,6 +748,33 @@ private void runStoreAssertions( { doInitialReset(traceId); } + if (v != null) + { + heldLockTokens.remove(a.key); + } + callbackFired.value = true; + }); + break; + case "renew": + store.renew(a.key, resolveToken(a), a.ttl, v -> + { + if (Thread.currentThread() != dispatchThread || + callbackFired.value) + { + doInitialReset(traceId); + } + // expect="" or "null" → v must be null (renew failed); + // expect=non-empty → v must be non-null (renew succeeded) + if (a.hasExpect) + { + boolean expectRenewed = a.expect != null && !a.expect.isEmpty() && + !"null".equals(a.expect); + boolean renewed = v != null; + if (expectRenewed != renewed) + { + doInitialReset(traceId); + } + } callbackFired.value = true; }); break; @@ -769,6 +802,13 @@ private void runStoreAssertions( } } + private String resolveToken( + StoreAssertion a) + { + // explicit token via value, otherwise use the most recent token captured by a prior lock + return a.value != null && !a.value.isEmpty() ? a.value : heldLockTokens.get(a.key); + } + private void onInitialData( DataFW data) { diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java index bbb3d8e9d4..1dbb112582 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java @@ -155,6 +155,33 @@ public void unlock( defer(() -> completion.accept(result)); } + @Override + public void renew( + String key, + String token, + Duration ttl, + Consumer completion) + { + final long now = System.currentTimeMillis(); + final TestLockEntry current = locks.get(key); + String result = null; + if (current != null && current.expiresAt() > now && current.token().equals(token)) + { + final long expiresAt = ttl == null ? Long.MAX_VALUE : now + ttl.toMillis(); + final TestLockEntry renewed = new TestLockEntry(token, expiresAt); + if (locks.replace(key, current, renewed)) + { + result = token; + } + } + else if (current != null && current.expiresAt() <= now) + { + locks.remove(key, current); + } + final String outcome = result; + defer(() -> completion.accept(outcome)); + } + @Override public Closeable watch( String key, diff --git a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java index c60acc4340..b8d70e6a4f 100644 --- a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java +++ b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java @@ -165,6 +165,34 @@ public void unlock( defer(() -> completion.accept(result)); } + @Override + public void renew( + String key, + String token, + Duration ttl, + Consumer completion) + { + final long now = System.currentTimeMillis(); + final LockEntry current = locks.get(key); + String result = null; + if (current != null && current.expiresAt() > now && current.token().equals(token)) + { + final long expiresAt = expiresAt(ttl); + final LockEntry renewed = new LockEntry(token, expiresAt); + if (locks.replace(key, current, renewed)) + { + result = token; + } + } + else if (current != null && current.expiresAt() <= now) + { + // expired holder still cluttering the map — clean up opportunistically + locks.remove(key, current); + } + final String outcome = result; + defer(() -> completion.accept(outcome)); + } + @Override public Closeable watch( String key, diff --git a/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java b/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java index 226e6cb7cb..9289e0b959 100644 --- a/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java +++ b/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java @@ -76,6 +76,16 @@ public void shouldUnlockAbsentResource() throws Exception k3po.finish(); } + @Test + @Configuration("store.renew.yaml") + @Specification({ + "${net}/handshake/client", + "${app}/handshake/server"}) + public void shouldRenewOwnedLock() throws Exception + { + k3po.finish(); + } + @Test @Configuration("store.watch.yaml") @Specification({ diff --git a/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.renew.yaml b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.renew.yaml new file mode 100644 index 0000000000..c1d566944d --- /dev/null +++ b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.renew.yaml @@ -0,0 +1,43 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +stores: + memory0: + type: memory +bindings: + net0: + type: test + kind: server + options: + store: memory0 + assertions: + store: + memory0: + - op: lock + key: lock.renewable + ttl: PT1M + expect: acquired + - op: renew + key: lock.renewable + ttl: PT1M + expect: renewed + - op: renew + key: lock.absent + value: any-token + expect: null + routes: + - exit: app0