In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-09 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly explains "how to write a RPC framework". The content in the article is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "how to write a RPC framework".
What should the RPC framework look like
Let's first take a look at: what is an RPC framework? Our most intuitive feeling is:
After integrating the RPC framework, configure the address of a registry. One application (called a service provider) "exposes" an interface (interface), while another application (called a service consumer) "references" this interface (interface), and then invokes it, which miraculously calls the method of another application.
It makes us feel as if we had called a local method. Even if two applications are not in the same JVM or even two applications are not on the same machine.
So how do they do it? When our service consumer invokes a method of a RPC interface, its underlying layer goes to the service provider's machine through a dynamic proxy and then through the network call, and then executes the corresponding method.
Then the result of the method is transmitted back to the service consumer through the network, and then the result can be obtained.
The whole process is shown below:
So at this time, someone may ask: how does the service consumer know which port of which machine the service provider is on?
At this point, a "registry" is needed, specifically like this:
When starting up, the service provider submits the information about the machine on which it is applied to the registry.
When the service consumer starts up, it will retrieve the information of the machine where the interface needs to be consumed.
In this way, the service consumer has a list of machines on which the service provider is located.
When the "service consumer" gets the list of machines of the "service provider", it can make a request through a network request.
What should we use for network clients? There are several options:
Use JDK native BIO (that is, the ServerSocket set). Blocking IO method cannot support high concurrency.
Use JDK native NIO (Selector, SelectionKey). Non-blocking IO can support high concurrency, but its own implementation is complex and needs to deal with a variety of network problems.
The use of the famous NIO framework Netty, natural support for high concurrency, encapsulation, API easy to use.
When the "service consumer" gets the list of machines of the "service provider", it can make a request through a network request.
As an aspiring programmer, we require the framework to be developed to support high concurrency, simplicity and speed.
Of course, choose Netty to achieve, using some very basic API of Netty can meet our needs.
Network protocol definition
Of course, since we are going to use the network to transmit data. First of all, we need to define a set of network protocols.
You may have to ask again, what is a network protocol? Network protocol, popular understanding, means that what the data sent by our client should look like, and the server can parse it out and know what to do.
Needless to say, in the code, suppose we now have two classes for the service provider:
/ / com.study.rpc.test.producer.HelloService public interface HelloService {String sayHello (TestBean testBean);} / / com.study.rpc.test.producer.TestBean public class TestBean {private String name; private Integer age; public TestBean (String name, Integer age) {this.name = name; this.age = age;} / / getter setter}
Now I'm going to call HelloService.sayHello (TestBean testBean).
As a "service consumer", how should we define our request so that the server knows that I am calling this method?
This requires us to generate a unique identity for this interface information: this identity records the interface name, the specific method, and then what the specific parameters are!
The information is then organized and sent to the server, which I do here by saving the information as a string in JSON format for transmission.
For example, in the above interface, the data we transmit is something like this:
{"interfaces": "interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean", "requestId": "3", "parameter": {"com.study.rpc.test.producer.TestBean": {"age": 20, "name": "Zhang San"}
Well, I use a JSON to identify which method of which interface is called, where interface identifies the unique class, parameter identifies the specific parameters in it, where key is the fully qualified name of the parameter, and value is the JSON information of this class.
As you may see here, you may have an opinion: data may not be transmitted in JSON format, and the use of JSON may not necessarily have the highest performance.
You use JDK's Serializable with Netty's ObjectDecoder to implement, of course, this is also possible, in fact, here is an expansion point, we should provide a variety of serialization methods for users to choose from.
But the reason why I chose JSON here is that it is more intuitive and reasonable for writing articles.
Develop service provider
Well, after settling the network protocol, we began to develop "service providers". For service providers, because we are writing a simple version of the RPC framework here, to keep it concise.
We won't introduce a container framework like Spring, so we need to define a configuration class for a service provider that defines what interface the service provider is, and then what its specific instance object is:
Public class ServiceConfig {public Classtype; public T instance; public ServiceConfig (Classtype, T instance) {this.type = type; this.instance = instance;} public ClassgetType () {return type;} public void setType (Classtype) {this.type = type;} public T getInstance () {return instance } public void setInstance (T instance) {this.instance = instance;}}
With this thing, we know which interfaces need to be exposed. In order for the framework to have a unified entry, I defined a class called ApplicationContext, which can be thought of as an application context whose constructor takes two parameters.
The code is as follows:
Public ApplicationContext (String registryUrl, ListserviceConfigs) {/ / 1. Save the interface configuration that needs to be exposed this.serviceConfigs = serviceConfigs = = null? New ArrayList (): serviceConfigs; / / step 2: instantiate the registry initRegistry (registryUrl); / / step 3: register the interface with the registry, obtain the interface from the registry, initialize the service interface list RegistryInfo registryInfo = null; InetAddress addr = InetAddress.getLocalHost (); String hostname = addr.getHostName (); String hostAddress = addr.getHostAddress (); registryInfo = new RegistryInfo (hostname, hostAddress, port); doRegistry (registryInfo) / / step 4: initialize the Netty server, receive the request, and call if (! this.serviceConfigs.isEmpty ()) {/ / the service provider's service method to expose nettyServer = new NettyServer (this.serviceConfigs, interfaceMethods); nettyServer.init (port);}}
Registry design
There are several steps here. First, the interface configuration is saved, and then the registry is initialized. Because the registry may provide a variety of options for users to choose from, you need to define an interface for the registry:
Public interface Registry {/ * registers the producer interface with the registry * * @ param clazz class * @ param registryInfo native registration information * / void register (Class clazz, RegistryInfo registryInfo) throws Exception;}
Here we provide a registration method whose semantics is to register the interface corresponding to clazz with the registry.
Receive two parameters, one is the class object of the interface, and the other is the registration information, which contains some basic information of the local machine, as follows:
Public class RegistryInfo {private String hostname; private String ip; private Integer port; public RegistryInfo (String hostname, String ip, Integer port) {this.hostname = hostname; this.ip = ip; this.port = port;} / / getter setter}
All right, define the registry and go back to the place where you instantiated the registry. The code is as follows:
/ * Registry * / private Registry registry; private void initRegistry (String registryUrl) {if (registryUrl.startsWith ("zookeeper://")) {registryUrl = registryUrl.substring (12); registry = new ZookeeperRegistry (registryUrl);} else if (registryUrl.startsWith ("multicast://")) {registry = new MulticastRegistry (registryUrl);}}
Here the logic is also very simple, that is, according to the schema of url to determine which registry, the registry here implements two implementation classes, respectively using Zookeeper as the registry, and the other is using broadcast as the registry.
Broadcast registry here is only a demonstration, there is no internal implementation. We mainly implemented the registry of Zookeeper.
Of course, if you are interested, you can implement more registries for users to choose from, such as Redis, just to maintain the "expansion point".
So after instantiating the registry, go back to the above code.
Register a service provider
/ / step 3: register the interface with the registry, obtain the interface from the registry, initialize the service interface list RegistryInfo registryInfo = null; InetAddress addr = InetAddress.getLocalHost (); String hostname = addr.getHostName (); String hostAddress = addr.getHostAddress (); registryInfo = new RegistryInfo (hostname, hostAddress, port); doRegistry (registryInfo)
The logic here is very simple, that is, the basic information of the machine is obtained to construct RegistryInfo, and then the doRegistry method is called:
/ * Interface method corresponds to method object * / private MapinterfaceMethods = new ConcurrentHashMap (); private void doRegistry (RegistryInfo registryInfo) throws Exception {for (ServiceConfig config: serviceConfigs) {Class type = config.getType (); registry.register (type, registryInfo); Method [] declaredMethods = type.getDeclaredMethods (); for (Method method: declaredMethods) {String identify = InvokeUtils.buildInterfaceMethodIdentify (type, method) InterfaceMethods.put (identify, method);}
Two things have been done here:
Register the interface with the registry.
For each method of each interface, a unique identity is generated and saved in the interfaceMethods collection.
The following is a separate analysis of these two things, the first is the registration method: because we use Zookeeper, for convenience, the client framework Curator of Zookeeper is introduced.
Org.apache.curatorgroupId > curator-recipesartifactId > 2.3.0version > dependency >
Then look at the code:
Public class ZookeeperRegistry implements Registry {private CuratorFramework client; public ZookeeperRegistry (String connectString) {RetryPolicy retryPolicy = new ExponentialBackoffRetry (1000, 3); client = CuratorFrameworkFactory.newClient (connectString, retryPolicy); client.start (); try {Stat myRPC = client.checkExists (). ForPath ("/ myRPC") If (myRPC = = null) {client.create () .creatingParentsIfNeutral () .forPath ("/ myRPC");} System.out.println ("Zookeeper Client initialized.");} catch (Exception e) {e.printStackTrace () @ Override public void register (Class clazz, RegistryInfo registryInfo) throws Exception {/ / 1. When registering, first get the data from zk / / 2. Add your own server address to the registry / / register a temporary node for each method of each interface, then key is the unique identification of the interface method, and data is the service address list Method [] declaredMethods = clazz.getDeclaredMethods (); for (Method method: declaredMethods) {String key = InvokeUtils.buildInterfaceMethodIdentify (clazz, method); String path = "/ myRPC/" + key Stat stat = client.checkExists () .forPath (path); ListregistryInfos; if (stat! = null) {/ / if someone has already registered for this interface, get the data back and save your information in byte [] bytes = client.getData () .forPath (path) String data = new String (bytes, StandardCharsets.UTF_8); registryInfos = JSONArray.parseArray (data, RegistryInfo.class) If (registryInfos.contains (registryInfo)) {/ / normally, the temporary node of zk will disappear after disconnection, but restart will often find that there is a node, so there is such a code System.out.println ("address list already contains native [" + key + "], do not register") } else {registryInfos.add (registryInfo); client.setData (). ForPath (path, JSONArray.toJSONString (registryInfos). GetBytes ()); System.out.println ("register to the registry, path: [" + path + "] Information is:" + registryInfo) }} else {registryInfos = new ArrayList (); registryInfos.add (registryInfo) Client.create () .creatingParentsIfNeutral () / / temporary node, disconnect closes .withMode (CreateMode.EPHEMERAL) .forPath (path, JSONArray.toJSONString (registryInfos) .forPath ()) System.out.println ("sign up to the registry, path: [" + path + "] Information:" + registryInfo);}
The Zookeeper registry establishes a connection when initializing. Then when you register, a unique identity is generated for each method of the clazz interface.
The InvokeUtils.buildInterfaceMethodIdentify method is used here:
Public static String buildInterfaceMethodIdentify (Class clazz, Method method) {Map map = new LinkedHashMap (); map.put ("interface", clazz.getName ()); map.put ("method", method.getName ()); Parameter [] parameters = method.getParameters (); if (parameters.length > 0) {StringBuilder param = new StringBuilder (); for (int I = 0; I)
< parameters.length; i++) { Parameter p = parameters[i]; param.append(p.getType().getName()); if (i < parameters.length - 1) { param.append(","); } } map.put("parameter", param.toString()); } return map2String(map); } public static String map2String(Map map) { StringBuilder sb = new StringBuilder(); Iterator iterator = map.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); sb.append(entry.getKey() + "=" + entry.getValue()); if (iterator.hasNext()) { sb.append("&"); } } return sb.toString(); } 其实就是对接口的方法使用他们的限定名和参数来组成一个唯一的标识,比如 HelloService#sayHello(TestBean) 生成的大概是这样的: interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean 接下来的逻辑就简单了,在 Zookeeper 中的 /myRPC 路径下面建立临时节点,节点名称为我们上面的接口方法唯一标识,数据内容为机器信息。 之所以采用临时节点是因为:如果机器宕机了,连接断开之后,消费者可以通过 Zookeeper 的 watcher 机制感知到。 大概看起来是这样的: /myRPC/interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean [ { "hostname":peer1, "port":8080 }, { "hostname":peer2, "port":8081 } ] 通过这样的方式,在服务消费的时候就可以拿到这样的注册信息,然后知道可以调用那台机器的那个端口。 好了,注册中心弄完了之后,我们回到前面说的注册方法做的第二件事情,我们将每一个接口方法标识的方法放入了一个 map 中: /** * 接口方法对应method对象 */ private Map interfaceMethods = new ConcurrentHashMap(); 这个的原因是因为,我们在收到网络请求的时候,需要调用反射的方式调用 Method 对象,所以存起来。 启动网络服务端接受请求 接下来我们就可以看第四步了: // step 4:初始化Netty服务器,接受到请求,直接打到服务提供者的service方法中 if (!this.serviceConfigs.isEmpty()) { // 需要暴露接口才暴露 nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods); nettyServer.init(port); } 因为这里使用 Netty 来做的所以需要引入 Netty 的依赖: io.nettygroupId>Netty-allartifactId > 4.1.30.Finalversion > dependency >
Next, let's analyze:
Public class NettyServer {/ * handler * / private RpcInvokeHandler rpcInvokeHandler; public NettyServer (ListserverConfigs, MapinterfaceMethods) throws InterruptedException {this.rpcInvokeHandler = new RpcInvokeHandler (serverConfigs, interfaceMethods);} public int init (int port) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup (); EventLoopGroup workerGroup = new NioEventLoopGroup (); ServerBootstrap b = new ServerBootstrap () B.group (bossGroup, workerGroup) .channel (NioServerSocketChannel.class) .option (ChannelOption.SO_BACKLOG, 1024) .childHandler (new ChannelInitializer () {@ Override protected void initChannel (SocketChannel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer ("$$") / / set to split messages according to the delimiter "& &". A single message is limited to 1MB ch.pipeline () .addLast (new DelimiterBasedFrameDecoder (1024 * 1024, delimiter)); ch.pipeline () .addLast (new StringDecoder ()); ch.pipeline () .addLast () .addLast (rpcInvokeHandler) }}); ChannelFuture sync = b.bind (port) .sync (); System.out.println ("start NettyService, port is:" + port); return port;}}
The main part of this part is the API of Netty. Let's not explain it too much, but just say it briefly:
We use "& &" as an identifier to distinguish between two pieces of information, and then the maximum length of a message is 1MB.
All the logic is in the RpcInvokeHandler, where the configured service interface instance and the Map collection of the corresponding Method object are uniquely identified by each interface method of the service interface instance.
The public class RpcInvokeHandler extends ChannelInboundHandlerAdapter {/ * interface method uniquely identifies the implementation class * / private Map interfaceToInstance corresponding to the corresponding Method object * / private Map interfaceMethods; / * * interface. / * Thread pool, written at will, do not complain * / private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (10,50,60, TimeUnit.SECONDS, new LinkedBlockingQueue (100), new ThreadFactory () {AtomicInteger m = new AtomicInteger (0) @ Override public Thread newThread (Runnable r) {return newThread (r, "IO-thread-" + m.incrementAndGet ();}}); public RpcInvokeHandler (ListserviceConfigList, Map interfaceMethods) {this.interfaceToInstance = new ConcurrentHashMap (); this.interfaceMethods = interfaceMethods For (ServiceConfig config: serviceConfigList) {interfaceToInstance.put (config.getType (), config.getInstance ());} @ Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception {try {String message = (String) msg / / what you get here is a string of JSON data, which is parsed into Request objects. / / in fact, when you parse network data, you can use serialization, define an interface, serialize in JSON format, or other serialization / / but forget the demo version. System.out.println ("received message:" + msg); RpcRequest request = RpcRequest.parse (message, ctx); threadPoolExecutor.execute (new RpcInvokeTask (request));} finally {ReferenceCountUtil.release (msg);}} @ Override public void channelReadComplete (ChannelHandlerContext ctx) throws Exception {ctx.flush () } @ Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println ("exception..." + cause); cause.printStackTrace (); ctx.close ();} public class RpcInvokeTask implements Runnable {private RpcRequest rpcRequest; RpcInvokeTask (RpcRequest rpcRequest) {this.rpcRequest = rpcRequest } @ Override public void run () {try {/ * * data looks like this * {"interfaces": "interface=com.study.rpc.test.producer.HelloService&method=sayHello meter=com * .study.rpc.test.producer.TestBean", "requestId": "3" "parameter": {"com.study.rpc.test.producer * .TestBean": {"age": 20, "name": "Zhang San"}} * / you want to get a specific declaration String interfaceIdentity = rpcRequest.getInterfaceIdentity () for each interface of each service object. Method method = interfaceMethods.get (interfaceIdentity); Map map = string2Map (interfaceIdentity); String interfaceName = map.get ("interface"); Class interfaceClass = Class.forName (interfaceName); Object o = interfaceToInstance.get (interfaceClass); String parameterString = map.get ("parameter"); Object result If (parameterString! = null) {String [] parameterTypeClass = parameterString.split (","); Map parameterMap = rpcRequest.getParameterMap (); Object [] parameterInstance = new Object [parameterTypeClass.length]; for (int I = 0; I
< parameterTypeClass.length; i++) { String parameterClazz = parameterTypeClass[i]; parameterInstance[i] = parameterMap.get(parameterClazz); } result = method.invoke(o, parameterInstance); } else { result = method.invoke(o); } // 写回响应 ChannelHandlerContext ctx = rpcRequest.getCtx(); String requestId = rpcRequest.getRequestId(); RpcResponse response = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId); String s = JSONObject.toJSONString(response) + "$$"; ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes()); ctx.writeAndFlush(byteBuf); System.out.println("响应给客户端:" + s); } catch (Exception e) { e.printStackTrace(); } } public static Map string2Map(String str) { String[] split = str.split("&"); Map map = new HashMap(16); for (String s : split) { String[] split1 = s.split("="); map.put(split1[0], split1[1]); } return map; } } } 这里说明一下上面的逻辑:channelRead 方法用于接收消息,接收到的就是我们前面分析的那个 JSON 格式的数据,接着我们将消息解析成 RpcRequest。 public class RpcRequest { private String interfaceIdentity; private Map parameterMap = new HashMap(); private ChannelHandlerContext ctx; private String requestId; public static RpcRequest parse(String message, ChannelHandlerContext ctx) throws ClassNotFoundException { /* * { * "interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello2¶meter=java.lang * .String,com.study.rpc.test.producer.TestBean", * "parameter":{ * "java.lang.String":"haha", * "com.study.rpc.test.producer.TestBean":{ * "name":"小王", * "age":20 * } * } * } */ JSONObject jsonObject = JSONObject.parseObject(message); String interfaces = jsonObject.getString("interfaces"); JSONObject parameter = jsonObject.getJSONObject("parameter"); Set strings = parameter.keySet(); RpcRequest request = new RpcRequest(); request.setInterfaceIdentity(interfaces); Map parameterMap = new HashMap(16); String requestId = jsonObject.getString("requestId"); for (String key : strings) { if (key.equals("java.lang.String")) { parameterMap.put(key, parameter.getString(key)); } else { Class clazz = Class.forName(key); Object object = parameter.getObject(key, clazz); parameterMap.put(key, object); } } request.setParameterMap(parameterMap); request.setCtx(ctx); request.setRequestId(requestId); return request; } } 接着从 request 中解析出来需要调用的接口,然后通过反射调用对应的接口,得到结果后我们将响应封装成 PrcResponse 写回给客户端: public class RpcResponse { private String result; private String interfaceMethodIdentify; private String requestId; public String getResult() { return result; } public void setResult(String result) { this.result = result; } public static RpcResponse create(String result, String interfaceMethodIdentify, String requestId) { RpcResponse response = new RpcResponse(); response.setResult(result); response.setInterfaceMethodIdentify(interfaceMethodIdentify); response.setRequestId(requestId); return response; } } 里面包含了请求的结果 JSON 串,接口方法唯一标识,请求 ID。数据大概看起来这个样子: {"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='张三', age=20}\""} 通过这样的信息,客户端就可以通过响应结果解析出来。 测试服务提供者 既然我们代码写完了,现在需要测试一把,首先我们先写一个 HelloService 的实现类出来: public class HelloServiceImpl implements HelloService { @Override public String sayHello(TestBean testBean) { return "牛逼,我收到了消息:" + testBean; } } 接着编写服务提供者代码: public class TestProducer { public static void main(String[] args) throws Exception { String connectionString = "zookeeper://localhost1:2181,localhost2:2181,localhost3:2181"; HelloService service = new HelloServiceImpl(); ServiceConfig config = new ServiceConfig(HelloService.class, service); ListserviceConfigList = new ArrayList(); serviceConfigList.add(config); ApplicationContext ctx = new ApplicationContext(connectionString, serviceConfigList, null, 50071); } } 接着启动起来,看到日志: Zookeeper Client初始化完毕...... 注册到注册中心,路径为:【/myRPC/interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean】 信息为:RegistryInfo{hostname='localhost', ip='192.168.16.7', port=50071} 启动NettyService,端口为:50071 这个时候,我们期望用 NettyClient 发送请求: { "interfaces": "interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean", "requestId": "3", "parameter": { "com.study.rpc.test.producer.TestBean": { "age": 20, "name": "张三" } } } 得到的响应应该是: {"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='张三', age=20}\""} 那么,可以编写一个测试程序(这个程序仅仅用于中间测试用,读者不必理解): public class NettyClient { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture sync = b.connect("127.0.0.1", 50071).sync(); sync.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } private static class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { JSONObject jsonObject = new JSONObject(); jsonObject.put("interfaces", "interface=com.study.rpc.test.producer" + ".HelloService&method=sayHello¶meter=com.study.rpc.test.producer.TestBean"); JSONObject param = new JSONObject(); JSONObject bean = new JSONObject(); bean.put("age", 20); bean.put("name", "张三"); param.put("com.study.rpc.test.producer.TestBean", bean); jsonObject.put("parameter", param); jsonObject.put("requestId", 3); System.out.println("发送给服务端JSON为:" + jsonObject.toJSONString()); String msg = jsonObject.toJSONString() + "$$"; ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length); byteBuf.writeBytes(msg.getBytes()); ctx.writeAndFlush(byteBuf); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("收到消息:" + msg); } } } 启动之后,看到控制台输出: 发送给服务端JSON为:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":3, "parameter":{"com.study.rpc.test.producer.TestBean":{"name":"张三","age":20}}} 收到消息:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='张三', age=20}\""} Bingo,完美实现了 RPC 的服务提供者。接下来我们只需要实现服务消费者就完成了。 开发服务消费者 服务消费者是同样的处理,我们同样要定义一个消费者的配置: public class ReferenceConfig{ private Class type; public ReferenceConfig(Classtype) { this.type = type; } public ClassgetType() { return type; } public void setType(Classtype) { this.type = type; } } 然后我们是统一入口,在 ApplicationContext 中修改代码: public ApplicationContext(String registryUrl, ListserviceConfigs, ListreferenceConfigs, int port) throws Exception { // step 1: 保存服务提供者和消费者 this.serviceConfigs = serviceConfigs == null ? new ArrayList() : serviceConfigs; this.referenceConfigs = referenceConfigs == null ? new ArrayList() : referenceConfigs; // .... } private void doRegistry(RegistryInfo registryInfo) throws Exception { for (ServiceConfig config : serviceConfigs) { Class type = config.getType(); registry.register(type, registryInfo); Method[] declaredMethods = type.getDeclaredMethods(); for (Method method : declaredMethods) { String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method); interfaceMethods.put(identify, method); } } for (ReferenceConfig config : referenceConfigs) { ListregistryInfos = registry.fetchRegistry(config.getType()); if (registryInfos != null) { interfacesMethodRegistryList.put(config.getType(), registryInfos); initChannel(registryInfos); } } } 在注册的时候,我们需要将需要消费的接口,通过注册中心抓取出来,所以注册中心要增加一个接口方法: public interface Registry { /** * 将生产者接口注册到注册中心 * * @param clazz 类 * @param registryInfo 本机的注册信息 */ void register(Class clazz, RegistryInfo registryInfo) throws Exception; /** * 为服务提供者抓取注册表 * * @param clazz 类 * @return 服务提供者所在的机器列表 */ ListfetchRegistry(Class clazz) throws Exception; } 获取服务提供者的机器列表 具体在 Zookeeper 中的实现如下: @Override public ListfetchRegistry(Class clazz) throws Exception { Method[] declaredMethods = clazz.getDeclaredMethods(); ListregistryInfos = null; for (Method method : declaredMethods) { String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method); String path = "/myRPC/" + key; Stat stat = client.checkExists() .forPath(path); if (stat == null) { // 这里可以添加watcher来监听变化,这里简化了,没有做这个事情 System.out.println("警告:无法找到服务接口:" + path); continue; } if (registryInfos == null) { byte[] bytes = client.getData().forPath(path); String data = new String(bytes, StandardCharsets.UTF_8); registryInfos = JSONArray.parseArray(data, RegistryInfo.class); } } return registryInfos; } 其实就是去 Zookeeper 获取节点中的数据,得到接口所在的机器信息,获取到的注册信息诸侯,就会调用以下代码: if (registryInfos != null) { // 保存接口和服务地址 interfacesMethodRegistryList.put(config.getType(), registryInfos); // 初始化网络连接 initChannel(registryInfos); } private void initChannel(ListregistryInfos) throws InterruptedException { for (RegistryInfo info : registryInfos) { if (!channels.containsKey(info)) { System.out.println("开始建立连接:" + info.getIp() + ", " + info.getPort()); NettyClient client = new NettyClient(info.getIp(), info.getPort()); client.setMessageCallback(message ->{/ / the message returned by the receiving server is first pressed into the queue RpcResponse response = JSONObject.parseObject (message, RpcResponse.class); responses.offer (response); synchronized (ApplicationContext.this) {ApplicationContext.this.notifyAll ();}}) / / wait for connection to be established ChannelHandlerContext ctx = client.getCtx (); channels.put (info, ctx);}
We will establish a connection for each unique RegistryInfo, and then have a piece of code like this:
Client.setMessageCallback (message-> {/ / messages returned by the receiving server here, RpcResponse response = JSONObject.parseObject (message, RpcResponse.class); responses.offer (response); synchronized (ApplicationContext.this) {ApplicationContext.this.notifyAll ();}})
Set up a callback to call back the code here when you receive a message, which we will analyze later.
Then in the case of client.getCtx (), the synchronization blocks until the connection is completed, and after the connection is established, the NettyClient code is as follows:
Public class NettyClient {private ChannelHandlerContext ctx; private MessageCallback messageCallback; public NettyClient (String ip, Integer port) {EventLoopGroup group = new NioEventLoopGroup (); try {Bootstrap b = new Bootstrap () B.group (group) .channel (NioSocketChannel.class) .option (ChannelOption.TCP_NODELAY) True) .handler (new ChannelInitializer () {@ Override protected void initChannel (SocketChannel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer ("$" .getBytes () / / sets the message to be segmented according to the delimiter "& &". A single message is limited to 1MB ch.pipeline () .addLast (new DelimiterBasedFrameDecoder (1024 * 1024, delimiter)); ch.pipeline () .addLast (new StringDecoder ()); ch.pipeline () .addLast (new NettyClientHandler ()) }}); ChannelFuture sync = b.connect (ip, port). Sync ();} catch (Exception e) {e.printStackTrace ();}} public void setMessageCallback (MessageCallback callback) {this.messageCallback = callback } public ChannelHandlerContext getCtx () throws InterruptedException {System.out.println ("waiting for a successful connection..."); if (ctx = = null) {synchronized (this) {wait ();}} return ctx } private class NettyClientHandler extends ChannelInboundHandlerAdapter {@ Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception {try {String message = (String) msg; if (messageCallback! = null) {messageCallback.onMessage (message);}} finally {ReferenceCountUtil.release (msg) } @ Override public void channelActive (ChannelHandlerContext ctx) throws Exception {NettyClient.this.ctx = ctx; System.out.println ("connection succeeded:" + ctx); synchronized (NettyClient.this) {NettyClient.this.notifyAll () } @ Override public void channelReadComplete (ChannelHandlerContext ctx) throws Exception {ctx.flush ();} @ Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace ();}} public interface MessageCallback {void onMessage (String message);}}
Here, wait () and notifyAll () are mainly used to implement synchronous blocking waiting for the connection to be established. After the connection is established, we save it to the collection:
/ / waiting for connection to be established ChannelHandlerContext ctx = client.getCtx (); channels.put (info, ctx)
Send a request
Well, here we have established a network connection for each interface that needs to be consumed, and the next thing to do is to provide an interface for the user to get an instance of the service provider.
I wrote this method in ApplicationContext:
/ * the class responsible for generating requestId * / private LongAdder requestIdWorker = new LongAdder (); / * get the calling service * / @ SuppressWarnings ("unchecked") publicT getService (Classclazz) {return (T) Proxy.newProxyInstance (getClass (). GetClassLoader (), new Class [] {clazz}, new InvocationHandler () {@ Override public Object invoke (Object proxy, Method method, Object [] args) throws Throwable {String methodName = method.getName () If ("equals" .equals (methodName) | | "hashCode" .equals (methodName)) {throw new IllegalAccessException ("cannot access" + methodName + "method");} if ("toString" .equals (methodName)) {return clazz.getName () + "#" + methodName } / / step 1: get service address list ListregistryInfos = interfacesMethodRegistryList.get (clazz); if (registryInfos = = null) {throw new RuntimeException ("service provider cannot be found");} / / step 2: load balancer RegistryInfo registryInfo = loadBalancer.choose (registryInfos) ChannelHandlerContext ctx = channels.get (registryInfo); String identify = InvokeUtils.buildInterfaceMethodIdentify (clazz, method); String requestId; synchronized (ApplicationContext.this) {requestIdWorker.increment (); requestId = String.valueOf (requestIdWorker.longValue ());} Invoker invoker = new DefaultInvoker (method.getReturnType (), ctx, requestId, identify) InProgressInvoker.put (identify + "#" + requestId, invoker); return invoker.invoke (args);});}
This is mainly achieved through dynamic agents, first through class to get the corresponding machine list, and then through loadBalancer to select a machine.
This LoaderBalance is an interface:
Public interface LoadBalancer {/ * Select a producer * * @ param registryInfos producer list * @ return selected producer * / RegistryInfo choose (ListregistryInfos);}
You can choose different implementations when initializing ApplicationContext. Here I mainly implement a simple random algorithm (which can be extended to other ones later, such as RoundRobin):
Public class RandomLoadbalancer implements LoadBalancer {@ Override public RegistryInfo choose (ListregistryInfos) {Random random = new Random (); int index = random.nextInt (registryInfos.size ()); return registryInfos.get (index);}}
Then construct a unique identifier of the interface method, identify, and a requestId. Why do you need a requestId?
This is because when we are dealing with a response, we need to find out which request a response corresponds to, but it is not possible to use identify alone, because we may have multiple threads in the same application calling the same method of the same interface at the same time, and the identify is the same.
So we need to judge in the way of identify+requestId that reqeustId is a self-increasing LongAddr. The server will return requestId when it responds.
Then we construct an Invoker and put it into the collection of inProgressInvoker. Its invoke method is called:
Invoker invoker = new DefaultInvoker (method.getReturnType (), ctx, requestId, identify); inProgressInvoker.put (identify + "#" + requestId, invoker); / / blocking waiting result return invoker.invoke (args); public class DefaultInvokerimplements Invoker {private ChannelHandlerContext ctx; private String requestId; private String identify; private ClassreturnType; private T result; DefaultInvoker (ClassreturnType, ChannelHandlerContext ctx, String requestId, String identify) {this.returnType = returnType; this.ctx = ctx This.requestId = requestId; this.identify = identify;} @ SuppressWarnings ("unckecked") @ Override public T invoke (Object [] args) {JSONObject jsonObject = new JSONObject (); jsonObject.put ("interfaces", identify); JSONObject param = new JSONObject () If (args! = null) {for (Object obj: args) {param.put (obj.getClass (). GetName (), obj);} jsonObject.put ("parameter", param); jsonObject.put ("requestId", requestId); System.out.println ("JSON sent to the server is:" + jsonObject.toJSONString ()) String msg = jsonObject.toJSONString () + "$$"; ByteBuf byteBuf = Unpooled.buffer (msg.getBytes () .length); byteBuf.writeBytes (msg.getBytes ()); ctx.writeAndFlush (byteBuf); waitForResult (); return result;} @ Override public void setResult (String result) {synchronized (this) {this.result = JSONObject.parseObject (result, returnType) NotifyAll ();} private void waitForResult () {synchronized (this) {try {wait ();} catch (InterruptedException e) {e.printStackTrace ();}
We can see that after calling the invoke method of Invoker, it runs to waitForResult (), where the request has been sent over the network, but it will get stuck.
This is because the result of our network request is not returned synchronously, it is possible that the client initiates many requests at the same time, so it is impossible for him to block and wait synchronously here.
Accept response
So for the service consumer, the request is sent but stuck, and when the server has finished processing, the message will be returned to the client.
The returned entry is in the onChannelRead of NettyClient:
Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception {try {String message = (String) msg; if (messageCallback! = null) {messageCallback.onMessage (message);}} finally {ReferenceCountUtil.release (msg);}}
Here is called back through callback. Do you remember that when we initialize NettyClient, we set up a callback?
/ * response queue * / private ConcurrentLinkedQueueresponses = new ConcurrentLinkedQueue (); client.setMessageCallback (message-> {/ / the message returned by the receiving server here, RpcResponse response = JSONObject.parseObject (message, RpcResponse.class); responses.offer (response); synchronized (ApplicationContext.this) {ApplicationContext.this.notifyAll ();}})
After the message is accepted here, it is parsed into a RpcResponse object and then pressed into the responses queue, so that we put all the request responses into the queue.
But then, how should we return the response result to the place of the call?
We can do this: start one or more background threads, then take the response from the queue, then find the corresponding Invoker from the inProcessInvoker we saved earlier, and return the result.
Public ApplicationContext (....) {/ /. / / step 5: start the processor initProcessor () to process the response;} private void initProcessor () {/ / in fact, how many processor int num = 3 can be started through the configuration file read here; processors = new ResponseProcessor [num]; for (int I = 0; I < 3; iTunes +) {processors [I] = createProcessor (I) }} / * Thread processing response * / private class ResponseProcessor extends Thread {@ Override public void run () {System.out.println ("start response processing thread:" + getName ()); while (true) {/ / multiple threads get the response here, only one successful RpcResponse response = responses.poll () If (response = = null) {try {synchronized (ApplicationContext.this) {/ / if there is no response, hibernate ApplicationContext.this.wait () }} catch (InterruptedException e) {e.printStackTrace ();}} else {System.out.println ("receive a response:" + response); String interfaceMethodIdentify = response.getInterfaceMethodIdentify (); String requestId = response.getRequestId () String key = interfaceMethodIdentify + "#" + requestId; Invoker invoker = inProgressInvoker.remove (key); invoker.setResult (response.getResult ());}
If the data is not available from the queue, the wait () method is called to wait. It is important to note that when we get the response in callbak, we will call notifyAll () to wake up the thread here:
Responses.offer (response); synchronized (ApplicationContext.this) {ApplicationContext.this.notifyAll ();}
After waking up here, there will be multiple threads competing for that response, because the queue is thread-safe, so multiple threads can get the response results.
After getting the result, construct a unique request ID through identify+requestId, obtain the corresponding invoker from inProgressInvoker, and then set the result into it through setResult:
String key = interfaceMethodIdentify + "#" + requestId; Invoker invoker = inProgressInvoker.remove (key); invoker.setResult (response.getResult ()); @ Override public void setResult (String result) {synchronized (this) {this.result = JSONObject.parseObject (result, returnType); notifyAll ();}}
Once set up here, the result will be deserialized with json to the desired result of the user, and then its notifyAll method will be called to wake up the thread whose invoke method is blocked:
@ SuppressWarnings ("unckecked") @ Override public T invoke (Object [] args) {JSONObject jsonObject = new JSONObject (); jsonObject.put ("interfaces", identify); JSONObject param = new JSONObject (); if (args! = null) {for (Object obj: args) {param.put (obj.getClass (). GetName (), obj) }} jsonObject.put ("parameter", param); jsonObject.put ("requestId", requestId); System.out.println ("sent to the server:" + jsonObject.toJSONString ()); String msg = jsonObject.toJSONString () + NettyServer.DELIMITER; ByteBuf byteBuf = Unpooled.buffer (msg.getBytes () .length); byteBuf.writeBytes (msg.getBytes ()) Ctx.writeAndFlush (byteBuf); / / Wake up here waitForResult (); return result;}
The results can then be returned, and the returned results will be returned to the user.
Overall test
At this point, our producer and consumer code has been written, let's test it as a whole. The producer's code is the same as before:
Public class TestProducer {public static void main (String [] args) throws Exception {String connectionString = "zookeeper://localhost1:2181,localhost2:2182,localhost3:2181"; HelloService service = new HelloServiceImpl (); ServiceConfig config = new ServiceConfig (HelloService.class, service); ListserviceConfigList = new ArrayList (); serviceConfigList.add (config); ApplicationContext ctx = new ApplicationContext (connectionString, serviceConfigList, null, 50071);}}
Consumer test code:
Public class TestConsumer {public static void main (String [] args) throws Exception {String connectionString = "zookeeper://localhost1:2181,localhost2:2182,localhost3:2181"; ReferenceConfigconfig = new ReferenceConfig (HelloService.class); ApplicationContext ctx = new ApplicationContext (connectionString, null, Collections.singletonList (config), 50070); HelloService helloService = ctx.getService (HelloService.class) System.out.println ("sayHello (TestBean) result is:" + helloService.sayHello (new TestBean ("Zhang San", 20));}}
Then start the producer, then start the consumer. The log obtained by the producer is as follows:
Zookeeper Client initialization completed. Register to the registry, the path is: [/ myRPC/interface=com.study.rpc.test.producer.HelloService& method=sayHello meter=com.study.rpc.test.producer.TestBean] Information: RegistryInfo {hostname='localhost', ip='192.168.16.7', port=50071} launch NettyService Port is: 50071 start response processing thread: Response-processor-0 start response processing thread: Response-processor-2 start response processing thread: Response-processor-1 received message: {"interfaces": "interface=com.study.rpc.test.producer.HelloService& method=sayHello meter=com.study.rpc.test.producer.TestBean", "requestId": "1", "parameter": {"com.study.rpc.test.producer.TestBean": {"age": 20 "name": "Zhang San"}} respond to the client: {"interfaceMethodIdentify": "interface=com.study.rpc.test.producer.HelloService& method=sayHello meter=com.study.rpc.test.producer.TestBean", "requestId": "1", "result": "\" awesome. I received a message: TestBean {name=' Zhang San', age=20}\ ""}.
Consumers get the following logs:
Zookeeper Client initialization completed. Start to establish a connection: 192.168.16.7, 50071 wait for the connection to succeed. Start the response processing thread: Response-processor-1 starts the response processing thread: Response-processor-0 starts the response processing thread: Response-processor-2 connection succeeded: ChannelHandlerContext (NettyClient$NettyClientHandler#0, [id: 0xb7a59701, Lpura ChannelHandlerContext: 192.168.16.7 ChannelHandlerContext: 192.168.16.7)) send the JSON to the server as: {"interfaces": "interface=com.study.rpc.test.producer.HelloService& method=sayHello meter=com.study.rpc.test.producer.TestBean" "requestId": "1", "parameter": {"com.study.rpc.test.producer.TestBean": {"age": 20, "name": "Zhang San"} received a response: RpcResponse {result=' "awesome, I received a message: TestBean {name=' Zhang San', age=20}", interfaceMethodIdentify='interface=com.study.rpc.test.producer.HelloService& method=sayHello meter=com.study.rpc.test.producer.TestBean', requestId='1'} sayHello (TestBean) result: impressive I received the message: TestBean {name=' Zhang San', age=20} Thank you for your reading The above is the content of "how to write a RPC framework". After the study of this article, I believe you have a deeper understanding of how to write a RPC framework, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.