大数据技术入门(第2版)
上QQ阅读APP看书,第一时间看更新

3.3 资源的调度器

一个公司内部的Hadoop YARN集群,肯定会被多个业务、多个用户同时使用,这些业务和用户共享YARN的资源。如果不进行资源的管理与规划,那么整个YARN的资源很容易被某一个用户提交的应用占满,而其他任务只能等待,这种当然很不合理。我们希望每个业务和每个用户都能分配到资源来运行各自的任务。在理想情况下,应用对YARN资源的请求应该立刻得到满足,但在现实情况中资源往往是有限的,特别是在一个很繁忙的集群,一个应用的资源请求经常需要等待一段时间才能得到相应的资源。在YARN中,负责给应用程序分配资源的就是调度器(Scheduler)。其实调度本身就是一个难题,很难找到一个完美的策略可以解决所有的应用场景。为此,YARN提供了多种调度器和可配置的策略供我们选择。YARN自带了三种常用的调度器,分别是FIFO Scheduler(先进先出调度器)、Capacity Scheduler(容量调度器)和Fair Scheduler(公平调度器)。第一个是默认的调度器,它属于批处理调度器,而后两个属于多租户调度器,它采用树形多队列的形式组织资源,更适合上述应用场景。值得指出的是,YARN的调度器是插拔式的。图3-10显示了我们公司在某一个集群上的调度队列。

图3-10 资源调度队列

FIFO Scheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最前面的应用分配资源,等到最前面的应用需求满足后再给下一个应用分配资源,以此类推。FIFO Scheduler是最简单也是最容易理解的调度器,它不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其他应用被阻塞。在共享集群中,更适合采用Capacity Scheduler或Fair Scheduler,这两个调度器都允许大任务和小任务在提交的同时获得一定的系统资源。

图3-11是YARN调度器的对比图,它展示了这三类调度器的区别。从图中可以看出,在FIFO Scheduler中,小任务会被大任务阻塞。而对于Capacity Scheduler,有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO Scheduler的时间。

在Fair Scheduler中,资源队列不需要预先占用一定的系统资源,Fair Scheduler会为所有运行的作业动态地调整系统资源。如图3-11所示,当第一个大作业提交时,只有这一个作业在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair Scheduler会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。需要注意的是,在Fair Scheduler中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair Scheduler在得到了高的资源利用率的同时又能保证小任务及时完成。

图3-11 三类调度器

如图3-10所示,正在使用的是Fair Scheduler。Fair Scheduler看上去很公平,但是我们在实际工作中碰到了一些Livelock锁的问题,所以,我们正在转往Capacity Scheduler。

3.3.1 Capacity Scheduler

Capacity Scheduler(容量调度器)允许多个组织或部门共享整个集群,每个组织或部门可以获得集群的一部分计算能力。通过为每个组织或部门分配专门的队列,然后再为每个队列分配一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织或部门提供服务了。在一个队列内部,资源的调度往往是采用先进先出(FIFO)的策略,这样一个组织内部的多个成员就可以共享这个队列资源了。

一个作业(Job)可能使用不了一个队列上的资源。如果这个队列中要运行多个作业,而且这个队列的资源够用,那么资源就分配给这些作业,一切正常。如果这个队列的资源不够用了呢?其实Capacity Scheduler仍可能分配额外的资源给这个队列,这就是“弹性队列(Queue Elasticity)”的概念。在正常的操作中,Capacity Scheduler不会强制释放Container,当一个队列资源不够用时,这个队列只能获得其他队列释放后的Container资源。当然,我们可以为队列设置一个最大资源使用量,以免这个队列过多的占用空闲资源,导致其他队列无法使用这些空闲资源,这就是弹性队列需要权衡的地方。下面来看一个例子,假设我们有一个如下层次的队列:

下面是一个Capacity Scheduler的配置文件(capacity-scheduler.xml)。在这个配置文件中,在root队列下面定义了两个子队列prod和dev,分别占40%和60%的容量。需要注意,一个队列的配置是通过属性yarn.sheduler.capacity.<queue-path>.<sub-property>来指定的,<queue-path>代表的是队列的继承树,如root.prod队列,<sub-property>一般指capacity和maximum-capacity(最大容量,即最大资源使用量)。

在上面的配置文件中,dev队列又被分成了eng和science两个相同容量的子队列。dev的maximum-capacity属性被设置成了75%,所以,即使prod队列完全空闲dev也不会占用全部集群资源,也就是说,prod队列仍有25%的可用资源用来应急。eng和science两个队列没有设置maximum-capacity属性,也就是说eng或science队列中的作业可能会用到整个dev队列的所有资源(最多为集群的75%)。而类似的,由于没有为prod设置maximum-capacity属性,它有可能会占用集群全部资源。除了可以设置队列及其容量外,还可以设置允许同时运行多少应用、队列的ACL认证等。

