2025-03-30 Update From: SLTechnology News&Howtos (shulou)
This article introduces the TARS C++ client. By walking through the source code with concrete cases, it shows how the client performs RPC calls; read it carefully and you should come away with a working understanding.
What is TARS?
TARS is a microservice development framework that has been used inside Tencent for ten years; it currently supports C++, Java, PHP, Node.js and Go. The open-source project provides users with a complete PaaS microservice platform covering development, operations and testing, helping a product or service to be developed, deployed, tested and launched quickly. The framework is used in Tencent's core businesses, and the number of service nodes deployed and running on it reaches hundreds of thousands. The TARS communication model consists of a client and a server, which communicate mainly via RPC. This series is divided into two parts that parse the source code of the RPC call path. This article is the first part: taking the C++ implementation as the vehicle, it walks you through the TARS client.
A first look at the client
The most important class on the TARS client is Communicator. A client should declare only one Communicator instance, and users can obtain the thread-safe Communicator singleton through CommunicatorPtr& Application::getCommunicator(). The Communicator class aggregates two other important classes: CommunicatorEpoll, which is responsible for setting up network threads and generating ObjectProxy instances through ObjectProxyFactory; and ServantProxyFactory, which generates the RPC service handles, namely ServantProxy, through which users invoke RPC services. The roles of these classes are briefly described below.
Communicator
A Communicator instance is a client: it is responsible for establishing connections with servers and generating RPC service handles. You can obtain the Communicator instance through CommunicatorPtr& Application::getCommunicator(); normally, users should not declare and define a new Communicator instance themselves.
ServantProxy and ServantProxyFactory
ServantProxy is a service proxy, generated by the factory class ServantProxyFactory. Users usually call ServantProxyFactory's ServantPrx::element_type* getServantProxy() interface indirectly, through Communicator's template<class T> void stringToProxy() interface, to obtain the service proxy. Through the service proxy ServantProxy, users can make RPC calls. A ServantProxy contains multiple service entities ObjectProxy (see point 4 below), which enables load balancing within the same service proxy.
CommunicatorEpoll
The CommunicatorEpoll class represents the client's network module. It contains a TC_Epoller for I/O multiplexing and can handle requests from multiple caller threads at the same time. CommunicatorEpoll contains the service-entity factory class ObjectProxyFactory (see point 4 below), which means the same network thread can produce entities for different services and thus make different RPC service calls. CommunicatorEpoll also aggregates the asynchronous-callback processing threads AsyncProcThread; after an asynchronous response packet is received, it is handed over to one of those threads.
ObjectProxy and ObjectProxyFactory
The ObjectProxy class is a service entity, distinct from the ServantProxy class: the former represents service entity A on one network thread, while the latter is the overall proxy for service entity A across all network threads. A more detailed description can be found below. ObjectProxy is generated through ObjectProxyFactory, and an ObjectProxyFactory instance is a member variable of CommunicatorEpoll, which means that a network thread CommunicatorEpoll can generate a variety of service entities ObjectProxy and initiate different RPC services. ObjectProxy manages the connections to the server through AdapterProxy. Having introduced all the classes, let's first sort out the relationships between them through the class diagram, which will appear again later in the article.
According to the user configuration, Communicator owns n network threads, i.e. n CommunicatorEpoll instances. Each CommunicatorEpoll holds an ObjectProxyFactory, and each ObjectProxyFactory can generate a series of different service entity objects ObjectProxy. So if a Communicator has two CommunicatorEpoll and two different service entities, foo and bar, then, as shown in figure (1-2) below, each CommunicatorEpoll can create both kinds of ObjectProxy through its ObjectProxyFactory. This is the first layer of load balancing on the TARS client: every network thread can share the RPC requests for all services. The flip side is that blocking in one service may affect other services, because each network thread is shared by multiple service entities ObjectProxy.
Another important function of the ServantProxyFactory class under Communicator is to generate a ServantProxy handle from the information about the actual server (such as its socket address) and the client-side information held in Communicator (such as the number of network threads); RPC services are then called through that handle. For example, as shown in figure (1-3) below, when a Communicator instance constructs a fooServantProxy handle through the getServantProxy() interface of its ServantProxyFactory member, it collects the fooObjectProxy instances (fooObjectProxy-1 and fooObjectProxy-2) from all the CommunicatorEpoll instances under the Communicator (CommunicatorEpoll-1 and CommunicatorEpoll-2) and uses them as parameters for constructing fooServantProxy. Through ServantProxyFactory, Communicator can obtain the ServantProxy for foo and for bar, and each ServantProxy aggregates its corresponding ObjectProxy instances:
In addition, each ObjectProxy has an EndpointManager. For example, fooObjectProxy's EndpointManager manages all the fooAdapterProxy instances under that fooObjectProxy, and each AdapterProxy connects to one server-side physical machine socket that provides the foo service. Through EndpointManager, the AdapterProxy connection can also be chosen with different load-balancing strategies. If the foo service has two physical machines and the bar service has one, the relationship between ObjectProxy, EndpointManager and AdapterProxy is shown in figure (1-4) below. As mentioned above, different network threads CommunicatorEpoll can initiate the same RPC request: for the same RPC service, choosing among different ObjectProxy instances (equivalently, choosing among network threads CommunicatorEpoll) is the first layer of load balancing, while within the chosen ObjectProxy, choosing among socket connections AdapterProxy through EndpointManager (when the ObjectProxy has more than one AdapterProxy, like fooObjectProxy in figure (1-4)) is the second layer of load balancing.
These relationships must be established when the client initializes, so the corresponding class diagram is shown in figure (1-5). Through the class diagram you can see the various relationships and the functions needed to initialize them.
Initialization code
Now let's trace the code to see how each class is initialized and how the architecture described above is established during client initialization. Before the walkthrough, you may want to look at the function-call flow chart first (if it is hard to read, save the picture and enlarge it with an image viewer). It is strongly recommended to read the TARS source code alongside the code analysis in this article; the same goes for all the code flow charts later on. Next, following the function-call flow chart, we analyze step by step how the client proxy is initialized:
1. Execute stringToProxy
In the client program, the following code is executed to initialize the entire client agent:
Communicator comm;
HelloPrx prx;
comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 1.1.1.1 -p 20001", prx);
First declare a Communicator variable comm (declaring one directly like this is not recommended in real code) and a proxy variable prx of the ServantProxy type; since the service here is Hello, we declare a HelloPrx prx. Note that a client can have only one Communicator. To get the RPC service handle, we call Communicator::stringToProxy() and pass in the server information and the prx variable. When the function returns, prx is the handle of the RPC service. Inside Communicator::stringToProxy(), Communicator::getServantProxy() is called to obtain the service proxy ServantProxy according to objectName and setName:
/**
 * Generate a proxy
 * @param T
 * @param objectName
 * @param setName: the setid of a set call
 * @param proxy
 */
