- 1. Program entry point
- 1.1 Test code
- 1.2 Dependencies between operations
- 1.3 Dependencies between the generated RDDs
- 2. Spark cache/persist source code tracing
- 2.1 Key logs from a cached run
- 2.2 Registering the RDD to be cached
- 2.3 How the cache is implemented
- 2.4 Does caching rebuild the RDD lineage?
- 2.5 The cache is application-level
- 3. Checkpoints
- 3.1 Key logs from a run
- 3.2 checkpoint source code tracing
- 3.3 checkpoint is independent of the application
- 4. Execution order of checkpoint and cache
- 5. Summary
1. Program entry point

1.1 Test code

```scala
def main(args: Array[String]): Unit = {
  log.info("-------begin---------")
  val sparkConnf = new SparkConf().setAppName("persistTest").setMaster("local[3]")
  val sparkContext = new SparkContext(sparkConnf)
  sparkContext.setCheckpointDir("./")
  val rdd = sparkContext.parallelize(Array(1, 1, 2, 3, 4, 4, 5), 2)
  val mapRDD = rdd.map(num => {
    println("--------map----------")
    (num, 1)
  })
  mapRDD.persist()
  // mapRDD.checkpoint()
  val shuffledRDD = mapRDD.reduceByKey(_ + _)
  log.info("---1---" + mapRDD.toDebugString)
  log.info("---2---" + shuffledRDD.toDebugString)
  shuffledRDD.collect().map(x => log.info(x._1 + ":" + x._2))
  log.info("+++3+++" + mapRDD.toDebugString)
  log.info("+++4+++" + shuffledRDD.toDebugString)
  println("---5---")
  printRDD(mapRDD)
  println("---6---")
  printRDD(shuffledRDD)
  val groupRDD = mapRDD.groupByKey(2)
  groupRDD.collect().map(println(_))
  sparkContext.stop()
}

// Recursively print the class names along an RDD's dependency chain
def printRDD(rdd: RDD[_]): Unit = {
  println(rdd.getClass.getName)
  if (!rdd.dependencies.isEmpty) {
    rdd.dependencies.map(dependcy => printRDD(dependcy.rdd))
  }
}
```

1.2 Dependencies between operations

1.3 Dependencies between the generated RDDs

2. Spark cache/persist source code tracing

2.1 Key logs from a cached run
```
21/12/19 10:02:25 INFO WordCount: ---log1---(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 10:02:25 INFO WordCount: ---log2---(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
21/12/19 10:02:27 INFO WordCount: 4:2
21/12/19 10:02:27 INFO WordCount: 2:1
21/12/19 10:02:27 INFO WordCount: 1:2
21/12/19 10:02:27 INFO WordCount: 3:1
21/12/19 10:02:27 INFO WordCount: 5:1
21/12/19 10:02:27 INFO WordCount: +++log3+++(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 10:02:27 INFO WordCount: +++log4+++(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |       CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
---log5---
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
---log6---
org.apache.spark.rdd.ShuffledRDD
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
```

2.2 Registering the RDD to be cached
```scala
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

def cache(): this.type = persist()
```
cache() is simply persist() with storage level MEMORY_ONLY.
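As a usage note, here is a small sketch against the mapRDD from section 1.1 (not part of the original test run): the storage level must be chosen before the first action, and cached blocks can be released manually.

```scala
import org.apache.spark.storage.StorageLevel

mapRDD.cache()                                  // same as persist(StorageLevel.MEMORY_ONLY)
// mapRDD.persist(StorageLevel.MEMORY_AND_DISK) // alternative level; re-calling persist with a
//                                              // different level throws (see the source below)
mapRDD.unpersist()                              // drop the cached blocks when no longer needed
```

Tracing into the source, the public persist(newLevel: StorageLevel) first handles the localCheckpoint() case: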
```scala
def persist(newLevel: StorageLevel): this.type = {
  if (isLocallyCheckpointed) {
    // This means the user previously called localCheckpoint(), which should have already
    // marked this RDD for persisting. Here we should override the old storage level with
    // one that is explicitly requested by the user (after adapting it to use disk).
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}
```
Setting the checkpoint case aside for now, look at the persist(newLevel: StorageLevel, allowOverride: Boolean): this.type method.
RDD.scala
```scala
private var storageLevel: StorageLevel = StorageLevel.NONE

private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}
```
SparkContext.scala
```scala
private[spark] def persistRDD(rdd: RDD[_]): Unit = {
  persistentRdds(rdd.id) = rdd
}

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
  val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
  map.asScala
}
```
At this point the RDD to be cached is merely registered with the SparkContext and the storageLevel field is updated; no actual caching happens, nor could it, because no data exists yet. An RDD only encapsulates computation logic and holds no data of its own; data is associated with an RDD only when an action triggers a job. So the next step is to follow the computation-related methods of RDD: iterator and compute.
See also: spark源码跟踪(六)RDD逻辑代码执行 (Spark source code tracing, part 6: RDD logical code execution).

2.3 How the cache is implemented
RDD.scala
```scala
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)
  var readCachedBlock = true
  // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    readCachedBlock = false
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) =>
      if (readCachedBlock) {
        val existingMetrics = context.taskMetrics().inputMetrics
        existingMetrics.incBytesRead(blockResult.bytes)
        new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
      } else {
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
      }
    case Right(iter) =>
      new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
  }
}

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
```
By now storageLevel has been set to the user-specified level (MEMORY_ONLY if unspecified) and is no longer StorageLevel.NONE, so getOrCompute(split, context) is executed.
The fourth parameter of SparkEnv.get.blockManager.getOrElseUpdate is a zero-argument function that returns an iterator over data. The anonymous function passed in here calls computeOrReadCheckpoint, which runs the current RDD's concrete compute method and wraps the result in an iterator; in other words, the function's job is to compute and return the current RDD's data.

BlockManager.scala
```scala
def getOrElseUpdate[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[T],
    makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
  // Attempt to read the block from local or remote storage. If it's present, then we don't need
  // to go through the local-get-or-put path.
  get[T](blockId)(classTag) match {
    case Some(block) =>
      return Left(block)
    case _ =>
      // Need to compute the block.
  }
  // Initially we hold no locks on this block.
  doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
    case None =>
      // doPut() didn't hand work back to us, so the block already existed or was successfully
      // stored. Therefore, we now hold a read lock on the block.
      val blockResult = getLocalValues(blockId).getOrElse {
        // Since we held a read lock between the doPut() and get() calls, the block should not
        // have been evicted, so get() not returning the block indicates some internal error.
        releaseLock(blockId)
        throw new SparkException(s"get() failed for block $blockId even though we held a lock")
      }
      // We already hold a read lock on the block from the doPut() call and getLocalValues()
      // acquires the lock again, so we need to call releaseLock() here so that the net number
      // of lock acquisitions is 1 (since the caller will only call release() once).
      releaseLock(blockId)
      Left(blockResult)
    case Some(iter) =>
      // The put failed, likely because the data was too large to fit in memory and could not be
      // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
      // that they can decide what to do with the values (e.g. process them without caching).
      Right(iter)
  }
}

def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val local = getLocalValues(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemoteValues[T](blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}
```
Continue into SparkEnv.get.blockManager.getOrElseUpdate; the actual value of its fourth parameter makeIterator is the function that produces the RDD's data (it has not been invoked at this point).
Inside the method body, the block is first looked up in the cache; on a hit the result is returned immediately (makeIterator is never called, so the RDD's compute is never triggered). Otherwise the caching path runs. That call chain, which involves memory management and disk I/O, is too deep to trace here.
It is enough to know that this function both writes and reads the cache: if the block is already cached, it is returned directly; otherwise the data is computed first and then cached according to the storage level. If the block existed or was stored successfully, a Left(BlockResult) is returned; if caching failed, a Right(Iterator[T]) hands the RDD's data back to the caller untouched.
2.4 Does caching rebuild the RDD lineage?

It is said online that a successful cache rebuilds the RDD lineage, the evidence being that toDebugString adds a CachedPartitions entry to the printed dependencies. Comparing log1/log2 with log3/log4 does show the CachedPartitions entry appearing, but that does not mean the RDD's dependencies have changed.
```
21/12/19 09:12:49 INFO WordCount: ---log1---(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 09:12:50 INFO WordCount: ---log2---(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
21/12/19 09:12:51 INFO WordCount: +++log3+++(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ReliableCheckpointRDD[3] at collect at WordCount.scala:26 [Memory Deserialized 1x Replicated]
21/12/19 09:12:51 INFO WordCount: +++log4+++(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |       CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ReliableCheckpointRDD[3] at collect at WordCount.scala:26 []
---log5---
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
---log6---
org.apache.spark.rdd.ShuffledRDD
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
```
I believe this claim is wrong: the toDebugString code merely prints the CachedPartitions information; it does not add a real node to the dependency chain.

log5 and log6 show that the RDD lineage is unchanged, and nowhere in the source code is the value of dependencies modified. This is one major difference between caching and checkpointing: checkpoint really does change the RDD lineage.
Section 2.4 showed that caching does not sever the RDD lineage, but it does cut off the execution of the compute logic along the dependency chain. An RDD's data depends on the computation result of its parent RDD, but if the current RDD's data has already been cached, it can be read from the cache directly, with no need to invoke the parent's computation. In short: caching does not sever the RDD lineage, but it does short-circuit the RDD computation.
The test code contains two actions: collect() is called after reduceByKey and again after groupByKey. Both resulting jobs trace back to the same MapPartitionsRDD at their source, yet the computation logic passed into it runs only once: the "--------map----------" lines appear in the log for only one of the two jobs.

This shows that the second action read the cache of mapRDD (a MapPartitionsRDD) directly and did not trigger the computation of mapRDD or its parent RDDs.
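To see this "computed once" behavior directly, here is a minimal, self-contained sketch (the object name CacheOnceDemo is hypothetical; it assumes a local Spark setup) that counts map-function evaluations with an accumulator:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheOnceDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cacheOnce").setMaster("local[2]"))
    val evals = sc.longAccumulator("map-evaluations")
    val mapRDD = sc.parallelize(1 to 5, 2).map { n => evals.add(1); (n, 1) }
    mapRDD.persist()

    mapRDD.reduceByKey(_ + _).collect() // job 1: computes mapRDD and fills the cache
    mapRDD.groupByKey(2).collect()      // job 2: served from the cache

    // In this local run the counter stays at 5; without persist() it would reach 10.
    // (Accumulators in transformations can over-count on task retries, so this is
    // illustrative rather than a precise measurement.)
    println(s"map evaluated ${evals.value} times")
    sc.stop()
  }
}
```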
2.5 The cache is application-level

One further question: jobs within the same application execute serially and independently, and in yarn-cluster mode two jobs may not run on the same machine, so how does a later job read an RDD cache produced by an earlier job?
Look again at the source for reading an RDD cache, the get method shown above: blocks are fetched either locally or remotely, so it does not matter if two jobs are not on the same machine; the block can be fetched remotely. Caching requires no storage path from the user: Spark stores cached data under the application's temporary directories, which are deleted only when the application ends, so every job in the application can access it. Note that even with storage level StorageLevel.MEMORY_ONLY, reading cached data may still involve I/O, including network I/O.
3. Checkpoints

A checkpoint requires a storage location to be set; without one, an error is thrown. On a cluster the directory should be on a fault-tolerant filesystem such as HDFS.
```scala
sparkContext.setCheckpointDir(directory: String)
```

3.1 Key logs from a run
```
21/12/19 10:32:05 INFO WordCount: ---log1---(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 10:32:05 INFO WordCount: ---log2---(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
21/12/19 10:33:58 INFO WordCount: +++log3+++(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ReliableCheckpointRDD[3] at collect at WordCount.scala:26 [Memory Deserialized 1x Replicated]
21/12/19 10:33:58 INFO WordCount: +++log4+++(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |       CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ReliableCheckpointRDD[3] at collect at WordCount.scala:26 []
---log5---
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ReliableCheckpointRDD
---log6---
org.apache.spark.rdd.ShuffledRDD
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ReliableCheckpointRDD
```

3.2 checkpoint source code tracing
Trace into the source, starting with RDD.checkpoint():
```scala
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
```
This only validates the checkpoint directory and initializes the checkpointData object (associating the RDD with a ReliableRDDCheckpointData object, into which checkpoint results such as the path will later be packed); no data is written yet. Data only exists once a job runs, so follow the job submission path.

SparkContext.scala
```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
```
Look at rdd.doCheckpoint():
```scala
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          // TODO We can collect all the RDDs that needs to be checkpointed, and then
          // checkpoint them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}
```
A configuration flag comes into play here: spark.checkpoint.checkpointAllMarkedAncestors. Its default is false; it controls whether the checkpoints of marked ancestor RDDs are performed recursively.

- If the current RDD has no checkpoint defined, doCheckpoint() simply recurses into the parent RDDs.
- If the current RDD does have a checkpoint defined, spark.checkpoint.checkpointAllMarkedAncestors decides whether the marked ancestors are checkpointed as well; if so, the parents are checkpointed first (recursively, so that parents always precede children).
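A note on enabling it: in the Spark versions I have looked at, this flag is read as a thread-local property via sc.getLocalProperty rather than from SparkConf, so it would be set roughly like this (a hedged sketch, not tested code from this article):

```scala
// Set on the driver thread before the action that triggers the checkpoint job.
sparkContext.setLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors", "true")
```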
checkpointData.get returns the new ReliableRDDCheckpointData(this) object, where this is the RDD being checkpointed.
RDDCheckpointData.scala
```scala
final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the state of this RDDCheckpointData
  RDDCheckpointData.synchronized {
    if (cpState == Initialized) {
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }

  val newRDD = doCheckpoint()

  // Update our state and truncate the RDD lineage
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()
  }
}

protected def doCheckpoint(): CheckpointRDD[T]
```
doCheckpoint() is abstract, so look at the subclass implementation.

The reliable-checkpoint implementation is in ReliableRDDCheckpointData.scala (the other subclass, LocalRDDCheckpointData, backs localCheckpoint(), which we set aside earlier):
```scala
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }

  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}
```
It calls the writeRDDToCheckpointDirectory function in the companion object:
```scala
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  val checkpointStartTimeNs = System.nanoTime()

  val sc = originalRDD.sparkContext

  // Create the output path for the checkpoint
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

  if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }

  val checkpointDurationMs =
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
  logInfo(s"Checkpointing took $checkpointDurationMs ms.")

  val newRDD = new ReliableCheckpointRDD[T](
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      "Checkpoint RDD has a different number of partitions from original RDD. Original " +
        s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
        s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
        s"${newRDD.partitions.length}].")
  }
  newRDD
}
```
A job is submitted here (and if the spark.checkpoint.checkpointAllMarkedAncestors option mentioned above is enabled, every marked parent RDD is checkpointed as well, which can get expensive). In effect, the RDD being checkpointed invokes an action whose work is writePartitionToCheckpointFile(checkpointDirPath.toString, broadcastedConf):
```scala
mapRDD.checkpoint()
// roughly equivalent in spirit (not valid code):
mapRDD.collect(ReliableCheckpointRDD.writeRDDToCheckpointDirectory())
```

The two lines above are roughly equivalent in logic, though the second is certainly not valid syntax. So is checkpoint an action or a transformation?
When the job finishes, writePartitionToCheckpointFile has written each partition's data to the specified path (disk I/O), and writePartitionerToCheckpointDir saves the partitioner alongside it. A new ReliableCheckpointRDD is then constructed with the same SparkContext and partitioner as the original RDD, wrapping the storage path, and that ReliableCheckpointRDD object is returned.

Back at the call site:
RDDCheckpointData.scala
```scala
val newRDD = doCheckpoint()

// Update our state and truncate the RDD lineage
RDDCheckpointData.synchronized {
  cpRDD = Some(newRDD)
  cpState = Checkpointed
  rdd.markCheckpointed()
}
```
The state is updated to Checkpointed, and then rdd.markCheckpointed() executes:
```scala
private[spark] def markCheckpointed(): Unit = stateLock.synchronized {
  clearDependencies()
  partitions_ = null
  deps = null // Forget the constructor argument for dependencies too
}

protected def clearDependencies(): Unit = stateLock.synchronized {
  dependencies_ = null
}
```
After an RDD (here, mapRDD) has created its checkpoint, its dependencies on its parent RDDs are severed (dependencies_ = null). After the cut, which RDD is the source of the DAG?

See RDD.scala:
```scala
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      stateLock.synchronized {
        if (dependencies_ == null) {
          dependencies_ = getDependencies
        }
      }
    }
    dependencies_
  }
}
```
Once an RDD has finished checkpointing, its parent dependency becomes the checkpoint RDD object, a ReliableCheckpointRDD, connected by a OneToOneDependency; the RDD itself becomes the second node in the DAG.
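Using the printRDD helper from section 1.1, the change can be observed directly once an action has materialized the checkpoint; this matches log5/log6 in section 3.1:

```scala
mapRDD.checkpoint()
shuffledRDD.collect() // any action; it also materializes mapRDD's checkpoint
printRDD(mapRDD)
// Prints:
// org.apache.spark.rdd.MapPartitionsRDD
// org.apache.spark.rdd.ReliableCheckpointRDD   <- the new parent; ParallelCollectionRDD is gone
```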
RDD lineage before and after checkpoint execution (diagram omitted). The logs show the same thing: in log5/log6 of section 3.1, the source of the lineage is ReliableCheckpointRDD rather than ParallelCollectionRDD.
3.3 checkpoint is independent of the application

A Spark checkpoint requires an explicitly specified storage location, and the checkpoint files are not deleted when the application finishes.

SparkContext does in fact contain a function for creating a source RDD from a checkpoint, but it is package-private:
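For reference, that function looks roughly like this in the Spark source (quoted from memory, so treat the exact shape as approximate):

```scala
protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope {
  new ReliableCheckpointRDD[T](this, path)
}
```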
4. Execution order of checkpoint and cache

Back to the place where an RDD's data is obtained:
```scala
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
```
If the RDD has a cache, the data is read from the cache; if the cache read fails, the data is recomputed and caching is attempted again.

With no cache but a materialized checkpoint, the data is read from the checkpoint via firstParent[T].iterator(split, context), where firstParent is the ReliableCheckpointRDD; the current RDD's compute method does not run.

What about having both a checkpoint and a cache? Does the order in which the two were called matter?
Look at the job-submission method in DAGScheduler.scala.
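Abridged from the Spark source (logging and error-handling details elided; minor differences across versions are possible):

```scala
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // submitJob posts the job to the "dag-scheduler-event-loop" thread and returns immediately
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // ... the driver thread then blocks here until the job finishes or fails
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_)         => // ... log "Job finished"
    case scala.util.Failure(exception) => throw exception // ... (stack-trace handling elided)
  }
}
```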
Submitting a job returns a JobWaiter object that blocks the current (driver) thread until the job finishes, and can also be used to cancel the job, so jobs submitted this way run serially. (After the driver thread submits a job, the job's subsequent work is handled by a daemon thread named "dag-scheduler-event-loop".)
Now look again at SparkContext.runJob, shown in section 3.2: rdd.doCheckpoint() is called only after dagScheduler.runJob returns.

The job generated for the checkpoint therefore runs after the job generated by the action, so the cache is necessarily written before the checkpoint, regardless of the order in which the two were called. When the checkpoint job runs and fetches mapRDD's data, the data is already cached at that point, so its compute method is not triggered; the data is read straight from the cache.
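Putting sections 2 through 4 together, a sketch of the recommended pattern, using the names from the test code in 1.1:

```scala
mapRDD.persist()      // the order of these two marker calls does not matter
mapRDD.checkpoint()
shuffledRDD.collect() // job 1 computes mapRDD and fills the cache;
                      // the follow-up checkpoint job reads the cache and only pays the write I/O
```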
5. Summary

1. Caching (cache/persist) does not change the RDD lineage, but it does cut off the computation along the dependency chain: if an RDD is cached, the computation logic encapsulated by all of its upstream RDDs is not executed.
2. The Spark cache is application-level: cached data lives in the application's temporary storage (on disk, if the level requires it) and is deleted only when the application ends, so every job in the application can access it.
3. Even with storage level StorageLevel.MEMORY_ONLY, reading cached data may still involve I/O, including network I/O.
4. checkpoint does change the RDD lineage: once an RDD's checkpoint completes, its parent dependency becomes the checkpoint RDD object, a ReliableCheckpointRDD, connected by a OneToOneDependency, and the RDD itself becomes the second node in the DAG.
5. checkpoint is independent of the application: the checkpoint directory is not deleted when the application finishes, and SparkContext can create a ReliableCheckpointRDD from a checkpoint directory as the source of a DAG (the function is protected[spark]).
6. checkpoint submits a job. checkpoint is an odd hybrid: if the application has no other action, checkpoint submits no job and behaves like a transformation; if there is another action, it submits a job of its own and behaves like an action.
7. The cache takes effect before the checkpoint: the checkpoint job runs after the job generated by the action, so the cache is always written first, regardless of the order in which the two were called.
8. Checkpoints should be combined with caching: checkpointing submits an extra job that would re-run the RDD's computation, but if the RDD is cached, the cached result is read directly, no recomputation is needed, and only the I/O for writing the checkpoint remains.
9. With multiple jobs, cache the RDDs they share to avoid recomputation.
10. With a single job but a very long DAG, caching the expensive RDDs helps recover data when tasks fail.