Netty_从 ServerBootstrap 启动看 ChannelPipeline
2025-01-22 08:19:30    4k 字   
This post is also available in English and alternative languages.

Netty 服务端 ServerBootstrap 启动的过程中,需要创建及初始化 NioServerSocketChannel,在 NioServerSocketChannel 的初始化阶段,会向 NioServerSocketChannel 的 Pipeline 中添加一个 ChannelHandler(ServerBootstrapAcceptor 处理器 [1]),本篇就将以此作为入口案例,梳理分析 ChannelPipeline。

ChannelPipeline 的作用实现网络事件的动态编排和有序传播,基于 责任链设计模式(Chain of Responsibility) 设计,内部是一个 双向链表 结构,支持动态地添加和删除 ChannelHandler。

  • 网上很多资料都将创建的 NioEventLoopGroup 对象称之为: bossGroup(boss) 和 workGroup(work),这里OP不会这么称呼,而是采用源码中变量的命名,即: parentGroupchildGroup
  • 文中图片较大,懒加载,请耐心等候
  • 本文 Netty 源码解析基于 4.1.86.Final 版本。

1. 概述

Channel、ChannelPipeline、ChannelHandlerContext、ChannelHandler 四者的关系可以用下面这张图表示:

Channel_ChannelPipeline_ChannelHandlerContext_ChannelHandler关系图(图片来自网络)
  • 每个 Channel 内部都有一个 ChannelPipeline,ChannelPipeline 是一个双向链表。
  • 每个 ChannelPipeline 包含多个 ChannelHandlerContext,所有 ChannelHandlerContext 之间组成了双向链表。
  • 每个 ChannelHandlerContext 中 都封装了 ChannelHandler。

ChannelHandler 的作用只是负责处理I/O逻辑,比如编码、解码。它并不会感知到它在 Pipeline 中的位置,更不会感知和它相邻的两个 ChannelHandler。

ChannelHandlerContext 对 ChannelHandler 进行了封装,起到维护 ChannelHandler 上下文的作用,并且将 ChannelHandler 生命周期的所有事件,如 connect、bind、read、flush、write、close 等都剥离出来,减少代码耦合。同时还维护了 ChannelPipeline 双向链表中的 pre 和 next 指针,方便找到与其相邻的 ChannelHandler。


2. ChannelPipeline创建时机

Netty 中每个 Channel 都有独立的 Pipeline,Pipeline 伴随着 Channel 的创建而创建,在 post_link 技术/component/netty/Netty_NioServerSocketChannel与NioSocketChannel%} 中分析梳理过 NioServerSocketChannel 与 NioSocketChannel,ChannelPipeline 的创建在它们父类 AbstractChannel 的构造函数中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
...
public NioServerSocketChannel(ServerSocketChannel channel) {
// 在父类中完成了 非阻塞IO的配置,及事件的注册
super(null, channel, SelectionKey.OP_ACCEPT);

// javaChannel() 方法返回的是 java.nio.channels.ServerSocketChannel,
// 也就是 {@link io.netty.channel.socket.nio.NioServerSocketChannel.newChannel} 中创建的。
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
...
}

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
...
}

public abstract class AbstractNioChannel extends AbstractChannel {
...
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
// java.nio.channels.ServerSocketChannel
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) { ... }
}
...
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
// unsafe 用于定义实现对 Channel 的底层操作
unsafe = newUnsafe();
// 为 channel 分配独立的 pipeline 用于IO事件编排
pipeline = newChannelPipeline();
}
...
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
...
}

NioServerSocketChannel 构造函数层层向上调用父类构造函数,最终在 AbstractChannel 的构造函数中创建了 ChannelPipeline,是 DefaultChannelPipeline 类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
...
}

public class DefaultChannelPipeline implements ChannelPipeline {
...
// pipeline 中的头节点
final HeadContext head;
// pipeline 中的尾节点
final TailContext tail;
// pipeline 中持有对应的 channel
private final Channel channel;
...
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 创建尾节点
tail = new TailContext(this);
// 创建头节点
head = new HeadContext(this);
// 构建双向链表
head.next = tail;
tail.prev = head;
}
...
}

在 DefaultChannelPipeline 构造函数中,创建双向链表的 head(头节点) 和 tail(尾节点),并将它们构建关联。就此 NioServerSocketChannel 中的 pipeline 已经创建完成。


3. ChannelPipeline中的头尾节点

目前为止,NioServerSocketChannel 中的 Pipeline 已经创建好,Pipeline 中只有 head(头节点) 和 tail(尾节点),示意图如下:

ChannelPipeline中的Head和Tail(图片来自网络)

head(头节点) 和 tail(尾节点) 分别对应的类是 HeadContext 和 TailContext,二者的继承关系如下图:

TailContext和HeadContext继承体系
  • 从 HeadContext 和 TailContext 命名上可以看出,头尾节点都是 ChannelHandlerContext 角色。
  • 由于 HeadContext 和 TailContext 在双向链表中位置的特殊性,二者还分别实现了 ChannelHandler 相关接口,说明二者还承担着 ChannelHandler 角色职责。

3.1. HeadContext

HeadContext 是定义在 DefaultChannelPipeline 类中的内部类,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {

// headContext 中持有对 channel unsafe操作类的引用 用于执行 channel 底层操作
private final Unsafe unsafe;

HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
// headContext 中持有对 channel unsafe操作类的引用 用于执行 channel 底层操作
unsafe = pipeline.channel().unsafe();
// 设置 channelHandler 的状态为 ADD_COMPLETE
setAddComplete();
}

public ChannelHandler handler() {
return this;
}
...
}

如上源码、ChannelPipeline中的头尾节点(点击跳转) 中类图所示,HeadContext 继承了 AbstractChannelHandlerContext 抽象类,间接实现了 ChannelHandlerContext 接口。
同时还实现了 ChannelInboundHandler 和 ChannelOutboundHandler 两个接口,表明 「HeadContext 既是一个 ChannelHandlerContext 又是一个 ChannelHandler,同时可以处理 InboundOutbound 」。


3.2. TailContext

TailContext 是定义在 DefaultChannelPipeline 类中的内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
// 设置 channelHandler 的状态为 ADD_COMPLETE
setAddComplete();
}

public ChannelHandler handler() {
return this;
}
...
}

如上源码、ChannelPipeline中的头尾节点(点击跳转) 中类图所示,TailContext 继承了 AbstractChannelHandlerContext 抽象类,间接实现了 ChannelHandlerContext 接口。
相较于 HeadContext,TailContext 只实现了 ChannelInboundHandler 接口,表明 「TailContext 既是一个 ChannelHandlerContext 又是一个 ChannelHandler,同时可以处理 Inbound 」。


3.3. 示意图

HeadContext 作为 Pipeline 的头结点,负责读取数据并开始传递 InBound 事件,当数据处理完成后,数据会反方向经过 Outbound 处理器,最终传递到 HeadContext。所以,HeadContext 又是处理 Outbound 事件的最后一站;

TailContext 作为 Pipeline 的尾结点,会在 ChannelInboundHandler 调用链路的最后一步执行,用于终止 Inbound 事件传播。同时,TailContext 节点作为 OutBound 事件传播的第一站,会将 OutBound 事件传递给上一个节点。

Channel_ChannelPipeline_ChannelHandlerContext_ChannelHandler示意图(图片来自网络)

4. Inbound和Outbound

关键:inbound events and outbound operations,入站事件 和 出站操作。

根据数据的流向 ChannelPipeline 中包含 ChannelInboundHandler(入站) 和 ChannelOutboundHandler(出站) 两种处理器。

ChannelInboundHandler 和 ChannelOutboundHandler,这里的 Inbound 和 Outbound 是什么意思?Inbound 对应 I/O输入,Outbound 对应 I/O输出,这是看到这两个名字的第一反应。

但翻阅 ChannelOutboundHandler 接口代码,发现其中有 read 方法时产生了疑惑:「如果 Outbound 对应 I/O输出,为什么 ChannelOutboundHandler 接口中有明显表示 I/O输入的 read 方法呢?」

直到看到 Stack Overflow 上 Netty 作者 Trustin Lee 对 Inbound 和 Outbound 的解释 ,疑团解开:

Inbound handlers are supposed to handle inbound events. Events are triggered by external stimuli such as data received from a socket.

Outbound handlers are supposed to intercept the operations issued by your application.

Re: Q1) read() is an operation you can issue to tell Netty to continue reading the inbound data from the socket, and that’s why it’s in an outbound handler.

Re: Q2) You don’t usually issue a read() operation because Netty does that for you automatically if autoRead property is set to true. Typical flow when autoRead is on:

  1. Netty triggers an inbound event channelActive when socket is connected, and then issues a read() request to itself (see DefaultChannelPipeline.fireChannelActive())
  2. Netty reads something from the socket in response to the read() request.
  3. If something was read, Netty triggers channelRead().
  4. If there’s nothing left to read, Netty triggers channelReadComplete()
  5. Netty issues another read() request to continue reading from the socket.

If autoRead is off, you have to issue a read() request manually. It’s sometimes useful to turn autoRead off. For example, you might want to implement a backpressure mechanism by keeping the received data in the kernel space.

Netty 是事件驱动的,事件分为两大类:Inbound 和 Outbound,分别由 ChannelInboundHandler 和 ChannelOutboundHandler 负责处理。所以,Inbound 和 Outbound 并非指 I/O 的输入和输出,而是指事件类型

