线程安全的集合
如果多个线程要并发地修改一个数据结构,例如散列表,那么很容易破坏这个数据结构。例如,一个线程可能开始向表中插入一个新元素,假定在调整散列表各个桶之间的链接关系的过程中,这个线程的控制权被抢占。如果另一个线程开始遍历同一个链表,可能使用无效的链接并造成混乱,可能会抛出异常或者陷入无限循环。
当然可以通过提供锁来保护共享的数据结构,但是选择线程安全的实现可能更为容易,下面我们将介绍一些 Java 类库提供的另外一些线程安全的集合。
1. 阻塞队列
Java 提供了一个 BlockingQueue
接口,虽然其也是 Queue
的子接口,但它的主要用途并不是作为容器,而是作为线程同步的工具。程序的两个线程通过交替向 BlockingQueue
中放入元素、取出元素,即可很好地控制线程的通信。
BlockingQueue
具有一个特征:当生产者线程试图向 BlockingQueue
中放入元素时,若该队列已满,则该线程被阻塞;当消费者线程试图从 BlockingQueue
中取出元素时,若该队列已空,则该线程被阻塞。
BlockingQueue
继承了 Queue
接口,当然也可以使用 Queue
接口中的方法。但在这里值得注意的是BlockingQueue
提供的两个支持阻塞的方法:
方法 | 说明 |
---|---|
put(E e) | 尝试把 e 元素放入 BlockingQueue 中,若该队列的元素已满,阻塞该线程 |
take() | 尝试从 BlockingQueue 的头部取出元素,若该队列的元素已空,阻塞该线程 |
BlockingQueue
包含如下5个实现类:
BlockingQueue<E> 的实现类 | 说明 |
---|---|
ArrayBlockingQueue | 基于数组实现的 BlockingQueue 队列 |
LinkedBlockingQueue | 基于链表实现的 BlockingQueue 队列 |
LinkedBlockingDeque | |
PriorityBlockingQueue | …… |
DelayQueue | 一个特殊的 BlockingQueue ,底层基于 PriorityBlockingQueue 实现,但 DelayQueue 要求集合元素都实现 Delay 接口,DelayQueue 根据集合元素的 getDelay() 方法返回值进行排序 |
BlockingQueue
与其实现类之间的类图:

代码示例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class Producer extends Thread {
private BlockingQueue<String> bq;
public Producer(BlockingQueue<String> bq) {
this.bq = bq;
}
public void run() {
String[] strArr = new String[] { "Java", "Struts", "Spring" };
for (int i = 0; i < 999999999; ++i) {
System.out.println(getName() + "生产者准备生产集合元素!");
try {
Thread.sleep(200);
bq.put(strArr[i % 3]);
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println(getName() + "生产完成:" + bq);
}
}
}
class Consumer extends Thread {
private BlockingQueue<String> bq;
public Consumer(BlockingQueue<String> bq) {
this.bq = bq;
}
public void run() {
while (true) {
System.out.println(getName() + "消费者准备消费集合元素!");
try {
Thread.sleep(200);
bq.take();
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println(getName() + "消费完成:" + bq);
}
}
}
class BlockingQueueTest {
public static void main(String[] args) {
BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
// 启动3个生产者线程
new Producer(bq).start();
new Producer(bq).start();
new Producer(bq).start();
// 启动1个消费者线程
new Consumer(bq).start();
}
}
2. 高效的映射、集、队列
java.util.concurrent
包提供了映射、有序集、队列的高效实现:ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet, ConcurrentLinkedQueue
。
这些集合使用复杂的算法,通过允许并发地访问数据结构的不同部分尽可能减少竞争。
与大多数集合不同,这些类的
size
方法不一定在常量时间内完成操作,确定这些集合的当前大小通常需要遍历。有些应用使用庞大的并发散列映射,这些映射太过庞大,以至于无法用
size
方法得到它的大小,因为这个方法只能返回int
。如果这些映射包含超过20亿个条目,可以使用mappingCount
方法把大小作为long
返回。因为
ConcurrentHashMap
和ConcurrentLinkedQueue
支持多线程并发访问,所以当使用迭代器来遍历集合元素时,该迭代器可能不能反映出创建迭代器之后所做的修改,但程序不会抛出任何异常。
在 java.util.concurrent
包下提供了大量支持高效并发访问的集合接口和实现类:

这些线程安全的集合类可以分为如下两类:
- 以
Concurrent
开头的集合类:代表支持并发访问的集合,它们可以支持多个线程并发写入访问,这些写入线程的所有操作都是线程安全的,但读取操作不必锁定,因此在并发写入时具有较好的性能; - 以
CopyOnWrite
开头的集合类:- 当线程对
CopyOnWriteArrayList
集合执行读取操作时,线程将会直接读取集合本身,无须加锁与阻塞,因而读取操作很快、很安全; - 当线程对
CopyOnWriteArrayList
集合执行写入操作时,该集合会在底层复制一份新的数组,接下来对新的数组执行写入操作,故而保证了线程安全,但也因此使得其在写入操作时性能较差。
- 当线程对
3. 映射条目的原子更新
以后再说吧......
4. 对并发HashMap的批操作
Java API 为并发散列映射提供了批操作,即使有些其他线程在处理映射,这些操作也能安全地执行。批操作会遍历映射,处理遍历过程中找到的元素,这里不会冻结映射的当前快照,除非你恰好知道批操作运行时映射不会被修改,否则就要把结果看作是映射状态的一个近似。
有3种不同的操作:
search
reduce
forEach
每个操作都有4个版本:
operationKeys
:处理键operationValues
:处理值operation
:处理键和值operationEntries
:处理Map.Entry
对象
对于上述各个操作,需要指定一个参数化阈值。如果映射包含的元素多于这个阈值,就会并行完成批操作。如果希望批操作在一个线程中运行,可以使用阈值 Long.MAX_VALUE
;如果希望用尽多的线程运行批操作,可以使用阈值1。
5. 并发Set视图
并没有 ConcurrentHashSet
类,ConcurrentHashMap
有一个静态的 newKeySet
方法会生成一个 Set<K>
,这实际上是 ConcurrentHashMap<K, Boolean>
的一个包装器(所有映射值都为 Boolean.TRUE
,不过因为只是要把它用作一个 Set
,所以并不关心映射值)。
Set<String> words = ConcurrentHashMap.<String>newKeySet();
ConcurrentHashMap
还有第二个 keySet
方法,它包含一个默认值,为集增加元素时可以使用这个方法:
Set<String> words = map.keySet(1L);
words.add("Java"); // 如果"Java"在words中不存在,map会有一个值1
6. 同步包装器
任何集合类都可以通过使用**同步包装器(synchronization wrapper)**变成线程安全的:
List<E> syncArrayList = Collections.synchronizedList(new ArrayList<E>());
Map<K, V> syncHashMap = Collections.synchronizedMap(new HashMap<K, V>());
通常,最好使用 java.util.concurrent
包中定义的集合,而不是同步包装器。