如何利用Java API实现MapReduce操作?
- 行业动态
- 2024-10-17
- 1
MapReduce简介
MapReduce是一种面向大数据平台的分布式并行计算框架,它允许用户在不了解分布式并行编程的情况下,将程序运行在分布式系统上,通过提供并行计算框架,MapReduce能自动完成计算任务的并行处理、数据划分、任务分配和结果收集,使开发人员只需关注业务逻辑实现,从而大大降低了开发负担。
MapReduce模型
MapReduce编程模型通常由三部分组成:Map阶段、Reduce阶段和Driver类,Map和Reduce部分负责业务逻辑的实现,而Driver类则负责调用任务并执行MapReduce程序。
1. Map阶段实现
Map函数默认按行从HDFS读取数据进行处理,即从HDFS一行一行地将数据读取过来,读取的数据格式为<行号,行内容>,然后按一定的分隔符切割,最后以keyvalue的格式输出,在编写代码时,需要继承MapReduce的Mapper类,并重写map方法,WordCount案例中,Map阶段的代码如下:
package com.qwer.mapreduce.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text outK = new Text(); private IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" "); for (String word : words) { outK.set(word); context.write(outK, outV); } } }
2. Reduce阶段实现
Reduce阶段的输入数据类型和Map阶段的输出数据类型是一样的,在拿到输入数据后,可以进行业务逻辑的编写,WordCount案例中,Reduce阶段的代码如下:
package com.qwer.mapreduce.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } outV.set(sum); context.write(key, outV); } }
3. Driver类实现
Driver类是驱动类,负责调用任务并执行MapReduce程序,其程序大概可以分为八个步骤:获取job、设置jar包路径、关联Mapper和Reducer、设置Map输出的KV类型、设置最终输出的KV类型、设置输入路径、设置输出路径、提交任务,WordCount案例中,Driver类的代码如下:
package com.qwer.mapreduce.wordcount; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
MapReduce编程规范
MapReduce开发共有八个步骤,其中Map阶段有2个步骤、Shuffle阶段有4个步骤、Reduce阶段有2个步骤,具体如下:
Map阶段:设置InputFormat类,将数据切分为键值对;自定义Map逻辑,将结果转换到另外的键值对并输出。
Shuffle阶段:对输出的键值对进行分区;对不同分区的数据按照相同的key排序;(可选)对分组过的数据初步规约,降低数据的网络拷贝;对数据进行分组,相同的key的value放入一个集合中。
Reduce阶段:对多个Map任务的结果进行排序和合并,编写Reduce函数实现自己的逻辑,对输入的键值对进行处理,转为新的键值对输出;设置OutputFormat类处理并保存Reduce输出的键值对数据。
FAQs
1、问题一:MapReduce中的Map和Reduce函数分别承担什么角色?
答案:Map函数负责处理原始数据、为数据打标签、对数据进行分发,它将输入的原始数据进行一定的处理,比如筛选过滤选出所需要的数据集,然后对这部分数据按照业务的逻辑进行打标签(key),等待被分发给对应的Reduce进行处理,而Reduce函数则负责接收Map函数输出的数据,并对这些数据进行汇总、合并等操作,最终得到所需的结果。
2、问题二:为什么使用MapReduce进行大数据处理?
答案:MapReduce具有易于编程、高拓展性、高容错性和高吞吐量等优点,它提供了大量方便开发的接口,使得完全独立完成一个MapReduce程序变得相对简单,通过增加机器数量可以轻松提高计算性能,MapReduce框架提供了多种有效的错误检测和恢复机制,确保了计算过程的高容错性,最重要的是,MapReduce可以对PB级以上的数据进行离线计算,满足了大数据处理的需求。
本站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本站,有问题联系侵删!
本文链接:http://www.xixizhuji.com/fuzhu/7628.html