
How JAVA high-concurrency threads and thread pools connect


This article explains how JAVA high-concurrency threads and thread pools connect. The content is simple, clear, and easy to understand; follow the editor's line of thought and work through it step by step.

1 Implementation principle of JAVA threads

Java threads are based on the operating system's native thread model (not user-mode threads). Through system calls, the program's threads are handed over to the operating system to schedule and execute.

Each Java thread has its own virtual machine stack. Once the JVM has prepared the stack, program counter, working memory, etc., it allocates a native system thread to do the execution. When the Java thread ends, the native thread is reclaimed.

When the native thread has been initialized, it calls the run method of the Java thread. When the Java thread ends, all resources of both the native thread and the Java thread are released.

The execution of a Java method corresponds to a stack frame on the virtual machine stack, which stores local variables, the operand stack, the dynamic link, the method exit, and so on.

2 Life cycle of JAVA threads

New (new state): after a thread is created with the new keyword it is in the new state; the JVM has only allocated memory for it and initialized its member variables.

Runnable (ready state): the thread enters the ready state when Thread.start is called; the JVM allocates a virtual machine stack for it and the thread waits to be scheduled by the system.

Running (running state): when a thread in the ready state gets the CPU and its run method executes, the thread is in the running state.

Terminated (dead): a thread dies when run finishes normally, when an uncaught Throwable is thrown, or when Thread.stop is called to end it. (These states can be observed directly, as in the sketch below.)
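The following is a minimal sketch (not from the original article; the class name is illustrative) that observes these states through Thread.getState().

public class ThreadStateDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
        });
        System.out.println(t.getState());   // NEW: created but not started
        t.start();
        System.out.println(t.getState());   // RUNNABLE (or TIMED_WAITING once it is sleeping)
        t.join();
        System.out.println(t.getState());   // TERMINATED: run has finished
    }
}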

3 Several common methods of JAVA threads

Thread startup function

// Thread.java
// Calling start() starts the thread: it enters the Runnable state and waits for the system to schedule and execute it
public synchronized void start() {                  // synchronized execution
    if (threadStatus != 0)                          // 0 represents the new state; if it is not 0, throw an error
        throw new IllegalThreadStateException();
    ...
    start0();                                       // native method
}
private native void start0();

// Running state: the code executed by the new thread; it can be overridden by subclasses
public void run() {
    if (target != null) {                           // target is the Runnable passed to new Thread(Runnable)
        target.run();
    }
}

Thread termination function

// Thread.java
@Deprecated
public final void stop();
// Interrupt the thread
public void interrupt();
// Determine whether the current thread is in the interrupted state
public static boolean interrupted();

Using stop forcibly terminates the thread, which causes all locks held by the thread to be released abruptly (uncontrollably), breaking the logic those locks were protecting. It is not recommended.

The interrupt method interrupts a thread, but it does not necessarily make the thread exit. It is more elegant and controllable than stop.

If the thread is blocked in sleep or wait, an InterruptedException is thrown; the code catches it and can then terminate the thread.

If the thread is not blocked, the program needs to call interrupted() to decide whether to exit.
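A minimal sketch (not from the original article; the class name is illustrative) combining the two paths above: polling the interrupt flag while not blocked, and catching InterruptedException while blocked.

public class InterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {   // non-blocking path: check the flag
                try {
                    Thread.sleep(1000);                          // blocking path: sleep throws when interrupted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();          // restore the flag for any outer code
                    break;                                       // end the thread's work
                }
            }
        });
        worker.start();
        worker.interrupt();                                      // request termination instead of stop()
        worker.join();
    }
}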

Other common methods

// Thread.java
// Block and wait for another thread to finish
public final synchronized void join(long millis);
// Temporarily give up the CPU
public static native void yield();
// Sleep for a period of time
public static native void sleep(long millis) throws InterruptedException;

The difference between start and run methods

start is a method of the Thread class. In terms of the thread life cycle, calling start does not mean the new thread executes right away; it asks the JVM to allocate the virtual machine stack and moves the thread into the Runnable state. The call to start itself still runs on the old (calling) thread.

run is the method the new thread executes once the system schedules it and it gets the CPU. It must be supplied by extending Thread or implementing the Runnable interface.
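A minimal sketch (not from the original article; the class name is illustrative) that makes the difference visible through thread names.

public class StartVsRunDemo {
    public static void main(String[] args) {
        Thread t = new Thread(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        t.run();     // plain method call: runs on the calling thread ("main"), no new thread is started
        t.start();   // the new thread is scheduled and run executes on it ("Thread-0")
    }
}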

The difference between Thread.sleep and Object.wait

Thread.sleep requires a sleep time to be specified and resumes as soon as the time is up. It has nothing to do with the locking mechanism: it neither acquires nor releases any lock.

Object.wait must be called inside synchronized, otherwise an IllegalMonitorStateException is thrown. The wait method releases the lock, and calling Object.notify on the same lock object wakes the thread up.
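A minimal sketch (not from the original article; the class name and the shared flag are illustrative) of the wait/notify pattern just described; the flag guards against a missed notify.

public class WaitNotifyDemo {
    private static final Object lock = new Object();
    private static boolean ready = false;             // condition guarded by the lock

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (lock) {                     // wait must be called while holding the monitor
                while (!ready) {                      // always re-check the condition in a loop
                    try {
                        lock.wait();                  // releases the lock while waiting
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                System.out.println("condition met, lock re-acquired");
            }
        }).start();

        synchronized (lock) {                         // notify must also hold the same monitor
            ready = true;
            lock.notify();                            // wakes the waiter once the lock is released
        }
    }
}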

4 Thread Pool and its advantages

Creating, ending, and destroying a thread every time one is needed is a large overhead. If a caching strategy (a thread pool) is used to keep and reuse threads that have already been created, the program's overhead is reduced and thread utilization is improved.

Reduce resource consumption: reusing threads reduces the cost of thread creation and destruction.

Improve response time: when a task arrives it can be executed immediately, without waiting for a thread to be created.

Improve thread manageability: the thread pool provides unified allocation, monitoring, and tuning.

5 The thread pool encapsulated by the JDK

// ThreadPoolExecutor.java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

1 corePoolSize: the number of core threads, i.e. the number of threads the pool keeps alive.

2 maximumPoolSize: the maximum number of threads. When the blocking queue can no longer accept tasks and maximumPoolSize is greater than corePoolSize, non-core threads are created to execute tasks; they are destroyed when there are no tasks to execute.

3 keepAliveTime: how long an idle non-core thread survives.

4 unit (TimeUnit): used together with keepAliveTime; the time unit of the keepAliveTime parameter.

5 workQueue: the blocking queue that holds waiting tasks once the number of tasks in progress exceeds corePoolSize.

6 threadFactory: the factory used to create threads.

7 handler: the rejection policy. When the number of threads has reached maximumPoolSize and the queue is full, newly submitted tasks are handled by this policy (see the sketch after this list).
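A minimal sketch (all parameter values and the class name are illustrative, not from the article) wiring the seven constructor arguments together.

import java.util.concurrent.*;

public class PoolConfigDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                        // corePoolSize
                4,                                        // maximumPoolSize
                60L, TimeUnit.SECONDS,                    // keepAliveTime + unit for idle non-core threads
                new ArrayBlockingQueue<>(100),            // workQueue: a bounded blocking queue
                Executors.defaultThreadFactory(),         // threadFactory
                new ThreadPoolExecutor.AbortPolicy());    // handler: rejection policy
        pool.execute(() ->
                System.out.println("task on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}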

6 Execution flow of the thread pool

// ThreadPoolExecutor.java
public void execute(Runnable command) {
    ...
    if (workerCountOf(c) < corePoolSize) {               // plan A
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {       // plan B
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // addWorker(command, false): false means a non-core thread is created to execute the task
    else if (!addWorker(command, false))                  // plan C
        reject(command);                                  // plan D
}

Plan A: when a task is submitted via execute, first check whether the number of core threads has reached the limit; if not, create a core thread to execute the task; if it has, go to plan B.

Plan B: when the number of tasks exceeds the number of core threads, the task is added to the blocking queue. If the queue's capacity limit is exceeded, go to plan C.

Plan C: when the blocking queue cannot accept the task and the configured maximumPoolSize is greater than corePoolSize, create a new non-core thread to execute the task.

Plan D: when plans A, B, and C can do nothing, handle the task with the rejection policy.

7 A simple understanding of blocking queues

Blocking insertion: when the queue is full, the queue blocks the thread inserting the element until the queue is no longer full.

Blocking removal: when the queue is empty, the thread taking an element waits until the queue becomes non-empty.

The methods provided by BlockingQueue are listed below; put and take are the blocking operations.

Operation          Throws exception    Blocks      Returns special value    Times out
Insert element     add(e)              put(e)      offer(e)                 offer(e, timeout, unit)
Remove element     remove()            take()      poll()                   poll(timeout, unit)
Examine element    element()           n/a         peek()                   n/a

"ArrayBlockingQueue"

ArrayBlockingQueue is a "bounded blocking queue" backed by an array. Its capacity must be specified, and elements are queued first in, first out (FIFO).
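A minimal producer/consumer sketch (not from the original article; the capacity, counts, and class name are illustrative) showing the blocking put and take described above.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);   // capacity must be given

        new Thread(() -> {                                            // producer
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i);                                     // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        new Thread(() -> {                                            // consumer
            try {
                for (int i = 0; i < 5; i++) {
                    System.out.println(queue.take());                 // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}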

"LinkedBlockingQueue"

Is a "bounded blocking queue" implemented with a linked list. If the size is not specified when constructing LinkedBlockingQueue, the default is Integer.MAX_VALUE, which is infinite.

The producer and consumer of the queue use independent locks to control data operations so as to improve the concurrency of the queue.

"PriorityBlockingQueue"

public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) is an array-based "unbounded blocking queue" whose elements have priorities; the priority is determined by the Comparator.

PriorityBlockingQueue does not block producers, but it blocks consumers when there are no tasks to consume.

"DelayQueue"

An "unbounded blocking queue" that supports delayed fetching elements, based on PriorityQueue implementation

The element must implement the Delayed interface, specifying how long it takes to get the element from the queue.

It can be used in the design of cache system, timing task scheduling and other scenarios.
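A minimal sketch (not from the original article; the class name, fields, and delay value are illustrative) of an element implementing Delayed.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedTask implements Delayed {
    private final long fireAtMillis;               // absolute time at which the element becomes available

    public DelayedTask(long delayMillis) {
        this.fireAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {          // remaining delay; <= 0 means ready to take
        return unit.convert(fireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {          // ordering used by the underlying priority queue
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask(500));
        queue.take();                              // blocks for roughly 500 ms until the delay expires
        System.out.println("delay expired");
    }
}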

"SynchronousQueue"

SynchronousQueue is a waiting queue with no buffer: "to add an element, you must wait until it has been removed before you can add the next one."

"LinkedTransferQueue"

A TransferQueue "unbounded blocking queue" made up of linked nodes; compared with other queues it has the extra tryTransfer and transfer methods.

transfer: if a consumer is waiting for an element, the element is handed directly to the consumer; "otherwise it is stored at the tail of the queue and the call blocks until the element has been consumed before returning."

tryTransfer: tries to pass the incoming element directly to a waiting consumer; if no consumer is waiting, it returns false immediately without adding the element to the queue.
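A minimal sketch (not from the original article; the class name and strings are illustrative) contrasting tryTransfer and transfer.

import java.util.concurrent.LinkedTransferQueue;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        // No consumer is waiting yet: tryTransfer returns false and does not enqueue the element.
        System.out.println(queue.tryTransfer("hello"));    // false

        new Thread(() -> {                                 // a consumer
            try {
                System.out.println("consumed " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        queue.transfer("world");                           // blocks until the consumer has taken "world"
        System.out.println("transfer completed");
    }
}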

"LinkedBlockingDeque"

LinkedBlockingDeque is a two-way (double-ended) blocking queue built on a linked list. Having a second end that can be used for enqueueing and dequeueing roughly halves the contention and improves concurrency.

8 Analysis of the four Executors thread pools

"newFixedThreadPool"

The number of core threads is specified explicitly. The queue is a LinkedBlockingQueue, an unbounded blocking queue, so the pool never rejects a task. It suits stable, fixed-concurrency scenarios; a common recommendation is to set the thread count to the number of CPU cores.

// Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

"newCachedThreadPool"

The core pool size is 0 and the maximum number of threads is Integer.MAX_VALUE. The blocking queue is a SynchronousQueue: a submitted task is first offered to the queue, and if no idle thread takes it a new thread is created; non-core threads are destroyed after 60 seconds with no task to execute. Under load, newCachedThreadPool keeps creating new threads to execute tasks, so it is generally not recommended.

// Executors.java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

"newScheduledThreadPool"

ScheduledThreadPoolExecutor (STPE) is actually a subclass of ThreadPoolExecutor. The number of core threads can be specified, and the queue is STPE's inner class DelayedWorkQueue. "The advantages of STPE are that (a) it can delay the execution of tasks and (b) it can run tasks that return values."

// Executors.java
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}
// Schedule a task with the specified delay
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

"newSingleThreadExecutor"

Same as the newFixedThreadPool construction, but with the number of threads set to 1. The advantage of SingleThreadExecutor over creating threads by hand is: "when a thread dies from an uncaught exception at runtime, a new thread joins the pool to run the next task, and the blocking queue guarantees that tasks execute in FIFO order."

// Executors.java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));   // unbounded queue
}

9 How to gracefully shut down the thread pool

To shut down the pool, the threads inside it must first be shut down. As mentioned in section 3 above, forcibly stopping threads can leave the data they were synchronizing in an inconsistent state, so interrupt is used to shut threads down instead.

The thread pool provides two shutdown methods: shutdownNow and shutdown.

shutdownNow: the thread pool refuses new tasks and closes immediately. The worker threads are interrupted (tasks that do not respond to interruption may still run to completion), the tasks remaining in the queue are no longer executed, and the unexecuted tasks are returned as a List.

// ThreadPoolExecutor.java
public List<Runnable> shutdownNow() {
    ...
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();                       // lock
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();                // interrupt the worker threads
        tasks = drainQueue();              // the tasks that were never executed
    ...
}

shutdown: the thread pool refuses new tasks and shuts down only after the tasks already in the pool have finished executing. The code is similar to shutdownNow.
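A minimal sketch (not from the original article; the class name, pool, and timeout are illustrative) of a common graceful-shutdown pattern built on these two methods.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));

        pool.shutdown();                                            // stop accepting new tasks
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {     // wait for submitted tasks to finish
                pool.shutdownNow();                                 // give up: interrupt workers, drain the queue
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}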

10 Why does the thread pool use blocking queues

First consider why the threads in a thread pool are not released, and how the pool manages their life cycle.

// ThreadPoolExecutor.Worker
final void runWorker(Worker w) {
    ...
    // The worker thread loops, fetching tasks to execute
    while (task != null || (task = getTask()) != null) {
        ...
    }
}
private Runnable getTask() {
    ...
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();                  // the thread blocks here waiting for a task
    ...
}

As can be seen, when there is no task to execute, the thread pool in effect parks the thread in the blocking queue's take method, and this is what keeps the core threads alive.

11 The significance of the thread pool's Worker inheriting AQS

// Worker class: one worker, one thread
Worker(Runnable firstTask) {
    setState(-1);                          // forbid interrupts until runWorker runs
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
final void runWorker(Worker w) {
    ...
    w.unlock();                            // the constructor called setState(-1); this allows interrupts
    boolean completedAbruptly = true;
    ...
    w.lock();                              // lock for synchronization
    ...
    try {
        ...
        task.run();
        afterExecute(task, null);
    } finally {
        ...
        w.unlock();                        // release the lock
    }
}

The significance of Worker inheriting AQS: (a) it forbids the thread from being interrupted before it starts running; (b) it synchronizes the processing logic of the runWorker method.

12 Rejection policies

AbortPolicy "discard the task and throw a RejectedExecutionException exception"

DiscardOldestPolicy "discard the first task in the queue and resubmit the rejected task"

DiscardPolicy "discards the task but does not throw an exception"

CallerRunsPolicy

❝A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.❞

If a task is rejected, the "thread that submitted the task" executes it itself, as the sketch below shows.
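A minimal sketch (the pool sizes, task count, and class name are illustrative, not from the article) of selecting a rejection policy when constructing the pool.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectPolicyDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());   // rejected tasks run on the submitting thread

        for (int i = 0; i < 5; i++) {
            final int id = i;
            // Once the single worker is busy and the queue is full, execute() runs the task
            // directly on the caller (here the main thread) instead of throwing.
            pool.execute(() ->
                    System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}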

13 A quick look at ForkJoinPool

Unlike ThreadPoolExecutor, ForkJoinPool is suited to tasks that can be decomposed into subtasks, such as tree traversal, merge sort, and other recursive scenarios.

Each ForkJoinPool thread has its own double-ended queue (deque). When a task running in a thread is split with fork, the resulting subtasks are put into that thread's own deque, which reduces contention between threads.

Work-stealing algorithm

When a thread has finished the tasks in its own deque and another thread's deque still has tasks, it steals a task from the tail of that other thread's deque.

Using RecursiveTask to implement a fork/join demo

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        for (int i = 0; i < 10; i++) {
            ForkJoinTask<Integer> task = forkJoinPool.submit(new Fibonacci(i));
            System.out.println(task.get());
        }
    }

    static class Fibonacci extends RecursiveTask<Integer> {
        int n;

        public Fibonacci(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1)
                return n;
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();                          // push the subtask onto this thread's deque
            Fibonacci f2 = new Fibonacci(n - 2);
            return f2.compute() + f1.join();    // compute one half directly, join the forked half
        }
    }
}
