How to Use Streams as a Message Queue in Redis 5

2025-01-16 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 05/31

This article looks at how to use Streams, one of the new features of Redis 5, as a message queue, by walking through its commonly used commands.

Preface

Among the new features of Redis 5, the Streams data structure is arguably the most significant addition in this release. It gives Redis better and more powerful native support when it is used as a message queue, especially a persistent one. Streams also borrows the concept and design of Kafka's consumer group model, which makes message consumption more efficient. This article analyzes the data structure through its commonly used API.

Preparation

The version of Redis used in this article is 5.0.5. If you use an earlier 5.x version, the behavior of some commands may differ slightly from what is described here.

Add message

Messages are added to a stream with the XADD command, and the data in a message is stored as K-V (key-value) pairs. A message can contain multiple key-value pairs. The command format is:

XADD key ID field string [field string...]

Here key is the name of the stream, ID is the unique identifier of the message and cannot be repeated, and field string is a key-value pair. Let's add a stream named person to work with.

XADD person * name ytao des https://ytao.top

In the example above, the ID is given as *, which means that the server generates the ID automatically. The command returns the generated ID, for example "1578238486193-0".

The automatically generated ID has the format <millisecondsTime>-<sequenceNumber>, which consists of two parts:

millisecondsTime is the current server time as a millisecond timestamp.

sequenceNumber is the sequence number of the message within the current millisecond. It reflects the order in which messages are generated within that millisecond, starting from 0 and increasing by 1.

For example, 1578238486193-3 represents the fourth message added at the timestamp of 1578238486193 milliseconds.
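The two-part ID described above can be modelled in a few lines of Python. This is only a sketch of the rule as the article states it, not Redis source code: the names parse_id, format_id and next_auto_id are hypothetical helpers, and the current time is passed in as an argument so the behavior is easy to follow.

```python
from typing import Optional, Tuple

StreamId = Tuple[int, int]  # (millisecondsTime, sequenceNumber)


def parse_id(text: str) -> StreamId:
    """Split an ID such as "1578238486193-3" into its two numeric parts."""
    ms, seq = text.split("-")
    return int(ms), int(seq)


def format_id(stream_id: StreamId) -> str:
    """Join the two parts back into the textual form used by Redis."""
    return f"{stream_id[0]}-{stream_id[1]}"


def next_auto_id(now_ms: int, last_id: Optional[StreamId]) -> StreamId:
    """Model of the ID chosen for XADD with * (hypothetical helper)."""
    if last_id is None or now_ms > last_id[0]:
        # First message in this millisecond: the sequence starts at 0.
        return (now_ms, 0)
    # The previous message already used this millisecond:
    # keep the timestamp and increase the sequence by 1.
    return (last_id[0], last_id[1] + 1)
```

For instance, calling next_auto_id four times with the same now_ms, feeding each result back in as last_id, yields sequence numbers 0 through 3, which matches the "fourth message" reading of 1578238486193-3.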

Besides IDs generated by the server, you can also specify the ID yourself, subject to the following restrictions:

Both the first and second halves of the ID must be numeric.

The minimum ID is 0-1. 0-0 is not allowed, but 2-0, 3-0, and so on are allowed.

For a newly added message, the first half of the ID cannot be smaller than the first half of the largest existing ID, and when the first halves are equal, the second half must be larger than the second half of the largest existing ID.

If these conditions are not met, the command returns an error:

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
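The restrictions above amount to "the new ID must be strictly greater than the largest existing ID". The following Python sketch models that check under the rules stated in this article; check_explicit_id is a hypothetical helper that mimics the server-side validation and does not talk to Redis.

```python
from typing import Optional, Tuple


def check_explicit_id(new_id: str, top_id: Optional[str]) -> Tuple[int, int]:
    """Validate a caller-supplied ID against the largest ID in the stream.

    top_id is None for an empty stream. Raises ValueError when the ID
    would be rejected under the rules described in the article.
    """
    parts = new_id.split("-")
    if len(parts) != 2 or not all(p.isdecimal() for p in parts):
        raise ValueError("both halves of the ID must be numeric")
    candidate = (int(parts[0]), int(parts[1]))

    # An empty stream behaves as if its top ID were 0-0, so 0-0 is rejected
    # and 0-1 is the smallest ID that can be added.
    top = (0, 0)
    if top_id is not None:
        top_ms, top_seq = top_id.split("-")
        top = (int(top_ms), int(top_seq))

    # Tuples compare the first half first, then the second half.
    if candidate <= top:
        raise ValueError(
            "ERR The ID specified in XADD is equal or smaller than "
            "the target stream top item"
        )
    return candidate
```

With a stream whose largest ID is 0-2, the IDs 0-3 and 2-0 pass this check, while 0-2 and 0-1 are rejected.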

Adding a message actually involves two steps: first, if the stream does not exist, it is created under the given name; then the message is added to it. As a result, even when adding a message fails because of an ID error, the stream name may already exist in Redis. Because IDs are ordered, an ID in a stream can also be used as a pointer.

In production, adding messages this way has a problem: if the number of messages grows too large, it can bring down the service. The design of Streams takes this into account by letting you specify a capacity for the stream. When the number of messages exceeds the configured value, the oldest messages are evicted. To do this, set the MAXLEN parameter when adding a message.

XADD person MAXLEN 5 * name ytao des https://ytao.top

This limits the stream to 5 messages. You can also use XTRIM to trim the stream, evicting excess messages starting from the smallest ID:

XTRIM person MAXLEN 8
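To make the eviction order concrete, here is a small Python model of trimming to a maximum length. It is a sketch that assumes the entries are held in a plain list sorted by ascending ID; trim_to_maxlen is a hypothetical helper, not a Redis or client-library function.

```python
from typing import Dict, List, Tuple

Entry = Tuple[str, Dict[str, str]]  # (message ID, field-value pairs)


def trim_to_maxlen(entries: List[Entry], maxlen: int) -> int:
    """Drop the oldest entries in place until at most maxlen remain.

    Returns the number of evicted entries, in the spirit of the
    integer reply of XTRIM.
    """
    excess = max(len(entries) - maxlen, 0)
    # The list is ordered by ID, so the oldest messages sit at the front.
    del entries[:excess]
    return excess
```

Trimming a list of eight entries to a maximum length of 5 removes the three entries with the smallest IDs, and trimming again with a larger limit removes nothing.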

Number of messages

Use the XLEN command to check the number of messages in a stream.

XLEN key

Example: check the number of messages in the person stream:

> XLEN person
(integer) 5

Query message

XRANGE and XREVRANGE directives are used to query messages in Streams.

XRANGE

When querying data, you can query according to the specified Id range. XRANGE query instruction format:

XRANGE key start end [COUNT count]

Parameter description:

key is the name of the stream.

start is the first ID of the range query, inclusive.

end is the last ID of the range query, inclusive.

count is the maximum number of messages to return. It is optional.

start and end also accept two special values, - and +, which stand for the smallest and largest possible IDs respectively, so using both of them queries all messages.

> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"

As the result shows, the messages are returned in first-in, first-out order, that is, by ascending ID.

Use COUNT to specify the number of queries to return:

# query all messages but return only one
> XRANGE person - + COUNT 1
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

In a range query, the second half of the ID can be omitted, in which case every message whose ID has that first half is matched.
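The range rules (inclusive bounds, the special values - and +, an optional COUNT, and IDs with the second half omitted) can be sketched as a small in-memory model. This is an illustration under the assumption that entries are kept sorted by ID; xrange_model and its helpers are hypothetical names and do not call Redis.

```python
from typing import Dict, List, Optional, Tuple

Entry = Tuple[str, Dict[str, str]]  # (message ID, field-value pairs)
MAX_PART = 2**64 - 1                # largest value either half of an ID can take


def _parse(entry_id: str) -> Tuple[int, int]:
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def _bound(text: str, is_start: bool) -> Tuple[int, int]:
    """Turn a start/end argument into a comparable (ms, seq) pair."""
    if text == "-":
        return (0, 0)
    if text == "+":
        return (MAX_PART, MAX_PART)
    if "-" in text:
        return _parse(text)
    # Second half omitted: cover every sequence number of that first half.
    return (int(text), 0 if is_start else MAX_PART)


def xrange_model(entries: List[Entry], start: str, end: str,
                 count: Optional[int] = None) -> List[Entry]:
    """Return entries with start <= ID <= end, at most count of them."""
    low, high = _bound(start, True), _bound(end, False)
    hits = [e for e in entries if low <= _parse(e[0]) <= high]
    return hits if count is None else hits[:count]
```

With the three messages used in this article (0-1, 0-2 and 2-0), a query from 0 to 0 matches both 0-1 and 0-2, while a query from 2 to + matches only 2-0.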

XREVRANGE

The query for XREVRANGE is similar to that used in the XRANGE directive, but the order of the start and end parameters of the query is reversed:

XREVRANGE key end start [COUNT count]

Example:

> XREVRANGE person + -
1) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

The result is in the reverse order of XRANGE, and everything else is the same. Together, the two commands return messages in ascending and descending order.

Delete message

Delete messages using the XDEL directive. You only need to specify the Streams name and Id to be deleted. Multiple messages can be deleted at a time.

XDEL key ID [ID...]

Deletion example:

# query all messages
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"

# delete a message
> XDEL person 2-0
(integer) 1

# query all messages again after the deletion
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"

# check the length after the deletion
> XLEN person
(integer) 2

As you can see from above, after the message is deleted, the length is also reduced by the corresponding number.

Consuming messages

With Redis PUB/SUB, we consume messages through subscriptions, and Streams can provide the same capability. When there are no new messages, a consumer can block and wait. Streams supports both individual consumption and group consumption.

Individual consumption

Use the XREAD command for individual consumption. In the command below, STREAMS, key, and ID are required. ID means that messages with an ID greater than the given one are read. When ID is given as $, it stands for the largest ID that already exists in the stream, so only messages added afterwards are read.

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key...] ID [ID...]

The above COUNT parameter is used to specify the maximum number of reads, just like XRANGE.

> XREAD COUNT 1 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"

> XREAD COUNT 2 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"
      2) 1) "0-2"
         2) 1) "name"
            2) "luffy"
            3) "des"
            4) "valiant!"

XREAD also has a BLOCK parameter, which makes the read block while waiting for messages. The value of BLOCK is the blocking time in milliseconds. If no new message arrives within this time, the call stops blocking and returns. When the time is specified as 0, the call blocks until a new message arrives.

# window 1: block and wait for new messages to arrive
> XREAD BLOCK 0 STREAMS person $

# window 2 (another connection): add a new message
> XADD person 2-2 name tao des coder
"2-2"

# window 1: the new message is received, along with the time spent blocking
> XREAD BLOCK 0 STREAMS person $
1) 1) "person"
   2) 1) 1) "2-2"
         2) 1) "name"
            2) "tao"
            3) "des"
            4) "coder"
(60.81s)

When using XREAD for sequential consumption, you need to record the ID of the last message read so that the next read can continue from that position.
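Keeping track of the last ID might look like the following Python sketch. It assumes a redis-py style client, for example redis.Redis(decode_responses=True), whose xread method takes a dict of stream names to IDs and returns a list of [stream, [(id, fields), ...]] pairs; the function name read_new_messages and the handle callback are hypothetical.

```python
from typing import Any, Callable, Dict


def read_new_messages(client: Any, stream: str, last_id: str,
                      handle: Callable[[str, Dict[str, str]], None],
                      count: int = 10, block_ms: int = 1000) -> str:
    """Read messages after last_id, process them, and return the new last ID.

    client is assumed to expose xread(streams, count=..., block=...)
    in the style of redis-py. Nothing is read if the call times out.
    """
    reply = client.xread({stream: last_id}, count=count, block=block_ms)
    for _name, messages in reply or []:
        for message_id, fields in messages:
            handle(message_id, fields)
            # Remember how far we got so the next call resumes from here.
            last_id = message_id
    return last_id
```

A caller would typically start with last_id set to "0" (read everything) or "$" (only new messages), store the returned value somewhere durable, and pass it back in on the next call.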

Group consumption

The main purpose of group consumption is to distribute messages across different clients so that they are processed faster. To achieve this, we need to do three things: create a group, read messages through the group, and acknowledge processed messages to the server.

Group operation

The operation group uses the XGROUP instruction:

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

In the above command, the actions included are:

CREATE creates a consumer group.

SETID changes the ID from which the group delivers the next message.

DESTROY destroys the consumer group.

DELCONSUMER deletes the specified consumer in the consumer group.

What we need to use now is to create a consumer group:

# start consuming after the largest existing ID
> XGROUP CREATE person group1 $
OK

Group read message

Group reads use the XREADGROUP command. COUNT and BLOCK work as they do in XREAD, but a group and a consumer must also be specified:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key...] ID [ID...]

Since group consumption is similar to individual consumption, only the blocking case is analyzed here. The ID also has a special value here, >, which means messages that have not yet been delivered to any consumer in the group:

# window 1: consumer taotao in the group blocks and waits
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >

# window 2: consumer yangyang in the group blocks and waits
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >

# window 3: add a message
> XADD person 3-1 name tony des 666
"3-1"

# window 1: the new message is read; window 2 receives nothing at this point
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-1"
         2) 1) "name"
            2) "tony"
            3) "des"
            4) "666"
(77.54s)

# window 3: add another message
> XADD person 3-2 name james des abc!
"3-2"

# window 2: the new message is read; window 1 receives nothing at this point
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"
(76.36s)

In the above execution process, there are two consumers in the group1 group, and when two messages are added, the two consumers consume in turn.
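The key property shown above is that each message is handed to only one consumer of the group. The toy Python model below illustrates that bookkeeping; GroupModel is a hypothetical class written for this explanation, it ignores blocking entirely, and it is not how Redis is implemented.

```python
from typing import Dict, List, Tuple

Id = Tuple[int, int]  # (first half, second half) of a message ID


class GroupModel:
    """Toy model of a single consumer group (hypothetical, not a Redis API)."""

    def __init__(self, last_delivered: Id) -> None:
        # Position chosen when the group is created, e.g. the top ID for $.
        self.last_delivered = last_delivered
        # Which consumer each delivered message was handed to.
        self.owner: Dict[Id, str] = {}

    def read_new(self, stream_ids: List[Id], consumer: str,
                 count: int = 1) -> List[Id]:
        """Model of reading with the special ID > for one consumer."""
        fresh = [i for i in sorted(stream_ids) if i > self.last_delivered]
        fresh = fresh[:count]
        for message_id in fresh:
            self.owner[message_id] = consumer
            # Advancing the position hides the message from other consumers.
            self.last_delivered = message_id
        return fresh
```

In this model, once one consumer has read message (3, 1), a read by another consumer returns nothing until a newer message such as (3, 2) is added.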

Message ACK

After a message has been consumed, an ACK must be sent to the server to confirm that it has been processed, which avoids repeated consumption. Reading with an ID such as 0 instead of > returns the messages that were delivered to this consumer but not yet acknowledged. For example, in the following case the message has already been consumed, but it is still returned when we read again:

> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"

At this point, we use the XACK directive to tell the server that we have processed the message:

XACK key group ID [ID...]

Tell the server that 3-2 has been processed:

> XACK person group1 3-2
(integer) 1

Read from the group again:

> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) (empty list or set)

There are no readable messages in the queue. In addition to the API described above, the consumer group information can be viewed using the XINFO directive.
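Putting the read and acknowledge steps together, a consumer's work cycle could be sketched in Python as follows. The sketch assumes a redis-py style client whose xreadgroup(groupname, consumername, streams, count=..., block=...) and xack(name, groupname, *ids) methods behave as in redis-py; consume_and_ack and the handle callback are hypothetical names.

```python
from typing import Any, Callable, Dict


def consume_and_ack(client: Any, stream: str, group: str, consumer: str,
                    handle: Callable[[str, Dict[str, str]], None],
                    count: int = 10, block_ms: int = 1000) -> int:
    """Read undelivered messages for this consumer, process and ACK them.

    Returns the number of messages acknowledged. client is assumed to
    follow the redis-py method signatures for xreadgroup and xack.
    """
    reply = client.xreadgroup(group, consumer, {stream: ">"},
                              count=count, block=block_ms)
    acked = 0
    for _name, messages in reply or []:
        for message_id, fields in messages:
            # If handle raises, the message is not acknowledged and can be
            # fetched again later by reading with ID 0.
            handle(message_id, fields)
            acked += client.xack(stream, group, message_id)
    return acked
```

Acknowledging only after the handler returns is a design choice in this sketch: it trades possible reprocessing after a crash for not losing messages that were read but never handled.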

Having gone through the commonly used Streams API, we can see that Redis keeps getting more capable as a message queue. If you have used its PUB/SUB feature, you will find that the 5.x release addresses some of its pain points.
