0%

数据密集型应用

现在的很多应用是数据密集型的,数据是这些应用的主要挑战-数据的总量、数据的复杂度和数据变化的速度。

很多数据密集型的应用都是基于已有的数据系统提供的常用功能来构建的。例如:

  • 存储数据,以便自己或其他应用能够找到(基于数据库实现此功能)
  • 记住一个昂贵操作的结果,来加速读取(缓存)
  • 允许用户通过关键词搜索数据,或多种方式过滤数据(搜索索引)
  • 发送消息到另一个进程进行异步处理(流式处理)
  • 定时处理大量累积的数据(批处理)

由于数据系统的抽象,这些功能都看似很简单。但是,数据系统在逐渐变得相似,不同的数据系统可能同时具有多种特性,例如:Redis的cache和消息队列功能;应用程序越来越宽泛的需求使得单一数据系统无法完成,需要使用程序代码来组合不同的数据系统;同一需求,可能有多种方式和数据系统来实现。因此选择合适的数据系统和权衡架构的设计是一个值得思考的问题。

设计数据系统的原则

  • 可靠性(Reliability) 就算出现问题的时候(硬件或软件错误,人为错误),系统都应该应该持续的正确工作(在期望的水平上提供正确的功能)。
  • 可扩展性(Scalability) 随着系统的增长(数据量、流量或数据复杂度),应该有合理的方法来应对这些增长。
  • 可维护性(Maintainability) 随着时间的推移,许多人都会参与系统相关的工作(开发和运维,他们保证系统现有行为正常、并使系统能够应对新的情况),他们应该高效的工作。

可靠性

系统具有容错性(fault-tolerant或resilient)。系统在遇到特定错误的时候,也能按预期正常工作。

这里原文特别区别了fault和failure。fault指的是系统组件的运作偏离了预期,而failure指的是系统整体无法提供服务。设计一个完全没有fault的系统是不可能的,但是可以通过设计fault-tolerant机制来避免fault导致failure。 下面,failure会写为系统故障。

硬件故障 每个硬件都有预期的寿命,系统使用的硬件规模够大、运行时间够长时,硬件总会出故障。

通过软件和硬件层面冗余的方式,可以避免出现硬件故障时无法提供服务。

软件错误 硬件错误的发生相对独立,但是软件错误更加难以预期,往往会导致很多的系统错误。

通过完善的考虑系统的假设和交互、测试、进程隔离、监控、允许进程挂了后重启等方式来避免软件错误。

人为错误 系统的设计、构建和运维是由人来进行的,但人是不可靠的。

通过如下方式来减少人为错误: * 以最小化错误机会的方式来设计系统。例如:设计良好的抽象、API和管理界面来避免“做错误的事”。 * 解耦人们最容易犯的错和犯错的地方。例如:提供完整功能的非生产环境sanbox。 * 完善的测试。例如:从单元测试到整个系统集成测试。 * 提供快速和简单的错误恢复机制。例如:快速的回滚配置,上线新代码应逐步从小范围内到大范围,提供重新计算数据的工具来修复老数据错误。 * 详尽和清晰的监控。例如:性能计数和错误率。

可扩展性

即便一个系统现在能够可靠的工作,这不意味着未来也能。一个常见的原因就是负载的增加。当讨论可扩展性时,常常需要考虑,“如果系统以一个特定的方式增长,应该怎么办?”,“如何增加计算资源来应对额外的负载?”。

描述负载 首先需要简洁的描述负载,负载可以使用几个数字来描述,叫做负载参数(load parameters),参数的选择依赖于系统的架构,例如: * web服务器每秒的请求个数。 * 数据库的读写比例。 * 聊天室同时活跃的用户数。 * cache命中率。

描述性能 一旦有了系统负载的描述,那么可以讨论负载增加时会发生什么,可以有两个角度: * 负载增加时,如果保持系统资源不变,系统的性能会怎样? * 负载增加时,为了保持系统性能不变,需要增加多少资源?

讨论这两个问题需要描述性能, 1. 吞吐量(批处理系统) 2. 响应时间(在线系统) * 平均响应时间。 * 百分比响应时间。 例如:p50,取time的中位数,如果是200ms,那么代表50%的响应时间小于200ms。类似的还有p90,p99。 * 高百分比响应时间,又叫做尾部延迟(tail latencies)。

性能与可用性一起被用在SLOs(service level objectives)和SLAs(service level agreements)中,规约定义了服务期望的性能和可用性。

高百分比响应时间常常受队列延迟(queueing delay)的影响,少量慢请求会阻塞后续请求的处理,这个现象又叫做head-of-line blocking。

如果一个请求需要进一步使用更多的后端调用来完成,那么一个较慢的调用就会拖慢整个请求,这叫做尾部延迟放大(tail latency amplification)。

应对负载的方法 1. scaling up 垂直缩放,迁移到性能更强的机器。 2. scaling out 水平缩放,把负载分布到多个小机器上。也叫做share-nothing architecture。

实际工程上可能会混用两种方法,几个性能较强的机器可能比非常多的小机器来的划算。把无状态的服务分布到多个机器较为容易,但是把一个有状态的数据系统分布式化会引入很多额外的复杂性。

大规模分布式系统架构往往是针对应用高度定制化的,并不存在一个通用的分布式架构。因为要解决的问题可能是读为主、写为主、大量数据的存储为主、数据的复杂度、响应时间、访问方式,或者是前面各种因素的混合。

可维护性

软件开发的主要成本并不在最初的开发,而是持续的维护-修bug、运维、排错、兼容性的平台等。主要关注以下方面, 1. 可操作性,便于运维团队的维护。 需要提供: * 完善的监控 * 自动化和集成工具 * 避免依赖特定的机器 * 文档 * 良好的默认行为,并提供修改默认行为的方法 * 自我恢复,并提供手动操作的方法 * 可预测的行为 2. 简单,管理复杂性,便于其他开发者理解系统。 好的抽象可以隐藏复杂的细节。 3. 可进化,便于修改和增加系统功能,又叫做可扩展性(extensibility)、可修改性(modifiability)、可塑性(plasticity)。 增加数据系统的敏捷性。

Readings - In Search of an Understandable Consensus Algorithm (Extended Version) (to end of Section 5)论文

Introduction

共识算法(Consensus algorithms)允许一组机器作为一个一致的组工作,这个组可以在某些成员失败的情况下存活。Paxos是过去10多年最常被讨论的共识算法,但是难以理解且不便于实现。提出Raft的主要目标是可理解性。通过解耦leader选举、log复制和安全,以及减少状态空间,来增加可理解性。

多副本状态机(Replicated state machine)与共识算法 共识算法常常出现在多副本状态机的讨论中。

-w384

多副本状态机常用于解决分布式系统中的各种容错问题,使用复制log来实现,每个状态机以相同的顺序执行相同的命令,最终算出相同的状态和结果。保证replicated log的一致是由共识算法来实现的。

共识算法一般有以下几个特征: * 安全:非拜占庭条件(网络问题、丢包、乱序)下,不返回任何错误的结果。 * 可用性:大多数server可用,则系统整体可用。 * 不依赖时间来保证日志的一致性。 * 大多数server执行完毕则返回,少量慢server不影响系统性能。

Raft

raft的工作过程是,首先选举leader,leader会负责管理replicated log。leader接受来自client的log,复制到其他机器,并在安全的时候通知其他机器把log输入状态机。

raft可以解耦为以下三个部分: 1. leader选举(Leader election) 2. 日志复制(Log replication) 3. 安全(Safety)

在理解这三个部分前,有几个基本概念需要知道, server 状态 * leader:只能有一个leader * follower:只响应来自其他server的请求 * candidate:用于选举

-w433

时间 raft把时间分为terms,term由一个递增的数字标识,表示current term。每个term以leader选举作为起始,如果未选举出leader,那么下一个term继续选。term内选举出的leader管理集群直到term结束。

-w351

不同的server可能观察到不同的term转换,server使用current term(在server通信的时候发送)作为逻辑时钟,来确保能够探测到过期的信息。

通信 使用rpc。基本的共识算法使用两种rpc, 1. RequestVote RPCs:candidates调用,来进行拉票,用于选主。 2. AppendEntries RPCs:leader发起,用于复制日志和心跳。

leader选举(Leader election)

选举过程为,follower在election timeout内都没有收到leader的通信,则转换为candidate状态,并自增当前term、为自己投票、向其他server发送RequestVote RPCs。candidate维持在这个状态,直到下列情况出现: 1. 获得了相同term的多数派的投票,赢得选举。 一旦candidate赢得选举,就转换为leader,并向其他server发送heartbeat,宣告leadership。 2. 收到AppendEntries RPCs,其他candidate赢得了选举,成为leader。 1. 如果leader的term >= 自己的term,那么这个leader是合法的,自己转换为follower状态。 2. 如果leader的term < 自己的term,那么拒绝这个rpc,自己维持在candidate状态。 3. election timeout后,未获得足够的选票。 多个candidate可能同时发起选举,发生了split vote,导致任何一个都无法获得多数派的投票。自增term,开始下一次选举。

在下面的状态图中,状态转换的上方表示触发转换的event,下方表示发生转换时执行的action。

leader election

选举安全性: * 在每个term内,每个server只能至多为一个candidate投票。

在选举过程中,raft使用随机election timeout来保证, * split votes(没有任何candidate能获得大多数投票)会很少出现。 比较常见的是,C先timeout,然后投票给自己,并发送RequestVote RPCs,最终win。 -w397 * 且split votes能够被快速解决。如果出现split votes,candidate先等待随机election timeout。

日志复制(Log replication)

如何复制 1. leader追加log entry。 2. AppendEntries RPC。 3. 当log entry被安全复制(成为committed log entry,leader更新commit index), * leader apply log entry,然后返回结果到client。 * leader AppendEntries RPC通知follower committed entry,follower apply log entry。

如果follower失败、运行缓慢、丢包,leader重试AppendEntries RPC(就算leader已经返回client结果),直到所有follower最终收到所有log entry。raft保证了committed entry是持久化的,且最终能被所有可用的状态机执行。

那什么时候才算log entry被安全复制?当创建log entry的leader把entry复制到大多数server上以后。这同时也提交了在leader中所有先前的log,包括被前leader创建的log。

应对出错 raft在复制log时会维护如下性质来保证Log Matching Property(log consistency), * 如果两个entry在不同的log中有相同的index和term,那么它们的cmd是相同的。 因为leader至多只创建一个带有相同index和term的entry,且entry不会在log中改变位置。 * 如果两个entry在不同的log中有相同的index和term,那么所有先前的log都是相同的。 consistency check:AppendEntries RPC发送新的entry时,会包含前一个entry,如果follower没有找到与前一个entry相同的index和term,那么follower不会写入新的entry。

根据以上两点,结合数学归纳法可知,Log Matching Property可以被满足。在正常情况下,leader和follower的log是保持一致的。

如果leader或follower崩溃引起了log不一致,如何恢复一致性?leader会强制follower复制自己的log, 1. leader维护了{follower: nextIndex},表示下一个将会发送到follower的entry。 2. leader启动时,将nextIndex置为max(self log index) + 1。 3. 如果follower的日志不一致,那么接下来的AppendEntries RPC consistency check会失败。leader会将nextIndex - 1并重试,直到成功。

安全(Safety)

任何实现了日志复制的系统,都必须满足:一旦entry已经applied到状态机,那么任何其他状态机都不能为这条entry apply一个不同的值。

截止目前所描述的leader选举,存在的问题是,如果follower未收到leader的提交,那么当它成为leader以后,就会用自己的log覆盖其他server的。

选举约束 任何基于leader的共识算法,leader必须最终包含所有committed entry。raft需要保证所有committed entry,都出现在后续每个新的leader中,而这个保证确保了本节内容开头的安全性要求。

在选举的时候,使用voting process来避免未包含所有committed entry的candidate赢得选举。由于candidate在选举的时候需要联系大多数server,即只需要求每个committed entry必须出现在至少一个server(选举时联系的server)里面即可(抽屉原理)。

