redis dict
结构
dict使用链地址法解决冲突,size
为\(2^n\),sizemask = size - 1
,用于计算key所属的bucket上,避免了mod,还便于处理scan时发生rehash的情况。每个dict有两个dictht,多个出来的一个用于rehash。
使用了类似list中的方式来保存key和value,并提供了hashFunction
,keyDup
等函数操作key和value。
1 | typedef struct dictEntry { |
添加元素
添加元素的过程, 1. dictAdd()
2.
dictAddRaw()
* 如果需要rehash,`_dictRehashStep()`。
* `_dictKeyIndex()`查找entry。
* 如果找不到,创建新的entry,并插入链表的头部(这里假设了最近创建的会被更频繁的使用)。当rehash时,选择新的hash table,`d->ht[1]`进行操作。
1
2
3
4
5
6
7
8
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
_dictKeyIndex()
- 检查是否需要扩张hash
table(扩张或为
d->ht[0]
进行初始化),_dictExpandIfNeeded()
。 - 使用
hash & d->ht[table].sizemask
计算所属bucket。 - 查找entry。在rehash的时候,会在两个hash table中进行查找。
1
2
3
4
5
6
7
8
9
10
11
12
13for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break;
}- 检查是否需要扩张hash
table(扩张或为
扩张hash table
每次add都会调用_dictExpandIfNeeded()
。
1 | static int _dictExpandIfNeeded(dict *d) |
如果
d->ht[0]
为空,那么dictExpand()
中为d->ht[0]
初始化,初始大小为4。1
d->ht[0] = n;
如果load factor达到了1,还需要看dict当前是否可以resize,如果不可以,那么会提高load factor到5(有子进程时,为了避免COW内存复制),然后才会在
dictExpand()
中分配新的hash table。1
2d->ht[1] = n;
d->rehashidx = 0;
dictExpand()
中,如果分配了新的hash table,那么会set
d->rehashidx = 0;
,标识开始rehash。
1 | /* Expand or create the hash table */ |
删除key
删除分为两种, *
dictDelete()
:会把key,value和entry都真正的进行删除。最终是调用dictGenericDelete()
进行删除。
*
dictUnlink()
:只会把entry从链表中移除,并返回entry。最终是调用dictGenericDelete()
进行删除。
这个函数主要用于,想把entry从hash
table中移除,但同时又想使用这个entry的情况。如果没有这个函数,则需要进行两次查找。
1
2
3
entry = dictFind(...);
// Do something with entry
dictDelete(dictionary,entry);
1 | static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) { |
rehash
为了避免集中的rehash操作对性能的影响,redis使用的是渐进式rehash。dict.rehashidx
初始化为-1,在rehash时,[0, rehashidnex)
代表老的hash
table已经迁移的bucket。
rehash会两种情况下发生, 1.
每次查询或更新操作时,都会调用dictRehash()
执行一步rehash,一般来说这个函数只会移动一个bucket,也有可能一个都不移动,因为rehash时,每次检查至多10个bucket。
1
2
3
if (dictIsRehashing(d)) _dictRehashStep(d);
dictRehash(d,1);
如果server比较空闲,上述rehash的过程就会很慢,dict将会占用两个hash table较长的时间。在初始化redis时,
serverCron()
会作为计时器的回调函数定时执行。如果启用了
server.activerehashing
,执行过程中,会分配1ms的时间来执行dictRehash()
。serverCron()
databasesCron()
incrementallyRehash()
dictRehashMilliseconds(server.db[dbid].dict,1)
,1msdictRehash(d,100)
int dictRehash(dict *d, int n)
中,最多移动100个bucket,在移动每个bucket时,至多会检查n*10
个bucket,因为有的bucket可能是空的,因此需要多看几个。
1 | int dictRehash(dict *d, int n) { |
scan
scan可以用于遍历dict中的元素,初始提供一个cursor = 0
,每次迭代返回一个新的cursor
,当cursor
再次为0的时候,遍历结束。
遍历每个bucket时,调用fn
对每个元素de
进行处理,这个处理可能是,复制de
到其他地方,例如:void scanCallback(void *privdata, const dictEntry *de)
。
scan需要考虑是否正在rehash的情况, 1. scan过程中,dict没有变化
由于hash table的大小总是$2^n$,因此每个bucket的index总是`key & 2^n - 1`后的一个值,即`idx = hash & d->ht[table].sizemask`。
每次调用`scan`时,处理完毕一个bucket中的元素后,`v |= ~m0`保留cursor的低n个bit(在遍历结束前,`v == v | ~m0`),并加1。如果遍历结束,那么cursor值为`2^n - 1`,加1,并`v |= ~m0`后,`v`为0。则停止调用`dictScan`。
这里还看不出`reverse cursor`的特殊作用。无论高位加1并`rev()`,还是低位加1,都可以完成上述过程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
t0 = &(d->ht[0]);
m0 = t0->sizemask;
/* Emit entries at cursor */
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}
/* Set unmasked bits so incrementing the reversed cursor
* operates on the masked bits */
v |= ~m0;
/* Increment the reverse cursor */
v = rev(v);
v++;
v = rev(v);
如果正在rehash 如果正在rehash,scan是通过高位掩码的方式来完成扫描的。
先来看bucket的计算方式,
idx = hash & d->ht[table].sizemask
。如果发生rehash,假设小表a大小为2^n
,大表b为2^m
。某个key,在两个表中的bucket分别为hash & d->ht[a].sizemask
,hash & d->ht[b].sizemask
。hash是一致的,这就意味着,两个bucket的低n位也是一致的。这是rehash时能保证元素一定会被扫描到的关键。scan时,t0(小表)索引为
v & m0
的bucket,然后扫描t1中索引低n
为v & m0
的bucket。由于大表中低n
为v & m0
的bucket是多个,因此需要在高m - n
位不断递增。由于需要在高位不断递增,因此需要先rev()
,然后再加1。总结来说,rehash时的扫描过程,其实就是通过一个函数,将一个bucket值
xxxxx
,映射到集合aa...axxxxx
的过程。scan大表时的停止条件就是高
m - n
位都被扫描一遍,v & (m0 ^ m1)
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41t0 = &d->ht[0];
t1 = &d->ht[1];
/* Make sure t0 is the smaller and t1 is the bigger table */
if (t0->size > t1->size) {
t0 = &d->ht[1];
t1 = &d->ht[0];
}
m0 = t0->sizemask;
m1 = t1->sizemask;
/* Emit entries at cursor */
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}
/* Iterate over indices in larger table that are the expansion
* of the index pointed to by the cursor in the smaller table */
do {
/* Emit entries at cursor */
if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
de = t1->table[v & m1];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}
/* Increment the reverse cursor not covered by the smaller mask.*/
v |= ~m1;
v = rev(v);
v++;
v = rev(v);
/* Continue while bits covered by mask difference is non-zero */
} while (v & (m0 ^ m1));