
What are the three working modes of RabbitMQ when operated from Python

2025-04-02 Update From: SLTechnology News&Howtos


Shulou (Shulou.com) 05/31

This article summarizes how to operate RabbitMQ from Python and explains its three working modes, with step-by-step code for reference.

I. Brief introduction

RabbitMQ is an open source message broker (middleware) that implements the Advanced Message Queuing Protocol (AMQP). A message queue is a way for one application to communicate with another: one application writes a message and delivers it to a queue, and another application reads it from the queue, which completes the communication. As middleware, RabbitMQ is one of the most popular message queues today.

RabbitMQ has a wide range of application scenarios:

High availability of the system: flash sales in online malls and similar events produce high-traffic, high-concurrency scenarios. When the server has to process such a large number of requests at once, there is a risk of downtime. Some of that business logic may be very complex but not time-critical, and its result does not need to be returned to the user immediately. These requests can be put on a queue and processed later, which reduces the pressure on the server under high concurrency.

Message queues are also frequently considered in distributed systems, system integration, the interfaces between subsystems, and architecture design.
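The decoupling described above can be tried without a broker. The following is a minimal sketch that uses the standard library's `queue.Queue` as an in-process stand-in for a RabbitMQ queue; the names `producer`, `consumer`, `order_queue` and the `STOP` sentinel are illustrative assumptions, not part of RabbitMQ or pika.

```python
import json
import queue
import threading

# In-process stand-in for a message queue (an assumption for illustration;
# a real deployment would use a RabbitMQ queue instead).
order_queue = queue.Queue()
STOP = None  # sentinel telling the consumer to finish


def producer(count):
    """Write messages to the queue without waiting for them to be processed."""
    for i in range(count):
        order_queue.put(json.dumps({"OrderId": "1000%s" % i}))
    order_queue.put(STOP)


def consumer(processed):
    """Read messages at its own pace until the sentinel arrives."""
    while True:
        message = order_queue.get()
        if message is STOP:
            break
        processed.append(json.loads(message)["OrderId"])


processed = []
worker = threading.Thread(target=consumer, args=(processed,))
worker.start()
producer(3)
worker.join()
print(processed)
```

The producer never calls the consumer directly; it only knows about the queue. That is the property a broker such as RabbitMQ provides across processes and machines.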

II. Production and consumption of RabbitMQ

Producer: the producer of queue messages, responsible for creating messages and delivering them to the queue

    import json

    import pika

    # RabbitMQ username and password
    credentials = pika.PlainCredentials('shampoo', '123456')
    # virtual_host selects the virtual host; it can be omitted when the default '/' is used
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.1.62.170', port=5672, virtual_host='/', credentials=credentials))
    channel = connection.channel()
    # Declare the queue that messages are delivered to; it is created if it does not exist
    result = channel.queue_declare(queue='python-test')

    for i in range(10):
        message = json.dumps({'OrderId': "1000%s" % i})
        # With the default exchange (''), routing_key is the name of the target queue
        channel.basic_publish(exchange='', routing_key='python-test', body=message)
        print(message)
    connection.close()

Consumer: the recipient of queue messages, responsible for receiving and processing the messages in the queue

    import pika

    credentials = pika.PlainCredentials('shampoo', '123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.1.62.170', port=5672, virtual_host='/', credentials=credentials))
    channel = connection.channel()
    # Declare the queue that messages are delivered to; it is created if it does not exist
    channel.queue_declare(queue='python-test', durable=False)

    # Callback that processes each message from the queue; here it only prints the body
    def callback(ch, method, properties, body):
        print(body.decode())
        # Acknowledge only after the message has been processed
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Tell RabbitMQ to deliver messages from the queue to callback
    channel.basic_consume('python-test', callback)
    # Start receiving messages; this blocks, and callback runs only when the queue has a message
    channel.start_consuming()

III. RabbitMQ persistence

By default, queues and exchanges are created as transient (non-durable). If they are not declared durable, they are lost when RabbitMQ goes down or restarts. So we usually declare durability when creating a queue or exchange.

1. Declaring a durable queue

    # Declare the queue that messages are delivered to; it is created if it does not exist.
    # durable=True makes the queue durable, durable=False makes it non-durable.
    result = channel.queue_declare(queue='python-test', durable=True)

2. Declaring a durable exchange

    # Declare the exchange, which decides which queue a message is delivered to;
    # it is created if it does not exist.
    # durable=True makes the exchange durable, durable=False makes it non-durable.
    channel.exchange_declare(exchange='python-test', durable=True)

Note: if a non-durable queue or exchange with the same name already exists, executing the code above raises an error (PRECONDITION_FAILED), because the durability of an existing queue or exchange cannot be changed; it has to be deleted and recreated. Also keep the durability of a queue and of the exchange it is bound to consistent: according to the author, binding is not allowed if one of them is declared durable and the other is not.
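The "delete and recreate" step from the note can be sketched as follows. `queue_delete` and `queue_declare` are pika channel methods; the helper `recreate_as_durable` and the `RecordingChannel` stand-in are hypothetical names introduced here so that the sketch runs without a broker. With a real connection, pass the object returned by `connection.channel()` instead.

```python
class RecordingChannel:
    """Hypothetical stand-in for a pika channel that only records calls."""

    def __init__(self):
        self.calls = []

    def queue_delete(self, queue):
        self.calls.append(("queue_delete", queue))

    def queue_declare(self, queue, durable=False):
        self.calls.append(("queue_declare", queue, durable))


def recreate_as_durable(channel, queue_name):
    """Delete an existing queue and declare it again as durable.

    Deleting a queue discards any messages still in it, so this is only
    suitable when the queue is empty or its contents are expendable.
    """
    channel.queue_delete(queue=queue_name)
    channel.queue_declare(queue=queue_name, durable=True)


demo_channel = RecordingChannel()
recreate_as_durable(demo_channel, "python-test")
print(demo_channel.calls)
```

Because the delete is destructive, it is usually better to settle the durability of a queue before any producer or consumer declares it.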

3. Message persistence

Even when both the exchange and the queue are declared durable, a message that is held only in memory is still lost when RabbitMQ restarts. So the message itself must also be declared persistent, so that it is written from memory to disk.

    # Publish a message; with the default exchange, routing_key is the queue name.
    # delivery_mode=2 marks the message as persistent, delivery_mode=1 as non-persistent.
    channel.basic_publish(exchange='', routing_key='python-test', body=message,
                          properties=pika.BasicProperties(delivery_mode=2))

4. Acknowledgement: making sure messages are not lost

When the consumer calls the callback function, processing may fail. With automatic acknowledgement the message counts as consumed as soon as it is delivered, so a message whose processing fails is lost. Alternatively, the message can be returned to RabbitMQ when processing fails and then be consumed again. For this, manual acknowledgement has to be enabled.
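A callback that returns a failed message to the broker might look like the sketch below. `basic_ack` and `basic_nack` are pika channel methods; `handle_order` is a hypothetical piece of business logic, and `RecordingChannel` is a stand-in used only so the example runs without a broker.

```python
import json
from types import SimpleNamespace


def handle_order(body):
    """Hypothetical business logic: parse the JSON body and return the order id."""
    return json.loads(body)["OrderId"]


def callback(ch, method, properties, body):
    """Acknowledge on success; hand the message back to the broker on failure."""
    try:
        order_id = handle_order(body)
    except (ValueError, KeyError, TypeError):
        # Processing failed: reject the message and ask the broker to requeue it
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        return
    print("processed", order_id)
    ch.basic_ack(delivery_tag=method.delivery_tag)


class RecordingChannel:
    """Stand-in for a pika channel (assumption) that records ack/nack calls."""

    def __init__(self):
        self.calls = []

    def basic_ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))

    def basic_nack(self, delivery_tag, requeue):
        self.calls.append(("nack", delivery_tag, requeue))


demo = RecordingChannel()
callback(demo, SimpleNamespace(delivery_tag=1), None, b'{"OrderId": "10000"}')
callback(demo, SimpleNamespace(delivery_tag=2), None, b"not json")
print(demo.calls)
```

Note that `requeue=True` redelivers a message that can never be processed again and again; `requeue=False` discards it instead (or dead-letters it, if the queue is configured for that).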

    channel.basic_consume(
        'python-test',
        callback,
        # auto_ack=False: if the broker receives no acknowledgement for a message
        # delivered to callback, the message is returned to the queue.
        # auto_ack=True: the message counts as consumed whether or not callback succeeds.
        # (pika versions before 1.0 call this parameter no_ack.)
        auto_ack=False)

IV. RabbitMQ publish and subscribe

Publish and subscribe in RabbitMQ is implemented through exchanges:

An exchange has three commonly used working modes: fanout, direct, and topic.

Mode 1: fanout

In this mode, a message delivered to the exchange is forwarded to every queue bound to it.

You do not need to specify a routing_key; if you do, it is ignored.

The exchange and the queue must be bound in advance. An exchange can be bound to multiple queues, and a queue can be bound to multiple exchanges.

The subscriber must be started first. In this example the queue is a temporary one with a randomly generated name, created when the consumer starts. The publisher only publishes the message to the exchange, and the exchange forwards it to the queues.
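The fanout rules above, including why the subscriber has to start first, can be modelled in a few lines. This is a toy in-memory model, not a RabbitMQ or pika API; `FanoutExchange`, `bind` and `publish` are names assumed for illustration.

```python
from collections import deque


class FanoutExchange:
    """Toy in-memory model of a fanout exchange (not a RabbitMQ API)."""

    def __init__(self):
        self.queues = {}

    def bind(self, queue_name):
        """Bind a new, empty queue to the exchange and return it."""
        self.queues[queue_name] = deque()
        return self.queues[queue_name]

    def publish(self, body, routing_key=""):
        """Copy the message to every bound queue; routing_key is ignored."""
        for messages in self.queues.values():
            messages.append(body)


exchange = FanoutExchange()
# Published before any queue is bound: nobody receives this message
exchange.publish("lost: no queue is bound yet")
first = exchange.bind("subscriber-1")
second = exchange.bind("subscriber-2")
exchange.publish("order 10000", routing_key="ignored")
print(list(first), list(second))
```

Both queues receive a copy of the second message, and neither sees the first one, which was published before they were bound.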

Publisher:

    import json

    import pika

    # RabbitMQ username and password
    credentials = pika.PlainCredentials('shampoo', '123456')
    # virtual_host selects the virtual host; it can be omitted when the default '/' is used
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.1.62.170', port=5672, virtual_host='/', credentials=credentials))
    channel = connection.channel()
    # Declare the exchange; it is created if it does not exist.
    # durable=True makes the exchange durable, durable=False makes it non-durable.
    channel.exchange_declare(exchange='python-test', durable=True, exchange_type='fanout')

    for i in range(10):
        message = json.dumps({'OrderId': "1000%s" % i})
        # delivery_mode=2 marks the message as persistent, delivery_mode=1 as non-persistent.
        # A fanout exchange ignores routing_key, so it is left empty.
        channel.basic_publish(exchange='python-test', routing_key='', body=message,
                              properties=pika.BasicProperties(delivery_mode=2))
        print(message)
    connection.close()

Subscriber:

    import pika

    credentials = pika.PlainCredentials('shampoo', '123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.1.62.170', port=5672, virtual_host='/', credentials=credentials))
    channel = connection.channel()
    # Create a temporary queue: the empty name lets the broker generate one, and
    # exclusive=True deletes the queue once the consumer disconnects
    result = channel.queue_declare('', exclusive=True)
    # Declare the exchange; it is created if it does not exist.
    # durable=True makes the exchange durable, durable=False makes it non-durable.
    channel.exchange_declare(exchange='python-test', durable=True, exchange_type='fanout')
    # Bind the queue to the exchange so the exchange knows where to forward messages
    channel.queue_bind(exchange='python-test', queue=result.method.queue)

    # Callback that processes each message from the queue; here it only prints the body
    def callback(ch, method, properties, body):
        print(body.decode())
        # Acknowledge only after the message has been processed
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(
        result.method.queue,
        callback,
        # auto_ack=False: an unacknowledged message is returned to the queue.
        # auto_ack=True: the message counts as consumed whether or not callback succeeds.
        auto_ack=False)
    channel.start_consuming()

Mode 2: direct

In this mode, a message is sent to the exchange, and the exchange forwards it to the matching queue according to the routing key (routing_key).

You can use the default exchange (exchange='') or declare your own exchange.

With the default exchange no binding is needed, because every queue is automatically bound to it with the queue name as the routing key. With a custom exchange, bind the queue to the exchange with a routing_key.

A routing_key must be specified both when publishing and when binding the receiving queue.

In the example below the subscriber must be started first. Its queue is a temporary one with a randomly generated name, created when the consumer starts. The publisher only publishes the message to the exchange, and the exchange forwards it to the bound queue.
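The routing rule of a direct exchange is an exact comparison between the message's routing key and each binding key. A minimal sketch in plain Python, assuming a hypothetical `route_direct` helper and made-up queue names:

```python
def route_direct(bindings, routing_key):
    """Return the queues whose binding key equals routing_key exactly.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    return [queue for queue, key in bindings if key == routing_key]


bindings = [
    ("orders", "OrderId"),
    ("audit", "OrderId"),
    ("refunds", "RefundId"),
]
matched = route_direct(bindings, "OrderId")
unmatched = route_direct(bindings, "orderid")
print(matched, unmatched)
```

Several queues may share a binding key, in which case each gets a copy. The comparison is case-sensitive, and a message whose key matches no binding is not delivered to any queue.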

Publisher:

    import json

    import pika

    # RabbitMQ username and password
    credentials = pika.PlainCredentials('shampoo', '123456')
    # virtual_host selects the virtual host; it can be omitted when the default '/' is used
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.1.62.170', port=5672, virtual_host='/', credentials=credentials))
    channel = connection.channel()
    # Declare the exchange; it is created if it does not exist.
    # durable=True makes the exchange durable, durable=False makes it non-durable.
    channel.exchange_declare(exchange='python-test', durable=True, exchange_type='direct')

    for i in range(10):
        message = json.dumps({'OrderId': "1000%s" % i})
        # Specify routing_key. delivery_mode=2 marks the message as persistent,
        # delivery_mode=1 as non-persistent.
        channel.basic_publish(exchange='python-test', routing_key='OrderId', body=message,
                              properties=pika.BasicProperties(delivery_mode=2))
        print(message)
    connection.close()

Consumer:

    import pika

    credentials = pika.PlainCredentials('shampoo', '123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.1.62.170', port=5672, virtual_host='/', credentials=credentials))
    channel = connection.channel()
    # Create a temporary queue: the empty name lets the broker generate one, and
    # exclusive=True deletes the queue once the consumer disconnects
    result = channel.queue_declare('', exclusive=True)
    # Declare the exchange; it is created if it does not exist.
    # durable=True makes the exchange durable, durable=False makes it non-durable.
    channel.exchange_declare(exchange='python-test', durable=True, exchange_type='direct')
    # Bind the queue to the exchange under the routing key 'OrderId'
    channel.queue_bind(exchange='python-test', queue=result.method.queue,
                       routing_key='OrderId')

    # Callback that processes each message from the queue; here it only prints the body
    def callback(ch, method, properties, body):
        print(body.decode())
        # Acknowledge only after the message has been processed
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # channel.basic_qos(prefetch_count=1)
    # Tell RabbitMQ to deliver messages from the queue to callback
    channel.basic_consume(
        result.method.queue,
        callback,
        # auto_ack=False: an unacknowledged message is returned to the queue.
        # auto_ack=True: the message counts as consumed whether or not callback succeeds.
        auto_ack=False)
    channel.start_consuming()

Mode 3: topic

This mode is similar to mode 2: the exchange also forwards messages to queues according to the routing key routing_key. The difference is that the routing_key used for binding may contain wildcards for fuzzy matching. These are not regular expressions: a routing key is a list of words separated by dots, "#" matches zero or more words, and "*" matches exactly one word.

For example, binding with routing_key = "#.orderid.#" forwards to the queue every message whose routing key contains the word "orderid", such as "shop.orderid.created". The code is similar to mode 2, so it is not repeated here.
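To make the wildcard rules concrete, here is a small sketch of topic matching in plain Python. It is a simplified model written for this article, not RabbitMQ's implementation; the function name `topic_matches` and the sample keys are assumptions.

```python
def topic_matches(pattern, routing_key):
    """Check a dot-separated routing key against a topic binding pattern.

    '*' matches exactly one word and '#' matches zero or more words.
    """
    pattern_words = pattern.split(".")
    key_words = routing_key.split(".")

    def match(i, j):
        if i == len(pattern_words):
            # Pattern exhausted: succeed only if the key is exhausted too
            return j == len(key_words)
        if pattern_words[i] == "#":
            # '#' absorbs zero words (skip it) or one more word (stay on it)
            return match(i + 1, j) or (j < len(key_words) and match(i, j + 1))
        if j < len(key_words) and pattern_words[i] in ("*", key_words[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("#.orderid.#", "shop.orderid.created"))
print(topic_matches("*.orderid", "shop.orderid"))
print(topic_matches("#.orderid.#", "shop.myorderid"))
```

Matching works on whole words, so "#.orderid.#" does not match a key in which "orderid" is only part of a word. In the pika code of mode 2, switching to this mode means declaring the exchange with exchange_type='topic' and passing such a pattern as routing_key to queue_bind.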

That covers the three working modes of RabbitMQ when operated from Python: fanout, direct, and topic.
