Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use architecture for Libtask source code analysis of protocol library

2025-01-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

This article mainly explains how to use the architecture of the Libtask source code analysis of the protocol library. The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn how to use the architecture of the Libtask source code analysis of the protocol library.

Libtask is written by google boss Russ Cox (the core developer of Go). This article introduces the basic principles of libtask. Let's start with the main function of libtask, which is the c function we use in the c language. Libtask itself implements the main function. When users use libtask, they want to implement the taskmain function. The function declarations of taskmain and main are the same. Let's take a look at the main function.

Int main (int argc, char * * argv) {struct sigaction sa, osa; / / register the SIGQUIT signal processing function memset (& sa, 0, sizeof sa); sa.sa_handler = taskinfo; sa.sa_flags = SA_RESTART; sigaction (SIGQUIT, & sa, & osa); / / Save the command line argument argv0 = argv [0]; taskargc = argc; taskargv = argv If (mainstacksize = = 0) mainstacksize = 256 / 1024; / / create the first taskcreate (taskmainstart, nil, mainstacksize); / / start scheduling taskscheduler (); fprint (2, "taskscheduler returned in main!\ n"); abort (); return 0;}

The two main logic of the main function are the taskcreate and taskscheduler functions. Let's take a look at taskcreate first.

Int taskcreate (void (* fn) (void*), void* arg, uint stack) {int id; Task * t; t = taskalloc (fn, arg, stack); taskcount++; id = t-> id; / / record location t-> alltaskslot = nalltask; / / save to alltask alltask [nalltask++] = t / / modify the state to be ready, can be scheduled, and join the ready queue taskready (t); return id;}

Taskcreate first calls taskalloc to assign a structure Task that represents the protocol. Let's look at the definition of this structure.

Struct Task {char name [256]; / / offset known to acid char state [256]; / / Front and back pointer Task * next; Task * prev; Task * allnext; Task * allprev; / / execution context Context context; / / Sleep time uvlong alarmtime; uint id; / / Stack information uchar * stk Uint stksize; / / whether int exiting; / / the index int alltaskslot; / / in alltask is the system protocol int system; / / ready state int ready; / / entry function void (* startfn) (void*); / / entry parameter void* startarg / / Custom data void * udata;}

Then take a look at the implementation of taskalloc.

