Condition 和 AQS
2025-01-22 08:19:30    4.9k 字   
This post is also available in English and alternative languages.

Condition 被用来替代传统的Object的 wait()notify(),实现线程间的协作;

相比使用 Object 的 wait()notify(),使用 Conditionawait()signal() 这种方式实现线程间协作更加安全和高效。


1. 前言

考虑文章排版,测试示例放在了文章的末尾;建议先运行测试示例,了解代码运行流程再看源码会更深刻;

如不了解 ReentrantLock,建议移步 《ReentrantLock 和 AQS》 ,了解完 ReentrantLock 再看 Condition,否则可能会一头雾水。


2. Condition

2.1. 与AQS的关系

AQS类结构

Java中的 Condition 的实现是 ConditionObject,它是 AbstractQueuedSynchronizer 中的内部类;


2.2. ConditionObject成员变量

变量名含义备注
transient Node firstWaiter;条件队列(ConditionQueue)第一个节点用于构建条件队列
下面用 first节点 代称
transient Node lastWaiter;条件队列(ConditionQueue)最后一个节点用于构建条件队列
下面用 last节点 代称
static final int THROW_IE = -1;中断标识收到signal信号前被中断
static final int REINTERRUPT = 1;中断标识收到signal信号后被中断

2.3. 构造方法

1
2
3
4
5
6
7
8
9
// java.util.concurrent.locks.ReentrantLock#newCondition
public Condition newCondition() {
return sync.newCondition();
}

// java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() {
return new ConditionObject();
}

3. await 逻辑

awaitsignal方法的调用都在临界区中(被lockunlock环绕);当前线程运行到Condition#await方法,说明此时线程已经拿到了锁;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()

