2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article introduces the transaction mechanism of RabbitMQ. Many people have questions about it in day-to-day work, so the editor consulted various sources and compiled simple, easy-to-use methods. Hopefully it helps answer those questions.
RabbitMQ transaction mechanism:
1: implemented through the transaction mechanism
1: channel.txSelect() starts transaction mode
2: channel.txCommit() commits the transaction
3: channel.txRollback() rolls back the transaction
    try {
        channel.txSelect(); // start transaction mode
        // send message
        channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
        channel.txCommit(); // commit transaction
    } catch (Exception e) {
        channel.txRollback(); // roll back the transaction
    } finally {
        channel.close();
        conn.close();
    }
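The commit and rollback semantics can be illustrated without a broker. The following is a minimal in-memory sketch, assuming that messages published inside a transaction become visible only on commit and are discarded on rollback. The class TxChannelModel and its methods are hypothetical and are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a transactional channel; not RabbitMQ code.
final class TxChannelModel {
    // Messages that consumers could already see.
    private final List<String> queue = new ArrayList<>();
    // Messages published inside the currently open transaction.
    private final List<String> uncommitted = new ArrayList<>();

    // Models basicPublish after txSelect: the message is only buffered.
    void publish(String message) {
        uncommitted.add(message);
    }

    // Models txCommit: buffered messages become visible.
    void commit() {
        queue.addAll(uncommitted);
        uncommitted.clear();
    }

    // Models txRollback: buffered messages are discarded.
    void rollback() {
        uncommitted.clear();
    }

    // Returns a copy of the messages that are visible to consumers.
    List<String> queued() {
        return new ArrayList<>(queue);
    }
}
```

In this model, a publish followed by rollback leaves the queue unchanged, which is the behaviour the catch branch above relies on.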
2: implemented through the publisher confirm mechanism
Publisher confirm mode is enabled on the Channel and plays a role similar to a transaction. There are three ways to use it:
Method 1: channel.waitForConfirms(), normal publisher confirm mode;
Method 2: channel.waitForConfirmsOrDie(), batch confirm mode;
Method 3: channel.addConfirmListener(), asynchronous listener confirm mode.
The snippets below assume import com.rabbitmq.client.*; and import java.util.Date;, plus a config object that holds the connection settings.

Method 1: normal Confirm mode

    // create connection
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(config.UserName);
    factory.setPassword(config.Password);
    factory.setVirtualHost(config.VHost);
    factory.setHost(config.Host);
    factory.setPort(config.Port);
    Connection conn = factory.newConnection();
    // create channel
    Channel channel = conn.createChannel();
    // declare queue (durable, exclusive, autoDelete all false)
    channel.queueDeclare(config.QueueName, false, false, false, null);
    // enable publisher confirm mode
    channel.confirmSelect();
    String message = String.format("time => %s", new Date().getTime());
    channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    if (channel.waitForConfirms()) {
        System.out.println("message sent successfully");
    }

As the code shows, we only need to call channel.confirmSelect() to turn on publisher confirm mode before publishing, and then call channel.waitForConfirms() to wait for the server to confirm the message.

Method 2: batch Confirm mode

    // create connection
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(config.UserName);
    factory.setPassword(config.Password);
    factory.setVirtualHost(config.VHost);
    factory.setHost(config.Host);
    factory.setPort(config.Port);
    Connection conn = factory.newConnection();
    // create channel
    Channel channel = conn.createChannel();
    // declare queue (durable, exclusive, autoDelete all false)
    channel.queueDeclare(config.QueueName, false, false, false, null);
    // enable publisher confirm mode
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
        String message = String.format("time => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    }
    // blocks until every published message is confirmed;
    // throws IOException if any message is not acknowledged
    channel.waitForConfirmsOrDie();
    System.out.println("all execution complete");

As the code shows, channel.waitForConfirmsOrDie() waits synchronously for all published messages to be confirmed before the following code runs, and throws an IOException as soon as one message is not acknowledged.

Method 3: asynchronous Confirm mode

    // create connection
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(config.UserName);
    factory.setPassword(config.Password);
    factory.setVirtualHost(config.VHost);
    factory.setHost(config.Host);
    factory.setPort(config.Port);
    Connection conn = factory.newConnection();
    // create channel
    Channel channel = conn.createChannel();
    // declare queue (durable, exclusive, autoDelete all false)
    channel.queueDeclare(config.QueueName, false, false, false, null);
    // enable publisher confirm mode
    channel.confirmSelect();
    // listen asynchronously for acknowledged and unacknowledged messages;
    // registered before publishing so that no confirmation is missed
    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("unacknowledged message, ID: " + deliveryTag);
        }

        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.out.printf("acknowledged message, ID: %d, multiple messages: %b%n", deliveryTag, multiple);
        }
    });
    for (int i = 0; i < 10; i++) {
        String message = String.format("time => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    }
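In the asynchronous mode, the listener only receives a deliveryTag and the multiple flag, so the publisher has to remember which messages are still waiting for confirmation. The following is a minimal sketch of that bookkeeping using only the standard library. The class OutstandingConfirms and its method names are hypothetical. In a real publisher the sequence number would come from channel.getNextPublishSeqNo() before each basicPublish, and onAck/onNack would be called from handleAck/handleNack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper (not part of the RabbitMQ client): tracks published
// messages that the broker has not yet confirmed.
final class OutstandingConfirms {
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    // Call right before publishing, with the sequence number of the message.
    void onPublish(long sequenceNumber, String body) {
        pending.put(sequenceNumber, body);
    }

    // Mirrors handleAck: forget the confirmed message(s).
    void onAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // multiple == true confirms every message up to and including deliveryTag
            pending.headMap(deliveryTag, true).clear();
        } else {
            pending.remove(deliveryTag);
        }
    }

    // Mirrors handleNack: return the rejected bodies so the caller can republish them.
    List<String> onNack(long deliveryTag, boolean multiple) {
        List<String> rejected = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = pending.headMap(deliveryTag, true);
            rejected.addAll(head.values());
            head.clear();
        } else {
            String body = pending.remove(deliveryTag);
            if (body != null) {
                rejected.add(body);
            }
        }
        return rejected;
    }

    // Number of messages still waiting for a confirmation.
    int pendingCount() {
        return pending.size();
    }
}
```

A sorted map is used so that a single acknowledgement with multiple set to true can clear a whole range of sequence numbers in one call.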
RabbitMQ message distribution
When a RabbitMQ queue has multiple consumers, the messages it receives are dispatched to the consumers in round-robin fashion. Each message is sent to only one consumer in the subscription list. This approach scales well and was designed with concurrent programs in mind: if the load grows, you only need to create more consumers to process messages.
In many cases, however, round-robin dispatch is not so elegant. By default, if there are n consumers, RabbitMQ dispatches the m-th message to the (m % n)-th consumer (by taking the remainder), regardless of whether that consumer has processed and acknowledged (Basic.Ack) its earlier messages. Imagine that some consumers carry heavy tasks and cannot keep up with that many messages, while others finish their assigned messages quickly for some reason (simple business logic, better machine performance, and so on) and then sit idle. The result is a drop in overall application throughput.
How should this be handled? This is where channel.basicQos(int prefetchCount) comes in. The channel.basicQos method limits the maximum number of unacknowledged messages that consumers on the channel may hold. For example, before subscribing to a queue, the consumer program calls channel.basicQos(5) and then subscribes to the queue. RabbitMQ keeps a list of consumers and, for each message it sends, increments the count for the corresponding consumer. Once the limit is reached, RabbitMQ stops sending messages to that consumer. When the consumer acknowledges a message, RabbitMQ decrements the count by 1, and the consumer can receive messages again until the limit is reached once more. This mechanism is analogous to the "sliding window" in TCP/IP.
Note: Basic.Qos has no effect on pull-mode consumption.
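The counting described above can be modelled in a few lines. The following is a minimal in-memory sketch, assuming a round-robin dispatcher that skips any consumer whose unacknowledged count has reached the prefetch limit. The class PrefetchDispatcher is hypothetical and only illustrates the bookkeeping; it is not how the broker is implemented.

```java
// Hypothetical in-memory model of broker-side dispatch; not RabbitMQ code.
final class PrefetchDispatcher {
    private final int[] unacked;      // unacknowledged messages per consumer
    private final int prefetchCount;  // 0 means no limit, as with basicQos
    private int next = 0;             // consumer that is next in round-robin order

    PrefetchDispatcher(int consumers, int prefetchCount) {
        this.unacked = new int[consumers];
        this.prefetchCount = prefetchCount;
    }

    // Returns the index of the consumer that receives the next message,
    // or -1 when every consumer has reached its limit.
    int dispatch() {
        for (int i = 0; i < unacked.length; i++) {
            int candidate = (next + i) % unacked.length;
            if (prefetchCount == 0 || unacked[candidate] < prefetchCount) {
                unacked[candidate]++;
                next = (candidate + 1) % unacked.length;
                return candidate;
            }
        }
        return -1;
    }

    // The consumer acknowledged one message, so its count drops by 1.
    void ack(int consumer) {
        if (unacked[consumer] > 0) {
            unacked[consumer]--;
        }
    }

    // Current number of unacknowledged messages held by the consumer.
    int unackedCount(int consumer) {
        return unacked[consumer];
    }
}
```

With a limit of 0 the model degenerates to plain round-robin. With a positive limit, a slow consumer that never acknowledges simply stops receiving messages while the faster ones keep working.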
channel.basicQos has three overloads:
(1) void basicQos(int prefetchCount) throws IOException;
(2) void basicQos(int prefetchCount, boolean global) throws IOException;
(3) void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
The examples so far use only the prefetchCount parameter; setting prefetchCount to 0 means there is no upper limit. The prefetchSize parameter is the upper limit on the total size, in bytes, of the unacknowledged messages a consumer may hold; setting it to 0 also means there is no upper limit.
A channel can consume from multiple queues at the same time. When prefetchCount is greater than 0, the channel must coordinate with each queue to ensure that the messages sent do not exceed the prefetchCount limit. This degrades RabbitMQ's performance, especially when the queues are spread across multiple broker nodes in a cluster. To improve performance, RabbitMQ redefines the global parameter on top of the AMQP 0-9-1 protocol. Table 4-4-1 compares the two values of global:
    global = false: each new consumer on the channel must comply with the prefetchCount limit
    global = true: all consumers on the channel must comply with the prefetchCount limit
Sequential consumption of RabbitMQ messages:
Retransmission, dead-letter queues, delayed delivery, and brief network interruptions can all cause messages to arrive out of order, so RabbitMQ does not guarantee sequential consumption.
You can define a sequence value at the business level: when the sequence value of a received message does not match the next value expected, the consumer waits until the expected message arrives.
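The waiting strategy above can be sketched as a small buffer on the consumer side. This is a minimal sketch, assuming each message carries a business-level sequence number that increases by 1. The class Resequencer and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consumer-side helper: releases messages strictly in sequence order.
final class Resequencer {
    // Messages that arrived early and are waiting for their predecessors.
    private final Map<Long, String> waiting = new HashMap<>();
    private long nextExpected;

    Resequencer(long firstSequence) {
        this.nextExpected = firstSequence;
    }

    // Accepts one message and returns every message that can now be processed, in order.
    List<String> accept(long sequence, String body) {
        List<String> ready = new ArrayList<>();
        if (sequence < nextExpected) {
            return ready; // already processed, for example a redelivered duplicate
        }
        waiting.put(sequence, body);
        // Release the expected message and any directly following ones that were buffered.
        while (waiting.containsKey(nextExpected)) {
            ready.add(waiting.remove(nextExpected));
            nextExpected++;
        }
        return ready;
    }
}
```

A message that arrives early stays in the buffer and is released together with its predecessors once the gap is filled. A production version would also need a timeout or size bound for the buffer, which this sketch leaves out.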
RabbitMQ message reliability guarantee:
Reliable message transmission is generally the first thing to consider when a business system integrates message middleware. The delivery guarantees of message middleware are usually divided into three levels:
At most once: the message may be lost, but it is never delivered more than once.
At least once: the message is never lost, but it may be delivered more than once.
Exactly once: each message is delivered once and only once.
RabbitMQ supports "at most once" and "at least once". Achieving "at least once" delivery requires attention to the following:
(1) The producer needs to enable the transaction mechanism or the publisher confirm mechanism to ensure that messages are reliably delivered to RabbitMQ.
(2) The producer needs to use the mandatory parameter or an alternate (backup) exchange to ensure that messages can be routed from the exchange to a queue, so that they are stored rather than discarded.
(3) Both messages and queues need to be persisted, so that an exception on the RabbitMQ server does not cause message loss.
(4) The consumer needs to set autoAck to false and then manually acknowledge the messages it has processed correctly, to avoid unnecessary message loss on the consumer side.
"At most once" delivery does not need any of the above: producers send freely and consumers consume freely, but it is then hard to ensure that messages are not lost.
"Exactly once" is something RabbitMQ currently cannot guarantee. Consider this situation: after processing a message, the consumer sends a Basic.Ack to RabbitMQ, but because of a network disconnection or some other reason RabbitMQ never receives it. RabbitMQ then does not mark the message for deletion, and after the connection is re-established the consumer receives the same message again, which results in repeated consumption.
Consider another situation. Using the publisher confirm mechanism, the producer sends a message and waits for RabbitMQ to return the confirmation. At that moment the network is disconnected and the producer catches the exception. To ensure reliability the producer resends the message, so RabbitMQ now holds two identical messages and the consumer processes both. So does RabbitMQ have a deduplication mechanism that guarantees "exactly once"? The answer is no. Not only RabbitMQ: most mainstream message middleware has no message deduplication mechanism and does not guarantee "exactly once" either.
Deduplication is generally implemented on the business client side, for example by introducing a GUID (Globally Unique Identifier). With GUIDs, deduplicating on the client requires a centralized cache, which inevitably adds a dependency and complexity, and the size of the cache is hard to bound. In a real production environment it is recommended that the business side deduplicate according to its own business characteristics, for example by making the handling of business messages idempotent, or by using another product such as Redis for deduplication.
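Client-side deduplication by GUID can be sketched as follows. This is a minimal sketch, assuming each message carries a unique ID set by the producer. The class MessageDeduplicator is hypothetical, and the in-memory set only stands in for a shared store such as Redis; it is never trimmed, which is exactly the cache-size problem mentioned above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical consumer-side filter keyed by a business-level message ID (GUID).
final class MessageDeduplicator {
    // IDs of messages that have already been handled; stands in for a shared cache.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time an ID is seen and false for every repeat.
    boolean firstDelivery(String messageId) {
        return seen.add(messageId);
    }

    // Runs the handler only for the first delivery; returns whether it ran.
    boolean handleOnce(String messageId, Runnable handler) {
        if (!firstDelivery(messageId)) {
            return false; // duplicate: skip the business logic
        }
        handler.run();
        return true;
    }
}
```

The consumer should still acknowledge a duplicate after skipping it, otherwise the broker keeps redelivering the same message.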