0%

结构

dict使用链地址法解决冲突,size\(2^n\)sizemask = size - 1,用于计算key所属的bucket上,避免了mod,还便于处理scan时发生rehash的情况。每个dict有两个dictht,多个出来的一个用于rehash。

使用了类似list中的方式来保存key和value,并提供了hashFunctionkeyDup等函数操作key和value。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;

/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;

typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;

添加元素

添加元素的过程, 1. dictAdd() 2. dictAddRaw()

* 如果需要rehash,`_dictRehashStep()`。
* `_dictKeyIndex()`查找entry。
* 如果找不到,创建新的entry,并插入链表的头部(这里假设了最近创建的会被更频繁的使用)。当rehash时,选择新的hash table,`d->ht[1]`进行操作。

1
2
3
4
5
6
7
8
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;

/* Set the hash entry fields. */
dictSetKey(d, entry, key);
  1. _dictKeyIndex()

    • 检查是否需要扩张hash table(扩张或为d->ht[0]进行初始化),_dictExpandIfNeeded()
    • 使用hash & d->ht[table].sizemask计算所属bucket。
    • 查找entry。在rehash的时候,会在两个hash table中进行查找。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    for (table = 0; table <= 1; table++) {
    idx = hash & d->ht[table].sizemask;
    /* Search if this slot does not already contain the given key */
    he = d->ht[table].table[idx];
    while(he) {
    if (key==he->key || dictCompareKeys(d, key, he->key)) {
    if (existing) *existing = he;
    return -1;
    }
    he = he->next;
    }
    if (!dictIsRehashing(d)) break;
    }

扩张hash table

每次add都会调用_dictExpandIfNeeded()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK;

/* If the hash table is empty expand it to the initial size. */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
  • 如果d->ht[0]为空,那么dictExpand()中为d->ht[0]初始化,初始大小为4。

    1
    d->ht[0] = n;

  • 如果load factor达到了1,还需要看dict当前是否可以resize,如果不可以,那么会提高load factor到5(有子进程时,为了避免COW内存复制),然后才会在dictExpand()中分配新的hash table。

    1
    2
    d->ht[1] = n;
    d->rehashidx = 0;

dictExpand()中,如果分配了新的hash table,那么会set d->rehashidx = 0;,标识开始rehash。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* Expand or create the hash table */
int dictExpand(dict *d, unsigned long size)
{
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;

dictht n; /* the new hash table */
unsigned long realsize = _dictNextPower(size);

/* Rehashing to the same table size is not useful. */
if (realsize == d->ht[0].size) return DICT_ERR;

/* Allocate the new hash table and initialize all pointers to NULL */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;

/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}

/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}

删除key

删除分为两种, * dictDelete():会把key,value和entry都真正的进行删除。最终是调用dictGenericDelete()进行删除。 * dictUnlink():只会把entry从链表中移除,并返回entry。最终是调用dictGenericDelete()进行删除。 这个函数主要用于,想把entry从hash table中移除,但同时又想使用这个entry的情况。如果没有这个函数,则需要进行两次查找。

1
2
3
entry = dictFind(...);
// Do something with entry
dictDelete(dictionary,entry);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
uint64_t h, idx;
dictEntry *he, *prevHe;
int table;

if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;

if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);

for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
prevHe = NULL;
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
/* Unlink the element from the list */
if (prevHe)
prevHe->next = he->next;
else
d->ht[table].table[idx] = he->next;
if (!nofree) {
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
d->ht[table].used--;
return he;
}
prevHe = he;
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return NULL; /* not found */
}

rehash

为了避免集中的rehash操作对性能的影响,redis使用的是渐进式rehash。dict.rehashidx初始化为-1,在rehash时,[0, rehashidnex)代表老的hash table已经迁移的bucket。

rehash会两种情况下发生, 1. 每次查询或更新操作时,都会调用dictRehash()执行一步rehash,一般来说这个函数只会移动一个bucket,也有可能一个都不移动,因为rehash时,每次检查至多10个bucket。

1
2
3
if (dictIsRehashing(d)) _dictRehashStep(d);

dictRehash(d,1);
  1. 如果server比较空闲,上述rehash的过程就会很慢,dict将会占用两个hash table较长的时间。在初始化redis时,serverCron()会作为计时器的回调函数定时执行。

    如果启用了server.activerehashing,执行过程中,会分配1ms的时间来执行dictRehash()

    • serverCron()
    • databasesCron()
    • incrementallyRehash()
    • dictRehashMilliseconds(server.db[dbid].dict,1),1ms
    • dictRehash(d,100)

int dictRehash(dict *d, int n)中,最多移动100个bucket,在移动每个bucket时,至多会检查n*10个bucket,因为有的bucket可能是空的,因此需要多看几个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;

while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;

nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}

/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}

/* More to rehash... */
return 1;
}

scan

scan可以用于遍历dict中的元素,初始提供一个cursor = 0,每次迭代返回一个新的cursor,当cursor再次为0的时候,遍历结束。

遍历每个bucket时,调用fn对每个元素de进行处理,这个处理可能是,复制de到其他地方,例如:void scanCallback(void *privdata, const dictEntry *de)

scan需要考虑是否正在rehash的情况, 1. scan过程中,dict没有变化

由于hash table的大小总是$2^n$,因此每个bucket的index总是`key & 2^n - 1`后的一个值,即`idx = hash & d->ht[table].sizemask`。

每次调用`scan`时,处理完毕一个bucket中的元素后,`v |= ~m0`保留cursor的低n个bit(在遍历结束前,`v == v | ~m0`),并加1。如果遍历结束,那么cursor值为`2^n - 1`,加1,并`v |= ~m0`后,`v`为0。则停止调用`dictScan`。

这里还看不出`reverse cursor`的特殊作用。无论高位加1并`rev()`,还是低位加1,都可以完成上述过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
t0 = &(d->ht[0]);
m0 = t0->sizemask;

/* Emit entries at cursor */
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}

/* Set unmasked bits so incrementing the reversed cursor
* operates on the masked bits */
v |= ~m0;

