Skip to content

Commit 202b092

Browse files
committed
GH-11041: Revise cache in the JdbcLockRegistry
Fixes: #11041 * Fix `JdbcLockRegistry.renewLock()` to rely on the `obtain()` The held lock might be evicted from the cache, so the null from the `this.locks.get(path)` does not reflect reality * Explain more about `cacheCapacity` in the docs # Conflicts: # spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java # src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc
1 parent 560fc2a commit 202b092

3 files changed

Lines changed: 68 additions & 22 deletions

File tree

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.dao.CannotAcquireLockException;
3030
import org.springframework.dao.DataAccessResourceFailureException;
3131
import org.springframework.dao.TransientDataAccessException;
32+
import org.springframework.integration.support.locks.DefaultLockRegistry;
3233
import org.springframework.integration.support.locks.ExpirableLockRegistry;
3334
import org.springframework.integration.support.locks.RenewableLockRegistry;
3435
import org.springframework.integration.util.UUIDConverter;
@@ -65,7 +66,7 @@ public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockReg
6566

6667
private static final int DEFAULT_IDLE = 100;
6768

68-
private static final int DEFAULT_CAPACITY = 100_000;
69+
private static final int DEFAULT_CAPACITY = 256;
6970

7071
private final Lock lock = new ReentrantLock();
7172