队列的使用取决于具体的应用。例如,在MapReduce中,可以通过mapreduce.job.queuename属性指定要用的队列。如果队列不存在,在提交任务时就会收到错误信息。如果没有定义任何队列,所有的应用将会放在一个默认队列中。注意:对于Capacity Scheduler,队列名是队列树中的最后一部分。

以Hive为例,如果使用MR作为执行引擎,那么指定队列如下:

     beeline !connect jdbc:hive2://your.host:your.port/data_base?mapred.job.queue
.name=your_queue_name

如果使用TEZ作为执行引擎,那么指定队列如下:

     beeline !connect jdbc:hive2://your.host:your.port/data_base?tez.queue.name
=<queue-name>

3.3.2 Fair Scheduler

Fair Scheduler(公平调度器)的设计目标是为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)。图3-12的“Fair Scheduler实例对比图”展示了一个队列中两个应用的Fair Scheduler。当然,Fair Scheduler也可以在多个队列间工作。举个例子,假设有两个用户A和B,他们分别拥有一个队列。当用户A启动一个作业(图中的job1)而B没有任务时,A会获得全部集群资源。当B启动一个作业(图中的job2)后,A的作业job1会继续运行,只是过一会儿之后两个任务会各自获得一半的集群资源。如果此时B再启动第二个作业(图中的job3)并且其他作业还在运行,则它将会和B的第一个作业job2共享B这个队列的资源,也就是说,B的两个作业(job2和job3)各自使用四分之一的集群资源,而A的作业job1仍然使用一半的集群资源,结果就是资源最终在两个用户之间平等的共享。

图3-12 Fair Scheduler实例对比图

要启用Fair Scheduler(公平调度器),可以通过yarn-site.xml配置文件中的yarn.resourcemanager.scheduler.class参数进行设置,而默认是采用Capacity Scheduler(容器调度器)。如果我们要使用Fair Scheduler,需要在这个参数上设置FairScheduler类的全名:

     org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler

Fair Scheduler的配置文件是fair-scheduler.xml,具体位置是由yarn-site.xml中的yarn.scheduler.fair.allocation.file属性指定。若系统找不到这个配置文件,那么Fair Scheduler采用的分配策略和前面介绍的类似:Scheduler会在用户提交第一个应用时为其自动创建一个队列,队列的名字就是用户名,所有的应用都会被分配到相应的用户队列中。

可以在配置文件中设置每一个队列,并且可以像Capacity Scheduler一样分层次设置队列。例如,参考capacity-scheduler.xml来设置fair-scheduler,如下所示:

队列的层次是通过嵌套<queue>元素实现的,即使没有设置到<root>元素里,所有的队列也都是root队列的孩子。在上面的设置中,我们把dev队列分成了eng和science两个队列。Fair Scheduler中的队列有一个权重属性(这个权重就是对公平的定义),并把这个属性作为Fair Scheduler的依据。在这个例子中,当调度器将集群的资源以40:60的比例分配给prod和dev时便视作公平,而eng和science队列没有定义权重,则会被平均分配。这里的权重并不是百分比,所以把上面的40和60分别替换成2和3,效果也是一样的。注意,在没有配置文件时,用户自动创建的队列仍有权重并且权重值为1,而每个队列内部仍可以有不同的调度策略。队列的默认调度策略可以通过顶级元素<defaultQueueSchedulingPolicy>进行配置,如果没有配置,则默认采用Fair Scheduler。

尽管是Fair Scheduler,其仍支持在队列级别采用FIFO Scheduler的调度。每个队列的调度策略可以被其内部的<schedulingPolicy>元素所覆盖,在上面这个例子中,prod队列就被指定采用FIFO Scheduler进行调度,所以,对于提交到prod队列的任务就可以按照FIFO Scheduler调度规则顺序地执行了。需要注意的是,prod和dev之间的调度仍然是Fair Scheduler,同样eng和science也是采用Fair Scheduler。尽管在上面的设置中没有列出来,但是每个队列仍可设置最大、最小资源占用数和最大可运行应用的数量。

