MENU

Netty - 核心编解码器

April 6, 2021 • Read: 2207 • 后端

Netty - 核心编解码器

netty 最核心的部分就是掌握 ByteBuf 的用法原理,业务逻辑主要是写ChannelHandler,以及各种编解码器。

1、ChannelInboundHandlerAdapter & ChannelOutboundHandlerAdapter - 核心实现类

这俩实现类都继承了 io.netty.channel.ChannelHandlerAdapter 类,基本实现都需要用户去实现,所以他基本上对于用户来说是透明的,我们开发可以控制我们全部的输入输出,对象进行释放等等。

API 方法

image-20210405232130480.png

image-20210405232659411.png

// 判断是否和其他管道共享数据 ,每一个客户端连接会申请一个管道, 就和别人共享
isSharable
// 每一个客户端只会注册一次handler
handlerAdded
// 每一个客户端只会一次注册
channelRegistered
// 活跃
channelActive

// 接收客户端信息
channelRead -> write/flush操作

// out输出流处理
out : write
out : flush   

// 已经写出去了 ....
channelReadComplete

// 断开 ....
channelInactive
channelUnregistered
handlerRemoved

1. exceptionCaught() 和 handlerRemoved() 事件何时触发

  1. exceptionCaught():异常关闭,比如关闭客户端或者服务端手动关闭
  2. handlerRemoved():正常关闭执行,比如执行 ctx.close()

所以一般场景是:

  1. 服务端执行了 ctx.close(),此时客户端应当在handlerRemoved()中执行被关闭的业务逻辑,同时服务器端也是,当服务器异常关闭的时候,客户端的exceptionCaught()事件会触发
  2. 当客户端执行了 ctx.close(),服务器端的handlerRemoved()事件就会触发,当客户端异常关闭,服务器端的exceptionCaught()事件会触发

所以,异常关闭事件时exceptionCaught(),正常关闭时handlerRemoved()


2. ctx.writeAndFlush(msg) 与 ct.channel().writeAndFlush(msg) 区别

  1. ChannelHandlerContext.writeAndFlush(msg);

比如目前的pipeline.addlast(out1,in,out2) ,此时比如in执行了ChannelHandlerContext.writeAndFlush(msg);,此时解码器只会走out1 , 就是从开头到in, 也就是执行 out1.write -> ou1.flush

  1. ChannelHandlerContext.channel().writeAndFlush(msg);

而这个呢, 还是上面种情况 , 此时我in输出改成了 ChannelHandlerContext.channel().writeAndFlush(msg); 此时会从开头到最后走一遍 , 也就是会从 out1.write -> out2.write -> ou1.flush -> ou2.flush

3. ctx.write() 和 ctx.flush() 和 ctx.writeAndFlush()

write就是写 ,flush就是推到缓冲区发出去

ctx.write() 会调用输出流的 write方法 , 同时 flush也是 , 然后 writeAndFlush是两者的结合体,俩都执行


2、MessageToByteEncoder 与 ByteToMessageDecoder 编解码器

这俩类实现了上面我们刚刚提到的俩编解码器的实现, 所以他基本上封装了他主要的方法 , 所以我们主要关注io.netty.handler.codec.ByteToMessageDecoder#channelRead 这个方法。都是抽象类,所以需要我们继承,重写对应的encode() 和 decode() 方法

1. MessageToByteEncoder<I> 编码器

将 发送出去的信息 转换成 ByteBuf对象。这个泛型参数I,就是输入的消息。

由于我们执行写出操作,比如 ctx.write() 或者 writeAndFlush操作,会调用输出流的write方法

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            // 我们发送的消息
            I cast = (I) msg;
            // 分配内存
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // 解码-> 交给我们去写
                encode(ctx, cast, buf);
            } finally {
                ReferenceCountUtil.release(cast);
            }

            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            //ChannelOutboundInvoker 处理链
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        // 他会帮助我们释放一次,所以不需要我们手动释放我们输出的对象
        if (buf != null) {
            buf.release();
        }
    }
}
// 比如:自定义实现这个类,会让我们实现encode方法
public class IntegerEncoder extends MessageToByteEncoder<Integer> {
    @Override
    public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
        throws Exception {
        out.writeInt(msg);
    }
}

2. ByteToMessageDecoder 解码器(重点)

他重写了 ChannelInboundHandlerAdapter 类 , 重写了父类好多方法 ,我们主要关注 io.netty.handler.codec.ByteToMessageDecoder#channelRead 这个方法

基本核心的文档内容在下面 :

​ ChannelInboundHandlerAdapter which decodes bytes in a stream-like fashion from one ByteBuf to an other Message type.

就是将输入进来的ByteBuf转换成我们想要的数据类型对象 , 添加到他的集合中

​ Generally frame detection should be handled earlier in the pipeline by adding a DelimiterBasedFrameDecoder, FixedLengthFrameDecoder, LengthFieldBasedFrameDecoder, or LineBasedFrameDecoder.

通常,通过添加DelimiterBasedFrameDecoderFixedLengthFrameDecoderLengthFieldBasedFrameDecoderLineBasedFrameDecoder,可以在管道中更早地处理帧检测。

