上文我们讲过了常用的转换算子,本篇博客记录一下常用的行动算子,那我们开始啦。
- transformations算子是惰性算子,也就是说它的触发条件不是代码执行到这,而是需要transformations算子的结果后才执行这个算子,transformations也为我们减少了很多非必要的开销。
- Action算子是一个触发计算的算子,我们知道Spark是由很多的任务组成,而一个Action便形成了一个新的任务,也及时Task。可能有一个或者多个Task组成了我们的application。
既然都讲到这了,我们顺便讲一讲Spark的任务组成吧。前面说了可能会造成shuffle的算子,这里我们成为shuffle算子,切记shuffle算子和transformations算子、Action算子并不是一回事。每一个shuffle都将是一个stage,一个Task可能有一个或者多个stage组成,然后一个或者多个Task组成了我们的应用。
这个是一个task的DAG(有向无环图),然后如下图,多个stage组成Task,很多的Task组成一个application。我们知道stage的一般形成条件是出现shuffle算子,而Task的形成条件是Action算子,我们可以随便打开一个行动算子源码,可以看到里面均有一行大致相同的代码,sc.runJob就是触发计算的触发器,当调用sc.runJob后才开始真正的计算。
sc.runJob(this, reducePartition, mergeResult)二、Action算子 1、reduce
reduce是Spark行动算子里的聚合算子,主要作用是对RDD做聚合作用,比如求和,极值之类的。相对也比较简单。那我们先看一下使用方法:
// TODO 创建执行环境 val conf = new SparkConf().setMaster("local[*]").setAppName("create") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9)) // 求和 val i: Int = rdd1.reduce(_ + _) // output: 42 // 求最大值 val i1: Int = rdd1.reduce((x, y) => { if (x > y) { x } else y }) // output: 9
用法就是这样简单,要是想求平均的话,直接在和的基础上在除以元素个数即可。
那我们在看一下源码:
def reduce(f: (T, T) => T): T = { val cleanF = sc.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { if (iter.hasNext) { Some(iter.reduceLeft(cleanF)) //一个一个迭代,从左开始 } else { None } } var jobResult: Option[T] = None val mergeResult = (index: Int, taskResult: Option[T]) => { //当每个任务执行完后,都会返回一个这样的结果,index为分区的索引 if (taskResult.isDefined) { jobResult = jobResult match { case Some(value) => Some(f(value, taskResult.get)) //第二个分区的结果与第一个分区的一起,再用f处理 case None => taskResult //当第一个任务完成时,会进入,taskResult 为第一个分区的结果 } } } sc.runJob(this, reducePartition, mergeResult) //任务的执行 // Get the final result out of our Option, or throw an exception if the RDD was empty jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) }
通过源码,可以看到reduce先有reducePartition获取一个分区的结果的方法,然后再合并结果的方法,最后将他们一并传给 sc.runJob开始计算。有人疑问,为什么把Action的方法传入 sc.runJob就可以获取结果,前面提到有向无环图,当我们把action的方法给runJob,先去寻找该行动算子计算所需要的数据,接着再寻找这个数据的来源,又有人要问了,那怎么寻找了?看上图有向无环图,在Saprk内部维护了一个有向无环图,当我们需要这个数据时候,我们就开始一级一级的往上寻找,直至找到已存在的数据源,然后再往回一级一级的计算,直到获取最后的Action结果,有点绕,慢慢思考一下就可以了。
2、countcount算子就是获取RDD中元素个数,用法简单,先看一下用法:
val conf = new SparkConf().setMaster("local[*]").setAppName("create") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9)) val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2) val l: Long = rdd1.count() // 8
源码:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum3、collect
collect算子就是将RDD元素以scala的数据结构Array形式展示,可以直接打印:
val conf = new SparkConf().setMaster("local[*]").setAppName("create") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9)) val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2) val ints: Array[Int] = rdd1.collect() val tuples: Array[(String, Double)] = rdd.collect() println(ints.mkString(",")) // 1,2,4,5,6,7,8,9 println(tuples.mkString(",")) // (Tom,88.0),(Tom,90.0),(jack,86.0),(Toni,85.0),(jack,82.0),(Tom,93.0)
源码:
def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }
注:该算子仅在数据较少的情况下爱适用,否则全量数据加载在driver上会OOM。
4、first、takefirst算子就是将RDD元素的首个元素,take可以获取RDD指定个数元素;
val conf = new SparkConf().setMaster("local[*]").setAppName("create") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9)) val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2) val ints: Array[Int] = rdd1.take(3) val tuple: (String, Double) = rdd.first() println(ints.mkString(",")) println(tuple)
源码:
first
def first(): T = withScope { take(1) match { case Array(t) => t case _ => throw new UnsupportedOperationException("empty collection") } }
take()
def take(num: Int): Array[T] = withScope { val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2) if (num == 0) { new Array[T](0) } else { val buf = new ArrayBuffer[T] val totalParts = this.partitions.length var partsScanned = 0 while (buf.size < num && partsScanned < totalParts) { // The number of partitions to try in this iteration. It is ok for this number to be // greater than totalParts because we actually cap it at totalParts in runJob. var numPartsToTry = 1L val left = num - buf.size if (partsScanned > 0) { // If we didn't find any rows after the previous iteration, quadruple and retry. // Otherwise, interpolate the number of partitions we need to try, but overestimate // it by 50%. We also cap the estimation in the end. if (buf.isEmpty) { numPartsToTry = partsScanned * scaleUpFactor } else { // As left > 0, numPartsToTry is always >= 1 numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor) } } val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p) res.foreach(buf ++= _.take(num - buf.size)) partsScanned += p.size } buf.toArray } }
注:该算子仅在数据较少的情况下爱适用,否则全量数据加载在driver上会OOM。
5、takeSample takeSample算子随机获取指定数量的RDD元素,可以指定随机种子,以及是否放回取样。
用法:
val conf = new SparkConf().setMaster("local[*]").setAppName("create") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9)) val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2) val ints: Array[Int] = rdd1.takeSample(withReplacement = true,num = 3) println(ints.mkString(",")) // 2,1,1 val ints1: Array[Int] = rdd1.takeSample(withReplacement=true,num = 3) println(ints1.mkString(",")) // 9,2,8
withReplacement参数为true式放回取样,所以会有重复取出同一元素的可能。
源码:
def takeSample( withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] = withScope { val numStDev = 10.0 require(num >= 0, "Negative number of elements requested") require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt), "Cannot support a sample size > Int.MaxValue - " + s"$numStDev * math.sqrt(Int.MaxValue)") if (num == 0) { new Array[T](0) } else { val initialCount = this.count() if (initialCount == 0) { new Array[T](0) } else { val rand = new Random(seed) if (!withReplacement && num >= initialCount) { Utils.randomizeInPlace(this.collect(), rand) } else { val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, withReplacement) var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() // If the first sample didn't turn out large enough, keep trying to take samples; // this shouldn't happen often because we use a big multiplier for the initial size var numIters = 0 while (samples.length < num) { logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters") samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() numIters += 1 } Utils.randomizeInPlace(samples, rand).take(num) } } } }
注:该算子仅在数据较少的情况下爱适用,否则全量数据加载在driver上会OOM。
5、takeOrderedtakeOrdered算子按照排序规则返回前面的指定数量数据。该算子默认是降序,如果想要其他排序规则,可以自定义排序并继承于Ordering。
val conf = new SparkConf().setMaster("local[*]").setAppName("create") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9)) val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2) val ord = new MyOrdering() val ints: Array[Int] = rdd1.takeOrdered(num = 3)(ord) println(ints.mkString(",")) // 2,1,1 } class MyOrdering extends Ordering[Int] { override def compare(x: Int, y: Int): Int = { if (x < y) 1 else if (x == y) 0 else -1 } }
源码:
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { if (num == 0) { Array.empty } else { val mapRDDs = mapPartitions { items => // Priority keeps the largest elements, so let's reverse the ordering. val queue = new BoundedPriorityQueue[T](num)(ord.reverse) queue ++= collectionUtils.takeOrdered(items, num)(ord) Iterator.single(queue) } if (mapRDDs.partitions.length == 0) { Array.empty } else { mapRDDs.reduce { (queue1, queue2) => queue1 ++= queue2 queue1 }.toArray.sorted(ord) } } }6、countByKey
countByKey算子获取在按照键值分组下的元素数量。
val conf = new SparkConf().setMaster("local[*]").setAppName("create") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9)) val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)), 2) val stringToLong: collection.Map[String, Long] = rdd.countByKey() println(stringToLong.mkString(",")) // Tom -> 3,Toni -> 1,jack -> 2
源码:
def countByKey(): Map[K, Long] = self.withScope { self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap }
源码中,可以看到也是用到了collect()。
7、saveAsTextFile、saveAsSequenceFile、saveAsObjectFile落盘算子,这个就是直接将结果以文件的形式保存。
val conf = new SparkConf().setMaster("local[*]").setAppName("create") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9)) val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)), 2) rdd.saveAsTextFile("path")// 该路径要存在8、foreach
foreach算子重点注意下,要和我们scala的遍历元素的函数区分,spark下的foreach是作用在每一个分区,但是scala元素遍历下的foreach仅仅在一个节点上。
从执行结果上我们可以区分:
val conf = new SparkConf().setMaster("local[*]").setAppName("create") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9)) val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)), 2) rdd1.foreach(print) // 295871641 rdd1.collect().foreach(print) // 12456789
spark中的foreach直接作用在分区,也就是executor,但是元素遍历的foreach是作用在driver,所以打印的顺序是和我们样例数据是一致的,主要是因为在collect算子中将数据从executor汇总到driver已经保证了顺序。但是spark中的foreach作用于executor,数据随机分配在executor上面,没法保证顺序。
看一下Spark中的源码:
// Actions (launch a job to return a value to the user program) def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
普通遍历中的源码:
def foreach[U](f: A => U): Unit = { var i = 0 val len = length while (i < len) { f(this(i)); i += 1 } }
通过源码,体会下即可明白不同之处。
三、RDD持久化 (RDD Persistence) Spark 中最重要的功能之一是跨 *** 作在内存中持久化(或缓存)数据集。当你持久化一个 RDD 时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从它派生的数据集)的其他 *** 作中重用它们。这使得以后的算子可以更快(通常超过 10 倍)执行。缓存是迭代算法和快速交互使用的关键工具。您可以使用 persist() 或 cache() 方法将 RDD 标记为持久化。第一次在计算它时,它将保存在节点的内存中。 Spark 的缓存是容错的——如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。此外,每个持久化的 RDD 可以使用不同的存储级别存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的 Java 对象(以节省空间),跨节点复制它。这些级别是通过将 StorageLevel 对象(Scala、Java、Python)传递给 persist() 来设置的。 cache() 方法是使用默认存储级别的简写,即 StorageLevel.MEMORY_ONLY(将反序列化的对象存储在内存中)
完整的存储级别集是:
注意:在 Python 中,存储的对象将始终使用 Pickle library( Python object serialization) 进行序列化,因此是否选择序列化级别无关紧要。 Python 中可用的存储级别包括 MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY、DISK_ONLY_2 和 DISK_ONLY_3。即使没有用户调用persist,Spark也会在shuffle *** 作(例如reduceByKey)中自动持久化一些中间数据。这样做是为了避免在 shuffle 期间节点失败时重新计算整个输入。如果用户打算重用它,我们仍然建议用户在生成的 RDD 上调用持久化。
Spark 的存储级别旨在提供内存使用和 CPU 效率之间的不同权衡。我们建议通过以下过程来选择一个:
- 如果您的 RDD 与默认存储级别 (MEMORY_ONLY) 相适应,请保持原样。这是 CPU 效率最高的选项,允许 RDD 上的 *** 作尽可能快地运行。
- 如果没有,请尝试使用 MEMORY_ONLY_SER 并选择快速序列化库,以使对象更节省空间,但访问速度仍然相当快。 (Java 和 Scala)
- 尽量不要溢出到磁盘,除非计算数据集的函数很昂贵,或者它们过滤了大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。
- 如果您想要快速故障恢复(例如,如果使用 Spark 来处理来自 Web 应用程序的请求),请使用复制的存储级别。所有存储级别都通过重新计算丢失的数据来提供完全容错,但复制的存储级别让您可以继续在 RDD 上运行任务,而无需等待重新计算丢失的分区。
Spark 自动监控每个节点上的缓存使用情况,并以最近最少使用 (LRU) 的方式丢弃旧数据分区。如果你想手动删除一个 RDD 而不是等待它从缓存中掉出来,使用 RDD.unpersist() 方法。请注意,此方法默认不阻塞。要阻塞直到资源被释放,调用此方法时指定blocking=true。
关于RDD的Action和缓存,暂时讲到这里,我知道还有一些Action没有讲到,一来是因为太过于简单,类似于mean、max等算子,类似于count,直接上手用即可。二者是因为一些算子目前我还没有常用,一些坑没有踩完,但是后续会持续补充。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)