[TOC]
是一类将对应容器的状态都封装起来,并对每个共有方法都进行同步(加synchronized关键字修饰)的类,相当于让所有对容器的状态的访问串行化,虽然安全但是并发性差。
- Vector
- HashTable
对容器进行迭代操作时,我们要考虑它是不是会被其他的线程修改,如果是我们自己写代码,可以考虑通过如下方式对容器的迭代操作加锁:
synchronized (vector) {
for (int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
}
不过 Java 自己的同步容器类并没有考虑并发修改的问题,它主要采用了一种及时失败的方法,即一旦容器被其他线程修改,它就会抛出异常,例如 Vector 类,它的内部实现是这样的:
Vector#iterator()
会返回一个 Vector 的内部类Itr implement Iterator<E>
,在 Itr 的next()
和remove()
方法中有如下代码:
synchronized (Vector.this) { // 类名.this:在内部类中,要用到外围类的this对象,使用“外围类名.this”
checkForComodification(); // 在进行next和remove操作前,会先检查以下容器是否被修改
...
}
/* checkForComodification()方法 */
final void checkForComodification() {
if (modCount != expectedModCount) // 在Itr的成员变量中有一个:int exceptedModCount = modCount;
throw new ConcurrentModificationException(); // 如果容器被修改了,modCount会变
}
因此,我们在调用 Vector 的如下方法时,要小心,因为它们会隐式的调用 Vector 的迭代操作。
- toString
- hashCode
- equals
- containsAll
- removeAll
- retainAll
- 容器作为参数的构造函数
通过上一小节的分析,我们发现同步容器类的性能实在太差,所以我们可以通过并发容器类代替同步容器类,来提高系统的可伸缩性。我们主要介绍ConcurrentHashMap
和CopyOnWriteArrayList
。
- ConcorrentHashMap 实现了 ConcorrentMap 接口,能在并发环境实现更高的吞吐量,而在单线程环境中只损失很小的性能;
- 采用分段锁,使得任意数量的读取线程可以并发地访问 Map,一定数量的写入线程可以并发地修改 Map;
- 不会抛出 ConcorrentModificationException,它返回迭代器具有“弱一致性”,即可以容忍并发修改,但不保证将修改操作反映给容器;
- size() 的返回结果可能已经过期,只是一个估计值,不过 size() 和 isEmpty() 方法在并发环境中用的也不多;
- 提供了许多原子的复合操作:
V putIfAbsent(K key, V value);
:K 没有相应映射才插入boolean remove(K key, V value);
:K 被映射到 V 才移除boolean replace(K key, V oldValue, V newValue);
:K 被映射到 oldValue 时才替换为 newValue
ConcurrentHashMap 内部结构:
- 在构造的时候,Segment 的数量由所谓的 concurrentcyLevel 决定,默认是 16;
- Segment 是基于 ReentrantLock 的扩展实现的,在 put 的时候,会对修改的区域加锁。
锁分段: 不同线程在同一数据的不同部分上不会互相干扰,例如,ConcurrentHashMap 支持 16 个并发的写入器,是用 16 个锁来实现的。它的实现原理如下:
-
使用了一个包含 16 个锁的数组,每个锁保护所有散列桶的 1/16,其中第 N 个散列桶由第(N % 16)个锁来保护;
-
这大约能把对于锁的请求减少到原来的 1/16,也是 ConcurrentHashMap 最多能支持 16 个线程同时写入的原因;
-
对于 ConcurrentHashMap 的 size() 操作,为了避免枚举每个元素,ConcurrentHashMap 为每个分段都维护了一个独立的计数,并通过每个分段的锁来维护这个值,而不是维护一个全局计数;
-
代码示例:
public class StripedMap { // 同步策略:buckets[n]由locks[n % N_LOCKS]保护 private static final int N_LOCKS = 16; private final Node[] buckets; private final Object[] locks; // N_LOCKS个锁 private static class Node { Node next; Object key; Object value; } public StripedMap(int numBuckets) { buckets = new Node[numBuckets]; locks = new Object[N_LOCKS]; for (int i = 0; i < N_LOCKS; i++) locks[i] = new Object(); } private final int hash(Object key) { return Math.abs(key.hashCode() % buckets.length); } public Object get(Object key) { int hash = hash(key); synchronized (locks[hash % N_LOCKS]) { // 分段加锁 for (Node m = buckets[hash]; m != null; m = m.next) if (m.key.equals(key)) return m.value; } return null; } public void clear() { for (int i = 0; i < buckets.length; i++) { synchronized (locks[i % N_LOCKS]) { // 分段加锁 buckets[i] = null; } } } }
- 关于 put 操作:
- 是否需要扩容
- 在插入元素前判断是否需要扩容,
- 比 HashMap 的插入元素后判断是否需要扩容要好,因为可以插入元素后,Map 扩容,之后不再有新的元素插入,Map就进行了一次无效的扩容
- 如何扩容
- 先创建一个容量是原来的2倍的数组,然后将原数组中的元素进行再散列后插入新数组中
- 为了高效,ConcurrentHashMap 只对某个 segment 进行扩容
- 是否需要扩容
- 关于 size 操作:
- 存在问题:如果不进行同步,只是计算所有 Segment 维护区域的 size 总和,那么在计算的过程中,可能有新的元素 put 进来,导致结果不准确,但如果对所有的 Segment 加锁,代价又过高。
- 解决方法:重试机制,通过获取两次来试图获取 size 的可靠值,如果没有监控到发生变化,即
Segment.modCount
没有变化,就直接返回,否则获取锁进行操作。
- 只要正确发布了这个 list,它就是不可变的了,所以随便并发访问,当需要修改时,就创建一个新的容器副本替代原来的,以实现可变性;
- 应用于迭代操作远多于修改操作的情形,如:事件通知系统,分发通知时需要迭代已注册监听器链表,并调用每一个监听器,一般注册和注销事件监听器的操作远少于接收事件通知的操作。
可以根据自身状态协调线程的控制流:
- 生产者消费者模式:阻塞队列(BlockingQueue)
- 并发流程控制:
- 闭锁(CountDownLatch)
- 栅栏(Barrier)
- 信号量(Semaphore)
- 线程间的数据交换:交换者(Exchanger)
BlockingQueue 提供了可阻塞的 put 和 take 方法:(都是阻塞方法,会抛出 InterruptException 异常)
- 如果队列为空,take 方法一直被阻塞,直到队列中出现一个可用元素
- 如果队列已满,put 方法一直被阻塞,直到队列中出现可用空间
是设计 “生产者 -- 消费者模式” 的利器!
阻塞队列类 | 结构 | 界 | 特点 |
---|---|---|---|
ArrayBlockingQueue | 数组 | 有 | FIFO |
LinkedBlockingQueue | 链表 | 有 | FIFO |
PriorityBlockingQueue | 优先队列 | 无 | 按优先级先后出队 |
DelayQueue | 使用优先队列实现 | 无 | 向队列中 put 元素时指定多久才能从队列中获取当前元素,只有当延时时间到了,才能从队列中获取该元素,队列元素要实现 Delayed 接口,可以用来设计缓存系统 |
SynchronousQueue | 不存储元素的阻塞队列 | 有 | 每一个 put 操作等待一个 take 操作,否则无法继续添加元素 |
LinkedTransferQueue | 链表 | 无 | transfer() :如果当前有在等待接收元素的消费者,可以把新元素直接给消费者,没有则阻塞;tryTransfer() :如果没有消费者等待会返回 false;它们的区别就在于会不会立即返回 |
LinkedBlockDeque | 链表(双向队列) | 有 | 双向队列可用来实现工作密取模式,即如果一个消费者完成了自己的 Deque 中的全部任务,它可以偷偷的去其他消费者的 Deque 的尾部获取工作,以保证所有线程都处于忙碌状态,可应用于爬虫。 |
对于阻塞队列的实现原理,我们最关注的是其通知模式的实现,即 BlockingQueue 是如何在队列满时通知 put 操作等待,和如何在队列空时通知 take 操作等待的。
我们可以通过阅读 ArrayBlockingQueue 的源码得知:
- ArrayBlockingQueue 中有一个 ReentrantLock lock;
- 这个 lock 给我们提供了两个 Condition:notEmpty 和 notFull;
- take 操作中,会以 while 循环的方式轮询 count == items.length,如果为 true,就 notFull.await(),这个阻塞状态需要通过 dequeue 方法中的 notFull.signal() 来解除;
- put 操作中,会以 while 循环的方式轮询 count == 0,如果为 true,就 notEmpty.await(),这个阻塞状态需要通过 enqueue 方法中的 notEmpty.signal() 来解除。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
int count; // 队列中元素的个数
final ReentrantLock lock; // 下面的两个Condition绑定在这个锁上
private final Condition notEmpty; // 用来等待take的条件
private final Condition notFull; // 用来等待put的条件
public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略...
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加可中断锁
try {
while (count == items.length)
notFull.await(); // 轮询count值,等待count < items.length
enqueue(e); // 包含notFull.signal();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 轮询count值,等待count > 0
return dequeue(); // 包含notEmpty.signal();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 会唤醒在等待的take操作
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 会唤醒在等待的put操作
return x;
}
}
可以让一个或多个线程等待其他线程操作完成在继续执行,不可以循环使用,只能使用一次。
public CountDownLatch(int count); // 参数count为计数值
// 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行,或等待中线程中断
public void await() throws InterruptedException;
// 和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
public void countDown(); // 将count值减1
public class CountDownLatchAndJoin {
static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread() {
@Override
public void run() {
System.out.println(1);
countDownLatch.countDown();
System.out.println(2);
countDownLatch.countDown();
}
}.start();
countDownLatch.await();
System.out.println("Main Finished");
}
}
/*
输出:
1
2
Main Finished // main线程会等待main启动的线程执行完再结束
*/
可以让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,让所有线程通过,并且这个屏障可以循环使用(这点和 CountDownLatch 很不同)。
/**
* parties指让多少个线程或者任务等待至barrier状态
* barrierAction为当这些线程都达到barrier状态时会执行的内容
*/
public CyclicBarrier(int parties, Runnable barrierAction); // 常用
public CyclicBarrier(int parties);
public int await()
throws InterruptedException, BrokenBarrierException;
public int await(long timeout, TimeUnit unit)
throws InterruptedException, BrokenBarrierException, TimeoutException;
public class CyclicBarrierDemo2 {
static CyclicBarrier barrier = new CyclicBarrier(2, new After());
public static void main(String[] args) {
new Thread() {
@Override
public void run() {
System.out.println("In thread");
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
System.out.println("In main");
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Finish.");
}
static class After implements Runnable {
@Override
public void run() {
System.out.println("All reach barrier.");
}
}
}
/*
输出:
In main // main线程到达屏障之后会被阻塞
In thread
All reach barrier. // thread到达屏障之后会执行After的run
Main finish // 然后被阻塞的main线程和thread线程才会继续执行下去
Thread finish
*/
用来控制同时访问特定资源的线程数量。
// 参数permits表示许可数目,即同时可以允许多少线程进行访问,默认是非公平的
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {
sync = (fair) ? new FairSync(permits) : new NonfairSync(permits);
}
/* 会阻塞等待的acquire方法 */
public void acquire() throws InterruptedException; // 获取一个许可
public void acquire(int permits) throws InterruptedException; // 获取permits个许可
public void release(); // 释放一个许可
public void release(int permits); // 释放permits个许可
/* 会阻塞但不等待,立即返回的acquire方法 */
// 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire() { }
// 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException { }
// 尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits) { }
// 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException { }
public class SemaphoreDemo2 {
private static final int THREAD_COUNT = 10;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("save data");
Thread.sleep(1000);
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
}
/*
结果会两个两个的蹦出:save data,说明同时只有两个线程能拿到资源
*/
一个用于两个线程间交换数据的工具类。如果第一个线程先执行了exchange(V)
方法,它会阻塞在那里,等待第二个线程执行exchange(V)
方法,exchange(V)
会返回另一个线程传入的数据。
public Exchanger();
public V exchange(V x)
throws InterruptedException;
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException;
public class ExchangeDemo {
private static Exchanger<String> exch = new Exchanger<>();
private static ExecutorService pool = Executors.newFixedThreadPool(2);
// 用来保证线程池在两个线程执行完之后再关闭
private static CountDownLatch latch = new CountDownLatch(2);
public static void main(String[] args) {
pool.execute(new Runnable() {
@Override
public void run() {
try {
String data = "第一个线程的结果";
Thread.sleep(100);
String res = exch.exchange(data);
System.out.println("我是第一个线程,我收到另一个线程的结果为:" + res);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
pool.execute(new Runnable() {
@Override
public void run() {
try {
String data = "第二个线程的结果";
Thread.sleep(1000);
String res = exch.exchange(data);
System.out.println("我是第二个线程,我收到另一个线程的结果为:" + res);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
latch.await(); // 等待两线程执行完,然后关闭线程池
} catch (Exception e) {
e.printStackTrace();
}
pool.shutdown();
}
}