Skip to content

Commit 3936934

Browse files
committed
feat: redis add retry logic
1 parent 8f9dbf2 commit 3936934

6 files changed

Lines changed: 234 additions & 63 deletions

File tree

api/.env.example

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ REDIS_USE_CLUSTERS=false
7171
REDIS_CLUSTERS=
7272
REDIS_CLUSTERS_PASSWORD=
7373

74+
REDIS_RETRY_RETRIES=3
75+
REDIS_RETRY_BACKOFF_BASE=1.0
76+
REDIS_RETRY_BACKOFF_CAP=10.0
77+
REDIS_SOCKET_TIMEOUT=5.0
78+
REDIS_SOCKET_CONNECT_TIMEOUT=5.0
79+
REDIS_HEALTH_CHECK_INTERVAL=30
80+
7481
# celery configuration
7582
CELERY_BROKER_URL=redis://:difyai123456@localhost:${REDIS_PORT}/1
7683
CELERY_BACKEND=redis

api/configs/middleware/cache/redis_config.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,37 @@ class RedisConfig(BaseSettings):
117117
default=None,
118118
)
119119

120+
REDIS_RETRY_RETRIES: NonNegativeInt = Field(
121+
description="Maximum number of retries per Redis command on "
122+
"transient failures (ConnectionError, TimeoutError, socket.timeout)",
123+
default=3,
124+
)
125+
126+
REDIS_RETRY_BACKOFF_BASE: PositiveFloat = Field(
127+
description="Base delay in seconds for exponential backoff between retries",
128+
default=1.0,
129+
)
130+
131+
REDIS_RETRY_BACKOFF_CAP: PositiveFloat = Field(
132+
description="Maximum backoff delay in seconds between retries",
133+
default=10.0,
134+
)
135+
136+
REDIS_SOCKET_TIMEOUT: PositiveFloat | None = Field(
137+
description="Socket timeout in seconds for Redis read/write operations",
138+
default=5.0,
139+
)
140+
141+
REDIS_SOCKET_CONNECT_TIMEOUT: PositiveFloat | None = Field(
142+
description="Socket timeout in seconds for Redis connection establishment",
143+
default=5.0,
144+
)
145+
146+
REDIS_HEALTH_CHECK_INTERVAL: NonNegativeInt = Field(
147+
description="Interval in seconds between Redis connection health checks (0 to disable)",
148+
default=30,
149+
)
150+
120151
@field_validator("REDIS_MAX_CONNECTIONS", mode="before")
121152
@classmethod
122153
def _empty_string_to_none_for_max_conns(cls, v):

api/extensions/ext_redis.py

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77

88
import redis
99
from redis import RedisError
10+
from redis.backoff import ExponentialWithJitterBackoff # type: ignore
1011
from redis.cache import CacheConfig
1112
from redis.client import PubSub
1213
from redis.cluster import ClusterNode, RedisCluster
1314
from redis.connection import Connection, SSLConnection
15+
from redis.retry import Retry
1416
from redis.sentinel import Sentinel
1517

1618
from configs import dify_config
@@ -158,8 +160,41 @@ def _get_cache_configuration() -> CacheConfig | None:
158160
return CacheConfig()
159161

160162

163+
def _get_retry_policy() -> Retry:
    """Create the retry policy used for Redis connections.

    The policy combines a jittered exponential backoff, bounded by
    ``REDIS_RETRY_BACKOFF_BASE`` and ``REDIS_RETRY_BACKOFF_CAP``, with the
    retry budget from ``REDIS_RETRY_RETRIES``.

    Returns:
        A new ``Retry`` instance built from the current ``dify_config`` values.
    """
    jittered_backoff = ExponentialWithJitterBackoff(
        base=dify_config.REDIS_RETRY_BACKOFF_BASE,
        cap=dify_config.REDIS_RETRY_BACKOFF_CAP,
    )
    return Retry(backoff=jittered_backoff, retries=dify_config.REDIS_RETRY_RETRIES)
172+
173+
174+
def _get_connection_health_params() -> dict[str, Any]:
    """Collect retry, timeout and health-check settings for standalone and Sentinel clients.

    Returns:
        A fresh dict with the keys ``retry``, ``socket_timeout``,
        ``socket_connect_timeout`` and ``health_check_interval``, populated
        from ``dify_config``.
    """
    health_params: dict[str, Any] = {"retry": _get_retry_policy()}
    health_params["socket_timeout"] = dify_config.REDIS_SOCKET_TIMEOUT
    health_params["socket_connect_timeout"] = dify_config.REDIS_SOCKET_CONNECT_TIMEOUT
    health_params["health_check_interval"] = dify_config.REDIS_HEALTH_CHECK_INTERVAL
    return health_params
