Skip to content

Commit

Permalink
Replace dict with new hashtable: hash datatype
Browse files Browse the repository at this point in the history
Signed-off-by: Rain Valentine <[email protected]>
  • Loading branch information
SoftlyRaining committed Jan 4, 2025
1 parent 6644ac5 commit e5b82a9
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 283 deletions.
2 changes: 1 addition & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1935,7 +1935,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
return rioWriteBulkString(r, (char *)vstr, vlen);
else
return rioWriteBulkLongLong(r, vll);
} else if (hi->encoding == OBJ_ENCODING_HT) {
} else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
sds value = hashTypeCurrentFromHashTable(hi, what);
return rioWriteBulkString(r, value, sdslen(value));
}
Expand Down
69 changes: 19 additions & 50 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -979,39 +979,6 @@ void keysScanCallback(void *privdata, void *entry) {

/* This callback is used by scanGenericCommand in order to collect elements
* returned by the dictionary iterator into a list. */
void dictScanCallback(void *privdata, const dictEntry *de) {
scanData *data = (scanData *)privdata;
list *keys = data->keys;
robj *o = data->o;
sds val = NULL;
sds key = NULL;
data->sampled++;

/* This callback is only used for scanning elements within a key (hash
* fields, set elements, etc.) so o must be set here. */
serverAssert(o != NULL);

/* Filter element if it does not match the pattern. */
sds keysds = dictGetKey(de);
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), keysds, sdslen(keysds), 0)) {
return;
}
}

if (o->type == OBJ_HASH) {
key = keysds;
if (!data->only_keys) {
val = dictGetVal(de);
}
} else {
serverPanic("Type not handled in dict SCAN callback.");
}

listAddNodeTail(keys, key);
if (val) listAddNodeTail(keys, val);
}

void hashtableScanCallback(void *privdata, void *entry) {
scanData *data = (scanData *)privdata;
sds val = NULL;
Expand All @@ -1025,14 +992,21 @@ void hashtableScanCallback(void *privdata, void *entry) {
* fields, set elements, etc.) so o must be set here. */
serverAssert(o != NULL);

/* get key */
/* get key, value */
if (o->type == OBJ_SET) {
key = (sds)entry;
} else if (o->type == OBJ_ZSET) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
/* zset data is copied after filtering by key */
} else if (o->type == OBJ_HASH) {
key = (sds)hashTypeEntryGetKey(entry);
if (!data->only_keys) {
hashTypeEntry *hash_entry = entry;
val = hash_entry->value;
}
} else {
serverPanic("Type not handled in hashset SCAN callback.");
serverPanic("Type not handled in hashtable SCAN callback.");
}

/* Filter element if it does not match the pattern. */
Expand All @@ -1042,9 +1016,9 @@ void hashtableScanCallback(void *privdata, void *entry) {
}
}

