Skip to content

Commit

Permalink
Initial draft of zerocopy for replication streams
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Murphy <[email protected]>
  • Loading branch information
murphyjacob4 committed Nov 26, 2024
1 parent 33f42d7 commit 855a059
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o zerocopy.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
8 changes: 8 additions & 0 deletions src/anet.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,14 @@ int anetRecvTimeout(char *err, int fd, long long ms) {
return ANET_OK;
}

int anetSetZeroCopy(char *err, int fd, int setting) {
if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &setting, sizeof(setting)) < 0) {
anetSetError(err, "setsockopt SO_ZEROCOPY: %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}

/* Resolve the hostname "host" and set the string representation of the
* IP address into the buffer pointed by "ipbuf".
*
Expand Down
1 change: 1 addition & 0 deletions src/anet.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ int anetEnableTcpNoDelay(char *err, int fd);
int anetDisableTcpNoDelay(char *err, int fd);
int anetSendTimeout(char *err, int fd, long long ms);
int anetRecvTimeout(char *err, int fd, long long ms);
int anetSetZeroCopy(char *err, int fd, int setting);
int anetFdToString(int fd, char *ip, size_t ip_len, int *port, int remote);
int anetKeepAlive(char *err, int fd, int interval);
int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port);
Expand Down
18 changes: 18 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>
#include <sys/socket.h>

#include "ae.h"

Expand Down Expand Up @@ -101,10 +102,13 @@ typedef struct ConnectionType {
int (*read)(struct connection *conn, void *buf, size_t buf_len);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
int (*set_error_queue_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*send)(struct connection *conn, const void *data, size_t data_len, int flags);
ssize_t (*recvmsg)(struct connection *conn, struct msghdr *msg, int flags);

/* pending data */
int (*has_pending_data)(void);
Expand Down Expand Up @@ -132,6 +136,7 @@ struct connection {
ConnectionCallbackFunc conn_handler;
ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler;
ConnectionCallbackFunc err_queue_handler;
};

#define CONFIG_BINDADDR_MAX 16
Expand Down Expand Up @@ -255,6 +260,10 @@ static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCal
return conn->type->set_write_handler(conn, func, barrier);
}

static inline int connSetErrorQueueHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_error_queue_handler(conn, func);
}

static inline void connShutdown(connection *conn) {
conn->type->shutdown(conn);
}
Expand Down Expand Up @@ -282,6 +291,14 @@ static inline ssize_t connSyncReadLine(connection *conn, char *ptr, ssize_t size
return conn->type->sync_readline(conn, ptr, size, timeout);
}

static inline ssize_t connSend(connection *conn, const void *data, size_t data_len, int flags) {
return conn->type->send(conn, data, data_len, flags);
}

static inline ssize_t connRecvMsg(connection *conn, struct msghdr *msg, int flags) {
return conn->type->recvmsg(conn, msg, flags);
}

/* Return CONN_TYPE_* for the specified connection */
static inline const char *connGetType(connection *conn) {
return conn->type->get_type(conn);
Expand Down Expand Up @@ -379,6 +396,7 @@ int connDisableTcpNoDelay(connection *conn);
int connKeepAlive(connection *conn, int interval);
int connSendTimeout(connection *conn, long long ms);
int connRecvTimeout(connection *conn, long long ms);
int connSetZeroCopy(connection *conn, int setting);

/* Get cert for the secure connection */
static inline sds connGetPeerCert(connection *conn) {
Expand Down
85 changes: 81 additions & 4 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
#include "fpconv_dtoa.h"
#include "fmtargs.h"
#include "io_threads.h"
#include "zerocopy.h"
#include <strings.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <math.h>
#include <ctype.h>
#include <stdatomic.h>
#include <linux/errqueue.h>

static void setProtocolError(const char *errstr, client *c);
static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
Expand Down Expand Up @@ -234,6 +236,7 @@ client *createClient(connection *conn) {
c->commands_processed = 0;
c->io_last_reply_block = NULL;
c->io_last_bufpos = 0;
c->zero_copy_buffer = createZeroCopyRecordBuffer();
return c;
}

Expand Down Expand Up @@ -1828,6 +1831,7 @@ void freeClient(client *c) {
sdsfree(c->peerid);
sdsfree(c->sockname);
sdsfree(c->replica_addr);
freeZeroCopyRecordBuffer(c->zero_copy_buffer);
zfree(c);
}

Expand Down Expand Up @@ -1994,34 +1998,107 @@ client *lookupClientByID(uint64_t id) {
return c;
}

void handleZeroCopyMessage(connection *conn) {
struct msghdr msg;
struct iovec iov;
char control[1024*10];
struct cmsghdr *cmsg;
struct sock_extended_err *serr;

iov.iov_base = NULL;
iov.iov_len = 0;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = control;
msg.msg_controllen = sizeof(control);

if (connRecvMsg(conn, &msg, MSG_ERRQUEUE) == -1) {
if (errno == EAGAIN) {
/* This callback fires for all readable events, so sometimes it is a no-op. */
return;
}
serverLog(LL_WARNING, "Got callback for error message but got recvmsg error: %s", strerror(errno));
return;
}
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_IP || cmsg->cmsg_type != IP_RECVERR) {
continue;
}
serr = (struct sock_extended_err *) CMSG_DATA(cmsg);
if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
continue;
}
client * c = (client *) connGetPrivateData(conn);

/* Mark the received messages as finished. */
const uint32_t begin = serr->ee_info;
const uint32_t end = serr->ee_data;
for (size_t i = begin; i <= end; i++) {
zeroCopyRecord *zcp = zeroCopyRecordBufferGet(c->zero_copy_buffer, i);
serverAssert(zcp != NULL);
zcp->active = 0;
}

/* Trim the front of the buffer up until the next outstanding write. */
zeroCopyRecord *head = zeroCopyRecordBufferFront(c->zero_copy_buffer);
while (head != NULL && head->active == 0) {
if (head->last_write_for_block) {
head->block->refcount--;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
zeroCopyRecordBufferPop(c->zero_copy_buffer);
head = zeroCopyRecordBufferFront(c->zero_copy_buffer);
}
if (c->zero_copy_buffer->len == 0)
connSetErrorQueueHandler(c->conn, NULL);
}
}

void writeToReplica(client *c) {
/* Can be called from main-thread only as replica write offload is not supported yet */
serverAssert(inMainThread());
int nwritten = 0;
ssize_t nwritten = 0;
zeroCopyRecord *zcp = NULL;
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
while (clientHasPendingReplies(c)) {
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
serverAssert(o->used >= c->ref_block_pos);

/* Send current block if it is not fully sent. */
if (o->used > c->ref_block_pos) {
nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos);
nwritten = connSend(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos, MSG_ZEROCOPY);
if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
return;
}
c->nwritten += nwritten;
c->ref_block_pos += nwritten;

zcp = zeroCopyRecordBufferExtend(c->zero_copy_buffer);
zcp->block = o;
zcp->active = 1;
zcp->last_write_for_block = 0;
connSetErrorQueueHandler(c->conn, handleZeroCopyMessage);
}

if (!zcp) {
zcp = zeroCopyRecordBufferEnd(c->zero_copy_buffer);
}

/* If we fully sent the object on head, go to the next one. */
listNode *next = listNextNode(c->ref_repl_buf_node);
if (next && c->ref_block_pos == o->used) {
o->refcount--;
if (!zcp) {
/* The write has already finished for all outgoing writes, we can free this block inline. */
o->refcount--;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
} else {
/* There is an ongoing write, simply mark that write as needing to free this block. */
zcp->last_write_for_block = 1;
}
((replBufBlock *)(listNodeValue(next)))->refcount++;
c->ref_repl_buf_node = next;
c->ref_block_pos = 0;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,9 @@ void syncCommand(client *c) {
/* ignore SYNC if already replica or in monitor mode */
if (c->flag.replica) return;

// TODO murphyjacob4 find a better place for this
connSetZeroCopy(c->conn, 1);

/* Check if this is a failover request to a replica with the same replid and
* become a primary if so. */
if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr, "psync") && !strcasecmp(c->argv[3]->ptr, "failover")) {
Expand Down Expand Up @@ -4737,8 +4740,8 @@ void replicationCron(void) {
* replicas number + 1(replication backlog). */
if (listLength(server.repl_buffer_blocks) > 0) {
replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks));
serverAssert(o->refcount > 0 &&
o->refcount <= (int)listLength(server.replicas) + 1 + (int)raxSize(server.replicas_waiting_psync));
serverAssert(o->refcount > 0);
serverAssert(o->refcount <= (int)listLength(server.replicas) + 1 + (int)raxSize(server.replicas_waiting_psync));
}

