Netty_示例及核心组件
2025-01-22 08:19:30    2k 字   
This post is also available in English and alternative languages.

通过编写一个简单示例,了解Netty中几个核心组件。

本文部分内容转载自 : 《透彻理解Java网络编程(九)——Netty概述》


1. 服务端示例

服务端示例代码

1.1. NettyEchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// netty-example-03
@Slf4j
public class NettyEchoServer {
public static void main(String[] args) {
final AttributeKey<String> key = AttributeKey.valueOf(UUID.randomUUID().toString());
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.localAddress(SERVER_PORT)
.option(ChannelOption.SO_BACKLOG, 128)
.attr(key, "value")
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyEchoServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();
log.info("Echo 服务端准备就绪...");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {...} finally {
try {
parentGroup.shutdownGracefully().sync();
childGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {...}
}
}
}

1.1.1. Bootstrap

通过 Bootstrap(引导) 来完成Netty服务端或客户端的组件组装、以及Netty程序的初始化;不管程序使用那种协议,无论是创建一个客户端还是服务端都需要使用"Bootstrap"。

Bootstrap有两个类型:

  • ServerBootstrap: Netty服务端 的组装工厂类;
  • Bootstrap: Netty客户端 的组装工厂类。

AbstractBootstrap类关系图

以上面的示例代码为例,配置服务端 BootStrap 有以下几个步骤:

  • 配置 EventLoopGroup 线程组;
  • 配置 Channel 的类型;
  • 设置网络监听的端口;
  • 设置 SocketChannel 对应的 Handler;
  • 配置 Channel 的 Pipeline;
  • 启动 Netty Server;

1.1.2. EventLoopGroup

Netty是基于 《网络编程-Reactor模式》 实现的,配置 ServerBootstrap 时通过 group 方法传递不同的参数,就可以支持 Reactor 的三种线程模型: 单reactor-单线程模型、单reactor-多线程模型、主从reactor-多线程模型。

EventloopGroup 本质是线程池, 它的核心作用是接收 I/O 请求,并分配线程执行处理;Netty 在配置 ServerBootstrap 时通过 group 方法传递不同的参数,就可以支持 Reactor 的三种线程模型:

网上很多资料都将创建的 EventLoopGroup 对象称之为: boosGroup(boos) 和 workGroup(work),这里OP不会这么称呼,而是采用源码中变量的命名,即: parentGroup 和 childGroup。


1.1.3. channel

在Netty中,channel(通道) 是核心概念之一,代表了网络连接,是对Java底层 Socket连接 的抽象,channel(通道)负责与对端进行网络通信: 既可以写入数据到对端,也可以从对端读取数据。

channel(通道) 是 Java NIO 中的核心概念,表示一个打开的连接,这个连接可以连接到I/O设备 (如: 磁盘文件、Socket) 或者一个支持 I/O 访问的应用程序,Java NIO 使用 buffer(缓冲区) 和 channel(通道) 来进行数据传输。

Netty对NIO原生的 channel(通道) 进行了封装和增强(NioXXXChannel),对应于不同的协议,Netty中常见的通道类型如下:

Netty中的Channel继承NIO中的Channel备注
NioSocketChannelTCP Socket传输通道
NioServerSocketChannelTCP Socket服务器端监听通道;
NioDatagramChannelUDP传输通道;
NioSctpChannelSctp传输通道;
NioSctpServerChannelSctp服务器端监听通道;
OioSocketChannel同步阻塞式TCP Socket传输通道;
OioServerSocketChannel同步阻塞式TCP Socket服务器端监听通道;
OioDatagramChannel同步阻塞式UDP传输通道;
OioSctpChannel同步阻塞式Sctp传输通道;
OioSctpServerChannel同步阻塞式Sctp服务器端监听通道。

1.1.4. channelPipeline

如上代码示例(NettyEchoServer),ChannelInitializer 会把 NettyEchoServerHandler 实例添加到 channel(通道) 的 channelPipeline 中。

当一个新的连接被建立后,一条新的 channel(通道) 也将会被创建,一个 channel(通道) 可能需要很多的 handler(业务处理器) 来处理业务,每个 channel(通道) 内部都有一条 pipeline(流水线) 将所有 handler(业务处理器) 装配起来。

netty业务处理器流水线 channelPipeline 是基于责任链设计模式(Chain of Responsibility)来设计的,内部是一个双向链表结构,能够支持动态添加、删除 handler(业务处理器)。

channelPipeline(图片来源:https://www.tpvlog.com/article/354#menu_6)

1.2. NettyEchoServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
log.info("echo server 收到:[{}],message:[{}]", ctx.channel().remoteAddress(), byteBuf.toString(CharsetUtil.UTF_8));
ctx.write(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
String message = "服务端已收到消息,并给你发送一个问号?";
ctx.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8))
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
log.error(cause.getMessage(), cause);
}
}
  • channelRead() 方法: 接收客户端发送的消息数据;如果消息数据超过限制大小,则会多次调用此方法。

  • channelReadComplete() 方法: 接收一条消息数据传输完成后会调用此方法(只会调用一次);如果接收的消息数据超过限制大小,会多次调用channelRead()方法, 但channelReadComplete方法,只会在此消息最后一次读取完成后被调用。

  • exceptionCaught: 处理过程中引发异常时被调用。


