Spark-core 行动算子(十)

Spark-core 行动算子(十),第1张

Spark-core 行动算子(十)
Action 算子详解


  上文我们讲过了常用的转换算子,本篇博客记录一下常用的行动算子,那我们开始啦。

一、Action算子和transformations算子的区别
  • transformations算子是惰性算子,也就是说它的触发条件不是代码执行到这,而是需要transformations算子的结果后才执行这个算子,transformations也为我们减少了很多非必要的开销。
  • Action算子是一个触发计算的算子,我们知道Spark是由很多的任务组成,而一个Action便形成了一个新的任务,也及时Task。可能有一个或者多个Task组成了我们的application。

既然都讲到这了,我们顺便讲一讲Spark的任务组成吧。前面说了可能会造成shuffle的算子,这里我们成为shuffle算子,切记shuffle算子和transformations算子、Action算子并不是一回事。每一个shuffle都将是一个stage,一个Task可能有一个或者多个stage组成,然后一个或者多个Task组成了我们的应用。
这个是一个task的DAG(有向无环图),然后如下图,多个stage组成Task,很多的Task组成一个application。我们知道stage的一般形成条件是出现shuffle算子,而Task的形成条件是Action算子,我们可以随便打开一个行动算子源码,可以看到里面均有一行大致相同的代码,sc.runJob就是触发计算的触发器,当调用sc.runJob后才开始真正的计算。

sc.runJob(this, reducePartition, mergeResult)

二、Action算子 1、reduce

  reduce是Spark行动算子里的聚合算子,主要作用是对RDD做聚合作用,比如求和,极值之类的。相对也比较简单。那我们先看一下使用方法:

    // TODO 创建执行环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9))
    // 求和
    val i: Int = rdd1.reduce(_ + _) // output: 42
    // 求最大值
    val i1: Int = rdd1.reduce((x, y) => {
      if (x > y) {
        x
      }
      else
        y
    }) // output: 9

用法就是这样简单,要是想求平均的话,直接在和的基础上在除以元素个数即可。
那我们在看一下源码:

  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))   //一个一个迭代,从左开始
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {  //当每个任务执行完后,都会返回一个这样的结果,index为分区的索引
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))   //第二个分区的结果与第一个分区的一起,再用f处理
          case None => taskResult  //当第一个任务完成时,会进入,taskResult 为第一个分区的结果
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)   //任务的执行
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

通过源码,可以看到reduce先有reducePartition获取一个分区的结果的方法,然后再合并结果的方法,最后将他们一并传给 sc.runJob开始计算。有人疑问,为什么把Action的方法传入 sc.runJob就可以获取结果,前面提到有向无环图,当我们把action的方法给runJob,先去寻找该行动算子计算所需要的数据,接着再寻找这个数据的来源,又有人要问了,那怎么寻找了?看上图有向无环图,在Saprk内部维护了一个有向无环图,当我们需要这个数据时候,我们就开始一级一级的往上寻找,直至找到已存在的数据源,然后再往回一级一级的计算,直到获取最后的Action结果,有点绕,慢慢思考一下就可以了。

2、count

  count算子就是获取RDD中元素个数,用法简单,先看一下用法:

    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9))
    val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2)
    val l: Long = rdd1.count() // 8

源码:

  
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
3、collect

  collect算子就是将RDD元素以scala的数据结构Array形式展示,可以直接打印:

    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9))
    val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2)
    val ints: Array[Int] = rdd1.collect()
    val tuples: Array[(String, Double)] = rdd.collect()
    println(ints.mkString(",")) // 1,2,4,5,6,7,8,9
    println(tuples.mkString(",")) // (Tom,88.0),(Tom,90.0),(jack,86.0),(Toni,85.0),(jack,82.0),(Tom,93.0)

源码:

  
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

注:该算子仅在数据较少的情况下爱适用,否则全量数据加载在driver上会OOM。

4、first、take

  first算子就是将RDD元素的首个元素,take可以获取RDD指定个数元素;

    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9))
    val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2)
    val ints: Array[Int] = rdd1.take(3)
    val tuple: (String, Double) = rdd.first()
    println(ints.mkString(","))
    println(tuple)

源码:
first

  
  def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
  }

take()

  
  def take(num: Int): Array[T] = withScope {
    val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        val left = num - buf.size
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.isEmpty) {
            numPartsToTry = partsScanned * scaleUpFactor
          } else {
            // As left > 0, numPartsToTry is always >= 1
            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
          }
        }

        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += p.size
      }

      buf.toArray
    }
  }