什么样的事件属于 Inbound,什么样的事件属于 Outbound 呢,即:事件类型的划分依据是什么?答案是:触发事件的源头


4.1. Inbound事件

站在 Netty 服务端角度,Inbound 事件由外部触发,外部是指 Netty 服务端之外,如客户端。因此 Inbound 事件并非因为服务端主动做了什么而触发的事件。

如某个客户端连接上 Netty 服务端并被注册到某个 NioEventLoop 上。
再比如从 Socket 接收数据的过程(注意是"开始读"、"读完了"事件,而不是"读取"这个操作)。

Inbound 事件的详细列表(ChannelInboundHandler):


4.2. Outbound事件

站在 Netty 服务端角度,Outbound 事件由服务端主动触发,可以认为 Outbound 是指服务端主动发起的某个操作。

比如向 Socket 写入数据。
再比如从 Socket 读取数据(注意是"读取"这个操作请求,而非"读完了"这个事件)。

  • 读数据是主动的, 例如: Client 端发送数据给 Server,硬件设备接收数据并向 Buffer 中写入,然后 Server 端主动从 Buffer中读取数据。
    这也是为什么 ChannelOutboundHandler#read 方法的参数列表中没有 msg 参数的原因, 因为只是 Server 触发的读操作, 而不是真的读到数据了。( Stack Overflow 上 Netty 作者 Trustin Lee 对 Inbound 和 Outbound 的解释 )

这也解释了为什么 ChannelOutboundHandler 中会有 read 方法。

Outbound 事件列表(ChannelOutboundHandler):

Outbound 事件大都是在 Socket 上可以执行的一系列常见操作:绑定地址、建立和关闭连接、IO操作,另外还有 Netty 定义的一种操作deregister,即:解除 channel 与 eventloop 的绑定关系。

值得注意的是,一旦应用程序发出以上 Outbound 事件请求,ChannelOutboundHandler 中对应的方法就会被调用,一个 ChannelHandler 在处理时甚至可以将请求拦截而不再传递给后续的 ChannelHandler,使得真正的操作并不会被执行。


4.3. 示意图

ChannelPipeline_Inbound和Outbound流转示意图

5. ChannelHandler

概述(点击跳转) 小节中所述,ChannelHandler 专职负责业务处理,使用 Netty 进行开发时,主要的工作就是基于 ChannelHandler 的开发。

前面说过,ChannelHandler 分为 Inbound(入站) 和 Outbound(出站) 两种类型,分别对应 ChannelInboundHandler 和 ChannelOutboundHandler 这两个接口。


5.1. ChannelInboundHandler

ChannelInboundHandler 接口定义了很多与 Inbound 相关的回调方法,触发时机如下:

回调方法触发时机
channelRegisteredChannel 被注册到 EventLoop
channelUnregisteredChannel 从 EventLoop 中取消注册
channelActiveChannel 处于就绪状态,可以被读写
channelInactiveChannel 处于非就绪状态
channelReadChannel 可以从远端读取到数据
channelReadCompleteChannel 读取数据完成
userEventTriggered用户事件触发时
channelWritabilityChangedChannel 的写状态发生变化

ChannelInboundHandler的默认实现为 ChannelInboundHandlerAdapter,开发 Inbound 事件的 ChannelHandler 一般只要继承该类即可。

5.1.1. 处理过程

运行 示例代码(点击跳转) ,观察 ChannelInboundHandler 处理 Inbound 事件的过程:

1
2
3
4
5
6
7
8
9
调用方法:handlerAdded
调用方法:channelRegistered
调用方法:channelActive
调用方法:channelRead
echo server 收到:[/127.0.0.1:51377],message:[Netty rocks]
调用方法:channelReadComplete
调用方法:channelInactive
调用方法:channelUnregistered
调用方法:handlerRemoved
  • handlerAdded:当 ChannelHandler 被加入到 Pipeline 后,此方法被回调。也就是执行完 ch.pipeline().addLast(new NettyEchoServerHandler()); 语句之后回调。

  • channelRegistered:当 Channel 成功注册到一个 NioEventLoop 上之后,会通过 Pipeline 回调所有 ChannelHandler 的channelRegistered 方法;

  • channelUnregistered:当 Channel 和 NioEventLoop 工作线程解除绑定,移除掉对这条通道的事件处理之后,会通过 Pipeline 回调所有 ChannelHandler 的 channelUnregistered 方法。

  • channelActive:当 Channel 处于就绪状态,可以被读写时,会通过 Pipeline 回调所有 ChannelHandler 的 channelActive 方法;

  • channelInactive:当 Channel 的底层连接已经不是 ESTABLISH 状态,或者底层连接已经关闭时,会首先通过 Pipeline 回调所有ChannelHandler 的 channelInactive 方法;

  • handlerRemoved:当 Channel 关闭后,Netty 会移除掉 Pipeline 上所有 ChannelHandler,并通过 Pipeline 回调所有 ChannelHandler 的 handlerRemoved 方法。


