Skip to content

Commit

Permalink
Handle empty addresses in CLUSTER NODES responses
Browse files Browse the repository at this point in the history
Treat an empty ip string as it means the same endpoint that
the current command was sent to.

Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Jan 15, 2025
1 parent f0e8cbb commit 57aac29
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 26 deletions.
60 changes: 44 additions & 16 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ static int store_replica_nodes(dict *nodes, dict *replicas) {
* Only parse primary nodes if the `parsed_primary_id` argument is NULL,
* otherwise replicas are also parsed and its primary_id is returned by pointer
* via 'parsed_primary_id'. */
static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
static int parse_cluster_nodes_line(valkeyClusterContext *cc, valkeyContext *c, char *line,
valkeyClusterNode **parsed_node, char **parsed_primary_id) {
char *p, *id = NULL, *addr = NULL, *flags = NULL, *primary_id = NULL,
*link_state = NULL, *slots = NULL;
Expand Down Expand Up @@ -825,25 +825,53 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
if ((p = strchr(addr, PORT_CPORT_SEPARATOR)) != NULL) {
*p = '\0';
}
node->addr = sdsnew(addr);
if (node->addr == NULL)
goto oom;

/* Get the host part */
/* Find the required port separator. */
if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address");
freeValkeyClusterNode(node);
return VALKEY_ERR;
}
*p = '\0';

node->host = sdsnew(addr);
if (node->host == NULL)
goto oom;
/* Get the port (skip the found port separator). */
int port = vk_atoi(p + 1, strlen(p + 1));
if (port < 1 || port > UINT16_MAX) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port");
freeValkeyClusterNode(node);
return VALKEY_ERR;
}
node->port = port;

/* Get the port. */
p++; // Skip separator character.
node->port = vk_atoi(p, strlen(p));
/* Check that we received an ip/host address, i.e. the field
* does not start with the port separator. */
if (p != addr) {
node->addr = sdsnew(addr);
if (node->addr == NULL)
goto oom;

*p = '\0'; /* Cut port separator. */

node->host = sdsnew(addr);
if (node->host == NULL)
goto oom;

} else {
/* We received an ip/host that is an empty string. According to the docs
* we can treat this as it means the same address we sent this command to. */
node->host = sdsnew(c->tcp.host);
if (node->host == NULL) {
goto oom;
}
/* Create a new addr field using correct host:port */
node->addr = sdsnew(node->host);
if (node->addr == NULL) {
goto oom;
}
node->addr = sdscatfmt(node->addr, ":%i", node->port);
if (node->addr == NULL) {
goto oom;
}
}

/* No slot parsing needed for replicas, but return primary id. */
if (node->role == VALKEY_ROLE_REPLICA) {
Expand Down Expand Up @@ -905,7 +933,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
/**
* Parse the "cluster nodes" command reply to nodes dict.
*/
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyContext *c, valkeyReply *reply) {
dict *nodes = NULL;
int slot_ranges_found = 0;
int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE;
Expand All @@ -930,7 +958,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {

char *primary_id;
valkeyClusterNode *node;
if (parse_cluster_nodes_line(cc, line, &node, add_replicas ? &primary_id : NULL) != VALKEY_OK)
if (parse_cluster_nodes_line(cc, c, line, &node, add_replicas ? &primary_id : NULL) != VALKEY_OK)
goto error;
if (node == NULL)
continue; /* Line skipped. */
Expand Down Expand Up @@ -1031,7 +1059,7 @@ static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc,
if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) {
nodes = parse_cluster_slots(cc, reply);
} else {
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
}
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
Expand Down Expand Up @@ -2890,7 +2918,7 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r,
}

valkeyClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, &ac->c, reply);
if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) {
/* Ignore failures for now */
}
Expand Down
79 changes: 69 additions & 10 deletions tests/ut_slotmap_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ const char *__asan_default_options(void) {
}
#endif

/* Includes source file to test static functions. */
/* Includes source files to test static functions. */
#include "cluster.c"
#include "valkey.c"

#include <stdbool.h>

