0%

最近看 mit-6.824,每个 lecture 会做一次笔记,每个笔记基本都分为,

  • Readings:课前阅读的论文
  • Lecture:讲义
  • Lab:实验

Readings

MapReduce: Simplified Data Processing on Large Clusters论文

实现

map task运行时,

  • 定期将生成的kv pair写入本地磁盘,并被parition函数分为R个分区。
  • 这些在磁盘上的pair的位置会被返回给master,master进而把这些位置发给reduce task。

reduce task运行时,

  • 使用RPC读取map task缓存到磁盘的数据。
  • 当reduce worker读取到所有数据时,按临时key对数据排序。因为多个intermediate key可能会由同一个Reducer处理,因此需要排序使得相同的key在一起。
  • reduce worker遍历已排序的中间数据,将key和中间value传给用户定义的reduce函数。

master数据结构

  • 对于每个map task和reduce task,master保存了任务状态(idle, in-progress, or completed),以及每个worker机器的身份。
  • 对于每个已完成的map task,master保存了R个分区的中间数据文件的位置和大小,每当一个map task完成,中间数据文件的位置和大小就被更新。这些信息以增量的形式,发送给有正在运行reduce task任务的机器。

容错

由于长时间未响应master的ping,worker故障,

  • 正在运行的map task或reduce task重置为idle->重跑。
  • 已完成的map task重置为idle->重跑,因为map task生成的中间数据是本地存储的。
  • 已完成的reduce task无需重跑,因为输出已经保存到GFS。

当一个map task先被worker A执行,接着又被B执行(worker A失败),这次重跑会被通知到所有执行reduce task的机器。任何还没有从A读取数据的reduce task,都转而向B读取。

master故障,

  • 定期保存master数据结构的checkpoint。

semantics in the presence of failures,

  • 当用户提供的 map 和 reduce 操作的执行结果是确定的,那么分布式的实现也会产生相同的执行结果,这个结果与整个程序顺序执行产生的结果一致。这依赖于,map task 和 reduce task 输出的原子提交。每个正在执行的任务都会把输出写到临时文件。当某个map task完成时,worker向master发送包含有R个临时文件列表的消息。如果master已经收到了这个任务完成的消息,那么本条就会被忽略。而对于 reduce task,输出到GFS的时候,GFS保证了原子性。

  • 当 map 和 reduce 的执行结果是不确定的,那么最终结果也是不确定的,每个 reduce 任务的执行结果都不同的不确定的串行执行的结果。

局部性

GFS 将文件分为 64MB 的块,并在不同机器上冗余存储。Master 可以利用文件的位置信息,在包含这个副本的机器上调度 map 任务。如果无法进行上述调度,那么就近调度(例如:同一个交换机下的 worker)。

任务粒度

MapReduce 把 map 任务和 reduce 任务分别分为 M 份和 R 份,理想情况下,M 和 R 应该远大于机器数目,这样有利于提高动态负载均衡和加速(当某个 worker 挂了情况下的)错误恢复:这个挂了的 worker 所执行的任务可以被重新调度到其他 worker 上进行。

实现时,master 需要进行 O(M + R) 次调度,需要 O(M * R) 状态跟踪 map 产生的临时文件位置(每个 map 产生 R 个临时文件),每个 map/reduce pair 需要一个 byte。

通常 R 的数量是由用户指定的,实际应用中对 M 的划分是要保证一个分片的数据量大小大约是 16-64M,R 的期望是一个比较小的数。

任务备份

一个导致任务执行总时间大大延长的重要原因是存在“拖后腿的任务”,这些执行缓慢的任务可能由多种原因导致。一个通用的解决方案是,当一个 MapReduce 任务将要执行完成时,master 调度一个备份执行。当主任务(原始的)或备份执行的其中之一完成时,这个任务被标记完成。这个方法在消耗少许额外资源的情况下,大大减少了总执行时间。

改进

提升效率的方法。

Partitioning 函数

用户指定 reducer 的数目为 R,map 的中间结果按照 partitioning 函数对临时 key 的计算结果分成了 R 个部分。一般来说,默认的 partitioning 函数是足够的,但也可以自己提供 partitioning 函数来实现特定的 partition 目标。

保证顺序

每个 partition 的 kv 是按照临时 key 升序的顺序处理的。这保证了,

  • 每个 reduce 产生的结果是有序的。
  • 利于按 key 的随机访问。
  • 用户觉得有序的结果方便。

Combiner 函数

当,

  • 每个 map 任务会产生大量重复的临时 key,
  • 用户提供的 reduce 函数是可交换和可结合的,

可以在 map 任务结束以后,使用 combiner 对 map 任务的结果进行部分合并,减少网络传输的开销。

输入输出类型

MapReduce 提供了读写多种文件格式的支持,用户也可以通过实现相应接口来自定义读写其他格式文件。

副作用

MapReduce 允许 map 或 reduce 生成辅助的输出文件,其原子性依赖于应用的实现。

跳过 Bad Records

