Skip to content

Commit

Permalink
Improving iterator using prefetch values
Browse files Browse the repository at this point in the history
Signed-off-by: NadavGigi <[email protected]>
  • Loading branch information
NadavGigi committed Jan 15, 2025
1 parent c5a1585 commit 1da6b36
Show file tree
Hide file tree
Showing 19 changed files with 96 additions and 50 deletions.
8 changes: 4 additions & 4 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int
hashtableIterator iter;
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *sub = next;
ACLSetSelectorCommandBit(selector, sub->id, allow);
}
Expand All @@ -675,7 +675,7 @@ void ACLSetSelectorCommandBitsForCategory(hashtable *commands, aclSelector *sele
hashtableIterator iter;
hashtableInitIterator(&iter, commands);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (cmd->acl_categories & cflag) {
ACLChangeSelectorPerm(selector, cmd, value);
Expand Down Expand Up @@ -743,7 +743,7 @@ void ACLCountCategoryBitsForCommands(hashtable *commands,
hashtableIterator iter;
hashtableInitIterator(&iter, commands);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (cmd->acl_categories & cflag) {
if (ACLGetSelectorCommandBit(selector, cmd->id))
Expand Down Expand Up @@ -2767,7 +2767,7 @@ void aclCatWithFlags(client *c, hashtable *commands, uint64_t cflag, int *arrayl
hashtableIterator iter;
hashtableInitIterator(&iter, commands);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (cmd->acl_categories & cflag) {
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
Expand Down
4 changes: 2 additions & 2 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1893,7 +1893,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
zskiplistNode *node = next;
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? AOF_REWRITE_ITEMS_PER_CMD : items;
Expand Down Expand Up @@ -2220,7 +2220,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
kvs_it = kvstoreIteratorInit(db->keys);
/* Iterate this DB writing every entry */
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
while (kvstoreIteratorNext(kvs_it, &next, HASHTABLE_ITER_PREFETCH_VALUES)) {
robj *o = next;
sds keystr;
robj key;
Expand Down
2 changes: 1 addition & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ void keysCommand(client *c) {
kvs_it = kvstoreIteratorInit(c->db->keys);
}
void *next;
while (kvs_di ? kvstoreHashtableIteratorNext(kvs_di, &next) : kvstoreIteratorNext(kvs_it, &next)) {
while (kvs_di ? kvstoreHashtableIteratorNext(kvs_di, &next) : kvstoreIteratorNext(kvs_it, &next, 0)) {
robj *val = next;
sds key = objectGetKey(val);
if (allkeys || stringmatchlen(pattern, plen, key, sdslen(key), 0)) {
Expand Down
4 changes: 2 additions & 2 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
hashtableInitIterator(&iter, zs->ht);

void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
zskiplistNode *node = next;
const int len = fpconv_dtoa(node->score, buf);
buf[len] = '\0';
Expand Down Expand Up @@ -299,7 +299,7 @@ void computeDatasetDigest(unsigned char *final) {

/* Iterate this DB writing every entry */
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
while (kvstoreIteratorNext(kvs_it, &next, HASHTABLE_ITER_PREFETCH_VALUES)) {
robj *o = next;
sds key;
robj *keyobj;
Expand Down
28 changes: 26 additions & 2 deletions src/hashtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ static inline int compareKeys(hashtable *ht, const void *key1, const void *key2)
}
}

static inline void entryPrefetchValue(hashtable *ht, const void *entry) {
if (ht->type->entryPrefetchValue != NULL) {
ht->type->entryPrefetchValue(entry);
}
}

static inline const void *entryGetKey(hashtable *ht, const void *entry) {
if (ht->type->entryGetKey != NULL) {
return ht->type->entryGetKey(entry);
Expand Down Expand Up @@ -979,6 +985,15 @@ static void prefetchNextBucketEntries(iter *iter, bucket *current_bucket) {
}
}

/* Prefetches the values associated with filled entries in the given bucket. */
static void prefetchBucketValues(bucket *b, hashtable *ht) {
for (int pos = 0; pos < numBucketPositions(b); pos++) {
if (isPositionFilled(b, pos)) {
entryPrefetchValue(ht, b->entries[pos]);
}
}
}

/* --- API functions --- */

/* Allocates and initializes a new hashtable specified by the given type. */
Expand Down Expand Up @@ -1874,8 +1889,14 @@ void hashtableReleaseIterator(hashtableIterator *iterator) {
}

/* Points elemptr to the next entry and returns 1 if there is a next entry.
* Returns 0 if there are no more entries. */
int hashtableNext(hashtableIterator *iterator, void **elemptr) {
* Returns 0 if there are no more entries.
* The 'flags' argument can be used to tweak the behavior of the iterator:
* - HASHTABLE_ITER_PREFETCH_VALUES: Enables prefetching of entries values,
* which can improve performance in some scenarios. Because the hashtable is generic and
* doesn't care which object we store, the callback entryPrefetchValue helps
* us prefetch necessary fields of specific object types stored in the hashtable.
*/
int hashtableNext(hashtableIterator *iterator, void **elemptr, int flags) {
iter *iter = iteratorFromOpaque(iterator);
while (1) {
if (iter->index == -1 && iter->table == 0) {
Expand Down Expand Up @@ -1936,6 +1957,9 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
}
bucket *b = iter->bucket;
if (iter->pos_in_bucket == 0) {
if (b->presence && (flags & HASHTABLE_ITER_PREFETCH_VALUES)) {
prefetchBucketValues(b, iter->hashtable);
}
prefetchNextBucketEntries(iter, b);
}
if (!isPositionFilled(b, iter->pos_in_bucket)) {
Expand Down
7 changes: 6 additions & 1 deletion src/hashtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ typedef uint64_t hashtableIncrementalFindState[5];
* optional. With all callbacks omitted, the hashtable is effectively a set of
* pointer-sized integers. */
typedef struct {
/* Callback to prefetch the value associated with a hashtable entry. */
void (*entryPrefetchValue)(const void *entry);
/* If the type of an entry is not the same as the type of a key used for
* lookup, this callback needs to return the key within an entry. */
const void *(*entryGetKey)(const void *entry);
Expand Down Expand Up @@ -91,6 +93,9 @@ typedef void (*hashtableScanFunction)(void *privdata, void *entry);
/* Scan flags */
#define HASHTABLE_SCAN_EMIT_REF (1 << 0)

/* Iterator flags */
#define HASHTABLE_ITER_PREFETCH_VALUES (1 << 0) /* When set, prefetch entries inner data during iteration to reduce latency */

/* --- Prototypes --- */

/* Hash function (global seed) */
Expand Down Expand Up @@ -150,7 +155,7 @@ void hashtableResetIterator(hashtableIterator *iter);
hashtableIterator *hashtableCreateIterator(hashtable *ht);
hashtableIterator *hashtableCreateSafeIterator(hashtable *ht);
void hashtableReleaseIterator(hashtableIterator *iter);
int hashtableNext(hashtableIterator *iter, void **elemptr);
int hashtableNext(hashtableIterator *iter, void **elemptr, int flags);

/* Random entries */
int hashtableRandomEntry(hashtable *ht, void **found);
Expand Down
11 changes: 6 additions & 5 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -617,16 +617,17 @@ int kvstoreIteratorGetCurrentHashtableIndex(kvstoreIterator *kvs_it) {
return kvs_it->didx;
}

/* Fetches the next element and returns 1. Returns 0 if there are no more elements. */
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next) {
if (kvs_it->didx != -1 && hashtableNext(&kvs_it->di, next)) {
/* Fetches the next element and returns 1. Returns 0 if there are no more elements.
* The 'flags' argument can be used to tweak the behavior of the iterator. */
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next, int flags) {
if (kvs_it->didx != -1 && hashtableNext(&kvs_it->di, next, flags)) {
return 1;
} else {
/* No current hashtable or reached the end of the hash table. */
hashtable *ht = kvstoreIteratorNextHashtable(kvs_it);
if (!ht) return 0;
hashtableInitSafeIterator(&kvs_it->di, ht);
return hashtableNext(&kvs_it->di, next);
return hashtableNext(&kvs_it->di, next, flags);
}
}

Expand Down Expand Up @@ -724,7 +725,7 @@ int kvstoreHashtableIteratorNext(kvstoreHashtableIterator *kvs_di, void **next)
/* The hashtable may be deleted during the iteration process, so here need to check for NULL. */
hashtable *ht = kvstoreGetHashtable(kvs_di->kvs, kvs_di->didx);
if (!ht) return 0;
return hashtableNext(&kvs_di->di, next);
return hashtableNext(&kvs_di->di, next, 0);
}

int kvstoreHashtableRandomEntry(kvstore *kvs, int didx, void **entry) {
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ size_t kvstoreHashtableMetadataSize(void);
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs);
void kvstoreIteratorRelease(kvstoreIterator *kvs_it);
int kvstoreIteratorGetCurrentHashtableIndex(kvstoreIterator *kvs_it);
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next);
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next, int flags);

/* Rehashing */
void kvstoreTryResizeHashtables(kvstore *kvs, int limit);
Expand Down
4 changes: 2 additions & 2 deletions src/latency.c
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ void latencyAllCommandsFillCDF(client *c, hashtable *commands, int *command_with
hashtableIterator iter;
hashtableInitSafeIterator(&iter, commands);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (cmd->latency_histogram) {
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
Expand Down Expand Up @@ -567,7 +567,7 @@ void latencySpecificCommandsFillCDF(client *c) {
hashtableIterator iter;
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *sub = next;
if (sub->latency_histogram) {
addReplyBulkCBuffer(c, sub->fullname, sdslen(sub->fullname));
Expand Down
4 changes: 2 additions & 2 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -12163,7 +12163,7 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) {
hashtableIterator iter;
void *next;
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *sub = next;
if (moduleFreeCommand(module, sub) != C_OK) continue;

Expand All @@ -12186,7 +12186,7 @@ void moduleUnregisterCommands(struct ValkeyModule *module) {
hashtableIterator iter;
void *next;
hashtableInitSafeIterator(&iter, server.commands);
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (moduleFreeCommand(module, cmd) != C_OK) continue;

Expand Down
8 changes: 4 additions & 4 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ void dismissSetObject(robj *o, size_t size_hint) {
hashtableIterator iter;
hashtableInitIterator(&iter, ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
sds item = next;
dismissSds(item);
}
Expand Down Expand Up @@ -684,7 +684,7 @@ void dismissHashObject(robj *o, size_t size_hint) {
hashtableIterator iter;
hashtableInitIterator(&iter, ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
dismissHashTypeEntry(next);
}
hashtableResetIterator(&iter);
Expand Down Expand Up @@ -1158,7 +1158,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
hashtableIterator iter;
hashtableInitIterator(&iter, ht);
void *next;
while (hashtableNext(&iter, &next) && samples < sample_size) {
while (hashtableNext(&iter, &next, 0) && samples < sample_size) {
sds element = next;
elesize += sdsAllocSize(element);
samples++;
Expand Down Expand Up @@ -1201,7 +1201,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
void *next;

asize = sizeof(*o) + hashtableMemUsage(ht);
while (hashtableNext(&iter, &next) && samples < sample_size) {
while (hashtableNext(&iter, &next, 0) && samples < sample_size) {
elesize += hashTypeEntryAllocSize(next);
samples++;
}
Expand Down
6 changes: 3 additions & 3 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
hashtableIterator iterator;
hashtableInitIterator(&iterator, set);
void *next;
while (hashtableNext(&iterator, &next)) {
while (hashtableNext(&iterator, &next, 0)) {
sds ele = next;
if ((n = rdbSaveRawString(rdb, (unsigned char *)ele, sdslen(ele))) == -1) {
hashtableResetIterator(&iterator);
Expand Down Expand Up @@ -961,7 +961,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
hashtableIterator iter;
hashtableInitIterator(&iter, ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, HASHTABLE_ITER_PREFETCH_VALUES)) {
sds field = hashTypeEntryGetField(next);
sds value = hashTypeEntryGetValue(next);

Expand Down Expand Up @@ -1353,7 +1353,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
int last_slot = -1;
/* Iterate this DB writing every entry */
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
while (kvstoreIteratorNext(kvs_it, &next, HASHTABLE_ITER_PREFETCH_VALUES)) {
robj *o = next;
int curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it);
/* Save slot info. */
Expand Down
Loading

0 comments on commit 1da6b36

Please sign in to comment.