Netty_NioEventLoop创建
2025-01-22 08:19:30    4.8k 字   
This post is also available in English and alternative languages.

NioEventLoop 是 Netty 中的核心之一,通过阅读源码了解其 创建 -> 启动 -> 运行 的过程。

  • 网上很多资料都将创建的 NioEventLoopGroup 对象称之为: bossGroup(boss) 和 workGroup(work),这里OP不会这么称呼,而是采用源码中变量的命名,即: parentGroupchildGroup

  • 文中图片较大,懒加载,请耐心等候

  • 本文 Netty 源码解析基于 4.1.86.Final 版本。

  • 另注: 受篇幅限制,NioEventLoop 源码拆分为 Netty_NioEventLoop创建Netty_NioEventLoop启动和运行


1. 概览

EventLoop 直译的话是 事件循环 的意思,在 Netty 中的作用也就是字面意思;
每当事件发生时,会将产生的事件放入事件队列当中,然后 EventLoop 会轮询从队列中取出事件执行或者将事件分发给相应的事件监听者执行。事件执行的方式通常分为立即执行、延后执行、定期执行几种。

EventLoop通用运行模式(图片来自网络)

Netty 的 EventLoop 是协同设计的一部分,它继承了JDK中 java.util.concurrent 相关并发类,用来执行线程相关操作,其次还继承了 io.netty.channel 包中的类,方便与 Channel 进行交互。


Netty 默认提供了如 NioEventLoop 等多种实现,本篇围绕 NioEventLoop,其余的 EventLoop 实现不展开讨论

EventLoop接口的实现

NioEventLoop 的生命周期分为三个过程:创建 -> 启动 -> 运行,本篇也会围绕这三个过程展开,在整个 Netty Reactor 工作架构中的位置大约如下红框所示(NioEventLoop 的创建包含在 NioEventLoopGroup 的创建过程中)。

Netty Reactor 工作架构图(图片来自网络)

2. NioEventLoopGroup的构造函数

NioEventLoop 的创建发生在创建 NioEventLoopGroup 时,即: new NioEventLoopGroup() 时,先跟随 NioEventLoopGroup 的构造函数,看下其中做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public NioEventLoopGroup() {
this(0);
}

public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

// io.netty.channel.MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

// io.netty.util.concurrent.MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

// io.netty.util.concurrent.MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
}
  • line: 2 -> NioEventLoopGroup 默认构造函数中,nThreads 参数为0,后面将依据此参数确定创建 NioEventLoop 的数量。

  • line: 10 -> SelectorProvider.provider() 方法用来创建 java.nio.channels.spi.SelectorProvider [2]

  • line: 15 -> DefaultSelectStrategyFactory.INSTANCE ,NioEventLoop 最重要的事情之一就是:轮询注册其上 Channel 的 I/O就绪事件,SelectStrategyFactory 用于指定轮询策略。

  • line: 25 -> NioEventLoopGroup 父类 MultithreadEventLoopGroup 中的 DEFAULT_EVENT_LOOP_THREADS 属性决定了创建 NioEventLoop 的数量,暂时不表下面单独展开。

  • line: 30 -> DefaultEventExecutorChooserFactory.INSTANCE,创建 chooser ,即选择器, 可以看做是负载均衡器, 用于从众多 NioEventLoop 中选取一个 NioEventLoop,暂时不表下面单独展开。

  • line: 34 -> NioEventLoopGroup 父类 MultithreadEventExecutorGroup 的构造函数是 NioEventLoopGroup 构造函数调用链路中的核心逻辑所在,其中包含创建 NioEventLoop,下面单独展开。


2.1. 确定创建NioEventLoop数量

在 NioEventLoopGroup 父类 MultithreadEventLoopGroup 的构造函数中确定了创建 NioEventLoop 的数量。

1
2
3
4
5
6
7
8
9
10
// io.netty.channel.MultithreadEventLoopGroup
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
...
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
  • line: 3 ~ line: 4 -> 通过系统变量 io.netty.eventLoopThreads 获取数量,如没有指定,那默认就是 NettyRuntime.availableProcessors() * 2,也就是 CPU可用核数 * 2

因此使用默认构造函数创建 NioEventLoopGroup ,nThreads 参数即为 0,NioEventLoopGroup 内的 NioEventLoop 数量为 CPU可用核数 * 2


2.2. 核心逻辑

