From 7f8bebd65a1d1ff55e0b4a9f4ade6c98b3f7555f Mon Sep 17 00:00:00 2001
From: Claude
Date: Thu, 21 May 2026 22:44:24 +0000
Subject: [PATCH 1/4] feat(engine): add watch / lock / unlock to StoreHandler
SPI
Adds three new primitives to the StoreHandler SPI:
- watch(key, listener) returns a Closeable; listener fires on the
registering caller's signaler after every successful mutation of
the watched key (null value indicates removal).
- lock(key, ttl, completion) acquires a TTL-bounded lock, returning
a non-null token on success or null on contention.
- unlock(key, token, completion) atomically releases when the token
matches the current holder; ownership-mismatch returns the holder
token, expired/missing locks are treated as a no-op success.
Implementations decide change-visibility scope (in-process, node-wide,
cluster-wide). All three methods follow the existing async contract:
callbacks fire strictly later than the call, on the caller's I/O thread.
store-memory implements all three using shared per-store-config maps
(entries / watchers / locks) so cross-worker dispatch is correct:
the handler that received watch captures its own signaler in the
Watcher record; iteration on put dispatches each listener via its
captured signaler, routing the invocation onto the registering
worker's event loop regardless of which worker did the mutation.
TestStoreHandler updated with the same model so engine.spec IT
coverage carries through.
---
.../runtime/engine/store/StoreHandler.java | 72 +++
.../test/internal/store/TestStoreHandler.java | 110 +++-
.../store/memory/internal/MemoryStore.java | 26 +-
.../memory/internal/MemoryStoreContext.java | 15 +-
.../memory/internal/MemoryStoreHandler.java | 122 +++-
.../internal/MemoryStoreHandlerTest.java | 526 ++++++++++++++++++
6 files changed, 866 insertions(+), 5 deletions(-)
create mode 100644 runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerTest.java
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java
index d81bcbafc7..bc6de4b6e7 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java
@@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.engine.store;
+import java.io.Closeable;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -104,4 +105,75 @@ void delete(
void getAndDelete(
String key,
Consumer completion);
+
+ /**
+ * Attempts to acquire a TTL-bounded lock on the given key without waiting.
+ *
+ * The completion fires strictly later than the call returns, on the caller's I/O
+ * thread, following the same async contract as the other {@code StoreHandler} operations.
+ * On success it receives a non-null ownership token, generated by the implementation, that
+ * the caller must supply to {@link #unlock} to release. On contention — another holder owns
+ * an unexpired lock — the completion receives {@code null}.
+ *
+ *
+ * The lock auto-releases when its TTL elapses; an {@code unlock} from the original holder
+ * against an expired or reacquired lock is treated as a no-op.
+ *
+ *
+ * @param key the key to lock
+ * @param ttlMillis the lease duration in milliseconds, or {@code Long.MAX_VALUE} for no expiry
+ * @param completion a callback that receives {@code (key, token)};
+ * {@code token} is non-null when the lock was acquired,
+ * {@code null} when another holder owns the key
+ */
+ void lock(
+ String key,
+ long ttlMillis,
+ BiConsumer completion);
+
+ /**
+ * Releases a lock previously acquired via {@link #lock}, provided the supplied token matches
+ * the current holder.
+ *
+ * The completion receives {@code null} when the lock has been released (including when it
+ * had already expired or been released — treated as a no-op success). When the supplied
+ * token does not match the current holder, the completion receives the current holder's
+ * token and the lock remains held.
+ *
+ *
+ * The completion fires strictly later than the call returns, on the caller's I/O
+ * thread, following the same async contract as the other {@code StoreHandler} operations.
+ *
+ *
+ * @param key the key to unlock
+ * @param token the ownership token returned by a prior {@link #lock} call
+ * @param completion a callback that receives {@code null} on success,
+ * or the current holder's token on ownership mismatch
+ */
+ void unlock(
+ String key,
+ String token,
+ Consumer completion);
+
+ /**
+ * Registers a listener invoked when the value associated with the given key changes.
+ *
+ * Registration returns synchronously. The listener fires strictly later than the
+ * mutation that triggered it, on the registering caller's I/O thread, following the same
+ * async contract as the other {@code StoreHandler} operations. Each invocation receives the
+ * key and the new value, or {@code null} when the entry has been removed.
+ *
+ *
+ * Implementations decide the scope of change visibility — in-process, node-wide, or
+ * cluster-wide — and document it in their module.
+ *
+ *
+ * @param key the key to observe
+ * @param listener a callback that receives {@code (key, value)} on every change;
+ * {@code value} is {@code null} when the entry has been removed
+ * @return a {@link Closeable} handle that unsubscribes the listener when closed
+ */
+ Closeable watch(
+ String key,
+ BiConsumer listener);
}
diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java
index 360319f7ec..5884d36f65 100644
--- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java
+++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java
@@ -15,8 +15,13 @@
*/
package io.aklivity.zilla.runtime.engine.test.internal.store;
+import java.io.Closeable;
+import java.util.List;
import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -27,6 +32,8 @@
public final class TestStoreHandler implements StoreHandler
{
private final ConcurrentMap entries;
+ private final ConcurrentMap>> listeners;
+ private final ConcurrentMap locks;
private final Signaler signaler;
public TestStoreHandler(
@@ -36,6 +43,8 @@ public TestStoreHandler(
{
this.entries = Objects.requireNonNull(entries);
this.signaler = Objects.requireNonNull(signaler);
+ this.listeners = new ConcurrentHashMap<>();
+ this.locks = new ConcurrentHashMap<>();
}
@Override
@@ -55,6 +64,7 @@ public void put(
Consumer completion)
{
entries.put(key, value);
+ notifyListeners(key, value);
defer(() -> completion.accept(null));
}
@@ -66,6 +76,10 @@ public void putIfAbsent(
Consumer completion)
{
final String existing = entries.putIfAbsent(key, value);
+ if (existing == null)
+ {
+ notifyListeners(key, value);
+ }
defer(() -> completion.accept(existing));
}
@@ -74,7 +88,11 @@ public void delete(
String key,
Consumer completion)
{
- entries.remove(key);
+ final String removed = entries.remove(key);
+ if (removed != null)
+ {
+ notifyListeners(key, null);
+ }
defer(() -> completion.accept(null));
}
@@ -84,13 +102,103 @@ public void getAndDelete(
Consumer completion)
{
final String prior = entries.remove(key);
+ if (prior != null)
+ {
+ notifyListeners(key, null);
+ }
defer(() -> completion.accept(prior));
}
+ @Override
+ public void lock(
+ String key,
+ long ttlMillis,
+ BiConsumer completion)
+ {
+ final long now = System.currentTimeMillis();
+ final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : now + ttlMillis;
+ final String token = UUID.randomUUID().toString();
+ final TestLockEntry candidate = new TestLockEntry(token, expiresAt);
+ TestLockEntry existing = locks.putIfAbsent(key, candidate);
+ if (existing != null && existing.expiresAt <= now)
+ {
+ existing = locks.replace(key, existing, candidate) ? null : locks.get(key);
+ }
+ final String result = existing == null ? token : null;
+ defer(() -> completion.accept(key, result));
+ }
+
+ @Override
+ public void unlock(
+ String key,
+ String token,
+ Consumer completion)
+ {
+ final long now = System.currentTimeMillis();
+ final TestLockEntry current = locks.get(key);
+ final String result;
+ if (current == null || current.expiresAt <= now)
+ {
+ if (current != null)
+ {
+ locks.remove(key, current);
+ }
+ result = null;
+ }
+ else if (current.token.equals(token))
+ {
+ locks.remove(key, current);
+ result = null;
+ }
+ else
+ {
+ result = current.token;
+ }
+ defer(() -> completion.accept(result));
+ }
+
+ @Override
+ public Closeable watch(
+ String key,
+ BiConsumer listener)
+ {
+ final List> list = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
+ list.add(listener);
+ return () ->
+ {
+ final List> current = listeners.get(key);
+ if (current != null)
+ {
+ current.remove(listener);
+ }
+ };
+ }
+
+ private void notifyListeners(
+ String key,
+ String value)
+ {
+ final List> list = listeners.get(key);
+ if (list != null && !list.isEmpty())
+ {
+ final long now = System.currentTimeMillis();
+ for (BiConsumer listener : list)
+ {
+ signaler.signalAt(now, 0, ignored -> listener.accept(key, value));
+ }
+ }
+ }
+
private void defer(
Runnable task)
{
// contract: callback fires strictly later than the call, on the caller's I/O thread
signaler.signalAt(System.currentTimeMillis(), 0, ignored -> task.run());
}
+
+ private record TestLockEntry(
+ String token,
+ long expiresAt)
+ {
+ }
}
diff --git a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStore.java b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStore.java
index a7da7bd2a8..15a9d19ae3 100644
--- a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStore.java
+++ b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStore.java
@@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.store.memory.internal;
import java.net.URL;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -22,6 +23,8 @@
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.store.Store;
import io.aklivity.zilla.runtime.engine.store.StoreContext;
+import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.LockEntry;
+import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.Watcher;
final class MemoryStore implements Store
{
@@ -45,7 +48,12 @@ public String name()
public StoreContext supply(
EngineContext context)
{
- return new MemoryStoreContext(this::acquireEntries, this::releaseEntries, context.signaler());
+ return new MemoryStoreContext(
+ this::acquireEntries,
+ this::supplyWatchers,
+ this::supplyLocks,
+ this::releaseEntries,
+ context.signaler());
}
@Override
@@ -62,6 +70,20 @@ private ConcurrentMap acquireEntries(
return memoryStorage.entries;
}
+ private ConcurrentMap> supplyWatchers(
+ long storeId)
+ {
+ // attach already incremented refs via acquireEntries
+ return storage.computeIfAbsent(storeId, id -> new MemoryStorage()).watchers;
+ }
+
+ private ConcurrentMap supplyLocks(
+ long storeId)
+ {
+ // attach already incremented refs via acquireEntries
+ return storage.computeIfAbsent(storeId, id -> new MemoryStorage()).locks;
+ }
+
private void releaseEntries(
long storeId)
{
@@ -72,5 +94,7 @@ private static final class MemoryStorage
{
final AtomicInteger refs = new AtomicInteger();
final ConcurrentMap entries = new ConcurrentHashMap<>();
+ final ConcurrentMap> watchers = new ConcurrentHashMap<>();
+ final ConcurrentMap locks = new ConcurrentHashMap<>();
}
}
diff --git a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreContext.java b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreContext.java
index 29cf345c34..8ae3827487 100644
--- a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreContext.java
+++ b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreContext.java
@@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.store.memory.internal;
+import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;
@@ -22,19 +23,27 @@
import io.aklivity.zilla.runtime.engine.config.StoreConfig;
import io.aklivity.zilla.runtime.engine.store.StoreContext;
import io.aklivity.zilla.runtime.engine.store.StoreHandler;
+import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.LockEntry;
+import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.Watcher;
final class MemoryStoreContext implements StoreContext
{
private final LongFunction> supplyEntries;
+ private final LongFunction>> supplyWatchers;
+ private final LongFunction> supplyLocks;
private final LongConsumer removeEntries;
private final Signaler signaler;
MemoryStoreContext(
LongFunction> supplyEntries,
+ LongFunction>> supplyWatchers,
+ LongFunction> supplyLocks,
LongConsumer removeEntries,
Signaler signaler)
{
this.supplyEntries = supplyEntries;
+ this.supplyWatchers = supplyWatchers;
+ this.supplyLocks = supplyLocks;
this.removeEntries = removeEntries;
this.signaler = signaler;
}
@@ -43,7 +52,11 @@ final class MemoryStoreContext implements StoreContext
public StoreHandler attach(
StoreConfig config)
{
- return new MemoryStoreHandler(supplyEntries.apply(config.id), signaler);
+ return new MemoryStoreHandler(
+ supplyEntries.apply(config.id),
+ supplyWatchers.apply(config.id),
+ supplyLocks.apply(config.id),
+ signaler);
}
@Override
diff --git a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java
index 7702046991..79cc1830cc 100644
--- a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java
+++ b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java
@@ -14,8 +14,12 @@
*/
package io.aklivity.zilla.runtime.store.memory.internal;
+import java.io.Closeable;
+import java.util.List;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -25,13 +29,19 @@
final class MemoryStoreHandler implements StoreHandler
{
private final ConcurrentMap entries;
+ private final ConcurrentMap> watchers;
+ private final ConcurrentMap locks;
private final Signaler signaler;
MemoryStoreHandler(
ConcurrentMap entries,
+ ConcurrentMap> watchers,
+ ConcurrentMap locks,
Signaler signaler)
{
this.entries = entries;
+ this.watchers = watchers;
+ this.locks = locks;
this.signaler = Objects.requireNonNull(signaler);
}
@@ -54,6 +64,7 @@ public void put(
{
final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + ttlMillis;
entries.put(key, new MemoryEntry(value, expiresAt));
+ notifyWatchers(key, value);
defer(() -> completion.accept(null));
}
@@ -67,9 +78,14 @@ public void putIfAbsent(
final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + ttlMillis;
final MemoryEntry newEntry = new MemoryEntry(value, expiresAt);
final MemoryEntry existing = entries.putIfAbsent(key, newEntry);
+ boolean stored = existing == null;
if (existing != null && existing.expired())
{
- entries.replace(key, existing, newEntry);
+ stored = entries.replace(key, existing, newEntry);
+ }
+ if (stored)
+ {
+ notifyWatchers(key, value);
}
final String result = existing != null && !existing.expired() ? existing.value() : null;
defer(() -> completion.accept(result));
@@ -80,7 +96,11 @@ public void delete(
String key,
Consumer completion)
{
- entries.remove(key);
+ final MemoryEntry removed = entries.remove(key);
+ if (removed != null)
+ {
+ notifyWatchers(key, null);
+ }
defer(() -> completion.accept(null));
}
@@ -91,13 +111,111 @@ public void getAndDelete(
{
final MemoryEntry entry = entries.remove(key);
final String value = entry != null && !entry.expired() ? entry.value() : null;
+ if (entry != null)
+ {
+ notifyWatchers(key, null);
+ }
defer(() -> completion.accept(value));
}
+ @Override
+ public void lock(
+ String key,
+ long ttlMillis,
+ BiConsumer completion)
+ {
+ final long now = System.currentTimeMillis();
+ final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : now + ttlMillis;
+ final String token = UUID.randomUUID().toString();
+ final LockEntry candidate = new LockEntry(token, expiresAt);
+ LockEntry existing = locks.putIfAbsent(key, candidate);
+ if (existing != null && existing.expiresAt() <= now)
+ {
+ existing = locks.replace(key, existing, candidate) ? null : locks.get(key);
+ }
+ final String result = existing == null ? token : null;
+ defer(() -> completion.accept(key, result));
+ }
+
+ @Override
+ public void unlock(
+ String key,
+ String token,
+ Consumer completion)
+ {
+ final long now = System.currentTimeMillis();
+ final LockEntry current = locks.get(key);
+ final String result;
+ if (current == null || current.expiresAt() <= now)
+ {
+ // already gone — treat as success
+ if (current != null)
+ {
+ locks.remove(key, current);
+ }
+ result = null;
+ }
+ else if (current.token().equals(token))
+ {
+ locks.remove(key, current);
+ result = null;
+ }
+ else
+ {
+ result = current.token();
+ }
+ defer(() -> completion.accept(result));
+ }
+
+ @Override
+ public Closeable watch(
+ String key,
+ BiConsumer listener)
+ {
+ final Watcher watcher = new Watcher(listener, signaler);
+ final List list = watchers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
+ list.add(watcher);
+ return () ->
+ {
+ final List current = watchers.get(key);
+ if (current != null)
+ {
+ current.remove(watcher);
+ }
+ };
+ }
+
+ private void notifyWatchers(
+ String key,
+ String value)
+ {
+ final List list = watchers.get(key);
+ if (list != null && !list.isEmpty())
+ {
+ final long now = System.currentTimeMillis();
+ for (Watcher w : list)
+ {
+ w.signaler.signalAt(now, 0, ignored -> w.listener.accept(key, value));
+ }
+ }
+ }
+
private void defer(
Runnable task)
{
// contract: callback fires strictly later than the call, on the caller's I/O thread
signaler.signalAt(System.currentTimeMillis(), 0, ignored -> task.run());
}
+
+ record Watcher(
+ BiConsumer listener,
+ Signaler signaler)
+ {
+ }
+
+ record LockEntry(
+ String token,
+ long expiresAt)
+ {
+ }
}
diff --git a/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerTest.java b/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerTest.java
new file mode 100644
index 0000000000..a6d75d07ab
--- /dev/null
+++ b/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerTest.java
@@ -0,0 +1,526 @@
+/*
+ * Copyright 2021-2024 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.store.memory.internal;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.Closeable;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.IntConsumer;
+
+import org.agrona.DirectBuffer;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.aklivity.zilla.runtime.engine.concurrent.Signaler;
+import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.LockEntry;
+import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.Watcher;
+
+public class MemoryStoreHandlerTest
+{
+ private ConcurrentMap entries;
+ private ConcurrentMap> watchers;
+ private ConcurrentMap locks;
+
+ @Before
+ public void setUp()
+ {
+ entries = new ConcurrentHashMap<>();
+ watchers = new ConcurrentHashMap<>();
+ locks = new ConcurrentHashMap<>();
+ }
+
+ @Test
+ public void shouldFireListenerAfterPut()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final List values = new ArrayList<>();
+ handler.watch("k", (key, value) -> values.add(value));
+
+ handler.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+
+ signaler.runOne();
+ assertThat(values, contains("v1"));
+ }
+
+ @Test
+ public void shouldFireListenerWithNullOnDelete()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ handler.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+ signaler.drain();
+
+ final List values = new ArrayList<>();
+ handler.watch("k", (key, value) -> values.add(value));
+
+ handler.delete("k", MemoryStoreHandlerTest::ignored);
+ signaler.runOne();
+
+ assertThat(values, contains((String) null));
+ }
+
+ @Test
+ public void shouldFireListenerOnGetAndDelete()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ handler.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+ signaler.drain();
+
+ final List values = new ArrayList<>();
+ handler.watch("k", (key, value) -> values.add(value));
+
+ handler.getAndDelete("k", MemoryStoreHandlerTest::ignored);
+ signaler.runOne();
+
+ assertThat(values, contains((String) null));
+ }
+
+ @Test
+ public void shouldFireListenerOnPutIfAbsentWhenStored()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final List values = new ArrayList<>();
+ handler.watch("k", (key, value) -> values.add(value));
+
+ handler.putIfAbsent("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+ signaler.runOne();
+
+ assertThat(values, contains("v1"));
+ }
+
+ @Test
+ public void shouldNotFireListenerOnPutIfAbsentWhenExisting()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ handler.put("k", "existing", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+ signaler.drain();
+
+ final List values = new ArrayList<>();
+ handler.watch("k", (key, value) -> values.add(value));
+
+ handler.putIfAbsent("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+ signaler.drain();
+
+ assertThat(values, is(empty()));
+ }
+
+ @Test
+ public void shouldNotFireListenerOnDeleteWhenAbsent()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final List values = new ArrayList<>();
+ handler.watch("k", (key, value) -> values.add(value));
+
+ handler.delete("k", MemoryStoreHandlerTest::ignored);
+ signaler.drain();
+
+ assertThat(values, is(empty()));
+ }
+
+ @Test
+ public void shouldUnsubscribeOnClose() throws Exception
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final List values = new ArrayList<>();
+ final Closeable subscription = handler.watch("k", (key, value) -> values.add(value));
+
+ subscription.close();
+
+ handler.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+ signaler.drain();
+
+ assertThat(values, is(empty()));
+ }
+
+ @Test
+ public void shouldDispatchToRegisteringWorkerWhenPutFromAnother()
+ {
+ final RecordingSignaler signalerA = new RecordingSignaler();
+ final RecordingSignaler signalerB = new RecordingSignaler();
+
+ final MemoryStoreHandler handlerA = new MemoryStoreHandler(entries, watchers, locks, signalerA);
+ final MemoryStoreHandler handlerB = new MemoryStoreHandler(entries, watchers, locks, signalerB);
+
+ final List received = new ArrayList<>();
+ handlerA.watch("k", (key, value) -> received.add(value));
+
+ // mutate on handler B; listener must dispatch via handler A's signaler
+ handlerB.put("k", "v-from-B", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+
+ // draining only B's signaler runs the put completion but NOT the listener
+ signalerB.drain();
+ assertThat(received, is(empty()));
+
+ // listener fires only when A's signaler is drained
+ signalerA.drain();
+ assertThat(received, contains("v-from-B"));
+ }
+
+ @Test
+ public void shouldFanOutToMultipleWatchers()
+ {
+ final RecordingSignaler signalerA = new RecordingSignaler();
+ final RecordingSignaler signalerB = new RecordingSignaler();
+
+ final MemoryStoreHandler handlerA = new MemoryStoreHandler(entries, watchers, locks, signalerA);
+ final MemoryStoreHandler handlerB = new MemoryStoreHandler(entries, watchers, locks, signalerB);
+
+ final List receivedA = new ArrayList<>();
+ final List receivedB = new ArrayList<>();
+ handlerA.watch("k", (key, value) -> receivedA.add(value));
+ handlerB.watch("k", (key, value) -> receivedB.add(value));
+
+ handlerA.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+
+ signalerA.drain();
+ signalerB.drain();
+
+ assertThat(receivedA, contains("v1"));
+ assertThat(receivedB, contains("v1"));
+ }
+
+ @Test
+ public void shouldNotFireListenerForOtherKey()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final List values = new ArrayList<>();
+ handler.watch("k1", (key, value) -> values.add(value));
+
+ handler.put("k2", "v", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+ signaler.drain();
+
+ assertThat(values, is(empty()));
+ }
+
+ @Test
+ public void shouldReturnCloseableFromWatch()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final Closeable subscription = handler.watch("k", MemoryStoreHandlerTest::ignored);
+ assertNotNull(subscription);
+ }
+
+ @Test
+ public void shouldAcquireLockWhenAbsent()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final String[] tokenRef = new String[1];
+ handler.lock("k", Long.MAX_VALUE, (key, token) -> tokenRef[0] = token);
+ signaler.drain();
+
+ assertThat(tokenRef[0], is(notNullValue()));
+ }
+
+ @Test
+ public void shouldNotAcquireLockWhenHeld()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ handler.lock("k", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
+ signaler.drain();
+
+ final String[] tokenRef = new String[1];
+ tokenRef[0] = "sentinel";
+ handler.lock("k", Long.MAX_VALUE, (key, token) -> tokenRef[0] = token);
+ signaler.drain();
+
+ assertNull(tokenRef[0]);
+ }
+
+ @Test
+ public void shouldReleaseLockOnUnlockWithMatchingToken()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final String[] heldToken = new String[1];
+ handler.lock("k", Long.MAX_VALUE, (key, token) -> heldToken[0] = token);
+ signaler.drain();
+
+ final String[] unlockResult = new String[]{ "sentinel" };
+ handler.unlock("k", heldToken[0], result -> unlockResult[0] = result);
+ signaler.drain();
+ assertNull(unlockResult[0]);
+
+ final String[] reacquired = new String[1];
+ handler.lock("k", Long.MAX_VALUE, (key, token) -> reacquired[0] = token);
+ signaler.drain();
+ assertThat(reacquired[0], is(notNullValue()));
+ }
+
+ @Test
+ public void shouldRejectUnlockWithMismatchedToken()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final String[] heldToken = new String[1];
+ handler.lock("k", Long.MAX_VALUE, (key, token) -> heldToken[0] = token);
+ signaler.drain();
+
+ final String[] unlockResult = new String[1];
+ handler.unlock("k", "wrong-token", result -> unlockResult[0] = result);
+ signaler.drain();
+ assertThat(unlockResult[0], is(equalTo(heldToken[0])));
+
+ // lock still held — re-attempt fails
+ final String[] reattempt = new String[]{ "sentinel" };
+ handler.lock("k", Long.MAX_VALUE, (key, token) -> reattempt[0] = token);
+ signaler.drain();
+ assertNull(reattempt[0]);
+ }
+
+ @Test
+ public void shouldAcquireLockAfterTtlExpiry() throws Exception
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final String[] firstToken = new String[1];
+ handler.lock("k", 10L, (key, token) -> firstToken[0] = token);
+ signaler.drain();
+ assertNotNull(firstToken[0]);
+
+ Thread.sleep(25L);
+
+ final String[] secondToken = new String[1];
+ handler.lock("k", Long.MAX_VALUE, (key, token) -> secondToken[0] = token);
+ signaler.drain();
+ assertThat(secondToken[0], is(notNullValue()));
+ assertThat(secondToken[0], is(not(equalTo(firstToken[0]))));
+ }
+
+ @Test
+ public void shouldTreatUnlockOfExpiredAsNoOp() throws Exception
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final String[] heldToken = new String[1];
+ handler.lock("k", 10L, (key, token) -> heldToken[0] = token);
+ signaler.drain();
+
+ Thread.sleep(25L);
+
+ final String[] unlockResult = new String[]{ "sentinel" };
+ handler.unlock("k", heldToken[0], result -> unlockResult[0] = result);
+ signaler.drain();
+ assertNull(unlockResult[0]);
+ }
+
+ @Test
+ public void shouldTreatUnlockOfMissingAsNoOp()
+ {
+ final RecordingSignaler signaler = new RecordingSignaler();
+ final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
+
+ final String[] unlockResult = new String[]{ "sentinel" };
+ handler.unlock("k", "any-token", result -> unlockResult[0] = result);
+ signaler.drain();
+ assertNull(unlockResult[0]);
+ }
+
+ @Test
+ public void shouldArbitrateLockAcrossWorkers()
+ {
+ final RecordingSignaler signalerA = new RecordingSignaler();
+ final RecordingSignaler signalerB = new RecordingSignaler();
+
+ final MemoryStoreHandler handlerA = new MemoryStoreHandler(entries, watchers, locks, signalerA);
+ final MemoryStoreHandler handlerB = new MemoryStoreHandler(entries, watchers, locks, signalerB);
+
+ final String[] tokenA = new String[1];
+ handlerA.lock("k", Long.MAX_VALUE, (key, token) -> tokenA[0] = token);
+ signalerA.drain();
+ assertNotNull(tokenA[0]);
+
+ final String[] tokenB = new String[]{ "sentinel" };
+ handlerB.lock("k", Long.MAX_VALUE, (key, token) -> tokenB[0] = token);
+ signalerB.drain();
+ assertNull(tokenB[0]);
+
+ handlerA.unlock("k", tokenA[0], MemoryStoreHandlerTest::ignored);
+ signalerA.drain();
+
+ final String[] tokenB2 = new String[1];
+ handlerB.lock("k", Long.MAX_VALUE, (key, token) -> tokenB2[0] = token);
+ signalerB.drain();
+ assertThat(tokenB2[0], is(notNullValue()));
+ }
+
+ private static void ignored(
+ String unused)
+ {
+ }
+
+ private static void ignored(
+ String unusedKey,
+ String unusedValue)
+ {
+ }
+
+ /**
+ * Captures scheduled callbacks so the test can drive them deterministically on its own thread.
+ */
+ private static final class RecordingSignaler implements Signaler
+ {
+ private final List scheduled = new ArrayList<>();
+
+ @Override
+ public long signalAt(
+ long timeMillis,
+ int signalId,
+ IntConsumer handler)
+ {
+ scheduled.add(handler);
+ return NO_CANCEL_ID;
+ }
+
+ @Override
+ public long signalAt(
+ Instant time,
+ int signalId,
+ IntConsumer handler)
+ {
+ return signalAt(time.toEpochMilli(), signalId, handler);
+ }
+
+ @Override
+ public void signalNow(
+ long originId,
+ long routedId,
+ long streamId,
+ long traceId,
+ int signalId,
+ int contextId)
+ {
+ }
+
+ @Override
+ public void signalNow(
+ long originId,
+ long routedId,
+ long streamId,
+ long traceId,
+ int signalId,
+ int contextId,
+ DirectBuffer buffer,
+ int offset,
+ int length)
+ {
+ }
+
+ @Override
+ public long signalAt(
+ long timeMillis,
+ long originId,
+ long routedId,
+ long streamId,
+ long traceId,
+ int signalId,
+ int contextId)
+ {
+ return NO_CANCEL_ID;
+ }
+
+ @Override
+ public long signalAt(
+ Instant time,
+ long originId,
+ long routedId,
+ long streamId,
+ long traceId,
+ int signalId,
+ int contextId)
+ {
+ return NO_CANCEL_ID;
+ }
+
+ @Override
+ public long signalTask(
+ Runnable task,
+ long originId,
+ long routedId,
+ long streamId,
+ long traceId,
+ int signalId,
+ int contextId)
+ {
+ return NO_CANCEL_ID;
+ }
+
+ @Override
+ public boolean cancel(
+ long cancelId)
+ {
+ return false;
+ }
+
+ IntConsumer pollScheduled()
+ {
+ return scheduled.isEmpty() ? null : scheduled.remove(0);
+ }
+
+ void runOne()
+ {
+ final IntConsumer handler = pollScheduled();
+ if (handler != null)
+ {
+ handler.accept(0);
+ }
+ }
+
+ void drain()
+ {
+ while (!scheduled.isEmpty())
+ {
+ scheduled.remove(0).accept(0);
+ }
+ }
+ }
+}
From e6fa515015a1b49b6c5e1da6be47ed1ed42d5a6d Mon Sep 17 00:00:00 2001
From: Claude
Date: Thu, 21 May 2026 23:19:47 +0000
Subject: [PATCH 2/4] test(store-memory): add IT coverage for lock / unlock /
watch
Extends the test binding's storeAssertions vocabulary with lock,
unlock, and watch ops so YAML-driven IT scripts can drive the new
StoreHandler primitives the same way as get/put/delete.
Adds three new YAML configs and IT methods for store-memory:
- store.lock.yaml: lock acquires, then second attempt fails
- store.unlock.yaml: unlock against absent lock is a no-op success
- store.watch.yaml: watch registers, subsequent put fires listener
with the matching value
---
.../internal/binding/TestBindingFactory.java | 46 ++
.../memory/internal/MemoryStoreHandlerIT.java | 30 +
.../internal/MemoryStoreHandlerTest.java | 526 ------------------
.../specs/store/memory/config/store.lock.yaml | 39 ++
.../store/memory/config/store.unlock.yaml | 35 ++
.../store/memory/config/store.watch.yaml | 37 ++
6 files changed, 187 insertions(+), 526 deletions(-)
delete mode 100644 runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerTest.java
create mode 100644 specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml
create mode 100644 specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.unlock.yaml
create mode 100644 specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.watch.yaml
diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java
index e026d31734..638c428ffd 100644
--- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java
+++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java
@@ -711,6 +711,52 @@ private void runStoreAssertions(
callbackFired.value = true;
});
break;
+ case "lock":
+ store.lock(a.key, a.ttl, (k, token) ->
+ {
+ if (Thread.currentThread() != dispatchThread ||
+ callbackFired.value)
+ {
+ doInitialReset(traceId);
+ }
+ // expect="" → token must be null (lock failed);
+ // expect=non-empty → token must be non-null (lock succeeded)
+ if (a.hasExpect)
+ {
+ boolean expectAcquired = a.expect != null && !a.expect.isEmpty();
+ boolean acquired = token != null;
+ if (expectAcquired != acquired)
+ {
+ doInitialReset(traceId);
+ }
+ }
+ callbackFired.value = true;
+ });
+ break;
+ case "unlock":
+ store.unlock(a.key, a.value, v ->
+ {
+ if (Thread.currentThread() != dispatchThread ||
+ callbackFired.value ||
+ a.hasExpect && !Objects.equals(v, a.expect))
+ {
+ doInitialReset(traceId);
+ }
+ callbackFired.value = true;
+ });
+ break;
+ case "watch":
+ // registration is synchronous; listener fires async on mutations
+ store.watch(a.key, (k, v) ->
+ {
+ if (Thread.currentThread() != dispatchThread ||
+ a.hasExpect && !Objects.equals(v, a.expect))
+ {
+ doInitialReset(traceId);
+ }
+ });
+ callbackFired.value = false;
+ continue;
default:
doInitialReset(traceId);
break;
diff --git a/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java b/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java
index b6f5092c74..226e6cb7cb 100644
--- a/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java
+++ b/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java
@@ -55,4 +55,34 @@ public void shouldHandshakeWithMemoryStore() throws Exception
{
k3po.finish();
}
+
+ @Test
+ @Configuration("store.lock.yaml")
+ @Specification({
+ "${net}/handshake/client",
+ "${app}/handshake/server"})
+ public void shouldLockResource() throws Exception
+ {
+ k3po.finish();
+ }
+
+ @Test
+ @Configuration("store.unlock.yaml")
+ @Specification({
+ "${net}/handshake/client",
+ "${app}/handshake/server"})
+ public void shouldUnlockAbsentResource() throws Exception
+ {
+ k3po.finish();
+ }
+
+ @Test
+ @Configuration("store.watch.yaml")
+ @Specification({
+ "${net}/handshake/client",
+ "${app}/handshake/server"})
+ public void shouldWatchKey() throws Exception
+ {
+ k3po.finish();
+ }
}
diff --git a/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerTest.java b/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerTest.java
deleted file mode 100644
index a6d75d07ab..0000000000
--- a/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerTest.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Copyright 2021-2024 Aklivity Inc
- *
- * Licensed under the Aklivity Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * https://www.aklivity.io/aklivity-community-license/
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package io.aklivity.zilla.runtime.store.memory.internal;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.Closeable;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.function.IntConsumer;
-
-import org.agrona.DirectBuffer;
-import org.junit.Before;
-import org.junit.Test;
-
-import io.aklivity.zilla.runtime.engine.concurrent.Signaler;
-import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.LockEntry;
-import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.Watcher;
-
-public class MemoryStoreHandlerTest
-{
- private ConcurrentMap entries;
- private ConcurrentMap> watchers;
- private ConcurrentMap locks;
-
- @Before
- public void setUp()
- {
- entries = new ConcurrentHashMap<>();
- watchers = new ConcurrentHashMap<>();
- locks = new ConcurrentHashMap<>();
- }
-
- @Test
- public void shouldFireListenerAfterPut()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final List values = new ArrayList<>();
- handler.watch("k", (key, value) -> values.add(value));
-
- handler.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
-
- signaler.runOne();
- assertThat(values, contains("v1"));
- }
-
- @Test
- public void shouldFireListenerWithNullOnDelete()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- handler.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
- signaler.drain();
-
- final List values = new ArrayList<>();
- handler.watch("k", (key, value) -> values.add(value));
-
- handler.delete("k", MemoryStoreHandlerTest::ignored);
- signaler.runOne();
-
- assertThat(values, contains((String) null));
- }
-
- @Test
- public void shouldFireListenerOnGetAndDelete()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- handler.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
- signaler.drain();
-
- final List values = new ArrayList<>();
- handler.watch("k", (key, value) -> values.add(value));
-
- handler.getAndDelete("k", MemoryStoreHandlerTest::ignored);
- signaler.runOne();
-
- assertThat(values, contains((String) null));
- }
-
- @Test
- public void shouldFireListenerOnPutIfAbsentWhenStored()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final List values = new ArrayList<>();
- handler.watch("k", (key, value) -> values.add(value));
-
- handler.putIfAbsent("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
- signaler.runOne();
-
- assertThat(values, contains("v1"));
- }
-
- @Test
- public void shouldNotFireListenerOnPutIfAbsentWhenExisting()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- handler.put("k", "existing", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
- signaler.drain();
-
- final List values = new ArrayList<>();
- handler.watch("k", (key, value) -> values.add(value));
-
- handler.putIfAbsent("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
- signaler.drain();
-
- assertThat(values, is(empty()));
- }
-
- @Test
- public void shouldNotFireListenerOnDeleteWhenAbsent()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final List values = new ArrayList<>();
- handler.watch("k", (key, value) -> values.add(value));
-
- handler.delete("k", MemoryStoreHandlerTest::ignored);
- signaler.drain();
-
- assertThat(values, is(empty()));
- }
-
- @Test
- public void shouldUnsubscribeOnClose() throws Exception
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final List values = new ArrayList<>();
- final Closeable subscription = handler.watch("k", (key, value) -> values.add(value));
-
- subscription.close();
-
- handler.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
- signaler.drain();
-
- assertThat(values, is(empty()));
- }
-
- @Test
- public void shouldDispatchToRegisteringWorkerWhenPutFromAnother()
- {
- final RecordingSignaler signalerA = new RecordingSignaler();
- final RecordingSignaler signalerB = new RecordingSignaler();
-
- final MemoryStoreHandler handlerA = new MemoryStoreHandler(entries, watchers, locks, signalerA);
- final MemoryStoreHandler handlerB = new MemoryStoreHandler(entries, watchers, locks, signalerB);
-
- final List received = new ArrayList<>();
- handlerA.watch("k", (key, value) -> received.add(value));
-
- // mutate on handler B; listener must dispatch via handler A's signaler
- handlerB.put("k", "v-from-B", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
-
- // draining only B's signaler runs the put completion but NOT the listener
- signalerB.drain();
- assertThat(received, is(empty()));
-
- // listener fires only when A's signaler is drained
- signalerA.drain();
- assertThat(received, contains("v-from-B"));
- }
-
- @Test
- public void shouldFanOutToMultipleWatchers()
- {
- final RecordingSignaler signalerA = new RecordingSignaler();
- final RecordingSignaler signalerB = new RecordingSignaler();
-
- final MemoryStoreHandler handlerA = new MemoryStoreHandler(entries, watchers, locks, signalerA);
- final MemoryStoreHandler handlerB = new MemoryStoreHandler(entries, watchers, locks, signalerB);
-
- final List receivedA = new ArrayList<>();
- final List receivedB = new ArrayList<>();
- handlerA.watch("k", (key, value) -> receivedA.add(value));
- handlerB.watch("k", (key, value) -> receivedB.add(value));
-
- handlerA.put("k", "v1", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
-
- signalerA.drain();
- signalerB.drain();
-
- assertThat(receivedA, contains("v1"));
- assertThat(receivedB, contains("v1"));
- }
-
- @Test
- public void shouldNotFireListenerForOtherKey()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final List values = new ArrayList<>();
- handler.watch("k1", (key, value) -> values.add(value));
-
- handler.put("k2", "v", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
- signaler.drain();
-
- assertThat(values, is(empty()));
- }
-
- @Test
- public void shouldReturnCloseableFromWatch()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final Closeable subscription = handler.watch("k", MemoryStoreHandlerTest::ignored);
- assertNotNull(subscription);
- }
-
- @Test
- public void shouldAcquireLockWhenAbsent()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final String[] tokenRef = new String[1];
- handler.lock("k", Long.MAX_VALUE, (key, token) -> tokenRef[0] = token);
- signaler.drain();
-
- assertThat(tokenRef[0], is(notNullValue()));
- }
-
- @Test
- public void shouldNotAcquireLockWhenHeld()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- handler.lock("k", Long.MAX_VALUE, MemoryStoreHandlerTest::ignored);
- signaler.drain();
-
- final String[] tokenRef = new String[1];
- tokenRef[0] = "sentinel";
- handler.lock("k", Long.MAX_VALUE, (key, token) -> tokenRef[0] = token);
- signaler.drain();
-
- assertNull(tokenRef[0]);
- }
-
- @Test
- public void shouldReleaseLockOnUnlockWithMatchingToken()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final String[] heldToken = new String[1];
- handler.lock("k", Long.MAX_VALUE, (key, token) -> heldToken[0] = token);
- signaler.drain();
-
- final String[] unlockResult = new String[]{ "sentinel" };
- handler.unlock("k", heldToken[0], result -> unlockResult[0] = result);
- signaler.drain();
- assertNull(unlockResult[0]);
-
- final String[] reacquired = new String[1];
- handler.lock("k", Long.MAX_VALUE, (key, token) -> reacquired[0] = token);
- signaler.drain();
- assertThat(reacquired[0], is(notNullValue()));
- }
-
- @Test
- public void shouldRejectUnlockWithMismatchedToken()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final String[] heldToken = new String[1];
- handler.lock("k", Long.MAX_VALUE, (key, token) -> heldToken[0] = token);
- signaler.drain();
-
- final String[] unlockResult = new String[1];
- handler.unlock("k", "wrong-token", result -> unlockResult[0] = result);
- signaler.drain();
- assertThat(unlockResult[0], is(equalTo(heldToken[0])));
-
- // lock still held — re-attempt fails
- final String[] reattempt = new String[]{ "sentinel" };
- handler.lock("k", Long.MAX_VALUE, (key, token) -> reattempt[0] = token);
- signaler.drain();
- assertNull(reattempt[0]);
- }
-
- @Test
- public void shouldAcquireLockAfterTtlExpiry() throws Exception
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final String[] firstToken = new String[1];
- handler.lock("k", 10L, (key, token) -> firstToken[0] = token);
- signaler.drain();
- assertNotNull(firstToken[0]);
-
- Thread.sleep(25L);
-
- final String[] secondToken = new String[1];
- handler.lock("k", Long.MAX_VALUE, (key, token) -> secondToken[0] = token);
- signaler.drain();
- assertThat(secondToken[0], is(notNullValue()));
- assertThat(secondToken[0], is(not(equalTo(firstToken[0]))));
- }
-
- @Test
- public void shouldTreatUnlockOfExpiredAsNoOp() throws Exception
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final String[] heldToken = new String[1];
- handler.lock("k", 10L, (key, token) -> heldToken[0] = token);
- signaler.drain();
-
- Thread.sleep(25L);
-
- final String[] unlockResult = new String[]{ "sentinel" };
- handler.unlock("k", heldToken[0], result -> unlockResult[0] = result);
- signaler.drain();
- assertNull(unlockResult[0]);
- }
-
- @Test
- public void shouldTreatUnlockOfMissingAsNoOp()
- {
- final RecordingSignaler signaler = new RecordingSignaler();
- final MemoryStoreHandler handler = new MemoryStoreHandler(entries, watchers, locks, signaler);
-
- final String[] unlockResult = new String[]{ "sentinel" };
- handler.unlock("k", "any-token", result -> unlockResult[0] = result);
- signaler.drain();
- assertNull(unlockResult[0]);
- }
-
- @Test
- public void shouldArbitrateLockAcrossWorkers()
- {
- final RecordingSignaler signalerA = new RecordingSignaler();
- final RecordingSignaler signalerB = new RecordingSignaler();
-
- final MemoryStoreHandler handlerA = new MemoryStoreHandler(entries, watchers, locks, signalerA);
- final MemoryStoreHandler handlerB = new MemoryStoreHandler(entries, watchers, locks, signalerB);
-
- final String[] tokenA = new String[1];
- handlerA.lock("k", Long.MAX_VALUE, (key, token) -> tokenA[0] = token);
- signalerA.drain();
- assertNotNull(tokenA[0]);
-
- final String[] tokenB = new String[]{ "sentinel" };
- handlerB.lock("k", Long.MAX_VALUE, (key, token) -> tokenB[0] = token);
- signalerB.drain();
- assertNull(tokenB[0]);
-
- handlerA.unlock("k", tokenA[0], MemoryStoreHandlerTest::ignored);
- signalerA.drain();
-
- final String[] tokenB2 = new String[1];
- handlerB.lock("k", Long.MAX_VALUE, (key, token) -> tokenB2[0] = token);
- signalerB.drain();
- assertThat(tokenB2[0], is(notNullValue()));
- }
-
- private static void ignored(
- String unused)
- {
- }
-
- private static void ignored(
- String unusedKey,
- String unusedValue)
- {
- }
-
- /**
- * Captures scheduled callbacks so the test can drive them deterministically on its own thread.
- */
- private static final class RecordingSignaler implements Signaler
- {
- private final List scheduled = new ArrayList<>();
-
- @Override
- public long signalAt(
- long timeMillis,
- int signalId,
- IntConsumer handler)
- {
- scheduled.add(handler);
- return NO_CANCEL_ID;
- }
-
- @Override
- public long signalAt(
- Instant time,
- int signalId,
- IntConsumer handler)
- {
- return signalAt(time.toEpochMilli(), signalId, handler);
- }
-
- @Override
- public void signalNow(
- long originId,
- long routedId,
- long streamId,
- long traceId,
- int signalId,
- int contextId)
- {
- }
-
- @Override
- public void signalNow(
- long originId,
- long routedId,
- long streamId,
- long traceId,
- int signalId,
- int contextId,
- DirectBuffer buffer,
- int offset,
- int length)
- {
- }
-
- @Override
- public long signalAt(
- long timeMillis,
- long originId,
- long routedId,
- long streamId,
- long traceId,
- int signalId,
- int contextId)
- {
- return NO_CANCEL_ID;
- }
-
- @Override
- public long signalAt(
- Instant time,
- long originId,
- long routedId,
- long streamId,
- long traceId,
- int signalId,
- int contextId)
- {
- return NO_CANCEL_ID;
- }
-
- @Override
- public long signalTask(
- Runnable task,
- long originId,
- long routedId,
- long streamId,
- long traceId,
- int signalId,
- int contextId)
- {
- return NO_CANCEL_ID;
- }
-
- @Override
- public boolean cancel(
- long cancelId)
- {
- return false;
- }
-
- IntConsumer pollScheduled()
- {
- return scheduled.isEmpty() ? null : scheduled.remove(0);
- }
-
- void runOne()
- {
- final IntConsumer handler = pollScheduled();
- if (handler != null)
- {
- handler.accept(0);
- }
- }
-
- void drain()
- {
- while (!scheduled.isEmpty())
- {
- scheduled.remove(0).accept(0);
- }
- }
- }
-}
diff --git a/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml
new file mode 100644
index 0000000000..1061df6c82
--- /dev/null
+++ b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml
@@ -0,0 +1,39 @@
+#
+# Copyright 2021-2024 Aklivity Inc
+#
+# Licensed under the Aklivity Community License (the "License"); you may not use
+# this file except in compliance with the License. You may obtain a copy of the
+# License at
+#
+# https://www.aklivity.io/aklivity-community-license/
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+#
+
+---
+name: test
+stores:
+ memory0:
+ type: memory
+bindings:
+ net0:
+ type: test
+ kind: server
+ options:
+ store: memory0
+ assertions:
+ store:
+ memory0:
+ - op: lock
+ key: lock.resource
+ ttl: 60000
+ expect: acquired
+ - op: lock
+ key: lock.resource
+ ttl: 60000
+ expect: ""
+ routes:
+ - exit: app0
diff --git a/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.unlock.yaml b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.unlock.yaml
new file mode 100644
index 0000000000..09e4f1ae30
--- /dev/null
+++ b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.unlock.yaml
@@ -0,0 +1,35 @@
+#
+# Copyright 2021-2024 Aklivity Inc
+#
+# Licensed under the Aklivity Community License (the "License"); you may not use
+# this file except in compliance with the License. You may obtain a copy of the
+# License at
+#
+# https://www.aklivity.io/aklivity-community-license/
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+#
+
+---
+name: test
+stores:
+ memory0:
+ type: memory
+bindings:
+ net0:
+ type: test
+ kind: server
+ options:
+ store: memory0
+ assertions:
+ store:
+ memory0:
+ - op: unlock
+ key: unlock.absent
+ value: any-token
+ expect: null
+ routes:
+ - exit: app0
diff --git a/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.watch.yaml b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.watch.yaml
new file mode 100644
index 0000000000..a3a48356c7
--- /dev/null
+++ b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.watch.yaml
@@ -0,0 +1,37 @@
+#
+# Copyright 2021-2024 Aklivity Inc
+#
+# Licensed under the Aklivity Community License (the "License"); you may not use
+# this file except in compliance with the License. You may obtain a copy of the
+# License at
+#
+# https://www.aklivity.io/aklivity-community-license/
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+#
+
+---
+name: test
+stores:
+ memory0:
+ type: memory
+bindings:
+ net0:
+ type: test
+ kind: server
+ options:
+ store: memory0
+ assertions:
+ store:
+ memory0:
+ - op: watch
+ key: watch.key
+ expect: watched-value
+ - op: put
+ key: watch.key
+ value: watched-value
+ routes:
+ - exit: app0
From 2f7c53ec32226ef1874baf1bb0b390080ff9ed7c Mon Sep 17 00:00:00 2001
From: Claude
Date: Thu, 21 May 2026 23:43:21 +0000
Subject: [PATCH 3/4] refactor(engine): use Duration for StoreHandler ttl and
tighten unlock
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
SPI changes:
- put / putIfAbsent / lock now take Duration ttl; null = no expiry,
replacing the Long.MAX_VALUE sentinel. Read more naturally and
parses from ISO-8601 in YAML configs (e.g. PT1M, PT8H).
- unlock completion now returns the supplied token on a successful
ownership-checked release, or null otherwise — mirroring lock's
shape (non-null = you own it). Previous semantics inverted this
AND leaked the current holder's token back to a caller that did
not prove ownership, defeating the ownership check.
store-memory + TestStoreHandler updated to match. Test binding's
StoreAssertion config carries ttl as Duration; the adapter
serialises/parses via Duration.toString / Duration.parse.
store.lock.yaml uses ttl: PT1M; expect: "" still matches the
not-acquired path of lock the same way it now matches the
not-released path of unlock.
---
.../runtime/engine/store/StoreHandler.java | 26 ++++++------
.../internal/binding/TestBindingFactory.java | 2 +-
.../config/TestBindingOptionsConfig.java | 5 ++-
.../TestBindingOptionsConfigAdapter.java | 7 ++--
.../test/internal/store/TestStoreHandler.java | 27 ++++++-------
.../memory/internal/MemoryStoreHandler.java | 40 ++++++++++---------
.../specs/store/memory/config/store.lock.yaml | 4 +-
7 files changed, 58 insertions(+), 53 deletions(-)
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java
index bc6de4b6e7..1f9bcedd06 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java
@@ -16,6 +16,7 @@
package io.aklivity.zilla.runtime.engine.store;
import java.io.Closeable;
+import java.time.Duration;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -61,13 +62,13 @@ void get(
*
* @param key the key to store
* @param value the value to associate
- * @param ttl the time-to-live in milliseconds, or {@code Long.MAX_VALUE} for no expiry
+ * @param ttl the time-to-live, or {@code null} for no expiry
* @param completion a callback invoked when the operation completes
*/
void put(
String key,
String value,
- long ttl,
+ Duration ttl,
Consumer completion);
/**
@@ -75,14 +76,14 @@ void put(
*
* @param key the key to store
* @param value the value to associate if absent
- * @param ttl the time-to-live in milliseconds, or {@code Long.MAX_VALUE} for no expiry
+ * @param ttl the time-to-live, or {@code null} for no expiry
* @param completion a callback that receives the existing value if present,
* or {@code null} if the key was absent and the value was stored
*/
void putIfAbsent(
String key,
String value,
- long ttl,
+ Duration ttl,
Consumer completion);
/**
@@ -121,24 +122,25 @@ void getAndDelete(
*
*
* @param key the key to lock
- * @param ttlMillis the lease duration in milliseconds, or {@code Long.MAX_VALUE} for no expiry
+ * @param ttl the lease duration, or {@code null} for no expiry
* @param completion a callback that receives {@code (key, token)};
* {@code token} is non-null when the lock was acquired,
* {@code null} when another holder owns the key
*/
void lock(
String key,
- long ttlMillis,
+ Duration ttl,
BiConsumer completion);
/**
* Releases a lock previously acquired via {@link #lock}, provided the supplied token matches
* the current holder.
*
- * The completion receives {@code null} when the lock has been released (including when it
- * had already expired or been released — treated as a no-op success). When the supplied
- * token does not match the current holder, the completion receives the current holder's
- * token and the lock remains held.
+ * On a successful ownership-checked release, the completion receives the supplied token —
+ * mirroring the {@link #lock} completion shape (non-null = you own it). On failure — the
+ * supplied token does not match the current holder, or the lock has already expired or been
+ * released — the completion receives {@code null}. No information about another holder is
+ * surfaced to a caller that did not prove ownership.
*
*
* The completion fires strictly later than the call returns, on the caller's I/O
@@ -147,8 +149,8 @@ void lock(
*
* @param key the key to unlock
* @param token the ownership token returned by a prior {@link #lock} call
- * @param completion a callback that receives {@code null} on success,
- * or the current holder's token on ownership mismatch
+ * @param completion a callback that receives the supplied {@code token} on a successful
+ * ownership-checked release, or {@code null} otherwise
*/
void unlock(
String key,
diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java
index 638c428ffd..f96e2ecced 100644
--- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java
+++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java
@@ -191,7 +191,7 @@ public void attach(
{
final Thread dispatchThread = Thread.currentThread();
final MutableBoolean callbackFired = new MutableBoolean();
- this.store.putIfAbsent("init", "", Long.MAX_VALUE, v ->
+ this.store.putIfAbsent("init", "", null, v ->
{
if (Thread.currentThread() != dispatchThread || callbackFired.value)
{
diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfig.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfig.java
index 0cc4a9bbc4..c55a127f7d 100644
--- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfig.java
+++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfig.java
@@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.engine.test.internal.binding.config;
+import java.time.Duration;
import java.util.List;
import java.util.function.Function;
@@ -176,7 +177,7 @@ public static final class StoreAssertion
public final String op;
public final String key;
public final String value;
- public final long ttl;
+ public final Duration ttl;
public final String expect;
public final boolean hasExpect;
public final long delay;
@@ -185,7 +186,7 @@ public StoreAssertion(
String op,
String key,
String value,
- long ttl,
+ Duration ttl,
String expect,
boolean hasExpect,
long delay)
diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigAdapter.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigAdapter.java
index 9c581c31f5..c9e3058e14 100644
--- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigAdapter.java
+++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigAdapter.java
@@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.engine.test.internal.binding.config;
+import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -179,9 +180,9 @@ public JsonObject adaptToJson(
{
assertion.add(STORE_VALUE_NAME, a.value);
}
- if (a.ttl != Long.MAX_VALUE)
+ if (a.ttl != null)
{
- assertion.add(STORE_TTL_NAME, a.ttl);
+ assertion.add(STORE_TTL_NAME, a.ttl.toString());
}
if (a.hasExpect)
{
@@ -362,7 +363,7 @@ public OptionsConfig adaptFromJson(
s.getString(STORE_OP_NAME),
s.getString(STORE_KEY_NAME),
s.getString(STORE_VALUE_NAME, null),
- s.containsKey(STORE_TTL_NAME) ? s.getJsonNumber(STORE_TTL_NAME).longValue() : Long.MAX_VALUE,
+ s.containsKey(STORE_TTL_NAME) ? Duration.parse(s.getString(STORE_TTL_NAME)) : null,
expect,
hasExpect,
s.containsKey(DELAY_NAME) ? s.getJsonNumber(DELAY_NAME).longValue() : 0L));
diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java
index 5884d36f65..67c2e92c4c 100644
--- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java
+++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java
@@ -16,6 +16,7 @@
package io.aklivity.zilla.runtime.engine.test.internal.store;
import java.io.Closeable;
+import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@@ -60,7 +61,7 @@ public void get(
public void put(
String key,
String value,
- long ttl,
+ Duration ttl,
Consumer completion)
{
entries.put(key, value);
@@ -72,7 +73,7 @@ public void put(
public void putIfAbsent(
String key,
String value,
- long ttl,
+ Duration ttl,
Consumer completion)
{
final String existing = entries.putIfAbsent(key, value);
@@ -112,11 +113,11 @@ public void getAndDelete(
@Override
public void lock(
String key,
- long ttlMillis,
+ Duration ttl,
BiConsumer completion)
{
final long now = System.currentTimeMillis();
- final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : now + ttlMillis;
+ final long expiresAt = ttl == null ? Long.MAX_VALUE : now + ttl.toMillis();
final String token = UUID.randomUUID().toString();
final TestLockEntry candidate = new TestLockEntry(token, expiresAt);
TestLockEntry existing = locks.putIfAbsent(key, candidate);
@@ -137,22 +138,18 @@ public void unlock(
final long now = System.currentTimeMillis();
final TestLockEntry current = locks.get(key);
final String result;
- if (current == null || current.expiresAt <= now)
- {
- if (current != null)
- {
- locks.remove(key, current);
- }
- result = null;
- }
- else if (current.token.equals(token))
+ if (current != null && current.expiresAt > now && current.token.equals(token))
{
locks.remove(key, current);
- result = null;
+ result = token;
}
else
{
- result = current.token;
+ if (current != null && current.expiresAt <= now)
+ {
+ locks.remove(key, current);
+ }
+ result = null;
}
defer(() -> completion.accept(result));
}
diff --git a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java
index 79cc1830cc..c60acc4340 100644
--- a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java
+++ b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java
@@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.store.memory.internal;
import java.io.Closeable;
+import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@@ -59,10 +60,10 @@ public void get(
public void put(
String key,
String value,
- long ttlMillis,
+ Duration ttl,
Consumer completion)
{
- final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + ttlMillis;
+ final long expiresAt = expiresAt(ttl);
entries.put(key, new MemoryEntry(value, expiresAt));
notifyWatchers(key, value);
defer(() -> completion.accept(null));
@@ -72,10 +73,10 @@ public void put(
public void putIfAbsent(
String key,
String value,
- long ttlMillis,
+ Duration ttl,
Consumer completion)
{
- final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + ttlMillis;
+ final long expiresAt = expiresAt(ttl);
final MemoryEntry newEntry = new MemoryEntry(value, expiresAt);
final MemoryEntry existing = entries.putIfAbsent(key, newEntry);
boolean stored = existing == null;
@@ -121,11 +122,11 @@ public void getAndDelete(
@Override
public void lock(
String key,
- long ttlMillis,
+ Duration ttl,
BiConsumer completion)
{
final long now = System.currentTimeMillis();
- final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : now + ttlMillis;
+ final long expiresAt = ttl == null ? Long.MAX_VALUE : now + ttl.toMillis();
final String token = UUID.randomUUID().toString();
final LockEntry candidate = new LockEntry(token, expiresAt);
LockEntry existing = locks.putIfAbsent(key, candidate);
@@ -146,23 +147,20 @@ public void unlock(
final long now = System.currentTimeMillis();
final LockEntry current = locks.get(key);
final String result;
- if (current == null || current.expiresAt() <= now)
- {
- // already gone — treat as success
- if (current != null)
- {
- locks.remove(key, current);
- }
- result = null;
- }
- else if (current.token().equals(token))
+ if (current != null && current.expiresAt() > now && current.token().equals(token))
{
locks.remove(key, current);
- result = null;
+ result = token;
}
else
{
- result = current.token();
+ // expired holder still cluttering the map — clean up opportunistically;
+ // either way the caller did not prove ownership of an active lock
+ if (current != null && current.expiresAt() <= now)
+ {
+ locks.remove(key, current);
+ }
+ result = null;
}
defer(() -> completion.accept(result));
}
@@ -207,6 +205,12 @@ private void defer(
signaler.signalAt(System.currentTimeMillis(), 0, ignored -> task.run());
}
+ private static long expiresAt(
+ Duration ttl)
+ {
+ return ttl == null ? Long.MAX_VALUE : System.currentTimeMillis() + ttl.toMillis();
+ }
+
record Watcher(
BiConsumer listener,
Signaler signaler)
diff --git a/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml
index 1061df6c82..908f0757db 100644
--- a/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml
+++ b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml
@@ -29,11 +29,11 @@ bindings:
memory0:
- op: lock
key: lock.resource
- ttl: 60000
+ ttl: PT1M
expect: acquired
- op: lock
key: lock.resource
- ttl: 60000
+ ttl: PT1M
expect: ""
routes:
- exit: app0
From 1e47b953a1cc0547dfd779a8bc68f31db7baf224 Mon Sep 17 00:00:00 2001
From: Claude
Date: Fri, 22 May 2026 00:06:51 +0000
Subject: [PATCH 4/4] fix(binding-mcp): pass Duration to StoreHandler put /
putIfAbsent
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
The SPI change to use Duration for ttl in commit 2f7c53ec missed the
in-tree consumer in McpProxyCache. The autobuild compile failures
(zilla CI #1790) pointed at three call sites that still passed
ttl.toMillis() and STORE_TTL_FOREVER (= Long.MAX_VALUE). Switch to
passing the Duration directly and treat null as the no-expiry
sentinel — matches the new SPI contract.
---
.../binding/mcp/internal/stream/cache/McpProxyCache.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCache.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCache.java
index dd47bebe1a..2533b6318d 100644
--- a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCache.java
+++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCache.java
@@ -46,7 +46,7 @@ public final class McpProxyCache
private static final String STORE_LOCK_KEY_RESOURCES = STORE_KEY_RESOURCES + STORE_LOCK_SUFFIX;
private static final String STORE_LOCK_KEY_PROMPTS = STORE_KEY_PROMPTS + STORE_LOCK_SUFFIX;
private static final String STORE_LOCK_KEY_LIFECYCLE = "lifecycle.lock";
- private static final long STORE_TTL_FOREVER = Long.MAX_VALUE;
+ private static final Duration STORE_TTL_FOREVER = null;
public final long bindingId;
public final GuardHandler guard;
@@ -131,7 +131,7 @@ public void register(
void acquireLifecycle(
Consumer completion)
{
- store.putIfAbsent(STORE_LOCK_KEY_LIFECYCLE, STORE_LOCK_VALUE, leaseTtl.toMillis(),
+ store.putIfAbsent(STORE_LOCK_KEY_LIFECYCLE, STORE_LOCK_VALUE, leaseTtl,
prior -> completion.accept(prior == null));
}
@@ -208,7 +208,7 @@ public void put(
public void acquire(
Consumer completion)
{
- store.putIfAbsent(storeLockKey, STORE_LOCK_VALUE, leaseTtl.toMillis(),
+ store.putIfAbsent(storeLockKey, STORE_LOCK_VALUE, leaseTtl,
prior -> completion.accept(prior == null));
}