In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)06/03 Report--
This article will introduce some important details of the server architecture and server programming of a project I have worked on.
I. the running environment of the program
Operating system: centos 7.0
Compiler: gcc/g++ 4.8.3 cmake 2.8.11
Mysql database: 5.5.47
Project code management tool: VS2013
I. Program structure
The program has a total of 17 threads, which are divided into nine database worker threads D and one log thread, six ordinary worker threads W and one main thread M. (these letters will be used to refer to these threads.)
(1) the use of database worker threads
Nine database worker threads establish connections with mysql at the beginning of thread startup, that is, each thread maintains a connection with mysql, with a total of nine database connections.
There are two task queues in each database worker thread at the same time, the first queue A stores the task sqlTask which needs to perform the database addition, deletion and modification operation, and the second queue B stores the results after the completion of sqlTask execution. Immediately after the sqlTask execution is completed, it is put into the result queue, so the tasks in the result queue are also tasks that need to be executed one by one. The pseudo code is roughly as follows:
Void db_thread_func () {while (! m_bExit) {if (NULL! = (pTask = m_sqlTask.Pop () {/ / after the task fetched from m_sqlTask is completed, pTask will carry the result data pTask- > Execute () / / immediately put the task in the result task queue m_resultTask.Push (pTask); continue;} sleep (1000);}}
Here comes the question:
Where do the tasks in task queue A come from? at present, there are only consumers, but there are no producers, so who is the producer?
Where will the tasks in Task queue B go? currently, there are only producers and no consumers.
Put these two questions aside for a while, and then I'll answer them later.
(II) worker thread and main thread
When introducing what the main thread and worker thread do, we introduce several concepts that are often abstracted from server programming (here take tcp connection as an example):
TcpServer is a Tcp service, and the server needs to bind the ip address and port number and listen for client connections on that port number (often a member variable TcpListener manages the listening details). So that's what a TcpServer needs to do. In addition, whenever a new connection arrives, TcpServer needs to receive new connections, and when multiple new connections exist, TcpServer needs to methodically manage these connections: connection establishment, disconnection, and so on, that is, generating and managing TcpConnection objects described below.
two。 A connection corresponds to a TcpConnection object, and the TcpConnection object manages some information about the connection: connection status, local and peer ip addresses and port numbers, and so on.
3. The data channel object Channel,Channel records the handle of socket, so it is the real executor of data sending and receiving on the connection, and the Channel object is generally used as a member variable of TcpConnection.
TcpSession object is to unpack the data collected by Channel, or pack the prepared data and send it to Channel.
To sum up: a TcpServer relies on TcpListener to listen and process new connections, relies on TcpConnection objects to manage the data on the connection, TcpConnection actually relies on Channel to send and receive data, and relies on TcpSession to pack and unpack data. That is to say, there is one TcpListener for a TcpServer, corresponding to multiple TcpConnection. There are several TcpSession for every TcpConnection, and there are only several Channel at the same time.
The TcpServer, TcpListener, TcpConnection, Channel, and TcpSession mentioned above are the network layer of the server framework. A good network framework should be decoupled from the business code. That is, the upper code only needs to get the data and execute the business logic, without paying attention to the sending and receiving of data, the packet and unpacking of network packets and the change of network state (such as network disconnection and reconnection).
Take the transmission of data, for example:
When the business logic gives the data to TcpSession,TcpSession to pack the data (there can be some encryption or compression operations after the packaging process), it is handed over to TcpConnection::SendData (), while TcpConnection::SendData () actually calls Channel::SendData (), because Channel contains a socket handle, so Channel::SendData () actually calls the send () / sendto () / write () method to send the data out.
For data reception, there is a slight difference:
After determining which TcpConnection has data arrival through IO multiplex technologies such as select () / poll () / epoll (), activate the Channel object of the TcpConnection to call recv () / recvfrom () / read () to receive the data. After the data is received, the data will be processed by TcpSession and finally handed over to the business layer. Note that data collection, unpacking and even handing over to the business layer must be separated. What I mean is: it's best not to unpack and give it to the business layer and put it together with the logic of data collection. Because data collection is an IO operation, while unpacking and handing over to the business layer are logical computing operations. IO operations are generally slower than logical calculations. How to make a choice according to the server business? that is to say, if you want to decide whether the performance bottleneck of your server program is network IO or logical calculation, even if it is network IO, it can be divided into uplink operation and downlink operation. Uplink operation means that the client sends data to the server, and downlink means that the server sends data to the client. Sometimes the upstream of the data is less and the downlink is large. (for example, the game server, a npc moves the location, the upstream is the client notifies the server of its own latest location, and the downlink is the server to tell each client present.)
In my blog "Server-side programming experience (1)-- the division of labor between the main thread and the worker thread", the flow of the worker thread is introduced:
While (! m_bQuit) {epoll_or_select_func (); handle_io_events (); handle_other_things ();}
Among them, epoll_or_select_func () is to determine which TcpConnection has data arrival through IO multiplex technologies such as select () / poll () / epoll () mentioned above. My server code generally monitors only socket readable events, not socket writable events. As for how to send data, it will be described later in the article. So for readable events, take epoll as an example, the identity bits that need to be set here are:
EPOLLIN normal readable event (when this event is generated when the connection is normal, the recv () / read () function returns the number of bytes received; when the connection is closed, the two functions return 0, which means that we can set this flag to detect new data and peer shutdown events)
EPOLLRDHUP peer shutdown event (it is said in the linux man manual that this event can monitor peer shutdown, but when I actually debug, it does not trigger this event even if peer shutdown is sent. It is still EPOLLIN, but if the recv () / read () function is called at this time, the return value will be 0, so it remains to be verified whether peer shutdown can be monitored by setting this flag in the actual project)
EPOLLPRI out-of-band data
In muduo, the timeout event for epoll_wait is set to 1 millisecond, and my other project sets the epoll_wait timeout to 10 milliseconds. These two values are for your reference.
In this project, both the worker thread and the main thread are the logic in the above code, and the main thread listens for readable events on the socket, that is, to monitor whether a new connection is coming. There is an epollfd on the main thread and on each worker thread. If a new connection comes, it is received in the handle_io_events () of the main thread. Which thread's epollfd is the newly connected socket handle attached to? The approach taken here is the round-robin algorithm, where there is an object CWorkerThreadManager that records the work status on each worker thread. The pseudo code is roughly as follows:
Void attach_new_fd (int newsocketfd) {workerthread = get_next_worker_thread (next); workerthread.attach_to_epollfd (newsocketfd); + + next; if (next > max_worker_thread_num) next = 0;}
That is, start with the epollfd of the first worker thread to hook up the newcomer socket, and then accumulate the index, so that the next time it is the second worker thread. If the number of worker threads is exceeded, restart from the first job. The problem of "load balancing" for new connections to socket is solved here. There is another detail to pay attention to in the actual code: how many struct epoll_event should be set in the function of epoll_wait to be reasonable? The concern is that more waste, less is not enough, I once used 4096 directly in a project:
Const int EPOLL_MAX_EVENTS = 4096th Const int dwSelectTimeout = 10000 struct epoll_event events [Epoll _ MAX_EVENTS]; int nfds = epoll_wait (m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeout / 1000)
I found in Chen Shuo's muduo network library that the author used a better idea, that is, to dynamically expand the number: at first, it was n. When the number of fd with events reached n, the number of struct epoll_event was adjusted to 2n. Next time, if it was not enough, it became 4N. And so on, the author skillfully made use of the continuity of stl::vector in memory to achieve this idea:
/ / initialization code std::vector events_ (16); / / Code in the thread loop while (m_bExit) {int numEvents =:: epoll_wait (epollfd_, & * events_.begin (), static_cast (events_.size ()), 1); if (numEvents > 0) {if (static_cast (numEvents) = = events_.size ()) {events_.resize (events_.size () * 2) }}}
At this point, you may think that all the worker thread does is call handle_io_events () to receive network data, but in fact, the worker thread can also do some work on the business logic of the program. It's in handle_other_things (). So how do you add this work to handle_other_things ()? Write a queue, put the task in the queue first, and then let handle_other_things () take it out of the queue to do? I also borrowed from the muduo library in this project. That is, a series of function pointers are called in handle_other_things (), with the pseudo code as follows:
Void do_other_things () {somefunc ();} / / m_functors is a stl::vector, where each element is a function pointer void somefunc () {for (size_t I = 0; I).
< m_functors.size(); ++i) { m_functors[i](); } m_functors.clear();} 当任务产生时,只要我们将执行任务的函数push_back到m_functors这个stl::vector对象中即可。但是问题来了,如果是其他线程产生的任务,两个线程同时操作m_functors,必然要加锁,这也会影响效率。muduo是这样做的: void add_task(const Functor& cb){ std::unique_lock lock(mutex_); m_functors.push_back(cb); }void do_task(){ std::vector functors; { std::unique_lock lock(mutex_); functors.swap(m_functors); } for (size_t i = 0; i < functors.size(); ++i) { functors[i](); }} 看到没有,利用一个栈变量functors将m_functors中的任务函数指针倒换(swap)过来了,这样大大减小了对m_functors操作时的加锁粒度。前后变化:变化前,相当于原来A给B多少东西,B消耗多少,A给的时候,B不能消耗;B消耗的时候A不能给。现在变成A将东西放到篮子里面去,B从篮子里面拿,B如果拿去一部分后,只有消耗完了才会来拿,或者A通知B去篮子里面拿,而B忙碌时,A是不会通知B来拿,这个时候A只管将东西放在篮子里面就可以了。 bool bBusy = false;void add_task(const Functor& cb){ std::unique_lock lock(mutex_); m_functors_.push_back(cb); //B不忙碌时只管往篮子里面加,不要通知B if (!bBusy) { wakeup_to_do_task(); }}void do_task(){ bBusy = true; std::vector functors; { std::unique_lock lock(mutex_); functors.swap(pendingFunctors_); } for (size_t i = 0; i < functors.size(); ++i) { functors[i](); } bBusy = false;} 看,多巧妙的做法! 因为每个工作线程都存在一个m_functors,现在问题来了,如何将产生的任务均衡地分配给每个工作线程。这个做法类似上文中如何将新连接的socket句柄挂载到工作线程的epollfd上,也是round-robin算法。上文已经描述,此处不再赘述。 还有种情况,就是希望任务产生时,工作线程能够立马执行这些任务,而不是等epoll_wait超时返回之后。这个时候的做法,就是使用一些技巧唤醒epoll_wait,linux系统可以使用socketpair或timerevent、eventfd等技巧(这个细节在我的博文《服务器端编程心得(一)-- 主线程与工作线程的分工》已经详细介绍过了)。 上文中留下三个问题: 数据库线程任务队列A中的任务从何而来,目前只有消费者,没有生产者,那么生产者是谁? 2.数据库线程任务队列B中的任务将去何方,目前只有生产者没有消费者。 3.业务层的数据如何发送出去? 问题1的答案是:业务层产生任务可能会交给数据库任务队列A,这里的业务层代码可能就是工作线程中do_other_things()函数执行体中的调用。至于交给这个9个数据库线程的哪一个的任务队列,同样采用了round-robin算法。所以就存在一个对象CDbThreadManager来管理这九个数据库线程。下面的伪码是向数据库工作线程中加入任务: bool CDbThreadManager::AddTask(IMysqlTask* poTask ){ if (m_index >= m_dwThreadsCount) {m_index = 0;} return m _ aoMysqlThreads [m _ index++] .AddTask (poTask);}
Similarly, the consumer in question 2 may also be a call in the execution body of the do_other_things () function.
Now for question 3, after the data in the business layer is generated and packaged by TcpSession, if you need to send it, generate the do_other_things () that the task throws to the worker thread, and then send it in the relevant Channel. Because the writable events on the socket are not monitored, the data may block when you call send () or write (). It doesn't matter, sleep () for a while, continue to send, keep trying, until the data is sent. The pseudo codes are as follows:
Bool Channel::Send () {int offset = 0; while (true) {int n =:: send (socketfd, buf + offset, length-offset); if (n =-1) {if (errno = = EWOULDBLOCK) {:: sleep }} / / the other party closes socket, and it is recommended that else if (n = = 0) {close (socketfd); return false;} offset + = n; if (offset > = length) break;} return true;}
Finally, there is one logging thread that has not been introduced, and high-performance logging implementations are not common at the moment. Limited to the length of the article, we will introduce it next time.
Zhangyl 12:35 in the evening of December 02, 2016
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.