MapReduce工作机制

时间:2017-03-27 12:36 来源:武松娱乐整理 字体:[ ] 评论:

MapReduce任务执行总流程

以下图5 是MapReduce作业详细的执行流程图。 \ 图 5 MapReduce 作业执行流程图 1.在客户端(Client)编写MapReduce代码,配置作业,启动作业。 这里需要注意的一点是:一个MapReduce作业在提交到Hadoop上之后,会进入完全地自动化执行过程。在这个过程中,用户除了监控程序的执行情况和强制终止之外,不能对作业的执行过程进行任何的干预。所以在作业提交之前,用户需要将所有应该配置的参数按照自己的需求配置完毕。 2.向Jobtracker请求一个Job ID 3.复制作业的资源文件 Jobtracker将运行作业所需要的资源,包括作业JAR文件、配置文件和计算所得的输入划分等复制到作业对应的HDFS上。这些文件都存放在Jobtracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID。作业jar的副本较多默认会有10个副本(mapred.submit.replication属性控制),因此在运行作业的任务时,集群中有很多副本可供taskTracker访问;输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息。 4.提交作业 调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。 5.初始化 Jobtracker接收到对其submitJob()方法的调用后,会把此调用放入一个内部队列中,交由作业调度进行调度,并对其初始化。 6.获取输入划分 为了创建任务运行列表,作业调度器首先从共享文件系统中获取jobClient计算好的输入分片信息,然后为每一个分片创建map任务。创建reduce任务数量由jobConf的mapred.reduce.task属性决定,然后调度器创建相应数量的要运行的reduce任务。 7.JobTracker分配任务。 Tasktracker和JobTracker之间的通信和任务的分配都是通过心跳机制完成的。 Tasktracker运行一个简单的循环来定期发送“心跳”给Jobtracker,用来告诉Jobtracker它自己是否还存活和是否准备好运行新的任务(Map任务和Reduce任务的个数是否小于上限)。如果是,Jobtracker会为他分配一个任务,并将分配信息封装在心跳通信的返回值中返回给TaskTracker。当TaskTracker从JobTracker返回的心跳信息中获取新的任务信息时,它会将Map任务或者Reduce任务加入到对应的任务槽中。 需要注意的是:在JobTracker为Tasktracker分配Map任务时,为了减少网络带宽,会考虑将map任务数据本地化。它会根据Tasktracker的网络位置,选取一个距离此TaskTracker map任务最近的输入划分文件分配给此Tasktracker。最好的情况是,划分文件就在Tasktracker本地。 8.TaskTracker获取作业资源 TaskTracker将任务运行所必需的数据、配置信息、程序代码从HDFS复制到TaskTracker本地磁盘。 9.发布任务 10.执行 当TaskTracker获取到作业资源,tasktracker就会为任务新建一个本地工作目录,并把jar文件中的内容解压到这个文件夹下,然后tasktracker新建一个TaskRunner实例来运行该任务,TaskRunner会启动一个新的JVM来运行每个任务。 11.输出结果到HDFS。 以上就是MapReduce完成一个任务的整体过程。但其实上图中没有体现客户端是如何知道这个作业的运行进度和状态。下面我简单说明一下 由MapReduce作业分割成的每个任务中都有一组计数器,它们对任务执行过程中的进度组成事件进行计数。如果任务要报告进度,它便会设置一个标志以表明状态变化将会发送到TaskTracker上。另外一个监听线程检查到这标志后,会告知TaskTracker当前的任务状态。同时,TaskTracker在每隔5秒发送给JobTracker的心跳中封装任务状态,告知自己的任务执行状态。通过这种心跳通信机制,所有TaskTracker的统计信息都会汇总到JobTracker处。JobTracker将这些统计信息合并起来,产生一个全局作业进度统计信息,用来表明正在运行的所有作业以及其中包含任务的状态。最后,JobClient通过查看JobTracker来接收作业进度最新状态。 当JobTracker收到作业的最后一个任务已完成的通知后,便把作业状态设置为“成功”。然后在JobClient查询状态时知道任务已经完成。于是JobClient打印一条消息告知用户。最后JobTracker清空作业的工作状态,并指示TaskTracker也清空作业的工作状态(eg:删除中间输出)。 这里需要注意一点:Map任务将结果写入本地硬盘,而非HDFS。因为map任务的结果是中间结果,要给Reduce任务进行再次处理,处理完之后map任务的结果就没有价值了,通常是被删掉。HDFS上的同一份数据,通常情况下是要备份的。所以如果存入HDFS,那么就有些小题大做了。

错误处理机制

