0%

symbol table

symbol table(符号表)是用于编译器或解释器中的一个数据结构,存储了源码中每个符号以及关联的信息。不同的作用域可能会有各自的符号表。在静态语言中,符号表尤为重要。

python variable

python中的变量与c/c++的不一样,是绑定到对象的symbolic name。aabb都是绑定到一个list对象的名字。

1
2
3
aa = [1, 2, 3]
bb = aa
aa[0] = 666

python symbol table

虽然python是动态语言,没有编译时类型检查,但python也有符号表。python的符号表通过编译器由AST生成,用于计算每个标识符的作用域,最终符号表和AST会被共同用于生成字节码。symtable模块提供了关于标识符的作用域等信息,还能够输出在这些作用域中引用到的变量是哪个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# add.py
def outer(aa):
def inner():
bb = 1
return aa + bb + cc
return inner

src = open('add.py', 'r').read()
import symtable
table = symtable.symtable(src, 'src.py', 'exec')

def describe_symbol(sym):
assert type(sym) == symtable.Symbol
print("Symbol:", sym.get_name())
for prop in [
'referenced', 'imported', 'parameter',
'global', 'declared_global', 'local',
'free', 'assigned', 'namespace']:
if getattr(sym, 'is_' + prop)():
print(' is', prop)

for i in table.get_children()[0].get_children()[0].get_symbols():
describe_symbol(i)

Symbol: bb
is referenced
is local
is assigned
Symbol: aa
is referenced
is free
Symbol: cc
is referenced
is global

free variable:如果变量在一个代码块内使用(在代码内被绑定),但是并没有在其中定义,那么这个变量是自由变量。顺便提一句,使用了自由变量的函数就是闭包(closure)。 global variable:如果变量代码在模块级别被绑定,这个变量是全局变量。 local variable:如果变量在代码块内被绑定,这个变量是局部变量。

python module

module(模块)是一个包含python定义和语句的文件,文件名由module名和.py后缀构成。module名可由全局变量__name__获取。这个文件可被作为脚本使用或用于repl中,module中的定义可以被其他module或mainmodule导入。

例如:p.py

1
2
def add(a, b):
return a + b

在repl中,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 把module名p加入当前的符号表
import p
p.add(1, 2)

Symbol: p
is imported
is local

# 把p中的add导入当前符号表
from p import add
add(1, 2)

Symbol: add
is imported
is local

# 把p中的除_开头的所有名字导入当前符号表
from p import *

一个module,可以包含可执行语句和函数定义,这些语句是用于初始化module的,仅仅在第一次导入这个模块的时候执行,作为脚本运行的时候也会执行。module作为脚本执行的时候,__name__会被设置为"__main__",很常见的一个语句是if __name__ == "__main__":,作为脚本执行的时候,运行if内的语句。

每个module有自己私有的符号表。module内部的全局变量不会与外部的产生冲突,另一个角度看,如果知道模块内的全局变量名称,可以通过modname.itemname的方式访问到。通过dir()函数可以得到module内定义的名字。

导入module时,module名字的search path为, 1. 内置的module 2. sys.path 这个变量的值由下列几项初始化得到, * 脚本当前目录 * PYTHONPATH环境变量 * the installation-dependent default

`sys.path`初始化以后,可以在程序中修改。

python package

package是一种通过modnameA.submodnameB来组织module的方式。python会把包含__init__.py文件的文件夹当做package。__init__.py文件可以直接留空,也可以加入初始化package的代码或设置__all__变量(用于控制from package imrpot *的时候,导入哪些)。

导入的方式为, * from package import item。import语句首先测试item是否在package中定义,如果没有,那么假设item是一个module并尝试load。最终如果没有找到,那么会抛出ImportError异常。 * import item.subitem.subsubitem,最后一个item可以是module或package,但不能是类、函数或变量,其他必须是package。

class和module

在使用的时候,class和module有一些相似的地方,但是二者完全不同。

  • class是创建带有属性和方法的实例的蓝图,支持继承等。这些module都不能做。
  • module仅仅只是组织代码的方式。

References

  1. Python internals: Symbol tables, part 1
  2. symtable — Access to the compiler’s symbol tables¶

Python进程间通信总结

Python的multiprocessing支持使用类似threading模块的API来创建进程,multiprocessing提供了本地和远程并发,有效的避免了线程中的GIL。下面对Python进程间通信的方法做了简单的总结,并列举了相应的例子。

消息传递

pipe

1
2
3
4
5
6
7
8
9
10
11
def worker(conn):
i = conn.recv()
print(i)
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
parent_conn.send(1)
p.join()

queue

multiprocessing.Queue使用pepe和locks/semaphores实现了进程间的共享队列。任何picklable对象都可以通过queue传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
def worker(q):
i = q.get()
print(i)

if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()

queue.put(1)
queue.close()
queue.join_thread()
p.join()

同步原语

Barrier

一种简单的同步原语,用于固定数目的进程相互等待。当所有进程都调用wait以后,所有进程会同时开始执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def worker(b):
i = b.wait()
print(i)
if i == 0:
print('passed the barrier')

if __name__ == '__main__':
barrier = multiprocessing.Barrier(2)
p = multiprocessing.Process(target=worker, args=(barrier,))
p.start()

q = multiprocessing.Process(target=worker, args=(barrier,))
q.start()

