线程池的种类:
- ThreadPoolExecutor
- CachedThreadPool
- FixedThreadPool
- ScheduledThreadPool
- SingleThreadPool
- ForkJionPool
- WorkStealingPool
- ParallelStreamAPI
默认构造函数
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)
{
....
}
文字描述corePoolSize,maximumPoolSize,workQueue之间关系。
-
当线程池中线程数小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
-
当线程池中线程数达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 。
-
当workQueue已满,且maximumPoolSize > corePoolSize时,新提交任务会创建新线程执行任务。
-
当workQueue已满,且提交任务数超过maximumPoolSize,任务由RejectedExecutionHandler处理。
-
当线程池中线程数超过corePoolSize,且超过这部分的空闲时间达到keepAliveTime时,回收这些线程。
-
当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将回收。
默认流程图
线程池的种类以及使用场景
ThreadPoolExecutor
- CachedThreadPool
这类线程池的特点就是里面没有核心线程,全是非核心线程,其maximumPoolSize设置为Integer.MAX_VALUE,线程可以无限创建,当线程池中的线程都处于活动状态的时候,线程池会创建新的线程来处理新任务,否则会用空闲的线程来处理新任务,这类线程池的空闲线程都是有超时机制的,keepAliveTime在这里是有效的,时长为60秒,超过60秒的空闲线程就会被回收,当线程池都处于闲置状态时,线程池中的线程都会因为超时而被回收,所以几乎不会占用什么系统资源。
任务队列采用的是SynchronousQueue,这个队列是无法插入任务的,一有任务立即执行,所以CachedThreadPool比较适合任务量大但耗时少的任务。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2. FixedThreadPool
这类线程池的特点就是里面全是核心线程,没有非核心线程,也没有超时机制,任务大小也是没有限制的,数量固定,即使是空闲状态,线程不会被回收,除非线程池被关闭,从构造方法也可以看出来,只有两个参数,一个是指定的核心线程数,一个是线程工厂,keepAliveTime无效。任务队列采用了无界的阻塞队列LinkedBlockingQueue,执行execute方法的时候,运行的线程没有达到corePoolSize就创建核心线程执行任务,否则就阻塞在任务队列中,有空闲线程的时候去取任务执行。
由于该线程池线程数固定,且不被回收,线程与线程池的生命周期同步,所以适用于任务量比较固定但耗时长的任务。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- ScheduledThreadPool
这类线程池核心线程数量是固定的,好像和FixThreadPool有点像,但是它的非核心线程是没有限制的,并且非核心线程一闲置就会被回收,keepAliveTime同样无效,因为核心线程是不会回收的,当运行的线程数没有达到corePoolSize的时候,就新建线程去DelayedWorkQueue中取ScheduledFutureTask然后才去执行任务,否则就把任务添加到DelayedWorkQueue,DelayedWorkQueue会将任务排序,按新建一个非核心线程顺序执行,执行完线程就回收,然后循环。
任务队列采用的DelayedWorkQueue是个无界的队列,延时执行队列任务。综合来说,这类线程池适用于执行定时任务和具体固定周期的重复任务。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
- SingleThreadPool
这类线程池顾名思义就是一个只有一个核心线程的线程池,从构造方法来看,它可以单独执行,也可以与周期线程池结合用。其任务队列是LinkedBlockingQueue,这是个无界的阻塞队列,因为线程池里只有一个线程,就确保所有的任务都在同一个线程中顺序执行,这样就不需要处理线程同步的问题。
这类线程池适用于多个任务顺序执行的场景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
可以看到除了多了个 FinalizableDelegatedExecutorService 代理,其初始化和 newFiexdThreadPool 的 nThreads = 1 的时候是一样的。
区别就在于:
-
newSingleThreadExecutor返回的ExcutorService在析构函数finalize()处会调用shutdown()
-
如果我们没有对它调用shutdown(),那么可以确保它在被回收时调用shutdown()来终止线程。
使用ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,一般采用默认。
流程图略,请参考 newFiexdThreadPool,这里不再累赘。
ForkJoinPool
如果有一天有人面试问你:如何对1000万数字进行排序。
这时候你可以回答:“使用forkjoin”。
Fork/Join是在Java7中提供的一个并发执行任务的框架。他的基本运行流程就是:把一个大任务分解成子任务,如果子任务还不是足够小,就继续分解成子子任务,一直分解到足够小。具体要分解到有多小,你可以自己定义这个阈值。
然后把这些子任务分摊给多个线程去执行,每个线程对应一个双端队列负责保存这些原子任务。
这里叫“原子”任务,之所以叫原子任务,就是为了说明他们已经足够小。是经过多次的递归后的结果。
没错,这就是fork的过程。join的过程就是上面的图颠倒过来。
- ForkJoinPool的简单使用
public class T12_ForkJoinPool {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static {
//单线程求和
for (int i = 0; i < nums.length; i++) {
nums[i] = r.nextInt(100);
}
System.out.println("---" + Arrays.stream(nums).sum()); //stream api
}
//没有返回值的任务
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected void compute() {
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {
int middle = start + (end - start) / 2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
//有返回值的任务
static class AddTaskRet extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
int start, end;
AddTaskRet(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) sum += nums[i];
return sum;
}
//end - start大于MAX_NUM,划分为子任务
int middle = start + (end - start) / 2;
AddTaskRet subTask1 = new AddTaskRet(start, middle);
AddTaskRet subTask2 = new AddTaskRet(middle, end);
subTask1.fork();
subTask2.fork();
//子任务join集合起来
return subTask1.join() + subTask2.join();
}
}
public static void main(String[] args) throws IOException {
/*ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
fjp.execute(task);*/
ForkJoinPool fjp = new ForkJoinPool();
AddTaskRet task = new AddTaskRet(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println(result);
}
}
- WorkStealingPool
工作窃取算法指的是某个线程从其他队列里窃取任务来执行。使用的场景是一个大任务拆分成多个小任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列中,并且每个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。但是会出现这样一种情况:A线程处理完了自己队列的任务,B线程的队列里还有很多任务要处理。A是一个很热情的线程,想过去帮忙,但是如果两个线程访问同一个队列,会产生竞争,所以A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行(任务是一个个独立的小任务),这样感觉A线程像是小偷在窃取B线程的东西一样。
- ParallelStreamAPI
parallelStream其实就是一个并行执行的流,它通过默认的ForkJoinPool,可能提高你的多线程任务的速度。
public class T13_ParallelStreamAPI {
public static void main(String[] args) {
List<Integer> nums = new ArrayList<>();
Random r = new Random();
for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000));
long start = System.currentTimeMillis();
nums.forEach(v -> isPrime(v));
long end = System.currentTimeMillis();
System.out.println(end - start);
//使用parallel stream api
start = System.currentTimeMillis();
nums.parallelStream().forEach(T13_ParallelStreamAPI::isPrime);
end = System.currentTimeMillis();
System.out.println(end - start);
}
static boolean isPrime(int num) {
for (int i = 2; i <= num / 2; i++) {
if (num % i == 0) return false;
}
return true;
}
}
Comments | 0 条评论