如果candidate包含的日志与大多数server包含的log至少一样新,那么这个candidate就包含了所有的log。具体判断方法是, 1. 在RequestVote RPC进行投票时,rpc包含了candidate最后一个entry的index和term。 2. 如果voter(收到rpc call的一方)的log是更新的,即, \[(lastTerm_v > lastTerm_c) || ((lastTerm_v == lastTerm_c) \\&\\& (lastIndex_v > lastIndex_c))\] 那么voter拒绝此次投票。

提交上一个term的entry 新leader能否直接提交上一个term的entry?

-w463

c中s1成为term4的leader后继续复制term2的日志index2,此时日志已经复制到大多数机器上。按照之前的规则,s1可以认为日志已经是committed。但如果接着s5成为了leader(接受s3和s4的投票),s5会覆盖s1已经commit的日志。

因此不能仅通过副本数判断先前term的日志是不是committed,还需要满足至少有一个属于当前term的log也复制到大多数server,才能认为是。

follower和candidate崩溃

raft rpc是幂等的,leader无限重试。

时间和可用性

raft的安全性不依赖时间。但是系统的可用性依赖时间,这里有尤其指的是leader选举。election timeout需要满足下面的要求,raft才能选举并保持一个稳定的leader,

\[broadcastTime \ll electionTimeout \ll MTBF\]

Leadership transfer

本质上还是使用leader选举的机制,通过主动触发目标server进行选举来实现来迁移,与后面的配置变更不同。过程如下: 1. 旧leader暂停服务。 2. 旧leader进行正常的日志复制过程,把log复制到目标server。 3. 旧leader发送TimeoutNow到目标server,触发目标server开始选举(相当于调快了timer,触发了election timeout)。 4. 目标server大概率会比其他server先发起选举,并成为下一个term的leader。 5. 选举成功后,旧leader收到heartbeat,出让leadership,至此transfer完成。

如果迁移过程未在一个election timeout内完成,那么终止迁移。如果迁移成功了,但是旧leader误以为失败了,那最坏情况是进行一次额外的选举。

问题

  1. 日志复制时每个步骤都有可能出现错误,如何处理? raft并不允许“false positives”的出现(返回client执行成功了,但实际上没有成功),但是可能会有“false negatives”的出现,告知client失败了,但实际上是成功了。为了避免false negatives,client必须重试直到成功,且每次只能有一个未完成的command,相应地,raft也必须提供去重机制。

  2. 为什么不能直接复制上一个term未committed的entry?

  3. raft为何能够防止脑裂? 2f+1个server,成为leader需要获得大多数server(f+1个)的投票。如果发生网络分区,必然有一个partition的分区少于f+1个。这个分区里面,

    • 如果没有leader,那么也不可能选举出leader。
    • 如果有leader,由于日志无法被安全复制,因此不会对client做出任何承诺(不会成为committed entry,也不会被apply到状态机)。
  4. 为什么需要leader?

    • 简化复制日志的过程。
    • 保证所有的副本都以相同的顺序执行相同的命令。
  5. server变为candidate后会发生什么? 开始选举:inc term,vote self,发送RequestVote RPC。可能结果如下,

    • 赢得选举。 变为leader。
    • 选举失败,收到来自其他leader的AppendEntries RPC。 变为follower。
    • 既没有赢得选举,也没有收到来自其他leader的AppendEntries RPC。 维持candidate。election timeout,开始下一次选举。 如果一直无法获得大多数server的投票,这个过程会一直重复,term会一直增加。 此时,如果client有请求会发生什么?
  6. 如何保证同一个term内只有一个leader?

    • leader需要获得来自大多数server的投票。
    • 在同一个term内,每个candidate只能投票一次。
    • 在同一个term内,至多只有一个server能获得大多数server的投票(即使出现网络分区)。
  7. 什么情况下选举会失败?

    • 大多数server都失败了。
    • 两个server获得了相同票数(例如:2f+1个server,挂了一个server)。
  8. 选举失败会发生什么? election timeout,开始下一次选举。

  9. 如何设置election timeout?

    • \(broadcastTime \ll electionTimeout \ll MTBF\)
    • random
    • 至少是几个heartbeat间隔
  10. replicated和committed entry的区别?

    • replicated是做了复制,可能被覆盖。
    • committed,已提交,不会丢失。
  11. 每个副本的日志会完全一致吗? 不会,可能落后,可能会有临时的不一致。但最终会一致,且状态机执行的命令是一致的。

  12. 为什么leader不能直接提交上一个term的entry?以及如何避免直接提交带来的问题? 提交时,需要等到至少一个当前term的entry也安全复制以后。

  13. leader在什么时候覆盖follower的entry是合法的? uncommitted entry。

  14. leader覆盖follower的entry时,可能会覆盖committed entry吗? 不会。

    1. 先看entry成为committed时,发生了什么。log entry需要安全复制,即发送到大多数server上。
    2. candidate成为leader,需要联系大多数server,以保证自身的log与大多数server的一样新(选举约束)。因此成为leader的candidate一定包含了所有committed entry。
  15. 如果下图中的leader(for term 8)挂了,a、d和f,哪个会被选举为leader?谁投的票?新leader产生后,哪些entry一定会保留下来? -w478

    a-f每个server最多可能获得投票的情况如下: a:a,b,e,f b:b,f c:a,b,c,e,f d:a,b,c,d,e,f e:b,e f:f

    所以a或d可能成为leader,一定会保留下的log是111445566。

  16. 如果server刚开始选举,此时收到了来自leader的AppendEntries RPC(假设leader与这个server都是处于同一个term),此时要怎么处理?

    • candidate成为leader后,应该立即发送心跳来避免发生多余的选举。
    • 如果发生这个情况,由于term一样,leader会忽略RequestVote RPC。
  17. 如果leader完成复制,更新自身的commit index,且完成了apply,但在通知follower commit index前挂了(或丢失了leader资格),这个entry最终会丢失吗?

  18. lastApplied是否应该做持久化? 这个地方有点争议。在ongardie/dissertation的Errata中,指明了lastApplied是否做持久化应该与状态机一致。但在Raft Q&A中的说法是,状态机如果做持久化,那么也应该负责记住执行了哪些log。

  19. follower拒绝AppendEntries RPC时,如何快速的解决冲突? follower拒绝时,返回冲突entry的term,以及这个term首个entry的index。

    • 如果leader有这个entry,那么移动nextIndex到冲突的那个entry。
    • 如果leader没有这个entry,那么移动nextIndex到那个term的首个index。

    还可以使用二分法,查找首个冲突的entry,这可以实现在最坏情况下的最佳时间复杂度。

Lab 2A: Raft

锁的使用建议

这个lecture说了几点关于锁的建议,我觉得最重要的一点是,

Be careful about assumptions across a drop and re-acquire of a lock. One place this can arise is when avoiding waiting with locks held.

在减少锁的粒度的时候,可能也把变量的write和read分开了,进而导致在read的时候,变量已经被其他线程改变了。

References

  1. Why Raft never commits log entries from previous terms directly
  2. raft之 为什么不能直接commit 上一个term的entry
  3. raft本质理解
  4. Raft user study lectures
  5. The Raft Consensus Algorithm
  6. ongardie/dissertation
  7. ongardie/runway-model-raft
  8. Raft Q&A

Readings - Practical System for Fault-Tolerant Virtual Machines Fault-Tolerant Virtual Machines论文

这篇论文讨论的主从复制与常见的相比,非常极端和雄心勃勃,论文基于vm构建了一个os级别的主从复制系统,较为细致的讨论了主从复制的设计和实现。

介绍

主从复制是实现server容错的常用方法,如果primary挂了,backup可以继续提供服务。将primary的全量数据同步到backup是比较耗费带宽的操作。更好的方法是将server视为确定状态机,基本思路是把primary和backup置为相同的初始状态,并保证二者都能以相同的顺序收到相同的输入。由于存在某些操作是非确定的(如:中断和获得当前时间),因此还需要额外的协同。

FT协议

Deterministic Replay

为了实现在backup进行replay,需要保证: 1. 正确的捕获所有输入和非确定的操作,以保证backup的确定性执行。 2. 在backup上正确的replay。 3. 对性能没有影响。

论文中关于非确定的操作,零零散散在不少地方提到,不过总结下来其实就是,这个操作不是pure function,结果受除输入的外部状态影响。更具体的就是,非确定操作使用到的资源是与其他(来自vm或hypervisor)的进程共享的,竞争导致了非确定的结果。

在多核系统上,上述的竞争会更复杂,这也是为什么论文没有在多核系统上实现的原因。

FT协议

primary的日志实时的通过logging channel发送到backup。为了保证在切换的时候,backup能够以与primary相同状态进行服务,需要满足:

Output Requirement:如果primary失败,backup接替了primary,backup的执行需要保证所有的输出都与primary的一致。

只要满足Output Requirement,在发生主从切换的时候,client就感知不到打断和不一致。为了保证Output Requirement,需要backup在收到能够产生那个output的input之前,primary推迟输出,也就是下面的规则:

Output Rule:只有当backup收到能够产生一个output的log,并返回ack(表示已经存入log buffer)给primary,primary才能输出。

除了VM-FT有,在其他的多副本系统上都以某种形式出现着。

-w470

primary等待ack不必阻塞系统当前的执行。

检测和响应失败

检测失败: * server之间的heartbeat * logging channel的traffic是否一直存在

如何避免split-brain: * 共享磁盘存储,原子的检测标志位

实现

FT VM启动

  • 使用基于VMware VMotion实现的FT VMotion完成clone,primary的中断小于1s。
  • 使用VMware vShpere实现的集群服务来选择最佳server创建backup。

logging channel管理

-w322

log buffer的局限: primary(或backup)的log buffer满了(或空了),primary(或backup)需要等待至log buffer可用为止,此时会暂停对client的服务。

主从切换的时间=检测primary失败的时间+backup消费log buffer的时间,为了减少切换的时间,在backup无法及时消费log的时候,会减慢primary的速度(限制CPU)。

FT VM上的操作

除VMotion外的所有运维操作(例如:调整cpu限制),都必须在primary上完成,然后通过特殊的日志项目发送到backup。

关于磁盘io的实现问题

  1. 磁盘访问时的竞争会导致非确定的操作。
    • 检测这样的io竞争,并在primary和backup上以相同顺序执行。
  2. 使用DMA时,对磁盘和内存的访问会导致非确定的操作。
    • 对磁盘的读写,改为读写bounce buffer。
    • 只有当数据全部读到buffer以后,FT才会从读指令处恢复primary/backup的执行,避免二者出现差异(如果直接依赖DMA读取,primary和backup可能会在不可预知的时间点发生缺页中断,从而导致不一致)。
  3. primary在磁盘io完成前就挂了。
    • backup切换为primary以后,无从得知io是否已经完成。FT会在backup上重试(前两点已经消除了导致非确定结果的操作)未完成的io操作。
    • 那么如何判断未完成的io操作是哪些?io完成以后,设备会产生一个中断,如果某个io log缺少相应的中断,那么就需要重试。
  4. primary在磁盘io完成后挂了
    • backup同样无从得知io是否已经完成。此时重新执行写磁盘是幂等的(而对于网络io,tcp会忽略重复的包)。

关于网络io的实现问题

hypervisor对vm buffer的更新导致了非确定操作,改为由vm触发hypervisor的中断,并记录到log。

其他设计

非共享磁盘

优势: * 磁盘作为vm的内部状态,primary写磁盘时不必等待backup的ack。

缺点: * backup失败、执行FT VMotion时,需要同步磁盘数据。 * 需要使用其他方法来解决split-brain,例如:另外一个server作为共享存储;多个机器进行投票选主。

backup上执行read

在从执行read,论文考虑的点并不是可以增加系统的read上限,而是考虑不发送read log可以减少logging channel的带宽。

