Multithreaded Programming: Implementing a Thread Pool
Separating the execution components from the tasks: the thread pool
https://github.com/wangbojing/threadpool
Multithreading mainly addresses how work is executed across processor units: it can significantly reduce processor idle time and increase throughput. A thread pool is an essential component of multithreaded programming, yet to many programmers it remains opaque, even mysterious. I am glad to have the chance to explain how it works here; there are certainly shortcomings, and criticism and corrections are welcome.
Conceptually, a thread pool is a tool for managing a group of task-executing threads; as a management tool, it has to manage both the tasks and their execution. As shown in Figure 1, the component topology of a thread pool consists of three parts: the execution queue (Workers), the task queue (Jobs), and the pool manager (Pool Manager).
The execution queue (Workers) stores the running worker threads.
The task queue (Jobs) stores the tasks waiting to be executed.
The pool manager (Pool Manager) mainly controls the execution order of the worker queue, limits how long a task may run, releases execution units that have been idle for a long time, and adds execution units in time when the existing ones are running at full load; it also records the number of pending tasks, enqueues new tasks, and dequeues tasks for execution.
Figure 1: Topology of the thread pool components
What are the elements of each execution unit (Worker) in the execution queue (Workers)? A thread ID and an exit flag.
What are the elements of each task (Job) in the task queue (Jobs)? The function that executes the task and the arguments passed to it.
What are the elements of the pool manager (Pool Manager)? A mutex that protects adding new tasks and removing them for execution, and a condition variable on which each idle thread waits while it is suspended.
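Before looking at the full listing, here is a minimal sketch of how that mutex and condition variable cooperate: the pool manager pushes a task and signals, while a suspended worker wakes up and pops it. The names here (struct job, queue_push, worker_wait_pop) are illustrative only; the real implementation follows below.

#include <pthread.h>
#include <stddef.h>

struct job {
    void (*func)(void *);            /* task function */
    void *arg;                       /* task argument */
    struct job *next;
};

static pthread_mutex_t jobs_mtx  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  jobs_cond = PTHREAD_COND_INITIALIZER;
static struct job *jobs = NULL;      /* head of the task queue */

/* Pool-manager side: enqueue a task and wake one suspended worker. */
void queue_push(struct job *j) {
    pthread_mutex_lock(&jobs_mtx);
    j->next = jobs;
    jobs = j;
    pthread_cond_signal(&jobs_cond);
    pthread_mutex_unlock(&jobs_mtx);
}

/* Worker side: sleep on the condition variable until a task is available. */
struct job *worker_wait_pop(void) {
    pthread_mutex_lock(&jobs_mtx);
    while (jobs == NULL)
        pthread_cond_wait(&jobs_cond, &jobs_mtx);
    struct job *j = jobs;
    jobs = j->next;
    pthread_mutex_unlock(&jobs_mtx);
    return j;
}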
From this analysis we arrive at the class diagram of the thread pool shown in Figure 2.
Figure 2: Class diagram of the thread pool
At this point a simple thread pool practically writes itself. The implementation code follows.
/*
 * Author: WangBoJing
 * email: 1989wangbojing@gmail.com
 * github: https://github.com/wangbojing
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

/* Push an item onto the head of a doubly linked list. */
#define LL_ADD(item, list) do {       \
    item->prev = NULL;                \
    item->next = list;                \
    list = item;                      \
} while (0)

/* Unlink an item from a doubly linked list. */
#define LL_REMOVE(item, list) do {                            \
    if (item->prev != NULL) item->prev->next = item->next;    \
    if (item->next != NULL) item->next->prev = item->prev;    \
    if (list == item) list = item->next;                      \
    item->prev = item->next = NULL;                           \
} while (0)

typedef void (*JOB_CALLBACK)(void *);

struct NTHREADPOOL;

/* Execution unit: one worker thread. */
typedef struct NWORKER {
    pthread_t thread;
    int terminate;
    struct NTHREADPOOL *pool;
    struct NWORKER *next;
    struct NWORKER *prev;
} nWorker;

/* Task: the function to run and its argument. */
typedef struct NJOB {
    JOB_CALLBACK job_func;
    void *arg;
    struct NJOB *next;
    struct NJOB *prev;
} nJob;

/* Pool manager: worker list, job list, and their synchronization. */
typedef struct NTHREADPOOL {
    struct NWORKER *workers;
    struct NJOB *jobs;
    pthread_mutex_t jobs_mtx;
    pthread_cond_t jobs_cond;
} nThreadPool;

void *ntyWorkerThread(void *arg) {
    nWorker *worker = (nWorker *)arg;

    while (1) {
        pthread_mutex_lock(&worker->pool->jobs_mtx);

        /* Sleep on the condition variable until a job arrives. */
        while (worker->pool->jobs == NULL) {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mtx);
        }

        if (worker->terminate) {
            pthread_mutex_unlock(&worker->pool->jobs_mtx);
            break;
        }

        nJob *job = worker->pool->jobs;
        if (job != NULL) {
            LL_REMOVE(job, worker->pool->jobs);
        }

        pthread_mutex_unlock(&worker->pool->jobs_mtx);

        if (job == NULL) continue;

        job->job_func(job);

        usleep(1);
    }

    free(worker);
    pthread_exit(NULL);
}

int ntyThreadPoolCreate(nThreadPool *pool, int numWorkers) {
    if (pool == NULL) return 1;
    if (numWorkers < 1) numWorkers = 1;
    memset(pool, 0, sizeof(nThreadPool));

    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->jobs_cond, &blank_cond, sizeof(pool->jobs_cond));

    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->jobs_mtx, &blank_mutex, sizeof(pool->jobs_mtx));

    int i = 0;
    for (i = 0; i < numWorkers; i++) {
        nWorker *worker = (nWorker *)malloc(sizeof(nWorker));
        if (worker == NULL) {
            perror("malloc");
            return 1;
        }
        memset(worker, 0, sizeof(nWorker));

        worker->pool = pool;

        int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
        if (ret) {
            perror("pthread_create");
            free(worker);
            return 1;
        }

        LL_ADD(worker, worker->pool->workers);
    }

    return 0;
}

void ntyThreadPoolShutdown(nThreadPool *pool) {
    nWorker *worker = NULL;
    for (worker = pool->workers; worker != NULL; worker = worker->next) {
        worker->terminate = 1;
    }

    pthread_mutex_lock(&pool->jobs_mtx);

    pool->workers = NULL;
    pool->jobs = NULL;

    pthread_cond_broadcast(&pool->jobs_cond);

    pthread_mutex_unlock(&pool->jobs_mtx);
}

void ntyThreadPoolPush(nThreadPool *pool, nJob *job) {
    pthread_mutex_lock(&pool->jobs_mtx);

    LL_ADD(job, pool->jobs);
    pthread_cond_signal(&pool->jobs_cond);

    pthread_mutex_unlock(&pool->jobs_mtx);
}

/************************** debug thread pool **************************/

#define KING_MAX_THREADS  80
#define KING_COUNTER_SIZE 1000

void king_counter(void *arg) {
    nJob *job = (nJob *)arg;
    int index = *(int *)job->arg;

    printf("index : %d, selfid : %lu\n", index, pthread_self());

    free(job->arg);
    free(job);
}

int main(int argc, char *argv[]) {
    nThreadPool pool;
    ntyThreadPoolCreate(&pool, KING_MAX_THREADS);

    int i = 0;
    for (i = 0; i < KING_COUNTER_SIZE; i++) {
        nJob *job = (nJob *)malloc(sizeof(nJob));
        if (job == NULL) {
            perror("malloc");
            exit(1);
        }

        job->job_func = king_counter;
        job->arg = malloc(sizeof(int));
        *(int *)job->arg = i;

        ntyThreadPoolPush(&pool, job);
    }

    getchar();
    printf("You are very good!\n");

    return 0;
}
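If the listing above is saved as, say, thread_pool_v1.c (a file name chosen here for illustration), it should build and run on Linux with something like:

gcc -o thread_pool_v1 thread_pool_v1.c -lpthread
./thread_pool_v1

The demo pushes 1000 jobs into the pool and then blocks on getchar(), so the workers' output appears asynchronously until a key press ends the program.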
A thread pool like this is still just a demo; it needs improvement in the following respects.
The number of threads in the pool is fixed; the pool cannot grow or shrink with the number of task requests in the system.
The number of tasks is not tracked; there are no statistics on the task queue.
There is no count of how many threads are currently executing tasks or how many tasks are waiting to be executed.
There is no limit on how long each task may run.
IO-intensive and compute-intensive workloads are not distinguished; a thread pool should not be used one-size-fits-all, but configured differently for different business scenarios (a rough sizing sketch follows this list).
There is no protection against a user's task function actively calling pthread_exit and terminating the worker thread.
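On the IO-intensive versus compute-intensive point, a rough sizing rule (not part of the original code; the helper name and the multiplier below are illustrative guesses) is to keep a compute-bound pool close to the number of cores and let an IO-bound pool oversubscribe, since its threads spend most of their time blocked.

#include <unistd.h>

/* Illustrative sizing helper: derive a pool size from the online core count.
 * The 4x multiplier for IO-bound work is a tunable guess, not a rule. */
static int suggest_pool_size(int io_bound) {
    long cores = sysconf(_SC_NPROCESSORS_ONLN);
    if (cores < 1) cores = 1;
    return io_bound ? (int)(cores * 4) : (int)(cores + 1);
}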
To address these problems, the first version of the thread pool is improved as follows.
/*
 * Author: WangBoJing
 * email: 1989wangbojing@gmail.com
 * github: https://github.com/wangbojing
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <time.h>
#include <pthread.h>

typedef void (*JOB_CALLBACK)(void *);

typedef struct NJOB {
    struct NJOB *next;
    JOB_CALLBACK func;
    void *arg;
} nJob;

typedef struct NWORKER {
    struct NWORKER *active_next;
    pthread_t active_tid;
} nWorker;

typedef struct NTHREADPOOL {
    struct NTHREADPOOL *forw;       /* global doubly linked list of pools */
    struct NTHREADPOOL *back;
    pthread_mutex_t mtx;
    pthread_cond_t busycv;          /* signaled when the last worker exits */
    pthread_cond_t workcv;          /* signaled when work arrives */
    pthread_cond_t waitcv;          /* signaled when the pool becomes idle */
    nWorker *active;                /* workers currently running a job */
    nJob *head;                     /* FIFO job queue */
    nJob *tail;
    pthread_attr_t attr;
    int flags;
    unsigned int linger;            /* seconds an idle extra worker lingers */
    int minimum;                    /* minimum number of worker threads */
    int maximum;                    /* maximum number of worker threads */
    int nthreads;                   /* current number of worker threads */
    int idle;                       /* number of idle workers */
} nThreadPool;

static void *ntyWorkerThread(void *arg);

#define NTY_POOL_WAIT    0x01
#define NTY_POOL_DESTROY 0x02

static pthread_mutex_t nty_pool_lock = PTHREAD_MUTEX_INITIALIZER;
static sigset_t fillset;

nThreadPool *thread_pool = NULL;

static int ntyWorkerCreate(nThreadPool *pool) {
    sigset_t oset;
    pthread_t thread_id;

    pthread_sigmask(SIG_SETMASK, &fillset, &oset);
    int error = pthread_create(&thread_id, &pool->attr, ntyWorkerThread, pool);
    pthread_sigmask(SIG_SETMASK, &oset, NULL);

    return error;
}

/* Exit/cancellation handler for a worker thread; runs with pool->mtx held. */
static void ntyWorkerCleanup(void *arg) {
    nThreadPool *pool = (nThreadPool *)arg;

    --pool->nthreads;

    if (pool->flags & NTY_POOL_DESTROY) {
        if (pool->nthreads == 0) {
            pthread_cond_broadcast(&pool->busycv);
        }
    } else if (pool->head != NULL
            && pool->nthreads < pool->maximum
            && ntyWorkerCreate(pool) == 0) {
        pool->nthreads++;
    }

    pthread_mutex_unlock(&pool->mtx);
}

static void ntyNotifyWaiters(nThreadPool *pool) {
    if (pool->head == NULL && pool->active == NULL) {
        pool->flags &= ~NTY_POOL_WAIT;
        pthread_cond_broadcast(&pool->waitcv);
    }
}

/* Cleanup handler run after each job; removes this worker from the active list. */
static void ntyJobCleanup(void *arg) {
    nThreadPool *pool = (nThreadPool *)arg;
    pthread_t tid = pthread_self();
    nWorker *activep;
    nWorker **activepp;

    pthread_mutex_lock(&pool->mtx);

    for (activepp = &pool->active; (activep = *activepp) != NULL; activepp = &activep->active_next) {
        if (activep->active_tid == tid) {
            *activepp = activep->active_next;
            break;
        }
    }

    if (pool->flags & NTY_POOL_WAIT) ntyNotifyWaiters(pool);
}

static void *ntyWorkerThread(void *arg) {
    nThreadPool *pool = (nThreadPool *)arg;
    nWorker active;

    int timeout;
    struct timespec ts;
    JOB_CALLBACK func;

    pthread_mutex_lock(&pool->mtx);
    pthread_cleanup_push(ntyWorkerCleanup, pool);

    active.active_tid = pthread_self();

    while (1) {
        pthread_sigmask(SIG_SETMASK, &fillset, NULL);
        pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

        timeout = 0;
        pool->idle++;

        if (pool->flags & NTY_POOL_WAIT) {
            ntyNotifyWaiters(pool);
        }

        /* Wait for work; extra workers above the minimum only linger for a while. */
        while (pool->head == NULL && !(pool->flags & NTY_POOL_DESTROY)) {
            if (pool->nthreads <= pool->minimum) {
                pthread_cond_wait(&pool->workcv, &pool->mtx);
            } else {
                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += pool->linger;

                if (pool->linger == 0 ||
                    pthread_cond_timedwait(&pool->workcv, &pool->mtx, &ts) == ETIMEDOUT) {
                    timeout = 1;
                    break;
                }
            }
        }

        pool->idle--;

        if (pool->flags & NTY_POOL_DESTROY) break;

        nJob *job = pool->head;
        if (job != NULL) {
            timeout = 0;
            func = job->func;
            void *job_arg = job->arg;

            pool->head = job->next;
            if (job == pool->tail) {
                pool->tail = NULL;
            }

            active.active_next = pool->active;
            pool->active = &active;

            pthread_mutex_unlock(&pool->mtx);
            pthread_cleanup_push(ntyJobCleanup, pool);

            free(job);
            func(job_arg);

            pthread_cleanup_pop(1);   /* ntyJobCleanup: re-acquires pool->mtx */
        }

        if (timeout && (pool->nthreads > pool->minimum)) {
            break;
        }
    }

    pthread_cleanup_pop(1);   /* ntyWorkerCleanup: releases pool->mtx */

    return NULL;
}

static void ntyCloneAttributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) {
    struct sched_param param;
    void *addr;
    size_t size;
    int value;

    pthread_attr_init(new_attr);

    if (old_attr != NULL) {
        pthread_attr_getstack(old_attr, &addr, &size);
        pthread_attr_setstack(new_attr, NULL, size);

        pthread_attr_getscope(old_attr, &value);
        pthread_attr_setscope(new_attr, value);

        pthread_attr_getinheritsched(old_attr, &value);
        pthread_attr_setinheritsched(new_attr, value);

        pthread_attr_getschedpolicy(old_attr, &value);
        pthread_attr_setschedpolicy(new_attr, value);

        pthread_attr_getschedparam(old_attr, &param);
        pthread_attr_setschedparam(new_attr, &param);

        pthread_attr_getguardsize(old_attr, &size);
        pthread_attr_setguardsize(new_attr, size);
    }

    pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);
}

nThreadPool *ntyThreadPoolCreate(int min_threads, int max_threads, int linger, pthread_attr_t *attr) {
    sigfillset(&fillset);

    if (min_threads > max_threads || max_threads < 1) {
        errno = EINVAL;
        return NULL;
    }

    nThreadPool *pool = (nThreadPool *)malloc(sizeof(nThreadPool));
    if (pool == NULL) {
        errno = ENOMEM;
        return NULL;
    }

    pthread_mutex_init(&pool->mtx, NULL);
    pthread_cond_init(&pool->busycv, NULL);
    pthread_cond_init(&pool->workcv, NULL);
    pthread_cond_init(&pool->waitcv, NULL);

    pool->active = NULL;
    pool->head = NULL;
    pool->tail = NULL;
    pool->flags = 0;
    pool->linger = linger;
    pool->minimum = min_threads;
    pool->maximum = max_threads;
    pool->nthreads = 0;
    pool->idle = 0;

    ntyCloneAttributes(&pool->attr, attr);

    /* Insert the new pool into the global list of pools. */
    pthread_mutex_lock(&nty_pool_lock);
    if (thread_pool == NULL) {
        pool->forw = pool;
        pool->back = pool;
        thread_pool = pool;
    } else {
        thread_pool->back->forw = pool;
        pool->forw = thread_pool;
        pool->back = thread_pool->back;
        thread_pool->back = pool;
    }
    pthread_mutex_unlock(&nty_pool_lock);

    return pool;
}

int ntyThreadPoolQueue(nThreadPool *pool, JOB_CALLBACK func, void *arg) {
    nJob *job = (nJob *)malloc(sizeof(nJob));
    if (job == NULL) {
        errno = ENOMEM;
        return -1;
    }

    job->next = NULL;
    job->func = func;
    job->arg = arg;

    pthread_mutex_lock(&pool->mtx);

    /* Append to the tail of the FIFO job queue. */
    if (pool->head == NULL) {
        pool->head = job;
    } else {
        pool->tail->next = job;
    }
    pool->tail = job;

    if (pool->idle > 0) {
        pthread_cond_signal(&pool->workcv);
    } else if (pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {
        pool->nthreads++;
    }

    pthread_mutex_unlock(&pool->mtx);

    return 0;
}

void nThreadPoolWait(nThreadPool *pool) {
    pthread_mutex_lock(&pool->mtx);
    pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &pool->mtx);

    while (pool->head != NULL || pool->active != NULL) {
        pool->flags |= NTY_POOL_WAIT;
        pthread_cond_wait(&pool->waitcv, &pool->mtx);
    }

    pthread_cleanup_pop(1);
}

void nThreadPoolDestroy(nThreadPool *pool) {
    nWorker *activep;
    nJob *job;

    pthread_mutex_lock(&pool->mtx);
    pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &pool->mtx);

    pool->flags |= NTY_POOL_DESTROY;
    pthread_cond_broadcast(&pool->workcv);

    /* Cancel workers that are still executing jobs. */
    for (activep = pool->active; activep != NULL; activep = activep->active_next) {
        pthread_cancel(activep->active_tid);
    }

    while (pool->nthreads != 0) {
        pthread_cond_wait(&pool->busycv, &pool->mtx);
    }

    pthread_cleanup_pop(1);

    /* Remove the pool from the global list of pools. */
    pthread_mutex_lock(&nty_pool_lock);
    if (thread_pool == pool) {
        thread_pool = pool->forw;
    }
    if (thread_pool == pool) {
        thread_pool = NULL;
    } else {
        pool->back->forw = pool->forw;
        pool->forw->back = pool->back;
    }
    pthread_mutex_unlock(&nty_pool_lock);

    /* Free any jobs that were never executed. */
    for (job = pool->head; job != NULL; job = pool->head) {
        pool->head = job->next;
        free(job);
    }

    pthread_attr_destroy(&pool->attr);
    free(pool);
}

/************************** debug thread pool **************************/

void king_counter(void *arg) {
    int index = *(int *)arg;

    printf("index : %d, selfid : %lu\n", index, pthread_self());

    free(arg);
    usleep(1);
}

#define KING_COUNTER_SIZE 1000

int main(int argc, char *argv[]) {
    nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL);

    int i = 0;
    for (i = 0; i < KING_COUNTER_SIZE; i++) {
        int *index = (int *)malloc(sizeof(int));

        memset(index, 0, sizeof(int));
        memcpy(index, &i, sizeof(int));

        ntyThreadPoolQueue(pool, king_counter, index);
    }

    getchar();
    printf("You are very good!\n");

    return 0;
}
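From the caller's point of view, the life cycle of this improved pool looks roughly like the sketch below, which reuses the API defined above (king_counter is the demo task from the listing; demo_lifecycle is a name introduced here for illustration). Build it the same way as before, with -lpthread.

/* Sketch of the improved pool's caller-side life cycle, assuming it is
 * appended to the listing above. */
static void demo_lifecycle(void) {
    nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL);  /* min, max, linger seconds */
    if (pool == NULL) return;

    int *index = (int *)malloc(sizeof(int));
    *index = 42;
    ntyThreadPoolQueue(pool, king_counter, index);   /* enqueue one task */

    nThreadPoolWait(pool);     /* block until the queue is empty and no job is active */
    nThreadPoolDestroy(pool);  /* cancel active workers, free queued jobs, free the pool */
}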