基本说明
1、netty的组件设计:Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等;
2、ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。例如:实现ChannelInboundHandler接口(或
ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站的数据。
3、ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务器端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的。
管道出入站
编码解码器
1、当Netty发送或者接收一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另外一种格式(比如Java对象);如果是出站消息,它会被编码成字节。
2、Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHandler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了,以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler.
解码器-ByteToMessageDecoder
1、由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理,它的类继承关系如下:
类关系图
2、一个关于ByteToMessageDecoder实例分析
public class ToIntegerDecoder extends ByteToMessageDecoder{ @Override protected void decode(ChannelHandlerContext ctx,ByteBuf in,List<Object> out) throws Exception{ if(in.readableBytes()>=4){ out.add(in.readInt()); } }}
说明:
① 这个例子,每次入站从ByteBuf中读取4个字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据;
Netty的handler链的调用机制
实例要求
1、使用自定义的编码器和解码器来说明Netty的handler调用机制;
2、客户端发送long->服务端,服务端发送long->客户端;
handler链调用机制
代码演示
服务端代码,绑定7000端口,并定义一个初始化类MyServerInitializer来设置服务端的解码器、编码器、和它的业务逻辑处理器
/** * 服务端 */public class MyServer { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) //自定义一个初始化类 .childHandler(new MyServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
MyServerInitializer自定义类的代码
public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //入站的handler进行解码MybateToLongDecoder pipeline.addLast(new MyByteToLongDecoder()); //这是出站的handler进行编码 pipeline.addLast(new MyLongToByteEncoder()); //自定义的handler处理业务逻辑 pipeline.addLast(new MyServerHandler()); }}
自定义的解码器MyByteToLongDecoder,对于服务端来说,当消息入站时,会调用该解码器对消息进行解码,前提条件是入站的消息类型和解码器的处理消息类型一致解码器才会被调用,注意我们这里演示的是long类型的消息,它的字节长度为8个字节,所以在解码的时候,我们要进行判断一下,有8个字节才进去读取
public class MyByteToLongDecoder extends ByteToMessageDecoder { /** *decode 会根据接收的数据,被调用多次,直到确定没有新的元素被添加到list,或者 * 是ByteBuf没有更多的可读字节为止,如果list out 不为空,就会将list的内容传递给下一个 * channelinboundHandler处理器,该处理器的方法也会被调用多次 * @param ctx 上下文对象 * @param in 入站的ByteBuf * @param out List集合,将解码后的数据传给下一个handler */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyByteToLongDecoder 被调用"); //因为Long 为8个字节,这里需要判断有8个字节,才能读取一个long if(in.readableBytes()>=8){ out.add(in.readLong()); } }}
自定义编码器MyLongToByteEncoder,对于服务端来说,当数据出站时,它需要将数据进行编码再发送,它的原理其实和解码器是一样的道理,只有发送的消息类型和编码器处理的数据类型一致,该编码器才会在消息发出之前被调用,不然,它就不会经过该编码器,消息就会直接发送出去
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> { /** * 编码方法,这个编码方法会根据接收到的数据类型,如果是Long类型就被被调用, * 如果不是Long类型就不会调用该编码器,直接就会将数据发送出去不会经过该编码器,判断的逻辑在父类MessageToByteEncoder的write方法里面的this.acceptOutboundMessage(msg) * 因此我们在编写Encoder要注意传入的数据类型和处理的数据类型要一致 * @param ctx 上下文 * @param msg 发送的消息 * @param out */ @Override protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception { System.out.println("MyLongToByteEncoder encode 被调用,msg="+msg); //因为msg已经是long类型了,所以这里不需要进行编码,这里只是讲解一下编码的逻辑 out.writeLong(msg); }}
服务端的业务逻辑处理器MyServerHandler,这里我们将客户端发送给服务端的消息直接打印出来,并且给服务端发送一个long类型的消息
public class MyServerHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("从客户端"+ctx.channel().remoteAddress()+" 读取到long "+msg); //给客户端发送一个long ctx.writeAndFlush(98979L); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
客户端代码,定义一个初始化类MyClientInitializer来设置它的解码器、编码器、和它的业务逻辑处理器
public class MyClient { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) //自定义一个初始化类 .handler(new MyClientInitializer()); ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } }}
客户端的初始化类MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //加入一个出站的handler 对数据进行编码 pipeline.addLast(new MyLongToByteEncoder()); //这是一个入站的解码器(入站handler) pipeline.addLast(new MyByteToLongDecoder()); //加入一个自定义的handler,处理业务 pipeline.addLast(new MyClientHandler()); }}
然后它也是需要有编码器、解码器,和服务端的原理是一样的,它的业务处理器用来处理业务,也就是它可以只关注业务MyClientHandler,这里客户端的业务逻辑只打印出服务端发送的消息,及一连接上服务端就给服务端发送一条消息
public class MyClientHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("服务器的ip="+ctx.channel().remoteAddress()+",收到消息="+msg); } /** * 一连上服务端就给服务端发送消息 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("MyClientHandler 发送数据"); //发送一个Long类型的数据 ctx.writeAndFlush(1234567L); }}
演示代码,先启动服务端,然后再启动客户端,先观察服务端的输出,根据输出可以看到当数据入站时,解码器先被调用,然后再调用业务逻辑处理器,业务逻辑处理器发送消息给客户端的时候,会调用编码器
MyByteToLongDecoder 被调用从客户端/127.0.0.1:53398 读取到long 1234567MyLongToByteEncoder encode 被调用,msg=98979
观察客户端的输出,可以看到发送消息给服务端时,编码器被调用,当服务端回送消息回来的时候,解码器被调用然后再到业务逻辑处理器
MyClientHandler 发送数据MyLongToByteEncoder encode 被调用,msg=1234567MyByteToLongDecoder 被调用服务器的ip=localhost/127.0.0.1:7000,收到消息=98979
3、结论
① 不论解码器handler还是编码器handler即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行;
② 在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果会和期望的可能不一致;
解码器-ReplayingDecoder
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
1、ReplayingDecoder 扩展了ByteToMessageDecoder类,使用这个类我们不必调用readableBytes()方法,参数S指定了用户状态管理的类型,其中void代表不需要状态管理;
2、应用实例:使用ReplayingDecoder编写解码器,对前面的案例进行简化,它可以替代上面的解码器,这样子在服务端读取消息的时候,就不用判断接收到的消息的字节数是否够8个字节,因为它底层已经做了处理,它直接读取消息便可
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext cx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyByteToLongDecoder2 被调用"); //在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断 out.add(in.readLong()); }}
3、ReplayingDecoder使用方便,但它也有一些局限性
① 并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个
UnsupportedOperationException;
② ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢;
其他编解码器
1、LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(n或者rn)作为分隔符来解析数据。
2、
DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
3、HttpObjectDecoder:一个HTTP数据的解码器。
4、
LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理粘包和半包消息。
......