SparkRDMA:使用RDMA技術(shù)提升Spark的Shuffle性能

大數(shù)據(jù)

Spark?Shuffle 基礎(chǔ)

在 MapReduce 框架中,Shuffle 是連接 Map 和 Reduce 之間的橋梁,Reduce 要讀取到 Map 的輸出必須要經(jīng)過(guò) Shuffle 這個(gè)環(huán)節(jié);而 Reduce 和 Map 過(guò)程通常不在一臺(tái)節(jié)點(diǎn),這意味著 Shuffle 階段通常需要跨網(wǎng)絡(luò)以及一些磁盤(pán)的讀寫(xiě)操作,因此 Shuffle 的性能高低直接影響了整個(gè)程序的性能和吞吐量。

與 MapReduce 計(jì)算框架一樣,Spark?作業(yè)也有 Shuffle 階段,通常以 Shuffle 來(lái)劃分 Stage;而 Stage 之間的數(shù)據(jù)交互是需要 Shuffle 來(lái)完成的。整個(gè)過(guò)程圖如下所示:

大數(shù)據(jù)

如果想及時(shí)了解Spark、Hadoop或者Hbase相關(guān)的文章,歡迎關(guān)注微信公共帳號(hào):iteblog_hadoop

從上面簡(jiǎn)單的介紹可以得到以下幾個(gè)結(jié)論:

不管是 MapReduce 還是 Spark 作業(yè),Shuffle 操作是很消耗資源的,這里的資源包括:CPU、RAM、磁盤(pán)還有網(wǎng)絡(luò);我們需要盡可能地避免 Shuffle 操作

而目前最新的 Spark(2.2.0) 內(nèi)置只支持一種 Shuffle 實(shí)現(xiàn):org.apache.spark.shuffle.sort.SortShuffleManager,通過(guò)參數(shù)?spark.shuffle.manager?來(lái)配置。這是標(biāo)準(zhǔn)的 Spark Shuffle 實(shí)現(xiàn),其內(nèi)部實(shí)現(xiàn)依賴(lài)了 Netty 框架。本文并不打算詳細(xì)介紹 Spark 內(nèi)部 Shuffle 是如何實(shí)現(xiàn)的,這里我要介紹社區(qū)對(duì) Shuffle 的改進(jìn)。

RDMA 技術(shù)

在進(jìn)行下面的介紹之前,我們先來(lái)了解一些基礎(chǔ)知識(shí)。

傳統(tǒng)的 TCP Socket 數(shù)據(jù)傳輸需要經(jīng)過(guò)很多步驟:數(shù)據(jù)先從源端應(yīng)用程序拷貝到當(dāng)前主機(jī)的 Sockets 緩存區(qū),然后再拷貝到 TransportProtocol Driver,然后到 NIC Driver,最后 NIC 通過(guò)網(wǎng)絡(luò)將數(shù)據(jù)發(fā)送到目標(biāo)主機(jī)的 NIC,目標(biāo)主機(jī)又經(jīng)過(guò)上面步驟將數(shù)據(jù)傳輸?shù)綉?yīng)用程序,整個(gè)過(guò)程如下:

大數(shù)據(jù)

如果想及時(shí)了解Spark、Hadoop或者Hbase相關(guān)的文章,歡迎關(guān)注微信公共帳號(hào):iteblog_hadoop

從上圖可以看出,網(wǎng)絡(luò)數(shù)據(jù)的傳輸很大一部分時(shí)間用于數(shù)據(jù)的拷貝!如果需要傳輸?shù)臄?shù)據(jù)很大,那么這個(gè)階段用的時(shí)間很可能占整個(gè)作業(yè)運(yùn)行時(shí)間的很大一部分!那么有沒(méi)有一種方法直接省掉不同層的數(shù)據(jù)拷貝,使得目標(biāo)主機(jī)直接從源端主機(jī)內(nèi)存獲取數(shù)據(jù)?還真有,這就是 RDMA 技術(shù)!