Fair Scheduler采用了一套基于规则的系统来确定应用应该放到哪个队列中。在上面的例子中,<queuePlacementPolicy>元素定义了一个规则列表,其中的每个规则会被逐个尝试直到匹配成功。例如,上面例子的第一个规则是specified,则会把应用放到它指定的队列中,若这个应用没有指定队列名或队列名不存在,则说明不匹配这个规则,然后尝试下一个规则。primaryGroup规则会尝试把应用放在以用户所在的Unix组名命名的队列中,如果没有这个队列,就转而尝试下一个规则。当前面所有规则都不满足时,则触发默认规则,把应用放在dev.eng队列中。当然,我们可以不配置queuePlacementPolicy规则,调度器则会默认采用如下规则:

     <queuePlacementPolicy>
       <rule name="specified" />
       <rule name="user" />
     </queuePlacementPolicy>

上面规则可以归结成一句话,除非队列被准确地定义,否则会以用户名为队列名来创建队列。还有一个简单的配置策略就是让所有的应用放入同一个队列中(默认方式),这样就可以让所有应用之间平等共享集群。这个配置的定义如下:

     <queuePlacementPolicy>
       <rule name="default" />
     </queuePlacementPolicy>

对于上面的功能,还可以不使用配置文件,而直接设置yarn.scheduler.fair.user-as-default-queue=false,这样应用便会被放入默认队列中,而不是各个用户队列。另外,我们还可以设置yarn.scheduler.fair.allow-undeclared-pools=false,这样用户就无法创建队列了。

最后来讲一下抢占(Preemption)。当一个新的作业提交到一个繁忙集群中的空队列时,作业并不会马上执行,而是阻塞直到正在运行的作业释放系统资源。为了使提交作业的执行时间更具预测性(可以设置等待的超时时间),Fair Scheduler支持抢占。抢占就是允许调度器“杀掉”占用超过其应占份额资源队列的Container(容器),这些资源便可被分配到应该享有这些份额资源的队列中。需要注意的是,抢占会降低集群的执行效率,因为需要重新执行被终止的任务。通过设置一个全局的参数yarn.scheduler.fair.preemption=true来启用抢占功能。此外,还有两个参数用来控制抢占的过期时间(这两个参数默认没有设置,需要至少设置一个以启用抢占功能):“minimum share preemption timeout”(最小共享抢占超时)和“fair share preemption timeout”(公平共享抢占超时)。如果队列在“minimum share preemption timeout”指定的时间内未获得最小的资源保障,调度器就会抢占Container。我们可以通过配置文件中的顶级元素<defaultMinSharePreemptionTimeout>为所有队列设置这个超时时间,还可以在<queue>元素内设置<minSharePreemptionTimeout>元素来为某个队列指定超时时间。与此类似,如果队列在“fair share preemption timeout”指定时间内未获得平等的资源(例如获得一半的资源,这个比例是可以配置的),调度器则会抢占Container。这个超时时间可以通过顶级元素<defaultFairSharePreemptionTimeout>和元素级元素<fairSharePreemptionTimeout>分别设置所有队列和某个队列的超时时间。上面提到的比例可以通过<defaultFairSharePreemptionThreshold>(设置所有队列)和<fairSharePreemptionThreshold>(设置某个队列)进行设置,默认的比例是0.5。

3.3.3 资源调度实例分析

在YARN中,用户以队列(Queue)的形式组织,每个用户可属于一个或多个队列,且只能向这些队列提交应用程序。每个队列被划分了一定比例的资源。图3-13所示的是一个集群的队列配置,其中为每个客户账号创建一个队列(一个队列下可有多个子队列,以此类推)。

图3-13 资源队列实例

YARN的资源分配过程是异步的,也就是说,资源调度器将资源分配给一个应用程序后,不会立刻推送给对应的Application Master(应用程序主控器),而是暂时放到一个缓冲区中,等待Application Master通过周期性的RPC函数主动来取,也就是说,采用了拉动式(Pull-based)模型,而不是推送式(Push-based)模型。在上面例子中,资源调度是使用Fair Scheduler。在每个队列中可以设置或配置最小和最大的可用资源(内存和CPU)、最大可同时运行应用程序的数量、权重、以及可以提交和管理应用程序的用户等。

我们再举个例子。假设整个YARN集群的可用资源为100 vcore和100GB内存,现在为3个业务部门各自规划一个队列,另外,规划一个默认队列,用于运行其他用户和业务部门提交的任务。如果没有在任务中指定队列(通过参数mapreduce.job.queuename),那么可以设置使用用户名作为队列名来提交任务,即用户businessA提交的任务被分配到businessA队列中,用户businessC提交的任务被分配到businessC队列中。除了设置的固定用户,其他用户提交的任务将会被分配到默认队列中。这里的用户名,就是提交应用程序所使用的Linux/Unix用户名。另外,每个队列可以设置允许提交任务的用户名,例如,在businessA队列中设置了允许用户businessA和用户sam提交任务,如果由用户sam提交任务,并且在任务中指定了队列为businessA,那么也可以正常提交到资源池businessA中。下面来看几个应用场景。

