本文共 1700 字,大约阅读时间需要 5 分钟。
当客户端Channel的Reactor线程NioEventLoop检测到有读事件时,会执行NioByteUnsafe的read()方法。该方法会调用doReadBytes()方法将TCP缓冲区的数据读到由ByteBufAllocator分配的一个ByteBuf对象中,然后通过pipeline.fireChannelRead()方法带上这个ByteBuf对象向下传播ChannelRead事件。
在传播过程中,首先会来到pipeline的head结点的channelRead()方法。该方法会继续带着那个ByteBuf对象向下传播ChannelRead事件,比如会来到ByteToMessageDecoder结点的channelRead()方法。
需要注意的是,服务端Channel的unsafe变量是一个NioMessageUnsafe对象,而客户端Channel的unsafe变量是一个NioByteUnsafe对象。
不断地从TCP缓冲区中读取数据,每次读完都判断是否为一个完整的数据包。如果当前读取的数据不足以拼接成一个完整的数据包,就保留数据,继续从TCP缓冲区中读。如果当前读取的数据加上已读取的数据足够拼成完整的数据包,就将拼好的数据包往业务传递,而多余的数据则保留。
Netty拆包基类内部会有一个字节容器,每次读取到数据就添加到字节容器中。然后尝试对累加的字节数据进行拆包,拆成一个完整的业务数据包,这个拆包基类叫ByteToMessageDecoder。
(2) 首先累加字节流
如果当前字节容器中没有数据,那么就将字节容器的指针指向新读取的数据。如果当前字节容器中有数据,那么就调用累加器的cumulate()方法将数据累加到字节容器。
(3) 然后调用子类的decode()方法进行解析
把累加字节容器里的字节流通过子类的decode()方法进行解析。
(4) 接着清理字节容器
为了防止发送端发送数据过快,ByteToMessageDecoder会在读取完一次数据并完成业务拆包后,清理字节容器。
(5) 最后将解析到的ByteBuf向下传播
如果调用子类的decode()方法可以解析到一个ByteBuf对象,则将这个ByteBuf对象向下传播。
解码过程是通过一个叫ByteToMessageDecoder的抽象解码器来实现的,ByteToMessageDecoder实现的解码过程分为如下四步:
(1) 累加字节流
也就是把当前读到的字节流累加到一个字节容器里。
(2) 调用子类的decode()方法进行解析
ByteToMessageDecoder的decode()方法是一个抽象方法,不同种类的解码器会有自己的decode()方法逻辑。
(3) 清理字节容器
为了防止发送端发送数据过快,ByteToMessageDecoder会在读取完一次数据并完成业务拆包后,清理字节容器。
(4) 传播已解码的业务数据包
如果List列表里有解析出来的业务数据包,那么就通过pipeline的事件传播机制往下进行传播。
判断当前字节容器可读字节是否小于固定长度。
基于行分隔符的拆包器可以同时处理\n和\r\n两种类型的行分隔符,其处理逻辑分为非丢弃模式和丢弃模式、找到行分隔符和未找到行分隔符的情况。
可以向基于分隔符解码器DelimiterBasedFrameDecoder传递一个分隔符列表,这样该解码器就会按照分隔符列表对数据包进行拆分。
主要的逻辑步骤如下:
(1) 丢弃模式的处理
(2) 获取待拆数据包的大小
(3) 对数据包进行长度校验
(4) 跳过指定字节长度
(5) 抽取数据包
转载地址:http://xvcfk.baihongyu.com/