Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to parse the source code of Redis distributed reentrant lock based on Redisson in Springboot

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/01 Report--

This article mainly introduces Springboot based on Redisson how to achieve Redis distributed reentrant lock source code parsing, has a certain reference value, interested friends can refer to, I hope you can learn a lot after reading this article, the following let Xiaobian take you to understand.

I. Preface

We use Redis to implement distributed locks. At first, we generally use SET resource-name anystring NX EX max-lock-time to add locks, and use Lua scripts to ensure atomicity to release locks. It is troublesome to implement it manually, and the official website of Redis also makes it clear that Redisson is used in the Java version. The editor also read the official website to slowly explore clearly, close-up this record. From the official website to the integration of Springboot to source code interpretation, take a single node as an example.

Second, why use Redisson1. Let's open the official website.

Redis Chinese official website

two。 We can see that the authorities have asked us to use other

3. Open the official recommendation

4. Find the document

Redisson address

5. Redisson structure

Third, Springboot integrates Redisson1. Import depends on org.springframework.boot spring-boot-starter-data-redis redis.clients jedis org.redisson redisson 3.12.02. Take the official website as an example to see how to configure

3. Write the configuration class import org.redisson.Redisson;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration / * @ author wangzhenjun * @ date, 2022-2-9 9:57 * / @ Configurationpublic class MyRedissonConfig {/ * all the use of redisson is done through RedissonClient * @ return * / @ Bean (destroyMethod= "shutdown") public RedissonClient redisson () {/ / 1. Create configuration Config config = new Config (); / / be sure to add redis:// config.useSingleServer (). SetAddress ("redis://192.168.17.130:6379"); / / 2. Create a redissonClient instance based on config RedissonClient redissonClient = Redisson.create (config); return redissonClient;} 4. Example of locking on the official website test

5. Write @ ResponseBody@GetMapping ("/ hello") public String hello () {/ / 1 according to the simple Controller interface of the official website. To acquire a lock, as long as the lock name is the same, it is the same lock RLock lock = redisson.getLock ("my-lock"); / / 2. Lock lock.lock (); / / blocking trial wait for 30s / / with parameters / / lock.lock (10, TimeUnit.SECONDS) by default; / / 10s automatically unlock, and the automatic unlocking time must be longer than the business execution time. Try {System.out.println ("lock successfully" + Thread.currentThread (). GetId ()); Thread.sleep (30000);} catch (InterruptedException e) {e.printStackTrace ();} finally {/ / 3. Unlock System.out.println ("unlocked successfully:" + Thread.currentThread () .getId ()); lock.unlock ();} return "hello";} 6. test

Fourth, lock.lock () source code analysis 1. Open the RedissonLock implementation class

two。 Find the implementation method @ Overridepublic void lock () {try {/ / We found that the default expiration time of the non-wearing expiration time source code is-1 lock (- 1, null, false);} catch (InterruptedException e) {throw new IllegalStateException ();}} 3. Press and hold Ctrl to enter the lock method private void lock (long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {/ / to get the thread's id. When the lock is in possession, the value of field is UUID: thread number id long threadId = Thread.currentThread (). GetId (); / / attempts to acquire the lock Long ttl = tryAcquire (leaseTime, unit, threadId); / / lock acquired acquires the lock and returns if (ttl = = null) {return } / / if you failed to acquire the lock, subscribe to the lock through the thread id RFuture future = subscribe (threadId); if (interruptibly) {commandExecutor.syncSubscriptionInterrupted (future);} else {commandExecutor.syncSubscription (future) } try {/ / spin here, keep trying to acquire lock while (true) {/ / continue trying to acquire lock ttl = tryAcquire (leaseTime, unit, threadId) / / lock acquired obtains the successful if (ttl = = null) {/ / returns directly, picking out the spin break } / / waiting for message continues to wait for lock if (ttl > = 0) {try {future.getNow () .getLatch () .tryAcquire (ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e } future.getNow (). GetLatch (). TryAcquire (ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {future.getNow (). GetLatch (). Acquire ();} else {future.getNow (). GetLatch (). AcquireUninterruptibly () }} finally {/ / Unsubscribe unsubscribe (future, threadId);} / / get (lockAsync (leaseTime, unit));} 4. Go in and try to get the lock.

Private Long tryAcquire (long leaseTime, TimeUnit unit, long threadId) {/ / directly enter the asynchronous method return get (tryAcquireAsync (leaseTime, unit, threadId);} private RFuture tryAcquireAsync (long leaseTime, TimeUnit unit, long threadId) {/ / determine if the parameter leaseTime =-1 if (leaseTime! =-1) {return tryLockInnerAsync (leaseTime, unit, threadId, RedisCommands.EVAL_LONG) is not set. } / / this method is used to acquire the lock, and the expiration time is the default time of the watchdog / / private long lockWatchdogTimeout = 30 * 1000; the default expiration time of the watchdog is 30s / / lock and the expiration time is guaranteed to be atomic. This method must be called to execute the Lua script. Let's see RFuture ttlRemainingFuture = tryLockInnerAsync (commandExecutor.getConnectionManager (). GetCfg (). GetLockWatchdogTimeout (), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG) / / start a scheduled task to refresh the expiration time ttlRemainingFuture.onComplete ((ttlRemaining, e)-> {if (e! = null) {return;} / / lock acquired get the lock if (ttlRemaining = = null) {/ / refresh the expiration time method. Let's talk about scheduleExpirationRenewal (threadId) in detail next. ); return ttlRemainingFuture;5. View the tryLockInnerAsync () method RFuture tryLockInnerAsync (long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {internalLockLeaseTime = unit.toMillis (leaseTime) Return commandExecutor.evalWriteAsync (getName (), LongCodec.INSTANCE, command, / / first determine whether the lock exists "if (redis.call ('exists', KEYS [1]) = = 0) then" + / / acquire the lock "redis.call (' hset', KEYS [1], ARGV [2], 1) "+ / / then set expiration time" redis.call ('pexpire', KEYS [1], ARGV [1]); "+" return nil; "+" end "+ / / hexists to see if the specified field of the hash table exists, there is a lock and the current thread holds the lock" if (redis.call ('hexists', KEYS [1], ARGV [2]) = = 1) then "+ / / hincrby increments one" redis.call (' hincrby', KEYS [1], ARGV [2], 1) "if the value of the + / / lock is greater than 1, it means that the lock can be reentered and the expiration time can be reset" redis.call ('pexpire', KEYS [1], ARGV [1]); "+" return nil; "+" end "+ / / lock already exists and is not the current thread, then the expiration time ttl" return redis.call ('pttl', KEYS [1]); ", Collections.singletonList (getName ()), internalLockLeaseTime, getLockName (threadId));} 6. Enter the scheduleExpirationRenewal () method of the scheduled task left by 4

Look for the source code step by step: scheduleExpirationRenewal-> renewExpiration

According to the following source code, the refresh time of the scheduled task is: internalLockLeaseTime / 3, which is 1 / 3 of the watchdog, that is, it is refreshed every 10 seconds.

Private void renewExpiration () {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get (getEntryName ()); if (ee = = null) {return;} Timeout task = commandExecutor.getConnectionManager (). NewTimeout (new TimerTask () {@ Override public void run (Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get (getEntryName ()); if (ent = = null) {return } Long threadId = ent.getFirstThreadId (); if (threadId = = null) {return;} RFuture future = renewExpirationAsync (threadId) Future.onComplete ((res, e)-> {if (e! = null) {log.error ("Can't update lock" + getName () + "expiration", e); return } if (res) {/ / reschedule itself renewExpiration ();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout (task);} V. Lock.lock (10, TimeUnit.SECONDS) source code analysis

1. Open the implementation class

@ Overridepublic void lock (long leaseTime, TimeUnit unit) {try {/ / the expiration time here is 10 lock (leaseTime, unit, false) that we entered;} catch (InterruptedException e) {throw new IllegalStateException ();}}

two。 The method lock () realizes the display, which is the same as the three. 3 source code

3. Go directly to the attempt to acquire the lock tryAcquireAsync () method

Private RFuture tryAcquireAsync (long leaseTime, TimeUnit unit, long threadId) {/ / determine here that if the parameter leaseTime =-1 is not set, then we will be 10 if (leaseTime! =-1) {/ / come to this method return tryLockInnerAsync (leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} / / omit the latter here and explain it in detail. }

4. Open the tryLockInnerAsync () method

It is not difficult to find that it is the same as not passing the expiration time, except that the value of leaseTime has changed.

RFuture tryLockInnerAsync (long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {internalLockLeaseTime = unit.toMillis (leaseTime) Return commandExecutor.evalWriteAsync (getName (), LongCodec.INSTANCE, command, / / first determine whether the lock exists "if (redis.call ('exists', KEYS [1]) = = 0) then" + / / acquire the lock "redis.call (' hset', KEYS [1], ARGV [2], 1) "+ / / then set expiration time" redis.call ('pexpire', KEYS [1], ARGV [1]); "+" return nil; "+" end "+ / / hexists to see if the specified field of the hash table exists, there is a lock and the current thread holds the lock" if (redis.call ('hexists', KEYS [1], ARGV [2]) = = 1) then "+ / / hincrby increments one" redis.call (' hincrby', KEYS [1], ARGV [2], 1) "if the value of the + / / lock is greater than 1, it means that the lock can be reentered and the expiration time can be reset" redis.call ('pexpire', KEYS [1], ARGV [1]); "+" return nil; "+" end "+ / / lock already exists and is not the thread, then the expiration time ttl" return redis.call ('pttl', KEYS [1]); ", Collections.singletonList (getName ()), internalLockLeaseTime, getLockName (threadId));} VI. Lock.unlock () source code analysis

1. Open method implementation

@ Overridepublic void unlock () {try {/ / Click to enter the release lock method get (unlockAsync (Thread.currentThread (). GetId ());} catch (RedisException e) {if (e.getCause () instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause ();} else {throw e;} / / Future future = unlockAsync () / / future.awaitUninterruptibly (); / if (future.isSuccess ()) {/ / return;//} / / if (future.cause () instanceof IllegalMonitorStateException) {/ / throw (IllegalMonitorStateException) future.cause (); / /} / / throw commandExecutor.convertException (future);}

two。 Open the unlockAsync () method

@ Overridepublic RFuture unlockAsync (long threadId) {RPromise result = new RedissonPromise (); / / unlock method, then expand and say RFuture future = unlockInnerAsync (threadId); / / complete future.onComplete ((opStatus, e)-> {if (e! = null) {/ / cancel expiration renewal cancelExpirationRenewal (threadId)) / / Mark this future as a failure and notify everyone result.tryFailure (e); return Empty status indicates that the unlocked thread and the current lock are not the same thread if (opStatus = = null) {IllegalMonitorStateException cause = new IllegalMonitorStateException ("attempt to unlock lock, not locked by current thread by node id:" + id + "thread-id:" + threadId); result.tryFailure (cause); return } cancelExpirationRenewal (threadId); result.trySuccess (null);}); return result;}

3. Open the unlockInnerAsync () method

Protected RFuture unlockInnerAsync (long threadId) {return commandExecutor.evalWriteAsync (getName (), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, / / determine whether the thread that releases the lock and the thread that already has the lock are the same thread, and does not return empty "if ('hexists', KEYS [1], ARGV [3]) = 0) then" + "return nil;" + "end "local counter = redis.call ('hincrby', KEYS [1], ARGV [3],-1)" after the lock is released. "+ / / determines whether the remaining quantity is greater than 0" if (counter > 0) then "+ / / > 0, then refresh the expiration time" redis.call ('pexpire', KEYS [1], ARGV [2]); "+" return 0 "+" else "+ / / release lock, delete key and publish lock release message" redis.call ('del', KEYS [1]); "+" redis.call (' publish', KEYS [2], ARGV [1]); "+" return 1; "+" end "+" return nil; ", Arrays.asList (getName (), getChannelName ()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName (threadId) } Thank you for reading this article carefully. I hope the article "how to implement Springboot distributed reentrant lock source code parsing based on Redisson" shared by the editor will be helpful to everyone. At the same time, I also hope you will support us and pay attention to the industry information channel. More related knowledge is waiting for you to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report