/* Increment the reverse cursor */
v = rev(v);
v++;
v = rev(v);
  1. 如果正在rehash 如果正在rehash,scan是通过高位掩码的方式来完成扫描的。

    先来看bucket的计算方式,idx = hash & d->ht[table].sizemask。如果发生rehash,假设小表a大小为2^n,大表b为2^m。某个key,在两个表中的bucket分别为hash & d->ht[a].sizemaskhash & d->ht[b].sizemask。hash是一致的,这就意味着,两个bucket的低n位也是一致的。这是rehash时能保证元素一定会被扫描到的关键。

    scan时,t0(小表)索引为v & m0的bucket,然后扫描t1中索引低nv & m0的bucket。由于大表中低nv & m0的bucket是多个,因此需要在高m - n位不断递增。由于需要在高位不断递增,因此需要先rev(),然后再加1。

    总结来说,rehash时的扫描过程,其实就是通过一个函数,将一个bucket值xxxxx,映射到集合aa...axxxxx的过程。

    scan大表时的停止条件就是高m - n位都被扫描一遍,v & (m0 ^ m1)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    t0 = &d->ht[0];
    t1 = &d->ht[1];

    /* Make sure t0 is the smaller and t1 is the bigger table */
    if (t0->size > t1->size) {
    t0 = &d->ht[1];
    t1 = &d->ht[0];
    }

    m0 = t0->sizemask;
    m1 = t1->sizemask;

    /* Emit entries at cursor */
    if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
    de = t0->table[v & m0];
    while (de) {
    next = de->next;
    fn(privdata, de);
    de = next;
    }

    /* Iterate over indices in larger table that are the expansion
    * of the index pointed to by the cursor in the smaller table */
    do {
    /* Emit entries at cursor */
    if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
    de = t1->table[v & m1];
    while (de) {
    next = de->next;
    fn(privdata, de);
    de = next;
    }

    /* Increment the reverse cursor not covered by the smaller mask.*/
    v |= ~m1;
    v = rev(v);
    v++;
    v = rev(v);

    /* Continue while bits covered by mask difference is non-zero */
    } while (v & (m0 ^ m1));

结构

reids的list采用的是双向链表的实现,未使用dummy node。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typedef struct listNode {
struct listNode *prev;
struct listNode *next;
void *value;
} listNode;

typedef struct listIter {
listNode *next;
int direction;
} listIter;

typedef struct list {
listNode *head;
listNode *tail;
// 节点值复制函数
void *(*dup)(void *ptr);
// 节点值释放函数
void (*free)(void *ptr);
// 节点值对比函数
int (*match)(void *ptr, void *key);
unsigned long len;
} list;

特点

  • 双端
  • 无环
  • 有头尾指针
  • 有长度计数器
  • 多态(使用void*来保存节点值,并提供freedupmatch来处理节点)

结构

sds主要由lenallocbuf构成。其中buf是柔性数组,分配sds的时候,这个结构体会作为header。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
typedef char *sds;

/* Note: sdshdr5 is never used, we just access the flags byte directly.
* However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */
uint8_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};

// 如果没有packed,那么sizeof(sdshdr32) =
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};

为sds分配空间,initlen是字符串的长度,1是末尾的'\0'。

1
sh = s_malloc(hdrlen+initlen+1);

sdsnewlen、sdslen,sdsavail

sdsnewlen返回的是char buf[]首元素的地址,这个值作为sds指针的值。这是sds中巧妙的地方。但是如何获取header地址,以及访问成员?毕竟相关字段是在柔性数组的前面。

  • sds - 1就是flag,由flag可以得知header类型,也就知道header的长度,减去header的长度得到header的地址,访问len成员即可得知buf长度。

    1
    #define SDS_HDR(T,s) ((struct sdshdr##T *)((s)-(sizeof(struct sdshdr##T))))

  • sdshdr5是一个特别的类型,长度和类型都隐藏在了flag里面。SDS_TYPE_5存储的是长度小于1 << 5的字符数组,也就是可以用5 bit来表示。这样加上3 bit的类型,合起来刚好8 bit,就是一个flag

    1
    *fp = type | (initlen << SDS_TYPE_BITS);

    因此flag的3 lsb作为类型,5 msb作为长度。但sdshdr5无法记录空闲的空间,因此无法扩容。

sdsgrowzero

  • 新长度小于1MB,则buf大小翻倍
  • 新长度大于等于1MB,则buf加1MB

sdstrim

trim后并未释放buf的空间。

提供了sdsRemoveFreeSpace函数,可以完成在尾部没有空闲的空间。

sds与c字符串的区别

  • 参数时间获取字符串长度
  • 记录长度,避免buffer overflow
  • 空间预分配、惰性释放,减少重分配次数
  • 二进制安全
  • 兼容部分c字符串函数

问题

  1. sds的定义使用了pack,内存没有对齐的情况下,会影响cpu访问内存的性能,这个是如何解决的?

References

  1. Redis源码阅读计划
  2. Redis源码从哪里读起?
  3. Redis 深度历险:核心原理与应用实践

RWMutex RLock重入导致死锁

RWMutex,即读写锁,可以被多个的reader或一个writer获取使用。

死锁例子

在使用RWMutex的时候,同一个reader是不应该连续调用Rlock多次的,这样做不但没有意义,还有可能导致死锁,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
var l = sync.RWMutex{}
var wg sync.WaitGroup
wg.Add(2)

c := make(chan int)
go func() {
l.RLock()
defer l.RUnlock()
c <- 1
runtime.Gosched()

l.RLock()
defer l.RUnlock()
wg.Done()
}()

go func() {
<-c
l.Lock()
defer l.Unlock()
wg.Done()
}()

wg.Wait()
}

sync.RWMutex分析

下面RWMutex的实现,我们来看这段代码的具体执行。为了方便理解,把if race.Enabled {...}的相关代码都去除了。

  1. goroutine 1l.RLock()

    1
    2
    3
    4
    5
    6
    func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.
    runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    }

    执行l.RLock()后,goroutine 1获得写锁。

    • 状态:获得读锁
    • readerCount = 1,readerWait = 0
  2. goroutine 2l.Lock()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    func (rw *RWMutex) Lock() {
    // First, resolve competition with other writers.
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    }

    由于goroutine 1已获得写锁,此时goroutine 2等待。

    • 状态:等待reader释放读锁
    • readerCount = 1 - rwmutexMaxReaders,readerWait = 1
  3. goroutine 1l.RLock()

    1
    2
    3
    4
    5
    6
    func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.
    runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    }

    goroutine 1发现readerCount为负,认为有writer获得了写锁,接着也进入了等待状态。

    • 状态:等待
    • readerCount = 2 - rwmutexMaxReaders,readerWait = 1

