Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the core process of Java NIO2 AIO development?

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

This article mainly explains "what is the core process of Java NIO2 AIO development". The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "what is the core process of Java NIO2 AIO development".

According to the division of "Unix network programming", the IO model can be divided into blocking IO, non-blocking IO, IO multiplexing, signal-driven IO and asynchronous IO. According to the POSIX standard, it can be divided into two categories: synchronous IO and asynchronous IO. How to tell the difference? First of all, an IO operation is actually divided into two steps: initiating an IO request and the actual IO operation. The difference between synchronous IO and asynchronous IO lies in whether the second step is blocked. If the actual IO read and write blocking request process, it is synchronous IO, so blocking IO, non-blocking IO, IO taking, and signal-driven IO are all synchronous IO. If there is no blocking, the operating system helps you finish the IO operation and then returns the result to you. So it's asynchronous IO. The difference between blocking IO and non-blocking IO lies in whether the originating IO request will be blocked in the first step, traditional blocking IO if blocking until completion, and non-blocking IO if not blocking.

The main improvement of Java nio 2.0 is the introduction of asynchronous IO (including files and networks). This paper mainly introduces the use of asynchronous network IO API and the design of the framework, taking the TCP server as an example. First, take a look at the new classes and interfaces introduced to support AIO:

Java.nio.channels.AsynchronousChannel

Mark a channel that supports asynchronous IO operations.

Java.nio.channels.AsynchronousServerSocketChannel

Aio version of ServerSocket, create TCP server, bind address, listening port, etc.

Java.nio.channels.AsynchronousSocketChannel

A stream-oriented asynchronous socket channel that represents a connection.

Java.nio.channels.AsynchronousChannelGroup

The grouping management of asynchronous channel is for resource sharing. An AsynchronousChannelGroup binds to a thread pool that performs two tasks: handling IO events and dispatching CompletionHandler. When AsynchronousServerSocketChannel is created, you can pass in an AsynchronousChannelGroup, then the AsynchronousSocketChannel created through AsynchronousServerSocketChannel will belong to the same group and share resources.

Java.nio.channels.CompletionHandler

The callback API for the result of asynchronous IO operation, which is used to define the callback work done after the completion of the IO operation. AIO's API allows two ways to handle the results of asynchronous operations: the returned Future mode or the registration CompletionHandler, which I prefer to use CompletionHandler, where calls to handler are dispatched by AsynchronousChannelGroup's thread pool. Obviously, the size of the thread pool is a key factor in performance. AsynchronousChannelGroup allows you to bind different thread pools and create them through three static methods:

Public static AsynchronousChannelGroup withFixedThreadPool (int nThreads, ThreadFactory threadFactory) throws IOException public static AsynchronousChannelGroup withCachedThreadPool (ExecutorService executor, int initialSize) public static AsynchronousChannelGroup withThreadPool (ExecutorService executor) throws IOException

Need to adjust according to the specific application, from the framework point of view, need to expose such configuration options to the user.

After introducing the main interfaces and classes of TCP introduced by aio, let's imagine how the next aio framework should be designed. Refer to the design of the non-blocking nio framework, generally using Reactor mode. Reacot is responsible for event registration, select and event dispatch. Accordingly, asynchronous IO has a Proactor mode, and Proactor is responsible for CompletionHandler distribution. Check the flow of a typical IO write operation to see the difference between the two:

Reactor: send (msg)-> whether the message queue is empty, if it is empty-> register OP_WRITE with Reactor, then return-> Reactor select-> trigger Writable, notify the user thread to process-> log out Writable first (the problem with cpu 100% encountered by many people is that it is not logged out), deal with Writeable, if not fully written, continue to register OP_WRITE. Notice that the writing work is still being processed by the user thread.

Proactor: send (msg)-> whether the message queue is empty, if so, initiates an read asynchronous call, registers CompletionHandler, and returns. -> the operating system is responsible for writing your message and returning the result (number of bytes written) to Proactor-> Proactor dispatch CompletionHandler. It can be seen that the work of writing is handled by the operating system without the participation of user threads. In fact, AsynchronousChannelGroup plays the role of Proactor in aio's API.

CompletionHandler has three methods, corresponding to callback processing in the case of success, failure, and cancellation (through the returned Future):

Public interface CompletionHandler {void completed (V result, An attachment); void failed (Throwable exc, An attachment); void cancelled (An attachment);}

The generic parameter V represents the result of the IO call, and An is the attchment passed in when the call is made.

After a preliminary introduction to the classes and interfaces introduced by aio, let's take a look at how a typical tcp server starts, accepts connections and handles reading and writing. The code referenced here is the code in the aio branch of yanf4j, which can be obtained from the svn checkout,svn address: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio

The first step is to create an AsynchronousServerSocketChannel. Create an AsynchronousChannelGroup before creation. As mentioned above, AsynchronousServerSocketChannel can be bound to an AsynchronousChannelGroup, then all connections established through this AsynchronousServerSocketChannel will belong to the same AsynchronousChannelGroup and share resources:

This.asynchronousChannelGroup = AsynchronousChannelGroup. WithCachedThreadPool (Executors.newCachedThreadPool (), this.threadPoolSize)

Then initialize an AsynchronousServerSocketChannel, through the open method:

This.serverSocketChannel = AsynchronousServerSocketChannel .open (this.asynchronousChannelGroup)

Set some TCP options through the SocketOption class introduced in nio 2.0:

This.serverSocketChannel .setOption (StandardSocketOption.SO_REUSEADDR,true); this.serverSocketChannel .setOption (StandardSocketOption.SO_RCVBUF,16*1024)

Bind local address:

This.serverSocketChannel. Bind (new InetSocketAddress ("localhost", 8080)

Of these, 100 is used to specify the queue size (backlog) waiting for a connection. Are we done? Not yet, the most important monitoring work has not yet started. The listening port is to wait for the connection to come up so that accept can generate an AsynchronousSocketChannel to represent a newly established connection, so you need to make an accept call. The call is asynchronous, and the operating system will return the final result, AsynchronousSocketChannel, to you after the connection is established.

Public void pendingAccept () {if (this.started & & this.serverSocketChannel.isOpen ()) {this.acceptFuture = this.serverSocketChannel.accept (null, new AcceptCompletionHandler ());} else {throw new IllegalStateException ("Controller has been closed");}}

Note that repeated accept calls will throw PendingAcceptException, as will read and write mentioned later. The first parameter of the accept method is the attchment you want to pass to CompletionHandler, the second parameter is the registered CompletionHandler for callback, and the final result Future is returned. You can handle future, and the more recommended way here is to register a CompletionHandler. So what does accept do in CompletionHandler? Obviously, a naked AsynchronousSocketChannel is not enough, we need to encapsulate it into a session, and a session represents a connection (called IoSession in mina) with a buffered message queue and some other resources. After the connection is established, unless your server is ready to accept only one connection, you need to continue to call pendingAccept later to initiate another accept request:

Private final class AcceptCompletionHandler implements CompletionHandler {@ Override public void cancelled (Object attachment) {logger.warn ("Accept operation was canceled") } @ Override public void completed (AsynchronousSocketChannel socketChannel, Object attachment) {try {logger.debug ("Accept connection from" + socketChannel.getRemoteAddress ()); configureChannel (socketChannel); AioSessionConfig sessionConfig = buildSessionConfig (socketChannel) Session session = new AioTCPSession (sessionConfig, AioTCPController.this.configuration. GetSessionReadBufferSize (), AioTCPController.this.sessionTimeout); session.start (); registerSession (session) } catch (Exception e) {e.printStackTrace (); logger.error ("Accept error", e); notifyException (e);} finally {pendingAccept () } @ Override public void failed (Throwable exc, Object attachment) {logger.error ("Accept error", exc); try {notifyException (exc);} finally {pendingAccept ();}

Notice that we call pendingAccept at the end of both the failed and completed methods to continue to initiate the accept call, waiting for the new connection to come up. Some students may want to say, this is not a recursive call, will it be stack overflow? Actually not, because the thread that initiated the accept call is not the same as the CompletionHandler callback thread, not in the same context, and there is no coupling relationship between the two. Note that CompletionHandler callbacks share AsynchronousChannelGroup-bound thread pools, so never call blocking or long-term operations in CompletionHandler callback methods, such as sleep. Callback methods should support timeouts to prevent thread pool exhaustion.

After the connection is established, how to read and write? Recall that in the nonblocking nio framework, what is the first thing to do after a connection is established? Register the OP_READ event to wait for socket to be readable. The same is true of asynchronous IO, where an asynchronous read call is made immediately after the connection is established, waiting for socket to be readable, which is what is done in the Session.start method:

Public class AioTCPSession {protected void start0 () {pendingRead ();} protected final void pendingRead () {if (! isClosed () & & this.asynchronousSocketChannel.isOpen ()) {if (! this.readBuffer.hasRemaining ()) {this.readBuffer = ByteBufferUtils .increaseBufferCapatity (this.readBuffer) } this.readFuture = this.asynchronousSocketChannel.read (this.readBuffer, this, this.readCompletionHandler);} else {throw new IllegalStateException ("Session Or Channel has been closed");}

AsynchronousSocketChannel's read call is similar to AsynchronousServerSocketChannel's accept call, and the return result is also a Future, but the result of the write is an integer, indicating how many bytes have been written, so the read call returns Future, the first parameter of the method is the read buffer, the operating system copies the IO read data to this buffer, the second parameter is the attchment passed to CompletionHandler, and the third parameter is the registered CompletionHandler for callback. The result Future of read is saved here so that the call can be canceled actively when the connection is closed, as is the case with accept. Now take a look at the implementation of read's CompletionHandler:

Public final class ReadCompletionHandler implements CompletionHandler {private static final Logger log = LoggerFactory .getLogger (ReadCompletionHandler.class); protected final AioTCPController controller; public ReadCompletionHandler (AioTCPController controller) {this.controller = controller;} @ Override public void cancelled (AbstractAioSession session) {log.warn ("Session (" + session.getRemoteSocketAddress () + ") read operation was canceled") } @ Override public void completed (Integer result, AbstractAioSession session) {if (log.isDebugEnabled ()) log.debug ("Session (" + session.getRemoteSocketAddress () + ") read +" + result + "bytes"); if (result

< 0) { session.close(); return; } try { if (result >

0) {session.updateTimeStamp (); session.getReadBuffer () .flip (); session.decode (); session.getReadBuffer () .compact ();} finally {try {session.pendingRead () } catch (IOException e) {session.onException (e); session.close ();}} controller.checkSessionTimeout ();} @ Override public void failed (Throwable exc, AbstractAioSession session) {log.error ("Session read error", exc); session.onException (exc) Session.close ();}}

If the IO read fails, the exception caused by the failure will be returned. In this case, we actively close the connection. Through the session.close () method, this method does two things: close the channel and cancel the read call:

If (null! = this.readFuture) {this.readFuture.cancel (true);} this.asynchronousSocketChannel.close ()

In the case of successful reading, we also need to determine whether the result result is less than 0. If it is less than 0, the peer is closed. In this case, we also actively close the connection and return. If we read a certain byte, that is, if the result is greater than 0, we try to decode the message from the read buffer and send the callback method to the business processor. Finally, we continue to initiate the read call through pendingRead to wait for the next readability of socket. It can be seen that we do not need to call channel for IO reading, but the operating system helps you read directly to the buffer, and then gives you a result indicating how many bytes have been read, and you can deal with the result. In the nonblocking IO framework, it is reactor that notifies the user thread that the socket is readable, and then the user thread invokes read for the actual read operation. It is also important to note that the delivery of messages from decode to the business processor is best handled by a thread pool to avoid blocking the thread pool of group bindings.

The IO write operation is similar, but usually we associate a buffer queue in session for processing. Messages that are not fully written or waiting for writing are stored in the queue, and the write call is initiated if the queue is empty:

Protected void write0 (WriteMessage message) {boolean needWrite = false; synchronized (this.writeQueue) {needWrite = this.writeQueue.isEmpty (); this.writeQueue.offer (message);} if (needWrite) {pendingWrite (message);}} protected final void pendingWrite (WriteMessage message) {message = preprocessWriteMessage (message) If (! isClosed () & & this.asynchronousSocketChannel.isOpen ()) {this.asynchronousSocketChannel.write (message.getWriteBuffer (), this, this.writeCompletionHandler);} else {throw new IllegalStateException ("Session Or Channel has been closed");}}

The result returned by the write call is the same Future as read, and the core logic of write's CompletionHandler processing is something like this:

@ Override public void completed (Integer result, AbstractAioSession session) {if (log.isDebugEnabled ()) log.debug ("Session (" + session.getRemoteSocketAddress () + ") writen" + result + "bytes"); WriteMessage writeMessage; Queue writeQueue = session.getWriteQueue (); synchronized (writeQueue) {writeMessage = writeQueue.peek () If (writeMessage.getWriteBuffer () = = null | |! writeMessage.getWriteBuffer () .hasRemaining ()) {writeQueue.remove (); if (writeMessage.getWriteFuture ()! = null) {writeMessage.getWriteFuture () .setResult (Boolean.TRUE) } try {session.getHandler () .onMessageSent (session, writeMessage.getMessage ());} catch (Exception e) {session.onException (e);} writeMessage = writeQueue.peek () } if (writeMessage! = null) {try {session.pendingWrite (writeMessage);} catch (IOException e) {session.onException (e); session.close ();}

The result in the compete method is the actual number of bytes written, and then we determine whether there is any buffer left in the message, remove the message from the queue if not, and continue to make the write call if there are still messages in the queue.

Repeat, the code quoted here is the source code in the yanf4j aio branch, interested friends can directly check out out to have a look: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio.

After the introduction of aio, java's support for the network layer has been very perfect, and java has become one of the preferred languages for server development. The weakness of java lies in the management of memory, and since all this is left to GC, it is still dominated by Cpp on high-performance network servers. There is still a gap between the single heap model of java and the in-process heap model of erlang, so it is difficult to achieve efficient garbage collection and fine-grained memory management.

This paper only introduces the core process of aio development. For a network framework, we also need to consider the processing of timeout, the processing of buffering buffer, the segmentation of business layer and network layer, scalability, the tunability of performance and some general requirements.

Thank you for your reading, the above is the content of "what is the core process of Java NIO2 AIO development". After the study of this article, I believe you have a deeper understanding of what the core process of Java NIO2 AIO development is, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report