2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article walks through an example of how outbound events propagate through the Netty pipeline. The editor finds it practical and shares it here as a reference; hopefully you will take something away from it.
Outbound event propagation process
In our business code, we may use the write method to write data:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().write("test data");
    }

Of course, calling write alone does not send data to the peer. write only places the data in a buffer; you must then call flush to push the buffered data to the channel, or call writeAndFlush to do both at once. We will explain that logic in detail in later chapters. Here we only use the write method as an example to demonstrate how outbound events propagate.
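To make the buffering behavior concrete, here is a minimal sketch in plain Java. It is a toy model, not Netty code: the class BufferedChannelSketch and its two lists are hypothetical stand-ins for the channel's outbound buffer and for what the peer receives.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only, not Netty's API: shows write() buffering until flush(). */
final class BufferedChannelSketch {
    private final List<String> pending = new ArrayList<>();   // outbound buffer
    private final List<String> delivered = new ArrayList<>(); // what the "peer" has received

    void write(String msg) {
        pending.add(msg); // buffered only, nothing is delivered yet
    }

    void flush() {
        delivered.addAll(pending); // push everything buffered so far
        pending.clear();
    }

    void writeAndFlush(String msg) {
        write(msg);
        flush();
    }

    List<String> delivered() {
        return new ArrayList<>(delivered); // defensive copy
    }

    public static void main(String[] args) {
        BufferedChannelSketch ch = new BufferedChannelSketch();
        ch.write("test data");
        System.out.println(ch.delivered()); // []
        ch.flush();
        System.out.println(ch.delivered()); // [test data]
    }
}
```

In this model, data written with write only becomes visible to the peer after flush, which is the same ordering the paragraph above describes.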
Here are two ways to write:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 1
        ctx.channel().write("test data");
        // 2
        ctx.write("test data");
    }

What is the difference between these two ways of writing? First of all, let's follow the first one:

    ctx.channel().write("test data")

This first gets the channel bound to ctx.
We follow the write method of AbstractChannel:

    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

Here pipeline is a DefaultChannelPipeline.
Follow its write method:

    public final ChannelFuture write(Object msg) {
        // Start from the tail node (propagate from the last node toward the head)
        return tail.write(msg);
    }

This calls the write method of the tail node, so we can already guess that outbound events start at the tail node and propagate toward the head. With this guess in mind, let's move on.
In fact, the tail node does not override write, so the call ends up in the write method of its parent class, AbstractChannelHandlerContext.
The write method of AbstractChannelHandlerContext:

    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }

Note the newPromise() call here, which creates a Promise object. We will cover Promise in later chapters.
Let's continue with write:

    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        // Code omitted
        write(msg, false, promise);
        return promise;
    }

Continue to follow write:

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                // flush was not requested
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            } else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

This is similar to the channelRead method we analyzed in the previous section, except that the event travels in the opposite direction. Here findContextOutbound() gets the previous HandlerContext that is marked as outbound.
Follow findContextOutbound:

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

The logic is familiar: it mirrors findContextInbound() from the previous section, except that the traversal runs in the opposite direction.
It starts from the node before the current context. If that node is not marked as outbound, it keeps moving toward the head until it finds one that is. In other words, it finds the nearest preceding node marked as outbound.
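The backward search can be tried out in isolation with a small sketch. The Ctx class below is a hypothetical stand-in that keeps only the prev and outbound fields used by the loop; the chain layout and node names are assumptions made for illustration. The head node is marked as outbound, which is what guarantees the loop terminates.

```java
/** Toy model of the context chain; the class is hypothetical, only the loop shape follows the article. */
final class OutboundSearchSketch {
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev; // neighbour toward the head

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }

        // Same loop shape as findContextOutbound(): step toward the head until an outbound node is found.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    /** Builds head <- inA <- outB <- inC <- tail and returns the tail. */
    static Ctx buildChain() {
        Ctx head = new Ctx("head", true);   // head is outbound, so the search always stops here at the latest
        Ctx inA = new Ctx("inA", false);
        Ctx outB = new Ctx("outB", true);
        Ctx inC = new Ctx("inC", false);
        Ctx tail = new Ctx("tail", false);
        inA.prev = head;
        outB.prev = inA;
        inC.prev = outB;
        tail.prev = inC;
        return tail;
    }

    public static void main(String[] args) {
        Ctx first = buildChain().findContextOutbound();
        System.out.println(first.name);                       // outB (inC is skipped)
        System.out.println(first.findContextOutbound().name); // head (inA is skipped)
    }
}
```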
Go back to the write method:

    AbstractChannelHandlerContext next = findContextOutbound();

The node that is found is assigned to the local variable next.
Because the write event in our analysis starts from the tail node, this previous node may be the context that wraps a user-defined handler.
Next, write checks whether the current thread is the eventLoop thread of next. If not, the call is wrapped in a task and executed asynchronously. If so, it checks the flush flag; since we did not request a flush here, execution reaches next.invokeWrite(m, promise).
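The thread check can be illustrated with a rough sketch built on a single-threaded executor from the JDK. Everything here is an assumption for illustration: EventLoopSketch, its write method, and the "inline:" and "task:" labels are hypothetical and are not Netty's EventExecutor. The blocking await helper exists only so the demo can print results.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Toy single-threaded "event loop"; a hypothetical stand-in, not Netty's EventExecutor. */
final class EventLoopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-event-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread is the loop thread
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Mirrors the branch in write(): run inline on the loop thread, otherwise submit a task. */
    Future<String> write(String msg) {
        if (inEventLoop()) {
            return CompletableFuture.completedFuture("inline:" + msg);
        }
        Callable<String> task = () -> "task:" + msg;
        return executor.submit(task);
    }

    /** Calls write() from inside the loop thread, to exercise the inline branch. */
    String writeFromLoop(String msg) {
        Callable<String> onLoop = () -> await(write(msg));
        return await(executor.submit(onLoop));
    }

    /** Blocks for the result; demo only, real event-loop code would not block like this. */
    static String await(Future<String> f) {
        try {
            return f.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        EventLoopSketch loop = new EventLoopSketch();
        System.out.println(await(loop.write("a")));   // task:a   (called from the main thread)
        System.out.println(loop.writeFromLoop("b"));  // inline:b (called from the loop thread)
    }
}
```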
Let's follow invokeWrite:

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

This checks whether the current handler is in the added state. Here it returns true, so execution continues to invokeWrite0(msg, promise).
Continue with invokeWrite0:

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            // Call the write() method of the current handler
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

The logic here is also familiar: it calls the write method of the handler wrapped by the current node. If the user's handler does not override write, the parent class handles it.
Let's look at the write method of ChannelOutboundHandlerAdapter:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

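The difference between inheriting this pass-through write and overriding it can be sketched with a small model. The types OutboundHandler, Adapter and Ctx below are hypothetical simplifications (no promise, String messages only), and the angle-bracket framing is an invented example of what an overriding handler might do before passing the event on.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Netty's API: every type here is a hypothetical stand-in. */
final class PassThroughSketch {
    interface OutboundHandler {
        void write(Ctx ctx, String msg);
    }

    /** Plays the role of ChannelOutboundHandlerAdapter: write just forwards via ctx.write. */
    static class Adapter implements OutboundHandler {
        @Override
        public void write(Ctx ctx, String msg) {
            ctx.write(msg);
        }
    }

    static final class Ctx {
        final Ctx prev;                // next node toward the head
        final OutboundHandler handler;

        Ctx(Ctx prev, OutboundHandler handler) {
            this.prev = prev;
            this.handler = handler;
        }

        /** Hands the message to the handler of the previous node. */
        void write(String msg) {
            prev.handler.write(prev, msg);
        }
    }

    static List<String> run(String msg) {
        List<String> wire = new ArrayList<>();
        // head: performs the final write (stands in for unsafe.write)
        Ctx head = new Ctx(null, (ctx, m) -> wire.add(m));
        // overrides write, changes the message, then passes it on with ctx.write
        Ctx framing = new Ctx(head, new Adapter() {
            @Override
            public void write(Ctx ctx, String m) {
                ctx.write("<" + m + ">");
            }
        });
        // does not override write, so the adapter's pass-through is used
        Ctx plain = new Ctx(framing, new Adapter());
        Ctx tail = new Ctx(plain, null);
        tail.write(msg);
        return wire;
    }

    public static void main(String[] args) {
        System.out.println(run("test data")); // [<test data>]
    }
}
```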
This calls the write method of the current ctx, which is the second form we wrote at the beginning of the section. Let's review:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 1
        ctx.channel().write("test data");
        // 2
        ctx.write("test data");
    }

We follow this write method and again arrive at the write method of the AbstractChannelHandlerContext class:

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                // flush was not requested
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            } else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

Again the logic is familiar: find the nearest node before the current one that is marked as outbound, then run invokeWrite on it. From the previous analysis, we know this ends up in the write method of the previous outbound handler.
So it is not hard to see that ctx.channel().write("test data") propagates the write event starting from the tail node, while ctx.write("test data") propagates it starting from the current node.
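The two starting points can be compared with a small trace. This is a simplified model under stated assumptions: the Pipeline and Ctx classes are hypothetical, every node is treated as an outbound pass-through handler, and the handler names h1 to h3 are invented. The trace records which nodes see the write event.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Netty's API: traces which nodes a write event visits. */
final class WriteEntryPointSketch {
    static final class Ctx {
        final String name;
        Ctx prev; // neighbour toward the head

        Ctx(String name) {
            this.name = name;
        }

        /** Like ctx.write: the event goes to the node before this one and keeps moving toward the head. */
        void write(List<String> trace) {
            for (Ctx c = prev; c != null; c = c.prev) {
                trace.add(c.name);
            }
        }
    }

    static final class Pipeline {
        final Ctx head = new Ctx("head");
        final Ctx tail = new Ctx("tail");

        Pipeline() {
            tail.prev = head;
        }

        /** Inserts a node just before the tail. */
        Ctx addLast(String name) {
            Ctx c = new Ctx(name);
            c.prev = tail.prev;
            tail.prev = c;
            return c;
        }

        /** Like ctx.channel().write: always starts from the tail. */
        void write(List<String> trace) {
            tail.write(trace);
        }
    }

    /** Trace when the write is issued through the channel (pipeline). */
    static List<String> viaChannel() {
        Pipeline p = new Pipeline();
        p.addLast("h1");
        p.addLast("h2");
        p.addLast("h3");
        List<String> trace = new ArrayList<>();
        p.write(trace);
        return trace;
    }

    /** Trace when the write is issued through the context of h2. */
    static List<String> viaContext() {
        Pipeline p = new Pipeline();
        p.addLast("h1");
        Ctx h2 = p.addLast("h2");
        p.addLast("h3");
        List<String> trace = new ArrayList<>();
        h2.write(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(viaChannel()); // [h3, h2, h1, head]
        System.out.println(viaContext()); // [h1, head]
    }
}
```

In this model, a write issued through the context of h2 never reaches h3 or h2 itself, while a write issued through the channel visits every node from the tail side.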
Therefore, if you override write in a handler and want to pass the write event along, you must use ctx.write("test data") or delegate to the parent class, not ctx.channel().write("test data"). The latter restarts propagation from the tail node every time the event reaches this handler, so the event comes back to the same handler again and again, which leads to unpredictable errors.
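The re-entry problem can be seen in a deliberately tiny model. This is only a sketch under assumptions: ReentrySketch is hypothetical, it contains a single user handler between tail and head, and the LIMIT guard exists purely so the demo terminates. Without such a guard, the wrong variant in this model would recurse without end.

```java
/** Toy model, not Netty's API: counts how often one handler's write is entered for a single message. */
final class ReentrySketch {
    static final int LIMIT = 5; // demo-only guard; without it the wrong variant would never stop

    private final boolean restartFromTail;
    private int entries;
    private boolean delivered;

    ReentrySketch(boolean restartFromTail) {
        this.restartFromTail = restartFromTail;
    }

    /** Models ctx.channel().write: the event starts at the tail and reaches our handler first. */
    void channelWrite(String msg) {
        handlerWrite(msg);
    }

    /** Models the user's overridden write method. */
    void handlerWrite(String msg) {
        entries++;
        if (restartFromTail) {
            if (entries < LIMIT) {
                channelWrite(msg); // wrong: the event comes straight back to this handler
            }
        } else {
            headWrite(msg);        // right: like ctx.write, the event moves on toward the head
        }
    }

    /** Models the head node performing the final write. */
    void headWrite(String msg) {
        delivered = true;
    }

    int entries() {
        return entries;
    }

    boolean delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        ReentrySketch wrong = new ReentrySketch(true);
        wrong.channelWrite("test data");
        System.out.println(wrong.entries() + " " + wrong.delivered()); // 5 false

        ReentrySketch right = new ReentrySketch(false);
        right.channelWrite("test data");
        System.out.println(right.entries() + " " + right.delivered()); // 1 true
    }
}
```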
If no handler in the code overrides write, the event is passed along all the way toward the head. After passing through all outbound nodes, it finally reaches the write method of the head node.
We follow the write method of HeadContext:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

We see that the write event eventually arrives here, where the final write operation is performed through the unsafe object.
The transmission of inbound events and outbound events can be illustrated by the following figure:
This concludes the article "Example Analysis of Netty Pipeline Propagation of Outbound Events". I hope the content above is of some help. If you found the article useful, please share it so more people can see it.