当前位置:首页 > 行业动态 > 正文

MapReduce如何合并小文件?探索高效合并策略!

mapreduce合并文件可以通过使用hadoop的getmerge命令或者编写自定义的 mapreduce程序来实现。

在大数据处理中,MapReduce 是一个强大的工具,但当面对大量小文件时,其效率会受到显著影响,为了提高处理效率和减少资源消耗,合并小文件是一个常见且必要的步骤。

MapReduce如何合并小文件?探索高效合并策略!  第1张

一、小文件合并的必要性与方法

1、必要性:HDFS 作为 Hadoop 的分布式文件系统,默认将每个文件存储为一个独立的数据块(默认大小为 128MB),这意味着,如果有大量的小文件(例如每个文件小于 128MB),每个文件都会占用一个数据块,导致 NameNode 的元数据压力增大,从而影响整个集群的性能,大量的小文件会导致 MapReduce 作业生成过多的 map 任务,增加任务调度和管理的开销,合并小文件是优化 HDFS 性能和 MapReduce 作业效率的重要手段。

2、方法

数据采集阶段合并:在数据采集的时候,客户端就将小文件或小批数据合成大文件再上传 HDFS,这种方法可以从根本上减少小文件的数量,提高后续数据处理的效率。

HDFS 上使用 MapReduce 程序合并:在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并,这种方法适用于已经存在于 HDFS 上的小文件,可以通过自定义 InputFormat 和 RecordReader 来实现。

MapReduce 处理时合并:在 MapReduce 处理时,可采 用 CombineTextInputFormat 提高效率,这种方法通过在 map 端进行数据聚合,减少 reduce 阶段的输入数据量,从而提高整体处理效率。

二、自定义 InputFormat 合并小文件

自定义 InputFormat 是合并小文件的一种有效方法,通过实现 FileInputFormat 和 RecordReader,可以实现对小文件的完整读取和封装,然后在输出时使用 SequenceFileOutputFormat 将多个小文件合并成一个大文件。

1、自定义 WholeFileInputFormat

isSplitable 方法:重写 isSplitable 方法,设置小文件不进行切片,这样可以确保每个小文件作为一个整体被读取。

createRecordReader 方法:重写 createRecordReader 方法,返回自定义的 WholeFileRecordReader。

2、自定义 WholeFileRecordReader

initialize 方法:初始化方法,设置文件切片和配置信息。

nextKeyValue 方法:核心逻辑,读取文件内容并封装到 BytesWritable 中。

getCurrentKey 方法:返回当前键,通常为空或文件名。

getCurrentValue 方法:返回当前值,即文件内容。

getProgress 方法:返回进度,通常为 0 或 1。

close 方法:关闭文件流。

三、代码实现

以下是一个简单的示例代码,展示了如何实现自定义的 WholeFileInputFormat 和 WholeFileRecordReader:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(inputSplit, taskAttemptContext);
        return reader;
    }
}
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{
    private FileSplit fileSplit;
    private Configuration configuration;
    private BytesWritable bytesWritable = new BytesWritable();
    private boolean nextKeyValue = false;
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit)split;
        this.configuration = context.getConfiguration();
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!nextKeyValue) {
            byte[] contents = new byte[(int)fileSplit.getLength()];
            FileSystem fs = fileSplit.getPath().getFileSystem(configuration);
            FSDataInputStream fis = fs.open(fileSplit.getPath());
            IOUtils.readFully(fis, contents, 0, contents.length);
            bytesWritable.set(contents, 0, contents.length);
            nextKeyValue = true;
            return true;
        }
        return false;
    }
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return nextKeyValue ? 1 : 0;
    }
    @Override
    public void close() throws IOException {
    }
}

四、Hive 小文件合并策略

Hive 在处理大量小文件时,也会面临与 MapReduce 类似的问题,Hive 提供了一些参数来控制小文件的合并策略,以提高查询性能和减少 NameNode 的压力。

1、hive.mergejob.maponly:默认为 true,hadoop 版本支持 CombineFileInputFormat,则启动 Maponly job for merge,否则启动 MapReduce merge job,map 端 combine file 是比较高效的做法。

2、hive.merge.mapfiles:默认为 true,正常的 maponly job 后,是否启动 merge job 来合并 map 端输出的结果。

3、hive.merge.mapredfiles:默认为 false,正常的 mapreduce job 后,是否启动 merge job 来合并 reduce 端输出的结果,建议开启。

4、hive.merge.smallfiles.avgsize:默认为 16MB,如果不是 partitioned table,输出 table 文件的平均大小小于这个值,启动 merge job,如果是 partitioned table,则分别计算每个 partition 下文件平均大小,只 merge 平均大小小于这个值的 partition,这个值只有当 hive.merge.mapfiles 或 hive.merge.mapredfiles 设定为 true 时,才有效。

5、hive.exec.reducers.bytes.per.reducer:默认为 1G,如果用户不主动设置 mapred.reduce.tasks 数,则会根据 input directory 计算出所有读入文件的 input summary size,然后除以这个值算出 reduce number。

6、hive.merge.size.per.task:默认为 256MB,merge job 后每个文件的目标大小(targetSize),用之前 job 输出文件的 total size 除以这个值,就可以决定 merge job 的 reduce 数目,merge job 的 map 端相当于 identity map,shuffle 到 reduce,每个 reduce dump 一个文件,通过这种方式控制文件的数量和大小。

7、mapred.max.split.size:默认为 256MB,一个 split 最大的大小。

8、mapred.min.split.size.per.node:默认为 1 byte,一个节点上(datanode)split 至少的大小。

9、mapred.min.split.size.per.rack:默认为 1 byte,同一个交换机(rack locality)下 split 至少的大小,通过这三个数的调节,组成了一串 CombineFileSplit 用户可以通过增大 mapred.max.split.size 的值来减少 Map Task 数量。

MapReduce 合并小文件是一项重要的优化措施,可以有效提高数据处理效率和集群性能,通过合理选择合并方法和策略,可以显著减少 NameNode 的元数据压力,降低 MapReduce 作业的资源消耗,提高整体数据处理速度。

0