
quicklist

A quicklist is a doubly linked list whose nodes are ziplists. Ziplist operations may have to copy memory around, so a single long ziplist performs poorly. A quicklist therefore stores many small ziplists, and compresses the nodes other than those near the head and tail, keeping push and pop fast while reducing memory usage.

Structure

typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    unsigned char *zl;
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

typedef struct quicklistLZF {
    unsigned int sz; /* LZF size in bytes*/
    char compressed[];
} quicklistLZF;

typedef struct quicklist {
    quicklistNode *head;
    quicklistNode *tail;
    unsigned long count;        /* total count of all entries in all ziplists */
    unsigned long len;          /* number of quicklistNodes */
    int fill : 16;              /* fill factor for individual nodes */
    unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;

quicklistLZF: holds the LZF-compressed data of a node's ziplist.

quicklist:

  • fill: holds the list-max-ziplist-size parameter
    • a positive fill limits the number of entries per node
    • a negative fill limits the node size in bytes (-1: 4KB, -2: 8KB, -3: 16KB, -4: 32KB, -5: 64KB)
  • compress: holds the list-compress-depth parameter (how many nodes at each end of the list are left uncompressed; 0 disables compression)

push

When pushing, we first check whether this push would exceed the node's size limit (quicklist->fill). The checks are applied in order: the user-defined size limit (negative fill), the default size safety limit (SIZE_SAFETY_LIMIT, 8KB), and finally the user-defined entry count (non-negative fill).

  • If the limit is not exceeded, the entry is added at the head of head's ziplist.
  • If it is, a new ziplist is created and a new node is inserted before quicklist->head; finally the old quicklist->head node is compressed.

quicklistPushTail is analogous, except that the entry is appended at the tail of the ziplist and it is the old quicklist->tail node that gets compressed after the insert.
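
_quicklistNodeAllowInsert itself is not shown in this post; below is a rough Go sketch of the order of checks described above, assuming SIZE_SAFETY_LIMIT is 8192 bytes and the usual byte limits for negative fill values. It is an illustration of the logic, not the Redis implementation.

package main

import "fmt"

const sizeSafetyLimit = 8192 // SIZE_SAFETY_LIMIT in quicklist.c

// allowInsert mirrors the check order described above: user-defined size
// limit for negative fill, then the safety size limit, then the
// user-defined entry count for non-negative fill.
func allowInsert(nodeBytes, nodeCount, entryBytes, fill int) bool {
	newSz := nodeBytes + entryBytes // ignoring per-entry ziplist overhead

	// Negative fill limits the node size: -1=4KB, -2=8KB, ... -5=64KB.
	byteLimits := map[int]int{-1: 4096, -2: 8192, -3: 16384, -4: 32768, -5: 65536}
	if fill < 0 {
		if limit, ok := byteLimits[fill]; ok && newSz <= limit {
			return true
		}
	}
	// Default safety limit on the node size.
	if newSz > sizeSafetyLimit {
		return false
	}
	// Non-negative fill limits the number of entries per node.
	return nodeCount < fill
}

func main() {
	fmt.Println(allowInsert(3000, 10, 100, -1)) // true: still under 4KB
	fmt.Println(allowInsert(9000, 10, 100, 16)) // false: over the safety limit
	fmt.Println(allowInsert(100, 10, 100, 16))  // true: count below fill
}

The quicklistPushHead source follows.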

int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        quicklistNode *node = quicklistCreateNode();
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

        quicklistNodeUpdateSz(node);
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}

ziplist

Structure

A ziplist is a compact, specially encoded doubly linked list laid out in a single contiguous block of memory. Unless specified otherwise, all fields are stored in little-endian byte order.

ziplist:
<zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>

entry:
<prevlen> <encoding> <entry-data>

ziplist:

  • uint32_t zlbytes: total number of bytes the ziplist occupies, including the zlbytes field itself.
  • uint32_t zltail: offset of the last entry (not zlend), i.e. the number of bytes from the start of the ziplist to the tail entry; it effectively acts as a tail pointer.
  • uint16_t zllen: number of entries. If there are more than 2^16-2 entries, this is set to 2^16-1 and the real count can only be obtained by traversing the whole list. zlend is not counted as an entry.
  • uint8_t zlend: a special terminator byte marking the end of the ziplist, with value 0xFF.

Each entry is variable-sized:

  • prevlen: the length of the previous entry, used to compute the previous entry's start address when traversing from tail to head. There are two encodings, depending on whether that length is below 254 bytes (a small sketch of this encoding appears right after this field list).

        prev entry length   prevlen encoding                   size
        < 254               one byte holding the length        1 byte
        >= 254              0xFE followed by a 4-byte length   5 bytes

    When the length is 254 or more, the leading 0xFE byte marks the 5-byte form and the following 4 bytes hold the actual length.

    This scheme also keeps entries distinguishable from zlend (0xFF): no entry ever starts with the byte 0xFF.

  • encoding: records the type and length of entry-data.

    The first two bits indicate whether the value is a byte array or an integer.

        encoding                                       size     value type
        00pppppp                                       1 byte   byte array, length <= 63
        01pppppp qqqqqqqq                              2 bytes  byte array, length <= 16383 (14-bit length, big endian)
        10000000 qqqqqqqq rrrrrrrr ssssssss tttttttt   5 bytes  byte array, length >= 16384 (32-bit length in the next 4 bytes)
        11000000                                       1 byte   int16_t
        11010000                                       1 byte   int32_t
        11100000                                       1 byte   int64_t
        11110000                                       1 byte   24 bit signed integer
        11111110                                       1 byte   8 bit signed integer
        1111xxxx                                       1 byte   no entry-data; the 4 bits encode a value in [0, 12]
        11111111                                       1 byte   zlend
  • entry-data: the value itself, either a byte array or an integer.
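
To make the prevlen rule above concrete, here is a minimal Go sketch of the two encodings (an illustration only; Redis does this in zipStorePrevEntryLength):

package main

import (
	"encoding/binary"
	"fmt"
)

// encodePrevLen returns the prevlen bytes for a previous entry of the given
// length: one byte for lengths below 254, otherwise the 0xFE marker followed
// by a 4-byte little-endian length.
func encodePrevLen(prevLen uint32) []byte {
	if prevLen < 254 {
		return []byte{byte(prevLen)}
	}
	buf := make([]byte, 5)
	buf[0] = 0xFE
	binary.LittleEndian.PutUint32(buf[1:], prevLen)
	return buf
}

func main() {
	fmt.Printf("%x\n", encodePrevLen(100)) // 64
	fmt.Printf("%x\n", encodePrevLen(300)) // fe2c010000
}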

Insertion

The ziplist implementation relies heavily on macros, mostly for type casts and pointer arithmetic to reach the relevant fields.

Insertion has to handle three cases (the original post illustrates them with a figure).

We need to prepare the new entry's prevlen and encoding, and then allocate space for it.

  1. prevlen
    First check whether the insertion point p is at zlend.

    • If it is not, the prevlen stored at p is exactly the new entry's prevlen.

    • If it is, the prevlen cannot be read directly; check whether there is an entry before zlend. If there is, zipRawEntryLength(ptail) computes the length of the tail entry and that becomes the new entry's prevlen (otherwise it is 0).

      When p is not zlend, the new entry's length becomes the prevlen of the entry at p, so we must also make sure the prevlen field at p has enough room.


  2. encoding
    For encoding, first try to encode the value as an integer: zipTryEncoding(s,slen,&value,&encoding).

    • If it can be encoded as an integer, zipIntSize(encoding) gives the size of entry-data.
    • If it cannot, it is encoded as a byte array via zipStoreEntryEncoding(NULL,encoding,slen), and the entry-data size is slen.
  3. Allocating space
    Once the space needed by each field is known, memmove() shifts the existing data into place. Care is needed with the source address, p - nextdiff:

    • If nextdiff == 0, the prevlen field of the entry at p is already big enough to hold the new entry's length.

    • If nextdiff == 4, that field is too small: the prevlen at p currently takes 1 byte, so the move starts 4 bytes earlier, at p - 4, leaving 5 bytes for the new prevlen.


    • If nextdiff == -4, there are 4 spare bytes, so the move starts at p + 4.

unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789; /* initialized to avoid warning. Using a value
that is easy to see if for some reason
we use it uninitialized. */
zlentry tail;

/* Find out prevlen for the entry that is inserted.
 *
 * Check whether the first byte at p is ZIP_END (0xFF).
 *
 * If the insert position is at zlend, check whether there is an entry
 * before zlend: if there is none, the new entry's prevlen is 0;
 * if there is, compute the raw length of the entry at zl+zltail and
 * use it as the new entry's prevlen.
 */
if (p[0] != ZIP_END) {
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
} else {
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
if (ptail[0] != ZIP_END) {
prevlen = zipRawEntryLength(ptail);
}
}

/* See if the entry can be encoded */
if (zipTryEncoding(s,slen,&value,&encoding)) {
/* 'encoding' is set to the appropriate integer encoding */
reqlen = zipIntSize(encoding);
} else {
/* 'encoding' is untouched, however zipStoreEntryEncoding will use the
* string length to figure out how to encode it. */
reqlen = slen;
}
/* We need space for both the length of the previous entry and
* the length of the payload. */
reqlen += zipStorePrevEntryLength(NULL,prevlen);
reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

/* When the insert position is not equal to the tail, we need to
* make sure that the next entry can hold this entry's length in
* its prevlen field. */
int forcelarge = 0;
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}

/* Store offset because a realloc may change the address of zl. */
offset = p-zl;
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
p = zl+offset;

/* Apply memory move when necessary and update tail offset. */
if (p[0] != ZIP_END) {
/* Subtract one because of the ZIP_END bytes */
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

/* Encode this entry's raw length in the next entry. */
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);

/* Update offset for tail */
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

/* When the tail contains more than one entry, we need to take
* "nextdiff" in account as well. Otherwise, a change in the
* size of prevlen doesn't have an effect on the *tail* offset. */
zipEntry(p+reqlen, &tail);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
/* This element will be the new tail. */
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}

/* When nextdiff != 0, the raw length of the next entry has changed, so
* we need to cascade the update throughout the ziplist */
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}

/* Write the entry */
p += zipStorePrevEntryLength(p,prevlen);
p += zipStoreEntryEncoding(p,encoding,slen);
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}

Cascading updates

An insertion or deletion can change the prevlen that later entries need to store. In theory the field can both grow and shrink, but Redis deliberately ignores the shrinking case, so that a sequence of inserts does not cause repeated grow/shrink reallocations.

if (next.prevrawlensize > rawlensize) {
    /* This would result in shrinking, which we want to avoid.
     * So, set "rawlen" in the available bytes. */
    zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
} else {
    zipStorePrevEntryLength(p+rawlen,rawlen);
}

/* Stop here, as the raw length of "next" has not changed. */
break;

intset

Structure

intset stores a sorted set of integers.

typedef struct intset {
    uint32_t encoding;
    uint32_t length;
    int8_t contents[];
} intset;

encoding determines how contents is interpreted; its possible values are:

#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

Byte order

Unlike the other structures, intset takes byte order into account: Redis stores all intset fields in little-endian order, so that an intset remains compatible across CPUs with different endianness.

sds also has multi-byte fields; why doesn't sds do the same conversion?
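
A small sketch (not Redis code) of why fixing the byte order makes the stored bytes portable: writing a value through an explicit little-endian view yields the same byte sequence on every CPU, and reading it back with the same convention recovers the value.

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Store a 32-bit value in an explicit little-endian layout, as intset does.
	buf := make([]byte, 4)
	binary.LittleEndian.PutUint32(buf, 0x12345678)
	fmt.Printf("% x\n", buf) // 78 56 34 12 on every platform

	// Reading with the same convention gives the original value back.
	fmt.Printf("%#x\n", binary.LittleEndian.Uint32(buf))
}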

Lookup

Since contents[] is kept sorted, lookup is a straightforward binary search.

Before the binary search, a few special cases are handled to avoid needless searching:

  • If the array length is 0, return immediately.
  • If value is greater than the last element or smaller than the first, also return immediately, setting the insert position pos.

The binary search computes mid with mid = ((unsigned int)min + (unsigned int)max) >> 1 — could this overflow? After all, length is a uint32_t, so an intset could in theory hold $2^{32}$ elements. In practice Redis only keeps small sets in an intset; once a set grows too large it is stored with a different encoding.

static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;

/* The value can never be found when the set is empty */
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
/* Check for the case where we know we cannot find the value,
* but do know the insert position. */
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}

while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}

if (value == cur) {
if (pos) *pos = mid;
return 1;
} else {
if (pos) *pos = min;
return 0;
}
}

Insertion

There are two insertion cases:

  • the element can be inserted directly
  • the new element's type is wider than the set's current encoding, so the set must be upgraded first

Direct insertion

For a direct insertion, intsetSearch() first finds the insert position pos, and then all elements after pos are shifted.

if (intsetSearch(is,value,&pos)) {
    if (success) *success = 0;
    return is;
}

is = intsetResize(is,intrev32ifbe(is->length)+1);
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);

Moving the elements happens in two steps:

  • intsetResize() calls zrealloc() to allocate room for is->length + 1 elements, which may already involve one memory copy.

  • intsetMoveTail() then shifts the elements with memmove(), which always costs one memory copy. memmove() guarantees the copy is safe even when the source and destination ranges overlap.

    When computing the source and destination addresses, contents must first be cast to the actual element type before doing the pointer arithmetic.

static intset *intsetResize(intset *is, uint32_t len) {
    uint32_t size = len*intrev32ifbe(is->encoding);
    is = zrealloc(is,sizeof(intset)+size);
    return is;
}

static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
    void *src, *dst;
    uint32_t bytes = intrev32ifbe(is->length)-from;
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {
        src = (int64_t*)is->contents+from;
        dst = (int64_t*)is->contents+to;
        bytes *= sizeof(int64_t);
    } else if (encoding == INTSET_ENC_INT32) {
        src = (int32_t*)is->contents+from;
        dst = (int32_t*)is->contents+to;
        bytes *= sizeof(int32_t);
    } else {
        src = (int16_t*)is->contents+from;
        dst = (int16_t*)is->contents+to;
        bytes *= sizeof(int16_t);
    }
    memmove(dst,src,bytes);
}

Upgrading the encoding

When the value to insert requires a wider encoding than the set currently uses, it is guaranteed to end up either at the head of the array (if it is negative) or at the tail (if it is positive).

/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
int prepend = value < 0 ? 1 : 0;

/* First set new encoding and resize */
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);

/* Upgrade back-to-front so we don't overwrite values.
* Note that the "prepend" variable is used to make sure we have an empty
* space at either the beginning or the end of the intset. */
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

/* Set the value at the beginning or the end. */
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

After determining the new encoding and resizing, the existing elements are re-encoded one by one from back to front. Both the prepend and the append case must be handled; the only difference is that for prepend each element's new position is its current position + 1: _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc))

/* Set the value at pos, using the configured encoding. */
static void _intsetSet(intset *is, int pos, int64_t value) {
uint32_t encoding = intrev32ifbe(is->encoding);

if (encoding == INTSET_ENC_INT64) {
((int64_t*)is->contents)[pos] = value;
memrev64ifbe(((int64_t*)is->contents)+pos);
} else if (encoding == INTSET_ENC_INT32) {
((int32_t*)is->contents)[pos] = value;
memrev32ifbe(((int32_t*)is->contents)+pos);
} else {
((int16_t*)is->contents)[pos] = value;
memrev16ifbe(((int16_t*)is->contents)+pos);
}
}

Downgrading the encoding

Short of traversing the whole array, there is no way to know whether any element still needs the wider encoding, so intset never downgrades.

Deletion

/* Delete integer from intset */
intset *intsetRemove(intset *is, int64_t value, int *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 0;

if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
uint32_t len = intrev32ifbe(is->length);

/* We know we can delete */
if (success) *success = 1;

/* Overwrite value with tail and update length */
if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
is = intsetResize(is,len-1);
is->length = intrev32ifbe(len-1);
}
return is;
}

skiplist

Structure

skiplist offers $O(\log n)$ average-case lookup, insertion and deletion. zskiplistNode uses a flexible array member: each node stores its element as an sds, and the level[] array holds the forward pointer and span for each level. The maximum number of levels is ZSKIPLIST_MAXLEVEL = 64.

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

Insertion

To insert, first locate the position: starting from the top level, move forward as far as possible at each level, so that every node in [header, update[i]] satisfies (< score) or (== score and < ele). The new node goes right after update[i].

Two details worth mentioning, which Redis' skiplist also maintains:

  • span: the number of nodes skipped between two adjacent nodes at a level.

    • While searching for the insert position, the spans of the nodes crossed at each level are accumulated (into rank[i]).

    • On insertion, the old span of update[i] is split between update[i] and the new node x:

      x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
      update[i]->level[i].span = (rank[0] - rank[i]) + 1;
  • forward: the per-level pointer to the next node (backward holds the previous node).

    During insertion, if the new node's random level > zsl->level, the extra levels of update must be set to the header.
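
The random level mentioned above is drawn from a power-law distribution: each additional level is kept with probability ZSKIPLIST_P (0.25), capped at the maximum level. A minimal Go sketch of that idea (the Redis source uses random()&0xFFFF instead of a float comparison):

package main

import (
	"fmt"
	"math/rand"
)

const (
	maxLevel = 64   // ZSKIPLIST_MAXLEVEL in the version discussed here
	p        = 0.25 // ZSKIPLIST_P: probability of promoting a node one level
)

// randomLevel returns a level in [1, maxLevel]; level k is chosen with
// probability roughly p^(k-1) * (1-p), so higher levels get exponentially rarer.
func randomLevel() int {
	level := 1
	for rand.Float64() < p && level < maxLevel {
		level++
	}
	return level
}

func main() {
	counts := make(map[int]int)
	for i := 0; i < 100000; i++ {
		counts[randomLevel()]++
	}
	for l := 1; l <= 5; l++ {
		fmt.Printf("level %d: %d\n", l, counts[l])
	}
}

The full zslInsert follows.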

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}

Deletion

The node to delete is located with the same search procedure as insertion.

int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}

Deletion has to maintain span and forward:

  • If at level i update[i]->level[i].forward == x:

    • span

      add x's span and subtract 1.

    • forward

      point past the deleted node: update[i]->level[i].forward = x->level[i].forward

  • Otherwise x does not appear at level i, and update[i]->level[i].span simply shrinks by 1.

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}

Reference

  1. Redis内部数据结构详解(6)——skiplist

dict

Structure

dict resolves hash collisions by chaining. The table size is always $2^n$ and sizemask = size - 1, so the bucket a key belongs to is computed with a bit mask instead of a modulo; the power-of-two size also makes it easier to handle a rehash happening in the middle of a scan. Every dict holds two dictht; the extra one is used for rehashing.

Like list, dict stores keys and values generically and relies on callbacks such as hashFunction and keyDup to operate on them.
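
A quick illustration (a sketch, not Redis code) of why a power-of-two size lets the mask replace the modulo: for size = 2^n, hash & (size-1) and hash % size select the same bucket.

package main

import "fmt"

func main() {
	const size = 8 // dict sizes are always powers of two
	const sizemask = size - 1

	for _, hash := range []uint64{5, 13, 0xdeadbeef} {
		// The two expressions agree for every hash value.
		fmt.Println(hash&sizemask, hash%size, hash&sizemask == hash%size)
	}
}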

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

Adding an element

The flow of adding an element:

  1. dictAdd()

  2. dictAddRaw()

    • If a rehash is in progress, perform one rehash step: _dictRehashStep().

    • _dictKeyIndex() looks up the slot for the key.

    • If the key is not found, a new entry is created and inserted at the head of the bucket's chain (on the assumption that recently created entries are accessed more often). While rehashing, the new hash table d->ht[1] is used:

      ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
      entry = zmalloc(sizeof(*entry));
      entry->next = ht->table[index];
      ht->table[index] = entry;
      ht->used++;

      /* Set the hash entry fields. */
      dictSetKey(d, entry, key);
  3. _dictKeyIndex()

    • Check whether the hash table needs to expand (growing it, or initializing d->ht[0]): _dictExpandIfNeeded().

    • Compute the bucket with hash & d->ht[table].sizemask.

    • Look up the entry; while rehashing, both hash tables are searched:

      for (table = 0; table <= 1; table++) {
          idx = hash & d->ht[table].sizemask;
          /* Search if this slot does not already contain the given key */
          he = d->ht[table].table[idx];
          while(he) {
              if (key==he->key || dictCompareKeys(d, key, he->key)) {
                  if (existing) *existing = he;
                  return -1;
              }
              he = he->next;
          }
          if (!dictIsRehashing(d)) break;
      }

Expanding the hash table

Every add calls _dictExpandIfNeeded():

static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK;

/* If the hash table is empty expand it to the initial size. */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
  • If d->ht[0] is empty, dictExpand() initializes it with the initial size of 4:

    d->ht[0] = n;
  • If the load factor reaches 1, whether to expand also depends on whether the dict is currently allowed to resize; if it is not (for example while a child process exists, to avoid copy-on-write page duplication), the load factor threshold is raised to 5 (dict_force_resize_ratio) before dictExpand() allocates the new hash table:

    d->ht[1] = n;
    d->rehashidx = 0;

In dictExpand(), once a new hash table has been allocated, d->rehashidx = 0 is set to mark the start of rehashing.
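
dictExpand() rounds the requested size up to a power of two via _dictNextPower(); a minimal Go sketch of that rounding, starting from the initial table size of 4:

package main

import "fmt"

// nextPower returns the smallest power of two >= size, but never less than 4,
// mirroring what _dictNextPower does for dict table sizes.
func nextPower(size uint64) uint64 {
	i := uint64(4) // DICT_HT_INITIAL_SIZE
	for i < size {
		i *= 2
	}
	return i
}

func main() {
	for _, n := range []uint64{0, 5, 16, 100} {
		fmt.Println(n, "->", nextPower(n))
	}
}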

/* Expand or create the hash table */
int dictExpand(dict *d, unsigned long size)
{
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;

dictht n; /* the new hash table */
unsigned long realsize = _dictNextPower(size);

/* Rehashing to the same table size is not useful. */
if (realsize == d->ht[0].size) return DICT_ERR;

/* Allocate the new hash table and initialize all pointers to NULL */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;

/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}

/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}

Deleting a key

There are two flavors of deletion:

  • dictDelete(): actually frees the key, the value and the entry. The work is ultimately done by dictGenericDelete().

  • dictUnlink(): only unlinks the entry from its chain and returns it; it too goes through dictGenericDelete().
    It exists for the case where you want to remove the entry from the hash table but still use it afterwards; without it, two lookups would be needed:

    entry = dictFind(...);
    // Do something with entry
    dictDelete(dictionary,entry);
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
uint64_t h, idx;
dictEntry *he, *prevHe;
int table;

if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;

if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);

for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
prevHe = NULL;
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
/* Unlink the element from the list */
if (prevHe)
prevHe->next = he->next;
else
d->ht[table].table[idx] = he->next;
if (!nofree) {
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
d->ht[table].used--;
return he;
}
prevHe = he;
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return NULL; /* not found */
}

rehash

To avoid the performance hit of rehashing everything at once, Redis rehashes incrementally. dict.rehashidx is initialized to -1; during a rehash, [0, rehashidx) are the buckets of the old hash table that have already been migrated.

Rehash steps are driven in two ways:

  1. Every lookup or update performs one rehash step via dictRehash(). Usually this moves a single bucket, but it may move none at all, because each step examines at most 10 buckets before giving up:

    if (dictIsRehashing(d)) _dictRehashStep(d);

    dictRehash(d,1);
  2. If the server is mostly idle, the per-operation steps above make very slow progress and the dict holds on to both hash tables for a long time. During initialization Redis registers serverCron() as a timer callback.

    If server.activerehashing is enabled, each run spends 1 ms on dictRehash(), via the call chain:

    • serverCron()
    • databasesCron()
    • incrementallyRehash()
    • dictRehashMilliseconds(server.db[dbid].dict,1), with a 1 ms budget
    • dictRehash(d,100)

In int dictRehash(dict *d, int n), at most n buckets (100 here) are moved per call; along the way up to n*10 buckets are examined, because some buckets may be empty and have to be skipped over.

int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;

while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;

nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}

/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}

/* More to rehash... */
return 1;
}

scan

scan iterates over the elements of a dict: the caller starts with cursor = 0, each call returns a new cursor, and the iteration is complete when the cursor becomes 0 again.

For each bucket visited, fn is called on every element de; typically it copies de somewhere else, for example void scanCallback(void *privdata, const dictEntry *de).

scan has to consider whether a rehash is in progress:

  1. No rehash happens during the scan

    Since the hash table size is always $2^n$, a key's bucket index is always its hash masked by 2^n - 1, i.e. idx = hash & d->ht[table].sizemask.

    On each call to scan, after the elements of one bucket have been processed, v |= ~m0 sets every bit above the low n bits of the cursor (so until the iteration finishes, v == v | ~m0 holds), and then the reversed cursor is incremented. Once the last bucket has been visited, the low n bits are all 1s (2^n - 1); after v |= ~m0 and the increment, v becomes 0 and dictScan is not called again.

    At this point the reverse cursor has no obvious advantage: incrementing the high bits via rev(), or simply incrementing the low bits, would both walk the table as described above.

    t0 = &(d->ht[0]);
    m0 = t0->sizemask;

    /* Emit entries at cursor */
    if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
    de = t0->table[v & m0];
    while (de) {
    next = de->next;
    fn(privdata, de);
    de = next;
    }

    /* Set unmasked bits so incrementing the reversed cursor
    * operates on the masked bits */
    v |= ~m0;

    /* Increment the reverse cursor */
    v = rev(v);
    v++;
    v = rev(v);
  2. A rehash is in progress

    In this case scan covers both tables by working on the high-order bits of the cursor.

    Look again at how a bucket is computed: idx = hash & d->ht[table].sizemask. During a rehash, suppose the small table a has size 2^n and the large table b has size 2^m. A given key lands in buckets hash & d->ht[a].sizemask and hash & d->ht[b].sizemask; the hash is the same, so the low n bits of the two bucket indices are identical. This is the key property that guarantees every element is visited while rehashing.

    A scan step first visits the bucket v & m0 of t0 (the smaller table), then every bucket of t1 whose low n bits equal v & m0. The larger table has several such buckets, so the scan has to step through the high m - n bits; this is why the cursor is reversed with rev() before being incremented. (A small sketch of this reversed-cursor increment follows the code below.)

    In short, scanning while rehashing maps a bucket value xxxxx to the whole set of buckets aa...axxxxx.

    The loop over the larger table stops once all the high m - n bits have been covered: while (v & (m0 ^ m1)).

    t0 = &d->ht[0];
    t1 = &d->ht[1];

    /* Make sure t0 is the smaller and t1 is the bigger table */
    if (t0->size > t1->size) {
    t0 = &d->ht[1];
    t1 = &d->ht[0];
    }

    m0 = t0->sizemask;
    m1 = t1->sizemask;

    /* Emit entries at cursor */
    if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
    de = t0->table[v & m0];
    while (de) {
    next = de->next;
    fn(privdata, de);
    de = next;
    }

    /* Iterate over indices in larger table that are the expansion
    * of the index pointed to by the cursor in the smaller table */
    do {
    /* Emit entries at cursor */
    if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
    de = t1->table[v & m1];
    while (de) {
    next = de->next;
    fn(privdata, de);
    de = next;
    }

    /* Increment the reverse cursor not covered by the smaller mask.*/
    v |= ~m1;
    v = rev(v);
    v++;
    v = rev(v);

    /* Continue while bits covered by mask difference is non-zero */
    } while (v & (m0 ^ m1));
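
As referenced above, here is a small Go sketch of the reversed-cursor increment used by dictScan: keep only the masked bits, set every higher bit to 1, then add 1 to the bit-reversed cursor so the carry propagates through the high-order bits. This is an illustration of the idea; Redis implements rev() itself in dict.c.

package main

import (
	"fmt"
	"math/bits"
)

// nextCursor advances a dictScan-style cursor for a table with the given
// sizemask: v |= ^sizemask, then increment the bit-reversed cursor.
func nextCursor(v, sizemask uint64) uint64 {
	v |= ^sizemask
	v = bits.Reverse64(v)
	v++
	return bits.Reverse64(v)
}

func main() {
	const sizemask = 7 // a table of size 8
	v := uint64(0)
	for {
		fmt.Printf("visit bucket %03b\n", v&sizemask) // 000, 100, 010, 110, ...
		if v = nextCursor(v, sizemask); v == 0 {
			break
		}
	}
}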

list

Structure

Redis' list is a doubly linked list implementation that does not use dummy nodes.

typedef struct listNode {
    struct listNode *prev;
    struct listNode *next;
    void *value;
} listNode;

typedef struct listIter {
    listNode *next;
    int direction;
} listIter;

typedef struct list {
    listNode *head;
    listNode *tail;
    // duplicates a node's value
    void *(*dup)(void *ptr);
    // frees a node's value
    void (*free)(void *ptr);
    // compares a node's value against a key
    int (*match)(void *ptr, void *key);
    unsigned long len;
} list;

Properties

  • double-ended
  • not circular
  • head and tail pointers
  • a length counter
  • polymorphic (node values are void*, with free, dup and match callbacks to handle them)

sds

Structure

An sds header consists mainly of len, alloc and buf. buf is a flexible array member; when an sds is allocated, this struct is placed in front of the string data as its header.

typedef char *sds;

/* Note: sdshdr5 is never used, we just access the flags byte directly.
 * However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};

// Without __packed__, sizeof(struct sdshdr32) would typically be 12 because of
// alignment padding; packed keeps the header at 9 bytes.
struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};

Allocating an sds: initlen is the length of the string, and the extra 1 byte is for the trailing '\0':

sh = s_malloc(hdrlen+initlen+1);

sdsnewlen, sdslen, sdsavail

sdsnewlen returns the address of the first element of char buf[], and that address is what the sds pointer holds. This is the clever part of sds. But how do we get back to the header and access its members, given that those fields sit before the flexible array?

  • The byte at s - 1 is flags; from flags we know the header type and therefore the header size, so subtracting the header size from s gives the header address, and reading the len member gives the used length of buf.

    #define SDS_HDR(T,s) ((struct sdshdr##T *)((s)-(sizeof(struct sdshdr##T))))
  • sdshdr5 is a special type: both the length and the type are packed into flags. SDS_TYPE_5 stores strings shorter than 1 << 5 bytes, so the length fits in 5 bits; together with the 3 type bits that is exactly 8 bits, one flags byte:

    *fp = type | (initlen << SDS_TYPE_BITS);

    So the 3 least significant bits of flags hold the type and the 5 most significant bits hold the length. sdshdr5 has no way to record free space, however, so it cannot grow in place.

sdsgrowzero

  • If the new length is smaller than 1MB, the buffer size is doubled.
  • If the new length is 1MB or more, 1MB is added.
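
A minimal sketch of this preallocation rule (SDS_MAX_PREALLOC is 1MB in the sds source); it only computes the capacity sds would reserve:

package main

import "fmt"

const maxPrealloc = 1 << 20 // SDS_MAX_PREALLOC, 1MB

// grow returns the capacity reserved for a string that needs newLen bytes.
func grow(newLen int) int {
	if newLen < maxPrealloc {
		return newLen * 2
	}
	return newLen + maxPrealloc
}

func main() {
	fmt.Println(grow(100))   // 200
	fmt.Println(grow(2<<20)) // 2MB needed -> 3MB reserved
}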

sdstrim

Trimming does not release the trimmed space in buf.

sdsRemoveFreeSpace() is provided to release the unused space at the end of the buffer.

Differences between sds and C strings

  • string length is available in constant time
  • the recorded length prevents buffer overflows
  • space preallocation and lazy freeing reduce the number of reallocations
  • binary safe
  • compatible with a subset of the C string functions

Open questions

  1. The sds headers are declared packed, and unaligned fields can hurt CPU memory-access performance. How is that dealt with?

References

  1. Redis源码阅读计划
  2. Redis源码从哪里读起?
  3. Redis 深度历险:核心原理与应用实践

RWMutex: re-entrant RLock can deadlock

RWMutex is a reader/writer lock: it can be held by any number of readers simultaneously, or by a single writer.

A deadlock example

When using RWMutex, a reader should not call RLock again while it already holds the read lock; doing so is not only pointless but can also deadlock, as the following code shows:

func main() {
var l = sync.RWMutex{}
var wg sync.WaitGroup
wg.Add(2)

c := make(chan int)
go func() {
l.RLock()
defer l.RUnlock()
c <- 1
runtime.Gosched()

l.RLock()
defer l.RUnlock()
wg.Done()
}()

go func() {
<-c
l.Lock()
defer l.Unlock()
wg.Done()
}()

wg.Wait()
}

Analysis of sync.RWMutex

Using the relevant parts of the RWMutex implementation below, let's walk through how this code executes. For readability, the if race.Enabled {...} blocks have been removed.

  1. goroutine 1: l.RLock()

    func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.
    runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    }

    After l.RLock(), goroutine 1 holds the read lock.

    • State: holds the read lock
    • readerCount = 1, readerWait = 0
  2. goroutine 2: l.Lock()

    func (rw *RWMutex) Lock() {
    // First, resolve competition with other writers.
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    }

    Since goroutine 1 already holds the read lock, goroutine 2 has to wait.

    • State: waiting for the readers to release the read lock
    • readerCount = 1 - rwmutexMaxReaders, readerWait = 1
  3. goroutine 1: l.RLock() again

    func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.
    runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    }

    goroutine 1 sees that readerCount is negative, concludes that a writer is pending, and blocks as well.

    • State: waiting
    • readerCount = 2 - rwmutexMaxReaders, readerWait = 1

In the end both goroutine 1 and goroutine 2 are blocked waiting: a deadlock.

Summary

  1. What is readerCount for?
    The number of readers currently holding the read lock. When it is made negative, a writer is acquiring or has acquired the lock, and no new reader can take the read lock.

  2. What is readerWait for, and why does Lock() check both r != 0 and atomic.AddInt32(&rw.readerWait, r) != 0?
    readerWait is the number of readers that acquired the read lock before readerCount was made negative and have not yet called RUnlock. The writer must wait for these readers to finish.

    • If r == 0, no reader is holding the read lock, so the write lock can be acquired immediately.

    • If r != 0, the writer must wait for the readers that still hold the read lock to finish. It knows whether it has to wait by checking that readerWait is non-zero; symmetrically, a reader releasing the lock only wakes the writer once readerWait has dropped to 0.

      func (rw *RWMutex) Lock() {
      rw.w.Lock()
      r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
      // From this point on, no other reader can acquire the read lock.
      // readerWait readers still hold the read lock; the writer must wait
      // for them to finish before Lock() can complete.
      if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      runtime_SemacquireMutex(&rw.writerSem, false)
      }
      }

A sync.WaitGroup pitfall

Example

While implementing a feature I needed to wait for a number of goroutines to finish, but their number was hard to determine up front. sync.WaitGroup looked like the right tool, yet Wait() did not seem to work: it returned without waiting for the goroutines. The code looked roughly like this:

func main() {
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan struct{})

f := func(){
wg.Add(1)
defer wg.Done()
select {
case <- ch:
fmt.Println("hi")
}
}

go func() {
defer wg.Done()
go f()
go f()
go f()
}()
close(ch)
wg.Wait()
}

When run, the program exits without printing anything.

Analysis of sync.WaitGroup

Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for.

In the example above, main only accounts for the outer goroutine: the counter starts at 1, and the wg.Add(1) inside f races with wg.Wait(). As soon as the outer goroutine calls Done, the counter can reach 0 before any of the f goroutines has called Add, so Wait returns immediately and main exits, killing them. Each Add has to happen before the goroutine it accounts for is started, as in the corrected version below.

Summary

func main() {
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan struct{})

f := func(){
defer wg.Done()
select {
case <- ch:
fmt.Println("hi")
}
}

go func() {
defer wg.Done()
wg.Add(1)
go f()
wg.Add(1)
go f()
wg.Add(1)
go f()
}()
close(ch)
wg.Wait()
}

References

  1. Waiting on an indeterminate number of goroutines

goroutine

func process(ctx context.Context, wg *sync.WaitGroup, retCh chan int) {
defer wg.Done()
for {
time.Sleep(time.Second*1)
// process
// send process result back
select {
case <-ctx.Done():
retCh <- 1
retCh <- 2
return
default:
retCh <- 1
retCh <- 2
}
}
}

func loop(ctx context.Context, wg *sync.WaitGroup, retCh chan int) {
defer wg.Done()
wg.Add(1)
go process(ctx, wg, retCh)
for {
select {
case <-ctx.Done():
return
case ret := <-retCh:
fmt.Println(ret)

}
}
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
retCh := make(chan int, 1)
wg.Add(1)
go loop(ctx, wg, retCh)

time.Sleep(time.Second*5)
cancel()
wg.Wait()
}