6.824 - Spring 2017 Lecture 1 Introduction

最近看 mit-6.824,每个 lecture 会做一次笔记,每个笔记基本都分为,

  • Readings:课前阅读的论文
  • Lecture:讲义
  • Lab:实验

Readings

MapReduce: Simplified Data Processing on Large Clusters论文

实现

map task运行时,

  • 定期将生成的kv pair写入本地磁盘,并被parition函数分为R个分区。
  • 这些在磁盘上的pair的位置会被返回给master,master进而把这些位置发给reduce task。

reduce task运行时,

  • 使用RPC读取map task缓存到磁盘的数据。
  • 当reduce worker读取到所有数据时,按临时key对数据排序。因为多个intermediate key可能会由同一个Reducer处理,因此需要排序使得相同的key在一起。
  • reduce worker遍历已排序的中间数据,将key和中间value传给用户定义的reduce函数。

master数据结构

  • 对于每个map task和reduce task,master保存了任务状态(idle, in-progress, or completed),以及每个worker机器的身份。
  • 对于每个已完成的map task,master保存了R个分区的中间数据文件的位置和大小,每当一个map task完成,中间数据文件的位置和大小就被更新。这些信息以增量的形式,发送给有正在运行reduce task任务的机器。

容错

由于长时间未响应master的ping,worker故障,

  • 正在运行的map task或reduce task重置为idle->重跑。
  • 已完成的map task重置为idle->重跑,因为map task生成的中间数据是本地存储的。
  • 已完成的reduce task无需重跑,因为输出已经保存到GFS。

当一个map task先被worker A执行,接着又被B执行(worker A失败),这次重跑会被通知到所有执行reduce task的机器。任何还没有从A读取数据的reduce task,都转而向B读取。

master故障,

  • 定期保存master数据结构的checkpoint。

semantics in the presence of failures,

  • 当用户提供的 map 和 reduce 操作的执行结果是确定的,那么分布式的实现也会产生相同的执行结果,这个结果与整个程序顺序执行产生的结果一致。这依赖于,map task 和 reduce task 输出的原子提交。每个正在执行的任务都会把输出写到临时文件。当某个map task完成时,worker向master发送包含有R个临时文件列表的消息。如果master已经收到了这个任务完成的消息,那么本条就会被忽略。而对于 reduce task,输出到GFS的时候,GFS保证了原子性。

  • 当 map 和 reduce 的执行结果是不确定的,那么最终结果也是不确定的,每个 reduce 任务的执行结果都不同的不确定的串行执行的结果。

局部性

GFS 将文件分为 64MB 的块,并在不同机器上冗余存储。Master 可以利用文件的位置信息,在包含这个副本的机器上调度 map 任务。如果无法进行上述调度,那么就近调度(例如:同一个交换机下的 worker)。

任务粒度

MapReduce 把 map 任务和 reduce 任务分别分为 M 份和 R 份,理想情况下,M 和 R 应该远大于机器数目,这样有利于提高动态负载均衡和加速(当某个 worker 挂了情况下的)错误恢复:这个挂了的 worker 所执行的任务可以被重新调度到其他 worker 上进行。

实现时,master 需要进行 O(M + R) 次调度,需要 O(M * R) 状态跟踪 map 产生的临时文件位置(每个 map 产生 R 个临时文件),每个 map/reduce pair 需要一个 byte。

通常 R 的数量是由用户指定的,实际应用中对 M 的划分是要保证一个分片的数据量大小大约是 16-64M,R 的期望是一个比较小的数。

任务备份

一个导致任务执行总时间大大延长的重要原因是存在“拖后腿的任务”,这些执行缓慢的任务可能由多种原因导致。一个通用的解决方案是,当一个 MapReduce 任务将要执行完成时,master 调度一个备份执行。当主任务(原始的)或备份执行的其中之一完成时,这个任务被标记完成。这个方法在消耗少许额外资源的情况下,大大减少了总执行时间。

改进

提升效率的方法。

Partitioning 函数

用户指定 reducer 的数目为 R,map 的中间结果按照 partitioning 函数对临时 key 的计算结果分成了 R 个部分。一般来说,默认的 partitioning 函数是足够的,但也可以自己提供 partitioning 函数来实现特定的 partition 目标。

保证顺序

每个 partition 的 kv 是按照临时 key 升序的顺序处理的。这保证了,

  • 每个 reduce 产生的结果是有序的。
  • 利于按 key 的随机访问。
  • 用户觉得有序的结果方便。

Combiner 函数

当,

  • 每个 map 任务会产生大量重复的临时 key,
  • 用户提供的 reduce 函数是可交换和可结合的,

可以在 map 任务结束以后,使用 combiner 对 map 任务的结果进行部分合并,减少网络传输的开销。

输入输出类型