/ / allocate memory needed for a co-program and initialize some fields static Task* taskalloc (void (* fn) (void*), void* arg, uint stack) {Task* t; sigset_t zero; uint x, y; ulong z; / * allocate the task and stack together * / / structure itself size + stack size t = malloc (sizeof * t+stack); memset (t, 0, sizeof * t) / / memory location of the stack t-> stk = (uchar*) (tsp 1); / / stack size t-> stksize = stack; / / id t-> id = + + taskidgen; / / co-programming working functions and parameters t-> startfn = fn; t-> startarg = arg; / * do a reasonable initialization * / memset (& t-> context.uc, 0, sizeof t-> context.uc); sigemptyset (& zero) / / the initialization uc_sigmask field is empty, that is, the signal sigprocmask (SIG_BLOCK, & zero, & t-> context.uc.uc_sigmask) is not blocked; / * must initialize with current context * / / initialize the uc field getcontext (& t-> context.uc) / / set the stack position and size of the protocol execution t-> context.uc.uc_stack.ss_sp = t-> stk+8 T-> context.uc.uc_stack.ss_size = t-> stksize-64; z = (ulong) t; y = z; z > > = 16; / * hide undefined 32-bit shift from 32-bit compilers * / x = z > > 16; / / Save the information to the uc field makecontext (& t-> context.uc, (void (*) () taskstart, 2, y, x); return t;}

The taskalloc function code looks like a lot, but the logic is not complicated, just apply for the memory needed by the Task structure and the memory of the stack at execution time, and then initialize the fields. In this way, a cooperative process is born. Then execute taskready to add the co-program to the ready queue.

/ / modify the state of the protocol to be ready and join the ready queue void taskready (Task * t) {t-> ready = 1; addtask (& taskrunqueue, t)} / / insert the protocol into the queue. If it was previously in another queue, void addtask (Tasklist * l, Task * t) {if (l-> tail) {1-> tail- > next = t; t-> prev = 1-> tail will be removed } else {l-> head = t; t-> prev = nil;} l-> tail = t; t-> next = nil;}

Taskrunqueue records all ready collaborations. After the co-program is created and queued, the co-program has not yet started execution, just like the processes and threads of the operating system, a scheduler is needed to schedule execution. Let's take a look at the implementation of the scheduler.

/ / static void taskscheduler (void) {int i; Task * t; for (;;) {/ / exit if (taskcount = = 0) exit (taskexitval) if there is no user protocol; / / pull out a taskrunqueue.head from the ready queue If (t = = nil) {fprint (2, "no runnable tasks!% d tasks stalled\ n", taskcount); exit (1);} / remove the deltask (& taskrunqueue, t) from the ready queue; t-> ready = 0; / / Save the executing taskrunning = t / / the number of switches plus one tasknswitch++; / / switch to t execution, and save the current context to taskschedcontext (that is, the code to be executed below) contextswitch (& taskschedcontext, & t-> context); / / until this indicates that no co-program is executing (t switch back), leave taskrunning = nil / / if the co-program t just executed exits if (t-> exiting) {/ / is not a system co-program, then the number of if (! t-> system) taskcount--; / / the index of the current co-program in alltask I = t-> alltaskslot / / change the last nalltask to the location of the current one, because he is exiting alltask [I] = alltask [--nalltask]; / / updating the index alltask of the replaced protocol [I]-> alltaskslot = I; / / freeing heap memory free (t);}

The code of the scheduler looks like a lot, but the core logic takes a co-program t from the ready queue and moves t out of the ready queue 2 to the co-program t through contextswitch to execute 3 co-program t to switch back to the scheduling center, if t has exited, modify the data structure, and then reclaim the memory he occupies. If t does not exit, continue to schedule other co-program execution. At this point, Xiecheng began to run. And there is also a dispatching system. The scheduling mechanism here is relatively simple, that is, it is ready to schedule according to the first-in-first-out mode, and it is non-preemptive. In other words, there is no concept of scheduling according to time slices, the execution time of a cooperative program is determined by itself, and the power to give up execution is also controlled by itself. when the cooperative program does not want to execute, it can call taskyield to give up cpu.

/ / cpu int taskyield (void) {int n; / / number of times the current handover protocol n = tasknswitch; / / insert into the ready queue, waiting for the subsequent scheduling taskready (taskrunning); taskstate ("yield"); / / switching protocol taskswitch () / / equal to 0 means there is currently only one tasknswitch, so here minus one return tasknswitch-n-1;}} / * switch the protocol, taskrunning is the context of the scheduling program (main thread), switch to the scheduling center, and keep the current context to taskrunning- > context * / void taskswitch (void) {needstack (0) Contextswitch (& taskrunning- > context, & taskschedcontext);} / / logical static void contextswitch for switching protocols (Context * from, Context * to) {if (& from- > uc, & to- > uc)

< 0){ fprint(2, "swapcontext failed: %r\n"); assert(0); } } yield的逻辑也很简单,因为协程在执行的时候,是不处于就绪队列的,当协程准备让出cpu时,协程首先把自己重新加入到就绪队列,等待下次被调度执行。当然我们也可以直接调度contextswitch切换到其他协程。重点在于什么时候应该让出cpu,又什么时候应该被调度执行。接下来会详细讲解。至此,我们已经有了支持协程所需要的底层基础。我们看到这个实现的思路也不是很复杂,首先有一个队列表示待执行的的协程,每一个协程对应一个Task结构体。然后调度中心不断地按照先进先出的方式去调度协程的执行就可以。因为没有抢占机制,所以调度中心是依赖协程本身去驱动的,协程需要主动让出cpu,把上下文切换回调度中心,调度中心才能进行下一轮的调度。接下来我们看看,基于这些底层基础,如果实现一个基于协程的服务器。下面我们通过一个例子进行讲解。 void taskmain(int argc, char **argv) { // 启动一个tcp服务器 if((fd = netannounce(TCP, 0, atoi(argv[1]))) < 0){ // ... } // 改为非阻塞模式 fdnoblock(fd); // accept成功后创建一个客户端协程 while((cfd = netaccept(fd, remote, &rport)) >

= 0) {taskcreate (proxytask, (void*) cfd, STACK);}}

We just said that taskmain is the function we need to implement. First, set up a tcp server through netannounce. Then change fd to non-blocking, which is very important, because when you call accept later, if it is a blocking file descriptor, it will cause the process to hang, but in non-blocking mode, the operating system will return the error code of EAGAIN, through which we can decide what to do next. Let's look at the implementation of netaccept.

