Condition 被用来替代传统的Object的 wait()
、notify()
,实现线程间的协作;
相比使用 Object 的 wait()
、notify()
,使用 Condition 的 await()
、signal()
这种方式实现线程间协作更加安全和高效。
1. 前言
考虑文章排版,测试示例放在了文章的末尾;建议先运行测试示例,了解代码运行流程再看源码会更深刻;
如不了解 ReentrantLock,建议移步 《ReentrantLock 和 AQS》 ,了解完 ReentrantLock 再看 Condition,否则可能会一头雾水。
2. Condition
2.1. 与AQS的关系
Java中的 Condition
的实现是 ConditionObject,它是 AbstractQueuedSynchronizer 中的内部类;
2.2. ConditionObject成员变量
变量名 | 含义 | 备注 |
---|---|---|
transient Node firstWaiter; | 条件队列(ConditionQueue)第一个节点 | 用于构建条件队列 下面用 first节点 代称 |
transient Node lastWaiter; | 条件队列(ConditionQueue)最后一个节点 | 用于构建条件队列 下面用 last节点 代称 |
static final int THROW_IE = -1; | 中断标识 | 收到signal信号前被中断 |
static final int REINTERRUPT = 1; | 中断标识 | 收到signal信号后被中断 |
2.3. 构造方法
1 | // java.util.concurrent.locks.ReentrantLock#newCondition |
3. await 逻辑
await
、signal
方法的调用都在临界区中(被lock
和unlock
环绕);当前线程运行到Condition#await
方法,说明此时线程已经拿到了锁;
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await() |
方法名 | 作用、含义 | 备注 |
---|---|---|
addConditionWaiter | 将调用await 方法的当前线程包装成node,添加到条件队列(ConditionQueue) | 从尾部添加 |
fullyRelease | 完全释放当前线程持有的锁 | 如当前线程重入好几次,则一并释放 |
isOnSyncQueue | 判断节点是否在同步队列中 | |
checkInterruptWhileWaiting | 被唤醒后检查是否被取消中断 | |
acquireQueued | 对加入同步队列排队的节点添加自旋检查 | |
unlinkCancelledWaiters | 清理已经拿到锁的节点 | |
reportInterruptAfterWait | 向上抛出节点状态中断异常,或者中断节点 |
3.1. addConditionWaiter
此方法会将调用await
方法的当前线程包装成node,添加到条件队列(ConditionQueue)中;如果条件队列(ConditionQueue)为空,则会进行初始化;
添加结束,会把已添加到条件队列(ConditionQueue)的node返回。
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter |
第一次初始化构建队列
初始化构建队列
第二次正常添加
将node添加到条件队列的尾部
3.2. fullyRelease
此方法的作用是完全释放当前线程持有的锁;
什么是完全释放?ReentrantLock是可重入的,一个线程可能获取了好几次锁。
state:AQS中使用该值表示同步状态,通过CAS修改保证原子性,同时也是线程重入的次数;
release
方法的逻辑在 ReentrantLock 和 AQS 中阐述过,此处不再赘述。
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease |
3.3. isOnSyncQueue
注意:此方法中的prev和next属性,是ReentrantLock中用于维护同步队列的属性
此方法作用是:判断节点是否在同步队列中;
返回false:当前节点不在同步队列中
说明当前节点没有被唤醒去竞争锁;需要将当前节点阻塞,直到其他线程调用
signal
唤醒。返回true:当前节点在同步队列中。
说明当前节点已经被唤醒,需要去竞争锁,执行业务。
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue |
为什么
node.prev == null
可以作为判断node不在同步队列的依据?站在ReentrantLock视角:
- 添加node到同步队列时,首先会先将node的前驱节点指向当前tail节点,即:
node.prev = pred
。 - 此时node的前驱节点是不为null的。
- 然后通过CAS设置tail节点(compareAndSetTail),即使设置失败,此时node的前驱节点依旧指向当前tail节点;
- 第三步CAS方式设置tail失败后,会进入enq方法,自旋直到成功加入同步队列为止。
因此,说明两个问题:
node的前驱节点(node.prev)为null,node肯定不在同步队列(SyncQueue / CLH)中。
node的前驱节点(node.prev)不为null,不能说明node就在同步队列中;
虽然prev不为null,但通过CAS设置tail失败,正处于自旋的过程中,并没有成功加入同步队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}- 添加node到同步队列时,首先会先将node的前驱节点指向当前tail节点,即:
为什么
node.next != null
可以作为判断node在同步队列的依据?和上面的问题类似;
站在ReentrantLock视角,当node添加到同步队列时,设置next指针都是通过CAS方式进行的(原子的);
所以,只要next不为空,那肯定是已经添加到同步队列中了。
如果两个if判断都没有得到结果,那就只能拿着node去同步队列中找,进入findNodeFromTail
方法。
3.3.1. findNodeFromTail
此方法的作用是:将已添加到条件队列(ConditionQueue)的node,去同步队列从后向前挨个比较,如果存在说明node在同步队列中。
返回false:当前节点不在同步队列中
说明当前节点没有被唤醒去竞争锁;需要将当前节点阻塞,直到其他线程调用
signal
唤醒。返回true:当前节点在同步队列中。
说明当前节点已经被唤醒,需要去竞争锁,执行业务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// java.util.concurrent.locks.AbstractQueuedSynchronizer#findNodeFromTail
private boolean findNodeFromTail(Node node) {
// t指针指向同步队列中的节点;此时指向同步队列的tail(尾节点)
Node t = tail;
// 从同步队列尾部向头部查找;
for (;;) {
if (t == node)
// 当前节点 和 同步队列中的某个节点相同
return true;
if (t == null)
// 已循环到head头节点,退出自旋
return false;
// t指针指向前一个节点
t = t.prev;
}
}
3.4. 被阻塞
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await() |
什么情况下会进入while循环?
node不在同步队列中,即
isOnSyncQueue
方法返回false;
说明当前节点没有被唤醒去竞争锁(还没有调用signal),需要将当前节点阻塞,直到其他线程调用signal
唤醒。什么情况下会结束while循环?
需要注意的是,结束循环的前提一定是,其他线程调用了
unPark
方法,将当前节点唤醒;什么情况下,会调用
unPark
方法?这块逻辑关联signal的处理与锁释放,此处先按下不表;
假设其他线程调用了LockSupport.unPark
方法,当前线程就是被唤醒了,以下两种情况,会结束while循环;
node中断
从上述代码的第7行继续执行;
被唤醒后,会去检查节点(线程)是否被中断;如果被中断(interruptMode != 0
),会结束while循环。node添加到同步队列后
从上述代码的第7行继续执行;
被唤醒后,检查节点(线程)没有被中断;再次调用isOnSyncQueue
方法;
待isOnSyncQueue
方法返回true,即当前节点已经添加到在同步队列中,结束循环。
剧透下,添加到同步队列,是在signal方法中实现;
await方法逻辑走到这里,暂告一段落;至此,调用await方法的线程,已经添加到了条件队列,并且已经被LockSupport#park阻塞,建议跳转到<signal逻辑>章节,阅读完再回来往下继续看;
3.5. 被唤醒
假设已经看完<signal逻辑>
在signal
方法逻辑中,会从条件队列(ConditionQueue)将第一个节点取出(指向first),然后以自旋方式,添加到同步队列尾部;
如果node节点(指同步队列中tail节点)的前驱节点状态不正常,Condition为了保证可靠性,会直接调用
LockSupport#unpark
唤醒。如果node节点(指同步队列中tail节点)的前驱节点状态正常,则由
AQS#unparkSuccessor
方法中(释放锁时)执行,按照顺序从 head 向 tail 挨个唤醒排队的节点。
假设此时同步队列中排队的node被唤醒,执行AQS#unparkSuccessor
方法,调用LockSupport#unpark
唤醒上面被阻塞的线程。
此时被阻塞的线程会从checkInterruptWhileWaiting
方法继续执行;检查节点(线程)没有被中断;继续下一次循环,再次调用isOnSyncQueue
方法;此时isOnSyncQueue`方法返回true,即当前节点在同步队列中,结束while循环。
3.6. acquireQueued
acquireQueued
方法的逻辑在 ReentrantLock 和 AQS 中阐述过,此处不再赘述。
此方法会对加入同步队列排队的节点添加自旋方式的竞争和检查;
自旋检查做两件事情:1.节点能否竞争到锁;2.判断当前节点(线程)是否需要阻塞;
直到节点(线程)获取锁成功或者不再需要获取(异常、中断);
该方法的返回值是boolean,代表自旋过程中,节点/线程 是否被中断过;true - 中断过,false - 没有中断过。
3.7. 后置处理
个人将acquireQueued
方法之后的逻辑,统称为后置处理;它们主要负责兜底和扫尾。
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await() |
第一个判断
1
2
3
4// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;acquireQueued
方法返回true,说明节点/线程在同步队列排队阻塞的过程中,被中断;interruptMode != THROW_IE
为true,代表节点在收到signal信号前没有被中断;将
interruptMode
标识设置为REINTERRUPT。第二个判断
1
2
3
4// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();逻辑走到这个if判断,说明不论node是否被中断,它都已经竞争到了锁;
如果node的后继节点不为空,调用
unlinkCancelledWaiters
方法进行清理;第三个判断
1
2if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);interruptMode
什么时候不为0?- 首先
interruptMode
初始化是0; - node阻塞被唤醒后检查状态,node收到signal信号前被中断
interruptMode = -1
;node收到signal信号后被中断interruptMode = 1
; - node在同步队列中竞争锁的过程中被中断,而且node收到signal信号前没有被中断,
interruptMode = 1
;
可以发现,只要node被中断过,不论是什么时机都会进入
reportInterruptAfterWait
方法;- 首先
4. signal 逻辑
await
、signal
方法的调用都在临界区中(被lock
和unlock
环绕);当前线程运行到Condition#await
方法,说明此时线程已经拿到了锁;
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal |
方法名 | 作用、含义 | 备注 |
---|---|---|
isHeldExclusively | 判断调用signal 方法的线程是否持有锁 | |
doSignal | 执行发送signal信号 |
4.1. isHeldExclusively
此方法判断调用signal
方法的线程是否持有锁;true:持有锁,false:未持有锁
1 | // java.util.concurrent.locks.ReentrantLock.Sync#isHeldExclusively |
4.2. doSignal
此方法接收的node是从条件队列(ConditionQueue)中获取的first节点(也就是条件队列中的第一个节点)。
注意:这里循环使用的是
do...while
,首先执行语句块,再判断表达式,至少执行一次;
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal |
方法名 | 作用、含义 | 备注 |
---|---|---|
transferForSignal | 将node从 条件队列(ConditionQueue)转移到 同步队列 true - 成功,flase - 失败(节点/线程 被取消中断) |
语句块
循环使用的是
do...while
,首先执行语句块,再判断表达式,至少执行一次;1
2
3
4
5...
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
...将firstWaiter指针指向 first的后继节点;
后继节点为空,说明条件队列中没有节点了,清理 lastWaiter;
不论firstWaiter是否为空,都会清理first的nextWaiter属性(此举意味着解除first节点与条件队列的关联);
表达式
1
!transferForSignal(first) && (first = firstWaiter) != null
transferForSignal
方法将从条件队列中取出的node,添加到同步队列中,添加成功返回true;加入同步队列成功,不会再执行语句块;
加入同步队列失败(说明节点被取消中断了,只有节点被中断才会返回false)
- 将firstWaiter指针指向的node赋值给first,循环再次进入语句块。
4.2.1. transferForSignal
此方法作用是将从条件队列中取出的node,添加到同步队列中,添加成功返回 true。
参数列表中的node就是从条件队列中按顺序取出的node;
返回true:node添加到同步队列成功。
返回false:node被取消、中断。
enq
方法的逻辑在 ReentrantLock 和 AQS 中阐述过;
此处简述:enq
方法通过自旋方式,采用尾插法,将node添加到同步队列;
调用成功的话,添加进去的node节点是同步队列的tail尾节点;enq
方法返回的是,同步队列中tail节点的前驱节点(尾节点的前驱节点)。
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal |
为何添加到同步队列后会存在
LockSupport#unpark
操作?按照理想状态,当节点从 条件队列(ConditionQueue)转移到同步队列后,unPark操作应该由
AQS#unparkSuccessor
方法中(释放锁时)执行,按照顺序从 head 向 tail 挨个唤醒排队的节点;1
2if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);首先明确下,p对象代表的是 同步队列tail节点的前驱节点。
观察上面代码中的两个判断:
ws > 0
代表前驱节点的状态是CANCELLED,即非正常状态;
!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
尝试用CAS方式,将前驱节点的状态更新为SIGNAL(-1);更新失败,返回false;
这两个条件可以判定 同步队列tail节点的前驱节点 状态是不正常的;
虽然AQS中有剔除非正常状态节点的机制,即释放锁,调用
unparkSuccessor
方法唤醒下一个节点时,如果head头节点的后驱节点状态异常,会从tail尾节点向前扫描整个链表,剔除非正常节点;
Condition为了保证tail节点一定会被唤醒,提高可靠性,这里直接调用LockSupport#unpark
唤醒。
signal逻辑到这里结束,如果是从<await逻辑#被阻塞>章节跳转而来,可以回头从<await#被唤醒>章节继续往下看,串联整个逻辑;
5. 何时调用unPark?
什么时候会调用LockSupport.unpark
方法,唤醒在await
方法中阻塞的线程?这是<被阻塞>章节中,留下的问题;
在signal
方法逻辑中,会从条件队列(ConditionQueue)将第一个节点取出(指向first),然后以自旋方式,添加到同步队列尾部;
如果node节点(指同步队列中tail节点)的前驱节点状态不正常,Condition为了保证可靠性,会直接调用
LockSupport#unpark
唤醒。虽然AQS中有剔除同步队列中非正常状态节点的机制,即释放锁,调用
unparkSuccessor
方法唤醒下一个节点时,如果head头节点的后驱节点状态异常,会从tail尾节点向前扫描整个链表,剔除非正常节点;但Condition为了保证同步队列的tail节点一定会被唤醒,提高可靠性,就直接调用
LockSupport#unpark
唤醒。如果node节点(指同步队列中tail节点)的前驱节点状态正常,则由
AQS#unparkSuccessor
方法中(释放锁时)执行,按照顺序从 head 向 tail 挨个唤醒排队的节点。
6. 测试示例
1 |
|
注意:上面示例代码中,await
、signal
方法的调用都在临界区中(被lock
和unlock
环绕)。