2025-04-05 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article analyzes libgo's scheduling strategy from its source code. The analysis revolves around the three structures defined in the previous article:
Scheduler (S), executor Processer (P), and coroutine Task (T).
The relationship between the three is shown in the following figure:
For each class, this article lists its main members and functions and analyzes them.
1. Coroutine scheduler: class Scheduler
libgo/scheduler/scheduler.h

    class Scheduler
    {
    public:
        /*
         * Create a scheduler and initialize libgo.
         * Also creates the executor for the main thread. If Start() is later called
         * without arguments, only this one executor is used by default.
         * When only one thread schedules coroutines, coroutines run strictly in
         * the order they were created.
         */
        static Scheduler* Create();

        /*
         * Create a coroutine Task object and add it to the task queue of the
         * current executor (Processer). The scheduler's task count taskCount_
         * is incremented by 1.
         */
        void CreateTask(TaskF const& fn, TaskOpt const& opt);

        /*
         * Start the scheduler.
         * @minThreadNumber: minimum number of scheduling threads; 0 means the number of CPU cores.
         * @maxThreadNumber: maximum number of scheduling threads; 0 means minThreadNumber.
         *   If maxThreadNumber is greater than minThreadNumber, the number of scheduling
         *   threads can grow automatically when coroutines block for a long time.
         * Wakes up the timer thread.
         * Each scheduling thread calls Process() to start scheduling; the scheduling
         * thread with id 0 is started last.
         * If maxThreadNumber_ > 1, the dispatcher thread DispatcherThread is started.
         */
        void Start(int minThreadNumber = 1, int maxThreadNumber = 0);

        /*
         * Stop scheduling. Scheduling cannot be resumed afterwards; this is only
         * meant for exiting main() safely.
         * If a scheduling thread is blocked by a coroutine, Stop() must wait for
         * the blocking to end before exiting.
         */
        void Stop();

    private:
        /*
         * Dispatcher thread: balances the load of the Processers, stealing
         * coroutines from heavily loaded or blocked Ps and moving them to
         * lightly loaded Ps.
         * If all Ps are blocked but coroutines are still waiting to run, new
         * threads are created, never more than maxThreadNumber_.
         * The coroutines of a blocked P are handed to the least loaded P.
         */
        void DispatcherThread();

        /*
         * Create a new Processer and add it to the double-ended queue processers_.
         */
        void NewProcessThread();

    private:
        atomic_t taskCount_{0};  // number of coroutines
        Deque processers_;       // double-ended queue of all executors; each executor runs on its own thread, which calls back Process()
        LFLock started_;         // optional lock provided by libgo
    };

The scheduler manages one or more scheduling threads, each of which runs one executor (Processer). The scheduler only balances the load across executors and keeps them from all getting stuck; it does not itself switch coroutines.
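The defaulting rules in the comments on Start() can be captured in a small helper. The sketch below is hypothetical (ResolveThreadCounts and ThreadPlan are not libgo names). It takes the core count as a parameter so that it is deterministic, and the clamp for a maximum below the minimum is an assumption that the header comments do not cover.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical helper (not part of libgo): applies the defaulting rules that
// the comments on Scheduler::Start() describe.
struct ThreadPlan {
    int minThreads;        // scheduling threads started up front
    int maxThreads;        // upper bound for dynamic growth
    bool startDispatcher;  // whether a DispatcherThread is needed
};

// cpuCores is passed in so the rules can be tested deterministically;
// a real caller might pass std::thread::hardware_concurrency().
ThreadPlan ResolveThreadCounts(int minThreadNumber, int maxThreadNumber,
                               unsigned cpuCores) {
    ThreadPlan plan{};
    // 0 means "use the number of CPU cores" (at least 1, because
    // hardware_concurrency() may report 0 when the count is unknown).
    plan.minThreads = minThreadNumber > 0
                          ? minThreadNumber
                          : std::max(1, static_cast<int>(cpuCores));
    // 0 means "same as the minimum"; growth only happens when max > min.
    plan.maxThreads = maxThreadNumber > 0 ? maxThreadNumber : plan.minThreads;
    // Assumption: a maximum below the minimum is clamped up to the minimum.
    plan.maxThreads = std::max(plan.maxThreads, plan.minThreads);
    // Per the Start() comment, the dispatcher runs only when max > 1.
    plan.startDispatcher = plan.maxThreads > 1;
    return plan;
}
```

With the default arguments Start(1, 0) this yields one thread and no dispatcher, which matches the Create() comment that a scheduler started without arguments uses a single executor.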
Usage
libgo provides a default coroutine scheduler, co_sched:

    #define g_Scheduler ::co::Scheduler::getInstance()
    #define co_sched g_Scheduler

Users can also create their own coroutine scheduler:

    co::Scheduler* my_sched = co::Scheduler::Create();

Start scheduling on a separate thread:

    std::thread t([my_sched] { my_sched->Start(); });
    t.detach();

Scheduler principle
Scheduler is responsible for coroutine scheduling across the whole system, but coroutines actually run on executors (Processer, P for short). The scheduler therefore creates executors P when it is initialized (their number can grow dynamically) and keeps all of them in a double-ended queue. The main thread also acts as an executor: it is created together with the Scheduler object and sits at index 0 of the double-ended queue. (Note: the object is only created at this point; it does not start running.)
Scheduling actually begins when Start() is called. How many executors P Start() creates depends on its parameters: minThreadNumber executors are created up front, and when all executors are blocked the pool grows dynamically, up to maxThreadNumber. Each executor runs on its own thread and is responsible for switching and executing the coroutines on that thread.
A newly created coroutine is added to an active executor; if no executor happens to be active, it is still added to some P. This does not disrupt the executors, because the scheduler's dispatcher thread will rebalance the work.
Besides the executor threads, Start() also launches the dispatcher thread DispatcherThread, which balances the number and load of the Ps by stealing coroutines. If all Ps are blocked, it adds new Ps, up to maxThreadNumber. If only some Ps are blocked, it steals all coroutines from each blocked P and hands them to the P with the lightest load.
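The rebalancing policy described above can be sketched as one pass over a list of simulated executors. This is a simplified model under stated assumptions, not libgo's DispatcherThread: SimProc and DispatchOnce are hypothetical names, each P is reduced to a queue of task ids plus a blocked flag, and every blocked P has its whole queue moved to the currently least-loaded active P.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Simplified model of one executor P: its pending coroutine ids and whether
// the dispatcher judged it blocked. Not libgo's actual data structures.
struct SimProc {
    std::deque<int> tasks;
    bool blocked = false;
};

// One dispatcher pass (hypothetical sketch): grow the pool if every P is
// blocked, then move the queued work of each blocked P to the least-loaded
// active P.
void DispatchOnce(std::vector<SimProc>& procs, std::size_t maxThreads) {
    bool allBlocked = std::all_of(procs.begin(), procs.end(),
                                  [](const SimProc& p) { return p.blocked; });
    if (allBlocked && procs.size() < maxThreads)
        procs.emplace_back();  // a fresh, unblocked executor
    for (SimProc& src : procs) {
        if (!src.blocked || src.tasks.empty()) continue;
        // Find the active executor with the fewest queued tasks.
        SimProc* dst = nullptr;
        for (SimProc& p : procs)
            if (!p.blocked && (!dst || p.tasks.size() < dst->tasks.size()))
                dst = &p;
        if (!dst) return;  // nowhere to move work (pool already at its maximum)
        // "Steal all" from the blocked executor.
        dst->tasks.insert(dst->tasks.end(), src.tasks.begin(), src.tasks.end());
        src.tasks.clear();
    }
}
```

The pool only grows when every P is blocked, which mirrors the text: partial blocking is handled by redistribution alone.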
Scheduler also optionally starts the coroutine timer thread.
It also starts the FastSteadyClock thread.
The implementation of timers and clocks will be discussed in a later article.
2. Coroutine executor: class Processer
libgo/scheduler/processer.h
Each coroutine executor corresponds to one thread and is responsible for scheduling the coroutines on that thread. It is not thread-safe, and it is the core of coroutine scheduling.

    class Processer
    {
    public:
        // Coroutine suspend token, used later for wake-up and timeout checks
        struct SuspendEntry {
            // ...
        };

        // Switch the current coroutine out
        ALWAYS_INLINE static void StaticCoYield();

        // Suspend the current coroutine
        static SuspendEntry Suspend();

        // Suspend the current coroutine and wake it automatically after the given time
        static SuspendEntry Suspend(FastSteadyClock::duration dur);

        // Wake up a coroutine
        static bool Wakeup(SuspendEntry const& entry);

    private:
        // The executor's coroutine scheduling loop, i.e. its main processing logic
        void Process();

        // Steal n coroutines from this executor and return them.
        // If n is 0, steal all of them; otherwise take out the given number.
        SList Steal(std::size_t n);

        // Mark the executor so that a coroutine blocking it can be detected
        void Mark();

    private:
        int id_;                            // thread id, the index of this executor in Scheduler::processers_
        Scheduler * scheduler_;             // the scheduler this executor belongs to
        volatile bool active_ = true;       // true means not blocked; controlled by the scheduler's dispatcher thread
        volatile int64_t markTick_ = 0;     // timestamp of the last mark
        volatile uint64_t markSwitch_ = 0;  // switchCount_ value at the last mark
        volatile uint64_t switchCount_ = 0; // number of coroutine switches performed

        // The coroutine currently running, and the one to run next
        Task* runningTask_{nullptr};
        Task* nextTask_{nullptr};

        // Queues
        typedef TSQueue TaskQueue;
        TaskQueue runnableQueue_;  // runnable queue
        TaskQueue waitQueue_;      // waiting (suspended) queue
        TSQueue gcQueue_;          // finished coroutines waiting to be reclaimed
        TaskQueue newQueue_;       // coroutines newly added to this executor, including stolen ones;
                                   // not run directly, Process() keeps moving them into runnableQueue_

        // Condition variable the executor waits on
        std::mutex cvMutex_;
        std::condition_variable cv_;
        std::atomic_bool waiting_{false};
    };

The executor's scheduling loop: Process()
The executor Processer maintains three thread-safe coroutine queues:
runnableQueue_: coroutines that can run;
waitQueue_: suspended coroutines;
newQueue_: newly added coroutines, including newly created ones, suspended ones that have been woken up, and ones obtained by stealing.

    void Processer::Process()
    {
        GetCurrentProcesser() = this;
        bool &isStop = *stop_;

        while (!isStop)
        {
            runnableQueue_.front(runningTask_);  // take the first runnable coroutine

            if (!runningTask_) {
                if (AddNewTasks())
                    runnableQueue_.front(runningTask_);

                if (!runningTask_) {
                    WaitCondition();  // nothing to run: wait on the condition variable
                    AddNewTasks();
                    continue;
                }
            }

            addNewQuota_ = 1;
            while (runningTask_ && !isStop) {
                runningTask_->state_ = TaskState::runnable;
                runningTask_->proc_ = this;
                ++switchCount_;
                runningTask_->SwapIn();  // run the coroutine until it switches out

                switch (runningTask_->state_) {
                    case TaskState::runnable:
                        {
                            std::unique_lock lock(runnableQueue_.LockRef());
                            auto next = (Task*)runningTask_->next;
                            if (next) {
                                runningTask_ = next;
                                runningTask_->check_ = runnableQueue_.check_;
                                break;
                            }

                            if (addNewQuota_ < 1 || newQueue_.emptyUnsafe()) {
                                runningTask_ = nullptr;
                            } else {
                                lock.unlock();
                                if (AddNewTasks()) {
                                    runnableQueue_.next(runningTask_, runningTask_);
                                    -- addNewQuota_;
                                } else {
                                    std::unique_lock lock2(runnableQueue_.LockRef());
                                    runningTask_ = nullptr;
                                }
                            }
                        }
                        break;

                    case TaskState::block:
                        {
                            std::unique_lock lock(runnableQueue_.LockRef());
                            runningTask_ = nextTask_;
                            nextTask_ = nullptr;
                        }
                        break;

                    case TaskState::done:
                    default:
                        {
                            runnableQueue_.next(runningTask_, nextTask_);
                            if (!nextTask_ && addNewQuota_ > 0) {
                                if (AddNewTasks()) {
                                    runnableQueue_.next(runningTask_, nextTask_);
                                    -- addNewQuota_;
                                }
                            }

                            DebugPrint(dbg_task, "task(%s) done.", runningTask_->DebugInfo());
                            runnableQueue_.erase(runningTask_);
                            if (gcQueue_.size() > 16)  // reclaim resources of finished coroutines
                                GC();
                            gcQueue_.push(runningTask_);
                            if (runningTask_->eptr_) {
                                std::exception_ptr ep = runningTask_->eptr_;
                                std::rethrow_exception(ep);
                            }

                            std::unique_lock lock(runnableQueue_.LockRef());
                            runningTask_ = nextTask_;
                            nextTask_ = nullptr;
                        }
                        break;
                }
            }
        }
    }

Until Scheduler's Stop() is called, executor P stays in its scheduling loop Process(). In each iteration, P takes the first coroutine from the runnable queue runnableQueue_. If the runnable queue is empty, it tries to move the coroutines in newQueue_ into the runnable queue. If newQueue_ is empty as well, there is no coroutine to schedule, and the executor waits on its condition variable.
Once it has a runnable coroutine, the executor runs it. Coroutine execution is driven by a state machine; a coroutine has three states: runnable, blocked, and done.
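The three-state dispatch can be illustrated with a single-threaded model in which each coroutine is a step function that runs until its next yield and reports its new state. SimTask and SimProcesser are hypothetical names. The model re-queues a yielded task at the back (round-robin), parks a blocked task in waitQueue (in libgo that bookkeeping belongs to Suspend(), covered later), and returns instead of waiting when there is no work.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <vector>

enum class TaskState { runnable, block, done };

// A "coroutine" here is just a step function: each call runs until the next
// yield and reports the resulting state. Real libgo tasks switch stacks.
struct SimTask {
    int id;
    std::function<TaskState()> step;
};

// Simplified single-threaded executor loop: a sketch of the state machine,
// not libgo's Processer::Process().
struct SimProcesser {
    std::deque<SimTask> newQueue, runnableQueue, waitQueue;
    std::vector<int> finished;  // ids in completion order (stands in for gcQueue_)
    std::vector<int> trace;     // id of every task switched in, in order

    void Run() {
        for (;;) {
            if (runnableQueue.empty()) {
                // Like AddNewTasks(): move newly added tasks to the runnable queue.
                while (!newQueue.empty()) {
                    runnableQueue.push_back(std::move(newQueue.front()));
                    newQueue.pop_front();
                }
                if (runnableQueue.empty()) return;  // libgo would wait on a condvar here
            }
            SimTask task = std::move(runnableQueue.front());
            runnableQueue.pop_front();
            trace.push_back(task.id);
            switch (task.step()) {          // "SwapIn" until the task switches out
            case TaskState::runnable:       // yielded: still runnable
                runnableQueue.push_back(std::move(task));
                break;
            case TaskState::block:          // suspended: park until woken
                waitQueue.push_back(std::move(task));
                break;
            case TaskState::done:           // finished: hand over for reclamation
                finished.push_back(task.id);
                break;
            }
        }
    }
};
```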
For a coroutine that is still runnable (it yielded), the executor only needs to determine the next coroutine to run. A coroutine reaches the blocked state here only when it has been suspended (Suspend() was called), so the executor simply continues with nextTask_. A coroutine reaches the done state here only after its Task function has finished running, so its resources must be reclaimed.
Condition variable
Processer uses a std::mutex together with a condition variable for wake-ups. When the executor tries to get the next runnable coroutine and none is available, it actively waits on the condition variable, with a default timeout of 100 milliseconds.

    void Processer::WaitCondition()
    {
        GC();
        std::unique_lock lock(cvMutex_);
        waiting_ = true;
        cv_.wait_for(lock, std::chrono::milliseconds(100));  // wake up after at most 100 ms
        waiting_ = false;
    }

    void Processer::NotifyCondition()
    {
        cv_.notify_all();
    }

When the scheduler adds a new coroutine to the executor, it notifies the condition variable and the Process() loop continues. Waking up through a condition variable is far more efficient than busy polling.
Why set a timeout, and so poll periodically, even though a condition variable is used? Why should the wait return even when nobody wakes it?
Because we do not want the thread to block here indefinitely, waiting forever as long as no new coroutine arrives. While it waits, the thread should also wake up periodically to perform other checks.
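A timed wait of this kind maps directly onto std::condition_variable::wait_for. The sketch below (IdleWaiter is a hypothetical name) adds a pending flag checked through a predicate, which is not part of the excerpt above: it ensures that a notification arriving before the wait is not lost, and it absorbs spurious wake-ups.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Sketch of the WaitCondition()/NotifyCondition() pattern with a hypothetical
// "pending work" flag added so that an early notify is not lost.
class IdleWaiter {
public:
    // Waits until notified or until the timeout expires. Returns true if work
    // was signalled, false on timeout (the caller can then run periodic checks).
    bool Wait(std::chrono::milliseconds timeout = std::chrono::milliseconds(100)) {
        std::unique_lock<std::mutex> lock(mutex_);
        waiting_ = true;
        bool signalled = cv_.wait_for(lock, timeout, [this] { return pending_; });
        pending_ = false;  // consume the signal
        waiting_ = false;
        return signalled;
    }

    void Notify() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_ = true;
        }
        cv_.notify_all();
    }

    bool IsWaiting() const { return waiting_.load(); }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool pending_ = false;             // guarded by mutex_
    std::atomic_bool waiting_{false};  // mirrors Processer::waiting_
};
```

The timeout bounds how long an idle executor sleeps, so it still gets a chance to do housekeeping such as the GC() call in WaitCondition().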
Stealing a specified number of coroutines from an executor: Steal()
Put simply, stealing coroutines from an executor means taking the requested number of nodes out of the queues that the executor maintains.
Why take them out? As mentioned earlier, either the executor is overloaded or it is blocked.

    SList Processer::Steal(std::size_t n)
    {
        if (n > 0) {
            // steal the specified number of coroutines
            newQueue_.AssertLink();
            auto slist = newQueue_.pop_back(n);
            newQueue_.AssertLink();
            if (slist.size() >= n)
                return slist;

            std::unique_lock lock(runnableQueue_.LockRef());
            bool pushRunningTask = false, pushNextTask = false;
            if (runningTask_)
                pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);
            if (nextTask_)
                pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);
            auto slist2 = runnableQueue_.pop_backWithoutLock(n - slist.size());
            if (pushRunningTask)
                runnableQueue_.pushWithoutLock(runningTask_);
            if (pushNextTask)
                runnableQueue_.pushWithoutLock(nextTask_);
            lock.unlock();

            slist2.append(std::move(slist));
            if (!slist2.empty())
                DebugPrint(dbg_scheduler, "Proc(%d).Stealed = %d", id_, (int)slist2.size());
            return slist2;
        } else {
            // steal all
            newQueue_.AssertLink();
            auto slist = newQueue_.pop_all();
            newQueue_.AssertLink();

            std::unique_lock lock(runnableQueue_.LockRef());
            bool pushRunningTask = false, pushNextTask = false;
            if (runningTask_)
                pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);
            if (nextTask_)
                pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);
            auto slist2 = runnableQueue_.pop_allWithoutLock();
            if (pushRunningTask)
                runnableQueue_.pushWithoutLock(runningTask_);
            if (pushNextTask)
                runnableQueue_.pushWithoutLock(nextTask_);
            lock.unlock();

            slist2.append(std::move(slist));
            if (!slist2.empty())
                DebugPrint(dbg_scheduler, "Proc(%d).Stealed all = %d", id_, (int)slist2.size());
            return slist2;
        }
    }

Coroutines are taken from newQueue_ first: they have not yet been added to the runnable queue, so they can be removed directly. If newQueue_ does not hold enough, the remainder is taken from the tail of runnableQueue_. This needs special handling, because runnableQueue_ also contains runningTask_, the coroutine currently executing, and nextTask_, the coroutine scheduled to run next. Before stealing from runnableQueue_, runningTask_ and nextTask_ are removed from it, and after the steal they are pushed back onto the current runnableQueue_.
In short, stealing never takes the coroutines referenced by runningTask_ and nextTask_.
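The stealing order can be modelled with plain queues of task ids. StealModel is a hypothetical, single-threaded sketch: it drains newQueue from the back first, then takes from the back of runnableQueue while leaving the running and next tasks in place. libgo instead erases those two tasks and re-pushes them after the steal; the model simply skips over them.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Simplified model of Processer::Steal(n): task ids stand in for Task*.
// Not libgo's API; locking is omitted because the model is single-threaded.
struct StealModel {
    std::deque<int> newQueue;       // added but not yet runnable
    std::deque<int> runnableQueue;  // includes the running and next tasks
    int runningTask = -1;           // -1 means "none"
    int nextTask = -1;

    // n == 0 means "steal everything that may be stolen".
    std::vector<int> Steal(std::size_t n) {
        std::vector<int> stolen;
        auto wanted = [&] { return n == 0 || stolen.size() < n; };
        // 1. Take from the back of newQueue first: these tasks were never
        //    scheduled here, so they can be moved freely.
        while (wanted() && !newQueue.empty()) {
            stolen.push_back(newQueue.back());
            newQueue.pop_back();
        }
        // 2. Then take from the back of runnableQueue, never taking the
        //    running task or the task scheduled to run next.
        std::deque<int> kept;
        while (!runnableQueue.empty()) {
            int t = runnableQueue.back();
            runnableQueue.pop_back();
            if (wanted() && t != runningTask && t != nextTask)
                stolen.push_back(t);
            else
                kept.push_front(t);  // preserve the original order of what stays
        }
        runnableQueue = std::move(kept);
        return stolen;
    }
};
```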

    void Processer::Mark()
    {
        if (runningTask_ && markSwitch_ != switchCount_) {
            markSwitch_ = switchCount_;
            markTick_ = NowMicrosecond();
        }
    }

    uint32_t cycle_timeout_us = 10 * 1000;

    bool Processer::IsBlocking()
    {
        if (!markSwitch_ || markSwitch_ != switchCount_) return false;
        return NowMicrosecond() > markTick_ + CoroutineOptions::getInstance().cycle_timeout_us;
    }

Mark() is called from the scheduler's dispatcher function, and note that it is only called while the executor is active. As the name suggests, Mark() marks the executor: it records the timestamp of the mark together with the switch count at that moment. Its purpose is to detect whether the executor is blocked.
An active executor keeps switching coroutines, so switchCount_ keeps increasing. According to IsBlocking(), if the switch count recorded at the mark has not changed for more than 10 ms (cycle_timeout_us), the executor is considered blocked, and the Scheduler steals its coroutines.
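The Mark()/IsBlocking() logic is easy to test once the clock is passed in. BlockDetector is a hypothetical model of those two functions: it keeps the same fields and the same 10 ms default, but takes the current time in microseconds as an argument instead of calling NowMicrosecond().

```cpp
#include <cassert>
#include <cstdint>

// Model of the Mark()/IsBlocking() pair with the clock passed in explicitly.
// The 10 ms default mirrors cycle_timeout_us.
struct BlockDetector {
    uint64_t switchCount = 0;  // incremented by the executor on every SwapIn
    uint64_t markSwitch = 0;   // switchCount seen at the last mark
    int64_t markTick = 0;      // time (us) of the last mark
    bool hasRunningTask = false;
    uint32_t cycleTimeoutUs = 10 * 1000;

    // Called periodically: re-mark only if the executor has switched
    // coroutines since the previous mark.
    void Mark(int64_t nowUs) {
        if (hasRunningTask && markSwitch != switchCount) {
            markSwitch = switchCount;
            markTick = nowUs;
        }
    }

    // Blocked = no switch since the mark, and the mark is older than the timeout.
    bool IsBlocking(int64_t nowUs) const {
        if (!markSwitch || markSwitch != switchCount) return false;
        return nowUs > markTick + cycleTimeoutUs;
    }
};
```

A single slow coroutine that never yields keeps switchCount frozen, which is exactly the condition IsBlocking() looks for.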
Suspending a coroutine: static SuspendEntry Suspend()
There are two variants. The first suspends the coroutine directly: it sets the coroutine's state to TaskState::block, removes it from runnableQueue_, and adds it to waitQueue_.
The second, Suspend(FastSteadyClock::duration dur), does everything the first variant does and additionally arranges for the coroutine to be woken automatically after the given duration.
Wakeup
Wakes up a coroutine.
Waking a coroutine means removing it from waitQueue_ and adding it back to newQueue_.
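The queue moves performed by Suspend() and Wakeup() can be sketched with task ids. SuspendModel is hypothetical and leaves out SuspendEntry, locking and the real timer: the timed variant is modelled by recording a deadline and having an OnTimer() call wake every task whose deadline has passed. Returning false from a second Wakeup() is an assumption of the sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>

enum class TaskState { runnable, block, done };

// Queue bookkeeping for suspend/wake-up, modelled with task ids.
// A sketch of the behaviour described above, not libgo's implementation.
struct SuspendModel {
    std::deque<int> runnableQueue, waitQueue, newQueue;
    std::map<int, TaskState> state;
    std::map<int, int64_t> wakeAt;  // timed suspensions: id -> deadline (ms)

    static bool Remove(std::deque<int>& q, int id) {
        auto it = std::find(q.begin(), q.end(), id);
        if (it == q.end()) return false;
        q.erase(it);
        return true;
    }

    // Suspend(): mark the task blocked and park it in waitQueue.
    void Suspend(int id) {
        state[id] = TaskState::block;
        Remove(runnableQueue, id);
        waitQueue.push_back(id);
    }

    // Suspend(dur): the same, plus a deadline after which it wakes by itself.
    void Suspend(int id, int64_t nowMs, int64_t durMs) {
        Suspend(id);
        wakeAt[id] = nowMs + durMs;
    }

    // Wakeup(): move the task from waitQueue back to newQueue. The executor
    // resets its state to runnable when it next switches the task in.
    bool Wakeup(int id) {
        if (!Remove(waitQueue, id)) return false;  // not waiting: ignore
        wakeAt.erase(id);
        newQueue.push_back(id);
        return true;
    }

    // Stand-in for the timer: wake every task whose deadline has passed.
    void OnTimer(int64_t nowMs) {
        for (auto it = wakeAt.begin(); it != wakeAt.end();) {
            int id = it->first;
            int64_t deadline = it->second;
            ++it;  // advance first, because Wakeup() erases this entry
            if (deadline <= nowMs) Wakeup(id);
        }
    }
};
```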
StaticCoYield
Switches the current coroutine out of its executor.
There are two cases: either the coroutine blocks and must be suspended, or the coroutine has finished executing and switches itself out.
It works by getting the current executor's running Task and calling that Task's SwapOut() method.

    ALWAYS_INLINE void Processer::StaticCoYield()
    {
        auto proc = GetCurrentProcesser();
        if (proc) proc->CoYield();
    }

    ALWAYS_INLINE void Processer::CoYield()
    {
        Task *tk = GetCurrentTask();
        assert(tk);

        ++tk->yieldCount_;

    #if ENABLE_DEBUGGER
        DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_));
        if (Listener::GetTaskListener())
            Listener::GetTaskListener()->onSwapOut(tk->id_);
    #endif

        tk->SwapOut();
    }

Points to note

> A coroutine's context may be switched out when: the coroutine is suspended; the coroutine finishes executing; or the user explicitly yields with co_yield:

    #define co_yield do { ::co::Processer::StaticCoYield(); } while (0)

> A coroutine is suspended when: it calls a system function hooked by libgo, namely libgo_poll (which the hooked I/O functions call), select, sleep, usleep or nanosleep; or it uses the coroutine mutex CoMutex (co_mutex), the coroutine read-write lock CoRWMutex (co_rwmutex), or a channel.

> A coroutine's context is switched in only while the executor is running Process(). Waking a suspended coroutine does not switch into its context; it only moves the coroutine from the waiting queue back to newQueue_.

3. Coroutine object: struct Task

    // Coroutine state
    enum class TaskState
    {
        runnable,  // runnable
        block,     // blocked
        done,      // finished running
    };

    // Coroutine function type, provided by C++11 std::function
    typedef std::function<void()> TaskF;

    struct Task
    {
        TaskState state_ = TaskState::runnable;
        uint64_t id_;                // id under the current scheduler, starting from 0
        TaskF fn_;                   // the function the coroutine runs
        Context ctx_;                // context information
        uint64_t yieldCount_ = 0;    // number of times the coroutine has switched out
        Processer* proc_ = nullptr;  // the executor this coroutine belongs to

        // Three functions: switch in, switch to the specified coroutine, switch out
        ALWAYS_INLINE void SwapIn();
        ALWAYS_INLINE void SwapTo(Task* other);
        ALWAYS_INLINE void SwapOut();

    private:
        // Takes a Task* as its argument and runs that Task's fn_(). When fn_() returns,
        // the coroutine's state becomes TaskState::done and it switches out to executor P.
        static void StaticRun(intptr_t vp);
    };

Each Task object is a coroutine. Creating a coroutine means creating a Task object and adding it to the corresponding executor P. As mentioned earlier, the executor schedules coroutines through a state machine, and TaskState is the coroutine's state. The coroutine function fn_ is called from the static method StaticRun, which is registered in the coroutine context ctx_.
In addition, Task provides the methods that switch a coroutine in and out; in essence, they perform a context switch.
StaticRun
Drives the execution of the coroutine by calling Task::Run() internally. After fn_ returns, Run() sets the coroutine's state to TaskState::done and switches the coroutine out.

    void Task::Run()
    {
        auto call_fn = [this]() {
            this->fn_();
            this->fn_ = TaskF();  // destroy the coroutine's function object inside the coroutine as well
        };
        // ...
        call_fn();
        // ...
        state_ = TaskState::done;
        Processer::StaticCoYield();
    }

    void Task::StaticRun(intptr_t vp)
    {
        Task* tk = (Task*)vp;
        tk->Run();
    }

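Why Run() resets fn_ before marking the task done can be shown with a small model. TaskModel is hypothetical, and its onYield callback stands in for Processer::StaticCoYield(). The check is that everything captured by the coroutine function has already been destroyed by the time control is handed back to the executor, so that destruction runs on the coroutine rather than later on the executor.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>

enum class TaskState { runnable, block, done };

// Minimal model of Task::Run()/StaticRun(); not libgo's Task.
struct TaskModel {
    using TaskF = std::function<void()>;
    TaskF fn_;
    TaskState state_ = TaskState::runnable;
    std::function<void()> onYield;  // stands in for Processer::StaticCoYield()

    void Run() {
        auto call_fn = [this]() {
            this->fn_();
            this->fn_ = TaskF();  // destroy the function object (and its captures) here
        };
        call_fn();
        state_ = TaskState::done;
        if (onYield) onYield();   // hand control back to the executor
    }

    // Same shape as Task::StaticRun: the Task pointer travels as an integer.
    static void StaticRun(std::intptr_t vp) {
        TaskModel* tk = reinterpret_cast<TaskModel*>(vp);
        tk->Run();
    }
};
```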
That concludes the description of libgo's scheduling implementation. The timer and clock parts were skipped and will be described separately later. The code discussed in this article is in the following source files:
libgo-master/libgo/scheduler/processer.cpp
libgo-master/libgo/scheduler/processer.h
libgo-master/libgo/scheduler/scheduler.cpp
libgo-master/libgo/scheduler/scheduler.h
Interested readers can study the source code further. Discussion is welcome.
© 2024 shulou.com SLNews company. All rights reserved.