From 9eab1627318a98128eb5b42d474cc1340bd95639 Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Thu, 1 May 2025 16:55:21 -0400 Subject: [PATCH 01/10] Updates to support Redis MOVED redirection --- modules/cachedb_redis/cachedb_redis_dbase.c | 65 +++++++++-- modules/cachedb_redis/cachedb_redis_dbase.h | 11 ++ modules/cachedb_redis/cachedb_redis_utils.c | 118 ++++++++++++++++++++ modules/cachedb_redis/cachedb_redis_utils.h | 21 ++++ 4 files changed, 208 insertions(+), 7 deletions(-) diff --git a/modules/cachedb_redis/cachedb_redis_dbase.c b/modules/cachedb_redis/cachedb_redis_dbase.c index 301ce5bb680..c522a59a886 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.c +++ b/modules/cachedb_redis/cachedb_redis_dbase.c @@ -37,7 +37,6 @@ #include #define QUERY_ATTEMPTS 2 -#define REDIS_DF_PORT 6379 int redis_query_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT; int redis_connnection_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT; @@ -548,18 +547,70 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke va_end(aq); } - if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { + if (reply == NULL) { + LM_INFO("Redis query failed: reply: NULL node->context->err: %d, node->context->errstr: %s\n", node->context->err, node->context->errstr); + if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) { + i = 0; + break; + } + } else if (reply->type == REDIS_REPLY_ERROR) { LM_INFO("Redis query failed: %p %.*s (%s)\n", reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE", node->context->errstr); - if (reply) { - freeReplyObject(reply); - reply = NULL; + + if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN)) { + // It's a MOVED response + redis_moved *moved_info = pkg_malloc(sizeof(redis_moved)); + if (!moved_info) { + LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n"); + } else { + if (parse_moved_reply(reply, moved_info) < 0) { + LM_ERR("cachedb_redis: Unable to parse MOVED reply\n"); + pkg_free(moved_info); + moved_info = NULL; + freeReplyObject(reply); + goto try_next_con; + } + + LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port); + node = get_redis_connection_by_endpoint(con, moved_info); + + pkg_free(moved_info); + moved_info = NULL; + freeReplyObject(reply); + reply = NULL; + + if (node == NULL) { + LM_ERR("Unable to locate connection by endpoint\n"); + last_err = -10; + goto try_next_con; + } + + if (node->context == NULL) { + if (redis_reconnect_node(con,node) < 0) { + LM_ERR("Unable to reconnect to node %p endpoint: %s:%d\n", node, node->ip, node->port); + last_err = -1; + goto try_next_con; + } + } + + i = QUERY_ATTEMPTS; // New node that is being MOVED to should have the attempts reset + continue; + } } + + freeReplyObject(reply); + reply = NULL; + if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) { - i = 0; break; + i = 0; + break; } - } else break; + } else { + freeReplyObject(reply); + reply = NULL; + break; + } } if (i==0) { diff --git a/modules/cachedb_redis/cachedb_redis_dbase.h b/modules/cachedb_redis/cachedb_redis_dbase.h index cecdeb6bcb6..063e2fc9a18 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.h +++ b/modules/cachedb_redis/cachedb_redis_dbase.h @@ -48,6 +48,17 @@ typedef struct cluster_nodes { struct cluster_nodes *next; } cluster_node; +typedef struct { + const char *s; + int len; +} const_str; + +typedef struct { + int slot; + const_str endpoint; + int port; +} redis_moved; + #define CACHEDB_REDIS_DEFAULT_TIMEOUT 5000 diff --git a/modules/cachedb_redis/cachedb_redis_utils.c b/modules/cachedb_redis/cachedb_redis_utils.c index 0735dc98820..748c20e20db 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.c +++ b/modules/cachedb_redis/cachedb_redis_utils.c @@ -104,6 +104,29 @@ cluster_node *get_redis_connection(redis_con *con,str *key) } } +cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info) +{ + cluster_node *it; + + if (con->flags & REDIS_SINGLE_INSTANCE) { + LM_DBG("Single redis connection, returning %p\n",con->nodes); + return con->nodes; + } else { + for (it=con->nodes;it;it=it->next) { + if (match_prefix(redis_info->endpoint.s, redis_info->endpoint.len, it->ip, strlen(it->ip))) { + if (it->port == redis_info->port) { + if (it->start_slot <= redis_info->slot && it->end_slot >= redis_info->slot) { + LM_DBG("Redis cluster connection, matched con %p for endpoint: %.*s:%d slot: [%u] %u [%u] \n", it, redis_info->endpoint.len, redis_info->endpoint.s, redis_info->port, it->start_slot, redis_info->slot, it->end_slot); + return it; + } + } + } + } + LM_ERR("Redis cluster connection, No match found for endpoint: %.*s:%d slot %u\n", redis_info->endpoint.len, redis_info->endpoint.s, redis_info->port, redis_info->slot); + return NULL; + } +} + void destroy_cluster_nodes(redis_con *con) { cluster_node *new,*foo; @@ -319,3 +342,98 @@ int build_cluster_nodes(redis_con *con,char *info,int size) destroy_cluster_nodes(con); return -1; } + +/* + When Redis is operating as a cluster, it is possible (very likely) + that a MOVED redirection will be returned by the Redis nodes that + received the request. The general format of the reply from Redis is: + MOVED slot [IP|FQDN]:port + + This routine will parse the Redis MOVED reply into its components. + Note that the redisReply struct MUST be released outside of this routine + to avoid a memory leak. The out->endpoint pointer must not be used after + the redisReply has been released. + + The parsed data is stored into the following redis_moved struct: + + typedef struct { + int slot; + const_str endpoint; + int port; + } redis_moved; + +*/ +int parse_moved_reply(redisReply *reply, redis_moved *out) { + if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out) + return ERR_INVALID_REPLY; + + // Check if the length of the reply buffer is too short + if (reply->len < MOVED_PREFIX_LEN) { + return ERR_INVALID_REPLY; + } + + const char *p = reply->str; + const char *end = reply->str + reply->len; + + for (int i = 0; i < MOVED_PREFIX_LEN; ++i) { + if (p[i] != MOVED_PREFIX[i]) { + return ERR_INVALID_REPLY; + } + } + p += MOVED_PREFIX_LEN; + + // Parse slot number + int slot = 0; + while (p < end && *p >= '0' && *p <= '9') { + slot = slot * 10 + (*p - '0'); + p++; + } + if (slot == 0 && (p == reply->str + MOVED_PREFIX_LEN || *(p - 1) < '0' || *(p - 1) > '9')) + return ERR_INVALID_SLOT; + + // Skip spaces + while (p < end && *p == ' ') p++; + + // Parse host and port + const char *host_start = p; + const char *colon = NULL; + while (p < end) { + if (*p == ':') { + colon = p; + break; + } + p++; + } + + out->endpoint.s = NULL; + out->endpoint.len = 0; + + int port = REDIS_DF_PORT; // Default to Redis standard port + + if (colon) { + out->endpoint.s = host_start; + out->endpoint.len = colon - host_start; + + // Parse port + const char *port_start = colon + 1; + p = port_start; + if (p < end) { + port = 0; + while (p < end && *p >= '0' && *p <= '9') { + port = port * 10 + (*p - '0'); + p++; + } + if (port < 0 || port > 65535 || port_start == p) + return ERR_INVALID_PORT; + } + } else if (out->endpoint.s < end) { + out->endpoint.s = host_start; + out->endpoint.len = end - host_start; + } + + // Fill output + out->slot = slot; + out->port = port; + + return 0; +} \ No newline at end of file diff --git a/modules/cachedb_redis/cachedb_redis_utils.h b/modules/cachedb_redis/cachedb_redis_utils.h index df7435e9b94..e601adcdcbe 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.h +++ b/modules/cachedb_redis/cachedb_redis_utils.h @@ -26,10 +26,31 @@ #ifndef CACHEDB_REDIS_UTILSH #define CACHEDB_REDIS_UTILSH +#define REDIS_DF_PORT 6379 + +#define MOVED_PREFIX "MOVED " +#define MOVED_PREFIX_LEN (sizeof(MOVED_PREFIX) - 1) + +#define ASK_PREFIX "ASK " +#define ASK_PREFIX_LEN (sizeof(ASK_PREFIX) - 1) + +#define ERR_INVALID_REPLY -1 +#define ERR_INVALID_SLOT -2 +#define ERR_INVALID_PORT -3 + #include "cachedb_redis_dbase.h" int build_cluster_nodes(redis_con *con,char *info,int size); cluster_node *get_redis_connection(redis_con *con,str *key); +cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info); void destroy_cluster_nodes(redis_con *con); +static inline int match_prefix(const char *buf, size_t len, const char *prefix, size_t prefix_len) { + if (len < prefix_len) return 0; + for (size_t i = 0; i < prefix_len; ++i) { + if (buf[i] != prefix[i]) return 0; + } + return 1; +} + #endif From 637fe435f7da56be8dd0e213267444d253672fee Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Fri, 2 May 2025 07:25:25 -0400 Subject: [PATCH 02/10] Add missing function definition to the include --- modules/cachedb_redis/cachedb_redis_utils.h | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/cachedb_redis/cachedb_redis_utils.h b/modules/cachedb_redis/cachedb_redis_utils.h index e601adcdcbe..86190e95b33 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.h +++ b/modules/cachedb_redis/cachedb_redis_utils.h @@ -44,6 +44,7 @@ int build_cluster_nodes(redis_con *con,char *info,int size); cluster_node *get_redis_connection(redis_con *con,str *key); cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info); void destroy_cluster_nodes(redis_con *con); +int parse_moved_reply(redisReply *reply, redis_moved *out); static inline int match_prefix(const char *buf, size_t len, const char *prefix, size_t prefix_len) { if (len < prefix_len) return 0; From c6f6a1764fb4f0a6fbf5dcfcb820b3185e163e95 Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Fri, 2 May 2025 07:29:58 -0400 Subject: [PATCH 03/10] Update some comments --- modules/cachedb_redis/cachedb_redis_dbase.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/modules/cachedb_redis/cachedb_redis_dbase.h b/modules/cachedb_redis/cachedb_redis_dbase.h index 063e2fc9a18..308b547930d 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.h +++ b/modules/cachedb_redis/cachedb_redis_dbase.h @@ -48,11 +48,15 @@ typedef struct cluster_nodes { struct cluster_nodes *next; } cluster_node; +// Helper typedef to store the endpoint from a redisReply. typedef struct { const char *s; int len; } const_str; +// When a MOVED is returned from Redis, it is parsed +// and its componenets are stored using the following +// typedef. typedef struct { int slot; const_str endpoint; From da8e646231d48df02ddd0f0f535557ca852c34b1 Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Fri, 2 May 2025 07:34:43 -0400 Subject: [PATCH 04/10] Formatting updates --- modules/cachedb_redis/cachedb_redis_dbase.c | 26 ++++++++++----------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/modules/cachedb_redis/cachedb_redis_dbase.c b/modules/cachedb_redis/cachedb_redis_dbase.c index c522a59a886..e089ec7e2c6 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.c +++ b/modules/cachedb_redis/cachedb_redis_dbase.c @@ -559,28 +559,28 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke node->context->errstr); if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN)) { - // It's a MOVED response + // It's a MOVED response redis_moved *moved_info = pkg_malloc(sizeof(redis_moved)); - if (!moved_info) { - LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n"); + if (!moved_info) { + LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n"); } else { - if (parse_moved_reply(reply, moved_info) < 0) { + if (parse_moved_reply(reply, moved_info) < 0) { LM_ERR("cachedb_redis: Unable to parse MOVED reply\n"); pkg_free(moved_info); moved_info = NULL; - freeReplyObject(reply); + freeReplyObject(reply); goto try_next_con; } - LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port); + LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port); node = get_redis_connection_by_endpoint(con, moved_info); pkg_free(moved_info); moved_info = NULL; freeReplyObject(reply); - reply = NULL; + reply = NULL; - if (node == NULL) { + if (node == NULL) { LM_ERR("Unable to locate connection by endpoint\n"); last_err = -10; goto try_next_con; @@ -594,21 +594,21 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke } } - i = QUERY_ATTEMPTS; // New node that is being MOVED to should have the attempts reset + i = QUERY_ATTEMPTS; // New node that is the target being MOVED to, should have the attempts reset continue; } } - freeReplyObject(reply); - reply = NULL; + freeReplyObject(reply); + reply = NULL; if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) { i = 0; break; } } else { - freeReplyObject(reply); - reply = NULL; + freeReplyObject(reply); + reply = NULL; break; } } From f96d578ff36ca59d85075b074041f3eca53f3616 Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Fri, 2 May 2025 08:04:31 -0400 Subject: [PATCH 05/10] Update modules/cachedb_redis/cachedb_redis_utils.c Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- modules/cachedb_redis/cachedb_redis_utils.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/modules/cachedb_redis/cachedb_redis_utils.c b/modules/cachedb_redis/cachedb_redis_utils.c index 748c20e20db..4dab999acd3 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.c +++ b/modules/cachedb_redis/cachedb_redis_utils.c @@ -367,10 +367,6 @@ int parse_moved_reply(redisReply *reply, redis_moved *out) { if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out) return ERR_INVALID_REPLY; - // Check if the length of the reply buffer is too short - if (reply->len < MOVED_PREFIX_LEN) { - return ERR_INVALID_REPLY; - } const char *p = reply->str; const char *end = reply->str + reply->len; From ed937ce08e98027a10f3128acc2e1dd14ba28f81 Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Fri, 2 May 2025 08:05:31 -0400 Subject: [PATCH 06/10] Update modules/cachedb_redis/cachedb_redis_dbase.c Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- modules/cachedb_redis/cachedb_redis_dbase.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/cachedb_redis/cachedb_redis_dbase.c b/modules/cachedb_redis/cachedb_redis_dbase.c index e089ec7e2c6..bbb25fdff4a 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.c +++ b/modules/cachedb_redis/cachedb_redis_dbase.c @@ -563,6 +563,9 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke redis_moved *moved_info = pkg_malloc(sizeof(redis_moved)); if (!moved_info) { LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n"); + freeReplyObject(reply); + reply = NULL; + goto try_next_con; } else { if (parse_moved_reply(reply, moved_info) < 0) { LM_ERR("cachedb_redis: Unable to parse MOVED reply\n"); From 3a5ebe37fda50d7981861aeba169cbad7c9e882d Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Fri, 2 May 2025 08:29:00 -0400 Subject: [PATCH 07/10] Add missing include --- modules/cachedb_redis/cachedb_redis_utils.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/cachedb_redis/cachedb_redis_utils.c b/modules/cachedb_redis/cachedb_redis_utils.c index 4dab999acd3..b03484b3910 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.c +++ b/modules/cachedb_redis/cachedb_redis_utils.c @@ -25,6 +25,7 @@ #include "../../dprint.h" #include "cachedb_redis_dbase.h" +#include "cachedb_redis_utils.h" #include "../../mem/mem.h" #include "../../ut.h" #include "../../cachedb/cachedb.h" @@ -432,4 +433,4 @@ int parse_moved_reply(redisReply *reply, redis_moved *out) { out->port = port; return 0; -} \ No newline at end of file +} From e57c5f2b8fb5198eae5d9ec64ea830e10e29c05f Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Fri, 2 May 2025 10:01:30 -0400 Subject: [PATCH 08/10] Fix: A little too aggressive in freeing the redisReply object --- modules/cachedb_redis/cachedb_redis_dbase.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/cachedb_redis/cachedb_redis_dbase.c b/modules/cachedb_redis/cachedb_redis_dbase.c index bbb25fdff4a..7a3b06e5710 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.c +++ b/modules/cachedb_redis/cachedb_redis_dbase.c @@ -610,8 +610,6 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke break; } } else { - freeReplyObject(reply); - reply = NULL; break; } } From 039cb78dafd54a90533120e5039e5facdf0e96e3 Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Fri, 2 May 2025 14:42:23 -0400 Subject: [PATCH 09/10] Remove ASK defines until there is logic to implement it. --- modules/cachedb_redis/cachedb_redis_utils.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/modules/cachedb_redis/cachedb_redis_utils.h b/modules/cachedb_redis/cachedb_redis_utils.h index 86190e95b33..3c9179a00f7 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.h +++ b/modules/cachedb_redis/cachedb_redis_utils.h @@ -31,9 +31,6 @@ #define MOVED_PREFIX "MOVED " #define MOVED_PREFIX_LEN (sizeof(MOVED_PREFIX) - 1) -#define ASK_PREFIX "ASK " -#define ASK_PREFIX_LEN (sizeof(ASK_PREFIX) - 1) - #define ERR_INVALID_REPLY -1 #define ERR_INVALID_SLOT -2 #define ERR_INVALID_PORT -3 From 0b47f9bf4c371a18759ba017a6d1d344da58741f Mon Sep 17 00:00:00 2001 From: Norm Brandinger Date: Thu, 15 May 2025 14:44:03 -0400 Subject: [PATCH 10/10] Remove slot comparison when a MOVED is returned / fix space vs. tab formatting to be consistent --- modules/cachedb_redis/cachedb_redis_dbase.c | 24 ++-- modules/cachedb_redis/cachedb_redis_dbase.h | 10 +- modules/cachedb_redis/cachedb_redis_utils.c | 149 ++++++++++---------- modules/cachedb_redis/cachedb_redis_utils.h | 10 +- 4 files changed, 97 insertions(+), 96 deletions(-) diff --git a/modules/cachedb_redis/cachedb_redis_dbase.c b/modules/cachedb_redis/cachedb_redis_dbase.c index 7a3b06e5710..38957e4a9cc 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.c +++ b/modules/cachedb_redis/cachedb_redis_dbase.c @@ -561,29 +561,29 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN)) { // It's a MOVED response redis_moved *moved_info = pkg_malloc(sizeof(redis_moved)); - if (!moved_info) { - LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n"); - freeReplyObject(reply); - reply = NULL; - goto try_next_con; + if (!moved_info) { + LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n"); + freeReplyObject(reply); + reply = NULL; + goto try_next_con; } else { - if (parse_moved_reply(reply, moved_info) < 0) { + if (parse_moved_reply(reply, moved_info) < 0) { LM_ERR("cachedb_redis: Unable to parse MOVED reply\n"); pkg_free(moved_info); moved_info = NULL; - freeReplyObject(reply); + freeReplyObject(reply); goto try_next_con; } - LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port); + LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port); node = get_redis_connection_by_endpoint(con, moved_info); pkg_free(moved_info); moved_info = NULL; freeReplyObject(reply); - reply = NULL; + reply = NULL; - if (node == NULL) { + if (node == NULL) { LM_ERR("Unable to locate connection by endpoint\n"); last_err = -10; goto try_next_con; @@ -602,8 +602,8 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke } } - freeReplyObject(reply); - reply = NULL; + freeReplyObject(reply); + reply = NULL; if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) { i = 0; diff --git a/modules/cachedb_redis/cachedb_redis_dbase.h b/modules/cachedb_redis/cachedb_redis_dbase.h index 308b547930d..fc9d2f1e1c3 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.h +++ b/modules/cachedb_redis/cachedb_redis_dbase.h @@ -50,17 +50,17 @@ typedef struct cluster_nodes { // Helper typedef to store the endpoint from a redisReply. typedef struct { - const char *s; - int len; + const char *s; + int len; } const_str; // When a MOVED is returned from Redis, it is parsed // and its componenets are stored using the following // typedef. typedef struct { - int slot; - const_str endpoint; - int port; + int slot; + const_str endpoint; + int port; } redis_moved; diff --git a/modules/cachedb_redis/cachedb_redis_utils.c b/modules/cachedb_redis/cachedb_redis_utils.c index b03484b3910..5c75cba1b85 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.c +++ b/modules/cachedb_redis/cachedb_redis_utils.c @@ -114,12 +114,14 @@ cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redi return con->nodes; } else { for (it=con->nodes;it;it=it->next) { - if (match_prefix(redis_info->endpoint.s, redis_info->endpoint.len, it->ip, strlen(it->ip))) { + if (match_prefix(redis_info->endpoint.s, redis_info->endpoint.len, it->ip, strlen(it->ip))) { if (it->port == redis_info->port) { - if (it->start_slot <= redis_info->slot && it->end_slot >= redis_info->slot) { + // Removed slot comparison as it may be a little too aggressive of a match + // Code is still here in the event that it needs to be added back in + //if (it->start_slot <= redis_info->slot && it->end_slot >= redis_info->slot) { LM_DBG("Redis cluster connection, matched con %p for endpoint: %.*s:%d slot: [%u] %u [%u] \n", it, redis_info->endpoint.len, redis_info->endpoint.s, redis_info->port, it->start_slot, redis_info->slot, it->end_slot); return it; - } + //} } } } @@ -358,79 +360,78 @@ int build_cluster_nodes(redis_con *con,char *info,int size) The parsed data is stored into the following redis_moved struct: typedef struct { - int slot; - const_str endpoint; - int port; + int slot; + const_str endpoint; + int port; } redis_moved; */ int parse_moved_reply(redisReply *reply, redis_moved *out) { - if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out) - return ERR_INVALID_REPLY; - - - const char *p = reply->str; - const char *end = reply->str + reply->len; - - for (int i = 0; i < MOVED_PREFIX_LEN; ++i) { - if (p[i] != MOVED_PREFIX[i]) { - return ERR_INVALID_REPLY; - } - } - p += MOVED_PREFIX_LEN; - - // Parse slot number - int slot = 0; - while (p < end && *p >= '0' && *p <= '9') { - slot = slot * 10 + (*p - '0'); - p++; - } - if (slot == 0 && (p == reply->str + MOVED_PREFIX_LEN || *(p - 1) < '0' || *(p - 1) > '9')) - return ERR_INVALID_SLOT; - - // Skip spaces - while (p < end && *p == ' ') p++; - - // Parse host and port - const char *host_start = p; - const char *colon = NULL; - while (p < end) { - if (*p == ':') { - colon = p; - break; - } - p++; - } - - out->endpoint.s = NULL; - out->endpoint.len = 0; - - int port = REDIS_DF_PORT; // Default to Redis standard port - - if (colon) { - out->endpoint.s = host_start; - out->endpoint.len = colon - host_start; - - // Parse port - const char *port_start = colon + 1; - p = port_start; - if (p < end) { - port = 0; - while (p < end && *p >= '0' && *p <= '9') { - port = port * 10 + (*p - '0'); - p++; - } - if (port < 0 || port > 65535 || port_start == p) - return ERR_INVALID_PORT; - } - } else if (out->endpoint.s < end) { - out->endpoint.s = host_start; - out->endpoint.len = end - host_start; - } - - // Fill output - out->slot = slot; - out->port = port; - - return 0; + if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out) + return ERR_INVALID_REPLY; + + const char *p = reply->str; + const char *end = reply->str + reply->len; + + for (int i = 0; i < MOVED_PREFIX_LEN; ++i) { + if (p[i] != MOVED_PREFIX[i]) { + return ERR_INVALID_REPLY; + } + } + p += MOVED_PREFIX_LEN; + + // Parse slot number + int slot = 0; + while (p < end && *p >= '0' && *p <= '9') { + slot = slot * 10 + (*p - '0'); + p++; + } + if (slot == 0 && (p == reply->str + MOVED_PREFIX_LEN || *(p - 1) < '0' || *(p - 1) > '9')) + return ERR_INVALID_SLOT; + + // Skip spaces + while (p < end && *p == ' ') p++; + + // Parse host and port + const char *host_start = p; + const char *colon = NULL; + while (p < end) { + if (*p == ':') { + colon = p; + break; + } + p++; + } + + out->endpoint.s = NULL; + out->endpoint.len = 0; + + int port = REDIS_DF_PORT; // Default to Redis standard port + + if (colon) { + out->endpoint.s = host_start; + out->endpoint.len = colon - host_start; + + // Parse port + const char *port_start = colon + 1; + p = port_start; + if (p < end) { + port = 0; + while (p < end && *p >= '0' && *p <= '9') { + port = port * 10 + (*p - '0'); + p++; + } + if (port < 0 || port > 65535 || port_start == p) + return ERR_INVALID_PORT; + } + } else if (out->endpoint.s < end) { + out->endpoint.s = host_start; + out->endpoint.len = end - host_start; + } + + // Fill output + out->slot = slot; + out->port = port; + + return 0; } diff --git a/modules/cachedb_redis/cachedb_redis_utils.h b/modules/cachedb_redis/cachedb_redis_utils.h index 3c9179a00f7..daf5bec3ef6 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.h +++ b/modules/cachedb_redis/cachedb_redis_utils.h @@ -44,11 +44,11 @@ void destroy_cluster_nodes(redis_con *con); int parse_moved_reply(redisReply *reply, redis_moved *out); static inline int match_prefix(const char *buf, size_t len, const char *prefix, size_t prefix_len) { - if (len < prefix_len) return 0; - for (size_t i = 0; i < prefix_len; ++i) { - if (buf[i] != prefix[i]) return 0; - } - return 1; + if (len < prefix_len) return 0; + for (size_t i = 0; i < prefix_len; ++i) { + if (buf[i] != prefix[i]) return 0; + } + return 1; } #endif