In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-02 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >
Share
Shulou(Shulou.com)05/31 Report--
How to implement a message queue using Redis flow? I believe that many inexperienced people are at a loss about this, so this article summarizes the causes and solutions of the problem. Through this article, I hope you can solve this problem.
Listing 10-1 shows a message queuing implementation with basic functions:
At the beginning of the code are several conversion functions that are responsible for converting and formatting the relevant inputs and outputs of the program.
The MessageQueue class is used to implement message queues. Its three methods of adding messages, removing messages and returning messages use the stream's XADD command, XDEL command and XLEN command, respectively.
The two fetching methods get_message () and get_by_range () of the message queue call the XRANGE command of the stream in two forms, respectively.
Finally, the iterate () method for iterating messages uses the XREAD command stream to iterate.
Listing 10-1 message queuing using Redis flow: / stream/message_queue.py
Def reconstruct_message_list (message_list): "" in order for multiple messages to be returned to the caller in a more structured manner, multiple messages returned by Redis are changed from the original format: [(id1, {k1:v1, k2:v2,...}), (id2, {k1:v1, k2:v2,...}),...] Convert to the following format: [{id1: {k1:v1, k2:v2,...}}, {id2: {k1:v1, k2:v2,...}},...] "" result = [] for id, kvs in message_list: result.append ({id: kvs}) return resultdef get_message_from_nested_list (lst): "" fetches the message ontology from the nested list. "" return lst [0] [1] class MessageQueue: "" message queuing implemented using Redis flows. "" def _ init__ (self, client, stream_key): self.client = client self.stream = stream_key def add_message (self, key_value_pairs): "stores the given key-value pair in the message and returns the corresponding message ID. "" return self.client.xadd (self.stream, key_value_pairs) def get_message (self, message_id): "" returns the corresponding message based on the given message ID, or None if the message does not exist. "" reply = self.client.xrange (self.stream, message_id, message_id) if len (reply) = = 1: return get_message_from_nested_list (reply) def remove_message (self, message_id): "" deletes the corresponding message based on the given message ID, and ignores the action if the message does not exist. "" self.client.xdel (self.stream, message_id) def len (self): "" returns the length of the message queue. "" return self.client.xlen (self.stream) def get_by_range (self, start_id, end_id, max_item=10): "" returns messages in the queue based on a given ID interval range. "" reply = self.client.xrange (self.stream, start_id, end_id, max_item) return reconstruct_message_list (reply) def iterate (self, start_id=0, max_item=10): "" iterates through the message queue and returns up to N messages larger than a given ID. "" reply = self.client.xread ({self.stream: start_id}, max_item) if len (reply) = = 0: return list () else: messages = get_message_from_nested_list (reply) return reconstruct_message_list (messages)
For this message queue implementation, we can create an instance of it by executing the following code:
> from redis import Redis > from message_queue import MessageQueue > client = Redis (decode_responses=True) > mq = MessageQueue (client, "mq")
Then add ten messages to the queue by executing the following code:
For i in range (10):... Key = "key {0}" .format (I)... Value = "value {0}" .format (I)... Msg = {key:value}... Mq.add_message (msg)... '1554113926280-1, 1554113926281-0, 1554113926281-1, 1554113926281-2, 1554113926281-3, 1554113926281-4, 1554113926281-5, 1554113926281-6, 1554113926282-0'
You can also get a specified message based on ID, or you can use the get_by_range () method to get multiple messages at the same time:
> > mq.get_message ('1554113926280-0') {' key0': 'value0'} > mq.get_message (' 1554113926280-1') {'key1':' value1'} > mq.get_by_range ("-", "+", 3) [{'1554113926280-0: {' key0': 'value0'}}, {' 1554113926280-1: {'key1':' value1'}, {'1554113926281-0: {' key2': 'value2'}}]
Or use the iterate () method to iterate over the message queue, and so on:
> > mq.iterate (0,3) [{'1554113926280-0cycles: {' key0': 'value0'}}, {' 1554113926281-0levels: {'key1':' value1'}}, {'1554113926281-0levels: {' key2': 'value2'}}] > > mq.iterate (' 1554113926281-0levels, 3) [{'1554113926281-1 steps: {' key3': 'value3'}}, {' 1554113926281-2levels: {'key4':' value4'} {'1554113926281-3 hours: {' key5': 'value5'}}] finish reading the above content Have you mastered how to implement a message queue using Redis streams? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.