0%

之前有记录过python Queue的使用,以及multiprocessing.Process模块。现在看看multiprocessing.Queue的具体工作方式(本文基于Python 3.7.4)。

multiprocessing.Queue定义在queues.py中,除此之外还定义了SimpleQueueJoinableQueue,是FIFO队列。

类似multiprocessing.Process,首先导入了context,判断当前系统类型,并相应地使用对应的实现。使用的multiprocessing.Queue()的时候,实际上是调用了context中的Queue()方法,设置了ctx

1
2
3
4
def Queue(self, maxsize=0):
'''Returns a queue object'''
from .queues import Queue
return Queue(maxsize, ctx=self.get_context())

SimpleQueue

SimpleQueue很简单,其实就是一个带锁的pipe,

  • 主进程和子进程分别使用各自的lock,实现写入pipe和读取pipe是并发安全的
    之所以put()get()可以使用不同的lock,是因为pipe两端的读写已经是并发安全的了。
  • multiprocessing.Pipe来实现消息传递,支持多读多写

关于os.pipemultiprocessing.Pipe

  • os.pipe:在Linux上底层访问的是传统的POSIX pipes,单向,使用encode/decode序列化
  • multiprocessing.Pipe:使用multiprocessing.Connection实现,在Linux上使用POSIX sockets完成数据发送,双向,使用pickle/unpickle序列化

Queue

Queue可以看做在SimpleQueue的基础上,增加了生产者端的发送buffer、支持设置队列大小,以及get()put()的无阻塞调用。

python queues

初始化

__init__()初始化了这么几个比较重要的变量,

  • _maxsize:队列最大size
  • _reader_writermultiprocessing.Connection实例,负责数据的收发
  • _rlockmultiprocessing.Lock,进程间共享,保证_reader.recv_bytes()并发安全
  • _wlockmultiprocessing.Lock,进程间共享,保证_writer.send_bytes()并发安全
  • _sem:队列长度信号量,计数器初始化为_maxsize
  • _notempty:条件变量,同步生产者放入_buffer_buffer中数据的发送
  • _buffer:每个进程有自己独立的buffer,线程安全,size其实是由_sem来控制
  • _thread:生产者数据发送线程

put

首先_sem.acquire(),计数器-1,如果不为零,说明队列还可以写入。

1
2
if not self._sem.acquire(block, timeout):
raise Full

然后获得_notempty的锁,如果发送线程未创建,则创建。追加元素至buffer后,_notempty.notify()通知发送线程。生产者中数据的发送由单独的线程完成,主线程只负责将数据放入buffer。

1
2
3
4
5
with self._notempty: # acquire保护_notempty和_thread的修改
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

feed线程

_start_thread()创建thread对象后,设置daemon属性为True,目的是随主线程的退出而退出,不必手动添加停止的逻辑。接着就启动线程。

具体发送时,不断等待buffer中有元素;如果buffer中有元素,popleft()并发送,直到发送完。

为何_notempty的锁在popleft()就释放了?

  1. buffer是collections.deque,本身就是并发安全的
  2. _notempty锁的目的其实是为了保护_notempty的修改,和put()中的目的类似,但并不是为了保护buffer
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    while 1:
    try:
    nacquire()
    try:
    if not buffer:
    nwait()
    finally:
    nrelease()
    try:
    while 1:
    obj = bpopleft()
    if obj is sentinel:
    debug('feeder thread got sentinel -- exiting')
    close()
    return

    # serialize the data before acquiring the lock
    obj = _ForkingPickler.dumps(obj)
    if wacquire is None:
    send_bytes(obj)
    else:
    wacquire()
    try:
    send_bytes(obj)
    finally:
    wrelease()
    except IndexError:
    pass
    except Exception as e:
    # ......

get

对于block=True的情况
类似SimpleQueue,一直等待_reader.recv_bytes(),直到收到数据。在_reader.recv_bytes()以后,_sem.release(),计数器+1,表示从队列消耗了一个元素。此时如果有进程调用_sem.acquire()并在等待,那么_sem.release()会唤醒其中一个等待的进程。

对于block=False的情况
由于不能阻塞,因此不能一直等待_reader.recv_bytes()

  1. 如果在timeout_rlock都没有获得锁,则返回Empty
  2. _reader.poll()(底层还是使用select来实现的)判断是否在timeout_reader可读,如果不可读,则返回Empty
  3. 最后_reader.recv_bytes(),并_sem.release()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def get(self, block=True, timeout=None):
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout): # 此时的timeout已经减去了等待_rlock.acquire的时间
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

序列化和反序列化

序列化和反序列化使用的是pickle来实现,那么如何判断消息的边界?Connection中定义了一个规则,在header存放长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _send_bytes(self, buf):
n = len(buf)
# For wire compatibility with 3.2 and lower
header = struct.pack("!i", n)
if n > 16384:
# The payload is large so Nagle's algorithm won't be triggered
# and we'd better avoid the cost of concatenation.
self._send(header)
self._send(buf)
else:
# Issue #20540: concatenate before sending, to avoid delays due
# to Nagle's algorithm on a TCP socket.
# Also note we want to avoid sending a 0-length buffer separately,
# to avoid "broken pipe" errors if the other end closed the pipe.
self._send(header + buf)

def _recv_bytes(self, maxsize=None):
buf = self._recv(4)
size, = struct.unpack("!i", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size)

close和join_thread

由于生产者启动了一个线程来负责发送,元素首先append到buffer,然后发送。在进程结束时,如何确保元素发送完毕的问题?

在启动feed线程的时候,创建了两个Finalize对象,_finalize_close_finalize_join,前者的优先级较高,并set了_close_jointhread。当进程退出的时候,会自动地先后调用这两个函数(基于atexit实现),

  1. _finalize_close:会append元素_sentinel = object()到buffer,feed线程如果看到_sentinel,会调用close()并停止服务
  2. _finalize_join:join feed线程
1
2
3
4
5
6
7
8
9
10
11
12
13
if not self._joincancelled:
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
exitpriority=-5
)

# Send sentinel to the thread queue object when garbage collected
self._close = Finalize(
self, Queue._finalize_close,
[self._buffer, self._notempty],
exitpriority=10
)

模块同样也提供了close()join_thread()方法,调用的其实就是_close_jointhread。如果先手动调用,然后aexit调用,那不会回有问题?首次调用后,都会从util模块的_finalizer_registry中移除,因此不会存在重复调用_finalize_close_finalize_join的问题。

deadlock

这几个stackoverflow(123)的问题很类似,在官方的文档中也有提及(python 2python 3),总结下来就是,join一个调用put的进程,且这个进程尚未把buffer中所有的元素写入pipe时,可能会导致死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import os
import multiprocessing

def work(q):
q.put('1'*10000000)

if __name__ == "__main__":
print(os.getpid())
q = multiprocessing.Queue()

p = multiprocessing.Process(target=work, args=(q,))

p.start()
p.join()

print(q.get())

原因是,Queue使用了os的pipe来进行数据的传输,而pipe的大小是有限的。如果数据过大,_writer.send_bytes()在写入数据到pipe的时候会阻塞。如果此时join这个子进程,那进程本身已经卡住了,join永远等不到进程结束。

JoinableQueue

JoinableQueue基于Queue实现,覆盖了put(),并新增了,

  • task_done():表示先前放入队列中的元素被取走了,由消费者调用
  • join():阻塞直到队列中所有元素都被取走

初始化

__init__()调用了Queue的初始化函数,并额外初始化了,

  • _unfinished_tasks:信号量,表示队列中当前未取走的元素
  • _cond:条件变量,同步join()task_done()

put

Queueput()基本一致。多出来的一点是,每次put()都会对_unfinished_tasks的计数器+1。

1
2
3
4
5
6
7
8
def put(self, obj, block=True, timeout=None):
# ...
with self._notempty, self._cond:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._unfinished_tasks.release()
self._notempty.notify()

task_done和join

task_done()_unfinished_tasks的计数器进行-1,如果计数器为0,则通知等待在join()的进程。

1
2
3
4
5
6
7
8
9
10
11
def task_done(self):
with self._cond:
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()

def join(self):
with self._cond:
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

Reference

  1. Python os.pipe vs multiprocessing.Pipe
  2. Daemon is not daemon, but what is it?
  3. Python 中的 multiprocess.Queue
  4. Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
  5. Process.join() and queue don’t work with large numbers
  6. Script using multiprocessing module does not terminate
  7. https://docs.python.org/3/library/multiprocessing.html#all-start-methods
  8. Joining multiprocessing queue takes a long time

之前有记录过python进程间通信的几个方式,现在来看看这个模块的具体的是怎样工作的(本文基于Python 3.7.4)。

multiprocessing.Process典型的使用方式为,

1
2
3
4
5
6
7
8
import multiprocessing

def f(name):
print('hello', name)

p = multiprocessing.Process(target=f, args=('world',))
p.start()
p.join()

以这段代码为例,看看进程的执行过程。

python process

import

import multiprocessing,导入multiprocessing的时候,做了一些初始化的工作。

  1. multiprocessing的包导入了context,这个模块主要是判断当前系统类型,并相应地使用对应的实现。在sys.platform != 'win32'的系统上,例如:mac和linux,默认使用fork来创建子进程。
  2. context模块中,
    • 导入了process
    • 定义了Process类,这个类继承自process.BaseProcess,覆盖了process.BaseProcess_Popen方法,目的是根据系统类型,在初始化的时候根据系统类型,选择相应的Popen实现
  3. process模块中,
    • 导入了util,注册了_exit_function,被注册的函数会在解释器正常终止时执行(子进程主动调用,主进程自动调用)
    • 此时在父进程里,初始化了几个process模块的全局变量,
      • _current_process:代表当前进程
      • _process_counter:进程数计数器
      • _children:子进程的集合

创建Process实例

p = multiprocessing.Process(target=f, args=('world',)),实际上是调用process.BaseProcess__init__()方法进行初始化,并使用了process模块的全局变量初始化实例变量,以及获取当前进程的pid,

  • count = next(_process_counter)
  • self._identity = _current_process._identity + (count,)
  • self._config = _current_process._config.copy()
  • self._parent_pid = os.getpid()

启动

p.start()创建子进程,首先做了几个检查,

  1. 如果已经创建了子进程,那么不能再次创建
  2. 只能start由当前进程创建的Process实例
  3. 不允许创建daemon进程的子进程

接着_children中清理当前已经结束的进程,然后调用self._Popen(self)开始创建子进程。使用os.fork()创建子进程,用法和posix的fork一样,不多说。要注意的一点是,调用os.pipe()创建了parent_rchild_w,而parent_r将会被用于join()的实现。

子进程中,执行_bootstrap()进行初始化和运行target

  1. 初始化了process模块的全局变量,_current_process_process_counter_children
  2. 清空util._finalizer_registry,执行util._run_after_forkers()
  3. run()运行target
  4. util._exit_function()做进程结束后的收尾工作
    • util._exit_function()会调用_run_finalizers(),这个函数会把优先级高于minpriorityfinalizer执行一遍
      1
      2
      3
      4
      5
      6
      7
      def _run_finalizers(minpriority=None):
      '''
      Run all finalizers whose exit priority is not None and at least minpriority

      Finalizers with highest priority are called first; finalizers with
      the same priority will be called in reverse order of creation.
      '''
    • 默认情况下子进程的_finalizer_registry是空的,没有任何的finalizer会被执行,但可以通过multiprocessing.util.Finalize手动的进行注册,来完成一些收尾工作,例如:关闭db连接。
      1
      2
      3
      4
      5
      6
      7
      class Finalize(object):
      '''
      Class which supports object finalization using weakrefs
      '''
      def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
      # ......
      _finalizer_registry[self._key] = self

