How to master RabbitMQ: message queuing principles

2025-03-13 Update From: SLTechnology News&Howtos shulou

Shulou(Shulou.com)06/03 Report--

This article explains how to master RabbitMQ: its core concepts, its exchange types, and how to use them from Go.

Introduction

RabbitMQ is an open source implementation of AMQP (Advanced Message Queuing Protocol) written in Erlang. It is used to store and forward messages in distributed systems. It performs well in ease of use, scalability and high availability, and client libraries are available for many languages.

Architecture

The overall architecture is illustrated in the following figure

First, let's go through each term in the figure:

Broker: put simply, the message queue server itself. It provides the transport service and maintains the route from producer to consumer so that messages are delivered in the specified way.

Connection: a TCP connection between a client and a RabbitMQ Broker. A client usually needs only one connection to the Broker.

Channel: a message channel. A client can open multiple Channels on each connection, preferably a separate Channel for each thread. All subsequent operations on Queues and Exchanges are performed on a Channel.

Producer: a message producer. It sends messages to an Exchange after establishing a Connection and a Channel with the Broker.

Consumer: a message consumer. It consumes messages from a Queue after establishing a Connection and a Channel with the Broker.

Exchange: a message switch that delivers the messages published by a Producer to Queues according to certain rules, where they wait to be consumed.

Queue: the carrier of messages. Each message is delivered to one or more queues.

Vhost: a virtual host. Multiple vhosts can be set up in one Broker to separate the RabbitMQ resources used by different systems. The systems share one message queue server, but to each of them it looks as if it has a RabbitMQ server of its own.

Binding: binds an Exchange and a Queue according to routing rules, so that RabbitMQ knows how to route messages to the right Queue.

RoutingKey: the routing key. When sending a message to an Exchange, the producer usually specifies a routing key that states the routing rule for the message. The routing key only takes effect in combination with the Exchange type and the binding key.

The concepts that are harder to understand are RoutingKey, Exchange and Binding. When a message is sent, it does not go directly to a Queue. It goes to an Exchange first, and the Exchange delivers it to the Queues bound to it according to certain rules. What are these rules? They depend on the Exchange's type, its Bindings and the RoutingKey. An Exchange supports four types: direct, fanout, topic and headers, which work as follows:

Direct: a Queue is bound to an Exchange with a key, which we call the Bindkey. When a Producer sends a message to the Exchange, it also specifies a key, the Routekey. In this mode, the Exchange delivers the message to the queues whose Bindkey equals the Routekey.

Fanout: similar to broadcasting. Messages are delivered to all queues bound to the Exchange, without checking Routekey and Bindkey.

Topic: similar to multicast. The Bindkey supports wildcard matching over dot-separated words: * matches exactly one word, and # matches zero or more words. For example, a Producer publishes a message whose Routekey is benz.car, and the Exchange has three bindings (note: three bindings, not three queues, because an Exchange can be bound to the same Queue several times with different Bindkeys, so the relationship is many-to-many). If the Bindkeys are car, *.car and benz.car, the message is delivered to the Queues bound with *.car and benz.car.

Headers: this type does not use the Routekey and Bindkey matching rules to route messages. Instead, it matches on the headers attribute of the message being sent.
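The topic wildcard rules can be sketched in plain Go without a broker. This is a minimal model of the matching rules described above, not RabbitMQ's actual implementation; the function names matchTopic and matchWords are made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether routingKey satisfies the binding pattern.
// Words are separated by "."; "*" matches exactly one word and "#"
// matches zero or more words. (Hypothetical helper, not a library call.)
func matchTopic(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

// matchWords compares the pattern and the key word by word.
func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// Let "#" consume 0, 1, 2, ... words of the key.
		for i := 0; i <= len(key); i++ {
			if matchWords(pattern[1:], key[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" must consume exactly one word.
		return len(key) > 0 && matchWords(pattern[1:], key[1:])
	default:
		return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
	}
}

func main() {
	// The benz.car example from the text.
	for _, bindKey := range []string{"car", "*.car", "benz.car"} {
		fmt.Printf("%-8s matches benz.car: %v\n", bindKey, matchTopic(bindKey, "benz.car"))
	}
}
```

In this sketch only *.car and benz.car report a match for benz.car, which agrees with the example in the text.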

With the figure and the term explanations above, the concepts should be fairly clear. Let's use a few examples to show how to use them.

Usage (golang)

Direct

First, take a look at RabbitMQ's default exchange. It is the first exchange listed, (AMQP default). Every Queue is bound to it automatically, and it delivers a message to the queue whose name equals the Routekey, that is, Routekey == QueueName.

package main

import (
    "log"

    "github.com/streadway/amqp"
)

// handlerError logs msg together with err and exits if err is non-nil.
func handlerError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

// Replace username, password, ip and port with real values.
var url = "amqp://username:password@ip:port"

func main() {
    conn, err := amqp.Dial(url)
    handlerError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    channel, err := conn.Channel()
    handlerError(err, "Failed to open a Channel")
    defer channel.Close()

    queueNameCar := "car"
    // Arguments: name, durable, autoDelete, exclusive, noWait, args.
    if _, err := channel.QueueDeclare(queueNameCar, false, false, false, false, nil); err != nil {
        handlerError(err, "Failed to declare Queue")
    }

    // Publish to the default exchange (""); the routing key is the queue name.
    if err := channel.Publish("", queueNameCar, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
        handlerError(err, "Failed to publish message")
    }
}

This is a complete demo. Later examples show only the main() function; the rest is the same as here.

The code declares a queue named car and, without creating any binding, sends a message to the default exchange with Routekey car, which is the same as the queue name.

For ease of demonstration the result is shown as a picture: there is a queue named car holding one message.

Several parameters deserve attention when creating a queue:

Durability: whether the queue is persisted to disk. A durable queue still exists after RabbitMQ restarts; a non-durable queue is lost on restart and has to be recreated. Take this into account when designing programs.

Auto delete: the queue is deleted when its last consumer unsubscribes. A queue that has never had a consumer is not deleted.

Arguments:

x-message-ttl: the expiration time of messages, in milliseconds: how long a message published to the queue can live before it is discarded.

x-expires: the expiration time of the queue, in milliseconds: how long the queue can stay unused before it is automatically deleted.

x-max-length: the length of the queue, that is, the maximum number of messages it can hold.

x-max-length-bytes: the maximum total size, in bytes, of the message bodies the queue can hold.

x-dead-letter-exchange: the exchange to which a message is re-delivered when it expires or is rejected by a client, similar to choosing an exchange when a producer sends a message.

x-dead-letter-routing-key: the Routekey used when a message is re-delivered after it expires or is rejected by a client, similar to setting a Routekey when a producer sends a message. The default is the Routekey of the original message.

x-max-priority: enables message priorities and sets the maximum priority the queue supports. If it is set to 10, a producer can set a priority of up to 10 when sending a message, and messages are delivered according to priority. It is unset by default, in which case priorities are not supported.

x-queue-mode: setting this to lazy keeps as many messages as possible on disk to reduce RAM usage. If it is not set, the queue keeps an in-memory cache to deliver messages as quickly as possible.
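As an illustration of how these arguments are put together, the sketch below builds an arguments table as a plain Go map. In github.com/streadway/amqp the Table type is declared as map[string]interface{}, so a map like this can be passed as the last parameter of QueueDeclare. All values, the exchange name dlx.car and the routing key car.dead are invented for the example.

```go
package main

import "fmt"

// queueArgs builds an arguments table for a queue declaration.
// Every value below is an illustrative assumption, not a recommendation.
func queueArgs() map[string]interface{} {
	return map[string]interface{}{
		"x-message-ttl":             int32(60000),   // messages expire after 60 s (milliseconds)
		"x-expires":                 int32(600000),  // an unused queue is deleted after 10 min
		"x-max-length":              int32(1000),    // at most 1000 messages
		"x-max-length-bytes":        int32(1 << 20), // at most 1 MiB of message bodies
		"x-dead-letter-exchange":    "dlx.car",      // hypothetical exchange name
		"x-dead-letter-routing-key": "car.dead",     // hypothetical routing key
		"x-max-priority":            int32(10),      // messages may carry a priority up to 10
		"x-queue-mode":              "lazy",         // keep messages on disk where possible
	}
}

func main() {
	args := queueArgs()
	fmt.Println(len(args), "arguments, ttl =", args["x-message-ttl"])
}
```

With the real client, the call would then look like channel.QueueDeclare("car", true, false, false, false, queueArgs()). Note that redeclaring an existing queue with different arguments is rejected by the broker, so the table has to match the queue's original declaration.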

Let's create an exchange of type direct and bind some queues to see what happens.

func main() {
    conn, err := amqp.Dial(url)
    handlerError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    channel, err := conn.Channel()
    handlerError(err, "Failed to open a Channel")
    defer channel.Close()

    directExchangeNameCar := "direct.car"
    // Arguments: name, kind, durable, autoDelete, internal, noWait, args.
    if err := channel.ExchangeDeclare(directExchangeNameCar, "direct", true, false, false, false, nil); err != nil {
        handlerError(err, "Failed to declare exchange")
    }

    queueNameCar := "car"
    queueNameBigCar := "big-car"
    queueNameMiddleCar := "middle-car"
    queueNameSmallCar := "small-car"

    // Errors from QueueDeclare are ignored here to keep the demo short.
    channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
    channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
    channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
    channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

    // Seven bindings: every queue is bound with "car", three also with their own key.
    if err := channel.QueueBind(queueNameCar, "car", directExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "car", directExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "big.car", directExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "car", directExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "middler.car", directExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "car", directExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "small.car", directExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    if err := channel.Publish(directExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
        handlerError(err, "Failed to publish message")
    }
}

The code declares one Exchange, four Queues and seven Bindings. The details of one Binding are as follows:

The message is sent to this Exchange with Routekey car, and each of the four queues has a binding with that key, so all four queues should contain a message. The result matches our expectation.
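The direct routing decision can be reproduced without a broker. The following is a small in-memory sketch under the assumption that a direct exchange simply compares keys for equality; the binding type and the routeDirect function are hypothetical names, and the table mirrors the seven bindings on direct.car.

```go
package main

import (
	"fmt"
	"sort"
)

// binding ties a queue to an exchange under a binding key.
type binding struct {
	queue string
	key   string
}

// routeDirect returns the queues whose binding key equals routingKey.
// Each queue appears once, however many of its bindings match.
func routeDirect(bindings []binding, routingKey string) []string {
	seen := map[string]bool{}
	var queues []string
	for _, b := range bindings {
		if b.key == routingKey && !seen[b.queue] {
			seen[b.queue] = true
			queues = append(queues, b.queue)
		}
	}
	sort.Strings(queues) // stable order for printing
	return queues
}

// carBindings mirrors the seven bindings declared on direct.car.
var carBindings = []binding{
	{"car", "car"},
	{"big-car", "car"}, {"big-car", "big.car"},
	{"middle-car", "car"}, {"middle-car", "middler.car"},
	{"small-car", "car"}, {"small-car", "small.car"},
}

func main() {
	fmt.Println(routeDirect(carBindings, "car"))     // all four queues
	fmt.Println(routeDirect(carBindings, "big.car")) // only big-car
}
```

Publishing with a key such as big.car instead of car would reach only the big-car queue in this model, since no other binding uses that key.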

Queue creation was covered above. This example also creates an Exchange, so let's look at the parameters for creating one.

Type: the exchange type, described above.

Durability: persistence, with the same meaning as for queues.

Auto delete: whether to delete automatically. If yes, the exchange is deleted once the last queue or exchange bound to it has been unbound.

Internal: if yes, clients cannot publish messages directly to this exchange. It can only be bound to other exchanges and receive messages from them.

Fanout

Fanout works like a broadcast. Take a look at the following code

func main() {
    conn, err := amqp.Dial(url)
    handlerError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    channel, err := conn.Channel()
    handlerError(err, "Failed to open a Channel")
    defer channel.Close()

    fanoutExchangeNameCar := "fanout.car"
    // Arguments: name, kind, durable, autoDelete, internal, noWait, args.
    if err := channel.ExchangeDeclare(fanoutExchangeNameCar, "fanout", true, false, false, false, nil); err != nil {
        handlerError(err, "Failed to declare exchange")
    }

    queueNameCar := "car"
    queueNameBigCar := "big-car"
    queueNameMiddleCar := "middle-car"
    queueNameSmallCar := "small-car"

    // Errors from QueueDeclare are ignored here to keep the demo short.
    channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
    channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
    channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
    channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

    if err := channel.QueueBind(queueNameCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "big.car", fanoutExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "middler.car", fanoutExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "small.car", fanoutExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    // The routing key is ignored by a fanout exchange.
    if err := channel.Publish(fanoutExchangeNameCar, "middle.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
        handlerError(err, "Failed to publish message")
    }
}

This declares an exchange of type fanout. Apart from the exchange, the code is the same as above.

Before reading on, think about how many messages each queue holds.

The message is sent to the exchange fanout.car with Routekey middle.car, but in broadcast mode the Routekey is irrelevant, so each queue holds one message.

Note that several bindings point to the same queue. Does that put several copies of the message into that queue? No. For each message a Producer publishes, a queue receives only one copy, however many of its bindings satisfy the delivery rules.
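The one-copy-per-queue behaviour can be illustrated with a small in-memory model. This is only a sketch of the rule described above, assuming a fanout exchange delivers to the set of bound queues; deliverFanout, binding and fanoutBindings are names invented for the example.

```go
package main

import "fmt"

// binding ties a queue to an exchange under a binding key.
type binding struct {
	queue string
	key   string
}

// deliverFanout simulates publishing one message to a fanout exchange and
// returns how many copies each queue ends up with. The routing key is
// accepted only to show that it plays no part in the decision.
func deliverFanout(bindings []binding, routingKey string) map[string]int {
	_ = routingKey // ignored by fanout exchanges
	copies := map[string]int{}
	for _, b := range bindings {
		copies[b.queue] = 1 // a second binding to the same queue adds nothing
	}
	return copies
}

// fanoutBindings mirrors the seven bindings declared on fanout.car.
var fanoutBindings = []binding{
	{"car", "car"},
	{"big-car", "car"}, {"big-car", "big.car"},
	{"middle-car", "car"}, {"middle-car", "middler.car"},
	{"small-car", "car"}, {"small-car", "small.car"},
}

func main() {
	for queue, n := range deliverFanout(fanoutBindings, "middle.car") {
		fmt.Println(queue, "copies:", n)
	}
}
```

Seven bindings go in, but the result has four entries, each with a single copy, because delivery is per queue rather than per binding.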

Topic

Topic is more interesting than the simple, blunt types above. First look at the following code, which declares a topic exchange and four queues:

func main() {
    conn, err := amqp.Dial(url)
    handlerError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    channel, err := conn.Channel()
    handlerError(err, "Failed to open a Channel")
    defer channel.Close()

    topicExchangeNameCar := "topic.car"
    // Arguments: name, kind, durable, autoDelete, internal, noWait, args.
    if err := channel.ExchangeDeclare(topicExchangeNameCar, "topic", true, false, false, false, nil); err != nil {
        handlerError(err, "Failed to declare exchange")
    }

    queueNameCar := "car"
    queueNameBigCar := "big-car"
    queueNameMiddleCar := "middle-car"
    queueNameSmallCar := "small-car"

    // Errors from QueueDeclare are ignored here to keep the demo short.
    channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
    channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
    channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
    channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

    if err := channel.QueueBind(queueNameCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "big.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "middler.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    // Wildcard bindings: "*" is exactly one word, "#" is zero or more words.
    if err := channel.QueueBind(queueNameSmallCar, "*.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "#.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
}

Now consider which queues receive the message for each of the following publishes.

if err := channel.Publish(topicExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
    handlerError(err, "Failed to publish message")
}

Every queue gets a message, because every queue has a binding with Bindkey car.

if err := channel.Publish(topicExchangeNameCar, "small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
    handlerError(err, "Failed to publish message")
}

The small-car queue receives one message.

The message matches the bindings whose Bindkey is small.car and #.small.car. It does not match *.small.car, because * must stand for exactly one word, whereas # can stand for zero words.

if err := channel.Publish(topicExchangeNameCar, "benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
    handlerError(err, "Failed to publish message")
}

The small-car queue receives one message.

The message matches the bindings whose Bindkey is *.small.car and #.small.car.

if err := channel.Publish(topicExchangeNameCar, "auto.blue.benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
    handlerError(err, "Failed to publish message")
}

The small-car queue receives one message.

The message matches only the binding whose Bindkey is #.small.car.

if err := channel.Publish(topicExchangeNameCar, "bike", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
    handlerError(err, "Failed to publish message")
}

No queue receives the message, because no Bindkey matches the Routekey.

Headers

There are few practical application scenarios for this type.
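For completeness, header-based matching can be sketched as follows. The example relies on the x-match binding argument of RabbitMQ's headers exchange, where "all" (the default) requires every listed header to match and "any" requires at least one. The function matchHeaders and the brand and size headers are invented for illustration, and the model is simplified: it assumes header values are simple comparable types such as strings or numbers.

```go
package main

import "fmt"

// matchHeaders reports whether the message headers satisfy the arguments
// of a binding on a headers exchange. Simplified model: values must be
// comparable scalars, and only the "x-match" argument is treated specially.
func matchHeaders(bindArgs, headers map[string]interface{}) bool {
	mode, _ := bindArgs["x-match"].(string) // "" behaves like "all"
	matched, total := 0, 0
	for k, want := range bindArgs {
		if k == "x-match" {
			continue
		}
		total++
		if got, ok := headers[k]; ok && got == want {
			matched++
		}
	}
	if mode == "any" {
		return matched > 0
	}
	return matched == total
}

func main() {
	// Hypothetical binding: both headers must match.
	bind := map[string]interface{}{"x-match": "all", "brand": "benz", "size": "small"}

	fmt.Println(matchHeaders(bind, map[string]interface{}{"brand": "benz", "size": "small"}))
	fmt.Println(matchHeaders(bind, map[string]interface{}{"brand": "benz"}))
}
```

With x-match set to "any" instead, the second message, which carries only the brand header, would also be routed.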
