2025-03-26 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
How do you use RedisTemplate in Spring Boot to re-consume un-ACKed messages in a Redis Stream? Many inexperienced users are at a loss here, so this article summarizes the causes of the problem and its solutions. I hope it helps you solve the problem.
After a consumer group fetches a message from the stream, the message is assigned to one of the consumers in the group. Once it has finished processing, that consumer must return an ACK to the consumer group, indicating the message has been consumed.
When a consumer receives a message from the consumer group, the message is first added to the consumer's own pending message list; when the consumer returns an ACK to the group, the message is removed from that pending queue. (Each consumer has its own pending message queue.)
A consumer may fail to ACK in time. For example, if the consumer crashes after processing and never returns the ACK, the message ends up occupying memory twice: once in the stream and once in the consumer's pending message list.
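To make that lifecycle concrete, here is a minimal plain-Java model (all class and method names are invented for illustration; this is not a Redis client API) of how delivery and ACK move a message in and out of a per-consumer pending list:

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Toy model of a consumer group's pending-entries list (PEL).
// Invented names for illustration only; not the real Redis API.
class PendingListDemo {
    static class Group {
        // message IDs delivered to each consumer but not yet ACKed
        Map<String, LinkedHashSet<String>> pending = new HashMap<>();

        // XREADGROUP: deliver a message to a consumer; it enters that consumer's PEL
        void deliver(String consumer, String messageId) {
            pending.computeIfAbsent(consumer, c -> new LinkedHashSet<>()).add(messageId);
        }

        // XACK: remove the message from the consumer's PEL
        boolean ack(String consumer, String messageId) {
            Set<String> pel = pending.getOrDefault(consumer, new LinkedHashSet<>());
            return pel.remove(messageId);
        }

        int pendingCount(String consumer) {
            return pending.getOrDefault(consumer, new LinkedHashSet<>()).size();
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.deliver("my_consumer1", "1605524648266-0");
        group.deliver("my_consumer1", "1605524665215-0");
        group.deliver("my_consumer2", "1605524657157-0");

        // consumer 1 ACKs one message; the other two stay pending
        group.ack("my_consumer1", "1605524648266-0");

        System.out.println(group.pendingCount("my_consumer1")); // 1
        System.out.println(group.pendingCount("my_consumer2")); // 1
    }
}
```

Crash recovery falls out of the model: if a consumer dies, its pending set simply keeps its entries until someone re-reads or claims them.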
Before starting, simulate a little data through the Redis client.

1. Open a Redis client (we will call it the production side) and create a stream called my_stream:

XADD my_stream * hello world

Adding an arbitrary message is enough to initialize the stream.

2. Create a consumer group called my_group:

XGROUP CREATE my_stream my_group $

3. Start a new Redis client (we will call it consumer side 1) and consume in blocking mode through the consumer group, specifying the consumer my_consumer1:

XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream >

4. Start another Redis client (we will call it consumer side 2) and consume in blocking mode through the consumer group, specifying the consumer my_consumer2:

XREADGROUP GROUP my_group my_consumer2 BLOCK 0 STREAMS my_stream >

5. Push 3 messages from the production side:

XADD my_stream * message1 Hello
XADD my_stream * message2 SpringBoot
XADD my_stream * message3 Community
[Screenshots omitted: output on consumer side 1 and consumer side 2]
As you can see, a total of three messages have been pushed, with the IDs:
1605524648266-0 (message1)
1605524657157-0 (message2)
1605524665215-0 (message3)
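As an aside, these IDs follow the stream entry format <millisecondsTime>-<sequenceNumber>, ordered by timestamp first and sequence number second. A small self-contained sketch of parsing and comparing them (the StreamId class is invented for illustration; it is not part of any Redis client):

```java
// Parse and compare Redis stream entry IDs of the form "<millis>-<seq>".
// Illustrative helper, not part of any Redis client API.
class StreamId implements Comparable<StreamId> {
    final long millis;
    final long seq;

    StreamId(String id) {
        String[] parts = id.split("-", 2);
        this.millis = Long.parseLong(parts[0]);
        this.seq = Long.parseLong(parts[1]);
    }

    @Override
    public int compareTo(StreamId other) {
        // order by timestamp first, then by sequence number
        int byTime = Long.compare(this.millis, other.millis);
        return byTime != 0 ? byTime : Long.compare(this.seq, other.seq);
    }

    public static void main(String[] args) {
        StreamId a = new StreamId("1605524648266-0"); // message1
        StreamId b = new StreamId("1605524657157-0"); // message2
        System.out.println(a.compareTo(b) < 0); // true: message1 was added first
    }
}
```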
The current state: consumer 1 has received two messages (message1 and message3) and consumer 2 has received one (message2). All three were consumed successfully, but none has been ACKed yet.
On the client side, the consumer returns immediately after receiving a message; the command has to be re-executed to go back to the blocking state.
ACK message
Now let's ACK the message1 that consumer 1 consumed:
XACK my_stream my_group 1605524648266-0

Get the messages waiting to be confirmed (ACKed) in the specified consumer group. First, view the summary of all pending messages in the group:

127.0.0.1:6379> XPENDING my_stream my_group
1) (integer) 2          # total pending messages for all consumers in the group
2) "1605524657157-0"    # smallest pending message ID
3) "1605524665215-0"    # largest pending message ID
4) 1) 1) "my_consumer1" # consumer 1
      2) "1"            # has 1 pending message
   2) 1) "my_consumer2" # consumer 2
      2) "1"            # has 1 pending message

Then view the details of consumer 1's pending messages:

127.0.0.1:6379> XPENDING my_stream my_group 0 + 10 my_consumer1
1) 1) "1605524665215-0" # message ID waiting for ACK
   2) "my_consumer1"    # owning consumer
   3) (integer) 847437  # time elapsed since the consumer received the message (ms) - idle time
   4) (integer) 1       # number of times the message has been delivered - delivery counter
This command queries the pending queue of consumer my_consumer1 in consumer group my_group, from ID 0 up to the maximum ID, returning at most 10 results.
The current state: there are three messages in total; consumer 1 consumed two and ACKed one, consumer 2 consumed one and ACKed none. Consumers 1 and 2 therefore each have one un-ACKed message in their pending queues.
How do we re-consume the messages that were never successfully ACKed? The demonstration so far only existed to create some data, which is why raw client commands were used; from here on, everything is done with RedisTemplate from spring-data-redis.
Traverse the consumer's pending list, read the messages that were not ACKed, consume them, and ACK them directly:

import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Test
    public void test() {
        StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();

        // Get the pending message summary for my_group; under the hood this executes XPENDING
        PendingMessagesSummary pendingMessagesSummary = streamOperations.pending("my_stream", "my_group");

        // Total number of pending messages
        long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
        // Consumer group name
        String groupName = pendingMessagesSummary.getGroupName();
        // Smallest ID in the pending queue
        String minMessageId = pendingMessagesSummary.minMessageId();
        // Largest ID in the pending queue
        String maxMessageId = pendingMessagesSummary.maxMessageId();

        LOGGER.info("Consumer group: {}, {} pending messages in total, min ID = {}, max ID = {}",
                groupName, totalPendingMessages, minMessageId, maxMessageId);

        // Number of pending messages per consumer
        Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
        pendingMessagesPerConsumer.entrySet().forEach(entry -> {
            // Consumer name
            String consumer = entry.getKey();
            // Number of pending messages for this consumer
            long consumerTotalPendingMessages = entry.getValue();
            LOGGER.info("Consumer: {}, {} pending messages in total", consumer, consumerTotalPendingMessages);

            if (consumerTotalPendingMessages > 0) {
                // Read up to 10 records from this consumer's pending queue, from ID 0 to the maximum ID
                PendingMessages pendingMessages = streamOperations.pending("my_stream",
                        Consumer.from("my_group", consumer), Range.closed("0", "+"), 10);
                // Iterate over the details of every pending message
                pendingMessages.forEach(message -> {
                    // Message ID
                    RecordId recordId = message.getId();
                    // Time elapsed since the message was delivered to the consumer
                    Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
                    // Number of times the message has been delivered
                    long deliveryCount = message.getTotalDeliveryCount();
                    LOGGER.info("pending message, id = {}, elapsedTimeSinceLastDelivery = {}, deliveryCount = {}",
                            recordId, elapsedTimeSinceLastDelivery, deliveryCount);

                    /*
                     * This check is deliberately specific for the demo: it targets the pending
                     * message with ID 1605524665215-0 owned by consumer "my_consumer1".
                     */
                    if (consumer.equals("my_consumer1") && recordId.toString().equals("1605524665215-0")) {
                        // Read this pending message directly via streamOperations
                        List<MapRecord<String, String, String>> result = streamOperations.range("my_stream",
                                Range.closed("1605524665215-0", "1605524665215-0"));
                        // Start and end are the same ID, so there is exactly one record
                        MapRecord<String, String, String> record = result.get(0);
                        // The log output here stands in for the actual consumption logic
                        LOGGER.info("consumed pending message: id = {}, value = {}",
                                record.getId(), record.getValue());
                        // After successful manual consumption, ACK the message to the consumer group
                        Long retVal = streamOperations.acknowledge("my_group", record);
                        LOGGER.info("message ack, {} messages in total", retVal);
                    }
                });
            }
        });
    }
}
This approach traverses the consumer group's pending summary, then the pending message IDs of each consumer, then reads each message directly from the stream by ID, consumes it, and ACKs it.
Output log:

Consumer group: my_group, 2 pending messages in total, min ID = 1605524657157-0, max ID = 1605524665215-0
Consumer: my_consumer1, 1 pending message in total
pending message, id = 1605524665215-0, elapsedTimeSinceLastDelivery = PT1H9M4.061S, deliveryCount = 1
consumed pending message: id = 1605524665215-0, value = {message3=Community}
message ack, 1 message in total
Consumer: my_consumer2, 1 pending message in total
pending message, id = 1605524657157-0, elapsedTimeSinceLastDelivery = PT1H9M12.172S, deliveryCount = 1
The end result: consumer 1's only pending message has been ACKed. A few points are worth noting.
When traversing a consumer's pending list, the minimum / maximum message IDs can be taken from the output of the XPENDING command. I wrote 0 and + above simply out of laziness.
When traversing a consumer's pending messages, you can apply some logic based on elapsedTimeSinceLastDelivery (idle time) and deliveryCount (delivery counter). A long elapsedTimeSinceLastDelivery means the message was consumed long ago but never ACKed; a high deliveryCount means it still has not been consumed successfully after N re-deliveries (described below). Either may indicate a problem with the consumption logic or with the ACK.
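As a sketch of what such logic could look like, the helper below picks an action from the two attributes alone; the thresholds (5 minutes, 3 deliveries) are assumptions for illustration, not values from this article:

```java
import java.time.Duration;

// A sketch of a retry/dead-letter policy based on XPENDING metadata.
// The thresholds are illustrative assumptions, not values from the article.
class PendingPolicy {
    static final Duration MAX_IDLE = Duration.ofMinutes(5);
    static final long MAX_DELIVERIES = 3;

    enum Action { WAIT, RECLAIM, DEAD_LETTER }

    static Action decide(Duration idle, long deliveryCount) {
        if (deliveryCount > MAX_DELIVERIES) {
            // re-delivered many times and still not ACKed: likely a poison message
            return Action.DEAD_LETTER;
        }
        if (idle.compareTo(MAX_IDLE) > 0) {
            // held too long without an ACK: hand it to another consumer (XCLAIM)
            return Action.RECLAIM;
        }
        return Action.WAIT; // still within its grace period
    }

    public static void main(String[] args) {
        System.out.println(decide(Duration.ofSeconds(30), 1));  // WAIT
        System.out.println(decide(Duration.ofMinutes(10), 2));  // RECLAIM
        System.out.println(decide(Duration.ofMinutes(10), 5));  // DEAD_LETTER
    }
}
```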
Check the XPENDING messages again:

127.0.0.1:6379> XPENDING my_stream my_group
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer2"
      2) "1"
Consumer 1's only message waiting for ACK has been manually consumed and manually ACKed by us, so only consumer 2's single pending message remains.
Changing a message's consumer with XCLAIM
If a consumer keeps failing to consume a certain message, or a consumer for some reason can never come back online, its pending messages can be transferred to another consumer's pending list and re-consumed there.
What we need to do here is hand consumer 2's only pending message, 1605524657157-0 (message2), over to consumer 1 for consumption.
Redis command implementation:

XCLAIM my_stream my_group my_consumer1 10000 1605524657157-0
This reassigns message 1605524657157-0 in my_group to my_consumer1 for re-consumption, provided the message's idle time is greater than 10000 ms (i.e. more than 10 seconds have passed since the message was delivered, with no ACK).
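The min-idle-time argument is the safety valve here: the claim only succeeds if the message has been idle longer than the threshold, a successful claim resets the idle clock, and the delivery counter goes up. A plain-Java model of that rule (names invented for illustration; not the Redis API):

```java
import java.time.Duration;

// Models the XCLAIM decision for a single pending entry.
// Class, field, and method names are invented for illustration.
class ClaimDemo {
    static class PendingEntry {
        String owner;
        Duration idle;        // time since last delivery
        long deliveryCount;

        PendingEntry(String owner, Duration idle, long deliveryCount) {
            this.owner = owner;
            this.idle = idle;
            this.deliveryCount = deliveryCount;
        }

        // XCLAIM <minIdle>: transfer only if the entry has been idle long enough
        boolean claim(String newOwner, Duration minIdle) {
            if (idle.compareTo(minIdle) < 0) {
                return false; // presumably someone is still working on it
            }
            owner = newOwner;
            idle = Duration.ZERO;   // idle time restarts for the new owner
            deliveryCount++;        // one more delivery
            return true;
        }
    }

    public static void main(String[] args) {
        PendingEntry e = new PendingEntry("my_consumer2", Duration.ofSeconds(30), 1);
        System.out.println(e.claim("my_consumer1", Duration.ofSeconds(10))); // true
        System.out.println(e.owner);                                         // my_consumer1
        System.out.println(e.claim("my_consumer2", Duration.ofSeconds(10))); // false: idle was just reset
    }
}
```

Because a claim below the idle threshold is a no-op, several watcher processes can safely race to rescue the same message.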
Java client implementation:

import java.time.Duration;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Test
    public void test() {
        List<ByteRecord> retVal = this.stringRedisTemplate.execute(new RedisCallback<List<ByteRecord>>() {
            @Override
            public List<ByteRecord> doInRedis(RedisConnection connection) throws DataAccessException {
                // Executes the XCLAIM command
                return connection.streamCommands().xClaim("my_stream".getBytes(),
                        "my_group", "my_consumer1", Duration.ofSeconds(10),
                        RecordId.of("1605524657157-0"));
            }
        });
        for (ByteRecord byteRecord : retVal) {
            LOGGER.info("message reassigned to a new consumer: id = {}, value = {}",
                    byteRecord.getId(), byteRecord.getValue());
        }
    }
}

Log output (the connection-level API returns ByteRecords, so field and value print as raw byte arrays):

message reassigned to a new consumer: id = 1605524657157-0, value = {[B@10b4f345=[B@63de4fa}

Check the XPENDING messages again:

127.0.0.1:6379> XPENDING my_stream my_group
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer1"
      2) "1"
As you can see, the message "1605524657157-0" (message2) has been transferred from "Consumer 2" to "Consumer 1". The next thing to do is to traverse the pending list of "Consumer 1" and consume it.
Read the list of pending messages for consumption
At the beginning, when demonstrating blocking consumption through the client, this command was used:
XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream >
Note the trailing >, which stands for the ID: it is a special character. When the ID is not the special character >, XREADGROUP no longer reads new messages from the message queue; instead it reads historical messages from the consumer's pending message list. (The parameter is generally set to 0, meaning all pending messages are read.)
Redis command:

127.0.0.1:6379> XREADGROUP GROUP my_group my_consumer1 STREAMS my_stream 0
1) 1) "my_stream"
   2) 1) 1) "1605524657157-0"
         2) 1) "message2"
            2) "SpringBoot"
This reads the only record in consumer 1's pending message list.
Java implementation:

import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @SuppressWarnings("unchecked")
    @Test
    public void test() {
        StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();

        // Read messages from the consumer's pending queue (offset 0 instead of >)
        List<MapRecord<String, String, String>> retVal = streamOperations.read(
                Consumer.from("my_group", "my_consumer1"),
                StreamOffset.create("my_stream", ReadOffset.from("0")));

        // Traverse the messages
        for (MapRecord<String, String, String> record : retVal) {
            // Consume the message (the log output stands in for real consumption logic)
            LOGGER.info("message id = {}, message value = {}", record.getId(), record.getValue());
            // Manually ACK the message
            streamOperations.acknowledge("my_group", record);
        }
    }
}
In this way the data is read directly from the consumer's pending queue, consumed manually, and then ACKed.
Log:

message id = 1605524657157-0, message value = {message2=SpringBoot}

View the XPENDING messages:

127.0.0.1:6379> XPENDING my_stream my_group
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
Nothing left: everything has been ACKed.
Dead letter
Dead letters, that is, messages that can never be successfully consumed, can be identified through two attributes: idle time and the delivery counter.
Idle time starts counting when the message is read by a consumer. If a pending message's idle time is very long, it means an exception occurred during consumption, or the consumer went down before it could ACK, so the message was never ACKed. When the message is transferred to another consumer, the idle time is reset to zero and starts counting again.
The delivery counter records the number of deliveries. Each time the message's consumer changes, the value increases by 1. If a pending message's delivery counter is very large, the message could not be consumed successfully even after being transferred among multiple consumers many times; this may point to a problem with the consumption logic or with the ACK, and the message can then be read and handled manually.
The stream type in Redis 5 is quite powerful (its design borrows heavily from Kafka). If your application is not large in scale and you need an MQ service, Stream is worth trying: it is faster and easier to maintain than running your own Kafka or RocketMQ.
After reading the above, have you learned how to use RedisTemplate in Spring Boot to re-consume un-ACKed messages in a Redis Stream? If you want to learn more skills or dig deeper, you are welcome to follow the industry information channel. Thank you for reading!