How to analyze the RPC source code of Spark underlying communication

Updated 2025-03-31. From: SLTechnology News&Howtos (shulou)

This article walks through the source code of the RPC layer that underlies Spark's communication: the abstractions it defines, how endpoints are registered, and how messages reach them.

RPC communication: whether in Hadoop 2.x or Spark 2.x, RPC is, put simply, remote communication between two processes. For example, suppose Java project A has a class A with a method washA, and project B has a class B with a method washB. If project B calls washA in project A through the proxy pattern and Java reflection, that can be understood as a simple form of RPC.
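The proxy-plus-reflection idea can be sketched as follows. This is a minimal, single-process illustration, not Spark or Hadoop code: WashService, LocalForwarder and the helper methods on B are hypothetical names, and the handler simply invokes the target reflectively where a real RPC layer would serialize the call and send it to another process.

```scala
import java.lang.reflect.{InvocationHandler, Method, Proxy}

// Hypothetical interface shared by both "projects".
trait WashService {
  def washA(item: String): String
}

// "Project A": the real implementation.
class A extends WashService {
  override def washA(item: String): String = s"washed $item"
}

// Stand-in for the transport. A real RPC layer would serialize the method name
// and arguments and ship them to another process; here the call is forwarded
// to the target object through reflection.
class LocalForwarder(target: AnyRef) extends InvocationHandler {
  override def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
    // The JDK passes null instead of an empty array for no-arg methods.
    val actualArgs = if (args == null) Array.empty[AnyRef] else args
    method.invoke(target, actualArgs: _*)
  }
}

// "Project B": it only sees the interface and a proxy for it.
object B {
  def proxyFor(target: WashService): WashService =
    Proxy.newProxyInstance(
      classOf[WashService].getClassLoader,
      Array[Class[_]](classOf[WashService]),
      new LocalForwarder(target)
    ).asInstanceOf[WashService]

  def washB(service: WashService, item: String): String = service.washA(item)
}
```

Calling B.washB(B.proxyFor(new A), "shirt") goes through the proxy and the handler before reaching A.washA, which is the same shape of indirection an RPC stub provides.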

Spark2.x

Spark 2.x communicates over RPC. It removes the Akka implementation used in 1.x and keeps only the Netty implementation (NettyRpcEnv, NettyRpcEndpointRef). Spark 2.x RPC is built on three high-level abstractions: RpcEndpoint, RpcEnv and RpcEndpointRef. The benefit of defining these abstractions is that any implementation of them can provide the communication function, so a different transport, including one you write yourself, can be plugged in without changing the code that uses them.

Spark 2.x RPC consists mainly of the following parts:

RpcEndpoint: the message communication entity, mainly used to receive and process messages (for example, Master and Worker). An RpcEndpoint must be registered with an RpcEnv.

RpcEnv: the context of RPC communication. Messages are sent through RpcEnv first and then routed to the corresponding RpcEndpoint.

RpcEndpointRef: a reference to an RpcEndpoint. To send a message to an RpcEndpoint, you must first obtain its reference through RpcEnv.
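How the three parts fit together can be shown with a toy, in-process model. Everything here (ToyEndpoint, ToyEndpointRef, ToyEnv, Recorder) is a hypothetical stand-in written for illustration; Spark's real classes add serialization, networking and threading on top of this shape.

```scala
import scala.collection.mutable

// Toy stand-in for RpcEndpoint: it only knows how to handle messages.
trait ToyEndpoint {
  def receive: PartialFunction[Any, Unit]
}

// Toy stand-in for RpcEndpointRef: it holds a name and the env that can route to it.
class ToyEndpointRef(val name: String, env: ToyEnv) {
  def send(message: Any): Unit = env.deliver(name, message)
}

// Toy stand-in for RpcEnv: it registers endpoints and routes messages by name.
class ToyEnv {
  private val endpoints = mutable.Map.empty[String, ToyEndpoint]

  // Registering an endpoint returns the reference used to reach it.
  def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyEndpointRef = {
    require(!endpoints.contains(name), s"There is already an endpoint called $name")
    endpoints(name) = endpoint
    new ToyEndpointRef(name, this)
  }

  // Route a message to the endpoint registered under `name`.
  // Messages the endpoint does not handle are silently dropped in this sketch.
  def deliver(name: String, message: Any): Unit = {
    val endpoint = endpoints.getOrElse(name, sys.error(s"No endpoint called $name"))
    if (endpoint.receive.isDefinedAt(message)) endpoint.receive(message)
  }
}

// Example endpoint that records the String messages it receives.
class Recorder extends ToyEndpoint {
  val seen = mutable.ArrayBuffer.empty[String]
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => seen += msg
  }
}
```

The caller never touches the Recorder directly: it registers the endpoint with the env, keeps the returned reference, and sends through that reference.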

The RpcEndpoint trait is defined as follows:

val rpcEnv: RpcEnv  // the RpcEnv this endpoint is registered with

final def self: RpcEndpointRef = {  // returns this endpoint's RpcEndpointRef, usually used to send messages to itself
  rpcEnv.endpointRef(this)
}

def receive: PartialFunction[Any, Unit]  // handles messages from RpcEndpointRef.send or RpcCallContext.reply; no response is required

def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]  // handles messages from RpcEndpointRef.ask; after processing, a response must be sent (reply) to the side that called ask

def onError(cause: Throwable): Unit  // called when processing a message fails

def onConnected(remoteAddress: RpcAddress): Unit  // triggered when a remote node connects to the current node

def onDisconnected(remoteAddress: RpcAddress): Unit  // triggered when a remote connection is closed

def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit  // triggered when a network error occurs on a remote connection

def onStop(): Unit  // called when the RpcEndpoint is stopped

def onStart(): Unit  // called before the RpcEndpoint starts handling any message

onStart is more than a signal that message processing is about to begin. An RpcEndpoint implementation can do its own setup there, such as opening a port or creating a directory. An RpcEndpoint can accept messages sent through an RpcEndpointRef only after onStart has run.

private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint  // receive may be invoked concurrently; extend ThreadSafeRpcEndpoint if you need thread safety

The lifecycle of an RpcEndpoint is: constructor --> onStart --> receive --> onStop. Note that onStart is executed after the endpoint has been registered through setupEndpoint. This holds for every RpcEndpoint: onStart always runs after registration.

The reason is visible in the Dispatcher and Inbox source code discussed later in this article.
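The lifecycle order can be made concrete with a small trace. This is a single-threaded sketch under assumed names (LifecycleEndpoint, LifecycleHost, Tracing are hypothetical); it only demonstrates the ordering, not how Spark schedules the calls.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified endpoint with the three lifecycle hooks.
trait LifecycleEndpoint {
  def onStart(): Unit = ()
  def receive: PartialFunction[Any, Unit]
  def onStop(): Unit = ()
}

// Hypothetical host that drives one endpoint through its lifecycle.
class LifecycleHost(endpoint: LifecycleEndpoint) {
  private var started = false
  private var stopped = false

  // Registration is what triggers onStart.
  def register(): Unit = {
    endpoint.onStart()
    started = true
  }

  // Messages are only accepted between onStart and onStop.
  def deliver(message: Any): Unit = {
    require(started && !stopped, "endpoint is not running")
    if (endpoint.receive.isDefinedAt(message)) endpoint.receive(message)
  }

  def stop(): Unit = if (started && !stopped) {
    stopped = true
    endpoint.onStop()
  }
}

// Endpoint that records each lifecycle step in order.
class Tracing extends LifecycleEndpoint {
  val events = ArrayBuffer("constructor")
  override def onStart(): Unit = events += "onStart"
  override def receive: PartialFunction[Any, Unit] = {
    case m => events += s"receive($m)"
  }
  override def onStop(): Unit = events += "onStop"
}
```

Delivering a message before register() fails the require check, which mirrors the rule that an endpoint accepts messages only after onStart.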

RpcEndpointRef: abstract class

def address: RpcAddress  // returns the RpcAddress (host name and port) of the endpoint

def name: String  // the name of the endpoint; the author is not yet sure what it is used for

def send(message: Any): Unit  // sends a message to the RpcEndpoint without waiting for a result

def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)  // sends a message to the RpcEndpoint and returns a Future of the result

def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)  // sends a message to the RpcEndpoint, waits for the result within a timeout, and retries a limited number of times on failure
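The difference between send and ask can be sketched with a Promise that the endpoint completes when it replies. The names below (ToyCallContext, AskableEndpoint, ToyRef, Echo, askSync) are hypothetical and the call is handled in-process; askSync only shows the blocking style of askWithRetry and does not retry.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.reflect.ClassTag

// Hypothetical reply handle, playing the role of RpcCallContext.
class ToyCallContext(promise: Promise[Any]) {
  def reply(response: Any): Unit = { promise.trySuccess(response); () }
  def sendFailure(e: Throwable): Unit = { promise.tryFailure(e); () }
}

trait AskableEndpoint {
  def receive: PartialFunction[Any, Unit]
  def receiveAndReply(context: ToyCallContext): PartialFunction[Any, Unit]
}

class ToyRef(endpoint: AskableEndpoint) {
  // Fire-and-forget: handled by `receive`, nothing comes back.
  def send(message: Any): Unit =
    if (endpoint.receive.isDefinedAt(message)) endpoint.receive(message)

  // Request-response: handled by `receiveAndReply`, the reply completes the Future.
  def ask[T: ClassTag](message: Any): Future[T] = {
    val promise = Promise[Any]()
    val handler = endpoint.receiveAndReply(new ToyCallContext(promise))
    if (handler.isDefinedAt(message)) handler(message)
    else { promise.tryFailure(new IllegalArgumentException(s"Unsupported message: $message")); () }
    promise.future.mapTo[T]
  }

  // Blocking variant (no retry in this sketch).
  def askSync[T: ClassTag](message: Any, timeout: FiniteDuration): T =
    Await.result(ask[T](message), timeout)
}

// Example endpoint: remembers one-way messages, upper-cases asked Strings.
class Echo extends AskableEndpoint {
  var lastSent: Option[Any] = None
  override def receive: PartialFunction[Any, Unit] = {
    case m => lastSent = Some(m)
  }
  override def receiveAndReply(context: ToyCallContext): PartialFunction[Any, Unit] = {
    case s: String => context.reply(s.toUpperCase)
  }
}
```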

RpcEnv

private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef  // takes an RpcEndpoint and returns its RpcEndpointRef

def address: RpcAddress  // returns the RpcAddress (host name and port) of this RpcEnv

def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef  // registers an RpcEndpoint and returns the corresponding RpcEndpointRef

def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]  // asynchronously obtains an RpcEndpointRef by URI

def stop(endpoint: RpcEndpointRef): Unit  // stops the RpcEndpoint identified by the given RpcEndpointRef

def shutdown(): Unit  // shuts down the RpcEnv

def awaitTermination(): Unit  // waits for the RpcEnv to exit

object RpcEnv

def create(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    clientMode: Boolean = false): RpcEnv = {
  val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
  new NettyRpcEnvFactory().create(config)
}

// creates an RpcEnv through RpcEnvFactory.create

RpcEnvConfig

private[spark] case class RpcEnvConfig(
    conf: SparkConf,
    name: String,
    host: String,
    port: Int,
    securityManager: SecurityManager,
    clientMode: Boolean)

The case class bundles SparkConf, name, host, port, securityManager and clientMode.

NettyRpcEnv

NettyRpcEnv is created through the create method of NettyRpcEnvFactory:

val nettyEnv =
  new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)  // create nettyEnv

Inside NettyRpcEnv:

private val dispatcher: Dispatcher = new Dispatcher(this)

Dispatcher is responsible for routing RPC messages. It routes each message to the corresponding RpcEndpoint for processing and stores the mapping between RpcEndpoint and RpcEndpointRef.

NettyStreamManager is responsible for serving files (files, JAR files, directories).

TransportContext manages the network transport context: it creates MessageEncoder, MessageDecoder, TransportClientFactory and TransportServer.

NettyRpcHandler handles network IO events, receives RPC call requests, and dispatches messages through Dispatcher.

The Dispatcher class deserves a closer look, since it is the one mainly responsible for RPC message routing. It has an inner class, EndpointData, which holds a thread-safe Inbox. The Inbox stores the messages received for an endpoint, so it is important to analyze.

private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}

private val endpoints = new ConcurrentHashMap[String, EndpointData]  // maps name -> EndpointData

private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]  // maps RpcEndpoint -> RpcEndpointRef

private val receivers = new LinkedBlockingQueue[EndpointData]  // a queue; threads continuously take entries from it for processing

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)  // for the OnStart message
  }
  endpointRef
}

// Registering an RpcEndpoint happens here, and in the same step its EndpointData is put into receivers.
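The registration pattern above (reject duplicates with putIfAbsent, then queue the new entry for a worker thread) can be tried out in isolation. This is a reduced sketch with hypothetical names (ToyEndpointData, ToyRegistry, pending); it leaves out the endpoint reference and the Inbox that Spark's EndpointData carries.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

// Hypothetical simplified record; the real EndpointData also holds a ref and an Inbox.
final case class ToyEndpointData(name: String, endpoint: AnyRef)

class ToyRegistry {
  private val endpoints = new ConcurrentHashMap[String, ToyEndpointData]()
  private val receivers = new LinkedBlockingQueue[ToyEndpointData]()
  private var stopped = false

  def register(name: String, endpoint: AnyRef): ToyEndpointData = {
    val data = ToyEndpointData(name, endpoint)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("registry has been stopped")
      }
      // putIfAbsent returns the previous value, so non-null means the name is taken.
      if (endpoints.putIfAbsent(name, data) != null) {
        throw new IllegalArgumentException(s"There is already an endpoint called $name")
      }
      // Queue the entry so a worker thread can pick it up and deliver OnStart.
      receivers.offer(data)
    }
    data
  }

  def stop(): Unit = synchronized { stopped = true }

  // Number of entries waiting for a worker thread.
  def pending: Int = receivers.size
}
```

A failed registration leaves the queue untouched, because the exception is thrown before offer is reached.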

There is a thread pool in Dispatcher:

private val threadpool: ThreadPoolExecutor = {
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, Runtime.getRuntime.availableProcessors()))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i ...

(The rest of this listing is missing from the source text. The fragment that follows comes from the Inbox class.)

// Give this an alias so we can use it more clearly in closures.

@GuardedBy("this")
protected val messages = new java.util.LinkedList[InboxMessage]()

inbox.synchronized {
  messages.add(OnStart)
}

When an Inbox is created, it holds a LinkedList of messages. Right after the list is created, the OnStart message is added to it, and the add is done in a thread-safe way.

The process method then repeatedly takes messages from this list and performs the corresponding operation.

case OnStart =>
  endpoint.onStart()
  if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
    inbox.synchronized {
      if (!stopped) {
        enableConcurrent = true
      }
    }
  }

At this point the onStart method has been called, and the RpcEndpoint can accept and process messages.
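The reason onStart always runs first can be reproduced with a small inbox that enqueues an OnStart message in its constructor. This is a sketch with hypothetical names (ToyInbox, ToyOnStart, ToyOneWay, LoggingEndpoint), drained by a single caller, and without the concurrency switch that the real Inbox has.

```scala
import java.util.LinkedList
import scala.collection.mutable.ArrayBuffer

// Hypothetical message types for the sketch.
sealed trait ToyInboxMessage
case object ToyOnStart extends ToyInboxMessage
final case class ToyOneWay(content: Any) extends ToyInboxMessage

trait StartableEndpoint {
  def onStart(): Unit
  def receive: PartialFunction[Any, Unit]
}

class ToyInbox(endpoint: StartableEndpoint) {
  private val messages = new LinkedList[ToyInboxMessage]()

  // OnStart is enqueued at construction, so it is always the first message processed.
  synchronized { messages.add(ToyOnStart); () }

  def post(message: ToyInboxMessage): Unit = synchronized { messages.add(message); () }

  // Drain the queue: take each message under the lock, handle it outside the lock.
  def process(): Unit = {
    var next = synchronized { messages.poll() }
    while (next != null) {
      next match {
        case ToyOnStart => endpoint.onStart()
        case ToyOneWay(content) =>
          if (endpoint.receive.isDefinedAt(content)) endpoint.receive(content)
      }
      next = synchronized { messages.poll() }
    }
  }
}

// Endpoint that logs the order in which it is called.
class LoggingEndpoint extends StartableEndpoint {
  val log = ArrayBuffer.empty[String]
  override def onStart(): Unit = log += "onStart"
  override def receive: PartialFunction[Any, Unit] = {
    case m => log += s"got $m"
  }
}
```

Even when a message is posted before process is ever called, the endpoint sees onStart first, because OnStart is already at the head of the list.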

Spark RPC messages are either local or remote. A local message is equivalent to a method call and is stored directly in the Inbox. A remote message has to go through NettyRpcHandler.
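The local-versus-remote decision comes down to comparing the target address with the env's own address. The sketch below uses hypothetical types (ToyAddress, Route, ToyRouter) and only returns a description of where the message would go; it does not model the network path.

```scala
// Hypothetical address type for the sketch.
final case class ToyAddress(host: String, port: Int)

// Where a message ends up: the local inbox, or the network.
sealed trait Route
final case class ToLocalInbox(endpointName: String) extends Route
final case class ToRemote(address: ToyAddress, endpointName: String) extends Route

class ToyRouter(self: ToyAddress) {
  // A target at our own address is handled like a method call into the inbox;
  // anything else has to be sent over the network.
  def route(target: ToyAddress, endpointName: String): Route =
    if (target == self) ToLocalInbox(endpointName)
    else ToRemote(target, endpointName)
}
```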

That concludes this walkthrough of the RPC source code underlying Spark's communication. Several of the points covered here are likely to come up in day-to-day work with Spark.
