Skip to content

Commit fa0cb2c

Browse files
authored
GH-10782: Use CAS/CAD commands in RedisLockRegistry
Fixes: #10782 * Use CAS for lock renewal in `RedisLock` * Use CAD for lock release in `RedisSpinLock` * Add graceful fallback to Lua scripts for older Redis versions * Bump test container to Redis `8.4.0` * Add tests for CAS/CAD fallback and lock safety * Updated adoc files. Signed-off-by: yordantsintsov <yordan.tsintsov@gmail.com>
1 parent fbb5b71 commit fa0cb2c

5 files changed

Lines changed: 152 additions & 7 deletions

File tree

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,12 @@
4848

4949
import org.springframework.beans.factory.DisposableBean;
5050
import org.springframework.dao.CannotAcquireLockException;
51+
import org.springframework.dao.InvalidDataAccessApiUsageException;
52+
import org.springframework.data.redis.RedisSystemException;
5153
import org.springframework.data.redis.connection.Message;
5254
import org.springframework.data.redis.connection.MessageListener;
5355
import org.springframework.data.redis.connection.RedisConnectionFactory;
56+
import org.springframework.data.redis.core.BoundValueOperations;
5457
import org.springframework.data.redis.core.StringRedisTemplate;
5558
import org.springframework.data.redis.core.script.DefaultRedisScript;
5659
import org.springframework.data.redis.core.script.RedisScript;
@@ -99,6 +102,7 @@
99102
* @author Youbin Wu
100103
* @author Michal Domagala
101104
* @author Severin Kistler
105+
* @author Yordan Tsintsov
102106
*
103107
* @since 4.0
104108
*
@@ -173,6 +177,11 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
173177
*/
174178
private volatile @Nullable RedisMessageListenerContainer redisMessageListenerContainer;
175179

180+
/**
181+
* Flag to denote whether the Redis server supports CAS/CAD operations.
182+
*/
183+
private volatile boolean supportsCasCadOperations = true;
184+
176185
/**
177186
* Create a lock registry with the default (60 second) lock expiration.
178187
* @param connectionFactory The connection factory.
@@ -373,6 +382,13 @@ private Function<String, RedisLock> getRedisLockConstructor(RedisLockType redisL
373382
};
374383
}
375384

385+
private static boolean isCasCadNotSupportedError(Exception ex) {
386+
Throwable cause = ex.getCause();
387+
return cause != null
388+
&& cause.getMessage() != null
389+
&& (cause.getMessage().contains("ERR syntax error") || cause.getMessage().contains("ERR unknown command"));
390+
}
391+
376392
private abstract class RedisLock implements DistributedLock {
377393

378394
private static final String OBTAIN_LOCK_SCRIPT = """
@@ -403,6 +419,8 @@ private abstract class RedisLock implements DistributedLock {
403419

404420
protected final String lockKey;
405421

422+
protected final BoundValueOperations<String, String> boundValueOps;
423+
406424
private final ReentrantLock localLock = new ReentrantLock();
407425

408426
private volatile long lockedAt;
@@ -411,6 +429,7 @@ private abstract class RedisLock implements DistributedLock {
411429

412430
private RedisLock(String path) {
413431
this.lockKey = constructLockKey(path);
432+
this.boundValueOps = RedisLockRegistry.this.redisTemplate.boundValueOps(this.lockKey);
414433
}
415434

416435
private String constructLockKey(String path) {
@@ -433,7 +452,7 @@ protected abstract boolean tryRedisLockInner(long time, long expireAfter)
433452
throws ExecutionException, InterruptedException;
434453

435454
/**
436-
* Unlock the lock using the unlink method in redis.
455+
* Unlock the lock. Uses delete method for Redis 8.4 and higher or unlink for earlier versions.
437456
*/
438457
protected abstract boolean removeLockKeyInnerUnlink();
439458

@@ -591,13 +610,44 @@ private void removeLockKey() {
591610
}
592611

