Netty_NioEventLoop启动和运行
2025-01-22 08:19:30    9.6k 字   
This post is also available in English and alternative languages.

上一篇 Netty_NioEventLoop创建 大致梳理了 NioEventLoop 的创建流程,已经得知 NioEventLoop 的创建包含在 NioEventLoopGroup 的创建过程中。

本篇接着继续梳理分析 NioEventLoop 的启动和运行逻辑,值得注意的是,NioEventLoop 并不会在创建完成后立即启动和运行的。

  • 网上很多资料都将创建的 NioEventLoopGroup 对象称之为: bossGroup(boss) 和 workGroup(work),这里OP不会这么称呼,而是采用源码中变量的命名,即: parentGroupchildGroup

  • 文中图片较大,懒加载,请耐心等候

  • 本文 Netty 源码解析基于 4.1.86.Final 版本。

  • 受篇幅限制,NioEventLoop 源码拆分为 Netty_NioEventLoop创建Netty_NioEventLoop启动和运行


1. NioEventLoop启动

事实上,NioEventLoop 创建/启动工作线程 的时机是在第一个任务提交时

  • 对于 parentGroup 来说,NioEventLoop 创建/启动工作线程 的时机是在服务端 ServerBootstrap 启动时,向 parentGroup 注册 NioServerSocketChannel 的时候,详阅: Netty_ServerBootstrap启动过程

  • 对于 childGroup 来说,NioEventLoop 创建/启动工作线程 的时机是在客户端通过与 parentGroup 中 EventLoop(NioServerSocketChannel) 进行交互并建立连接完成后,再通过 ServerBootstrapAcceptor 将客户端连接(socketChannel) 注册到 childGroup 的时候,详阅: Netty_ServerBootstrapAcceptor

此处以 服务端 ServerBootstrap 启动时,向 parentGroup 中注册 NioServerSocketChannel 场景为例,分析 NioEventLoop 启动和运行。

源码入口位于 AbstractChannel.AbstractUnsafe#register 方法,此方法的部分主要作用是将当前 Unsafe 对应的 Channel 注册到 EventLoop 的 Selector[1] 上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;

// 判断当前线程是否是 NioEventLoop 中的线程
if (eventLoop.inEventLoop()) {
// 注册
register0(promise);
} else {
try {
// 对于 NioEventLoop 启动工作线程来说,重点方法是: eventLoop.execute(...)
eventLoop.execute(new Runnable() { public void run() { register0(promise); } });
}...
}
}
...
}

注意:这里不关注 Channel 注册到 Selector[1] 上的逻辑,即不关注 register0(promise) 方法,而重点关注向 NioEventLoop 提交第一个任务触发其工作线程启动,即关注: line:14 -> eventLoop.execute 方法

eventLoop.execute 是一个重要方法,此方法来自接口 Executor,并在父类 SingleThreadEventExecutor 中被重写,但并没有直接向线程池中提交任务,要注意区分

进入 SingleThreadEventExecutor.execute 方法继续往下梳理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

public void execute(Runnable task) {
execute0(task);
}

private void execute0(@Schedule Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

private void execute(Runnable task, boolean immediate) {
// 判断当前执行线程是否为EventLoop内部的工作线程
boolean inEventLoop = inEventLoop();
// 将任务添加到内部队列
addTask(task);
// 如果不是 EventLoop 工作线程提交的task,则判断工作线程是否已经启动,没有则启动工作线程
if (!inEventLoop) {
// 启动工作线程
startThread();
...
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}

private void startThread() {
if (state == ST_NOT_STARTED) {
// CAS操作,以非阻塞的线程安全方式更新
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 启动工作线程
doStartThread();
success = true;
} finally {...}
}
}
}
}
  • line: 9 -> !(task instanceof LazyRunnable) && wakesUpForTask(task),判断任务是否要立即执行。
    !(task instanceof LazyRunnable):只要提交的任务不是 LazyRunnable 类型,返回true,是需要立即执行的。
    wakesUpForTask(task):固定返回 true。
    综上,只要提交的任务不是 LazyRunnable 类型,都是需要立即执行的,在 遗留问题(点击跳转) 小节还会有分析。

  • line: 16 -> 将任务添加到 NioEventLoop 的 taskQueue 普通任务队列 中。

  • line: 35 -> 准备启动 NioEventLoop 中的工作线程。


doStartThread 方法看起来很长,简单来说就是向 NioEventLoop 内部的 ThreadPerTaskExecutor 提交一个线程, 而这个线程就是 NioEventLoop 内部的工作线程。

注意:↓↓↓↓↓第6行向线程池中提交的 Runnable,就是 NioEventLoop 中的工作线程。↓↓↓↓↓

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
private void doStartThread() {
assert thread == null;
// 对于 NioEventLoop,这个 executor 就是 ThreadPerTaskExecutor
executor.execute(new Runnable() { // <- 这个线程就是 NioEventLoop 内部的工作线程
@Override
public void run() {
// 这个线程设置为 NioEventLoop 的工作线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 重要! 执行 SingleThreadEventExecutor 的 run() 方法,它在 NioEventLoop 中实现了
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) { ... }
}
});
}
...
}
  • line: 6 -> executor 是在 MultithreadEventExecutorGroup 中创建的 ThreadPerTaskExecutor , 详阅:Netty_NioEventLoop创建 中「创建线程池」一节。

  • line: 6 -> 向 ThreadPerTaskExecutor 中提交的的这个 Runnable 线程,就是 NioEventLoop 内部的工作线程

  • line: 8 ~ line: 20 -> NioEventLoop 工作线程中的核心逻辑。