MapReduce任务执行过程中出现的故障可以分为两大类:硬件故障和任务执行失败引发的故障。 硬件故障 在Hadoop Cluster中,只有一个JobTracker,因此,JobTracker本身是存在单点故障的。如何解决JobTracker的单点问题呢?我们可以采用主备部署方式,启动JobTracker主节点的同时,启动一个或多个JobTracker备用节点。当JobTracker主节点出现问题时,通过某种选举算法,从备用的JobTracker节点中重新选出一个主节点。 机器故障除了JobTracker错误就是TaskTracker错误。TaskTracker故障相对较为常见,MapReduce通常是通过重新执行任务来解决该故障。 在Hadoop集群中,正常情况下,TaskTracker会不断的与JobTracker通过心跳机制进行通信。如果某TaskTracker出现故障或者运行缓慢,它会停止或者很少向JobTracker发送心跳。如果一个TaskTracker在一定时间内(默认是1分钟)没有与JobTracker通信,那么JobTracker会将此TaskTracker从等待任务调度的TaskTracker集合中移除。同时JobTracker会要求此TaskTracker上的任务立刻返回。如果此TaskTracker任务仍然在mapping阶段的Map任务,那么JobTracker会要求其他的TaskTracker重新执行所有原本由故障TaskTracker执行的Map任务。如果任务是在Reduce阶段的Reduce任务,那么JobTracker会要求其他TaskTracker重新执行故障TaskTracker未完成的Reduce任务。比如:一个TaskTracker已经完成被分配的三个Reduce任务中的两个,因为Reduce任务一旦完成就会将数据写到HDFS上,所以只有第三个未完成的Reduce需要重新执行。但是对于Map任务来说,即使TaskTracker完成了部分Map,Reduce仍可能无法获取此节点上所有Map的所有输出。所以无论Map任务完成与否,故障TaskTracker上的Map任务都必须重新执行。 任务失败 在实际任务中,MapReduce作业还会遇到用户代码缺陷或进程崩溃引起的任务失败等情况。用户代码缺陷会导致它在执行过程中抛出异常。此时,任务JVM进程会自动退出,并向TaskTracker父进程发送错误消息,同时错误消息也会写入log文件,最后TaskTracker将此次任务尝试标记失败。对于进程崩溃引起的任务失败,TaskTracker的监听程序会发现进程退出,此时TaskTracker也会将此次任务尝试标记为失败。对于死循环程序或执行时间太长的程序,由于TaskTracker没有接收到进度更新,它也会将此次任务尝试标记为失败,并杀死程序对应的进程。 在以上情况中,TaskTracker将任务尝试标记为失败之后会将TaskTracker自身的任务计数器减1,以便想JobTracker申请新的任务。TaskTracker也会通过心跳机制告诉JobTracker本地的一个任务尝试失败。JobTracker接到任务失败的通知后,通过重置任务状态,将其加入到调度队列来重新分配该任务执行(JobTracker会尝试避免将失败的任务再次分配给运行失败的TaskTracker)。如果此任务尝试了4次(次数可以进行设置)仍没有完成,就不会再被重试,此时整个作业也就失败了。

作业调度机制

