MapReduce编程模式原理及其算法设计

  MapReduce是一种编程模式,在很大程度上借鉴了函数式语言。它主要的思想是分而治之(divide and conquer)。将一个大的问题切分成很多小的问题,然后在集群中的各个节点上执行,这既是Map过程。在Map过程结束之后,会有一个Ruduce的过程,这个过程即将所有的Map阶段产出的结果进行汇集。


  上述过程可以说是一个显而易见的过程,所以说MapReduce是一个极其简单而有极其复杂的编程模式。说它简单是因为在程序员使用它编程解决实际问题时,他只要编写一个Mapper函数和一个Reduce函数,或许在复杂一点加上一个Combiner函数和Partitioner 函数,其余的就直接交给MapReduce框架执行。这样程序员就只要关注数据业务,而不用关注具体的执行过程。但说它复杂是因为MapRuduce有着看不见的部分,在程序员准备好Mapper和Reducer之后提交个MapReduce,而具体执行过程却是一个非常复杂的过程。


  MapReduce在具体执行过程中,同步化是一个非常棘手的问题。在MapReduce的并行执行过程唯一发生集群级同步化是在Shuffle和Sort阶段,即在Mapper阶段完成后,将各个节点上的中间结果Key/Value 依据Key的值聚集后复制到Reducer节点上。除此之外的过程,各个节点都是独立运行的而且没有直接的通信。这就意味着程序员对MapReduce的执行过程的很多方面都没有控制能力,比如:


  集群中哪个节点来执行Mapper和Reducer。


  什么时候开始和结束Mapper和Reducer。


  输入的key/value 由哪个Mapper来处理。


  中间结果产生的key/value 由哪个Reucer来处理。


  所谓山不转水转,从另一个角度来说,程序员可以控制以下的几个方面:


  数据结构的自定义,即key/value的具体的结构可以是程序员自定义的。


  MapReude 每个Mapper和Reducer的执行过程在开始和结束都可以有一段程序自定义代码,来确定每个Mapper和Reducer在执行之前和结束后的动作。比如Hadoop在实现MapReude框架时,每个Mapper和Reducer都有一个setup 和cleanup 函数,既是定义开始和结束的动作。在一些MapReduce优化算法中会充分利用这两个函数。


  对MapRuduce的Mapper的输入Key 和Reducer的中间结果Key 状态的控制。


  对中间结果Key 排序的控制,这样程序员就有能力控制Reducer处理Key的顺序。


  对中间结果Key 分区的控制,这样程序员就有能力控制Reducer处理Key的集合。


  虽然说MapRudce 框架为我们提供了良好的编程模式和接口,但是从上面的可控方面和不可控方面,我们可以挖掘出一些有效的设计模式。这些模式或可以帮助我们提高MapReduce的执行效率,或可以帮助我们更好的控制代码执行和数据流模式转化。下面具体介绍MapReduce算法设计。


  1. 本地聚集


  在偏数据敏感的分布式处理中,一个重要的性能瓶颈是中间结果的网络传输。就拿Hadoop来说,它将Mapper处理的中间结果在本地磁盘上存储(该过程还涉及到序列化),然后通过网络传输给Reducer。这里磁盘和网络的延时会极大的影响MapReduce的执行效率。很容易想到的解决方案就是如何减少中间结果的大小来提高效率。而本地聚集(local aggregation)可以减少中间结果的产生,从而能提高MapReduce的效率(特别是对于一个Mapper可能产生多个相同的Key)。


  1.1 Combiner和 in-MapperCombining


  Combiner在大多数MapReduce实现中都会提供的,它的主要作用是对每个Mapper产生的结果进行本地聚集。我们知道在MapReducer的输入实现过程是大致这样的:一个大的作业文件会通过MapReduce提供的Splitter 来进行切割成多个小的文件,每个文件会被一个Mapper处理,而每个小文件又会被MapReduce提供的RecordReader进行分割形成很多的key/value 形式的记录,每个Mapper对象中的map 方法会对每条key/value 的记录进行处理。处理后会形成一个新的Key/value 的中间结果,会序列化写到本地磁盘。


  举个常见的例子来说:数单词。假设现在有100 篇的文集,要数出每个单词出现的次数。MapRudce 接到这个任务后,首先检测文集数据的正确性。然后进行分割(这里我们采用逻辑上的分割)。比如集群有10 个TaskTracker可用。MapReduce将100文档进行平均分割,每个TaskTracker会得到10篇文集。若每个TaskTracker运行一个Mapper的话,这10篇文集会一次被Mapper处理。10篇文集又会被分切成10个RecordReader形式的Key/Value(key=docId,value=http://www.china-cloud.com/yunzixun/yunjisuanxinwen/docContent)。这样一个Mapper一次就会处理一片文档。具体算法伪码如下:


  1: class Mapper


  2: method Map(docid a; doc d)


  3: for all term t in doc d do


  4: Emit(term t; count 1)


  1: class Reducer


  2: method Reduce(term t; counts [c1; c2; : : :])


  3: sum = 0


  4: for all count c in counts [c1; c2; : : :] do


  5: sum = sum + c


  6: Emit(term t; count sum)


  首先,我们来讨论Combiner的使用。上述算法只使用了Mapper和Reducer,并没有使用Combiner,这个算法的中间结果都是(term t; count 1) 的形式。即每个单词记录一次,这样的中间结果会很多。我们在下面改进的算法中使用Combiner。


  1: class Mapper
  2: method Map(docid a; doc d)
  3: H = new AssociativeArray
  4: for all term t in doc d do
  5: H{t} = H{t} + 1 . //Tally counts for entire document
  6: for all term t in H do
  7: Emit(term t; count H{t})
  1: class Reducer
  2: method Reduce(term t; counts [c1; c2; : : :])
  3: sum = 0
  4: for all count c in counts [c1; c2; : : :] do
  5: sum = sum + c
  6: Emit(term t; count sum)


  从改进算法的伪码中我们可以看出,我们只是在Mapper中的map方法中添加一个关联数组(即JAVA中的Map)。每次map 处理一个文档时,并不是遇到一个单词就写到本地磁盘,而是将其添加到关联数组中,并且关联数组的值加1。在所有单词处理完后,在使用一个for循环将所有的单词及其出现在该文档中的次数写回本地磁盘。这样每个Mapper 的输出就是一个Map方法处理一篇文集中的单词及其出现在该文档中的次数。比如在处理docId =“16”的时候,单词“link”在这个文集中出现了5次。在没有使用Combiner的情况下,会有5个中间结果写回磁盘即(link;1), (link;1), (link;1), (link;1), (link;1)。而在使用Combiner 的情况下对于单词“link”只会有一个中间结果(link;5)写回磁盘。从中可以看出Combiner可以减少中间结果的数量。


  这个的Combiner只是对本地Map方法产生的结果进行汇总。其作用相当于一个“mini-Reducer”。而事实上在Hadoop的MapReduce的实现中,其Combiner就是实现一个Reducer。


  上述的算法相比没有Combiner 的算法有了很大的提高,实际上该算法还有提升的空间。这就是接下来要讲的“In-Mapper Combining”。


  Combiner 的使用可以使得每个Mapper 的map 方法产生的结果本地聚集。实际上更为有效是,我们可以让每个Mapper 的结果本地聚集。上面数单词的例子中,每个Mapper 会处理10 文档,而Mapper中的map方法会每次处理1个文档,map会循环10遍。我们可以直接将 10个文档的单词进行本地聚集。


  下面是使用 “In-Mapper Combining”的算法伪码实现:


  1: class Mapper
  2: method Initialize
  3: H = new AssociativeArray
  4: method Map(docid a; doc d)
  5: for all term t in doc d do
  6: H{t} = H{t} + 1 . //Tally counts across documents
  7: method Close
  8: for all term t in H do
  9: Emit(term t; count H{t})
  1: class Reducer
  2: method Reduce(term t; counts [c1; c2; : : :])
  3: sum = 0
  4: for all count c in counts [c1; c2; : : :] do
  5: sum = sum + c
  6: Emit(term t; count sum)


  从上面使用“In-Mapper Combining”的伪码中可以看出,Mapper中的不只是有map 一个方法,而是增加了Initialize 和Close 方法。即我们之前说的每个Mapper执行过程中在开始和结束都可以有一段程序自定义代码,来确定每个Mapper 和Reducer 在执行之前和结束后的动作。这里对应Hadoop 中的方法是:setup 和 cleanup 方法。


  下面对该算法分析:在每个Mapper 启动的时候会有一个关联数组的产生。在执行每个Map方法执行完时,并不直接写回磁盘,而是将单词加入到关联数组中,在整个Mapper执行完后才将所有单词写回磁盘(Close方法完成)。对应数单词的例子来说,每个Mapper不是在每篇文档处理完后写回磁盘的,而是每个Spiltter 的10篇文档处理完后,才一次性写回磁盘。这样中间结果相比直接使用Combiner就更少。


  1.2 Combiner 和 in-Mapper Combining的优缺点


  In-Mapper Combining 虽然比Combiner 有更少的中间结果。但是它有几个缺点。首先它破坏了MapReduce 编程模式的基础,因为保存中间结果跨越了多个Key/Value。如果说为了效率,我们不刻意的去追求模式。但是对于一些特定的算法它是不合适使用,比如某些算法要求对Map方法处理的Key/Value的中间结果先后有要求,那么这种In-Mapper Combining 是不适应的。另一个重要的缺点是In-Mapper Combining 对拓展性提出了挑战。以数单词为例,假设Mapper处理的10篇文档很大设计到很多的单词,这样关联数组势必会非常大,又可能大到一个JVM不能完全存储这个关联数组。这样拓展性会遇到挑战。


  对于第二个缺点,我可以采用定期写回磁盘的方法来解决。


  Combiner 和In-Mapper Combining 有除了减少中间结果外,还可以减小分布的倾斜度。比如在数单词的例子中,一些常用的单词,可能会有很多的中间结果,以至于处理这些常用单词的Reducer 会比其他的Reducer 慢很多,这种Reducer拖后腿的现象在MapReducer经常出现,而Combiner 和In-Mapper Combining的使用有助于减少这种情况。