博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark技术内幕: Shuffle详解(一)
阅读量:6358 次
发布时间:2019-06-23

本文共 3795 字,大约阅读时间需要 12 分钟。

通过上面一系列文章,我们知道在集群启动时,在Standalone模式下,Worker会向Master注册,使得Master可以感知进而管理整个集群;Master通过借助ZK,可以简单的实现HA;而应用方通过SparkContext这个与集群的交互接口,在创建SparkContext时就完成了Application的注册,Master为其分配Executor;在应用方创建了RDD并且在这个RDD上进行了很多的Transformation后,触发action,通过DAGScheduler将DAG划分为不同的Stage后,将Stage转换为TaskSet交给TaskSchedulerImpl;TaskSchedulerImpl通过SparkDeploySchedulerBackend的reviveOffers,最终向ExecutorBackend发送LaunchTask的消息;ExecutorBackend接收到消息后,启动Task,开始在集群中启动计算。

接下来,会介绍一些更详细的细节实现。

Shuffle,无疑是性能调优的一个重点,本文将从源码实现的角度,深入解析Spark Shuffle的实现细节。

每个Stage的上边界,要不是需要从外部存储读取数据,要么需要读取上一个Stage的输出;而下边界,要么是需要写入本地文件系统,以供child Stage读取,要么是ResultTask,需要输出结果了。

首先从org.apache.spark.rdd.ShuffledRDD开始, 因为ShuffledRDD是一个Stage的开始,它需要获取上一个Stage的输出结果,然后进行接下来的运算。那么这个数据获取是如何实现的?顺着ShuffledRDD的实现,我们可以理清这条线。首先可以看一下compute是如何实现的。

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)      .read()      .asInstanceOf[Iterator[(K, C)]]  }
它需要从ShuffleManager获取shuffleReader,然后读取数据进行计算。看一下shuffleManager:

// Let the user specify short names for shuffle managers    val shortShuffleMgrNames = Map(      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
ShuffleManager分为hash和sort,hash是默认的,即Shuffle时不排序。熟悉MapReduce的同学都知道,MapReduce是无论如何都要排序的,即到Reduce端的都是已经排序好的,当然这么做也是为了可以处理海量的数据。在Spark1.1之前,只支持hash based的Shuffle,sort based Shuffle是1.1新加入的实验功能。

hash顾名思义,在Reduce时的数据需要求有序,因此可以在Reduce获得了数据后,立即进行处理;而不需要等待所有的数据都得到后再处理。这个接下来会通过源码进行解释。而sort,意味着排序,实际上对于sortByKey这种转换可能sort是更有意义的。

ShuffledRDD是通过org.apache.spark.shuffle.hash.HashShuffleReader获取上一个Stage的结果。而HashShuffleReader通过org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$#fetch来获取结果。而fetch通过调用org.apache.spark.storage.BlockManager#getMultiple来转发请求:

def getMultiple(      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],      serializer: Serializer,      readMetrics: ShuffleReadMetrics): BlockFetcherIterator = {    val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,      readMetrics)    iter.initialize()    iter  }

而最终的实现在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#initialize中,

override def initialize() {      // Split local and remote blocks.      // 获得需要远程请求的数据列表,并且将已经在本地的数据的blockid放在localBlocksToFetch中,      // 并且在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator.getLocalBlocks进行本地读取      val remoteRequests = splitLocalRemoteBlocks()      // Add the remote requests into our queue in a random order      fetchRequests ++= Utils.randomize(remoteRequests)      // Send out initial requests for blocks, up to our maxBytesInFlight      while (!fetchRequests.isEmpty && //保证占用内存不超过设定的值spark.reducer.maxMbInFlight,默认值是48M        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {        sendRequest(fetchRequests.dequeue())      }      val numFetches = remoteRequests.size - fetchRequests.size      logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))      // Get Local Blocks      startTime = System.currentTimeMillis      getLocalBlocks() // 从本地获取      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")    }
具体获取如何获取的策略都在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks中。这个会在下一篇博文中详解。

转载于:https://www.cnblogs.com/wuwa/p/6190775.html

你可能感兴趣的文章
Android NDK开发:JNI基础篇
查看>>
使用Maven命令快速建立项目结构
查看>>
二分查找,php
查看>>
python面试题-django相关
查看>>
Python——eventlet.greenthread
查看>>
记大众点评之面试经历
查看>>
第三章:基本概念
查看>>
Jersey+mybatis实现web项目第一篇
查看>>
C++形参中const char * 与 char * 的区别
查看>>
espresso 2.0.4 Apple Xcode 4.4.1 coteditor 价格
查看>>
Object-C中emoji与json的问题
查看>>
linux 命令
查看>>
灾后重建
查看>>
Nothing 和 Is
查看>>
第一个sprint冲刺第三天
查看>>
周末web前端练习
查看>>
hdu 5754 Life Winner Bo 博弈论
查看>>
Overlay network 覆盖网络
查看>>
Linux之编译需要的文件变化时刻
查看>>
IntelliJ IDEA中怎么查看方法说明?
查看>>