11package com .mx .path .service .facility .store .redis ;
22
3+ import java .io .File ;
34import java .util .Set ;
45import java .util .function .Function ;
56
1213import io .lettuce .core .RedisClient ;
1314import io .lettuce .core .RedisException ;
1415import io .lettuce .core .RedisURI ;
16+ import io .lettuce .core .SslOptions ;
1517import io .lettuce .core .api .StatefulRedisConnection ;
18+ import io .lettuce .core .cluster .ClusterClientOptions ;
19+ import io .lettuce .core .cluster .RedisClusterClient ;
20+ import io .lettuce .core .cluster .api .StatefulRedisClusterConnection ;
1621import io .lettuce .core .protocol .ProtocolVersion ;
1722import io .lettuce .core .resource .ClientResources ;
1823
@@ -27,59 +32,105 @@ public class RedisStore implements Store {
2732 private final RedisStoreConfiguration configuration ;
2833 @ Getter
2934 private StatefulRedisConnection <String , String > connection ;
35+ @ Getter
36+ private StatefulRedisClusterConnection <String , String > redisClusterConnection ;
3037
3138 public final void setConnection (StatefulRedisConnection <String , String > connection ) {
3239 this .connection = connection ;
3340 }
3441
42+ public final void setRedisClusterConnection (StatefulRedisClusterConnection <String , String > redisClusterConnection ) {
43+ this .redisClusterConnection = redisClusterConnection ;
44+ }
45+
3546 public RedisStore (@ Configuration RedisStoreConfiguration redisStoreConfiguration ) {
3647 this .configuration = redisStoreConfiguration ;
3748 }
3849
3950 @ Override
4051 public final void delete (String key ) {
41- safeCall ("delete" , (conn ) -> {
42- conn .sync ().del (key );
43- return Void .TYPE ;
44- });
52+ if (configuration .isCluster ()) {
53+ safeClusterCall ("delete" , (conn ) -> {
54+ conn .sync ().del (key );
55+ return Void .TYPE ;
56+ });
57+ } else {
58+ safeCall ("delete" , (conn ) -> {
59+ conn .sync ().del (key );
60+ return Void .TYPE ;
61+ });
62+ }
4563 }
4664
4765 @ Override
4866 public final void deleteSet (String key , String value ) {
49- safeCall ("deleteSet" , (conn ) -> {
50- conn .sync ().srem (key , value );
51- return Void .TYPE ;
52- });
67+ if (configuration .isCluster ()) {
68+ safeClusterCall ("deleteSet" , (conn ) -> {
69+ conn .sync ().srem (key , value );
70+ return Void .TYPE ;
71+ });
72+ } else {
73+ safeCall ("deleteSet" , (conn ) -> {
74+ conn .sync ().srem (key , value );
75+ return Void .TYPE ;
76+ });
77+ }
5378 }
5479
5580 @ Override
5681 public final String get (String key ) {
57- return safeCall ("get" , (conn ) -> {
58- return conn .sync ().get (key );
59- });
82+ if (configuration .isCluster ()) {
83+ return safeClusterCall ("get" , (conn ) -> {
84+ return conn .sync ().get (key );
85+ });
86+ } else {
87+ return safeCall ("get" , (conn ) -> {
88+ return conn .sync ().get (key );
89+ });
90+ }
6091 }
6192
6293 @ Override
6394 public final Set <String > getSet (String key ) {
64- return safeCall ("getSet" , (conn ) -> {
65- return conn .sync ().smembers (key );
66- });
95+ if (configuration .isCluster ()) {
96+ return safeClusterCall ("getSet" , (conn ) -> {
97+ return conn .sync ().smembers (key );
98+ });
99+ } else {
100+ return safeCall ("getSet" , (conn ) -> {
101+ return conn .sync ().smembers (key );
102+ });
103+ }
67104 }
68105
69106 @ Override
70107 public final boolean inSet (String key , String value ) {
71- return safeCall ("inSet" , (conn ) -> {
72- return conn .sync ().sismember (key , value );
73- });
108+ if (configuration .isCluster ()) {
109+ return safeClusterCall ("inSet" , (conn ) -> {
110+ return conn .sync ().sismember (key , value );
111+ });
112+ } else {
113+ return safeCall ("inSet" , (conn ) -> {
114+ return conn .sync ().sismember (key , value );
115+ });
116+ }
74117 }
75118
76119 @ Override
77120 public final void put (String key , String value , long expirySeconds ) {
78- safeCall ("put" , (conn ) -> {
79- conn .sync ().set (key , value );
80- conn .sync ().expire (key , expirySeconds );
81- return Void .TYPE ;
82- });
121+ if (configuration .isCluster ()) {
122+ safeClusterCall ("put" , (conn ) -> {
123+ conn .sync ().set (key , value );
124+ conn .sync ().expire (key , expirySeconds );
125+ return Void .TYPE ;
126+ });
127+ } else {
128+ safeCall ("put" , (conn ) -> {
129+ conn .sync ().set (key , value );
130+ conn .sync ().expire (key , expirySeconds );
131+ return Void .TYPE ;
132+ });
133+ }
83134 }
84135
85136 @ Override
@@ -89,11 +140,19 @@ public final void put(String key, String value) {
89140
90141 @ Override
91142 public final void putSet (String key , String value , long expirySeconds ) {
92- safeCall ("putSet" , (conn ) -> {
93- conn .sync ().sadd (key , value );
94- conn .sync ().expire (key , expirySeconds );
95- return Void .TYPE ;
96- });
143+ if (configuration .isCluster ()) {
144+ safeClusterCall ("putSet" , (conn ) -> {
145+ conn .sync ().sadd (key , value );
146+ conn .sync ().expire (key , expirySeconds );
147+ return Void .TYPE ;
148+ });
149+ } else {
150+ safeCall ("putSet" , (conn ) -> {
151+ conn .sync ().sadd (key , value );
152+ conn .sync ().expire (key , expirySeconds );
153+ return Void .TYPE ;
154+ });
155+ }
97156 }
98157
99158 @ Override
@@ -103,14 +162,24 @@ public final void putSet(String key, String value) {
103162
104163 @ Override
105164 public final boolean putIfNotExist (String key , String value , long expirySeconds ) {
106- return safeCall ("putIfNotExist" , (conn ) -> {
107- boolean result = conn .sync ().setnx (key , value );
108- if (result ) {
109- conn .sync ().expire (key , expirySeconds );
110- }
165+ if (configuration .isCluster ()) {
166+ return safeClusterCall ("putIfNotExist" , (conn ) -> {
167+ boolean result = conn .sync ().setnx (key , value );
168+ if (result ) {
169+ conn .sync ().expire (key , expirySeconds );
170+ }
171+ return result ;
172+ });
173+ } else {
174+ return safeCall ("putIfNotExist" , (conn ) -> {
175+ boolean result = conn .sync ().setnx (key , value );
176+ if (result ) {
177+ conn .sync ().expire (key , expirySeconds );
178+ }
111179
112- return result ;
113- });
180+ return result ;
181+ });
182+ }
114183 }
115184
116185 @ Override
@@ -141,6 +210,32 @@ final synchronized StatefulRedisConnection<String, String> buildConnection() {
141210 }
142211 }
143212
213+ final synchronized StatefulRedisClusterConnection <String , String > buildClusterConnection () {
214+ try {
215+ ClientResources resources = ClientResources .builder ()
216+ .ioThreadPoolSize (configuration .getIoThreadPoolSize ())
217+ .computationThreadPoolSize (configuration .getComputationThreadPoolSize ())
218+ .build ();
219+
220+ RedisURI redisUri = RedisURI .Builder .redis (configuration .getHost (), configuration .getPort ())
221+ .withSsl (configuration .isSsl ())
222+ .withVerifyPeer (false )
223+ .build ();
224+
225+ RedisClusterClient redisClusterClient = RedisClusterClient .create (resources , redisUri );
226+ SslOptions sslOptions = SslOptions .builder ()
227+ .keyManager (new File (configuration .getCertFile ()), new File (configuration .getKeyFile ()), configuration .getPasswordFile ().toCharArray ())
228+ .build ();
229+ ClusterClientOptions clusterClientOptions = ClusterClientOptions .builder ()
230+ .sslOptions (sslOptions ).build ();
231+ redisClusterClient .setOptions (clusterClientOptions );
232+
233+ return redisClusterClient .connect ();
234+ } catch (RedisException e ) {
235+ throw new RedisStoreConnectionException ("An error occurred connecting to redis" , e );
236+ }
237+ }
238+
144239 private <T > T safeCall (String operation , Function <StatefulRedisConnection <String , String >, T > runnable ) {
145240 try {
146241 return runnable .apply (connection ());
@@ -153,11 +248,30 @@ private <T> T safeCall(String operation, Function<StatefulRedisConnection<String
153248 }
154249 }
155250
251+ private <T > T safeClusterCall (String operation , Function <StatefulRedisClusterConnection <String , String >, T > runnable ) {
252+ try {
253+ return (T ) runnable .apply (clusterConnection ());
254+ } catch (RedisStoreConnectionException e ) {
255+ throw e ;
256+ } catch (RedisException e ) {
257+ throw new RedisStoreOperationException ("Redis error occurred on " + operation , e );
258+ } catch (RuntimeException e ) {
259+ throw new RedisStoreOperationException ("Unknown exception thrown by redis on " + operation , e );
260+ }
261+ }
262+
156263 private StatefulRedisConnection <String , String > connection () {
157264 if (connection == null ) {
158265 connection = buildConnection ();
159266 }
160267
161268 return connection ;
162269 }
270+
271+ private StatefulRedisClusterConnection clusterConnection () {
272+ if (redisClusterConnection == null ) {
273+ redisClusterConnection = buildClusterConnection ();
274+ }
275+ return redisClusterConnection ;
276+ }
163277}
0 commit comments