2025-01-19 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how Kafka handles TCP "sticky packets" and "split packets", that is, how it recovers message boundaries from a byte stream. The method is simple, fast and practical. Let's walk through it.
First, why do sticky packets and split packets occur?
We know that TCP splits data into segments, numbers them, and sends them in batches according to the protocol.
The transport layer does not understand the meaning of the application layer's data packets, let alone split and send them along your business boundaries. It only sends bytes according to its own protocol stack.
As a result, network data can arrive stuck together (several messages in one read) or split apart (one message spread across several reads).
In essence, the transport layer does not understand the data of the upper application and simply sends bytes as its protocol stack sees fit.
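The effect can be reproduced without a network. The following is a minimal sketch that simulates the byte stream with in-memory streams; the class and method names (StreamBoundaryDemo, sendAll, receive) are made up for this illustration, and real socket reads would return chunks of unpredictable size rather than a fixed chunkSize.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: simulates a TCP byte stream in memory.
final class StreamBoundaryDemo {
    /** Writes the messages back to back, the way a sender hands them to TCP. */
    static byte[] sendAll(String... messages) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        for (String m : messages) {
            byte[] bytes = m.getBytes(StandardCharsets.US_ASCII);
            wire.write(bytes, 0, bytes.length);
        }
        return wire.toByteArray();
    }

    /** Reads the stream in chunks of at most chunkSize bytes, like successive read() calls. */
    static List<String> receive(byte[] wire, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        ByteArrayInputStream in = new ByteArrayInputStream(wire);
        byte[] buf = new byte[chunkSize];
        int n;
        while ((n = in.read(buf, 0, buf.length)) > 0) {
            chunks.add(new String(buf, 0, n, StandardCharsets.US_ASCII));
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] wire = sendAll("HELLO", "WORLD");
        System.out.println(receive(wire, 16)); // sticky: one read sees both messages
        System.out.println(receive(wire, 3));  // split: reads cut messages at arbitrary points
    }
}
```

With a large read buffer the two messages come back as one chunk, and with a small one they are cut in places that have nothing to do with the message boundaries. Either way the receiver has to restore the boundaries itself.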
Second, what are the usual ways to solve the problem of sticky and split packets?
Once the nature of the problem is understood, it is easy to solve.
When the application layer receives data, it judges from some marker whether a message is complete. If it is, the packet is parsed and handed to the business code.
There are usually three ways to do this:
Fixed length. For example, if every message is guaranteed to be 200 bytes, then every 200 bytes received is treated as one complete message, which can be parsed and delivered to the business code.
Delimiter. The idea is the same: each message ends with a delimiter, such as a line feed or a tab, to mark that it has been fully written. The receiver looks for the delimiter and cuts the stream there to get a complete packet.
Custom protocol. This is also simple: define the format of a complete packet, for example len + data, where len is the byte length of data. The first 4 bytes (len) tell you how much data makes up one complete message. If less has arrived you wait, and if more has arrived you cut off the excess and keep it for the next message.
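The three approaches can be sketched side by side. This is an illustrative sketch only: FramingSketch and its methods are hypothetical names, messages are assumed to be ASCII text, and len is assumed to be a 4-byte big-endian integer (the ByteBuffer default). Each method returns only the complete messages and leaves an incomplete tail for a later read.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class illustrating the three framing strategies.
final class FramingSketch {
    /** Fixed length: every frameSize bytes form one message; a short tail is ignored for now. */
    static List<String> splitFixed(byte[] data, int frameSize) {
        List<String> out = new ArrayList<>();
        for (int off = 0; off + frameSize <= data.length; off += frameSize) {
            out.add(new String(data, off, frameSize, StandardCharsets.US_ASCII));
        }
        return out;
    }

    /** Delimiter: a message ends at each delimiter byte; bytes after the last one are incomplete. */
    static List<String> splitDelimited(byte[] data, byte delimiter) {
        List<String> out = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < data.length; i++) {
            if (data[i] == delimiter) {
                out.add(new String(data, start, i - start, StandardCharsets.US_ASCII));
                start = i + 1;
            }
        }
        return out;
    }

    /** len + data: a 4-byte length followed by exactly that many payload bytes. */
    static List<String> splitLengthPrefixed(byte[] data) {
        List<String> out = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(data);
        while (buf.remaining() >= 4) {
            buf.mark();
            int len = buf.getInt();
            if (len < 0 || buf.remaining() < len) {
                buf.reset(); // invalid or incomplete frame: stop and wait for more data
                break;
            }
            byte[] payload = new byte[len];
            buf.get(payload);
            out.add(new String(payload, StandardCharsets.US_ASCII));
        }
        return out;
    }

    /** Encodes one message as len + data, for use in the example. */
    static byte[] frame(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.US_ASCII);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Fixed length wastes space when messages vary in size, and a delimiter must never appear inside the payload (or must be escaped), which is one reason binary protocols tend to prefer len + data.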
Finally, readers who are not familiar with network programming may wonder: TCP packets can be lost or arrive out of order, so isn't there something wrong with the methods above?
In fact there is not. TCP is a reliable transport protocol, and its fundamental purpose is to provide reliable data transmission.
You can trust that the data TCP delivers to the application layer is complete and in order. Loss and reordering are not visible there.
Those situations are dealt with inside the transport layer, where TCP uses mechanisms such as segmentation, sequence numbering, deduplication, checksums and timeout retransmission to ensure the reliability of the data.
Third, how does Kafka solve the problem of sticky and split packets?
Finally, let's look at how Kafka solves the problem, and which of the methods above it uses.
First, the sticky-packet case: after receiving more data than one message, how does it split the data and read one correct, complete packet?
As the following code shows, there are three phases:
Read the first 4 bytes and convert them to an int, which is the length.
Request a memory buffer of that length.
Read that many bytes of data into the allocated buffer.
At that point one whole message has been read correctly. The whole process is simply a custom protocol of the len + data kind mentioned above.
    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;
        // Create a new receive
        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
        }
        // The actual data read
        receive(receive);
        // Post-processing
        if (receive.complete()) {
            // Rewind so the payload is ready to be read
            receive.payload().rewind();
            // Direct reference assignment
            result = receive;
            // Finally clear the current reference, so the next call to read() creates a new receive
            receive = null;
        } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
            // pool must be out of memory, mute ourselves.
            mute();
        }
        return result;
    }

    public long readFrom(ScatteringByteChannel channel) throws IOException {
        int read = 0;
        // There is still length data to read
        if (size.hasRemaining()) {
            // len + data
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            // If the length field has been fully read, rewind to get the actual len value.
            // size here is a ByteBuffer holding the received length bytes.
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                requestedBufferSize = receiveSize; // may be 0 for some payloads (SASL)
                if (receiveSize == 0) {
                    buffer = EMPTY_BUFFER;
                }
            }
        }
        // Once the length is known, allocate the space needed for the data that follows
        if (buffer == null && requestedBufferSize != -1) {
            // we know the size we want but havent been able to allocate it yet
            buffer = memoryPool.tryAllocate(requestedBufferSize);
            if (buffer == null)
                log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
        }
        // After the allocation succeeds, read the data directly into the buffer
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }
        // Return the total number of bytes read
        return read;
    }
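For comparison, the same three phases look much shorter over blocking I/O, where a read simply waits until enough bytes have arrived. This is not Kafka code but a minimal sketch using java.io.DataInputStream; the class BlockingFrameReader, its helper methods and the MAX_FRAME_SIZE cap are assumptions made for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical blocking reader for len + data frames.
final class BlockingFrameReader {
    static final int MAX_FRAME_SIZE = 1024 * 1024; // assumed cap, guards against bogus lengths

    /** Reads exactly one len + data frame, blocking until it is complete. */
    static byte[] readFrame(DataInputStream in) throws IOException {
        int len = in.readInt();                    // phase 1: first 4 bytes -> length
        if (len < 0 || len > MAX_FRAME_SIZE) {
            throw new IOException("Invalid frame size " + len);
        }
        byte[] payload = new byte[len];            // phase 2: allocate a buffer of that length
        in.readFully(payload);                     // phase 3: read exactly len bytes
        return payload;
    }

    /** Encodes the messages as consecutive len + data frames. */
    static byte[] encode(String... messages) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String m : messages) {
                byte[] body = m.getBytes(StandardCharsets.US_ASCII);
                out.writeInt(body.length);
                out.write(body);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decodes every frame contained in an in-memory byte array. */
    static List<String> decodeAll(byte[] wire) {
        List<String> out = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            while (in.available() > 0) {
                out.add(new String(readFrame(in), StandardCharsets.US_ASCII));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```

Kafka's readFrom() cannot block like readFully() does, because it runs on a non-blocking channel. That is why it keeps the size and buffer fields between calls and resumes where the previous call stopped.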
Next, the split-packet case: when the data received is not enough to form a complete message, how does it wait for the rest of the packet?
The core of the following code is the check in the receive.complete() function. Its three conditions mean:
!size.hasRemaining(): the 4-byte length field has been fully read.
buffer != null: the payload buffer has been allocated.
!buffer.hasRemaining(): the payload buffer has been completely filled.
As long as a message has not been read completely, receive.complete() returns false and read() returns null. The channel then waits for the next OP_READ event to continue reading the unfinished data, until a complete message has been read.
    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;
        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
        }
        receive(receive);
        if (receive.complete()) {
            receive.payload().rewind();
            result = receive;
            receive = null;
        } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
            // pool must be out of memory, mute ourselves.
            mute();
        }
        return result;
    }

    public boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }
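To watch the waiting behaviour in isolation, here is a stripped-down sketch modelled on the code above. It is not the Kafka class: IncrementalReceive, trickle and pollUntilComplete are hypothetical names, the memory pool and size limits are left out, and the trickle channel is a stand-in for a non-blocking socket that hands over at most a few bytes per read.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified receive that keeps its state between reads.
final class IncrementalReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer buffer; // allocated once the length is known

    /** Reads whatever is available right now; may be called many times per message. */
    int readFrom(ReadableByteChannel channel) throws IOException {
        int read = 0;
        if (size.hasRemaining()) {
            int n = channel.read(size);
            if (n < 0) throw new EOFException();
            read += n;
            if (!size.hasRemaining()) {
                size.rewind();
                int len = size.getInt(); // position is back at 4, so size stays "full"
                if (len < 0) throw new IOException("Invalid receive (size = " + len + ")");
                buffer = ByteBuffer.allocate(len);
            }
        }
        if (buffer != null) {
            int n = channel.read(buffer);
            if (n < 0) throw new EOFException();
            read += n;
        }
        return read;
    }

    boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

    String payloadAsString() {
        if (!complete()) throw new IllegalStateException("message not complete yet");
        return new String(buffer.array(), StandardCharsets.US_ASCII);
    }

    /** Stand-in for a non-blocking socket: yields at most step bytes per read, 0 when drained. */
    static ReadableByteChannel trickle(byte[] data, int step) {
        ByteBuffer source = ByteBuffer.wrap(data);
        return new ReadableByteChannel() {
            @Override public int read(ByteBuffer dst) {
                int n = Math.min(step, Math.min(dst.remaining(), source.remaining()));
                for (int i = 0; i < n; i++) dst.put(source.get());
                return n;
            }
            @Override public boolean isOpen() { return true; }
            @Override public void close() { }
        };
    }

    /** Calls readFrom repeatedly and records complete() after each call. */
    static List<Boolean> pollUntilComplete(IncrementalReceive receive, ReadableByteChannel channel) {
        List<Boolean> states = new ArrayList<>();
        try {
            int n;
            do {
                n = receive.readFrom(channel);
                states.add(receive.complete());
            } while (!receive.complete() && n > 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return states;
    }

    /** Encodes one message as len + data. */
    static byte[] frame(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.US_ASCII);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Feeding the 9-byte frame for "kafka" through a channel that yields 3 bytes per read takes three calls to readFrom: complete() is false after the first two and true after the third, which mirrors read() returning null until the whole message has arrived.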
Finally, one addition: what happens when many messages arrive at once?
The following source code gives the answer: they are all read in one go and stored in the stagedReceives data structure for later business processing.
    private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
        // if channel is ready and has bytes to read from socket or buffer, and has no
        // previous receive(s) already staged or otherwise in progress then read from it
        if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
                && !explicitlyMutedChannels.contains(channel)) {
            NetworkReceive networkReceive;
            // Read all receives at once and temporarily store them in stagedReceives
            while ((networkReceive = channel.read()) != null) {
                madeReadProgressLastPoll = true;
                addToStagedReceives(channel, networkReceive);
            }
            // isMute() tells whether the current channel is still interested in the OP_READ event
            if (channel.isMute()) {
                outOfMemory = true; // channel has muted itself due to memory pressure.
            } else {
                madeReadProgressLastPoll = true;
            }
        }
    }

By now you should have a deeper understanding of how Kafka handles sticky and split packets. The best way to consolidate it is to try it out in practice.