join

创建子进程的时候,也创建了pipe parent_r,并set self.sentinel = parent_r,且关闭了child_w。此时唯一打开child_w的进程是子进程,唯一打开parent_r的是主进程。主进程调用join()时,实际上是等待parent_r变为可读状态(wait返回)。

1
2
3
from multiprocessing.connection import wait
if not wait([self.sentinel], timeout):
return None

那么何时wait返回?wait循环调用select,当parent_r ready的时候,wait返回,

  • parent_r可读
  • 所有writing side被关闭

当子进程结束的时候,os会关闭这个进程关联的所有fd,又因为主进程已经关闭了child_w,所以此时parent_r ready,主进程中的join()也就返回了。

Reference

  1. multiprocessing — Process-based parallelism
  2. Why should you close a pipe in linux?

介绍

文章主要使用实验验证了raft是否如原论文所阐述的易于理解和实现。重新阐述了raft,使用OCaml实现了raft,开发了一个事件驱动的模拟器来进行测试,重现了raft原论文中的测试,并提出了几个优化。

性能

对于raft的性能,raft原论文中提到了两点,

  1. 典型的情况下,leader复制entry的时间。
  2. 最坏情况下,leader失败后,选举出新leader的时间。
    1. 选举能否快速收敛?
    2. 集群能达到的最小下线时间是什么?

典型的情况下,leader复制entry的时间

需要耗费一个rtt来复制到多数派server,可以做batching和piplining优化。

最坏情况下,leader失败后,选举出新leader的时间

关于这点,raft原论文中使用5个server的集群做了两个测试。为了模拟最坏情况,

  • 每次测试server都有不一样长度的log,使得有的candidate不能成为leader。
  • 为了制造易于出现split vote的环境,在每次终止leader前,都同步地广播heartbeat,目的是重置election timeout。

-w302

两个测试表明了:

  1. election time中加入少量的随机,能够明显的减少选举新leader的时间,减少split vote的出现。
  2. 集群的最小下线时间随着election time的减少而减少,但如果少于broadcast time,那么会集群产生不必要选举,降低可用性。

优化

本文中提出了几个优化,

  1. Different Follower and Candidate Timers
  2. Binary Exponential Backoff
  3. Combined

-w293

Different Follower and Candidate Timers

raft原论文建议$\textit{candidate timeout} = \textit{follower timeout} ∼ U(T, 2T), T=150ms$,但在高竞争的情况下,例如:$U(150ms, 151ms)$,将两个时间设置在不同的范围,可以大大减少选举leader的时间。

Binary Exponential Backoff

类似tcp超时重传的指数回退。

Readings - In Search of an Understandable Consensus Algorithm (Extended Version) (Section 6 to end)论文

集群成员变更

实际应用中,常常会有配置变更的需求,即:成员变更。手动的方式有下面两种,

  • 把集群整体下线,配置修改完毕以后再上线是可行的,但会造成服务不可用。
  • 新server可以通过获取其ip来替换集群成员,需要保证被替换的server不会再加入集群。

但这两个方式都有明显的弊端,且任何手动的步骤都有引起错误的可能。

配置切换需要保证安全性,在同一个term内,集群不能够同时存在两个leader。由于无法一次性原子的切换所有server的配置,一次增减多个server并直接切换配置可能会出现disjoint majorities的情况。

raft变更的方案有两种:

  1. single-server change
  2. 使用joint consensus

single-server change

每次增减一个server。

配置的变更
具体变更过程如下,

  1. leader收到变更请求,AppendEntries RPC按新配置发送$C_{new}$。
  2. 每个server收到$C_{new}$后立即生效
  3. 新配置下,$C_{new}$复制到大多数server,则达成committed。
    • 此时,就算有剩下的server未得到新配置,也不会构成多数派,
    • 且,未得到新配置的server也不被选举为leader。

$C_{new}$提交后,

  1. leader可以响应client,告知本次配置变更已经完成。
  2. 如果配置是移除一个server,那么这个server可以下线了。
  3. 可以开始下一次配置更新。

安全性
-w543

总共有四种情况:

member change 变更后达成disjoint majorities的条件
奇数个成员,增加一个 2k+1 -> 2k+2 old = k+1, new = k+2
奇数个成员,减少一个 2k+1 -> 2k old = k+1, new = k+1
偶数个成员,增加一个 2k -> 2k+1 old = k+1, new = k+1
偶数个成员,减少一个 2k -> 2k-1 old = k+1, new = k

任意一种情况对应的条件都是不可能同时达成的,因为要求的成员数目都大于真正的成员数目,不会产生同一个term两个leader的现象。换句话说,旧配置集群与新配置集群的任意多数派必然有交集,即:至少存在一个voter(接受旧leader的$C_{new}$,并且为新leader投票),不会出现disjoint majorities。

因此增减一个server情况,直接切换配置是安全的。

这个交集也保证了在变更配置的过程中,在$C_{old}$中、以及变更期间复制的日志,最后一定会出现在$C_{new}$。

何时开始下一次变更
能够开始下一次配置更新的前提是当前的配置已经commit,否则无法保证安全性。如果server在$C_{new}$commit以后才使用$C_{new}$,会带来很多不必要的、额外的维护工作,

  1. leader很难知道旧配置集群的多数派使用$C_{new}$的时间。
  2. 需要跟踪哪些server知道了commit,且做持久化。但这些是raft本身不具备的功能。
  3. 如果leader改变了,那么需要移除$C_{new}$的entry,此时,server还需要准备回滚到上一个配置。

majority的是对谁而言的
对于选举和append entry,都是仅由调用方来判断是否达成多数派,接收方不负责,否则会存在类似“鸡生蛋蛋生鸡”的问题。

可用性
配置变更给保证集群的可用性带来了几个问题。

  1. Catching up new servers
    -w517

    一个新server加入集群,新server通常并不包含任何entry,那么可能需要花费较长的时间来同步日志。在这段时间,集群更容易出现不可用的问题。例如:3->4,此时要求的majority是3,但是s3挂了。

    为了最小化不可用的出现,需要保证不可用的时间在一次election timeout内

    -w530

    具体方法是,

    • 新加入的server先作为non-voting成员。
    • 复制到新server的过程分为多个round,每个round都复制leader所有的entry。
    • 当前复制的round内,leader可能又有新的entry了,下一个round会进行复制。
    • 在固定round内(例如:10),如果最后一个round的时间 < election timeout,此时假设不存在更多的entry会导致明显的不可用,添加新server。
    • 否则leader终止变更配置。
  2. Removing the current leader
    如果使用joint Consensus,或没有leadership transfer的情况下,需要一个leader下线的方法:旧leader等到$C_{new}$ commit以后让位(转变为follower状态)。

    在commit之前,当前leader管理集群不包含leader自己,复制和投票的时候不把自己算入majority。

  3. Disruptive servers
    被排除在$C_{new}$之外的server,由于不再收到heartbeat,会不断的发起投票。虽然新选出的leader始终会出现在$C_{new}$中,但是这干扰了集群正常的工作。

    第一个思路是引入一个Pre-Vote阶段,在发起选举前,检查自己是否有成为leader的资格,即:candidate的log比大多数server更新。但并不总是有效。例如:{ABCD}->{ABC}的时候,A是Leader,在尝试复制$C_{new}$到BC的时候,D可能发起了Pre-Vote,D的log相对于BC足够新,可以获得BC的投票成为leader。因此检查log的方式是不可行的。

    raft使用的方式是,如果一个server获得上一次heartbeat的时间在最小election timeout内,这个server收到RequestVote时就不更新term或投票。

    如果确实需要发起选举,例如:进行leadership transfer的时候,可以用一个标志位来区分。

bug in single-server change
如果配置变更是在同一个term内完成的,那么不会有问题。但如果出现在跨term且并发的配置变更,就不一定了。

例如先后增减一个server,具体过程如下,
-w630

2中,s1把D复制到s1和s5然后挂了,接着s2接受s2、s3、s4的投票(使用C判断majority)成为term2的leader。5中,s2把E复制到s2和s3,并标记为committed(使用E判断majority)。然后s1恢复,接受s1、s4、s5的投票(使用D判断majority)成为term3的leader,继续复制D,最后在7中覆盖已提交的E。

这个问题类似提交上一个term的entry,解决方法是一样的,leader当选以后,直到当前term的entry提交以后,才能开始下一次配置变更。可以通过append一个no-op entry来实现。

原文的single-server change保证了,在同一个term内不会出现未提交的configuration entry。这个patch保证了,来自先前term未提交的configuration entry永远不会被提交。

回到前面的例子,3中s2成为term2的leader以后立即append no-op entry,此时使用C判断majority,假设复制到s2、s3、s4的index 2。接着s2继续把E复制到s2和s3。如果接下来s1恢复并发生了选举,s1不可能成为leader,因而避免了已提交的E被覆盖的情况。

使用joint consensus

这个方法并不建议在工程中使用,更简单的single-server change足以将集群变更为任何期望的配置。

-w344

joint consensus
joint consensus状态混合了新旧配置,允许每个server在不同的时间安全地切换配置,且在这个过程中能持续提供服务,这个状态中,

  • entry会被复制到所有新旧配置。
  • 来自任何配置的机器都可以被选举为leader。
  • 选举和append的majority,需要分别来自新旧配置。

相比single-server change,joint consensus引入了一个中间的entry $C_{old,new}$,具体过程是,

  1. 将新旧配置存储到$C_{old,new}$,并复制,进入joint consensus状态。
  2. 每个server收到$C_{old,new}$后立即生效,leader使用$C_{old,new}$来判断是否提交。
    $C_{old,new}$复制的过程中,如果leader挂了,那么新的leader可能在$C_{old,new}$或$C_{old}$中选举出。无论leader来自哪个配置,$C_{new}$不能单方进行决策。
  3. $C_{old,new}$提交后,leader可以复制$C_{new}$。
    一旦$C_{old,new}$提交,$C_{new}$或$C_{old}$都不能单方进行决策。
  4. leader使用$C_{new}$来判断是否提交,提交后,完成配置变更。

安全性
在joint consensus过程中,发生选举时,可能从以下情况选出leader(按joint consensus的步骤顺序列举),

  1. 来自$C_{old}$,log不包含$C_{old,new}$。
  2. 来自$C_{old}$,log包含$C_{old,new}$。
  3. 来自$C_{new}$,log包含$C_{old,new}$。
  4. 来自$C_{new}$,log包含$C_{new}$。

而任何两个leader的组合都是不可能同时出现的。

