ReentrantLock 和 AQS
2025-01-22 08:19:30    5k 字   
This post is also available in English and alternative languages.

Java 中大部分锁都是基于 AQS 实现的,通过 ReentranLock 管中窥豹。

从 ReentrantLock 的角度看,AQS 中使用 Node 维护了一个双向链表(CLH),用来存储未竞争到锁处于阻塞状态的线程;


1. ReentrantLock

ReentranLock是一种支持重入排他锁,支持公平与非公平两种模式;

  • 排他锁
    应用在修改数据的场景;
    同一时间只能有一个线程持有锁;确保不会同时对同一资源进行多重修改;

  • 共享锁
    应用于只读数据的场景;
    不能修改数据;同一时间可以有多个线程持有锁;


1.1. 与AQS的关系

ReentrantLock类结构图

先不看代码,先看下 ReentrantLock 与 AQS 的关系;

  1. 向上
    ReentrantLock 实现了Lock接口;

  2. 向下
    ReentrantLock 内部有一个抽象静态内部类ReentrantLock.Sync;它继承自 AQS
    ReentrantLock.Sync 是 ReentrantLock 锁的同步控制基础;其内部使用AQS维护锁的状态、持有次数。
    ReentrantLock.Sync 有两个实现(静态内部类):ReentrantLock.NonfairSync(非公平锁)和ReentrantLock.FairSync(公平锁)。


1.2. 构造方法

ReentrantLock 有两个构造方法,一个是带参数的,另一个是不带参数的。

  1. 不带参数的构造方法;默认返回非公平锁

    1
    2
    3
    public ReentrantLock() {
    sync = new NonfairSync();
    }
  2. 带参数的构造方法;支持传boolean参数;(true - 公平锁,false - 非公平锁)。

    1
    2
    3
    public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    }

从上面 ReentrantLock 的结构信息(或者快速翻阅下源码)可以发现, ReentrantLock 的底层是AQS实现的;AQS提供了原子式管理同步状态、阻塞/唤醒线程、队列模型等功能


2. AQS速览

只关心和本篇内容相关的参数

2.1. AQS成员变量

变量名含义备注
transient volatile Node head;同步队列(SyncQueue / CLH)的头(节点)用于构建同步队列
transient volatile Node tail;同步队列(SyncQueue / CLH)的尾(节点)用于构建同步队列
volatile int state;同步状态AQS中使用该值表示同步状态;
通过CAS修改保证原子性。

2.2. Node

AbstractQueuedSynchronizer.Node 是一个静态内部类,是AQS内部维护同步队列(SyncQueue / CLH)的基本数据结构。

变量名含义备注
volatile int waitStatus;当前节点在同步队列(SyncQueue / CLH)中的状态
volatile Thread thread;包装在节点内部的线程
volatile Node prev;前驱指针(指向前驱节点)用于构建同步队列
volatile Node next;后继指针(指向后继节点)用于构建同步队列

2.3. waitStatus枚举值

waitStatus表示当前Node节点在双向同步队列(SyncQueue / CLH)中的状态;

字面量含义
0Node初始化时的默认值
CANCELLED1当前节点(线程),获取锁的请求被取消、中断。
SIGNAL-1当前节点(线程),处于等待状态;等待上一个持有锁的节点(线程)释放锁。

3. 加锁逻辑

  • ReentrantLock.FairSync (公平锁)的实现代码

    1
    2
    3
    4
    5
    // java.util.concurrent.locks.ReentrantLock.FairSync

    final void lock() {
    acquire(1);
    }

  • ReentrantLock.NonfairSync(非公平锁)的实现代码

    1
    2
    3
    4
    5
    6
    7
    8
    // java.util.concurrent.locks.ReentrantLock.NonfairSync

    final void lock() {
    if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
    }

观察上面两种锁加锁的实现代码,ReentrantLock.NonfairSync(非公平锁)上来会直接试图抢占锁,即:通过CAS设置变量state(compareAndSetState方法);

