跳到主要内容
  1. 所有文章/
  2. Java并发编程笔记/

ForkJoin(并行任务)

·📄 776 字·🍵 2 分钟

ForkJoin #

ForkJoin 在 JDK 1.7 中引入,主要用于并行执行任务,提高效率。

工作特点:工作窃取(使用双端队列实现的)

image-20220210114455007.png

使用 #

查看JDK1.8的API文档发现:ForkJoinPool需要创建 ForkJoinTask来执行任务。

image-20220210120351456.png

进入查看,发现:RecursiveAction 是递归无返回值的类,RecursiveTask 是递归有返回值的类

image-20220210120535583.png

进入 RecursiveTask 类发现它提供了一个案例,以及计算的主要方法

image-20220210120830186.png

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * Minimal fork/join demo: submits a {@code Fibonacci} task to a pool and
 * prints the value it produces.
 */
public class Main {
    /**
     * Runs {@code Fibonacci(10)} on a freshly created {@link ForkJoinPool}.
     *
     * @param args ignored
     * @throws ExecutionException   if the task's computation threw an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        // Hand the task to the pool; the returned handle is used to wait for the result.
        ForkJoinTask<Integer> pending = pool.submit(new Fibonacci(10));
        // Block until the computation has finished, then print its value.
        Integer result = pending.get();
        System.out.println(result);
    }
}
/**
 * Recursive fork/join task that computes the n-th Fibonacci number.
 *
 * <p>Each task with {@code n > 1} forks a subtask for {@code n - 1}, computes
 * {@code n - 2} on the current thread, and then adds the two results. Values
 * of {@code n <= 1} are returned unchanged.
 */
class Fibonacci extends RecursiveTask<Integer> {
    private final int n;

    /**
     * @param n index of the Fibonacci number to compute
     */
    Fibonacci(int n) {
        this.n = n;
    }

    /**
     * Computes the result for this task, splitting recursively.
     *
     * @return {@code n} itself when {@code n <= 1}, otherwise the sum of the
     *         results for {@code n - 1} and {@code n - 2}
     */
    @Override
    public Integer compute() {
        if (n > 1) {
            Fibonacci previous = new Fibonacci(n - 1);
            // Schedule the (n - 1) branch asynchronously.
            previous.fork();
            Fibonacci beforePrevious = new Fibonacci(n - 2);
            // Work on the (n - 2) branch in the current thread first ...
            int inlineResult = beforePrevious.compute();
            // ... then wait for the forked branch.
            int forkedResult = previous.join();
            return inlineResult + forkedResult;
        }
        return n;
    }
}
import java.util.concurrent.RecursiveTask;
/**
 * Fork/join task that sums every {@code long} in the half-open range
 * {@code [star, end)}.
 *
 * <p>Ranges shorter than {@link #THRESHOLD} are summed with a plain loop;
 * longer ranges are split in half and the halves are computed in parallel.
 * The sum uses ordinary {@code long} arithmetic and therefore wraps on
 * overflow. An empty or inverted range ({@code end <= star}) yields {@code 0}.
 */
public class ForkJoinDemo extends RecursiveTask<Long> {
    /** Ranges containing fewer elements than this are summed sequentially. */
    private static final long THRESHOLD = 1_000_000L;

    private final long star;
    private final long end;

    /**
     * @param star inclusive lower bound of the range
     * @param end  exclusive upper bound of the range
     */
    public ForkJoinDemo(long star, long end) {
        this.star = star;
        this.end = end;
    }

    /**
     * Sums the range, splitting it recursively while it is at least
     * {@link #THRESHOLD} elements long.
     *
     * @return the sum of all values in {@code [star, end)}
     */
    @Override
    protected Long compute() {
        if ((end - star) < THRESHOLD) {
            // Small enough: sum directly on the current thread.
            long sum = 0L;
            for (long i = star; i < end; i++) {
                sum += i;
            }
            return sum;
        }

        // Divide and conquer. The midpoint is derived from the range length
        // rather than (star + end) / 2, because star + end can overflow for
        // bounds close to Long.MAX_VALUE and yield a bogus negative midpoint.
        long middle = star + (end - star) / 2;
        ForkJoinDemo left = new ForkJoinDemo(star, middle);
        ForkJoinDemo right = new ForkJoinDemo(middle, end);

        // Push only the left half onto the work queue (where it may be stolen)
        // and compute the right half on this thread, so the current worker
        // keeps doing useful work instead of just waiting on two joins.
        left.fork();
        long rightSum = right.compute();
        return left.join() + rightSum;
    }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

/**
 * Compares three ways of summing the integers in {@code [0, SUM)} and prints
 * the result and elapsed wall-clock time (in milliseconds) of each:
 * a plain loop, a {@code ForkJoinDemo} task, and a parallel {@link LongStream}.
 *
 * <p>All three variants sum the same range and stop their timer before
 * printing, so the reported times cover only the computation itself.
 */
public class ForkJoinTest {
    /** Exclusive upper bound of the summed range (2,000,000,000). */
    private static final long SUM = 2_000_000_000L;

    /**
     * Runs the three measurements one after another.
     *
     * @param args ignored
     * @throws ExecutionException   if the fork/join computation threw an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();
        test2();
        test3();
    }

    /**
     * Sums the range with an ordinary sequential loop.
     */
    public static void test1() {
        long star = System.currentTimeMillis();
        long sum = 0L;
        // Starts at 0 so the range matches the other two variants exactly.
        for (long i = 0; i < SUM; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println("时间:" + (end - star));
        System.out.println("----------------------");
    }

    /**
     * Sums the range with a {@code ForkJoinDemo} task on a dedicated pool.
     *
     * @throws ExecutionException   if the task's computation threw an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void test2() throws ExecutionException, InterruptedException {
        long star = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Long along;
        try {
            ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM);
            ForkJoinTask<Long> submit = forkJoinPool.submit(task);
            along = submit.get();
        } finally {
            // Release the pool's worker threads once the result is available
            // (or the computation has failed).
            forkJoinPool.shutdown();
        }
        long end = System.currentTimeMillis();

        System.out.println(along);
        System.out.println("时间:" + (end - star));
        System.out.println("-----------");
    }

    /**
     * Sums the range with a parallel {@link LongStream}.
     */
    public static void test3() {
        long star = System.currentTimeMillis();

        // Uses the shared SUM bound instead of repeating the literal.
        long sum = LongStream.range(0L, SUM).parallel().sum();
        long end = System.currentTimeMillis();

        System.out.println(sum);
        System.out.println("时间:" + (end - star));
        System.out.println("-----------");
    }
}

运行结果:

image-20220210115115239.png