上一篇 Netty_NioEventLoop创建 大致梳理了 NioEventLoop 的创建流程,已经得知 NioEventLoop 的创建包含在 NioEventLoopGroup 的创建过程中。
本篇接着继续梳理分析 NioEventLoop 的启动和运行逻辑,值得注意的是,NioEventLoop 并不会在创建完成后立即启动和运行的。
网上很多资料都将创建的 NioEventLoopGroup 对象称之为: bossGroup(boss) 和 workGroup(work),这里OP不会这么称呼,而是采用源码中变量的命名,即: parentGroup和 childGroup。
文中图片较大,懒加载,请耐心等候。
本文 Netty 源码解析基于 4.1.86.Final 版本。
受篇幅限制,NioEventLoop 源码拆分为 Netty_NioEventLoop创建 和 Netty_NioEventLoop启动和运行
1. NioEventLoop启动
事实上,NioEventLoop 创建/启动工作线程 的时机是在第一个任务提交时。
对于 parentGroup 来说,NioEventLoop 创建/启动工作线程 的时机是在服务端 ServerBootstrap 启动时,向 parentGroup 注册 NioServerSocketChannel 的时候,详阅: Netty_ServerBootstrap启动过程 。
对于 childGroup 来说,NioEventLoop 创建/启动工作线程 的时机是在客户端通过与 parentGroup 中 EventLoop(NioServerSocketChannel) 进行交互并建立连接完成后,再通过 ServerBootstrapAcceptor 将客户端连接(socketChannel) 注册到 childGroup 的时候,详阅: Netty_ServerBootstrapAcceptor 。
此处以 服务端 ServerBootstrap 启动时,向 parentGroup 中注册 NioServerSocketChannel 场景为例,分析 NioEventLoop 启动和运行。
源码入口位于 AbstractChannel.AbstractUnsafe#register
方法,此方法的部分主要作用是将当前 Unsafe 对应的 Channel 注册到 EventLoop 的 Selector[1] 上。
1 | public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { |
注意:这里不关注 Channel 注册到 Selector[1] 上的逻辑,即不关注 register0(promise)
方法,而重点关注向 NioEventLoop 提交第一个任务触发其工作线程启动,即关注: line:14 -> eventLoop.execute
方法 。
eventLoop.execute
是一个重要方法,此方法来自接口 Executor,并在父类 SingleThreadEventExecutor 中被重写,但并没有直接向线程池中提交任务,要注意区分。
进入 SingleThreadEventExecutor.execute
方法继续往下梳理:
1 | public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { |
line: 9 ->
!(task instanceof LazyRunnable) && wakesUpForTask(task)
,判断任务是否要立即执行。!(task instanceof LazyRunnable)
:只要提交的任务不是LazyRunnable
类型,返回true,是需要立即执行的。wakesUpForTask(task)
:固定返回 true。
综上,只要提交的任务不是 LazyRunnable 类型,都是需要立即执行的,在 遗留问题(点击跳转) 小节还会有分析。line: 16 -> 将任务添加到 NioEventLoop 的 taskQueue 普通任务队列 中。
line: 35 -> 准备启动 NioEventLoop 中的工作线程。
doStartThread 方法看起来很长,简单来说就是向 NioEventLoop 内部的 ThreadPerTaskExecutor 提交一个线程, 而这个线程就是 NioEventLoop 内部的工作线程。
注意:↓↓↓↓↓第6行向线程池中提交的 Runnable,就是 NioEventLoop 中的工作线程。↓↓↓↓↓
1 | public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { |
line: 6 -> executor 是在
MultithreadEventExecutorGroup
中创建的 ThreadPerTaskExecutor , 详阅:Netty_NioEventLoop创建 中「创建线程池」一节。line: 6 -> 向 ThreadPerTaskExecutor 中提交的的这个 Runnable 线程,就是 NioEventLoop 内部的工作线程。
line: 8 ~ line: 20 -> NioEventLoop 工作线程中的核心逻辑。
SingleThreadEventExecutor.this.run();
方法是 NioEventLoop 工作线程中的核心逻辑,此方法是 SingleThreadEventExecutor 类中的抽象方法,NioEventLoop 中进行了实现,因此实际调用的是 NioEventLoop#run
方法。
2. NioEventLoop运行
通过 NioEventLoop启动(点击跳转) 小节中的分析已经知道 NioEventLoop#run
方法是 NioEventLoop 工作线程的核心逻辑,被包在 Runnable 中,放入 ThreadPerTaskExecutor 线程池运行的,并且这个 Runnable 就是 NioEventLoop 中的工作线程。
下面看看,这个 NioEventLoop#run
方法中做了些什么。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
上面这个代码有点多,稍微拆解一下:
line: 6 -> 线程中运行的是死循环逻辑, 只要不出现异常就会一直运行下去。
line: 9 ~ line: 33 -> 这块逻辑中主要做一件事: Selector[1] 轮询注册其上 Channel 感兴趣的 I/O就绪事件 & 获取轮询策略,下面单独分析。
line: 38 ~ line: 72 -> 这块逻辑中主要做两件事:处理I/O就绪事件 和 执行异步任务,下面单独分析。
2.1. 轮询IO就绪事件&轮询策略
如下提取的部分源码中,可以看出 NioEventLoop 内的工作线程主要做了下面两个事情:
- line: 7 -> 结合异步任务队列,计算轮询策略结果。
- line: 22 -> Selector[1] 轮询 I/O就绪事件。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
2.1.1. 获取轮询策略
NioEventLoop 内的工作线程最重要的事情之一就是:通过 Selector[1] 轮询注册其上的 Channel 感兴趣的 I/O就绪事件。
对于 NioServerSocketChannel 来说,因为它主要负责接收客户端连接所以监听的是
OP_ACCEPT
事件。
对于 NioSocketChannel 来说,因为它主要负责处理连接上的读写事件所以监听的是OP_READ
和OP_WRITE
事件。
selectStrategy 则用于控制轮询策略。当存在异步任务时,会优先执行 I/O就绪事件,再执行异步任务。
selectStrateg 是在 NioEventLoopGroup 的构造函数中创建的,默认实现为 DefaultSelectStrategyFactory.INSTANCE
,详阅:Netty_NioEventLoop创建 中「NioEventLoopGroup的构造函数」一节。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
Netty 中定义了三种轮询策略(io.netty.channel.SelectStrategy
):
SelectStrategy.SELECT
(-1):此时没有任何异步任务需要执行,NioEventLoop 工作线程可以安心的阻塞在 Selector[1] 上等待 I/O就绪事件 的来临。SelectStrategy.CONTINUE
(-2):重新开启一轮 IO轮询。SelectStrategy.BUSY_WAIT
(-3): NioEventLoop 内的工作线程要进行自旋轮询,由于 NIO 不支持自旋操作,所以会直接跳转到 SelectStrategy.SELECT 策略中。
在分析 selectStrategy 计算轮询策略之前首先判断当前是否有异步任务需要执行,即:hasTasks()
方法,判断依据就是查看 taskQueue 普通任务队列 和 tailTask 尾部队列 中是否有任务。
1 | public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { |
接着根据 hasTasks()
方法结果,计算轮询策略:
1 | final class DefaultSelectStrategy implements SelectStrategy { |
selectNow()
非阻塞方法返回值表示,此时有多少 I/O就绪事件 的 Channel;等于零时表示当前没有 I/O就绪事件 的 Channel。
如果 NioEventLoop 中有异步任务等待执行,
那么 NioEventLoop 中的工作线程需要立即去执行异步任务,但在calculateStrategy
方法返回之前要顺便调用selectNow()
非阻塞方法,检查当前是否有 I/O就绪事件 发生。- 如果当前有 I/O就绪事件 ,那正好可以与异步任务一起处理,
selectNow()
非阻塞方法返回值大于零; - 如果当前没有 I/O就绪事件 ,那就及时处理 taskQueue 普通任务队列 和 tailTask 尾部队列 中的异步任务,
selectNow()
非阻塞方法返回值等于零。
Netty 要求 NioEventLoop 中的工作线程优先保证 I/O就绪事件 的处理,然后再及时处理异步任务。如果当前没有 I/O就绪事件 但有异步任务需要执行,那就去执行异步任务,而不是继续阻塞等待 I/O就绪事件 。
- 如果当前有 I/O就绪事件 ,那正好可以与异步任务一起处理,
如果当前 NioEventLoop 中没有异步任务需要执行
那么calculateStrategy
方法直接返回SelectStrategy.SELECT
,即:-1 。
但当calculateStrategy
方法通过selectNow()
非阻塞方法返回非零数值时,表示此时有 IO就绪 的 Channel ,selectNow()
非阻塞方法的返回值表示有多少个 IO就绪 的 Channel。
简单小结下轮询策略,即 selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
方法的返回值含义:
返回值 = -1: switch 逻辑分支进入
SelectStrategy.SELECT
分支,表示此时 NioEventLoop 中 taskQueue(普通任务队列) 和 tailTaskQueue(尾部任务队列) 中没有异步任务需要执行,工作线程可以安心的阻塞在 Selector[1] 上等待 I/O就绪事件 发生。返回值 = 0:switch 逻辑分支进入
default
分支,表示此时没有 I/O就绪事件 但是 taskQueue(普通任务队列) 和 tailTaskQueue(尾部任务队列) 中有异步任务需要执行。返回值 > 0:switch 逻辑分支进入
default
分支,表示此时既有 I/O就绪事件 ,在 taskQueue(普通任务队列) 和 tailTaskQueue(尾部任务队列) 中也有异步任务需要执行。
2.1.2. 轮询IO就绪事件
上面是 获取轮询策略 的逻辑,下面梳理分析 轮询I/O就绪事件,也就是 line: 12 ~ line: 30 -> switch...SelectStrategy.SELECT
分支逻辑。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
粗略过一遍上面的代码,发现 NioEventLoop 中还有定时任务相关逻辑,在 Netty_NioEventLoop创建 的「创建 NioEventLoop」一节中,曾经提到过。
NioEventLoop 中的异步任务分为三类:
- 普通任务:Netty 最主要执行的异步任务,存放于 taskQueue(普通任务队列)。
- 定时任务:存放于 scheduledTaskQueue(定时任务队列/优先级队列) 中。
- 尾部任务:存放于 tailTaskQueue(尾部任务队列) ,尾部任务不常用,定时任务和普通任务执行完后才会执行尾部任
既然 NioEventLoop 需要执行定时任务,那么它就不能一直阻塞在 Selector[1] 上等待 I/O就绪事件。为了保证 NioEventLoop 的工作线程能及时执行定时任务,因此需要在即将执行第一个定时任务触发截止时间(deadlineNanos
)之前将工作线程唤醒。
所以在 NioEventLoop 工作线程开始轮询 I/O就绪事件 之前,首先要计算出工作线程在 Selector[1] 上阻塞的时间。
2.1.2.1. 计算阻塞时间
从 NioEventLoop 的 scheduledTaskQueue(定时任务队列/优先级队列) 中取出即将快要执行的定时任务,定时任务中的 deadlineNanos
属性就是该任务的执行时间。将这个 deadlineNanos
作为工作线程在 Selector[1] 上阻塞轮询的超时时间,这样就可以保证在定时任务即将要执行时,工作线程可以及时从 Selector 上被唤醒。
1 | public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { |
nextScheduledTaskDeadlineNanos()
方法会返回 scheduledTaskQueue(定时任务队列/优先级队列) 中最近一个定时任务的deadlineNanos
时间点,如果 scheduledTaskQueue(定时任务队列/优先级队列) 中没有定时任务,则返回 -1
。
NioEventLoop 中 nextWakeupNanos
属性用来存放内存工作线程从 Selector[1] 上被唤醒的时间点,如果当前 scheduledTaskQueue(定时任务队列/优先级队列) 中没有定时任务需要执行(即:-1
),那就设置为 Long.MAX_VALUE
一直阻塞轮询,直到有 I/O就绪事件 到达或者 scheduledTaskQueue(定时任务队列/优先级队列) 中有异步任务需要执行。
2.1.2.2. 开始轮询
上面确定好工作线程在 Selector[1] 上阻塞的时间后,NioEventLoop 就开始轮询 I/O就绪事件。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
在工作线程开始阻塞轮询 I/O就绪事件 之前还需要再次检查一下 taskQueue(普通任务队列) 和 tailTaskQueue(尾部任务队列) 中是否有异步任务需要执行。
如果此时恰巧有异步任务提交,就需要停止 I/O就绪事件 的轮询,转去执行异步任务。如果没有异步任务,则正式开始轮询 I/O就绪事件 。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
如果 deadlineNanos == NONE
,代表当前 NioEventLoop 的 scheduledTaskQueue(定时任务队列/优先级队列) 中没有定时任务,工作线程可以安心的阻塞在 Selector[1] 上等待 I/O就绪事件 。
需要注意,
selector.select()
是一个阻塞方法,如果没有 I/O就绪事件 到来,工作线程就会一直阻塞在此,直到 I/O就绪事件 到来。那么问题来了,如果此时工作线程正阻塞在selector.select()
方法上,等待 I/O就绪事件 的到来,而此时正好有异步任务被提交到 taskQueue(普通任务队列) 或 tailTaskQueue(尾部任务队列) 中需要执行,并且此时就是没有任何 I/O就绪事件 ,那工作线程该如何去执行异步任务呢?这个问题先 MARK,下面单独分析。
如果 deadlineNanos != NONE
,表示此时 NioEventLoop 的 scheduledTaskQueue(定时任务队列/优先级队列) 中是有定时任务需要执行的,但还没到任务触发截止时间。
deadlineNanos
单位是纳秒,表示绝对时间,所以在工作线程开始阻塞在 Selector[1] 上之前,需要计算出 当前时间(current
) 至 绝对时间(deadlineNanos
) 之间的 时间差(timeoutMillis
),并转换成毫秒。如果 deadlineNanos
小于5纳秒,则为0,否则向上取整为1毫秒,所以在有定时任务的情况下,至少要阻塞1毫秒。
当 deadlineToDelayNanos
方法计算出的结果 timeoutMillis <= 0
时,表示临近定时任务触发截止时间,需要立即返回不能阻塞在 Selector[1] 上。并且在返回前调用 selector.selectNow()
非阻塞方法,轮询一下当前是否有 I/O就绪事件 到达,防止耽误 I/O就绪事件 的处理。
当 timeoutMillis > 0
时,工作线程可以安心的阻塞在 Selector[1] 上等待 I/O就绪事件 到来,直到到达 timeoutMillis
超时时间。当有 I/O就绪事件 到达,工作线程就会被唤醒,接着工作线程就可以去处理 I/O就绪事件 。
NioEventLoop#select
方法最终返回的是 IO就绪 Channel 的数量。
2.1.2.3. 遗留问题
上面还遗留了一个问题,当工作线程正阻塞在 selector.select()
方法上,等待 I/O就绪事件 的到来,而此时正好有异步任务被提交到 taskQueue(普通任务队列) 或 tailTaskQueue(尾部任务队列) 中需要执行,并且此时就是没有任何 I/O就绪事件 ,那工作线程该如何去执行异步任务呢?
如果异步任务在被提交后希望立即得到执行,那就需要「在提交异步任务的时候去唤醒此时正阻塞在 Selector[1] 上的工作线程」。
在 NioEventLoop启动(点击跳转) 小节中梳理过 SingleThreadEventExecutor#execute
方法,例如在服务端 ServerBootstrap 启动阶段,将 NioServerSocketChannel 注册到 NioEventLoop 中的 Selector[1] 上 等重要操作都是封装成异步任务通过此方法提交到 NioEventLoop 中执行的,部分源码如下:
1 | public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { |
重点在 SingleThreadEventExecutor#execute
方法的后半段,line: 16 ~ line: 18 ,wakeup 逻辑部分。
先了解下 wakeup 逻辑相关的两个参数:addTaskWakesUp
和 immediate
:
immediate:提交的任务是否需要被立即执行。在 Netty 中只要提交的任务不是
LazyRunnable
类型,都是需要立即执行的,即:immediate = true
。addTaskWakesUp:唤醒工作线程的标识。
创建 NioEventLoop 时,addTaskWakesUp
被初始化为 false。1
addTaskWakesUp – true if and only if invocation of addTask(Runnable) will wake up the executor thread
根据 SingleThreadEventExecutor 构造函数上注释的理解,如果
addTaskWakesUp
为 true,代表「仅当调用addTask
方法时才会唤醒 NioEvnetLoop 中的工作线程,调用其他方法不会唤醒 NioEvnetLoop 中的工作线程」。
根据这个理解进行反推,如果addTaskWakesUp
为 false,代表「并不是只有调用addTask
方法才能唤醒 NioEventLoop 中的工作线程,调用其他方法也可以唤醒 NioEventLoop 中的工作线程」。需要注意的是,在 SingleThreadEventExecutor 类中除了 execute(java.lang.Runnable, boolean) 方法,还有 shutdownGracefully 方法和 shutdown 方法都用到了
addTaskWakesUp
参数进行判断,是否要唤醒 NioEventLoop 中的工作线程。
以上,只要当前使用的是 NioEventLoop,并且提交的任务不是 LazyRunnable
类型,都会进入 wakeup 逻辑部分,即:提交的任务会立即执行。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
如果 nextWakeupNanos.getAndSet(AWAKE) == AWAKE
,表示当前 NioEventLoop 内的工作线程正处于苏醒状态,因此没有必要去执行 selector.wakeup()
而重复唤醒工作线程,这样能节省一次系统调用开销。
2.2. IO事件与任务的处理分配
I/O就绪事件 无论什么时候到达,NioEventLoop 内的工作线程都需要保证 I/O就绪事件 被及时且完整的处理。为了避免工作线程因处理异步任务时间过长而导致 I/O就绪事件 不能被及时处理,使用 ioRatio
参数对执行异步任务进行限制。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
line: 8 ->
strategy
表示当前 IO就绪 Channel 的个数,详阅:开始轮询(点击跳转) 小节。line: 10 ->
ioRatio
可以通过NioEventLoop#setIoRatio
方法进行设置。line: 11 ->
ioRatio == 100
:表示无需考虑异步任务执行时间的限制。
当有 I/O就绪事件 时(即:strategy > 0
),NioEventLoop 中的工作线程需要优先处理 I/O就绪事件 ,处理完 I/O就绪事件 后执行所有的异步任务(普通任务、尾部任务、定时任务)。
line: 21 ->
strategy > 0
,同时ioRatio
设置的值是默认的 50。
先执行 I/O就绪事件 ,最后统计出执行 I/O就绪事件 的耗时(ioTime),接着根据ioTime * (100 - ioRatio) / ioRatio
计算出,后面用于执行异步任务的剩余时间。
也就是说,NioEventLoop 的工作线程需要在剩余时间内执行有限的异步任务,防止 NioEventLoop 的工作线程处理异步任务时间过长而导致 I/O就绪事件 得不到及时处理。执行 I/O就绪事件 和执行异步任务的用时比例是一比一,
ioRatio
设置的越高,NioEventLoop 的工作线程执行异步任务的时间占比越小。line: 31 -> 没有 I/O就绪事件 需要处理
Netty 允许 NioEventLoop 工作线程最多执行 64 个异步任务,然后结束,继续轮训 I/O就绪事件。
核心目的还是防止 NioEventLoop 工作线程处理异步任务时间过长,导致 I/O就绪事件 得不到及时处理。
2.3. 处理I/O就绪事件
轮询 Select 后,NioEventLoop 已经获取到准备就绪的 I/O 事件,接下来需要调用 processSelectedKeys()
方法处理IO事件:
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
在 Netty_NioEventLoop创建 中「优化NIO原生Selector」一节中曾经提到,出于性能考虑将 selectedKeys 集合的类型由原来的 java.util.HashSet
,替换成 Netty 自己用数组实现的 SelectedSelectionKeySet
;通过 DISABLE_KEY_SET_OPTIMIZATION 开关控制是否优化,默认是需要优化的。
优化开关开启的情况下,Netty 将创建的 SelectedSelectionKeySet 集合保存在 NioEventLoop#selectedKeys
属性中,方便 NioEventLoop 工作线程直接从这里获取 I/O就绪事件 的 SelectionKey。
优化开关关闭的情况下,Netty 会直接采用默认实现,此时 NioEventLoop#selectedKeys
属性为 NULL。
line: 6 -> processSelectedKeysOptimized 方法中处理的是 Netty 优化过的类型
SelectedSelectionKeySet
;line: 9 -> processSelectedKeysPlain 中处理的是默认实现。
2.3.1. 处理未优化selectedKeys
当注册在 java.nio.channels.Selector
[1] 上的 Channel 发生 I/O就绪事件 时,java.nio.channels.Selector
会将 I/O就绪事件 的 SelectionKey 保存到 sun.nio.ch.SelectorImpl#selectedKeys
集合中。接着 NioEventLoop 工作线程会从 java.nio.channels.Selector#select(long)
阻塞调用中返回。
因此在调用 processSelectedKeysPlain 方法前先要通过 java.nio.channels.Selector#selectedKeys
方法获取 I/O就绪事件 的 SelectionKey 集合。
1 | processSelectedKeysPlain(selector.selectedKeys()); |
line: 3 ->
selectedKeys
集合中存放的全部是 I/O就绪事件 的java.nio.channels.SelectionKey
,line: 9 ~ line: 10 -> 通过获取
HashSet
的迭代器,逐个处理 I/O就绪事件 的 Channel。line: 12 ->
final Object a = k.attachment();
中存放的是什么?
在 Netty_ServerBootstrap启动过程 中「doRegister」一节中梳理过,通过调用 JDK 底层 NIO APIjava.nio.channels.SelectableChannel#register
,将 NioServerSocketChannel 中包装的 JDK NIO 原生 ServerSocketChannel 注册到 Selector[1] 上。以此为例,在调用 register 方法时,通过 this 指针将 NioServerSocketChannel 自己作为 SelectionKey 的 attachment 属性注册到 Selector[1] 中。「这一步完成了 Netty 自定义 Channel 和 JDK NIO 原生 Channel 的绑定」。1
2
3
4
5
6
7
8
9
10
11// io.netty.channel.nio.AbstractNioChannel#doRegister
protected void doRegister() throws Exception {
// 表示注册操作是否成功
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {...}
}
}对于客户端连接事件(OP_ACCEPT)活跃时,attachment 属性类型是 NioServerSocketChannel;
对于客户端读写事件(OP_READ、OP_WRITE)活跃时,attachment 属性类型是 NioSocketChannel;line : 14 ->
i.remove();
为什么要删除?
通过k.attachment()
获取到 Netty 自定义 Channel 后,需要将当前的 SelectionKey 删除,「因为 Selector[1] 自己并不会主动删除已经处理完的 SelectionKey」。
这样当这个 Channel 下次再有 I/O就绪事件 时,Selector[1] 会再次将这个 Channel 对应的 SelectionKey 放入集合中。line: 16 ~ line : 24 -> AbstractNioChannel 和 NioTask
一种是熟悉的 AbstractNioChannel,不论是服务端使用的 NioServerSocketChannel 还是客户端使用的 NioSocketChannel 都继承自 AbstractNioChannel。
另一种是 NioTask,这种类型是 Netty 提供给用的,用户可以自定义逻辑,当 Channel 上发生 I/O就绪事件 时执行。
重点关注 AbstractNioChannel 类型的处理逻辑即可。line: 18 -> processSelectedKey
首先拿到了 Channel 对应的 unsafe,这是 Netty 中专门用于处理 Channel 底层操作的组件。然后根据收到的事件类型对 Channel 进行不同的处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public final class NioEventLoop extends SingleThreadEventLoop {
...
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 获取 channel 的内部辅助类 Unsafe,通过 Unsafe 进行IO事件处理
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) { // 检查 Key 是否合法 (检查连接是否有效)
// ... 已失效则关闭对应 Channel
}
try {
// 获取IO就绪事件
int readyOps = k.readyOps();
// 处理 OP_CONNECT 事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 将该事件从事件集合中清除,避免事件集合中一直存在连接建立事件
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//触发 channelActive事件 处理 Connect事件
unsafe.finishConnect();
}
// 处理 WRITE事件 (优先处理,以释放内存)
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//处理 Read事件 或者 Accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) { ... }
}
...
}对于 parentGroup 来说,轮询到的都是 OP_ACCEPT 事件;对于 childGroup 来说,轮询到的都是 I/O 读写事件。
OP_CONNECT
EventLoop 内部调用了
unsafe.finishConnect()
,底层调用pipeline().fireChannelActive()
方法,这时会产生一个 Inbound 事件,在Pipeline中传播,依次调用 ChannelHandler 的 channelActive() 方法,通知各个 ChannelHandler 连接建立成功;OP_WRITE
内部会执行
ch.unsafe().forceFlush()
操作,将数据刷到对端,最终会调用Java NIO中Channel .write()
方法执行底层写操作;OP_READ
Netty 将 OP_READ 和 OP_ACCEPT 事件进行了统一封装,都通过
unsafe.read()
进行处理。- NioServerSocketChannel ,
unsafe.read()
由子类AbstractNioMessageChannel.NioMessageUnsafe#read
实现。 - NioSocketChannel ,
unsafe.read()
由子类AbstractNioByteChannel.NioByteUnsafe#read
实现。
- NioServerSocketChannel ,
2.3.2. 处理优化的selectedKeys
再看下 Netty 优化过 selectedKeys 是如何处理的。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
processSelectedKey 方法在 处理未优化selectedKeys 小节中分析过,此处不再赘述。
processSelectedKeysOptimized
方法与 processSelectedKeysPlain
方法最主要的区别是遍历方式不同,这是因为集合类型不同导致的。
2.3.3. 关于selectedKeys的优化
Netty 在创建 Selector[1] 时会用自定义的 SelectedSelectionKeySet 类型替换 Selector 原本内部的 selectedKeys 和 publicSelectedKeys,从 sun.nio.ch.SelectorImpl
中可以看到这两个属性原本的类型其实是 java.util.HashSet
。
详阅:Netty_NioEventLoop创建 中「优化NIO原生Selector」一节。
2.3.4. 关于无效key的优化
processSelectedKey
方法运行完成, 处理 Channel 的IO事件也就处理完了,但 Netty 还在此基础上做了优化,做了什么优化呢?
就是在 Netty 拿到一批 key 在进行处理的周期内,如果用户同时也在进行 Channel 的移除操作,这就意味着 Netty 拿到的这一批 key 里面,有些可能已经被用户丢掉了。
Netty 去处理这些无效的 key 会浪费很多资源。所以 Netty 会在用户移除一定数量的 Channel 之后,丢掉原有的 key,重新 select() 一批出来,以保证 key 的有效性。
1 | public final class NioEventLoop extends SingleThreadEventLoop { |
清除 key 的逻辑很简单,就是把数组清空,而重新轮询就是调用一次 selectNow。
2.4. 执行异步任务
接下来 NioEventLoop 就要执行异步任务,上面IO事件处理完成后,即 io.netty.channel.nio.NioEventLoop#processSelectedKeys
方法运行完成后,最终都会处理 taskQueue 中的异步任务。
在 NioEventLoop#run
方法中关于异步任务的处理有两个:
runAllTasks()
方法,无超时时间限制。当ioRatio
设置为 100 时,NioEventLoop 的工作线程会先执行 I/O就绪事件 ,然后再执行所有的异步任务,没有时间限制。runAllTasks(long timeoutNanos)
方法,有超时时间限制。当ioRatio != 100
时,NioEventLoop 的工作线程执行异步时间会有时间限制,在限定时间范围内,执行有限的异步任务。
这两个处理方法的核心逻辑可以概括为三步:
- 合并 taskQueue(普通任务队列) 和 scheduledTaskQueue(定时任务队列/优先级队列)。
- 统一从 taskQueue(普通任务队列) 中取出任务并执行
- 执行 tailTaskQueue(尾部任务队列) 中的任务。
1 | public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { |
1 | public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { |
2.4.1. 合并
合并定时任务到普通任务队列,这一步是调用了 fetchFromScheduledTaskQueue()
方法。
1 | private boolean fetchFromScheduledTaskQueue() { |
可以看到每次取出时会比较定时任务的 deadlineNanos 和当前时间,如果小于等于就取出,否则返回 null。
由于定时任务是按执行时间排列的优先级队列,所以每次取出都是当前队列中执行时间最小的任务,如果某次取出的任务的执行时间大于当前时间,则说明队列中剩下的任务的执行时间都大于当前时间,合并操作就可以结束了。
2.5. 完整流程图
3. 小结
NioEventLoop 是一个基于 JDK NIO 的异步事件循环类,它负责处理一个 Channel 在它的生命周期内的所有事件。
NioEventLoop 和 Channel 是一对多的关系,一个 NioEventLoop 对应多个 Channel,通过 I/O 多路复用来管理多个 Channel。
NioEventLoop 任务队列是一个多生产者单消费者的队列,所有提交到 NioEventLoop 任务队列中的任务会被以 FIFO 的方式消费。
NioEventLoop 的整个生命周期只会依赖于一个单一的线程来完成。
NioEventLoop 内部是一个 for 循环,不间断执行以下三件事情:
select:轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件;
processSelectedKeys:根据 SelectedKeys,处理已经准备就绪的 I/O 事件;
runAllTasks:执行内部队列中的任务。
NIO网络编程模型的核心如下图,一个工作线程负责一个Selector,Selector[1] 负责轮询其管理的 Channel。
回过头来看,NioEventLoop 其中封装了下图中的 Thread、Selector[1]、Channel。
EventLoop、EventLoopGroup、Channel 之间的关系如下图:
4. Reference
- 聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)
- 一文聊透Netty核心引擎Reactor的运转架构
- 透彻理解Java网络编程(十一)——Netty原理:EventLoopGroup和EventLoop
- 04 事件调度层:为什么 EventLoop 是 Netty 的精髓?
- Netty 核心源码解读 —— EventLoop 篇
- Netty 源码浅析——NioEventLoop
- 1.java.nio.channels.Selector(多路复用器) 可以看成是操作系统底层中 select/epoll/poll 多路复用函数的Java包装类。只要 Selector 监听的任一一个 Channel 上有事件发生,就可以通过 Selector.select() 方法监测到,并且使用 SelectedKeys 可以得到对应的 Channel,然后对它们执行相应的I/O操作。 ↩