template<class T>
void stringToProxy(const string& objectName, T& proxy, const string& setName = "")
{
    ServantProxy* pServantProxy = getServantProxy(objectName, setName);
    proxy = (typename T::element_type*)(pServantProxy);
}

2. Execute the initialization function of Communicator
When you enter Communicator::getServantProxy(), Communicator::initialize() is executed first to initialize the Communicator. Note that Communicator::initialize() is executed only once; subsequent calls to Communicator::getServantProxy() will not run it again:
void Communicator::initialize()
{
    TC_LockT<TC_ThreadRecMutex> lock(*this);

    if (_initialized)   // already initialized, return directly
        return;
    ......
}
Inside Communicator::initialize(), a ServantProxyFactory and n CommunicatorEpoll are new-ed, where n is the client's number of network threads (closely tied to the Communicator description above), with a minimum of 1 and a maximum of MAX_CLIENT_THREAD_NUM:
void Communicator::initialize()
{
    ......
    _servantProxyFactory = new ServantProxyFactory(this);
    ......
    for (size_t i = 0; i < _clientThreadNum; ++i)
    {
        _communicatorEpoll[i] = new CommunicatorEpoll(this, i);
        _communicatorEpoll[i]->start();   // start the network thread
    }
    ......
}
In the constructor of CommunicatorEpoll, the ObjectProxyFactory is created, which is the prerequisite for building the relationships in figure (1-2). You can also see that, after reading the relevant configuration, several asynchronous-callback processing threads are created and started. Once construction is complete, CommunicatorEpoll::start() is called to start the network thread. At this point, Communicator::initialize() has finished. The following figure reviews the above process:
3. Try to get ServantProxy
When the code returns to Communicator::getServantProxy(), it executes ServantProxyFactory::getServantProxy() and returns the corresponding service proxy:
ServantProxy* Communicator::getServantProxy(const string& objectName, const string& setName)
{
    ......
    return _servantProxyFactory->getServantProxy(objectName, setName);
}
Entering ServantProxyFactory::getServantProxy(), the code first takes a lock and looks up the target in the map _servantProxy, returning directly if the lookup succeeds. If the lookup fails, TARS needs to construct the corresponding ServantProxy, and as figure (1-3) shows, a ServantProxy needs the corresponding ObjectProxy instances as constructor parameters. Thus, in ServantProxyFactory::getServantProxy(), the following code obtains the array of ObjectProxy pointers:
ObjectProxy** ppObjectProxy = new ObjectProxy*[_comm->getClientThreadNum()];
assert(ppObjectProxy != NULL);

for (size_t i = 0; i < _comm->getClientThreadNum(); ++i)
{
    ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName);
}

4. Get ObjectProxy
When the code reaches ObjectProxyFactory::getObjectProxy(), it likewise first takes a lock, then checks the map _objectProxys for an existing target ObjectProxy, returning directly on a hit. On a miss, a new ObjectProxy needs to be created. As the class diagram shows, an ObjectProxy needs a CommunicatorEpoll object for initialization, so that it is associated with, and managed by, its own CommunicatorEpoll; a CommunicatorEpoll can then obtain its own ObjectProxy through the getObjectProxy() interface. The detailed process can be seen in the following figure:
5. Establish the relationship between ObjectProxy and AdapterProxy
The process of creating a new ObjectProxy is also noteworthy. In ObjectProxy::ObjectProxy (), the key code is:
_endpointManger.reset(new EndpointManager(this, _communicatorEpoll->getCommunicator(), sObjectProxyName, pCommunicatorEpoll->isFirstNetThread(), setName));
Each ObjectProxy has its own EndpointManager, responsible for managing all the socket connections AdapterProxy to the server, where each AdapterProxy connects to one server physical machine socket that provides the corresponding service. Through EndpointManager, the AdapterProxy connection can also be chosen with different load-balancing strategies. ObjectProxy::ObjectProxy() is the step marked 1 in figure (1-6) and figure (1-8), and the specific code flow is shown in figure (1-9): ObjectProxy creates an EndpointManager object, and during EndpointManager's initialization, depending on the information the client provided, the TCP/UDP connections AdapterProxy to the server physical machines are either created directly, or created after the list of server physical machine sockets is fetched from the registry.
After the program flow in figure (1-9) completes, the one-ObjectProxy-to-many-AdapterProxy relationship shown in figure (2-3) is established. After creating a new ObjectProxy, its ObjectProxy::initialize() function is called to initialize the object, and the ObjectProxy is inserted into the ObjectProxyFactory member variables _objectProxys and _vObjectProxys, so that it can be returned directly next time.
6. Continue to complete the ServantProxy creation
Unwinding the layered function calls, the code returns once more to ServantProxyFactory::getServantProxy(). At this point, ServantProxyFactory has obtained the ObjectProxy array ObjectProxy** ppObjectProxy, and can call:
ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());
to construct the ServantProxy. Once construction completes, the relationships look like figure (2-1). In the ServantProxy constructor you can see that ServantProxy news an EndpointManagerThread variable, a class for routing requests to external services; it is one of the mechanisms TARS provides in its calling logic for problems such as cross-region calls. You can also see:
for (size_t i = 0; i < _objectProxyNum; ++i)
{
    (*(_objectProxy + i))->setServantProxy(this);
}
which establishes the association between ServantProxy and ObjectProxy. What remains is reading the configuration file and obtaining the corresponding information. After the ServantProxy variable is constructed, ServantProxyFactory::getServantProxy() fetches some timeout parameters, assigns them to the ServantProxy variable, and puts it into the map _servantProxy so that it can be looked up directly next time. ServantProxyFactory::getServantProxy() then returns the newly constructed ServantProxy pointer to its caller, Communicator::getServantProxy(). In Communicator::getServantProxy():
ServantProxy* Communicator::getServantProxy(const string& objectName, const string& setName)
{
    ......
    return _servantProxyFactory->getServantProxy(objectName, setName);
}
The return value is returned directly to Communicator::stringToProxy () that called Communicator::getServantProxy (). You can see:
template<class T>
void stringToProxy(const string& objectName, T& proxy, const string& setName = "")
{
    ServantProxy* pServantProxy = getServantProxy(objectName, setName);
    proxy = (typename T::element_type*)(pServantProxy);
}
Communicator::stringToProxy() casts the return value to HelloPrx, the same type as prx in the client code. Since the function parameter proxy is a reference to prx, the effect is that the handle prx has been successfully initialized, and the user can use prx to make RPC calls.
Synchronous call
Once we have a ServantProxy handle, we can make RPC calls. TARS provides four RPC invocation styles: synchronous, asynchronous, promise and coroutine calls. The simplest and most common is the synchronous call; next we briefly analyze how TARS performs it.
Suppose there is a MyDemo.StringServer.StringServantObj service that provides an RPC interface append, which takes two string parameters and returns their concatenation. Suppose also that there are two servers, with socket addresses 192.168.106.129 and 192.168.106.130, and that the client's number of network threads is set to 3. Then the following code is executed:
Communicator _comm;
StringServantPrx _proxy;
_comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.129 -p 34132", _proxy);
_comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.130 -p 34132", _proxy);
From the above analysis and introduction of client initialization, we can get the diagram shown in the following figure:
After getting the StringServantPrx _proxy, call directly:
string str1("abc-"), str2("defg"), rStr;
int retCode = _proxy->append(str1, str2, rStr);
After a successful RPC synchronous call, the result returned is rStr = "abc-defg".
Again, let's first look at the class diagram associated with synchronous calls, as shown in the following figure:
StringServantProxy inherits from ServantProxy. StringServantProxy provides the interface Int32 append () for RPC synchronous calls. When a user initiates a synchronous call _ proxy- > append (str1, str2, rStr), the function call process is shown in the following figure.
In the function StringServantProxy::append (), the program will first construct the parameters needed by ServantProxy::tars_invoke (), such as the request packet type, RPC method name, method parameters, etc. It is worth noting that there is a variable of type ResponsePacket in the passed parameters, and the final return result will be placed on this variable in the synchronous call. Then the ServantProxy::tars_invoke () method is called directly:
tars_invoke(tars::TARSNORMAL, "append", _os.getByteBuffer(), context, _mStatus, rep);
In the ServantProxy::tars_invoke () method, first create a ReqMessage variable msg, initialize the msg variable, and assign values to the variable, such as the Tars version number, the packet type, the service name, the RPC method name, the context container of the Tars, the timeout of the synchronous call (in milliseconds), and so on. Finally, call ServantProxy::invoke () to make the remote method call.
ServantProxy::invoke() is a required step of every RPC call, synchronous or asynchronous. In ServantProxy::invoke(), the passed-in ReqMessage msg continues to be filled in. In addition, the thread-private data ServantProxyThreadData of the caller thread is fetched to guide the RPC call. Each caller thread on the client has its own thread-private data maintaining the invocation context, such as hash properties and message-coloring information. Most important is the bridge between each caller thread and each client network thread CommunicatorEpoll: the communication queue array ReqInfoQueue[]. Each ReqInfoQueue element in the array is responsible for interacting with one network thread, as shown in figure (1-13), where the orange shading represents the array ReqInfoQueue[] and the cylinders in the shading represent the array elements ReqInfoQueue. If the client creates two threads (hereafter caller threads) that initiate RPC requests to the StringServant service, and the number of client network threads is set to 2, then each caller thread has its own private request queue array ReqInfoQueue[]. An element of that array is the communication bridge between its caller thread and one of the two network threads; one network thread corresponds to one element, and the array is indexed by network thread ID. The whole relationship is rather like a producer-consumer model: the producer caller thread pushes a request packet into the Nth element ReqInfoQueue[N] of its own thread-private ReqInfoQueue[], and the consumer, the client's Nth network thread, pops the packet from that queue.
Reading the code, you may notice several constants, such as MAX_CLIENT_THREAD_NUM = 64, the maximum number of network threads, which is also the maximum size of a single request queue array ReqInfoQueue[] in figure (1-13); and MAX_CLIENT_NOTIFYEVENT_NUM = 2048, which in figure (1-13) can be regarded as the maximum number of caller threads, i.e. the maximum number of request queue arrays ReqInfoQueue[] (each caller thread has its own thread-private ReqInfoQueue[]).
Then, the first layer of load balancing is performed according to the caller thread's thread-private data: choosing the ObjectProxy (that is, choosing the network thread CommunicatorEpoll) and the corresponding ReqInfoQueue:
ObjectProxy* pObjProxy = NULL;
ReqInfoQueue* pReqQ = NULL;

// select the network thread
selectNetThreadInfo(pSptd, pObjProxy, pReqQ);
In ServantProxy::selectNetThreadInfo(), the ObjectProxy and ReqInfoQueue are selected in a round-robin fashion.
After exiting ServantProxy::selectNetThreadInfo(), we have the ObjectProxy* pObjProxy and its corresponding ReqInfoQueue* pReqQ. Later, the RPC request will be sent through pObjProxy, and the request information will be staged in the ReqInfoQueue.
Since it is a synchronous call, you need to create a new condition variable to listen to the completion of RPC. You can see:
// for a synchronous call, new a ReqMonitor
assert(msg->pMonitor == NULL);

if (msg->eType == ReqMessage::SYNC_CALL)
{
    msg->bMonitorFin = false;

    if (pSptd->_sched)   // coroutine mode
    {
        msg->bCoroFlag = true;
        msg->sched   = pSptd->_sched;
        msg->iCoroId = pSptd->_sched->getCoroutineId();
    }
    else
    {
        msg->pMonitor = new ReqMonitor;
    }
}
After the condition variable is created, the request packet msg is push_back()-ed into the ReqInfoQueue, and the CommunicatorEpoll that pObjProxy belongs to is notified to send the data:
if (!pReqQ->push_back(msg, bEmpty))
{
    TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);
    delete msg;
    pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
    throw TarsClientQueueException("client queue full");
}

pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
Arriving at CommunicatorEpoll::notify(), the request event is added to the request-event notification array NotifyInfo _notify[], and CommunicatorEpoll is notified to send the request packet. Note that this function merely notifies the network thread that there is data ready to send: it triggers an EPOLLIN event through TC_Epoller::mod() or TC_Epoller::add(), which wakes up the network thread blocked in TC_Epoller::wait() (i.e. blocked in CommunicatorEpoll::run()), and the union epoll_data variable of the woken epoll_event is set to &_notify[iSeq].stFDInfo:
void CommunicatorEpoll::notify(size_t iSeq, ReqInfoQueue* msgQueue)
{
    assert(iSeq < MAX_CLIENT_NOTIFYEVENT_NUM);

    if (_notify[iSeq].bValid)
    {
        _ep.mod(_notify[iSeq].notify.getfd(), (long long)&_notify[iSeq].stFDInfo, EPOLLIN);
        assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);
    }
    else
    {
        _notify[iSeq].stFDInfo.iType = FDInfo::ET_C_NOTIFY;
        _notify[iSeq].stFDInfo.p     = (void*)msgQueue;
        _notify[iSeq].stFDInfo.fd    = _notify[iSeq].eventFd;
        _notify[iSeq].stFDInfo.iSeq  = iSeq;
        _notify[iSeq].notify.createSocket();
        _notify[iSeq].bValid = true;

        _ep.add(_notify[iSeq].notify.getfd(), (long long)&_notify[iSeq].stFDInfo, EPOLLIN);
    }
}

Through this operation the network thread can be woken up, and once awake it obtains &_notify[iSeq].stFDInfo from the epoll_event variable. The subsequent sending of the request and receiving of the response are described in detail later. The code then returns to ServantProxy::invoke() and blocks at:

if (!msg->bMonitorFin)
{
    TC_ThreadLock::Lock lock(*(msg->pMonitor));

    // wait until the network thread notifies
    if (!msg->bMonitorFin)
    {
        msg->pMonitor->wait();
    }
}
Wait for the network thread to receive the data and wake it up. After receiving the response, check whether the response has been obtained successfully. If so, you can simply exit the function. The response information is in the parameter msg:
if (msg->eStatus == ReqMessage::REQ_RSP && msg->response.iRet == TARSSERVERSUCCESS)
{
    snprintf(pSptd->_szHost, sizeof(pSptd->_szHost), "%s", msg->adapter->endpoint().desc().c_str());
    // success
    return;
}
If the reception fails, an exception is thrown and the msg is deleted:
TarsException::throwException(ret, os.str());
If the reception is successful, after exiting ServantProxy::invoke (), go back to ServantProxy::tars_invoke (), get the response information of type ResponsePacket, and delete the msg packet:
rsp = msg->response;
delete msg;
msg = NULL;
The code goes back to StringServantProxy::append (), and after a synchronous call, you can directly get the RPC return value and return it to the client.
The network thread sends the request
As mentioned above, when CommunicatorEpoll::notify () is called in ServantProxy::invoke () to notify the network thread to send the request, the specific execution flow of the network thread is shown in the following figure:
Because CommunicatorEpoll inherits from TC_Thread, after CommunicatorEpoll is initialized in point 2 of section 1.2.2 above, its CommunicatorEpoll::start() function is called to start the network thread, which then waits at _ep.wait(iTimeout) in CommunicatorEpoll::run(). As described in the previous section, once a caller thread issues a notification in CommunicatorEpoll::notify(), the network thread calls CommunicatorEpoll::handle() from CommunicatorEpoll::run() to process it:
void CommunicatorEpoll::run()
{
    ......
    try
    {
        int iTimeout = (_waitTimeout < _timeoutCheckInterval) ? _waitTimeout : _timeoutCheckInterval;

        int num = _ep.wait(iTimeout);

        if (_terminate)
        {
            break;
        }

        // handle epoll network events first
        for (int i = 0; i < num; ++i)
        {
            // get the data of the epoll_event variable, i.e. the
            // &_notify[iSeq].stFDInfo mentioned in section 1.3.1
            const epoll_event& ev = _ep.get(i);
            uint64_t data = ev.data.u64;

            if (data == 0)
            {
                continue;   // data is not a pointer; skip this event
            }

            handle((FDInfo*)data, ev.events);
        }
    }
    ......
}

In CommunicatorEpoll::handle(), the previously selected ObjectProxy is obtained through the data member of the epoll_event passed in, and its ObjectProxy::invoke() function is called:

void CommunicatorEpoll::handle(FDInfo* pFDInfo, uint32_t events)
{
    try
    {
        assert(pFDInfo != NULL);

        // the queue has a message notification
        if (FDInfo::ET_C_NOTIFY == pFDInfo->iType)
        {
            ReqInfoQueue* pInfoQueue = (ReqInfoQueue*)pFDInfo->p;
            ReqMessage* msg = NULL;

            try
            {
                while (pInfoQueue->pop_front(msg))
                {
                    ......
                    try
                    {
                        msg->pObjectProxy->invoke(msg);
                    }
                    ......
                }
            }
            ......
        }
        ......
    }
    ......
}
The second layer of load balancing is carried out in ObjectProxy::invoke(). As shown in figure (1-4), each ObjectProxy can select an AdapterProxy through EndpointManager using different load-balancing strategies:
void ObjectProxy::invoke(ReqMessage* msg)
{
    ......
    // select an AdapterProxy of the remote service to invoke
    AdapterProxy* pAdapterProxy = NULL;
    bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy);
    ......
}
In EndpointManager::selectAdapterProxy(), there are a variety of load-balancing ways to select the AdapterProxy, such as getHashProxy(), getWeightedProxy() and getNextValidProxy().
After the AdapterProxy is obtained, i.e. the selected AdapterProxy has been assigned to the reference parameter pAdapterProxy inside EndpointManager::selectAdapterProxy(), execution continues:
void ObjectProxy::invoke(ReqMessage* msg)
{
    ......
    msg->adapter = pAdapterProxy;
    pAdapterProxy->invoke(msg);
}
and pAdapterProxy is called to send the request. In AdapterProxy::invoke(), AdapterProxy calls Transceiver::sendRequest() to send the request. At this point, the network thread's request-sending work for the synchronous call is done, and the network thread returns to CommunicatorEpoll::run() to continue waiting for data to send and receive.
The network thread receives the response
When the network thread CommunicatorEpoll receives response data, just as when sending the request, in CommunicatorEpoll::run() the program fetches the active epoll_event variable and passes its epoll_data_t data to CommunicatorEpoll::handle():
// handle epoll network events first
for (int i = 0; i < num; ++i)
{
    const epoll_event& ev = _ep.get(i);
    uint64_t data = ev.data.u64;

    if (data == 0)
    {
        continue;   // data is not a pointer; skip this event
    }

    handle((FDInfo*)data, ev.events);
}

The subsequent program flow is shown in the figure below. In CommunicatorEpoll::handle(), the Transceiver pointer is obtained from epoll_data::data, and CommunicatorEpoll::handleInputImp() is called:

Transceiver* pTransceiver = (Transceiver*)pFDInfo->p;
......
if (events & EPOLLIN)
{
    try
    {
        handleInputImp(pTransceiver);
    }
    catch (exception& e)
    {
        TLOGERROR("[TARS]CommunicatorEpoll::handle exp:" << e.what() << endl);
    }
}