p.join()
p.join()

Semaphore

BoundedSemaphore

Condition

条件变量。多个进程可以等待同一个条件变量,直到他们被另一个进程通知。条件变量默认使用RLock,可以提供自己的锁,但必须是Lock或者RLock对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def producer(q, c):
c.acquire()
while len(q) == 0:
c.wait()

i = q.pop()
print(i)
c.release()

def consumer(q, c):
for i in range(10):
c.acquire()
q.append(i)
c.notify_all()
c.release()
time.sleep(1)

if __name__ == '__main__':
queue = []
cond = multiprocessing.Condition()
p1 = multiprocessing.Process(target=consumer, args=(queue, cond,))
p1.start()

p2 = multiprocessing.Process(target=consumer, args=(queue, cond,))
p2.start()

q = multiprocessing.Process(target=producer, args=(queue, cond,))
q.start()

p1.join()
p2.join()
q.join()

Event

Event提供了一种简单的方法来实现进程间的状态传递。event类似一个flag,状态是set或者unset。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def wait_for_event(e):
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())

def wait_for_event_timeout(e, t):
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())

if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
w1.start()

w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2))
w2.start()

print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')

Lock

状态共享

shared memory map

通过使用ValueArray,数据可以被存储在shared memory map中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]

if __name__ == '__main__':
num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))

p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr[:])

ValueArray在shared memory分配了ctypes对象。默认情况下,返回的值是使用同步方法封装过的对象,即线程安全的。其中,参数di代表了值的类型。

References

  1. Passing Messages to Processes
  2. multiprocessing — Process-based parallelism

Docker笔记

操作系统虚拟化

Operating-system-level virtualization, 又叫做容器化(containerization),指的是操作系统提供了这样一个功能,内核允许存在多个隔离的用户空间实例,每个实例叫做一个容器(container)。

Docker是一个容器化平台。

container vm
15226880999438 15226881122546
  • 容器是应用层上的抽象,容器之间共享操作系统内核。
  • vm是硬件抽象,每个vm不仅仅包含要允许的程序,还包括了一个完整的os。

基本概念

镜像(Image)

镜像是由一系列的只读层构成的,每层代表了对已有镜像的修改。相当于一个root文件系统。由一个sha256作为镜像ID唯一标识,一个镜像可以有多个标签

对已有镜像中文件的修改,都是以Copy-on-Write的方式进行,例如:删除文件,在新的一层中进行标记删除,原有层的文件不变。

scratch镜像

scratch镜像代表一个空白的镜像,这是一个虚拟的镜像,不实际存在,意味着基于这个构建的新镜像不使用任何已有的基础镜像。

1
2
3
FROM scratch
ADD ubuntu-xenial-core-cloudimg-amd64-root.tar.gz /
...

空悬镜像(Dangling image)

空悬镜像是没有tag,没有被其他镜像依赖的镜像。当创建一个新的镜像时,使用了一个已有的tag,那么老的镜像就会变为空悬镜像。可被删除。

1
2
docker image ls -f dangling=true
docker image prune

Docker build context

Docker是c/s架构的,实际的构建动作发生在server。当ADD文件到镜像中时,server需要知道这个文件,具体的方式都是通过打包构建时的上下文,并发送给server。

1
docker build [OPTIONS] PATH | URL | -
  • PATH,即上下文;
  • URL是一个压缩包,那么其内容会被当做上下文。

Untagged和Delete

镜像的多层结构使得层与层之间存在依赖,在进行删除操作时,需要考虑这些的依赖关系。当某层没有被依赖时,才可以被删除。

删除一个镜像时,实际上是删除镜像的标签(Untagged),当一个镜像的所有标签都被删除了,才会触发删除这个镜像(Delete)。

如果有容器存在,同样不可以删除这个镜像,因为这个镜像被一个可读写层依赖。

容器(Container)

容器是镜像运行的实体,本质上是一个进程,拥有自己的独立的命名空间。在存储上与镜像的区别在于多了最上面的一个读写层,这个层是容器的运行态环境,生命周期与容器一致,不建议在这一层写入数据。

15227153999553

仓库(Registry)

Docker仓库(Docker Registry)是存储和分发镜像的服务,包括多个Repository;每个Repository包含多个标签,通过<仓库名>:<标签>的方式来指定一个镜像。

数据管理

  • 数据卷(Volumes)

    数据卷独立于镜像和容器,可在多个容器之间共享,容器终止后,数据卷也会一直存在。

  • 挂载主机目录(Bind mounts)

网络管理

Docker的网络是通过可插拔的驱动来提供的。除了可以使用Docker已有的驱动,还能安装第三方网络驱动。

bridge

15261916795186

bridge网络是在链路层用于在不同网段之间转发流量的。bridge可以是一个硬件或软件设备。

bridge驱动创建了主机内部的私有网络,在这个网络中的容器可以相互通信,可以通过端口映射来实现外部的访问。docker通过规则在禁止不同bridge网络之间的通信。

docker的bridge是local scope的,这意味着一个bridge网络只能提供单机上的容器互联。docker提供了默认的bridge,用户也可以添加自定义的bridge。

user-defined bridge和default bridge略有不同,user-defined bridge提供了如下功能:

  • 连接到user-defined bridge的所有容器,都会自动的相互暴露所有端口。
  • 提供了容器之间的自动DNS解析,可以直接通过host name来进行访问其他容器。
  • 创建了可配置的bridge(默认的bridge也可以配置,但是这个操作是在docker之外进行的,需要重启docker)。