593612
protected final boolean renew(long expireAfter) {
594-
boolean res = Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
595-
RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
596-
RedisLockRegistry.this.clientId, String.valueOf(expireAfter)));
597-
if (!res) {
613+
Boolean res;
614+
615+
if (RedisLockRegistry.this.supportsCasCadOperations) {
616+
try {
617+
res = this.boundValueOps.set(
618+
RedisLockRegistry.this.clientId,
619+
spec -> spec.ifEquals()
620+
.value(RedisLockRegistry.this.clientId)
621+
.expire(Duration.ofMillis(expireAfter)));
622+
}
623+
catch (RedisSystemException | InvalidDataAccessApiUsageException ex) {
624+
if (isCasCadNotSupportedError(ex)) {
625+
LOGGER.warn("CAS/CAD for value operations not supported, falling back to Lua script", ex);
626+
RedisLockRegistry.this.supportsCasCadOperations = false;
627+
res = executeRenewRedisScript(expireAfter);
628+
}
629+
else {
630+
throw ex;
631+
}
632+
}
633+
}
634+
else {
635+
res = executeRenewRedisScript(expireAfter);
636+
}
637+
638+
boolean result = Boolean.TRUE.equals(res);
639+
640+
if (!result) {
598641
stopRenew();
599642
}
600-
return res;
643+
644+
return result;
645+
}
646+
647+
private Boolean executeRenewRedisScript(long expireAfter) {
648+
return RedisLockRegistry.this.redisTemplate.execute(
649+
RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
650+
RedisLockRegistry.this.clientId, String.valueOf(expireAfter));
601651
}
602652

603653
protected final void stopRenew() {
@@ -833,6 +883,27 @@ protected boolean tryRedisLockInner(long time, long expireAfter) throws Interrup
833883

834884
@Override
835885
protected boolean removeLockKeyInnerUnlink() {
886+
if (RedisLockRegistry.this.supportsCasCadOperations) {
887+
try {
888+
return RedisLockRegistry.this.redisTemplate.delete(this.lockKey, it -> it.ifEquals().value(RedisLockRegistry.this.clientId));
889+
}
890+
catch (RedisSystemException | InvalidDataAccessApiUsageException ex) {
891+
if (isCasCadNotSupportedError(ex)) {
892+
LOGGER.warn("CAS/CAD for value operations not supported, falling back to Lua script", ex);
893+
RedisLockRegistry.this.supportsCasCadOperations = false;
894+
return executeUnlinkUnlockRedisScript();
895+
}
896+
else {
897+
throw ex;
898+
}
899+
}
900+
}
901+
else {
902+
return executeUnlinkUnlockRedisScript();
903+
}
904+
}
905+
906+
private Boolean executeUnlinkUnlockRedisScript() {
836907
return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
837908
UNLINK_UNLOCK_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
838909
RedisLockRegistry.this.clientId));

