2025-01-19 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article walks through the thread pool source code by example, focusing on how execute and submit work internally, in the hope of giving readers who are studying this topic a simpler and easier way through it.
I. Execute
ctl is an AtomicInteger that packs two pieces of information into a single int: the top 3 bits hold the run state of the thread pool, and the low 29 bits hold the number of Worker threads in the pool (so the maximum number of threads is 2^29 - 1 = 536,870,911, about 500 million). The runStateOf() and workerCountOf() methods extract the pool state and the thread count from ctl, respectively.
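The bit layout described above can be sketched on its own. This is a minimal illustration, not the JDK class itself: CtlSketch is a hypothetical name, and the constants and helper methods are copied in spirit from the private members that the source below refers to (COUNT_BITS, CAPACITY, runStateOf, workerCountOf, ctlOf).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch of how ctl packs state and count into one int.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // mask for the low 29 bits

    // Run states live in the high 3 bits, so they order as plain ints.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both parts

    public static void main(String[] args) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        ctl.incrementAndGet();                 // simulate adding one worker
        ctl.incrementAndGet();                 // and a second one
        int c = ctl.get();
        System.out.println(workerCountOf(c));          // 2
        System.out.println(runStateOf(c) == RUNNING);  // true
        System.out.println(CAPACITY);                  // 536870911
    }
}
```

Because the state sits in the high bits, comparisons such as rs >= SHUTDOWN in the code that follows work as ordinary integer comparisons.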
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // If fewer than corePoolSize threads are running, try to create a new
    // thread (Worker) with command as its first task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The thread count has reached corePoolSize: put the task on the work queue
    // First check: the pool must be in the RUNNING state
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Second check, in case the pool was shut down after the first check passed.
        // If it was, remove the task from the queue and reject it
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If the thread count is 0 (all threads have died, e.g. corePoolSize = 0),
        // create a thread with no firstTask; it only polls the work queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The work queue is full: try to create a new thread to run the task,
    // and reject the task if that fails
    // Reasons for failure: 1. the pool is shut down; 2. the thread count has
    // reached maximumPoolSize
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // The outer loop checks the state of the thread pool
    for (;;) {
        int c = ctl.get();
        // Pool state
        int rs = runStateOf(c);
        // Pool states: RUNNING = -1, SHUTDOWN = 0, STOP = 1, TIDYING = 2,
        // TERMINATED = 3 (each value is stored in the high 3 bits of ctl)
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        // The inner loop increments the thread count by 1 using CAS
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Worker implements the Runnable interface
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers is a HashSet of Worker objects
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread, which executes run() in the Worker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
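The three branches of execute can be observed from outside with a deliberately tiny pool. The sketch below is an illustration under assumptions, not part of the JDK source: ExecuteFlowSketch and observe are hypothetical names, and the sizes (corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1) are chosen only so that each branch is hit exactly once. Every task blocks on a latch, so the workers stay busy while the counts are read.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: drive one task through each branch of execute().
class ExecuteFlowSketch {
    // Returns {pool size after 2 tasks, queue size after 2 tasks,
    //          pool size after 3 tasks, 1 if the 4th task was rejected}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();               // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);             // count < corePoolSize: new core worker
            pool.execute(blocker);             // core is full: task goes to the queue
            seen[0] = pool.getPoolSize();
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);             // queue is full: new non-core worker
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);         // queue full and at maximumPoolSize
            } catch (RejectedExecutionException e) {
                seen[3] = 1;                   // default AbortPolicy throws
            }
        } finally {
            release.countDown();               // let the blocked tasks finish
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] seen = observe();
        System.out.println(seen[0] + " " + seen[1] + " " + seen[2] + " " + seen[3]);
    }
}
```

The rejection here relies on the default handler, AbortPolicy, which is what the five-argument ThreadPoolExecutor constructor installs; a different RejectedExecutionHandler would change what the fourth call does.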
Worker class:
// Worker is a Runnable that holds the thread it runs on
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // Wrap the current Worker in a thread
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
    // lock(), unlock() and the other AQS-based lock methods are omitted here
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // If the worker has not yet run the task passed to its constructor, that
        // task is run directly here; otherwise the worker tries to fetch a new
        // task from the work queue.
        // Finishing one task does not end the worker thread's job: the while loop
        // keeps asking getTask() for new tasks. The implementation of getTask()
        // follows below.
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Fetch a task from the work queue
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
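runWorker brackets every task.run() call with beforeExecute and afterExecute, which are protected hooks meant to be overridden. The following sketch makes that ordering visible; HookSketch and trace are hypothetical names introduced for illustration, and the single-thread pool configuration is an assumption chosen to keep the trace short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: record the order in which runWorker calls the hooks.
class HookSketch {
    static List<String> trace() {
        List<String> events = Collections.synchronizedList(new ArrayList<String>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                events.add("before");          // called just before task.run()
            }

            @Override
            protected void afterExecute(Runnable r, Throwable thrown) {
                events.add("after");           // called from the finally block
            }
        };
        pool.execute(() -> events.add("run"));
        pool.shutdown();                       // already submitted tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace());           // [before, run, after]
    }
}
```

Since afterExecute sits in a finally block, it also runs when the task throws, with the exception passed as its second argument.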
II. Submit
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
As you can see, submit wraps the Runnable or Callable in a FutureTask, passes that FutureTask to execute to be run, and then returns it to the caller as a Future.
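From the caller's side, the three submit overloads and the manual equivalent look like this. The sketch is illustrative only: SubmitSketch and results are hypothetical names, and the fixed pool of two threads is an arbitrary choice.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical sketch: the three submit overloads next to a hand-built FutureTask.
class SubmitSketch {
    static Object[] results() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> answer = () -> 6 * 7;
            Runnable noop = () -> { };

            Future<Integer> fromCallable = pool.submit(answer);      // result of call()
            Future<String> withResult = pool.submit(noop, "done");   // fixed result
            Future<?> plain = pool.submit(noop);                     // get() yields null

            // What submit does internally: wrap in a FutureTask, then execute it.
            FutureTask<Integer> manual = new FutureTask<Integer>(answer);
            pool.execute(manual);

            return new Object[] {
                fromCallable.get(), withResult.get(), plain.get(), manual.get()
            };
        } catch (Exception e) {
            // get() can throw InterruptedException or ExecutionException
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] r = results();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```

One practical consequence of the wrapping is that an exception thrown by the task is captured by the FutureTask and only surfaces, wrapped in an ExecutionException, when get() is called.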
That concludes this example-based walkthrough of the thread pool source code. I hope the content above is of some help to you.