SingleThreadEventExecutor.this.run(); 方法是 NioEventLoop 工作线程中的核心逻辑,此方法是 SingleThreadEventExecutor 类中的抽象方法,NioEventLoop 中进行了实现,因此实际调用的是 NioEventLoop#run 方法。


2. NioEventLoop运行

通过 NioEventLoop启动(点击跳转) 小节中的分析已经知道 NioEventLoop#run 方法是 NioEventLoop 工作线程的核心逻辑,被包在 Runnable 中,放入 ThreadPerTaskExecutor 线程池运行的,并且这个 Runnable 就是 NioEventLoop 中的工作线程。

下面看看,这个 NioEventLoop#run 方法中做了些什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public final class NioEventLoop extends SingleThreadEventLoop {
...
protected void run() {
int selectCnt = 0;
// 死循环
for (;;) {
try {
int strategy;
try {
// 确定 select 处理策略,用于控制 select 循环行为,包含 CONTINUE、SELECT、BUSY_WAIT 三种策略,
// Netty不支持 BUSY_WAIT。
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 任务队列为空的时候,会执行本逻辑

// 下一次定时任务触发截止时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
...
try {
if (!hasTasks()) { // 再次判断是否有任务
strategy = select(curDeadlineNanos); // 轮询 I/O 事件(轮训就绪的 channel)
}
} finally {
nextWakeupNanos.lazySet(AWAKE); // 阻止不必要的唤醒
}
// fall through
default:
}
} catch (IOException e) { ... }

// 执行到这里说明满足了唤醒条件,EventLoop 线程从 Selector 上被唤醒开始处理 IO就绪事件 和 执行异步任务。

// 轮训次数++,用来解决jdk空轮训bug
selectCnt++;
// ioRatio参数用于控制I/O事件处理和内部任务处理的时间比例,
// 默认为50,一半时间用来处理io事件,一半时间用来处理任务
final int ioRatio = this.ioRatio;
boolean ranTasks;
// 根据 ioRatio,选择 执行IO操作还是内部队列中的任务
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys(); // I/O操作,根据 selectedKey 进行出炉
}
} finally {
ranTasks = runAllTasks(); // 执行完所有任务
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
// I/O操作,根据selectedKey进行出炉
processSelectedKeys();
} finally {
// 按照一定比例执行任务,可能会遗留一部分任务等待下次执行
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0);
}

if (ranTasks || strategy > 0) {
...
} else if (unexpectedSelectorWakeup(selectCnt)) {
// unexpectedSelectorWakeup 方法用于解决JDK的epoll空轮询问题
selectCnt = 0;
}
} catch (CancelledKeyException e) { ... }
...
}
}
}

上面这个代码有点多,稍微拆解一下:

  • line: 6 -> 线程中运行的是死循环逻辑, 只要不出现异常就会一直运行下去。

  • line: 9 ~ line: 33 -> 这块逻辑中主要做一件事: Selector[1] 轮询注册其上 Channel 感兴趣的 I/O就绪事件 & 获取轮询策略,下面单独分析。

  • line: 38 ~ line: 72 -> 这块逻辑中主要做两件事:处理I/O就绪事件执行异步任务,下面单独分析。


2.1. 轮询IO就绪事件&轮询策略

如下提取的部分源码中,可以看出 NioEventLoop 内的工作线程主要做了下面两个事情:

  • line: 7 -> 结合异步任务队列,计算轮询策略结果。
  • line: 22 -> Selector[1] 轮询 I/O就绪事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final class NioEventLoop extends SingleThreadEventLoop {
...
protected void run() {
...
for (;;) {
...
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE: // -2
continue;
case SelectStrategy.BUSY_WAIT: // -3, NioEventLoop不支持, 跳转到 SelectStrategy.SELECT 策略
case SelectStrategy.SELECT: // -1, 任务队列为空的时候, 会执行本逻辑

// 下一次定时任务触发截止时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) { // 再次判断是否有任务
strategy = select(curDeadlineNanos); // 轮询 I/O 事件(轮训就绪的 channel)
}
} finally {
nextWakeupNanos.lazySet(AWAKE); // 阻止不必要的唤醒
}
default:
}
...
}
...
}
}

2.1.1. 获取轮询策略

NioEventLoop 内的工作线程最重要的事情之一就是:通过 Selector[1] 轮询注册其上的 Channel 感兴趣的 I/O就绪事件

对于 NioServerSocketChannel 来说,因为它主要负责接收客户端连接所以监听的是 OP_ACCEPT 事件。
对于 NioSocketChannel 来说,因为它主要负责处理连接上的读写事件所以监听的是 OP_READOP_WRITE 事件。

selectStrategy 则用于控制轮询策略。当存在异步任务时,会优先执行 I/O就绪事件,再执行异步任务。
selectStrateg 是在 NioEventLoopGroup 的构造函数中创建的,默认实现为 DefaultSelectStrategyFactory.INSTANCE,详阅:Netty_NioEventLoop创建 中「NioEventLoopGroup的构造函数」一节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final class NioEventLoop extends SingleThreadEventLoop {
...
protected void run() {
...
for (;;) {
...
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE: // -2
...
case SelectStrategy.BUSY_WAIT: // -3, NioEventLoop不支持, 跳转到 SelectStrategy.SELECT 策略
case SelectStrategy.SELECT: // -1, 任务队列为空的时候, 会执行本逻辑
...
default:
}
...
}
...
}
}

