Shulou (Shulou.com), updated 2025-02-28
This article shows how to use RabbitMQ, a message middleware for distributed systems.
Foreword:
This article summarizes several RabbitMQ features that are commonly used in day-to-day project development.
1. The mandatory parameter
As we learned in the previous article, producers send messages to RabbitMQ exchanges, which route them to specific queues by matching the RoutingKey against the BindingKey, and consumers then consume from those queues. So what happens to a message when no queue matches the routing rules? RabbitMQ gives us two options: the mandatory parameter and the alternate exchange.
mandatory is a parameter of the channel.BasicPublish method. Its job is to return a message to the producer when the message cannot be delivered to any destination. When mandatory is true and the exchange cannot find a matching queue based on its type and the routing key, RabbitMQ sends a Basic.Return command to return the message to the producer. When mandatory is false, the message is simply discarded. The implementation code is as follows (using the C# client RabbitMQ.Client 3.6.9 as an example):
using System.Text;
using RabbitMQ.Client;

// Connect and create a channel (later examples omit this setup and the cleanup)
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.HostName = "192.168.121.205";
IConnection conn = factory.CreateConnection(); // connect to RabbitMQ
IModel channel = conn.CreateModel(); // create the channel

channel.ExchangeDeclare("exchangeName", "direct", true); // declare the exchange
// Declare the queue TestQueue: durable, non-exclusive, not auto-deleted
string queueName = channel.QueueDeclare("TestQueue", true, false, false, null).QueueName;
channel.QueueBind(queueName, "exchangeName", "routingKey"); // bind the queue to the exchange

// Producer callback, registered before publishing.
// It is called when a message cannot be routed to any queue.
channel.BasicReturn += (model, ea) =>
{
    // do something...
};

// Publish a message that can be routed to the queue, with mandatory set to true
var message = Encoding.UTF8.GetBytes("TestMsg");
channel.BasicPublish("exchangeName", "routingKey", true, null, message);

// Publish a message that cannot be routed to any queue, with mandatory set to true
var message1 = Encoding.UTF8.GetBytes("TestMsg1");
channel.BasicPublish("exchangeName", "routingKey1", true, null, message1);

// Close the channel and the connection
channel.Close();
conn.Close();
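The return callback receives more than the message body. The following is a minimal sketch of a handler that logs why a message came back. It assumes RabbitMQ.Client 3.6.x, where the event argument exposes ReplyCode, ReplyText, Exchange, RoutingKey, and a byte[] Body; the class and method names (ReturnLogging, AttachReturnLogger) are hypothetical.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class ReturnLogging
{
    // Hypothetical helper: call once, before the first mandatory publish on this channel.
    public static void AttachReturnLogger(IModel channel)
    {
        channel.BasicReturn += (sender, ea) =>
        {
            // For an unroutable message the broker replies with code 312 (NO_ROUTE).
            string body = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(
                "Returned: exchange={0}, routingKey={1}, code={2}, text={3}, body={4}",
                ea.Exchange, ea.RoutingKey, ea.ReplyCode, ea.ReplyText, body);
        };
    }
}
```

Returns arrive asynchronously, so a producer that closes its channel immediately after publishing may never see the callback fire.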
2. Alternate exchange
When a message cannot be routed to a queue, we can have it returned to the producer through the mandatory parameter. The drawback is that the producer has to register a callback to handle unroutable messages, which adds processing logic to the producer. The alternate exchange (Alternate Exchange, also called a backup exchange) provides another way to handle messages that cannot be routed: it lets unrouted messages be stored in RabbitMQ and processed when needed. The main implementation code is as follows:
// Requires: using System.Collections.Generic; using System.Text;
IDictionary<string, object> args = new Dictionary<string, object>();
args.Add("alternate-exchange", "altExchange");
// Declare the normal exchange and attach the alternate-exchange argument
channel.ExchangeDeclare("normalExchange", "direct", true, false, args);
// Declare the alternate exchange as a fanout exchange
channel.ExchangeDeclare("altExchange", "fanout", true, false, null);
channel.QueueDeclare("normalQueue", true, false, false, null); // declare the normal queue
channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey1"); // bind the normal queue to the normal exchange
channel.QueueDeclare("altQueue", true, false, false, null); // declare the backup queue
channel.QueueBind("altQueue", "altExchange", ""); // bind the backup queue to the alternate exchange

// Publish a routable message; it ends up in normalQueue
var msg1 = Encoding.UTF8.GetBytes("TestMsg");
channel.BasicPublish("normalExchange", "NormalRoutingKey1", false, null, msg1);

// Publish an unroutable message; it ends up in altQueue
var msg2 = Encoding.UTF8.GetBytes("TestMsg1");
channel.BasicPublish("normalExchange", "NormalRoutingKey2", false, null, msg2);
In fact, an alternate exchange is not much different from an ordinary exchange. For convenience, it is recommended to declare it as a fanout exchange. If you declare it as direct or topic instead, note that when a message is forwarded to the alternate exchange, its routing key is the same one the producer published with. Consider an alternate exchange of type direct with one queue bound using the key key1: a message with routing key key2 that is forwarded to it matches no queue and is lost, whereas a message with routing key key1 is stored in the queue.
For alternate exchanges, there are several special cases:
If the configured alternate exchange does not exist, neither the client nor the RabbitMQ server raises an exception, and the message is lost.
If the alternate exchange has no queues bound to it, neither the client nor the RabbitMQ server raises an exception, and the message is lost.
If the alternate exchange has no matching queue, neither the client nor the RabbitMQ server raises an exception, and the message is lost.
If the alternate exchange is used together with the mandatory parameter, a message that the alternate exchange routes to a queue counts as routed, so mandatory does not return it to the producer.
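Messages parked in the backup queue can be processed "when needed" by pulling them on demand. The following is a minimal sketch using channel.BasicGet, assuming RabbitMQ.Client 3.6.x (byte[] bodies); the class and method names (AltQueueDrain, Drain) are hypothetical.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

public static class AltQueueDrain
{
    // Hypothetical helper: pulls every message currently waiting in the given queue
    // and returns how many were handled.
    public static int Drain(IModel channel, string queueName)
    {
        int count = 0;
        while (true)
        {
            // noAck = false: the message stays unacknowledged until BasicAck below.
            BasicGetResult result = channel.BasicGet(queueName, false);
            if (result == null)
            {
                break; // the queue is empty
            }
            Console.WriteLine("Unrouted message: routingKey={0}, body={1}",
                result.RoutingKey, Encoding.UTF8.GetString(result.Body));
            channel.BasicAck(result.DeliveryTag, false);
            count++;
        }
        return count;
    }
}
```

Calling AltQueueDrain.Drain(channel, "altQueue") from a scheduled job is one way to inspect unroutable traffic without keeping a consumer attached.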
3. Expiration time (TTL)
3.1 Setting the TTL of a message
There are currently two ways to set the TTL of a message. The first is through a queue argument, in which case all messages in the queue have the same expiration time. The second is to set it on the message itself, so each message can have a different TTL. If both are used together, the effective TTL of a message is the smaller of the two values. Once a message has lived in the queue longer than its TTL, it becomes a dead message and consumers can no longer receive it. (Dead-letter queues are covered below.)
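The "smaller of the two" rule can be written down as a small function. This is only an illustrative model of the rule stated above, not broker code; TtlRules and EffectiveTtl are hypothetical names, and null stands for "no TTL configured".

```csharp
using System;

public static class TtlRules
{
    // Returns the TTL in milliseconds that applies to a message,
    // or null when neither the queue nor the message sets one.
    // queueTtlMs models the queue-level TTL; messageTtlMs models the per-message TTL.
    public static long? EffectiveTtl(long? queueTtlMs, long? messageTtlMs)
    {
        if (queueTtlMs == null) return messageTtlMs;
        if (messageTtlMs == null) return queueTtlMs;
        return Math.Min(queueTtlMs.Value, messageTtlMs.Value);
    }
}
```

For example, a message with a 20000 ms TTL published to a queue with a 6000 ms TTL is treated as expiring after 6000 ms.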
To set the message TTL through the queue, add the x-message-ttl argument, in milliseconds, to the channel.QueueDeclare call. Sample code:
// Requires: using System.Collections.Generic;
IDictionary<string, object> args = new Dictionary<string, object>();
args.Add("x-message-ttl", 6000); // every message in this queue expires after 6000 ms
channel.QueueDeclare("ttlQueue", true, false, false, args);
If no TTL is set, the message does not expire. If the TTL is set to 0, the message is discarded immediately (or handed to the dead-letter queue) unless it can be delivered directly to a consumer at that moment.
To set the TTL per message, set the Expiration property, in milliseconds, on the properties passed to channel.BasicPublish. The key code is as follows:
IBasicProperties properties = channel.CreateBasicProperties();
properties.Expiration = "20000"; // TTL of 20000 milliseconds, passed as a string
var message = Encoding.UTF8.GetBytes("TestMsg");
channel.BasicPublish("normalExchange", "NormalRoutingKey", true, properties, message);
Note: with the first method (queue-level TTL), a message is removed from the queue as soon as it expires. With the second method (per-message TTL), an expired message is not removed immediately, because expiry is only checked when the message is about to be delivered to a consumer. Why? With the first method, expired messages are always at the head of the queue, so RabbitMQ only needs to scan periodically from the head. With the second method, every message can have a different expiration time, so removing all expired messages would require scanning the entire queue. It is cheaper to wait until a message is about to be consumed, check whether it has expired, and delete it then.
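The difference between the two strategies can be illustrated with a toy model. This sketch is not RabbitMQ's implementation; it only mimics "check the head of the queue", and ExpiryDemo and DropExpiredFromHead are hypothetical names. Each queue entry is a message's absolute expiry time in milliseconds.

```csharp
using System;
using System.Collections.Generic;

public static class ExpiryDemo
{
    // Removes expired messages starting at the head and stops at the first live one.
    // Returns the number of messages removed.
    public static int DropExpiredFromHead(Queue<long> expiryTimes, long nowMs)
    {
        int dropped = 0;
        while (expiryTimes.Count > 0 && expiryTimes.Peek() <= nowMs)
        {
            expiryTimes.Dequeue();
            dropped++;
        }
        return dropped;
    }
}
```

With a queue-level TTL, expiry times grow in arrival order, so a head scan finds every expired message. With per-message TTLs, a queue such as { 60000, 1000 } examined at time 5000 drops nothing, because the long-lived message at the head hides the expired one behind it.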
3.2 Setting the TTL of a queue
Note that this is different from the queue-level message TTL above: there, messages are deleted; here, the queue itself is deleted. The x-expires argument of channel.QueueDeclare controls how long a queue may remain unused before it is automatically deleted. Unused means that the queue has no consumers, has not been redeclared, and has not had channel.BasicGet called on it within the expiration period.
Queue TTL is useful for things like RPC reply queues, since RPC tends to create many queues that are then never used again (see the RPC section below). RabbitMQ guarantees that the queue is deleted once the expiration time has passed, but it does not guarantee how promptly. After RabbitMQ restarts, the expiration period of a durable queue is counted from the beginning again. The x-expires argument is in milliseconds and is subject to the same constraints as x-message-ttl, except that it cannot be set to 0 (an error is reported).
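The sample code is as follows:
// Requires: using System.Collections.Generic;
IDictionary<string, object> args = new Dictionary<string, object>();
args.Add("x-expires", 6000); // delete the queue after 6000 ms without use
channel.QueueDeclare("ttlQueue", false, false, false, args);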
4. Dead-letter queue
A DLX (Dead-Letter-Exchange) is a dead-letter exchange. When a message becomes a dead letter in a queue, it can be republished to another exchange. That exchange is the DLX, and a queue bound to the DLX is called a dead-letter queue.
A message becomes a dead letter in the following situations:
The message is rejected (BasicReject/BasicNack) with the requeue parameter set to false. (The consumer acknowledgement mechanism will be covered in the next article.)
The message expires.
The queue reaches its maximum length.
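The three causes above map onto client calls and queue arguments roughly as follows. This is a sketch assuming RabbitMQ.Client 3.6.x; the queue name queue.bounded, the limit of 1000, and the class and method names are hypothetical, and exchange.dlx is simply the exchange name used in this article's sample.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class DeadLetterCauses
{
    // Declares a queue whose dead letters are republished to exchange.dlx.
    public static void DeclareBoundedQueue(IModel channel)
    {
        var args = new Dictionary<string, object>
        {
            { "x-message-ttl", 10000 },                   // cause: the message expires after 10000 ms
            { "x-max-length", 1000 },                     // cause: the queue holds more than 1000 messages
            { "x-dead-letter-exchange", "exchange.dlx" }  // where dead letters are republished
        };
        channel.QueueDeclare("queue.bounded", true, false, false, args);
    }

    // Cause: the consumer rejects a message without requeueing it.
    public static void RejectEverything(IModel channel)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            channel.BasicReject(ea.DeliveryTag, false); // requeue = false
        };
        channel.BasicConsume("queue.bounded", false, consumer); // manual acknowledgements
    }
}
```

A real consumer would reject only the messages it cannot process; rejecting everything here just keeps the sketch short.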
A DLX is an ordinary exchange, no different from any other. It can be specified on any queue, which in practice means setting an argument on that queue. When a dead letter appears in the queue, RabbitMQ automatically republishes it to the configured DLX, from where it is routed to another queue, the dead-letter queue. You can consume from that queue to handle the dead letters as needed.
To attach a DLX to a queue, set the x-dead-letter-exchange argument in the channel.QueueDeclare call. The sample code is as follows:
// Requires: using System.Collections.Generic; using System.Text;
channel.ExchangeDeclare("exchange.dlx", "direct", true); // declare the dead-letter exchange
channel.ExchangeDeclare("exchange.normal", "direct", true); // declare the normal exchange
IDictionary<string, object> args = new Dictionary<string, object>();
args.Add("x-message-ttl", 10000); // messages expire after 10000 milliseconds
args.Add("x-dead-letter-exchange", "exchange.dlx"); // use exchange.dlx as the dead-letter exchange
// Routing key used when dead-lettering. Optional: if omitted, the message's original routing key is used.
args.Add("x-dead-letter-routing-key", "routingkey");
channel.QueueDeclare("queue.normal", true, false, false, args); // declare the normal queue
channel.QueueBind("queue.normal", "exchange.normal", "normalKey"); // bind the normal queue to the normal exchange
channel.QueueDeclare("queue.dlx", true, false, false, null); // declare the dead-letter queue
// Bind the dead-letter queue to the DLX. The binding key must match x-dead-letter-routing-key above,
// or the message's original routing key if that argument is not set.
channel.QueueBind("queue.dlx", "exchange.dlx", "routingkey");
// Publish a message
var message = Encoding.UTF8.GetBytes("TestMsg");
channel.BasicPublish("exchange.normal", "normalKey", null, message);
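The flow for this example is: the producer publishes to exchange.normal with the routing key normalKey, and the message is routed to queue.normal. If no consumer takes it within 10 seconds, it expires, RabbitMQ republishes it to exchange.dlx with the routing key routingkey, and it ends up in queue.dlx.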
5. Delay queue
RabbitMQ itself does not provide delay queues. A delay queue is a logical concept that can be simulated with expiration time plus a dead-letter queue. Its logical structure is roughly as follows:
The producer sends a message to a queue whose messages expire after time n and which has no consumers. When the expiration time arrives, the message is forwarded through the dead-letter exchange to the dead-letter queue, and consumers consume from the dead-letter queue. The net effect is that the consumer receives the message time n after the producer published it, which is delayed consumption.
Delay queues fit many scenarios in our projects, such as canceling an order two hours after it was placed, automatically confirming receipt of goods after seven days, unfreezing a password 24 hours after it was frozen, and message compensation in distributed systems (retry after 1s, then after 10s, then after 5m).
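The structure described above can be sketched as follows, assuming RabbitMQ.Client 3.6.x. All exchange, queue, and key names here (exchange.delay.dlx, queue.delay.hold, queue.delay.ready, ready) and the class DelayQueueSketch are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class DelayQueueSketch
{
    public static void Declare(IModel channel, int delayMs)
    {
        // Destination for expired messages: the queue that consumers actually read.
        channel.ExchangeDeclare("exchange.delay.dlx", "direct", true);
        channel.QueueDeclare("queue.delay.ready", true, false, false, null);
        channel.QueueBind("queue.delay.ready", "exchange.delay.dlx", "ready");

        // Holding queue: nobody consumes from it, so every message waits out its TTL.
        var args = new Dictionary<string, object>
        {
            { "x-message-ttl", delayMs },
            { "x-dead-letter-exchange", "exchange.delay.dlx" },
            { "x-dead-letter-routing-key", "ready" }
        };
        channel.QueueDeclare("queue.delay.hold", true, false, false, args);
    }

    public static void PublishDelayed(IModel channel, string text)
    {
        // The default exchange ("") routes a message to the queue named by the routing key.
        channel.BasicPublish("", "queue.delay.hold", null, Encoding.UTF8.GetBytes(text));
    }

    public static void ConsumeReady(IModel channel)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            Console.WriteLine("Delayed message arrived: " + Encoding.UTF8.GetString(ea.Body));
            channel.BasicAck(ea.DeliveryTag, false);
        };
        channel.BasicConsume("queue.delay.ready", false, consumer); // manual acknowledgements
    }
}
```

For several delay levels, such as the 1s, 10s, and 5m compensation steps, one option is to declare one holding queue per level, each with its own x-message-ttl, all dead-lettering to the same ready queue.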
6. Priority queue
Just as there are "special" people in everyday life, our business has "special" messages that may need to be handled first. In life we might open a VIP channel for such people, and RabbitMQ has a VIP channel too (available since version 3.5): the priority queue. Messages with a higher priority in the queue get the privilege of being consumed first.
We only need to do two things:
Declare the queue as a priority queue by adding the x-max-priority argument when creating it. The argument specifies the highest priority the queue supports and is an integer between 1 and 255.
Set a priority on each message that needs one.
The sample code is as follows:
// Requires: using System.Collections.Generic; using System.Text;
channel.ExchangeDeclare("exchange.priority", "direct", true); // declare the exchange
IDictionary<string, object> args = new Dictionary<string, object>();
args.Add("x-max-priority", 10); // the maximum priority of this queue is 10
channel.QueueDeclare("queue.priority", true, false, false, args); // declare the priority queue
channel.QueueBind("queue.priority", "exchange.priority", "priorityKey"); // bind the queue to the exchange
IBasicProperties properties = channel.CreateBasicProperties();
properties.Priority = 8; // set the message priority to 8
var message = Encoding.UTF8.GetBytes("TestMsg8");
// Publish the message
channel.BasicPublish("exchange.priority", "priorityKey", properties, message);
Note: a message that does not specify a priority is treated as priority 0. A message whose priority exceeds the queue's maximum is treated as having the maximum priority. Messages with the same priority are delivered in the order they arrived. If consumers are faster than producers and no messages accumulate in the broker, setting priorities has no practical effect: each message is consumed as soon as the producer sends it, so the broker holds at most one message at a time, and priority is meaningless for a single message.
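The first two rules in the note can be expressed as a small function. This is an illustrative model of the rules as stated, not broker code; PriorityRules and EffectivePriority are hypothetical names, and null models a message published without a priority.

```csharp
using System;

public static class PriorityRules
{
    // Returns the priority the queue effectively applies to a message.
    public static byte EffectivePriority(byte? messagePriority, byte queueMaxPriority)
    {
        byte requested = messagePriority ?? (byte)0;    // no priority set: treated as 0
        return Math.Min(requested, queueMaxPriority);   // capped at the queue's maximum
    }
}
```

With x-max-priority set to 10, a message published with priority 8 keeps 8, while one published with priority 200 is handled as priority 10.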
Priority queues appear to violate the first-in-first-out principle of the queue data structure. How they are implemented is not discussed here; interested readers can research it themselves, and a later article may analyze the principle.
7. RPC implementation
RPC stands for Remote Procedure Call. It is a technique for requesting a service from a remote computer over a network without needing to understand the underlying network technology. Its main purpose is to make distributed computing easier to build, providing powerful remote invocation without losing the semantic simplicity of a local call.
There is not much more to say about RPC in general, so here we focus on how to implement it with RabbitMQ. RabbitMQ supports a very simple form of RPC: the client sends a request message and the server replies with a response message. To receive the response, the request must carry the name of a callback queue (the default queue can be used). The server-side implementation code is as follows:
// Requires: using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.MessagePatterns;
static void Main(string[] args)
{
    ConnectionFactory factory = new ConnectionFactory();
    factory.UserName = "admin";
    factory.Password = "admin";
    factory.HostName = "192.168.121.205";
    IConnection conn = factory.CreateConnection();
    IModel channel = conn.CreateModel();
    channel.QueueDeclare("RpcQueue", true, false, false, null);
    SimpleRpcServer rpc = new MySimpRpcServer(new Subscription(channel, "RpcQueue"));
    rpc.MainLoop();
}

public class MySimpRpcServer : SimpleRpcServer
{
    public MySimpRpcServer(Subscription subscription) : base(subscription)
    {
    }

    // Handles one request and returns the response body
    public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
    {
        replyProperties = null;
        return Encoding.UTF8.GetBytes("I got it!");
    }

    // Called for every incoming request before it is dispatched
    public override void ProcessRequest(BasicDeliverEventArgs evt)
    {
        // todo...
        base.ProcessRequest(evt);
    }
}
The client implementation code is as follows:
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.HostName = "192.168.121.205";
IConnection conn = factory.CreateConnection();
IModel channel = conn.CreateModel();
SimpleRpcClient client = new SimpleRpcClient(channel, "RpcQueue");
var message = Encoding.UTF8.GetBytes("TestMsg8");
var result = client.Call(message);
// do something with the result...
The code above uses the RPC client and server logic that the RabbitMQ client library already encapsulates. We can also implement it ourselves, mainly with the help of two properties of BasicProperties:
ReplyTo: usually used to name the callback queue.
CorrelationId: used to associate a response with its request.
The processing flow is as follows:
When the client starts, it creates an anonymous callback queue.
The client sets two properties on the RPC request: ReplyTo tells the RPC server which queue to reply to, that is, the callback queue, and CorrelationId identifies the request.
The request is sent to the RpcQueue queue.
The RPC server listens for requests on the RpcQueue queue. When a request arrives, the server processes it and sends the result back to the client through the callback queue named in ReplyTo.
The client listens on the callback queue. When a message arrives, it checks the CorrelationId property; if it matches the request, the message is the result.
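The steps above can be sketched as a hand-written client and server. This is a minimal illustration assuming RabbitMQ.Client 3.6.x (byte[] bodies), an already declared RpcQueue, and one outstanding call at a time; ManualRpcClient and ManualRpcServer are hypothetical names, not part of the library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class ManualRpcClient
{
    private readonly IModel channel;
    private readonly string replyQueue;
    private readonly BlockingCollection<byte[]> replies = new BlockingCollection<byte[]>();
    private volatile string pendingId;

    public ManualRpcClient(IModel channel)
    {
        this.channel = channel;
        // Create an anonymous (server-named) callback queue.
        replyQueue = channel.QueueDeclare().QueueName;
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            // Accept only the reply whose CorrelationId matches the pending request.
            if (ea.BasicProperties.CorrelationId == pendingId)
            {
                replies.Add(ea.Body);
            }
        };
        channel.BasicConsume(replyQueue, true, consumer); // automatic acknowledgements
    }

    public string Call(string request)
    {
        pendingId = Guid.NewGuid().ToString();
        IBasicProperties props = channel.CreateBasicProperties();
        props.ReplyTo = replyQueue;      // where the server should send the reply
        props.CorrelationId = pendingId; // identifies this request
        // The default exchange ("") routes the request to the queue named RpcQueue.
        channel.BasicPublish("", "RpcQueue", props, Encoding.UTF8.GetBytes(request));
        return Encoding.UTF8.GetString(replies.Take()); // blocks until the reply arrives
    }
}

public static class ManualRpcServer
{
    public static void Serve(IModel channel)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            // Echo the CorrelationId back so the client can match the reply.
            IBasicProperties replyProps = channel.CreateBasicProperties();
            replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
            byte[] response = Encoding.UTF8.GetBytes("I got it!");
            channel.BasicPublish("", ea.BasicProperties.ReplyTo, replyProps, response);
            channel.BasicAck(ea.DeliveryTag, false);
        };
        channel.BasicConsume("RpcQueue", false, consumer); // manual acknowledgements
    }
}
```

The client and the server would normally run in separate processes, each with its own connection and channel. A production client would also want a timeout on Take() so that a lost reply does not block the caller forever.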