最后goroutine 1和goroutine 2都进入了等待状态。

总结

  1. readerCount的作用? 持有读锁的reader数。置为负时,代表了writer正在或者已经获得了读锁,此时其他reader不能再获得写锁。

  2. readerWait的作用,以及在Lock()中,为何需要同时判断r != 0atomic.AddInt32(&rw.readerWait, r) != 0? 置readerCount为负的时候,获得了写锁,但尚未RULock的reader数。writer需要等待这些reader执行结束。

    • r == 0,则无正在持有读锁的reader,可以直接完成读锁的加锁。
    • r != 0,writer需要等待获得了写锁,但尚未RULock的reader执行结束。如何判断是否需要等待呢,即readerWait不为0的时候。同时reader决定是否能唤醒writer,也需要等到readerWait为0的时候。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 此刻起,其他reader不再能够获得读锁。
    // 此时,尚未释放写锁的reader数为readerWait个,等待他们结束才能完成读锁的加锁。
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    runtime_SemacquireMutex(&rw.writerSem, false)
    }
    }

sync.WaitGroup使用的问题

例子

在实现一个需求的时候,需要等待一定数目的go协程执行完毕,但这个数目事先并不好确定。想到了可以用sync.WaitGroup来完成,在使用时候发现,Wait()没有生效,并未等待协程结束,代码大致如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan struct{})

f := func(){
wg.Add(1)
defer wg.Done()
select {
case <- ch:
fmt.Println("hi")
}
}

go func() {
defer wg.Done()
go f()
go f()
go f()
}()
close(ch)
wg.Wait()
}

执行后程序不会有任何的输出就退出了。

sync.WaitGroup源码分析

Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for.

总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan struct{})

f := func(){
defer wg.Done()
select {
case <- ch:
fmt.Println("hi")
}
}

go func() {
defer wg.Done()
wg.Add(1)
go f()
wg.Add(1)
go f()
wg.Add(1)
go f()
}()
close(ch)
wg.Wait()
}

References

  1. Waiting on an indeterminate number of goroutines

goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func process(ctx context.Context, wg *sync.WaitGroup, retCh chan int) {
defer wg.Done()
for {
time.Sleep(time.Second*1)
// process
// send process result back
select {
case <-ctx.Done():
retCh <- 1
retCh <- 2
return
default:
retCh <- 1
retCh <- 2
}
}
}

func loop(ctx context.Context, wg *sync.WaitGroup, retCh chan int) {
defer wg.Done()
wg.Add(1)
go process(ctx, wg, retCh)
for {
select {
case <-ctx.Done():
return
case ret := <-retCh:
fmt.Println(ret)

}
}
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
retCh := make(chan int, 1)
wg.Add(1)
go loop(ctx, wg, retCh)

time.Sleep(time.Second*5)
cancel()
wg.Wait()
}

之前有记录过python Queue的使用,以及multiprocessing.Process模块。现在看看multiprocessing.Queue的具体工作方式(本文基于Python 3.7.4)。

multiprocessing.Queue定义在queues.py中,除此之外还定义了SimpleQueueJoinableQueue,是FIFO队列。

类似multiprocessing.Process,首先导入了context,判断当前系统类型,并相应地使用对应的实现。使用的multiprocessing.Queue()的时候,实际上是调用了context中的Queue()方法,设置了ctx

1
2
3
4
def Queue(self, maxsize=0):
'''Returns a queue object'''
from .queues import Queue
return Queue(maxsize, ctx=self.get_context())

SimpleQueue

SimpleQueue很简单,其实就是一个带锁的pipe, * 主进程和子进程分别使用各自的lock,实现写入pipe和读取pipe是并发安全的 之所以put()get()可以使用不同的lock,是因为pipe两端的读写已经是并发安全的了。 * 用multiprocessing.Pipe来实现消息传递,支持多读多写

关于os.pipemultiprocessing.Pipe * os.pipe:在Linux上底层访问的是传统的POSIX pipes,单向,使用encode/decode序列化 * multiprocessing.Pipe:使用multiprocessing.Connection实现,在Linux上使用POSIX sockets完成数据发送,双向,使用pickle/unpickle序列化

Queue

Queue可以看做在SimpleQueue的基础上,增加了生产者端的发送buffer、支持设置队列大小,以及get()put()的无阻塞调用。

python queues

初始化

__init__()初始化了这么几个比较重要的变量, * _maxsize:队列最大size * _reader_writermultiprocessing.Connection实例,负责数据的收发 * _rlockmultiprocessing.Lock,进程间共享,保证_reader.recv_bytes()并发安全 * _wlockmultiprocessing.Lock,进程间共享,保证_writer.send_bytes()并发安全 * _sem:队列长度信号量,计数器初始化为_maxsize * _notempty:条件变量,同步生产者放入_buffer_buffer中数据的发送 * _buffer:每个进程有自己独立的buffer,线程安全,size其实是由_sem来控制 * _thread:生产者数据发送线程

put

首先_sem.acquire(),计数器-1,如果不为零,说明队列还可以写入。

1
2
if not self._sem.acquire(block, timeout):
raise Full

然后获得_notempty的锁,如果发送线程未创建,则创建。追加元素至buffer后,_notempty.notify()通知发送线程。生产者中数据的发送由单独的线程完成,主线程只负责将数据放入buffer。

1
2
3
4
5
with self._notempty: # acquire保护_notempty和_thread的修改
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

feed线程

_start_thread()创建thread对象后,设置daemon属性为True,目的是随主线程的退出而退出,不必手动添加停止的逻辑。接着就启动线程。

具体发送时,不断等待buffer中有元素;如果buffer中有元素,popleft()并发送,直到发送完。

为何_notempty的锁在popleft()就释放了? 1. buffer是collections.deque,本身就是并发安全的 2. _notempty锁的目的其实是为了保护_notempty的修改,和put()中的目的类似,但并不是为了保护buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
while 1:
try:
nacquire()
try:
if not buffer:
nwait()
finally:
nrelease()
try:
while 1:
obj = bpopleft()
if obj is sentinel:
debug('feeder thread got sentinel -- exiting')
close()
return

# serialize the data before acquiring the lock
obj = _ForkingPickler.dumps(obj)
if wacquire is None:
send_bytes(obj)
else:
wacquire()
try:
send_bytes(obj)
finally:
wrelease()
except IndexError:
pass
except Exception as e:
# ......

