Spark FinalStage Handling (Stage Division)
Published: 2019-06-23


More resources

  • github:
  • csdn (all videos, watch online):

YouTube videos

  • Spark FinalStage Handling (Stage Division) (YouTube video):

BiliBili videos

  • Spark FinalStage Handling (Stage Division) (BiliBili video):

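Notes

  • When DAGScheduler submits stages, the argument it passes is the FinalStage, so this post analyzes how the FinalStage is built.
  • Stages for RDDs whose dependency is a shuffle dependency (shuffleDep) have already been cached and are looked up with shuffleToMapStage.get(shuffleDep.shuffleId). In other words, at this point the stages are already clearly divided; they just have not been submitted yet.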
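The caching described in the second note can be pictured as a map from shuffleId to stage that is consulted before any new stage is built. The following is a simplified Scala model, not Spark's code: `MapStage`, `ShuffleStageCache`, and `getOrCreate` are hypothetical names, and the id counter is a plain `Int` rather than Spark's `nextStageId`.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's ShuffleMapStage.
final case class MapStage(id: Int, shuffleId: Int)

// Toy model of the shuffleToMapStage cache: at most one stage per shuffleId.
class ShuffleStageCache {
  private val shuffleToMapStage = mutable.HashMap.empty[Int, MapStage]
  private var nextStageId = 0

  // Return the cached stage for this shuffle, creating and caching it on first use.
  def getOrCreate(shuffleId: Int): MapStage =
    shuffleToMapStage.get(shuffleId) match {
      case Some(stage) => stage
      case None =>
        val stage = MapStage(nextStageId, shuffleId)
        nextStageId += 1
        shuffleToMapStage(shuffleId) = stage
        stage
    }

  // Number of distinct shuffle stages created so far.
  def size: Int = shuffleToMapStage.size
}
```

Calling `getOrCreate(7)` twice returns the same instance, so a shuffle reached from several downstream RDDs is turned into only one stage.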

DAGScheduler event handling: JobSubmitted

  • Calls newResultStage()
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)

  submitWaitingStages()
}
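The try/catch at the top of handleJobSubmitted means that a failure while building the final stage (such as the deleted-HDFS-files case in the comment) is reported to the JobListener and the job is never submitted. Below is a minimal sketch of that pattern, assuming a hypothetical `SimpleJobListener` trait and `FinalStage` case class in place of Spark's `JobListener` and `ResultStage`; it returns an `Option` instead of using an early `return`.

```scala
// Hypothetical listener, loosely modeled on Spark's JobListener.
trait SimpleJobListener {
  def jobFailed(e: Exception): Unit
}

// Hypothetical stand-in for a ResultStage.
final case class FinalStage(id: Int, rddName: String)

object JobSubmission {
  // If building the final stage throws, notify the listener and return None,
  // so the caller never goes on to create an ActiveJob or submit stages.
  def handleJobSubmitted(
      buildFinalStage: () => FinalStage,
      listener: SimpleJobListener): Option[FinalStage] =
    try {
      Some(buildFinalStage())
    } catch {
      case e: Exception =>
        listener.jobFailed(e)
        None
    }
}
```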
  • Calls getParentStagesAndId() to obtain the list of parent stages
/**
 * Create a ResultStage associated with the provided jobId.
 */
private def newResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
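newResultStage registers the new stage in `stageIdToStage` and then calls updateJobIdStageIdMaps; handleJobSubmitted later reads `jobIdToStageIds(jobId)` to build the stage infos it posts. The toy registry below assumes that updateJobIdStageIdMaps records the stage together with all of its ancestor stages under the job. `ToyStage`, `ToyMapStage`, `ToyResultStage`, and `StageRegistry` are hypothetical and much simpler than Spark's classes.

```scala
import scala.collection.mutable

// Hypothetical, minimal stage hierarchy.
trait ToyStage {
  def id: Int
  def parents: List[ToyStage]
}
final case class ToyMapStage(id: Int, parents: List[ToyStage]) extends ToyStage
final case class ToyResultStage(id: Int, parents: List[ToyStage], jobId: Int) extends ToyStage

class StageRegistry {
  val stageIdToStage = mutable.HashMap.empty[Int, ToyStage]
  val jobIdToStageIds = mutable.HashMap.empty[Int, mutable.HashSet[Int]]

  // Register the result stage by id, then record it and every ancestor
  // stage under its job (an approximation of updateJobIdStageIdMaps).
  def registerResultStage(stage: ToyResultStage): Unit = {
    stageIdToStage(stage.id) = stage
    val ids = jobIdToStageIds.getOrElseUpdate(stage.jobId, mutable.HashSet.empty[Int])
    def link(s: ToyStage): Unit = {
      ids += s.id
      s.parents.foreach(link)
    }
    link(stage)
  }
}
```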
  • Calls getParentStages()
/**
 * Helper function to eliminate some code re-use when creating new stages.
 */
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
  val parentStages = getParentStages(rdd, firstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStages, id)
}
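The order of the two statements in getParentStagesAndId is what produces the stage numbering: parent stages are created (and numbered) first, and only then does the current stage take an id. The sketch below reproduces that ordering on a hypothetical linear lineage in which each RDD has at most one shuffle parent; `Lineage`, `NumberedStage`, and `StageNumberer` are illustrative names, and real RDDs can have several narrow and shuffle dependencies.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical lineage node: an optional parent reached through a shuffle.
final case class Lineage(name: String, shuffleParent: Option[Lineage])

final case class NumberedStage(id: Int, rddName: String, parents: List[NumberedStage])

class StageNumberer {
  private val nextStageId = new AtomicInteger(0)

  // Same ordering as getParentStagesAndId: build parents first,
  // then take the next id for the current stage.
  def newStage(rdd: Lineage): NumberedStage = {
    val parents = rdd.shuffleParent.map(newStage).toList
    val id = nextStageId.getAndIncrement()
    NumberedStage(id, rdd.name, parents)
  }
}
```

With rdd1, rdd2, and rdd4 chained by shuffles, the stage for rdd1 gets id 0, the stage for rdd2 gets 1, and the final stage for rdd4 gets 2.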
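  • This method computes the parent stages
  • Check whether the dependency of the current RDD (here rdd4) is a ShuffleDependency
  • If not, move to the parent RDD and check that RDD's dependency type in turn
  • If so, get or create a ShuffleMapStage (via getShuffleMapStage) and return it; the RDD of this stage is rdd4's parent RDD
  • Note that whenever there is a parent stage, it is always resolved first, before the current stage takes an id. As a result the root stage gets id 0, and each successive child stage's id is incremented by 1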
/**
 * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
 * the provided firstJobId.
 */
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId)
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
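The explicit-stack walk in getParentStages can be reproduced on a toy graph to see where stage boundaries fall: narrow dependencies are pushed and followed, while a shuffle dependency stops the walk and contributes a parent stage. In this sketch `ToyRdd`, `Narrow`, and `Shuffle` are hypothetical stand-ins for Spark's RDD and dependency classes, and each parent stage is represented only by its shuffleId rather than by a ShuffleMapStage.

```scala
import scala.collection.mutable

// Hypothetical dependency types: Narrow stays in the same stage, Shuffle marks a boundary.
trait ToyDep { def rdd: ToyRdd }
final case class Narrow(rdd: ToyRdd) extends ToyDep
final case class Shuffle(rdd: ToyRdd, shuffleId: Int) extends ToyDep

// Plain class, so `visited` compares RDDs by identity like real RDD objects.
final class ToyRdd(val name: String, val dependencies: List[ToyDep])

object ParentStages {
  // Collect the shuffle boundaries directly above `rdd`, using a manual stack
  // instead of recursion, as getParentStages does.
  def parentShuffleIds(rdd: ToyRdd): Set[Int] = {
    val parents = mutable.HashSet.empty[Int]
    val visited = mutable.HashSet.empty[ToyRdd]
    val waitingForVisit = mutable.Stack.empty[ToyRdd]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val r = waitingForVisit.pop()
      if (!visited(r)) {
        visited += r
        r.dependencies.foreach {
          case Shuffle(_, shuffleId) => parents += shuffleId  // boundary: a parent stage
          case dep => waitingForVisit.push(dep.rdd)           // narrow: keep walking up
        }
      }
    }
    parents.toSet
  }
}
```

For an RDD f that narrowly depends on c and e, where c and e each sit behind a shuffle (ids 0 and 1), `parentShuffleIds(f)` returns `Set(0, 1)`; an RDD whose lineage is entirely narrow returns an empty set, meaning its stage has no parents.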

FinalStage illustrated

Reposted from: http://ojiga.baihongyu.com/
