
How to Implement a Real-Time Chat Server Model Based on Java NIO

2025-02-24 Update From: SLTechnology News&Howtos shulou


Shulou (Shulou.com), 06/02

This article explains how to implement a real-time chat server model based on Java NIO. The code is short and each class is introduced in turn, so it should be easy to follow.

There are already plenty of introductions to NIO's SelectionKey, Selector and Channel, so let's go straight to the code:

JsonParser

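A thin wrapper for JSON parsing, built on fastjson, which has been popular recently.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class JsonParser {
    // Parse the JSON string and return the value stored under the given key.
    // A local variable replaces the shared static field, so no locking is needed.
    public static String get(String json, String key) {
        JSONObject parsed = JSON.parseObject(json);
        return parsed.getString(key);
    }
}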

Main

The entry point; it needs no explanation.

public class Main {
    public static void main(String... args) {
        new SeekServer().start();
    }
}

Log

public class Log {
    // informational output goes to stdout
    public static void i(Object obj) {
        System.out.println(obj);
    }

    // errors go to stderr
    public static void e(Object e) {
        System.err.println(e);
    }
}

SeekServer:

The server's entry point: accepting and wrapping requests all happens in this class, and the port is hard-coded for now. The check mSelector.select(TIME_OUT) > 0 gives the loop a pause when the server is idle (no reads, no writes, not even a disconnect request); without it the loop would be practically equivalent to while (true) { /* nothing */ }, that is, a busy loop.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class SeekServer extends Thread {
    private final int ACCPET_PORT = 55555;
    private final int TIME_OUT = 1000;
    private Selector mSelector = null;
    private ServerSocketChannel mSocketChannel = null;
    private ServerSocket mServerSocket = null;
    private InetSocketAddress mAddress = null;

    public SeekServer() {
        long sign = System.currentTimeMillis();
        try {
            mSocketChannel = ServerSocketChannel.open();
            if (mSocketChannel == null) {
                System.out.println("can't open server socket channel");
            }
            mServerSocket = mSocketChannel.socket();
            mAddress = new InetSocketAddress(ACCPET_PORT);
            mServerSocket.bind(mAddress);
            Log.i("server bind port is " + ACCPET_PORT);
            mSelector = Selector.open();
            mSocketChannel.configureBlocking(false);
            SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
            key.attach(new Acceptor());
            // detect Session status
            Looper.getInstance().loop();
            // start processing Sessions
            SessionProcessor.start();
            Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!");
        } catch (ClosedChannelException e) {
            Log.e(e.getMessage());
        } catch (IOException e) {
            Log.e(e.getMessage());
        }
    }

    public void run() {
        Log.i("server is listening...");
        while (!Thread.interrupted()) {
            try {
                // blocks for at most TIME_OUT ms when no channel is ready
                if (mSelector.select(TIME_OUT) > 0) {
                    Set<SelectionKey> keys = mSelector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    SelectionKey key = null;
                    while (iterator.hasNext()) {
                        key = iterator.next();
                        Handler at = (Handler) key.attachment();
                        if (at != null) {
                            at.exec();
                        }
                        iterator.remove();
                    }
                }
            } catch (IOException e) {
                Log.e(e.getMessage());
            }
        }
    }

    class Acceptor extends Handler {
        public void exec() {
            try {
                SocketChannel sc = mSocketChannel.accept();
                // a non-blocking accept() returns null when no connection is pending
                if (sc != null) {
                    new Session(sc, mSelector);
                }
            } catch (ClosedChannelException e) {
                Log.e(e);
            } catch (IOException e) {
                Log.e(e);
            }
        }
    }
}
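To make the effect of select(TIME_OUT) concrete, here is a minimal sketch, separate from the server code. The class name SelectTimeoutDemo, the loopback address, the ephemeral port and the timeout values are all assumptions chosen for illustration. It shows that select(timeout) returns 0 after waiting when nothing is ready, and a positive count once a client has connected.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the article's server.
final class SelectTimeoutDemo {

    /** Returns {keys ready while idle, keys ready after a client connects}. */
    static int[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port (assumption: loopback is available).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Nobody has connected yet: select() sleeps up to 100 ms, then reports 0 keys.
            int idle = selector.select(100);

            // A blocking client connect makes the server channel acceptable.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                int ready = selector.select(1000);
                return new int[] {idle, ready};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("idle=" + result[0] + ", afterConnect=" + result[1]);
    }
}
```

The idle call is what keeps the server loop from spinning: the thread is parked inside select() instead of re-running an empty loop body.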

Handler:

It has a single abstract method, exec; Session inherits from it.

public abstract class Handler {
    public abstract void exec();
}

Session:

Wraps the user's request together with its SelectionKey and SocketChannel. Every new request resets the last-active time. The mState field (READING or SENDING) decides whether the session receives or sends a message, and the session is removed from SessionManager when the client disconnects abnormally.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Calendar;

public class Session extends Handler {
    private SocketChannel mChannel;
    private SelectionKey mKey;
    private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);
    private Charset charset = Charset.forName("UTF-8");
    private CharsetDecoder mDecoder = charset.newDecoder();
    private CharsetEncoder mEncoder = charset.newEncoder();
    private long lastPant; // last active time
    private final int TIME_OUT = 1000 * 60 * 5; // Session timeout
    private String key;
    private String sendData = "";
    private String receiveData = null;
    public static final int READING = 0, SENDING = 1;
    int mState = READING;

    public Session(SocketChannel socket, Selector selector) throws IOException {
        this.mChannel = socket;
        mChannel.configureBlocking(false);
        mKey = mChannel.register(selector, 0);
        mKey.attach(this);
        mKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
        lastPant = Calendar.getInstance().getTimeInMillis();
    }

    public String getReceiveData() {
        return receiveData;
    }

    public void clear() {
        receiveData = null;
    }

    public void setSendData(String sendData) {
        mState = SENDING;
        mKey.interestOps(SelectionKey.OP_WRITE);
        this.sendData = sendData + "\n";
    }

    public boolean isKeekAlive() {
        return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();
    }

    public void setAlive() {
        lastPant = Calendar.getInstance().getTimeInMillis();
    }

    /** Log out of the current Session */
    public void distroy() {
        try {
            mChannel.close();
            mKey.cancel();
        } catch (IOException e) {
            // ignored: the session is being discarded anyway
        }
    }

    @Override
    public synchronized void exec() {
        try {
            if (mState == READING) {
                read();
            } else if (mState == SENDING) {
                write();
            }
        } catch (IOException e) {
            // key is still null if the client failed before sending its imei
            if (key != null) {
                SessionManager.remove(key);
            }
            try {
                mChannel.close();
            } catch (IOException e1) {
                Log.e(e1);
            }
            mKey.cancel();
        }
    }

    public void read() throws IOException {
        mRreceiveBuffer.clear();
        int sign = mChannel.read(mRreceiveBuffer);
        if (sign == -1) {
            // client connection closed
            mChannel.close();
            mKey.cancel();
        }
        if (sign > 0) {
            mRreceiveBuffer.flip();
            receiveData = mDecoder.decode(mRreceiveBuffer).toString();
            setAlive();
            setSign();
            // ConcurrentHashMap rejects null keys, so skip requests without an imei
            if (key != null) {
                SessionManager.addSession(key, this);
            }
        }
    }

    private void setSign() {
        // set the key of the current Session
        key = JsonParser.get(receiveData, "imei");
        // detect whether the message type is a heartbeat packet
        // String type = jo.getString("type");
        // if (type.equals("HEART_BEAT")) {
        //     setAlive();
        // }
    }

    /** write message */
    public void write() {
        try {
            mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));
            sendData = null;
            mState = READING;
            mKey.interestOps(SelectionKey.OP_READ);
        } catch (CharacterCodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            try {
                mChannel.close();
            } catch (IOException e1) {
                Log.e(e1);
            }
        }
    }
}
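The timeout rule behind isKeekAlive and setAlive can be isolated in a few lines. The sketch below is only an illustration: the class KeepAliveTimer and its explicit now parameter are assumptions that do not appear in the article, introduced so the rule can be exercised without waiting five real minutes.

```java
// Hypothetical helper, not part of the article's code.
final class KeepAliveTimer {
    private final long timeoutMillis;
    private long lastActive;

    KeepAliveTimer(long timeoutMillis, long now) {
        this.timeoutMillis = timeoutMillis;
        this.lastActive = now;
    }

    /** Same role as Session.setAlive(): remember when the peer was last heard from. */
    void touch(long now) {
        lastActive = now;
    }

    /** Same comparison as Session.isKeekAlive(): alive while the deadline is still in the future. */
    boolean isAlive(long now) {
        return lastActive + timeoutMillis > now;
    }

    public static void main(String[] args) {
        long fiveMinutes = 1000L * 60 * 5;
        KeepAliveTimer timer = new KeepAliveTimer(fiveMinutes, 0);
        System.out.println(timer.isAlive(fiveMinutes - 1)); // still inside the window
        System.out.println(timer.isAlive(fiveMinutes));     // deadline reached
    }
}
```

Passing the clock value in, rather than reading it inside the method, is a design choice made here purely to keep the example deterministic.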

SessionManager:

All Sessions are stored in a ConcurrentHashMap, keyed by the IMEI of the mobile phone user. Because ConcurrentHashMap is thread-safe, you can largely avoid doing the synchronization yourself.

It also wraps a few methods for operating on Sessions, such as get and remove.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SessionManager {
    private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();

    public static void addSession(String key, Session session) {
        sessions.put(key, session);
    }

    public static Session getSession(String key) {
        return sessions.get(key);
    }

    public static Set<String> getSessionKeys() {
        return sessions.keySet();
    }

    public static int getSessionCount() {
        return sessions.size();
    }

    public static void remove(String[] keys) {
        for (String key : keys) {
            remove(key);
        }
    }

    public static void remove(String key) {
        // ConcurrentHashMap throws on null keys; a Session that never sent an imei has none
        if (key == null) {
            return;
        }
        // remove() returns the old value, so lookup and removal happen in one atomic step
        Session session = sessions.remove(key);
        if (session != null) {
            session.distroy();
        }
    }
}

SessionProcessor

The JDK's built-in thread pool dispatches every pending request found in the Sessions. (I am not very familiar with the pool's initialization parameters; readers who know them well are welcome to advise.) The inner class Process wraps each Session once more into a SocketRequest and a SocketResponse. This should feel familiar: Java web code is full of request and response objects.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SessionProcessor implements Runnable {
    private static Runnable processor = new SessionProcessor();
    private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void start() {
        new Thread(processor).start();
    }

    @Override
    public void run() {
        while (true) {
            Session tmp = null;
            for (String key : SessionManager.getSessionKeys()) {
                tmp = SessionManager.getSession(key);
                // process the Session's outstanding request;
                // tmp can be null if the Session was removed after the key was read
                if (tmp != null && tmp.getReceiveData() != null) {
                    pool.execute(new Process(tmp));
                }
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Log.e(e);
            }
        }
    }

    class Process implements Runnable {
        private SocketRequest request;
        private SocketResponse response;

        public Process(Session session) {
            // wrap the Session into a Request and a Response
            request = new SocketRequest(session);
            response = new SocketResponse(session);
        }

        @Override
        public void run() {
            new RequestTransform().transfer(request, response);
        }
    }
}
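Since the thread pool parameters are flagged as unfamiliar, a small experiment may help. The constructor arguments are, in order: core pool size, maximum pool size, keep-alive time for idle threads above the core size, the unit of that time, the work queue, and the rejection policy. The sketch below shrinks the article's values (10, 200, 500 ms, a queue of 10) to 1 core thread, 2 threads maximum and a queue of 1 so the behaviour is easy to observe; the class name PoolSaturationDemo and these smaller numbers are assumptions for illustration only.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's server.
final class PoolSaturationDemo {

    /** Submits four tasks and reports, per task, whether it ran on a pool thread or on the caller. */
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 500, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        List<String> ranOn = new CopyOnWriteArrayList<>();

        Runnable task = () -> {
            boolean onCaller = Thread.currentThread() == caller;
            ranOn.add(onCaller ? "caller" : "pool");
            if (!onCaller) {
                // Keep pool threads busy so the pool stays saturated while tasks are submitted.
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        try {
            pool.execute(task); // 1: starts the single core thread
            pool.execute(task); // 2: core thread is busy, so the task waits in the queue
            pool.execute(task); // 3: queue is full, so a second (non-core) thread is started
            pool.execute(task); // 4: threads and queue are exhausted, CallerRunsPolicy runs it here
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Two points follow for SessionProcessor. Extra threads beyond the core size are created only after the queue is full, so with a queue of 10 the pool stays at 10 threads until 10 more tasks are waiting. And with CallerRunsPolicy a saturated pool makes the submitting thread, here the polling loop, execute the task itself, which slows down polling and acts as a crude form of back-pressure.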

The transfer method in RequestTransform uses reflection: it reads the request category (handler) and the request action (action) from the request parameters and invokes the matching method on the matching class (UserHandler or MessageHandler).

import java.lang.reflect.Method;

public class RequestTransform {
    public void transfer(SocketRequest request, SocketResponse response) {
        String action = request.getValue("action");
        String handlerName = request.getValue("handler");
        // depending on the request type, let a different class and method handle it
        try {
            Class<?> c = Class.forName("com.seek.server.handler." + handlerName);
            Class<?>[] arg = new Class<?>[] {SocketRequest.class, SocketResponse.class};
            Method method = c.getMethod(action, arg);
            method.invoke(c.newInstance(), new Object[] {request, response});
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
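The reflective dispatch can be tried in isolation with the sketch below. It is not the article's implementation: the class ReflectiveDispatch, the simplified String-based handler signatures and the HANDLERS map are assumptions. In particular, the map replaces Class.forName with a lookup in a fixed set of classes, one possible way to keep a client-supplied handler name from loading arbitrary classes.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; handler signatures are simplified to (String) -> String.
final class ReflectiveDispatch {

    public static class UserHandler {
        public String login(String payload) {
            return "login:" + payload;
        }
    }

    public static class MessageHandler {
        public String send(String payload) {
            return "send:" + payload;
        }
    }

    // Assumption: only classes registered here may be targeted by a request.
    private static final Map<String, Class<?>> HANDLERS = new HashMap<>();
    static {
        HANDLERS.put("UserHandler", UserHandler.class);
        HANDLERS.put("MessageHandler", MessageHandler.class);
    }

    /** Looks up the handler class by name, then invokes the public method named by action. */
    static String dispatch(String handlerName, String action, String payload) {
        Class<?> type = HANDLERS.get(handlerName);
        if (type == null) {
            throw new IllegalArgumentException("unknown handler: " + handlerName);
        }
        try {
            Method method = type.getMethod(action, String.class);
            Object handler = type.getDeclaredConstructor().newInstance();
            return (String) method.invoke(handler, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot invoke " + handlerName + "." + action, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch("UserHandler", "login", "2364656512636"));
    }
}
```

As in the article's version, adding a new action only requires adding a public method with the expected parameter types; no dispatch table of methods has to be maintained.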

SocketRequest and SocketResponse

public class SocketRequest {
    private Session mSession;
    private String mReceive;

    public SocketRequest(Session session) {
        mSession = session;
        mReceive = session.getReceiveData();
        // clear the Session's data so the same request is not processed twice
        mSession.clear();
    }

    public String getValue(String key) {
        return JsonParser.get(mReceive, key);
    }

    public String getQueryString() {
        return mReceive;
    }
}

public class SocketResponse {
    private Session mSession;

    public SocketResponse(Session session) {
        mSession = session;
    }

    public void write(String msg) {
        mSession.setSendData(msg);
    }
}

Finally, there are two handlers that process the requests:

public class UserHandler {
    public void login(SocketRequest request, SocketResponse response) {
        System.out.println(request.getQueryString());
        // TODO: handle user login
        response.write("you must have received the message");
    }
}

public class MessageHandler {
    public void send(SocketRequest request, SocketResponse response) {
        System.out.println(request.getQueryString());
        // send the message to the Session registered under the given imei
        String key = request.getValue("imei");
        Session session = SessionManager.getSession(key);
        // the target may be offline or already timed out
        if (session != null) {
            new SocketResponse(session).write(request.getValue("sms"));
        }
    }
}

There is also a Looper class that periodically checks for timed-out Sessions and removes them.

public class Looper extends Thread {
    private static Looper looper = new Looper();
    private static boolean isStart = false;
    private final int INTERVAL = 1000 * 60 * 5;

    private Looper() {
    }

    public static Looper getInstance() {
        return looper;
    }

    public void loop() {
        if (!isStart) {
            isStart = true;
            this.start();
        }
    }

    public void run() {
        Task task = new Task();
        while (true) {
            // Session expiration detection
            task.checkState();
            // heartbeat packet detection
            // task.sendAck();
            try {
                Thread.sleep(INTERVAL);
            } catch (InterruptedException e) {
                Log.e(e);
            }
        }
    }
}

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class Task {
    public void checkState() {
        Set<String> keys = SessionManager.getSessionKeys();
        if (keys.size() == 0) {
            return;
        }
        List<String> removes = new ArrayList<String>();
        Iterator<String> iterator = keys.iterator();
        String key = null;
        while (iterator.hasNext()) {
            key = iterator.next();
            Session session = SessionManager.getSession(key);
            // the Session may have been removed since the key was read
            if (session != null && !session.isKeekAlive()) {
                removes.add(key);
            }
        }
        if (removes.size() > 0) {
            Log.i("sessions is time out,remove " + removes.size() + " session");
        }
        SessionManager.remove(removes.toArray(new String[removes.size()]));
    }

    public void sendAck() {
        Set<String> keys = SessionManager.getSessionKeys();
        if (keys.size() == 0) {
            return;
        }
        Iterator<String> iterator = keys.iterator();
        while (iterator.hasNext()) {
            iterator.next();
            // TODO: send heartbeat packet
        }
    }
}

Note that both Task and SessionProcessor traverse the sessions map of SessionManager. The approach used in this article, iterating over the keys and then looking up each value, is not ideal, mainly for efficiency reasons. Iterating over the map's entries, which yields the key and the value together, is recommended instead. Because I have been working with Java web development for a while, the Request and Response classes should look friendly to readers with the same background. This example has not gone through any security or performance testing. If you need to use it in a production environment, test it yourself first!
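The recommended entry traversal could look like the sketch below. To stay self-contained it stores a last-active timestamp per key instead of a Session object; the class ExpirySweep and that simplification are assumptions, not the article's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; the map value is a last-active timestamp instead of a Session.
final class ExpirySweep {

    /** Removes every entry whose timeout has elapsed and returns the removed keys. */
    static List<String> sweep(ConcurrentHashMap<String, Long> lastActive, long now, long timeoutMillis) {
        List<String> removed = new ArrayList<>();
        // One pass over the entries: key and value arrive together, no second lookup per key.
        for (Map.Entry<String, Long> entry : lastActive.entrySet()) {
            boolean alive = entry.getValue() + timeoutMillis > now;
            // remove(key, value) only succeeds if the value was not replaced in the meantime.
            if (!alive && lastActive.remove(entry.getKey(), entry.getValue())) {
                removed.add(entry.getKey());
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Long> lastActive = new ConcurrentHashMap<>();
        lastActive.put("a", 0L);
        lastActive.put("b", 1000L);
        System.out.println(sweep(lastActive, 1000L, 500L));
    }
}
```

ConcurrentHashMap iterators are weakly consistent, so removing entries during the loop does not throw ConcurrentModificationException.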

The data sent in a client request looks like, for example, {handler: "UserHandler", action: "login", imei: "2364656512636"}. These conventions are yours to define.
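On the client side, a request following this convention could be assembled as sketched below. The class RequestJson and its methods are assumptions for illustration. The sketch emits strict JSON with quoted field names and escapes only backslashes and double quotes, which is enough for simple values like those in the example but not for arbitrary text.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical client-side helper, not part of the article's code.
final class RequestJson {

    /** Serializes the fields in insertion order as a flat JSON object of strings. */
    static String build(Map<String, String> fields) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            json.add(quote(field.getKey()) + ":" + quote(field.getValue()));
        }
        return json.toString();
    }

    /** Builds the login request from the article's example. */
    static String login(String imei) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("handler", "UserHandler");
        fields.put("action", "login");
        fields.put("imei", imei);
        return build(fields);
    }

    // Minimal escaping (assumption: values contain no control characters).
    private static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(login("2364656512636"));
    }
}
```

The server reads the handler and action fields to pick the class and method, and the imei field to identify the Session, so every request needs at least these three fields.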