public final void await() throws InterruptedException {
// 当前线程如果已经中断,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 向等待队列中添加一个新的等待者
Node node = addConditionWaiter();
// 完全释放当前线程持有的锁,保存state
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果节点不在同步队列中,意味着还没有调用signal方法,应该阻塞
while (!isOnSyncQueue(node)) {
// 阻塞
LockSupport.park(this);
// 被唤醒,即调用了unPark方法,检查是否被取消中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 竞争锁,并做后置处理
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
方法名作用、含义备注
addConditionWaiter将调用await方法的当前线程包装成node,添加到条件队列(ConditionQueue)从尾部添加
fullyRelease完全释放当前线程持有的锁如当前线程重入好几次,则一并释放
isOnSyncQueue判断节点是否在同步队列中
checkInterruptWhileWaiting被唤醒后检查是否被取消中断
acquireQueued对加入同步队列排队的节点添加自旋检查
unlinkCancelledWaiters清理已经拿到锁的节点
reportInterruptAfterWait向上抛出节点状态中断异常,或者中断节点

3.1. addConditionWaiter

此方法会将调用await方法的当前线程包装成node,添加到条件队列(ConditionQueue)中;如果条件队列(ConditionQueue)为空,则会进行初始化;

添加结束,会把已添加到条件队列(ConditionQueue)的node返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter

private Node addConditionWaiter() {
// 第一次时为null
Node t = lastWaiter;

// If lastWaiter is cancelled, clean out. 跳过
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 当前线程包装成node节点,并且将node的waitStatus设置为(CONDITION,-2)
Node node = new Node(Thread.currentThread(), Node.CONDITION);

// 第一次时为null
if (t == null)
// 将队列的 first节点 指向 新构建的node节点
firstWaiter = node;
else
// 说明队列不为空
// 尾插法,将当前节点 指向 上一个 last节点后继节点,即:添加到条件队列尾部。
t.nextWaiter = node;

/*
第一次:的 first 和 last 指向同一个node节点。
不是第一次:将last节点指针,指向新添加的node节点。
*/
lastWaiter = node;
return node;
}
  • 第一次初始化构建队列

    初始化构建队列

    初始化构建队列

  • 第二次正常添加

    将node添加到条件队列的尾部

    将node添加到条件队列的尾部

3.2. fullyRelease

此方法的作用是完全释放当前线程持有的锁;

什么是完全释放?ReentrantLock是可重入的,一个线程可能获取了好几次锁。
state:AQS中使用该值表示同步状态,通过CAS修改保证原子性,同时也是线程重入的次数;

release方法的逻辑在 ReentrantLock 和 AQS 中阐述过,此处不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease

final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取AQS中维护的state值
int savedState = getState();
// 当前线程释放锁
if (release(savedState)) {
// 释放成功,将 state 值返回
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 当前线程释放锁失败,抛出异常并设置 waitStatus 为 1;
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

3.3. isOnSyncQueue

注意:此方法中的prevnext属性,是ReentrantLock中用于维护同步队列的属性

此方法作用是:判断节点是否在同步队列中;

  • 返回false:当前节点不在同步队列中

    说明当前节点没有被唤醒去竞争锁;需要将当前节点阻塞,直到其他线程调用signal唤醒。

  • 返回true:当前节点在同步队列中。

    说明当前节点已经被唤醒,需要去竞争锁,执行业务。

1
2
3
4
5
6
7
8
9
10
11
// java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue

final boolean isOnSyncQueue(Node node) {
// node 的 waitStatus 是CONDITION;前驱节点为null
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
// 说明当前节点包含 后继节点,说明已经在同步队列中
return true;
return findNodeFromTail(node);
}

  • 为什么node.prev == null可以作为判断node不在同步队列的依据?

    站在ReentrantLock视角:

    1. 添加node到同步队列时,首先会先将node的前驱节点指向当前tail节点,即:node.prev = pred
    2. 此时node的前驱节点是不为null的。
    3. 然后通过CAS设置tail节点(compareAndSetTail),即使设置失败,此时node的前驱节点依旧指向当前tail节点;
    4. 第三步CAS方式设置tail失败后,会进入enq方法,自旋直到成功加入同步队列为止。

    因此,说明两个问题:

    1. node的前驱节点(node.prev)为null,node肯定不在同步队列(SyncQueue / CLH)中

    2. node的前驱节点(node.prev)不为null,不能说明node就在同步队列中

    虽然prev不为null,但通过CAS设置tail失败,正处于自旋的过程中,并没有成功加入同步队列。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter

    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    enq(node);
    return node;
    }

  • 为什么node.next != null可以作为判断node在同步队列的依据?

    和上面的问题类似;
    站在ReentrantLock视角,当node添加到同步队列时,设置next指针都是通过CAS方式进行的(原子的);
    所以,只要next不为空,那肯定是已经添加到同步队列中了。


如果两个if判断都没有得到结果,那就只能拿着node去同步队列中找,进入findNodeFromTail方法。


3.3.1. findNodeFromTail

此方法的作用是:将已添加到条件队列(ConditionQueue)的node,去同步队列从后向前挨个比较,如果存在说明node在同步队列中。

  • 返回false:当前节点不在同步队列中

    说明当前节点没有被唤醒去竞争锁;需要将当前节点阻塞,直到其他线程调用signal唤醒。

  • 返回true:当前节点在同步队列中。

    说明当前节点已经被唤醒,需要去竞争锁,执行业务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#findNodeFromTail

    private boolean findNodeFromTail(Node node) {
    // t指针指向同步队列中的节点;此时指向同步队列的tail(尾节点)
    Node t = tail;
    // 从同步队列尾部向头部查找;
    for (;;) {
    if (t == node)
    // 当前节点 和 同步队列中的某个节点相同
    return true;
    if (t == null)
    // 已循环到head头节点,退出自旋
    return false;
    // t指针指向前一个节点
    t = t.prev;
    }
    }

3.4. 被阻塞

1
2
3
4
5
6
7
8
9
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()

// node不在同步队列中,返回false,进入while循环
while (!isOnSyncQueue(node)) {
// 阻塞线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
  1. 什么情况下会进入while循环?

    node不在同步队列中,即isOnSyncQueue方法返回false;
    说明当前节点没有被唤醒去竞争锁(还没有调用signal),需要将当前节点阻塞,直到其他线程调用signal唤醒。

  2. 什么情况下会结束while循环?

    需要注意的是,结束循环的前提一定是,其他线程调用了unPark方法,将当前节点唤醒;

    什么情况下,会调用unPark方法?这块逻辑关联signal的处理与锁释放,此处先按下不表

假设其他线程调用了LockSupport.unPark方法,当前线程就是被唤醒了,以下两种情况,会结束while循环;

  1. node中断

    从上述代码的第7行继续执行;
    被唤醒后,会去检查节点(线程)是否被中断;如果被中断(interruptMode != 0),会结束while循环。

  2. node添加到同步队列后

    从上述代码的第7行继续执行;
    被唤醒后,检查节点(线程)没有被中断;再次调用isOnSyncQueue方法;
    isOnSyncQueue方法返回true,即当前节点已经添加到在同步队列中,结束循环。

剧透下,添加到同步队列,是在signal方法中实现;

await方法逻辑走到这里,暂告一段落;至此,调用await方法的线程,已经添加到了条件队列,并且已经被LockSupport#park阻塞,建议跳转到<signal逻辑>章节,阅读完再回来往下继续看


3.5. 被唤醒

假设已经看完<signal逻辑>

signal方法逻辑中,会从条件队列(ConditionQueue)将第一个节点取出(指向first),然后以自旋方式,添加到同步队列尾部;

  • 如果node节点(指同步队列中tail节点)的前驱节点状态不正常,Condition为了保证可靠性,会直接调用LockSupport#unpark唤醒。

  • 如果node节点(指同步队列中tail节点)的前驱节点状态正常,则由AQS#unparkSuccessor方法中(释放锁时)执行,按照顺序从 head 向 tail 挨个唤醒排队的节点。

假设此时同步队列中排队的node被唤醒,执行AQS#unparkSuccessor方法,调用LockSupport#unpark唤醒上面被阻塞的线程。

此时被阻塞的线程会从checkInterruptWhileWaiting方法继续执行;检查节点(线程)没有被中断;继续下一次循环,再次调用isOnSyncQueue方法;此时isOnSyncQueue`方法返回true,即当前节点在同步队列中,结束while循环。


3.6. acquireQueued

acquireQueued方法的逻辑在 ReentrantLock 和 AQS 中阐述过,此处不再赘述。

此方法会对加入同步队列排队的节点添加自旋方式的竞争和检查;
自旋检查做两件事情:1.节点能否竞争到锁;2.判断当前节点(线程)是否需要阻塞;
直到节点(线程)获取锁成功或者不再需要获取(异常、中断);

该方法的返回值是boolean,代表自旋过程中,节点/线程 是否被中断过;true - 中断过,false - 没有中断过。


3.7. 后置处理

个人将acquireQueued方法之后的逻辑,统称为后置处理;它们主要负责兜底和扫尾。

1
2
3
4
5
6
7
8
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);

  1. 第一个判断

    1
    2
    3
    4
    // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;

    acquireQueued方法返回true,说明节点/线程在同步队列排队阻塞的过程中,被中断;

    interruptMode != THROW_IE为true,代表节点在收到signal信号前没有被中断;

    interruptMode标识设置为REINTERRUPT。

  2. 第二个判断

    1
    2
    3
    4
    // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()

    if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();

    逻辑走到这个if判断,说明不论node是否被中断,它都已经竞争到了锁;

    如果node的后继节点不为空,调用unlinkCancelledWaiters方法进行清理;

  3. 第三个判断

    1
    2
    if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

    interruptMode什么时候不为0?

    • 首先interruptMode初始化是0;
    • node阻塞被唤醒后检查状态,node收到signal信号前被中断interruptMode = -1;node收到signal信号后被中断interruptMode = 1
    • node在同步队列中竞争锁的过程中被中断,而且node收到signal信号前没有被中断,interruptMode = 1

    可以发现,只要node被中断过,不论是什么时机都会进入reportInterruptAfterWait方法;


4. signal 逻辑

awaitsignal方法的调用都在临界区中(被lockunlock环绕);当前线程运行到Condition#await方法,说明此时线程已经拿到了锁;

1
2
3
4
5
6
7
8
9
10
11
12
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal

public final void signal() {
// 当前线程如果没有获得锁,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取 条件队列(ConditionQueue)中的 first节点
Node first = firstWaiter;
if (first != null)
// 如果 条件队列(ConditionQueue)中的 first节点不为空,调用doSignal方法
doSignal(first);
}
方法名作用、含义备注
isHeldExclusively判断调用signal方法的线程是否持有锁
doSignal执行发送signal信号

4.1. isHeldExclusively

此方法判断调用signal方法的线程是否持有锁;true:持有锁,false:未持有锁

1
2
3
4
5
6
// java.util.concurrent.locks.ReentrantLock.Sync#isHeldExclusively

protected final boolean isHeldExclusively() {
// 当前线程是否持有锁
return getExclusiveOwnerThread() == Thread.currentThread();
}

4.2. doSignal

此方法接收的node是从条件队列(ConditionQueue)中获取的first节点(也就是条件队列中的第一个节点)。

注意:这里循环使用的是do...while,首先执行语句块,再判断表达式,至少执行一次;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal

private void doSignal(Node first) {
do {
/*
1.将 firstWaiter 指针指向当前first节点的后继节点
2.firstWaiter 为空;说明条件队列中没有节点了,清理lastWaiter
*/
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 此时 firstWaiter 指针指向了后一个节点;因此将 first.nextWaiter 清空。
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
方法名作用、含义备注
transferForSignal将node从 条件队列(ConditionQueue)转移到 同步队列
true - 成功,flase - 失败(节点/线程 被取消中断)
  • 语句块

    循环使用的是do...while,首先执行语句块,再判断表达式,至少执行一次;

    1
    2
    3
    4
    5
    ...
    if ( (firstWaiter = first.nextWaiter) == null)
    lastWaiter = null;
    first.nextWaiter = null;
    ...
    • 将firstWaiter指针指向 first的后继节点;

    • 后继节点为空,说明条件队列中没有节点了,清理 lastWaiter;

    • 不论firstWaiter是否为空,都会清理first的nextWaiter属性(此举意味着解除first节点与条件队列的关联);

    从条件队列中取出第一个节点

  • 表达式

    1
    !transferForSignal(first) && (first = firstWaiter) != null

    transferForSignal方法将从条件队列中取出的node,添加到同步队列中,添加成功返回true;

    • 加入同步队列成功,不会再执行语句块;

    • 加入同步队列失败(说明节点被取消中断了,只有节点被中断才会返回false)

      • 将firstWaiter指针指向的node赋值给first,循环再次进入语句块。
    将节点从条件队列取出添加到同步队列

4.2.1. transferForSignal

此方法作用是将从条件队列中取出的node,添加到同步队列中,添加成功返回 true。

参数列表中的node就是从条件队列中按顺序取出的node;

  • 返回true:node添加到同步队列成功。

  • 返回false:node被取消、中断。

enq方法的逻辑在 ReentrantLock 和 AQS 中阐述过;
此处简述:enq方法通过自旋方式,采用尾插法,将node添加到同步队列;
调用成功的话,添加进去的node节点是同步队列的tail尾节点;enq方法返回的是,同步队列中tail节点的前驱节点(尾节点的前驱节点)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal

final boolean transferForSignal(Node node) {
/*
尝试用CAS形式,修改 node 的 waitStatus 为0;
修改失败,该节点已经被中断;修改成功,该节点正常。
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
通过自旋方式,采用尾插法,将node添加到 同步队列
调用成功的话,此时node节点为 同步队列的tail尾节点。
enq返回的是,同步队列中tail节点的前驱节点(尾节点的前驱节点)。
*/
Node p = enq(node);
// 获取前驱节点的waitStatus
int ws = p.waitStatus;
// 前驱节点的状态是CANCELLED(取消) 或者 修改前驱节点的waitStatus值为SIGNAL(-1)失败,调用unPark方法
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

  • 为何添加到同步队列后会存在LockSupport#unpark操作?

    按照理想状态,当节点从 条件队列(ConditionQueue)转移到同步队列后,unPark操作应该由AQS#unparkSuccessor方法中(释放锁时)执行,按照顺序从 head 向 tail 挨个唤醒排队的节点;

    1
    2
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);

    首先明确下,p对象代表的是 同步队列tail节点的前驱节点。

    观察上面代码中的两个判断:

    1. ws > 0

      代表前驱节点的状态是CANCELLED,即非正常状态;

    2. !compareAndSetWaitStatus(p, ws, Node.SIGNAL)

      尝试用CAS方式,将前驱节点的状态更新为SIGNAL(-1);更新失败,返回false;

    这两个条件可以判定 同步队列tail节点的前驱节点 状态是不正常的;

    虽然AQS中有剔除非正常状态节点的机制,即释放锁,调用unparkSuccessor方法唤醒下一个节点时,如果head头节点的后驱节点状态异常,会从tail尾节点向前扫描整个链表,剔除非正常节点;
    Condition为了保证tail节点一定会被唤醒,提高可靠性,这里直接调用LockSupport#unpark唤醒。

signal逻辑到这里结束,如果是从<await逻辑#被阻塞>章节跳转而来,可以回头从<await#被唤醒>章节继续往下看,串联整个逻辑


5. 何时调用unPark?

什么时候会调用LockSupport.unpark方法,唤醒在await方法中阻塞的线程?这是<被阻塞>章节中,留下的问题;

signal方法逻辑中,会从条件队列(ConditionQueue)将第一个节点取出(指向first),然后以自旋方式,添加到同步队列尾部;

  • 如果node节点(指同步队列中tail节点)的前驱节点状态不正常,Condition为了保证可靠性,会直接调用LockSupport#unpark唤醒。

    虽然AQS中有剔除同步队列中非正常状态节点的机制,即释放锁,调用unparkSuccessor方法唤醒下一个节点时,如果head头节点的后驱节点状态异常,会从tail尾节点向前扫描整个链表,剔除非正常节点;

    但Condition为了保证同步队列的tail节点一定会被唤醒,提高可靠性,就直接调用LockSupport#unpark唤醒。

  • 如果node节点(指同步队列中tail节点)的前驱节点状态正常,则由AQS#unparkSuccessor方法中(释放锁时)执行,按照顺序从 head 向 tail 挨个唤醒排队的节点。


6. 测试示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Slf4j
public class ConditionWaitExample implements Runnable {

private final Lock lock;
private final Condition condition;

public ConditionWaitExample(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}

@Override
public void run() {
log.info("condition#wait - start");
lock.lock();
try {
condition.await();
log.info("condition#wait - end");
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
lock.unlock();
}
}
}

@Slf4j
public class ConditionSignalExample implements Runnable {

private final Lock lock;
private final Condition condition;

public ConditionSignalExample(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}

@Override
public void run() {
log.info("condition#signal - start");
lock.lock();
try {
condition.signal();
log.info("condition#signal - end");
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
lock.unlock();
}
}
}

@Slf4j
public class ConditionExampleApp {
public static void main(String[] args) throws InterruptedException {

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

ConditionWaitExample conditionWaitExample = new ConditionWaitExample(lock, condition);
ConditionSignalExample conditionSignalExample = new ConditionSignalExample(lock, condition);

new Thread(conditionWaitExample).start();

//确保 conditionWaitExample 先执行
TimeUnit.SECONDS.sleep(3);

new Thread(conditionSignalExample).start();
}
}

注意:上面示例代码中,awaitsignal方法的调用都在临界区中(被lockunlock环绕)。