How Netty's decoder handles incomplete reads

2025-03-26 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This article explains in detail how Netty's decoder handles data that arrives incomplete in a single read. It is shared here as a practical reference.

Overview

When the server reads data from a client, a channelRead event is triggered. A single read may not deliver a complete message, so how does Netty deal with this? This chapter analyzes that in detail.

In the previous chapter we studied the pipeline: events propagate through the pipeline, and handlers can intercept and process them. The codecs analyzed here are themselves handlers. They intercept the bytes in a ByteBuf and build them into the objects the business logic needs, which then continue to propagate.

An encoder is usually an outbound handler: it processes the data flowing out through it, encoding messages before they are sent.

A decoder is usually an inbound handler: it processes the data flowing in to it, working in the opposite direction from an encoder and decoding the received bytes before they are used.

Netty's decoders also deal with the half-packet and sticky-packet problems.

A half packet, as the name implies, is an incomplete packet. When Netty polls for read events, the data read from the channel is not necessarily a complete packet. That situation is called a half packet.

Sticky packets are not hard to understand either. If the client sends packets to the server in quick succession, the data of several packets may sit in the channel together, and a single read on the server may return more bytes than one complete packet. That situation is called a sticky packet.

Half packets and sticky packets are shown in the following figure:

[Figure 6-0-1: half packets and sticky packets]

Netty's handling of half packets and sticky packets is actually quite simple. As covered earlier, each handler instance here is bound to exactly one channel, so it can keep state for that channel. When data is read from the channel, it is parsed. If it does not form a complete packet, parsing fails, the bytes are saved, and they are combined with the next read and parsed again. Nothing is passed down the pipeline until a complete packet has been parsed.
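The save-and-retry idea above can be sketched in plain Java, without Netty. This is only an illustration under stated assumptions: the class name FrameAccumulatorSketch and the wire format (one length byte followed by that many ASCII payload bytes) are hypothetical and do not come from Netty.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not a Netty class: buffers bytes until whole frames arrive.
// Assumed wire format: 1 length byte (0..255) followed by that many ASCII payload bytes.
class FrameAccumulatorSketch {
    private byte[] pending = new byte[0]; // bytes kept from earlier incomplete reads

    /** Feeds the bytes of one read and returns every frame that is now complete. */
    List<String> feed(byte[] chunk) {
        // Append the new bytes to whatever was left over from the previous read.
        byte[] all = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, all, 0, pending.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (pos < all.length) {
            int len = all[pos] & 0xFF;
            if (all.length - pos - 1 < len) {
                break; // half packet: keep the bytes and wait for the next read
            }
            frames.add(new String(all, pos + 1, len, StandardCharsets.US_ASCII));
            pos += 1 + len; // sticky packet: keep looping over the same read
        }
        // Save the unparsed tail for the next call.
        pending = Arrays.copyOfRange(all, pos, all.length);
        return frames;
    }

    int pendingBytes() {
        return pending.length;
    }
}
```

Feeding the bytes {2, 'h'} returns no frames because the payload is short by one byte. A following read that starts with 'i' completes the first frame, and any further complete frames in that same read are returned with it.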

How is the specific process reflected in the code? Let's get into the source code analysis.

Section 1: ByteToMessageDecoder

ByteToMessageDecoder, as its name implies, is a decoder that parses bytes into messages.

Let's take a look at its definition:

    public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
        // class body omitted
    }

The class extends ChannelInboundHandlerAdapter, so, as we learned earlier, it is an inbound handler, that is, a handler that processes events flowing in to it.

The class is also declared abstract. In practice we do not use it directly but use one of its subclasses. The class defines the skeleton of the decoder and leaves the concrete decoding logic to the subclass. Half-packet handling is implemented in this class as well.

Many decoders in Netty extend this class, and we can write custom decoders by extending it too.
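The split between a skeleton in the base class and parsing in the subclass can be illustrated with a small plain-Java model. It is a sketch, not Netty code: SkeletonDecoderSketch and IntDecoderSketch are hypothetical names, java.nio.ByteBuffer stands in for ByteBuf, and the 4-byte message format is an assumption made for the example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the abstract base class: it owns the loop
// and delegates the actual parsing to the subclass.
abstract class SkeletonDecoderSketch {
    /** Subclasses read from 'in' and add complete messages to 'out', or do nothing. */
    protected abstract void decode(ByteBuffer in, List<Object> out);

    /** Calls decode() repeatedly until the subclass stops consuming bytes. */
    final List<Object> decodeAll(ByteBuffer in) {
        List<Object> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int before = in.remaining();
            decode(in, out);
            if (before == in.remaining()) {
                break; // nothing consumed: the remaining bytes are an incomplete message
            }
        }
        return out;
    }
}

// Hypothetical subclass: every message is exactly 4 bytes, read as a big-endian int.
class IntDecoderSketch extends SkeletonDecoderSketch {
    @Override
    protected void decode(ByteBuffer in, List<Object> out) {
        if (in.remaining() < 4) {
            return; // not enough bytes yet, leave them in the buffer
        }
        out.add(in.getInt());
    }
}
```

With ten input bytes, IntDecoderSketch produces two integers and leaves the last two bytes unread in the buffer, which is the position a real decoder would be in when a half packet arrives.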

Let's focus on one property of this class:

ByteBuf cumulation

This is the key property for half-packet handling. As described in the overview, Netty saves incomplete packets, and this property is where they are stored.

From the earlier chapters we know that after data is read into a ByteBuf, a channelRead event is propagated, and each handler's channelRead method is called along the way. The channelRead method of ByteToMessageDecoder is the key part of decoding.

Let's look at its channelRead method:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // the message is a ByteBuf
        if (msg instanceof ByteBuf) {
            // can simply be treated as an ArrayList that holds the decoded objects
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                // the accumulator is empty, so this is the first read from the I/O stream
                first = cumulation == null;
                if (first) {
                    // first read: the accumulator becomes the buffer that was just read
                    cumulation = data;
                } else {
                    // not the first read: append the new data to the accumulated data
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // call the subclass's method to decode
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }
                // record the list length
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                // propagate downstream
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            // not a ByteBuf: propagate it downstream unchanged
            ctx.fireChannelRead(msg);
        }
    }

This method is fairly long, so let's go through it step by step.

First, if the incoming message is a ByteBuf, we enter the if block.

CodecOutputList out = CodecOutputList.newInstance() can simply be treated as an ArrayList that holds the decoded data.

ByteBuf data = (ByteBuf) msg casts the message to ByteBuf.

first = cumulation == null: if cumulation is null, no half-packet data has been stored, so the current data is saved in the cumulation property.

If cumulation != null, half-packet data has been stored, so the newly read data is appended to the existing data through cumulator.cumulate(ctx.alloc(), cumulation, data) and the result is saved in the cumulation property.

Let's look at the cumulator property:

    private Cumulator cumulator = MERGE_CUMULATOR;

It is initialized with the static field MERGE_CUMULATOR. Let's follow it:

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            // the new data must not push the buffer past its maximum capacity
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) {
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            // append the newly read data to the buffer
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }
    };

Here a static object of type Cumulator is created and its cumulate method is overridden. The cumulate method concatenates ByteBufs:

The method first checks whether the write index of cumulation plus the number of readable bytes of in would exceed the maximum capacity of cumulation, or whether the reference count of cumulation is greater than 1. If either is true, the cumulation is expanded through expandCumulation. Otherwise cumulation itself is assigned to the local variable buffer.

Then the data of in is written to buffer, in is released, and the ByteBuf that now holds the combined data is returned.
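The merge step can be modelled with java.nio.ByteBuffer standing in for ByteBuf. This is a hedged sketch rather than Netty's implementation: MergeCumulatorSketch is a hypothetical name, ByteBuffer has no reference counting so there is no refCnt check and nothing to release, and the expansion strategy (allocate exactly enough for old plus new bytes) is an assumption.

```java
import java.nio.ByteBuffer;

// Hypothetical plain-Java analogue of a merge cumulator.
// Buffers are kept in "read mode": position = read index, limit = write index.
class MergeCumulatorSketch {
    /** Appends the readable bytes of 'in' to 'cumulation' and returns the buffer to keep. */
    static ByteBuffer cumulate(ByteBuffer cumulation, ByteBuffer in) {
        int needed = in.remaining();
        ByteBuffer buffer = cumulation;
        if (cumulation.capacity() - cumulation.limit() < needed) {
            // Not enough room after the write index: copy the unread bytes
            // into a new buffer that is large enough for old and new data.
            buffer = ByteBuffer.allocate(cumulation.remaining() + needed);
            buffer.put(cumulation);
            buffer.flip(); // back to read mode
        }
        // Temporarily move to the write index, append, then restore the read index.
        int readIndex = buffer.position();
        int writeIndex = buffer.limit();
        buffer.limit(writeIndex + needed);
        buffer.position(writeIndex);
        buffer.put(in);
        buffer.position(readIndex);
        return buffer;
    }
}
```

When the existing buffer has room, the same instance is returned with more readable bytes. When it does not, a larger buffer is returned and the caller must store that one instead, which mirrors why channelRead assigns the result of cumulate back to cumulation.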

Go back to the channelRead method:

Finally, decoding is done by callDecode(ctx, cumulation, out), which is passed the context object, the buffer cumulation and the collection out.

Let's follow the callDecode(ctx, cumulation, out) method:

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            // loop as long as the accumulator has readable data
            while (in.isReadable()) {
                int outSize = out.size();
                // check whether the list already holds decoded objects
                if (outSize > 0) {
                    // if so, propagate them downstream
                    fireChannelRead(ctx, out, outSize);
                    // clear the list
                    out.clear();
                    // if the handler has been removed from the pipeline, stop
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
                // number of readable bytes before decoding
                int oldInputLength = in.readableBytes();
                // implemented by the subclass: decoded objects are added to out
                decode(ctx, in, out);
                if (ctx.isRemoved()) {
                    break;
                }
                // list size is unchanged, so nothing was decoded
                if (outSize == out.size()) {
                    // readable length is unchanged too, so nothing was read
                    // (the accumulated data does not yet form a complete packet)
                    if (oldInputLength == in.readableBytes()) {
                        // leave the loop; decoding resumes when more data arrives
                        break;
                    } else {
                        // nothing was decoded, but some bytes were read
                        continue;
                    }
                }
                // out contains data, but nothing was read from the accumulator
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

The loop first checks whether the incoming ByteBuf has readable bytes. If it does, decoding is not finished, so the loop keeps decoding.

Next it checks the size of the collection out. If the size is greater than 0, out already contains decoded data, so the event is propagated downstream and out is cleared.

On the first pass out is empty, so we do not enter this if block. We will come back to this part later and move on for now.

int oldInputLength = in.readableBytes() records the number of readable bytes in the current ByteBuf, that is, in the cumulation property. The value is saved for a later comparison. Let's move on:

The decode(ctx, in, out) method performs the actual decoding. It reads from cumulation and puts the decoded data into the collection out. In ByteToMessageDecoder this method is abstract and is implemented by subclasses. Many of the decoders we use in Netty extend ByteToMessageDecoder and implement decode to do their work, and we can follow the same rules to write a custom decoder. Later sections look at the decoders that Netty provides and analyze their implementation. Let's move on:

if (outSize == out.size()) compares the size of out before and after decoding. If they are equal, nothing was decoded, so we enter the if block:

if (oldInputLength == in.readableBytes()) means the number of readable bytes in cumulation is the same before and after decoding, so the decode method consumed nothing. The current data is not a complete packet, so the loop exits and the data is kept for the next read. Otherwise, nothing was decoded but some bytes were read, so the rest of this iteration is skipped and the loop continues with the next one.

The last check, if (oldInputLength == in.readableBytes()), is reached only when out contains data. If no bytes were read from cumulation in that case, the content of out is invalid and an exception is thrown.
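The progress checks made after each decode call come down to two questions: did out grow, and did the readable byte count change? A compact way to see the four combinations is the sketch below. DecodeProgressSketch and its Outcome names are hypothetical and only model the branches of the loop; they are not part of Netty.

```java
// Hypothetical model of the checks made after one decode() call.
class DecodeProgressSketch {
    enum Outcome {
        WAIT_FOR_MORE_DATA, // nothing decoded, nothing read: break out of the loop
        RETRY,              // nothing decoded, but bytes were read: continue the loop
        INVALID,            // something decoded without reading: DecoderException
        MESSAGE_DECODED     // something decoded and bytes read: normal progress
    }

    static Outcome classify(int outBefore, int outAfter,
                            int readableBefore, int readableAfter) {
        boolean producedOutput = outAfter != outBefore;
        boolean consumedInput = readableAfter != readableBefore;
        if (!producedOutput) {
            return consumedInput ? Outcome.RETRY : Outcome.WAIT_FOR_MORE_DATA;
        }
        return consumedInput ? Outcome.MESSAGE_DECODED : Outcome.INVALID;
    }
}
```

The half-packet case is WAIT_FOR_MORE_DATA: the bytes stay in the accumulator and the next channelRead appends to them before decoding is attempted again.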

Let's go back to the channelRead method.

Let's focus on what is in the finally block:

    finally {
        if (cumulation != null && !cumulation.isReadable()) {
            numReads = 0;
            cumulation.release();
            cumulation = null;
        } else if (++numReads >= discardAfterReads) {
            numReads = 0;
            discardSomeReadBytes();
        }
        // record the list length
        int size = out.size();
        decodeWasNull = !out.insertSinceRecycled();
        // propagate downstream
        fireChannelRead(ctx, out, size);
        out.recycle();
    }

First, if cumulation is not null and has no readable bytes left, the accumulator is released and set to null. Otherwise, once the read counter numReads reaches discardAfterReads, the counter is reset and discardSomeReadBytes() is called to discard bytes that have already been read.

Then the size of out is recorded, the channelRead event is propagated downstream through fireChannelRead(ctx, out, size), and the out object is recycled.
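The clean-up policy in the finally block can be modelled in plain Java as follows. This is a sketch under assumptions: CumulationCleanupSketch is a hypothetical class, ByteBuffer.compact() stands in for discarding already-read bytes, and setting the field to null stands in for release(), since ByteBuffer is not reference counted.

```java
import java.nio.ByteBuffer;

// Hypothetical model of the clean-up that runs after every read.
class CumulationCleanupSketch {
    private final int discardAfterReads;
    private int numReads;
    ByteBuffer cumulation; // read mode: position = read index, limit = write index

    CumulationCleanupSketch(int discardAfterReads) {
        this.discardAfterReads = discardAfterReads;
    }

    /** Runs once per read, after decoding has consumed what it can. */
    void afterRead() {
        if (cumulation != null && !cumulation.hasRemaining()) {
            // Everything was decoded: drop the buffer entirely.
            numReads = 0;
            cumulation = null;
        } else if (++numReads >= discardAfterReads) {
            // Leftover bytes have lingered for several reads: reclaim the consumed space.
            numReads = 0;
            if (cumulation != null) {
                cumulation.compact(); // move unread bytes to the front
                cumulation.flip();    // back to read mode
            }
        }
    }
}
```

The counter exists so that compaction, which copies bytes, does not run on every single read. It runs only after leftover data has survived the configured number of reads.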

Let's follow the fireChannelRead(ctx, out, size) method:

    static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
        // traverse the list
        for (int i = 0; i < numElements; i++) {
            // pass the elements downstream one by one
            ctx.fireChannelRead(msgs.getUnsafe(i));
        }
    }

Here the out collection is traversed and its elements are passed downstream one by one.
