Skip to content
66 changes: 59 additions & 7 deletions modules/cachedb_redis/cachedb_redis_dbase.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include <hiredis/hiredis.h>

#define QUERY_ATTEMPTS 2
#define REDIS_DF_PORT 6379

int redis_query_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT;
int redis_connnection_tout = CACHEDB_REDIS_DEFAULT_TIMEOUT;
Expand Down Expand Up @@ -548,18 +547,71 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke
va_end(aq);
}

if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
if (reply == NULL) {
LM_INFO("Redis query failed: reply: NULL node->context->err: %d, node->context->errstr: %s\n", node->context->err, node->context->errstr);
if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) {
i = 0;
break;
}
} else if (reply->type == REDIS_REPLY_ERROR) {
LM_INFO("Redis query failed: %p %.*s (%s)\n",
reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE",
node->context->errstr);
if (reply) {
freeReplyObject(reply);
reply = NULL;

if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN)) {
// It's a MOVED response
redis_moved *moved_info = pkg_malloc(sizeof(redis_moved));
if (!moved_info) {
LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n");
freeReplyObject(reply);
reply = NULL;
goto try_next_con;
} else {
if (parse_moved_reply(reply, moved_info) < 0) {
LM_ERR("cachedb_redis: Unable to parse MOVED reply\n");
pkg_free(moved_info);
moved_info = NULL;
freeReplyObject(reply);
goto try_next_con;
}

LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port);
node = get_redis_connection_by_endpoint(con, moved_info);

pkg_free(moved_info);
moved_info = NULL;
freeReplyObject(reply);
reply = NULL;

if (node == NULL) {
LM_ERR("Unable to locate connection by endpoint\n");
last_err = -10;
goto try_next_con;
}

if (node->context == NULL) {
if (redis_reconnect_node(con,node) < 0) {
LM_ERR("Unable to reconnect to node %p endpoint: %s:%d\n", node, node->ip, node->port);
last_err = -1;
goto try_next_con;
}
}

i = QUERY_ATTEMPTS; // New node that is the target being MOVED to, should have the attempts reset
continue;
}
}

freeReplyObject(reply);
reply = NULL;

if (node->context->err == REDIS_OK || redis_reconnect_node(con,node) < 0) {
i = 0; break;
i = 0;
break;
}
} else break;
} else {
break;
}
}

if (i==0) {
Expand Down
15 changes: 15 additions & 0 deletions modules/cachedb_redis/cachedb_redis_dbase.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ typedef struct cluster_nodes {
struct cluster_nodes *next;
} cluster_node;

// Helper typedef to store the endpoint from a redisReply.
typedef struct {
const char *s;
int len;
} const_str;

// When a MOVED is returned from Redis, it is parsed
// and its componenets are stored using the following
// typedef.
typedef struct {
int slot;
const_str endpoint;
int port;
} redis_moved;


#define CACHEDB_REDIS_DEFAULT_TIMEOUT 5000

Expand Down
116 changes: 116 additions & 0 deletions modules/cachedb_redis/cachedb_redis_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "../../dprint.h"
#include "cachedb_redis_dbase.h"
#include "cachedb_redis_utils.h"
#include "../../mem/mem.h"
#include "../../ut.h"
#include "../../cachedb/cachedb.h"
Expand Down Expand Up @@ -104,6 +105,31 @@ cluster_node *get_redis_connection(redis_con *con,str *key)
}
}

cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info)
{
cluster_node *it;

if (con->flags & REDIS_SINGLE_INSTANCE) {
LM_DBG("Single redis connection, returning %p\n",con->nodes);
return con->nodes;
} else {
for (it=con->nodes;it;it=it->next) {
if (match_prefix(redis_info->endpoint.s, redis_info->endpoint.len, it->ip, strlen(it->ip))) {
if (it->port == redis_info->port) {
// Removed slot comparison as it may be a little too aggressive of a match
// Code is still here in the event that it needs to be added back in
//if (it->start_slot <= redis_info->slot && it->end_slot >= redis_info->slot) {
LM_DBG("Redis cluster connection, matched con %p for endpoint: %.*s:%d slot: [%u] %u [%u] \n", it, redis_info->endpoint.len, redis_info->endpoint.s, redis_info->port, it->start_slot, redis_info->slot, it->end_slot);
return it;
//}
}
}
}
LM_ERR("Redis cluster connection, No match found for endpoint: %.*s:%d slot %u\n", redis_info->endpoint.len, redis_info->endpoint.s, redis_info->port, redis_info->slot);
return NULL;
}
}

