2025-01-18 Update From: SLTechnology News&Howtos
This article describes how to implement a distributed lock and a waiting queue with Redis. I hope it serves as a useful reference.
In a cluster, resource contention and concurrency problems usually have to be handled together. However, locks such as synchronized, CAS and ReentrantLock are scoped to a single JVM, so they are useless across a cluster. What we need is a lock that can decide the order of execution among multiple JVMs. Distributed locks today are mainly implemented with Redis or ZooKeeper. A database can also be used, but its performance is too poor. In every case, a third party is needed to arbitrate.
Background
Recently, while working on a Kafka consumer, I found that because there are many consumer instances online, several machines often process data with the same primary key at the same time. If the final operation is an update, the only issue is the order of the updates. But if the data has to be inserted, primary key conflicts occur. That is not acceptable in production (the company has an exception monitoring mechanism that deducts points, among other things), so a distributed lock was needed. After some consideration I chose a Redis implementation, since there are many examples online.
Analysis
The Redis-based distributed lock relies on the SET command with the NX and PX options: when several threads request the lock at the same time, only one of them succeeds, and an expiry time can be set to avoid deadlock. That all looks perfect, but there is one catch: SET returns immediately with success or failure and never blocks, so we have to handle the threads that fail to get the lock ourselves. There are two options:
1. Discard
2. Wait and retry
Because our system needs this data, we can only try to acquire the lock again. Here the Redis List type is used as the waiting queue.
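The SET behaviour described above (only one caller succeeds, the key expires, the call never blocks) can be modelled without a Redis server. The sketch below is only an in-memory stand-in: the class SetNxModel, its method names and the key lock:order:42 are hypothetical and are not part of Redis or Jedis. The current time is passed in explicitly so that expiry is easy to reason about.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** In-memory stand-in for "SET key value NX PX ttl" (hypothetical, illustration only). */
class SetNxModel {
    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();

    /**
     * Sets the key only if it is absent or already expired.
     * Returns immediately with true (set) or false (someone else holds it); never blocks.
     */
    boolean setIfAbsent(String key, String value, long ttlMillis, long nowMillis) {
        Entry fresh = new Entry(value, nowMillis + ttlMillis);
        // compute() is atomic per key, so only one caller can install its entry.
        Entry winner = store.compute(key, (k, old) ->
                (old == null || old.expiresAtMillis <= nowMillis) ? fresh : old);
        return winner == fresh;
    }

    /** Lets several threads race for one key and returns how many of them succeeded. */
    static int race(int threads) {
        SetNxModel redis = new SetNxModel();
        AtomicInteger winners = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            String requestId = "req-" + i;
            workers[i] = new Thread(() -> {
                if (redis.setIfAbsent("lock:order:42", requestId, 1000, 0L)) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }
}
```

Calling SetNxModel.race(8) lets eight threads compete for the same key. The losing threads get false straight away, which is exactly why the caller has to decide between discarding and retrying.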
Code
A single Redis utility class is enough to solve the problem, so here is the code.
package com.test;

import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.List;

/**
 * @desc Redis queue and distributed lock utilities
 * @author
 * @date
 */
public class RedisUcUitl {

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    private static final Long RELEASE_SUCCESS = 1L;

    private RedisUcUitl() {
    }

    /**
     * Push a value onto the head of a Redis list (the waiting queue).
     *
     * @param key   list key as bytes
     * @param value value as bytes
     */
    public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {
        return jedis.lpush(key, value);
    }

    /**
     * Remove the last element of a list and push it onto the head of another list.
     * When the source list is empty, the connection blocks until the timeout expires.
     *
     * @param srckey  source list
     * @param dstkey  destination list
     * @param timeout timeout in seconds; 0 means block forever
     * @return the moved element, or null on timeout
     */
    public static byte[] brpoplpush(Jedis jedis, final byte[] srckey, final byte[] dstkey, final int timeout) {
        return jedis.brpoplpush(srckey, dstkey, timeout);
    }

    /**
     * Return the elements of a list between two positions.
     *
     * @param redisKey list key
     * @param start    start index
     * @param end      end index; -1 means the last element
     */
    public static List<byte[]> lrange(Jedis jedis, final byte[] redisKey, final long start, final long end) {
        return jedis.lrange(redisKey, start, end);
    }

    /**
     * Delete a key.
     *
     * @param redisKey key to delete
     */
    public static void delete(Jedis jedis, final byte[] redisKey) {
        jedis.del(redisKey);
    }

    /**
     * Try to acquire the lock.
     *
     * @param lockKey    key name
     * @param requestId  identity of the caller
     * @param expireTime expiry in milliseconds (PX)
     */
    public static boolean tryGetDistributedLock(Jedis jedis, final String lockKey, final String requestId, final int expireTime) {
        // This five-argument set(...) overload comes from Jedis 2.x; newer Jedis versions
        // are understood to take a SetParams object instead.
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        return LOCK_SUCCESS.equals(result);
    }

    /**
     * Release the lock, but only if it is still held by requestId.
     *
     * @param lockKey   key name
     * @param requestId identity of the caller
     * @return true if the lock was released
     */
    public static boolean releaseDistributedLock(Jedis jedis, final String lockKey, final String requestId) {
        // Lua compares with "==", and the script runs atomically on the server.
        final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        return RELEASE_SUCCESS.equals(result);
    }
}
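It is worth spelling out why the release script compares the stored value with requestId before deleting. If a holder's lock expires while it is still working and another client acquires the lock, an unconditional delete by the first holder would remove the second client's lock. The sketch below models this with a plain map. TokenLockModel and its methods are hypothetical names for illustration, and expire() merely simulates the Redis TTL firing.

```java
import java.util.concurrent.ConcurrentHashMap;

/** In-memory model of a lock whose release checks the owner token (hypothetical, illustration only). */
class TokenLockModel {
    private final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();

    /** Like SET NX: succeeds only if nobody holds the lock. */
    boolean tryLock(String lockKey, String requestId) {
        return owners.putIfAbsent(lockKey, requestId) == null;
    }

    /** Compare and delete in one atomic step, which is the job of the Lua script. */
    boolean release(String lockKey, String requestId) {
        return owners.remove(lockKey, requestId);
    }

    /** Simulates the key's TTL running out on the server. */
    void expire(String lockKey) {
        owners.remove(lockKey);
    }

    /** Returns the current owner token, or null if the lock is free. */
    String owner(String lockKey) {
        return owners.get(lockKey);
    }
}
```

A typical sequence: client A locks, A's key expires, client B locks, then A finishes late and calls release with its own requestId. The call returns false and B's lock stays in place.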
The main code of the business logic is as follows
1. Consume the queue first
// jedis is an open Jedis connection, which the utility methods require.
// lockKeyStr is assumed to be a key separate from the queue key keyStr:
// SET NX on a key that already holds the list would never succeed.
while (true) {
    // Consume the queue
    try {
        // Serialized data from the Redis queue (waits up to 1 second)
        byte[] bytes = RedisUcUitl.brpoplpush(jedis, keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
        if (bytes == null || bytes.length == 0) {
            // No data in the queue: exit
            break;
        }
        // Deserialize the object
        Map singleMap = (Map) ObjectSerialUtil.bytesToObject(bytes);
        // Use a unique value so the lock cannot be released by another thread by mistake
        String requestId = UUID.randomUUID().toString();
        boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(jedis, lockKeyStr, requestId, 100);
        if (lockGetFlag) {
            // Lock acquired: do the business processing
            // TODO
            // Release the lock
            boolean freeLock = RedisUcUitl.releaseDistributedLock(jedis, lockKeyStr, requestId);
        } else {
            // Lock not acquired: put the item back on the waiting queue
            RedisUcUitl.lpush(jedis, keyStr.getBytes(UTF_8), bytes);
        }
    } catch (Exception e) {
        break;
    }
}
2. Process the newly received data
This follows the same flow: try to acquire the lock, and if that fails, push the data onto the waiting queue.
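The article gives no code for this second step, so here is a minimal sketch of the flow under stated assumptions. It is single-threaded and uses in-memory stand-ins: a Set in place of the Redis lock and a Deque in place of the Redis list. WaitingQueueFlow and all of its method names are hypothetical. A message that cannot get the lock is parked at the head of the queue (like LPUSH) and later taken from the tail (like the pop side of BRPOPLPUSH), so parked messages are retried in arrival order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Single-threaded model of "try the lock, otherwise park in the waiting queue" (hypothetical). */
class WaitingQueueFlow {
    private final Set<String> heldLocks = new HashSet<>();     // stand-in for SET NX keys
    private final Deque<String> waiting = new ArrayDeque<>();  // stand-in for the Redis list
    private final List<String> processed = new ArrayList<>();  // stand-in for the business work

    boolean tryLock(String lockKey) {
        return heldLocks.add(lockKey);
    }

    void unlock(String lockKey) {
        heldLocks.remove(lockKey);
    }

    /** Handles a newly received message; returns true if it was processed right away. */
    boolean onMessage(String lockKey, String message) {
        if (tryLock(lockKey)) {
            try {
                processed.add(message);
            } finally {
                unlock(lockKey);
            }
            return true;
        }
        waiting.addFirst(message); // like LPUSH: park at the head
        return false;
    }

    /** Retries every parked message once, oldest first; returns how many were processed. */
    int drain(String lockKey) {
        int handled = 0;
        int pending = waiting.size();
        for (int i = 0; i < pending; i++) {
            String message = waiting.pollLast(); // like popping from the tail
            if (onMessage(lockKey, message)) {
                handled++;
            }
        }
        return handled;
    }

    List<String> processed() {
        return processed;
    }
}
```

Bounding drain by the number of messages pending at the start is a design choice in this sketch: if the lock is still held, each message is parked again once instead of spinning forever.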
For serialization, a library such as fastJson works fine. Here the serialization built into the JDK is used. The utility class is as follows
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class ObjectSerialUtil {

    private ObjectSerialUtil() {
        // utility class
    }

    /**
     * Serialize an object into a byte array.
     *
     * @param obj the object (must implement java.io.Serializable)
     * @return byte array
     * @throws IOException if the object cannot be written
     */
    public static byte[] objectToBytes(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Close the object stream before reading the bytes so that everything is flushed.
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    /**
     * Restore a byte array to an object.
     *
     * @param bytes serialized data
     * @return the deserialized object
     */
    public static Object bytesToObject(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (Exception e) {
            // BaseException is the project's own exception class and is not shown in this article.
            throw new BaseException("deserialization error!", e);
        }
    }
}
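As a quick check of the JDK serialization approach, the sketch below sends a Map through a byte array and back, which is what happens to each item stored in the Redis list. SerializationRoundTrip is a hypothetical, self-contained stand-in rather than the article's utility class: it wraps checked exceptions in unchecked ones so that it does not depend on BaseException, and the map contents are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Round trip through JDK serialization (hypothetical helper, illustration only). */
class SerializationRoundTrip {

    static byte[] toBytes(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the object stream flushes all buffered data into bos.
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Builds a sample payload, serializes it and reads it back. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> sampleRoundTrip() {
        Map<String, Object> payload = new HashMap<>();
        payload.put("id", 42L);
        payload.put("name", "order");
        return (Map<String, Object>) fromBytes(toBytes(payload));
    }
}
```

Everything placed in the map has to implement java.io.Serializable, otherwise writeObject fails. That constraint is one reason a JSON library can be the more convenient choice.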