当前位置:首页 > 行业动态 > 正文

MapReduce中的数据序列化是如何实现的?

在MapReduce中,数据序列化是将数据转换为可以存储或传输的格式。常见的序列化方法包括JSON、XML等,它们可以将复杂的数据结构转换为简单的字符串或字节流,便于在网络中传输或存储到磁盘上。

MapReduce是一种常用的分布式计算模型,通常用于大规模数据处理任务,在MapReduce中,序列化是一个至关重要的概念,因为它将数据转换为字节流,以便在网络中进行传输和存储,本文将详细介绍MapReduce中的序列化机制,包括其、Writable类、常用数据序列化类型以及自定义bean对象实现序列化的步骤。

一、序列化

序列化是将数据结构转换为字节流的过程,通常用于数据在网络中传输和存储,在MapReduce中,序列化尤为重要,因为MapReduce需要将数据分发到多个节点上进行并行计算,在MapReduce中,数据通常以键值对的形式存在,每个键值对都需要进行序列化。

二、Writable类

Writable类是MapReduce中用于序列化和反序列化数据的抽象类,它定义了两个方法:write和readFields,用户可以通过继承Writable来实现自定义数据类型的序列化和反序列化。

write方法

write方法用于将Writable对象转换为字节流,通常实现为将每个字段按照特定的格式写入到输出流中,write方法的实现应遵循以下规则:

写入的数据应该是有序的,并且写入的顺序应该与字段定义的顺序相同。

写入的数据应该是固定长度的,这样可以方便地进行反序列化。

写入的数据应该是可重复的,这样可以方便地进行分布式计算。

一个简单的Writable类的例子如下:

public class MyWritable implements Writable {
    private int field1;
    private String field2;
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(field1);
        out.writeUTF(field2);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        field1 = in.readInt();
        field2 = in.readUTF();
    }
}

在这个例子中,MyWritable类有两个字段:field1和field2,write方法将field1和field2按照固定的顺序写入到输出流中,readFields方法从输入流中读取field1和field2的值。

readFields方法

readFields方法用于将字节流转换为Writable对象,通常实现为从输入流中读取每个字段的值,并将其设置到Writable对象的相应字段中,readFields方法的实现应遵循以下规则:

读取的数据应该是有序的,并且读取的顺序应该与字段定义的顺序相同。

读取的数据应该是固定长度的,这样可以方便地进行反序列化。

下面是一个读取自定义Writable对象的例子:

MyWritable obj = new MyWritable();
obj.readFields(in);

在这个例子中,我们创建了一个MyWritable对象,并调用了readFields方法将输入流中的数据读取到MyWritable对象中。

三、常用数据序列化类型

在MapReduce中,常用的序列化类型有int与IntWritable转化、Text与String序列化等,这些序列化类型通过简单的校验使得存储空间少、传输速度快。

int与IntWritable转化

IntWritable是一个专门用于序列化整数的类,下面是int与IntWritable之间的转换示例:

// b是int类型
IntWritable outV = new IntWritable();
outV.set(b);
// a是IntWritable类型
int b = outV.get();

Text与String序列化

Text是Hadoop提供的用于序列化字符串的类,下面是Text与String之间的转换示例:

// Text --> String
Text text = new Text();
String s = text.toString();
// String --> Text
Text.set(string);

四、自定义bean对象实现序列化接口(Writable)

尽管Hadoop提供了一些常用的数据序列化类型,但它们并不能满足所有的需求,对于复杂的bean对象,我们需要实现Writable接口来进行序列化,自定义bean对象实现序列化的步骤如下:

1、实现Writable接口。

2、重写序列化方法write。

3、重写反序列化方法readFields。

4、反序列化时,需要反射调用空参构造函数,所以必须有空参构造器。

5、序列化的顺序和反序列化的顺序一致。

6、要想把结果显示在文件中,需要重写toString()方法,默认传输过来的是地址值,可用’t’分开,方便后续使用。

7、如果自定义的bean放在key中传输,还需要实现Comparable接口,因为MapReduce框架中的Shuffle过程要求对key必须能排序。

一个自定义bean对象的例子如下:

public class FlowBean implements Writable {
    private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //总流量
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    //重载计算总流量函数,因为不会传总流量,只会通过上行流量与下行流量计算得出
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }
    //3.重写空参构造
    public FlowBean() {
    }
    //4.重写toString方法
    @Override
    public String toString() {
        //输出会调用此对象的此方法,所以按输出的格式来写
        return upFlow + "t" + downFlow + "t" + sumFlow;
    }
    //2.重写序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        //这里的数据都是Long类型,所以使用writeLong
       dataOutput.writeLong(upFlow);
       dataOutput.writeLong(downFlow);
       dataOutput.writeLong(sumFlow);
    }
    //5.重写反序列化方法
    @Override
    public void readFields(DataInput in) throws IOException {
       upFlow = in.readLong();
       downFlow = in.readLong();
       sumFlow = in.readLong();
    }
}

