redis dict
结构
dict使用链地址法解决冲突,size为$2^n$,sizemask = size - 1,用于计算key所属的bucket上,避免了mod,还便于处理scan时发生rehash的情况。每个dict有两个dictht,多个出来的一个用于rehash。
使用了类似list中的方式来保存key和value,并提供了hashFunction,keyDup等函数操作key和value。
| |
添加元素
添加元素的过程,
dictAdd()dictAddRaw()- 如果需要rehash,
_dictRehashStep()。 _dictKeyIndex()查找entry。- 如果找不到,创建新的entry,并插入链表的头部(这里假设了最近创建的会被更频繁的使用)。当rehash时,选择新的hash table,
d->ht[1]进行操作。
1 2 3 4 5 6 7 8ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; entry = zmalloc(sizeof(*entry)); entry->next = ht->table[index]; ht->table[index] = entry; ht->used++; /* Set the hash entry fields. */ dictSetKey(d, entry, key);- 如果需要rehash,
_dictKeyIndex()- 检查是否需要扩张hash table(扩张或为
d->ht[0]进行初始化),_dictExpandIfNeeded()。 - 使用
hash & d->ht[table].sizemask计算所属bucket。 - 查找entry。在rehash的时候,会在两个hash table中进行查找。
1 2 3 4 5 6 7 8 9 10 11 12 13for (table = 0; table <= 1; table++) { idx = hash & d->ht[table].sizemask; /* Search if this slot does not already contain the given key */ he = d->ht[table].table[idx]; while(he) { if (key==he->key || dictCompareKeys(d, key, he->key)) { if (existing) *existing = he; return -1; } he = he->next; } if (!dictIsRehashing(d)) break; }- 检查是否需要扩张hash table(扩张或为
扩张hash table
每次add都会调用_dictExpandIfNeeded()。
| |
如果
d->ht[0]为空,那么dictExpand()中为d->ht[0]初始化,初始大小为4。1d->ht[0] = n;如果load factor达到了1,还需要看dict当前是否可以resize,如果不可以,那么会提高load factor到5(有子进程时,为了避免COW内存复制),然后才会在
dictExpand()中分配新的hash table。1 2d->ht[1] = n; d->rehashidx = 0;
dictExpand()中,如果分配了新的hash table,那么会set d->rehashidx = 0;,标识开始rehash。
| |
删除key
删除分为两种,
dictDelete():会把key,value和entry都真正的进行删除。最终是调用dictGenericDelete()进行删除。dictUnlink():只会把entry从链表中移除,并返回entry。最终是调用dictGenericDelete()进行删除。 这个函数主要用于,想把entry从hash table中移除,但同时又想使用这个entry的情况。如果没有这个函数,则需要进行两次查找。1 2 3entry = dictFind(...); // Do something with entry dictDelete(dictionary,entry);
| |
rehash
为了避免集中的rehash操作对性能的影响,redis使用的是渐进式rehash。dict.rehashidx初始化为-1,在rehash时,[0, rehashidnex)代表老的hash table已经迁移的bucket。
rehash会两种情况下发生,
每次查询或更新操作时,都会调用
dictRehash()执行一步rehash,一般来说这个函数只会移动一个bucket,也有可能一个都不移动,因为rehash时,每次检查至多10个bucket。1 2 3if (dictIsRehashing(d)) _dictRehashStep(d); dictRehash(d,1);如果server比较空闲,上述rehash的过程就会很慢,dict将会占用两个hash table较长的时间。在初始化redis时,
serverCron()会作为计时器的回调函数定时执行。如果启用了
server.activerehashing,执行过程中,会分配1ms的时间来执行dictRehash()。serverCron()databasesCron()incrementallyRehash()dictRehashMilliseconds(server.db[dbid].dict,1),1msdictRehash(d,100)
int dictRehash(dict *d, int n)中,最多移动100个bucket,在移动每个bucket时,至多会检查n*10个bucket,因为有的bucket可能是空的,因此需要多看几个。
| |
scan
scan可以用于遍历dict中的元素,初始提供一个cursor = 0,每次迭代返回一个新的cursor,当cursor再次为0的时候,遍历结束。
遍历每个bucket时,调用fn对每个元素de进行处理,这个处理可能是,复制de到其他地方,例如:void scanCallback(void *privdata, const dictEntry *de)。
scan需要考虑是否正在rehash的情况,
scan过程中,dict没有变化
由于hash table的大小总是$2^n$,因此每个bucket的index总是
key & 2^n - 1后的一个值,即idx = hash & d->ht[table].sizemask。每次调用
scan时,处理完毕一个bucket中的元素后,v |= ~m0保留cursor的低n个bit(在遍历结束前,v == v | ~m0),并加1。如果遍历结束,那么cursor值为2^n - 1,加1,并v |= ~m0后,v为0。则停止调用dictScan。这里还看不出
reverse cursor的特殊作用。无论高位加1并rev(),还是低位加1,都可以完成上述过程。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20t0 = &(d->ht[0]); m0 = t0->sizemask; /* Emit entries at cursor */ if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; while (de) { next = de->next; fn(privdata, de); de = next; } /* Set unmasked bits so incrementing the reversed cursor * operates on the masked bits */ v |= ~m0; /* Increment the reverse cursor */ v = rev(v); v++; v = rev(v);如果正在rehash 如果正在rehash,scan是通过高位掩码的方式来完成扫描的。
先来看bucket的计算方式,
idx = hash & d->ht[table].sizemask。如果发生rehash,假设小表a大小为2^n,大表b为2^m。某个key,在两个表中的bucket分别为hash & d->ht[a].sizemask,hash & d->ht[b].sizemask。hash是一致的,这就意味着,两个bucket的低n位也是一致的。这是rehash时能保证元素一定会被扫描到的关键。scan时,t0(小表)索引为
v & m0的bucket,然后扫描t1中索引低n为v & m0的bucket。由于大表中低n为v & m0的bucket是多个,因此需要在高m - n位不断递增。由于需要在高位不断递增,因此需要先rev(),然后再加1。总结来说,rehash时的扫描过程,其实就是通过一个函数,将一个bucket值
xxxxx,映射到集合aa...axxxxx的过程。scan大表时的停止条件就是高
m - n位都被扫描一遍,v & (m0 ^ m1)。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41t0 = &d->ht[0]; t1 = &d->ht[1]; /* Make sure t0 is the smaller and t1 is the bigger table */ if (t0->size > t1->size) { t0 = &d->ht[1]; t1 = &d->ht[0]; } m0 = t0->sizemask; m1 = t1->sizemask; /* Emit entries at cursor */ if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; while (de) { next = de->next; fn(privdata, de); de = next; } /* Iterate over indices in larger table that are the expansion * of the index pointed to by the cursor in the smaller table */ do { /* Emit entries at cursor */ if (bucketfn) bucketfn(privdata, &t1->table[v & m1]); de = t1->table[v & m1]; while (de) { next = de->next; fn(privdata, de); de = next; } /* Increment the reverse cursor not covered by the smaller mask.*/ v |= ~m1; v = rev(v); v++; v = rev(v); /* Continue while bits covered by mask difference is non-zero */ } while (v & (m0 ^ m1));