当前位置:首页 > 行业动态 > 正文

MapReduce中的Map链是如何工作的?

MapReduce是一个编程模型,包括一个Map(映射)阶段和一个Reduce(归约)阶段。

链式MapReduce操作的概念

MapReduce中的Map链是如何工作的?  第1张

在Hadoop MapReduce中,一个作业通常包含一个Mapper类和一个Reducer类,这种结构在处理简单任务时非常有效,但在面对复杂业务逻辑时可能显得力不从心,为了解决这个问题,Hadoop提供了链式MapReduce操作,允许在一个作业中包含多个Mapper类,但至多只能有一个Reducer类,通过增加Mapper的数量,可以更灵活地处理复杂的数据处理需求。

链式MapReduce实战案例

需求分析

假设有一批用户点击网页的日志数据,需要统计每个网页被点击的次数,但要过滤掉黑名单用户的点击数据,具体需求包括:

1、加载黑名单数据。

2、过滤掉黑名单用户的点击记录。

3、将过滤后的数据映射为键值对并进行聚合。

4、过滤掉点击次数少于3次的统计信息。

数据准备

点击日志(clicklog.txt):包含用户ID和点击的网页ID。

黑名单数据(blacklist.txt):包含需要过滤的用户ID。

示例数据如下:

// clicklog.txt
20190612 1315 a1
20190612 2654 b1
...
// blacklist.txt
1111 man 23
2333 man 21
...

代码编写

1、第一个Map类:加载黑名单数据并将其存入内存,然后读取点击日志,将用户ID与黑名单进行左外连接,如果用户在黑名单中,将其标记为黑名单;否则标记为NULL。

2、第二个Map类:过滤掉黑名单用户的点击记录,只保留非黑名单用户的点击记录,并将其映射为键值对。

3、Reducer类:对过滤后的点击记录进行聚合,计算每个网页的点击总数。

4、第三个Map类:过滤掉点击次数少于3次的统计结果,并将最终结果写入HDFS。

以下是部分代码示例:

// 第一个Map类
public static class FilterMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text outKey = new Text();
    private final IntWritable outValue = new IntWritable(1);
    private HashSet<String> blacklist = new HashSet<>();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        BufferedReader reader = new BufferedReader(new FileReader("blacklist.txt"));
        String line;
        while ((line = reader.readLine()) != null) {
            blacklist.add(line.split("t")[0]); // 添加黑名单用户ID到集合中
        }
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("t");
        String userId = fields[1];
        if (!blacklist.contains(userId)) {
            outKey.set(fields[2]); // 设置页面ID为key
            context.write(outKey, outValue); // 输出键值对
        }
    }
}
// 第二个Map类
public static class FilterMapper2 extends Mapper<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void map(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        if (sum >= 3) {
            context.write(key, new IntWritable(sum)); // 只输出点击次数大于等于3的键值对
        }
    }
}
// Reducer类
public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum)); // 输出聚合结果
    }
}
// 第三个Map类
public static class FilterMapper3 extends Mapper<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void map(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        if (sum >= 3) {
            context.write(key, new IntWritable(sum)); // 只输出点击次数大于等于3的键值对
        }
    }
}

测试与归纳

在完成代码编写后,需要进行打包上传和测试,测试过程中要确保所有步骤都能正确执行,并验证最终结果是否符合预期,通过链式MapReduce操作,可以有效地处理复杂的数据处理任务,提高系统的灵活性和效率。

FAQs

1、问题:为什么链式MapReduce操作中的Reducer只能有一个?

答案:在链式MapReduce操作中,Reducer只能有一个是因为Reduce阶段的主要作用是对中间结果进行汇总和聚合,如果存在多个Reducer,会导致结果不一致和难以管理,设计上限制了只能有一个Reducer来保证结果的正确性和一致性。

2、问题:链式MapReduce操作适用于哪些场景?

答案:链式MapReduce操作适用于需要对数据进行多阶段处理的复杂场景,例如数据清洗、过滤、转换和聚合等,通过增加多个Mapper,可以在不同阶段对数据进行不同的处理,从而实现复杂的业务逻辑,典型的应用场景包括日志分析、数据清洗和复杂的数据统计分析等。

阶段 输入 Map函数 输出
Map 1 输入数据集(文本文件) 读取数据行,将每行分割成单词,输出键值对(单词,1) {word1, 1}, {word2, 1}, …, {wordN, 1}
Map 2 Map 1的输出 对Map 1的输出进行进一步处理,将相同的单词合并计数,输出键值对(单词,计数) {word1, count1}, {word2, count2}, …, {wordN, countN}
Map 3 Map 2的输出 对Map 2的输出进行其他处理,按单词长度排序,输出键值对(单词长度,{单词,计数}) {length1, {word1, count1}}, {length2, {word2, count2}}, …, {lengthN, {wordN, countN}}

这个归纳展示了MapReduce任务中一个Map链的三个Map阶段,每个Map阶段对输入数据集进行不同的处理,最终生成一个包含多个键值对的输出,在实际应用中,MapReduce任务可能包含更多Map阶段,每个阶段对输入数据进行不同的转换和计算。

0