2025-04-11 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how to enable DRA (DynamicResourceAllocation) in Spark, first on YARN and then on Kubernetes, by following the relevant code paths.
DynamicResourceAllocation in Spark on YARN
Spark on YARN has supported DynamicResourceAllocation since Spark 1.2.
Anyone familiar with Spark knows that turning on DynamicResourceAllocation requires an ExternalShuffleService.
On YARN, the ExternalShuffleService runs as a NodeManager auxiliary service. The configuration is as follows:
    yarn.nodemanager.aux-services                      spark_shuffle
    yarn.nodemanager.aux-services.spark_shuffle.class  org.apache.spark.network.yarn.YarnShuffleService
    spark.shuffle.service.port                         7337
Restart the NodeManagers so that a YarnShuffleService starts on every NodeManager node, then set spark.dynamicAllocation.enabled to true (together with spark.shuffle.service.enabled) in the Spark application. Resources are then allocated dynamically at run time.
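As a rough illustration of the application-side settings, the sketch below collects them in plain Scala and renders them as --conf flags for spark-submit. The object DraYarnConf and the helper toSubmitArgs are hypothetical names made up for this example. The keys themselves are Spark configuration keys: spark.shuffle.service.enabled is the switch that makes executors use the external shuffle service, and the port is assumed to be the 7337 configured on the NodeManager side.

```scala
object DraYarnConf {
  // Application-side settings for dynamic allocation backed by the external shuffle service.
  val settings: Seq[(String, String)] = Seq(
    "spark.dynamicAllocation.enabled" -> "true",
    "spark.shuffle.service.enabled"   -> "true",
    "spark.shuffle.service.port"      -> "7337" // assumed to match the NodeManager-side port
  )

  // Hypothetical helper: turn key/value pairs into spark-submit arguments.
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }

  def main(args: Array[String]): Unit =
    println(toSubmitArgs(settings).mkString(" "))
}
```

The same keys can equally be placed in spark-defaults.conf; the flag form is used here only because it keeps the example self-contained.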
Let's start with the creation of SparkEnv in CoarseGrainedExecutorBackend. Every executor is launched through the main method of CoarseGrainedExecutorBackend, and main creates the SparkEnv:
    val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
      arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
Creating the SparkEnv in turn creates the BlockManager. Following the code down, we eventually reach:
    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, externalShuffleClient)
The initialize method of BlockManager calls registerWithExternalShuffleServer:
    // Register Executors' configuration with the local shuffle service, if one should exist.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }
If ExternalShuffleService is enabled (on YARN this is the YarnShuffleService), the executor registers its ExecutorShuffleInfo with the shuffle service, using shuffleServerId.host as the host and shuffleServerId.port as the port. The ExecutorShuffleInfo is built as follows:
    val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirsString,
      diskBlockManager.subDirsPerLocalDir,
      shuffleManager.getClass.getName)
Here I will focus on the following snippet from the registerWithExternalShuffleServer method:
    // Synchronous and will throw an exception if we cannot connect.
    blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
      shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
The shuffleServerId in this code comes from:
    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }
blockTransferService.hostName, in turn, is the advertiseAddress that was passed in when the service was created in SparkEnv.
That address ultimately comes from the hostname argument of the CoarseGrainedExecutorBackend main class. So where is that argument set? See the prepareCommand method of ExecutorRunnable:
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId,
        "--resourceProfileId", resourceProfileId.toString) ++
The value of this hostname is ultimately set in the runAllocatedContainers method of YarnAllocator:
    val executorHostname = container.getNodeId.getHost
In other words, the hostname is the host of the YARN node, that is, of the NodeManager. Every executor that starts registers its ExecutorShuffleInfo with the YarnShuffleService of the NodeManager on which it runs. With dynamic resource allocation enabled, the fetchBlocks process of ExternalBlockStoreClient is therefore more or less the same as that of NettyBlockTransferService without dynamic resource allocation.
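The address selection traced above can be condensed into a small model. This is only a sketch: the BlockManagerId case class here is a simplified stand-in for Spark's class of the same name, the object name ShuffleServerIdSketch is made up, and the host and port values are hypothetical.

```scala
object ShuffleServerIdSketch {
  // Simplified stand-in for Spark's BlockManagerId (the real class carries more state).
  final case class BlockManagerId(executorId: String, host: String, port: Int)

  // With the external shuffle service enabled, keep the executor's host but point
  // at the shuffle service port instead of the executor's own block manager port.
  def shuffleServerId(
      own: BlockManagerId,
      externalShuffleServiceEnabled: Boolean,
      externalShuffleServicePort: Int): BlockManagerId =
    if (externalShuffleServiceEnabled) own.copy(port = externalShuffleServicePort)
    else own

  def main(args: Array[String]): Unit = {
    // On YARN the host is the NodeManager host taken from container.getNodeId.getHost.
    val own = BlockManagerId("1", "nm-host-01", 40123) // hypothetical values
    println(shuffleServerId(own, externalShuffleServiceEnabled = true, 7337))
    println(shuffleServerId(own, externalShuffleServiceEnabled = false, 7337))
  }
}
```

Because the host is the NodeManager host, the resulting address is exactly where the YarnShuffleService of that node listens.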
DynamicResourceAllocation in Spark on K8s (Kubernetes)
As the previous article showed, the entrypoint script passes the hostname parameter when it starts the executor:
    executor)
      shift 1
      CMD=(
        ${JAVA_HOME}/bin/java
        "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
        -Xms$SPARK_EXECUTOR_MEMORY
        -Xmx$SPARK_EXECUTOR_MEMORY
        -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
        org.apache.spark.executor.CoarseGrainedExecutorBackend
        --driver-url $SPARK_DRIVER_URL
        --executor-id $SPARK_EXECUTOR_ID
        --cores $SPARK_EXECUTOR_CORES
        --app-id $SPARK_APPLICATION_ID
        --hostname $SPARK_EXECUTOR_POD_IP
      )
SPARK_EXECUTOR_POD_IP is the IP of the running pod, as this snippet from the BasicExecutorFeatureStep class shows:
    Seq(new EnvVarBuilder()
      .withName(ENV_EXECUTOR_POD_IP)
      .withValueFrom(new EnvVarSourceBuilder()
        .withNewFieldRef("v1", "status.podIP")
        .build())
      .build())
Following the analysis above, an executor on Kubernetes cannot register with an ExternalShuffleService running on the k8s node, because the host it registers with is the pod IP, not the node IP.
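The mismatch can be made concrete with a toy comparison of the two addresses. This sketch assumes a shuffle service listening on the node's own address; the names RegistrationTargetSketch, Address, shuffleServiceAddress and registrationTarget are hypothetical, and so are the IP values.

```scala
object RegistrationTargetSketch {
  final case class Address(host: String, port: Int)

  // Assumption: the node-level shuffle service listens on the node's own address.
  def shuffleServiceAddress(nodeHost: String, port: Int): Address =
    Address(nodeHost, port)

  // The executor registers with (its --hostname value, shuffle service port).
  def registrationTarget(executorHostname: String, port: Int): Address =
    Address(executorHostname, port)

  def main(args: Array[String]): Unit = {
    val port = 7337
    val nodeHost = "10.0.0.5" // hypothetical node IP
    val podIp = "172.17.0.9"  // hypothetical pod IP (status.podIP)
    val service = shuffleServiceAddress(nodeHost, port)

    // YARN: --hostname is the NodeManager host, so the target is the service itself.
    println(registrationTarget(nodeHost, port) == service)
    // Kubernetes: --hostname is the pod IP, so the target is not the node-level service.
    println(registrationTarget(podIp, port) == service)
  }
}
```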
The Spark community has, however, long since proposed dynamic resource allocation that works without the external shuffle service, and that work has been merged into the master branch. The configuration is as follows:
    spark.dynamicAllocation.enabled                  true
    spark.dynamicAllocation.shuffleTracking.enabled  true
    spark.dynamicAllocation.minExecutors             1
    spark.dynamicAllocation.maxExecutors             4
    spark.dynamicAllocation.executorIdleTimeout      60s
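The settings above swap the external shuffle service for shuffle tracking. The sketch below expresses that requirement as a simple check: dynamic allocation is treated as usable only when one of the two mechanisms is switched on. It is a simplified reading of the requirement, not Spark's actual validation code, and the names DraPrerequisiteSketch, flag and dynamicAllocationUsable are hypothetical.

```scala
object DraPrerequisiteSketch {
  // Read a boolean setting from a plain key/value map; missing keys count as false.
  private def flag(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  // Simplified rule: dynamic allocation needs a way to keep shuffle data reachable,
  // either the external shuffle service or shuffle tracking.
  def dynamicAllocationUsable(conf: Map[String, String]): Boolean =
    flag(conf, "spark.dynamicAllocation.enabled") &&
      (flag(conf, "spark.shuffle.service.enabled") ||
        flag(conf, "spark.dynamicAllocation.shuffleTracking.enabled"))

  def main(args: Array[String]): Unit = {
    val k8sConf = Map(
      "spark.dynamicAllocation.enabled" -> "true",
      "spark.dynamicAllocation.shuffleTracking.enabled" -> "true",
      "spark.dynamicAllocation.minExecutors" -> "1",
      "spark.dynamicAllocation.maxExecutors" -> "4",
      "spark.dynamicAllocation.executorIdleTimeout" -> "60s"
    )
    println(dynamicAllocationUsable(k8sConf))
    println(dynamicAllocationUsable(Map("spark.dynamicAllocation.enabled" -> "true")))
  }
}
```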