【Dr.Elephant中文文档-8】调优建议

你可以使用Dr. Elephant来分析你的作业(只需在搜索页贴入你的作业ID),就可以知道你的作业有哪些地方需要优化。

加速你的作业流

一般对于特定的作业,最好有自己的配置。大多数情况下,作业的默认配置无法提供最佳性能。尽管作业调优比较费劲,但一些简单的调整往往也能带来不错的效果。

需要特别注意的是mapperreducer的数量,io和内存使用的配置,以及生成的文件数量。对这几个参数进行调整,让参数更适合当前的任务,可以极大的提升任务的执行性能。

Apache的官网中Hadoop Map/Reduce Tutorial这篇文章提供很多详细且有用的调试建议,有兴趣的可以仔细看看。

常规建议

逐步调优很重要

对于Pig作业来说,如果使用默认参数来设置reducer的数量,这对作业的性能可能是致命的。一般来说,对每个Pig作业,都花一些时间来调优参数PARALLEL是非常值得做的。例如:

1
memberFeaturesGrouped = GROUP memberFeatures BY memberId PARALLEL 90;

文件数vs块数量

为了防止NameNode崩溃,存大文件比小文件更合理。NameNode每存储一个文件大概消耗70 bytes,每存储一个块大概消耗60 byte。一般情况下,对于任务来说,使用一个较大的文件要比使用十个小文件的效率高一些。在大规模集群下,这10 byte的差距会越来越大。此外在许多情况下,1个大文件比10个小文件操作起来更高效。

Java任务内存管理

默认情况下,每个map/reduce作业可以分配最大2G的内存。对于java任务,这2G的空间既包括1G的堆内存,又包括0.5-1G的非堆内存。对于有些任务来说,默认的空间分配可能是不够用的。下面列举了一些能够减少内存使用的技巧:

UseCompressedOops

32位JVM使用32bit无符号整型来定位内存区域,最大和定义的堆内存为(2^32 -1) = 4GB。64位的JVM虚拟机使用64bit的无符号Long整型来定位内存区域,最大可定义的堆内存大小为(2^64 - 1) = 16艾字节。虽然定义的堆内存增加了,但是用Long代替int型,所需内存空间也增加了。大约为原来的1.5倍。这使得你可以突破1G堆空间的限制,对此你可以做些什么呢?现在所有的JVM都支持UseCompressedOops选型,在某些情况下,他使用32bit的空间代替64bit空间来保存内存定位信息。这将可以减少内存的占用而不用回到32bit的情况。你可以在azkaban的作业文件中增加以下选项来实现:

1
hadoop-inject.mapreduce.(map|reduce).java.opts=-Xmx1G -XX:+UseCompressedOops

注意azkaban默认会使用自定义的属性覆盖掉默认配置属性,而不是将自定义的部分添加到mapred-site.xml默认文件中。你需要确认CompressedOops选项和其他默认的配置都是有效的。需要确认的是:”-Xmx1G”是配置文件mapred-site.xml中的,而其他的配置文件是我们自定义的。

UseCompressedStrings

这个选项会将String类型的变量转化为byte[]类型来保存。如果一个任务中使用了大量的String类型变量,那么这个选项将会极大的节约内存使用。在参数mapreduce.(map|reduce).java.opts配置中添加-XX:+UseCompressedString就会激活这个选项。每个作业分配的虚拟内存空间是需要的物理内存空间的2.1倍。如果我们程序抛出以下错误:

1234
Container [pid=PID,containerID=container_ID] is running beyond virtual memory limits. Current usage: 365.1 MB of 1GB physical memory used; 3.2 GB of 2.1 GB virtual memory used. Killingcontainer

你就可以尝试使用这个选项来对程序进行优化。

关键调优参数

Mappers

mapreduce.input.fileinputformat.split.minsize

这个参数表示输入到map中的每个文件块切分的大小的最小值。通过增加dfs.blocksize的块大小,可以增加每个map中输入文件块的大小,从而减少map的数量。这是因为如果说你设置mapreduce.input.fileinputformat.split.minsize的大小为HDFS块大小(dfs.blocksize)的4倍时,那么输入到每个map的数量就是4倍的dfs.blocksize,这样就减少了map的数量。如果把这个值设置为256MB,那么输入的文件大小就是268435456bit。

mapreduce.input.fileinputformat.split.maxsize

