Netty_NioServerSocketChannel与NioSocketChannel
2025-01-22 08:19:30    2.7k 字   
This post is also available in English and alternative languages.

NioSocketChannel 与 NioServerSocketChannel。

  • 网上很多资料都将创建的 NioEventLoopGroup 对象称之为: bossGroup(boss) 和 workGroup(work),这里OP不会这么称呼,而是采用源码中变量的命名,即: parentGroupchildGroup

  • 本文 Netty 源码解析基于 4.1.86.Final 版本。


1. 继承关系

NioSocketChannel 与 NioServerSocketChannel 继承关系如下:

NioSocketChannel与NioServerSockerChannel继承关系

2. NioServerSocketChannel

parentGroup 中管理的 Channel 是 NioServerSocketChannel

2.1. 创建时机

在服务端 ServerBootstrap 启动时,Netty 会创建、初始化 NioServerSocketChannel,初始化时会在 NioServerSocketChannel 的 pipeline 中添加一个 ServerBootstrapAcceptor 处理器,然后将 NioServerSocketChannel 注册到 parentGroup 中某个 NioEventLoop 的 Selector(多路复用器)[1] 上。

更多细节,详阅: Netty_ServerBootstrap启动过程Netty_ServerBootstrapAcceptor


2.2. NioServerSocketChannel构造函数

NioServerSocketChannel 在服务端 ServerBootstrap 启动时,通过 ReflectiveChannelFactory 中的反射方法,通过无参构造函数创建的。

看下构造函数相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// io.netty.channel.socket.nio.NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
...
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

private static final Method OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY =
SelectorProviderUtil.findOpenMethod("openServerSocketChannel");
...
private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
try {
ServerSocketChannel channel =
SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
// 获取的是 java.nio.channels.ServerSocketChannel 实例
return channel == null ? provider.openServerSocketChannel() : channel;
} catch (IOException e) { ... }
}

public NioServerSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}

public NioServerSocketChannel(SelectorProvider provider) {
this(provider, null);
}

public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
this(newChannel(provider, family));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
// 在父类中完成 非阻塞IO的配置, 及设置 OP_ACCEPT 标识
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
...
}
  • line: 5 -> 创建 java.nio.channels.spi.SelectorProvider[2],用于创建 JDK NIO 原生 ServerSocketChannel [1]

  • line: 28、line: 10 -> newChannel 方法创建 JDK NIO 原生 ServerSocketChannel 实例。

  • line: 31 -> 调用父类构造函数。注意,调用父类构造函数时传递了 SelectionKey.OP_ACCEPT 标识用于设置监听, 说明 NioServerSocketChannel 关注 OP_ACCEPT 事件。

  • line: 32 -> 创建 NioServerSocketChannel 的配置类 NioServerSocketChannelConfig,其中封装了对 NioServerSocketChannel 底层的一些配置行为和 java.net.ServerSocket ,以及创建 NioServerSocketChannel 用来接收数据的 Buffer 分配器 ServerChannelRecvByteBufAllocator。

从 line:31 ,继续向上看看父类构造函数中做了什么。


2.3. 父类构造函数

  • AbstractNioMessageChannel

    此父类的构造函数中并无其他逻辑操作,继续调用上层父类构造函数。

    1
    2
    3
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
    }

    单独看 AbstractNioMessageChannel 类,主要是对 NioServerSocketChannel 底层读写行为的封装和定义,比如accept接收客户端连接等。


  • AbstractNioChannel

    此父类的构造函数中,设置了相关参数;

    line: 6 -> 将 java.nio.channels.ServerSocketChannel 设置成非阻塞

    1
    2
    3
    4
    5
    6
    7
    8
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
    ch.configureBlocking(false);
    } catch (IOException e) { ... }
    }

  • AbstractChannel

    此父类构造函数中负责创建 unsafe 和 pipeline。

    1
    2
    3
    4
    5
    6
    7
    8
    protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    // unsafe 用于定义实现对 Channel 的底层操作
    unsafe = newUnsafe();
    // 为 channel 分配独立的 pipeline 用于IO事件编排
    pipeline = newChannelPipeline();
    }
    • line: 5 -> 创建 NioServerSocketChannel 底层操作类 Unsafe,这里创建的是 AbstractNioMessageChannel.NioMessageUnsafe

    • line: 7 -> 为 NioServerSocketChannel 分配独立的 pipeline 用于IO事件编排。pipeline 其实是一个 ChannelHandlerContext 类型的双向链表。头结点 HeadContext ,尾结点 TailContext 。 ChannelHandlerContext 中包装着 ChannelHandler。


至此,通过 NioServerSocketChannel 相关构造函数可以得知以下信息:

  1. NioServerSocketChannel 中包含 java.nio.channels.ServerSocketChannel
  2. NioServerSocketChannel 关注 OP_ACCEPT 事件。
  3. NioServerSocketChannel 是非阻塞的。