在0.19.0版本之前,Hadoop集群上的用户作业采用先进先出(FIFO)调度算法,即按照作业提交的顺序来运行,同时每个作业在运行时都会使用整个集群,因此只有轮到自己运行才享受整个集群的服务。虽然FIFO调度器最后又支持了设置优先级别的功能,但是由于不支持优先级抢占,所以这种单用户的调度算法仍然不符合云计算中采用并行计算来提供服务的宗旨。从0.19.0版本开始,Hadoop提供了支持多用户同时服务和集群资源公平共享的调度器,即公平调度器(Fair Scheduler Guide)和容量调度器(Capacity Schedule Guide)。   公平调度器 公平调度是为作业分配资源的方式,其目的是随着时间的推移,让提交的作业获取等量的集群共享资源,让用户公平的共享集群。具体做法是:当集群上只有一个作业在运行时,它将使用整个集群;当有其他作业提交时,系统会将TaskTracker节点空闲时间片分配给这些新的作业,并保证每个作业都得到大概等量的CPU时间。 公平调度按作业池来组织作业,它会按照提交作业的用户数目将资源公平的分到这些作业池里。默认情况下,每一个用户拥有一个独立的作业池,以使每个用户都能获得一份等同的集群资源而不会管他们提交了多少作业。在每一个资源池内,会用公平共享的方法在运行作业之间共享容量,除了提供共享方法外,公平调取器还允许为作业池设置最小的共享资源,以确保特定用户、群组或生产应用程序总能获取到足够的资源。对于设置了最小共享资源的作业来说,如果包含了作业,它至少能获取最小的共享资源。但是如果最小共享资源超过作业需要的资源时,额外的资源会在其他作业池间进行切分。 在常规操作中,当提交一个新作业时,公平调度器会等待已运行作业中的任务完成,以释放时间片给新的作业。但公平调度器也支持作业抢占。如果新的作业在一定时间(即超时时间,可以配置)内还未获取公平的资源配置,公平调度器就会允许这个作业抢占已运行作业中的任务,以获取运行所需的资源。另外,如果作业在超时时间内获取的资源不到公平资源的一半时,也允许对任务进行抢占。而在选择时,公平调度器会在所有运行任务中选择最近运行起来的任务,这样浪费的计算相对较少。由于Hadoop作业能容忍丢失任务,抢占不会导致被抢占的作业失败,只会让被抢占作业的运行时间更长。 最后,公平调度器还可以限制每个用户和每个作业池并发运行的作业数量。这个限制可以在用户一次性提交数百个作业或当大量作业并发执行时用来确保中间数据不会塞满真个集群上的磁盘空间。超出限制的作业会被列入调度器的队列中等待,直到早期作业运行完毕。公平调度器再根据作业优先权和提交时间的排列情况从等待作业中调度即将运行的作业。   容量调度器 容量调度器以队列为单位划分资源,每个队列都有资源使用的下限和上限。每个用户也可以设定资源使用上限。一个队列的剩余资源可以共享给另一个队列,其他队列使用后还可以归还。管理员可以约束单个队列、用户或作业的资源使用。支持资源密集型作业,可以给某些作业分配多个slot(这是比较特殊的一点)。支持作业优先级,但不支持资源抢占。 这里明确一下用户、队列和作业之间的关系。Hadoop以队列为单位管理资源,每个队列分配到一定的资源,用户只能向一个或几个队列提交作业。队列管理体现为两方面:1. 用户权限管理:Hadoop用户管理模块建立在武松娱乐用户和用户组之间的映射之上,允许一个武松娱乐用户或者用户组对应一个或者多个队列。同时可以配置每个队列的管理员用户。队列信息配置在mapred-site.xml文件中,包括队列的名称,是否启用权限管理功能等信息,且不支持动态加载。队列权限选项配置在mapred-queue-acls.xml文件中,可以配置某个用户或用户组在某个队列中的某种权限。权限包括作业提交权限和作业管理权限。2. 系统资源管理:管理员可以配置每个队列和每个用户的可用资源量信息,为调度器提供调度依据。这些信息配置在调度器自己的配置文件(如Capacity- Scheduler.xml)中。 具体以上两种调度器该如何配置,这里不做细讲,在本文末尾一些参考链接中有一些相关配置的方法,有兴趣的可以去查阅。

Shuffle和排序

在MapReduce流程中,为了让Reduce可以并行处理Map结果,必须对Map的输出进行一定的排序和分割,然后再交付给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就成为了shuffle,即以下MapReduce流程图红色框中的部分。从shuffle的过程来看,它是MapReduce的核心所在,shuffle过程的性能与整个MapReduce的性能直接相关。 总体来说,shuffle过程包含在Map和Reduce两端中。在Map端的shuffle过程是对Map结果进行划分(partition)、排序(sort)和分割(spill),然后将属于同一个划分的输出合并在一起(Merge)并写在磁盘上,同时按照不同的划分将结果发送给对应的Reduce。Reduce端又会将各个Map送来的属于同一个划分的输出进行合并(Merge),然后对Merge的结果进行排序,最后交给Reduce处理。 \
shuffle过程的优化 在这里简单介绍从Hadoop参数配置触发来优化shuffle过程。在一个任务中,完成单位任务使用时间最多的一般都是I/O操作。在Map端,主要就是shuffle阶段中缓冲区内容超过阀值后的写出操作。可以通过合理的设置ip.sort.*属性来减少这种情况下的写出次数,具体来说就是增加io.sort.mb的值。在Reduce端,在复制Map输出的时候直接将复制的结果放在内存中同样能够提升性能,这样可以让部分数据少做两次I/O操作(前提是留下的内存足够Reduce任务执行)。所以在Reduce函数的内存需求很小的情况下,将mapred.inmen.merge.threshold设置为0,将mapred.job.reduce.input.buffer.percent(默认是0)设置为1.0(或者更低的值)能让I/O操作更少,提升shuffle性能。
顶一下(0) 踩一下(0)
Top_arrow
武松娱乐注册