Hive性能优化之Combine小文件

生产环境中,Flume一般会10分钟归并一个日志文件。每个日志文件都只有10MB~20MB,压缩后页只有2~5MB。Hive面对这样的小文件,该怎么进行性能优化呢?

Job预处理

Hive执行每条SQL前,都会根据SQL查询范围来统计需要扫描的分区,之后再遍历这些分区来收集全部数据文件,这些操作都是基于NameNode进行的,
这个过程中有两点需要注意:

  • 尽可能小的明确指定分区范围,一般都采用日期分区,那么在查询条件中就要尽可能精确的限定日期查询范围。
  • Job预处理结束后,整个MapReduce的输入文件就已经确定了。此时如果删除被Job收集到的文件的话,就会导致MapReduce执行失败,而新增加的文件也不会被Job覆盖到。

Map数量优化

Flume生成的日志小又多,生产环境每个月能新增超过1w个日志文件。如此庞大的日志文件数量,会导致Job中map数量超大而计算速度极慢。

可以通过配置,让Hive采用org.apache.hadoop.hive.ql.io.CombineHiveInputFormat将小文件合并起来,这样一来map数量就大大减少,job耗时大幅提升。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<property>
<name>mapred.max.split.size</name>
<value>128000000</value>
</property>
<property>
<name>mapred.min.split.size.per.node</name>
<value>64000000</value>
</property>
<property>
<name>mapred.min.split.size.per.rack</name>
<value>64000000</value>
</property>
<property>
<name>hive.hadoop.supports.splittable.combineinputformat</name>
<value>true</value>
</property>

hive.hadoop.supports.splittable.combineinputformat配置很重要,因为Hive默认不会合并compression小文件,具体原因见如下Hive源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
// we use a configuration variable for the same
// 如果hive.hadoop.supports.splittable.combineinputformat为false的话,就会进入下面的if
if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
// The following code should be removed, once
// https://issues.apache.org/jira/browse/MAPREDUCE-1597 is fixed.
// Hadoop does not handle non-splittable files correctly for CombineFileInputFormat,
// so don't use CombineFileInputFormat for non-splittable files

//ie, dont't combine if inputformat is a TextInputFormat and has compression turned on

if (inputFormat instanceof TextInputFormat) {
Queue<Path> dirs = new LinkedList<Path>();
FileStatus fStats = inpFs.getFileStatus(path);

// If path is a directory
if (fStats.isDir()) {
dirs.offer(path);
} else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
//if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
// 遇到压缩codec,直接使用HiveInputFormat而不combine
splits = super.getSplits(job, numSplits);
return splits;
}

while (dirs.peek() != null) {
Path tstPath = dirs.remove();
FileStatus[] fStatus = inpFs.listStatus(tstPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
for (int idx = 0; idx < fStatus.length; idx++) {
if (fStatus[idx].isDir()) {
dirs.offer(fStatus[idx].getPath());
} else if ((new CompressionCodecFactory(job)).getCodec(
fStatus[idx].getPath()) != null) {
//if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
// 遇到压缩codec,直接使用HiveInputFormat而不combine
splits = super.getSplits(job, numSplits);
return splits;
}
}
}
}
}

必须配置hive.hadoop.supports.splittable.combineinputformat为true,才会让Hive合并compression小文件。