Skip to content

Commit

Permalink
Enhance SENTINEL FAILOVER to use the FAILOVER command to avoid data loss
Browse files Browse the repository at this point in the history
Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Oct 29, 2024
1 parent c21f1dc commit 6fa3f78
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 15 deletions.
9 changes: 8 additions & 1 deletion src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -5561,9 +5561,16 @@ struct COMMAND_ARG SENTINEL_DEBUG_Args[] = {
#define SENTINEL_FAILOVER_Keyspecs NULL
#endif

/* SENTINEL FAILOVER failover_type argument table.
 *
 * Lists the two alternatives of the "failover-type" argument: the pure
 * tokens LEGACY and PAUSE. Neither entry carries extra flags (CMD_ARG_NONE)
 * or nested sub-arguments. */
struct COMMAND_ARG SENTINEL_FAILOVER_failover_type_Subargs[] = {
{MAKE_ARG("legacy",ARG_TYPE_PURE_TOKEN,-1,"LEGACY",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("pause",ARG_TYPE_PURE_TOKEN,-1,"PAUSE",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* SENTINEL FAILOVER argument table.
 *
 * Entry 0: "primary-name", a plain string argument with no flags.
 * Entry 1: "failover-type", an optional ONEOF argument introduced by the TYPE
 *          token (marked as available since 8.1.0). Its two alternatives are
 *          taken from SENTINEL_FAILOVER_failover_type_Subargs. */
struct COMMAND_ARG SENTINEL_FAILOVER_Args[] = {
{MAKE_ARG("primary-name",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("failover-type",ARG_TYPE_ONEOF,-1,"TYPE",NULL,"8.1.0",CMD_ARG_OPTIONAL,2,NULL),.subargs=SENTINEL_FAILOVER_failover_type_Subargs},
};

/********** SENTINEL FLUSHCONFIG ********************/
Expand Down Expand Up @@ -6026,7 +6033,7 @@ struct COMMAND_STRUCT SENTINEL_Subcommands[] = {
{MAKE_CMD("ckquorum","Checks for a Sentinel quorum.",NULL,"2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_CKQUORUM_History,0,SENTINEL_CKQUORUM_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_CKQUORUM_Keyspecs,0,NULL,1),.args=SENTINEL_CKQUORUM_Args},
{MAKE_CMD("config","Configures Sentinel.","O(N) when N is the number of configuration parameters provided","6.2.0",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_CONFIG_History,1,SENTINEL_CONFIG_Tips,0,sentinelCommand,-4,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_CONFIG_Keyspecs,0,NULL,1),.args=SENTINEL_CONFIG_Args},
{MAKE_CMD("debug","Lists or updates the current configurable parameters of Sentinel.","O(N) where N is the number of configurable parameters","7.0.0",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_DEBUG_History,0,SENTINEL_DEBUG_Tips,0,sentinelCommand,-2,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_DEBUG_Keyspecs,0,NULL,1),.args=SENTINEL_DEBUG_Args},
{MAKE_CMD("failover","Forces a Sentinel failover.",NULL,"2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_FAILOVER_History,0,SENTINEL_FAILOVER_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_FAILOVER_Keyspecs,0,NULL,1),.args=SENTINEL_FAILOVER_Args},
{MAKE_CMD("failover","Forces a Sentinel failover.",NULL,"2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_FAILOVER_History,0,SENTINEL_FAILOVER_Tips,0,sentinelCommand,-3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_FAILOVER_Keyspecs,0,NULL,2),.args=SENTINEL_FAILOVER_Args},
{MAKE_CMD("flushconfig","Rewrites the Sentinel configuration file.","O(1)","2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_FLUSHCONFIG_History,0,SENTINEL_FLUSHCONFIG_Tips,0,sentinelCommand,2,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_FLUSHCONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("get-master-addr-by-name","Returns the port and address of a primary instance.","O(1)","2.8.4",CMD_DOC_DEPRECATED,"`SENTINEL GET-PRIMARY-ADDR-BY-NAME`","8.0.0","sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_GET_MASTER_ADDR_BY_NAME_History,0,SENTINEL_GET_MASTER_ADDR_BY_NAME_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_GET_MASTER_ADDR_BY_NAME_Keyspecs,0,NULL,1),.args=SENTINEL_GET_MASTER_ADDR_BY_NAME_Args},
{MAKE_CMD("get-primary-addr-by-name","Returns the port and address of a primary instance.","O(1)","8.0.0",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_GET_PRIMARY_ADDR_BY_NAME_History,0,SENTINEL_GET_PRIMARY_ADDR_BY_NAME_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_GET_PRIMARY_ADDR_BY_NAME_Keyspecs,0,NULL,1),.args=SENTINEL_GET_PRIMARY_ADDR_BY_NAME_Args},
Expand Down
21 changes: 20 additions & 1 deletion src/commands/sentinel-failover.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"summary": "Forces a Sentinel failover.",
"group": "sentinel",
"since": "2.8.4",
"arity": 3,
"arity": -3,
"container": "SENTINEL",
"function": "sentinelCommand",
"command_flags": [
Expand All @@ -19,6 +19,25 @@
{
"name": "primary-name",
"type": "string"
},
{
"token": "TYPE",
"name": "failover-type",
"type": "oneof",
"optional": true,
"since": "8.1.0",
"arguments": [
{
"name": "legacy",
"type": "pure-token",
"token": "legacy"
},
{
"name": "pause",
"type": "pure-token",
"token": "pause"
}
]
}
]
}
Expand Down
104 changes: 99 additions & 5 deletions src/sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ typedef struct sentinelAddr {
#define SRI_FORCE_FAILOVER (1 << 11) /* Force failover with primary up. */
#define SRI_SCRIPT_KILL_SENT (1 << 12) /* SCRIPT KILL already sent on -BUSY */
#define SRI_PRIMARY_REBOOT (1 << 13) /* Primary was detected as rebooting */
#define SRI_SUPPORT_FAILOVER (1 << 14) /* Primary and replica support FAILOVER command. */
/* Note: when adding new flags, please check the flags section in addReplySentinelValkeyInstance. */

/* Note: times are in milliseconds. */
Expand Down Expand Up @@ -114,6 +115,7 @@ static mstime_t sentinel_default_failover_timeout = 60 * 3 * 1000;
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait replica to change role */
#define SENTINEL_FAILOVER_STATE_RECONF_REPLICAS 5 /* REPLICAOF newprimary */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted replica. */
#define SENTINEL_FAILOVER_STATE_SEND_FAILOVER 7 /* Send FAILOVER Command to primary. */

#define SENTINEL_PRIMARY_LINK_STATUS_UP 0
#define SENTINEL_PRIMARY_LINK_STATUS_DOWN 1
Expand Down Expand Up @@ -3269,6 +3271,7 @@ void addReplySentinelValkeyInstance(client *c, sentinelValkeyInstance *ri) {
if (ri->flags & SRI_FORCE_FAILOVER) flags = sdscat(flags, "force_failover,");
if (ri->flags & SRI_SCRIPT_KILL_SENT) flags = sdscat(flags, "script_kill_sent,");
if (ri->flags & SRI_PRIMARY_REBOOT) flags = sdscat(flags, "master_reboot,");
if (ri->flags & SRI_SUPPORT_FAILOVER) flags = sdscat(flags, "support_failover,");

if (sdslen(flags) != 0) sdsrange(flags, 0, -2); /* remove last "," */
addReplyBulkCString(c, flags);
Expand Down Expand Up @@ -3837,11 +3840,31 @@ void sentinelCommand(client *c) {
addReplyBulkLongLong(c, addr->port);
}
} else if (!strcasecmp(c->argv[1]->ptr, "failover")) {
/* SENTINEL FAILOVER <primary-name> */
/* SENTINEL FAILOVER <primary-name> [type <legacy | pause>] */
sentinelValkeyInstance *ri;
/* 0: No parameters passed default is legacy. 1: legacy. 2: pause. */
int failover_type = 0;

if (c->argc != 3) goto numargserr;
if (c->argc != 3 && c->argc != 5) goto numargserr;
if ((ri = sentinelGetPrimaryByNameOrReplyError(c, c->argv[2])) == NULL) return;
for (int i = 3; i < c->argc; i++) {
int moreargs = c->argc > i + 1;
if (!strcasecmp(c->argv[i]->ptr, "type") && moreargs && failover_type == 0) {
if (!strcasecmp(c->argv[i + 1]->ptr, "legacy")) {
failover_type = 1;
} else if (!strcasecmp(c->argv[i + 1]->ptr, "pause")) {
failover_type = 2;
} else {
addReplyErrorFormat(c, "Unknown failover type '%s'", (char *)c->argv[i + 1]->ptr);
return;
}
i++;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
}

if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
addReplyError(c, "-INPROG Failover already in progress");
return;
Expand All @@ -3853,6 +3876,7 @@ void sentinelCommand(client *c) {
serverLog(LL_NOTICE, "Executing user requested FAILOVER of '%s'", ri->name);
sentinelStartFailover(ri);
ri->flags |= SRI_FORCE_FAILOVER;
if (failover_type == 2) ri->flags |= SRI_SUPPORT_FAILOVER;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "pending-scripts")) {
/* SENTINEL PENDING-SCRIPTS */
Expand Down Expand Up @@ -4635,6 +4659,41 @@ char *sentinelGetLeader(sentinelValkeyInstance *primary, uint64_t epoch) {
return winner;
}

/* Reply callback for the FAILOVER command queued by sentinelSendFailover().
 *
 * 'c' is the async context the command was sent on (its 'data' field holds
 * the instanceLink), 'reply' is the reply object or NULL, and 'privdata' is
 * the primary instance the command was sent to.
 *
 * A non-error reply needs no action here. An error reply means the primary
 * refused the FAILOVER command (for example because it does not support it),
 * so the failover is moved back to the legacy REPLICAOF NO ONE step. */
void sentinelFailoverReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelValkeyInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;

    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;

    if (r->type != REDIS_REPLY_ERROR) return;

    /* Only fall back if this failover is still waiting for the promotion that
     * the FAILOVER command was supposed to trigger. If the failover was
     * aborted in the meantime there is no promoted replica to report, and if
     * it already advanced we must not rewind the state machine. */
    if (ri->failover_state != SENTINEL_FAILOVER_STATE_WAIT_PROMOTION || !ri->promoted_replica) return;

    /* Primary rejected FAILOVER, fallback to legacy type. */
    sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone", ri->promoted_replica, "%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE;
    ri->failover_state_change_time = mstime();
}

/* Queue a "FAILOVER TO <host> <port> TIMEOUT <ms>" command on the command
 * link of the primary 'ri'.
 *
 * 'addr' is the address of the replica to fail over to; its announced host
 * and its port become the TO arguments, and the primary's failover_timeout
 * is passed as the TIMEOUT value. The reply is delivered to
 * sentinelFailoverReplyCallback().
 *
 * Returns C_OK if the command was queued (the link's pending command counter
 * is incremented), C_ERR otherwise. */
int sentinelSendFailover(sentinelValkeyInstance *ri, const sentinelAddr *addr) {
    char port_buf[32];
    const char *target_host = announceSentinelAddr(addr);

    ll2string(port_buf, sizeof(port_buf), addr->port);

    if (redisAsyncCommand(ri->link->cc, sentinelFailoverReplyCallback, ri, "%s TO %s %s TIMEOUT %lld",
                          sentinelInstanceMapCommand(ri, "FAILOVER"), target_host, port_buf,
                          ri->failover_timeout) == C_ERR) {
        return C_ERR;
    }

    /* The command is in flight: account for it on the link. */
    ri->link->pending_commands++;
    return C_OK;
}

/* Send REPLICAOF to the specified instance, always followed by a
* CONFIG REWRITE command in order to store the new configuration on disk
* when possible (that is, if the instance is recent enough to support
Expand Down Expand Up @@ -4901,12 +4960,46 @@ void sentinelFailoverSelectReplica(sentinelValkeyInstance *ri) {
sentinelEvent(LL_WARNING, "+selected-slave", replica, "%@");
replica->flags |= SRI_PROMOTED;
ri->promoted_replica = replica;
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE;
if (ri->flags & SRI_SUPPORT_FAILOVER) {
sentinelEvent(LL_NOTICE, "+failover-state-send-failover", replica, "%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_FAILOVER;
} else {
sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone", replica, "%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE;
}
ri->failover_state_change_time = mstime();
sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone", replica, "%@");
}
}

/* Failover state handler for SENTINEL_FAILOVER_STATE_SEND_FAILOVER.
 *
 * Tries to have the primary 'ri' hand over to the promoted replica through
 * the FAILOVER command. When that is not possible the failover continues
 * with the legacy REPLICAOF NO ONE step instead. */
void sentinelFailoverSendFailover(sentinelValkeyInstance *ri) {
    sentinelValkeyInstance *candidate = ri->promoted_replica;

    /* While the promoted replica is disconnected, stay in this state and
     * retry on the next call. Once the failover timeout has elapsed since the
     * last state change, abort the failover. */
    if (candidate->link->disconnected) {
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(LL_WARNING, "-failover-abort-slave-timeout", ri, "%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* FAILOVER is only attempted when the primary is not flagged as down
     * (subjectively or objectively) and its link is connected. */
    int primary_reachable = !(ri->flags & (SRI_S_DOWN | SRI_O_DOWN)) && !ri->link->disconnected;

    if (primary_reachable && sentinelSendFailover(ri, candidate->addr) == C_OK) {
        /* Command queued: wait for the replica to change role. */
        sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion", candidate, "%@");
        ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    } else {
        /* Fallback to legacy type. */
        sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone", candidate, "%@");
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE;
    }
    ri->failover_state_change_time = mstime();
}

void sentinelFailoverSendReplicaOfNoOne(sentinelValkeyInstance *ri) {
int retval;

Expand Down Expand Up @@ -5078,6 +5171,7 @@ void sentinelFailoverStateMachine(sentinelValkeyInstance *ri) {
switch (ri->failover_state) {
case SENTINEL_FAILOVER_STATE_WAIT_START: sentinelFailoverWaitStart(ri); break;
case SENTINEL_FAILOVER_STATE_SELECT_REPLICA: sentinelFailoverSelectReplica(ri); break;
case SENTINEL_FAILOVER_STATE_SEND_FAILOVER: sentinelFailoverSendFailover(ri); break;
case SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE: sentinelFailoverSendReplicaOfNoOne(ri); break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: sentinelFailoverWaitPromotion(ri); break;
case SENTINEL_FAILOVER_STATE_RECONF_REPLICAS: sentinelFailoverReconfNextReplica(ri); break;
Expand All @@ -5093,7 +5187,7 @@ void sentinelAbortFailover(sentinelValkeyInstance *ri) {
serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);

ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS | SRI_FORCE_FAILOVER);
ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS | SRI_FORCE_FAILOVER | SRI_SUPPORT_FAILOVER);
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
ri->failover_state_change_time = mstime();
if (ri->promoted_replica) {
Expand Down
58 changes: 50 additions & 8 deletions tests/sentinel/tests/05-manual.tcl
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# Test manual failover

source "../tests/includes/init-tests.tcl"
proc test_sentinel_failover {type master_id} {

foreach_sentinel_id id {
S $id sentinel debug info-period 2000
S $id sentinel debug default-down-after 6000
S $id sentinel debug publish-period 1000
}

test "Manual failover works" {
set val 0

test "Manual failover works - $type" {
R $master_id del foo
set old_port [RPort $master_id]
set addr [S 0 SENTINEL GET-PRIMARY-ADDR-BY-NAME mymaster]
assert {[lindex $addr 1] == $old_port}
Expand All @@ -17,16 +20,26 @@ test "Manual failover works" {
# sentinel - replica may not have enough time to exchange INFO and update
# the replica's info-period, so the test may get a NOGOODSLAVE.
wait_for_condition 300 50 {
[catch {S 0 SENTINEL FAILOVER mymaster}] == 0
[catch {S 0 SENTINEL FAILOVER mymaster type $type}] == 0
} else {
catch {S 0 SENTINEL FAILOVER mymaster} reply
catch {S 0 SENTINEL FAILOVER mymaster type $type} reply
puts [S 0 SENTINEL REPLICAS mymaster]
fail "Sentinel manual failover did not work, got: $reply"
}

catch {S 0 SENTINEL FAILOVER mymaster} reply
catch {S 0 SENTINEL FAILOVER mymaster type $type} reply
assert_match {*INPROG*} $reply ;# Failover already in progress

# After sending sentinel failover, continue writing to the primary
# to observe the final data consistency.
for {set j 0} {$j < 1000000} {incr j} {
catch {R $master_id incr foo} err
if {[string match "READONLY*" $err]} {
break
}
set val $err
}

foreach_sentinel_id id {
wait_for_condition 1000 50 {
[lindex [S $id SENTINEL GET-PRIMARY-ADDR-BY-NAME mymaster] 1] != $old_port
Expand All @@ -38,11 +51,11 @@ test "Manual failover works" {
set master_id [get_instance_id_by_port valkey [lindex $addr 1]]
}

test "New primary [join $addr {:}] role matches" {
test "New primary [join $addr {:}] role matches - $type" {
assert {[RI $master_id role] eq {master}}
}

test "All the other slaves now point to the new primary" {
test "All the other slaves now point to the new primary - $type" {
foreach_valkey_id id {
if {$id != $master_id && $id != 0} {
wait_for_condition 1000 50 {
Expand All @@ -54,14 +67,43 @@ test "All the other slaves now point to the new primary" {
}
}

test "The old primary eventually gets reconfigured as a slave" {
test "The old primary eventually gets reconfigured as a replica - $type" {
wait_for_condition 1000 50 {
[RI 0 master_port] == [lindex $addr 1]
} else {
fail "Old master not reconfigured as slave of new master"
}
}

test "Check data consistency - $type" {
if {$type == "legacy"} {
# In legacy type, there is a good chance that data will be lost eventually.
foreach_valkey_id id {
wait_for_condition 1000 50 {
[R $id get foo] != $val
} else {
fail "Data is consistency in legacy type"
}
}
} elseif {$type == "pause"} {
foreach_valkey_id id {
wait_for_condition 1000 50 {
[R $id get foo] == $val
} else {
fail "Data is not consistency in pause type"
}
}
}
}

} ;# end proc test_sentinel_failover

source "../tests/includes/init-tests.tcl"
test_sentinel_failover "legacy" $master_id

source "../tests/includes/init-tests.tcl"
test_sentinel_failover "pause" $master_id

foreach flag {crash-after-election crash-after-promotion} {
# Before each SIMULATE-FAILURE test, re-source init-tests to get a clean environment
source "../tests/includes/init-tests.tcl"
Expand Down

0 comments on commit 6fa3f78

Please sign in to comment.