* 本文修改后的代碼已上傳到GitHub網站Apollo項目中。
{ 1 }
線程池技術簡介
1線程池的定義
線程池是一種多線程形式,首先開啟指定數量的后臺工作線程,并將多個待執行任務添加到任務隊列,然后將隊列中的任務逐個交給空閑的工作線程執行(如下圖所示)。
2使用線程池的原因
創建/銷毀線程伴隨著操作系統的資源開銷,過于頻繁的創建/銷毀線程,會很大程度上影響處理效率。若創建線程消耗時間T1,執行任務消耗時間T2,銷毀線程消耗時間T3,如果T1+T3>T2,則開啟一個線程來執行一個任務就很不劃算,而使用線程池緩存線程,就可利用已有的閑置線程來執行新任務,有效避免T1+T3帶來的系統開銷。
線程并發數量過多,搶占系統資源從而導致阻塞。我們知道線程會共享系統資源,如果同時執行的線程數量過多,可能會導致系統資源不足而產生操作卡頓甚至出現假死現象,運用線程池能有效地控制線程最大并發數,有效避免上述問題。
對線程進行一些簡單的管理。比如:延時執行、定時循環執行等策略,運用線程池就較容易實現。
3C++中如何使用線程池
C++標準庫不提供線程池,如需使用需自行撰寫線程池類。GitHub中有多個線程池類的實現,Apollo項目也參考了其中的一個實現【https://github.com/vit-vit/CTPL】。
{ 2 }
Apollo線程池類源代碼分析
Apollo線程池文件位于[your_apollo_root_dir]/modules/common/util/ctpl_stl.h,包含任務隊列類Queue和線程池類ThreadPool,其中Queue位于命名空間apollo::common::util::detail內,ThreadPool位于命名空間apollo::common::util內。
1任務隊列類Queue
任務隊列類Queue基于C++標準庫的隊列類std::queue實現,只是對push、pop和empty三個函數進行了加鎖操作。
|
template class Queue { public: bool push(T const &value) { // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); q_.push(value); return true; } // deletes the retrieved element, do not use for non integral types bool pop(T &v) { // NOLINT // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); if (q_.empty()) { return false; } v = q_.front(); q_.pop(); return true; } bool empty() { // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); return q_.empty(); } private: std::queue q_; std::mutex mutex_;}; |
根據這篇博客【https://blog.csdn.net/tgxallen/article/details/73522233】的介紹,可使用std::lock_guard和std::unique_lock提供RAII(資源獲取即初始化,Resource Acquisition Is Initialization,參見該網頁【https://blog.csdn.net/doc_sgl/article/details/43028009】)風格的加鎖操作,其中std::lock_guard的系統開銷更小,std::unique_lock更為靈活(可適時解鎖)。就我們的任務隊列類Queue而言,不需要std::unique_lock提供的靈活性,因此使用std::lock_guard更為合適。另外,我還增加一個接受右值引用的push函數,以方便下文中的ThreadPool使用,修改后的類如下:
|
class Queue { public: bool push(const T &value) { std::lock_guard lock(mutex_); q_.push(value); return true; } // 增加一個接受右值引用的push函數 bool push(T &&value) { std::lock_guard lock(mutex_); q_.emplace(std::move(value)); return true; } // deletes the retrieved element, do not use for non integral types bool pop(T &v) { // NOLINT std::lock_guard lock(mutex_); if (q_.empty()) { return false; } v = q_.front(); q_.pop(); return true; } bool empty() { std::lock_guard lock(mutex_); return q_.empty(); } private: std::queue q_; std::mutex mutex_;}; |
2線程池類ThreadPool
線程池類ThreadPool的主要功能是創建n_threads個后臺工作線程,將任務函數f包裝成std::function的形式存入任務隊列q_,根據當前工作線程空閑情況,適時從任務隊列q_中提取一個任務函數并執行之。注意復制構造函數ThreadPool(const ThreadPool &)、移動構造函數ThreadPool(ThreadPool &&)、復制運算符ThreadPool &operator=(const ThreadPool &)、移動運算符ThreadPool &operator=(ThreadPool &&)全部設置為private,表明禁止使用這些函數。其實C++11標準完成可以通過在函數聲明后加上= delete;的方式來禁用,源代碼以注釋的方式給出了這種實現方式。
下面分析該類中幾個比較重要的成員函數。
2.2.1 Push函數
Push函數的作用是將任務函數f包裝成std::function的形式存入任務隊列q_。Push函數有兩個版本,一個允許任務函數f帶可變參數Rest &&... rest,一個不允許任務函數f帶額外參數,函數體內部代碼大同小異,下面以帶可變參數的版本進行分析,代碼如下:
|
template auto Push(F &&f, Rest &&... rest) -> std::future { // std::placeholders::_1表示通過std::bind函數綁定后得到的異步任務對象接受的第一個參數是自由參數 auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); // 最好使用std::make_shared創建智能指針對象,后面不用操心指針內存的釋放 // auto _f = std::make_shared>([pck](int id) { (*pck)(id); }); auto _f = new std::function([pck](int id) { (*pck)(id); }); q_.push(_f); // 這里不要加鎖,否則易引起死鎖 std::unique_lock lock(mutex_); cv_.notify_one(); return pck->get_future();} |
Push函數的返回值為一個std::future對象,std::future對象內存儲的數據類型由(f(0, rest...)函數的返回值類型確定,decltype(f(0, rest...))的作用就是獲取(f(0, rest...)函數的返回值類型。std::future提供一種異步操作結果的訪問機制,從字面意思來理解,它表示未來,我覺得這個名字非常貼切,因為一個異步操作的結果不可能馬上獲取,只能在未來某個時候得到。關于std::future,這篇博客【https://blog.csdn.net/yockie/article/details/50595958】講得挺不錯,大家可以借鑒。
因為任務函數f的聲明各式各樣,有的不帶參數,有的接受一個參數,有的接受兩個參數……因此不能將其直接存儲到任務隊列q_,于是先利用std::bind函數將其包裝為一個異步操作任務std::packaged_task對象pck(接受一個整型參數,返回值類型為(f(0, rest...)函數的返回值類型),再利用Lambda表達式將pck包裝為一個std::function對象,這樣就可以存儲到任務隊列q_中了。這里原作者直接使用new運算符創建裸指針_f,后面還需想辦法釋放指針內存,我認為不是很合適,使用std::make_shared創建智能指針可以自動管理內存,更加省事,但使用std::shared_ptr>智能指針就不能使用Queue::push(const T &value)版本將其存儲到任務隊列,為此我在Queue類中添加了一個接受右值引用參數的版本Queue::push(T &&value),使用該版本就可以順利將智能指針存儲進去了。
接下來,使用條件變量std::condition_variable對象cv_.notify_one()函數通知各個線程任務隊列已經發生了改變,讓空閑線程趕緊從任務隊列中拉取新任務執行;最后通過pck->get_future()返回一個std::future對象,以便調用者能從中取出函數執行完畢后的返回值。
我看過很多C++多線程方面的書籍(”C++ Concurrency in Action”比較經典),一般不對cv_.notify_one();進行加鎖操作,因為這樣做除了降低效率外,還很容易引起死鎖,故需去除加鎖操作,具體原因參見該網頁【https://stackoverflow.com/questions/17101922/do-i-have-to-acquire-lock-before-calling-condition-variable-notify-one】以及另一個網頁【http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one】。
以下是修改后的版本:
|
template auto Push(F &&f, Rest &&... rest) -> std::future { auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future();} |
2.2.2Pop函數
Pop函數的作用是從任務隊列q_中取出并返回一個任務,代碼如下:
|
std::function Pop() { std::function *_f = nullptr; q_.pop(_f); // 如果任務隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內存了。 std::unique_ptr> func( _f); // at return, delete the function even if an exception occurred std::function f; if (_f) f = *_f; return f;} |
首先,從從任務隊列q_中取出一個任務函數對象的裸指針_f,若非空,則將其賦值給std::function f并返回。該函數里使用一個小花招,即創建一個智能指針std::unique_ptr> func(_f),當超出該對象的作用域時,就會在其析構函數中調用delete運算符釋放內存。如果任務隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內存了。
以下是修改后的版本:
|
std::shared_ptr> Pop() { std::shared_ptr> f; q_.pop(f); return f;} |
2.2.3Stop函數
Stop函數的作用停止線程池工作,若不允許等待,則直接停止當前正在執行的工作線程,同時清空任務隊列;若允許等待,則等待當前正在執行的工作線程完成,代碼如下:
|
void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?{ ? ? ?// 這里不要加鎖,否則易引起死鎖 ? ? ? ?std::unique_lock lock(mutex_); cv_.notify_all(); // stop all waiting threads } for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear();} |
函數中的布爾變量is_stop_、is_done_、flags_[i]為什么都不用加鎖呢?這是因為它們都是原子類型std::atomic,所謂原子類型就是一條CPU指令就能完成取值或寫值操作的變量類型。C++標準可保證std::atomic類型變量在任何架構操作系統中均只使用一條CPU指令就可完成取值或寫值操作,其他形如std::atomic的類型,雖然將其聲明為原子類型,但在某些架構操作系統中,并不能只使用一條CPU指令完成取值或寫值操作。綜上所述,std::atomic類型的變量可以在多線程中不加鎖操作。
根據2.2.1節的分析,cv_.notify_all();的加鎖操作應去除。
修改后的代碼如下:
|
void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?cv_.notify_all(); ?// stop all waiting threads ? ?for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear();} |
2.2.4ClearQueue函數
ClearQueue函數的作用是清空任務隊列q_,代碼如下:
|
void ClearQueue() { std::function *_f; // empty the queue while (q_.pop(_f)) { delete _f; }} |
使用while循環從任務隊列q_中逐個彈出任務函數指針_f,因為_f使用new運算符創建,故需使用delete運算符刪除以釋放內存。如果任務隊列q_中存儲的是智能指針,就不必手工刪除對象來釋放內存了。
以下是使用智能指針的版本:
|
void ClearQueue() { std::shared_ptr> f; // empty the queue while (q_.pop(f)) { // do nothing } } |
2.2.5Resize函數
Resize函數的作用是更改線程池內工作線程的數量,代碼如下:
|
void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } { // stop the detached threads that were waiting // 這里不要加鎖,否則易引起死鎖 std::unique_lock lock(mutex_); cv_.notify_all(); } // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } }} |
如果兩個變量is_stop_、is_done_都不為真,表明線程池仍在使用,可以更改線程池內工作線程的數量,否則沒必要對一個停用的線程池更改工作線程的數量。若新線程數n_threads大于當前的工作線程數old_n_threads,則將工作線程數組threads_和線程標志數組flags_的尺寸修改為新數目,同時使用for循環調用SetThread(i)函數逐個重新創建工作線程;若新線程數n_threads小于當前的工作線程數old_n_threads,則將先完成old_n_threads - n_threads個線程正在執行的任務,之后將工作線程數組threads_和線程標志數組flags_的尺寸修改為新數目。
根據2.2.1節的分析,cv_.notify_all();的加鎖操作應去除,具體原因參見該網頁
注意:Resize函數很危險,應盡量少調用,若必須調用,則應當在創建線程池的那個線程內調用,而不要在其他線程中調用。
修改的代碼如下:
|
void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } // stop the detached threads that were waiting cv_.notify_all(); // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } }} |
2.2.6SetThread函數
SetThread函數的作用重新創建指定序號i的工作線程,代碼如下:
|
void SetThread(int i) { std::shared_ptr> flag( flags_[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() { std::atomic &_flag = *flag; std::function *_f; bool is_pop_ = q_.pop(_f); while (true) { while (is_pop_) { // if there is anything in the queue // 如果任務隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內存了。 std::unique_ptr> func( _f); // at return, delete the function even if an exception // occurred // 執行任務函數 (*_f)(i); if (_flag) { // the thread is wanted to stop, return even if the queue is not // empty yet return; } else { is_pop_ = q_.pop(_f); } } // the queue is empty here, wait for the next command // 這里必須使用std::unique_lock,因為后面條件變量cv_等待期間,需要解鎖。 std::unique_lock lock(mutex_); ++n_waiting_; // 等待任務隊列傳來的新任務 cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag; }); --n_waiting_; if (!is_pop_) { // if the queue is empty and is_done_ == true or *flag // then return return; } } }; threads_[i].reset( new std::thread(f)); // compiler may not support std::make_unique() } |
上述代碼看起來比較復雜,實際上只有三條語句,第一條是std::shared_ptr> flag(flags_[i]);,即使用flags_[i]來初始化標志變量flag;第二條看起來很長,實際上就是創建一個Lambda表達式變量f;第三條是threads_[i].reset(new std::thread(f));,使用Lambda表達式變量f作為工作線程的任務函數,創建序號為i的工作線程。
那么Lambda表達式變量f何時啟動呢?當任務隊列q_.pop(_f)的返回值為true時,表明從任務隊列q_中取到了一個新任務,于是調用(*_f)(i);執行之,如果當前任務隊列沒有任務,則使用:
|
cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag;}); |
等待新任務的到來,在新任務到來之前,當前工作線程處于休眠狀態。
該函數同樣使用一個小花招,即創建一個智能指針std::unique_ptr> func(_f),當超出該對象的作用域時,就會在其析構函數中調用delete運算符釋放內存。如果任務隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內存了。
2.2.7修改后的ThreadPool類代碼
為完整起見,這里給出修改后的ThreadPool類代碼。
|
class ThreadPool { public: ThreadPool() { Init(); } explicit ThreadPool(int n_threads) { Init(); Resize(n_threads); } // the destructor waits for all the functions in the queue to be finished ~ThreadPool() { Stop(true); } // get the number of running threads in the pool int size() { return static_cast(threads_.size()); } // number of idle threads int NumIdle() { return n_waiting_; } std::thread &GetThread(const int i) { return *(threads_[i]); } // change the number of threads in the pool // should be called from one thread, otherwise be careful to not interleave, // also with stop() // n_threads must be >= 0 void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } // stop the detached threads that were waiting cv_.notify_all(); // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } } } // empty the queue void ClearQueue() { std::shared_ptr> f; // empty the queue while (q_.pop(f)) { // do nothing } } // pops a functional wrapper to the original function std::shared_ptr> Pop() { std::shared_ptr> f; q_.pop(f); return f; } // wait for all computing threads to finish and stop all threads // may be called asynchronously to not pause the calling thread while waiting // if is_wait == true, all the functions in the queue are run, otherwise the // queue is cleared without running the functions void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?cv_.notify_all(); ?// stop all waiting threads ? ?for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear(); } template auto Push(F &&f, Rest &&... rest) -> std::future { auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future(); } // run the user's function that excepts argument int - id of the running // thread. returned value is templatized // operator returns std::future, where the user can get the result and rethrow // the catched exceptins template auto Push(F &&f) -> std::future { auto pck = std::make_shared>( std::forward(f)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future(); } private: // deleted ThreadPool(const ThreadPool &); // = delete; ThreadPool(ThreadPool &&); // = delete; ThreadPool &operator=(const ThreadPool &); // = delete; ThreadPool &operator=(ThreadPool &&); // = delete; void SetThread(int i) { std::shared_ptr> flag( flags_[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() { std::atomic &_flag = *flag; std::shared_ptr> _f; bool is_pop_ = q_.pop(_f); while (true) { while (is_pop_) { // if there is anything in the queue (*_f)(i); if (_flag) { // the thread is wanted to stop, return even if the queue is not // empty yet return; } else { is_pop_ = q_.pop(_f); } } // the queue is empty here, wait for the next command { std::unique_lock lock(mutex_); ++n_waiting_; cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag; }); --n_waiting_; if (!is_pop_) { // if the queue is empty and is_done_ == true or *flag // then return return; } } } }; threads_[i].reset( new std::thread(f)); // compiler may not support std::make_unique() } void Init() { is_stop_ = false; is_done_ = false; n_waiting_ = 0; } std::vector> threads_; std::vector>> flags_; detail::Queue>> q_; std::atomic is_done_; std::atomic is_stop_; std::atomic n_waiting_; // how many threads are waiting std::mutex mutex_; std::condition_variable cv_;}; |
2.2.8增加的單元測試代碼
為檢驗修改后代碼的正確性,增添如下單元測試代碼。第一個待測試函數filter_duplicates_str接受的第一個參數為一個整型ID值,我在測試代碼中只是將其作為一個占位符,實際并未使用,后面接受四個C風格字符串,該函數的任務是去除四個字符串中的重復詞并把去重后的結果按字母升序排列,結果以std::string的形式返回;第二個待測試函數filter_duplicates只接受的一個整型ID值參數,我在測試代碼中只是將其作為一個占位符,實際并未使用,該函數的任務是去除一串固定字符串中的重復詞并把去重后的結果按字母升序排列,結果以std::string的形式返回。因為C++編譯器不能推導出重載函數的正確版本,因此第二個待測函數并未使用重載函數形式。兩個待測函數均使用線程池執行1000次,最后檢查返回結果與預期結果的一致性。
|
#include "modules/common/util/ctpl_stl.h"#include #include #include #include #include #include #include "gtest/gtest.h"namespace apollo {namespace common {namespace util {namespace {// ...// Attention: don't use overloaded functions, otherwise the compiler can't// deduce the correct edition.std::string filter_duplicates_str(int id, const char* str1, const char* str2, const char* str3, const char* str4) { // id is unused. std::stringstream ss_in; ss_in << str1 << " " << str2 << " " << str3 << " " << str4; ?std::set string_set; std::istream_iterator beg(ss_in); std::istream_iterator end; std::copy(beg, end, std::inserter(string_set, string_set.end())); std::stringstream ss_out; std::copy(std::begin(string_set), std::end(string_set), std::ostream_iterator(ss_out, " ")); return ss_out.str();}std::string filter_duplicates(int id) { // id is unused. std::stringstream ss_in; ss_in << "a a b b b c foo foo bar foobar foobar hello world hello hello world"; ?std::set string_set; std::istream_iterator beg(ss_in); std::istream_iterator end; std::copy(beg, end, std::inserter(string_set, string_set.end())); std::stringstream ss_out; std::copy(std::begin(string_set), std::end(string_set), std::ostream_iterator(ss_out, " ")); return ss_out.str();}} // namespaceTEST(ThreadPool, filter_duplicates) { const unsigned int hardware_threads = std::thread::hardware_concurrency(); const unsigned int threads = std::min(hardware_threads != 0 ? hardware_threads : 2, 50U); ThreadPool p(threads); std::vector> futures1, futures2; for (int i = 0; i < 1000; ++i) { ? ?futures1.push_back(std::move(p.Push( ? ? ? ?filter_duplicates_str, "thread pthread", "pthread thread good news", ? ? ? ?"today is a good day", "she is a six years old girl"))); ? ?futures2.push_back(std::move(p.Push(filter_duplicates))); ?} ?for (int i = 0; i < 1000; ++i) { ? ?std::string result1 = futures1[i].get(); ? ?std::string result2 = futures2[i].get(); ? ?EXPECT_STREQ( ? ? ? ?result1.c_str(), ? ? ? ?"a day girl good is news old pthread she six thread today years "); ? ?EXPECT_STREQ(result2.c_str(), "a b bar c foo foobar hello world "); ?}}} ?// namespace util} ?// namespace common} ?// namespace apollo |
{ 3 }
Apollo Planning模塊對于線程池的使用分析
Apollo Planning模塊通過PlanningThreadPool類來完成對線程池ThreadPool的包裝調用。PlanningThreadPool類位于頭文件[your_apollo_root_dir]/modules/planning/common/planning_thread_pool.h及對應的實現文件[your_apollo_root_dir]/modules/planning/common/planning_thread_pool.cc中,位于命名空間apollo::planning內。
1PlanningThreadPool類
PlanningThreadPool類的聲明如下:
|
class PlanningThreadPool { public: void Init(); void Stop() { if (thread_pool_) { thread_pool_->Stop(true); } } template void Push(F &&f, Rest &&... rest) { func_.push_back(std::move(thread_pool_->Push(f, rest...))); } template void Push(F &&f) { func_.push_back(std::move(thread_pool_->Push(f))); } void Synchronize(); private: std::unique_ptr thread_pool_; bool is_initialized = false; // 這里的func_用得非常不恰當,因為這里保存的是std::future對象, // 而非std::function對象,將其修改為futures_很有必要。 std::vector> func_; DECLARE_SINGLETON(PlanningThreadPool);}; |
PlanningThreadPool通過宏DECLARE_SINGLETON定義一個單實例類,因此不能直接在棧(stack)和堆(heap)上創建該類對象,而只能通過PlanningThreadPool::instance()獲取該類的唯一實例。該類中的成員變量func_非常具有誤導性,實際上它是一個保存著多個std::future對象的動態數組,而不是保存std::function對象,也就是說它保存的是函數的異步返回值對象,而非異步函數對象本身,因此這里將其修改為futures_很有必要。
2PlanningThreadPool類的使用
在Planning模塊使用PlanningThreadPool類的步驟如下:
3.2.1初始化線程池
在Planning::Init()函數(位于[your_apollo_root_dir]/modules/planning/planning.cc)中添加如下語句完成PlanningThreadPool類對象的初始化:
|
// initialize planning thread pool PlanningThreadPool::instance()->Init(); |
3.2.2 利用線程池完成并發處理
在合適的位置調用線程池完成某個功能的并發處理,一般而言是在某個循環體內。注意:需進行并發處理的任務,相互之間不能有先后依賴關系,因為使用線程池執行并發任務時根本不知道哪個任務會先執行,哪個任務會后執行。
Planning模塊目前在以下幾處使用了線程池:
ReferenceLineInfo::AddObstacles函數ReferenceLineInfo::AddObstacles函數(位于[your_apollo_root_dir]/modules/planning/common/reference_line_info.cc中)在for循環內使用PlanningThreadPool::instance()->Push添加線程池任務,用于增加當前的障礙物信息,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務全部完成。
|
bool ReferenceLineInfo::AddObstacles(const std::vector& obstacles) {if (FLAGS_use_multi_thread_to_add_obstacles) {std::vector ret(obstacles.size(), 0);for (size_t i = 0; i < obstacles.size(); ++i) { ?const auto* obstacle = obstacles.at(i); ?PlanningThreadPool::instance()->Push(std::bind( &ReferenceLineInfo::AddObstacleHelper, this, obstacle, &(ret[i])));}PlanningThreadPool::instance()->Synchronize();if (std::find(ret.begin(), ret.end(), 0) != ret.end()) { return false;}} else {// ...}return true;} |
DPRoadGraph::GenerateMinCostPath函數DPRoadGraph::GenerateMinCostPath函數(位于[your_apollo_root_dir]/modules/planning/tasks/dp_poly_path/dp_road_graph.cc中)在每級航點(way point)上多個橫向采樣點的for循環內使用PlanningThreadPool::instance()->Push添加線程池任務,用于計算本級航點的最小代價,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務全部完成。
|
bool DPRoadGraph::GenerateMinCostPath( const std::vector &obstacles, std::vector *min_cost_path) { // ... for (std::size_t level = 1; level < path_waypoints.size(); ++level) { ? ?const auto &prev_dp_nodes = graph_nodes.back(); ? ?const auto &level_points = path_waypoints[level]; ? ?graph_nodes.emplace_back(); ? ?for (size_t i = 0; i < level_points.size(); ++i) { ? ? ?const auto &cur_point = level_points[i]; ? ? ?graph_nodes.back().emplace_back(cur_point, nullptr); ? ? ?auto &cur_node = graph_nodes.back().back(); ? ? ?if (FLAGS_enable_multi_thread_in_dp_poly_path) { ? ? ? ?PlanningThreadPool::instance()->Push(std::bind( &DPRoadGraph::UpdateNode, this, std::ref(prev_dp_nodes), level, total_level, &trajectory_cost, &(front), &(cur_node))); } else { UpdateNode(prev_dp_nodes, level, total_level, &trajectory_cost, &front, &cur_node); } } if (FLAGS_enable_multi_thread_in_dp_poly_path) { PlanningThreadPool::instance()->Synchronize(); } } // ...} |
DpStGraph::CalculateTotalCost函數DpStGraph::CalculateTotalCost函數(位于[your_apollo_root_dir]/modules/planning/tasks/dp_st_speed/dp_st_graph.cc中)在for循環內使用PlanningThreadPool::instance()->Push添加線程池任務,對于時間采樣值c上的不同距離采樣值r: next_lowest_row<=r<=next_highest_row計算抵達節點(c, r)的最小總代價,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務全部完成。
|
Status DpStGraph::CalculateTotalCost() { // col and row are for STGraph // t corresponding to col // s corresponding to row uint32_t next_highest_row = 0; uint32_t next_lowest_row = 0; for (size_t c = 0; c < cost_table_.size(); ++c) { ? ?int highest_row = 0; ? ?int lowest_row = cost_table_.back().size() - 1; ? ?for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) { ? ? ?if (FLAGS_enable_multi_thread_in_dp_st_graph) { ? ? ? ?PlanningThreadPool::instance()->Push( std::bind(&DpStGraph::CalculateCostAt, this, c, r)); } else { CalculateCostAt(c, r); } } if (FLAGS_enable_multi_thread_in_dp_st_graph) { PlanningThreadPool::instance()->Synchronize(); } for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) { ? ? ?const auto& cost_cr = cost_table_[c][r]; ? ? ?if (cost_cr.total_cost() < std::numeric_limits::infinity()) { int h_r = 0; int l_r = 0; GetRowRange(cost_cr, &h_r, &l_r); highest_row = std::max(highest_row, h_r); lowest_row = std::min(lowest_row, l_r); } } next_highest_row = highest_row; next_lowest_row = lowest_row; } return Status::OK();} |
3.2.3 銷毀線程池
在Planning::Stop()函數(位于[your_apollo_root_dir]/modules/planning/planning.cc)中添加如下語句以便 銷毀線程池:
1 |
PlanningThreadPool::instance()->Stop(); |
自Apollo平臺開放已來,我們收到了大量開發者的咨詢和反饋,越來越多開發者基于Apollo擦出了更多的火花,并愿意將自己的成果貢獻出來,這充分體現了Apollo『貢獻越多,獲得越多』的開源精神。為此我們開設了『開發者說』板塊,希望開發者們能夠踴躍投稿,更好地為廣大自動駕駛開發者營造一個共享交流的平臺!
評論