Netty_ServerBootstrapAcceptor
2025-01-22 08:19:30    1.6k 字   
This post is also available in English and alternative languages.

ServerBootstrapAcceptor 是 Netty 服务端用来连接 parentGroup 和 childGroup的核心类,在 Netty_ServerBootstrap启动过程 的「注册Channel」一节中曾分析过向 parentGroup 注册 NioServerSocketChannel,现在看下是如何向 childGroup 中注册 NioSocketChannel 的

在整个 Netty Reactor 工作架构中的位置大约如下红框所示:

Netty Reactor 工作架构图(图片来自网络)
  • 网上很多资料都将创建的 NioEventLoopGroup 对象称之为: bossGroup(boss) 和 workGroup(work),这里OP不会这么称呼,而是采用源码中变量的命名,即: parentGroupchildGroup

  • 本文 Netty 源码解析基于 4.1.86.Final 版本。


1. ServerBootstrapAcceptor

向 childGroup 中注册 Channel 的时机比较特殊,并不是在服务端启动时触发注册,而是在客户端通过与 parentGroup 中 EventLoop(NioServerSocketChannel) 进行交互并建立连接完成后,再通过 ServerBootstrapAcceptor 将客户端连接(socketChannel) 注册到 childGroup 中。

ServerBootstrapAcceptor 是 Netty 服务端用来接收客户端连接的核心类,它的作用是当新的 SocketChannel 连接时,将 SocketChannel 注册到 childGroup 中的一个 NioEventLoop 上,继续监听相关读写事件。

核心代码: ServerBootstrap.ServerBootstrapAcceptor#channelRead,看下这里是如何处理新连接的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 此处的 msg,是已建立链接的 NioSocketChannel,因此可以直接强转
final Channel child = (Channel) msg;
// 将 childHandler 添加到 已建立连接的 NioSocketChannel 的 pipeline 中。
child.pipeline().addLast(childHandler);
// 将 childOptions 添加到 已建立连接的 NioSocketChannel 中
setChannelOptions(child, childOptions, logger);
// 将 childAttrs 添加到 已建立连接的 NioSocketChannel 中
setAttributes(child, childAttrs);

try {
// 将已建立链接的 NioSocketChannel 注册到 childGroup,
// 实现类 MultithreadEventLoopGroup.register(io.netty.channel.Channel)
childGroup.register(child).addListener(new ChannelFutureListener() { ... });
} catch (Throwable t) { ... }
}

当有客户端连接时,触发 channelRead 方法;NioEventLoop 会监听 Selector[1] 相关事件,有 OP_ACCEPT 事件时,会触发 Unsafe.read() ,具体实现在 AbstractNioMessageChannel.NioMessageUnsafe#read() 方法中,它会调用 ServerSocketChannel.accept() 获取客户端连接,并触发 channelRead 回调

  • line:7 -> 设置 NioSocketChannel 的 pipeline。
  • line:9 -> 设置 NioSocketChannel 的 options。
  • line:11 -> 设置 NioSocketChannel 的 attrs。
  • line:16 -> 将 NioSocketChannel 注册到 childGroup 中某个 NioEventLoop 的 Selector(多路复用器)[1] 上。

2. 注册的过程

向 childGroup 中注册 NioSocketChannel 的过程,与向 parentGroup 中注册 NioServerSocketChannel 一样,详阅:Netty_ServerBootstrap启动过程 中「注册Channel」小节。


3. 何时触发channelRead

当有新的连接进来时,Netty 是何时调用的 channelRead() 方法的?

在服务端 ServerBootstrap 启动时,Netty 会创建、初始化 NioServerSocketChannel,初始化时会在 NioServerSocketChannel 的 pipeline 中添加一个 ServerBootstrapAcceptor 处理器。ServerBootstrapAcceptor 的作用就是当建立新的 NioSocketChannel 连接时,将 NioSocketChannel 注册到 childGroup 中某个 NioEventLoop 的 Selector(多路复用器)[1] 上。

最后会将 NioServerSocketChannel 其注册到 parentGroup 中某个 NioEventLoop 的 Selector(多路复用器)[1] 上,最后再绑定到本地端口上。

