Java 中大部分锁都是基于 AQS 实现的,通过 ReentranLock 管中窥豹。
从 ReentrantLock 的角度看,AQS 中使用 Node 维护了一个双向链表(CLH),用来存储未竞争到锁处于阻塞状态的线程;
1. ReentrantLock
ReentranLock
是一种支持重入的排他锁,支持公平与非公平两种模式;
排他锁
应用在修改数据的场景;
同一时间只能有一个线程持有锁;确保不会同时对同一资源进行多重修改;共享锁
应用于只读数据的场景;
不能修改数据;同一时间可以有多个线程持有锁;
1.1. 与AQS的关系
先不看代码,先看下 ReentrantLock 与 AQS 的关系;
向上
ReentrantLock 实现了Lock
接口;向下
ReentrantLock 内部有一个抽象静态内部类ReentrantLock.Sync
;它继承自 AQS;ReentrantLock.Sync
是 ReentrantLock 锁的同步控制基础;其内部使用AQS维护锁的状态、持有次数。ReentrantLock.Sync
有两个实现(静态内部类):ReentrantLock.NonfairSync
(非公平锁)和ReentrantLock.FairSync
(公平锁)。
1.2. 构造方法
ReentrantLock 有两个构造方法,一个是带参数的,另一个是不带参数的。
不带参数的构造方法;默认返回非公平锁。
1
2
3public ReentrantLock() {
sync = new NonfairSync();
}带参数的构造方法;支持传boolean参数;(true - 公平锁,false - 非公平锁)。
1
2
3public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从上面 ReentrantLock 的结构信息(或者快速翻阅下源码)可以发现, ReentrantLock 的底层是AQS实现的;AQS提供了原子式管理同步状态、阻塞/唤醒线程、队列模型等功能。
2. AQS速览
只关心和本篇内容相关的参数
2.1. AQS成员变量
变量名 | 含义 | 备注 |
---|---|---|
transient volatile Node head; | 同步队列(SyncQueue / CLH)的头(节点) | 用于构建同步队列 |
transient volatile Node tail; | 同步队列(SyncQueue / CLH)的尾(节点) | 用于构建同步队列 |
volatile int state; | 同步状态 | AQS中使用该值表示同步状态; 通过CAS修改保证原子性。 |
2.2. Node
AbstractQueuedSynchronizer.Node
是一个静态内部类,是AQS内部维护同步队列(SyncQueue / CLH)的基本数据结构。
变量名 | 含义 | 备注 |
---|---|---|
volatile int waitStatus; | 当前节点在同步队列(SyncQueue / CLH)中的状态 | |
volatile Thread thread; | 包装在节点内部的线程 | |
volatile Node prev; | 前驱指针(指向前驱节点) | 用于构建同步队列 |
volatile Node next; | 后继指针(指向后继节点) | 用于构建同步队列 |
2.3. waitStatus枚举值
waitStatus
表示当前Node节点在双向同步队列(SyncQueue / CLH)中的状态;
字面量 | 值 | 含义 |
---|---|---|
0 | Node初始化时的默认值 | |
CANCELLED | 1 | 当前节点(线程),获取锁的请求被取消、中断。 |
SIGNAL | -1 | 当前节点(线程),处于等待状态;等待上一个持有锁的节点(线程)释放锁。 |
3. 加锁逻辑
ReentrantLock.FairSync
(公平锁)的实现代码1
2
3
4
5// java.util.concurrent.locks.ReentrantLock.FairSync
final void lock() {
acquire(1);
}ReentrantLock.NonfairSync
(非公平锁)的实现代码1
2
3
4
5
6
7
8// java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
观察上面两种锁加锁的实现代码,ReentrantLock.NonfairSync
(非公平锁)上来会直接试图抢占锁,即:通过CAS设置变量state(compareAndSetState方法);
如果抢占成功(获取锁成功),将当前线程设置为独占线程,然后就可以执行相应的业务处理。
如果抢占失败(获取锁失败),和ReentrantLock.FairSync
(公平锁)一样,执行AQS#acquire
方法。
3.1. acquire方法
如上面所述,ReentrantLock.NonfairSync
(非公平锁)抢占失败后,和ReentrantLock.FairSync
(公平锁)一样,都会执行AQS#acquire
方法;
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer |
方法名 | 作用、含义 | 备注 |
---|---|---|
tryAcquire | 尝试以独占方式获取锁; 成功返回true,失败返回false | AQS中定义,并做了简单实现ReentrantLock.NonfairSync (非公平锁)和ReentrantLock.FairSync (公平锁)重写了其中逻辑。 |
addWaiter | 将线程包装成Node,放入同步队列中。 | 在AQS中定义并实现。 如果同步队列为空,会在其中进行初始化。 |
acquireQueued | 对在同步队列中排队的线程进行获取锁的操作。 | |
selfInterrupt | 中断线程 |
3.2. tryAcquire
此方法的作用是尝试以独占方式获取锁;
该方法在AQS中定义,并做了简单的实现,具体获取锁的实现方法,由各自锁(子类)重写;ReentrantLock.NonfairSync
(非公平锁)和ReentrantLock.FairSync
(公平锁)都重写了其中逻辑。
ReentrantLock.FairSync
(公平锁)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// java.util.concurrent.locks.ReentrantLock.FairSync
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS中维护的state状态
int c = getState();
// state=0,当前没有线程持有锁,可以竞争
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
// 竞争锁成功
return true;
}
}
// 检查独占线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 重入
return true;
}
return false;
}ReentrantLock.NonfairSync
(非公平锁)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29// java.util.concurrent.locks.ReentrantLock.NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// java.util.concurrent.locks.ReentrantLock.Sync
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS中维护的state状态
int c = getState();
// state=0,当前没有线程持有锁,可以竞争
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 检查独占线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 重入
return true;
}
return false;
}
眯一下眼睛,观察上面两种锁tryAcquire
的实现源码,大部分的逻辑是相同的;
- 获取当前线程、以及当前state的状态。
- state=0,说明可以竞争锁,执行相关竞争锁的逻辑;
- state != 0,检查独占线程是否是当前线程,如果是,那就重入;
3.2.1. 差异
唯一的区别是在第二步,即state = 0时,ReentrantLock.FairSync
(公平锁)比ReentrantLock.NonfairSync
(非公平锁)多了一个!hasQueuedPredecessors()
判断条件,即:如果队列中还有阻塞等待的线程,那当前线程就会竞争失败;
此方法放回boolean值,返回true说明获得了锁,返回false获取锁失败。
返回true代表着获取锁成功,后续的addWaiter、acquireQueued方法也就没有必要执行了;
只有返回false,获取锁失败,才需要继续往后执行addWaiter、acquireQueued方法。
3.3. addWaiter
执行acquire
方法,会通过tryAcquire
方法尝试获取锁;这种情况下,如果获取失败,则会调用addWaiter
方法,将当前线程包装成Node,添加到同步队列(SyncQueue / CLH)的尾部。
需要注意的是,Node节点加入同步队列(SyncQueue / CLH)的尾部并不是原子操作,因此通过CAS方式插入;其中AQS#enq
方法中,会对同步队列(SyncQueue / CLH)进行初始化。
注意:同步队列(SyncQueue / CLH)中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer |
调用addWaiter
方法,传参是包装了当前线程的Node节点;
有两种场景会进入enq
方法:
同步队列尚未构建时
同步队列尚未构建,tail 和 pred 都是null,因此会进入
enq
方法;enq
方法中会进行自旋,只有成功初始化同步队列,并将当前节点加入队列后,才会停止自旋;enq
方法的第一次自旋通过
compareAndSetHead
方法,以CAS方式设置head头节点;然后将尾节点也指向头节点;此时头尾节点指向同一个Node。注意:调用
compareAndSetHead
方法时,传参是一个没有设置参数的Node。enq
方法的第二次自旋此时头尾节点不为空,走到else逻辑中;
将当前节点(包装了当前线程的Node节点)的前驱节点设置为tail节点;
然后通过compareAndSetTail(t, node)
方法,以CAS方式重新设置tail尾节点,同时将t的后继节点指向当前节点;
成功设置以后,退出自旋;
假设,这时又有一个线程进入了
addWaiter
方法;
因为同步队列已经初始化过了,pred(tail)不为空,所以不会再进入enq方法;
而是走addWaiter
方法中的正常逻辑,添加到同步队列的尾部。在addWaiter方法中,CAS设置tail节点失败(compareAndSetTail)时
多个节点(线程)同时通过CAS设置tail节点,肯定会有节点(线程)失败;
失败的节点(线程),也会进入
enq
方法,进行自旋,直到成功加入同步队列(SyncQueue / CLH)为止。
调用addWaiter
成功后,会把当前线程包装的Node返回;继续调用acquireQueued方法。
3.4. acquireQueued
注意:此时node已经被加入到同步队列(SyncQueue / CLH)的尾部
当一个线程获取锁失败了,会被放入同步队列(SyncQueue / CLH)中(从尾部添加),acquireQueued
会对加入同步队列(SyncQueue / CLH)排队的节点添加自旋检查;
自旋检查做两件事情:1.节点能否竞争到锁;2.判断当前节点(线程)是否需要阻塞;
直到节点(线程)获取锁成功或者不再需要获取(异常、中断);
1 | //java.util.concurrent.locks.AbstractQueuedSynchronizer |
自旋检查时,如果竞争锁的节点的前驱节点不是头节点,那就没有必要去竞争锁(FIFO,从头节点方向挨个执行,不能跨越);
3.4.1. shouldParkAfterFailedAcquire
上面说过,acquireQueued
会对加入同步队列排队的节点添加自旋检查;自选检查其中一步操作就是:判断当前节点(线程)是否需要阻塞;
此方法就是用来检查节点状态,判断是否需要阻塞的。
1 | // pred:当前节点的前驱节点; |
剔除取消状态的node:基于当前tail节点,向前查找有效的节点,并重新链接prev(前驱指针);
注意代码中while的条件是:
pred.waitStatus > 0
;向前查找到有效节点时就会结束循环;如下图,node3节点的状态是异常的,node4会将prev向前链接到状态正常的node2;因为node2状态是正常的,至此while循环结束。
如果node2状态也是异常的,那就会继续下一个while循环;node4会将prev链接到node1。
设置前驱节点的waitStatus(compareAndSetWaitStatus)
结合上层
acquireQueued
方法中的自旋,多次调用shouldParkAfterFailedAcquire
方法。只有前驱节点的
waitStatus
为SIGNAL(-1)时才会结束自旋,返回true,然后才能进入parkAndCheckInterrupt
方法,执行线程中断的操作。自旋至少需要循环两次,才会返回true;
3.4.2. parkAndCheckInterrupt
1 | private final boolean parkAndCheckInterrupt() { |
LockSupport.park
:阻塞当前线程Thread.interrupted()
:判断线程的中断状态。运行到
LockSupport.park(this);
这一行,当前节点(线程)会被阻塞;后面被unPark
方法唤醒时,才继续往下执行。
此方法返回boolean值,代表的是当前线程的中断标识,true:线程中断、false:线程未中断。
在当前节点(线程)阻塞过程中,人为(代码业务处理)调用了
Thread.currentThread().interrupt()
方法。
3.5. selfInterrupt
此方法很简单,只是设置当前节点(线程)的中断标识。
1 | static void selfInterrupt() { |
此方法在整个流程处理中的作用是什么?
当前节点(线程)在同步队列中阻塞等待时,人为(代码业务处理)调用了Thread.currentThread().interrupt()
方法;说明该线程被中断(停止)了,acquireQueued`方法会记录下来,然后交给此方法处理。
另一个角度说明,在等待阻塞的过程中,即使人为(代码业务处理)调用了线程的Thread.currentThread().interrupt()
方法,被唤醒后还是会不断地去尝试获取锁,直到抢到锁为止;
也就是说,在整个竞争锁的流程(同步队列阻塞等待)中,不响应中断,只记录中断(标识);等抢到锁以后再判断是否被中断过,如果被中断过,AQS会去调用线程的interrupt
方法进行中断。
4. 解锁逻辑
ReentrantLock 中ReentrantLock.NonfairSync
(非公平锁)和ReentrantLock.FairSync
(公平锁)使用同一个解锁方法;
1 | //java.util.concurrent.locks.ReentrantLock |
4.1. release
上面说过,抽象静态内部类ReentrantLock.Sync,
它继承自AQS;这里看似调用ReentrantLock.Sync
的方法,但实际调用的父类AQS的release方法。
1 | //java.util.concurrent.locks.AbstractQueuedSynchronizer |
方法名 | 作用、含义 |
---|---|
tryRelease | AQS中定义并简单实现;ReentrantLock.Sync 中重写 |
unparkSuccessor | 唤醒后继节点 |
4.2. tryRelease
tryRelease
方法在AQS中定义,并做了简单实现;ReentrantLock.Sync
类进行了重写;
ReentrantLock.Sync
是ReentrantLock.NonfairSync
(非公平锁)和ReentrantLock.FairSync
(公平锁)的父类。
1 | // java.util.concurrent.locks.ReentrantLock.Sync |
需要注意的是,只有当state
等于0时,free才会是true(完全释放);如果是重入锁,线程重了多次,但只释放了一次,是不行的。
4.3. unparkSuccessor
这个方法名做到了"见名知意",唤醒继任者;参数列表中的node是head头节点;
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer |
如下图,当head头节点的后继节点为空、或者状态异常时,会触发一个机制;
这个机制,会从tail尾节点向前挨个查找、剔除异常状态的节点,直到循环完整个同步队列,找到有效的、离head最近的节点;
下图中,从tail尾节点开始扫描,全部扫描完,head头节点有效的后继节点是第三次循环中关联的node2节点。
5. 何时从同步队列剔除异常状态的Node
以 ReentrantLock 锁视角去看,在两个地方会将异常状态的Node从同步队列中剔除,重新连接prev或next指针。
这里说的,异常状态的node指的是:
- node为null
- node的waitStatus状态为
CANCELLED
其他认为是正常的node节点。
- 添加节点(线程)到同步队列后,节点自旋检查时;
源码位置:AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
触发:tail尾节点的前驱节点状态异常;
执行动作:从tail尾节点向前扫描查找,只要找到正常节点时就会结束扫描;
- 释放锁,调用
unparkSuccessor
方法唤醒下一个节点时;
源码位置:AbstractQueuedSynchronizer#unparkSuccessor
触发:head头节点的后驱节点状态异常;
执行动作:从tail尾节点向前扫描查找,扫描整个同步队列;
6. 为何使用双向链表
持有锁的线程在释放锁,唤醒下一个线程时,是从前向后(head -> tail)逐个唤醒的。
剔除异常状态的node时,是从后向前(head <- tail)剔除的。
如果上面两个场景同时从同一方向一起开始工作,在对同一节点(线程)进行状态更新、判断时会存在冲突的问题;
使用双向链表就是为了避免出现上述问题,减少二者之间的冲突;
7. 整个流程中,非公平锁竞争了几次锁
假设:当前锁被某线程持有,正在运行,一直没有释放,采用非公平锁方式。
结论:一共竞争了3次锁。
第一次:lock 方法
调用
lock
方法时,上来就会无理由的竞争一次锁;1
2
3
4
5
6
7
8// java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}第二次:tryAcquire 方法
第一步竞争失败后,进入
AQS#acquire
方法中,其中会调用tryAcquire
方法;非公平锁的实现中,又会竞争一次锁。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
...
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
...
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
...
return true;
}
return false;
}
第三次:acquireQueued 方法
节点(线程)尾插法加入同步队列后,开始自旋检查,每次自旋都会试图抢占锁(tryAcquire 方法)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
...
return interrupted;
}
...
}
} finally {
...
}
}