Shulou (Shulou.com), SLTechnology News&Howtos > Development. Updated 2025-04-10.
This article explains the design idea behind fork/join. The approach it introduces is simple, fast and practical, so interested readers may want to follow along as the editor walks through how fork/join is designed.
1. The fork/join design idea
The first step in understanding a framework is understanding the ideas behind its design!
The general execution flow of fork/join is shown in the figure above. First, a large task is split into many independent small tasks; these small tasks are then processed in parallel by multiple threads; finally, their results are combined to produce the final result.
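For comparison, the ordinary split, process-in-parallel, combine routine can be written without fork/join at all, using a fixed thread pool from java.util.concurrent. This is a minimal sketch under stated assumptions: the class SplitMergeSketch, the method parallelSum and its equal-size chunking are hypothetical choices for illustration, not code from this article or from the fork/join framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: split / process in parallel / combine on a fixed thread pool,
// with no fork/join and no work-stealing.
public class SplitMergeSketch {
    // Sums start..end (inclusive) using `parts` threads.
    public static long parallelSum(int start, int end, int parts) {
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            int chunk = (end - start + 1 + parts - 1) / parts; // ceiling division
            // 1. split: cut the range into independent chunks, one task per chunk
            for (int lo = start; lo <= end; lo += chunk) {
                final int from = lo;
                final int to = Math.min(end, lo + chunk - 1);
                // 2. process: the pool runs the chunks in parallel
                futures.add(pool.submit(() -> {
                    long sum = 0;
                    for (int i = from; i <= to; i++) sum += i;
                    return sum;
                }));
            }
            // 3. combine: wait for every partial sum and add them up
            long total = 0;
            for (Future<Long> f : futures) total += f.get();
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(1, 100, 4)); // 5050
    }
}
```

Here the chunks are fixed up front, so if one chunk happens to be much slower than the others, the threads that finish early simply sit idle; that idle time is exactly what work-stealing aims to recover.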
This framework clearly takes advantage of the multiple cores in modern computers to process data in parallel. On its own, that is nothing special: many people know this routine and use it regularly at work. What really sets fork/join apart is that it also uses an algorithm called work-stealing. The small tasks produced by splitting are placed in multiple double-ended queues (deques), and threads can take tasks from both the head and the tail of a queue.
When a thread has finished the tasks in the queue it is responsible for, it can steal not-yet-processed tasks from the other end of another thread's queue, which makes full use of threads that would otherwise sit idle!
The work-stealing process is illustrated in the accompanying schematic.
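To make the deque idea concrete, here is a toy work-stealing deque. It is an assumption-laden sketch, not the JDK's ForkJoinPool.WorkQueue: the class ToyWorkDeque and its methods are hypothetical, and it uses synchronized methods where the real pool relies on lock-free CAS operations. What it does show is the division of labor: the owning thread pushes and pops at `top` (newest task first), a thief steals at `base` (oldest task first), and indices wrap around a power-of-two array with `& (length - 1)`.

```java
// Toy work-stealing deque (hypothetical, NOT the JDK WorkQueue).
// Owner: push/pop at `top` (LIFO). Thief: steal at `base` (FIFO).
// Every method is synchronized for simplicity; the real pool is lock-free.
public class ToyWorkDeque<T> {
    private Object[] array = new Object[8]; // capacity is always a power of two
    private int base = 0; // next slot a thief steals from
    private int top = 0;  // next free slot for the owner

    // Owner: push a newly forked task at the top.
    public synchronized void push(T task) {
        if (top - base == array.length) grow();
        array[top & (array.length - 1)] = task; // the mask wraps the index around
        top++;
    }

    // Owner: pop the most recently pushed task, or null if the deque is empty.
    @SuppressWarnings("unchecked")
    public synchronized T pop() {
        if (top == base) return null;
        top--;
        int i = top & (array.length - 1);
        T t = (T) array[i];
        array[i] = null;
        return t;
    }

    // Thief: steal the oldest task from the base, or null if the deque is empty.
    @SuppressWarnings("unchecked")
    public synchronized T steal() {
        if (top == base) return null;
        int i = base & (array.length - 1);
        T t = (T) array[i];
        array[i] = null;
        base++;
        return t;
    }

    public synchronized int size() { return top - base; }

    // Double the capacity, keeping each live element at its masked position.
    private void grow() {
        Object[] bigger = new Object[array.length * 2];
        for (int i = base; i < top; i++) {
            bigger[i & (bigger.length - 1)] = array[i & (array.length - 1)];
        }
        array = bigger;
    }

    public static void main(String[] args) {
        ToyWorkDeque<String> q = new ToyWorkDeque<>();
        q.push("t1"); q.push("t2"); q.push("t3");
        System.out.println(q.pop());   // t3: the owner takes the newest task
        System.out.println(q.steal()); // t1: a thief takes the oldest task
        System.out.println(q.size());  // 1
    }
}
```

In a lock-free implementation, the fact that the owner and the thieves work at opposite ends means they only contend for the same slot when the deque is nearly empty.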
2. What roles does fork/join define?
Understanding the design principle is only the first step! To understand how the whole framework is implemented, you also need to know which roles it defines and what each role is responsible for. Once you know who is responsible for what, the whole logic can be strung together! In Java, roles are defined as classes, and the most direct way to understand a class's behavior is to look at the public methods it defines.
Here are the main fork/join-related classes in the JDK:
ForkJoinPool: acts as the manager of the fork/join framework; the top-level tasks must be handed to it for processing. It controls how many workerThreads exist in the framework, and the creation and activation of workerThreads are under its control. It is also responsible for creating and allocating workQueues: whenever a workerThread is created, the pool assigns it a corresponding workQueue. It then hands the work it receives over to the workerThreads, so it can be called the container of the whole fork/join framework.
ForkJoinWorkerThread: the "worker" that actually does the work in fork/join; it is essentially a thread. Its ForkJoinPool.WorkQueue stores the work it has to do. Before accepting work, it must register with the ForkJoinPool (registerWorker) to obtain its workQueue; it then takes tasks out of that workQueue and processes them. It lives attached to its ForkJoinPool: if the ForkJoinPool is shut down, the worker thread ends as well.
ForkJoinPool.WorkQueue: the double-ended queue (deque) that stores incoming tasks.
ForkJoinTask: the task type in fork/join. We usually use one of its two subclasses, RecursiveTask or RecursiveAction; the difference is that RecursiveTask returns a value and RecursiveAction does not. The task-processing logic, including how the task is split, lives in the compute() method.
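The roles above map directly onto public JDK types. The sketch below, assuming Java 8 or later, creates a ForkJoinPool with a parallelism of 4 (the manager), hands it a root RecursiveAction (a task with no return value), and records the names of the ForkJoinWorkerThreads that run the leaf chunks. The names RolesDemo and DoubleAll and the threshold of 1000 are hypothetical choices for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveAction;

public class RolesDemo {
    // ForkJoinTask role: a RecursiveAction (no return value) that doubles data[from..to) in place.
    public static class DoubleAll extends RecursiveAction {
        static final int THRESHOLD = 1000; // hypothetical split threshold
        final long[] data;
        final int from, to;
        final Set<String> workers;

        public DoubleAll(long[] data, int from, int to, Set<String> workers) {
            this.data = data; this.from = from; this.to = to; this.workers = workers;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                // Record which ForkJoinWorkerThread (the "worker" role) ran this leaf.
                Thread t = Thread.currentThread();
                if (t instanceof ForkJoinWorkerThread) workers.add(t.getName());
                for (int i = from; i < to; i++) data[i] *= 2;
            } else {
                int mid = (from + to) >>> 1;
                // invokeAll forks the subtasks and returns once both are done.
                invokeAll(new DoubleAll(data, from, mid, workers),
                          new DoubleAll(data, mid, to, workers));
            }
        }
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        Set<String> workers = ConcurrentHashMap.newKeySet();

        ForkJoinPool pool = new ForkJoinPool(4); // ForkJoinPool role: the manager, 4 workers
        try {
            pool.invoke(new DoubleAll(data, 0, data.length, workers)); // hand over the root task
            System.out.println("parallelism = " + pool.getParallelism()); // 4
            System.out.println("data[99999] = " + data[99_999]);        // 199998
            System.out.println("workers used: " + workers);             // varies from run to run
        } finally {
            pool.shutdown();
        }
    }
}
```

Which worker names appear, and how many, depends on scheduling and on how tasks get stolen, so that line of output changes between runs.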
3. What does fork/join do during initialization?
Whether in a system or a framework, initialization is often an important piece of logic! It is where everything starts, and the logic that follows depends on it. So once you understand initialization clearly, much of the later logic becomes easier to follow.
Here is the code (note: this code was found online and slightly modified):
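import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2; // threshold
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            // small enough: add up start..end directly
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // too large: split into two subtasks, run them, and merge the results
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            leftTask.fork();
            rightTask.fork();
            Integer leftResult = leftTask.join();
            Integer rightResult = rightTask.join();
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(1, 4);
        // submit(ForkJoinTask<T> task) hands the root task to the pool
        Future<Integer> result = forkJoinPool.submit(task);
        System.out.println(result.get()); // 10
    }
}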
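A common variation on CountTask forks only one half, computes the other half directly in the current thread, and then joins the forked half. This is the same shape as the Fibonacci example in the RecursiveTask Javadoc (fork one subtask, call compute() on the other, then join()). CountTask2 is a hypothetical name, and the sketch assumes the base case sums the range directly once end - start <= THRESHOLD, as in CountTask.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of CountTask: fork one half, compute the other half in place.
public class CountTask2 extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2;
    private final int start, end;

    public CountTask2(int start, int end) { this.start = start; this.end = end; }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int sum = 0;
            for (int i = start; i <= end; i++) sum += i;
            return sum;
        }
        int middle = (start + end) / 2;
        CountTask2 rightTask = new CountTask2(middle + 1, end);
        rightTask.fork();                                         // queue the right half
        int leftResult = new CountTask2(start, middle).compute(); // do the left half in this thread
        int rightResult = rightTask.join();                       // then wait for the right half
        return leftResult + rightResult;
    }

    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool().invoke(new CountTask2(1, 100))); // 5050
    }
}
```

Compared with forking both halves, this saves one fork per split and keeps the current thread doing useful work instead of only waiting in join().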
The detailed code of externalSubmit is shown below; pay attention to the comments.
private void externalSubmit(ForkJoinTask<?> task) {
    int r;                                   // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    for (;;) {                               // retry in a loop until the task is enqueued
        WorkQueue[] ws; WorkQueue q; int rs, m, k;
        boolean move = false;
        if ((rs = runState) < 0) {
            tryTerminate(false, false);      // help terminate
            throw new RejectedExecutionException();
        }
        else if ((rs & STARTED) == 0 ||      // initialize
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();
            try {
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                           new AtomicLong());
                    // create workQueues array with size a power of two
                    int p = config & SMASK;  // ensure at least 2 slots
                    // config holds the parallelism, which defaults to the number of CPU cores
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) // ...

// ...
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) // ...

... -> exec -> compute, and finally the status and result are set. Because status is declared volatile, we can infer that after a workerThread obtains a task, it first checks whether the task's status is already completed (normally or exceptionally) and only then decides whether to process it.
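The bit operations in externalSubmit round a number up to a power of two, which is what allows index arithmetic like `(am & s)`, with am = length - 1, to stand in for a modulo. The sketch below reproduces only the steps visible in the listing above: start from p - 1 (or 1), smear the highest set bit into every lower bit with the shift-OR cascade, and add one. What the JDK then does with n + 1 is cut off in the listing, so the sketch stops there; QueueSizeSketch and smearPlusOne are hypothetical names.

```java
// Hypothetical helper reproducing the visible sizing steps from externalSubmit.
public class QueueSizeSketch {
    // Returns the smallest power of two strictly greater than max(p - 1, 1).
    public static int smearPlusOne(int p) {
        int n = (p > 1) ? p - 1 : 1; // "ensure at least 2 slots"
        n |= n >>> 1;  // copy the highest set bit into the next lower bit
        n |= n >>> 2;  // ...then into the next two bits
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16; // now every bit below the highest one is set: n = 2^k - 1
        return n + 1;  // 2^k
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 2, 4, 5, 8, 12}) {
            System.out.println("p=" + p + " -> " + smearPlusOne(p));
        }
        // p=1 -> 2, p=2 -> 2, p=4 -> 4, p=5 -> 8, p=8 -> 8, p=12 -> 16
    }
}
```

For example, with p = 12 the starting value is n = 11 (binary 1011); the cascade turns it into 1111 = 15, and adding one gives 16.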
Now let's look at the code that actually processes a task when it is joined!
Integer rightResult = rightTask.join(); // called from CountTask.compute()

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

// Check whether status is already complete before processing; if it is, return directly,
// because this task may already have been stolen and processed by another thread.
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

The call chain runs from top to bottom. The overall logic is as follows.

If the current thread is a workerThread: it first checks whether the task has already completed and, if so, returns immediately. Otherwise it tries to pop the task off its queue with tryUnpush(this) and then runs it with doExec(). If either the pop or the execution does not succeed, it calls wt.pool.awaitJoin(w, this, 0L). Simply put, awaitJoin also polls the task's status inside a for(;;) loop and returns as soon as the task is complete; if it is not complete, it keeps trying to dequeue and process tasks, until the task completes or the wait times out.

If the current thread is not a workerThread: it goes straight into externalAwaitDone().

private int externalAwaitDone() {
    int s = ((this instanceof CountedCompleter) ? // try helping
             ForkJoinPool.common.externalHelpComplete(
                 (CountedCompleter<?>)this, 0) :
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
    if (s >= 0 && (s = status) >= 0) {
        boolean interrupted = false;
        do {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0) {
                        try {
                            wait(0L);
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    else
                        notifyAll();
                }
            }
        } while ((s = status) >= 0);
        if (interrupted)
            Thread.currentThread().interrupt();
    }
    return s;
}
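The logic of externalAwaitDone is also fairly simple. The current thread first tries to pop the task itself (ForkJoinPool.common.tryExternalUnpush(this) ? doExec()) and process it. If that does not succeed, the task is left for a workerThread to process, and the current thread uses the classic Object.wait()/notifyAll() approach to watch for changes to the task's status.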
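The status handling described above (a volatile status checked before a task is run, and a joiner that sets a SIGNAL bit with CAS before calling wait()) can be imitated in a small standalone class. This is a simplified toy under several assumptions, not ForkJoinTask: the class ToyTask, its bit layout (DONE in the sign bit, SIGNAL and CLAIMED in low bits) and the CLAIMED flag used to run the body at most once are all hypothetical, and exceptions thrown by the task body are not handled.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Toy imitation of a status-driven task (NOT java.util.concurrent.ForkJoinTask).
// status >= 0: not done yet; status < 0: done (hypothetical bit layout below).
public class ToyTask {
    static final int DONE = 0x80000000; // sign bit: completed
    static final int SIGNAL = 1;        // a joiner is (or may be) waiting
    static final int CLAIMED = 2;       // some thread has started running the body

    private static final AtomicIntegerFieldUpdater<ToyTask> STATUS =
        AtomicIntegerFieldUpdater.newUpdater(ToyTask.class, "status");

    private volatile int status;        // volatile so every thread sees the latest state
    private final Runnable body;

    public ToyTask(Runnable body) { this.body = body; }

    // Like doExec: check the status first and run the body only if nobody else has.
    public boolean tryExec() {
        int s;
        while (((s = status) & (DONE | CLAIMED)) == 0) {
            if (STATUS.compareAndSet(this, s, s | CLAIMED)) {
                body.run();
                int old = STATUS.getAndUpdate(this, x -> x | DONE);
                if ((old & SIGNAL) != 0) {
                    synchronized (this) { notifyAll(); } // wake any waiting joiners
                }
                return true;
            }
        }
        return false;
    }

    // Like externalAwaitDone: try to run the task here, otherwise set SIGNAL and wait().
    public void join() {
        if (tryExec()) return;
        boolean interrupted = false;
        int s;
        while ((s = status) >= 0) {
            if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0) { // re-check under the lock to avoid a lost wake-up
                        try {
                            wait();
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                }
            }
        }
        if (interrupted) Thread.currentThread().interrupt(); // restore the interrupt flag
    }

    public static void main(String[] args) {
        int[] box = new int[1];
        ToyTask task = new ToyTask(() -> box[0] = 42);
        new Thread(task::tryExec).start(); // a "worker" races the caller for the task
        task.join();                        // runs it here or waits for the worker
        System.out.println(box[0]);         // 42
    }
}
```

The re-check of status inside the synchronized block is what prevents a lost wake-up: if the task completes between the CAS and the wait(), the joiner sees the negative status and never goes to sleep, and if it is already waiting, the completer's notifyAll() (issued under the same lock) wakes it.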
6. Task stealing
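We keep saying that fork/join tasks are scheduled by work-stealing, so how exactly is a task stolen? Tasks are stolen by a workerThread, and a workerThread is a thread. All of a thread's logic is executed from its run() method, so the stealing logic must be reachable from run()!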
public void run() {                          // the thread's run() method
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);       // the task queue is processed here!
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

/**
 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
 */
final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue (initializes the queue)
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask<?> t;;) {      // another infinite loop that processes tasks!
        if ((t = scan(w, r)) != null) // tasks are obtained here!
            w.runTask(t);
        else if (!awaitWork(w, r))
            break;
        // ... (r is re-randomized with xorshift)

// ...
            ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c;
            if ((q = ws[k]) != null) {       // q = a randomly chosen queue
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) { // non-empty
                    long i = (((a.length - 1) & b) // ...
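The steal path in scan() can be imitated with ordinary ArrayDeques. This toy, which is not the JDK code, derives a starting index from a random value r, probes every queue once in circular order using `& m` over a power-of-two array, and takes the oldest task (the base end) from the first non-empty queue it finds. The class ToyScan, the circular probing order and the xorshift shift constants (13, 17, 5, a standard 32-bit xorshift) are assumptions for illustration; the real scan also uses CAS, scan state and other bookkeeping omitted here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy version of the idea behind scan() (hypothetical, NOT the JDK implementation).
public class ToyScan {
    // One 32-bit xorshift step; r must start non-zero or it stays 0 forever.
    public static int nextRandom(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    // queues.length must be a power of two so that "& m" wraps the index.
    public static String scan(Deque<String>[] queues, int r) {
        int m = queues.length - 1;
        int origin = r & m; // random starting queue
        for (int k = origin, probed = 0; probed <= m; k = (k + 1) & m, probed++) {
            Deque<String> q = queues[k];
            if (q != null && !q.isEmpty()) {
                return q.pollFirst(); // base = head of the deque: the oldest task
            }
        }
        return null; // nothing to steal anywhere
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Deque<String>[] queues = new Deque[4];
        for (int i = 0; i < queues.length; i++) queues[i] = new ArrayDeque<>();
        queues[2].addLast("a1"); // owners push at the tail (their "top")
        queues[2].addLast("a2");
        queues[3].addLast("b1");

        int r = 1; // avoid 0 for xorshift
        String t;
        while ((t = scan(queues, r)) != null) {
            System.out.println("stole " + t);
            r = nextRandom(r); // pick a new random origin for the next scan
        }
    }
}
```

Starting each scan at a different random index spreads thieves across victim queues instead of having every idle worker hit the same queue first.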