From 855a05975932378df060ff0257ccff39fc5f0dce Mon Sep 17 00:00:00 2001 From: Jacob Murphy Date: Tue, 26 Nov 2024 00:26:08 +0000 Subject: [PATCH] Initial draft of zerocopy for replication streams Signed-off-by: Jacob Murphy --- src/Makefile | 2 +- src/anet.c | 8 +++++ src/anet.h | 1 + src/connection.h | 18 ++++++++++ src/networking.c | 85 ++++++++++++++++++++++++++++++++++++++++++++--- src/replication.c | 7 ++-- src/server.h | 17 ++++++++++ src/socket.c | 35 ++++++++++++++++++- src/zerocopy.c | 72 +++++++++++++++++++++++++++++++++++++++ src/zerocopy.h | 29 ++++++++++++++++ zerocopy_test.sh | 46 +++++++++++++++++++++++++ 11 files changed, 312 insertions(+), 8 deletions(-) create mode 100644 src/zerocopy.c create mode 100644 src/zerocopy.h create mode 100644 zerocopy_test.sh diff --git a/src/Makefile b/src/Makefile index f876f55dec..f97db4ab7b 100644 --- a/src/Makefile +++ b/src/Makefile @@ -411,7 +411,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o zerocopy.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/anet.c b/src/anet.c index 8dc06ca62e..f49f8b95cf 100644 --- a/src/anet.c +++ b/src/anet.c @@ -335,6 +335,14 @@ int anetRecvTimeout(char *err, int fd, long long ms) { return ANET_OK; } +int anetSetZeroCopy(char *err, int fd, int setting) { + if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &setting, sizeof(setting)) < 0) { + anetSetError(err, "setsockopt SO_ZEROCOPY: %s", strerror(errno)); + return ANET_ERR; + } + return ANET_OK; +} + /* Resolve the hostname "host" and set the string representation of the * IP address into the buffer pointed by "ipbuf". * diff --git a/src/anet.h b/src/anet.h index b14b4bdaad..cfed2e20f3 100644 --- a/src/anet.h +++ b/src/anet.h @@ -67,6 +67,7 @@ int anetEnableTcpNoDelay(char *err, int fd); int anetDisableTcpNoDelay(char *err, int fd); int anetSendTimeout(char *err, int fd, long long ms); int anetRecvTimeout(char *err, int fd, long long ms); +int anetSetZeroCopy(char *err, int fd, int setting); int anetFdToString(int fd, char *ip, size_t ip_len, int *port, int remote); int anetKeepAlive(char *err, int fd, int interval); int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port); diff --git a/src/connection.h b/src/connection.h index 0762441732..841a64101c 100644 --- a/src/connection.h +++ b/src/connection.h @@ -35,6 +35,7 @@ #include #include #include +#include #include "ae.h" @@ -101,10 +102,13 @@ typedef struct ConnectionType { int (*read)(struct connection *conn, void *buf, size_t buf_len); int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier); int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler); + int (*set_error_queue_handler)(struct connection *conn, ConnectionCallbackFunc handler); const char *(*get_last_error)(struct connection *conn); ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout); ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout); ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout); + ssize_t (*send)(struct connection *conn, const void *data, size_t data_len, int flags); + ssize_t (*recvmsg)(struct connection *conn, struct msghdr *msg, int flags); /* pending data */ int (*has_pending_data)(void); @@ -132,6 +136,7 @@ struct connection { ConnectionCallbackFunc conn_handler; ConnectionCallbackFunc write_handler; ConnectionCallbackFunc read_handler; + ConnectionCallbackFunc err_queue_handler; }; #define CONFIG_BINDADDR_MAX 16 @@ -255,6 +260,10 @@ static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCal return conn->type->set_write_handler(conn, func, barrier); } +static inline int connSetErrorQueueHandler(connection *conn, ConnectionCallbackFunc func) { + return conn->type->set_error_queue_handler(conn, func); +} + static inline void connShutdown(connection *conn) { conn->type->shutdown(conn); } @@ -282,6 +291,14 @@ static inline ssize_t connSyncReadLine(connection *conn, char *ptr, ssize_t size return conn->type->sync_readline(conn, ptr, size, timeout); } +static inline ssize_t connSend(connection *conn, const void *data, size_t data_len, int flags) { + return conn->type->send(conn, data, data_len, flags); +} + +static inline ssize_t connRecvMsg(connection *conn, struct msghdr *msg, int flags) { + return conn->type->recvmsg(conn, msg, flags); +} + /* Return CONN_TYPE_* for the specified connection */ static inline const char *connGetType(connection *conn) { return conn->type->get_type(conn); @@ -379,6 +396,7 @@ int connDisableTcpNoDelay(connection *conn); int connKeepAlive(connection *conn, int interval); int connSendTimeout(connection *conn, long long ms); int connRecvTimeout(connection *conn, long long ms); +int connSetZeroCopy(connection *conn, int setting); /* Get cert for the secure connection */ static inline sds connGetPeerCert(connection *conn) { diff --git a/src/networking.c b/src/networking.c index 93aa9d00ae..6325493812 100644 --- a/src/networking.c +++ b/src/networking.c @@ -35,12 +35,14 @@ #include "fpconv_dtoa.h" #include "fmtargs.h" #include "io_threads.h" +#include "zerocopy.h" #include #include #include #include #include #include +#include static void setProtocolError(const char *errstr, client *c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); @@ -234,6 +236,7 @@ client *createClient(connection *conn) { c->commands_processed = 0; c->io_last_reply_block = NULL; c->io_last_bufpos = 0; + c->zero_copy_buffer = createZeroCopyRecordBuffer(); return c; } @@ -1828,6 +1831,7 @@ void freeClient(client *c) { sdsfree(c->peerid); sdsfree(c->sockname); sdsfree(c->replica_addr); + freeZeroCopyRecordBuffer(c->zero_copy_buffer); zfree(c); } @@ -1994,10 +1998,67 @@ client *lookupClientByID(uint64_t id) { return c; } +void handleZeroCopyMessage(connection *conn) { + struct msghdr msg; + struct iovec iov; + char control[1024*10]; + struct cmsghdr *cmsg; + struct sock_extended_err *serr; + + iov.iov_base = NULL; + iov.iov_len = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + + if (connRecvMsg(conn, &msg, MSG_ERRQUEUE) == -1) { + if (errno == EAGAIN) { + /* This callback fires for all readable events, so sometimes it is a no-op. */ + return; + } + serverLog(LL_WARNING, "Got callback for error message but got recvmsg error: %s", strerror(errno)); + return; + } + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level != SOL_IP || cmsg->cmsg_type != IP_RECVERR) { + continue; + } + serr = (struct sock_extended_err *) CMSG_DATA(cmsg); + if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { + continue; + } + client * c = (client *) connGetPrivateData(conn); + + /* Mark the received messages as finished. */ + const uint32_t begin = serr->ee_info; + const uint32_t end = serr->ee_data; + for (size_t i = begin; i <= end; i++) { + zeroCopyRecord *zcp = zeroCopyRecordBufferGet(c->zero_copy_buffer, i); + serverAssert(zcp != NULL); + zcp->active = 0; + } + + /* Trim the front of the buffer up until the next outstanding write. */ + zeroCopyRecord *head = zeroCopyRecordBufferFront(c->zero_copy_buffer); + while (head != NULL && head->active == 0) { + if (head->last_write_for_block) { + head->block->refcount--; + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } + zeroCopyRecordBufferPop(c->zero_copy_buffer); + head = zeroCopyRecordBufferFront(c->zero_copy_buffer); + } + if (c->zero_copy_buffer->len == 0) + connSetErrorQueueHandler(c->conn, NULL); + } +} + void writeToReplica(client *c) { /* Can be called from main-thread only as replica write offload is not supported yet */ serverAssert(inMainThread()); - int nwritten = 0; + ssize_t nwritten = 0; + zeroCopyRecord *zcp = NULL; serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); while (clientHasPendingReplies(c)) { replBufBlock *o = listNodeValue(c->ref_repl_buf_node); @@ -2005,23 +2066,39 @@ void writeToReplica(client *c) { /* Send current block if it is not fully sent. */ if (o->used > c->ref_block_pos) { - nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos); + nwritten = connSend(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos, MSG_ZEROCOPY); if (nwritten <= 0) { c->write_flags |= WRITE_FLAGS_WRITE_ERROR; return; } c->nwritten += nwritten; c->ref_block_pos += nwritten; + + zcp = zeroCopyRecordBufferExtend(c->zero_copy_buffer); + zcp->block = o; + zcp->active = 1; + zcp->last_write_for_block = 0; + connSetErrorQueueHandler(c->conn, handleZeroCopyMessage); + } + + if (!zcp) { + zcp = zeroCopyRecordBufferEnd(c->zero_copy_buffer); } /* If we fully sent the object on head, go to the next one. */ listNode *next = listNextNode(c->ref_repl_buf_node); if (next && c->ref_block_pos == o->used) { - o->refcount--; + if (!zcp) { + /* The write has already finished for all outgoing writes, we can free this block inline. */ + o->refcount--; + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } else { + /* There is an ongoing write, simply mark that write as needing to free this block. */ + zcp->last_write_for_block = 1; + } ((replBufBlock *)(listNodeValue(next)))->refcount++; c->ref_repl_buf_node = next; c->ref_block_pos = 0; - incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); } } } diff --git a/src/replication.c b/src/replication.c index 1654847bd6..62b39d8122 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1036,6 +1036,9 @@ void syncCommand(client *c) { /* ignore SYNC if already replica or in monitor mode */ if (c->flag.replica) return; + // TODO murphyjacob4 find a better place for this + connSetZeroCopy(c->conn, 1); + /* Check if this is a failover request to a replica with the same replid and * become a primary if so. */ if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr, "psync") && !strcasecmp(c->argv[3]->ptr, "failover")) { @@ -4737,8 +4740,8 @@ void replicationCron(void) { * replicas number + 1(replication backlog). */ if (listLength(server.repl_buffer_blocks) > 0) { replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks)); - serverAssert(o->refcount > 0 && - o->refcount <= (int)listLength(server.replicas) + 1 + (int)raxSize(server.replicas_waiting_psync)); + serverAssert(o->refcount > 0); + serverAssert(o->refcount <= (int)listLength(server.replicas) + 1 + (int)raxSize(server.replicas_waiting_psync)); } /* Refresh the number of replicas with lag <= min-replicas-max-lag. */ diff --git a/src/server.h b/src/server.h index f4c7306009..116f2d0658 100644 --- a/src/server.h +++ b/src/server.h @@ -1238,6 +1238,21 @@ typedef struct ClientFlags { uint64_t reserved : 4; /* Reserved for future use */ } ClientFlags; +/* Tracking struct used to decremenet reference count of repl backlog block + * once written to replica by the kernel. */ +typedef struct zeroCopyRecord { + replBufBlock *block; + int active; + int last_write_for_block; +} zeroCopyRecord; + +typedef struct zeroCopyRecordBuffer { + zeroCopyRecord *records; + size_t start; + size_t len; + size_t capacity; +} zeroCopyRecordBuffer; + typedef struct client { uint64_t id; /* Client incremental unique ID. */ union { @@ -1363,6 +1378,8 @@ typedef struct client { * see the definition of replBufBlock. */ size_t ref_block_pos; /* Access position of referenced buffer block, * i.e. the next offset to send. */ + zeroCopyRecordBuffer *zero_copy_buffer; /* Circular buffer of active writes, indexed + * by sequence number. */ /* list node in clients_pending_write or in clients_pending_io_write list */ listNode clients_pending_write_node; diff --git a/src/socket.c b/src/socket.c index 7344d66ad8..67aa96aeef 100644 --- a/src/socket.c +++ b/src/socket.c @@ -238,13 +238,25 @@ static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc fun if (func == conn->read_handler) return C_OK; conn->read_handler = func; - if (!conn->read_handler) + if (!conn->read_handler && !conn->err_queue_handler) aeDeleteFileEvent(server.el, conn->fd, AE_READABLE); else if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) return C_ERR; return C_OK; } +static int connSocketSetErrorQueueHandler(connection *conn, ConnectionCallbackFunc func) { + if (func == conn->err_queue_handler) return C_OK; + + conn->err_queue_handler = func; + if (!conn->err_queue_handler && !conn->read_handler) { + aeDeleteFileEvent(server.el, conn->fd, AE_READABLE); + } else if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) { + return C_ERR; + } + return C_OK; +} + static const char *connSocketGetLastError(connection *conn) { return strerror(conn->last_errno); } @@ -284,6 +296,11 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD int call_write = (mask & AE_WRITABLE) && conn->write_handler; int call_read = (mask & AE_READABLE) && conn->read_handler; + int call_err_queue = (mask & AE_READABLE) && conn->err_queue_handler; + + if (call_err_queue) { + if (!callHandler(conn, conn->err_queue_handler)) return; + } /* Handle normal I/O flows */ if (!invert && call_read) { @@ -374,6 +391,14 @@ static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, return syncReadLine(conn->fd, ptr, size, timeout); } +static ssize_t connSocketSend(connection *conn, const void *data, size_t data_len, int flags) { + return send(conn->fd, data, data_len, flags); +} + +static ssize_t connSocketRecvMsg(connection *conn, struct msghdr *msg, int flags) { + return recvmsg(conn->fd, msg, flags); +} + static const char *connSocketGetType(connection *conn) { (void)conn; @@ -413,10 +438,13 @@ static ConnectionType CT_Socket = { .read = connSocketRead, .set_write_handler = connSocketSetWriteHandler, .set_read_handler = connSocketSetReadHandler, + .set_error_queue_handler = connSocketSetErrorQueueHandler, .get_last_error = connSocketGetLastError, .sync_write = connSocketSyncWrite, .sync_read = connSocketSyncRead, .sync_readline = connSocketSyncReadLine, + .send = connSocketSend, + .recvmsg = connSocketRecvMsg, /* pending data */ .has_pending_data = NULL, @@ -458,6 +486,11 @@ int connRecvTimeout(connection *conn, long long ms) { return anetRecvTimeout(NULL, conn->fd, ms); } +int connSetZeroCopy(connection *conn, int setting) { + serverLog(LL_NOTICE, "Setting zero copy to %d for fd %d", setting, conn->fd); + return anetSetZeroCopy(NULL, conn->fd, setting); +} + int RedisRegisterConnectionTypeSocket(void) { return connTypeRegister(&CT_Socket); } diff --git a/src/zerocopy.c b/src/zerocopy.c new file mode 100644 index 0000000000..1e83e6695e --- /dev/null +++ b/src/zerocopy.c @@ -0,0 +1,72 @@ +#include "server.h" +#include "zerocopy.h" +#include + +zeroCopyRecordBuffer *createZeroCopyRecordBuffer() { + zeroCopyRecordBuffer *result = (zeroCopyRecordBuffer *) zmalloc(sizeof(zeroCopyRecordBuffer)); + result->start = 0; + result->len = 0; + result->capacity = ZERO_COPY_RECORD_BUF_INIT_SIZE; + result->records = (zeroCopyRecord *) zmalloc(sizeof(zeroCopyRecord) * ZERO_COPY_RECORD_BUF_INIT_SIZE); + return result; +} + +zeroCopyRecord *zeroCopyRecordBufferGet(zeroCopyRecordBuffer *buf, size_t index) { + if (index < buf->start || index >= buf->start + buf->len) { + return NULL; + } + return &(buf->records[index % buf->capacity]); +} + +zeroCopyRecord *zeroCopyRecordBufferFront(zeroCopyRecordBuffer *buf) { + if (buf->len == 0) { + return NULL; + } + return zeroCopyRecordBufferGet(buf, buf->start); +} + +void zeroCopyRecordBufferResize(zeroCopyRecordBuffer *buf, size_t target_capacity) { + zeroCopyRecord *old = buf->records; + size_t old_capacity = buf->capacity; + buf->records = zmalloc(target_capacity * sizeof(zeroCopyRecord)); + for (size_t i = buf->start; i < buf->start + buf->len; i++) { + buf->records[i % target_capacity] = old[i % old_capacity]; + } + buf->capacity = target_capacity; + zfree(old); +} + +void zeroCopyRecordBufferPop(zeroCopyRecordBuffer *buf) { + buf->start++; + buf->len--; + if (buf->capacity > ZERO_COPY_RECORD_BUF_INIT_SIZE && buf->len <= buf->capacity / 2) { + zeroCopyRecordBufferResize(buf, buf->capacity / 2); + } +} + +zeroCopyRecord *zeroCopyRecordBufferExtend(zeroCopyRecordBuffer *buf) { + if (buf->len == buf->capacity) { + zeroCopyRecordBufferResize(buf, buf->capacity * 2); + } + return zeroCopyRecordBufferGet(buf, buf->start + buf->len++); +} + +zeroCopyRecord *zeroCopyRecordBufferEnd(zeroCopyRecordBuffer *buf) { + if (buf->len == 0) { + return NULL; + } + return zeroCopyRecordBufferGet(buf, buf->len - 1); +} + +void freeZeroCopyRecordBuffer(zeroCopyRecordBuffer *buf) { + zeroCopyRecord *head = zeroCopyRecordBufferFront(buf); + while (head != NULL) { + if (head->last_write_for_block) { + serverAssert(head->block->refcount > 0); + head->block->refcount--; + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } + zeroCopyRecordBufferPop(buf); + head = zeroCopyRecordBufferFront(buf); + } +} \ No newline at end of file diff --git a/src/zerocopy.h b/src/zerocopy.h new file mode 100644 index 0000000000..da57961c18 --- /dev/null +++ b/src/zerocopy.h @@ -0,0 +1,29 @@ +#ifndef ZEROCOPY_H +#define ZEROCOPY_H + +#include "server.h" + +#define ZERO_COPY_RECORD_BUF_INIT_SIZE 1024 + +/* Create a new zero copy record buffer with the given capacity. */ +zeroCopyRecordBuffer *createZeroCopyRecordBuffer(void); + +/* Free an existing zero copy buffer and decrement reference counts as needed*/ +void freeZeroCopyRecordBuffer(zeroCopyRecordBuffer *buf); + +/* Get a zero copy record for a specific sequence number. */ +zeroCopyRecord *zeroCopyRecordBufferGet(zeroCopyRecordBuffer *buf, size_t index); + +/* Get the first zero copy record in the buffer. */ +zeroCopyRecord *zeroCopyRecordBufferFront(zeroCopyRecordBuffer *buf); + +/* Remove the first zero copy record in the buffer. */ +void zeroCopyRecordBufferPop(zeroCopyRecordBuffer *buf); + +/* Add a new zero copy record to the end of the buffer. */ +zeroCopyRecord *zeroCopyRecordBufferExtend(zeroCopyRecordBuffer *buf); + +/* Get the last zero copy record in the buffer. */ +zeroCopyRecord *zeroCopyRecordBufferEnd(zeroCopyRecordBuffer *buf); + +#endif /* ZEROCOPY_H */ \ No newline at end of file diff --git a/zerocopy_test.sh b/zerocopy_test.sh new file mode 100644 index 0000000000..7b7f84dc25 --- /dev/null +++ b/zerocopy_test.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +# Configuration +PRIMARY_HOST="127.0.0.1" +PRIMARY_PORT="6379" +REPLICA_HOST="127.0.0.1" +REPLICA_PORT="6380" +KEY_COUNT=100000 +KEY_SIZE=409600 # 40 KiB + +# Start time +START_TIME=$(date +%s%N) +echo "Start time: $START_TIME" + +# Populate Redis with memtier +echo "Filling Redis with $KEY_COUNT keys of $KEY_SIZE bytes each..." +memtier_benchmark --protocol=redis --server=$PRIMARY_HOST --port=$PRIMARY_PORT \ + --key-maximum=$KEY_COUNT --data-size=$KEY_SIZE --pipeline=64 --randomize \ + --clients=6 --threads=6 --requests=allkeys --ratio=1:0 --key-pattern=P:P +echo "Data population complete." + +PRIMARY_FILL_TIME=$(date +%s) + +# Wait for replica to catch up +echo "Waiting for replica to catch up..." +while true; do + PRIMARY_REPL_OFFSET=$(redis-cli -h $PRIMARY_HOST -p $PRIMARY_PORT info replication | grep "master_repl_offset" | awk -F: '{print $2}' | tr -d '\r') + REPLICA_REPL_OFFSET=$(redis-cli -h $REPLICA_HOST -p $REPLICA_PORT info replication | grep "master_repl_offset" | awk -F: '{print $2}' | tr -d '\r') + + if [ "$PRIMARY_REPL_OFFSET" == "$REPLICA_REPL_OFFSET" ]; then + echo "Replica caught up." + break + fi + + echo "Primary offset: $PRIMARY_REPL_OFFSET, Replica offset: $REPLICA_REPL_OFFSET. Waiting..." +done + +# End time +END_TIME=$(date +%s%N) +echo "End time: $END_TIME" + +# Calculate elapsed time +CATCH_UP_TIME=$((END_TIME - PRIMARY_FILL_TIME)) +echo "Replica catch-up time: $CATCH_UP_TIME nanoseconds" +TOTAL_TIME=$((END_TIME - START_TIME)) +echo "Total time: $TOTAL_TIME nanoseconds"