2. 客户端示例

2.1. NettyEchoClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
public class NettyEchoClient {
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyEchoClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect().sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {...} finally {
try {
eventLoopGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {...}
}
}
}
  • eventLoopGroup

    该线程组被分配处理事件。


2.2. NettyEchoClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
public class NettyEchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
log.info("收到服务端:[{}] 的消息:[{}]", ctx.channel().remoteAddress(), byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
log.error(cause.getMessage(), cause);
}
}
  • channelActive

    客户端与服务端建立连接后被调用。

    示例中,当客户端与服务端建立连接后,向服务端发送了一条消息。

  • channelRead

    接收服务端发送的数据。

    值得注意的是,服务器发送的消息可能会被分块接收;也就是说,如果服务器发送了5字节,那么不能保证这5字节会被一次性接收,即使是对于这么少量的数据,channelRead方法也可能会被调用两次。

    • 第一次使用一个持有3字节的ByteBuf(Netty 的字节容器)

    • 第二次使用一个持有2字节的ByteBuf

    作为一个面向流的协议,TCP保证了字节数组将会按照服务器发送它们的顺序被接收。

  • exceptionCaught

    处理过程中引发异常时被调用。


3. 逻辑架构

Netty 采用了典型的三层网络架构

netty逻辑架构(图片来源:https://www.tpvlog.com/article/354#menu_7)

3.1. 网络通信层

网络通信层的职责是执行网络 I/O 的操作;它支持多种网络协议和 I/O 模型的连接操作,当网络数据读取到内核缓冲区后,会触发各种网络事件,这些网络事件会分发给事件调度层进行处理。

网络通信层的核心组件包含 BootStrapServerBootStrapChannel 三个组件:

  • BootStrapServerBootStrap 分别负责客户端和服务端的启动,它们是非常强大的辅助工具类;

  • Channel 是网络通信的载体,提供了与底层 Socket 交互的能力。


3.2. 事件调度层

事件调度层的职责是通过 Reactor 线程模型对各类事件进行聚合处理,通过 Selector 主循环线程集成多种事件( I/O 事件、信号事件、定时事件等),实际的业务处理逻辑是交由服务编排层中相关的 Handler 完成。

事件调度层的核心组件包括 EventLoopGroup、EventLoop


3.3. 服务编排层

服务编排层的职责是负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。

服务编排层的核心组件包括 channelPipelineChannelHandler、ChannelHandlerContext


4. Reference