Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to understand Tars-Java client

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

This article introduces the relevant knowledge of "how to understand the Tars-Java client". Many people will encounter such a dilemma in the operation of actual cases, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!

A brief introduction to the basic RPC framework

In distributed computing, remote procedure call (Remote Procedure Call, abbreviated RPC) allows a program running on one computer to call another address space computer program, just like calling a local program, there is no need to program the proxy object construction, network protocol and so on involved in this interaction.

The general RPC architecture has at least three structures, namely, registry, service provider and service consumer. As shown in figure 1.1, the registry provides registration services and notification services of registration information changes, service providers run on the server to provide services, and service consumers use the services of service providers.

The service provider (RPC Server), which runs on the server side, provides service interface definitions and service implementation classes, and exposes service interfaces. The registry (Registry), which runs on the server side, is responsible for recording the service objects of the service provider and providing remote service information query service and change notification service. The service consumer (RPC Client), which runs on the client side, invokes the remote service through the remote proxy object.

(basic structure of RPC framework)

1.1 RPC call process

As shown in the following figure, the call flow of RPC is described, where IDL (Interface Description Language) is the interface description language, so that programs running on different platforms and programs written in different languages can communicate with each other.

(Tars Java initialization process)

1) create a CommunicatorConfig configuration item named communicatorConfig with locator, moduleName, connections and other parameters set as needed.

2) through the above CommunicatorConfig configuration item, named config, then call CommunicatorFactory.getInstance (). GetCommunicator (config) to create a Communicator object named communicator.

3) suppose objectName= "MESSAGE.ControlCenter.Dispatcher", the proxy interface that needs to be generated is Dispatcher.class, and the communicator.stringToProxy (objectName, Dispatcher.class) method is called to generate the implementation class of the proxy object.

4) in the stringToProxy () method, first initialize the QueryHelper proxy object, call the getServerNodes () method to get the list of remote service objects, and set the return value to the objectName field of communicatorConfig. For a specific code analysis of the proxy object, see the "2.3 proxy generation" section below.

5) determine whether the LoadBalance parameter is set before calling stringToProxy, and if not, the default DefaultLoadBalance object using the RR rotation algorithm is generated.

6) create a TarsProtocolInvoker protocol call object, in which the process is to obtain the URL list by parsing objectName and simpleObjectName in communicatorConfig, where a URL corresponds to a remote service object, TarsProtocolInvoker initializes the ServantClient object corresponding to each URL, and one URL confirms how many ServantClient objects are generated according to the connections configuration item of the communicatorConfig. Then initialize the TarsInvoker object with parameters such as ServantClients, and set these collections of TarsInvoker objects to the allInvokers member variables of the TarsProtocolInvoker, where each URL corresponds to a TarsInvoker object. The above analysis shows that a remote service node corresponds to a TarsInvoker object, a TarsInvoker object contains connections ServantClient objects, and for TCP protocol, it is a ServantClient object corresponding to a TCP connection.

7) use api, objName, servantProxyConfig,loadBalance,protocolInvoker, this.communicator parameters to generate an ObjectProxy object that implements the JDK proxy interface InvocationHandler.

8) when the ObjectProxy object is generated and initialized at the same time, the loadBalancer.refresh () method will be executed to refresh the remote service node to the load balancer to facilitate routing by subsequent tars remote calls.

9) then register the statistical information reporting device, in which the reporting method uses JDK's ScheduledThreadPoolExecutor for regular rotation training and reporting.

10) the technical method used for registering the service list refresher is basically the same as that of the above statistical information reporter.

2.2 examples of use

The following code is the simplest example. The configuration in CommunicatorConfig uses the default value. After communicator is generated through CommunicatorConfig configuration, it directly specifies the specific service object name, IP and port of the remote service object to generate a remote service proxy object.

The Tars Java code uses the example / / to initialize the basic Tars configuration CommunicatorConfig cfg = new CommunicatorConfig (); / / generate a Communicator object from the CommunicatorConfig configuration above. Communicator communicator = CommunicatorFactory.getInstance () .getCommunicator (cfg); / / specify the service object name, IP, and port of the Tars remote service to generate a remote service proxy object.

