转换的艺术:MessageFrame、MessageDeframer
前边我们讲了gRPC通信的模型,以及WriteQueue还有Frame。哎?那我们最终看到的请求或者响应,都是已经序列化好的类啊?难道Netty还能帮我转换成字节不成?如果一次通信的Frame过大,要分两个Frame发,怎么整?本篇会来介绍处理这些问题的类,MessageFrame和MessageDeframer。
系列目录:
- gRPC网络模型
- Channel、Connection、Htt2Stream、Stream的那些事(基于Netty)
- gRPC中的FRAME
- 转换的艺术:MessageFrame、MessageDeframer
- 待续
MessageFrame
Encodes gRPC messages to be delivered via the transport layer
故名思义,将gRPC的请求变成WritableBuffer(这里要注意哦,只是把InputStream变成了WritableBuffer,真正的Proto的序列化还在外层),其实也就是NettyWritableBuffer,里边就会有我们熟悉的ByteBuf(Netty中通讯的数据单元)。
那MessageFrame里做了什么呢:
@Override
public void writePayload(InputStream message) {
...
if (messageLength != 0 && compressed) {
written = writeCompressed(message, messageLength);
} else {
written = writeUncompressed(message, messageLength);
}
...
}
看来写也要分压缩和非压缩,那我们看看非压缩的情况:
private int writeUncompressed(InputStream message, int messageLength) throws IOException {
if (messageLength != -1) {
currentMessageWireSize = messageLength;
return writeKnownLengthUncompressed(message, messageLength);
}
BufferChainOutputStream bufferChain = new BufferChainOutputStream();
//将message写入bufferChain
int written = writeToOutputStream(message, bufferChain);
if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
throw Status.RESOURCE_EXHAUSTED
.withDescription(
String.format("message too large %d > %d", written , maxOutboundMessageSize))
.asRuntimeException();
}
//最终将bufferChain写入到sink
writeBufferChain(bufferChain, false);
return written;
}
最终:
private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) {
ByteBuffer header = ByteBuffer.wrap(headerScratch);
header.put(compressed ? COMPRESSED : UNCOMPRESSED);
int messageLength = bufferChain.readableBytes();
header.putInt(messageLength);
//数据请求头
WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH);
writeableHeader.write(headerScratch, 0, header.position());
if (messageLength == 0) {
// the payload had 0 length so make the header the current buffer.
buffer = writeableHeader;
return;
}
sink.deliverFrame(writeableHeader, false, false, messagesBuffered - 1);
messagesBuffered = 1;
List<WritableBuffer> bufferList = bufferChain.bufferList;
for (int i = 0; i < bufferList.size() - 1; i++) {
sink.deliverFrame(bufferList.get(i), false, false, 0);
}
//最后一个buffer等着flush()的时候写end-of-stream=true
buffer = bufferList.get(bufferList.size() - 1);
currentMessageWireSize = messageLength;
}
我去,细思恐极,原来发一个请求,要写入这么多次WrtieQueue(从上一篇博客中,我们已经知道,deliverFrame()一次,就是往WriteQueue里写入一次)。这里我们稍微捋一下:
ClientCallImpl调用startCall()发送建立Stream的Header。ClientCallImpl中sendMessage()开始发送数据(ps,已经发送过Header了)- 当发送完成后,
ClientCallImpl会half-close,此时会真正提交最后一个buffer(end-of-stream)到WriteQueue,并且发送出去(flush=true)。 - 等待服务端响应。
OK,清晰明了,这里各位估计有个疑问,我们在发送Data Frame的时候,有一个5个字节的writeableHeader,WTF,这个是啥?原来这个这个Header还会标识这个Frame有没有压缩,Frame里边有多少个字节,看样子也是能放下一个int类型的了。用做什么呢?下边继续看。
MessageDeframer
有Encode就有Decode,我们来看看Frame是怎么Decode的。
@Override
public void request(int numMessages) {
checkArgument(numMessages > 0, "numMessages must be > 0");
if (isClosed()) {
return;
}
pendingDeliveries += numMessages;
deliver();
}
Sink可以通过这个方法,去对端拿Frame,这里就是取Message的个数,注意,不是Frame的,并且当读到BODY(也就是Data Frame)的时候才会减少。OK,这里怎么处理的呢:
直接到deliver()里边看:
private void deliver() {
if (inDelivery) {
return;
}
inDelivery = true;
try {
while (!stopDelivery && pendingDeliveries > 0 && readRequiredBytes()) {
switch (state) {
case HEADER:
processHeader();
break;
case BODY:
// Read the body and deliver the message.
processBody();
// Since we've delivered a message, decrement the number of pending
// deliveries remaining.
pendingDeliveries--;
break;
default:
throw new AssertionError("Invalid state: " + state);
}
}
if (stopDelivery) {
close();
return;
}
if (closeWhenComplete && isStalled()) {
close();
}
} finally {
inDelivery = false;
}
}
原来是在readRequiredBytes()中进行循环读了,那Header和Body如何切换呢?这里不得不承认谷歌设计的很好。
private State state = State.HEADER;
private int requiredLength = HEADER_LENGTH;
第一次读,必须是读State.HEADER,还记得上边提到的5个字节的writeableHeader吗,原来如此。每次读到头,processHeader()确认下一次读的requiredLength,读到BODY,再设置读下一个Header(事实很少用到)。
这样,readRequiredBytes()才能读到指定长度的数据,才会进行处理,Google原来在Frame的下层,还会解包,社会社会。
了解了request(),大家也顺便知道了deliver()的作用,那最终MessageDeframe怎么调呢?
还记得古人云,Netty中一切都是异步的,看来deframe()不出意外应该是Netty来调了。bingo,当有响应时,NettyServerHandler、NettyClientHandler的onDataRead就会响起,最终调用到deframe()上边来。
@Override
public void deframe(ReadableBuffer data) {
checkNotNull(data, "data");
boolean needToCloseData = true;
try {
if (!isClosedOrScheduledToClose()) {
if (fullStreamDecompressor != null) {
fullStreamDecompressor.addGzippedBytes(data);
} else {
//先放入unprocessed里边
unprocessed.addBuffer(data);
}
needToCloseData = false;
deliver();
}
} finally {
if (needToCloseData) {
data.close();
}
}
}
看来不管来的Data是啥,不管三七二十一,先放在unprocessed里边,交给deliver()处理。这就回到了readRequiredBytes()中的死循环里。
哎哎哎,不对啊,那deframe()的数据哪里去了?
原来,processBody()的时候传递给listener了,后续再去序列化:
private void processBody() {
statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, -1);
inboundBodyWireSize = 0;
InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
nextFrame = null;
//重磅
listener.messagesAvailable(new SingleMessageProducer(stream));
state = State.HEADER;
requiredLength = HEADER_LENGTH;
}
原来如此,讲到这里,大家就知道了,原来gRPC中的请求和返回,是这样变成BufBytes的。
本文为作者原创,转载请注明出处 。邮箱:568718043@qq.com