MapReduce中的Map阶段如何处理输入数据?
- 行业动态
- 2024-12-31
- 2174
MapReduce是一种编程模型,用于处理和生成大数据集。
MapReduce是一种用于处理和生成大规模数据集的编程模型,它通过将任务分解成小块并行处理,再汇归纳果来高效地完成数据处理任务,在MapReduce框架中,输入数据通常以键值对(key-value pair)的形式进行处理,本文将详细探讨MapReduce框架中的输入机制,特别是如何通过InputFormat接口和RecordReader类来读取和处理输入数据。
MapReduce输入
MapReduce框架运转在<key, value>键值对上,即框架把作业的输入看成是一组<key, value>键值对,同样也产生一组<key, value>键值对作为作业的输出,在整个标准的流程中,会有三组<key, value>键值对类型的存在。
InputFormat接口
InputFormat接口决定了输入文件如何被Hadoop分块(split up)与接受,它包含两个抽象方法:
1、getSplits(JobContext context):该方法负责将一个大的输入数据逻辑分成许多片(InputSplit),每个切片由一个MapTask处理,如果数据库表有100条数据,按照主键ID升序存储,假设每20条分成一片,那么这个List的大小就是5。
2、createRecordReader(InputSplit split, TaskAttemptContext context):该方法根据InputSplit定义的方法,返回一个能够读取分片记录的RecordReader,RecordReader将Key-Value对从InputSplit中正确读出来。
RecordReader类
RecordReader类用于从InputSplit中读取数据,常见的RecordReader实现包括TextRecordReader、LineRecordReader等,TextInputFormat类实现了InputFormat类的createRecordReader方法,但是将计算input split个数的工作交给父类FileInputFormat处理。
输入数据的处理流程
1、数据分割:在进行map计算之前,MapReduce会根据输入文件计算输入分片(Input Split),这些分片往往和HDFS的block关系密切,如果设定HDFS块的大小是64MB,而输入文件大小分别是3MB、65MB和127MB,那么MapReduce会把3MB文件作为一个输入分片,65MB分成两个输入分片,127MB分成两个输入分片。
2、读取数据:Map任务会从InputSplit中读取数据,默认情况下,每一行文本内容解析成键值对,其中key是每一行的起始位置(单位是字节),value是本行的文本内容。
3、Map阶段处理:Mapper任务执行过程中,会对每一个<k, v>键值对调用一次map方法,并可能输出零个或多个键值对。
4、Combiner阶段(可选):Combiner是对Map端的输出先做一次合并,以减少在Map和Reduce节点之间的数据传输量,提高网络IO性能。
5、Shuffle和Sort阶段:将Map的输出作为Reduce的输入的过程称为shuffle,在这个过程中,Map的输出会根据键进行排序,并将相同键的值发送到同一个Reducer。
6、Reduce阶段:Reducer任务会主动从Mapper任务复制其输出的键值对,并进行合并和排序后,调用reduce方法进行处理。
示例代码
以下是一个简单的MapReduce程序示例,展示了如何使用TextInputFormat和TextOutputFormat来处理文本文件:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\s+"); for (String str : words) { word.set(str); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
在这个示例中,TokenizerMapper类负责将输入文本按空格分割成单词,并输出<单词, 1>的键值对。IntSumReducer类则负责将所有相同的单词对应的值相加,输出<单词, 总次数>的键值对。
常见问题解答
Q1: MapReduce中的InputFormat和RecordReader有什么区别?
A1: InputFormat接口用于决定输入文件如何被Hadoop分块(split up)与接受,它包含两个抽象方法:getSplits用于将大数据逻辑分成许多片,createRecordReader用于创建RecordReader实例从InputSplit中读取数据,RecordReader类则具体实现了从InputSplit中读取数据的功能,将数据转换为适合Map任务处理的键值对。
Q2: MapReduce中的Combiner是什么?它有什么作用?
A2: Combiner是MapReduce中的一个优化手段,它也是一种Reduce操作,但运行在Map节点上,Combiner的作用是对Map端的输出先做一次合并,以减少在Map和Reduce节点之间的数据传输量,从而提高网络IO性能,Combiner的输出类型必须与Reducer的输入类型一致,且不能影响最终的业务逻辑。
小编有话说
MapReduce作为一种强大的分布式计算框架,为处理大规模数据集提供了有效的解决方案,通过理解其输入机制,特别是InputFormat接口和RecordReader类的工作原理,我们可以更好地设计和优化MapReduce程序,以提高数据处理的效率和性能,希望本文能够帮助读者深入理解MapReduce的输入过程,并在实际应用中发挥其优势。
本站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本站,有问题联系侵删!
本文链接:https://www.xixizhuji.com/fuzhu/378956.html