gRPC中的FRAME

gRPC中的FRAME

经过前边两章,大家应该了解了gRPC中的网络相关的知识,但是真的要通讯起来,网络包、通讯流程、数据结构又是如何的呢?直接使用HTTP2进行通讯不就完事了?远没有那么简单,本文会从gRPC中的WriteQueue出发,介绍一下gRPC中请求的封装结构,以及Frame的概念。下文所有的Frame,也叫帧,各位看官注意一下。

系列目录

HTTP2中的Frame格式

所有的帧都以一个9字节的报头开始, 后接变长的载荷:

+-----------------------------------------------+
 |                 Length (24)                   |
 +---------------+---------------+---------------+
 |   Type (8)    |   Flags (8)   |
 +-+-------------+---------------+-------------------------------+
 |R|                 Stream Identifier (31)                      |
 +=+=============================================================+
 |                   Frame Payload (0...)                      ...
 +---------------------------------------------------------------+

报头部分的字段定义如下:

  • Length: 载荷的长度, 无符号24位整型. 对于发送值大于2^14 (长度大于16384字节)的载荷, 只有在接收方设置SETTINGS_MAX_FRAME_SIZE为更大的值时才被允许。 注:帧的报头9字节不算在length里。
  • Type: 8位的值表示帧类型, 决定了帧的格式和语义。协议实现上必须忽略任何未知类型的帧。
  • Flags: 为Type保留的bool标识, 大小是8位。对确定的帧类型赋予特定的语义。否则发送时必须忽略(设置为0x0)。
  • R: 1位的保留字段,尚未定义语义。发送和接收必须忽略(0x0)。
  • Stream Identifier:31位无符号整型的流标示符。其中0x0作为保留值, 表示与连接相关的frames作为一个整体而不是一个单独的流。 具体的Frame定义格式大家可以参考RFC7540

数据写入中转站:WriteQueue

为什么需要从WriteQueue开始讲起,因为所有的gRPC的数据都是通过他写入Channel的,Channel不太了解的同学还是看一下Netty的源码。 既然叫做Queue,储存了什么呢?

  private final Channel channel;
  private final Queue<QueuedCommand> queue;
  private final AtomicBoolean scheduled = new AtomicBoolean();

看来是一个个的QueuedCommand,现在已有的类型有:

  • SendGrpcFrameCommand真正的gRPC请求数据,你可以理解成类似Data的东西。
  • CreateStreamCommand在remote端生成Stream的请求,由NettyClientStream调用生成。
  • GracefulCloseCommand正常关闭流。
  • ForcefulCloseCommand强行关闭流。
  • CancelServerStreamCommand取消Server端的流。
  • CancelClientStreamCommand取消Client端的流。
  • SendPingCommand心跳。
  • SendResponseHeadersCommand创建回复Header,一般用于Server处理完请求回写给客户端。 比较重要的就是GrpcFrameCommand,我们可以看到在他的run()方法中,开始了Channel的交互:
    @Override
    public final void run(Channel channel) {
      channel.write(this, promise);
    }
    

    那什么时候会调用呢? 我们来看WriteQueuescheduleFlush()方法:

    void scheduleFlush() {
      if (scheduled.compareAndSet(false, true)) {
        // Add the queue to the tail of the event loop so writes will be executed immediately
        // inside the event loop. Note DO NOT do channel.write outside the event loop as
        // it will not wake up immediately without a flush.
        channel.eventLoop().execute(later);
      }
    }
    

    看来是串行的在往eventLoop()中提交刷新,为什么说是串行的,因为大家会根据源码看看later,其中最终会调用到WriteQueueflush()方法,所以,流程就变成。

    1. WriteQueue中的enqueue()方法加入队列,并决定是否要往Channel中写
    2. 如果要写,那么就会调用scheduleFlush()
    3. 回调到SendGrpcFrameCommand开始写channel
    4. 写完Channel刷新缓冲区。

gRPC中的Frame

讲完WriteQueue,相信大家应该知道了,看来我们rpc通信的数据都是放在SendGrpcFrameCommand里啊。 是的没错,那么具体放了些什么呢? 在AbstractServerStream中我们找到了答案,原来在NettyServerStream.SinkNettyClientStream.Sink中,会进行数据的写入。这里我们注意到:

new SendGrpcFrameCommand(transportState(), bytebuf, false)  //NettyServerStream
new new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream //NettyClientStram

两者还是有区别的。看来,Server端写出去的Frame是不会为Stream中最后一个Frame的,只有client端会控制是否是最后一个Frame真相大白,原来请求经过序列化后(MessageFramer),就会封装成WritableBuffer投递给NettyServerStream.SinkNettyClientStream.Sink,投递后,再由Sink加入到WriteQueue,最终由WriteQueue写进Channel 综上,其实WriteQueue是在网络层与真正的服务实现类之间写信息的媒介,无论Client还是Server,最终都是通过WriteQueue开始转到Netty网络层的。 那gRPC中的Frame到底是怎样的呢? 首先我们先讲讲gRPC中的Stream* A single stream of communication between two end-points within a transport. 这里很显然Stream被视为两个物理机通信的数据结构。 然而,在RFC中,我们又找到一句话:

The order in which frames are sent on a stream is significant. Recipients process frames in the order they are received. In particular, the order of HEADERS and DATA frames is semantically significant.

RFC中的Stream说的是Http2Stream,大致的意思是,Stream中的Frame发送和处理的顺序需要某种机制去保证一致,这里大家理解一下,如果A端发送的数据包为1、2,而B端接收的数据包为1,2,但是处理后回写成2的返回、1的返回,那通讯不就乱了?是的,TCP是不会给你做请求对应这种事情的,管你IO就已经很不错了。 那这么说,gRPC中的Frame如何保证数据包对应的呢? 这里楼主说实话,想了很久,查了很久,最后找到了SendResponseHeadersCommand,这是Server在处理完请求后回写给客户端的数据载体,我们翻到NettuServerHandler中去。

  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
      ChannelPromise promise) throws Http2Exception {
    int streamId = cmd.stream().id();
    Http2Stream stream = connection().stream(streamId);
    if (stream == null) {
      resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
      return;
    }
    if (cmd.endOfStream()) {
      //如果这个数据是Stream中最后一条,就在数据发送完后关闭Stream
      closeStreamWhenDone(promise, streamId);
    }
    encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
  }
  
  ...
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
      ChannelPromise promise) throws Http2Exception {
    if (cmd.endStream()) {
      //如果这个数据是Stream中最后一条,就在数据发送完后关闭Stream
      closeStreamWhenDone(promise, cmd.streamId());
    }
    // Call the base class to write the HTTP/2 DATA frame.
    encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
  }

啊,原来如此,看来gRPC中一个Stream只完成一次响应的请求,果然是within a transport,原来Client端在发送了请求后,就已经将Stream置成half-close状态了,我们看下ClientCalls类:

    private static <ReqT, RespT> void asyncUnaryRequestCall(ClientCall<ReqT, RespT> call, ReqT param, Listener<RespT> responseListener, boolean streamingResponse) {
        startCall(call, responseListener, streamingResponse); //想也知道,先发Header,中间处理       可不能异步哦。
        try {
            call.sendMessage(param); //发送请求
            call.halfClose();  //半关闭
        } catch (RuntimeException var5) {
            throw cancelThrow(call, var5);
        } catch (Error var6) {
            throw cancelThrow(call, var6);
        }
    }

那真正的close()在哪里呢? 看来还是在大Server端,ServerCallImpl中:

  @Override
  public void close(Status status, Metadata trailers) {
    checkState(!closeCalled, "call already closed");
    try {
      closeCalled = true;

      if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
        internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE));
        return;
      }
      stream.close(status, trailers);
    } finally {
      serverCallTracer.reportCallEnded(status.isOk());
    }
  }

怪不得,我们每次Server接收到响应后,要调用一下onComplete()方法,因为要将流关闭。那onNext()方法呢?onNext()实际上就是回写返回给调用方了。

总结

看来gRPC基于Http2,每次完整的请求周期,只对应一个Stream,在Stream上,Client和Server创建Stream,发送数据,处理,返回,关闭流。这么看来,使用完全异步的Netty也是明智之举,毕竟有StreamId,请求的处理顺序控制的很好。

本文为作者原创,转载请注明出处 。邮箱:568718043@qq.com