ForkJoin(并行任务)
目录
ForkJoin #
ForkJoin 在 JDK 1.7 中引入,主要用于把大任务拆分成小任务并行执行,从而提高效率。
工作特点:工作窃取(使用双端队列实现的)
使用 #
查看JDK1.8的API文档发现:ForkJoinPool
需要创建 ForkJoinTask
来执行任务。
进入查看,发现:RecursiveAction
是递归无返回值的类,RecursiveTask
是递归有返回值的类
进入 RecursiveTask
类发现它提供了一个案例,以及计算的主要方法
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
 * Demo entry point: runs a {@code Fibonacci(10)} task on a fresh
 * {@link ForkJoinPool} and prints the result.
 */
public class Main {
    /**
     * Submits the Fibonacci task to a new pool, waits for it and prints its value.
     *
     * @param args unused
     * @throws ExecutionException   if the task itself failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final ForkJoinPool pool = new ForkJoinPool();
        final ForkJoinTask<Integer> fibTask = new Fibonacci(10);

        // Hand the task to the pool; the returned handle is used to wait for the result.
        final ForkJoinTask<Integer> pending = pool.submit(fibTask);

        // Block until the computation has finished, then print the value.
        final Integer result = pending.get();
        System.out.println(result);
    }
}
/**
 * Recursive fork/join task that computes the n-th Fibonacci number.
 * Values of {@code n} that are 1 or smaller are returned unchanged.
 */
class Fibonacci extends RecursiveTask<Integer> {
    private final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    /**
     * Splits the problem into {@code n - 1} and {@code n - 2}: the first half is
     * forked as a separate task, the second half is computed by the current thread.
     *
     * @return the Fibonacci value for {@code n}, or {@code n} itself when {@code n <= 1}
     */
    @Override
    public Integer compute() {
        if (n > 1) {
            Fibonacci forkedHalf = new Fibonacci(n - 1);
            forkedHalf.fork();

            Fibonacci inlineHalf = new Fibonacci(n - 2);
            int inlineResult = inlineHalf.compute();

            // Wait for the forked half only after the inline half is done.
            return inlineResult + forkedHalf.join();
        }
        return n;
    }
}
import java.util.concurrent.RecursiveTask;
/**
 * Fork/join task that sums every {@code long} in the half-open range
 * {@code [star, end)}.
 *
 * <p>Ranges shorter than {@link #THRESHOLD} are summed with a plain loop; longer
 * ranges are split in half and the halves are summed in parallel. The sum uses
 * ordinary wrapping {@code long} arithmetic.
 */
public class ForkJoinDemo extends RecursiveTask<Long> {
    /** Ranges shorter than this are summed sequentially instead of being split again. */
    private static final long THRESHOLD = 1_000_000L;

    private final long star;
    private final long end;

    /**
     * @param star first value of the range (inclusive)
     * @param end  end of the range (exclusive)
     */
    public ForkJoinDemo(long star, long end) {
        this.star = star;
        this.end = end;
    }

    /**
     * Sums the range, splitting it recursively while it is at least
     * {@link #THRESHOLD} long.
     *
     * @return the sum of all values in {@code [star, end)}; 0 when the range is empty
     */
    @Override
    protected Long compute() {
        if ((end - star) < THRESHOLD) {
            long sum = 0L;
            for (long i = star; i < end; i++) {
                sum += i;
            }
            return sum;
        }

        // star + (end - star) / 2 cannot overflow the way (star + end) / 2 does
        // when both bounds are large.
        long middle = star + (end - star) / 2;
        ForkJoinDemo left = new ForkJoinDemo(star, middle);
        ForkJoinDemo right = new ForkJoinDemo(middle, end);

        // Fork one half and compute the other on the current thread, so this
        // worker keeps doing useful work instead of only waiting on two joins.
        left.fork();
        long rightSum = right.compute();
        return left.join() + rightSum;
    }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
 * Small benchmark that sums the same range of longs three ways — a plain loop,
 * a {@link ForkJoinPool} running {@code ForkJoinDemo}, and a parallel
 * {@link LongStream} — and prints each result with its elapsed milliseconds.
 */
public class ForkJoinTest {
    /** Exclusive upper bound of the summed range (two billion), shared by all three tests. */
    private static final long SUM = 20_0000_0000L;

    /**
     * Runs the three variants one after another.
     *
     * @param args unused
     * @throws ExecutionException   if the fork/join task fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();
        test2();
        test3();
    }

    /**
     * Sums 1 .. SUM-1 with a plain sequential loop.
     */
    public static void test1() {
        long star = System.currentTimeMillis();
        long sum = 0L;
        for (long i = 1; i < SUM; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println("时间:" + (end - star));
        System.out.println("----------------------");
    }

    /**
     * Sums 0 .. SUM-1 with a {@code ForkJoinDemo} task on a dedicated pool.
     *
     * @throws ExecutionException   if the task fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void test2() throws ExecutionException, InterruptedException {
        long star = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        long sum;
        try {
            ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM);
            ForkJoinTask<Long> submit = forkJoinPool.submit(task);
            sum = submit.get();
        } finally {
            // Release the pool's worker threads once the benchmark is done.
            forkJoinPool.shutdown();
        }
        // Stop the clock before printing so the timing matches test1.
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println("时间:" + (end - star));
        System.out.println("-----------");
    }

    /**
     * Sums 0 .. SUM-1 with a parallel {@link LongStream}.
     */
    public static void test3() {
        long star = System.currentTimeMillis();
        // Use the shared bound instead of repeating the literal.
        long sum = LongStream.range(0L, SUM).parallel().sum();
        // Stop the clock before printing so the timing matches test1.
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println("时间:" + (end - star));
        System.out.println("-----------");
    }
}
运行结果: