iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >大数据中Spark任务和集群启动流程是什么样的
  • 824
分享到

大数据中Spark任务和集群启动流程是什么样的

2023-06-02 01:06:06 824人浏览 独家记忆
摘要

这篇文章将为大家详细讲解有关大数据中spark任务和集群启动流程是什么样的,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。大数据分享Spark任务和集群启动流程大数据分享Spark任务和集群启

这篇文章将为大家详细讲解有关大数据spark任务和集群启动流程是什么样的,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

大数据分享Spark任务和集群启动流程

大数据分享Spark任务和集群启动流程,Spark集群启动流程

调用start-all.sh脚本,开始启动Master

Master启动以后,preStart方法调用了一个定时器,定时检查超时的Worker后删除

启动脚本会解析slaves配置文件,找到启动Worker的相应节点.开始启动Worker

Worker服务启动后开始调用preStart方法开始向所有的Master进行注册

Master接收到Worker发送过来的注册信息,Master开始保存注册信息并把自己的URL响应给Worker

Worker接收到Master的URL后并更新,开始调用一个定时器,定时的向Master发送心跳信息

任务提交流程

Driver端会通过spark-submit脚本启动SaparkSubmit进程,此时创建了一个非常重要的对象(SparkContext),开始向Master发送消息

Master接收到发送过来的信息后开始生成任务信息,并把任务信息放到一个对列里

Master把所有有效的Worker过滤出来,按照空闲的资源进行排序

Master开始向有效的Worker通知拿取任务信息并启动相应的Executor

Worker启动Executor并向Driver反向注册

Driver开始把生成的task发送给相应的Executor,Executor开始执行任务

集群启动流程

首先创建Master类

import akka.actor.{Actor, ActorSystem, Props}

import com.typesafe.config.{Config, ConfigFactory}

import Scala.collection.mutable

import scala.concurrent.duration._

class Master(val masterHost: String, val masterPort: Int) extends Actor{

// 用来存储Worker的注册信息

val idToWorker = new mutable.HashMap[String, WorkerInfo]()

// 用来存储Worker的信息

val workers = new mutable.HashSet[WorkerInfo]()

// Worker的超时时间间隔

val checkInterval: Long = 15000

// 生命周期方法,在构造器之后,receive方法之前只调用一次

override def preStart(): Unit = {

// 启动一个定时器,用来定时检查超时的Worker

import context.dispatcher

context.system.scheduler.schedule(0 millis, checkInterval millis, self, CheckTimeOutWorker)

}

// 在preStart方法之后,不断的重复调用

override def receive: Receive = {

// Worker -> Master

case ReGISterWorker(id, host, port, memory, cores) => {

if (!idToWorker.contains(id)){

val workerInfo = new WorkerInfo(id, host, port, memory, cores)

idToWorker += (id -> workerInfo)

workers += workerInfo

println("a worker registered")

sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" +

s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")

}

}

case HeartBeat(workerId) => {

// 通过传过来的workerId获取对应的WorkerInfo

val workerInfo: WorkerInfo = idToWorker(workerId)

// 获取当前时间

val currentTime = System.currentTimeMillis()

// 更新最后一次心跳时间

workerInfo.lastHeartbeatTime = currentTime

}

case CheckTimeOutWorker => {

val currentTime = System.currentTimeMillis()

val toRemove: mutable.HashSet[WorkerInfo] =

workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval)

// 将超时的Worker从idToWorker和workers中移除

toRemove.foreach(deadWorker => {

idToWorker -= deadWorker.id

workers -= deadWorker

})

println(s"num of workers: ${workers.size}")

}

}

}

object Master{

val MASTER_SYSTEM = "MasterSystem"

val MASTER_ACTOR = "Master"

def main(args: Array[String]): Unit = {

val host = args(0)

val port = args(1).toInt

val configStr =

s"""

|akka.actor.provider = "akka.remote.RemoteActorRefProvider"

|akka.remote.Netty.tcp.hostname = "$host"

|akka.remote.netty.tcp.port = "$port"

""".stripMargin

// 配置创建Actor需要的配置信息

val config: Config = ConfigFactory.parseString(configStr)

// 创建ActorSystem

val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config)

// 用actorSystem实例创建Actor

actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)

actorSystem.awaitTermination()

}

}

创建RemoteMsg特质

trait RemoteMsg extends Serializable{

}

// Master -> self(Master)

case object CheckTimeOutWorker

// Worker -> Master

