Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the distributed WebSocket solution?

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

This article focuses on "what is the distributed WebSocket solution". Interested friends may wish to have a look at it. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn what the distributed WebSocket solution is.

Introduction to the Application of WebSocket monomer

Before introducing distributed clusters, let's take a look at the Prince's WebSocket code implementation. Let's take a look at the java backend code as follows:

Import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @ ServerEndpoint ("/ webSocket/ {key}") public class WebSocket {private static int onlineCount = 0; / * * Storage connected client * / private static Map clients = new ConcurrentHashMap () Private Session session; / * the target department code * / private String key; @ OnOpen public void onOpen (@ PathParam ("key") String key, Session session) throws IOException {this.key = key; this.session = session; if (! clients.containsKey (key)) {addOnlineCount ();} clients.put (key, this) Log.info (key+ "connected message service!") ;} @ OnClose public void onClose () throws IOException {clients.remove (key); subOnlineCount ();} @ OnMessage public void onMessage (String message) throws IOException {if (message.equals ("ping")) {return;} JSONObject jsonTo = JSON.parseObject (message); String mes = (String) jsonTo.get ("message") If (! jsonTo.get ("to"). Equals ("All") {sendMessageTo (mes, jsonTo.get ("to"). ToString ());} else {sendMessageAll (mes);}} @ OnError public void onError (Session session, Throwable error) {error.printStackTrace () } private void sendMessageTo (String message, String To) throws IOException {for (WebSocket item: clients.values ()) {if (item.key.contains (To)) item.session.getAsyncRemote (). SendText (message);}} private void sendMessageAll (String message) throws IOException {for (WebSocket item: clients.values ()) {item.session.getAsyncRemote (). SendText (message) }} public static synchronized int getOnlineCount () {return onlineCount;} public static synchronized void addOnlineCount () {WebSocket.onlineCount++;} public static synchronized void subOnlineCount () {WebSocket.onlineCount--;} public static synchronized Map getClients () {return clients;}}

The sample code does not use Spring, but is written in native java web. Let's briefly introduce the methods inside.

OnOpen: triggers method execution when the client connects to the WebSocket service

OnClose: triggers execution when the client is disconnected from WebSocket

OnMessage: triggers execution when a message from the client is received

OnError: triggers execution when an error occurs

As you can see, in the onMessage method, we forward the message directly according to the message sent by the client, so there is no problem in the single message service.

Let's take another look at the js code.

Var host = _ document.location.host; / / get the current login department var deptCodes='$ {sessionScope.$UserContext.departmentID}'; deptCodes=deptCodes.replace (/ [\ [|\] |\ s] + / g, ""); var key ='${sessionScope.$UserContext.userID}'+ deptCodes; var lockReconnect = false; / / avoid repeated ws connection var ws = null / / determine whether the current browser supports WebSocket var wsUrl = 'ws://' + host +' / webSocket/'+ key; createWebSocket (wsUrl); / / Connect ws function createWebSocket (url) {try {if ('WebSocket' in window) {ws = new WebSocket (url) } else if ('MozWebSocket' in window) {ws = new MozWebSocket (url);} else {layer.alert ("your browser does not support the websocket protocol, it is recommended to use the new version of Google, Firefox and other browsers, do not use browsers below IE10, please use extreme mode, do not use compatibility mode!") ;} initEventHandle ();} catch (e) {reconnect (url); console.log (e);} function initEventHandle () {ws.onclose = function () {reconnect (wsUrl); console.log ("llws connection closed!" + new Date () .toUTCString ());} Ws.onerror = function () {reconnect (wsUrl); console.log ("llws connection error!");}; ws.onopen = function () {heartCheck.reset () .start (); / / heartbeat detection reset console.log ("llws connection succeeded!" + new Date () .toUTCString ());} Ws.onmessage = function (event) {/ / if the message is obtained, heartbeat detection resets heartCheck.reset () .start (); / / getting any message indicates that the current connection is normal / / the actual business processing of the message is received.} } / / listen for the window closing event. When the window closes, it takes the initiative to close the websocket connection to prevent the window from being closed before the connection is disconnected. An exception will be thrown on the server side. _ window.onbeforeunload = function () {ws.close ();} function reconnect (url) {if (lockReconnect) return; lockReconnect = true; setTimeout (function () {/ / No connection will always be reconnected, set delay to avoid too many createWebSocket (url) requests; lockReconnect = false;}, 2000) } / / heartbeat detection var heartCheck = {timeout: 300000, / / 5-minute heartbeat timeoutObj: null, serverTimeoutObj: null, reset: function () {clearTimeout (this.timeoutObj); clearTimeout (this.serverTimeoutObj); return this;}, start: function () {var self = this This.timeoutObj = setTimeout (function () {/ / send a heartbeat here. After receiving it at the backend, a heartbeat message is returned. / / when onmessage gets the returned heartbeat, it indicates that the connection is normal ws.send ("ping"); console.log ("ping!") Self.serverTimeoutObj = setTimeout (function () {/ / if the backend has not been reset after a certain period of time, the backend actively disconnects ws.close (); / / if onclose executes reconnect, we can execute ws.close (). If executing reconnect directly will trigger onclose to reconnect twice}, self.timeout)}, this.timeout)}}

The js part is written in native H5. If you want to be more compatible with browsers, you can also use SockJS. Interested friends can do it on their own.

Then we optimize the code manually to implement WebSocket's support for distributed architecture.

Thinking about the solution

Now that we understand the code structure of a single application and the problems that WebSocket faces in a distributed environment, it's time to think about how to solve this problem.

Let's first take a look at the root cause of the problem.

If you think about it briefly, you can see that there is only one server under a single application, and all clients connect to this message server, so when the publishers send messages, all clients have already established connections with this server, and it is OK to send messages in groups.

After switching to a distributed system, if we have two message servers, then after the client uses Nginx load balancing, some of the clients will connect to one of the servers and the other to the other, so when the publisher sends the message, it will only send the message to one of the servers, and this message server can perform the group operation, but the problem is that the other server does not know about it. You can't send a message.

Now we know that the root cause is that when producing a message, only one message server can perceive it, so we just have to let another message server perceive it, so that after perceiving it, it can send messages in groups to the clients connected to it.

So what is the way to achieve this function? the prince quickly came up with the idea of introducing message middleware and using its publish and subscribe model to notify all message servers.

Introducing RabbitMQ to solve the WebSocket problem in distributed Environment

In the choice of message middleware, the prince chose RabbitMQ because it is relatively simple to build and powerful, and we only use its function of sending messages in groups.

RabbitMQ has a broadcast mode (fanout), which is what we use.

First, let's write a connection class for RabbitMQ:

Import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtil {private static Connection connection; / * establish a connection with rabbitmq * @ return * / public static Connection getConnection () {if (connection! = null&&connection.isOpen ()) {return connection;} ConnectionFactory factory = new ConnectionFactory () Factory.setVirtualHost ("/"); factory.setHost ("192.168.220.110"); / / using the virtual IP address factory.setPort (5672); factory.setUsername ("guest"); factory.setPassword ("guest"); try {connection = factory.newConnection ();} catch (IOException e) {e.printStackTrace () } catch (TimeoutException e) {e.printStackTrace ();} return connection;}}

There is nothing to say about this class, but a factory class that gets MQ connections.

Then, according to our idea, every time the server starts, a consumer of MQ will be created to listen to MQ messages. Prince, the listener of Servlet is used in the test here, as follows:

Import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; public class InitListener implements ServletContextListener {@ Override public void contextInitialized (ServletContextEvent servletContextEvent) {WebSocket.init (); @ Override public void contextDestroyed (ServletContextEvent servletContextEvent) {}}

Remember to configure listener information in Web.xml

InitListener

Add the init method to WebSocket as the consumer part of MQ

Public static void init () {try {Connection connection = RabbitMQUtil.getConnection (); Channel channel = connection.createChannel (); / / switch declaration (parameter: switch name Switch type) channel.exchangeDeclare ("fanoutLogs", BuiltinExchangeType.FANOUT); / / get a temporary queue String queueName = channel.queueDeclare (). GetQueue (); / / the queue is bound to the switch (parameter: queue name; switch name RoutingKey ignore) channel.queueBind (queueName, "fanoutLogs", "") / / the handleDelivery method of DefaultConsumer is rewritten here, because the message is getByte () when it is sent, and here it is reassembled into String Consumer consumer = new DefaultConsumer (channel) {@ Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {super.handleDelivery (consumerTag, envelope, properties, body) String message = new String (body, "UTF-8"); System.out.println (message); / / you can use WebSocket to send messages to the corresponding client through the message content}}; / / declare the messages consumed in the queue (parameter: queue name Whether the message is automatically acknowledged; consumer body) channel.basicConsume (queueName,true,consumer); / / the connection cannot be closed here. After the consumption method is called, the consumer will always connect to the rabbitMQ and wait for consumption} catch (IOException e) {e.printStackTrace ();}}

At the same time, when receiving a message, send a message to the corresponding client not directly through WebSocket, but to MQ, so that if there are multiple message servers, you will get the message from MQ, and then use WebSocket to push the message content to the corresponding client.

The onMessage method of WebSocket is added as follows:

Try {/ / attempts to get a connection Connection connection = RabbitMQUtil.getConnection (); / / attempts to create a channel Channel channel = connection.createChannel (); / / declares the switch (parameter: switch name; switch type, broadcast mode) channel.exchangeDeclare ("fanoutLogs", BuiltinExchangeType.FANOUT) / / message release (parameter: switch name; routingKey, ignore. In broadcast mode, the producer can declare the name and type of the switch) channel.basicPublish ("fanoutLogs", "null,msg.getBytes (" UTF-8 ")); System.out.println (" publish message "); channel.close ();} catch (IOException | TimeoutException e) {e.printStackTrace ();}

Delete the original Websocket push code after it is added.

At this point, I believe you have a deeper understanding of "what the distributed WebSocket solution is". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report