After the response packet is parsed, the waiting caller is woken up; in coroutine mode this is done via msg->sched->put(msg->iCoroId), and the function then returns.
At this point, the work of receiving the response from the network thread corresponding to the synchronous call is over, and the network thread will return to CommunicatorEpoll::run () and continue to wait for the data to be sent and received. To sum up, the process of synchronous calls by the client is shown in the following figure.
Asynchronous invocation
In TARS, in addition to the most common synchronous calls, you can also make asynchronous calls. There are three kinds: ordinary asynchronous calls, promise asynchronous calls and coroutine asynchronous calls. Here we briefly introduce ordinary asynchronous calls and look at their similarities to, and differences from, the synchronous calls described above.
An asynchronous call does not block the client program: once the call completes (i.e. the request has been sent), the user can continue with other work. After the response is received, TARS executes the user-implemented callback function in an asynchronous-callback processing thread. This uses the "implementing the Template Method pattern via the Non-Virtual Interface idiom" described in Item 35 of Effective C++: the user inherits from an XXXServantPrxCallback base class and implements its virtual functions, and the asynchronous callback thread invokes these virtual functions after receiving the response packet. A concrete asynchronous-call client example is not detailed here; the corresponding sample code can be found in the Examples of TARS.
Initialization
Client initialization has been described in detail in the first chapter. In section "1.2.2 Initialization code - 2. Execute the initialization function of Communicator", it was mentioned that during the initialization of each network thread CommunicatorEpoll, _asyncThreadNum asynchronous threads are created to process response data for asynchronous calls:
CommunicatorEpoll::CommunicatorEpoll(Communicator *pCommunicator, size_t netThreadSeq)
{
    ......
    // number of asynchronous threads
    _asyncThreadNum = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncthread", "3"));
    if (_asyncThreadNum == 0)
    {
        _asyncThreadNum = 3;
    }
    if (_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM)
    {
        _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
    }
    ......
    // asynchronous queue capacity
    size_t iAsyncQueueCap = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncqueuecap", "10000"));
    if (iAsyncQueueCap < 10000)
    {
        iAsyncQueueCap = 10000;
    }
    ......
    // create the asynchronous threads
    for (size_t i = 0; i < _asyncThreadNum; ++i)
    {
        _asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);
        _asyncThread[i]->start();
    }
    ......
}
Before we start talking about asynchronous calls and receiving responses, take a look at the general invocation process and compare it with the synchronous invocation in figure (1-16).
As in the synchronous-invocation example, suppose a MyDemo.StringServer.StringServantObj service provides an RPC interface named append, which takes two string parameters and returns their concatenation. The file generated by tars2cpp defines the callback base class StringServantPrxCallback. Users publicly inherit this base class and implement their own callback methods, for example:
class asyncClientCallback : public StringServantPrxCallback
{
public:
    void callback_append(Int32 ret, const string& rStr)
    {
        cout << "ret: " << ret << ", result: " << rStr << endl;
    }
};

In a synchronous call, msg->pMonitor->notify() wakes up the client's caller thread to receive the response packet; in an asynchronous call, as shown in figure (1-19), the response is handed to an asynchronous callback thread instead. The relationship between CommunicatorEpoll and AsyncProcThread is shown in figure (1-20).
In the function AdapterProxy::finishInvoke(ReqMessage*), the program executes:

    // asynchronous callback: hand the message to the callback processing thread
    _objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
This puts the packet msg (which carries the response information) into an asynchronous callback processing thread. CommunicatorEpoll::pushAsyncThreadQueue() selects the asynchronous callback thread that will process the received response packet by round-robin. The default number of asynchronous processing threads is 3, and the maximum is 1024.
void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage *msg)
{
    // ignore the slight imbalance in queue length across threads
    _asyncThread[_asyncSeq]->push_back(msg);
    _asyncSeq++;
    if (_asyncSeq == _asyncThreadNum)
    {
        _asyncSeq = 0;
    }
}
Once a thread is selected, the msg packet is placed in its response packet queue AsyncProcThread::_msgQueue via AsyncProcThread::push_back(), and the asynchronous callback thread is then notified through AsyncProcThread::notify(). notify() wakes the asynchronous processing thread blocked in AsyncProcThread::timedWait() within AsyncProcThread::run().
In AsyncProcThread::run(), the following code is executed to perform the function callback:
if (_msgQueue->pop_front(msg))
{
    ......
    try
    {
        ReqMessagePtr msgPtr = msg;
        msg->callback->onDispatch(msgPtr);
    }
    catch (exception& e)
    {
        TLOGERROR("[TARS][AsyncProcThread exception]:" << e.what() << endl);
    }
}