Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use RabbitMQ to implement RPC

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

This article is about how to use RabbitMQ to implement RPC. The editor thinks it is very practical, so share it with you as a reference and follow the editor to have a look.

Background knowledge

RabbitMQ

RabbitMQ is a message queue (Message Queue) based on AMQP protocol, and Message Queue is a typical producer / consumer model. Producers release news, consumers consume news, producers and consumers are decoupled and do not know the existence of each other.

RPC

Remote Procedure Call: remote procedure call, the process of a remote procedure call is that the client sends a request to the server, the server processes the request and returns the response information, and the client ends after receiving the response information.

How to use RabbitMQ to implement RPC?

Using RabbitMQ to implement RPC, the corresponding roles are the producer as the client and the consumer as the server.

But RPC calls are generally synchronous, and the client and server are tightly coupled. That is, the client connects to the server through the IP/ domain name and port, sends a request to the server and waits for the server to return the response information.

But the producers and consumers of MQ are completely decoupled, so how do you implement RPC with MQ? It is obvious that MQ is used as middleware to achieve a two-way message delivery:

The client and server are both producers and consumers. The client issues the request and consumes the response; the server consumes the request and publishes the response.

Concrete realization

Definition of the MQ section

Queue for requesting information

We need a queue to store the request information, the client issues the request information to the queue, and the server consumes the queue to process the request. The queue does not require complex routing rules, but simply uses RabbitMQ's default direct exchange to route messages.

Queues that respond to information

There should be more than one queue for response information. If there are multiple clients, there is no guarantee that the response information will be consumed by the client that issued the request. Therefore, a response queue should be created for each client, which should be created by the client and can only be used by this client and deleted after use. Here, you can use the exclusive queue (Exclusive Queue) provided by RabbitMQ:

Channel.queueDeclare (queue: ", durable:false, exclusive:true, autoDelete:false, new HashMap ())

And to ensure that the queue name is unique, setting the queue name to empty RabbitMQ when declaring the queue generates a unique queue name.

Setting exclusive to true declares an exclusive queue, which is characterized by being used only by the current connection and deleted after the connection is closed.

A simple demo (using pull mechanism)

We use a simple demo to understand the client-side and server-side processing flow.

Publish request

A small problem before writing code

We declare a unique response queue for each client when we declare the queue, so how does the server know which queue to publish the response to? In fact, the client needs to tell the server which queue to publish the response to. RabbitMQ provides this support. An attribute reply_to in the Properties of the message body is used to mark the name of the callback queue, and the server needs to publish the response to the callback queue specified by reply_to.

Once this problem is solved, we can write the code for the client to publish the request:

/ / define response callback queue String replyQueueName = channel.queueDeclare ("", false, true, false, new HashMap ()). GetQueue (); / / set callback queue to PropertiesAMQP.BasicProperties properties = new AMQP.BasicProperties.Builder () .replyTo (replyQueueName) .build (); String request = "request"; / / publish request channel.basicPublish ("", "rpc_queue", properties, request.getBytes ())

Direct reply-to:

RabbitMQ provides a more convenient mechanism to implement RPC. There is no need for the client to define a callback queue every time. The client sets the replyTo to amq.rabbitmq.reply-to when issuing the request, and specifies the consumption amq.rabbitmq.reply-to when consuming the response. RabbitMQ creates an internal queue for the client.

Consumption request

Next, the server processes the request. After receiving the request, the server publishes the response information to the callback queue specified by reply_to:

/ / definition of server Consumer public class RpcServer extends DefaultConsumer {public RpcServer (Channel channel) {super (channel);} @ Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {String msg = new String (body); String response = (msg + "Received"); / / get callback queue name String replyTo = properties.getReplyTo () / / publish a response message to the callback queue this.getChannel (). BasicPublish ("", replyTo, new AMQP.BasicProperties (), response.getBytes ());}}. / / start server Consumerchannel.basicConsume ("rpc_queue", true, new RpcServer (channel))

Receive response

How does the client receive the response from the server? There are two ways: 1. Polled messages in the pull callback queue, 2. Consumes messages in the callback queue asynchronously. We simply implement the first scheme here.

GetResponse getResponse = null;while (getResponse = = null) {getResponse = channel.basicGet (replyQueueName, true);} String response = new String (getResponse.getBody ())

A simple RabbitMQ-based RPC model has been implemented, but this demo is not practical because the client polls and waits for a response message synchronously every time the request is sent, and can only process one request at a time. The efficiency of RabbitMQ's pull mode is also relatively low.

There is still a lot of work to be done to implement a fully available RPC pattern, and the key points to deal with are also complicated. There is a saying that do not repeat wheels. Spring has implemented a library of fully available RPC schemas. Let's take a look at it. By the way, I would like to recommend a Java architecture exchange learning group: 698581634, you can get Java architect information: there are Spring,MyBatis,Netty source code analysis, high concurrency, high performance, distributed, micro-service architecture principles, JVM performance optimization has become an essential knowledge system for architects, there must be information you need in the group, we hurry to join the group.

Implementation in Spring Rabbit

Corresponding to the above demo's pull pattern can only handle one request at a time: how to receive responses asynchronously and process multiple requests? The key point is that we need to record the request and response and associate them, which is also supported by RabbitMQ, and another property in Properties, correlation_id, is used to identify the unique id of a message.

Refer to the implementation of the convertSendAndReceive method in spring-rabbit to generate a unique correlation_id for each request:

Private final AtomicInteger messageTagProvider = new AtomicInteger ();... String messageTag = String.valueOf (this.messageTagProvider.incrementAndGet ());... message.getMessageProperties () .setCorrelationId (messageTag)

And use a ConcurrentHashMap to maintain the mapping of correlation_id and response information:

Private final Map replyHolder = new ConcurrentHashMap ();... final PendingReply pendingReply = new PendingReply (); this.replyHolder.put (correlationId, pendingReply)

There is a BlockingQueue in PendingReply that stores the response information. After sending the request information, call the pull method of BlockingQueue and set the timeout to get the response:

Private final BlockingQueue queue = new ArrayBlockingQueue (1)

Public Message get (long timeout, TimeUnit unit) throws InterruptedException {Object reply = this. Queue. Poll (timeout, unit); return reply = = null? Null: processReply (reply

);

}

After the response is obtained, regardless of the result, the PendingReply is removed from the replyHolder to prevent the backlog of timeout response messages in the replyHolder:

Try {reply = exchangeMessages (exchange, routingKey, message, correlationData, channel, pendingReply,messageTag);} finally {this.replyHolder.remove (messageTag);...}

When and how did the response information get into this BlockingQueue? Take a look at where RabbitTemplate receives messages:

Public void onMessage (Message message) {String messageTag; if (this.correlationKey = = null) {/ / using standard correlationId property messageTag = message.getMessageProperties (). GetCorrelationId ();} else {messageTag = (String) message.getMessageProperties () .getHeaders (). Get (this.correlationKey);} / / correlation_id is considered to be RPC response information, and if (messageTag = = null) {logger.error ("No correlation header in reply"); return is not processed if it does not exist. } / / take out PendingReply PendingReply pendingReply = this.replyHolder.get (messageTag) for correlation_id from replyHolder; if (pendingReply = = null) {if (logger.isWarnEnabled ()) {logger.warn ("Reply received after timeout for" + messageTag);} throw new AmqpRejectAndDontRequeueException ("Reply received after timeout");} else {restoreProperties (message, pendingReply); / / transfer response information add to pendingReply.reply (message) in BlockingQueue;}}

The above spring code hides a lot of extra processing and details, focusing only on the key parts.

At this point, a fully available RPC pattern implemented by RabbitMQ as middleware is complete.

Summary

Server side

The implementation of the server side is relatively simple, and the only difference from the general Consumer is that you need to reply the request to the queue specified by replyTo and carry the message ID correlation_id.

A little optimization on the server side:

Timeout is handled by the client, so is there anything that can be optimized on the server?

The answer is yes: if our server-side processing is time-consuming, how can we tell if the client is still waiting for a response?

We can use the passive parameter to check whether the queue of replyTo exists, because the client declares an internal queue, and if the client breaks the link, the queue no longer exists, and the server does not have to process the message.

Client

The client takes on more work, including:

Declare replyTo queues (it would be much easier to use amq.rabbitmq.reply-to)

Maintain request and response messages (associated with a unique correlation_id)

The return of the consumer server

Handle exceptions such as timeouts (use BlockingQueue to block fetches)

Fortunately, spring has implemented a complete and reliable code, and after we are clear about the process and key points, we can directly use the RabbitTemplate provided by spring without having to implement it ourselves.

The significance of using MQ to implement RPC

Implementing RPC through MQ seems a little more complicated than direct communication between the client and the server, so why are we doing this? Or what are the benefits of doing so:

Decouple the client from the server: the client simply issues a request to the MQ and consumes the response to the request. Regardless of who handles the request, the consumer of the request on the other side of the MQ can replace it with any server that can handle the request without affecting the client.

Reduce the pressure on the server: in the traditional RPC mode, if there are too many clients and requests, the pressure on the server will be too great. With MQ as the middleware, too many requests are digested by MQ, and the server can control the frequency of consumption requests without affecting the server.

The scale-out of the server is easier: if the processing capacity of the server can not meet the frequency of the request, you only need to increase the server to consume MQ messages, and MQ will help us achieve load balancing of message consumption.

It can be seen that RabbitMQ's support for RPC mode is also friendly.

Amq.rabbitmq.reply-to, reply_to, correlation_id and other features illustrate this point, and coupled with the implementation of spring-rabbit, we can easily use RPC calls in message queuing mode.

Thank you for reading! This is the end of this article on "how to use RabbitMQ to achieve RPC". I hope the above content can be of some help to you, so that you can learn more knowledge. if you think the article is good, you can share it out for more people to see!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report