Skip to content

Commit fe8642d

Browse files
NormBCopilot
andauthored
Add Redis MOVED Redirection support (#3639)
* Updates to support Redis MOVED redirection * Add missing function definition to the include * Update some comments * Formatting updates * Update modules/cachedb_redis/cachedb_redis_utils.c Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update modules/cachedb_redis/cachedb_redis_dbase.c Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Add missing include * Fix: A little too aggressive in freeing the redisReply object * Remove ASK defines until there is logic to implement it. * Remove slot comparison when a MOVED is returned / fix space vs. tab formatting to be consistent --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 7bb062c commit fe8642d

4 files changed

Lines changed: 209 additions & 7 deletions

File tree

modules/cachedb_redis/cachedb_redis_dbase.c

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
#include <hiredis/hiredis.h>
3838

3939
#define QUERY_ATTEMPTS 2
40-
#define REDIS_DF_PORT 6379
4140

4241
int redis_query_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT;
4342
int redis_connnection_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT;
@@ -548,18 +547,71 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke
548547
va_end(aq);
549548
}
550549

551-
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
550+
if (reply == NULL) {
551+
LM_INFO("Redis query failed: reply: NULL node->context->err: %d, node->context->errstr: %s\n", node->context->err, node->context->errstr);
552+
if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) {
553+
i = 0;
554+
break;
555+
}
556+
} else if (reply->type == REDIS_REPLY_ERROR) {
552557
LM_INFO("Redis query failed: %p %.*s (%s)\n",
553558
reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE",
554559
node->context->errstr);
555-
if (reply) {
556-
freeReplyObject(reply);
557-
reply = NULL;
560+
561+
if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN)) {
562+
// It's a MOVED response
563+
redis_moved *moved_info = pkg_malloc(sizeof(redis_moved));
564+
if (!moved_info) {
565+
LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n");
566+
freeReplyObject(reply);
567+
reply = NULL;
568+
goto try_next_con;
569+
} else {
570+
if (parse_moved_reply(reply, moved_info) < 0) {
571+
LM_ERR("cachedb_redis: Unable to parse MOVED reply\n");
572+
pkg_free(moved_info);
573+
moved_info = NULL;
574+
freeReplyObject(reply);
575+
goto try_next_con;
576+
}
577+
578+
LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port);
579+
node = get_redis_connection_by_endpoint(con, moved_info);
580+
581+
pkg_free(moved_info);
582+
moved_info = NULL;
583+
freeReplyObject(reply);
584+
reply = NULL;
585+
586+
if (node == NULL) {
587+
LM_ERR("Unable to locate connection by endpoint\n");
588+
last_err = -10;
589+
goto try_next_con;
590+
}
591+
592+
if (node->context == NULL) {
593+
if (redis_reconnect_node(con,node) < 0) {
594+
LM_ERR("Unable to reconnect to node %p endpoint: %s:%d\n", node, node->ip, node->port);
595+
last_err = -1;
596+
goto try_next_con;
597+
}
598+
}
599+
600+
i = QUERY_ATTEMPTS; // New node that is the target being MOVED to, should have the attempts reset
601+
continue;
602+
}
558603
}
604+
605+
freeReplyObject(reply);
606+
reply = NULL;
607+
559608
if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) {
560-
i = 0; break;
609+
i = 0;
610+
break;
561611
}
562-
} else break;
612+
} else {
613+
break;
614+
}
563615
}
564616

565617
if (i==0) {

modules/cachedb_redis/cachedb_redis_dbase.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,21 @@ typedef struct cluster_nodes {
4848
struct cluster_nodes *next;
4949
} cluster_node;
5050

51+
// Helper typedef to store the endpoint from a redisReply.
52+
typedef struct {
53+
const char *s;
54+
int len;
55+
} const_str;
56+
57+
// When a MOVED is returned from Redis, it is parsed
58+
// and its componenets are stored using the following
59+
// typedef.
60+
typedef struct {
61+
int slot;
62+
const_str endpoint;
63+
int port;
64+
} redis_moved;
65+
5166

5267
#define CACHEDB_REDIS_DEFAULT_TIMEOUT 5000
5368

modules/cachedb_redis/cachedb_redis_utils.c

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include "../../dprint.h"
2727
#include "cachedb_redis_dbase.h"
28+
#include "cachedb_redis_utils.h"
2829
#include "../../mem/mem.h"
2930
#include "../../ut.h"
3031
#include "../../cachedb/cachedb.h"
@@ -104,6 +105,31 @@ cluster_node *get_redis_connection(redis_con *con,str *key)
104105
}
105106
}
106107