应用场景1:根据权重(Weight)获得额外的空闲资源

在每个队列的配置项(或称为设置项)中,有个Weight属性(默认为1),标记了队列的权重。当队列中有任务等待,并且集群中有空闲资源的时候,每个队列可以根据权重获得不同比例的集群空闲资源。例如,队列businessA和businessB的权重分别为2和1,这两个队列中的资源都已经被占满了,并且还有任务在排队,此时集群中有30个Container的空闲资源,那么businessA将会额外获得20个Container的资源,businessB会额外获得10个Container的资源。

应用场景2:最小资源保证

在每个队列中,允许配置该队列的最小资源,这是为了防止把空闲资源共享出去还未回收的时候,该队列恰有任务需要运行的窘境。例如,队列businessA中配置了最小资源为(5 vcore,5GB),那么即使没有任务运行,YARN也会为businessA预留出最小资源。一旦有任务需要运行,而集群中已经没有其他空闲资源的时候,这个最小资源也可以保证businessA中的任务可以先运行起来,随后再从集群中获取更多资源。

应用场景3:动态更新资源配额

Fair Scheduler除了需要在yarn-site.xml文件中启用和配置之外,还需要一个XML文件来配置资源池以及配额,而该XML中每个资源池的配额可以动态更新,调度器会重新装载这个文件,不用重启YARN集群。例如,下面是我们所使用的yarn-site.xml中的部分配置或设置的实例:

以上各个属性的说明如下。

· yarn.resourcemanager.scheduler.class:设置YARN使用的调度器插件类名。Fair Scheduler对应的是:org.apache.hadoop.yarn.server. resourcemanager.scheduler.fair.FairScheduler。

· yarn.scheduler.fair.allocation.file:用于设置队列的XML文件路径。

· yarn.scheduler.fair.preemption:是否启用资源抢占。

· yarn.scheduler.fair.user-as-default-queue:设置成true,当任务中未指定队列的时候,将以用户名作为队列名。这个设置就实现了根据用户名自动分配队列。

· yarn.scheduler.fair.allow-undeclared-pools:是否允许创建未定义的队列。如果设置成true,YARN将会自动创建任务中指定的未定义过的队列。设置成false之后,任务中指定的未定义的队列将无效,该任务会被分配到默认队列中。

最后看一下图3-12中q13队列所对应的fair-scheduler.xml中的部分配置或设置的实例:

在上面的配置中,q13队列下面有2个子队列,分别是hdp2和high_priority。在这两个子队列下面,各自有5个子队列,分别是veryhigh、high、default、low和verylow。以上各个属性的说明如下:

· minResources:最小资源数量(内存和vcores)。hdp2和high_priority分别设置了q13的最小资源数量的20%和80%。

· maxResources:最大资源数量(内存和vcores)。hdp2和high_priority和q13的最大资源数量是一样的。

· maxRunningApps:最大同时可运行应用程序的数量。根据我们的经验,把它的值设置为最小vcore数量的25%。

· weight:资源池权重。

顾名思义,这5个子队列的主要区别是权重和抢占设置。

如图3-13所示,Fair Scheduler各资源池配置及使用情况,在Resource Manager的Web监控页面上都可以看到。

3.3.4 内存和CPU资源调度

当前YARN支持内存和CPU两种资源类型的管理和分配。当Node Manager启动时,会向Resource Manager注册,而注册信息中会包含该节点可分配的CPU和内存总量,这两个值均可通过配置选项进行设置。

1.内存资源的调度

关于内存的几个重要参数如下:

(1)yarn.nodemanager.resource.memory-mb

表示该节点上YARN可使用的物理内存总量,默认是8192MB,注意,如果节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能地探测节点的物理内存总量。

(2)yarn.nodemanager.vmem-pmem-ratio

每单位的物理内存总量对应的虚拟内存量,默认是2.1,表示每使用1MB的物理内存,最多可以使用2.1MB的虚拟内存总量。

YARN允许我们配置每个节点上可用的物理内存资源,注意,这里说的是“可用的”,因为一个节点上的内存会被若干个服务共享,例如一部分给YARN,一部分给HDFS,一部分给HBase等,YARN配置的只是自己可以使用的内存部分。

(3)yarn.nodemanager.pmem-check-enabled

