diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCache.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCache.java index dd47bebe1a..2533b6318d 100644 --- a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCache.java +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/cache/McpProxyCache.java @@ -46,7 +46,7 @@ public final class McpProxyCache private static final String STORE_LOCK_KEY_RESOURCES = STORE_KEY_RESOURCES + STORE_LOCK_SUFFIX; private static final String STORE_LOCK_KEY_PROMPTS = STORE_KEY_PROMPTS + STORE_LOCK_SUFFIX; private static final String STORE_LOCK_KEY_LIFECYCLE = "lifecycle.lock"; - private static final long STORE_TTL_FOREVER = Long.MAX_VALUE; + private static final Duration STORE_TTL_FOREVER = null; public final long bindingId; public final GuardHandler guard; @@ -131,7 +131,7 @@ public void register( void acquireLifecycle( Consumer completion) { - store.putIfAbsent(STORE_LOCK_KEY_LIFECYCLE, STORE_LOCK_VALUE, leaseTtl.toMillis(), + store.putIfAbsent(STORE_LOCK_KEY_LIFECYCLE, STORE_LOCK_VALUE, leaseTtl, prior -> completion.accept(prior == null)); } @@ -208,7 +208,7 @@ public void put( public void acquire( Consumer completion) { - store.putIfAbsent(storeLockKey, STORE_LOCK_VALUE, leaseTtl.toMillis(), + store.putIfAbsent(storeLockKey, STORE_LOCK_VALUE, leaseTtl, prior -> completion.accept(prior == null)); } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java index d81bcbafc7..1f9bcedd06 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/store/StoreHandler.java @@ -15,6 +15,8 @@ */ package io.aklivity.zilla.runtime.engine.store; +import java.io.Closeable; +import java.time.Duration; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -60,13 +62,13 @@ void get( * * @param key the key to store * @param value the value to associate - * @param ttl the time-to-live in milliseconds, or {@code Long.MAX_VALUE} for no expiry + * @param ttl the time-to-live, or {@code null} for no expiry * @param completion a callback invoked when the operation completes */ void put( String key, String value, - long ttl, + Duration ttl, Consumer completion); /** @@ -74,14 +76,14 @@ void put( * * @param key the key to store * @param value the value to associate if absent - * @param ttl the time-to-live in milliseconds, or {@code Long.MAX_VALUE} for no expiry + * @param ttl the time-to-live, or {@code null} for no expiry * @param completion a callback that receives the existing value if present, * or {@code null} if the key was absent and the value was stored */ void putIfAbsent( String key, String value, - long ttl, + Duration ttl, Consumer completion); /** @@ -104,4 +106,76 @@ void delete( void getAndDelete( String key, Consumer completion); + + /** + * Attempts to acquire a TTL-bounded lock on the given key without waiting. + *

+ * The completion fires strictly later than the call returns, on the caller's I/O + * thread, following the same async contract as the other {@code StoreHandler} operations. + * On success it receives a non-null ownership token, generated by the implementation, that + * the caller must supply to {@link #unlock} to release. On contention — another holder owns + * an unexpired lock — the completion receives {@code null}. + *

+ *

+ * The lock auto-releases when its TTL elapses; an {@code unlock} from the original holder + * against an expired or reacquired lock is treated as a no-op. + *

+ * + * @param key the key to lock + * @param ttl the lease duration, or {@code null} for no expiry + * @param completion a callback that receives {@code (key, token)}; + * {@code token} is non-null when the lock was acquired, + * {@code null} when another holder owns the key + */ + void lock( + String key, + Duration ttl, + BiConsumer completion); + + /** + * Releases a lock previously acquired via {@link #lock}, provided the supplied token matches + * the current holder. + *

+ * On a successful ownership-checked release, the completion receives the supplied token — + * mirroring the {@link #lock} completion shape (non-null = you own it). On failure — the + * supplied token does not match the current holder, or the lock has already expired or been + * released — the completion receives {@code null}. No information about another holder is + * surfaced to a caller that did not prove ownership. + *

+ *

+ * The completion fires strictly later than the call returns, on the caller's I/O + * thread, following the same async contract as the other {@code StoreHandler} operations. + *

+ * + * @param key the key to unlock + * @param token the ownership token returned by a prior {@link #lock} call + * @param completion a callback that receives the supplied {@code token} on a successful + * ownership-checked release, or {@code null} otherwise + */ + void unlock( + String key, + String token, + Consumer completion); + + /** + * Registers a listener invoked when the value associated with the given key changes. + *

+ * Registration returns synchronously. The listener fires strictly later than the + * mutation that triggered it, on the registering caller's I/O thread, following the same + * async contract as the other {@code StoreHandler} operations. Each invocation receives the + * key and the new value, or {@code null} when the entry has been removed. + *

+ *

+ * Implementations decide the scope of change visibility — in-process, node-wide, or + * cluster-wide — and document it in their module. + *

+ * + * @param key the key to observe + * @param listener a callback that receives {@code (key, value)} on every change; + * {@code value} is {@code null} when the entry has been removed + * @return a {@link Closeable} handle that unsubscribes the listener when closed + */ + Closeable watch( + String key, + BiConsumer listener); } diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java index e026d31734..f96e2ecced 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java @@ -191,7 +191,7 @@ public void attach( { final Thread dispatchThread = Thread.currentThread(); final MutableBoolean callbackFired = new MutableBoolean(); - this.store.putIfAbsent("init", "", Long.MAX_VALUE, v -> + this.store.putIfAbsent("init", "", null, v -> { if (Thread.currentThread() != dispatchThread || callbackFired.value) { @@ -711,6 +711,52 @@ private void runStoreAssertions( callbackFired.value = true; }); break; + case "lock": + store.lock(a.key, a.ttl, (k, token) -> + { + if (Thread.currentThread() != dispatchThread || + callbackFired.value) + { + doInitialReset(traceId); + } + // expect="" → token must be null (lock failed); + // expect=non-empty → token must be non-null (lock succeeded) + if (a.hasExpect) + { + boolean expectAcquired = a.expect != null && !a.expect.isEmpty(); + boolean acquired = token != null; + if (expectAcquired != acquired) + { + doInitialReset(traceId); + } + } + callbackFired.value = true; + }); + break; + case "unlock": + store.unlock(a.key, a.value, v -> + { + if (Thread.currentThread() != dispatchThread || + callbackFired.value || + a.hasExpect && !Objects.equals(v, a.expect)) + { + doInitialReset(traceId); + } + callbackFired.value = true; + }); + break; + case "watch": + // registration is synchronous; listener fires async on mutations + store.watch(a.key, (k, v) -> + { + if (Thread.currentThread() != dispatchThread || + a.hasExpect && !Objects.equals(v, a.expect)) + { + doInitialReset(traceId); + } + }); + callbackFired.value = false; + continue; default: doInitialReset(traceId); break; diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfig.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfig.java index 0cc4a9bbc4..c55a127f7d 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfig.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfig.java @@ -15,6 +15,7 @@ */ package io.aklivity.zilla.runtime.engine.test.internal.binding.config; +import java.time.Duration; import java.util.List; import java.util.function.Function; @@ -176,7 +177,7 @@ public static final class StoreAssertion public final String op; public final String key; public final String value; - public final long ttl; + public final Duration ttl; public final String expect; public final boolean hasExpect; public final long delay; @@ -185,7 +186,7 @@ public StoreAssertion( String op, String key, String value, - long ttl, + Duration ttl, String expect, boolean hasExpect, long delay) diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigAdapter.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigAdapter.java index 9c581c31f5..c9e3058e14 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigAdapter.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigAdapter.java @@ -15,6 +15,7 @@ */ package io.aklivity.zilla.runtime.engine.test.internal.binding.config; +import java.time.Duration; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -179,9 +180,9 @@ public JsonObject adaptToJson( { assertion.add(STORE_VALUE_NAME, a.value); } - if (a.ttl != Long.MAX_VALUE) + if (a.ttl != null) { - assertion.add(STORE_TTL_NAME, a.ttl); + assertion.add(STORE_TTL_NAME, a.ttl.toString()); } if (a.hasExpect) { @@ -362,7 +363,7 @@ public OptionsConfig adaptFromJson( s.getString(STORE_OP_NAME), s.getString(STORE_KEY_NAME), s.getString(STORE_VALUE_NAME, null), - s.containsKey(STORE_TTL_NAME) ? s.getJsonNumber(STORE_TTL_NAME).longValue() : Long.MAX_VALUE, + s.containsKey(STORE_TTL_NAME) ? Duration.parse(s.getString(STORE_TTL_NAME)) : null, expect, hasExpect, s.containsKey(DELAY_NAME) ? s.getJsonNumber(DELAY_NAME).longValue() : 0L)); diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java index 360319f7ec..67c2e92c4c 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/store/TestStoreHandler.java @@ -15,8 +15,14 @@ */ package io.aklivity.zilla.runtime.engine.test.internal.store; +import java.io.Closeable; +import java.time.Duration; +import java.util.List; import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -27,6 +33,8 @@ public final class TestStoreHandler implements StoreHandler { private final ConcurrentMap entries; + private final ConcurrentMap>> listeners; + private final ConcurrentMap locks; private final Signaler signaler; public TestStoreHandler( @@ -36,6 +44,8 @@ public TestStoreHandler( { this.entries = Objects.requireNonNull(entries); this.signaler = Objects.requireNonNull(signaler); + this.listeners = new ConcurrentHashMap<>(); + this.locks = new ConcurrentHashMap<>(); } @Override @@ -51,10 +61,11 @@ public void get( public void put( String key, String value, - long ttl, + Duration ttl, Consumer completion) { entries.put(key, value); + notifyListeners(key, value); defer(() -> completion.accept(null)); } @@ -62,10 +73,14 @@ public void put( public void putIfAbsent( String key, String value, - long ttl, + Duration ttl, Consumer completion) { final String existing = entries.putIfAbsent(key, value); + if (existing == null) + { + notifyListeners(key, value); + } defer(() -> completion.accept(existing)); } @@ -74,7 +89,11 @@ public void delete( String key, Consumer completion) { - entries.remove(key); + final String removed = entries.remove(key); + if (removed != null) + { + notifyListeners(key, null); + } defer(() -> completion.accept(null)); } @@ -84,13 +103,99 @@ public void getAndDelete( Consumer completion) { final String prior = entries.remove(key); + if (prior != null) + { + notifyListeners(key, null); + } defer(() -> completion.accept(prior)); } + @Override + public void lock( + String key, + Duration ttl, + BiConsumer completion) + { + final long now = System.currentTimeMillis(); + final long expiresAt = ttl == null ? Long.MAX_VALUE : now + ttl.toMillis(); + final String token = UUID.randomUUID().toString(); + final TestLockEntry candidate = new TestLockEntry(token, expiresAt); + TestLockEntry existing = locks.putIfAbsent(key, candidate); + if (existing != null && existing.expiresAt <= now) + { + existing = locks.replace(key, existing, candidate) ? null : locks.get(key); + } + final String result = existing == null ? token : null; + defer(() -> completion.accept(key, result)); + } + + @Override + public void unlock( + String key, + String token, + Consumer completion) + { + final long now = System.currentTimeMillis(); + final TestLockEntry current = locks.get(key); + final String result; + if (current != null && current.expiresAt > now && current.token.equals(token)) + { + locks.remove(key, current); + result = token; + } + else + { + if (current != null && current.expiresAt <= now) + { + locks.remove(key, current); + } + result = null; + } + defer(() -> completion.accept(result)); + } + + @Override + public Closeable watch( + String key, + BiConsumer listener) + { + final List> list = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()); + list.add(listener); + return () -> + { + final List> current = listeners.get(key); + if (current != null) + { + current.remove(listener); + } + }; + } + + private void notifyListeners( + String key, + String value) + { + final List> list = listeners.get(key); + if (list != null && !list.isEmpty()) + { + final long now = System.currentTimeMillis(); + for (BiConsumer listener : list) + { + signaler.signalAt(now, 0, ignored -> listener.accept(key, value)); + } + } + } + private void defer( Runnable task) { // contract: callback fires strictly later than the call, on the caller's I/O thread signaler.signalAt(System.currentTimeMillis(), 0, ignored -> task.run()); } + + private record TestLockEntry( + String token, + long expiresAt) + { + } } diff --git a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStore.java b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStore.java index a7da7bd2a8..15a9d19ae3 100644 --- a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStore.java +++ b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStore.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.store.memory.internal; import java.net.URL; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -22,6 +23,8 @@ import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.store.Store; import io.aklivity.zilla.runtime.engine.store.StoreContext; +import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.LockEntry; +import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.Watcher; final class MemoryStore implements Store { @@ -45,7 +48,12 @@ public String name() public StoreContext supply( EngineContext context) { - return new MemoryStoreContext(this::acquireEntries, this::releaseEntries, context.signaler()); + return new MemoryStoreContext( + this::acquireEntries, + this::supplyWatchers, + this::supplyLocks, + this::releaseEntries, + context.signaler()); } @Override @@ -62,6 +70,20 @@ private ConcurrentMap acquireEntries( return memoryStorage.entries; } + private ConcurrentMap> supplyWatchers( + long storeId) + { + // attach already incremented refs via acquireEntries + return storage.computeIfAbsent(storeId, id -> new MemoryStorage()).watchers; + } + + private ConcurrentMap supplyLocks( + long storeId) + { + // attach already incremented refs via acquireEntries + return storage.computeIfAbsent(storeId, id -> new MemoryStorage()).locks; + } + private void releaseEntries( long storeId) { @@ -72,5 +94,7 @@ private static final class MemoryStorage { final AtomicInteger refs = new AtomicInteger(); final ConcurrentMap entries = new ConcurrentHashMap<>(); + final ConcurrentMap> watchers = new ConcurrentHashMap<>(); + final ConcurrentMap locks = new ConcurrentHashMap<>(); } } diff --git a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreContext.java b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreContext.java index 29cf345c34..8ae3827487 100644 --- a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreContext.java +++ b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreContext.java @@ -14,6 +14,7 @@ */ package io.aklivity.zilla.runtime.store.memory.internal; +import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.function.LongConsumer; import java.util.function.LongFunction; @@ -22,19 +23,27 @@ import io.aklivity.zilla.runtime.engine.config.StoreConfig; import io.aklivity.zilla.runtime.engine.store.StoreContext; import io.aklivity.zilla.runtime.engine.store.StoreHandler; +import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.LockEntry; +import io.aklivity.zilla.runtime.store.memory.internal.MemoryStoreHandler.Watcher; final class MemoryStoreContext implements StoreContext { private final LongFunction> supplyEntries; + private final LongFunction>> supplyWatchers; + private final LongFunction> supplyLocks; private final LongConsumer removeEntries; private final Signaler signaler; MemoryStoreContext( LongFunction> supplyEntries, + LongFunction>> supplyWatchers, + LongFunction> supplyLocks, LongConsumer removeEntries, Signaler signaler) { this.supplyEntries = supplyEntries; + this.supplyWatchers = supplyWatchers; + this.supplyLocks = supplyLocks; this.removeEntries = removeEntries; this.signaler = signaler; } @@ -43,7 +52,11 @@ final class MemoryStoreContext implements StoreContext public StoreHandler attach( StoreConfig config) { - return new MemoryStoreHandler(supplyEntries.apply(config.id), signaler); + return new MemoryStoreHandler( + supplyEntries.apply(config.id), + supplyWatchers.apply(config.id), + supplyLocks.apply(config.id), + signaler); } @Override diff --git a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java index 7702046991..c60acc4340 100644 --- a/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java +++ b/runtime/store-memory/src/main/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandler.java @@ -14,8 +14,13 @@ */ package io.aklivity.zilla.runtime.store.memory.internal; +import java.io.Closeable; +import java.time.Duration; +import java.util.List; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -25,13 +30,19 @@ final class MemoryStoreHandler implements StoreHandler { private final ConcurrentMap entries; + private final ConcurrentMap> watchers; + private final ConcurrentMap locks; private final Signaler signaler; MemoryStoreHandler( ConcurrentMap entries, + ConcurrentMap> watchers, + ConcurrentMap locks, Signaler signaler) { this.entries = entries; + this.watchers = watchers; + this.locks = locks; this.signaler = Objects.requireNonNull(signaler); } @@ -49,11 +60,12 @@ public void get( public void put( String key, String value, - long ttlMillis, + Duration ttl, Consumer completion) { - final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + ttlMillis; + final long expiresAt = expiresAt(ttl); entries.put(key, new MemoryEntry(value, expiresAt)); + notifyWatchers(key, value); defer(() -> completion.accept(null)); } @@ -61,15 +73,20 @@ public void put( public void putIfAbsent( String key, String value, - long ttlMillis, + Duration ttl, Consumer completion) { - final long expiresAt = ttlMillis == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + ttlMillis; + final long expiresAt = expiresAt(ttl); final MemoryEntry newEntry = new MemoryEntry(value, expiresAt); final MemoryEntry existing = entries.putIfAbsent(key, newEntry); + boolean stored = existing == null; if (existing != null && existing.expired()) { - entries.replace(key, existing, newEntry); + stored = entries.replace(key, existing, newEntry); + } + if (stored) + { + notifyWatchers(key, value); } final String result = existing != null && !existing.expired() ? existing.value() : null; defer(() -> completion.accept(result)); @@ -80,7 +97,11 @@ public void delete( String key, Consumer completion) { - entries.remove(key); + final MemoryEntry removed = entries.remove(key); + if (removed != null) + { + notifyWatchers(key, null); + } defer(() -> completion.accept(null)); } @@ -91,13 +112,114 @@ public void getAndDelete( { final MemoryEntry entry = entries.remove(key); final String value = entry != null && !entry.expired() ? entry.value() : null; + if (entry != null) + { + notifyWatchers(key, null); + } defer(() -> completion.accept(value)); } + @Override + public void lock( + String key, + Duration ttl, + BiConsumer completion) + { + final long now = System.currentTimeMillis(); + final long expiresAt = ttl == null ? Long.MAX_VALUE : now + ttl.toMillis(); + final String token = UUID.randomUUID().toString(); + final LockEntry candidate = new LockEntry(token, expiresAt); + LockEntry existing = locks.putIfAbsent(key, candidate); + if (existing != null && existing.expiresAt() <= now) + { + existing = locks.replace(key, existing, candidate) ? null : locks.get(key); + } + final String result = existing == null ? token : null; + defer(() -> completion.accept(key, result)); + } + + @Override + public void unlock( + String key, + String token, + Consumer completion) + { + final long now = System.currentTimeMillis(); + final LockEntry current = locks.get(key); + final String result; + if (current != null && current.expiresAt() > now && current.token().equals(token)) + { + locks.remove(key, current); + result = token; + } + else + { + // expired holder still cluttering the map — clean up opportunistically; + // either way the caller did not prove ownership of an active lock + if (current != null && current.expiresAt() <= now) + { + locks.remove(key, current); + } + result = null; + } + defer(() -> completion.accept(result)); + } + + @Override + public Closeable watch( + String key, + BiConsumer listener) + { + final Watcher watcher = new Watcher(listener, signaler); + final List list = watchers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()); + list.add(watcher); + return () -> + { + final List current = watchers.get(key); + if (current != null) + { + current.remove(watcher); + } + }; + } + + private void notifyWatchers( + String key, + String value) + { + final List list = watchers.get(key); + if (list != null && !list.isEmpty()) + { + final long now = System.currentTimeMillis(); + for (Watcher w : list) + { + w.signaler.signalAt(now, 0, ignored -> w.listener.accept(key, value)); + } + } + } + private void defer( Runnable task) { // contract: callback fires strictly later than the call, on the caller's I/O thread signaler.signalAt(System.currentTimeMillis(), 0, ignored -> task.run()); } + + private static long expiresAt( + Duration ttl) + { + return ttl == null ? Long.MAX_VALUE : System.currentTimeMillis() + ttl.toMillis(); + } + + record Watcher( + BiConsumer listener, + Signaler signaler) + { + } + + record LockEntry( + String token, + long expiresAt) + { + } } diff --git a/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java b/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java index b6f5092c74..226e6cb7cb 100644 --- a/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java +++ b/runtime/store-memory/src/test/java/io/aklivity/zilla/runtime/store/memory/internal/MemoryStoreHandlerIT.java @@ -55,4 +55,34 @@ public void shouldHandshakeWithMemoryStore() throws Exception { k3po.finish(); } + + @Test + @Configuration("store.lock.yaml") + @Specification({ + "${net}/handshake/client", + "${app}/handshake/server"}) + public void shouldLockResource() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("store.unlock.yaml") + @Specification({ + "${net}/handshake/client", + "${app}/handshake/server"}) + public void shouldUnlockAbsentResource() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("store.watch.yaml") + @Specification({ + "${net}/handshake/client", + "${app}/handshake/server"}) + public void shouldWatchKey() throws Exception + { + k3po.finish(); + } } diff --git a/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml new file mode 100644 index 0000000000..908f0757db --- /dev/null +++ b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.lock.yaml @@ -0,0 +1,39 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +stores: + memory0: + type: memory +bindings: + net0: + type: test + kind: server + options: + store: memory0 + assertions: + store: + memory0: + - op: lock + key: lock.resource + ttl: PT1M + expect: acquired + - op: lock + key: lock.resource + ttl: PT1M + expect: "" + routes: + - exit: app0 diff --git a/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.unlock.yaml b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.unlock.yaml new file mode 100644 index 0000000000..09e4f1ae30 --- /dev/null +++ b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.unlock.yaml @@ -0,0 +1,35 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +stores: + memory0: + type: memory +bindings: + net0: + type: test + kind: server + options: + store: memory0 + assertions: + store: + memory0: + - op: unlock + key: unlock.absent + value: any-token + expect: null + routes: + - exit: app0 diff --git a/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.watch.yaml b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.watch.yaml new file mode 100644 index 0000000000..a3a48356c7 --- /dev/null +++ b/specs/store-memory.spec/src/main/scripts/io/aklivity/zilla/specs/store/memory/config/store.watch.yaml @@ -0,0 +1,37 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +stores: + memory0: + type: memory +bindings: + net0: + type: test + kind: server + options: + store: memory0 + assertions: + store: + memory0: + - op: watch + key: watch.key + expect: watched-value + - op: put + key: watch.key + value: watched-value + routes: + - exit: app0