get

对于block=True的情况 类似SimpleQueue,一直等待_reader.recv_bytes(),直到收到数据。在_reader.recv_bytes()以后,_sem.release(),计数器+1,表示从队列消耗了一个元素。此时如果有进程调用_sem.acquire()并在等待,那么_sem.release()会唤醒其中一个等待的进程。

对于block=False的情况 由于不能阻塞,因此不能一直等待_reader.recv_bytes()

  1. 如果在timeout_rlock都没有获得锁,则返回Empty
  2. _reader.poll()(底层还是使用select来实现的)判断是否在timeout_reader可读,如果不可读,则返回Empty
  3. 最后_reader.recv_bytes(),并_sem.release()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def get(self, block=True, timeout=None):
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout): # 此时的timeout已经减去了等待_rlock.acquire的时间
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

序列化和反序列化

序列化和反序列化使用的是pickle来实现,那么如何判断消息的边界?Connection中定义了一个规则,在header存放长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _send_bytes(self, buf):
n = len(buf)
# For wire compatibility with 3.2 and lower
header = struct.pack("!i", n)
if n > 16384:
# The payload is large so Nagle's algorithm won't be triggered
# and we'd better avoid the cost of concatenation.
self._send(header)
self._send(buf)
else:
# Issue #20540: concatenate before sending, to avoid delays due
# to Nagle's algorithm on a TCP socket.
# Also note we want to avoid sending a 0-length buffer separately,
# to avoid "broken pipe" errors if the other end closed the pipe.
self._send(header + buf)

def _recv_bytes(self, maxsize=None):
buf = self._recv(4)
size, = struct.unpack("!i", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size)

close和join_thread

由于生产者启动了一个线程来负责发送,元素首先append到buffer,然后发送。在进程结束时,如何确保元素发送完毕的问题?

在启动feed线程的时候,创建了两个Finalize对象,_finalize_close_finalize_join,前者的优先级较高,并set了_close_jointhread。当进程退出的时候,会自动地先后调用这两个函数(基于atexit实现), 1. _finalize_close:会append元素_sentinel = object()到buffer,feed线程如果看到_sentinel,会调用close()并停止服务 2. _finalize_join:join feed线程

1
2
3
4
5
6
7
8
9
10
11
12
13
if not self._joincancelled:
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
exitpriority=-5
)

# Send sentinel to the thread queue object when garbage collected
self._close = Finalize(
self, Queue._finalize_close,
[self._buffer, self._notempty],
exitpriority=10
)

模块同样也提供了close()join_thread()方法,调用的其实就是_close_jointhread。如果先手动调用,然后aexit调用,那不会回有问题?首次调用后,都会从util模块的_finalizer_registry中移除,因此不会存在重复调用_finalize_close_finalize_join的问题。

deadlock

这几个stackoverflow(123)的问题很类似,在官方的文档中也有提及(python 2python 3),总结下来就是,join一个调用put的进程,且这个进程尚未把buffer中所有的元素写入pipe时,可能会导致死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import os
import multiprocessing

def work(q):
q.put('1'*10000000)

if __name__ == "__main__":
print(os.getpid())
q = multiprocessing.Queue()

p = multiprocessing.Process(target=work, args=(q,))

p.start()
p.join()

print(q.get())

原因是,Queue使用了os的pipe来进行数据的传输,而pipe的大小是有限的。如果数据过大,_writer.send_bytes()在写入数据到pipe的时候会阻塞。如果此时join这个子进程,那进程本身已经卡住了,join永远等不到进程结束。

JoinableQueue

JoinableQueue基于Queue实现,覆盖了put(),并新增了, * task_done():表示先前放入队列中的元素被取走了,由消费者调用 * join():阻塞直到队列中所有元素都被取走

初始化

__init__()调用了Queue的初始化函数,并额外初始化了, * _unfinished_tasks:信号量,表示队列中当前未取走的元素 * _cond:条件变量,同步join()task_done()

put

Queueput()基本一致。多出来的一点是,每次put()都会对_unfinished_tasks的计数器+1。

1
2
3
4
5
6
7
8
def put(self, obj, block=True, timeout=None):
# ...
with self._notempty, self._cond:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._unfinished_tasks.release()
self._notempty.notify()

task_done和join

task_done()_unfinished_tasks的计数器进行-1,如果计数器为0,则通知等待在join()的进程。

1
2
3
4
5
6
7
8
9
10
11
def task_done(self):
with self._cond:
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()

def join(self):
with self._cond:
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

Reference

  1. Python os.pipe vs multiprocessing.Pipe
  2. Daemon is not daemon, but what is it?
  3. Python 中的 multiprocess.Queue
  4. Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
  5. Process.join() and queue don't work with large numbers
  6. Script using multiprocessing module does not terminate
  7. https://docs.python.org/3/library/multiprocessing.html#all-start-methods
  8. Joining multiprocessing queue takes a long time

之前有记录过python进程间通信的几个方式,现在来看看这个模块的具体的是怎样工作的(本文基于Python 3.7.4)。

multiprocessing.Process典型的使用方式为,

1
2
3
4
5
6
7
8
import multiprocessing

def f(name):
print('hello', name)

p = multiprocessing.Process(target=f, args=('world',))
p.start()
p.join()

以这段代码为例,看看进程的执行过程。

python process

import

import multiprocessing,导入multiprocessing的时候,做了一些初始化的工作。 1. multiprocessing的包导入了context,这个模块主要是判断当前系统类型,并相应地使用对应的实现。在sys.platform != 'win32'的系统上,例如:mac和linux,默认使用fork来创建子进程。 2. context模块中, * 导入了process * 定义了Process类,这个类继承自process.BaseProcess,覆盖了process.BaseProcess_Popen方法,目的是根据系统类型,在初始化的时候根据系统类型,选择相应的Popen实现 3. process模块中, * 导入了util,注册了_exit_function,被注册的函数会在解释器正常终止时执行(子进程主动调用,主进程自动调用) * 此时在父进程里,初始化了几个process模块的全局变量, * _current_process:代表当前进程 * _process_counter:进程数计数器 * _children:子进程的集合

创建Process实例

p = multiprocessing.Process(target=f, args=('world',)),实际上是调用process.BaseProcess__init__()方法进行初始化,并使用了process模块的全局变量初始化实例变量,以及获取当前进程的pid, * count = next(_process_counter) * self._identity = _current_process._identity + (count,) * self._config = _current_process._config.copy() * self._parent_pid = os.getpid()