这个FlowBean类实现了Writable接口,并重写了序列化和反序列化方法,它还重写了toString方法以便在输出时显示结果。

五、序列化案例实操

下面我们通过一个实际案例来演示如何在MapReduce中使用序列化,假设我们需要统计每个手机号的总上行流量、总下行流量和总流量,输入数据格式如下:

id 手机号 网络ip 域名 上行流量 下行流量 网络状态码

期望输出数据格式如下:

手机号 总上行流量 总下行流量 总流量

需求分析

Map阶段:输入的key是这一行的偏移量,输入的value是这一行的数据,输出的key是手机号,输出的value是一个包含上行流量、下行流量和总流量的bean对象。

Reduce阶段:累加上行流量和下行流量得到总流量。

编写MapReduce程序

FlowBean类

我们创建一个FlowBean类,该类实现了Writable接口,并包含了上行流量、下行流量和总流量的属性,代码如下:

public class FlowBean implements Writable {
    private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //总流量
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    //重载计算总流量函数,因为不会传总流量,只会通过上行流量与下行流量计算得出
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }
    //3.重写空参构造
    public FlowBean() {
    }
    //4.重写toString方法
    @Override
    public String toString() {
        //输出会调用此对象的此方法,所以按输出的格式来写
        return upFlow + "t" + downFlow + "t" + sumFlow;
    }
    //2.重写序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        //这里的数据都是Long类型,所以使用writeLong
       dataOutput.writeLong(upFlow);
       dataOutput.writeLong(downFlow);
       dataOutput.writeLong(sumFlow);
    }
    //5.重写反序列化方法
    @Override
    public void readFields(DataInput in) throws IOException {
       upFlow = in.readLong();
       downFlow = in.readLong();
       sumFlow = in.readLong();
    }
}

编写Mapper类和Reducer类

我们编写Mapper类和Reducer类,Mapper类的代码如下:

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("t");
        String phoneNum = fields[1]; //提取手机号作为key
        long upFlow = Long.parseLong(fields[4]); //提取上行流量
        long downFlow = Long.parseLong(fields[5]); //提取下行流量
        FlowBean flowBean = new FlowBean(); //创建FlowBean对象并设置值
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        flowBean.setSumFlow(); //计算总流量并设置值
        context.write(new Text(phoneNum), flowBean); //输出key-value对
    }
}

Reducer类的代码如下:

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long upFlowSum = 0; //初始化上行流量总和为0
        long downFlowSum = 0; //初始化下行流量总和为0
        for (FlowBean bean : values) { //遍历所有值并累加流量
            upFlowSum += bean.getUpFlow(); //累加上行流量
            downFlowSum += bean.getDownFlow(); //累加下行流量
        }
        FlowBean result = new FlowBean(); //创建结果对象并设置值
        result.setUpFlow(upFlowSum); //设置上行流量总和
        result.setDownFlow(downFlowSum); //设置下行流量总和
        result.setSumFlow(); //计算总流量并设置值
        context.write(key, result); //输出结果key-value对
    }
}

编写Driver类并运行程序

我们编写Driver类并运行程序,Driver类的代码如下:

public class FlowDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); //配置作业基本信息
        Job job = Job.getInstance(conf, "flow"); //获取作业实例并设置作业名称为flow
        job.setJarByClass(FlowDriver.class); //设置主类为当前类
        job.setMapperClass(FlowMapper.class); //设置Mapper类为FlowMapper类
        job.setCombinerClass(FlowReducer.class); //设置Combiner类为FlowReducer类(可选)
        job.setReducerClass(FlowReducer.class); //设置Reducer类为FlowReducer类
        job.setOutputKeyClass(Text.class); //设置输出key的类型为Text类型
        job.setOutputValueClass(FlowBean.class); //设置输出value的类型为FlowBean类型
        job.setPartitionerClass(HashPartitioner.class); //设置分区器为HashPartitioner类(可选)
        job.setNumReduceTasks(1); //设置reduce任务数量为1个(可选)
        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input")); //设置输入路径为hdfs://localhost:9000/input目录中的文件(根据实际情况修改)
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output")); //设置输出路径为hdfs://localhost:9000/output目录中的文件(根据实际情况修改)
        System.exit(job.waitForCompletion(true) ? 0 : 1); //等待作业完成并退出程序(返回值为0表示成功,非0表示失败)

到此,以上就是小编对于“mapreduce中的序列化_数据序列化”的问题就介绍到这了,希望介绍的几点解答对大家有用,有任何问题和不懂的,欢迎各位朋友在评论区讨论,给我留言。

0