diff --git a/modules/cachedb_redis/cachedb_redis_dbase.c b/modules/cachedb_redis/cachedb_redis_dbase.c index 301ce5bb680..38957e4a9cc 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.c +++ b/modules/cachedb_redis/cachedb_redis_dbase.c @@ -37,7 +37,6 @@ #include #define QUERY_ATTEMPTS 2 -#define REDIS_DF_PORT 6379 int redis_query_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT; int redis_connnection_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT; @@ -548,18 +547,71 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke va_end(aq); } - if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { + if (reply == NULL) { + LM_INFO("Redis query failed: reply: NULL node->context->err: %d, node->context->errstr: %s\n", node->context->err, node->context->errstr); + if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) { + i = 0; + break; + } + } else if (reply->type == REDIS_REPLY_ERROR) { LM_INFO("Redis query failed: %p %.*s (%s)\n", reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE", node->context->errstr); - if (reply) { - freeReplyObject(reply); - reply = NULL; + + if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN)) { + // It's a MOVED response + redis_moved *moved_info = pkg_malloc(sizeof(redis_moved)); + if (!moved_info) { + LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n"); + freeReplyObject(reply); + reply = NULL; + goto try_next_con; + } else { + if (parse_moved_reply(reply, moved_info) < 0) { + LM_ERR("cachedb_redis: Unable to parse MOVED reply\n"); + pkg_free(moved_info); + moved_info = NULL; + freeReplyObject(reply); + goto try_next_con; + } + + LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port); + node = get_redis_connection_by_endpoint(con, moved_info); + + pkg_free(moved_info); + moved_info = NULL; + freeReplyObject(reply); + reply = NULL; + + if (node == NULL) { + LM_ERR("Unable to locate connection by endpoint\n"); + last_err = -10; + goto try_next_con; + } + + if (node->context == NULL) { + if (redis_reconnect_node(con,node) < 0) { + LM_ERR("Unable to reconnect to node %p endpoint: %s:%d\n", node, node->ip, node->port); + last_err = -1; + goto try_next_con; + } + } + + i = QUERY_ATTEMPTS; // New node that is the target being MOVED to, should have the attempts reset + continue; + } } + + freeReplyObject(reply); + reply = NULL; + if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) { - i = 0; break; + i = 0; + break; } - } else break; + } else { + break; + } } if (i==0) { diff --git a/modules/cachedb_redis/cachedb_redis_dbase.h b/modules/cachedb_redis/cachedb_redis_dbase.h index cecdeb6bcb6..fc9d2f1e1c3 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.h +++ b/modules/cachedb_redis/cachedb_redis_dbase.h @@ -48,6 +48,21 @@ typedef struct cluster_nodes { struct cluster_nodes *next; } cluster_node; +// Helper typedef to store the endpoint from a redisReply. +typedef struct { + const char *s; + int len; +} const_str; + +// When a MOVED is returned from Redis, it is parsed +// and its componenets are stored using the following +// typedef. +typedef struct { + int slot; + const_str endpoint; + int port; +} redis_moved; + #define CACHEDB_REDIS_DEFAULT_TIMEOUT 5000 diff --git a/modules/cachedb_redis/cachedb_redis_utils.c b/modules/cachedb_redis/cachedb_redis_utils.c index 0735dc98820..5c75cba1b85 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.c +++ b/modules/cachedb_redis/cachedb_redis_utils.c @@ -25,6 +25,7 @@ #include "../../dprint.h" #include "cachedb_redis_dbase.h" +#include "cachedb_redis_utils.h" #include "../../mem/mem.h" #include "../../ut.h" #include "../../cachedb/cachedb.h" @@ -104,6 +105,31 @@ cluster_node *get_redis_connection(redis_con *con,str *key) } } +cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info) +{ + cluster_node *it; + + if (con->flags & REDIS_SINGLE_INSTANCE) { + LM_DBG("Single redis connection, returning %p\n",con->nodes); + return con->nodes; + } else { + for (it=con->nodes;it;it=it->next) { + if (match_prefix(redis_info->endpoint.s, redis_info->endpoint.len, it->ip, strlen(it->ip))) { + if (it->port == redis_info->port) { + // Removed slot comparison as it may be a little too aggressive of a match + // Code is still here in the event that it needs to be added back in + //if (it->start_slot <= redis_info->slot && it->end_slot >= redis_info->slot) { + LM_DBG("Redis cluster connection, matched con %p for endpoint: %.*s:%d slot: [%u] %u [%u] \n", it, redis_info->endpoint.len, redis_info->endpoint.s, redis_info->port, it->start_slot, redis_info->slot, it->end_slot); + return it; + //} + } + } + } + LM_ERR("Redis cluster connection, No match found for endpoint: %.*s:%d slot %u\n", redis_info->endpoint.len, redis_info->endpoint.s, redis_info->port, redis_info->slot); + return NULL; + } +} + void destroy_cluster_nodes(redis_con *con) { cluster_node *new,*foo; @@ -319,3 +345,93 @@ int build_cluster_nodes(redis_con *con,char *info,int size) destroy_cluster_nodes(con); return -1; } + +/* + When Redis is operating as a cluster, it is possible (very likely) + that a MOVED redirection will be returned by the Redis nodes that + received the request. The general format of the reply from Redis is: + MOVED slot [IP|FQDN]:port + + This routine will parse the Redis MOVED reply into its components. + Note that the redisReply struct MUST be released outside of this routine + to avoid a memory leak. The out->endpoint pointer must not be used after + the redisReply has been released. + + The parsed data is stored into the following redis_moved struct: + + typedef struct { + int slot; + const_str endpoint; + int port; + } redis_moved; + +*/ +int parse_moved_reply(redisReply *reply, redis_moved *out) { + if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out) + return ERR_INVALID_REPLY; + + const char *p = reply->str; + const char *end = reply->str + reply->len; + + for (int i = 0; i < MOVED_PREFIX_LEN; ++i) { + if (p[i] != MOVED_PREFIX[i]) { + return ERR_INVALID_REPLY; + } + } + p += MOVED_PREFIX_LEN; + + // Parse slot number + int slot = 0; + while (p < end && *p >= '0' && *p <= '9') { + slot = slot * 10 + (*p - '0'); + p++; + } + if (slot == 0 && (p == reply->str + MOVED_PREFIX_LEN || *(p - 1) < '0' || *(p - 1) > '9')) + return ERR_INVALID_SLOT; + + // Skip spaces + while (p < end && *p == ' ') p++; + + // Parse host and port + const char *host_start = p; + const char *colon = NULL; + while (p < end) { + if (*p == ':') { + colon = p; + break; + } + p++; + } + + out->endpoint.s = NULL; + out->endpoint.len = 0; + + int port = REDIS_DF_PORT; // Default to Redis standard port + + if (colon) { + out->endpoint.s = host_start; + out->endpoint.len = colon - host_start; + + // Parse port + const char *port_start = colon + 1; + p = port_start; + if (p < end) { + port = 0; + while (p < end && *p >= '0' && *p <= '9') { + port = port * 10 + (*p - '0'); + p++; + } + if (port < 0 || port > 65535 || port_start == p) + return ERR_INVALID_PORT; + } + } else if (out->endpoint.s < end) { + out->endpoint.s = host_start; + out->endpoint.len = end - host_start; + } + + // Fill output + out->slot = slot; + out->port = port; + + return 0; +} diff --git a/modules/cachedb_redis/cachedb_redis_utils.h b/modules/cachedb_redis/cachedb_redis_utils.h index df7435e9b94..daf5bec3ef6 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.h +++ b/modules/cachedb_redis/cachedb_redis_utils.h @@ -26,10 +26,29 @@ #ifndef CACHEDB_REDIS_UTILSH #define CACHEDB_REDIS_UTILSH +#define REDIS_DF_PORT 6379 + +#define MOVED_PREFIX "MOVED " +#define MOVED_PREFIX_LEN (sizeof(MOVED_PREFIX) - 1) + +#define ERR_INVALID_REPLY -1 +#define ERR_INVALID_SLOT -2 +#define ERR_INVALID_PORT -3 + #include "cachedb_redis_dbase.h" int build_cluster_nodes(redis_con *con,char *info,int size); cluster_node *get_redis_connection(redis_con *con,str *key); +cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info); void destroy_cluster_nodes(redis_con *con); +int parse_moved_reply(redisReply *reply, redis_moved *out); + +static inline int match_prefix(const char *buf, size_t len, const char *prefix, size_t prefix_len) { + if (len < prefix_len) return 0; + for (size_t i = 0; i < prefix_len; ++i) { + if (buf[i] != prefix[i]) return 0; + } + return 1; +} #endif