if (o->type == OBJ_SET) {
/* no value, key used by reference */
} else if (o->type == OBJ_ZSET) {
/* zset data must be copied. Do this after filtering to avoid unneeded
* allocations. */
if (o->type == OBJ_ZSET) {
/* zset data is copied */
zskiplistNode *node = (zskiplistNode *)entry;
key = sdsdup(node->ele);
Expand All @@ -1053,8 +1027,6 @@ void hashtableScanCallback(void *privdata, void *entry) {
int len = ld2string(buf, sizeof(buf), node->score, LD_STR_AUTO);
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in hashset SCAN callback.");
}

listAddNodeTail(keys, key);
Expand Down Expand Up @@ -1193,20 +1165,19 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* cursor to zero to signal the end of the iteration. */

/* Handle the case of kvstore, dict or hashtable. */
dict *dict_table = NULL;
hashtable *hashtable_table = NULL;
hashtable *ht = NULL;
int shallow_copied_list_items = 0;
if (o == NULL) {
shallow_copied_list_items = 1;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE) {
hashtable_table = o->ptr;
ht = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
dict_table = o->ptr;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HASHTABLE) {
ht = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
hashtable_table = zs->ht;
ht = zs->ht;
/* scanning ZSET allocates temporary strings even though it's a dict */
shallow_copied_list_items = 0;
}
Expand All @@ -1220,7 +1191,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
}

/* For main hash table scan or scannable data structure. */
if (!o || dict_table || hashtable_table) {
if (!o || ht) {
/* We set the max number of iterations to ten times the specified
* COUNT, so if the hash table is in a pathological state (very
* sparsely populated) we avoid to block too much time at the cost
Expand Down Expand Up @@ -1260,10 +1231,8 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, keysScanCallback, NULL, &data);
} else if (dict_table) {
cursor = dictScan(dict_table, cursor, dictScanCallback, &data);
} else {
cursor = hashtableScan(hashtable_table, cursor, hashtableScanCallback, &data);
cursor = hashtableScan(ht, cursor, hashtableScanCallback, &data);
}
} while (cursor && maxiterations-- && data.sampled < count);
} else if (o->type == OBJ_SET) {
Expand Down
10 changes: 2 additions & 8 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,23 +923,17 @@ void debugCommand(client *c) {
robj *o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr);
if (o == NULL) return;

/* Get the dict reference from the object, if possible. */
dict *d = NULL;
/* Get the hashtable reference from the object, if possible. */
hashtable *ht = NULL;
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: {
zset *zs = o->ptr;
ht = zs->ht;
} break;
case OBJ_ENCODING_HT: d = o->ptr; break;
case OBJ_ENCODING_HASHTABLE: ht = o->ptr; break;
}

if (d != NULL) {
char buf[4096];
dictGetStats(buf, sizeof(buf), d, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
} else if (ht != NULL) {
if (ht != NULL) {
char buf[4096];
hashtableGetStats(buf, sizeof(buf), ht, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
Expand Down
67 changes: 35 additions & 32 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,6 @@ void activeDefragSdsHashtableCallback(void *privdata, void *entry_ref) {
if (new_sds != NULL) *sds_ref = new_sds;
}

void activeDefragSdsHashtable(hashtable *ht) {
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}

/* Defrag a list of ptr, sds or robj string values */
static void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref;
Expand Down Expand Up @@ -481,26 +474,29 @@ static void scanHashtableCallbackCountScanned(void *privdata, void *elemref) {
server.stat_active_defrag_scanned++;
}

/* Used as dict scan callback when all the work is done in the dictDefragFunctions. */
static void scanCallbackCountScanned(void *privdata, const dictEntry *de) {
UNUSED(privdata);
UNUSED(de);
server.stat_active_defrag_scanned++;
}

static void scanLaterSet(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HASHTABLE) return;
hashtable *ht = ob->ptr;
*cursor = hashtableScanDefrag(ht, *cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

static void activeDefragHashField(void *privdata, void *element_ref) {
UNUSED(privdata);
hashTypeEntry **entry_ref = (hashTypeEntry **)element_ref;

/* defragment field */
hashTypeEntry *new_entry = activeDefragAlloc(*entry_ref);
if (new_entry) *entry_ref = new_entry;

/* defragment value */
sds new_value = activeDefragSds((*entry_ref)->value);
if (new_value) (*entry_ref)->value = new_value;
}

static void scanLaterHash(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT) return;
dict *d = ob->ptr;
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds,
.defragVal = (dictDefragAllocFunction *)activeDefragSds};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HASHTABLE) return;
hashtable *ht = ob->ptr;
*cursor = hashtableScanDefrag(ht, *cursor, activeDefragHashField, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

static void defragQuicklist(robj *ob) {
Expand Down Expand Up @@ -538,15 +534,19 @@ static void defragZsetSkiplist(robj *ob) {
}

static void defragHash(robj *ob) {
dict *d, *newd;
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HASHTABLE);
hashtable *ht = ob->ptr;
if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
defragLater(ob);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd;
} else {
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragHashField, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the hashtable struct and tables */
hashtable *new_hashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (new_hashtable) ob->ptr = new_hashtable;
}

static void defragSet(robj *ob) {
Expand All @@ -555,11 +555,14 @@ static void defragSet(robj *ob) {
if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
defragLater(ob);
} else {
activeDefragSdsHashtable(ht);
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the hashtable struct and tables */
hashtable *newHashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (newHashtable) ob->ptr = newHashtable;
hashtable *new_hashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (new_hashtable) ob->ptr = new_hashtable;
}

/* Defrag callback for radix tree iterator, called for each node,
Expand Down Expand Up @@ -776,7 +779,7 @@ static void defragKey(defragKeysCtx *ctx, robj **elemref) {
} else if (ob->type == OBJ_HASH) {
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_HT) {
} else if (ob->encoding == OBJ_ENCODING_HASHTABLE) {
defragHash(ob);
} else {
serverPanic("Unknown hash encoding");
Expand Down
6 changes: 3 additions & 3 deletions src/lazyfree.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
} else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = obj->ptr;
return zs->zsl->length;
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HASHTABLE) {
hashtable *ht = obj->ptr;
return hashtableSize(ht);
} else if (obj->type == OBJ_STREAM) {
size_t effort = 0;
stream *s = obj->ptr;
Expand Down
36 changes: 7 additions & 29 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -11074,25 +11074,6 @@ typedef struct {
ValkeyModuleScanKeyCB fn;
} ScanKeyCBData;

static void moduleScanKeyDictCallback(void *privdata, const dictEntry *de) {
ScanKeyCBData *data = privdata;
sds key = dictGetKey(de);
robj *o = data->key->value;
robj *field = createStringObject(key, sdslen(key));
robj *value = NULL;

if (o->type == OBJ_HASH) {
sds val = dictGetVal(de);
value = createStringObject(val, sdslen(val));
} else {
serverPanic("unexpected object type");
}

data->fn(data->key, field, value, data->user_data);
decrRefCount(field);
if (value) decrRefCount(value);
}

static void moduleScanKeyHashtableCallback(void *privdata, void *entry) {
ScanKeyCBData *data = privdata;
robj *o = data->key->value;
Expand All @@ -11106,6 +11087,11 @@ static void moduleScanKeyHashtableCallback(void *privdata, void *entry) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
value = createStringObjectFromLongDouble(node->score, 0);
} else if (o->type == OBJ_HASH) {
key = (sds)hashTypeEntryGetKey(entry);
hashTypeEntry *hash_entry = entry;
sds val = hash_entry->value;
value = createStringObject(val, sdslen(val));
} else {
serverPanic("unexpected object type");
}
Expand Down Expand Up @@ -11169,13 +11155,12 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul
errno = EINVAL;
return 0;
}
dict *d = NULL;
hashtable *ht = NULL;
robj *o = key->value;
if (o->type == OBJ_SET) {
if (o->encoding == OBJ_ENCODING_HASHTABLE) ht = o->ptr;
} else if (o->type == OBJ_HASH) {
if (o->encoding == OBJ_ENCODING_HT) d = o->ptr;
if (o->encoding == OBJ_ENCODING_HASHTABLE) ht = o->ptr;
} else if (o->type == OBJ_ZSET) {
if (o->encoding == OBJ_ENCODING_SKIPLIST) ht = ((zset *)o->ptr)->ht;
} else {
Expand All @@ -11187,14 +11172,7 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul
return 0;
}
int ret = 1;
if (d) {
ScanKeyCBData data = {key, privdata, fn};
cursor->cursor = dictScan(d, cursor->cursor, moduleScanKeyDictCallback, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;
}
} else if (ht) {
if (ht) {
ScanKeyCBData data = {key, privdata, fn};
cursor->cursor = hashtableScan(ht, cursor->cursor, moduleScanKeyHashtableCallback, &data);
if (cursor->cursor == 0) {
Expand Down
Loading

0 comments on commit e5b82a9

Please sign in to comment.