io.netty.handler.codec.ByteToMessageDecoder#channelRead

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 判断 , 他是不是ByteBuf 类型
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            // first来判断 cumulation(buf)里面是否缓存了前面的数据
            first = cumulation == null;
            // 这里就会将 前面的累积的数据 与新传进来的数据合并在一起,然后 cumulation 里面 存取当前的所有数据
            // 这里使用了策略模式:cumulator。默认:内存复制积累器
            cumulation = cumulator.cumulate(ctx.alloc(),
                                            first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
            // 这里调用解码方法
            callDecode(ctx, cumulation, out);

        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally { // 上面的 callDecode()方法里面如果执行了break、等,就会执行这里finally
            // 读完就要释放,所以一般不需要我们手动释放
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                ...
            }

            int size = out.size();
            firedChannelRead |= out.insertSinceRecycled();
            // 向下传递数据
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

解码中两种数据积累器(Cumulator) 的区别。

使用了策略模式

  1. MERGE_CUMULATOR (默认方式)

使用了内存复制 (using memory copies)

  1. COMPOSITE_CUMULATOR

组合。对外提供一个逻辑的统一视图

io.netty.handler.codec.ByteToMessageDecoder#callDecode方法

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // 当可读,就一直执行,就是读指针 < 写指针 
        while (in.isReadable()) {
            int outSize = out.size();

            // 一般不会走这里
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();

                ...
                outSize = 0;
            }

            // 记录可读长度
            int oldInputLength = in.readableBytes();
            // 调用这里。这里面就包括了调用自己实现的decode()方法
            // 在decode中时,不能执行handler remove清理操作
            // 那decode完之后,需要清理数据
            decodeRemovalReentryProtection(ctx, in, out);

            ...

            // 这里就说明没有往out里面添加数据
            if (outSize == out.size()) {
                // 如果读指针没有移动过,
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    //
                    continue;
                }
            }

            ...
        }
    } catch (DecoderException e) {
        ...
    }
}

io.netty.handler.codec.ByteToMessageDecoder#decodeRemovalReentryProtection方法

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
    throws Exception {
    // decodeState状态:处理handler 被remove的情况
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        // 这里调用的是个抽象方法,实际调用的就是自己实现这个类时,具体实现的方法内容
        // 运用了模板模式
        decode(ctx, in, out);
    } finally {
        ...
    }
}

io.netty.handler.codec.ByteToMessageDecoder#decode抽象方法,需要我们去实现

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

3、MessageToMessageDecoder 和 MessageToMessageEncoder 编解码器

1. MessageToMessageEncoder<I> 编码器

他继承了 ChannelOutboundHandlerAdapter 类 , 所以是一个输出流 编码器 ,

这里的泛型指的是输入端 , out里添加输出 , 这个不管你是啥, 一般情况都是ByteBuf对象.

这里通常就是将Java对象变成 ByteBuf对象。泛型I就是Java对象

public class IntegerToStringEncoder extends
       MessageToMessageEncoder<Integer> {

    // 差不多,很简单
   @Override
   public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out)
           throws Exception {
       out.add(message.toString());
   }
}

2. MessageToMessageDecoder<I> 解码器

这里的泛型指的是输入端 , 输出中添加你想添加的对象。

这里通常就是将 ByteBuf对象 变成Java对象。泛型I就是ByteBuf对象

重写下面的方法就行 ,下面这个例子表示的是输入是String, 输出是Int 类型

public class StringToIntegerDecoder extends
       MessageToMessageDecoder<String> {

    @Override
   public void decode(ChannelHandlerContext ctx, String message,
                      List<Object> out) throws Exception {
       out.add(message.length());
   }
}

4、SimpleChannelInboundHandler<I>

与我们直接继承 ChannelInboundHandlerAdapter相比的优势:

  1. acceptInboundMessage方法可以判断当前传递的msg是否可以被当前Handler处理
  2. 他可以帮助我们自动释放内存

泛型I是我们已经解码后的类型,他会将解码后的类型传递给我们,同时他并不需要我们手动释放,他实现了ChannelInboundHandlerAdapter类,

基本上他是不需要我们做类型转换的。开发时,只需要专注实现抽象方法 channelRead0

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 每次都会release初始化为true
    boolean release = true;
    try {
        // 判断类型,因为SimpleChannelInboundHandler有一个泛型I, 就是判断和他记录的类型是否相同
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            // 这个就是我们写的业务逻辑
            channelRead0(ctx, imsg);
        } else {
            // 如果不是 I这个类型,当前handler就不会处理
            release = false;
            // 直接像后面的Hanler传递就行
            ctx.fireChannelRead(msg);
        }
    } finally {
        // 最后会自动给我们释放 ,所以我们不需要人工去释放,会造成不必要的浪费
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}

io.netty.channel.SimpleChannelInboundHandler#channelRead0抽象方法

protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;

5、一般用“两层”编解码器

为什么需要“二次”解码

把解决粘包和半包问题的常用三种解码器叫一次解码器

因为一次解码的结果是字节,所以需要将其转换为对象。称为“二次解码器”

反之,编码器也是类似。

  • 一次解码器:ByteToMessageDecoder

ByteBuf(原始数据流) -> ByteBuf(用户数据) --> 这里处理了粘包和半包问题

  • 二次解码器:MessageToMessageDecoder<I>

ByteBuf(用户数据) -> Java Object

可以合并成一步到位?

可以,但不建议。原因:没有分层,不够清晰;耦合性高,不容易置换方案

常用的“二次编解码”方式

Java序列化、XML、JSON、MessagePack、Protobuf、其他

Netty 对二次编解码的支持

例子:io.netty.example.worldclock.WorldClockClientInitializer#initChannel