Environment preparation: a Hadoop and ZooKeeper cluster has already been set up on three virtual machines, with HDFS and MapReduce configured (see the earlier articles for the environment setup). This case study implements friend recommendation with the Java API. The underlying idea: for every pair of people who are not yet direct friends, count how many friends they have in common; the more common friends they share, the stronger the recommendation.
1. Start the Environment
# Start ZooKeeper on all three nodes
zkServer.sh start
# Start HDFS
start-all.sh
Once everything is up, access the master node and create a folder named mrxx on HDFS as the location where the data will be stored.
2. Java API Implementation
First create a Java project and use a utility class to randomly generate friend-relationship data as test data. Then implement the actual logic by subclassing Mapper and Reducer and overriding their map and reduce methods: the mapper emits every existing (direct) friendship with flag 0 and every pair of one person's friends (a potential recommendation) with flag 1, and the reducer discards any pair that ever carries a 0 and sums the 1s for the rest, giving the number of common friends. Finally, the task is submitted through a Job class.
① Dependencies, Configuration, and the Utility Class
Dependencies (pom.xml):
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <hadoop.version>3.1.2</hadoop.version>
    <commons-io.version>2.4</commons-io.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>${commons-io.version}</version>
    </dependency>
    <dependency>
        <groupId>com.janeluo</groupId>
        <artifactId>ikanalyzer</artifactId>
        <version>2012_u6</version>
    </dependency>
</dependencies>
Configuration file:
Utility class FriendRandomUtil:
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

public class FriendRandomUtil {
    public static void main(String[] args) throws IOException {
        // Read the student names
        List<String> studentList = FileUtils.readLines(new File(FriendRandomUtil.class.getResource("/students.txt").getPath()));
        // Create the mapping from each student to his/her friend set
        Map<String, Set<String>> friendMap = studentList.stream().collect(Collectors.toMap(e -> e, e -> new HashSet<>()));
        // Build the friendships
        for (String student : friendMap.keySet()) {
            // Pick 10-39 random friends with reservoir sampling
            List<String> sampleList = FriendRandomUtil.reservoirSampling(studentList, new Random().nextInt(30) + 10);
            // Remove the student himself/herself if sampled
            sampleList.remove(student);
            // Add the sampled friends to the student's set
            friendMap.get(student).addAll(sampleList);
            // Friendship is mutual: also add the student to each sampled friend
            for (String friend : sampleList) {
                friendMap.get(friend).add(student);
            }
        }
        // Print the friend lists, tab separated
        for (String student : friendMap.keySet()) {
            System.out.print(student + "\t");
            friendMap.get(student).forEach(e -> System.out.print(e + "\t"));
            System.out.println();
        }
    }

    public static List<String> reservoirSampling(List<String> studentList, int num) {
        // The reservoir; copy it so the original student list is not modified
        List<String> sampleList = new ArrayList<>(studentList.subList(0, num));
        Random random = new Random();
        // Scan the remaining elements
        for (int i = num; i < studentList.size(); i++) {
            // Pick a random index in [0, i]
            int r = random.nextInt(i + 1);
            if (r < num) {
                // If the index falls inside the reservoir, replace that element
                sampleList.set(r, studentList.get(i));
            }
        }
        return sampleList;
    }
}
Run the utility class to generate the test data (it prints the friend lists to the console, so redirect the output to a file such as friends.txt), then upload the file to the mrxx folder on HDFS so the job can read it as its data source when it starts.
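The upload can be done with the hdfs command line or programmatically through the HDFS Java API. The sketch below shows the API approach; the class name UploadFriendsData and the fs.defaultFS address hdfs://node01:8020 are assumptions for illustration and must be adapted to your own cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadFriendsData {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumption: replace with your own NameNode address
        conf.set("fs.defaultFS", "hdfs://node01:8020");
        try (FileSystem fs = FileSystem.get(conf)) {
            // Create the mrxx folder if it does not exist and upload the generated test data
            fs.mkdirs(new Path("/mrxx"));
            fs.copyFromLocalFile(new Path("friends.txt"), new Path("/mrxx/friends.txt"));
        }
    }
}

Either way, the data must end up at /mrxx/friends.txt, which is the input path the job reads from.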
② Overriding the map and reduce Methods
The overridden map method:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split one line of the friend list (whitespace/tab separated)
        String[] friends = value.toString().split("\\s+");
        // Emit the direct friendships with flag 0
        for (int i = 1; i < friends.length; i++) {
            context.write(new Text(sortFriendName(friends[0], friends[i])), new IntWritable(0));
        }
        // Emit the indirect (potential) friendships with flag 1
        for (int i = 1; i < friends.length; i++) {
            for (int j = i + 1; j < friends.length; j++) {
                context.write(new Text(sortFriendName(friends[i], friends[j])), new IntWritable(1));
            }
        }
    }

    private String sortFriendName(String f1, String f2) {
        // Order the two names so that A:B and B:A map to the same key
        return f1.compareTo(f2) < 0 ? f1 + ":" + f2 : f2 + ":" + f1;
    }
}
The overridden reduce method:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class FriendReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Counter for the recommendation strength (number of common friends)
        int count = 0;
        // Iterate over the flags for this pair
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            int recommendation = iterator.next().get();
            // Flag 0 means the two are already direct friends: skip this pair entirely
            if (recommendation == 0) {
                return;
            } else {
                count += recommendation;
            }
        }
        // Write the pair and its recommendation strength to HDFS
        context.write(key, new IntWritable(count));
    }
}
③ The Job Class FriendsJob
This class submits the job: it sets up the job configuration, wires in the overridden map and reduce classes, and is then simply run to submit the task.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FriendsJob {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Declare the configuration
        Configuration configuration = new Configuration();
        // Run in local mode
        configuration.set("mapreduce.framework.name", "local");
        // Create the job
        Job job = Job.getInstance(configuration);
        // Configure the job
        job.setJarByClass(FriendsJob.class);
        job.setJobName("FriendsJob" + System.currentTimeMillis());
        job.setNumReduceTasks(2);
        // Input and output paths on HDFS
        FileInputFormat.setInputPaths(job, new Path("/mrxx/friends.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/mrxx/result/" + job.getJobName()));
        // Mapper, reducer and the intermediate data types
        job.setMapperClass(FriendMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(FriendReducer.class);
        // Submit the task and wait for completion
        job.waitForCompletion(true);
    }
}
Test results:
This completes the friend recommendation job. In the output, each line contains a pair of names followed by a number; the number is the count of common friends, and a larger value means a stronger recommendation.
3. Storing the Recommendation Results in MySQL
On top of the existing project, add a Friend class and the MySQL dependency, connect to the MySQL database on the virtual machine with Navicat, and create a t_friend table as the storage target.
① The Friend Class, Dependency, and Table Structure
The Friend class:
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;

public class Friend implements Writable, DBWritable {
    private String id;
    private String person;
    private String friend;
    private Integer count;
    private Date createtime;

    // Remember to also generate the getters, setters and toString
    public Friend() {
    }

    public Friend(String id, String person, String friend, Integer count, Date createtime) {
        this.id = id;
        this.person = person;
        this.friend = friend;
        this.count = count;
        this.createtime = createtime;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.id);
        dataOutput.writeUTF(this.person);
        dataOutput.writeUTF(this.friend);
        dataOutput.writeInt(this.count);
        dataOutput.writeLong(this.createtime.getTime());
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.person = dataInput.readUTF();
        this.friend = dataInput.readUTF();
        this.count = dataInput.readInt();
        this.createtime = new Date(dataInput.readLong());
    }

    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, this.id);
        preparedStatement.setString(2, this.person);
        preparedStatement.setString(3, this.friend);
        preparedStatement.setInt(4, this.count);
        preparedStatement.setTimestamp(5, new Timestamp(this.createtime.getTime()));
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getString(1);
        this.person = resultSet.getString(2);
        this.friend = resultSet.getString(3);
        this.count = resultSet.getInt(4);
        this.createtime = resultSet.getTimestamp(5);
    }
}
t_friend table structure: the table needs the columns id, person, friend, count, and createtime, matching the fields array used in FriendJob below (string columns for id, person, and friend, an integer column for count, and a datetime/timestamp column for createtime).
pom.xml (MySQL driver dependency):
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.32</version>
</dependency>
② Modify FriendReducer and the Job Class
FriendReducer:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Date;
import java.util.UUID;

public class FriendReducer extends Reducer<Text, IntWritable, Friend, NullWritable> {
    private String jobName;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        jobName = context.getJobName();
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Counter for the recommendation strength
        int count = 0;
        // Iterate over the flags for this pair
        for (IntWritable value : values) {
            // Flag 0 means the two are already direct friends: skip this pair entirely
            if (value.get() == 0) {
                return;
            } else if (value.get() == 1) {
                count += value.get();
            }
        }
        // Build one Friend record for each direction of the recommendation
        String[] pf = key.toString().split(":");
        Friend f1 = new Friend(jobName + UUID.randomUUID().toString().substring(0, 9), pf[0], pf[1], count, new Date());
        Friend f2 = new Friend(jobName + UUID.randomUUID().toString().substring(0, 9), pf[1], pf[0], count, new Date());
        // Write the records; DBOutputFormat will insert them into MySQL
        context.write(f1, NullWritable.get());
        context.write(f2, NullWritable.get());
    }
}
The modified job class, FriendJob:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class FriendJob {
    private static String driverClass = "com.mysql.jdbc.Driver";
    private static String url = "jdbc:mysql://192.168.168.101:3306/mrxx?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true&useSSL=false";
    private static String username = "root";
    private static String password = "123456";
    private static String tableName = "t_friend";
    private static String[] fields = {"id", "person", "friend", "count", "createtime"};

    public static void main(String[] args) throws Exception {
        // Load the configuration
        Configuration configuration = new Configuration(true);
        configuration.set("mapreduce.framework.name", "local");
        // Register the database connection settings
        DBConfiguration.configureDB(configuration, driverClass, url, username, password);
        // Create the job
        Job job = Job.getInstance(configuration);
        // Configure the job
        job.setJarByClass(FriendJob.class);
        job.setNumReduceTasks(1);
        job.setJobName("FriendsJob" + System.currentTimeMillis());
        // Read from HDFS, write to the database
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("/mrxx/friends.txt"));
        DBOutputFormat.setOutput(job, tableName, fields);
        // Mapper, reducer and the data types passed between them
        job.setMapperClass(FriendMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(FriendReducer.class);
        // Submit the task
        job.waitForCompletion(true);
    }
}
With these changes in place, simply run the job class again to submit the task. The prerequisite is that MySQL on the Linux machine is already running; you can first check with Navicat whether it is reachable. Once the program finishes, the results will be stored in the MySQL database.
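If you prefer to verify the connection from code rather than through Navicat, a minimal JDBC check such as the sketch below can be used; the class name MysqlConnectionCheck is hypothetical, and the driver, URL, username, and password simply reuse the values from FriendJob, so they must match your actual MySQL setup:

import java.sql.Connection;
import java.sql.DriverManager;

public class MysqlConnectionCheck {
    public static void main(String[] args) throws Exception {
        // Same driver, URL and credentials as configured in FriendJob; adjust to your environment
        Class.forName("com.mysql.jdbc.Driver");
        String url = "jdbc:mysql://192.168.168.101:3306/mrxx?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true&useSSL=false";
        try (Connection connection = DriverManager.getConnection(url, "root", "123456")) {
            System.out.println("MySQL connection OK: " + !connection.isClosed());
        }
    }
}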
③ Run and Check the Results
All done!