SparkSQL 前讲篇(十三)

SparkSQL 前讲篇(十三),第1张

SparkSQL 前讲篇(十三)
Spark SQL概述

  新的开始,也是新的高度,结束了SparkCore,现在我站在巨人的肩上,总结一下Spark SQL。先说一下,在SparkSQL对于算子可能不会再从源码详细的说,因为SparkCore和SparkSQL在算子上有很多相同之处,原理也是大同小异,熟悉了SparkCore的算子,那么Spark SQL算子也是相差无几,我们主要是对特殊存在的算子以及特殊的数据结构进行介绍。那,我们开始了?

一、Spark SQL 发展。

  Spark SQL主要是将数据转换为结构化的数据,然后进行处理 *** 作,spark的前身,也就是父辈是MapReduce,但是因为性能局限、使用场景,Spark揭竿而起,逐渐发展壮大。

1.1 HDFS -> Hive

  由于Hadoop在企业生产中的大量使用,HDFS上积累了大量数据,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生。Hive的原理是将SQL语句翻译成MapReduce计算。

1.2 Hive -> Shark

  但是,MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低了运行效率,为了提供SQL-on-Hadoop的效率,Shark出现了。

  Shark是伯克利AMPLab实验室Spark生态环境的组件之一,它修改了Hive中的内存管理、物理计划和执行三个模块,使得SQL语句直接运行在Spark上,从而使得SQL查询的速度得到10-100倍的提升。

1.3 Shark-> Spark SQL

  2014年6月1日,Shark项目和SparkSQL项目的主持人Reynold Xin宣布:停止对Shark的开发,团队将所有资源放sparkSQL项目上,至此,Shark的发展画上了句话。Reynold在其微博上发出了下面这段话:

为什么Shark会死呢?Databricks在其官网上给出了答案

Shark built on the Hive codebase and achieved performance improvements by swapping out the physical execution engine part of Hive. While this approach enabled Shark users to speed up their Hive queries, Shark inherited a large, complicated code base from Hive that made it hard to optimize and maintain. As we moved to push the boundary of performance optimizations and integrating sophisticated analytics with SQL, we were constrained by the legacy that was designed for MapReduce.

随着Spark的发展,Shark对于Hive的太多依赖制约了Spark的One Stack rule them all的方针,制约了Spark各个组件的相互集成,同时Shark也无法利用Spark的特性进行深度优化,所以决定放弃Shark,提出了SparkSQL项目。

随着Shark的结束,两个新的项目应运而生:SparkSQL和Hive on Spark。其中SparkSQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。
  SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步, 海阔天空”。

  • 数据兼容方面 不但兼容hive,还可以从RDD、parquet文件、JSON文件中获取数据,未来版本甚至支持获取RDBMS数据以及cassandra等NOSQL数据
  • 性能优化方面 除了采取In-Memory Columnar Storage、byte-code generation等优化技术外、将会引进Cost Model对查询进行动态评估、获取最佳物理计划等等
  • 组件扩展方面 无论是SQL的语法解析器、分析器还是优化器都可以重新定义,进行扩展。
二 、Spark SQL 的性能。

   摆脱了Hive限制的SparkSQL的性能在Shark的基础上又有了长足的进步。

SparkSQL主要在下面几点做了性能上的优化:

2.1、 内存列存储(In-Memory Columnar Storage)

SparkSQL继承了Shark的内存列存储,内存列存储有诸多好处:

  • GC友好:行存储的情况下每一行会产出一个java对象,而列存储每一列才会产生一个对象,在大数据的情况下,行存储会对GC产生巨大的压力。
  • 压缩友好:相同数据类型的数据在内存中存放在一起,有利于压缩。
  • Cache友好:分析查询中频繁使用的聚合特定列,性能会得到很大的提高,原因就是这些列的数据放在一起,更容易读入内存进行计算。

SparkSQL1.2中把每个列又分成多个batch,这样就可以避免在加载large table的时候出现OOM。