182+
183+
184+
def _get_cluster_connection_health_params() -> dict[str, Any]:
    """Collect the retry and timeout settings suitable for Redis Cluster clients.

    ``health_check_interval`` is left out because RedisCluster does not
    accept it as a constructor keyword (``cleanup_kwargs`` would silently
    strip it). What remains is ``retry``, ``socket_timeout`` and
    ``socket_connect_timeout``.

    Returns:
        A fresh dict holding the shared health parameters minus
        ``health_check_interval``.
    """
    cluster_params = _get_connection_health_params()
    # The helper hands back a new dict on every call, so dropping the key in place is safe.
    cluster_params.pop("health_check_interval", None)
    return cluster_params
194+
195+
161196
def _get_base_redis_params() -> dict[str, Any]:
162-
"""Get base Redis connection parameters."""
197+
"""Get base Redis connection parameters including retry and health policy."""
163198
return {
164199
"username": dify_config.REDIS_USERNAME,
165200
"password": dify_config.REDIS_PASSWORD or None,
@@ -169,6 +204,7 @@ def _get_base_redis_params() -> dict[str, Any]:
169204
"decode_responses": False,
170205
"protocol": dify_config.REDIS_SERIALIZATION_PROTOCOL,
171206
"cache_config": _get_cache_configuration(),
207+
**_get_connection_health_params(),
172208
}
173209

174210