某些特定的数据会让含有 bug 的 map 或 reduce 函数崩溃,有时无法修复这些 bug,且忽略这些数据是可接受的,那么可以使用 MapReduce 提供的机制来忽略。

每个 worker 都会监控 segmentation violations 和 bus errors,如果崩溃,发送记录 -> master,如果 master 发现某条记录失败次数大于 1,那么下次执行时就会跳过这条记录。

本地执行

分布式环境下,debug 一个 map 或 reduce 函数是困难的。MapReduce 库允许在本地执行任务,便于调试。

状态信息

master 会运行一个内部的 HTTP 服务器,展示状态信息。

计数器

MapReduce 库提供了一个计数器来统计事件发生的次数。worker 响应 master 的 ping 时,计数器的值就被放在 pong 中,发送到 master。master 聚合计数器的值,并在 map 和 reduce 任务结束后返回给用户。当聚合时,重复执行任务的计数器只会被统计一次。有的计数器值是由 MapReduce 库直接维护的。

Lectures

Distributed System

分布式系统提供了app使用的基础设施,通过抽象隐藏了实现的细节,这些抽象包括:

  • 存储(storage)
  • 通信(communication)
  • 计算(computation)

在讨论分布式系统时,下面几个话题会时常出现:

  • 实现 RPC,线程,并发控制。

  • 性能

    • 理想情况:可扩展的吞吐量。N倍的服务器数量->(通过并行的CPU,磁盘,网络实现)N倍的吞吐量。因此为了处理更多的负载,只需要添加更多的机器。
    • 可扩展性随着N增加而变得困难:
      • 负载不均衡(Load im-blance),集群里有慢的机器(stragglers)。
      • 不可并行的代码:初始化,交互
      • 共享资源的瓶颈:网络
  • 容错

    • 大集群,复杂的网络->总会有出问题的地方
    • 希望能够隐藏这些错误,使得错误对app不可见
      • 可用性(Availability):在出错的情况下,app能继续使用数据。
      • 持久性(Durability):修复错误后,app能够继续正常工作。
    • big idea:多个服务器。如果一个server crash,client能继续使用其他的。
  • 一致性(consistency)

    • 通用目的的架构需要有良好定义的行为。
      • 例如:get(k)应该返回最近的put(k, v)
    • 实现良好定义的行为是困难的。
      • 难以保证多个服务器一致。
      • 在含有多个操作的更新中,客户端执行到一半可能crash了。
      • 服务器在尴尬的时刻崩溃,例如:执行了操作但没有返回结果。
      • 由于网络问题,服务器看起来好像挂了。
    • 一致性和性能是冲突的。
      • 实现一致性需要通信。
      • “强一致性”常常导致系统性能低下。
      • 高性能通常会对app造成“弱一致性”。
    • 开发者在这个范围内追求了很多设计点(People have pursued many design points in this spectrum)。

MapReduce

MapReduce概述

  • 上下文:对海量数据进行多个小时的运算。
  • 总目标:普通程序员能够在保证合理的效率的情况下,轻松的将数据处理切分到多个服务器。
  • 需要定义Map和Reduce函数,非并发的代码,且常常是很简单的。
  • MR运行在集群上,处理海量数据,隐藏分布式的细节。
1
2
3
4
5
6
7
8
input is divided into M files
Input1 -> Map -> a,1 b,1 c,1
Input2 -> Map -> b,1
Input3 -> Map -> a,1 c,1
| | |
| | -> Reduce -> c,2
| -----> Reduce -> b,2
---------> Reduce -> a,2
  • MR对每个输入文件调用Map(),生成<k2, v2>(中间数据)的集合
  • MR收集每个k2对应的所有v2,并输入给Reduce调用
  • 最后从Reduce()输出<k2, v3>的集合

MapReduce隐藏了很多痛苦的细节

  • 在服务器上启动s/w
  • 跟踪已结束的任务
  • 数据移动
  • 错误恢复

MapReduce有很好的可扩展性

N台机器->N倍吞吐量。假设M和R都>=R(大量的输入文件和map输出的key)。由于Map()之间无交互,可以并行执行,Reduce()也是,都能够并行执行,因此可以通过添加机器来增加吞吐量。

性能受限的潜在因素

网络。在Map->Reduce的shuffle时,所有数据都需要通过网络传输,因此减少通过网络来移动的数据是关键。

更多细节

  • mater:为worker分配任务,记录中间输出的位置。
  • M Map tasks, R Reduce tasks。
  • 输入存储在GFS,3份冗余。
  • 集群所有机器都运行GFS和MR worker。
  • input task比workder数量更多。
  • mater为每个worker分配Map task,在旧任务结束时分发新任务。
  • Map worker在本地磁盘将中间key hash到R个partition
  • 只有当所有Map结束后,本地的Reduce调用才会开始。
  • mater告诉Reducer从Map worker获取中间数据partition(intermediate data partitions)。
  • Reduce workers将最终结果写入GFS(一个文件对应一个Reduce task)。