MapReduce 提供了读写多种文件格式的支持,用户也可以通过实现相应接口来自定义读写其他格式文件。

副作用

MapReduce 允许 map 或 reduce 生成辅助的输出文件,其原子性依赖于应用的实现。

跳过 Bad Records

某些特定的数据会让含有 bug 的 map 或 reduce 函数崩溃,有时无法修复这些 bug,且忽略这些数据是可接受的,那么可以使用 MapReduce 提供的机制来忽略。

每个 worker 都会监控 segmentation violations 和 bus errors,如果崩溃,发送记录 -> master,如果 master 发现某条记录失败次数大于 1,那么下次执行时就会跳过这条记录。

本地执行

分布式环境下,debug 一个 map 或 reduce 函数是困难的。MapReduce 库允许在本地执行任务,便于调试。

状态信息

master 会运行一个内部的 HTTP 服务器,展示状态信息。

计数器

MapReduce 库提供了一个计数器来统计事件发生的次数。worker 响应 master 的 ping 时,计数器的值就被放在 pong 中,发送到 master。master 聚合计数器的值,并在 map 和 reduce 任务结束后返回给用户。当聚合时,重复执行任务的计数器只会被统计一次。有的计数器值是由 MapReduce 库直接维护的。

Lectures

Distributed System

分布式系统提供了app使用的基础设施,通过抽象隐藏了实现的细节,这些抽象包括:

  • 存储(storage)
  • 通信(communication)
  • 计算(computation)

在讨论分布式系统时,下面几个话题会时常出现:

  • 实现 RPC,线程,并发控制。

  • 性能

    • 理想情况:可扩展的吞吐量。N倍的服务器数量->(通过并行的CPU,磁盘,网络实现)N倍的吞吐量。因此为了处理更多的负载,只需要添加更多的机器。
    • 可扩展性随着N增加而变得困难:
      • 负载不均衡(Load im-blance),集群里有慢的机器(stragglers)。
      • 不可并行的代码:初始化,交互
      • 共享资源的瓶颈:网络
  • 容错

    • 大集群,复杂的网络->总会有出问题的地方
    • 希望能够隐藏这些错误,使得错误对app不可见
      • 可用性(Availability):在出错的情况下,app能继续使用数据。
      • 持久性(Durability):修复错误后,app能够继续正常工作。
    • big idea:多个服务器。如果一个server crash,client能继续使用其他的。
  • 一致性(consistency)

    • 通用目的的架构需要有良好定义的行为。
      • 例如:get(k)应该返回最近的put(k, v)
    • 实现良好定义的行为是困难的。
      • 难以保证多个服务器一致。
      • 在含有多个操作的更新中,客户端执行到一半可能crash了。
      • 服务器在尴尬的时刻崩溃,例如:执行了操作但没有返回结果。
      • 由于网络问题,服务器看起来好像挂了。
    • 一致性和性能是冲突的。
      • 实现一致性需要通信。
      • “强一致性”常常导致系统性能低下。
      • 高性能通常会对app造成“弱一致性”。
    • 开发者在这个范围内追求了很多设计点(People have pursued many design points in this spectrum)。

MapReduce

MapReduce概述

  • 上下文:对海量数据进行多个小时的运算。
  • 总目标:普通程序员能够在保证合理的效率的情况下,轻松的将数据处理切分到多个服务器。
  • 需要定义Map和Reduce函数,非并发的代码,且常常是很简单的。
  • MR运行在集群上,处理海量数据,隐藏分布式的细节。
1
2
3
4
5
6
7
8
input is divided into M files
Input1 -> Map -> a,1 b,1 c,1
Input2 -> Map -> b,1
Input3 -> Map -> a,1 c,1
| | |
| | -> Reduce -> c,2
| -----> Reduce -> b,2
---------> Reduce -> a,2
  • MR对每个输入文件调用Map(),生成<k2, v2>(中间数据)的集合
  • MR收集每个k2对应的所有v2,并输入给Reduce调用
  • 最后从Reduce()输出<k2, v3>的集合

MapReduce隐藏了很多痛苦的细节

  • 在服务器上启动s/w
  • 跟踪已结束的任务
  • 数据移动
  • 错误恢复

MapReduce有很好的可扩展性

N台机器->N倍吞吐量。假设M和R都>=R(大量的输入文件和map输出的key)。由于Map()之间无交互,可以并行执行,Reduce()也是,都能够并行执行,因此可以通过添加机器来增加吞吐量。

性能受限的潜在因素

网络。在Map->Reduce的shuffle时,所有数据都需要通过网络传输,因此减少通过网络来移动的数据是关键。