默认的bridge特别的一点是,可以通过--link来在容器之间共享环境变量,但--link已被遗弃,user-defined bridge可以通过共享文件或目录、compose file来实现共享环境变量。

overlay

15261917479037

host

使用host网络时,容器的网络栈没有与主机的隔离(存储、进程namespace、用户namespace都有隔离),如果在容器绑定到了80端口,那么容器的程序在主机的80端口可用。这也就意味着,绑定前要求主机的80端口未被占用。

host网络仅在linux上可用,猜测mac和windows上不可用是由于使用了虚拟化。

macvlan

15261917609686

none

Docker上部署Hadoop

  1. Docker上部署Hadoop等完全分布式集群
  2. UNDERSTANDING DOCKER NETWORKING DRIVERS AND THEIR USE CASES
  3. Docker —— 从入门到实践

References

  1. Docker Documentation

Notes of Functional Programming in Scala and more.

Pure Function

对于一个函数 f: A => B,如果任何一个值 a: A 都有只有一个 b: B 与之对应,使得 b 仅仅由 a 决定,那么这个函数是纯函数。任何内部或外部状态的改变都不影响 f(a) 的结果。

Referential Transparency

引用透明是表达式的属性。如果一个表达式是引用透明的,那么把这个表达式替换为它求值后的结果不会影响程序的行为。引用透明是定义一个纯函数的必要条件。

如果表达式不是引用透明的,那么这个表达式就是有 side-effect(副作用)。根据引用透明的定义,对于一个纯函数函数 f,表达式 f(x) 不会有任何的副作用。

Idempotence(幂等性)

纯函数的幂等的。

\[ \forall x, f(f(x)) = f(x) \]

Currying

一个 Curryied 函数是,把一个接受多个参数的函数重写为,一个首先接受第一个参数并返回一个函数接受第二个参数...的函数。

例如:foldLeft。

https://oldfashionedsoftware.com/2009/07/10/scala-code-review-foldleft-and-foldright/

Currying 和 Partially Applied Function

  • http://stackoverflow.com/questions/14309501/scala-currying-vs-partially-applied-functions
  • http://stackoverflow.com/questions/8650549/using-partial-functions-in-scala-how-does-it-work/8650639#8650639

trait and class

  • http://stackoverflow.com/questions/1991042/what-is-the-advantage-of-using-abstract-classes-instead-of-traits
  • http://stackoverflow.com/questions/2005681/difference-between-abstract-class-and-trait
  • http://www.artima.com/pins1ed/traits.html#12.7

Functions and Methods

In Scala there is a rather arbitrary distinction between functions defined as methods, which are introduced with the def keyword, and function values, which are the first-class objects we can pass to other functions, put in collections, and so on.

  • https://www.google.com/search?q=difference+scala+function+method
  • http://www.cnblogs.com/shihuc/p/5082701.html
  • http://stackoverflow.com/questions/4839537/functions-vs-methods-in-scala
  • http://stackoverflow.com/questions/2529184/difference-between-method-and-function-in-scala
  • http://jim-mcbeath.blogspot.com/2009/05/scala-functions-vs-methods.html
  • https://dzone.com/articles/revealing-scala-magician%E2%80%99s
  • https://tpolecat.github.io/2014/06/09/methods-functions.html
  • https://www.quora.com/What-is-the-difference-between-function-and-method-in-Scala
  • https://softwarecorner.wordpress.com/2013/09/06/scala-methods-and-functions/
  • http://www.marcinkossakowski.com/difference-between-functions-and-methods-in-scala/
  • http://japgolly.blogspot.com/2013/10/scala-methods-vs-functions.html
  • http://blog.vmoroz.com/posts/2016-06-01-scala-should-i-use-a-method-or-a-function.html

Functional Combinators

List(1, 2, 3) map squared把函数squared应用到了List(1, 2, 3)的每个元素上,并返回一个新的List。这个操作叫做map组合子(map combinators)。

PartialFunction and Partially Applied Function

Rank-1 polymorphism and Higher-Rank Polymorphism

  • https://apocalisp.wordpress.com/2010/07/02/higher-rank-polymorphism-in-scala/
    • related:
    • https://apocalisp.wordpress.com/2010/10/26/type-level-programming-in-scala-part-7-natural-transformation%C2%A0literals/
    • https://apocalisp.wordpress.com/2011/03/20/towards-an-effect-system-in-scala-part-1/
  • http://existentialtype.net/2008/03/09/higher-rank-impredicative-polymorphism-in-scala/

Existential Types

  • http://www.drmaciver.com/2008/03/existential-types-in-scala/

Algebraic Data Type

  • http://tpolecat.github.io/presentations/algebraic_types.html
  • https://www.youtube.com/watch?v=YScIPA8RbVE

Variant

Covariant and contravariant

对于所有类型XY,以及类型Co[+A]Ctr[-A]Iv[A]。如果X是Y的子类,

covariant:Co[X]Co[Y]的子类。在类型参数A前加上+表示 covariant。 contravariant:Ctr[Y]Ctr[X]的子类。在类型参数A前加上-表示 contravariant。 invariant:Iv[X]Iv[Y]无关。

