Hive总结及优化
Hive是一个构建在Hadoop之上的数据仓库软件,它可以使已经存储的数据结构化,它提供类似sql的查询语句HiveQL对数据进行分析处理。 Hive将HiveQL语句转换成一系列成MapReduce作业并执行(SQL转化为MapReduce的过程)。用户可以很方便的使用命令行和JDBC程序的方式来连接到hive。 目前,Hive除了支持MapReduce计算引擎,还支持Spark和Tez这两中分布式计算引擎。常用于离线批处理。
一、产生背景
大数据的时代,海量的数据对于传统的关系型数据库来说维护起来成本非常高,那该如何是好,Hadoop分布式的框架,可以使用廉价的机器部署分布式系统把数据存储再HDFS之上,通过MR进行计算,分析,这样是可以的,但是,MR大家应该知道,MapReduce编程带来的不便性,编程十分繁琐,在大多情况下,每个MapReduce程序需要包含Mapper、Reducer和一个Driver,之后需要打成jar包扔到集群上运行。如果mr写完之后,且该项目已经上线,一旦业务逻辑发生了改变,可能就会带来大规模的改动代码,然后重新打包,发布,非常麻烦(这种方式,也是最古老的方式),当大量数据都存放在HDFS上,如何快速的对HDFS上的文件进行统计分析操作?
一般来说,想要做会有两种方式:
A、学Java、学MapReduce(十分麻烦)
B、做DBA的:写SQL(希望能通过写SQL这样的方式来实现,这种方式较好)
然而,HDFS中最关键的一点就是,数据存储HDFS上是没有schema的概念的(schema:相当于表里面有列、字段、字段名称、字段与字段之间的分隔符等,这些就是schema信息),然而HDFS上的仅仅只是一个纯的文本文件而已,那么,没有schema,就没办法使用sql进行查询了啊。。。因此,在这种背景下,就有问题产生:如何为HDFS上的文件添加Schema信息?如果加上去,是否就可以通过SQL的方式进行处理了呢?于是强大的Hive出现了。
二、Hive深入剖析
再来看看官网给我们的介绍:官方第一句话就说明了Apache Hive 是构建在Apache Hadoop之上的数据仓库,有助于对大型的数据集进行读、写和管理。
那我们先对这句话进行剖析:
首先Hive是构建在Hadoop之上的,其实就是Hive中的数据其实是存储再HDFS上的(加上LOCAL关键字则是在本地),默认在/user/hive/warehouse/table
,有助于对大型数据集进行读、写和管理,那也就是意味着传统的关系型数据库已经无法满足现在的数据量了,需要一个更大的仓库来帮助我们存储,这里也引出一个问题:Hive和关系型数据库的区别,后面我们再来聊。
1、Hive的特征:
A、可通过SQL轻松访问数据的工具,从而实现数据仓库任务,如提取/转换/加载(ETL),报告和数据分析。
B、它可以使已经存储的数据结构化
C、可以直接访问存储在Apache HDFS™或其他数据存储系统(如Apache HBase™)中的文件
D、Hive除了支持MapReduce计算引擎,还支持Spark和Tez这两中分布式计算引擎(这里会引申出一个问题,哪些查询跑mr哪些不跑?)
E、它提供类似sql的查询语句HiveQL对数据进行分析处理。
F、数据的存储格式有多种,比如数据源是二进制格式, 普通文本格式等等
而Hive强大之处不要求数据转换成特定的格式,而是利用hadoop本身InputFormat API来从不同的数据源读取数据,同样地使用OutputFormat API将数据写成不同的格式。所以对于不同的数据源,或者写出不同的格式就需要不同的对应的InputFormat和Outputformat类的实现。
以stored as textfile为例,其在底层java API中表现是输入InputFormat格式:TextInputFormat以及输出OutputFormat格式:HiveIgnoreKeyTextOutputFormat。这里InputFormat中定义了如何对数据源文本进行读取划分,以及如何将切片分割成记录存入表中。而Outputformat定义了如何将这些切片写回到文件里或者直接在控制台输出。
不仅如此Hive的SQL还可以通过用户定义的函数(UDF),用户定义的聚合(UDAF)和用户定义的表函数(UDTF)进行扩展。几个函数之间的区别???Hive中不仅可以使用逗号和制表符分隔值(CSV / TSV)文本文件,还可以使用Sequence File、RC、ORC、Parquet (知道这几种存储格式的区别)。
A、在压缩存储时间上,除Sequencefile外基本都相差无几。
B、数据压缩比例上ORC最优,相比textfile节省了50倍磁盘空间,parquet压缩性能也较好。
C、SQL查询速度而言,ORC与parquet性能较好,远超其余存储格式。
综合上述各种性能指标,建议工作中原始日志写入hive的存储格式都采用ORC或者parquet格式,这和目前主流的做法一致。
当然Hive还可以通过用户来自定义自己的存储格式,基本上前面说的到的几种格式完全够了。Hive旨在最大限度地提高可伸缩性(通过向Hadoop集群动态添加更多机器扩展),性能,可扩展性,容错性以及与其输入格式的松散耦合。
2、Hive基本语法
可以参考之前的博客总结。HiveSQL语法<基本原理、操作函数>
Hive的存储结构如下:
A、Database:Hive中包含了多个数据库,默认的数据库为default,对应于HDFS目录是/user/hadoop/hive/warehouse
,可以通过hive.metastore.warehouse.dir
参数进行配置(hive-site.xml中配置)
B、Table: Hive 中的表又分为内部表和外部表 ,Hive 中的每张表对应于HDFS上的一个目录,HDFS目录为:/user/hadoop/hive/warehouse/[databasename.db]/table
。
C、Partition:分区,每张表中可以加入一个分区或者多个,方便查询,提高效率;并且HDFS上会有对应的分区目录:/user/hadoop/hive/warehouse/[databasename.db]/table
D、Bucket:….待续(未完成)
UDF函数这里要进行一个讲解UDF、DUAF、UDTF分别是啥?
我们知道Hive的SQL还可以通过用户定义的函数(UDF),用户定义的聚合(UDAF)和用户定义的表函数(UDTF)进行扩展。当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。
A、UDF(User-Defined-Function) 一进一出
B、UDAF(User- Defined Aggregation Funcation) 聚集函数,多进一出。
C、UDTF(User-Defined Table-Generating Functions) 一进多出,如:lateral view explore()
3、Hive与关系型数据库的区别
A、时效性、延时性比较高,可扩展性高;
B、Hive数据规模大,优势在于处理大数据集,对于小数据集没有优势
C、事务没什么用(比较鸡肋,没什么实际的意义,对于离线的来说) 一个小问题:那个版本开始提供了事务?
D、insert/update没什么实际用途,大数据场景下大多数是select
E、RDBMS也支持分布式,节点有限 成本高,处理的数据量小
F、Hadoop集群规模更大 部署在廉价机器上,处理的数据量大
G、数据库可以用在Online的应用中,Hive主要进行离线的大数据分析;
H、数据库的查询语句为SQL,Hive的查询语句为HQL;
I、数据库数据存储在LocalFS,Hive的数据存储在HDFS;
J、数据格式:Hive中有多种存储格式,由于在加载数据的过程中,不需要从用户数据格式到 Hive 定义的数据格式的转换,因此,Hive 在加载的过程中不会对数据本身进行任何修改,而只是将数据内容复制或者移动到相应的 HDFS 目录中。而在数据库中,不同的数据库有不同的存储引擎,定义了自己的数据格式。所有数据都会按照一定的组织存储,因此,数据库加载数据的过程会比较耗时。
K、Hive执行MapReduce,MySQL执行Executor;
4、Hive的优点
A、简单易上手
B、扩展能力较好(指集群 HDFS或是YARN)
C、统一的元数据管理 metastore包括的了数据库,表,字段分区等详细信息
D、由于统一的元数据管理所以和spark/impala等SQL引擎是通用的,通用是指,在拥有了统一的metastore之后,在Hive中创建一张表,在Spark/impala中是能用的,反之在Spark中创建一张表,在Hive中也能用;只需要共用元数据,就可以切换SQL引擎。涉及到了Spark sql 和Hive On Spark(实验版本)
E、使用SQL语法,提供快速开发的能力,支持自定义函数UDF。
F、避免了去写mapreduce,减少开发人员学习成本。
G、数据离线处理,比如日志分析,海量数据结构化分析
5、SQL转化为MapReduce的过程
了解了MapReduce实现SQL基本操作之后,我们来看看Hive是如何将SQL转化为MapReduce任务的,整个编译过程分为六个阶段:
A、Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象语法树AST Tree
B、遍历AST Tree,抽象出查询的基本组成单元QueryBlock
C、遍历QueryBlock,翻译为执行操作树OperatorTree
D、逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量
E、遍历OperatorTree,翻译为MapReduce任务
F、物理层优化器进行MapReduce任务的变换,生成最终的执行计划
美团技术:HiveSQL编译过程
6、Hive内部表和外部表的区别
未被external修饰的是内部表(managed table),被external修饰的为外部表(external table);
区别:
A、内部表数据由Hive自身管理,外部表数据由HDFS管理;
B、内部表数据存储的位置是hive.metastore.warehouse.dir(默认:/user/hive/warehouse
),外部表数据的存储位置由自己制定;
C、删除内部表会直接删除元数据(metadata)及存储数据;删除外部表仅仅会删除元数据,HDFS上的文件并不会被删除;
7、行式存储vs列式存储
行式数据库存储在hdfs上式按行进行存储的,一个block存储一或多行数据。而列式数据库在hdfs上则是按照列进行存储,一个block可能有一列或多列数据。
如果要将数据进行压缩:
A、对于行式数据库,必然按行压缩,当一行中有多个字段,各个字段对应的数据类型可能不一致,压缩性能压缩比就比较差。
B、对于列式数据库,必然按列压缩,每一列对应的是相同数据类型的数据,故列式数据库的压缩性能要强于行式数据库。
如果要进行数据的查询:
假设执行的查询操作是:select id,name from table_emp;
A、对于行式数据库,它要遍历一整张表将每一行中的id,name字段拼接再展现出来,这样需要查询的数据量就比较大,效率低。
B、对于列式数据库,它只需找到对应的id,name字段的列展现出来即可,需要查询的数据量小,效率高。
假设执行的查询操作是:select * from table_emp;
对于这种查询整个表全部信息的操作,由于列式数据库需要将分散的行进行重新组合,行式数据库效率就高于列式数据库。
但是,在大数据领域,进行全表查询的场景少之又少,进而我们使用较多的还是列式数据库及列式储存。
8、Hive哪些查询会执行MR
Hive 0.10.0为了执行效率考虑,简单的查询,就是只是select,不带count,sum,group by这样的,都不走map/reduce,直接读取hdfs文件进行filter过滤。这样做的好处就是不新开mr任务,执行效率要提高不少,但是不好的地方就是用户界面不友好,有时候数据量大还是要等很长时间,但是又没有任何返回。
改这个很简单,在hive-site.xml里面有个配置参数叫hive.fetch.task.conversion
,将这个参数设置为more,简单查询就不走map/reduce了,设置为minimal,就任何简单select都会走map/reduce
1 | Create Table As Select (CTAS) 走mr |
1 | insert 一条或者多条 走mr |
9、Hive静态分区,动态分区
A、分区的概念
Hive的分区方式:由于Hive实际是存储在HDFS上的抽象,Hive的一个分区名对应HDFS上的一个目录名,子分区名就是子目录名,并不是一个实际字段。
B、分区的好处
产生背景:如果一个表中数据很多,我们查询时就很慢,耗费大量时间,如果要查询其中部分数据该怎么办呢,这是我们引入分区的概念。
Partition:分区,每张表中可以加入一个分区或者多个,方便查询,提高效率;并且HDFS上会有对应的分区目录:
C、语法
Hive分区是在创建表的时候用Partitioned by 关键字定义的,但要注意,Partitioned by子句中定义的列是表中正式的列,但是Hive下的数据文件中并不包含这些列,因为它们是目录名,真正的数据在分区目录下。
D、静态分区 和 动态分区的区别
创建表的语法都一样
静态分区:加载数据的时候要指定分区的值(key=value),比较麻烦的是每次插入数据都要指定分区的值,创建多个分区多分区一样,以逗号分隔。
动态分区:如果用上述的静态分区,插入的时候必须首先要知道有什么分区类型,而且每个分区写一个load data,太烦人。使用动态分区可解决以上问题,其可以根据查询得到的数据动态分配到分区里。其实动态分区与静态分区区别就是不指定分区目录,由系统自己选择。
1 | 首先,启动动态分区功能 |
加载数据方式并没有指定具体的分区,只是指出了分区字段。在select最后一个字段必须跟你的分区字段,这样就会自行根据deptno的value来分区。
10、删除分区
1 | ALTER TABLE my_partition_test_table DROP IF EXISTS PARTITION (day='2019-07-15'); |
11、Hive优化
A、我们知道大数据场景下不害怕数据量大,害怕的是数据倾斜,怎样避免数据倾斜,找到可能产生数据倾斜的函数尤为关键,数据量较大的情况下,慎用count(distinct),count(distinct)容易产生倾斜问题。
B、设置合理的map reduce 的task数量
map阶段优化
mapred.min.split.size: 指的是数据的最小分割单元大小;min的默认值是1B
mapred.max.split.size: 指的是数据的最大分割单元大小;max的默认值是256MB
通过调整max可以起到调整map数的作用,减小max可以增加map数,增大max可以减少map数。
需要提醒的是,直接调整mapred.map.tasks这个参数是没有效果的。
举例:
a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数
b) 假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数
即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块。
其实这就涉及到小文件的问题:如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。那么问题又来了。。是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,
如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
我们该如何去解决呢???
我们需要采取两种方式来解决:即减少map数和增加map数;
1)减少map数量
1 | 假设一个SQL任务: |
2)增大map数量
如何适当的增加map数?
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
1 | 假设有这样一个任务: |
reduce阶段优化
Reduce的个数对整个作业的运行性能有很大影响。如果Reduce设置的过大,那么将会产生很多小文件,
对NameNode会产生一定的影响,
而且整个作业的运行时间未必会减少;如果Reduce设置的过小,那么单个Reduce处理的数据将会加大,
很可能会引起OOM异常。
如果设置了mapred.reduce.tasks/mapreduce.job.reduces参数,那么Hive会直接使用它的值作为Reduce的个数;
如果mapred.reduce.tasks/mapreduce.job.reduces的值没有设置(也就是-1),那么Hive会
根据输入文件的大小估算出Reduce的个数。
根据输入文件估算Reduce的个数可能未必很准确,因为Reduce的输入是Map的输出,而Map的输出可能会比输入要小,
所以最准确的数根据Map的输出估算Reduce的个数。
1)Hive自己如何确定reduce数:
reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:
hive.exec.reducers.bytes.per.reducer
(每个reduce任务处理的数据量,默认为1000^3=1G)
hive.exec.reducers.max
(每个任务最大的reduce数,默认为999)
计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1)。即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
1 | select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; |
2)调整reduce个数方法一
1 | 调整hive.exec.reducers.bytes.per.reducer参数的值; |
3)调整reduce个数方法二
1 | set mapred.reduce.tasks = 15; |
4)reduce个数并不是越多越好
同map一样,启动和初始化reduce也会消耗时间和资源;另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
5)什么情况下只有一个reduce
很多时候你会发现任务中不管数据量多大,不管你有没有设置调整reduce个数的参数,任务中一直都只有一个reduce任务;其实只有一个reduce任务的情况,除了数据量小于hive.exec.reducers.bytes.per.reducer
参数值的情况外,还有以下原因:
- 没有group by的汇总,比如把
select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’ group by pt;
写成select count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’;
这点非常常见,希望大家尽量改写。 - 用了Order by
- 有笛卡尔积,通常这些情况下,除了找办法来变通和避免,我暂时没有什么好的办法,因为这些操作都是全局的,所以hadoop不得不用一个reduce去完成;同样的,在设置reduce个数的时候也需要考虑这两个原则:使大数据量利用合适的reduce数;使单个reduce任务处理合适的数据量;
6)合并小文件
我们知道文件数目小,容易在文件存储端造成瓶颈,给 HDFS 带来压力,影响处理效率。对此,可以通过合并Map和Reduce的结果文件来消除这样的影响。
用于设置合并属性的参数有:
是否合并Map输出文件:hive.merge.mapfiles=true(默认值为真)
是否合并Reduce 端输出文件:hive.merge.mapredfiles=false(默认值为假)
合并文件的大小:hive.merge.size.per.task=25610001000(默认值为 256000000)
12、Hive优化之小文件问题及其解决方案
小文件是如何产生的
动态分区插入数据,产生大量的小文件,从而导致map数量剧增。
reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的)。
数据源本身就包含大量的小文件。
小文件问题的影响
- 从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。
- 在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存。这样NameNode内存容量严重制约了集群的扩展。
小文件问题的解决方案
A、从小文件产生的途经就可以从源头上控制小文件数量,方法如下:
- 使用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件。
- 减少reduce的数量(可以使用参数进行控制)。
- 少用动态分区,用时记得按distribute by分区。
B、对于已有的小文件,我们可以通过以下几种方案解决
- 使用hadoop archive命令把小文件进行归档。
- 重建表,建表时减少reduce数量。
- 通过参数进行调节,设置map/reduce端的相关参数,如下:
C、设置map输入合并小文件的相关参数:
1 | [java] view plain copy |
D、Write good SQL
E、存储格式
可以使用列裁剪,分区裁剪,orc,parquet等存储格式。参考Hive中的存储格式
Hive支持ORCfile,这是一种新的表格存储格式,通过诸如谓词下推,压缩等技术来提高执行速度提升。对于每个HIVE表使用ORCFile应该是一件容易的事情,并且对于获得HIVE查询的快速响应时间非常有益。
作为一个例子,考虑两个大表A和B(作为文本文件存储,其中一些列未在此处指定,即行试存储的缺点)以及一个简单的查询,如:
1 | SELECT A.customerID, A.name, A.age, A.address join |
此查询可能需要很长时间才能执行,因为表A和B都以TEXT形式存储,进行全表扫描。将这些表格转换为ORCFile格式通常会显着减少查询时间:ORC支持压缩存储(使用ZLIB或如上所示使用SNAPPY),但也支持未压缩的存储。
1 | CREATE TABLE A_ORC ( |
F、压缩格式
大数据场景下存储格式压缩格式尤为关键,可以提升计算速度,减少存储空间,降低网络io,磁盘io,所以要选择合适的压缩格式和存储格式,参考Hadoop中的压缩与解压详解
G、MAP JOIN
mapjoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。上图是Hive MapJoin的原理图,出自Facebook工程师Liyin Tang的一篇介绍Join优化的slice,从图中可以看出MapJoin分为两个阶段:
1)通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。
2)MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
也就是在map端进行join避免了shuffle。
H、引擎的选择
Hive可以使用ApacheTez执行引擎而不是古老的Map-Reduce引擎。
我不会详细讨论在这里提到的使用Tez的许多好处; 相反,我想提出一个简单的建议:
如果它没有在您的环境中默认打开,请在您的Hive查询的开头将以下内容设置为’true’来使用Tez:
设置hive.execution.engine = tez;
通过上述设置,您执行的每个HIVE查询都将利用Tez。
I、Use Vectorization
向量化查询执行通过一次性批量执行1024行而不是每次单行执行,从而提高扫描,聚合,筛选器和连接等操作的性能。
在Hive 0.13中引入,此功能显着提高了查询执行时间,并可通过两个参数设置轻松启用:
设置hive.vectorized.execution.enabled = true;
设置hive.vectorized.execution.reduce.enabled = true;
J、cost based query optimization
Hive 自0.14.0开始,加入了一项”Cost based Optimizer”来对HQL执行计划进行优化,这个功能通 过”hive.cbo.enable”来开启。在Hive 1.1.0之后,这个feature是默认开启的,它可以自动优化HQL中多个JOIN的顺序,并选择合适的JOIN算法。
Hive在提交最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成。
根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。
要使用基于成本的优化(也称为CBO),请在查询开始处设置以下参数:
设置hive.cbo.enable = true;
设置hive.compute.query.using.stats = true;
设置hive.stats.fetch.column.stats = true;
设置hive.stats.fetch.partition.stats = true;
K、模式选择
1)本地模式
对于大多数情况,Hive可以通过本地模式在单台机器上处理所有任务。
对于小数据,执行时间可以明显被缩短。通过set hive.exec.mode.local.auto=true
(默认为false)设置本地模式。
1 | hive> set hive.exec.mode.local.auto; |
2)并行模式
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。
默认情况下,Hive一次只会执行一个阶段,由于job包含多个阶段,而这些阶段并非完全互相依赖,即:这些阶段可以并行执行,可以缩短整个job的执行时间。设置参数:对于小数据,执行时间可以明显被缩短。通过set hive.exec.parallel=true
,或者通过配置文件来完成。
1 | hive> set hive.exec.parallel; |
3)严格模式
Hive提供一个严格模式,可以防止用户执行那些可能产生意想不到的影响查询:
1 | 通过设置Hive.mapred.modestrict来完成 |
L、JVM重用
Hadoop通常是使用派生JVM来执行map和reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含偶成百上千的task任务的情况。JVM重用可以使得JVM示例在同一个job中时候使用N此。通过参数mapred.job.reuse.jvm.num.tasks
来设置。
M、推测执行
Hadoop推测执行可以触发执行一些重复的任务,尽管因对重复的数据进行计算而导致消耗更多的计算资源,
不过这个功能的目标是通过加快获取单个task的结果以侦测执行慢的TaskTracker加入到没名单的方式来提高整体的任务执行效率。
Hadoop的推测执行功能由2个配置控制着,通过mapred-site.xml中配置
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true