case class RegisterWorker(id: String, host: String,

port: Int, memory: Int, cores: Int) extends RemoteMsg

// Master -> Worker

case class RegisteredWorker(masterUrl: String) extends RemoteMsg

// Worker -> self

case object SendHeartBeat

// Worker -> Master(HeartBeat)

case class HeartBeat(workerId: String) extends RemoteMsg

创建Worker类

import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}

import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.duration._

class Worker(val host: String, val port: Int, val masterHost: String,

val masterPort: Int, val memory: Int, val cores: Int) extends Actor{

// 生成一个Worker ID

val workerId = UUID.randomUUID().toString

// 用来存储MasterURL

var masterUrl: String = _

// 心跳时间间隔

val heartBeat_interval: Long = 10000

// master的Actor

var master: ActorSelection = _

override def preStart(){

// 获取Master的Actor

master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +

s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")

master ! RegisterWorker(workerId, host, port, memory, cores)

}

override def receive: Receive = {

// Worker接收到Master发送过来的注册成功的信息(masterUrl)

case RegisteredWorker(masterUrl) => {

this.masterUrl = masterUrl

// 启动一个定时器,定时给Master发送心跳

import context.dispatcher

context.system.scheduler.schedule(0 millis, heartBeat_interval millis, self, SendHeartBeat)

}

case SendHeartBeat => {

// 向Master发送心跳

master ! HeartBeat(workerId)

}

}

}

object Worker{

val WORKER_SYSTEM = "WorkerSystem"

val WORKER_ACTOR = "Worker"

def main(args: Array[String]): Unit = {

val host = args(0)

val port = args(1).toInt

val masterHost = args(2)

val masterPort = args(3).toInt

val memory = args(4).toInt

val cores = args(5).toInt

val configStr =

s"""

|akka.actor.provider = "akka.remote.RemoteActorRefProvider"

|akka.remote.netty.tcp.hostname = "$host"

|akka.remote.netty.tcp.port = "$port"

""".stripMargin

// 配置创建Actor需要的配置信息

val config: Config = ConfigFactory.parseString(configStr)

// 创建ActorSystem

val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config)

// 用actorSystem实例创建Actor

val worker: ActorRef = actorSystem.actorOf(

Props(new Worker(host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)

actorSystem.awaitTermination()

}

}

创建初始化类

class WorkerInfo(val id: String, val host: String, val port: Int,

val memory: Int, val cores: Int) {

// 初始化最后一次心跳的时间

var lastHeartbeatTime: Long = _

}

本地测试需要传入参数:

 大数据中Spark任务和集群启动流程是什么样的

关于大数据中Spark任务和集群启动流程是什么样的就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

--结束END--

本文标题: 大数据中Spark任务和集群启动流程是什么样的

本文链接: https://www.lsjlt.com/news/228378.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 大数据中Spark任务和集群启动流程是什么样的
    这篇文章将为大家详细讲解有关大数据中Spark任务和集群启动流程是什么样的,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。大数据分享Spark任务和集群启动流程大数据分享Spark任务和集群启...
    99+
    2023-06-02
  • 大数据框架中Hadoop和Spark的异同是什么
    大数据框架中Hadoop和Spark的异同是什么,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。在大数据框架中Hadoop和Spark可以说是很火的了,这俩个框架都是对数据进行存...
    99+
    2023-06-28
  • SpringBoot中WEB的启动流程是什么
    这篇文章主要介绍“SpringBoot中WEB的启动流程是什么”,在日常操作中,相信很多人在SpringBoot中WEB的启动流程是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”SpringBoot中WE...
    99+
    2023-06-29
  • 大数据Atlas的部署和维护流程是怎样的
    大数据Atlas的部署和维护流程如下: 部署Atlas:首先需要安装和配置Hadoop集群,然后下载并安装Atlas的软件包,在...
    99+
    2024-03-08
    Atlas
  • Teradata与Hadoop、Spark等大数据平台的集成方式及优势是什么
    Teradata与Hadoop、Spark等大数据平台的集成方式主要有以下几种: 数据集成:Teradata可以与Hadoop、...
    99+
    2024-04-09
    Teradata
  • 大数据流处理中Flume、Kafka和NiFi的对比是怎样的
    今天就跟大家聊聊有关大数据流处理中Flume、Kafka和NiFi的对比是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。我们将简要介绍三种Apache处理工具:Flume、Ka...
    99+
    2023-06-02
  • Android中的ActivityThread和APP启动过程是什么
    ActivityThread是Android中负责管理所有Activity的线程,它负责处理Activity的生命周期、事件分发、消...
    99+
    2024-03-08
    Android
  • 现代云架构中的AWS服务器群和数据库是怎么样的
    这篇文章给大家介绍现代云架构中的AWS服务器群和数据库是怎么样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。当今云计算技术成了主流的架构和互联网基础服务架构之一。越来越多的企业、组织...
    99+
    2024-04-02
  • 数据库事务Event的生成和写入流程是什么
    这篇文章主要讲解了“数据库事务Event的生成和写入流程是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“数据库事务Event的生成和写入流程是什么”吧!...
    99+
    2024-04-02
  • 大数据编程算法中,Python和Apache的编程优势是什么?
    随着科技不断发展,大数据分析已经成为了当今商业领域中非常重要的一部分。而在大数据编程算法中,Python和Apache的编程优势也备受关注。本文将探讨Python和Apache在大数据编程算法中的优势,并介绍一些演示代码。 一、Python...
    99+
    2023-08-26
    apache 大数据 编程算法
  • php数据流中第K大元素的计算方法是什么
    这篇文章主要介绍“php数据流中第K大元素的计算方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“php数据流中第K大元素的计算方法是什么”文章能帮助大家解决...
    99+
    2024-04-02
  • 云服务器和数据库的关系是什么样的
    云服务器和数据库都是存储和访问数据的软件系统。它们之间的关系可以概括为以下几点: 数据存储和访问:云服务器存储和管理的数据可能会存储在多个位置,例如本地磁盘、Web服务器或数据中心。数据也可能会分布在不同的云存储平台上,例如Amazon...
    99+
    2023-10-26
    关系 服务器 数据库
  • Linux中mysql服务启动和关闭的命令是什么
    这篇文章主要介绍“Linux中mysql服务启动和关闭的命令是什么”,在日常操作中,相信很多人在Linux中mysql服务启动和关闭的命令是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Linux中mys...
    99+
    2023-06-28
  • 大数据报表工具中动态参数的使用方法和场景是什么
    这篇文章将为大家详细讲解有关大数据报表工具中动态参数的使用方法和场景是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。报表开发过程中,有的时候我们会觉得普通参数很难满足一些业务需求,比如第...
    99+
    2023-06-04
  • JavaScript中的程序控制流和函数方法是什么
    本篇内容主要讲解“JavaScript中的程序控制流和函数方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“JavaScript中的程序控制流和函数方法是什么”吧!程序控制流程序的运行可以...
    99+
    2023-07-04
  • 云服务器和数据库的关系是什么样的呢
    云服务器和数据库都是存储和管理数据的软件系统,它们之间的关系可以概括为以下几种: 云服务器是云计算平台中的基础设施服务。云服务器提供了存储和管理数据的能力,通常用于容纳大量的数据。它们通常被用来托管在云计算平台上,如Amazon Web...
    99+
    2023-10-27
    关系 服务器 数据库
  • 申请亚马逊服务器的条件和流程是什么样的呢
    申请流程主要包括以下几个步骤: 1. 在网上搜索亚马逊服务器的相关信息,了解其申请流程; 2. 按照亚马逊的要求提交申请材料,包括身份证明、雇佣合同、社保证明等; 3. 等待审核通过后,登录亚马逊官网注册账号并购买服务器; 4. 定期更新服...
    99+
    2023-10-27
    亚马逊 流程 条件
  • Python 大数据处理中,numpy 和 http 的区别是什么?
    在 Python 大数据处理中,numpy 和 http 是两个经常被使用的库和协议。虽然它们都可以用于数据处理,但是它们的本质和用途是不同的。 Numpy,即 Numerical Python,是一个开源的 Python 扩展库,用于支...
    99+
    2023-08-16
    大数据 numpy http
  • mysql metadata lock元数据锁中锁状态lock_status流转图是什么样的
    mysql metadata lock元数据锁中锁状态lock_status流转图是什么样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。前言...
    99+
    2024-04-02
  • 免费申请谷歌云服务器的条件和流程是什么样的
    一、准备申请材料 企业营业执照或相关证明文件。 组织机构代码证。 法人身份证。 联系人身份证及电话。 网站域名注册申请表。 网站基本信息资料(网站名称、网站类型、网站简介、网站功能、网站内容等)。 二、填写申请表 在申请谷歌云服务器前...
    99+
    2023-10-28
    免费申请 流程 条件
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作