1
2
3
4
5
6
7
8
9
┌───┐     ┌───────┐     ┌───────┐
│ Y │ │ Co[Y] │ │Ctr[X] │
└───┘ └───────┘ └───────┘
▲ ▲ ▲
│ │ │
│ │ │
┌───┐ ┌───────┐ ┌───────┐
│ X │ │ Co[X] │ │Ctr[Y] │
└───┘ └───────┘ └───────┘

表述 covariant 的另一种方式是,在所有上下文中,都可安全的把A转换为A父类。contravariant 类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Animal {
val sound = "rustle"
}

class Bird extends Animal {
override val sound = "call"
}

class Chicken extends Bird {
override val sound = "cluck"
}

class covariant1[+A]

// error:
// val covariant_b1: covariant1[Bird] = new covariant1[Animal]
val variant_b2: variant1[Bird] = new variant1[Chicken]
│ │ │ │
└────────────┘ └───────────────┘
▲ │
└───────────────────────┘
to supper class

class contravariant2[-A]

val contravariant_b3: contravariant2[Bird] = new contravariant2[Animal]
// error:
// val contravariant_b4: contravariant2[Bird] = new contravariant2[Chicken]

Covariant and Contravariant Positions

1
trait Function1 [-T1, +R] extends AnyRef

以函数为例,

contravariant (positive) position:一个类型在函数参数的类型中。更一般的来说,是函数所使用的值的类型。 covariant (negative) position:一个类型在函数结果的类型中。更一般的来说,是函数产生的值的类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
                 ┌────────────────────────┐
│ contravariant position │
└────────────────────────┘

┌──────────────────┘

def foo(a: A): B

└────────────┐

┌────────────────────┐
│ covariant position │
└────────────────────┘

对于高阶函数,从外层向内分析。类型最终是什么 position 由分析过程中,各个类型的“累加”得到。就函数 foo 而言,A => B 是在 contravariant position;就函数 f 而言, A 是在 contravariant position,B 是在 covariant position。因此,最终 A 是在 covariant position(可以理解为两个 negative position 合并,负负得正),而 B 是在 contravariant position。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
                  ┌───────────────────────┐
│ A: covariant position │
└───────────────────────┘

│ ┌───────────────────────────┐
│ B: contravariant position │
┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ └───────────────────────────┘
│ ▲
├──────────────────────────────────────────────────┴──────────┐
│ │ │
│ ┌────────────────────────┐ │
┌────────────────────────┐ │ contravariant position │ ┌────────────────────┐
│ contravariant position │ └────────────────────────┘ │ covariant position │
└────────────────────────┘ ▲ └────────────────────┘
▲ ┌───────────┘ ▲
│ │ ┌───────────────────────────────────┘
│ │ │
┌────┐ ┌───────▶ A => B
│ │───────┘
def foo(f: A => B): C

└────────────┐

┌────────────────────┐
│ covariant position │
└────────────────────┘

Scala要求类型本身的variant与类型所在位置的variant一致。

1
2
3
4
5
6
7
8
9
// error 1
trait Option[+A] {
def orElse(o: Option[A]): Option[A]
...
}

// error 2
class V1[+A]
class V2[-B] extends V1[B]

这个定义将会导致编译错误,Error:... covariant type A occurs in contravariant position in typeorElse 函数接受参数Option[A],这个位置(contravariant position)是只能将 A 转换为 A 的子类型的地方。但是类型 A 是 covariant 的,也就是说在所有上下文中,都可安全的把 A 转换为 A 父类。这里出现了冲突。

对于 orElse,解决方式是不使用 A,使用边界限定类型为 A 的父类。

1
2
3
4
def orElse[B >: A](o: Option[B]): Option[B] = this match {
case None => ob
case _ => this
}

那为什么不,

1
2
3
4
5
// 1.
def orElse[B <: A](o: Option[B]): Option[B]

// 2.
def orElse[B](o: Option[B]): Option[B]

orElse 返回值可能的类型为 Option[B]Option[A], * 对于1,BA 的子类,Option[B]Option[A] 的子类,当返回 this 时,类型为 Option[A],这里无法从父类转换到子类。 * 对于2,当返回 this 时,类型为 Option[A],和 Option[B] 完全无关,类型不匹配。

Laziness 和 Non-strictness

二者都与求值策略1(evaluation strategy)有关。一个编程语言用求值策略来,

  • 确定合适计算一个函数调用的参数。
  • 以什么样的形式来把数值传递给函数。

求值策略包括,

  • 严格求值(strict evaluation):一个函数调用的参数总是在应用这个函数之前进行完全求值。常见的有,
    • call by value
    • call by reference
    关于这个话题,不得不提的是计算顺序2(order of evaluation),在c++中,这个顺序未定义,由编译器决定。
  • 非严格求值(non-strict evaluation):一个函数调用的参数只有在函数体内被使用到时,才进行求值。常见有,
    • call by name:在函数体内被使用到时才进行求值,没有被用到则不会求值。如果被多次使用到,那么每次使用都需要重新求值。
    • call by need:记忆版的 call by name。当参数首次被求值时,值被保留用于后续使用。
  • 非确定性策略(nondeterministic strategies)
    • call by future

在 Scala 中,非严格参数又叫做 call "by name" 参数,对应 Scala 中对这些参数使用的 call by name求值策略。当首次使用到某个参数的时候,通过 lazy val 缓存这个值,来达到 call by need(又叫做 lazy evaluation)。

