
Lesson 43: Spark 1.6 RPC internals decrypted: operating mechanism, detailed source-code interpretation, Netty and Akka, etc.

2025-04-04 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/03 Report--

Spark is a distributed computing framework, so the machines in a cluster must communicate with one another. Earlier versions of Spark implemented this communication with Akka. Now an RpcEnv abstraction sits above the transport layer (Akka or Netty), and RpcEnv is responsible for managing communication between machines.

RpcEnv contains three core components:

RpcEndpoint: the message loop body, responsible for receiving and processing messages. Both Master and Worker in Spark are RpcEndpoints.

RpcEndpointRef: a reference to an RpcEndpoint. To communicate with an RpcEndpoint, you must obtain its RpcEndpointRef and send messages through it.

Dispatcher: the message dispatcher, responsible for routing RPC messages to the appropriate RpcEndpoint.

After the RpcEnv is created, RpcEndpoints can be registered with it; each registered RpcEndpoint gets a corresponding RpcEndpointRef that references it. To send a message to an RpcEndpoint, you first ask the RpcEnv for the corresponding RpcEndpointRef by the RpcEndpoint's name, and then send the message through that RpcEndpointRef.
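The interplay of the three components can be sketched with a small self-contained toy. The `Toy*` names below are illustrative assumptions, not Spark's actual (private) classes; the point is only the pattern: a ref never holds the endpoint directly, it routes every message by name through the env.

```scala
import scala.collection.mutable

// The message loop body: receives and processes messages
trait ToyEndpoint {
  def receive(msg: Any): Unit
}

class ToyRpcEnv {
  // name -> endpoint registry (the real Dispatcher's job)
  private val endpoints = mutable.Map.empty[String, ToyEndpoint]

  // Registering an endpoint yields the ref used to talk to it
  def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyEndpointRef = {
    endpoints(name) = endpoint
    new ToyEndpointRef(name, this)
  }

  // Route a message to the endpoint registered under `name`
  def route(name: String, msg: Any): Unit =
    endpoints.get(name).foreach(_.receive(msg))
}

// A reference to an endpoint: sending always goes through the env
class ToyEndpointRef(name: String, env: ToyRpcEnv) {
  def send(msg: Any): Unit = env.route(name, msg)
}

object ToyDemo {
  def run(): String = {
    val env = new ToyRpcEnv
    val log = mutable.ArrayBuffer.empty[String]
    val ref = env.setupEndpoint("echo", new ToyEndpoint {
      def receive(msg: Any): Unit = log += s"got $msg"
    })
    ref.send("hello")   // the sender only ever touches the ref
    log.mkString(",")
  }
}
```

Because the caller only holds a `ToyEndpointRef`, the same sending code works whether the endpoint lives in the same JVM or (in the real system) on a remote machine.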

RpcEnv is responsible for managing the entire lifecycle of RpcEndpoints:

Registering RpcEndpoints, by name or URI.

Routing messages sent to RpcEndpoints.

Stopping RpcEndpoints.

Note: an RpcEndpoint can be registered with only one RpcEnv.

RpcAddress: the logical address of an RpcEnv, represented by a hostname and port.

RpcEndpointAddress: the address of an RpcEndpoint registered on an RpcEnv, consisting of an RpcAddress and a name.
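A minimal re-implementation of this two-level addressing (the real classes are private to Spark; the `Toy*` names are stand-ins). In Spark 1.6 an endpoint address renders as a spark:// URI combining the name with the host and port:

```scala
// Logical address of an RpcEnv: hostname + port
case class ToyRpcAddress(host: String, port: Int) {
  def hostPort: String = s"$host:$port"
}

// Address of an endpoint registered on that env: RpcAddress + name
case class ToyRpcEndpointAddress(rpcAddress: ToyRpcAddress, name: String) {
  // Spark 1.6 formats endpoint addresses as spark://name@host:port
  override def toString: String =
    s"spark://$name@${rpcAddress.host}:${rpcAddress.port}"
}
```

So the Master of a standalone cluster on localhost:7077 would be addressed as spark://Master@localhost:7077.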

Note that an RpcEndpoint is registered with the RpcEnv on its own machine (in the same JVM). To send a message to a remote machine, you obtain an RpcEndpointRef that points at the remote RpcEndpoint; the remote RpcEndpoint itself is never registered with the local RpcEnv.

In Spark 1.6, Netty is used by default:

private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
  val rpcEnvNames = Map(
    "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
    "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
  val rpcEnvName = conf.get("spark.rpc", "netty")
  val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
  Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}
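The getOrElse in this snippet has a subtle effect worth isolating: if spark.rpc is neither "akka" nor "netty", the configured value itself is treated as a fully qualified factory class name. A standalone sketch of just that resolution logic (the `RpcFactoryResolver` object and the com.example class name are illustrative):

```scala
object RpcFactoryResolver {
  // Same mapping as Spark 1.6's getRpcEnvFactory
  private val rpcEnvNames = Map(
    "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
    "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")

  // Short names resolve to the built-in factories; anything else is assumed
  // to already be a fully qualified class name (Spark's getOrElse trick).
  def resolve(rpcEnvName: String): String =
    rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
}
```

This is why a user could plug in a custom RpcEnvFactory simply by setting spark.rpc to its class name, without Spark knowing about it in advance.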

RpcEndpoint is a message loop whose life cycle is as follows:

Construct (Constructor) -> start (onStart) -> message receive (receive & receiveAndReply) -> stop (onStop)

receive(): runs continuously, processing one-way messages sent by clients.

receiveAndReply(): processes a message and sends a reply back to the sender.
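The lifecycle and the two receive variants can be demonstrated with a toy endpoint that records each phase. The trait and driver below are illustrative assumptions modeled on the description above, not Spark's actual API:

```scala
import scala.collection.mutable

trait LifecycleEndpoint {
  def onStart(): Unit = {}
  def receive(msg: Any): Unit           // fire-and-forget messages
  def receiveAndReply(msg: Any): Any    // returns the reply to the sender
  def onStop(): Unit = {}
}

class RecordingEndpoint(log: mutable.Buffer[String]) extends LifecycleEndpoint {
  log += "constructed"                                // Constructor phase
  override def onStart(): Unit = log += "started"     // onStart phase
  def receive(msg: Any): Unit = log += s"recv:$msg"   // one-way message
  def receiveAndReply(msg: Any): Any = { log += s"ask:$msg"; s"reply-to-$msg" }
  override def onStop(): Unit = log += "stopped"      // onStop phase
}

object LifecycleDemo {
  def run(): (Seq[String], Any) = {
    val log = mutable.ArrayBuffer.empty[String]
    val ep = new RecordingEndpoint(log)      // construct
    ep.onStart()                             // start
    ep.receive("ping")                       // message with no reply expected
    val reply = ep.receiveAndReply("query")  // message that expects an answer
    ep.onStop()                              // stop
    (log.toSeq, reply)
  }
}
```

In real Spark the Dispatcher, not user code, drives these callbacks in exactly this order.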

Let's take a look at the Master code:

def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  // The hostname specified where the start-master.sh script runs must be the local machine name
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}

/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  // Create the Rpc environment; the hostname and port are the access address
  // of the Standalone cluster. SYSTEM_NAME = sparkMaster
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  // Register the Master instance with RpcEnv
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}

The main method creates the RpcEnv, instantiates the Master, and then registers it with the RpcEnv.

RpcEndpoint registration is actually delegated to the Dispatcher; the Netty implementation is as follows:

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}

Note: line 135 of NettyRpcEnv.scala

Dispatcher uses the following data structures to store RpcEndpoints and their RpcEndpointRefs:

private val endpoints = new ConcurrentHashMap[String, EndpointData]
private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

EndpointData is a simple wrapper class that pairs an endpoint with its ref and its inbox:

private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}
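A sketch of what registration plausibly does with these two maps, with Spark's Inbox and the Netty wiring simplified away. All class names here are simplified stand-ins for the real private types:

```scala
import java.util.concurrent.ConcurrentHashMap

// Minimal stand-ins for Spark's private types (illustrative only)
class Endpoint(val name: String)
class EndpointRef(val name: String)
class EndpointData(val name: String, val endpoint: Endpoint, val ref: EndpointRef)

class ToyDispatcher {
  // Mirrors the double bookkeeping above:
  //   name     -> EndpointData (in Spark this also carries the Inbox)
  //   endpoint -> its ref, for reverse lookup
  private val endpoints = new ConcurrentHashMap[String, EndpointData]
  private val endpointRefs = new ConcurrentHashMap[Endpoint, EndpointRef]

  def registerRpcEndpoint(name: String, endpoint: Endpoint): EndpointRef = {
    val ref = new EndpointRef(name)
    // putIfAbsent makes duplicate registration under the same name fail,
    // as in Spark's Dispatcher
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, ref)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    endpointRefs.put(endpoint, ref)
    ref
  }

  def getRpcEndpointRef(endpoint: Endpoint): EndpointRef = endpointRefs.get(endpoint)
}
```

The ConcurrentHashMaps matter because many Netty I/O threads may deliver messages and register endpoints concurrently.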

In Master, the WorkerInfo data structure holds the information of each Worker, including each Worker's RpcEndpointRef.

Note:

1. DT Big Data DreamWorks WeChat official account: DT_Spark

2. IMF big data hands-on YY live broadcast (8:00 p.m.), channel number: 68917580

3. Sina Weibo: http://www.weibo.com/ilovepains