Netty 中定义了三种轮询策略(io.netty.channel.SelectStrategy):

  • SelectStrategy.SELECT(-1):此时没有任何异步任务需要执行,NioEventLoop 工作线程可以安心的阻塞在 Selector[1] 上等待 I/O就绪事件 的来临。

  • SelectStrategy.CONTINUE(-2):重新开启一轮 IO轮询。

  • SelectStrategy.BUSY_WAIT(-3): NioEventLoop 内的工作线程要进行自旋轮询,由于 NIO 不支持自旋操作,所以会直接跳转到 SelectStrategy.SELECT 策略中。


在分析 selectStrategy 计算轮询策略之前首先判断当前是否有异步任务需要执行,即:hasTasks() 方法,判断依据就是查看 taskQueue 普通任务队列tailTask 尾部队列 中是否有任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
...
}

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
...
}

接着根据 hasTasks() 方法结果,计算轮询策略:

1
2
3
4
5
6
7
8
final class DefaultSelectStrategy implements SelectStrategy {
...
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// 如果有异步任务等待执行,则马上执行 selectNow() 非阻塞轮询一次IO就绪事件
// 没有异步任务,则跳转到 switch select 分支
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}

selectNow() 非阻塞方法返回值表示,此时有多少 I/O就绪事件 的 Channel;等于零时表示当前没有 I/O就绪事件 的 Channel。

  • 如果 NioEventLoop 中有异步任务等待执行,
    那么 NioEventLoop 中的工作线程需要立即去执行异步任务,但在 calculateStrategy 方法返回之前要顺便调用 selectNow() 非阻塞方法,检查当前是否有 I/O就绪事件 发生。

    • 如果当前有 I/O就绪事件 ,那正好可以与异步任务一起处理, selectNow() 非阻塞方法返回值大于零;
    • 如果当前没有 I/O就绪事件 ,那就及时处理 taskQueue 普通任务队列tailTask 尾部队列 中的异步任务, selectNow() 非阻塞方法返回值等于零。

    Netty 要求 NioEventLoop 中的工作线程优先保证 I/O就绪事件 的处理,然后再及时处理异步任务。如果当前没有 I/O就绪事件 但有异步任务需要执行,那就去执行异步任务,而不是继续阻塞等待 I/O就绪事件


  • 如果当前 NioEventLoop 中没有异步任务需要执行
    那么 calculateStrategy 方法直接返回 SelectStrategy.SELECT,即:-1 。
    但当 calculateStrategy 方法通过 selectNow() 非阻塞方法返回非零数值时,表示此时有 IO就绪 的 Channel ,selectNow() 非阻塞方法的返回值表示有多少个 IO就绪 的 Channel。


简单小结下轮询策略,即 selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()) 方法的返回值含义:

  • 返回值 = -1: switch 逻辑分支进入 SelectStrategy.SELECT 分支,表示此时 NioEventLoop 中 taskQueue(普通任务队列)tailTaskQueue(尾部任务队列) 中没有异步任务需要执行,工作线程可以安心的阻塞在 Selector[1] 上等待 I/O就绪事件 发生。

  • 返回值 = 0:switch 逻辑分支进入 default 分支,表示此时没有 I/O就绪事件 但是 taskQueue(普通任务队列)tailTaskQueue(尾部任务队列) 中有异步任务需要执行。

  • 返回值 > 0:switch 逻辑分支进入 default 分支,表示此时既有 I/O就绪事件 ,在 taskQueue(普通任务队列)tailTaskQueue(尾部任务队列) 中也有异步任务需要执行。


2.1.2. 轮询IO就绪事件

上面是 获取轮询策略 的逻辑,下面梳理分析 轮询I/O就绪事件,也就是 line: 12 ~ line: 30 -> switch...SelectStrategy.SELECT 分支逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public final class NioEventLoop extends SingleThreadEventLoop {
...
protected void run() {
...
for (;;) {
...
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE: // -2
continue;
case SelectStrategy.BUSY_WAIT: // -3, NioEventLoop不支持, 跳转到 SelectStrategy.SELECT 策略
case SelectStrategy.SELECT: // -1, 当前没有异步任务执行,可以放心的阻塞等待IO就绪事件

// 从定时任务队列中取出, 即将快要执行的定时任务的 deadlineNanos(下一次定时任务触发截止时间)。
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// -1 代表当前定时任务队列中没有定时任务
curDeadlineNanos = NONE;
}

// 最早执行定时任务的 deadlineNanos(下一次定时任务触发截止时间) 作为 select 的阻塞时间,
// 意思是到了定时任务的执行时间, 不管有无 I/O就绪事件, 必须唤醒 selector, 从而使线程执行定时任务
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) { // 再次判断是否有任务
strategy = select(curDeadlineNanos); // 没有任务, select 阻塞轮询 I/O就绪事件
}
} finally {
nextWakeupNanos.lazySet(AWAKE); // 阻止不必要的唤醒
}
default:
}
...
}
...
}
}

