Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@

import org.springframework.beans.factory.DisposableBean;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
Expand Down Expand Up @@ -99,6 +102,7 @@
* @author Youbin Wu
* @author Michal Domagala
* @author Severin Kistler
* @author Yordan Tsintsov
*
* @since 4.0
*
Expand Down Expand Up @@ -173,6 +177,11 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
*/
private volatile @Nullable RedisMessageListenerContainer redisMessageListenerContainer;

/**
* Flag to denote whether the Redis server supports CAS/CAD operations.
*/
private volatile boolean supportsCasCadOperations = true;
Comment thread
artembilan marked this conversation as resolved.

/**
* Create a lock registry with the default (60 second) lock expiration.
* @param connectionFactory The connection factory.
Expand Down Expand Up @@ -373,6 +382,13 @@ private Function<String, RedisLock> getRedisLockConstructor(RedisLockType redisL
};
}

private static boolean isCasCadNotSupportedError(Exception ex) {
Throwable cause = ex.getCause();
return cause != null
&& cause.getMessage() != null
&& (cause.getMessage().contains("ERR syntax error") || cause.getMessage().contains("ERR unknown command"));
}

private abstract class RedisLock implements DistributedLock {

private static final String OBTAIN_LOCK_SCRIPT = """
Expand Down Expand Up @@ -403,6 +419,8 @@ private abstract class RedisLock implements DistributedLock {

protected final String lockKey;

protected final BoundValueOperations<String, String> boundValueOps;

private final ReentrantLock localLock = new ReentrantLock();

private volatile long lockedAt;
Expand All @@ -411,6 +429,7 @@ private abstract class RedisLock implements DistributedLock {

private RedisLock(String path) {
this.lockKey = constructLockKey(path);
this.boundValueOps = RedisLockRegistry.this.redisTemplate.boundValueOps(this.lockKey);
}

private String constructLockKey(String path) {
Expand All @@ -433,7 +452,7 @@ protected abstract boolean tryRedisLockInner(long time, long expireAfter)
throws ExecutionException, InterruptedException;

/**
* Unlock the lock using the unlink method in redis.
* Unlock the lock. Uses delete method for Redis 8.4 and higher or unlink for earlier versions.
*/
protected abstract boolean removeLockKeyInnerUnlink();

Expand Down Expand Up @@ -591,13 +610,44 @@ private void removeLockKey() {
}

protected final boolean renew(long expireAfter) {
boolean res = Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
RedisLockRegistry.this.clientId, String.valueOf(expireAfter)));
if (!res) {
Boolean res;

if (RedisLockRegistry.this.supportsCasCadOperations) {
try {
res = this.boundValueOps.set(
RedisLockRegistry.this.clientId,
spec -> spec.ifEquals()
.value(RedisLockRegistry.this.clientId)
.expire(Duration.ofMillis(expireAfter)));
}
catch (RedisSystemException | InvalidDataAccessApiUsageException ex) {
if (isCasCadNotSupportedError(ex)) {
LOGGER.warn("CAS/CAD for value operations not supported, falling back to Lua script", ex);
RedisLockRegistry.this.supportsCasCadOperations = false;
res = executeRenewRedisScript(expireAfter);
}
else {
throw ex;
}
}
}
else {
res = executeRenewRedisScript(expireAfter);
}

boolean result = Boolean.TRUE.equals(res);

if (!result) {
stopRenew();
}
return res;

return result;
}

private Boolean executeRenewRedisScript(long expireAfter) {
return RedisLockRegistry.this.redisTemplate.execute(
RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
RedisLockRegistry.this.clientId, String.valueOf(expireAfter));
}