启动

p.start()创建子进程,首先做了几个检查, 1. 如果已经创建了子进程,那么不能再次创建 2. 只能start由当前进程创建的Process实例 3. 不允许创建daemon进程的子进程

接着_children中清理当前已经结束的进程,然后调用self._Popen(self)开始创建子进程。使用os.fork()创建子进程,用法和posix的fork一样,不多说。要注意的一点是,调用os.pipe()创建了parent_rchild_w,而parent_r将会被用于join()的实现。

子进程中,执行_bootstrap()进行初始化和运行target, 1. 初始化了process模块的全局变量,_current_process_process_counter_children 2. 清空util._finalizer_registry,执行util._run_after_forkers() 3. run()运行target 4. util._exit_function()做进程结束后的收尾工作 * util._exit_function()会调用_run_finalizers(),这个函数会把优先级高于minpriorityfinalizer执行一遍

1
2
3
4
5
6
7
def _run_finalizers(minpriority=None):
'''
Run all finalizers whose exit priority is not None and at least minpriority

Finalizers with highest priority are called first; finalizers with
the same priority will be called in reverse order of creation.
'''
* 默认情况下子进程的_finalizer_registry是空的,没有任何的finalizer会被执行,但可以通过multiprocessing.util.Finalize手动的进行注册,来完成一些收尾工作,例如:关闭db连接。
1
2
3
4
5
6
7
class Finalize(object):
'''
Class which supports object finalization using weakrefs
'''
def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
# ......
_finalizer_registry[self._key] = self

join

创建子进程的时候,也创建了pipe parent_r,并set self.sentinel = parent_r,且关闭了child_w。此时唯一打开child_w的进程是子进程,唯一打开parent_r的是主进程。主进程调用join()时,实际上是等待parent_r变为可读状态(wait返回)。

1
2
3
from multiprocessing.connection import wait
if not wait([self.sentinel], timeout):
return None

那么何时wait返回?wait循环调用select,当parent_r ready的时候,wait返回, * parent_r可读 * 所有writing side被关闭

当子进程结束的时候,os会关闭这个进程关联的所有fd,又因为主进程已经关闭了child_w,所以此时parent_r ready,主进程中的join()也就返回了。

Reference

  1. multiprocessing — Process-based parallelism
  2. Why should you close a pipe in linux?

介绍

文章主要使用实验验证了raft是否如原论文所阐述的易于理解和实现。重新阐述了raft,使用OCaml实现了raft,开发了一个事件驱动的模拟器来进行测试,重现了raft原论文中的测试,并提出了几个优化。

性能

对于raft的性能,raft原论文中提到了两点, 1. 典型的情况下,leader复制entry的时间。 2. 最坏情况下,leader失败后,选举出新leader的时间。 1. 选举能否快速收敛? 2. 集群能达到的最小下线时间是什么?

典型的情况下,leader复制entry的时间

需要耗费一个rtt来复制到多数派server,可以做batching和piplining优化。

最坏情况下,leader失败后,选举出新leader的时间

关于这点,raft原论文中使用5个server的集群做了两个测试。为了模拟最坏情况, * 每次测试server都有不一样长度的log,使得有的candidate不能成为leader。 * 为了制造易于出现split vote的环境,在每次终止leader前,都同步地广播heartbeat,目的是重置election timeout。

-w302

两个测试表明了: 1. election time中加入少量的随机,能够明显的减少选举新leader的时间,减少split vote的出现。 2. 集群的最小下线时间随着election time的减少而减少,但如果少于broadcast time,那么会集群产生不必要选举,降低可用性。

优化

本文中提出了几个优化, 1. Different Follower and Candidate Timers 2. Binary Exponential Backoff 3. Combined

-w293

Different Follower and Candidate Timers

raft原论文建议\(\textit{candidate timeout} = \textit{follower timeout} ∼ U(T, 2T), T=150ms\),但在高竞争的情况下,例如:\(U(150ms, 151ms)\),将两个时间设置在不同的范围,可以大大减少选举leader的时间。

Binary Exponential Backoff

类似tcp超时重传的指数回退。

Readings - In Search of an Understandable Consensus Algorithm (Extended Version) (Section 6 to end)论文

集群成员变更

实际应用中,常常会有配置变更的需求,即:成员变更。手动的方式有下面两种, * 把集群整体下线,配置修改完毕以后再上线是可行的,但会造成服务不可用。 * 新server可以通过获取其ip来替换集群成员,需要保证被替换的server不会再加入集群。

但这两个方式都有明显的弊端,且任何手动的步骤都有引起错误的可能。

配置切换需要保证安全性,在同一个term内,集群不能够同时存在两个leader。由于无法一次性原子的切换所有server的配置,一次增减多个server并直接切换配置可能会出现disjoint majorities的情况。

raft变更的方案有两种: 1. single-server change 2. 使用joint consensus

single-server change

每次增减一个server。

配置的变更 具体变更过程如下, 1. leader收到变更请求,AppendEntries RPC按新配置发送\(C_{new}\)。 2. 每个server收到\(C_{new}\)立即生效。 3. 新配置下,\(C_{new}\)复制到大多数server,则达成committed。 * 此时,就算有剩下的server未得到新配置,也不会构成多数派, * 且,未得到新配置的server也不被选举为leader。

\(C_{new}\)提交后, 1. leader可以响应client,告知本次配置变更已经完成。 2. 如果配置是移除一个server,那么这个server可以下线了。 3. 可以开始下一次配置更新。

安全性 -w543

总共有四种情况:

member change 变更后达成disjoint majorities的条件
奇数个成员,增加一个 2k+1 -> 2k+2 old = k+1, new = k+2
奇数个成员,减少一个 2k+1 -> 2k old = k+1, new = k+1
偶数个成员,增加一个 2k -> 2k+1 old = k+1, new = k+1
偶数个成员,减少一个 2k -> 2k-1 old = k+1, new = k

任意一种情况对应的条件都是不可能同时达成的,因为要求的成员数目都大于真正的成员数目,不会产生同一个term两个leader的现象。换句话说,旧配置集群与新配置集群的任意多数派必然有交集,即:至少存在一个voter(接受旧leader的\(C_{new}\),并且为新leader投票),不会出现disjoint majorities。

因此增减一个server情况,直接切换配置是安全的。

