In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-08 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly explains "how to realize the multithreaded Reactor pattern in Java". The content in the article is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "how to realize the multithreaded Reactor pattern in Java".
The purpose of multithreaded Reactor mode is to allocate multiple reactor and each reactor has an independent selector. In network communication, it is generally designed as the main Reactor responsible for the connection, in which if the selector detects the occurrence of the connection event in the run function of the main Reactor, it will dispatch the event.
Let the Handler responsible for managing the connection handle the connection, where a sub-Handler is created in the Handler processor responsible for the connection to process the IO request. In this way, the connection request is executed separately from the IO request to increase the concurrency of the channel. The advantage of multiple Reactor at the same time is that multiple selector can improve the retrieval speed of channels.
1. Master server package com.crazymakercircle.ReactorModel;import com.crazymakercircle.NioDemoConfig;import com.crazymakercircle.util.Logger;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.atomic.AtomicInteger;class MultiThreadEchoServerReactor {ServerSocketChannel serverSocket; AtomicInteger next = new AtomicInteger (0); Selector bossSelector = null Reactor bossReactor = null; / / selectors collection, the introduction of multiple selector selectors / / multiple selectors can better improve the concurrency of the channel Selector [] workSelectors = new Selector [2]; / / introduce multiple sub-reactors / / if the CPU is multi-core, you can open multiple sub-Reactor reactors, so that each sub-Reactor reactor can allocate a thread independently. / / each thread can bind a separate Selector selector to increase channel concurrency Reactor [] workReactors = null; MultiThreadEchoServerReactor () throws IOException {bossSelector = Selector.open (); / / initialize multiple selector selectors workSelectors [0] = Selector.open (); workSelectors [1] = Selector.open (); serverSocket = ServerSocketChannel.open () InetSocketAddress address = new InetSocketAddress (NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT); serverSocket.socket () .bind (address); / / non-blocking serverSocket.configureBlocking (false); / / first selector, responsible for monitoring new connection events SelectionKey sk = serverSocket.register (bossSelector, SelectionKey.OP_ACCEPT) / add a new connection to process the handler processor to SelectionKey (selection key) sk.attach (new AcceptorHandler ()); / / process the newly connected reactor bossReactor = new Reactor (bossSelector); / / the first sub-reactor, one of which is responsible for a selector Reactor subReactor1 = new Reactor (workSelectors [0]) / / the second sub-reactor, one sub-reactor is responsible for a selector Reactor subReactor2 = new Reactor (workSelectors [1]); workReactors = new Reactor [] {subReactor1, subReactor2};} private void startService () {new Thread (bossReactor). Start (); / / one sub-reactor corresponds to a thread new Thread (workReactors [0]). Start () New Thread (workReactors [1]) .start ();} / reactor class Reactor implements Runnable {/ / each thread is responsible for querying a selector final Selector selector; public Reactor (Selector selector) {this.selector = selector } public void run () {try {while (! Thread.interrupted ()) {/ / list selector sensor list selector.select (1000) every other second / / every other second; Set selectedKeys = selector.selectedKeys () If (null = = selectedKeys | | selectedKeys.size () = 0) {/ / if the channel registration event in the list does not occur, then continue to execute continue;} Iterator it = selectedKeys.iterator () While (it.hasNext ()) {/ / Reactor is responsible for events received by dispatch SelectionKey sk = it.next (); dispatch (sk) } / / clear the inductive events that have been handled to prevent repeated processing of selectedKeys.clear ();}} catch (IOException ex) {ex.printStackTrace () }} void dispatch (SelectionKey sk) {Runnable handler = (Runnable) sk.attachment (); / / call the handler processor object if (handler! = null) that attach bound to the selection key before {handler.run () }} / / Handler: new connection processor class AcceptorHandler implements Runnable {public void run () {try {SocketChannel channel = serverSocket.accept (); Logger.info ("received a new connection"); if (channel! = null) {int index = next.get () Logger.info ("selector number:" + index); Selector selector = workSelectors [index]; new MultiThreadEchoHandler (selector, channel);}} catch (IOException e) {e.printStackTrace () } if (next.incrementAndGet () = = workSelectors.length) {next.set (0);} public static void main (String [] args) throws IOException {MultiThreadEchoServerReactor server = new MultiThreadEchoServerReactor (); server.startService ();}}
According to the above design idea, three Reactor are actually designed in the main server, one main Reactor is specially responsible for the connection request and equipped with a separate selector, but the thread Run functions of the three Reactor do the same function, retrieving the event list according to the selector within each thread, and if the registered listening event occurs, call dispactch to distribute to the corresponding Handler of each Reactor.
It is important to note that at the beginning, only the main Reactor responsible for connecting events has assigned an AcceptorHandler () to the corresponding key when registering the selector.
/ / the first selector, responsible for monitoring the new connection event SelectionKey sk = serverSocket.register (bossSelector, SelectionKey.OP_ACCEPT); / / attaching the new connection processing handler processor to SelectionKey (select key) sk.attach (new AcceptorHandler ())
However, in the run method of Reactor, if the corresponding selector key occurs, it needs to dispatch to a Handler. Where is the Handler of the other two sub-Reactor assigned here? In fact, each sub-Handler is created in the Reactor that handles the connection request, as shown in the following code:
In the main Handler, the client channel is created according to the server channel, and the binding between the sub-selector and channel is carried out.
Int index = next.get (); Logger.info ("selector number:" + index); Selector selector = workSelectors [index]; new MultiThreadEchoHandler (selector, channel); 2. IO request handler+ thread pool package com.crazymakercircle.ReactorModel;import com.crazymakercircle.util.Logger;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector Import java.nio.channels.SocketChannel;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;class MultiThreadEchoHandler implements Runnable {final SocketChannel channel; final SelectionKey sk; final ByteBuffer byteBuffer = ByteBuffer.allocate (1024); static final int RECIEVING = 0, SENDING = 1; int state = RECIEVING; / / incoming thread pool static ExecutorService pool = Executors.newFixedThreadPool (4); MultiThreadEchoHandler (Selector selector, SocketChannel c) throws IOException {channel = c; channel.configureBlocking (false) / / Wake up selection to prevent boss threads from being blocked during register. Netty processing is more elegant. Events will be registered in the same thread to avoid blocking boss selector.wakeup (); / / only get the selection key, and then set the interested IO event sk = channel.register (selector, 0); / / use this Handler as an attachment to the sk selection key to facilitate event dispatch sk.attach (this) / register the Read ready event sk.interestOps (SelectionKey.OP_READ) with the sk selection key; / / Wake up selection, the OP_READ takes effect selector.wakeup (); Logger.info ("new connection registration completed");} public void run () {/ / Asynchronous task to execute pool.execute (new AsyncTask ()) in a separate thread pool } / / Asynchronous task that does not execute public synchronized void asyncRun () {try {if (state = = SENDING) {/ / write channel channel.write (byteBuffer) in the Reactor thread; / / after writing, prepare to start reading from the channel, and byteBuffer switches to write mode byteBuffer.clear () / / register the read ready event sk.interestOps (SelectionKey.OP_READ) after writing; / / enter the received state state = RECIEVING;} else if (state = = RECIEVING) {/ / read int length = 0 from the channel While ((length = channel.read (byteBuffer)) > 0) {Logger.info (new String (byteBuffer.array (), 0, length));} / / after reading, prepare to start the write channel, and byteBuffer switches to read mode byteBuffer.flip () / / after reading, register the write ready event sk.interestOps (SelectionKey.OP_WRITE); / / after reading, enter the sent state state = SENDING;} / / processing is over. Select key cannot be closed here, and / / sk.cancel () needs to be reused. } catch (IOException ex) {ex.printStackTrace ();}} / / the inner class class AsyncTask implements Runnable {public void run () {MultiThreadEchoHandler.this.asyncRun ();} 3 of asynchronous tasks, client
The thread pool is used in the Handler for processing IO requests, which has achieved the purpose of asynchronous processing.
Package com.crazymakercircle.ReactorModel;import com.crazymakercircle.NioDemoConfig;import com.crazymakercircle.util.Dateutil;import com.crazymakercircle.util.Logger;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Scanner;import java.util.Set / * create by Nien @ Crazy Creator Circle * * / public class EchoClient {public void start () throws IOException {InetSocketAddress address = new InetSocketAddress (NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT); / / 1. Get channel (channel) SocketChannel socketChannel = SocketChannel.open (address); Logger.info ("client connection is successful") / 2. Switch to non-blocking mode socketChannel.configureBlocking (false); / / keep spinning, waiting for the connection to complete, or do something else: while (! socketChannel.finishConnect ()) {} Logger.tcfo ("client starts successfully!") ; / / start the accept thread Processer processer = new Processer (socketChannel); new Thread (processer). Start ();} static class Processer implements Runnable {final Selector selector; final SocketChannel channel; Processer (SocketChannel channel) throws IOException {/ / Reactor initialization selector = Selector.open (); this.channel = channel Channel.register (selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);} public void run () {try {while (! Thread.interrupted ()) {selector.select (); Set selected = selector.selectedKeys (); Iterator it = selected.iterator () While (it.hasNext ()) {SelectionKey sk = it.next (); if (sk.isWritable ()) {ByteBuffer buffer = ByteBuffer.allocate (NioDemoConfig.SEND_BUFFER_SIZE); Scanner scanner = new Scanner (System.in) Logger.tcfo ("Please enter what to send:"); if (scanner.hasNext ()) {SocketChannel socketChannel = (SocketChannel) sk.channel (); String next = scanner.next () Buffer.put ((Dateutil.getNow () + "> >" + next). GetBytes ()); buffer.flip (); / / Operation 3: send data socketChannel.write (buffer) Buffer.clear () }} if (sk.isReadable ()) {/ / if the IO event of the selected key is a "readable" event, read the data SocketChannel socketChannel = (SocketChannel) sk.channel () / / read data ByteBuffer byteBuffer = ByteBuffer.allocate (1024); int length = 0; while ((length = socketChannel.read (byteBuffer)) > 0) {byteBuffer.flip () Logger.info ("server echo:" + new String (byteBuffer.array (), 0, length); byteBuffer.clear () }} / / processing is over. You cannot close select key here. You need to reuse / / selectionKey.cancel ();} selected.clear () }} catch (IOException ex) {ex.printStackTrace ();} public static void main (String [] args) throws IOException {new EchoClient () .start () }} Thank you for your reading, the above is the content of "how to realize the multithreaded Reactor pattern in Java". After the study of this article, I believe you have a deeper understanding of how to realize the multithreaded Reactor pattern in Java, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.