Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class McpProxyCache
private static final String STORE_LOCK_KEY_RESOURCES = STORE_KEY_RESOURCES + STORE_LOCK_SUFFIX;
private static final String STORE_LOCK_KEY_PROMPTS = STORE_KEY_PROMPTS + STORE_LOCK_SUFFIX;
private static final String STORE_LOCK_KEY_LIFECYCLE = "lifecycle.lock";
private static final long STORE_TTL_FOREVER = Long.MAX_VALUE;
private static final Duration STORE_TTL_FOREVER = null;

public final long bindingId;
public final GuardHandler guard;
Expand Down Expand Up @@ -131,7 +131,7 @@ public void register(
void acquireLifecycle(
Consumer<Boolean> completion)
{
store.putIfAbsent(STORE_LOCK_KEY_LIFECYCLE, STORE_LOCK_VALUE, leaseTtl.toMillis(),
store.putIfAbsent(STORE_LOCK_KEY_LIFECYCLE, STORE_LOCK_VALUE, leaseTtl,
prior -> completion.accept(prior == null));
}

Expand Down Expand Up @@ -208,7 +208,7 @@ public void put(
public void acquire(
Consumer<Boolean> completion)
{
store.putIfAbsent(storeLockKey, STORE_LOCK_VALUE, leaseTtl.toMillis(),
store.putIfAbsent(storeLockKey, STORE_LOCK_VALUE, leaseTtl,
prior -> completion.accept(prior == null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.aklivity.zilla.runtime.engine.store;

import java.io.Closeable;
import java.time.Duration;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand Down Expand Up @@ -60,28 +62,28 @@ void get(
*
* @param key the key to store
* @param value the value to associate
* @param ttl the time-to-live in milliseconds, or {@code Long.MAX_VALUE} for no expiry
* @param ttl the time-to-live, or {@code null} for no expiry
* @param completion a callback invoked when the operation completes
*/
void put(
String key,
String value,
long ttl,
Duration ttl,
Consumer<String> completion);

/**
* Associates the given value with the given key only if the key is not already present.
*
* @param key the key to store
* @param value the value to associate if absent
* @param ttl the time-to-live in milliseconds, or {@code Long.MAX_VALUE} for no expiry
* @param ttl the time-to-live, or {@code null} for no expiry
* @param completion a callback that receives the existing value if present,
* or {@code null} if the key was absent and the value was stored
*/
void putIfAbsent(
String key,
String value,
long ttl,
Duration ttl,
Consumer<String> completion);

/**
Expand All @@ -104,4 +106,76 @@ void delete(
void getAndDelete(
String key,
Consumer<String> completion);

/**
* Attempts to acquire a TTL-bounded lock on the given key without waiting.
* <p>
* The completion fires <em>strictly later</em> than the call returns, on the caller's I/O
* thread, following the same async contract as the other {@code StoreHandler} operations.
* On success it receives a non-null ownership token, generated by the implementation, that
* the caller must supply to {@link #unlock} to release. On contention — another holder owns
* an unexpired lock — the completion receives {@code null}.
* </p>
* <p>
* The lock auto-releases when its TTL elapses; an {@code unlock} from the original holder
* against an expired or reacquired lock is treated as a no-op.
* </p>
*
* @param key the key to lock
* @param ttl the lease duration, or {@code null} for no expiry
* @param completion a callback that receives {@code (key, token)};
* {@code token} is non-null when the lock was acquired,
* {@code null} when another holder owns the key
*/
void lock(
String key,
Duration ttl,
BiConsumer<String, String> completion);

/**
* Releases a lock previously acquired via {@link #lock}, provided the supplied token matches
* the current holder.
* <p>
* On a successful ownership-checked release, the completion receives the supplied token —
* mirroring the {@link #lock} completion shape (non-null = you own it). On failure — the
* supplied token does not match the current holder, or the lock has already expired or been
* released — the completion receives {@code null}. No information about another holder is
* surfaced to a caller that did not prove ownership.
* </p>
* <p>
* The completion fires <em>strictly later</em> than the call returns, on the caller's I/O
* thread, following the same async contract as the other {@code StoreHandler} operations.
* </p>
*
* @param key the key to unlock
* @param token the ownership token returned by a prior {@link #lock} call
* @param completion a callback that receives the supplied {@code token} on a successful
* ownership-checked release, or {@code null} otherwise
*/
void unlock(
String key,
String token,
Consumer<String> completion);

/**
* Registers a listener invoked when the value associated with the given key changes.
* <p>
* Registration returns synchronously. The listener fires <em>strictly later</em> than the
* mutation that triggered it, on the registering caller's I/O thread, following the same
* async contract as the other {@code StoreHandler} operations. Each invocation receives the
* key and the new value, or {@code null} when the entry has been removed.
* </p>
* <p>
* Implementations decide the scope of change visibility — in-process, node-wide, or
* cluster-wide — and document it in their module.
* </p>
*
* @param key the key to observe
* @param listener a callback that receives {@code (key, value)} on every change;
* {@code value} is {@code null} when the entry has been removed
* @return a {@link Closeable} handle that unsubscribes the listener when closed
*/
Closeable watch(
String key,
BiConsumer<String, String> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void attach(
{
final Thread dispatchThread = Thread.currentThread();
final MutableBoolean callbackFired = new MutableBoolean();
this.store.putIfAbsent("init", "", Long.MAX_VALUE, v ->
this.store.putIfAbsent("init", "", null, v ->
{
if (Thread.currentThread() != dispatchThread || callbackFired.value)
{
Expand Down Expand Up @@ -711,6 +711,52 @@ private void runStoreAssertions(
callbackFired.value = true;
});
break;
case "lock":
store.lock(a.key, a.ttl, (k, token) ->
{
if (Thread.currentThread() != dispatchThread ||
callbackFired.value)
{
doInitialReset(traceId);
}
// expect="" → token must be null (lock failed);
// expect=non-empty → token must be non-null (lock succeeded)
if (a.hasExpect)
{
boolean expectAcquired = a.expect != null && !a.expect.isEmpty();
boolean acquired = token != null;
if (expectAcquired != acquired)
{
doInitialReset(traceId);
}
}
callbackFired.value = true;
});
break;
case "unlock":
store.unlock(a.key, a.value, v ->
{
if (Thread.currentThread() != dispatchThread ||
callbackFired.value ||
a.hasExpect && !Objects.equals(v, a.expect))
{
doInitialReset(traceId);
}
callbackFired.value = true;
});
break;
case "watch":
// registration is synchronous; listener fires async on mutations
store.watch(a.key, (k, v) ->
{
if (Thread.currentThread() != dispatchThread ||
a.hasExpect && !Objects.equals(v, a.expect))
{
doInitialReset(traceId);
}
});
callbackFired.value = false;
continue;
default:
doInitialReset(traceId);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.engine.test.internal.binding.config;

import java.time.Duration;
import java.util.List;
import java.util.function.Function;

Expand Down Expand Up @@ -176,7 +177,7 @@ public static final class StoreAssertion
public final String op;
public final String key;
public final String value;
public final long ttl;
public final Duration ttl;
public final String expect;
public final boolean hasExpect;
public final long delay;
Expand All @@ -185,7 +186,7 @@ public StoreAssertion(
String op,
String key,
String value,
long ttl,
Duration ttl,
String expect,
boolean hasExpect,
long delay)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.engine.test.internal.binding.config;

import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -179,9 +180,9 @@ public JsonObject adaptToJson(
{
assertion.add(STORE_VALUE_NAME, a.value);
}
if (a.ttl != Long.MAX_VALUE)
if (a.ttl != null)
{
assertion.add(STORE_TTL_NAME, a.ttl);
assertion.add(STORE_TTL_NAME, a.ttl.toString());
}
if (a.hasExpect)
{
Expand Down Expand Up @@ -362,7 +363,7 @@ public OptionsConfig adaptFromJson(
s.getString(STORE_OP_NAME),
s.getString(STORE_KEY_NAME),
s.getString(STORE_VALUE_NAME, null),
s.containsKey(STORE_TTL_NAME) ? s.getJsonNumber(STORE_TTL_NAME).longValue() : Long.MAX_VALUE,
s.containsKey(STORE_TTL_NAME) ? Duration.parse(s.getString(STORE_TTL_NAME)) : null,
expect,
hasExpect,
s.containsKey(DELAY_NAME) ? s.getJsonNumber(DELAY_NAME).longValue() : 0L));
Expand Down
Loading
Loading