当前位置:首页 > 行业动态 > 正文

MapReduce中的Map阶段如何处理输入数据?

MapReduce是一种编程模型,用于处理和生成大数据集。

MapReduce是一种用于处理和生成大规模数据集的编程模型,它通过将任务分解成小块并行处理,再汇归纳果来高效地完成数据处理任务,在MapReduce框架中,输入数据通常以键值对(key-value pair)的形式进行处理,本文将详细探讨MapReduce框架中的输入机制,特别是如何通过InputFormat接口和RecordReader类来读取和处理输入数据。

MapReduce中的Map阶段如何处理输入数据?  第1张

MapReduce输入

MapReduce框架运转在<key, value>键值对上,即框架把作业的输入看成是一组<key, value>键值对,同样也产生一组<key, value>键值对作为作业的输出,在整个标准的流程中,会有三组<key, value>键值对类型的存在。

InputFormat接口

InputFormat接口决定了输入文件如何被Hadoop分块(split up)与接受,它包含两个抽象方法:

1、getSplits(JobContext context):该方法负责将一个大的输入数据逻辑分成许多片(InputSplit),每个切片由一个MapTask处理,如果数据库表有100条数据,按照主键ID升序存储,假设每20条分成一片,那么这个List的大小就是5。

2、createRecordReader(InputSplit split, TaskAttemptContext context):该方法根据InputSplit定义的方法,返回一个能够读取分片记录的RecordReader,RecordReader将Key-Value对从InputSplit中正确读出来。

RecordReader类

RecordReader类用于从InputSplit中读取数据,常见的RecordReader实现包括TextRecordReader、LineRecordReader等,TextInputFormat类实现了InputFormat类的createRecordReader方法,但是将计算input split个数的工作交给父类FileInputFormat处理。

输入数据的处理流程

1、数据分割:在进行map计算之前,MapReduce会根据输入文件计算输入分片(Input Split),这些分片往往和HDFS的block关系密切,如果设定HDFS块的大小是64MB,而输入文件大小分别是3MB、65MB和127MB,那么MapReduce会把3MB文件作为一个输入分片,65MB分成两个输入分片,127MB分成两个输入分片。

2、读取数据:Map任务会从InputSplit中读取数据,默认情况下,每一行文本内容解析成键值对,其中key是每一行的起始位置(单位是字节),value是本行的文本内容。

3、Map阶段处理:Mapper任务执行过程中,会对每一个<k, v>键值对调用一次map方法,并可能输出零个或多个键值对。

4、Combiner阶段(可选):Combiner是对Map端的输出先做一次合并,以减少在Map和Reduce节点之间的数据传输量,提高网络IO性能。

5、Shuffle和Sort阶段:将Map的输出作为Reduce的输入的过程称为shuffle,在这个过程中,Map的输出会根据键进行排序,并将相同键的值发送到同一个Reducer。

6、Reduce阶段:Reducer任务会主动从Mapper任务复制其输出的键值对,并进行合并和排序后,调用reduce方法进行处理。

示例代码

以下是一个简单的MapReduce程序示例,展示了如何使用TextInputFormat和TextOutputFormat来处理文本文件:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split("\s+");
            for (String str : words) {
                word.set(str);
                context.write(word, one);
            }
        }
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

在这个示例中,TokenizerMapper类负责将输入文本按空格分割成单词,并输出<单词, 1>的键值对。IntSumReducer类则负责将所有相同的单词对应的值相加,输出<单词, 总次数>的键值对。

常见问题解答

Q1: MapReduce中的InputFormat和RecordReader有什么区别?

A1: InputFormat接口用于决定输入文件如何被Hadoop分块(split up)与接受,它包含两个抽象方法:getSplits用于将大数据逻辑分成许多片,createRecordReader用于创建RecordReader实例从InputSplit中读取数据,RecordReader类则具体实现了从InputSplit中读取数据的功能,将数据转换为适合Map任务处理的键值对。

Q2: MapReduce中的Combiner是什么?它有什么作用?

A2: Combiner是MapReduce中的一个优化手段,它也是一种Reduce操作,但运行在Map节点上,Combiner的作用是对Map端的输出先做一次合并,以减少在Map和Reduce节点之间的数据传输量,从而提高网络IO性能,Combiner的输出类型必须与Reducer的输入类型一致,且不能影响最终的业务逻辑。

小编有话说

MapReduce作为一种强大的分布式计算框架,为处理大规模数据集提供了有效的解决方案,通过理解其输入机制,特别是InputFormat接口和RecordReader类的工作原理,我们可以更好地设计和优化MapReduce程序,以提高数据处理的效率和性能,希望本文能够帮助读者深入理解MapReduce的输入过程,并在实际应用中发挥其优势。

0