spring-integration-redis/src/test/java/org/springframework/integration/redis/RedisContainerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
@Testcontainers(disabledWithoutDocker = true)
5959
public interface RedisContainerTest {
6060

61-
GenericContainer<?> REDIS_CONTAINER = new GenericContainer<>("redis:7.0.2")
61+
GenericContainer<?> REDIS_CONTAINER = new GenericContainer<>("redis:8.4.0")
6262
.withExposedPorts(6379);
6363

6464
@BeforeAll

spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.atomic.AtomicInteger;
3636
import java.util.concurrent.atomic.AtomicReference;
3737
import java.util.concurrent.locks.Lock;
38+
import java.util.function.Consumer;
3839
import java.util.stream.Collectors;
3940
import java.util.stream.IntStream;
4041

@@ -47,18 +48,24 @@
4748
import org.junit.jupiter.params.ParameterizedClass;
4849
import org.junit.jupiter.params.provider.EnumSource;
4950

51+
import org.springframework.data.redis.RedisSystemException;
5052
import org.springframework.data.redis.connection.RedisConnectionFactory;
53+
import org.springframework.data.redis.core.BoundValueOperations;
54+
import org.springframework.data.redis.core.SetSpec;
5155
import org.springframework.data.redis.core.StringRedisTemplate;
5256
import org.springframework.integration.redis.RedisContainerTest;
5357
import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType;
5458
import org.springframework.integration.support.locks.DistributedLock;
5559
import org.springframework.integration.test.util.TestUtils;
5660
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
61+
import org.springframework.test.util.ReflectionTestUtils;
5762

5863
import static org.assertj.core.api.Assertions.assertThat;
5964
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
6065
import static org.assertj.core.api.Assertions.assertThatNoException;
6166
import static org.assertj.core.api.Assertions.assertThatThrownBy;
67+
import static org.mockito.ArgumentMatchers.any;
68+
import static org.mockito.BDDMockito.given;
6269
import static org.mockito.Mockito.mock;
6370

6471
/**
@@ -73,6 +80,7 @@
7380
* @author Youbin Wu
7481
* @author Glenn Renfro
7582
* @author Jiandong Ma
83+
* @author Yordan Tsintsov
7684
*
7785
* @since 4.0
7886
*
@@ -999,6 +1007,68 @@ void testInitialiseWithCustomExecutor() {
9991007
assertThatNoException().isThrownBy(() -> redisLockRegistry.setExecutor(mock()));
10001008
}
10011009

1010+
@Test
1011+
void testRenewFallbackWhenCasCadNotSupported() {
1012+
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
1013+
registry.setRedisLockType(testRedisLockType);
1014+
1015+
Lock lock = registry.obtain("testLock");
1016+
assertThat(lock.tryLock()).isTrue();
1017+
1018+
try {
1019+
BoundValueOperations<String, String> boundValueOps = mock();
1020+
ReflectionTestUtils.setField(lock, "boundValueOps", boundValueOps);
1021+
1022+
given(boundValueOps.set(any(), (Consumer<SetSpec<String, String>>) any()))
1023+
.willThrow(new RedisSystemException("CAS failed", new RuntimeException("ERR unknown command")));
1024+
1025+
assertThat(TestUtils.<Boolean>getPropertyValue(registry, "supportsCasCadOperations")).isTrue();
1026+
1027+
registry.renewLock("testLock");
1028+
1029+
assertThat(TestUtils.<Boolean>getPropertyValue(registry, "supportsCasCadOperations")).isFalse();
1030+
}
1031+
finally {
1032+
lock.unlock();
1033+
}
1034+
}
1035+
1036+
@Test
1037+
void testUnlockFallbackWhenCasCadNotSupported() {
1038+
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
1039+
registry.setRedisLockType(testRedisLockType);
1040+
1041+
ReflectionTestUtils.setField(registry, "supportsCasCadOperations", false);
1042+
1043+
Lock lock = registry.obtain("testLock");
1044+
lock.lock();
1045+
assertThatNoException().isThrownBy(lock::unlock);
1046+
}
1047+
1048+
@Test
1049+
void testUnlockDoesNotDeleteOtherClientsLock() throws Exception {
1050+
RedisLockRegistry registry1 = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100);
1051+
registry1.setRedisLockType(testRedisLockType);
1052+
RedisLockRegistry registry2 = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 10000);
1053+
registry2.setRedisLockType(testRedisLockType);
1054+
1055+
Lock lock1 = registry1.obtain("testLock");
1056+
lock1.lock();
1057+
1058+
waitForExpire("testLock");
1059+
1060+
Lock lock2 = registry2.obtain("testLock");
1061+
assertThat(lock2.tryLock()).isTrue();
1062+
try {
1063+
assertThatThrownBy(lock1::unlock).isInstanceOf(ConcurrentModificationException.class);
1064+
}
1065+
finally {
1066+
lock2.unlock();
1067+
registry1.destroy();
1068+
registry2.destroy();
1069+
}
1070+
}
1071+
10021072
private Long getExpire(RedisLockRegistry registry, String lockKey) {
10031073
StringRedisTemplate template = createTemplate();
10041074
String registryKey = TestUtils.getPropertyValue(registry, "registryKey");

src/reference/antora/modules/ROOT/pages/redis.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,6 +1333,9 @@ Starting with version 7.0, the `RedisLock` implements `DistributedLock` interfac
13331333
A `RedisLock` can now be acquired using the `lock(Duration ttl)` or `tryLock(long time, TimeUnit unit, Duration ttl)` method, with a specified time-to-live (TTL) value.
13341334
The `RedisLockRegistry` now provides new `renewLock(Object lockKey, Duration ttl)` method, allowing to renew the lock with a custom time-to-live value.
13351335

1336+
Starting with version 7.1, the RedisLockRegistry uses native Redis CAS (Compare-And-Set) and CAD (Compare-And-Delete) commands for Redis 8.4 or later.
1337+
For older Redis versions, the registry automatically falls back to the previous Lua script-based approach.
1338+
13361339
[[elasticache-valkey-cluster]]
13371340
=== AWS ElastiCache for Valkey Support in cluster mode
13381341

src/reference/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ See xref:testing.adoc[] for more information.
5454
The `RedisMessageStore.doRemove` now uses `GETDEL` instead of `GET` + `UNLINK` for Redis 6.2+ by default.
5555
Use `RedisMessageStore.setUseUnlink(true)` to use `GET` + `UNLINK` when atomicity is not required and `GETDEL` causes noticeable Redis latency.
5656
The Redis module now provides a Java DSL API via its dedicated `org.springframework.integration.redis.dsl.Redis` factory.
57+
The RedisLockRegistry now uses native Redis CAS/CAD commands for lock renewal and release (Redis 8.4+), with automatic fallback to Lua scripts for older Redis versions.
5758
See xref:redis.adoc[] for more information.
5859

5960
[[x7.1-jms-changes]]

0 commit comments

Comments
 (0)