NioEventLoopGroup的构造函数 (点击跳转) 小节中提到过,父类 MultithreadEventExecutorGroup 构造函数是 NioEventLoopGroup 构造函数调用链路中的核心逻辑所在,接下来直接看 MultithreadEventExecutorGroup 构造函数,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
...
// NioEventLoopGroup 中保存 NioEventLoop 的集合
private final EventExecutor[] children;

// 选择器, 可以看做是负载均衡器, 用于从 children 集合中选取一个 NioEventLoop。
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
// 没有指定线程池,则创建默认.
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

// 创建内部的 eventLoop 数组,nThreads 默认为CPU核数*2
children = new EventExecutor[nThreads];

// 创建对应 nThreads 数量的 eventLoop 对象
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建 eventLoop 对象,添加到 children 数组中
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) { ... } finally { ... }
}

// Chooser 本质可以看成一个负载均衡器,用于选择一个内部的 eventLoop
chooser = chooserFactory.newChooser(children);
...
}
...
}
  • line: 18 -> 创建用于运行 NioEventLoop 的默认线程池。
    如果没有指定线程池,则创建默认线程池(ThreadPerTaskExecutor),NioEventLoopGroup 中所有的 NioEventLoop 都运行在这个线程池中。

  • line: 29 -> 创建 NioEventLoop,调用子类实现的 NioEventLoopGroup#newChild 方法。

  • line: 35 -> 创建 chooser。


3. 创建 NioEventLoop

以上,创建 NioEventLoop 的关键是 MultithreadEventExecutorGroup#newChild 抽象方法,NioEventLoopGroup 进行了具体实现,此处这里实际调用的是 NioEventLoopGroup#newChild 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
// 创建对应 nThreads 数量的 eventLoop 对象
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建 eventLoop 对象, 并添加到 children 数组中
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) { ... } finally { ... }
}
...
}

public class NioEventLoopGroup extends MultithreadEventLoopGroup {

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// 以下都是用于构建 eventLoop 的参数

// NIO工具类,SelectorProvider 使用了JDK的 SPI机制 来创建Selector、ServerSocketChannel、SocketChannel 等对象;
SelectorProvider selectorProvider = (SelectorProvider) args[0];

// SelectStrategy 是 Netty 用来控制 eventLoop 轮询方式的策略,此处 args[0] = DefaultSelectStrategyFactory.INSTANCE;
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];

// 线程池的任务拒绝策略,默认是抛出 RejectedExecutionException 异常;
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];

// 任务队列工厂类,每一个EventLoop对象内部都会包含一个任务队列
// 这个工厂类就是用来创建队列的,默认会创建一个Netty自定义线程安全的 MpscUnboundedArrayQueue 无锁队列。
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;

int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}

return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
}
  • line: 25 -> NioEventLoop 最重要的事情之一就是:通过 Selector[1]轮询注册其上的 Channel 感兴趣的 I/O就绪事件,SelectStrategyFactory 用于指定轮询策略,默认是:DefaultSelectStrategyFactory.INSTANCE


  • line: 32 & line: 33 -> 关于 EventLoopTaskQueueFactory。
    Netty 中的 NioEventLoop 主要工作是轮询注册其上所有 Channel 的 I/O就绪事件 并处理。
    除了这些主要的工作外,Netty 为了极致的压榨 NioEventLoop 的性能,还会让它执行一些异步任务。既然要执行异步任务,那么 NioEventLoop 中就需要队列来保存任务。
    此处的 EventLoopTaskQueueFactory 就是用来创建队列,以保存 NioEventLoop 中待执行的异步任务。


  • 关于异步任务
    NioEventLoop 中的异步任务分为三类:

    • 普通任务(line: 32):Netty 最主要执行的异步任务,存放在 taskQueue(普通任务队列) 中。
    • 尾部任务(line: 33):存放在 tailTaskQueue(尾部任务队列) 中,尾部任务不常用,定时任务和普通任务执行完后才会执行尾部任务。
    • 定时任务:存放在 scheduledTaskQueue(定时任务队列/优先级队列) 中。

  • line: 43 -> 调用 NioEventLoop 构造函数,创建 NioEventLoop。


3.1. NioEventLoop构造函数

