問題產生
- 無論是Linux,RTOS,還是Android等開發,我們都會用到多線程編程;但是往往很多人在編程時,都很隨意的創建/銷毀線程的策略來實現多線程編程;很明顯這是不合理的做法,線程的創建/銷毀代價是很高的。那么我們要怎么去設計多線程編程呢???答案:對于長駐的線程,我們可以創建獨立的線程去執行。但是非長駐的線程,我們可以通過線程池的方式來處理這些線程。
線程池概述
-
線程池,它是一種多線程處理形式,處理過程中將任務添加到隊列,然后在創建線程后自動啟動這些任務。線程池線程都是后臺線程。每個線程都使用默認的堆棧大小,以默認的優先級運行,并處于多線程單元中。如果某個線程在托管代碼中空閑(如正在等待某個事件),則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間后創建另一個輔助線程但線程的數目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成后才啟動。
-
在一個系統中,線程數過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可并發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度。可用線程數量應該取決于可用的并發處理器、處理器內核、內存、網絡sockets等的數量。線程數過多會導致額外的線程切換開銷。
-
線程的創建-銷毀對系統性能影響很大:
- 創建太多線程,將會浪費一定的資源,有些線程未被充分使用。
- 銷毀太多線程,將導致之后浪費時間再次創建它們。
- 創建線程太慢,將會導致長時間的等待,性能變差。
- 銷毀線程太慢,導致其它線程資源饑餓
-
線程池的應用場景:
- 單位時間內處理的任務頻繁,且任務時間較短;
- 對實時性要求較高。如果接收到任務之后再創建線程,可能無法滿足實時性的要求,此時必須使用線程池;
- 必須經常面對高突發性事件。比如 Web 服務器。如果有足球轉播,則服務器將產生巨大沖擊,此時使用傳統方法,則必須不停的大量創建、銷毀線程。此時采用動態線程池可以避免這種情況的發生。
-
線程池的應用例子:
- EventBus:它是Android的一個事件發布/訂閱輕量級框架。其中事件的異步發布就采用了線程池機制。
- Samgr:它是OpenHarmony的一個服務管理組件,解決多服務的管理的策略,減低了線程的創建開銷。
-
作者最近在開發的過程中,也遇到多線程編程問題,跨平臺,并發任務多,執行周期短。如果按照以往的反復的創建/銷毀線程,顯然不是一個很好的軟件設計。我們需要利用線程池的方式來解決我們問題。
TP(Thread Pool)組件
TP組件,又稱線程池組件。是作者編寫一個多線程管理組件,特點:
- 跨平臺:它支持任意的RTOS系統,Linux系統。
- 易移植:該組件默認支持CMSIS和POSIX接口,其他RTOS可以輕易適配兼容。
- 接口簡單:用戶操作接口簡單,只有三個接口:創建線程池,增加task到線程池,銷毀線程池。
TP原理
- ① 創建一個線程池,線程池中維護一個Task隊列,用于Task任務;理論上:線程池中線程數目至少一個,最多無數個,但是我們要系統能力決定。
- ② 應用層根據業務需求,創建對應Task,Task數目不限制,根據系統資源創建。
- ③ 應用層創建的Task,會被掛在Task隊列中。
- ④ 線程池的空閑線程,會檢測Task隊列中是否為空,如果Task隊列不為空,則提取一個Task在線程中執行。
TP實現
適配層實現
為了實現跨平臺,需要將差異性接口抽象出來,我們整個組件需要抽象幾個內容:①日志接口;②內存管理接口;③ 線程接口;④互斥量接口;⑤信號量接口。以CMSIS接口為例的實現:
- 錯誤碼:提供了四種錯誤碼:無錯誤,錯誤,內存不足,無效參數。
typedefenum{
TP_EOK=0,//Thereisnoerror
TP_ERROR,//Agenericerrorhappens
TP_ENOMEM,//Nomemory
TP_EINVAL,//Invalidargument
}TpErrCode;
- 日志接口適配:
- 需修改宏定義:TP_PRINT;
- 支持三個等級日志打印:錯誤信息日志,運行信息日志,調試信息日志的打印。并且支持帶顏色。
#defineTP_PRINTprintf
#defineTP_LOGE(...)TP_PRINT("33[31;22m[E/TP](%s:%d)",__FUNCTION__,__LINE__);
TP_PRINT(__VA_ARGS__);
TP_PRINT("33[0mn")
#defineTP_LOGI(...)TP_PRINT("33[32;22m[I/TP](%s:%d)",__FUNCTION__,__LINE__);
TP_PRINT(__VA_ARGS__);
TP_PRINT("33[0mn")
#defineTP_LOGD(...)TP_PRINT("[D/TP](%s:%d)",__FUNCTION__,__LINE__);
TP_PRINT(__VA_ARGS__);
TP_PRINT("n")
- 內存接口:只需適配申請內存和釋放內存宏定義
#defineTP_MALLOCmalloc
#defineTP_FREEfree
- 線程接口:
//tp_def.h
typedefvoid*TpThreadId;
typedefvoid*(*tpThreadFunc)(void*argv);
typedefstruct{
char*name;
uint32_tstackSize;
uint32_tpriority:8;
uint32_treserver:24;
}TpThreadAttr;
TpThreadIdTpThreadCreate(tpThreadFuncfunc,void*argv,constTpThreadAttr*attr);
voidTpThreadDelete(TpThreadIdthread);
- 創建線程: TpThreadId TpThreadCreate(tpThreadFunc func, void *argv, const TpThreadAttr *attr);
「參數」 | 「說明」 |
---|---|
func | 線程入口函數 |
argv | 線程入口函數參數 |
attr | 線程屬性:線程名,棧空間,優先級 |
「返回」 | -- |
NULL | 創建失敗 |
線程句柄 | 創建成功 |
- 刪除線程:void TpThreadDelete(TpThreadId thread);
「參數」 | 「說明」 |
---|---|
thread | 線程句柄 |
- CMSIS適配:
//tp_threa_adapter.c
#include"tp_def.h"
#include"cmsis_os2.h"
TpThreadIdTpThreadCreate(tpThreadFuncfunc,void*argv,constTpThreadAttr*attr)
{
osThreadId_tthread=NULL;
osThreadAttr_ttaskAttr={
.name=attr->name,
.attr_bits=0,
.cb_mem=NULL,
.cb_size=0,
.stack_mem=NULL,
.stack_size=attr->stackSize,
.priority=(osPriority_t)attr->priority,
.tz_module=0,
.reserved=0,
};
thread=osThreadNew((osThreadFunc_t)func,argv,&taskAttr);
return(TpThreadId)thread;
}
voidTpThreadDelete(TpThreadIdthread)
{
if(thread!=NULL){
osThreadTerminate(thread);
}
}
- 互斥量接口:
//tp_def.h
typedefvoid*TpMutexId;
TpMutexIdTpMutexCreate(void);
TpErrCodeTpMutexLock(TpMutexIdmutex);
TpErrCodeTpMutexUnlock(TpMutexIdmutex);
voidTpMutexDelete(TpMutexIdmutex);
- 創建互斥量:TpMutexId TpMutexCreate(void);
「參數」 | 「說明」 |
---|---|
「返回」 | -- |
NULL | 創建失敗 |
互斥量句柄 | 創建成功 |
- 獲取互斥量:TpErrCode TpMutexLock(TpMutexId mutex);
「參數」 | 「說明」 |
---|---|
mutex | 互斥量句柄 |
「返回」 | -- |
TP_EINVAL | mutex無效參數 |
TP_ERROR | 獲取互斥量失敗 |
TP_EOK | 成功獲取互斥量 |
- 釋放互斥量:TpErrCode TpMutexUnlock(TpMutexId mutex);
「參數」 | 「說明」 |
---|---|
mutex | 互斥量句柄 |
「返回」 | -- |
TP_EINVAL | mutex無效參數 |
TP_ERROR | 釋放互斥量失敗 |
TP_EOK | 成功釋放互斥量 |
- 刪除互斥量:void TpMutexDelete(TpMutexId mutex);
「參數」 | 「說明」 |
---|---|
mutex | 互斥量句柄 |
- CMSIS適配:
//tp_mutex_adapter.c
#include"tp_def.h"
#include"cmsis_os2.h"
TpMutexIdTpMutexCreate(void)
{
osMutexId_tmutex=NULL;
mutex=osMutexNew(NULL);
return(TpMutexId)mutex;
}
TpErrCodeTpMutexLock(TpMutexIdmutex)
{
if(mutex==NULL){
returnTP_EINVAL;
}
if(osMutexAcquire((osMutexId_t)mutex,osWaitForever)==osOK){
returnTP_EOK;
}
returnTP_ERROR;
}
TpErrCodeTpMutexUnlock(TpMutexIdmutex)
{
if(mutex==NULL){
returnTP_EINVAL;
}
if(osMutexRelease((osMutexId_t)mutex)==osOK){
returnTP_EOK;
}
returnTP_ERROR;
}
voidTpMutexDelete(TpMutexIdmutex)
{
if(mutex==NULL){
return;
}
osMutexDelete(mutex);
}
- 信號量接口:
//tp_def.h
typedefvoid*TpSemId;
TpSemIdTpSemCreate(uint32_tvalue);
TpErrCodeTpSemAcquire(TpSemIdsem);
TpErrCodeTpSemRelease(TpSemIdsem);
voidTpSemDelete(TpSemIdsem);
- 創建信號量:TpSemId TpSemCreate(uint32_t value);
「參數」 | 「說明」 |
---|---|
「返回」 | -- |
NULL | 創建失敗 |
信號量句柄 | 創建成功 |
- 獲取信號量:TpErrCode TpSemAcquire(TpSemId sem);
「參數」 | 「說明」 |
---|---|
sem | 信號量句柄 |
「返回」 | -- |
TP_EINVAL | sem無效參數 |
TP_ERROR | 獲取信號量失敗 |
TP_EOK | 成功獲取信號量 |
- 釋放信號量:TpErrCode TpSemRelease(TpSemId sem);
「參數」 | 「說明」 |
---|---|
sem | 信號量句柄 |
「返回」 | -- |
TP_EINVAL | 信號量無效參數 |
TP_ERROR | 釋放信號量失敗 |
TP_EOK | 成功釋放信號量 |
- 刪除信號量:void TpSemDelete(TpSemId sem);
「參數」 | 「說明」 |
---|---|
sem | 信號量句柄 |
- CMSIS適配:
//tp_sem_adapter.c
#include"tp_def.h"
#include"cmsis_os2.h"
TpSemIdTpSemCreate(uint32_tvalue)
{
osSemaphoreId_tsem=NULL;
sem=osSemaphoreNew(1,value,NULL);
return(TpSemId)sem;
}
TpErrCodeTpSemAcquire(TpSemIdsem)
{
if(sem==NULL){
returnTP_EINVAL;
}
if(osSemaphoreAcquire((osSemaphoreId_t)sem,osWaitForever)!=osOK){
returnTP_ERROR;
}
returnTP_EOK;
}
TpErrCodeTpSemRelease(TpSemIdsem)
{
if(sem==NULL){
returnTP_EINVAL;
}
if(osSemaphoreRelease((osSemaphoreId_t)sem)!=osOK){
returnTP_ERROR;
}
returnTP_EOK;
}
voidTpSemDelete(TpSemIdsem)
{
if(sem==NULL){
return;
}
osSemaphoreDelete((osSemaphoreId_t)sem);
}
核心層實現
tp的提供的接口非常精簡:創建線程池,增加任務到線程池,銷毀線程池。
- 創建線程池:
- 接口描述:TpErrCode TpCreate(Tp *pool, const char *name, uint32_t stackSize, uint8_t threadNum);
「參數」 | 「說明」 |
---|---|
pool | 線程池句柄 |
name | 線程池中線程名字 |
stackSize | 線程池中線程的棧大小 |
theadNum | 線程池中線程數目 |
「返回」 | -- |
TP_EINVAL | pool無效參數 |
TP_ERROR | 創建失敗 |
TP_NOMEM | 內存不足 |
TP_EOK | 創建成功 |
-
接口實現:
- ①創建task隊列增刪互斥量:管理task隊列的增加及釋放的互斥關系,保證增加和釋放為同步策略。
- ②創建task隊列狀態信號量:當task隊列非空則釋放信號量,線程池中的線程可以從task隊列中獲取task執行。
- ③創建線程池中線程:根據threadNum參數,創建對應的線程數目。
TpErrCodeTpCreate(Tp*pool,constchar*name,
uint32_tstackSize,uint8_tthreadNum)
{
intindex=0;
if(pool==NULL){
TP_LOGE("ThreadpoolhandleisNULL");
returnTP_EINVAL;
}
//①
if((pool->queueLock=TpMutexCreate())==NULL){
TP_LOGE("Createthreadpoolmutexfailed");
returnTP_ERROR;
}
//②
if((pool->queueReady=TpSemCreate(0))==NULL){
TP_LOGE("Createthreadpoolsemfailed");
returnTP_ERROR;
}
pool->taskQueue=NULL;
pool->threadNum=threadNum;
pool->waitTaskNum=0;
pool->threads=(TpThreadInfo*)TP_MALLOC(threadNum*sizeof(TpThreadInfo));
if(pool->threads==NULL){
TP_LOGE("Mallocthreadpoolinfomemoryfailed");
returnTP_ENOMEM;
}
//③
for(index=0;indexthreads[index].attr.name=(char*)TP_MALLOC(TP_THREAD_NAME_LEN);
if(pool->threads[index].attr.name==NULL){
TP_LOGE("Mallocthreadnamememoryfailed");
returnTP_ENOMEM;
}
snprintf(pool->threads[index].attr.name,TP_THREAD_NAME_LEN,"%s%d",name,index);
pool->threads[index].attr.stackSize=stackSize;
pool->threads[index].attr.priority=TP_THREAD_PRIORITY;
pool->threads[index].threadId=TpThreadCreate(TpThreadHandler,pool,&pool->threads[index].attr);
}
returnTP_EOK;
}
- 增加任務到線程池:
- 接口描述:TpErrCode TpAddTask(Tp *pool, taskHandle handle, void *argv);
「參數」 | 「說明」 |
---|---|
pool | 線程池句柄 |
handle | 線程池中線程名字 |
argv | 線程池中線程的棧大小 |
「返回」 | -- |
TP_EINVAL | pool無效參數 |
TP_NOMEM | 內存不足 |
TP_EOK | 增加task成功 |
-
接口實現:
- ① 創建一個task句柄,并將注冊task函數和函數的入參。
- ② 獲取task隊列互斥量,避免增加隊列成員時,在釋放隊列成員。
- ③ 釋放task信號量,通知線程池中的線程可以從task隊列中獲取task執行
TpErrCodeTpAddTask(Tp*pool,taskHandlehandle,void*argv)
{
TpTask*newTask=NULL;
TpTask*taskLIst=NULL;
if(pool==NULL){
TP_LOGE("ThreadpoolhandleisNULL");
returnTP_EINVAL;
}
//①
newTask=(TpTask*)TP_MALLOC(sizeof(TpTask));
if(newTask==NULL){
TP_LOGE("Mallocnewtaskhandlememoryfailed");
returnTP_ENOMEM;
}
newTask->handle=handle;
newTask->argv=argv;
newTask->next=NULL;
//②
TpMutexLock(pool->queueLock);
taskLIst=pool->taskQueue;
if(taskLIst==NULL){
pool->taskQueue=newTask;
}
else{
while(taskLIst->next!=NULL){
taskLIst=taskLIst->next;
}
taskLIst->next=newTask;
}
pool->waitTaskNum++;
TpMutexUnlock(pool->queueLock);
//③
TpSemRelease(pool->queueReady);
returnTP_EOK;
}
- 銷毀線程池
- 接口描述:TpErrCode TpDestroy(Tp *pool);
「參數」 | 「說明」 |
---|---|
pool | 線程池句柄 |
「返回」 | -- |
TP_EINVAL | pool無效參數 |
TP_EOK | 銷毀成功 |
-
接口實現:
- ① 刪除線程池中所有線程。
- ② 刪除task隊列互斥量,task狀態信號量。
- ③ 刪除線程池的Task隊列。
TpErrCodeTpDestroy(Tp*pool)
{
intindex=0;
TpTask*head=NULL;
if(pool==NULL){
TP_LOGE("ThreadpoolhandleisNULL");
returnTP_EINVAL;
}
//①
for(index=0;indexthreadNum;index++){
TpThreadDelete(pool->threads[index].threadId);
pool->threads[index].threadId=NULL;
TP_FREE(pool->threads[index].attr.name);
pool->threads[index].attr.name=NULL;
}
//②
TpMutexDelete(pool->queueLock);
pool->queueLock=NULL;
TpSemDelete(pool->queueReady);
pool->queueReady=NULL;
TP_FREE(pool->threads);
pool->threads=NULL;
//③
while(pool->taskQueue!=NULL){
head=pool->taskQueue;
pool->taskQueue=pool->taskQueue->next;
TP_FREE(head);
}
pool=NULL;
returnTP_EOK;
}
- 線程池中線程函數
- 接口描述:static void *TpThreadHandler(void *argv)
「參數」 | 「說明」 |
---|---|
argv | 線程池參數 |
-
接口實現:
- ① 獲取task隊列互斥量,避免增加隊列成員時,在釋放隊列成員。
- ② 當task隊列為空時,將阻塞在獲取信號量,等待用戶增加task時釋放信號量。
- ③ 當task隊列不為空,則從task隊列中獲取task,并執行。
- ④ 當task執行完,會將對應的task句柄刪除。
staticvoid*TpThreadHandler(void*argv)
{
Tp*pool=(Tp*)argv;
TpTask*task=NULL;
while(1){
//①
TpMutexLock(pool->queueLock);
//②
while(pool->waitTaskNum==0){
TpMutexUnlock(pool->queueLock);
TpSemAcquire(pool->queueReady);
TpMutexLock(pool->queueLock);
}
//③
task=pool->taskQueue;
pool->waitTaskNum--;
pool->taskQueue=task->next;
TpMutexUnlock(pool->queueLock);
task->handle(task->argv);
//④
TP_FREE(task);
task=NULL;
}
}
TP應用
- 測試例程:
- 創建一個線程池,線程池中包含3個線程,線程的名字為tp,棧為1024byte。
- 在線程池中創建6個task,其中,task參數為taskId。
#include"tp_manage.h"
Tppool;
voidTestTaskHandle(void*argv)
{
printf("%s--taskId:%drn",__FUNCTION__,(uint32_t)argv);
}
intmain(void)
{
//①
TpCreate(&pool,"tp",1024,3);
//②
TpAddTask(&pool,TestTaskHandle,(void*)1);
TpAddTask(&pool,TestTaskHandle,(void*)2);
TpAddTask(&pool,TestTaskHandle,(void*)3);
TpAddTask(&pool,TestTaskHandle,(void*)4);
TpAddTask(&pool,TestTaskHandle,(void*)5);
TpAddTask(&pool,TestTaskHandle,(void*)6);
return0;
}
- RTOS中的CMSIS運行效果:
- Linux中POSIX接口運行效果:
總結
- 線程池是多線程的一個編程方式,它避免了線程的創建和銷毀的開銷,提高了系統的性能。
- 增加到線程池中的任務是非長駐的,不能存在死循環,否則她會一直持有線程池中的某一個線程。
-
TP線程池組件的開發倉庫鏈接:
歡迎關注微信公眾號『Rice嵌入式開發技術分享』
-
Linux
+關注
關注
87文章
11225瀏覽量
208915 -
RTOS
+關注
關注
21文章
809瀏覽量
119431 -
組件
+關注
關注
1文章
505瀏覽量
17802 -
線程
+關注
關注
0文章
504瀏覽量
19651
發布評論請先 登錄
相關推薦
評論