Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is ForkJoin in Java and how to call it

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "what is ForkJoin in Java and how to call it". Interested friends may wish to have a look at it. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn "what is ForkJoin in Java and how to call it?"

What is ForkJoin?

ForkJoin literally Fork means bifurcation, and Join means combination. We can understand that large tasks are divided into small tasks for calculation, and finally the results of small tasks are combined to find the solution of large tasks. These fission small tasks can be calculated by different threads, which is an idea of distributed computing. This is similar to the distributed offline computing MapReduce in big data. One of the most classic applications of ForkJoin is Stream in Java8. We know that Stream is divided into serial flow and parallel flow, in which parallel flow parallelStream relies on ForkJoin to achieve parallel processing.

Let's take a look at the core ForkJoinTask and ForkJoinPool.

ForkJoinTask task

The dependency of ForkJoinTask itself is not complex. Like asynchronous task computing FutureTask, it implements the Future interface.

Let's take a look at the core source code of ForkJoinTask and see how this task is calculated by divide-and-conquer.

The core of ForkJoinTask is the fork () and join () methods.

Fork ()

Determine whether the current thread is a ForkJoinWorkerThread thread

Is to push the current thread directly into the work queue

No call to the externalPush method of ForkJoinPool

A static common object is built in ForkJoinPool, where common's externalPush () is called.

Join ()

Call the doJoin () method to wait for the thread to finish execution

Public final ForkJoinTask fork () {Thread t; if ((t = Thread.currentThread ()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread) t) .workQueue.push (this); else ForkJoinPool.common.externalPush (this); return this;} public final V join () {int s If ((s = doJoin () & DONE_MASK)! = NORMAL) reportException (s); return getRawResult ();} private int doJoin () {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status)

< 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } // 获取结果的方法由子类实现 public abstract V getRawResult(); RecursiveTask 是ForkJoinTask的一个子类主要对获取结果的方法进行了实现,通过泛型约束结果。我们如果需要自己创建任务,仍需要实现RecursiveTask,并去编写最为核心的计算方法compute()。 public abstract class RecursiveTask extends ForkJoinTask { private static final long serialVersionUID = 5232453952276485270L; V result; protected abstract V compute(); public final V getRawResult() { return result; } protected final void setRawResult(V value) { result = value; } protected final boolean exec() { result = compute(); return true; }}ForkJoinPool 线程池 ForkJoinTask 中许多功能都依赖于ForkJoinPool线程池,所以说ForkJoinTask运行离不开ForkJoinPool,ForkJoinPool与ThreadPoolExecutor有许多相似之处,他是专门用来执行ForkJoinTask任务的线程池,我之前也有文章对线程池技术进行了介绍,感兴趣的可以进行阅读——从java源码分析线程池(池化技术)的实现原理 ForkJoinPool与ThreadPoolExecutor的继承关系几乎是相同的,他们相当于兄弟关系。 工作窃取算法 ForkJoinPool中采取工作窃取算法,如果每次fork子任务如果都去创建新线程去处理的话,对系统资源的开销是巨大的,所以必须采取线程池。一般的线程池只有一个任务队列,但是对于ForkJoinPool来说,由于同一个任务Fork出的各个子任务是平行关系,为了提高效率,减少线程的竞争,需要将这些平行的任务放到不同的队列中,由于线程处理不同任务的速度不同,这样就可能存在某个线程先执行完了自己队列中的任务,这时为了提升效率,就可以让该线程去"窃取"其它任务队列中的任务,这就是所谓的"工作窃取算法"。 对于一般的队列来说,入队元素都是在队尾,出队元素在队首,要满足"工作窃取"的需求,任务队列应该支持从"队尾"出队元素,这样可以减少与其它工作线程的冲突(因为其它工作线程会从队首获取自己任务队列中的任务),这时就需要使用双端阻塞队列来解决。 构造方法 首先我们来看ForkJoinPool线程池的构造方法,他为我们提供了三种形式的构造,其中最为复杂的是四个入参的构造,下面我们看一下它四个入参都代表什么? int parallelism 可并行级别(不代表最多存在的线程数量) ForkJoinWorkerThreadFactory factory 线程创建工厂 UncaughtExceptionHandler handler 异常捕获处理器 boolean asyncMode 先进先出的工作模式 或者 后进先出的工作模式 public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }提交方法 下面我们看一下提交任务的方法: externalPush这个方法我们很眼熟,它正是在fork的时候如果当前线程不是ForkJoinWorkerThread,新提交任务也是会通过这个方法去执行任务。由此可见,fork就是新建一个子任务进行提交。 externalSubmit是最为核心的一个方法,它可以首次向池提交第一个任务,并执行二次初始化。它还可以检测外部线程的首次提交,并创建一个新的共享队列。 signalWork(ws, q)是发送工作信号,让工作队列进行运转。 public ForkJoinTask submit(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask job; if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else job = new ForkJoinTask.AdaptedRunnableAction(task); externalPush(job); return job; } final void externalPush(ForkJoinTask task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe(); int rs = runState; if ((ws = workQueues) != null && (m = (ws.length - 1)) >

= 0 & & (Q = ws [m & r & SQMASK])! = null & & r! = 0 & & rs > 0 & & U.compareAndSwapInt (Q, QLOCK, 0,1)) {ForkJoinTask [] a; int am, n, s If ((a = q.array)! = null & & (am = a.length-1) > (n = (s = q.top)-q.base) {int j = ((am & s))

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report