2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how to implement a blocking queue based on Redis. Many people run into questions about this in day-to-day work, so the editor consulted various sources and put together a simple, easy-to-use approach. Hopefully it answers the question of how to implement a blocking queue based on Redis. Read on for the details.
There are several ways to implement a queue on top of a Redis list. The first, which is not recommended, is to produce messages with LPUSH and consume them with RPOP inside a while (true) loop. This works, but constant polling from code consumes system resources even when the queue is empty.
The second method, also not recommended, is to produce messages with LPUSH and consume them by blocking on BRPOP. Compared with the first method, this avoids useless polling and reduces resource consumption, but messages can be lost: once a message has been popped it no longer exists in the list, so if processing then fails, the message is gone.
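The loss scenario can be illustrated without a Redis server. The following is a minimal in-memory sketch, assuming a plain Java deque as a stand-in for the Redis list; the class and method names (PopOnlyQueueSketch, consumeOne) are hypothetical and are not part of any Redis client.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** In-memory stand-in for a Redis list (assumption: not a real Redis client). */
final class PopOnlyQueueSketch {
    // The head of the deque plays the role of the list head (the LPUSH side).
    private final Deque<String> list = new ArrayDeque<>();

    /** Models LPUSH: insert at the head. */
    void lPush(String value) {
        list.addFirst(value);
    }

    /** Pops from the tail, as RPOP/BRPOP would, then hands the message to the handler. */
    boolean consumeOne(Consumer<String> handler) {
        String msg = list.pollLast(); // the message leaves the list here
        if (msg == null) {
            return false;
        }
        try {
            handler.accept(msg);
            return true;
        } catch (RuntimeException e) {
            // Processing failed, and nothing holds the message any more.
            return false;
        }
    }

    int size() {
        return list.size();
    }

    public static void main(String[] args) {
        PopOnlyQueueSketch queue = new PopOnlyQueueSketch();
        queue.lPush("Hello-0");
        boolean processed = queue.consumeOne(msg -> {
            throw new IllegalStateException("handler failed");
        });
        // prints: processed=false, remaining=0
        System.out.println("processed=" + processed + ", remaining=" + queue.size());
    }
}
```

The handler failed, yet the list is empty: there is nowhere left to retry the message from.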
The third way is to produce messages with LPUSH and then wait for new messages with BRPOPLPUSH, which backs each message up to another list at the moment it is popped. Like the second method it avoids useless polling, and because the message is backed up, a processing failure does not lose data. If processing succeeds, the current message can be deleted from the backup list with LREM.
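The pop-with-backup flow can be modeled in memory to show why a failed message survives. This is only a sketch under stated assumptions: two Java deques stand in for the source and backup lists, rPopLPush is a non-blocking model of BRPOPLPUSH, ack models LREM with a count of 1, and all names (BackupQueueSketch and its methods) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** In-memory model of the LPUSH / BRPOPLPUSH / LREM pattern (assumption: no Redis involved). */
final class BackupQueueSketch {
    private final Deque<String> source = new ArrayDeque<>();
    private final Deque<String> backup = new ArrayDeque<>();

    /** Models LPUSH on the source list. */
    void lPush(String value) {
        source.addFirst(value);
    }

    /** Non-blocking model of BRPOPLPUSH: move the tail of source to the head of backup. */
    String rPopLPush() {
        String msg = source.pollLast();
        if (msg != null) {
            backup.addFirst(msg);
        }
        return msg;
    }

    /** Models LREM backup 1 value: remove the first matching element, searching from the head. */
    void ack(String value) {
        backup.removeFirstOccurrence(value);
    }

    /** Consumes one message; it is removed from the backup only if the handler succeeds. */
    boolean consumeOne(Consumer<String> handler) {
        String msg = rPopLPush();
        if (msg == null) {
            return false;
        }
        try {
            handler.accept(msg);
        } catch (RuntimeException e) {
            return false; // the message stays in the backup list
        }
        ack(msg);
        return true;
    }

    int sourceSize() {
        return source.size();
    }

    int backupSize() {
        return backup.size();
    }

    public static void main(String[] args) {
        BackupQueueSketch queue = new BackupQueueSketch();
        queue.lPush("Hello-0");
        queue.lPush("Hello-1");
        queue.consumeOne(msg -> System.out.println("handled " + msg)); // Hello-0 succeeds
        queue.consumeOne(msg -> {
            throw new IllegalStateException("handler failed");          // Hello-1 fails
        });
        // prints: source=0, backup=1
        System.out.println("source=" + queue.sourceSize() + ", backup=" + queue.backupSize());
    }
}
```

The failed message is still sitting in the backup list, so it can be inspected or re-queued later, while the successful one has been removed.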
Redis basics

# Insert one or more values at the head of the list key
LPUSH key value [value …]

# Block until the last (tail) element of the list source can be popped and returned to the client,
# and insert the popped element at the head of the list destination.
# timeout is a number of seconds; a timeout of 0 blocks indefinitely.
BRPOPLPUSH source destination timeout

# Remove elements equal to value from the list; count determines how many are removed and from which end.
LREM key count value

Implementing the message producer
The author uses Spring's API to issue the Redis commands. First, the message-producing code is implemented and wrapped in a utility-class method. It simply calls lPush to add the serialized key and value to the list.
@Resource
private RedisConnectionFactory connectionFactory;

public void lPush(@Nonnull String key, @Nonnull String value) {
    RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
    try {
        // serialize the key and value to bytes before sending them to Redis
        byte[] byteKey = RedisSerializer.string().serialize(getKey(key));
        byte[] byteValue = RedisSerializer.string().serialize(value);
        assert byteKey != null;
        connection.lPush(byteKey, byteValue);
    } finally {
        RedisConnectionUtils.releaseConnection(connection, connectionFactory);
    }
}

Implementing the message consumer
Consuming from the queue takes a fair amount of code, and it would be impractical to repeat it everywhere blocking consumption is needed. A Java 8 functional interface is therefore used to pass in the handling method, and the blocking fetch runs asynchronously on another thread.
Some readers will look at the code below and object: weren't we told not to use while (true)? Why is it still here? A short explanation: Spring Boot usually configures a global timeout, so even if BRPOPLPUSH is given a timeout of 0 (block indefinitely), a QueryTimeoutException is thrown once the configured timeout is exceeded, and that would make the thread exit. A try/catch is therefore added to catch and ignore the exception, and while (true) ensures the thread keeps running. Unlike the first method, each iteration blocks inside BRPOPLPUSH rather than polling an empty list.
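The catch-and-retry idea can be looked at in isolation. The following is a minimal sketch rather than the author's code: PopTimeoutException is a hypothetical stand-in for QueryTimeoutException, the Supplier stands in for the blocking BRPOPLPUSH call, and the keepRunning flag is an addition made only so the sketch can terminate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class RetryLoopSketch {
    /** Hypothetical stand-in for the client-side timeout exception. */
    static final class PopTimeoutException extends RuntimeException {
        PopTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Calls blockingPop repeatedly while keepRunning is true. A timeout is
     * counted and ignored so the loop simply waits again.
     * Returns the number of timeouts that were swallowed.
     */
    static int consumeLoop(Supplier<String> blockingPop,
                           Consumer<String> handler,
                           BooleanSupplier keepRunning) {
        int timeouts = 0;
        while (keepRunning.getAsBoolean()) {
            try {
                String msg = blockingPop.get();
                if (msg != null && !msg.isEmpty()) {
                    handler.accept(msg);
                }
            } catch (PopTimeoutException e) {
                timeouts++; // expected: the wait timed out, so block again
            }
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // Scripted outcomes: timeout, message, timeout, message.
        Iterator<Object> script = Arrays.<Object>asList(
                new PopTimeoutException("t1"), "Hello-0",
                new PopTimeoutException("t2"), "Hello-1").iterator();
        List<String> seen = new ArrayList<>();
        int timeouts = consumeLoop(() -> {
            Object next = script.next();
            if (next instanceof PopTimeoutException) {
                throw (PopTimeoutException) next;
            }
            return (String) next;
        }, seen::add, script::hasNext);
        // prints: [Hello-0, Hello-1] after 2 timeouts
        System.out.println(seen + " after " + timeouts + " timeouts");
    }
}
```

One difference from the author's listing is deliberate: the sketch catches only the timeout type, whereas the listing catches every Exception, which also hides handler failures.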
The code records the result of processing the current message; if processing succeeded, the message is deleted from the backup queue.
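The deletion step depends on how LREM interprets its count argument: a positive count removes up to that many matches searching from head to tail, a negative count searches from tail to head, and 0 removes all matches. As a rough in-memory model (a plain Java list, no Redis involved; the lRem helper below is hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LRemSketch {
    /**
     * Removes elements equal to value from the list in place, following
     * LREM's count rules, and returns how many elements were removed.
     */
    static int lRem(List<String> list, long count, String value) {
        long limit = (count == 0) ? Long.MAX_VALUE : Math.abs(count);
        int removed = 0;
        if (count >= 0) {
            // search from head to tail
            int i = 0;
            while (i < list.size() && removed < limit) {
                if (list.get(i).equals(value)) {
                    list.remove(i); // do not advance: the next element shifted into slot i
                    removed++;
                } else {
                    i++;
                }
            }
        } else {
            // search from tail to head
            for (int i = list.size() - 1; i >= 0 && removed < limit; i--) {
                if (list.get(i).equals(value)) {
                    list.remove(i);
                    removed++;
                }
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        List<String> backup = new ArrayList<>(Arrays.asList("a", "b", "a", "c", "a"));
        lRem(backup, 1, "a");
        // prints: [b, a, c, a]
        System.out.println(backup);
    }
}
```

With a count of 1, as in the consumer code, only one copy is removed, so if two messages happen to have identical payloads, acknowledging one leaves the other in the backup list.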
public void bRPopLPush(@Nonnull String key, Consumer<String> consumer) {
    CompletableFuture.runAsync(() -> {
        RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
        try {
            byte[] srcKey = RedisSerializer.string().serialize(getKey(key));
            byte[] dstKey = RedisSerializer.string().serialize(getBackupKey(key));
            assert srcKey != null;
            assert dstKey != null;
            while (true) {
                byte[] byteValue = new byte[0];
                boolean success = false;
                try {
                    // block until a message arrives; it is copied to the backup list as it is popped
                    byteValue = connection.bRPopLPush(0, srcKey, dstKey);
                    if (byteValue != null && byteValue.length != 0) {
                        consumer.accept(new String(byteValue));
                        success = true;
                    }
                } catch (Exception ignored) {
                    // prevent a timeout while waiting on the key from throwing QueryTimeoutException and exiting the loop
                } finally {
                    if (success) {
                        // delete the message from the backup queue after successful processing
                        connection.lRem(dstKey, 1, byteValue);
                    }
                }
            }
        } finally {
            RedisConnectionUtils.releaseConnection(connection, connectionFactory);
        }
    });
}

Test code

@Test
public void testLPush() throws InterruptedException {
    String queueA = "queueA";
    int i = 0;
    while (true) {
        String msg = "Hello-" + i++;
        redisBlockQueue.lPush(queueA, msg);
        System.out.println("lPush: " + msg);
        Thread.sleep(3000);
    }
}

@Test
public void testBRPopLPush() {
    String queueA = "queueA";
    redisBlockQueue.bRPopLPush(queueA, (val) -> {
        // handle the specific business logic here
        System.out.println("val: " + val);
    });
    // prevent the JUnit process from exiting
    LockSupport.park();
}

Usage in a project
For ease of use, I extracted this into a utility class that can be used through Spring injection.
Queue consumption can be started when the project starts, so that the consumer blocks on the queue and waits for messages.
@Resource
private RedisBlockQueue redisBlockQueue;

@PostConstruct
public void init() {
    redisBlockQueue.bRPopLPush(xx, (value) -> {
        // ...
    });
}

That concludes this look at how to implement a blocking queue based on Redis; hopefully it resolves your doubts. Theory works best when combined with practice, so go and try it!