From 5ce64d6bc8cef056fa027746dc5d747bd021c5b6 Mon Sep 17 00:00:00 2001
From: Jacob Murphy
Date: Tue, 5 Nov 2024 01:29:45 +0000
Subject: [PATCH] Fix slot migration vote logic

Failover and slot migration votes previously shared
clusterValidateFailoverAuth() and were rate limited through the
per-primary voted_time field, which is the wrong granularity once an
election can cover a single slot. This change:

- tracks the last vote per slot in clusterState.last_slot_vote_time and
  drops clusterNode.voted_time, so overlapping failover and slot
  migration elections cannot double-vote on the same slot;
- splits the vote logic into clusterSendFailoverAuthIfNeeded() and
  clusterSendMigrateSlotAckIfNeeded(), so each request type is
  validated and acked in one place;
- replaces SLOT_MIGRATION_SYNCING_TO_PAUSE with
  SLOT_MIGRATION_WAITING_FOR_OFFSET and SLOT_MIGRATION_SYNCING_TO_OFFSET
  so the target only compares replication offsets after the paused
  owner has reported one;
- makes a replica that is replicating a single slot ignore replicated
  commands that hash to other slots.

Signed-off-by: Jacob Murphy
---
 src/cluster_legacy.c | 232 ++++++++++++++++++++++++++++---------------
 src/cluster_legacy.h |   5 +-
 src/server.c         |   7 ++
 3 files changed, 160 insertions(+), 84 deletions(-)

diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c
index 79cea11e19..a729de67b2 100644
--- a/src/cluster_legacy.c
+++ b/src/cluster_legacy.c
@@ -60,8 +60,8 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 void clusterReadHandler(connection *conn);
 void clusterSendPing(clusterLink *link, int type);
 void clusterSendFail(char *nodename);
-int clusterValidateFailoverAuth(clusterNode *node, clusterMsg *request, int slot_num);
-void clusterSendFailoverAuth(clusterNode *node);
+void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
+void clusterSendMigrateSlotAckIfNeeded(clusterNode *node, clusterMsg *request, int slot_num);
 void clusterUpdateState(void);
 list *clusterGetNodesInMyShard(clusterNode *node);
 int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica);
@@ -1111,6 +1111,7 @@ void clusterInit(void) {
     memset(server.cluster->slots, 0, sizeof(server.cluster->slots));
     clusterCloseAllSlots();
 
+    memset(server.cluster->last_slot_vote_time, 0, sizeof(server.cluster->last_slot_vote_time));
     memset(server.cluster->owner_not_claiming_slot, 0, sizeof(server.cluster->owner_not_claiming_slot));
 
@@ -1505,7 +1506,6 @@ clusterNode *createClusterNode(char *nodename, int flags) {
     node->cport = 0;
     node->tls_port = 0;
     node->fail_reports = listCreate();
-    node->voted_time = 0;
     node->orphaned_time = 0;
     node->repl_offset_time = 0;
     node->repl_offset = 0;
@@ -3165,19 +3165,19 @@ int clusterProcessPacket(clusterLink *link) {
                       server.cluster->mf_primary_offset);
         }
         /* If we are a primary performing slot migration and the slot owner
-         * sent its offset while already paused, populate the MF state. */
+         * sent its offset while already paused, populate the migration state. */
         slotMigration * curr_migration = clusterGetCurrentSlotMigration();
-        if (server.cluster->mf_end && hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
-            server.cluster->mf_primary_offset == -1 && curr_migration != NULL &&
-            curr_migration->state == SLOT_MIGRATION_PAUSE_OWNER &&
+        if (hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && curr_migration != NULL &&
+            curr_migration->state == SLOT_MIGRATION_WAITING_FOR_OFFSET &&
             curr_migration->source_node == sender) {
-            serverLog(LL_NOTICE,
-                      "Received replication offset for paused "
-                      "slot migration failover: %lld",
-                      server.cluster->mf_primary_offset);
             curr_migration->pause_primary_offset = sender->repl_offset;
-            curr_migration->state = SLOT_MIGRATION_SYNCING_TO_PAUSE;
+            curr_migration->state = SLOT_MIGRATION_SYNCING_TO_OFFSET;
             clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION);
+            serverLog(LL_NOTICE,
+                      "Received replication offset from paused owner for "
+                      "slot migration of slot %d: %lld",
+                      curr_migration->slot,
+                      curr_migration->pause_primary_offset);
         }
     }
 
@@ -3525,9 +3525,7 @@ int clusterProcessPacket(clusterLink *link) {
             clusterProcessPublishPacket(&hdr->data.publish.msg, type);
     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
         if (!sender) return 1; /* We don't know that node. 
*/ - if (clusterValidateFailoverAuth(sender, hdr, -1) != C_ERR) { - clusterSendFailoverAuth(sender); - } + clusterSendFailoverAuthIfNeeded(sender, hdr); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. */ /* We consider this vote only if the sender is a primary serving @@ -3606,9 +3604,7 @@ int clusterProcessPacket(clusterLink *link) { } else if (type == CLUSTERMSG_TYPE_MIGRATE_SLOT_REQUEST) { if (!sender) return 1; /* We don't know that node. */ uint16_t slot_num = hdr->data.slot_migration.msg.slot_num; - if (clusterValidateFailoverAuth(sender, hdr, slot_num) == C_OK) { - clusterSendMigrateSlotAck(sender, slot_num); - } + clusterSendMigrateSlotAckIfNeeded(sender, hdr, slot_num); } else if (type == CLUSTERMSG_TYPE_MIGRATE_SLOT_ACK) { if (!sender) return 1; /* We don't know that node. */ /* We consider this vote only if the sender is a primary serving @@ -4331,9 +4327,9 @@ void clusterProceedWithSlotMigration(void) { "Timed out for slot migration from source node %.40s for slot %d", curr_migration->source_node->name, curr_migration->slot); curr_migration->state = SLOT_MIGRATION_FAILED; } - if (curr_migration->state >= SLOT_MIGRATION_PAUSE_OWNER && curr_migration->pause_end < mstime()) { + if (curr_migration->state > SLOT_MIGRATION_PAUSE_OWNER && curr_migration->pause_end < mstime() && curr_migration->vote_retry_time < mstime()) { /* If the owner ever unpauses, we have to move back in the state machine and retry. */ - serverLog(LL_WARNING, "Timed out waiting to sync to slot owner's paused time. Going to reinitiate pause and retry."); + serverLog(LL_WARNING, "Reinitiating pause on the node owning the slot..."); curr_migration->state = SLOT_MIGRATION_PAUSE_OWNER; curr_migration->pause_end = mstime() + CLUSTER_MF_TIMEOUT; } @@ -4379,11 +4375,15 @@ void clusterProceedWithSlotMigration(void) { case SLOT_MIGRATION_PAUSE_OWNER: serverLog(LL_NOTICE, "Replication link to slot owner %.40s has been established. Pausing source node on slot %d and waiting to continue", curr_migration->source_node->name, curr_migration->slot); clusterSendMigrateSlotStart(curr_migration->source_node, curr_migration->slot); + curr_migration->pause_primary_offset = -1; curr_migration->pause_end = mstime() + CLUSTER_MF_TIMEOUT; - curr_migration->state = SLOT_MIGRATION_SYNCING_TO_PAUSE; + curr_migration->state = SLOT_MIGRATION_WAITING_FOR_OFFSET; continue; - case SLOT_MIGRATION_SYNCING_TO_PAUSE: - if (curr_migration->pause_primary_offset && curr_migration->link->client->reploff >= curr_migration->pause_primary_offset) { + case SLOT_MIGRATION_WAITING_FOR_OFFSET: + /* Nothing to do, need to wait for cluster message to come in. */ + return; + case SLOT_MIGRATION_SYNCING_TO_OFFSET: + if (curr_migration->link->client->reploff >= curr_migration->pause_primary_offset) { serverLog(LL_NOTICE, "Replication of slot %d has caught up to paused slot owner, slot migration can start.", curr_migration->slot); curr_migration->state = SLOT_MIGRATION_STARTING_VOTE; continue; @@ -4392,14 +4392,6 @@ void clusterProceedWithSlotMigration(void) { return; case SLOT_MIGRATION_STARTING_VOTE: if (curr_migration->vote_retry_time < mstime()) { - /* Compute the failover timeout (the max time we have to send votes - * and wait for replies), and the failover retry time (the time to wait - * before trying to get voted again). - * - * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds. - * Retry is two times the Timeout. 
- */ - // TODO(murphyjacob4) lets make this a function and share with repl mstime_t timeout = clusterGetVoteTimeout(); curr_migration->vote_retry_time = mstime() + timeout * 2; curr_migration->vote_end_time = mstime() + timeout; @@ -4460,10 +4452,6 @@ void clusterProceedWithSlotMigration(void) { } } -void clusterProceedWithSlotMigrationDestination(void) { - -} - /* ----------------------------------------------------------------------------- * REPLICA node specific functions * -------------------------------------------------------------------------- */ @@ -4538,35 +4526,54 @@ void clusterSendMFStart(clusterNode *node) { clusterMsgSendBlockDecrRefCount(msgblock); } -int clusterValidateSlotForFailover(int slot_num, clusterNode* request_node, uint64_t request_epoch) { - if (!isSlotUnclaimed(slot_num) && server.cluster->slots[slot_num]->configEpoch > request_epoch) { - /* We found a slot that in our current slots is served by a primary - * with a greater configEpoch than the one claimed by the replica - * requesting our vote. Refuse to vote for this replica. */ - serverLog(LL_WARNING, - "Failover auth denied to %.40s (%s): " - "slot %d epoch (%llu) > reqEpoch (%llu)", - request_node->name, request_node->human_nodename, slot_num, (unsigned long long)server.cluster->slots[slot_num]->configEpoch, - (unsigned long long)request_epoch); - return C_ERR; +mstime_t clusterGetRemainingSlotLevelVoteTimeout(unsigned char* claimed_slots) { + mstime_t now = mstime(); + mstime_t result = 0; + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (bitmapTestBit(claimed_slots, j) == 0) continue; + mstime_t remaining = server.cluster->last_slot_vote_time[j] + server.cluster_node_timeout * 2 - now; + if (remaining > result) { + result = remaining; + } } - return C_OK; + return result; +} + +void clusterUpdateSlotLevelVoteTime(unsigned char* claimed_slots) { + mstime_t now = mstime(); + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (bitmapTestBit(claimed_slots, j) == 0) continue; + server.cluster->last_slot_vote_time[j] = now; + } +} + +int clusterGetFirstSlotWithHigherConfigEpoch(uint64_t request_config_epoch, unsigned char* claimed_slots) { + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (bitmapTestBit(claimed_slots, j) == 0) continue; + if (isSlotUnclaimed(j) || server.cluster->slots[j]->configEpoch <= request_config_epoch) { + continue; + } + /* If we reached this point we found a slot that in our current slots + * is served by a primary with a greater configEpoch than the one claimed + * by the node requesting our vote. We should refuse to vote for this node. */ + return j; + } + return -1; } /* Vote for the node asking for our vote if there are the conditions. */ -int clusterValidateFailoverAuth(clusterNode *node, clusterMsg *request, int slot_num) { +void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { clusterNode *primary = node->replicaof; uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch); uint64_t requestConfigEpoch = ntohu64(request->configEpoch); unsigned char *claimed_slots = request->myslots; int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK; - int j; /* IF we are not a primary serving at least 1 slot, we don't have the * right to vote, as the cluster size is the number * of primaries serving at least one slot, and quorum is the cluster * size + 1 */ - if (!clusterNodeIsVotingPrimary(myself)) return C_ERR; + if (!clusterNodeIsVotingPrimary(myself)) return; /* Request epoch must be >= our currentEpoch. 
     * Note that it is impossible for it to actually be greater since
      * our currentEpoch was updated as a side effect of receiving this
      * request, if the request epoch was greater. */
     if (requestCurrentEpoch < server.cluster->currentEpoch) {
         serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): reqEpoch (%llu) < curEpoch(%llu)", node->name,
                   node->human_nodename, (unsigned long long)requestCurrentEpoch,
                   (unsigned long long)server.cluster->currentEpoch);
-        return C_ERR;
+        return;
     }
 
     /* I already voted for this epoch? Return ASAP. */
     if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
         serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): already voted for epoch %llu", node->name,
                   node->human_nodename, (unsigned long long)server.cluster->currentEpoch);
-        return C_ERR;
+        return;
     }
 
     /* Node must be a replica and its primary down.
      * The primary can be non failing if the request is flagged
-     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover) or slot_num
-     * is set to some slot (slot migration). */
+     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
-    if (slot_num == -1 && (clusterNodeIsPrimary(node) || primary == NULL || (!nodeFailed(primary) && !force_ack))) {
+    if (clusterNodeIsPrimary(node) || primary == NULL || (!nodeFailed(primary) && !force_ack)) {
         if (clusterNodeIsPrimary(node)) {
             serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: it is a primary node", node->name,
                       node->human_nodename, (unsigned long long)requestCurrentEpoch);
@@ -4601,52 +4607,114 @@
             serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: its primary is up", node->name,
                       node->human_nodename, (unsigned long long)requestCurrentEpoch);
         }
-        return C_ERR;
+        return;
     }
 
-    /* We did not voted for a failover about this primary for two
+    /* We will not vote in any election involving the slots owned by this primary for two
      * times the node timeout. This is not strictly needed for correctness
      * of the algorithm but makes the base case more linear. */
-    clusterNode *primary_node;
-    if (slot_num != -1) {
-        primary_node = server.cluster->slots[slot_num];
-    } else {
-        primary_node = node->replicaof;
-    }
-    if (mstime() - primary_node->voted_time < server.cluster_node_timeout * 2) {
+    mstime_t remaining_timeout = clusterGetRemainingSlotLevelVoteTimeout(claimed_slots);
+    if (remaining_timeout > 0) {
         serverLog(LL_WARNING,
                   "Failover auth denied to %.40s %s: "
                   "can't vote about this primary before %lld milliseconds",
                   node->name, node->human_nodename,
-                 (long long)((server.cluster_node_timeout * 2) - (mstime() - node->replicaof->voted_time)));
-        return C_ERR;
+                 (long long)remaining_timeout);
+        return;
     }
 
     /* The replica requesting the vote must have a configEpoch for the claimed
      * slots that is >= the one of the primaries currently serving the same
      * slots in the current configuration. */
-    if (slot_num != -1) {
-        if (clusterValidateSlotForFailover(slot_num, node, requestConfigEpoch) == C_ERR) {
-            return C_ERR;
-        }
-    } else {
-        for (j = 0; j < CLUSTER_SLOTS; j++) {
-            if (bitmapTestBit(claimed_slots, j) == 0) continue;
-            if (clusterValidateSlotForFailover(j, node, requestConfigEpoch) == C_ERR) {
-                return C_ERR;
-            }
-        }
+    int invalid_slot = clusterGetFirstSlotWithHigherConfigEpoch(requestConfigEpoch, claimed_slots);
+    if (invalid_slot != -1) {
+        serverLog(LL_WARNING,
+                  "Failover auth denied to %.40s (%s): "
+                  "slot %d epoch (%llu) > reqEpoch (%llu)",
+                  node->name, node->human_nodename, invalid_slot, (unsigned long long)server.cluster->slots[invalid_slot]->configEpoch,
+                  (unsigned long long)requestConfigEpoch);
+        return;
     }
 
     /* We can vote for this replica. */
     server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
-    primary_node->voted_time = mstime();
+    clusterUpdateSlotLevelVoteTime(claimed_slots);
     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
-    serverLog(LL_NOTICE, "Failover auth granted to %.40s (%s) for epoch %llu, slot number %d", node->name, node->human_nodename,
-              (unsigned long long)server.cluster->currentEpoch, slot_num);
-    return C_OK;
+    clusterSendFailoverAuth(node);
+    serverLog(LL_NOTICE, "Failover auth granted to %.40s (%s) for epoch %llu", node->name, node->human_nodename,
+              (unsigned long long)server.cluster->currentEpoch);
 }
 
+/* Ack the requested slot migration of the node asking for our vote if the
+ * conditions are met. */
+void clusterSendMigrateSlotAckIfNeeded(clusterNode *node, clusterMsg *request, int slot_num) {
+    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
+    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
+    unsigned char *claimed_slots = request->myslots;
+
+    /* IF we are not a primary serving at least 1 slot, we don't have the
+     * right to vote, as the cluster size is the number
+     * of primaries serving at least one slot, and quorum is the cluster
+     * size / 2 + 1 */
+    if (!clusterNodeIsVotingPrimary(myself)) return;
+
+    if (!clusterNodeIsPrimary(node)) {
+        serverLog(LL_WARNING, "Slot migration auth denied to %.40s (%s) for epoch %llu: it is a replica node", node->name,
+                  node->human_nodename, (unsigned long long)requestCurrentEpoch);
+        return;
+    }
+
+    /* Request epoch must be >= our currentEpoch.
+     * Note that it is impossible for it to actually be greater since
+     * our currentEpoch was updated as a side effect of receiving this
+     * request, if the request epoch was greater. */
+    if (requestCurrentEpoch < server.cluster->currentEpoch) {
+        serverLog(LL_WARNING, "Slot migration auth denied to %.40s (%s): reqEpoch (%llu) < curEpoch(%llu)", node->name,
+                  node->human_nodename, (unsigned long long)requestCurrentEpoch,
+                  (unsigned long long)server.cluster->currentEpoch);
+        return;
+    }
+
+    /* I already voted for this epoch? Return ASAP. */
+    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
+        serverLog(LL_WARNING, "Slot migration auth denied to %.40s (%s): already voted for epoch %llu", node->name,
+                  node->human_nodename, (unsigned long long)server.cluster->currentEpoch);
+        return;
+    }
+
+    /* We will not vote for a migration about this slot for two
+     * times the node timeout.
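+     * Like the failover vote cooldown above, this is not strictly needed
+     * for correctness; it just keeps the same slot from being voted on in
+     * rapid, overlapping elections.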
+     */
+    unsigned char to_be_claimed_slots[CLUSTER_SLOTS / 8];
+    memset(to_be_claimed_slots, 0, sizeof(to_be_claimed_slots));
+    bitmapSetBit(to_be_claimed_slots, slot_num);
+    mstime_t remaining_timeout = clusterGetRemainingSlotLevelVoteTimeout(to_be_claimed_slots);
+    if (remaining_timeout > 0) {
+        serverLog(LL_WARNING,
+                  "Slot migration auth denied to %.40s %s: "
+                  "can't vote about slot %d before %lld milliseconds",
+                  node->name, node->human_nodename, slot_num,
+                  (long long)remaining_timeout);
+        return;
+    }
+
+    /* We also need to verify that the primary node still owns all the slots
+     * it thinks it does, or we risk regressing the slot ownership. */
+    int invalid_slot = clusterGetFirstSlotWithHigherConfigEpoch(requestConfigEpoch, claimed_slots);
+    if (invalid_slot != -1) {
+        serverLog(LL_WARNING,
+                  "Slot migration auth denied to %.40s (%s): "
+                  "slot %d epoch (%llu) > reqEpoch (%llu)",
+                  node->name, node->human_nodename, invalid_slot, (unsigned long long)server.cluster->slots[invalid_slot]->configEpoch,
+                  (unsigned long long)requestConfigEpoch);
+        return;
+    }
+
+    /* We can vote for this migration. */
+    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
+    clusterUpdateSlotLevelVoteTime(to_be_claimed_slots);
+    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
+    clusterSendMigrateSlotAck(node, slot_num);
+    serverLog(LL_NOTICE, "Slot migration auth granted to %.40s (%s) for epoch %llu", node->name, node->human_nodename,
+              (unsigned long long)server.cluster->currentEpoch);
+}
+
 /* This function returns the "rank" of this instance, a replica, in the context
  * of its primary-replicas ring. The rank of the replica is given by the number of
  * other replicas for the same primary that have a better replication offset
@@ -7149,7 +7217,7 @@ int clusterCommandSpecial(client *c) {
         to_enqueue->end_time = 0; /* Will be set once started. */
         to_enqueue->link = NULL;
         to_enqueue->pause_end = 0;
-        to_enqueue->pause_primary_offset = 0;
+        to_enqueue->pause_primary_offset = -1;
         to_enqueue->vote_end_time = 0;
         to_enqueue->vote_retry_time = 0;
         to_enqueue->vote_epoch = 0;
diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h
index 403b4eac90..81bfa9540c 100644
--- a/src/cluster_legacy.h
+++ b/src/cluster_legacy.h
@@ -351,7 +351,6 @@ struct _clusterNode {
     mstime_t pong_received;    /* Unix time we received the pong */
     mstime_t data_received;    /* Unix time we received any data */
     mstime_t fail_time;        /* Unix time when FAIL flag was set */
-    mstime_t voted_time;       /* Last time we voted for a replica of this primary */
     mstime_t repl_offset_time; /* Unix time we received offset for this node */
     mstime_t orphaned_time;    /* Starting time of orphaned primary condition */
     long long repl_offset;     /* Last known repl offset for this node. */
@@ -381,7 +380,8 @@ typedef enum slotMigrationState {
     SLOT_MIGRATION_QUEUED,  /* Queued behind some other slot migration. */
     SLOT_MIGRATION_SYNCING, /* Syncing contents from current owner. */
     SLOT_MIGRATION_PAUSE_OWNER,
-    SLOT_MIGRATION_SYNCING_TO_PAUSE,
+    SLOT_MIGRATION_WAITING_FOR_OFFSET, /* Waiting for the paused owner to report its offset. */
+    SLOT_MIGRATION_SYNCING_TO_OFFSET,  /* Syncing up to the paused owner's offset. */
     SLOT_MIGRATION_STARTING_VOTE,
     SLOT_MIGRATION_GATHERING_VOTES, /* Gathering votes necessary for slot-level takeover. */
     SLOT_MIGRATION_FINISH,
@@ -414,6 +414,7 @@ struct clusterState {
     clusterNode *migrating_slots_to[CLUSTER_SLOTS];
     clusterNode *importing_slots_from[CLUSTER_SLOTS];
     clusterNode *slots[CLUSTER_SLOTS];
+    mstime_t last_slot_vote_time[CLUSTER_SLOTS]; /* Last time we voted for each slot. */
     /* The following fields are used to take the replica state on elections. */
     mstime_t failover_auth_time; /* Time of previous or next election. */
     int failover_auth_count;     /* Number of votes received so far. */
diff --git a/src/server.c b/src/server.c
index 413d2b7170..9f1d4c6eae 100644
--- a/src/server.c
+++ b/src/server.c
@@ -4022,6 +4022,13 @@ int processCommand(client *c) {
         }
     }
 
+    /* If we are replicating a single slot from a primary, ignore replicated commands that hash to other slots. */
+    if (obey_client && c->primary_slot_num != -1) {
+        getNodeByQuery(c, c->cmd, c->argv, c->argc, &c->slot, NULL);
+        serverLog(LL_DEBUG, "Received request from primary in slot %d", c->slot);
+        if (c->slot != c->primary_slot_num) return C_OK;
+    }
+
     if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_replication_link && !obey_client &&
         (is_write_command || (is_read_command && !c->flag.readonly))) {
         if (server.failover_state == FAILOVER_IN_PROGRESS) {