leader组合 不可能出现的原因
1+1 or 4+4 选举规则限制
1+2 先看2的选举,需要分别来自新旧配置的多数派,此时已经不能再从$C_{old}$中选举1
1+3 类似1+2
1+4 a. 先看4,既然$C_{new}$出现了,那么$C_{old,new}$肯定提交了,这个提交需要分别来自新旧配置的多数派,因此$C_{old}$中不包含$C_{old,new}$的server无法选举为leader
b. $C_{new}$是在$C_{old,new}$提交后才复制,如果选举出4,1就不会存在
2+2 类似1+2
2+3 类似1+2
2+4 类似1+2
3+3 类似1+2
3+4 类似1+2

因此不会出现disjoint majorities的情况。

是否受single-server change的bug影响
不受。

日志压缩

raft的日志随着客户端不断的请求增长。一旦entry已经提交并执行,那么中间的状态和操作就不再需要,可以被压缩。

文章讨论了几种进行日志压缩的方法,

  • Snapshotting memory-based state machines
  • Snapshotting disk-based state machines
  • Incremental cleaning approaches
  • Leader-based approaches

这几个方法有一些核心概念基本都是相通的,

  1. 每个server独立的负责日志压缩,而非由leader集中决定。
  2. raft向状态机转移维护prefix of the log的职责。
  3. raft丢弃部分日志前缀a prefix of the log后,状态机会承担两个新的职责
    1. 如果server重启,状态机在apply entry前,需要先load那些被丢弃的日志。
    2. 为了落后较多的server或新server能够追上,状态机可能需要输出一个状态的镜像。

Snapshotting memory-based state machines

适用于状态机的数据是存放在内存的情况。每个server独立的创建已经提交entry的snapshot。主要过程是,

  • 状态机序列化当前状态。
  • 一旦状态机完成snapshot的写入以后,日志就可以被截断了,raft首先保存snapshot中lastIncludedIndex和lastIncludedTerm,以及这个index对应的lastIncludedConf。
  • raft可以丢弃截止index的entry和先前的snapshot。

-w318

InstallSnapshot
为了落后较多的server或新server能够追上,这个方法里使用InstallSnapshot来实现。leader仅当丢弃了需要复制的next entry的时候,才发送snapshot,snapshot以chunks的形式有序发送。

并发创建
创建snapshot时,状态机需要维持一个不变的状态,但进行序列化和落盘需要较长的时间,因此创建的过程需要与普通操作并发执行。可以使用copy-on-write实现,有两种方法,

  1. 状态机使用不可变数据结构。
  2. 依赖os的copy-on-write支持,例如:linux的fork。

copy-on-write占用额外的内存,在创建的过程中,占用的额外内存与状态的修改成正相关,因此需要事先计划和管理。如果在snapshot的过程中,内存满了,那么server只能暂停服务,此时集群可能还是可用的。最好不要终止稍后重试,下次创建的时候很可能还会有类似的问题。

何时创建
如果创建的过于频繁,会浪费磁盘带宽和其他资源,如果过于稀少,会导致创建出过大的snapshot,增加传输和回放的时间。

有这么几个判断的方法,

  • 如果size(log)明显大于一个预定的值。
    当这个值明显大于snapshot的大小时,磁盘写入开销会很小。但对于较小的状态机,需要等待较长的时间才会有满足大小要求的log。
  • 如果size(log)大于size(snapshot)的倍数。
    不过判断当前状态机的snapshot大小并不容易。
  • 如果size(log)大于size(prev snapshot)的倍数,expansion factor。
    expansion factor控制了磁盘带宽的开销。

还可以仅在少数派server上创建snapshot,不影响服务client。

Client交互

查找cluster

如果配置固定,这个过程很简单。难点在于成员不断变更的情况,可用的方法有,

  • 广播,但受限于特定的网络环境。
  • 使用外部的目录服务,例如:DNS。需要在变更的过程中增减相应的server。

路由请求到leader

client的请求是由leader处理的,因此client需要找到leader,可以随机的选取一个server发起请求,如果不是leader,server拒绝,client重试直到找到为止,尝试次数期望是$(n+1)/2$。在此基础上可以做一些优化,

  • server拒绝的时候返回leader。
  • server做代理,转发请求到leader。

还需要避免过期的leader信息导致处理client请求的时候产生不必要的延迟,

  • leader:如果产生网络分区,且client向拥有少数派的leader发送了请求,在分区恢复前,这个请求一直都无法得到处理。因此当超过election timeout以后,leader都没有向多数派成功的发送心跳,那么leader让位
  • follower:如果follower发起新的选举或者term变更,那么follower丢弃当前维护的leader信息。
  • client:当丢失与某个server的连接,应该随机选取一个server进行重试。

线性一致性

截止目前,raft提供了at-least-once的语义。client重试、以及网络导致请求重复会导致命令被执行多次。但是at-least-once对于一个基于共识的系统是不够的,raft需要可线性化的语义,通过对请求去重,可以实现这一点。

使用session去重
每个client分配一个唯一id,每个请求分配一个唯一的递增序号。server维护每个client的session,这个session跟踪了每个client的最新序号和对应的response,如果收到了一个已经处理过的序号,那么直接返回。

这样每个命令就做到了以log中第一次出现的顺序立即生效且只执行一次。

对于来自同一个client的并发请求,server维护一个<序号,response>的集合。每个请求中携带一个client未收到的最小序号,server丢弃小于这个序号的response。

session保存的多久
受存储的限制,session不能永久保存,server需要对何时过期session达成共识。

  • 一个方法是设置存储session数的上限,并使用lru淘汰session。
  • 另一个方法是基于对时间源达成的共识来淘汰session(原文中的描述不是非常清晰,待补充)。

处理session过期的client请求
当session过期后,client还继续操作时,这被看做异常情况(待补充)。

更高效的处理read-only请求

raft日志的目的是以相同的顺序把变更复制到server上,并保证读写时候的线性一致性语义。read-only命令只涉及查询状态机,可以绕开日志的复制,避免同步磁盘写入,会大大的提升性能。但如果没有额外的控制,client会读取到过期的值。

为了使得绕开raft日志的read-only请求保持线性一致性,针对每次read-only请求,leader需要,

  1. 如果当前term还没有提交过entry,等待直到有。如果是刚成为leader,则需要先提交一个no-op entry。
  2. leader将当前的commit index记录到本地变量readIndex,这个会作为read-only操作的下界。
  3. leader需要自己的身份是有效的,不存在在自己不知情的情况下(网络分区)选举出了新的leader。这里的方法与路由请求到leader中避免过期的leader类似,如果成功向多数派发送了heartbeat,那么leader可以知道在发送heartbeat的时候,身份仍然是有效的。
  4. leader等待lastApplied >= readIndex,此时的readIndex是能保证线性一致性的最小index
  5. leader向自己的状态机发起查询请求,并返回结果。

优化leadership确认
每次查询请求都需要执行3,可以把所有累计的查询通过一次heartbeat来确认leadership。

follower分担read-only负载
同样需要保证不读取到过期的数据,保证线性一致性。为此follower可以向leader发送一个查询当前readIndex的请求,然后leader执行上面的1-3,follower执行4-5。

使用时钟减少heartbeat带来的延迟
虽然有batch优化,read-only查询仍然需要做一次heartbeat来确认leadership,可以用时钟来避免heartbeat带来的延迟。

-w262

  • leader使用heartbeat来维持一个lease,如果leader成功向多数派发送了heartbeat,那么leader可以认为在接下来的election time时间内都不会有新的leader产生,这个lease可以扩展到$start+\frac{\textit{election timeout}}{\textit{clock drift bound}}$,在这个时间之前都不用执行上面的步骤3。
  • 在进行leadership transfer的时候需要将lease主动过期,因为会导致更早的产生新leader。

要注意的是,使用lease的方式假设了server之间时钟漂移的上界(在给定的一段时间内,没有server的时钟增加的时间会超过这个上界),找到并维护这个值会增加额外的运维成本。如果假设失效了,系统可能会返回任意过期的信息。

可以使用一个的扩展来增强对client提供的保证,即使上述假设失效的情况下,读操作满足线性一致性,不至于错的离谱。具体方法是,

  1. server返回client的时候,带上状态机状态对应的index。
  2. client跟踪自己看到的与结果对应的最新index,发送请求的时候带上这个index。
  3. 如果server收到的index > lastApplied,那么server暂时不处理这次请求。

