Spark Source Code: TaskScheduler


At the end of the previous article, once the stages have been divided and the best locations for the tasks have been computed, taskScheduler.submitTasks is called to create a TaskSet object and submit it:

 if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

By default, in standalone mode, TaskScheduler is only a trait; the implementation actually used is TaskSchedulerImpl.
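As a rough sketch of that split (the TaskSet stub and the *Sketch class names below are invented for this illustration, not Spark's real classes), the trait only declares the scheduling entry point and the implementation supplies the actual logic:

// Minimal sketch of the trait/implementation split, using a stub TaskSet type.
final case class TaskSet(stageId: Int, tasks: Seq[String])

trait TaskSchedulerSketch {
  // The DAGScheduler only ever talks to this interface.
  def submitTasks(taskSet: TaskSet): Unit
}

class TaskSchedulerImplSketch extends TaskSchedulerSketch {
  override def submitTasks(taskSet: TaskSet): Unit = {
    // The real TaskSchedulerImpl wraps the TaskSet in a TaskSetManager,
    // registers it with the scheduling pool, and finally calls backend.reviveOffers().
    println(s"submitting ${taskSet.tasks.size} tasks for stage ${taskSet.stageId}")
  }
}

object TraitDemo extends App {
  val scheduler: TaskSchedulerSketch = new TaskSchedulerImplSketch
  scheduler.submitTasks(TaskSet(stageId = 0, tasks = Seq("task-0", "task-1")))
}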

Open TaskSchedulerImpl and find its submitTasks method:

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // Create a TaskSetManager for each TaskSet; it is responsible for monitoring and managing the TaskSet
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      // Then put it into the cache, keyed by stage id and attempt id
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    // This showed up in the SparkContext source: when the TaskScheduler is created, a SparkDeploySchedulerBackend
    // is created for the TaskSchedulerImpl. This backend is the SparkDeploySchedulerBackend created earlier,
    // and it is responsible for creating the AppClient and registering the Application with the Master.
    backend.reviveOffers()
  }

Step into createTaskSetManager:

// Label as private[scheduler] to allow tests to swap in different task set managers if necessary
  private[scheduler] def createTaskSetManager(
      taskSet: TaskSet,
      maxTaskFailures: Int): TaskSetManager = {
    new TaskSetManager(this, taskSet, maxTaskFailures)
  }

What is a TaskSetManager?

It schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of each task, retries tasks if they fail (up to a limited number of times), and handles locality-aware scheduling for this TaskSet via delay scheduling. Its main interfaces are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and statusUpdate, which tells it that one of its tasks has changed state (e.g. finished).
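As a minimal conceptual sketch of those two entry points (the method names mirror the real ones, but the types here are simplified stand-ins, not Spark's actual signatures):

// Simplified model of a TaskSetManager's two main entry points.
// TaskPick is an invented placeholder type for this sketch.
final case class TaskPick(taskId: Long, executorId: String)

class TaskSetManagerSketch(taskIds: Seq[Long]) {
  private var remaining = taskIds.toList

  // "Do you want to run a task on this executor/host at this locality level?"
  def resourceOffer(execId: String, host: String, maxLocality: String): Option[TaskPick] =
    remaining match {
      case head :: tail =>
        remaining = tail
        Some(TaskPick(head, execId))
      case Nil => None
    }

  // "One of your tasks changed state (e.g. FINISHED or FAILED)."
  def statusUpdate(taskId: Long, state: String): Unit =
    println(s"task $taskId is now $state")
}

object TaskSetManagerDemo extends App {
  val mgr = new TaskSetManagerSketch(Seq(0L, 1L))
  println(mgr.resourceOffer("exec-1", "host-a", "NODE_LOCAL")) // Some(TaskPick(0, exec-1))
  mgr.statusUpdate(0L, "FINISHED")
}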

Now look at the last statement of the submitTasks method, backend.reviveOffers().
Go into CoarseGrainedSchedulerBackend and find the reviveOffers method:

  override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

You can see that driverEndpoint is used to send a ReviveOffers message.
Step into driverEndpoint:

 var driverEndpoint: RpcEndpointRef = null

  protected def minRegisteredRatio: Double = _minRegisteredRatio

  override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // TODO (prashant) send conf instead of properties
    driverEndpoint = createDriverEndpointRef(properties)
  }

Keep following createDriverEndpointRef until you reach DriverEndpoint, and find where it handles ReviveOffers:

 case ReviveOffers =>
        makeOffers()
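The pattern here is just an RPC endpoint dispatching on the message type. A stripped-down model of the ReviveOffers round trip (this is not Spark's real RpcEndpoint API; the RPC layer is faked with a direct method call):

// Minimal model of the ReviveOffers round trip: the backend "sends" a message
// and the driver endpoint pattern-matches on it.
sealed trait SchedulerMessage
case object ReviveOffers extends SchedulerMessage

class DriverEndpointSketch {
  def receive(msg: SchedulerMessage): Unit = msg match {
    case ReviveOffers => makeOffers()
  }
  private def makeOffers(): Unit = println("making resource offers to the scheduler")
}

object ReviveOffersDemo extends App {
  val driverEndpoint = new DriverEndpointSketch
  // backend.reviveOffers() boils down to sending this message:
  driverEndpoint.receive(ReviveOffers)
}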

Step into makeOffers:

  // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      // First filter out the executors that are alive
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      // For the alive executors, collect the resources available on them

      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toIndexedSeq
      // Call the scheduler's resourceOffers to run the task-allocation algorithm and assign tasks to executors,
      // then call launchTasks to send each assigned task in a LaunchTask message to its executor, which starts and runs it
      launchTasks(scheduler.resourceOffers(workOffers))
    }
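To make those two steps concrete, here is a self-contained sketch of turning the live executors into offers (ExecutorInfo and WorkerOffer here are simplified stand-ins for Spark's ExecutorData and WorkerOffer):

// Sketch: filter the alive executors and expose their free cores as offers.
final case class ExecutorInfo(host: String, freeCores: Int, alive: Boolean)
final case class WorkerOffer(executorId: String, host: String, cores: Int)

object MakeOffersDemo extends App {
  val executorDataMap = Map(
    "exec-1" -> ExecutorInfo("host-a", freeCores = 2, alive = true),
    "exec-2" -> ExecutorInfo("host-b", freeCores = 4, alive = true),
    "exec-3" -> ExecutorInfo("host-c", freeCores = 4, alive = false) // being killed
  )

  // Keep only alive executors, then describe the resources available on each.
  val workOffers = executorDataMap
    .filter { case (_, info) => info.alive }
    .map { case (id, info) => WorkerOffer(id, info.host, info.freeCores) }
    .toIndexedSeq

  workOffers.foreach(println) // exec-3 is filtered out
}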
    

First, look at resourceOffers; what gets passed in is exactly the available resources on the executors:

 
  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    for (o <- offers) {
      if (!hostToExecutors.contains(o.host)) {
        hostToExecutors(o.host) = new HashSet[String]()
      }
      if (!executorIdToRunningTaskIds.contains(o.executorId)) {
        hostToExecutors(o.host) += o.executorId
        executorAdded(o.executorId, o.host)
        executorIdToHost(o.executorId) = o.host
        executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    // Shuffle first to randomize the executors, for load balancing
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    // Create the task lists that will be assigned to each worker
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // Take the sorted TaskSets from the scheduling pool; this is the pool created during SparkContext initialization.
    // Every TaskSet that gets created is put into this pool.
    // When the task-allocation algorithm runs, TaskSets are taken out of the pool,
    // so allocation is done one TaskSet at a time.
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }


    // Below is the core task-allocation algorithm
    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    
    // Each TaskSet is traversed starting from its highest locality level
    for (taskSet <- sortedTaskSets) {
      var launchedAnyTask = false
      var launchedTaskAtCurrentMaxLocality = false
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
      // For each TaskSet, try each locality level in turn,
      // launching the TaskSet's tasks on executors at that level.
      // When a round can no longer launch anything, exit the do-while loop
      // and move on to the next (less local) locality level.
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
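The key idea in the loop above is the preferred-locality order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY: for each TaskSet the scheduler walks the levels from most to least local, and at each level keeps offering resources until a round launches nothing new. A standalone sketch of just that control flow, with a fake tryOneRound standing in for resourceOfferSingleTaskSet:

// Sketch of the locality-level loop: try each level in preference order and
// keep offering at that level until a round launches nothing new.
object LocalityLoopDemo extends App {
  val localityLevels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

  // Pretend 3 tasks can run node-locally and 1 only at ANY.
  // (In real Spark the level is an upper bound, so a NODE_LOCAL task may also
  // launch during the ANY pass; this toy model ignores that detail.)
  var pending = Map("NODE_LOCAL" -> 3, "ANY" -> 1)

  // Fake stand-in for resourceOfferSingleTaskSet: launch at most one task per round.
  def tryOneRound(level: String): Boolean = pending.get(level) match {
    case Some(n) if n > 0 =>
      pending = pending.updated(level, n - 1)
      println(s"launched one task at $level")
      true
    case _ => false
  }

  var launchedAnyTask = false
  for (currentMaxLocality <- localityLevels) {
    var launchedAtThisLevel = false
    do {
      launchedAtThisLevel = tryOneRound(currentMaxLocality)
      launchedAnyTask |= launchedAtThisLevel
    } while (launchedAtThisLevel)
  }
  println(s"launched any task: $launchedAnyTask, still pending: $pending")
}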

Step into the resourceOfferSingleTaskSet method:

  private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    // Iterate over all the executors (offers)
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      // If this executor still has enough free CPUs for one more task
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
        // Ask the TaskSetManager for a task that can run on this executor at the given locality level
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            // Put it into tasks: the tasks to be launched on the given executor
            tasks(i) += task
            // At this point the task is assigned; now record the bookkeeping
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            // Mark that at least one task was launched
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }
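Putting numbers to the CPU bookkeeping: one round walks the shuffled offers and places at most one task per executor, decrementing availableCpus as it goes; the do-while in resourceOffers then calls it again until a round places nothing. A self-contained sketch of a single round, assuming CPUS_PER_TASK = 1 and invented executor names:

// Sketch of one offer round: walk the shuffled offers, and as long as an
// executor still has CPUS_PER_TASK free cores, assign it the next pending task.
import scala.collection.mutable.ArrayBuffer

object SingleRoundDemo extends App {
  val CPUS_PER_TASK = 1
  final case class Offer(executorId: String, cores: Int)

  val shuffledOffers = IndexedSeq(Offer("exec-1", 2), Offer("exec-2", 1))
  val availableCpus  = shuffledOffers.map(_.cores).toArray
  val tasks          = shuffledOffers.map(_ => new ArrayBuffer[String]())
  var pendingTasks   = List("task-0", "task-1", "task-2", "task-3")

  var launchedTask = false
  for (i <- shuffledOffers.indices) {
    if (availableCpus(i) >= CPUS_PER_TASK && pendingTasks.nonEmpty) {
      tasks(i) += pendingTasks.head          // assign the task to this executor
      pendingTasks = pendingTasks.tail
      availableCpus(i) -= CPUS_PER_TASK      // account for the cores it will use
      launchedTask = true
    }
  }
  println(tasks)          // one task on exec-1, one on exec-2
  println(pendingTasks)   // task-2 and task-3 wait for the next round
}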


Back in makeOffers, look at launchTasks:

// Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        // Serialize the task into a byte buffer
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          // Find the corresponding executor's data
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")

          // Send a LaunchTask message to the executor
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
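The size check at the top of launchTasks is easy to reproduce in isolation: serialize the task and compare the resulting buffer's limit with the RPC limit. In this sketch the 128 MB value is only an assumed stand-in for spark.rpc.message.maxSize, and FakeTaskDescription is an invented type:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Stand-in for Spark's TaskDescription, just for this sketch.
final case class FakeTaskDescription(taskId: Long, index: Int, payload: Array[Byte])

object TaskSizeCheckDemo extends App {
  // Assumed limit for illustration; in Spark it comes from spark.rpc.message.maxSize.
  val maxRpcMessageSize: Long = 128L * 1024 * 1024

  // Java serialization into a ByteBuffer, mimicking ser.serialize(task).
  def serialize(obj: AnyRef): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out   = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  val task           = FakeTaskDescription(taskId = 42L, index = 0, payload = new Array[Byte](1024))
  val serializedTask = serialize(task)

  if (serializedTask.limit() >= maxRpcMessageSize) {
    // The real code aborts the whole TaskSetManager with a descriptive message here.
    println(s"Serialized task ${task.taskId}:${task.index} is too large: ${serializedTask.limit()} bytes")
  } else {
    println(s"Task ${task.taskId} serialized to ${serializedTask.limit()} bytes; sending LaunchTask")
  }
}

If the task fits, the driver decrements the executor's free cores and sends the LaunchTask message; from there the executor side deserializes the task description and actually runs the task.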