具体看在backup上执行read,需要注意的点是: * 如果primary成功,backup失败,backup需要重试,直到成功。 * 如果primary失败,那么尝试读取的内存必须发送到backup。 * 如果primary执行了读,然后是写,那么在backup也必须按照这个顺序来执行。

Lecture

Fault tolerance

理想的特性 * 可用性 * 强一致性 * 对client和server software透明 * 高效

容错,容什么错? * Fail-stop failures * Independent failures * Network drops some/all packets * Network partition

不包括: * Incorrect execution * Correlated failures * Configuration errors * Malice

Fault tolerant MapReduce master

lab 1中的woker由于是无状态的,且mapper和reducer执行的都是pure function,因此实现容错很简单。

master实现容错,需要考虑: * 需要复制什么状态?应用程序级?指令级?(woker list、完成的job、空闲的worker、tcp连接状态、程序memory和stack、cpu寄存器?) * primary是否需要等待backup? * 什么时候切换到backup? * 切换时是否能被观察到? * 如何快速的切换?

主要方法有 1. State transfer,简单,但是state可能会很大,传输很慢 * 主副本执行服务 * primary把新状态发送到backup 2. Replicated state machine,高效但复杂 * 所有副本执行所有的操作 * 如果所有副本有相同的起始状态、相同的操作、相同的顺序、deterministic,那么就会有相同的最终状态 * 例如:VM-FT,GFS

VM-FT

VM-FT是一个replicated state machine。

为了避免primary和backup出现差异,backup必须在指令流的相同位置、以相同的顺序看到相同事件。对于普通的指令,这个是比较容易实现的。FT对中断和磁盘/网络io会做特殊的处理。

例如:时钟中断

ft time interrupt

FT的有缺点

适用场景 1. 重要且低延迟的服务,例如:name server。 2. 不便于修改的服务。

不适用的情况 1. 高吞吐量的服务 * state仅仅是应用程序级别的,降低了开销,实现更高的性能。 * 例如:GFS适用应用程序级别的state。

Reference

  1. FAILURE MODES IN DISTRIBUTED SYSTEMS

Readings - The Google File System论文

介绍

GFS是由Google设计和实现的,以满足Google对数据处理快速正常的需求。GFS和先前的分布式文件系统有很多相似的目标,例如:性能、可扩展性、可靠性和可用性。然而,GFS的设计是由Google对应用负载和技术环境的关键(当前和预期的)观察驱动的,这反映了与早期文件系统设计假设的显著不同。Google重新审视传统的选择,并探索在设计领域探索了彻底不同的观点。 * 组件故障是常态而非例外。 因此持续的监控、错误监测、容错和自动恢复是系统不可缺少的。 * 传统标准的文件是巨大的,常常很多GB。处理包含数十亿个对象、很多TB、且快速增长的数据集时,即便文件系统可以支持,也很难管理数十亿个约KB大小的文件。 因此设计假设和参数,例如IO操作和block大小必须被重新审视。 * 大多数文件修改都是通过追加新数据的方式,而非覆盖已有的数据。对文件的随机写几乎已经不存在了。一旦写入,文件就是只读的,且常常只是顺序读。 鉴于这样对大文件的访问模式,追加成为了性能优化和原子保证的的关注点,而在client上对数据块的缓存失去了吸引力。 * 应用程序和文件系统api的协同设计,通过提升灵活性,有益于整个系统。

设计概览

接口

虽然GFS没有实现例如POSIX的标准API,但也提供了一组熟悉的文件系统接口。文件以层次结构的方式用目录组织起来,并用路径名来标识。GFS支持create,delete,open,close,readwrite文件。GFS还支持snapshotrecord append

架构

一个GFS集群由一个master和多个chunkserver构成,可以被多个client访问。

-w757

文件被切分为固定大小的chunks。每个chunk都由一个不可变,且全局唯一的64位chunk handle标识,这个chunk handle在创建chunk时master分配的。chunkservers把chunk作为linux文件存储在本地磁盘上,并通过指定的chunk handle和byte range来读写文件。为了reliability(可靠性),每个chunk会在多个chunkservers上有复制。

master维护这整个文件系统的metadata,包括namespace,访问控制信息,文件到chunk的映射,以及chucks的当前位置。master还可控制着系统级别的活动,例如chuck租约管理,孤儿chunk的垃圾回收,以及chunkservers之间的chunk迁移。master定期与每个chunkserver以HeartBeat消息的形式进行通信,完成指令的发送和状态的收集。

client与master通信来进行metadata相关的操作,数据相关的通信是直接与chunkserver进行的。

client和chunkserver都不需要缓存数据。client缓存数据的收益很小,因为大多数程序都是流式读取大文件或者数据量太大而无法缓存。client不用缓存数据消除了缓存一致性的问题,简化了系统设计。但client会缓存metadata。chunkserver不必缓存文件数据,因为chunk都是以本地文件的形式保存的,linux buffer cache会把常访问的数据放入内存。

单一master

单一master极大的简化了设计,并使master能够使用全局信息来进行复杂的chunk布局和复制决策。然而当读写时,必须最小化master的参与,这样master才不会成为瓶颈。client进行读写时,先询问master应该连接哪个chunkserver,并缓存这个信息一段时间,然后直接与这个chunkserver交互来完成后续的操作。

chunk size

chunk size是关键设计参数之一。GFS使用一个远大于典型文件系统的block size,64MB。每个chunk副本都以普通linux文件的形式存储在chunkserver,并在需要的时候扩展。惰性空间分配避免了由于内部碎片导致的空间浪费。

大型chunk size有这些优势, * 减少了client与master交互的需求。 读写同一个chunk只需要向master请求一次chunk的位置信息。 * 由于chunk较大,client也较为可能在一个给定的chunk上进行很多的操作。 通过在较长时间内保持与chunkserver的TCP连接,可以减少网络开销。 * 减少了存储在master上metadata的大小。 由此可以将metadata放入内存。

然而大型chunk size,即便有惰性空间分配,也存在弊端, * chunkserver的热点访问。 一个小文件仅有为数不多的chunks组成,可能就一个。如果大量的client都访问同一个文件,那么存储这些chunk的chunkserver可能会成为热点。

Metadata

mater主要存储3类metadata: * 文件和chunk namespace * 文件到chunks的映射关系 * 每个chunk副本的位置

其他metadata还包括所有权和权限、每个chunk的版本、引用计数(用于实现copy-on-write)。

所有的metadata都是存储在内存中的。 * 前两种还会进行持久化存储(使用write-ahead log),这是通过把记录修改到操作日志、在master落盘、以及在远程机器上存放副本来实现的。 * 对于最后一种metadata,master并不会做持久化存储。在master启动和有chunkserver加入集群的时候,master会询问每一个chunkserver存放的chunks。

In-Memory Data Structures 由于metadata是存放在内存中的,因此mater的操作很快,除此之外还能完成定期在后台较快的完整扫描。这个定期扫描用于实现chunk的gc,chunkserver故障时副本重新复制,以及chunk的迁移。

一个潜在的问题是存储的chunk数目受限于内存的大小。但由于每个chunk的metadata少于64byte,且文件的namespace数据也少于64byte,并且启用了前缀压缩,因此这不是一个严重的问题。如果确实有必要支持更大的文件系统,加内存即可。

Chunk Locations master并不对chunk locations做持久化。master启动时,会向所有chunkserver请求。之后,由于master控制着所有chunks的放置,并通过心跳消息来监控chunkserver,master能够确保自身的信息是最新的。

为什么不做持久化? 1. 消除了master和chunkserver的同步问题。 chunkserver可能加入、离开、重启、重命名、故障等。 2. chunkserver对自己有和没有哪些chunk有最终的话语权。 在master维护此信息的一致视图是没有意义的,chunkser可能出现1中的各种问题。

Operation Log 操作日志对GFS很重要, * 包含了metadata关键修改的历史记录,并持久化。 * 作为逻辑时间戳,定义了并发操作的顺序。 文件和chunks,以及它们的版本,全都在创建的时候被逻辑时间唯一且永久的标识。

可靠性保证, * 仅当metadata的修改完成持久化以后,这些修改才对client可见。 * 在多个远程机器上有复制。 * 仅当把相应的log记录写入本地和远程机器的磁盘后,才响应client。 * master会批量flush日志,来减少flush和复制对整个集群吞吐量的影响。 * master通过重放操作日志来恢复文件系统的状态。 * 为了减少启动的时间,需要保证log较小。 * 当log增长超过特定大小时,master会checkpoint自身的状态,以便可以在恢复时载入最后一个checkpoint并重放在那之后的log。

checkpoint和恢复, * checkpoint类似于压缩后的B树,可以直接map到内存,并用用户namespace的查找,且不需要额外的解析。 * master创建checkpoint时,会切换到新log文件,并在另外的线程中创建checkpoint(包含了checkpoint前的所有修改),避免延误当前的修改。 * 恢复只需要最近的一个完整(需要检测是否完整)checkpoint和后续的log文件。更老的checkpoint和log是可以释放的。

一致性模型

GFS有一个宽松的一致性模型,这在支持高度分布式应用的同时,也相对简单和高效。

Guarantees by GFS 文件namespace修改(例如:创建文件)是原子的,由master专门执行。namespace锁保证了原子性和正确性;master的操作日志定义了这些操作的全局顺序。

一个文件区域有两种状态, 1. consistent:无论client从哪个副本读取,都能看到相同的数据。 2. defined:在文件数据修改后,如果文件区域是consistent的,并且client能看到所有写入到文件的修改。

数据修改包含, 1. 写入 * 写入操作会把数据写到应用程序指定的文件offset。 2. 追加 * 即使在并发修改存在的情况下,追加会把数据在GFS选择的offset处至少一次原子追加一次。 * 这个offset会返回给client,标记了一个defined文件区域的起始位置,这个区域包含了已追加记录。 * GFS可能会插入填充或重复记录项。他们占据的区域被认为是inconsistent,且占用户数据总量的很小一部分。

Write Record Append
Serial success defined(同时也是consistent) defined interspersed with inconsistent
Concurrent success consistent but undefined
1. 所有client都能看到相同的数据。
2. 但是这些数据并不能反映任何修改所写入的内容。
3. 这些数据一般包含混合了多个修改的片段。
defined interspersed with inconsistent
Failure inconsistent(同时也是undefined) inconsistent

如何区分defined和undefined的文件区域?

在一些列的成功修改后,被修改的文件区域保证是defined(?),且包含了最后一次修改所写入的数据。GFS通过以下方式来实现这个保证, 1. 在所有的副本上以相同的顺序对chunk进行修改。 2. 用chunk版本号来检测过期的副本(由于chunkserver下线导致缺失修改)。 过期的副本不会参与到修改或返回给client,会尽早的被gc掉。

不过这里我存疑,例如:concurrent write,并不能保证defined。

某些client会cache chunk的位置。由于cache的timeout,读取到过期数据的时间窗是有限的。另外,对于大多数文件都是追加操作,一个过期的副本通常会返回过早结束(a premature end of chunk)的文件块,而非过期的数据。

在修改完成很长时间后,机器故障也会破坏或摧毁数据。GFS通过master和chunkserver定期的握手来检测失效的chunkserver,并通过校验和来检查数据损坏。当问题出现时,数据会尽快的从有效的副本进行恢复。如果在GFS来不及反应(没有足够的处理时间)的时候,所有的副本都丢失了,那应用能收到错误,而不是看到损坏的数据。