2.2、 SQL

在数据库查询中有一个昂贵的 *** 作是查询语句中的表达式,主要是由于JVM的内存模型引起的。比如如下一个查询:

SELECt a + b FROM table

在这个查询里,如果采用通用的SQL语法途径去处理,会先生成一个表达式树(有两个节点的Add树),在物理处理这个表达式树的时候,将会如图所示的7个步骤:

  • 调用虚函数Add.eval(),需要确认Add两边的数据类型。
  • 调用虚函数a.eval(),需要确认a的数据类型。
  • 确定a的数据类型是Int,装箱。
  • 调用虚函数b.eval(),需要确认b的数据类型。
  • 确定b的数据类型是Int,装箱。
  • 调用Int类型的Add。
  • 返回装箱后的计算结果 其中多次涉及到虚函数的调用,虚函数的调用会打断CPU的正常流水线处理,减缓执行。

Spark1.1.0在catalyst模块的expressions增加了codegen模块,如果使用动态字节码生成技术(配置spark.sql.codegen参数),sparkSQL在执行物理计划的时候,对匹配的表达式采用特定的代码,动态编译,然后运行。
如上图中的例子,开启CG后,SparkSQL最终实现效果类似如下伪代码:

val a: Int = inputRow.getInt(0)
val b: Int = inputRow.getInt(1)
val result: Int = a + b
resultRow.setInt(0, result)

CG优化的实现主要还是依靠scala2.10的reflection和Quasiquotes。CG的性能对比如下图:

2.3 外部数据源Predicate pushdown

  SparkSQL1.2.0可以在读取外部数据以后马上进行filter *** 作,以减少网络传输的数据量;对于Parquet和ORC类型的数据,SparkSQL甚至可以在读取数据的时候就进行某些filter *** 作,以减少磁盘IO。


  主要有两种过滤数据的方式,分别为Column Pruning和Predicte pushdown,分别是针对列和行的过滤。

2.4 Scala代码优化

  SparkSQL在使用Scala编写代码的时候,尽量避免低效的、容易GC的代码;尽管增加了编写代码的难度,但对于用户来说,还是使用统一的接口,没受到使用上的困难。下图是一个scala代码优化的示意图:

三、Spark SQL架构

3.1 Spark SQL特点及作用

特点:

  1. 数据兼容:不仅兼容Hive,还可以从RDD、parquet文件、Json文件获取数据、支持从RDBMS获取数据。
  2. 性能优化:采用内存列式存储、自定义序列化器等方式提升性能。
  3. 组件扩展:SQL的语法解析器、分析器、优化器都可以重新定义和扩展。
  4. 兼容: Hive兼容层面仅依赖HiveQL解析、Hive元数据。
    从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了,Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。
    支持: 数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据;
    Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范。

作用:

  1. Spark 中用于处理结构化数据的模块。
  2. 相对于RDD的API来说,提供更多结构化数据信息和计算方法。
  3. 可以通过SQL或DataSet API方式同Spark SQL进行交互。
3.1 Spark SQL数据结构以及发展
RDD------------------->Dataframe-------------------->Dataset
0.0            		     1.3        			       1.6

3.1.1 RDD

优点:

  1. 编译时类型安全,编译时就能检查出类型错误。
  2. 面向对象的编程风格,直接通过class.name的方式来 *** 作数据;idAge.filter(.age > “”) // 编译时报错, int不能跟与String比较
    idAgeRDDPerson.filter(.age > 25) // 直接 *** 作一个个的person对象。

缺点:

  1. 序列化和反序列化的性能开销,无论是集群间的通信, 还是IO *** 作都需要对对象的结构和数据进行序列化反序列化。
  2. GC的性能开销,频繁的创建和销毁对象, 势必会增加GC。
3.1.2 Dataframe

优点:

  1. off-heap类似于地盘, schema类似于地图, 有自己地盘了, 不再受JVM的限制, 也就不再受GC的困扰了。
  2. 通过schema和off-heap, Dataframe克服了RDD的缺点。对比RDD提升计算效率、减少数据读取、底层计算优化。

