Spark统计每天新增用户

Spark统计每天新增用户,第1张

Spark统计每天新增用户

题目要求对已给出的数据使用Spark统计每天的新增的用户。

数据如下:

我的思路是:先对数据使用sortByKey算子按照日期进行排序,然后将<日期,用户名>的键值对,键和值的位置互换,即变成<用户名,日期>的键值对,然后使用reduceByKey只保留每个用户名出现的第一条数据(因为一开始已经按照日期对数据进行排序)。最后将<用户名,日期>的键值对互换回去,然后使用groupByKey按照日期进行分组,最后再用sortByKey按照日期排序再输出即可。

代码如下

		SparkConf sparkconf = new SparkConf().
				setAppName(" ").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(sparkconf);
		JavaRDD lines = sc.textFile("文件路径");//原始数据文件路径

		JavaPairRDDfirstClean = lines.mapToPair(f->{
			return new Tuple2<>(f.split(" ")[0],f.split(" ")[1]);
			});//创建键值对
		
		JavaPairRDD rdd = firstClean.sortByKey();//按照日期排序
		
		JavaPairRDDrdd2 = rdd.mapToPair(f->{
			return new Tuple2<>(f._2,f._1);
			});//键值对位置互换生成新的键值对
		
		JavaPairRDD rdd3 = rdd2.reduceByKey((x,y)-> x);//保留用户名出现的第一条数据
		
		JavaPairRDDrdd4 = rdd3.mapToPair(f->{
			return new Tuple2<>(f._2,f._1);
			});//键值对位置互换
	        JavaPairRDD> rdd5 = rdd4.groupByKey();//按照日期进行分组
		JavaPairRDD> rdd6 = rdd5.sortByKey();//按照日期排序
                rdd6.foreach(x -> System.out.println(x));//输出

数据变化过程:

同时我也自己生成了数据进行一个数据清洗的过程,最后我没有使用那个数据,但是大致流程也是一样的。

数据生成代码如下:
 

public class GenerateData{
	public static void main(String[] args) throws IOException {
		// TODO Auto-generated method stub

		Random r = new Random();
		FileWriter fw = new FileWriter("生成的数据存放的位置及其名称");
		int x = r.nextInt(1100000)-100000;//生成的数据范围
		for(int y = 1;y <= x;y++) {
			int year = r.nextInt(7)+2015;//年份15-21
			int month = r.nextInt(13);//月份0-12
			int day = r.nextInt(32);//日期0-31
			int n = r.nextInt(10);//用户名长度0-10
			String username = getRandomString(n);
			System.out.println(username);
			fw.write(y + " " + year + "-"  + month + "-" + day + " "+ username + "rn" );
			fw.flush();//刷新缓冲区
			}
		}
        
	 public static String getRandomString(int length){
	     String str="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
	     Random random=new Random();
	     StringBuffer sb=new StringBuffer();
	     for(int i=0;i 

生成数据如下:

因为前面生成数据时不够严谨,生成的数据有些问题。比如用户名缺失,0月份,0日期,每个月都有31日这种问题,所以只有用Spark来数据清洗了。

代码如下:

		SparkConf sparkconf = new SparkConf().
				setAppName(" ").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(sparkconf);
		JavaRDD lines = sc.textFile("数据路径");

		JavaRDD firstClean1 = lines.filter(f->{
			return Findnull(f);
		});//过滤掉用户名缺失的数据
		JavaPairRDDfirstClean2 = firstClean1.mapToPair(f->{
			return new Tuple2<>(f.split(" ")[1],f.split(" ")[2]);
			});
	    long counts = firstClean2.count();//统计还剩多少条数据
		JavaPairRDD,Integer>, String> secondClean1 = firstClean2.mapToPair( f -> {
			return new Tuple2,Integer>, String>(
					new Tuple2,Integer>(
							new Tuple2(Integer.valueOf(f._1.split("-")[1]),Integer.valueOf(f._1.split("-")[2])),
							Integer.valueOf(f._1.split("-")[0])),f._2);
		});//键值对疯狂套娃hhh,变成<<<月,日>,年>,用户名>	形式的键值对,这样就能对日期比较好 *** 作 	
		JavaPairRDD,Integer>, String> secondClean2 =
				secondClean1.filter(f ->{
					if(f._1._1._1() != 0) { 
						if(f._1._1._2() != 0) { 
							return true;
							}
						}
					return false;
				});//过滤调月份和日期为0的数据
		JavaPairRDD,Integer>, String> secondClean3 =
				secondClean2.filter(f -> {
					if (f._1._1._1()<=7) { //月份小于等于7时
						if (f._1._1._1()%2==0 && f._1._1._1() !=2 ){  
							if (f._1._1._2() != 31) { //过滤掉偶数月的31日
								return true;
								}
							}
						else if(f._1._1._1() == 2 ){ //月份为2时
							if(f._1._2()%4==0&&f._1._2()%100!=0||f._1._2()%400==0){ //根据闰年平年判断2月是否保留29日
								if (f._1._1._2() != 30 && f._1._1._2() != 31 && f._1._1._2() != 0) {
									return true;
									}
								}else{
									if (f._1._1._2() != 30 && f._1._1._2() != 31 && f._1._1._2() != 29 && f._1._1._2() != 0) {
										return true;
										}
									}
							}else{
								return true;
								}
						}else{ //月份大于7时
							if (f._1._1._1()%2==1){
								if (f._1._1._2() != 31 ) { //过滤奇数月的31日
									return true;
									}
								}else{
									return true;
									}
							}
					return false;
				});
	    long counts2 = secondClean3.count();

过滤第三列为空值的数据方法:

public static Boolean Findnull(String a) throws Exception {
		// TODO Auto-generated method stub
		String []arr = a.split(" ");
		String x = arr[0];
		String y = arr[1];
		try{
			String z = arr[2];
		}
		catch(Exception e){
			return false;
		}		
		return true;
	}

闲着无聊又写了一个数据查询的代码,当然要把数据套娃成<<<月,日>,年>,用户名>才可以用。(这一段代码本来不是接在刚才的代码上的,所以参数名有点不一样,将其修改即可)

		Scanner A = new Scanner(System.in);
		System.out.println("请输入需要查询的日期:");
		System.out.println("请输入年份:");
		int year = A.nextInt();
		System.out.println("请输入月份:");
		int month = A.nextInt();
		System.out.println("请输入日期:");
		int day = A.nextInt();
		
		JavaPairRDD,Integer>, Iterable> secondClean3 = 
	    		secondClean2.filter(f -> {
	    			if (month<13&&month>0) {
	    				if(day<32&&day>0) {
	    					if (f._1._2()==year && f._1._1._1()==month && f._1._1._2()==day) {
	    						return true;
	    						}else {
	    							return false;
	    						}
	    					}else{
		    					if (f._1._2()==year && f._1._1._1()==month) {
		    						return true;
		    						}else {
		    							return false;
		    							}
		    					}
	    				}else{
	    					if (f._1._2()==year) {
	    						return true;
	    						}else {
	    							return false;
	    						}
	    					}
	    		});
		secondClean3.foreach(x -> System.out.println(x));
		System.out.println("是否需要保存?(保存请输入“1”)");
		int x = A.nextInt();
		if (x == 1) {
			System.out.println("请输入新建的文件夹名:");
			String add = A.next();
			
			secondClean3.saveAsTextFile("""+add+""");
		}
	}
}

不足之处欢迎指正。hhh

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5690683.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存