/ / first initialize the basic Tars configuration CommunicatorConfig cfg = new CommunicatorConfig (); / / generate a Communicator object from the above CommunicatorConfig configuration. Communicator communicator = CommunicatorFactory.getInstance () .getCommunicator (cfg); / / specify the service object name, IP, and port of the Tars remote service to generate a remote service proxy object. HelloPrx proxy = communicator.stringToProxy (HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp-h 127.0.0.1-p 18601-t 60000"); / / synchronous calls blocking until the remote service object's method returns the result String ret = proxy.hello (3000, "Hello World"); System.out.println (ret) / / Asynchronous call, regardless of the final situation of asynchronous call proxy.async_hello (null, 3000, "Hello World"); / / Asynchronous call, register a receipt processing object that implements the TarsAbstractCallback API. The implementation class handles successful call, call timeout and call exception, respectively. Proxy.async_hello (new HelloPrxCallback () {@ Override public void callback_expired () {/ / timeout event handling} @ Override public void callback_exception (Throwable ex) {/ / exception event handling} @ Override public void callback_hello (String ret) {/ / call successful event handling Main.logger.info ("invoke async method successfully {}") Ret) }, 1000, "Hello World")

In the above example, two common invocation methods are demonstrated, synchronous invocation and asynchronous invocation. In the asynchronous call, if the caller wants to capture the final result of the asynchronous call, he can register an implementation class that implements the TarsAbstractCallback interface to handle the exception, timeout and success events of the tars call.

2.3 Agent generation

The remote proxy object of the client stub module of Tars Java adopts the JDK native Proxy method. As shown in the source code below, ObjectProxy implements the interface method of java.lang.reflect.InvocationHandler, which is a proxy interface that comes with JDK.

Agent implementation

Public final class ObjectProxy implements ServantProxy, InvocationHandler {public Object invoke (Object proxy, Method method, Object [] args) throws Throwable {String methodName = method.getName (); Class [] parameterTypes = method.getParameterTypes (); InvokeContext context = this.protocolInvoker.createContext (proxy, method, args); try {if ("toString" .equals (methodName) & & parameterTypes.length = 0) {return this.toString () } else if / / * omit the code *} else {/ / select a remote call class in the load balancer to encapsulate the application layer protocol, and finally call the TCP transport layer to send. Invoker invoker = this.loadBalancer.select (context); return invoker.invoke (context);}} catch (Throwable var8) {/ / * omit code *}

Of course, the generation of the above remote service proxy class involves auxiliary classes. Tars Java uses ServantProxyFactory to generate the above ObjectProxy and stores the ObjectProxy object to the Map structure, which is convenient for the caller to reuse the existing remote service proxy object directly.

The specific related logic is shown in the source code. ObjectProxyFactory is the auxiliary factory class for generating ObjectProxy. Unlike ServantProxyFactory, it does not cache the generated proxy object itself.

Class ServantProxyFactory {private final ConcurrentHashMap cache = new ConcurrentHashMap (); / / * omit the code * public Object getServantProxy (Class clazz, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker protocolInvoker) {Object proxy = this.cache.get (objName); if (proxy = = null) {this.lock.lock (); / / lock to ensure that only one remote service proxy object is generated. Try {proxy = this.cache.get (objName); if (proxy = = null) {/ / create an object that implements JDK's java.lang.reflect.InvocationHandler interface ObjectProxy objectProxy = this.communicator.getObjectProxyFactory (). GetObjectProxy (clazz, objName, servantProxyConfig, loadBalance, protocolInvoker) / / use the java.lang.reflect.Proxy of JDK to generate the actual proxy object this.cache.putIfAbsent (objName, this.createProxy (clazz, objectProxy); proxy = this.cache.get (objName);}} finally {this.lock.unlock () }} return proxy;} / * * generate proxy object * / private Object createProxy (Class clazz, ObjectProxy objectProxy) {return Proxy.newProxyInstance (Thread.currentThread (). GetContextClassLoader (), new Class [] {clazz, ServantProxy.class}, objectProxy) using the Proxy.newProxyInstance included with JDK;} / * omit the code *}

From the source code above, you can see that createProxy uses JDK's Proxy.newProxyInstance method to generate remote service proxy objects.

2.4 remote service addressing method

As a RPC remote framework, invoking remote services in distributed systems involves the problem of how to route, that is, how to select a service node from multiple remote service nodes to invoke. Of course, Tars Java supports directly connecting specific nodes to invoke remote services, as described in the 2.2 usage example above.

As shown in the figure below, a call at some point in ClientA uses the Service3 node to make a remote service call, while a call to ClientB at some point uses the Service2 node. Tars Java provides a variety of load balancing algorithm implementation classes, including RoundRobinLoadBalance using RR rotation training algorithm, ConsistentHashLoadBalance of consistent hash algorithm and HashLoadBalance of ordinary hash algorithm.

(the client invokes the remote service according to specific routing rules)

As shown in the following source code, if you want to customize the load balancer to define the routing rules for remote calls, you need to implement the com.qq.tars.rpc.common.LoadBalance interface, where the LoadBalance.select () method is responsible for selecting the corresponding Invoker object according to the routing rules, and then making the remote call. For more information, please see the source code proxy implementation. Because the remote service node may change, for example, the remote service node needs to refresh the routing information of the local load balancer, the logic of this information update is implemented in the LoadBalance.refresh () method.

Load balancing interface

Public interface LoadBalance {/ * * selects invoker * / Invoker select (InvokeContext invokeContext) throws NoInvokerException; / * * notifies invoker list updates * / void refresh (Collection invokers) according to load balancing policy;} 2.5 Network Model

Tars Java's IO mode adopts JDK's NIO's Selector mode. Here, TCP protocol is used to describe network processing. As shown in the following source code, Reactor is a thread in which the run () method calls the selector.select () method, which means that unless an event is generated in the network at this time, the thread will remain blocked.

If a network event occurs at this time, the thread will be awakened to execute the subsequent code, one of which is dispatcheEvent (key), that is, the event will be distributed.

According to the corresponding conditions, the acceptor.handleConnectEvent (key) method is called to handle the client connection success event, or the acceptor.handleAcceptEvent (key) method is used to handle the server accepting the connection success event, or the acceptor.handleReadEvent (key) method is called to read data from Socket, or the acceptor.handleWriteEvent (key) method is used to write data to Socket.

Reactor event handling

Public final class Reactor extends Thread {protected volatile Selector selector = null; private Acceptor acceptor = null; / / * omit the code * public void run () {try {while (! Thread.interrupted ()) {/ / block until a network event occurs. Selector.select (); / / * omit the code * while (iter.hasNext ()) {SelectionKey key = iter.next (); iter.remove (); if (! key.isValid ()) continue Try {/ / * omit Code * / distribute Transport layer Protocol TCP or UDP Network event dispatchEvent (key) / / * omit code *} / / * omit code *} / / * omit code * private void dispatchEvent (final SelectionKey key) throws IOException {if (key.isConnectable ()) {acceptor.handleConnectEvent (key) } else if (key.isAcceptable ()) {acceptor.handleAcceptEvent (key);} else if (key.isReadable ()) {acceptor.handleReadEvent (key);} else if (key.isValid () & & key.isWritable ()) {acceptor.handleWriteEvent (key);}

Network processing adopts Reactor event-driven mode. Tars defines a Reactor object corresponding to a Selector object. Two Reactor objects are created by default for each remote service (overall service cluster, not a single node program). The specific number of Reactor objects created by a remote service is determined by modifying the value of the JVM startup parameter com.qq.tars.net.client.selectorPoolSize.

(Tars-Java 's network event handling model)

The thread pool in the figure above that handles the read IO event (Read Event) implementation and write IO event (Write Event) is configured when the Communicator is initialized. The specific logic is shown in the source code, where the thread pool parameter configuration is determined by the corePoolSize, maxPoolSize, keepAliveTime and other parameters of CommunicatorConfig.

Read-write event thread pool initialization

Private void initCommunicator (CommunicatorConfig config) throws CommunicatorConfigException {/ / * omitted code * this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor (config); / / * omitted code *} public class ClientPoolManager {public static ThreadPoolExecutor getClientThreadPoolExecutor (CommunicatorConfig communicatorConfig) {/ / * omitted code * clientThreadPoolMap.put (communicatorConfig, createThreadPool (communicatorConfig)) / / * omit the code * return clientPoolExecutor;} private static ThreadPoolExecutor createThreadPool (CommunicatorConfig communicatorConfig) {int corePoolSize = communicatorConfig.getCorePoolSize (); int maxPoolSize = communicatorConfig.getMaxPoolSize (); int keepAliveTime = communicatorConfig.getKeepAliveTime (); int queueSize = communicatorConfig.getQueueSize (); TaskQueue taskqueue = new TaskQueue (queueSize); String namePrefix = "tars-client-executor-" TaskThreadPoolExecutor executor = new TaskThreadPoolExecutor (corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory (namePrefix)); taskqueue.setParent (executor); return executor;}} 2.6 remote call interaction model

If you call the method of the proxy class, you will enter the invoke method in the ObjectProxy that implements the InvocationHandler interface.

The following figure describes the process of a remote service invocation. Here we focus on a few points, one is how to write data to the network IO. The second is how Tars Java makes synchronous or asynchronous calls, and what technology is used at the bottom.

(the underlying code writes the IO process)

The specific Reactor logic is shown in the network model 2.5 above. If the Reactor check condition finds that IO can be written, that is, key.isWritable () is true, then the ByteBuffer object will be fetched from the TCPSession.queue in a loop, and SocketChannel.write (byteBuffer) will be called to perform the actual write network Socket operation. For the code logic, please see the doWrite () method in the source code.

Read-write event thread pool initialization

Public class TCPSession extends Session {public void write (Request request) throws IOException {try {IoBuffer buffer = selectorManager.getProtocolFactory () .getEncoder () .encodeRequest (request, this); write (buffer) / / * omit code *} protected void write (IoBuffer buffer) throws IOException {/ / * omit code * if (! this.queue.offer (buffer.buf () {throw new IOException ("The session queue is full. [queue size: "> 2.6.2 low-level technical implementation of synchronous and asynchronous calls

For synchronous method calls, as shown in figure (remote call flow) and source code (synchronous calls of ServantClient), ServantClient calls the underlying network write operation and creates a Ticket object in the invokeWithSync method. Ticket, as its name implies, means a ticket, which uniquely identifies this network call.

Synchronous call of ServantClient

Public class ServantClient {public T invokeWithSync (ServantRequest request) throws IOException {/ / * omit the code * ticket = TicketManager.createTicket (request, session, this.syncTimeout); Session current = session; current.write (request) If (! ticket.await (this.syncTimeout, TimeUnit.MILLISECONDS)) {/ / * omit code * response = ticket.response (); / / * omit code * return response; / / * omit code * return response;}}

As shown in the code, the session.write () operation is followed by the execution of the ticket.await () method, which waits until the remote service returns the result to the client, and after ticket.await () is awakened, the subsequent operation is performed, and the invokeWithSync method returns the response object. Among them, the wait and wake function of Ticket is realized by java.util.concurrent.CountDownLatch.

For asynchronous method calls, the ServantClient.invokeWithAsync method will be executed, a Ticket will also be created, and the Session.write () operation will be performed, although ticket.await () will not be called, but when the Reactor receives a remote reply, it will first parse the Tars protocol header to get the Response object, and then put the Response object into the IO read and write thread pool shown in the figure (Tars-Java 's network event handling model) for further processing. As shown in the following source code (asynchronous callback event handling), the WorkThread.run () method is finally called to execute ticket.notifyResponse (resp) in the run () method, which executes a method similar to the successful callback of the call to the TarsAbstractCallback interface in the above code.

Asynchronous callback event handling

Public final class WorkThread implements Runnable {public void run () {try {/ / * omit code * Ticket ticket = TicketManager.getTicket (resp.getTicketNumber ()); / / * omit code * ticket.notifyResponse (resp); ticket.countDown () TicketManager.removeTicket (ticket.getTicketNumber ());} / / * omit code *}}

As shown in the following source code, TicketManager will have a scheduled task rotation to check whether all calls are timed out. If (currentTime-t.startTime) > t.timeout condition holds, t.expired () will be called to inform the callback object that this call has timed out.

Call timeout event handling

Public class TicketManager {/ / * omit code * static {executor.scheduleAtFixedRate (new Runnable () {long currentTime =-1; public void run () {Collection)

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report