Spark Source Code Walkthrough: Master Startup and Communication Mechanism


The Master's main job is to manage the cluster and handle resource scheduling.


It extends ThreadSafeRpcEndpoint and LeaderElectable: extending ThreadSafeRpcEndpoint makes the Master an RpcEndpoint that can send and receive RPC messages, and implementing LeaderElectable lets it take part in leader election.
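
For orientation, the class header in Master.scala looks roughly like this; it is a simplified sketch, with the constructor parameters matching the call made in startRpcEnvAndEndpoint below.

private[deploy] class Master(
    override val rpcEnv: RpcEnv,     // the RpcEnv this endpoint is registered in
    address: RpcAddress,             // the Master's host and port
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with LeaderElectable {
  // fields and methods discussed in the rest of this article
}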

1 Important Attributes

RpcEnv rpcEnv: registers and keeps track of RpcEndpoints and RpcEndpointRefs

RpcAddress address: the Master's host and port

Int webUiPort: web UI port

HashSet[WorkerInfo] workers: information about all workers

HashMap[String, WorkerInfo] idToWorker: mapping from worker id to WorkerInfo

HashMap[RpcAddress, WorkerInfo] addressToWorker: mapping from a worker's RpcAddress to its WorkerInfo

HashSet[ApplicationInfo] apps: information about all applications

HashMap[String, ApplicationInfo] idToApp: mapping from application id to ApplicationInfo

ArrayBuffer[ApplicationInfo] waitingApps: applications currently waiting to be scheduled

ArrayBuffer[ApplicationInfo] completedApps: applications that have completed

HashMap[RpcEndpointRef, ApplicationInfo] endpointToApp: mapping from a driver's RpcEndpointRef to its ApplicationInfo

HashMap[RpcAddress, ApplicationInfo] addressToApp: mapping from a driver's RpcAddress to its ApplicationInfo

HashSet[DriverInfo] drivers: information about all drivers

ArrayBuffer[DriverInfo] waitingDrivers: drivers currently waiting to be scheduled

ArrayBuffer[DriverInfo] completedDrivers: drivers that have completed
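
Taken together, these fields are declared in Master roughly as follows; this is a simplified sketch based on the list above, with initializers abbreviated.

// Bookkeeping fields of the Master (sketch; initializers simplified)
val workers = new HashSet[WorkerInfo]                            // all known workers
val idToWorker = new HashMap[String, WorkerInfo]                 // worker id -> WorkerInfo
val addressToWorker = new HashMap[RpcAddress, WorkerInfo]        // worker RpcAddress -> WorkerInfo

val apps = new HashSet[ApplicationInfo]                          // all known applications
val idToApp = new HashMap[String, ApplicationInfo]               // application id -> ApplicationInfo
val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo] // driver endpoint -> ApplicationInfo
val addressToApp = new HashMap[RpcAddress, ApplicationInfo]      // driver RpcAddress -> ApplicationInfo
val waitingApps = new ArrayBuffer[ApplicationInfo]               // applications waiting to be scheduled
val completedApps = new ArrayBuffer[ApplicationInfo]             // applications that have finished

val drivers = new HashSet[DriverInfo]                            // all known drivers
val waitingDrivers = new ArrayBuffer[DriverInfo]                 // drivers waiting to be scheduled
val completedDrivers = new ArrayBuffer[DriverInfo]               // drivers that have finished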

2 Core Methods

2.1 startRpcEnvAndEndpoint

# Register the 'Master' RpcEndpoint and get back an RpcEndpointRef for sending messages to it

# Ask the Master endpoint for the ports it has bound

# Return a (RpcEnv, web UI port, REST port) tuple

def startRpcEnvAndEndpoint(host: String, port: Int, webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  // Register the 'Master' RpcEndpoint and get back an RpcEndpointRef for sending messages
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  // Ask the Master endpoint which ports it has bound
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  // Return (RpcEnv, web UI port, REST port)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
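
For context, the standalone Master entry point drives this method. The following is a simplified sketch of Master.main (argument handling abbreviated); MasterArguments parses options such as --host, --port and --webui-port from the command line.

// Simplified sketch of how Master.main uses startRpcEnvAndEndpoint
def main(argStrings: Array[String]): Unit = {
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  // Block the main thread until the RpcEnv shuts down
  rpcEnv.awaitTermination()
}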

2.2 onStart, called first when the Master is initialized

# Build the web UI and start the REST server

# Schedule a task on a daemon thread that periodically checks whether workers have timed out

# Set up Master HA (the persistence engine and the leader election agent)

override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  // Build the Master web UI, which shows the applications submitted to the Master, etc.
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  // If reverse proxying is enabled
  if (reverseProxy) {
    masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
    logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
      s"Applications UIs are available at $masterWebUiUrl")
  }
  // Schedule a task on a daemon thread that periodically checks whether workers have timed out
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  // If the REST server is enabled, start it; clients can submit requests to the Master through it
  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  val serializer = new JavaSerializer(conf)
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    // ZOOKEEPER recovery mode: persist recovery state to ZooKeeper
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory =
        new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    // FILESYSTEM recovery mode: persist recovery state to the file system
    case "FILESYSTEM" =>
      val fsFactory =
        new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    // CUSTOM recovery mode: instantiate the fully-qualified factory class given in the
    // configuration and let it create the persistence engine and election agent
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}

2.3 receive

The Master is itself an RpcEndpoint (the RPC counterpart of an Actor), so it has to implement the receive method. receive is a partial function, an instance of PartialFunction[A, B], where A is the type of messages it can accept and B is the type it returns. Here A is Any and B is Unit, so receive can accept messages of any type and returns nothing; a message is only handled if one of the case clauses below is defined for it.
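
As a small REPL-style illustration (not Spark code) of what "partial" means here, the handler below is only defined for some inputs; RpcEndpoint dispatch relies on the same property.

// A PartialFunction handles only the messages it has a case for
val handler: PartialFunction[Any, Unit] = {
  case s: String => println(s"got a string: $s")
  case n: Int    => println(s"got an int: $n")
}

handler("hello")            // matches the String case
handler.isDefinedAt(3.14)   // false: there is no case for Double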

override def receive: PartialFunction[Any, Unit] = {
  // ElectedLeader: this Master has been elected leader
  case ElectedLeader =>
    // The persistence engine was chosen in onStart based on spark.deploy.recoveryMode;
    // read the persisted data back as a (storedApps, storedDrivers, storedWorkers) triple
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
    // If nothing was persisted the Master is ALIVE, otherwise it first has to RECOVER
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
      RecoveryState.ALIVE
    } else {
      RecoveryState.RECOVERING
    }
    logInfo("I have been elected leader! New state: " + state)
    // If we are in the RECOVERING state
    if (state == RecoveryState.RECOVERING) {
      // Start recovering the persisted state
      beginRecovery(storedApps, storedDrivers, storedWorkers)
      // Schedule a task that later tells this Master to complete recovery
      recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CompleteRecovery)
        }
      }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    }

  // CompleteRecovery: finish the recovery process
  case CompleteRecovery => completeRecovery()

  // RevokedLeadership: leadership was revoked, so shut this Master down; a new leader election will follow
  case RevokedLeadership =>
    logError("Leadership has been revoked -- master shutting down.")
    System.exit(0)

  // RegisterApplication: register an application, but only if this Master is the leader
  case RegisterApplication(description, driver) =>
    // A STANDBY (non-leader) Master must not create or register applications
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      logInfo("Registering app " + description.name)
      // Create the ApplicationInfo for this application and its driver
      val app = createApplication(description, driver)
      // Register the application
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      // Persist the application through the persistence engine
      persistenceEngine.addApplication(app)
      // Tell the driver that registration is complete
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }

  // ExecutorStateChanged: an executor's state has changed
  case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    // Look up the executor through the application that runs it
    val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
    execOption match {
      case Some(exec) =>
        // Fetch the application info
        val appInfo = idToApp(appId)
        // Update the executor's state to the reported state
        val oldState = exec.state
        exec.state = state
        // If the executor is now RUNNING, reset the application's retry count
        if (state == ExecutorState.RUNNING) {
          assert(oldState == ExecutorState.LAUNCHING,
            s"executor $execId state transfer from $oldState to RUNNING is illegal")
          appInfo.resetRetryCount()
        }
        // Forward an ExecutorUpdated message to the application's driver
        exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
        // If the executor has reached a finished state
        if (ExecutorState.isFinished(state)) {
          logInfo(s"Removing executor ${exec.fullId} because it is $state")
          // If the application is still running, remove this executor from it
          if (!appInfo.isFinished) {
            appInfo.removeExecutor(exec)
          }
          // Also remove the executor from the worker that hosted it
          exec.worker.removeExecutor(exec)
          // Only retry a limited number of times instead of looping forever
          val normalExit = exitStatus == Some(0)
          // While the retry count stays below MAX_EXECUTOR_RETRIES the executors keep being rescheduled
          if (!normalExit
              && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
              && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        // Reschedule resources
        schedule()
      case None =>
        logWarning(s"Got status update for unknown executor $appId/$execId")
    }

  // DriverStateChanged: a driver's state has changed
  case DriverStateChanged(driverId, state, exception) =>
    // If the driver's state is ERROR | FINISHED | KILLED | FAILED, remove it
    state match {
      case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
        removeDriver(driverId, state, exception)
      case _ =>
        throw new Exception(s"Received unexpected state update for driver $driverId: $state")
    }

  // Heartbeat: periodic heartbeat sent by a worker to the master
  case Heartbeat(workerId, worker) =>
    // Look up the worker by id
    idToWorker.get(workerId) match {
      // If the worker is known, refresh its lastHeartbeat; otherwise it has not registered with this master
      case Some(workerInfo) =>
        workerInfo.lastHeartbeat = System.currentTimeMillis()
      case None =>
        // The worker id appears in the workers set even though it is not in idToWorker
        if (workers.map(_.id).contains(workerId)) {
          logWarning(s"Got heartbeat from unregistered worker $workerId." +
            " Asking it to re-register.")
          // Ask the worker to reconnect to this master
          worker.send(ReconnectWorker(masterUrl))
        } else {
          logWarning(s"Got heartbeat from unregistered worker $workerId." +
            " This worker was never registered, so ignoring the heartbeat.")
        }
    }

  // MasterChangeAcknowledged: the application has acknowledged the new master; mark it WAITING
  case MasterChangeAcknowledged(appId) =>
    idToApp.get(appId) match {
      case Some(app) =>
        logInfo("Application has been re-registered: " + appId)
        app.state = ApplicationState.WAITING
      case None =>
        logWarning("Master change ack from unknown app: " + appId)
    }
    // If recovery can now be completed, complete it
    if (canCompleteRecovery) { completeRecovery() }

  // WorkerSchedulerStateResponse: a worker reports its scheduler state after a master change
  case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
    // Look up the worker by id
    idToWorker.get(workerId) match {
      case Some(worker) =>
        logInfo("Worker has been re-registered: " + workerId)
        // Mark the worker ALIVE again
        worker.state = WorkerState.ALIVE
        // Keep only the executors whose application is still known to this master
        val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
        // Re-attach each valid executor
        for (exec <- validExecutors) {
          // Find the application the executor belongs to
          val app = idToApp.get(exec.appId).get
          // Record the executor on the application (which worker, how many cores, etc.)
          val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
          // Record the executor on the worker
          worker.addExecutor(execInfo)
          execInfo.copyState(exec)
        }
        // Mark all reported drivers RUNNING and attach them to the worker
        for (driverId <- driverIds) {
          drivers.find(_.id == driverId).foreach { driver =>
            driver.worker = Some(worker)
            driver.state = DriverState.RUNNING
            worker.drivers(driverId) = driver
          }
        }
      case None =>
        logWarning("Scheduler state from unknown worker: " + workerId)
    }
    // If recovery can now be completed, complete it
    if (canCompleteRecovery) { completeRecovery() }

  // WorkerLatestState: a worker reports its most recent executors and drivers
  case WorkerLatestState(workerId, executors, driverIds) =>
    // Look up the worker by id
    idToWorker.get(workerId) match {
      case Some(worker) =>
        // Check whether each reported executor matches one the master knows about on this worker
        for (exec <- executors) {
          val executorMatches = worker.executors.exists {
            case (_, e) => e.application.id == exec.appId && e.id == exec.execId
          }
          // If there is no match, tell the worker to kill that executor
          if (!executorMatches) {
            // master doesn't recognize this executor, so just tell the worker to kill it
            worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
          }
        }
        // Check each reported driver the same way
        for (driverId <- driverIds) {
          // Does the master know about this driver on this worker?
          val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
          // If not, tell the worker to kill that driver
          if (!driverMatches) {
            // master doesn't recognize this driver, so just tell the worker to kill it
            worker.endpoint.send(KillDriver(driverId))
          }
        }
      case None =>
        logWarning("Worker state from unknown worker: " + workerId)
    }

  // UnregisterApplication: remove the application from this master
  case UnregisterApplication(applicationId) =>
    logInfo(s"Received unregister request from application $applicationId")
    idToApp.get(applicationId).foreach(finishApplication)

  // CheckForWorkerTimeOut: check for workers whose heartbeats have timed out
  case CheckForWorkerTimeOut =>
    timeOutDeadWorkers()
}

2.4 receiveAndReply

Once the Master is up it also has to handle messages that require a reply; those requests go through receiveAndReply rather than receive.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // RegisterWorker: a new worker asks the Master to register it
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    // If this Master is in STANDBY state, reply with MasterInStandby
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      // The worker id -> WorkerInfo map already contains this id,
      // so reply with RegisterWorkerFailed for the duplicate worker id
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else { // This Master is the leader and the worker id has not been seen before
      // Create the WorkerInfo and register it; on success persist it, reply with
      // RegisteredWorker and schedule, otherwise reply with RegisterWorkerFailed
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }

  // RequestSubmitDriver: submit a driver and update the driver bookkeeping on the Master
  case RequestSubmitDriver(description) =>
    // If this Master is not ALIVE, reply with an error
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      // Create the DriverInfo
      val driver = createDriver(description)
      // Persist the driver through the persistence engine
      persistenceEngine.addDriver(driver)
      // Add the driver to the drivers and waitingDrivers collections
      waitingDrivers += driver
      drivers.add(driver)
      schedule() // start scheduling
      // Reply that the submission succeeded
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }

  // RequestKillDriver: kill the given driver
  case RequestKillDriver(driverId) =>
    // If this Master is not ALIVE, reply with an error
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        s"Can only kill drivers in ALIVE state."
      context.reply(KillDriverResponse(self, driverId, success = false, msg))
    } else {
      logInfo("Asked to kill driver " + driverId)
      // Find the driver
      val driver = drivers.find(_.id == driverId)
      driver match {
        // Remove the driver from the collections the Master maintains
        case Some(d) =>
          // The driver is still waiting to be scheduled
          if (waitingDrivers.contains(d)) {
            // Remove it and send a DriverStateChanged(KILLED) message to self
            waitingDrivers -= d
            self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
          } else {
            // Otherwise tell the worker that runs it to kill the driver
            d.worker.foreach { w =>
              w.endpoint.send(KillDriver(driverId))
            }
          }
          // Reply with a KillDriverResponse
          val msg = s"Kill request for $driverId submitted"
          logInfo(msg)
          context.reply(KillDriverResponse(self, driverId, success = true, msg))
        case None =>
          val msg = s"Driver $driverId has already finished or does not exist"
          logWarning(msg)
          context.reply(KillDriverResponse(self, driverId, success = false, msg))
      }
    }

  // RequestDriverStatus: query the status of a driver
  case RequestDriverStatus(driverId) =>
    // If this Master is not ALIVE, reply with an error
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only request driver status in ALIVE state."
      context.reply(
        DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))))
    } else {
      // Search both the running and the completed drivers and reply with what was found
      (drivers ++ completedDrivers).find(_.id == driverId) match {
        case Some(driver) =>
          context.reply(DriverStatusResponse(found = true, Some(driver.state),
            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception))
        case None =>
          context.reply(DriverStatusResponse(found = false, None, None, None, None))
      }
    }

  // RequestMasterState: return the Master's current state
  case RequestMasterState =>
    context.reply(MasterStateResponse(
      address.host, address.port, restServerBoundPort,
      workers.toArray, apps.toArray, completedApps.toArray,
      drivers.toArray, completedDrivers.toArray, state))

  // BoundPortsRequest: return the ports this Master has bound
  case BoundPortsRequest =>
    context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))

  // RequestExecutors: set the target number of executors for an application
  case RequestExecutors(appId, requestedTotal) =>
    context.reply(handleRequestExecutors(appId, requestedTotal))

  // KillExecutors: kill the given executors of an application and report whether it succeeded
  case KillExecutors(appId, executorIds) =>
    val formattedExecutorIds = formatExecutorIds(executorIds)
    context.reply(handleKillExecutors(appId, formattedExecutorIds))
}
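
As a rough sketch of the client side of this request/reply path: a holder of the Master's RpcEndpointRef asks and the handlers above reply. The message types and askWithRetry appear earlier in this article; the host, port and status field names on MasterStateResponse are inferred from the constructor call above and should be treated as assumptions.

// Minimal client-side sketch, assuming masterEndpoint is the RpcEndpointRef
// returned by rpcEnv.setupEndpoint in startRpcEnvAndEndpoint
val ports = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
println(s"Master web UI port: ${ports.webUIPort}, REST port: ${ports.restPort}")

// The Master web UI fetches the cluster overview through the same path
val clusterState = masterEndpoint.askWithRetry[MasterStateResponse](RequestMasterState)
println(s"Master ${clusterState.host}:${clusterState.port} is ${clusterState.status}")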

2.5 onDisconnected: handle a disconnected client

override def onDisconnected(address: RpcAddress): Unit = {
  // The disconnected client could've been either a worker or an app; remove whichever it was
  logInfo(s"$address got disassociated, removing it.")
  addressToWorker.get(address).foreach(removeWorker)
  addressToApp.get(address).foreach(finishApplication)
  if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}

2.6 beginRecovery: start recovery

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  // Re-register every persisted application and send MasterChanged to its driver
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // Mark the application UNKNOWN until it acknowledges the new master
      app.state = ApplicationState.UNKNOWN
      // Tell the application's driver that the master has changed
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }

  // Add every persisted driver back to the drivers collection the master maintains
  for (driver <- storedDrivers) {
    drivers += driver
  }

  // Re-register every persisted worker with this master
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      // registerWorker updates the master's worker collection and the related maps
      registerWorker(worker)
      // Mark the worker UNKNOWN until it acknowledges the new master
      worker.state = WorkerState.UNKNOWN
      // Tell the worker that the master has changed
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}

2.7 completeRecovery: finish recovery

// Finish recovery: clean up unresponsive workers and apps, relaunch or remove unclaimed drivers,
// and reschedule resources
private def completeRecovery() {
  // Return if the state is not RECOVERING
  if (state != RecoveryState.RECOVERING) { return }
  // Move to COMPLETING_RECOVERY
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off any workers and applications that never responded, i.e. are still marked UNKNOWN
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Re-schedule drivers that were not claimed by any worker, i.e. nothing is running them
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    // If the driver was submitted with supervise, relaunch it; otherwise remove it
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  // Move to ALIVE
  state = RecoveryState.ALIVE
  // Reschedule resources
  schedule()
  logInfo("Recovery complete - resuming operations!")
}

2.8 removeWorker: remove a worker

private def removeWorker(worker: WorkerInfo) {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  // Mark the worker DEAD
  worker.setState(WorkerState.DEAD)
  // Remove the worker from the worker-related maps
  idToWorker -= worker.id
  addressToWorker -= worker.endpoint.address
  if (reverseProxy) {
    webUi.removeProxyTargets(worker.id)
  }
  // Walk through the executors that were running on this worker
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    // Tell the application's driver that this executor is lost
    exec.application.driver.send(ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
    // Mark the executor LOST
    exec.state = ExecutorState.LOST
    // Remove the executor from its application
    exec.application.removeExecutor(exec)
  }
  // Walk through the drivers that were running on this worker
  for (driver <- worker.drivers.values) {
    // If the driver was submitted with supervise, relaunch it; otherwise remove it
    if (driver.desc.supervise) {
      logInfo(s"Re-launching ${driver.id}")
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }
  // Remove the worker from the persistence engine
  persistenceEngine.removeWorker(worker)
}

2.9 removeApplication: remove an application

def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  // If the master still tracks this application, remove it from the apps set and the related maps
  if (apps.contains(app)) {
    logInfo("Removing app " + app.id)
    apps -= app
    idToApp -= app.id
    endpointToApp -= app.driver
    addressToApp -= app.driver.address
    if (reverseProxy) {
      webUi.removeProxyTargets(app.id)
    }
    // If the number of completed applications has reached spark.deploy.retainedApplications
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      // Compute how many to drop
      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)

      completedApps.take(toRemove).foreach { a =>
        applicationMetricsSystem.removeSource(a.appSource)
      }
      // Drop that many of the oldest completed applications
      completedApps.trimStart(toRemove)
    }
    // Record this application in the completed list
    completedApps += app // Remember it in our history
    // Remove it from the waiting list
    waitingApps -= app
    // Kill all executors that run this application
    for (exec <- app.executors.values) {
      killExecutor(exec)
    }
    // Mark the application with its final state
    app.markFinished(state)
    // If the final state is not FINISHED, tell the driver the application was removed
    if (state != ApplicationState.FINISHED) {
      app.driver.send(ApplicationRemoved(state.toString))
    }
    // Remove the application from the persistence engine
    persistenceEngine.removeApplication(app)
    // Reschedule
    schedule()

    // Tell every worker that this application has finished
    workers.foreach { w =>
      w.endpoint.send(ApplicationFinished(app.id))
    }
  }
}

2.10 registerApplication: register an application

private def registerApplication(app: ApplicationInfo): Unit = {
  // Get the RpcAddress of the application's driver
  val appAddress = app.driver.address
  // If an application is already registered at this address, return immediately
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }

  applicationMetricsSystem.registerSource(app.appSource)
  apps += app // add the application to the set the master maintains
  // Record the application in the related maps
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
  if (reverseProxy) {
    webUi.addProxyTargets(app.id, app.desc.appUiUrl)
  }
}

2.11 registerWorker: register a worker

private def registerWorker(worker: WorkerInfo): Boolean = {
  // Drop any DEAD worker that was registered at the same host and port
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }
  // Get the RpcAddress of the worker being registered
  val workerAddress = worker.endpoint.address
  // If the RpcAddress -> WorkerInfo map already contains this address, look at the old worker
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    // UNKNOWN means the old entry came from recovery
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // The worker is re-registering after recovery, so drop the stale entry
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }
  // Add the worker to the set the master maintains
  workers += worker
  // And record it in the related maps
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
    webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  true
}

2.12 timeOutDeadWorkers: remove workers that have timed out

/** Remove workers whose heartbeats have timed out */
private def timeOutDeadWorkers() {
  // Find the workers whose lastHeartbeat is older than the timeout
  val currentTime = System.currentTimeMillis()
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
  // Remove each of them; workers not yet marked DEAD are removed right away
  for (worker <- toRemove) {
    if (worker.state != WorkerState.DEAD) {
      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
        worker.id, WORKER_TIMEOUT_MS / 1000))
      removeWorker(worker)
    } else {
      // The worker is already DEAD; once it has been dead long enough, drop it from the workers set
      if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
        workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
      }
    }
  }
}
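
For reference, the constants used above come from the Master's configuration. The lines below are a sketch of how they are typically derived; the configuration keys and defaults are stated from memory and may differ between Spark versions.

// Sketch of the timeout constants (keys and defaults may vary by Spark version):
// a worker is considered lost after spark.worker.timeout seconds without a heartbeat,
// and a DEAD worker is culled after REAPER_ITERATIONS further timeout intervals
private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)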
