Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accelerate hash table iterator with value prefetching #1568

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int
hashtableIterator iter;
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why aren't we prefetching in these cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this pr i have added support only for hashtables that storing robj and hashTypeEntry.
I will add support for more complex data structures in a different pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the hashtableNext call less simple and the change affects all places even where it is not used. I have another suggestions: storing the iterator flags in the iterator:

        hashtableIterator iter;
        hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
        hashtableIteratorSetFlags(HASHTABLE_ITER_PREFETCH_VALUES);
        void *next;
        while (hashtableNext(&iter, &next)) {

Alternatively, we could set the prefetch callback directly in the iterator instead of storing it in the hashtableType.

        hashtableIteratorSetPrefetchCallback(&iter, hashTypeEntryPrefetchValue);

struct serverCommand *sub = next;
ACLSetSelectorCommandBit(selector, sub->id, allow);
}
Expand All @@ -675,7 +675,7 @@ void ACLSetSelectorCommandBitsForCategory(hashtable *commands, aclSelector *sele
hashtableIterator iter;
hashtableInitIterator(&iter, commands);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (cmd->acl_categories & cflag) {
ACLChangeSelectorPerm(selector, cmd, value);
Expand Down Expand Up @@ -743,7 +743,7 @@ void ACLCountCategoryBitsForCommands(hashtable *commands,
hashtableIterator iter;
hashtableInitIterator(&iter, commands);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (cmd->acl_categories & cflag) {
if (ACLGetSelectorCommandBit(selector, cmd->id))
Expand Down Expand Up @@ -2767,7 +2767,7 @@ void aclCatWithFlags(client *c, hashtable *commands, uint64_t cflag, int *arrayl
hashtableIterator iter;
hashtableInitIterator(&iter, commands);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (cmd->acl_categories & cflag) {
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
Expand Down
4 changes: 2 additions & 2 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1893,7 +1893,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
zskiplistNode *node = next;
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? AOF_REWRITE_ITEMS_PER_CMD : items;
Expand Down Expand Up @@ -2220,7 +2220,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
kvs_it = kvstoreIteratorInit(db->keys);
/* Iterate this DB writing every entry */
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
while (kvstoreIteratorNext(kvs_it, &next, HASHTABLE_ITER_PREFETCH_VALUES)) {
robj *o = next;
sds keystr;
robj key;
Expand Down
2 changes: 1 addition & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ void keysCommand(client *c) {
kvs_it = kvstoreIteratorInit(c->db->keys);
}
void *next;
while (kvs_di ? kvstoreHashtableIteratorNext(kvs_di, &next) : kvstoreIteratorNext(kvs_it, &next)) {
while (kvs_di ? kvstoreHashtableIteratorNext(kvs_di, &next) : kvstoreIteratorNext(kvs_it, &next, 0)) {
robj *val = next;
sds key = objectGetKey(val);
if (allkeys || stringmatchlen(pattern, plen, key, sdslen(key), 0)) {
Expand Down
4 changes: 2 additions & 2 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
hashtableInitIterator(&iter, zs->ht);

void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
zskiplistNode *node = next;
const int len = fpconv_dtoa(node->score, buf);
buf[len] = '\0';
Expand Down Expand Up @@ -299,7 +299,7 @@ void computeDatasetDigest(unsigned char *final) {

/* Iterate this DB writing every entry */
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
while (kvstoreIteratorNext(kvs_it, &next, HASHTABLE_ITER_PREFETCH_VALUES)) {
robj *o = next;
sds key;
robj *keyobj;
Expand Down
28 changes: 26 additions & 2 deletions src/hashtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ static inline int compareKeys(hashtable *ht, const void *key1, const void *key2)
}
}

static inline void entryPrefetchValue(hashtable *ht, const void *entry) {
if (ht->type->entryPrefetchValue != NULL) {
ht->type->entryPrefetchValue(entry);
}
}

static inline const void *entryGetKey(hashtable *ht, const void *entry) {
if (ht->type->entryGetKey != NULL) {
return ht->type->entryGetKey(entry);
Expand Down Expand Up @@ -979,6 +985,15 @@ static void prefetchNextBucketEntries(iter *iter, bucket *current_bucket) {
}
}

/* Prefetches the values associated with filled entries in the given bucket. */
static void prefetchBucketValues(bucket *b, hashtable *ht) {
for (int pos = 0; pos < numBucketPositions(b); pos++) {
if (isPositionFilled(b, pos)) {
entryPrefetchValue(ht, b->entries[pos]);
}
}
}

/* --- API functions --- */

/* Allocates and initializes a new hashtable specified by the given type. */
Expand Down Expand Up @@ -1874,8 +1889,14 @@ void hashtableReleaseIterator(hashtableIterator *iterator) {
}

/* Points elemptr to the next entry and returns 1 if there is a next entry.
* Returns 0 if there are no more entries. */
int hashtableNext(hashtableIterator *iterator, void **elemptr) {
* Returns 0 if there are no more entries.
* The 'flags' argument can be used to tweak the behavior of the iterator:
* - HASHTABLE_ITER_PREFETCH_VALUES: Enables prefetching of entries values,
* which can improve performance in some scenarios. Because the hashtable is generic and
* doesn't care which object we store, the callback entryPrefetchValue helps
* us prefetch necessary fields of specific object types stored in the hashtable.
*/
int hashtableNext(hashtableIterator *iterator, void **elemptr, int flags) {
iter *iter = iteratorFromOpaque(iterator);
while (1) {
if (iter->index == -1 && iter->table == 0) {
Expand Down Expand Up @@ -1936,6 +1957,9 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
}
bucket *b = iter->bucket;
if (iter->pos_in_bucket == 0) {
if (b->presence && (flags & HASHTABLE_ITER_PREFETCH_VALUES)) {
prefetchBucketValues(b, iter->hashtable);
}
prefetchNextBucketEntries(iter, b);
}
if (!isPositionFilled(b, iter->pos_in_bucket)) {
Expand Down
7 changes: 6 additions & 1 deletion src/hashtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ typedef uint64_t hashtableIncrementalFindState[5];
* optional. With all callbacks omitted, the hashtable is effectively a set of
* pointer-sized integers. */
typedef struct {
/* Callback to prefetch the value associated with a hashtable entry. */
void (*entryPrefetchValue)(const void *entry);
/* If the type of an entry is not the same as the type of a key used for
* lookup, this callback needs to return the key within an entry. */
const void *(*entryGetKey)(const void *entry);
Expand Down Expand Up @@ -91,6 +93,9 @@ typedef void (*hashtableScanFunction)(void *privdata, void *entry);
/* Scan flags */
#define HASHTABLE_SCAN_EMIT_REF (1 << 0)

/* Iterator flags */
#define HASHTABLE_ITER_PREFETCH_VALUES (1 << 0) /* When set, prefetch entries inner data during iteration to reduce latency */

/* --- Prototypes --- */

/* Hash function (global seed) */
Expand Down Expand Up @@ -150,7 +155,7 @@ void hashtableResetIterator(hashtableIterator *iter);
hashtableIterator *hashtableCreateIterator(hashtable *ht);
hashtableIterator *hashtableCreateSafeIterator(hashtable *ht);
void hashtableReleaseIterator(hashtableIterator *iter);
int hashtableNext(hashtableIterator *iter, void **elemptr);
int hashtableNext(hashtableIterator *iter, void **elemptr, int flags);

/* Random entries */
int hashtableRandomEntry(hashtable *ht, void **found);
Expand Down
11 changes: 6 additions & 5 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -617,16 +617,17 @@ int kvstoreIteratorGetCurrentHashtableIndex(kvstoreIterator *kvs_it) {
return kvs_it->didx;
}

/* Fetches the next element and returns 1. Returns 0 if there are no more elements. */
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next) {
if (kvs_it->didx != -1 && hashtableNext(&kvs_it->di, next)) {
/* Fetches the next element and returns 1. Returns 0 if there are no more elements.
* The 'flags' argument can be used to tweak the behavior of the iterator. */
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next, int flags) {
if (kvs_it->didx != -1 && hashtableNext(&kvs_it->di, next, flags)) {
return 1;
} else {
/* No current hashtable or reached the end of the hash table. */
hashtable *ht = kvstoreIteratorNextHashtable(kvs_it);
if (!ht) return 0;
hashtableInitSafeIterator(&kvs_it->di, ht);
return hashtableNext(&kvs_it->di, next);
return hashtableNext(&kvs_it->di, next, flags);
}
}

Expand Down Expand Up @@ -724,7 +725,7 @@ int kvstoreHashtableIteratorNext(kvstoreHashtableIterator *kvs_di, void **next)
/* The hashtable may be deleted during the iteration process, so here need to check for NULL. */
hashtable *ht = kvstoreGetHashtable(kvs_di->kvs, kvs_di->didx);
if (!ht) return 0;
return hashtableNext(&kvs_di->di, next);
return hashtableNext(&kvs_di->di, next, 0);
}

int kvstoreHashtableRandomEntry(kvstore *kvs, int didx, void **entry) {
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ size_t kvstoreHashtableMetadataSize(void);
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs);
void kvstoreIteratorRelease(kvstoreIterator *kvs_it);
int kvstoreIteratorGetCurrentHashtableIndex(kvstoreIterator *kvs_it);
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next);
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next, int flags);

/* Rehashing */
void kvstoreTryResizeHashtables(kvstore *kvs, int limit);
Expand Down
4 changes: 2 additions & 2 deletions src/latency.c
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ void latencyAllCommandsFillCDF(client *c, hashtable *commands, int *command_with
hashtableIterator iter;
hashtableInitSafeIterator(&iter, commands);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (cmd->latency_histogram) {
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
Expand Down Expand Up @@ -567,7 +567,7 @@ void latencySpecificCommandsFillCDF(client *c) {
hashtableIterator iter;
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *sub = next;
if (sub->latency_histogram) {
addReplyBulkCBuffer(c, sub->fullname, sdslen(sub->fullname));
Expand Down
4 changes: 2 additions & 2 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -12163,7 +12163,7 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) {
hashtableIterator iter;
void *next;
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *sub = next;
if (moduleFreeCommand(module, sub) != C_OK) continue;

Expand All @@ -12186,7 +12186,7 @@ void moduleUnregisterCommands(struct ValkeyModule *module) {
hashtableIterator iter;
void *next;
hashtableInitSafeIterator(&iter, server.commands);
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
struct serverCommand *cmd = next;
if (moduleFreeCommand(module, cmd) != C_OK) continue;

Expand Down
8 changes: 4 additions & 4 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ void dismissSetObject(robj *o, size_t size_hint) {
hashtableIterator iter;
hashtableInitIterator(&iter, ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
sds item = next;
dismissSds(item);
}
Expand Down Expand Up @@ -684,7 +684,7 @@ void dismissHashObject(robj *o, size_t size_hint) {
hashtableIterator iter;
hashtableInitIterator(&iter, ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, 0)) {
dismissHashTypeEntry(next);
}
hashtableResetIterator(&iter);
Expand Down Expand Up @@ -1158,7 +1158,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
hashtableIterator iter;
hashtableInitIterator(&iter, ht);
void *next;
while (hashtableNext(&iter, &next) && samples < sample_size) {
while (hashtableNext(&iter, &next, 0) && samples < sample_size) {
sds element = next;
elesize += sdsAllocSize(element);
samples++;
Expand Down Expand Up @@ -1201,7 +1201,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
void *next;

asize = sizeof(*o) + hashtableMemUsage(ht);
while (hashtableNext(&iter, &next) && samples < sample_size) {
while (hashtableNext(&iter, &next, 0) && samples < sample_size) {
elesize += hashTypeEntryAllocSize(next);
samples++;
}
Expand Down
6 changes: 3 additions & 3 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
hashtableIterator iterator;
hashtableInitIterator(&iterator, set);
void *next;
while (hashtableNext(&iterator, &next)) {
while (hashtableNext(&iterator, &next, 0)) {
sds ele = next;
if ((n = rdbSaveRawString(rdb, (unsigned char *)ele, sdslen(ele))) == -1) {
hashtableResetIterator(&iterator);
Expand Down Expand Up @@ -961,7 +961,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
hashtableIterator iter;
hashtableInitIterator(&iter, ht);
void *next;
while (hashtableNext(&iter, &next)) {
while (hashtableNext(&iter, &next, HASHTABLE_ITER_PREFETCH_VALUES)) {
sds field = hashTypeEntryGetField(next);
sds value = hashTypeEntryGetValue(next);

Expand Down Expand Up @@ -1353,7 +1353,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
int last_slot = -1;
/* Iterate this DB writing every entry */
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
while (kvstoreIteratorNext(kvs_it, &next, HASHTABLE_ITER_PREFETCH_VALUES)) {
robj *o = next;
int curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it);
/* Save slot info. */
Expand Down
Loading
Loading