线程池
1. 线程池概念
构造一个新的线程开销有些大,因为这涉及与操作系统的交互。如果你的程序中创建了大量的生命期很短的线程,那么不应该把每个任务映射到一个单独的线程,而应该使用线程池。
线程池在系统启动时即创建大量空闲的线程,程序将一个 Runnable
对象或 Callable
对象传给线程池,线程池就会启动一个空闲的线程来执行它们的 run
或 call
方法,当 run
或 call
方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个对象的 run
或 call
方法。
使用线程池可以有效地控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至 JVM 崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过此数。
2. Executor框架
线程池相关内容位于 java.util.concurrent
包下,其主要相关类/接口的体系结构为:
其中
ExecutorService
表达了线程池的概念。

Executor
框架是 Java 5 之后引进的,通过 Executor
来启动线程比使用 Thread
的 start
方法更好,除了更易管理、效率更好(用线程池实现,节约开销)外,还有助于避免 this 逃逸问题。
this 逃逸是指在构造器返回之前,其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发奇葩的错误。
Executor
框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor
框架让并发编程变得更加简单。
Executor
框架主要由三大部分组成:
- 任务(
Runnable
/Callable
) - 任务的执行(
Executor
) - 异步计算的结果(
Future
)
2.1 任务:Runnable
/Callable
执行任务需要实现 Runnable
接口或 Callable
接口。
Runnable
封装一个异步运行的任务,可以把它想象成一个没有参数和返回值的异步方法。Callable
与Runnable
类似,但是有返回值。
2.2 任务的执行:Executor
如前图所示,主要包括任务执行机制的核心接口 Executor
,以及继承自 Executor
接口的 ExecutorService
接口。
ThreadPoolExecutor
和ScheduledThreadPoolExecutor
这两个关键类实现了ExecutorService
接口。
在 Java 类库中,任务执行的主要抽象不是 Thread
,而是 Executor
,Executor
是个简单的接口,如下:
package java.util.concurrent;
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*/
void execute(Runnable command);
}
Executor
提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用 Runnable
来表示任务,将 Runnable
接口的实现类交给 execute
方法即可,作为它的一个参数。
2.3 异步计算的结果:Future
Future
保存异步计算的结果。可以启动一个计算,将 Future
对象交给某个线程,然后忘掉它,这个 Future
对象的所有者在结果计算好之后就可以获得结果。
说到 Future
,它常用的实现的自然是 FutureTask
了。
3. Executors工厂类
Java 提供了一个执行器 Executors
工厂类来构造线程池,其包含如下几类静态工厂方法来创建线程池:
返回类型 | 工厂方法名 | 说明 |
---|---|---|
ExecutorService | newCachedThreadPool | 创建一个缓存线程池。系统会在必要的时候创建线程,如果线程已经空闲60秒则终止该线程 |
ExecutorService | newFixedThreadPool | 创建一个具有固定线程数的线程池,空闲线程会有一直保留 |
ExecutorService | newWorkStealingPool | 一种适合fork-join任务的线程池,其中复杂的任务会分解为更简单的任务,空闲线程会“密取”较简单的任务 |
ExecutorService | newSingleThreadExecutor | 创建一个只有单线程的“线程池”,会顺序地执行所提交的任务 |
ScheduledExecutorService | newScheduledThreadPool | 创建具有指定线程数的线程池,用于调度执行 |
ScheduledExecutorService | newSingleThreadScheduledExecutor | 用于调度执行的单线程“池” |
ExecutorService
接口代表一个线程池,它可以执行Runnable/Callable
对象所代表的线程,ExecutorService
同时代表尽快执行线程的线程池,只要线程池中有空闲线程,就立即执行线程任务;ScheduledExecutorService
是ExecutorService
的子接口,它代表可在指定延迟后或周期性地执行线程任务的线程池。- 如果线程生存期很短,或者大量时间都在阻塞,那么可以使用一个缓存线程池。
- 为了得到最优的运行速度,并发线程数等于处理器内核数。在这种情况下,就应当使用固定线程池,即并发线程总数有一个上限。
- 单线程执行器对于性能分析很有帮助,如果临时用一个单线程池替换缓存或固定线程池,就能测量不使用并发的情况下应用的运行速度会慢多少。
4. ExecutorService
4.1 ExecutorService 接口
ExecutorService
接口是 Executor
接口的子接口,其设置的方法主要有:
方法 | 作用 |
---|---|
void execute(Runnable command) | 继承自父接口Executor 的方法 |
Future<?> submit(Runnable task) | 提交指定的任务来执行 |
<T> Future<T> submit(Runnable task, T result) | 提交指定的任务来执行 |
<T> Future<T> submit(Callable<T> task) | 提交指定的任务来执行 |
void shutdown() | 关闭服务,完成已经提交的任务但不再接受新的任务。 此方法是立即返回的,不会等待所有任务执行完毕后。 |
List<Runnable> shutdownNow() | 尝试停止所有正在执行的任务,并不再处理处于等待状态的任务,最后返回等待执行的任务列表。 此方法是立即返回的,不会等待正在执行的这些任务执行完毕。 |
boolean isShutdown() | true if this executor has been shut down. |
boolean isTerminated() | true if all tasks have completed following shut down. 要想返回 true ,必须先执行shutdown 或shutdownNow 方法。 |
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException | Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first. 此方法会持续等待才返回。 |
T invokeAny(Collection<Callable<T>> tasks) | 执行给定的任务,返回其中一个任务的结果 |
T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit) | 执行给定的任务,返回其中一个任务的结果。如果超时,抛出一个异常 |
List<Future<T>> invokeAll(Collection<Callable<T>> tasks) | 执行给定的任务,返回所有任务的结果。 |
List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit) | 执行给定的任务,返回所有任务的结果。如果超时,抛出一个异常。 |
4.2 ScheduledExecutorService 接口
ScheduledExecutorService
接口又是 ExecutorService
的子接口,其提供的方法主要有:
方法 | 作用 |
---|---|
ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) | 调度在指定的时间之后执行任务 |
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) | 调度在指定的时间之后执行任务 |
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) | 调度在初始延迟之后,周期性地运行给定的任务,周期长度是period个单位 |
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) | 调度在初始延迟之后,周期性地运行给定的任务,在一次调用完成和下一次调用开始之间有长度为delay个单位的延迟 |
4.3 线程池的使用步骤
使用线程池来执行线程任务的步骤如下:
- 调用
Executors
类的静态工厂方法创建一个ExecutorService
对象,该对象代表一个线程池; - 创建
Runnable/Callable
实现类的实例,作为线程执行任务; - 调用
ExecutorService
对象的submit
方法来提交Runnable/Callable
实例; - 当不想提交任何任务时,调用
ExecutorService
对象的shutdown
方法来关闭线程池。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class ThreadPoolTest {
public static void main(String[] args) throws Exception {
// 创建一个具有固定线程数(6)的线程池
ExecutorService pool = Executors.newFixedThreadPool(6);
Runnable target = () -> {
for (int i = 0; i < 100; ++i) {
System.out.println(Thread.currentThread().getName() + "的值为:" + i);
}
};
pool.submit(target);
pool.submit(target);
pool.shutdown();
}
}
5. fork-join框架
Java 7 中引入了 fork-join “框架”,假设有一个处理任务,它可以很自然地分解为子任务,如下所示:
if (problemSize < threshold) {
// solve problem directly
} else {
// break problem into subproblems
// recursively solve each subproblem
// combine the results
}
图像处理就是这样一个例子。要增强一个图像,可以变换上半部分和下半部分,如果有足够多空闲的处理器,这些操作可以并行运行。
fork-join 池是针对非阻塞工作负载优化的,如果向一个 fork-join 池增加很多阻塞任务,会让它无法有效工作。此时可以让任务实现 ForkJoinPool.ManagedBlocker
接口来解决这个问题,不过这是一种高级技术,在这里不作讨论。
5.1 抽象类RecurvieTask<T>
与RecursiveAction
以一个简单的例子为例,假设想统计一个数组中有多少个元素满足某个特定的属性,可以将这个数组一分为二,分别对这两部分进行统计,再将结果相加。
要采用框架可用的一种方式完成这种递归计算,需要提供一个扩展 RecurvieTask<T>
的类(如果计算会生成一个类型为T
的结果),或者提供一个扩展 RecursiveAction
的类(如果不生成任何结果),再覆盖 compute
方法业生成并调用子任务,然后合并其结果:
class Counter extends RecursiveTask<Integer> {
// ...
protected Integer compute() {
if (to - from < THRESHOLD) {
// solve problem directly
} else {
int mid = (from + to) / 2;
var first = new Counter(values, from, mid, filter);
var second = new Counter(values, mid, to, filter);
invokeAll(first, second);
return first.join() + second.join();
}
}
}
在后台,fork-join 框架使用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work stealing)。每个工作线程都有一个双端队列来完成任务,一个工作线程将子任务压入其双端队列的队头(只有一个线程可以访问队头,所以不需要加锁),一个工作线程空闲时,它会从另一个双端队列“密取”一个任务。由于大的子任务都在队尾,这种密取很少出现。
5.2 ForkJoinPool类
来自《疯狂Java讲义》。
Java 还提供了 ForkJoinPool
类来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。ForkJoinPool
是 ExecutorService
的实现类,因此是一种特殊的线程池。
构造器
方法 | 作用 |
---|---|
ForkJoinPool(int parallelism) | 创建一个包含 parallelism 个并行线程的 ForkJoinPool |
ForkJoinPool() | 以 Runtime.availableProcessors() 方法的返回值作为 parallelism 参数来创建 ForkJoinPool |
方法
ForkJoinPool
通过2个静态方法提供通用池功能:
方法 | 作用 |
---|---|
ForkJoinPool commonPool() | 返回一个通用池,通用池的运行状态不会受 shutdown()/shutdownNow() 方法的影响 |
int getCommonPoolParallelism() | 返回通用池的并行级别 |
ForkJoinPool
提供的执行任务的方法:
方法 | 作用 |
---|---|
submit(ForkJoinTask task) | |
invoke(ForkJoinTask task) |
ForkJoinTask
代表一个可以并行、合并的任务,其是一个抽象类;ForkJoinTask
还有两个抽象子类:RecursiveAction, RecursiveTask
,前者代表有返回值的任务,后者代表没有返回值的任务。
示例
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
// “大任务”:简单地打印0~300的数值
class PrintTask extends RecursiveAction {
// 每个“小任务”最多只打印50个数
private static final int THRESHOLD = 50;
private int start;
private int end;
// 打印从start到end的任务
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
// 当end与start之间的差小于THRESHOLD时,开始打印
if (end - start < THRESHOLD) {
for (int i = start; i < end; ++i)
System.out.println(Thread.currentThread().getName() + "的i值为:" + i);
} else {
// 当end与start之间的差大于THRESHOLD时
// 将大任务分解成两个小任务
int middle = (start + end) / 2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
// 并行执行两个“小任务”
left.fork();
right.fork();
}
}
}
class ForkJoinPoolTest {
public static void main(String[] args) throws Exception {
ForkJoinPool pool = new ForkJoinPool();
pool.submit(new PrintTask(0, 300));
pool.awaitTermination(2, TimeUnit.SECONDS);
pool.shutdown();
}
}
6. ExecutorCompletionService 类
TODO 应用场景是?
ExecutorCompletionService<V>
类与 Executor
接口不是一个体系,其实现了 CompletionService<V>
接口。
方法 | 作用 |
---|---|
ExecutorCompletionService(Executor e) | 构造一个执行器完成服务来收集给定执行器的结果 |
Future<V> submit(Callable<V> task) | 提交一个任务给底层的执行器 |
Future<V> submit(Runnable task, V result) | 提交一个任务给底层的执行器 |
Future<V> take() | 移除下一个已完成的结果,如果没有可用的已完成结果,则阻塞 |
Future<V> poll() | 移除并返回下一个已完成的结果,如果没有可用的已完成结果,则返回 null |
Future<V> poll(long time, TimeUnit unit) | 同上,等待给定的时间 |