Nothing 和 NotNothing

Scala 中,关于多态,一个头疼的问题是,如果没有为一个多态方法指定类型参数,并且无法从方法的参数中推断出类型,由于 Nothing 是所有类型的子类型,此时 Scala 就会把没有指定的类型推断为 Nothing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sealed trait Dimension

sealed trait UnitDim extends Dimension

trait VarDim extends Dimension

sealed trait DimValue[D <: Dimension] {
def value: Int
}

object DimValue {
def apply[D <: Dimension](v: Int) = new DimValue[D] {
override def value: Int = v
}
}

上面的 DimValue,我希望 apply 中,DDimension 的子类型。如果在调用这个方法时为明确指定类型参数,结果就会是 DimValue[Nothing]。这不是我所期望的。

1
2
3
4
5
val dv = DimValue(3)

Pattern: dv: DimValue[Nothing] with Object {
def value: Int
}

Google 了 Scala NotNothing 以后,找到两个方法,

State Action and FSM

type State[S, +A] = S => (A, S)

State 代表携带者某种状态的计算,或 state action,state transition,或 statement。State 类型表示将 S 从一个状态转换到另一个状态,并产生副作用 A。这有点像有限状态机。

考虑一个 Mealy 型的 FSM,根据定义(主要是转移函数和输出函数),把这个 FSM 表示为,

1
case class Mealy[S, I, A](t: (S, I) => S, g: (S, I) => A)

其中,

  • S:状态的有限集合
  • I:输入字母表的有限集合
  • A:输出字母表的有限集合
  • t:转移函数
  • g:输出函数

转移函数和输出函数都接受 SI 的值作为参数,可以把这两个函数合并,得到,

1
2
3
4
(S, I) => (A, S)

// currying
I => S => (A, S)

根据 State 的定义得到,I => State[S, A]State[S, A] 代表类型为 S => (A, S) 的函数,这个函数参数可以看做这个 Mealy 型的 FSM 的初始状态。

同样的一个 Moore 型的 FSM,根据定义可以表示为,

1
case class Moore[S, I, A](t: (S, I) => S, g: S => A)

转移函数和输出函数都接受 S 的值作为参数,合并得到,S => (I => S, A),但此时就不能够用 State 数据类型来表示了。

implicit


  1. https://en.wikipedia.org/wiki/Evaluation_strategy↩︎

  2. http://en.cppreference.com/w/cpp/language/eval_order↩︎

scanRight

这是 Functional Programming in Scala 一书中,练习5.16的一个问题。问题是,把 tail 泛化为 scanRight,这个 scanRight 需要返回中间结果形式的 stream。例如:

1
2
scala> Stream(3, 2, 1).scanRight(0)(_ + _).toList
res0: List[Int] = List(6, 5, 3, 0)

注:1. 给定一个 stream,tail 会将这个输入的所有后缀作为 stream 返回;2. 中间结果形式的 stream 意味着结果是以 call-by-name 的方式构造的;3. 遍历 scanRight 的时间复杂度应该为 \(O(n)\)

先不考虑时间复杂度的要求,仅仅实现 scanRight 的功能,那很容易就可以写出下面的代码,

1
2
3
4
5
def scanRight1[B](z: B)(f: (A, => B) => B): Stream[B] =
foldRight((z, Stream(z)))((e, acc_s) => {
val acc_e = f(e, acc_s._1)
(acc_e, Stream.cons(acc_e, acc_s._2))
})._2

foldRight 保留了累计值 acc_e 和 stream 的中间值 acc_s,在每次迭代(这里实际上是递归)中,使用这两个值来进行 Stream.cons,以得到最后结果。

计算过程分析

在看这题的答案的时候,发现答案并没有直接使用 acc_s,而是用 lazy 缓存了这个值,然后基于这个缓存的值进行计算。

1
2
3
4
5
6
def scanRight2[B](z: B)(f: (A, => B) => B): Stream[B] =
foldRight((z, Stream(z)))((e, acc_s) => {
lazy val p = acc_s
val acc_e = f(e, p._1)
(acc_e, Stream.cons(acc_e, p._2))
})._2

这里我不理解的是缓存什么不好,为什么要缓存 acc_s。因为每次迭代都会遇到 stream 中的元素,为什么不 lazy val p = e