1
2
3
4
5
6
7
8
9
10
11
12
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 创建 selector
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
  • line: 4 -> 向上调用父类构造函数。

  • line: 4 -> newTaskQueue(taskQueueFactory) ,用于创建 NioEventLoop 内部的 taskQueue(普通任务队列) , 普通任务是 Netty 中最主要执行的异步任务。

  • line: 4 -> newTaskQueue(tailTaskQueueFactory) ,用于创建 NioEventLoop 内部的 tailTaskQueue(尾部任务队列) , 尾部任务不常用, 主要是统计数据的场景。

  • line: 9 -> 创建和优化 Selector [1] 逻辑。


3.1.1. 创建内部队列

NioEventLoop 构造函数中的 newTaskQueue 方法用于创建内部队列;
在 NioEventLoop 的父类 SingleThreadEventLoop 中提供了一个静态变量 DEFAULT_MAX_PENDING_TASKS 用来指定 NioEventLoop 内任务队列的大小。
可以通过系统变量 io.netty.eventLoop.maxPendingTasks 进行设置,默认为 Integer.MAX_VALUE,即为无界队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
...
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
...
// 创建队列
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
...
}

NioEventLoop 中的异步任务队列的类型为 MpscQueue,它是由 JCTools 提供的一个高性能无锁队列,从命名前缀 Mpsc 可以看出,它适用于 多生产者单消费者 的场景,它支持多个生产者线程安全的访问队列,同一时刻只允许一个消费者线程读取队列中的元素。


3.1.2. 创建Selector

NioEventLoop 构造函数中 openSelector() 方法用于创建 Selector[1] ,而且 Netty 还会在 openSelector() 方法中对创建出来的 Selector [1] 进行存储结构优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class NioEventLoop extends SingleThreadEventLoop {
...
// java.nio.channels.Selector
private Selector selector;

private Selector unwrappedSelector;
...
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 创建 JDK NIO 原生 Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) { ... }
...
}
...
}
  • line: 13 -> 创建 Selector [1]
    SelectorProvider [2]会根据操作系统、JDK版本选择不同的 Selector [1] 实现,Linux下会选择 Epoll,Mac下会选择 Kqueue
    注意,此时 unwrappedSelector 属性中存储的是 JDK NIO 原生 Selector [1],还没有被优化过。

3.1.3. 优化NIO原生Selector

在 NioEventLoop 中有一个优化开关 DISABLE_KEY_SET_OPTIMIZATION,通过系统变量 io.netty.noKeySetOptimization 指定,默认是开启的,表示需要对 JDK NIO 原生 Selector [1]进行优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final class NioEventLoop extends SingleThreadEventLoop {
...
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
...
private SelectorTuple openSelector() {
...
try {
// 创建 JDK NIO 原生 Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) { ... }

// 默认是会优化的,否则直接返回 JDK NIO 原生 Selector
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
...
}
...
}

如果优化开关 DISABLE_KEY_SET_OPTIMIZATION 是关闭的,那么直接返回没有优化过的 JDK NIO 原生 Selector [1]

不过注意,openSelector 方法并没有直接返回 Selector [1],而是将 Selector 包装在 NioEventLoop.SelectorTuple 中返回的,主要是为了兼容下面 Netty 优化后的数据结构 SelectedSelectionKeySet

以下是 Netty 优化 JDK NIO 原生 Selector [1]的逻辑,总共分为四步:

  1. 获取 SelectorImpl。
  2. 判断是否是 JDK NIO 原生 Selector [1]
  3. 替换 HashSet。
  4. 包装返回。



3.1.3.1. 获取 SelectorImpl

获取 JDK NIO 原生 Selector [1]的抽象实现类, 即: sun.nio.ch.SelectorImplJDK NIO 原生 Selector [1]的实现都继承于该抽象类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class NioEventLoop extends SingleThreadEventLoop {
...
private SelectorTuple openSelector() {
...
// 获取 JDK NIO 原生 Selector 抽象实现类, 即: sun.nio.ch.SelectorImpl
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
return Class.forName("sun.nio.ch.SelectorImpl", false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) { ... }
}
});
...
}
}