粗略过一遍上面的代码,发现 NioEventLoop 中还有定时任务相关逻辑,在 Netty_NioEventLoop创建 的「创建 NioEventLoop」一节中,曾经提到过。

NioEventLoop 中的异步任务分为三类:

  • 普通任务:Netty 最主要执行的异步任务,存放于 taskQueue(普通任务队列)
  • 定时任务:存放于 scheduledTaskQueue(定时任务队列/优先级队列) 中。
  • 尾部任务:存放于 tailTaskQueue(尾部任务队列) ,尾部任务不常用,定时任务和普通任务执行完后才会执行尾部任

既然 NioEventLoop 需要执行定时任务,那么它就不能一直阻塞在 Selector[1] 上等待 I/O就绪事件。为了保证 NioEventLoop 的工作线程能及时执行定时任务,因此需要在即将执行第一个定时任务触发截止时间(deadlineNanos)之前将工作线程唤醒。

所以在 NioEventLoop 工作线程开始轮询 I/O就绪事件 之前,首先要计算出工作线程在 Selector[1] 上阻塞的时间



2.1.2.1. 计算阻塞时间

从 NioEventLoop 的 scheduledTaskQueue(定时任务队列/优先级队列) 中取出即将快要执行的定时任务,定时任务中的 deadlineNanos 属性就是该任务的执行时间。将这个 deadlineNanos 作为工作线程在 Selector[1] 上阻塞轮询的超时时间,这样就可以保证在定时任务即将要执行时,工作线程可以及时从 Selector 上被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
...
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

protected final long nextScheduledTaskDeadlineNanos() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
// -1 代表当前定时任务队列中没有定时任务
return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
}

final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
}
...
}

nextScheduledTaskDeadlineNanos() 方法会返回 scheduledTaskQueue(定时任务队列/优先级队列) 中最近一个定时任务的deadlineNanos 时间点,如果 scheduledTaskQueue(定时任务队列/优先级队列) 中没有定时任务,则返回 -1

NioEventLoop 中 nextWakeupNanos 属性用来存放内存工作线程从 Selector[1] 上被唤醒的时间点,如果当前 scheduledTaskQueue(定时任务队列/优先级队列) 中没有定时任务需要执行(即:-1),那就设置为 Long.MAX_VALUE 一直阻塞轮询,直到有 I/O就绪事件 到达或者 scheduledTaskQueue(定时任务队列/优先级队列) 中有异步任务需要执行。



2.1.2.2. 开始轮询

上面确定好工作线程在 Selector[1] 上阻塞的时间后,NioEventLoop 就开始轮询 I/O就绪事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final class NioEventLoop extends SingleThreadEventLoop {
...
protected void run() {
...
for (;;) {
...
case SelectStrategy.SELECT: // -1, 当前没有异步任务执行,可以放心的阻塞等待IO就绪事件
try {
...
if (!hasTasks()) { // 再次判断是否有任务
strategy = select(curDeadlineNanos); // 没有任务, select 阻塞轮询 I/O就绪事件
}
} finally {
nextWakeupNanos.lazySet(AWAKE); // 阻止不必要的唤醒
}
...
}
...
}
}

在工作线程开始阻塞轮询 I/O就绪事件 之前还需要再次检查一下 taskQueue(普通任务队列)tailTaskQueue(尾部任务队列) 中是否有异步任务需要执行。

如果此时恰巧有异步任务提交,就需要停止 I/O就绪事件 的轮询,转去执行异步任务。如果没有异步任务,则正式开始轮询 I/O就绪事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class NioEventLoop extends SingleThreadEventLoop {
...
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
// 调用底层 NIO Selector 的 select 阻塞方法, 返回就绪 IO 事件的个数
return selector.select();
}

// 如果 deadlineNanos 小于5纳秒, 则为0,否则取整为1毫秒, 为了向上取整, 转成毫秒
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

// selector.selectNow() 是立即返回的, 不会阻塞当前线程。
// 返回 IO就绪 Channel 的数量。
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
...
}

如果 deadlineNanos == NONE,代表当前 NioEventLoop 的 scheduledTaskQueue(定时任务队列/优先级队列) 中没有定时任务,工作线程可以安心的阻塞在 Selector[1] 上等待 I/O就绪事件

需要注意,selector.select() 是一个阻塞方法,如果没有 I/O就绪事件 到来,工作线程就会一直阻塞在此,直到 I/O就绪事件 到来。那么问题来了,如果此时工作线程正阻塞在 selector.select() 方法上,等待 I/O就绪事件 的到来,而此时正好有异步任务被提交到 taskQueue(普通任务队列)tailTaskQueue(尾部任务队列) 中需要执行,并且此时就是没有任何 I/O就绪事件 ,那工作线程该如何去执行异步任务呢?

这个问题先 MARK,下面单独分析。

如果 deadlineNanos != NONE,表示此时 NioEventLoop 的 scheduledTaskQueue(定时任务队列/优先级队列) 中是有定时任务需要执行的,但还没到任务触发截止时间。

deadlineNanos 单位是纳秒,表示绝对时间,所以在工作线程开始阻塞在 Selector[1] 上之前,需要计算出 当前时间(current) 至 绝对时间(deadlineNanos) 之间的 时间差(timeoutMillis),并转换成毫秒。如果 deadlineNanos 小于5纳秒,则为0,否则向上取整为1毫秒,所以在有定时任务的情况下,至少要阻塞1毫秒。