注:该算子仅在数据较少的情况下爱适用,否则全量数据加载在driver上会OOM。

5、takeSample

  takeSample算子随机获取指定数量的RDD元素,可以指定随机种子,以及是否放回取样。
用法:

    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9))
    val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2)
    val ints: Array[Int] = rdd1.takeSample(withReplacement = true,num = 3)
    println(ints.mkString(",")) // 2,1,1
    val ints1: Array[Int] = rdd1.takeSample(withReplacement=true,num = 3)
    println(ints1.mkString(",")) // 9,2,8

withReplacement参数为true式放回取样,所以会有重复取出同一元素的可能。
源码:

  
  def takeSample(
      withReplacement: Boolean,
      num: Int,
      seed: Long = Utils.random.nextLong): Array[T] = withScope {
    val numStDev = 10.0

    require(num >= 0, "Negative number of elements requested")
    require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
      "Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")

    if (num == 0) {
      new Array[T](0)
    } else {
      val initialCount = this.count()
      if (initialCount == 0) {
        new Array[T](0)
      } else {
        val rand = new Random(seed)
        if (!withReplacement && num >= initialCount) {
          Utils.randomizeInPlace(this.collect(), rand)
        } else {
          val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
            withReplacement)
          var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

          // If the first sample didn't turn out large enough, keep trying to take samples;
          // this shouldn't happen often because we use a big multiplier for the initial size
          var numIters = 0
          while (samples.length < num) {
            logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
            samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
            numIters += 1
          }
          Utils.randomizeInPlace(samples, rand).take(num)
        }
      }
    }
  }

注:该算子仅在数据较少的情况下爱适用,否则全量数据加载在driver上会OOM。

5、takeOrdered

  takeOrdered算子按照排序规则返回前面的指定数量数据。该算子默认是降序,如果想要其他排序规则,可以自定义排序并继承于Ordering。

    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9))
    val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)),2)
    val ord = new MyOrdering()
    val ints: Array[Int] = rdd1.takeOrdered(num = 3)(ord)
    println(ints.mkString(",")) // 2,1,1
  }
  class MyOrdering extends Ordering[Int] {
    override def compare(x: Int, y: Int): Int = {
      if (x < y) 1 else if (x == y) 0 else -1
    }
  }

源码:

  
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0) {
      Array.empty
    } else {
      val mapRDDs = mapPartitions { items =>
        // Priority keeps the largest elements, so let's reverse the ordering.
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= collectionUtils.takeOrdered(items, num)(ord)
        Iterator.single(queue)
      }
      if (mapRDDs.partitions.length == 0) {
        Array.empty
      } else {
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      }
    }
  }
6、countByKey

  countByKey算子获取在按照键值分组下的元素数量。

    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9))
    val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)), 2)
    val stringToLong: collection.Map[String, Long] = rdd.countByKey()
    println(stringToLong.mkString(",")) // Tom -> 3,Toni -> 1,jack -> 2

源码:

  def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }

源码中,可以看到也是用到了collect()。

7、saveAsTextFile、saveAsSequenceFile、saveAsObjectFile

  落盘算子,这个就是直接将结果以文件的形式保存。

    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9))
    val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)), 2)
    rdd.saveAsTextFile("path")// 该路径要存在
8、foreach

  foreach算子重点注意下,要和我们scala的遍历元素的函数区分,spark下的foreach是作用在每一个分区,但是scala元素遍历下的foreach仅仅在一个节点上。
从执行结果上我们可以区分:

    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6, 7, 8, 9))
    val rdd: RDD[(String, Double)] = sc.makeRDD(List(("Tom", 88.0), ("Tom", 90.0), ("jack", 86.0), ("Toni", 85.0), ("jack", 82.0), ("Tom", 93.0)), 2)
    rdd1.foreach(print) // 295871641
    rdd1.collect().foreach(print) // 12456789

spark中的foreach直接作用在分区,也就是executor,但是元素遍历的foreach是作用在driver,所以打印的顺序是和我们样例数据是一致的,主要是因为在collect算子中将数据从executor汇总到driver已经保证了顺序。但是spark中的foreach作用于executor,数据随机分配在executor上面,没法保证顺序。
看一下Spark中的源码:

  // Actions (launch a job to return a value to the user program)

  
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

