From 9f98f856e0b5bf87e829955bd0d35a57ac8ce45c Mon Sep 17 00:00:00 2001 From: Ryan Jones Date: Mon, 12 May 2025 14:12:05 -0600 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20Redis=20TLS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add redis tls and cluster connections --- .../facility/store/redis/RedisStore.java | 182 ++++++++++++--- .../store/redis/RedisStoreConfiguration.java | 17 ++ .../store/redis/RedisStoreClusterTest.groovy | 215 ++++++++++++++++++ 3 files changed, 380 insertions(+), 34 deletions(-) create mode 100644 store-redis/src/test/groovy/com/mx/path/service/facility/store/redis/RedisStoreClusterTest.groovy diff --git a/store-redis/src/main/java/com/mx/path/service/facility/store/redis/RedisStore.java b/store-redis/src/main/java/com/mx/path/service/facility/store/redis/RedisStore.java index c16931a..1929310 100644 --- a/store-redis/src/main/java/com/mx/path/service/facility/store/redis/RedisStore.java +++ b/store-redis/src/main/java/com/mx/path/service/facility/store/redis/RedisStore.java @@ -1,5 +1,6 @@ package com.mx.path.service.facility.store.redis; +import java.io.File; import java.util.Set; import java.util.function.Function; @@ -12,7 +13,11 @@ import io.lettuce.core.RedisClient; import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; +import io.lettuce.core.SslOptions; import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.resource.ClientResources; @@ -27,59 +32,105 @@ public class RedisStore implements Store { private final RedisStoreConfiguration configuration; @Getter private StatefulRedisConnection connection; + @Getter + private StatefulRedisClusterConnection redisClusterConnection; public final void setConnection(StatefulRedisConnection connection) { this.connection = connection; } + public final void setRedisClusterConnection(StatefulRedisClusterConnection redisClusterConnection) { + this.redisClusterConnection = redisClusterConnection; + } + public RedisStore(@Configuration RedisStoreConfiguration redisStoreConfiguration) { this.configuration = redisStoreConfiguration; } @Override public final void delete(String key) { - safeCall("delete", (conn) -> { - conn.sync().del(key); - return Void.TYPE; - }); + if (configuration.isCluster()) { + safeClusterCall("delete", (conn) -> { + conn.sync().del(key); + return Void.TYPE; + }); + } else { + safeCall("delete", (conn) -> { + conn.sync().del(key); + return Void.TYPE; + }); + } } @Override public final void deleteSet(String key, String value) { - safeCall("deleteSet", (conn) -> { - conn.sync().srem(key, value); - return Void.TYPE; - }); + if (configuration.isCluster()) { + safeClusterCall("deleteSet", (conn) -> { + conn.sync().srem(key, value); + return Void.TYPE; + }); + } else { + safeCall("deleteSet", (conn) -> { + conn.sync().srem(key, value); + return Void.TYPE; + }); + } } @Override public final String get(String key) { - return safeCall("get", (conn) -> { - return conn.sync().get(key); - }); + if (configuration.isCluster()) { + return safeClusterCall("get", (conn) -> { + return conn.sync().get(key); + }); + } else { + return safeCall("get", (conn) -> { + return conn.sync().get(key); + }); + } } @Override public final Set getSet(String key) { - return safeCall("getSet", (conn) -> { - return conn.sync().smembers(key); - }); + if (configuration.isCluster()) { + return safeClusterCall("getSet", (conn) -> { + return conn.sync().smembers(key); + }); + } else { + return safeCall("getSet", (conn) -> { + return conn.sync().smembers(key); + }); + } } @Override public final boolean inSet(String key, String value) { - return safeCall("inSet", (conn) -> { - return conn.sync().sismember(key, value); - }); + if (configuration.isCluster()) { + return safeClusterCall("inSet", (conn) -> { + return conn.sync().sismember(key, value); + }); + } else { + return safeCall("inSet", (conn) -> { + return conn.sync().sismember(key, value); + }); + } } @Override public final void put(String key, String value, long expirySeconds) { - safeCall("put", (conn) -> { - conn.sync().set(key, value); - conn.sync().expire(key, expirySeconds); - return Void.TYPE; - }); + if (configuration.isCluster()) { + safeClusterCall("put", (conn) -> { + conn.sync().set(key, value); + conn.sync().expire(key, expirySeconds); + return Void.TYPE; + }); + } else { + safeCall("put", (conn) -> { + conn.sync().set(key, value); + conn.sync().expire(key, expirySeconds); + return Void.TYPE; + }); + } } @Override @@ -89,11 +140,19 @@ public final void put(String key, String value) { @Override public final void putSet(String key, String value, long expirySeconds) { - safeCall("putSet", (conn) -> { - conn.sync().sadd(key, value); - conn.sync().expire(key, expirySeconds); - return Void.TYPE; - }); + if (configuration.isCluster()) { + safeClusterCall("putSet", (conn) -> { + conn.sync().sadd(key, value); + conn.sync().expire(key, expirySeconds); + return Void.TYPE; + }); + } else { + safeCall("putSet", (conn) -> { + conn.sync().sadd(key, value); + conn.sync().expire(key, expirySeconds); + return Void.TYPE; + }); + } } @Override @@ -103,14 +162,24 @@ public final void putSet(String key, String value) { @Override public final boolean putIfNotExist(String key, String value, long expirySeconds) { - return safeCall("putIfNotExist", (conn) -> { - boolean result = conn.sync().setnx(key, value); - if (result) { - conn.sync().expire(key, expirySeconds); - } + if (configuration.isCluster()) { + return safeClusterCall("putIfNotExist", (conn) -> { + boolean result = conn.sync().setnx(key, value); + if (result) { + conn.sync().expire(key, expirySeconds); + } + return result; + }); + } else { + return safeCall("putIfNotExist", (conn) -> { + boolean result = conn.sync().setnx(key, value); + if (result) { + conn.sync().expire(key, expirySeconds); + } - return result; - }); + return result; + }); + } } @Override @@ -141,6 +210,32 @@ final synchronized StatefulRedisConnection buildConnection() { } } + final synchronized StatefulRedisClusterConnection buildClusterConnection() { + try { + ClientResources resources = ClientResources.builder() + .ioThreadPoolSize(configuration.getIoThreadPoolSize()) + .computationThreadPoolSize(configuration.getComputationThreadPoolSize()) + .build(); + + RedisURI redisUri = RedisURI.Builder.redis(configuration.getHost(), configuration.getPort()) + .withSsl(configuration.isSsl()) + .withVerifyPeer(false) + .build(); + + RedisClusterClient redisClusterClient = RedisClusterClient.create(resources, redisUri); + SslOptions sslOptions = SslOptions.builder() + .keyManager(new File(configuration.getCertFile()), new File(configuration.getKeyFile()), configuration.getPasswordFile().toCharArray()) + .build(); + ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder() + .sslOptions(sslOptions).build(); + redisClusterClient.setOptions(clusterClientOptions); + + return redisClusterClient.connect(); + } catch (RedisException e) { + throw new RedisStoreConnectionException("An error occurred connecting to redis", e); + } + } + private T safeCall(String operation, Function, T> runnable) { try { return runnable.apply(connection()); @@ -153,6 +248,18 @@ private T safeCall(String operation, Function T safeClusterCall(String operation, Function, T> runnable) { + try { + return (T) runnable.apply(clusterConnection()); + } catch (RedisStoreConnectionException e) { + throw e; + } catch (RedisException e) { + throw new RedisStoreOperationException("Redis error occurred on " + operation, e); + } catch (RuntimeException e) { + throw new RedisStoreOperationException("Unknown exception thrown by redis on " + operation, e); + } + } + private StatefulRedisConnection connection() { if (connection == null) { connection = buildConnection(); @@ -160,4 +267,11 @@ private StatefulRedisConnection connection() { return connection; } + + private StatefulRedisClusterConnection clusterConnection() { + if (redisClusterConnection == null) { + redisClusterConnection = buildClusterConnection(); + } + return redisClusterConnection; + } } diff --git a/store-redis/src/main/java/com/mx/path/service/facility/store/redis/RedisStoreConfiguration.java b/store-redis/src/main/java/com/mx/path/service/facility/store/redis/RedisStoreConfiguration.java index a751441..bd3a7ec 100644 --- a/store-redis/src/main/java/com/mx/path/service/facility/store/redis/RedisStoreConfiguration.java +++ b/store-redis/src/main/java/com/mx/path/service/facility/store/redis/RedisStoreConfiguration.java @@ -14,6 +14,8 @@ public class RedisStoreConfiguration { private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10); private static final int DEFAULT_PORT = 6379; private static final int DEFAULT_THREAD_POOL_SIZE = 5; + private static final boolean DEFAULT_SSL = false; + private static final boolean DEFAULT_CLUSTER = false; @ConfigurationField private String host = DEFAULT_HOST; @@ -29,4 +31,19 @@ public class RedisStoreConfiguration { @ConfigurationField private int port = DEFAULT_PORT; + + @ConfigurationField + private boolean ssl = DEFAULT_SSL; + + @ConfigurationField + private boolean cluster = DEFAULT_CLUSTER; + + @ConfigurationField + private String certFile; + + @ConfigurationField + private String keyFile; + + @ConfigurationField + private String passwordFile; } diff --git a/store-redis/src/test/groovy/com/mx/path/service/facility/store/redis/RedisStoreClusterTest.groovy b/store-redis/src/test/groovy/com/mx/path/service/facility/store/redis/RedisStoreClusterTest.groovy new file mode 100644 index 0000000..a70faa1 --- /dev/null +++ b/store-redis/src/test/groovy/com/mx/path/service/facility/store/redis/RedisStoreClusterTest.groovy @@ -0,0 +1,215 @@ +package com.mx.path.service.facility.store.redis + +import static org.mockito.Mockito.* + +import org.mockito.Mockito + +import io.lettuce.core.RedisException +import io.lettuce.core.api.StatefulRedisConnection +import io.lettuce.core.api.sync.RedisCommands +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands +import io.lettuce.core.cluster.api.sync.RedisClusterCommands + +import spock.lang.Specification + +class RedisStoreClusterTest extends Specification implements WithMockery { + + StatefulRedisClusterConnection clusterConnection + RedisAdvancedClusterCommands commands + RedisStore subject + RedisStoreConfiguration configurations + + def setup() { + configurations = new RedisStoreConfiguration() + configurations.cluster = true + clusterConnection = mock(StatefulRedisClusterConnection.class, Mockito.RETURNS_DEEP_STUBS) + commands = mock(RedisAdvancedClusterCommands.class) + when(clusterConnection.sync()).thenReturn(commands) + + subject = spy(new RedisStore(configurations)) + subject.setRedisClusterConnection(clusterConnection) + } + + def "setters/getters"() { + when: + def conn = mock(StatefulRedisClusterConnection.class) + subject.setRedisClusterConnection(conn) + + then: + subject.getRedisClusterConnection() == conn + subject.getConfiguration() == configurations + } + + def "delete"() { + when: + subject.delete("id") + + then: + verify(commands).del("id") || true + } + + def "deleteSet"() { + when: + subject.deleteSet("id", "value") + + then: + verify(commands).srem("id", "value") || true + } + + def "get"() { + when: + when(commands.get("key1")).thenReturn("value1") + + then: + subject.get("key1") == "value1" + } + + def "getSet"() { + given: + Set set = new HashSet<>() + set.add("key1") + + when: + when(commands.smembers("key1")).thenReturn(set) + + then: + subject.getSet("key1") == set + } + + def "inSet"() { + when: + when(commands.sismember("key1", "value1")).thenReturn(true) + + then: + subject.inSet("key1", "value1") + } + + def "put"() { + when: + subject.put("key1", "value1", 60) + + then: + verify(commands).set("key1", "value1") || true + verify(commands).expire("key1", 60) || true + } + + def "put with two arguments"() { + when: + subject.put("key1", "value1") + + then: + thrown(RedisStoreUnsupportedException) + } + + def "putSet"() { + when: + subject.putSet("key1", "value1", 60) + + then: + verify(commands).sadd("key1", "value1") || true + verify(commands).expire("key1", 60) || true + } + + def "putSet with two arguments"() { + when: + subject.putSet("key1", "value1") + + then: + thrown(RedisStoreUnsupportedException) + } + + def "putIfNotExist with two arguments"() { + when: + subject.putIfNotExist("key1", "value1") + + then: + thrown(RedisStoreUnsupportedException) + } + + def "setIfNotExist when does not exist"() { + when: + when(commands.setnx("key1", "value1")).thenReturn(true) // The key was set + subject.putIfNotExist("key1", "value1", 60) + + then: + verify(commands).expire("key1", 60) || true + } + + def "setIfNotExist when does exist"() { + when: + when(commands.setnx("key1", "value1")).thenReturn(false) // The key already set + subject.putIfNotExist("key1", "value1", 60) + + then: + verify(commands, never()).expire("key1", 60) || true + } + + def "setIfNotExist with expiration when does not exist"() { + when: + when(commands.setnx("key1", "value1")).thenReturn(true) // The key was set + subject.putIfNotExist("key1", "value1", 12) + + then: + verify(commands).expire("key1", 12) || true + } + + def "setIfNotExist with expiration when does exist"() { + when: + when(commands.setnx("key1", "value1")).thenReturn(false) // The key already set + subject.putIfNotExist("key1", "value1", 12) + + then: + verify(commands, never()).expire("key1", 12) || true + } + + def "status"() { + when: + when(commands.ping()).thenReturn("OK") + + then: + subject.status() == "OK" + } + + def "rethrows connection failure"() { + given: + subject.setRedisClusterConnection(null) + def exception = new RedisStoreConnectionException("Something happened when connecting", null) + doThrow(exception).when(subject).buildClusterConnection() + + when: + subject.get("junk") + + then: + def ex = thrown(RedisStoreConnectionException) + ex == exception + } + + def "wraps RedisException"() { + given: + subject.setRedisClusterConnection(null) + def exception = new RedisException("Something happened when performing operation") + doThrow(exception).when(subject).buildClusterConnection() + + when: + subject.get("junk") + + then: + def ex = thrown(RedisStoreOperationException) + ex.getMessage() == "Redis error occurred on get" + } + + def "wraps unknown error"() { + given: + subject.setRedisClusterConnection(null) + def exception = new RuntimeException("Something happened") + doThrow(exception).when(subject).buildClusterConnection() + + when: + subject.get("junk") + + then: + def ex = thrown(RedisStoreOperationException) + ex.getMessage() == "Unknown exception thrown by redis on get" + } +}