Skip to content

Commit

Permalink
Add tests and support for non-graceful termination
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Murphy <[email protected]>
  • Loading branch information
murphyjacob4 committed Jan 10, 2025
1 parent 7199031 commit 756ab02
Show file tree
Hide file tree
Showing 16 changed files with 387 additions and 65 deletions.
7 changes: 2 additions & 5 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,12 @@ int aeWait(int fd, int mask, long long milliseconds) {
pfd.fd = fd;
if (mask & AE_READABLE) pfd.events |= POLLIN;
if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
if (mask & AE_ERROR_QUEUE) pfd.events |= POLLIN;
if (mask & AE_ERROR_QUEUE) pfd.events |= POLLERR;

if ((retval = poll(&pfd, 1, milliseconds)) == 1) {
if (pfd.revents & POLLIN) retmask |= AE_READABLE;
if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
if (pfd.revents & POLLERR) {
retmask |= AE_WRITABLE;
retmask |= AE_ERROR_QUEUE;
}
if (pfd.revents & POLLERR) retmask |= AE_ERROR_QUEUE;
if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
return retmask;
} else {
Expand Down
28 changes: 25 additions & 3 deletions src/anet.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,41 @@ int anetRecvTimeout(char *err, int fd, long long ms) {
return ANET_OK;
}

#ifdef HAVE_MSG_ZEROCOPY

int anetSetZeroCopy(char *err, int fd, int setting) {
#ifdef HAVE_MSG_ZEROCOPY
if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &setting, sizeof(setting)) < 0) {
anetSetError(err, "setsockopt SO_ZEROCOPY: %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
#else
int anetSetZeroCopy(char *err, int fd, int setting) {
UNUSED(fd);
UNUSED(setting);
anetSetError(err, "anetSetZeroCopy unsupported on this platform");
return ANET_OK;
#endif
}

int anetSetForceClose(char *err, int fd, int enable) {
#ifdef HAVE_SO_LINGER
struct linger l = {
.l_onoff = enable,
.l_linger = 0,
};
if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) < 0) {
anetSetError(err, "setsockopt SO_LINGER: %s", strerror(errno));
return ANET_ERR;
}

return ANET_OK;
#else
UNUSED(fd);
UNUSED(enable);
anetSetError(err, "anetSetForceClose unsupported on this platform");
return ANET_OK;
#endif
}

/* Resolve the hostname "host" and set the string representation of the
* IP address into the buffer pointed by "ipbuf".
Expand Down
1 change: 1 addition & 0 deletions src/anet.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ int anetDisableTcpNoDelay(char *err, int fd);
int anetSendTimeout(char *err, int fd, long long ms);
int anetRecvTimeout(char *err, int fd, long long ms);
int anetSetZeroCopy(char *err, int fd, int setting);
int anetSetForceClose(char *err, int fd, int enable);
int anetFdToString(int fd, char *ip, size_t ip_len, int *port, int remote);
int anetKeepAlive(char *err, int fd, int interval);
int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port);
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3253,6 +3253,7 @@ standardConfig static_configs[] = {
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tcp-zerocopy-min-write-size", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcp_zerocopy_min_write_size, CONFIG_DEFAULT_ZERO_COPY_MIN_WRITE_SIZE, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down
7 changes: 7 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@
#endif
#endif

/* SO_LINGER. */
#ifdef __linux__
#if defined(SO_LINGER)
#define HAVE_SO_LINGER 1
#endif
#endif

/* Test for polling API */
#ifdef __linux__
#define HAVE_EPOLL 1
Expand Down
2 changes: 2 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ typedef enum {
CONN_STATE_CONNECTING,
CONN_STATE_ACCEPTING,
CONN_STATE_CONNECTED,
CONN_STATE_SHUTDOWN,
CONN_STATE_CLOSED,
CONN_STATE_ERROR
} ConnectionState;
Expand Down Expand Up @@ -397,6 +398,7 @@ int connKeepAlive(connection *conn, int interval);
int connSendTimeout(connection *conn, long long ms);
int connRecvTimeout(connection *conn, long long ms);
int connSetZeroCopy(connection *conn, int setting);
int connSetForceClose(connection *conn, int enable);

/* Get cert for the secure connection */
static inline sds connGetPeerCert(connection *conn) {
Expand Down
5 changes: 5 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ void debugCommand(client *c) {
" Grace period in seconds for replica main channel to establish psync.",
"DICT-RESIZING <0|1>",
" Enable or disable the main dict and expire dict resizing.",
"ZEROCOPY-FOR-LOOPBACK <0|1>",
" Enable or disable zerocopy IO on loopback connections.",
NULL};
addExtendedReplyHelp(c, help, clusterDebugCommandExtendedHelp());
} else if (!strcasecmp(c->argv[1]->ptr, "segfault")) {
Expand Down Expand Up @@ -1020,6 +1022,9 @@ void debugCommand(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr, "dict-resizing") && c->argc == 3) {
server.dict_resizing = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "zerocopy-for-loopback") && c->argc == 3) {
server.debug_zerocopy_bypass_loopback_check = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!handleDebugClusterCommand(c)) {
addReplySubcommandSyntaxError(c);
return;
Expand Down
36 changes: 23 additions & 13 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ void installClientWriteHandler(client *c) {
* If we fail and there is more data to write, compared to what the socket
* buffers can hold, then we'll really install the handler. */
void putClientInPendingWriteQueue(client *c) {
serverLog(LL_WARNING, "putClientInPendingWriteQueue %d", c->conn->fd);
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for replicas, if the replica can actually receive
* writes at this stage. */
Expand All @@ -275,6 +276,7 @@ void putClientInPendingWriteQueue(client *c) {
* we'll not be able to write the whole reply at once. */
c->flag.pending_write = 1;
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
serverLog(LL_WARNING, "added to queue %d", c->conn->fd);
}
}

Expand Down Expand Up @@ -316,11 +318,13 @@ int prepareClientToWrite(client *c) {
* is set. */
if (c->flag.primary && !c->flag.primary_force_reply) return C_ERR;

if (!c->conn) return C_ERR; /* Fake client for AOF loading. */
/* Don't write to fake clients for AOF loading or shutdown clients. */
if (!c->conn || c->conn->state == CONN_STATE_SHUTDOWN) return C_ERR;

/* Schedule the client to write the output buffers to the socket, unless
* it should already be setup to do so (it has already pending data). */
if (!clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
else serverLog(LL_WARNING, "already awaiting write on %d", c->conn->fd);

/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
Expand Down Expand Up @@ -1576,16 +1580,6 @@ void unlinkClient(client *c) {
} else if (c->flag.repl_rdb_channel) {
shutdown(c->conn->fd, SHUT_RDWR);
}
if (c->zero_copy_tracker && c->zero_copy_tracker->len > 0) {
/* At this point, any existing in bound TCP data should be dropped, so we make the
* kernel forcibily reset the connection now. */
struct linger l = {
.l_onoff = 1,
.l_linger = 0
};
setsockopt(c->conn->fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
}

connClose(c->conn);
c->conn = NULL;
}
Expand Down Expand Up @@ -1688,7 +1682,7 @@ void freeClient(client *c) {
waitForClientIO(c);

/* For connected clients, call the disconnection event of modules hooks. */
if (c->conn) {
if (c->conn && c->conn->state != CONN_STATE_SHUTDOWN) {
moduleFireServerEvent(VALKEYMODULE_EVENT_CLIENT_CHANGE, VALKEYMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED, c);
}

Expand Down Expand Up @@ -1723,6 +1717,22 @@ void freeClient(client *c) {
}
}

/* For zero copy connections with active writes, we need to give the kernel
* some time to finish the writes it already has in flight. We will
* continue to monitor these clients and terminate them if this takes too
* long (e.g. if the receiver is not ACKing the FIN packet we send) */
if (c->zero_copy_tracker && c->zero_copy_tracker->len > 0) {
if (c->zero_copy_tracker->draining) {
if (c->last_interaction + ZERO_COPY_MAX_DRAIN_TIME_SECONDS > server.unixtime) {
/* Still draining */
return;
}
} else {
zeroCopyStartDraining(c);
return;
}
}

/* Log link disconnection with replica */
if (getClientType(c) == CLIENT_TYPE_REPLICA) {
serverLog(LL_NOTICE,
Expand Down Expand Up @@ -2022,7 +2032,7 @@ void writeToReplica(client *c) {
/* Send current block if it is not fully sent. */
if (o->used > c->ref_block_pos) {
size_t data_len = o->used - c->ref_block_pos;
int use_zerocopy = shouldUseZeroCopy(data_len);
int use_zerocopy = shouldUseZeroCopy(c->conn, data_len);
if (use_zerocopy) {
/* Lazily enable zero copy at the socket level only on first use */
if (!c->zero_copy_tracker) {
Expand Down
3 changes: 3 additions & 0 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,9 @@ struct serverMemOverhead *getMemoryOverheadData(void) {
mh->dataset_perc = (float)mh->dataset * 100 / net_usage;
mh->bytes_per_key = mh->total_keys ? (mh->dataset / mh->total_keys) : 0;

if (server.tcp_tx_zerocopy)
mh->zero_copy_tracking = server.stat_zero_copy_tracking_memory;

return mh;
}

Expand Down
3 changes: 2 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ void feedReplicationBuffer(char *s, size_t len) {
len -= copy;
server.primary_repl_offset += copy;
server.repl_backlog->histlen += copy;
serverLog(LL_WARNING, "adding new replica block of size %lu", size);
}
if (empty_backlog && raxSize(server.replicas_waiting_psync) > 0) {
/* Increase refcount for pending replicas. */
Expand Down Expand Up @@ -4739,7 +4740,7 @@ void replicationCron(void) {
if (listLength(server.repl_buffer_blocks) > 0) {
replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks));
serverAssert(o->refcount > 0 &&
o->refcount <= (int)listLength(server.replicas) + 1 + (int)raxSize(server.replicas_waiting_psync) + server.draining_zero_copy_connections);
o->refcount <= (int)listLength(server.replicas) + 1 + (int)raxSize(server.replicas_waiting_psync));
}

/* Refresh the number of replicas with lag <= min-replicas-max-lag. */
Expand Down
30 changes: 26 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,18 @@ int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) {
return 0;
}

int clientsCronHandleZeroCopyDraining(client *c, mstime_t now_ms) {
serverAssert(c->zero_copy_tracker->draining);
if (now_ms / 1000 > (c->last_interaction + ZERO_COPY_MAX_DRAIN_TIME_SECONDS)) {
server.stat_zero_copy_clients_force_closed++;
connSetForceClose(c->conn, 1);
freeClient(c);
server.draining_clients--;
return 1;
}
return 0;
}

/* This function is used in order to track clients using the biggest amount
* of memory in the latest few seconds. This way we can provide such information
* in the INFO output (clients section), without having to do an O(N) scan for
Expand Down Expand Up @@ -1106,6 +1118,7 @@ void clientsCron(void) {
/* The following functions do different service checks on the client.
* The protocol is that they return non-zero if the client was
* terminated. */
if (c->zero_copy_tracker && c->zero_copy_tracker->draining && clientsCronHandleZeroCopyDraining(c, now)) continue;
if (clientsCronHandleTimeout(c, now)) continue;
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronResizeOutputBuffer(c, now)) continue;
Expand Down Expand Up @@ -2630,6 +2643,8 @@ void resetServerStats(void) {
server.stat_reply_buffer_expands = 0;
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
server.el_cmd_cnt_max = 0;
server.stat_zero_copy_writes_processed = server.stat_zero_copy_writes_in_flight;
server.stat_zero_copy_clients_force_closed = 0;
lazyfreeResetStats();
}

Expand Down Expand Up @@ -2683,7 +2698,8 @@ void initServer(void) {
server.tracking_pending_keys = listCreate();
server.pending_push_messages = listCreate();
server.clients_waiting_acks = listCreate();
server.draining_zero_copy_connections = 0;
server.debug_zerocopy_bypass_loopback_check = 0;
server.draining_clients = 0;
server.get_ack_from_replicas = 0;
server.paused_actions = 0;
memset(server.client_pause_per_purpose, 0, sizeof(server.client_pause_per_purpose));
Expand Down Expand Up @@ -2773,6 +2789,7 @@ void initServer(void) {
server.rdb_last_load_keys_loaded = 0;
server.dirty = 0;
server.crashed = 0;
server.stat_zero_copy_writes_in_flight = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
Expand Down Expand Up @@ -5612,7 +5629,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"clients_in_timeout_table:%llu\r\n", (unsigned long long)raxSize(server.clients_timeout_table),
"total_watched_keys:%lu\r\n", watched_keys,
"total_blocking_keys:%lu\r\n", blocking_keys,
"total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey));
"total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey,
"draining_clients:%d\r\n", server.draining_clients));
}

/* Memory */
Expand Down Expand Up @@ -5707,7 +5725,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"mem_overhead_db_hashtable_rehashing:%zu\r\n", mh->overhead_db_hashtable_rehashing,
"active_defrag_running:%d\r\n", server.active_defrag_running,
"lazyfree_pending_objects:%zu\r\n", lazyfreeGetPendingObjectsCount(),
"lazyfreed_objects:%zu\r\n", lazyfreeGetFreedObjectsCount()));
"lazyfreed_objects:%zu\r\n", lazyfreeGetFreedObjectsCount(),
"used_memory_zero_copy_tracking:%zu\r\n", mh->zero_copy_tracking));
freeMemoryOverheadData(mh);
}

Expand Down Expand Up @@ -5876,7 +5895,10 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"eventloop_duration_sum:%llu\r\n", server.duration_stats[EL_DURATION_TYPE_EL].sum,
"eventloop_duration_cmd_sum:%llu\r\n", server.duration_stats[EL_DURATION_TYPE_CMD].sum,
"instantaneous_eventloop_cycles_per_sec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_CYCLE),
"instantaneous_eventloop_duration_usec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_DURATION)));
"instantaneous_eventloop_duration_usec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_DURATION),
"zero_copy_writes_processed:%lld\r\n",server.stat_zero_copy_writes_processed,
"zero_copy_writes_in_flight:%lld\r\n",server.stat_zero_copy_writes_in_flight,
"zero_copy_clients_force_closed:%lld\r\n",server.stat_zero_copy_clients_force_closed));
info = genValkeyInfoStringACLStats(info);
}

