Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,40 @@ void unlock(
String token,
Consumer<String> completion);

/**
* Extends the TTL of a lock previously acquired via {@link #lock}, provided the supplied token
* matches the current holder.
* <p>
* Used by callers that hold a lock longer than its initial TTL — for example, a singleton
* worker that owns a coordination lease for the lifetime of its binding. The caller schedules
* renewals at an interval shorter than the lease TTL; if any renewal fails the caller has
* lost ownership and must surrender any externally-visible state predicated on holding the
* lock.
* </p>
* <p>
* On a successful ownership-checked renewal, the completion receives the supplied token and
* the lock's expiry is reset to {@code now + ttl}. On failure — the token does not match the
* current holder, or the lock has already expired and possibly been reacquired by another
* holder — the completion receives {@code null}. As with {@link #unlock}, the implementation
* does not surface information about another holder to a caller that did not prove ownership.
* </p>
* <p>
* The completion fires <em>strictly later</em> than the call returns, on the caller's I/O
* thread, following the same async contract as the other {@code StoreHandler} operations.
* </p>
*
* @param key the key whose lock is being renewed
* @param token the ownership token returned by a prior {@link #lock} call
* @param ttl the new lease duration counted from now, or {@code null} for no expiry
* @param completion a callback that receives the supplied {@code token} on a successful
* ownership-checked renewal, or {@code null} otherwise
*/
void renew(
String key,
String token,
Duration ttl,
Consumer<String> completion);

