如何将hive的结果写入mysql

如何将hive的结果写入mysql,第1张

大部分利用hive做数据分析的步骤是先用hive将统计结果导出到本地文件或者Hive的其他表中,再将本地文件导入到mysql或者利用sqoop将Hive表导入到mysql中。

今天同事给推荐了一个利用udf函数直接将统计结果导入mysql的方法。

步骤为

hive>add jar /usr/lib/hive/lib/hive-contrib-0.9.0-cdh4.1.2.jar

Added /usr/lib/hive/lib/hive-contrib-0.9.0-cdh4.1.2.jar to class path

Added resource: /usr/lib/hive/lib/hive-contrib-0.9.0-cdh4.1.2.jar

hive>add jar /usr/share/java/mysql-connector-java-5.1.17.jar

Added /usr/share/java/mysql-connector-java-5.1.17.jar to class path

Added resource: /usr/share/java/mysql-connector-java-5.1.17.jar

hive>CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput'

hive>select dboutput('jdbc:mysql://localhost/result','root','123456','INSERT INTO dc(code,size) VALUES (?,?)',code,size) from accesslog limit 10

注:result为mysql数据库名,dc为数据库result中的表名 dc(code,size)括号中的字段为mysql表dc字段,values(?,?)对应hive统计结果的值 后面的code,size为hive表中的字段,accesslog表示hive中的表名称。

通过以上步骤即可将hive统计结果直接导入到mysql数据库中。

一、启动方法

/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径

/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项

--master MASTER_URL 指定master url

--executor-memory MEM 每个executor的内存,默认为1G

--total-executor-cores NUM 所有executor的总核数

-e <quoted-query-string>直接执行查询SQL

-f <filename>以文件方式批量执行SQL

二、Spark sql对hive支持的功能

1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY

2、hive *** 作运算:

1) 关系运算:= ==, <>, <, >, >=, <=

2) 算术运算:+, -, *, /, %

3) 逻辑运算:AND, &&, OR, ||

4) 复杂的数据结构

5) 数学函数:(sign, ln, cos, etc)

6) 字符串函数:

3、 UDF

4、 UDAF

5、 用户定义的序列化格式

6、join *** 作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN

7、 unions *** 作:

8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2

9、Sampling

10、 Explain

11、 分区表

12、 视图

13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE

14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT

三、Spark sql 在客户端编程方式进行查询数据

1、启动spark-shell

./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

2、编写程序

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("../examples/src/main/resources/people.json")

查看所有数据:df.show()

查看表结构:df.printSchema()

只看name列:df.select("name").show()

对数据运算:df.select(df("name"), df("age") + 1).show()

过滤数据:df.filter(df("age") >21).show()

分组统计:df.groupBy("age").count().show()

1、查询txt数据

import sqlContext.implicits._

case class Person(name: String, age: Int)

val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p =>Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")

val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

2、parquet文件

val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

3、hdfs文件

val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")

4、保存查询结果数据

val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)

四、Spark sql性能调优

缓存数据表:sqlContext.cacheTable("tableName")

取消缓存表:sqlContext.uncacheTable("tableName")

spark.sql.inMemoryColumnarStorage.compressedtrue 当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。

spark.sql.inMemoryColumnarStorage.batchSize 10000 柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险

本次实战的数据来自于"YouTube视频统计与社交网络"的数据集,是西蒙弗雷泽大学计算机学院在2008年所爬取的数据

数据集地址

数据之间采用"\t"作为分隔符

具体数据如下:

数据量大小为1G,条数为500万+

使用环境为

hive-1.1.0-cdh5.4.5

hadoop-2.6.0-cdh5.4.5

演示形式为使用hive shell

我们一起来看看数据

主要的问题在于category和relatedIDs处理,由于Hive是支持array格式的,所以我们想到的是使用array来存储category和relatedIDs,但是我们发现category的分割符是"&"而realatedIDs的分隔符是"\t",我们在创建表格的时候能够指定array的分隔符,但是只能指定一个,所以再将数据导入到Hive表格之前我们需要对数据进行一定转换和清洗

并且数据中肯定会存在一些不完整数据和一些奇怪的格式,所以数据的清洗是必要的,我在这里所使用的数据清洗方式是使用Spark进行清洗,也可以使用自定义UDF函数来进行清洗

数据清洗注意点

1)我们可以看到每行数据以"\t"作为分隔符,每行有十列数据,最后一列关联ID可以为空,那么我们对数据进行split之后数组的大小要大于8

2)数据中存在 "uNiKXDA8eyQ KRQE 1035 News &ampPolitics 107" 这样格式的数据,所以在处理category时需要注意 News &Politics中间的 &amp

处理后的数据如下:

下面的实战都是基于数据清洗后的数据进行的

1)youtube1的创建,文件格式为textfile

create table youtube1(videoId string, uploader string, age int, category array<string>, length int, views int, rate float, ratings int, comments int,relatedId array<string>)

row format delimited

fields terminated by "\t"

collection items terminated by "&"

stored as textfile

2)youtube2的创建,文件格式为orc

create table youtube2(videoId string, uploader string, age int, category array<string>, length int, views int, rate float, ratings int, comments int,relatedId array<string>)

row format delimited

fields terminated by "\t"

collection items terminated by "&"

stored as orc

3)youtube3的创建,文件格式为orc,进行桶分区

create table youtube3(videoId string, uploader string, age int, category array<string>, length int, views int, rate float, ratings int, comments int,relatedId array<string>)

clustered by (uploader) into 8 buckets

row format delimited

fields terminated by "\t"

collection items terminated by "&"

stored as orc

数据导入:

1)load data inpath "path" into table youtube1

2)由于无法将textfile格式的数据导入到orc格式的表格,所以数据需要从youtube1导入到youtube2和youtube3:

insert into table youtube2 select * from youtube1

insert into table youtube3 select * from youtube1

1)user_tmp的创建,文件格式textfile,24buckets

create table user_tmp(uploader string,videos int,friends int)

clustered by (uploader) into 24 buckets

row format delimited

fields terminated by "\t"

stored as textfile

2)user的创建,文件格式orc,24buckets

create table user(uploader string,videos int,friends int)

clustered by (uploader) into 24 buckets

row format delimited

fields terminated by "\t"

stored as orc

user表的数据导入也是同理

数据导入:

1)load data inpath "path" into table user_tmp

2)由于无法将textfile格式的数据导入到orc格式的表格,所以数据需要从user_tmp导入到user:

insert into table user select * from user_tmp

1)统计出观看数最多的10个视频

2)统计出视频类别热度的前10个类型

3)统计出视频观看数最高的50个视频的所属类别

4)统计出观看数最多的前N个视频所关联的视频的所属类别排行

5)筛选出每个类别中热度最高的前10个视频

6)筛选出每个类别中评分最高的前10个视频

7)找出用户中上传视频最多的10个用户的所有视频

8)筛选出每个类别中观看数Top10

select * from youtube3 order by views desc limit 10

结果如下:

select tagId, count(a.videoid) as sum from (select videoid,tagId from youtube3 lateral view explode(category) catetory as tagId) a group by a.tagId order by sum desc limit 10

结果:

select tagId, count(a.videoid) as sum from (select videoid,tagId from (select * from youtube3 order by views desc limit 20) e lateral view explode(category) catetory as tagId) a group by a.tagId order by sum desc

结果:

思路:

结果:

思路:

结果如下:

select * from youtube_category where categoryId="Music" order by ratings desc limit 10

结果如下:

思路:

结果如下:


欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/bake/11609648.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-17
下一篇 2023-05-17

发表评论

登录后才能评论

评论列表(0条)

保存