protected final void stopRenew() {
Expand Down Expand Up @@ -833,6 +883,27 @@ protected boolean tryRedisLockInner(long time, long expireAfter) throws Interrup

@Override
protected boolean removeLockKeyInnerUnlink() {
if (RedisLockRegistry.this.supportsCasCadOperations) {
try {
return RedisLockRegistry.this.redisTemplate.delete(this.lockKey, it -> it.ifEquals().value(RedisLockRegistry.this.clientId));
Comment thread
artembilan marked this conversation as resolved.
}
catch (RedisSystemException | InvalidDataAccessApiUsageException ex) {
if (isCasCadNotSupportedError(ex)) {
LOGGER.warn("CAS/CAD for value operations not supported, falling back to Lua script", ex);
RedisLockRegistry.this.supportsCasCadOperations = false;
return executeUnlinkUnlockRedisScript();
}
else {
throw ex;
}
}
}
else {
return executeUnlinkUnlockRedisScript();
}
}

private Boolean executeUnlinkUnlockRedisScript() {
return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
UNLINK_UNLOCK_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
RedisLockRegistry.this.clientId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
@Testcontainers(disabledWithoutDocker = true)
public interface RedisContainerTest {

GenericContainer<?> REDIS_CONTAINER = new GenericContainer<>("redis:7.0.2")
GenericContainer<?> REDIS_CONTAINER = new GenericContainer<>("redis:8.4.0")
.withExposedPorts(6379);

@BeforeAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -47,18 +48,24 @@
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.EnumSource;

import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.SetSpec;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.integration.redis.RedisContainerTest;
import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType;
import org.springframework.integration.support.locks.DistributedLock;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
import org.springframework.test.util.ReflectionTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
Expand All @@ -73,6 +80,7 @@
* @author Youbin Wu
* @author Glenn Renfro
* @author Jiandong Ma
* @author Yordan Tsintsov
*
* @since 4.0
*
Expand Down Expand Up @@ -999,6 +1007,68 @@ void testInitialiseWithCustomExecutor() {
assertThatNoException().isThrownBy(() -> redisLockRegistry.setExecutor(mock()));
}

@Test
void testRenewFallbackWhenCasCadNotSupported() {
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
registry.setRedisLockType(testRedisLockType);

Lock lock = registry.obtain("testLock");
assertThat(lock.tryLock()).isTrue();

try {
BoundValueOperations<String, String> boundValueOps = mock();
ReflectionTestUtils.setField(lock, "boundValueOps", boundValueOps);

given(boundValueOps.set(any(), (Consumer<SetSpec<String, String>>) any()))
.willThrow(new RedisSystemException("CAS failed", new RuntimeException("ERR unknown command")));

assertThat(TestUtils.<Boolean>getPropertyValue(registry, "supportsCasCadOperations")).isTrue();

registry.renewLock("testLock");

assertThat(TestUtils.<Boolean>getPropertyValue(registry, "supportsCasCadOperations")).isFalse();
}
finally {
lock.unlock();
}
}

@Test
void testUnlockFallbackWhenCasCadNotSupported() {
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey);
registry.setRedisLockType(testRedisLockType);

ReflectionTestUtils.setField(registry, "supportsCasCadOperations", false);

Lock lock = registry.obtain("testLock");
lock.lock();
assertThatNoException().isThrownBy(lock::unlock);
}

@Test
void testUnlockDoesNotDeleteOtherClientsLock() throws Exception {
RedisLockRegistry registry1 = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100);
registry1.setRedisLockType(testRedisLockType);
RedisLockRegistry registry2 = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 10000);
registry2.setRedisLockType(testRedisLockType);

Lock lock1 = registry1.obtain("testLock");
lock1.lock();

waitForExpire("testLock");

Lock lock2 = registry2.obtain("testLock");
assertThat(lock2.tryLock()).isTrue();
try {
assertThatThrownBy(lock1::unlock).isInstanceOf(ConcurrentModificationException.class);
}
finally {
lock2.unlock();
registry1.destroy();
registry2.destroy();
}
}

private Long getExpire(RedisLockRegistry registry, String lockKey) {
StringRedisTemplate template = createTemplate();
String registryKey = TestUtils.getPropertyValue(registry, "registryKey");
Expand Down
3 changes: 3 additions & 0 deletions src/reference/antora/modules/ROOT/pages/redis.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,9 @@ Starting with version 7.0, the `RedisLock` implements `DistributedLock` interfac
A `RedisLock` can now be acquired using the `lock(Duration ttl)` or `tryLock(long time, TimeUnit unit, Duration ttl)` method, with a specified time-to-live (TTL) value.
The `RedisLockRegistry` now provides new `renewLock(Object lockKey, Duration ttl)` method, allowing to renew the lock with a custom time-to-live value.

Starting with version 7.1, the RedisLockRegistry uses native Redis CAS (Compare-And-Set) and CAD (Compare-And-Delete) commands for Redis 8.4 or later.
For older Redis versions, the registry automatically falls back to the previous Lua script-based approach.

[[elasticache-valkey-cluster]]
=== AWS ElastiCache for Valkey Support in cluster mode

Expand Down
1 change: 1 addition & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ See xref:testing.adoc[] for more information.
The `RedisMessageStore.doRemove` now uses `GETDEL` instead of `GET` + `UNLINK` for Redis 6.2+ by default.
Use `RedisMessageStore.setUseUnlink(true)` to use `GET` + `UNLINK` when atomicity is not required and `GETDEL` causes noticeable Redis latency.
The Redis module now provides a Java DSL API via its dedicated `org.springframework.integration.redis.dsl.Redis` factory.
The RedisLockRegistry now uses native Redis CAS/CAD commands for lock renewal and release (Redis 8.4+), with automatic fallback to Lua scripts for older Redis versions.
See xref:redis.adoc[] for more information.

[[x7.1-jms-changes]]
Expand Down