MapReduce是一种用于处理大规模数据集的编程模型,其输入数据通常存储在分布式文件系统(如Hadoop分布式文件系统HDFS)中,MapReduce框架通过将输入数据分成多个逻辑分片(Input Splits),每个分片由一个map任务处理,下面详细介绍MapReduce中的输入格式、分片机制和自定义InputFormat的实现方法:
1、输入格式
InputFormat类:InputFormat是MapReduce框架中用来读取输入数据的抽象类,它负责生成输入分片(Input Splits)并将它们分割成记录(Record),常见的子类包括TextInputFormat、KeyValueTextInputFormat、NLineInputFormat等。
FileInputFormat:所有基于文件的InputFormat类的基类,提供两个主要功能:指出作业的输入文件位置,为输入文件生成分片。
CombineFileInputFormat:用于处理大量小文件的场景,将这些小文件打包到一个分片中,以提高资源利用率。
2、输入分片与记录
输入分片(Input Split):一个输入分片是一个map任务处理的逻辑单元,包含数据引用而非实际数据,InputSplit由InputFormat创建并被MapReduce框架用来分配map任务。
记录(Record):输入分片中的一个键值对,用于存储在map的上下文中,RecordReader从输入分片中读取记录,类似于一个迭代器。
3、自定义InputFormat
需求背景:MapReduce框架内置的InputFormat可能无法满足所有需求,此时可以通过自定义InputFormat来实现特定的数据处理逻辑。
实现步骤:继承FileInputFormat类,重写createRecordReader()方法和isSplitable()方法,createRecordReader()方法用于创建自定义的RecordReader,isSplitable()方法用于决定文件是否可切片。
示例代码:以下是一个简单的自定义InputFormat示例,用于将小文件合并成一个SequenceFile格式的大文件。
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> { @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { MyRecordReader myRecordReader = new MyRecordReader(); myRecordReader.initialize(inputSplit, taskAttemptContext); return myRecordReader; } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; // 设置为false表示不可切片 } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit fileSplit; private Configuration configuration; private BytesWritable bytesWritable; private boolean flag = false; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { this.fileSplit = (FileSplit) inputSplit; this.configuration = taskAttemptContext.getConfiguration(); this.bytesWritable = new BytesWritable(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!flag) { long length = fileSplit.getLength(); byte[] bytes = new byte[(int) length]; Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(configuration); FSDataInputStream in = fs.open(path); IOUtils.readFully(in, bytes, 0, bytes.length); in.close(); bytesWritable.set(bytes, 0, bytes.length); flag = true; } return !flag; } // 其他方法省略... }
自定义类 | 说明 |
MyInputFormat | 继承自FileInputFormat,用于合并小文件 |
MyRecordReader | 自定义RecordReader,用于读取合并后的文件数据 |
常见问题解答
问题一:如何设置MapReduce作业的InputFormat?
答:可以通过调用job.setInputFormatClass(XXXInputFormat.class)
来设置MapReduce作业的InputFormat,设置使用自定义的MyInputFormat:
job.setInputFormatClass(MyInputFormat.class);
问题二:如何处理大量小文件以提高MapReduce作业的效率?
答:可以使用CombineFileInputFormat将多个小文件打包成一个大文件,从而减少Map任务的数量,提高资源利用率,具体实现方法是设置最大输入分片大小,并使用相应的配置参数:
job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 设置最大分片大小为4MB
MapReduce的输入格式和分片机制是其高效处理大规模数据的关键,通过合理选择和自定义InputFormat,可以优化MapReduce作业的性能和资源利用率。