一、線(xiàn)程池概述
線(xiàn)程池是一種多線(xiàn)程處理形式,處理過(guò)程中將任務(wù)添加到隊(duì)列,然后在創(chuàng)建線(xiàn)程后自動(dòng)啟動(dòng)這些任務(wù)。線(xiàn)程池線(xiàn)程都是后臺(tái)線(xiàn)
程。每個(gè)線(xiàn)程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級(jí)運(yùn)行,并處于多線(xiàn)程單元中。如果某個(gè)線(xiàn)程在托管代碼中空閑(如正在等待
某個(gè)事件),則線(xiàn)程池將插入另一個(gè)輔助線(xiàn)程來(lái)使所有處理器保持繁忙。如果所有線(xiàn)程池線(xiàn)程都始終保持繁忙,但隊(duì)列中包含掛起
的工作,則線(xiàn)程池將在一段時(shí)間后創(chuàng)建另一個(gè)輔助線(xiàn)程但線(xiàn)程的數(shù)目永遠(yuǎn)不會(huì)超過(guò)最大值。超過(guò)最大值的線(xiàn)程可以排隊(duì),但他們
要等到其他線(xiàn)程完成后才啟動(dòng)。應(yīng)用程序可以有多個(gè)線(xiàn)程,這些線(xiàn)程在休眠狀態(tài)中需要耗費(fèi)大量時(shí)間來(lái)等待事件發(fā)生。其他線(xiàn)程
可能進(jìn)入睡眠狀態(tài),并且僅定期被喚醒以輪循更改或更新?tīng)顟B(tài)信息,然后再次進(jìn)入休眠狀態(tài)。為了簡(jiǎn)化對(duì)這些線(xiàn)程的管理,為每
個(gè)進(jìn)程提供了一個(gè)線(xiàn)程池,一個(gè)線(xiàn)程池有若干個(gè)等待操作狀態(tài),當(dāng)一個(gè)等待操作完成時(shí),線(xiàn)程池中的輔助線(xiàn)程會(huì)執(zhí)行回調(diào)函數(shù)。
線(xiàn)程池中的線(xiàn)程由系統(tǒng)管理,程序員不需要費(fèi)力于線(xiàn)程管理,可以集中精力處理應(yīng)用程序任務(wù)。
使用線(xiàn)程池的好處:
1、減少在創(chuàng)建和銷(xiāo)毀線(xiàn)程上所花的時(shí)間以及系統(tǒng)資源的開(kāi)銷(xiāo);
2、如不使用線(xiàn)程池,有可能造成系統(tǒng)創(chuàng)建大量線(xiàn)程而導(dǎo)致消耗完系統(tǒng)內(nèi)存以及”過(guò)度切換”。
在什么情況下使用線(xiàn)程池:
1、單個(gè)任務(wù)處理的時(shí)間比較短;
2、將需處理的任務(wù)的數(shù)量大。
在什么情況下不使用線(xiàn)程池線(xiàn)程:
1、如果需要使一個(gè)任務(wù)具有特定優(yōu)先級(jí);
2、如果具有可能會(huì)長(zhǎng)時(shí)間運(yùn)行(并因此阻塞其他任務(wù))的任務(wù);
3、如果需要將線(xiàn)程放置到單線(xiàn)程單元中(線(xiàn)程池中的線(xiàn)程均處于多線(xiàn)程單元中);
4、如果需要永久標(biāo)識(shí)來(lái)標(biāo)識(shí)和控制線(xiàn)程,比如想使用專(zhuān)用線(xiàn)程來(lái)終止該線(xiàn)程,將其掛起或按名稱(chēng)發(fā)現(xiàn)它。
二、線(xiàn)程池程序
main.c
#include <stdio.h>#include <unistd.h>#include "thread_pool.h"/**********************************************************************功能:
任務(wù)處理函數(shù)*參數(shù): 無(wú)*返回值: NULL*********************************************************************/void *task_test(void *arg){ printf("/t/tworking on task %d/n", (int)arg); sleep(1); /*休息一秒,延長(zhǎng)任務(wù)的執(zhí)行時(shí)間*/ return NULL;}int main (int argc, char *argv[]){ pool_t pool; int i = 0; pool_init(&pool, 2);//初始化線(xiàn)程池 sleep(1); for(i = 0; i < 5; i++){ sleep(1); pool_add_task(&pool, task_test, (void *)i);//向線(xiàn)程池中添加一個(gè)任務(wù) } sleep(4); pool_uninit(&pool);//銷(xiāo)毀線(xiàn)程池 return 0;}
thread_pool.c
#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <assert.h>#include "thread_pool.h"static void *pool_thread_server(void *arg);/**********************************************************************功能: 初始化線(xiàn)程池結(jié)構(gòu)體并創(chuàng)建線(xiàn)程*參數(shù): pool:線(xiàn)程池句柄*
threads_limit:線(xiàn)程池中線(xiàn)程的數(shù)量*返回值: 無(wú)*********************************************************************/void pool_init(pool_t *pool, int threads_limit){ pool->threads_limit = threads_limit; pool->queue_head = NULL; pool->task_in_queue = 0; pool->destroy_flag = 0; /*創(chuàng)建存放線(xiàn)程ID的空間*/ pool->threadid = (pthread_t *)calloc(threads_limit, sizeof(pthread_t)); int i = 0; /*初始化互斥鎖和條件變量*/ pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); /*循環(huán)創(chuàng)建threads_limit個(gè)線(xiàn)程*/ for (i = 0; i < threads_limit; i++){ pthread_create(&(pool->threadid[i]), NULL, pool_thread_server, pool); } return;}/**********************************************************************功能: 銷(xiāo)毀線(xiàn)程池,等待隊(duì)列中的任務(wù)不會(huì)再被執(zhí)行,*
但是正在運(yùn)行的線(xiàn)程會(huì)一直,把任務(wù)運(yùn)行完后再退出*參數(shù): 線(xiàn)程池句柄*返回值: 成功:0,失敗非0*********************************************************************/int pool_uninit(pool_t *pool){ pool_task *head = NULL; int i; pthread_mutex_lock(&(pool->queue_lock)); if(pool->destroy_flag)/* 防止兩次調(diào)用 */ return -1; pool->destroy_flag = 1; pthread_mutex_unlock(&(pool->queue_lock)); /* 喚醒所有等待線(xiàn)程,線(xiàn)程池要銷(xiāo)毀了 */ pthread_cond_broadcast(&(pool->queue_ready)); /* 阻塞等待線(xiàn)程退出,否則就成僵尸了 */ for (i = 0; i < pool->threads_limit; i++) pthread_join(pool->threadid[i], NULL); free(pool->threadid); /* 銷(xiāo)毀等待隊(duì)列 */ pthread_mutex_lock(&(pool->queue_lock)); while(pool->queue_head != NULL){ head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } pthread_mutex_unlock(&(pool->queue_lock)); /*條件變量和互斥量也別忘了銷(xiāo)毀*/ pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); return 0;}/**********************************************************************功能: 向任務(wù)隊(duì)列中添加一個(gè)任務(wù)*參數(shù): pool:線(xiàn)程池句柄*
process:任務(wù)處理函數(shù)*
arg:任務(wù)參數(shù)*返回值: 無(wú)*********************************************************************/static void enqueue_task(pool_t *pool, pool_task_f process, void *arg){ pool_task *task = NULL; pool_task *member = NULL; pthread_mutex_lock(&(pool->queue_lock)); if(pool->task_in_queue >= pool->threads_limit){ printf("task_in_queue > threads_limit!/n"); pthread_mutex_unlock (&(pool->queue_lock)); return; } task = (pool_task *)calloc(1, sizeof(pool_task)); assert(task != NULL); task->process = process; task->arg = arg; task->next = NULL; pool->task_in_queue++; member = pool->queue_head; if(member != NULL){ while(member->next != NULL) /* 將任務(wù)加入到任務(wù)鏈連的最后位置. */ member = member->next; member->next = task; }else{ pool->queue_head = task; /* 如果是第一個(gè)任務(wù)的話(huà),就指向頭 */ } printf("/ttasks %d/n", pool->task_in_queue); /* 等待隊(duì)列中有任務(wù)了,喚醒一個(gè)等待線(xiàn)程 */ pthread_cond_signal (&(pool->queue_ready)); pthread_mutex_unlock (&(pool->queue_lock));}/**********************************************************************功能: 從任務(wù)隊(duì)列中取出一個(gè)任務(wù)*參數(shù): 線(xiàn)程池句柄*返回值: 任務(wù)句柄*********************************************************************/static pool_task *dequeue_task(pool_t *pool){ pool_task *task = NULL; pthread_mutex_lock(&(pool->queue_lock)); /* 判斷線(xiàn)程池是否要銷(xiāo)毀了 */ if(pool->destroy_flag){ pthread_mutex_unlock(&(pool->queue_lock)); printf("thread 0x%lx will be destroyed/n", pthread_self()); pthread_exit(NULL); } /* 如果等待隊(duì)列為0并且不銷(xiāo)毀線(xiàn)程池,則處于阻塞狀態(tài) */ if(pool->task_in_queue == 0){ while((pool->task_in_queue == 0) && (!pool->destroy_flag)){ printf("thread 0x%lx is leisure/n", pthread_self()); /* 注意:pthread_cond_wait是一個(gè)原子操作,等待前會(huì)解鎖,喚醒后會(huì)加鎖 */ pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } }else{ /* 等待隊(duì)列長(zhǎng)度減去1,并取出隊(duì)列中的第一個(gè)元素 */ pool->task_in_queue--; task = pool->queue_head; pool->queue_head = task->next; printf("thread 0x%lx received a task/n", pthread_self()); } pthread_mutex_unlock(&(pool->queue_lock)); return task;}/**********************************************************************功能: 向線(xiàn)程池中添加一個(gè)任務(wù)*參數(shù): pool:線(xiàn)程池句柄*
process:任務(wù)處理函數(shù)*
arg:任務(wù)參數(shù)*返回值: 0*********************************************************************/int pool_add_task(pool_t *pool, pool_task_f process, void *arg){ enqueue_task(pool, process, arg); return 0;}/**********************************************************************功能: 線(xiàn)程池服務(wù)程序*參數(shù): 略*返回值: 略*********************************************************************/static void *pool_thread_server(void *arg){ pool_t *pool = NULL; pool = (pool_t *)arg; while(1){ pool_task *task = NULL; task = dequeue_task(pool); /*調(diào)用回調(diào)函數(shù),執(zhí)行任務(wù)*/ if(task != NULL){ printf ("thread 0x%lx is busy/n", pthread_self()); task->process(task->arg); free(task); task = NULL; } } /*這一句應(yīng)該是不可達(dá)的*/ pthread_exit(NULL); return NULL;}