108+
cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info)
109+
{
110+
cluster_node *it;
111+
112+
if (con->flags & REDIS_SINGLE_INSTANCE) {
113+
LM_DBG("Single redis connection, returning %p\n",con->nodes);
114+
return con->nodes;
115+
} else {
116+
for (it=con->nodes;it;it=it->next) {
117+
if (match_prefix(redis_info->endpoint.s, redis_info->endpoint.len, it->ip, strlen(it->ip))) {
118+
if (it->port == redis_info->port) {
119+
// Removed slot comparison as it may be a little too aggressive of a match
120+
// Code is still here in the event that it needs to be added back in
121+
//if (it->start_slot <= redis_info->slot && it->end_slot >= redis_info->slot) {
122+
LM_DBG("Redis cluster connection, matched con %p for endpoint: %.*s:%d slot: [%u] %u [%u] \n", it, redis_info->endpoint.len, redis_info->endpoint.s, redis_info->port, it->start_slot, redis_info->slot, it->end_slot);
123+
return it;
124+
//}
125+
}
126+
}
127+
}
128+
LM_ERR("Redis cluster connection, No match found for endpoint: %.*s:%d slot %u\n", redis_info->endpoint.len, redis_info->endpoint.s, redis_info->port, redis_info->slot);
129+
return NULL;
130+
}
131+
}
132+
107133
void destroy_cluster_nodes(redis_con *con)
108134
{
109135
cluster_node *new,*foo;
@@ -319,3 +345,93 @@ int build_cluster_nodes(redis_con *con,char *info,int size)
319345
destroy_cluster_nodes(con);
320346
return -1;
321347
}
348+
349+
/*
350+
When Redis is operating as a cluster, it is possible (very likely)
351+
that a MOVED redirection will be returned by the Redis nodes that
352+
received the request. The general format of the reply from Redis is:
353+
MOVED slot [IP|FQDN]:port
354+
355+
This routine will parse the Redis MOVED reply into its components.
356+
Note that the redisReply struct MUST be released outside of this routine
357+
to avoid a memory leak. The out->endpoint pointer must not be used after
358+
the redisReply has been released.
359+
360+
The parsed data is stored into the following redis_moved struct:
361+
362+
typedef struct {
363+
int slot;
364+
const_str endpoint;
365+
int port;
366+
} redis_moved;
367+
368+
*/
369+
int parse_moved_reply(redisReply *reply, redis_moved *out) {
370+
if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out)
371+
return ERR_INVALID_REPLY;
372+
373+
const char *p = reply->str;
374+
const char *end = reply->str + reply->len;
375+
376+
for (int i = 0; i < MOVED_PREFIX_LEN; ++i) {
377+
if (p[i] != MOVED_PREFIX[i]) {
378+
return ERR_INVALID_REPLY;
379+
}
380+
}
381+
p += MOVED_PREFIX_LEN;
382+
383+
// Parse slot number
384+
int slot = 0;
385+
while (p < end && *p >= '0' && *p <= '9') {
386+
slot = slot * 10 + (*p - '0');
387+
p++;
388+
}
389+
if (slot == 0 && (p == reply->str + MOVED_PREFIX_LEN || *(p - 1) < '0' || *(p - 1) > '9'))
390+
return ERR_INVALID_SLOT;
391+
392+
// Skip spaces
393+
while (p < end && *p == ' ') p++;
394+
395+
// Parse host and port
396+
const char *host_start = p;
397+
const char *colon = NULL;
398+
while (p < end) {
399+
if (*p == ':') {
400+
colon = p;
401+
break;
402+
}
403+
p++;
404+
}
405+
406+
out->endpoint.s = NULL;
407+
out->endpoint.len = 0;
408+
409+
int port = REDIS_DF_PORT; // Default to Redis standard port
410+
411+
if (colon) {
412+
out->endpoint.s = host_start;
413+
out->endpoint.len = colon - host_start;
414+
415+
// Parse port
416+
const char *port_start = colon + 1;
417+
p = port_start;
418+
if (p < end) {
419+
port = 0;
420+
while (p < end && *p >= '0' && *p <= '9') {
421+
port = port * 10 + (*p - '0');
422+
p++;
423+
}
424+
if (port < 0 || port > 65535 || port_start == p)
425+
return ERR_INVALID_PORT;
426+
}
427+
} else if (out->endpoint.s < end) {
428+
out->endpoint.s = host_start;
429+
out->endpoint.len = end - host_start;
430+
}
431+
432+
// Fill output
433+
out->slot = slot;
434+
out->port = port;
435+
436+
return 0;
437+
}

modules/cachedb_redis/cachedb_redis_utils.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,29 @@
2626
#ifndef CACHEDB_REDIS_UTILSH
2727
#define CACHEDB_REDIS_UTILSH
2828

29+
#define REDIS_DF_PORT 6379
30+
31+
#define MOVED_PREFIX "MOVED "
32+
#define MOVED_PREFIX_LEN (sizeof(MOVED_PREFIX) - 1)
33+
34+
#define ERR_INVALID_REPLY -1
35+
#define ERR_INVALID_SLOT -2
36+
#define ERR_INVALID_PORT -3
37+
2938
#include "cachedb_redis_dbase.h"
3039

3140
int build_cluster_nodes(redis_con *con,char *info,int size);
3241
cluster_node *get_redis_connection(redis_con *con,str *key);
42+
cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info);
3343
void destroy_cluster_nodes(redis_con *con);
44+
int parse_moved_reply(redisReply *reply, redis_moved *out);
45+
46+
static inline int match_prefix(const char *buf, size_t len, const char *prefix, size_t prefix_len) {
47+
if (len < prefix_len) return 0;
48+
for (size_t i = 0; i < prefix_len; ++i) {
49+
if (buf[i] != prefix[i]) return 0;
50+
}
51+
return 1;
52+
}
3453

3554
#endif

0 commit comments

Comments
 (0)