小谈MapReduce

Hadoop 是Apache 下的一个项目,由HDFS、MapReduce、HBase、Hive 和ZooKeeper等成员组成。
HDFS:分布式存储系统
MapReduce:分布式计算系统
其中,HDFS 和MapReduce 是两个最基础最重要的成员。

InfoWord将MapReduce 评为2009 年十大新兴技术的冠军。MapReduce 是大规模数据(TB 级)计算的利器,Map 和Reduce 是它的主要思想,
来源于函数式编程语言,它的原理如下图所示:Map负责将数据打散,Reduce负责对数据进行聚集,用户只需要实现map 和reduce 两个接口,
即可完成TB级数据的计算,常见的应用包括:日志分析和数据挖掘等数据分析应用。另外,还可用于科学数据计算,如圆周率PI 的计算等。

Hadoop MapReduce的实现也采用了Master/Slave 结构。Master 叫做JobTracker,而Slave 叫做TaskTracker。
用户提交的计算叫做Job,每一个Job会被划分成若干个Tasks。JobTracker负责Job 和Tasks 的调度,而TaskTracker负责执行Tasks。

首先从Map端开始分析,当Map开始产生输出的时候,他并不是简单的把数据写到磁盘,因为频繁的写磁盘会导致性能严重下降,他的处理更加复杂,数据首先是写到内存中的一个缓冲区,并作一些预排序,以提升效率,每个Map任务都有一个用来写入输出数据的循环内存缓冲区,这个缓冲区默认大小是100M,可以通过io.sort.mb属性来设置具体的大小,当缓冲区中的数据量达到一个特定的阀值时,系统将会启动一个后台线程把缓冲区中的内容spill 到磁盘。在spill过程中,Map的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map就会被阻塞直道spill完成。spill线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序:
首先根据数据所属的partition排序,然后每个partition中再按Key排序。输出包括一个索引文件和数据文件,如果设定了Combiner,将在排序输出的基础上进行。Combiner就是一个Mini Reducer,它在执行Map任务的节点本身运行,先对Map的输出作一次简单的Reduce,使得Map的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。Spill文件保存在由mapred.local.dir指定的目录中,Map任务结束后删除。

Map的输出文件放置在运行Map任务的TaskTracker的本地磁盘上(注意:Map输出总是写到本地磁盘,但是Reduce输出不是,一般是写到HDFS),它是运行Reduce任务的 TaskTracker所需要的输入数据。Reduce任务的输入数据分布在集群内的多个Map任务的输出中,Map任务可能会在不同的时间内完成,只要有其中一个Map任务完成,Reduce任务就开始拷贝他的输出。这个阶段称为拷贝阶段,Reduce任务拥有多个拷贝线程,可以并行的获取Map输出。可以通过设定  mapred.reduce.parallel.copies来改变线程数。

Reduce是怎么知道从哪些TaskTrackers中获取Map的输出呢?当Map任务完成之后,会通知他们的父TaskTracker,告知状态更新,然后TaskTracker再转告JobTracker,这些通知信息是通过心跳通信机制传输的,因此针对以一个特定的作业,jobtracker知道Map输出与tasktrackers的映射关系。
Reducer中有一个线程会间歇的向JobTracker询问Map输出的地址,直到把所有的数据都取到。在Reducer取走了Map输出之后,TaskTracker不会立即删除这些数据,因为Reducer可能会失败,他们会在整个作业完成之后,JobTracker告知他们要删除的时候才去删除,如果Map输出足够小,他们会被拷贝到Reduce TaskTracker的内存中(缓冲区的大小由mapred.job.shuffle.input.buffer.percnet控制),或者达到了Map输出的阀值的大小(由mapred.inmem.merge.threshold控制),缓冲区中的数据将会被归并然后spill到磁盘。

拷贝来的数据叠加在磁盘上,有一个后台线程会将它们归并为更大的排序文件,这样做节省了后期归并的时间。对于经过压缩的Map 输出,系统会自动把它们解压到内存方便对其执行归并。

当所有的Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在Map 端就已经完成),这个阶段会对所有的Map 输出进行归并排序,这个工作会重复多次才能完成。假设这里有50 个Map 输出(可能有保存在内存中的),并且归并因子是10(由io.sort.factor控制,就像Map 端的merge 一样),那最终需要5 次归并。每次归并会把10个文件归并为一个,最终生成5 个中间文件。在这一步之后,系统不再把5 个中间文件归并成一个,而是排序后直接“喂”给Reduce 函数,省去向磁盘写数据这一步。

最终归并的数据可以是混合数据,既有内存上的也有磁盘上的。由于归并的目的是归并最少的文件数目,使得在最后一次归并时总文件个数达到归并因子的数目,所以每次操作所涉及的文件个数在实际中会更微妙些。譬如,如果有40 个文件,并不是每次都归并10 个最终得到4 个文件,相反第一次只归并4 个文件,然后再实现三次归并,每次10 个,最终得到4 个归并好的文件和6 个未归并的文件。

要注意,这种做法并没有改变归并的次数,只是最小化写入磁盘的数据优化措施,因为最后一次归并的数据总是直接送到Reduce 函数那里。
在Reduce 阶段,Reduce 函数会作用在排序输出的每一个key 上。这个阶段的输出被直接写到输出文件系统,一般是HDFS。在HDFS 中,因为TaskTracker 节点也运行着一个DataNode 进程,所以第一个块备份会直接写到本地磁盘。到此,MapReduce 的Shuffle 和Sort 分析完毕。

转载请注明:刘召考的博客 » 小谈MapReduce

文章来源:

Author:goomoon
link:http://www.liuzk.com/402.html