转换的艺术:MessageFrame、MessageDeframer
前边我们讲了gRPC通信的模型,以及WriteQueue
还有Frame。哎?那我们最终看到的请求或者响应,都是已经序列化好的类啊?难道Netty
还能帮我转换成字节不成?如果一次通信的Frame过大,要分两个Frame发,怎么整?本篇会来介绍处理这些问题的类,MessageFrame
和MessageDeframer
。
系列目录:
- gRPC网络模型
- Channel、Connection、Htt2Stream、Stream的那些事(基于Netty)
- gRPC中的FRAME
- 转换的艺术:MessageFrame、MessageDeframer
- 待续
MessageFrame
Encodes gRPC messages to be delivered via the transport layer
故名思义,将gRPC的请求变成WritableBuffer
(这里要注意哦,只是把InputStream
变成了WritableBuffer
,真正的Proto的序列化还在外层),其实也就是NettyWritableBuffer
,里边就会有我们熟悉的ByteBuf
(Netty
中通讯的数据单元)。
那
MessageFrame
里做了什么呢:
@Override
public void writePayload(InputStream message) {
...
if (messageLength != 0 && compressed) {
written = writeCompressed(message, messageLength);
} else {
written = writeUncompressed(message, messageLength);
}
...
}
看来写也要分压缩和非压缩,那我们看看非压缩的情况:
private int writeUncompressed(InputStream message, int messageLength) throws IOException {
if (messageLength != -1) {
currentMessageWireSize = messageLength;
return writeKnownLengthUncompressed(message, messageLength);
}
BufferChainOutputStream bufferChain = new BufferChainOutputStream();
//将message写入bufferChain
int written = writeToOutputStream(message, bufferChain);
if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
throw Status.RESOURCE_EXHAUSTED
.withDescription(
String.format("message too large %d > %d", written , maxOutboundMessageSize))
.asRuntimeException();
}
//最终将bufferChain写入到sink
writeBufferChain(bufferChain, false);
return written;
}
最终:
private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) {
ByteBuffer header = ByteBuffer.wrap(headerScratch);
header.put(compressed ? COMPRESSED : UNCOMPRESSED);
int messageLength = bufferChain.readableBytes();
header.putInt(messageLength);
//数据请求头
WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH);
writeableHeader.write(headerScratch, 0, header.position());
if (messageLength == 0) {
// the payload had 0 length so make the header the current buffer.
buffer = writeableHeader;
return;
}
sink.deliverFrame(writeableHeader, false, false, messagesBuffered - 1);
messagesBuffered = 1;
List<WritableBuffer> bufferList = bufferChain.bufferList;
for (int i = 0; i < bufferList.size() - 1; i++) {
sink.deliverFrame(bufferList.get(i), false, false, 0);
}
//最后一个buffer等着flush()的时候写end-of-stream=true
buffer = bufferList.get(bufferList.size() - 1);
currentMessageWireSize = messageLength;
}
我去,细思恐极,原来发一个请求,要写入这么多次WrtieQueue
(从上一篇博客中,我们已经知道,deliverFrame()一次,就是往WriteQueue
里写入一次)。这里我们稍微捋一下:
ClientCallImpl
调用startCall()
发送建立Stream
的Header
。ClientCallImpl
中sendMessage()
开始发送数据(ps,已经发送过Header
了)- 当发送完成后,
ClientCallImpl
会half-close
,此时会真正提交最后一个buffer
(end-of-stream
)到WriteQueue
,并且发送出去(flush=true
)。 - 等待服务端响应。
OK,清晰明了,这里各位估计有个疑问,我们在发送Data Frame
的时候,有一个5个字节的writeableHeader
,WTF,这个是啥?原来这个这个Header
还会标识这个Frame
有没有压缩,Frame
里边有多少个字节,看样子也是能放下一个int
类型的了。用做什么呢?下边继续看。
MessageDeframer
有Encode就有Decode,我们来看看Frame
是怎么Decode的。
@Override
public void request(int numMessages) {
checkArgument(numMessages > 0, "numMessages must be > 0");
if (isClosed()) {
return;
}
pendingDeliveries += numMessages;
deliver();
}
Sink可以通过这个方法,去对端拿Frame
,这里就是取Message
的个数,注意,不是Frame
的,并且当读到BODY
(也就是Data Frame
)的时候才会减少。OK,这里怎么处理的呢:
直接到deliver()
里边看:
private void deliver() {
if (inDelivery) {
return;
}
inDelivery = true;
try {
while (!stopDelivery && pendingDeliveries > 0 && readRequiredBytes()) {
switch (state) {
case HEADER:
processHeader();
break;
case BODY:
// Read the body and deliver the message.
processBody();
// Since we've delivered a message, decrement the number of pending
// deliveries remaining.
pendingDeliveries--;
break;
default:
throw new AssertionError("Invalid state: " + state);
}
}
if (stopDelivery) {
close();
return;
}
if (closeWhenComplete && isStalled()) {
close();
}
} finally {
inDelivery = false;
}
}
原来是在readRequiredBytes()
中进行循环读了,那Header
和Body
如何切换呢?这里不得不承认谷歌设计的很好。
private State state = State.HEADER;
private int requiredLength = HEADER_LENGTH;
第一次读,必须是读State.HEADER
,还记得上边提到的5个字节的writeableHeader
吗,原来如此。每次读到头,processHeader()
确认下一次读的requiredLength
,读到BODY
,再设置读下一个Header
(事实很少用到)。
这样,readRequiredBytes()
才能读到指定长度的数据,才会进行处理,Google原来在Frame的下层,还会解包,社会社会。
了解了request()
,大家也顺便知道了deliver()
的作用,那最终MessageDeframe
怎么调呢?
还记得古人云,Netty
中一切都是异步的,看来deframe()
不出意外应该是Netty
来调了。bingo,当有响应时,NettyServerHandler
、NettyClientHandler
的onDataRead
就会响起,最终调用到deframe()
上边来。
@Override
public void deframe(ReadableBuffer data) {
checkNotNull(data, "data");
boolean needToCloseData = true;
try {
if (!isClosedOrScheduledToClose()) {
if (fullStreamDecompressor != null) {
fullStreamDecompressor.addGzippedBytes(data);
} else {
//先放入unprocessed里边
unprocessed.addBuffer(data);
}
needToCloseData = false;
deliver();
}
} finally {
if (needToCloseData) {
data.close();
}
}
}
看来不管来的Data是啥,不管三七二十一,先放在unprocessed
里边,交给deliver()
处理。这就回到了readRequiredBytes()
中的死循环里。
哎哎哎,不对啊,那deframe()
的数据哪里去了?
原来,processBody()
的时候传递给listener了,后续再去序列化:
private void processBody() {
statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, -1);
inboundBodyWireSize = 0;
InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
nextFrame = null;
//重磅
listener.messagesAvailable(new SingleMessageProducer(stream));
state = State.HEADER;
requiredLength = HEADER_LENGTH;
}
原来如此,讲到这里,大家就知道了,原来gRPC中的请求和返回,是这样变成BufBytes
的。
本文为作者原创,转载请注明出处 。邮箱:568718043@qq.com