Implications for Applications GFS应用可以通过一些已经用于其他目的的简单技术来适应宽松一致性模型, * 依赖追加而非覆盖,以及checkpointing * 例如:writer从头生成一个文件,待所有数据写入完毕以后,原子的把文件进行重命名。也可以定期创建checkpoints记录成功写入了多少。checkpoints也可以包含应用级别的校验和。readers只检查和处理至最后一个checkpoint之间的文件区域,这些文件区域是defined。 * 追加远比随机写要高效和有弹性的多。 * checkpoint允许writer以增量的方式重新写入,并避免reader处理那些从应用的角度看仍是不完整的数据,尽管这些数据已被成功写入。 * 写入时自我校验和自我识别的记录 * 记录是以至少追加一次的语义来记录每个writer的输出的,因而reader需要处理偶然的填充和重复的情况。 * writer写入的每个记录都包含额外的信息,例如校验和,用于验证。reader用校验和来识别并丢弃填充数据,以及记录片段。 * 如果无法容忍偶然出现的重复(例如,这些数据会触发非幂等的操作),那可以通过记录中的唯一标识来进行过滤。

Checkpointing allows writers to restart incrementally and keeps readers from processing successfully written file data that is still incomplete from the application’s perspective. 我没有完全理解所谓的从应用的角度看仍是不完整的数据,是指的最后一个checkpoint之后的那些数据?

系统交互

GFS的设计可以最大限度的减少master参与到所有的操作。

租约和修改顺序

文件的修改发生在chunk的所有副本上,GFS使用租约来维护副本之间一致的修改顺序。master会将一个chunk的租借给其中一个副本,这个部分叫做主副本。主副本选择对chunk所有修改的序列顺序,所有的副本在应用这些修改的时候都会遵循这个顺序。最后,全局的修改顺序,首先由master选择的租借授权顺序定义(先选择某个副本为primary,然后可能又选择了另一个为primary),并在租期内由主副本分配的序列号定义。

租约机制的好处是最小化master的管理开销。 * 租约起始的timeout是60s,但只要chunk还在被修改,主副本可以无限次的请求,然后(一般情况下)得到timeout的扩展。 * timeout扩展的请求的授权是存放在master和所有chunkservers定期交互的心跳包里面的。 * master可能会在租约过期前撤销(例如master要禁止一个正在rename的文件的修改)。 * 即使master丢失了与primary的通信,master也可以在老的租约过期后,将新的租约授权给另一个副本。

-w368

下面以写入为例说明整个过程, 1. client向master请求持有租约的chunkserver以及其他副本的位置。如果租约未被持有,master会授权一个副本。 2. master返回给client primary的标识和其他副本的位置。client cache这些数据,且仅在无法连接到primary或primary不在持有租约的时候才会联系master。 3. client将数据推送到所有副本,任何顺序均可。每个chunkserver会将这些数据存储在内部的LRU缓存中,直至数据被使用或过期。 4. 一旦所有副本确认收到数据以后,client向primary发送写请求。写请求标识了先前推送到所有副本的数据。 * primary为所有收到的修改(可能来自多个client)分配连续的序列号,序列号提供了必要的序列化(necessary serialization)。 * primary将修改以序列号顺序应用到本地状态。 5. primary向所有从副本转发写请求。每个从副本都以primary分配的相同序列号顺序应用修改。 6. 所有从副本回复primary,表明已完成操作。 7. primary回复client。

如果上述过程出错, * 任何副本发生错误,都会汇报给client。 * 如果在primary发生错误,序列号将不会被分配和转发。 * client的请求被认为已失败,被修改的文件区域将处于不一致状态。 * client会做重试,先会重试步骤3~7,如果不行,会完全从头重试写入。

如果存在另一个并发写入到相同位置的client, * 前一个client写入的内容被后一个写入的覆盖 * 所有副本都有相同的数据,但是混合了来自两个client的数据,consistent but undefined。

如果一个写入的数据很大或跨越了chunk的边界,GFS client会把数据打散成多个写操作。这些写操作遵循了上述流程,但是可能会与其他client的并发写操作交替和被覆盖。因此文件区域将会出现consistent but undefined的情况。

数据流

从控制流解耦数据流是为了高效的使用网络, 1. 最大化利用每个机器的网络带宽 * 数据以流水线的方式,沿着精心挑选的chunkservers链进行线性的推送,而非以其他拓扑形式来推送(例如:树)。 * 这样每个机器的所有出口带宽都会用书尽可能快的传输数据,而非将带宽拆分到多个接收者。 2. 避免网络瓶颈和高延迟的链路 * 交换机链路通常有这两个问题。 * 每个机器会把数据转发到网络拓扑中最近(通过ip地址来估算)的且未收到数据的机器。 3. 最小化推送所有数据的延迟 * 通过在TCP连接上以pipelining的方式进行数据传输来实现 * 一旦某个chunkserver收到一份数据,就立即开始转发。立即转发并不会降低接收数据的速度。 * pipelining在使用全双工链路交换网络的情况下很有用。 * 在没有网络拥塞的情况下,传输一份R个副本B byte的数据,理想的时间开销是B/T + RL,T是网络吞吐量,L是机器间的传输延迟。

Atomic Record Appends

GFS提供了原子追加记录的操作,叫做record append。 * 传统的写操作中,client指定data需要写入的offset。并发写入到同一个区域是不可序列化的。 * record append中,client指定data,GFS在自己选择的offset处以原子的方式至少一次追加数据到文件末尾,并将offset返回给client。

record append是一种修改操作,遵循前面描述的控制流。 1. client向文件最后一个chunk的所有副本推送数据,然后向primary发送请求。 2. primary检查追加到当前chunk是否会导致chunk超出大小限制。 * 如果是,那么填充当前chunk,并告知从副本也填充。返回client需要在下一个chunk上重试。 * 追加记录被限制在0.25 * maximum chunk size,以限制碎片在可接受的范围内。 * 如果没有超过,那么primary进行追加,并告知从副本也在相同的offset追加,最后返回client操作成功。 3. 如果在任何副本上追加失败,client会重试。

重试追加时, * 重试的结果是,同一个chunk的副本可能会包含不同的数据,可能包括部分或全部相同的记录。 * GFS并不保证所有副本每个字节都是相同的,只能保证数据会以一个原子单位的形式至少一次写入。 * 对于一个成功的写入操作,数据一定写在某个chunk的所有副本的相同offset位置。在这之后,所有副本都至少与记录的结尾一样长。因此任何后续的记录,即使另一个副本成为primary的情况下,都会被分配到更高的offset,或者另一个chunk。

Snapshot

GFS使用copy-on-write的方式来实现文件或目录的快照,创建snapshot的速度很快。 * master收到快照请求时,首先撤销将要做snapshot的所有授权chunk的租约。 * 租约撤销或过期后,master在磁盘记录操作日志。接着通过复制文件或目录树的metadata,把日志应用到内存状态。新创建的文件快照与原文件指向相同的chunks。 * client写入某个chunk C时,先通过master找到primary。master发现引用计数不唯1,选择一个新的chunk handle C',并通知相关chunkserver本地复制创建C'。 * 最后client遵循前面的过程进行写入。

对目录进行snapshot的时候,复制的metadata是什么?

master操作

master执行, * 所有namespace的操作。 * 管理chunk副本的放置、创建、复制、以及与各种系统级活动协调来保证:chunk是完全备份的、平衡chunkserver的负载、回收未使用的空间。

namespace管理和锁

GFS使用完整路径名到metadata的map来从逻辑上表示namespace。存储的时候使用前缀压缩。

锁以及加锁方式, * namespace树的每个节点(绝对文件名或绝对目录名)都有关联的读写锁,读写锁是惰性分配的。 * master进行每个操作前,都会对路径名/d1/d2/.../dn的每一级父目录加上读锁,并对/d1/d2/.../dn/leaf加上读锁或写锁,leaf可能是文件或目录。 这里不需要对父级目录加写锁,因为并没有真正意义上的父级目录,也没有需要避免并发修改的数据。只要能防止被删除、重命名或被snapshot即可。 * 锁是以一个一致的全序来获取的,以避免死锁。首先按namespace树的层级排序,然后按同一级的字典序排序。

副本放置

副本放在分布在多个机器内是不够的,还需要分布在不同的机架,放置策略有两个目的: * 最大化数据可靠性和可用性。 * 最大化带宽利用。

Creation,Re-replication,Rebalancing

chunks在三种情况下会进行创建: 1. master创建新chunk。 创建时会考虑这些因素, * chunkserver磁盘使用率低于平均值。 * 限制每个chunkserver最近创建数。结合上一点,如果不限制,那么对于一个空chunkserver,迁移时繁重的写操作,会导致磁盘I/O过高。 * 跨多个机架。 2. 副本数低于目标值(由于机器不可达、副本损坏、目标值增加),master重新创建副本。 * 每个需要被re-replicated的chunk的优先级由多个指标衡量:低于目标值多少?优先为存在的文件(live files)创建而不是最近被删除的。优先创建任何阻塞了client操作的的chunk。 * 创建时,chunkserver会从有效的副本直接复制,会考虑1中的各种因素。 * 为了避免对client流量的影响,master会限制集群和每个chunkserver的clone数目。chunkserver会限制从源chunkserver的读请求来实现限速。 3. 平衡磁盘空间和负载。 * master定期进行rebalance,一般来说选择移除磁盘空闲空间低于平均值的。 * 新chunkserver会被逐渐的填满,而非立即将新chunk和写流量都直接打上去。

垃圾回收

当文件删除的时候,GFS不会立即回收物理存储。GFS会在定期的gc期间,在文件和chunk级别进行回收。

机制

当文件被删除的时候, * master立即记录删除日志,并将文件重命名到一个包含删除时间的隐藏名字。 * master定期扫描文件系统的namespace时,删除存在3天以上(可配置)的隐藏文件,这有效的切断了与chunks的连接。 * master定期扫描chunk namespace,识别出孤儿chunk,并清除它们的metadata。 * chunkserver与master的心跳包汇报了有哪些chunks,master告知哪些是已经不在metadata中的,可以自由删除。

这个删除机制相比立即删除,有很多好处, * 在机器故障很常见的大规模分布式系统中,这个机制简单可靠。 chunk的创建可能只在部分chunkserver上成功;副本删除消息可能丢失,master必须重发。 * 将存储回收合并到了mater常规的后台活动里。 可以批量完成,开销被均摊了。 * 仅在master相对空闲的时候完成。 * 延迟回收提供了应对意外、不可逆删除的保障。

最大的缺点是,延迟机制妨碍了用户在存储紧张的时候,对空间使用的调优。 * GFS通过加快对再次明确删除已删除的文件的回收来解决这一问题。 * GFS还允许对namespace的不同部分设置不同的复制和回收机制。

过期副本检测(Stale Replica Detection)

副本在chunkserver失败且错过修改的时候会过期。

  • master维护了一个chunk version number来区分最新和过期的副本。在授权租约前,master会增加chunk version number,并通知所有最新的副本。master和所有副本都会将版本号进行持久化存储。
  • 失败的chunkserver在重启后,master通过心跳包可得知副本过期。
  • 反过来,如果chunkserver的版本号高于master记录的,master会假设在授权的时候自己失败了,并把更高的版本号作为最新的。
  • 过期的副本使用gc进移除。
  • mater把primary返回给client的时候,以及通知chunkserver进行clone副本的时候,会带上版本号,二者会检查版本号以保证访问到最新的数据。

容错和诊断

高可用

GFS通过两个简单却有效的策略来实现系统高可用:快速恢复、副本。 * 快速恢复 * master和chunkserver可快速启动。 * 不区分正常和非正常终止。 * chunk副本 * 除副本外,parityerasure coding对于只读的场景,也是有用的。 * master副本 * 为了master状态的可靠性,操作日志和checkpoint会被复制到多个机器上。只有flush到本地和所有远程机器上的修改,才认为是已经提交了的。 * 提交与写入数据的顺序是什么,在写入数据完成前还是后? * 如果在log复制到多个机器前,master挂了?如果未复制log无法恢复? * shadow master * 提供了文件系统的只读访问,因为log可能落后于master;不可写,会导致脑裂。 * 与primary相同的顺序apply操作日志。 * 启动时定位chunk副本的位置(启动后就不会很频繁),并定期与chunkserver通信监控它们的状态。 * 仅当primary更新副本位置时,需要依赖master。 * 可被晋升为master。 * 主从切换是如何进行的?