/**
* Registers a listener invoked when the value associated with the given key changes.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -118,6 +119,7 @@ final class TestBindingFactory implements BindingHandler
private VaultAssertion vaultAssertion;
private StoreHandler store;
private List<StoreAssertion> storeAssertions;
private final Map<String, String> heldLockTokens = new HashMap<>();
private long authorization;

TestBindingFactory(
Expand Down Expand Up @@ -730,18 +732,49 @@ private void runStoreAssertions(
doInitialReset(traceId);
}
}
if (token != null)
{
heldLockTokens.put(k, token);
}
callbackFired.value = true;
});
break;
case "unlock":
store.unlock(a.key, a.value, v ->
store.unlock(a.key, resolveToken(a), v ->
{
if (Thread.currentThread() != dispatchThread ||
callbackFired.value ||
a.hasExpect && !Objects.equals(v, a.expect))
{
doInitialReset(traceId);
}
if (v != null)
{
heldLockTokens.remove(a.key);
}
callbackFired.value = true;
});
break;
case "renew":
store.renew(a.key, resolveToken(a), a.ttl, v ->
{
if (Thread.currentThread() != dispatchThread ||
callbackFired.value)
{
doInitialReset(traceId);
}
// expect="" or "null" → v must be null (renew failed);
// expect=non-empty → v must be non-null (renew succeeded)
if (a.hasExpect)
{
boolean expectRenewed = a.expect != null && !a.expect.isEmpty() &&
!"null".equals(a.expect);
boolean renewed = v != null;
if (expectRenewed != renewed)
{
doInitialReset(traceId);
}
}
callbackFired.value = true;
});
break;
Expand Down Expand Up @@ -769,6 +802,13 @@ private void runStoreAssertions(
}
}

private String resolveToken(
StoreAssertion a)
{
// explicit token via value, otherwise use the most recent token captured by a prior lock
return a.value != null && !a.value.isEmpty() ? a.value : heldLockTokens.get(a.key);
}

private void onInitialData(
DataFW data)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2021-2024 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.engine.test.internal.store;

record TestLockEntry(
String token,
long expiresAt)
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.aklivity.zilla.runtime.engine.test.internal.store;

import java.net.URL;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand All @@ -28,11 +29,15 @@ public final class TestStore implements Store
public static final String NAME = "test";

private final ConcurrentMap<Long, ConcurrentMap<String, String>> storage;
private final ConcurrentMap<Long, ConcurrentMap<String, List<TestWatcher>>> storageListeners;
private final ConcurrentMap<Long, ConcurrentMap<String, TestLockEntry>> storageLocks;

public TestStore(
Configuration config)
{
this.storage = new ConcurrentHashMap<>();
this.storageListeners = new ConcurrentHashMap<>();
this.storageLocks = new ConcurrentHashMap<>();
}

@Override
Expand All @@ -51,12 +56,24 @@ public URL type()
public TestStoreContext supply(
EngineContext context)
{
return new TestStoreContext(context, this::acquireEntries);
return new TestStoreContext(context, this::acquireEntries, this::acquireListeners, this::acquireLocks);
}

private ConcurrentMap<String, String> acquireEntries(
long storeId)
{
return storage.computeIfAbsent(storeId, id -> new ConcurrentHashMap<>());
}

private ConcurrentMap<String, List<TestWatcher>> acquireListeners(
long storeId)
{
return storageListeners.computeIfAbsent(storeId, id -> new ConcurrentHashMap<>());
}

private ConcurrentMap<String, TestLockEntry> acquireLocks(
long storeId)
{
return storageLocks.computeIfAbsent(storeId, id -> new ConcurrentHashMap<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.engine.test.internal.store;

import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongFunction;

Expand All @@ -28,13 +29,19 @@ public final class TestStoreContext implements StoreContext
{
private final Signaler signaler;
private final LongFunction<ConcurrentMap<String, String>> supplyEntries;
private final LongFunction<ConcurrentMap<String, List<TestWatcher>>> supplyListeners;
private final LongFunction<ConcurrentMap<String, TestLockEntry>> supplyLocks;

public TestStoreContext(
EngineContext context,
LongFunction<ConcurrentMap<String, String>> supplyEntries)
LongFunction<ConcurrentMap<String, String>> supplyEntries,
LongFunction<ConcurrentMap<String, List<TestWatcher>>> supplyListeners,
LongFunction<ConcurrentMap<String, TestLockEntry>> supplyLocks)
{
this.signaler = context.signaler();
this.supplyEntries = supplyEntries;
this.supplyListeners = supplyListeners;
this.supplyLocks = supplyLocks;
}

@Override
Expand All @@ -46,7 +53,9 @@ public TestStoreHandler attach(
{
options.entries.forEach(entries::putIfAbsent);
}
return new TestStoreHandler(store, signaler, entries);
final ConcurrentMap<String, List<TestWatcher>> listeners = supplyListeners.apply(store.id);
final ConcurrentMap<String, TestLockEntry> locks = supplyLocks.apply(store.id);
return new TestStoreHandler(store, signaler, entries, listeners, locks);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
Expand All @@ -33,19 +32,21 @@
public final class TestStoreHandler implements StoreHandler
{
private final ConcurrentMap<String, String> entries;
private final ConcurrentMap<String, List<BiConsumer<String, String>>> listeners;
private final ConcurrentMap<String, List<TestWatcher>> listeners;
private final ConcurrentMap<String, TestLockEntry> locks;
private final Signaler signaler;

public TestStoreHandler(
StoreConfig store,
Signaler signaler,
ConcurrentMap<String, String> entries)
ConcurrentMap<String, String> entries,
ConcurrentMap<String, List<TestWatcher>> listeners,
ConcurrentMap<String, TestLockEntry> locks)
{
this.entries = Objects.requireNonNull(entries);
this.signaler = Objects.requireNonNull(signaler);
this.listeners = new ConcurrentHashMap<>();
this.locks = new ConcurrentHashMap<>();
this.listeners = Objects.requireNonNull(listeners);
this.locks = Objects.requireNonNull(locks);
}

@Override
Expand Down Expand Up @@ -121,7 +122,7 @@ public void lock(
final String token = UUID.randomUUID().toString();
final TestLockEntry candidate = new TestLockEntry(token, expiresAt);
TestLockEntry existing = locks.putIfAbsent(key, candidate);
if (existing != null && existing.expiresAt <= now)
if (existing != null && existing.expiresAt() <= now)
{
existing = locks.replace(key, existing, candidate) ? null : locks.get(key);
}
Expand All @@ -138,14 +139,14 @@ public void unlock(
final long now = System.currentTimeMillis();
final TestLockEntry current = locks.get(key);
final String result;
if (current != null && current.expiresAt > now && current.token.equals(token))
if (current != null && current.expiresAt() > now && current.token().equals(token))
{
locks.remove(key, current);
result = token;
}
else
{
if (current != null && current.expiresAt <= now)
if (current != null && current.expiresAt() <= now)
{
locks.remove(key, current);
}
Expand All @@ -154,19 +155,47 @@ public void unlock(
defer(() -> completion.accept(result));
}

@Override
public void renew(
String key,
String token,
Duration ttl,
Consumer<String> completion)
{
final long now = System.currentTimeMillis();
final TestLockEntry current = locks.get(key);
String result = null;
if (current != null && current.expiresAt() > now && current.token().equals(token))
{
final long expiresAt = ttl == null ? Long.MAX_VALUE : now + ttl.toMillis();
final TestLockEntry renewed = new TestLockEntry(token, expiresAt);
if (locks.replace(key, current, renewed))
{
result = token;
}
}
else if (current != null && current.expiresAt() <= now)
{
locks.remove(key, current);
}
final String outcome = result;
defer(() -> completion.accept(outcome));
}

@Override
public Closeable watch(
String key,
BiConsumer<String, String> listener)
{
final List<BiConsumer<String, String>> list = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
list.add(listener);
final TestWatcher watcher = new TestWatcher(listener, signaler);
final List<TestWatcher> list = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
list.add(watcher);
return () ->
{
final List<BiConsumer<String, String>> current = listeners.get(key);
final List<TestWatcher> current = listeners.get(key);
if (current != null)
{
current.remove(listener);
current.remove(watcher);
}
};
}
Expand All @@ -175,13 +204,13 @@ private void notifyListeners(
String key,
String value)
{
final List<BiConsumer<String, String>> list = listeners.get(key);
final List<TestWatcher> list = listeners.get(key);
if (list != null && !list.isEmpty())
{
final long now = System.currentTimeMillis();
for (BiConsumer<String, String> listener : list)
for (TestWatcher w : list)
{
signaler.signalAt(now, 0, ignored -> listener.accept(key, value));
w.signaler().signalAt(now, 0, ignored -> w.listener().accept(key, value));
}
}
}
Expand All @@ -192,10 +221,4 @@ private void defer(
// contract: callback fires strictly later than the call, on the caller's I/O thread
signaler.signalAt(System.currentTimeMillis(), 0, ignored -> task.run());
}

private record TestLockEntry(
String token,
long expiresAt)
{
}
}
Loading
Loading