Netty Distributed: How NioEventLoop Executes Its Task Queue

2025-01-27 Update From: SLTechnology News&Howtos

Shulou (Shulou.com), 06/01 report:

This article explains how Netty's NioEventLoop executes its task queue by walking through the relevant source code step by step. We hope it helps you solve the problem at hand.

Execute task queue

Go back to the run() method of NioEventLoop:

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        // poll for I/O events (1)
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                // defaults to 50
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                    // record the start time
                    final long ioStartTime = System.nanoTime();
                    try {
                        // process the polled keys (2)
                        processSelectedKeys();
                    } finally {
                        // compute the time spent on I/O
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // execute tasks (3)
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // code omitted
        }
    }

We see that after the polled keys are processed, the time spent is recorded in ioTime, and the tasks in taskQueue are then executed through runAllTasks(ioTime * (100 - ioRatio) / ioRatio).

We know that ioRatio defaults to 50, so ioTime * (100 - ioRatio) / ioRatio evaluates to ioTime * 50 / 50 = ioTime. The value passed to the method is therefore ioTime, the execution time of processSelectedKeys(). By default, task execution gets the same time budget as I/O processing.
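
To see how the budget changes with other ratios, the expression can be tried in isolation. The following is only a minimal sketch: IoRatioBudget and taskBudgetNanos are hypothetical names that do not exist in Netty, and the range check simply reflects that ioRatio == 100 takes the untimed runAllTasks() branch in run().

```java
// Hypothetical helper, not part of Netty: it mirrors the expression
// ioTime * (100 - ioRatio) / ioRatio used in the timed branch of run().
final class IoRatioBudget {

    // Returns the time budget for task execution, in nanoseconds.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // ioRatio == 100 is handled by the untimed branch, so it is rejected here.
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // pretend processSelectedKeys() took 1 ms
        System.out.println(taskBudgetNanos(ioTime, 50)); // 1000000
        System.out.println(taskBudgetNanos(ioTime, 80)); // 250000
        System.out.println(taskBudgetNanos(ioTime, 20)); // 4000000
    }
}
```

A higher ioRatio leaves a smaller share of each loop iteration for tasks, and a lower one leaves a larger share.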

Follow the runAllTasks method:

    protected boolean runAllTasks(long timeoutNanos) {
        // move due tasks from the scheduled task queue into taskQueue
        fetchFromScheduledTaskQueue();
        // take one task from the ordinary taskQueue
        Runnable task = pollTask();
        // if there is no task, return directly
        if (task == null) {
            // finishing work after all tasks have run
            afterRunningAllTasks();
            return false;
        }
        // the queue is not empty:
        // first compute a deadline (current time + timeoutNanos);
        // task execution should not run past it
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        // execute the tasks one by one
        for (;;) {
            safeExecute(task);
            // count the task that has just finished
            runTasks++;
            // every 64 tasks, read the current time
            if ((runTasks & 0x3F) == 0) {
                // time elapsed since the scheduled-task clock was initialized
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                // stop once the deadline is reached (nanoTime() is relatively expensive)
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            // the deadline has not been reached: take the next task from the ordinary task queue
            task = pollTask();
            // stop when there are no more tasks
            if (task == null) {
                // record the last execution time
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        // finishing work
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

The method first calls fetchFromScheduledTaskQueue(), which aggregates tasks from the scheduled task queue: scheduled tasks that are due to run are moved into taskQueue.

We follow the fetchFromScheduledTaskQueue() method:

    private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        // take the first task from the scheduled task queue
        // whose deadline is no later than nanoTime
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        // while there is a due scheduled task
        while (scheduledTask != null) {
            // if adding it to taskQueue fails
            if (!taskQueue.offer(scheduledTask)) {
                // put it back into the scheduled task queue
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            // continue pulling tasks from the scheduled task queue;
            // when the method completes, every scheduled task that is due
            // has been added to the ordinary task queue
            scheduledTask = pollScheduledTask(nanoTime);
        }
        return true;
    }

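long nanoTime = AbstractScheduledEventExecutor.nanoTime() represents how many nanoseconds have passed since the start time recorded by ScheduledFutureTask was initialized. Deadlines of scheduled tasks are expressed on this same relative scale.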
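
The idea of a clock that counts from a fixed start point can be sketched as follows. This is an illustration under stated assumptions rather than Netty's code: RelativeClock and deadlineNanos are hypothetical names, and the only real API used is System.nanoTime().

```java
// Hypothetical stand-in for a relative clock; not Netty code.
final class RelativeClock {

    // Captured once, when this class is initialized.
    private static final long START_TIME = System.nanoTime();

    // Nanoseconds elapsed since START_TIME.
    static long nanoTime() {
        return System.nanoTime() - START_TIME;
    }

    // Deadline of a task that should run delayNanos from now, on the same relative scale.
    static long deadlineNanos(long delayNanos) {
        return nanoTime() + delayNanos;
    }

    public static void main(String[] args) {
        long dueNow = deadlineNanos(0);
        long dueLater = deadlineNanos(60_000_000_000L); // one minute from now
        long now = nanoTime();
        // A task is due when its deadline is no later than the current relative time.
        System.out.println("zero-delay task due: " + (dueNow <= now));
        System.out.println("one-minute task due: " + (dueLater <= now));
    }
}
```

Because both the deadline and the current time are measured from the same start point, deciding whether a task is due is a single comparison.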

Runnable scheduledTask = pollScheduledTask(nanoTime) takes from the scheduled task queue a task whose deadline is no later than nanoTime. Because its deadline is no later than the time elapsed from initialization to now, the task is due and needs to be executed.

Follow the pollScheduledTask(nanoTime) method of the parent class AbstractScheduledEventExecutor:

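    protected final Runnable pollScheduledTask(long nanoTime) {
        assert inEventLoop();
        // get the scheduled task queue
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
        if (scheduledTask == null) {
            return null;
        }
        // the head task is due: remove it from the queue and return it
        if (scheduledTask.deadlineNanos() <= nanoTime) {
            scheduledTaskQueue.remove();
            return scheduledTask;
        }
        return null;
    }

Back in fetchFromScheduledTaskQueue(), each due task is offered to taskQueue through taskQueue.offer(scheduledTask). If the addition fails, the task is re-added to the scheduled task queue through scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask).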

If the addition succeeds, the next due task is fetched through pollScheduledTask(nanoTime), and this repeats until no due tasks remain.

In this way, the tasks in the scheduled task queue that need to be executed are added to taskQueue.
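
The transfer described above can be reproduced with plain JDK collections. The sketch below makes several assumptions: ScheduledFetchSketch and Timed are hypothetical, a PriorityQueue ordered by deadline stands in for the scheduled task queue, and a small ArrayBlockingQueue stands in for taskQueue so that a failed offer() is easy to trigger.

```java
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model of the two queues; not Netty code.
final class ScheduledFetchSketch {

    // A scheduled task reduced to a deadline on a relative clock plus a name.
    static final class Timed implements Comparable<Timed> {
        final long deadlineNanos;
        final String name;

        Timed(long deadlineNanos, String name) {
            this.deadlineNanos = deadlineNanos;
            this.name = name;
        }

        @Override
        public int compareTo(Timed other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    final PriorityQueue<Timed> scheduled = new PriorityQueue<>();
    final Queue<Timed> taskQueue;

    ScheduledFetchSketch(int taskQueueCapacity) {
        this.taskQueue = new ArrayBlockingQueue<>(taskQueueCapacity);
    }

    // Returns the head task only if it is due; otherwise leaves the queue untouched.
    Timed pollScheduled(long now) {
        Timed head = scheduled.peek();
        if (head == null || head.deadlineNanos > now) {
            return null;
        }
        scheduled.remove();
        return head;
    }

    // Moves every due task into taskQueue; returns false if taskQueue filled up first.
    boolean fetchFromScheduled(long now) {
        Timed task = pollScheduled(now);
        while (task != null) {
            if (!taskQueue.offer(task)) {
                scheduled.add(task); // no room: put the task back for a later attempt
                return false;
            }
            task = pollScheduled(now);
        }
        return true;
    }

    public static void main(String[] args) {
        ScheduledFetchSketch sketch = new ScheduledFetchSketch(2);
        sketch.scheduled.add(new Timed(10, "a"));
        sketch.scheduled.add(new Timed(20, "b"));
        sketch.scheduled.add(new Timed(30, "c"));
        System.out.println(sketch.fetchFromScheduled(50)); // false: "c" did not fit
        System.out.println(sketch.scheduled.size());       // 1: "c" was put back
    }
}
```

Putting the task back on a failed offer() means a full task queue delays the scheduled task instead of dropping it.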

Go back to the runAllTasks(long timeoutNanos) method shown above.

First, a task is taken from taskQueue through Runnable task = pollTask().

If the task is not null, a deadline is computed through final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos. Task execution should not run past this deadline.

Each task is then executed through safeExecute(task) in the for loop.

We follow safeExecute(task):

    protected static void safeExecute(Runnable task) {
        try {
            // call run() directly to execute the task
            task.run();
        } catch (Throwable t) {
            // an exception does not terminate the loop; it is only logged
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }

Here the task's run() method is called directly. If an exception is thrown, it is only logged as a warning. The loop is not terminated, and execution continues with the next task.
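
The effect of this catch-and-log pattern can be observed in a small stand-alone sketch. SafeExecuteSketch and runAll are hypothetical names; unlike Netty's safeExecute, which returns void and logs through its logger, this version returns a boolean and prints to System.err so that the outcome is easy to inspect.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration of the catch-and-log pattern; not Netty code.
final class SafeExecuteSketch {

    // Runs the task; returns true if it completed normally, false if it threw.
    static boolean safeExecute(Runnable task) {
        try {
            task.run();
            return true;
        } catch (Throwable t) {
            // The failure is reported but never propagated to the caller.
            System.err.println("A task raised an exception. Task: " + task + ", cause: " + t);
            return false;
        }
    }

    // Drains the queue and returns how many tasks completed normally.
    static int runAll(Queue<Runnable> tasks) {
        int completed = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            if (safeExecute(task)) {
                completed++;
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        Queue<Runnable> tasks = new ArrayDeque<>();
        tasks.add(() -> System.out.println("first task"));
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        tasks.add(() -> System.out.println("third task still runs"));
        System.out.println("completed: " + runAll(tasks)); // completed: 2
    }
}
```

The failing second task does not prevent the third one from running, which is the behavior described above.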

Go back to the runAllTasks(long timeoutNanos) method once more.

Each time a task is executed, runTasks is incremented.

Here if ((runTasks & 0x3F) == 0) checks whether another 64 tasks have been executed: the low six bits of runTasks are all zero only when it is a multiple of 64. When that is the case, lastExecutionTime = ScheduledFutureTask.nanoTime() records the time elapsed since the scheduled-task clock was initialized. If this time has reached the deadline, the loop exits.
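
The mask check can be verified on its own. In the sketch below, Every64Sketch, isCheckpoint and checkpoints are hypothetical names introduced only for illustration.

```java
// Hypothetical illustration of the (runTasks & 0x3F) == 0 check; not Netty code.
final class Every64Sketch {

    // 0x3F is binary 111111, so the result is zero exactly when
    // the low six bits are zero, that is, when runTasks is a multiple of 64.
    static boolean isCheckpoint(long runTasks) {
        return (runTasks & 0x3F) == 0;
    }

    // Counts how often the check fires while runTasks goes from 1 to totalTasks.
    static int checkpoints(int totalTasks) {
        int hits = 0;
        for (long runTasks = 1; runTasks <= totalTasks; runTasks++) {
            if (isCheckpoint(runTasks)) {
                hits++;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        System.out.println(isCheckpoint(63));  // false
        System.out.println(isCheckpoint(64));  // true
        System.out.println(checkpoints(200));  // 3 (after tasks 64, 128 and 192)
    }
}
```

For 200 tasks the clock would be consulted only three times inside the loop instead of 200 times.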

If the deadline has not been reached, the next task is polled through task = pollTask() and executed.

The time is checked once every 64 tasks rather than after each task, mainly because reading the system time is a relatively expensive operation. This is one of Netty's optimizations.

When there are no more tasks to execute, afterRunningAllTasks() does the finishing work, and the last execution time is recorded in this.lastExecutionTime.
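
Putting the pieces together, the loop can be imitated with an injectable clock so that its behavior is deterministic. This is a simplified sketch, not Netty's implementation: RunAllTasksSketch is hypothetical, it returns the number of executed tasks instead of a boolean, it leaves out the scheduled task queue and afterRunningAllTasks(), and the clock is passed in as a LongSupplier.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical, simplified imitation of the time-boxed task loop; not Netty code.
final class RunAllTasksSketch {

    // Runs tasks until the queue is empty or the budget is found to be used up.
    // Returns the number of tasks executed.
    static int runAllTasks(Queue<Runnable> taskQueue, long timeoutNanos, LongSupplier clock) {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = clock.getAsLong() + timeoutNanos;
        int runTasks = 0;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("A task raised an exception: " + t);
            }
            runTasks++;
            // The clock is read only after every 64th task.
            if ((runTasks & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break;
            }
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }

    public static void main(String[] args) {
        long[] now = {0}; // fake clock: every task advances it by one tick
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < 200; i++) {
            queue.add(() -> now[0]++);
        }
        int executed = runAllTasks(queue, 10, () -> now[0]);
        System.out.println(executed);     // 64
        System.out.println(queue.size()); // 136
    }
}
```

With a budget of 10 ticks, 64 tasks still run before the loop notices that the deadline has passed. Checking the time in batches trades some precision of the time limit for fewer clock reads.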

This concludes the introduction to how Netty's NioEventLoop executes its task queue. Thank you for reading.

© 2024 shulou.com SLNews company. All rights reserved.
