In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-06 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article introduces the knowledge of "what are the advantages of Dubbo thread pool". In the operation of practical cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
1 basic knowledge 1.1 DUBBO threading model
1.1.1 basic concepts
DUBBO underlying network communication uses the Netty framework, we write a Netty server to observe:
Public class NettyServer {public static void main (String [] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup (1); EventLoopGroup workerGroup = new NioEventLoopGroup (8); try {ServerBootstrap bootstrap = new ServerBootstrap () Bootstrap.group (bossGroup, workerGroup) .channel (NioServerSocketChannel.class) .option (ChannelOption.SO_BACKLOG, 128) .childOption (ChannelOption.SO_KEEPALIVE) True) .childHandler (new ChannelInitializer () {@ Override protected void initChannel (SocketChannel ch) throws Exception {ch.pipeline () .addLast (new NettyServerHandler ()) }}); ChannelFuture channelFuture = bootstrap.bind (7777). Sync (); System.out.println ("server ready"); channelFuture.channel (). CloseFuture (). Sync ();} catch (Exception ex) {System.out.println (ex.getMessage ()) } finally {bossGroup.shutdownGracefully (); workerGroup.shutdownGracefully ();}
Only one thread in the BossGroup thread group processes the client connection request, and after the connection is completed, the SocketChannel connection that completes the three-way handshake is distributed to the WorkerGroup to process the read and write request. These two thread groups are called "IO threads".
We then introduce the concept of "business thread". After the service producer receives the request, if the processing logic can be processed quickly, it can be directly placed in the IO thread processing, thus reducing thread pool scheduling and context switching. But if the processing logic is time-consuming, or if a new IO request is made, such as querying the database, then it must be dispatched to the business thread pool for processing.
DUBBO provides a variety of threading models. To select a threading model, you need to specify the dispatcher attribute in the configuration file:
Different threading models choose whether to use IO threads or business threads. The official website of DUBBO explains:
All all messages are dispatched to the business thread pool, including requests, responses, connection events, disconnect events, heartbeat direct all messages are not sent to the business thread pool, all message is executed directly in the IO thread only request response messages are sent to the business thread pool, other connection disconnection events, heartbeats and other messages are directly executed in the I thread execution only request messages are sent to the business thread pool, responses and other connection disconnection events Messages such as heartbeat execute connection directly in the IO thread. Disconnect events are queued on the IO thread, executed one by one in an orderly manner, and other messages are sent to the business thread pool.
1.1.2 determine the timing
Producers and consumers determine the threading model at initialization:
/ / producer public class NettyServer extends AbstractServer implements Server {public NettyServer (URL url, ChannelHandler handler) throws RemotingException {super (url, ChannelHandlers.wrap (handler, ExecutorUtil.setThreadName (url, SERVER_THREAD_POOL_NAME));}} / / Consumer public class NettyClient extends AbstractClient {public NettyClient (final URL url, final ChannelHandler handler) throws RemotingException {super (url, wrapChannelHandler (url, handler));}}
Both the producer and consumer default threading models use the AllDispatcher,ChannelHandlers.wrap method to obtain Dispatch adaptive extension points. If we specify dispatcher in the configuration file, the extension point loader will get the property value from URL to load the corresponding thread model. This paper takes the producer as an example to analyze:
Public class NettyServer extends AbstractServer implements Server {public NettyServer (URL url, ChannelHandler handler) throws RemotingException {/ / ChannelHandlers.wrap determines threading strategy super (url, ChannelHandlers.wrap (handler, ExecutorUtil.setThreadName (url, SERVER_THREAD_POOL_NAME));}} public class ChannelHandlers {protected ChannelHandler wrapInternal (ChannelHandler handler, URL url) {return new MultiMessageHandler (new HeartbeatHandler (ExtensionLoader.getExtensionLoader (Dispatcher.class). GetAdaptiveExtension (). Dispatch (handler, url) } @ SPI (AllDispatcher.NAME) public interface Dispatcher {@ Adaptive ({Constants.DISPATCHER_KEY, "channel.handler"}) ChannelHandler dispatch (ChannelHandler handler, URL url);}
1.1.3 Source code analysis
We analyze two of the thread model source code, the other thread model please read the DUBBO source code. All messages in the AllDispatcher model are sent to the business thread pool, including requests, responses, connection events, disconnect events, heartbeats:
Public class AllDispatcher implements Dispatcher {/ / Thread model name public static final String NAME = "all"; / / specific implementation strategy @ Override public ChannelHandler dispatch (ChannelHandler handler, URL url) {return new AllChannelHandler (handler, url) }} public class AllChannelHandler extends WrappedChannelHandler {@ Override public void connected (Channel channel) throws RemotingException {/ / connection completion event to business thread pool ExecutorService cexecutor = getExecutorService (); try {cexecutor.execute (new ChannelEventRunnable (channel, handler, ChannelState.CONNECTED)) } catch (Throwable t) {throw new ExecutionException ("connect event", channel, getClass () + "error when process connected event", t);} @ Override public void disconnected (Channel channel) throws RemotingException {/ / disconnect event to business thread pool ExecutorService cexecutor = getExecutorService () Try {cexecutor.execute (new ChannelEventRunnable (channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException ("disconnect event", channel, getClass () + "error when process disconnected event", t) } @ Override public void received (Channel channel, Object message) throws RemotingException {/ / request response event to the business thread pool ExecutorService cexecutor = getExecutorService (); try {cexecutor.execute (new ChannelEventRunnable (channel, handler, ChannelState.RECEIVED, message)) } catch (Throwable t) {if (message instanceof Request & & t instanceof RejectedExecutionException) {Request request = (Request) message; if (request.isTwoWay ()) {String msg = "Server side (" + url.getIp () + "," + url.getPort () + "threadpool is exhausted, detail msg:" + t.getMessage () Response response = new Response (request.getId (), request.getVersion ()); response.setStatus (Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage (msg); channel.send (response); return }} throw new ExecutionException (message, channel, getClass () + "error when process received event", t);} @ Override public void caught (Channel channel, Throwable exception) throws RemotingException {/ / exception event to business thread pool ExecutorService cexecutor = getExecutorService () Try {cexecutor.execute (new ChannelEventRunnable (channel, handler, ChannelState.CAUGHT, exception);} catch (Throwable t) {throw new ExecutionException ("caught event", channel, getClass () + "error when process caught event", t);}
All messages in the DirectDispatcher policy are not sent to the business thread pool, but are executed directly in the IO thread:
Public class DirectDispatcher implements Dispatcher {/ / Thread model name public static final String NAME = "direct"; / / specific implementation strategy @ Override public ChannelHandler dispatch (ChannelHandler handler, URL url) {/ / return handler directly means that all events are handed over to IO thread processing return handler;}} 1.2 DUBBO thread pool policy
1.2.1 basic concepts
The previous section analyzed the threading model, and we know that different threading models will choose whether to use IO threads or business threads. If you use a business thread pool, what thread pool strategy to use is a question that needs to be answered in this section. The DUBBO official website thread dispatch model diagram shows the relationship between the thread model and thread pool policy:
DUBBO provides a variety of thread pool policies. To select a thread pool policy, you need to specify the threadpool attribute in the configuration file:
Different thread pool policies create thread pools with different characteristics:
Fixed contains a fixed number of threads cached threads are recycled when they are idle for one minute, and new threads are created when new requests arrive. The number of limited threads increases as the task increases, but does not exceed the maximum threshold. Idle threads will not be recycled eager when all core threads are busy, priority is given to creating new threads to execute tasks rather than queuing immediately
1.2.2 determine the timing
In this article, we take AllDispatcher as an example to analyze when the thread pool policy is determined:
Public class AllDispatcher implements Dispatcher {public static final String NAME = "all"; @ Override public ChannelHandler dispatch (ChannelHandler handler, URL url) {return new AllChannelHandler (handler, url);}} public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler (ChannelHandler handler, URL url) {super (handler, url);}}
If the threadpool attribute is specified in the configuration in the WrappedChannelHandler constructor, the extension point loader will get the attribute value from URL to load the corresponding thread pool policy. The default policy is fixed:
Public class WrappedChannelHandler implements ChannelHandlerDelegate {public WrappedChannelHandler (ChannelHandler handler, URL url) {this.handler = handler; this.url = url; / / get thread pool adaptive extension point executor = (ExecutorService) ExtensionLoader.getExtensionLoader (ThreadPool.class). GetAdaptiveExtension (). GetExecutor (url); String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY If (Constants.CONSUMER_SIDE.equalsIgnoreCase (url.getParameter (Constants.SIDE_KEY) {componentKey = Constants.CONSUMER_SIDE;} DataStore dataStore = ExtensionLoader.getExtensionLoader (DataStore.class). GetDefaultExtension (); dataStore.put (componentKey, Integer.toString (url.getPort ()), executor) } @ SPI ("fixed") public interface ThreadPool {@ Adaptive ({Constants.THREADPOOL_KEY}) Executor getExecutor (URL url);}
1.2.3 Source Code Analysis
(1) FixedThreadPool
Public class FixedThreadPool implements ThreadPool {@ Override public Executor getExecutor (URL url) {/ / thread name String name = url.getParameter (Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); / / the default number of threads is 200thread = url.getParameter (Constants.THREADS_KEY, Constants.DEFAULT_THREADS) / / queue capacity defaults to 0 int queues = url.getParameter (Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES) / / queue capacity equal to 0 use blocking queue SynchronousQueue / / queue capacity less than 0 use unbounded blocking queue LinkedBlockingQueue / / queue capacity greater than 0 use bounded blocking queue LinkedBlockingQueue return new ThreadPoolExecutor (threads, threads, 0, TimeUnit.MILLISECONDS, queues = = 0? New SynchronousQueue (): (queues
< 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } (2) CachedThreadPool public class CachedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 获取线程名称 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心线程数默认0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大线程数默认Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); // 队列容量默认0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 线程空闲多少时间被回收默认1分钟 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // 队列容量等于0使用阻塞队列SynchronousQueue // 队列容量小于0使用无界阻塞队列LinkedBlockingQueue // 队列容量大于0使用有界阻塞队列LinkedBlockingQueue return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } (3) LimitedThreadPool public class LimitedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 获取线程名称 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心线程数默认0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大线程数默认200 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); // 队列容量默认0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 队列容量等于0使用阻塞队列SynchronousQueue // 队列容量小于0使用无界阻塞队列LinkedBlockingQueue // 队列容量大于0使用有界阻塞队列LinkedBlockingQueue // keepalive时间设置Long.MAX_VALUE表示不回收空闲线程 return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } (4) EagerThreadPool 我们知道ThreadPoolExecutor是普通线程执行器。当线程池核心线程达到阈值时新任务放入队列,当队列已满开启新线程处理,当前线程数达到最大线程数时执行拒绝策略。 但是EagerThreadPool自定义线程执行策略,当线程池核心线程达到阈值时,新任务不会放入队列而是开启新线程进行处理(要求当前线程数没有超过最大线程数)。当前线程数达到最大线程数时任务放入队列。 public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 线程名 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心线程数默认0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大线程数默认Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); // 队列容量默认0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 线程空闲多少时间被回收默认1分钟 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // 初始化自定义线程池和队列重写相关方法 TaskQueue taskQueue = new TaskQueue(queues { String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home")); System.out.println("AbortPolicyWithReport dumpJStack directory=" + dumpPath); SimpleDateFormat sdf; String os = System.getProperty("os.name").toLowerCase(); // linux文件位置/home/xxx/Dubbo_JStack.log.2021-01-01_20:50:15 // windows文件位置/user/xxx/Dubbo_JStack.log.2020-01-01_20-50-15 if (os.contains("win")) { sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); } else { sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss"); } String dateStr = sdf.format(new Date()); try (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) { JVMUtil.jstack(jStackStream); } catch (Throwable t) { logger.error("dump jStack error", t); } finally { guard.release(); } lastPrintTime = System.currentTimeMillis(); }); pool.shutdown(); } } 拒绝策略会输出线程快照文件,在分析线程快照文件时BLOCKED和TIMED_WAITING线程状态需要我们重点关注。如果发现大量线程阻塞或者等待状态则可以定位到具体代码行: DubboServerHandler-x.x.x.x:9999-thread-200 Id=230 TIMED_WAITING at java.lang.Thread.sleep(Native Method) at com.java.front.dubbo.demo.provider.HelloServiceImpl.sayHello(HelloServiceImpl.java:13) at org.apache.dubbo.common.bytecode.Wrapper1.invokeMethod(Wrapper1.java) at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:56) at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:85) at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:56) at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56) (2) 优化慢服务 现在已经找到了慢服务,此时我们就可以优化慢服务了。优化慢服务就需要具体问题具体分析了,这不是本文的重点在此不进行展开。 2.2 生产者预热不充分 2.2.1 原因分析 还有一种RT上升的情况是我们不能忽视的,这种情况就是提供者重启后预热不充分即被调用。因为当生产者刚启动时需要预热,需要和其它资源例如数据库、缓存等建立连接,建立连接是需要时间的。如果此时大量消费者请求到未预热的生产者,链路时间增加了连接时间,RT时间必然会增加,从而也会导致DUBBO线程池打满问题。 2.2.2 解决方案 (1) 等待生产者充分预热 因为生产者预热不充分导致线程池打满问题,最容易发生在系统发布时。例如发布了一台机器后发现线上出现线程池打满问题,千万不要着急重启机器,而是给机器一段时间预热,等连接建立后问题大概率消失。同时我们在发布时也要分多批次发布,不要一次发布太多机器导致服务因为预热问题造成大面积影响。 (2) DUBBO升级版本大于等于2.7.4 DUBBO消费者在调用选择生产者时本身就会执行预热逻辑,为什么还会出现预热不充分问题?这是因为2.5.5之前版本以及2.7.2版本预热机制是有问题的,简而言之就是获取启动时间不正确,2.7.4版本彻底解决了这个问题,所以我们要避免使用问题版本。下面我们阅读2.7.0版本预热机制源码,看看预热机制如何生效: public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; @Override protected Invoker doSelect(List invokers, URL url, Invocation invocation) { // invokers数量 int length = invokers.size(); // 权重是否相同 boolean sameWeight = true; // invokers权重数组 int[] weights = new int[length]; // 第一个invoker权重 int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // 权重值之和 int totalWeight = firstWeight; for (int i = 1; i < length; i++) { // 计算权重值 int weight = getWeight(invokers.get(i), invocation); weights[i] = weight; totalWeight += weight; // 任意一个invoker权重值不等于第一个invoker权重值则sameWeight设置为FALSE if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 权重值不等则根据总权重值计算 if (totalWeight >0 & &! sameWeight) {int offset = ThreadLocalRandom.current () .nextInt (totalWeight); / / continuously subtract the weight value and return for directly when the weight value is less than 0 (int I = 0; I)
< length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // 所有服务权重值一致则随机返回 return invokers.get(ThreadLocalRandom.current().nextInt(length)); } } public abstract class AbstractLoadBalance implements LoadBalance { static int calculateWarmupWeight(int uptime, int warmup, int weight) { // uptime/(warmup*weight) // 如果当前服务提供者没过预热期,用户设置的权重将通过uptime/warmup减小 // 如果服务提供者设置权重很大但是还没过预热时间,重新计算权重会很小 int ww = (int) ((float) uptime / ((float) warmup / (float) weight)); return ww < 1 ? 1 : (ww >Weight? Weight: ww);} protected int getWeight (Invoker invoker, Invocation invocation) {/ / get invoker setting weight default weight = 100int weight = invoker.getUrl () .getMethodParameter (invocation.getMethodName (), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT) / / if the weight is greater than 0 if (weight > 0) {/ / the service provider publishes the service timestamp long timestamp = invoker.getUrl () .getParameter (Constants.REMOTE_TIMESTAMP_KEY, 0L) If (timestamp > 0L) {/ / how long has the service been released int uptime = (int) (System.currentTimeMillis ()-timestamp); / / the warm-up time defaults to 10 minutes int warmup = invoker.getUrl () .getParameter (Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP) / / producer release time is greater than 0 but less than warm-up time if (uptime > 0 & & uptime
< warmup) { // 重新计算权重值 weight = calculateWarmupWeight(uptime, warmup, weight); } } } // 服务发布时间大于预热时间直接返回设置权重值 return weight >= 0? Weight: 0;}} 3 QPS rise
The previous chapter talked at length about the thread pool filling problem caused by RT rising, and now let's talk about another parameter, QPS. When upstream traffic surges, it will lead to the creation of a large number of thread pools, which will also cause the thread pool to be full. At this time, if we find that QPS exceeds the capacity of the system, we have to adopt a downgrade scheme to protect the system.
This is the end of the introduction of "what are the advantages of Dubbo thread pool". Thank you for your reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.