以上逻辑详阅:Netty_ServerBootstrap启动过程


这样 Netty 就可以接收客户端的连接了,当有新的客户端连接接入时(OP_ACCEPT),Selector(多路复用器)[1] 会监听到并返回准备就绪的 Channel,NioEventLoop 会处理这些事件,详见NioEventLoop.processSelectedKey()方法。

由于事件类型是OP_ACCEPT,因此会调用Unsafe.read()处理:

1
2
3
4
5
// io.netty.channel.nio.NioEventLoop#processSelectedKey

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}

需要注意,Netty 使用 Unsafe 对象来实现 Channel 具体的读写等业务操作, 而 NioServerSocketChannel 和 NioSocketChannel 分别对应的 Unsafe 对象实现也是不同的。

  • NioServerSocketChannel 对应 AbstractNioMessageChannel.NioMessageUnsafe,关注 OP_ACCEPT 事件。

  • NioSocketChannel 对应 AbstractNioByteChannel.NioByteUnsafe,关注 OP_READ 事件。

对于 NioServerSocketChannel 来说,它的实际上是 读连接(accept), 而对于 NioSocketChannel 来说,它的是真正的读数据(Bytes)


因此对于 NioServerSocketChannel 的 Unsafe.read(),这里会执行 AbstractNioMessageChannel.NioMessageUnsafe#read()方法,它会调用 JDK 底层的 ServerSocketChannel.accept() 接收到客户端的连接后,将其封装成 Netty 的 NioSocketChannel,再通过 Pipeline 将 ChannelRead 事件传播出去,这样 ServerBootstrapAcceptor 就可以在 ChannelRead 回调里处理新的客户端连接了。

核心代码: AbstractNioMessageChannel.NioMessageUnsafe#read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* NioEventLoop.processSelectedKey() 当 NioServerSocketChannel 有 OP_READ | OP_ACCEPT 事件时调用该方法。
* <p>
* 对于 NioServerSocketChannel 来说,它的<b>读</b> 实际上是 <u><b>读连接(accept)</b></u>,
*/
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();

// 接收对端数据时,ByteBuf的分配策略,基于历史数据动态调整初始化大小,避免太大浪费空间,太小又会频繁扩容
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 对于 NioServerSocketChannel 来说,就是接收一个客户端 Channel,添加到 readBuf。
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

// 递增已读取的消息数量
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) { ... }

int size = readBuf.size();
for (int i = 0; i < size; i++) {
readPending = false;
/**
* 通过pipeline传播ChannelRead事件
* {@link io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead}
*/
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
// 读取完毕的回调,有的Handle会根据本次读取的总字节数,自适应调整下次应该分配的缓冲区大小
allocHandle.readComplete();
// 通过pipeline传播ChannelReadComplete事件
pipeline.fireChannelReadComplete();

if (exception != null) {
// 事件处理异常了

// 是否需要关闭连接
closed = closeOnReadError(exception);

// 通过pipeline传播异常事件
pipeline.fireExceptionCaught(exception);
}

if (closed) {
// 如果需要关闭,那就关闭

inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally { ... }
}

4. 示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
public class NettyEchoServer {
public static void main(String[] args) {
final AttributeKey<String> key = AttributeKey.valueOf(UUID.randomUUID().toString());
EventLoopGroup parentNioEventLoopGroup = new NioEventLoopGroup();
EventLoopGroup childNioEventLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(parentNioEventLoopGroup, childNioEventLoopGroup)
.handler(new LoggingHandler(LogLevel.INFO))
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.attr(key, "value")
.localAddress(SERVER_PORT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyEchoServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();
log.info("Echo 服务端准备就绪...");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {...} finally { ... }
}
}

5. Reference



  1. 1.java.nio.channels.Selector(多路复用器) 可以看成是操作系统底层中 select/epoll/poll 多路复用函数的Java包装类。只要 Selector 监听的任一一个 Channel 上有事件发生,就可以通过 Selector.select() 方法监测到,并且使用 SelectedKeys 可以得到对应的 Channel,然后对它们执行相应的I/O操作。