From f821c869a6754d9620286c940345c57e88c7c2eb Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Mon, 6 May 2024 08:01:47 +0000 Subject: [PATCH 1/7] Memory Access Amortization Signed-off-by: Uri Yagelnik --- src/dict.c | 141 ++++++++++++++++++++++++++++++++++++ src/dict.h | 4 +- src/fmtargs.h | 6 +- src/kvstore.c | 12 ++++ src/kvstore.h | 5 ++ src/networking.c | 146 ++++++++++++++++++++++++++++++++++++-- src/server.c | 5 ++ src/server.h | 2 + tests/unit/networking.tcl | 37 ++++++++++ utils/generate-fmtargs.py | 2 +- 10 files changed, 349 insertions(+), 11 deletions(-) diff --git a/src/dict.c b/src/dict.c index 2eb3dd386f..eb59949f94 100644 --- a/src/dict.c +++ b/src/dict.c @@ -1541,6 +1541,147 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio return v; } +typedef enum { PrefetchStart, PrefetchBucket, PrefetchEntry, PrefetchValue, PrefetchDone } PrefetchState; + +typedef struct { + PrefetchState state; + int ht_idx; + uint64_t idx; + uint64_t key_hash; + dictEntry *current_entry; +} PrefetchInfo; + +/* dictPrefetch - Prefetches dictionary data for an array of keys + * + * This function takes an array of dictionaries and keys, attempting to bring + * data closer to the L1 cache that might be needed for dictionary operations + * on those keys. + * + * dictFind Algorithm: + * 1. Evaluate the hash of the key + * 2. Access the index in the first table + * 3. Walk the linked list until the key is found + * If the key hasn't been found and the dictionary is in the middle of rehashing, + * access the index on the second table and repeat step 3 + * + * dictPrefetch executes the same algorithm as dictFind, but one step at a time + * for each key. Instead of waiting for data to be read from memory, it prefetches + * the data and then moves on to execute the next prefetch for another key. + * + * dictPrefetch can be invoked with a callback function, get_val_data_func, + * to bring the key's value data closer to the L1 cache as well. 
*/ +void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) { + PrefetchInfo prefetchInfo[DictMaxPrefetchSize]; + size_t done = 0; + + assert(num_keys <= DictMaxPrefetchSize); + + /* Initialize the prefetch info */ + for (size_t i = 0; i < num_keys; i++) { + PrefetchInfo *info = &prefetchInfo[i]; + if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) { + info->state = PrefetchDone; + done++; + continue; + } + info->ht_idx = -1; + info->current_entry = NULL; + info->state = PrefetchStart; + info->key_hash = dictHashKey(keys_dicts[i], keys[i]); + } + + for (size_t j = 0; done < num_keys; j++) { + size_t i = j % num_keys; + PrefetchInfo *info = &prefetchInfo[i]; + switch (info->state) { + case PrefetchDone: + /* Skip already processed keys */ + break; + + case PrefetchStart: + /* Determine which hash table to use */ + if (info->ht_idx == -1) { + info->ht_idx = 0; + } else if (info->ht_idx == 0 && dictIsRehashing(keys_dicts[i])) { + info->ht_idx = 1; + } else { + done++; + info->state = PrefetchDone; + break; + } + + /* Prefetch the bucket */ + info->idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]); + __builtin_prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->idx]); + info->state = PrefetchBucket; + break; + + case PrefetchBucket: + /* Prefetch the first entry in the bucket */ + info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->idx]; + if (info->current_entry) { + __builtin_prefetch(info->current_entry); + info->state = PrefetchEntry; + } else { + /* No entry found in the bucket - try the next table */ + info->state = PrefetchStart; + } + break; + + case PrefetchEntry: { + /* Prefetch the entry's value. */ + void *value = get_val_data_func ? 
dictGetVal(info->current_entry) : NULL; + + if (info->current_entry->next == NULL && !dictIsRehashing(keys_dicts[i])) { + /* If this is the last element we assume a hit and dont compare the keys */ + if (value) { + __builtin_prefetch(value); + info->state = PrefetchValue; + } else { + done++; + info->state = PrefetchDone; + } + break; + } + + if (value) { + void *current_entry_key = dictGetKey(info->current_entry); + if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) { + /* If the key is found, prefetch the value */ + __builtin_prefetch(value); + info->state = PrefetchValue; + break; + } + } + + /* Move to next entry or start over */ + info->current_entry = dictGetNext(info->current_entry); + if (info->current_entry) { + __builtin_prefetch(info->current_entry); + info->state = PrefetchEntry; + } else { + info->state = PrefetchStart; + } + + break; + } + + case PrefetchValue: { + /* Prefetch value data if available */ + void *value_data = get_val_data_func(dictGetVal(info->current_entry)); + if (value_data) { + __builtin_prefetch(value_data); + } + done++; + info->state = PrefetchDone; + break; + } + + default: assert(0); + } + } +} + /* ------------------------- private functions ------------------------------ */ /* Because we may need to allocate huge memory chunk at once when dict diff --git a/src/dict.h b/src/dict.h index 97a79910cb..f984023e23 100644 --- a/src/dict.h +++ b/src/dict.h @@ -45,7 +45,8 @@ #define DICT_ERR 1 /* Hash table parameters */ -#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */ +#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */ +#define DictMaxPrefetchSize 16 /* Limit of maximum number of dict entries to prefetch */ typedef struct dictEntry dictEntry; /* opaque */ typedef struct dict dict; @@ -247,6 +248,7 @@ unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); uint64_t 
dictGetHash(dict *d, const void *key); void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size); +void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)); size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full); dictStats *dictGetStatsHt(dict *d, int htidx, int full); diff --git a/src/fmtargs.h b/src/fmtargs.h index e52d3b99c5..3fb6cbb479 100644 --- a/src/fmtargs.h +++ b/src/fmtargs.h @@ -44,9 +44,9 @@ /* Everything below this line is automatically generated by * generate-fmtargs.py. Do not manually edit. */ -#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N +#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, N, ...) 
N -#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 +#define RSEQ_N() 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 #define COMPACT_FMT_2(fmt, value) fmt #define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__) @@ -108,6 +108,7 @@ #define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__) #define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__) #define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__) +#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__) #define COMPACT_VALUES_2(fmt, value) value #define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__) @@ -169,5 +170,6 @@ #define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__) #define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__) #define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__) +#define COMPACT_VALUES_122(fmt, value, ...) 
value, COMPACT_VALUES_120(__VA_ARGS__) #endif diff --git a/src/kvstore.c b/src/kvstore.c index 16cc8e4822..d6f886c95b 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -828,3 +828,15 @@ int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) { } return ret; } + +void kvstoreDictPrefetch(kvstore **kvs, + int *slots, + const void **keys, + size_t keys_count, + void *(*get_val_data_func)(const void *val)) { + dict *dicts[keys_count]; + for (size_t i = 0; i < keys_count; i++) { + dicts[i] = kvstoreGetDict(kvs[i], slots[i]); + } + dictPrefetch(dicts, keys_count, keys, get_val_data_func); +} diff --git a/src/kvstore.h b/src/kvstore.h index a94f366b6b..40d1eab15f 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -36,6 +36,11 @@ int kvstoreNumNonEmptyDicts(kvstore *kvs); int kvstoreNumAllocatedDicts(kvstore *kvs); int kvstoreNumDicts(kvstore *kvs); uint64_t kvstoreGetHash(kvstore *kvs, const void *key); +void kvstoreDictPrefetch(kvstore **kvs, + int *slots, + const void **keys, + size_t keys_count, + void *(*get_val_data_func)(const void *val)); /* kvstore iterator specific functions */ kvstoreIterator *kvstoreIteratorInit(kvstore *kvs); diff --git a/src/networking.c b/src/networking.c index 915a0b016f..17325c1c54 100644 --- a/src/networking.c +++ b/src/networking.c @@ -45,6 +45,8 @@ static void setProtocolError(const char *errstr, client *c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); +void removeClientFromPendingPrefetchBatch(client *c); + int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). 
*/ __thread sds thread_shared_qb = NULL; @@ -1504,6 +1506,7 @@ void unlinkClient(client *c) { listDelNode(server.clients, c->client_list_node); c->client_list_node = NULL; } + removeClientFromPendingPrefetchBatch(c); /* Check if this is a replica waiting for diskless replication (rdb pipe), * in which case it needs to be cleaned from that list */ @@ -4612,9 +4615,124 @@ int postponeClientRead(client *c) { return (trySendReadToIOThreads(c) == C_OK); } +/* Prefetch multiple commands batch */ +typedef struct { + client *clients[DictMaxPrefetchSize]; + size_t clients_count; + size_t keys_count; + void *keys[DictMaxPrefetchSize]; + kvstore *keys_kvs[DictMaxPrefetchSize]; + kvstore *expire_kvs[DictMaxPrefetchSize]; + int slots[DictMaxPrefetchSize]; + int client_closed_during_batch_execution; +} BatchProcessData; + +static BatchProcessData batch = {0}; + +static void *getValData(const void *val) { + robj *o = (robj *)val; + if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) { + return o->ptr; + } + return NULL; +} + +void processBatchClientsCommands(void) { + if (batch.clients_count == 0) return; + /* Prefetch argv's for all clients */ + for (size_t i = 0; i < batch.clients_count; i++) { + client *c = batch.clients[i]; + if (!c || c->argc <= 1) continue; + /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. 
*/ + for (int j = 1; j < c->argc; j++) { + __builtin_prefetch(c->argv[j]); + } + } + + /* prefetch the argv->ptr if required */ + for (size_t i = 0; i < batch.clients_count; i++) { + client *c = batch.clients[i]; + if (!c || c->argc <= 1) continue; + for (int j = 1; j < c->argc; j++) { + if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { + __builtin_prefetch(c->argv[j]->ptr); + } + } + } + + /* Get the keys ptrs - we do it here since we wanted to wait for the arg prefetch */ + for (size_t i = 0; i < batch.keys_count; i++) { + batch.keys[i] = ((robj *)batch.keys[i])->ptr; + } + + /* Prefetch keys for all commands */ + if (batch.keys_count > 1) { + /* Keys */ + kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, getValData); + /* Expires - with expires no values prefetch are required. */ + kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, NULL); + } + + /* Process clients' commands */ + for (size_t i = 0; i < batch.clients_count; i++) { + client *c = batch.clients[i]; + if (c) { + /* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */ + batch.clients[i] = NULL; + if (processPendingCommandAndInputBuffer(c) != C_ERR) { + beforeNextClient(c); + } + } + } + + batch.keys_count = 0; + batch.clients_count = 0; +} + +void addCommandToBatchAndProcessIfFull(client *c) { + batch.clients[batch.clients_count++] = c; + + /* Get command's keys. + * When ProcessingEventsWhileBlocked is set, we don't want to prefetch keys, as no commands will be executed. */ + if (c->io_parsed_cmd && !ProcessingEventsWhileBlocked) { + getKeysResult result; + initGetKeysResult(&result); + int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); + for (int i = 0; i < num_keys && batch.keys_count < DictMaxPrefetchSize; i++) { + batch.keys[batch.keys_count] = c->argv[result.keys[i].pos]; + batch.slots[batch.keys_count] = c->slot > 0 ? 
c->slot : 0; + batch.keys_kvs[batch.keys_count] = c->db->keys; + batch.expire_kvs[batch.keys_count] = c->db->expires; + batch.keys_count++; + } + getKeysFreeResult(&result); + } + + /* If the batch is full, process it. + * We also check the client count to handle cases where + * no keys exist for the client's command. */ + if (batch.clients_count == DictMaxPrefetchSize || batch.keys_count == DictMaxPrefetchSize) { + processBatchClientsCommands(); + } +} + +void removeClientFromPendingPrefetchBatch(client *c) { + if (batch.clients_count == 0) return; + + batch.client_closed_during_batch_execution = 1; + + for (size_t i = 0; i < batch.clients_count; i++) { + if (batch.clients[i] == c) { + batch.clients[i] = NULL; + return; + } + } +} + int processIOThreadsReadDone(void) { - if (listLength(server.clients_pending_io_read) == 0) return 0; + if (listLength(server.clients_pending_io_read) == 0 && batch.clients_count == 0) return 0; int processed = 0; + batch.client_closed_during_batch_execution = 0; listNode *ln; listNode *next = listFirst(server.clients_pending_io_read); @@ -4667,16 +4785,18 @@ int processIOThreadsReadDone(void) { c->flag.pending_command = 1; } - size_t list_length_before_command_execute = listLength(server.clients_pending_io_read); - if (processPendingCommandAndInputBuffer(c) == C_OK) { - beforeNextClient(c); - } - if (list_length_before_command_execute != listLength(server.clients_pending_io_read)) { - /* A client was unlink from the list possibly making the next node invalid */ + addCommandToBatchAndProcessIfFull(c); + + /* There is a possibility that a client was closed during the latest batch processing. 
+ * In this case the next node may be invalid */ + if (batch.client_closed_during_batch_execution) { next = listFirst(server.clients_pending_io_read); + batch.client_closed_during_batch_execution = 0; } } + processBatchClientsCommands(); + return processed; } @@ -4775,6 +4895,18 @@ void ioThreadReadQueryFromClient(void *data) { c->io_parsed_cmd = NULL; } + /* Offload slot calculations to the I/O thread to reduce main-thread load. */ + if (c->io_parsed_cmd && server.cluster_enabled) { + getKeysResult result; + initGetKeysResult(&result); + int numkeys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); + if (numkeys) { + robj *first_key = c->argv[result.keys[0].pos]; + c->slot = calculateKeySlot(first_key->ptr); + } + getKeysFreeResult(&result); + } + done: trimClientQueryBuffer(c); atomic_thread_fence(memory_order_release); diff --git a/src/server.c b/src/server.c index d332e6989c..3e7abeab19 100644 --- a/src/server.c +++ b/src/server.c @@ -5623,6 +5623,10 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { server.stat_last_eviction_exceeded_time ? (long long)elapsedUs(server.stat_last_eviction_exceeded_time) : 0; long long current_active_defrag_time = server.stat_last_active_defrag_time ? (long long)elapsedUs(server.stat_last_active_defrag_time) : 0; + long long average_prefetch_batch_size = + (server.stat_total_prefetch_batches + ? 
server.stat_total_prefetch_entries / server.stat_total_prefetch_batches + : 0); if (sections++) info = sdscat(info, "\r\n"); /* clang-format off */ @@ -5678,6 +5682,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, + "io_threaded_average_prefetch_batch_size:%lld\r\n", average_prefetch_batch_size, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/src/server.h b/src/server.h index ccdece20dd..d1a4d94190 100644 --- a/src/server.h +++ b/src/server.h @@ -1828,6 +1828,8 @@ struct valkeyServer { long long stat_total_writes_processed; /* Total number of write events processed */ long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ + long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */ + long long stat_total_prefetch_batches; /* Total number of prefetched batches */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. 
*/ struct { diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 24f8caae9c..7cc11b2ff9 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -170,3 +170,40 @@ start_server {config "minimal.conf" tags {"external:skip"}} { } } } + +start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} { + test {prefetch works as expected when killing a client from the middle of prefetch commands batch} { + # Create 17 (prefetch batch size) +1 clients + for {set i 0} {$i < 17} {incr i} { + set rd$i [valkey_deferring_client] + } + + # Get the client ID of rd4 + $rd4 client id + set rd4_id [$rd4 read] + + # Create a batch of commands by making sure the server sleeps for a while + # before responding to the first command + $rd0 debug sleep 2 + after 200 ; # wait a bit to make sure the server is sleeping. + + # The first client will kill the fourth client + $rd1 client kill id $rd4_id + + # Send set commands for all clients except the first + for {set i 1} {$i < 17} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } + + # Read the results + assert_equal {1} [$rd1 read] + catch {$rd4 read} err + assert_match {I/O error reading reply} $err + + # Verify the final state + $rd16 get a + assert_equal {OK} [$rd16 read] + assert_equal {16} [$rd16 read] + } +} diff --git a/utils/generate-fmtargs.py b/utils/generate-fmtargs.py index e16cc368fa..873b8f67f6 100755 --- a/utils/generate-fmtargs.py +++ b/utils/generate-fmtargs.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # Outputs the generated part of src/fmtargs.h -MAX_ARGS = 120 +MAX_ARGS = 122 import os print("/* Everything below this line is automatically generated by") From da8106b03dd55acccc6498451e77fc12128da1ed Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Sun, 4 Aug 2024 13:28:46 +0000 Subject: [PATCH 2/7] Adress PR comments Signed-off-by: Uri Yagelnik --- src/dict.c | 217 +++++++++++++++++++++++++------------- src/networking.c | 4 +- 
tests/unit/networking.tcl | 63 ++++++----- 3 files changed, 180 insertions(+), 104 deletions(-) diff --git a/src/dict.c b/src/dict.c index eb59949f94..3f2b547ac1 100644 --- a/src/dict.c +++ b/src/dict.c @@ -1541,16 +1541,110 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio return v; } -typedef enum { PrefetchStart, PrefetchBucket, PrefetchEntry, PrefetchValue, PrefetchDone } PrefetchState; + +/* -------------------------- Dict Prefetching ------------------------------ */ + +typedef enum { + PrefetchBucket, /* Initial state, determines which hash table to use, and prefetch the table's bucket */ + PrefetchEntry, /* Prefetches entries associated with the given key's hash */ + PrefetchValue, /* Prefetches the value object of the entry found in the previous step*/ + PrefetchValueData, /* Prefetches the value object's data (if applicable) */ + PrefetchDone /* Indicates that prefetching for this key is complete */ +} PrefetchState; + +/************************************ State machine diagram for the prefetch operation. 
******************************** + │ + start + │ + ┌────────▼─────────┐ + ┌─────────►│ PrefetchBucket ├────►────────┐ + │ └────────┬─────────┘ no more tables -> done + | bucket|found | + │ | │ + entry not found - goto next table ┌────────▼────────┐ │ + └────◄─────┤ PrefetchEntry | ▼ + ┌────────────►└────────┬────────┘ │ + | Entry│found │ + │ | │ + value not found - goto next entry ┌───────▼────────┐ | + └───────◄──────┤ PrefetchValue | ▼ + └───────┬────────┘ │ + Value│found │ + | | + ┌───────────▼──────────────┐ │ + │ PrefetchValueData │ ▼ + └───────────┬──────────────┘ │ + | │ + ┌───────-─▼─────────────┐ │ + │ PrefetchDone │◄────────┘ + └───────────────────────┘ +**********************************************************************************************************************/ typedef struct { - PrefetchState state; - int ht_idx; - uint64_t idx; - uint64_t key_hash; - dictEntry *current_entry; + PrefetchState state; /* Current state of the prefetch operation */ + int ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ + uint64_t bucket_idx; /* Index of the bucket in the current hash table */ + uint64_t key_hash; /* Hash value of the key being prefetched */ + dictEntry *current_entry; /* Pointer to the current entry being processed */ } PrefetchInfo; +typedef struct { + PrefetchInfo prefetch_info[DictMaxPrefetchSize]; + size_t current_batch_size; /* Number of keys in the current batch */ + size_t cur_idx; /* Index of the current key being prefetched */ + size_t keys_done; /* Number of keys that have been processed */ +} PrefetchBatch; + +static PrefetchBatch prefetchBatch; /* Global prefetch batch - holds the current batch of keys being prefetched */ + +static void incrCurIdx(void) { + prefetchBatch.cur_idx++; + if (prefetchBatch.cur_idx >= prefetchBatch.current_batch_size) { + prefetchBatch.cur_idx %= prefetchBatch.current_batch_size; + } +} + +/* Prefetches the given pointer and move to the next key in the batch */ +static void prefetch(void 
*ptr) { + __builtin_prefetch(ptr); + /* while the prefetch is in progress, we can continue to the next key */ + incrCurIdx(); +} + +static void markDone(PrefetchInfo *info) { + info->state = PrefetchDone; + prefetchBatch.keys_done++; +} + +static PrefetchInfo *getNextPrefetchInfo(void) { + while (prefetchBatch.prefetch_info[prefetchBatch.cur_idx].state == PrefetchDone) { + incrCurIdx(); + } + return &prefetchBatch.prefetch_info[prefetchBatch.cur_idx]; +} + +static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) { + assert(num_keys <= DictMaxPrefetchSize); + + prefetchBatch.current_batch_size = num_keys; + prefetchBatch.cur_idx = 0; + prefetchBatch.keys_done = 0; + + /* Initialize the prefetch info */ + for (size_t i = 0; i < prefetchBatch.current_batch_size; i++) { + PrefetchInfo *info = &prefetchBatch.prefetch_info[i]; + if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) { + info->state = PrefetchDone; + prefetchBatch.keys_done++; + continue; + } + info->ht_idx = -1; + info->current_entry = NULL; + info->state = PrefetchBucket; + info->key_hash = dictHashKey(keys_dicts[i], keys[i]); + } +} + /* dictPrefetch - Prefetches dictionary data for an array of keys * * This function takes an array of dictionaries and keys, attempting to bring @@ -1571,109 +1665,80 @@ typedef struct { * dictPrefetch can be invoked with a callback function, get_val_data_func, * to bring the key's value data closer to the L1 cache as well. 
*/ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) { - PrefetchInfo prefetchInfo[DictMaxPrefetchSize]; - size_t done = 0; - - assert(num_keys <= DictMaxPrefetchSize); - - /* Initialize the prefetch info */ - for (size_t i = 0; i < num_keys; i++) { - PrefetchInfo *info = &prefetchInfo[i]; - if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) { - info->state = PrefetchDone; - done++; - continue; - } - info->ht_idx = -1; - info->current_entry = NULL; - info->state = PrefetchStart; - info->key_hash = dictHashKey(keys_dicts[i], keys[i]); - } + initBatch(keys_dicts, num_keys, keys); - for (size_t j = 0; done < num_keys; j++) { - size_t i = j % num_keys; - PrefetchInfo *info = &prefetchInfo[i]; + while (prefetchBatch.keys_done < prefetchBatch.current_batch_size) { + PrefetchInfo *info = getNextPrefetchInfo(); + size_t i = prefetchBatch.cur_idx; switch (info->state) { - case PrefetchDone: - /* Skip already processed keys */ - break; - - case PrefetchStart: + case PrefetchBucket: /* Determine which hash table to use */ if (info->ht_idx == -1) { info->ht_idx = 0; } else if (info->ht_idx == 0 && dictIsRehashing(keys_dicts[i])) { info->ht_idx = 1; } else { - done++; - info->state = PrefetchDone; + /* No more tables left - mark as done. 
*/ + markDone(info); break; } /* Prefetch the bucket */ - info->idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]); - __builtin_prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->idx]); - info->state = PrefetchBucket; + info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]); + prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + info->current_entry = NULL; + info->state = PrefetchEntry; break; - case PrefetchBucket: + case PrefetchEntry: /* Prefetch the first entry in the bucket */ - info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->idx]; if (info->current_entry) { - __builtin_prefetch(info->current_entry); - info->state = PrefetchEntry; + /* We already found an entry in the bucket - move to the next entry */ + info->current_entry = dictGetNext(info->current_entry); + } else { + /* Find the first entry in the bucket */ + info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]; + } + + if (info->current_entry) { + prefetch(info->current_entry); + info->state = PrefetchValue; } else { - /* No entry found in the bucket - try the next table */ - info->state = PrefetchStart; + /* No entry found in the bucket - try the bucket in the next table */ + info->state = PrefetchBucket; } break; - case PrefetchEntry: { + case PrefetchValue: { /* Prefetch the entry's value. */ - void *value = get_val_data_func ? 
dictGetVal(info->current_entry) : NULL; + void *value = dictGetVal(info->current_entry); - if (info->current_entry->next == NULL && !dictIsRehashing(keys_dicts[i])) { + if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) { /* If this is the last element we assume a hit and dont compare the keys */ - if (value) { - __builtin_prefetch(value); - info->state = PrefetchValue; - } else { - done++; - info->state = PrefetchDone; - } + prefetch(value); + info->state = PrefetchValueData; break; } - if (value) { - void *current_entry_key = dictGetKey(info->current_entry); - if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) { - /* If the key is found, prefetch the value */ - __builtin_prefetch(value); - info->state = PrefetchValue; - break; - } - } - - /* Move to next entry or start over */ - info->current_entry = dictGetNext(info->current_entry); - if (info->current_entry) { - __builtin_prefetch(info->current_entry); - info->state = PrefetchEntry; + void *current_entry_key = dictGetKey(info->current_entry); + if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) { + /* If the key is found, prefetch the value */ + prefetch(value); + info->state = PrefetchValueData; } else { - info->state = PrefetchStart; + /* Move to next entry */ + info->state = PrefetchEntry; } - break; } - case PrefetchValue: { + case PrefetchValueData: { /* Prefetch value data if available */ - void *value_data = get_val_data_func(dictGetVal(info->current_entry)); - if (value_data) { - __builtin_prefetch(value_data); + if (get_val_data_func) { + void *value_data = get_val_data_func(dictGetVal(info->current_entry)); + if (value_data) prefetch(value_data); } - done++; - info->state = PrefetchDone; + markDone(info); break; } diff --git a/src/networking.c b/src/networking.c index 17325c1c54..009729b041 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4665,8 +4665,10 @@ void 
processBatchClientsCommands(void) { batch.keys[i] = ((robj *)batch.keys[i])->ptr; } - /* Prefetch keys for all commands */ + /* Prefetch keys for all commands, prefetch is beneficial only if there are more than one key */ if (batch.keys_count > 1) { + server.stat_total_prefetch_batches++; + server.stat_total_prefetch_entries += batch.keys_count; /* Keys */ kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, getValData); /* Expires - with expires no values prefetch are required. */ diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 7cc11b2ff9..a9831085f1 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -172,38 +172,47 @@ start_server {config "minimal.conf" tags {"external:skip"}} { } start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} { - test {prefetch works as expected when killing a client from the middle of prefetch commands batch} { - # Create 17 (prefetch batch size) +1 clients - for {set i 0} {$i < 17} {incr i} { - set rd$i [valkey_deferring_client] - } - # Get the client ID of rd4 - $rd4 client id - set rd4_id [$rd4 read] + # Skip if non io-threads mode - as it is relevant only for io-threads mode + if {[r config get io-threads] ne "io-threads 1"} { + test {prefetch works as expected when killing a client from the middle of prefetch commands batch} { + # Create 17 (prefetch batch size) +1 clients + for {set i 0} {$i < 17} {incr i} { + set rd$i [valkey_deferring_client] + } - # Create a batch of commands by making sure the server sleeps for a while - # before responding to the first command - $rd0 debug sleep 2 - after 200 ; # wait a bit to make sure the server is sleeping. 
+ # Get the client ID of rd4 + $rd4 client id + set rd4_id [$rd4 read] - # The first client will kill the fourth client - $rd1 client kill id $rd4_id + # Create a batch of commands by making sure the server sleeps for a while + # before responding to the first command + $rd0 debug sleep 2 + after 200 ; # wait a bit to make sure the server is sleeping. - # Send set commands for all clients except the first - for {set i 1} {$i < 17} {incr i} { - [set rd$i] set a $i - [set rd$i] flush - } + # The first client will kill the fourth client + $rd1 client kill id $rd4_id - # Read the results - assert_equal {1} [$rd1 read] - catch {$rd4 read} err - assert_match {I/O error reading reply} $err + # Send set commands for all clients except the first + for {set i 1} {$i < 17} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } - # Verify the final state - $rd16 get a - assert_equal {OK} [$rd16 read] - assert_equal {16} [$rd16 read] + # Read the results + assert_equal {1} [$rd1 read] + catch {$rd4 read} err + assert_match {I/O error reading reply} $err + + # verify the prefetch stats are as expected + set info [r info stats] + set prefetch_stats [getInfoProperty $info io_threaded_average_prefetch_batch_size] + assert_range [expr $prefetch_stats] 2 15 ; # we expect max 15 as the the kill command doesn't have any keys. 
+ + # Verify the final state + $rd16 get a + assert_equal {OK} [$rd16 read] + assert_equal {16} [$rd16 read] + } } } From 9a41120506d551eb5a95a0dc01d396f0493e3b68 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Tue, 6 Aug 2024 22:56:43 +0000 Subject: [PATCH 3/7] Address more PR comments Signed-off-by: Uri Yagelnik --- src/config.h | 16 +++++ src/dict.c | 65 ++++++++++----------- src/fmtargs.h | 82 +++++++++++++++++++++++++- src/networking.c | 119 +++++++++++++++++++------------------- src/server.c | 7 +-- tests/unit/networking.tcl | 6 +- utils/generate-fmtargs.py | 2 +- 7 files changed, 192 insertions(+), 105 deletions(-) diff --git a/src/config.h b/src/config.h index 201e421976..844545dee5 100644 --- a/src/config.h +++ b/src/config.h @@ -348,4 +348,20 @@ void setcpuaffinity(const char *cpulist); #endif #endif +/* Check for GCC version >= 4.9 */ +#if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 9)) +#define HAS_BUILTIN_PREFETCH 1 +/* Check for Clang version >= 3.6 */ +#elif defined(__clang__) && (__clang_major__ > 3 || (__clang_major__ == 3 && __clang_minor__ >= 6)) +#define HAS_BUILTIN_PREFETCH 1 +#else +#define HAS_BUILTIN_PREFETCH 0 +#endif + +#if HAS_BUILTIN_PREFETCH +#define valkey_prefetch(addr) __builtin_prefetch(addr) +#else +#define valkey_prefetch(addr) ((void)(addr)) +#endif + #endif diff --git a/src/dict.c b/src/dict.c index 3f2b547ac1..c7bace846a 100644 --- a/src/dict.c +++ b/src/dict.c @@ -48,6 +48,7 @@ #include "zmalloc.h" #include "serverassert.h" #include "monotonic.h" +#include "config.h" #ifndef static_assert #define static_assert(expr, lit) _Static_assert(expr, lit) @@ -1595,47 +1596,38 @@ typedef struct { size_t keys_done; /* Number of keys that have been processed */ } PrefetchBatch; -static PrefetchBatch prefetchBatch; /* Global prefetch batch - holds the current batch of keys being prefetched */ - -static void incrCurIdx(void) { - prefetchBatch.cur_idx++; - if (prefetchBatch.cur_idx >= 
prefetchBatch.current_batch_size) { - prefetchBatch.cur_idx %= prefetchBatch.current_batch_size; - } -} - /* Prefetches the given pointer and move to the next key in the batch */ -static void prefetch(void *ptr) { - __builtin_prefetch(ptr); +static void prefetch(void *addr, PrefetchBatch *batch) { + valkey_prefetch(addr); /* while the prefetch is in progress, we can continue to the next key */ - incrCurIdx(); + batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; } -static void markDone(PrefetchInfo *info) { +static void markDone(PrefetchInfo *info, PrefetchBatch *batch) { info->state = PrefetchDone; - prefetchBatch.keys_done++; + batch->keys_done++; } -static PrefetchInfo *getNextPrefetchInfo(void) { - while (prefetchBatch.prefetch_info[prefetchBatch.cur_idx].state == PrefetchDone) { - incrCurIdx(); +static PrefetchInfo *getNextPrefetchInfo(PrefetchBatch *batch) { + while (batch->prefetch_info[batch->cur_idx].state == PrefetchDone) { + batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; } - return &prefetchBatch.prefetch_info[prefetchBatch.cur_idx]; + return &batch->prefetch_info[batch->cur_idx]; } -static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) { +static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys, PrefetchBatch *batch) { assert(num_keys <= DictMaxPrefetchSize); - prefetchBatch.current_batch_size = num_keys; - prefetchBatch.cur_idx = 0; - prefetchBatch.keys_done = 0; + batch->current_batch_size = num_keys; + batch->cur_idx = 0; + batch->keys_done = 0; /* Initialize the prefetch info */ - for (size_t i = 0; i < prefetchBatch.current_batch_size; i++) { - PrefetchInfo *info = &prefetchBatch.prefetch_info[i]; + for (size_t i = 0; i < batch->current_batch_size; i++) { + PrefetchInfo *info = &batch->prefetch_info[i]; if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) { info->state = PrefetchDone; - prefetchBatch.keys_done++; + batch->keys_done++; continue; } info->ht_idx = -1; @@ 
-1665,11 +1657,12 @@ static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) { * dictPrefetch can be invoked with a callback function, get_val_data_func, * to bring the key's value data closer to the L1 cache as well. */ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) { - initBatch(keys_dicts, num_keys, keys); + PrefetchBatch batch; /* prefetch batch - holds the current batch of keys being prefetched */ + initBatch(keys_dicts, num_keys, keys, &batch); - while (prefetchBatch.keys_done < prefetchBatch.current_batch_size) { - PrefetchInfo *info = getNextPrefetchInfo(); - size_t i = prefetchBatch.cur_idx; + while (batch.keys_done < batch.current_batch_size) { + PrefetchInfo *info = getNextPrefetchInfo(&batch); + size_t i = batch.cur_idx; switch (info->state) { case PrefetchBucket: /* Determine which hash table to use */ @@ -1679,13 +1672,13 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( info->ht_idx = 1; } else { /* No more tables left - mark as done. 
*/ - markDone(info); + markDone(info, &batch); break; } /* Prefetch the bucket */ info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]); - prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx], &batch); info->current_entry = NULL; info->state = PrefetchEntry; break; @@ -1701,7 +1694,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( } if (info->current_entry) { - prefetch(info->current_entry); + prefetch(info->current_entry, &batch); info->state = PrefetchValue; } else { /* No entry found in the bucket - try the bucket in the next table */ @@ -1715,7 +1708,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) { /* If this is the last element we assume a hit and dont compare the keys */ - prefetch(value); + prefetch(value, &batch); info->state = PrefetchValueData; break; } @@ -1723,7 +1716,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( void *current_entry_key = dictGetKey(info->current_entry); if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) { /* If the key is found, prefetch the value */ - prefetch(value); + prefetch(value, &batch); info->state = PrefetchValueData; } else { /* Move to next entry */ @@ -1736,9 +1729,9 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( /* Prefetch value data if available */ if (get_val_data_func) { void *value_data = get_val_data_func(dictGetVal(info->current_entry)); - if (value_data) prefetch(value_data); + if (value_data) prefetch(value_data, &batch); } - markDone(info); + markDone(info, &batch); break; } diff --git a/src/fmtargs.h b/src/fmtargs.h index 3fb6cbb479..1fbd02ed82 100644 --- a/src/fmtargs.h +++ b/src/fmtargs.h @@ -44,9 +44,9 @@ /* 
Everything below this line is automatically generated by * generate-fmtargs.py. Do not manually edit. */ -#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, N, ...) N +#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, _123, _124, _125, _126, _127, _128, _129, _130, _131, _132, _133, _134, _135, _136, _137, _138, _139, _140, _141, _142, _143, _144, _145, _146, _147, _148, _149, _150, _151, _152, _153, _154, _155, _156, _157, _158, _159, _160, _161, _162, _163, _164, _165, _166, _167, _168, _169, _170, _171, _172, _173, _174, _175, _176, _177, _178, _179, _180, _181, _182, _183, _184, _185, _186, _187, _188, _189, _190, _191, _192, _193, _194, _195, _196, _197, _198, _199, _200, N, ...) 
N -#define RSEQ_N() 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 +#define RSEQ_N() 200, 199, 198, 197, 196, 195, 194, 193, 192, 191, 190, 189, 188, 187, 186, 185, 184, 183, 182, 181, 180, 179, 178, 177, 176, 175, 174, 173, 172, 171, 170, 169, 168, 167, 166, 165, 164, 163, 162, 161, 160, 159, 158, 157, 156, 155, 154, 153, 152, 151, 150, 149, 148, 147, 146, 145, 144, 143, 142, 141, 140, 139, 138, 137, 136, 135, 134, 133, 132, 131, 130, 129, 128, 127, 126, 125, 124, 123, 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 #define COMPACT_FMT_2(fmt, value) fmt #define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__) @@ -109,6 +109,45 @@ #define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__) #define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__) #define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__) +#define COMPACT_FMT_124(fmt, value, ...) fmt COMPACT_FMT_122(__VA_ARGS__) +#define COMPACT_FMT_126(fmt, value, ...) fmt COMPACT_FMT_124(__VA_ARGS__) +#define COMPACT_FMT_128(fmt, value, ...) 
fmt COMPACT_FMT_126(__VA_ARGS__) +#define COMPACT_FMT_130(fmt, value, ...) fmt COMPACT_FMT_128(__VA_ARGS__) +#define COMPACT_FMT_132(fmt, value, ...) fmt COMPACT_FMT_130(__VA_ARGS__) +#define COMPACT_FMT_134(fmt, value, ...) fmt COMPACT_FMT_132(__VA_ARGS__) +#define COMPACT_FMT_136(fmt, value, ...) fmt COMPACT_FMT_134(__VA_ARGS__) +#define COMPACT_FMT_138(fmt, value, ...) fmt COMPACT_FMT_136(__VA_ARGS__) +#define COMPACT_FMT_140(fmt, value, ...) fmt COMPACT_FMT_138(__VA_ARGS__) +#define COMPACT_FMT_142(fmt, value, ...) fmt COMPACT_FMT_140(__VA_ARGS__) +#define COMPACT_FMT_144(fmt, value, ...) fmt COMPACT_FMT_142(__VA_ARGS__) +#define COMPACT_FMT_146(fmt, value, ...) fmt COMPACT_FMT_144(__VA_ARGS__) +#define COMPACT_FMT_148(fmt, value, ...) fmt COMPACT_FMT_146(__VA_ARGS__) +#define COMPACT_FMT_150(fmt, value, ...) fmt COMPACT_FMT_148(__VA_ARGS__) +#define COMPACT_FMT_152(fmt, value, ...) fmt COMPACT_FMT_150(__VA_ARGS__) +#define COMPACT_FMT_154(fmt, value, ...) fmt COMPACT_FMT_152(__VA_ARGS__) +#define COMPACT_FMT_156(fmt, value, ...) fmt COMPACT_FMT_154(__VA_ARGS__) +#define COMPACT_FMT_158(fmt, value, ...) fmt COMPACT_FMT_156(__VA_ARGS__) +#define COMPACT_FMT_160(fmt, value, ...) fmt COMPACT_FMT_158(__VA_ARGS__) +#define COMPACT_FMT_162(fmt, value, ...) fmt COMPACT_FMT_160(__VA_ARGS__) +#define COMPACT_FMT_164(fmt, value, ...) fmt COMPACT_FMT_162(__VA_ARGS__) +#define COMPACT_FMT_166(fmt, value, ...) fmt COMPACT_FMT_164(__VA_ARGS__) +#define COMPACT_FMT_168(fmt, value, ...) fmt COMPACT_FMT_166(__VA_ARGS__) +#define COMPACT_FMT_170(fmt, value, ...) fmt COMPACT_FMT_168(__VA_ARGS__) +#define COMPACT_FMT_172(fmt, value, ...) fmt COMPACT_FMT_170(__VA_ARGS__) +#define COMPACT_FMT_174(fmt, value, ...) fmt COMPACT_FMT_172(__VA_ARGS__) +#define COMPACT_FMT_176(fmt, value, ...) fmt COMPACT_FMT_174(__VA_ARGS__) +#define COMPACT_FMT_178(fmt, value, ...) fmt COMPACT_FMT_176(__VA_ARGS__) +#define COMPACT_FMT_180(fmt, value, ...) 
fmt COMPACT_FMT_178(__VA_ARGS__) +#define COMPACT_FMT_182(fmt, value, ...) fmt COMPACT_FMT_180(__VA_ARGS__) +#define COMPACT_FMT_184(fmt, value, ...) fmt COMPACT_FMT_182(__VA_ARGS__) +#define COMPACT_FMT_186(fmt, value, ...) fmt COMPACT_FMT_184(__VA_ARGS__) +#define COMPACT_FMT_188(fmt, value, ...) fmt COMPACT_FMT_186(__VA_ARGS__) +#define COMPACT_FMT_190(fmt, value, ...) fmt COMPACT_FMT_188(__VA_ARGS__) +#define COMPACT_FMT_192(fmt, value, ...) fmt COMPACT_FMT_190(__VA_ARGS__) +#define COMPACT_FMT_194(fmt, value, ...) fmt COMPACT_FMT_192(__VA_ARGS__) +#define COMPACT_FMT_196(fmt, value, ...) fmt COMPACT_FMT_194(__VA_ARGS__) +#define COMPACT_FMT_198(fmt, value, ...) fmt COMPACT_FMT_196(__VA_ARGS__) +#define COMPACT_FMT_200(fmt, value, ...) fmt COMPACT_FMT_198(__VA_ARGS__) #define COMPACT_VALUES_2(fmt, value) value #define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__) @@ -171,5 +210,44 @@ #define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__) #define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__) #define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__) +#define COMPACT_VALUES_124(fmt, value, ...) value, COMPACT_VALUES_122(__VA_ARGS__) +#define COMPACT_VALUES_126(fmt, value, ...) value, COMPACT_VALUES_124(__VA_ARGS__) +#define COMPACT_VALUES_128(fmt, value, ...) value, COMPACT_VALUES_126(__VA_ARGS__) +#define COMPACT_VALUES_130(fmt, value, ...) value, COMPACT_VALUES_128(__VA_ARGS__) +#define COMPACT_VALUES_132(fmt, value, ...) value, COMPACT_VALUES_130(__VA_ARGS__) +#define COMPACT_VALUES_134(fmt, value, ...) value, COMPACT_VALUES_132(__VA_ARGS__) +#define COMPACT_VALUES_136(fmt, value, ...) value, COMPACT_VALUES_134(__VA_ARGS__) +#define COMPACT_VALUES_138(fmt, value, ...) value, COMPACT_VALUES_136(__VA_ARGS__) +#define COMPACT_VALUES_140(fmt, value, ...) value, COMPACT_VALUES_138(__VA_ARGS__) +#define COMPACT_VALUES_142(fmt, value, ...) 
value, COMPACT_VALUES_140(__VA_ARGS__) +#define COMPACT_VALUES_144(fmt, value, ...) value, COMPACT_VALUES_142(__VA_ARGS__) +#define COMPACT_VALUES_146(fmt, value, ...) value, COMPACT_VALUES_144(__VA_ARGS__) +#define COMPACT_VALUES_148(fmt, value, ...) value, COMPACT_VALUES_146(__VA_ARGS__) +#define COMPACT_VALUES_150(fmt, value, ...) value, COMPACT_VALUES_148(__VA_ARGS__) +#define COMPACT_VALUES_152(fmt, value, ...) value, COMPACT_VALUES_150(__VA_ARGS__) +#define COMPACT_VALUES_154(fmt, value, ...) value, COMPACT_VALUES_152(__VA_ARGS__) +#define COMPACT_VALUES_156(fmt, value, ...) value, COMPACT_VALUES_154(__VA_ARGS__) +#define COMPACT_VALUES_158(fmt, value, ...) value, COMPACT_VALUES_156(__VA_ARGS__) +#define COMPACT_VALUES_160(fmt, value, ...) value, COMPACT_VALUES_158(__VA_ARGS__) +#define COMPACT_VALUES_162(fmt, value, ...) value, COMPACT_VALUES_160(__VA_ARGS__) +#define COMPACT_VALUES_164(fmt, value, ...) value, COMPACT_VALUES_162(__VA_ARGS__) +#define COMPACT_VALUES_166(fmt, value, ...) value, COMPACT_VALUES_164(__VA_ARGS__) +#define COMPACT_VALUES_168(fmt, value, ...) value, COMPACT_VALUES_166(__VA_ARGS__) +#define COMPACT_VALUES_170(fmt, value, ...) value, COMPACT_VALUES_168(__VA_ARGS__) +#define COMPACT_VALUES_172(fmt, value, ...) value, COMPACT_VALUES_170(__VA_ARGS__) +#define COMPACT_VALUES_174(fmt, value, ...) value, COMPACT_VALUES_172(__VA_ARGS__) +#define COMPACT_VALUES_176(fmt, value, ...) value, COMPACT_VALUES_174(__VA_ARGS__) +#define COMPACT_VALUES_178(fmt, value, ...) value, COMPACT_VALUES_176(__VA_ARGS__) +#define COMPACT_VALUES_180(fmt, value, ...) value, COMPACT_VALUES_178(__VA_ARGS__) +#define COMPACT_VALUES_182(fmt, value, ...) value, COMPACT_VALUES_180(__VA_ARGS__) +#define COMPACT_VALUES_184(fmt, value, ...) value, COMPACT_VALUES_182(__VA_ARGS__) +#define COMPACT_VALUES_186(fmt, value, ...) value, COMPACT_VALUES_184(__VA_ARGS__) +#define COMPACT_VALUES_188(fmt, value, ...) 
value, COMPACT_VALUES_186(__VA_ARGS__) +#define COMPACT_VALUES_190(fmt, value, ...) value, COMPACT_VALUES_188(__VA_ARGS__) +#define COMPACT_VALUES_192(fmt, value, ...) value, COMPACT_VALUES_190(__VA_ARGS__) +#define COMPACT_VALUES_194(fmt, value, ...) value, COMPACT_VALUES_192(__VA_ARGS__) +#define COMPACT_VALUES_196(fmt, value, ...) value, COMPACT_VALUES_194(__VA_ARGS__) +#define COMPACT_VALUES_198(fmt, value, ...) value, COMPACT_VALUES_196(__VA_ARGS__) +#define COMPACT_VALUES_200(fmt, value, ...) value, COMPACT_VALUES_198(__VA_ARGS__) #endif diff --git a/src/networking.c b/src/networking.c index 009729b041..3965325123 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4618,18 +4618,17 @@ int postponeClientRead(client *c) { /* Prefetch multiple commands batch */ typedef struct { client *clients[DictMaxPrefetchSize]; - size_t clients_count; - size_t keys_count; + size_t client_count; + size_t key_count; void *keys[DictMaxPrefetchSize]; kvstore *keys_kvs[DictMaxPrefetchSize]; kvstore *expire_kvs[DictMaxPrefetchSize]; int slots[DictMaxPrefetchSize]; - int client_closed_during_batch_execution; } BatchProcessData; static BatchProcessData batch = {0}; -static void *getValData(const void *val) { +static void *getObjectValuePtr(const void *val) { robj *o = (robj *)val; if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) { return o->ptr; @@ -4637,93 +4636,91 @@ static void *getValData(const void *val) { return NULL; } -void processBatchClientsCommands(void) { - if (batch.clients_count == 0) return; +static void batchProcessClientCommands(void) { + for (size_t i = 0; i < batch.client_count; i++) { + client *c = batch.clients[i]; + if (c) { + /* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */ + batch.clients[i] = NULL; + if (processPendingCommandAndInputBuffer(c) != C_ERR) { + beforeNextClient(c); + } + } + } + memset(&batch, 0, sizeof(batch)); +} + +/*Prefetch the commands' args allocated by the I/O 
thread and process all the commands in the batch.*/ +static void batchPrefetchArgsAndProcessClientCommands(void) { + if (batch.client_count == 0) return; /* Prefetch argv's for all clients */ - for (size_t i = 0; i < batch.clients_count; i++) { + for (size_t i = 0; i < batch.client_count; i++) { client *c = batch.clients[i]; if (!c || c->argc <= 1) continue; /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. */ for (int j = 1; j < c->argc; j++) { - __builtin_prefetch(c->argv[j]); + valkey_prefetch(c->argv[j]); } } /* prefetch the argv->ptr if required */ - for (size_t i = 0; i < batch.clients_count; i++) { + for (size_t i = 0; i < batch.client_count; i++) { client *c = batch.clients[i]; if (!c || c->argc <= 1) continue; for (int j = 1; j < c->argc; j++) { if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { - __builtin_prefetch(c->argv[j]->ptr); + valkey_prefetch(c->argv[j]->ptr); } } } /* Get the keys ptrs - we do it here since we wanted to wait for the arg prefetch */ - for (size_t i = 0; i < batch.keys_count; i++) { + for (size_t i = 0; i < batch.key_count; i++) { batch.keys[i] = ((robj *)batch.keys[i])->ptr; } /* Prefetch keys for all commands, prefetch is beneficial only if there are more than one key */ - if (batch.keys_count > 1) { + if (batch.key_count > 1) { server.stat_total_prefetch_batches++; - server.stat_total_prefetch_entries += batch.keys_count; + server.stat_total_prefetch_entries += batch.key_count; /* Keys */ - kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, getValData); + kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **) batch.keys, batch.key_count, getObjectValuePtr); /* Expires - with expires no values prefetch are required. 
*/ - kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, NULL); + kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.key_count, NULL); } /* Process clients' commands */ - for (size_t i = 0; i < batch.clients_count; i++) { - client *c = batch.clients[i]; - if (c) { - /* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */ - batch.clients[i] = NULL; - if (processPendingCommandAndInputBuffer(c) != C_ERR) { - beforeNextClient(c); - } - } - } - - batch.keys_count = 0; - batch.clients_count = 0; + batchProcessClientCommands(); } void addCommandToBatchAndProcessIfFull(client *c) { - batch.clients[batch.clients_count++] = c; + batch.clients[batch.client_count++] = c; - /* Get command's keys. - * When ProcessingEventsWhileBlocked is set, we don't want to prefetch keys, as no commands will be executed. */ - if (c->io_parsed_cmd && !ProcessingEventsWhileBlocked) { + /* Get command's keys positions */ + if (c->io_parsed_cmd) { getKeysResult result; initGetKeysResult(&result); int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); - for (int i = 0; i < num_keys && batch.keys_count < DictMaxPrefetchSize; i++) { - batch.keys[batch.keys_count] = c->argv[result.keys[i].pos]; - batch.slots[batch.keys_count] = c->slot > 0 ? c->slot : 0; - batch.keys_kvs[batch.keys_count] = c->db->keys; - batch.expire_kvs[batch.keys_count] = c->db->expires; - batch.keys_count++; + for (int i = 0; i < num_keys && batch.key_count < DictMaxPrefetchSize; i++) { + batch.keys[batch.key_count] = c->argv[result.keys[i].pos]; + batch.slots[batch.key_count] = c->slot > 0 ? c->slot : 0; + batch.keys_kvs[batch.key_count] = c->db->keys; + batch.expire_kvs[batch.key_count] = c->db->expires; + batch.key_count++; } getKeysFreeResult(&result); } /* If the batch is full, process it. 
* We also check the client count to handle cases where - * no keys exist for the client's command. */ - if (batch.clients_count == DictMaxPrefetchSize || batch.keys_count == DictMaxPrefetchSize) { - processBatchClientsCommands(); + * no keys exist for the clients' commands. */ + if (batch.client_count == DictMaxPrefetchSize || batch.key_count == DictMaxPrefetchSize) { + batchPrefetchArgsAndProcessClientCommands(); } } void removeClientFromPendingPrefetchBatch(client *c) { - if (batch.clients_count == 0) return; - - batch.client_closed_during_batch_execution = 1; - - for (size_t i = 0; i < batch.clients_count; i++) { + for (size_t i = 0; i < batch.client_count; i++) { if (batch.clients[i] == c) { batch.clients[i] = NULL; return; @@ -4732,9 +4729,14 @@ void removeClientFromPendingPrefetchBatch(client *c) { } int processIOThreadsReadDone(void) { - if (listLength(server.clients_pending_io_read) == 0 && batch.clients_count == 0) return 0; + if (ProcessingEventsWhileBlocked) { + /* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively. + * In this case, there may be some clients left in the batch waiting to be processed. */ + batchProcessClientCommands(); + } + + if (listLength(server.clients_pending_io_read) == 0) return 0; int processed = 0; - batch.client_closed_during_batch_execution = 0; listNode *ln; listNode *next = listFirst(server.clients_pending_io_read); @@ -4751,16 +4753,17 @@ int processIOThreadsReadDone(void) { } /* memory barrier acquire to get the updated client state */ atomic_thread_fence(memory_order_acquire); - /* Don't post-process-writes to clients that are going to be closed anyway. */ - if (c->flag.close_asap) continue; - /* If a client is protected, don't do anything, - * that may trigger read/write error or recreate handler. 
*/ - if (c->flag.protected) continue; listUnlinkNode(server.clients_pending_io_read, ln); c->flag.pending_read = 0; c->io_read_state = CLIENT_IDLE; + /* Don't post-process-reads to clients that are going to be closed anyway. */ + if (c->flag.close_asap) continue; + /* If a client is protected, don't do anything, + * that may trigger read/write error or recreate handler. */ + if (c->flag.protected) continue; + processed++; server.stat_io_reads_processed++; @@ -4787,17 +4790,15 @@ int processIOThreadsReadDone(void) { c->flag.pending_command = 1; } + size_t list_len = listLength(server.clients_pending_io_read); addCommandToBatchAndProcessIfFull(c); - - /* There is a possibility that a client was closed during the latest batch processing. - * In this case the next node may be invalid */ - if (batch.client_closed_during_batch_execution) { + if (list_len != listLength(server.clients_pending_io_read)) { + /* A client was removed from the list - next node may be invalid */ next = listFirst(server.clients_pending_io_read); - batch.client_closed_during_batch_execution = 0; } } - processBatchClientsCommands(); + batchPrefetchArgsAndProcessClientCommands(); return processed; } diff --git a/src/server.c b/src/server.c index 3e7abeab19..4bcbbe4826 100644 --- a/src/server.c +++ b/src/server.c @@ -5623,10 +5623,6 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { server.stat_last_eviction_exceeded_time ? (long long)elapsedUs(server.stat_last_eviction_exceeded_time) : 0; long long current_active_defrag_time = server.stat_last_active_defrag_time ? (long long)elapsedUs(server.stat_last_active_defrag_time) : 0; - long long average_prefetch_batch_size = - (server.stat_total_prefetch_batches - ? 
server.stat_total_prefetch_entries / server.stat_total_prefetch_batches - : 0); if (sections++) info = sdscat(info, "\r\n"); /* clang-format off */ @@ -5682,7 +5678,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, - "io_threaded_average_prefetch_batch_size:%lld\r\n", average_prefetch_batch_size, + "io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches, + "io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index a9831085f1..1740436c2b 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -206,8 +206,10 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb # verify the prefetch stats are as expected set info [r info stats] - set prefetch_stats [getInfoProperty $info io_threaded_average_prefetch_batch_size] - assert_range [expr $prefetch_stats] 2 15 ; # we expect max 15 as the the kill command doesn't have any keys. 
+ set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower + set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches] + assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher # Verify the final state $rd16 get a diff --git a/utils/generate-fmtargs.py b/utils/generate-fmtargs.py index 873b8f67f6..dfe8efadcc 100755 --- a/utils/generate-fmtargs.py +++ b/utils/generate-fmtargs.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # Outputs the generated part of src/fmtargs.h -MAX_ARGS = 122 +MAX_ARGS = 200 import os print("/* Everything below this line is automatically generated by") From d04a75595810719082f61aed5526125a3d3067f9 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Fri, 16 Aug 2024 00:15:25 +0000 Subject: [PATCH 4/7] Dynamic config for batch size. Signed-off-by: Uri Yagelnik --- src/Makefile | 2 +- src/config.c | 7 + src/dict.c | 202 +------------------ src/dict.h | 4 +- src/io_threads.c | 2 + src/kvstore.c | 14 +- src/kvstore.h | 5 - src/maa.c | 415 ++++++++++++++++++++++++++++++++++++++ src/maa.h | 12 ++ src/networking.c | 136 ++----------- src/server.h | 2 + tests/unit/networking.tcl | 66 ++++++ valkey.conf | 15 +- 13 files changed, 537 insertions(+), 345 deletions(-) create mode 100644 src/maa.c create mode 100644 src/maa.h diff --git a/src/Makefile b/src/Makefile index eaf0e4e387..d5cd77be4c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -423,7 +423,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o 
aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o maa.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o 
cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/config.c b/src/config.c index ae60dd3fd0..364aa38fe1 100644 --- a/src/config.c +++ b/src/config.c @@ -2564,6 +2564,12 @@ static int updateOOMScoreAdj(const char **err) { return 1; } +static int UpdateMaxPrefetchBatchSize(const char **err) { + UNUSED(err); + onMaxBatchSizeChange(); + return 1; +} + int invalidateClusterSlotsResp(const char **err) { UNUSED(err); clearCachedClusterSlotsResponse(); @@ -3164,6 +3170,7 @@ standardConfig static_configs[] = { createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), + createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, UpdateMaxPrefetchBatchSize), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. 
*/ createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL), diff --git a/src/dict.c b/src/dict.c index c7bace846a..9ad3fd0abf 100644 --- a/src/dict.c +++ b/src/dict.c @@ -120,7 +120,7 @@ static void _dictExpandIfNeeded(dict *d); static void _dictShrinkIfNeeded(dict *d); static signed char _dictNextExp(unsigned long size); static int _dictInit(dict *d, dictType *type); -static dictEntry *dictGetNext(const dictEntry *de); +dictEntry *dictGetNext(const dictEntry *de); static dictEntry **dictGetNextRef(dictEntry *de); static void dictSetNext(dictEntry *de, dictEntry *next); @@ -963,7 +963,7 @@ double *dictGetDoubleValPtr(dictEntry *de) { /* Returns the 'next' field of the entry or NULL if the entry doesn't have a * 'next' field. */ -static dictEntry *dictGetNext(const dictEntry *de) { +dictEntry *dictGetNext(const dictEntry *de) { if (entryIsKey(de)) return NULL; /* there's no next */ if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next; if (entryIsEmbedded(de)) return decodeEmbeddedEntry(de)->next; @@ -1542,204 +1542,6 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio return v; } - -/* -------------------------- Dict Prefetching ------------------------------ */ - -typedef enum { - PrefetchBucket, /* Initial state, determines which hash table to use, and prefetch the table's bucket */ - PrefetchEntry, /* Prefetches entries associated with the given key's hash */ - PrefetchValue, /* Prefetches the value object of the entry found in the previous step*/ - PrefetchValueData, /* Prefetches the value object's data (if applicable) */ - PrefetchDone /* Indicates that prefetching for this key is complete */ -} PrefetchState; - -/************************************ State machine diagram for the prefetch operation. 
******************************** - │ - start - │ - ┌────────▼─────────┐ - ┌─────────►│ PrefetchBucket ├────►────────┐ - │ └────────┬─────────┘ no more tables -> done - | bucket|found | - │ | │ - entry not found - goto next table ┌────────▼────────┐ │ - └────◄─────┤ PrefetchEntry | ▼ - ┌────────────►└────────┬────────┘ │ - | Entry│found │ - │ | │ - value not found - goto next entry ┌───────▼────────┐ | - └───────◄──────┤ PrefetchValue | ▼ - └───────┬────────┘ │ - Value│found │ - | | - ┌───────────▼──────────────┐ │ - │ PrefetchValueData │ ▼ - └───────────┬──────────────┘ │ - | │ - ┌───────-─▼─────────────┐ │ - │ PrefetchDone │◄────────┘ - └───────────────────────┘ -**********************************************************************************************************************/ - -typedef struct { - PrefetchState state; /* Current state of the prefetch operation */ - int ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ - uint64_t bucket_idx; /* Index of the bucket in the current hash table */ - uint64_t key_hash; /* Hash value of the key being prefetched */ - dictEntry *current_entry; /* Pointer to the current entry being processed */ -} PrefetchInfo; - -typedef struct { - PrefetchInfo prefetch_info[DictMaxPrefetchSize]; - size_t current_batch_size; /* Number of keys in the current batch */ - size_t cur_idx; /* Index of the current key being prefetched */ - size_t keys_done; /* Number of keys that have been processed */ -} PrefetchBatch; - -/* Prefetches the given pointer and move to the next key in the batch */ -static void prefetch(void *addr, PrefetchBatch *batch) { - valkey_prefetch(addr); - /* while the prefetch is in progress, we can continue to the next key */ - batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; -} - -static void markDone(PrefetchInfo *info, PrefetchBatch *batch) { - info->state = PrefetchDone; - batch->keys_done++; -} - -static PrefetchInfo *getNextPrefetchInfo(PrefetchBatch *batch) { - while 
(batch->prefetch_info[batch->cur_idx].state == PrefetchDone) { - batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; - } - return &batch->prefetch_info[batch->cur_idx]; -} - -static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys, PrefetchBatch *batch) { - assert(num_keys <= DictMaxPrefetchSize); - - batch->current_batch_size = num_keys; - batch->cur_idx = 0; - batch->keys_done = 0; - - /* Initialize the prefetch info */ - for (size_t i = 0; i < batch->current_batch_size; i++) { - PrefetchInfo *info = &batch->prefetch_info[i]; - if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) { - info->state = PrefetchDone; - batch->keys_done++; - continue; - } - info->ht_idx = -1; - info->current_entry = NULL; - info->state = PrefetchBucket; - info->key_hash = dictHashKey(keys_dicts[i], keys[i]); - } -} - -/* dictPrefetch - Prefetches dictionary data for an array of keys - * - * This function takes an array of dictionaries and keys, attempting to bring - * data closer to the L1 cache that might be needed for dictionary operations - * on those keys. - * - * dictFind Algorithm: - * 1. Evaluate the hash of the key - * 2. Access the index in the first table - * 3. Walk the linked list until the key is found - * If the key hasn't been found and the dictionary is in the middle of rehashing, - * access the index on the second table and repeat step 3 - * - * dictPrefetch executes the same algorithm as dictFind, but one step at a time - * for each key. Instead of waiting for data to be read from memory, it prefetches - * the data and then moves on to execute the next prefetch for another key. - * - * dictPrefetch can be invoked with a callback function, get_val_data_func, - * to bring the key's value data closer to the L1 cache as well. 
*/ -void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) { - PrefetchBatch batch; /* prefetch batch - holds the current batch of keys being prefetched */ - initBatch(keys_dicts, num_keys, keys, &batch); - - while (batch.keys_done < batch.current_batch_size) { - PrefetchInfo *info = getNextPrefetchInfo(&batch); - size_t i = batch.cur_idx; - switch (info->state) { - case PrefetchBucket: - /* Determine which hash table to use */ - if (info->ht_idx == -1) { - info->ht_idx = 0; - } else if (info->ht_idx == 0 && dictIsRehashing(keys_dicts[i])) { - info->ht_idx = 1; - } else { - /* No more tables left - mark as done. */ - markDone(info, &batch); - break; - } - - /* Prefetch the bucket */ - info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]); - prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx], &batch); - info->current_entry = NULL; - info->state = PrefetchEntry; - break; - - case PrefetchEntry: - /* Prefetch the first entry in the bucket */ - if (info->current_entry) { - /* We already found an entry in the bucket - move to the next entry */ - info->current_entry = dictGetNext(info->current_entry); - } else { - /* Find the first entry in the bucket */ - info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]; - } - - if (info->current_entry) { - prefetch(info->current_entry, &batch); - info->state = PrefetchValue; - } else { - /* No entry found in the bucket - try the bucket in the next table */ - info->state = PrefetchBucket; - } - break; - - case PrefetchValue: { - /* Prefetch the entry's value. 
*/ - void *value = dictGetVal(info->current_entry); - - if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) { - /* If this is the last element we assume a hit and dont compare the keys */ - prefetch(value, &batch); - info->state = PrefetchValueData; - break; - } - - void *current_entry_key = dictGetKey(info->current_entry); - if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) { - /* If the key is found, prefetch the value */ - prefetch(value, &batch); - info->state = PrefetchValueData; - } else { - /* Move to next entry */ - info->state = PrefetchEntry; - } - break; - } - - case PrefetchValueData: { - /* Prefetch value data if available */ - if (get_val_data_func) { - void *value_data = get_val_data_func(dictGetVal(info->current_entry)); - if (value_data) prefetch(value_data, &batch); - } - markDone(info, &batch); - break; - } - - default: assert(0); - } - } -} - /* ------------------------- private functions ------------------------------ */ /* Because we may need to allocate huge memory chunk at once when dict diff --git a/src/dict.h b/src/dict.h index f984023e23..97a79910cb 100644 --- a/src/dict.h +++ b/src/dict.h @@ -45,8 +45,7 @@ #define DICT_ERR 1 /* Hash table parameters */ -#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */ -#define DictMaxPrefetchSize 16 /* Limit of maximum number of dict entries to prefetch */ +#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */ typedef struct dictEntry dictEntry; /* opaque */ typedef struct dict dict; @@ -248,7 +247,6 @@ unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); uint64_t dictGetHash(dict *d, const void *key); void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size); -void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)); size_t 
dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full); dictStats *dictGetStatsHt(dict *d, int htidx, int full); diff --git a/src/io_threads.c b/src/io_threads.c index c9345d72e0..7a68cfb87f 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -303,6 +303,8 @@ void initIOThreads(void) { serverAssert(server.io_threads_num <= IO_THREADS_MAX_NUM); + prefetchCommandsBatchInit(); + /* Spawn and initialize the I/O threads. */ for (int i = 1; i < server.io_threads_num; i++) { createIOThread(i); diff --git a/src/kvstore.c b/src/kvstore.c index d6f886c95b..b7fa7359ab 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -93,7 +93,7 @@ typedef struct { /**********************************/ /* Get the dictionary pointer based on dict-index. */ -static dict *kvstoreGetDict(kvstore *kvs, int didx) { +dict *kvstoreGetDict(kvstore *kvs, int didx) { return kvs->dicts[didx]; } @@ -828,15 +828,3 @@ int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) { } return ret; } - -void kvstoreDictPrefetch(kvstore **kvs, - int *slots, - const void **keys, - size_t keys_count, - void *(*get_val_data_func)(const void *val)) { - dict *dicts[keys_count]; - for (size_t i = 0; i < keys_count; i++) { - dicts[i] = kvstoreGetDict(kvs[i], slots[i]); - } - dictPrefetch(dicts, keys_count, keys, get_val_data_func); -} diff --git a/src/kvstore.h b/src/kvstore.h index 40d1eab15f..a94f366b6b 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -36,11 +36,6 @@ int kvstoreNumNonEmptyDicts(kvstore *kvs); int kvstoreNumAllocatedDicts(kvstore *kvs); int kvstoreNumDicts(kvstore *kvs); uint64_t kvstoreGetHash(kvstore *kvs, const void *key); -void kvstoreDictPrefetch(kvstore **kvs, - int *slots, - const void **keys, - size_t keys_count, - void *(*get_val_data_func)(const void *val)); /* kvstore iterator specific functions */ kvstoreIterator *kvstoreIteratorInit(kvstore *kvs); diff --git a/src/maa.c b/src/maa.c new file mode 100644 index 0000000000..e5e8f0abe4 --- /dev/null +++ b/src/maa.c @@ -0,0 
+1,415 @@ +/* + * maa.c - Memory Access Amortization (MAA) Implementation + * + * This file implements the memory access amortization technique for Valkey. + * It utilizes prefetching keys and data for multiple commands in a batch, + * to improve performance by amortizing memory access costs across multiple operations. + */ + +#include "maa.h" +#include "server.h" +#include "dict.h" + +/* Forward declarations of dict.c functions */ +dictEntry *dictGetNext(const dictEntry *de); + +/* Forward declarations of kvstore.c functions */ +dict *kvstoreGetDict(kvstore *kvs, int didx); + +#define HT_IDX_FIRST 0 +#define HT_IDX_SECOND 1 +#define HT_IDX_INVALID -1 + +typedef enum { + PREFETCH_BUCKET, /* Initial state, determines which hash table to use and prefetch the table's bucket */ + PREFETCH_ENTRY, /* prefetch entries associated with the given key's hash */ + PREFETCH_VALUE, /* prefetch the value object of the entry found in the previous step */ + PREFETCH_VALUE_DATA, /* prefetch the value object's data (if applicable) */ + PREFETCH_DONE /* Indicates that prefetching for this key is complete */ +} PrefetchState; + + +/************************************ State machine diagram for the prefetch operation. 
******************************** + │ + start + │ + ┌────────▼─────────┐ + ┌─────────►│ PREFETCH_BUCKET ├────►────────┐ + │ └────────┬─────────┘ no more tables -> done + | bucket|found | + │ | │ + entry not found - goto next table ┌────────▼────────┐ │ + └────◄─────┤ PREFETCH_ENTRY | ▼ + ┌────────────►└────────┬────────┘ │ + | Entry│found │ + │ | │ + value not found - goto next entry ┌───────▼────────┐ | + └───────◄──────┤ PREFETCH_VALUE | ▼ + └───────┬────────┘ │ + Value│found │ + | | + ┌───────────▼──────────────┐ │ + │ PREFETCH_VALUE_DATA │ ▼ + └───────────┬──────────────┘ │ + | │ + ┌───────-─▼─────────────┐ │ + │ PREFETCH_DONE │◄────────┘ + └───────────────────────┘ +**********************************************************************************************************************/ + +typedef void *(*GetValueDataFunc)(const void *val); + +typedef struct PrefetchInfo { + PrefetchState state; /* Current state of the prefetch operation */ + int ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ + uint64_t bucket_idx; /* Index of the bucket in the current hash table */ + uint64_t key_hash; /* Hash value of the key being prefetched */ + dictEntry *current_entry; /* Pointer to the current entry being processed */ +} PrefetchInfo; + +/* CommandsBatch structure holds the state of the current batch of client commands being processed. 
*/ +typedef struct CommandsBatch { + size_t cur_idx; /* Index of the current key being processed */ + size_t keys_done; /* Number of keys that have been prefetched */ + size_t key_count; /* Number of keys in the current batch */ + size_t client_count; /* Number of clients in the current batch */ + size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */ + size_t executed_commands; /* Number of commands executed in the current batch */ + int *slots; /* Array of slots for each key */ + void **keys; /* Array of keys to prefetch in the current batch */ + client **clients; /* Array of clients in the current batch */ + dict **keys_dicts; /* Main dict for each key */ + dict **expire_dicts; /* Expire dict for each key */ + dict **current_dicts; /* Points to either keys_dicts or expire_dicts */ + PrefetchInfo *prefetch_info; /* Prefetch info for each key */ +} CommandsBatch; + +static CommandsBatch *batch = NULL; + +void freePrefetchCommandsBatch(void) { + if (batch == NULL) { + return; + } + + zfree(batch->clients); + zfree(batch->keys); + zfree(batch->keys_dicts); + zfree(batch->expire_dicts); + zfree(batch->slots); + zfree(batch->prefetch_info); + zfree(batch); + batch = NULL; +} + +void prefetchCommandsBatchInit(void) { + serverAssert(!batch); + size_t max_prefetch_size = server.prefetch_batch_max_size; + + if (max_prefetch_size == 0) { + return; + } + + batch = zcalloc(sizeof(CommandsBatch)); + batch->max_prefetch_size = max_prefetch_size; + batch->clients = zcalloc(max_prefetch_size * sizeof(client *)); + batch->keys = zcalloc(max_prefetch_size * sizeof(void *)); + batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); + batch->expire_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); + batch->slots = zcalloc(max_prefetch_size * sizeof(int)); + batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(PrefetchInfo)); +} + +void onMaxBatchSizeChange(void) { + if (batch && batch->client_count > 0) { + /* We need to process the current 
batch before updating the size */ + return; + } + + freePrefetchCommandsBatch(); + prefetchCommandsBatchInit(); +} + +/* Prefetch the given pointer and move to the next key in the batch. */ +static void prefetch(void *addr) { + valkey_prefetch(addr); + /* While the prefetch is in progress, we can continue to the next key */ + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; +} + +static void markDone(PrefetchInfo *info) { + info->state = PREFETCH_DONE; + server.stat_total_prefetch_entries++; + batch->keys_done++; +} + +/* Returns the next PrefetchInfo structure that needs to be processed. */ +static PrefetchInfo *getNextPrefetchInfo(void) { + size_t start_idx = batch->cur_idx; + do { + PrefetchInfo *info = &batch->prefetch_info[batch->cur_idx]; + if (info->state != PREFETCH_DONE) return info; + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; + } while (batch->cur_idx != start_idx); + return NULL; +} + +static void initBatchInfo(dict **dicts) { + batch->current_dicts = dicts; + + /* Initialize the prefetch info */ + for (size_t i = 0; i < batch->key_count; i++) { + PrefetchInfo *info = &batch->prefetch_info[i]; + if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) { + info->state = PREFETCH_DONE; + batch->keys_done++; + continue; + } + info->ht_idx = HT_IDX_INVALID; + info->current_entry = NULL; + info->state = PREFETCH_BUCKET; + info->key_hash = dictHashKey(batch->current_dicts[i], batch->keys[i]); + } +} + +/* Prefetch the bucket of the next hash table index. + * If no tables are left, move to the PREFETCH_DONE state. */ +static void prefetchBucket(PrefetchInfo *info) { + size_t i = batch->cur_idx; + + /* Determine which hash table to use */ + if (info->ht_idx == HT_IDX_INVALID) { + info->ht_idx = HT_IDX_FIRST; + } else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(batch->current_dicts[i])) { + info->ht_idx = HT_IDX_SECOND; + } else { + /* No more tables left - mark as done. 
*/ + markDone(info); + return; + } + + /* Prefetch the bucket */ + info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]); + prefetch(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + info->current_entry = NULL; + info->state = PREFETCH_ENTRY; +} + +/* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state. + * If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */ +static void prefetchEntry(PrefetchInfo *info) { + size_t i = batch->cur_idx; + + if (info->current_entry) { + /* We already found an entry in the bucket - move to the next entry */ + info->current_entry = dictGetNext(info->current_entry); + } else { + /* Go to the first entry in the bucket */ + info->current_entry = batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]; + } + + if (info->current_entry) { + prefetch(info->current_entry); + info->state = PREFETCH_VALUE; + } else { + /* No entry found in the bucket - try the bucket in the next table */ + info->state = PREFETCH_BUCKET; + } +} + +/* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state. + * If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. 
*/ +static void prefetchValue(PrefetchInfo *info) { + size_t i = batch->cur_idx; + void *value = dictGetVal(info->current_entry); + + if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(batch->current_dicts[i])) { + /* If this is the last element, we assume a hit and don't compare the keys */ + prefetch(value); + info->state = PREFETCH_VALUE_DATA; + return; + } + + void *current_entry_key = dictGetKey(info->current_entry); + if (batch->keys[i] == current_entry_key || + dictCompareKeys(batch->current_dicts[i], batch->keys[i], current_entry_key)) { + /* If the key is found, prefetch the value */ + prefetch(value); + info->state = PREFETCH_VALUE_DATA; + } else { + /* Move to the next entry */ + info->state = PREFETCH_ENTRY; + } +} + +/* Prefetch the value data if available. */ +static void prefetchValueData(PrefetchInfo *info, GetValueDataFunc get_val_data_func) { + if (get_val_data_func) { + void *value_data = get_val_data_func(dictGetVal(info->current_entry)); + if (value_data) prefetch(value_data); + } + markDone(info); +} + +/* Prefetch dictionary data for an array of keys. + * + * This function takes an array of dictionaries and keys, attempting to bring + * data closer to the L1 cache that might be needed for dictionary operations + * on those keys. + * + * The dictFind algorithm: + * 1. Evaluate the hash of the key + * 2. Access the index in the first table + * 3. Walk the entries linked list until the key is found + * If the key hasn't been found and the dictionary is in the middle of rehashing, + * access the index on the second table and repeat step 3 + * + * dictPrefetch executes the same algorithm as dictFind, but one step at a time + * for each key. Instead of waiting for data to be read from memory, it prefetches + * the data and then moves on to execute the next prefetch for another key. + * + * dicts - An array of dictionaries to prefetch data from. 
+ * get_val_data_func - A callback function that dictPrefetch can invoke + * to bring the key's value data closer to the L1 cache as well. + */ +static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) { + initBatchInfo(dicts); + PrefetchInfo *info; + while ((info = getNextPrefetchInfo())) { + switch (info->state) { + case PREFETCH_BUCKET: prefetchBucket(info); break; + case PREFETCH_ENTRY: prefetchEntry(info); break; + case PREFETCH_VALUE: prefetchValue(info); break; + case PREFETCH_VALUE_DATA: prefetchValueData(info, get_val_data_func); break; + default: serverPanic("Unknown prefetch state %d", info->state); + } + } +} + +/* Helper function to get the value pointer of an object. */ +static void *getObjectValuePtr(const void *val) { + robj *o = (robj *)val; + return (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) ? o->ptr : NULL; +} + +static void resetCommandsBatch(void) { + batch->cur_idx = 0; + batch->keys_done = 0; + batch->key_count = 0; + batch->client_count = 0; + batch->executed_commands = 0; +} + +/* Prefetch command-related data: + * 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache. + * 2. Prefetch the keys and values for all commands in the current batch from the main and expires dictionaries. */ +static void prefetchCommands(void) { + /* Prefetch argv's for all clients */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. 
*/ + for (int j = 1; j < c->argc; j++) { + valkey_prefetch(c->argv[j]); + } + } + + /* Prefetch the argv->ptr if required */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + for (int j = 1; j < c->argc; j++) { + if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { + valkey_prefetch(c->argv[j]->ptr); + } + } + } + + /* Get the keys ptrs - we do it here after the key obj was prefetched. */ + for (size_t i = 0; i < batch->key_count; i++) { + batch->keys[i] = ((robj *)batch->keys[i])->ptr; + } + + /* Prefetch dict keys for all commands. Prefetching is beneficial only if there is more than one key. */ + if (batch->key_count > 1) { + server.stat_total_prefetch_batches++; + /* Prefetch keys from the main dict */ + dictPrefetch(batch->keys_dicts, getObjectValuePtr); + /* Prefetch keys from the expires dict - no value data to prefetch */ + dictPrefetch(batch->expire_dicts, NULL); + } +} + +/* Processes all the prefetched commands in the current batch. 
*/ +void processClientsCommandsBatch(void) { + if (!batch || batch->client_count == 0) return; + + /* If executed_commands is not 0, + * it means that we are in the middle of processing a batch and this is a recursive call */ + if (batch->executed_commands == 0) { + prefetchCommands(); + } + + /* Process the commands */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (c == NULL) continue; + + /* Set the client to null immediately to avoid accessing it again recursively when ProcessingEventsWhileBlocked */ + batch->clients[i] = NULL; + batch->executed_commands++; + if (processPendingCommandAndInputBuffer(c) != C_ERR) beforeNextClient(c); + } + + resetCommandsBatch(); + + /* Handle the case where the max prefetch size has been changed during the batch processing */ + if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) { + onMaxBatchSizeChange(); + } +} + +/* Adds the client's command to the current batch and processes the batch + * if it becomes full. + * + * Returns C_OK if the command was added successfully, C_ERR otherwise. */ +int addCommandToBatchAndProcessIfFull(client *c) { + if (!batch) return C_ERR; + + batch->clients[batch->client_count++] = c; + + /* Get command's keys positions */ + if (c->io_parsed_cmd) { + getKeysResult result; + initGetKeysResult(&result); + int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); + for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) { + batch->keys[batch->key_count] = c->argv[result.keys[i].pos]; + batch->slots[batch->key_count] = c->slot > 0 ? c->slot : 0; + batch->keys_dicts[batch->key_count] = kvstoreGetDict(c->db->keys, batch->slots[batch->key_count]); + batch->expire_dicts[batch->key_count] = kvstoreGetDict(c->db->expires, batch->slots[batch->key_count]); + batch->key_count++; + } + getKeysFreeResult(&result); + } + + /* If the batch is full, process it. 
+ * We also check the client count to handle cases where + * no keys exist for the clients' commands. */ + if (batch->client_count == batch->max_prefetch_size || batch->key_count == batch->max_prefetch_size) { + processClientsCommandsBatch(); + } + + return C_OK; +} + +/* Removes the given client from the pending prefetch batch, if present. */ +void removeClientFromPendingCommandsBatch(client *c) { + if (!batch) return; + + for (size_t i = 0; i < batch->client_count; i++) { + if (batch->clients[i] == c) { + batch->clients[i] = NULL; + return; + } + } +} diff --git a/src/maa.h b/src/maa.h new file mode 100644 index 0000000000..3e97e575e7 --- /dev/null +++ b/src/maa.h @@ -0,0 +1,12 @@ +#ifndef MAA_H +#define MAA_H + +struct client; + +void onMaxBatchSizeChange(void); +void prefetchCommandsBatchInit(void); +void processClientsCommandsBatch(void); +int addCommandToBatchAndProcessIfFull(struct client *c); +void removeClientFromPendingCommandsBatch(struct client *c); + +#endif /* MAA_H */ diff --git a/src/networking.c b/src/networking.c index 3965325123..ef32dc9688 100644 --- a/src/networking.c +++ b/src/networking.c @@ -33,8 +33,8 @@ #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" -#include #include "io_threads.h" +#include #include #include #include @@ -45,7 +45,6 @@ static void setProtocolError(const char *errstr, client *c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); -void removeClientFromPendingPrefetchBatch(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). 
*/ __thread sds thread_shared_qb = NULL; @@ -1506,7 +1505,7 @@ void unlinkClient(client *c) { listDelNode(server.clients, c->client_list_node); c->client_list_node = NULL; } - removeClientFromPendingPrefetchBatch(c); + removeClientFromPendingCommandsBatch(c); /* Check if this is a replica waiting for diskless replication (rdb pipe), * in which case it needs to be cleaned from that list */ @@ -4615,124 +4614,11 @@ int postponeClientRead(client *c) { return (trySendReadToIOThreads(c) == C_OK); } -/* Prefetch multiple commands batch */ -typedef struct { - client *clients[DictMaxPrefetchSize]; - size_t client_count; - size_t key_count; - void *keys[DictMaxPrefetchSize]; - kvstore *keys_kvs[DictMaxPrefetchSize]; - kvstore *expire_kvs[DictMaxPrefetchSize]; - int slots[DictMaxPrefetchSize]; -} BatchProcessData; - -static BatchProcessData batch = {0}; - -static void *getObjectValuePtr(const void *val) { - robj *o = (robj *)val; - if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) { - return o->ptr; - } - return NULL; -} - -static void batchProcessClientCommands(void) { - for (size_t i = 0; i < batch.client_count; i++) { - client *c = batch.clients[i]; - if (c) { - /* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */ - batch.clients[i] = NULL; - if (processPendingCommandAndInputBuffer(c) != C_ERR) { - beforeNextClient(c); - } - } - } - memset(&batch, 0, sizeof(batch)); -} - -/*Prefetch the commands' args allocated by the I/O thread and process all the commands in the batch.*/ -static void batchPrefetchArgsAndProcessClientCommands(void) { - if (batch.client_count == 0) return; - /* Prefetch argv's for all clients */ - for (size_t i = 0; i < batch.client_count; i++) { - client *c = batch.clients[i]; - if (!c || c->argc <= 1) continue; - /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. 
*/ - for (int j = 1; j < c->argc; j++) { - valkey_prefetch(c->argv[j]); - } - } - - /* prefetch the argv->ptr if required */ - for (size_t i = 0; i < batch.client_count; i++) { - client *c = batch.clients[i]; - if (!c || c->argc <= 1) continue; - for (int j = 1; j < c->argc; j++) { - if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { - valkey_prefetch(c->argv[j]->ptr); - } - } - } - - /* Get the keys ptrs - we do it here since we wanted to wait for the arg prefetch */ - for (size_t i = 0; i < batch.key_count; i++) { - batch.keys[i] = ((robj *)batch.keys[i])->ptr; - } - - /* Prefetch keys for all commands, prefetch is beneficial only if there are more than one key */ - if (batch.key_count > 1) { - server.stat_total_prefetch_batches++; - server.stat_total_prefetch_entries += batch.key_count; - /* Keys */ - kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **) batch.keys, batch.key_count, getObjectValuePtr); - /* Expires - with expires no values prefetch are required. */ - kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.key_count, NULL); - } - - /* Process clients' commands */ - batchProcessClientCommands(); -} - -void addCommandToBatchAndProcessIfFull(client *c) { - batch.clients[batch.client_count++] = c; - - /* Get command's keys positions */ - if (c->io_parsed_cmd) { - getKeysResult result; - initGetKeysResult(&result); - int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); - for (int i = 0; i < num_keys && batch.key_count < DictMaxPrefetchSize; i++) { - batch.keys[batch.key_count] = c->argv[result.keys[i].pos]; - batch.slots[batch.key_count] = c->slot > 0 ? c->slot : 0; - batch.keys_kvs[batch.key_count] = c->db->keys; - batch.expire_kvs[batch.key_count] = c->db->expires; - batch.key_count++; - } - getKeysFreeResult(&result); - } - - /* If the batch is full, process it. - * We also check the client count to handle cases where - * no keys exist for the clients' commands. 
*/ - if (batch.client_count == DictMaxPrefetchSize || batch.key_count == DictMaxPrefetchSize) { - batchPrefetchArgsAndProcessClientCommands(); - } -} - -void removeClientFromPendingPrefetchBatch(client *c) { - for (size_t i = 0; i < batch.client_count; i++) { - if (batch.clients[i] == c) { - batch.clients[i] = NULL; - return; - } - } -} - int processIOThreadsReadDone(void) { if (ProcessingEventsWhileBlocked) { /* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively. * In this case, there may be some clients left in the batch waiting to be processed. */ - batchProcessClientCommands(); + processClientsCommandsBatch(); } if (listLength(server.clients_pending_io_read) == 0) return 0; @@ -4760,6 +4646,7 @@ int processIOThreadsReadDone(void) { /* Don't post-process-reads to clients that are going to be closed anyway. */ if (c->flag.close_asap) continue; + /* If a client is protected, don't do anything, * that may trigger read/write error or recreate handler. */ if (c->flag.protected) continue; @@ -4790,15 +4677,20 @@ int processIOThreadsReadDone(void) { c->flag.pending_command = 1; } - size_t list_len = listLength(server.clients_pending_io_read); - addCommandToBatchAndProcessIfFull(c); - if (list_len != listLength(server.clients_pending_io_read)) { - /* A client was removed from the list - next node may be invalid */ + size_t list_length_before_command_execute = listLength(server.clients_pending_io_read); + /* try to add the command to the batch */ + int ret = addCommandToBatchAndProcessIfFull(c); + /* If the command was not added to the commands batch, process it immediately */ + if (ret == C_ERR) { + if (processPendingCommandAndInputBuffer(c) == C_OK) beforeNextClient(c); + } + if (list_length_before_command_execute != listLength(server.clients_pending_io_read)) { + /* A client was unlink from the list possibly making the next node invalid */ next = listFirst(server.clients_pending_io_read); } } - 
batchPrefetchArgsAndProcessClientCommands(); + processClientsCommandsBatch(); return processed; } diff --git a/src/server.h b/src/server.h index d1a4d94190..d6d82ac81a 100644 --- a/src/server.h +++ b/src/server.h @@ -79,6 +79,7 @@ typedef long long ustime_t; /* microsecond time type. */ N-elements flat arrays */ #include "rax.h" /* Radix tree */ #include "connection.h" /* Connection abstraction */ +#include "maa.h" /* Memory access amortization */ #define VALKEYMODULE_CORE 1 typedef struct serverObject robj; @@ -1747,6 +1748,7 @@ struct valkeyServer { int io_threads_do_reads; /* Read and parse from IO threads? */ int active_io_threads_num; /* Current number of active IO threads, includes main thread. */ int events_per_io_thread; /* Number of events on the event loop to trigger IO threads activation. */ + int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 1740436c2b..3ddda042b5 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -181,6 +181,9 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb set rd$i [valkey_deferring_client] } + # set a key that will be later be prefetch + r set a 0 + # Get the client ID of rd4 $rd4 client id set rd4_id [$rd4 read] @@ -216,5 +219,68 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb assert_equal {OK} [$rd16 read] assert_equal {16} [$rd16 read] } + + test {prefetch works as expected when changing the batch size while executing the commands batch} { + # Create 16 (default prefetch batch size) clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [valkey_deferring_client] 
+ } + + # Create a batch of commands by making sure the server sleeps for a while + # before responding to the first command + $rd0 debug sleep 2 + after 200 ; # wait a bit to make sure the server is sleeping. + + # Send set commands for all clients the 5th client will change the prefetch batch size + for {set i 0} {$i < 16} {incr i} { + if {$i == 4} { + [set rd$i] config set prefetch-batch-max-size 1 + } + [set rd$i] set a $i + [set rd$i] flush + } + + # Read the results + for {set i 0} {$i < 16} {incr i} { + assert_equal {OK} [[set rd$i] read] + } + + # assert the configured prefetch batch size was changed + assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"} + } + + test {no prefetch when the batch size is set to 0} { + # set the batch size to 0 + r config set prefetch-batch-max-size 0 + # save the current value of prefetch entries + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + + # Create 16 (default prefetch batch size) clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [valkey_deferring_client] + } + + # Create a batch of commands by making sure the server sleeps for a while + # before responding to the first command + $rd0 debug sleep 2 + after 200 ; # wait a bit to make sure the server is sleeping. 
+ + # Send set commands for all clients + for {set i 0} {$i < 16} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } + + # Read the results + for {set i 0} {$i < 16} {incr i} { + assert_equal {OK} [[set rd$i] read] + } + + # assert the prefetch entries did not change + set info [r info stats] + set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_equal $prefetch_entries $new_prefetch_entries + } } } diff --git a/valkey.conf b/valkey.conf index 68f4ad1f72..dbddcc80be 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1325,7 +1325,20 @@ lazyfree-lazy-user-flush no # to thread the write and read syscall and transfer the client buffers to the # socket and to enable threading of reads and protocol parsing. # -# NOTE 2: If you want to test the server speedup using valkey-benchmark, make +# prefetch-batch-max-size 16 +# +# When multiple commands are parsed by the I/O threads and ready for execution, +# we take advantage of knowing the next set of commands and prefetch their +# required dictionary entries in a batch. This reduces memory access costs. +# +# The optimal batch size depends on the specific workflow of the user. +# The default batch size is 16, which can be modified using the +# 'prefetch-batch-max-size' config. +# +# When the config is set to 0, it means no prefetching will be done. +# This effectively disables the prefetching feature. +# +# NOTE: If you want to test the server speedup using valkey-benchmark, make # sure you also run the benchmark itself in threaded mode, using the # --threads option to match the number of server threads, otherwise you'll not # be able to notice the improvements. 
From d300b1b056cb7bbbddbf3f01cd57142573bcb6a5 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Tue, 20 Aug 2024 02:24:53 +0000 Subject: [PATCH 5/7] Address PR comments Signed-off-by: Uri Yagelnik --- src/Makefile | 2 +- src/{maa.c => memory_prefetch.c} | 13 ++++--------- src/{maa.h => memory_prefetch.h} | 6 +++--- src/server.h | 2 +- valkey.conf | 7 +++---- 5 files changed, 12 insertions(+), 18 deletions(-) rename src/{maa.c => memory_prefetch.c} (97%) rename src/{maa.h => memory_prefetch.h} (74%) diff --git a/src/Makefile b/src/Makefile index d5cd77be4c..f3474094eb 100644 --- a/src/Makefile +++ b/src/Makefile @@ -423,7 +423,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o maa.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o 
networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/maa.c b/src/memory_prefetch.c similarity index 97% rename from src/maa.c rename to src/memory_prefetch.c index e5e8f0abe4..a8a7beecff 100644 --- a/src/maa.c +++ b/src/memory_prefetch.c @@ -1,12 +1,9 @@ /* - * maa.c - Memory Access Amortization (MAA) Implementation - * - * This file implements the memory access amortization technique for Valkey. - * It utilizes prefetching keys and data for multiple commands in a batch, + * This file utilizes prefetching keys and data for multiple commands in a batch, * to improve performance by amortizing memory access costs across multiple operations. 
*/ -#include "maa.h" +#include "memory_prefetch.h" #include "server.h" #include "dict.h" @@ -16,9 +13,7 @@ dictEntry *dictGetNext(const dictEntry *de); /* Forward declarations of kvstore.c functions */ dict *kvstoreGetDict(kvstore *kvs, int didx); -#define HT_IDX_FIRST 0 -#define HT_IDX_SECOND 1 -#define HT_IDX_INVALID -1 +typedef enum { HT_IDX_FIRST = 0, HT_IDX_SECOND = 1, HT_IDX_INVALID = -1 } HashTableIndex; typedef enum { PREFETCH_BUCKET, /* Initial state, determines which hash table to use and prefetch the table's bucket */ @@ -61,7 +56,7 @@ typedef void *(*GetValueDataFunc)(const void *val); typedef struct PrefetchInfo { PrefetchState state; /* Current state of the prefetch operation */ - int ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ + HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ uint64_t bucket_idx; /* Index of the bucket in the current hash table */ uint64_t key_hash; /* Hash value of the key being prefetched */ dictEntry *current_entry; /* Pointer to the current entry being processed */ diff --git a/src/maa.h b/src/memory_prefetch.h similarity index 74% rename from src/maa.h rename to src/memory_prefetch.h index 3e97e575e7..428f097e05 100644 --- a/src/maa.h +++ b/src/memory_prefetch.h @@ -1,5 +1,5 @@ -#ifndef MAA_H -#define MAA_H +#ifndef MEMORY_PREFETCH_H +#define MEMORY_PREFETCH_H struct client; @@ -9,4 +9,4 @@ void processClientsCommandsBatch(void); int addCommandToBatchAndProcessIfFull(struct client *c); void removeClientFromPendingCommandsBatch(struct client *c); -#endif /* MAA_H */ +#endif /* MEMORY_PREFETCH_H */ diff --git a/src/server.h b/src/server.h index d6d82ac81a..bd6e34239d 100644 --- a/src/server.h +++ b/src/server.h @@ -79,7 +79,7 @@ typedef long long ustime_t; /* microsecond time type. 
*/ N-elements flat arrays */ #include "rax.h" /* Radix tree */ #include "connection.h" /* Connection abstraction */ -#include "maa.h" /* Memory access amortization */ +#include "memory_prefetch.h" #define VALKEYMODULE_CORE 1 typedef struct serverObject robj; diff --git a/valkey.conf b/valkey.conf index dbddcc80be..4072c81b56 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1325,8 +1325,6 @@ lazyfree-lazy-user-flush no # to thread the write and read syscall and transfer the client buffers to the # socket and to enable threading of reads and protocol parsing. # -# prefetch-batch-max-size 16 -# # When multiple commands are parsed by the I/O threads and ready for execution, # we take advantage of knowing the next set of commands and prefetch their # required dictionary entries in a batch. This reduces memory access costs. @@ -1335,8 +1333,9 @@ lazyfree-lazy-user-flush no # The default batch size is 16, which can be modified using the # 'prefetch-batch-max-size' config. # -# When the config is set to 0, it means no prefetching will be done. -# This effectively disables the prefetching feature. +# When the config is set to 0, prefetching is disabled. 
+# +# prefetch-batch-max-size 16 # # NOTE: If you want to test the server speedup using valkey-benchmark, make # sure you also run the benchmark itself in threaded mode, using the From c3195720e009c92e6d44a00aa0d43eee1460ba5d Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Sun, 25 Aug 2024 07:16:21 +0000 Subject: [PATCH 6/7] PR comments Signed-off-by: Uri Yagelnik --- src/config.c | 8 +--- src/dict.c | 1 - src/dict.h | 1 + src/io_threads.c | 6 +++ src/kvstore.h | 1 + src/memory_prefetch.c | 90 ++++++++++++++++++++------------------- src/memory_prefetch.h | 1 - tests/unit/networking.tcl | 58 +++++++++++++------------ 8 files changed, 87 insertions(+), 79 deletions(-) diff --git a/src/config.c b/src/config.c index 364aa38fe1..7ef9d58a23 100644 --- a/src/config.c +++ b/src/config.c @@ -2564,12 +2564,6 @@ static int updateOOMScoreAdj(const char **err) { return 1; } -static int UpdateMaxPrefetchBatchSize(const char **err) { - UNUSED(err); - onMaxBatchSizeChange(); - return 1; -} - int invalidateClusterSlotsResp(const char **err) { UNUSED(err); clearCachedClusterSlotsResponse(); @@ -3170,7 +3164,7 @@ standardConfig static_configs[] = { createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. 
*/ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), - createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, UpdateMaxPrefetchBatchSize), + createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */ createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL), diff --git a/src/dict.c b/src/dict.c index 9ad3fd0abf..851718626e 100644 --- a/src/dict.c +++ b/src/dict.c @@ -120,7 +120,6 @@ static void _dictExpandIfNeeded(dict *d); static void _dictShrinkIfNeeded(dict *d); static signed char _dictNextExp(unsigned long size); static int _dictInit(dict *d, dictType *type); -dictEntry *dictGetNext(const dictEntry *de); static dictEntry **dictGetNextRef(dictEntry *de); static void dictSetNext(dictEntry *de, dictEntry *next); diff --git a/src/dict.h b/src/dict.h index 97a79910cb..1671533f5c 100644 --- a/src/dict.h +++ b/src/dict.h @@ -229,6 +229,7 @@ void dictInitIterator(dictIterator *iter, dict *d); void dictInitSafeIterator(dictIterator *iter, dict *d); void dictResetIterator(dictIterator *iter); dictEntry *dictNext(dictIterator *iter); +dictEntry *dictGetNext(const dictEntry *de); void 
dictReleaseIterator(dictIterator *iter); dictEntry *dictGetRandomKey(dict *d); dictEntry *dictGetFairRandomKey(dict *d); diff --git a/src/io_threads.c b/src/io_threads.c index 7a68cfb87f..5b2230f635 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -1,3 +1,9 @@ +/* + * Copyright Valkey Contributors. + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + */ + #include "io_threads.h" static __thread int thread_id = 0; /* Thread local var */ diff --git a/src/kvstore.h b/src/kvstore.h index a94f366b6b..202f6a9c25 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -76,5 +76,6 @@ void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val); dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index); void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index); int kvstoreDictDelete(kvstore *kvs, int didx, const void *key); +dict *kvstoreGetDict(kvstore *kvs, int didx); #endif /* DICTARRAY_H_ */ diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index a8a7beecff..01c510638a 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -1,4 +1,8 @@ /* + * Copyright Valkey Contributors. + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + * * This file utilizes prefetching keys and data for multiple commands in a batch, * to improve performance by amortizing memory access costs across multiple operations. 
*/ @@ -48,38 +52,38 @@ typedef enum { └───────────┬──────────────┘ │ | │ ┌───────-─▼─────────────┐ │ - │ PREFETCH_DONE │◄────────┘ + │ PREFETCH_DONE │◄────────┘ └───────────────────────┘ **********************************************************************************************************************/ typedef void *(*GetValueDataFunc)(const void *val); -typedef struct PrefetchInfo { +typedef struct KeyPrefetchInfo { PrefetchState state; /* Current state of the prefetch operation */ HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ uint64_t bucket_idx; /* Index of the bucket in the current hash table */ uint64_t key_hash; /* Hash value of the key being prefetched */ dictEntry *current_entry; /* Pointer to the current entry being processed */ -} PrefetchInfo; - -/* CommandsBatch structure holds the state of the current batch of client commands being processed. */ -typedef struct CommandsBatch { - size_t cur_idx; /* Index of the current key being processed */ - size_t keys_done; /* Number of keys that have been prefetched */ - size_t key_count; /* Number of keys in the current batch */ - size_t client_count; /* Number of clients in the current batch */ - size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */ - size_t executed_commands; /* Number of commands executed in the current batch */ - int *slots; /* Array of slots for each key */ - void **keys; /* Array of keys to prefetch in the current batch */ - client **clients; /* Array of clients in the current batch */ - dict **keys_dicts; /* Main dict for each key */ - dict **expire_dicts; /* Expire dict for each key */ - dict **current_dicts; /* Points to either keys_dicts or expire_dicts */ - PrefetchInfo *prefetch_info; /* Prefetch info for each key */ -} CommandsBatch; - -static CommandsBatch *batch = NULL; +} KeyPrefetchInfo; + +/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. 
*/ +typedef struct PrefetchCommandsBatch { + size_t cur_idx; /* Index of the current key being processed */ + size_t keys_done; /* Number of keys that have been prefetched */ + size_t key_count; /* Number of keys in the current batch */ + size_t client_count; /* Number of clients in the current batch */ + size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */ + size_t executed_commands; /* Number of commands executed in the current batch */ + int *slots; /* Array of slots for each key */ + void **keys; /* Array of keys to prefetch in the current batch */ + client **clients; /* Array of clients in the current batch */ + dict **keys_dicts; /* Main dict for each key */ + dict **expire_dicts; /* Expire dict for each key */ + dict **current_dicts; /* Points to either keys_dicts or expire_dicts */ + KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */ +} PrefetchCommandsBatch; + +static PrefetchCommandsBatch *batch = NULL; void freePrefetchCommandsBatch(void) { if (batch == NULL) { @@ -104,14 +108,14 @@ void prefetchCommandsBatchInit(void) { return; } - batch = zcalloc(sizeof(CommandsBatch)); + batch = zcalloc(sizeof(PrefetchCommandsBatch)); batch->max_prefetch_size = max_prefetch_size; batch->clients = zcalloc(max_prefetch_size * sizeof(client *)); batch->keys = zcalloc(max_prefetch_size * sizeof(void *)); batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); batch->expire_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); batch->slots = zcalloc(max_prefetch_size * sizeof(int)); - batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(PrefetchInfo)); + batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo)); } void onMaxBatchSizeChange(void) { @@ -125,23 +129,23 @@ void onMaxBatchSizeChange(void) { } /* Prefetch the given pointer and move to the next key in the batch. 
*/ -static void prefetch(void *addr) { +static void prefetchAndMoveToNextKey(void *addr) { valkey_prefetch(addr); /* While the prefetch is in progress, we can continue to the next key */ batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; } -static void markDone(PrefetchInfo *info) { +static void markKeyAsdone(KeyPrefetchInfo *info) { info->state = PREFETCH_DONE; server.stat_total_prefetch_entries++; batch->keys_done++; } -/* Returns the next PrefetchInfo structure that needs to be processed. */ -static PrefetchInfo *getNextPrefetchInfo(void) { +/* Returns the next KeyPrefetchInfo structure that needs to be processed. */ +static KeyPrefetchInfo *getNextPrefetchInfo(void) { size_t start_idx = batch->cur_idx; do { - PrefetchInfo *info = &batch->prefetch_info[batch->cur_idx]; + KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx]; if (info->state != PREFETCH_DONE) return info; batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; } while (batch->cur_idx != start_idx); @@ -153,7 +157,7 @@ static void initBatchInfo(dict **dicts) { /* Initialize the prefetch info */ for (size_t i = 0; i < batch->key_count; i++) { - PrefetchInfo *info = &batch->prefetch_info[i]; + KeyPrefetchInfo *info = &batch->prefetch_info[i]; if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) { info->state = PREFETCH_DONE; batch->keys_done++; @@ -168,7 +172,7 @@ static void initBatchInfo(dict **dicts) { /* Prefetch the bucket of the next hash table index. * If no tables are left, move to the PREFETCH_DONE state. */ -static void prefetchBucket(PrefetchInfo *info) { +static void prefetchBucket(KeyPrefetchInfo *info) { size_t i = batch->cur_idx; /* Determine which hash table to use */ @@ -178,20 +182,20 @@ static void prefetchBucket(PrefetchInfo *info) { info->ht_idx = HT_IDX_SECOND; } else { /* No more tables left - mark as done. 
*/ - markDone(info); + markKeyAsdone(info); return; } /* Prefetch the bucket */ info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]); - prefetch(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); info->current_entry = NULL; info->state = PREFETCH_ENTRY; } /* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state. * If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */ -static void prefetchEntry(PrefetchInfo *info) { +static void prefetchEntry(KeyPrefetchInfo *info) { size_t i = batch->cur_idx; if (info->current_entry) { @@ -203,7 +207,7 @@ static void prefetchEntry(PrefetchInfo *info) { } if (info->current_entry) { - prefetch(info->current_entry); + prefetchAndMoveToNextKey(info->current_entry); info->state = PREFETCH_VALUE; } else { /* No entry found in the bucket - try the bucket in the next table */ @@ -213,13 +217,13 @@ static void prefetchEntry(PrefetchInfo *info) { /* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state. * If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. 
*/ -static void prefetchValue(PrefetchInfo *info) { +static void prefetchValue(KeyPrefetchInfo *info) { size_t i = batch->cur_idx; void *value = dictGetVal(info->current_entry); if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(batch->current_dicts[i])) { /* If this is the last element, we assume a hit and don't compare the keys */ - prefetch(value); + prefetchAndMoveToNextKey(value); info->state = PREFETCH_VALUE_DATA; return; } @@ -228,7 +232,7 @@ static void prefetchValue(PrefetchInfo *info) { if (batch->keys[i] == current_entry_key || dictCompareKeys(batch->current_dicts[i], batch->keys[i], current_entry_key)) { /* If the key is found, prefetch the value */ - prefetch(value); + prefetchAndMoveToNextKey(value); info->state = PREFETCH_VALUE_DATA; } else { /* Move to the next entry */ @@ -237,12 +241,12 @@ static void prefetchValue(PrefetchInfo *info) { } /* Prefetch the value data if available. */ -static void prefetchValueData(PrefetchInfo *info, GetValueDataFunc get_val_data_func) { +static void prefetchValueData(KeyPrefetchInfo *info, GetValueDataFunc get_val_data_func) { if (get_val_data_func) { void *value_data = get_val_data_func(dictGetVal(info->current_entry)); - if (value_data) prefetch(value_data); + if (value_data) prefetchAndMoveToNextKey(value_data); } - markDone(info); + markKeyAsdone(info); } /* Prefetch dictionary data for an array of keys. 
@@ -268,7 +272,7 @@ static void prefetchValueData(PrefetchInfo *info, GetValueDataFunc get_val_data_ */ static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) { initBatchInfo(dicts); - PrefetchInfo *info; + KeyPrefetchInfo *info; while ((info = getNextPrefetchInfo())) { switch (info->state) { case PREFETCH_BUCKET: prefetchBucket(info); break; @@ -357,7 +361,7 @@ void processClientsCommandsBatch(void) { resetCommandsBatch(); - /* Handle the case where the max prefetch size has been changed during the batch processing */ + /* Handle the case where the max prefetch size has been changed. */ if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) { onMaxBatchSizeChange(); } diff --git a/src/memory_prefetch.h b/src/memory_prefetch.h index 428f097e05..5a181cc58d 100644 --- a/src/memory_prefetch.h +++ b/src/memory_prefetch.h @@ -3,7 +3,6 @@ struct client; -void onMaxBatchSizeChange(void); void prefetchCommandsBatchInit(void); void processClientsCommandsBatch(void); int addCommandToBatchAndProcessIfFull(struct client *c); diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 3ddda042b5..9eaf467477 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -172,12 +172,12 @@ start_server {config "minimal.conf" tags {"external:skip"}} { } start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} { - + set server_pid [s process_id] # Skip if non io-threads mode - as it is relevant only for io-threads mode if {[r config get io-threads] ne "io-threads 1"} { test {prefetch works as expected when killing a client from the middle of prefetch commands batch} { - # Create 17 (prefetch batch size) +1 clients - for {set i 0} {$i < 17} {incr i} { + # Create 16 (prefetch batch size) +1 clients + for {set i 0} {$i < 16} {incr i} { set rd$i [valkey_deferring_client] } @@ -188,22 +188,24 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb $rd4 
client id set rd4_id [$rd4 read] - # Create a batch of commands by making sure the server sleeps for a while + # Create a batch of commands by suspending the server for a while # before responding to the first command - $rd0 debug sleep 2 - after 200 ; # wait a bit to make sure the server is sleeping. + pause_process $server_pid # The first client will kill the fourth client - $rd1 client kill id $rd4_id + $rd0 client kill id $rd4_id # Send set commands for all clients except the first - for {set i 1} {$i < 17} {incr i} { + for {set i 1} {$i < 16} {incr i} { [set rd$i] set a $i [set rd$i] flush } + # Resume the server + resume_process $server_pid + # Read the results - assert_equal {1} [$rd1 read] + assert_equal {1} [$rd0 read] catch {$rd4 read} err assert_match {I/O error reading reply} $err @@ -212,25 +214,24 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches] - assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher + assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher # Verify the final state - $rd16 get a - assert_equal {OK} [$rd16 read] - assert_equal {16} [$rd16 read] + $rd15 get a + assert_equal {OK} [$rd15 read] + assert_equal {15} [$rd15 read] } - + test {prefetch works as expected when changing the batch size while executing the commands batch} { # Create 16 (default prefetch batch size) clients for {set i 0} {$i < 16} {incr i} { set rd$i [valkey_deferring_client] } - - # Create a batch of commands by making sure the server sleeps for a while + + # Create a batch of commands by suspending the server for a while # before responding to the first command - $rd0 debug sleep 2 - after 
200 ; # wait a bit to make sure the server is sleeping. - + pause_process $server_pid + # Send set commands for all clients the 5th client will change the prefetch batch size for {set i 0} {$i < 16} {incr i} { if {$i == 4} { @@ -239,14 +240,15 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb [set rd$i] set a $i [set rd$i] flush } - + # Resume the server + resume_process $server_pid # Read the results for {set i 0} {$i < 16} {incr i} { assert_equal {OK} [[set rd$i] read] } # assert the configured prefetch batch size was changed - assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"} + assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"} } test {no prefetch when the batch size is set to 0} { @@ -260,18 +262,20 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb for {set i 0} {$i < 16} {incr i} { set rd$i [valkey_deferring_client] } - - # Create a batch of commands by making sure the server sleeps for a while + + # Create a batch of commands by suspending the server for a while # before responding to the first command - $rd0 debug sleep 2 - after 200 ; # wait a bit to make sure the server is sleeping. 
- + pause_process $server_pid + # Send set commands for all clients for {set i 0} {$i < 16} {incr i} { [set rd$i] set a $i [set rd$i] flush } - + + # Resume the server + resume_process $server_pid + # Read the results for {set i 0} {$i < 16} {incr i} { assert_equal {OK} [[set rd$i] read] From ebbbed16ea7149fc59e1b096063c80937b5c4c8a Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Mon, 26 Aug 2024 13:25:37 -0700 Subject: [PATCH 7/7] Update src/networking.c Signed-off-by: Madelyn Olson --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index ef32dc9688..21a474d82f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4644,7 +4644,7 @@ int processIOThreadsReadDone(void) { c->flag.pending_read = 0; c->io_read_state = CLIENT_IDLE; - /* Don't post-process-reads to clients that are going to be closed anyway. */ + /* Don't post-process-reads from clients that are going to be closed anyway. */ if (c->flag.close_asap) continue; /* If a client is protected, don't do anything,