这个交集也保证了在变更配置的过程中,在\(C_{old}\)中、以及变更期间复制的日志,最后一定会出现在\(C_{new}\)

何时开始下一次变更 能够开始下一次配置更新的前提是当前的配置已经commit,否则无法保证安全性。如果server在\(C_{new}\)commit以后才使用\(C_{new}\),会带来很多不必要的、额外的维护工作, 1. leader很难知道旧配置集群的多数派使用\(C_{new}\)的时间。 2. 需要跟踪哪些server知道了commit,且做持久化。但这些是raft本身不具备的功能。 3. 如果leader改变了,那么需要移除\(C_{new}\)的entry,此时,server还需要准备回滚到上一个配置。

majority的是对谁而言的 对于选举和append entry,都是仅由调用方来判断是否达成多数派,接收方不负责,否则会存在类似“鸡生蛋蛋生鸡”的问题。

可用性 配置变更给保证集群的可用性带来了几个问题。 1. Catching up new servers -w517

一个新server加入集群,新server通常并不包含任何entry,那么可能需要花费较长的时间来同步日志。在这段时间,集群更容易出现不可用的问题。例如:3->4,此时要求的majority是3,但是s3挂了。

为了最小化不可用的出现,需要*保证不可用的时间在一次election timeout内*。

![-w530](/images/2019/15662902585664.jpg)

具体方法是,
* 新加入的server先作为non-voting成员。
* 复制到新server的过程分为多个round,每个round都复制leader所有的entry。
* 当前复制的round内,leader可能又有新的entry了,下一个round会进行复制。
* 在固定round内(例如:10),如果最后一个round的时间 < election timeout,此时假设不存在更多的entry会导致明显的不可用,添加新server。
* 否则leader终止变更配置。
  1. Removing the current leader 如果使用joint Consensus,或没有leadership transfer的情况下,需要一个leader下线的方法:旧leader等到\(C_{new}\) commit以后让位(转变为follower状态)。

    在commit之前,当前leader管理集群不包含leader自己,复制和投票的时候不把自己算入majority。

  2. Disruptive servers 被排除在\(C_{new}\)之外的server,由于不再收到heartbeat,会不断的发起投票。虽然新选出的leader始终会出现在\(C_{new}\)中,但是这干扰了集群正常的工作。

    第一个思路是引入一个Pre-Vote阶段,在发起选举前,检查自己是否有成为leader的资格,即:candidate的log比大多数server更新。但并不总是有效。例如:{ABCD}->{ABC}的时候,A是Leader,在尝试复制\(C_{new}\)到BC的时候,D可能发起了Pre-Vote,D的log相对于BC足够新,可以获得BC的投票成为leader。因此检查log的方式是不可行的。

    raft使用的方式是,如果一个server获得上一次heartbeat的时间在最小election timeout内,这个server收到RequestVote时就不更新term或投票。

    如果确实需要发起选举,例如:进行leadership transfer的时候,可以用一个标志位来区分。

bug in single-server change 如果配置变更是在同一个term内完成的,那么不会有问题。但如果出现在跨term且并发的配置变更,就不一定了。

例如先后增减一个server,具体过程如下, -w630

2中,s1把D复制到s1和s5然后挂了,接着s2接受s2、s3、s4的投票(使用C判断majority)成为term2的leader。5中,s2把E复制到s2和s3,并标记为committed(使用E判断majority)。然后s1恢复,接受s1、s4、s5的投票(使用D判断majority)成为term3的leader,继续复制D,最后在7中覆盖已提交的E。

这个问题类似提交上一个term的entry,解决方法是一样的,leader当选以后,直到当前term的entry提交以后,才能开始下一次配置变更。可以通过append一个no-op entry来实现。

原文的single-server change保证了,在同一个term内不会出现未提交的configuration entry。这个patch保证了,来自先前term未提交的configuration entry永远不会被提交。

回到前面的例子,3中s2成为term2的leader以后立即append no-op entry,此时使用C判断majority,假设复制到s2、s3、s4的index 2。接着s2继续把E复制到s2和s3。如果接下来s1恢复并发生了选举,s1不可能成为leader,因而避免了已提交的E被覆盖的情况。

使用joint consensus

这个方法并不建议在工程中使用,更简单的single-server change足以将集群变更为任何期望的配置。

-w344

joint consensus joint consensus状态混合了新旧配置,允许每个server在不同的时间安全地切换配置,且在这个过程中能持续提供服务,这个状态中,

  • entry会被复制到所有新旧配置。
  • 来自任何配置的机器都可以被选举为leader。
  • 选举和append的majority,需要分别来自新旧配置。

相比single-server change,joint consensus引入了一个中间的entry \(C_{old,new}\),具体过程是, 1. 将新旧配置存储到\(C_{old,new}\),并复制,进入joint consensus状态。 2. 每个server收到\(C_{old,new}\)立即生效,leader使用\(C_{old,new}\)来判断是否提交。 \(C_{old,new}\)复制的过程中,如果leader挂了,那么新的leader可能在\(C_{old,new}\)\(C_{old}\)中选举出。无论leader来自哪个配置,\(C_{new}\)不能单方进行决策。 3. \(C_{old,new}\)提交后,leader可以复制\(C_{new}\)。 一旦\(C_{old,new}\)提交,\(C_{new}\)\(C_{old}\)都不能单方进行决策。 4. leader使用\(C_{new}\)来判断是否提交,提交后,完成配置变更。

安全性 在joint consensus过程中,发生选举时,可能从以下情况选出leader(按joint consensus的步骤顺序列举), 1. 来自\(C_{old}\),log不包含\(C_{old,new}\)。 2. 来自\(C_{old}\),log包含\(C_{old,new}\)。 3. 来自\(C_{new}\),log包含\(C_{old,new}\)。 4. 来自\(C_{new}\),log包含\(C_{new}\)

而任何两个leader的组合都是不可能同时出现的。

leader组合 不可能出现的原因
1+1 or 4+4 选举规则限制
1+2 先看2的选举,需要分别来自新旧配置的多数派,此时已经不能再从\(C_{old}\)中选举1
1+3 类似1+2
1+4 a. 先看4,既然\(C_{new}\)出现了,那么\(C_{old,new}\)肯定提交了,这个提交需要分别来自新旧配置的多数派,因此\(C_{old}\)中不包含\(C_{old,new}\)的server无法选举为leader
b. \(C_{new}\)是在\(C_{old,new}\)提交后才复制,如果选举出4,1就不会存在
2+2 类似1+2
2+3 类似1+2
2+4 类似1+2
3+3 类似1+2
3+4 类似1+2

