diff --git a/src/io_threads.c b/src/io_threads.c
index 46fa3d48c1..21e39ccbc0 100644
--- a/src/io_threads.c
+++ b/src/io_threads.c
@@ -321,7 +321,7 @@ int trySendReadToIOThreads(client *c) {
     if (server.active_io_threads_num <= 1) return C_ERR;
     /* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
    if (c->io_read_state != CLIENT_IDLE) return C_OK;
-    /* Currently, replica reads are not offloaded to IO threads. */
+    /* For simplicity, don't offload replica client reads, as replica read traffic is negligible. */
     if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
     /* With Lua debug client we may call connWrite directly in the main thread */
     if (c->flag.lua_debug) return C_ERR;
@@ -364,8 +364,8 @@ int trySendWriteToIOThreads(client *c) {
     if (c->io_write_state != CLIENT_IDLE) return C_OK;
     /* Nothing to write */
     if (!clientHasPendingReplies(c)) return C_ERR;
-    /* Currently, replica writes are not offloaded to IO threads. */
-    if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
+    /* For simplicity, avoid offloading writes for replicas that are not yet online. */
+    if (getClientType(c) == CLIENT_TYPE_REPLICA && c->repl_state != REPLICA_STATE_ONLINE) return C_ERR;
     /* We can't offload debugged clients as the main-thread may read at the same time */
     if (c->flag.lua_debug) return C_ERR;
@@ -392,21 +392,29 @@ int trySendWriteToIOThreads(client *c) {
     serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
     listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);
 
-    /* Save the last block of the reply list to io_last_reply_block and the used
-     * position to io_last_bufpos. The I/O thread will write only up to
-     * io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
-     * threads from reading data that might be invalid in their local CPU cache. */
-    c->io_last_reply_block = listLast(c->reply);
-    if (c->io_last_reply_block) {
-        c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
+    int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
+    if (is_replica) {
+        c->io_last_reply_block = listLast(server.repl_buffer_blocks);
+        replBufBlock *o = listNodeValue(c->io_last_reply_block);
+        c->io_last_bufpos = o->used;
     } else {
-        c->io_last_bufpos = (size_t)c->bufpos;
+        /* Save the last block of the reply list to io_last_reply_block and the used
+         * position to io_last_bufpos. The I/O thread will write only up to
+         * io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
+         * threads from reading data that might be invalid in their local CPU cache. */
+        c->io_last_reply_block = listLast(c->reply);
+        if (c->io_last_reply_block) {
+            c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
+        } else {
+            c->io_last_bufpos = (size_t)c->bufpos;
+        }
     }
-    serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);
+
+    serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0 || is_replica);
 
     /* The main-thread will update the client state after the I/O thread completes the write. */
     connSetPostponeUpdateState(c->conn, 1);
-    c->write_flags = 0;
+    c->write_flags = is_replica ? WRITE_FLAGS_IS_REPLICA : 0;
     c->io_write_state = CLIENT_PENDING_IO;
 
     IOJobQueue_push(jq, ioThreadWriteToClient, c);
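
Editor's note: the hunk above pins the tail of the shared replication buffer (io_last_reply_block / io_last_bufpos) before queueing the job, so the IO thread never reads bytes the main thread appends afterwards. A minimal sketch of that snapshot idea, with hypothetical simplified types (not the server's real structs):

#include <stddef.h>

typedef struct block {
    struct block *next;
    size_t used; /* grows on the main thread only */
    char buf[];
} block;

typedef struct {
    block *last_block;  /* analogous to io_last_reply_block */
    size_t last_bufpos; /* analogous to io_last_bufpos */
} io_snapshot;

static void snapshot_tail(io_snapshot *s, block *tail) {
    s->last_block = tail;
    s->last_bufpos = tail->used; /* the IO thread writes only up to this bound */
}
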
diff --git a/src/networking.c b/src/networking.c
index 3cd206b175..46be496542 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -2007,36 +2007,121 @@ client *lookupClientByID(uint64_t id) {
     return c;
 }
 
-void writeToReplica(client *c) {
-    /* Can be called from main-thread only as replica write offload is not supported yet */
-    serverAssert(inMainThread());
-    int nwritten = 0;
+static void postWriteToReplica(client *c) {
+    if (c->nwritten <= 0) return;
+
+    server.stat_net_repl_output_bytes += c->nwritten;
+
+    /* Locate the last node which has leftover data and
+     * decrement the reference counts of all nodes in front of it.
+     * Set c->ref_repl_buf_node to point to the last node and
+     * c->ref_block_pos to the offset within that node. */
+    listNode *curr = c->ref_repl_buf_node;
+    listNode *next = NULL;
+    size_t nwritten = c->nwritten + c->ref_block_pos;
+    replBufBlock *o = listNodeValue(curr);
+
+    while (nwritten >= o->used) {
+        next = listNextNode(curr);
+        if (!next) break; /* End of list */
+
+        nwritten -= o->used;
+        o->refcount--;
+
+        curr = next;
+        o = listNodeValue(curr);
+        o->refcount++;
+    }
+
+    serverAssert(nwritten <= o->used);
+    c->ref_repl_buf_node = curr;
+    c->ref_block_pos = nwritten;
+
+    incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+}
+
+static void writeToReplica(client *c) {
+    listNode *last_node;
+    size_t bufpos;
     serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
 
-    while (clientHasPendingReplies(c)) {
-        replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
-        serverAssert(o->used >= c->ref_block_pos);
-
-        /* Send current block if it is not fully sent. */
-        if (o->used > c->ref_block_pos) {
-            nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos);
-            if (nwritten <= 0) {
-                c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
-                return;
-            }
-            c->nwritten += nwritten;
-            c->ref_block_pos += nwritten;
+    /* Determine the last block and buffer position based on the thread context. */
+    if (inMainThread()) {
+        last_node = listLast(server.repl_buffer_blocks);
+        if (!last_node) return;
+        bufpos = ((replBufBlock *)listNodeValue(last_node))->used;
+    } else {
+        last_node = c->io_last_reply_block;
+        serverAssert(last_node != NULL);
+        bufpos = c->io_last_bufpos;
+    }
+
+    listNode *first_node = c->ref_repl_buf_node;
+
+    /* Handle the single-block case. */
+    if (first_node == last_node) {
+        replBufBlock *b = listNodeValue(first_node);
+        c->nwritten = connWrite(c->conn, b->buf + c->ref_block_pos, bufpos - c->ref_block_pos);
+        if (c->nwritten <= 0) {
+            c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
         }
+        return;
+    }
+
+    /* Handle the multiple-block case. */
+    ssize_t total_bytes = 0;
+    int iovcnt = 0;
+    struct iovec iov_arr[IOV_MAX];
+    struct iovec *iov = iov_arr;
+    int iovmax = min(IOV_MAX, c->conn->iovcnt);
+
+    for (listNode *cur_node = first_node; cur_node != NULL && iovcnt < iovmax; cur_node = listNextNode(cur_node)) {
+        replBufBlock *cur_block = listNodeValue(cur_node);
+        size_t start = (cur_node == first_node) ? c->ref_block_pos : 0;
+        size_t len = (cur_node == last_node) ? bufpos : cur_block->used;
+        len -= start;
+
+        iov[iovcnt].iov_base = cur_block->buf + start;
+        iov[iovcnt].iov_len = len;
+        total_bytes += len;
+        iovcnt++;
+    }
+
+    if (total_bytes == 0) return;
 
-        /* If we fully sent the object on head, go to the next one. */
-        listNode *next = listNextNode(c->ref_repl_buf_node);
-        if (next && c->ref_block_pos == o->used) {
-            o->refcount--;
-            ((replBufBlock *)(listNodeValue(next)))->refcount++;
-            c->ref_repl_buf_node = next;
-            c->ref_block_pos = 0;
-            incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+    ssize_t totwritten = 0;
+    while (iovcnt > 0) {
+        int nwritten = connWritev(c->conn, iov, iovcnt);
+
+        if (nwritten <= 0) {
+            c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
+            c->nwritten = (totwritten > 0) ? totwritten : nwritten;
+            return;
+        }
+
+        totwritten += nwritten;
+
+        if (totwritten == total_bytes) {
+            break;
+        }
+
+        /* Advance the iov array past the bytes already written. */
+        while (nwritten > 0) {
+            if ((size_t)nwritten < iov[0].iov_len) {
+                /* Partial block written. */
+                iov[0].iov_base = (char *)iov[0].iov_base + nwritten;
+                iov[0].iov_len -= nwritten;
+                break;
+            }
+
+            /* Full block written. */
+            nwritten -= iov[0].iov_len;
+            iov++;
+            iovcnt--;
         }
     }
+
+    c->nwritten = totwritten;
 }
 
 /* This function should be called from _writeToClient when the reply list is not empty,
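
Editor's note: the subtle part of the hunk above is resuming after a short connWritev(). The same advance logic, distilled into a standalone helper (hypothetical name, not part of the patch):

#include <stddef.h>
#include <sys/uio.h>

/* Advance the iovec cursor past `n` bytes that a short writev() reported. */
static void iov_advance(struct iovec **iov, int *iovcnt, size_t n) {
    while (n > 0 && *iovcnt > 0) {
        if (n < (*iov)[0].iov_len) {
            /* Partial block: bump the base pointer, shrink the length. */
            (*iov)[0].iov_base = (char *)(*iov)[0].iov_base + n;
            (*iov)[0].iov_len -= n;
            return;
        }
        /* Full block consumed: drop it from the front of the array. */
        n -= (*iov)[0].iov_len;
        (*iov)++;
        (*iovcnt)--;
    }
}
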
@@ -2230,6 +2315,8 @@ int postWriteToClient(client *c) {
     server.stat_total_writes_processed++;
     if (getClientType(c) != CLIENT_TYPE_REPLICA) {
         _postWriteToClient(c);
+    } else {
+        postWriteToReplica(c);
     }
 
     if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) {
@@ -2788,7 +2875,7 @@ void processMultibulkBuffer(client *c) {
         serverAssertWithInfo(c, NULL, c->argc == 0);
 
         /* Multi bulk length cannot be read without a \r\n */
-        newline = strchr(c->querybuf + c->qb_pos, '\r');
+        newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
         if (newline == NULL) {
             if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                 c->read_flags |= READ_FLAGS_ERROR_BIG_MULTIBULK;
@@ -2865,7 +2952,7 @@ void processMultibulkBuffer(client *c) {
     while (c->multibulklen) {
         /* Read bulk length if unknown */
         if (c->bulklen == -1) {
-            newline = strchr(c->querybuf + c->qb_pos, '\r');
+            newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
            if (newline == NULL) {
                 if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                     c->read_flags |= READ_FLAGS_ERROR_BIG_BULK_COUNT;
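
Editor's note: the strchr -> memchr switch above matters because the query buffer is length-counted and may carry binary payload bytes, including NULs, between qb_pos and the '\r'; strchr() stops at the first NUL, while memchr() scans the full sdslen()-bounded range. A self-contained illustration of that behavioral difference (not from the patch):

#include <assert.h>
#include <string.h>

int main(void) {
    const char buf[] = {'$', '3', '\0', 'x', '\r', '\n'};
    assert(strchr(buf, '\r') == NULL);                 /* blocked by the embedded NUL */
    assert(memchr(buf, '\r', sizeof(buf)) == buf + 4); /* found within the bounded scan */
    return 0;
}
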
@@ -4977,7 +5064,12 @@ void ioThreadWriteToClient(void *data) {
     client *c = data;
     serverAssert(c->io_write_state == CLIENT_PENDING_IO);
     c->nwritten = 0;
-    _writeToClient(c);
+    if (c->write_flags & WRITE_FLAGS_IS_REPLICA) {
+        writeToReplica(c);
+    } else {
+        _writeToClient(c);
+    }
+    atomic_thread_fence(memory_order_release);
     c->io_write_state = CLIENT_COMPLETED_IO;
 }
 
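
Editor's note: the hunk above publishes the IO thread's results with a release fence before flipping io_write_state; the main thread is assumed to pair this with an acquire read of that state. A condensed sketch of the publication pattern, with simplified hypothetical types:

#include <stdatomic.h>
#include <stdint.h>
#include <sys/types.h>

enum { PENDING, COMPLETED };

typedef struct {
    ssize_t nwritten;      /* plain field written by the IO thread */
    _Atomic uint8_t state; /* flag the main thread polls */
} io_result;

static void publish_result(io_result *r, ssize_t n) {
    r->nwritten = n;                           /* plain store */
    atomic_thread_fence(memory_order_release); /* ordered before the flag flip */
    atomic_store_explicit(&r->state, COMPLETED, memory_order_relaxed);
}

static int try_consume(io_result *r, ssize_t *n_out) {
    if (atomic_load_explicit(&r->state, memory_order_acquire) != COMPLETED) return 0;
    *n_out = r->nwritten; /* safe: the acquire load pairs with the release fence */
    return 1;
}
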
{"test_listpackDeleteWitStartIndexOutOfRange", test_listpackDeleteWitStartIndexOutOfRange}, {"test_listpackDeleteWitNumOverflow", test_listpackDeleteWitNumOverflow}, {"test_listpackBatchDelete", test_listpackBatchDelete}, {"test_listpackDeleteFooWhileIterating", test_listpackDeleteFooWhileIterating}, {"test_listpackReplaceWithSameSize", test_listpackReplaceWithSameSize}, {"test_listpackReplaceWithDifferentSize", test_listpackReplaceWithDifferentSize}, {"test_listpackRegressionGt255Bytes", test_listpackRegressionGt255Bytes}, {"test_listpackCreateLongListAndCheckIndices", test_listpackCreateLongListAndCheckIndices}, {"test_listpackCompareStrsWithLpEntries", test_listpackCompareStrsWithLpEntries}, {"test_listpackLpMergeEmptyLps", test_listpackLpMergeEmptyLps}, {"test_listpackLpMergeLp1Larger", test_listpackLpMergeLp1Larger}, {"test_listpackLpMergeLp2Larger", test_listpackLpMergeLp2Larger}, {"test_listpackLpNextRandom", test_listpackLpNextRandom}, {"test_listpackLpNextRandomCC", test_listpackLpNextRandomCC}, {"test_listpackRandomPairWithOneElement", test_listpackRandomPairWithOneElement}, {"test_listpackRandomPairWithManyElements", test_listpackRandomPairWithManyElements}, {"test_listpackRandomPairsWithOneElement", test_listpackRandomPairsWithOneElement}, {"test_listpackRandomPairsWithManyElements", test_listpackRandomPairsWithManyElements}, {"test_listpackRandomPairsUniqueWithOneElement", test_listpackRandomPairsUniqueWithOneElement}, {"test_listpackRandomPairsUniqueWithManyElements", test_listpackRandomPairsUniqueWithManyElements}, {"test_listpackPushVariousEncodings", test_listpackPushVariousEncodings}, {"test_listpackLpFind", test_listpackLpFind}, {"test_listpackLpValidateIntegrity", test_listpackLpValidateIntegrity}, {"test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN", test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN}, {"test_listpackStressWithRandom", test_listpackStressWithRandom}, {"test_listpackSTressWithVariableSize", test_listpackSTressWithVariableSize}, {"test_listpackBenchmarkInit", test_listpackBenchmarkInit}, {"test_listpackBenchmarkLpAppend", test_listpackBenchmarkLpAppend}, {"test_listpackBenchmarkLpFindString", test_listpackBenchmarkLpFindString}, {"test_listpackBenchmarkLpFindNumber", test_listpackBenchmarkLpFindNumber}, {"test_listpackBenchmarkLpSeek", test_listpackBenchmarkLpSeek}, {"test_listpackBenchmarkLpValidateIntegrity", test_listpackBenchmarkLpValidateIntegrity}, {"test_listpackBenchmarkLpCompareWithString", test_listpackBenchmarkLpCompareWithString}, {"test_listpackBenchmarkLpCompareWithNumber", test_listpackBenchmarkLpCompareWithNumber}, {"test_listpackBenchmarkFree", test_listpackBenchmarkFree}, {NULL, NULL}}; -unitTest __test_networking_c[] = {{"test_backupAndUpdateClientArgv", test_backupAndUpdateClientArgv}, {"test_rewriteClientCommandArgument", test_rewriteClientCommandArgument}, {NULL, NULL}}; +unitTest __test_networking_c[] = {{"test_writeToReplica", test_writeToReplica}, {"test_postWriteToReplica", test_postWriteToReplica}, {"test_backupAndUpdateClientArgv", test_backupAndUpdateClientArgv}, {"test_rewriteClientCommandArgument", test_rewriteClientCommandArgument}, {NULL, NULL}}; unitTest __test_object_c[] = {{"test_object_with_key", test_object_with_key}, {NULL, NULL}}; unitTest __test_quicklist_c[] = {{"test_quicklistCreateList", test_quicklistCreateList}, {"test_quicklistAddToTailOfEmptyList", test_quicklistAddToTailOfEmptyList}, {"test_quicklistAddToHeadOfEmptyList", test_quicklistAddToHeadOfEmptyList}, 
{"test_quicklistAddToTail5xAtCompress", test_quicklistAddToTail5xAtCompress}, {"test_quicklistAddToHead5xAtCompress", test_quicklistAddToHead5xAtCompress}, {"test_quicklistAddToTail500xAtCompress", test_quicklistAddToTail500xAtCompress}, {"test_quicklistAddToHead500xAtCompress", test_quicklistAddToHead500xAtCompress}, {"test_quicklistRotateEmpty", test_quicklistRotateEmpty}, {"test_quicklistComprassionPlainNode", test_quicklistComprassionPlainNode}, {"test_quicklistNextPlainNode", test_quicklistNextPlainNode}, {"test_quicklistRotatePlainNode", test_quicklistRotatePlainNode}, {"test_quicklistRotateOneValOnce", test_quicklistRotateOneValOnce}, {"test_quicklistRotate500Val5000TimesAtCompress", test_quicklistRotate500Val5000TimesAtCompress}, {"test_quicklistPopEmpty", test_quicklistPopEmpty}, {"test_quicklistPop1StringFrom1", test_quicklistPop1StringFrom1}, {"test_quicklistPopHead1NumberFrom1", test_quicklistPopHead1NumberFrom1}, {"test_quicklistPopHead500From500", test_quicklistPopHead500From500}, {"test_quicklistPopHead5000From500", test_quicklistPopHead5000From500}, {"test_quicklistIterateForwardOver500List", test_quicklistIterateForwardOver500List}, {"test_quicklistIterateReverseOver500List", test_quicklistIterateReverseOver500List}, {"test_quicklistInsertAfter1Element", test_quicklistInsertAfter1Element}, {"test_quicklistInsertBefore1Element", test_quicklistInsertBefore1Element}, {"test_quicklistInsertHeadWhileHeadNodeIsFull", test_quicklistInsertHeadWhileHeadNodeIsFull}, {"test_quicklistInsertTailWhileTailNodeIsFull", test_quicklistInsertTailWhileTailNodeIsFull}, {"test_quicklistInsertOnceInElementsWhileIteratingAtCompress", test_quicklistInsertOnceInElementsWhileIteratingAtCompress}, {"test_quicklistInsertBefore250NewInMiddleOf500ElementsAtCompress", test_quicklistInsertBefore250NewInMiddleOf500ElementsAtCompress}, {"test_quicklistInsertAfter250NewInMiddleOf500ElementsAtCompress", test_quicklistInsertAfter250NewInMiddleOf500ElementsAtCompress}, {"test_quicklistDuplicateEmptyList", test_quicklistDuplicateEmptyList}, {"test_quicklistDuplicateListOf1Element", test_quicklistDuplicateListOf1Element}, {"test_quicklistDuplicateListOf500", test_quicklistDuplicateListOf500}, {"test_quicklistIndex1200From500ListAtFill", test_quicklistIndex1200From500ListAtFill}, {"test_quicklistIndex12From500ListAtFill", test_quicklistIndex12From500ListAtFill}, {"test_quicklistIndex100From500ListAtFill", test_quicklistIndex100From500ListAtFill}, {"test_quicklistIndexTooBig1From50ListAtFill", test_quicklistIndexTooBig1From50ListAtFill}, {"test_quicklistDeleteRangeEmptyList", test_quicklistDeleteRangeEmptyList}, {"test_quicklistDeleteRangeOfEntireNodeInListOfOneNode", test_quicklistDeleteRangeOfEntireNodeInListOfOneNode}, {"test_quicklistDeleteRangeOfEntireNodeWithOverflowCounts", test_quicklistDeleteRangeOfEntireNodeWithOverflowCounts}, {"test_quicklistDeleteMiddle100Of500List", test_quicklistDeleteMiddle100Of500List}, {"test_quicklistDeleteLessThanFillButAcrossNodes", test_quicklistDeleteLessThanFillButAcrossNodes}, {"test_quicklistDeleteNegative1From500List", test_quicklistDeleteNegative1From500List}, {"test_quicklistDeleteNegative1From500ListWithOverflowCounts", test_quicklistDeleteNegative1From500ListWithOverflowCounts}, {"test_quicklistDeleteNegative100From500List", test_quicklistDeleteNegative100From500List}, {"test_quicklistDelete10Count5From50List", test_quicklistDelete10Count5From50List}, {"test_quicklistNumbersOnlyListRead", test_quicklistNumbersOnlyListRead}, {"test_quicklistNumbersLargerListRead", 
diff --git a/src/unit/test_networking.c b/src/unit/test_networking.c
index 566583bcc5..d1be23160e 100644
--- a/src/unit/test_networking.c
+++ b/src/unit/test_networking.c
@@ -4,6 +4,305 @@
 
 #include
 
+/* Mock structures and functions */
+typedef struct mockConnection {
+    connection conn;
+    int error;
+    char *buffer;
+    size_t buf_size;
+    size_t written;
+} mockConnection;
+
+/* Mock connWrite function */
+static int mock_connWrite(connection *conn, const void *data, size_t size) {
+    mockConnection *mock = (mockConnection *)conn;
+    if (mock->error) return -1;
+
+    size_t to_write = size;
+    if (mock->written + to_write > mock->buf_size) {
+        to_write = mock->buf_size - mock->written;
+    }
+
+    memcpy(mock->buffer + mock->written, data, to_write);
+    mock->written += to_write;
+    return to_write;
+}
+
+/* Mock connWritev function */
+static int mock_connWritev(connection *conn, const struct iovec *iov, int iovcnt) {
+    mockConnection *mock = (mockConnection *)conn;
+    if (mock->error) return -1;
+
+    size_t total = 0;
+    for (int i = 0; i < iovcnt; i++) {
+        size_t to_write = iov[i].iov_len;
+        if (mock->written + to_write > mock->buf_size) {
+            to_write = mock->buf_size - mock->written;
+        }
+        if (to_write == 0) break;
+
+        memcpy(mock->buffer + mock->written, iov[i].iov_base, to_write);
+        mock->written += to_write;
+        total += to_write;
+    }
+    return total;
+}
+
+/* Mock connection type */
+static ConnectionType CT_Mock = {
+    .write = mock_connWrite,
+    .writev = mock_connWritev,
+};
+
+static mockConnection *connCreateMock(void) {
+    mockConnection *conn = zcalloc(sizeof(mockConnection));
+    conn->conn.type = &CT_Mock;
+    conn->conn.fd = -1;
+    conn->conn.iovcnt = IOV_MAX;
+    return conn;
+}
+
+int test_writeToReplica(int argc, char **argv, int flags) {
+    UNUSED(argc);
+    UNUSED(argv);
+    UNUSED(flags);
+
+    client *c = zcalloc(sizeof(client));
+    server.repl_buffer_blocks = listCreate();
+    c->reply = listCreate();
+
+    /* Test 1: Single block write */
+    {
+        mockConnection *mock_conn = connCreateMock();
+        mock_conn->buffer = zmalloc(1024);
+        mock_conn->buf_size = 1024;
+        c->conn = (connection *)mock_conn;
+
+        /* Create replication buffer block */
+        replBufBlock *block = zmalloc(sizeof(replBufBlock) + 128);
+        block->size = 128;
+        block->used = 64;
+        memset(block->buf, 'A', 64);
+
+        /* Setup client state */
+        listAddNodeTail(server.repl_buffer_blocks, block);
+        c->ref_repl_buf_node = listFirst(server.repl_buffer_blocks);
+        c->ref_block_pos = 0;
+        c->bufpos = 0;
+
+        writeToReplica(c);
+
+        TEST_ASSERT(c->nwritten == 64);
+        TEST_ASSERT(mock_conn->written == 64);
+        TEST_ASSERT(memcmp(mock_conn->buffer, block->buf, 64) == 0);
+        TEST_ASSERT((c->write_flags & WRITE_FLAGS_WRITE_ERROR) == 0);
+
+        /* Cleanup */
+        zfree(mock_conn->buffer);
+        zfree(mock_conn);
+        zfree(block);
+        listEmpty(server.repl_buffer_blocks);
+    }
+
+    /* Test 2: Multiple blocks write */
+    {
+        mockConnection *mock_conn = connCreateMock();
+        mock_conn->error = 0;
+        mock_conn->written = 0;
+        mock_conn->buffer = zmalloc(1024);
+        mock_conn->buf_size = 1024;
+        c->conn = (connection *)mock_conn;
+
+        /* Create multiple replication buffer blocks */
+        replBufBlock *block1 = zmalloc(sizeof(replBufBlock) + 128);
+        replBufBlock *block2 = zmalloc(sizeof(replBufBlock) + 128);
+        block1->size = 128;
+        block1->used = 64;
+        block2->size = 128;
+        block2->used = 32;
+        memset(block1->buf, 'A', 64);
+        memset(block2->buf, 'B', 32);
+
+        /* Setup client state */
+        listAddNodeTail(server.repl_buffer_blocks, block1);
+        listAddNodeTail(server.repl_buffer_blocks, block2);
+        c->ref_repl_buf_node = listFirst(server.repl_buffer_blocks);
+        c->ref_block_pos = 0;
+        c->bufpos = 0;
+
+        writeToReplica(c);
+
+        TEST_ASSERT(c->nwritten == 96); /* 64 + 32 */
+        TEST_ASSERT(mock_conn->written == 96);
+        TEST_ASSERT(memcmp(mock_conn->buffer, block1->buf, 64) == 0);
+        TEST_ASSERT(memcmp(mock_conn->buffer + 64, block2->buf, 32) == 0);
+        TEST_ASSERT((c->write_flags & WRITE_FLAGS_WRITE_ERROR) == 0);
+
+        /* Cleanup */
+        zfree(mock_conn->buffer);
+        zfree(mock_conn);
+        zfree(block1);
+        zfree(block2);
+        listEmpty(server.repl_buffer_blocks);
+    }
+
+    /* Test 3: Write error */
+    {
+        mockConnection *mock_conn = connCreateMock();
+        mock_conn->error = 1; /* Simulate write error */
+        mock_conn->buffer = zmalloc(1024);
+        mock_conn->buf_size = 1024;
+        mock_conn->written = 0;
+        c->conn = (connection *)mock_conn;
+
+        /* Create replication buffer block */
+        replBufBlock *block = zmalloc(sizeof(replBufBlock) + 128);
+        block->size = 128;
+        block->used = 64;
+        memset(block->buf, 'A', 64);
+
+        /* Setup client state */
+        listAddNodeTail(server.repl_buffer_blocks, block);
+        c->ref_repl_buf_node = listFirst(server.repl_buffer_blocks);
+        c->ref_block_pos = 0;
+        c->bufpos = 0;
+
+        writeToReplica(c);
+
+        TEST_ASSERT(c->nwritten <= 0);
+        TEST_ASSERT((c->write_flags & WRITE_FLAGS_WRITE_ERROR) != 0);
+
+        /* Cleanup */
+        listEmpty(server.repl_buffer_blocks);
+        zfree(mock_conn->buffer);
+        zfree(mock_conn);
+        zfree(block);
+    }
+
+    /* Cleanup */
+    listRelease(server.repl_buffer_blocks);
+    listRelease(c->reply);
+    zfree(c);
+
+    return 0;
+}
+
+int test_postWriteToReplica(int argc, char **argv, int flags) {
+    UNUSED(argc);
+    UNUSED(argv);
+    UNUSED(flags);
+
+    client *c = zcalloc(sizeof(client));
+    server.repl_buffer_blocks = listCreate();
+    c->reply = listCreate();
+    createReplicationBacklog();
+
+    /* Test 1: No write case */
+    {
+        c->nwritten = 0;
+        server.stat_net_repl_output_bytes = 0;
+
+        postWriteToReplica(c);
+
+        TEST_ASSERT(server.stat_net_repl_output_bytes == 0);
+    }
+
+    /* Test 2: Single block partial write */
+    {
+        replBufBlock *block = zmalloc(sizeof(replBufBlock) + 128);
+        block->size = 128;
+        block->used = 100;
+        block->refcount = 1;
+
+        listAddNodeTail(server.repl_buffer_blocks, block);
+        c->ref_repl_buf_node = listFirst(server.repl_buffer_blocks);
+        c->ref_block_pos = 20;
+        c->nwritten = 30;
+
+        server.stat_net_repl_output_bytes = 0;
+
+        postWriteToReplica(c);
+
+        TEST_ASSERT(server.stat_net_repl_output_bytes == 30);
+        TEST_ASSERT(c->ref_block_pos == 50); /* 20 + 30 */
+        TEST_ASSERT(c->ref_repl_buf_node == listFirst(server.repl_buffer_blocks));
+        TEST_ASSERT(block->refcount == 1);
+
+        /* Cleanup */
+        zfree(block);
+        listEmpty(server.repl_buffer_blocks);
+    }
+
+    /* Test 3: Multiple blocks write */
+    {
+        replBufBlock *block1 = zmalloc(sizeof(replBufBlock) + 128);
+        replBufBlock *block2 = zmalloc(sizeof(replBufBlock) + 128);
+        block1->size = 128;
+        block1->used = 64;
+        block1->refcount = 1;
+        block2->size = 128;
+        block2->used = 100;
+        block2->refcount = 0;
+
+        listAddNodeTail(server.repl_buffer_blocks, block1);
+        listAddNodeTail(server.repl_buffer_blocks, block2);
+        c->ref_repl_buf_node = listFirst(server.repl_buffer_blocks);
+        c->ref_block_pos = 30;
+        c->nwritten = 50;
+
+        server.stat_net_repl_output_bytes = 0;
+
+        postWriteToReplica(c);
+
+        TEST_ASSERT(server.stat_net_repl_output_bytes == 50);
+        TEST_ASSERT(c->ref_block_pos == 16); /* (30 + 50) - 64 */
+        TEST_ASSERT(c->ref_repl_buf_node == listLast(server.repl_buffer_blocks));
+        TEST_ASSERT(block1->refcount == 0);
+        TEST_ASSERT(block2->refcount == 1);
+
+        /* Cleanup */
+        zfree(block1);
+        zfree(block2);
+        listEmpty(server.repl_buffer_blocks);
+    }
+
+    /* Test 4: Write exactly to block boundary */
+    {
+        replBufBlock *block = zmalloc(sizeof(replBufBlock) + 128);
+        block->size = 128;
+        block->used = 64;
+        block->refcount = 1;
+
+        /* Setup client state */
+        listAddNodeTail(server.repl_buffer_blocks, block);
+        c->ref_repl_buf_node = listFirst(server.repl_buffer_blocks);
+        c->ref_block_pos = 30;
+        c->nwritten = 34; /* Should reach exactly the end of the block */
+
+        server.stat_net_repl_output_bytes = 0;
+
+        postWriteToReplica(c);
+
+        TEST_ASSERT(server.stat_net_repl_output_bytes == 34);
+        TEST_ASSERT(c->ref_block_pos == 64);
+        TEST_ASSERT(c->ref_repl_buf_node == listFirst(server.repl_buffer_blocks));
+        TEST_ASSERT(block->refcount == 1); /* We don't free the last block even if it's fully written */
+
+        /* Cleanup */
+        zfree(block);
+        listEmpty(server.repl_buffer_blocks);
+    }
+
+    /* Cleanup */
+    raxFree(server.repl_backlog->blocks_index);
+    zfree(server.repl_backlog);
+    listRelease(server.repl_buffer_blocks);
+    listRelease(c->reply);
+    zfree(c);
+
+    return 0;
+}
+
 int test_backupAndUpdateClientArgv(int argc, char **argv, int flags) {
     UNUSED(argc);
     UNUSED(argv);
diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl
index 9eaf467477..f3a021201e 100644
--- a/tests/unit/networking.tcl
+++ b/tests/unit/networking.tcl
@@ -285,6 +285,46 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
         set info [r info stats]
         set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
         assert_equal $prefetch_entries $new_prefetch_entries
+    }
+
+    start_server {} {
+        test {replica writes are offloaded to IO threads} {
+            set master [srv -1 client]
+            set master_host [srv -1 host]
+            set master_port [srv -1 port]
+
+            set replica [srv 0 client]
+            $replica replicaof $master_host $master_port
+
+            wait_for_condition 500 100 {
+                [s 0 master_link_status] eq {up}
+            } else {
+                fail "Replication not started."
+            }
+
+            # Get the current io_threaded_writes_processed
+            set info [$master info stats]
+            set io_threaded_writes_processed [getInfoProperty $info io_threaded_writes_processed]
+
+            # Send a write command to the master
+            $master set a 1
+
+            # Wait for the write to be propagated to the replica
+            wait_for_condition 50 100 {
+                [$replica get a] eq {1}
+            } else {
+                fail "Replication not propagated."
+            }
+
+            # Get the new io_threaded_writes_processed
+            set info [$master info stats]
+            set new_io_threaded_writes_processed [getInfoProperty $info io_threaded_writes_processed]
+            # Assert new is at least old + 3: one write each for the INFO client, the SET client, and the replica.
+            assert {$new_io_threaded_writes_processed >= $io_threaded_writes_processed + 3}
+
+            # Verify the write was propagated to the replica
+            assert_equal {1} [$replica get a]
+        }
     }
 }
 }