Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use java8 Asynchronous call

2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/01 Report--

This article analyzes "how to use java8 asynchronous calls". The content is detailed and easy to understand. Friends who are interested in "how to use java8 asynchronous calls" can follow the editor's train of thought to read it in depth. I hope it will be helpful to you after reading. Let's learn more about "how to use java8 asynchronous calls" with the editor.

I. Analysis of asynchronous invocation mode

Today, when writing code, I want to call asynchronous operations. Here I use the streaming asynchronous call of java8, but in the process of using it, I found that there are two methods for this asynchronous mode, as shown below:

The difference is that one needs to specify a thread pool and the other does not.

So what are the benefits of specifying a thread pool? Intuitively speaking, there are two advantages:

We can better plan our thread count through pool management according to our server performance.

You can customize the name of the thread we use, which is also mentioned in the Ali java development specification.

1.1 java8 asynchronous invocation default thread pool mode

Of course, there is nothing wrong with routinely using the default. We analyze the process of using the default thread pool through the source code.

Public static CompletableFuture runAsync (Runnable runnable) {return asyncRunStage (asyncPool, runnable);}

What is this asyncPool?

As shown below, if useCommonPool is true, use ForkJoinPool.commonPool (), otherwise create a new ThreadPerTaskExecutor ():

Private static final Executor asyncPool = useCommonPool? ForkJoinPool.commonPool (): new ThreadPerTaskExecutor ()

What is useCommonPool?

Private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism () > 1); / * * the target parallelism level of the common pool * / public static int getCommonPoolParallelism () {return commonParallelism;}

In the end, this level of parallelism does not give a default value.

Static final int commonParallelism

By finding the call to this constant, let's see how it is initialized. There is a static block of code in ForkJoinPool that initializes commonParallelism when it starts. Let's just focus on the last sentence:

/ / Unsafe mechanics private static final sun.misc.Unsafe U; private static final int ABASE; private static final int ASHIFT; private static final long CTL; private static final long RUNSTATE; private static final long STEALCOUNTER; private static final long PARKBLOCKER; private static final long QTOP; private static final long QLOCK; private static final long QSCANSTATE; private static final long QPARKER; private static final long QCURRENTSTEAL; private static final long QCURRENTJOIN Static {/ / initialize field offsets for CAS etc try {U = sun.misc.Unsafe.getUnsafe (); Class k = ForkJoinPool.class; CTL = U.objectFieldOffset (k.getDeclaredField ("ctl")); RUNSTATE = U.objectFieldOffset (k.getDeclaredField ("runState")) STEALCOUNTER = U.objectFieldOffset (k.getDeclaredField ("stealCounter")); Class tk = Thread.class; PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField ("parkBlocker")); Class wk = WorkQueue.class; QTOP = U.objectFieldOffset (wk.getDeclaredField ("top")) QLOCK = U.objectFieldOffset (wk.getDeclaredField ("qlock")); QSCANSTATE = U.objectFieldOffset (wk.getDeclaredField ("scanState")); QPARKER = U.objectFieldOffset (wk.getDeclaredField ("parker")); QCURRENTSTEAL = U.objectFieldOffset (wk.getDeclaredField ("currentSteal")) QCURRENTJOIN = U.objectFieldOffset (wk.getDeclaredField ("currentJoin")); Class ak = ForkJoinTask [] .class; ABASE = U.arrayBaseOffset (ak); int scale = U.arrayIndexScale (ak); if ((scale & (scale-1))! = 0) throw new Error ("data type scale not a power of two") ASHIFT = 31-Integer.numberOfLeadingZeros (scale);} catch (Exception e) {throw new Error (e);} commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory (); modifyThreadPermission = new RuntimePermission ("modifyThread") Common = java.security.AccessController.doPrivileged (new java.security.PrivilegedAction () {public ForkJoinPool run () {return makeCommonPool ();}}); / / 1 even if the thread is disabled, at least 1 int par = common.config & SMASK; commonParallelism = par > 0? Par: 1;}