数据完整性

  • chunkserver使用checksum来检查数据损坏。
    • 直接对比两个chunkserver上的副本是不不现实的。
    • 两个不同的副本可能是合法的(详见Atomic Record Appends)。
    • 每个chunkserver必须独立维护checksum来维护副本完整性。

chunk结构: * 细分为64KB block。 * 每个block都有32bit checksum。 * 保留在内存中,且与log一起持久化存储,和用户数据分离。

对于读操作: * 在返回给client或chunkserver前,会检查与读取范围重叠的部分。因此损坏的数据不会传播开。 * 如果checksum不匹配,返回错误给请求方,并通知master。请求方会读取其他副本,master会从其他副本clnoe数据。新副本到位后,通知chunkserver删除错误的副本。

checksum对(读和追加)性能的影响较小: 1. 读 * 绝大多数读只跨越少数几个block,因此只需要对少量数据进行校验checksum。 * GFS client在读取是对尝试对齐checksum block的边界。 * checksum的查找和比较不需要做I/O,checksum的计算可以和I/O同时进行。 2. 追加 * 只需要增量的更新最后一个block的last partial checksum,并计算后续新block的checksum。 * 计算last partial checksum block已经损坏,现在无法检测到它,新的checksum也将与存储的数据不匹配,并且在下次读取时将像往常一样检测到损坏。

> 现在无法检测到它(we fail to detect it now),这具体指的是?

但对于覆盖现有chunk某个range的写操作,必须先读取和检查被覆盖range的首个和最后一个block(为了避免新的checksums可能隐藏存在于未被覆盖区域内的数据损坏),然后写入,最后计算和存储新的checksum。

诊断工具

大规模和细致的诊断日志可以极大帮助问题隔离、debug和性能分析,且只有很小的开销。

思考问题

以下问题部分是我自己提出,部分来自Distributed-Systems/Lec03_GFS/Question.md * 为什么存储三个副本?而不是两个或者四个? * 为什么不适用RAID? 重点是整机的容错,而非存储设备的容错。 * chunk的大小为何选择64MB?这个选择主要基于哪些考虑? * 减少了client与master交互的需求。 * 由于chunk较大,client也较为可能在一个给定的chunk上进行很多的操作。 * 减少了存储在master上metadata的大小。 * master的checkpoint与application checkpointing的区别是什么? * 论文提到append机制可以用于multiple-producer/single-consumer queues,这个具体是如何实现的? * GFS主要支持追加(append)、改写(overwrite)操作比较少。为什么这样设计?如何基于一个仅支持追加操作的文件系统构建分布式表格系统Bigtable? * 为什么要将数据流和控制流分开?如果不分开,如何实现追加流程? * 最大化利用每个机器的网络带宽 * 避免网络瓶颈和高延迟的链路 * 最小化推送所有数据的延迟 * GFS有时会出现重复记录或者补零记录(padding),为什么? * 租约(lease)是什么?在GFS起什么作用?它与心跳(heartbeat)有何区别? * GFS追加操作过程中如果备副本(secondary)出现故障,如何处理?如果主副本(primary)出现故障,如何处理? * GFS master需要存储哪些信息?master数据结构如何设计? * 假设服务一千万个文件,每个文件1GB,master中存储的元数据大概占用多少内存? * master如何实现高可用性? * 负载的影响因素有哪些?如何计算一台机器的负载值? * master新建chunk时如何选择chunkserver?如果新机器上线,负载值特别低,如何避免其他chunkserver同时往这台机器迁移chunk? * 如果某台chunkserver报废,GFS如何处理? * 如果chunkserver下线后过一会重新上线,GFS如何处理? * 如何实现分布式文件系统的快照操作? * chunkserver数据结构如何设计? * 磁盘可能出现“位翻转”错误,chunkserver如何应对? * chunkserver重启后可能有一些过期的chunk,master如何能够发现?

Lectures

什么是一致性?

  • 正确性条件
  • 重要但是当存在数据副本的时候难以实现,尤其是应用并发访问时
  • 弱一致性 read()可能返回过期的数据。
  • 强一致性 read()使用返回最近write()的数据。

“理想”的一致性模型

  • 一个存在副本的FS行为和不存在副本的FS一样。
  • 读可以观察到最近的写。

实现“理想”的一致性模型的挑战

  • 并发
  • 机器失败:任何操作都可能失败
  • 网络分区:每个机器/磁盘不是总能访问到的
  • 为什么这些挑战难以克服?
    • 需要c/s间的通信:可能影响性能
    • 复杂的协议:难以正确的实现
    • 不少系统没有提供理想的一致性模型

GFS是否实现了“理想”的一致性模型

  • 对于目录:实现了
    • 强一致性,只有一个副本
    • 不是高可用,可扩展性受限
  • 对于文件:不总是

总结

  • GFS优秀的地方:
    • 顺序读写性能高
    • 追加
    • 吞吐量大
    • 数据容错
  • GFS不好的地方:
    • master容错
    • 小文件(master是瓶颈)
    • client可能看到过期的数据
    • 追加可能重复

Lab

  • Lecture 3没有关于GFS的实验,不过我找到了ppca-gfs,看介绍是上交ACM班一个课程的作业,就用这个来补上GFS的实验吧。
  • lab的代码在github.com/chaomai/mit-6.824

References

  1. Case Study GFS: Evolution on Fast-forward
  2. Google's Colossus Makes Search Real-Time By Dumping MapReduce
  3. 大规模分布式存储系统:原理解析与架构实战

介绍

YARN(Yet Another Resource Negotiator)是Hadoop的资源管理系统。

YARN把资源管理和任务的调度/监控拆分到了独立的进程,即ResourceManager(RM)和每个程序的ApplicationMaster,一个程序要么是一个单独的job或者是由DAG表示的多个job。

RM和NodeManager(NM)构成了数据计算的框架。RM拥有最大的权利来决断系统中每个程序所需的资源。NM是每个机器上的一个代理,负责container的管理,监控它们资源使用(cpu、内存、磁盘、网络),并汇报给RM。

每个程序的ApplicationMaster是框架的特定库,负责与RM协商资源,并与NMs一起工作来执行和监控任务。

RM有两个组成部分:Scheduler和ApplicationsManager(AM)。

Scheduler,在容量和队列的限制下,负责为各种程序分配资源。Scheduler只负责调度,不负责监控和跟踪程序的状态;也不为由于程序错误或硬件错误导致的任务失败提供重启保证。Scheduler基于程序的资源需求来执行调度功能;进一步说,是基于对资源的抽象,即Container(cpu、内存、磁盘、网络)来进行调度的。

Scheduler有一个可插拔策略,负责在各种队列和程序之间对集群资源进行划分。例如当前的scheduler有CapacityScheduler和FairScheduler。

AM负责接收job的提交、协商第一个container来运行程序的ApplicationMaster,以及为出错的ApplicationMaster container提供重启服务。每个程序的ApplicationMaster负责与Scheduler协商资源适当的的container,并追踪它们的状态和监控进度。

通过ReservationSystem,YARN还支持资源预定。

YARN应用的运行

资源请求

YARN的资源请求模型会考虑, * 每个容器需要的资源。 * 局部性(主要指数据的局部性)。 例如如果使用了HDFS的数据,会优先使用存放副本的结点,其次是存有这些副本的机架,最后才是集群的任意结点。

YARN应用可以在任意时刻提出资源的申请, * 在一开始就申请所有的资源。 * 以动态的方式,在需要更多资源的时候提出。

应用生命周期

按照应用的类型,应用的生命周期会有较大差异,主要分为以下3个模型, 1. 一个应用对应一个用户的job,例如MR任务。 2. 一个应用对应一个工作流或用户jobs的session,container可以在job之间复用,并cache数据,例如Spark。 3. 一个长期运行的应用被多个用户共享。这样的应用一般作为协调者的角色存在。

YARN优势

  • 可扩展性(Scalability) 每个应用都有一个专门的application master,分离了资源调度和task管理。就MR任务而言,这模型与Google MapReduce论文中所述的模型更加接近,即,一个master协调worker上的map和reduce任务。
  • 可用性(Availability) 拆分RM和application master简化了高可用的实现。先为RM提供高可用,再为YARN应用提供高可用。
  • 利用率(Utilization) 相比MapReduce 1,精细化了资源的管理,应用可以按需请求资源。
  • 多租户(Multitenancy) YARN支持除MapReduce外的其他分布式计算框架。

调度

YARN有3中调度器:FIFO调度器、容量调度器和公平调度器。

关于container

vcore是一个host的cpu核心占用比例。

container是, * cpu(vcore)、内存、磁盘、网络的抽象。 * 在有task或ApplicationMaster运行的时候,表示一个已分配的资源。 * 不同于docker中的container概念。

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class ContainerLaunchContext {
// ...
/**
* Add the list of <em>commands</em> for launching the container. All
* pre-existing List entries are cleared before adding the new List
* @param commands the list of <em>commands</em> for launching the container
*/
@Public
@Stable
public abstract void setCommands(List<String> commands);
// ...
}

FIFO调度器(FIFO Scheduler)

按提交的顺序运行应用,首先为第一个应用分配资源,如果可以满足,再依次为其他应用服务。

容量调度器(Capacity Scheduler)

为每个组织分配一个专门的队列,每个队列可配置为使用一定量的集群资源,队列可以再进行划分。同一个队列内使用FIFO策略进行调度。

关于资源的使用, * 队列中单个任务使用的资源不会超过队列的容量。 * 如果队列满,且集群有空闲的资源,调度器可以把资源分配给此队列(可配置),弹性队列。 * 正常情况下,容量调度器不会抢占容器,因此如果一个队列随着使用,资源不够时,只能等待其他队列释放资源。 容量调度器也可以执行work-preserving preemption,RM会请求应用返回容器。

公平调度器(Fair Scheduler)

  • 每个队列有权重元素,用于fair share的计算。
  • 默认队列和动态创建的队列,权重为1(默认队列的可配置)。
  • 调度器会使用最小资源数量来进行资源分配进行优先排序。如果两个队列的资源都低于fair share额度,那么远低于最小资源数量的队列,会被有限分配资源。

队列放置

公平调度器使用一个规则的系统来判断应用所属队列。

饥饿和抢占

FairShare的计算会被用于判断饥饿以及是否进行抢占。在计算FairShare时,有两种: * Steady FairShare,按照配置文件中所有queue的weight,计算出的。 * Instantaneous FairShare,,按照配置文件中所有queue的weight,仅对包含活动应用程序的queue计算出的。

在配置yarn.scheduler.fair.preemptionyarn.scheduler.fair.preemption.cluster-utilization-threshold后,抢占会启用。

饥饿有两种: * FairShare Starvation 判定条件为: 1. 未获得所要求的资源。 2. 应用程序资源使用低于Instantaneous FairShare。 3. 应用程序的资源使用低于fairSharePreemptionThreshold,并持续fairSharePreemptionTimeout。

要注意的是,在同一个队列里面,如果存在多个应用程序,它们会平均的分摊Instantaneous FairShare。因此可能存在队列整体不是饥饿状态,但是每个应用程序是。
  • MinShare Starvation 判定条件为:
    1. 未获得所要求的资源。
    2. 应用程序资源使用低于MinShare。
    3. 应用程序的资源使用低于MinShare,并持续MinSharePreemptionTimeout。

决定需要进行抢占的时候,可能在多个队列中都有可抢占的container,决定container是否可以被抢占,需要满足: * 所在队列是可抢占的。 * 杀死container以后不会导致应用程序的资源低于Instantaneous FairShare。

启用抢占并不能保证队列或应用程序能够获得所有的Instantaneous FairShare。只能最终保证脱离饥饿的状态,即获得fairSharePreemptionThreshold份额的资源。

FairShare Starvation、MinShare Starvation以及抢占的关系如下:

Best Practice

  • 一般不建议配置MinShare Starvation或minimum resources。 增加复杂性的同时,并不能带来多少好处。
  • 如果配置minimum resources,所有minimum resources的加和不能超出总的资源数。

