Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How does Springboot+Netty+Websocket implement a message push instance?

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

It is believed that many inexperienced people have no idea about how to implement the message push instance in Springboot+Netty+Websocket. Therefore, this paper summarizes the causes and solutions of the problem. Through this article, I hope you can solve this problem.

Preface

WebSocket makes it easier to exchange data between the client and the server, allowing the server to actively push data to the client. In WebSocket API, only one handshake is needed between the browser and the server, and a persistent connection can be directly created between the browser and the server for two-way data transmission.

Advantages of the Netty framework

1.API is easy to use and has low development threshold.

two。 Powerful, preset a variety of codec functions, support a variety of mainstream protocols

3. The customization ability is strong, and the communication framework can be flexibly extended through ChannelHandler

4. High performance. Compared with other mainstream NIO frameworks in the industry, Netty has the best overall performance.

5. Mature and stable, Netty fixes all the discovered JDK NIO BUG, and business developers no longer need to worry about NIO's BUG

Tip: the following is the main body of this article. The following examples are available for reference.

First, introduce netty dependency

Io.netty netty-all 4.1.48.Final

Second, use steps

1. Introduce the basic configuration class

Package com.test.netty; public enum Cmd {START ("000,000", "connection successful"), WMESSAGE ("001", "message reminder"),; private String cmd; private String desc; Cmd (String cmd, String desc) {this.cmd = cmd; this.desc = desc;} public String getCmd () {return cmd;} public String getDesc () {return desc;}}

2.netty service startup listener

Package com.test.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationRunner; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component / * * @ author test *

* the service starts the monitor * * / @ Slf4j @ Component public class NettyServer {@ Value ("${server.netty.port}") private int port; @ Autowired private ServerChannelInitializer serverChannelInitializer; @ Bean ApplicationRunner nettyRunner () {return args-> {/ / new a main thread group EventLoopGroup bossGroup = new NioEventLoopGroup (1); / / new a working group EventLoopGroup workGroup = new NioEventLoopGroup () ServerBootstrap bootstrap = new ServerBootstrap () .group (bossGroup, workGroup) .channel (NioServerSocketChannel.class) .childHandler (serverChannelInitializer) / sets the queue size. Option (ChannelOption.SO_BACKLOG, 1024) / / TCP automatically sends an activity probe when there are no messages available in the hour. Childoption (ChannelOption.SO_KEEPALIVE, true) / / set the port and begin to receive the incoming connection try {ChannelFuture future = bootstrap.bind (port). Sync (); log.info ("Server start Monitoring Port: {}", port); future.channel (). CloseFuture (). Sync ();} catch (InterruptedException e) {e.printStackTrace ();} finally {/ / close the main thread group bossGroup.shutdownGracefully () / / close the work schedule group workGroup.shutdownGracefully ();}};}}

3.netty server processor

Package com.test.netty; import com.test.common.util.JsonUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.net.URLDecoder Import java.util.*; / * * @ author test *