RDMA(Remote Direct Memory Access)技術(shù)全稱(chēng)遠(yuǎn)程直接內(nèi)存訪問(wèn),是一種直接內(nèi)存訪問(wèn)技術(shù),它將數(shù)據(jù)直接從一臺(tái)計(jì)算機(jī)的內(nèi)存?zhèn)鬏數(shù)搅硪慌_(tái)計(jì)算機(jī),無(wú)需雙方操作系統(tǒng)的介入。這允許高通量、低延遲的網(wǎng)絡(luò)通信,尤其適合在大規(guī)模并行計(jì)算機(jī)集群中使用(本段摘抄自?維基百科 – 遠(yuǎn)程直接內(nèi)存訪問(wèn))。RDMA 有以下幾個(gè)特點(diǎn):

Zero-copy直接硬件接口(Direct hardware interface),繞過(guò)內(nèi)核和 TCP / IP 的 IO亞微秒延遲Flow control and reliability is offloaded in hardware

所以利用 RDMA 技術(shù)進(jìn)行數(shù)據(jù)傳輸看起來(lái)像下面一樣:

大數(shù)據(jù)

如果想及時(shí)了解Spark、Hadoop或者Hbase相關(guān)的文章,歡迎關(guān)注微信公共帳號(hào):iteblog_hadoop
從上面看出,使用了 RDMA 技術(shù)之后,雖然源端主機(jī)和目標(biāo)主機(jī)是跨網(wǎng)絡(luò)的,但是他們之間的數(shù)據(jù)交互是直接從對(duì)方內(nèi)存獲取的,這明顯會(huì)加快整個(gè)計(jì)算過(guò)程。

SparkRDMA

