一、線程池概述
線程池是一種多線程處理形式,處理過程中將任務(wù)添加到隊(duì)列,然后在創(chuàng)建線程后自動(dòng)啟動(dòng)這些任務(wù)。線程池線程都是后臺(tái)線
程。每個(gè)線程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級(jí)運(yùn)行,并處于多線程單元中。如果某個(gè)線程在托管代碼中空閑(如正在等待
某個(gè)事件),則線程池將插入另一個(gè)輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊(duì)列中包含掛起
的工作,則線程池將在一段時(shí)間后創(chuàng)建另一個(gè)輔助線程但線程的數(shù)目永遠(yuǎn)不會(huì)超過最大值。超過最大值的線程可以排隊(duì),但他們
要等到其他線程完成后才啟動(dòng)。應(yīng)用程序可以有多個(gè)線程,這些線程在休眠狀態(tài)中需要耗費(fèi)大量時(shí)間來等待事件發(fā)生。其他線程
可能進(jìn)入睡眠狀態(tài),并且僅定期被喚醒以輪循更改或更新狀態(tài)信息,然后再次進(jìn)入休眠狀態(tài)。為了簡化對(duì)這些線程的管理,為每
個(gè)進(jìn)程提供了一個(gè)線程池,一個(gè)線程池有若干個(gè)等待操作狀態(tài),當(dāng)一個(gè)等待操作完成時(shí),線程池中的輔助線程會(huì)執(zhí)行回調(diào)函數(shù)。
線程池中的線程由系統(tǒng)管理,程序員不需要費(fèi)力于線程管理,可以集中精力處理應(yīng)用程序任務(wù)。
使用線程池的好處:
1、減少在創(chuàng)建和銷毀線程上所花的時(shí)間以及系統(tǒng)資源的開銷;
2、如不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量線程而導(dǎo)致消耗完系統(tǒng)內(nèi)存以及”過度切換”。
在什么情況下使用線程池:
1、單個(gè)任務(wù)處理的時(shí)間比較短;
2、將需處理的任務(wù)的數(shù)量大。
在什么情況下不使用線程池線程:
1、如果需要使一個(gè)任務(wù)具有特定優(yōu)先級(jí);
2、如果具有可能會(huì)長時(shí)間運(yùn)行(并因此阻塞其他任務(wù))的任務(wù);
3、如果需要將線程放置到單線程單元中(線程池中的線程均處于多線程單元中);
4、如果需要永久標(biāo)識(shí)來標(biāo)識(shí)和控制線程,比如想使用專用線程來終止該線程,將其掛起或按名稱發(fā)現(xiàn)它。
二、線程池程序
main.c
#include <stdio.h>#include <unistd.h>#include "thread_pool.h"/**********************************************************************功能:
任務(wù)處理函數(shù)*參數(shù): 無*返回值: NULL*********************************************************************/void *task_test(void *arg){ printf("/t/tworking on task %d/n", (int)arg); sleep(1); /*休息一秒,延長任務(wù)的執(zhí)行時(shí)間*/ return NULL;}int main (int argc, char *argv[]){ pool_t pool; int i = 0; pool_init(&pool, 2);//初始化線程池 sleep(1); for(i = 0; i < 5; i++){ sleep(1); pool_add_task(&pool, task_test, (void *)i);//向線程池中添加一個(gè)任務(wù) } sleep(4); pool_uninit(&pool);//銷毀線程池 return 0;}
thread_pool.c
#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <assert.h>#include "thread_pool.h"static void *pool_thread_server(void *arg);/**********************************************************************功能: 初始化線程池結(jié)構(gòu)體并創(chuàng)建線程*參數(shù): pool:線程池句柄*
threads_limit:線程池中線程的數(shù)量*返回值: 無*********************************************************************/void pool_init(pool_t *pool, int threads_limit){ pool->threads_limit = threads_limit; pool->queue_head = NULL; pool->task_in_queue = 0; pool->destroy_flag = 0; /*創(chuàng)建存放線程ID的空間*/ pool->threadid = (pthread_t *)calloc(threads_limit, sizeof(pthread_t)); int i = 0; /*初始化互斥鎖和條件變量*/ pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); /*循環(huán)創(chuàng)建threads_limit個(gè)線程*/ for (i = 0; i < threads_limit; i++){ pthread_create(&(pool->threadid[i]), NULL, pool_thread_server, pool); } return;}/**********************************************************************功能: 銷毀線程池,等待隊(duì)列中的任務(wù)不會(huì)再被執(zhí)行,*
但是正在運(yùn)行的線程會(huì)一直,把任務(wù)運(yùn)行完后再退出*參數(shù): 線程池句柄*返回值: 成功:0,失敗非0*********************************************************************/int pool_uninit(pool_t *pool){ pool_task *head = NULL; int i; pthread_mutex_lock(&(pool->queue_lock)); if(pool->destroy_flag)/* 防止兩次調(diào)用 */ return -1; pool->destroy_flag = 1; pthread_mutex_unlock(&(pool->queue_lock)); /* 喚醒所有等待線程,線程池要銷毀了 */ pthread_cond_broadcast(&(pool->queue_ready)); /* 阻塞等待線程退出,否則就成僵尸了 */ for (i = 0; i < pool->threads_limit; i++) pthread_join(pool->threadid[i], NULL); free(pool->threadid); /* 銷毀等待隊(duì)列 */ pthread_mutex_lock(&(pool->queue_lock)); while(pool->queue_head != NULL){ head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } pthread_mutex_unlock(&(pool->queue_lock)); /*條件變量和互斥量也別忘了銷毀*/ pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); return 0;}/**********************************************************************功能: 向任務(wù)隊(duì)列中添加一個(gè)任務(wù)*參數(shù): pool:線程池句柄*
process:任務(wù)處理函數(shù)*
arg:任務(wù)參數(shù)*返回值: 無*********************************************************************/static void enqueue_task(pool_t *pool, pool_task_f process, void *arg){ pool_task *task = NULL; pool_task *member = NULL; pthread_mutex_lock(&(pool->queue_lock)); if(pool->task_in_queue >= pool->threads_limit){ printf("task_in_queue > threads_limit!/n"); pthread_mutex_unlock (&(pool->queue_lock)); return; } task = (pool_task *)calloc(1, sizeof(pool_task)); assert(task != NULL); task->process = process; task->arg = arg; task->next = NULL; pool->task_in_queue++; member = pool->queue_head; if(member != NULL){ while(member->next != NULL) /* 將任務(wù)加入到任務(wù)鏈連的最后位置. */ member = member->next; member->next = task; }else{ pool->queue_head = task; /* 如果是第一個(gè)任務(wù)的話,就指向頭 */ } printf("/ttasks %d/n", pool->task_in_queue); /* 等待隊(duì)列中有任務(wù)了,喚醒一個(gè)等待線程 */ pthread_cond_signal (&(pool->queue_ready)); pthread_mutex_unlock (&(pool->queue_lock));}/**********************************************************************功能: 從任務(wù)隊(duì)列中取出一個(gè)任務(wù)*參數(shù): 線程池句柄*返回值: 任務(wù)句柄*********************************************************************/static pool_task *dequeue_task(pool_t *pool){ pool_task *task = NULL; pthread_mutex_lock(&(pool->queue_lock)); /* 判斷線程池是否要銷毀了 */ if(pool->destroy_flag){ pthread_mutex_unlock(&(pool->queue_lock)); printf("thread 0x%lx will be destroyed/n", pthread_self()); pthread_exit(NULL); } /* 如果等待隊(duì)列為0并且不銷毀線程池,則處于阻塞狀態(tài) */ if(pool->task_in_queue == 0){ while((pool->task_in_queue == 0) && (!pool->destroy_flag)){ printf("thread 0x%lx is leisure/n", pthread_self()); /* 注意:pthread_cond_wait是一個(gè)原子操作,等待前會(huì)解鎖,喚醒后會(huì)加鎖 */ pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } }else{ /* 等待隊(duì)列長度減去1,并取出隊(duì)列中的第一個(gè)元素 */ pool->task_in_queue--; task = pool->queue_head; pool->queue_head = task->next; printf("thread 0x%lx received a task/n", pthread_self()); } pthread_mutex_unlock(&(pool->queue_lock)); return task;}/**********************************************************************功能: 向線程池中添加一個(gè)任務(wù)*參數(shù): pool:線程池句柄*
process:任務(wù)處理函數(shù)*
arg:任務(wù)參數(shù)*返回值: 0*********************************************************************/int pool_add_task(pool_t *pool, pool_task_f process, void *arg){ enqueue_task(pool, process, arg); return 0;}/**********************************************************************功能: 線程池服務(wù)程序*參數(shù): 略*返回值: 略*********************************************************************/static void *pool_thread_server(void *arg){ pool_t *pool = NULL; pool = (pool_t *)arg; while(1){ pool_task *task = NULL; task = dequeue_task(pool); /*調(diào)用回調(diào)函數(shù),執(zhí)行任務(wù)*/ if(task != NULL){ printf ("thread 0x%lx is busy/n", pthread_self()); task->process(task->arg); free(task); task = NULL; } } /*這一句應(yīng)該是不可達(dá)的*/ pthread_exit(NULL); return NULL;}