缺点:

  1. Dataframe克服了RDD的缺点,但是却丢了RDD的优点。
  2. Dataframe不是类型安全的,API也不是面向对象风格的。

核心特征

  1. Dataframe的前身是SchemaRDD,不继承RDD,自己实现了RDD的大部分功能,在Dataframe上调用RDD的方法转化成另外一个RDD。
  2. Dataframe可以看做分布式Row对象的集合,Dataframe 不仅有比RDD更多的算子,还可以进行执行计划的优化。
  3. Dataframe表示为DataSet[Row],即DataSet的子集。
  4. Row :被 Dataframe 自动实现,一行就是一个Row对象。
  5. Schema :包含了以ROW为单位的每行数据的列的信息; Spark通过Schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。
  6. off-heap : Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要 *** 作数据时, 就直接 *** 作off-heap内存。
  7. Tungsten:新的执行引擎。
  8. Catalyst:新的语法解析框架。
3.1.3 Dataset

DataSet不同于RDD,没有使用Java序列化器或者Kryo进行序列化,而是使用一个特定的编码器进行序列化,这些序列化器可以自动生成,而且在spark执行很多 *** 作(过滤、排序、hash)的时候不用进行反序列化。

3.3.2 特点

  1. 编译时的类型安全检查,性能极大的提升,内存使用极大降低、减少GC、极大的减少网络数据的传输,极大的减少scala和java之间代码的差异性。
  2. Dataframe每一个行对应了一个Row。而Dataset的定义更加宽松,每一个record对应了一个任意的类型。Dataframe只是Dataset的一种特例。
  3. 不同于Row是一个泛化的无类型JVM object, Dataset是由一系列的强类型JVM object组成的,Scala的case class或者Java class定义。因此Dataset可以在编译时进行类型检查。
  4. Dataset以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等 *** 作。
  5. Dataset创立需要一个显式的Encoder,把对象序列化为二进制。
3.2 RDD、Dataframe、DataSet 3.2.1、三者关系以及转换

3.2.2、共性
  1. RDD、Dataframe、Dataset都是spark平台下的分布式d性数据集,为处理超大型数据提供便利;
  2. 三者都有惰性机制。在进行创建、转换时(如map方法),不会立即执行;只有在遇到Action时(如foreach) ,才会开始遍历运算。极端情况下,如果代码里面仅有创建、转换,但后面没有在Action中使用对应的结果,在执行时会被直接跳过;
  3. 三者都有partition的概念,进行缓存(cache) *** 作、还可以进行检查点(checkpoint) *** 作;
  4. 三者有许多相似的函数,如map、filter,排序等;
  5. 在对Dataframe和Dataset进行 *** 作时,很多情况下需要 spark.implicits._ 进行支持;
3.2.2、区别

Dataframe: Dataframe(Dataframe 是 Dataset[Row]的别名):Dataframe = RDD[Row] + schema

  1. 与RDD和Dataset不同,Dataframe每一行的类型固定为Row,只有通过解析才能获取各个字段的值;
  2. Dataframe与Dataset一般与spark ml同时使用;
  3. Dataframe与Dataset均支持sparksql的 *** 作,比如select,groupBy之类,还能注册临时视图,进行sql语句 *** 作;
  4. Dataframe与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然;

Dataset: Dataset = RDD[case class].toDS

  1. Dataset和Dataframe拥有完全相同的成员函数,区别只是每一行的数据类型不同;
  2. Dataframe 定义为 Dataset[Row]。每一行的类型是Row,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用前面提到的getAS方法或者模式匹配拿出特定字段;
  3. Dataset每一行的类型都是一个case class,在自定义了case class之后可以很自由的获得每一行的信息;

好啦,关于本篇文章,基本都是来源于copy,但是我们都是站在巨人的肩膀上。加油!


欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5687005.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存