如果抢占成功(获取锁成功),将当前线程设置为独占线程,然后就可以执行相应的业务处理。
如果抢占失败(获取锁失败),和ReentrantLock.FairSync (公平锁)一样,执行AQS#acquire方法。


3.1. acquire方法

如上面所述,ReentrantLock.NonfairSync(非公平锁)抢占失败后,和ReentrantLock.FairSync (公平锁)一样,都会执行AQS#acquire方法;

1
2
3
4
5
6
7
// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
方法名作用、含义备注
tryAcquire尝试以独占方式获取锁;
成功返回true,失败返回false
AQS中定义,并做了简单实现
ReentrantLock.NonfairSync(非公平锁)和ReentrantLock.FairSync (公平锁)重写了其中逻辑。
addWaiter将线程包装成Node,放入同步队列中。在AQS中定义并实现。
如果同步队列为空,会在其中进行初始化。
acquireQueued对在同步队列中排队的线程进行获取锁的操作。
selfInterrupt中断线程

3.2. tryAcquire

此方法的作用是尝试以独占方式获取锁
该方法在AQS中定义,并做了简单的实现,具体获取锁的实现方法,由各自锁(子类)重写;ReentrantLock.NonfairSync(非公平锁)和ReentrantLock.FairSync (公平锁)都重写了其中逻辑。

  • ReentrantLock.FairSync (公平锁)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    // java.util.concurrent.locks.ReentrantLock.FairSync

    protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取AQS中维护的state状态
    int c = getState();
    // state=0,当前没有线程持有锁,可以竞争
    if (c == 0) {
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    // 竞争锁成功
    return true;
    }
    }
    // 检查独占线程是否是当前线程
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    // 重入
    return true;
    }
    return false;
    }

  • ReentrantLock.NonfairSync(非公平锁)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // java.util.concurrent.locks.ReentrantLock.NonfairSync
    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }

    // java.util.concurrent.locks.ReentrantLock.Sync
    final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取AQS中维护的state状态
    int c = getState();
    // state=0,当前没有线程持有锁,可以竞争
    if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    // 检查独占线程是否是当前线程
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    // 重入
    return true;
    }
    return false;
    }

眯一下眼睛,观察上面两种锁tryAcquire的实现源码,大部分的逻辑是相同的;

  1. 获取当前线程、以及当前state的状态。
  2. state=0,说明可以竞争锁,执行相关竞争锁的逻辑;
  3. state != 0,检查独占线程是否是当前线程,如果是,那就重入;

3.2.1. 差异

唯一的区别是在第二步,即state = 0时,ReentrantLock.FairSync (公平锁)比ReentrantLock.NonfairSync(非公平锁)多了一个!hasQueuedPredecessors()判断条件,即:如果队列中还有阻塞等待的线程,那当前线程就会竞争失败;

此方法放回boolean值,返回true说明获得了锁,返回false获取锁失败。
返回true代表着获取锁成功,后续的addWaiteracquireQueued方法也就没有必要执行了;
只有返回false,获取锁失败,才需要继续往后执行addWaiteracquireQueued方法。


3.3. addWaiter

执行acquire方法,会通过tryAcquire方法尝试获取锁;这种情况下,如果获取失败,则会调用addWaiter方法,将当前线程包装成Node,添加到同步队列(SyncQueue / CLH)的尾部。

需要注意的是,Node节点加入同步队列(SyncQueue / CLH)的尾部并不是原子操作,因此通过CAS方式插入;其中AQS#enq方法中,会对同步队列(SyncQueue / CLH)进行初始化。