是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其“杀掉”,默认是true。

(4)yarn.nodemanager.vmem-check-enabled

是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其“杀掉”(即终止给任务的运行),默认是true。

(5)yarn.scheduler.minimum-allocation-mb

单个任务可申请的最少物理内存量,默认是1024MB,如果一个任务申请的物理内存量少于该值,则该对应的值改为这个数。

(6)yarn.scheduler.maximum-allocation-mb

单个任务可申请的最多物理内存量,默认是8192MB。在默认情况下,YARN采用了线程监控的方法判断任务是否超量使用内存,一旦发现超量,则直接将其“杀死”。

2.CPU资源的调度

由于CPU资源的独特性,目前这种CPU分配方式仍然是粗粒度的。举个例子,很多任务可能是I/O密集型的,消耗的CPU资源非常少,如果此时为它分配一个CPU,则是一种严重浪费,完全可以让它与其他几个任务共用一个CPU。目前的CPU被划分成虚拟CPU(CPU Virtual Core),这里的虚拟CPU是YARN自己引入的概念,初衷是,考虑到不同节点的CPU性能可能不同,每个CPU具有的计算能力也是不一样的,例如某个物理CPU的计算能力可能是另外一个物理CPU的2倍,这时候,我们可以通过为第一个物理CPU多配置几个虚拟CPU来弥补这种差异。用户提交作业时,可以指定每个任务需要的虚拟CPU个数。YARN引入了概念vcore(表示虚拟内核的意思)以区别不同性能的物理内核(core),如图3-14所示。

当用户提交应用程序时,可以指定每个任务需要的虚拟CPU个数。例如,在MRAppMaster中,每个Map Task和Reduce Task默认情况下需要的虚拟CPU个数为1,用户可分别通过mapreduce.map.cpu.vcores和mapreduce.reduce.cpu.vcores进行修改(对于内存资源,Map Task和Reduce Task,用户可分别通过mapreduce.map.memory.mb和mapreduce.reduce.memory.mb进行修改)。

在YARN中,CPU相关配置参数如下:

(1)yarn.nodemanager.resource.cpu-vcores

表示该节点上YARN可使用的虚拟CPU个数,默认是8。注意, YARN不会智能地探测节点的物理CPU总数。

图3-14 CPU资源调度

(2) yarn.scheduler.minimum-allocation-vcores

单个任务可申请的最小虚拟CPU个数,默认是1,如果一个任务申请的CPU个数少于该数,则该对应的值改为这个数。

(3)yarn.scheduler.maximum-allocation-vcores

单个任务可申请的最多虚拟CPU个数,默认是32。在默认情况下,YARN是不会对CPU资源进行调度的,我们需要配置相应的资源调度器,具体可参考Fair Scheduler相关参数和Capacity Scheduler相关参数。

在YARN的框架管理中,无论是Application Master从Resource Manager申请资源,还是Node Manager管理自己所在节点的资源,都是通过Container进行的。Container是YARN的资源抽象,此处的资源包括上述的内存和CPU。

从图3-15中可以看到,首先Application Master通过请求包ResourceRequest从Resource Manager申请资源,当获取到资源后,Application Master对其进行封装,封装成ContainerLaunchContext对象,通过这个对象,Application Master与Node Manager进行通信,以便启动该任务。ResourceRequest结构如下:

图3-15 资源申请流程

在提交申请时,我们可以期望从哪台主机上获得,但最终还是Application Master与Resource Manager协商决定。在上面这个例子中,如果没有限制资源的申请量,则应用程序在资源申请上是无限的。YARN采用覆盖式资源申请方式,即Application Master每次发出的资源请求会覆盖掉之前在同一节点且优先级相同的资源请求,也就是说同一节点中具有相同优先级的资源请求只能有一个。

Container结构如下:

Container是YARN的资源抽象,封装了节点上的一些资源(CPU与内存)。Container是Application Master向Resource Manager申请的,其运行是由Application Master向资源所在Node Manager发起的。每个Container一般可以运行一个任务,当Application Master收到多个Container时,将进一步分给某个任务。如MapReduce。ContainerLaunchContext结构如下:

下面结合一段代码,仅以ContainerLaunchContext为例进行描述。这是申请一个新的ContainerLaunchContext:

     ContainerLaunchContext ctx =Records.newRecord(ContainerLaunchContext.class);
     //填写必要的信息
     ctx.setEnvironment(...);
     childRsrc.setResource(...);
     ctx.setLocalResources(...);
     ctx.setCommands(...);
     //启动任务
     startReq.setContainerLaunchContext(ctx);