因此不会出现disjoint majorities的情况。

是否受single-server change的bug影响 不受。

日志压缩

raft的日志随着客户端不断的请求增长。一旦entry已经提交并执行,那么中间的状态和操作就不再需要,可以被压缩。

文章讨论了几种进行日志压缩的方法, * Snapshotting memory-based state machines * Snapshotting disk-based state machines * Incremental cleaning approaches * Leader-based approaches

这几个方法有一些核心概念基本都是相通的, 1. 每个server独立的负责日志压缩,而非由leader集中决定。 2. raft向状态机转移维护prefix of the log的职责。 3. raft丢弃部分日志前缀a prefix of the log后,状态机会承担两个新的职责 1. 如果server重启,状态机在apply entry前,需要先load那些被丢弃的日志。 2. 为了落后较多的server或新server能够追上,状态机可能需要输出一个状态的镜像。

Snapshotting memory-based state machines

适用于状态机的数据是存放在内存的情况。每个server独立的创建已经提交entry的snapshot。主要过程是, * 状态机序列化当前状态。 * 一旦状态机完成snapshot的写入以后,日志就可以被截断了,raft首先保存snapshot中lastIncludedIndex和lastIncludedTerm,以及这个index对应的lastIncludedConf。 * raft可以丢弃截止index的entry和先前的snapshot。

-w318

InstallSnapshot 为了落后较多的server或新server能够追上,这个方法里使用InstallSnapshot来实现。leader仅当丢弃了需要复制的next entry的时候,才发送snapshot,snapshot以chunks的形式有序发送。

并发创建 创建snapshot时,状态机需要维持一个不变的状态,但进行序列化和落盘需要较长的时间,因此创建的过程需要与普通操作并发执行。可以使用copy-on-write实现,有两种方法, 1. 状态机使用不可变数据结构。 2. 依赖os的copy-on-write支持,例如:linux的fork。

copy-on-write占用额外的内存,在创建的过程中,占用的额外内存与状态的修改成正相关,因此需要事先计划和管理。如果在snapshot的过程中,内存满了,那么server只能暂停服务,此时集群可能还是可用的。最好不要终止稍后重试,下次创建的时候很可能还会有类似的问题。

何时创建 如果创建的过于频繁,会浪费磁盘带宽和其他资源,如果过于稀少,会导致创建出过大的snapshot,增加传输和回放的时间。

有这么几个判断的方法, * 如果size(log)明显大于一个预定的值。 当这个值明显大于snapshot的大小时,磁盘写入开销会很小。但对于较小的状态机,需要等待较长的时间才会有满足大小要求的log。 * 如果size(log)大于size(snapshot)的倍数。 不过判断当前状态机的snapshot大小并不容易。 * 如果size(log)大于size(prev snapshot)的倍数,expansion factor。 expansion factor控制了磁盘带宽的开销。

还可以仅在少数派server上创建snapshot,不影响服务client。

Client交互

查找cluster

如果配置固定,这个过程很简单。难点在于成员不断变更的情况,可用的方法有, * 广播,但受限于特定的网络环境。 * 使用外部的目录服务,例如:DNS。需要在变更的过程中增减相应的server。

路由请求到leader

client的请求是由leader处理的,因此client需要找到leader,可以随机的选取一个server发起请求,如果不是leader,server拒绝,client重试直到找到为止,尝试次数期望是\((n+1)/2\)。在此基础上可以做一些优化, * server拒绝的时候返回leader。 * server做代理,转发请求到leader。

还需要避免过期的leader信息导致处理client请求的时候产生不必要的延迟, * leader:如果产生网络分区,且client向拥有少数派的leader发送了请求,在分区恢复前,这个请求一直都无法得到处理。因此当超过election timeout以后,leader都没有向多数派成功的发送心跳,那么leader让位。 * follower:如果follower发起新的选举或者term变更,那么follower丢弃当前维护的leader信息。 * client:当丢失与某个server的连接,应该随机选取一个server进行重试。

线性一致性

截止目前,raft提供了at-least-once的语义。client重试、以及网络导致请求重复会导致命令被执行多次。但是at-least-once对于一个基于共识的系统是不够的,raft需要可线性化的语义,通过对请求去重,可以实现这一点。

使用session去重 每个client分配一个唯一id,每个请求分配一个唯一的递增序号。server维护每个client的session,这个session跟踪了每个client的最新序号和对应的response,如果收到了一个已经处理过的序号,那么直接返回。

这样每个命令就做到了以log中第一次出现的顺序立即生效且只执行一次。

对于来自同一个client的并发请求,server维护一个<序号,response>的集合。每个请求中携带一个client未收到的最小序号,server丢弃小于这个序号的response。

session保存的多久 受存储的限制,session不能永久保存,server需要对何时过期session达成共识。 * 一个方法是设置存储session数的上限,并使用lru淘汰session。 * 另一个方法是基于对时间源达成的共识来淘汰session(原文中的描述不是非常清晰,待补充)。

处理session过期的client请求 当session过期后,client还继续操作时,这被看做异常情况(待补充)。

更高效的处理read-only请求

raft日志的目的是以相同的顺序把变更复制到server上,并保证读写时候的线性一致性语义。read-only命令只涉及查询状态机,可以绕开日志的复制,避免同步磁盘写入,会大大的提升性能。但如果没有额外的控制,client会读取到过期的值。

为了使得绕开raft日志的read-only请求保持线性一致性,针对每次read-only请求,leader需要, 1. 如果当前term还没有提交过entry,等待直到有。如果是刚成为leader,则需要先提交一个no-op entry。 2. leader将当前的commit index记录到本地变量readIndex,这个会作为read-only操作的下界。 3. leader需要自己的身份是有效的,不存在在自己不知情的情况下(网络分区)选举出了新的leader。这里的方法与路由请求到leader中避免过期的leader类似,如果成功向多数派发送了heartbeat,那么leader可以知道在发送heartbeat的时候,身份仍然是有效的。 4. leader等待lastApplied >= readIndex,此时的readIndex是能保证线性一致性的最小index。 5. leader向自己的状态机发起查询请求,并返回结果。

优化leadership确认 每次查询请求都需要执行3,可以把所有累计的查询通过一次heartbeat来确认leadership。