注意:同步队列(SyncQueue / CLH)中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {
// 将当前线程,构建一个新的node节点
Node node = new Node(Thread.currentThread(), mode);
// 声明一个 pred指针 指向尾节点 tail
Node pred = tail;
if (pred != null) {
// 不为空,说明同步队列中存在其他节点
// 将当前节点的 prev节点 设置为 pred
node.prev = pred;
// 通过CAS方式,设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// CAS方式设置头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// CAS方式设置尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

调用addWaiter方法,传参是包装了当前线程的Node节点;

有两种场景会进入enq方法:

  1. 同步队列尚未构建时

    同步队列尚未构建,tail 和 pred 都是null,因此会进入enq方法;enq方法中会进行自旋,只有成功初始化同步队列,并将当前节点加入队列后,才会停止自旋;

    • enq方法的第一次自旋

      通过compareAndSetHead方法,以CAS方式设置head头节点;然后将尾节点也指向头节点;此时头尾节点指向同一个Node。

      注意:调用compareAndSetHead方法时,传参是一个没有设置参数的Node。

      自旋-第一次

    • enq方法的第二次自旋

      此时头尾节点不为空,走到else逻辑中;
      将当前节点(包装了当前线程的Node节点)的前驱节点设置为tail节点;
      然后通过compareAndSetTail(t, node)方法,以CAS方式重新设置tail尾节点,同时将t的后继节点指向当前节点;
      成功设置以后,退出自旋;

      自旋-第二次

    假设,这时又有一个线程进入了addWaiter方法;
    因为同步队列已经初始化过了,pred(tail)不为空,所以不会再进入enq方法;
    而是走addWaiter方法中的正常逻辑,添加到同步队列的尾部。

    addWaiter-正常添加到同步队列尾部

  2. 在addWaiter方法中,CAS设置tail节点失败(compareAndSetTail)时

    多个节点(线程)同时通过CAS设置tail节点,肯定会有节点(线程)失败;

    失败的节点(线程),也会进入enq方法,进行自旋,直到成功加入同步队列(SyncQueue / CLH)为止。


调用addWaiter成功后,会把当前线程包装的Node返回;继续调用acquireQueued方法。


3.4. acquireQueued

注意:此时node已经被加入到同步队列(SyncQueue / CLH)的尾部

当一个线程获取锁失败了,会被放入同步队列(SyncQueue / CLH)中(从尾部添加),acquireQueued会对加入同步队列(SyncQueue / CLH)排队的节点添加自旋检查;
自旋检查做两件事情:1.节点能否竞争到锁;2.判断当前节点(线程)是否需要阻塞;
直到节点(线程)获取锁成功或者不再需要获取(异常、中断);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 阻塞等待过程中是否中断过
boolean interrupted = false;
// 自旋,要么获取锁,要么阻塞
for (;;) {
// 当前竞争锁的节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头结点,说明当前节点位于头节点的后一位,尝试获取锁;
if (p == head && tryAcquire(arg)) {
// 获取锁成功
// 将head指针指向当前node(注意:此方法并没有修改waitStatus)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断当前node是否要被阻塞
// shouldParkAfterFailedAcquire:靠判断前驱节点状态决定当前节点是否应该被阻塞
// parkAndCheckInterrupt:阻塞当前节点(阻塞调用栈),唤醒后返回当前线程的中断标识
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

自旋检查时,如果竞争锁的节点的前驱节点不是头节点,那就没有必要去竞争锁(FIFO,从头节点方向挨个执行,不能跨越);


3.4.1. shouldParkAfterFailedAcquire

上面说过,acquireQueued会对加入同步队列排队的节点添加自旋检查;自选检查其中一步操作就是:判断当前节点(线程)是否需要阻塞;
此方法就是用来检查节点状态,判断是否需要阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// pred:当前节点的前驱节点;
// node:当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的等待状态值
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 说明该节点已经设置了请求释放信号的状态,因此可以安全中断
return true;
if (ws > 0) {
// 节点(线程)取消执行;
// 循环,从当前节点的前驱节点一直往前, 将取消执行的节点剔除掉,直到状态正常的节点才停止循环。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

// 循环剔除完毕,设置正常状态前驱节点的后置节点(next)
pred.next = node;
} else {
// 设置前驱节点的waitStatus为SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
  • 剔除取消状态的node:基于当前tail节点,向前查找有效的节点,并重新链接prev(前驱指针)

    注意代码中while的条件是:pred.waitStatus > 0;向前查找到有效节点时就会结束循环;

    如下图,node3节点的状态是异常的,node4会将prev向前链接到状态正常的node2;因为node2状态是正常的,至此while循环结束。

    如果node2状态也是异常的,那就会继续下一个while循环;node4会将prev链接到node1。

剔除取消状态的节点

  • 设置前驱节点的waitStatus(compareAndSetWaitStatus)

    结合上层acquireQueued方法中的自旋,多次调用shouldParkAfterFailedAcquire方法。

    shouldParkAfterFailedAcquire流程分析

    只有前驱节点的waitStatus为SIGNAL(-1)时才会结束自旋,返回true,然后才能进入parkAndCheckInterrupt方法,执行线程中断的操作。

    自旋至少需要循环两次,才会返回true;


3.4.2. parkAndCheckInterrupt

1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this);
// 返回线程中断标识
return Thread.interrupted();
}
  • LockSupport.park:阻塞当前线程

  • Thread.interrupted():判断线程的中断状态。

    运行到LockSupport.park(this);这一行,当前节点(线程)会被阻塞;后面被unPark方法唤醒时,才继续往下执行。

此方法返回boolean值,代表的是当前线程的中断标识,true:线程中断、false:线程未中断。

在当前节点(线程)阻塞过程中,人为(代码业务处理)调用了Thread.currentThread().interrupt()方法。


3.5. selfInterrupt

此方法很简单,只是设置当前节点(线程)的中断标识。

1
2
3
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

此方法在整个流程处理中的作用是什么?

当前节点(线程)在同步队列中阻塞等待时,人为(代码业务处理)调用了Thread.currentThread().interrupt()方法;说明该线程被中断(停止)了,acquireQueued`方法会记录下来,然后交给此方法处理。

另一个角度说明,在等待阻塞的过程中,即使人为(代码业务处理)调用了线程的Thread.currentThread().interrupt()方法,被唤醒后还是会不断地去尝试获取锁,直到抢到锁为止;
也就是说,在整个竞争锁的流程(同步队列阻塞等待)中,不响应中断,只记录中断(标识);等抢到锁以后再判断是否被中断过,如果被中断过,AQS会去调用线程的interrupt方法进行中断


4. 解锁逻辑

ReentrantLock 中ReentrantLock.NonfairSync(非公平锁)和ReentrantLock.FairSync(公平锁)使用同一个解锁方法;

1
2
3
4
5
//java.util.concurrent.locks.ReentrantLock

public void unlock() {
sync.release(1);
}

4.1. release

上面说过,抽象静态内部类ReentrantLock.Sync,它继承自AQS;这里看似调用ReentrantLock.Sync的方法,但实际调用的父类AQS的release方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
// 子类实现逻辑返回true,说明锁被完全释放,没有任何线程持有
if (tryRelease(arg)) {
// 获取头节点
Node h = head;
// 头节点不为空,并且头节点的waitStatus不是初始化节点状态,解除后继线程阻塞状态。
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
方法名作用、含义
tryReleaseAQS中定义并简单实现;ReentrantLock.Sync中重写
unparkSuccessor唤醒后继节点

4.2. tryRelease

tryRelease方法在AQS中定义,并做了简单实现;ReentrantLock.Sync类进行了重写;

ReentrantLock.SyncReentrantLock.NonfairSync(非公平锁)和ReentrantLock.FairSync(公平锁)的父类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// java.util.concurrent.locks.ReentrantLock.Sync

protected final boolean tryRelease(int releases) {
// 减少可重入次数(-1)
int c = getState() - releases;
// 如果当前线程不是持有锁的线程,异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 释放锁的线程释放锁的结果标识
boolean free = false;
// 当前没有线程持有锁(持有线程全部释放完毕),设置结果标识并设置独占线程为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 重新设置state
setState(c);
return free;
}

需要注意的是,只有当state等于0时,free才会是true(完全释放);如果是重入锁,线程重了多次,但只释放了一次,是不行的。


4.3. unparkSuccessor

这个方法名做到了"见名知意",唤醒继任者;参数列表中的node是head头节点;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void unparkSuccessor(Node node) {
// 头节点的waitStatus状态
int ws = node.waitStatus;
if (ws < 0)
// 说明状态可能是 SIGNAL、CONDITION、PROPAGATE
// cas方式清除状态
compareAndSetWaitStatus(node, ws, 0);

// head头节点的后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 后继节点为空 或者 后继节点状态异常(CANCELLED)

// 清空后继节点的指针
s = null;

// 从tail尾节点向前循环,查找状态正常的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点(线程)
LockSupport.unpark(s.thread);
}

如下图,当head头节点的后继节点为空、或者状态异常时,会触发一个机制;
这个机制,会从tail尾节点向前挨个查找、剔除异常状态的节点,直到循环完整个同步队列,找到有效的、离head最近的节点;

下图中,从tail尾节点开始扫描,全部扫描完,head头节点有效的后继节点是第三次循环中关联的node2节点。

unparkSuccessor_从后向前剔除异常状态节点

5. 何时从同步队列剔除异常状态的Node

以 ReentrantLock 锁视角去看,在两个地方会将异常状态的Node从同步队列中剔除,重新连接prev或next指针。

这里说的,异常状态的node指的是:

  1. node为null
  2. node的waitStatus状态为CANCELLED

其他认为是正常的node节点。

  1. 添加节点(线程)到同步队列后,节点自旋检查时;
    源码位置:AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
    触发:tail尾节点的前驱节点状态异常;
    执行动作:从tail尾节点向前扫描查找,只要找到正常节点时就会结束扫描

  1. 释放锁,调用unparkSuccessor方法唤醒下一个节点时;
    源码位置:AbstractQueuedSynchronizer#unparkSuccessor
    触发:head头节点的后驱节点状态异常;
    执行动作:从tail尾节点向前扫描查找,扫描整个同步队列

6. 为何使用双向链表

  1. 持有锁的线程在释放锁,唤醒下一个线程时,是从前向后(head -> tail)逐个唤醒的。

  2. 剔除异常状态的node时,是从后向前(head <- tail)剔除的。

如果上面两个场景同时从同一方向一起开始工作,在对同一节点(线程)进行状态更新、判断时会存在冲突的问题;
使用双向链表就是为了避免出现上述问题,减少二者之间的冲突;


7. 整个流程中,非公平锁竞争了几次锁

假设:当前锁被某线程持有,正在运行,一直没有释放,采用非公平锁方式
结论:一共竞争了3次锁。

  1. 第一次:lock 方法

    调用lock方法时,上来就会无理由的竞争一次锁;

    1
    2
    3
    4
    5
    6
    7
    8
    // java.util.concurrent.locks.ReentrantLock.NonfairSync#lock

    final void lock() {
    if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
    }

  2. 第二次:tryAcquire 方法

    第一步竞争失败后,进入AQS#acquire方法中,其中会调用tryAcquire方法;非公平锁的实现中,又会竞争一次锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

    //java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
    final boolean nonfairTryAcquire(int acquires) {
    ...
    int c = getState();
    if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    ...
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    ...
    return true;
    }
    return false;
    }

  1. 第三次:acquireQueued 方法

    节点(线程)尾插法加入同步队列后,开始自旋检查,每次自旋都会试图抢占锁(tryAcquire 方法)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    ...
    return interrupted;
    }
    ...
    }
    } finally {
    ...
    }
    }

8. Reference