In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Spark Shuffle Foundation
In the MapReduce framework, Shuffle is the bridge between Map and Reduce, and Reduce must go through Shuffle to read the output to Map; while Reduce and Map processes are usually not in the same node, which means that the Shuffle phase usually requires read and write operations across the network and some disks, so the performance of Shuffle directly affects the performance and throughput of the whole program.
Like the MapReduce computing framework, Spark jobs also have Shuffle phases, which are usually divided into Stage; by Shuffle, while data interaction between Stage needs Shuffle to complete. The whole process diagram is as follows:
From the brief introduction above, we can draw the following conclusions:
Whether it is MapReduce or Spark jobs, Shuffle operations are very resource-consuming, including CPU, RAM, disk and network.
We need to avoid Shuffle operations as much as possible
At present, the latest Spark (2.2.0) only supports one Shuffle implementation: org.apache.spark.shuffle.sort.SortShuffleManager, which is configured by the parameter spark.shuffle.manager. This is a standard Spark Shuffle implementation, and its internal implementation relies on the Netty framework. This article does not intend to elaborate on how Shuffle is implemented within Spark, but here I will introduce community improvements to Shuffle.
RDMA technology
Before proceeding to the following introduction, let's learn some basics.
Traditional TCP Socket data transfer needs to go through many steps: first, the data is copied from the source application to the Sockets cache of the current host, then to TransportProtocol Driver, and then to NIC Driver. Finally, NIC sends the data to the NIC of the target host through the network, and the target host transmits the data to the application through the above steps. The whole process is as follows:
As can be seen from the above picture, the transmission of network data spends a large part of its time on data copying. If there is a large amount of data to be transferred, the time spent in this phase is likely to account for a large part of the overall job run time! Is there a way to directly omit the data copies of different layers so that the target host can obtain data directly from the memory of the source host? Indeed, this is RDMA technology!
RDMA (Remote Direct Memory Access) technology, the full name of remote direct memory access, is a direct memory access technology, which transfers data directly from the memory of one computer to another without the intervention of both operating systems. This allows high-throughput, low-latency network communication, especially for use in massively parallel computer clusters (this paragraph is excerpted from Wikipedia-remote direct memory access). RDMA has the following characteristics:
Zero-copy
Direct hardware interface (Direct hardware interface), bypassing kernel and TCP / IP IO
Submicrosecond delay
Flow control and reliability is offloaded in hardware
So using RDMA technology for data transfer looks like the following:
As can be seen from the above, after using RDMA technology, although the source host and the target host are cross-network, the data interaction between them is directly obtained from each other's memory, which obviously speeds up the whole computing process.
SparkRDMA
Well, now that we have acquired the basic knowledge, we officially enter the topic of this article. SparkRDMA ShuffleManager (GitHub address: https://github.com/Mellanox/SparkRDMA)), which is developed and open source by Mellanox Technologies, uses RDMA technology to make Spark jobs use RDMA instead of standard TCP when Shuffle data. The following is described in the official Wiki of SparkRDMA:
SparkRDMA is a high-performance, scalable and efficient ShuffleManager plugin for Apache Spark. It utilizes RDMA (Remote Direct Memory Access) technology to reduce CPU cycles needed for Shuffle data transfers. It reduces memory usage by reusing memory for transfers instead of copying data multiple times down the traditional TCP-stack.
It can be seen that SparkRDMA extends the ShuffleManager interface of Spark and adopts RDMA technology. The test results show that using RDMA to carry out Shuffle data is 2.18 times faster than the standard way!
SparkRDMA developers have submitted an Issue: [spark-22229] SPIP: RDMA Accelerated Shuffle Engine to the Spark community, detailed design documentation: here. However, judging from the responses from the community, at least it will not be integrated into the Spark code for now.
Installation and use
If you want to use SparkRDMA, we need Apache Spark 2.0.0, Java 2.1.0, Java 2.2.0, and networks that support RDMA technology (such as RoCE and Infiniband).
SparkRDMA officially prepares the corresponding jar packages for different versions of Spark, which we can download here. After decompression, you will get the following four files:
Spark-rdma-1.0-for-spark-2.0.0-jar-with-dependencies.jar
Spark-rdma-1.0-for-spark-2.1.0-jar-with-dependencies.jar
Spark-rdma-1.0-for-spark-2.2.0-jar-with-dependencies.jar
Libdisni.so
Except that the libdisni.so file must be installed on all nodes in the Spark cluster, the other jar packages only need to be selected according to our Spark version. After the relevant files are deployed, we need to add the SparkRDMA module to the running environment of Spark, as follows:
Spark.driver.extraClassPath / path/to/SparkRDMA/spark-rdma-1.0-for-spark-2.0.0-jar-with-dependencies.jar
Spark.executor.extraClassPath / path/to/SparkRDMA/spark-rdma-1.0-for-spark-2.0.0-jar-with-dependencies.jar
To enable the SparkRDMA Shuffle Manager plug-in, we also need to modify the value of spark.shuffle.manager by adding the following matching to $SPARK_HOME/conf/spark-defaults.conf:
Spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
Others are the same as using Spark normally.
About configuring libdisni.so
We need to distribute the libdisni.so file to the same directory of all nodes in the cluster, and then configure the following environment:
Export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/home/iteblog/spark-2.1.0-bin/rdma/
Export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/iteblog/spark-2.1.0-bin/rdma/
Export SPARK_YARN_USER_ENV= "JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH,LD_LIBRARY_PATH=$LD_LIBRARY_PATH"
Where / home/iteblog/spark-2.1.0-bin/rdma/ stores the libdisni.so file. Libibverbs.so.1 and librdmacm.so.1 files may also be required during the run, which can be solved with the following command
Yum-y install libibverbs librdmacm
You can then start Spark with the following command:
Bin/spark-shell-- master yarn-client-- driver-memory 18g-- executor-memory 15g\
-queue iteblog-executor-cores 1-num-executors 8\
-conf "spark.yarn.dist.archives=/home/iteblog/spark-2.1.0-bin/rdma/rdma.tgz"\
-conf "spark.executor.extraLibraryPath=/home/iteblog/spark-2.1.0-bin/rdma/libdisni.so"\
-conf "spark.driver.extraLibraryPath=/home/iteblog/spark-2.1.0-bin/rdma/libdisni.so"\
-conf "spark.executor.extraClassPath=rdma.tgz/rdma/*"\
-conf "spark.driver.extraClassPath=/home/iteblog/spark-2.1.0-bin/rdma/*"\
-conf "spark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager"
However, if your network does not support RDMA technology, then, like me, you will encounter the following problems:
17-11-15 22:01:48 ERROR rdma.RdmaNode: Failed in RdmaNode constructor
17-11-15 22:01:48 ERROR spark.SparkContext: Error initializing SparkContext.
Java.lang.reflect.InvocationTargetException
At sun.reflect.NativeConstructorAccessorImpl.newInstance0 (Native Method)
At sun.reflect.NativeConstructorAccessorImpl.newInstance (NativeConstructorAccessorImpl.java:62)
At sun.reflect.DelegatingConstructorAccessorImpl.newInstance (DelegatingConstructorAccessorImpl.java:45)
At java.lang.reflect.Constructor.newInstance (Constructor.java:423)
At org.apache.spark.SparkEnv$.instantiateClass$1 (SparkEnv.scala:265)
At org.apache.spark.SparkEnv$.create (SparkEnv.scala:323)
At org.apache.spark.SparkEnv$.createDriverEnv (SparkEnv.scala:174)
At org.apache.spark.SparkContext.createSparkEnv (SparkContext.scala:257)
At org.apache.spark.SparkContext. (SparkContext.scala:432)
At org.apache.spark.SparkContext$.getOrCreate (SparkContext.scala:2313)
At org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply (SparkSession.scala:868)
At org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply (SparkSession.scala:860)
At scala.Option.getOrElse (Option.scala:121)
At org.apache.spark.sql.SparkSession$Builder.getOrCreate (SparkSession.scala:860)
At org.apache.spark.repl.Main$.createSparkSession (Main.scala:95)
At $line3.$read$$iw$$iw. (: 15)
At $line3.$read$$iw. (: 42)
At $line3.$read. (: 44)
At $line3.$read$. (: 48)
At $line3.$read$. ()
At $line3.$eval$.$print$lzycompute (: 7)
At $line3.$eval$.$print (: 6)
At $line3.$eval.$print ()
At sun.reflect.NativeMethodAccessorImpl.invoke0 (NativeMethod)
At sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
At sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
At java.lang.reflect.Method.invoke (Method.java:498)
At scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call (IMain.scala:786)
At scala.tools.nsc.interpreter.IMain$Request.loadAndRun (IMain.scala:1047)
At scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply (IMain.scala:638)
At scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply (IMain.scala:637)
At scala.reflect.internal.util.ScalaClassLoader$class.asContext (ScalaClassLoader.scala:31)
At scala.reflect.internal.util.AbstractFileClassLoader.asContext (AbstractFileClassLoader.scala:19)
At scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq (IMain.scala:637)
At scala.tools.nsc.interpreter.IMain.interpret (IMain.scala:569)
At scala.tools.nsc.interpreter.IMain.interpret (IMain.scala:565)
At scala.tools.nsc.interpreter.ILoop.interpretStartingWith (ILoop.scala:807)
At scala.tools.nsc.interpreter.ILoop.command (ILoop.scala:681)
At scala.tools.nsc.interpreter.ILoop.processLine (ILoop.scala:395)
At org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp (SparkILoop.scala:38)
At org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply (SparkILoop.scala:37)
At org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply (SparkILoop.scala:37)
At scala.tools.nsc.interpreter.IMain.beQuietDuring (IMain.scala:214)
At org.apache.spark.repl.SparkILoop.initializeSpark (SparkILoop.scala:37)
At org.apache.spark.repl.SparkILoop.loadFiles (SparkILoop.scala:105)
At scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp (ILoop.scala:920)
At scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply (ILoop.scala:909)
At scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply (ILoop.scala:909)
At scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader (ScalaClassLoader.scala:97)
At scala.tools.nsc.interpreter.ILoop.process (ILoop.scala:909)
At org.apache.spark.repl.Main$.doMain (Main.scala:68)
At org.apache.spark.repl.Main$.main (Main.scala:51)
At org.apache.spark.repl.Main.main (Main.scala)
At sun.reflect.NativeMethodAccessorImpl.invoke0 (NativeMethod)
At sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
At sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
At java.lang.reflect.Method.invoke (Method.java:498)
At org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain (SparkSubmit.scala:738)
At org.apache.spark.deploy.SparkSubmit$.doRunMain$1 (SparkSubmit.scala:187)
At org.apache.spark.deploy.SparkSubmit$.submit (SparkSubmit.scala:212)
At org.apache.spark.deploy.SparkSubmit$.main (SparkSubmit.scala:126)
At org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala)
Caused by: java.io.IOException: Unable to allocate RDMA Event Channel
At org.apache.spark.shuffle.rdma.RdmaNode. (RdmaNode.java:67)
At org.apache.spark.shuffle.rdma.RdmaShuffleManager. (RdmaShuffleManager.scala:181)
... 62 more
Java.io.IOException: Unable to allocate RDMA Event Channel
At org.apache.spark.shuffle.rdma.RdmaNode. (RdmaNode.java:67)
At org.apache.spark.shuffle.rdma.RdmaShuffleManager. (RdmaShuffleManager.scala:181)
At sun.reflect.NativeConstructorAccessorImpl.newInstance0 (Native Method)
At sun.reflect.NativeConstructorAccessorImpl.newInstance (NativeConstructorAccessorImpl.java:62)
At sun.reflect.DelegatingConstructorAccessorImpl.newInstance (DelegatingConstructorAccessorImpl.java:45)
At java.lang.reflect.Constructor.newInstance (Constructor.java:423)
At org.apache.spark.SparkEnv$.instantiateClass$1 (SparkEnv.scala:265)
At org.apache.spark.SparkEnv$.create (SparkEnv.scala:323)
At org.apache.spark.SparkEnv$.createDriverEnv (SparkEnv.scala:174)
At org.apache.spark.SparkContext.createSparkEnv (SparkContext.scala:257)
At org.apache.spark.SparkContext. (SparkContext.scala:432)
At org.apache.spark.SparkContext$.getOrCreate (SparkContext.scala:2313)
At org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply (SparkSession.scala:868)
At org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply (SparkSession.scala:860)
At scala.Option.getOrElse (Option.scala:121)
At org.apache.spark.sql.SparkSession$Builder.getOrCreate (SparkSession.scala:860)
At org.apache.spark.repl.Main$.createSparkSession (Main.scala:95)
... 47 elided
In that case, there is no way to test, . If you really want to use RDMA, ask your company's OPS how to configure this.
Conclusion
Thank you for watching. If there are any deficiencies, you are welcome to criticize and correct them.
If you have a partner who is interested in big data or a veteran driver who works in big data, you can join the group:
658558542
Welcome everyone to exchange and share, study and exchange, and make common progress. There are also a lot of free materials to help you overcome difficulties on your way to becoming big data engineers and even architects! )
Finally, I wish all the big data programmers who encounter bottlenecks to break through themselves and wish you all the best in the future work and interview.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
Configure DHCP server on Cisco Router:--ip dhcp excluded-addres
© 2024 shulou.com SLNews company. All rights reserved.