This article walks through how the Hadoop RPC server is initialized and how a call travels through it, using concrete code from the NameNode as a running example. These are situations many people run into when tracing real cases, so read carefully, and hopefully you will come away with something useful!
Initialization of the RPC server
As mentioned above, we borrow the NameNode's remote service as our example here. Let's look at the relevant code:
public class NameNode implements NameNodeStatusMXBean {
  public static void main(String argv[]) throws Exception {
    NameNode namenode = createNameNode(argv, null);
  }

  protected NameNode(Configuration conf, NamenodeRole role) throws IOException {
    initialize(conf);
  }

  protected void initialize(Configuration conf) throws IOException {
    rpcServer = createRpcServer(conf);
    startCommonServices(conf);   // quite important
  }

  protected NameNodeRpcServer createRpcServer(Configuration conf) throws IOException {
    return new NameNodeRpcServer(conf, this);
  }
}
When we run the Hadoop startup command from a Linux terminal, the script ultimately invokes NameNode's main method, so that is where our code trace begins. The method itself is simple: it calls the NameNode constructor, which in turn runs the initialization method initialize. That method is our focus, because all initialization work, including the RPC service, happens inside it. For RPC specifically, two methods matter: createRpcServer and startCommonServices. The first does exactly what its name says; the second starts the NameNode's RPC service, and its code appears later. As the code above shows, the rpcServer functionality all lives in the class NameNodeRpcServer. Now let's take a look at the related code in this class:
class NameNodeRpcServer implements NamenodeProtocols {
  public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException {
    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
    ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator =
        new ClientNamenodeProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = ClientNamenodeProtocol
        .newReflectiveBlockingService(clientProtocolServerTranslator);

    InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);  // fs.defaultFS
    String bindHost = nn.getRpcServerBindHost(conf);
    if (bindHost == null) {
      bindHost = rpcAddr.getHostName();
    }
    LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());

    this.clientRpcServer = new RPC.Builder(conf)
        .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(rpcAddr.getPort())
        .setNumHandlers(handlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();

    // Add all the RPC protocols that the namenode implements
    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, clientRpcServer);
  }
}
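For orientation, here is a minimal, self-contained sketch of the same RPC.Builder pattern outside the NameNode. Everything here is a made-up illustration: the MyProtocol interface, its versionID field, the port, and the reliance on Hadoop's older default Writable engine (the NameNode itself uses ProtobufRpcEngine with a generated BlockingService, as shown above). It is a sketch, not tested against any particular release:

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

// Hypothetical protocol; Hadoop's RPC machinery looks up the versionID field by reflection.
interface MyProtocol {
    long versionID = 1L;
    String echo(String msg) throws IOException;
}

public class MiniRpcServer implements MyProtocol {
    public String echo(String msg) { return "echo: " + msg; }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same builder NameNodeRpcServer uses, with made-up bind address and port.
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(MyProtocol.class)
            .setInstance(new MiniRpcServer())
            .setBindAddress("0.0.0.0")
            .setPort(8888)
            .setNumHandlers(2)
            .build();
        server.start();   // starts the listener, handler, and responder threads

        // Client side: obtain a dynamic proxy and invoke the method over RPC.
        MyProtocol proxy = RPC.getProxy(MyProtocol.class, MyProtocol.versionID,
            new InetSocketAddress("localhost", 8888), conf);
        System.out.println(proxy.echo("hello"));
        RPC.stopProxy(proxy);
        server.stop();
    }
}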
The most important step in the NameNodeRpcServer constructor is instantiating clientRpcServer. The point worth stressing is that NameNode implements three protocols: ClientProtocol, DatanodeProtocol, and NamenodeProtocol. On the server side, the implementations rely on translator classes such as ClientNamenodeProtocolServerSideTranslatorPB. Notice in particular that when ClientNamenodeProtocolServerSideTranslatorPB is instantiated, the argument passed in is the NameNodeRpcServer instance itself. See the code:
public ClientNamenodeProtocolServerSideTranslatorPB(ClientProtocol server)
    throws IOException {
  this.server = server;
}

@Override
public GetBlockLocationsResponseProto getBlockLocations(
    RpcController controller, GetBlockLocationsRequestProto req) throws ServiceException {
  try {
    LocatedBlocks b = server.getBlockLocations(
        req.getSrc(), req.getOffset(), req.getLength());
    Builder builder = GetBlockLocationsResponseProto.newBuilder();
    if (b != null) {
      builder.setLocations(PBHelper.convert(b)).build();
    }
    return builder.build();
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}
The getBlockLocations method above illustrates the point: the translator unpacks the protobuf request, delegates to the ClientProtocol implementation it was handed (the NameNodeRpcServer), and packs the result back into a protobuf response.
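On the client side there is a mirror-image class, ClientNamenodeProtocolTranslatorPB, which does the inverse translation: it implements ClientProtocol for the caller, packs Java arguments into a protobuf request, and unpacks the protobuf response. The following is an abridged sketch of that idea, not verbatim HDFS source (constructor and other methods omitted):

public class ClientNamenodeProtocolTranslatorPB implements ClientProtocol {
    private final ClientNamenodeProtocolPB rpcProxy;  // protobuf stub over the wire

    public LocatedBlocks getBlockLocations(String src, long offset, long length)
            throws IOException {
        // Java arguments -> protobuf request
        GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto.newBuilder()
            .setSrc(src).setOffset(offset).setLength(length).build();
        try {
            // protobuf response -> Java return value
            GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null, req);
            return resp.hasLocations() ? PBHelper.convert(resp.getLocations()) : null;
        } catch (ServiceException e) {
            throw ProtobufHelper.getRemoteException(e);
        }
    }
}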
Now let's go back to the startCommonServices method executed from initialize in NameNode. It starts the threads owned by clientRpcServer: the listener, the handlers, and the responder, as shown in the code:
public class NameNode implements NameNodeStatusMXBean {
  private void startCommonServices(Configuration conf) throws IOException {
    rpcServer.start();
  }
}

class NameNodeRpcServer implements NamenodeProtocols {
  void start() {
    clientRpcServer.start();
    if (serviceRpcServer != null) {
      serviceRpcServer.start();
    }
  }
}

public abstract class Server {
  public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }
}
At this point we have covered all of the RPC-related code involved in startup.
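Once these threads are running, a client can reach the server. A hypothetical client-side snippet, to show where the traffic that drives the rest of this article comes from (the file path is made up, and the use of NameNodeProxies here is an illustrative assumption based on the HDFS 2.x client):

Configuration conf = new HdfsConfiguration();
// Build a ClientProtocol proxy against fs.defaultFS; every call on it goes through
// the listener/handler/responder pipeline started above.
ClientProtocol namenode = NameNodeProxies.createProxy(
        conf, FileSystem.getDefaultUri(conf), ClientProtocol.class).getProxy();
LocatedBlocks blocks = namenode.getBlockLocations("/user/demo/file.txt", 0, 1024);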
The call process of the RPC server
Now let's look at how an RPC call is processed, starting with the key structure of Server:
public abstract class Server {
  private Listener listener = null;
  private Responder responder = null;
  private Handler[] handlers = null;

  private class Responder extends Thread {}
  private class Listener extends Thread {}
  private class Handler extends Thread {}
}
At initialization time, the listener, responder, and handler threads are all started. The listener thread runs a socket service dedicated to accepting requests from clients, the handler threads process the actual requests, and the responder writes the results back. The details are in the following code:
public abstract class Server {
  private Listener listener = null;
  private Responder responder = null;
  private Handler[] handlers = null;

  private class Listener extends Thread {
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);
      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort();  // Could be an ephemeral port
      // create a selector
      selector = Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }
      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

    public void run() {
      while (running) {
        doAccept(key);
      }
    }

    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
      Reader reader = getReader();
      Connection c = connectionManager.register(channel);
      key.attach(c);  // so closeCurrentConnection can get the object
      reader.addConnection(c);
    }

    private class Reader extends Thread {
      public void run() {
        doRunLoop();
      }

      private synchronized void doRunLoop() {
        while (running) {
          Connection conn = pendingConnections.take();
          conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
          readSelector.select();
          doRead(key);
        }
      }

      void doRead(SelectionKey key) throws InterruptedException {
        Connection c = (Connection) key.attachment();
        count = c.readAndProcess();
      }
    }
  }

  public class Connection {
    public int readAndProcess() {
      processOneRpc(data.array());
    }

    private void processOneRpc(byte[] buf) {
      processRpcRequest(header, dis);
    }

    private void processRpcRequest(RpcRequestHeaderProto header, DataInputStream dis)
        throws WrappedRpcServerException, InterruptedException {
      Call call = new Call(header.getCallId(), header.getRetryCount(), rpcRequest, this,
          ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray());
      callQueue.put(call);
    }
  }

  private class Handler extends Thread {
    public void run() {
      final Call call = callQueue.take();
      value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp);
      setupResponse(buf, call, returnStatus, detailedErr, value, errorClass, error);
      responder.doRespond(call);
    }
  }

  private class Responder extends Thread {
    void doRespond(Call call) throws IOException {
      processResponse(call.connection.responseQueue, true);
    }

    private boolean processResponse(LinkedList responseQueue, boolean inHandler)
        throws IOException {
      int numBytes = channelWrite(channel, call.rpcResponse);
      done = true;
    }
  }
}
The code above is a fairly complete view of an RPC call on the Server side. Starting from the Listener constructor, several Reader threads are started. When the Listener accepts a connection, a Reader takes over reading the data; the Reader actually calls the connection's readAndProcess method, and inside that method a Call object is placed on the RPC server's callQueue. A Handler thread then takes the Call off the queue and does the concrete processing. One subtlety concerns the call method invoked by the Handler: anyone who has traced the code carefully knows that the server instance here is no longer org.apache.hadoop.ipc.Server but a Protobuf-aware implementation class, org.apache.hadoop.ipc.RPC.Server, which overrides call as follows:
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest,
    long receiveTime) throws Exception {
  return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime);
}
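getRpcInvoker is essentially a lookup: each RPC engine registers an invoker for its RpcKind, and the server dispatches on the kind carried in the request header. Here is a self-contained toy illustration of that dispatch idea; every name in it is hypothetical, and it deliberately omits Hadoop's registration details:

import java.util.EnumMap;
import java.util.Map;

enum RpcKind { RPC_BUILTIN, RPC_WRITABLE, RPC_PROTOCOL_BUFFER }

interface RpcInvoker {
    Object call(String protocol, byte[] rawRequest) throws Exception;
}

class Dispatcher {
    private final Map<RpcKind, RpcInvoker> invokers = new EnumMap<>(RpcKind.class);

    // Each engine registers its invoker once, at setup time.
    void register(RpcKind kind, RpcInvoker invoker) { invokers.put(kind, invoker); }

    // Mirrors the shape of Server.call: pick the invoker by the request's kind.
    Object call(RpcKind kind, String protocol, byte[] rawRequest) throws Exception {
        return invokers.get(kind).call(protocol, rawRequest);
    }
}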
Keep tracing, and you soon reach the end of the chain:
public class ProtobufRpcEngine implements RpcEngine {
  public static class Server extends RPC.Server {
    public Writable call(RPC.Server server, String protocol,
        Writable writableRequest, long receiveTime) throws Exception {
      ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName, clientVersion);
      BlockingService service = (BlockingService) protocolImpl.protocolImpl;
      result = service.callBlockingMethod(methodDescriptor, null, param);
      return new RpcResponseWrapper(result);
    }
  }
}
This is where Hadoop RPC meets protobuf. The protocolImpl used here was prepared during NameNodeRpcServer initialization, so to really understand the call method of ProtoBufRpcInvoker you have to read it together with that initialization process. Digging into this spot also reveals something essential: the trace leads to ClientNamenodeProtocolServerSideTranslatorPB, then to NameNodeRpcServer, then to FSNamesystem, and finally you discover that the file-system operations on the server all come from FSNamesystem.
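Compressed into a few lines, the work the invoker does with the BlockingService looks roughly like this. The variables service and requestBytes are assumed to be in scope (in HDFS, service would be the clientNNPbService registered during initialization), and the method name is just an example:

// Resolve the method named in the request header on the generated service descriptor.
Descriptors.MethodDescriptor md =
    service.getDescriptorForType().findMethodByName("getBlockLocations");
// Parse the raw request bytes into the matching protobuf request message type.
Message param = service.getRequestPrototype(md)
    .newBuilderForType().mergeFrom(requestBytes).build();
// Invoke the server-side translator, which delegates to NameNodeRpcServer.
Message result = service.callBlockingMethod(md, null, param);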
Back in the Handler's run method: once the call method returns, it is the Responder's turn to write the result back to the client.
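To make the Listener -> callQueue -> Handler -> Responder pipeline concrete, here is a self-contained toy that reproduces the shape of the flow with plain Java threads and blocking queues. It contains no Hadoop code, and every name is made up:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ToyRpcPipeline {
    // A "call": the request plus a slot for the response, like Hadoop's Server.Call.
    static class Call {
        final String request;
        volatile String response;
        Call(String request) { this.request = request; }
    }

    static final BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>();      // Listener -> Handler
    static final BlockingQueue<Call> responseQueue = new LinkedBlockingQueue<>();  // Handler -> Responder

    public static void main(String[] args) throws InterruptedException {
        // Handler: take a call off the queue, compute the result, hand it to the responder.
        Thread handler = new Thread(() -> {
            try {
                while (true) {
                    Call call = callQueue.take();
                    call.response = "echo: " + call.request;  // stands in for the real service call
                    responseQueue.put(call);
                }
            } catch (InterruptedException ignored) { }
        });
        // Responder: write the result back (here: print instead of a socket write).
        Thread responder = new Thread(() -> {
            try {
                while (true) {
                    Call call = responseQueue.take();
                    System.out.println(call.response);
                }
            } catch (InterruptedException ignored) { }
        });
        handler.setDaemon(true);
        responder.setDaemon(true);
        handler.start();
        responder.start();

        // "Listener/Reader": parse incoming requests and enqueue them as calls.
        for (String req : new String[] { "a", "b", "c" }) {
            callQueue.put(new Call(req));
        }
        Thread.sleep(500);  // let the pipeline drain before the daemon threads exit
    }
}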
"hadoop rpc server initialization and call process example analysis" content is introduced here, thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!