How to use Redis's streams


This article introduces Redis streams: where the design came from, and how to use the new data type and its commands.

Origin

With the introduction of modules in Redis 4.0, users began to look at how they could fix these problems themselves. One of them, Timothy Downs, wrote to me over IRC:

"I plan to add a transaction-log-style data type to this module, meaning that a very large number of subscribers can do something like publish/subscribe without a big growth in Redis memory: subscribers keep their own position in the message queue, instead of having Redis maintain each consumer's position and duplicate the messages for every subscriber."

His line of thought inspired me. I thought about it for a few days and realized that this might be an opportunity to solve all of the problems above at once. I needed to rethink what a "log" is. The log is a basic programming element that everyone has used, because it is as simple as opening a file in append mode and writing data in some format. Redis data structures, however, must be abstract. They live in memory, and we use memory not because we are lazy, but because by using pointers we can conceptualize data structures and free them from obvious limitations. For example, a plain log has several problems: the offset is not logical but an actual byte offset. What if we want a logical offset related to the time an entry was inserted? Then range queries come for free. Similarly, logs are hard to garbage collect: how do you delete old elements from an append-only data structure? In our idealized log, we would simply say "keep at most this many entries" and let the older ones go, and so on.

Inspired by Timothy's idea, I tried to write a specification. I implemented it with the radix tree used in Redis Cluster, optimizing some of its internals. This provided the basis for a log that uses space very efficiently while still allowing range access in logarithmic time. Meanwhile, I started reading about Kafka streams for additional inspiration, which also fit my design very well; finally, I borrowed the concept of Kafka consumer groups, again adapting it to Redis's in-memory use case. However, the specification stayed on paper, and after a while I rewrote it almost from scratch to incorporate many of the suggestions that came out of discussions with others. I want Redis streams to be a useful feature for time series, not just for the usual class of events and messages.

Let's write some code.

Since I got back from the Redis conference, I had been spending the whole summer implementing a library called listpack. This library is the successor to ziplist.c, a data structure that represents a list of string elements inside a single allocation. It is a very specialized serialization format, whose peculiarity is that it can also be parsed in reverse order (from right to left), which is needed to replace ziplists in all their use cases.

Combining the radix tree with listpacks, it is easy to build a space-efficient log that is also indexable, meaning random access by ID and by time is possible. With these in place, I started writing some code to implement the stream data structure. I am still finishing the implementation, but at any rate it already runs in the streams branch of Redis on GitHub. I don't claim that the API is 100% final, but there are two interesting facts: first, at that point only consumer groups were missing, plus a few less important commands for manipulating streams, but all the big parts were already implemented. Second, I decided to take about two months to backport all of the stream features to the 4.0 branch once everything is stable. This means Redis users who want to use streams will not have to wait for Redis 4.2 to be released to use them in production. This is possible because, being a brand-new data structure, almost all the code changes are confined to new code. The one exception is the blocking list operations: the code was refactored so that streams and lists share the same blocking code, which greatly simplified the Redis internals.

Tutorial: welcome to Redis streams

In some ways, you can think of a stream as an augmented version of a Redis list. Stream elements are no longer single strings; they are objects made of fields and values. Range queries are more capable and faster. Each entry in a stream has an ID, which is a logical offset. Different clients can block, waiting for elements with IDs greater than a specified one. A fundamental command of Redis streams is XADD. Yes, all Redis stream commands are prefixed with an X.

> XADD mystream * sensor-id 1234 temperature 10.5
1506871964177.0

This XADD command appends the specified entry as a new element of the stream "mystream". The entry in the example above has two fields, sensor-id and temperature, and each entry in the same stream may have different fields. Using the same field names lets memory be used more efficiently. Interestingly, the ordering of the fields is guaranteed to be preserved. XADD returns the ID of the inserted entry, because the third argument is an asterisk (*), which tells the command to auto-generate the ID. This is almost always what you want, but it is also possible to force a specific ID, which is used to replicate the command to slave servers and to the AOF (append-only file).
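For instance, here is a minimal sketch of forcing an explicit ID, using the ms.seq notation from this article (the ID value is invented for illustration; since stream IDs always increase, it must be greater than the last ID already in the stream):

> XADD mystream 1506871964178.0 sensor-id 1234 temperature 10.6
1506871964178.0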

The ID consists of two parts: a millisecond time and a sequence number. 1506871964177 is the millisecond time; it is just a millisecond-precision UNIX timestamp. The 0 after the dot (.) is the sequence number, which is added to distinguish entries created within the same millisecond. Both numbers are 64-bit unsigned integers. This means we can add as many items as we like to a stream, even within the same millisecond. The millisecond part of the ID is the maximum of the current local time of the Redis server and the millisecond part of the last entry ID in the stream. So, even if the computer clock jumps backward, IDs still keep increasing. In a sense, you can think of stream entry IDs as whole 128-bit numbers. In practice, however, they are correlated with the local time of the instance where they were added, which means we get range queries with millisecond precision essentially for free.

As you might expect, if I add two entries in a row fast enough, only the sequence number is incremented. We can easily simulate this "fast insertion" with a MULTI/EXEC block:

> MULTI
OK
> XADD mystream * foo 10
QUEUED
> XADD mystream * bar 20
QUEUED
> EXEC
1) 1506872463535.0
2) 1506872463535.1

The example above also shows that different entries can use different fields without declaring any initial schema. What happens internally? As mentioned earlier, only the fields of the first message of each block (a block usually holds 50-150 messages) are stored in full. Successive entries that have the same fields are compressed with a flag meaning "same fields as the first entry in this block." As a result, runs of consecutive messages with the same fields save a lot of memory, even if the set of fields changes slowly over time.

There are two ways to retrieve data from a stream: range queries, via the XRANGE command, and streaming, via the XREAD command. XRANGE simply fetches all the entries from start to stop. So, for example, if I know its ID, I can fetch a single entry with the following command:

> XRANGE mystream 1506871964177.0 1506871964177.0
1) 1) 1506871964177.0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "10.5"

In any case, you can use the special start symbol - and the special stop symbol + to signify the minimum and maximum IDs. You can also use the COUNT option to limit the number of entries returned. Here is a more complex XRANGE example:

> XRANGE mystream - + COUNT 2
1) 1) 1506871964177.0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "10.5"
2) 1) 1506872463535.0
   2) 1) "foo"
      2) "10"

So far we have specified ranges by ID. To fetch elements within a given time range instead, you can call XRANGE with the "sequence number" part of the ID omitted and specify only the "millisecond" time. For example, the following command means "give me up to 10 entries starting from UNIX time 1506872463":

127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 10
1) 1) 1506872463535.0
   2) 1) "foo"
      2) "10"
2) 1) 1506872463535.1
   2) 1) "bar"
      2) "20"

The most important thing to note about XRANGE is that, since we receive the IDs in the reply, and the immediately successive ID is trivially obtained by incrementing the sequence part, it is possible to use XRANGE to iterate over the whole stream, receiving a specified number of elements per call. After the SCAN family of commands in Redis, which allows iteration of Redis data structures even though they were not designed to be iterated, I made sure not to repeat the same mistake.
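For example, continuing the two-entry XRANGE above: the last ID received was 1506872463535.0, so incrementing its sequence part gives the next starting ID:

> XRANGE mystream 1506872463535.1 + COUNT 2
1) 1) 1506872463535.1
   2) 1) "bar"
      2) "20"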

Handling streaming with XREAD: blocking for new data

XRANGE is perfect when we want to access a stream by a range of IDs or times, or to fetch single elements by ID. However, in the case of streams whose data must be consumed by different clients as it arrives, it is not a good solution, because it would require some form of polling. (That may still be fine for some applications that only connect occasionally to fetch data.)

The XREAD command is designed to read from multiple streams at the same time, specifying only the ID of the last entry we obtained from each stream. In addition, we can ask to block if no data is available, and unblock as soon as data arrives. This is similar to blocking list operations, but here data is not consumed from the stream, and multiple clients can access the same data at the same time.

Here is a typical example of an XREAD call:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ $

It means: fetch data from mystream and otherstream, and if no data is available, block the client for 5000 milliseconds. After the STREAMS option we specify the keys we want to listen on, and then the IDs we want to listen from. The special ID $ means: assume I already have all the elements currently in the stream, so give me only the elements that arrive from now on.

If I send a command like this from another client:

> XADD otherstream * message "Hi There"

What happens on the XREAD side?

1) 1) "otherstream"
   2) 1) 1) 1506935385635.0
         2) 1) "message"
            2) "Hi There"

Along with the data, we also receive the key of the stream it came from. In the next call, we will use the ID of the last message received:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0

And so on. However, note one important aspect of how this is used: a client may reconnect after a very long delay (because it took time to process messages, or for any other reason). In that case a lot of messages will have piled up in the meantime, so it is wise to use the COUNT option of XREAD to make sure the client is not flooded with messages and the server does not waste too much time serving a huge batch of messages to a single client.
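For example, here is a sketch of the previous call capped at 100 messages per reply (assuming the COUNT option can be combined with BLOCK in a single XREAD call):

> XREAD COUNT 100 BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0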

Capping streams

So far so good. Sometimes, however, a stream needs to delete old messages. Fortunately, this can be done with the MAXLEN option of the XADD command:

> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2

It basically means: if, after adding the new element, the stream is found to contain more than 1000000 messages, remove old messages until the total number of elements drops back within 1000000. It is very similar to using RPUSH + LTRIM on a list, but this time it is a built-in mechanism. Note, however, that this implies extra work to evict old messages from the stream every time we add a new one, which costs some CPU. So whenever possible, put the ~ symbol before the MAXLEN count to signal that we do not require exactly 1000000 messages; a few more is not a problem:

> XADD mystream MAXLEN ~ 1000000 * foo bar

XADD in this form removes old messages only when it can remove a whole node. Compared with an ordinary capped XADD, this makes capping the stream almost free.

Consumer groups (under development)

This is the first feature of Redis streams that has not yet been implemented and is still under development. The inspiration also comes from Kafka, although here it works differently. The point is that with XREAD, a client can also supply a GROUP option. All clients in the same group automatically receive different messages. Of course, the same stream can be read by multiple groups, in which case every group receives its own copy of the messages arriving in the stream. But within each group, no message is delivered twice.

When a group is specified, a RETRY option can be given as well: in this case, if a message is not acknowledged with XACK, it is delivered again after the specified number of milliseconds. This provides better delivery reliability for cases where the client has no private way to mark a message as processed. This part is also under development.
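Purely as an illustration, here is a hypothetical sketch of what such a call might look like, pieced together from the GROUP and RETRY descriptions above; since the feature was still under development, the real syntax may well differ:

> XREAD GROUP mygroup RETRY 5000 BLOCK 5000 STREAMS mystream $

Again, this is only a guess at the shape of the API, not a finalized command.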

Memory usage and saving/loading times

Because of the design used to model Redis streams, memory usage is remarkably low. Depending on the number of fields and values and their lengths, simple messages come out to a few million messages per 100 MB of memory. Moreover, the format requires very little serialization: listpack blocks, which are stored as radix tree nodes, have the same representation on disk and in memory, so they are trivially stored and read. For example, Redis can read 5 million entries from an RDB file in 0.3 seconds. This makes replication and persistence of streams very efficient.

I also plan to allow partial deletion of entries from the middle of a stream. This is only partially implemented for now; the strategy is to mark an entry as deleted with a flag, and when the ratio of deleted entries to all entries reaches a given value, the block is rewritten to reclaim the memory, glued to an adjacent block if necessary to avoid fragmentation.