Expand Down
16 changes: 14 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ struct hdr_histogram;
#define DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE 60 /* Grace period in seconds for replica main \
* channel to establish psync. */
#define LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT 100 /* Default: 0.1 seconds */
#define CONFIG_DEFAULT_ZERO_COPY_MIN_WRITE_SIZE 10*1024 /* https://docs.kernel.org/networking/msg_zerocopy.html */
#define ZERO_COPY_MAX_DRAIN_TIME_SECONDS 3 /* Maximum time a connection using zero copy
* is kept alive for the purpose of flushing
* pending writes. If not flushed in this time,
* force close the connection with a RST packet. */

/* Bucket sizes for client eviction pools. Each bucket stores clients with
* memory usage of up to twice the size of the bucket below it. */
Expand Down Expand Up @@ -1251,6 +1256,7 @@ typedef struct zeroCopyTracker {
uint32_t start;
uint32_t len;
uint32_t capacity;
int draining;
} zeroCopyTracker;

typedef struct client {
Expand Down Expand Up @@ -1548,6 +1554,7 @@ struct serverMemOverhead {
size_t overhead_ht_main;
size_t overhead_ht_expires;
} *db;
size_t zero_copy_tracking;
};

/* Replication error behavior determines the replica behavior
Expand Down Expand Up @@ -1867,6 +1874,10 @@ struct valkeyServer {
long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */
long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */
long long stat_total_prefetch_batches; /* Total number of prefetched batches */
long long stat_zero_copy_writes_processed; /* Total number of writes using zero copy */
long long stat_zero_copy_writes_in_flight; /* Total number of writes using zero copy that are not yet finished by the kernel */
long long stat_zero_copy_clients_force_closed; /* Total number of clients we force closed without finishing draining. */
size_t stat_zero_copy_tracking_memory; /* Memory usage for zero copy related tracking */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
Expand Down Expand Up @@ -2112,8 +2123,9 @@ struct valkeyServer {
int import_mode; /* If true, server is in import mode and forbid expiration and eviction. */
/* TCP Zero Copy */
int tcp_tx_zerocopy; /* If true, use zero copy for writes when possible. */
int draining_zero_copy_connections; /* Count of connections that are to be
* closed once fully drained.*/
int tcp_zerocopy_min_write_size; /* Minimum size for a write before we go thorugh zerocopy. */
int debug_zerocopy_bypass_loopback_check; /* Used to test zerocopy on loopback connections */
unsigned int draining_clients;
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */
int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */
Expand Down
5 changes: 5 additions & 0 deletions src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ static int connSocketConnect(connection *conn,
static void connSocketShutdown(connection *conn) {
if (conn->fd == -1) return;

conn->state = CONN_STATE_SHUTDOWN;
shutdown(conn->fd, SHUT_RDWR);
}

Expand Down Expand Up @@ -495,6 +496,10 @@ int connSetZeroCopy(connection *conn, int setting) {
return anetSetZeroCopy(NULL, conn->fd, setting);
}

int connSetForceClose(connection *conn, int enable) {
return anetSetForceClose(NULL, conn->fd, enable);
}

int RedisRegisterConnectionTypeSocket(void) {
return connTypeRegister(&CT_Socket);
}
Loading

0 comments on commit 756ab02

Please sign in to comment.