
How to solve the problem of using Redisson subscriptions

2025-02-28 Update From: SLTechnology News&Howtos

This article explains how to solve a problem encountered when using Redisson subscriptions by tracing the relevant source code step by step.

I. Background

Recently I ran into a production problem while using the Redisson distributed lock: the value of subscriptionsPerConnection or subscriptionConnectionPoolSize turned out to be too small, and the configuration had to be increased to resolve it.

II. Source code analysis

Let's analyze the source code to find out what logic is causing the problem:

1. The RedissonLock#lock() method

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        // Try to acquire the lock; ttl == null means the lock was acquired
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        // Subscribe to the lock-release event and block in await until the lock is released,
        // which avoids wasting resources on futile lock requests
        RFuture future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        // the rest of the code is omitted
        try {
            // loop indefinitely until the lock is acquired
            // ...
        } finally {
            // unsubscribe from the lock-release event
            unsubscribe(future, threadId);
        }
    }

Summary of the main logic:

Get the id of the current thread.

tryAcquire attempts to acquire the lock and returns ttl.

If ttl is null, the lock was acquired and the method returns; otherwise it continues with the logic below.

this.subscribe(threadId) subscribes on behalf of the current thread and returns an RFuture.

If the subscription does not complete within the configured timeout, the exception described above is thrown.

Once the subscription succeeds, a while (true) loop keeps trying to acquire the lock.

The finally block unsubscribes.

So the problem described above must originate in the subscribe() method.
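The "wait for the subscription, fail on timeout" step in the summary above can be illustrated with plain JDK classes. This is only a sketch of the pattern, not Redisson's implementation: the class SubscribeWaitSketch, the helper awaitSubscription, and the exception type and message are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWaitSketch {

    // Hypothetical helper: block until the subscription future completes,
    // or fail when it does not complete within timeoutMillis.
    static <T> T awaitSubscription(CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The subscription was never confirmed in time
            throw new IllegalStateException("Subscribe timeout: " + timeoutMillis + "ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for subscription", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Subscription failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // A subscription that is already confirmed returns immediately
        CompletableFuture<String> ready = CompletableFuture.completedFuture("subscribed");
        System.out.println(awaitSubscription(ready, 100));

        // A subscription that is never confirmed ends in a timeout
        CompletableFuture<String> stuck = new CompletableFuture<>();
        try {
            awaitSubscription(stuck, 50);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If no subscription slot ever becomes available, the future is never completed and the caller only sees the timeout, which is why the cause has to be found further down in subscribe().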

2. A closer look at the subscribe() method

    protected RFuture subscribe(long threadId) {
        // entryName format: "id:name"
        // channelName format: "redisson_lock__channel:name"
        return pubSub.subscribe(getEntryName(), getChannelName());
    }

RedissonLock#pubSub is initialized in the RedissonLock constructor:

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        // ...
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }

In the MasterSlaveConnectionManager implementation, subscribeService is constructed as follows:

    public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
        this(config, id);
        this.config = cfg;

        // initialize
        initTimer(cfg);
        initSingleEntry();
    }

    protected void initTimer(MasterSlaveServersConfig config) {
        int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};
        Arrays.sort(timeouts);
        int minTimeout = timeouts[0];
        if (minTimeout % 100 != 0) {
            minTimeout = (minTimeout % 100) / 2;
        } else if (minTimeout == 100) {
            minTimeout = 50;
        } else {
            minTimeout = 100;
        }

        timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);

        connectionWatcher = new IdleConnectionWatcher(this, config);

        // Initialization: this is the MasterSlaveConnectionManager instance,
        // config is the MasterSlaveServersConfig instance
        subscribeService = new PublishSubscribeService(this, config);
    }

PublishSubscribeService constructor

    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);

    public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        super();
        this.connectionManager = connectionManager;
        this.config = config;
        for (int i = 0; i < locks.length; i++) {
            // A group of semaphores is initialized here, each with an initial value of 1
            locks[i] = new AsyncSemaphore(1);
        }
    }

3. Back to the subscribe() method

The main logic is delegated to LockPubSub#subscribe():

    private final ConcurrentMap entries = new ConcurrentHashMap();

    public RFuture subscribe(String entryName, String channelName) {
        // Get the semaphore from PublishSubscribeService.
        // The same channelName always maps to the same semaphore:
        // public AsyncSemaphore getSemaphore(ChannelName channelName) {
        //     return locks[Math.abs(channelName.hashCode() % locks.length)];
        // }
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        AtomicReference listenerHolder = new AtomicReference();
        RPromise newPromise = new RedissonPromise() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return semaphore.remove(listenerHolder.get());
            }
        };

        Runnable listener = new Runnable() {
            @Override
            public void run() {
                // If a RedissonLockEntry already exists, reuse the existing listener
                E entry = entries.get(entryName);
                if (entry != null) {
                    entry.acquire();
                    semaphore.release();
                    entry.getPromise().onComplete(new TransferListener(newPromise));
                    return;
                }

                E value = createEntry(newPromise);
                value.acquire();

                E oldValue = entries.putIfAbsent(entryName, value);
                if (oldValue != null) {
                    oldValue.acquire();
                    semaphore.release();
                    oldValue.getPromise().onComplete(new TransferListener(newPromise));
                    return;
                }

                // create the listener
                RedisPubSubListener listener = createListener(channelName, value);
                // subscribe with the listener
                service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
            }
        };
        // listener.run() is eventually executed
        semaphore.acquire(listener);
        listenerHolder.set(listener);

        return newPromise;
    }

The AsyncSemaphore#acquire() method:

    public void acquire(Runnable listener) {
        acquire(listener, 1);
    }

    public void acquire(Runnable listener, int permits) {
        boolean run = false;

        synchronized (this) {
            // counter is initialized to 1
            if (counter < permits) {
                // not the first call: add the listener to the listeners collection
                listeners.add(new Entry(listener, permits));
                return;
            } else {
                counter -= permits;
                run = true;
            }
        }

        // listener.run() is executed only on the first acquire
        if (run) {
            listener.run();
        }
    }

Summary of the logic above:
1. Get the semaphore from PublishSubscribeService; the same channelName always gets the same semaphore.

2. If this is the first request, listener.run() executes immediately; otherwise it has to wait until the previous thread that holds the semaphore has finished.

3. If a RedissonLockEntry already exists, the existing subscription is reused.

4. If no RedissonLockEntry exists, a new RedissonLockEntry is created and the subscription proceeds.

As the code above shows, the main logic is handed over to the PublishSubscribeService#subscribe method.

4. The logic of PublishSubscribeService#subscribe is as follows:

    private final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap();
    private final Queue freePubSubConnections = new ConcurrentLinkedQueue();

    public RFuture subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener... listeners) {
        RPromise promise = new RedissonPromise();
        // Main entry point. Note that channelName is a new object on every call,
        // but ChannelName overrides hashCode and equals.
        subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
        return promise;
    }

    private void subscribe(Codec codec, ChannelName channelName, RPromise promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener... listeners) {
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            // An entry already exists for this channel: add the listeners directly to it
            addListeners(channelName, promise, type, lock, connEntry, listeners);
            return;
        }

        // No entry yet: this is the most important part of the logic
        freePubSubLock.acquire(new Runnable() {
            @Override
            public void run() {
                if (promise.isDone()) {
                    lock.release();
                    freePubSubLock.release();
                    return;
                }

                // Take the head element of the queue
                PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
                if (freeEntry == null) {
                    // The first time there is none, so a connection has to be established
                    connect(codec, channelName, promise, type, lock, listeners);
                    return;
                }

                // If one exists, try to acquire it; if remainFreeAmount is below 0,
                // an exception is thrown and processing stops
                int remainFreeAmount = freeEntry.tryAcquire();
                if (remainFreeAmount == -1) {
                    throw new IllegalStateException();
                }

                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                if (oldEntry != null) {
                    freeEntry.release();
                    freePubSubLock.release();
                    addListeners(channelName, promise, type, lock, oldEntry, listeners);
                    return;
                }

                // If remainFreeAmount == 0, remove the entry from the queue
                if (remainFreeAmount == 0) {
                    freePubSubConnections.poll();
                }
                freePubSubLock.release();

                // Add the listeners
                RFuture subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);

                ChannelFuture future;
                if (PubSubType.PSUBSCRIBE == type) {
                    future = freeEntry.psubscribe(codec, channelName);
                } else {
                    future = freeEntry.subscribe(codec, channelName);
                }

                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            if (!promise.isDone()) {
                                subscribeFuture.cancel(false);
                            }
                            return;
                        }

                        connectionManager.newTimeout(new TimerTask() {
                            @Override
                            public void run(Timeout timeout) throws Exception {
                                subscribeFuture.cancel(false);
                            }
                        }, config.getTimeout(), TimeUnit.MILLISECONDS);
                    }
                });
            }
        });
    }

    private void connect(Codec codec, ChannelName channelName, RPromise promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener... listeners) {
        // Compute the slot from channelName and get a PubSubConnection for it
        int slot = connectionManager.calcSlot(channelName.getName());
        RFuture connFuture = nextPubSubConnection(slot);
        promise.onComplete((res, e) -> {
            if (e != null) {
                ((RPromise) connFuture).tryFailure(e);
            }
        });

        connFuture.onComplete((conn, e) -> {
            if (e != null) {
                freePubSubLock.release();
                lock.release();
                promise.tryFailure(e);
                return;
            }

            // subscriptionsPerConnection is read from the configuration here
            PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
            // Each acquire decrements the remaining subscriptions, down to 0
            int remainFreeAmount = entry.tryAcquire();

            // If an old entry exists, release the new entry and add the listeners to oldEntry
            PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
            if (oldEntry != null) {
                releaseSubscribeConnection(slot, entry);
                freePubSubLock.release();
                addListeners(channelName, promise, type, lock, oldEntry, listeners);
                return;
            }

            if (remainFreeAmount > 0) {
                // Add the entry to the free queue
                freePubSubConnections.add(entry);
            }
            freePubSubLock.release();

            RFuture subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);

            // The actual subscription (low-level interaction with Redis)
            ChannelFuture future;
            if (PubSubType.PSUBSCRIBE == type) {
                future = entry.psubscribe(codec, channelName);
            } else {
                future = entry.subscribe(codec, channelName);
            }

            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        if (!promise.isDone()) {
                            subscribeFuture.cancel(false);
                        }
                        return;
                    }

                    connectionManager.newTimeout(new TimerTask() {
                        @Override
                        public void run(Timeout timeout) throws Exception {
                            subscribeFuture.cancel(false);
                        }
                    }, config.getTimeout(), TimeUnit.MILLISECONDS);
                }
            });
        });
    }
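The AsyncSemaphore behaviour from step 3 (run the first listener immediately, queue the rest until a permit is released) can be reproduced in a few lines of plain Java. This is a simplified sketch, not the Redisson class: the name AsyncSemaphoreSketch is hypothetical, only single-permit acquires are modelled, and the release() logic is an assumption, since the article does not show it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class AsyncSemaphoreSketch {
    private int counter;
    private final Queue<Runnable> listeners = new ArrayDeque<>();

    public AsyncSemaphoreSketch(int permits) {
        this.counter = permits;
    }

    // Run the listener now if a permit is free, otherwise queue it.
    public void acquire(Runnable listener) {
        synchronized (this) {
            if (counter < 1) {
                listeners.add(listener);
                return;
            }
            counter--;
        }
        listener.run();
    }

    // Assumed behaviour: return the permit and hand it to the next queued listener, if any.
    public void release() {
        Runnable next;
        synchronized (this) {
            counter++;
            next = listeners.poll();
            if (next != null) {
                counter--;
            }
        }
        if (next != null) {
            next.run();
        }
    }

    // Same mapping as the getSemaphore comment in the article: equal names give equal indexes.
    static int semaphoreIndex(String channelName, int length) {
        return Math.abs(channelName.hashCode() % length);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        AsyncSemaphoreSketch semaphore = new AsyncSemaphoreSketch(1);
        semaphore.acquire(() -> log.add("first"));   // permit free: runs immediately
        semaphore.acquire(() -> log.add("second"));  // no permit: queued
        log.add("after-acquires");
        semaphore.release();                         // hands the permit to "second"
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, after-acquires, second]
    }
}
```

The point of the demo is that the second listener does nothing until someone calls release(), so a listener that never releases its permit stalls every later subscription that maps to the same semaphore.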

In the PubSubConnectionEntry#tryAcquire method, subscriptionsPerConnection represents the maximum number of subscriptions per connection. Each call to tryAcquire decrements the remaining count:

    public int tryAcquire() {
        while (true) {
            int value = subscribedChannelsAmount.get();
            if (value == 0) {
                return -1;
            }

            if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
                return value - 1;
            }
        }
    }
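To see the return values concretely, here is a minimal standalone version of the same compare-and-set loop built on AtomicInteger. The class name PermitCounterSketch and the release() method are illustrative assumptions; only the tryAcquire loop mirrors the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PermitCounterSketch {
    private final AtomicInteger remaining;

    public PermitCounterSketch(int subscriptionsPerConnection) {
        this.remaining = new AtomicInteger(subscriptionsPerConnection);
    }

    // Returns the number of permits left after this acquire, or -1 if none were available.
    public int tryAcquire() {
        while (true) {
            int value = remaining.get();
            if (value == 0) {
                return -1;
            }
            // Retry if another thread changed the counter in between
            if (remaining.compareAndSet(value, value - 1)) {
                return value - 1;
            }
        }
    }

    // Assumed counterpart: give one permit back.
    public void release() {
        remaining.incrementAndGet();
    }

    public static void main(String[] args) {
        PermitCounterSketch entry = new PermitCounterSketch(2);
        System.out.println(entry.tryAcquire()); // prints 1
        System.out.println(entry.tryAcquire()); // prints 0
        System.out.println(entry.tryAcquire()); // prints -1
    }
}
```

A return value of 0 means the acquire succeeded but the connection is now full, which is the case the caller uses to take the entry out of the free queue.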

Summary of the logic above:

1. First check for an existing subscription: look up channelName in name2PubSubConnection. If a PubSubConnectionEntry exists, add the new listeners directly to it.

2. Otherwise, take a shared PubSubConnectionEntry from the freePubSubConnections queue. If none is available, enter the connect() method:

2.1 Create a PubSubConnectionEntry sized by subscriptionsPerConnection, then call its tryAcquire() method; each call decrements the remaining count by 1.

2.2 Put the new PubSubConnectionEntry into the global name2PubSubConnection map for later reuse.

2.3 If remainFreeAmount > 0, also put the PubSubConnectionEntry into the freePubSubConnections queue.

2.4 Then perform the underlying subscribe and addListeners.

3. If a PubSubConnectionEntry is available in the queue, call tryAcquire on that existing entry.

4. If tryAcquire returns -1 (no subscriptions left), an IllegalStateException is thrown. If remainFreeAmount is 0, the entry is removed from the queue so that subsequent requests obtain another available connection.

5. Finally, the underlying subscribe and addListeners are performed in this path as well.
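The steps above imply a rough upper bound on the number of distinct channels that can be subscribed at once: about subscriptionsPerConnection multiplied by the number of subscription connections. The following toy model walks through the same bookkeeping to show where the limit comes from. It is a deliberately simplified, single-threaded sketch: the class SubscriptionCapacitySketch and its members are hypothetical, the values 5 and 50 are example inputs only, and an exhausted pool simply returns false here, whereas how the real client surfaces exhaustion is not modelled.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class SubscriptionCapacitySketch {

    // Stand-in for a connection entry: only tracks remaining subscription slots.
    static final class Entry {
        int remaining;
        Entry(int remaining) { this.remaining = remaining; }
    }

    private final int subscriptionsPerConnection;
    private int connectionsLeft; // stands in for the subscription connection pool
    private final Map<String, Entry> name2Connection = new HashMap<>();
    private final Queue<Entry> freeConnections = new ArrayDeque<>();

    SubscriptionCapacitySketch(int subscriptionsPerConnection, int poolSize) {
        this.subscriptionsPerConnection = subscriptionsPerConnection;
        this.connectionsLeft = poolSize;
    }

    boolean subscribe(String channelName) {
        // Step 1: an existing subscription for this channel is reused
        if (name2Connection.containsKey(channelName)) {
            return true;
        }
        // Steps 2 and 3: take a connection with free slots, or open a new one
        Entry entry = freeConnections.peek();
        if (entry == null) {
            if (connectionsLeft == 0) {
                return false; // pool exhausted in this model
            }
            connectionsLeft--;
            entry = new Entry(subscriptionsPerConnection);
            freeConnections.add(entry);
        }
        entry.remaining--;
        name2Connection.put(channelName, entry);
        // Step 4: a full connection leaves the free queue
        if (entry.remaining == 0) {
            freeConnections.poll();
        }
        return true;
    }

    // Count how many distinct channels fit before the model runs out of slots.
    static int maxChannels(int subscriptionsPerConnection, int poolSize) {
        SubscriptionCapacitySketch sketch = new SubscriptionCapacitySketch(subscriptionsPerConnection, poolSize);
        int count = 0;
        while (sketch.subscribe("channel-" + count)) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(maxChannels(5, 50)); // prints 250
    }
}
```

Under this model, many locks with distinct names being contended at the same time use up the slots quickly, which is consistent with the fix described at the start: raise subscriptionsPerConnection or subscriptionConnectionPoolSize.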