下面以 Stream(3, 2, 1).scanRight(0)(_ + _) 为例,分析 scanRight1 计算过程。

  1. 把求和的 function 叫 sum,

    1
    s.scanRight(0)(sum)

  2. 展开 scanRight,

    1
    2
    3
    4
    s.foldRight((0, Stream(0)))((e, acc_s) => {
    val acc_e = sum(e, acc_s._1)
    (acc_e, Stream.cons(acc_e, acc_s._2))
    })._2

  3. 为了后续分析方便,对比 foldRight 的类型,提取出 foldRightf

    1
    2
    3
    4
    5
    6
    def foldRight[B](z: => B)(f: (A, => B) => B): B

    f: (Int, (Int, Stream[Int])) => (Int, Stream[Int]) = (e, acc_s) => {
    val acc_e = sum(e, acc_s._1)
    (acc_e, Stream.cons(acc_e, acc_s._2))
    }

  4. 此时可以进行 foldRight 中的 pattern matching 了,

    1
    2
    3
    Cons(h, t) =>
    h: 3
    t: Stream(2, 1).foldRight((0, Stream(0)))(f)

  5. 把参数代入 f 进行计算,

    1
    2
    3
    4
    5
    6
    7
    acc_e: 3
    acc_s: Stream(2, 1).foldRight((0, Stream(0)))(f)

    {
    val acc_e = sum(3, acc_s._1)
    (acc_e, Stream.cons(acc_e, acc_s._2))
    }

    根据 foldRight 的定义,逐层展开 acc_s._1,即:Stream(2, 1).foldRight((0, Stream(0)))(f)._1,省略了展开的详细过程和 foldRight 中对 Empty 的 pattern matching,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    {
    val acc_e = sum(3,
    {
    val acc_e = sum(2,
    {
    val acc_e = sum(1, 0)

    (acc_e, Stream.cons(acc_e, acc_s._2))
    }._1
    )

    (acc_e, Stream.cons(acc_e, acc_s._2))
    }._1
    )

    (acc_e, Stream.cons(acc_e, acc_s._2))
    }

    等价于,

    1
    2
    3
    4
    {
    val acc_e = 6
    (6, Stream.cons(6, acc_s._2))
    }

  6. 接下来计算 acc_s._2,即:Stream(2, 1).foldRight((0, Stream(0)))(f)._2

观察上述过程发现,Stream(2, 1).foldRight((0, Stream(0)))(f),也就是 acc_s 会被计算两次,这就是 lazy val p = acc_s 的缘由。

与动态规划的联系