2.4. NioMessageUnsafe

Unsafe 作为 Channel 的内部类,承担着 Channel 网络相关功能的具体实现,比如读写操作,之所以命名为 Unsafe 是不希望外部使用,并非是不安全的。

NioServerSocketChannel 对应的 Unsafe 实例,在父类 AbstractNioMessageChannel 中进行创建。

1
2
3
4
5
// io.netty.channel.nio.AbstractNioMessageChannel#newUnsafe

protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}

NioMessageUnsafe 是 AbstractNioMessageChannel 的内部类;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
private final class NioMessageUnsafe extends AbstractNioUnsafe {

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
...
// 创建接收数据的 Buffer分配器, 其为 byteBuf 分配大小合适的空间。
// 在当前场景中(接收连接的场景中), 这里的 allocHandle 只是用于控制循环接受并创建连接的次数。
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 对于 NioServerSocketChannel 来说,就是接收一个客户端 Channel,添加到 readBuf。
// 调用子类的实现的方法, 当成功读取时返回1。
// io.netty.channel.socket.nio.NioServerSocketChannel.doReadMessages
int localRead = doReadMessages(readBuf);

// 已无数据,跳出循环
if (localRead == 0) { break; }

// 链路关闭,跳出循环
if (localRead < 0) { closed = true; break; }

// 统计在当前事件循环中已经创建连接的个数
allocHandle.incMessagesRead(localRead);
// 默认循环不超过16次
} while (continueReading(allocHandle));
} catch (Throwable t) { exception = t; }

// 循环处理 readBuf 中的 NioSocketChannel
int size = readBuf.size();
for (int i = 0; i < size; i++) {
readPending = false;

// 通过 NioServerSocketChannel 的 pipeline 传播 ChannelRead 事件,
// 将 NioSocketChannel 注册到 childParent 中某个 NioEventLoop 上。
// {@link io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead}
// 从 readBuf 中取出的是 NioSocketChannel
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
// 读取完毕的回调,有的Handle会根据本次读取的总字节数,自适应调整下次应该分配的缓冲区大小
allocHandle.readComplete();
// 通过 pipeline 传播 ChannelReadComplete 事件
pipeline.fireChannelReadComplete();

if (exception != null) { // 事件处理异常了
// 是否需要关闭连接
closed = closeOnReadError(exception);

// 通过pipeline传播异常事件
pipeline.fireExceptionCaught(exception);
}

if (closed) { // 如果需要关闭,那就关闭
inputShutdown = true;
if (isOpen()) { close(voidPromise()); }
}
} finally { ... }
}
}

// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
// 通过 java.nio.channels.ServerSocketChannel 的 accept 方法 接收新的客户端连接,
// 即 java.nio.channels.SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
// 如果获取到客户端连接对象 java.nio.channels.SocketChannel, 创建 NioSocketChannel,并添加到 buf 中
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) { ... }
return 0;
}
  • line: 4 -> 属性名称虽然是 readBuf,但类型是 Object,并且集合中存储的是 NioSocketChannel。

  • line: 25 -> 调用子类实现的 NioServerSocketChannel#doReadMessages 方法,获取客户端Socket连接,并创建 NioSocketChannel 。

  • line: 20 ~ line: 37 -> 注意是 do…while… 循环,每次循环不超过16次。

  • line: 40 ~ line: 47 -> 循环处理存储在 readBuf 中的 NioSocketChannel,通过 NioServerSocketChannel 的 pipeline 传播 ChannelRead 事件,即调用: ServerBootstrap.ServerBootstrapAcceptor#channelRead 方法。

  • line: 48 ~ line:67 -> 后续清理逻辑。

  • line: 80 -> 创建 NioSocketChannel,包装客户端连接(java.nio.channels.SocketChannel)。


至此,通过 NioMessageUnsafe 可以得知以下信息:

  1. NioMessageUnsafe 每次循环不超过16次,换个角度来说,每次循环最多创建16个 NioSocketChannel。
  2. NioMessageUnsafe 会依次处理客户端连接,最后统一触发 ChannelRead 事件,即通过 NioServerSocketChannel 的 pipeline 传播 ChannelRead 事件。



3. NioSocketChannel

childGroup 中管理的 Channel 是 NioSocketChannel

3.1. 创建时机

当客户端连接到达(OP_ACCEPT 事件),parentGroup 中 NioEventLoop 就会调用 NioServerSocketChannel 所属的 NioMessageUnsafe#read 方法进行处理。在 NioMessageUnsafe#read 方法中,会创建 NioSocketChannel。

更多细节详阅: NioMessageUnsafe(点击跳转)


3.2. NioSocketChannel构造函数

参数列表中的 socket 是 java.nio.channels.SocketChannel

1
2
3
4
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}

从 line: 2 ,继续向上看看父类中做了什么。