deadlineToDelayNanos 方法计算出的结果 timeoutMillis <= 0 时,表示临近定时任务触发截止时间,需要立即返回不能阻塞在 Selector[1] 上。并且在返回前调用 selector.selectNow() 非阻塞方法,轮询一下当前是否有 I/O就绪事件 到达,防止耽误 I/O就绪事件 的处理。

timeoutMillis > 0 时,工作线程可以安心的阻塞在 Selector[1] 上等待 I/O就绪事件 到来,直到到达 timeoutMillis 超时时间。当有 I/O就绪事件 到达,工作线程就会被唤醒,接着工作线程就可以去处理 I/O就绪事件

NioEventLoop#select 方法最终返回的是 IO就绪 Channel 的数量。



2.1.2.3. 遗留问题

上面还遗留了一个问题,当工作线程正阻塞在 selector.select() 方法上,等待 I/O就绪事件 的到来,而此时正好有异步任务被提交到 taskQueue(普通任务队列)tailTaskQueue(尾部任务队列) 中需要执行,并且此时就是没有任何 I/O就绪事件 ,那工作线程该如何去执行异步任务呢?

如果异步任务在被提交后希望立即得到执行,那就需要「在提交异步任务的时候去唤醒此时正阻塞在 Selector[1] 上的工作线程」

NioEventLoop启动(点击跳转) 小节中梳理过 SingleThreadEventExecutor#execute 方法,例如在服务端 ServerBootstrap 启动阶段,将 NioServerSocketChannel 注册到 NioEventLoop 中的 Selector[1] 等重要操作都是封装成异步任务通过此方法提交到 NioEventLoop 中执行的,部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
private final boolean addTaskWakesUp;
...
private void execute(Runnable task, boolean immediate) {
// 判断当前执行线程是否为EventLoop内部的工作线程
boolean inEventLoop = inEventLoop();
// 将任务添加到内部队列
addTask(task);
// 如果不是 EventLoop 工作线程提交的task,则判断工作线程是否已经启动,没有则启动工作线程
if (!inEventLoop) {
// 启动工作线程
startThread();
...
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
}

重点在 SingleThreadEventExecutor#execute 方法的后半段,line: 16 ~ line: 18 ,wakeup 逻辑部分。

先了解下 wakeup 逻辑相关的两个参数:addTaskWakesUpimmediate

  • immediate:提交的任务是否需要被立即执行。在 Netty 中只要提交的任务不是 LazyRunnable 类型,都是需要立即执行的,即:immediate = true

  • addTaskWakesUp:唤醒工作线程的标识。
    创建 NioEventLoop 时, addTaskWakesUp 被初始化为 false。

    1
    addTaskWakesUp – true if and only if invocation of addTask(Runnable) will wake up the executor thread

    根据 SingleThreadEventExecutor 构造函数上注释的理解,如果 addTaskWakesUp 为 true,代表「仅当调用 addTask 方法时才会唤醒 NioEvnetLoop 中的工作线程,调用其他方法不会唤醒 NioEvnetLoop 中的工作线程」。
    根据这个理解进行反推,如果 addTaskWakesUp 为 false,代表「并不是只有调用 addTask 方法才能唤醒 NioEventLoop 中的工作线程,调用其他方法也可以唤醒 NioEventLoop 中的工作线程」。

    需要注意的是,在 SingleThreadEventExecutor 类中除了 execute(java.lang.Runnable, boolean) 方法,还有 shutdownGracefully 方法和 shutdown 方法都用到了 addTaskWakesUp 参数进行判断,是否要唤醒 NioEventLoop 中的工作线程。

以上,只要当前使用的是 NioEventLoop,并且提交的任务不是 LazyRunnable 类型,都会进入 wakeup 逻辑部分,即:提交的任务会立即执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class NioEventLoop extends SingleThreadEventLoop {
...
private static final long AWAKE = -1L;
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
...
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
// 将 NioEventLoop 内的工作线程 从 Selector 上唤醒。
selector.wakeup();
}
}
...
}

如果 nextWakeupNanos.getAndSet(AWAKE) == AWAKE,表示当前 NioEventLoop 内的工作线程正处于苏醒状态,因此没有必要去执行 selector.wakeup() 而重复唤醒工作线程,这样能节省一次系统调用开销。


2.2. IO事件与任务的处理分配

I/O就绪事件 无论什么时候到达,NioEventLoop 内的工作线程都需要保证 I/O就绪事件 被及时且完整的处理。为了避免工作线程因处理异步任务时间过长而导致 I/O就绪事件 不能被及时处理,使用 ioRatio 参数对执行异步任务进行限制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final class NioEventLoop extends SingleThreadEventLoop {
...
private volatile int ioRatio = 50;

protected void run() {
for (; ; ) {
...
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
...
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
if (strategy > 0) {
// I/O操作, 根据 selectedKey 进行出炉
processSelectedKeys();
}
} finally {
// 执行完所有任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
// I/O操作, 根据 selectedKey 进行出炉
processSelectedKeys();
} finally {
// 按照一定比例执行任务, 可能会遗留一部分任务等待下次执行
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// 0表示运行运行最小数量的任务, 即63个
ranTasks = runAllTasks(0);
}
...
}
...
}
}
  • line: 8 -> strategy 表示当前 IO就绪 Channel 的个数,详阅:开始轮询(点击跳转) 小节。

  • line: 10 -> ioRatio 可以通过 NioEventLoop#setIoRatio 方法进行设置。

  • line: 11 -> ioRatio == 100 :表示无需考虑异步任务执行时间的限制。
    当有 I/O就绪事件 时(即:strategy > 0),NioEventLoop 中的工作线程需要优先处理 I/O就绪事件 ,处理完 I/O就绪事件 后执行所有的异步任务(普通任务、尾部任务、定时任务)。

  • line: 21 -> strategy > 0 ,同时 ioRatio 设置的值是默认的 50。
    先执行 I/O就绪事件 ,最后统计出执行 I/O就绪事件 的耗时(ioTime),接着根据 ioTime * (100 - ioRatio) / ioRatio 计算出,后面用于执行异步任务的剩余时间。
    也就是说,NioEventLoop 的工作线程需要在剩余时间内执行有限的异步任务,防止 NioEventLoop 的工作线程处理异步任务时间过长而导致 I/O就绪事件 得不到及时处理。

    执行 I/O就绪事件 和执行异步任务的用时比例是一比一,ioRatio 设置的越高,NioEventLoop 的工作线程执行异步任务的时间占比越小。

  • line: 31 -> 没有 I/O就绪事件 需要处理
    Netty 允许 NioEventLoop 工作线程最多执行 64 个异步任务,然后结束,继续轮训 I/O就绪事件
    核心目的还是防止 NioEventLoop 工作线程处理异步任务时间过长,导致 I/O就绪事件 得不到及时处理。


2.3. 处理I/O就绪事件

轮询 Select 后,NioEventLoop 已经获取到准备就绪的 I/O 事件,接下来需要调用 processSelectedKeys() 方法处理IO事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class NioEventLoop extends SingleThreadEventLoop {
...
private void processSelectedKeys() {
if (selectedKeys != null) {
// Netty优化过的 selectedKeys
processSelectedKeysOptimized();
} else {
// 普通处理逻辑
processSelectedKeysPlain(selector.selectedKeys());
}
}
...
}

Netty_NioEventLoop创建 中「优化NIO原生Selector」一节中曾经提到,出于性能考虑将 selectedKeys 集合的类型由原来的 java.util.HashSet ,替换成 Netty 自己用数组实现的 SelectedSelectionKeySet;通过 DISABLE_KEY_SET_OPTIMIZATION 开关控制是否优化,默认是需要优化的。

优化开关开启的情况下,Netty 将创建的 SelectedSelectionKeySet 集合保存在 NioEventLoop#selectedKeys 属性中,方便 NioEventLoop 工作线程直接从这里获取 I/O就绪事件 的 SelectionKey。

优化开关关闭的情况下,Netty 会直接采用默认实现,此时 NioEventLoop#selectedKeys 属性为 NULL。

  • line: 6 -> processSelectedKeysOptimized 方法中处理的是 Netty 优化过的类型 SelectedSelectionKeySet

  • line: 9 -> processSelectedKeysPlain 中处理的是默认实现。



2.3.1. 处理未优化selectedKeys

当注册在 java.nio.channels.Selector[1] 上的 Channel 发生 I/O就绪事件 时,java.nio.channels.Selector 会将 I/O就绪事件 的 SelectionKey 保存到 sun.nio.ch.SelectorImpl#selectedKeys 集合中。接着 NioEventLoop 工作线程会从 java.nio.channels.Selector#select(long) 阻塞调用中返回。

因此在调用 processSelectedKeysPlain 方法前先要通过 java.nio.channels.Selector#selectedKeys 方法获取 I/O就绪事件 的 SelectionKey 集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
processSelectedKeysPlain(selector.selectedKeys());

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}

// 循环所有的 selectedKeys
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
// 移除处理完的SelectionKey,防止重复处理
i.remove();

if (a instanceof AbstractNioChannel) {
// I/O 事件由 Netty 负责处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
// 用户自定义任务,一般情况下不会执行
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (!i.hasNext()) {
break;
}

// Netty 在处理I/O事件时,如果发现channel数量超过256个,会将 Channel 从 Selector 对象中移除,
// 然后将 needsToSelectAgain 置 true,重新做一次轮询,从而确保 keySet 的有效性.
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
  • line: 3 -> selectedKeys 集合中存放的全部是 I/O就绪事件java.nio.channels.SelectionKey


  • line: 9 ~ line: 10 -> 通过获取 HashSet 的迭代器,逐个处理 I/O就绪事件 的 Channel。


  • line: 12 -> final Object a = k.attachment(); 中存放的是什么?
    Netty_ServerBootstrap启动过程 中「doRegister」一节中梳理过,通过调用 JDK 底层 NIO API java.nio.channels.SelectableChannel#register,将 NioServerSocketChannel 中包装的 JDK NIO 原生 ServerSocketChannel 注册到 Selector[1] 上。以此为例,在调用 register 方法时,通过 this 指针将 NioServerSocketChannel 自己作为 SelectionKey 的 attachment 属性注册到 Selector[1] 中。「这一步完成了 Netty 自定义 Channel 和 JDK NIO 原生 Channel 的绑定」。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // io.netty.channel.nio.AbstractNioChannel#doRegister
    protected void doRegister() throws Exception {
    // 表示注册操作是否成功
    boolean selected = false;
    for (;;) {
    try {
    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    return;
    } catch (CancelledKeyException e) {...}
    }
    }

    对于客户端连接事件(OP_ACCEPT)活跃时,attachment 属性类型是 NioServerSocketChannel;
    对于客户端读写事件(OP_READ、OP_WRITE)活跃时,attachment 属性类型是 NioSocketChannel;


  • line : 14 -> i.remove(); 为什么要删除?
    通过 k.attachment() 获取到 Netty 自定义 Channel 后,需要将当前的 SelectionKey 删除,「因为 Selector[1] 自己并不会主动删除已经处理完的 SelectionKey」
    这样当这个 Channel 下次再有 I/O就绪事件 时,Selector[1] 会再次将这个 Channel 对应的 SelectionKey 放入集合中。


  • line: 16 ~ line : 24 -> AbstractNioChannel 和 NioTask
    一种是熟悉的 AbstractNioChannel,不论是服务端使用的 NioServerSocketChannel 还是客户端使用的 NioSocketChannel 都继承自 AbstractNioChannel。
    另一种是 NioTask,这种类型是 Netty 提供给用的,用户可以自定义逻辑,当 Channel 上发生 I/O就绪事件 时执行。
    重点关注 AbstractNioChannel 类型的处理逻辑即可。


  • line: 18 -> processSelectedKey

    首先拿到了 Channel 对应的 unsafe,这是 Netty 中专门用于处理 Channel 底层操作的组件。然后根据收到的事件类型对 Channel 进行不同的处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public final class NioEventLoop extends SingleThreadEventLoop {
    ...
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // 获取 channel 的内部辅助类 Unsafe,通过 Unsafe 进行IO事件处理
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) { // 检查 Key 是否合法 (检查连接是否有效)
    // ... 已失效则关闭对应 Channel
    }

    try {
    // 获取IO就绪事件
    int readyOps = k.readyOps();

    // 处理 OP_CONNECT 事件
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // 将该事件从事件集合中清除,避免事件集合中一直存在连接建立事件
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);
    //触发 channelActive事件 处理 Connect事件
    unsafe.finishConnect();
    }

    // 处理 WRITE事件 (优先处理,以释放内存)
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    ch.unsafe().forceFlush();
    }

    //处理 Read事件 或者 Accept事件
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    }
    } catch (CancelledKeyException ignored) { ... }
    }
    ...
    }

    对于 parentGroup 来说,轮询到的都是 OP_ACCEPT 事件;对于 childGroup 来说,轮询到的都是 I/O 读写事件。

    OP_CONNECT

    EventLoop 内部调用了 unsafe.finishConnect(),底层调用pipeline().fireChannelActive() 方法,这时会产生一个 Inbound 事件,在Pipeline中传播,依次调用 ChannelHandler 的 channelActive() 方法,通知各个 ChannelHandler 连接建立成功;


    OP_WRITE

    内部会执行 ch.unsafe().forceFlush() 操作,将数据刷到对端,最终会调用Java NIO中 Channel .write() 方法执行底层写操作;


    OP_READ

    Netty 将 OP_READ 和 OP_ACCEPT 事件进行了统一封装,都通过 unsafe.read() 进行处理。

    • NioServerSocketChannel ,unsafe.read() 由子类 AbstractNioMessageChannel.NioMessageUnsafe#read 实现。
    • NioSocketChannel ,unsafe.read() 由子类 AbstractNioByteChannel.NioByteUnsafe#read 实现。



2.3.2. 处理优化的selectedKeys

再看下 Netty 优化过 selectedKeys 是如何处理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final class NioEventLoop extends SingleThreadEventLoop {
...
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {

// 取出 SelectionKey
final SelectionKey k = selectedKeys.keys[i];
// 快速释放,便于GC
selectedKeys.keys[i] = null;
// 从 attachment 获得 netty 的自定义 Channel
final Object a = k.attachment();

// 处理 Channel,IO事件
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

// 判断是否该再来次轮询
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
...
}

processSelectedKey 方法在 处理未优化selectedKeys 小节中分析过,此处不再赘述。

processSelectedKeysOptimized 方法与 processSelectedKeysPlain 方法最主要的区别是遍历方式不同,这是因为集合类型不同导致的。



2.3.3. 关于selectedKeys的优化

Netty 在创建 Selector[1] 时会用自定义的 SelectedSelectionKeySet 类型替换 Selector 原本内部的 selectedKeys 和 publicSelectedKeys,从 sun.nio.ch.SelectorImpl 中可以看到这两个属性原本的类型其实是 java.util.HashSet

详阅:Netty_NioEventLoop创建 中「优化NIO原生Selector」一节。



2.3.4. 关于无效key的优化

processSelectedKey 方法运行完成, 处理 Channel 的IO事件也就处理完了,但 Netty 还在此基础上做了优化,做了什么优化呢?

就是在 Netty 拿到一批 key 在进行处理的周期内,如果用户同时也在进行 Channel 的移除操作,这就意味着 Netty 拿到的这一批 key 里面,有些可能已经被用户丢掉了。

Netty 去处理这些无效的 key 会浪费很多资源。所以 Netty 会在用户移除一定数量的 Channel 之后,丢掉原有的 key,重新 select() 一批出来,以保证 key 的有效性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class NioEventLoop extends SingleThreadEventLoop {
...
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
...
// 判断是否该再来次轮询
if (needsToSelectAgain) {
// 清理未处理的 key
selectedKeys.reset(i + 1);
// 重新轮询
selectAgain();
i = -1;
}
}
}
}

清除 key 的逻辑很简单,就是把数组清空,而重新轮询就是调用一次 selectNow。


2.4. 执行异步任务

接下来 NioEventLoop 就要执行异步任务,上面IO事件处理完成后,即 io.netty.channel.nio.NioEventLoop#processSelectedKeys 方法运行完成后,最终都会处理 taskQueue 中的异步任务。

NioEventLoop#run 方法中关于异步任务的处理有两个:

  • runAllTasks() 方法,无超时时间限制。当 ioRatio 设置为 100 时,NioEventLoop 的工作线程会先执行 I/O就绪事件 ,然后再执行所有的异步任务,没有时间限制。

  • runAllTasks(long timeoutNanos) 方法,有超时时间限制。当 ioRatio != 100 时,NioEventLoop 的工作线程执行异步时间会有时间限制,在限定时间范围内,执行有限的异步任务。

这两个处理方法的核心逻辑可以概括为三步:

  1. 合并 taskQueue(普通任务队列)scheduledTaskQueue(定时任务队列/优先级队列)
  2. 统一从 taskQueue(普通任务队列) 中取出任务并执行
  3. 执行 tailTaskQueue(尾部任务队列) 中的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;

do {
//将到达执行时间的定时任务转存到普通任务队列taskQueue中,统一由工作线程从taskQueue中取出执行
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll);

if (ranAtLeastOne) {
lastExecutionTime = getCurrentTimeNanos();
}

//执行尾部任务队列
afterRunningAllTasks();
return ranAtLeastOne;
}
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
protected boolean runAllTasks(long timeoutNanos) {
// 将 定时任务 合并到 普通任务队列中
fetchFromScheduledTaskQueue();

// 处理任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}

// 计算 runAllTasks 的超时时间
final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
// 死循环执行
for (;;) {
// 执行任务
safeExecute(task);
runTasks ++;
// 每执行64个任务就检查下是否超时,超时后不再继续执行
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = getCurrentTimeNanos();
if (lastExecutionTime >= deadline) {
break;
}
}
// 从普通任务队列中取出下一个任务
task = pollTask();
if (task == null) {
lastExecutionTime = getCurrentTimeNanos();
break;
}
}

afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
...
}

2.4.1. 合并

合并定时任务到普通任务队列,这一步是调用了 fetchFromScheduledTaskQueue() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = getCurrentTimeNanos();
for (;;) {
// 从 定时任务队列 中取出执行时间小于等于当前时间的任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
if (!taskQueue.offer(scheduledTask)) {
// 任务队列中没有剩余空间,说明普通任务队列满了添加失败,
// 把定时任务放回 scheduletaskqueue 以便再次提取它,然后退出
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}

protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();

// 比较定时任务的 deadlineNanos 和当前时间,如果小于等于就取出,否则返回 null
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
return null;
}
scheduledTaskQueue.remove();
scheduledTask.setConsumed();
return scheduledTask;
}

可以看到每次取出时会比较定时任务的 deadlineNanos 和当前时间,如果小于等于就取出,否则返回 null。

由于定时任务是按执行时间排列的优先级队列,所以每次取出都是当前队列中执行时间最小的任务,如果某次取出的任务的执行时间大于当前时间,则说明队列中剩下的任务的执行时间都大于当前时间,合并操作就可以结束了。


2.5. 完整流程图

NioEventLoop的启动和运行(新标签页中打开可放大图片)

3. 小结

  • NioEventLoop 是一个基于 JDK NIO 的异步事件循环类,它负责处理一个 Channel 在它的生命周期内的所有事件

  • NioEventLoop 和 Channel 是一对多的关系,一个 NioEventLoop 对应多个 Channel,通过 I/O 多路复用来管理多个 Channel

  • NioEventLoop 任务队列是一个多生产者单消费者的队列,所有提交到 NioEventLoop 任务队列中的任务会被以 FIFO 的方式消费

  • NioEventLoop 的整个生命周期只会依赖于一个单一的线程来完成。

  • NioEventLoop 内部是一个 for 循环,不间断执行以下三件事情:

    • select:轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件;

    • processSelectedKeys:根据 SelectedKeys,处理已经准备就绪的 I/O 事件;

    • runAllTasks:执行内部队列中的任务。

NioEventLoop_run方法循环执行的三件事(图片来自网络)

NIO网络编程模型的核心如下图,一个工作线程负责一个Selector,Selector[1] 负责轮询其管理的 Channel。

回过头来看,NioEventLoop 其中封装了下图中的 Thread、Selector[1]、Channel。

NIO网络编程模型(图片来自网络)

EventLoop、EventLoopGroup、Channel 之间的关系如下图:

EventLoopGroups_EventLoop_Channel关系(图片来自网络)

4. Reference



  1. 1.java.nio.channels.Selector(多路复用器) 可以看成是操作系统底层中 select/epoll/poll 多路复用函数的Java包装类。只要 Selector 监听的任一一个 Channel 上有事件发生,就可以通过 Selector.select() 方法监测到,并且使用 SelectedKeys 可以得到对应的 Channel,然后对它们执行相应的I/O操作。