普通遍历中的源码:

  def foreach[U](f: A => U): Unit = {
    var i = 0
    val len = length
    while (i < len) { f(this(i)); i += 1 }
  }

通过源码,体会下即可明白不同之处。

三、RDD持久化 (RDD Persistence)

  Spark 中最重要的功能之一是跨 *** 作在内存中持久化(或缓存)数据集。当你持久化一个 RDD 时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从它派生的数据集)的其他 *** 作中重用它们。这使得以后的算子可以更快(通常超过 10 倍)执行。缓存是迭代算法和快速交互使用的关键工具。您可以使用 persist() 或 cache() 方法将 RDD 标记为持久化。第一次在计算它时,它将保存在节点的内存中。 Spark 的缓存是容错的——如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。此外,每个持久化的 RDD 可以使用不同的存储级别存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的 Java 对象(以节省空间),跨节点复制它。这些级别是通过将 StorageLevel 对象(Scala、Java、Python)传递给 persist() 来设置的。 cache() 方法是使用默认存储级别的简写,即 StorageLevel.MEMORY_ONLY(将反序列化的对象存储在内存中)
完整的存储级别集是:

Storage LevelMeaningMEMORY_ONLY将 RDD 作为反序列化的 Java 对象存储在 JVM 中。如果 RDD 不适合内存,某些分区将不会被缓存,并且会在每次需要时重新计算。这是默认级别。MEMORY_AND_DISK将 RDD 作为反序列化的 Java 对象存储在 JVM 中。如果 RDD 不适合内存,则存储在磁盘的分区,并在需要时从那里读取它们。MEMORY_ONLY_SER (Java and Scala)将 RDD 存储为序列化的 Java 对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,尤其是在使用快速序列化程序时,但读取时更占用 CPU。MEMORY_AND_DISK_SER (Java and Scala)与 MEMORY_ONLY_SER 类似,但将不适合内存的分区溢出到磁盘,而不是在每次需要时即时重新计算它们。DISK_ONLY仅将 RDD 分区存储在磁盘上。MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.与上述级别相同,但在两个集群节点上复制每个分区。OFF_HEAP (experimental)类似于 MEMORY_ONLY_SER,但将数据存储在堆外内存中。这需要启用堆外内存。

注意:在 Python 中,存储的对象将始终使用 Pickle library( Python object serialization) 进行序列化,因此是否选择序列化级别无关紧要。 Python 中可用的存储级别包括 MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY、DISK_ONLY_2 和 DISK_ONLY_3。即使没有用户调用persist,Spark也会在shuffle *** 作(例如reduceByKey)中自动持久化一些中间数据。这样做是为了避免在 shuffle 期间节点失败时重新计算整个输入。如果用户打算重用它,我们仍然建议用户在生成的 RDD 上调用持久化。

那我们怎么选择持久化的存储级别呢?

  Spark 的存储级别旨在提供内存使用和 CPU 效率之间的不同权衡。我们建议通过以下过程来选择一个:

  • 如果您的 RDD 与默认存储级别 (MEMORY_ONLY) 相适应,请保持原样。这是 CPU 效率最高的选项,允许 RDD 上的 *** 作尽可能快地运行。
  • 如果没有,请尝试使用 MEMORY_ONLY_SER 并选择快速序列化库,以使对象更节省空间,但访问速度仍然相当快。 (Java 和 Scala)
  • 尽量不要溢出到磁盘,除非计算数据集的函数很昂贵,或者它们过滤了大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。
  • 如果您想要快速故障恢复(例如,如果使用 Spark 来处理来自 Web 应用程序的请求),请使用复制的存储级别。所有存储级别都通过重新计算丢失的数据来提供完全容错,但复制的存储级别让您可以继续在 RDD 上运行任务,而无需等待重新计算丢失的分区。
移除RDD缓存

  Spark 自动监控每个节点上的缓存使用情况,并以最近最少使用 (LRU) 的方式丢弃旧数据分区。如果你想手动删除一个 RDD 而不是等待它从缓存中掉出来,使用 RDD.unpersist() 方法。请注意,此方法默认不阻塞。要阻塞直到资源被释放,调用此方法时指定blocking=true。


关于RDD的Action和缓存,暂时讲到这里,我知道还有一些Action没有讲到,一来是因为太过于简单,类似于mean、max等算子,类似于count,直接上手用即可。二者是因为一些算子目前我还没有常用,一些坑没有踩完,但是后续会持续补充。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5683053.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存