延迟调度

局部性是YARN调度时优先考虑的,但如果发现所请求的节点资源不够,那么任务可能就会被调度到其他节点上了。此时如果等待几秒,能够增加在所请求节点上分配到container的机会。

References

  1. Apache Hadoop YARN
  2. Untangling Apache Hadoop YARN
  3. YARN FairScheduler Preemption Deep Dive
  4. Hadoop - The Definitive Guide

Python Concurrency From the Ground Up,来自捕蛇者说的推荐,是David Beazley在PyCon 2015上的talk。在这个talk中,他边讲边写、外加开点玩笑,可以说David在各种意义上,都是并发的专家,很值得一看。视频和代码如下:

本文记录了这个talk的主要内容,并加上了我自己的理解。

这个talk从零实现了一个能支持多客户端并发访问的server,server计算了菲波那切数列第n项的值,为了展示blocking调用,用的是普通的递归实现fib(n) = fib(n-1) + fib(n-2)。同时还写了两个简单的client来测试server性能:perf1.py无限循环fib(30),并输出每次调用的时间;perf2.py无限循环fib(1),测试ops,这个调用是立即返回的。

版本1:简单单线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def fib_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
client, addr = sock.accept()
print("Connection", addr)
fib_handler(client)

def fib_handler(client):
while True:
req = client.recv(100)
if not req:
break
n = int(req)
result = fib(n)
resp = str(result).encode('ascii') + b'\n'
client.send(resp)
print("Closed")

这个实现的问题是,server无法同时响应多个client。当某个client连接上后,fib_handler(client)会执行到这个client断开为止。

版本2:多线程

在版本1的基础上,

1
2
3
4
def fib_server(address):
# ...
print("Connection", addr)
Thread(target=fib_handler, args=(client,), daemon=True).start()

此时server可以响应多个client。但是由于GIL的存在,python是无法利用多个cpu核心的,因此,

  1. perf1.py的结果(每次调用的时间)会随着perf1.py实例的增加而增加。同一时刻只能响应一个client,其他的等待,因此每次调用的时间大概是单个client调用时间的均值 * perf1.py实例个数
  2. perf2.py的结果(ops)会受其他调用的影响,ops会下降,n越大,下降越多。David这里提到了,GIL的一个特性是会把优先级给到计算更加密集的任务上,而os的调度却不会受这个影响,运行时间短的任务优先级更高。

python的每个线程实际上都有os实际的线程与其对应,用ps -o cmd,nlwp <pid>可看,但为何如此调度与其实现有关。对于os,Linux 2.6.23开始采用的是Completely Fair Scheduler;FreeBSD和macOS采用的是Multilevel feedback queue的调度算法,这也就解释了上述为什么运行时间较短的任务优先级更高。因为总能在规定的时间片内运行完成,不会被调度到后面的队列。

版本3:多线程+进程池

在版本2的基础上,

1
2
3
4
5
6
from concurrent.futures import ProcessPoolExecutor as Pool

def fib_handler(client):
# ...
future = pool.submit(fib, n)
result = future.result()

由于需要与子进程通信,需要序列化和反序列化数据,引入了额外的开销,因此perf2.py的ops会下降;但与此同时,server端处理计算任务是在单独的进程中,相当于计算任务的调度是由os来完成了,结合版本2:多线程中对os调度的解释,基本不受其他计算更加密集的任务影响。

版本4:事件循环和协程

回看前三个版本的server,使用线程,本质上是为了解决blocking。而blocking主要发生在等待io的时候,可以考虑只有当io ready的时候才去处理socket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
tasks = deque()
recv_wait = { } # Mapping sockets -> tasks (generators)
send_wait = { }

def run():
while any([tasks, recv_wait, send_wait]):
while not tasks:
# No active tasks to run
# wait for I/O
can_recv, can_send, _ = select(recv_wait, send_wait, [])
for s in can_recv:
tasks.append(recv_wait.pop(s))
for s in can_send:
tasks.append(send_wait.pop(s))

task = tasks.popleft()
try:
why, what = next(task) # Run to the yield
if why == 'recv':
# Must go wait somewhere
recv_wait[what] = task
elif why == 'send':
send_wait[what] = task

else:
raise RuntimeError("ARG!")
except StopIteration:
print("task done")

class AsyncSocket(object):
def __init__(self, sock):
self.sock = sock
def recv(self, maxsize):
yield 'recv', self.sock
return self.sock.recv(maxsize)
def send(self, data):
yield 'send', self.sock
return self.sock.send(data)
def accept(self):
yield 'recv', self.sock
client, addr = self.sock.accept()
return AsyncSocket(client), addr
def __getattr__(self, name):
return getattr(self.sock, name)

def fib_server(address):
sock = AsyncSocket(socket(AF_INET, SOCK_STREAM))
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
client, addr = yield from sock.accept() # blocking
print("Connection", addr)
tasks.append(fib_handler(client))

def fib_handler(client):
while True:
req = yield from client.recv(100) # blocking
if not req:
break
n = int(req)
result = fib(n)
resp = str(result).encode('ascii') + b'\n'
yield from client.send(resp) # blocking
print("Closed")

tasks.append(fib_server(('',25000)))
run()

这个版本的核心在def run(),利用yield实现了协程。当遇到io时,yield跳出当前执行,select判断io ready后,才去读写socket。要注意的是,这个版本虽然没有使用多线程,但server是可以服务多个client的,因为在某个client的socket没有ready的时候,server可以做其他的事情。不过由于是单线程,对于所有client提交的计算任务,server只能逐一执行。协程并不能帮助解决多线程中GIL的问题,因为并没有利用到多个cpu核心,版本2:多线程的两个性能问题,这里都存在。

我觉得这里的yield和事件循环是用的很出彩,换做是我,我首先考虑到的是select出ready的socket,然后进行读写。弊端在于需要把业务逻辑套在一个大循环里面,每次都先调用select,在不同的socket ready的时候,使用相应的业务逻辑进行处理。

版本5:事件循环和协程+多进程

在版本4的基础上,

1
2
3
4
def fib_handler(client):
# ...
future = pool.submit(fib, n)
result = future.result() # Blocks

考虑到上一个版本无法利用多个cpu核心进行计算,那么如果像版本3:多线程+进程池一样把fib放入pool中,是否能解决问题呢?放入以后,会发现当某个协程执行到'future.result()'的时候就会阻塞,直到pool中的任务计算完毕,相当于server主线程会逐一等待每个计算任务。版本2:多线程的两个问题,仍然存在。

版本6:事件循环和协程+多进程

在版本5的基础上,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
future_wait = { }

future_notify, future_event = socketpair()

def future_done(future):
tasks.append(future_wait.pop(future))
future_notify.send(b'x')

def future_monitor():
while True:
yield 'recv', future_event
future_event.recv(100)

tasks.append(future_monitor())

def run():
# ...
task = tasks.popleft()
try:
why, what = next(task) # Run to the yield
if why == 'recv':
# Must go wait somewhere
recv_wait[what] = task
elif why == 'send':
send_wait[what] = task
elif why == 'future':
future_wait[what] = task
what.add_done_callback(future_done)

# ...

def fib_handler(client):
# ...
future = pool.submit(fib, n)
yield 'future', future
result = future.result() # Blocks

最后这个实现首先将result = fib(n),放入了pool并得到一个futureyeild之后为这个future添加计算完成后的回调future_done。这个pool可以是线程池(无法利用多个cpu),也可以是进程池。比较hacking的地方是用socketpair把计算ready转变了socket ready。

总结

除去实现中用到的一些技巧,这个talk把GIL和blocking的影响、要用什么样的方式来绕开这些问题,以及事件循环和协程讲的很明白。

P.S. 小插曲,现场写的时候,David把

1
can_recv, can_send, _ = select(recv_wait, send_wait, [])

写成了

1
can_recv, can_send, [] = select(recv_wait, send_wait, [])

最后有个提问者表示,the empty listing was just some incredible next-level thing that I was just not capable of

至于为什么unpack到空list,原因如下,

1
2
3
4
5
# 首先unpack到一个变量的list是可以的
>>> a, b, [x, y] = 1, 2, [10, 20] # a=1, b=2, x=10, y=20

# 如果对一个空list进行unpack,由于没有东西可以unpack,所以可以解到另一个空的list里面
>>> a, b, [] = 1, 2, [] # a=1, b=2

一直很想拍到繁星点点的夜空,毕竟自己拍到和看网上别人拍的照片是完全不一样的体验。

下面两张图片,分别拍摄于过年时的云南昆明和五一的内蒙包头,用lr进行了简单的调整。

云南昆明,2019-02-08 23:57

内蒙包头,2019-05-02 23:49

两张的拍摄参数类似,受限于光圈不大,并且为了画面相对纯净调低了iso,因此为了得到足够多的进光量,就拉长了曝光时间,其实可以考虑拍摄多张进行叠加的(懒...。拍摄的时候还缺少三脚架,都是随手找几块砖头堆起来。

看来为了以后的户外拍摄,又得剁手了。

文档和资源

要注意的点

Packages

每个go程序都由package构成。

Exported name

在一个包中,如果一个name是以大写字母开头的,那么这个name将会被从这个包中导出。

当import一个包的时候,只能引用被导出的name。

函数

参数

参数名称在前,类型在后,Go's Declaration Syntax

当多个连续的函数参数有共同的类型是,可以省略不写除最后一个外的类型。

1
2
3
func add(s string, x, y int) int {
return x + y
}

返回值

一个函数可以返回任意个数的结果。

返回值可以是有名字的。当return不带任何参数时,函数返回named return values,这叫做"naked" return。

1
2
3
4
5
func split(sum int) (x, y int) {
x = sum * 4 / 9
y = sum - x
return
}

变量

var声明多个变量时,只能写最后一个的类型。

1
2
3
4
var c, python bool

// error
// var c int, python bool

如果声明时给出了初始值,那么会以初始值的类型作为变量的类型,此时声明中的类型可省略。

在函数内部可以用短变量声明:=来声明变量,类型由值的类型来决定。但在函数外部,由于所有语句都需要以关键字开头,因此不可用这个方法。

初始化 在声明变量的时候,变量的值总是会被初始化,要么是用指定的值,要么是零值(变量类型的默认值)。

基本数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bool

string

int int8 int16 int32 int64
uint uint8 uint16 uint32 uint64 uintptr

byte // alias for uint8

rune // alias for int32
// represents a Unicode code point

float32 float64

complex64 complex128

声明变量但不显式给出初始值,变量会被赋予零值。数值类型:0,bool类型:false,string:""

类型转换和推断

在进行类型转换时,go只能使用显式类型转换。

使用:=var =声明变量、未指明类型、但给出初始值时,变量的类型由对初始值进行类型推断得到。如果右侧是数值常量,那么变量的类型可能是intfloat64complex128

常量

只可以用const来声明。数值常量可表示任意精度,且不会溢出。一个未指定类型的常量由上下文来决定其类型。

全局变量

在程序运行期间,始终存在。声明和初始化方式与普通变量相同,需要在函数外部声明。

语句

for

1
2
3
4
5
6
7
8
9
10
11
12
// 对比c语言,括号可选、大括号必须
// 不可以使用var的方式声明
for i := 0; i < 10; i++ { ... }

// 初始化语句和循环的每次更新可省略
for ; sum < 1000; { ... }

// 可当做while使用
for sum < 1000 { ... }

// 无限循环
for { ... }

if

1
2
3
4
5
6
7
8
9
10
11
12
// 对比c语言,括号可选、大括号必须
if x < 0 { ... }

// 可以使用短语句,在判断之前执行,语句中声明的变量仅在if和后续的else语句块中可用
if v := math.Pow(x, n); v < lim { ... }

// else不能换行写
if v := math.Pow(x, n); v < lim {
...
} else {
...
}

switch

求值顺序,按case的顺序,自上向下进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 对比c语言,
// break可选,不写时自动提供
// case不必是integer

switch os := runtime.GOOS; os {
case "darwin":
fmt.Println("OS X.")
case ...
...

// 不带条件的switch相当于switch true
switch {
case t.Hour() < 12:
fmt.Println("Good morning!")
case ...
...

defer

使用defer时,被defer的函数会被push到一个stack,参数会立即计算,但函数结束时,stack中的函数才会被pop出来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func t(s string) string {
fmt.Printf("in func t: %s\n", s)
return s
}

func main() {
{
defer fmt.Println(t("you"))
fmt.Println("hello")
}
}

// output:
// in func t: you
// hello
// you

defer的函数可以读取和赋值到函数的返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
func c() (i int) {
defer func() { i++ }()
return 1
}

// c返回2

func c1() (int) {
var i int
defer func() { i++ }()
return 1
}
// c1返回1

defer、panic和recover 当调用panic时,所有defer的函数都被正常执行。然后函数返回到调用者。

recover仅在defer的函数中有用,正常执行时调用,只会返回nil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
f()
fmt.Println("Returned normally from f.")
}

func f() {
// g发生panic后,这个deferred的函数会执行,并捕获panic
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered in f", r)
}
}()
fmt.Println("Calling g.")
g(0)
fmt.Println("Returned normally from g.")
}