follower分担read-only负载 同样需要保证不读取到过期的数据,保证线性一致性。为此follower可以向leader发送一个查询当前readIndex的请求,然后leader执行上面的1-3,follower执行4-5。

使用时钟减少heartbeat带来的延迟 虽然有batch优化,read-only查询仍然需要做一次heartbeat来确认leadership,可以用时钟来避免heartbeat带来的延迟。

-w262
  • leader使用heartbeat来维持一个lease,如果leader成功向多数派发送了heartbeat,那么leader可以认为在接下来的election time时间内都不会有新的leader产生,这个lease可以扩展到\(start+\frac{\textit{election timeout}}{\textit{clock drift bound}}\),在这个时间之前都不用执行上面的步骤3。
  • 在进行leadership transfer的时候需要将lease主动过期,因为会导致更早的产生新leader。

要注意的是,使用lease的方式假设了server之间时钟漂移的上界(在给定的一段时间内,没有server的时钟增加的时间会超过这个上界),找到并维护这个值会增加额外的运维成本。如果假设失效了,系统可能会返回任意过期的信息。

可以使用一个的扩展来增强对client提供的保证,即使上述假设失效的情况下,读操作满足线性一致性,不至于错的离谱。具体方法是, 1. server返回client的时候,带上状态机状态对应的index。 2. client跟踪自己看到的与结果对应的最新index,发送请求的时候带上这个index。 3. 如果server收到的index > lastApplied,那么server暂时不处理这次请求。

问题

  1. 创建snapshot的时候,有什么限制?
    • 不能丢弃未提交的和未执行的
    • 已执行的entry可能需要用于使其他server更新
  2. snapshot和log的关系? snapshot反映的是已经执行的log。
  3. server持久化了哪些数据在磁盘上?
    • 截止到某个entry的snapshot + 后续的log = server完整的log。
    • 其他状态信息,如:currentTerm,votedFor等。
  4. 如果某个follower落后了,同时leader丢弃了follower的所需的log,怎么办? nextIndex将无法会退到那个entry,leader会使用InstallSnapshot RPC。
  5. leader何时会向落后follower发送InstallSnapshot RPC? 上面的问题即为答案。
  6. 为何leader不仅仅丢弃所有follower都有的entry?
    • 每个server是独立创建snapshot的。
    • 少数落后或失败的follower会导致leader log的持续增加。
  7. snapshot包含什么信息?
    • term
    • lastIncludedIndex
    • lastIncludedTerm of lastIncludedIndex
    • lastIncludedConf at lastIncludedIndex
    • snapshot data
  8. follower InstallSnapshot RPC的流程是什么?
    • 检查term
    • 检查是否已包含lastIncludedIndex/lastIncludedTerm
    • set lastApplied = lastIncludedIndex,写入data
    • 使用snapshot重置状态机
  9. server收到InstallSnapshot RPC以后,有没有可能会导致状态机的状态回退? 不会。follower会检查是否已经包含lastIncludedIndex/lastIncludedTerm。
  10. 为什么在处理read-only请求的时候需要提交一个no-op entry? 问题类似于提交上一个term的entry,新leader并不知道先前term的entry是否已提交。需要append一个no-op entry,如果成功提交,那么表示在此之前的所有entry都是已提交了的。
  11. 配置变更时,从集群移除的server如果发起选举,会影响集群的可用性,为何不直接把离开集群的server关闭? \(C_{new}\)不会复制到那些离开集群的server,因此无法做到\(C_{new}\)提交以后,就立即下线这些server。在关闭前的这段时间里,这些server可能会影响集群的可用性。
  12. joint consensus过程中,选举和提交需要同时获得新旧配置的多数派,这对性能的影响有多大?
    • 在大多数不发生错误的情况下,获得新旧配置的多数派应该是一个比较快的过程。
    • 获得新旧配置的多数派仍然会比普通的commit要慢,但考虑到配置变更并不经常发生,所以这个代价可以忍。
  13. joint consensus过程中,选举和提交需要同时获得新旧配置的多数派是否是必须的? 是,这是为了确保安全性所必须的。在joint consensus关于安全性的讨论中,列举了如果leader失败,发生选取时的情况,除特殊的两个外,获得新旧配置多数派的要求避免了disjoint majority出现
  14. 配置变更时,新server是作为non-voting成员加入的,这个要求为何可以提升可用性? 当\(C_{old,new}\)提交以后,集群才可以继续处理请求。而\(C_{old,new}\)的提交需要新配置的多数派复制成功,空server会拖慢达这个过程。
  15. 离开集群的server发起投票会影响集群的可用性,为何不直接使用当前配置来判断,看发起请求的server是否在配置中?
  16. joint consensus的起止时刻是什么?
    • 开始:leader append \(C_{old,new}\)
    • 终止:
      • leader未成功提交\(C_{old,new}\)就挂了。
      • leader成功提交\(C_{new}\)
  17. 配置的entry是否可能被后续leader覆盖? 可能。如果前任leader未成功提交\(C_{old,new}\)就挂了。
  18. 如果log和创建的snapshot大小差别不大,那snapshot是否还有用(例如:k/v server大量插入新key)? 有。
    • 避免raft log entry一直占用内存。
    • 恢复服务时,使用snapshot可能会比直接使用log能更快(比如snapshot数据的组织方式更好)。
  19. InstallSnapshot会占用带宽不? 会,如果状态很大的话。可以用一些方式来减少InstallSnapshot RPC的调用,
    • 考虑让leader保留更久的log,来应对follower的lag或暂时的下线。
    • 只发送两个server diff的部分。
  20. follower的entry是否有可能不在收到的snapshot里面? 有可能,例如:leader尚未提交的entry。
  21. InstallSnapshot是否是原子的?如果在InstallSnapshot执行途中,follower挂了,重发InstallSnapshot是否ok? 是原子的、幂等的。

Lecture

性能问题

raft牺牲了性能来换取简洁的设计: 1. follower拒绝乱序的append,不允许日志有空洞。 2. 尚未支持batch或pipeline方式的append。 3. 对于大的状态,snapshot比较浪费。 4. 慢leader会影响系统的性能。

References

  1. Raft One-Server成员变更
  2. 一文看尽 Raft 一致性协议的关键点
  3. ongardie/raft-single-server-changes-safety
  4. bug in single-server membership changes
  5. raft/JOINT-CONSENSUS.md