设计细节:减少低速网络的影响

  • Map从本地磁盘读入GFS上的数据副本(replica),而不是从网络。
  • 中间数据仅仅在网络中传输一次。Map worker将数据写入本地磁盘,而不是GFS。
  • 中间数据被partition到包含很多key的文件。大网络中的传输更加高效。

如何实现良好的load balance?

load balance对于可扩展性很重要,N-1个server都等待1个server结束是不好的。但是有的任务就是会比其他花费更长的时间。

解决方案:任务数比worker数更多。 master为已完成当前任务的worker分配新的任务。一般来说,就不会有大任务占据主要的计算时间。那么快的server会比慢的server完成更多的任务,最终同时完成所有任务。

如何实现容错?

如果某个server在执行MR任务期间crash怎么办? 不是重启整个job,MR只重新运行失败的Map()Reduce()。这两个操作必须是纯函数:

  • 不保留调用之间的状态。
  • 不读取和写入除了MR输入/输入的文件。
  • 任务之间没有隐藏的通信。

因此重新执行会生成相同的结果。纯函数的要求是MR相比于其他并发编程模型的主要局限,当它也是MR简洁的关键。

crash recovery的细节

  • Map worker crashes:
    • master观察到worker再也不响应ping。
    • crash的worker的中间Map输出丢失,但这个数据有可能每个Reduce任务都需要。
    • master根据GFS上输入数据的其他副本来分配任务,并重新执行。
    • 某些Reduce worker可能已经读取了crash的worker生成的中间数据。此时就需要依赖于Map()的纯函数特性和确定性。
    • 如果Reduce获取到了所有的中间数据,那么master就不需要重新运行Map。然后接下来的一个Reduce会crash,进而导致强制重跑失败的Map。
  • Reduce worker crashes:
    • 已结束的任务不受影响,数据以冗余的形式存储在GFS。
    • master重新执行其他worker上未完成的任务。
  • Reduce worker在写入输出数据期间crash:
    • GFS的rename是atomic的,在写入完成前数据不可见。因此master可以安全的在其他地方重跑Reduce任务。

其他错误和问题

  • master给两个worker分配了相同的Map()任务怎么办? 原因可能是master错误的认为worker挂了。master只会将其中一个告诉给Reduce worker。
  • master给两个worker分配了相同的Reduce()任务怎么办? 两个都会尝试在GFS中写入相同的文件。GFS rename的atomic性质避免了结果是两者的混合,只有一个完整的文件可见。而Reduce()的纯函数特性使得两次输出的文件是一样的。
  • 如果某个worker很慢怎么办? 原因可能是硬件问题。master重新运行最后几个任务。
  • 如果worker由于h/w或s/w问题计算出了错误的输出怎么办? ╮(╯_╰)╭,MR假设CPU和software是“fail-stop”的。
  • 如果master crash了怎么办?
    • 从check-point恢复。
    • 放弃执行任务。
    • 使用多个master,一个可用,剩余standby。

什么样的app不适用于MapReduce?

  • 不是所有app都适用于map/shuffle/reduce模式。
  • 小数据量,因为开销很高。例如:网站的后端。
  • 对海量数据的小更新。例如:为一个大索引添加小文件。
  • 随机读写,因为Map和Reduce都不能选择输入数据。
  • 多次shuffle,例如:page-rank。可以使用多个MR来实现,但是不高效。
  • 更灵活的系统可以实现上述目标,但会导致更复杂的模型。

结论

MapReduce让集群计算受欢迎。

  • 不是最高效和灵活的。
  • 可扩展性良好。
  • 易于编程,错误和数据移动被隐藏了。

这些在实践中是很好的权衡。

Lab 1: MapReduce

  • 实验给出了框架代码,需要完成关键函数。
  • MapReduce的分布式实现除了需要关注task,还需要考虑存储,分布式存储不是这里的重点,因此实验在一台机器上运行worker thread,使用系统的文件系统模拟分布式存储。
  • 实验要求实现两种模式的MR,顺序执行所有task,以及分别并行执行map和reduce task。实验给出了的框架代码使用了go的channel来实现并行执行task。而对于顺序执行task的实现,实际上是分别把所有map和reduce task做了一次封装,得到“一个map task”和“一个reduce task”,分别执行封装好的“map task”和“reduce task”,其中每个实际的map和reduce都是顺序执行的。
  • lab的代码在github.com/chaomai/mit-6.824

Generator(生成器)

在Python中,说到generator,就不得不提iterator和iterable,下面这张图来自Iterables vs. Iterators vs. Generators,这里做个简单的说明(文章很好的解释了这三者的关系与区别)。

  • Container:是一个把元素组织在一起的数据结构,可以判断元素是否包含在容器当中。(大多数)容器提供了一种能够得到他们包含的每个元素的方法,这使得这些容器是iterable(可迭代对象)。
  • Iterable:iterable是任何一个可以返回iterator的对象(不限于容器)。
  • Iterator:带状态的对象,当对这个对象调用next()时,可以产生下一个值。任何有__next__()方法的对象都是一个iterator。Iterator就像一个lazy factory,当需要时,才产生一个值返回。