As shown below, the default is 7:

So take a look at the following code:

Private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism () > 1)

True must be returned here, proving that it is currently parallel.

Private static final Executor asyncPool = useCommonPool? ForkJoinPool.commonPool (): new ThreadPerTaskExecutor ()

It returns a default thread pool with a size of seven

In fact, this default value is the number of cores of the current cpu. My computer is eight cores. By default, the number of cores will be reduced by one in the code, so it is shown to be seven threads.

If (parallelism

< 0 && //默认是1,小于核心数 (parallelism = Runtime.getRuntime().availableProcessors() - 1) MAX_CAP) parallelism = MAX_CAP; 下面我们写个main方法测试一下,10个线程,每个阻塞10秒,看结果: public static void main(String[] args) { // 创建10个任务,每个任务阻塞10秒 for (int i = 0; i < 10; i++) { CompletableFuture.runAsync(() ->

{try {Thread.sleep (10000); System.out.println (new Date () + ":" + Thread.currentThread () .getName ());} catch (InterruptedException e) {e.printStackTrace ();}}) } try {Thread.sleep (30000);} catch (InterruptedException e) {e.printStackTrace ();}}

The results are as follows: the first seven tasks are completed first, and the other three tasks are blocked for 10 seconds before they are completed:

Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-5Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-4Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-2Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-7Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-3Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-6Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-1 -Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-2Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-5Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-4

Conclusion: when we use the default thread pool for asynchronous calls, if the asynchronous task is IO-intensive, in short, it takes a long time to process, which will lead to blocking of other tasks using the shared thread pool, resulting in system performance degradation or even abnormal. Even when part of the interface is called, if the interface times out, it will block for the same amount of time as the timeout; it can actually improve performance in computing-intensive scenarios.

Second, use a custom thread pool

As mentioned above, if it is an IO-intensive scenario, it is better to use a custom thread pool for asynchronous calls.

For the two obvious benefits mentioned at the beginning, a new one is added here:

We can better plan our thread count through pool management according to our server performance.

You can customize the name of the thread we use, which is also mentioned in the Ali java development specification.

Blocking does not cause other threads using the shared thread pool to block or even exception.

Let's customize the following thread pool:

/ * * @ description: global universal thread pool * @ author:weirx * @ date:2021/9/9 18:09 * @ version:3.0 * / @ Slf4jpublic class GlobalThreadPool {/ * Core threads * / public final static int CORE_POOL_SIZE = 10; / * * maximum threads * / public final static int MAX_NUM_POOL_SIZE = 20 / * Task queue size * / public final static int BLOCKING_QUEUE_SIZE = 30; / * Thread pool instance * / private final static ThreadPoolExecutor instance = getInstance () / * description: initialize thread pool * * @ return: java.util.concurrent.ThreadPoolExecutor * @ author: weirx * @ time: 9:49 * / private synchronized static ThreadPoolExecutor getInstance () {/ / generate thread pool ThreadPoolExecutor executor = new ThreadPoolExecutor (CORE_POOL_SIZE, MAX_NUM_POOL_SIZE) on 2021-9-10 60, TimeUnit.SECONDS, new LinkedBlockingQueue (BLOCKING_QUEUE_SIZE), new NamedThreadFactory ("Thread-wjbgn-", false)) Return executor;} private GlobalThreadPool () {} public static ThreadPoolExecutor getExecutor () {return instance;}}

Call:

Public static void main (String [] args) {/ / create 10 tasks, each blocking for for 10 seconds (int I = 0; I

< 10; i++) { CompletableFuture.runAsync(() ->

{try {Thread.sleep (10000); System.out.println (new Date () + ":" + Thread.currentThread (). GetName ());} catch (InterruptedException e) {e.printStackTrace ();}}, GlobalThreadPool.getExecutor ()) } try {Thread.sleep (30000);} catch (InterruptedException e) {e.printStackTrace ();}}