@@ -85,6 +86,8 @@ protected boolean removeEldestEntry(Entry<String, JdbcLock> eldest) {
8586

8687
private int cacheCapacity = DEFAULT_CAPACITY;
8788

89+
private DefaultLockRegistry defaultLockRegistry = new DefaultLockRegistry();
90+
8891
/**
8992
* Construct an instance based on the provided {@link LockRepository}.
9093
* @param client the {@link LockRepository} to rely on.
@@ -106,11 +109,15 @@ public void setIdleBetweenTries(Duration idleBetweenTries) {
106109

107110
/**
108111
* Set the capacity of cached locks.
109-
* @param cacheCapacity The capacity of cached lock, (default 100_000).
112+
* @param cacheCapacity The capacity of cached lock, (default 256 locks).
110113
* @since 5.5.6
114+
* @see DefaultLockRegistry
111115
*/
112116
public void setCacheCapacity(int cacheCapacity) {
113117
this.cacheCapacity = cacheCapacity;
118+
// Find the highest power of 2 for (n + 1), then subtract 1
119+
int mask = Integer.highestOneBit(cacheCapacity + 1) - 1;
120+
this.defaultLockRegistry = new DefaultLockRegistry(mask);
114121
}
115122

116123
@Override
@@ -148,26 +155,13 @@ public void expireUnusedOlderThan(long age) {
148155

149156
@Override
150157
public void renewLock(Object lockKey) {
151-
Assert.isInstanceOf(String.class, lockKey);
152-
String path = pathFor((String) lockKey);
153-
JdbcLock jdbcLock;
154-
this.lock.lock();
155-
try {
156-
jdbcLock = this.locks.get(path);
157-
}
158-
finally {
159-
this.lock.unlock();
160-
}
161-
162-
if (jdbcLock == null) {
163-
throw new IllegalStateException("Could not found mutex at " + path);
164-
}
158+
JdbcLock jdbcLock = (JdbcLock) obtain(lockKey);
165159
if (!jdbcLock.renew()) {
166-
throw new IllegalStateException("Could not renew mutex at " + path);
160+
throw new IllegalStateException("Could not renew lock " + lockKey);
167161
}
168162
}
169163

170-
private static final class JdbcLock implements Lock {
164+
private final class JdbcLock implements Lock {
171165

172166
private final LockRepository mutex;
173167

@@ -177,12 +171,13 @@ private static final class JdbcLock implements Lock {
177171

178172
private volatile long lastUsed = System.currentTimeMillis();
179173

180-
private final ReentrantLock delegate = new ReentrantLock();
174+
private final ReentrantLock delegate;
181175

182176
JdbcLock(LockRepository client, Duration idleBetweenTries, String path) {
183177
this.mutex = client;
184178
this.idleBetweenTries = idleBetweenTries;
185179
this.path = path;
180+
this.delegate = (ReentrantLock) JdbcLockRegistry.this.defaultLockRegistry.obtain(this.path);
186181
}
187182

188183
public long getLastUsed() {

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
import static org.assertj.core.api.Assertions.assertThat;
4949
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
50+
import static org.assertj.core.api.Assertions.assertThatNoException;
5051

5152
/**
5253
* @author Dave Syer
@@ -553,6 +554,53 @@ void testUnlockAfterLockStatusHasBeenExpiredAndDeleted() throws Exception {
553554
.isThrownBy(lock::unlock);
554555
}
555556

557+
@Test
558+
void noSecondLockOnEviction() throws InterruptedException {
559+
DefaultLockRepository client = new DefaultLockRepository(dataSource);
560+
client.setApplicationContext(this.context);
561+
client.afterPropertiesSet();
562+
client.afterSingletonsInstantiated();
563+
JdbcLockRegistry registry = new JdbcLockRegistry(client);
564+
registry.setCacheCapacity(2);
565+
566+
CountDownLatch lock1Latch = new CountDownLatch(1);
567+
CountDownLatch furtherLocksLatch = new CountDownLatch(1);
568+
569+
Executors.newSingleThreadExecutor()
570+
.execute(() -> {
571+
Lock lock1 = registry.obtain("lock1");
572+
lock1.lock();
573+
try {
574+
furtherLocksLatch.countDown();
575+
lock1Latch.await(10, TimeUnit.SECONDS);
576+
}
577+
catch (InterruptedException e) {
578+
Thread.currentThread().interrupt();
579+
}
580+
finally {
581+
lock1.unlock();
582+
}
583+
});
584+
585+
assertThat(furtherLocksLatch.await(10, TimeUnit.SECONDS)).isTrue();
586+
// Two new locks to trigger cache eviction for the 'lock1'
587+
registry.obtain("lock2");
588+
registry.obtain("lock3");
589+
590+
// Request 'lock1' again: will trigger new JdbcLock instance
591+
Lock lock1 = registry.obtain("lock1");
592+
593+
// Cannot lock because 'lock1' is still locked by another thread
594+
assertThat(lock1.tryLock(1, TimeUnit.SECONDS)).isFalse();
595+
596+
lock1Latch.countDown();
597+
598+
assertThatNoException().isThrownBy(() -> {
599+
lock1.lock();
600+
lock1.unlock();
601+
});
602+
}
603+
556604
@SuppressWarnings("unchecked")
557605
private static Map<String, Lock> getRegistryLocks(JdbcLockRegistry registry) {
558606
return TestUtils.getPropertyValue(registry, "locks", Map.class);

src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,11 @@ So the time to live can be highly reduce and deployments can retake a lost lock
4343

4444
NOTE: The lock renewal can be done only if the lock is held by the current thread.
4545

46-
Starting with version 5.5.6, the `JdbcLockRegistry` is support automatically clean up cache for JdbcLock in `JdbcLockRegistry.locks` via `JdbcLockRegistry.setCacheCapacity()`.
47-
See its JavaDocs for more information.
46+
Starting with version 5.5.6, the `JdbcLockRegistry` is support automatically clean-up cache for JdbcLock in `JdbcLockRegistry.locks` via `JdbcLockRegistry.setCacheCapacity()`.
47+
The default value is `256`.
48+
See its Javadocs for more information.
49+
This capacity number is also propagated into the `DefaultLockRegistry` used internally in the `JdbcLockRegistry` for a pool of shared `ReentrantLock` instances.
50+
So, if higher concurrent access (the number of unique lock keys used in parallel) is expected (or allowed) for the application, the `cacheCapacity` should be increased respectively, with the closest power of 2 in mind (essentially `Integer.highestOneBit(cacheCapacity + 1)`).
4851

4952
Starting with version 6.0, the `DefaultLockRepository` can be supplied with a `PlatformTransactionManager` instead of relying on the primary bean from the application context.
5053

@@ -58,4 +61,4 @@ lockRepository.setInsertQuery(lockRepository.getInsertQuery() + " ON CONFLICT DO
5861
----
5962

6063
Starting with version 6.4, the `LockRepository.delete()` method return the result of removing ownership of a distributed lock.
61-
And the `JdbcLockRegistry.JdbcLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
64+
And the `JdbcLockRegistry.JdbcLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.

0 commit comments

Comments
 (0)