好,現(xiàn)在基礎(chǔ)的知識(shí)咱們已經(jīng)獲取到了,我們正式進(jìn)入本文主題。由 Mellanox Technologies 公司開(kāi)發(fā)并開(kāi)源的 SparkRDMA ShuffleManager (GitHub 地址:https://github.com/Mellanox/SparkRDMA)就是采用 RDMA 技術(shù),使得 Spark 作業(yè)在 Shuffle 數(shù)據(jù)的時(shí)候使用 RDMA 方式,而非標(biāo)準(zhǔn)的 TCP 方式。在 SparkRDMA 的官方 Wiki 里面有如下介紹:

SparkRDMA is a high-performance, scalable and efficient ShuffleManager plugin for Apache Spark. It utilizes RDMA (Remote Direct Memory Access) technology to reduce CPU cycles needed for Shuffle data transfers. It reduces memory usage by reusing memory for transfers instead of copying data multiple times down the traditional TCP-stack.

可以看出,SparkRDMA 就是擴(kuò)展了 Spark 的?ShuffleManager?接口,并且采用了 RDMA 技術(shù)。在測(cè)試的結(jié)果顯示,采用 RDMA 進(jìn)行 Shuffle 數(shù)據(jù)比標(biāo)準(zhǔn)的方式快 2.18 倍!

大數(shù)據(jù)

如果想及時(shí)了解Spark、Hadoop或者Hbase相關(guān)的文章,歡迎關(guān)注微信公共帳號(hào):iteblog_hadoop
SparkRDMA 開(kāi)發(fā)者們給 Spark 社區(qū)提交了一個(gè) Issue:[SPARK-22229] SPIP: RDMA Accelerated Shuffle Engine,詳細(xì)的設(shè)計(jì)文檔:這里。不過(guò)從社區(qū)的回復(fù)來(lái)看,最少目前不會(huì)整合到 Spark 代碼中去。

安裝使用

如果你想使用 SparkRDMA,我們需要 Apache Spark 2.0.0/2.1.0/2.2.0、Java 8 以及支持 RDMA 技術(shù)的網(wǎng)絡(luò)(比如:RoCE 和 Infiniband)。

SparkRDMA 官方為不同版本的 Spark 預(yù)先編譯好相應(yīng)的 jar 包,我們可以訪問(wèn)?這里?下載。解壓之后會(huì)得到以下四個(gè)文件:

spark-rdma-1.0-for-spark-2.0.0-jar-with-dependencies.jarspark-rdma-1.0-for-spark-2.1.0-jar-with-dependencies.jarspark-rdma-1.0-for-spark-2.2.0-jar-with-dependencies.jarlibdisni.so

除了?libdisni.so?文件一定要安裝到 Spark 集群的所有節(jié)點(diǎn)上,其他的 jar 包只需要根據(jù)我們的 Spark 版本進(jìn)行選擇。相關(guān)的文件部署好之后,我們需要將這個(gè) SparkRDMA 模塊加入到 Spark 的運(yùn)行環(huán)境中去,如下設(shè)置:

spark.executor.extraClassPath /path/to/SparkRDMA/spark-rdma-1.0-for-spark-2.0.0-jar-with-dependencies.jar

為了啟用 SparkRDMA Shuffle Manager 插件,我們還需要修改?spark.shuffle.manager?的值,只需要在?$SPARK_HOME/conf/spark-defaults.conf?里面加入以下的配合即可:

spark.shuffle.manager?? org.apache.spark.shuffle.rdma.RdmaShuffleManager

其他的就和正常使用 Spark 一樣。

關(guān)于配置libdisni.so

我們需要將?libdisni.so?文件分發(fā)到集群的所有節(jié)點(diǎn)的同一目錄下,然后配置下面的環(huán)境:

export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/home/iteblog/spark-2.1.0-bin/rdma/ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/iteblog/spark-2.1.0-bin/rdma/ export SPARK_YARN_USER_ENV="JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH,LD_LIBRARY_PATH=$LD_LIBRARY_PATH"

其中?/home/iteblog/spark-2.1.0-bin/rdma/?存放了libdisni.so?文件。運(yùn)行的過(guò)程中可能還需要?libibverbs.so.1?和?librdmacm.so.1?文件,可以通過(guò)下面命令解決

yum -y install?? libibverbs? librdmacm

然后可以通過(guò)下面命令啟動(dòng) Spark:

bin/spark-shell --master yarn-client --driver-memory 18g? --executor-memory 15g???????????????? \????????--queue iteblog --executor-cores 1? --num-executors 8?????????????????????????????????? \????????--conf "spark.yarn.dist.archives=/home/iteblog/spark-2.1.0-bin/rdma/rdma.tgz"?????????? \????????--conf "spark.executor.extraLibraryPath=/home/iteblog/spark-2.1.0-bin/rdma/libdisni.so" \????????--conf "spark.driver.extraLibraryPath=/home/iteblog/spark-2.1.0-bin/rdma/libdisni.so"?? \????????--conf "spark.executor.extraClassPath=rdma.tgz/rdma/*"????????????????????????????????? \????????--conf "spark.driver.extraClassPath=/home/iteblog/spark-2.1.0-bin/rdma/*"?????????????? \????????--conf "spark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager"

不過(guò)如果你網(wǎng)絡(luò)不支持 RDMA 技術(shù),那么就像我一樣會(huì)遇到下面的問(wèn)題:

17/11/15 22:01:48 ERROR rdma.RdmaNode: Failed in RdmaNode constructor17/11/15 22:01:48 ERROR spark.SparkContext: Error initializing SparkContext.java.lang.reflect.InvocationTargetException????at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)????at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)????at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)????at java.lang.reflect.Constructor.newInstance(Constructor.java:423)????at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:265)????at org.apache.spark.SparkEnv$.create(SparkEnv.scala:323)????at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:174)????at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)????at org.apache.spark.SparkContext.<init>(SparkContext.scala:432)????at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)????at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)????at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)????at scala.Option.getOrElse(Option.scala:121)????at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)????at org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)????at $line3.$read$$iw$$iw.<init>(<console>:15)????at $line3.$read$$iw.<init>(<console>:42)????at $line3.$read.<init>(<console>:44)????at $line3.$read$.<init>(<console>:48)????at $line3.$read$.<clinit>(<console>)????at $line3.$eval$.$print$lzycompute(<console>:7)????at $line3.$eval$.$print(<console>:6)????at $line3.$eval.$print(<console>)????at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)????at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)????at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)????at java.lang.reflect.Method.invoke(Method.java:498)????at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)????at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)????at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)????at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)????at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)????at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)????at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)????at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)????at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)????at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)????at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)????at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)????at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)????at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)????at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)????at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)????at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)????at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:105)????at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)????at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)????at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)????at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)????at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)????at org.apache.spark.repl.Main$.doMain(Main.scala:68)????at org.apache.spark.repl.Main$.main(Main.scala:51)????at org.apache.spark.repl.Main.main(Main.scala)????at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)????at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)????at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)????at java.lang.reflect.Method.invoke(Method.java:498)????at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)????at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)????at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)????at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)????at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)Caused by: java.io.IOException: Unable to allocate RDMA Event Channel????at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:67)????at org.apache.spark.shuffle.rdma.RdmaShuffleManager.<init>(RdmaShuffleManager.scala:181)????... 62 morejava.io.IOException: Unable to allocate RDMA Event Channel??at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:67)??at org.apache.spark.shuffle.rdma.RdmaShuffleManager.<init>(RdmaShuffleManager.scala:181)??at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)??at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)??at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)??at java.lang.reflect.Constructor.newInstance(Constructor.java:423)??at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:265)??at org.apache.spark.SparkEnv$.create(SparkEnv.scala:323)??at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:174)??at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)??at org.apache.spark.SparkContext.<init>(SparkContext.scala:432)??at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)??at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)??at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)??at scala.Option.getOrElse(Option.scala:121)??at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)??at org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)??... 47 elided