func g(i int) {
if i > 3 {
fmt.Println("Panicking!")
panic(fmt.Sprintf("%v", i))
}
defer fmt.Println("Defer in g", i)
fmt.Println("Printing in g", i)
g(i + 1)
}

其他类型

关于slice,map和channel,某些书中会将它们描述为引用,但从实现上看(例如:slicemapchan),这些类型不过只是封装了底层指针的struct,且go spec也早就在文档中移除了reference一词的使用,而在THE WAY TO GO一书中虽然使用了reference一词,但也明确指出,

A reference type variable r1 contains the address (a number) of the memory location where the value of r1 is stored. ... When assigning r2 = r1, only the reference (the address) is copied. ... In Go pointers (see § 4.9) are reference types, as well as slices (ch 7), maps (ch 8) and channels (ch 13). ......

指针

存储了内存地址,零值为nil&获得变量的地址,*解引用。

对比c的指针,go的指针无法进行算数运算。

1
2
3
4
var p *int

i := 42
p = &i

指针的类型转换 unsafe.Pointertype Pointer int,代表了变量的内存地址,可以将任意变量的内存地址与Pointer指针相互转换。 uintptrtype uintptr intPointer无法进行加减运算,需要转换为uintptr才可以,可以将Pointeruintptr指针相互转换。 unsafe.Offsetof:可以得到字段在结构体内的偏移量。

1
2
3
4
5
6
7
8
9
10
11
12
13
*T <=> unsafe.Pointer <=> uintptr

type Vertex struct {
X int
Y int
}