* netty server processor * * / @ Slf4j @ Component @ ChannelHandler.Sharable public class NettyServerHandler extends SimpleChannelInboundHandler {@ Autowired private ServerChannelCache cache; private static final String dataKey = "test="; @ Data public static class ChannelCache {} / * * the client terminal will initiate * / @ Override public void channelActive (ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel () Log.info ("Channel connection is on, ID- > {}.", channel.id (). AsLongText ();} @ Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {Channel channel = ctx.channel (); WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt; String requestUri = handshakeComplete.requestUri (); requestUri = URLDecoder.decode (requestUri, "UTF-8") Log.info ("HANDSHAKE_COMPLETE,ID- > {}, URI- > {}", channel.id (). AsLongText (), requestUri); String socketKey = requestUri.substring (requestUri.lastIndexOf (dataKey) + dataKey.length ()); if (socketKey.length () > 0) {cache.add (socketKey, channel); this.send (channel, Cmd.DOWN_START, null);} else {channel.disconnect (); ctx.close () }} super.userEventTriggered (ctx, evt);} @ Override public void channelInactive (ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel (); log.info ("Channel connection has been opened, ID- > {}, use tunnel ID- > {}.", channel.id (). AsLongText (), cache.getCacheId (channel)); cache.remove (channel) } / * frequent occurrence * / @ Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception {Channel channel = ctx.channel (); log.error ("ID- > {}, use regular ID- > {}, regular-> {}. AsLongText (), cache.getCacheId (channel), cause.getMessage (), cause); cache.remove (channel); ctx.close () } / * client messages will be sent * / @ Override protected void channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {try {/ / log.info ("receiving messages sent by clients: {}", msg.text ()); ctx.channel () .writeAndFlush (JsonUtil.toString (Collections.singletonMap ("cmd") } catch (Exception e) {log.error ("message processing: {}", e.getMessage (), e);}} public void send (Cmd cmd, String id, Object obj) {HashMap channels = cache.get (id); if (channels = = null) {return;} Map data = new LinkedHashMap (); data.put ("cmd", cmd.getCmd ()); data.put ("data", obj) String msg = JsonUtil.toString (data); log.info ("message under server: {}", msg); channels.values () .forEach (channel-> {channel.writeAndFlush (new TextWebSocketFrame (msg));});} public void send (Channel channel, Cmd cmd, Object obj) {Map data = new LinkedHashMap (); data.put ("cmd", cmd.getCmd ()); data.put ("data", obj); String msg = JsonUtil.toString (data) Log.info ("message under server: {}", msg); channel.writeAndFlush (new TextWebSocketFrame (msg));}}

4.netty server-side cache class

Package com.test.netty; import io.netty.channel.Channel; import io.netty.util.AttributeKey; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; @ Component public class ServerChannelCache {private static final ConcurrentHashMap CACHE_MAP = new ConcurrentHashMap (); private static final AttributeKey CHANNEL_ATTR_KEY = AttributeKey.valueOf ("test"); public String getCacheId (Channel channel) {return channel.attr (CHANNEL_ATTR_KEY) .get () } public void add (String cacheId, Channel channel) {channel.attr (CHANNEL_ATTR_KEY) .set (cacheId); HashMap hashMap = CACHE_MAP.get (cacheId); if (hashMap = = null) {hashMap = new HashMap ();} hashMap.put (channel.id (). AsShortText (), channel); CACHE_MAP.put (cacheId, hashMap);} public HashMap get (String cacheId) {if (cacheId = = null) {return null } return CACHE_MAP.get (cacheId);} public void remove (Channel channel) {String cacheId = getCacheId (channel); if (cacheId = = null) {return;} HashMap hashMap = CACHE_MAP.get (cacheId); if (hashMap = = null) {hashMap = new HashMap ();} hashMap.remove (channel.id (). AsShortText ()); CACHE_MAP.put (cacheId, hashMap);}}

5.netty service initializer

Package com.test.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; / * * @ author test *

* netty service initializer * * / @ Component public class ServerChannelInitializer extends ChannelInitializer {@ Autowired private NettyServerHandler nettyServerHandler; @ Override protected void initChannel (SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline (); pipeline.addLast (new HttpServerCodec ()); pipeline.addLast (new ChunkedWriteHandler ()); pipeline.addLast (new HttpObjectAggregator (8192)); pipeline.addLast (new WebSocketServerProtocolHandler ("/ test.io", true, 5000); pipeline.addLast (nettyServerHandler);}}

6.html test

Test function WebSocketTest () {if ("WebSocket" in window) {alert ("your client supports WebSocket!"); / / open a web socket var ws = new WebSocket ("ws://localhost:port/test.io") Ws.onopen = function () {/ / Web Socket has been connected, use the send () method to send data ws.send ("sending data"); alert ("data sending...");}; ws.onmessage = function (evt) {var received_msg = evt.data; alert ("data received.");} Ws.onclose = function () {/ close websocket alert ("connection has been closed...");};} else {/ / WebSocket alert is not supported by the explorer ("your explorer does not support WebSocket!");}} line WebSocket

7.vue test

Mounted () {this.initWebsocket ();}, methods: {initWebsocket () {let websocket = new WebSocket ('ws://localhost:port/test.io?test=123456'); websocket.onmessage = (event) = > {let msg = JSON.parse (event.data) Switch (msg.cmd) {case "000": this.$message ({type: 'success', message: "establish real-time connection successful!" , duration: 1000}) setInterval (() = > {websocket.send ("heartbeat")}, 60,1000); break; case "1000": this.$message.warning ("receive a new message, please check it in time!") Break;}} websocket.onclose = () = > {setTimeout () = > {this.initWebsocket ();}, 30,1000);} websocket.onerror = () = > {setTimeout (()) = > {this.initWebsocket ();}, 30,1000);},},! [insert the video description here] (https://img-blog.csdnimg.cn/20210107160420568.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3d1X3Fpbmdfc29uZw==,size_16,color_FFFFFF,t_70#pic_center)

8. The server sends messages

Autowired private NettyServerHandler nettyServerHandler; nettyServerHandler.send (CmdWeb.WMESSAGE, id, message); after reading the above, have you mastered how Springboot+Netty+Websocket implements a message push example? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report