
Does MapReduce Support Multiple Programming Languages?

MapReduce is a programming model for processing large data sets, and it supports development in multiple languages. Below is a simple Python example using the mrjob library:

from mrjob.job import MRJob
from mrjob.step import MRStep

class MapReduceExample(MRJob):
    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,
                reducer=self.reducer
            )
        ]

    def mapper(self, _, line):
        words = line.split()
        for word in words:
            yield (word, 1)

    def reducer(self, key, values):
        yield (key, sum(values))

if __name__ == '__main__':
    MapReduceExample.run()

In this example we define a class named MapReduceExample that inherits from MRJob. It defines two methods: mapper and reducer. The mapper method splits each input line into words and emits a (word, 1) key-value pair for each word. The reducer method receives all values for the same key and sums them. The if __name__ == '__main__': block calls MapReduceExample.run() to launch the MapReduce job.
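A quick way to exercise this job without a cluster is mrjob's programmatic runner, which executes the whole job in-process. The sketch below is a minimal usage example, assuming the class above is saved as word_count.py and that a local file named input.txt exists (both names are hypothetical):

from word_count import MapReduceExample

# Hypothetical local test: mrjob's default inline runner runs the whole
# job in this process, no Hadoop cluster required.
job = MapReduceExample(args=['input.txt'])
with job.make_runner() as runner:
    runner.run()
    # parse_output() decodes the runner's raw output back into (key, value) pairs
    for key, value in job.parse_output(runner.cat_output()):
        print(key, value)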

MapReduce Multi-Language Programming Examples


MapReduce is a programming model for parallel computation over large data sets (often larger than 1 TB). The concepts "Map" and "Reduce", and the main ideas behind them, are borrowed from functional programming languages, along with features taken from vector programming languages. The model makes it much easier for programmers to run their code on a distributed system without any background in distributed parallel programming: current implementations ask the user to specify a Map function, which transforms a set of key-value pairs into a new set of intermediate key-value pairs, and a concurrent Reduce function, which merges all intermediate values that share the same key.
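The functional-programming roots are easy to see in ordinary Python, where the built-in map and functools.reduce express the same two-phase idea on a single machine. This is only a toy illustration of the concept, not a distributed implementation:

from functools import reduce
from collections import defaultdict

lines = ["hello world", "hello mapreduce"]

# Map phase: turn every line into (word, 1) pairs
mapped = [pair for line in lines
          for pair in map(lambda w: (w, 1), line.split())]

# Shuffle: group the values of each key together
groups = defaultdict(list)
for word, count in mapped:
    groups[word].append(count)

# Reduce phase: fold each group of counts into a total
totals = {word: reduce(lambda a, b: a + b, counts)
          for word, counts in groups.items()}
print(totals)  # {'hello': 2, 'world': 1, 'mapreduce': 1}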

Java Programming Interface Example

WordCount is the classic MapReduce example: the Mapper extracts the words from the input text and emits a count for each occurrence, and the Reducer aggregates the counts per word.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {
    // Emits (word, 1) for every token in the input line
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    // Sums all counts received for the same word
    public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}

The Hadoop Streaming Approach

Hadoop Streaming is a utility shipped with Hadoop that lets users write MapReduce programs in any language that can read from standard input and write to standard output. Below are WordCount examples implemented in Python and in C++.

Python WordCount

Mapper: reads the input text, splits each line into words on whitespace, and emits each word together with a count of 1.

Code example:

#!/usr/bin/env python
import sys

# Emit "word<TAB>1" for every word on every input line
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print('%s\t%s' % (word, 1))

Reducer: receives the Mapper's output (already sorted by key), accumulates the counts for each word, and emits the final totals.

Code example:

#!/usr/bin/env python
import sys

current_word = None
current_count = 0

# Input arrives sorted by key, so identical words are adjacent
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    count = int(count)
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print('%s\t%s' % (current_word, current_count))
        current_word = word
        current_count = count

# Flush the last word
if current_word:
    print('%s\t%s' % (current_word, current_count))

C++ WordCount

Mapper: reads the input text, splits each line into words on whitespace, and emits each word together with a count of 1.

Code example:

#include <iostream>
#include <sstream>
#include <string>
using namespace std;

int main() {
    string line;
    // Read the input line by line and emit "word<TAB>1" for each word
    while (getline(cin, line)) {
        stringstream myStream(line);
        string word;
        while (myStream >> word) {
            cout << word << "\t" << "1" << endl;
        }
    }
    return 0;
}

Reducer: receives the Mapper's output (already sorted by key), accumulates the counts for each word, and prints the final totals.

Code example:

#include <iostream>
#include <string>
using namespace std;

int main() {
    string prevKey;
    long long sum = 0;
    string word;
    int count;
    // Input arrives sorted by key, so identical words are adjacent
    while (cin >> word >> count) {
        if (word != prevKey) {
            if (!prevKey.empty()) {
                cout << prevKey << "\t" << sum << endl;
            }
            prevKey = word;
            sum = 0;
        }
        sum += count;
    }
    // Flush the last word
    if (!prevKey.empty()) {
        cout << prevKey << "\t" << sum << endl;
    }
    return 0;
}
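After compiling the two programs (for example with g++), the resulting executables can be handed to the same streaming jar as the Python scripts, via the -mapper and -reducer options: Hadoop Streaming only requires that each stage read stdin and write stdout, not that it be written in any particular language.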

Hadoop Pipes Programming Example

Hadoop Pipes provides a way to write MapReduce tasks in C++: instead of exchanging text lines over standard streams the way Streaming does, a Pipes program talks to the Java MapReduce framework over a socket-based protocol. Below is a WordCount example implemented with Hadoop Pipes.

Mapper: reads the input value, splits it into words on whitespace, and emits each word with a count of 1.

Code example (following the structure of the classic Hadoop Pipes word-count example):

#include "hadoop/Pipes.hh"
#include "hadoop/StringUtils.hh"

// Emits ("word", "1") for every whitespace-separated token in the input value
class WordCountMapper : public HadoopPipes::Mapper {
public:
    WordCountMapper(HadoopPipes::TaskContext& context) {}
    void map(HadoopPipes::MapContext& context) {
        std::vector<std::string> words =
            HadoopUtils::splitString(context.getInputValue(), " ");
        for (size_t i = 0; i < words.size(); ++i) {
            context.emit(words[i], "1");
        }
    }
};

Reducer: receives all counts for the same key, sums them, and emits the total.

Code example:

#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

// Sums all counts received for the same key
class WordCountReducer : public HadoopPipes::Reducer {
public:
    WordCountReducer(HadoopPipes::TaskContext& context) {}
    void reduce(HadoopPipes::ReduceContext& context) {
        int sum = 0;
        while (context.nextValue()) {
            sum += HadoopUtils::toInt(context.getInputValue());
        }
        context.emit(context.getInputKey(), HadoopUtils::toString(sum));
    }
};

// Entry point: hand the mapper above and this reducer to the Pipes framework
int main(int argc, char* argv[]) {
    return HadoopPipes::runTask(
        HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer>());
}
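A Pipes job is typically submitted with the hadoop pipes command, pointing -input and -output at HDFS paths and -program at the compiled C++ binary uploaded to HDFS; the exact invocation depends on the cluster's configuration.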

The examples below show the same word-frequency computation in several other languages and frameworks.

Java: implementing the MapReduce programming model in Java to count word frequencies. The TokenizerMapper and IntSumReducer are the same classes shown earlier; what the earlier listing omitted is the driver, whose main() method configures and submits the job:

public class WordCount {
    // TokenizerMapper and IntSumReducer are the classes shown earlier; this
    // driver additionally needs imports for Configuration, Job, Path,
    // FileInputFormat and FileOutputFormat from org.apache.hadoop.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // combiner pre-aggregates map output locally
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
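Packaged into a jar (the name below is hypothetical), the job is launched in the usual way: hadoop jar wordcount.jar WordCount /input /output.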

Python: using the PySpark library to implement the same word count:

from pyspark import SparkContext

def map_function(line):
    words = line.split()
    return [(word, 1) for word in words]

sc = SparkContext("local", "Word Count")
text_file = sc.textFile("wordcount.txt")

# flatMap flattens the per-line pair lists; reduceByKey sums counts per word
words = text_file.flatMap(map_function).reduceByKey(lambda a, b: a + b)

for word, count in words.collect():
    print(word, count)

Scala: using Scala and Apache Spark:

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Word Count")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile("wordcount.txt")
    // Split each line into words, pair each word with 1, then sum per word
    val words = textFile.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    words.collect().foreach(println)
    sc.stop()
  }
}

Ruby: using a Ruby binding for Apache Spark (for example the ruby-spark gem; the exact API differs between versions, so read this as a sketch rather than a drop-in program):

require 'spark'

SparkConf.new do |conf|
  conf.setAppName "Word Count"
end

sc = SparkContext.new
text_file = sc.textFile("wordcount.txt")

# Split each line into words, pair each word with 1, then sum per word
words = text_file.flatMap { |line| line.split(" ") }
                 .map { |word| [word, 1] }
                 .reduceByKey { |a, b| a + b }

words.collect.each do |word, count|
  puts "#{word}: #{count}"
end

sc.stop
