本篇文章小编给大家分享一下java中fork-join的原理代码解析,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
ForkJoinTask就是ForkJoinPool里面的每一个任务。他主要有两个子类:RecursiveAction和RecursiveTask。然后通过fork()方法去分配任务执行任务,通过join()方法汇总任务结果,
这就是整个过程的运用。他有两个子类,使用这两个子类都可以实现我们的任务分配和计算。
(1)RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值)
(2)RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)
ForkJoinPool:中含有一个workQueues队列;
workQueues:由ForkJoinTask数组和workerThread和指向ForkJoinPool的引用;
ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而workerThread数组负责执行这些任务,ForkJoinPool的引用是为了当ForkJoinTask数组中的任务处理完之后再次获取任务交给workerThread进行处理。整个结构大致如下图:
知识点扩展:Java并发Fork-Join框架原理解析
1、什么是Foirk/Join框架
Fork/Join框架是Java7提供用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
2、什么是并行流与顺序流2.1 什么是并行流?
并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。
2.2 工作窃取模式
某个线程从其他队列里窃取任务来执行,
3、使用Fork/Join框架
/** * 累加运算测试 */ public class ForkJoinCalculate extends RecursiveTask{ /** * */ private static final long serialVersionUID = 7125244951292834932L; private long start;// 起始值 private long end;// 结束值 private static final long THRESHOLD = 10000L;// 临界值 @Override protected Long compute() { long length = end - start; if(length <= THRESHOLD) { long sum = 0L; for (long i = start; i <= end; i++) { sum += i; } return sum; }else { long middle = (start + end) / 2;// 中间值 ForkJoinCalculate left = new ForkJoinCalculate(start, middle);// 0-50000000 left.fork();// 拆分子任务,同时压入线程队列 ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);// 50000001-100000000 right.fork();// 拆分子任务,同时压入线程队列 return left.join() + right.join();// 汇总任务结果 } } public ForkJoinCalculate() { } public ForkJoinCalculate(long start, long end) { this.start = start; this.end = end; } }
4、Java8中的并行流和顺序流
4.1顺序流
/** * Java8的顺序流 */ @Test public void test3() { Instant start = Instant.now();// java8中新时间日期API LongStream.rangeClosed(0, 10000000000L) .sequential()// 顺序流 .reduce(0, Long::sum); Instant end = Instant.now(); // 5780 System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());// java8中新时间日期API }
4.2步行流
/** * Java8的并行流 */ @Test public void test4() { Instant start = Instant.now();// java8中新时间日期API LongStream.rangeClosed(0, 10000000000L) .parallel()// 并行流 .reduce(0, Long::sum); Instant end = Instant.now(); // 2392 System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());// java8中新时间日期API }