线程池的种类:

  • ThreadPoolExecutor
    • CachedThreadPool
    • FixedThreadPool
    • ScheduledThreadPool
    • SingleThreadPool
  • ForkJionPool
    • WorkStealingPool
    • ParallelStreamAPI

默认构造函数

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler
) 
{
    ....
}

批注 20200613 153419.jpg

文字描述corePoolSize,maximumPoolSize,workQueue之间关系。

  • 当线程池中线程数小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。

  • 当线程池中线程数达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 。

  • 当workQueue已满,且maximumPoolSize > corePoolSize时,新提交任务会创建新线程执行任务。

  • 当workQueue已满,且提交任务数超过maximumPoolSize,任务由RejectedExecutionHandler处理。

  • 当线程池中线程数超过corePoolSize,且超过这部分的空闲时间达到keepAliveTime时,回收这些线程。

  • 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将回收。

默认流程图
批注 20200613 153755.jpg

线程池的种类以及使用场景

ThreadPoolExecutor

  1. CachedThreadPool
    这类线程池的特点就是里面没有核心线程,全是非核心线程,其maximumPoolSize设置为Integer.MAX_VALUE,线程可以无限创建,当线程池中的线程都处于活动状态的时候,线程池会创建新的线程来处理新任务,否则会用空闲的线程来处理新任务,这类线程池的空闲线程都是有超时机制的,keepAliveTime在这里是有效的,时长为60秒,超过60秒的空闲线程就会被回收,当线程池都处于闲置状态时,线程池中的线程都会因为超时而被回收,所以几乎不会占用什么系统资源。

任务队列采用的是SynchronousQueue,这个队列是无法插入任务的,一有任务立即执行,所以CachedThreadPool比较适合任务量大但耗时少的任务。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

批注 20200613 153947.jpg
2. FixedThreadPool
这类线程池的特点就是里面全是核心线程,没有非核心线程,也没有超时机制,任务大小也是没有限制的,数量固定,即使是空闲状态,线程不会被回收,除非线程池被关闭,从构造方法也可以看出来,只有两个参数,一个是指定的核心线程数,一个是线程工厂,keepAliveTime无效。任务队列采用了无界的阻塞队列LinkedBlockingQueue,执行execute方法的时候,运行的线程没有达到corePoolSize就创建核心线程执行任务,否则就阻塞在任务队列中,有空闲线程的时候去取任务执行。

由于该线程池线程数固定,且不被回收,线程与线程池的生命周期同步,所以适用于任务量比较固定但耗时长的任务。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

批注 20200613 153851.jpg

  1. ScheduledThreadPool
    这类线程池核心线程数量是固定的,好像和FixThreadPool有点像,但是它的非核心线程是没有限制的,并且非核心线程一闲置就会被回收,keepAliveTime同样无效,因为核心线程是不会回收的,当运行的线程数没有达到corePoolSize的时候,就新建线程去DelayedWorkQueue中取ScheduledFutureTask然后才去执行任务,否则就把任务添加到DelayedWorkQueue,DelayedWorkQueue会将任务排序,按新建一个非核心线程顺序执行,执行完线程就回收,然后循环。

任务队列采用的DelayedWorkQueue是个无界的队列,延时执行队列任务。综合来说,这类线程池适用于执行定时任务和具体固定周期的重复任务。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
  1. SingleThreadPool
    这类线程池顾名思义就是一个只有一个核心线程的线程池,从构造方法来看,它可以单独执行,也可以与周期线程池结合用。其任务队列是LinkedBlockingQueue,这是个无界的阻塞队列,因为线程池里只有一个线程,就确保所有的任务都在同一个线程中顺序执行,这样就不需要处理线程同步的问题。

这类线程池适用于多个任务顺序执行的场景。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

可以看到除了多了个 FinalizableDelegatedExecutorService 代理,其初始化和 newFiexdThreadPool 的 nThreads = 1 的时候是一样的。

区别就在于:

  • newSingleThreadExecutor返回的ExcutorService在析构函数finalize()处会调用shutdown()

  • 如果我们没有对它调用shutdown(),那么可以确保它在被回收时调用shutdown()来终止线程。

