Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Example Analysis of Netty distributed client processing access event handle

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/01 Report--

This article mainly introduces the example analysis of Netty distributed client processing access event handle, which has a certain reference value. Interested friends can refer to it. I hope you will learn a lot after reading this article. Let's take a look at it.

Handle access events to create handle

Going back to the previous chapter, NioEventLoop's processSelectedKey () method

Private void processSelectedKey (SelectionKey k, AbstractNioChannel ch) {/ / get unsafe final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe () in channel; / / if the key is not legal, there may be something wrong with this channel if (! k.isValid ()) {/ / code omission} try {/ / if it is legal, get the io event int readyOps = k.readyOps () of key / / Link event if ((readyOps & SelectionKey.OP_CONNECT)! = 0) {int ops = k.interestOps (); ops & = ~ SelectionKey.OP_CONNECT; k.interestOps (ops); unsafe.finishConnect () } / / write event if ((readyOps & SelectionKey.OP_WRITE)! = 0) {ch.unsafe () .forceFlush () } / / read events and accept link events / / if the current NioEventLoop is a work thread, here is the op_read event / / if the current NioEventLoop is a boss thread, this is the op_accept event if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT))! = 0 | readyOps = = 0) {unsafe.read () If (! ch.isOpen ()) {return;}} catch (CancelledKeyException ignored) {unsafe.close (unsafe.voidPromise ());}}

Let's look at the if and judge:

If ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT))! = 0 | | readyOps = = 0)

As we analyzed in the previous section, if the current NioEventLoop is a work thread, here is the op_read event, and if the current NioEventLoop is a boss thread, this is the op_accept event. Here we take the boss thread as an example.

As we said before, the read () method of unsafe is used to handle both op_read and op_accept events. Here unsafe is obtained through channel. We know that if you are dealing with accept events, the channel here is NioServerSocketChannel, and the unsafe bound to it is NioMessageUnsafe.

We follow the read () method of NioMessageUnsafe:

Public void read () {/ / must be called by NioEventLoop method and cannot be called by external thread assert eventLoop () .inEventLoop (); / / config final ChannelConfig config of server channel = config (); / / pipeline final ChannelPipeline pipeline of server channel = pipeline (); / / rate of server access final RecvByteBufAllocator.Handle allocHandle = unsafe () .recvBufAllocHandle (); / / set configuration allocHandle.reset (config) Boolean closed = false; Throwable exception = null; try {try {do {/ / create the underlying channel / / readBuf of jdk to temporarily host the read link int localRead = doReadMessages (readBuf); if (localRead = = 0) {break } if (localRead

< 0) { closed = true; break; } //分配器将读到的链接进行计数 allocHandle.incMessagesRead(localRead); //连接数是否超过最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍历每一条客户端连接 for (int i = 0; i < size; i ++) { readPending = false; //传递事件, 将创建NioSokectChannel进行传递 //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代码省略 } finally { //代码省略 }} 首先获取与NioServerSocketChannel绑定config和pipeline, config我们上一小节进行分析过, pipeline我们将在下一章进行剖析 我们看这一句: final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); 这里通过RecvByteBufAllocator接口调用了其内部接口Handler 我们看其RecvByteBufAllocator接口public interface RecvByteBufAllocator { Handle newHandle(); interface Handle { int guess(); void reset(ChannelConfig config); void incMessagesRead(int numMessages); void lastBytesRead(int bytes); int lastBytesRead(); void attemptedBytesRead(int bytes); int attemptedBytesRead(); boolean continueReading(); void readComplete(); }} 我们看到RecvByteBufAllocator接口只有一个方法newHandle(), 顾名思义就是用于创建Handle对象的方法, 而Handle中的方法, 才是实际用于操作的方法 在RecvByteBufAllocator实现类中包含Handle的子类, 具体实现关系如下:

Go back to the read () method and look at this code again:

Final RecvByteBufAllocator.Handle allocHandle = unsafe () .recvBufAllocHandle ()

Unsafe () returns the unsafe object bound by the current channel, and recvBufAllocHandle () eventually calls the recvBufAllocHandle () method of the AbstractChannel inner class AbstractUnsafe

Follow up the recvBufAllocHandle () method of AbstractUnsafe:

Public RecvByteBufAllocator.Handle recvBufAllocHandle () {/ / if it does not exist, create an instance of recvHandle if (recvHandle = = null) {recvHandle = config () .getRecvByteBufAllocator () .newHandle ();} return recvHandle;}

If it is executed here for the first time and its own property recvHandle is empty, a recvHandle instance will be created. Config () returns the ChannelConfig bound by NioServerSocketChannel, and getRecvByteBufAllocator () gets its RecvByteBufAllocator object. As analyzed in the previous section of these two parts, a Handle is created through newHandle (). Here you will go to the newHandle () method in the AdaptiveRecvByteBufAllocator class.

Follow up public Handle newHandle () {return new HandleImpl (minIndex, maxIndex, initial);} in the newHandle () method

Here, three parameters are passed in to create HandleImpl. We analyzed these three parameters in the previous section. MinIndex is the subscript of the minimum memory SIZE_TABLE, maxIndex is the subscript of the maximum memory SEIZE_TABEL, and initial is the initial memory. We follow in the construction method of HandleImpl:

Public HandleImpl (int minIndex, int maxIndex, int initial) {this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex (initial); nextReceiveBufferSize = SIZE_TABLE [index];}

Initialize minIndex and maxIndex, find the current subscript according to initial, and nextReceiveBufferSize find the corresponding memory according to the current subscript

So, we created a Handle object

Here we need to know that this handle is the only property bound to channel, while the AdaptiveRecvByteBufAllocator object is only bound to the ChannelConfig object, and indirectly to channel.

Go back to read () method public void read () {/ / must be called by NioEventLoop method, cannot call assert eventLoop () .inEventLoop () through external thread; / / config final ChannelConfig config of server channel = config (); / / pipeline final ChannelPipeline pipeline of server channel = pipeline (); / / rate of server access final RecvByteBufAllocator.Handle allocHandle = unsafe () .recvBufAllocHandle () / / set configuration allocHandle.reset (config); boolean closed = false; Throwable exception = null; try {try {do {/ / create the underlying channel / / readBuf of jdk to temporarily host the read link int localRead = doReadMessages (readBuf); if (localRead = = 0) {break } if (localRead < 0) {closed = true; break;} / / the allocator counts the read links allocHandle.incMessagesRead (localRead) / / whether the number of connections exceeds the maximum} while (allocHandle.continueReading ());} catch (Throwable t) {exception = t;} int size = readBuf.size (); / / traverses each client connection for (int I = 0; I < size; I + +) {readPending = false / / pass the event, which will create a NioSokectChannel to pass / / eventually call the channelRead () method pipeline.fireChannelRead (readBuf.get (I)) of ServerBootstrap's inner class ServerBootstrapAcceptor;} readBuf.clear (); allocHandle.readComplete (); pipeline.fireChannelReadComplete (); / / Code ellipsis} finally {/ / Code ellipsis}

Continue to follow:

AllocHandle.reset (config)

This code is to reset the configuration, that is, initialize the previous configuration information, and eventually go to the reet of the inner class MaxMessageHandle in DefaultMaxMessagesRecvByteBufAllocator.

We follow up public void reset (ChannelConfig config) {this.config = config; maxMessagePerRead = maxMessagesPerRead (); totalMessages = totalBytesRead = 0;} in reset

Here are only a few attributes assigned, a brief introduction to these attributes:

Config: current channelConfig object

MaxMessagePerRead: indicates how many times the message can be read (number of loops). MaxMessagesPerRead () returns the maxMessagesPerRead attribute of RecvByteBufAllocator, which has been analyzed in the previous section.

TotalMessages: represents the number of messages that have been read in the current read loop, that is, the number of loops that have been executed in NIO transport mode, which is initialized to 0

TotalBytesRead: represents the total number of message bytes read so far, which is also initialized to 0

Let's move on, starting with a do-while loop in which the number of read connections is put into a List collection by int localRead = doReadMessages (readBuf). We'll analyze it in the next section, and we'll move on:

Let's first look at the allocHandle.incMessagesRead (localRead) step, where localRead represents the number of connections put into readBuf this time. In Nio mode, this returns 1 if a connection is read.

Follow to the incMessagesRead (int amt) method of MaxMessageHandle in:

Public final void incMessagesRead (int amt) {totalMessages + = amt;}

Here add amt to totalMessages, that is, + 1

Here totalMessage, which has been analyzed just now, in NIO transport mode, that is, the number of cycles that have been executed, is added to each loop.

Then take a look at the loop termination condition allocHandle.continueReading ()

Follow to the continueReading () method of MaxMessageHandle:

Public boolean continueReading () {/ / config.isAutoRead () returns true / / totalMessages < maxMessagePerRead / / totalMessages by default, and defaults to 1 / / maxMessagePerRead maximum number of links per read (default 16) return config.isAutoRead () & & attemptedBytesRead = = lastBytesRead & & totalMessages < maxMessagePerRead & & totalBytesRead < Integer.MAX_VALUE;}

We analyze and judge the conditions one by one:

Config.isAutoRead (): here defaults to true

AttemptedBytesRead = = lastBytesRead: indicates that the number of bytes read this time is equal to the number of bytes read last time. Since no byte array is read here, the default is 0, and true is also returned here.

TotalMessages < maxMessagePerRead

Indicates whether the current number of reads is less than the maximum number of reads. We know that totalMessages will increase itself in each loop, and the default value of maxMessagePerRead is 16, so there is a limit of 16 loops, that is, a maximum of 16 connections can be read at a time.

TotalBytesRead < Integer.MAX_VALUE

Indicates that the number of bytes read cannot exceed the maximum of type int

Thank you for reading this article carefully. I hope the article "sample Analysis of Netty distributed client processing access event handle" shared by the editor will be helpful to you. At the same time, I also hope that you will support and pay attention to the industry information channel. More related knowledge is waiting for you to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report