下面是一个iterator的例子,与此同时也是一个iterable,

1
2
3
4
5
6
7
8
9
10
11
12
13
class fib:
def __init__(self):
self.prev = 0
self.curr = 1

def __iter__(self):
return self

def __next__(self):
value = self.curr
self.curr += self.prev
self.prev = value
return value

下面着重要说的是generator,genarator是一种特殊的iterator,因此它是一个惰性求值的factory。继续上面的例子,generator能够避免编写__iter__()__next__(),而以一种简洁优雅的方式写出上面的fib iterator。

1
2
3
4
5
6
7
8
def fib():
prev, curr = 0, 1
while True:
yield curr
prev, curr = curr, prev + curr

f = fib()
list(islice(f, 0, 10))

当执行fib()时,实例化并返回了一个generator,除此之外什么代码都没有被执行,包括prev, curr = 0, 1islice是一个iterator,因此fib的代码仍然未被执行。

list会使用其参数,由其来构造一个list,它会对islice对象调用next(),进而会对f对象调用next(),此时才开始执行fib()里的代码,直至yield curr,返回curr中的值,并暂停执行fib()fib()的状态被冻结了。这个值被返回给islice,最终被添加到list里面。然后重复上述过程,直到产生第10个元素。

generator的类型有函数generator和表达式generator,表达式generator语法类似列表生成式,

1
2
3
4
5
numbers = [1, 2, 3, 4, 5, 6]
square_list = [x * x for x in numbers]
square_set = {x * x for x in numbers}

lazy_square_list = (x * x for x in numbers)

到目前为止,似乎generator相比起iterator,除了更简洁以外,没有什么特别的东西。pep-0342A new method for generator-iterators is proposed, called send(). It takes exactly one argument, which is the value that should be "sent in" to the generator).规定了一个genarator可以产生一个值,或者在产生一个值的同时还接收一个值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def fib():
prev, curr = 0, 1
while True:
old_curr = curr
curr = yield old_curr
if not curr is None and not curr == old_curr:
prev, curr = curr, curr + 1
else:
prev, curr = old_curr, prev + old_curr

f = fib()
list(islice(f, 0, 10)) # [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
f.send(100)
list(islice(f, 0, 10)) # [201, 302, 503, 805, 1308, 2113, 3421, 5534, 8955, 14489]

第二次的list(islice(f, 0, 10))结果不是[89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765],因为send()改变了curr的值。

在调用send()前,由于还未执行到yield处,因此必须先调用一次next()send(None)

Coroutines(协程)

与Coroutines对应的概念是Subroutine(子程序)。一个普通的函数调用是这样的,从函数的第一行执行到return语句或exception,或者执行到函数的结尾,这样也叫做一个subroutine。但有时候又希望函数能够生成一系列的值,而不仅仅是返回一个值,此时函数不应该return(return control of execution),而是yield(transfer control temporarily and voluntarily),因为函数需要稍后继续执行。generator能够冻结函数的状态,继续执行的时候恢复。

下面是用generator来实现的一个生产者-消费者模型,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import random

def consume():
consumed_count = 0
while True:
data = yield
consumed_count += 1
print('Consuming {}, Total Consumed {}'.format(data, consumed_count))

def produce(consumer):
while True:
data = random.random()
print('Produced {}'.format(data))
consumer.send(data)
yield

consumer = consume()
consumer.send(None) # or consumer.next()
producer = produce(consumer)
for _ in range(10):
print('Producing...')
next(producer)

# Producing...
# Produced 0.4138693968479813
# Consuming 0.4138693968479813, Total Consumed 1
# Producing...
# Produced 0.5462849666609885
# Consuming 0.5462849666609885, Total Consumed 2
# Producing...
# Produced 0.06190270111408913
# Consuming 0.06190270111408913, Total Consumed 3

References

  1. 谈谈Python的生成器 里有一个关于send不错的例子
  2. Improve Your Python: 'yield' and Generators Explained

Fence

除了在原子操作中标记memory ordering外,还可以单独使用fence指定memory ordering。Fence是全局的操作,它影响所执行线程中其他原子操作的ordering。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
atomic<bool> x,y;
atomic<int> z;

void write_x_then_y() {
x.store(true,memory_order_relaxed);
atomic_thread_fence(memory_order_release);
y.store(true,memory_order_relaxed);
}

void read_y_then_x() {
while(!y.load(memory_order_relaxed));
atomic_thread_fence(memory_order_acquire);
if(x.load(memory_order_relaxed))
++z;
}

上面的代码中,如果没有显式的fence,z的值是不确定的。

关于fence,有几个synchronizes-with规则:

  • 如果acquire操作能读取到位于release fence后面store的写入的值,那么这个fence synchronizes-with acquire操作。
  • 如果位于acquire fence前面的load操作能够读取到release操作的值,那么这个release操作synchronizes-with acquire fence。
  • 如果位于acquire fence前面的load操作能够读取到位于release fence后面的store写入的值,那么release fence synchronizes-with acquire fence。

