Java Network IO Model and its Classification

2025-01-30 Update From: SLTechnology News&Howtos



This article focuses on the Java network IO model and its classification. The approach taken here is simple, fast, and practical; let's dive in.

Network IO Model and its Classification

The network IO model is a frequently discussed topic, and different books or blogs may describe it differently, so there is no need to get hung up on terminology; the key is to understand the ideas.

Socket connection

Regardless of the model, the socket connection used is the same.

The following is a typical setup on an application server. The customer's various devices interact with the Tomcat process over HTTP. Tomcat needs to access a Redis server and has established several connections to it. Although each client holds a short-lived connection to Tomcat that is soon disconnected, while Tomcat and Redis use long-lived connections, the sockets are essentially the same.

An established socket is a pair of "local IP+port and remote IP+port". The socket is created by the application process through a system call to the operating system; a corresponding structure exists in kernel space, and the application gets back a file descriptor, which can be read and written just like an ordinary open file. Different processes have their own file descriptor spaces: process 1 may have a socket with fd 100 and process 2 may also have fd 100, yet they refer to different sockets (although a socket can also be shared between processes).

Socket is full-duplex and can read and write at the same time.

For different application scenarios, the choice of network IO model and other options are different.

For example, for client HTTP requests we generally use short connections, because there are simply too many clients: many customers may be using the App at the same time, but the number sending requests at any instant is far smaller than the number in use. Keeping a long connection open for every client would exhaust memory, so short connections are used; HTTP's keep-alive strategy still allows multiple exchanges of data over one TCP connection, reducing connection setup cost. For internal calls, such as Tomcat accessing Redis, the number of machines involved is limited; using a short connection for every request would waste too much on connection setup, so long connections greatly improve efficiency.

Long and short connections, mentioned above, are generally not what the IO-model discussion is about; it is about synchronous vs. asynchronous, blocking vs. non-blocking, and so on. Which IO model to use also depends on the scenario. For a CPU-intensive application, say a request that keeps two cores busy for a full minute before returning a result, it hardly matters which IO model is used, because the bottleneck is the CPU. It is IO-intensive applications that tune the IO model for maximum efficiency; the most typical are Web applications and applications like Redis.

The concepts of synchronous/asynchronous and blocking/non-blocking

Synchronous and asynchronous: describe the interaction between the user thread and the kernel. Synchronous means the user thread, after initiating an IO request, must wait for (or poll) the kernel IO operation to complete before continuing; asynchronous means the user thread continues executing after initiating the request, and the kernel notifies it when the IO operation completes, or invokes a callback the user thread registered.

Blocking and non-blocking: describe the way the user thread invokes the kernel IO operation. Blocking means the call does not return to user space until the IO operation is fully complete; non-blocking means the call returns a status value immediately, without waiting for the IO operation to finish.

The different IO models are illustrated below with read calls. Reading data from the peer has two phases:

(1) data from device to kernel space (waiting for data to arrive in the figure)

(2) data from kernel space to user space (data copy in figure)

Blocking IO, non-blocking IO, and IO multiplexing below are all synchronous IO; asynchronous IO comes last. This distinction can be hard to grasp. In short, with synchronous IO the thread either stays blocked in the read/write call or must poll for the result, while with asynchronous IO the read/write call returns immediately and the operating system actively notifies the thread once the operation has completed.

Blocking IO

Blocking IO means that after calling read you must wait until the data has arrived and been copied into user space before the call returns; until then, the whole thread just waits.

So the problem with blocking IO is that threads cannot do anything else while reading and writing IO.

Non-blocking IO

With non-blocking IO, read returns immediately; the caller then asks the operating system whether the data is ready in kernel space and, if so, reads it out. Since we don't know when it will be ready, we have to keep polling to stay responsive.
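To make the polling concrete, here is a minimal, self-contained sketch using java.nio (the class and method names here are mine, not part of the demo project that follows): after configureBlocking(false), read() returns 0 immediately when no data is ready, so the caller loops until the message arrives.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class NonBlockingPollDemo {

    // Poll a non-blocking channel until one 4-byte message ("ping") has arrived.
    static String pollOnce() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel conn = server.accept()) {        // blocking accept, for brevity
                client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                conn.configureBlocking(false);                  // reads on conn now return immediately
                ByteBuffer buf = ByteBuffer.allocate(64);
                int total = 0;
                while (total < 4) {                             // "ping" is 4 bytes
                    int n = conn.read(buf);                     // 0 means "no data ready yet"
                    if (n > 0) total += n;                      // a real program would do other work between polls
                }
                buf.flip();
                return StandardCharsets.UTF_8.decode(buf).toString();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("received: " + pollOnce());
    }
}

In a real server the loop body would do useful work between polls instead of spinning, which is exactly the weakness the next sections address.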

IO multiplexing (non-blocking IO)

With non-blocking IO, if each thread keeps polling its own socket, the thread is still fully occupied, which is no better than blocking IO: the polling threads cannot do anything else.

If instead one dedicated thread polls all the sockets and, when data is ready, hands the socket to a worker thread, that is IO multiplexing. The polling thread can also handle the data itself rather than delegating, which is what Redis does.

IO multiplexing allows one or a few threads to manage many (possibly thousands of) socket connections, so the number of connections is no longer limited by the number of threads the system can create.

We extract the select-style polling into one thread; user threads register their sockets or IO requests with it, and it notifies them when data arrives, improving the CPU utilization of the user threads. In this way an asynchronous style is implemented.

The Reactor design pattern is used in this.

Asynchronous IO

A real asynchronous IO needs stronger support from the operating system. In the IO multiplexing model, the user thread is responsible for copying the data from the kernel space when the data arrives in the kernel, while in the asynchronous IO model, when the user thread receives the notification, the data has been copied by the operating system from the kernel to the buffer specified by the user, and the user thread can use it directly.

Asynchronous IO uses the Proactor design pattern.

Asynchronous IO is rarely used in common Web systems, so this article will not discuss it too much.
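Since the article won't cover it further, here is a minimal sketch of Java's NIO.2 asynchronous channels for completeness (the class name AioDemo is mine): read() returns a Future immediately, and by the time the Future completes, the data is already in the caller's buffer, matching the asynchronous model described above. Note that on some platforms the JVM may implement this with an internal thread pool rather than kernel-level asynchronous IO.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;

public class AioDemo {

    // Accept one connection and receive one message using Future-style asynchronous IO.
    static String receiveOnce() throws IOException, InterruptedException, ExecutionException {
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress("127.0.0.1", 0));   // ephemeral port
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        client.connect(server.getLocalAddress()).get();          // async connect; wait for completion
        client.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))).get();

        AsynchronousSocketChannel conn = server.accept().get();  // async accept
        ByteBuffer buf = ByteBuffer.allocate(64);
        int total = 0;
        while (total < 5) {                  // "hello" is 5 bytes; loop in case of a partial read
            total += conn.read(buf).get();   // when get() returns, the data is already in buf
        }
        buf.flip();
        String msg = StandardCharsets.UTF_8.decode(buf).toString();
        client.close();
        conn.close();
        server.close();
        return msg;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(receiveOnce());
    }
}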

Next, a simple java version of redis is used to illustrate the various IO models.

Hands-on practice

Next I'll write a simple Java version of Redis with only get and set, supporting only strings, just to demonstrate the various IO models. Some things are deliberately not done properly, such as exception handling.

1. Blocking IO + single thread + short connections

This version is only good for a HelloWorld-style program; it mainly serves for debugging and for introducing some common classes.

First, write a Redis interface.

package org.ifool.niodemo.redis;

public interface RedisClient {
    String get(String key);
    void set(String key, String value);
    void close();
}

In addition, there is a utility class that processes a request and produces the result, plus helpers for byte-to-String and String-to-byte conversion and for prepending a length byte, for later use.

The input is get|key or set|key|value, and the output is 0|value, 1|null, or 2|bad command.

package org.ifool.niodemo.redis;

import java.util.Map;

public class Util {

    // Prefix a String with one byte holding its length
    public static byte[] addLength(String str) {
        byte len = (byte) str.length();
        byte[] ret = new byte[len + 1];
        ret[0] = len;
        for (int i = 0; i < len; i++) {
            ret[i + 1] = (byte) str.charAt(i);
        }
        return ret;
    }

    // Build an output from an input and operate on the cache.
    // If prefixLength is true, prepend the length byte.
    // input:
    //   -> get|key
    //   -> set|key|value
    // output: errorcode|response
    //   -> 0|response   (set succeeded, or get found a value)
    //   -> 1|null       (get found nothing)
    //   -> 2|bad command
    public static byte[] processRequest(Map<String, String> cache, byte[] request, int length, boolean prefixLength) {
        if (request == null) {
            return prefixLength ? addLength("2|bad command") : "2|bad command".getBytes();
        }
        String req = new String(request, 0, length);
        Util.log_debug("command:" + req);
        String[] params = req.split("\\|");
        if (params.length < 2 || params.length > 3
                || !(params[0].equals("get") || params[0].equals("set"))) {
            return prefixLength ? addLength("2|bad command") : "2|bad command".getBytes();
        }
        if (params[0].equals("get")) {
            String value = cache.get(params[1]);
            if (value == null) {
                return prefixLength ? addLength("1|null") : "1|null".getBytes();
            } else {
                return prefixLength ? addLength("0|" + value) : ("0|" + value).getBytes();
            }
        }
        if (params[0].equals("set") && params.length >= 3) {
            cache.put(params[1], params[2]);
            return prefixLength ? addLength("0|success") : ("0|success").getBytes();
        } else {
            return prefixLength ? addLength("2|bad command") : "2|bad command".getBytes();
        }
    }

    public static int LOG_LEVEL = 0; // 0 info, 1 debug

    public static void log_debug(String str) {
        if (LOG_LEVEL >= 1) {
            System.out.println(str);
        }
    }

    public static void log_info(String str) {
        if (LOG_LEVEL >= 0) {
            System.out.println(str);
        }
    }
}

The server code is as follows. When creating the ServerSocket, port 8888 is passed in. The backlog parameter is the length of the queue of client connections that have been established but that the server has not yet been able to handle.

package org.ifool.niodemo.redis.redis1;

import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RedisServer1 {

    // Global cache
    public static Map<String, String> cache = new ConcurrentHashMap<>();

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8888);
        byte[] buffer = new byte[512];
        while (true) {
            // Accept a client connection request
            Socket clientSocket = serverSocket.accept();
            System.out.println("client address:" + clientSocket.getRemoteSocketAddress().toString());
            // Read the data, operate on the cache, then write back the response
            try {
                // Read the data
                InputStream in = clientSocket.getInputStream();
                int bytesRead = in.read(buffer, 0, 512);
                int totalBytesRead = 0;
                while (bytesRead != -1) {
                    totalBytesRead += bytesRead;
                    bytesRead = in.read(buffer, totalBytesRead, 512 - totalBytesRead);
                }
                // Operate on the cache
                byte[] response = Util.processRequest(cache, buffer, totalBytesRead, false);
                Util.log_debug("response:" + new String(response));
                // Write back the response
                OutputStream os = clientSocket.getOutputStream();
                os.write(response);
                os.flush();
                clientSocket.shutdownOutput();
            } catch (IOException e) {
                System.out.println("read or write data exception");
            } finally {
                try {
                    clientSocket.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

The client code is as follows:

package org.ifool.niodemo.redis.redis1;

import org.ifool.niodemo.redis.RedisClient;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class RedisClient1 implements RedisClient {

    public static void main(String[] args) {
        RedisClient redis = new RedisClient1("127.0.0.1", 8888);
        redis.set("123", "456");
        String value = redis.get("123");
        System.out.print(value);
    }

    private String ip;
    private int port;

    public RedisClient1(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public String get(String key) {
        Socket socket = null;
        try {
            socket = new Socket(ip, port);
        } catch (IOException e) {
            throw new RuntimeException("connect to " + ip + ":" + port + " failed");
        }
        try {
            // Write the request
            OutputStream os = socket.getOutputStream();
            os.write(("get|" + key).getBytes());
            socket.shutdownOutput(); // without shutdown, the peer would wait forever in read
            // Read the response
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[512];
            int offset = 0;
            int bytesRead = in.read(buffer);
            while (bytesRead != -1) {
                offset += bytesRead;
                bytesRead = in.read(buffer, offset, 512 - offset);
            }
            String[] response = (new String(buffer, 0, offset)).split("\\|");
            if (response[0].equals("2")) {
                throw new RuntimeException("bad command");
            } else if (response[0].equals("1")) {
                return null;
            } else {
                return response[1];
            }
        } catch (IOException e) {
            throw new RuntimeException("network error");
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void set(String key, String value) {
        Socket socket = null;
        try {
            socket = new Socket(ip, port);
        } catch (IOException e) {
            throw new RuntimeException("connect to " + ip + ":" + port + " failed");
        }
        try {
            OutputStream os = socket.getOutputStream();
            os.write(("set|" + key + "|" + value).getBytes());
            os.flush();
            socket.shutdownOutput();
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[512];
            int offset = 0;
            int bytesRead = in.read(buffer);
            while (bytesRead != -1) {
                offset += bytesRead;
                bytesRead = in.read(buffer, offset, 512 - offset);
            }
            String bufString = new String(buffer, 0, offset);
            String[] response = bufString.split("\\|");
            if (response[0].equals("2")) {
                throw new RuntimeException("bad command");
            }
        } catch (IOException e) {
            throw new RuntimeException("network error");
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void close() {
    }
}

2. Blocking IO + multithreading + short connections

General application servers use this model: the main thread blocks in accept; each new connection is handed to a worker thread while the main thread goes back to waiting for connections, and the worker thread closes the connection after it finishes reading and writing.

Server code

package org.ifool.niodemo.redis.redis2;

import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.*;

public class RedisServer2 {

    // Global cache
    public static Map<String, String> cache = new ConcurrentHashMap<>();

    public static void main(String[] args) throws IOException {
        // Thread pool used to process requests
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(200, 200, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
        ServerSocket serverSocket = new ServerSocket(8888, 1000);
        while (true) {
            // Accept a client connection request
            Socket clientSocket = serverSocket.accept();
            Util.log_debug(clientSocket.getRemoteSocketAddress().toString());
            // Hand the request to the thread pool
            threadPool.execute(new RequestHandler(clientSocket));
        }
    }
}

class RequestHandler implements Runnable {

    private Socket clientSocket;

    public RequestHandler(Socket socket) {
        clientSocket = socket;
    }

    public void run() {
        byte[] buffer = new byte[512];
        // Read the data, operate on the cache, then write back the response
        try {
            // Read the data
            InputStream in = clientSocket.getInputStream();
            int bytesRead = in.read(buffer, 0, 512);
            int totalBytesRead = 0;
            while (bytesRead != -1) {
                totalBytesRead += bytesRead;
                bytesRead = in.read(buffer, totalBytesRead, 512 - totalBytesRead);
            }
            // Operate on the cache
            byte[] response = Util.processRequest(RedisServer2.cache, buffer, totalBytesRead, false);
            Util.log_debug("response:" + new String(response));
            // Write back the response
            OutputStream os = clientSocket.getOutputStream();
            os.write(response);
            os.flush();
            clientSocket.shutdownOutput();
        } catch (IOException e) {
            System.out.println("read or write data exception");
        } finally {
            try {
                clientSocket.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

The client code is the same as before, except that this time I added a multi-threaded read/write test: 10 threads, each reading and writing 10,000 times.

public static void main(String[] args) {
    final RedisClient redis = new RedisClient2("127.0.0.1", 8888);
    redis.set("123", "456");
    String value = redis.get("123");
    System.out.print(value);
    redis.close();
    System.out.println(new Timestamp(System.currentTimeMillis()));
    testMultiThread();
    System.out.println(new Timestamp(System.currentTimeMillis()));
}

public static void testMultiThread() {
    Thread[] threads = new Thread[10];
    for (int i = 0; i < 10; i++) {
        threads[i] = new Thread(new Runnable() {
            public void run() {
                RedisClient redis = new RedisClient2("127.0.0.1", 8888);
                for (int j = 0; j < 300; j++) {
                    Random rand = new Random();
                    String key = String.valueOf(rand.nextInt(1000));
                    String value = String.valueOf(rand.nextInt(1000));
                    redis.set(key, value);
                    String value1 = redis.get(key);
                }
            }
        });
        threads[i].start();
    }
    for (int i = 0; i < 10; i++) {
        try {
            threads[i].join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

With this approach, with 10 threads reading and writing continuously, writing 10,000 times each produced some connection failures, such as:

java.net.NoRouteToHostException: Can't assign requested address

This is related to system parameter limits; I didn't figure out how to tune them on the Mac, so I changed it to 300 reads/writes per thread, which ran without errors in about 1 second.

3. Blocking IO + multithreading + long connections

With short connections we can use inputstream.read() == -1 to detect the end of a read, but with long connections the data keeps flowing, so we may hit sticky-packet or half-packet problems and need a way to find the start and end of one request within the stream. There are several approaches, such as a fixed length, a fixed delimiter, or a length prefix. Here we use a length prefix: one byte in front holds the length of the request, so since a byte's maximum is 127, a request must not exceed 127 bytes.

Because our client writes a request and then waits for the server's reply, and nothing else writes to the socket while it waits, there is no sticky-packet problem, only a possible half-packet problem. Protocols that allow other threads to write before the reply arrives could also see sticky packets.

When clients use long connections they usually maintain a connection pool and lock to borrow a connection. Here we simply let each thread own one connection for its reads and writes, which avoids the overhead of thread switching and locking and achieves higher throughput.

The client code has changed considerably this time.

package org.ifool.niodemo.redis.redis3;

import org.ifool.niodemo.redis.RedisClient;
import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.sql.Timestamp;
import java.util.Random;

public class RedisClient3 implements RedisClient {

    public static void main(String[] args) {
        RedisClient redis = new RedisClient3("127.0.0.1", 8888);
        redis.set("123", "456");
        String value = redis.get("123");
        System.out.print(value);
        redis.close();
        System.out.println(new Timestamp(System.currentTimeMillis()));
        testMultiThread();
        System.out.println(new Timestamp(System.currentTimeMillis()));
    }

    public static void testMultiThread() {
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    RedisClient redis = new RedisClient3("127.0.0.1", 8888);
                    for (int j = 0; j < 50; j++) {
                        Random rand = new Random();
                        String key = String.valueOf(rand.nextInt(1000));
                        String value = String.valueOf(rand.nextInt(1000));
                        redis.set(key, value);
                        String value1 = redis.get(key);
                    }
                }
            });
            threads[i].start();
        }
        for (int i = 0; i < 10; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private String ip;
    private int port;
    private Socket socket;

    public RedisClient3(String ip, int port) {
        this.ip = ip;
        this.port = port;
        try {
            socket = new Socket(ip, port);
        } catch (IOException e) {
            throw new RuntimeException("connect to " + ip + ":" + port + " failed");
        }
    }

    public String get(String key) {
        try {
            // Write the request, with one leading byte holding the length
            OutputStream os = socket.getOutputStream();
            String cmd = "get|" + key;
            byte length = (byte) cmd.length();
            byte[] data = new byte[cmd.length() + 1];
            data[0] = length;
            for (int i = 0; i < cmd.length(); i++) {
                data[i + 1] = (byte) cmd.charAt(i);
            }
            os.write(data);
            os.flush();
            // Read the response; the first byte is the length
            InputStream in = socket.getInputStream();
            int len = in.read();
            if (len == -1) {
                throw new RuntimeException("network error");
            }
            byte[] buffer = new byte[len];
            int offset = 0;
            int bytesRead = in.read(buffer, 0, len);
            while (offset < len) {
                offset += bytesRead;
                bytesRead = in.read(buffer, offset, len - offset);
            }
            String[] response = (new String(buffer, 0, offset)).split("\\|");
            if (response[0].equals("2")) {
                throw new RuntimeException("bad command");
            } else if (response[0].equals("1")) {
                return null;
            } else {
                return response[1];
            }
        } catch (IOException e) {
            throw new RuntimeException("network error");
        } finally {
        }
    }

    public void set(String key, String value) {
        try {
            // Write the request, with one leading byte holding the length
            OutputStream os = socket.getOutputStream();
            String cmd = "set|" + key + "|" + value;
            byte length = (byte) cmd.length();
            byte[] data = new byte[cmd.length() + 1];
            data[0] = length;
            for (int i = 0; i < cmd.length(); i++) {
                data[i + 1] = (byte) cmd.charAt(i);
            }
            os.write(data);
            os.flush();
            InputStream in = socket.getInputStream();
            int len = in.read();
            if (len == -1) {
                throw new RuntimeException("network error");
            }
            byte[] buffer = new byte[len];
            int offset = 0;
            int bytesRead = in.read(buffer, 0, len);
            while (offset < len) {
                offset += bytesRead;
                bytesRead = in.read(buffer, offset, len - offset);
            }
            String bufString = new String(buffer, 0, offset);
            Util.log_debug(bufString);
            String[] response = bufString.split("\\|");
            if (response[0].equals("2")) {
                throw new RuntimeException("bad command");
            }
        } catch (IOException e) {
            throw new RuntimeException("network error");
        } finally {
        }
    }

    public void close() {
        try {
            socket.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

On the server side, once a connection is established, one thread handles that connection for its whole life: it processes data when data arrives and just waits otherwise. With one thread per connection, a large number of connections becomes a problem.

package org.ifool.niodemo.redis.redis3;

import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RedisServer3 {

    // Global cache
    public static Map<String, String> cache = new ConcurrentHashMap<>();

    public static void main(String[] args) throws IOException {
        // Thread pool used to process requests
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5));
        ServerSocket serverSocket = new ServerSocket(8888, 10);
        while (true) {
            // Accept a client connection request
            Socket clientSocket = null;
            try {
                clientSocket = serverSocket.accept();
                Util.log_debug(clientSocket.getRemoteSocketAddress().toString());
            } catch (IOException e) {
                e.printStackTrace();
            }
            // Hand the connection to the thread pool
            threadPool.execute(new RequestHandler(clientSocket));
        }
    }
}

class RequestHandler implements Runnable {

    private Socket clientSocket;

    public RequestHandler(Socket socket) {
        clientSocket = socket;
    }

    public void run() {
        byte[] buffer = new byte[512];
        // Read the data, operate on the cache, then write back the response
        try {
            while (true) {
                // Read the length byte first
                InputStream in = clientSocket.getInputStream();
                int len = in.read();
                if (len == -1) {
                    throw new IOException("socket closed by client");
                }
                int bytesRead = in.read(buffer, 0, len);
                int totalBytesRead = 0;
                while (totalBytesRead < len) {
                    totalBytesRead += bytesRead;
                    bytesRead = in.read(buffer, totalBytesRead, len - totalBytesRead);
                }
                // Operate on the cache
                byte[] response = Util.processRequest(RedisServer3.cache, buffer, totalBytesRead, true);
                Util.log_debug("response:" + new String(response));
                // Write back the response
                OutputStream os = clientSocket.getOutputStream();
                os.write(response);
                os.flush();
            }
        } catch (IOException e) {
            System.out.println("read or write data exception");
        } finally {
            try {
                clientSocket.close();
                Util.log_debug("socket closed");
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

With this approach, 10 threads each reading and writing 10,000 times, i.e. 200,000 accesses in total, takes only about 3 s.

4. Blocking IO + single polling thread + multi-threaded processing + long connections (not feasible)

Multithreading plus long connections greatly improve efficiency, but if there are too many connections we would need too many threads, which is clearly not feasible: most of the threads are pinned to a connection even when it has no data and can do nothing else.

Could one thread instead be made responsible for waiting on all these sockets, notifying the worker thread pool when data arrives?

In the code below, an extra thread iterates over the connected sockets and notifies the thread pool when socket.getInputStream().available() > 0.

This program can work in some cases, but it is actually problematic. The key issue is that the available check above effectively serializes the polling: each pass over the sockets waits on them one by one to see whether data has arrived. There is no way in Java to put a plain Socket into non-blocking mode on its own; that requires NIO. It could be done in C, but not here.

package org.ifool.niodemo.redis.redis4;

import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RedisServer4 {

    // Global cache
    public static Map<String, String> cache = new ConcurrentHashMap<>();

    // Currently connected sockets
    final public static Set<Socket> socketSet = new HashSet<>(10);

    public static void main(String[] args) throws IOException {
        // Thread pool used to process requests
        final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
        ServerSocket serverSocket = new ServerSocket(8888, 100);

        // Start a thread that keeps scanning for readable sockets and removes closed connections
        Thread thread = new Thread(new Runnable() {
            public void run() {
                // Find readable sockets and hand them to the pool
                while (true) {
                    synchronized (socketSet) {
                        Iterator<Socket> it = socketSet.iterator();
                        while (it.hasNext()) {
                            Socket socket = it.next();
                            if (socket.isConnected()) {
                                try {
                                    if (!socket.isInputShutdown() && socket.getInputStream().available() > 0) {
                                        it.remove();
                                        threadPool.execute(new RequestHandler(socket));
                                    }
                                } catch (IOException ex) {
                                    System.out.println("socket already closed1");
                                    it.remove();
                                    try {
                                        socket.close();
                                    } catch (IOException e) {
                                        System.out.println("socket already closed2");
                                    }
                                }
                            } else {
                                it.remove();
                                try {
                                    socket.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
                }
            }
        });
        thread.start();

        while (true) {
            // Accept client connection requests and add the new socket to socketSet
            Socket clientSocket = null;
            try {
                clientSocket = serverSocket.accept();
                Util.log_debug("client address:" + clientSocket.getRemoteSocketAddress().toString());
                synchronized (socketSet) {
                    socketSet.add(clientSocket);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

class RequestHandler implements Runnable {

    private Socket clientSocket;

    public RequestHandler(Socket socket) {
        clientSocket = socket;
    }

    public void run() {
        byte[] buffer = new byte[512];
        // Read the data, operate on the cache, then write back the response
        try {
            // Read the length byte first
            InputStream in = clientSocket.getInputStream();
            int len = in.read();
            if (len == -1) {
                throw new IOException("socket closed by client");
            }
            int bytesRead = in.read(buffer, 0, len);
            int totalBytesRead = 0;
            while (totalBytesRead < len) {
                totalBytesRead += bytesRead;
                bytesRead = in.read(buffer, totalBytesRead, len - totalBytesRead);
            }
            // Operate on the cache
            byte[] response = Util.processRequest(RedisServer4.cache, buffer, totalBytesRead, true);
            Util.log_debug("response:" + new String(response));
            // Write back the response
            OutputStream os = clientSocket.getOutputStream();
            os.write(response);
            os.flush();
            // Put the socket back so the polling thread watches it again
            synchronized (RedisServer4.socketSet) {
                RedisServer4.socketSet.add(clientSocket);
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("read or write data exception");
        } finally {
        }
    }
}

5. IO multiplexing + single polling thread + multi-threaded processing + long connections

In the example above we tried to build select-like functionality out of plain sockets, which is not workable in Java; NIO is required. With NIO, a single select() call can poll all the connections for readiness, and ready ones can be handed to a thread from the pool.

Using NIO requires understanding ByteBuffer, Channel and so on; ByteBuffer in particular is fiddly to use, and I won't expand on it here.

The client keeps the previous (non-NIO) code for now; the server code is as follows:

package org.ifool.niodemo.redis.redis5;

import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RedisServer5 {

    // Global cache
    public static Map<String, String> cache = new ConcurrentHashMap<>();

    public static void main(String[] args) throws IOException {
        // Thread pool used to process requests
        final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(8888), 1000);
        Selector selector = Selector.open();
        ssc.configureBlocking(false);                  // must be non-blocking to register
        ssc.register(selector, SelectionKey.OP_ACCEPT); // the server socket only cares about accept

        while (true) {
            int num = selector.select();
            if (num == 0) {
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectionKeys.iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);       // must be non-blocking to be selectable
                    sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(512));
                    System.out.println("new connection");
                }
                if (key.isReadable()) {
                    SocketChannel clientSocketChannel = (SocketChannel) key.channel();
                    if (!clientSocketChannel.isConnected()) {
                        clientSocketChannel.finishConnect();
                        key.cancel();
                        clientSocketChannel.close();
                        System.out.println("socket closed2");
                        continue;
                    }
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int len = clientSocketChannel.read(buffer);
                    if (len == -1) {
                        clientSocketChannel.finishConnect();
                        key.cancel();
                        clientSocketChannel.close();
                        System.out.println("socket closed1");
                    } else {
                        threadPool.execute(new RequestHandler(clientSocketChannel, buffer));
                    }
                }
            }
        }
    }
}

class RequestHandler implements Runnable {

    private SocketChannel channel;
    private ByteBuffer buffer;

    public RequestHandler(SocketChannel channel, Object buffer) {
        this.channel = channel;
        this.buffer = (ByteBuffer) buffer;
    }

    public void run() {
        // Read the data, operate on the cache, then write back the response
        try {
            int position = buffer.position();
            // Flip into read mode to read out the leading length byte
            buffer.flip();
            int len = buffer.get();
            if (len > position - 1) {
                // Half packet: restore the buffer and wait for more data
                buffer.position(position);
                buffer.limit(buffer.capacity());
                return;
            }
            byte[] data = new byte[len];
            buffer.get(data, 0, len);
            // Operate on the cache
            byte[] response = Util.processRequest(RedisServer5.cache, data, len, true);
            Util.log_debug("response:" + new String(response));
            buffer.clear();
            buffer.put(response);
            buffer.flip();
            channel.write(buffer);
            buffer.clear();
        } catch (IOException e) {
            System.out.println("read or write data exception");
        } finally {
        }
    }
}

There are a lot of pitfalls in writing NIO programs; the code above still misbehaves occasionally, and some exceptions are not handled properly. Ten threads writing 10,000 times each also takes a little over 3 seconds.

6. IO multiplexing + Netty

Writing programs with Java's native NIO is error-prone: the API is complex, and there are many corner cases to handle, such as connection closing and sticky/half packets. Using a mature framework like Netty is much easier.

The common threading model for Netty is shown in the figure below. The mainReactor listens on the server socket, accepts new connections, and dispatches the established sockets to subReactors. The subReactors multiplex the connected sockets, read and write the network data, and hand business processing to the worker thread pool. In general, the number of subReactors equals the number of CPU cores.

The server code is shown below. The two NioEventLoopGroups are the mainReactor and subReactor described above. The first constructor parameter is 0, which means use Netty's default thread count; since there is only one listening socket, a single boss thread acts as the mainReactor, while the worker group (sized by default relative to the number of available CPU cores) provides the subReactors.

Here we only have a boss (mainReactor) and workers (subReactors). In general there would also be a separate thread pool for the real business logic, because the workers are meant for reading and decoding data; heavy business logic, such as database access, should not run on them. Our scenario, like Redis, is pure in-memory work, so we do not use an extra thread pool.
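If one did want such a separate business pool, the pattern can be sketched with the standard library alone (this is not Netty API; `onMessage` and the pool are hypothetical names for illustration): the IO thread submits the slow work to a dedicated ExecutorService and returns immediately, so the subReactor thread stays free to serve other sockets.

```java
import java.util.concurrent.*;

public class OffloadDemo {
    // a dedicated pool for business logic, separate from the IO threads
    static final ExecutorService businessPool = Executors.newFixedThreadPool(4);

    // simulates what a channelRead-style callback would do: hand the slow
    // work off and return at once instead of blocking the IO thread
    static Future<String> onMessage(String request) {
        return businessPool.submit(() -> {
            // hypothetical slow business logic (e.g. a database call)
            return "echo:" + request;
        });
    }

    public static void main(String[] args) throws Exception {
        Future<String> f = onMessage("ping");
        System.out.println(f.get()); // prints "echo:ping"
        businessPool.shutdown();
    }
}
```

The key design point is that the response is produced asynchronously; in Netty one would write it back from the callback via the ChannelHandlerContext rather than blocking on the Future.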

package org.ifool.niodemo.redis.redis6;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.ifool.niodemo.redis.Util;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class RedisServer6 {
    // global cache
    public static Map cache = new ConcurrentHashMap();

    public static void main(String[] args) throws IOException, InterruptedException {
        // thread pool for handling accept events
        EventLoopGroup bossGroup = new NioEventLoopGroup(0, new ThreadFactory() {
            AtomicInteger index = new AtomicInteger(0);
            public Thread newThread(Runnable r) {
                return new Thread(r, "netty-boss-" + index.getAndIncrement());
            }
        });
        // thread pool for handling read events
        EventLoopGroup workerGroup = new NioEventLoopGroup(0, new ThreadFactory() {
            AtomicInteger index = new AtomicInteger(0);
            public Thread newThread(Runnable r) {
                return new Thread(r, "netty-worker-" + index.getAndIncrement());
            }
        });
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 50)
                 .childHandler(new ChildChannelHandler());
        ChannelFuture future = bootstrap.bind(8888).sync();
        future.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

/**
 * Initializes the pipeline for each new connection.
 */
class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // first split packets with a LengthFieldBasedFrameDecoder, then pass them to RequestHandler
        // (decoder arguments reconstructed: max frame 127, 1-byte length field at offset 0)
        socketChannel.pipeline()
                     .addLast(new RedisDecoder(127, 0, 1))
                     .addLast(new RequestHandler());
    }
}

class RequestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        int len = buf.readableBytes() - 1;
        int lenField = buf.readByte();
        if (len != lenField) {
            ByteBuf resp = Unpooled.copiedBuffer("2|bad cmd".getBytes());
            ctx.write(resp);
        }
        byte[] req = new byte[len];
        buf.readBytes(req, 0, len);
        byte[] response = Util.processRequest(RedisServer6.cache, req, len, true);
        ByteBuf resp = Unpooled.copiedBuffer(response);
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}

class RedisDecoder extends LengthFieldBasedFrameDecoder {
    public RedisDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }
}

As you can see, one boss thread and 10 worker threads are created after the program runs.

Writing with Netty is also more stable: 10 threads each writing 10,000 times still takes about 3 seconds, but we no longer need to manage the threads and buffers ourselves.

Summary

It is hard to understand the network IO models from concepts alone; examples give a deeper understanding. Let's wrap up by answering one question:

Why can Redis achieve tens of thousands of TPS with a single thread?

Let's break down the steps Redis goes through to process a request:

(1) read the raw request data

(2) parse and process the data (the business logic)

(3) write the response data back
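The three steps above can be sketched as a single-threaded, purely in-memory handler. This toy `MiniRedis` (the class name and command format are assumptions for illustration, not Redis's real protocol) shows why step (2) costs almost nothing when everything lives in a hash map:

```java
import java.util.HashMap;
import java.util.Map;

public class MiniRedis {
    // the entire "database" is an in-process hash map
    static final Map<String, String> cache = new HashMap<>();

    // one request handled entirely in memory: parse, process, respond.
    // assumed command format: "set <key> <value>" or "get <key>"
    static String handle(String cmd) {
        String[] parts = cmd.split(" ");
        if (parts[0].equals("set") && parts.length == 3) {
            cache.put(parts[1], parts[2]);
            return "OK";
        } else if (parts[0].equals("get") && parts.length == 2) {
            String v = cache.get(parts[1]);
            return v == null ? "(nil)" : v;
        }
        return "ERR unknown command";
    }

    public static void main(String[] args) {
        System.out.println(handle("set name redis")); // prints "OK"
        System.out.println(handle("get name"));       // prints "redis"
    }
}
```

Because `handle` touches only process memory, a single thread can run it millions of times per second; the network IO around it is what epoll makes cheap.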

Reading data relies on IO multiplexing, implemented at the bottom by epoll. Compared with select (the Linux select system call, not Java NIO's Selector), epoll has two main advantages:

First, it finds ready sockets efficiently: even with millions of connections, epoll only visits the connections that actually have events, while select has to scan the entire fd set each time.

Second, select must copy the whole fd set from user space into the kernel on every call, while epoll registers descriptors once (via epoll_ctl) and the kernel keeps the interest list and ready list itself, so that per-call copying disappears. When epoll reports a read event, the data is already sitting in the kernel socket buffer, ready to be read in a single call.

As can be seen from the above, reading data is very fast.

The next step is processing the data, and this is the essential reason a single thread is enough: Redis's business logic consists of pure memory operations whose cost is measured in nanoseconds, so the time is negligible. A complex web application whose business logic reads a database or calls other modules could not use a single thread this way.

Writing is symmetric: the application computes the result into a user-space buffer and hands it to the operating system with a single write; epoll notifies the application when the socket becomes writable again.

So the premise for Redis supporting tens of thousands of TPS on a single thread is that every request is an in-memory operation that finishes very quickly. Any slow request blocks the ones behind it: if 99.99% of requests respond within 1 ms but 0.01% take 1 s, then while the single thread is handling one 1 s request, the remaining 1 ms requests all have to queue behind it.
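A rough back-of-the-envelope calculation, using the numbers from the text, shows how much one slow request per 10,000 hurts a single-threaded server:

```java
public class LatencyMath {
    public static void main(String[] args) {
        double fastMs = 1.0;        // 99.99% of requests take 1 ms
        double slowMs = 1000.0;     // 0.01% of requests take 1 s
        double slowFrac = 0.0001;

        // average service time on a single thread (requests cannot overlap)
        double avgMs = fastMs * (1 - slowFrac) + slowMs * slowFrac;
        double maxTps = 1000.0 / avgMs;

        // roughly 1.1 ms average, so the ceiling drops from ~1000 req/s
        // (all-fast case) to about 909 req/s, before any queueing delay
        System.out.println("avg ms = " + avgMs + ", max tps ~= " + Math.round(maxTps));
    }
}
```

This is only the throughput ceiling; the latency impact is worse, since every request that arrives during a 1 s request waits the full remainder of that second.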

At this point, I believe you have a deeper understanding of the "Java Network IO Model and Its Classification". You might as well try it out in practice.
