最近看 mit-6.824,每个 lecture 会做一次笔记,每个笔记基本都分为,
- Readings:课前阅读的论文
- Lecture:讲义
- Lab:实验
Readings
MapReduce: Simplified Data Processing on Large Clusters论文
实现
map task运行时,
- 定期将生成的kv pair写入本地磁盘,并被parition函数分为R个分区。
- 这些在磁盘上的pair的位置会被返回给master,master进而把这些位置发给reduce task。
reduce task运行时,
- 使用RPC读取map task缓存到磁盘的数据。
- 当reduce worker读取到所有数据时,按临时key对数据排序。因为多个intermediate key可能会由同一个Reducer处理,因此需要排序使得相同的key在一起。
- reduce worker遍历已排序的中间数据,将key和中间value传给用户定义的reduce函数。
master数据结构
- 对于每个map task和reduce task,master保存了任务状态(idle, in-progress, or completed),以及每个worker机器的身份。
- 对于每个已完成的map task,master保存了R个分区的中间数据文件的位置和大小,每当一个map task完成,中间数据文件的位置和大小就被更新。这些信息以增量的形式,发送给有正在运行reduce task任务的机器。
容错
由于长时间未响应master的ping,worker故障,
- 正在运行的map task或reduce task重置为idle->重跑。
- 已完成的map task重置为idle->重跑,因为map task生成的中间数据是本地存储的。
- 已完成的reduce task无需重跑,因为输出已经保存到GFS。
当一个map task先被worker A执行,接着又被B执行(worker A失败),这次重跑会被通知到所有执行reduce task的机器。任何还没有从A读取数据的reduce task,都转而向B读取。
master故障,
- 定期保存master数据结构的checkpoint。
semantics in the presence of failures,
当用户提供的 map 和 reduce 操作的执行结果是确定的,那么分布式的实现也会产生相同的执行结果,这个结果与整个程序顺序执行产生的结果一致。这依赖于,map task 和 reduce task 输出的原子提交。每个正在执行的任务都会把输出写到临时文件。当某个map task完成时,worker向master发送包含有R个临时文件列表的消息。如果master已经收到了这个任务完成的消息,那么本条就会被忽略。而对于 reduce task,输出到GFS的时候,GFS保证了原子性。
当 map 和 reduce 的执行结果是不确定的,那么最终结果也是不确定的,每个 reduce 任务的执行结果都不同的不确定的串行执行的结果。
局部性
GFS 将文件分为 64MB 的块,并在不同机器上冗余存储。Master 可以利用文件的位置信息,在包含这个副本的机器上调度 map 任务。如果无法进行上述调度,那么就近调度(例如:同一个交换机下的 worker)。
任务粒度
MapReduce 把 map 任务和 reduce 任务分别分为 M 份和 R 份,理想情况下,M 和 R 应该远大于机器数目,这样有利于提高动态负载均衡和加速(当某个 worker 挂了情况下的)错误恢复:这个挂了的 worker 所执行的任务可以被重新调度到其他 worker 上进行。
实现时,master 需要进行 O(M + R)
次调度,需要 O(M * R)
状态跟踪 map 产生的临时文件位置(每个 map 产生 R 个临时文件),每个 map/reduce pair 需要一个 byte。
通常 R 的数量是由用户指定的,实际应用中对 M 的划分是要保证一个分片的数据量大小大约是 16-64M,R 的期望是一个比较小的数。
任务备份
一个导致任务执行总时间大大延长的重要原因是存在“拖后腿的任务”,这些执行缓慢的任务可能由多种原因导致。一个通用的解决方案是,当一个 MapReduce 任务将要执行完成时,master 调度一个备份执行。当主任务(原始的)或备份执行的其中之一完成时,这个任务被标记完成。这个方法在消耗少许额外资源的情况下,大大减少了总执行时间。
改进
提升效率的方法。
Partitioning 函数
用户指定 reducer 的数目为 R,map 的中间结果按照 partitioning 函数对临时 key 的计算结果分成了 R 个部分。一般来说,默认的 partitioning 函数是足够的,但也可以自己提供 partitioning 函数来实现特定的 partition 目标。
保证顺序
每个 partition 的 kv 是按照临时 key 升序的顺序处理的。这保证了,
- 每个 reduce 产生的结果是有序的。
- 利于按 key 的随机访问。
- 用户觉得有序的结果方便。
Combiner 函数
当,
- 每个 map 任务会产生大量重复的临时 key,
- 用户提供的 reduce 函数是可交换和可结合的,
可以在 map 任务结束以后,使用 combiner 对 map 任务的结果进行部分合并,减少网络传输的开销。
输入输出类型
MapReduce 提供了读写多种文件格式的支持,用户也可以通过实现相应接口来自定义读写其他格式文件。
副作用
MapReduce 允许 map 或 reduce 生成辅助的输出文件,其原子性依赖于应用的实现。
跳过 Bad Records
某些特定的数据会让含有 bug 的 map 或 reduce 函数崩溃,有时无法修复这些 bug,且忽略这些数据是可接受的,那么可以使用 MapReduce 提供的机制来忽略。
每个 worker 都会监控 segmentation violations 和 bus errors,如果崩溃,发送记录 -> master,如果 master 发现某条记录失败次数大于 1,那么下次执行时就会跳过这条记录。
本地执行
分布式环境下,debug 一个 map 或 reduce 函数是困难的。MapReduce 库允许在本地执行任务,便于调试。
状态信息
master 会运行一个内部的 HTTP 服务器,展示状态信息。
计数器
MapReduce 库提供了一个计数器来统计事件发生的次数。worker 响应 master 的 ping 时,计数器的值就被放在 pong 中,发送到 master。master 聚合计数器的值,并在 map 和 reduce 任务结束后返回给用户。当聚合时,重复执行任务的计数器只会被统计一次。有的计数器值是由 MapReduce 库直接维护的。
Lectures
Distributed System
分布式系统提供了app使用的基础设施,通过抽象隐藏了实现的细节,这些抽象包括:
- 存储(storage)
- 通信(communication)
- 计算(computation)
在讨论分布式系统时,下面几个话题会时常出现:
实现 RPC,线程,并发控制。
性能
- 理想情况:可扩展的吞吐量。N倍的服务器数量->(通过并行的CPU,磁盘,网络实现)N倍的吞吐量。因此为了处理更多的负载,只需要添加更多的机器。
- 可扩展性随着N增加而变得困难:
- 负载不均衡(Load im-blance),集群里有慢的机器(stragglers)。
- 不可并行的代码:初始化,交互
- 共享资源的瓶颈:网络
容错
- 大集群,复杂的网络->总会有出问题的地方
- 希望能够隐藏这些错误,使得错误对app不可见
- 可用性(Availability):在出错的情况下,app能继续使用数据。
- 持久性(Durability):修复错误后,app能够继续正常工作。
- big idea:多个服务器。如果一个server crash,client能继续使用其他的。
一致性(consistency)
- 通用目的的架构需要有良好定义的行为。
- 例如:
get(k)
应该返回最近的put(k, v)
。
- 例如:
- 实现良好定义的行为是困难的。
- 难以保证多个服务器一致。
- 在含有多个操作的更新中,客户端执行到一半可能crash了。
- 服务器在尴尬的时刻崩溃,例如:执行了操作但没有返回结果。
- 由于网络问题,服务器看起来好像挂了。
- 一致性和性能是冲突的。
- 实现一致性需要通信。
- “强一致性”常常导致系统性能低下。
- 高性能通常会对app造成“弱一致性”。
- 开发者在这个范围内追求了很多设计点(People have pursued many design points in this spectrum)。
- 通用目的的架构需要有良好定义的行为。
MapReduce
MapReduce概述
- 上下文:对海量数据进行多个小时的运算。
- 总目标:普通程序员能够在保证合理的效率的情况下,轻松的将数据处理切分到多个服务器。
- 需要定义Map和Reduce函数,非并发的代码,且常常是很简单的。
- MR运行在集群上,处理海量数据,隐藏分布式的细节。
|
|
MR
对每个输入文件调用Map()
,生成<k2, v2>
(中间数据)的集合MR
收集每个k2
对应的所有v2
,并输入给Reduce调用- 最后从
Reduce()
输出<k2, v3>
的集合
MapReduce隐藏了很多痛苦的细节
- 在服务器上启动s/w
- 跟踪已结束的任务
- 数据移动
- 错误恢复
MapReduce有很好的可扩展性
N台机器->N倍吞吐量。假设M和R都>=R(大量的输入文件和map输出的key)。由于Map()
之间无交互,可以并行执行,Reduce()
也是,都能够并行执行,因此可以通过添加机器来增加吞吐量。
性能受限的潜在因素
网络。在Map->Reduce的shuffle时,所有数据都需要通过网络传输,因此减少通过网络来移动的数据是关键。
更多细节
- mater:为worker分配任务,记录中间输出的位置。
- M Map tasks, R Reduce tasks。
- 输入存储在GFS,3份冗余。
- 集群所有机器都运行GFS和MR worker。
- input task比workder数量更多。
- mater为每个worker分配Map task,在旧任务结束时分发新任务。
- Map worker在本地磁盘将中间key hash到R个partition
- 只有当所有Map结束后,本地的Reduce调用才会开始。
- mater告诉Reducer从Map worker获取中间数据partition(intermediate data partitions)。
- Reduce workers将最终结果写入GFS(一个文件对应一个Reduce task)。
设计细节:减少低速网络的影响
- Map从本地磁盘读入GFS上的数据副本(replica),而不是从网络。
- 中间数据仅仅在网络中传输一次。Map worker将数据写入本地磁盘,而不是GFS。
- 中间数据被partition到包含很多key的文件。大网络中的传输更加高效。
如何实现良好的load balance?
load balance对于可扩展性很重要,N-1个server都等待1个server结束是不好的。但是有的任务就是会比其他花费更长的时间。
解决方案:任务数比worker数更多。 master为已完成当前任务的worker分配新的任务。一般来说,就不会有大任务占据主要的计算时间。那么快的server会比慢的server完成更多的任务,最终同时完成所有任务。
如何实现容错?
如果某个server在执行MR任务期间crash怎么办?
不是重启整个job,MR只重新运行失败的Map()
和Reduce()
。这两个操作必须是纯函数:
- 不保留调用之间的状态。
- 不读取和写入除了MR输入/输入的文件。
- 任务之间没有隐藏的通信。
因此重新执行会生成相同的结果。纯函数的要求是MR相比于其他并发编程模型的主要局限,当它也是MR简洁的关键。
crash recovery的细节
- Map worker crashes:
- master观察到worker再也不响应ping。
- crash的worker的中间Map输出丢失,但这个数据有可能每个Reduce任务都需要。
- master根据GFS上输入数据的其他副本来分配任务,并重新执行。
- 某些Reduce worker可能已经读取了crash的worker生成的中间数据。此时就需要依赖于
Map()
的纯函数特性和确定性。 - 如果Reduce获取到了所有的中间数据,那么master就不需要重新运行Map。然后接下来的一个Reduce会crash,进而导致强制重跑失败的Map。
- Reduce worker crashes:
- 已结束的任务不受影响,数据以冗余的形式存储在GFS。
- master重新执行其他worker上未完成的任务。
- Reduce worker在写入输出数据期间crash:
- GFS的rename是atomic的,在写入完成前数据不可见。因此master可以安全的在其他地方重跑Reduce任务。
其他错误和问题
- master给两个worker分配了相同的
Map()
任务怎么办? 原因可能是master错误的认为worker挂了。master只会将其中一个告诉给Reduce worker。 - master给两个worker分配了相同的
Reduce()
任务怎么办? 两个都会尝试在GFS中写入相同的文件。GFS rename的atomic性质避免了结果是两者的混合,只有一个完整的文件可见。而Reduce()
的纯函数特性使得两次输出的文件是一样的。 - 如果某个worker很慢怎么办? 原因可能是硬件问题。master重新运行最后几个任务。
- 如果worker由于h/w或s/w问题计算出了错误的输出怎么办? ╮(╯_╰)╭,MR假设CPU和software是“fail-stop”的。
- 如果master crash了怎么办?
- 从check-point恢复。
- 放弃执行任务。
- 使用多个master,一个可用,剩余standby。
什么样的app不适用于MapReduce?
- 不是所有app都适用于map/shuffle/reduce模式。
- 小数据量,因为开销很高。例如:网站的后端。
- 对海量数据的小更新。例如:为一个大索引添加小文件。
- 随机读写,因为Map和Reduce都不能选择输入数据。
- 多次shuffle,例如:page-rank。可以使用多个MR来实现,但是不高效。
- 更灵活的系统可以实现上述目标,但会导致更复杂的模型。
结论
MapReduce让集群计算受欢迎。
- 不是最高效和灵活的。
- 可扩展性良好。
- 易于编程,错误和数据移动被隐藏了。
这些在实践中是很好的权衡。
Lab 1: MapReduce
- 实验给出了框架代码,需要完成关键函数。
- MapReduce的分布式实现除了需要关注task,还需要考虑存储,分布式存储不是这里的重点,因此实验在一台机器上运行worker thread,使用系统的文件系统模拟分布式存储。
- 实验要求实现两种模式的MR,顺序执行所有task,以及分别并行执行map和reduce task。实验给出了的框架代码使用了go的channel来实现并行执行task。而对于顺序执行task的实现,实际上是分别把所有map和reduce task做了一次封装,得到“一个map task”和“一个reduce task”,分别执行封装好的“map task”和“reduce task”,其中每个实际的map和reduce都是顺序执行的。
- lab的代码在github.com/chaomai/mit-6.824