@@ -215,6 +251,7 @@ def _create_cluster_client() -> Union[redis.Redis, RedisCluster]:
215251
"password": dify_config.REDIS_CLUSTERS_PASSWORD,
216252
"protocol": dify_config.REDIS_SERIALIZATION_PROTOCOL,
217253
"cache_config": _get_cache_configuration(),
254+
**_get_cluster_connection_health_params(),
218255
}
219256
if dify_config.REDIS_MAX_CONNECTIONS:
220257
cluster_kwargs["max_connections"] = dify_config.REDIS_MAX_CONNECTIONS
@@ -226,7 +263,8 @@ def _create_standalone_client(redis_params: dict[str, Any]) -> Union[redis.Redis
226263
"""Create standalone Redis client."""
227264
connection_class, ssl_kwargs = _get_ssl_configuration()
228265

229-
redis_params.update(
266+
params = {**redis_params}
267+
params.update(
230268
{
231269
"host": dify_config.REDIS_HOST,
232270
"port": dify_config.REDIS_PORT,
@@ -235,28 +273,31 @@ def _create_standalone_client(redis_params: dict[str, Any]) -> Union[redis.Redis
235273
)
236274

237275
if dify_config.REDIS_MAX_CONNECTIONS:
238-
redis_params["max_connections"] = dify_config.REDIS_MAX_CONNECTIONS
276+
params["max_connections"] = dify_config.REDIS_MAX_CONNECTIONS
239277

240278
if ssl_kwargs:
241-
redis_params.update(ssl_kwargs)
279+
params.update(ssl_kwargs)
242280

243-
pool = redis.ConnectionPool(**redis_params)
281+
pool = redis.ConnectionPool(**params)
244282
client: redis.Redis = redis.Redis(connection_pool=pool)
245283
return client
246284

247285

248286
def _create_pubsub_client(pubsub_url: str, use_clusters: bool) -> redis.Redis | RedisCluster:
    """Create the Redis client used for pub/sub from a connection URL.

    Args:
        pubsub_url: Connection URL handed to the client's ``from_url`` factory.
        use_clusters: When true a ``RedisCluster`` client is built, otherwise
            a plain ``redis.Redis`` client.

    Returns:
        A client configured with the shared retry/timeout settings and, when
        ``REDIS_MAX_CONNECTIONS`` is set, a ``max_connections`` limit.
    """
    # NOTE(review): socket_timeout is also passed to this pub/sub client; confirm that
    # subscribers doing blocking reads tolerate read timeouts on idle channels.
    build_health_params = _get_cluster_connection_health_params if use_clusters else _get_connection_health_params
    options: dict[str, Any] = build_health_params()

    pool_limit = dify_config.REDIS_MAX_CONNECTIONS
    if pool_limit:
        options["max_connections"] = pool_limit

    client_factory = RedisCluster.from_url if use_clusters else redis.Redis.from_url
    return client_factory(pubsub_url, **options)
260301

261302

262303
def init_app(app: DifyApp):
Lines changed: 124 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,125 @@
1-
from redis import RedisError
2-
3-
from extensions.ext_redis import redis_fallback
4-
5-
6-
def test_redis_fallback_success():
7-
@redis_fallback(default_return=None)
8-
def test_func():
9-
return "success"
10-
11-
assert test_func() == "success"
12-
13-
14-
def test_redis_fallback_error():
15-
@redis_fallback(default_return="fallback")
16-
def test_func():
17-
raise RedisError("Redis error")
18-
19-
assert test_func() == "fallback"
20-
21-
22-
def test_redis_fallback_none_default():
23-
@redis_fallback()
24-
def test_func():
25-
raise RedisError("Redis error")
1+
from unittest.mock import patch
262

27-
assert test_func() is None
28-
29-
30-
def test_redis_fallback_with_args():
31-
@redis_fallback(default_return=0)
32-
def test_func(x, y):
33-
raise RedisError("Redis error")
34-
35-
assert test_func(1, 2) == 0
36-
37-
38-
def test_redis_fallback_with_kwargs():
39-
@redis_fallback(default_return={})
40-
def test_func(x=None, y=None):
41-
raise RedisError("Redis error")
42-
43-
assert test_func(x=1, y=2) == {}
44-
45-
46-
def test_redis_fallback_preserves_function_metadata():
47-
@redis_fallback(default_return=None)
48-
def test_func():
49-
"""Test function docstring"""
50-
pass
51-
52-
assert test_func.__name__ == "test_func"
53-
assert test_func.__doc__ == "Test function docstring"
3+
from redis import RedisError
4+
from redis.retry import Retry
5+
6+
from extensions.ext_redis import (
7+
_get_base_redis_params,
8+
_get_cluster_connection_health_params,
9+
_get_connection_health_params,
10+
redis_fallback,
11+
)
12+
13+
14+
class TestGetConnectionHealthParams:
    """Checks for ``_get_connection_health_params``."""

    @patch("extensions.ext_redis.dify_config")
    def test_includes_all_health_params(self, mock_config):
        """Every health key is present and carries the configured value."""
        mock_config.configure_mock(
            REDIS_RETRY_RETRIES=3,
            REDIS_RETRY_BACKOFF_BASE=1.0,
            REDIS_RETRY_BACKOFF_CAP=10.0,
            REDIS_SOCKET_TIMEOUT=5.0,
            REDIS_SOCKET_CONNECT_TIMEOUT=5.0,
            REDIS_HEALTH_CHECK_INTERVAL=30,
        )

        params = _get_connection_health_params()

        for expected_key in ("retry", "socket_timeout", "socket_connect_timeout", "health_check_interval"):
            assert expected_key in params

        retry_policy = params["retry"]
        assert isinstance(retry_policy, Retry)
        assert retry_policy._retries == 3
        assert params["socket_timeout"] == 5.0
        assert params["socket_connect_timeout"] == 5.0
        assert params["health_check_interval"] == 30
35+
36+
37+
class TestGetClusterConnectionHealthParams:
    """Checks for ``_get_cluster_connection_health_params``."""

    @patch("extensions.ext_redis.dify_config")
    def test_excludes_health_check_interval(self, mock_config):
        """Retry and timeout keys are kept while ``health_check_interval`` is dropped."""
        mock_config.configure_mock(
            REDIS_RETRY_RETRIES=3,
            REDIS_RETRY_BACKOFF_BASE=1.0,
            REDIS_RETRY_BACKOFF_CAP=10.0,
            REDIS_SOCKET_TIMEOUT=5.0,
            REDIS_SOCKET_CONNECT_TIMEOUT=5.0,
            REDIS_HEALTH_CHECK_INTERVAL=30,
        )

        params = _get_cluster_connection_health_params()

        for kept_key in ("retry", "socket_timeout", "socket_connect_timeout"):
            assert kept_key in params
        assert "health_check_interval" not in params
53+
54+
55+
class TestGetBaseRedisParams:
    """Checks for ``_get_base_redis_params``."""

    @patch("extensions.ext_redis.dify_config")
    def test_includes_retry_and_health_params(self, mock_config):
        """The base parameters carry the retry/health settings alongside the existing ones."""
        mock_config.configure_mock(
            REDIS_USERNAME=None,
            REDIS_PASSWORD=None,
            REDIS_DB=0,
            REDIS_SERIALIZATION_PROTOCOL=3,
            REDIS_ENABLE_CLIENT_SIDE_CACHE=False,
            REDIS_RETRY_RETRIES=3,
            REDIS_RETRY_BACKOFF_BASE=1.0,
            REDIS_RETRY_BACKOFF_CAP=10.0,
            REDIS_SOCKET_TIMEOUT=5.0,
            REDIS_SOCKET_CONNECT_TIMEOUT=5.0,
            REDIS_HEALTH_CHECK_INTERVAL=30,
        )

        params = _get_base_redis_params()

        assert "retry" in params
        assert isinstance(params["retry"], Retry)
        assert params["socket_timeout"] == 5.0
        assert params["socket_connect_timeout"] == 5.0
        assert params["health_check_interval"] == 30

        # Parameters that predate the retry/health settings must still be there.
        assert params["db"] == 0
        assert params["encoding"] == "utf-8"
80+
81+
82+
class TestRedisFallback:
    """Behavioral checks for the ``redis_fallback`` decorator."""

    def test_redis_fallback_success(self):
        """The wrapped function's own result is returned when nothing is raised."""

        @redis_fallback(default_return=None)
        def returns_value():
            return "success"

        outcome = returns_value()
        assert outcome == "success"

    def test_redis_fallback_error(self):
        """A ``RedisError`` is replaced by the configured default."""

        @redis_fallback(default_return="fallback")
        def always_fails():
            raise RedisError("Redis error")

        outcome = always_fails()
        assert outcome == "fallback"

    def test_redis_fallback_none_default(self):
        """With no explicit default, a ``RedisError`` yields ``None``."""

        @redis_fallback()
        def always_fails():
            raise RedisError("Redis error")

        outcome = always_fails()
        assert outcome is None

    def test_redis_fallback_with_args(self):
        """Positional arguments are accepted and the default is returned on error."""

        @redis_fallback(default_return=0)
        def fails_with_args(x, y):
            raise RedisError("Redis error")

        outcome = fails_with_args(1, 2)
        assert outcome == 0

    def test_redis_fallback_with_kwargs(self):
        """Keyword arguments are accepted and the default is returned on error."""

        @redis_fallback(default_return={})
        def fails_with_kwargs(x=None, y=None):
            raise RedisError("Redis error")

        outcome = fails_with_kwargs(x=1, y=2)
        assert outcome == {}

    def test_redis_fallback_preserves_function_metadata(self):
        """The decorated function keeps its original name and docstring."""

        # The inner name and docstring are asserted below, so they stay exactly as written.
        @redis_fallback(default_return=None)
        def test_func():
            """Test function docstring"""
            pass

        assert test_func.__name__ == "test_func"
        assert test_func.__doc__ == "Test function docstring"

docker/.env.example

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,20 @@ REDIS_USE_CLUSTERS=false
373373
REDIS_CLUSTERS=
374374
REDIS_CLUSTERS_PASSWORD=
375375

376+
# Redis connection and retry configuration
377+
# Maximum number of retries per Redis command on transient failures
378+
REDIS_RETRY_RETRIES=3
379+
# Base delay (in seconds) for exponential backoff on retries
380+
REDIS_RETRY_BACKOFF_BASE=1.0
381+
# Cap (in seconds) for exponential backoff on retries
382+
REDIS_RETRY_BACKOFF_CAP=10.0
383+
# Timeout (in seconds) for Redis socket operations
384+
REDIS_SOCKET_TIMEOUT=5.0
385+
# Timeout (in seconds) for establishing a Redis connection
386+
REDIS_SOCKET_CONNECT_TIMEOUT=5.0
387+
# Interval (in seconds) between Redis connection health checks (0 to disable)
388+
REDIS_HEALTH_CHECK_INTERVAL=30
389+
376390
# ------------------------------
377391
# Celery Configuration
378392
# ------------------------------

docker/docker-compose.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ x-shared-env: &shared-api-worker-env
100100
REDIS_USE_CLUSTERS: ${REDIS_USE_CLUSTERS:-false}
101101
REDIS_CLUSTERS: ${REDIS_CLUSTERS:-}
102102
REDIS_CLUSTERS_PASSWORD: ${REDIS_CLUSTERS_PASSWORD:-}
103+
REDIS_RETRY_RETRIES: ${REDIS_RETRY_RETRIES:-3}
104+
REDIS_RETRY_BACKOFF_BASE: ${REDIS_RETRY_BACKOFF_BASE:-1.0}
105+
REDIS_RETRY_BACKOFF_CAP: ${REDIS_RETRY_BACKOFF_CAP:-10.0}
106+
REDIS_SOCKET_TIMEOUT: ${REDIS_SOCKET_TIMEOUT:-5.0}
107+
REDIS_SOCKET_CONNECT_TIMEOUT: ${REDIS_SOCKET_CONNECT_TIMEOUT:-5.0}
108+
REDIS_HEALTH_CHECK_INTERVAL: ${REDIS_HEALTH_CHECK_INTERVAL:-30}
103109
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://:difyai123456@redis:6379/1}
104110
CELERY_BACKEND: ${CELERY_BACKEND:-redis}
105111
BROKER_USE_SSL: ${BROKER_USE_SSL:-false}

0 commit comments

Comments
 (0)