void destroy_cluster_nodes(redis_con *con)
{
cluster_node *new,*foo;
Expand Down Expand Up @@ -319,3 +345,93 @@ int build_cluster_nodes(redis_con *con,char *info,int size)
destroy_cluster_nodes(con);
return -1;
}

/*
When Redis is operating as a cluster, it is possible (very likely)
that a MOVED redirection will be returned by the Redis nodes that
received the request. The general format of the reply from Redis is:
MOVED slot [IP|FQDN]:port

This routine will parse the Redis MOVED reply into its components.
Note that the redisReply struct MUST be released outside of this routine
to avoid a memory leak. The out->endpoint pointer must not be used after
the redisReply has been released.

The parsed data is stored into the following redis_moved struct:

typedef struct {
int slot;
const_str endpoint;
int port;
} redis_moved;

*/
int parse_moved_reply(redisReply *reply, redis_moved *out) {
if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out)
return ERR_INVALID_REPLY;

const char *p = reply->str;
const char *end = reply->str + reply->len;

for (int i = 0; i < MOVED_PREFIX_LEN; ++i) {
if (p[i] != MOVED_PREFIX[i]) {
return ERR_INVALID_REPLY;
}
}
p += MOVED_PREFIX_LEN;

// Parse slot number
int slot = 0;
while (p < end && *p >= '0' && *p <= '9') {
slot = slot * 10 + (*p - '0');
p++;
}
if (slot == 0 && (p == reply->str + MOVED_PREFIX_LEN || *(p - 1) < '0' || *(p - 1) > '9'))
return ERR_INVALID_SLOT;

// Skip spaces
while (p < end && *p == ' ') p++;

// Parse host and port
const char *host_start = p;
const char *colon = NULL;
while (p < end) {
if (*p == ':') {
colon = p;
break;
}
p++;
}

out->endpoint.s = NULL;
out->endpoint.len = 0;

int port = REDIS_DF_PORT; // Default to Redis standard port

if (colon) {
out->endpoint.s = host_start;
out->endpoint.len = colon - host_start;

// Parse port
const char *port_start = colon + 1;
p = port_start;
if (p < end) {
port = 0;
while (p < end && *p >= '0' && *p <= '9') {
port = port * 10 + (*p - '0');
p++;
}
if (port < 0 || port > 65535 || port_start == p)
return ERR_INVALID_PORT;
}
} else if (out->endpoint.s < end) {
out->endpoint.s = host_start;
out->endpoint.len = end - host_start;
}

// Fill output
out->slot = slot;
out->port = port;

return 0;
}
19 changes: 19 additions & 0 deletions modules/cachedb_redis/cachedb_redis_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,29 @@
#ifndef CACHEDB_REDIS_UTILSH
#define CACHEDB_REDIS_UTILSH

#define REDIS_DF_PORT 6379

#define MOVED_PREFIX "MOVED "
#define MOVED_PREFIX_LEN (sizeof(MOVED_PREFIX) - 1)

#define ERR_INVALID_REPLY -1
#define ERR_INVALID_SLOT -2
#define ERR_INVALID_PORT -3

#include "cachedb_redis_dbase.h"

int build_cluster_nodes(redis_con *con,char *info,int size);
cluster_node *get_redis_connection(redis_con *con,str *key);
cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info);
void destroy_cluster_nodes(redis_con *con);
int parse_moved_reply(redisReply *reply, redis_moved *out);

static inline int match_prefix(const char *buf, size_t len, const char *prefix, size_t prefix_len) {
if (len < prefix_len) return 0;
for (size_t i = 0; i < prefix_len; ++i) {
if (buf[i] != prefix[i]) return 0;
}
return 1;
}

#endif