var v = Vertex {50, 50}
// *Vertex => Pointer => *int => int
var x = *(*int)(unsafe.Pointer(&v))
// *Vertex => Pointer => uintptr => Pointer => *int => int
var y = *(*int)(unsafe.Pointer(uintptr(unsafe.Pointer(&v)) + uintptr(8)))
var y = *(*int)(unsafe.Pointer(uintptr(unsafe.Pointer(&v)) + unsafe.Offsetof(v.Y))

Struct

字段的集合,使用.来访问字段。首字母大写和小写分别代表公开和私有。私有变量只有同一个package才可以访问。

对于struct指针,可以使用(*p).X或直接使用p.X来进行访问。对比c,go不能用->来访问成员。

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Vertex struct {
X int
Y int
}

// 使用多行的形式指定一个或多个字段的值的时候,最后的逗号不可以省略。如果只是一行,最后的逗号的可选
// 如果没有指定字段值,那么会使用相应类型默认的零值进初始化。
var v1 = Vertex {
X: 1,
Y: 2,
}
var v2 = Vertex {
X: 1,
}
var v3 = Vertex { X: 1 }

// 零值结构体实际分配了结构体的内存空间
var v4 = Vertex {}
var v5 = Vertex { 1, 2 }
var v6 *Vertex = &Vertex { 1,2 }
var v7 *Vertex = new(Vertex)
var v8 Vertex

// nil结构体不会分配结构体内存
var v9 *Vertex = nil

copy * 结构体之间的copy是深拷贝,不共享结构体内部字段。 * 结构体指针的copy是浅拷贝,共享内部字段。

组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Vertex struct {
X int
Y int
}

type Circle1 struct {
v Vertex
Radius int
}

// 匿名组合,此时外部的结构体在使用时,可以直接使用内部结构体的成员和方法。如果内部和外部存在相同名字的方法,会调用外部结构体的方法。
type Circle2 struct {
Vertex
Radius int
}

Array

和c一样,数组的大小也是数组类型的一部分,声明数组时必须有大小,通过下标访问数组中的元素。

程序执行时,go会检查访问是否越界。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var a[10] int

var a = [6] int{2, 3, 5, 7, 11, 13}
var b[6] int = [6] int{2, 3, 5, 7, 11, 13}
c := [6] int{2, 3, 5, 7, 11, 13}
d := [...] int{2, 3, 5} // 自动推断长度
e := [5] int{1: 10, 2: 20} // len(e) = 5, e[1] = 10, e[2] = 20

// 数组字面值
primes := [6]int{2, 3, 5, 7, 11, 13}

// 同类型(长度和元素类型)的数组可以相互赋值,会copy数组的内容
var e[6] int
e = a

内部存储 连续分配的内存区域。

copy 数组的类型由元素的类型和数组的大小决定,相同类型的数组之间才可以copy。拷贝一个数组,数组的内部的元素也会被逐一拷贝,因此作为变量传递时,需要注意copy的开销。

Slice

slice的类型为[]int,对数组进行, * a[low_index:high_index]后得到,区间是前闭后开,可以省略low_indexhigh_index,默认值分别为0和数组长度。 cap(a) = len(array) - low_index * a[low_index:high_index:cap_index]后得到,区间是“闭、开、开”。cap_index代表可用到的底层数组的最大index,必须小于len(array)cap(a) = cap_index - low_index

1
2
3
4
var a[10]
a[:4]
a[:]
b := []string {99: ""} // len(b) = 100, cap(b) = 100

内部存储

1
2
3
4
5
6
7
8
       +---------------+
slice: |pointer|len|cap|
+--+------------+
|
|
+--v--------------+
array: |item1|item2|... |
+-----------------+

例如:

1
2
3
4
5
6
s := []int{2, 3, 5, 7, 11, 13}

var s = []int{1,2,3,4,5,6,7,8,9,10}
var address = (**[10]int)(unsafe.Pointer(&s)) // 底层数组的地址
var len = (*int)(unsafe.Pointer(uintptr(unsafe.Pointer(&s)) + uintptr(8)))
var cap = (*int)(unsafe.Pointer(uintptr(unsafe.Pointer(&s)) + uintptr(16)))

slice和array slice本身并不存储任何数据,仅仅是数组选定区间的描述,和数组共享底层的数据。len()cap()对应了slice的长度,和底层数组从low起的大小,即:len(array) - low

对已有slice再做一次slice,实际上是改变slice对底层数组的引用范围。

1
2
3
4
5
6
7
s := []int{2, 3, 5, 7, 11, 13} // len(s)=6, cap(s)=6

// Slice the slice to give it zero length.
s = s[:0]

// Extend its length.
s = s[:4]

slice字面值类似数组的,区别是没有大小。底层实际上创建了相同大小的数组,然后再创建slice。

nil slice 一个nil slice,是未初始化的slice,lencap都为0,且不会分配底层的数组,数组指针为nil

1
var a []int

空slice 一个空slice的lencap都为0,且不会分配底层的数组,数组指针值不为空,但是也未分配底层数组。

1
2
a := make([]int, 0)
b := []int{}

当想声明一个空的slice时,nil slice和空slice都可以,两者在功能上完全等价,但是更推荐nil slice。但二者进行序列化的时候,结果会不同,nil slice会编码为null,而空slice是[]

make slice 通过make来创建动态长度的数组。

1
2
3
4
a := make([]int, 5)  // len(a)=5, cap(a)=5
b := make([]int, 0, 5) // len(b)=0, cap(b)=5
var c []int = make([]int, 5)
var d = make([]int, 5)

append append函数能够将相同类型元素追加至现有slice,若底层数组大小不够,则会重新分配内存,并将slice指向新数组。

如果发生了扩容,且有另一个slice存在,那么另一个slice的仍然指向老的数组。

扩容时,如果cap < 1024,那么会扩100%,否则扩25%。

1
func append(s []T, vs ...T) []T

range 除了普通方法遍历slice,还能使用range

1
2
3
4
5
6
7
8
9
// i是index,v是相应元素的copy。
for i, v := range s { ... }

// 可忽略i或v的赋值
for _, value := range s { ... }

for i, _ := range s { ... }
// 或
for i := range s { ... }

当使用range返回的值,v时,要注意的是range返回的是元素的copy,而不是引用,如果对齐进行&,那么得不到期望的结果。具体来说,Go会使用同一个变量,在每轮迭代中保存元素的copy。可以使用kyoh86/scopelint来检查代码中的unpinned variables。

  1. 取地址。

    1
    2
    3
    4
    5
    a := make([]int, 5)

    for i, v := range a {
    fmt.Println(&v, &a[i]) // v的地址保持不变,且不等于a中任意元素的地址
    }

  2. 这个原因还有可能导致使用goroutine时出现意外

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // 由于closure已经绑定到了val,又因为goroutine可能在for结束后才执行
    // 因此打印出的可能全都是values的最后一个值。
    for _, val := range values {
    go func() {
    fmt.Println(val)
    }()
    }

    // ok
    for _, val := range values {
    go func(val interface{}) {
    fmt.Println(val)
    }(val)
    }

    // ok
    for i := range valslice {
    val := valslice[i] // val在每次迭代中都会分配新的
    go func() {
    fmt.Println(val)
    }()
    }

copy slice的copy是浅拷贝,两个slice共享底层数组。本质上copy的是:指向底层数组的指针、lencap

1
2
a := []int{2, 3, 5, 7, 11, 13} // len(s)=6, cap(s)=6
var b = a

go还提供了一个函数copy来实现数组内容的copy,copy时,会以目的切片的容量为准。

1
func copy(dst, src []T) int

结合slice的内部存储、append和拷贝,有的使用场景不注意可能导致意料之外的结果。

1
2
3
4
5
6
7
8
9
10
11
12
func FuncSlice(s []int, t int) {
s[0]++
s = append(s, t)
s[0]++
}
func main() {
a := []int{0, 1, 2, 3}
FuncSlice(a, 4)
fmt.Println(a)
}

// [1 1 2 3]

具体过程分析如下: 1. s[0]++as都指向同一个底层数组arr1,此时a->arr1s->arr1,修改了arr1。 2. append:由于扩容,append返回了一个新的底层数组arr2a->arr1s->arr2。 3. s[0]++:修改了arr2arr1不变。

Map

一个仅做了声明的map是nil,需要使用make来进行初始化。切片、函数以及包含切片的结构类型这些类型由于具有引用语义,不能作为映射的key,使用这些类型会造成编译错误。

1
2
3
4
5
6
7
8
9
10
11
type Vertex struct {
Lat, Long float64
}

var m map[string]Vertex

fmt.Println(m == nil) // true
m = make(map[string]Vertex)
m["Bell Labs"] = Vertex{
40.68433, -74.39967,
}

map字面值和struct字面值类似。

1
2
3
4
5
6
7
8
var m = map[string]Vertex{
"Bell Labs": Vertex{ 40.68433, -74.39967,}
}

// 当最上层的类型只是类型名的时候,可以省略。
var m = map[string]Vertex{
"Bell Labs": { 40.68433, -74.39967,}, //最后的逗号不可缺少
}

mil map nil map未进行初始化,不能用于存储key-value。

1
var m map[string]Vertex

make map 通过make来创建map。

1
var m = make(map[string]Vertex, 10)

range 类似slice,且slice中存在的问题,map中也同样存在。由于无法获取index,因此只能通过每轮迭代创建变量来解决。

1
2
3
4
5
6
7
8
9
// k是key,v是相应元素的copy。
for k, v := range m { ... }

// 可忽略k或v的赋值
for _, v := range s { ... }

for k, _ := range s { ... }
// 或
for i := range s { ... }

操作 * 新增和更新:m[key] = elem * 访问key的值:elem = m[key],如果不存在,那么elem为此类型的零值,但如果值真的是零值,那通过这个方法来判断key是否存在就失效了。 * 删除:delete(m, key) * test:var elem, ok = m[key],如果不存在,那么elem为此类型的零值。

字符和字符串

rune

rune字面值代表一个rune常量,是一个标识Unicode code point的整型值。alias for int32

rune字面值可以用'单个字符'来表示,可以用\转义的多个字符来表示,具体见Rune literals

string

string字面值代表了包含一系列字符的string常量,只读。有两种形式:raw string字面值和interpreted string字面值。

  • raw string字面值,是经过未转义处理的,在raw string内部,可以出现任意字符。string中出现的'\r'会被忽略。
  • interpreted string字面值,go会进行转义处理。具体见String literals
1
2
3
fmt.Println("\U000065e5\U0000672c\U00008a9e")    \\ 日本語

fmt.Println(`\U000065e5\U0000672c\U00008a9e`) \\ \U000065e5\U0000672c\U00008a9e

内部存储

1
2
3
4
5
6
7
8
9
10
11
12
            +---------------+
byte slice: |pointer|len|cap|
+--+------------+
|
|
+--v------------+
array: |item1|item2|...|
+---^-----------+
|
+---+-----------+
string: |pointer|len|cap|
+---------------+

1
2
3
4
5
a := "Hello,你好"
fmt.Printf("%x\n", *(*[2]int)(unsafe.Pointer(&a))) \\ [4b9f3c e]

b := a
fmt.Printf("%x\n", *(*[2]int)(unsafe.Pointer(&b))) \\ [4b9f3c e]

由于底层存储是数组,因此可以做slice,但要注意的是,这里本质上是对字节来做slice,因此如果slice的Unicode code point不是一个完整的字符,那么打印的时候,是不会正确显示的。

1
2
3
a := "Hello,你好"
b := a[0:9]
fmt.Println(b) \\ Hello,�

从字符串得到字节slice或者从字节slice得到字符串,会发生底层数组的copy。如果想避免copy,可以手动一个string或slice,获得一个原始string或者slice的“reference”,这种方式不可以通过slice修改string,因为修改后,“reference”到的原有string失效了,可能会被gc回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
a := "Hello,World"
b := []byte(a) // copy
c := string(b) // copy

// 手动构造
func str2bytes(s string) []byte {
var strhead = *(*[2]int)(unsafe.Pointer(&s))
var slicehead [3]int
slicehead[0] = strhead[0]
slicehead[1] = strhead[1]
slicehead[2] = strhead[1]
return *(*[]byte)(unsafe.Pointer(&slicehead))
}

func bytes2str(bs []byte) string {
return *(*string)(unsafe.Pointer(&bs))
}

遍历 * 按字节遍历:通过下标。 * 按字符遍历:range方式遍历。

Function values

函数也是值,可作为参数传递,作为返回值返回。

闭包(closure)是function value引用了函数体外部的变量,函数可以访问和修改这些变量。换句话说,闭包包含了函数、以及所在的环境的上下文

方法和接口

方法集(method sets)和调用

方法集函数调用的规范明确了一个类型有哪些方法,以及在什么时候可以调用什么样的方法。go wiki上关于这两个概念有比较详细的例子

method set 1. 对于一个接口类型,接口是方法集。

  1. 对于一个类型T,所有receiver为T的方法是方法集。

    对于类型T对应的指针类型*T,所有receiver为T*T的方法是方法集。

Type method sets
interface type interface
T func (T) f()
*T func (*T) f(), func (T) f()

一个类型T的方法集决定了,这类型T的接口类型的实现,和使用T作为receiver时可以被调用的方法。

call 对于一个方法调用x.m(), 1. 如果x的method set包含m(),且调用时的参数列表合法,那么这个调用是合法的。

  1. 如果x可以取地址的,并且&x的method set包含m(),那么x.m()等价于(&x).m()

    map元素和interface存储的具体值不可取地址。

方法

方法是带有特殊receiver参数(func和函数名之间)的函数。这个receiver不必是struct,但要求receiver的类型定义必须在同一个package里面,且不能直接将内置类型作为receiver。

1
2
3
4
5
type Vertex struct { X, Y float64 }
func (v Vertex) Abs() float64 { ... }

type MyFloat float64
func (f MyFloat) Abs() float64 { ... }

Point receiver 若要修改字段,则必须使用point receiver,无论变量本身是否是指针类型非指针receiver调用时发生了copy

从这里可以得出使用point receiver的场景:1. 避免copy;2. 修改值本身。一般来说,某个类型的receiver应该统一,要么是point receive,要么是普通receiver。

1
2
func (v *Vertex) Scale(f float64) { v.X = v.X * f ... }
func (v Vertex) Scale2(f float64) { v.X = v.X * f ... }
  • 若v不是指针类型,那么go会把v.Scale自动转换为(&v).Scale
  • 反过来,若v是指针类型,在调用Scale2时,go会把v.Scale2转换为(*v).Scale2

对比c++的成员函数,this指针类似于point receiver,但go的普通receiver是不同于c++的。

接口

接口是方法签名的集合。

接口的实现是隐式的,无需类似implement的关键字。隐式实现解耦接口的定义和实现,在package中,接口的定义可以出现在方法和类型定义之后。注意方法的实现区分普通receiver和point receiver

一个接口值可以被赋值为任何实现了接口中所有方法的值。接口值底层实际包含了具体值的类型,接口值可以看做是值和具体类型的元组。调用接口值的方法,实际上会调用具体类型的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Abser interface { Abs() float64 }

type MyFloat float64
func (f MyFloat) Abs() float64 { ... }

type MyInt int64
func (f MyInt) Abs() int64 { ... }

type Vertex struct { X, Y float64 }
func (v *Vertex) Abs() float64 { ... }

var a Abser
f := MyFloat(1)
i := MyInt(2)
v := Vertex{3, 4}

a = f
a = i
a = &v

// a = v
// error,Vertex没有实现Abs(),*Vertex才实现了Abs()

对比c++的多态,c++中通过继承基类,并覆盖基类的虚函数,在运行时进行动态绑定,以此实现多态。go的接口方法定义可以看做是基类和虚函数,而a = f相当于将子类的指针赋值给基类指针,这样完成了动态绑定。

不同的点还是receiver,实现接口的方法时,go区分了point receiver和普通receiver。

内部存储 实现上,一个接口值底层包含了指向类型和数据的指针。

接口类型之间的赋值和类型转换是共享数据的,而结构体之间的赋值、结构体转接口、接口转结构体,都会导致数据的copy。

空的具体类型值 如果接口的具体类型值是空的,那么将会使用nil receiver来调用方法,不引发空指针异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Abser interface { Abs() float64 }

type Vertex struct { X, Y float64 }
func (v *Vertex) Abs() float64 {
if v == nil {
...
}
...
}

var i Abser
var v *Vertex
i = v

i.Abs()
// or
v.Abs()

空的接口值 会发生运行时错误,没有具体的Abs方法可以调用。

1
2
3
type Abser interface { Abs() float64 }
var i Abser
i.Abs()

空接口 空接口的值可以包含任何类型。

1
2
3
4
5
6
var i interface {}
i = 32
i = "hello"

// i = 100000000000000000000000000
// overflows int

接口变量的赋值 对于数值类型,底层的具体类型只能是intfloat64complex128

类型断言 类型断言提供了访问接口底层具体类型值的能力。

  • t := i.(T)断言接口i拥有具体类型T,并把类型T的值赋值给t。如果不是类型T,则触发panic。
  • test:t, ok := i.(T)断言不正确的情况,不触发panic,而是ok为false,且t为类型T的零值。
1
2
3
4
5
var i interface{} = 100000000000000000000000000 // overflow
a := i.(int64)

var i interface{} = 100000
a := i.(int64) // int

Type swtiches是允许断言多个类型的结构。类似switch语句,但是每个case是特定的类型。

1
2
3
4
5
6
7
8
9
10
// 如果test成功,那么v会转换为相应的类型。

switch v := i.(type) {
case T:
// here v has type T
case S:
// here v has type S
default:
// no match; here v has the same type as i
}

对比scala的pattern matching,go的type swtiches像,但不是pattern matching。scala的pattern matching会检查值和pattern是否匹配,能够把值解构为构成值的各部分。猜测go的type swtiches是类型字符串是否相等的test。

一些内置的接口

Stringer 类似python的__str__,定义在fmt中。

1
2
3
4
5
6
7
8
9
type Stringer interface {
String() string
}

type A ...
func (a A) String() string { return "hello" }

a := A()
fmt.Println(a) // hello

error 类似Stringer,fmt在print的时候也会查找error接口。从fmt的实现上看,是error优先

1
2
3
type error interface {
Error() string
}

error更适合用于专门定义的错误类型。否则功能上,stringererror就冗余了。

可以使用fmt.Errorferrors.New来创建error类型的值。

1
fmt.Errorf("math: square root of negative number %g", f)

Reader io包定义了io.Reader接口,代表读取stream,有多个实现(文件、网络等)。

其中func (T) Read(b []byte) (n int, err error)方法使用现有数据填充b,并返回填充的字节数和error。stream结束时,errorio.EOF

并发

Goroutines

由go运行时管理的轻量级线程。

1
go f(x, y, z)

参数的计算在当前goroutine中完成,函数f的调用发生在新的goroutine。所有子协程都是平级的关系(包括在子协程内部启动另一个协程)。

Channels

channel是带类型的管道(typed conduit),每次只能发送或接受一个元素。默认情况下,发送方和接收方会一直阻塞到另一方ready,每次只能唤醒一个发送或接受方。

方向

1
2
3
chan T // 可以发送和接受类型为T的数据
chan<- T // 可以发送类型为T的数据
<-chan T // 可以接受类型为T的数据

unbuffered channel unbuffered channel必须保证先有goroutine正在接收,否则发送方会一直阻塞到有goroutine来接收为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ch := make(chan int) // len(ch) == 0, cap(ch) == 0

/////////////////////

ch <- 1 // block
v := <-ch

/////////////////////

// 先有goroutine正在接收
go func() {
v := <-ch
fmt.Println(v)
}()

ch <- 1

buffered channel ch := make(chan int, 100),buffered channel在满或空的情况下,分别会导致发送方和接收方阻塞。

range for i := range ch可以从channel逐个接收值,直到channel被关闭。

close * 发送方可以通过close(ch)来告诉接收方没有后续的值会发送。如果向关闭的channel发送元素,那么会导致抛出异常。 * 接收方可以使用v, ok := <-ch判断channel是否被关闭。如果从一个已经关闭的channel接收元素,会返回channel类型的零值,因此是不能用这个方式来判断channel是否关闭的。

select select语句可以让goroutine等待多个通信操作(发送或接受都可以),block直到其中某个case能执行。如果同时有多个case能执行,则随机选择一个。

若存在default,则当没有case ready的时候,执行default,因此可以通过default实现非阻塞式的发送或接受。

1
2
3
4
5
6
select {
case i := <-c:
// use i
default:
// receiving from c would block
}

并发模式

内存模型

go内存模型

反射

构建

static build

go build启用race,也需要启用cgo。

测试

总结

抛开go的运行时环境和gc不说,go很像c,同时还有着少量函数式语言的特性。

go中,我很喜欢的几点是:

  1. 变量和函数的声明简洁清晰
  2. goroutine
  3. 提供了CSP来实现goroutine之间的通信