对于上面的代码,因为y的load能够读取到前面写入的值(由于fence存在,保证了ordering),所以release fence synchronizes-with acquire fence。

Ordering Nonatomic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool x;
atomic<bool> y;
atomic<int> z;

void write_x_then_y() {
x = true;
atomic_thread_fence(memory_order_release);
y.store(true,memory_order_relaxed);
}

void read_y_then_x() {
while(!y.load(memory_order_relaxed));
atomic_thread_fence(memory_order_acquire);
if(x)
++z;
}

现在把前面例子中的x换为普通的x,z的值仍然是有保证的,y必须是原子的。fence保证了x的store和y的store,以及y的load和x的load之间的ordering,而y的store和load之间有happens-before关系,因此x的store和load之间也有happens-before关系(传递)。

Release Sequences

如果

  • 有标记为memory_order_releasememory_order_acq_relmemory_order_seq_cst的store,
  • 和标记为memory_order_consumememory_order_acquirememory_order_seq_cst的load,
  • 并且在操作链中的每个操作都load上一个操作write的值

那么这个操作链构成一个release sequence,并且 * 初始的store synchronizes-with 用memory_order_acquirememory_order_seq_cst标记的最后的load; * 或初始的store dependency-ordered-before 用memory_order_consume标记的最后的load。

操作链中的RMW操作可以被标记为任意一种memory ordering。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
vector<int> queue_data;
atomic<int> count;

void f1() {
queue_data.push_back(1); // (1)
count.store(number_of_items, memory_order_release); // (2)
}

void f2() {
while(true) {
int item_index;
if((item_index = count.fetch_sub(1, memory_order_acquire)) <= 0) { // (3)
continue;
}
process(queue_data[item_index-1]);
}
}

thread a(f1);
thread b(f2);
thread c(f2);

在上述代码中,执行f2的有两个线程,由于acquire-release语义,(2)肯定是与第一个(3)有synchronizes-with关系的。后面还有一个(3),第一个(3)写入的值被第二个(3)读取,因此操作链构成一个release sequence,由因为最后一个(3)是memory_order_acquire(无论是哪个线程中的fetch_sub作为最后一个),因此有(2)synchronizes-with最后一个(3)。

Data Dependency

memory_order_consume是关于data dependency的,我的理解是更细粒度的acquire-release,通过使用memory_order_consume,可以避免对其他无依赖的数据强加同步。

关于data dependency,有两个关系(两个关系都有传递性),

  • carries-a-dependency-to:在单线程中,如果操作A的结果被用作操作B的操作符,那么操作A carries-a-dependency-to 操作B。
  • dependency-ordered-before:在线程之间,有标记为memory_order_releasememory_order_acq_relmemory_order_seq_cst的store操作A,如果标记为memory_order_consume的load操作B read了被store的数据,则操作A dependency-ordered-before 操作B。(操作A和B都是原子的)

线程之间,如果A dependency-ordered-before B,那么A happens-before B。

使用场景

对于这种memory ordering,一个典型的应用就是load一个指针指向的数据,其中load是原子操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct X {
int i;
std::string s;
};

std::atomic<X*> p;
std::atomic<int> a;

void create_x() {
X* x = new X;
x->i = 42;
x->s = "hello";
a.store(99, std::memory_order_relaxed);
p.store(x, std::memory_order_release);
}

void use_x() {
X* x;
while (!(x = p.load(std::memory_order_consume))) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}

assert(x->i == 42);
assert(x->s == "hello");
assert(a.load(std::memory_order_relaxed) == 99);
}

上述代码中,p的store是memory_order_release的,它的load是memory_order_consume的,循环保证了load能够读取到store的指针,因此当load能够读取到store的指针时,p的store dependency-ordered-before 它的load。所以p的store happens-before 它的load。

又因为x的store happens-before p的store,p的load happens-before x的load。

因此x的store happens-before x的load,对于x的两个assert不会触发,但a的load读取到的值是没有保证的。

kill_dependency

kill_dependency可以显式地打破依赖链。

对于如下代码(取自stackoverflow),

1
2
3
r1 = x.load(memory_order_consume);
r2 = r1->index;
do_something_with(a[std::kill_dependency(r2)]);

使用kill_dependency让编译器知道不需要再次读取r2的值,因此编译器可以将代码优化为,

1
2
3
4
predicted_r2 = x->index; // unordered load
r1 = x; // ordered load
r2 = r1->index; // ordered load
do_something_with(a[predicted_r2]); // 不需要再次读取r2

甚至优化为,

1
2
3
4
5
predicted_r2 = x->index; // unordered load
predicted_a = a[predicted_r2]; // get the CPU loading it early on
r1 = x; // ordered load
r2 = r1->index; // ordered load
do_something_with(predicted_a);

Memory ordering描述了CPU访问系统内存,执行load和store的顺序。Memory ordering包括编译时编译器生成的和运行时CPU生成的。为了高效地执行指令,只要不影响单线程程序的行为,编译器和CPU常常会对指令进行memory reordering,使得访问内存的操作不会按照程序代码中指定的顺序执行。

