广告
返回顶部
首页 > 资讯 > 数据库 >spark-3.0 Application 调度算法解析
  • 454
分享到

spark-3.0 Application 调度算法解析

spark-3.0Application调度算法解析 2018-05-18 14:05:14 454人浏览 无得
摘要

spark 各个版本的application 调度算法还是有这明显的不同之处的。从spark1.3.0 到 spark 1.6.1、spark2.x 到 现在最新的spark 3.x ,调度算法有了一定的修改。下面大家一起学习一下,最新的s

spark 各个版本的application 调度算法还是有这明显的不同之处的。从spark1.3.0 到 spark 1.6.1、spark2.x 到 现在最新的spark 3.x ,调度算法有了一定的修改。下面大家一起学习一下,最新的spark 版本spark-3.0的Application 调度机制。

private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps) {
//如果在 spark-submmit 脚本中,指定了每个executor 多少个 CPU core,
// 则每个Executor 分配该个数的 core,
// 否则 默认每个executor 只分配 1 个 CPU core
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
// If the cores left is less than the coresPerExecutor,the cores left will not be allocated
// 当前 APP 还需要分配的 core 数 不能 小于 单个 executor 启动 的 CPU core 数
if (app.coresLeft >= coresPerExecutor) {
// Filter out workers that don"t have enough resources to launch an executor
// 过滤出 状态 为 ALIVE,并且还能 发布 Executor 的 worker
// 按照剩余的 CPU core 数 倒序
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canLaunchExecutor(_, app.desc))
.sortBy(_.coresFree).reverse
if (waitingApps.length == 1 && usableWorkers.isEmpty) {
logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
}
    // TODO:  默认采用 spreadOutApps  调度算法, 将 application需要的 executor资源 分派到  多个 worker 上去
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

// Now that we"ve decided how many cores to allocate on each worker, let"s allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
}
}
}
判断一个 worker 是否可以发布 executor
private def canLaunchExecutor(worker: WorkerInfo, desc: ApplicationDescription): Boolean = {
canLaunch(
worker,
desc.memoryPerExecutORMB,
desc.coresPerExecutor.getOrElse(1),
desc.resourceReqsPerExecutor)
}
让我们看一看里面的 canlunch 方法
private def canLaunch(
worker: WorkerInfo,
memoryReq: Int,
coresReq: Int,
resourceRequirements: Seq[ResourceRequirement])
: Boolean = {
// worker 上 空闲的 内存值 要 大于等于 请求的 内存值
val enoughMem = worker.memoryFree >= memoryReq
// worker 上 空闲的 core 数 要 大于等于 请求的 core数
val enoughCores = worker.coresFree >= coresReq
// worker 是否满足 executor 请求的资源
val enoughResources = ResourceUtils.resourcesMeetRequirements(
worker.resourcesAmountFree, resourceRequirements)
enoughMem && enoughCores && enoughResources
}

回到上面的 scheduleExecutorsOnWorkers
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
val coresPerExecutor = app.desc.coresPerExecutor
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
// 默认情况下 是 开启 oneExecutorPerWorker 机制的,也就是默认是在 一个 worker 上 只启动 一个 executor的
// 如果在spark -submit 脚本中设置了coresPerExecutor , 在worker资源充足的时候,则 会在每个worker 上,启动多个executor
val oneExecutorPerWorker = coresPerExecutor.isEmpty
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

// 判断 Worker节点是否能够启动Executor
def canLaunchExecutorForApp(pos: Int): Boolean = {

val keepScheduling = coresToAssign >= minCoresPerExecutor
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
val assignedExecutorNum = assignedExecutors(pos)

// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.

// 如果spark -submit 脚本中设置了coresPerExecutor值,
// 并且当前 这个worker 还没有为这个 application 分配 过 executor ,
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
// TODO: 可以启动新的 Executor
if (launchingNewExecutor) {
val assignedMemory = assignedExecutorNum * memoryPerExecutor
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
val assignedResources = resourceReqsPerExecutor.map {
req => req.resourceName -> req.amount * assignedExecutorNum
}.toMap
val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
}
val enoughResources = ResourceUtils.resourcesMeetRequirements(
resourcesFree, resourceReqsPerExecutor)
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
} else {
// We"re adding cores to an existing executor, so no need
// to check memory and executor limits
// TODO: 不满足启动新的 Executor条件,则 在 老的 Executor 上 追加 core 数
keepScheduling && enoughCores
}
}

// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application"s limits

var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutorForApp(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor

// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
//TODO: 如果该Worker节点不能启动新的 Executor,则在老的executor 上 分配 minCoresPerExecutor 个 CPU core(此时该值默认 为 1 )
assignedExecutors(pos) = 1
} else {
//TODO: 如果该Worker节点可以启动新的 Executor,则在新的executor 上 分配 minCoresPerExecutor 个 CPU core(此时该值为 spark-submit脚本配置的 coresPerExecutor 值)
assignedExecutors(pos) += 1
}

// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
// TODO: 这里传入 keepScheduling = false , 就是每次 worker上只分配 一次 core ,然后 到 下一个 worker 上 再去 分配 core,直到 worker
// TODO: 完成一次遍历
keepScheduling = false
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
}
// 返回每个Worker节点分配的CPU核数
assignedCores
}

再来分析 allocateWorkerResourceToExecutors
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
// TODO : 当前 这个 application 追加 一次 Executor
val exec = app.addExecutor(worker, coresToAssign, allocated)
//TODO: 给worker 线程 发送 launchExecutor 命令
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
ok,至此,spark最新版本 spark-3.0的Application 调度算法分析完毕!!!
您可能感兴趣的文档:

--结束END--

本文标题: spark-3.0 Application 调度算法解析

本文链接: https://www.lsjlt.com/news/2883.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • spark-3.0 Application 调度算法解析
    spark 各个版本的application 调度算法还是有这明显的不同之处的。从spark1.3.0 到 spark 1.6.1、spark2.x 到 现在最新的spark 3.x ,调度算法有了一定的修改。下面大家一起学习一下,最新的s...
    99+
    2018-05-18
    spark-3.0 Application 调度算法解析
  • Python实现调度算法代码详解
    调度算法 操作系统管理了系统的有限资源,当有多个进程(或多个进程发出的请求)要使用这些资源时,因为资源的有限性,必须按照一定的原则选择进程(请求)来占用资源。这就是调度。目的是控制资源使用者的数量,选取资源...
    99+
    2022-06-04
    算法 详解 代码
  • matlab遗传算法求解车间调度问题分析及实现源码
    目录一、车间调度简介1 车间调度定义2 传统作业车间调度二、遗传算法简介1 遗传算法概述2 遗传算法的特点和应用3 遗传算法的基本流程及实现技术3.1 遗传算法的基本流程3.2 遗传...
    99+
    2022-11-13
  • C++贪心算法处理多机调度问题详解
    多机调度问题思路 1、把作业按加工所用的时间从大到小排序 2、如果作业数目比机器的数目少或相等,则直接把作业分配下去 3、 如果作业数目比机器的数目多,则每台机器上先分配一个作业,如...
    99+
    2022-11-13
  • 用C语言递归实现火车调度算法详解
    目录1、代码2、代码详解3、用二叉树表示调用过程4、思维导图笔者在李云清版的《数据结构》中第二章遇到了这道经典的火车调度题,经过对一些前辈的代码进行学习,以下将这段火车代码进行分析详...
    99+
    2022-11-12
  • 如何使用matlab鸟群算法求解车间调度问题
    这篇文章主要介绍了如何使用matlab鸟群算法求解车间调度问题,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。一、车间调度简介1 车间调度定义车间调度是指根据产品制造的合理需求...
    99+
    2023-06-29
  • 如何使用matlab遗传算法求解车间调度问题
    这篇文章主要介绍了如何使用matlab遗传算法求解车间调度问题,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。一、车间调度简介1 车间调度定义车间调度是指根据产品制造的合理需求...
    99+
    2023-06-29
  • matlab鸟群算法求解车间调度问题详解及实现源码
    目录一、车间调度简介1 车间调度定义2 传统作业车间调度3 柔性作业车间调度二、蝴蝶优化算法(MBO)简介1 介绍2 香味3 具体算法三、部分源代码五、matlab版本及参考文献一、...
    99+
    2022-11-13
  • 在Go语言中如何解决并发任务的调度算法优化问题?
    在Go语言中如何解决并发任务的调度算法优化问题?Go语言作为一门旨在解决并发编程问题的语言,提供了丰富的并发特性和机制。然而,在实际应用中,我们常常遇到需要优化并发任务调度的问题。本文将介绍一种优化并发任务调度算法的方法,并给出具体的代码示...
    99+
    2023-10-22
    并发任务调度 Go语言并发优化 调度算法优化
  • matlab模拟退火算法单约束车间流水线调度解决实现及示例
    目录一、车间调度简介1 车间调度定义2 传统作业车间调度3 柔性作业车间调度二、模拟退火算法简介三、部分源代码四、运行结果五、matlab版本及参考文献一、车间调度简介 1 车间调度...
    99+
    2022-11-13
  • 【运筹优化】拉格朗日松弛 & 次梯度算法求解整数规划问题 + Java调用Cplex实战
    文章目录 一、拉格朗日松弛二、次梯度算法三、案例实战 一、拉格朗日松弛 当遇到一些很难求解的模型,但又不需要去求解它的精确解,只需要给出一个次优解或者解的上下界,这时便可以考虑采用松弛模...
    99+
    2023-10-26
    运筹优化 算法 Java 拉格朗日松弛 次梯度算法
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作