这个参数表示当使用CombineFileInputFormatMultiFileInputFormat时,输入到map的每个文件的最大值。当这个值小于dfs.blocksize时,会增加作业的mapper的数量。这是因为如果说你设置mapreduce.input.fileinputformat.split.minsize的大小为HDFS块大小(dfs.blocksize)的1/4时,这样就将输入到每个map的文件大小限制为dfs.blocksize的1/4,就增加了map的数量。这个值是输入文件切分的值。因此要设置为256MB,你将指定值为268435456。需要注意的是,如果在使用CombineFileInputFormat时未设置最大分割大小,则作业将仅使用1个mapper来处理作业。(这可能是你不希望看到的)

Reducers

mapreduce.job.reduces

影响作业流性能的最大杀手之一是reducers的数量。reducers数量过少,可能会使任务时间超过15分钟,而数量过多也同样会有问题。针对每个特定的任务因地制宜的调整reducer数量是一项艺术。下面列了一些方法来帮助我们来设置合适的reducer数量:

reducers越多意味着Namenode上文件越多,过多的小文件可能会导致Namenode宕机。因此如果reduce输出不大(小于512M),可以减少reducers的数量reducers越多意味着每个reducer处理数据的时间越短,如果reducers数量过少。那么每个reducer的消耗时间就会增加,reducer运行越快,就能处理更多的作业。

在大型任务中,清洗(Shuffling)操作的代价是比较高的。我们通过HDFS文件系统的各个计数器可以看到有大量的数据需要在不同的节点间进行交换。我们用20个reducers的作业来做个试验,文件系统的计数器如下:
FileSystemCounter:

FILE_BYTES_READ | 2950482442768HDFS_BYTES_READ | 1223524334581FILE_BYTES_WRITTEN | 5967256875163

我们可以看到有超过1K个map产生了约5TB的中间数据。再看下清洗时间:
Shuffle Finished:
17-Aug-2010 | 13:32:05 | ( 1hrs, 29mins, 56 sec)

Sort Finished:
17-Aug-2010 | 14:18:35 | (46mins, 29sec)

可以看出,大约有5TB的数据花费了1个半小时来清洗,然后有花了46分钟来进行排序。这个时间成本是巨大的。我们希望任务可以在5-15分钟内完成。我们现在已经解决了这个问题。让我们来算一下:使用20个reducers需要消耗360分钟,200个reducers则需要36分钟,400个reducers则需要18分钟。因此围绕这个逻辑来进行改进,将reducers控制在500个以下则出现以下结果:
Shuffle Finished:
17-Aug-2010 | 16:32:32 | ( 12 mins, 46 sec)

Sort Finished:
17-Aug-2010 | 16:32:37 | (4sec)
效果看着还不错,通过一些调优,我们可以缩短一些任务时间。正如你猜的那样,反过来也一样。如果清洗时间很短,CPU使用也很少,那么说明reducer数量过少,合适的配置需要通过不断的试验来确定。

mapreduce.job.reduce.slowstart.completedmaps

这个参数决定了在reducer开始执行之前,至少有多少比例的mapper必须执行结束。默认值是80%。对于大多数任务,调整这个数字的大小可能会带来性能提升。决定这个数字的因素是:

每个reducer接收到多少数据剩下的map每一个map作业需要花费的时间
如果map的输出数据量比较大,一般会建议让reducer提前开始执行去处理这些数据。如果map任务产生的数量不是很大,一般建议让reducer的执行时间开始的晚一些。对于清洗过程的一个粗略的时间估计是:当所有的mapper结束后开始,到第一个reducer开始执行的时间为止。这是reducer获得所有的输入所需要的时间。一般认为,reducers开始执行时间是:最后一个map结束时间+清洗时间。

压缩

mapreduce.map.output.compress

将这个参数设置为True可以将map输出的数据进行压缩。这可以减少节点间的数据交互,但我们必须确保解压缩的时间要比传输时间短,否则会起反作用。对于大型或可以高度压缩的map输出数据,这个参数选型非常有必要,将大量减少清洗时间。对于小型map输出数据集,关闭这个参数,将降低解压缩带来的CPU消耗时间。注意与mapreduce.output.fileoutputformat.compress选项不同,那个参数决定了任务的输出回写到HDFS时是否需要压缩。

内存

mapreduce.(map|reduce).memory.mb

新版Hadoop中增加了堆内存的限制特性。这使得系统在繁忙情况下更好的管理资源分配。默认情况下,系统会分配给Java任务1GB的堆内存,以及0.5-1GB的非堆内存空间。因此,mapreduce.(map|reduce).memory.mb的默认值为2GB。在某些情况下,这个内存值是不够用的。如果只是设置Java的参数-Xmx,任务会被杀死,需要同时设置参数mapreduce.(map|reduce).momory.mb才能有效的提升或者限制内存使用。

进阶

