Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload replication writes to IO threads #1485

Open
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ int trySendReadToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* For simplicity, don't offload replica clients reads as read traffic is negligible */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* With Lua debug client we may call connWrite directly in the main thread */
if (c->flag.lua_debug) return C_ERR;
/* For simplicity let the main-thread handle the blocked clients */
Expand All @@ -345,6 +345,7 @@ int trySendReadToIOThreads(client *c) {
c->cur_tid = tid;
c->read_flags = canParseCommand(c) ? 0 : READ_FLAGS_DONT_PARSE;
c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0;
c->read_flags |= c->flag.primary ? READ_FLAGS_PRIMARY : 0;

c->io_read_state = CLIENT_PENDING_IO;
connSetPostponeUpdateState(c->conn, 1);
Expand All @@ -363,8 +364,8 @@ int trySendWriteToIOThreads(client *c) {
if (c->io_write_state != CLIENT_IDLE) return C_OK;
/* Nothing to write */
if (!clientHasPendingReplies(c)) return C_ERR;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* For simplicity, avoid offloading non-online replicas */
if (getClientType(c) == CLIENT_TYPE_REPLICA && c->repl_state != REPLICA_STATE_ONLINE) return C_ERR;
/* We can't offload debugged clients as the main-thread may read at the same time */
if (c->flag.lua_debug) return C_ERR;

Expand All @@ -391,21 +392,29 @@ int trySendWriteToIOThreads(client *c) {
serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);

/* Save the last block of the reply list to io_last_reply_block and the used
* position to io_last_bufpos. The I/O thread will write only up to
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
if (is_replica) {
c->io_last_reply_block = listLast(server.repl_buffer_blocks);
replBufBlock *o = listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = o->used;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
/* Save the last block of the reply list to io_last_reply_block and the used
* position to io_last_bufpos. The I/O thread will write only up to
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
}
}
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);

serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0 || is_replica);

/* The main-thread will update the client state after the I/O thread completes the write. */
connSetPostponeUpdateState(c->conn, 1);
c->write_flags = 0;
c->write_flags = is_replica ? WRITE_FLAGS_IS_REPLICA : 0;
c->io_write_state = CLIENT_PENDING_IO;

IOJobQueue_push(jq, ioThreadWriteToClient, c);
Expand Down
164 changes: 135 additions & 29 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2007,36 +2007,121 @@ client *lookupClientByID(uint64_t id) {
return c;
}

void writeToReplica(client *c) {
/* Can be called from main-thread only as replica write offload is not supported yet */
serverAssert(inMainThread());
int nwritten = 0;
static void postWriteToReplica(client *c) {
if (c->nwritten <= 0) return;

server.stat_net_repl_output_bytes += c->nwritten;

/* Locate the last node which has leftover data and
* decrement reference counts of all nodes in front of it.
* Set c->ref_repl_buf_node to point to the last node and
* c->ref_block_pos to the offset within that node */
listNode *curr = c->ref_repl_buf_node;
listNode *next = NULL;
size_t nwritten = c->nwritten + c->ref_block_pos;
replBufBlock *o = listNodeValue(curr);

while (nwritten >= o->used) {
next = listNextNode(curr);
if (!next) break; /* End of list */

nwritten -= o->used;
o->refcount--;

curr = next;
o = listNodeValue(curr);
o->refcount++;
}

serverAssert(nwritten <= o->used);
c->ref_repl_buf_node = curr;
c->ref_block_pos = nwritten;

incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}

static void writeToReplica(client *c) {
listNode *last_node;
size_t bufpos;

serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
while (clientHasPendingReplies(c)) {
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
serverAssert(o->used >= c->ref_block_pos);

/* Send current block if it is not fully sent. */
if (o->used > c->ref_block_pos) {
nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos);
if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
return;
}
c->nwritten += nwritten;
c->ref_block_pos += nwritten;
/* Determine the last block and buffer position based on thread context */
if (inMainThread()) {
last_node = listLast(server.repl_buffer_blocks);
if (!last_node) return;
bufpos = ((replBufBlock *)listNodeValue(last_node))->used;
} else {
last_node = c->io_last_reply_block;
serverAssert(last_node != NULL);
bufpos = c->io_last_bufpos;
}
Comment on lines +2044 to +2057
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider simplifying this code to reduce duplication and improve clarity, eg:

    listNode *last_node = inMainThread() ? listLast(server.repl_buffer_blocks) : c->io_last_reply_block;
    if (!last_node) return;

    size_t bufpos = inMainThread() ? 
        ((replBufBlock *)listNodeValue(last_node))->used :  c->io_last_bufpos;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the current version is clearer since we check inMainThread() only once. Additionally, we handle the !last_node case differently depending on whether we are in the main thread or not.


listNode *first_node = c->ref_repl_buf_node;

/*Handle the single block case */
if (first_node == last_node) {
replBufBlock *b = listNodeValue(first_node);
c->nwritten = connWrite(c->conn, b->buf + c->ref_block_pos, bufpos - c->ref_block_pos);
if (c->nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
}
return;
}

/* Multiple blocks case */
ssize_t total_bytes = 0;
int iovcnt = 0;
struct iovec iov_arr[IOV_MAX];
struct iovec *iov = iov_arr;
int iovmax = min(IOV_MAX, c->conn->iovcnt);

for (listNode *cur_node = first_node; cur_node != NULL && iovcnt < iovmax; cur_node = listNextNode(cur_node)) {
replBufBlock *cur_block = listNodeValue(cur_node);
size_t start = (cur_node == first_node) ? c->ref_block_pos : 0;
size_t len = (cur_node == last_node) ? bufpos : cur_block->used;
len -= start;

iov[iovcnt].iov_base = cur_block->buf + start;
iov[iovcnt].iov_len = len;
total_bytes += len;
iovcnt++;
}

if (total_bytes == 0) return;

ssize_t totwritten = 0;
while (iovcnt > 0) {
int nwritten = connWritev(c->conn, iov, iovcnt);

if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
c->nwritten = (totwritten > 0) ? totwritten : nwritten;
return;
}

totwritten += nwritten;

if (totwritten == total_bytes) {
break;
}

/* If we fully sent the object on head, go to the next one. */
listNode *next = listNextNode(c->ref_repl_buf_node);
if (next && c->ref_block_pos == o->used) {
o->refcount--;
((replBufBlock *)(listNodeValue(next)))->refcount++;
c->ref_repl_buf_node = next;
c->ref_block_pos = 0;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
/* Update iov array */
while (nwritten > 0) {
if ((size_t)nwritten < iov[0].iov_len) {
/* partial block written */
iov[0].iov_base = (char *)iov[0].iov_base + nwritten;
iov[0].iov_len -= nwritten;
break;
}

/* full block written */
nwritten -= iov[0].iov_len;
iov++;
iovcnt--;
}
}

c->nwritten = totwritten;
}

/* This function should be called from _writeToClient when the reply list is not empty,
Expand Down Expand Up @@ -2230,6 +2315,8 @@ int postWriteToClient(client *c) {
server.stat_total_writes_processed++;
if (getClientType(c) != CLIENT_TYPE_REPLICA) {
_postWriteToClient(c);
} else {
postWriteToReplica(c);
}

if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) {
Expand Down Expand Up @@ -2589,6 +2676,16 @@ void resetClient(client *c) {
}
}

void resetClientIOState(client *c) {
c->nwritten = 0;
c->nread = 0;
c->io_read_state = c->io_write_state = CLIENT_IDLE;
c->io_parsed_cmd = NULL;
c->flag.pending_command = 0;
c->io_last_bufpos = 0;
c->io_last_reply_block = NULL;
}

/* Initializes the shared query buffer to a new sds with the default capacity.
* Need to ensure the initlen is not less than readlen in readToQueryBuf. */
void initSharedQueryBuf(void) {
Expand Down Expand Up @@ -2778,7 +2875,7 @@ void processMultibulkBuffer(client *c) {
serverAssertWithInfo(c, NULL, c->argc == 0);

/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf + c->qb_pos, '\r');
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
if (newline == NULL) {
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
c->read_flags |= READ_FLAGS_ERROR_BIG_MULTIBULK;
Expand Down Expand Up @@ -2855,7 +2952,7 @@ void processMultibulkBuffer(client *c) {
while (c->multibulklen) {
/* Read bulk length if unknown */
if (c->bulklen == -1) {
newline = strchr(c->querybuf + c->qb_pos, '\r');
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
if (newline == NULL) {
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
c->read_flags |= READ_FLAGS_ERROR_BIG_BULK_COUNT;
Expand Down Expand Up @@ -4954,7 +5051,11 @@ void ioThreadReadQueryFromClient(void *data) {
}

done:
trimClientQueryBuffer(c);
/* Only trim query buffer for non-primary clients
* Primary client's buffer is handled by main thread using repl_applied position */
if (!(c->read_flags & READ_FLAGS_PRIMARY)) {
trimClientQueryBuffer(c);
}
atomic_thread_fence(memory_order_release);
c->io_read_state = CLIENT_COMPLETED_IO;
}
Expand All @@ -4963,7 +5064,12 @@ void ioThreadWriteToClient(void *data) {
client *c = data;
serverAssert(c->io_write_state == CLIENT_PENDING_IO);
c->nwritten = 0;
_writeToClient(c);
if (c->write_flags & WRITE_FLAGS_IS_REPLICA) {
writeToReplica(c);
} else {
_writeToClient(c);
}

atomic_thread_fence(memory_order_release);
c->io_write_state = CLIENT_COMPLETED_IO;
}
3 changes: 3 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4136,6 +4136,8 @@ void replicationCachePrimary(client *c) {
serverAssert(server.primary != NULL && server.cached_primary == NULL);
serverLog(LL_NOTICE, "Caching the disconnected primary state.");

/* Wait for IO operations to be done before proceeding */
waitForClientIO(c);
/* Unlink the client from the server structures. */
unlinkClient(c);

Expand All @@ -4153,6 +4155,7 @@ void replicationCachePrimary(client *c) {
c->reply_bytes = 0;
c->bufpos = 0;
resetClient(c);
resetClientIOState(c);

/* Save the primary. Server.primary will be set to null later by
* replicationHandlePrimaryDisconnection(). */
Expand Down
5 changes: 3 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ typedef struct client {
volatile uint8_t io_write_state; /* Indicate the IO write state of the client */
uint8_t cur_tid; /* ID of IO thread currently performing IO for this client */
int nread; /* Number of bytes of the last read. */
int nwritten; /* Number of bytes of the last write. */
ssize_t nwritten; /* Number of bytes of the last write. */
int read_flags; /* Client Read flags - used to communicate the client read state. */
uint16_t write_flags; /* Client Write flags - used to communicate the client write state. */
struct serverCommand *cmd, *lastcmd; /* Last command executed. */
Expand Down Expand Up @@ -2799,7 +2799,7 @@ void dictVanillaFree(void *val);

/* Write flags for various write errors and states */
#define WRITE_FLAGS_WRITE_ERROR (1 << 0)

#define WRITE_FLAGS_IS_REPLICA (1 << 1)

client *createClient(connection *conn);
void freeClient(client *c);
Expand All @@ -2808,6 +2808,7 @@ void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...);
void beforeNextClient(client *c);
void clearClientConnectionState(client *c);
void resetClient(client *c);
void resetClientIOState(client *c);
void freeClientOriginalArgv(client *c);
void freeClientArgv(client *c);
void sendReplyToClient(connection *conn);
Expand Down
Loading
Loading