
Netty Distributed: Source Code Analysis of How the flush Method Flushes the Buffer Queue

Updated 2025-01-19. From: SLTechnology News&Howtos (shulou), Development


This article walks through the Netty source code behind the flush method: how it adjusts the pointers of the outbound buffer queue and then writes the queued entries to the underlying channel.

Flush method

A flush call propagates through the pipeline as an event and eventually reaches the flush method of HeadContext:

    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

This in turn calls the flush method of AbstractUnsafe:

    public final void flush() {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
        outboundBuffer.addFlush();
        flush0();
    }

The first step here is to obtain the ChannelOutboundBuffer object.

Next, look at this step:

    outboundBuffer.addFlush();

This step adjusts the pointers of the ChannelOutboundBuffer.

Follow into the addFlush method:

    public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                flushedEntry = entry;
            }
            do {
                flushed++;
                if (!entry.promise.setUncancellable()) {
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);
            unflushedEntry = null;
        }
    }

First, a local variable entry is declared that points to unflushedEntry, the first entry that has not yet been flushed.

Normally unflushedEntry is not null, so execution enters the if block.

flushedEntry is usually null before a flush, so flushedEntry = entry is executed.

That is, flushedEntry now points to the same entry.

After this operation, the buffer pointers are as shown in the figure:

[Figure 7-4-1]

The do-while loop then walks the nodes from unflushedEntry onward until no nodes remain.

Each increment of flushed counts one more node that needs to be flushed.
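The pointer adjustment described above can be sketched with a stripped-down linked list. This is a minimal illustration, not Netty's class: AddFlushSketch, its simplified addMessage, and the String payload are assumptions made for the example, and the promise and cancellation handling is left out.

```java
final class AddFlushSketch {
    static final class Entry {
        final String msg;
        Entry next;
        Entry(String msg) { this.msg = msg; }
    }

    Entry flushedEntry;   // first entry already marked for flushing
    Entry unflushedEntry; // first entry added but not yet marked
    Entry tailEntry;      // last entry in the list
    int flushed;          // number of entries marked for flushing

    // Simplified stand-in for adding a message: append to the tail.
    void addMessage(String msg) {
        Entry entry = new Entry(msg);
        if (tailEntry != null) {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    // Same pointer moves as in the article's addFlush, minus promise handling.
    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                flushedEntry = entry;
            }
            do {
                flushed++;
                entry = entry.next;
            } while (entry != null);
            unflushedEntry = null;
        }
    }

    public static void main(String[] args) {
        AddFlushSketch buf = new AddFlushSketch();
        buf.addMessage("a");
        buf.addMessage("b");
        buf.addMessage("c");
        buf.addFlush();
        // prints: a 3 true
        System.out.println(buf.flushedEntry.msg + " " + buf.flushed + " " + (buf.unflushedEntry == null));
    }
}
```

After addFlush, flushedEntry points at the head of the range to write, flushed holds its length, and unflushedEntry is null until the next message is added.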

Inside the loop, we focus on this step:

    decrementPendingOutboundBytes(pending, false, true);

This call runs only when entry.promise.setUncancellable() returns false, that is, when the write has already been cancelled; pending is the value returned by entry.cancel(). Like incrementPendingOutboundBytes in the previous section, it maintains the byte count of the buffer, but in the opposite direction: these bytes are no longer pending, so they are subtracted from the total.
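The byte accounting can be pictured with a simplified counter before reading the real method. This is a sketch under stated assumptions: PendingBytesSketch and its method names are hypothetical, and the 64 KB and 32 KB constants mirror Netty's default high and low water marks rather than being read from a channel config.

```java
import java.util.concurrent.atomic.AtomicLong;

final class PendingBytesSketch {
    // Assumed values, matching Netty's default write buffer water marks.
    static final long HIGH_WATER_MARK = 64 * 1024;
    static final long LOW_WATER_MARK = 32 * 1024;

    private final AtomicLong totalPendingSize = new AtomicLong();
    private volatile boolean writable = true;

    // Called when bytes are queued: crossing the high mark makes the channel unwritable.
    void increment(long size) {
        if (size == 0) {
            return;
        }
        long newSize = totalPendingSize.addAndGet(size);
        if (newSize > HIGH_WATER_MARK) {
            writable = false;
        }
    }

    // Called when bytes leave the queue: dropping below the low mark makes it writable again.
    void decrement(long size) {
        if (size == 0) {
            return;
        }
        long newSize = totalPendingSize.addAndGet(-size);
        if (newSize < LOW_WATER_MARK) {
            writable = true;
        }
    }

    boolean isWritable() {
        return writable;
    }

    public static void main(String[] args) {
        PendingBytesSketch pending = new PendingBytesSketch();
        pending.increment(70 * 1024);
        System.out.println(pending.isWritable());
        pending.decrement(40 * 1024);
        System.out.println(pending.isWritable());
    }
}
```

Using two thresholds instead of one keeps the writable state from flapping when the pending size hovers around a single limit.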

Follow into the method:

    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        if (size == 0) {
            return;
        }
        // subtract size from the total pending size
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        // once the total drops below the low water mark (32 KB by default)
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            // set the writable state
            setWritable(invokeLater);
        }
    }

As before, TOTAL_PENDING_SIZE_UPDATER tracks the number of bytes in the buffer. The argument passed to addAndGet here is -size, so size is subtracted from the total.

Next, look at this condition:

    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark())

getWriteBufferLowWaterMark() returns the low water mark of the write buffer, which is 32 KB by default. If the size of the write buffer falls below this value, setWritable sets the writable state, which means the channel changes from unwritable to writable.

Back in the addFlush method: when the do-while loop finishes, unflushedEntry is set to null, which means every entry in the list is now ready to be written out.

After these operations, the buffer pointers are as shown in the figure:

[Figure 7-4-2]

Back in the flush method of AbstractUnsafe: once the pointers have been adjusted, we follow into flush0():

    protected void flush0() {
        if (inFlush0) {
            return;
        }
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }
        inFlush0 = true;
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }
        try {
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                outboundBuffer.failFlushed(t, true);
            }
        } finally {
            inFlush0 = false;
        }
    }

if (inFlush0) checks whether a flush is already in progress; if so, the method returns immediately to avoid re-entry.

We focus on the doWrite method. Follow into doWrite of AbstractNioByteChannel:

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;
        boolean setOpWrite = false;
        for (;;) {
            // get the current node on each iteration
            Object msg = in.current();
            if (msg == null) {
                clearOpWrite();
                return;
            }
            if (msg instanceof ByteBuf) {
                // cast to ByteBuf
                ByteBuf buf = (ByteBuf) msg;
                // if there is nothing to write
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) {
                    // remove the entry
                    in.remove();
                    continue;
                }
                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i--) {
                    // write buf to the socket;
                    // localFlushedAmount is the number of bytes written to the underlying JDK channel
                    int localFlushedAmount = doWriteBytes(buf);
                    // if not a single byte was written, break
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }
                    // accumulate the total number of bytes written
                    flushedAmount += localFlushedAmount;
                    // if the whole buffer has been written to the underlying JDK channel
                    if (!buf.isReadable()) {
                        // mark the write as complete
                        done = true;
                        break;
                    }
                }
                in.progress(flushedAmount);
                if (done) {
                    // remove the current entry
                    in.remove();
                } else {
                    break;
                }
            } else if (msg instanceof FileRegion) {
                // code omitted
            } else {
                throw new Error();
            }
        }
        incompleteWrite(setOpWrite);
    }

The method starts with an infinite for loop.

Object msg = in.current() retrieves the msg held by the entry that flushedEntry points to.

Follow into the current() method:

    public Object current() {
        Entry entry = flushedEntry;
        if (entry == null) {
            return null;
        }
        return entry.msg;
    }

This directly returns the msg associated with the entry that flushedEntry points to, which is a ByteBuf.

Back in the doWrite method:

If msg is null, there is no entry left to flush, so clearOpWrite() is called to clear the write flag.

If msg is not null, its type is checked. If it is a ByteBuf, execution enters the if block.

Inside the if block, msg is first cast to ByteBuf, and then the code checks whether the ByteBuf has readable bytes. If it has none, the entry associated with the current ByteBuf is removed through in.remove(), and the loop continues with the next iteration.

The remove method is analyzed later, so let's move on.

boolean done = false sets a flag that records whether the write of this buffer has completed. It defaults to false, meaning the write has not completed yet.

writeSpinCount = config().getWriteSpinCount() obtains the maximum number of write attempts. The default is 16.

The inner loop then performs the writes, bounded by that count.

In the loop, focus on this step:

    int localFlushedAmount = doWriteBytes(buf);

This step writes the contents of buf to the channel and returns the number of bytes written. Here, doWriteBytes of NioSocketChannel is called.

Follow into the doWriteBytes method:

    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

Here, the number of readable bytes in buf is obtained first, and then readBytes writes those readable bytes to the underlying JDK channel.
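The same idea can be tried with plain JDK classes. The sketch below is an analogy rather than Netty code: it uses java.nio.ByteBuffer in place of ByteBuf, so the buffer's position plays the role of the reader index, and ChannelWriteSketch and writeBytes are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

final class ChannelWriteSketch {
    // Writes the remaining bytes of buf to the channel and returns how many were written.
    // The write advances buf's position, much like readBytes advances a reader index.
    static int writeBytes(WritableByteChannel channel, ByteBuffer buf) {
        try {
            return channel.write(buf);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        WritableByteChannel channel = Channels.newChannel(sink);
        ByteBuffer buf = ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII));

        int written = writeBytes(channel, buf);
        System.out.println(written + " written, " + buf.remaining() + " remaining");
    }
}
```

A channel backed by an in-memory stream accepts everything at once; a non-blocking socket channel may accept fewer bytes, or none, which is why the caller inspects the return value.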

Back in the doWrite method:

If not a single byte was written to the underlying JDK channel, the channel is probably not writable right now. setOpWrite is set to true so that the write-operation bit will be registered, and the loop exits.

If bytes were written, flushedAmount += localFlushedAmount accumulates the number of bytes written so far.

The code then checks whether buf still has readable bytes. If it has none, all of its data has been written, so done is set to true to mark the write as complete, and the loop exits.

A ByteBuf cannot always be written in a single call, so the loop keeps writing until the buffer is drained or the attempt limit (16 by default) is reached.
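The bounded retry can be sketched without Netty. In this illustration, ThrottledChannel is a hypothetical stand-in for a socket that accepts only a few bytes per call, and writeWithSpin mirrors the shape of the inner loop; the names and the ByteBuffer type are assumptions for the example.

```java
import java.nio.ByteBuffer;

final class WriteSpinSketch {
    /** Hypothetical sink that accepts at most limit bytes per write call. */
    static final class ThrottledChannel {
        private final int limit;
        int totalAccepted;

        ThrottledChannel(int limit) { this.limit = limit; }

        int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            src.position(src.position() + n); // consume n bytes from the buffer
            totalAccepted += n;
            return n;
        }
    }

    /** Returns true if buf was fully written within spinCount attempts. */
    static boolean writeWithSpin(ThrottledChannel ch, ByteBuffer buf, int spinCount) {
        for (int i = spinCount - 1; i >= 0; i--) {
            int written = ch.write(buf);
            if (written == 0) {
                // Nothing accepted: the real code would set setOpWrite and wait for OP_WRITE.
                return false;
            }
            if (!buf.hasRemaining()) {
                return true; // corresponds to done = true
            }
        }
        return false; // attempts exhausted with data still left
    }

    public static void main(String[] args) {
        ThrottledChannel ch = new ThrottledChannel(4);
        boolean done = writeWithSpin(ch, ByteBuffer.allocate(10), 16);
        // prints: true 10
        System.out.println(done + " " + ch.totalAccepted);
    }
}
```

Capping the attempts keeps one large buffer from monopolizing the event loop; whatever is left is picked up by a later write.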

If the ByteBuf content has been fully written, the current entry is removed by in.remove().

Follow into the remove method:

    public boolean remove() {
        // get the first flushed entry
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;
        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
        removeEntry(e);
        if (!e.cancelled) {
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }
        e.recycle();
        return true;
    }

The method first obtains the current flushedEntry.

Let's focus on the removeEntry step and follow into it:

    private void removeEntry(Entry e) {
        if (--flushed == 0) {
            // nothing left to flush
            flushedEntry = null;
            // if this is the last node in the list
            if (e == tailEntry) {
                // clear both pointers
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            // move to the next node
            flushedEntry = e.next;
        }
    }

if (--flushed == 0) checks whether the current node is the last node that needs to be flushed. If so, the flushedEntry pointer is set to null.

If the current node is also tailEntry, it is the last node in the whole list, so both the tailEntry and unflushedEntry pointers are set to null.

If the current node is not the last node that needs to be flushed, the flushedEntry pointer moves to the next node through flushedEntry = e.next.
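The three cases above can be exercised on a small list. This is a sketch only: RemoveEntrySketch and the of(...) helper that pre-builds a partly flushed list are hypothetical, and the release, promise, and byte-count steps of the real remove are omitted.

```java
final class RemoveEntrySketch {
    static final class Entry {
        final String msg;
        Entry next;
        Entry(String msg) { this.msg = msg; }
    }

    Entry flushedEntry;
    Entry unflushedEntry;
    Entry tailEntry;
    int flushed;

    // Hypothetical helper: links msgs into a list and marks the first flushedCount as flushed.
    // Assumes 0 <= flushedCount <= msgs.length.
    static RemoveEntrySketch of(int flushedCount, String... msgs) {
        RemoveEntrySketch q = new RemoveEntrySketch();
        for (int i = 0; i < msgs.length; i++) {
            Entry e = new Entry(msgs[i]);
            if (q.tailEntry != null) {
                q.tailEntry.next = e;
            }
            q.tailEntry = e;
            if (i == 0 && flushedCount > 0) {
                q.flushedEntry = e;
            }
            if (i == flushedCount) {
                q.unflushedEntry = e;
            }
        }
        q.flushed = flushedCount;
        return q;
    }

    boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            return false;
        }
        removeEntry(e);
        return true;
    }

    private void removeEntry(Entry e) {
        if (--flushed == 0) {
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
    }

    public static void main(String[] args) {
        RemoveEntrySketch q = of(2, "a", "b", "c"); // a and b flushed, c not yet
        q.remove();
        System.out.println(q.flushedEntry.msg);
        q.remove();
        System.out.println(q.flushedEntry == null);
        System.out.println(q.unflushedEntry.msg);
    }
}
```

In the example, removing "b" empties the flushed range while "c" stays reachable through unflushedEntry and tailEntry, since "b" was not the tail.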