sun.nio.ch.SelectorImpl 部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public abstract class SelectorImpl extends AbstractSelector {

// IO就绪的 SelectionKey
protected Set<SelectionKey> selectedKeys;

// 注册在该 Selector 上的所有 SelectionKey
protected HashSet<SelectionKey> keys;

// 用于向调用线程返回的 keys, 不可变
private Set<SelectionKey> publicKeys;

// 当有IO就绪的 SelectionKey 时, 向调用线程返回, 只可删除其中元素,不可增加
private Set<SelectionKey> publicSelectedKeys;

protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
if (Util.atBugLevel("1.4")) {
...
} else {
// 不可变
publicKeys = Collections.unmodifiableSet(keys);
// 只可删除元素, 不可以增加
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
...
}

重点关注抽象类 sun.nio.ch.SelectorImpl 中的 selectedKeyspublicSelectedKeys 这两个字段,注意它们的类型都是 HashSet ,后面优化的就是这里



3.1.3.2. 判断是否是JDK NIO 原生 Selector

上面获取 sun.nio.ch.SelectorImpl 抽象实现类,用于判断由 SelectorProvider[2] 创建出来的 java.nio.channels.Selector 是否是 JDK默认实现

因为 SelectorProvider[2] 可以自定义加载,因此创建出来的 java.nio.channels.Selector [1]并不一定是 JDK NIO 原生 Selector [1]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class NioEventLoop extends SingleThreadEventLoop {
...
private SelectorTuple openSelector() {
...
// 只针对 JDK NIO 原生 Selector 的实现类进行优化。
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
...
}
...
}

而 Netty 优化针对的是 JDK NIO 原生 Selector [1],如果 sun.nio.ch.SelectorImpl 类不是 java.nio.channels.Selector 的父类,说明是自定义加载的,不用优化直接返回,中断后续优化步骤。



3.1.3.3. 替换HashSet

创建 SelectedSelectionKeySet,通过反射替换掉 sun.nio.ch.SelectorImpl 类中 selectedKeyspublicSelectedKeys 这两个属性的默认 HashSet 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public final class NioEventLoop extends SingleThreadEventLoop {
...
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
...
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

// SelectedSelectionKeySet 是 netty 优化后的数据结构
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged((PrivilegedAction)() -> {
try {

// 用反射的方式, 把 Selector 内部的 selectedKeys 和 publicSelectedKeys ,
// 替换成 netty 优化后的 SelectedSelectionKeySet
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
...
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
...
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) { ... }
});
...
}
}
  • line: 4 -> 被 Netty 优化过的 Selector [1],其中 publicKeyspublicSelectedKeys 属性的类型由 HashSet 被替换成了 SelectedSelectionKeySet。

  • line: 16 ~ line 17 -> 通过反射获取 sun.nio.ch.SelectorImpl 类中 selectedKeyspublicSelectedKeys

  • line: 19 ~ line: 29 -> 通过反射的方式,用 SelectedSelectionKeySet 替换掉 HashSet 实现的SelectorImpl#selectedKeysSelectorImpl#publicSelectedKeys


为什么要用 SelectedSelectionKeySet 替换掉原来的 HashSet 呢?

这就涉及到对 HashSet 类型的两种集合操作:

  • 插入操作Selector[1] 监听到 IO就绪java.nio.channels.SelectionKey 后, 会将这些 java.nio.channels.SelectionKey 插入到 sun.nio.ch.SelectorImpl#selectedKeys 集合中。

  • 遍历操作:NioEventLoop 内的工作线程会遍历 sun.nio.ch.SelectorImpl#selectedKeys 集合, 获取 I/O就绪 的 SocketChannel, 并处理 SocketChannel 上的 I/O就绪事件

翻阅源码, HashSet 的底层数据结构是一个 Hash表,因此存在 Hash冲突 的可能,所以会导致同样是插入和遍历操作, HashSet 的性能不如数组好。还有一个重要原因是,数组可以利用CPU缓存的优势提高遍历效率。

SelectedSelectionKeySet 内部维护一个数组,有一个变量 size 标识数组的逻辑长度。
每次 add 时,会把元素添加到数组的逻辑尾部,然后逻辑长度+1,当逻辑长度等于物理长度时,数组扩容。
相比于 HashSet 的实现,这种方式不需要考虑哈希冲突,是真正的 O(1) 时间复杂度。而调用 iterator 时相当于遍历数组,也比遍历 HashSet 更加高效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;

SelectedSelectionKeySet() { keys = new SelectionKey[1024]; }

@Override
public boolean add(SelectionKey o) {
if (o == null) { return false; }
keys[size++] = o;
if (size == keys.length) { increaseCapacity(); }
return true;
}
...
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() { return idx < size; }
@Override
public SelectionKey next() {
if (!hasNext()) { throw new NoSuchElementException(); }
return keys[idx++];
}
@Override
public void remove() { throw new UnsupportedOperationException(); }
};
}
...
}