/* Refresh the number of replicas with lag <= min-replicas-max-lag. */
Expand Down
17 changes: 17 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,21 @@ typedef struct ClientFlags {
uint64_t reserved : 4; /* Reserved for future use */
} ClientFlags;

/* Tracking struct used to decremenet reference count of repl backlog block
* once written to replica by the kernel. */
typedef struct zeroCopyRecord {
replBufBlock *block;
int active;
int last_write_for_block;
} zeroCopyRecord;

typedef struct zeroCopyRecordBuffer {
zeroCopyRecord *records;
size_t start;
size_t len;
size_t capacity;
} zeroCopyRecordBuffer;

typedef struct client {
uint64_t id; /* Client incremental unique ID. */
union {
Expand Down Expand Up @@ -1363,6 +1378,8 @@ typedef struct client {
* see the definition of replBufBlock. */
size_t ref_block_pos; /* Access position of referenced buffer block,
* i.e. the next offset to send. */
zeroCopyRecordBuffer *zero_copy_buffer; /* Circular buffer of active writes, indexed
* by sequence number. */

/* list node in clients_pending_write or in clients_pending_io_write list */
listNode clients_pending_write_node;
Expand Down
35 changes: 34 additions & 1 deletion src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,25 @@ static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc fun
if (func == conn->read_handler) return C_OK;

conn->read_handler = func;
if (!conn->read_handler)
if (!conn->read_handler && !conn->err_queue_handler)
aeDeleteFileEvent(server.el, conn->fd, AE_READABLE);
else if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR)
return C_ERR;
return C_OK;
}

static int connSocketSetErrorQueueHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->err_queue_handler) return C_OK;

conn->err_queue_handler = func;
if (!conn->err_queue_handler && !conn->read_handler) {
aeDeleteFileEvent(server.el, conn->fd, AE_READABLE);
} else if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) {
return C_ERR;
}
return C_OK;
}

static const char *connSocketGetLastError(connection *conn) {
return strerror(conn->last_errno);
}
Expand Down Expand Up @@ -284,6 +296,11 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD

int call_write = (mask & AE_WRITABLE) && conn->write_handler;
int call_read = (mask & AE_READABLE) && conn->read_handler;
int call_err_queue = (mask & AE_READABLE) && conn->err_queue_handler;

if (call_err_queue) {
if (!callHandler(conn, conn->err_queue_handler)) return;
}

/* Handle normal I/O flows */
if (!invert && call_read) {
Expand Down Expand Up @@ -374,6 +391,14 @@ static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size,
return syncReadLine(conn->fd, ptr, size, timeout);
}

static ssize_t connSocketSend(connection *conn, const void *data, size_t data_len, int flags) {
return send(conn->fd, data, data_len, flags);
}

static ssize_t connSocketRecvMsg(connection *conn, struct msghdr *msg, int flags) {
return recvmsg(conn->fd, msg, flags);
}

static const char *connSocketGetType(connection *conn) {
(void)conn;

Expand Down Expand Up @@ -413,10 +438,13 @@ static ConnectionType CT_Socket = {
.read = connSocketRead,
.set_write_handler = connSocketSetWriteHandler,
.set_read_handler = connSocketSetReadHandler,
.set_error_queue_handler = connSocketSetErrorQueueHandler,
.get_last_error = connSocketGetLastError,
.sync_write = connSocketSyncWrite,
.sync_read = connSocketSyncRead,
.sync_readline = connSocketSyncReadLine,
.send = connSocketSend,
.recvmsg = connSocketRecvMsg,

/* pending data */
.has_pending_data = NULL,
Expand Down Expand Up @@ -458,6 +486,11 @@ int connRecvTimeout(connection *conn, long long ms) {
return anetRecvTimeout(NULL, conn->fd, ms);
}

int connSetZeroCopy(connection *conn, int setting) {
serverLog(LL_NOTICE, "Setting zero copy to %d for fd %d", setting, conn->fd);
return anetSetZeroCopy(NULL, conn->fd, setting);
}

int RedisRegisterConnectionTypeSocket(void) {
return connTypeRegister(&CT_Socket);
}
Loading

0 comments on commit 855a059

Please sign in to comment.