题目要求对已给出的数据使用Spark统计每天的新增的用户。
数据如下:
我的思路是:先对数据使用sortByKey算子按照日期进行排序,然后将<日期,用户名>的键值对,键和值的位置互换,即变成<用户名,日期>的键值对,然后使用reduceByKey只保留每个用户名出现的第一条数据(因为一开始已经按照日期对数据进行排序)。最后将<用户名,日期>的键值对互换回去,然后使用groupByKey按照日期进行分组,最后再用sortByKey按照日期排序再输出即可。
代码如下
SparkConf sparkconf = new SparkConf(). setAppName(" ").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkconf); JavaRDDlines = sc.textFile("文件路径");//原始数据文件路径 JavaPairRDD firstClean = lines.mapToPair(f->{ return new Tuple2<>(f.split(" ")[0],f.split(" ")[1]); });//创建键值对 JavaPairRDD rdd = firstClean.sortByKey();//按照日期排序 JavaPairRDD rdd2 = rdd.mapToPair(f->{ return new Tuple2<>(f._2,f._1); });//键值对位置互换生成新的键值对 JavaPairRDD rdd3 = rdd2.reduceByKey((x,y)-> x);//保留用户名出现的第一条数据 JavaPairRDD rdd4 = rdd3.mapToPair(f->{ return new Tuple2<>(f._2,f._1); });//键值对位置互换 JavaPairRDD > rdd5 = rdd4.groupByKey();//按照日期进行分组 JavaPairRDD > rdd6 = rdd5.sortByKey();//按照日期排序 rdd6.foreach(x -> System.out.println(x));//输出
数据变化过程:
同时我也自己生成了数据进行一个数据清洗的过程,最后我没有使用那个数据,但是大致流程也是一样的。
数据生成代码如下:
public class GenerateData{ public static void main(String[] args) throws IOException { // TODO Auto-generated method stub Random r = new Random(); FileWriter fw = new FileWriter("生成的数据存放的位置及其名称"); int x = r.nextInt(1100000)-100000;//生成的数据范围 for(int y = 1;y <= x;y++) { int year = r.nextInt(7)+2015;//年份15-21 int month = r.nextInt(13);//月份0-12 int day = r.nextInt(32);//日期0-31 int n = r.nextInt(10);//用户名长度0-10 String username = getRandomString(n); System.out.println(username); fw.write(y + " " + year + "-" + month + "-" + day + " "+ username + "rn" ); fw.flush();//刷新缓冲区 } } public static String getRandomString(int length){ String str="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; Random random=new Random(); StringBuffer sb=new StringBuffer(); for(int i=0;i生成数据如下:
因为前面生成数据时不够严谨,生成的数据有些问题。比如用户名缺失,0月份,0日期,每个月都有31日这种问题,所以只有用Spark来数据清洗了。
代码如下:
SparkConf sparkconf = new SparkConf(). setAppName(" ").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkconf); JavaRDDlines = sc.textFile("数据路径"); JavaRDD firstClean1 = lines.filter(f->{ return Findnull(f); });//过滤掉用户名缺失的数据 JavaPairRDD firstClean2 = firstClean1.mapToPair(f->{ return new Tuple2<>(f.split(" ")[1],f.split(" ")[2]); }); long counts = firstClean2.count();//统计还剩多少条数据 JavaPairRDD ,Integer>, String> secondClean1 = firstClean2.mapToPair( f -> { return new Tuple2 ,Integer>, String>( new Tuple2 ,Integer>( new Tuple2 (Integer.valueOf(f._1.split("-")[1]),Integer.valueOf(f._1.split("-")[2])), Integer.valueOf(f._1.split("-")[0])),f._2); });//键值对疯狂套娃hhh,变成<<<月,日>,年>,用户名> 形式的键值对,这样就能对日期比较好 *** 作 JavaPairRDD ,Integer>, String> secondClean2 = secondClean1.filter(f ->{ if(f._1._1._1() != 0) { if(f._1._1._2() != 0) { return true; } } return false; });//过滤调月份和日期为0的数据 JavaPairRDD ,Integer>, String> secondClean3 = secondClean2.filter(f -> { if (f._1._1._1()<=7) { //月份小于等于7时 if (f._1._1._1()%2==0 && f._1._1._1() !=2 ){ if (f._1._1._2() != 31) { //过滤掉偶数月的31日 return true; } } else if(f._1._1._1() == 2 ){ //月份为2时 if(f._1._2()%4==0&&f._1._2()%100!=0||f._1._2()%400==0){ //根据闰年平年判断2月是否保留29日 if (f._1._1._2() != 30 && f._1._1._2() != 31 && f._1._1._2() != 0) { return true; } }else{ if (f._1._1._2() != 30 && f._1._1._2() != 31 && f._1._1._2() != 29 && f._1._1._2() != 0) { return true; } } }else{ return true; } }else{ //月份大于7时 if (f._1._1._1()%2==1){ if (f._1._1._2() != 31 ) { //过滤奇数月的31日 return true; } }else{ return true; } } return false; }); long counts2 = secondClean3.count(); 过滤第三列为空值的数据方法:
public static Boolean Findnull(String a) throws Exception { // TODO Auto-generated method stub String []arr = a.split(" "); String x = arr[0]; String y = arr[1]; try{ String z = arr[2]; } catch(Exception e){ return false; } return true; }闲着无聊又写了一个数据查询的代码,当然要把数据套娃成<<<月,日>,年>,用户名>才可以用。(这一段代码本来不是接在刚才的代码上的,所以参数名有点不一样,将其修改即可)
Scanner A = new Scanner(System.in); System.out.println("请输入需要查询的日期:"); System.out.println("请输入年份:"); int year = A.nextInt(); System.out.println("请输入月份:"); int month = A.nextInt(); System.out.println("请输入日期:"); int day = A.nextInt(); JavaPairRDD,Integer>, Iterable > secondClean3 = secondClean2.filter(f -> { if (month<13&&month>0) { if(day<32&&day>0) { if (f._1._2()==year && f._1._1._1()==month && f._1._1._2()==day) { return true; }else { return false; } }else{ if (f._1._2()==year && f._1._1._1()==month) { return true; }else { return false; } } }else{ if (f._1._2()==year) { return true; }else { return false; } } }); secondClean3.foreach(x -> System.out.println(x)); System.out.println("是否需要保存?(保存请输入“1”)"); int x = A.nextInt(); if (x == 1) { System.out.println("请输入新建的文件夹名:"); String add = A.next(); secondClean3.saveAsTextFile("""+add+"""); } } } 不足之处欢迎指正。hhh
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)