Skip to content

Commit

Permalink
Offload replication writes to IO threads
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Dec 24, 2024
1 parent 39f0a48 commit 846c816
Show file tree
Hide file tree
Showing 6 changed files with 485 additions and 44 deletions.
34 changes: 21 additions & 13 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ int trySendReadToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
/* Currently, replica reads are not offloaded to IO threads. */
/* For simplicity, don't offload replica clients reads as read traffic is negligible */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* With Lua debug client we may call connWrite directly in the main thread */
if (c->flag.lua_debug) return C_ERR;
Expand Down Expand Up @@ -364,8 +364,8 @@ int trySendWriteToIOThreads(client *c) {
if (c->io_write_state != CLIENT_IDLE) return C_OK;
/* Nothing to write */
if (!clientHasPendingReplies(c)) return C_ERR;
/* Currently, replica writes are not offloaded to IO threads. */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* For simplicity, avoid offloading non-online replicas */
if (getClientType(c) == CLIENT_TYPE_REPLICA && c->repl_state != REPLICA_STATE_ONLINE) return C_ERR;
/* We can't offload debugged clients as the main-thread may read at the same time */
if (c->flag.lua_debug) return C_ERR;

Expand All @@ -392,21 +392,29 @@ int trySendWriteToIOThreads(client *c) {
serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);

/* Save the last block of the reply list to io_last_reply_block and the used
* position to io_last_bufpos. The I/O thread will write only up to
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
if (is_replica) {
c->io_last_reply_block = listLast(server.repl_buffer_blocks);
replBufBlock *o = listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = o->used;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
/* Save the last block of the reply list to io_last_reply_block and the used
* position to io_last_bufpos. The I/O thread will write only up to
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
}
}
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);

serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0 || is_replica);

/* The main-thread will update the client state after the I/O thread completes the write. */
connSetPostponeUpdateState(c->conn, 1);
c->write_flags = 0;
c->write_flags = is_replica ? WRITE_FLAGS_IS_REPLICA : 0;
c->io_write_state = CLIENT_PENDING_IO;

IOJobQueue_push(jq, ioThreadWriteToClient, c);
Expand Down
148 changes: 120 additions & 28 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2007,36 +2007,121 @@ client *lookupClientByID(uint64_t id) {
return c;
}

void writeToReplica(client *c) {
/* Can be called from main-thread only as replica write offload is not supported yet */
serverAssert(inMainThread());
int nwritten = 0;
static void postWriteToReplica(client *c) {
if (c->nwritten <= 0) return;

server.stat_net_repl_output_bytes += c->nwritten;

/* Locate the last node which has leftover data and
* decrement reference counts of all nodes in front of it.
* Set c->ref_repl_buf_node to point to the last node and
* c->ref_block_pos to the offset within that node */
listNode *curr = c->ref_repl_buf_node;
listNode *next = NULL;
size_t nwritten = c->nwritten + c->ref_block_pos;
replBufBlock *o = listNodeValue(curr);

while (nwritten >= o->used) {
next = listNextNode(curr);
if (!next) break; /* End of list */

nwritten -= o->used;
o->refcount--;

curr = next;
o = listNodeValue(curr);
o->refcount++;
}

serverAssert(nwritten <= o->used);
c->ref_repl_buf_node = curr;
c->ref_block_pos = nwritten;

incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}

static void writeToReplica(client *c) {
listNode *last_node;
size_t bufpos;

serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
while (clientHasPendingReplies(c)) {
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
serverAssert(o->used >= c->ref_block_pos);

/* Send current block if it is not fully sent. */
if (o->used > c->ref_block_pos) {
nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos);
if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
return;
}
c->nwritten += nwritten;
c->ref_block_pos += nwritten;
/* Determine the last block and buffer position based on thread context */
if (inMainThread()) {
last_node = listLast(server.repl_buffer_blocks);
if (!last_node) return;
bufpos = ((replBufBlock *)listNodeValue(last_node))->used;
} else {
last_node = c->io_last_reply_block;
serverAssert(last_node != NULL);
bufpos = c->io_last_bufpos;
}

listNode *first_node = c->ref_repl_buf_node;

/*Handle the single block case */
if (first_node == last_node) {
replBufBlock *b = listNodeValue(first_node);
c->nwritten = connWrite(c->conn, b->buf + c->ref_block_pos, bufpos - c->ref_block_pos);
if (c->nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
}
return;
}

/* Multiple blocks case */
ssize_t total_bytes = 0;
int iovcnt = 0;
struct iovec iov_arr[IOV_MAX];
struct iovec *iov = iov_arr;
int iovmax = min(IOV_MAX, c->conn->iovcnt);

for (listNode *cur_node = first_node; cur_node != NULL && iovcnt < iovmax; cur_node = listNextNode(cur_node)) {
replBufBlock *cur_block = listNodeValue(cur_node);
size_t start = (cur_node == first_node) ? c->ref_block_pos : 0;
size_t len = (cur_node == last_node) ? bufpos : cur_block->used;
len -= start;

iov[iovcnt].iov_base = cur_block->buf + start;
iov[iovcnt].iov_len = len;
total_bytes += len;
iovcnt++;
}

if (total_bytes == 0) return;

/* If we fully sent the object on head, go to the next one. */
listNode *next = listNextNode(c->ref_repl_buf_node);
if (next && c->ref_block_pos == o->used) {
o->refcount--;
((replBufBlock *)(listNodeValue(next)))->refcount++;
c->ref_repl_buf_node = next;
c->ref_block_pos = 0;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
ssize_t totwritten = 0;
while (iovcnt > 0) {
int nwritten = connWritev(c->conn, iov, iovcnt);

if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
c->nwritten = (totwritten > 0) ? totwritten : nwritten;
return;
}

totwritten += nwritten;

if (totwritten == total_bytes) {
break;
}

/* Update iov array */
while (nwritten > 0) {
if ((size_t)nwritten < iov[0].iov_len) {
/* partial block written */
iov[0].iov_base = (char *)iov[0].iov_base + nwritten;
iov[0].iov_len -= nwritten;
break;
}

/* full block written */
nwritten -= iov[0].iov_len;
iov++;
iovcnt--;
}
}

c->nwritten = totwritten;
}

/* This function should be called from _writeToClient when the reply list is not empty,
Expand Down Expand Up @@ -2230,6 +2315,8 @@ int postWriteToClient(client *c) {
server.stat_total_writes_processed++;
if (getClientType(c) != CLIENT_TYPE_REPLICA) {
_postWriteToClient(c);
} else {
postWriteToReplica(c);
}

if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) {
Expand Down Expand Up @@ -2788,7 +2875,7 @@ void processMultibulkBuffer(client *c) {
serverAssertWithInfo(c, NULL, c->argc == 0);

/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf + c->qb_pos, '\r');
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
if (newline == NULL) {
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
c->read_flags |= READ_FLAGS_ERROR_BIG_MULTIBULK;
Expand Down Expand Up @@ -2865,7 +2952,7 @@ void processMultibulkBuffer(client *c) {
while (c->multibulklen) {
/* Read bulk length if unknown */
if (c->bulklen == -1) {
newline = strchr(c->querybuf + c->qb_pos, '\r');
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
if (newline == NULL) {
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
c->read_flags |= READ_FLAGS_ERROR_BIG_BULK_COUNT;
Expand Down Expand Up @@ -4977,7 +5064,12 @@ void ioThreadWriteToClient(void *data) {
client *c = data;
serverAssert(c->io_write_state == CLIENT_PENDING_IO);
c->nwritten = 0;
_writeToClient(c);
if (c->write_flags & WRITE_FLAGS_IS_REPLICA) {
writeToReplica(c);
} else {
_writeToClient(c);
}

atomic_thread_fence(memory_order_release);
c->io_write_state = CLIENT_COMPLETED_IO;
}
4 changes: 2 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ typedef struct client {
volatile uint8_t io_write_state; /* Indicate the IO write state of the client */
uint8_t cur_tid; /* ID of IO thread currently performing IO for this client */
int nread; /* Number of bytes of the last read. */
int nwritten; /* Number of bytes of the last write. */
ssize_t nwritten; /* Number of bytes of the last write. */
int read_flags; /* Client Read flags - used to communicate the client read state. */
uint16_t write_flags; /* Client Write flags - used to communicate the client write state. */
struct serverCommand *cmd, *lastcmd; /* Last command executed. */
Expand Down Expand Up @@ -2799,7 +2799,7 @@ void dictVanillaFree(void *val);

/* Write flags for various write errors and states */
#define WRITE_FLAGS_WRITE_ERROR (1 << 0)

#define WRITE_FLAGS_IS_REPLICA (1 << 1)

client *createClient(connection *conn);
void freeClient(client *c);
Expand Down
Loading

0 comments on commit 846c816

Please sign in to comment.