這樣的話那就沒(méi)法測(cè)試了,哈哈。。如果真要使用 RDMA ,咨詢你公司的 OPS 如何配置這個(gè)吧。

免責(zé)聲明:本網(wǎng)站內(nèi)容主要來(lái)自原創(chuàng)、合作伙伴供稿和第三方自媒體作者投稿,凡在本網(wǎng)站出現(xiàn)的信息,均僅供參考。本網(wǎng)站將盡力確保所提供信息的準(zhǔn)確性及可靠性,但不保證有關(guān)資料的準(zhǔn)確性及可靠性,讀者在使用前請(qǐng)進(jìn)一步核實(shí),并對(duì)任何自主決定的行為負(fù)責(zé)。本網(wǎng)站對(duì)有關(guān)資料所引致的錯(cuò)誤、不確或遺漏,概不負(fù)任何法律責(zé)任。任何單位或個(gè)人認(rèn)為本網(wǎng)站中的網(wǎng)頁(yè)或鏈接內(nèi)容可能涉嫌侵犯其知識(shí)產(chǎn)權(quán)或存在不實(shí)內(nèi)容時(shí),應(yīng)及時(shí)向本網(wǎng)站提出書(shū)面權(quán)利通知或不實(shí)情況說(shuō)明,并提供身份證明、權(quán)屬證明及詳細(xì)侵權(quán)或不實(shí)情況證明。本網(wǎng)站在收到上述法律文件后,將會(huì)依法盡快聯(lián)系相關(guān)文章源頭核實(shí),溝通刪除相關(guān)內(nèi)容或斷開(kāi)相關(guān)鏈接。

2017-11-16
SparkRDMA:使用RDMA技術(shù)提升Spark的Shuffle性能
Spark?Shuffle 基礎(chǔ) 在 MapReduce 框架中,Shuffle 是連接 Map 和 Reduce 之間的橋梁,Reduce 要讀取到 Map

長(zhǎng)按掃碼 閱讀全文