From 9b8248b10ea35fe5e09e2728aacda8d56da72e9f Mon Sep 17 00:00:00 2001 From: Jacob Murphy Date: Mon, 4 Nov 2024 21:49:46 +0000 Subject: [PATCH] Squashed commit of the following: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit commit 3c32ee1bdaddcd5fbe699aa6c8b320e86702d1b6 Author: Madelyn Olson Date: Mon Nov 4 12:36:20 2024 -0800 Add a filter option to drop all cluster packets (#1252) A minor debugging change that helped in the investigation of https://github.com/valkey-io/valkey/issues/1251. Basically there are some edge cases where we want to fully isolate a note from receiving packets, but can't suspend the process because we need it to continue sending outbound traffic. So, added a filter for that. Signed-off-by: Madelyn Olson commit a102852d5ed5316063d680362f910e725070b9ee Author: Binbin Date: Sat Nov 2 19:51:14 2024 +0800 Fix timing issue in cluster-shards tests (#1243) The cluster-node-timeout is 3000 in our tests, the timing test wasn't succeeding, so extending the wait_for made them much more reliable. Signed-off-by: Binbin commit 0d7b2344b2c0df0269a8e018efc87438a8ec510e Author: Jim Brunner Date: Fri Nov 1 15:16:18 2024 -0700 correct type internal to kvstore (minor) (#1246) All of the internal variables related to number of dicts in the kvstore are type `int`. Not sure why these 2 items were declared as `long long`. Signed-off-by: Jim Brunner commit e985ead7f9377e4d61d2e47c0b964f917d8054b4 Author: zhenwei pi Date: Fri Nov 1 20:28:09 2024 +0800 RDMA: Prevent IO for child process (#1244) RDMA MR (memory region) is not forkable, the VMA (virtual memory area) of a MR gets empty in a child process. Prevent IO for child process to avoid server crash. In the check for whether read and write is allowed in an RDMA connection, a check that if we're in a child process is added. If we are, the function returns an error, which will cause the RDMA client to be disconnected. Suggested-by: Viktor Söderqvist Signed-off-by: zhenwei pi commit 1c222f77cecc100719bbc87c7a2ecd13402fe6db Author: Madelyn Olson Date: Thu Oct 31 11:37:53 2024 -0700 Improve performance of sdssplitargs (#1230) The current implementation of `sdssplitargs` does repeated `sdscatlen` to build the parsed arguments, which isn't very efficient because it does a lot of extra reallocations and moves through the sds code a lot. It also typically results in memory overhead, because `sdscatlen` over-allocates, which is usually not needed since args are usually not modified after being created. The new implementation of sdssplitargs does two passes, the first to parse the argument to figure out the final length and the second to actually copy the string. It's generally about 2x faster for larger strings (~100 bytes), and about 20% faster for small strings (~10 bytes). This is generally faster since as long as everything is in the CPU cache, it's going to be fast. There are a couple of sanity tests, none existed before, as well as some fuzzying which was used to find some bugs and also to do the benchmarking. The original benchmarking code can be seen https://github.com/valkey-io/valkey/pull/1230/commits/6576aeb86adfb5afa74aefb5bc2d2becde95ce4c. ``` test_sdssplitargs_benchmark - unit/test_sds.c:530] Using random seed: 1729883235 [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 56.44%, new:13039us, old:29930us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 56.58%, new:12057us, old:27771us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 59.18%, new:9048us, old:22165us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 54.61%, new:12381us, old:27278us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 51.17%, new:16012us, old:32793us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 49.18%, new:16041us, old:31563us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 58.40%, new:12450us, old:29930us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 56.49%, new:13066us, old:30031us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 58.75%, new:12744us, old:30894us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 52.44%, new:16885us, old:35504us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 62.57%, new:8107us, old:21659us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 62.12%, new:8320us, old:21966us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 45.23%, new:13960us, old:25487us [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 57.95%, new:9188us, old:21849us ``` --------- Signed-off-by: Madelyn Olson commit 91cbf7744256b365651d4bc039c2913ecde9dfe6 Author: Masahiro Ide Date: Fri Nov 1 03:30:05 2024 +0900 Eliminate snprintf usage at setDeferredAggregateLen (#1234) to align with how we encode the length at `_addReplyLongLongWithPrefix` Signed-off-by: Masahiro Ide Co-authored-by: Masahiro Ide commit ab98f375db51c83f3c56beac9440097f81af2048 Author: zhenwei pi Date: Wed Oct 30 18:12:42 2024 +0800 RDMA: Delete keepalive timer on closing (#1237) Typically, RDMA connection gets closed by client side, the server side handles diconnected CM event, and delete keepalive timer correctly. However, the server side may close connection voluntarily, for example the maxium connections exceed. Handle this case to avoid invalid memory access. Signed-off-by: zhenwei pi commit 789a73b0d0fc9e2b754adbb39ed3ca92e9c30669 Author: Binbin Date: Wed Oct 30 10:25:50 2024 +0800 Minor fix to debug logging in replicationFeedStreamFromPrimaryStream (#1235) We should only print logs when hide-user-data-from-log is off. Signed-off-by: Binbin commit 13f5f665f259f229d707116432f8ef2969cae0c7 Author: Shivshankar Date: Tue Oct 29 19:19:56 2024 -0400 Update the argument of clusterNodeGetReplica declaration (#1239) clusterNodeGetReplica agrumnets are missed to migrate during the slave to replication migration so updated the argument slave to replica. Signed-off-by: Shivshankar-Reddy commit 5a4c0640cebe922de563e03ff2a683b89612f522 Author: Madelyn Olson Date: Tue Oct 29 14:26:17 2024 -0700 Mark main and serverAssert as weak symbols to be overridden (#1232) At some point unit tests stopped building on MacOS because of duplicate symbols. I had originally solved this problem by using a flag that overrides symbols, but the much better solution is to mark the duplicate symbols as weak and they can be overridden during linking. (Symbols by default are strong, strong symbols override weak symbols) I also added macos unit build to the CI, so that this doesn't silently break in the future again. --------- Signed-off-by: Madelyn Olson Co-authored-by: Viktor Söderqvist commit 8ee7a580254d5600a2af32ac30ce2a103b7d83fb Author: zixuan zhao Date: Tue Oct 29 06:13:30 2024 -0400 Document log format configs in valkey.conf (#1233) Add config options for log format and timestamp format introduced by Related to #1225 This change adds two new configs into valkey.conf: log-format log-timestamp-format --------- Signed-off-by: azuredream commit c21f1dc084d49ea74883ba93583bf957f7636635 Author: Lipeng Zhu Date: Mon Oct 28 13:43:23 2024 +0800 Increase the IO_THREADS_MAX_NUM. (#1220) This patch try to increase the max number of io-threads from 16(128) to 256 for below reasons: 1. The core number increases a lot in the modern server processors, for example, the [Sierra Forest](https://en.wikipedia.org/wiki/Sierra_Forest) processors are targeted towards with up to **288** cores. Due to limitation of **_io-threads_** number (16 and 128 ), benchmark like https://openbenchmarking.org/test/pts/valkey even cannot run on a high core count server. 2. For some workloads, the bottleneck could be main thread, but for the other workloads, big key/value which caused heavy io, the bottleneck could be the io-threads, for example benchmark `memtier_benchmark -s 127.0.0.1 -p 9001 "--data-size" "20000" --ratio 1:0 --key-pattern P:P --key-minimum=1 --key-maximum 1000000 --test-time 180 -c 50 -t 16 --hide-histogram`. The QPS is still scalable after 16 io-threads. ![image](https://github.com/user-attachments/assets/e980f805-a162-44be-b03e-ab37a9c489cf) **Fig 1. QPS Scale factor with io-threads number grows.** Signed-off-by: Lipeng Zhu Co-authored-by: Wangyang Guo commit 5d2ff853a335af2bc2c1527da126b3e947269ad2 Author: Binbin Date: Sun Oct 27 15:23:00 2024 +0800 Fix minor repldbfd leak in updateReplicasWaitingBgsave if fstat fails (#1226) In the old code, if fstat fails, replica->repldbfd will hold the fd and we are doing a free client. And in freeClient, we check and close only if repl_state == REPLICA_STATE_SEND_BULK. So if fstat fails, we will leak the fd. We can also extend freeClient to handle REPLICA_STATE_WAIT_BGSAVE_END as well, but here seems to be a more friendly (and safer) way. Signed-off-by: Binbin commit 4be09e434a3b5bf55e6e6b5a98f315720e31010f Author: Shivshankar Date: Fri Oct 25 08:03:59 2024 -0400 Fix typo in valkey.conf file's shutdown section (#1224) Found typo "exists" ==> "exits" in valkey.conf in shutdown section. Signed-off-by: Shivshankar-Reddy commit 9c60fcdae241a7e1dedc2f51d19491d168d66b9b Author: Lipeng Zhu Date: Fri Oct 25 17:13:28 2024 +0800 Do security attack check only when command not found to reduce the critical path (#1212) When explored the cycles distribution for main thread with io-threads enabled. We found this security attack check takes significant time in main thread, **~3%** cycles were used to do the commands security check in main thread. This patch try to completely avoid doing it in the hot path. We can do it only after we looked up the command and it wasn't found, just before we call commandCheckExistence. --------- Signed-off-by: Lipeng Zhu Co-authored-by: Wangyang Guo commit 55bbbe09a3b37a98c907dbfbc225768ff253e84f Author: zixuan zhao Date: Thu Oct 24 18:36:32 2024 -0400 Configurable log and timestamp formats (logfmt, ISO8601) (#1022) Add ability to configure log output format and timestamp format in the logs. This change adds two new configs: * `log-format`: Either legacy or logfmt (See https://brandur.org/logfmt) * `log-timestamp-format`: legacy, iso8601 or milliseconds (since the eppch). Related to #1006. Example: ``` $ ./valkey-server /home/zhaoz12/git/valkey/valkey/valkey.conf pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=warning message="WARNING Memory overcommit must be enabled! Without it, a background save or replication may fail under low memory condition. Being disabled, it can also cause failures without low memory condition, see https://github.com/jemalloc/jemalloc/issues/1328. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect." pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="oO0OoO0OoO0Oo Valkey is starting oO0OoO0OoO0Oo" pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="Valkey version=255.255.255, bits=64, commit=affbea5d, modified=1, pid=109463, just started" pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="Configuration loaded" pid=109463 role=master timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="monotonic clock: POSIX clock_gettime" pid=109463 role=master timestamp="2024-09-10T20:37:25.739-04:00" level=warning message="Failed to write PID file: Permission denied" ``` --------- Signed-off-by: azuredream commit 2956367731305f73005b44b26d6f665564731de3 Author: Binbin Date: Thu Oct 24 21:53:05 2024 +0800 Maintain return value of rdbSaveDb after writing slot-info aux (#1222) All other places written in this function are maintained it, although the caller of rdbSaveDb does not reply on it, it is maintained to be consistent with other places, is its duty. Signed-off-by: Binbin commit a21fe718f46e5987dd9cf1ca698dce6d0d060795 Author: Binbin Date: Thu Oct 24 16:38:47 2024 +0800 Limit CLUSTER_CANT_FAILOVER_DATA_AGE log to 10 times period (#1189) If a replica is step into data_age too old stage, it can not trigger the failover and currently it can not be automatically recovered and we will print a log every CLUSTER_CANT_FAILOVER_RELOG_PERIOD, which is every second. If the primary has not recovered or there is no manual failover, this log will flood the log file. In this case, limit its frequency to 10 times period, which is 10 seconds in our code. Also in this data_age too old stage, the repeated logs also can stand for the progress of the failover. See also #780 for more details about it. Signed-off-by: Binbin Co-authored-by: Ping Xie commit c419524c05d7544636d5bc0cc0cb333052b0a517 Author: muelstefamzn Date: Wed Oct 23 16:56:32 2024 -0700 Trim free space from inline command argument strings to avoid excess memory usage (#1213) The command argument strings created while parsing inline commands (see `processInlineBuffer()`) can contain free capacity. Since some commands ,such as `SET`, store these strings in the database, that free capacity increases the memory usage. In the worst case, it could double the memory usage. This only occurs if the inline command format is used. The argument strings are built by appending character by character in `sdssplitargs()`. Regular RESP commands are not affected. This change trims the strings within `processInlineBuffer()`. this? When the command argument string is packed into an object, `trimStringObjectIfNeeded()` is called. This does only trim the string if it is larger than `PROTO_MBULK_BIG_ARG` (32kB), as only strings larger than this would ever need trimming if the command it sent using the bulk string format. We could modify this condition, but that would potentially have a performance impact on commands using the bulk format. Since those make up for the vast majority of executed commands, limiting this change to inline commands seems prudent. * 1 million `SET [key] [value]` commands * Random keys (16 bytes) * 600 bytes values Memory usage without this change: ``` used_memory:1089327888 used_memory_human:1.01G used_memory_rss:1131696128 used_memory_rss_human:1.05G used_memory_peak:1089348264 used_memory_peak_human:1.01G used_memory_peak_perc:100.00% used_memory_overhead:49302800 used_memory_startup:911808 used_memory_dataset:1040025088 used_memory_dataset_perc:95.55% ``` Memory usage with this change: ``` used_memory:705327888 used_memory_human:672.65M used_memory_rss:718802944 used_memory_rss_human:685.50M used_memory_peak:705348256 used_memory_peak_human:672.67M used_memory_peak_perc:100.00% used_memory_overhead:49302800 used_memory_startup:911808 used_memory_dataset:656025088 used_memory_dataset_perc:93.13% ``` If the same experiment is repeated using the normal RESP array of bulk string format (`*3\r\n$3\r\nSET\r\n...`) then the memory usage is 672MB with and without of this change. If a replica is attached, its memory usage is 672MB with and without this change, since the replication link never uses inline commands. Signed-off-by: Stefan Mueller commit c176de4251cb82eb2b005b40fe284b93bdaa7353 Author: danish-mehmood <35922417+danish-mehmood@users.noreply.github.com> Date: Thu Oct 24 02:30:42 2024 +0500 Clarify the wording from dually to the more common doubly (#1214) Clarify documentation is ziplist.c Signed-off-by: danish-mehmood commit b803f7aeff4ffe43c866a52d9ea830add33b5834 Author: Binbin Date: Wed Oct 23 17:11:42 2024 +0800 Cleaned up getSlotOrReply is return -1 instead of C_ERR (#1211) Minor cleanup since getSlotOrReply return -1 on error, not return C_ERR. Signed-off-by: Binbin commit 5d70ccd70eb42db718e0176987891f91d21d29c8 Author: Binbin Date: Wed Oct 23 10:22:25 2024 +0800 Make replica CLUSTER RESET flush async based on lazyfree-lazy-user-flush (#1190) Currently, if the replica has a lot of data, CLUSTER RESET will block for a while and report the slowlog, and it seems that there is no harm in making it async so external components can be easier when monitoring it. Signed-off-by: Binbin Co-authored-by: Ping Xie commit 285064b114c80cce5bd4a48bf8d6493c9d0e0971 Author: Shivshankar Date: Mon Oct 21 22:54:40 2024 -0400 fix typo (#1202) Signed-off-by: Shivshankar-Reddy commit 771918e4bf52406519dd66e7421de00bc6169d63 Author: Shivshankar Date: Mon Oct 21 16:48:29 2024 -0400 Updating command.def by running the generate-command-code.py (#1203) Part of https://github.com/valkey-io/valkey/pull/1200 PR, since feild is changed. Looks like commands.def is missed to get genereated based on the changes so that is causing CI failure on unstable. Signed-off-by: Shivshankar-Reddy commit 5885dc56bdb40b3e0ea9b3d20a8bb08c7f2c3157 Author: Viktor Söderqvist Date: Mon Oct 21 16:04:47 2024 +0200 Fix BGSAVE CANCEL since and history fields (#1200) Fixes wrong "since" and "history" introduced in #757. --------- Signed-off-by: Viktor Söderqvist commit 29b83f1ac8dd80a9c3214c1e1f0ff3b7730fb612 Author: ranshid <88133677+ranshid@users.noreply.github.com> Date: Mon Oct 21 12:56:44 2024 +0300 Introduce bgsave cancel (#757) In some cases bgsave child process can run for a long time exhausting system resources. Although it is possible to kill the bgsave child process from the system shell, sometimes it is not possible allowing OS level access. This PR adds a new subcommand to the BGSAVE command. When user will issue `BGSAVE CANCEL`, it will do one of the 2: 1. In case a bgsave child process is currently running, the child process would be immediately killed thus terminating any save/replication full sync process. 2. In case a bgsave child process is SCHEDULED to run, the scheduled execution will be cancelled. --------- Signed-off-by: ranshid Signed-off-by: ranshid <88133677+ranshid@users.noreply.github.com> Signed-off-by: Ran Shidlansik Signed-off-by: Binbin Co-authored-by: Binbin Co-authored-by: Viktor Söderqvist commit 71f8c34eede9a8e0fdab09b53ff4202d8bbaa434 Author: zhenwei pi Date: Mon Oct 21 16:11:27 2024 +0800 RDMA: Fix listener priv opaque pointer (#1194) struct connListener.priv should be used by connection type specific data, static local listener data should not use this. A RDMA config structure is going to be introduced in the next step: ``` typedef struct serverRdmaContextConfig { char *bindaddr; int bindaddr_count; int port; int rx_size; int comp_vector; ... } serverRdmaContextConfig; ``` Then a builtin RDMA will be supported. Signed-off-by: zhenwei pi commit 2743b7e04beecc3128b696e2b90a4916f45dc1c4 Author: Binbin Date: Sat Oct 19 14:56:10 2024 +0800 Fix SORT GET to ignore special pattern # in cluster slot check (#1182) This special pattern '#' is used to get the element itself, it does not actually participate in the slot check. In this case, passing `GET #` will cause '#' to participate in the slot check, causing the command to get an `pattern may be in different slots` error. Signed-off-by: Binbin Signed-off-by: Jacob Murphy --- .github/workflows/ci.yml | 2 +- src/Makefile | 11 +- src/cluster.h | 2 +- src/cluster_legacy.c | 28 ++-- src/cluster_slot_stats.c | 4 +- src/commands.def | 11 +- src/commands/bgsave.json | 30 +++- src/config.c | 16 +- src/config.h | 2 +- src/db.c | 4 +- src/debug.c | 4 +- src/io_threads.c | 2 +- src/kvstore.c | 4 +- src/networking.c | 9 +- src/rdb.c | 21 +++ src/rdma.c | 59 +++++-- src/replication.c | 18 ++- src/sds.c | 181 ++++++++++++---------- src/server.c | 119 ++++++++++++-- src/server.h | 33 ++-- src/sort.c | 8 +- src/unit/test_files.h | 3 +- src/unit/test_sds.c | 41 +++++ src/ziplist.c | 2 +- tests/cluster/tests/28-cluster-shards.tcl | 2 +- tests/integration/rdb.tcl | 58 +++++++ tests/unit/cluster/cluster-shards.tcl | 2 +- tests/unit/sort.tcl | 20 ++- valkey.conf | 25 ++- 29 files changed, 542 insertions(+), 179 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f1d23f40fa..48a94ef984 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -83,7 +83,7 @@ jobs: steps: - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - name: make - run: make -j3 SERVER_CFLAGS='-Werror' + run: make -j3 all-with-unit-tests SERVER_CFLAGS='-Werror' build-32bit: runs-on: ubuntu-latest diff --git a/src/Makefile b/src/Makefile index 020b70d6d5..ae2de1c626 100644 --- a/src/Makefile +++ b/src/Makefile @@ -98,15 +98,6 @@ ifeq ($(USE_JEMALLOC),no) MALLOC=libc endif -# Some unit tests compile files a second time to get access to static functions, the "--allow-multiple-definition" flag -# allows us to do that without an error, by using the first instance of function. This behavior can also be used -# to tweak behavior of code just for unit tests. The version of ld on MacOS apparently always does this. -ifneq ($(uname_S),Darwin) - ALLOW_DUPLICATE_FLAG=-Wl,--allow-multiple-definition -else - ALLOW_DUPLICATE_FLAG= -endif - ifdef SANITIZER ifeq ($(SANITIZER),address) MALLOC=libc @@ -494,7 +485,7 @@ $(ENGINE_LIB_NAME): $(ENGINE_SERVER_OBJ) # valkey-unit-tests $(ENGINE_UNIT_TESTS): $(ENGINE_TEST_OBJ) $(ENGINE_LIB_NAME) - $(SERVER_LD) $(ALLOW_DUPLICATE_FLAG) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/hdr_histogram/libhdrhistogram.a ../deps/fpconv/libfpconv.a $(FINAL_LIBS) + $(SERVER_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/hdr_histogram/libhdrhistogram.a ../deps/fpconv/libfpconv.a $(FINAL_LIBS) # valkey-sentinel $(ENGINE_SENTINEL_NAME): $(SERVER_NAME) diff --git a/src/cluster.h b/src/cluster.h index 81bf3a1091..15a553382a 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -96,7 +96,7 @@ int clusterNodeIsFailing(clusterNode *node); int clusterNodeIsNoFailover(clusterNode *node); char *clusterNodeGetShardId(clusterNode *node); int clusterNodeNumReplicas(clusterNode *node); -clusterNode *clusterNodeGetReplica(clusterNode *node, int slave_idx); +clusterNode *clusterNodeGetReplica(clusterNode *node, int replica_idx); clusterNode *getMigratingSlotDest(int slot); clusterNode *getImportingSlotSource(int slot); clusterNode *getNodeBySlot(int slot); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 70ca2ab252..fd60df19af 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1237,7 +1237,7 @@ void clusterReset(int hard) { if (nodeIsReplica(myself)) { clusterSetNodeAsPrimary(myself); replicationUnsetPrimary(); - emptyData(-1, EMPTYDB_NO_FLAGS, NULL); + emptyData(-1, server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, NULL); } /* Close slots, reset manual failover state. */ @@ -2995,7 +2995,7 @@ int clusterIsValidPacket(clusterLink *link) { return 0; } - if (type == server.cluster_drop_packet_filter) { + if (type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2) { serverLog(LL_WARNING, "Dropping packet that matches debug drop filter"); return 0; } @@ -3084,7 +3084,8 @@ int clusterProcessPacket(clusterLink *link) { if (!clusterIsValidPacket(link)) { clusterMsg *hdr = (clusterMsg *)link->rcvbuf; uint16_t type = ntohs(hdr->type); - if (server.debug_cluster_close_link_on_packet_drop && type == server.cluster_drop_packet_filter) { + if (server.debug_cluster_close_link_on_packet_drop && + (type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2)) { freeClusterLink(link); serverLog(LL_WARNING, "Closing link for matching packet type %hu", type); return 0; @@ -4708,11 +4709,18 @@ int clusterGetReplicaRank(void) { void clusterLogCantFailover(int reason) { char *msg; static time_t lastlog_time = 0; + time_t now = time(NULL); - /* Don't log if we have the same reason for some time. */ - if (reason == server.cluster->cant_failover_reason && - time(NULL) - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD) + /* General logging suppression if the same reason has occurred recently. */ + if (reason == server.cluster->cant_failover_reason && now - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD) { return; + } + + /* Special case: If the failure reason is due to data age, log 10 times less frequently. */ + if (reason == server.cluster->cant_failover_reason && reason == CLUSTER_CANT_FAILOVER_DATA_AGE && + now - lastlog_time < 10 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD) { + return; + } server.cluster->cant_failover_reason = reason; @@ -6090,6 +6098,8 @@ const char *clusterGetMessageTypeString(int type) { return "unknown"; } +/* Get the slot from robj and return it. If the slot is not valid, + * return -1 and send an error to the client. */ int getSlotOrReply(client *c, robj *o) { long long slot; @@ -6815,7 +6825,7 @@ int clusterCommandSpecial(client *c) { memset(slots, 0, CLUSTER_SLOTS); /* Check that all the arguments are parseable.*/ for (j = 2; j < c->argc; j++) { - if ((slot = getSlotOrReply(c, c->argv[j])) == C_ERR) { + if ((slot = getSlotOrReply(c, c->argv[j])) == -1) { zfree(slots); return 1; } @@ -6848,11 +6858,11 @@ int clusterCommandSpecial(client *c) { /* Check that all the arguments are parseable and that all the * slots are not already busy. */ for (j = 2; j < c->argc; j += 2) { - if ((startslot = getSlotOrReply(c, c->argv[j])) == C_ERR) { + if ((startslot = getSlotOrReply(c, c->argv[j])) == -1) { zfree(slots); return 1; } - if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == C_ERR) { + if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == -1) { zfree(slots); return 1; } diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index 284208af54..b52692bd15 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -279,8 +279,8 @@ void clusterSlotStatsCommand(client *c) { if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) { /* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */ int startslot, endslot; - if ((startslot = getSlotOrReply(c, c->argv[3])) == C_ERR || - (endslot = getSlotOrReply(c, c->argv[4])) == C_ERR) { + if ((startslot = getSlotOrReply(c, c->argv[3])) == -1 || + (endslot = getSlotOrReply(c, c->argv[4])) == -1) { return; } if (startslot > endslot) { diff --git a/src/commands.def b/src/commands.def index b1b39b9632..168514871c 100644 --- a/src/commands.def +++ b/src/commands.def @@ -6428,6 +6428,7 @@ struct COMMAND_STRUCT ACL_Subcommands[] = { /* BGSAVE history */ commandHistory BGSAVE_History[] = { {"3.2.2","Added the `SCHEDULE` option."}, +{"8.1.0","Added the `CANCEL` option."}, }; #endif @@ -6441,9 +6442,15 @@ commandHistory BGSAVE_History[] = { #define BGSAVE_Keyspecs NULL #endif +/* BGSAVE operation argument table */ +struct COMMAND_ARG BGSAVE_operation_Subargs[] = { +{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("cancel",ARG_TYPE_PURE_TOKEN,-1,"CANCEL",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)}, +}; + /* BGSAVE argument table */ struct COMMAND_ARG BGSAVE_Args[] = { -{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_OPTIONAL,0,NULL)}, +{MAKE_ARG("operation",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=BGSAVE_operation_Subargs}, }; /********** COMMAND COUNT ********************/ @@ -11009,7 +11016,7 @@ struct COMMAND_STRUCT serverCommandTable[] = { /* server */ {MAKE_CMD("acl","A container for Access List Control commands.","Depends on subcommand.","6.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,ACL_History,0,ACL_Tips,0,NULL,-2,CMD_SENTINEL,0,ACL_Keyspecs,0,NULL,0),.subcommands=ACL_Subcommands}, {MAKE_CMD("bgrewriteaof","Asynchronously rewrites the append-only file to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGREWRITEAOF_History,0,BGREWRITEAOF_Tips,0,bgrewriteaofCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGREWRITEAOF_Keyspecs,0,NULL,0)}, -{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,1,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args}, +{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,2,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args}, {MAKE_CMD("command","Returns detailed information about all commands.","O(N) where N is the total number of commands","2.8.13",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,COMMAND_History,0,COMMAND_Tips,1,commandCommand,-1,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,COMMAND_Keyspecs,0,NULL,0),.subcommands=COMMAND_Subcommands}, {MAKE_CMD("config","A container for server configuration commands.","Depends on subcommand.","2.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,CONFIG_History,0,CONFIG_Tips,0,NULL,-2,0,0,CONFIG_Keyspecs,0,NULL,0),.subcommands=CONFIG_Subcommands}, {MAKE_CMD("dbsize","Returns the number of keys in the database.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,DBSIZE_History,0,DBSIZE_Tips,2,dbsizeCommand,1,CMD_READONLY|CMD_FAST,ACL_CATEGORY_KEYSPACE,DBSIZE_Keyspecs,0,NULL,0)}, diff --git a/src/commands/bgsave.json b/src/commands/bgsave.json index f73d8a89b5..6b4688ba57 100644 --- a/src/commands/bgsave.json +++ b/src/commands/bgsave.json @@ -10,6 +10,10 @@ [ "3.2.2", "Added the `SCHEDULE` option." + ], + [ + "8.1.0", + "Added the `CANCEL` option." ] ], "command_flags": [ @@ -19,11 +23,23 @@ ], "arguments": [ { - "name": "schedule", - "token": "SCHEDULE", - "type": "pure-token", + "name": "operation", + "type": "oneof", "optional": true, - "since": "3.2.2" + "arguments": [ + { + "name": "schedule", + "token": "SCHEDULE", + "type": "pure-token", + "since": "3.2.2" + }, + { + "name": "cancel", + "token": "CANCEL", + "type": "pure-token", + "since": "8.1.0" + } + ] } ], "reply_schema": { @@ -33,6 +49,12 @@ }, { "const": "Background saving scheduled" + }, + { + "const": "Background saving cancelled" + }, + { + "const": "Scheduled background saving cancelled" } ] } diff --git a/src/config.c b/src/config.c index 560c7266bd..e72e45fb2e 100644 --- a/src/config.c +++ b/src/config.c @@ -152,6 +152,13 @@ configEnum propagation_error_behavior_enum[] = { {"panic-on-replicas", PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS}, {NULL, 0}}; +configEnum log_format_enum[] = {{"legacy", LOG_FORMAT_LEGACY}, {"logfmt", LOG_FORMAT_LOGFMT}, {NULL, 0}}; + +configEnum log_timestamp_format_enum[] = {{"legacy", LOG_TIMESTAMP_LEGACY}, + {"iso8601", LOG_TIMESTAMP_ISO8601}, + {"milliseconds", LOG_TIMESTAMP_MILLISECONDS}, + {NULL, 0}}; + /* Output buffer limits presets. */ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -622,9 +629,6 @@ void loadServerConfigFromString(char *config) { if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ; if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ; - /* To ensure backward compatibility when io_threads_num is according to the previous maximum of 128. */ - if (server.io_threads_num > IO_THREADS_MAX_NUM) server.io_threads_num = IO_THREADS_MAX_NUM; - sdsfreesplitres(lines, totlines); reading_config_file = 0; return; @@ -3192,11 +3196,13 @@ standardConfig static_configs[] = { createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL), createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL), createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL), + createEnumConfig("log-format", NULL, MODIFIABLE_CONFIG, log_format_enum, server.log_format, LOG_FORMAT_LEGACY, NULL, NULL), + createEnumConfig("log-timestamp-format", NULL, MODIFIABLE_CONFIG, log_timestamp_format_enum, server.log_timestamp_format, LOG_TIMESTAMP_LEGACY, NULL, NULL), /* Integer configs */ createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL), - createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ - createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ + createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ + createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), diff --git a/src/config.h b/src/config.h index 558b974f7d..3b79c5c681 100644 --- a/src/config.h +++ b/src/config.h @@ -338,7 +338,7 @@ void setcpuaffinity(const char *cpulist); #define HAVE_FADVISE #endif -#define IO_THREADS_MAX_NUM 16 +#define IO_THREADS_MAX_NUM 256 #ifndef CACHE_LINE_SIZE #if defined(__aarch64__) && defined(__APPLE__) diff --git a/src/db.c b/src/db.c index 1db85439a2..f466dbc82b 100644 --- a/src/db.c +++ b/src/db.c @@ -2418,9 +2418,9 @@ int bzmpopGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResul /* Helper function to extract keys from the SORT RO command. * - * SORT + * SORT_RO * - * The second argument of SORT is always a key, however an arbitrary number of + * The second argument of SORT_RO is always a key, however an arbitrary number of * keys may be accessed while doing the sort (the BY and GET args), so the * key-spec declares incomplete keys which is why we have to provide a concrete * implementation to fetch the keys. diff --git a/src/debug.c b/src/debug.c index 317498dd6a..1b7cd90ae7 100644 --- a/src/debug.c +++ b/src/debug.c @@ -432,7 +432,7 @@ void debugCommand(client *c) { " Some fields of the default behavior may be time consuming to fetch,", " and `fast` can be passed to avoid fetching them.", "DROP-CLUSTER-PACKET-FILTER ", - " Drop all packets that match the filtered type. Set to -1 allow all packets.", + " Drop all packets that match the filtered type. Set to -1 allow all packets or -2 to drop all packets.", "CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>", " This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type.", " When set to 1, the cluster link is closed after dropping a packet based on the filter.", @@ -1023,7 +1023,7 @@ void debugCommand(client *c) { /* =========================== Crash handling ============================== */ -__attribute__((noinline)) void _serverAssert(const char *estr, const char *file, int line) { +__attribute__((noinline, weak)) void _serverAssert(const char *estr, const char *file, int line) { int new_report = bugReportStart(); serverLog(LL_WARNING, "=== %sASSERTION FAILED ===", new_report ? "" : "RECURSIVE "); serverLog(LL_WARNING, "==> %s:%d '%s' is not true", file, line, estr); diff --git a/src/io_threads.c b/src/io_threads.c index b0368cf07b..f4471b96d0 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -319,7 +319,7 @@ void initIOThreads(void) { int trySendReadToIOThreads(client *c) { if (server.active_io_threads_num <= 1) return C_ERR; - /* If IO thread is areadty reading, return C_OK to make sure the main thread will not handle it. */ + /* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */ if (c->io_read_state != CLIENT_IDLE) return C_OK; /* Currently, replica/master writes are not offloaded and are processed synchronously. */ if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; diff --git a/src/kvstore.c b/src/kvstore.c index 10d6c73dfe..687ff879cd 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -54,8 +54,8 @@ struct _kvstore { int flags; dictType *dtype; dict **dicts; - long long num_dicts; - long long num_dicts_bits; + int num_dicts; + int num_dicts_bits; list *rehashing; /* List of dictionaries in this kvstore that are currently rehashing. */ int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used if num_dicts > 1). */ int allocated_dicts; /* The number of allocated dicts. */ diff --git a/src/networking.c b/src/networking.c index c765c3cc4f..5c4dcdae00 100644 --- a/src/networking.c +++ b/src/networking.c @@ -889,8 +889,11 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { } char lenstr[128]; - size_t lenstr_len = snprintf(lenstr, sizeof(lenstr), "%c%ld\r\n", prefix, length); - setDeferredReply(c, node, lenstr, lenstr_len); + lenstr[0] = prefix; + size_t lenstr_len = ll2string(lenstr + 1, sizeof(lenstr) - 1, length); + lenstr[lenstr_len + 1] = '\r'; + lenstr[lenstr_len + 2] = '\n'; + setDeferredReply(c, node, lenstr, lenstr_len + 3); } void setDeferredArrayLen(client *c, void *node, long length) { @@ -2682,6 +2685,8 @@ void processInlineBuffer(client *c) { /* Create an Object for all arguments. */ for (c->argc = 0, j = 0; j < argc; j++) { + /* Strings returned from sdssplitargs() may have unused capacity that we can trim. */ + argv[j] = sdsRemoveFreeSpace(argv[j], 1); c->argv[c->argc] = createObject(OBJ_STRING, argv[j]); c->argc++; c->argv_len_sum += sdslen(argv[j]); diff --git a/src/rdb.c b/src/rdb.c index 864b51dd1e..eb581aa181 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1359,6 +1359,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter, int slot_ sdsfree(slot_info); goto werr; } + written += res; last_slot = curr_slot; sdsfree(slot_info); } @@ -3695,6 +3696,21 @@ void bgsaveCommand(client *c) { if (c->argc > 1) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "schedule")) { schedule = 1; + } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "cancel")) { + /* Terminates an in progress BGSAVE */ + if (server.child_type == CHILD_TYPE_RDB) { + /* There is an ongoing bgsave */ + serverLog(LL_NOTICE, "Background saving will be aborted due to user request"); + killRDBChild(); + addReplyStatus(c, "Background saving cancelled"); + } else if (server.rdb_bgsave_scheduled == 1) { + serverLog(LL_NOTICE, "Scheduled background saving will be cancelled due to user request"); + server.rdb_bgsave_scheduled = 0; + addReplyStatus(c, "Scheduled background saving cancelled"); + } else { + addReplyError(c, "Background saving is currently not in progress or scheduled"); + } + return; } else { addReplyErrorObject(c, shared.syntaxerr); return; @@ -3709,6 +3725,11 @@ void bgsaveCommand(client *c) { } else if (hasActiveChildProcess() || server.in_exec) { if (schedule || server.in_exec) { server.rdb_bgsave_scheduled = 1; + if (schedule) { + serverLog(LL_NOTICE, "Background saving scheduled due to user request"); + } else { + serverLog(LL_NOTICE, "Background saving scheduled to run after transaction execution"); + } addReplyStatus(c, "Background saving scheduled"); } else { addReplyError(c, "Another child process is active (AOF?): can't BGSAVE right now. " diff --git a/src/rdma.c b/src/rdma.c index dd6de395d0..bb38baa0f1 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -127,6 +127,8 @@ typedef struct rdma_listener { * handler into pending list */ static list *pending_list; +static rdma_listener *rdma_listeners; + static ConnectionType CT_RDMA; static int valkey_rdma_rx_size = VALKEY_RDMA_DEFAULT_RX_SIZE; @@ -141,12 +143,34 @@ static void serverRdmaError(char *err, const char *fmt, ...) { va_end(ap); } +static inline int connRdmaAllowCommand(void) { + /* RDMA MR is not accessible in a child process, avoid segment fault due to + * invalid MR access, close it rather than server random crash */ + if (server.in_fork_child != CHILD_TYPE_NONE) { + return C_ERR; + } + + return C_OK; +} + +static inline int connRdmaAllowRW(connection *conn) { + if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + return C_ERR; + } + + return connRdmaAllowCommand(); +} + static int rdmaPostRecv(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdmaCmd *cmd) { struct ibv_sge sge; size_t length = sizeof(ValkeyRdmaCmd); struct ibv_recv_wr recv_wr, *bad_wr; int ret; + if (connRdmaAllowCommand()) { + return C_ERR; + } + sge.addr = (uint64_t)cmd; sge.length = length; sge.lkey = ctx->cmd_mr->lkey; @@ -449,13 +473,22 @@ static int rdmaHandleEstablished(struct rdma_cm_event *ev) { return C_OK; } +static inline void rdmaDelKeepalive(aeEventLoop *el, RdmaContext *ctx) { + if (ctx->keepalive_te == AE_ERR) { + return; + } + + aeDeleteTimeEvent(el, ctx->keepalive_te); + ctx->keepalive_te = AE_ERR; +} + static int rdmaHandleDisconnect(aeEventLoop *el, struct rdma_cm_event *ev) { struct rdma_cm_id *cm_id = ev->id; RdmaContext *ctx = cm_id->context; connection *conn = ctx->conn; rdma_connection *rdma_conn = (rdma_connection *)conn; - aeDeleteTimeEvent(el, ctx->keepalive_te); + rdmaDelKeepalive(el, ctx); conn->state = CONN_STATE_CLOSED; /* we can't close connection now, let's mark this connection as closed state */ @@ -748,7 +781,7 @@ static rdma_listener *rdmaFdToListener(connListener *listener, int fd) { for (int i = 0; i < listener->count; i++) { if (listener->fd[i] != fd) continue; - return (rdma_listener *)listener->priv + i; + return &rdma_listeners[i]; } return NULL; @@ -1171,6 +1204,7 @@ static void connRdmaClose(connection *conn) { } ctx = cm_id->context; + rdmaDelKeepalive(server.el, ctx); rdma_disconnect(cm_id); /* poll all CQ before close */ @@ -1202,6 +1236,10 @@ static size_t connRdmaSend(connection *conn, const void *data, size_t data_len) char *remote_addr = ctx->tx_addr + ctx->tx.offset; int ret; + if (connRdmaAllowCommand()) { + return C_ERR; + } + memcpy(addr, data, data_len); sge.addr = (uint64_t)addr; @@ -1235,7 +1273,7 @@ static int connRdmaWrite(connection *conn, const void *data, size_t data_len) { RdmaContext *ctx = cm_id->context; uint32_t towrite; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1278,7 +1316,7 @@ static int connRdmaRead(connection *conn, void *buf, size_t buf_len) { struct rdma_cm_id *cm_id = rdma_conn->cm_id; RdmaContext *ctx = cm_id->context; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1300,7 +1338,7 @@ static ssize_t connRdmaSyncWrite(connection *conn, char *ptr, ssize_t size, long long long start = mstime(); uint32_t towrite; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1343,7 +1381,7 @@ static ssize_t connRdmaSyncRead(connection *conn, char *ptr, ssize_t size, long long long start = mstime(); uint32_t toread; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1378,7 +1416,7 @@ static ssize_t connRdmaSyncReadLine(connection *conn, char *ptr, ssize_t size, l char *c; char nl = 0; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1537,7 +1575,7 @@ int connRdmaListen(connListener *listener) { bindaddr = default_bindaddr; } - listener->priv = rdma_listener = zcalloc_num(bindaddr_count, sizeof(*rdma_listener)); + rdma_listeners = rdma_listener = zcalloc_num(bindaddr_count, sizeof(*rdma_listener)); for (j = 0; j < bindaddr_count; j++) { char *addr = bindaddr[j]; int optional = *addr == '-'; @@ -1757,13 +1795,14 @@ static int rdmaChangeListener(void) { aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE); listener->fd[i] = -1; - struct rdma_listener *rdma_listener = (struct rdma_listener *)listener->priv + i; + struct rdma_listener *rdma_listener = &rdma_listeners[i]; rdma_destroy_id(rdma_listener->cm_id); rdma_destroy_event_channel(rdma_listener->cm_channel); } listener->count = 0; - zfree(listener->priv); + zfree(rdma_listeners); + rdma_listeners = NULL; closeListener(listener); diff --git a/src/replication.c b/src/replication.c index 51b654ae32..2f840dfcd6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -651,7 +651,7 @@ void replicationFeedStreamFromPrimaryStream(char *buf, size_t buflen) { /* Debugging: this is handy to see the stream sent from primary * to replicas. Disabled with if(0). */ if (0) { - if (server.hide_user_data_from_log) { + if (!server.hide_user_data_from_log) { printf("%zu:", buflen); for (size_t j = 0; j < buflen; j++) { printf("%c", isprint(buf[j]) ? buf[j] : '.'); @@ -1761,6 +1761,7 @@ void updateReplicasWaitingBgsave(int bgsaveerr, int type) { client *replica = ln->value; if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) { + int repldbfd; struct valkey_stat buf; if (bgsaveerr != C_OK) { @@ -1810,17 +1811,26 @@ void updateReplicasWaitingBgsave(int bgsaveerr, int type) { } replica->repl_start_cmd_stream_on_ack = 1; } else { - if ((replica->repldbfd = open(server.rdb_filename, O_RDONLY)) == -1 || - valkey_fstat(replica->repldbfd, &buf) == -1) { + repldbfd = open(server.rdb_filename, O_RDONLY); + if (repldbfd == -1) { freeClientAsync(replica); - serverLog(LL_WARNING, "SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); + serverLog(LL_WARNING, "SYNC failed. Can't open DB after BGSAVE: %s", strerror(errno)); continue; } + if (valkey_fstat(repldbfd, &buf) == -1) { + freeClientAsync(replica); + serverLog(LL_WARNING, "SYNC failed. Can't stat DB after BGSAVE: %s", strerror(errno)); + close(repldbfd); + continue; + } + replica->repldbfd = repldbfd; replica->repldboff = 0; replica->repldbsize = buf.st_size; replica->repl_state = REPLICA_STATE_SEND_BULK; replica->replpreamble = sdscatprintf(sdsempty(), "$%lld\r\n", (unsigned long long)replica->repldbsize); + /* When repl_state changes to REPLICA_STATE_SEND_BULK, we will release + * the resources in freeClient. */ connSetWriteHandler(replica->conn, NULL); if (connSetWriteHandler(replica->conn, sendBulkToReplica) == C_ERR) { freeClientAsync(replica); diff --git a/src/sds.c b/src/sds.c index e14f4bd0bd..4dd7d709aa 100644 --- a/src/sds.c +++ b/src/sds.c @@ -1032,6 +1032,86 @@ int hex_digit_to_int(char c) { } } +/* Helper function for sdssplitargs that parses a single argument. It + * populates the number characters needed to store the parsed argument + * in len, if provided, or will copy the parsed string into dst, if provided. + * If the string is able to be parsed, this function returns the number of + * characters that were parsed. If the argument can't be parsed, it + * returns 0. */ +static int sdsparsearg(const char *arg, unsigned int *len, char *dst) { + const char *p = arg; + int inq = 0; /* set to 1 if we are in "quotes" */ + int insq = 0; /* set to 1 if we are in 'single quotes' */ + int done = 0; + + while (!done) { + int new_char = -1; + if (inq) { + if (*p == '\\' && *(p + 1) == 'x' && is_hex_digit(*(p + 2)) && is_hex_digit(*(p + 3))) { + new_char = (hex_digit_to_int(*(p + 2)) * 16) + hex_digit_to_int(*(p + 3)); + p += 3; + } else if (*p == '\\' && *(p + 1)) { + p++; + switch (*p) { + case 'n': new_char = '\n'; break; + case 'r': new_char = '\r'; break; + case 't': new_char = '\t'; break; + case 'b': new_char = '\b'; break; + case 'a': new_char = '\a'; break; + default: new_char = *p; break; + } + } else if (*p == '"') { + /* closing quote must be followed by a space or + * nothing at all. */ + if (*(p + 1) && !isspace(*(p + 1))) return 0; + done = 1; + } else if (!*p) { + /* unterminated quotes */ + return 0; + } else { + new_char = *p; + } + } else if (insq) { + if (*p == '\\' && *(p + 1) == '\'') { + p++; + new_char = *p; + } else if (*p == '\'') { + /* closing quote must be followed by a space or + * nothing at all. */ + if (*(p + 1) && !isspace(*(p + 1))) return 0; + done = 1; + } else if (!*p) { + /* unterminated quotes */ + return 0; + } else { + new_char = *p; + } + } else { + switch (*p) { + case ' ': + case '\n': + case '\r': + case '\t': + case '\0': done = 1; break; + case '"': inq = 1; break; + case '\'': insq = 1; break; + default: new_char = *p; break; + } + } + if (new_char != -1) { + if (len) (*len)++; + if (dst) { + *dst = (char)new_char; + dst++; + } + } + if (*p) { + p++; + } + } + return p - arg; +} + /* Split a line into arguments, where every argument can be in the * following programming-language REPL-alike form: * @@ -1049,103 +1129,42 @@ int hex_digit_to_int(char c) { * The function returns the allocated tokens on success, even when the * input string is empty, or NULL if the input contains unbalanced * quotes or closed quotes followed by non space characters - * as in: "foo"bar or "foo' + * as in: "foo"bar or "foo'. + * + * The sds strings returned by this function are not initialized with + * extra space. */ sds *sdssplitargs(const char *line, int *argc) { const char *p = line; - char *current = NULL; char **vector = NULL; *argc = 0; - while (1) { + while (*p) { /* skip blanks */ while (*p && isspace(*p)) p++; - if (*p) { - /* get a token */ - int inq = 0; /* set to 1 if we are in "quotes" */ - int insq = 0; /* set to 1 if we are in 'single quotes' */ - int done = 0; - - if (current == NULL) current = sdsempty(); - while (!done) { - if (inq) { - if (*p == '\\' && *(p + 1) == 'x' && is_hex_digit(*(p + 2)) && is_hex_digit(*(p + 3))) { - unsigned char byte; - - byte = (hex_digit_to_int(*(p + 2)) * 16) + hex_digit_to_int(*(p + 3)); - current = sdscatlen(current, (char *)&byte, 1); - p += 3; - } else if (*p == '\\' && *(p + 1)) { - char c; - - p++; - switch (*p) { - case 'n': c = '\n'; break; - case 'r': c = '\r'; break; - case 't': c = '\t'; break; - case 'b': c = '\b'; break; - case 'a': c = '\a'; break; - default: c = *p; break; - } - current = sdscatlen(current, &c, 1); - } else if (*p == '"') { - /* closing quote must be followed by a space or - * nothing at all. */ - if (*(p + 1) && !isspace(*(p + 1))) goto err; - done = 1; - } else if (!*p) { - /* unterminated quotes */ - goto err; - } else { - current = sdscatlen(current, p, 1); - } - } else if (insq) { - if (*p == '\\' && *(p + 1) == '\'') { - p++; - current = sdscatlen(current, "'", 1); - } else if (*p == '\'') { - /* closing quote must be followed by a space or - * nothing at all. */ - if (*(p + 1) && !isspace(*(p + 1))) goto err; - done = 1; - } else if (!*p) { - /* unterminated quotes */ - goto err; - } else { - current = sdscatlen(current, p, 1); - } - } else { - switch (*p) { - case ' ': - case '\n': - case '\r': - case '\t': - case '\0': done = 1; break; - case '"': inq = 1; break; - case '\'': insq = 1; break; - default: current = sdscatlen(current, p, 1); break; - } - } - if (*p) p++; - } + if (!(*p)) break; + unsigned int len = 0; + if (sdsparsearg(p, &len, NULL)) { + sds current = sdsnewlen(SDS_NOINIT, len); + int parsedlen = sdsparsearg(p, NULL, current); + assert(parsedlen > 0); + p += parsedlen; + /* add the token to the vector */ vector = s_realloc(vector, ((*argc) + 1) * sizeof(char *)); vector[*argc] = current; (*argc)++; current = NULL; } else { - /* Even on empty input string return something not NULL. */ - if (vector == NULL) vector = s_malloc(sizeof(void *)); - return vector; + while ((*argc)--) sdsfree(vector[*argc]); + s_free(vector); + *argc = 0; + return NULL; } } - -err: - while ((*argc)--) sdsfree(vector[*argc]); - s_free(vector); - if (current) sdsfree(current); - *argc = 0; - return NULL; + /* Even on empty input string return something not NULL. */ + if (vector == NULL) vector = s_malloc(sizeof(void *)); + return vector; } /* Modify the string substituting all the occurrences of the set of diff --git a/src/server.c b/src/server.c index ee401509fc..413d2b7170 100644 --- a/src/server.c +++ b/src/server.c @@ -109,11 +109,69 @@ const char *replstateToString(int replstate); * function of the server may be called from other threads. */ void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); +/* Formats the timezone offset into a string. daylight_active indicates whether dst is active (1) + * or not (0). */ +void formatTimezone(char *buf, size_t buflen, int timezone, int daylight_active) { + serverAssert(buflen >= 7); + serverAssert(timezone >= -50400 && timezone <= 43200); + // Adjust the timezone for daylight saving, if active + int total_offset = (-1) * timezone + 3600 * daylight_active; + int hours = abs(total_offset / 3600); + int minutes = abs(total_offset % 3600) / 60; + buf[0] = total_offset >= 0 ? '+' : '-'; + buf[1] = '0' + hours / 10; + buf[2] = '0' + hours % 10; + buf[3] = ':'; + buf[4] = '0' + minutes / 10; + buf[5] = '0' + minutes % 10; + buf[6] = '\0'; +} + +bool hasInvalidLogfmtChar(const char *msg) { + if (msg == NULL) return false; + + for (int i = 0; msg[i] != '\0'; i++) { + if (msg[i] == '"' || msg[i] == '\n' || msg[i] == '\r') { + return true; + } + } + return false; +} + +/* Modifies the input string by: + * replacing \r and \n with whitespace + * replacing " with ' + * + * Parameters: + * safemsg - A char pointer where the modified message will be stored + * safemsglen - size of safemsg + * msg - The original message */ +void filterInvalidLogfmtChar(char *safemsg, size_t safemsglen, const char *msg) { + serverAssert(safemsglen == LOG_MAX_LEN); + if (msg == NULL) return; + + size_t index = 0; + while (index < safemsglen - 1 && msg[index] != '\0') { + if (msg[index] == '"') { + safemsg[index] = '\''; + } else if (msg[index] == '\n' || msg[index] == '\r') { + safemsg[index] = ' '; + } else { + safemsg[index] = msg[index]; + } + index++; + } + safemsg[index] = '\0'; +} + /* Low level logging. To use only for very big messages, otherwise * serverLog() is to prefer. */ void serverLogRaw(int level, const char *msg) { const int syslogLevelMap[] = {LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING}; const char *c = ".-*#"; + const char *verbose_level[] = {"debug", "info", "notice", "warning"}; + const char *roles[] = {"sentinel", "RDB/AOF", "replica", "primary"}; + const char *role_chars = "XCSM"; FILE *fp; char buf[64]; int rawmode = (level & LL_RAW); @@ -133,23 +191,54 @@ void serverLogRaw(int level, const char *msg) { } else { int off; struct timeval tv; - int role_char; pid_t pid = getpid(); int daylight_active = atomic_load_explicit(&server.daylight_active, memory_order_relaxed); gettimeofday(&tv, NULL); struct tm tm; nolocks_localtime(&tm, tv.tv_sec, server.timezone, daylight_active); - off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm); - snprintf(buf + off, sizeof(buf) - off, "%03d", (int)tv.tv_usec / 1000); + switch (server.log_timestamp_format) { + case LOG_TIMESTAMP_LEGACY: + off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm); + snprintf(buf + off, sizeof(buf) - off, "%03d", (int)tv.tv_usec / 1000); + break; + + case LOG_TIMESTAMP_ISO8601: + off = strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S.", &tm); + char tzbuf[7]; + formatTimezone(tzbuf, sizeof(tzbuf), server.timezone, server.daylight_active); + snprintf(buf + off, sizeof(buf) - off, "%03d%s", (int)tv.tv_usec / 1000, tzbuf); + break; + + case LOG_TIMESTAMP_MILLISECONDS: + snprintf(buf, sizeof(buf), "%lld", (long long)tv.tv_sec * 1000 + (long long)tv.tv_usec / 1000); + break; + } + int role_index; if (server.sentinel_mode) { - role_char = 'X'; /* Sentinel. */ + role_index = 0; /* Sentinel. */ } else if (pid != server.pid) { - role_char = 'C'; /* RDB / AOF writing child. */ + role_index = 1; /* RDB / AOF writing child. */ } else { - role_char = (server.primary_replication_link ? 'S' : 'M'); /* replica or Primary. */ + role_index = (server.primary_replication_link ? 2 : 3); /* replica or Primary. */ + } + switch (server.log_format) { + case LOG_FORMAT_LOGFMT: + if (hasInvalidLogfmtChar(msg)) { + char safemsg[LOG_MAX_LEN]; + filterInvalidLogfmtChar(safemsg, LOG_MAX_LEN, msg); + fprintf(fp, "pid=%d role=%s timestamp=\"%s\" level=%s message=\"%s\"\n", (int)getpid(), roles[role_index], + buf, verbose_level[level], safemsg); + } else { + fprintf(fp, "pid=%d role=%s timestamp=\"%s\" level=%s message=\"%s\"\n", (int)getpid(), roles[role_index], + buf, verbose_level[level], msg); + } + break; + + case LOG_FORMAT_LEGACY: + fprintf(fp, "%d:%c %s %c %s\n", (int)getpid(), role_chars[role_index], buf, c[level], msg); + break; } - fprintf(fp, "%d:%c %s %c %s\n", (int)getpid(), role_char, buf, c[level], msg); } fflush(fp); @@ -3818,12 +3907,6 @@ int processCommand(client *c) { reqresAppendRequest(c); } - /* Handle possible security attacks. */ - if (!strcasecmp(c->argv[0]->ptr, "host:") || !strcasecmp(c->argv[0]->ptr, "post")) { - securityWarningCommand(c); - return C_ERR; - } - /* If we're inside a module blocked context yielding that wants to avoid * processing clients, postpone the command. */ if (server.busy_module_yield_flags != BUSY_MODULE_YIELD_NONE && @@ -3838,6 +3921,13 @@ int processCommand(client *c) { * we do not have to repeat the same checks */ if (!client_reprocessing_command) { struct serverCommand *cmd = c->io_parsed_cmd ? c->io_parsed_cmd : lookupCommand(c->argv, c->argc); + if (!cmd) { + /* Handle possible security attacks. */ + if (!strcasecmp(c->argv[0]->ptr, "host:") || !strcasecmp(c->argv[0]->ptr, "post")) { + securityWarningCommand(c); + return C_ERR; + } + } c->cmd = c->lastcmd = c->realcmd = cmd; sds err; if (!commandCheckExistence(c, &err)) { @@ -6711,7 +6801,8 @@ serverTestProc *getTestProcByName(const char *name) { } #endif -int main(int argc, char **argv) { +/* Main is marked as weak so that unit tests can use their own main function. */ +__attribute__((weak)) int main(int argc, char **argv) { struct timeval tv; int j; char config_from_stdin = 0; diff --git a/src/server.h b/src/server.h index b0520093fd..bda62acdd7 100644 --- a/src/server.h +++ b/src/server.h @@ -568,6 +568,15 @@ typedef enum { #define PAUSE_ACTION_EVICT (1 << 3) #define PAUSE_ACTION_REPLICA (1 << 4) /* pause replica traffic */ +/* Sets log format */ +typedef enum { LOG_FORMAT_LEGACY = 0, + LOG_FORMAT_LOGFMT } log_format_type; + +/* Sets log timestamp format */ +typedef enum { LOG_TIMESTAMP_LEGACY = 0, + LOG_TIMESTAMP_ISO8601, + LOG_TIMESTAMP_MILLISECONDS } log_timestamp_type; + /* common sets of actions to pause/unpause */ #define PAUSE_ACTIONS_CLIENT_WRITE_SET \ (PAUSE_ACTION_CLIENT_WRITE | PAUSE_ACTION_EXPIRE | PAUSE_ACTION_EVICT | PAUSE_ACTION_REPLICA) @@ -2015,17 +2024,19 @@ struct valkeyServer { serverOpArray also_propagate; /* Additional command to propagate. */ int replication_allowed; /* Are we allowed to replicate? */ /* Logging */ - char *logfile; /* Path of log file */ - int syslog_enabled; /* Is syslog enabled? */ - char *syslog_ident; /* Syslog ident */ - int syslog_facility; /* Syslog facility */ - int crashlog_enabled; /* Enable signal handler for crashlog. - * disable for clean core dumps. */ - int crashed; /* True if the server has crashed, used in catClientInfoString - * to indicate that no wait for IO threads is needed. */ - int memcheck_enabled; /* Enable memory check on crash. */ - int use_exit_on_panic; /* Use exit() on panic and assert rather than - * abort(). useful for Valgrind. */ + char *logfile; /* Path of log file */ + int syslog_enabled; /* Is syslog enabled? */ + char *syslog_ident; /* Syslog ident */ + int syslog_facility; /* Syslog facility */ + int crashlog_enabled; /* Enable signal handler for crashlog. + * disable for clean core dumps. */ + int crashed; /* True if the server has crashed, used in catClientInfoString + * to indicate that no wait for IO threads is needed. */ + int memcheck_enabled; /* Enable memory check on crash. */ + int use_exit_on_panic; /* Use exit() on panic and assert rather than + * abort(). useful for Valgrind. */ + int log_format; /* Print log in specific format */ + int log_timestamp_format; /* Timestamp format in log */ /* Shutdown */ int shutdown_timeout; /* Graceful shutdown time limit in seconds. */ int shutdown_on_sigint; /* Shutdown flags configured for SIGINT. */ diff --git a/src/sort.c b/src/sort.c index f027b0c321..92777b068c 100644 --- a/src/sort.c +++ b/src/sort.c @@ -43,6 +43,11 @@ serverSortOperation *createSortOperation(int type, robj *pattern) { return so; } +/* Return 1 if pattern is the special pattern '#'. */ +static int isReturnSubstPattern(sds pattern) { + return pattern[0] == '#' && pattern[1] == '\0'; +} + /* Return the value associated to the key with a name obtained using * the following rules: * @@ -68,7 +73,7 @@ robj *lookupKeyByPattern(serverDb *db, robj *pattern, robj *subst) { /* If the pattern is "#" return the substitution object itself in order * to implement the "SORT ... GET #" feature. */ spat = pattern->ptr; - if (spat[0] == '#' && spat[1] == '\0') { + if (isReturnSubstPattern(spat)) { incrRefCount(subst); return subst; } @@ -258,6 +263,7 @@ void sortCommandGeneric(client *c, int readonly) { * unless we can make sure the keys formed by the pattern are in the same slot * as the key to sort. */ if (server.cluster_enabled && + !isReturnSubstPattern(c->argv[j + 1]->ptr) && patternHashSlot(c->argv[j + 1]->ptr, sdslen(c->argv[j + 1]->ptr)) != getKeySlot(c->argv[1]->ptr)) { addReplyError(c, "GET option of SORT denied in Cluster mode when " "keys formed by the pattern may be in different slots."); diff --git a/src/unit/test_files.h b/src/unit/test_files.h index cd2e0c5b92..c2b062039a 100644 --- a/src/unit/test_files.h +++ b/src/unit/test_files.h @@ -99,6 +99,7 @@ int test_raxFuzz(int argc, char **argv, int flags); int test_sds(int argc, char **argv, int flags); int test_typesAndAllocSize(int argc, char **argv, int flags); int test_sdsHeaderSizes(int argc, char **argv, int flags); +int test_sdssplitargs(int argc, char **argv, int flags); int test_sha1(int argc, char **argv, int flags); int test_string2ll(int argc, char **argv, int flags); int test_string2l(int argc, char **argv, int flags); @@ -157,7 +158,7 @@ unitTest __test_intset_c[] = {{"test_intsetValueEncodings", test_intsetValueEnco unitTest __test_kvstore_c[] = {{"test_kvstoreAdd16Keys", test_kvstoreAdd16Keys}, {"test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict}, {NULL, NULL}}; unitTest __test_listpack_c[] = {{"test_listpackCreateIntList", test_listpackCreateIntList}, {"test_listpackCreateList", test_listpackCreateList}, {"test_listpackLpPrepend", test_listpackLpPrepend}, {"test_listpackLpPrependInteger", test_listpackLpPrependInteger}, {"test_listpackGetELementAtIndex", test_listpackGetELementAtIndex}, {"test_listpackPop", test_listpackPop}, {"test_listpackGetELementAtIndex2", test_listpackGetELementAtIndex2}, {"test_listpackIterate0toEnd", test_listpackIterate0toEnd}, {"test_listpackIterate1toEnd", test_listpackIterate1toEnd}, {"test_listpackIterate2toEnd", test_listpackIterate2toEnd}, {"test_listpackIterateBackToFront", test_listpackIterateBackToFront}, {"test_listpackIterateBackToFrontWithDelete", test_listpackIterateBackToFrontWithDelete}, {"test_listpackDeleteWhenNumIsMinusOne", test_listpackDeleteWhenNumIsMinusOne}, {"test_listpackDeleteWithNegativeIndex", test_listpackDeleteWithNegativeIndex}, {"test_listpackDeleteInclusiveRange0_0", test_listpackDeleteInclusiveRange0_0}, {"test_listpackDeleteInclusiveRange0_1", test_listpackDeleteInclusiveRange0_1}, {"test_listpackDeleteInclusiveRange1_2", test_listpackDeleteInclusiveRange1_2}, {"test_listpackDeleteWitStartIndexOutOfRange", test_listpackDeleteWitStartIndexOutOfRange}, {"test_listpackDeleteWitNumOverflow", test_listpackDeleteWitNumOverflow}, {"test_listpackBatchDelete", test_listpackBatchDelete}, {"test_listpackDeleteFooWhileIterating", test_listpackDeleteFooWhileIterating}, {"test_listpackReplaceWithSameSize", test_listpackReplaceWithSameSize}, {"test_listpackReplaceWithDifferentSize", test_listpackReplaceWithDifferentSize}, {"test_listpackRegressionGt255Bytes", test_listpackRegressionGt255Bytes}, {"test_listpackCreateLongListAndCheckIndices", test_listpackCreateLongListAndCheckIndices}, {"test_listpackCompareStrsWithLpEntries", test_listpackCompareStrsWithLpEntries}, {"test_listpackLpMergeEmptyLps", test_listpackLpMergeEmptyLps}, {"test_listpackLpMergeLp1Larger", test_listpackLpMergeLp1Larger}, {"test_listpackLpMergeLp2Larger", test_listpackLpMergeLp2Larger}, {"test_listpackLpNextRandom", test_listpackLpNextRandom}, {"test_listpackLpNextRandomCC", test_listpackLpNextRandomCC}, {"test_listpackRandomPairWithOneElement", test_listpackRandomPairWithOneElement}, {"test_listpackRandomPairWithManyElements", test_listpackRandomPairWithManyElements}, {"test_listpackRandomPairsWithOneElement", test_listpackRandomPairsWithOneElement}, {"test_listpackRandomPairsWithManyElements", test_listpackRandomPairsWithManyElements}, {"test_listpackRandomPairsUniqueWithOneElement", test_listpackRandomPairsUniqueWithOneElement}, {"test_listpackRandomPairsUniqueWithManyElements", test_listpackRandomPairsUniqueWithManyElements}, {"test_listpackPushVariousEncodings", test_listpackPushVariousEncodings}, {"test_listpackLpFind", test_listpackLpFind}, {"test_listpackLpValidateIntegrity", test_listpackLpValidateIntegrity}, {"test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN", test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN}, {"test_listpackStressWithRandom", test_listpackStressWithRandom}, {"test_listpackSTressWithVariableSize", test_listpackSTressWithVariableSize}, {"test_listpackBenchmarkInit", test_listpackBenchmarkInit}, {"test_listpackBenchmarkLpAppend", test_listpackBenchmarkLpAppend}, {"test_listpackBenchmarkLpFindString", test_listpackBenchmarkLpFindString}, {"test_listpackBenchmarkLpFindNumber", test_listpackBenchmarkLpFindNumber}, {"test_listpackBenchmarkLpSeek", test_listpackBenchmarkLpSeek}, {"test_listpackBenchmarkLpValidateIntegrity", test_listpackBenchmarkLpValidateIntegrity}, {"test_listpackBenchmarkLpCompareWithString", test_listpackBenchmarkLpCompareWithString}, {"test_listpackBenchmarkLpCompareWithNumber", test_listpackBenchmarkLpCompareWithNumber}, {"test_listpackBenchmarkFree", test_listpackBenchmarkFree}, {NULL, NULL}}; unitTest __test_rax_c[] = {{"test_raxRandomWalk", test_raxRandomWalk}, {"test_raxIteratorUnitTests", test_raxIteratorUnitTests}, {"test_raxTryInsertUnitTests", test_raxTryInsertUnitTests}, {"test_raxRegressionTest1", test_raxRegressionTest1}, {"test_raxRegressionTest2", test_raxRegressionTest2}, {"test_raxRegressionTest3", test_raxRegressionTest3}, {"test_raxRegressionTest4", test_raxRegressionTest4}, {"test_raxRegressionTest5", test_raxRegressionTest5}, {"test_raxRegressionTest6", test_raxRegressionTest6}, {"test_raxBenchmark", test_raxBenchmark}, {"test_raxHugeKey", test_raxHugeKey}, {"test_raxFuzz", test_raxFuzz}, {NULL, NULL}}; -unitTest __test_sds_c[] = {{"test_sds", test_sds}, {"test_typesAndAllocSize", test_typesAndAllocSize}, {"test_sdsHeaderSizes", test_sdsHeaderSizes}, {NULL, NULL}}; +unitTest __test_sds_c[] = {{"test_sds", test_sds}, {"test_typesAndAllocSize", test_typesAndAllocSize}, {"test_sdsHeaderSizes", test_sdsHeaderSizes}, {"test_sdssplitargs", test_sdssplitargs}, {NULL, NULL}}; unitTest __test_sha1_c[] = {{"test_sha1", test_sha1}, {NULL, NULL}}; unitTest __test_util_c[] = {{"test_string2ll", test_string2ll}, {"test_string2l", test_string2l}, {"test_ll2string", test_ll2string}, {"test_ld2string", test_ld2string}, {"test_fixedpoint_d2string", test_fixedpoint_d2string}, {"test_version2num", test_version2num}, {"test_reclaimFilePageCache", test_reclaimFilePageCache}, {NULL, NULL}}; unitTest __test_ziplist_c[] = {{"test_ziplistCreateIntList", test_ziplistCreateIntList}, {"test_ziplistPop", test_ziplistPop}, {"test_ziplistGetElementAtIndex3", test_ziplistGetElementAtIndex3}, {"test_ziplistGetElementOutOfRange", test_ziplistGetElementOutOfRange}, {"test_ziplistGetLastElement", test_ziplistGetLastElement}, {"test_ziplistGetFirstElement", test_ziplistGetFirstElement}, {"test_ziplistGetElementOutOfRangeReverse", test_ziplistGetElementOutOfRangeReverse}, {"test_ziplistIterateThroughFullList", test_ziplistIterateThroughFullList}, {"test_ziplistIterateThroughListFrom1ToEnd", test_ziplistIterateThroughListFrom1ToEnd}, {"test_ziplistIterateThroughListFrom2ToEnd", test_ziplistIterateThroughListFrom2ToEnd}, {"test_ziplistIterateThroughStartOutOfRange", test_ziplistIterateThroughStartOutOfRange}, {"test_ziplistIterateBackToFront", test_ziplistIterateBackToFront}, {"test_ziplistIterateBackToFrontDeletingAllItems", test_ziplistIterateBackToFrontDeletingAllItems}, {"test_ziplistDeleteInclusiveRange0To0", test_ziplistDeleteInclusiveRange0To0}, {"test_ziplistDeleteInclusiveRange0To1", test_ziplistDeleteInclusiveRange0To1}, {"test_ziplistDeleteInclusiveRange1To2", test_ziplistDeleteInclusiveRange1To2}, {"test_ziplistDeleteWithStartIndexOutOfRange", test_ziplistDeleteWithStartIndexOutOfRange}, {"test_ziplistDeleteWithNumOverflow", test_ziplistDeleteWithNumOverflow}, {"test_ziplistDeleteFooWhileIterating", test_ziplistDeleteFooWhileIterating}, {"test_ziplistReplaceWithSameSize", test_ziplistReplaceWithSameSize}, {"test_ziplistReplaceWithDifferentSize", test_ziplistReplaceWithDifferentSize}, {"test_ziplistRegressionTestForOver255ByteStrings", test_ziplistRegressionTestForOver255ByteStrings}, {"test_ziplistRegressionTestDeleteNextToLastEntries", test_ziplistRegressionTestDeleteNextToLastEntries}, {"test_ziplistCreateLongListAndCheckIndices", test_ziplistCreateLongListAndCheckIndices}, {"test_ziplistCompareStringWithZiplistEntries", test_ziplistCompareStringWithZiplistEntries}, {"test_ziplistMergeTest", test_ziplistMergeTest}, {"test_ziplistStressWithRandomPayloadsOfDifferentEncoding", test_ziplistStressWithRandomPayloadsOfDifferentEncoding}, {"test_ziplistCascadeUpdateEdgeCases", test_ziplistCascadeUpdateEdgeCases}, {"test_ziplistInsertEdgeCase", test_ziplistInsertEdgeCase}, {"test_ziplistStressWithVariableSize", test_ziplistStressWithVariableSize}, {"test_BenchmarkziplistFind", test_BenchmarkziplistFind}, {"test_BenchmarkziplistIndex", test_BenchmarkziplistIndex}, {"test_BenchmarkziplistValidateIntegrity", test_BenchmarkziplistValidateIntegrity}, {"test_BenchmarkziplistCompareWithString", test_BenchmarkziplistCompareWithString}, {"test_BenchmarkziplistCompareWithNumber", test_BenchmarkziplistCompareWithNumber}, {"test_ziplistStress__ziplistCascadeUpdate", test_ziplistStress__ziplistCascadeUpdate}, {NULL, NULL}}; diff --git a/src/unit/test_sds.c b/src/unit/test_sds.c index 19b5c7d73f..b97d0d9d32 100644 --- a/src/unit/test_sds.c +++ b/src/unit/test_sds.c @@ -328,3 +328,44 @@ int test_sdsHeaderSizes(int argc, char **argv, int flags) { return 0; } + +int test_sdssplitargs(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + int len; + sds *sargv; + + sargv = sdssplitargs("Testing one two three", &len); + TEST_ASSERT(4 == len); + TEST_ASSERT(!strcmp("Testing", sargv[0])); + TEST_ASSERT(!strcmp("one", sargv[1])); + TEST_ASSERT(!strcmp("two", sargv[2])); + TEST_ASSERT(!strcmp("three", sargv[3])); + sdsfreesplitres(sargv, len); + + sargv = sdssplitargs("", &len); + TEST_ASSERT(0 == len); + TEST_ASSERT(sargv != NULL); + sdsfreesplitres(sargv, len); + + sargv = sdssplitargs("\"Testing split strings\" \'Another split string\'", &len); + TEST_ASSERT(2 == len); + TEST_ASSERT(!strcmp("Testing split strings", sargv[0])); + TEST_ASSERT(!strcmp("Another split string", sargv[1])); + sdsfreesplitres(sargv, len); + + sargv = sdssplitargs("\"Hello\" ", &len); + TEST_ASSERT(1 == len); + TEST_ASSERT(!strcmp("Hello", sargv[0])); + sdsfreesplitres(sargv, len); + + char *binary_string = "\"\\x73\\x75\\x70\\x65\\x72\\x20\\x00\\x73\\x65\\x63\\x72\\x65\\x74\\x20\\x70\\x61\\x73\\x73\\x77\\x6f\\x72\\x64\""; + sargv = sdssplitargs(binary_string, &len); + TEST_ASSERT(1 == len); + TEST_ASSERT(22 == sdslen(sargv[0])); + sdsfreesplitres(sargv, len); + + return 0; +} diff --git a/src/ziplist.c b/src/ziplist.c index d4f8b71699..608487fa2b 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -1,4 +1,4 @@ -/* The ziplist is a specially encoded dually linked list that is designed +/* The ziplist is a specially encoded doubly linked list that is designed * to be very memory efficient. It stores both strings and integer values, * where integers are encoded as actual integers instead of a series of * characters. It allows push and pop operations on either side of the list diff --git a/tests/cluster/tests/28-cluster-shards.tcl b/tests/cluster/tests/28-cluster-shards.tcl index d6534c816b..5fb6743246 100644 --- a/tests/cluster/tests/28-cluster-shards.tcl +++ b/tests/cluster/tests/28-cluster-shards.tcl @@ -117,7 +117,7 @@ test "Kill a node and tell the replica to immediately takeover" { # Primary 0 node should report as fail, wait until the new primary acknowledges it. test "Verify health as fail for killed node" { - wait_for_condition 50 100 { + wait_for_condition 1000 50 { "fail" eq [dict get [get_node_info_from_shard $node_0_id 4 "node"] "health"] } else { fail "New primary never detected the node failed" diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index e3f92bf521..61cb0cea7e 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -170,6 +170,64 @@ start_server {} { } assert_equal [s rdb_changes_since_last_save] 0 } + + test {bgsave cancel aborts save} { + r config set save "" + # Generating RDB will take some 100 seconds + r config set rdb-key-save-delay 1000000 + populate 100 "" 16 + + r bgsave + wait_for_condition 50 100 { + [s rdb_bgsave_in_progress] == 1 + } else { + fail "bgsave did not start in time" + } + set fork_child_pid [get_child_pid 0] + + assert {[r bgsave cancel] eq {Background saving cancelled}} + set temp_rdb [file join [lindex [r config get dir] 1] temp-${fork_child_pid}.rdb] + # Temp rdb must be deleted + wait_for_condition 50 100 { + ![file exists $temp_rdb] + } else { + fail "bgsave temp file was not deleted after cancel" + } + + # Make sure no save is running and that bgsave return an error + wait_for_condition 50 100 { + [s rdb_bgsave_in_progress] == 0 + } else { + fail "bgsave is currently running" + } + assert_error "ERR Background saving is currently not in progress or scheduled" {r bgsave cancel} + } + + test {bgsave cancel schedulled request} { + r config set save "" + # Generating RDB will take some 100 seconds + r config set rdb-key-save-delay 1000000 + populate 100 "" 16 + + # start a long AOF child + r bgrewriteaof + wait_for_condition 50 100 { + [s aof_rewrite_in_progress] == 1 + } else { + fail "aof not started" + } + + # Make sure cancel return valid status + assert {[r bgsave schedule] eq {Background saving scheduled}} + + # Cancel the scheduled save + assert {[r bgsave cancel] eq {Scheduled background saving cancelled}} + + # Make sure a second call to bgsave cancel return an error + assert_error "ERR Background saving is currently not in progress or scheduled" {r bgsave cancel} + } + + } test {client freed during loading} { diff --git a/tests/unit/cluster/cluster-shards.tcl b/tests/unit/cluster/cluster-shards.tcl index 19acd186f5..170114d822 100644 --- a/tests/unit/cluster/cluster-shards.tcl +++ b/tests/unit/cluster/cluster-shards.tcl @@ -42,7 +42,7 @@ start_cluster 3 3 {tags {external:skip cluster}} { } test "Verify health as fail for killed node" { - wait_for_condition 50 100 { + wait_for_condition 1000 50 { "fail" eq [dict get [get_node_info_from_shard $node_0_id $validation_node "node"] "health"] } else { fail "New primary never detected the node failed" diff --git a/tests/unit/sort.tcl b/tests/unit/sort.tcl index 397e7e12ea..cd171ee51e 100644 --- a/tests/unit/sort.tcl +++ b/tests/unit/sort.tcl @@ -384,24 +384,28 @@ start_cluster 1 0 {tags {"external:skip cluster sort"}} { test "sort by in cluster mode" { catch {r sort "{a}mylist" by by*} e assert_match {ERR BY option of SORT denied in Cluster mode when *} $e - r sort "{a}mylist" by "{a}by*" - } {3 1 2} + assert_equal {3 1 2} [r sort "{a}mylist" by "{a}by*"] + assert_equal {3 1 2} [r sort "{a}mylist" by "{a}by*" get #] + } test "sort get in cluster mode" { catch {r sort "{a}mylist" by "{a}by*" get get*} e assert_match {ERR GET option of SORT denied in Cluster mode when *} $e - r sort "{a}mylist" by "{a}by*" get "{a}get*" - } {30 200 100} + assert_equal {30 200 100} [r sort "{a}mylist" by "{a}by*" get "{a}get*"] + assert_equal {30 3 200 1 100 2} [r sort "{a}mylist" by "{a}by*" get "{a}get*" get #] + } test "sort_ro by in cluster mode" { catch {r sort_ro "{a}mylist" by by*} e assert_match {ERR BY option of SORT denied in Cluster mode when *} $e - r sort_ro "{a}mylist" by "{a}by*" - } {3 1 2} + assert_equal {3 1 2} [r sort_ro "{a}mylist" by "{a}by*"] + assert_equal {3 1 2} [r sort_ro "{a}mylist" by "{a}by*" get #] + } test "sort_ro get in cluster mode" { catch {r sort_ro "{a}mylist" by "{a}by*" get get*} e assert_match {ERR GET option of SORT denied in Cluster mode when *} $e - r sort_ro "{a}mylist" by "{a}by*" get "{a}get*" - } {30 200 100} + assert_equal {30 200 100} [r sort_ro "{a}mylist" by "{a}by*" get "{a}get*"] + assert_equal {30 3 200 1 100 2} [r sort_ro "{a}mylist" by "{a}by*" get "{a}get*" get #] + } } diff --git a/valkey.conf b/valkey.conf index f485b42b1a..7c7b9da43e 100644 --- a/valkey.conf +++ b/valkey.conf @@ -348,6 +348,23 @@ pidfile /var/run/valkey_6379.pid # nothing (nothing is logged) loglevel notice +# Specify the logging format. +# This can be one of: +# +# - legacy: the default, traditional log format +# - logfmt: a structured log format; see https://www.brandur.org/logfmt +# +# log-format legacy + +# Specify the timestamp format used in logs using 'log-timestamp-format'. +# +# - legacy: default format +# - iso8601: ISO 8601 extended date and time with time zone, on the form +# yyyy-mm-ddThh:mm:ss.sss±hh:mm +# - milliseconds: milliseconds since the epoch +# +# log-timestamp-format legacy + # Specify the log file name. Also the empty string can be used to force # the server to log on the standard output. Note that if you use standard # output for logging but daemonize, logs will be sent to /dev/null @@ -1308,7 +1325,11 @@ lazyfree-lazy-user-del yes # deletion, which can be controlled by passing the [SYNC|ASYNC] flags into the # commands. When neither flag is passed, this directive will be used to determine # if the data should be deleted asynchronously. - +# +# When a replica performs a node reset via CLUSTER RESET, the entire +# database content is removed to allow the node to become an empty primary. +# This directive also determines whether the data should be deleted asynchronously. +# # There are many problems with running flush synchronously. Even in single CPU # environments, the thread managers should balance between the freeing and # serving incoming requests. The default value is yes. @@ -1576,7 +1597,7 @@ aof-timestamp-enabled no # Maximum time to wait for replicas when shutting down, in seconds. # # During shut down, a grace period allows any lagging replicas to catch up with -# the latest replication offset before the primary exists. This period can +# the latest replication offset before the primary exits. This period can # prevent data loss, especially for deployments without configured disk backups. # # The 'shutdown-timeout' value is the grace period's duration in seconds. It is