3.3. 父类构造函数

  • AbstractNioByteChannel

    1
    2
    3
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
    }

    此父类的构造函数中并无其他逻辑操作,继续调用上层父类构造函数。

    需要注意的是,继续向上调用父类构造函数时传递了 SelectionKey.OP_READ 标识用于设置监听,说明 NioSocketChannel 关注 OP_READ 事件。


  • AbstractNioChannel

    此父类的构造函数中,设置了相关参数;

    line: 6 -> 将 java.nio.channels.ServerSocketChannel 设置成非阻塞

    1
    2
    3
    4
    5
    6
    7
    8
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
    ch.configureBlocking(false);
    } catch (IOException e) { ... }
    }

  • AbstractChannel

    此父类构造函数中负责创建 unsafe 和 pipeline。

    1
    2
    3
    4
    5
    6
    7
    8
    protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    // unsafe 用于定义实现对 Channel 的底层操作
    unsafe = newUnsafe();
    // 为 channel 分配独立的 pipeline 用于IO事件编排
    pipeline = newChannelPipeline();
    }
    • line: 5 -> 创建 NioSocketChannel 底层操作类 Unsafe,这里创建的是 AbstractNioByteChannel.NioByteUnsafe

    • line: 7 -> 为 NioSocketChannel 分配独立的 pipeline 用于IO事件编排。pipeline 其实是一个 ChannelHandlerContext 类型的双向链表。头结点 HeadContext ,尾结点 TailContext 。 ChannelHandlerContext 中包装着 ChannelHandler。


3.4. NioByteUnsafe

Unsafe 作为 Channel 的内部类,承担着 Channel 网络相关功能的具体实现,比如读写操作,之所以命名为 Unsafe 是不希望外部使用,并非是不安全的。

NioSocketChannel 对应的 Unsafe 实例,在父类 AbstractNioByteChannel 中进行创建。

1
2
3
4
5
// io.netty.channel.nio.AbstractNioByteChannel#newUnsafe

protected AbstractNioUnsafe newUnsafe() {
return new NioByteUnsafe();
}

NioByteUnsafe 是 AbstractNioByteChannel 的内部类,NioByteUnsafe 类中有三个方法,这里关注 read 方法,另两个就不贴了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

public final void read() {
...
final ChannelPipeline pipeline = pipeline();
// 获取 ByteBuf 分配器,默认为 PooledByteBufAllocator
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
// 清空上一次读取的字节数
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 分配内存
byteBuf = allocHandle.allocate(allocator);
// 读取通道接收缓冲区的数据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// 若没有数据可以读,则释放内存
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// 当读到-1时,表示Channel通道已经关闭,没有必要再继续读。
readPending = false;
}
break;
}
// 更新读取消息计数器
allocHandle.incMessagesRead(1);
readPending = false;

// 通知通道处理器处理数据,通过 Channel 的 pipeline 传播 ChannelRead 事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
// 读取操作完毕
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
// 如果Socket通道关闭,则关闭读操作
closeOnRead(pipeline);
}
} catch (Throwable t) { ... } finally { ... }
}
  • line: 7 ~ line: 10 -> 获取 byteBuf 分配器,清空并重置。

  • line: 17 -> 为 byteBuf 分配空间。

  • line: 19 -> doReadBytes 方法,从 java.nio.channels.SocketChannel 读取数据到 ByteBuf 。

  • line: 37 -> 通过 Channel 的 pipeline 传播 ChannelRead 事件。

  • line: 15 ~ line: 39 -> 注意是 do…while… 循环。


至此,通过 NioByteUnsafe 可以得知以下信息:

  1. byteBuf 每次都会重置清空。
  2. 每读取到一定字节就触发 ChannelRead 事件,而不是全部读取完再触发。

4. 小结

NioServerSocketChannel 是非阻塞的, 并且关注 OP_ACCEPT 事件, 具体读写细节在内部类 NioMessageUnsafe 中,会依次处理客户端连接最后统一触发 ChannelRead 事件。

NioSocketChannel 是非阻塞的, 并且关注 OP_READ 事件, 具体读写细节在内部类 NioByteUnsafe 中,每读取到一定字节就触发 ChannelRead 事件。

NioServerSocketChannel 要求高吞吐量,尽可能多的接受客户端连接; 而 NioSocketChannel 要求尽可能快的响应远端请求。


5. Reference



  1. 1.java.nio.channels.Selector(多路复用器) 可以看成是操作系统底层中 select/epoll/poll 多路复用函数的Java包装类。只要 Selector 监听的任一一个 Channel 上有事件发生,就可以通过 Selector.select() 方法监测到,并且使用 SelectedKeys 可以得到对应的 Channel,然后对它们执行相应的I/O操作。
  2. 2.java.nio.channels.spi.SelectorProvider 相当于一个工厂类,提供了对 java.nio.channels.ServerSocketChannel、java.nio.channels.SocketChannel、java.nio.channels.Selector 等创建方法。