控制io.sort.record.percent的值

参数io.sort.record.percent决定使用多少的间接缓存空间来保存每条和每条记录的元信息。一般来说,这个参数的设置是不合理的。

假设在map作业中使用日志的xml的配置文件:

propertyvaluebufstart45633950bufend68450908kvstart503315kvend838860length838860io.sort.mb256io.sort.record.percent0.05

使用上面的值:
| property | value |
| — | — |
| io.sort.spill.percent (length-kvstart+kvend) / length) | 0.8 |
| Size of meta data is (bufend-bufstat) | 22816958 (MB) |
| Records in memory (length-(kvstart+kvend)) | 671087 |
| Average record size (size/records) | 34 Bytes |
| Record+Metadata | 50 Bytes |
| Records per io.sort.mb (io.sort.mb/(record+metadata)) | 5.12 million |
| Metadata % in io.sort.mb ((records per io.sort.mb)*metadata/io.sort.mb) | 0.32 |

我们可以在256MB的缓存中保存很多数据。但是io.sort.record.percent应该设置为0.32,而不是0.5。当设置为0.5时,记录的元信息的缓存会比记录的缓存更大。

调整这个参数会使map运行的更快,磁盘溢出问题更少,因为io.sort.mb的效率更高了;不会再很快就使用完元数据缓冲区的80%空间。

调整io.sort.record.percent后会使得许多map数据不再溢出到磁盘,减少了55%的磁盘溢。最终,系统节省了30%的CPU资源消耗,和30分钟运行时间。

mapreduce.(map|reduce).speculative

这个参数决定了是否允许相同的map/reduce并行执行。你大概知道当发生数据倾斜时,有些mapperreducer会运行很长时间。在这种情况下,你可能希望通过一些预判来防止数据倾斜。

Pig

Pig中你可以通过增加以下命令来设置HadoopPig

1
SET <property_name> <property_value>;

例如,如果你的map内存不足,可以通过以下命令增加内存

1
SET mapreduce.map.memory.mb 4096;

Azkaban中,可以通过以下命令实现

12
jvm.args=-Dmapreduce.map.memory.mb=4096to your job properties.
pig.maxCombinedSplitSize / 增加或减少mapper数量

默认情况下,Pig合并了小文件(pig.splitCombination默认为true),直到需要切分的HDFS块大小超过512MB。要进一步增加这个值,需要调高pig.maxCombinedSplitSize。相关详情可以看这里。你可以在你的Pig脚本中添加以下命令

1
set pig.maxCombinedSplitSize <size-in-bytes>;

在你的Pig脚本的开头。如果您通过Azkaban执行此Pig脚本,您也可以通过添加以下命令来设置他

1
jvm.args=-Dpig.maxCombinedSplitSize=<size-in-bytes>

在你的作业属性中。如果你的mapper耗时太长,想增加mapper数量,你必须同时设置

12
set pig.maxCombinedSplitSize <size-in-bytes>;set mapreduce.input.fileinputformat.split.maxsize <size-in-bytes>;

如果值小于512MB。是因为Pig拆分块的值超过了pig.maxCombinedSplitSize,拆分大小由以下配置决定

1
max(mapreduce.input.fileinputformat.split.minsize, min(mapreduce.input.fileinputformat.split.maxsize, dfs.blocksize))

提供一些集群设置:

1234
mapreduce.input.fileinputformat.split.minsize=0mapreduce.input.fileinputformat.split.maxsize is unsetdfs.blocksize=536870912 // 512 MBwill evaluate to 512 MB.
Reducers数量

Pig中,你可以基于每个作业控制Reducer的数量,还可以选择为整个脚本设置默认的reducers数量。浏览此处获取更多信息。

Hive

mapreduce.input.fileinputformat.split.minsizemapreduce.input.fileinputformat.split.maxsizemapreduce.input.fileinputformat.split.minsize.per.nodemapreduce.input.fileinputforomat.split.minsize.per.rack

对应Hive来说,你可能需要设置以上4个参数来调整切分大小。
例如:

12345
-- The following are the default configurations on our Hadoop clusters.set mapreduce.input.fileinputformat.split.maxsize                 = 2147483648;set mapreduce.input.fileinputformat.split.minsize                 = 1073741824;set mapreduce.input.fileinputformat.split.minsize.per.node        = 1073741824;set mapreduce.input.fileinputformat.split.minsize.per.rack        = 1073741824;

如果你想增加mapper的数量,就减少这些配置的值;反之,则增加这些配置的值。

文章来源:

Author:hyperxu
link:http://www.hyperxu.com/2019/08/27/dr-elephant-8/