
Hadoop Learning: The Use and Implementation Principle of RPC

Updated 2025-01-19. From: SLTechnology News&Howtos


Hadoop is a distributed system, so communication between the machines in a cluster is its most basic and most common requirement.

This requirement is essentially IPC, that is, interprocess communication. In the traditional UNIX programming model, interprocess communication comes down to the following mechanisms:

Pipes, FIFOs, message queues, semaphores, shared memory, and sockets. Of these, only sockets can communicate between machines over a network, so only sockets meet Hadoop's needs.

Typically, programs that communicate over a network use explicit network programming, that is, they use the java.net package directly. Web browsers and web servers are examples.
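For contrast with what follows, here is a minimal sketch of explicit network programming with java.net. The class name `ExplicitSocketDemo` and the one-line echo exchange are assumptions made for illustration and are not part of Hadoop; the point is only that the program itself opens sockets, writes bytes, and reads replies.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: the application handles sockets and streams itself.
class ExplicitSocketDemo {

    // Starts a one-shot echo server on a free loopback port, sends one line
    // (which must not contain a newline), and returns the echoed reply.
    static String roundTrip(String message) throws IOException, InterruptedException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread serverThread = new Thread(() -> {
                try (Socket s = server.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
                     PrintWriter out = new PrintWriter(
                             new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
                    out.println(in.readLine()); // echo the single request line back
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            serverThread.start();

            String reply;
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true);
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                out.println(message);  // the caller decides the wire format
                reply = in.readLine(); // and parses the answer by hand
            }
            serverThread.join();
            return reply;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello")); // prints "hello"
    }
}
```

Even for a one-line echo, connection setup, framing, and encoding are all the caller's job. That is the work an RPC layer takes over.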

Other programs use implicit network programming. Hadoop RPC, for example, is a toolkit that encapsulates the underlying communication details.

This makes the underlying network communication transparent to programmers. It reduces their burden, and it also abstracts communication into its own module, which makes the responsibilities of each module clearer and the code easier to maintain.

First, a small Hadoop RPC demo shows how Hadoop RPC is used.

Step 1: add dependencies in the pom.xml file

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>

Step 2: add log4j.properties to src/main/resources

log4j.rootLogger=DEBUG,console
log4j.additivity.org.apache=true
# (console)
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

Step 3: define the RPC protocol

package hadooprpc.demo;

import java.io.IOException;

interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
    // Version number. By default, an RPC client and server with different
    // version numbers cannot communicate with each other.
    public static final long versionID = 1L;

    String echo(String value) throws IOException;

    int add(int v1, int v2) throws IOException;
}

Step 4: implement the RPC protocol

package hadooprpc.demo;

import java.io.IOException;

import org.apache.hadoop.ipc.ProtocolSignature;

public class ClientProtocolImpl implements ClientProtocol {
    // Overridden method that returns the custom protocol version number.
    public long getProtocolVersion(String protocol, long clientVersion) {
        return ClientProtocol.versionID;
    }

    // Overridden method that returns the protocol signature.
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int hashcode) {
        return new ProtocolSignature(ClientProtocol.versionID, null);
    }

    public String echo(String value) throws IOException {
        return value;
    }

    public int add(int v1, int v2) throws IOException {
        return v1 + v2;
    }
}

Step 5: construct and start the RPC Server

package hadooprpc.demo;

import java.io.IOException;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class RPCServer {
    public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
        Configuration conf = new Configuration();
        Server server = new RPC.Builder(conf)
                .setProtocol(ClientProtocol.class)
                .setInstance(new ClientProtocolImpl())
                .setBindAddress("localhost")
                .setPort(8097)
                .setNumHandlers(5)
                .build();
        server.start();
    }
}

Step 6: construct RPC Client and send RPC request

package hadooprpc.demo;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class RPCClient {
    public static void main(String[] args) throws IOException {
        ClientProtocol client = (ClientProtocol) RPC.getProxy(
                ClientProtocol.class,
                ClientProtocol.versionID,
                new InetSocketAddress(8097),
                new Configuration());

        int result = client.add(5, 6);
        System.out.println(result);

        String echoResult = client.echo("result");
        System.out.println(echoResult);

        RPC.stopProxy(client); // close the connection
    }
}

Step 7: start RPC Server, and then execute RPC Client.

The example above shows that with this style of programming we do not need to think about the details of the network layer. We only need to write an interface and its implementation.

How does this work? Reading the Hadoop RPC source code gives a glimpse of the answer.

The implementation principle of Hadoop RPC is very simple.

Client:

1. Use a dynamic proxy to capture the interface method being called and its parameter types.

2. Encode the call information and send it to the server.

3. Receive the server's return value and decode it.

4. Return that value to the caller of the method.
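The four client steps can be sketched with `java.lang.reflect.Proxy`. This is a minimal sketch under stated assumptions, not Hadoop's actual client: the names `ClientProxySketch`, `Calc`, `encodeCall`, and `cannedInt` are hypothetical, the wire layout (method name, parameter count, then a type name and value per argument) is invented for illustration, only `int` and `String` are handled, and the network is replaced by a `Function<byte[], byte[]>` so the example runs on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical sketch of the client side of an RPC call.
class ClientProxySketch {

    interface Calc {
        int add(int a, int b);
        String echo(String msg);
    }

    // Step 2: encode method name, parameter count, then (type name, value) pairs.
    static byte[] encodeCall(Method method, Object[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        Class<?>[] types = method.getParameterTypes();
        out.writeUTF(method.getName());
        out.writeInt(types.length);
        for (int i = 0; i < types.length; i++) {
            out.writeUTF(types[i].getName());
            if (types[i] == int.class) {
                out.writeInt((Integer) args[i]);
            } else if (types[i] == String.class) {
                out.writeUTF((String) args[i]);
            } else {
                throw new IOException("unsupported parameter type: " + types[i]);
            }
        }
        out.flush();
        return buf.toByteArray();
    }

    // Step 3: decode the reply according to the method's declared return type.
    static Object decodeReply(byte[] reply, Class<?> returnType) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(reply));
        if (returnType == int.class) return in.readInt();
        if (returnType == String.class) return in.readUTF();
        throw new IOException("unsupported return type: " + returnType);
    }

    // Steps 1 and 4: the proxy intercepts the call and hands back the decoded value.
    // "transport" stands in for the socket round trip (an assumption of this sketch).
    static <T> T getProxy(Class<T> protocol, Function<byte[], byte[]> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            byte[] request = encodeCall(method, args);
            byte[] reply = transport.apply(request);
            return decodeReply(reply, method.getReturnType());
        };
        return protocol.cast(Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] {protocol}, handler));
    }

    // Builds a fake server reply carrying one big-endian int, as writeInt would.
    static byte[] cannedInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    public static void main(String[] args) {
        // The fake transport ignores the request and always answers 11.
        Calc calc = getProxy(Calc.class, request -> cannedInt(11));
        System.out.println(calc.add(5, 6)); // prints 11 (the canned reply)
    }
}
```

The caller only sees the `Calc` interface. Everything between the method call and the returned value happens inside the invocation handler, which is what makes the network transparent.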

Server:

1. Start the server and listen for clients.

2. Read the method name and parameters sent by the client.

3. Execute the corresponding method in the implementation class.

4. Send the return value back to the client.
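Server steps 2 to 4 can be sketched with reflection. Again this is only an illustration under assumptions: `ServerDispatchSketch`, `Calc`, `CalcImpl`, `handle`, and `sampleAddRequest` are hypothetical names, the request layout (method name, parameter count, then a type name and value per argument) is invented for the sketch, only `int` and `String` are supported, and the socket is left out so that `handle` works on plain byte arrays.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the server side: decode, invoke by reflection, encode.
class ServerDispatchSketch {

    interface Calc {
        int add(int a, int b);
        String echo(String msg);
    }

    static class CalcImpl implements Calc {
        @Override public int add(int a, int b) { return a + b; }
        @Override public String echo(String msg) { return msg; }
    }

    // Class.forName cannot resolve primitive names such as "int",
    // so they are looked up in a table first.
    private static final Map<String, Class<?>> PRIMITIVES = new HashMap<>();
    static {
        PRIMITIVES.put("int", Integer.TYPE);
    }

    static Class<?> resolve(String typeName) throws ClassNotFoundException {
        Class<?> primitive = PRIMITIVES.get(typeName);
        return primitive != null ? primitive : Class.forName(typeName);
    }

    static byte[] handle(Class<?> protocol, Object impl, byte[] request) throws Exception {
        // Step 2: read the method name and the arguments.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(request));
        String methodName = in.readUTF();
        int paramCount = in.readInt();
        Class<?>[] paramTypes = new Class<?>[paramCount];
        Object[] args = new Object[paramCount];
        for (int i = 0; i < paramCount; i++) {
            paramTypes[i] = resolve(in.readUTF());
            if (paramTypes[i] == int.class) {
                args[i] = in.readInt();
            } else if (paramTypes[i] == String.class) {
                args[i] = in.readUTF();
            } else {
                throw new IOException("unsupported parameter type: " + paramTypes[i]);
            }
        }

        // Step 3: look the method up on the protocol interface and invoke it.
        Method method = protocol.getMethod(methodName, paramTypes);
        Object result = method.invoke(impl, args);

        // Step 4: encode the return value for the client.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        if (method.getReturnType() == int.class) {
            out.writeInt((Integer) result);
        } else if (method.getReturnType() == String.class) {
            out.writeUTF((String) result);
        } else {
            throw new IOException("unsupported return type: " + method.getReturnType());
        }
        out.flush();
        return buf.toByteArray();
    }

    // Builds a request for add(a, b) in the layout this sketch assumes.
    static byte[] sampleAddRequest(int a, int b) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF("add");
        out.writeInt(2);
        out.writeUTF("int");
        out.writeInt(a);
        out.writeUTF("int");
        out.writeInt(b);
        out.flush();
        return buf.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] reply = handle(Calc.class, new CalcImpl(), sampleAddRequest(5, 6));
        System.out.println(new DataInputStream(new ByteArrayInputStream(reply)).readInt()); // prints 11
    }
}
```

Looking the method up on the protocol interface rather than on the implementation class means a client can only reach methods that the protocol declares.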

With the principle understood, I implemented a very rough RPC framework that serves only as a demonstration.

package srpc;

import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by shgy on 17-5-7.
 * A simple RPC framework. Parameters and return values support only basic types.
 */

// Interface
interface MyProtocol {
    String echo(String msg);

    int add(int a, int b);
}

// Implementation
class MyProtocolImp implements MyProtocol {
    @Override
    public String echo(String msg) {
        return msg;
    }

    @Override
    public int add(int a, int b) {
        return a + b;
    }
}

public class ImitateRPC {
    // Maps primitive type names to their Class objects.
    private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();

    static {
        PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
        PRIMITIVE_NAMES.put("byte", Byte.TYPE);
        PRIMITIVE_NAMES.put("char", Character.TYPE);
        PRIMITIVE_NAMES.put("short", Short.TYPE);
        PRIMITIVE_NAMES.put("int", Integer.TYPE);
        PRIMITIVE_NAMES.put("long", Long.TYPE);
        PRIMITIVE_NAMES.put("float", Float.TYPE);
        PRIMITIVE_NAMES.put("double", Double.TYPE);
        PRIMITIVE_NAMES.put("void", Void.TYPE);
    }

    public static class Server {
        private Class<?> protocolClass;
        private Object protocolImpl;
        private ServerSocket server;

        public Server(Class<?> protocolClass, Object protocolImpl) throws IOException {
            if (protocolImpl == null) {
                throw new IllegalArgumentException("protocolImpl is not set");
            }
            if (protocolClass == null) {
                throw new IllegalArgumentException("protocolClass is not set");
            } else {
                if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
                    throw new IOException("protocolClass " + protocolClass
                            + " is not implemented by protocolImpl which is of class "
                            + protocolImpl.getClass());
                }
            }
            this.protocolClass = protocolClass;
            this.protocolImpl = protocolImpl;
        }

        public void start() {
            System.out.println("start server");
            try {
                this.server = new ServerSocket(8189);
                listen();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void close() {
            System.out.println("close server");
            try {
                this.server.close();
            } catch (IOException e) {
            }
        }

        private void listen() {
            new Thread() {
                @Override
                public void run() {
                    while (!server.isClosed()) {
                        Socket incoming = null;
                        try {
                            incoming = server.accept();
                            DataInputStream inStream = new DataInputStream(incoming.getInputStream());
                            // Read the information about the called method from the client.
                            int dataLen = inStream.readInt();
                            byte[] data = new byte[dataLen];
                            // readFully blocks until all dataLen bytes have arrived;
                            // a plain read() may return fewer.
                            inStream.readFully(data, 0, dataLen);
                            DataInputStream contentStream = new DataInputStream(new ByteArrayInputStream(data));
                            String methodName = contentStream.readUTF();
                            int paramCount = contentStream.readInt();
                            Class<?>[] paramTypes = new Class<?>[paramCount];
                            Object[] args = new Object[paramCount];
                            for
