AQS(AbstractQueuedSynchronizer)
(1)AQS是Java中几乎所有锁和同步器的一个基础框架;
(2)AQS中维护了一个队列,这个队列使用双链表实现,用于保存等待锁排队的线程;
(3)AQS中维护了一个状态变量state,通过原子更新这个状态变量state即可以实现加锁解锁操作;
(4)AQS中有一个ConditionObject内部类,用于实现条件锁。
(5)基于AQS自己动手写一个锁非常简单,只需要实现AQS的几个方法即可。
实现一个同步器只需要重写AQS的几个方法就可以了
// 互斥模式下使用:尝试获取锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 互斥模式下使用:尝试释放锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下使用:尝试获取锁
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下使用:尝试释放锁
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 如果当前线程独占着锁,返回true
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
ReentrantLock(1)——公平锁、非公平锁
主要内部类
ReentrantLock中主要定义了三个内部类:Sync、NonfairSync、FairSync。
(1)抽象类Sync实现了AQS的部分方法;
(2)NonfairSync实现了Sync,主要用于非公平锁的获取;
(3)FairSync实现了Sync,主要用于公平锁的获取。
lock()方法
公平锁
(1)如果没有线程排队,则尝试获取锁,如果获取到了就直接返回了;
(2)尝试获取锁失败,再调用addWaiter()
构建新节点并把新节点入队;
(3)然后调用acquireQueued()
,如果前一个节点是头结点,再次尝试获取锁,如果成功了,直接返回;
(4)如果再次失败,再调用shouldParkAfterFailedAcquire()
将节点的等待状态置为等待唤醒(SIGNAL);
(5)调用parkAndCheckInterrupt()
阻塞当前线程;
(6)如果被唤醒了,会继续在acquireQueued()的for()循环再次尝试获取锁,如果成功了就返回;
(7)如果不成功,再次阻塞,重复(3)(4)(5)直到成功获取到锁。
非公平锁
相对于公平锁,非公平锁加锁的过程主要有两点不同:
(1)一开始就尝试CAS更新状态变量state的值,如果成功了就获取到锁了;
(2)在tryAcquire()
的时候没有检查是否前面有排队的线程,直接上去获取锁才不管别人有没有排队呢;
总的来说,相对于公平锁,非公平锁在一开始就多了两次直接尝试获取锁的过程。
lockInterruptibly()方法
支持线程中断,它与lock()方法的主要区别在于lockInterruptibly()
获取锁的时候如果线程中断了,会抛出一个异常。
题外话:
线程中断,只是在线程上打一个中断标志,并不会对运行中的线程有什么影响,具体需要根据这个中断标志干些什么,用户自己去决定。
tryLock()方法
尝试获取一次锁,成功了就返回true,没成功就返回false,不会继续尝试。
tryLock(long time, TimeUnit unit)方法
尝试获取锁,并等待一段时间,如果在这段时间内都没有获取到锁,就返回false。
(1)先尝试获取一次锁
(2)通过nanosTimeout = deadline - System.nanoTime()
判断是否到期。到期直接返回。
(3)只有到期时间大于1000纳秒,才阻塞;小于等于1000纳秒,直接自旋解决。
unlock()方法
释放锁。调用AbstractQueuedSynchronizer.release()
(1)将state的值减1;
(2)如果state减到了0,说明已经完全释放锁了,唤醒下一个等待着的节点;
面试题
为什么ReentrantLock默认采用的是非公平模式?
答:因为非公平模式效率比较高。
为什么非公平模式效率比较高?
答:因为非公平模式会在一开始就尝试两次获取锁,如果当时正好state的值为0,它就会成功获取到锁,少了排队导致的阻塞/唤醒过程,并且减少了线程频繁的切换带来的性能损耗。
非公平模式有什么弊端?
答:非公平模式有可能会导致一开始排队的线程一直获取不到锁,导致线程饿死。
ReentrantLock(2)——条件锁
条件锁,是指在获取锁之后发现当前业务场景自己无法处理,而需要等待某个条件的出现才可以继续处理时使用的一种锁。
比如,在阻塞队列中,当队列中没有元素的时候是无法弹出一个元素的,这时候就需要阻塞在条件notEmpty上,等待其它线程往里面放入一个元素后,唤醒这个条件notEmpty,当前线程才可以继续去做“弹出一个元素”的行为。
注意,这里的条件,必须是在获取锁之后去等待,对应到ReentrantLock的条件锁,就是获取锁之后才能调用condition.await()方法。
在java中,条件锁的实现都在AQS的ConditionObject类中,ConditionObject实现了Condition接口,下面我们通过一个例子来进入到条件锁的学习中。
主要属性
条件锁中也维护了一个队列,为了和AQS的队列区分,我这里称为条件队列,firstWaiter是队列的头节点,lastWaiter是队列的尾节点
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
lock.newCondition()方法
新建一个条件锁最后就是调用的AQS中的ConditionObject类来实例化条件锁。
condition.await()方法
condition.await()方法,表明现在要等待条件的出现。
(1)新建一个节点加入到条件队列中去;
(2)完全释放当前线程占有的锁;
(3)阻塞当前线程,并等待条件的出现;
(4)条件已出现(此时节点已经移到AQS的队列中),尝试获取锁;
也就是说await()方法内部其实是 先释放锁->等待条件->再次获取锁
的过程。
condition.signal()方法
condition.signal()方法通知条件已经出现。
(1)从条件队列的头节点开始寻找一个非取消状态的节点;
(2)把它从条件队列移到AQS队列;
(3)且只移动一个节点;
ReentrantLock VS synchronized
synchronized是Java原生提供的用于在多线程环境中保证同步的关键字,底层是通过修改对象头中的MarkWord来实现的。
ReentrantLock是Java语言层面提供的用于在多线程环境中保证同步的类,底层是通过原子更新状态变量state来实现的。
功能 | ReentrantLock | synchronized |
---|---|---|
可重入 | 支持 | 支持 |
非公平 | 支持(默认) | 支持 |
加锁/解锁方式 | 需要手动加锁、解锁,一般使用try…finally…保证锁能够释放 | 手动加锁,无需刻意解锁 |
按key锁 | 不支持,比如按用户id加锁 | 支持,synchronized加锁时需要传入一个对象 |
公平锁 | 支持,new ReentrantLock(true) | 不支持 |
中断 | 支持,lockInterruptibly() | 不支持 |
尝试加锁 | 支持,tryLock() | 不支持 |
超时锁 | 支持,tryLock(timeout, unit) | 不支持 |
获取当前线程获取锁的次数 | 支持,getHoldCount() | 不支持 |
获取等待的线程 | 支持,getWaitingThreads() | 不支持 |
检测是否被当前线程占有 | 支持,isHeldByCurrentThread() | 不支持 |
检测是否被任意线程占有 | 支持,isLocked() | 不支持 |
条件锁 | 可支持多个条件,condition.await(),condition.signal(),condition.signalAll() | 只支持一个,obj.wait(),obj.notify(),obj.notifyAll() |
ReentrantReadWriteLock
读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问。
多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量。
内部维护了读锁readerLock
、写锁writerLock
和同步器Sync
。
ReadLock.lock() 读锁加锁
(1)先尝试获取读锁;
(2)如果成功了直接结束;
(3)如果失败了,进入doAcquireShared()方法;
(4)doAcquireShared()方法中首先会生成一个新节点并进入AQS队列中;
(5)如果头节点正好是当前节点的上一个节点,再次尝试获取锁;
(6)如果成功了,则设置头节点为新节点,并传播;
(7)传播即唤醒下一个读节点(如果下一个节点是读节点的话);
(8)如果头节点不是当前节点的上一个节点或者(5)失败,则阻塞当前线程等待被唤醒;
(9)唤醒之后继续走(5)的逻辑;
ReadLock.unlock() 读锁解锁
(1)将当前线程重入的次数减1;
(2)将共享锁总共被获取的次数减1;
(3)如果共享锁获取的次数减为0了,说明共享锁完全释放了,那就唤醒下一个节点;
WriteLock.lock() 写锁加锁
(1)尝试获取锁;
(2)如果有读者占有着读锁,尝试获取写锁失败;
(3)如果有其它线程占有着写锁,尝试获取写锁失败;
(4)如果是当前线程占有着写锁,尝试获取写锁成功,state值加1;
(5)如果没有线程占有着锁(state==0),当前线程尝试更新state的值,成功了表示尝试获取锁成功,否则失败;
(6)尝试获取锁失败以后,进入队列排队,等待被唤醒;
(7)后续逻辑跟ReentrantLock是一致;
WriteLock.unlock() 写锁解锁
(1)先尝试释放锁,即状态变量state的值减1;
(2)如果减为0了,说明完全释放了锁;
(3)完全释放了锁才唤醒下一个等待的节点;
总结
(1)ReentrantReadWriteLock采用读写锁的思想,能提高并发的吞吐量;
(2)读锁使用的是共享锁,多个读锁可以一起获取锁,互相不会影响,即读读不互斥;
(3)读写、写读和写写是会互斥的,前者占有着锁,后者需要进入AQS队列中排队;
(4)多个连续的读线程是一个接着一个被唤醒的,而不是一次性唤醒所有读线程;
(5)只有多个读锁都完全释放了才会唤醒下一个写线程;
(6)只有写锁完全释放了才会唤醒下一个等待者,这个等待者有可能是读线程,也可能是写线程;
(7)同一线程可以先写后读,不能先读后写。
彩蛋
同一线程先写后读,先读后写
先读后写,一个线程占有读锁后,其它线程还是可以占有读锁的,这时候如果在其它线程占有读锁之前让自己占有了写锁,其它线程又不能占有读锁了,这段程序会非常难实现,逻辑也很奇怪,所以,设计成只要一个线程占有了读锁,其它线程包括它自己都不能再获取写锁。
先写后读,一个线程占有写锁后,其它线程是不能占有任何锁的,这时候,即使自己占有一个读锁,对程序的逻辑也不会有任何影响,所以,一个线程占有写锁后是可以再占有读锁的,只是这个时候其它线程依然无法获取读锁。
一个线程先占有读锁后占有写锁,会有一个很大的问题——锁无法被释放也无法被获取了。这个线程先占有了读锁,然后自己再占有写锁的时候会阻塞,然后它就自己把自己搞死了,进而把其它线程也搞死了,它无法释放锁,其它线程也无法获得锁了。
这是死锁吗?似乎不是,死锁的定义是线程A占有着线程B需要的资源,线程B占有着线程A需要的资源,两个线程相互等待对方释放资源。纯属自己玩死自己。
如何使用ReentrantReadWriteLock实现一个高效安全的TreeMap?
class SafeTreeMap {
private final Map<String, Object> m = new TreeMap<String, Object>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Object get(String key) {
readLock.lock();
try {
return m.get(key);
} finally {
readLock.unlock();
}
}
public Object put(String key, Object value) {
writeLock.lock();
try {
return m.put(key, value);
} finally {
writeLock.unlock();
}
}
}
Semaphore
Semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可。
(1)Semaphore,也叫信号量,通常用于控制同一时刻对共享资源的访问上,也就是限流场景;
(2)Semaphore的内部实现是基于AQS的共享锁来实现的;
(3)Semaphore初始化的时候需要指定许可的次数,许可的次数是存储在state中;
(4)获取一个许可时,则state值减1;
(5)释放一个许可时,则state值加1;
(6)可以动态减少n个许可;
(7)可以动态增加n个许可
acquire()
nonfair: 不断循环cas
fair: 先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新state的值。
彩蛋
(1)如何动态增加n个许可?
答:调用release(int permits)即可。我们知道释放许可的时候state的值会相应增加,再回头看看释放许可的源码,发现与ReentrantLock的释放锁还是有点区别的,Semaphore释放许可的时候并不会检查当前线程有没有获取过许可,所以可以调用释放许可的方法动态增加一些许可。
(2)如何实现限流?
答:限流,即在流量突然增大的时候,上层要能够限制住突然的大流量对下游服务的冲击,在分布式系统中限流一般做在网关层,当然在个别功能中也可以自己简单地来限流,比如秒杀场景,假如只有10个商品需要秒杀,那么,服务本身可以限制同时只进来100个请求,其它请求全部作废,这样服务的压力也不会太大。
CountDownLatch
CountDownLatch,可以翻译为倒计时器,但是似乎不太准确,它的含义是允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作。
CountDownLatch的通常用法和Thread.join()有点类似,等待其它线程都完成后再执行主任务。
(1)CountDownLatch表示允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作;
(2)CountDownLatch使用AQS的共享锁机制实现;
(3)CountDownLatch初始化的时候需要传入次数count;
(4)每次调用countDown()方法count的次数减1;
(5)每次调用await()方法的时候会尝试获取锁,这里的获取锁其实是检查AQS的state变量的值是否为0;
(6)当count的值(也就是state的值)减为0的时候会唤醒排队着的线程(这些线程调用await()进入队列);
countDown()
释放一个共享锁,也就是count的次数会减1,当state等于0时,就会唤醒队列中的线程
await()
调用tryAcquireShared()
方法,若state=0,则成功,否则失败入队。
彩蛋
1、CountDownLatch为什么使用共享锁?
前面我们分析ReentrantReadWriteLock的时候学习过AQS的共享锁模式,比如当前锁是由一个线程获取为互斥锁,那么这时候所有需要获取共享锁的线程都要进入AQS队列中进行排队,当这个共享锁释放的时候,会一个接着一个地唤醒这些连续的排队的等待获取共享锁的线程,注意,这里的用语是“一个接着一个地唤醒”,也就是说这些等待获取共享锁的线程不是一次性唤醒的。
说到这里,是不是很明白了?因为CountDownLatch的await()多个线程可以调用多次,当调用多次的时候这些线程都要进入AQS队列中排队,当count次数减为0的时候,它们都需要被唤醒,继续执行任务,如果使用互斥锁则不行,互斥锁在多个线程之间是互斥的,一次只能唤醒一个,不能保证当count减为0的时候这些调用了await()方法等待的线程都被唤醒。
2、CountDownLatch与Thread.join()有何不同?
Thread.join()是在主线程中调用的,它只能等待被调用的线程结束了才会通知主线程,而CountDownLatch则不同,它的countDown()方法可以在线程执行的任意时刻调用,灵活性更大。
AQS
AQS的全称是AbstractQueuedSynchronizer
,它的定位是为Java中几乎所有的锁和同步器提供一个基础框架。
状态变量state
AQS中定义了一个状态变量state,它有以下两种使用方法:
(1)互斥锁
当AQS只实现为互斥锁的时候,每次只要原子更新state的值从0变为1成功了就获取了锁,可重入是通过不断把state原子更新加1实现的。
(2)共享锁 + 互斥锁
当AQS需要同时实现为互斥锁+共享锁的时候,高16位存储共享锁的状态,低16位存储互斥锁的状态,主要用于实现读写锁。
互斥锁是一种独占锁,每次只允许一个线程独占,且当一个线程独占时,其它线程将无法再获取互斥锁及共享锁,但是它自己可以获取共享锁。
共享锁同时允许多个线程占有,只要有一个线程占有了共享锁,所有线程(包括自己)都将无法再获取互斥锁,但是可以获取共享锁。
AQS队列
AQS中维护了一个队列,获取锁失败(非tryLock())的线程都将进入这个队列中排队。
互斥锁模式下,锁释放后唤醒下一个排队的线程。
共享锁模式下,锁释放后一个接一个地唤醒共享模式的线程。
Condition队列
AQS中还有另一个非常重要的内部类ConditionObject,它实现了Condition接口,主要用于实现条件锁。
ConditionObject中也维护了一个队列,这个队列主要用于等待条件的成立,当条件成立时,其它线程将signal这个队列中的元素,将其移动到AQS的队列中,等待占有锁的线程释放锁后被唤醒。
Condition典型的运用场景是在BlockingQueue中的实现,当队列为空时,获取元素的线程阻塞在notEmpty条件上,一旦队列中添加了一个元素,将通知notEmpty条件,将其队列中的元素移动到AQS队列中等待被唤醒。
模板方法
AQS这个抽象类把模板方法设计模式运用地炉火纯青,它里面定义了一系列的模板方法,比如下面这些:
// 获取互斥锁
public final void acquire(int arg) {
// tryAcquire(arg)需要子类实现
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 获取互斥锁可中断
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquire(arg)需要子类实现
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 获取共享锁
public final void acquireShared(int arg) {
// tryAcquireShared(arg)需要子类实现
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 获取共享锁可中断
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquireShared(arg)需要子类实现
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 释放互斥锁
public final boolean release(int arg) {
// tryRelease(arg)需要子类实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 释放共享锁
public final boolean releaseShared(int arg) {
// tryReleaseShared(arg)需要子类实现
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
需要子类实现的方法
// 互斥模式下使用:尝试获取锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 互斥模式下使用:尝试释放锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下使用:尝试获取锁
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下使用:尝试释放锁
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 如果当前线程独占着锁,返回true
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
这几个方法为什么不直接定义成抽象方法呢?
因为子类只要实现这几个方法中的一部分就可以实现一个同步器了,所以不需要定义成抽象方法。
前面的几个类均是基于AQS实现的。下面的类则是自成一体。
StampedLock
StampedLock是java8中新增的类,它是一个更加高效的读写锁的实现。
StampedLock具有三种模式:写模式、读模式、乐观读模式。
ReentrantReadWriteLock中的读和写都是一种悲观锁的体现,StampedLock加入了一种新的模式——乐观读,它是指当乐观读时假定没有其它线程修改数据,读取完成后再检查下版本号有没有变化,没有变化就读取成功了,这种模式更适用于读多写少的场景。
与ReentrantReadWriteLock进行对比:
(1)写锁的使用方式基本一对待;
(2)读锁(悲观)的使用方式可以进行升级,通过tryConvertToWriteLock()方式可以升级为写锁;
(3)乐观读锁是一种全新的方式,它假定数据没有改变,乐观读之后处理完业务逻辑再判断版本号是否有改变,如果没改变则乐观读成功,如果有改变则转化为悲观读锁重试;
acquireWrite()
获取写锁。
第一段自旋——入队:
(1)如果头节点等于尾节点,说明没有其它线程排队,那就多自旋一会,看能不能尝试获取到写锁;
(2)否则,自旋次数为0,直接让其入队;
第二段自旋——阻塞并等待被唤醒 + 第三段自旋——不断尝试获取写锁:
(1)第三段自旋在第二段自旋内部;
(2)如果头节点等于前置节点,那就进入第三段自旋,不断尝试获取写锁;
(3)否则,尝试唤醒头节点中等待着的读线程;
(4)最后,如果当前线程一直都没有获取到写锁,就阻塞当前线程并等待被唤醒;
unlockWrite()
释放写锁。
(1)更改state的值,释放写锁;
(2)版本号加1;
(3)唤醒下一个等待着的节点;
readLock()方法
获取读锁。
获取读锁的时候先看看现在有没有其它线程占用着写锁,如果没有的话再检测读锁被获取的次数有没有达到最大,如果没有的话直接尝试获取一次读锁,如果成功了直接返回版本号,如果没成功就调用acquireRead()排队。
读节点进来都是先判断是头节点如果等于尾节点,说明快轮到自己了,就不断自旋尝试获取读锁,如果成功了就返回;如果头节点不等于尾节点,这里就会让当前节点入队。
unlockRead()方法
释放读锁。
读锁释放的过程就比较简单了,将state的低7位减1,当减为0的时候说明完全释放了读锁,就唤醒下一个排队的线程。
tryOptimisticRead()方法
乐观读。
如果没有写锁,就返回state的高25位,这里把写锁在位置一起返回了,是为了后面检测数据有没有被写过。
validate()方法
检测乐观读版本号(state前24位)是否变化。
变异的CLH队列
StampedLock中的队列是一种变异的CLH队列,图解如下:
总结
(1)StampedLock也是一种读写锁,它不是基于AQS实现的;
(2)StampedLock相较于ReentrantReadWriteLock多了一种乐观读的模式,以及读锁转化为写锁的方法;
(3)StampedLock的state存储的是版本号,确切地说是高24位存储的是版本号,写锁的释放会增加其版本号,读锁不会;
(4)StampedLock的低7位存储的读锁被获取的次数,低第8位存储的是写锁被获取的次数;
(5)StampedLock不是可重入锁,因为只有第8位标识写锁被获取了,并不能重复获取;
(6)StampedLock中获取锁的过程使用了大量的自旋操作,对于短任务的执行会比较高效,长任务的执行会浪费大量CPU;
(7)StampedLock不能实现条件锁;
彩蛋
1、StampedLock与ReentrantReadWriteLock的对比?
(1)两者都有获取读锁、获取写锁、释放读锁、释放写锁的方法,这是相同点;
(2)两者的结构基本类似,都是使用state + CLH队列;
(3)state分成三段
- 高24位存储版本号,写锁的释放会增加其版本号,读锁不会。
- 低7位存储读锁被获取的次数
- 第8位存储写锁被获取的次数,StampedLock不是可重入锁,因为只有第8位标识写锁被获取了,并不能重复获取
(4)前者的CLH队列可以看成是变异的CLH队列,连续的读线程只有首个节点存储在队列中,其它的节点存储的首个节点的cowait栈中;后者的CLH队列是正常的CLH队列,所有的节点都在这个队列中;
(5)前者获取锁的过程中有判断首尾节点是否相同,也就是是不是快轮到自己了,如果是则不断自旋,所以适合执行短任务;后者获取锁的过程中非公平模式下会做有限次尝试;
(6)前者只有非公平模式,一上来就尝试获取锁;
(7)前者唤醒读锁是一次性唤醒连续的读锁的,而且其它线程还会协助唤醒;后者是一个接着一个地唤醒的;
(8)前者有乐观读的模式,乐观读的实现是通过判断state的高25位是否有变化来实现的;
(9)前者各种模式可以互转,类似tryConvertToXxx()方法;
(10)前者写锁不可重入,后者写锁可重入;
(11)前者无法实现条件锁,后者可以实现条件锁;
CyclicBarrier
CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行。它与CountDownLatch很类似,但又不同,CountDownLatch需要调用countDown()方法触发事件,而CyclicBarrier不需要,它就像一个栅栏一样,当一组线程都到达了栅栏处才继续往下走。
(1)其有一个内部类Generation
,中文翻译为代,一代人的代,用于控制CyclicBarrier的循环使用。上面示例中的三个线程完成后进入下一代,继续等待三个线程达到栅栏处再一起执行,而CountDownLatch则做不到这一点,CountDownLatch是一次性的,无法重置其次数
(2)CyclicBarrier会使一组线程阻塞在await()处,若非最后一个线程,会将本线程放入condition队列。当最后一个线程到达时唤醒singalAll(只是从条件队列转移到AQS队列中)前面的线程再继续往下走;
(3)CyclicBarrier不是直接使用AQS实现的一个同步器;
(4)CyclicBarrier基于ReentrantLock及其Condition实现整个同步逻辑;
Phaser
它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务
(1)state,状态变量,高32位存储当前阶段phase,中间16位存储参与者的数量,低16位存储未完成参与者的数量。
(2)evenQ和oddQ,已完成的参与者存储的队列,当最后一个参与者完成任务后唤醒队列中的参与者继续执行下一个阶段的任务,或者结束任务。
register()
注册一个参与者,如果调用该方法时,onAdvance()方法正在执行,则该方法等待其执行完毕。
(1)增加一个参与者,需要同时增加parties和unarrived两个数值,也就是state的中16位和低16位;
(2)如果是第一个参与者,则尝试原子更新state的值,如果成功了就退出;
(3)如果不是第一个参与者,则检查是不是在执行onAdvance(),如果是等待onAdvance()执行完成,如果否则尝试原子更新state的值,直到成功退出;
(4)等待onAdvance()完成是采用先自旋后进入队列排队的方式等待,减少线程上下文切换;
arriveAndAwaitAdvance()
当前线程当前阶段执行完毕,等待其它线程完成当前阶段。
如果当前线程是该阶段最后一个到达的,则当前线程会执行onAdvance()方法,并唤醒其它线程进入下一个阶段。
(1)修改state中unarrived部分的值减1;
(2)如果不是最后一个到达的,则调用internalAwaitAdvance()方法自旋或排队等待;
(3)如果是最后一个到达的,则调用onAdvance()方法,然后修改state的值为下一阶段对应的值,并唤醒其它等待的线程;
(4)返回下一阶段的值;
总结
(1)Phaser适用于多阶段多任务的场景,每个阶段的任务都可以控制得很细;
(2)Phaser内部使用state变量及队列实现整个逻辑;
(3)state的高32位存储当前阶段phase,中16位存储当前阶段参与者(任务)的数量parties,低16位存储未完成参与者的数量unarrived;
(4)队列会根据当前阶段的奇偶性选择不同的队列,为了在释放一些线程的同时添加其他线程时消除争用,使用两个队列,在偶数和奇数阶段交替;
(5)当不是最后一个参与者到达时,会自旋或者进入队列排队来等待所有参与者完成任务;
(6)当最后一个参与者完成任务时,会唤醒队列中的线程并进入下一个阶段;
彩蛋
Phaser相对于CyclicBarrier和CountDownLatch的优势?
(1)Phaser可以完成多阶段,而一个CyclicBarrier或者CountDownLatch一般只能控制一到两个阶段的任务;
(2)Phaser每个阶段的任务数量可以控制,而一个CyclicBarrier或者CountDownLatch任务数量一旦确定不可修改。
Comments | 0 条评论