Output the thread with the name we specified:

Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-1Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-10Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-2Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-9Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-5Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-6Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-3Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-7Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-8Mon Sep 13 11:32:35 CST 2021 CST 2021:Thread-Inbox-Model-8Mon Sep Inbox Model-4 III. Extraneous remarks Dynamic thread pool 3.1 what is a dynamic thread pool?

When we use thread pools, are we sometimes confused about how big the thread pool parameters are most appropriate? What if it's not enough? do you want to change the code and redeploy?

In fact, it is not necessary. I remember reading an article by Meituan at that time, which really made people open, ah, dynamic thread pool.

The ThreadPoolExecutor class actually provides modifications to the properties of the thread pool, and allows us to dynamically modify the following attributes:

From top to bottom are:

Thread factory (used to specify thread name)

Number of core threads

Maximum number of threads

Active time

Reject policy.

In Meituan's article, it is to monitor the utilization of server threads, alarm when the threshold is reached, and then dynamically modify these values through the configuration center.

We can do this, too, using @ Reusable ScopePlus nacos.

3.2 practice

I wrote a scheduled task to monitor the thread utilization of the current service, expand the capacity when it is small, and restore the initial value after a period of time when the occupancy rate decreases.

In fact, there are still many areas that need to be improved. Please make more comments. What you are monitoring is the thread pool GlobalThreadPool in front of the article. The following code for scheduling tasks:

/ * * @ description: global thread pool daemon * @ author:weirx * @ date:2021/9/10 16:32 * @ version:3.0 * / @ Slf4j@Componentpublic class DaemonThreadTask {/ * maximum number of threads supported by the service * / public final static int SERVER_MAX_SIZE = 50; / * * maximum threshold Maximum threshold, percentage * / private final static int MAXIMUM_THRESHOLD = 8 / * maximum number of threads per increment * / private final static int INCREMENTAL_MAX_NUM = 10; / * number of core threads per increment * / private final static int INCREMENTAL_CORE_NUM = 5; / * * current threads * / private static int currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE / * * current core threads * / private static int currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE; @ Scheduled (cron = "0 * / 5 *?") Public static void execute () {threadMonitor ();} / * description: dynamically monitor and set thread parameters * * @ return: void * @ author: weirx * @ time: 13:20 * / private static void threadMonitor () on 2021-9-10 {ThreadPoolExecutor instance = GlobalThreadPool.getExecutor (); int activeCount = instance.getActiveCount () Int size = instance.getQueue (). Size (); log.info ("GlobalThreadPool: the active thread count is {}", activeCount); / / insufficient number of threads, add thread if (activeCount > GlobalThreadPool.MAX_NUM_POOL_SIZE% MAXIMUM_THRESHOLD & & size > = GlobalThreadPool.BLOCKING_QUEUE_SIZE) {currentSize = currentSize + INCREMENTAL_MAX_NUM; currentCoreSize = currentCoreSize + INCREMENTAL_CORE_NUM / / the maximum number of threads currently set is less than the maximum number of threads supported by the service before you can continue to increase the thread if (currentSize GlobalThreadPool.MAX_NUM_POOL_SIZE) {currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE; currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE; instance.setMaximumPoolSize (currentSize); instance.setCorePoolSize (currentCoreSize) }} 3.3 what is the point of dynamic thread pools?

Some friends have actually asked me, I just set the thread pool a little larger, what is the point of this dynamic thread pool?

Actually, this is a good question. In the previous traditional software, stand-alone deployment, hardware deployment, indeed, the number of threads we can use depends on the number of core threads of the server, and there are few other services to compete for these threads.

But this is the age of containers, the age of native clouds.

If multiple containers are deployed on a single host, a container needs to take up a lot of cpu resources during peak hours. If all containers occupy most of the resources, then the container will inevitably face the risk of blocking or even paralysis.

When the peak is over, this part of the resource can be released and used for other needed containers.

Combined with the current CVM node expansion, it requires dynamic expansion, similar to threads, and saves costs as much as possible in the case of high availability.

About how to use java8 asynchronous calls to share here, I hope that the above content can make you improve. If you want to learn more knowledge, please pay more attention to the editor's updates. Thank you for following the website!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report