In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article mainly explains the "detailed introduction of Java non-blocking IO and asynchronous IO". The content of the article is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "the detailed introduction of Java non-blocking IO and asynchronous IO".
Blocking mode IO
We have introduced the ServerSocketChannel, SocketChannel, and Buffer needed to use Java NIO packages to form a simple client-server network communication. Let's integrate them here to give a complete operational example:
Public class Server {public static void main (String [] args) throws IOException {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open (); / / listen for TCP link serverSocketChannel.socket () .bind (new InetSocketAddress (8080)); while (true) {/ / it will block until a requested connection comes in SocketChannel socketChannel = serverSocketChannel.accept () / / start a new thread to process the request, and then continue listening on port 8080 SocketHandler handler = new SocketHandler (socketChannel); new Thread (handler). Start ();}} in the while loop
Here's a look at what the new thread needs to do, SocketHandler:
Public class SocketHandler implements Runnable {private SocketChannel socketChannel; public SocketHandler (SocketChannel socketChannel) {this.socketChannel = socketChannel;} @ Override public void run () {ByteBuffer buffer = ByteBuffer.allocate (1024); try {/ / reads the request data into Buffer int num While ((num = socketChannel.read (buffer)) > 0) {/ / flip buffer.flip () before reading Buffer content; / / extract data from Buffer byte [] bytes = new byte [num]; buffer.get (bytes) String re = new String (bytes, "UTF-8"); System.out.println ("request received:" + re); / / response client ByteBuffer writeBuffer = ByteBuffer.wrap (("I have received your request, your request content is:" + re). GetBytes (); socketChannel.write (writeBuffer) Buffer.clear ();}} catch (IOException e) {IOUtils.closeQuietly (socketChannel);}
Finally, post the use of SocketChannel on the client side. The client is relatively simple:
Public class SocketChannelTest {public static void main (String [] args) throws IOException {SocketChannel socketChannel = SocketChannel.open (); socketChannel.connect (new InetSocketAddress ("localhost", 8080)); / / send request ByteBuffer buffer = ByteBuffer.wrap ("1234567890" .getBytes ()); socketChannel.write (buffer); / / read response ByteBuffer readBuffer = ByteBuffer.allocate (1024); int num If ((num = socketChannel.read (readBuffer)) > 0) {readBuffer.flip (); byte [] re = new byte [num]; readBuffer.get (re); String result = new String (re, "UTF-8"); System.out.println ("return value:" + result);}
The blocking mode code described above should be easy to understand: if we make a new connection, we will open a new thread to handle the connection, and all the subsequent operations will be done by that thread.
So where is the performance bottleneck in this mode?
First of all, it is definitely not appropriate to open a new thread every time a connection is made. Of course, this can be done when the number of active connections is hundreds of thousands, but if the number of active connections is tens of thousands or hundreds of thousands, so many threads will obviously fail. Each thread needs a portion of memory, which is consumed quickly, and thread switching is very expensive.
Second, blocking operations are also a problem here. First of all, accept () is a blocking operation. When accept () returns, it means that a connection is ready to be used. We are going to create a new thread to process the SocketChannel here, but it does not mean that the other party has transferred the data. So, the SocketChannel#read method will block and wait for data, which is obviously not worth the wait. Similarly, the write method also needs to wait for the channel to be writable before performing the write operation, and the blocking wait here is not worth it.
Non-blocking IO
After talking about the use of blocking mode and its disadvantages, we can introduce non-blocking IO here.
The core of non-blocking IO is to use a Selector to manage multiple channels, either SocketChannel or ServerSocketChannel, register each channel with Selector and specify listening events.
You can then poll the Selector with only one thread to see if there is a channel ready, and then start reading and writing when the channel is ready to read or write, which is fast. There is no need for us to have a thread for each channel.
Selector in NIO is an abstraction of the implementation of the underlying operating system, and the management channel state is actually implemented by the underlying system. Here is a brief introduction to the implementation under different systems.
Select: it was implemented in the 1980s. It supports registering FD_SETSIZE (1024) socket, which was certainly enough in those days, but now it certainly won't work.
Poll:1997 year, the emergence of poll as a replacement for select, the biggest difference is that poll no longer limits the number of socket.
The common problem with both select and poll is that they only tell you how many channels are ready, but not which ones. Therefore, once you know that there are channels ready, you still need to do a scan, which is obviously not good, and it is OK when there are few channels. Once the number of channels is more than several hundred thousand, the time of scanning is considerable, and the time complexity is O (n). Therefore, the following implementation was later spawned.
Epoll:2002 is released with the Linux kernel 2.5.44 every year. Epoll can directly return the specific prepared channel with a time complexity of O (1).
In addition to the appearance of Kqueue in the epoll,2000 year FreeBSD in Linux, there is also / dev/poll in Solaris.
There are so many implementations mentioned earlier, but there is no non-blocking IO for the Windows,Windows platform using select, and we don't have to think that Windows is lagging behind. The asynchronous IO provided by IOCP is more powerful in Windows.
Let's go back to Selector. After all, JVM is such a platform that shields the underlying implementation, so we can program for Selector.
I've already learned the basic usage of Selector when I introduced it. Here's a runnable example code that you can take a look at:
Public class SelectorServer {public static void main (String [] args) throws IOException {Selector selector = Selector.open (); ServerSocketChannel server = ServerSocketChannel.open (); server.socket () .bind (new InetSocketAddress (8080)); / / register it in Selector and listen for OP_ACCEPT event server.configureBlocking (false); server.register (selector, SelectionKey.OP_ACCEPT) While (true) {int readyChannels = selector.select (); if (readyChannels = = 0) {continue;} Set readyKeys = selector.selectedKeys (); / / traversal Iterator iterator = readyKeys.iterator (); while (iterator.hasNext ()) {SelectionKey key = iterator.next () Iterator.remove (); if (key.isAcceptable ()) {/ / there is a new connection to the server that has been accepted SocketChannel socketChannel = server.accept () / / having a new connection does not mean that the channel has data. / / here, register the new SocketChannel with Selector, listen for OP_READ events, and wait for data socketChannel.configureBlocking (false); socketChannel.register (selector, SelectionKey.OP_READ) } else if (key.isReadable ()) {/ / have data to read / / SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel () that listens to OP_READ events registered in the above if branch; ByteBuffer readBuffer = ByteBuffer.allocate (1024); int num = socketChannel.read (readBuffer) If (num > 0) {/ / processing incoming data. System.out.println ("received data:" + new String (readBuffer.array ()). Trim ()); ByteBuffer buffer = ByteBuffer.wrap ("data returned to the client..." .getBytes ()); socketChannel.write (buffer) } else if (num = =-1) {/ /-1 indicates that the connection has been closed socketChannel.close ();}
As for the client, you can continue to test using the client that described the blocking mode in the previous section.
NIO.2 Asynchronous IO
More New IO, or NIO.2, released with JDK 1.7, includes the introduction of asynchronous IO interfaces and file access interfaces such as Paths.
I think the word asynchronous is familiar to most developers, and we use it in many scenarios.
Typically, we have a thread pool for executing asynchronous tasks, and the thread that submits the task submits the task to the thread pool and then returns immediately, without having to wait until the task is actually completed. If you want to know the execution result of a task, it is usually by passing a callback function that is called at the end of the task.
By the same token, asynchronous IO in Java is the same, where a thread pool executes the task and then uses callbacks or to query the results yourself.
Most developers know why they design it this way, so let's talk a little bit more about it here. The main purpose of asynchronous IO is to control the number of threads and reduce the memory consumption caused by too many threads and the overhead of CPU in thread scheduling.
In systems such as Unix/Linux, JDK uses thread pools in concurrent packages to manage tasks. You can check the source code of AsynchronousChannelGroup.
In the Windows operating system, it provides a scheme called IOCP O Completion Ports. The operating system is responsible for managing the thread pool, and its performance is very excellent, so JDK directly uses the support of IOCP in Windows, using system support to expose more operation information to the operating system, and also enables the operating system to optimize our IO to a certain extent.
In fact, asynchronous IO system is implemented in Linux, but there are many limitations and general performance, so JDK adopts the way of self-built thread pool.
This article is still practical-based, if you want to know more information, please find other materials on your own. Here is a practical introduction to Java asynchronous IO.
There are a total of three classes we need to focus on, AsynchronousSocketChannel,AsynchronousServerSocketChannel and AsynchronousFileChannel, but with a prefix Asynchronous to the class names of FileChannel, SocketChannel, and ServerSocketChannel introduced earlier.
Java asynchronous IO provides two ways to use it, namely, returning an Future instance and using a callback function.
1. Return the Future instance
We should be familiar with the way java.util.concurrent.Future instances are returned, as is how JDK thread pools are used. Several method semantics of the Future interface are also common here, so let's start with a brief introduction.
Future.isDone ()
Determine whether the operation has been completed, including normal completion, exception throwing, and cancellation
Future.cancel (true)
Cancel the operation by interrupting. The parameter true says that even if the task is being executed, it will be interrupted.
Future.isCancelled ()
Whether it is cancelled or not, this method will return true only if it is cancelled before the task ends normally.
Future.get ()
This is our old friend, get the execution result, block.
Future.get (10, TimeUnit.SECONDS)
If you are not satisfied with the blocking of the get () method above, set a timeout.
2. Provide CompletionHandler callback function
Java.nio.channels.CompletionHandler API definition:
Public interface CompletionHandler {void completed (V result, An attachment); void failed (Throwable exc, An attachment);}
Note that there is an attachment on the parameter, and although it is not commonly used, we can pass this parameter value in the various supported methods
AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open () .bind (null); / / the first parameter of the accept method can be passed attachmentlistener.accept (attachment, new CompletionHandler () {public void completed (AsynchronousSocketChannel client, Object attachment) {/ /} public void failed (Throwable exc, Object attachment) {/ /}}); AsynchronousFileChannel
There are many introduction articles about Non-Blocking IO on the Internet, but there are relatively few articles about Asynchronous IO, so I will introduce some more related content here.
First, let's take a look at the asynchronous file IO. As we said earlier, the file IO does not support non-blocking mode in all operating systems, but we can use an asynchronous approach to the file IO to improve performance.
Next, I will introduce some important interfaces in AsynchronousFileChannel, all of which are very simple. If readers find it boring, just skim to the next title.
Instantiate:
AsynchronousFileChannel channel = AsynchronousFileChannel.open (Paths.get ("/ Users/hongjie/test.txt"))
Once the instantiation is complete, we can begin to prepare to read the data into Buffer:
ByteBuffer buffer = ByteBuffer.allocate (1024); Future result = channel.read (buffer, 0)
Both read and write operations of the asynchronous file channel need to provide a start location of the file, which starts at 0.
In addition to returning Future instances, you can also use callback functions. The APIs are as follows:
Public abstract void read (ByteBuffer dst, long position, An attachment, CompletionHandler handler)
By the way, post the interfaces of the two versions of the write operation:
Public abstract Future write (ByteBuffer src, long position); public abstract void write (ByteBuffer src, long position, An attachment, CompletionHandler handler)
We can see that the reading and writing of AIO is mainly related to Buffer, which is in line with NIO.
In addition, methods are provided for brushing data from memory to disk:
Public abstract void force (boolean metaData) throws IOException
Because of our write operations on files, the operating system does not operate directly on files. The system caches them and then periodically flushes them to disk. You can call this method if you want to write the data to disk in time to avoid partial data loss caused by a power outage. If the parameter is set to true, it means that the file property information is also updated to disk.
In addition, it also provides the locking function of the file, we can lock part of the data of the file, so that we can carry out exclusive operation.
Public abstract Future lock (long position, long size, boolean shared)
Position is the beginning of the content to be locked, size indicates the size of the area to lock, and shared indicates whether a shared lock or an exclusive lock is required
Of course, you can also use the version of the callback function:
Public abstract void lock (long position, long size, boolean shared, An attachment, CompletionHandler handler)
The tryLock method is also provided on the file locking feature, which quickly returns the result:
Public abstract FileLock tryLock (long position, long size, boolean shared) throws IOException
This method is simple: try to acquire the lock, and if the area is locked by another thread or other application, return null immediately, otherwise return the FileLock object.
Generally speaking, the operation of AsynchronousFileChannel is based on the interfaces introduced above, which is relatively simple, so let's finish as soon as possible with less nonsense here.
AsynchronousServerSocketChannel
This class corresponds to the ServerSocketChannel of non-blocking IO, which can be used by analogy.
Let's cut the crap and talk about things in code:
Package com.javadoop.aio;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;public class Server {public static void main (String [] args) throws IOException {/ / instantiate and listen on port AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open () .bind (new InetSocketAddress (8080)) / define an Attachment class to pass some information Attachment att = new Attachment (); att.setServer (server); server.accept (att, new CompletionHandler () {@ Override public void completed (AsynchronousSocketChannel client, Attachment att) {try {SocketAddress clientAddr = client.getRemoteAddress () System.out.println ("receive new connection:" + clientAddr); / / after receiving a new connection, server should call the accept method again and wait for the new connection to come in att.getServer () .accept (att, this); Attachment newAtt = new Attachment (); newAtt.setServer (server) NewAtt.setClient (client); newAtt.setReadMode (true); newAtt.setBuffer (ByteBuffer.allocate (2048)); / / you can continue to use anonymous implementation classes here, but the code is not good-looking, so a class client.read (newAtt.getBuffer (), newAtt, new ChannelHandler ()) is defined here. } catch (IOException ex) {ex.printStackTrace ();} @ Override public void failed (Throwable t, Attachment att) {System.out.println ("accept failed");}}) / / to prevent main threads from exiting try {Thread.currentThread () .join ();} catch (InterruptedException e) {}
Take a look at the ChannelHandler class:
Package com.javadoop.aio;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;public class ChannelHandler implements CompletionHandler {@ Override public void completed (Integer result, Attachment att) {if (att.isReadMode ()) {/ / read data from the client ByteBuffer buffer = att.getBuffer (); buffer.flip () Byte bytes [] = new byte [buffer.limit ()]; buffer.get (bytes); String msg = new String (buffer.array ()). ToString (). Trim (); System.out.println ("receive data from the client:" + msg); / / return data buffer.clear () in response to the client request Buffer.put ("Response from server!" .getBytes (Charset.forName ("UTF-8")); att.setReadMode (false); buffer.flip (); / / write data to the client is also asynchronous att.getClient () .write (buffer, att, this) } else {/ / here, it means that writing data to the client is over. There are two options: / / 1\. Continue to wait for the client to send new data to / / att.setReadMode (true); / / att.getBuffer () .clear (); / / att.getClient () .read (att.getBuffer (), att, this); / / 2\. Now that the server has returned the data to the client, disconnect the connection try {att.getClient (). Close ();} catch (IOException e) {}} @ Override public void failed (Throwable t, Attachment att) {System.out.println ("connection disconnected");}}
By the way, post the custom Attachment class:
Public class Attachment {private AsynchronousServerSocketChannel server; private AsynchronousSocketChannel client; private boolean isReadMode; private ByteBuffer buffer; / / getter & setter}
In this way, a simple server is written, and then you can receive client requests. We all use callback functions above. If you are interested, you can try to write one that uses Future.
AsynchronousSocketChannel
In fact, after talking about the AsynchronousServerSocketChannel above, the reader basically knows how to use AsynchronousSocketChannel, which is similar to non-blocking IO.
Here is a simple demonstration so that readers can test it with the Server introduced earlier.
Package com.javadoop.aio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class Client {public static void main (String [] args) throws Exception {AsynchronousSocketChannel client = AsynchronousSocketChannel.open (); / / Future future = client.connect in Future form (new InetSocketAddress (8080)) / / Block for a while and wait for the connection to succeed future.get (); Attachment att = new Attachment (); att.setClient (client); att.setReadMode (false); att.setBuffer (ByteBuffer.allocate (2048)); byte [] data = "I am obot!" .getBytes (); att.getBuffer (). Put (data); att.getBuffer (). Flip () / / asynchronously send data to the server client.write (att.getBuffer (), att, new ClientChannelHandler ()); / / take a break here and then exit, giving enough time to process the data Thread.sleep (2000);}}
Take a look inside the ClientChannelHandler class:
Package com.javadoop.aio;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;public class ClientChannelHandler implements CompletionHandler {@ Override public void completed (Integer result, Attachment att) {ByteBuffer buffer = att.getBuffer (); if (att.isReadMode ()) {/ / read the data from the server buffer.flip () Byte [] bytes = new byte [buffer.limit ()]; buffer.get (bytes); String msg = new String (bytes, Charset.forName ("UTF-8")); System.out.println ("response data received from the server:" + msg); / / next, there are two options: / / 1\. Send new data to the server / / att.setReadMode (false); / / buffer.clear (); / / String newMsg = "new message from client"; / / byte [] data = newMsg.getBytes (Charset.forName ("UTF-8")); / / buffer.put (data); / / buffer.flip () / / att.getClient () .write (buffer, att, this); / / 2\. Close the connection try {att.getClient (). Close ();} catch (IOException e) {}} else {/ / after the write operation is completed, you will enter here att.setReadMode (true); buffer.clear (); att.getClient () .read (buffer, att, this) } @ Override public void failed (Throwable t, Attachment att) {System.out.println ("Server is not responding");}}
The above code can be run and debugged. If the reader has any problems, please leave a message in the comments section.
Asynchronous Channel Groups
For the sake of knowledge integrity, it is necessary to introduce group, that is, to introduce the class AsynchronousChannelGroup. As we said earlier, there must be a thread pool for asynchronous IO that receives tasks, handles IO events, callbacks, and so on. This thread pool is inside the group, and once group is closed, the corresponding thread pool will be closed.
AsynchronousServerSocketChannels and AsynchronousSocketChannels belong to group, and when we call the open () method of AsynchronousServerSocketChannel or AsynchronousSocketChannel, the corresponding channel belongs to the default group, which is automatically constructed and managed by JVM.
If we want to configure this default group, we can specify the following system variables in the JVM startup parameters:
Java.nio.channels.DefaultThreadPool.threadFactory
This system variable is used to set ThreadFactory, which should be the fully qualified class name of the java.util.concurrent.ThreadFactory implementation class. Once we have specified this ThreadFactory, threads in group will use this class to generate.
Java.nio.channels.DefaultThreadPool.initialSize
This system variable is also easy to understand and is used to set the initial size of the thread pool.
You may want to use a self-defined group so that you can have more control over the threads in it, using the following methods:
AsynchronousChannelGroup.withCachedThreadPool (ExecutorService executor, int initialSize)
AsynchronousChannelGroup.withFixedThreadPool (int nThreads, ThreadFactory threadFactory)
AsynchronousChannelGroup.withThreadPool (ExecutorService executor)
Readers familiar with thread pools should have a good understanding of these methods, which are static methods in AsynchronousChannelGroup.
As for the use of group is very simple, the code is easy to understand:
AsynchronousChannelGroup group = AsynchronousChannelGroup. WithFixedThreadPool (10, Executors.defaultThreadFactory ()); AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open (group); AsynchronousSocketChannel client = AsynchronousSocketChannel.open (group)
AsynchronousFileChannels does not belong to group. But they are also associated with a thread pool, and if not specified, the system default thread pool will be used. If you want to use the specified thread pool, you can use the following methods when instantiating:
Public static AsynchronousFileChannel open (Path file, Set... Attrs) {.}
At this point, the introduction of asynchronous IO is complete.
Thank you for your reading. the above is the content of "detailed introduction of Java non-blocking IO and asynchronous IO". After the study of this article, I believe you have a deeper understanding of the detailed introduction of Java non-blocking IO and asynchronous IO, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.