使用ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,一般采用默认。

流程图略,请参考 newFiexdThreadPool,这里不再累赘。

ForkJoinPool

如果有一天有人面试问你:如何对1000万数字进行排序。
这时候你可以回答:“使用forkjoin”。
Fork/Join是在Java7中提供的一个并发执行任务的框架。他的基本运行流程就是:把一个大任务分解成子任务,如果子任务还不是足够小,就继续分解成子子任务,一直分解到足够小。具体要分解到有多小,你可以自己定义这个阈值。
然后把这些子任务分摊给多个线程去执行,每个线程对应一个双端队列负责保存这些原子任务。
这里叫“原子”任务,之所以叫原子任务,就是为了说明他们已经足够小。是经过多次的递归后的结果。
批注 20200613 171230.jpg
没错,这就是fork的过程。join的过程就是上面的图颠倒过来。

  • ForkJoinPool的简单使用
public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        //单线程求和
        for (int i = 0; i < nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
        System.out.println("---" + Arrays.stream(nums).sum()); //stream api
    }


    //没有返回值的任务
    static class AddTask extends RecursiveAction {
        int start, end;
        AddTask(int s, int e) {
            start = s;
            end = e;
        }
        @Override
        protected void compute() {
            if (end - start <= MAX_NUM) {
                long sum = 0L;
                for (int i = start; i < end; i++) sum += nums[i];
                System.out.println("from:" + start + " to:" + end + " = " + sum);
            } else {
                int middle = start + (end - start) / 2;
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }
        }
    }

    //有返回值的任务
    static class AddTaskRet extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;
        int start, end;
        AddTaskRet(int s, int e) {
            start = s;
            end = e;
        }
        @Override
        protected Long compute() {
            if (end - start <= MAX_NUM) {
                long sum = 0L;
                for (int i = start; i < end; i++) sum += nums[i];
                return sum;
            }
            //end - start大于MAX_NUM,划分为子任务
            int middle = start + (end - start) / 2;
            AddTaskRet subTask1 = new AddTaskRet(start, middle);
            AddTaskRet subTask2 = new AddTaskRet(middle, end);
            subTask1.fork();
            subTask2.fork();
            //子任务join集合起来
            return subTask1.join() + subTask2.join();
        }
    }

    public static void main(String[] args) throws IOException {
		/*ForkJoinPool fjp = new ForkJoinPool();
		AddTask task = new AddTask(0, nums.length);
		fjp.execute(task);*/

        ForkJoinPool fjp = new ForkJoinPool();
        AddTaskRet task = new AddTaskRet(0, nums.length);
        fjp.execute(task);
        long result = task.join();
        System.out.println(result);
    }
}
  • WorkStealingPool

工作窃取算法指的是某个线程从其他队列里窃取任务来执行。使用的场景是一个大任务拆分成多个小任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列中,并且每个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。但是会出现这样一种情况:A线程处理完了自己队列的任务,B线程的队列里还有很多任务要处理。A是一个很热情的线程,想过去帮忙,但是如果两个线程访问同一个队列,会产生竞争,所以A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行(任务是一个个独立的小任务),这样感觉A线程像是小偷在窃取B线程的东西一样。

批注 20200613 172347.jpg

参考https://cloud.tencent.com/developer/article/1165249

  • ParallelStreamAPI

parallelStream其实就是一个并行执行的流,它通过默认的ForkJoinPool,可能提高你的多线程任务的速度。

public class T13_ParallelStreamAPI {
    public static void main(String[] args) {
        List<Integer> nums = new ArrayList<>();
        Random r = new Random();
        for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000));

        long start = System.currentTimeMillis();
        nums.forEach(v -> isPrime(v));
        long end = System.currentTimeMillis();
        System.out.println(end - start);

        //使用parallel stream api
        start = System.currentTimeMillis();
        nums.parallelStream().forEach(T13_ParallelStreamAPI::isPrime);
        end = System.currentTimeMillis();

        System.out.println(end - start);
    }

    static boolean isPrime(int num) {
        for (int i = 2; i <= num / 2; i++) {
            if (num % i == 0) return false;
        }
        return true;
    }
}

hhhhh