3.1.3.4. 包装返回

完成 SelectorImpl#selectedKeysSelectorImpl#publicSelectedKeys 属性的类型的替换后,将优化 SelectedSelectionKeySet 类型的 selectedKeySet 设置到 NioEventLoop#selectedKeys 属性中保存,并将结果包装成 NioEventLoop.SelectorTuple 返回。

1
2
3
4
5
6
7
8
9
10
public final class NioEventLoop extends SingleThreadEventLoop {
...
private SelectorTuple openSelector() {
...
selectedKeys = selectedKeySet;
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
...
}
}

以上,Netty 为了优化对 SelectorImpl#selectedKeysSelectorImpl#publicSelectedKeys 属性的集合操作性能, 使用 SelectedSelectionKeySet 来替换掉 sun.nio.ch.SelectorImpl 类中 selectedKeyspublicSelectedKeys 这两个字段的默认 HashSet 实现。


3.2. 父类构造函数

3.2.1. SingleThreadEventLoop

NioEventLoop 父类 SingleThreadEventLoop 的构造函数中也没有做什么处理,继续向上调用父类构造函数。

1
2
3
4
5
6
7
8
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp,
Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
// addTaskWakesUp 默认为false, 用于标记 添加的任务是否会唤醒线程
// 调用父类
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}

line: 6 -> addTaskWakesUp 参数默认为 false, 用于标记 添加的任务是否会唤醒线程

单独看的话, SingleThreadEventLoop 类负责对 tailTaskQueue(尾部任务队列) 进行管理, 并且提供向 NioEventLoop 注册 Channel 的行为。


3.2.2. SingleThreadEventExecutor

SingleThreadEventExecutor 中也没有什么处理,大部分向上透传的参数都保存在了 SingleThreadEventExecutor 类中。

1
2
3
4
5
6
7
8
9
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp,
Queue<Runnable> taskQueue,RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

单独看的话,SingleThreadEventExecutor 类主要负责对 taskQueue(普通任务队列) 的管理, 以及异步任务的执行, NioEventLoop 中工作线程的启停。


4. 创建chooser

chooser 可以看做是负载均衡器, 用于从 children 集合中选取一个 NioEventLoop。

确定创建NioEventLoop数量(点击跳转) 小节中知道,NioEventLoopGroup 中默认会创建 CPU可用核数 * 2 个 NioEventLoop,当客户端连接完成三次握手后,parentGroup 会创建 NioSocketChannel,并将 NioSocketChannel 注册到 childGroup 中某个 NioEventLoop 的 Selector[1] 上,那么具体注册到childGroup 的哪个 NioEventLoop 上呢?

这个选取策略就是由 chooserFactory 来创建的,默认为 DefaultEventExecutorChooserFactory。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
// 创建对应 nThreads 数量的 eventLoop 对象
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建 eventLoop 对象,添加到 children 数组中
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {...} finally {...}
}

// Chooser 本质可以看成一个负载均衡器,用于选择一个内部的 eventLoop
chooser = chooserFactory.newChooser(children);

...
}

在 newChooser 方法中,会根据 NioEventLoop 的数量选择不同的 chooser 实现,选择的依据是数量是否是2的幂次方。

1
2
3
4
5
6
7
8
9
10
// io.netty.util.concurrent.DefaultEventExecutorChooserFactory#newChooser

public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 根据待绑定的 executor 数量是否是2的幂次方,选择不同的'选择器实现'
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

5. NioEventLoop创建流程图

NioEventLoop的创建(新标签页中打开可放大图片)

以上是创建 NioEventLoop 的过程,NioEventLoop 的启动和运行,在下篇 Netty_NioEventLoop启动和运行 中梳理分析。


6. Reference



  1. 1.java.nio.channels.Selector(多路复用器) 可以看成是操作系统底层中 select/epoll/poll 多路复用函数的Java包装类。只要 Selector 监听的任一一个 Channel 上有事件发生,就可以通过 Selector.select() 方法监测到,并且使用 SelectedKeys 可以得到对应的 Channel,然后对它们执行相应的I/O操作。
  2. 2.java.nio.channels.spi.SelectorProvider,相当于一个工厂类,提供了对 java.nio.channels.ServerSocketChannel、java.nio.channels.SocketChannel、java.nio.channels.Selector 等创建方法。