2025-01-25 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article analyzes why data sent from channelActive by a Netty-based WebSocket server never reaches the client. The analysis is quite practical, so it is shared here in the hope that you get something out of it.
# Cause of the incident

The WebSocket server is implemented with Netty. After a connection was established we wanted to send a message to the client, and we chose to send it from channelActive. As you might guess, it did not work. The server code is as follows:

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        ServerBootstrap sb = new ServerBootstrap();
        sb.option(ChannelOption.SO_BACKLOG, 1024);
        // bind the thread pools: bossGroup accepts connections, group handles their I/O
        sb.group(bossGroup, group)
                // specify the channel type
                .channel(NioServerSocketChannel.class)
                // bind the listening port
                .localAddress(this.port)
                // SO_KEEPALIVE applies to accepted connections, so it is set as a child option
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // runs once for every accepted client connection
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        log.info("receive a new connection");
                        ch.pipeline().addLast(new LoggingHandler("DEBUG"));
                        ch.pipeline().addLast(new IdleStateHandler(60, 0, 0));
                        // the WebSocket protocol starts as HTTP, so the HTTP codec is needed here too
                        ch.pipeline().addLast(new HttpServerCodec());
                        // writes data in chunks
                        ch.pipeline().addLast(new ChunkedWriteHandler());
                        ch.pipeline().addLast(new HttpObjectAggregator(8192));
                        ch.pipeline().addLast(new WebSocketServerProtocolHandler("/socket", null, true, 65536 * 10));
                        ch.pipeline().addLast(new MyWebSocketHandler());
                    }
                });
        // bind asynchronously and wait until the server is listening
        ChannelFuture cf = sb.bind().sync();
        log.info(NettyServer.class + " startup is listening: " + cf.channel().localAddress());
        // block until the server channel is closed
        cf.channel().closeFuture().sync();
    } finally {
        // release the event loop threads
        bossGroup.shutdownGracefully();
        group.shutdownGracefully();
    }
# Troubleshooting

No exception was reported and the client received no message, so we captured packets with Wireshark and found that the message never reached the network card. Why not? Debugging showed that when the object being written is a WebSocketFrame, the write fails at the very last step, in the write method of HeadContext. HeadContext is the head of Netty's ChannelPipeline. The pipeline is a doubly linked list, and an outbound write travels from the tail of the pipeline to the head before it leaves the channel.

Why can a message be written from channelRead but not from channelActive? channelActive is fired when the SocketChannel is registered for the first time, in register0 of AbstractChannel:

    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
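The outbound path described before the register0 excerpt can be pictured with a small stand-alone model. This is only a sketch and not Netty code: `OutboundPathSketch`, `Node` and `Pipeline` are hypothetical names, a `String` stands in for a WebSocketFrame, and a `CompletableFuture` stands in for Netty's ChannelFuture. It assumes, as the debugging session suggests, that the head accepts only raw bytes and that a rejected write is recorded in the returned future instead of being thrown, which would explain why no exception was visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Toy model of an outbound write. Hypothetical names; these are not Netty's classes. */
class OutboundPathSketch {

    /** One node of the doubly linked pipeline. */
    static class Node {
        final String name;
        Node prev, next;

        Node(String name) { this.name = name; }

        /** Default outbound behaviour: pass the message on unchanged. */
        Object onWrite(Object msg) { return msg; }
    }

    static class Pipeline {
        final Node head = new Node("head");
        final Node tail = new Node("tail");
        final List<String> visited = new ArrayList<>();

        Pipeline() { head.next = tail; tail.prev = head; }

        /** Inserts a node just before the tail, in the spirit of addLast. */
        void addLast(Node n) {
            n.prev = tail.prev;
            n.next = tail;
            tail.prev.next = n;
            tail.prev = n;
        }

        /** Walks from the tail towards the head; the head accepts raw bytes only (an assumption). */
        CompletableFuture<Void> write(Object msg) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            visited.clear();
            for (Node n = tail.prev; n != head; n = n.prev) {
                visited.add(n.name);
                msg = n.onWrite(msg);
            }
            visited.add(head.name);
            if (msg instanceof byte[]) {
                future.complete(null);
            } else {
                // The error is stored in the future; nothing is thrown to the caller.
                future.completeExceptionally(new UnsupportedOperationException(
                        "unsupported message type: " + msg.getClass().getSimpleName()));
            }
            return future;
        }
    }

    public static void main(String[] args) {
        Pipeline p = new Pipeline();
        p.addLast(new Node("logging"));
        p.addLast(new Node("business"));

        // No node converts the message to bytes, so it reaches the head unencoded.
        CompletableFuture<Void> f = p.write("hello");
        System.out.println("visited: " + p.visited);
        f.whenComplete((ok, err) -> {
            if (err != null) {
                System.out.println("write failed: " + err.getMessage());
            }
        });
    }
}
```

In this model the failure only becomes visible because `main` attaches a callback to the returned future. Attaching a listener to the future returned by a write is a cheap way to surface this kind of silent failure while debugging.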
The packet capture shows that when channelActive fires, the server has not yet returned the WebSocket handshake response. The WebSocket protocol is bootstrapped over HTTP: the client first sends an HTTP GET request, and the server then replies with a response that switches the connection to the WebSocket protocol. That reveals the cause. The WebSocketServerProtocolHandler we added contains the following code:

    public void handlerAdded(ChannelHandlerContext ctx) {
        ChannelPipeline cp = ctx.pipeline();
        if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
            // Add the WebSocketHandshakeHandler before this one.
            ctx.pipeline().addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
                    new WebSocketServerProtocolHandshakeHandler(websocketPath, subprotocols,
                            allowExtensions, maxFramePayloadLength, allowMaskMismatch, checkStartsWith));
        }
        if (cp.get(Utf8FrameValidator.class) == null) {
            // Add the UFT8 checking before this one.
            ctx.pipeline().addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
                    new Utf8FrameValidator());
        }
    }
The WebSocketServerProtocolHandshakeHandler that it adds contains the following code:

    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        final FullHttpRequest req = (FullHttpRequest) msg;
        if (isNotWebSocketPath(req)) {
            ctx.fireChannelRead(msg);
            return;
        }

        try {
            if (!GET.equals(req.method())) {
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
                return;
            }

            final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    getWebSocketLocation(ctx.pipeline(), req, websocketPath), subprotocols,
                    allowExtensions, maxFramePayloadSize, allowMaskMismatch);
            final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
                handshakeFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            ctx.fireExceptionCaught(future.cause());
                        } else {
                            // Kept for compatibility
                            ctx.fireUserEventTriggered(
                                    WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
                            ctx.fireUserEventTriggered(
                                    new WebSocketServerProtocolHandler.HandshakeComplete(
                                            req.uri(), req.headers(), handshaker.selectedSubprotocol()));
                        }
                    }
                });
                WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);
                ctx.pipeline().replace(this, "WS403Responder",
                        WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());
            }
        } finally {
            req.release();
        }
    }
When the handshake request is accepted, two handlers are added to the pipeline: the decoder and the encoder for WebSocket frames. Before that point nothing in the pipeline can turn a WebSocketFrame into bytes, which is why a frame written from channelActive fails at HeadContext. The handshake method is as follows:

    public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                         HttpHeaders responseHeaders, final ChannelPromise promise) {
        if (logger.isDebugEnabled()) {
            logger.debug("{} WebSocket version {} server handshake", channel, version());
        }
        FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
        ChannelPipeline p = channel.pipeline();
        if (p.get(HttpObjectAggregator.class) != null) {
            p.remove(HttpObjectAggregator.class);
        }
        if (p.get(HttpContentCompressor.class) != null) {
            p.remove(HttpContentCompressor.class);
        }
        ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
        final String encoderName;
        if (ctx == null) {
            // this means the user use a HttpServerCodec
            ctx = p.context(HttpServerCodec.class);
            if (ctx == null) {
                promise.setFailure(
                        new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
                return promise;
            }
            p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
            p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
            encoderName = ctx.name();
        } else {
            p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());

            encoderName = p.context(HttpResponseEncoder.class).name();
            p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
        }
        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    ChannelPipeline p = future.channel().pipeline();
                    p.remove(encoderName);
                    promise.setSuccess();
                } else {
                    promise.setFailure(future.cause());
                }
            }
        });
        return promise;
    }
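To see what these edits do to the pipeline built in initChannel, the sketch below replays them on a plain list of names. It is an illustration under assumptions, not Netty code: `HandshakePipelineSketch` is a hypothetical class, and simple class names are used as labels (only "wsdecoder", "wsencoder" and "WS403Responder" are names that actually appear in the excerpts).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Replays the pipeline edits from the excerpts on a list of labels. Not Netty code. */
class HandshakePipelineSketch {

    /** Handler order after initChannel and handlerAdded have run (labels are an assumption). */
    static List<String> beforeHandshake() {
        return new ArrayList<>(Arrays.asList(
                "LoggingHandler", "IdleStateHandler", "HttpServerCodec", "ChunkedWriteHandler",
                "HttpObjectAggregator", "WebSocketServerProtocolHandshakeHandler",
                "Utf8FrameValidator", "WebSocketServerProtocolHandler", "MyWebSocketHandler"));
    }

    /** Applies the same edits that handshake() and channelRead() make to the pipeline. */
    static List<String> afterHandshake(List<String> pipeline) {
        List<String> p = new ArrayList<>(pipeline);
        // p.remove(HttpObjectAggregator.class)
        p.remove("HttpObjectAggregator");
        // p.addBefore(codec, "wsdecoder", ...) then p.addBefore(codec, "wsencoder", ...)
        p.add(p.indexOf("HttpServerCodec"), "wsdecoder");
        p.add(p.indexOf("HttpServerCodec"), "wsencoder");
        // ctx.pipeline().replace(this, "WS403Responder", ...)
        p.set(p.indexOf("WebSocketServerProtocolHandshakeHandler"), "WS403Responder");
        // p.remove(encoderName) once the handshake response has been written
        p.remove("HttpServerCodec");
        return p;
    }

    public static void main(String[] args) {
        List<String> before = beforeHandshake();
        System.out.println("before: " + before);
        System.out.println("after:  " + afterHandshake(before));
    }
}
```

The point to notice in the output is that "wsencoder" exists only in the second list, and that it sits closer to the head than MyWebSocketHandler, so frames written by that handler pass through it on their way out.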
To send a message to the client once the WebSocket connection is established, note that fireUserEventTriggered is called after the handshake response has been sent successfully. Implement userEventTriggered in your handler and check the type of evt (for example WebSocketServerProtocolHandler.HandshakeComplete, which the handshake handler fires) before sending.
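The difference between the two hooks can be sketched with a small stand-alone model of the event order. This is not Netty code: `HandshakeTimingSketch`, `Connection`, `Handler` and `greetOn` are hypothetical, and the single boolean stands in for the frame encoder that the handshake installs. In a real handler the corresponding hook would be userEventTriggered, with a check for the HandshakeComplete event shown in the channelRead excerpt.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the event order on one connection. Hypothetical names; not Netty code. */
class HandshakeTimingSketch {

    enum Event { CHANNEL_ACTIVE, HANDSHAKE_COMPLETE }

    interface Handler {
        void onEvent(Connection c, Event evt);
    }

    /** A connection that can carry frames only after the handshake installed the frame encoder. */
    static class Connection {
        private boolean frameEncoderInstalled = false;
        final List<String> delivered = new ArrayList<>();
        final List<String> rejected = new ArrayList<>();

        /** Returns true if the frame could be encoded and sent. */
        boolean writeFrame(String text) {
            if (frameEncoderInstalled) {
                delivered.add(text);
                return true;
            }
            rejected.add(text);
            return false;
        }

        /** Fires the events in the order the article observed: active first, handshake later. */
        void open(Handler handler) {
            handler.onEvent(this, Event.CHANNEL_ACTIVE);     // connection registered
            frameEncoderInstalled = true;                     // handshake adds the frame encoder
            handler.onEvent(this, Event.HANDSHAKE_COMPLETE); // fired after the handshake succeeded
        }
    }

    /** Builds a handler that sends its greeting on the chosen event only. */
    static Handler greetOn(Event trigger) {
        return (c, evt) -> {
            if (evt == trigger) {
                c.writeFrame("welcome");
            }
        };
    }

    public static void main(String[] args) {
        Connection early = new Connection();
        early.open(greetOn(Event.CHANNEL_ACTIVE));
        System.out.println("greeting on CHANNEL_ACTIVE delivered: " + early.delivered);

        Connection late = new Connection();
        late.open(greetOn(Event.HANDSHAKE_COMPLETE));
        System.out.println("greeting on HANDSHAKE_COMPLETE delivered: " + late.delivered);
    }
}
```

Only the handler that waits for the handshake event gets its greeting delivered. The one that reacts to the first event writes before the encoder exists, which mirrors the behaviour analyzed above.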
That concludes the analysis of why data sent from channelActive by a Netty-based WebSocket server is not delivered. These are points you may well run into in daily work, and I hope the article helps.
© 2024 shulou.com SLNews company. All rights reserved.