更多细节

  • mater:为worker分配任务,记录中间输出的位置。
  • M Map tasks, R Reduce tasks。
  • 输入存储在GFS,3份冗余。
  • 集群所有机器都运行GFS和MR worker。
  • input task比workder数量更多。
  • mater为每个worker分配Map task,在旧任务结束时分发新任务。
  • Map worker在本地磁盘将中间key hash到R个partition
  • 只有当所有Map结束后,本地的Reduce调用才会开始。
  • mater告诉Reducer从Map worker获取中间数据partition(intermediate data partitions)。
  • Reduce workers将最终结果写入GFS(一个文件对应一个Reduce task)。

设计细节:减少低速网络的影响

  • Map从本地磁盘读入GFS上的数据副本(replica),而不是从网络。
  • 中间数据仅仅在网络中传输一次。Map worker将数据写入本地磁盘,而不是GFS。
  • 中间数据被partition到包含很多key的文件。大网络中的传输更加高效。

如何实现良好的load balance?

load balance对于可扩展性很重要,N-1个server都等待1个server结束是不好的。但是有的任务就是会比其他花费更长的时间。

解决方案:任务数比worker数更多。 master为已完成当前任务的worker分配新的任务。一般来说,就不会有大任务占据主要的计算时间。那么快的server会比慢的server完成更多的任务,最终同时完成所有任务。

如何实现容错?

如果某个server在执行MR任务期间crash怎么办? 不是重启整个job,MR只重新运行失败的Map()Reduce()。这两个操作必须是纯函数:

  • 不保留调用之间的状态。
  • 不读取和写入除了MR输入/输入的文件。
  • 任务之间没有隐藏的通信。

因此重新执行会生成相同的结果。纯函数的要求是MR相比于其他并发编程模型的主要局限,当它也是MR简洁的关键。

crash recovery的细节

  • Map worker crashes:
    • master观察到worker再也不响应ping。
    • crash的worker的中间Map输出丢失,但这个数据有可能每个Reduce任务都需要。
    • master根据GFS上输入数据的其他副本来分配任务,并重新执行。
    • 某些Reduce worker可能已经读取了crash的worker生成的中间数据。此时就需要依赖于Map()的纯函数特性和确定性。
    • 如果Reduce获取到了所有的中间数据,那么master就不需要重新运行Map。然后接下来的一个Reduce会crash,进而导致强制重跑失败的Map。
  • Reduce worker crashes:
    • 已结束的任务不受影响,数据以冗余的形式存储在GFS。
    • master重新执行其他worker上未完成的任务。
  • Reduce worker在写入输出数据期间crash:
    • GFS的rename是atomic的,在写入完成前数据不可见。因此master可以安全的在其他地方重跑Reduce任务。

其他错误和问题

  • master给两个worker分配了相同的Map()任务怎么办? 原因可能是master错误的认为worker挂了。master只会将其中一个告诉给Reduce worker。
  • master给两个worker分配了相同的Reduce()任务怎么办? 两个都会尝试在GFS中写入相同的文件。GFS rename的atomic性质避免了结果是两者的混合,只有一个完整的文件可见。而Reduce()的纯函数特性使得两次输出的文件是一样的。
  • 如果某个worker很慢怎么办? 原因可能是硬件问题。master重新运行最后几个任务。
  • 如果worker由于h/w或s/w问题计算出了错误的输出怎么办? ╮(╯_╰)╭,MR假设CPU和software是“fail-stop”的。
  • 如果master crash了怎么办?
    • 从check-point恢复。
    • 放弃执行任务。
    • 使用多个master,一个可用,剩余standby。

什么样的app不适用于MapReduce?

  • 不是所有app都适用于map/shuffle/reduce模式。
  • 小数据量,因为开销很高。例如:网站的后端。
  • 对海量数据的小更新。例如:为一个大索引添加小文件。
  • 随机读写,因为Map和Reduce都不能选择输入数据。
  • 多次shuffle,例如:page-rank。可以使用多个MR来实现,但是不高效。
  • 更灵活的系统可以实现上述目标,但会导致更复杂的模型。

结论

MapReduce让集群计算受欢迎。

  • 不是最高效和灵活的。
  • 可扩展性良好。
  • 易于编程,错误和数据移动被隐藏了。

这些在实践中是很好的权衡。

Lab 1: MapReduce

  • 实验给出了框架代码,需要完成关键函数。
  • MapReduce的分布式实现除了需要关注task,还需要考虑存储,分布式存储不是这里的重点,因此实验在一台机器上运行worker thread,使用系统的文件系统模拟分布式存储。
  • 实验要求实现两种模式的MR,顺序执行所有task,以及分别并行执行map和reduce task。实验给出了的框架代码使用了go的channel来实现并行执行task。而对于顺序执行task的实现,实际上是分别把所有map和reduce task做了一次封装,得到“一个map task”和“一个reduce task”,分别执行封装好的“map task”和“reduce task”,其中每个实际的map和reduce都是顺序执行的。
  • lab的代码在github.com/chaomai/mit-6.824