How to Implement RPC over a Message Queue with RabbitMQ

Updated: 2025-01-17. Source: SLTechnology News&Howtos (Shulou)

This article explains how to implement RPC (remote procedure call) over a message queue with RabbitMQ, using a small Fibonacci service as the example.

Client interface

To illustrate how an RPC service is used, we create a simple client class. It exposes a call method that sends an RPC request and blocks until the result arrives.

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) is " + result);

Although RPC is a common pattern in programming, it is often criticized. Problems arise when a programmer cannot tell whether a call is a local method or a slow remote one; that confusion makes a system unpredictable and adds unnecessary complexity to debugging. Used carelessly, RPC bloats the code instead of simplifying it.

Callback queue

Implementing RPC over RabbitMQ is straightforward: the client sends a request message and the server replies with a response message. To receive the response, the client sends the address of a "callback" queue along with the request. A server-named queue, as returned by channel.queueDeclare() with no arguments, works well for this.

String callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties.Builder()
        .replyTo(callbackQueueName)
        .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

The AMQP protocol predefines fourteen message properties. Most of them are rarely used, with the following exceptions:

deliveryMode: marks the message as persistent (value 2) or transient (any other value).

contentType: describes the MIME type of the encoding, for example "application/json" for JSON.

replyTo: commonly used to name the callback queue.

correlationId: used to correlate an RPC response with its request.

Correlation Id

In the method above we create a callback queue for every RPC request. That is inefficient, but there is a better way: create a single callback queue per client.

This raises a new question: when a response arrives in that queue, how does the client know which request it belongs to? This is what the correlationId property is for. We set it to a unique value for every request. Later, when a message arrives in the callback queue, we look at this property and match the response to its request. If the correlationId is unknown, the message can safely be discarded because it does not belong to any of our requests.

You might ask why unknown messages are ignored rather than treated as errors. The reason is a possible race condition on the server side. Although it is unlikely, the RPC server may die just after sending the response but before sending the acknowledgment for the request. In that case the request is redelivered and the restarted RPC server processes it again, so the client may receive the same response twice. This is why the client must handle duplicate responses gracefully, and why the RPC should ideally be idempotent.
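The client-side bookkeeping described here can be sketched in plain Java, independently of the RabbitMQ client. The class below is a hypothetical helper, not part of any library: PendingReplies, register and onReply are names invented for this illustration. It assumes responses arrive as a correlation id plus a string body, and it simply drops any response whose id is unknown or has already been answered.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of the RabbitMQ client): tracks in-flight
// RPC calls by correlation id so one callback queue can serve many requests.
class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before publishing a request; the returned id is what the
    // request would carry in its correlationId property.
    String register(CompletableFuture<String> future) {
        String corrId = UUID.randomUUID().toString();
        pending.put(corrId, future);
        return corrId;
    }

    // Call for every message that arrives on the callback queue.
    // Returns false when the id is unknown or already answered; such
    // messages are dropped without raising an error.
    boolean onReply(String corrId, String body) {
        if (corrId == null) {
            return false;
        }
        CompletableFuture<String> future = pending.remove(corrId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        CompletableFuture<String> result = new CompletableFuture<>();
        String corrId = replies.register(result);

        System.out.println(replies.onReply("someone-else", "99")); // unknown id, dropped
        System.out.println(replies.onReply(corrId, "3"));          // matches, delivered
        System.out.println(replies.onReply(corrId, "3"));          // duplicate, dropped
        System.out.println("fib(4) is " + result.join());
    }
}
```

Removing the entry on the first match is what makes a duplicate response harmless: the second copy no longer finds a pending request and is ignored like any other unknown message.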

(1) When the client starts, it creates an anonymous, exclusive callback queue.

(2) For each RPC request, the client sends a message with two properties: replyTo, set to the callback queue, and correlationId, set to a unique value for that request.

(3) The request is sent to the rpc_queue queue.

(4) The RPC server waits for requests on that queue. When a request arrives, it does the work and sends the result back to the client through the queue named in the replyTo field.

(5) The client waits for data on the callback queue. When a message arrives, it checks the correlationId property and, if the value matches that of the request, returns the response to the application.
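The five steps can be traced with a small in-memory model before looking at the real client code. The sketch below is an illustration only: it uses plain Java collections in place of a broker, and RpcFlowSketch, Message, sendRequest, serveOne and receiveReply are hypothetical names that do not exist in the RabbitMQ client. It runs single-threaded, so the server step is invoked explicitly rather than waiting on a queue.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// In-memory stand-in for a broker with named queues (illustration only).
class RpcFlowSketch {
    // A message body plus the two properties the RPC pattern relies on.
    static final class Message {
        final String body;
        final String replyTo;
        final String correlationId;

        Message(String body, String replyTo, String correlationId) {
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    private final Map<String, Queue<Message>> queues = new HashMap<>();

    private Queue<Message> queue(String name) {
        return queues.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    static int fib(int n) {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    // Steps 2 and 3: publish a request carrying replyTo and correlationId to rpc_queue.
    String sendRequest(String callbackQueue, String argument) {
        String corrId = UUID.randomUUID().toString();
        queue("rpc_queue").add(new Message(argument, callbackQueue, corrId));
        return corrId;
    }

    // Step 4: take one request, compute, and reply to the queue named in replyTo,
    // copying the correlationId unchanged.
    void serveOne() {
        Message request = queue("rpc_queue").remove();
        String result = String.valueOf(fib(Integer.parseInt(request.body)));
        queue(request.replyTo).add(new Message(result, null, request.correlationId));
    }

    // Step 5: drain the callback queue; replies with another id are dropped.
    // Returns null when no matching reply is present.
    String receiveReply(String callbackQueue, String corrId) {
        Message reply;
        while ((reply = queue(callbackQueue).poll()) != null) {
            if (corrId.equals(reply.correlationId)) {
                return reply.body;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RpcFlowSketch broker = new RpcFlowSketch();
        String callbackQueue = "callback-" + UUID.randomUUID(); // step 1
        String corrId = broker.sendRequest(callbackQueue, "30");
        broker.serveOne();
        System.out.println("fib(30) is " + broker.receiveReply(callbackQueue, corrId));
    }
}
```

In a real deployment the client and server run in separate processes and the server blocks while waiting for requests; the explicit serveOne call here only keeps the sketch deterministic.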

Sample code

RPCServer.java

package com.favccxx.favrabbit;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

// Note: QueueingConsumer is only available in amqp-client releases before 5.0.
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    // Naive recursive Fibonacci; slow for large n, which is fine for a demo.
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            connection = factory.newConnection();
            channel = connection.createChannel();

            // Non-durable, non-exclusive, non-auto-delete request queue.
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            // Hand this server at most one unacknowledged request at a time.
            channel.basicQos(1);

            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

            System.out.println(" [x] Awaiting RPC requests");

            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();

                // Copy the request's correlationId into the reply.
                BasicProperties props = delivery.getProperties();
                BasicProperties replyProps = new BasicProperties.Builder()
                        .correlationId(props.getCorrelationId())
                        .build();

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (Exception e) {
                    System.out.println(" [.] " + e.toString());
                    response = "";
                } finally {
                    // Reply to the callback queue, then acknowledge the request.
                    channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {}
            }
        }
    }
}

RPCClient.java

package com.favccxx.favrabbit;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

// Note: QueueingConsumer is only available in amqp-client releases before 5.0.
public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        // One server-named callback queue per client, consumed with auto-ack.
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = UUID.randomUUID().toString();

        BasicProperties props = new BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        // Block until the reply with our correlationId arrives; skip any other message.
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }
        return response;
    }

    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();
            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (Exception ignore) {}
            }
        }
    }
}

Start RPCServer first, then run RPCClient. The console output is as follows:

RPCClient:  [x] Requesting fib(30)

RPCClient:  [.] Got '832040'

RPCServer:  [x] Awaiting RPC requests

RPCServer:  [.] fib(30)
