In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces the example analysis of Netty distributed encoder and write data event processing, which is very detailed and has certain reference value. Friends who are interested must read it!
Encoder Section 1: event Propagation of writeAndFlush
When we were learning pipeline, we explained the propagation process of write events, but in actual use, we usually do not call the write method of channel, because this method will only be written to the cache of sending data, not directly to channel. If you want to write to channel, you also need to call the flush method.
In actual use, we use more writeAndFlush method, which can not only write data to the send cache, but also refresh it to channel.
Let's look at the easiest scenario to use: public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception {ctx.channel () .writeAndFlush ("test data");}
Students who have studied netty must be familiar with this. In this way, the data can be sent to channel, and the other party can receive a response.
We follow the writeAndFlush method
You will first go to AbstractChannel's writeAndFlush:
Public ChannelFuture writeAndFlush (Object msg) {return pipeline.writeAndFlush (msg);}
Continue to follow the writeAndFlush method in DefualtChannelPipeline:
Public final ChannelFuture writeAndFlush (Object msg) {return tail.writeAndFlush (msg);}
Here we see that writeAndFlush is transmitted from the tail node. We have analyzed the event propagation in pipeline. I believe this is no stranger.
Continue to follow, you will follow the writeAndFlush method in AbstractChannelHandlerContext:
Public ChannelFuture writeAndFlush (Object msg) {return writeAndFlush (msg, newPromise ());}
Continue to follow:
Public ChannelFuture writeAndFlush (Object msg, ChannelPromise promise) {if (msg = = null) {throw new NullPointerException ("msg");} if (! validatePromise (promise, true)) {ReferenceCountUtil.release (msg); / / cancelled return promise;} write (msg, true, promise); return promise;}
Continue to follow the write method:
Private void write (Object msg, boolean flush, ChannelPromise promise) {/ / findContextOutbound () looks for the previous outbound node / / finally ends with the head node AbstractChannelHandlerContext next = findContextOutbound (); final Object m = pipeline.touch (msg, next); EventExecutor executor = next.executor (); if (executor.inEventLoop ()) {if (flush) {next.invokeWriteAndFlush (m, promise) } else {/ / does not tune flush next.invokeWrite (m, promise);}} else {AbstractWriteTask task; if (flush) {task = WriteAndFlushTask.newInstance (next, m, promise);} else {task = WriteTask.newInstance (next, m, promise);} safeExecute (executor, task, promise, m);}}
The logic here is no stranger to us, finding the next node, because the writeAndFlush starts from the tail node and is an event of outBound, so here we will find the last outBoundHandler of the tail node, which may be the encoder or the handler of our business processing.
If (executor.inEventLoop ()) determines whether it is an eventLoop thread. If not, it is encapsulated into task and executed asynchronously through nioEventLoop. Here, we first analyze it according to the eventLoop thread.
First of all, here we use flush to determine whether flush is called. This is obviously true, because the method we call is writeAndFlush.
We follow private void invokeWriteAndFlush (Object msg, ChannelPromise promise) {if (invokeHandler ()) {/ / write invokeWrite0 (msg, promise); / / refresh invokeFlush0 () in invokeWriteAndFlush;} else {writeAndFlush (msg, promise);}}
The truth is clear here. In fact, in writeAndFlush, the write is called first, and then the refresh is performed by calling the flush method after the write is completed.
First follow the invokeWrite0 method:
Private void invokeWrite0 (Object msg, ChannelPromise promise) {try {/ / calls the current handler's wirte () method ((ChannelOutboundHandler) handler ()) .write (this, msg, promise);} catch (Throwable t) {notifyOutboundHandlerException (t, promise);}}
We have analyzed this method in pipeline, that is, to call the write method of the current handler. If the write method in the current handler continues to propagate, it will continue to propagate the write event until it is propagated to the head node, and finally it will go to the write method of HeadContext.
Follow to the write method of HeadContext:
Public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {unsafe.write (msg, promise);}
Here, we will analyze the process of writing the current message to the cache through the unsafe object of the current channel.
Go back to the invokeWriteAndFlush method:
Private void invokeWriteAndFlush (Object msg, ChannelPromise promise) {if (invokeHandler ()) {/ / write invokeWrite0 (msg, promise); / / refresh invokeFlush0 ();} else {writeAndFlush (msg, promise);}} Let's look at invokeFlush0 method private void invokeFlush0 () {try {((ChannelOutboundHandler) handler ()) .flush (this);} catch (Throwable t) {notifyHandlerException (t) }}
Similarly, the flush method of the current handler will be called. If the flush method of the current handler continues to propagate the flush event, the flush event will continue to propagate until the flush method of the head node is finally called. If we are familiar with pipeline, we will be familiar with the logic here.
Follow to the flush method of HeadContext:
Public void flush (ChannelHandlerContext ctx) throws Exception {unsafe.flush ();}
Here again, the cached data will be flushed to the channel through the unsafe object of the current channel by calling the flush method. We will analyze the refresh logic in a later section.
These are all the contents of the article "sample Analysis of Netty distributed Encoder and write data event processing usage scenarios". Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.