/ / process (remove) connect int netaccept (int fd, char * server, int * port) {int cfd, one; struct sockaddr_in sa; uchar * ip; socklen_t len; / / register the event to epoll, wait for the event to trigger fdwait (fd,'r'); len = sizeof sa / / if there is a connection when triggered, execute accept if ((cfd = accept (fd, (void*) & sa, & len)

< 0){ return -1; } // 和客户端通信的fd也改成非阻塞模式 fdnoblock(cfd); one = 1; setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof one); return cfd; } netaccept就是通过调用accept逐个处理tcp连接,但是在accept之前,有一个非常重要的操作fdwait。 // 协程因为等待io需要切换 void fdwait(int fd, int rw) { // 是否已经初始化epoll if(!startedfdtask){ startedfdtask = 1; epfd = epoll_create(1); // 没有初始化则创建一个协程,做io管理 taskcreate(fdtask, 0, 32768); } struct epoll_event ev = {0}; // 记录事件对应的协程和感兴趣的事件 ev.data.ptr = taskrunning; switch(rw){ case 'r': ev.events |= EPOLLIN | EPOLLPRI; break; case 'w': ev.events |= EPOLLOUT; break; } int r = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); // 切换到其他协程,等待被唤醒 taskswitch(); // 唤醒后函数刚才注册的事件 epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev); } fdwait首先把fd注册到epoll中,然后把协程切换到下一个待执行的协程。这里有个细节,当协程X被调度执行的时候,他是脱离了就绪队列的,而taskswitch函数只是实现了切换上下文到调度中心,调度中心会从就绪队列从选择下一个协程执行,那么这时候,脱离就绪队列的协程X就处于孤岛状态,看起来再也无法给调度中心选中执行,这个问题的处理方式是,把协程、fd和感兴趣的事件信息一起注册到epoll中,当epoll监听到某个fd的事件发生时,就会把对应的协程加入就绪队列,这样协程就可以被调度执行了。在fdwait函数一开始那里处理了epoll相关的逻辑。epoll的逻辑也是在一个协程中执行的,但是epoll所在协程和一般协程不一样,类似于操作系统的内核线程一样,epoll所在的协程成为系统协程,即不是用户定义的,而是系统定义的。我们看一下实现 void fdtask(void *v) { int i, ms; Task *t; uvlong now; // 变成系统协程 tasksystem(); struct epoll_event events[1000]; for(;;){ /* let everyone else run */ // 大于0说明还有其他就绪协程可执行,则先让给他们执行,否则往下执行 while(taskyield() >

0); / * we're the only one runnable-poll for iUnip o * / errno = 0; / / blocking if ((t=sleeping.head) = = nil) ms =-1 if there is no timing event; else {/ * sleep at most 5s * / now = nsec () If (now > = t-> alarmtime) ms = 0; else if (now+5*1000*1000*1000LL > = t-> alarmtime) ms = (t-> alarmtime-now) / 1000000; else ms = 5000;} int nevents / / wait for the event to occur. Ms is the timeout if for waiting ((nevents = epoll_wait (epfd, events, 1000, ms)

< 0){ if(errno == EINTR) continue; fprint(2, "epoll: %s\n", strerror(errno)); taskexitall(0); } /* wake up the guys who deserve it */ // 事件触发,把对应协程插入就绪队列 for(i=0; i= t->

Alarmtime) {deltask (& sleeping, t); if (! t-> system & &-- sleepingcounted = = 0) taskcount--; taskready (t);}

We see that the processing logic of epoll is similar to that of a normal server, blocking through epoll_wait, and then processing every event that occurs when epoll_wait returns, and libtask also supports timeout events. In addition, when there are other ready cooperators in libtask, it will not enter epoll_wait, it will give cpu to the ready co-program (through the taskyield function), and when the ready queue has only the co-program where epoll is located, it will enter the logic of epoll. At this point, we have seen how to turn asynchrony into synchronization in libtask. When a user wants to call an interface that may cause a process to hang, he can call a corresponding API provided by libtask. For example, if we want to read a file, we can call libtask's fdread.

Int fdread (int fd, void * buf, int n) {int m; / / non-blocking read. Register to epoll if not satisfied. Refer to fdread1 while ((m=read (fd, buf, n)) < 0 & & errno = = EAGAIN) fdwait (fd,'r'); return m;}

In this way, there is no need to worry about the process being suspended, and there is no need to deal with epoll-related logic (registration events, handling when events are triggered, etc.). The way for libtask to switch from asynchronous to synchronous is to first register the user's fd in the epoll by providing the corresponding API, and then switch to another protocol. When the epoll listens to the event, the corresponding protocol will be inserted into the ready queue. When the protocol is selected by the dispatch center for execution, it will continue to execute the remaining logic without causing the process to suspend, because the waiting conditions have been met.

Thank you for reading, the above is the content of "how to use the architecture of the Libtask source code analysis of the protocol library". After the study of this article, I believe you have a deeper understanding of how to use the architecture of the Libtask source code analysis of the protocol library, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report