编解码器
解码器
ByteToMessageDecoder
数据通过网络传输,最终会缓存在一个字节数组里
所以就会可能出现传输:
接收:
public class TimeDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 如果缓冲区没有足够的数据,不进行处理,只有缓冲区累积一定的数据时,才将数据添加到out
if (in.readableBytes() < 4){
return;
}
// 添加到out后,代表解码器成功解码了一条消息
out.add(in.readBytes(4));
}
}
...
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
}
});
当然这种粘包也可以通过创建一个缓冲区,每次数据到来时,将数据放入到缓冲区,如果缓冲区超过一定大小则就进行处理
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
}
ReplayingDecoder
使用了一个自定义的ByteBuf 支持更简单的操作
MessageToMessageDecoder
ByteToMessage 是一次解码,而MessageToMessage在一次解码的基础上,对其进行二次解码
编码器
MessageToByteEncoder
public class ShortToByteEncoder extends MessageToByteEncoder<Short> { //← -- 扩展了MessageToByteEncoder
@Override
public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out)
throws Exception {
out.writeShort(msg); // ← -- 将Short 写入ByteBuf 中
}
}
MessageToMessageEncoder
编解码器
- xxxCodec
netty 内置的 Handler 以及 编解码器
- SslHandler
- SSL/TLS
- Http
- HttpResponseDecoder
- HttpRequestEncoder
- HttpServerCodec
pipeline.addLast("aggregator",new HttpObjectAggregator(512 * 1024));
- 压缩:HttpContentCompressor
- WebSocket
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(65536), ← -- 为握手提供聚合的HttpRequest
new WebSocketServerProtocolHandler("/websocket"), ← -- 如果被请求的端点是"/websocket",则处理该升级握手
new TextFrameHandler(), ← -- TextFrameHandler 处理TextWebSocketFrame
new BinaryFrameHandler(), ← -- BinaryFrameHandler 处理BinaryWebSocketFrame
new ContinuationFrameHandler()); ← -- ContinuationFrameHandler 处理ContinuationWebSocketFrame
}
- 检测空闲连接或超时 前两者都是通过定时任务调度,检测最后操作时间实现,后者则是在特定时间检查特定状态实现
- IdleStateHandler
- ReadTimeoutHandler
- WriteTimeoutHandler
- 根据分隔符分割字节流
- DelimiterBasedFrameDecoder
- LineBasedFrameDecoder
- 根据长度分割字节流
- FixedLengthFrameDecoder
- LengthFieldBasedFrameDecoder
- 写大型数据
FileInputStream in = new FileInputStream(file); ← -- 创建一个FileInputStream
FileRegion region = new DefaultFileRegion( ← -- 以该文件的完整长度创建一个新的DefaultFileRegion
in.getChannel(), 0, file.length());
channel.writeAndFlush(region);
pipeline.addLast(new ChunkedWriteHandler()); ← -- 添加Chunked-WriteHandler以处理作为ChunkedInput传入的数据
pipeline.addLast(new WriteStreamHandler()); ← -- 一旦连接建立,WriteStreamHandler就开始写文件数据
数据传输前置长度
无论使用什么分割符代表消息间隔,数据中都会可能出现这样的符号,为了避免这个问题,可以通过使用固定的字节长度代表下一条消息长度来解决
03 下雨天 03 留客天 02 天留 03 我不留
- LengthFieldBasedFrameDecoder
序列化
JDK
名称 | 描述 |
---|---|
CompatibleObjectDecoder |
和使用JDK序列化的非基于Netty的远程节点进行互操作的解码器 |
CompatibleObjectEncoder |
和使用JDK序列化的非基于Netty的远程节点进行互操作的编码器 |
ObjectDecoder |
构建于JDK序列化之上的使用自定义的序列化来解码的解码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取 |
ObjectEncoder |
构建于JDK序列化之上的使用自定义的序列化来编码的编码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取 |
JBoss Marshalling
名称 | 描述 |
---|---|
CompatibleMarshallingDecoder ,CompatibleMarshallingEncoder |
与只使用JDK序列化的远程节点兼容 |
MarshallingDecoder , MarshallingEncoder |
适用于使用JBoss Marshalling的节点。这些类必须一起使用 |
Protocol Buffers
名称 | 描述 |
---|---|
ProtobufDecoder |
使用protobuf对消息进行解码 |
ProtobufEncoder |
使用protobuf对消息进行编码 |
ProtobufVarint32FrameDecoder |
根据消息中的Google Protocol Buffers的"Base 128 Varints"a整型长度字段值动态地分割所接收到的ByteBuf |
ProtobufVarint32LengthFieldPrepender |
向ByteBuf 前追加一个Google Protocal Buffers的"Base 128 Varints"整型的长度字段值 |