青椒课堂MapReduce排序和序列化

青椒课堂MapReduce排序和序列化,第1张

青椒课堂MapReduce排序和序列化

任务一:流量统计项目案例
1.基本思路:实现自定义的 bean 来封装流量信息,并将 bean 作为 Map 输出的 key 来传输。
2.MapReduce 程序在处理数据的过程中会对数据排序(Map 输出的 kv 对传输到 Reduce 之前,会排序),排序的依据是 Map 输出的 key, 所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到 key 中。

任务二:WritableComparable 排序
1.排序是 MapReduce 框架中最重要的 *** 作之一。 MapTask 和 ReduceTak 均会对数据按照 key 进行排序。该 *** 作属于 Hadoop 的默认行为,任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
2.WritableComparable 继承自 Writable和 java.lang.Comparable 接口,是一个Writable也是一个Comparable,也就是说,既可以序列化,也可以比较。
3.完整的 bean 程序如下所示:

package com.hongyaa.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;


public class FlowBeanSort implements WritableComparable {
	private long upFlow; // 总上行流量
	private long downFlow; // 总下行流量
	private long sumFlow; // 总流量

	// 无参构造方法必须有,目的是为了在反序列化 *** 作创建对象实例时调用无参构造器
	public FlowBeanSort() {
		super();
	}

	// 带参的构造方法,目的是为了对象的初始化	
	public FlowBeanSort(long upFlow, long downFlow, long sumFlow) {
		super();
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = sumFlow;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	// 序列化方法,将对象的字段信息写入输出流
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}

	// 反序列化方法,从输入流中读取各个字段信息
	// 注意:字段的反序列化顺序需要和序列化的顺序保持一致,而且字段的类型和个数也要保持一致
	@Override
	public void readFields(DataInput in) throws IOException {
		this.upFlow = in.readLong();
		this.downFlow = in.readLong();
		this.sumFlow = in.readLong();
	}

	// 实现自定义排序,按照总流量倒序排序
	@Override
	public int compareTo(FlowBeanSort o) {
		// 自定义降序排列
		return this.sumFlow > o.getSumFlow() ? -1 : 1;
	}

	// 重写toString()方法
	@Override
	public String toString() {
		return upFlow + "t" + downFlow + "t" + sumFlow;
	}
}

任务三:MapReduce 编程
1.Map 端程序编写
具体代码如下所示:

// 输入数据是上一个统计程序的输出结果,已经是各个手机号的总流量信息
public class FlowSumSortMapper extends Mapper {

	@Override
	protected void map(LongWritable key, Text value, Mapper.Context context)
			throws IOException, InterruptedException {
		// (1)获取一行文本的内容,并将其转换为String类型,之后按照分隔符“t”进行切分
		String[] splits = value.toString().split("t");
		// (2)取出手机号
		String telephone = splits[0];
		// (3)封装对象
		FlowBeanSort fbs = new FlowBeanSort();
		fbs.setUpFlow(Long.parseLong(splits[1]));
		fbs.setDownFlow(Long.parseLong(splits[2]));
		fbs.setSumFlow(Long.parseLong(splits[3]));
		// (4)将封装的fbs对象作为key,将手机号作为value,分发给Reduce端
		context.write(fbs, new Text(telephone));
	}
}

2.Reduce 端程序编写
具体代码如下所示:

public class FlowSumSortReducer extends Reducer {
	
	@Override
	protected void reduce(FlowBeanSort key, Iterable values,
			Reducer.Context context) throws IOException, InterruptedException {
		// 遍历集合
		for (Text tele : values) {
			// 将手机号作为key,将封装好的流量信息作为value,作为最终的输出结果
			context.write(new Text(tele), key);
		}
	}
}

3.Driver 端程序编写
完整代码如下所示:

public class FlowSumSortDemo {
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// (1)获取配置信息类
		Configuration conf = new Configuration();
		// 指定mapreduce程序运行的hdfs的相关运行参数
		conf.set("fs.defaultFS", "hdfs://localhost:9000");

		// (2)新建一个Job对象
		Job job = Job.getInstance(conf);

		// (3)将 job 所用到的那些类(class)文件,打成jar包 (打成jar包在集群运行必须写)
		job.setJarByClass(FlowSumSortDemo.class);

		// (4)指定 Mapper 类和 Reducer 类
		job.setMapperClass(FlowSumSortMapper.class);
		job.setReducerClass(FlowSumSortReducer.class);

		// (5)指定 MapTask 的输出key-value类型
		job.setMapOutputKeyClass(FlowBeanSort.class);
		job.setMapOutputValueClass(Text.class);

		// (6)指定 ReduceTask 的输出key-value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBeanSort.class);

		// (7)指定该 mapreduce 程序数据的输入和输出路径
		Path inPath = new Path("/flow/output_sum");
		Path outPath = new Path("/flow/output_sort");

		// 获取fs对象
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(outPath)) {
			fs.delete(outPath, true);
		}

		FileInputFormat.setInputPaths(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);

		// (8)最后给YARN来运行,等着集群运行完成返回反馈信息,客户端退出
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);
	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/4657133.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存