MapReduce运行的基本原理
MapReduce 即是编程模型也是计算框架。开发人员必须基于MapReduce 编程模型进行开发,然后将程序通过MapReduce计算框架分发到Hadoop集群中运行。
大数据计算的核心思想是移动计算,它比移动数据更划算,传统的编程模型进行大数据计算就会遇到很多困难,因此Hadoop大数据计算使用这种MapReduce的编程模型。
编程模型
作为一种编程模型,该模型主要分为两个过程,即 Map 和 Reduce。 MapReduce 的整体思想是: 将输入的数据分成 M 个 tasks, 由用户自定义的 Map 函数去执行任务,产出 <Key, Value>形式的中间数据,然后相同的 key 通过用户自定义的 Reduce 函数去聚合,得到最终的结果。
MapReduce执行过程主要包括:
模型实现
具体过程如下:
Map端
- 根据输入输入信息,将输入数据 split 成 M 份
- 在所有可用的
worker节点中,起 M 个task任务的线程, 每个任务会读取对应一个 split 当做输入。 - 调用
map函数,将输入转化为<Key, Value>格式的中间数据,并且排序后,写入磁盘。 这里,每个task会写 R 个文件,对应着Reduce任务数。 数据写入哪个文件的规则有Partitioner决定,默认是hash(key) % R - (可选) 为了优化性能,中间还可以用一个
combiner的中间过程
Reduce 端
map阶段结束以后, 开始进入Reduce阶段,每个Reduce task会从所有的Map中间数据中,获取属于自己的一份数据,拿到所有数据后,一般会进行排序(Hadoop 框架是这样做)。说明: 这个排序是非常必要的,主要因为
Reduce函数的输入 是<key, []values>的格式,因为需要根据key去排序。为啥不用map<>()去实现呢? 原因:因为map必须存到内存中,但是实际中数据量很大,往往需要溢写到磁盘。 但是排序是可以做到的,比如归并排序。 这也就是map端产出数据需要排序,Reduce端获取数据后也需要先排序的原因。调用
Reduce函数,得到最终的结果输出结果,存入对应的文件(可选) 汇总所有
Reduce任务的结果。一般不做汇总,因为通常一个任务的结果往往是另一个MapReduce任务的输入,因此没必要汇总到一个文件中。
Master 数据结构
master 是 MapReduce 任务中最核心的角色,它需要维护 状态信息 和 文件信息。
- 状态信息:
map任务状态Reduce任务状态worker节点状态
- 文件信息
- 输入文件信息
- 输出文件信息
map中间数据文件信息
由于
master节点进程是整个任务的枢纽,因此,它需要维护输入文件地址,map任务执行完后,会产出中间数据文件等待reducer去获取,因此map完成后, 会向master上报这些文件的位置和大小信息,这些信息随着Reduce任务的启动而分发下推到对应的worker。
容错
worker 节点失败
master 会周期性向所有节点发送 ping 心跳检测,如果超时未回复,master 会认为该 worker 已经故障。任何在该节点完成的 map 或者 reduce 任务都会标记为 idle,并由其他的 worker 重新执行。
说明: 因为
MapReduce为了减少网络带宽的消耗,map的数据是存储在本地磁盘的,如果某个worker机器故障,会导致其他的Reduce任务拿不到对应的中间数据,所以需要重跑任务。那么这也可以看出,如果利用hadoop等分布式文件系统来存储中间数据,其实对于完成的map任务,是不需要重跑的,代价就是增加网络带宽。
Master 节点失败
master 节点失败,在没有实现HA的情况下,可以说基本整个MapReduce任务就已经挂了,对于这种情况,直接重启 master 重跑任务就OK了。如果集群有高可用方案,比如 master 主副备用,就可以实现 master 的高可用,代价就是得同步维护主副之间的状态信息和文件信息等。
失败处理的语义
只要Map Reduce函数是确定的,语义上不管是分布式执行还是单机执行,结果都是一致的。每个map Reduce 任务输出是通过原子提交来保证的, 即:一个任务要么有完整的最终文件,要么存在最终输出结果,要么不存在。
- 每个进行中的任务,在没有最终语义完成之前,都只写临时文件,每个
Reduce任务会写一个,而每个Map任务会写 R 个,对应 R 个reducer. - 当
Map任务完成的时候,会向master发送文件位置,大小等信息。Master如果接受到一个已经完成的Map任务的信息,就忽略掉,否则,会记录这个信息。 - 当
Reduce任务完成的时候,会将临时文件重命名为最终的输出文件, 如果多个相同的Reduce任务在多台机器执行完,会多次覆盖输出文件,这个由底层文件系统的rename操作的原子性,保证任何时刻,看到的都是一个完整的成功结果
对于大部分确定性的任务,不管是分布式还是串行执行,最终都会得到一致的结果。对于不确定的map 或者Reduce 任务,MapReduce 保证提供一个弱的,仍然合理的语义。
存储优化
由于网络带宽资源的昂贵性,因此对MapReduce 存储做了很多必要的优化。
- 通过从本地磁盘读取文件,节约网络带宽
- GFS 将文件分解成多个 大小通常为 64M 的
block, 并多备份存储在不同的机器上,在调度时,会考虑文件的位置信息,尽可能在存有输入文件的机器上调度map任务,避免网络IO。 - 任务失败时,也会尝试在离副本最近的worker中执行,比如同一子网下的机器。
- MapReduce 任务在大集群中执行时,大部分输入直接可以从磁盘中读取,不消耗带宽。
任务粒度
通常情况下,任务数即为O(M+R),这个数量应当比worker数量多得多,这样利于负载均衡和失败恢复的情况,但是也不能无限增长,因为太多任务的调度,会消耗master 存储任务信息的内存资源,如果启动task所花的时间比任务执行时间还多,那就不偿失了。
优化
自定义分区函数(partition)
自定义分区可以更好地符合业务和进行负载均衡,防止数据倾斜。 默认只是简单的 hash(key) % R
有序保证
每个partition内的数据都是排序的,这样有利于Reduce阶段的merge合并
Combiner 函数
这个是每个map阶段完成之后,局部先做一次聚合。比如:词频统计,每个 Word 可能出现了100次,如果不使用combiner, 就会发送100 个 <word, 1>, 如果combiner聚合之后,则为 <word, 100>, 大大地减少了网络传输和磁盘的IO。
输入输出类型
一个reader没必要非要从文件读数据,MapReduce 支持可以从不同的数据源中以多种不同的方式读取数据,比如从数据库读取,用户只需要自定义split规则,就能轻易实现。
计数器
MapReduce 还添加了计数器,可以用来检测MapReduce的一些中间操作。
计算框架
MapReduce 流程:input > Splitting > Mapping > Shuffling > Reducing > Result
MapReduce程序读取的数据,都是存储在HDFS的数据,最后的结果,也是要保存在HDFS中,因此,MapReduce要解决的第一个问题就是数据的切分问题。
Split大小的计算
- max.split(100M)
- min.split(10M)
- block(64M)
- max(min.split, min(max.split, block))
Shuffle
处于map和reduce之间,该阶段是最重要的阶段,也是最难的阶段。
input
map 执行的时候,数据来源是HDFS上的block,在MapReduce概念中,读取的是split,split和block对应的。
map阶段
经过自定义的 map 函数的处理后,结果以 key/value 的形式输出,但是这些结果要送到以后的哪一个 reduce 去执行,需要 partition 来决定。
partition
根据 key 或者 value 的值,以及 reduce 的数量,来决定当前的这对输出数据要交到那个 reduce 去处理。
默认方法:对 key hash 以后再以 reduce 的数量取模。
也可以由程序自定义 partition 函数,partition 只是对数据以后要被送到哪个 reduce 去处理做了一个标注,而不是立马把数据进行分区。
数据倾斜和负载均衡:默认的 partition 是可能产生数据倾斜和负载均衡,如果产生数据倾斜,就需要重新定义 partition 的分区规则,就可以避免数据倾斜问题。
内存缓冲区的概念
==每个map任务都有一个内存缓冲区,用于存储任务的输出,默认缓冲区的大小是100M,这个值是可以通过io.sort.mb来调整。由于缓冲区的空间大小有限,所以,当map task的输出结果很多的时候,内存缓冲区就装不下这么多的数据,也就需要将数据写到磁盘去。因此需要一个阈值(io.sort.spill.percent,默认是80%),当内存达到阈值以后,就会有一个单独的后台线程,负责将内存中的数据写到磁盘,这个过程叫做溢写**,**由于是由单独的线程来负责溢写,所以,溢写过程不会影响map结果的输出,但是,如果此期间缓冲区被写满,map就会阻塞知道写磁盘过程完成。==
这里,就有两个过程,一个写内存的过程,另外一个是写磁盘的过程。
sort
这里的 sort 是按字典顺序排列,而不是按数值大小进行排序
combine
将排好序的数据,按照键相同的合并在一起的规则,进行值的合并。
map 的输出结果在经过 partition 阶段的处理后,明确了要发给哪个 reduce 去做处理,当写入内存后,需要将所有的键值对按照 key 进行 sort,在 sort 的基础上,再对结果进行 combine,最后,再写到磁盘文件上。所以,在磁盘上的数据,是已经分好区的,并且已经排好序的。
溢写磁盘需要注意的地方:==如果map的输出结果很大,有多次溢写发生的话,磁盘上就会存在多个溢写文件(每次溢写都会产生一个溢写文件),在map task真正的完成是时,会将所有的溢写文件都Merge到一个溢写文件中,这个过程就叫Merge。比如,从一个map读取过来的是<aaa, 5>,另外一个map读取的 是<aaa,8>。相同的key,就会merge成一个group{aaa,[5, 8…]},这个数组中的不同的值,就是从不同的溢写文件中读取过来的,然后把这些值加起来。==
为什么要设置内存缓冲区:批量收集 map 的结果,减少磁盘 IO 的次数,提高频率。
磁盘文件要写到哪里:写磁盘将按照轮询方式写到mapred.local.dir属性指定的作业特定子目录的目录中。也就是存放在TaskTracker够得着的某个本地目录,每一个reduce task不断通过RPC从JobTracker中获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task完成,Shuffle的后半段就开始了。
所有的合并究竟是为什么: 因为map节点和reduce节点之间的数据拷贝是通过网络进行拷贝的,数据量越小,拷贝的越快,相应的处理也就越快。所以合并的目的就是减少map的输出数据量,让网络拷贝尽可能快。
需要特殊说明的是,以上的步骤,都是在本地机器上完成,并不需要通过网络进行数据的传输。
reduce的shuffle细节
copy阶段
reduce进程启动一些数据的copy线程,这个线程叫做fetcher线程,通过http方式请求map task所在的TaskTracker,来获取map task的输出数据。
reduce拷贝数据,不是进行随意的拷贝,之前的partition,已经将数据分好区,reduce只是拷贝各个map上分割给自己的那一部分数据,拷贝到本地后,从每一个map上拷贝过来的数据都是一个小文件,也是需要对这些小文件进行合并的。合并以后,输出到reduce进行处理。
加入client有设置过combiner,那么现在就是使用combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。
当整个map结束后再对磁盘中这个map所产生的所有临时文件做合并(merge)
reduce从tasktracker copy数据
copy 过来的数据会先放在内存缓冲区,这里的缓冲区大小要比map端的更为灵活,它基于jvm的heap size 设置
merge的三种形式:1、内存到内存;2、内存到磁盘;3、磁盘到磁盘;merge从不同tasktracker上拿到的数据。