Skip to content

Commit

Permalink
ASAN fixes
Browse files Browse the repository at this point in the history
Includes reference counting so that the RDB background event processing doesn't cause the link to be destroyed while being used.

Signed-off-by: Jacob Murphy <[email protected]>
  • Loading branch information
murphyjacob4 committed Nov 4, 2024
1 parent c9131df commit 00e2a90
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 37 deletions.
138 changes: 102 additions & 36 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void replicaStartCommandStream(client *replica);
int cancelReplicationHandshake(replicationLink *link, int reconnect);
void replicationSteadyStateInit(replicationLink *link);
void dualChannelSetupMainConnForPsync(connection *conn);
void dualChannelSyncHandleRdbLoadCompletion(replicationLink *link);
int dualChannelSyncHandleRdbLoadCompletion(replicationLink *link);
static void dualChannelFullSyncWithSource(connection *conn);

/* We take a global flag to remember if this instance generated an RDB
Expand Down Expand Up @@ -2029,6 +2029,7 @@ void readSyncBulkPayload(connection *conn) {

replicationLink *link = (replicationLink *) connGetPrivateData(conn);
int use_diskless_load = useDisklessLoad(link);
int slot_num = link->slot_num;

/* Static vars used to hold the EOF mark, and the last bytes received
* from the server: when they match, we reached the end of the transfer. */
Expand Down Expand Up @@ -2227,9 +2228,10 @@ void readSyncBulkPayload(connection *conn) {
functionsLibCtx *functions_lib_ctx;
int asyncLoading = 0;

if (link->slot_num != -1) {
if (slot_num != -1) {
dbarray = server.db;
functions_lib_ctx = functionsLibCtxGetCurrent();
asyncLoading = 1;
} else if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Async loading means we continue serving read commands during full resync, and
* "swap" the new db with the old db only when loading is done.
Expand Down Expand Up @@ -2266,6 +2268,10 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(link->transfer_size, RDBFLAGS_REPLICATION, asyncLoading);

/* Before loading, ensure that the link won't be freed, even if
* REPLICAOF NO ONE is called in background event processing. */
link->ref_count++;

int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtx(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
Expand All @@ -2281,14 +2287,22 @@ void readSyncBulkPayload(connection *conn) {
}
}

/* After loading, decrement the ref count */
if (freeReplicationLink(link)) {
/* Link was freed during RDB load */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Link to primary closed during diskless RDB load");
loadingFailed = 1;
link = NULL;
}

if (loadingFailed) {
stopLoading(0);
cancelReplicationHandshake(link, 1);
if (link) cancelReplicationHandshake(link, 1);
rioFreeConn(&rdb, NULL);

if (link->slot_num != -1) {
if (slot_num != -1) {
/* Just drop the dictionary for the provided slot. */
dropKeysInSlot(link->slot_num, server.repl_replica_lazy_flush);
dropKeysInSlot(slot_num, server.repl_replica_lazy_flush);
// TODO(murphyjacob4) figure out how to inform modules of this
} else if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Discard potentially partially loaded tempDb. */
Expand Down Expand Up @@ -2388,10 +2402,28 @@ void readSyncBulkPayload(connection *conn) {
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");

/* Before loading, ensure that the link won't be freed, even if
* REPLICAOF NO ONE is called in background event processing. */
link->ref_count++;

int loading_failed = 0;
if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
"DB from disk, check server logs.");
cancelReplicationHandshake(link, 1);
loading_failed = 1;
}

/* After loading, decrement the ref count */
if (freeReplicationLink(link)) {
/* Link was freed during RDB load */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Link to primary closed during diskless RDB load");
loading_failed = 1;
link = NULL;
}

if (loading_failed) {
if (link) cancelReplicationHandshake(link, 1);
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
serverLog(LL_NOTICE, "Removing the RDB file obtained from "
"the primary. This replica has persistence "
Expand Down Expand Up @@ -2424,7 +2456,10 @@ void readSyncBulkPayload(connection *conn) {

/* Final setup of the connected replica <- primary link */
if (conn == link->rdb_transfer_s) {
dualChannelSyncHandleRdbLoadCompletion(link);
if (dualChannelSyncHandleRdbLoadCompletion(link) == C_ERR) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Failed to finalize dual channel load");
return;
}
} else {
replicationCreateSourceClient(link, rsi.repl_stream_db);
link->state = REPL_STATE_CONNECTED;
Expand Down Expand Up @@ -2933,14 +2968,19 @@ void bufferReplData(connection *conn) {
}

/* Replication: Replica side.
* Streams accumulated replication data into the database while freeing read nodes */
* Streams accumulated replication data into the database while freeing read nodes.
* An error return indicates that link has been freed and should not be used. */
int streamReplDataBufToDb(replicationLink *link) {
serverAssert(link->client->flag.primary);
blockingOperationStarts();
size_t used, offset = 0;
listNode *cur = NULL;
time_t last_progress_callback = mstime();
while (link->pending_repl_data.blocks && (cur = listFirst(link->pending_repl_data.blocks))) {

/* Before loading, increment the link reference count to ensure it is not freed. */
link->ref_count++;

while (link->ref_count > 1 && link->pending_repl_data.blocks && (cur = listFirst(link->pending_repl_data.blocks))) {
/* Read and process repl data block */
replDataBufBlock *o = listNodeValue(cur);
used = o->used;
Expand All @@ -2953,31 +2993,28 @@ int streamReplDataBufToDb(replicationLink *link) {
replStreamProgressCallback(link, offset, used, &last_progress_callback);
}
blockingOperationEnds();
if (!link->pending_repl_data.blocks) {
if (freeReplicationLink(link)) {
/* If we encounter a `replicaof` command during the replStreamProgressCallback,
* pending_repl_data.blocks will be NULL, and we should return an error and
* abort the current sync session. */
* freeReplicationLink will return 1, and we should return an error and abort
* the current sync session. */
serverLog(LL_NOTICE, "During dual channel sync, replication link freed during stream of data buffer to DB.");
return C_ERR;
}
return C_OK;
}

/* Replication: Replica side.
* After done loading the snapshot using the rdb-channel prepare this replica for steady state by
* initializing the primary client, and stream local incremental buffer into memory. */
void dualChannelSyncSuccess(replicationLink *link) {
* initializing the primary client, and stream local incremental buffer into memory.
* An error return indicates that the link is now destroyed and should not be used. */
int dualChannelSyncSuccess(replicationLink *link) {
link->initial_offset = link->provisional_source_state.reploff;
replicationResurrectProvisionalSource(link);
/* Wait for the accumulated buffer to be processed before reading any more replication updates */
if (link->pending_repl_data.blocks && streamReplDataBufToDb(link) == C_ERR) {
/* Sync session aborted during repl data streaming. */
serverLog(LL_WARNING, "Failed to stream local replication buffer into memory");
/* Verify sync is still in progress */
if (link->rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) {
replicationAbortDualChannelSyncTransfer(link);
if (link == server.primary_replication_link) replicationUnsetPrimary();
}
return;
return C_ERR;
}
freePendingReplDataBuf(link);
serverLog(LL_NOTICE, "Successfully streamed replication data into memory");
Expand All @@ -2986,6 +3023,7 @@ void dualChannelSyncSuccess(replicationLink *link) {
replicationSendAck(link); /* Send ACK to notify primary that replica is synced */
link->rdb_client_id = -1;
link->rdb_channel_state = REPL_DUAL_CHANNEL_STATE_NONE;
return C_OK;
}

/* Replication: Replica side.
Expand All @@ -3006,24 +3044,22 @@ int dualChannelSyncHandlePsync(replicationLink *link) {
serverAssert(link->rdb_channel_state == REPL_DUAL_CHANNEL_RDB_LOADED);
/* RDB is loaded */
serverLog(LL_DEBUG, "Dual channel sync - psync established after rdb load");
dualChannelSyncSuccess(link);
return C_OK;
return dualChannelSyncSuccess(link);
}

/* Replication: Replica side.
* RDB channel done loading the RDB. Check whether the main channel has completed its part
* and act accordingly. */
void dualChannelSyncHandleRdbLoadCompletion(replicationLink *link) {
int dualChannelSyncHandleRdbLoadCompletion(replicationLink *link) {
serverAssert(link->rdb_channel_state == REPL_DUAL_CHANNEL_RDB_LOAD);
if (link->state < REPL_STATE_TRANSFER) {
/* Main psync channel hasn't been established yet */
link->rdb_channel_state = REPL_DUAL_CHANNEL_RDB_LOADED;
return;
return C_OK;
}
serverAssert(link->state == REPL_STATE_TRANSFER);
connSetReadHandler(link->transfer_s, NULL);
dualChannelSyncSuccess(link);
return;
return dualChannelSyncSuccess(link);
}

/* Try a partial resynchronization with the primary if we are about to reconnect.
Expand Down Expand Up @@ -3322,7 +3358,8 @@ int dualChannelReplMainConnSendPsync(replicationLink *link, sds *err) {
return C_OK;
}

int dualChannelReplMainConnRecvPsyncReply(replicationLink *link, sds *err) {
int dualChannelReplMainConnRecvPsyncReply(replicationLink *link, sds *err, int *link_closed) {
*link_closed = 0;
int psync_result = replicaTryPartialResynchronization(link, 1);
if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */

Expand All @@ -3333,8 +3370,11 @@ int dualChannelReplMainConnRecvPsyncReply(replicationLink *link, sds *err) {
serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Partial Resynchronization accepted. Ready to "
"accept connections in read-write mode.\n");
}
dualChannelSyncHandlePsync(link);
return C_OK;
int ret = dualChannelSyncHandlePsync(link);
if (ret == C_ERR) {
*link_closed = 1;
}
return ret;
}
*err = getTryPsyncString(psync_result);
return C_ERR;
Expand Down Expand Up @@ -3368,7 +3408,10 @@ void dualChannelSetupMainConnForPsync(connection *conn) {
if (ret == C_OK) link->state = REPL_STATE_RECEIVE_PSYNC_REPLY;
break;
case REPL_STATE_RECEIVE_PSYNC_REPLY:
ret = dualChannelReplMainConnRecvPsyncReply(link, &err);
serverLog(LL_NOTICE, "Handle PSYNC");
int link_closed;
ret = dualChannelReplMainConnRecvPsyncReply(link, &err, &link_closed);
if (link_closed) return;
if (ret == C_OK && link->rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE)
link->state = REPL_STATE_TRANSFER;
/* In case the RDB is already loaded, the repl_state will be set during establishSourceConnection. */
Expand Down Expand Up @@ -3843,6 +3886,7 @@ void syncWithSource(connection *conn) {

replicationLink *createReplicationLink(char *host, int port, int slot_num) {
replicationLink *result = (replicationLink *) zmalloc(sizeof(replicationLink));
result->ref_count = 1;
result->state = REPL_STATE_CONNECT;
result->rdb_channel_state = REPL_DUAL_CHANNEL_STATE_NONE;
result->slot_num = slot_num;
Expand Down Expand Up @@ -3871,8 +3915,13 @@ replicationLink *createReplicationLink(char *host, int port, int slot_num) {
return result;
}

void freeReplicationLink(replicationLink *link) {
if (!link) return;

int freeReplicationLink(replicationLink *link) {
if (!link) return 0;
if (link->ref_count > 1) {
link->ref_count--;
return 0;
}
cancelReplicationHandshake(link, 0);
/* Free primary_host before the call to freeClient since it calls
* replicationHandlePrimaryDisconnection which can trigger a re-connect
Expand Down Expand Up @@ -3902,6 +3951,7 @@ void freeReplicationLink(replicationLink *link) {
}
freePendingReplDataBuf(link);
zfree(link);
return 1;
}

int connectReplicationLink(replicationLink *link) {
Expand Down Expand Up @@ -3988,6 +4038,14 @@ void replicationSetPrimary(char *ip, int port, int full_sync_required) {

if (server.primary_replication_link) {
if (server.primary_replication_link->client) server.primary_replication_link->client->flag.dont_cache_primary = full_sync_required;
if (server.primary_replication_link->client) {
/* freeClient may attempt to cache the primary. However,
* freeReplicationLink may not immediately call freeClient (e.g. if
* there are other references, like in RDB load). We call freeClient
* explicitly to cause this caching to happen now, instead of later. */
freeClient(server.primary_replication_link->client);
server.primary_replication_link->client = NULL;
}
freeReplicationLink(server.primary_replication_link);
server.primary_replication_link = NULL;
}
Expand Down Expand Up @@ -4034,8 +4092,16 @@ void replicationUnsetPrimary(void) {
if (server.primary_replication_link->state == REPL_STATE_CONNECTED)
moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_DOWN, NULL);

/* Clear primary_host first, since the freeClient calls
* replicationHandlePrimaryDisconnection which can attempt to re-connect. */
if (server.primary_replication_link->client) {
/* freeClient may attempt to cache the primary. However,
* freeReplicationLink may not immediately call freeClient (e.g. if
* there are other references, like in RDB load). We call freeClient
* explicitly to prevent this later freeClient call from happening and
* setting the cached primary unexpectedly at some point in the
* future. */
freeClient(server.primary_replication_link->client);
server.primary_replication_link->client = NULL;
}
freeReplicationLink(server.primary_replication_link);
server.primary_replication_link = NULL;

Expand Down Expand Up @@ -4274,7 +4340,7 @@ void replicationCachePrimary(client *c) {
sdsclear(c->querybuf);
c->qb_pos = 0;
c->repl_applied = 0;
c->read_reploff = server.primary_replication_link->client->reploff;
c->read_reploff = c->reploff;
if (c->flag.multi) discardTransaction(c);
listEmpty(c->reply);
c->sentlen = 0;
Expand Down Expand Up @@ -4420,7 +4486,7 @@ void replicationResurrectProvisionalSource(replicationLink *link) {
/* Create a primary client, but do not initialize the read handler yet, as this replica still has a local buffer to
* drain. */
replicationCreateSourceClientWithHandler(link, link->provisional_source_state.dbid, NULL);
memcpy(link->client->replid, link->provisional_source_state.replid, CONFIG_RUN_ID_SIZE);
memcpy(link->client->replid, link->provisional_source_state.replid, CONFIG_RUN_ID_SIZE + 1);
link->client->reploff = link->provisional_source_state.reploff;
link->client->read_reploff = link->provisional_source_state.read_reploff;
server.primary_repl_offset = link->client->reploff;
Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1659,6 +1659,7 @@ typedef enum childInfoType {
#define REPL_PURPOSE_SLOT_IMPORT 1

typedef struct replicationLink {
int ref_count; /* Reference count to this replication link for memory management.*/
int state; /* State of the sync operation overall. */
int rdb_channel_state;
client *client;
Expand Down Expand Up @@ -2897,7 +2898,7 @@ int processIOThreadsReadDone(void);
int processIOThreadsWriteDone(void);
replicationLink *createReplicationLink(char *host, int port, int slot_num);
int connectReplicationLink(replicationLink *link);
void freeReplicationLink(replicationLink *link);
int freeReplicationLink(replicationLink *link);

/* logreqres.c - logging of requests and responses */
void reqresReset(client *c, int free_buf);
Expand Down

0 comments on commit 00e2a90

Please sign in to comment.