进一步分析,在第一次迭代中,当前的 acc_s 需要计算两次。在后续的迭代中(计算:(Stream.cons(6, acc_s._2)),每次迭代都重复计算了一部分结果。简单来说就是,

1
2
3
第一次:3 + 2 + 1
第二次: 2 + 1
第三次: 1

例如值 1,在 3 次迭代中,都被重复计算得到了 3 次。使用 lazy val p = acc_s 避免了这些多余的计算。

这时回过头去看 scanRight,可以将这个函数的功能看做是,对所有的后缀 stream 进行了 foldRight 操作。对长后缀 stream 计算 foldRight 时,实际上计算了短后缀 stream 的 foldRight。这实际上就是动态规划,

  • 最优子结构:当前长后缀 stream 的 foldRight可以由短后缀 stream 得到。
  • 无后效性:短后缀 stream 的 foldRight 不受长后缀 stream 的影响。
  • 子问题重叠:短后缀 stream 的 foldRight 会被多次计算。

由于子问题重叠,因此常用,

  • Memoization (Top Down)
  • Tabulation (Bottom Up)

来存储子问题的解,避免重复计算。

scanRight 实际上就是用了 Memoization 来存储子问题的解。

Lectures

Remote Procedure Call (RPC)

分布式系统的关键组件。

目标:简化C/S通信的开发。

  • 隐藏 C/S 通信的大多数细节。
  • client 端进行 RPC 调用,就像调用普通的函数一样。
  • server 端处理 RPC 请求,就像处理普通的函数一样。

RPC消息发送图示,

1
2
3
Client             Server
request--->
<---response

软件结构,

1
2
3
4
client app         handlers
stubs dispatcher
RPC lib RPC lib
net ------------ net

一些细节

  • 组织数据:把数据封装为数据包。
  • Binding:client 如何知道应该向谁发送请求?
    • 要么在 client 提供 server 的 host name。
    • 要么有一个 name service,提供 service names -> best server host 的map。

RPC错误处理

可能存在的错误, * lost packet * broken network * slow server * crashed server

client 观察到的错误可能有, * client 无法接收到 server 的 response。 * client 无法得知 server 是否接受到了 request。 * server根本就没有收到请求。 * server执行调用了,但在发送响应前,server出错了。 * server执行调用了,但是发送响应的时候,网络出问题了。

RPC语义

at least once(best effort)

client 等待 server 的 response,没有响应 -> 重新发送请求,尝试多次仍然没有 response,返回 error。 重新发送请求可能导致 server 重复执行操作。如果重复执行操作是不可接受的,那相关接口必须是幂等的。

at most once

由server 检测重复的请求,返回重复请求先前的 reply,而非重复执行。

如何检测:client 的每个请求包含一个唯一的 ID,

1
2
3
4
5
6
7
server:
if seen[xid]:
r = old[xid]
else
r = handler()
old[xid] = r
seen[xid] = true

at most once的难点, * 如何保证 ID 唯一?尤其是分布式多个 client 的情况下。 * 大随机数? * 唯一的 client ID + request seq id? * server 最终是需要丢弃老的 request 的结果的。何时丢弃是安全的? * 类似TCP sequence #s 和 acks 的方法。使用唯一的 client ID;每个 client 的 RPC 有一个 seq id;对于每个 RPC,client 包含了 seen all replies <= X。 * 只允许 client 有一个未完成的 RPC。对于 seq + 1 的 request,server 可以丢弃所有 <= seq。 * 当上一个 request 还在运行的时候,如何处理重复的 request? * 此时 server 还无法查到 reply,但又不想重复执行。可以为每个正在执行的 RPC 加上 pending 标签,重复的 RPC 进行等待/忽略。 * 如果 server 挂了或重启怎么办? * 如果有 memory 中的数据备份,那么 server 可以在重启后忽略并接受重复的请求。 * 数据冗余是否应该写入磁盘? * 备份 server 是否也应该有这些数据的备份?

exactly once

at most once + 无限重试 + 容错服务。

Lab 1: MapReduce

这次的实验仍然继续 Lab 1: MapReduce。完成单机版的 MapReduce 以后,还要实现分布式的 MapReduce(模拟),这里需要 RPC 库。

Go RPC 都提供了 at most once 的语义,换句话说,并不保证送达。如果 worker 没有响应,但实际上是在执行 task 的,那么 master 会重新调度新的 worker 来执行这个任务。

Lab 1: MapReduce 给出的代码已经提供了框架,下面分析了原始代码中,master 初始化,以及注册 worker 的过程,

Variables

一个变量名就是一个合法的表达式。awk 的变量被默认初始化为空 string,当转换为数字时,是0,因此不需要显式初始化变量。

在为 awk 指定参数时,可以定义变量并赋值,-v variable=text,这样的变量在 BEGIN 之前就被定义,可以在 BEGIN 中使用。但 -v 必须先于文件名参数和程序文本。

如果在程序文本之后,awk '{ print $n }' n=4 inventory-shipped n=2 mail-list,那么变量n会在读取每条记录时被赋值,会先打印文件 inventory-shipped 第四列的内容,然后打印文件 mail-list 第二列中的内容。

Predefined Variables

少数变量是有特殊含义的,它们是 predefined variables。这些变量,要么被 awk 自动检查(User-modified),要么被自动设置(Auto-set)。

User-modified

  • CONVFMT:控制数字到 string 转换。
  • FS:列分隔符,值为一个字符或匹配多个字符的正则表达式。 默认是 " ",会匹配任何 \" ", \t, \n。同时会使得每条记录开头和结尾的 " ", \t, \n 被忽略。
  • OFS:输出文件列分隔符。
  • ORS:输出文件记录分隔符(行尾)。
  • RS:输入文件记录分隔符(行尾)。
  • SUBSEP:下标分隔符,默认是 \034

Auto-set

  • ARGC, ARGV:awk 参数信息,ARGV 的起始下标为0。程序文本不会被包含到这两个变量中。
  • ENVIRON:关联数组,包含了 environment 的值。
  • FILENAME:当前输入文件的文件名。 未设置输入文件(stdin输入)时,值为 "-";在BEGIN中时,值为 ""
  • FNR:当前文件的行号,在每次打开新文件时被重置为0。
  • NF:当前记录的列数。在每读取一个新纪录的时候,或者 $0 改变的时候,被重置。
  • NR:已处理的行数。
  • RLENGTH:被 match() 匹配后,substring 的长度,没有匹配值为-1。
  • RSTART:被 match() 匹配后,substring 的起始位置,没有匹配值为0。

Pattern Elements

Regexp Patterns

awk 中正则表达式的形式为/regexp/

Expression Patterns

awk 表达式都可作为一个 pattern,如果表达式的值非空或非零,那么就是匹配上了。常见的是由比较运算符构成的,

  • awk '$1 == "li" { print $2 }' mail-list 用于直接比较 string 的操作符除了 ==,还可以是<, <=, >, >=, ==, !=。以及判断array中是否有以subscript为下表的元素,subscript in array。

  • awk '$1 ~ /li/ { print $2 }' mail-list 用于判断特定的 string 是否 match 正则表达式,需要用 ~, !~ 连接左右操作数。

  • awk '/edu/ && /li/' mail-list 也可以用布尔操作符连接正则表达式、由比较运算符构成的表达式,以及任何其他的 awk 表达式。

Ranges

range pattern 由逗号分隔的两 个pattern 构成,begpat, endpat,用于匹配连续的记录。 当遇到匹配 begpat 的记录时,开始匹配包括这条记录在内的所有后续记录,直到遇到匹配 endpat 的记录时,停止匹配,然后开始下一轮。如果有某条记录使得两个 pattern 都为 true,那么只匹配一次,并开始下一轮。

1
2
3
4
5
6
7
8
file:
awefawef
aweawef%
awefawfe
aweawef%

awk '/%$/,/%$/ {print}' test
aweawef%

BEGIN/END

前面的几个 pattern 都是会匹配输入的,BEGIN/END 是特殊的 pattern,不会匹配(因为在它们执行的时候还没有记录,或者已经没有记录存在了),它们指定了 awk 程序起始和结束时的 action,都只会执行一次。

如果存在多个 BEGIN/END,那么按定义的顺序执行。 当仅有 BEGIN,执行完 BEGIN 后,程序结束;当仅有 END,程序会读取输入,然后执行 END

常用awk程序

下面是我常用到的,或者看到觉得不错的awk程序。

以某个字符串为key,统计对应的value总和

1
awk '{a[$4"\t"$5"\t"$2]+=$3} END {for (x in a) print x"\t"a[x]}' $geo_file_ret | sort -k 4 -n -r > $1

打印非空行

1
awk 'NF'

因为 NF 是当前输入的列数,非空行的列数大于0。当条件为真,且没有其他的 action 时,awk执行默认操作,打印一行。扩展来说,可以只提供表达式,作为条件,表达式为真则打印响应行。

类似的,

1
awk '!a[$0]++'

a 是数组,每个元素被初始化为0,$0 作为索引。首次遇到的行,++ 对以 $0 为下标的元素加1,后缀 ++ 返回原始值0,取反后为真,打印。后续如果再遇到相同的,可以取到一个非0的值,取反为false,不打印。因此实际上是去除了重复的行。

以及,

1
seq 1 30 | awk 'ORS=NR%5?FS:RS'

默认情况下,ORS是一个换行符 "\n\"。这里根据,已处理行数是否是5的倍数,为ORS赋值FS或RS,默认即," " or "\n"。它们的值都不为0,作为条件都是true。因此每一行都会打印,只是被打印行的行为分隔符是FS或RS。

1
2
3
4
5
6
1 2 3 4 5
6 7 8 9 10
11 12 13 14 15
16 17 18 19 20
21 22 23 24 25
26 27 28 29 30

替换分隔符

在没有修改的情况下,awk 并不会重新 build $0,原因之一是,为了性能;二是,对于 echo 'foo;bar' | awk -v FS=';' -v OFS=',' '/foo/',如果真的替换了,那 foo,bar 并不是期望匹配到的 foo;bar。 因此,不同于上面的 ORS,这里直接指定 FS 以及 OFS 是没有用的。

1
awk -v FS=';' -v OFS=',' '{$1=$1}1'

这里的确修改了 $0,但内容没有变,因此会替换FS。如果没有空行,可以把1去除。

连接字符串

利用 awk 中变量被初始化为空 string 或0这一点,可以方便的连接字符串

1
string = string sep somedata; sep = ";"

避免开头出现多余的 ";"

处理两个文件

当处理两个文件时,awk 按命令中指定的顺序,顺序读取每个文件。当读取第一个文件时,NR == FNR总为true。 基于这点,有一个常用的写法,

1
awk 'NR == FNR { # some actions; next} # other condition {# other actions}' file1.txt file2.txt

常用的场景比如,

  • 两文件的交集,awk 'NR == FNR{a[$0];next} $0 in a' file1.txt file2.txt
  • 两文件的差集(仅在第一个文件里出现),awk 'NR == FNR{a[$0];next} $0 in a' file1.txt file2.txt
  • 基于某一列,join两文件,awk 'NR == FNR{a[$1]=$2;next} {$3=a[$3]}1' map.txt data.txt

不过这个写法,仅适用于第一个文件非空的情况,如果为空,会直接用 NR == FNR { # some actions; next} 处理第二个文件,可以检测FILENAME是否等于ARGV[1]。

More

  • http://coolshell.cn/articles/9070.html
  • http://www.gnu.org/software/gawk/manual/gawk.html

cluster by vs order by vs sort by

  • ORDER BY x: guarantees global ordering, but does this by pushing all data through just one reducer. This is basically unacceptable for large datasets. You end up one sorted file as output.

  • SORT BY x: orders data at each of N reducers, but each reducer can receive overlapping ranges of data. You end up with N or more sorted files with overlapping ranges.

  • DISTRIBUTE BY x: ensures each of N reducers gets non-overlapping ranges of x, but doesn't sort the output of each reducer. You end up with N or unsorted files with non-overlapping ranges.

  • CLUSTER BY x: ensures each of N reducers gets non-overlapping ranges, then sorts by those ranges at the reducers. This gives you global ordering, and is the same as doing (DISTRIBUTE BY x and SORT BY x). You end up with N or more sorted files with non-overlapping ranges.

Hive NULL处理

默认情况下,每行数据在被输入script前,列会被转换为 \t 分隔的string,所有 NULL 值会被转换为字符串 \N,目的是为了与空string(长度为0)区分开。script输出一行数据时,标准输出会将输出是为 \t 分隔的列,同时 \N 被转换为 NULL

因此Hive的空值处理分为:

  • NULL\N

    HIVE底层是用,可以通过alter table name SET SERDEPROPERTIES('serialization.null.format' = '\N'); 来更改底层以什么样的字符来存储NULL。

    查询空值字段可用:

    1
    a is null 或 a = '\\N'

    相应的在script在判断是否为 NULL 时,是需要判断!= '\N'

  • 空string

    '' 表示长度为0的 string,但此时字段并非为 NULL,而是一个长度为0的 string。

    查询空string可用:

    1
    a = '' 或 length(a)=0

    在使用outer join的时候尤其要注意以上区别,要想清楚究竟是要判断字段为 NULL,还是判断string为空。

    1
    2
    3
    4
    select case when t.idfa != '' and t.idfa != null then t.idfa end from (select 'B5BF3011-A8A0-46A2-B8C3-0A1976FDD4BE' as idfa) as t;
    select case when t.idfa != '' and t.idfa != NULL then t.idfa end from (select 'B5BF3011-A8A0-46A2-B8C3-0A1976FDD4BE' as idfa) as t;
    select case when t.idfamd5 != null then '1' else '2' end from (select '\N' as idfamd5) as t;
    select case when t.idfamd5 != null then '1' else '2' end from (select NULL as idfamd5) as t;

Insert into bucketed table

Hive 2.x之前,向分桶表中插入数据时,需要(二者之一),

  • 方法1:set hive.enforce.bucketing = true

  • 方法2:

    1. 设置reducer数目,set mapred.reduce.tasks,使其等于bucket的数目
    2. 在select中使用,cluster by