问题

  1. 创建snapshot的时候,有什么限制?
    • 不能丢弃未提交的和未执行的
    • 已执行的entry可能需要用于使其他server更新
  2. snapshot和log的关系?
    snapshot反映的是已经执行的log。
  3. server持久化了哪些数据在磁盘上?
    • 截止到某个entry的snapshot + 后续的log = server完整的log。
    • 其他状态信息,如:currentTerm,votedFor等。
  4. 如果某个follower落后了,同时leader丢弃了follower的所需的log,怎么办?
    nextIndex将无法会退到那个entry,leader会使用InstallSnapshot RPC。
  5. leader何时会向落后follower发送InstallSnapshot RPC?
    上面的问题即为答案。
  6. 为何leader不仅仅丢弃所有follower都有的entry?
    • 每个server是独立创建snapshot的。
    • 少数落后或失败的follower会导致leader log的持续增加。
  7. snapshot包含什么信息?
    • term
    • lastIncludedIndex
    • lastIncludedTerm of lastIncludedIndex
    • lastIncludedConf at lastIncludedIndex
    • snapshot data
  8. follower InstallSnapshot RPC的流程是什么?
    • 检查term
    • 检查是否已包含lastIncludedIndex/lastIncludedTerm
    • set lastApplied = lastIncludedIndex,写入data
    • 使用snapshot重置状态机
  9. server收到InstallSnapshot RPC以后,有没有可能会导致状态机的状态回退?
    不会。follower会检查是否已经包含lastIncludedIndex/lastIncludedTerm。
  10. 为什么在处理read-only请求的时候需要提交一个no-op entry?
    问题类似于提交上一个term的entry,新leader并不知道先前term的entry是否已提交。需要append一个no-op entry,如果成功提交,那么表示在此之前的所有entry都是已提交了的。
  11. 配置变更时,从集群移除的server如果发起选举,会[影响集群的可用性](#single-server change),为何不直接把离开集群的server关闭?
    $C_{new}$不会复制到那些离开集群的server,因此无法做到$C_{new}$提交以后,就立即下线这些server。在关闭前的这段时间里,这些server可能会影响集群的可用性。
  12. joint consensus过程中,选举和提交需要同时获得新旧配置的多数派,这对性能的影响有多大?
    • 在大多数不发生错误的情况下,获得新旧配置的多数派应该是一个比较快的过程。
    • 获得新旧配置的多数派仍然会比普通的commit要慢,但考虑到配置变更并不经常发生,所以这个代价可以忍。
  13. joint consensus过程中,选举和提交需要同时获得新旧配置的多数派是否是必须的?
    是,这是为了确保安全性所必须的。在joint consensus关于[安全性的讨论中](#使用joint consensus),列举了如果leader失败,发生选取时的情况,除特殊的两个外,获得新旧配置多数派的要求避免了disjoint majority出现
  14. 配置变更时,新server是作为non-voting成员加入的,这个要求为何可以提升可用性?
    当$C_{old,new}$提交以后,集群才可以继续处理请求。而$C_{old,new}$的提交需要新配置的多数派复制成功,空server会拖慢达这个过程。
  15. 离开集群的server发起投票会影响集群的可用性,为何不直接使用当前配置来判断,看发起请求的server是否在配置中?
  16. joint consensus的起止时刻是什么?
    • 开始:leader append $C_{old,new}$。
    • 终止:
      • leader未成功提交$C_{old,new}$就挂了。
      • leader成功提交$C_{new}$。
  17. 配置的entry是否可能被后续leader覆盖?
    可能。如果前任leader未成功提交$C_{old,new}$就挂了。
  18. 如果log和创建的snapshot大小差别不大,那snapshot是否还有用(例如:k/v server大量插入新key)?
    有。
    • 避免raft log entry一直占用内存。
    • 恢复服务时,使用snapshot可能会比直接使用log能更快(比如snapshot数据的组织方式更好)。
  19. InstallSnapshot会占用带宽不?
    会,如果状态很大的话。可以用一些方式来减少InstallSnapshot RPC的调用,
    • 考虑让leader保留更久的log,来应对follower的lag或暂时的下线。
    • 只发送两个server diff的部分。
  20. follower的entry是否有可能不在收到的snapshot里面?
    有可能,例如:leader尚未提交的entry。
  21. InstallSnapshot是否是原子的?如果在InstallSnapshot执行途中,follower挂了,重发InstallSnapshot是否ok?
    是原子的、幂等的。

Lecture

性能问题

raft牺牲了性能来换取简洁的设计:

  1. follower拒绝乱序的append,不允许日志有空洞。
  2. 尚未支持batch或pipeline方式的append。
  3. 对于大的状态,snapshot比较浪费。
  4. 慢leader会影响系统的性能。

References

  1. Raft One-Server成员变更
  2. 一文看尽 Raft 一致性协议的关键点
  3. ongardie/raft-single-server-changes-safety
  4. bug in single-server membership changes
  5. raft/JOINT-CONSENSUS.md

数据密集型应用

现在的很多应用是数据密集型的,数据是这些应用的主要挑战-数据的总量、数据的复杂度和数据变化的速度。

很多数据密集型的应用都是基于已有的数据系统提供的常用功能来构建的。例如:

  • 存储数据,以便自己或其他应用能够找到(基于数据库实现此功能)
  • 记住一个昂贵操作的结果,来加速读取(缓存)
  • 允许用户通过关键词搜索数据,或多种方式过滤数据(搜索索引)
  • 发送消息到另一个进程进行异步处理(流式处理)
  • 定时处理大量累积的数据(批处理)

由于数据系统的抽象,这些功能都看似很简单。但是,数据系统在逐渐变得相似,不同的数据系统可能同时具有多种特性,例如:Redis的cache和消息队列功能;应用程序越来越宽泛的需求使得单一数据系统无法完成,需要使用程序代码来组合不同的数据系统;同一需求,可能有多种方式和数据系统来实现。因此选择合适的数据系统和权衡架构的设计是一个值得思考的问题。

设计数据系统的原则

  • 可靠性(Reliability)
    就算出现问题的时候(硬件或软件错误,人为错误),系统都应该应该持续的正确工作(在期望的水平上提供正确的功能)。
  • 可扩展性(Scalability)
    随着系统的增长(数据量、流量或数据复杂度),应该有合理的方法来应对这些增长。
  • 可维护性(Maintainability)
    随着时间的推移,许多人都会参与系统相关的工作(开发和运维,他们保证系统现有行为正常、并使系统能够应对新的情况),他们应该高效的工作。

可靠性

系统具有容错性(fault-tolerant或resilient)。系统在遇到特定错误的时候,也能按预期正常工作。

这里原文特别区别了fault和failure。fault指的是系统组件的运作偏离了预期,而failure指的是系统整体无法提供服务。设计一个完全没有fault的系统是不可能的,但是可以通过设计fault-tolerant机制来避免fault导致failure。
下面,failure会写为系统故障。

硬件故障
每个硬件都有预期的寿命,系统使用的硬件规模够大、运行时间够长时,硬件总会出故障。

通过软件和硬件层面冗余的方式,可以避免出现硬件故障时无法提供服务。

软件错误
硬件错误的发生相对独立,但是软件错误更加难以预期,往往会导致很多的系统错误。

通过完善的考虑系统的假设和交互、测试、进程隔离、监控、允许进程挂了后重启等方式来避免软件错误。

人为错误
系统的设计、构建和运维是由人来进行的,但人是不可靠的。

通过如下方式来减少人为错误:

  • 以最小化错误机会的方式来设计系统。例如:设计良好的抽象、API和管理界面来避免“做错误的事”。
  • 解耦人们最容易犯的错和犯错的地方。例如:提供完整功能的非生产环境sanbox。
  • 完善的测试。例如:从单元测试到整个系统集成测试。
  • 提供快速和简单的错误恢复机制。例如:快速的回滚配置,上线新代码应逐步从小范围内到大范围,提供重新计算数据的工具来修复老数据错误。
  • 详尽和清晰的监控。例如:性能计数和错误率。

可扩展性

即便一个系统现在能够可靠的工作,这不意味着未来也能。一个常见的原因就是负载的增加。当讨论可扩展性时,常常需要考虑,“如果系统以一个特定的方式增长,应该怎么办?”,“如何增加计算资源来应对额外的负载?”。

描述负载
首先需要简洁的描述负载,负载可以使用几个数字来描述,叫做负载参数(load parameters),参数的选择依赖于系统的架构,例如:

  • web服务器每秒的请求个数。
  • 数据库的读写比例。
  • 聊天室同时活跃的用户数。
  • cache命中率。

描述性能
一旦有了系统负载的描述,那么可以讨论负载增加时会发生什么,可以有两个角度:

  • 负载增加时,如果保持系统资源不变,系统的性能会怎样?
  • 负载增加时,为了保持系统性能不变,需要增加多少资源?

讨论这两个问题需要描述性能,

  1. 吞吐量(批处理系统)
  2. 响应时间(在线系统)
    • 平均响应时间。
    • 百分比响应时间。
      例如:p50,取time的中位数,如果是200ms,那么代表50%的响应时间小于200ms。类似的还有p90,p99。
    • 高百分比响应时间,又叫做尾部延迟(tail latencies)。

性能与可用性一起被用在SLOs(service level objectives)和SLAs(service level agreements)中,规约定义了服务期望的性能和可用性。

高百分比响应时间常常受队列延迟(queueing delay)的影响,少量慢请求会阻塞后续请求的处理,这个现象又叫做head-of-line blocking。

如果一个请求需要进一步使用更多的后端调用来完成,那么一个较慢的调用就会拖慢整个请求,这叫做尾部延迟放大(tail latency amplification)。

应对负载的方法

  1. scaling up
    垂直缩放,迁移到性能更强的机器。
  2. scaling out
    水平缩放,把负载分布到多个小机器上。也叫做share-nothing architecture。

实际工程上可能会混用两种方法,几个性能较强的机器可能比非常多的小机器来的划算。把无状态的服务分布到多个机器较为容易,但是把一个有状态的数据系统分布式化会引入很多额外的复杂性。

大规模分布式系统架构往往是针对应用高度定制化的,并不存在一个通用的分布式架构。因为要解决的问题可能是读为主、写为主、大量数据的存储为主、数据的复杂度、响应时间、访问方式,或者是前面各种因素的混合。

可维护性

软件开发的主要成本并不在最初的开发,而是持续的维护-修bug、运维、排错、兼容性的平台等。主要关注以下方面,

  1. 可操作性,便于运维团队的维护。
    需要提供:
    • 完善的监控
    • 自动化和集成工具
    • 避免依赖特定的机器
    • 文档
    • 良好的默认行为,并提供修改默认行为的方法
    • 自我恢复,并提供手动操作的方法
    • 可预测的行为
  2. 简单,管理复杂性,便于其他开发者理解系统。
    好的抽象可以隐藏复杂的细节。
  3. 可进化,便于修改和增加系统功能,又叫做可扩展性(extensibility)、可修改性(modifiability)、可塑性(plasticity)。
    增加数据系统的敏捷性。

Readings - In Search of an Understandable Consensus Algorithm (Extended Version) (to end of Section 5)论文

Introduction

共识算法(Consensus algorithms)允许一组机器作为一个一致的组工作,这个组可以在某些成员失败的情况下存活。Paxos是过去10多年最常被讨论的共识算法,但是难以理解且不便于实现。提出Raft的主要目标是可理解性。通过解耦leader选举、log复制和安全,以及减少状态空间,来增加可理解性。

多副本状态机(Replicated state machine)与共识算法
共识算法常常出现在多副本状态机的讨论中。

-w384

多副本状态机常用于解决分布式系统中的各种容错问题,使用复制log来实现,每个状态机以相同的顺序执行相同的命令,最终算出相同的状态和结果。保证replicated log的一致是由共识算法来实现的。

共识算法一般有以下几个特征:

  • 安全:非拜占庭条件(网络问题、丢包、乱序)下,不返回任何错误的结果。
  • 可用性:大多数server可用,则系统整体可用。
  • 不依赖时间来保证日志的一致性。
  • 大多数server执行完毕则返回,少量慢server不影响系统性能。

Raft

raft的工作过程是,首先选举leader,leader会负责管理replicated log。leader接受来自client的log,复制到其他机器,并在安全的时候通知其他机器把log输入状态机。

raft可以解耦为以下三个部分:

  1. leader选举(Leader election)
  2. 日志复制(Log replication)
  3. 安全(Safety)

在理解这三个部分前,有几个基本概念需要知道,
server 状态

  • leader:只能有一个leader
  • follower:只响应来自其他server的请求
  • candidate:用于选举

-w433

时间
raft把时间分为terms,term由一个递增的数字标识,表示current term。每个term以leader选举作为起始,如果未选举出leader,那么下一个term继续选。term内选举出的leader管理集群直到term结束。

-w351

不同的server可能观察到不同的term转换,server使用current term(在server通信的时候发送)作为逻辑时钟,来确保能够探测到过期的信息。

通信
使用rpc。基本的共识算法使用两种rpc,

  1. RequestVote RPCs:candidates调用,来进行拉票,用于选主。
  2. AppendEntries RPCs:leader发起,用于复制日志和心跳。

leader选举(Leader election)

选举过程为,follower在election timeout内都没有收到leader的通信,则转换为candidate状态,并自增当前term、为自己投票、向其他server发送RequestVote RPCs。candidate维持在这个状态,直到下列情况出现:

  1. 获得了相同term的多数派的投票,赢得选举。
    一旦candidate赢得选举,就转换为leader,并向其他server发送heartbeat,宣告leadership。
  2. 收到AppendEntries RPCs,其他candidate赢得了选举,成为leader。
    1. 如果leader的term >= 自己的term,那么这个leader是合法的,自己转换为follower状态。
    2. 如果leader的term < 自己的term,那么拒绝这个rpc,自己维持在candidate状态。
  3. election timeout后,未获得足够的选票。
    多个candidate可能同时发起选举,发生了split vote,导致任何一个都无法获得多数派的投票。自增term,开始下一次选举。

在下面的状态图中,状态转换的上方表示触发转换的event,下方表示发生转换时执行的action。

leader election

选举安全性:

  • 在每个term内,每个server只能至多为一个candidate投票。

在选举过程中,raft使用随机election timeout来保证,

  • split votes(没有任何candidate能获得大多数投票)会很少出现。
    比较常见的是,C先timeout,然后投票给自己,并发送RequestVote RPCs,最终win。
    -w397
  • 且split votes能够被快速解决。如果出现split votes,candidate先等待随机election timeout。

日志复制(Log replication)

如何复制

  1. leader追加log entry。
  2. AppendEntries RPC。
  3. 当log entry被安全复制(成为committed log entry,leader更新commit index),
    • leader apply log entry,然后返回结果到client。
    • leader AppendEntries RPC通知follower committed entry,follower apply log entry。

如果follower失败、运行缓慢、丢包,leader重试AppendEntries RPC(就算leader已经返回client结果),直到所有follower最终收到所有log entry。raft保证了committed entry是持久化的,且最终能被所有可用的状态机执行。

那什么时候才算log entry被安全复制?当创建log entry的leader把entry复制到大多数server上以后。这同时也提交了在leader中所有先前的log,包括被前leader创建的log。

应对出错
raft在复制log时会维护如下性质来保证Log Matching Property(log consistency),

  • 如果两个entry在不同的log中有相同的index和term,那么它们的cmd是相同的。
    因为leader至多只创建一个带有相同index和term的entry,且entry不会在log中改变位置。
  • 如果两个entry在不同的log中有相同的index和term,那么所有先前的log都是相同的。
    consistency check:AppendEntries RPC发送新的entry时,会包含前一个entry,如果follower没有找到与前一个entry相同的index和term,那么follower不会写入新的entry。

根据以上两点,结合数学归纳法可知,Log Matching Property可以被满足。在正常情况下,leader和follower的log是保持一致的。

如果leader或follower崩溃引起了log不一致,如何恢复一致性?leader会强制follower复制自己的log,

  1. leader维护了{follower: nextIndex},表示下一个将会发送到follower的entry。
  2. leader启动时,将nextIndex置为max(self log index) + 1
  3. 如果follower的日志不一致,那么接下来的AppendEntries RPC consistency check会失败。leader会将nextIndex - 1并重试,直到成功。

安全(Safety)

任何实现了日志复制的系统,都必须满足:一旦entry已经applied到状态机,那么任何其他状态机都不能为这条entry apply一个不同的值。

截止目前所描述的leader选举,存在的问题是,如果follower未收到leader的提交,那么当它成为leader以后,就会用自己的log覆盖其他server的。

选举约束
任何基于leader的共识算法,leader必须最终包含所有committed entry。raft需要保证所有committed entry,都出现在后续每个新的leader中,而这个保证确保了本节内容开头的安全性要求。

在选举的时候,使用voting process来避免未包含所有committed entry的candidate赢得选举。由于candidate在选举的时候需要联系大多数server,即只需要求每个committed entry必须出现在至少一个server(选举时联系的server)里面即可(抽屉原理)。

如果candidate包含的日志与大多数server包含的log至少一样新,那么这个candidate就包含了所有的log。具体判断方法是,

  1. 在RequestVote RPC进行投票时,rpc包含了candidate最后一个entry的index和term。
  2. 如果voter(收到rpc call的一方)的log是更新的,即,
    $$(lastTerm_v > lastTerm_c) || ((lastTerm_v == lastTerm_c) \&\& (lastIndex_v > lastIndex_c))$$
    那么voter拒绝此次投票。

提交上一个term的entry
新leader能否直接提交上一个term的entry?

-w463

c中s1成为term4的leader后继续复制term2的日志index2,此时日志已经复制到大多数机器上。按照之前的规则,s1可以认为日志已经是committed。但如果接着s5成为了leader(接受s3和s4的投票),s5会覆盖s1已经commit的日志。

因此不能仅通过副本数判断先前term的日志是不是committed,还需要满足至少有一个属于当前term的log也复制到大多数server,才能认为是。

follower和candidate崩溃

raft rpc是幂等的,leader无限重试。

时间和可用性

raft的安全性不依赖时间。但是系统的可用性依赖时间,这里有尤其指的是leader选举。election timeout需要满足下面的要求,raft才能选举并保持一个稳定的leader,

$$broadcastTime \ll electionTimeout \ll MTBF$$

Leadership transfer

本质上还是使用leader选举的机制,通过主动触发目标server进行选举来实现来迁移,与后面的配置变更不同。过程如下:

  1. 旧leader暂停服务。
  2. 旧leader进行正常的日志复制过程,把log复制到目标server。
  3. 旧leader发送TimeoutNow到目标server,触发目标server开始选举(相当于调快了timer,触发了election timeout)。
  4. 目标server大概率会比其他server先发起选举,并成为下一个term的leader。
  5. 选举成功后,旧leader收到heartbeat,出让leadership,至此transfer完成。

如果迁移过程未在一个election timeout内完成,那么终止迁移。如果迁移成功了,但是旧leader误以为失败了,那最坏情况是进行一次额外的选举。

问题

  1. 日志复制时每个步骤都有可能出现错误,如何处理?
    raft并不允许“false positives”的出现(返回client执行成功了,但实际上没有成功),但是可能会有“false negatives”的出现,告知client失败了,但实际上是成功了。为了避免false negatives,client必须重试直到成功,且每次只能有一个未完成的command,相应地,raft也必须提供去重机制。

  2. 为什么不能直接复制上一个term未committed的entry?

  3. raft为何能够防止脑裂?
    2f+1个server,成为leader需要获得大多数server(f+1个)的投票。如果发生网络分区,必然有一个partition的分区少于f+1个。这个分区里面,

    • 如果没有leader,那么也不可能选举出leader。
    • 如果有leader,由于日志无法被安全复制,因此不会对client做出任何承诺(不会成为committed entry,也不会被apply到状态机)。
  4. 为什么需要leader?

    • 简化复制日志的过程。
    • 保证所有的副本都以相同的顺序执行相同的命令。
  5. server变为candidate后会发生什么?
    开始选举:inc term,vote self,发送RequestVote RPC。可能结果如下,

    • 赢得选举。
      变为leader。
    • 选举失败,收到来自其他leader的AppendEntries RPC。
      变为follower。
    • 既没有赢得选举,也没有收到来自其他leader的AppendEntries RPC。
      维持candidate。election timeout,开始下一次选举。
      如果一直无法获得大多数server的投票,这个过程会一直重复,term会一直增加。
      此时,如果client有请求会发生什么?
  6. 如何保证同一个term内只有一个leader?

    • leader需要获得来自大多数server的投票。
    • 在同一个term内,每个candidate只能投票一次。
    • 在同一个term内,至多只有一个server能获得大多数server的投票(即使出现网络分区)。
  7. 什么情况下选举会失败?

    • 大多数server都失败了。
    • 两个server获得了相同票数(例如:2f+1个server,挂了一个server)。
  8. 选举失败会发生什么?
    election timeout,开始下一次选举。

  9. 如何设置election timeout?

    • $broadcastTime \ll electionTimeout \ll MTBF$
    • random
    • 至少是几个heartbeat间隔
  10. replicated和committed entry的区别?

    • replicated是做了复制,可能被覆盖。
    • committed,已提交,不会丢失。
  11. 每个副本的日志会完全一致吗?
    不会,可能落后,可能会有临时的不一致。但最终会一致,且状态机执行的命令是一致的。

  12. 为什么leader不能直接提交上一个term的entry?以及如何避免直接提交带来的问题?
    提交时,需要等到至少一个当前term的entry也安全复制以后。

  13. leader在什么时候覆盖follower的entry是合法的?
    uncommitted entry。

  14. leader覆盖follower的entry时,可能会覆盖committed entry吗?
    不会。

    1. 先看entry成为committed时,发生了什么。log entry需要安全复制,即发送到大多数server上。
    2. candidate成为leader,需要联系大多数server,以保证自身的log与大多数server的一样新(选举约束)。因此成为leader的candidate一定包含了所有committed entry。
  15. 如果下图中的leader(for term 8)挂了,a、d和f,哪个会被选举为leader?谁投的票?新leader产生后,哪些entry一定会保留下来?
    -w478

    a-f每个server最多可能获得投票的情况如下:
    a:a,b,e,f
    b:b,f
    c:a,b,c,e,f
    d:a,b,c,d,e,f
    e:b,e
    f:f

    所以a或d可能成为leader,一定会保留下的log是111445566。

  16. 如果server刚开始选举,此时收到了来自leader的AppendEntries RPC(假设leader与这个server都是处于同一个term),此时要怎么处理?

    • candidate成为leader后,应该立即发送心跳来避免发生多余的选举。
    • 如果发生这个情况,由于term一样,leader会忽略RequestVote RPC。
  17. 如果leader完成复制,更新自身的commit index,且完成了apply,但在通知follower commit index前挂了(或丢失了leader资格),这个entry最终会丢失吗?

  18. lastApplied是否应该做持久化?
    这个地方有点争议。在ongardie/dissertation的Errata中,指明了lastApplied是否做持久化应该与状态机一致。但在Raft Q&A中的说法是,状态机如果做持久化,那么也应该负责记住执行了哪些log。

  19. follower拒绝AppendEntries RPC时,如何快速的解决冲突?
    follower拒绝时,返回冲突entry的term,以及这个term首个entry的index。

    • 如果leader有这个entry,那么移动nextIndex到冲突的那个entry。
    • 如果leader没有这个entry,那么移动nextIndex到那个term的首个index。

    还可以使用二分法,查找首个冲突的entry,这可以实现在最坏情况下的最佳时间复杂度。

Lab 2A: Raft

锁的使用建议

这个lecture说了几点关于锁的建议,我觉得最重要的一点是,

Be careful about assumptions across a drop and re-acquire of a
lock. One place this can arise is when avoiding waiting with locks
held.

在减少锁的粒度的时候,可能也把变量的write和read分开了,进而导致在read的时候,变量已经被其他线程改变了。

References

  1. Why Raft never commits log entries from previous terms directly
  2. raft之 为什么不能直接commit 上一个term的entry
  3. raft本质理解
  4. Raft user study lectures
  5. The Raft Consensus Algorithm
  6. ongardie/dissertation
  7. ongardie/runway-model-raft
  8. Raft Q&A

Readings - Practical System for Fault-Tolerant Virtual Machines Fault-Tolerant Virtual Machines论文

这篇论文讨论的主从复制与常见的相比,非常极端和雄心勃勃,论文基于vm构建了一个os级别的主从复制系统,较为细致的讨论了主从复制的设计和实现。

介绍

主从复制是实现server容错的常用方法,如果primary挂了,backup可以继续提供服务。将primary的全量数据同步到backup是比较耗费带宽的操作。更好的方法是将server视为确定状态机,基本思路是把primary和backup置为相同的初始状态,并保证二者都能以相同的顺序收到相同的输入。由于存在某些操作是非确定的(如:中断和获得当前时间),因此还需要额外的协同。

FT协议

Deterministic Replay

为了实现在backup进行replay,需要保证:

  1. 正确的捕获所有输入和非确定的操作,以保证backup的确定性执行。
  2. 在backup上正确的replay。
  3. 对性能没有影响。

论文中关于非确定的操作,零零散散在不少地方提到,不过总结下来其实就是,这个操作不是pure function,结果受除输入的外部状态影响。更具体的就是,非确定操作使用到的资源是与其他(来自vm或hypervisor)的进程共享的,竞争导致了非确定的结果。

在多核系统上,上述的竞争会更复杂,这也是为什么论文没有在多核系统上实现的原因。

FT协议

primary的日志实时的通过logging channel发送到backup。为了保证在切换的时候,backup能够以与primary相同状态进行服务,需要满足:

Output Requirement:如果primary失败,backup接替了primary,backup的执行需要保证所有的输出都与primary的一致。

只要满足Output Requirement,在发生主从切换的时候,client就感知不到打断和不一致。为了保证Output Requirement,需要backup在收到能够产生那个output的input之前,primary推迟输出,也就是下面的规则:

Output Rule:只有当backup收到能够产生一个output的log,并返回ack(表示已经存入log buffer)给primary,primary才能输出。

除了VM-FT有,在其他的多副本系统上都以某种形式出现着。

-w470

primary等待ack不必阻塞系统当前的执行。

检测和响应失败

检测失败:

  • server之间的heartbeat
  • logging channel的traffic是否一直存在

如何避免split-brain:

  • 共享磁盘存储,原子的检测标志位

实现

FT VM启动

  • 使用基于VMware VMotion实现的FT VMotion完成clone,primary的中断小于1s。
  • 使用VMware vShpere实现的集群服务来选择最佳server创建backup。

logging channel管理

-w322

log buffer的局限:
primary(或backup)的log buffer满了(或空了),primary(或backup)需要等待至log buffer可用为止,此时会暂停对client的服务。

主从切换的时间=检测primary失败的时间+backup消费log buffer的时间,为了减少切换的时间,在backup无法及时消费log的时候,会减慢primary的速度(限制CPU)。

FT VM上的操作

除VMotion外的所有运维操作(例如:调整cpu限制),都必须在primary上完成,然后通过特殊的日志项目发送到backup。

关于磁盘io的实现问题

  1. 磁盘访问时的竞争会导致非确定的操作。
    • 检测这样的io竞争,并在primary和backup上以相同顺序执行。
  2. 使用DMA时,对磁盘和内存的访问会导致非确定的操作。
    • 对磁盘的读写,改为读写bounce buffer。
    • 只有当数据全部读到buffer以后,FT才会从读指令处恢复primary/backup的执行,避免二者出现差异(如果直接依赖DMA读取,primary和backup可能会在不可预知的时间点发生缺页中断,从而导致不一致)。
  3. primary在磁盘io完成前就挂了。
    • backup切换为primary以后,无从得知io是否已经完成。FT会在backup上重试(前两点已经消除了导致非确定结果的操作)未完成的io操作。
    • 那么如何判断未完成的io操作是哪些?io完成以后,设备会产生一个中断,如果某个io log缺少相应的中断,那么就需要重试。
  4. primary在磁盘io完成后挂了
    • backup同样无从得知io是否已经完成。此时重新执行写磁盘是幂等的(而对于网络io,tcp会忽略重复的包)。

关于网络io的实现问题

hypervisor对vm buffer的更新导致了非确定操作,改为由vm触发hypervisor的中断,并记录到log。

其他设计

非共享磁盘

优势:

  • 磁盘作为vm的内部状态,primary写磁盘时不必等待backup的ack。

缺点:

  • backup失败、执行FT VMotion时,需要同步磁盘数据。
  • 需要使用其他方法来解决split-brain,例如:另外一个server作为共享存储;多个机器进行投票选主。

backup上执行read

在从执行read,论文考虑的点并不是可以增加系统的read上限,而是考虑不发送read log可以减少logging channel的带宽。

具体看在backup上执行read,需要注意的点是:

  • 如果primary成功,backup失败,backup需要重试,直到成功。
  • 如果primary失败,那么尝试读取的内存必须发送到backup。
  • 如果primary执行了读,然后是写,那么在backup也必须按照这个顺序来执行。

Lecture

Fault tolerance

理想的特性

  • 可用性
  • 强一致性
  • 对client和server software透明
  • 高效

容错,容什么错?

  • Fail-stop failures
  • Independent failures
  • Network drops some/all packets
  • Network partition

不包括:

  • Incorrect execution
  • Correlated failures
  • Configuration errors
  • Malice

Fault tolerant MapReduce master

lab 1中的woker由于是无状态的,且mapper和reducer执行的都是pure function,因此实现容错很简单。

master实现容错,需要考虑:

  • 需要复制什么状态?应用程序级?指令级?(woker list、完成的job、空闲的worker、tcp连接状态、程序memory和stack、cpu寄存器?)
  • primary是否需要等待backup?
  • 什么时候切换到backup?
  • 切换时是否能被观察到?
  • 如何快速的切换?

主要方法有

  1. State transfer,简单,但是state可能会很大,传输很慢
    • 主副本执行服务
    • primary把新状态发送到backup
  2. Replicated state machine,高效但复杂
    • 所有副本执行所有的操作
    • 如果所有副本有相同的起始状态、相同的操作、相同的顺序、deterministic,那么就会有相同的最终状态
    • 例如:VM-FT,GFS

VM-FT

VM-FT是一个replicated state machine。

为了避免primary和backup出现差异,backup必须在指令流的相同位置、以相同的顺序看到相同事件。对于普通的指令,这个是比较容易实现的。FT对中断和磁盘/网络io会做特殊的处理。

例如:时钟中断

ft time interrupt

FT的有缺点

适用场景

  1. 重要且低延迟的服务,例如:name server。
  2. 不便于修改的服务。

不适用的情况

  1. 高吞吐量的服务
    • state仅仅是应用程序级别的,降低了开销,实现更高的性能。
    • 例如:GFS适用应用程序级别的state。

Reference

  1. FAILURE MODES IN DISTRIBUTED SYSTEMS

Readings - The Google File System论文

介绍

GFS是由Google设计和实现的,以满足Google对数据处理快速正常的需求。GFS和先前的分布式文件系统有很多相似的目标,例如:性能、可扩展性、可靠性和可用性。然而,GFS的设计是由Google对应用负载和技术环境的关键(当前和预期的)观察驱动的,这反映了与早期文件系统设计假设的显著不同。Google重新审视传统的选择,并探索在设计领域探索了彻底不同的观点。

  • 组件故障是常态而非例外。
    因此持续的监控、错误监测、容错和自动恢复是系统不可缺少的。
  • 传统标准的文件是巨大的,常常很多GB。处理包含数十亿个对象、很多TB、且快速增长的数据集时,即便文件系统可以支持,也很难管理数十亿个约KB大小的文件。
    因此设计假设和参数,例如IO操作和block大小必须被重新审视。
  • 大多数文件修改都是通过追加新数据的方式,而非覆盖已有的数据。对文件的随机写几乎已经不存在了。一旦写入,文件就是只读的,且常常只是顺序读。
    鉴于这样对大文件的访问模式,追加成为了性能优化和原子保证的的关注点,而在client上对数据块的缓存失去了吸引力。
  • 应用程序和文件系统api的协同设计,通过提升灵活性,有益于整个系统。

设计概览

接口

虽然GFS没有实现例如POSIX的标准API,但也提供了一组熟悉的文件系统接口。文件以层次结构的方式用目录组织起来,并用路径名来标识。GFS支持create,delete,open,close,readwrite文件。GFS还支持snapshotrecord append

架构

一个GFS集群由一个master和多个chunkserver构成,可以被多个client访问。

-w757

文件被切分为固定大小的chunks。每个chunk都由一个不可变,且全局唯一的64位chunk handle标识,这个chunk handle在创建chunk时master分配的。chunkservers把chunk作为linux文件存储在本地磁盘上,并通过指定的chunk handle和byte range来读写文件。为了reliability(可靠性),每个chunk会在多个chunkservers上有复制。

master维护这整个文件系统的metadata,包括namespace,访问控制信息,文件到chunk的映射,以及chucks的当前位置。master还可控制着系统级别的活动,例如chuck租约管理,孤儿chunk的垃圾回收,以及chunkservers之间的chunk迁移。master定期与每个chunkserver以HeartBeat消息的形式进行通信,完成指令的发送和状态的收集。

client与master通信来进行metadata相关的操作,数据相关的通信是直接与chunkserver进行的。

client和chunkserver都不需要缓存数据。client缓存数据的收益很小,因为大多数程序都是流式读取大文件或者数据量太大而无法缓存。client不用缓存数据消除了缓存一致性的问题,简化了系统设计。但client会缓存metadata。chunkserver不必缓存文件数据,因为chunk都是以本地文件的形式保存的,linux buffer cache会把常访问的数据放入内存。

单一master

单一master极大的简化了设计,并使master能够使用全局信息来进行复杂的chunk布局和复制决策。然而当读写时,必须最小化master的参与,这样master才不会成为瓶颈。client进行读写时,先询问master应该连接哪个chunkserver,并缓存这个信息一段时间,然后直接与这个chunkserver交互来完成后续的操作。

chunk size

chunk size是关键设计参数之一。GFS使用一个远大于典型文件系统的block size,64MB。每个chunk副本都以普通linux文件的形式存储在chunkserver,并在需要的时候扩展。惰性空间分配避免了由于内部碎片导致的空间浪费。

大型chunk size有这些优势,

  • 减少了client与master交互的需求。
    读写同一个chunk只需要向master请求一次chunk的位置信息。
  • 由于chunk较大,client也较为可能在一个给定的chunk上进行很多的操作。
    通过在较长时间内保持与chunkserver的TCP连接,可以减少网络开销。
  • 减少了存储在master上metadata的大小。
    由此可以将metadata放入内存。

然而大型chunk size,即便有惰性空间分配,也存在弊端,

  • chunkserver的热点访问。
    一个小文件仅有为数不多的chunks组成,可能就一个。如果大量的client都访问同一个文件,那么存储这些chunk的chunkserver可能会成为热点。

Metadata

mater主要存储3类metadata:

  • 文件和chunk namespace
  • 文件到chunks的映射关系
  • 每个chunk副本的位置

其他metadata还包括所有权和权限、每个chunk的版本、引用计数(用于实现copy-on-write)。

所有的metadata都是存储在内存中的。

  • 前两种还会进行持久化存储(使用write-ahead log),这是通过把记录修改到操作日志、在master落盘、以及在远程机器上存放副本来实现的。
  • 对于最后一种metadata,master并不会做持久化存储。在master启动和有chunkserver加入集群的时候,master会询问每一个chunkserver存放的chunks。

In-Memory Data Structures
由于metadata是存放在内存中的,因此mater的操作很快,除此之外还能完成定期在后台较快的完整扫描。这个定期扫描用于实现chunk的gc,chunkserver故障时副本重新复制,以及chunk的迁移。

一个潜在的问题是存储的chunk数目受限于内存的大小。但由于每个chunk的metadata少于64byte,且文件的namespace数据也少于64byte,并且启用了前缀压缩,因此这不是一个严重的问题。如果确实有必要支持更大的文件系统,加内存即可。

Chunk Locations
master并不对chunk locations做持久化。master启动时,会向所有chunkserver请求。之后,由于master控制着所有chunks的放置,并通过心跳消息来监控chunkserver,master能够确保自身的信息是最新的。

为什么不做持久化?

  1. 消除了master和chunkserver的同步问题。
    chunkserver可能加入、离开、重启、重命名、故障等。
  2. chunkserver对自己有和没有哪些chunk有最终的话语权。
    在master维护此信息的一致视图是没有意义的,chunkser可能出现1中的各种问题。

Operation Log
操作日志对GFS很重要,

  • 包含了metadata关键修改的历史记录,并持久化。
  • 作为逻辑时间戳,定义了并发操作的顺序。
    文件和chunks,以及它们的版本,全都在创建的时候被逻辑时间唯一且永久的标识。

可靠性保证,

  • 仅当metadata的修改完成持久化以后,这些修改才对client可见。
  • 在多个远程机器上有复制。
    • 仅当把相应的log记录写入本地和远程机器的磁盘后,才响应client。
    • master会批量flush日志,来减少flush和复制对整个集群吞吐量的影响。
  • master通过重放操作日志来恢复文件系统的状态。
    • 为了减少启动的时间,需要保证log较小。
    • 当log增长超过特定大小时,master会checkpoint自身的状态,以便可以在恢复时载入最后一个checkpoint并重放在那之后的log。

checkpoint和恢复,

  • checkpoint类似于压缩后的B树,可以直接map到内存,并用用户namespace的查找,且不需要额外的解析。
  • master创建checkpoint时,会切换到新log文件,并在另外的线程中创建checkpoint(包含了checkpoint前的所有修改),避免延误当前的修改。
  • 恢复只需要最近的一个完整(需要检测是否完整)checkpoint和后续的log文件。更老的checkpoint和log是可以释放的。

一致性模型

GFS有一个宽松的一致性模型,这在支持高度分布式应用的同时,也相对简单和高效。

Guarantees by GFS
文件namespace修改(例如:创建文件)是原子的,由master专门执行。namespace锁保证了原子性和正确性;master的操作日志定义了这些操作的全局顺序。

一个文件区域有两种状态,

  1. consistent:无论client从哪个副本读取,都能看到相同的数据。
  2. defined:在文件数据修改后,如果文件区域是consistent的,并且client能看到所有写入到文件的修改。

数据修改包含,

  1. 写入
    • 写入操作会把数据写到应用程序指定的文件offset。
  2. 追加
    • 即使在并发修改存在的情况下,追加会把数据在GFS选择的offset处至少一次原子追加一次。
    • 这个offset会返回给client,标记了一个defined文件区域的起始位置,这个区域包含了已追加记录。
    • GFS可能会插入填充或重复记录项。他们占据的区域被认为是inconsistent,且占用户数据总量的很小一部分。
Write Record Append
Serial success defined(同时也是consistent) defined interspersed with inconsistent
Concurrent success consistent but undefined
1. 所有client都能看到相同的数据。
2. 但是这些数据并不能反映任何修改所写入的内容。
3. 这些数据一般包含混合了多个修改的片段。
defined interspersed with inconsistent
Failure inconsistent(同时也是undefined) inconsistent

如何区分defined和undefined的文件区域?

在一些列的成功修改后,被修改的文件区域保证是defined(?),且包含了最后一次修改所写入的数据。GFS通过以下方式来实现这个保证,

  1. 在所有的副本上以相同的顺序对chunk进行修改。
  2. 用chunk版本号来检测过期的副本(由于chunkserver下线导致缺失修改)。
    过期的副本不会参与到修改或返回给client,会尽早的被gc掉。

不过这里我存疑,例如:concurrent write,并不能保证defined。

某些client会cache chunk的位置。由于cache的timeout,读取到过期数据的时间窗是有限的。另外,对于大多数文件都是追加操作,一个过期的副本通常会返回过早结束(a premature end of chunk)的文件块,而非过期的数据。

在修改完成很长时间后,机器故障也会破坏或摧毁数据。GFS通过master和chunkserver定期的握手来检测失效的chunkserver,并通过校验和来检查数据损坏。当问题出现时,数据会尽快的从有效的副本进行恢复。如果在GFS来不及反应(没有足够的处理时间)的时候,所有的副本都丢失了,那应用能收到错误,而不是看到损坏的数据。

Implications for Applications
GFS应用可以通过一些已经用于其他目的的简单技术来适应宽松一致性模型,

  • 依赖追加而非覆盖,以及checkpointing
    • 例如:writer从头生成一个文件,待所有数据写入完毕以后,原子的把文件进行重命名。也可以定期创建checkpoints记录成功写入了多少。checkpoints也可以包含应用级别的校验和。readers只检查和处理至最后一个checkpoint之间的文件区域,这些文件区域是defined。
    • 追加远比随机写要高效和有弹性的多。
    • checkpoint允许writer以增量的方式重新写入,并避免reader处理那些从应用的角度看仍是不完整的数据,尽管这些数据已被成功写入。
  • 写入时自我校验和自我识别的记录
    • 记录是以至少追加一次的语义来记录每个writer的输出的,因而reader需要处理偶然的填充和重复的情况。
    • writer写入的每个记录都包含额外的信息,例如校验和,用于验证。reader用校验和来识别并丢弃填充数据,以及记录片段。
    • 如果无法容忍偶然出现的重复(例如,这些数据会触发非幂等的操作),那可以通过记录中的唯一标识来进行过滤。

Checkpointing allows writers to restart incrementally and keeps readers from processing successfully written file data that is still incomplete from the application’s perspective.
我没有完全理解所谓的从应用的角度看仍是不完整的数据,是指的最后一个checkpoint之后的那些数据?

系统交互

GFS的设计可以最大限度的减少master参与到所有的操作。

租约和修改顺序

文件的修改发生在chunk的所有副本上,GFS使用租约来维护副本之间一致的修改顺序。master会将一个chunk的租借给其中一个副本,这个部分叫做主副本。主副本选择对chunk所有修改的序列顺序,所有的副本在应用这些修改的时候都会遵循这个顺序。最后,全局的修改顺序,首先由master选择的租借授权顺序定义(先选择某个副本为primary,然后可能又选择了另一个为primary),并在租期内由主副本分配的序列号定义。

租约机制的好处是最小化master的管理开销。

  • 租约起始的timeout是60s,但只要chunk还在被修改,主副本可以无限次的请求,然后(一般情况下)得到timeout的扩展。
  • timeout扩展的请求的授权是存放在master和所有chunkservers定期交互的心跳包里面的。
  • master可能会在租约过期前撤销(例如master要禁止一个正在rename的文件的修改)。
  • 即使master丢失了与primary的通信,master也可以在老的租约过期后,将新的租约授权给另一个副本。

-w368

下面以写入为例说明整个过程,

  1. client向master请求持有租约的chunkserver以及其他副本的位置。如果租约未被持有,master会授权一个副本。
  2. master返回给client primary的标识和其他副本的位置。client cache这些数据,且仅在无法连接到primary或primary不在持有租约的时候才会联系master。
  3. client将数据推送到所有副本,任何顺序均可。每个chunkserver会将这些数据存储在内部的LRU缓存中,直至数据被使用或过期。
  4. 一旦所有副本确认收到数据以后,client向primary发送写请求。写请求标识了先前推送到所有副本的数据。
    • primary为所有收到的修改(可能来自多个client)分配连续的序列号,序列号提供了必要的序列化(necessary serialization)。
    • primary将修改以序列号顺序应用到本地状态。
  5. primary向所有从副本转发写请求。每个从副本都以primary分配的相同序列号顺序应用修改。
  6. 所有从副本回复primary,表明已完成操作。
  7. primary回复client。

如果上述过程出错,

  • 任何副本发生错误,都会汇报给client。
  • 如果在primary发生错误,序列号将不会被分配和转发。
  • client的请求被认为已失败,被修改的文件区域将处于不一致状态。
  • client会做重试,先会重试步骤3~7,如果不行,会完全从头重试写入。

如果存在另一个并发写入到相同位置的client,

  • 前一个client写入的内容被后一个写入的覆盖
  • 所有副本都有相同的数据,但是混合了来自两个client的数据,consistent but undefined。

如果一个写入的数据很大或跨越了chunk的边界,GFS client会把数据打散成多个写操作。这些写操作遵循了上述流程,但是可能会与其他client的并发写操作交替和被覆盖。因此文件区域将会出现consistent but undefined的情况。

数据流

从控制流解耦数据流是为了高效的使用网络,

  1. 最大化利用每个机器的网络带宽
    • 数据以流水线的方式,沿着精心挑选的chunkservers链进行线性的推送,而非以其他拓扑形式来推送(例如:树)。
    • 这样每个机器的所有出口带宽都会用书尽可能快的传输数据,而非将带宽拆分到多个接收者。
  2. 避免网络瓶颈和高延迟的链路
    • 交换机链路通常有这两个问题。
    • 每个机器会把数据转发到网络拓扑中最近(通过ip地址来估算)的且未收到数据的机器。
  3. 最小化推送所有数据的延迟
    • 通过在TCP连接上以pipelining的方式进行数据传输来实现
    • 一旦某个chunkserver收到一份数据,就立即开始转发。立即转发并不会降低接收数据的速度。
    • pipelining在使用全双工链路交换网络的情况下很有用。
    • 在没有网络拥塞的情况下,传输一份R个副本B byte的数据,理想的时间开销是B/T + RL,T是网络吞吐量,L是机器间的传输延迟。

Atomic Record Appends

GFS提供了原子追加记录的操作,叫做record append。

  • 传统的写操作中,client指定data需要写入的offset。并发写入到同一个区域是不可序列化的。
  • record append中,client指定data,GFS在自己选择的offset处以原子的方式至少一次追加数据到文件末尾,并将offset返回给client。

record append是一种修改操作,遵循前面描述的控制流。

  1. client向文件最后一个chunk的所有副本推送数据,然后向primary发送请求。
  2. primary检查追加到当前chunk是否会导致chunk超出大小限制。
    • 如果是,那么填充当前chunk,并告知从副本也填充。返回client需要在下一个chunk上重试。
    • 追加记录被限制在0.25 * maximum chunk size,以限制碎片在可接受的范围内。
    • 如果没有超过,那么primary进行追加,并告知从副本也在相同的offset追加,最后返回client操作成功。
  3. 如果在任何副本上追加失败,client会重试。

重试追加时,

  • 重试的结果是,同一个chunk的副本可能会包含不同的数据,可能包括部分或全部相同的记录。
  • GFS并不保证所有副本每个字节都是相同的,只能保证数据会以一个原子单位的形式至少一次写入。
  • 对于一个成功的写入操作,数据一定写在某个chunk的所有副本的相同offset位置。在这之后,所有副本都至少与记录的结尾一样长。因此任何后续的记录,即使另一个副本成为primary的情况下,都会被分配到更高的offset,或者另一个chunk。

Snapshot

GFS使用copy-on-write的方式来实现文件或目录的快照,创建snapshot的速度很快。

  • master收到快照请求时,首先撤销将要做snapshot的所有授权chunk的租约。
  • 租约撤销或过期后,master在磁盘记录操作日志。接着通过复制文件或目录树的metadata,把日志应用到内存状态。新创建的文件快照与原文件指向相同的chunks。
  • client写入某个chunk C时,先通过master找到primary。master发现引用计数不唯1,选择一个新的chunk handle C’,并通知相关chunkserver本地复制创建C’。
  • 最后client遵循前面的过程进行写入。

对目录进行snapshot的时候,复制的metadata是什么?

master操作

master执行,

  • 所有namespace的操作。
  • 管理chunk副本的放置、创建、复制、以及与各种系统级活动协调来保证:chunk是完全备份的、平衡chunkserver的负载、回收未使用的空间。

namespace管理和锁

GFS使用完整路径名到metadata的map来从逻辑上表示namespace。存储的时候使用前缀压缩。

锁以及加锁方式,

  • namespace树的每个节点(绝对文件名或绝对目录名)都有关联的读写锁,读写锁是惰性分配的。
  • master进行每个操作前,都会对路径名/d1/d2/.../dn的每一级父目录加上读锁,并对/d1/d2/.../dn/leaf加上读锁或写锁,leaf可能是文件或目录。
    这里不需要对父级目录加写锁,因为并没有真正意义上的父级目录,也没有需要避免并发修改的数据。只要能防止被删除、重命名或被snapshot即可。
  • 锁是以一个一致的全序来获取的,以避免死锁。首先按namespace树的层级排序,然后按同一级的字典序排序。

副本放置

副本放在分布在多个机器内是不够的,还需要分布在不同的机架,放置策略有两个目的:

  • 最大化数据可靠性和可用性。
  • 最大化带宽利用。

Creation,Re-replication,Rebalancing

chunks在三种情况下会进行创建:

  1. master创建新chunk。
    创建时会考虑这些因素,
    • chunkserver磁盘使用率低于平均值。
    • 限制每个chunkserver最近创建数。结合上一点,如果不限制,那么对于一个空chunkserver,迁移时繁重的写操作,会导致磁盘I/O过高。
    • 跨多个机架。
  2. 副本数低于目标值(由于机器不可达、副本损坏、目标值增加),master重新创建副本。
    • 每个需要被re-replicated的chunk的优先级由多个指标衡量:低于目标值多少?优先为存在的文件(live files)创建而不是最近被删除的。优先创建任何阻塞了client操作的的chunk。
    • 创建时,chunkserver会从有效的副本直接复制,会考虑1中的各种因素。
    • 为了避免对client流量的影响,master会限制集群和每个chunkserver的clone数目。chunkserver会限制从源chunkserver的读请求来实现限速。
  3. 平衡磁盘空间和负载。
    • master定期进行rebalance,一般来说选择移除磁盘空闲空间低于平均值的。
    • 新chunkserver会被逐渐的填满,而非立即将新chunk和写流量都直接打上去。

垃圾回收

当文件删除的时候,GFS不会立即回收物理存储。GFS会在定期的gc期间,在文件和chunk级别进行回收。

机制

当文件被删除的时候,

  • master立即记录删除日志,并将文件重命名到一个包含删除时间的隐藏名字。
  • master定期扫描文件系统的namespace时,删除存在3天以上(可配置)的隐藏文件,这有效的切断了与chunks的连接。
  • master定期扫描chunk namespace,识别出孤儿chunk,并清除它们的metadata。
  • chunkserver与master的心跳包汇报了有哪些chunks,master告知哪些是已经不在metadata中的,可以自由删除。

这个删除机制相比立即删除,有很多好处,

  • 在机器故障很常见的大规模分布式系统中,这个机制简单可靠。
    chunk的创建可能只在部分chunkserver上成功;副本删除消息可能丢失,master必须重发。
  • 将存储回收合并到了mater常规的后台活动里。
    可以批量完成,开销被均摊了。
  • 仅在master相对空闲的时候完成。
  • 延迟回收提供了应对意外、不可逆删除的保障。

最大的缺点是,延迟机制妨碍了用户在存储紧张的时候,对空间使用的调优。

  • GFS通过加快对再次明确删除已删除的文件的回收来解决这一问题。
  • GFS还允许对namespace的不同部分设置不同的复制和回收机制。

过期副本检测(Stale Replica Detection)

副本在chunkserver失败且错过修改的时候会过期。

  • master维护了一个chunk version number来区分最新和过期的副本。在授权租约前,master会增加chunk version number,并通知所有最新的副本。master和所有副本都会将版本号进行持久化存储。
  • 失败的chunkserver在重启后,master通过心跳包可得知副本过期。
  • 反过来,如果chunkserver的版本号高于master记录的,master会假设在授权的时候自己失败了,并把更高的版本号作为最新的。
  • 过期的副本使用gc进移除。
  • mater把primary返回给client的时候,以及通知chunkserver进行clone副本的时候,会带上版本号,二者会检查版本号以保证访问到最新的数据。

容错和诊断

高可用

GFS通过两个简单却有效的策略来实现系统高可用:快速恢复、副本。

  • 快速恢复
    • master和chunkserver可快速启动。
    • 不区分正常和非正常终止。
  • chunk副本
  • master副本
    • 为了master状态的可靠性,操作日志和checkpoint会被复制到多个机器上。只有flush到本地和所有远程机器上的修改,才认为是已经提交了的。
    • 提交与写入数据的顺序是什么,在写入数据完成前还是后?
    • 如果在log复制到多个机器前,master挂了?如果未复制log无法恢复?
    • shadow master
      • 提供了文件系统的只读访问,因为log可能落后于master;不可写,会导致脑裂。
      • 与primary相同的顺序apply操作日志。
      • 启动时定位chunk副本的位置(启动后就不会很频繁),并定期与chunkserver通信监控它们的状态。
      • 仅当primary更新副本位置时,需要依赖master。
      • 可被晋升为master。
    • 主从切换是如何进行的?

数据完整性

  • chunkserver使用checksum来检查数据损坏。
    • 直接对比两个chunkserver上的副本是不不现实的。
    • 两个不同的副本可能是合法的(详见Atomic Record Appends)。
    • 每个chunkserver必须独立维护checksum来维护副本完整性。

chunk结构:

  • 细分为64KB block。
  • 每个block都有32bit checksum。
  • 保留在内存中,且与log一起持久化存储,和用户数据分离。

对于读操作:

  • 在返回给client或chunkserver前,会检查与读取范围重叠的部分。因此损坏的数据不会传播开。
  • 如果checksum不匹配,返回错误给请求方,并通知master。请求方会读取其他副本,master会从其他副本clnoe数据。新副本到位后,通知chunkserver删除错误的副本。

checksum对(读和追加)性能的影响较小:

    • 绝大多数读只跨越少数几个block,因此只需要对少量数据进行校验checksum。
    • GFS client在读取是对尝试对齐checksum block的边界。
    • checksum的查找和比较不需要做I/O,checksum的计算可以和I/O同时进行。
  1. 追加

    • 只需要增量的更新最后一个block的last partial checksum,并计算后续新block的checksum。

    • 计算last partial checksum block已经损坏,现在无法检测到它,新的checksum也将与存储的数据不匹配,并且在下次读取时将像往常一样检测到损坏。

      现在无法检测到它(we fail to detect it now),这具体指的是?

但对于覆盖现有chunk某个range的写操作,必须先读取和检查被覆盖range的首个和最后一个block(为了避免新的checksums可能隐藏存在于未被覆盖区域内的数据损坏),然后写入,最后计算和存储新的checksum。

诊断工具

大规模和细致的诊断日志可以极大帮助问题隔离、debug和性能分析,且只有很小的开销。

思考问题

以下问题部分是我自己提出,部分来自Distributed-Systems/Lec03_GFS/Question.md

  • 为什么存储三个副本?而不是两个或者四个?
  • 为什么不适用RAID?
    重点是整机的容错,而非存储设备的容错。
  • chunk的大小为何选择64MB?这个选择主要基于哪些考虑?
    • 减少了client与master交互的需求。
    • 由于chunk较大,client也较为可能在一个给定的chunk上进行很多的操作。
    • 减少了存储在master上metadata的大小。
  • master的checkpoint与application checkpointing的区别是什么?
  • 论文提到append机制可以用于multiple-producer/single-consumer queues,这个具体是如何实现的?
  • GFS主要支持追加(append)、改写(overwrite)操作比较少。为什么这样设计?如何基于一个仅支持追加操作的文件系统构建分布式表格系统Bigtable?
  • 为什么要将数据流和控制流分开?如果不分开,如何实现追加流程?
    • 最大化利用每个机器的网络带宽
    • 避免网络瓶颈和高延迟的链路
    • 最小化推送所有数据的延迟
  • GFS有时会出现重复记录或者补零记录(padding),为什么?
  • 租约(lease)是什么?在GFS起什么作用?它与心跳(heartbeat)有何区别?
  • GFS追加操作过程中如果备副本(secondary)出现故障,如何处理?如果主副本(primary)出现故障,如何处理?
  • GFS master需要存储哪些信息?master数据结构如何设计?
  • 假设服务一千万个文件,每个文件1GB,master中存储的元数据大概占用多少内存?
  • master如何实现高可用性?
  • 负载的影响因素有哪些?如何计算一台机器的负载值?
  • master新建chunk时如何选择chunkserver?如果新机器上线,负载值特别低,如何避免其他chunkserver同时往这台机器迁移chunk?
  • 如果某台chunkserver报废,GFS如何处理?
  • 如果chunkserver下线后过一会重新上线,GFS如何处理?
  • 如何实现分布式文件系统的快照操作?
  • chunkserver数据结构如何设计?
  • 磁盘可能出现“位翻转”错误,chunkserver如何应对?
  • chunkserver重启后可能有一些过期的chunk,master如何能够发现?

Lectures

什么是一致性?

  • 正确性条件
  • 重要但是当存在数据副本的时候难以实现,尤其是应用并发访问时
  • 弱一致性
    read()可能返回过期的数据。
  • 强一致性
    read()使用返回最近write()的数据。

“理想”的一致性模型

  • 一个存在副本的FS行为和不存在副本的FS一样。
  • 读可以观察到最近的写。

实现“理想”的一致性模型的挑战

  • 并发
  • 机器失败:任何操作都可能失败
  • 网络分区:每个机器/磁盘不是总能访问到的
  • 为什么这些挑战难以克服?
    • 需要c/s间的通信:可能影响性能
    • 复杂的协议:难以正确的实现
    • 不少系统没有提供理想的一致性模型

GFS是否实现了“理想”的一致性模型

  • 对于目录:实现了
    • 强一致性,只有一个副本
    • 不是高可用,可扩展性受限
  • 对于文件:不总是

总结

  • GFS优秀的地方:
    • 顺序读写性能高
    • 追加
    • 吞吐量大
    • 数据容错
  • GFS不好的地方:
    • master容错
    • 小文件(master是瓶颈)
    • client可能看到过期的数据
    • 追加可能重复

Lab

  • Lecture 3没有关于GFS的实验,不过我找到了ppca-gfs,看介绍是上交ACM班一个课程的作业,就用这个来补上GFS的实验吧。
  • lab的代码在github.com/chaomai/mit-6.824

References

  1. Case Study GFS: Evolution on Fast-forward
  2. Google’s Colossus Makes Search Real-Time By Dumping MapReduce
  3. 大规模分布式存储系统:原理解析与架构实战