Expand All @@ -37,6 +38,7 @@ valkeyReply *create_cluster_nodes_reply(const char *bulkstr) {
/* Parse a cluster nodes reply from a basic deployment. */
void test_parse_cluster_nodes(bool parse_replicas) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -51,7 +53,7 @@ void test_parse_cluster_nodes(bool parse_replicas) {
"6ec23923021cf3ffec47632106199cb7f496ce01 127.0.0.1:30005@31005,hostname5 slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 0 1426238316232 5 connected\n"
"824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,hostname6 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 connected\n"
"e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-5460\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand Down Expand Up @@ -116,11 +118,13 @@ void test_parse_cluster_nodes(bool parse_replicas) {
}

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

void test_parse_cluster_nodes_during_failover(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -133,7 +137,7 @@ void test_parse_cluster_nodes_during_failover(void) {
"ad0f5210dda1736a1b5467cd6e797f011a192097 10.10.10.125:7000@17000 slave 4394d8eb03de1f524b56cb385f0eb9052ce65283 0 1625255656366 1 connected\n"
"8675cd30fdd4efa088634e50fbd5c0675238a35e 10.10.10.124:7000@17000 slave 22de56650b3714c1c42fc0d120f80c66c24d8795 0 1625255655360 3 connected\n"
"4394d8eb03de1f524b56cb385f0eb9052ce65283 10.10.10.121:7000@17000 myself,master - 0 1625255653000 1 connected 0-5460\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand Down Expand Up @@ -178,20 +182,22 @@ void test_parse_cluster_nodes_during_failover(void) {
assert(slot->end == 5460);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Skip nodes with the `noaddr` flag. */
void test_parse_cluster_nodes_with_noaddr(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
dictIterator di;

valkeyReply *reply = create_cluster_nodes_reply(
"752d150249c157c7cb312b6b056517bbbecb42d2 :0@0 master,noaddr - 1658754833817 1658754833000 3 disconnected 5461-10922\n"
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:6379@16379 myself,master - 0 1658755601000 1 connected 0-5460\n"
"87f785c4a51f58c06e4be55de8c112210a811db9 127.0.0.2:6379@16379 master - 0 1658755602418 3 connected 10923-16383\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand All @@ -205,12 +211,48 @@ void test_parse_cluster_nodes_with_noaddr(void) {
assert(strcmp(node->addr, "127.0.0.2:6379") == 0);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

void test_parse_cluster_nodes_with_empty_ip(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyClusterNode *node;
dictIterator di;

/* Set the IP from which the response is received from. */
valkeyContext *c = valkeyContextInit();
c->tcp.host = strdup("127.0.0.99");

valkeyReply *reply = create_cluster_nodes_reply(
"752d150249c157c7cb312b6b056517bbbecb42d2 :6379@16379 master - 1658754833817 1658754833000 3 disconnected 5461-10922\n"
"e839a12fbed631de867016f636d773e644562e72 127.0.0.1:6379@16379 myself,master - 0 1658755601000 1 connected 0-5460\n"
"87f785c4a51f58c06e4be55de8c112210a811db9 127.0.0.2:6379@16379 master - 0 1658755602418 3 connected 10923-16383\n");
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
assert(dictSize(nodes) == 3);
dictInitIterator(&di, nodes);
/* Verify node 1 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.1:6379") == 0);
/* Verify node 2 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.2:6379") == 0);
/* Verify node 3 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.99:6379") == 0); /* Uses the IP from which the response was received from. */

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Parse replies with additional importing and migrating information. */
void test_parse_cluster_nodes_with_special_slot_entries(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -220,7 +262,7 @@ void test_parse_cluster_nodes_with_special_slot_entries(void) {
* importing slot information that will be ignored. */
valkeyReply *reply = create_cluster_nodes_reply(
"4394d8eb03de1f524b56cb385f0eb9052ce65283 10.10.10.121:7000@17000 myself,master - 0 1625255653000 1 connected 0 2-5460 [0->-e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca] [1-<-292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f]\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand All @@ -243,12 +285,14 @@ void test_parse_cluster_nodes_with_special_slot_entries(void) {
assert(slot->end == 5460);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Parse a cluster nodes reply containing a primary with multiple replicas. */
void test_parse_cluster_nodes_with_multiple_replicas(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -263,7 +307,7 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) {
"e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-16383\n"
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002,hostname2 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238316232 2 connected\n"
"292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 127.0.0.1:30003@31003,hostname3 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238318243 3 connected\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

/* Verify master. */
Expand Down Expand Up @@ -306,19 +350,21 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) {
assert(node->role == VALKEY_ROLE_REPLICA);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Give error when parsing erroneous data. */
void test_parse_cluster_nodes_with_parse_error(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyReply *reply;
dict *nodes;

/* Missing link-state (and slots). */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:30001@31001 myself,master - 0 1658755601000 1 \n");
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
Expand All @@ -327,7 +373,7 @@ void test_parse_cluster_nodes_with_parse_error(void) {
/* Missing port. */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0@31001 myself,master - 0 1658755601000 1 connected 0-5460\n");
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
Expand All @@ -336,26 +382,37 @@ void test_parse_cluster_nodes_with_parse_error(void) {
/* Missing port and cport. */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0 myself,master - 0 1658755601000 1 connected 0-5460\n");
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
valkeyClusterClearError(cc);

/* Invalid port. */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:66000@67000 myself,master - 0 1658755601000 1 connected 0-5460\n");
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
valkeyClusterClearError(cc);

valkeyFree(c);
valkeyClusterFree(cc);
}

/* Redis pre-v4.0 returned node addresses without the clusterbus port,
* i.e. `ip:port` instead of `ip:port@cport` */
void test_parse_cluster_nodes_with_legacy_format(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
dictIterator di;

valkeyReply *reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:6379 myself,master - 0 1658755601000 1 connected 0-5460\n"
"752d150249c157c7cb312b6b056517bbbecb42d2 :0 master,noaddr - 1658754833817 1658754833000 3 disconnected 5461-10922\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand All @@ -365,6 +422,7 @@ void test_parse_cluster_nodes_with_legacy_format(void) {
assert(strcmp(node->addr, "127.0.0.0:6379") == 0);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

Expand All @@ -373,6 +431,7 @@ int main(void) {
test_parse_cluster_nodes(true /* replicas parsed */);
test_parse_cluster_nodes_during_failover();
test_parse_cluster_nodes_with_noaddr();
test_parse_cluster_nodes_with_empty_ip();
test_parse_cluster_nodes_with_special_slot_entries();
test_parse_cluster_nodes_with_multiple_replicas();
test_parse_cluster_nodes_with_parse_error();
Expand Down

0 comments on commit 57aac29

Please sign in to comment.