diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java index 964019e7d5..f723adcff1 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java @@ -48,9 +48,12 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.dao.CannotAcquireLockException; +import org.springframework.dao.InvalidDataAccessApiUsageException; +import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.BoundValueOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; @@ -99,6 +102,7 @@ * @author Youbin Wu * @author Michal Domagala * @author Severin Kistler + * @author Yordan Tsintsov * * @since 4.0 * @@ -173,6 +177,11 @@ protected boolean removeEldestEntry(Entry eldest) { */ private volatile @Nullable RedisMessageListenerContainer redisMessageListenerContainer; + /** + * Flag to denote whether the Redis server supports CAS/CAD operations. + */ + private volatile boolean supportsCasCadOperations = true; + /** * Create a lock registry with the default (60 second) lock expiration. * @param connectionFactory The connection factory. @@ -373,6 +382,13 @@ private Function getRedisLockConstructor(RedisLockType redisL }; } + private static boolean isCasCadNotSupportedError(Exception ex) { + Throwable cause = ex.getCause(); + return cause != null + && cause.getMessage() != null + && (cause.getMessage().contains("ERR syntax error") || cause.getMessage().contains("ERR unknown command")); + } + private abstract class RedisLock implements DistributedLock { private static final String OBTAIN_LOCK_SCRIPT = """ @@ -403,6 +419,8 @@ private abstract class RedisLock implements DistributedLock { protected final String lockKey; + protected final BoundValueOperations boundValueOps; + private final ReentrantLock localLock = new ReentrantLock(); private volatile long lockedAt; @@ -411,6 +429,7 @@ private abstract class RedisLock implements DistributedLock { private RedisLock(String path) { this.lockKey = constructLockKey(path); + this.boundValueOps = RedisLockRegistry.this.redisTemplate.boundValueOps(this.lockKey); } private String constructLockKey(String path) { @@ -433,7 +452,7 @@ protected abstract boolean tryRedisLockInner(long time, long expireAfter) throws ExecutionException, InterruptedException; /** - * Unlock the lock using the unlink method in redis. + * Unlock the lock. Uses delete method for Redis 8.4 and higher or unlink for earlier versions. */ protected abstract boolean removeLockKeyInnerUnlink(); @@ -591,13 +610,44 @@ private void removeLockKey() { } protected final boolean renew(long expireAfter) { - boolean res = Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute( - RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey), - RedisLockRegistry.this.clientId, String.valueOf(expireAfter))); - if (!res) { + Boolean res; + + if (RedisLockRegistry.this.supportsCasCadOperations) { + try { + res = this.boundValueOps.set( + RedisLockRegistry.this.clientId, + spec -> spec.ifEquals() + .value(RedisLockRegistry.this.clientId) + .expire(Duration.ofMillis(expireAfter))); + } + catch (RedisSystemException | InvalidDataAccessApiUsageException ex) { + if (isCasCadNotSupportedError(ex)) { + LOGGER.warn("CAS/CAD for value operations not supported, falling back to Lua script", ex); + RedisLockRegistry.this.supportsCasCadOperations = false; + res = executeRenewRedisScript(expireAfter); + } + else { + throw ex; + } + } + } + else { + res = executeRenewRedisScript(expireAfter); + } + + boolean result = Boolean.TRUE.equals(res); + + if (!result) { stopRenew(); } - return res; + + return result; + } + + private Boolean executeRenewRedisScript(long expireAfter) { + return RedisLockRegistry.this.redisTemplate.execute( + RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey), + RedisLockRegistry.this.clientId, String.valueOf(expireAfter)); } protected final void stopRenew() { @@ -833,6 +883,27 @@ protected boolean tryRedisLockInner(long time, long expireAfter) throws Interrup @Override protected boolean removeLockKeyInnerUnlink() { + if (RedisLockRegistry.this.supportsCasCadOperations) { + try { + return RedisLockRegistry.this.redisTemplate.delete(this.lockKey, it -> it.ifEquals().value(RedisLockRegistry.this.clientId)); + } + catch (RedisSystemException | InvalidDataAccessApiUsageException ex) { + if (isCasCadNotSupportedError(ex)) { + LOGGER.warn("CAS/CAD for value operations not supported, falling back to Lua script", ex); + RedisLockRegistry.this.supportsCasCadOperations = false; + return executeUnlinkUnlockRedisScript(); + } + else { + throw ex; + } + } + } + else { + return executeUnlinkUnlockRedisScript(); + } + } + + private Boolean executeUnlinkUnlockRedisScript() { return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute( UNLINK_UNLOCK_REDIS_SCRIPT, Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId)); diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/RedisContainerTest.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/RedisContainerTest.java index 4c9f274818..94ec4f8831 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/RedisContainerTest.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/RedisContainerTest.java @@ -58,7 +58,7 @@ @Testcontainers(disabledWithoutDocker = true) public interface RedisContainerTest { - GenericContainer REDIS_CONTAINER = new GenericContainer<>("redis:7.0.2") + GenericContainer REDIS_CONTAINER = new GenericContainer<>("redis:8.4.0") .withExposedPorts(6379); @BeforeAll diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java index 8dcab5da1b..3430c4c56e 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -47,18 +48,24 @@ import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.EnumSource; +import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.BoundValueOperations; +import org.springframework.data.redis.core.SetSpec; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.integration.redis.RedisContainerTest; import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType; import org.springframework.integration.support.locks.DistributedLock; import org.springframework.integration.test.util.TestUtils; import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; +import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; /** @@ -73,6 +80,7 @@ * @author Youbin Wu * @author Glenn Renfro * @author Jiandong Ma + * @author Yordan Tsintsov * * @since 4.0 * @@ -999,6 +1007,68 @@ void testInitialiseWithCustomExecutor() { assertThatNoException().isThrownBy(() -> redisLockRegistry.setExecutor(mock())); } + @Test + void testRenewFallbackWhenCasCadNotSupported() { + RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey); + registry.setRedisLockType(testRedisLockType); + + Lock lock = registry.obtain("testLock"); + assertThat(lock.tryLock()).isTrue(); + + try { + BoundValueOperations boundValueOps = mock(); + ReflectionTestUtils.setField(lock, "boundValueOps", boundValueOps); + + given(boundValueOps.set(any(), (Consumer>) any())) + .willThrow(new RedisSystemException("CAS failed", new RuntimeException("ERR unknown command"))); + + assertThat(TestUtils.getPropertyValue(registry, "supportsCasCadOperations")).isTrue(); + + registry.renewLock("testLock"); + + assertThat(TestUtils.getPropertyValue(registry, "supportsCasCadOperations")).isFalse(); + } + finally { + lock.unlock(); + } + } + + @Test + void testUnlockFallbackWhenCasCadNotSupported() { + RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey); + registry.setRedisLockType(testRedisLockType); + + ReflectionTestUtils.setField(registry, "supportsCasCadOperations", false); + + Lock lock = registry.obtain("testLock"); + lock.lock(); + assertThatNoException().isThrownBy(lock::unlock); + } + + @Test + void testUnlockDoesNotDeleteOtherClientsLock() throws Exception { + RedisLockRegistry registry1 = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100); + registry1.setRedisLockType(testRedisLockType); + RedisLockRegistry registry2 = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 10000); + registry2.setRedisLockType(testRedisLockType); + + Lock lock1 = registry1.obtain("testLock"); + lock1.lock(); + + waitForExpire("testLock"); + + Lock lock2 = registry2.obtain("testLock"); + assertThat(lock2.tryLock()).isTrue(); + try { + assertThatThrownBy(lock1::unlock).isInstanceOf(ConcurrentModificationException.class); + } + finally { + lock2.unlock(); + registry1.destroy(); + registry2.destroy(); + } + } + private Long getExpire(RedisLockRegistry registry, String lockKey) { StringRedisTemplate template = createTemplate(); String registryKey = TestUtils.getPropertyValue(registry, "registryKey"); diff --git a/src/reference/antora/modules/ROOT/pages/redis.adoc b/src/reference/antora/modules/ROOT/pages/redis.adoc index ac4169ea53..439bd83ba5 100644 --- a/src/reference/antora/modules/ROOT/pages/redis.adoc +++ b/src/reference/antora/modules/ROOT/pages/redis.adoc @@ -1333,6 +1333,9 @@ Starting with version 7.0, the `RedisLock` implements `DistributedLock` interfac A `RedisLock` can now be acquired using the `lock(Duration ttl)` or `tryLock(long time, TimeUnit unit, Duration ttl)` method, with a specified time-to-live (TTL) value. The `RedisLockRegistry` now provides new `renewLock(Object lockKey, Duration ttl)` method, allowing to renew the lock with a custom time-to-live value. +Starting with version 7.1, the RedisLockRegistry uses native Redis CAS (Compare-And-Set) and CAD (Compare-And-Delete) commands for Redis 8.4 or later. +For older Redis versions, the registry automatically falls back to the previous Lua script-based approach. + [[elasticache-valkey-cluster]] === AWS ElastiCache for Valkey Support in cluster mode diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 09615629c6..0d27c5ff69 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -54,6 +54,7 @@ See xref:testing.adoc[] for more information. The `RedisMessageStore.doRemove` now uses `GETDEL` instead of `GET` + `UNLINK` for Redis 6.2+ by default. Use `RedisMessageStore.setUseUnlink(true)` to use `GET` + `UNLINK` when atomicity is not required and `GETDEL` causes noticeable Redis latency. The Redis module now provides a Java DSL API via its dedicated `org.springframework.integration.redis.dsl.Redis` factory. +The RedisLockRegistry now uses native Redis CAS/CAD commands for lock renewal and release (Redis 8.4+), with automatic fallback to Lua scripts for older Redis versions. See xref:redis.adoc[] for more information. [[x7.1-jms-changes]]