5.2. ChannelOutboundHandler

ChannelOutboundHandler 接口定义了很多与 outbound 相关的回调方法,触发时机如下:

回调方法触发时机
bind监听地址(IP+ 端口)绑定:完成底层Java IO通道的地址绑定
connect连接服务端:完成底层Java IO通道的服务器端的连接操作
disconnect断开服务器连接:断开底层Java IO通道的服务器端连接
close主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道
write写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。
此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作。
flush清空缓冲区数据,将数据写到对端

ChannelOutboundHandler 的默认实现为 ChannelOutboundHandlerAdapter,开发 outbound 事件的 ChannelHandler 一般只要继承该类即可。


6. 向Pipeline中添加ChannelHandler

Pipeline 在创建 Channel 时同步被创建(ChannelPipeline创建时机(点击跳转)),在 ServerBootstrap#init 方法中对 NioServerSocketChannel 进行了初始化,在初始化的过程中,向 NioServerSocketChannel 的 Pipeline 中添加了一个 ChannelHandler(ServerBootstrapAcceptor 处理器 [1])。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
@Override
void init(Channel channel) {
...
// 从 NioServerSocketChannel 中取出 pipeline
ChannelPipeline p = channel.pipeline();

p.addLast(new ChannelInitializer<Channel>() {
// initChannel 方法会在 NioServerSocketChannel 注册完成后,通过 handlerAdded事件 被调用
@Override
public void initChannel(final Channel ch) {
// 注意:这里的 ch 和上面的 channel 是同一个对象,即: NioServerSocketChannel

// 从 NioServerSocketChannel 中取出 pipeline
final ChannelPipeline pipeline = ch.pipeline();

ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// ServerBootstrapAcceptor 用于将建立连接的 SocketChannel 转发给 childGroup
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}

ChannelInitializer是一种特殊的ChannelHandler,用于初始化pipeline。

ChannelInitializer

https://www.cnblogs.com/binlovetech/p/16442598.html ChannelInitializer 小节。


7. 示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@Slf4j
public class NettyEchoServer {
public static void main(String[] args) {
final AttributeKey<String> key = AttributeKey.valueOf(UUID.randomUUID().toString());
EventLoopGroup parentNioEventLoopGroup = new NioEventLoopGroup();
EventLoopGroup childNioEventLoopGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(parentNioEventLoopGroup, childNioEventLoopGroup)
.handler(new LoggingHandler(LogLevel.INFO))
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.attr(key, "value")
.localAddress(SERVER_PORT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyEchoServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();
log.info("Echo 服务端准备就绪...");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
try {
parentNioEventLoopGroup.shutdownGracefully().sync();
childNioEventLoopGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}

@Slf4j
@ChannelHandler.Sharable
public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("调用方法:channelRegistered");
super.channelRegistered(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("调用方法:channelUnregistered");
super.channelUnregistered(ctx);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("调用方法:channelActive");
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("调用方法:channelInactive");
super.channelInactive(ctx);
}

/**
* 接收客户端发送的数据
* <p>
* 如果消息超过限制大小,则会多次调用此方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("调用方法:channelRead");

ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("echo server 收到:["+ctx.channel().remoteAddress()+"],message:["+byteBuf.toString(CharsetUtil.UTF_8)+"]");

ctx.write(ctx);
ctx.channel().write("");
}

/**
* 消息传输完成后调用此方法,只会调用一次。
* <p>
* 接收的消息超过限制大小,会多次调用{@link NettyEchoServerHandler#channelRead}方法,
* 但{@link NettyEchoServerHandler#channelReadComplete}方法,只会在此消息最后一次读取完成后被调用。
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
System.out.println("调用方法:channelReadComplete");

// 发送消息给客户端
String message = "服务端已收到消息,并给你发送一个问号?";
// 冲刷出站到客户端,并且关闭channel
ctx.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8))
.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("调用方法:handlerAdded");
super.handlerAdded(ctx);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("调用方法:handlerRemoved");
super.handlerRemoved(ctx);
}

/**
* 处理过程中引发异常时被调用
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 发生异常,关闭通道
ctx.close();
// 输出异常信息
log.error(cause.getMessage(), cause);
}
}

8. Reference



  1. 1.ServerBootstrapAcceptor 是 Netty 服务端用来接收客户端连接的核心类,它的作用是当新的 SocketChannel 连接时,将 SocketChannel 注册到 childGroup 中的一个 NioEventLoop 上,继续监听相关读写事件。