在单线程程序中,可以忽略reordering的存在;在多线程程序中,mutex,semaphores等互斥方法会保证在相关函数的调用周围没有reordering。在多核环境下(或对称多处理器架构)下,用C、C++等编写lock-free代码时,memory reordering是可观察到的,是重点要考虑的问题。

Compiler Reordering

从源代码得到CPU指令的过程中,编译器会做很多事情,其中之一就是reordering。(例子摘自Preshing on Programming

1
2
3
4
5
6
int Value;
int IsPublished = 0;
void sendValue(int x) {
Value = x;
IsPublished = 1;
}

如果IsPublished = 1;被reordered到Value = x;之前,那么IsPublished作为flag的作用就丧失了。既是是在单线程中,也会有问题。如果该线程在两次store之间被抢占,那么其他使用Value的线程就会访问到Value的旧值,而不是新值。

Compiler Barriers

防止编译器reordering最简单的办法莫过于使用compiler barriers,在不想被reordered的两个操作(load和store)之间加入compiler Barrierasm volatile("" ::: "memory"),但在多核环境下,这并不足够。还需要下面的CPU fence来提供运行时memory barrier的保障。如果使用了CPU fence,那么fence也会作为compiler Barrier。

另一种compiler barrier是函数调用,无论函数是否包含compiler barrier,除了inline函数、声明带有pure属性的函数和使用了链接时代码生成的函数,大多数函数调用都可以作为compiler barrier。因为编译器根本不知道该函数调用是否会修改先前的值,也不知道这个值是否在函数调用返回后会被继续使用,如果进行了reordering,那么很可能会违反一开始提到的原则。而对于包含的函数,无论是不是inline的,都可以作为compiler barrier。

C/C++中,volatie会阻止编译器的优化,编译器不会volatie变量间的操作进行reordering,但是volatie对处理器的reordering是无能为力的,并没有happens-before语义。与此同时,volatie也不能阻止多个线程的并发访问。对于下面的代码,无论shared_data是不是volatiespin_lock都是必须的。

1
2
3
4
spin_lock(&the_lock);
do_something_on(&shared_data);
do_something_else_with(&shared_data);
spin_unlock(&the_lock);

代码里的spin_lock也下面将说的memory barrier。

Processor Reordering

除了编译器会进行reordering,CPU同样会。CPU的reordering仅在多核或多处理器环境下才是可见的。(例子摘自Preshing on Programming

考虑下面的一段代码,thread1Functhread2Func分别在两个线程中运行,最后r1r2的结果会是什么?为了阻止编译器的reordering,已经在store和load之间加入了compiler barrier。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int X, Y;
int r1, r2;
void *thread1Func(void *param) {
X = 1;
asm volatile("" ::: "memory")
r2 = Y;
return NULL;
};
void *thread2Func(void *param) {
Y = 1;
asm volatile("" ::: "memory")
r2 = X;
return NULL;
};

由于是并发执行,两个线程的load和store会交替执行,因此r1r2的结果可能为,

1
2
3
r1=0 r2=1
r1=1 r2=0
r1=1 r2=1

r1=0 r2=0也是完全可能的,如果重复的进行测试,那么这个结果会频繁的发生。Intel在64 and IA-32 Architectures Software Developer's Manual中的8.2.3.4指出,

At each processor, the load and the store are to different locations and hence may be reordered.

代码中每个线程的store和load是不同的内存位置,所以发生StoreLoad Reordering是完全可能的。

Memory Barrier

上述例子的reordering只是众多memory reordering中的一种,主要有四种memory reordering

  • LoadLoad
  • StoreStore
  • LoadStore
  • StoreLoad

类似compiler reordering,需要compiler barrier来阻止CPU的reordering,这里是memory barriers,也叫fence指令。Fence保证了fence之前的load或store和之后的load或store是严格有序的。针对以上四种reordering,有对应的四种barrier,#LoadLoad#StoreStore#LoadStore#StoreLoad。对于现实中CPU的fence指令的行为,通常是上述几种的融合,且还会有其他的效果。

1
Store1; #StoreLoad; Load2;

需要特别说明的是#StoreLoad,这个barrier是唯一能够保证上述例子不出现r1=0 r2=0的。#StoreLoad保证了在barrier前执行Store1对其他处理器是可见的,在barrier后执行的Load2能够得到在barrier之后最新的值(不一定是Store1的值)。#StoreLoad防止了后续的load错误的使用Store1的值,而不是其他处理器在同一内存位置store的更新的值。#StoreLoad几乎在所有现代的多处理器上都是必须的,同时也是代价最高昂的一种barrier。

除了fence指令,memory barrier还有,

  • C++11中,很多原子类型的操作;
  • pthread中,mutex,spin_lock,semaphore的操作。

取自preshing博客上的几篇文章(123)。除了部分翻译外,还有自己的理解。

Happens-before关系

假设A和B两个操作是由多线程程序执行的,如果A happens-before B,那么A对内存的操作在B被执行前对执行B的线程切实可见。

关于happens-before要注意的是一下看起来自相矛盾的两点。因为happens-before所描述的是操作之间的关系,这个关系是独立于时间的,并不是happening before。

Happens-before并不意味着happening before

1
2
3
4
5
6
int A = 0;
int B = 0;
void foo() {
A = B + 1; // (1)
B = 1; // (2)
}

以上代码中,只看program order的话,(1)是happens-before(2)的。但编译器可能会对上面的代码进行reorder(用clang++ 3.7 -O2没有发生),使得B的store先于A完成

从happens-before定义来看,(1)对内存的修改必须在(2)执行前切实可见,也就是说A的store必须影响到B的store。但从这个例子来看,A的store并未影响到A,就算没有(1),(2)的行为也是一样的,这就等价于(1)的操作是可见的

因此(1)和(2)行为并不违背happens-before,happens-before并不意味着happening before。

Happening before并不意味着happens-before

假设下面对的int的store和load都是原子的,有两个线程分别执行两个函数。就program order而言,(1)和(2),(3)和(4)之间有happens-before关系。再假设在运行时,(2)在(3)之前完成,(3)读到了1。

但是并不意味着(2)和(3)之间有happens-before关系。

1
2
3
4
5
6
7
8
9
10
int isReady = 0;
int answer = 0;
void publishMessage() {
answer = 42; // (1)
isReady = 1; // (2)
}
void consumeMessage() {
if (isReady) // (3) <-- Let's suppose this line reads 1
printf("%d\n", answer); // (4)
}

happens-before关系仅仅在标准指明的地方有。C++11中并未规定在普通的store和load之间有happens-before关系。进一步看,(1)和(4)之间也没有。因此(1)和(4)是可以被编译器或CPU reordered的。即使(3)读到了1,(4)可能打印0。

单线程中的happens-before关系

如果操作A和B是由同一个线程执行的,且就program order而言,A的语句位于B之前,那么A happens-before B。

然而这并不是唯一实现happens-before关系的方法。

多线程中的happens-before关系

上面提到了单线程中的happens-before关系是如何产生的,下面来看多线程中的happens-before关系,C++11指出可以通过acquire和release语义,在不同线程的操作中实现happens-before。

Acquire和Release语义

Acquire语义:Acquire语义是一个属性,这个属性只能应用于从共享内存中的read操作,无论这些read是RMW还是普通的load。Acquire语义保证了在program order上位于read-acquire之后的read和write不会被编译器和CPU reordered到read-acquire之前。 Release语义:Release语义也是一个属性,这个属性只能应用于从共享内存中的write操作,无论这些read是RMW还是普通的store。Release语义保证了在program order上位于write-release之前的read和write不会被编译器和CPU reordered到write-release之后。

Raymond Chen的另一个解释, 一个带有acquire语义的操作不允许后续的内存操作提前到该操作之前执行,相对的,一个带有release语义的操作不允许前面的内存操作被滞后到该操作之后执行。

关于Acquire和Release语义,这里还有一个比较好的解释。

通过显式的CPU fence指令实现acquire和release语义

下面代码中有两个全局变量AReady,两个线程分别执行两段代码,Ready作为flag表示A的write是否完成。

1
2
3
4
5
6
7
8
9
10
11
12
int A = 0;
int Ready = 0;

// thread 1
A = 42;
#StoreStore
Ready = 1;

// thread 2
int r1 = Ready;
#LoadLoad
int r2 = A;

通过两个fence,可以保证当线程2发现r1 == 1时,A的值是1,进而保证r2 == 1

规范的来说就是,对Ready的write synchronizes-with对Ready的read。

Synchronizes-with关系

Synchronizes-with用于描述源码级操作的内存影响(describe ways in which the memory effects of source-level operations),即使是非原子操作,也能够保证结果是对其他线程可见。一个较为常见的事情是,无论何时在两个线程间有synchronizes-with关系(一般是在不同的线程间)那么在这些操作之间都会有happens-before关系。

一个Write-Release能够Synchronize-with一个Read-Acquire的

C++11标准规定了,一个对原子对象M执行release操作的原子操作A synchronize-with一个对M执行acquire操作的原子操作B,且能够得到以A为起始的release sequence操作的所有副作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
struct Message {
clock_t tick;
const char* str;
void* param;
};
Message g_payload;
std::atomic<int> g_guard(0);

void SendTestMessage(void* param) {
// Copy to shared memory using non-atomic stores.
g_payload.tick = clock();
g_payload.str = "TestMessage";
g_payload.param = param;
// Perform an atomic write-release to indicate that the message is ready.
g_guard.store(1, std::memory_order_release);
}

bool TryReceiveMessage(Message& result) {
// Perform an atomic read-acquire to check whether the message is ready.
int ready = g_guard.load(std::memory_order_acquire);
if (ready != 0) {
// Yes. Copy from shared memory using non-atomic loads.
result.tick = g_payload.tick;
result.str = g_payload.str;
result.param = g_payload.param;

return true;
}
// No.
return false;
}

上述代码中,为了能够把g_payload安全的在线程中传递,使用了acquire和release。当TryReceiveMessageg_guard的read-acquire执行完后,如果ready是1,那么g_payload的三个成员一定被成功写入。

与前面的标准对照,可以看到,

  • 原子操作A是SendTestMessage中的write-release;
  • 原子对象M是g_guard
  • 原子操作B是TryReceiveMessage中的read-acquire。

前面所说的takes its value from any side effect in the release sequence headed by A,这里指read-acquire能够读取到write-release所写的值。如果读取到了,那么synchronize-with关系就出现了。此时,两个线程间就有了happens-before关系。有时这也叫做synchronize-with或happens-before“边”。

标准中还保证了只要有synchronize-with边存在,happens-before关系就能够扩展到临近的操作。对应到上例中就是,当其他线程读取g_payload时,保证能够读取到对g_payload写入的值。

运行时关系

想要通过静态的分析代码,来寻找代码中的synchronize-with关系是错误的。synchronize-with是运行时关系

如果g_guard读取的过早,线程1还没有写入g_guard,那么就没有synchronize-with关系。

其他实现Synchronize-with的方法

在写代码是发现拷贝构造函数有时候没有调用,想起C++ Primer中提到过

the compiler can omit calls to the copy constructor.

后来查到是发生了copy elision。

首先有那么一个类定义,其中静态成员c是对象编号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class C {
public:
C() {
name_ = c;
++c;
cout << "c" << name_ << ": " << __func__ << endl;
}
C(const int name) : name_(name) {
cout << "c" << name_ << ": " << __func__ << endl;
}
C(const C &rhs) : name_(c++) {
cout << "copy from c" << rhs.name_ << ", c" << name_ << ": " << __func__
<< endl;
}
C &operator=(const C &rhs) {
cout << "copy assign from c" << rhs.name_ << ", c" << name_ << ": "
<< __func__ << endl;
return *this;
}

int name_ = 0;
static int c;
};

int C::c = 10000;

然后是有下面的几个函数定义和调用,

1
2
3
4
5
6
7
8
9
10
void f(C c) {
C tmp();
tmp = c;
}
C f() { return C(); }

C c1(1);
f(c1);
f(C(2));
C c = f();

以上三个f的调用分别输出什么?

下面是结果,

1
2
3
4
5
6
7
8
9
10
c1: C
copy from c1, c10000: C
c10001: C
copy assign from c10000, c10001: operator=
-----
c2: C
c10002: C
copy assign from c2, c10002: operator=
-----
c10003: C

第一个没什么好说的,首先用c1对形参c做拷贝初始化,接着tmp进行默认初始化,然后用拷贝赋值,将c赋值给tmp

第二个的结果就有点怪了,为什么C(2)得到的临时对象直接进行了赋值,而不首先初始化形参c?而第三个,为什么返回的临时对象一次拷贝都没发生?

因为在这两种情况中发生了copy elision(1234)。Copy elision是一种优化手段,满足特定条件时会发生,当传入的参数是rvalue的时候,无需进行额外的拷贝,直接使用源对象。RVO,全称叫return value optimization,编译器会让调用函数在其栈上分配空间,被调函数返回值处的临时对象会在这块内存上构造,进而避免了return时临时对象的拷贝,是copy elision常见形式。根据返回的对象是否是临时的,有named return value optimizationreturn value optimization

Copy elision和rvo即使在有可观察的到的side-effects时,也会发生,是As-if rule的例外中的一种。Dave Abrahams写过pass by value的一系列文章。Ayman B. Shoukry在这里讨论了nrvo的局限(multiple return points和conditional initialization)。

clang++和g++可以用-fno-elide-constructors控制是否开启优化。关闭优化后,输出的结果就和期待的一致了,

1
2
3
4
5
6
7
8
9
10
11
12
13
c1: C
copy from c1, c10000: C
c10001: C
copy assign from c10000, c10001: operator=
-----
c2: C
copy from c2, c10002: C
c10003: C
copy assign from c10002, c10003: operator=
-----
c10004: C
copy from c10004, c10005: C
copy from c10005, c10006: C

stackoverflow的这个答案,给出了Standard reference和发生copy elision以及return value optimization常见的例子,下面是搬运例子,

  • nrvo
1
2
3
4
5
6
7
8
9
10
11
class Thing {
public:
Thing();
~Thing();
Thing(const Thing&);
};
Thing f() {
Thing t;
return t; // optimization
}
Thing t2 = f();
  • rvo
1
2
3
4
5
6
7
8
9
10
class Thing {
public:
Thing();
~Thing();
Thing(const Thing&);
};
Thing f() {
return Thing(); // optimization
}
Thing t2 = f();
  • pass a temporary object by value
1
2
3
4
5
6
7
8
9
class Thing {
public:
Thing();
~Thing();
Thing(const Thing&);
};
void foo(Thing t); // optimization

foo(Thing());
  • exception is thrown and caught by value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct Thing{
Thing();
Thing(const Thing&);
};

void foo() {
Thing c;
throw c; // optimization
}

int main() {
try {
foo();
}
catch(Thing c) {}
}